Merged
69 changes: 46 additions & 23 deletions src/sentry/incidents/subscription_processor.py
@@ -5,7 +5,7 @@
from collections.abc import Sequence
from copy import deepcopy
from datetime import datetime, timedelta
from typing import TypeVar, cast
from typing import Literal, TypedDict, TypeVar, cast

from django.conf import settings
from django.db import router, transaction
@@ -87,6 +87,15 @@
T = TypeVar("T")


class MetricIssueDetectorConfig(TypedDict):
"""
Schema for Metric Issue Detector.config.
"""

comparison_delta: int | None
detection_type: Literal["static", "percent", "dynamic"]

Member:

Detector config can also have:

sensitivity: str | None
seasonality: str | None
threshold_period: int
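
A minimal sketch of how those keys could be layered onto the TypedDict in this diff, assuming they stay optional; NotRequired (from typing on 3.11+, or typing_extensions on older Pythons) is my assumption here, not something from this PR:

from typing import Literal, NotRequired, TypedDict

class MetricIssueDetectorConfig(TypedDict):
    comparison_delta: int | None
    detection_type: Literal["static", "percent", "dynamic"]
    # Hypothetical optional keys based on the fields listed above; not every
    # detector row carries them, so they are marked NotRequired.
    sensitivity: NotRequired[str | None]
    seasonality: NotRequired[str | None]
    threshold_period: NotRequired[int]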

Member Author:

Ah, interesting. I based this on

config_schema={
"$schema": "https://json-schema.org/draft/2020-12/schema",
"description": "A representation of a metric alert firing",
"type": "object",
"required": ["detection_type"],
"properties": {
"comparison_delta": {
"type": ["integer", "null"],
"enum": COMPARISON_DELTA_CHOICES,
},
"detection_type": {
"type": "string",
"enum": [detection_type.value for detection_type in AlertRuleDetectionType],
},
},
},
and what fields were actually used here in the file. The lesson for me is that the default value of 'additionalProperties' is 'true', which I should keep in mind.

Interesting, the current DB data doesn't seem to have any detectors with seasonality, only 4 (~0.04%) with sensitivity, and it looks like threshold_period isn't required: https://redash.getsentry.net/queries/9648
I'm not sure what that says about the group type schema or about this change here.
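
For illustration only (not code from this PR), here's how that default plays out with the standard jsonschema package, assuming a schema shaped like the one above: an undeclared key such as sensitivity still validates because additionalProperties defaults to true.

from jsonschema import validate

schema = {
    "type": "object",
    "required": ["detection_type"],
    "properties": {
        "comparison_delta": {"type": ["integer", "null"]},
        "detection_type": {"type": "string", "enum": ["static", "percent", "dynamic"]},
    },
    # No "additionalProperties": false here, so extra keys are accepted.
}

# Passes even though "sensitivity" isn't declared in "properties".
validate({"detection_type": "dynamic", "sensitivity": "low"}, schema)

# Adding "additionalProperties": false would make the same call raise ValidationError.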

Member:

Let me look into this - seasonality and sensitivity are only used for anomaly detection alerts.

I was wrong about seasonality. We have it on the alert rule model, but since we never released anything besides "auto", we decided not to migrate it and not to let users change this option.

We should have sensitivity on more than 4 detectors, though. I'll follow up separately; we might need to run a migration to heal these missing values.

threshold_period isn't required because it's for an unreleased feature that lets you say the alert has to exceed the threshold x number of times before triggering. We have a handful of rules in the Sentry organization that use it, but you can't create a rule like this today. We should probably get rid of these if we don't plan to ever release it, but for now that's what that is.
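
A rough sketch of that gating, under the assumption that threshold_period just means "the value must exceed the threshold on N consecutive updates before the trigger fires"; the helper below is hypothetical, not code from this file:

# Hypothetical illustration: count consecutive over-threshold updates per
# trigger and only fire once the count reaches threshold_period.
def should_fire(
    value: float,
    threshold: float,
    threshold_period: int,
    counts: dict[int, int],
    trigger_id: int,
) -> bool:
    if value > threshold:
        counts[trigger_id] = counts.get(trigger_id, 0) + 1
    else:
        counts[trigger_id] = 0
    return counts[trigger_id] >= threshold_period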

Member:

Update: Michelle let me know we moved sensitivity off of the Detector model; I was unaware. These are on the DataCondition now :phew:


class SubscriptionProcessor:
"""
Class for processing subscription updates for an alert rule. Accepts a subscription
@@ -107,19 +116,20 @@ class SubscriptionProcessor:

def __init__(self, subscription: QuerySubscription) -> None:
self.subscription = subscription
self._alert_rule: AlertRule | None = None
try:
self.alert_rule = AlertRule.objects.get_for_subscription(subscription)
self._alert_rule = AlertRule.objects.get_for_subscription(subscription)
except AlertRule.DoesNotExist:
return

self.triggers = AlertRuleTrigger.objects.get_for_alert_rule(self.alert_rule)
self.triggers = AlertRuleTrigger.objects.get_for_alert_rule(self._alert_rule)
self.triggers.sort(key=lambda trigger: trigger.alert_threshold)

(
self.last_update,
self.trigger_alert_counts,
self.trigger_resolve_counts,
) = get_alert_rule_stats(self.alert_rule, self.subscription, self.triggers)
) = get_alert_rule_stats(self._alert_rule, self.subscription, self.triggers)
self.orig_trigger_alert_counts = deepcopy(self.trigger_alert_counts)
self.orig_trigger_resolve_counts = deepcopy(self.trigger_resolve_counts)

@@ -135,6 +145,14 @@ def __init__(self, subscription: QuerySubscription) -> None:
or self._has_workflow_engine_processing_only
)

@property
def alert_rule(self) -> AlertRule:
"""
Only use this in non-single processing contexts.
"""
assert self._alert_rule is not None
return self._alert_rule

@property
def active_incident(self) -> Incident | None:
"""
@@ -188,15 +206,15 @@ def check_trigger_matches_status(
incident_trigger = self.incident_trigger_map.get(trigger.id)
return incident_trigger is not None and incident_trigger.status == status.value

def reset_trigger_counts(self) -> None:
def reset_trigger_counts(self, alert_rule: AlertRule) -> None:
"""
Helper method that clears both the trigger alert and the trigger resolve counts
"""
for trigger_id in self.trigger_alert_counts:
self.trigger_alert_counts[trigger_id] = 0
for trigger_id in self.trigger_resolve_counts:
self.trigger_resolve_counts[trigger_id] = 0
self.update_alert_rule_stats()
self.update_alert_rule_stats(alert_rule)

def calculate_resolve_threshold(self, trigger: AlertRuleTrigger) -> float:
"""
@@ -253,8 +271,8 @@ def get_crash_rate_alert_metrics_aggregation_value(
aggregation_value = get_crash_rate_alert_metrics_aggregation_value_helper(
subscription_update
)
if aggregation_value is None:
self.reset_trigger_counts()
if aggregation_value is None and self._alert_rule is not None:
self.reset_trigger_counts(self._alert_rule)
return aggregation_value

def get_aggregation_value(
@@ -271,7 +289,7 @@ def get_aggregation_value(
organization_id=self.subscription.project.organization.id,
project_ids=[self.subscription.project_id],
comparison_delta=comparison_delta,
alert_rule_id=self.alert_rule.id,
alert_rule_id=self._alert_rule.id if self._alert_rule else None,
)

return aggregation_value
@@ -300,7 +318,7 @@ def handle_trigger_anomalies(
is_resolved=False,
)
incremented = metrics_incremented or incremented
incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
incident_trigger = self.trigger_alert_threshold(trigger)
if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)
else:
@@ -332,9 +350,12 @@ def get_comparison_delta(self, detector: Detector | None) -> int | None:
comparison_delta = None

if detector:
comparison_delta = detector.config.get("comparison_delta")
detector_cfg: MetricIssueDetectorConfig = detector.config
comparison_delta = detector_cfg.get("comparison_delta")
else:
comparison_delta = self.alert_rule.comparison_delta
# If we don't have a Detector, we must have an AlertRule.
assert self._alert_rule is not None
comparison_delta = self._alert_rule.comparison_delta

return comparison_delta

@@ -421,7 +442,7 @@ def handle_trigger_alerts(
)
incremented = metrics_incremented or incremented
# triggering a threshold will create an incident and set the status to active
incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
incident_trigger = self.trigger_alert_threshold(trigger)
if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)
else:
@@ -455,11 +476,13 @@ def handle_trigger_alerts(

def process_results_workflow_engine(
self,
detector: Detector,
subscription_update: QuerySubscriptionUpdate,
aggregation_value: float,
organization: Organization,
) -> list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]:
if self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC:
detector_cfg: MetricIssueDetectorConfig = detector.config
if detector_cfg["detection_type"] == AlertRuleDetectionType.DYNAMIC.value:
anomaly_detection_packet = AnomalyDetectionUpdate(
entity=subscription_update.get("entity", ""),
subscription_id=subscription_update["subscription_id"],
@@ -499,14 +522,13 @@ def process_results_workflow_engine(
"results": results,
"num_results": len(results),
"value": aggregation_value,
"rule_id": self.alert_rule.id,
"rule_id": self._alert_rule.id if self._alert_rule else None,
},
)
return results

def process_legacy_metric_alerts(
self,
subscription_update: QuerySubscriptionUpdate,
aggregation_value: float,
detector: Detector | None,
results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]] | None,
@@ -632,7 +654,7 @@ def process_legacy_metric_alerts(
# is killed here. The trade-off is that we might process an update twice. Mostly
# this will have no effect, but if someone manages to close a triggered incident
# before the next one then we might alert twice.
self.update_alert_rule_stats()
self.update_alert_rule_stats(self.alert_rule)
return fired_incident_triggers

def has_downgraded(self, dataset: str, organization: Organization) -> bool:
@@ -677,7 +699,7 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
if self.has_downgraded(dataset, organization):
return

if not hasattr(self, "alert_rule"):
if self._alert_rule is None:
# QuerySubscriptions must _always_ have an associated AlertRule
# If the alert rule has been removed then clean up associated tables and return
metrics.incr("incidents.alert_rules.no_alert_rule_for_subscription", sample_rate=1.0)
@@ -736,8 +758,9 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
legacy_results = None

if self._has_workflow_engine_processing:
assert detector is not None
workflow_engine_results = self.process_results_workflow_engine(
subscription_update, aggregation_value, organization
detector, subscription_update, aggregation_value, organization
)

if self._has_workflow_engine_processing_only:
@@ -756,7 +779,6 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
workflow engine "and" metric alerts.
"""
legacy_results = self.process_legacy_metric_alerts(
subscription_update,
aggregation_value,
detector,
workflow_engine_results,
@@ -775,7 +797,8 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
)

def trigger_alert_threshold(
self, trigger: AlertRuleTrigger, metric_value: float
self,
trigger: AlertRuleTrigger,
) -> IncidentTrigger | None:
"""
Called when a subscription update exceeds the value defined in the
@@ -1019,7 +1042,7 @@ def handle_incident_severity_update(self) -> None:
status_method=IncidentStatusMethod.RULE_TRIGGERED,
)

def update_alert_rule_stats(self) -> None:
def update_alert_rule_stats(self, alert_rule: AlertRule) -> None:
"""
Updates stats about the alert rule, if they're changed.
:return:
@@ -1036,7 +1059,7 @@ def update_alert_rule_stats(self) -> None:
}

update_alert_rule_stats(
self.alert_rule,
alert_rule,
self.subscription,
self.last_update,
updated_trigger_alert_counts,
@@ -3481,7 +3481,7 @@ def test_seer_call_null_aggregation_value(

mock_seer_request.return_value = HTTPResponse(orjson.dumps(seer_return_value), status=200)
processor = SubscriptionProcessor(self.sub)
processor.alert_rule = self.dynamic_rule
processor._alert_rule = self.dynamic_rule
result = get_anomaly_data_from_seer_legacy(
alert_rule=processor.alert_rule,
subscription=processor.subscription,