From 742e7dc07d46f0ffb4aa680dbc49c10b762f7c73 Mon Sep 17 00:00:00 2001 From: Daniel Thorn Date: Wed, 6 Mar 2024 13:48:26 -0800 Subject: [PATCH] bug-1878419: pubsub emulator for local dev environment --- bin/pubsub_cli.py | 150 +++++++++++++++++++++++++++++++ bin/test.sh | 1 + docker-compose.yml | 20 +++++ docker/config/local_dev.env | 4 + requirements.in | 1 + requirements.txt | 125 +++++++++++++++++++++++++- socorro/tests/test_pubsub_cli.py | 14 +++ 7 files changed, 314 insertions(+), 1 deletion(-) create mode 100755 bin/pubsub_cli.py create mode 100644 socorro/tests/test_pubsub_cli.py diff --git a/bin/pubsub_cli.py b/bin/pubsub_cli.py new file mode 100755 index 0000000000..ea34c42bc0 --- /dev/null +++ b/bin/pubsub_cli.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +# Pub/Sub manipulation script. +# +# Note: Run this in the base container which has access to Pub/Sub. +# +# Usage: ./bin/pubsub_cli.py [SUBCOMMAND] + +import click +from google.cloud import pubsub_v1 +from google.api_core.exceptions import AlreadyExists, NotFound + + +@click.group() +def pubsub_group(): + """Local dev environment Pub/Sub emulator manipulation script.""" + + +@pubsub_group.command("list_topics") +@click.argument("project_id") +@click.pass_context +def list_topics(ctx, project_id): + """List topics for this project.""" + click.echo(f"Listing topics in project {project_id}.") + publisher = pubsub_v1.PublisherClient() + + for topic in publisher.list_topics(project=f"projects/{project_id}"): + click.echo(topic.name) + + +@pubsub_group.command("list_subscriptions") +@click.argument("project_id") +@click.argument("topic_name") +@click.pass_context +def list_subscriptions(ctx, project_id, topic_name): + """List subscriptions for a given topic.""" + click.echo(f"Listing subscriptions in topic {topic_name!r}:") + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_name) + + for subscription in publisher.list_topic_subscriptions(topic=topic_path): + click.echo(subscription) + + +@pubsub_group.command("create_topic") +@click.argument("project_id") +@click.argument("topic_name") +@click.pass_context +def create_topic(ctx, project_id, topic_name): + """Create topic.""" + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_name) + + try: + publisher.create_topic(name=topic_path) + click.echo(f"Topic created: {topic_path}") + except AlreadyExists: + click.echo("Topic already created.") + + +@pubsub_group.command("create_subscription") +@click.argument("project_id") +@click.argument("topic_name") +@click.argument("subscription_name") +@click.pass_context +def create_subscription(ctx, project_id, topic_name, subscription_name): + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_name) + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_name) + try: + subscriber.create_subscription(name=subscription_path, topic=topic_path) + click.echo(f"Subscription created: {subscription_path}") + except AlreadyExists: + click.echo("Subscription already created.") + + +@pubsub_group.command("delete_topic") +@click.argument("project_id") +@click.argument("topic_name") +@click.pass_context +def delete_topic(ctx, project_id, topic_name): + """Delete a topic.""" + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + topic_path = publisher.topic_path(project_id, topic_name) + + # Delete all subscriptions + for subscription in publisher.list_topic_subscriptions(topic=topic_path): + click.echo(f"Deleting {subscription} ...") + subscriber.delete_subscription(subscription=subscription) + + # Delete topic + try: + publisher.delete_topic(topic=topic_path) + click.echo(f"Topic deleted: {topic_name}") + except NotFound: + click.echo(f"Topic {topic_name} does not exist.") + + +@pubsub_group.command("publish") +@click.argument("project_id") +@click.argument("topic_name") +@click.argument("crash_id") +@click.pass_context +def publish(ctx, project_id, topic_name, crash_id): + """Publish crash_id to a given topic.""" + click.echo(f"Publishing crash_id to topic {topic_name!r}:") + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_name) + + future = publisher.publish(topic_path, crash_id.encode("utf-8"), timeout=5) + click.echo(future.result()) + + +@pubsub_group.command("pull") +@click.argument("project_id") +@click.argument("subscription_name") +@click.option("--ack/--no-ack", is_flag=True, default=False) +@click.option("--max-messages", default=1, type=int) +@click.pass_context +def pull(ctx, project_id, subscription_name, ack, max_messages): + """Pull crash id from a given subscription.""" + click.echo(f"Pulling crash id from subscription {subscription_name!r}:") + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_name) + + response = subscriber.pull( + subscription=subscription_path, max_messages=max_messages + ) + if not response.received_messages: + return + + ack_ids = [] + for msg in response.received_messages: + click.echo(f"crash id: {msg.message.data}") + ack_ids.append(msg.ack_id) + + if ack: + # Acknowledges the received messages so they will not be sent again. + subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids) + + +if __name__ == "__main__": + pubsub_group() diff --git a/bin/test.sh b/bin/test.sh index 4e3bc959c2..1ecb898528 100755 --- a/bin/test.sh +++ b/bin/test.sh @@ -29,6 +29,7 @@ echo ">>> wait for services to be ready" urlwait "${DATABASE_URL}" urlwait "${ELASTICSEARCH_URL}" +urlwait "http://${PUBSUB_EMULATOR_HOST}" 10 python ./bin/waitfor.py --verbose --codes=200,404 "${SENTRY_DSN}" python ./bin/waitfor.py --verbose "${LOCAL_DEV_AWS_ENDPOINT_URL}health" diff --git a/docker-compose.yml b/docker-compose.yml index 54f93f7085..3262eb5ac4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,6 +25,7 @@ services: - fakesentry - statsd - localstack + - pubsub - postgresql - elasticsearch volumes: @@ -40,6 +41,7 @@ services: - fakesentry - statsd - localstack + - pubsub - postgresql - elasticsearch @@ -73,6 +75,7 @@ services: - fakesentry - statsd - localstack + - pubsub - elasticsearch - symbolsserver command: ["processor"] @@ -102,6 +105,7 @@ services: - fakesentry - statsd - localstack + - pubsub - postgresql - elasticsearch - memcached @@ -148,6 +152,7 @@ services: - my.env depends_on: - localstack + - pubsub - statsd expose: - 8000 @@ -208,6 +213,21 @@ services: ports: - "4566:4566" + # https://cloud.google.com/sdk/docs/downloads-docker + # official pubsub emulator + pubsub: + # also available as google/cloud-sdk:-emulators + image: gcr.io/google.com/cloudsdktool/google-cloud-cli:463.0.0-emulators + command: + - gcloud + - beta + - emulators + - pubsub + - start + - --host-port=0.0.0.0:${PUBSUB_PORT:-5010} + ports: + - 5010:5010 + # https://hub.docker.com/_/memcached/ memcached: image: memcached:1.5.1 diff --git a/docker/config/local_dev.env b/docker/config/local_dev.env index 39e4ec004a..da5675fc1d 100644 --- a/docker/config/local_dev.env +++ b/docker/config/local_dev.env @@ -129,3 +129,7 @@ CRASHPUBLISH_REGION=us-east-1 CRASHPUBLISH_ACCESS_KEY=foo CRASHPUBLISH_SECRET_ACCESS_KEY=foo CRASHPUBLISH_QUEUE_NAME=local-dev-standard + +# pubsub emulator +# --------------- +PUBSUB_EMULATOR_HOST=pubsub:5010 diff --git a/requirements.in b/requirements.in index e4ee74b431..45ca330ff7 100644 --- a/requirements.in +++ b/requirements.in @@ -19,6 +19,7 @@ everett==3.3.0 fillmore==1.2.0 freezegun==1.4.0 glom==23.5.0 +google-cloud-pubsub==2.19.3 gunicorn==21.2.0 honcho==1.1.0 humanfriendly==10.0 diff --git a/requirements.txt b/requirements.txt index 3ea69ed888..7dfb37f91a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -53,6 +53,10 @@ build==1.0.3 \ --hash=sha256:538aab1b64f9828977f84bc63ae570b060a8ed1be419e7870b8b4fc5e6ea553b \ --hash=sha256:589bf99a67df7c9cf07ec0ac0e5e2ea5d4b37ac63301c4986d1acb126aa83f8f # via pip-tools +cachetools==5.3.3 \ + --hash=sha256:0abad1021d3f8325b2fc1d2e9c8b9c9d57b04c3932657a72465447332c24d945 \ + --hash=sha256:ba29e2dfa0b8b556606f097407ed1aa62080ee108ab0dc5ec9d6a723a007d105 + # via google-auth certifi==2023.7.22 \ --hash=sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082 \ --hash=sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9 @@ -349,6 +353,98 @@ glom==23.5.0 \ --hash=sha256:06af5e3486aacc59382ba34e53ebeabd7a9345d78f7dbcbee26f03baa4b83bac \ --hash=sha256:fe4e9be4dc93c11a99f8277042e4bee95419c02cda4b969f504508b0a1aa6a66 # via -r requirements.in +google-api-core==2.17.1 \ + --hash=sha256:610c5b90092c360736baccf17bd3efbcb30dd380e7a6dc28a71059edb8bd0d8e \ + --hash=sha256:9df18a1f87ee0df0bc4eea2770ebc4228392d8cc4066655b320e2cfccb15db95 + # via google-cloud-pubsub +google-auth==2.28.1 \ + --hash=sha256:25141e2d7a14bfcba945f5e9827f98092716e99482562f15306e5b026e21aa72 \ + --hash=sha256:34fc3046c257cedcf1622fc4b31fc2be7923d9b4d44973d481125ecc50d83885 + # via + # google-api-core + # google-cloud-pubsub +google-cloud-pubsub==2.19.3 \ + --hash=sha256:9745ab5fe13270abdfb558c8512933c6a5b73ebbf69565e64b606509d65b1b46 \ + --hash=sha256:b8848d8050b835d702f8206475992cf01074de4f8b5e0875035035115f98f2d9 + # via -r requirements.in +googleapis-common-protos==1.62.0 \ + --hash=sha256:4750113612205514f9f6aa4cb00d523a94f3e8c06c5ad2fee466387dc4875f07 \ + --hash=sha256:83f0ece9f94e5672cced82f592d2a5edf527a96ed1794f0bab36d5735c996277 + # via + # google-api-core + # grpc-google-iam-v1 + # grpcio-status +grpc-google-iam-v1==0.13.0 \ + --hash=sha256:53902e2af7de8df8c1bd91373d9be55b0743ec267a7428ea638db3775becae89 \ + --hash=sha256:fad318608b9e093258fbf12529180f400d1c44453698a33509cc6ecf005b294e + # via google-cloud-pubsub +grpcio==1.62.0 \ + --hash=sha256:0b9179478b09ee22f4a36b40ca87ad43376acdccc816ce7c2193a9061bf35701 \ + --hash=sha256:0d3dee701e48ee76b7d6fbbba18ba8bc142e5b231ef7d3d97065204702224e0e \ + --hash=sha256:0d7ae7fc7dbbf2d78d6323641ded767d9ec6d121aaf931ec4a5c50797b886532 \ + --hash=sha256:0e97f37a3b7c89f9125b92d22e9c8323f4e76e7993ba7049b9f4ccbe8bae958a \ + --hash=sha256:136ffd79791b1eddda8d827b607a6285474ff8a1a5735c4947b58c481e5e4271 \ + --hash=sha256:1bc8449084fe395575ed24809752e1dc4592bb70900a03ca42bf236ed5bf008f \ + --hash=sha256:1eda79574aec8ec4d00768dcb07daba60ed08ef32583b62b90bbf274b3c279f7 \ + --hash=sha256:29cb592c4ce64a023712875368bcae13938c7f03e99f080407e20ffe0a9aa33b \ + --hash=sha256:2c1488b31a521fbba50ae86423f5306668d6f3a46d124f7819c603979fc538c4 \ + --hash=sha256:2e84bfb2a734e4a234b116be208d6f0214e68dcf7804306f97962f93c22a1839 \ + --hash=sha256:2f3d9a4d0abb57e5f49ed5039d3ed375826c2635751ab89dcc25932ff683bbb6 \ + --hash=sha256:36df33080cd7897623feff57831eb83c98b84640b016ce443305977fac7566fb \ + --hash=sha256:38f69de9c28c1e7a8fd24e4af4264726637b72f27c2099eaea6e513e7142b47e \ + --hash=sha256:39cd45bd82a2e510e591ca2ddbe22352e8413378852ae814549c162cf3992a93 \ + --hash=sha256:3fa15850a6aba230eed06b236287c50d65a98f05054a0f01ccedf8e1cc89d57f \ + --hash=sha256:4cd356211579043fce9f52acc861e519316fff93980a212c8109cca8f47366b6 \ + --hash=sha256:56ca7ba0b51ed0de1646f1735154143dcbdf9ec2dbe8cc6645def299bb527ca1 \ + --hash=sha256:5e709f7c8028ce0443bddc290fb9c967c1e0e9159ef7a030e8c21cac1feabd35 \ + --hash=sha256:614c3ed234208e76991992342bab725f379cc81c7dd5035ee1de2f7e3f7a9842 \ + --hash=sha256:62aa1659d8b6aad7329ede5d5b077e3d71bf488d85795db517118c390358d5f6 \ + --hash=sha256:62ccb92f594d3d9fcd00064b149a0187c246b11e46ff1b7935191f169227f04c \ + --hash=sha256:662d3df5314ecde3184cf87ddd2c3a66095b3acbb2d57a8cada571747af03873 \ + --hash=sha256:748496af9238ac78dcd98cce65421f1adce28c3979393e3609683fcd7f3880d7 \ + --hash=sha256:77d48e5b1f8f4204889f1acf30bb57c30378e17c8d20df5acbe8029e985f735c \ + --hash=sha256:7a195531828b46ea9c4623c47e1dc45650fc7206f8a71825898dd4c9004b0928 \ + --hash=sha256:7e1f51e2a460b7394670fdb615e26d31d3260015154ea4f1501a45047abe06c9 \ + --hash=sha256:7eea57444a354ee217fda23f4b479a4cdfea35fb918ca0d8a0e73c271e52c09c \ + --hash=sha256:7f9d6c3223914abb51ac564dc9c3782d23ca445d2864321b9059d62d47144021 \ + --hash=sha256:81531632f93fece32b2762247c4c169021177e58e725494f9a746ca62c83acaa \ + --hash=sha256:81d444e5e182be4c7856cd33a610154fe9ea1726bd071d07e7ba13fafd202e38 \ + --hash=sha256:821a44bd63d0f04e33cf4ddf33c14cae176346486b0df08b41a6132b976de5fc \ + --hash=sha256:88f41f33da3840b4a9bbec68079096d4caf629e2c6ed3a72112159d570d98ebe \ + --hash=sha256:8aab8f90b2a41208c0a071ec39a6e5dbba16fd827455aaa070fec241624ccef8 \ + --hash=sha256:921148f57c2e4b076af59a815467d399b7447f6e0ee10ef6d2601eb1e9c7f402 \ + --hash=sha256:92cdb616be44c8ac23a57cce0243af0137a10aa82234f23cd46e69e115071388 \ + --hash=sha256:95370c71b8c9062f9ea033a0867c4c73d6f0ff35113ebd2618171ec1f1e903e0 \ + --hash=sha256:98d8f4eb91f1ce0735bf0b67c3b2a4fea68b52b2fd13dc4318583181f9219b4b \ + --hash=sha256:a33f2bfd8a58a02aab93f94f6c61279be0f48f99fcca20ebaee67576cd57307b \ + --hash=sha256:ab140a3542bbcea37162bdfc12ce0d47a3cda3f2d91b752a124cc9fe6776a9e2 \ + --hash=sha256:b3d3d755cfa331d6090e13aac276d4a3fb828bf935449dc16c3d554bf366136b \ + --hash=sha256:b71c65427bf0ec6a8b48c68c17356cb9fbfc96b1130d20a07cb462f4e4dcdcd5 \ + --hash=sha256:b7a6be562dd18e5d5bec146ae9537f20ae1253beb971c0164f1e8a2f5a27e829 \ + --hash=sha256:bcff647e7fe25495e7719f779cc219bbb90b9e79fbd1ce5bda6aae2567f469f2 \ + --hash=sha256:c912688acc05e4ff012c8891803659d6a8a8b5106f0f66e0aed3fb7e77898fa6 \ + --hash=sha256:ce1aafdf8d3f58cb67664f42a617af0e34555fe955450d42c19e4a6ad41c84bd \ + --hash=sha256:d6a56ba703be6b6267bf19423d888600c3f574ac7c2cc5e6220af90662a4d6b0 \ + --hash=sha256:e803e9b58d8f9b4ff0ea991611a8d51b31c68d2e24572cd1fe85e99e8cc1b4f8 \ + --hash=sha256:eef1d16ac26c5325e7d39f5452ea98d6988c700c427c52cbc7ce3201e6d93334 \ + --hash=sha256:f359d635ee9428f0294bea062bb60c478a8ddc44b0b6f8e1f42997e5dc12e2ee \ + --hash=sha256:f4c04fe33039b35b97c02d2901a164bbbb2f21fb9c4e2a45a959f0b044c3512c \ + --hash=sha256:f897b16190b46bc4d4aaf0a32a4b819d559a37a756d7c6b571e9562c360eed72 \ + --hash=sha256:fbe0c20ce9a1cff75cfb828b21f08d0a1ca527b67f2443174af6626798a754a4 \ + --hash=sha256:fc2836cb829895ee190813446dce63df67e6ed7b9bf76060262c55fcd097d270 \ + --hash=sha256:fcc98cff4084467839d0a20d16abc2a76005f3d1b38062464d088c07f500d170 + # via + # google-api-core + # google-cloud-pubsub + # googleapis-common-protos + # grpc-google-iam-v1 + # grpcio-status +grpcio-status==1.62.0 \ + --hash=sha256:0d693e9c09880daeaac060d0c3dba1ae470a43c99e5d20dfeafd62cf7e08a85d \ + --hash=sha256:3baac03fcd737310e67758c4082a188107f771d32855bce203331cd4c9aa687a + # via + # google-api-core + # google-cloud-pubsub gunicorn==21.2.0 \ --hash=sha256:3213aa5e8c24949e792bcacfc176fef362e7aac80b76c56f6b5122bf350722f0 \ --hash=sha256:88ec8bff1d634f98e61b9f65bc4bf3cd918a90806c6f5c48bc5603849ec81033 @@ -599,6 +695,29 @@ pluggy==1.3.0 \ --hash=sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12 \ --hash=sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7 # via pytest +proto-plus==1.23.0 \ + --hash=sha256:89075171ef11988b3fa157f5dbd8b9cf09d65fffee97e29ce403cd8defba19d2 \ + --hash=sha256:a829c79e619e1cf632de091013a4173deed13a55f326ef84f05af6f50ff4c82c + # via google-cloud-pubsub +protobuf==4.25.3 \ + --hash=sha256:19b270aeaa0099f16d3ca02628546b8baefe2955bbe23224aaf856134eccf1e4 \ + --hash=sha256:209ba4cc916bab46f64e56b85b090607a676f66b473e6b762e6f1d9d591eb2e8 \ + --hash=sha256:25b5d0b42fd000320bd7830b349e3b696435f3b329810427a6bcce6a5492cc5c \ + --hash=sha256:7c8daa26095f82482307bc717364e7c13f4f1c99659be82890dcfc215194554d \ + --hash=sha256:c053062984e61144385022e53678fbded7aea14ebb3e0305ae3592fb219ccfa4 \ + --hash=sha256:d4198877797a83cbfe9bffa3803602bbe1625dc30d8a097365dbc762e5790faa \ + --hash=sha256:e3c97a1555fd6388f857770ff8b9703083de6bf1f9274a002a332d65fbb56c8c \ + --hash=sha256:e7cb0ae90dd83727f0c0718634ed56837bfeeee29a5f82a7514c03ee1364c019 \ + --hash=sha256:f0700d54bcf45424477e46a9f0944155b46fb0639d69728739c0e47bab83f2b9 \ + --hash=sha256:f1279ab38ecbfae7e456a108c5c0681e4956d5b1090027c1de0f934dfdb4b35c \ + --hash=sha256:f4f118245c4a087776e0a8408be33cf09f6c547442c00395fbfb116fac2f8ac2 + # via + # google-api-core + # google-cloud-pubsub + # googleapis-common-protos + # grpc-google-iam-v1 + # grpcio-status + # proto-plus psutil==5.9.8 \ --hash=sha256:02615ed8c5ea222323408ceba16c60e99c3f91639b07da6373fb7e6539abc56d \ --hash=sha256:05806de88103b25903dff19bb6692bd2e714ccf9e668d050d144012055cbca73 \ @@ -701,7 +820,9 @@ pyasn1==0.5.0 \ pyasn1-modules==0.3.0 \ --hash=sha256:5bd01446b736eb9d31512a30d46c1ac3395d676c6f3cafa4c03eb54b9925631c \ --hash=sha256:d3ccd6ed470d9ffbc716be08bd90efbd44d0734bc9303818f7336070984a162d - # via oauth2client + # via + # google-auth + # oauth2client pycparser==2.21 \ --hash=sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9 \ --hash=sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206 @@ -828,6 +949,7 @@ requests==2.31.0 \ # via # -r requirements.in # datadog + # google-api-core # mozilla-django-oidc # requests-mock # sphinx @@ -943,6 +1065,7 @@ rsa==4.7.2 \ --hash=sha256:9d689e6ca1b3038bc82bf8d23e944b6b6037bc02301a574935b2dd946e0353b9 # via # awscli + # google-auth # oauth2client ruff==0.2.2 \ --hash=sha256:0a9efb032855ffb3c21f6405751d5e147b0c6b631e3ca3f6b20f917572b97eb6 \ diff --git a/socorro/tests/test_pubsub_cli.py b/socorro/tests/test_pubsub_cli.py new file mode 100644 index 0000000000..221fc0eeb6 --- /dev/null +++ b/socorro/tests/test_pubsub_cli.py @@ -0,0 +1,14 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +from click.testing import CliRunner + +from pubsub_cli import pubsub_group + + +def test_it_runs(): + """Test whether the module loads and spits out help.""" + runner = CliRunner() + result = runner.invoke(pubsub_group, ["--help"]) + assert result.exit_code == 0