From d21d0856f48ae158f38f9afe13e882ddadb1e567 Mon Sep 17 00:00:00 2001
From: Malachi Willey
Date: Tue, 18 Nov 2025 11:46:56 -0800
Subject: [PATCH 1/9] feat(aci): Add serialized data source to issue occurrence evidence data

---
 src/sentry/incidents/grouptype.py             |  2 ++
 .../handlers/detector/stateful.py             | 34 ++++++++++++++++++-
 .../workflow_engine/processors/data_source.py |  2 +-
 .../test_metric_issue_detector_handler.py     | 22 ++++++++++++
 .../processors/test_data_sources.py           | 10 ++++--
 5 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/src/sentry/incidents/grouptype.py b/src/sentry/incidents/grouptype.py
index ea36e0542008f6..6c1fedb15adc35 100644
--- a/src/sentry/incidents/grouptype.py
+++ b/src/sentry/incidents/grouptype.py
@@ -51,6 +51,8 @@
 @dataclass
 class MetricIssueEvidenceData(EvidenceData[MetricResult]):
     alert_id: int
+    # Optional: include a generic data source definition used by the detector
+    data_source_definition: dict[str, Any] | None = None
 
 
 class SessionsAggregate(StrEnum):
diff --git a/src/sentry/workflow_engine/handlers/detector/stateful.py b/src/sentry/workflow_engine/handlers/detector/stateful.py
index 230e3ccbbc0524..9a4d190a37fda3 100644
--- a/src/sentry/workflow_engine/handlers/detector/stateful.py
+++ b/src/sentry/workflow_engine/handlers/detector/stateful.py
@@ -9,6 +9,7 @@
 from django.db.models import Q
 from sentry_redis_tools.retrying_cluster import RetryingRedisCluster
 
+from sentry.api.serializers import serialize
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.issues.status_change_message import StatusChangeMessage
 from sentry.models.group import GroupStatus
@@ -353,6 +354,34 @@ def build_detector_evidence_data(
         """
         return {}
 
+    def _build_data_source_definition(
+        self, data_packet: DataPacket[DataPacketType]
+    ) -> dict[str, Any]:
+        try:
+            data_source = next(
+                (
+                    ds
+                    for ds in self.detector.data_sources.all()
+                    if ds.source_id == data_packet.source_id
+                ),
+                None,
+            )
+            if not data_source:
+                logger.warning(
+                    "Matching data source not found for detector while generating occurrence evidence data",
+                    extra={
+                        "detector_id": self.detector.id,
+                        "data_packet_source_id": data_packet.source_id,
+                    },
+                )
+                return None
+            return serialize(data_source) if data_source else None
+        except Exception:
+            logger.exception(
+                "Failed to serialize data source definition when building workflow engine evidence data"
+            )
+            return None
+
     def _build_workflow_engine_evidence_data(
         self,
         evaluation_result: ProcessedDataConditionGroup,
@@ -363,15 +392,18 @@ def _build_workflow_engine_evidence_data(
         Build the workflow engine specific evidence data. This is data that is
         common to all detectors.
""" - return { + base: dict[str, Any] = { "detector_id": self.detector.id, "value": evaluation_value, "data_packet_source_id": str(data_packet.source_id), "conditions": [ result.condition.get_snapshot() for result in evaluation_result.condition_results ], + "data_source_definition": self._build_data_source_definition(data_packet), } + return base + def evaluate_impl( self, data_packet: DataPacket[DataPacketType] ) -> GroupedDetectorEvaluationResult: diff --git a/src/sentry/workflow_engine/processors/data_source.py b/src/sentry/workflow_engine/processors/data_source.py index a5981b0356b338..2b764bc18bec5b 100644 --- a/src/sentry/workflow_engine/processors/data_source.py +++ b/src/sentry/workflow_engine/processors/data_source.py @@ -18,7 +18,7 @@ def bulk_fetch_enabled_detectors(source_id: str, query_type: str) -> list[Detect enabled=True, data_sources__source_id=source_id, data_sources__type=query_type ) .select_related("workflow_condition_group") - .prefetch_related("workflow_condition_group__conditions") + .prefetch_related("workflow_condition_group__conditions", "data_sources") .distinct() .order_by("id") ) diff --git a/tests/sentry/incidents/test_metric_issue_detector_handler.py b/tests/sentry/incidents/test_metric_issue_detector_handler.py index 435434ba388025..c37607d7b0347e 100644 --- a/tests/sentry/incidents/test_metric_issue_detector_handler.py +++ b/tests/sentry/incidents/test_metric_issue_detector_handler.py @@ -24,6 +24,7 @@ def generate_evidence_data( detector_trigger: DataCondition, extra_trigger: DataCondition | None = None, ): + self.query_subscription.refresh_from_db() conditions = [ { @@ -50,6 +51,27 @@ def generate_evidence_data( "alert_id": self.alert_rule.id, "data_packet_source_id": str(self.query_subscription.id), "conditions": conditions, + "data_source_definition": { + "id": str(self.data_source.id), + "organizationId": str(self.organization.id), + "type": self.data_source.type, + "sourceId": str(self.query_subscription.id), + "queryObj": { + "id": str(self.query_subscription.id), + "status": self.query_subscription.status, + "subscription": self.query_subscription.subscription_id, + "snubaQuery": { + "id": str(self.snuba_query.id), + "dataset": self.snuba_query.dataset, + "query": self.snuba_query.query, + "aggregate": self.snuba_query.aggregate, + "timeWindow": self.snuba_query.time_window, + "environment": self.environment.name, + "eventTypes": ["error"], + "extrapolationMode": "unknown", + }, + }, + }, } return evidence_data diff --git a/tests/sentry/workflow_engine/processors/test_data_sources.py b/tests/sentry/workflow_engine/processors/test_data_sources.py index 2150d355c78152..ffb83b5778c450 100644 --- a/tests/sentry/workflow_engine/processors/test_data_sources.py +++ b/tests/sentry/workflow_engine/processors/test_data_sources.py @@ -108,11 +108,12 @@ def test_metrics_for_many_detectors(self) -> None: ) def test_sql_cascades(self) -> None: - with self.assertNumQueries(2): + with self.assertNumQueries(3): """ - There should be 2 total SQL queries for `bulk_fetch_enabled_detectors`: + There should be 3 total SQL queries for `bulk_fetch_enabled_detectors`: - Get the detector and data condition group associated with it - Get all the data conditions for the group + - Get all the data sources for the detectors """ _, detectors = process_data_source(self.two_detector_packet, "test") # If the detector is not prefetched this will increase the query count @@ -126,3 +127,8 @@ def test_sql_cascades(self) -> None: for condition in 
detector.workflow_condition_group.conditions.all(): # Trigger a SQL query if not prefetched, and fail the assertion assert condition.id is not None + + # Verify data sources are prefetched + for data_source in detector.data_sources.all(): + # Trigger a SQL query if not prefetched, and fail the assertion + assert data_source.id is not None From dfebb5f0959b2a5f95d5395b67fad937ce1ce95d Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Tue, 18 Nov 2025 12:00:17 -0800 Subject: [PATCH 2/9] Move type def to EvidenceData dataclass --- src/sentry/incidents/grouptype.py | 2 -- src/sentry/workflow_engine/handlers/detector/base.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sentry/incidents/grouptype.py b/src/sentry/incidents/grouptype.py index 6c1fedb15adc35..ea36e0542008f6 100644 --- a/src/sentry/incidents/grouptype.py +++ b/src/sentry/incidents/grouptype.py @@ -51,8 +51,6 @@ @dataclass class MetricIssueEvidenceData(EvidenceData[MetricResult]): alert_id: int - # Optional: include a generic data source definition used by the detector - data_source_definition: dict[str, Any] | None = None class SessionsAggregate(StrEnum): diff --git a/src/sentry/workflow_engine/handlers/detector/base.py b/src/sentry/workflow_engine/handlers/detector/base.py index f33787eff12c33..7f4d3f013aceaa 100644 --- a/src/sentry/workflow_engine/handlers/detector/base.py +++ b/src/sentry/workflow_engine/handlers/detector/base.py @@ -34,6 +34,7 @@ class EvidenceData(Generic[DataPacketEvaluationType]): detector_id: int data_packet_source_id: int conditions: list[dict[str, Any]] + data_source_definition: dict[str, Any] | None @dataclasses.dataclass(frozen=True, kw_only=True) From 0bcc481d7a37f22c2e7b3de6c982416b990ac381 Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Tue, 18 Nov 2025 12:51:58 -0800 Subject: [PATCH 3/9] Fix tests so that they include data_source_definition --- .../notification_action/test_metric_alert_registry_handlers.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py b/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py index 9712b2b38ef9b5..7ac0bb80cda764 100644 --- a/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py +++ b/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py @@ -101,6 +101,7 @@ def create_models(self): "condition_result": DetectorPriorityLevel.OK.value, }, ], + data_source_definition=None, alert_id=self.alert_rule.id, ) @@ -126,6 +127,7 @@ def create_models(self): "condition_result": DetectorPriorityLevel.HIGH.value, }, ], + data_source_definition=None, alert_id=self.alert_rule.id, ) self.group, self.event, self.group_event = self.create_group_event( From 2a5aed3a627810f6cdc514571cd14aa9cbcd6ec3 Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Tue, 18 Nov 2025 14:30:13 -0800 Subject: [PATCH 4/9] Fix type errors --- src/sentry/workflow_engine/handlers/detector/stateful.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/workflow_engine/handlers/detector/stateful.py b/src/sentry/workflow_engine/handlers/detector/stateful.py index 9a4d190a37fda3..b1e155535f6942 100644 --- a/src/sentry/workflow_engine/handlers/detector/stateful.py +++ b/src/sentry/workflow_engine/handlers/detector/stateful.py @@ -356,7 +356,7 @@ def build_detector_evidence_data( def _build_data_source_definition( self, data_packet: DataPacket[DataPacketType] - ) -> dict[str, Any]: + 
) -> dict[str, Any] | None: try: data_source = next( ( From 0c5c0c4bf559e8df087f21e77c1e90999036d355 Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Thu, 20 Nov 2025 10:22:22 -0800 Subject: [PATCH 5/9] Make query for data source when building the occurrence data --- .../workflow_engine/handlers/detector/stateful.py | 15 +++++---------- .../workflow_engine/processors/data_source.py | 2 +- .../processors/test_data_sources.py | 10 ++-------- 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/sentry/workflow_engine/handlers/detector/stateful.py b/src/sentry/workflow_engine/handlers/detector/stateful.py index b1e155535f6942..115e66c0b6fc57 100644 --- a/src/sentry/workflow_engine/handlers/detector/stateful.py +++ b/src/sentry/workflow_engine/handlers/detector/stateful.py @@ -22,7 +22,7 @@ EventData, GroupedDetectorEvaluationResult, ) -from sentry.workflow_engine.models import DataPacket, Detector, DetectorState +from sentry.workflow_engine.models import DataPacket, DataSource, Detector, DetectorState from sentry.workflow_engine.processors.data_condition_group import ( ProcessedDataConditionGroup, process_data_condition_group, @@ -358,14 +358,9 @@ def _build_data_source_definition( self, data_packet: DataPacket[DataPacketType] ) -> dict[str, Any] | None: try: - data_source = next( - ( - ds - for ds in self.detector.data_sources.all() - if ds.source_id == data_packet.source_id - ), - None, - ) + data_source = DataSource.objects.filter( + detectors=self.detector, source_id=data_packet.source_id + ).first() if not data_source: logger.warning( "Matching data source not found for detector while generating occurrence evidence data", @@ -375,7 +370,7 @@ def _build_data_source_definition( }, ) return None - return serialize(data_source) if data_source else None + return serialize(data_source) except Exception: logger.exception( "Failed to serialize data source definition when building workflow engine evidence data" diff --git a/src/sentry/workflow_engine/processors/data_source.py b/src/sentry/workflow_engine/processors/data_source.py index 2b764bc18bec5b..a5981b0356b338 100644 --- a/src/sentry/workflow_engine/processors/data_source.py +++ b/src/sentry/workflow_engine/processors/data_source.py @@ -18,7 +18,7 @@ def bulk_fetch_enabled_detectors(source_id: str, query_type: str) -> list[Detect enabled=True, data_sources__source_id=source_id, data_sources__type=query_type ) .select_related("workflow_condition_group") - .prefetch_related("workflow_condition_group__conditions", "data_sources") + .prefetch_related("workflow_condition_group__conditions") .distinct() .order_by("id") ) diff --git a/tests/sentry/workflow_engine/processors/test_data_sources.py b/tests/sentry/workflow_engine/processors/test_data_sources.py index ffb83b5778c450..2150d355c78152 100644 --- a/tests/sentry/workflow_engine/processors/test_data_sources.py +++ b/tests/sentry/workflow_engine/processors/test_data_sources.py @@ -108,12 +108,11 @@ def test_metrics_for_many_detectors(self) -> None: ) def test_sql_cascades(self) -> None: - with self.assertNumQueries(3): + with self.assertNumQueries(2): """ - There should be 3 total SQL queries for `bulk_fetch_enabled_detectors`: + There should be 2 total SQL queries for `bulk_fetch_enabled_detectors`: - Get the detector and data condition group associated with it - Get all the data conditions for the group - - Get all the data sources for the detectors """ _, detectors = process_data_source(self.two_detector_packet, "test") # If the detector is not prefetched this will increase 
the query count @@ -127,8 +126,3 @@ def test_sql_cascades(self) -> None: for condition in detector.workflow_condition_group.conditions.all(): # Trigger a SQL query if not prefetched, and fail the assertion assert condition.id is not None - - # Verify data sources are prefetched - for data_source in detector.data_sources.all(): - # Trigger a SQL query if not prefetched, and fail the assertion - assert data_source.id is not None From 64225fee167c9506e02cc443185c3543a2c74f12 Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Fri, 21 Nov 2025 09:53:53 -0800 Subject: [PATCH 6/9] Serialize the list of data_sources --- .../workflow_engine/handlers/detector/base.py | 2 +- .../handlers/detector/stateful.py | 16 +++---- .../test_metric_issue_detector_handler.py | 42 ++++++++++--------- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/sentry/workflow_engine/handlers/detector/base.py b/src/sentry/workflow_engine/handlers/detector/base.py index 7f4d3f013aceaa..08417f6996e6e3 100644 --- a/src/sentry/workflow_engine/handlers/detector/base.py +++ b/src/sentry/workflow_engine/handlers/detector/base.py @@ -34,7 +34,7 @@ class EvidenceData(Generic[DataPacketEvaluationType]): detector_id: int data_packet_source_id: int conditions: list[dict[str, Any]] - data_source_definition: dict[str, Any] | None + data_sources: list[dict[str, Any]] @dataclasses.dataclass(frozen=True, kw_only=True) diff --git a/src/sentry/workflow_engine/handlers/detector/stateful.py b/src/sentry/workflow_engine/handlers/detector/stateful.py index 115e66c0b6fc57..9cb4eae43424bc 100644 --- a/src/sentry/workflow_engine/handlers/detector/stateful.py +++ b/src/sentry/workflow_engine/handlers/detector/stateful.py @@ -356,12 +356,12 @@ def build_detector_evidence_data( def _build_data_source_definition( self, data_packet: DataPacket[DataPacketType] - ) -> dict[str, Any] | None: + ) -> list[dict[str, Any]] | None: try: - data_source = DataSource.objects.filter( - detectors=self.detector, source_id=data_packet.source_id - ).first() - if not data_source: + data_sources = list( + DataSource.objects.filter(detectors=self.detector, source_id=data_packet.source_id) + ) + if not data_sources: logger.warning( "Matching data source not found for detector while generating occurrence evidence data", extra={ @@ -369,8 +369,8 @@ def _build_data_source_definition( "data_packet_source_id": data_packet.source_id, }, ) - return None - return serialize(data_source) + return [] + return serialize(data_sources) except Exception: logger.exception( "Failed to serialize data source definition when building workflow engine evidence data" @@ -394,7 +394,7 @@ def _build_workflow_engine_evidence_data( "conditions": [ result.condition.get_snapshot() for result in evaluation_result.condition_results ], - "data_source_definition": self._build_data_source_definition(data_packet), + "data_sources": self._build_data_source_definition(data_packet), } return base diff --git a/tests/sentry/incidents/test_metric_issue_detector_handler.py b/tests/sentry/incidents/test_metric_issue_detector_handler.py index c37607d7b0347e..a5e69543a56494 100644 --- a/tests/sentry/incidents/test_metric_issue_detector_handler.py +++ b/tests/sentry/incidents/test_metric_issue_detector_handler.py @@ -51,27 +51,29 @@ def generate_evidence_data( "alert_id": self.alert_rule.id, "data_packet_source_id": str(self.query_subscription.id), "conditions": conditions, - "data_source_definition": { - "id": str(self.data_source.id), - "organizationId": str(self.organization.id), - "type": 
self.data_source.type, - "sourceId": str(self.query_subscription.id), - "queryObj": { - "id": str(self.query_subscription.id), - "status": self.query_subscription.status, - "subscription": self.query_subscription.subscription_id, - "snubaQuery": { - "id": str(self.snuba_query.id), - "dataset": self.snuba_query.dataset, - "query": self.snuba_query.query, - "aggregate": self.snuba_query.aggregate, - "timeWindow": self.snuba_query.time_window, - "environment": self.environment.name, - "eventTypes": ["error"], - "extrapolationMode": "unknown", + "data_sources": [ + { + "id": str(self.data_source.id), + "organizationId": str(self.organization.id), + "type": self.data_source.type, + "sourceId": str(self.query_subscription.id), + "queryObj": { + "id": str(self.query_subscription.id), + "status": self.query_subscription.status, + "subscription": self.query_subscription.subscription_id, + "snubaQuery": { + "id": str(self.snuba_query.id), + "dataset": self.snuba_query.dataset, + "query": self.snuba_query.query, + "aggregate": self.snuba_query.aggregate, + "timeWindow": self.snuba_query.time_window, + "environment": self.environment.name, + "eventTypes": ["error"], + "extrapolationMode": "unknown", + }, }, - }, - }, + } + ], } return evidence_data From bf0835b5523527d0a08fa324a9cd2830e9a92906 Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Fri, 21 Nov 2025 10:00:19 -0800 Subject: [PATCH 7/9] Default data_sources to empty list --- src/sentry/workflow_engine/handlers/detector/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/workflow_engine/handlers/detector/base.py b/src/sentry/workflow_engine/handlers/detector/base.py index 08417f6996e6e3..be3f27cdd98cd2 100644 --- a/src/sentry/workflow_engine/handlers/detector/base.py +++ b/src/sentry/workflow_engine/handlers/detector/base.py @@ -34,7 +34,7 @@ class EvidenceData(Generic[DataPacketEvaluationType]): detector_id: int data_packet_source_id: int conditions: list[dict[str, Any]] - data_sources: list[dict[str, Any]] + data_sources: list[dict[str, Any]] = dataclasses.field(default_factory=list, kw_only=True) @dataclasses.dataclass(frozen=True, kw_only=True) From 9f39d355e28227dbf2383a25ba8f30cecf2eb4b7 Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Fri, 21 Nov 2025 10:04:09 -0800 Subject: [PATCH 8/9] Rename outdated variables --- src/sentry/workflow_engine/handlers/detector/stateful.py | 4 ++-- .../test_metric_alert_registry_handlers.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sentry/workflow_engine/handlers/detector/stateful.py b/src/sentry/workflow_engine/handlers/detector/stateful.py index 9cb4eae43424bc..910affa46553fc 100644 --- a/src/sentry/workflow_engine/handlers/detector/stateful.py +++ b/src/sentry/workflow_engine/handlers/detector/stateful.py @@ -354,7 +354,7 @@ def build_detector_evidence_data( """ return {} - def _build_data_source_definition( + def _build_evidence_data_sources( self, data_packet: DataPacket[DataPacketType] ) -> list[dict[str, Any]] | None: try: @@ -394,7 +394,7 @@ def _build_workflow_engine_evidence_data( "conditions": [ result.condition.get_snapshot() for result in evaluation_result.condition_results ], - "data_sources": self._build_data_source_definition(data_packet), + "data_sources": self._build_evidence_data_sources(data_packet), } return base diff --git a/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py b/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py index 
7ac0bb80cda764..b75c0fda9b9d61 100644 --- a/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py +++ b/tests/sentry/notifications/notification_action/test_metric_alert_registry_handlers.py @@ -101,7 +101,7 @@ def create_models(self): "condition_result": DetectorPriorityLevel.OK.value, }, ], - data_source_definition=None, + data_sources=[], alert_id=self.alert_rule.id, ) @@ -127,7 +127,7 @@ def create_models(self): "condition_result": DetectorPriorityLevel.HIGH.value, }, ], - data_source_definition=None, + data_sources=[], alert_id=self.alert_rule.id, ) self.group, self.event, self.group_event = self.create_group_event( From 3d4ea80c8cb7513bff2d3bc59c5ccc83af8a7127 Mon Sep 17 00:00:00 2001 From: Malachi Willey Date: Fri, 21 Nov 2025 10:04:52 -0800 Subject: [PATCH 9/9] Return [] instead of None --- src/sentry/workflow_engine/handlers/detector/stateful.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/workflow_engine/handlers/detector/stateful.py b/src/sentry/workflow_engine/handlers/detector/stateful.py index 910affa46553fc..48cc3110443576 100644 --- a/src/sentry/workflow_engine/handlers/detector/stateful.py +++ b/src/sentry/workflow_engine/handlers/detector/stateful.py @@ -356,7 +356,7 @@ def build_detector_evidence_data( def _build_evidence_data_sources( self, data_packet: DataPacket[DataPacketType] - ) -> list[dict[str, Any]] | None: + ) -> list[dict[str, Any]]: try: data_sources = list( DataSource.objects.filter(detectors=self.detector, source_id=data_packet.source_id) @@ -375,7 +375,7 @@ def _build_evidence_data_sources( logger.exception( "Failed to serialize data source definition when building workflow engine evidence data" ) - return None + return [] def _build_workflow_engine_evidence_data( self,
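
The net effect of the series on the evidence-data shape can be sketched as follows. This is illustrative stand-in code only, not part of the patches: EvidenceSketch and every value below are hypothetical, assuming plain dataclasses rather than Sentry's real EvidenceData, serializer, or ORM.

import dataclasses
from typing import Any


@dataclasses.dataclass
class EvidenceSketch:
    # Mirrors the EvidenceData fields touched by the series.
    value: Any
    detector_id: int
    data_packet_source_id: int
    conditions: list[dict[str, Any]]
    # PATCH 7: keyword-only with an empty-list default, so existing call
    # sites that never pass data_sources keep constructing successfully.
    data_sources: list[dict[str, Any]] = dataclasses.field(default_factory=list, kw_only=True)


# PATCH 9 guarantees a list even when no matching DataSource row exists or
# serialization fails, so consumers can iterate without a None check.
evidence = EvidenceSketch(
    value=42,
    detector_id=1,
    data_packet_source_id=10,
    conditions=[{"id": 1, "type": "gt", "comparison": 100, "condition_result": 75}],
)
assert evidence.data_sources == []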