Skip to content
Permalink
Browse files
fix(pubsub): handle None in on response callback (#9982)
If the underlying RPC is shut down while pulling the messages with a
streming pull, the StreamingPullManager's _on_response() method is
invoked with None (as opposed to a StreamingPullResponse instance).

This commit handles this case and prevents an error in a background
thread on streaming pull manager shutdown.
  • Loading branch information
plamut authored and pradn committed Dec 16, 2019
1 parent cd7f479 commit 6596c4bae5526d82f5c1b5e0c243b2883404d51f
Showing with 22 additions and 0 deletions.
  1. +7 −0 google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
  2. +15 −0 tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -542,6 +542,13 @@ def _on_response(self, response):
After the messages have all had their ack deadline updated, execute
the callback for each message using the executor.
"""
if response is None:
_LOGGER.debug(
"Response callback invoked with None, likely due to a "
"transport shutdown."
)
return

_LOGGER.debug(
"Processing %s received message(s), currenty on hold %s (bytes %s).",
len(response.received_messages),
@@ -721,6 +721,21 @@ def test__on_response_with_leaser_overload():
assert msg.message_id in ("2", "3")


def test__on_response_none_data(caplog):
caplog.set_level(logging.DEBUG)

manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback

# adjust message bookkeeping in leaser
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10)

manager._on_response(response=None)

scheduler.schedule.assert_not_called()
assert "callback invoked with None" in caplog.text


def test_retryable_stream_errors():
# Make sure the config matches our hard-coded tuple of exceptions.
interfaces = subscriber_client_config.config["interfaces"]

0 comments on commit 6596c4b

Please sign in to comment.