diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py new file mode 100644 index 000000000000..05157b5e8db1 --- /dev/null +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -0,0 +1,187 @@ +# Copyright 2017, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import collections +import copy +import logging +import random +import threading +import time + +import six + +from google.cloud.pubsub_v1.subscriber._protocol import requests + + +_LOGGER = logging.getLogger(__name__) +_LEASE_WORKER_NAME = 'Thread-LeaseMaintainer' + + +_LeasedMessage = collections.namedtuple( + '_LeasedMessage', + ['added_time', 'size']) + + +class Leaser(object): + def __init__(self, subscriber): + self._thread = None + self._subscriber = subscriber + + self._leased_messages = {} + """dict[str, float]: A mapping of ack IDs to the local time when the + ack ID was initially leased in seconds since the epoch.""" + self._bytes = 0 + """int: The total number of bytes consumed by leased messages.""" + + self._stop_event = threading.Event() + + @property + def message_count(self): + """int: The number of leased messages.""" + return len(self._leased_messages) + + @property + def ack_ids(self): + """Sequence[str]: The ack IDs of all leased messages.""" + return self._leased_messages.keys() + + @property + def bytes(self): + """int: The total size, in bytes, of all leased messages.""" + return self._bytes + + def add(self, items): + """Add messages to be managed by the leaser.""" + for item in items: + # Add the ack ID to the set of managed ack IDs, and increment + # the size counter. + if item.ack_id not in self._leased_messages: + self._leased_messages[item.ack_id] = _LeasedMessage( + added_time=time.time(), + size=item.byte_size) + self._bytes += item.byte_size + else: + _LOGGER.debug( + 'Message %s is already lease managed', item.ack_id) + + def remove(self, items): + """Remove messages from lease management.""" + # Remove the ack ID from lease management, and decrement the + # byte counter. + for item in items: + if self._leased_messages.pop(item.ack_id, None) is not None: + self._bytes -= item.byte_size + else: + _LOGGER.debug('Item %s was not managed.', item.ack_id) + + if self._bytes < 0: + _LOGGER.debug( + 'Bytes was unexpectedly negative: %d', self._bytes) + self._bytes = 0 + + def maintain_leases(self): + """Maintain all of the leases being managed by the subscriber. + + This method modifies the ack deadline for all of the managed + ack IDs, then waits for most of that time (but with jitter), and + repeats. + """ + while self._subscriber.is_active and not self._stop_event.is_set(): + # Determine the appropriate duration for the lease. This is + # based off of how long previous messages have taken to ack, with + # a sensible default and within the ranges allowed by Pub/Sub. + p99 = self._subscriber.ack_histogram.percentile(99) + _LOGGER.debug('The current p99 value is %d seconds.', p99) + + # Make a copy of the leased messages. This is needed because it's + # possible for another thread to modify the dictionary while + # we're iterating over it. + leased_messages = copy.copy(self._leased_messages) + + # Drop any leases that are well beyond max lease time. This + # ensures that in the event of a badly behaving actor, we can + # drop messages and allow Pub/Sub to resend them. + cutoff = ( + time.time() - + self._subscriber.flow_control.max_lease_duration) + to_drop = [ + requests.DropRequest(ack_id, item.size) + for ack_id, item + in six.iteritems(leased_messages) + if item.added_time < cutoff] + + if to_drop: + _LOGGER.warning( + 'Dropping %s items because they were leased too long.', + len(to_drop)) + self._subscriber.drop(to_drop) + + # Remove dropped items from our copy of the leased messages (they + # have already been removed from the real one by + # self._subscriber.drop(), which calls self.remove()). + for item in to_drop: + leased_messages.pop(item.ack_id) + + # Create a streaming pull request. + # We do not actually call `modify_ack_deadline` over and over + # because it is more efficient to make a single request. + ack_ids = leased_messages.keys() + if ack_ids: + _LOGGER.debug('Renewing lease for %d ack IDs.', len(ack_ids)) + + # NOTE: This may not work as expected if ``consumer.active`` + # has changed since we checked it. An implementation + # without any sort of race condition would require a + # way for ``send_request`` to fail when the consumer + # is inactive. + self._subscriber.modify_ack_deadline([ + requests.ModAckRequest(ack_id, p99) for ack_id in ack_ids]) + + # Now wait an appropriate period of time and do this again. + # + # We determine the appropriate period of time based on a random + # period between 0 seconds and 90% of the lease. This use of + # jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases + # where there are many clients. + snooze = random.uniform(0.0, p99 * 0.9) + _LOGGER.debug('Snoozing lease management for %f seconds.', snooze) + time.sleep(snooze) + + _LOGGER.info('%s exiting.', _LEASE_WORKER_NAME) + + def start(self): + if self._thread is not None: + raise ValueError('Leaser is already running.') + + # Create and start the helper thread. + self._stop_event.clear() + thread = threading.Thread( + name=_LEASE_WORKER_NAME, + target=self.maintain_leases) + thread.daemon = True + thread.start() + _LOGGER.debug('Started helper thread %s', thread.name) + self._thread = thread + + def stop(self): + self._stop_event.set() + + if self._thread is not None: + # The thread should automatically exit when the consumer is + # inactive. + self._thread.join() + + self._thread = None diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py b/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py index c3177f71d9e7..f1bc96808e60 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py @@ -49,6 +49,10 @@ def is_active(self): def flow_control(self): raise NotImplementedError + @property + def ack_histogram(self): + raise NotImplementedError + @property def future(self): raise NotImplementedError diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_leaser.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_leaser.py new file mode 100644 index 000000000000..571e56f6b61a --- /dev/null +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -0,0 +1,235 @@ +# Copyright 2017, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.subscriber import _histogram +from google.cloud.pubsub_v1.subscriber import subscriber +from google.cloud.pubsub_v1.subscriber._protocol import leaser +from google.cloud.pubsub_v1.subscriber._protocol import requests + +import mock +import pytest + + +def test_add_and_remove(): + leaser_ = leaser.Leaser(mock.sentinel.subscriber) + + leaser_.add([ + requests.LeaseRequest(ack_id='ack1', byte_size=50)]) + leaser_.add([ + requests.LeaseRequest(ack_id='ack2', byte_size=25)]) + + assert leaser_.message_count == 2 + assert set(leaser_.ack_ids) == set(['ack1', 'ack2']) + assert leaser_.bytes == 75 + + leaser_.remove([ + requests.DropRequest(ack_id='ack1', byte_size=50)]) + + assert leaser_.message_count == 1 + assert set(leaser_.ack_ids) == set(['ack2']) + assert leaser_.bytes == 25 + + +def test_add_already_managed(caplog): + caplog.set_level(logging.DEBUG) + + leaser_ = leaser.Leaser(mock.sentinel.subscriber) + + leaser_.add([ + requests.LeaseRequest(ack_id='ack1', byte_size=50)]) + leaser_.add([ + requests.LeaseRequest(ack_id='ack1', byte_size=50)]) + + assert 'already lease managed' in caplog.text + + +def test_remove_not_managed(caplog): + caplog.set_level(logging.DEBUG) + + leaser_ = leaser.Leaser(mock.sentinel.subscriber) + + leaser_.remove([ + requests.DropRequest(ack_id='ack1', byte_size=50)]) + + assert 'not managed' in caplog.text + + +def test_remove_negative_bytes(caplog): + caplog.set_level(logging.DEBUG) + + leaser_ = leaser.Leaser(mock.sentinel.subscriber) + + leaser_.add([ + requests.LeaseRequest(ack_id='ack1', byte_size=50)]) + leaser_.remove([ + requests.DropRequest(ack_id='ack1', byte_size=75)]) + + assert leaser_.bytes == 0 + assert 'unexpectedly negative' in caplog.text + + +def create_subscriber(flow_control=types.FlowControl()): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + subscriber_.is_active = True + subscriber_.flow_control = flow_control + subscriber_.ack_histogram = _histogram.Histogram() + return subscriber_ + + +def test_maintain_leases_inactive(caplog): + caplog.set_level(logging.INFO) + subscriber_ = create_subscriber() + subscriber_.is_active = False + + leaser_ = leaser.Leaser(subscriber_) + + leaser_.maintain_leases() + + assert 'exiting' in caplog.text + + +def test_maintain_leases_stopped(caplog): + caplog.set_level(logging.INFO) + subscriber_ = create_subscriber() + + leaser_ = leaser.Leaser(subscriber_) + leaser_.stop() + + leaser_.maintain_leases() + + assert 'exiting' in caplog.text + + +def make_sleep_mark_subscriber_as_inactive(sleep, subscriber): + # Make sleep mark the subscriber as inactive so that maintain_leases + # exits at the end of the first run. + def trigger_inactive(seconds): + assert 0 < seconds < 10 + subscriber.is_active = False + sleep.side_effect = trigger_inactive + + +@mock.patch('time.sleep', autospec=True) +def test_maintain_leases_ack_ids(sleep): + subscriber_ = create_subscriber() + make_sleep_mark_subscriber_as_inactive(sleep, subscriber_) + leaser_ = leaser.Leaser(subscriber_) + leaser_.add([requests.LeaseRequest(ack_id='my ack id', byte_size=50)]) + + leaser_.maintain_leases() + + subscriber_.modify_ack_deadline.assert_called_once_with([ + requests.ModAckRequest( + ack_id='my ack id', + seconds=10, + ) + ]) + sleep.assert_called() + + +@mock.patch('time.sleep', autospec=True) +def test_maintain_leases_no_ack_ids(sleep): + subscriber_ = create_subscriber() + make_sleep_mark_subscriber_as_inactive(sleep, subscriber_) + leaser_ = leaser.Leaser(subscriber_) + + leaser_.maintain_leases() + + subscriber_.modify_ack_deadline.assert_not_called() + sleep.assert_called() + + +@mock.patch('time.time', autospec=True) +@mock.patch('time.sleep', autospec=True) +def test_maintain_leases_outdated_items(sleep, time): + subscriber_ = create_subscriber() + make_sleep_mark_subscriber_as_inactive(sleep, subscriber_) + leaser_ = leaser.Leaser(subscriber_) + + # Add these items at the beginning of the timeline + time.return_value = 0 + leaser_.add([ + requests.LeaseRequest(ack_id='ack1', byte_size=50)]) + + # Add another item at towards end of the timeline + time.return_value = subscriber_.flow_control.max_lease_duration - 1 + leaser_.add([ + requests.LeaseRequest(ack_id='ack2', byte_size=50)]) + + # Now make sure time reports that we are at the end of our timeline. + time.return_value = subscriber_.flow_control.max_lease_duration + 1 + + leaser_.maintain_leases() + + # Only ack2 should be renewed. ack1 should've been dropped + subscriber_.modify_ack_deadline.assert_called_once_with([ + requests.ModAckRequest( + ack_id='ack2', + seconds=10, + ) + ]) + subscriber_.drop.assert_called_once_with([ + requests.DropRequest(ack_id='ack1', byte_size=50) + ]) + sleep.assert_called() + + +@mock.patch('threading.Thread', autospec=True) +def test_start(thread): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + leaser_ = leaser.Leaser(subscriber_) + + leaser_.start() + + thread.assert_called_once_with( + name=leaser._LEASE_WORKER_NAME, target=leaser_.maintain_leases) + + thread.return_value.start.assert_called_once() + + assert leaser_._thread is not None + + +@mock.patch('threading.Thread', autospec=True) +def test_start_already_started(thread): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + leaser_ = leaser.Leaser(subscriber_) + leaser_._thread = mock.sentinel.thread + + with pytest.raises(ValueError): + leaser_.start() + + thread.assert_not_called() + + +def test_stop(): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + leaser_ = leaser.Leaser(subscriber_) + thread = mock.create_autospec(threading.Thread, instance=True) + leaser_._thread = thread + + leaser_.stop() + + assert leaser_._stop_event.is_set() + thread.join.assert_called_once() + assert leaser_._thread is None + + +def test_stop_no_join(): + leaser_ = leaser.Leaser(mock.sentinel.subscriber) + + leaser_.stop()