Skip to content
Permalink
Browse files
feat: Enable server side flow control by default with the option to t…
…urn it off (#231)

* Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server. If flow_control.max_messages > 0 or flow_control.max_bytes > 0, flow control will be enforced at the server side (in addition to the client side).

This behavior is enabled by default and users who would like to opt-out of this feature --in case they encouter issues with server side flow control-- can pass in use_legacy_flow_control=True in SubscriberClient.subscribe().

* Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server.
If flow_control.max_messages > 0 or flow_control.max_bytes > 0, flow control will be enforced
at the server side (in addition to the client side).

This behavior is enabled by default and users who would like to opt-out of this feature
--in case they encouter issues with server side flow control-- can pass in
use_legacy_flow_control=true in subscriberclient.subscribe().

Co-authored-by: Tianzi Cai <tianzi@google.com>
  • Loading branch information
fayssalmartanigcp and anguillanneuf committed Nov 10, 2020
1 parent b6d9bd7 commit 94d738c07c6404a152c6729f5ba4b106b1fe9355
@@ -105,6 +105,9 @@ class StreamingPullManager(object):
``projects/{project}/subscriptions/{subscription}``.
flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow
control settings.
use_legacy_flow_control (bool): Disables enforcing flow control settings
at the Cloud PubSub server and uses the less accurate method of only
enforcing flow control at the client side.
scheduler (~google.cloud.pubsub_v1.scheduler.Scheduler): The scheduler
to use to process messages. If not provided, a thread pool-based
scheduler will be used.
@@ -115,11 +118,17 @@ class StreamingPullManager(object):
RPC instead of over the streaming RPC."""

def __init__(
self, client, subscription, flow_control=types.FlowControl(), scheduler=None
self,
client,
subscription,
flow_control=types.FlowControl(),
scheduler=None,
use_legacy_flow_control=False,
):
self._client = client
self._subscription = subscription
self._flow_control = flow_control
self._use_legacy_flow_control = use_legacy_flow_control
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline = 10
@@ -587,8 +596,12 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
client_id=self._client_id,
max_outstanding_messages=self._flow_control.max_messages,
max_outstanding_bytes=self._flow_control.max_bytes,
max_outstanding_messages=(
0 if self._use_legacy_flow_control else self._flow_control.max_messages
),
max_outstanding_bytes=(
0 if self._use_legacy_flow_control else self._flow_control.max_bytes
),
)

# Return the initial request.
@@ -157,7 +157,14 @@ def api(self):
"""The underlying gapic API client."""
return self._api

def subscribe(self, subscription, callback, flow_control=(), scheduler=None):
def subscribe(
self,
subscription,
callback,
flow_control=(),
scheduler=None,
use_legacy_flow_control=False,
):
"""Asynchronously start receiving messages on a given subscription.
This method starts a background thread to begin pulling messages from
@@ -179,6 +186,10 @@ def subscribe(self, subscription, callback, flow_control=(), scheduler=None):
settings may lead to faster throughput for messages that do not take
a long time to process.
The ``use_legacy_flow_control`` argument disables enforcing flow control
settings at the Cloud PubSub server and uses the less accurate method of
only enforcing flow control at the client side.
This method starts the receiver in the background and returns a
*Future* representing its execution. Waiting on the future (calling
``result()``) will block forever or until a non-recoverable error
@@ -238,7 +249,11 @@ def callback(message):
flow_control = types.FlowControl(*flow_control)

manager = streaming_pull_manager.StreamingPullManager(
self, subscription, flow_control=flow_control, scheduler=scheduler
self,
subscription,
flow_control=flow_control,
scheduler=scheduler,
use_legacy_flow_control=use_legacy_flow_control,
)

future = futures.StreamingPullFuture(manager)
@@ -170,6 +170,16 @@ def test_streaming_flow_control():
assert request.max_outstanding_bytes == 1000


def test_streaming_flow_control_use_legacy_flow_control():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000),
use_legacy_flow_control=True,
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
assert request.max_outstanding_messages == 0
assert request.max_outstanding_bytes == 0


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)

0 comments on commit 94d738c

Please sign in to comment.