diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 0f9a1e717c0bc1..f9e4c0b787fa78 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -617,6 +617,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME():
         routing_key="events.reprocessing.symbolicate_event_low_priority",
     ),
     Queue("events.save_event", routing_key="events.save_event"),
+    Queue("events.save_event_transaction", routing_key="events.save_event_transaction"),
     Queue("events.symbolicate_event", routing_key="events.symbolicate_event"),
     Queue(
         "events.symbolicate_event_low_priority", routing_key="events.symbolicate_event_low_priority"
diff --git a/src/sentry/ingest/ingest_consumer.py b/src/sentry/ingest/ingest_consumer.py
index 732755d3ba0dd4..1999c3a27574b2 100644
--- a/src/sentry/ingest/ingest_consumer.py
+++ b/src/sentry/ingest/ingest_consumer.py
@@ -22,7 +22,7 @@
 from django.conf import settings
 from django.core.cache import cache
 
-from sentry import eventstore, features
+from sentry import eventstore, features, options
 from sentry.attachments import CachedAttachment, attachment_cache
 from sentry.event_manager import save_attachment
 from sentry.eventstore.processing import event_processing_store
@@ -31,7 +31,7 @@
 from sentry.killswitches import killswitch_matches_context
 from sentry.models import Project
 from sentry.signals import event_accepted
-from sentry.tasks.store import preprocess_event
+from sentry.tasks.store import preprocess_event, save_event_transaction
 from sentry.utils import json, metrics
 from sentry.utils.batching_kafka_consumer import AbstractBatchWorker
 from sentry.utils.cache import cache_key_for_event
@@ -279,17 +279,29 @@ def dispatch_task(cache_key: str) -> None:
             cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT
         )
 
-    # Preprocess this event, which spawns either process_event or
-    # save_event. Pass data explicitly to avoid fetching it again from the
-    # cache.
-    with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
-        preprocess_event(
+    save_event_transaction_rate = options.get("store.save-transactions-ingest-consumer-rate")
+    if data.get("type") == "transaction" and random.random() < save_event_transaction_rate:
+        # Transactions need no preprocess/process step, so submit the
+        # transaction-specific save_event task directly.
+        save_event_transaction.delay(
             cache_key=cache_key,
-            data=data,
+            data=None,
             start_time=start_time,
             event_id=event_id,
-            project=project,
+            project_id=project_id,
         )
+    else:
+        # Preprocess this event, which spawns either process_event or
+        # save_event. Pass data explicitly to avoid fetching it again from the
+        # cache.
+        with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
+            preprocess_event(
+                cache_key=cache_key,
+                data=data,
+                start_time=start_time,
+                event_id=event_id,
+                project=project,
+            )
 
     # remember for an 1 hour that we saved this event (deduplication protection)
     cache.set(deduplication_key, "", CACHE_TIMEOUT)
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 636a4463b8ddb2..60bb4c7ed5aae9 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -376,3 +376,7 @@
 register("symbolicate-event.low-priority.metrics.submission-rate", default=0.0)
 
 register("performance.suspect-spans-ingestion-projects", default={})
+
+# Sampling rate for controlled rollout of a change where the ingest consumer
+# spawns a dedicated save_event task for transactions, skipping preprocess.
+register("store.save-transactions-ingest-consumer-rate", default=0.0)
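A minimal sketch of the rollout gate added in `dispatch_task` above, assuming only what the diff shows; `use_save_event_transaction` is a hypothetical helper, not part of this change. With the default rate of 0.0 every transaction keeps taking the `preprocess_event` path; at 1.0 all transactions go straight to `save_event_transaction`.

```python
# Hypothetical helper restating the gate from dispatch_task(); not part
# of the diff. The option defaults to 0.0, so the new path stays off
# until the rate is raised.
import random

from sentry import options


def use_save_event_transaction(data: dict) -> bool:
    rate = options.get("store.save-transactions-ingest-consumer-rate")
    return data.get("type") == "transaction" and random.random() < rate
```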
+register("store.save-transactions-ingest-consumer-rate", default=0.0) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 5e346df86f5777..3cfb9bd1aae74f 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -761,3 +761,20 @@ def save_event( **kwargs: Any, ) -> None: _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) + + +@instrumented_task( # type: ignore + name="sentry.tasks.store.save_event_transaction", + queue="events.save_event_transaction", + time_limit=65, + soft_time_limit=60, +) +def save_event_transaction( + cache_key: Optional[str] = None, + data: Optional[Event] = None, + start_time: Optional[int] = None, + event_id: Optional[str] = None, + project_id: Optional[int] = None, + **kwargs: Any, +) -> None: + _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py index cd1ee8e90d4d0d..787363c059ea77 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py @@ -1,8 +1,11 @@ +import datetime import time import uuid +from unittest.mock import Mock import pytest +from sentry import options from sentry.event_manager import EventManager from sentry.ingest.ingest_consumer import ( process_attachment_chunk, @@ -20,6 +23,13 @@ def get_normalized_event(data, project): return dict(mgr.get_data()) +@pytest.fixture +def save_event_transaction(monkeypatch): + mock = Mock() + monkeypatch.setattr("sentry.ingest.ingest_consumer.save_event_transaction", mock) + return mock + + @pytest.fixture def preprocess_event(monkeypatch): calls = [] @@ -31,6 +41,20 @@ def inner(**kwargs): return calls +# TODO(michal): Remove when fully on save_event_transaction +@pytest.fixture +def save_event_transaction_rate(): + orig_rate = options.get("store.save-transactions-ingest-consumer-rate") + + def set_rate(rate): + options.set("store.save-transactions-ingest-consumer-rate", rate) + + try: + yield set_rate + finally: + set_rate(orig_rate) + + @pytest.mark.django_db def test_deduplication_works(default_project, task_runner, preprocess_event): payload = get_normalized_event({"message": "hello world"}, default_project) @@ -60,6 +84,85 @@ def test_deduplication_works(default_project, task_runner, preprocess_event): } +@pytest.mark.django_db +def test_transactions_spawn_save_event( + default_project, + task_runner, + preprocess_event, + save_event_transaction, + save_event_transaction_rate, +): + project_id = default_project.id + now = datetime.datetime.now() + event = { + "type": "transaction", + "timestamp": now.isoformat(), + "start_timestamp": now.isoformat(), + "spans": [], + "contexts": { + "trace": { + "parent_span_id": "8988cec7cc0779c1", + "type": "trace", + "op": "foobar", + "trace_id": "a7d67cf796774551a95be6543cacd459", + "span_id": "babaae0d4b7512d9", + "status": "ok", + } + }, + } + payload = get_normalized_event(event, default_project) + event_id = payload["event_id"] + start_time = time.time() - 3600 + + # Use the old way through preprocess_event + save_event_transaction_rate(0.0) + process_event( + { + "payload": json.dumps(payload), + "start_time": start_time, + "event_id": event_id, + "project_id": project_id, + "remote_addr": "127.0.0.1", + }, + projects={default_project.id: default_project}, + ) + (kwargs,) = preprocess_event + assert kwargs == { + 
"cache_key": f"e:{event_id}:{project_id}", + "data": payload, + "event_id": event_id, + "project": default_project, + "start_time": start_time, + } + preprocess_event.clear() + + # TODO(michal): After we are fully on save_event_transaction remove the + # option and keep only this part of test + save_event_transaction_rate(1.0) + payload = get_normalized_event(event, default_project) + event_id = payload["event_id"] + start_time = time.time() - 3600 + process_event( + { + "payload": json.dumps(payload), + "start_time": start_time, + "event_id": event_id, + "project_id": project_id, + "remote_addr": "127.0.0.1", + }, + projects={default_project.id: default_project}, + ) + assert not len(preprocess_event) + assert save_event_transaction.delay.call_args[0] == () + assert save_event_transaction.delay.call_args[1] == dict( + cache_key=f"e:{event_id}:{project_id}", + data=None, + start_time=start_time, + event_id=event_id, + project_id=project_id, + ) + + @pytest.mark.django_db @pytest.mark.parametrize("missing_chunks", (True, False)) def test_with_attachments(default_project, task_runner, missing_chunks, monkeypatch):