Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
docs: block until the streaming pull shuts down (#424)
Fixes #423.

If subscriber client is used as a context manager, we need to block until the shutdown is complete before leaving the `with` block. See the issue description for more details.

**PR checklist:**
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
  • Loading branch information
plamut committed Jun 14, 2021
1 parent c757a5e commit d0d0b70
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
3 changes: 2 additions & 1 deletion google/cloud/pubsub_v1/subscriber/client.py
Expand Up @@ -188,7 +188,8 @@ def callback(message):
try:
future.result()
except KeyboardInterrupt:
future.cancel()
future.cancel() # Trigger the shutdown.
future.result() # Block until the shutdown is complete.
Args:
subscription (str): The name of the subscription. The
Expand Down
3 changes: 2 additions & 1 deletion samples/snippets/quickstart/sub.py
Expand Up @@ -43,7 +43,8 @@ def callback(message):
# exiting while messages get processed in the callbacks.
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.

subscriber_client.close()

Expand Down
6 changes: 4 additions & 2 deletions samples/snippets/schema.py
Expand Up @@ -343,7 +343,8 @@ def callback(message):
# unless an exception occurs first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscribe_avro_records]


Expand Down Expand Up @@ -393,7 +394,8 @@ def callback(message):
# unless an exception occurs first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscribe_proto_messages]


Expand Down
15 changes: 10 additions & 5 deletions samples/snippets/subscriber.py
Expand Up @@ -397,7 +397,8 @@ def callback(message):
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscriber_async_pull]
# [END pubsub_quickstart_subscriber]

Expand Down Expand Up @@ -436,7 +437,8 @@ def callback(message):
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscriber_async_pull_custom_attributes]


Expand Down Expand Up @@ -474,7 +476,8 @@ def callback(message):
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscriber_flow_settings]


Expand Down Expand Up @@ -663,10 +666,11 @@ def callback(message):
try:
streaming_pull_future.result(timeout=timeout)
except Exception as e:
streaming_pull_future.cancel()
print(
f"Listening for messages on {subscription_path} threw an exception: {e}."
)
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscriber_error_listener]


Expand Down Expand Up @@ -697,7 +701,8 @@ def callback(message):
try:
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_dead_letter_delivery_attempt]


Expand Down

0 comments on commit d0d0b70

Please sign in to comment.