Skip to content

Commit

Permalink
fix: remove suboptimal logic in leasing behavior (#816)
Browse files Browse the repository at this point in the history
* fix: subtract time spent leasing from max snooze value

* Revert "fix: subtract time spent leasing from max snooze value"

This reverts commit 01f7ff4.

* fix: remove suboptimal list operations in leasing

* remove typing

* add default_deadline as separate argument to send_unary_modack

* remove unused import

* fix test_streaming_pull_manager

* fix test_streaming_pull_manager lint

* drop expired_ack_ids from lease management

* add return value to _send_lease_modacks in unit tests

* remove unused import

* addressing comments

* fix comment

* fix modify_deadline_seconds generator

* fix modify_deadline_seconds generator

* fix subscripting in streaming_pull_manager

* fix mypy checks

* fix mypy checks

* fix lint
  • Loading branch information
acocuzzo committed Nov 11, 2022
1 parent 10cfc05 commit f067af3
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 56 deletions.
38 changes: 27 additions & 11 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,11 @@ def lease(self, items: Sequence[requests.LeaseRequest]) -> None:
self._manager.leaser.add(items)
self._manager.maybe_pause_consumer()

def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
def modify_ack_deadline(
self,
items: Sequence[requests.ModAckRequest],
default_deadline: Optional[float] = None,
) -> None:
"""Modify the ack deadline for the given messages.
Args:
Expand All @@ -337,16 +341,28 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
# no further work needs to be done for `requests_to_retry`
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=list(
itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
),
modify_deadline_seconds=list(
itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
),
ack_reqs_dict=ack_reqs_dict,
)
requests_to_retry: List[requests.ModAckRequest]
if default_deadline is None:
# no further work needs to be done for `requests_to_retry`
_, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=list(
itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
),
modify_deadline_seconds=list(
itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)
),
ack_reqs_dict=ack_reqs_dict,
default_deadline=None,
)
else:
_, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=itertools.islice(
ack_ids_gen, _ACK_IDS_BATCH_SIZE
),
modify_deadline_seconds=None,
ack_reqs_dict=ack_reqs_dict,
default_deadline=default_deadline,
)
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
), "Too many requests to be retried."
Expand Down
25 changes: 23 additions & 2 deletions google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def maintain_leases(self) -> None:
# We do not actually call `modify_ack_deadline` over and over
# because it is more efficient to make a single request.
ack_ids = leased_messages.keys()
expired_ack_ids = set()
if ack_ids:
_LOGGER.debug("Renewing lease for %d ack IDs.", len(ack_ids))

Expand All @@ -197,8 +198,25 @@ def maintain_leases(self) -> None:
# is inactive.
assert self._manager.dispatcher is not None
ack_id_gen = (ack_id for ack_id in ack_ids)
self._manager._send_lease_modacks(ack_id_gen, deadline)
expired_ack_ids = self._manager._send_lease_modacks(
ack_id_gen, deadline
)

start_time = time.time()
# If exactly once delivery is enabled, we should drop all expired ack_ids from lease management.
if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids):
assert self._manager.dispatcher is not None
self._manager.dispatcher.drop(
[
requests.DropRequest(
ack_id,
leased_messages.get(ack_id).size, # type: ignore
leased_messages.get(ack_id).ordering_key, # type: ignore
)
for ack_id in expired_ack_ids
if ack_id in leased_messages
]
)
# Now wait an appropriate period of time and do this again.
#
# We determine the appropriate period of time based on a random
Expand All @@ -208,7 +226,10 @@ def maintain_leases(self) -> None:
# This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server.
# This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases
# where there are many clients.
snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9)
# If we spent any time iterating over expired acks, we should subtract this from the deadline.
snooze = random.uniform(
_MAX_BATCH_LATENCY, (deadline * 0.9 - (time.time() - start_time))
)
_LOGGER.debug("Snoozing lease management for %f seconds.", snooze)
self._stop_event.wait(timeout=snooze)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import threading
import typing
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple
from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
import uuid

import grpc # type: ignore
Expand Down Expand Up @@ -686,30 +686,44 @@ def send_unary_ack(
return requests_completed, requests_to_retry

def send_unary_modack(
self, modify_deadline_ack_ids, modify_deadline_seconds, ack_reqs_dict
self,
modify_deadline_ack_ids,
modify_deadline_seconds,
ack_reqs_dict,
default_deadline=None,
) -> Tuple[List[requests.ModAckRequest], List[requests.ModAckRequest]]:
"""Send a request using a separate unary request instead of over the stream.
If a RetryError occurs, the manager shutdown is triggered, and the
error is re-raised.
"""
assert modify_deadline_ack_ids
# Either we have a generator or a single deadline.
assert modify_deadline_seconds is None or default_deadline is None

error_status = None
modack_errors_dict = None
try:
# Send ack_ids with the same deadline seconds together.
deadline_to_ack_ids = collections.defaultdict(list)

for n, ack_id in enumerate(modify_deadline_ack_ids):
deadline = modify_deadline_seconds[n]
deadline_to_ack_ids[deadline].append(ack_id)

for deadline, ack_ids in deadline_to_ack_ids.items():
if default_deadline is None:
# Send ack_ids with the same deadline seconds together.
deadline_to_ack_ids = collections.defaultdict(list)

for n, ack_id in enumerate(modify_deadline_ack_ids):
deadline = modify_deadline_seconds[n]
deadline_to_ack_ids[deadline].append(ack_id)

for deadline, ack_ids in deadline_to_ack_ids.items():
self._client.modify_ack_deadline(
subscription=self._subscription,
ack_ids=ack_ids,
ack_deadline_seconds=deadline,
)
else:
# We can send all requests with the default deadline.
self._client.modify_ack_deadline(
subscription=self._subscription,
ack_ids=ack_ids,
ack_deadline_seconds=deadline,
ack_ids=modify_deadline_ack_ids,
ack_deadline_seconds=default_deadline,
)
except exceptions.GoogleAPICallError as exc:
_LOGGER.debug(
Expand Down Expand Up @@ -990,21 +1004,20 @@ def _get_initial_request(

def _send_lease_modacks(
self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True
) -> List[str]:
) -> Set[str]:
exactly_once_enabled = False
with self._exactly_once_enabled_lock:
exactly_once_enabled = self._exactly_once_enabled
if exactly_once_enabled:
items = []
for ack_id in ack_ids:
future = futures.Future()
request = requests.ModAckRequest(ack_id, ack_deadline, future)
items.append(request)
items = [
requests.ModAckRequest(ack_id, ack_deadline, futures.Future())
for ack_id in ack_ids
]

assert self._dispatcher is not None
self._dispatcher.modify_ack_deadline(items)
self._dispatcher.modify_ack_deadline(items, ack_deadline)

expired_ack_ids = []
expired_ack_ids = set()
for req in items:
try:
assert req.future is not None
Expand All @@ -1019,16 +1032,16 @@ def _send_lease_modacks(
exc_info=True,
)
if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID:
expired_ack_ids.append(req.ack_id)
expired_ack_ids.add(req.ack_id)
return expired_ack_ids
else:
items = [
requests.ModAckRequest(ack_id, self.ack_deadline, None)
for ack_id in ack_ids
]
assert self._dispatcher is not None
self._dispatcher.modify_ack_deadline(items)
return []
self._dispatcher.modify_ack_deadline(items, ack_deadline)
return set()

def _exactly_once_delivery_enabled(self) -> bool:
"""Whether exactly-once delivery is enabled for the subscription."""
Expand Down Expand Up @@ -1082,10 +1095,8 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# modack the messages we received, as this tells the server that we've
# received them.
ack_id_gen = (message.ack_id for message in received_messages)
expired_ack_ids = set(
self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
expired_ack_ids = self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)

with self._pause_resume_lock:
Expand Down
65 changes: 53 additions & 12 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,16 +645,20 @@ def test_nack():
]
manager.send_unary_modack.return_value = (items, [])
dispatcher_.nack(items)
calls = manager.send_unary_modack.call_args_list
assert len(calls) == 1

manager.send_unary_modack.assert_called_once_with(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[0],
ack_reqs_dict={
for call in calls:
modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"]
assert list(modify_deadline_ack_ids) == ["ack_id_string"]
modify_deadline_seconds = call[1]["modify_deadline_seconds"]
assert list(modify_deadline_seconds) == [0]
ack_reqs_dict = call[1]["ack_reqs_dict"]
assert ack_reqs_dict == {
"ack_id_string": requests.ModAckRequest(
ack_id="ack_id_string", seconds=0, future=None
)
},
)
}


def test_modify_ack_deadline():
Expand All @@ -666,12 +670,16 @@ def test_modify_ack_deadline():
items = [requests.ModAckRequest(ack_id="ack_id_string", seconds=60, future=None)]
manager.send_unary_modack.return_value = (items, [])
dispatcher_.modify_ack_deadline(items)
calls = manager.send_unary_modack.call_args_list
assert len(calls) == 1

manager.send_unary_modack.assert_called_once_with(
modify_deadline_ack_ids=["ack_id_string"],
modify_deadline_seconds=[60],
ack_reqs_dict={"ack_id_string": items[0]},
)
for call in calls:
modify_deadline_ack_ids = call[1]["modify_deadline_ack_ids"]
assert list(modify_deadline_ack_ids) == ["ack_id_string"]
modify_deadline_seconds = call[1]["modify_deadline_seconds"]
assert list(modify_deadline_seconds) == [60]
ack_reqs_dict = call[1]["ack_reqs_dict"]
assert ack_reqs_dict == {"ack_id_string": items[0]}


def test_modify_ack_deadline_splitting_large_payload():
Expand All @@ -695,14 +703,47 @@ def test_modify_ack_deadline_splitting_large_payload():
sent_ack_ids = collections.Counter()

for call in calls:
modack_ackids = call[1]["modify_deadline_ack_ids"]
modack_ackids = list(call[1]["modify_deadline_ack_ids"])
assert len(modack_ackids) <= dispatcher._ACK_IDS_BATCH_SIZE
sent_ack_ids.update(modack_ackids)

assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed
assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once


def test_modify_ack_deadline_splitting_large_payload_with_default_deadline():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

items = [
# use realistic lengths for ACK IDs (max 176 bytes)
requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60, future=None)
for i in range(5001)
]
manager.send_unary_modack.return_value = (items, [])
dispatcher_.modify_ack_deadline(items, 60)

calls = manager.send_unary_modack.call_args_list
assert len(calls) == 6

all_ack_ids = {item.ack_id for item in items}
sent_ack_ids = collections.Counter()

for call in calls:
modack_ackids = list(call[1]["modify_deadline_ack_ids"])
modack_deadline_seconds = call[1]["modify_deadline_seconds"]
default_deadline = call[1]["default_deadline"]
assert len(list(modack_ackids)) <= dispatcher._ACK_IDS_BATCH_SIZE
assert modack_deadline_seconds is None
assert default_deadline == 60
sent_ack_ids.update(modack_ackids)

assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed
assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once


@mock.patch("threading.Thread", autospec=True)
def test_start(thread):
manager = mock.create_autospec(
Expand Down

0 comments on commit f067af3

Please sign in to comment.