From 62b6da7a88a385af36cf0479282db0b2db6f8296 Mon Sep 17 00:00:00 2001 From: Michal Kuffa Date: Thu, 21 Oct 2021 15:26:37 +0200 Subject: [PATCH 1/7] Register save_event_transaction queue --- src/sentry/conf/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 0f9a1e717c0bc1..f9e4c0b787fa78 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -617,6 +617,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME(): routing_key="events.reprocessing.symbolicate_event_low_priority", ), Queue("events.save_event", routing_key="events.save_event"), + Queue("events.save_event_transaction", routing_key="events.save_event_transaction"), Queue("events.symbolicate_event", routing_key="events.symbolicate_event"), Queue( "events.symbolicate_event_low_priority", routing_key="events.symbolicate_event_low_priority" From 1502771d1282e3e5e2bd70c22e2b7cf977e73bf3 Mon Sep 17 00:00:00 2001 From: Michal Kuffa Date: Thu, 21 Oct 2021 16:16:10 +0200 Subject: [PATCH 2/7] ref(processing): Add separate task for save_event of transaction events --- src/sentry/tasks/store.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/sentry/tasks/store.py b/src/sentry/tasks/store.py index 5e346df86f5777..3cfb9bd1aae74f 100644 --- a/src/sentry/tasks/store.py +++ b/src/sentry/tasks/store.py @@ -761,3 +761,20 @@ def save_event( **kwargs: Any, ) -> None: _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) + + +@instrumented_task( # type: ignore + name="sentry.tasks.store.save_event_transaction", + queue="events.save_event_transaction", + time_limit=65, + soft_time_limit=60, +) +def save_event_transaction( + cache_key: Optional[str] = None, + data: Optional[Event] = None, + start_time: Optional[int] = None, + event_id: Optional[str] = None, + project_id: Optional[int] = None, + **kwargs: Any, +) -> None: + _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs) From 5860cf4362043b4d6006b1a479fed033fa10d47e Mon Sep 17 00:00:00 2001 From: Michal Kuffa Date: Thu, 21 Oct 2021 16:22:17 +0200 Subject: [PATCH 3/7] ref(processing): Submit save_event for transaction directly from ingest consumer --- src/sentry/ingest/ingest_consumer.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/sentry/ingest/ingest_consumer.py b/src/sentry/ingest/ingest_consumer.py index 732755d3ba0dd4..d2310373d767ea 100644 --- a/src/sentry/ingest/ingest_consumer.py +++ b/src/sentry/ingest/ingest_consumer.py @@ -31,7 +31,7 @@ from sentry.killswitches import killswitch_matches_context from sentry.models import Project from sentry.signals import event_accepted -from sentry.tasks.store import preprocess_event +from sentry.tasks.store import preprocess_event, save_event_transaction from sentry.utils import json, metrics from sentry.utils.batching_kafka_consumer import AbstractBatchWorker from sentry.utils.cache import cache_key_for_event @@ -279,17 +279,28 @@ def dispatch_task(cache_key: str) -> None: cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT ) - # Preprocess this event, which spawns either process_event or - # save_event. Pass data explicitly to avoid fetching it again from the - # cache. - with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"): - preprocess_event( + if data.get("type") == "transaction": + # No need for preprocess/process for transactions thus submit + # directly transaction specific save_event task. + save_event_transaction.delay( cache_key=cache_key, - data=data, + data=None, start_time=start_time, event_id=event_id, - project=project, + project_id=project_id, ) + else: + # Preprocess this event, which spawns either process_event or + # save_event. Pass data explicitly to avoid fetching it again from the + # cache. + with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"): + preprocess_event( + cache_key=cache_key, + data=data, + start_time=start_time, + event_id=event_id, + project=project, + ) # remember for an 1 hour that we saved this event (deduplication protection) cache.set(deduplication_key, "", CACHE_TIMEOUT) From 14500bf32d852ed84743f654b8b1e1b524ce644f Mon Sep 17 00:00:00 2001 From: Michal Kuffa Date: Wed, 27 Oct 2021 16:08:04 +0200 Subject: [PATCH 4/7] Add option for controlled roll-out --- src/sentry/ingest/ingest_consumer.py | 5 +++-- src/sentry/options/defaults.py | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/sentry/ingest/ingest_consumer.py b/src/sentry/ingest/ingest_consumer.py index d2310373d767ea..1999c3a27574b2 100644 --- a/src/sentry/ingest/ingest_consumer.py +++ b/src/sentry/ingest/ingest_consumer.py @@ -22,7 +22,7 @@ from django.conf import settings from django.core.cache import cache -from sentry import eventstore, features +from sentry import eventstore, features, options from sentry.attachments import CachedAttachment, attachment_cache from sentry.event_manager import save_attachment from sentry.eventstore.processing import event_processing_store @@ -279,7 +279,8 @@ def dispatch_task(cache_key: str) -> None: cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT ) - if data.get("type") == "transaction": + save_event_transaction_rate = options.get("store.save-transactions-ingest-consumer-rate") + if data.get("type") == "transaction" and random.random() < save_event_transaction_rate: # No need for preprocess/process for transactions thus submit # directly transaction specific save_event task. save_event_transaction.delay( diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 636a4463b8ddb2..60bb4c7ed5aae9 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -376,3 +376,7 @@ register("symbolicate-event.low-priority.metrics.submission-rate", default=0.0) register("performance.suspect-spans-ingestion-projects", default={}) + +# Sampling rate for controlled rollout of a change where ignest-consumer spawns +# special save_event task for transactions avoiding the preprocess. +register("store.save-transactions-ingest-consumer-rate", default=0.0) From 66ffc76880658b2501543ecddbeda71b79a4ae7d Mon Sep 17 00:00:00 2001 From: Michal Kuffa Date: Wed, 27 Oct 2021 16:09:06 +0200 Subject: [PATCH 5/7] Add simple test for save_event_transaction dispatch --- .../test_ingest_consumer_processing.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py index cd1ee8e90d4d0d..cf60f21acb66e4 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py @@ -1,8 +1,11 @@ +import datetime import time import uuid +from unittest.mock import Mock import pytest +from sentry import options from sentry.event_manager import EventManager from sentry.ingest.ingest_consumer import ( process_attachment_chunk, @@ -20,6 +23,13 @@ def get_normalized_event(data, project): return dict(mgr.get_data()) +@pytest.fixture +def save_event_transaction(monkeypatch): + mock = Mock() + monkeypatch.setattr("sentry.ingest.ingest_consumer.save_event_transaction", mock) + return mock + + @pytest.fixture def preprocess_event(monkeypatch): calls = [] @@ -60,6 +70,79 @@ def test_deduplication_works(default_project, task_runner, preprocess_event): } +@pytest.mark.django_db +def test_transactions_spawn_save_event( + default_project, task_runner, preprocess_event, save_event_transaction +): + now = datetime.datetime.now() + event = { + "type": "transaction", + "timestamp": now.isoformat(), + "start_timestamp": now.isoformat(), + "spans": [], + "contexts": { + "trace": { + "parent_span_id": "8988cec7cc0779c1", + "type": "trace", + "op": "foobar", + "trace_id": "a7d67cf796774551a95be6543cacd459", + "span_id": "babaae0d4b7512d9", + "status": "ok", + } + }, + } + payload = get_normalized_event(event, default_project) + event_id = payload["event_id"] + project_id = default_project.id + start_time = time.time() - 3600 + process_event( + { + "payload": json.dumps(payload), + "start_time": start_time, + "event_id": event_id, + "project_id": project_id, + "remote_addr": "127.0.0.1", + }, + projects={default_project.id: default_project}, + ) + (kwargs,) = preprocess_event + assert kwargs == { + "cache_key": f"e:{event_id}:{project_id}", + "data": payload, + "event_id": event_id, + "project": default_project, + "start_time": start_time, + } + preprocess_event.clear() + + # TODO(michal): After we are fully on save_event_transaction remove the + # option and keep only this part of test + options.set("store.save-transactions-ingest-consumer-rate", 1.0) + payload = get_normalized_event(event, default_project) + event_id = payload["event_id"] + project_id = default_project.id + start_time = time.time() - 3600 + process_event( + { + "payload": json.dumps(payload), + "start_time": start_time, + "event_id": event_id, + "project_id": project_id, + "remote_addr": "127.0.0.1", + }, + projects={default_project.id: default_project}, + ) + assert not len(preprocess_event) + assert save_event_transaction.delay.call_args[0] == () + assert save_event_transaction.delay.call_args[1] == dict( + cache_key=f"e:{event_id}:{project_id}", + data=None, + start_time=start_time, + event_id=event_id, + project_id=2, + ) + + @pytest.mark.django_db @pytest.mark.parametrize("missing_chunks", (True, False)) def test_with_attachments(default_project, task_runner, missing_chunks, monkeypatch): From e379fc497e83f5e0053cb7ffce8eb346405ff5d5 Mon Sep 17 00:00:00 2001 From: Michal Kuffa Date: Wed, 27 Oct 2021 17:44:51 +0200 Subject: [PATCH 6/7] Set rate option via pytest fixture --- .../test_ingest_consumer_processing.py | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py index cf60f21acb66e4..e082cddf4659ac 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py @@ -41,6 +41,20 @@ def inner(**kwargs): return calls +# TODO(michal): Remove when fully on save_event_transaction +@pytest.fixture +def save_event_transaction_rate(): + orig_rate = options.get("store.save-transactions-ingest-consumer-rate") + + def set_rate(rate): + options.set("store.save-transactions-ingest-consumer-rate", rate) + + try: + yield set_rate + finally: + set_rate(orig_rate) + + @pytest.mark.django_db def test_deduplication_works(default_project, task_runner, preprocess_event): payload = get_normalized_event({"message": "hello world"}, default_project) @@ -72,7 +86,11 @@ def test_deduplication_works(default_project, task_runner, preprocess_event): @pytest.mark.django_db def test_transactions_spawn_save_event( - default_project, task_runner, preprocess_event, save_event_transaction + default_project, + task_runner, + preprocess_event, + save_event_transaction, + save_event_transaction_rate, ): now = datetime.datetime.now() event = { @@ -95,6 +113,9 @@ def test_transactions_spawn_save_event( event_id = payload["event_id"] project_id = default_project.id start_time = time.time() - 3600 + + # Use the old way through preprocess_event + save_event_transaction_rate(0.0) process_event( { "payload": json.dumps(payload), @@ -117,7 +138,7 @@ def test_transactions_spawn_save_event( # TODO(michal): After we are fully on save_event_transaction remove the # option and keep only this part of test - options.set("store.save-transactions-ingest-consumer-rate", 1.0) + save_event_transaction_rate(1.0) payload = get_normalized_event(event, default_project) event_id = payload["event_id"] project_id = default_project.id From 12999856bba650297365224959cb44acd6b24594 Mon Sep 17 00:00:00 2001 From: Michal Kuffa Date: Wed, 27 Oct 2021 20:51:19 +0200 Subject: [PATCH 7/7] Fix project_id assertion --- .../ingest_consumer/test_ingest_consumer_processing.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py index e082cddf4659ac..787363c059ea77 100644 --- a/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py +++ b/tests/sentry/ingest/ingest_consumer/test_ingest_consumer_processing.py @@ -92,6 +92,7 @@ def test_transactions_spawn_save_event( save_event_transaction, save_event_transaction_rate, ): + project_id = default_project.id now = datetime.datetime.now() event = { "type": "transaction", @@ -111,7 +112,6 @@ def test_transactions_spawn_save_event( } payload = get_normalized_event(event, default_project) event_id = payload["event_id"] - project_id = default_project.id start_time = time.time() - 3600 # Use the old way through preprocess_event @@ -141,7 +141,6 @@ def test_transactions_spawn_save_event( save_event_transaction_rate(1.0) payload = get_normalized_event(event, default_project) event_id = payload["event_id"] - project_id = default_project.id start_time = time.time() - 3600 process_event( { @@ -160,7 +159,7 @@ def test_transactions_spawn_save_event( data=None, start_time=start_time, event_id=event_id, - project_id=2, + project_id=project_id, )