Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,36 @@ def __iter__(self):
yield item


def _pausable_response_iterator(iterator, can_continue, period=1):
"""Converts a gRPC response iterator into one that can be paused.

The ``can_continue`` event can be used by an independent, concurrent
worker to pause and resume the iteration over ``iterator``.

Args:
iterator (grpc.RpcContext, Iterator[protobuf.Message]): A
``grpc.RpcContext`` instance that is also an iterator of responses.
This is a typically returned from grpc's streaming response call
types.
can_continue (threading.Event): An event which determines if we
can advance to the next iteration. Will be ``wait()``-ed on
before consuming more items from the iterator.
period (float): The number of seconds to wait to be able to consume
before checking if the RPC is cancelled. In practice, this
determines the maximum amount of time that ``next()`` on this
iterator will block after the RPC is cancelled.

Yields:
Any: The items yielded from ``iterator``.
"""
while True:
can_yield = can_continue.wait(timeout=period)
# Calling next() on a cancelled RPC will cause it to raise the
# grpc.RpcError associated with the cancellation.
if can_yield or not iterator.is_active():
yield next(iterator)


class Consumer(object):
"""Bi-directional streaming RPC consumer.

Expand Down Expand Up @@ -328,7 +358,7 @@ def _blocking_consume(self, policy):
self._request_queue, initial_request=initial_request)
rpc = policy.call_rpc(iter(request_generator))
request_generator.rpc = rpc
responses = _pausable_iterator(rpc, self._can_consume)
responses = _pausable_response_iterator(rpc, self._can_consume)
try:
for response in responses:
_LOGGER.debug('Received response on stream')
Expand Down Expand Up @@ -439,23 +469,3 @@ def stop_consuming(self):
"""
thread = self._stop_no_join()
thread.join()


def _pausable_iterator(iterator, can_continue):
"""Converts a standard iterator into one that can be paused.

The ``can_continue`` event can be used by an independent, concurrent
worker to pause and resume the iteration over ``iterator``.

Args:
iterator (Iterator): Any iterator to be iterated over.
can_continue (threading.Event): An event which determines if we
can advance to the next iteration. Will be ``wait()``-ed on
before

Yields:
Any: The items from ``iterator``.
"""
while True:
can_continue.wait()
yield next(iterator)
61 changes: 60 additions & 1 deletion pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,38 @@ def test_exit_with_stop(self):
assert items == []


class _ResponseIterator(object):
def __init__(self, items, active=True):
self._items = iter(items)
self._active = active

def is_active(self):
return self._active

def __next__(self):
return next(self._items)

next = __next__


def test__pausable_response_iterator_active_but_cant_consume():
# Note: we can't autospec threading.Event because it's goofy on Python 2.
can_consume = mock.Mock(spec=['wait'])
# First call will return false, indicating the loop should try again.
# second call will allow it to consume the first (and only) item.
can_consume.wait.side_effect = [False, True]
iterator = _ResponseIterator([1])

print(can_consume)

pausable_iter = _consumer._pausable_response_iterator(
iterator, can_consume)

items = list(pausable_iter)

assert items == [1]


def test_send_request():
consumer = _consumer.Consumer()
request = types.StreamingPullRequest(subscription='foo')
Expand Down Expand Up @@ -176,9 +208,10 @@ class RaisingResponseGenerator(object):
# rather than the **class** will not be iterable in Python 2.
# This is problematic since a `Mock` just sets members.

def __init__(self, exception):
def __init__(self, exception, active=True):
self.exception = exception
self.next_calls = 0
self._active = active

def __next__(self):
self.next_calls += 1
Expand All @@ -187,6 +220,32 @@ def __next__(self):
def next(self):
return self.__next__() # Python 2

def is_active(self):
return self._active


def test_blocking_consume_iter_exception_while_paused():
policy = mock.create_autospec(base.BasePolicy, instance=True)
exc = TypeError('Bad things!')
policy.call_rpc.return_value = RaisingResponseGenerator(
exc, active=False)

consumer = _consumer.Consumer()
# Ensure the consume is paused.
consumer.pause()
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
policy.on_exception.side_effect = OnException()

# Start the thread. It should not block forever but should notice the rpc
# is inactive and raise the exception from the stream and then exit
# because on_exception returns false.
consumer._blocking_consume(policy)
assert consumer._consumer_thread is None

# Check mocks.
policy.call_rpc.assert_called_once()
policy.on_exception.assert_called_once_with(exc)


def test_blocking_consume_two_exceptions():
policy = mock.create_autospec(base.BasePolicy, instance=True)
Expand Down