address review
relud committed Mar 21, 2024
1 parent 213462c commit 14a2590
Showing 11 changed files with 170 additions and 100 deletions.
49 changes: 47 additions & 2 deletions bin/pubsub_cli.py
@@ -14,6 +14,8 @@
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists, NotFound

from socorro import settings


@click.group()
def pubsub_group():
@@ -68,6 +70,7 @@ def create_topic(ctx, project_id, topic_name):
@click.argument("subscription_name")
@click.pass_context
def create_subscription(ctx, project_id, topic_name, subscription_name):
"""Create subscription."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

@@ -85,7 +88,7 @@ def create_subscription(ctx, project_id, topic_name, subscription_name):
@click.argument("topic_name")
@click.pass_context
def delete_topic(ctx, project_id, topic_name):
"""Delete a topic."""
"""Delete a topic and all subscriptions."""
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_name)
@@ -131,7 +134,9 @@ def pull(ctx, project_id, subscription_name, ack, max_messages):
subscription_path = subscriber.subscription_path(project_id, subscription_name)

response = subscriber.pull(
subscription=subscription_path, max_messages=max_messages
subscription=subscription_path,
max_messages=max_messages,
return_immediately=True,
)
if not response.received_messages:
return
@@ -146,5 +151,45 @@ def pull(ctx, project_id, subscription_name, ack, max_messages):
subscriber.acknowledge(subscription=subscription_path, ack_ids=ack_ids)


@pubsub_group.command("create-all")
@click.pass_context
def create_all(ctx):
"""Create SQS queues related to processing."""
options = settings.QUEUE_PUBSUB["options"]
project_id = options["project_id"]
queues = {
options["standard_topic_name"]: options["standard_subscription_name"],
options["priority_topic_name"]: options["priority_subscription_name"],
options["reprocessing_topic_name"]: options["reprocessing_subscription_name"],
}
for topic_name, subscription_name in queues.items():
ctx.invoke(create_topic, project_id=project_id, topic_name=topic_name)
ctx.invoke(
create_subscription,
project_id=project_id,
topic_name=topic_name,
subscription_name=subscription_name,
)


@pubsub_group.command("delete-all")
@click.pass_context
def delete_all(ctx):
"""Delete SQS queues related to processing."""
options = settings.QUEUE_PUBSUB["options"]
project_id = options["project_id"]
for topic_name in (
options["standard_topic_name"],
options["priority_topic_name"],
options["reprocessing_topic_name"],
):
ctx.invoke(delete_topic, project_id=project_id, topic_name=topic_name)


def main(argv=None):
argv = argv or []
pubsub_group(argv)


if __name__ == "__main__":
pubsub_group()
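
Note: the pull() change above opts into return_immediately=True so the CLI returns promptly when the subscription is empty. A minimal standalone sketch of that pattern, assuming the Pub/Sub emulator (or a reachable project) and the placeholder names from docker/config/test.env:

    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    # "test" / "test-standard" are placeholder names from docker/config/test.env.
    subscription_path = subscriber.subscription_path("test", "test-standard")

    # return_immediately=True returns an empty response instead of blocking
    # when no messages are waiting -- useful for CLIs and tests, at the cost
    # of the DeprecationWarning handled in pytest.ini below.
    response = subscriber.pull(
        subscription=subscription_path,
        max_messages=5,
        return_immediately=True,
    )
    for received in response.received_messages:
        print(received.message.data)

    # Acknowledge what we pulled so it is not redelivered.
    if response.received_messages:
        subscriber.acknowledge(
            subscription=subscription_path,
            ack_ids=[m.ack_id for m in response.received_messages],
        )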
4 changes: 4 additions & 0 deletions bin/setup_services.sh
@@ -28,5 +28,9 @@ set -euo pipefail
/app/socorro-cmd sqs delete-all
/app/socorro-cmd sqs create-all

# Delete and create Pub/Sub topics and subscriptions
/app/socorro-cmd pubsub delete-all
/app/socorro-cmd pubsub create-all

# Initialize the cronrun bookkeeping for all configured jobs to success
/app/webapp/manage.py cronmarksuccess all
10 changes: 5 additions & 5 deletions bin/sqs_cli.py
@@ -23,8 +23,8 @@


def get_client():
cls = import_class(settings.QUEUE["class"])
return cls.build_client(**settings.QUEUE["options"])
cls = import_class(settings.QUEUE_SQS["class"])
return cls.build_client(**settings.QUEUE_SQS["options"])


@click.group()
@@ -141,7 +141,7 @@ def create(ctx, queue):

conn = get_client()

cls = import_class(settings.QUEUE["class"])
cls = import_class(settings.QUEUE_SQS["class"])
cls.validate_queue_name(queue)
try:
conn.get_queue_url(QueueName=queue)
@@ -157,7 +157,7 @@ def create(ctx, queue):
@click.pass_context
def create_all(ctx):
"""Create SQS queues related to processing."""
options = settings.QUEUE["options"]
options = settings.QUEUE_SQS["options"]
for queue in (
options["standard_queue"],
options["priority_queue"],
@@ -192,7 +192,7 @@ def delete(ctx, queue):
@click.pass_context
def delete_all(ctx):
"""Delete SQS queues related to processing."""
options = settings.QUEUE["options"]
options = settings.QUEUE_SQS["options"]
for queue in (
options["standard_queue"],
options["priority_queue"],
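
get_client() above resolves the class named in settings at call time. import_class is Socorro's own helper; a minimal equivalent, shown here only for illustration, could be:

    import importlib

    def import_class(path):
        """Resolve a dotted path like "pkg.module.ClassName" to the class."""
        module_path, class_name = path.rsplit(".", 1)
        return getattr(importlib.import_module(module_path), class_name)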
3 changes: 3 additions & 0 deletions bin/test.sh
@@ -38,6 +38,9 @@ echo ">>> build sqs things and db things"
# Clear SQS for tests
./socorro-cmd sqs delete-all

# Clear Pub/Sub for tests
./socorro-cmd pubsub delete-all

# Set up socorro_test db
./socorro-cmd db drop || true
./socorro-cmd db create
9 changes: 9 additions & 0 deletions docker/config/test.env
@@ -15,6 +15,15 @@ SQS_STANDARD_QUEUE=test-standard
SQS_PRIORITY_QUEUE=test-priority
SQS_REPROCESSING_QUEUE=test-reprocessing

# Pub/Sub
PUBSUB_PROJECT_ID=test
PUBSUB_STANDARD_TOPIC_NAME=test-standard
PUBSUB_PRIORITY_TOPIC_NAME=test-priority
PUBSUB_REPROCESSING_TOPIC_NAME=test-reprocessing
PUBSUB_STANDARD_SUBSCRIPTION_NAME=test-standard
PUBSUB_PRIORITY_SUBSCRIPTION_NAME=test-priority
PUBSUB_REPROCESSING_SUBSCRIPTION_NAME=test-reprocessing

# Elasticsearch configuration
ELASTICSEARCH_INDEX=testsocorro%Y%W
ELASTICSEARCH_INDEX_REGEX='^testsocorro[0-9]{6}$'
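
These variables feed the _config(...) calls in socorro/mozilla_settings.py later in this diff. A simplified stand-in for _config that only reads the environment (the real helper also supports parsers and doc strings, as the settings file shows):

    import os

    def _config(key, default=None, doc=""):
        # Simplified stand-in: read the environment variable, else the default.
        return os.environ.get(key, default)

    project_id = _config(
        "PUBSUB_PROJECT_ID",
        default="test",
        doc="Google Cloud Platform project_id.",
    )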
4 changes: 4 additions & 0 deletions pytest.ini
@@ -5,6 +5,7 @@
addopts = -rsxX --tb=native -p no:django
norecursedirs = .git docs config docker __pycache__
testpaths = socorro/

# Transform all warnings into errors
filterwarnings =
error
@@ -17,3 +18,6 @@ filterwarnings =
ignore:'urllib3.contrib.pyopenssl':DeprecationWarning
# django 3.2 imports cgi module which is deprecated
ignore:'cgi' is deprecated and slated for removal in Python 3.13:DeprecationWarning:django
# pubsub deprecated the return_immediately flag because it negatively impacts performance, but
# that performance cost is fine for our use case, especially in tests.
ignore:The return_immediately flag is deprecated and should be set to False.:DeprecationWarning:google.pubsub_v1
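
The new ignore entry uses pytest's action:message:category:module filter syntax. It is roughly equivalent to installing this filter at runtime, where message is a regex matched against the start of the warning text:

    import warnings

    warnings.filterwarnings(
        "ignore",
        message="The return_immediately flag is deprecated and should be set to False.",
        category=DeprecationWarning,
        module="google.pubsub_v1",
    )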
1 change: 1 addition & 0 deletions socorro-cmd
@@ -103,6 +103,7 @@ COMMANDS = [
"db": import_path("socorro.scripts.db.db_group"),
"es": import_path("es_cli.main"),
"sqs": import_path("sqs_cli.main"),
"pubsub": import_path("pubsub_cli.main"),
},
),
Group(
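
import_path defers importing the CLI module until the subcommand actually runs, so socorro-cmd pubsub ... only loads pubsub_cli. A hypothetical sketch of that shape (the real helper lives in the socorro-cmd script itself):

    import importlib

    def import_path(path):
        # Hypothetical sketch: resolve "pubsub_cli.main" lazily, on first use.
        def run(argv):
            module_path, func_name = path.rsplit(".", 1)
            func = getattr(importlib.import_module(module_path), func_name)
            return func(argv)
        return run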
8 changes: 5 additions & 3 deletions socorro/external/pubsub/crashqueue.py
@@ -45,8 +45,8 @@ class PubSubCrashQueue(CrashQueueBase):
* ``roles/pubsub.publisher``
Socorro sends messages to topics--this is how the webapp publishes crash ids
to the priority and reprocessing queues. This requires permissions from the
Socorro webapp sends messages to topics--this is how the webapp publishes crash
ids to the priority and reprocessing queues. This requires permissions from the
``roles/pubsub.publisher`` role.
* ``roles/pubsub.subscriber``
@@ -174,7 +174,7 @@ def ack_crash(self, subscription_path, ack_id):
logger.debug("ack %s from %s", ack_id, subscription_path)

def __iter__(self):
"""Return iterator over crash ids from AWS SQS.
"""Return iterator over crash ids from Pub/Sub.
Each returned crash is a ``(crash_id, {kwargs})`` tuple with
``finished_func`` as the only key in ``kwargs``. The caller should call
@@ -187,6 +187,7 @@ def __iter__(self):
resp = self.subscriber.pull(
subscription=subscription_path,
max_messages=self.pull_max_messages,
return_immediately=True,
)
msgs = resp.received_messages

@@ -243,6 +244,7 @@ def publish(self, queue, crash_ids):
try:
future.result()
except Exception:
logger.exception(f"Crashid failed to publish: {batch[i]}")
failed.append(batch[i])

if failed:
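
Per the __iter__ contract above, each item is a (crash_id, {"finished_func": callable}) tuple, and the caller acks by invoking finished_func. A minimal consumer sketch (queue and process are caller-supplied):

    def consume(queue, process):
        """Drain crash ids from a crash queue such as PubSubCrashQueue.

        queue yields (crash_id, {"finished_func": callable}) tuples; calling
        finished_func acknowledges the message so it is not redelivered.
        """
        for crash_id, kwargs in queue:
            process(crash_id)          # caller-supplied processing step
            kwargs["finished_func"]()  # ack only after successful processing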
138 changes: 74 additions & 64 deletions socorro/mozilla_settings.py
@@ -139,9 +139,80 @@ def _or_none(val):
),
)

# Crash report processing queue configuration if CLOUD_PROVIDER == AWS
QUEUE_SQS = {
"class": "socorro.external.sqs.crashqueue.SQSCrashQueue",
"options": {
"standard_queue": _config(
"SQS_STANDARD_QUEUE",
default="standard-queue",
doc="Name for the standard processing queue.",
),
"priority_queue": _config(
"SQS_PRIORITY_QUEUE",
default="priority-queue",
doc="Name for the priority processing queue.",
),
"reprocessing_queue": _config(
"SQS_REPROCESSING_QUEUE",
default="reprocessing-queue",
doc="Name for the reprocessing queue.",
),
"access_key": _config("SQS_ACCESS_KEY", default="", doc="SQS access key."),
"secret_access_key": _config(
"SQS_SECRET_ACCESS_KEY",
default="",
doc="SQS secret access key.",
),
"region": _config("SQS_REGION", default="", doc="SQS region."),
"endpoint_url": LOCAL_DEV_AWS_ENDPOINT_URL,
},
}
# Crash report processing queue configuration if CLOUD_PROVIDER == GCP
QUEUE_PUBSUB = {
"class": "socorro.external.pubsub.crashqueue.PubSubCrashQueue",
"options": {
"project_id": _config(
"PUBSUB_PROJECT_ID",
default="test",
doc="Google Compute Platform project_id.",
),
"standard_topic_name": _config(
"PUBSUB_STANDARD_TOPIC_NAME",
default="standard-queue",
doc="Topic name for the standard processing queue.",
),
"standard_subscription_name": _config(
"PUBSUB_STANDARD_SUBSCRIPTION_NAME",
default="standard-queue",
doc="Subscription name for the standard processing queue.",
),
"priority_topic_name": _config(
"PUBSUB_PRIORITY_TOPIC_NAME",
default="priority-queue",
doc="Topic name for the priority processing queue.",
),
"priority_subscription_name": _config(
"PUBSUB_PRIORITY_SUBSCRIPTION_NAME",
default="priority-queue",
doc="Subscription name for the priority processing queue.",
),
"reprocessing_topic_name": _config(
"PUBSUB_REPROCESSING_TOPIC_NAME",
default="reprocessing-queue",
doc="Topic name for the reprocessing queue.",
),
"reprocessing_subscription_name": _config(
"PUBSUB_REPROCESSING_SUBSCRIPTION_NAME",
default="reprocessing-queue",
doc="Subscription name for the reprocessing queue.",
),
},
}


def cloud_provider_parser(val):
"""If the value is an empty string, then return None"""
"""Return 'AWS' or 'GCP'."""
normalized = val.strip().upper()
if normalized in ("AWS", "GCP"):
return normalized
@@ -156,70 +227,9 @@ def cloud_provider_parser(val):
doc="The cloud provider to use for queueing and blob storage. Must be AWS or GCP.",
)
if CLOUD_PROVIDER == "AWS":
# Crash report processing queue configuration
QUEUE = {
"class": "socorro.external.sqs.crashqueue.SQSCrashQueue",
"options": {
"standard_queue": _config(
"SQS_STANDARD_QUEUE",
default="standard-queue",
doc="Name for the standard processing queue.",
),
"priority_queue": _config(
"SQS_PRIORITY_QUEUE",
default="priority-queue",
doc="Name for the priority processing queue.",
),
"reprocessing_queue": _config(
"SQS_REPROCESSING_QUEUE",
default="reprocessing-queue",
doc="Name for the reprocessing queue.",
),
"access_key": _config("SQS_ACCESS_KEY", default="", doc="SQS access key."),
"secret_access_key": _config(
"SQS_SECRET_ACCESS_KEY",
default="",
doc="SQS secret access key.",
),
"region": _config("SQS_REGION", default="", doc="SQS region."),
"endpoint_url": LOCAL_DEV_AWS_ENDPOINT_URL,
},
}
QUEUE = QUEUE_SQS
elif CLOUD_PROVIDER == "GCP":
# Crash report processing queue configuration
QUEUE = {
"class": "socorro.external.pubsub.crashqueue.PubSubCrashQueue",
"options": {
"project_id": _config(
"PUBSUB_PROJECT_ID",
doc="Google Compute Platform project_id.",
),
"standard_topic_name": _config(
"PUBSUB_STANDARD_TOPIC_NAME",
doc="Topic name for the standard processing queue.",
),
"standard_subscription_name": _config(
"PUBSUB_STANDARD_SUBSCRIPTION_NAME",
doc="Subscription name for the standard processing queue.",
),
"priority_topic_name": _config(
"PUBSUB_PRIORITY_TOPIC_NAME",
doc="Topic name for the priority processing queue.",
),
"priority_subscription_name": _config(
"PUBSUB_PRIORITY_SUBSCRIPTION_NAME",
doc="Subscription name for the priority processing queue.",
),
"reprocessing_topic_name": _config(
"PUBSUB_REPROCESSING_TOPIC_NAME",
doc="Topic name for the reprocessing queue.",
),
"reprocessing_subscription_name": _config(
"PUBSUB_REPROCESSING_SUBSCRIPTION_NAME",
doc="Subscription name for the reprocessing queue.",
),
},
}
QUEUE = QUEUE_PUBSUB

S3_STORAGE = {
"class": "socorro.external.boto.crashstorage.BotoS3CrashStorage",
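
With both provider blocks hoisted to module level, QUEUE is now just an alias for whichever block CLOUD_PROVIDER selects, so downstream code can stay provider-agnostic. Mirroring the get_client() pattern from bin/sqs_cli.py (the import location of import_class is assumed here):

    from socorro import settings
    from socorro.libclass import import_class  # assumed import location

    def get_crashqueue_class():
        # settings.QUEUE aliases QUEUE_SQS or QUEUE_PUBSUB depending on
        # CLOUD_PROVIDER, so callers need not care which provider is active.
        return import_class(settings.QUEUE["class"])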
