Skip to content

Commit

Permalink
samples: sample for receiving messages with exactly-once delivery ena…
Browse files Browse the repository at this point in the history
…bled (#588)

* Add receive_messages_with_exactly_once_delivery_enabled sample with its own region tag

* Address Tianzi and Mahesh's comments.

* Add code for arg parsing / integrate sample with infra

* Add sample test

* Reformat and remove min lease extension period setting from sample

* Address Tianzi's comments.

* Fix import of subscriber exceptions.
  • Loading branch information
pradn committed Mar 4, 2022
1 parent 2fb6e15 commit bb8e24e
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
70 changes: 70 additions & 0 deletions samples/snippets/subscriber.py
Expand Up @@ -580,6 +580,61 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# [END pubsub_subscriber_blocking_shutdown]


def receive_messages_with_exactly_once_delivery_enabled(
project_id: str, subscription_id: str, timeout: Optional[float] = None
) -> None:
"""Receives messages from a pull subscription with exactly-once delivery enabled."""
# [START pubsub_subscriber_exactly_once]
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message}.")

# Use `ack_with_response()` instead of `ack()` to get a future that tracks
# the result of the acknowledge call. When exactly-once delivery is enabled
# on the subscription, the message is guaranteed to not be delivered again
# if the ack future succeeds.
ack_future = message.ack_with_response()

try:
# Block on result of acknowledge call.
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
ack_future.result(timeout=timeout)
print(f"Ack for message {message.message_id} successful.")
except sub_exceptions.AcknowledgeError as e:
print(
f"Ack for message {message.message_id} failed with error: {e.error_code}"
)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
# [END pubsub_subscriber_exactly_once]


def synchronous_pull(project_id: str, subscription_id: str) -> None:
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
Expand Down Expand Up @@ -881,6 +936,17 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
"timeout", default=None, type=float, nargs="?"
)

receive_messages_with_exactly_once_delivery_enabled_parser = subparsers.add_parser(
"receive-messages-with-exactly-once-delivery-enabled",
help=receive_messages_with_exactly_once_delivery_enabled.__doc__,
)
receive_messages_with_exactly_once_delivery_enabled_parser.add_argument(
"subscription_id"
)
receive_messages_with_exactly_once_delivery_enabled_parser.add_argument(
"timeout", default=None, type=float, nargs="?"
)

synchronous_pull_parser = subparsers.add_parser(
"receive-synchronously", help=synchronous_pull.__doc__
)
Expand Down Expand Up @@ -967,6 +1033,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
receive_messages_with_blocking_shutdown(
args.project_id, args.subscription_id, args.timeout
)
elif args.command == "receive-messages-with-exactly-once-delivery-enabled":
receive_messages_with_exactly_once_delivery_enabled(
args.project_id, args.subscription_id, args.timeout
)
elif args.command == "receive-synchronously":
synchronous_pull(args.project_id, args.subscription_id)
elif args.command == "receive-synchronously-with-lease":
Expand Down
28 changes: 28 additions & 0 deletions samples/snippets/subscriber_test.py
Expand Up @@ -624,6 +624,34 @@ def eventually_consistent_test() -> None:
eventually_consistent_test()


def test_receive_messages_with_exactly_once_delivery_enabled(
publisher_client: pubsub_v1.PublisherClient,
topic: str,
subscription_async: str,
capsys: CaptureFixture[str],
) -> None:

typed_backoff = cast(
Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60),
)

@typed_backoff
def eventually_consistent_test() -> None:
_publish_messages(publisher_client, topic)

subscriber.receive_messages_with_exactly_once_delivery_enabled(
PROJECT_ID, SUBSCRIPTION_ASYNC, 10
)

out, _ = capsys.readouterr()
assert "Listening" in out
assert subscription_async in out
assert "Received" in out
assert "Ack" in out

eventually_consistent_test()


def test_listen_for_errors(
publisher_client: pubsub_v1.PublisherClient,
topic: str,
Expand Down

0 comments on commit bb8e24e

Please sign in to comment.