Skip to content
Permalink
Browse files
feat: Add dead lettering max delivery attempts argument (#236)
* Add max_delivery_attempts input to subsciber.py

Add functionality so users could set max_delivery_attempts while creating or updating a subscription with dead lettering enabled instead of it's value being set to an arbitrary number.

* Make max_delivery_attempts argument optional

Make the argument optional and set the value to 5 if the user doesn't set it just like Cloud Pub/Sub does.

* Add max_delivery_attempts parameter to create and update subscription with dead lettering calls

Added max delivery attempts parameter to calls to update and create subscriber to match the methods in subscriber.py

* Add constants and defaults for max_delivery_attempts argument

* Fix comments related to added max_delivery_attempts parameter

* Fix typo in max_delivery_attempts comments
  • Loading branch information
danavaziri-ga committed Nov 16, 2020
1 parent 94d738c commit 7687ae500bdb9c76e3ffb23302b4f32dc9627d81
Showing with 36 additions and 14 deletions.
  1. +30 −11 samples/snippets/subscriber.py
  2. +6 −3 samples/snippets/subscriber_test.py
@@ -90,7 +90,8 @@ def create_subscription(project_id, topic_id, subscription_id):


def create_subscription_with_dead_letter_topic(
project_id, topic_id, subscription_id, dead_letter_topic_id
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
):
"""Create a subscription with dead letter policy."""
# [START pubsub_dead_letter_create_subscription]
@@ -108,6 +109,9 @@ def create_subscription_with_dead_letter_topic(
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
@@ -117,7 +121,8 @@ def create_subscription_with_dead_letter_topic(
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
)

with subscriber:
@@ -259,7 +264,8 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint):


def update_subscription_with_dead_letter_policy(
project_id, topic_id, subscription_id, dead_letter_topic_id
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
):
"""Update a subscription's dead letter policy."""
# [START pubsub_dead_letter_update_subscription]
@@ -276,6 +282,9 @@ def update_subscription_with_dead_letter_policy(
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
@@ -290,11 +299,12 @@ def update_subscription_with_dead_letter_policy(
print(f"Before the update: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"])
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct a dead letter policy you expect to have after the update.
dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
)

# Construct the subscription with the dead letter policy you expect to have
@@ -339,12 +349,7 @@ def remove_dead_letter_policy(project_id, topic_id, subscription_id):
print(f"Before removing the policy: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(
paths=[
"dead_letter_policy.dead_letter_topic",
"dead_letter_policy.max_delivery_attempts",
]
)
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct the subscription (without any dead letter policy) that you
# expect to have after the update.
@@ -676,6 +681,12 @@ def callback(message):
create_with_dead_letter_policy_parser.add_argument("topic_id")
create_with_dead_letter_policy_parser.add_argument("subscription_id")
create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
create_with_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
)

create_push_parser = subparsers.add_parser(
"create-push", help=create_push_subscription.__doc__
@@ -707,6 +718,12 @@ def callback(message):
update_dead_letter_policy_parser.add_argument("topic_id")
update_dead_letter_policy_parser.add_argument("subscription_id")
update_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
update_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
)

remove_dead_letter_policy_parser = subparsers.add_parser(
"remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__
@@ -777,6 +794,7 @@ def callback(message):
args.topic_id,
args.subscription_id,
args.dead_letter_topic_id,
args.max_delivery_attempts,
)
elif args.command == "create-push":
create_push_subscription(
@@ -798,6 +816,7 @@ def callback(message):
args.topic_id,
args.subscription_id,
args.dead_letter_topic_id,
args.max_delivery_attempts,
)
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
@@ -32,6 +32,8 @@
SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID
ENDPOINT = "https://{}.appspot.com/push".format(PROJECT_ID)
NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT_ID)
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
UPDATED_MAX_DELIVERY_ATTEMPTS = 20


@pytest.fixture(scope="module")
@@ -214,18 +216,19 @@ def test_create_subscription_with_dead_letter_policy(
out, _ = capsys.readouterr()
assert f"Subscription created: {subscription_dlq}" in out
assert f"It will forward dead letter messages to: {dead_letter_topic}" in out
assert "After 10 delivery attempts." in out
assert f"After {DEFAULT_MAX_DELIVERY_ATTEMPTS} delivery attempts." in out


def test_update_dead_letter_policy(subscription_dlq, dead_letter_topic, capsys):
_ = subscriber.update_subscription_with_dead_letter_policy(
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC,
UPDATED_MAX_DELIVERY_ATTEMPTS
)

out, _ = capsys.readouterr()
assert dead_letter_topic in out
assert subscription_dlq in out
assert "max_delivery_attempts: 20" in out
assert f"max_delivery_attempts: {UPDATED_MAX_DELIVERY_ATTEMPTS}" in out


def test_create_subscription_with_ordering(

0 comments on commit 7687ae5

Please sign in to comment.