From b71da7fea020c45de613cca36de20eb68e43b37b Mon Sep 17 00:00:00 2001 From: Shashank Jarmale Date: Tue, 18 Nov 2025 14:25:29 -0800 Subject: [PATCH 1/2] Synchronously write occurrences to EAP via `POST` request in the `SnubaEventStream` --- src/sentry/eventstream/snuba.py | 36 +++++++- tests/sentry/eventstream/test_eventstream.py | 92 ++++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/src/sentry/eventstream/snuba.py b/src/sentry/eventstream/snuba.py index 594ebdcb19e1da..96042b1a33051c 100644 --- a/src/sentry/eventstream/snuba.py +++ b/src/sentry/eventstream/snuba.py @@ -17,7 +17,7 @@ from sentry.models.project import Project from sentry.options.rollout import in_rollout_group from sentry.services.eventstore.models import GroupEvent -from sentry.utils import json, snuba +from sentry.utils import json, metrics, snuba from sentry.utils.safe import get_path from sentry.utils.sdk import set_current_event_project @@ -494,6 +494,40 @@ def _send( except urllib3.exceptions.HTTPError as err: raise snuba.SnubaError(err) + def _send_item(self, trace_item: TraceItem) -> None: + try: + resp = snuba._snuba_pool.urlopen( + "POST", + "/tests/entities/eap_items/insert_bytes", + fields={"item_0": trace_item.SerializeToString()}, + ) + + if resp.status == 200: + metrics.incr("eventstream.eap.occurrence_insert.success") + else: + logger.warning( + "Failed to insert EAP occurrence item", + extra={ + "status": resp.status, + "organization_id": trace_item.organization_id, + "project_id": trace_item.project_id, + "item_id": trace_item.item_id.decode("utf-8"), + "trace_id": trace_item.trace_id, + }, + ) + metrics.incr("eventstream.eap.occurrence_insert.failure") + except Exception: + logger.exception( + "Exception while inserting EAP occurrence item", + extra={ + "organization_id": trace_item.organization_id, + "project_id": trace_item.project_id, + "item_id": trace_item.item_id.decode("utf-8"), + "trace_id": trace_item.trace_id, + }, + ) + metrics.incr("eventstream.eap.occurrence_insert.failure") + def requires_post_process_forwarder(self) -> bool: return False diff --git a/tests/sentry/eventstream/test_eventstream.py b/tests/sentry/eventstream/test_eventstream.py index 52182b729e621a..0c1cbe66eddf7c 100644 --- a/tests/sentry/eventstream/test_eventstream.py +++ b/tests/sentry/eventstream/test_eventstream.py @@ -466,3 +466,95 @@ def test_event_forwarding_to_items(self): assert trace_item.organization_id == event.project.organization_id assert trace_item.retention_days == 90 assert trace_item.attributes["group_id"].int_value == group_info.group.id + + def test_snuba_event_stream_forwarding_to_items(self): + create_default_projects() + es = SnubaEventStream() + + # Prepare a generic event with a span item + profile_message = load_data("generic-event-profiling") + event_data = { + **profile_message["event"], + "contexts": {"trace": {"trace_id": uuid.uuid4().hex}}, + "timestamp": timezone.now().isoformat(), + } + project_id = event_data.get("project_id", self.project.id) + + occurrence, group_info = self.process_occurrence( + event_id=event_data["event_id"], + project_id=project_id, + event_data=event_data, + ) + assert group_info is not None + + event = Event( + event_id=occurrence.event_id, + project_id=project_id, + data=nodestore.backend.get(Event.generate_node_id(project_id, occurrence.event_id)), + ) + group_event = event.for_group(group_info.group) + group_event.occurrence = occurrence + + with self.options({"eventstream.eap_forwarding_rate": 1.0}): + # Mock both _send and _send_item to avoid schema validation and verify EAP forwarding + with patch.object(es, "_send"), patch.object(es, "_send_item") as mock_send_item: + es.insert( + group_event, + is_new=True, + is_regression=True, + is_new_group_environment=False, + primary_hash="", + skip_consume=False, + received_timestamp=event_data["timestamp"], + ) + mock_send_item.assert_called_once() + + trace_item = mock_send_item.call_args[0][0] + assert trace_item.item_id == event.event_id.encode("utf-8") + assert trace_item.item_type == TRACE_ITEM_TYPE_OCCURRENCE + assert trace_item.trace_id == event_data["contexts"]["trace"]["trace_id"] + assert trace_item.project_id == event.project_id + assert trace_item.organization_id == event.project.organization_id + assert trace_item.retention_days == 90 + assert trace_item.attributes["group_id"].int_value == group_info.group.id + + def test_snuba_event_stream_no_forwarding_when_rate_zero(self): + create_default_projects() + es = SnubaEventStream() + + # Prepare a generic event with a span item + profile_message = load_data("generic-event-profiling") + event_data = { + **profile_message["event"], + "contexts": {"trace": {"trace_id": uuid.uuid4().hex}}, + "timestamp": timezone.now().isoformat(), + } + project_id = event_data.get("project_id", self.project.id) + + occurrence, group_info = self.process_occurrence( + event_id=event_data["event_id"], + project_id=project_id, + event_data=event_data, + ) + assert group_info is not None + + event = Event( + event_id=occurrence.event_id, + project_id=project_id, + data=nodestore.backend.get(Event.generate_node_id(project_id, occurrence.event_id)), + ) + group_event = event.for_group(group_info.group) + group_event.occurrence = occurrence + + with self.options({"eventstream.eap_forwarding_rate": 0.0}): + with patch.object(es, "_send"), patch.object(es, "_send_item") as mock_send_item: + es.insert( + group_event, + is_new=True, + is_regression=True, + is_new_group_environment=False, + primary_hash="", + skip_consume=False, + received_timestamp=event_data["timestamp"], + ) + mock_send_item.assert_not_called() From 56ba0d2acd8216a2036f867cf0af8ee26f088bac Mon Sep 17 00:00:00 2001 From: Shashank Jarmale Date: Thu, 20 Nov 2025 11:59:16 -0800 Subject: [PATCH 2/2] Specify Snuba HTTP backend in metrics & logs --- src/sentry/eventstream/snuba.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/sentry/eventstream/snuba.py b/src/sentry/eventstream/snuba.py index 96042b1a33051c..2a250a0198f78a 100644 --- a/src/sentry/eventstream/snuba.py +++ b/src/sentry/eventstream/snuba.py @@ -503,30 +503,41 @@ def _send_item(self, trace_item: TraceItem) -> None: ) if resp.status == 200: - metrics.incr("eventstream.eap.occurrence_insert.success") + metrics.incr( + "eventstream.eap.occurrence_insert.success", + tags={"backend": "snuba_http"}, + ) else: logger.warning( - "Failed to insert EAP occurrence item", + "Failed to insert EAP occurrence item via Snuba HTTP", extra={ "status": resp.status, "organization_id": trace_item.organization_id, "project_id": trace_item.project_id, "item_id": trace_item.item_id.decode("utf-8"), "trace_id": trace_item.trace_id, + "backend": "snuba_http", }, ) - metrics.incr("eventstream.eap.occurrence_insert.failure") + metrics.incr( + "eventstream.eap.occurrence_insert.failure", + tags={"backend": "snuba_http"}, + ) except Exception: logger.exception( - "Exception while inserting EAP occurrence item", + "Exception while inserting EAP occurrence item via Snuba HTTP", extra={ "organization_id": trace_item.organization_id, "project_id": trace_item.project_id, "item_id": trace_item.item_id.decode("utf-8"), "trace_id": trace_item.trace_id, + "backend": "snuba_http", }, ) - metrics.incr("eventstream.eap.occurrence_insert.failure") + metrics.incr( + "eventstream.eap.occurrence_insert.failure", + tags={"backend": "snuba_http"}, + ) def requires_post_process_forwarder(self) -> bool: return False