diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py index a40f039152a0..13e0c06cce80 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py @@ -408,6 +408,7 @@ def __init__(self, bidi_rpc, on_response): self._paused = False self._wake = threading.Condition() self._thread = None + self._operational_lock = threading.Lock() def _on_call_done(self, future): # Resume the thread if it's paused, this prevents blocking forever @@ -447,22 +448,26 @@ def _thread_main(self): def start(self): """Start the background thread and begin consuming the thread.""" - thread = threading.Thread( - name=_BIDIRECTIONAL_CONSUMER_NAME, - target=self._thread_main) - thread.daemon = True - thread.start() - self._thread = thread - _LOGGER.debug('Started helper thread %s', thread.name) + with self._operational_lock: + thread = threading.Thread( + name=_BIDIRECTIONAL_CONSUMER_NAME, + target=self._thread_main) + thread.daemon = True + thread.start() + self._thread = thread + _LOGGER.debug('Started helper thread %s', thread.name) def stop(self): """Stop consuming the stream and shutdown the background thread.""" - self._bidi_rpc.close() + with self._operational_lock: + self._bidi_rpc.close() - if self._thread is not None: - self._thread.join() + if self._thread is not None: + # Resume the thread to wake it up in case it is sleeping. 
+ self.resume() + self._thread.join() - self._thread = None + self._thread = None @property def is_active(self): diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 3cd9500864b9..c70f8531a817 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -18,6 +18,7 @@ import logging import threading +from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests @@ -27,10 +28,45 @@ class Dispatcher(object): - def __init__(self, queue, subscriber): + def __init__(self, manager, queue): + self._manager = manager self._queue = queue - self._subscriber = subscriber self._thread = None + self._operational_lock = threading.Lock() + + def start(self): + """Start a thread to dispatch requests queued up by callbacks. + Spawns a thread to run :meth:`dispatch_callback`. + """ + with self._operational_lock: + if self._thread is not None: + raise ValueError('Dispatcher is already running.') + + flow_control = self._manager.flow_control + worker = helper_threads.QueueCallbackWorker( + self._queue, + self.dispatch_callback, + max_items=flow_control.max_request_batch_size, + max_latency=flow_control.max_request_batch_latency + ) + # Create and start the helper thread. + thread = threading.Thread( + name=_CALLBACK_WORKER_NAME, + target=worker, + ) + thread.daemon = True + thread.start() + _LOGGER.debug('Started helper thread %s', thread.name) + self._thread = thread + + def stop(self): + with self._operational_lock: + if self._thread is not None: + # Signal the worker to stop by queueing a "poison pill" + self._queue.put(helper_threads.STOP) + self._thread.join() + + self._thread = None def dispatch_callback(self, items): """Map the callback request to the appropriate gRPC request. 
@@ -44,7 +80,7 @@ def dispatch_callback(self, items): ValueError: If ``action`` isn't one of the expected actions "ack", "drop", "lease", "modify_ack_deadline" or "nack". """ - if not self._subscriber.is_active: + if not self._manager.is_active: return batched_commands = collections.defaultdict(list) @@ -55,46 +91,79 @@ def dispatch_callback(self, items): _LOGGER.debug('Handling %d batched requests', len(items)) if batched_commands[requests.LeaseRequest]: - self._subscriber.lease(batched_commands.pop(requests.LeaseRequest)) + self.lease(batched_commands.pop(requests.LeaseRequest)) if batched_commands[requests.ModAckRequest]: - self._subscriber.modify_ack_deadline( + self.modify_ack_deadline( batched_commands.pop(requests.ModAckRequest)) # Note: Drop and ack *must* be after lease. It's possible to get both # the lease the and ack/drop request in the same batch. if batched_commands[requests.AckRequest]: - self._subscriber.ack(batched_commands.pop(requests.AckRequest)) + self.ack(batched_commands.pop(requests.AckRequest)) if batched_commands[requests.NackRequest]: - self._subscriber.nack(batched_commands.pop(requests.NackRequest)) + self.nack(batched_commands.pop(requests.NackRequest)) if batched_commands[requests.DropRequest]: - self._subscriber.drop(batched_commands.pop(requests.DropRequest)) + self.drop(batched_commands.pop(requests.DropRequest)) - def start(self): - """Start a thread to dispatch requests queued up by callbacks. - Spawns a thread to run :meth:`dispatch_callback`. + def ack(self, items): + """Acknowledge the given messages. + + Args: + items(Sequence[AckRequest]): The items to acknowledge. """ - if self._thread is not None: - raise ValueError('Dispatcher is already running.') - - worker = helper_threads.QueueCallbackWorker( - self._queue, - self.dispatch_callback, - max_items=self._subscriber.flow_control.max_request_batch_size, - max_latency=self._subscriber.flow_control.max_request_batch_latency - ) - # Create and start the helper thread. 
- thread = threading.Thread( - name=_CALLBACK_WORKER_NAME, - target=worker, + # If we got timing information, add it to the histogram. + for item in items: + time_to_ack = item.time_to_ack + if time_to_ack is not None: + self._manager.ack_histogram.add(time_to_ack) + + ack_ids = [item.ack_id for item in items] + request = types.StreamingPullRequest(ack_ids=ack_ids) + self._manager.send(request) + + # Remove the message from lease management. + self.drop(items) + + def drop(self, items): + """Remove the given messages from lease management. + + Args: + items(Sequence[DropRequest]): The items to drop. + """ + self._manager.leaser.remove(items) + self._manager.maybe_resume_consumer() + + def lease(self, items): + """Add the given messages to lease management. + + Args: + items(Sequence[LeaseRequest]): The items to lease. + """ + self._manager.leaser.add(items) + self._manager.maybe_pause_consumer() + + def modify_ack_deadline(self, items): + """Modify the ack deadline for the given messages. + + Args: + items(Sequence[ModAckRequest]): The items to modify. + """ + ack_ids = [item.ack_id for item in items] + seconds = [item.seconds for item in items] + + request = types.StreamingPullRequest( + modify_deadline_ack_ids=ack_ids, + modify_deadline_seconds=seconds, ) - thread.daemon = True - thread.start() - _LOGGER.debug('Started helper thread %s', thread.name) - self._thread = thread + self._manager.send(request) - def stop(self): - if self._thread is not None: - # Signal the worker to stop by queueing a "poison pill" - self._queue.put(helper_threads.STOP) - self._thread.join() + def nack(self, items): + """Explicitly deny receipt of messages. - self._thread = None + Args: + items(Sequence[NackRequest]): The items to deny. 
+ """ + self.modify_ack_deadline([ + requests.ModAckRequest(ack_id=item.ack_id, seconds=0) + for item in items]) + self.drop( + [requests.DropRequest(*item) for item in items]) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 05157b5e8db1..02e78577ff70 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -36,9 +36,10 @@ class Leaser(object): - def __init__(self, subscriber): + def __init__(self, manager): self._thread = None - self._subscriber = subscriber + self._operational_lock = threading.Lock() + self._manager = manager self._leased_messages = {} """dict[str, float]: A mapping of ack IDs to the local time when the @@ -93,17 +94,17 @@ def remove(self, items): self._bytes = 0 def maintain_leases(self): - """Maintain all of the leases being managed by the subscriber. + """Maintain all of the leases being managed. This method modifies the ack deadline for all of the managed ack IDs, then waits for most of that time (but with jitter), and repeats. """ - while self._subscriber.is_active and not self._stop_event.is_set(): + while self._manager.is_active and not self._stop_event.is_set(): # Determine the appropriate duration for the lease. This is # based off of how long previous messages have taken to ack, with # a sensible default and within the ranges allowed by Pub/Sub. - p99 = self._subscriber.ack_histogram.percentile(99) + p99 = self._manager.ack_histogram.percentile(99) _LOGGER.debug('The current p99 value is %d seconds.', p99) # Make a copy of the leased messages. This is needed because it's @@ -116,7 +117,7 @@ def maintain_leases(self): # drop messages and allow Pub/Sub to resend them. 
cutoff = ( time.time() - - self._subscriber.flow_control.max_lease_duration) + self._manager.flow_control.max_lease_duration) to_drop = [ requests.DropRequest(ack_id, item.size) for ack_id, item @@ -127,11 +128,11 @@ def maintain_leases(self): _LOGGER.warning( 'Dropping %s items because they were leased too long.', len(to_drop)) - self._subscriber.drop(to_drop) + self._manager.dispatcher.drop(to_drop) # Remove dropped items from our copy of the leased messages (they # have already been removed from the real one by - # self._subscriber.drop(), which calls self.remove()). + # self._manager.drop(), which calls self.remove()). for item in to_drop: leased_messages.pop(item.ack_id) @@ -147,7 +148,7 @@ def maintain_leases(self): # without any sort of race condition would require a # way for ``send_request`` to fail when the consumer # is inactive. - self._subscriber.modify_ack_deadline([ + self._manager.dispatcher.modify_ack_deadline([ requests.ModAckRequest(ack_id, p99) for ack_id in ack_ids]) # Now wait an appropriate period of time and do this again. @@ -163,25 +164,27 @@ def maintain_leases(self): _LOGGER.info('%s exiting.', _LEASE_WORKER_NAME) def start(self): - if self._thread is not None: - raise ValueError('Leaser is already running.') - - # Create and start the helper thread. - self._stop_event.clear() - thread = threading.Thread( - name=_LEASE_WORKER_NAME, - target=self.maintain_leases) - thread.daemon = True - thread.start() - _LOGGER.debug('Started helper thread %s', thread.name) - self._thread = thread + with self._operational_lock: + if self._thread is not None: + raise ValueError('Leaser is already running.') + + # Create and start the helper thread. 
+ self._stop_event.clear() + thread = threading.Thread( + name=_LEASE_WORKER_NAME, + target=self.maintain_leases) + thread.daemon = True + thread.start() + _LOGGER.debug('Started helper thread %s', thread.name) + self._thread = thread def stop(self): - self._stop_event.set() + with self._operational_lock: + self._stop_event.set() - if self._thread is not None: - # The thread should automatically exit when the consumer is - # inactive. - self._thread.join() + if self._thread is not None: + # The thread should automatically exit when the consumer is + # inactive. + self._thread.join() - self._thread = None + self._thread = None diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py new file mode 100644 index 000000000000..2fb93e7cfda7 --- /dev/null +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -0,0 +1,382 @@ +# Copyright 2017, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import division + +import functools +import logging +import threading + +from google.api_core import exceptions +import grpc + +from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.subscriber._protocol import bidi +from google.cloud.pubsub_v1.subscriber._protocol import dispatcher +from google.cloud.pubsub_v1.subscriber._protocol import histogram +from google.cloud.pubsub_v1.subscriber._protocol import leaser +from google.cloud.pubsub_v1.subscriber._protocol import requests +import google.cloud.pubsub_v1.subscriber.message +import google.cloud.pubsub_v1.subscriber.scheduler + +_LOGGER = logging.getLogger(__name__) +_RETRYABLE_STREAM_ERRORS = ( + exceptions.DeadlineExceeded, + exceptions.ServiceUnavailable, + exceptions.InternalServerError, + exceptions.Unknown, + exceptions.GatewayTimeout +) + + +def _maybe_wrap_exception(exception): + """Wraps a gRPC exception class, if needed.""" + if isinstance(exception, grpc.RpcError): + return exceptions.from_grpc_error(exception) + return exception + + +def _wrap_callback_errors(callback, message): + """Wraps a user callback so that if an exception occurs the message is + nacked. + + Args: + callback (Callable[None, Message]): The user callback. + message (~Message): The Pub/Sub message. + """ + try: + callback(message) + except Exception: + # Note: the likelihood of this failing is extremely low. This just adds + # a message to a queue, so if this doesn't work the world is in an + # unrecoverable state and this thread should just bail. + _LOGGER.exception( + 'Top-level exception occurred in callback while processing a ' + 'message') + message.nack() + + +class StreamingPullManager(object): + """The streaming pull manager coordinates pulling messages from Pub/Sub, + leasing them, and scheduling them to be processed. + + Args: + client (~.pubsub_v1.subscriber.client): The subscriber client used + to create this instance. + subscription (str): The name of the subscription. 
The canonical + format for this is + ``projects/{project}/subscriptions/{subscription}``. + flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow + control settings. + scheduler (~google.cloud.pubsub_v1.scheduler.Scheduler): The scheduler + to use to process messages. If not provided, a thread pool-based + scheduler will be used. + """ + + def __init__(self, client, subscription, flow_control=types.FlowControl(), + scheduler=None): + self._client = client + self._subscription = subscription + self._flow_control = flow_control + self._ack_histogram = histogram.Histogram() + self._last_histogram_size = 0 + self._ack_deadline = 10 + self._rpc = None + self._callback = None + self._closing = threading.Lock() + self._closed = False + self._close_callbacks = [] + + if scheduler is None: + self._scheduler = ( + google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler()) + else: + self._scheduler = scheduler + + # The threads created in ``.open()``. + self._dispatcher = None + self._leaser = None + self._consumer = None + + @property + def is_active(self): + """bool: True if this manager is actively streaming. + + Note that ``False`` does not indicate this is complete shut down, + just that it stopped getting new messages. + """ + return self._consumer is not None and self._consumer.is_active + + @property + def flow_control(self): + """google.cloud.pubsub_v1.types.FlowControl: The active flow control + settings.""" + return self._flow_control + + @property + def dispatcher(self): + """google.cloud.pubsub_v1.subscriber._protocol.dispatcher.Dispatcher: + The dispatcher helper. + """ + return self._dispatcher + + @property + def leaser(self): + """google.cloud.pubsub_v1.subscriber._protocol.leaser.Leaser: + The leaser helper. + """ + return self._leaser + + @property + def ack_histogram(self): + """google.cloud.pubsub_v1.subscriber._protocol.histogram.Histogram: + The histogram tracking time-to-acknowledge. 
+ """ + return self._ack_histogram + + @property + def ack_deadline(self): + """Return the current ack deadline based on historical time-to-ack. + + This method is "sticky". It will only perform the computations to + check on the right ack deadline if the histogram has gained a + significant amount of new information. + + Returns: + int: The ack deadline. + """ + target = min([ + self._last_histogram_size * 2, + self._last_histogram_size + 100, + ]) + if len(self.ack_histogram) > target: + self._ack_deadline = self.ack_histogram.percentile(percent=99) + return self._ack_deadline + + @property + def load(self): + """Return the current load. + + The load is represented as a float, where 1.0 represents having + hit one of the flow control limits, and values between 0.0 and 1.0 + represent how close we are to them. (0.5 means we have exactly half + of what the flow control setting allows, for example.) + + There are (currently) two flow control settings; this property + computes how close the manager is to each of them, and returns + whichever value is higher. (It does not matter that we have lots of + running room on setting A if setting B is over.) + + Returns: + float: The load value. + """ + if self._leaser is None: + return 0 + + return max([ + self._leaser.message_count / self._flow_control.max_messages, + self._leaser.bytes / self._flow_control.max_bytes, + ]) + + def add_close_callback(self, callback): + """Schedules a callable when the manager closes. + + Args: + callback (Callable): The method to call. 
+ """ + self._close_callbacks.append(callback) + + def maybe_pause_consumer(self): + """Check the current load and pause the consumer if needed.""" + if self.load >= 1.0 and not self._consumer.is_paused: + _LOGGER.debug( + 'Message backlog over load at %.2f, pausing.', self.load) + self._consumer.pause() + + def maybe_resume_consumer(self): + """Check the current load and resume the consumer if needed.""" + # If we have been paused by flow control, check and see if we are + # back within our limits. + # + # In order to not thrash too much, require us to have passed below + # the resume threshold (80% by default) of each flow control setting + # before restarting. + if not self._consumer.is_paused: + return + + if self.load < self.flow_control.resume_threshold: + self._consumer.resume() + else: + _LOGGER.debug('Did not resume, current load is %s', self.load) + + def send(self, request): + """Queue a request to be sent to the RPC.""" + self._rpc.send(request) + + def open(self, callback): + """Begin consuming messages. + + Args: + callback (Callable[None, google.cloud.pubsub_v1.message.Messages]): + A callback that will be called for each message received on the + stream. + """ + if self.is_active: + raise ValueError('This manager is already open.') + + if self._closed: + raise ValueError( + 'This manager has been closed and can not be re-used.') + + self._callback = functools.partial(_wrap_callback_errors, callback) + + # Start the thread to pass the requests. + self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue) + self._dispatcher.start() + + # Start consuming messages. + self._rpc = bidi.ResumableBidiRpc( + start_rpc=self._client.api.streaming_pull, + initial_request=self._get_initial_request, + should_recover=self._should_recover) + self._rpc.add_done_callback(self._on_rpc_done) + self._consumer = bidi.BackgroundConsumer( + self._rpc, self._on_response) + self._consumer.start() + + # Start the lease maintainer thread. 
+ self._leaser = leaser.Leaser(self) + self._leaser.start() + + def close(self, reason=None): + """Stop consuming messages and shutdown all helper threads. + + This method is idempotent. Additional calls will have no effect. + + Args: + reason (Any): The reason to close this. If None, this is considered + an "intentional" shutdown. This is passed to the callbacks + specified via :meth:`add_close_callback`. + """ + with self._closing: + if self._closed: + return + + # Stop consuming messages. + if self.is_active: + _LOGGER.debug('Stopping consumer.') + self._consumer.stop() + self._consumer = None + + # Shutdown all helper threads + _LOGGER.debug('Stopping scheduler.') + self._scheduler.shutdown() + self._scheduler = None + _LOGGER.debug('Stopping leaser.') + self._leaser.stop() + self._leaser = None + _LOGGER.debug('Stopping dispatcher.') + self._dispatcher.stop() + self._dispatcher = None + + self._rpc = None + self._closed = True + _LOGGER.debug('Finished stopping manager.') + + for callback in self._close_callbacks: + callback(self, reason) + + def _get_initial_request(self): + """Return the initial request for the RPC. + + This defines the initial request that must always be sent to Pub/Sub + immediately upon opening the subscription. + + Returns: + google.cloud.pubsub_v1.types.StreamingPullRequest: A request + suitable for being the first request on the stream (and not + suitable for any other purpose). + """ + # Any ack IDs that are under lease management need to have their + # deadline extended immediately. + lease_ids = self._leaser.ack_ids + + # Put the request together. + request = types.StreamingPullRequest( + modify_deadline_ack_ids=list(lease_ids), + modify_deadline_seconds=[self.ack_deadline] * len(lease_ids), + stream_ack_deadline_seconds=self.ack_histogram.percentile(99), + subscription=self._subscription, + ) + + # Return the initial request. + return request + + def _on_response(self, response): + """Process all received Pub/Sub messages. 
+ + For each message, send a modified acknowledgment request to the + server. This prevents expiration of the message due to buffering by + gRPC or proxy/firewall. This makes the server and client expiration + timer closer to each other thus preventing the message being + redelivered multiple times. + + After the messages have all had their ack deadline updated, execute + the callback for each message using the executor. + """ + _LOGGER.debug( + 'Scheduling callbacks for %s messages.', + len(response.received_messages)) + + # Immediately modack the messages we received, as this tells the server + # that we've received them. + items = [ + requests.ModAckRequest( + message.ack_id, self._ack_histogram.percentile(99)) + for message in response.received_messages + ] + self._dispatcher.modify_ack_deadline(items) + for received_message in response.received_messages: + message = google.cloud.pubsub_v1.subscriber.message.Message( + received_message.message, + received_message.ack_id, + self._scheduler.queue) + # TODO: Immediately lease instead of using the callback queue. + self._scheduler.schedule(self._callback, message) + + def _should_recover(self, exception): + """Determine if an error on the RPC stream should be recovered. + + If the exception is one of the retryable exceptions, this will signal + to the consumer thread that it should "recover" from the failure. + + This will cause the stream to exit when it returns :data:`False`. + + Returns: + bool: Indicates if the caller should recover or shut down. + Will be :data:`True` if the ``exception`` is "acceptable", i.e. + in a list of retryable / idempotent exceptions. + """ + exception = _maybe_wrap_exception(exception) + # If this is in the list of idempotent exceptions, then we want to + # recover. 
+ if isinstance(exception, _RETRYABLE_STREAM_ERRORS): + return True + return False + + def _on_rpc_done(self, future): + _LOGGER.info( + 'RPC termination has signaled streaming pull manager shutdown.') + future = _maybe_wrap_exception(future) + self.close(reason=future) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/client.py b/pubsub/google/cloud/pubsub_v1/subscriber/client.py index 439a843cb3e9..4e104be8bf4c 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/client.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/client.py @@ -24,6 +24,8 @@ from google.cloud.pubsub_v1 import _gapic from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.gapic import subscriber_client +from google.cloud.pubsub_v1.subscriber import futures +from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager from google.cloud.pubsub_v1.subscriber.policy import thread @@ -78,7 +80,7 @@ def __init__(self, policy_class=thread.Policy, **kwargs): # Add the metrics headers, and instantiate the underlying GAPIC # client. - self.api = subscriber_client.SubscriberClient(**kwargs) + self._api = subscriber_client.SubscriberClient(**kwargs) # The subcription class is responsible to retrieving and dispatching # messages. @@ -93,6 +95,11 @@ def target(self): """ return subscriber_client.SubscriberClient.SERVICE_ADDRESS + @property + def api(self): + """The underlying gapic API client.""" + return self._api + def subscribe(self, subscription, callback=None, flow_control=()): """Return a representation of an individual subscription. @@ -136,3 +143,79 @@ def subscribe(self, subscription, callback=None, flow_control=()): error = '{!r} is not callable, please check input'.format(callback) raise TypeError(error) return subscr + + def subscribe_experimental( + self, subscription, callback, flow_control=(), + scheduler_=None): + """Asynchronously start receiving messages on a given subscription. 
+ + This method starts a background thread to begin pulling messages from + a Pub/Sub subscription and scheduling them to be processed using the + provided ``callback``. + + The ``callback`` will be called with an individual + :class:`google.cloud.pubsub_v1.subscriber.message.Message`. It is the + responsibility of the callback to either call ``ack()`` or ``nack()`` + on the message when it finished processing. If an exception occurs in + the callback during processing, the exception is logged and the message + is ``nack()`` ed. + + The ``flow_control`` argument can be used to control the rate of + message processing. + + This method starts the receiver in the background and returns a + *Future* representing its execution. Waiting on the future (calling + ``result()``) will block forever or until a non-recoverable error + is encountered (such as loss of network connectivity). Cancelling the + future will signal the process to shutdown gracefully and exit. + + Example + + .. code-block:: python + + from google.cloud.pubsub_v1 import subscriber + + subscriber_client = pubsub.SubscriberClient() + + # existing subscription + subscription = subscriber_client.subscription_path( + 'my-project-id', 'my-subscription') + + def callback(message): + print(message) + message.ack() + + future = subscriber.subscribe_experimental( + subscription, callback) + + try: + future.result() + except KeyboardInterrupt: + future.cancel() + + Args: + subscription (str): The name of the subscription. The + subscription should have already been created (for example, + by using :meth:`create_subscription`). + callback (Callable[~.pubsub_v1.subscriber.message.Message]): + The callback function. This function receives the message as + its only argument and will be called from a different thread/ + process depending on the scheduling strategy. + flow_control (~.pubsub_v1.types.FlowControl): The flow control + settings. 
Use this to prevent situations where you are + inundated with too many messages at once. + + Returns: + google.cloud.pubsub_v1.futures.StreamingPullFuture: A Future object + that can be used to manage the background stream. + """ + flow_control = types.FlowControl(*flow_control) + + manager = streaming_pull_manager.StreamingPullManager( + self, subscription, flow_control) + + future = futures.StreamingPullFuture(manager) + + manager.open(callback) + + return future diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py index 7114a32c9600..e6e55439a2c5 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py @@ -62,3 +62,36 @@ def running(self): return False return super(Future, self).running() + + +class StreamingPullFuture(futures.Future): + """Represents a process that asynchronously performs streaming pull and + schedules messages to be processed. + + This future is resolved when the process is stopped (via :meth:`cancel`) or + if it encounters an unrecoverable error. Calling `.result()` will cause + the calling thread to block indefinitely. + """ + + def __init__(self, manager): + super(StreamingPullFuture, self).__init__() + self._manager = manager + self._manager.add_close_callback(self._on_close_callback) + self._cancelled = True + + def _on_close_callback(self, manager, result): + if result is None: + self.set_result(True) + else: + self.set_exception(result) + + def cancel(self): + """Stops pulling messages and shutdowns the background thread consuming + messages. 
+ """ + self._cancelled = True + return self._manager.close() + + def cancelled(self): + """bool: True if the subscription has been cancelled.""" + return self._cancelled diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/message.py b/pubsub/google/cloud/pubsub_v1/subscriber/message.py index 4af03976f27e..d24161e853f4 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/message.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/message.py @@ -18,7 +18,7 @@ import math import time -from google.cloud.pubsub_v1.subscriber.policy import base as base_policy +from google.cloud.pubsub_v1.subscriber._protocol import requests _MESSAGE_REPR = """\ @@ -174,7 +174,7 @@ def ack(self): """ time_to_ack = math.ceil(time.time() - self._received_timestamp) self._request_queue.put( - base_policy.AckRequest( + requests.AckRequest( ack_id=self._ack_id, byte_size=self.size, time_to_ack=time_to_ack @@ -195,7 +195,7 @@ def drop(self): directly. """ self._request_queue.put( - base_policy.DropRequest( + requests.DropRequest( ack_id=self._ack_id, byte_size=self.size ) @@ -209,7 +209,7 @@ def lease(self): need to call it manually. """ self._request_queue.put( - base_policy.LeaseRequest( + requests.LeaseRequest( ack_id=self._ack_id, byte_size=self.size ) @@ -231,7 +231,7 @@ def modify_ack_deadline(self, seconds): values below 10 are advised against. """ self._request_queue.put( - base_policy.ModAckRequest( + requests.ModAckRequest( ack_id=self._ack_id, seconds=seconds ) @@ -243,7 +243,7 @@ def nack(self): This will cause the message to be re-delivered to the subscription. 
""" self._request_queue.put( - base_policy.NackRequest( + requests.NackRequest( ack_id=self._ack_id, byte_size=self.size ) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py index 82122cea83d8..a1dca5208c94 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py @@ -28,6 +28,7 @@ from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber import _consumer +from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber._protocol import histogram @@ -35,30 +36,11 @@ # Namedtuples for management requests. Used by the Message class to communicate # items of work back to the policy. -AckRequest = collections.namedtuple( - 'AckRequest', - ['ack_id', 'byte_size', 'time_to_ack'], -) - -DropRequest = collections.namedtuple( - 'DropRequest', - ['ack_id', 'byte_size'], -) - -LeaseRequest = collections.namedtuple( - 'LeaseRequest', - ['ack_id', 'byte_size'], -) - -ModAckRequest = collections.namedtuple( - 'ModAckRequest', - ['ack_id', 'seconds'], -) - -NackRequest = collections.namedtuple( - 'NackRequest', - ['ack_id', 'byte_size'], -) +AckRequest = requests.AckRequest +DropRequest = requests.DropRequest +LeaseRequest = requests.LeaseRequest +ModAckRequest = requests.ModAckRequest +NackRequest = requests.NackRequest _LeasedMessage = collections.namedtuple( '_LeasedMessage', @@ -474,7 +456,7 @@ def on_response(self, response): For example, if a the Policy implementation takes a callback in its constructor, you can schedule the callback using a - :cls:`concurrent.futures.ThreadPoolExecutor`:: + :class:`concurrent.futures.ThreadPoolExecutor`:: self._pool.submit(self._callback, response) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py b/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py deleted file mode 100644 index f1bc96808e60..000000000000 --- 
a/pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright 2017, Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from google.cloud.pubsub_v1 import types - - -class Subscriber(object): - """A consumer class based on :class:`threading.Thread`. - - This consumer handles the connection to the Pub/Sub service and all of - the concurrency needs. - - Args: - client (~.pubsub_v1.subscriber.client): The subscriber client used - to create this instance. - subscription (str): The name of the subscription. The canonical - format for this is - ``projects/{project}/subscriptions/{subscription}``. - flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow - control settings. - executor (~concurrent.futures.ThreadPoolExecutor): (Optional.) A - ThreadPoolExecutor instance, or anything duck-type compatible - with it. - queue (~queue.Queue): (Optional.) A Queue instance, appropriate - for crossing the concurrency boundary implemented by - ``executor``. - """ - - def __init__(self, client, subscription, flow_control=types.FlowControl(), - scheduler_cls=None): - raise NotImplementedError - - @property - def is_active(self): - raise NotImplementedError - - @property - def flow_control(self): - raise NotImplementedError - - @property - def ack_histogram(self): - raise NotImplementedError - - @property - def future(self): - raise NotImplementedError - - # - # User-facing subscriber management methods. 
- # - - def open(self, callback): - raise NotImplementedError - - def close(self): - raise NotImplementedError - - # - # Message management methods - # - - def ack(self, items): - raise NotImplementedError - - def drop(self, items): - raise NotImplementedError - - def lease(self, items): - raise NotImplementedError - - def modify_ack_deadline(self, items): - raise NotImplementedError - - def nack(self, items): - raise NotImplementedError diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 9f5c77477995..a33a042f07fa 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -186,6 +186,59 @@ def test_subscribe_to_messages_async_callbacks( subscription.close() +def test_subscribe_to_messages_async_callbacks_experimental( + publisher, topic_path, subscriber, subscription_path, cleanup): + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # Create a topic. + publisher.create_topic(topic_path) + + # Subscribe to the topic. This must happen before the messages + # are published. + subscriber.create_subscription(subscription_path, topic_path) + + # Publish some messages. + futures = [ + publisher.publish( + topic_path, + b'Wooooo! The claaaaaw!', + num=str(index), + ) + for index in six.moves.range(2) + ] + + # Make sure the publish completes. + for future in futures: + future.result() + + # We want to make sure that the callback was called asynchronously. So + # track when each call happened and make sure below. + callback = TimesCallback(2) + + # Actually open the subscription and hold it open for a few seconds. + future = subscriber.subscribe_experimental(subscription_path, callback) + for second in six.moves.range(5): + time.sleep(4) + + # The callback should have fired at least two times, but it may + # take some time. 
+ if callback.calls >= 2: + first, last = sorted(callback.call_times[:2]) + diff = last - first + # "Ensure" the first two callbacks were executed asynchronously + # (sequentially would have resulted in a difference of 2+ + # seconds). + assert diff.days == 0 + assert diff.seconds < callback.sleep_time + + # Okay, we took too long; fail out. + assert callback.calls >= 2 + + future.cancel() + + class AckCallback(object): def __init__(self): diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 9367a6b1f0e7..7fdee71e7dcb 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -14,47 +14,139 @@ import threading +from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber._protocol import dispatcher from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests -from google.cloud.pubsub_v1.subscriber import subscriber +from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager import mock from six.moves import queue import pytest -@pytest.mark.parametrize('item,method', [ +@pytest.mark.parametrize('item,method_name', [ (requests.AckRequest(0, 0, 0), 'ack'), (requests.DropRequest(0, 0), 'drop'), (requests.LeaseRequest(0, 0), 'lease'), (requests.ModAckRequest(0, 0), 'modify_ack_deadline'), (requests.NackRequest(0, 0), 'nack') ]) -def test_dispatch_callback(item, method): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) +def test_dispatch_callback(item, method_name): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) items = [item] - dispatcher_.dispatch_callback(items) - getattr(subscriber_, 
method).assert_called_once_with([item]) + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_called_once_with([item]) def test_dispatch_callback_inactive(): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - subscriber_.is_active = False - dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + manager.is_active = False + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) dispatcher_.dispatch_callback([requests.AckRequest(0, 0, 0)]) - subscriber_.ack.assert_not_called() + manager.send.assert_not_called() + + +def test_ack(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [requests.AckRequest( + ack_id='ack_id_string', byte_size=0, time_to_ack=20)] + dispatcher_.ack(items) + + manager.send.assert_called_once_with(types.StreamingPullRequest( + ack_ids=['ack_id_string'], + )) + + manager.leaser.remove.assert_called_once_with(items) + manager.maybe_resume_consumer.assert_called_once() + manager.ack_histogram.add.assert_called_once_with(20) + + +def test_ack_no_time(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [requests.AckRequest( + ack_id='ack_id_string', byte_size=0, time_to_ack=None)] + dispatcher_.ack(items) + + manager.send.assert_called_once_with(types.StreamingPullRequest( + ack_ids=['ack_id_string'], + )) + + manager.ack_histogram.add.assert_not_called() + + +def test_lease(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = 
[requests.LeaseRequest(ack_id='ack_id_string', byte_size=10)] + dispatcher_.lease(items) + + manager.leaser.add.assert_called_once_with(items) + manager.maybe_pause_consumer.assert_called_once() + + +def test_drop(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [requests.DropRequest(ack_id='ack_id_string', byte_size=10)] + dispatcher_.drop(items) + + manager.leaser.remove.assert_called_once_with(items) + manager.maybe_resume_consumer.assert_called_once() + + +def test_nack(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [requests.NackRequest(ack_id='ack_id_string', byte_size=10)] + dispatcher_.nack(items) + + manager.send.assert_called_once_with(types.StreamingPullRequest( + modify_deadline_ack_ids=['ack_id_string'], + modify_deadline_seconds=[0], + )) + + +def test_modify_ack_deadline(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [requests.ModAckRequest(ack_id='ack_id_string', seconds=60)] + dispatcher_.modify_ack_deadline(items) + + manager.send.assert_called_once_with(types.StreamingPullRequest( + modify_deadline_ack_ids=['ack_id_string'], + modify_deadline_seconds=[60], + )) @mock.patch('threading.Thread', autospec=True) def test_start(thread): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) dispatcher_.start() @@ -68,8 +160,9 @@ def test_start(thread): @mock.patch('threading.Thread', autospec=True) def 
test_start_already_started(thread): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - dispatcher_ = dispatcher.Dispatcher(mock.sentinel.queue, subscriber_) + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) dispatcher_._thread = mock.sentinel.thread with pytest.raises(ValueError): @@ -80,7 +173,7 @@ def test_start_already_started(thread): def test_stop(): queue_ = queue.Queue() - dispatcher_ = dispatcher.Dispatcher(queue_, mock.sentinel.subscriber) + dispatcher_ = dispatcher.Dispatcher(mock.sentinel.manager, queue_) thread = mock.create_autospec(threading.Thread, instance=True) dispatcher_._thread = thread @@ -93,6 +186,6 @@ def test_stop(): def test_stop_no_join(): dispatcher_ = dispatcher.Dispatcher( - mock.sentinel.queue, mock.sentinel.subscriber) + mock.sentinel.manager, mock.sentinel.queue) dispatcher_.stop() diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py index 3e6b24501594..3ffcaff647dd 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py @@ -15,11 +15,13 @@ from __future__ import absolute_import import mock +import pytest from google.auth import credentials from google.cloud.pubsub_v1 import subscriber from google.cloud.pubsub_v1.subscriber import futures from google.cloud.pubsub_v1.subscriber.policy import thread +from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager def create_policy(**kwargs): @@ -42,3 +44,45 @@ def test_running(): assert future.running() is True policy._future = None assert future.running() is False + + +class TestStreamingPullFuture(object): + def make_future(self): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + future = 
futures.StreamingPullFuture(manager) + return future + + def test_default_state(self): + future = self.make_future() + + assert future.running() + assert not future.done() + future._manager.add_close_callback.assert_called_once_with( + future._on_close_callback) + + def test__on_close_callback_success(self): + future = self.make_future() + + future._on_close_callback(mock.sentinel.manager, None) + + assert future.result() is True + assert not future.running() + + def test__on_close_callback_failure(self): + future = self.make_future() + + future._on_close_callback(mock.sentinel.manager, ValueError('meep')) + + with pytest.raises(ValueError): + future.result() + + assert not future.running() + + def test_cancel(self): + future = self.make_future() + + future.cancel() + + future._manager.close.assert_called_once() + assert future.cancelled() diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_leaser.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_leaser.py index c2cdde4bafd9..6c16276e8f15 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_leaser.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_leaser.py @@ -16,17 +16,18 @@ import threading from google.cloud.pubsub_v1 import types -from google.cloud.pubsub_v1.subscriber import subscriber +from google.cloud.pubsub_v1.subscriber._protocol import dispatcher from google.cloud.pubsub_v1.subscriber._protocol import histogram from google.cloud.pubsub_v1.subscriber._protocol import leaser from google.cloud.pubsub_v1.subscriber._protocol import requests +from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager import mock import pytest def test_add_and_remove(): - leaser_ = leaser.Leaser(mock.sentinel.subscriber) + leaser_ = leaser.Leaser(mock.sentinel.manager) leaser_.add([ requests.LeaseRequest(ack_id='ack1', byte_size=50)]) @@ -48,7 +49,7 @@ def test_add_and_remove(): def test_add_already_managed(caplog): caplog.set_level(logging.DEBUG) - leaser_ = 
leaser.Leaser(mock.sentinel.subscriber) + leaser_ = leaser.Leaser(mock.sentinel.manager) leaser_.add([ requests.LeaseRequest(ack_id='ack1', byte_size=50)]) @@ -61,7 +62,7 @@ def test_add_already_managed(caplog): def test_remove_not_managed(caplog): caplog.set_level(logging.DEBUG) - leaser_ = leaser.Leaser(mock.sentinel.subscriber) + leaser_ = leaser.Leaser(mock.sentinel.manager) leaser_.remove([ requests.DropRequest(ack_id='ack1', byte_size=50)]) @@ -72,7 +73,7 @@ def test_remove_not_managed(caplog): def test_remove_negative_bytes(caplog): caplog.set_level(logging.DEBUG) - leaser_ = leaser.Leaser(mock.sentinel.subscriber) + leaser_ = leaser.Leaser(mock.sentinel.manager) leaser_.add([ requests.LeaseRequest(ack_id='ack1', byte_size=50)]) @@ -83,20 +84,23 @@ def test_remove_negative_bytes(caplog): assert 'unexpectedly negative' in caplog.text -def create_subscriber(flow_control=types.FlowControl()): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - subscriber_.is_active = True - subscriber_.flow_control = flow_control - subscriber_.ack_histogram = histogram.Histogram() - return subscriber_ +def create_manager(flow_control=types.FlowControl()): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + manager.dispatcher = mock.create_autospec( + dispatcher.Dispatcher, instance=True) + manager.is_active = True + manager.flow_control = flow_control + manager.ack_histogram = histogram.Histogram() + return manager def test_maintain_leases_inactive(caplog): caplog.set_level(logging.INFO) - subscriber_ = create_subscriber() - subscriber_.is_active = False + manager = create_manager() + manager.is_active = False - leaser_ = leaser.Leaser(subscriber_) + leaser_ = leaser.Leaser(manager) leaser_.maintain_leases() @@ -105,9 +109,9 @@ def test_maintain_leases_inactive(caplog): def test_maintain_leases_stopped(caplog): caplog.set_level(logging.INFO) - subscriber_ = create_subscriber() + manager = create_manager() - 
leaser_ = leaser.Leaser(subscriber_) + leaser_ = leaser.Leaser(manager) leaser_.stop() leaser_.maintain_leases() @@ -115,25 +119,25 @@ def test_maintain_leases_stopped(caplog): assert 'exiting' in caplog.text -def make_sleep_mark_subscriber_as_inactive(sleep, subscriber): - # Make sleep mark the subscriber as inactive so that maintain_leases +def make_sleep_mark_manager_as_inactive(sleep, manager): + # Make sleep mark the manager as inactive so that maintain_leases # exits at the end of the first run. def trigger_inactive(seconds): assert 0 < seconds < 10 - subscriber.is_active = False + manager.is_active = False sleep.side_effect = trigger_inactive @mock.patch('time.sleep', autospec=True) def test_maintain_leases_ack_ids(sleep): - subscriber_ = create_subscriber() - make_sleep_mark_subscriber_as_inactive(sleep, subscriber_) - leaser_ = leaser.Leaser(subscriber_) + manager = create_manager() + make_sleep_mark_manager_as_inactive(sleep, manager) + leaser_ = leaser.Leaser(manager) leaser_.add([requests.LeaseRequest(ack_id='my ack id', byte_size=50)]) leaser_.maintain_leases() - subscriber_.modify_ack_deadline.assert_called_once_with([ + manager.dispatcher.modify_ack_deadline.assert_called_once_with([ requests.ModAckRequest( ack_id='my ack id', seconds=10, @@ -144,22 +148,22 @@ def test_maintain_leases_ack_ids(sleep): @mock.patch('time.sleep', autospec=True) def test_maintain_leases_no_ack_ids(sleep): - subscriber_ = create_subscriber() - make_sleep_mark_subscriber_as_inactive(sleep, subscriber_) - leaser_ = leaser.Leaser(subscriber_) + manager = create_manager() + make_sleep_mark_manager_as_inactive(sleep, manager) + leaser_ = leaser.Leaser(manager) leaser_.maintain_leases() - subscriber_.modify_ack_deadline.assert_not_called() + manager.dispatcher.modify_ack_deadline.assert_not_called() sleep.assert_called() @mock.patch('time.time', autospec=True) @mock.patch('time.sleep', autospec=True) def test_maintain_leases_outdated_items(sleep, time): - subscriber_ = 
create_subscriber() - make_sleep_mark_subscriber_as_inactive(sleep, subscriber_) - leaser_ = leaser.Leaser(subscriber_) + manager = create_manager() + make_sleep_mark_manager_as_inactive(sleep, manager) + leaser_ = leaser.Leaser(manager) # Add these items at the beginning of the timeline time.return_value = 0 @@ -167,23 +171,23 @@ def test_maintain_leases_outdated_items(sleep, time): requests.LeaseRequest(ack_id='ack1', byte_size=50)]) # Add another item at towards end of the timeline - time.return_value = subscriber_.flow_control.max_lease_duration - 1 + time.return_value = manager.flow_control.max_lease_duration - 1 leaser_.add([ requests.LeaseRequest(ack_id='ack2', byte_size=50)]) # Now make sure time reports that we are at the end of our timeline. - time.return_value = subscriber_.flow_control.max_lease_duration + 1 + time.return_value = manager.flow_control.max_lease_duration + 1 leaser_.maintain_leases() # Only ack2 should be renewed. ack1 should've been dropped - subscriber_.modify_ack_deadline.assert_called_once_with([ + manager.dispatcher.modify_ack_deadline.assert_called_once_with([ requests.ModAckRequest( ack_id='ack2', seconds=10, ) ]) - subscriber_.drop.assert_called_once_with([ + manager.dispatcher.drop.assert_called_once_with([ requests.DropRequest(ack_id='ack1', byte_size=50) ]) sleep.assert_called() @@ -191,8 +195,9 @@ def test_maintain_leases_outdated_items(sleep, time): @mock.patch('threading.Thread', autospec=True) def test_start(thread): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - leaser_ = leaser.Leaser(subscriber_) + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + leaser_ = leaser.Leaser(manager) leaser_.start() @@ -206,8 +211,9 @@ def test_start(thread): @mock.patch('threading.Thread', autospec=True) def test_start_already_started(thread): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - leaser_ = leaser.Leaser(subscriber_) + manager 
= mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + leaser_ = leaser.Leaser(manager) leaser_._thread = mock.sentinel.thread with pytest.raises(ValueError): @@ -217,8 +223,9 @@ def test_start_already_started(thread): def test_stop(): - subscriber_ = mock.create_autospec(subscriber.Subscriber, instance=True) - leaser_ = leaser.Leaser(subscriber_) + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + leaser_ = leaser.Leaser(manager) thread = mock.create_autospec(threading.Thread, instance=True) leaser_._thread = thread @@ -230,6 +237,6 @@ def test_stop(): def test_stop_no_join(): - leaser_ = leaser.Leaser(mock.sentinel.subscriber) + leaser_ = leaser.Leaser(mock.sentinel.manager) leaser_.stop() diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py index 1587c0c1866a..431d39bb6afc 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py @@ -19,7 +19,7 @@ from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber import message -from google.cloud.pubsub_v1.subscriber.policy import base +from google.cloud.pubsub_v1.subscriber._protocol import requests def create_message(data, ack_id='ACKID', **attrs): @@ -78,56 +78,56 @@ def test_ack(): msg = create_message(b'foo', ack_id='bogus_ack_id') with mock.patch.object(msg._request_queue, 'put') as put: msg.ack() - put.assert_called_once_with(base.AckRequest( + put.assert_called_once_with(requests.AckRequest( ack_id='bogus_ack_id', byte_size=25, time_to_ack=mock.ANY, )) - check_call_types(put, base.AckRequest) + check_call_types(put, requests.AckRequest) def test_drop(): msg = create_message(b'foo', ack_id='bogus_ack_id') with mock.patch.object(msg._request_queue, 'put') as put: msg.drop() - put.assert_called_once_with(base.DropRequest( + put.assert_called_once_with(requests.DropRequest( 
ack_id='bogus_ack_id', byte_size=25, )) - check_call_types(put, base.DropRequest) + check_call_types(put, requests.DropRequest) def test_lease(): msg = create_message(b'foo', ack_id='bogus_ack_id') with mock.patch.object(msg._request_queue, 'put') as put: msg.lease() - put.assert_called_once_with(base.LeaseRequest( + put.assert_called_once_with(requests.LeaseRequest( ack_id='bogus_ack_id', byte_size=25, )) - check_call_types(put, base.LeaseRequest) + check_call_types(put, requests.LeaseRequest) def test_modify_ack_deadline(): msg = create_message(b'foo', ack_id='bogus_ack_id') with mock.patch.object(msg._request_queue, 'put') as put: msg.modify_ack_deadline(60) - put.assert_called_once_with(base.ModAckRequest( + put.assert_called_once_with(requests.ModAckRequest( ack_id='bogus_ack_id', seconds=60, )) - check_call_types(put, base.ModAckRequest) + check_call_types(put, requests.ModAckRequest) def test_nack(): msg = create_message(b'foo', ack_id='bogus_ack_id') with mock.patch.object(msg._request_queue, 'put') as put: msg.nack() - put.assert_called_once_with(base.NackRequest( + put.assert_called_once_with(requests.NackRequest( ack_id='bogus_ack_id', byte_size=25, )) - check_call_types(put, base.NackRequest) + check_call_types(put, requests.NackRequest) def test_repr(): diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py new file mode 100644 index 000000000000..a6527dc4eb3b --- /dev/null +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -0,0 +1,396 @@ +# Copyright 2018, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import pytest + +from google.api_core import exceptions +from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.gapic import subscriber_client_config +from google.cloud.pubsub_v1.subscriber import client +from google.cloud.pubsub_v1.subscriber import message +from google.cloud.pubsub_v1.subscriber import scheduler +from google.cloud.pubsub_v1.subscriber._protocol import bidi +from google.cloud.pubsub_v1.subscriber._protocol import dispatcher +from google.cloud.pubsub_v1.subscriber._protocol import leaser +from google.cloud.pubsub_v1.subscriber._protocol import requests +from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager +import grpc + + +@pytest.mark.parametrize('exception,expected_cls', [ + (ValueError('meep'), ValueError), + (mock.create_autospec(grpc.RpcError, instance=True), + exceptions.GoogleAPICallError), +]) +def test__maybe_wrap_exception(exception, expected_cls): + assert isinstance( + streaming_pull_manager._maybe_wrap_exception(exception), expected_cls) + + +def test__wrap_callback_errors_no_error(): + msg = mock.create_autospec(message.Message, instance=True) + callback = mock.Mock() + + streaming_pull_manager._wrap_callback_errors(callback, msg) + + callback.assert_called_once_with(msg) + msg.nack.assert_not_called() + + +def test__wrap_callback_errors_error(): + msg = mock.create_autospec(message.Message, instance=True) + callback = mock.Mock(side_effect=ValueError('meep')) + + streaming_pull_manager._wrap_callback_errors(callback, msg) + + msg.nack.assert_called_once() + + +def 
test_constructor_and_default_state(): + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription) + + # Public state + assert manager.is_active is False + assert manager.flow_control == types.FlowControl() + assert manager.dispatcher is None + assert manager.leaser is None + assert manager.ack_histogram is not None + assert manager.ack_deadline == 10 + assert manager.load == 0 + + # Private state + assert manager._client == mock.sentinel.client + assert manager._subscription == mock.sentinel.subscription + assert manager._scheduler is not None + + +def test_constructor_with_options(): + manager = streaming_pull_manager.StreamingPullManager( + mock.sentinel.client, + mock.sentinel.subscription, + flow_control=mock.sentinel.flow_control, + scheduler=mock.sentinel.scheduler) + + assert manager.flow_control == mock.sentinel.flow_control + assert manager._scheduler == mock.sentinel.scheduler + + +def make_manager(**kwargs): + client_ = mock.create_autospec(client.Client, instance=True) + scheduler_ = mock.create_autospec(scheduler.Scheduler, instance=True) + return streaming_pull_manager.StreamingPullManager( + client_, + 'subscription-name', + scheduler=scheduler_, + **kwargs) + + +def test_ack_deadline(): + manager = make_manager() + assert manager.ack_deadline == 10 + manager.ack_histogram.add(20) + assert manager.ack_deadline == 20 + manager.ack_histogram.add(10) + assert manager.ack_deadline == 20 + + +def test_lease_load_and_pause(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000)) + manager._leaser = leaser.Leaser(manager) + manager._consumer = mock.create_autospec( + bidi.BackgroundConsumer, instance=True) + manager._consumer.is_paused = False + + # This should mean that our messages count is at 10%, and our bytes + # are at 15%; load should return the higher (0.15), and shouldn't cause + # the consumer to pause. 
+ manager.leaser.add([requests.LeaseRequest(ack_id='one', byte_size=150)]) + assert manager.load == 0.15 + manager.maybe_pause_consumer() + manager._consumer.pause.assert_not_called() + + # After this message is added, the messages should be higher at 20% + # (versus 16% for bytes). + manager.leaser.add([requests.LeaseRequest(ack_id='two', byte_size=10)]) + assert manager.load == 0.2 + + # Returning a number above 100% is fine, and it should cause this to pause. + manager.leaser.add([requests.LeaseRequest(ack_id='three', byte_size=1000)]) + assert manager.load == 1.16 + manager.maybe_pause_consumer() + manager._consumer.pause.assert_called_once() + + +def test_drop_and_resume(): + manager = make_manager( + flow_control=types.FlowControl(max_messages=10, max_bytes=1000)) + manager._leaser = leaser.Leaser(manager) + manager._consumer = mock.create_autospec( + bidi.BackgroundConsumer, instance=True) + manager._consumer.is_paused = True + + # Add several messages until we're over the load threshold. + manager.leaser.add([ + requests.LeaseRequest(ack_id='one', byte_size=750), + requests.LeaseRequest(ack_id='two', byte_size=250)]) + + assert manager.load == 1.0 + + # Trying to resume now should have no effect as we're over the threshold. + manager.maybe_resume_consumer() + manager._consumer.resume.assert_not_called() + + # Drop the 200 byte message, which should put us under the resume + # threshold. + manager.leaser.remove([ + requests.DropRequest(ack_id='two', byte_size=250)]) + manager.maybe_resume_consumer() + manager._consumer.resume.assert_called_once() + + +def test_resume_not_paused(): + manager = make_manager() + manager._consumer = mock.create_autospec( + bidi.BackgroundConsumer, instance=True) + manager._consumer.is_paused = False + + # Resuming should have no effect is the consumer is not actually paused. 
+    manager.maybe_resume_consumer()
+    manager._consumer.resume.assert_not_called()
+
+
+def test_send():
+    manager = make_manager()
+    manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
+
+    manager.send(mock.sentinel.request)  # should be forwarded to the RPC as-is
+
+    manager._rpc.send.assert_called_once_with(mock.sentinel.request)
+
+
+@mock.patch(
+    'google.cloud.pubsub_v1.subscriber._protocol.bidi.ResumableBidiRpc',
+    autospec=True)
+@mock.patch(
+    'google.cloud.pubsub_v1.subscriber._protocol.bidi.BackgroundConsumer',
+    autospec=True)
+@mock.patch(
+    'google.cloud.pubsub_v1.subscriber._protocol.leaser.Leaser',
+    autospec=True)
+@mock.patch(
+    'google.cloud.pubsub_v1.subscriber._protocol.dispatcher.Dispatcher',
+    autospec=True)  # innermost patch is passed as the first test argument
+def test_open(dispatcher, leaser, background_consumer, resumable_bidi_rpc):
+    manager = make_manager()
+
+    manager.open(mock.sentinel.callback)  # every helper class is patched above
+
+    dispatcher.assert_called_once_with(manager, manager._scheduler.queue)
+    dispatcher.return_value.start.assert_called_once()  # built AND started
+    assert manager._dispatcher == dispatcher.return_value
+
+    leaser.assert_called_once_with(manager)
+    leaser.return_value.start.assert_called_once()
+    assert manager.leaser == leaser.return_value  # NOTE(review): `leaser` here, `_leaser` in other tests - presumably a property; confirm
+
+    background_consumer.assert_called_once_with(
+        manager._rpc, manager._on_response)
+    background_consumer.return_value.start.assert_called_once()
+    assert manager._consumer == background_consumer.return_value
+
+    resumable_bidi_rpc.assert_called_once_with(
+        start_rpc=manager._client.api.streaming_pull,
+        initial_request=manager._get_initial_request,  # the bound method itself, not its result
+        should_recover=manager._should_recover)
+    resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
+        manager._on_rpc_done)
+    assert manager._rpc == resumable_bidi_rpc.return_value
+
+    manager._consumer.is_active = True  # consumer is a mock: set the flag by hand
+    assert manager.is_active is True
+
+
+def test_open_already_active():
+    manager = make_manager()
+    manager._consumer = mock.create_autospec(
+        bidi.BackgroundConsumer, instance=True)
+    manager._consumer.is_active = True  # simulate a manager that is already open
+
+    with pytest.raises(ValueError, match='already open'):
+        manager.open(mock.sentinel.callback)
+
+
+def test_open_has_been_closed():
+    manager = make_manager()
+    manager._closed = True  # simulate a manager that was closed earlier
+
+    with pytest.raises(ValueError, match='closed'):
+        manager.open(mock.sentinel.callback)
+
+
+def make_running_manager():  # helper, not a test: manager wired to mocked helpers
+    manager = make_manager()
+    manager._consumer = mock.create_autospec(
+        bidi.BackgroundConsumer, instance=True)
+    manager._consumer.is_active = True  # marks the manager as running
+    manager._dispatcher = mock.create_autospec(
+        dispatcher.Dispatcher, instance=True)
+    manager._leaser = mock.create_autospec(
+        leaser.Leaser, instance=True)
+
+    return (
+        manager, manager._consumer, manager._dispatcher, manager._leaser,
+        manager._scheduler)  # scheduler is not set here; it comes from make_manager()
+
+
+def test_close():
+    manager, consumer, dispatcher, leaser, scheduler = make_running_manager()
+
+    manager.close()
+
+    consumer.stop.assert_called_once()  # each helper is shut down exactly once
+    leaser.stop.assert_called_once()
+    dispatcher.stop.assert_called_once()
+    scheduler.shutdown.assert_called_once()
+
+    assert manager.is_active is False
+
+
+def test_close_inactive_consumer():
+    manager, consumer, dispatcher, leaser, scheduler = make_running_manager()
+    consumer.is_active = False  # consumer has already stopped on its own
+
+    manager.close()
+
+    consumer.stop.assert_not_called()  # an inactive consumer must not be stopped again
+    leaser.stop.assert_called_once()
+    dispatcher.stop.assert_called_once()
+    scheduler.shutdown.assert_called_once()
+
+
+def test_close_idempotent():
+    manager, _, _, _, scheduler = make_running_manager()
+
+    manager.close()
+    manager.close()  # second call must not shut anything down again
+
+    assert scheduler.shutdown.call_count == 1
+
+
+def test_close_callbacks():
+    manager, _, _, _, _ = make_running_manager()
+
+    callback = mock.Mock()
+
+    manager.add_close_callback(callback)
+    manager.close(reason='meep')
+
+    callback.assert_called_once_with(manager, 'meep')  # invoked as (manager, reason)
+
+
+def test__get_initial_request():
+    manager = make_manager()
+    manager._leaser = mock.create_autospec(
+        leaser.Leaser, instance=True)
+    manager._leaser.ack_ids = ['1', '2']  # ack IDs the mocked leaser reports
+
+    initial_request = manager._get_initial_request()
+    assert isinstance(initial_request, types.StreamingPullRequest)
+    assert initial_request.subscription == 'subscription-name'  # presumably configured by make_manager(); confirm
+    assert initial_request.stream_ack_deadline_seconds == 10
+    assert initial_request.modify_deadline_ack_ids == ['1', '2']  # same IDs as the leaser's
+    assert initial_request.modify_deadline_seconds == [10, 10]  # one deadline per ack ID
+
+
+def test_on_response():
+    manager, _, dispatcher, _, scheduler = make_running_manager()
+    manager._callback = mock.sentinel.callback
+
+    # Set up the messages.
+    response = types.StreamingPullResponse(
+        received_messages=[
+            types.ReceivedMessage(
+                ack_id='fack',
+                message=types.PubsubMessage(data=b'foo', message_id='1')
+            ),
+            types.ReceivedMessage(
+                ack_id='back',
+                message=types.PubsubMessage(data=b'bar', message_id='2')
+            ),
+        ],
+    )
+
+    # Actually run the method and prove that modack and schedule
+    # are called in the expected way.
+    manager._on_response(response)
+
+    dispatcher.modify_ack_deadline.assert_called_once_with(  # one batched call for both messages
+        [requests.ModAckRequest('fack', 10),
+         requests.ModAckRequest('back', 10)]
+    )
+
+    schedule_calls = scheduler.schedule.mock_calls
+    assert len(schedule_calls) == 2  # one scheduled callback per received message
+    for call in schedule_calls:
+        assert call[1][0] == mock.sentinel.callback  # call[1] is the positional-args tuple
+        assert isinstance(call[1][1], message.Message)
+
+
+def test_retryable_stream_errors():
+    # Make sure the config matches our hard-coded tuple of exceptions.
+    interfaces = subscriber_client_config.config['interfaces']
+    retry_codes = interfaces['google.pubsub.v1.Subscriber']['retry_codes']
+    idempotent = retry_codes['idempotent']
+
+    status_codes = tuple(
+        getattr(grpc.StatusCode, name, None)  # None if the name is not a StatusCode member
+        for name in idempotent
+    )
+    expected = tuple(
+        exceptions.exception_class_for_grpc_status(status_code)
+        for status_code in status_codes
+    )
+    assert set(expected).issubset(  # the hard-coded tuple may list more than the config
+        set(streaming_pull_manager._RETRYABLE_STREAM_ERRORS))
+
+
+def test__should_recover_true():
+    manager = make_manager()
+
+    details = 'UNAVAILABLE. Service taking nap.'
+
+    exc = exceptions.ServiceUnavailable(details)
+
+    assert manager._should_recover(exc) is True
+
+
+def test__should_recover_false():
+    manager = make_manager()
+
+    exc = TypeError('wahhhhhh')  # builtin error: must not be treated as recoverable
+
+    assert manager._should_recover(exc) is False
+
+
+def test__on_rpc_done():
+    manager = make_manager()
+
+    with mock.patch.object(manager, 'close') as close:  # stub close() to observe the call
+        manager._on_rpc_done(mock.sentinel.error)
+
+    close.assert_called_once_with(reason=mock.sentinel.error)
diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
index 3d4169f7ab28..c5d29ecfafdc 100644
--- a/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
+++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
@@ -17,6 +17,7 @@
 import pytest
 
 from google.cloud.pubsub_v1 import subscriber
+from google.cloud.pubsub_v1.subscriber import futures
 from google.cloud.pubsub_v1.subscriber.policy import thread
 
 
@@ -64,3 +65,17 @@ def test_subscribe_with_failed_callback():
     with pytest.raises(TypeError) as exc_info:
         client.subscribe('sub_name_b', callback)
     assert callback in str(exc_info.value)
+
+
+@mock.patch(
+    'google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager.'
+    'StreamingPullManager.open', autospec=True)  # autospec on a method records `self` as the first argument
+def test_subscribe_experimental(manager_open):
+    creds = mock.Mock(spec=credentials.Credentials)  # stand-in credentials object
+    client = subscriber.Client(credentials=creds)
+
+    future = client.subscribe_experimental(
+        'sub_name_a', callback=mock.sentinel.callback)
+    assert isinstance(future, futures.StreamingPullFuture)
+
+    manager_open.assert_called_once_with(mock.ANY, mock.sentinel.callback)