Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 37 additions & 27 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
['ack_id', 'byte_size'],
)

_LeasedMessage = collections.namedtuple(
'_LeasedMessage',
['added_time', 'size'])


@six.add_metaclass(abc.ABCMeta)
class BasePolicy(object):
Expand Down Expand Up @@ -92,8 +96,6 @@ class BasePolicy(object):
your own dictionary class, ensure this assumption holds
or you will get strange behavior.
"""

_managed_ack_ids = None
_RETRYABLE_STREAM_ERRORS = (
exceptions.DeadlineExceeded,
exceptions.ServiceUnavailable,
Expand All @@ -109,6 +111,10 @@ def __init__(self, client, subscription,
self._future = None
self.flow_control = flow_control
self.histogram = _histogram.Histogram(data=histogram_data)
""".Histogram: the histogram tracking ack latency."""
self.leased_messages = {}
"""dict[str, _LeasedMessage]: A mapping of ack IDs to a record of the
local time the ack ID was first leased (seconds since the epoch) and the
message size in bytes."""

# These are for internal flow control tracking.
# They should not need to be used by subclasses.
Expand Down Expand Up @@ -144,17 +150,6 @@ def future(self):
"""
return self._future

@property
def managed_ack_ids(self):
"""Return the ack IDs currently being managed by the policy.

Returns:
set: The set of ack IDs being managed.
"""
if self._managed_ack_ids is None:
self._managed_ack_ids = set()
return self._managed_ack_ids

@property
def subscription(self):
"""Return the subscription.
Expand Down Expand Up @@ -182,7 +177,7 @@ def _load(self):
float: The load value.
"""
return max([
len(self.managed_ack_ids) / self.flow_control.max_messages,
len(self.leased_messages) / self.flow_control.max_messages,
self._bytes / self.flow_control.max_bytes,
self._consumer.pending_requests / self.flow_control.max_requests
])
Expand Down Expand Up @@ -252,11 +247,10 @@ def drop(self, items):
# Remove the ack ID from lease management, and decrement the
# byte counter.
for item in items:
if item.ack_id in self.managed_ack_ids:
self.managed_ack_ids.remove(item.ack_id)
if self.leased_messages.pop(item.ack_id, None) is not None:
self._bytes -= item.byte_size
else:
_LOGGER.debug('Item %s wasn\'t managed', item.ack_id)
_LOGGER.debug('Item %s was not managed.', item.ack_id)

