Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.seer.entrypoints.operator",
"sentry.seer.entrypoints.slack.messaging",
"sentry.seer.entrypoints.slack.tasks",
"sentry.snuba.query_subscriptions.run",
"sentry.snuba.tasks",
"sentry.tasks.activity",
"sentry.tasks.assemble",
Expand Down
75 changes: 75 additions & 0 deletions src/sentry/snuba/query_subscriptions/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@
)
from arroyo.types import BrokerValue, Commit, Message, Partition
from sentry_kafka_schemas import get_codec
from taskbroker_client.registry import TaskNamespace

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.silo.base import SiloMode
from sentry.snuba.dataset import Dataset
from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import (
snuba_eap_subscriptions_raw_tasks,
snuba_events_subscriptions_raw_tasks,
snuba_generic_metrics_subscriptions_raw_tasks,
snuba_metrics_subscriptions_raw_tasks,
snuba_transactions_subscriptions_raw_tasks,
)
from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing
from sentry.utils.kafka_config import get_topic_definition

Expand Down Expand Up @@ -111,3 +121,68 @@ def process_message(
"value": message_value,
},
)


def _process_subscription_message(message_bytes: bytes, dataset: Dataset) -> None:
"""Process a subscription message from raw Kafka message bytes."""
from sentry.snuba.query_subscriptions.consumer import handle_message
from sentry.utils import metrics
Comment on lines +128 to +129
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Circular import?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i misinterpreted this. you're asking why the import is in the fn, not claiming there is a circular import. the honest answer is that this is vibecoded. i'll move it out in a followup.


logical_topic = dataset_to_logical_topic[dataset]
topic = get_topic_definition(Topic(logical_topic))["real_topic_name"]

with (
sentry_sdk.start_transaction(
op="handle_message",
name="query_subscription_consumer_process_message",
custom_sampling_context={"sample_rate": options.get("subscriptions-query.sample-rate")},
),
metrics.timer("snuba_query_subscriber.handle_message", tags={"dataset": dataset.value}),
):
try:
handle_message(
message_bytes,
-1, # offset not available in raw mode
-1, # partition not available in raw mode
topic,
dataset.value,
get_codec(logical_topic),
)
except Exception:
logger.exception(
"Unexpected error while handling message in QuerySubscriptionStrategy. Skipping message.",
extra={"value": message_bytes},
)


def _register_subscription_tasks() -> None:
tasks: dict[str, tuple[Dataset, TaskNamespace]] = {
"events": (Dataset.Events, snuba_events_subscriptions_raw_tasks),
"transactions": (Dataset.Transactions, snuba_transactions_subscriptions_raw_tasks),
"metrics": (Dataset.Metrics, snuba_metrics_subscriptions_raw_tasks),
"generic_metrics": (
Dataset.PerformanceMetrics,
snuba_generic_metrics_subscriptions_raw_tasks,
),
"eap": (Dataset.EventsAnalyticsPlatform, snuba_eap_subscriptions_raw_tasks),
}

registered_datasets = {dataset for dataset, _ in tasks.values()}
expected_datasets = set(dataset_to_logical_topic.keys())
assert registered_datasets == expected_datasets, (
f"Missing tasks for datasets: {expected_datasets - registered_datasets}"
)

for name, (dataset, namespace) in tasks.items():

@instrumented_task(
name=f"sentry.snuba.query_subscriptions.run.process_{name}_subscription_from_kafka",
namespace=namespace,
processing_deadline_duration=60,
silo_mode=SiloMode.CELL,
)
def task_fn(message_bytes: bytes, _d: Dataset = dataset) -> None:
_process_subscription_message(message_bytes, _d)


_register_subscription_tasks()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why dynamically construct them when you're immediately calling the function? Can these tasks be statically defined?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this dynamic code mainly enforces that PRs adding new datasets also create the respective task for it.

25 changes: 25 additions & 0 deletions src/sentry/taskworker/namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,31 @@
app_feature="errors",
)

snuba_events_subscriptions_raw_tasks = app.taskregistry.create_namespace(
"snuba.subscriptions.events.raw",
app_feature="errors",
)

snuba_transactions_subscriptions_raw_tasks = app.taskregistry.create_namespace(
"snuba.subscriptions.transactions.raw",
app_feature="transactions",
)

snuba_metrics_subscriptions_raw_tasks = app.taskregistry.create_namespace(
"snuba.subscriptions.metrics.raw",
app_feature="sessions",
)

snuba_generic_metrics_subscriptions_raw_tasks = app.taskregistry.create_namespace(
"snuba.subscriptions.generic_metrics.raw",
app_feature="transactions",
)

snuba_eap_subscriptions_raw_tasks = app.taskregistry.create_namespace(
"snuba.subscriptions.eap.raw",
app_feature="transactions",
)

issues_tasks = app.taskregistry.create_namespace(
"issues",
app_feature="issueplatform",
Expand Down
Loading