diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py index 1cf757c52bd0..63e41dbe8ef0 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py @@ -228,6 +228,36 @@ def __iter__(self): yield item +def _pausable_response_iterator(iterator, can_continue, period=1): + """Converts a gRPC response iterator into one that can be paused. + + The ``can_continue`` event can be used by an independent, concurrent + worker to pause and resume the iteration over ``iterator``. + + Args: + iterator (grpc.RpcContext, Iterator[protobuf.Message]): A + ``grpc.RpcContext`` instance that is also an iterator of responses. + This is a typically returned from grpc's streaming response call + types. + can_continue (threading.Event): An event which determines if we + can advance to the next iteration. Will be ``wait()``-ed on + before consuming more items from the iterator. + period (float): The number of seconds to wait to be able to consume + before checking if the RPC is cancelled. In practice, this + determines the maximum amount of time that ``next()`` on this + iterator will block after the RPC is cancelled. + + Yields: + Any: The items yielded from ``iterator``. + """ + while True: + can_yield = can_continue.wait(timeout=period) + # Calling next() on a cancelled RPC will cause it to raise the + # grpc.RpcError associated with the cancellation. + if can_yield or not iterator.is_active(): + yield next(iterator) + + class Consumer(object): """Bi-directional streaming RPC consumer. @@ -328,7 +358,7 @@ def _blocking_consume(self, policy): self._request_queue, initial_request=initial_request) rpc = policy.call_rpc(iter(request_generator)) request_generator.rpc = rpc - responses = _pausable_iterator(rpc, self._can_consume) + responses = _pausable_response_iterator(rpc, self._can_consume) try: for response in responses: _LOGGER.debug('Received response on stream') @@ -439,23 +469,3 @@ def stop_consuming(self): """ thread = self._stop_no_join() thread.join() - - -def _pausable_iterator(iterator, can_continue): - """Converts a standard iterator into one that can be paused. - - The ``can_continue`` event can be used by an independent, concurrent - worker to pause and resume the iteration over ``iterator``. - - Args: - iterator (Iterator): Any iterator to be iterated over. - can_continue (threading.Event): An event which determines if we - can advance to the next iteration. Will be ``wait()``-ed on - before - - Yields: - Any: The items from ``iterator``. - """ - while True: - can_continue.wait() - yield next(iterator) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py index f54de7484d22..31cd5ec66d04 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py @@ -105,6 +105,38 @@ def test_exit_with_stop(self): assert items == [] +class _ResponseIterator(object): + def __init__(self, items, active=True): + self._items = iter(items) + self._active = active + + def is_active(self): + return self._active + + def __next__(self): + return next(self._items) + + next = __next__ + + +def test__pausable_response_iterator_active_but_cant_consume(): + # Note: we can't autospec threading.Event because it's goofy on Python 2. + can_consume = mock.Mock(spec=['wait']) + # First call will return false, indicating the loop should try again. + # second call will allow it to consume the first (and only) item. + can_consume.wait.side_effect = [False, True] + iterator = _ResponseIterator([1]) + + print(can_consume) + + pausable_iter = _consumer._pausable_response_iterator( + iterator, can_consume) + + items = list(pausable_iter) + + assert items == [1] + + def test_send_request(): consumer = _consumer.Consumer() request = types.StreamingPullRequest(subscription='foo') @@ -176,9 +208,10 @@ class RaisingResponseGenerator(object): # rather than the **class** will not be iterable in Python 2. # This is problematic since a `Mock` just sets members. - def __init__(self, exception): + def __init__(self, exception, active=True): self.exception = exception self.next_calls = 0 + self._active = active def __next__(self): self.next_calls += 1 @@ -187,6 +220,32 @@ def __next__(self): def next(self): return self.__next__() # Python 2 + def is_active(self): + return self._active + + +def test_blocking_consume_iter_exception_while_paused(): + policy = mock.create_autospec(base.BasePolicy, instance=True) + exc = TypeError('Bad things!') + policy.call_rpc.return_value = RaisingResponseGenerator( + exc, active=False) + + consumer = _consumer.Consumer() + # Ensure the consume is paused. + consumer.pause() + consumer._consumer_thread = mock.Mock(spec=threading.Thread) + policy.on_exception.side_effect = OnException() + + # Start the thread. It should not block forever but should notice the rpc + # is inactive and raise the exception from the stream and then exit + # because on_exception returns false. + consumer._blocking_consume(policy) + assert consumer._consumer_thread is None + + # Check mocks. + policy.call_rpc.assert_called_once() + policy.on_exception.assert_called_once_with(exc) + def test_blocking_consume_two_exceptions(): policy = mock.create_autospec(base.BasePolicy, instance=True)