bug-1878423: support pubsub crashqueue
relud committed Mar 14, 2024
1 parent 742e7dc commit 7665840
Showing 12 changed files with 648 additions and 45 deletions.
2 changes: 2 additions & 0 deletions bin/test.sh
@@ -55,4 +55,6 @@ echo ">>> run tests"
pushd webapp
${PYTHON} manage.py collectstatic --noinput
"${PYTEST}"
# default cloud provider is aws, now configure gcp and run only impacted tests
CLOUD_PROVIDER=GCP "${PYTEST}" -m gcp
popd
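
Hypothetical example (not from this commit) of a test selected by the second `pytest -m gcp` pass above; the test name and assertion are illustrative, and the `gcp` marker would need to be registered in the project's pytest configuration to avoid unknown-marker warnings.

import os

import pytest


@pytest.mark.gcp
def test_cloud_provider_is_gcp():
    # bin/test.sh exports CLOUD_PROVIDER=GCP for this pass only
    assert os.environ.get("CLOUD_PROVIDER", "").upper() == "GCP"
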
12 changes: 10 additions & 2 deletions docker/config/local_dev.env
@@ -130,6 +130,14 @@ CRASHPUBLISH_ACCESS_KEY=foo
CRASHPUBLISH_SECRET_ACCESS_KEY=foo
CRASHPUBLISH_QUEUE_NAME=local-dev-standard

# pubsub emulator
# ---------------
# pubsub
# ------
PUBSUB_EMULATOR_HOST=pubsub:5010

PUBSUB_PROJECT_ID=test
PUBSUB_STANDARD_TOPIC_NAME=local-standard-topic
PUBSUB_PRIORITY_TOPIC_NAME=local-priority-topic
PUBSUB_REPROCESSING_TOPIC_NAME=local-reprocessing-topic
PUBSUB_STANDARD_SUBSCRIPTION_NAME=local-standard-sub
PUBSUB_PRIORITY_SUBSCRIPTION_NAME=local-priority-sub
PUBSUB_REPROCESSING_SUBSCRIPTION_NAME=local-reprocessing-sub
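
These settings point local development at the Pub/Sub emulator. The commit doesn't show how the emulator topics and subscriptions get created; as a rough sketch, assuming only the environment variables above and the google-cloud-pubsub client, they could be created like this (with PUBSUB_EMULATOR_HOST set, the clients talk to the emulator and need no real GCP credentials):

import os

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient

project_id = os.environ["PUBSUB_PROJECT_ID"]
publisher = PublisherClient()
subscriber = SubscriberClient()

for queue in ("STANDARD", "PRIORITY", "REPROCESSING"):
    topic = publisher.topic_path(project_id, os.environ[f"PUBSUB_{queue}_TOPIC_NAME"])
    subscription = subscriber.subscription_path(
        project_id, os.environ[f"PUBSUB_{queue}_SUBSCRIPTION_NAME"]
    )
    try:
        publisher.create_topic(name=topic)
    except AlreadyExists:
        pass
    try:
        # ack deadline mirrors the 5 minute setting the crash queue expects
        subscriber.create_subscription(
            name=subscription, topic=topic, ack_deadline_seconds=300
        )
    except AlreadyExists:
        pass
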
3 changes: 3 additions & 0 deletions socorro/external/pubsub/__init__.py
@@ -0,0 +1,3 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
251 changes: 251 additions & 0 deletions socorro/external/pubsub/crashqueue.py
@@ -0,0 +1,251 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

from functools import partial
import logging
import os

from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
from google.cloud.pubsub_v1.types import BatchSettings, PublisherOptions
from more_itertools import chunked

from socorro.external.crashqueue_base import CrashQueueBase


logger = logging.getLogger(__name__)


class CrashIdsFailedToPublish(Exception):
"""Crash ids that failed to publish."""


class PubSubCrashQueue(CrashQueueBase):
"""Crash queue that uses Google Cloud Pub/Sub.
This requires three Google Cloud Pub/Sub topics with subscriptions:

* **standard**: processing incoming crashes from the collector
* **priority**: processing crashes right now because someone/something
  is trying to view them
* **reprocessing**: reprocessing crashes after a change to the processor
  that have (probably) already been processed

and they need the following settings:

========================== =========
Setting                    Value
========================== =========
Acknowledgement deadline   5 minutes
Message retention duration *default*
========================== =========
The GCP credentials that Socorro is configured with must have the following
permissions on the Pub/Sub topics and subscriptions configured:

* ``roles/pubsub.publisher``

  Socorro sends messages to topics--this is how the webapp publishes crash ids
  to the priority and reprocessing queues. This requires permissions from the
  ``roles/pubsub.publisher`` role.

* ``roles/pubsub.subscriber``

  The Socorro processor has to receive messages from the configured subscriptions
  in order to process the related crash reports. This requires permissions from the
  ``roles/pubsub.subscriber`` role.

If something isn't configured correctly, then the Socorro processor will be unable
to process crashes and the webapp will be unable to publish crash ids for
processing.
**Authentication**

The Google Cloud SDK will automatically detect credentials as described in
https://googleapis.dev/python/google-api-core/latest/auth.html:

- If you're running in a Google Virtual Machine Environment (Compute Engine, App
  Engine, Cloud Run, Cloud Functions), authentication should "just work".
- If you're developing locally, the easiest way to authenticate is using the `Google
  Cloud SDK <http://cloud.google.com/sdk>`_::

      $ gcloud auth application-default login

- If you're running your application elsewhere, you should download a `service account
  <https://cloud.google.com/iam/docs/creating-managing-service-accounts#creating>`_
  JSON keyfile and point to it using an environment variable::

      $ export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"

**Local emulator**

If you set the environment variable ``PUBSUB_EMULATOR_HOST=host:port``,
then this will connect to a local Pub/Sub emulator.
"""

def __init__(
self,
project_id,
standard_topic_name,
standard_subscription_name,
priority_topic_name,
priority_subscription_name,
reprocessing_topic_name,
reprocessing_subscription_name,
pull_max_messages=5,
publish_max_messages=10,
publish_timeout=5,
):
"""
:arg project_id: Google Cloud Platform project_id
:arg standard_topic_name: topic name for the standard queue
:arg standard_subscription_name: subscription name for the standard queue
:arg priority_topic_name: topic name for the priority processing queue
:arg priority_subscription_name: subscription name for the priority queue
:arg reprocessing_topic_name: topic name for the reprocessing queue
:arg reprocessing_subscription_name: subscription name for the reprocessing
queue
:arg pull_max_messages: maximum number of messages to pull from Google Pub/Sub
in a single request
:arg publish_max_messages: maximum number of messages to publish to Google
Pub/Sub in a single request
:arg publish_timeout: rpc timeout for publish requests
"""

if emulator := os.environ.get("PUBSUB_EMULATOR_HOST"):
logger.debug(
"PUBSUB_EMULATOR_HOST detected, connecting to emulator: %s",
emulator,
)
self.publisher = PublisherClient(
# publish messages in batches
batch_settings=BatchSettings(max_messages=publish_max_messages),
# disable retry in favor of socorro's retry and set rpc timeout
publisher_options=PublisherOptions(retry=None, timeout=publish_timeout),
)

self.standard_topic_path = self.publisher.topic_path(
project_id, standard_topic_name
)
self.priority_topic_path = self.publisher.topic_path(
project_id, priority_topic_name
)
self.reprocessing_topic_path = self.publisher.topic_path(
project_id, reprocessing_topic_name
)
self.queue_to_topic_path = {
"standard": self.standard_topic_path,
"priority": self.priority_topic_path,
"reprocessing": self.reprocessing_topic_path,
}

self.subscriber = SubscriberClient()
self.standard_subscription_path = self.subscriber.subscription_path(
project_id, standard_subscription_name
)
self.priority_subscription_path = self.subscriber.subscription_path(
project_id, priority_subscription_name
)
self.reprocessing_subscription_path = self.subscriber.subscription_path(
project_id, reprocessing_subscription_name
)
# order matters here, and is checked in tests
self.queue_to_subscription_path = {
"priority": self.priority_subscription_path,
"standard": self.standard_subscription_path,
"reprocessing": self.reprocessing_subscription_path,
}

self.pull_max_messages = pull_max_messages
self.publish_max_messages = publish_max_messages

def ack_crash(self, subscription_path, ack_id):
"""Acknowledges a crash
:arg subscription_path: the subscription path for the queue
:arg ack_id: the ack_id for the message to acknowledge
"""
self.subscriber.acknowledge(subscription=subscription_path, ack_ids=[ack_id])
logger.debug("ack %s from %s", ack_id, subscription_path)

def __iter__(self):
"""Return iterator over crash ids from AWS SQS.
Each returned crash is a ``(crash_id, {kwargs})`` tuple with
``finished_func`` as the only key in ``kwargs``. The caller should call
``finished_func`` when it's done processing the crash.
"""
while True:
has_msgs = {}
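# check each subscription in queue_to_subscription_path order:
# priority first, then standard, then reprocessing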
for subscription_path in self.queue_to_subscription_path.values():
resp = self.subscriber.pull(
subscription=subscription_path,
max_messages=self.pull_max_messages,
)
msgs = resp.received_messages

# if pull returned the max number of messages, this subscription
# may have more messages.
has_msgs[subscription_path] = len(msgs) == self.pull_max_messages

if not msgs:
continue

for msg in msgs:
crash_id = msg.message.data.decode("utf-8")
ack_id = msg.ack_id
logger.debug("got %s from %s", crash_id, subscription_path)
if crash_id == "test":
# Ack and drop any test crash ids
self.ack_crash(subscription_path, ack_id)
continue
yield (
(crash_id,),
{
"finished_func": partial(
self.ack_crash, subscription_path, ack_id
)
},
)

if not any(has_msgs.values()):
# There's nothing to process, so return
return

def publish(self, queue, crash_ids):
"""Publish crash ids to specified queue.
:arg queue: the name of the queue to publish to; "standard", "priority" or
"reprocessing"
:arg crash_ids: the list of crash ids to publish
:raises CrashIdsFailedToPublish: raised if there was a failure publishing
crash ids with the list of crash ids that failed to publish
"""
assert queue in self.queue_to_topic_path

failed = []

topic_path = self.queue_to_topic_path[queue]
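# send crash ids in chunks of publish_max_messages; the client batches
# the publishes (BatchSettings above) and returns futures, which are
# resolved here to detect per-message failures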
for batch in chunked(crash_ids, self.publish_max_messages):
futures = [
self.publisher.publish(topic=topic_path, data=crash_id.encode("utf-8"))
for crash_id in batch
]
for i, future in enumerate(futures):
try:
future.result()
except Exception:
failed.append(batch[i])

if failed:
raise CrashIdsFailedToPublish(
f"Crashids failed to publish: {','.join(failed)}"
)
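
For context, a rough usage sketch of the class above, filled in with the local_dev.env values; in production the processor and webapp build this object through Socorro's configuration layer, so the crash id and the direct construction here are illustrative only.

from socorro.external.pubsub.crashqueue import PubSubCrashQueue

queue = PubSubCrashQueue(
    project_id="test",
    standard_topic_name="local-standard-topic",
    standard_subscription_name="local-standard-sub",
    priority_topic_name="local-priority-topic",
    priority_subscription_name="local-priority-sub",
    reprocessing_topic_name="local-reprocessing-topic",
    reprocessing_subscription_name="local-reprocessing-sub",
)

# webapp side: queue crash ids for (re)processing
queue.publish("reprocessing", ["0bba929f-8721-460c-dead-a43c20071025"])

# processor side: drain the queues, acking each crash once it's handled
for args, kwargs in queue:
    crash_id = args[0]
    print("processing", crash_id)
    kwargs["finished_func"]()  # acks the Pub/Sub message
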
2 changes: 1 addition & 1 deletion socorro/external/sqs/crashqueue.py
@@ -94,7 +94,7 @@ class SQSCrashQueue(CrashQueueBase):
* ``sqs:ReceiveMessage``
The Socorro processor has to receive messages from the queue in order to
process the related crash reports. This requires teh ``sqs:ReceiveMessage``
process the related crash reports. This requires the ``sqs:ReceiveMessage``
permission.
If something isn't configured correctly, then the Socorro processor will be unable
