diff --git a/src/sentry/eventstream/snuba.py b/src/sentry/eventstream/snuba.py index 594ebdcb19e1da..2a250a0198f78a 100644 --- a/src/sentry/eventstream/snuba.py +++ b/src/sentry/eventstream/snuba.py @@ -17,7 +17,7 @@ from sentry.models.project import Project from sentry.options.rollout import in_rollout_group from sentry.services.eventstore.models import GroupEvent -from sentry.utils import json, snuba +from sentry.utils import json, metrics, snuba from sentry.utils.safe import get_path from sentry.utils.sdk import set_current_event_project @@ -494,6 +494,51 @@ def _send( except urllib3.exceptions.HTTPError as err: raise snuba.SnubaError(err) + def _send_item(self, trace_item: TraceItem) -> None: + try: + resp = snuba._snuba_pool.urlopen( + "POST", + "/tests/entities/eap_items/insert_bytes", + fields={"item_0": trace_item.SerializeToString()}, + ) + + if resp.status == 200: + metrics.incr( + "eventstream.eap.occurrence_insert.success", + tags={"backend": "snuba_http"}, + ) + else: + logger.warning( + "Failed to insert EAP occurrence item via Snuba HTTP", + extra={ + "status": resp.status, + "organization_id": trace_item.organization_id, + "project_id": trace_item.project_id, + "item_id": trace_item.item_id.decode("utf-8"), + "trace_id": trace_item.trace_id, + "backend": "snuba_http", + }, + ) + metrics.incr( + "eventstream.eap.occurrence_insert.failure", + tags={"backend": "snuba_http"}, + ) + except Exception: + logger.exception( + "Exception while inserting EAP occurrence item via Snuba HTTP", + extra={ + "organization_id": trace_item.organization_id, + "project_id": trace_item.project_id, + "item_id": trace_item.item_id.decode("utf-8"), + "trace_id": trace_item.trace_id, + "backend": "snuba_http", + }, + ) + metrics.incr( + "eventstream.eap.occurrence_insert.failure", + tags={"backend": "snuba_http"}, + ) + def requires_post_process_forwarder(self) -> bool: return False diff --git a/tests/sentry/eventstream/test_eventstream.py b/tests/sentry/eventstream/test_eventstream.py index 52182b729e621a..0c1cbe66eddf7c 100644 --- a/tests/sentry/eventstream/test_eventstream.py +++ b/tests/sentry/eventstream/test_eventstream.py @@ -466,3 +466,95 @@ def test_event_forwarding_to_items(self): assert trace_item.organization_id == event.project.organization_id assert trace_item.retention_days == 90 assert trace_item.attributes["group_id"].int_value == group_info.group.id + + def test_snuba_event_stream_forwarding_to_items(self): + create_default_projects() + es = SnubaEventStream() + + # Prepare a generic event with a span item + profile_message = load_data("generic-event-profiling") + event_data = { + **profile_message["event"], + "contexts": {"trace": {"trace_id": uuid.uuid4().hex}}, + "timestamp": timezone.now().isoformat(), + } + project_id = event_data.get("project_id", self.project.id) + + occurrence, group_info = self.process_occurrence( + event_id=event_data["event_id"], + project_id=project_id, + event_data=event_data, + ) + assert group_info is not None + + event = Event( + event_id=occurrence.event_id, + project_id=project_id, + data=nodestore.backend.get(Event.generate_node_id(project_id, occurrence.event_id)), + ) + group_event = event.for_group(group_info.group) + group_event.occurrence = occurrence + + with self.options({"eventstream.eap_forwarding_rate": 1.0}): + # Mock both _send and _send_item to avoid schema validation and verify EAP forwarding + with patch.object(es, "_send"), patch.object(es, "_send_item") as mock_send_item: + es.insert( + group_event, + is_new=True, + is_regression=True, + is_new_group_environment=False, + primary_hash="", + skip_consume=False, + received_timestamp=event_data["timestamp"], + ) + mock_send_item.assert_called_once() + + trace_item = mock_send_item.call_args[0][0] + assert trace_item.item_id == event.event_id.encode("utf-8") + assert trace_item.item_type == TRACE_ITEM_TYPE_OCCURRENCE + assert trace_item.trace_id == event_data["contexts"]["trace"]["trace_id"] + assert trace_item.project_id == event.project_id + assert trace_item.organization_id == event.project.organization_id + assert trace_item.retention_days == 90 + assert trace_item.attributes["group_id"].int_value == group_info.group.id + + def test_snuba_event_stream_no_forwarding_when_rate_zero(self): + create_default_projects() + es = SnubaEventStream() + + # Prepare a generic event with a span item + profile_message = load_data("generic-event-profiling") + event_data = { + **profile_message["event"], + "contexts": {"trace": {"trace_id": uuid.uuid4().hex}}, + "timestamp": timezone.now().isoformat(), + } + project_id = event_data.get("project_id", self.project.id) + + occurrence, group_info = self.process_occurrence( + event_id=event_data["event_id"], + project_id=project_id, + event_data=event_data, + ) + assert group_info is not None + + event = Event( + event_id=occurrence.event_id, + project_id=project_id, + data=nodestore.backend.get(Event.generate_node_id(project_id, occurrence.event_id)), + ) + group_event = event.for_group(group_info.group) + group_event.occurrence = occurrence + + with self.options({"eventstream.eap_forwarding_rate": 0.0}): + with patch.object(es, "_send"), patch.object(es, "_send_item") as mock_send_item: + es.insert( + group_event, + is_new=True, + is_regression=True, + is_new_group_environment=False, + primary_hash="", + skip_consume=False, + received_timestamp=event_data["timestamp"], + ) + mock_send_item.assert_not_called()