
Commit 00ea07d

kcons authored and andrewshie-sentry committed
fix(aci): Implement DataSource.normalize_before_relocation_import (#103002)
To be properly imported from backup (as with self-hosted to SaaS migration), DataSource needs to remap IDs to new values. This involves tracking dependencies and translating them in normalize_before_relocation_import. The field comparator has to be somewhat lax here, because the actual foreign item referenced depends on two fields (type and source_id), which isn't a pattern supported by our ForeignKey comparator infrastructure. For now, at least, we assume the fields being referenced are valid (a big improvement over the current prod code, where we have broken ID references and also ignore that), but this will be revisited in a follow-up.
1 parent b6caeab commit 00ea07d

File tree

9 files changed: +177 −4 lines changed

src/sentry/backup/comparators.py

Lines changed: 15 additions & 1 deletion

@@ -736,6 +736,17 @@ def compare(self, on: InstanceID, left: Any, right: Any) -> list[ComparatorFindi
         return findings
 
 
+class DataSourceComparator(IgnoredComparator):
+    """
+    DataSource.source_id is a dynamic foreign key that gets remapped during import via the
+    normalize_before_relocation_import method. Since the remapping is handled there, we just
+    need to verify that both sides have a valid source_id value, without comparing the actual values.
+    """
+
+    def __init__(self):
+        super().__init__("source_id")
+
+
 def auto_assign_datetime_equality_comparators(comps: ComparatorMap) -> None:
     """Automatically assigns the DateAddedComparator to any `DateTimeField` that is not already
     claimed by the `DateUpdatedComparator`."""
@@ -929,7 +940,10 @@ def get_default_comparators() -> dict[str, list[JSONScrubbingComparator]]:
         "workflow_engine.dataconditiongroupaction": [
             DateUpdatedComparator("date_updated", "date_added")
         ],
-        "workflow_engine.datasource": [DateUpdatedComparator("date_updated", "date_added")],
+        "workflow_engine.datasource": [
+            DateUpdatedComparator("date_updated", "date_added"),
+            DataSourceComparator(),
+        ],
         "workflow_engine.datasourcedetector": [
             DateUpdatedComparator("date_updated", "date_added")
         ],
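Since DataSourceComparator inherits from IgnoredComparator, value equality on source_id is never asserted; the comparator only flags a side where the field is absent or None. A simplified, hypothetical sketch of that existence-only check (not the actual IgnoredComparator internals, which live elsewhere in comparators.py):

# Hypothetical, simplified sketch of the existence-only check that
# DataSourceComparator inherits; `left` and `right` are the serialized
# model JSON dicts, whose field values live under the "fields" key.
def existence_only_check(field: str, left: dict, right: dict) -> list[str]:
    findings = []
    for side, model in (("left", left), ("right", right)):
        if model["fields"].get(field) is None:
            findings.append(f"{side} model is missing `{field}` or it is None")
    # Deliberately absent: any left-vs-right value comparison. source_id is
    # remapped during import, so the two sides may legitimately differ.
    return findings

This is why the findings.py change below adds a DataSourceComparatorExistenceCheck kind alongside the generic one.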

src/sentry/backup/findings.py

Lines changed: 7 additions & 0 deletions

@@ -95,6 +95,13 @@ class ComparatorFindingKind(FindingKind):
     # or `None`.
     ForeignKeyComparatorExistenceCheck = auto()
 
+    # DataSource.source_id field comparison failed (dynamic foreign key).
+    DataSourceComparator = auto()
+
+    # Failed to compare DataSource.source_id field because one of the fields being compared was not present
+    # or `None`.
+    DataSourceComparatorExistenceCheck = auto()
+
     # Failed to compare an ignored field.
     IgnoredComparator = auto()
src/sentry/monitors/models.py

Lines changed: 7 additions & 0 deletions

@@ -814,6 +814,7 @@ class Meta:
 
 @data_source_type_registry.register(DATA_SOURCE_CRON_MONITOR)
 class CronMonitorDataSourceHandler(DataSourceTypeHandler[Monitor]):
+    @override
     @staticmethod
     def bulk_get_query_object(
         data_sources: list[DataSource],
@@ -834,6 +835,7 @@ def bulk_get_query_object(
         }
         return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
 
+    @override
     @staticmethod
     def related_model(instance) -> list[ModelRelation]:
         return [ModelRelation(Monitor, {"id": instance.source_id})]
@@ -848,3 +850,8 @@ def get_instance_limit(org: Organization) -> int | None:
     def get_current_instance_count(org: Organization) -> int:
         # We don't have a limit at the moment, so no need to count.
         raise NotImplementedError
+
+    @override
+    @staticmethod
+    def get_relocation_model_name() -> str:
+        return "monitors.monitor"

src/sentry/snuba/models.py

Lines changed: 7 additions & 0 deletions

@@ -182,6 +182,7 @@ def write_relocation_import(
 
 @data_source_type_registry.register(DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
 class QuerySubscriptionDataSourceHandler(DataSourceTypeHandler[QuerySubscription]):
+    @override
     @staticmethod
     def bulk_get_query_object(
         data_sources: list[DataSource],
@@ -203,6 +204,7 @@ def bulk_get_query_object(
         }
         return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
 
+    @override
     @staticmethod
     def related_model(instance) -> list[ModelRelation]:
         return [ModelRelation(QuerySubscription, {"id": instance.source_id})]
@@ -223,3 +225,8 @@ def get_current_instance_count(org: Organization) -> int:
                 QuerySubscription.Status.UPDATING.value,
             ),
         ).count()
+
+    @override
+    @staticmethod
+    def get_relocation_model_name() -> str:
+        return "sentry.querysubscription"

src/sentry/testutils/helpers/backups.py

Lines changed: 10 additions & 1 deletion

@@ -108,6 +108,7 @@
 from sentry.services.nodestore.django.models import Node
 from sentry.silo.base import SiloMode
 from sentry.silo.safety import unguarded_write
+from sentry.snuba.models import QuerySubscriptionDataSourceHandler
 from sentry.tempest.models import TempestCredentials
 from sentry.testutils.cases import TestCase, TransactionTestCase
 from sentry.testutils.factories import get_fixture_path
@@ -122,6 +123,7 @@
 from sentry.utils import json
 from sentry.workflow_engine.models import Action, DataConditionAlertRuleTrigger, DataConditionGroup
 from sentry.workflow_engine.models.workflow_action_group_status import WorkflowActionGroupStatus
+from sentry.workflow_engine.registry import data_source_type_registry
 
 __all__ = [
     "export_to_file",
@@ -697,7 +699,14 @@ def create_exhaustive_organization(
             workflow=workflow, condition_group=notification_condition_group
         )
 
-        data_source = self.create_data_source(organization=org)
+        # Use the alert_rule's QuerySubscription for the DataSource
+        query_subscription = alert.snuba_query.subscriptions.first()
+        assert query_subscription is not None
+        data_source = self.create_data_source(
+            organization=org,
+            source_id=str(query_subscription.id),
+            type=data_source_type_registry.get_key(QuerySubscriptionDataSourceHandler),
+        )
 
         self.create_data_source_detector(data_source, detector)
         detector_conditions = self.create_data_condition_group(

src/sentry/uptime/models.py

Lines changed: 7 additions & 0 deletions

@@ -186,6 +186,7 @@ class UptimeRegionScheduleMode(enum.StrEnum):
 
 @data_source_type_registry.register(DATA_SOURCE_UPTIME_SUBSCRIPTION)
 class UptimeSubscriptionDataSourceHandler(DataSourceTypeHandler[UptimeSubscription]):
+    @override
     @staticmethod
     def bulk_get_query_object(
         data_sources: list[DataSource],
@@ -210,6 +211,7 @@ def bulk_get_query_object(
         }
         return {ds.id: qs_lookup.get(ds.source_id) for ds in data_sources}
 
+    @override
     @staticmethod
     def related_model(instance) -> list[ModelRelation]:
         return [ModelRelation(UptimeSubscription, {"id": instance.source_id})]
@@ -225,6 +227,11 @@ def get_current_instance_count(org: Organization) -> int:
         # We don't have a limit at the moment, so no need to count.
         raise NotImplementedError
 
+    @override
+    @staticmethod
+    def get_relocation_model_name() -> str:
+        return "uptime.uptimesubscription"
+
 
 def get_detector(uptime_subscription: UptimeSubscription, prefetch_workflow_data=False) -> Detector:
     """

src/sentry/workflow_engine/models/data_source.py

Lines changed: 41 additions & 1 deletion

@@ -1,18 +1,23 @@
 import builtins
 import dataclasses
+import logging
 from typing import Generic, TypeVar
 
 from django.db import models
 from django.db.models.signals import pre_save
 from django.dispatch import receiver
 
-from sentry.backup.scopes import RelocationScope
+from sentry.backup.dependencies import NormalizedModelName, PrimaryKeyMap
+from sentry.backup.helpers import ImportFlags
+from sentry.backup.scopes import ImportScope, RelocationScope
 from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
 from sentry.utils.registry import NoRegistrationExistsError
 from sentry.workflow_engine.models.data_source_detector import DataSourceDetector
 from sentry.workflow_engine.registry import data_source_type_registry
 from sentry.workflow_engine.types import DataSourceTypeHandler
 
+logger = logging.getLogger(__name__)
+
 T = TypeVar("T")
 
 
@@ -25,6 +30,13 @@ class DataPacket(Generic[T]):
 @region_silo_model
 class DataSource(DefaultFieldsModel):
     __relocation_scope__ = RelocationScope.Organization
+    # DataSource.source_id dynamically references different models based on the 'type' field.
+    # We declare all possible dependencies here to ensure proper import ordering.
+    __relocation_dependencies__ = {
+        "monitors.monitor",  # For DATA_SOURCE_CRON_MONITOR
+        "sentry.querysubscription",  # For DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
+        "uptime.uptimesubscription",  # For DATA_SOURCE_UPTIME_SUBSCRIPTION
+    }
 
     organization = FlexibleForeignKey("sentry.Organization")
 
@@ -49,6 +61,34 @@ def type_handler(self) -> builtins.type[DataSourceTypeHandler]:
             raise ValueError(f"Unknown data source type: {self.type}")
         return handler
 
+    def normalize_before_relocation_import(
+        self, pk_map: PrimaryKeyMap, scope: ImportScope, flags: ImportFlags
+    ) -> int | None:
+        old_pk = super().normalize_before_relocation_import(pk_map, scope, flags)
+        if old_pk is None:
+            return None
+
+        # Map source_id based on the data source type
+        try:
+            handler = data_source_type_registry.get(self.type)
+            model_name = NormalizedModelName(handler.get_relocation_model_name())
+            old_source_id = int(self.source_id)
+            new_source_id = pk_map.get_pk(model_name, old_source_id)
+
+            if new_source_id is None:
+                # Referenced model not in pk_map - the source was filtered out or failed to import.
+                return None
+
+            self.source_id = str(new_source_id)
+        except Exception:
+            logger.exception(
+                "DataSource.normalize_before_relocation_import failed",
+                extra={"data_source_id": old_pk, "type": self.type, "source_id": self.source_id},
+            )
+            return None
+
+        return old_pk
+
 
 @receiver(pre_save, sender=DataSource)
 def ensure_type_handler_registered(sender, instance: DataSource, **kwargs):
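End to end, the remap works because dependency models such as monitors.monitor import first and record their old→new primary keys in a shared PrimaryKeyMap; normalizing a DataSource then translates its string source_id through that map. A rough usage sketch with invented PK values (data_source stands for an in-memory row deserialized from the backup; the organization FK must also be present in the map, as the tests below show):

from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap
from sentry.backup.helpers import ImportFlags
from sentry.backup.scopes import ImportScope

pk_map = PrimaryKeyMap()
# Suppose the monitor behind source_id was re-inserted as pk 1042 (was 42).
pk_map.insert(
    model_name=NormalizedModelName("monitors.monitor"),
    old=42,
    new=1042,
    kind=ImportKind.Inserted,
)

# With data_source.source_id == "42" before normalization...
old_pk = data_source.normalize_before_relocation_import(
    pk_map, ImportScope.Organization, ImportFlags()
)
# ...source_id is now "1042". old_pk is the DataSource's own pre-import pk,
# or None if any required mapping (the monitor, the organization) was missing.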

src/sentry/workflow_engine/types.py

Lines changed: 15 additions & 1 deletion

@@ -107,8 +107,9 @@ def execute(event_data: WorkflowEventData, action: Action, detector: Detector) -
     raise NotImplementedError
 
 
-class DataSourceTypeHandler(Generic[T]):
+class DataSourceTypeHandler(ABC, Generic[T]):
     @staticmethod
+    @abstractmethod
     def bulk_get_query_object(data_sources) -> dict[int, T | None]:
         """
         Bulk fetch related data-source models returning a dict of the
@@ -117,6 +118,7 @@ def bulk_get_query_object(data_sources) -> dict[int, T | None]:
         raise NotImplementedError
 
     @staticmethod
+    @abstractmethod
     def related_model(instance) -> list[ModelRelation]:
         """
         A list of deletion ModelRelations. The model relation query should map
@@ -126,6 +128,7 @@ def related_model(instance) -> list[ModelRelation]:
         raise NotImplementedError
 
     @staticmethod
+    @abstractmethod
     def get_instance_limit(org: Organization) -> int | None:
         """
         Returns the maximum number of instances of this data source type for the organization.
@@ -134,13 +137,24 @@ def get_instance_limit(org: Organization) -> int | None:
         raise NotImplementedError
 
     @staticmethod
+    @abstractmethod
     def get_current_instance_count(org: Organization) -> int:
         """
         Returns the current number of instances of this data source type for the organization.
         Only called if `get_instance_limit` returns a number >0
         """
         raise NotImplementedError
 
+    @staticmethod
+    @abstractmethod
+    def get_relocation_model_name() -> str:
+        """
+        Returns the normalized model name (e.g., "sentry.querysubscription") for the model that
+        source_id references. This is used during backup/relocation to map old PKs to new PKs.
+        The format is "app_label.model_name" in lowercase.
+        """
+        raise NotImplementedError
+
 
 class DataConditionHandler(Generic[T]):
     class Group(StrEnum):
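With DataSourceTypeHandler now an ABC, any registered handler must implement all five abstract static methods, including the new get_relocation_model_name. A minimal hypothetical handler sketching the required surface; ExampleModel (a stand-in Django model), the DATA_SOURCE_EXAMPLE registry key, and ModelRelation's import path are invented for illustration and not part of this commit:

from typing import override  # assumes Python 3.12+

from sentry.models.organization import Organization
from sentry.workflow_engine.registry import data_source_type_registry
from sentry.workflow_engine.types import DataSourceTypeHandler


@data_source_type_registry.register(DATA_SOURCE_EXAMPLE)  # hypothetical key
class ExampleDataSourceHandler(DataSourceTypeHandler[ExampleModel]):
    @override
    @staticmethod
    def bulk_get_query_object(data_sources) -> dict[int, ExampleModel | None]:
        # One query for all referenced rows, keyed back by DataSource id.
        lookup = ExampleModel.objects.in_bulk([int(ds.source_id) for ds in data_sources])
        return {ds.id: lookup.get(int(ds.source_id)) for ds in data_sources}

    @override
    @staticmethod
    def related_model(instance) -> list[ModelRelation]:
        return [ModelRelation(ExampleModel, {"id": instance.source_id})]

    @override
    @staticmethod
    def get_instance_limit(org: Organization) -> int | None:
        return None  # no per-org limit

    @override
    @staticmethod
    def get_current_instance_count(org: Organization) -> int:
        raise NotImplementedError  # only called when get_instance_limit returns > 0

    @override
    @staticmethod
    def get_relocation_model_name() -> str:
        # Must also appear in DataSource.__relocation_dependencies__ so that
        # ExampleModel rows import before DataSource rows.
        return "example_app.examplemodel"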

tests/sentry/workflow_engine/models/test_data_source.py

Lines changed: 68 additions & 0 deletions

@@ -2,6 +2,10 @@
 
 import pytest
 
+from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap
+from sentry.backup.helpers import ImportFlags
+from sentry.backup.scopes import ImportScope
+from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
 from sentry.workflow_engine.registry import data_source_type_registry
 from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
 
@@ -18,3 +22,67 @@ def test_data_source_valid_type(self) -> None:
         data_source = self.create_data_source(type="test")
         assert data_source is not None
         assert data_source.type == "test"
+
+    def test_normalize_before_relocation_import(self) -> None:
+        monitor = self.create_monitor(project=self.project)
+        data_source = self.create_data_source(
+            type=DATA_SOURCE_CRON_MONITOR,
+            source_id=str(monitor.id),
+            organization_id=self.organization.id,
+        )
+
+        old_monitor_pk = monitor.id
+        new_monitor_pk = 9999
+        old_data_source_id = data_source.id
+        old_org_id = data_source.organization_id
+
+        # Create a PrimaryKeyMap that maps the old monitor ID to a new one
+        pk_map = PrimaryKeyMap()
+        pk_map.insert(
+            model_name=NormalizedModelName("monitors.monitor"),
+            old=old_monitor_pk,
+            new=new_monitor_pk,
+            kind=ImportKind.Inserted,
+        )
+        pk_map.insert(
+            model_name=NormalizedModelName("sentry.organization"),
+            old=old_org_id,
+            new=old_org_id,
+            kind=ImportKind.Inserted,
+        )
+
+        old_data_source_pk = data_source.normalize_before_relocation_import(
+            pk_map, ImportScope.Organization, ImportFlags()
+        )
+
+        assert (
+            old_data_source_pk == old_data_source_id
+        ), f"Expected {old_data_source_id}, got {old_data_source_pk}"
+        assert data_source.source_id == str(new_monitor_pk)
+        assert data_source.pk is None
+
+    def test_normalize_before_relocation_import_missing_source(self) -> None:
+        monitor = self.create_monitor(project=self.project)
+        data_source = self.create_data_source(
+            type=DATA_SOURCE_CRON_MONITOR,
+            source_id=str(monitor.id),
+            organization_id=self.organization.id,
+        )
+
+        old_org_id = data_source.organization_id
+
+        # Create a PrimaryKeyMap without the monitor mapping
+        pk_map = PrimaryKeyMap()
+        pk_map.insert(
+            model_name=NormalizedModelName("sentry.organization"),
+            old=old_org_id,
+            new=old_org_id,
+            kind=ImportKind.Inserted,
+        )
+
+        result = data_source.normalize_before_relocation_import(
+            pk_map, ImportScope.Organization, ImportFlags()
+        )
+
+        # Should return None when the referenced source is not in pk_map
+        assert result is None
