Skip to content

Commit

Permalink
bug-1878419: pubsub emulator for local dev environment
Browse files Browse the repository at this point in the history
  • Loading branch information
relud committed Mar 7, 2024
1 parent d1a7d98 commit 44c8e58
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -22,6 +22,7 @@ symbols/

# docker things
.docker-build*
.devcontainer-build
.cache/
docker-compose.override.yml
my.env
Expand Down
150 changes: 150 additions & 0 deletions bin/pubsub_cli.py
@@ -0,0 +1,150 @@
#!/usr/bin/env python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

# Pub/Sub manipulation script.
#
# Note: Run this in the base container which has access to Pub/Sub.
#
# Usage: ./bin/pubsub_cli.py [SUBCOMMAND]

import click
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists, NotFound


@click.group()
def pubsub_group():
"""Local dev environment Pub/Sub emulator manipulation script."""


@pubsub_group.command("list_topics")
@click.argument("project_id")
@click.pass_context
def list_topics(ctx, project_id):
"""List topics for this project."""
click.echo(f"Listing topics in project {project_id}.")
publisher = pubsub_v1.PublisherClient()

for topic in publisher.list_topics(project=f"projects/{project_id}"):
click.echo(topic.name)


@pubsub_group.command("list_subscriptions")
@click.argument("project_id")
@click.argument("topic_name")
@click.pass_context
def list_subscriptions(ctx, project_id, topic_name):
"""List subscriptions for a given topic."""
click.echo(f"Listing subscriptions in topic {topic_name!r}:")
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for subscription in publisher.list_topic_subscriptions(topic=topic_path):
click.echo(subscription)


@pubsub_group.command("create_topic")
@click.argument("project_id")
@click.argument("topic_name")
@click.pass_context
def create_topic(ctx, project_id, topic_name):
"""Create topic."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

try:
publisher.create_topic(name=topic_path)
click.echo(f"Topic created: {topic_path}")
except AlreadyExists:
click.echo("Topic already created.")


@pubsub_group.command("create_subscription")
@click.argument("project_id")
@click.argument("topic_name")
@click.argument("subscription_name")
@click.pass_context
def create_subscription(ctx, project_id, topic_name, subscription_name):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
try:
subscriber.create_subscription(name=subscription_path, topic=topic_path)
click.echo(f"Subscription created: {subscription_path}")
except AlreadyExists:
click.echo("Subscription already created.")


@pubsub_group.command("delete_topic")
@click.argument("project_id")
@click.argument("topic_name")
@click.pass_context
def delete_topic(ctx, project_id, topic_name):
"""Delete a topic."""
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_name)

# Delete all subscriptions
for subscription in publisher.list_topic_subscriptions(topic=topic_path):
click.echo(f"Deleting {subscription} ...")
subscriber.delete_subscription(subscription=subscription)

# Delete topic
try:
publisher.delete_topic(topic=topic_path)
click.echo(f"Topic deleted: {topic_name}")
except NotFound:
click.echo(f"Topic {topic_name} does not exist.")


@pubsub_group.command("publish")
@click.argument("project_id")
@click.argument("topic_name")
@click.argument("crash_id")
@click.pass_context
def publish(ctx, project_id, topic_name, crash_id):
"""Publish crash_id to a given topic."""
click.echo(f"Publishing crash_id to topic {topic_name!r}:")
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

future = publisher.publish(topic_path, crash_id.encode("utf-8"), timeout=5)
click.echo(future.result())


@pubsub_group.command("pull")
@click.argument("project_id")
@click.argument("subscription_name")
@click.option("--ack/--no-ack", is_flag=True, default=False)
@click.option("--max-messages", default=1, type=int)
@click.pass_context
def pull(ctx, project_id, subscription_name, ack, max_messages):
"""Pull crash id from a given subscription."""
click.echo(f"Pulling crash id from subscription {subscription_name!r}:")
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)

response = subscriber.pull(
subscription=subscription_path, max_messages=max_messages
)
if not response.received_messages:
return

ack_ids = []
for msg in response.received_messages:
click.echo(f"crash id: {msg.message.data}")
ack_ids.append(msg.ack_id)

if ack:
# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)


if __name__ == "__main__":
pubsub_group()
1 change: 1 addition & 0 deletions bin/test.sh
Expand Up @@ -29,6 +29,7 @@ echo ">>> wait for services to be ready"

urlwait "${DATABASE_URL}"
urlwait "${ELASTICSEARCH_URL}"
urlwait "http://${PUBSUB_EMULATOR_HOST}" 10
python ./bin/waitfor.py --verbose --codes=200,404 "${SENTRY_DSN}"
python ./bin/waitfor.py --verbose "${LOCAL_DEV_AWS_ENDPOINT_URL}health"

Expand Down
20 changes: 20 additions & 0 deletions docker-compose.yml
Expand Up @@ -25,6 +25,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- postgresql
- elasticsearch
volumes:
Expand All @@ -40,6 +41,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- postgresql
- elasticsearch

Expand All @@ -52,6 +54,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- elasticsearch
- symbolsserver
command: ["processor"]
Expand Down Expand Up @@ -81,6 +84,7 @@ services:
- fakesentry
- statsd
- localstack
- pubsub
- postgresql
- elasticsearch
- memcached
Expand Down Expand Up @@ -127,6 +131,7 @@ services:
- my.env
depends_on:
- localstack
- pubsub
- statsd
expose:
- 8000
Expand Down Expand Up @@ -187,6 +192,21 @@ services:
ports:
- "4566:4566"

# https://cloud.google.com/sdk/docs/downloads-docker
# official pubsub emulator
pubsub:
# also available as google/cloud-sdk:<version>-emulators
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:463.0.0-emulators
command:
- gcloud
- beta
- emulators
- pubsub
- start
- --host-port=0.0.0.0:${PUBSUB_PORT:-5010}
ports:
- 5010:5010

# https://hub.docker.com/_/memcached/
memcached:
image: memcached:1.5.1
Expand Down
4 changes: 4 additions & 0 deletions docker/config/local_dev.env
Expand Up @@ -129,3 +129,7 @@ CRASHPUBLISH_REGION=us-east-1
CRASHPUBLISH_ACCESS_KEY=foo
CRASHPUBLISH_SECRET_ACCESS_KEY=foo
CRASHPUBLISH_QUEUE_NAME=local-dev-standard

# pubsub emulator
# ---------------
PUBSUB_EMULATOR_HOST=pubsub:5010
1 change: 1 addition & 0 deletions requirements.in
Expand Up @@ -19,6 +19,7 @@ everett==3.3.0
fillmore==1.2.0
freezegun==1.4.0
glom==23.5.0
google-cloud-pubsub==2.19.3
gunicorn==21.2.0
honcho==1.1.0
humanfriendly==10.0
Expand Down

0 comments on commit 44c8e58

Please sign in to comment.