Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,36 +342,48 @@ def _reopen(self):
# has exited.
self.request_generator = None

self.open()
# Note: we do not currently do any sort of backoff here. The
# assumption is that re-establishing the stream under normal
# circumstances will happen in intervals greater than 60s.
# However, it is possible in a degenerative case that the server
# closes the stream rapidly which would lead to thrashing here,
# but hopefully in those cases the server would return a non-
# retryable error.

try:
self.open()
# If re-opening or re-calling the method fails for any reason,
# consider it a terminal error and finalize the stream.
except Exception as exc:
self._finalize(exc)
raise

_LOGGER.info('Re-established stream')

def _recoverable(self, method, *args, **kwargs):
"""Wraps a method to recover the stream and retry on error.

If a recoverable error occurs, this will retry the RPC and retry the
method. If a second error occurs while retrying the method, it will
bubble up.
If a retryable error occurs while making the call, then the stream will
be re-opened and the method will be retried. This happens indefinitely
so long as the error is a retryable one. If an error occurs while
re-opening the stream, then this method will raise immediately and
trigger finalization of this object.

Args:
method (Callable[..., Any]): The method to call.
args: The args to pass to the method.
kwargs: The kwargs to pass to the method.
"""
try:
return method(*args, **kwargs)
while True:
try:
return method(*args, **kwargs)

except Exception as exc:
if not self._should_recover(exc):
self.close()
raise exc
except Exception as exc:
if not self._should_recover(exc):
self.close()
raise exc

try:
self._reopen()
return method(*args, **kwargs)
# If re-opening or re-calling the method fails for any reason, consider
# it a terminal error and finalize the object.
except Exception as exc:
self._finalize(exc)
raise

def send(self, request):
return self._recoverable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import google.cloud.pubsub_v1.subscriber.scheduler

_LOGGER = logging.getLogger(__name__)
_RPC_ERROR_THREAD_NAME = 'Thread-OnRpcTerminated'
_RETRYABLE_STREAM_ERRORS = (
exceptions.DeadlineExceeded,
exceptions.ServiceUnavailable,
Expand Down Expand Up @@ -414,11 +415,28 @@ def _should_recover(self, exception):
# If this is in the list of idempotent exceptions, then we want to
# recover.
if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
logging.info('Observed recoverable stream error %s', exception)

This comment was marked as spam.

return True
logging.info('Observed non-recoverable stream error %s', exception)
return False

def _on_rpc_done(self, future):
"""Triggered whenever the underlying RPC terminates without recovery.

This is typically triggered from one of two threads: the background
consumer thread (when calling ``recv()`` produces a non-recoverable
error) or the grpc management thread (when cancelling the RPC).

This method is *non-blocking*. It will start another thread to deal
with shutting everything down. This is to prevent blocking in the
background consumer and preventing it from being ``joined()``.
"""
_LOGGER.info(
'RPC termination has signaled streaming pull manager shutdown.')
future = _maybe_wrap_exception(future)
self.close(reason=future)
thread = threading.Thread(
name=_RPC_ERROR_THREAD_NAME,
target=self.close,
kwargs={'reason': future})
thread.daemon = True
thread.start()
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,12 @@ def test__should_recover_false():
assert manager._should_recover(exc) is False


def test__on_rpc_done():
@mock.patch('threading.Thread', autospec=True)
def test__on_rpc_done(thread):
manager = make_manager()

with mock.patch.object(manager, 'close') as close:
manager._on_rpc_done(mock.sentinel.error)
manager._on_rpc_done(mock.sentinel.error)

close.assert_called_once_with(reason=mock.sentinel.error)
thread.assert_called_once_with(
name=mock.ANY, target=manager.close,
kwargs={'reason': mock.sentinel.error})