1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -617,6 +617,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME():
         routing_key="events.reprocessing.symbolicate_event_low_priority",
     ),
     Queue("events.save_event", routing_key="events.save_event"),
+    Queue("events.save_event_transaction", routing_key="events.save_event_transaction"),
     Queue("events.symbolicate_event", routing_key="events.symbolicate_event"),
     Queue(
         "events.symbolicate_event_low_priority", routing_key="events.symbolicate_event_low_priority"
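The new queue mirrors the existing `events.save_event` entry: the queue name and routing key match, so a dedicated pool of workers can consume transaction saves independently of error-event saves. A minimal sketch of the pairing, assuming kombu (which this settings module already uses for `Queue`); the list here is illustrative, not the full queue list from server.py:

from kombu import Queue

# Each queue declares a routing_key equal to its name, so a task routed
# to "events.save_event_transaction" lands only on the new queue.
queues = [
    Queue("events.save_event", routing_key="events.save_event"),
    Queue("events.save_event_transaction", routing_key="events.save_event_transaction"),
]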
30 changes: 21 additions & 9 deletions src/sentry/ingest/ingest_consumer.py
@@ -22,7 +22,7 @@
 from django.conf import settings
 from django.core.cache import cache

-from sentry import eventstore, features
+from sentry import eventstore, features, options
 from sentry.attachments import CachedAttachment, attachment_cache
 from sentry.event_manager import save_attachment
 from sentry.eventstore.processing import event_processing_store
@@ -31,7 +31,7 @@
 from sentry.killswitches import killswitch_matches_context
 from sentry.models import Project
 from sentry.signals import event_accepted
-from sentry.tasks.store import preprocess_event
+from sentry.tasks.store import preprocess_event, save_event_transaction
 from sentry.utils import json, metrics
 from sentry.utils.batching_kafka_consumer import AbstractBatchWorker
 from sentry.utils.cache import cache_key_for_event
@@ -279,17 +279,29 @@ def dispatch_task(cache_key: str) -> None:
             cache_key, attachments=attachment_objects, timeout=CACHE_TIMEOUT
         )

-        # Preprocess this event, which spawns either process_event or
-        # save_event. Pass data explicitly to avoid fetching it again from the
-        # cache.
-        with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
-            preprocess_event(
+        save_event_transaction_rate = options.get("store.save-transactions-ingest-consumer-rate")
+        if data.get("type") == "transaction" and random.random() < save_event_transaction_rate:
+            # Transactions don't need the preprocess/process steps, so submit
+            # them directly to the transaction-specific save_event task.
+            save_event_transaction.delay(
                 cache_key=cache_key,
-                data=data,
+                data=None,
                 start_time=start_time,
                 event_id=event_id,
-                project=project,
+                project_id=project_id,
             )
+        else:
+            # Preprocess this event, which spawns either process_event or
+            # save_event. Pass data explicitly to avoid fetching it again from the
+            # cache.
+            with sentry_sdk.start_span(op="ingest_consumer.process_event.preprocess_event"):
+                preprocess_event(
+                    cache_key=cache_key,
+                    data=data,
+                    start_time=start_time,
+                    event_id=event_id,
+                    project=project,
+                )

         # remember for 1 hour that we saved this event (deduplication protection)
         cache.set(deduplication_key, "", CACHE_TIMEOUT)
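The dispatch change above is a sampled rollout: each incoming transaction is coin-flipped against an option-controlled rate, so traffic can shift gradually from 0.0 (old path only) to 1.0 (new path only) while errors and unsampled transactions keep flowing through `preprocess_event`. A self-contained sketch of the gating logic, with illustrative names:

import random

def takes_new_save_path(event_type: str, rollout_rate: float) -> bool:
    # Only transactions are eligible; each one is sampled independently,
    # so a rate of 0.25 routes roughly a quarter of transactions to the
    # save_event_transaction task and the rest through the old path.
    return event_type == "transaction" and random.random() < rollout_rate

Note that the new branch enqueues with `data=None`: the payload already sits in the event processing store under `cache_key`, so there is no need to serialize it into the Celery message as well.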
4 changes: 4 additions & 0 deletions src/sentry/options/defaults.py
@@ -376,3 +376,7 @@
 register("symbolicate-event.low-priority.metrics.submission-rate", default=0.0)

 register("performance.suspect-spans-ingestion-projects", default={})
+
+# Sampling rate for controlled rollout of a change where the ingest consumer
+# spawns a dedicated save_event task for transactions, skipping preprocess.
+register("store.save-transactions-ingest-consumer-rate", default=0.0)
17 changes: 17 additions & 0 deletions src/sentry/tasks/store.py
@@ -761,3 +761,20 @@ def save_event(
     **kwargs: Any,
 ) -> None:
     _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs)
+
+
+@instrumented_task(  # type: ignore
+    name="sentry.tasks.store.save_event_transaction",
+    queue="events.save_event_transaction",
+    time_limit=65,
+    soft_time_limit=60,
+)
+def save_event_transaction(
+    cache_key: Optional[str] = None,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project_id: Optional[int] = None,
+    **kwargs: Any,
+) -> None:
+    _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs)
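The task body is identical to `save_event`; only the task name and queue differ, which is what lets transaction saves be routed, scaled, and time-limited separately. A hypothetical enqueue mirroring the ingest consumer's call, with placeholder values (the cache-key format is taken from the test assertions below):

import time

# Placeholder values; data=None means _do_save_event is expected to
# refetch the payload from the event processing store by cache_key.
save_event_transaction.delay(
    cache_key="e:<event_id>:<project_id>",
    data=None,
    start_time=time.time(),
    event_id="<event_id>",
    project_id=1,
)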
@@ -1,8 +1,11 @@
+import datetime
 import time
 import uuid
+from unittest.mock import Mock

 import pytest

+from sentry import options
 from sentry.event_manager import EventManager
 from sentry.ingest.ingest_consumer import (
     process_attachment_chunk,
@@ -20,6 +23,13 @@ def get_normalized_event(data, project):
     return dict(mgr.get_data())


+@pytest.fixture
+def save_event_transaction(monkeypatch):
+    mock = Mock()
+    monkeypatch.setattr("sentry.ingest.ingest_consumer.save_event_transaction", mock)
+    return mock
+
+
 @pytest.fixture
 def preprocess_event(monkeypatch):
     calls = []
@@ -31,6 +41,20 @@ def inner(**kwargs):
     return calls


+# TODO(michal): Remove when fully on save_event_transaction
+@pytest.fixture
+def save_event_transaction_rate():
+    orig_rate = options.get("store.save-transactions-ingest-consumer-rate")
+
+    def set_rate(rate):
+        options.set("store.save-transactions-ingest-consumer-rate", rate)
+
+    try:
+        yield set_rate
+    finally:
+        set_rate(orig_rate)
+
+
 @pytest.mark.django_db
 def test_deduplication_works(default_project, task_runner, preprocess_event):
     payload = get_normalized_event({"message": "hello world"}, default_project)
@@ -60,6 +84,85 @@ def test_deduplication_works(default_project, task_runner, preprocess_event):
     }


+@pytest.mark.django_db
+def test_transactions_spawn_save_event(
+    default_project,
+    task_runner,
+    preprocess_event,
+    save_event_transaction,
+    save_event_transaction_rate,
+):
+    project_id = default_project.id
+    now = datetime.datetime.now()
+    event = {
+        "type": "transaction",
+        "timestamp": now.isoformat(),
+        "start_timestamp": now.isoformat(),
+        "spans": [],
+        "contexts": {
+            "trace": {
+                "parent_span_id": "8988cec7cc0779c1",
+                "type": "trace",
+                "op": "foobar",
+                "trace_id": "a7d67cf796774551a95be6543cacd459",
+                "span_id": "babaae0d4b7512d9",
+                "status": "ok",
+            }
+        },
+    }
+    payload = get_normalized_event(event, default_project)
+    event_id = payload["event_id"]
+    start_time = time.time() - 3600
+
+    # Use the old way through preprocess_event
+    save_event_transaction_rate(0.0)
+    process_event(
+        {
+            "payload": json.dumps(payload),
+            "start_time": start_time,
+            "event_id": event_id,
+            "project_id": project_id,
+            "remote_addr": "127.0.0.1",
+        },
+        projects={default_project.id: default_project},
+    )
+    (kwargs,) = preprocess_event
+    assert kwargs == {
+        "cache_key": f"e:{event_id}:{project_id}",
+        "data": payload,
+        "event_id": event_id,
+        "project": default_project,
+        "start_time": start_time,
+    }
+    preprocess_event.clear()
+
+    # TODO(michal): After we are fully on save_event_transaction, remove the
+    # option and keep only this part of the test.
+    save_event_transaction_rate(1.0)
+    payload = get_normalized_event(event, default_project)
+    event_id = payload["event_id"]
+    start_time = time.time() - 3600
+    process_event(
+        {
+            "payload": json.dumps(payload),
+            "start_time": start_time,
+            "event_id": event_id,
+            "project_id": project_id,
+            "remote_addr": "127.0.0.1",
+        },
+        projects={default_project.id: default_project},
+    )
+    assert not len(preprocess_event)
+    assert save_event_transaction.delay.call_args[0] == ()
+    assert save_event_transaction.delay.call_args[1] == dict(
+        cache_key=f"e:{event_id}:{project_id}",
+        data=None,
+        start_time=start_time,
+        event_id=event_id,
+        project_id=project_id,
+    )
+
+
 @pytest.mark.django_db
 @pytest.mark.parametrize("missing_chunks", (True, False))
 def test_with_attachments(default_project, task_runner, missing_chunks, monkeypatch):