diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 508f4d7ce..2d9a360f5 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -147,6 +147,7 @@ def maintain_leases(self) -> None: repeats. """ while not self._stop_event.is_set(): + start_time = time.time() # Determine the appropriate duration for the lease. This is # based off of how long previous messages have taken to ack, with # a sensible default and within the ranges allowed by Pub/Sub. @@ -204,11 +205,14 @@ def maintain_leases(self) -> None: # We determine the appropriate period of time based on a random # period between: # minimum: MAX_BATCH_LATENCY (to prevent duplicate modacks being created in one batch) - # maximum: 90% of the deadline + # maximum: 90% of the deadline, + # minus the time spent since the start of this while loop. # This maximum time attempts to prevent ack expiration before new lease modacks arrive at the server. # This use of jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases # where there are many clients. - snooze = random.uniform(_MAX_BATCH_LATENCY, deadline * 0.9) + snooze = random.uniform( + _MAX_BATCH_LATENCY, (deadline * 0.9) - (start_time - time.time()) + ) _LOGGER.debug("Snoozing lease management for %f seconds.", snooze) self._stop_event.wait(timeout=snooze)