Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sample: add publish flow control sample and other nits #429

Merged
merged 4 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
108 changes: 85 additions & 23 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ def publish_messages_with_custom_attributes(project_id, topic_id):
def publish_messages_with_error_handler(project_id, topic_id):
# [START pubsub_publish_with_error_handler]
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import time

from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
Expand All @@ -146,31 +145,28 @@ def publish_messages_with_error_handler(project_id, topic_id):

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

futures = dict()

def get_callback(f, data):
def callback(f):
def get_callback(publish_future, data):
def callback(publish_future):
try:
print(f.result())
futures.pop(data)
except: # noqa
print("Please handle {} for {}.".format(f.exception(), data))
# Wait 100 ms for the publish call to succeed.
print(publish_future.result(timeout=0.1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting only 100ms is likely to result in deadline exceeded failures. It's best not to set a deadline or if necessary, set a deadline closer to 60s.

except TimeoutError:
plamut marked this conversation as resolved.
Show resolved Hide resolved
print("Publishing {} timed out.".format(data))
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved

return callback

for i in range(10):
data = str(i)
futures.update({data: None})
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data.encode("utf-8"))
futures[data] = future
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))
publish_future = publisher.publish(topic_path, data.encode("utf-8"))
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(5)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")
# [END pubsub_publish_with_error_handler]
Expand All @@ -179,6 +175,7 @@ def callback(f):
def publish_messages_with_batch_settings(project_id, topic_id):
"""Publishes multiple messages to a Pub/Sub topic with batch settings."""
# [START pubsub_publisher_batch_settings]
from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
Expand All @@ -194,6 +191,7 @@ def publish_messages_with_batch_settings(project_id, topic_id):
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(future):
Expand All @@ -204,14 +202,66 @@ def callback(future):
data = "Message number {}".format(n)
# Data must be a bytestring
data = data.encode("utf-8")
future = publisher.publish(topic_path, data)
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
future.add_done_callback(callback)
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with batch settings to {topic_path}.")
# [END pubsub_publisher_batch_settings]


def publish_messages_with_flow_control_settings(project_id, topic_id):
    """Publishes 1000 messages to a Pub/Sub topic with flow control settings.

    Args:
        project_id: Google Cloud project ID.
        topic_id: ID of the topic to publish to.
    """
    # [START pubsub_publisher_flow_control]
    from concurrent import futures
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.types import (
        LimitExceededBehavior,
        PublisherOptions,
        PublishFlowControl,
    )

    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"

    # Configure how many messages the publisher client can hold in memory
    # and what to do when messages exceed the limit.
    flow_control_settings = PublishFlowControl(
        message_limit=100,  # 100 messages
        byte_limit=10 * 1024 * 1024,  # 10 MiB
        # Block the publish call until outstanding data drops below the limits.
        limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
    )
    publisher = pubsub_v1.PublisherClient(
        publisher_options=PublisherOptions(flow_control=flow_control_settings)
    )
    topic_path = publisher.topic_path(project_id, topic_id)
    publish_futures = []

    # Resolve the publish future in a separate thread.
    def callback(publish_future):
        message_id = publish_future.result()
        print(message_id)

    # Rapidly publishing 1000 messages in a loop may be constrained by
    # flow control.
    # NOTE: range(1, 1001) publishes exactly 1000 messages; the previous
    # range(1, 1000) only published 999, contradicting the comment above.
    for n in range(1, 1001):
        data = "Message number {}".format(n)
        # Data must be a bytestring
        data = data.encode("utf-8")
        publish_future = publisher.publish(topic_path, data)
        # Non-blocking. Publish failures are handled in the callback function.
        publish_future.add_done_callback(callback)
        publish_futures.append(publish_future)

    # Wait for all the publish futures to resolve before exiting.
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

    print(f"Published messages with flow control settings to {topic_path}.")
    # [END pubsub_publisher_flow_control]


def publish_messages_with_retry_settings(project_id, topic_id):
"""Publishes messages with custom retry settings."""
# [START pubsub_publisher_retry_settings]
Expand Down Expand Up @@ -365,7 +415,8 @@ def detach_subscription(project_id, subscription_id):

if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("project_id", help="Your Google Cloud project ID")

Expand All @@ -388,7 +439,8 @@ def detach_subscription(project_id, subscription_id):
publish_with_custom_attributes_parser.add_argument("topic_id")

publish_with_error_handler_parser = subparsers.add_parser(
"publish-with-error-handler", help=publish_messages_with_error_handler.__doc__,
"publish-with-error-handler",
help=publish_messages_with_error_handler.__doc__,
)
publish_with_error_handler_parser.add_argument("topic_id")

Expand All @@ -398,14 +450,21 @@ def detach_subscription(project_id, subscription_id):
)
publish_with_batch_settings_parser.add_argument("topic_id")

publish_with_flow_control_settings_parser = subparsers.add_parser(
"publish-with-flow-control",
help=publish_messages_with_flow_control_settings.__doc__,
)
publish_with_flow_control_settings_parser.add_argument("topic_id")

publish_with_retry_settings_parser = subparsers.add_parser(
"publish-with-retry-settings",
help=publish_messages_with_retry_settings.__doc__,
)
publish_with_retry_settings_parser.add_argument("topic_id")

publish_with_ordering_keys_parser = subparsers.add_parser(
"publish-with-ordering-keys", help=publish_with_ordering_keys.__doc__,
"publish-with-ordering-keys",
help=publish_with_ordering_keys.__doc__,
)
publish_with_ordering_keys_parser.add_argument("topic_id")

Expand All @@ -416,7 +475,8 @@ def detach_subscription(project_id, subscription_id):
resume_publish_with_ordering_keys_parser.add_argument("topic_id")

detach_subscription_parser = subparsers.add_parser(
"detach-subscription", help=detach_subscription.__doc__,
"detach-subscription",
help=detach_subscription.__doc__,
)
detach_subscription_parser.add_argument("subscription_id")

Expand All @@ -436,6 +496,8 @@ def detach_subscription(project_id, subscription_id):
publish_messages_with_error_handler(args.project_id, args.topic_id)
elif args.command == "publish-with-batch-settings":
publish_messages_with_batch_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-flow-control":
publish_messages_with_flow_control_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-retry-settings":
publish_messages_with_retry_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-ordering-keys":
Expand Down
7 changes: 7 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ def test_publish_with_batch_settings(topic_path, capsys):
assert f"Published messages with batch settings to {topic_path}." in out


def test_publish_with_flow_control_settings(topic_path, capsys):
    """End-to-end check of the flow control sample against the test topic.

    `topic_path` and `capsys` are pytest fixtures; `publisher`, `PROJECT_ID`,
    and `TOPIC_ID` come from the test module's top level.
    """
    publisher.publish_messages_with_flow_control_settings(PROJECT_ID, TOPIC_ID)

    # The sample prints a final confirmation line; its presence in captured
    # stdout indicates every publish future resolved.
    out, _ = capsys.readouterr()
    assert f"Published messages with flow control settings to {topic_path}." in out


def test_publish_with_retry_settings(topic_path, capsys):
publisher.publish_messages_with_retry_settings(PROJECT_ID, TOPIC_ID)

Expand Down