Skip to content

Commit

Permalink
fix: ensure bytes cannot go negative on requests sent to the server (#…
Browse files Browse the repository at this point in the history
…300)

This can happen if the server sends a message that goes over the byte limit, which should never happen, but apparently does in edge cases. This ensures that the client doesn't fail if this happens.

fixes: #294
  • Loading branch information
dpcollins-google committed Feb 18, 2022
1 parent f2dc815 commit 6d3690a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
15 changes: 13 additions & 2 deletions google/cloud/pubsublite/internal/wire/flow_control_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
_MAX_INT64 = 0x7FFFFFFFFFFFFFFF


def _clamp(val: int):
if val > _MAX_INT64:
return _MAX_INT64
if val < 0:
return 0
return val


class _AggregateRequest:
_request: FlowControlRequest.meta.pb

Expand All @@ -39,10 +47,13 @@ def __add__(self, other: FlowControlRequest):
return self

def to_optional(self) -> Optional[FlowControlRequest]:
if self._request.allowed_messages == 0 and self._request.allowed_bytes == 0:
allowed_messages = _clamp(self._request.allowed_messages)
allowed_bytes = _clamp(self._request.allowed_bytes)
if allowed_messages == 0 and allowed_bytes == 0:
return None
request = FlowControlRequest()
request._pb = self._request
request._pb.allowed_messages = allowed_messages
request._pb.allowed_bytes = allowed_bytes
return request


Expand Down
14 changes: 14 additions & 0 deletions tests/unit/pubsublite/internal/wire/flow_control_batcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@ def test_add_remove():
restart_2 = batcher.request_for_restart()
assert restart_2.allowed_bytes == 5
assert restart_2.allowed_messages == 1


def test_negative_bytes_not_negative_request():
batcher = FlowControlBatcher()
batcher.add(FlowControlRequest(allowed_bytes=10, allowed_messages=3))
restart_1 = batcher.request_for_restart()
assert restart_1.allowed_bytes == 10
assert restart_1.allowed_messages == 3
batcher.on_messages(
[SequencedMessage(size_bytes=10000), SequencedMessage(size_bytes=3)]
)
restart_2 = batcher.request_for_restart()
assert restart_2.allowed_bytes == 0
assert restart_2.allowed_messages == 1

0 comments on commit 6d3690a

Please sign in to comment.