Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug-1878419: pubsub emulator for local dev environment #6552

Merged
merged 2 commits into from Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
150 changes: 150 additions & 0 deletions bin/pubsub_cli.py
@@ -0,0 +1,150 @@
#!/usr/bin/env python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

# Pub/Sub manipulation script.
#
# Note: Run this in the base container which has access to Pub/Sub.
#
# Usage: ./bin/pubsub_cli.py [SUBCOMMAND]

import click
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists, NotFound


@click.group()
def pubsub_group():
"""Local dev environment Pub/Sub emulator manipulation script."""


@pubsub_group.command("list_topics")
@click.argument("project_id")
@click.pass_context
def list_topics(ctx, project_id):
"""List topics for this project."""
click.echo(f"Listing topics in project {project_id}.")
publisher = pubsub_v1.PublisherClient()

for topic in publisher.list_topics(project=f"projects/{project_id}"):
click.echo(topic.name)


@pubsub_group.command("list_subscriptions")
@click.argument("project_id")
@click.argument("topic_name")
@click.pass_context
def list_subscriptions(ctx, project_id, topic_name):
"""List subscriptions for a given topic."""
click.echo(f"Listing subscriptions in topic {topic_name!r}:")
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for subscription in publisher.list_topic_subscriptions(topic=topic_path):
click.echo(subscription)


@pubsub_group.command("create_topic")
@click.argument("project_id")
@click.argument("topic_name")
@click.pass_context
def create_topic(ctx, project_id, topic_name):
"""Create topic."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

try:
publisher.create_topic(name=topic_path)
click.echo(f"Topic created: {topic_path}")
except AlreadyExists:
click.echo("Topic already created.")


@pubsub_group.command("create_subscription")
@click.argument("project_id")
@click.argument("topic_name")
@click.argument("subscription_name")
@click.pass_context
def create_subscription(ctx, project_id, topic_name, subscription_name):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
try:
subscriber.create_subscription(name=subscription_path, topic=topic_path)
click.echo(f"Subscription created: {subscription_path}")
except AlreadyExists:
click.echo("Subscription already created.")


@pubsub_group.command("delete_topic")
@click.argument("project_id")
@click.argument("topic_name")
@click.pass_context
def delete_topic(ctx, project_id, topic_name):
"""Delete a topic."""
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_name)

# Delete all subscriptions
for subscription in publisher.list_topic_subscriptions(topic=topic_path):
click.echo(f"Deleting {subscription} ...")
subscriber.delete_subscription(subscription=subscription)

# Delete topic
try:
publisher.delete_topic(topic=topic_path)
click.echo(f"Topic deleted: {topic_name}")
except NotFound:
click.echo(f"Topic {topic_name} does not exist.")


@pubsub_group.command("publish")
@click.argument("project_id")
@click.argument("topic_name")
@click.argument("crash_id")
@click.pass_context
def publish(ctx, project_id, topic_name, crash_id):
"""Publish crash_id to a given topic."""
click.echo(f"Publishing crash_id to topic {topic_name!r}:")
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

future = publisher.publish(topic_path, crash_id.encode("utf-8"), timeout=5)
click.echo(future.result())


@pubsub_group.command("pull")
@click.argument("project_id")
@click.argument("subscription_name")
@click.option("--ack/--no-ack", is_flag=True, default=False)
@click.option("--max-messages", default=1, type=int)
@click.pass_context
def pull(ctx, project_id, subscription_name, ack, max_messages):
"""Pull crash id from a given subscription."""
click.echo(f"Pulling crash id from subscription {subscription_name!r}:")
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)

response = subscriber.pull(
subscription=subscription_path, max_messages=max_messages
)
if not response.received_messages:
return

ack_ids = []
for msg in response.received_messages:
click.echo(f"crash id: {msg.message.data}")
ack_ids.append(msg.ack_id)

if ack:
# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)


if __name__ == "__main__":
pubsub_group()
1 change: 1 addition & 0 deletions bin/test.sh
Expand Up @@ -29,6 +29,7 @@ echo ">>> wait for services to be ready"

urlwait "${DATABASE_URL}"
urlwait "${ELASTICSEARCH_URL}"
urlwait "http://${PUBSUB_EMULATOR_HOST}" 10
python ./bin/waitfor.py --verbose --codes=200,404 "${SENTRY_DSN}"
python ./bin/waitfor.py --verbose "${LOCAL_DEV_AWS_ENDPOINT_URL}health"

Expand Down
20 changes: 20 additions & 0 deletions docker-compose.yml
Expand Up @@ -25,6 +25,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- postgresql
- elasticsearch
volumes:
Expand All @@ -40,6 +41,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- postgresql
- elasticsearch

Expand Down Expand Up @@ -73,6 +75,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- elasticsearch
- symbolsserver
command: ["processor"]
Expand Down Expand Up @@ -102,6 +105,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- postgresql
- elasticsearch
- memcached
Expand Down Expand Up @@ -148,6 +152,7 @@ services:
- my.env
depends_on:
- localstack
- pubsub
- statsd
expose:
- 8000
Expand Down Expand Up @@ -208,6 +213,21 @@ services:
ports:
- "4566:4566"

# https://cloud.google.com/sdk/docs/downloads-docker
# official pubsub emulator
pubsub:
# also available as google/cloud-sdk:<version>-emulators
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:463.0.0-emulators
command:
- gcloud
- beta
- emulators
- pubsub
- start
- --host-port=0.0.0.0:${PUBSUB_PORT:-5010}
ports:
- 5010:5010

# https://hub.docker.com/_/memcached/
memcached:
image: memcached:1.5.1
Expand Down
4 changes: 4 additions & 0 deletions docker/config/local_dev.env
Expand Up @@ -129,3 +129,7 @@ CRASHPUBLISH_REGION=us-east-1
CRASHPUBLISH_ACCESS_KEY=foo
CRASHPUBLISH_SECRET_ACCESS_KEY=foo
CRASHPUBLISH_QUEUE_NAME=local-dev-standard

# pubsub emulator
# ---------------
PUBSUB_EMULATOR_HOST=pubsub:5010
1 change: 1 addition & 0 deletions requirements.in
Expand Up @@ -19,6 +19,7 @@ everett==3.3.0
fillmore==1.2.0
freezegun==1.4.0
glom==23.5.0
google-cloud-pubsub==2.19.3
gunicorn==21.2.0
honcho==1.1.0
humanfriendly==10.0
Expand Down