diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py index 63e41dbe8ef0..8abaedfe04ac 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py @@ -126,7 +126,7 @@ from six.moves import queue -from google.cloud.pubsub_v1.subscriber import _helper_threads +from google.cloud.pubsub_v1.subscriber._protocol import helper_threads _LOGGER = logging.getLogger(__name__) @@ -212,7 +212,7 @@ def __iter__(self): # A call to consumer.close() signaled us to stop generating # requests. - if item == _helper_threads.STOP: + if item == helper_threads.STOP: _LOGGER.debug('Cleanly exiting request generator.') return @@ -453,7 +453,7 @@ def _stop_no_join(self): self._stopped.set() _LOGGER.debug('Stopping helper thread %s', self._consumer_thread.name) # Signal the request generator RPC to exit cleanly. - self.send_request(_helper_threads.STOP) + self.send_request(helper_threads.STOP) thread = self._consumer_thread self._consumer_thread = None return thread diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py new file mode 100644 index 000000000000..3cd9500864b9 --- /dev/null +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -0,0 +1,100 @@ +# Copyright 2017, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import collections +import logging +import threading + +from google.cloud.pubsub_v1.subscriber._protocol import helper_threads +from google.cloud.pubsub_v1.subscriber._protocol import requests + + +_LOGGER = logging.getLogger(__name__) +_CALLBACK_WORKER_NAME = 'Thread-CallbackRequestDispatcher' + + +class Dispatcher(object): + def __init__(self, queue, subscriber): + self._queue = queue + self._subscriber = subscriber + self._thread = None + + def dispatch_callback(self, items): + """Map the callback request to the appropriate gRPC request. + + Args: + action (str): The method to be invoked. + kwargs (Dict[str, Any]): The keyword arguments for the method + specified by ``action``. + + Raises: + ValueError: If ``action`` isn't one of the expected actions + "ack", "drop", "lease", "modify_ack_deadline" or "nack". + """ + if not self._subscriber.is_active: + return + + batched_commands = collections.defaultdict(list) + + for item in items: + batched_commands[item.__class__].append(item) + + _LOGGER.debug('Handling %d batched requests', len(items)) + + if batched_commands[requests.LeaseRequest]: + self._subscriber.lease(batched_commands.pop(requests.LeaseRequest)) + if batched_commands[requests.ModAckRequest]: + self._subscriber.modify_ack_deadline( + batched_commands.pop(requests.ModAckRequest)) + # Note: Drop and ack *must* be after lease. It's possible to get both + # the lease the and ack/drop request in the same batch. + if batched_commands[requests.AckRequest]: + self._subscriber.ack(batched_commands.pop(requests.AckRequest)) + if batched_commands[requests.NackRequest]: + self._subscriber.nack(batched_commands.pop(requests.NackRequest)) + if batched_commands[requests.DropRequest]: + self._subscriber.drop(batched_commands.pop(requests.DropRequest)) + + def start(self): + """Start a thread to dispatch requests queued up by callbacks. + Spawns a thread to run :meth:`dispatch_callback`. + """ + if self._thread is not None: + raise ValueError('Dispatcher is already running.') + + worker = helper_threads.QueueCallbackWorker( + self._queue, + self.dispatch_callback, + max_items=self._subscriber.flow_control.max_request_batch_size, + max_latency=self._subscriber.flow_control.max_request_batch_latency + ) + # Create and start the helper thread. + thread = threading.Thread( + name=_CALLBACK_WORKER_NAME, + target=worker, + ) + thread.daemon = True + thread.start() + _LOGGER.debug('Started helper thread %s', thread.name) + self._thread = thread + + def stop(self): + if self._thread is not None: + # Signal the worker to stop by queueing a "poison pill" + self._queue.put(helper_threads.STOP) + self._thread.join() + + self._thread = None diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py similarity index 98% rename from pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py rename to pubsub/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py index b191eec90256..ac38101bd96f 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py @@ -65,7 +65,7 @@ def _get_many(queue_, max_items=None, max_latency=0): class QueueCallbackWorker(object): - """A helper that executes a callback for every item in the queue. + """A helper that executes a callback for items sent in a queue. Calls a blocking ``get()`` on the ``queue`` until it encounters :attr:`STOP`. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/requests.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/requests.py new file mode 100644 index 000000000000..6e042e080648 --- /dev/null +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/requests.py @@ -0,0 +1,46 @@ +# Copyright 2017, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for concurrency policy.""" + +from __future__ import absolute_import, division + +import collections + +# Namedtuples for management requests. Used by the Message class to communicate +# items of work back to the policy. +AckRequest = collections.namedtuple( + 'AckRequest', + ['ack_id', 'byte_size', 'time_to_ack'], +) + +DropRequest = collections.namedtuple( + 'DropRequest', + ['ack_id', 'byte_size'], +) + +LeaseRequest = collections.namedtuple( + 'LeaseRequest', + ['ack_id', 'byte_size'], +) + +ModAckRequest = collections.namedtuple( + 'ModAckRequest', + ['ack_id', 'seconds'], +) + +NackRequest = collections.namedtuple( + 'NackRequest', + ['ack_id', 'byte_size'], +) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index 9a16dbe4efc6..78874f32aaf2 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -24,7 +24,7 @@ from six.moves import queue as queue_mod from google.cloud.pubsub_v1 import types -from google.cloud.pubsub_v1.subscriber import _helper_threads +from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber.futures import Future from google.cloud.pubsub_v1.subscriber.policy import base from google.cloud.pubsub_v1.subscriber.message import Message @@ -158,7 +158,7 @@ def close(self): raise ValueError('This policy has not been opened yet.') # Stop consuming messages. - self._request_queue.put(_helper_threads.STOP) + self._request_queue.put(helper_threads.STOP) self._dispatch_thread.join() # Wait until stopped. self._dispatch_thread = None self._consumer.stop_consuming() @@ -186,7 +186,7 @@ def _start_dispatch(self): "dispatch thread" member on the current policy. """ _LOGGER.debug('Starting callback requests worker.') - dispatch_worker = _helper_threads.QueueCallbackWorker( + dispatch_worker = helper_threads.QueueCallbackWorker( self._request_queue, self.dispatch_callback, max_items=self.flow_control.max_request_batch_size, diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py b/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py new file mode 100644 index 000000000000..c3177f71d9e7 --- /dev/null +++ b/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py @@ -0,0 +1,83 @@ +# Copyright 2017, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud.pubsub_v1 import types + + +class Subscriber(object): + """A consumer class based on :class:`threading.Thread`. + + This consumer handles the connection to the Pub/Sub service and all of + the concurrency needs. + + Args: + client (~.pubsub_v1.subscriber.client): The subscriber client used + to create this instance. + subscription (str): The name of the subscription. The canonical + format for this is + ``projects/{project}/subscriptions/{subscription}``. + flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow + control settings. + executor (~concurrent.futures.ThreadPoolExecutor): (Optional.) A + ThreadPoolExecutor instance, or anything duck-type compatible + with it. + queue (~queue.Queue): (Optional.) A Queue instance, appropriate + for crossing the concurrency boundary implemented by + ``executor``. + """ + + def __init__(self, client, subscription, flow_control=types.FlowControl(), + scheduler_cls=None): + raise NotImplementedError + + @property + def is_active(self): + raise NotImplementedError + + @property + def flow_control(self): + raise NotImplementedError + + @property + def future(self): + raise NotImplementedError + + # + # User-facing subscriber management methods. + # + + def open(self, callback): + raise NotImplementedError + + def close(self): + raise NotImplementedError + + # + # Message management methods + # + + def ack(self, items): + raise NotImplementedError + + def drop(self, items): + raise NotImplementedError + + def lease(self, items): + raise NotImplementedError + + def modify_ack_deadline(self, items): + raise NotImplementedError + + def nack(self, items): + raise NotImplementedError diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py index daac8352e5d9..572014dbfc4f 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py @@ -22,8 +22,8 @@ from google.cloud.pubsub_v1 import subscriber from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber import _consumer +from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber.policy import base -from google.cloud.pubsub_v1.subscriber import _helper_threads class Test_RequestQueueGenerator(object): @@ -93,7 +93,7 @@ def test_exit_when_inactive_empty(self): def test_exit_with_stop(self): q = mock.create_autospec(queue.Queue, instance=True) - q.get.side_effect = [_helper_threads.STOP, queue.Empty()] + q.get.side_effect = [helper_threads.STOP, queue.Empty()] rpc = mock.create_autospec(grpc.RpcContext, instance=True) rpc.is_active.return_value = True diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py new file mode 100644 index 000000000000..9367a6b1f0e7 --- /dev/null +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -0,0 +1,98 @@ +# Copyright 2017, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +from google.cloud.pubsub_v1.subscriber._protocol import dispatcher +from google.cloud.pubsub_v1.subscriber._protocol import helper_threads +from google.cloud.pubsub_v1.subscriber._protocol import requests +from google.cloud.pubsub_v1.subscriber import subscriber + +import mock +from six.moves import queue +import pytest + + +@pytest.mark.parametrize('item,method', [ + (requests.AckRequest(0, 0, 0), 'ack'), + (requests.DropRequest(0, 0), 'drop'), + (requests.LeaseRequest(0, 0), 'lease'), + (requests.ModAckRequest(0, 0), 'modify_ack_deadline'), + (requests.NackRequest(0, 0), 'nack') +]) +def test_dispatch_callback(item, method): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) + + items = [item] + dispatcher_.dispatch_callback(items) + + getattr(subscriber_, method).assert_called_once_with([item]) + + +def test_dispatch_callback_inactive(): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + subscriber_.is_active = False + dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) + + dispatcher_.dispatch_callback([requests.AckRequest(0, 0, 0)]) + + subscriber_.ack.assert_not_called() + + +@mock.patch('threading.Thread', autospec=True) +def test_start(thread): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) + + dispatcher_.start() + + thread.assert_called_once_with( + name=dispatcher._CALLBACK_WORKER_NAME, target=mock.ANY) + + thread.return_value.start.assert_called_once() + + assert dispatcher_._thread is not None + + +@mock.patch('threading.Thread', autospec=True) +def test_start_already_started(thread): + subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) + dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) + dispatcher_._thread = mock.sentinel.thread + + with pytest.raises(ValueError): + dispatcher_.start() + + thread.assert_not_called() + + +def test_stop(): + queue_ = queue.Queue() + dispatcher_ = dispatcher.Dispatcher(queue_, mock.sentinel.subscriber) + thread = mock.create_autospec(threading.Thread, instance=True) + dispatcher_._thread = thread + + dispatcher_.stop() + + assert queue_.get() is helper_threads.STOP + thread.join.assert_called_once() + assert dispatcher_._thread is None + + +def test_stop_no_join(): + dispatcher_ = dispatcher.Dispatcher( + mock.sentinel.queue, mock.sentinel.subscriber) + + dispatcher_.stop() diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py index 8bdafeed5b6d..507e8292f7c8 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py @@ -15,20 +15,20 @@ import mock from six.moves import queue -from google.cloud.pubsub_v1.subscriber import _helper_threads +from google.cloud.pubsub_v1.subscriber._protocol import helper_threads def test_queue_callback_worker(): queue_ = queue.Queue() callback = mock.Mock(spec=()) - qct = _helper_threads.QueueCallbackWorker(queue_, callback) + qct = helper_threads.QueueCallbackWorker(queue_, callback) # Set up an appropriate mock for the queue, and call the queue callback # thread. with mock.patch.object(queue.Queue, 'get') as get: get.side_effect = ( mock.sentinel.A, - _helper_threads.STOP, + helper_threads.STOP, queue.Empty()) qct() @@ -40,14 +40,14 @@ def test_queue_callback_worker(): def test_queue_callback_worker_stop_with_extra_items(): queue_ = queue.Queue() callback = mock.Mock(spec=()) - qct = _helper_threads.QueueCallbackWorker(queue_, callback) + qct = helper_threads.QueueCallbackWorker(queue_, callback) # Set up an appropriate mock for the queue, and call the queue callback # thread. with mock.patch.object(queue.Queue, 'get') as get: get.side_effect = ( mock.sentinel.A, - _helper_threads.STOP, + helper_threads.STOP, mock.sentinel.B, queue.Empty()) qct() @@ -60,7 +60,7 @@ def test_queue_callback_worker_stop_with_extra_items(): def test_queue_callback_worker_get_many(): queue_ = queue.Queue() callback = mock.Mock(spec=()) - qct = _helper_threads.QueueCallbackWorker(queue_, callback) + qct = helper_threads.QueueCallbackWorker(queue_, callback) # Set up an appropriate mock for the queue, and call the queue callback # thread. @@ -69,7 +69,7 @@ def test_queue_callback_worker_get_many(): mock.sentinel.A, queue.Empty(), mock.sentinel.B, - _helper_threads.STOP, + helper_threads.STOP, queue.Empty()) qct() @@ -83,7 +83,7 @@ def test_queue_callback_worker_get_many(): def test_queue_callback_worker_max_items(): queue_ = queue.Queue() callback = mock.Mock(spec=()) - qct = _helper_threads.QueueCallbackWorker(queue_, callback, max_items=1) + qct = helper_threads.QueueCallbackWorker(queue_, callback, max_items=1) # Set up an appropriate mock for the queue, and call the queue callback # thread. @@ -91,7 +91,7 @@ def test_queue_callback_worker_max_items(): get.side_effect = ( mock.sentinel.A, mock.sentinel.B, - _helper_threads.STOP, + helper_threads.STOP, queue.Empty()) qct() @@ -105,14 +105,14 @@ def test_queue_callback_worker_max_items(): def test_queue_callback_worker_exception(): queue_ = queue.Queue() callback = mock.Mock(spec=(), side_effect=(Exception,)) - qct = _helper_threads.QueueCallbackWorker(queue_, callback) + qct = helper_threads.QueueCallbackWorker(queue_, callback) # Set up an appropriate mock for the queue, and call the queue callback # thread. with mock.patch.object(queue.Queue, 'get') as get: get.side_effect = ( mock.sentinel.A, - _helper_threads.STOP, + helper_threads.STOP, queue.Empty()) qct()