Skip to content

Commit

Permalink
🦉 Updates from OwlBot post-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gcf-owl-bot[bot] committed Jun 17, 2024
1 parent 1ffaf01 commit bb2a52a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
21 changes: 16 additions & 5 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->


def optimistic_subscribe(
project_id: str, topic_id: str, subscription_id: str, timeout: Optional[float] = None
project_id: str,
topic_id: str,
subscription_id: str,
timeout: Optional[float] = None,
) -> None:
"""Optimistically subscribe to messages instead of making calls to verify existence
of a subscription first and then subscribing to messages from it. This avoids admin
Expand Down Expand Up @@ -129,7 +132,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
with subscriber:
try:
# Optimistically subscribe to messages on the subscription.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
print("Successfully subscribed until the timeout passed.")
Expand All @@ -153,13 +158,17 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:

# Subscribe on the created subscription.
try:
streaming_pull_future = subscriber.subscribe(subscription.name, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription.name, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except Exception as e:
print(f"Exception occurred when creating subscription and subscribing to it: {e}")
print(
f"Exception occurred when creating subscription and subscribing to it: {e}"
)
except Exception as e:
print(f"Exception occurred when attempting optimistic subscribe: {e}")
# [END pubsub_optimistic_subscribe]
Expand Down Expand Up @@ -1384,7 +1393,9 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
elif args.command == "optimistic-subscribe":
optimistic_subscribe(args.project_id, args.topic_id, args.subscription_id, args.timeout)
optimistic_subscribe(
args.project_id, args.topic_id, args.subscription_id, args.timeout
)
elif args.command == "receive":
receive_messages(args.project_id, args.subscription_id, args.timeout)
elif args.command == "receive-custom-attributes":
Expand Down
14 changes: 6 additions & 8 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,10 @@ def test_optimistic_subscribe(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
publisher_client: pubsub_v1.PublisherClient,
capsys: CaptureFixture[str]
capsys: CaptureFixture[str],
) -> None:
subscription_id = (
f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}"
)
subscription_path = subscriber_client.subscription_path(
PROJECT_ID, subscription_id
)
subscription_id = f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}"
subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription_id)
# Ensure there is no pre-existing subscription.
# So that we can test the case where optimistic subscribe fails.
try:
Expand Down Expand Up @@ -286,7 +282,9 @@ def test_optimistic_subscribe(

# Test case where creation of subscription fails for reasons other than
# a TimeoutError.
subscriber.optimistic_subscribe(PROJECT_ID, "not-existent-topic", subscription_id, 5)
subscriber.optimistic_subscribe(
PROJECT_ID, "not-existent-topic", subscription_id, 5
)
assert "Exception occurred when creating subscription and subscribing to it" in out

# Clean up resources created during test.
Expand Down

0 comments on commit bb2a52a

Please sign in to comment.