Skip to content
Permalink
Browse files
feat: Add support for server-side flow control (#143)
* chore: Remove notes about ordering keys being experimental.

* Revert "chore: Remove notes about ordering keys being experimental."

This reverts commit 38b2a3e.

* feat: Add support for server-side flow control

* Add unit test for flow control
  • Loading branch information
kamalaboulhosn committed Jul 7, 2020
1 parent 1cb6746 commit 04e261c602a2919cc75b3efa3dab099fb2cf704c
Showing with 11 additions and 0 deletions.
  1. +2 −0 google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
  2. +9 −0 tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -575,6 +575,8 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
subscription=self._subscription,
client_id=self._client_id,
max_outstanding_messages=self._flow_control.max_messages,
max_outstanding_bytes=self._flow_control.max_bytes,
)

# Return the initial request.
@@ -159,6 +159,15 @@ def test_client_id():
assert client_id_1 != client_id_2


def test_streaming_flow_control():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
assert request.max_outstanding_messages == 10
assert request.max_outstanding_bytes == 1000


def test_ack_deadline_with_max_duration_per_lease_extension():
manager = make_manager()
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)

0 comments on commit 04e261c

Please sign in to comment.