if self._bytes < 0:
_LOGGER.debug(
Expand Down Expand Up @@ -290,7 +284,7 @@ def get_initial_request(self, ack_queue=False):
# Any ack IDs that are under lease management and not being acked
# need to have their deadline extended immediately.
ack_ids = set()
lease_ids = self.managed_ack_ids
lease_ids = set(self.leased_messages.keys())
if ack_queue:
ack_ids = self._ack_on_resume
lease_ids = lease_ids.difference(ack_ids)
Expand Down Expand Up @@ -321,8 +315,10 @@ def lease(self, items):
for item in items:
# Add the ack ID to the leased messages (recording when it was
# leased and its size), and increment the size counter.
if item.ack_id not in self.managed_ack_ids:
self.managed_ack_ids.add(item.ack_id)
if item.ack_id not in self.leased_messages:
self.leased_messages[item.ack_id] = _LeasedMessage(
added_time=time.time(),
size=item.byte_size)
self._bytes += item.byte_size
else:
_LOGGER.debug(
Expand Down Expand Up @@ -361,22 +357,36 @@ def maintain_leases(self):
p99 = self.histogram.percentile(99)
_LOGGER.debug('The current p99 value is %d seconds.', p99)

# Drop any leases that are well beyond max lease time. This
# ensures that in the event of a badly behaving actor, we can
# drop messages and allow Pub/Sub to resend them.
cutoff = time.time() - self.flow_control.max_lease_duration
to_drop = [
DropRequest(ack_id, item.size)
for ack_id, item
in six.iteritems(self.leased_messages)
if item.added_time < cutoff]

if to_drop:
_LOGGER.warning(
'Dropping %s items because they were leased too long.',
len(to_drop))
self.drop(to_drop)

# Create a streaming pull request.
# We do not actually call `modify_ack_deadline` over and over
# because it is more efficient to make a single request.
ack_ids = list(self.managed_ack_ids)
_LOGGER.debug('Renewing lease for %d ack IDs.', len(ack_ids))
ack_ids = list(self.leased_messages.keys())
if ack_ids:
request = types.StreamingPullRequest(
modify_deadline_ack_ids=ack_ids,
modify_deadline_seconds=[p99] * len(ack_ids),
)
_LOGGER.debug('Renewing lease for %d ack IDs.', len(ack_ids))

# NOTE: This may not work as expected if ``consumer.active``

This comment was marked as spam.

This comment was marked as spam.

# has changed since we checked it. An implementation
# without any sort of race condition would require a
# way for ``send_request`` to fail when the consumer
# is inactive.
self._consumer.send_request(request)
self.modify_ack_deadline([
ModAckRequest(ack_id, p99) for ack_id in ack_ids])

# Now wait an appropriate period of time and do this again.
#
Expand Down
4 changes: 3 additions & 1 deletion pubsub/google/cloud/pubsub_v1/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
FlowControl = collections.namedtuple(
'FlowControl',
['max_bytes', 'max_messages', 'resume_threshold', 'max_requests',
'max_request_batch_size', 'max_request_batch_latency'],
'max_request_batch_size', 'max_request_batch_latency',
'max_lease_duration'],

This comment was marked as spam.

This comment was marked as spam.

)
FlowControl.__new__.__defaults__ = (
psutil.virtual_memory().total * 0.2, # max_bytes: 20% of total RAM
Expand All @@ -63,6 +64,7 @@
100, # max_requests: 100
100, # max_request_batch_size: 100
0.01, # max_request_batch_latency: 0.01s
2 * 60 * 60, # max_lease_duration: 2 hours.
)


Expand Down
64 changes: 52 additions & 12 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ def test_get_initial_request():
assert initial_request.stream_ack_deadline_seconds == 10


def test_managed_ack_ids():
def test_leased_messagess():
policy = create_policy()

# Ensure we always get a dict back, even before any message is leased.
managed_ack_ids = policy.managed_ack_ids
assert isinstance(managed_ack_ids, set)
leased_messages = policy.leased_messages
assert isinstance(leased_messages, dict)

# Ensure that multiple calls give the same actual object back.
assert managed_ack_ids is policy.managed_ack_ids
assert leased_messages is policy.leased_messages


def test_subscription():
Expand Down Expand Up @@ -130,25 +130,25 @@ def test_call_rpc():

def test_drop():
policy = create_policy()
policy.managed_ack_ids.add('ack_id_string')
policy.leased_messages['ack_id_string'] = 0
policy._bytes = 20
policy.drop([base.DropRequest(ack_id='ack_id_string', byte_size=20)])
assert len(policy.managed_ack_ids) == 0
assert len(policy.leased_messages) == 0
assert policy._bytes == 0

# Do this again to establish idempotency.
policy.drop([base.DropRequest(ack_id='ack_id_string', byte_size=20)])
assert len(policy.managed_ack_ids) == 0
assert len(policy.leased_messages) == 0
assert policy._bytes == 0


@mock.patch.object(base, '_LOGGER', spec=logging.Logger)
def test_drop_unexpected_negative(_LOGGER):
policy = create_policy()
policy.managed_ack_ids.add('ack_id_string')
policy.leased_messages['ack_id_string'] = 0
policy._bytes = 0
policy.drop([base.DropRequest(ack_id='ack_id_string', byte_size=20)])
assert len(policy.managed_ack_ids) == 0
assert len(policy.leased_messages) == 0
assert policy._bytes == 0
_LOGGER.debug.assert_called_once_with(
'Bytes was unexpectedly negative: %d', -20)
Expand All @@ -161,7 +161,7 @@ def test_drop_below_threshold():
the flow control thresholds, it should resume.
"""
policy = create_policy()
policy.managed_ack_ids.add('ack_id_string')
policy.leased_messages['ack_id_string'] = 0
num_bytes = 20
policy._bytes = num_bytes
consumer = policy._consumer
Expand Down Expand Up @@ -290,15 +290,55 @@ def trigger_inactive(seconds):
sleep.assert_called()


@mock.patch.object(time, 'time', autospec=True)
@mock.patch.object(time, 'sleep', autospec=True)
def test_maintain_leases_outdated_items(sleep, time):
    """Check that leases older than ``max_lease_duration`` are not renewed.

    Two messages are leased at different (mocked) times. When
    ``maintain_leases`` runs after the first lease has outlived
    ``flow_control.max_lease_duration``, only the second ack ID should be
    included in the renewal request and remain in ``leased_messages``.

    Args:
        sleep: Mock replacing ``time.sleep``.
        time: Mock replacing ``time.time``. NOTE: inside this function the
            name ``time`` refers to this mock, not to the ``time`` module.
    """
    policy = create_policy()
    policy._consumer._stopped.clear()

    # Lease one item at the beginning of the timeline.
    time.return_value = 0
    policy.lease([
        base.LeaseRequest(ack_id='ack1', byte_size=50)])

    # Lease another item towards the end of the timeline.
    time.return_value = policy.flow_control.max_lease_duration - 1
    policy.lease([
        base.LeaseRequest(ack_id='ack2', byte_size=50)])

    # Now make sure time reports that we are just past the end of the
    # timeline, so that only the first lease has exceeded the maximum
    # lease duration.
    time.return_value = policy.flow_control.max_lease_duration + 1

    # Mock the sleep call: check the requested delay and flag the consumer
    # as stopped (presumably ending the maintenance loop after one pass).
    def trigger_inactive(seconds):
        assert 0 < seconds < 10
        policy._consumer._stopped.set()

    sleep.side_effect = trigger_inactive

    # Also mock the consumer, which sends the request.
    with mock.patch.object(policy._consumer, 'send_request') as send:
        policy.maintain_leases()

    # Only ack2 should be renewed; ack1 should have been dropped.
    send.assert_called_once_with(types.StreamingPullRequest(
        modify_deadline_ack_ids=['ack2'],
        modify_deadline_seconds=[10],
    ))
    assert len(policy.leased_messages) == 1
    # Verify that the expired lease is the one that was removed, not just
    # that a single lease remains.
    assert 'ack1' not in policy.leased_messages
    assert 'ack2' in policy.leased_messages

    sleep.assert_called()


def test_lease():
policy = create_policy()
policy.lease([base.LeaseRequest(ack_id='ack_id_string', byte_size=20)])
assert len(policy.managed_ack_ids) == 1
assert len(policy.leased_messages) == 1
assert policy._bytes == 20

# Do this again to prove idempotency.
policy.lease([base.LeaseRequest(ack_id='ack_id_string', byte_size=20)])
assert len(policy.managed_ack_ids) == 1
assert len(policy.leased_messages) == 1
assert policy._bytes == 20


Expand Down