From f067af348b8d3deb72981c58d942e887c0efb5ff Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> Date: Fri, 11 Nov 2022 10:38:38 -0500 Subject: [PATCH] fix: remove suboptimal logic in leasing behavior (#816) * fix: subtract time spent leasing from max snooze value * Revert "fix: subtract time spent leasing from max snooze value" This reverts commit 01f7ff4319508bc570dd8a335ffe2968102157b7. * fix: remove suboptimal list operations in leasing * remove typing * add default_deadline as separate argument to send_unary_modack * remove unused import * fix test_streaming_pull_manager * fix test_streaming_pull_manager lint * drop expired_ack_ids from lease management * add return value to _send_lease_modacks in unit tests * remove unused import * addressing comments * fix comment * fix modify_deadline_seconds generator * fix modify_deadline_seconds generator * fix subscripting in streaming_pull_manager * fix mypy checks * fix mypy checks * fix lint --- .../subscriber/_protocol/dispatcher.py | 38 +++++++---- .../pubsub_v1/subscriber/_protocol/leaser.py | 25 ++++++- .../_protocol/streaming_pull_manager.py | 65 +++++++++++-------- .../pubsub_v1/subscriber/test_dispatcher.py | 65 +++++++++++++++---- .../unit/pubsub_v1/subscriber/test_leaser.py | 49 ++++++++++++++ .../subscriber/test_streaming_pull_manager.py | 42 ++++++++++-- 6 files changed, 228 insertions(+), 56 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index ed2f5d217..15ad4abb3 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -319,7 +319,11 @@ def lease(self, items: Sequence[requests.LeaseRequest]) -> None: self._manager.leaser.add(items) self._manager.maybe_pause_consumer() - def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: + def modify_ack_deadline( + self, + items: Sequence[requests.ModAckRequest], + default_deadline: Optional[float] = None, + ) -> None: """Modify the ack deadline for the given messages. Args: @@ -337,16 +341,28 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } - # no further work needs to be done for `requests_to_retry` - requests_completed, requests_to_retry = self._manager.send_unary_modack( - modify_deadline_ack_ids=list( - itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE) - ), - modify_deadline_seconds=list( - itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE) - ), - ack_reqs_dict=ack_reqs_dict, - ) + requests_to_retry: List[requests.ModAckRequest] + if default_deadline is None: + # no further work needs to be done for `requests_to_retry` + _, requests_to_retry = self._manager.send_unary_modack( + modify_deadline_ack_ids=list( + itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE) + ), + modify_deadline_seconds=list( + itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE) + ), + ack_reqs_dict=ack_reqs_dict, + default_deadline=None, + ) + else: + _, requests_to_retry = self._manager.send_unary_modack( + modify_deadline_ack_ids=itertools.islice( + ack_ids_gen, _ACK_IDS_BATCH_SIZE + ), + modify_deadline_seconds=None, + ack_reqs_dict=ack_reqs_dict, + default_deadline=default_deadline, + ) assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE ), "Too many requests to be retried." diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 508f4d7ce..16018e384 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -187,6 +187,7 @@ def maintain_leases(self) -> None: # We do not actually call `modify_ack_deadline` over and over # because it is more efficient to make a single request. ack_ids = leased_messages.keys() + expired_ack_ids = set() if ack_ids: _LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids)) @@ -197,8 +198,25 @@ def maintain_leases(self) -> None: # is inactive. assert self._manager.dispatcher is not None ack_id_gen = (ack_id for ack_id in ack_ids) - self._manager._send_lease_modacks(ack_id_gen, deadline) + expired_ack_ids = self._manager._send_lease_modacks( + ack_id_gen, deadline + ) + start_time = time.time() + # If exactly once delivery is enabled, we should drop all expired ack_ids from lease management. + if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids): + assert self._manager.dispatcher is not None + self._manager.dispatcher.drop( + [ + requests.DropRequest( + ack_id, + leased_messages.get(ack_id).size, # type: ignore + leased_messages.get(ack_id).ordering_key, # type: ignore + ) + for ack_id in expired_ack_ids + if ack_id in leased_messages + ] + ) # Now wait an appropriate period of time and do this again. # # We determine the appropriate period of time based on a random @@ -208,7 +226,10 @@ def maintain_leases(self) -> None: # This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server. # This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases # where there are many clients. - snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9) + # If we spent any time iterating over expired acks, we should subtract this from the deadline. + snooze = random.uniform( + _MAX_BATCH_LATENCY, (deadline * 0.9 - (time.time() - start_time)) + ) _LOGGER.debug("Snoozing lease management for %f seconds.", snooze) self._stop_event.wait(timeout=snooze) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 89dc93e74..13974ebe4 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -20,7 +20,7 @@ import logging import threading import typing -from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple +from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple import uuid import grpc # type: ignore @@ -686,7 +686,11 @@ def send_unary_ack( return requests_completed, requests_to_retry def send_unary_modack( - self, modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict + self, + modify_deadline_ack_ids, + modify_deadline_seconds, + ack_reqs_dict, + default_deadline=None, ) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]: """Send a request using a separate unary request instead of over the stream. @@ -694,22 +698,32 @@ def send_unary_modack( error is re-raised. """ assert modify_deadline_ack_ids + # Either we have a generator or a single deadline. + assert modify_deadline_seconds is None or default_deadline is None error_status = None modack_errors_dict = None try: - # Send ack_ids with the same deadline seconds together. - deadline_to_ack_ids = collections.defaultdict(list) - - for n, ack_id in enumerate(modify_deadline_ack_ids): - deadline = modify_deadline_seconds[n] - deadline_to_ack_ids[deadline].append(ack_id) - - for deadline, ack_ids in deadline_to_ack_ids.items(): + if default_deadline is None: + # Send ack_ids with the same deadline seconds together. + deadline_to_ack_ids = collections.defaultdict(list) + + for n, ack_id in enumerate(modify_deadline_ack_ids): + deadline = modify_deadline_seconds[n] + deadline_to_ack_ids[deadline].append(ack_id) + + for deadline, ack_ids in deadline_to_ack_ids.items(): + self._client.modify_ack_deadline( + subscription=self._subscription, + ack_ids=ack_ids, + ack_deadline_seconds=deadline, + ) + else: + # We can send all requests with the default deadline. self._client.modify_ack_deadline( subscription=self._subscription, - ack_ids=ack_ids, - ack_deadline_seconds=deadline, + ack_ids=modify_deadline_ack_ids, + ack_deadline_seconds=default_deadline, ) except exceptions.GoogleAPICallError as exc: _LOGGER.debug( @@ -990,21 +1004,20 @@ def _get_initial_request( def _send_lease_modacks( self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True - ) -> List[str]: + ) -> Set[str]: exactly_once_enabled = False with self._exactly_once_enabled_lock: exactly_once_enabled = self._exactly_once_enabled if exactly_once_enabled: - items = [] - for ack_id in ack_ids: - future = futures.Future() - request = requests.ModAckRequest(ack_id, ack_deadline, future) - items.append(request) + items = [ + requests.ModAckRequest(ack_id, ack_deadline, futures.Future()) + for ack_id in ack_ids + ] assert self._dispatcher is not None - self._dispatcher.modify_ack_deadline(items) + self._dispatcher.modify_ack_deadline(items, ack_deadline) - expired_ack_ids = [] + expired_ack_ids = set() for req in items: try: assert req.future is not None @@ -1019,7 +1032,7 @@ def _send_lease_modacks( exc_info=True, ) if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID: - expired_ack_ids.append(req.ack_id) + expired_ack_ids.add(req.ack_id) return expired_ack_ids else: items = [ @@ -1027,8 +1040,8 @@ def _send_lease_modacks( for ack_id in ack_ids ] assert self._dispatcher is not None - self._dispatcher.modify_ack_deadline(items) - return [] + self._dispatcher.modify_ack_deadline(items, ack_deadline) + return set() def _exactly_once_delivery_enabled(self) -> bool: """Whether exactly-once delivery is enabled for the subscription.""" @@ -1082,10 +1095,8 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # modack the messages we received, as this tells the server that we've # received them. ack_id_gen = (message.ack_id for message in received_messages) - expired_ack_ids = set( - self._send_lease_modacks( - ack_id_gen, self.ack_deadline, warn_on_invalid=False - ) + expired_ack_ids = self._send_lease_modacks( + ack_id_gen, self.ack_deadline, warn_on_invalid=False ) with self._pause_resume_lock: diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index a5107fe7b..d4813911c 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -645,16 +645,20 @@ def test_nack(): ] manager.send_unary_modack.return_value = (items, []) dispatcher_.nack(items) + calls = manager.send_unary_modack.call_args_list + assert len(calls) == 1 - manager.send_unary_modack.assert_called_once_with( - modify_deadline_ack_ids=["ack_id_string"], - modify_deadline_seconds=[0], - ack_reqs_dict={ + for call in calls: + modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"] + assert list(modify_deadline_ack_ids) == ["ack_id_string"] + modify_deadline_seconds = call[1]["modify_deadline_seconds"] + assert list(modify_deadline_seconds) == [0] + ack_reqs_dict = call[1]["ack_reqs_dict"] + assert ack_reqs_dict == { "ack_id_string": requests.ModAckRequest( ack_id="ack_id_string", seconds=0, future=None ) - }, - ) + } def test_modify_ack_deadline(): @@ -666,12 +670,16 @@ def test_modify_ack_deadline(): items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=60, future=None)] manager.send_unary_modack.return_value = (items, []) dispatcher_.modify_ack_deadline(items) + calls = manager.send_unary_modack.call_args_list + assert len(calls) == 1 - manager.send_unary_modack.assert_called_once_with( - modify_deadline_ack_ids=["ack_id_string"], - modify_deadline_seconds=[60], - ack_reqs_dict={"ack_id_string": items[0]}, - ) + for call in calls: + modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"] + assert list(modify_deadline_ack_ids) == ["ack_id_string"] + modify_deadline_seconds = call[1]["modify_deadline_seconds"] + assert list(modify_deadline_seconds) == [60] + ack_reqs_dict = call[1]["ack_reqs_dict"] + assert ack_reqs_dict == {"ack_id_string": items[0]} def test_modify_ack_deadline_splitting_large_payload(): @@ -695,7 +703,7 @@ def test_modify_ack_deadline_splitting_large_payload(): sent_ack_ids = collections.Counter() for call in calls: - modack_ackids = call[1]["modify_deadline_ack_ids"] + modack_ackids = list(call[1]["modify_deadline_ack_ids"]) assert len(modack_ackids) <= dispatcher._ACK_IDS_BATCH_SIZE sent_ack_ids.update(modack_ackids) @@ -703,6 +711,39 @@ def test_modify_ack_deadline_splitting_large_payload(): assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once +def test_modify_ack_deadline_splitting_large_payload_with_default_deadline(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [ + # use realistic lengths for ACK IDs (max 176 bytes) + requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60, future=None) + for i in range(5001) + ] + manager.send_unary_modack.return_value = (items, []) + dispatcher_.modify_ack_deadline(items, 60) + + calls = manager.send_unary_modack.call_args_list + assert len(calls) == 6 + + all_ack_ids = {item.ack_id for item in items} + sent_ack_ids = collections.Counter() + + for call in calls: + modack_ackids = list(call[1]["modify_deadline_ack_ids"]) + modack_deadline_seconds = call[1]["modify_deadline_seconds"] + default_deadline = call[1]["default_deadline"] + assert len(list(modack_ackids)) <= dispatcher._ACK_IDS_BATCH_SIZE + assert modack_deadline_seconds is None + assert default_deadline == 60 + sent_ack_ids.update(modack_ackids) + + assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed + assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once + + @mock.patch("threading.Thread", autospec=True) def test_start(thread): manager = mock.create_autospec( diff --git a/tests/unit/pubsub_v1/subscriber/test_leaser.py b/tests/unit/pubsub_v1/subscriber/test_leaser.py index 7e11e3ccb..f38717c6f 100644 --- a/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -105,6 +105,7 @@ def test_maintain_leases_inactive_manager(caplog): [requests.LeaseRequest(ack_id="my_ack_ID", byte_size=42, ordering_key="")] ) + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() # Leases should still be maintained even if the manager is inactive. @@ -119,6 +120,7 @@ def test_maintain_leases_stopped(caplog): leaser_ = leaser.Leaser(manager) leaser_.stop() + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() assert "exiting" in caplog.text @@ -142,6 +144,7 @@ def test_maintain_leases_ack_ids(): [requests.LeaseRequest(ack_id="my ack id", byte_size=50, ordering_key="")] ) + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() assert len(manager._send_lease_modacks.mock_calls) == 1 @@ -151,6 +154,51 @@ def test_maintain_leases_ack_ids(): assert call.args[1] == 10 +def test_maintain_leases_expired_ack_ids_ignored(): + manager = create_manager() + leaser_ = leaser.Leaser(manager) + make_sleep_mark_event_as_done(leaser_) + leaser_.add( + [requests.LeaseRequest(ack_id="my ack id", byte_size=50, ordering_key="")] + ) + manager._exactly_once_delivery_enabled.return_value = False + manager._send_lease_modacks.return_value = set(["my ack id"]) + leaser_.maintain_leases() + + assert len(manager._send_lease_modacks.mock_calls) == 1 + + call = manager._send_lease_modacks.mock_calls[0] + ack_ids = list(call.args[0]) + assert ack_ids == ["my ack id"] + assert call.args[1] == 10 + + +def test_maintain_leases_expired_ack_ids_exactly_once(): + manager = create_manager() + leaser_ = leaser.Leaser(manager) + make_sleep_mark_event_as_done(leaser_) + leaser_.add( + [requests.LeaseRequest(ack_id="my ack id", byte_size=50, ordering_key="")] + ) + manager._exactly_once_delivery_enabled.return_value = True + manager._send_lease_modacks.return_value = set(["my ack id"]) + leaser_.maintain_leases() + + assert len(manager._send_lease_modacks.mock_calls) == 1 + + call = manager._send_lease_modacks.mock_calls[0] + ack_ids = list(call.args[0]) + assert ack_ids == ["my ack id"] + assert call.args[1] == 10 + + assert len(manager.dispatcher.drop.mock_calls) == 1 + call = manager.dispatcher.drop.mock_calls[0] + drop_requests = list(call.args[0]) + assert drop_requests[0].ack_id == "my ack id" + assert drop_requests[0].byte_size == 50 + assert drop_requests[0].ordering_key == "" + + def test_maintain_leases_no_ack_ids(): manager = create_manager() leaser_ = leaser.Leaser(manager) @@ -187,6 +235,7 @@ def test_maintain_leases_outdated_items(time): # Now make sure time reports that we are past the end of our timeline. time.return_value = manager.flow_control.max_lease_duration + 1 + manager._send_lease_modacks.return_value = set() leaser_.maintain_leases() # ack2, ack3, and ack4 should be renewed. ack1 should've been dropped diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 1f28b3f40..e01299ef9 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -674,6 +674,33 @@ def test_send_unary_modack(): ) +def test_send_unary_modack_default_deadline(): + manager = make_manager() + + ack_reqs_dict = { + "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=None), + "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=None), + "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=None), + } + manager.send_unary_modack( + modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"], + modify_deadline_seconds=None, + ack_reqs_dict=ack_reqs_dict, + default_deadline=10, + ) + + manager._client.modify_ack_deadline.assert_has_calls( + [ + mock.call( + subscription=manager._subscription, + ack_ids=["ack_id3", "ack_id4", "ack_id5"], + ack_deadline_seconds=10, + ), + ], + any_order=True, + ) + + def test_send_unary_modack_exactly_once_enabled_with_futures(): manager = make_manager() manager._exactly_once_enabled = True @@ -1460,7 +1487,8 @@ def test__on_response_modifies_ack_deadline(): [ requests.ModAckRequest("ack_1", 18, None), requests.ModAckRequest("ack_2", 18, None), - ] + ], + 18, ) @@ -1521,6 +1549,7 @@ def test__on_response_modifies_ack_deadline_with_exactly_once_min_lease(): requests.ModAckRequest("ack_1", 10, None), requests.ModAckRequest("ack_2", 10, None), ] + assert call.args[1] == 10 # exactly_once should be enabled after this request b/c subscription_properties says so manager._on_response(response2) @@ -1534,6 +1563,8 @@ def test__on_response_modifies_ack_deadline_with_exactly_once_min_lease(): assert modack_reqs[0].seconds == 60 assert modack_reqs[1].ack_id == "ack_4" assert modack_reqs[1].seconds == 60 + modack_deadline = call.args[1] + assert modack_deadline == 60 def test__on_response_send_ack_deadline_after_enabling_exactly_once(): @@ -1610,7 +1641,8 @@ def test__on_response_no_leaser_overload(): [ requests.ModAckRequest("fack", 10, None), requests.ModAckRequest("back", 10, None), - ] + ], + 10, ) schedule_calls = scheduler.schedule.mock_calls @@ -1660,7 +1692,8 @@ def test__on_response_with_leaser_overload(): requests.ModAckRequest("fack", 10, None), requests.ModAckRequest("back", 10, None), requests.ModAckRequest("zack", 10, None), - ] + ], + 10, ) # one message should be scheduled, the flow control limits allow for it @@ -1740,7 +1773,8 @@ def test__on_response_with_ordering_keys(): requests.ModAckRequest("fack", 10, None), requests.ModAckRequest("back", 10, None), requests.ModAckRequest("zack", 10, None), - ] + ], + 10, ) # The first two messages should be scheduled, The third should be put on