diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 30750f3d0d7d..8d7b5f2b21f7 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -941,6 +941,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: "sentry.seer.entrypoints.operator", "sentry.seer.entrypoints.slack.messaging", "sentry.seer.entrypoints.slack.tasks", + "sentry.snuba.query_subscriptions.run", "sentry.snuba.tasks", "sentry.tasks.activity", "sentry.tasks.assemble", diff --git a/src/sentry/snuba/query_subscriptions/run.py b/src/sentry/snuba/query_subscriptions/run.py index a2e157a26fbd..a6389047597d 100644 --- a/src/sentry/snuba/query_subscriptions/run.py +++ b/src/sentry/snuba/query_subscriptions/run.py @@ -12,11 +12,21 @@ ) from arroyo.types import BrokerValue, Commit, Message, Partition from sentry_kafka_schemas import get_codec +from taskbroker_client.registry import TaskNamespace from sentry import options from sentry.conf.types.kafka_definition import Topic +from sentry.silo.base import SiloMode from sentry.snuba.dataset import Dataset from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic +from sentry.tasks.base import instrumented_task +from sentry.taskworker.namespaces import ( + snuba_eap_subscriptions_raw_tasks, + snuba_events_subscriptions_raw_tasks, + snuba_generic_metrics_subscriptions_raw_tasks, + snuba_metrics_subscriptions_raw_tasks, + snuba_transactions_subscriptions_raw_tasks, +) from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing from sentry.utils.kafka_config import get_topic_definition @@ -111,3 +121,68 @@ def process_message( "value": message_value, }, ) + + +def _process_subscription_message(message_bytes: bytes, dataset: Dataset) -> None: + """Process a subscription message from raw Kafka message bytes.""" + from sentry.snuba.query_subscriptions.consumer import handle_message + from sentry.utils import metrics + + logical_topic = dataset_to_logical_topic[dataset] + topic = get_topic_definition(Topic(logical_topic))["real_topic_name"] + + with ( + sentry_sdk.start_transaction( + op="handle_message", + name="query_subscription_consumer_process_message", + custom_sampling_context={"sample_rate": options.get("subscriptions-query.sample-rate")}, + ), + metrics.timer("snuba_query_subscriber.handle_message", tags={"dataset": dataset.value}), + ): + try: + handle_message( + message_bytes, + -1, # offset not available in raw mode + -1, # partition not available in raw mode + topic, + dataset.value, + get_codec(logical_topic), + ) + except Exception: + logger.exception( + "Unexpected error while handling message in QuerySubscriptionStrategy. Skipping message.", + extra={"value": message_bytes}, + ) + + +def _register_subscription_tasks() -> None: + tasks: dict[str, tuple[Dataset, TaskNamespace]] = { + "events": (Dataset.Events, snuba_events_subscriptions_raw_tasks), + "transactions": (Dataset.Transactions, snuba_transactions_subscriptions_raw_tasks), + "metrics": (Dataset.Metrics, snuba_metrics_subscriptions_raw_tasks), + "generic_metrics": ( + Dataset.PerformanceMetrics, + snuba_generic_metrics_subscriptions_raw_tasks, + ), + "eap": (Dataset.EventsAnalyticsPlatform, snuba_eap_subscriptions_raw_tasks), + } + + registered_datasets = {dataset for dataset, _ in tasks.values()} + expected_datasets = set(dataset_to_logical_topic.keys()) + assert registered_datasets == expected_datasets, ( + f"Missing tasks for datasets: {expected_datasets - registered_datasets}" + ) + + for name, (dataset, namespace) in tasks.items(): + + @instrumented_task( + name=f"sentry.snuba.query_subscriptions.run.process_{name}_subscription_from_kafka", + namespace=namespace, + processing_deadline_duration=60, + silo_mode=SiloMode.CELL, + ) + def task_fn(message_bytes: bytes, _d: Dataset = dataset) -> None: + _process_subscription_message(message_bytes, _d) + + +_register_subscription_tasks() diff --git a/src/sentry/taskworker/namespaces.py b/src/sentry/taskworker/namespaces.py index bfadb09ea488..ec1e668771bd 100644 --- a/src/sentry/taskworker/namespaces.py +++ b/src/sentry/taskworker/namespaces.py @@ -112,6 +112,31 @@ app_feature="errors", ) +snuba_events_subscriptions_raw_tasks = app.taskregistry.create_namespace( + "snuba.subscriptions.events.raw", + app_feature="errors", +) + +snuba_transactions_subscriptions_raw_tasks = app.taskregistry.create_namespace( + "snuba.subscriptions.transactions.raw", + app_feature="transactions", +) + +snuba_metrics_subscriptions_raw_tasks = app.taskregistry.create_namespace( + "snuba.subscriptions.metrics.raw", + app_feature="sessions", +) + +snuba_generic_metrics_subscriptions_raw_tasks = app.taskregistry.create_namespace( + "snuba.subscriptions.generic_metrics.raw", + app_feature="transactions", +) + +snuba_eap_subscriptions_raw_tasks = app.taskregistry.create_namespace( + "snuba.subscriptions.eap.raw", + app_feature="transactions", +) + issues_tasks = app.taskregistry.create_namespace( "issues", app_feature="issueplatform",