From c5a8f552495bf66d7ad82adb32dda4df7f5dae0e Mon Sep 17 00:00:00 2001 From: Thea Flowers Date: Wed, 30 May 2018 10:03:23 -0700 Subject: [PATCH] Fix retrying of bidirectional RPCs and closing the streaming pull manager --- .../pubsub_v1/subscriber/_protocol/bidi.py | 46 ++++++++++++------- .../_protocol/streaming_pull_manager.py | 20 +++++++- .../subscriber/test_streaming_pull_manager.py | 10 ++-- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py index 6e361a1e1ff8..80824c55022b 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py @@ -342,36 +342,48 @@ def _reopen(self): # has exited. self.request_generator = None - self.open() + # Note: we do not currently do any sort of backoff here. The + # assumption is that re-establishing the stream under normal + # circumstances will happen in intervals greater than 60s. + # However, it is possible in a degenerative case that the server + # closes the stream rapidly which would lead to thrashing here, + # but hopefully in those cases the server would return a non- + # retryable error. + + try: + self.open() + # If re-opening or re-calling the method fails for any reason, + # consider it a terminal error and finalize the stream. + except Exception as exc: + self._finalize(exc) + raise + + _LOGGER.info('Re-established stream') def _recoverable(self, method, *args, **kwargs): """Wraps a method to recover the stream and retry on error. - If a recoverable error occurs, this will retry the RPC and retry the - method. If a second error occurs while retrying the method, it will - bubble up. + If a retryable error occurs while making the call, then the stream will + be re-opened and the method will be retried. This happens indefinitely + so long as the error is a retryable one. If an error occurs while + re-opening the stream, then this method will raise immediately and + trigger finalization of this object. Args: method (Callable[..., Any]): The method to call. args: The args to pass to the method. kwargs: The kwargs to pass to the method. """ - try: - return method(*args, **kwargs) + while True: + try: + return method(*args, **kwargs) - except Exception as exc: - if not self._should_recover(exc): - self.close() - raise exc + except Exception as exc: + if not self._should_recover(exc): + self.close() + raise exc - try: self._reopen() - return method(*args, **kwargs) - # If re-opening or re-calling the method fails for any reason, consider - # it a terminal error and finalize the object. - except Exception as exc: - self._finalize(exc) - raise def send(self, request): return self._recoverable( diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index aa0876798ad9..57661729a83f 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -33,6 +33,7 @@ import google.cloud.pubsub_v1.subscriber.scheduler _LOGGER = logging.getLogger(__name__) +_RPC_ERROR_THREAD_NAME = 'Thread-OnRpcTerminated' _RETRYABLE_STREAM_ERRORS = ( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, @@ -414,11 +415,28 @@ def _should_recover(self, exception): # If this is in the list of idempotent exceptions, then we want to # recover. if isinstance(exception, _RETRYABLE_STREAM_ERRORS): + logging.info('Observed recoverable stream error %s', exception) return True + logging.info('Observed non-recoverable stream error %s', exception) return False def _on_rpc_done(self, future): + """Triggered whenever the underlying RPC terminates without recovery. + + This is typically triggered from one of two threads: the background + consumer thread (when calling ``recv()`` produces a non-recoverable + error) or the grpc management thread (when cancelling the RPC). + + This method is *non-blocking*. It will start another thread to deal + with shutting everything down. This is to prevent blocking in the + background consumer and preventing it from being ``joined()``. + """ _LOGGER.info( 'RPC termination has signaled streaming pull manager shutdown.') future = _maybe_wrap_exception(future) - self.close(reason=future) + thread = threading.Thread( + name=_RPC_ERROR_THREAD_NAME, + target=self.close, + kwargs={'reason': future}) + thread.daemon = True + thread.start() diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 61d040a26fc1..1aa979a504c9 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -420,10 +420,12 @@ def test__should_recover_false(): assert manager._should_recover(exc) is False -def test__on_rpc_done(): +@mock.patch('threading.Thread', autospec=True) +def test__on_rpc_done(thread): manager = make_manager() - with mock.patch.object(manager, 'close') as close: - manager._on_rpc_done(mock.sentinel.error) + manager._on_rpc_done(mock.sentinel.error) - close.assert_called_once_with(reason=mock.sentinel.error) + thread.assert_called_once_with( + name=mock.ANY, target=manager.close, + kwargs={'reason': mock.sentinel.error})