From 32e70226ba3cb68566b2eacb25717f9ad7d724de Mon Sep 17 00:00:00 2001
From: Kyle Consalus
Date: Fri, 7 Nov 2025 16:05:14 -0800
Subject: [PATCH 1/4] fix(aci): Implement DataSource.normalize_before_relocation_import

---
 src/sentry/monitors/models.py             |  7 ++
 src/sentry/snuba/models.py                |  7 ++
 src/sentry/uptime/models.py               |  7 ++
 .../workflow_engine/models/data_source.py | 51 ++++++++++++-
 src/sentry/workflow_engine/types.py       | 16 +++-
 .../models/test_data_source.py            | 74 +++++++++++++++++++
 6 files changed, 160 insertions(+), 2 deletions(-)

diff --git a/src/sentry/monitors/models.py b/src/sentry/monitors/models.py
index d775079571d6d4..79d6ced545f699 100644
--- a/src/sentry/monitors/models.py
+++ b/src/sentry/monitors/models.py
@@ -814,6 +814,7 @@ class Meta:
 
 @data_source_type_registry.register(DATA_SOURCE_CRON_MONITOR)
 class CronMonitorDataSourceHandler(DataSourceTypeHandler[Monitor]):
+    @override
     @staticmethod
     def bulk_get_query_object(
         data_sources: list[DataSource],
@@ -834,6 +835,7 @@ def bulk_get_query_object(
         }
         return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
 
+    @override
     @staticmethod
     def related_model(instance) -> list[ModelRelation]:
         return [ModelRelation(Monitor, {"id": instance.source_id})]
@@ -848,3 +850,8 @@ def get_instance_limit(org: Organization) -> int | None:
     def get_current_instance_count(org: Organization) -> int:
         # We don't have a limit at the moment, so no need to count.
         raise NotImplementedError
+
+    @override
+    @staticmethod
+    def get_relocation_model_name() -> str:
+        return "monitors.monitor"
diff --git a/src/sentry/snuba/models.py b/src/sentry/snuba/models.py
index 0c0b5aae2ded6e..03fa3989a8c433 100644
--- a/src/sentry/snuba/models.py
+++ b/src/sentry/snuba/models.py
@@ -182,6 +182,7 @@ def write_relocation_import(
 
 @data_source_type_registry.register(DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
 class QuerySubscriptionDataSourceHandler(DataSourceTypeHandler[QuerySubscription]):
+    @override
     @staticmethod
     def bulk_get_query_object(
         data_sources: list[DataSource],
@@ -203,6 +204,7 @@ def bulk_get_query_object(
         }
         return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
 
+    @override
     @staticmethod
     def related_model(instance) -> list[ModelRelation]:
         return [ModelRelation(QuerySubscription, {"id": instance.source_id})]
@@ -223,3 +225,8 @@ def get_current_instance_count(org: Organization) -> int:
                 QuerySubscription.Status.UPDATING.value,
             ),
         ).count()
+
+    @override
+    @staticmethod
+    def get_relocation_model_name() -> str:
+        return "sentry.querysubscription"
diff --git a/src/sentry/uptime/models.py b/src/sentry/uptime/models.py
index eea14e2e713a12..94084292380bdc 100644
--- a/src/sentry/uptime/models.py
+++ b/src/sentry/uptime/models.py
@@ -186,6 +186,7 @@ class UptimeRegionScheduleMode(enum.StrEnum):
 
 @data_source_type_registry.register(DATA_SOURCE_UPTIME_SUBSCRIPTION)
 class UptimeSubscriptionDataSourceHandler(DataSourceTypeHandler[UptimeSubscription]):
+    @override
     @staticmethod
     def bulk_get_query_object(
         data_sources: list[DataSource],
@@ -210,6 +211,7 @@ def bulk_get_query_object(
         }
         return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
 
+    @override
     @staticmethod
     def related_model(instance) -> list[ModelRelation]:
         return [ModelRelation(UptimeSubscription, {"id": instance.source_id})]
@@ -225,6 +227,11 @@ def get_current_instance_count(org: Organization) -> int:
         # We don't have a limit at the moment, so no need to count.
         raise NotImplementedError
 
+    @override
+    @staticmethod
+    def get_relocation_model_name() -> str:
+        return "uptime.uptimesubscription"
+
 
 def get_detector(uptime_subscription: UptimeSubscription, prefetch_workflow_data=False) -> Detector:
     """
diff --git a/src/sentry/workflow_engine/models/data_source.py b/src/sentry/workflow_engine/models/data_source.py
index f0c1220dd2f678..ae79837315a0cb 100644
--- a/src/sentry/workflow_engine/models/data_source.py
+++ b/src/sentry/workflow_engine/models/data_source.py
@@ -1,18 +1,23 @@
 import builtins
 import dataclasses
+import logging
 from typing import Generic, TypeVar
 
 from django.db import models
 from django.db.models.signals import pre_save
 from django.dispatch import receiver
 
-from sentry.backup.scopes import RelocationScope
+from sentry.backup.dependencies import NormalizedModelName, PrimaryKeyMap
+from sentry.backup.helpers import ImportFlags
+from sentry.backup.scopes import ImportScope, RelocationScope
 from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
 from sentry.utils.registry import NoRegistrationExistsError
 from sentry.workflow_engine.models.data_source_detector import DataSourceDetector
 from sentry.workflow_engine.registry import data_source_type_registry
 from sentry.workflow_engine.types import DataSourceTypeHandler
 
+logger = logging.getLogger(__name__)
+
 T = TypeVar("T")
 
 
@@ -25,6 +30,13 @@ class DataPacket(Generic[T]):
 
 @region_silo_model
 class DataSource(DefaultFieldsModel):
     __relocation_scope__ = RelocationScope.Organization
+    # DataSource.source_id dynamically references different models based on the 'type' field.
+    # We declare all possible dependencies here to ensure proper import ordering.
+    __relocation_dependencies__ = {
+        "monitors.monitor",  # For DATA_SOURCE_CRON_MONITOR
+        "sentry.querysubscription",  # For DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
+        "uptime.uptimesubscription",  # For DATA_SOURCE_UPTIME_SUBSCRIPTION
+    }
 
     organization = FlexibleForeignKey("sentry.Organization")
@@ -49,6 +61,43 @@ def type_handler(self) -> builtins.type[DataSourceTypeHandler]:
             raise ValueError(f"Unknown data source type: {self.type}")
         return handler
 
+    def normalize_before_relocation_import(
+        self, pk_map: PrimaryKeyMap, scope: ImportScope, flags: ImportFlags
+    ) -> int | None:
+        old_pk = super().normalize_before_relocation_import(pk_map, scope, flags)
+        if old_pk is None:
+            return None
+
+        # Map source_id based on the data source type
+        try:
+            handler = data_source_type_registry.get(self.type)
+            model_name = NormalizedModelName(handler.get_relocation_model_name())
+            old_source_id = int(self.source_id)
+            new_source_id = pk_map.get_pk(model_name, old_source_id)
+
+            if new_source_id is not None:
+                self.source_id = str(new_source_id)
+            else:
+                # Referenced model not in pk_map. This may be correct (reset_pks=False) or broken
+                # (reset_pks=True but referenced model was filtered out or failed to import).
+                logger.warning(
+                    "DataSource source_id not remapped - referenced model not in pk_map",
+                    extra={
+                        "data_source_id": old_pk,
+                        "type": self.type,
+                        "source_id": old_source_id,
+                        "model": str(model_name),
+                    },
+                )
+        except Exception:
+            logger.exception(
+                "DataSource.normalize_before_relocation_import failed",
+                extra={"data_source_id": old_pk, "type": self.type, "source_id": self.source_id},
+            )
+            return None
+
+        return old_pk
+
 
 @receiver(pre_save, sender=DataSource)
 def ensure_type_handler_registered(sender, instance: DataSource, **kwargs):
diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py
index 1cbddbf85970e9..889219f526cb04 100644
--- a/src/sentry/workflow_engine/types.py
+++ b/src/sentry/workflow_engine/types.py
@@ -108,8 +108,9 @@ def execute(event_data: WorkflowEventData, action: Action, detector: Detector) -
         raise NotImplementedError
 
 
-class DataSourceTypeHandler(Generic[T]):
+class DataSourceTypeHandler(ABC, Generic[T]):
     @staticmethod
+    @abstractmethod
     def bulk_get_query_object(data_sources) -> dict[int, T | None]:
         """
         Bulk fetch related data-source models returning a dict of the
@@ -118,6 +119,7 @@ def bulk_get_query_object(data_sources) -> dict[int, T | None]:
         raise NotImplementedError
 
     @staticmethod
+    @abstractmethod
     def related_model(instance) -> list[ModelRelation]:
         """
         A list of deletion ModelRelations. The model relation query should map
@@ -127,6 +129,7 @@ def related_model(instance) -> list[ModelRelation]:
         raise NotImplementedError
 
     @staticmethod
+    @abstractmethod
     def get_instance_limit(org: Organization) -> int | None:
         """
         Returns the maximum number of instances of this data source type for the organization.
@@ -135,6 +138,7 @@ def get_instance_limit(org: Organization) -> int | None:
         raise NotImplementedError
 
     @staticmethod
+    @abstractmethod
     def get_current_instance_count(org: Organization) -> int:
         """
         Returns the current number of instances of this data source type for the organization.
@@ -142,6 +146,16 @@ def get_current_instance_count(org: Organization) -> int:
         """
         raise NotImplementedError
 
+    @staticmethod
+    @abstractmethod
+    def get_relocation_model_name() -> str:
+        """
+        Returns the normalized model name (e.g., "sentry.querysubscription") for the model that
+        source_id references. This is used during backup/relocation to map old PKs to new PKs.
+        The format is "app_label.model_name" in lowercase.
+        """
+        raise NotImplementedError
+
 
 class DataConditionHandler(Generic[T]):
     class Group(StrEnum):
diff --git a/tests/sentry/workflow_engine/models/test_data_source.py b/tests/sentry/workflow_engine/models/test_data_source.py
index a1019b1cc2b370..89fbe72d42fd90 100644
--- a/tests/sentry/workflow_engine/models/test_data_source.py
+++ b/tests/sentry/workflow_engine/models/test_data_source.py
@@ -2,6 +2,10 @@
 
 import pytest
 
+from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap
+from sentry.backup.helpers import ImportFlags
+from sentry.backup.scopes import ImportScope
+from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
 from sentry.workflow_engine.registry import data_source_type_registry
 from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
 
@@ -18,3 +22,73 @@ def test_data_source_valid_type(self) -> None:
         data_source = self.create_data_source(type="test")
         assert data_source is not None
         assert data_source.type == "test"
+
+    def test_normalize_before_relocation_import(self) -> None:
+        """Test that normalize_before_relocation_import correctly maps source_id"""
+        monitor = self.create_monitor(project=self.project)
+        data_source = self.create_data_source(
+            type=DATA_SOURCE_CRON_MONITOR,
+            source_id=str(monitor.id),
+            organization_id=self.organization.id,
+        )
+
+        old_monitor_pk = monitor.id
+        new_monitor_pk = 9999
+        old_data_source_id = data_source.id
+        old_org_id = data_source.organization_id
+
+        # Create a PrimaryKeyMap that maps the old monitor ID to a new one
+        pk_map = PrimaryKeyMap()
+        pk_map.insert(
+            model_name=NormalizedModelName("monitors.monitor"),
+            old=old_monitor_pk,
+            new=new_monitor_pk,
+            kind=ImportKind.Inserted,
+        )
+        pk_map.insert(
+            model_name=NormalizedModelName("sentry.organization"),
+            old=old_org_id,
+            new=old_org_id,
+            kind=ImportKind.Inserted,
+        )
+
+        old_data_source_pk = data_source.normalize_before_relocation_import(
+            pk_map, ImportScope.Organization, ImportFlags()
+        )
+
+        assert (
+            old_data_source_pk == old_data_source_id
+        ), f"Expected {old_data_source_id}, got {old_data_source_pk}"
+        assert data_source.source_id == str(new_monitor_pk)
+        assert data_source.pk is None
+
+    def test_normalize_before_relocation_import_missing_source(self) -> None:
+        """Test that normalize_before_relocation_import succeeds but doesn't update source_id if mapping not found"""
+        monitor = self.create_monitor(project=self.project)
+        data_source = self.create_data_source(
+            type=DATA_SOURCE_CRON_MONITOR,
+            source_id=str(monitor.id),
+            organization_id=self.organization.id,
+        )
+
+        old_source_id = data_source.source_id
+        old_data_source_id = data_source.id
+        old_org_id = data_source.organization_id
+
+        # Create a PrimaryKeyMap without the monitor mapping
+        pk_map = PrimaryKeyMap()
+        pk_map.insert(
+            model_name=NormalizedModelName("sentry.organization"),
+            old=old_org_id,
+            new=old_org_id,
+            kind=ImportKind.Inserted,
+        )
+
+        result = data_source.normalize_before_relocation_import(
+            pk_map, ImportScope.Organization, ImportFlags()
+        )
+
+        # Should succeed but leave source_id unchanged
+        assert result == old_data_source_id
+        assert data_source.source_id == old_source_id
+        assert data_source.pk is None
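
Before patch 2 below revises it, it is worth seeing the remapping contract of patch 1 outside the ORM. The following is a minimal, self-contained sketch of the same flow; FakePrimaryKeyMap, RELOCATION_MODELS, and the "cron_monitor" key are illustrative stand-ins, not the real sentry.backup.dependencies.PrimaryKeyMap, the handler registry, or the actual value of DATA_SOURCE_CRON_MONITOR:

# Stand-ins for illustration only -- not the real Sentry classes.
RELOCATION_MODELS = {"cron_monitor": "monitors.monitor"}  # handler type -> model name


class FakePrimaryKeyMap:
    """Dict-backed stand-in for sentry.backup.dependencies.PrimaryKeyMap."""

    def __init__(self) -> None:
        self._entries: dict[tuple[str, int], int] = {}

    def insert(self, model_name: str, old: int, new: int) -> None:
        self._entries[(model_name, old)] = new

    def get_pk(self, model_name: str, old: int) -> int | None:
        return self._entries.get((model_name, old))


def remap_source_id(type_: str, source_id: str, pk_map: FakePrimaryKeyMap) -> str:
    """Patch 1 semantics: translate source_id, keeping it unchanged on a miss."""
    model_name = RELOCATION_MODELS[type_]
    new_pk = pk_map.get_pk(model_name, int(source_id))
    if new_pk is None:
        return source_id  # patch 1 logs a warning and keeps the old id
    return str(new_pk)


# Usage: a monitor moved from pk 17 to pk 9999 during import.
pk_map = FakePrimaryKeyMap()
pk_map.insert("monitors.monitor", 17, 9999)
assert remap_source_id("cron_monitor", "17", pk_map) == "9999"  # remapped
assert remap_source_id("cron_monitor", "42", pk_map) == "42"    # miss: unchanged
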
From cb15e1a49a74eeaf34020980224c6aada348d449 Mon Sep 17 00:00:00 2001
From: Kyle Consalus
Date: Tue, 11 Nov 2025 15:02:02 -0800
Subject: [PATCH 2/4] make it correct

---
 .../workflow_engine/models/data_source.py | 19 +++++--------------
 .../models/test_data_source.py            | 10 ++--------
 2 files changed, 7 insertions(+), 22 deletions(-)

diff --git a/src/sentry/workflow_engine/models/data_source.py b/src/sentry/workflow_engine/models/data_source.py
index ae79837315a0cb..4d1c4e6cc4e88c 100644
--- a/src/sentry/workflow_engine/models/data_source.py
+++ b/src/sentry/workflow_engine/models/data_source.py
@@ -75,20 +75,11 @@ def normalize_before_relocation_import(
             old_source_id = int(self.source_id)
             new_source_id = pk_map.get_pk(model_name, old_source_id)
 
-            if new_source_id is not None:
-                self.source_id = str(new_source_id)
-            else:
-                # Referenced model not in pk_map. This may be correct (reset_pks=False) or broken
-                # (reset_pks=True but referenced model was filtered out or failed to import).
-                logger.warning(
-                    "DataSource source_id not remapped - referenced model not in pk_map",
-                    extra={
-                        "data_source_id": old_pk,
-                        "type": self.type,
-                        "source_id": old_source_id,
-                        "model": str(model_name),
-                    },
-                )
+            if new_source_id is None:
+                # Referenced model not in pk_map - the source was filtered out or failed to import.
+                return None
+
+            self.source_id = str(new_source_id)
         except Exception:
             logger.exception(
                 "DataSource.normalize_before_relocation_import failed",
diff --git a/tests/sentry/workflow_engine/models/test_data_source.py b/tests/sentry/workflow_engine/models/test_data_source.py
index 89fbe72d42fd90..9f3d11cd4a66e0 100644
--- a/tests/sentry/workflow_engine/models/test_data_source.py
+++ b/tests/sentry/workflow_engine/models/test_data_source.py
@@ -24,7 +24,6 @@ def test_data_source_valid_type(self) -> None:
         assert data_source.type == "test"
 
     def test_normalize_before_relocation_import(self) -> None:
-        """Test that normalize_before_relocation_import correctly maps source_id"""
         monitor = self.create_monitor(project=self.project)
         data_source = self.create_data_source(
             type=DATA_SOURCE_CRON_MONITOR,
@@ -63,7 +62,6 @@ def test_normalize_before_relocation_import(self) -> None:
         assert data_source.pk is None
 
     def test_normalize_before_relocation_import_missing_source(self) -> None:
-        """Test that normalize_before_relocation_import succeeds but doesn't update source_id if mapping not found"""
         monitor = self.create_monitor(project=self.project)
         data_source = self.create_data_source(
             type=DATA_SOURCE_CRON_MONITOR,
@@ -71,8 +69,6 @@ def test_normalize_before_relocation_import_missing_source(self) -> None:
             organization_id=self.organization.id,
         )
 
-        old_source_id = data_source.source_id
-        old_data_source_id = data_source.id
         old_org_id = data_source.organization_id
 
         # Create a PrimaryKeyMap without the monitor mapping
@@ -88,7 +84,5 @@ def test_normalize_before_relocation_import_missing_source(self) -> None:
             pk_map, ImportScope.Organization, ImportFlags()
         )
 
-        # Should succeed but leave source_id unchanged
-        assert result == old_data_source_id
-        assert data_source.source_id == old_source_id
-        assert data_source.pk is None
+        # Should return None when the referenced source is not in pk_map
+        assert result is None
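
Patch 2 tightens the miss case: instead of warning and keeping a dangling id, normalize_before_relocation_import now returns None, which the backup machinery treats as a failed normalization for that row (the exact caller handling lives in sentry.backup and is not shown here). In terms of the stand-in sketch after patch 1, the change amounts to the following; this snippet deliberately reuses FakePrimaryKeyMap, RELOCATION_MODELS, and pk_map from that sketch:

def remap_source_id_strict(
    type_: str, source_id: str, pk_map: FakePrimaryKeyMap
) -> str | None:
    """Patch 2 semantics: a miss means the referenced row never imported, so fail."""
    model_name = RELOCATION_MODELS[type_]
    new_pk = pk_map.get_pk(model_name, int(source_id))
    if new_pk is None:
        return None  # the DataSource is dropped instead of keeping a dangling id
    return str(new_pk)


assert remap_source_id_strict("cron_monitor", "17", pk_map) == "9999"
assert remap_source_id_strict("cron_monitor", "42", pk_map) is None
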
From 85d83f2ceb048a3a0aef0f84379d7a23545e67ff Mon Sep 17 00:00:00 2001
From: Kyle Consalus
Date: Wed, 12 Nov 2025 10:02:50 -0800
Subject: [PATCH 3/4] compromise field comparator

---
 src/sentry/backup/comparators.py        | 16 +++++++++++++++-
 src/sentry/backup/findings.py           |  7 +++++++
 src/sentry/testutils/helpers/backups.py | 10 +++++++++-
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/src/sentry/backup/comparators.py b/src/sentry/backup/comparators.py
index bd2ca5ec44fa23..5636fbdf08c46c 100644
--- a/src/sentry/backup/comparators.py
+++ b/src/sentry/backup/comparators.py
@@ -736,6 +736,17 @@ def compare(self, on: InstanceID, left: Any, right: Any) -> list[ComparatorFindi
         return findings
 
 
+class DataSourceComparator(IgnoredComparator):
+    """
+    DataSource.source_id is a dynamic foreign key that gets remapped during import via
+    normalize_before_relocation_import. Since the remapping is handled there, this comparator
+    only verifies that both sides have a valid source_id; the values themselves are not compared.
+    """
+
+    def __init__(self):
+        super().__init__("source_id")
+
+
 def auto_assign_datetime_equality_comparators(comps: ComparatorMap) -> None:
     """Automatically assigns the DateAddedComparator to any `DateTimeField` that is not already
     claimed by the `DateUpdatedComparator`."""
@@ -929,7 +940,10 @@ def get_default_comparators() -> dict[str, list[JSONScrubbingComparator]]:
         "workflow_engine.dataconditiongroupaction": [
             DateUpdatedComparator("date_updated", "date_added")
         ],
-        "workflow_engine.datasource": [DateUpdatedComparator("date_updated", "date_added")],
+        "workflow_engine.datasource": [
+            DateUpdatedComparator("date_updated", "date_added"),
+            DataSourceComparator(),
+        ],
         "workflow_engine.datasourcedetector": [
             DateUpdatedComparator("date_updated", "date_added")
         ],
diff --git a/src/sentry/backup/findings.py b/src/sentry/backup/findings.py
index 2015d46baae137..1ac7c8712749bd 100644
--- a/src/sentry/backup/findings.py
+++ b/src/sentry/backup/findings.py
@@ -95,6 +95,13 @@ class ComparatorFindingKind(FindingKind):
     # or `None`.
     ForeignKeyComparatorExistenceCheck = auto()
 
+    # DataSource.source_id field comparison failed (dynamic foreign key).
+    DataSourceComparator = auto()
+
+    # Failed to compare DataSource.source_id field because one of the fields being compared was not present
+    # or `None`.
+    DataSourceComparatorExistenceCheck = auto()
+
     # Failed to compare an ignored field.
     IgnoredComparator = auto()
 
diff --git a/src/sentry/testutils/helpers/backups.py b/src/sentry/testutils/helpers/backups.py
index 916d016d4c5ec1..8e66f47f1ba936 100644
--- a/src/sentry/testutils/helpers/backups.py
+++ b/src/sentry/testutils/helpers/backups.py
@@ -107,6 +107,7 @@
 from sentry.services.nodestore.django.models import Node
 from sentry.silo.base import SiloMode
 from sentry.silo.safety import unguarded_write
+from sentry.snuba.models import QuerySubscriptionDataSourceHandler
 from sentry.tempest.models import TempestCredentials
 from sentry.testutils.cases import TestCase, TransactionTestCase
 from sentry.testutils.factories import get_fixture_path
@@ -121,6 +122,7 @@
 from sentry.utils import json
 from sentry.workflow_engine.models import Action, DataConditionAlertRuleTrigger, DataConditionGroup
 from sentry.workflow_engine.models.workflow_action_group_status import WorkflowActionGroupStatus
+from sentry.workflow_engine.registry import data_source_type_registry
 
 __all__ = [
     "export_to_file",
@@ -696,7 +698,13 @@ def create_exhaustive_organization(
             workflow=workflow, condition_group=notification_condition_group
         )
 
-        data_source = self.create_data_source(organization=org)
+        # Use the alert_rule's QuerySubscription for the DataSource
+        query_subscription = alert.snuba_query.subscriptions.first()
+        data_source = self.create_data_source(
+            organization=org,
+            source_id=str(query_subscription.id),
+            type=data_source_type_registry.get_key(QuerySubscriptionDataSourceHandler),
+        )
        self.create_data_source_detector(data_source, detector)
 
         detector_conditions = self.create_data_condition_group(
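
A note on why patch 3 ignores the values rather than comparing them: after an export/import round-trip, the exported side of a validation diff still carries the old pk in source_id while the imported side carries the remapped pk, so any value comparison would report drift on every correct import. The sketch below shows the "check existence, ignore values" pattern that DataSourceComparator inherits from IgnoredComparator; FakeIgnoredComparator only mimics the real interface in sentry.backup.comparators and is not the actual class:

from typing import Any


class FakeIgnoredComparator:
    """Simplified stand-in: flags missing fields but never compares values."""

    def __init__(self, *fields: str) -> None:
        self.fields = set(fields)

    def existence(self, left: dict[str, Any], right: dict[str, Any]) -> list[str]:
        findings = []
        for f in self.fields:
            for side, model in (("left", left), ("right", right)):
                if model.get(f) is None:
                    findings.append(f"{f} missing or None on {side} side")
        return findings

    def compare(self, left: dict[str, Any], right: dict[str, Any]) -> list[str]:
        return []  # values intentionally not compared


comp = FakeIgnoredComparator("source_id")
exported = {"source_id": "17"}    # pre-import pk
imported = {"source_id": "9999"}  # remapped pk
assert comp.compare(exported, imported) == []               # difference is expected
assert comp.existence(exported, {"source_id": None}) != []  # absence is still an error
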
From 6b40926f69cadbe160e549bd18d992683d0e93b6 Mon Sep 17 00:00:00 2001
From: Kyle Consalus
Date: Wed, 12 Nov 2025 10:42:29 -0800
Subject: [PATCH 4/4] mypy

---
 src/sentry/testutils/helpers/backups.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/sentry/testutils/helpers/backups.py b/src/sentry/testutils/helpers/backups.py
index 8e66f47f1ba936..73804e34ba64be 100644
--- a/src/sentry/testutils/helpers/backups.py
+++ b/src/sentry/testutils/helpers/backups.py
@@ -700,6 +700,7 @@ def create_exhaustive_organization(
 
         # Use the alert_rule's QuerySubscription for the DataSource
         query_subscription = alert.snuba_query.subscriptions.first()
+        assert query_subscription is not None
         data_source = self.create_data_source(
             organization=org,
             source_id=str(query_subscription.id),
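
One closing observation on the series: because patch 1 makes DataSourceTypeHandler subclass ABC with @abstractmethod on every method, a handler class that omits get_relocation_model_name is flagged by mypy, and Python itself rejects it at instantiation time. A minimal stand-in follows; it is not the real base class (which has four more abstract methods), and WidgetHandler's "example.widget" model is imaginary:

from abc import ABC, abstractmethod


class FakeDataSourceTypeHandler(ABC):
    """Stand-in for the real base class in sentry.workflow_engine.types."""

    @staticmethod
    @abstractmethod
    def get_relocation_model_name() -> str:
        """Normalized "app_label.model_name" that source_id points at."""
        raise NotImplementedError


class WidgetHandler(FakeDataSourceTypeHandler):
    # Hypothetical handler for an imaginary "example.widget" model.
    @staticmethod
    def get_relocation_model_name() -> str:
        return "example.widget"


class IncompleteHandler(FakeDataSourceTypeHandler):
    pass


WidgetHandler()  # fine
try:
    IncompleteHandler()  # Python enforces abstract methods at instantiation
except TypeError as e:
    print(e)

Since Sentry's registry stores handler classes without instantiating them, the practical day-to-day enforcement is type checking; the pre_save signal ensure_type_handler_registered in data_source.py remains the runtime guard against unregistered types.
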