Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion src/sentry/eventstream/snuba.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sentry.models.project import Project
from sentry.options.rollout import in_rollout_group
from sentry.services.eventstore.models import GroupEvent
from sentry.utils import json, snuba
from sentry.utils import json, metrics, snuba
from sentry.utils.safe import get_path
from sentry.utils.sdk import set_current_event_project

Expand Down Expand Up @@ -494,6 +494,51 @@ def _send(
except urllib3.exceptions.HTTPError as err:
raise snuba.SnubaError(err)

def _send_item(self, trace_item: TraceItem) -> None:
try:
resp = snuba._snuba_pool.urlopen(
"POST",
"/tests/entities/eap_items/insert_bytes",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This is not the only place we use this URL in the repo... do you think it would be appropriate to break out to a constant? Maybe in a followup PR, probably not worth doing here/now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good idea. Will make that change in a follow-up PR.

fields={"item_0": trace_item.SerializeToString()},
)

if resp.status == 200:
metrics.incr(
"eventstream.eap.occurrence_insert.success",
tags={"backend": "snuba_http"},
)
else:
logger.warning(
"Failed to insert EAP occurrence item via Snuba HTTP",
extra={
"status": resp.status,
"organization_id": trace_item.organization_id,
"project_id": trace_item.project_id,
"item_id": trace_item.item_id.decode("utf-8"),
"trace_id": trace_item.trace_id,
"backend": "snuba_http",
},
)
metrics.incr(
"eventstream.eap.occurrence_insert.failure",
tags={"backend": "snuba_http"},
)
except Exception:
logger.exception(
"Exception while inserting EAP occurrence item via Snuba HTTP",
extra={
"organization_id": trace_item.organization_id,
"project_id": trace_item.project_id,
"item_id": trace_item.item_id.decode("utf-8"),
"trace_id": trace_item.trace_id,
"backend": "snuba_http",
},
)
metrics.incr(
"eventstream.eap.occurrence_insert.failure",
tags={"backend": "snuba_http"},
)

def requires_post_process_forwarder(self) -> bool:
return False

Expand Down
92 changes: 92 additions & 0 deletions tests/sentry/eventstream/test_eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,95 @@ def test_event_forwarding_to_items(self):
assert trace_item.organization_id == event.project.organization_id
assert trace_item.retention_days == 90
assert trace_item.attributes["group_id"].int_value == group_info.group.id

def test_snuba_event_stream_forwarding_to_items(self):
create_default_projects()
es = SnubaEventStream()

# Prepare a generic event with a span item
profile_message = load_data("generic-event-profiling")
event_data = {
**profile_message["event"],
"contexts": {"trace": {"trace_id": uuid.uuid4().hex}},
"timestamp": timezone.now().isoformat(),
}
project_id = event_data.get("project_id", self.project.id)

occurrence, group_info = self.process_occurrence(
event_id=event_data["event_id"],
project_id=project_id,
event_data=event_data,
)
assert group_info is not None

event = Event(
event_id=occurrence.event_id,
project_id=project_id,
data=nodestore.backend.get(Event.generate_node_id(project_id, occurrence.event_id)),
)
group_event = event.for_group(group_info.group)
group_event.occurrence = occurrence

with self.options({"eventstream.eap_forwarding_rate": 1.0}):
# Mock both _send and _send_item to avoid schema validation and verify EAP forwarding
with patch.object(es, "_send"), patch.object(es, "_send_item") as mock_send_item:
es.insert(
group_event,
is_new=True,
is_regression=True,
is_new_group_environment=False,
primary_hash="",
skip_consume=False,
received_timestamp=event_data["timestamp"],
)
mock_send_item.assert_called_once()

trace_item = mock_send_item.call_args[0][0]
assert trace_item.item_id == event.event_id.encode("utf-8")
assert trace_item.item_type == TRACE_ITEM_TYPE_OCCURRENCE
assert trace_item.trace_id == event_data["contexts"]["trace"]["trace_id"]
assert trace_item.project_id == event.project_id
assert trace_item.organization_id == event.project.organization_id
assert trace_item.retention_days == 90
assert trace_item.attributes["group_id"].int_value == group_info.group.id

def test_snuba_event_stream_no_forwarding_when_rate_zero(self):
create_default_projects()
es = SnubaEventStream()

# Prepare a generic event with a span item
profile_message = load_data("generic-event-profiling")
event_data = {
**profile_message["event"],
"contexts": {"trace": {"trace_id": uuid.uuid4().hex}},
"timestamp": timezone.now().isoformat(),
}
project_id = event_data.get("project_id", self.project.id)

occurrence, group_info = self.process_occurrence(
event_id=event_data["event_id"],
project_id=project_id,
event_data=event_data,
)
assert group_info is not None

event = Event(
event_id=occurrence.event_id,
project_id=project_id,
data=nodestore.backend.get(Event.generate_node_id(project_id, occurrence.event_id)),
)
group_event = event.for_group(group_info.group)
group_event.occurrence = occurrence

with self.options({"eventstream.eap_forwarding_rate": 0.0}):
with patch.object(es, "_send"), patch.object(es, "_send_item") as mock_send_item:
es.insert(
group_event,
is_new=True,
is_regression=True,
is_new_group_environment=False,
primary_hash="",
skip_consume=False,
received_timestamp=event_data["timestamp"],
)
mock_send_item.assert_not_called()
Loading