Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: set x-goog-request-params for streaming pull request #884

Merged
merged 11 commits into from
Mar 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ def __init__(
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._stream_metadata = [
["x-goog-request-params", "subscription=" + subscription]
]

# If max_duration_per_lease_extension is the default
# we set the stream_ack_deadline to the default of 60
Expand Down Expand Up @@ -845,6 +848,7 @@ def open(
initial_request=get_initial_request,
should_recover=self._should_recover,
should_terminate=self._should_terminate,
metadata=self._stream_metadata,
throttle_reopen=True,
)
self._rpc.add_done_callback(self._on_rpc_done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def test__wrap_callback_errors_error():


def test_constructor_and_default_state():
mock.sentinel.subscription = str()
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client, mock.sentinel.subscription
)
Expand All @@ -113,6 +114,7 @@ def test_constructor_and_default_state():


def test_constructor_with_default_options():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl()
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
Expand All @@ -128,6 +130,7 @@ def test_constructor_with_default_options():


def test_constructor_with_min_and_max_duration_per_lease_extension_():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=15, max_duration_per_lease_extension=20
)
Expand All @@ -142,6 +145,7 @@ def test_constructor_with_min_and_max_duration_per_lease_extension_():


def test_constructor_with_min_duration_per_lease_extension_too_low():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=9, max_duration_per_lease_extension=9
)
Expand All @@ -156,6 +160,7 @@ def test_constructor_with_min_duration_per_lease_extension_too_low():


def test_constructor_with_max_duration_per_lease_extension_too_high():
mock.sentinel.subscription = str()
flow_control_ = types.FlowControl(
max_duration_per_lease_extension=601, min_duration_per_lease_extension=601
)
Expand Down Expand Up @@ -1181,6 +1186,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
initial_request=mock.ANY,
should_recover=manager._should_recover,
should_terminate=manager._should_terminate,
metadata=manager._stream_metadata,
throttle_reopen=True,
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
Expand Down