Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 45 additions & 14 deletions src/sentry/monitors/system_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,16 @@ class AnomalyTransition(StrEnum):

@dataclass
class DecisionResult:
ts: datetime
"""
The associated timestamp of the decision. Typically this will be the clock
tick when the decision was made. However, for incident start and end
transitions this will be the back-dated timestamp of when the state began.

INCIDENT_STARTED -> Tick when the incident truly starts
INCIDENT_RECOVERED -> Tick when the incident truly recovered
"""

decision: TickAnomalyDecision
"""
The recorded decision made for the clock tick
Expand Down Expand Up @@ -365,7 +375,7 @@ def make_clock_tick_decision(tick: datetime) -> DecisionResult:
Decision = TickAnomalyDecision

if not options.get("crons.tick_volume_anomaly_detection"):
return DecisionResult(Decision.NORMAL)
return DecisionResult(tick, Decision.NORMAL)

redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

Expand Down Expand Up @@ -405,6 +415,7 @@ def make_clock_tick_decision(tick: datetime) -> DecisionResult:
def make_decision(
decision: TickAnomalyDecision,
transition: AnomalyTransition | None = None,
ts: datetime | None = None,
) -> DecisionResult:
decision_key = MONITOR_TICK_DECISION.format(ts=_make_reference_ts(tick))
pipeline = redis_client.pipeline()
Expand All @@ -421,7 +432,7 @@ def make_decision(
},
)

return DecisionResult(decision, transition)
return DecisionResult(ts or tick, decision, transition)

def metrics_match(metric: Metric) -> Generator[bool]:
return (d == metric for d in tick_metrics)
Expand Down Expand Up @@ -453,8 +464,8 @@ def metrics_match(metric: Metric) -> Generator[bool]:
# If the previous result was recovering, check if we have recovered and can
# backfill these decisions as normal.
if last_decision == Decision.RECOVERING and all(metrics_match(Metric.NORMAL)):
_backfill_decisions(past_ts, Decision.NORMAL, Decision.RECOVERING)
return make_decision(Decision.NORMAL, AnomalyTransition.INCIDENT_RECOVERED)
ts = _backfill_decisions(past_ts, Decision.NORMAL, Decision.RECOVERING)
return make_decision(Decision.NORMAL, AnomalyTransition.INCIDENT_RECOVERED, ts)

# E: RECOVERING -> INCIDENT
#
Expand All @@ -470,8 +481,8 @@ def metrics_match(metric: Metric) -> Generator[bool]:
# an incident, mark this tick as an incident and backfill all abnormal
# decisions to an incident decision.
if last_decision != Decision.INCIDENT and last_metric == Metric.INCIDENT:
_backfill_decisions(past_ts, Decision.INCIDENT, Decision.ABNORMAL)
return make_decision(Decision.INCIDENT, AnomalyTransition.INCIDENT_STARTED)
ts = _backfill_decisions(past_ts, Decision.INCIDENT, Decision.ABNORMAL)
return make_decision(Decision.INCIDENT, AnomalyTransition.INCIDENT_STARTED, ts)

# NORMAL -> NORMAL
# ABNORMAL -> ABNORMAL
Expand All @@ -494,24 +505,32 @@ def get_clock_tick_decision(tick: datetime) -> TickAnomalyDecision | None:
return None


def _backfill_keys(start: datetime, until_not: TickAnomalyDecision) -> Generator[str]:
# Pairs a tick-decision Redis key with the tick timestamp it was built from.
@dataclass
class BackfillItem:
# Decision key, formatted from MONITOR_TICK_DECISION for the tick
key: str
# Tick timestamp the key was derived from
ts: datetime


def _make_backfill(start: datetime, until_not: TickAnomalyDecision) -> Generator[BackfillItem]:
"""
Yields keys from the `start` tick until the value of the key is not a
`until_not` tick decision.
Yields keys and associated timestamps from the `start` tick until the value
of the key is not a `until_not` tick decision.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

for chunked_offsets in batched(range(0, BACKFILL_CUTOFF), BACKFILL_CHUNKS):
pipeline = redis_client.pipeline()

keys: list[str] = []
timestamps: list[datetime] = []
for offset in chunked_offsets:
ts = start - timedelta(minutes=offset)
key = MONITOR_TICK_DECISION.format(ts=_make_reference_ts(ts))
pipeline.get(key)
keys.append(key)
timestamps.append(ts)

for key, value in zip(keys, pipeline.execute()):
for key, ts, value in zip(keys, timestamps, pipeline.execute()):
# Edge case: we found a gap in the recorded decisions
if value is None:
return
Expand All @@ -521,7 +540,7 @@ def _backfill_keys(start: datetime, until_not: TickAnomalyDecision) -> Generator
if prev_decision != until_not:
return

yield key
yield BackfillItem(key, ts)

# If we've iterated through the entire BACKFILL_CUTOFF we have a
# "decision runaway" and should report this as an error
Expand All @@ -532,18 +551,30 @@ def _backfill_decisions(
start: datetime,
decision: TickAnomalyDecision,
until_not: TickAnomalyDecision,
) -> None:
) -> datetime | None:
"""
Update historic tick decisions from `start` to `decision` until we no
longer see the `until_not` decision.

If a backfill occurred, returns the timestamp of the oldest tick that was
backfilled. Returns None when nothing was backfilled.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

pipeline = redis_client.pipeline()
for key in _backfill_keys(start, until_not):
pipeline.set(key, decision.value)
backfill_items = list(_make_backfill(start, until_not))

for item in backfill_items:
pipeline.set(item.key, decision.value)
pipeline.execute()

# Return the timestamp of the oldest backfilled tick, i.e. the last one
# seen before hitting a decision that is not `until_not`. Note that
# backfill_items is in reverse chronological order, so it is the final item.
if backfill_items:
return backfill_items[-1].ts

# In the case that we didn't backfill anything return None
return None


def _make_reference_ts(ts: datetime):
"""
Expand Down
35 changes: 31 additions & 4 deletions tests/sentry/monitors/test_system_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def test_record_clock_tiock_volume_metric_uniform(metrics, logger):
}
)
def test_tick_decision_anomaly_recovery():
start = timezone.now().replace(second=0, microsecond=0)
start = timezone.now().replace(minute=0, second=0, microsecond=0)

test_metrics = [
# fmt: off
Expand All @@ -316,24 +316,28 @@ def test_tick_decision_anomaly_recovery():
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.NORMAL
assert result.transition is None
assert result.ts == ts

# Transition into anomalous state (-6)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.ABNORMAL
assert result.transition == AnomalyTransition.ABNORMALITY_STARTED
assert result.ts == ts

# Next 5 ticks (-7, -4, -3, -3, -4) stay in abnormal state
for _ in range(0, 5):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.ABNORMAL
assert result.transition is None
assert result.ts == ts

# Next tick recovers the abnormality after 5 ticks under the abnormality threshold
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.NORMAL
assert result.transition == AnomalyTransition.ABNORMALITY_RECOVERED
assert result.ts == ts

# The last 6 ABNORMAL ticks transitioned to NORMAL
for i in range(1, 7):
Expand All @@ -354,7 +358,7 @@ def test_tick_decisions_simple_incident():
Tests incident detection for an incident that immediately starts and
immediately stops.
"""
start = timezone.now().replace(second=0, microsecond=0)
start = timezone.now().replace(minute=0, second=0, microsecond=0)

test_metrics = [
# fmt: off
Expand All @@ -375,36 +379,43 @@ def test_tick_decisions_simple_incident():
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.NORMAL
assert result.transition is None
assert result.ts == ts

# Transition into incident (-35)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.INCIDENT
assert result.transition == AnomalyTransition.INCIDENT_STARTED
assert result.ts == ts

# Incident continues (-80, -100, -50)
for _ in range(0, 3):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.INCIDENT
assert result.transition is None
assert result.ts == ts

# Incident recovers (-3)
# Incident begins recovery (-3)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition == AnomalyTransition.INCIDENT_RECOVERING
assert result.ts == ts

# Incident continues recovery (-2, -4, -1)
for _ in range(0, 3):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition is None
assert result.ts == ts

# Incident recovers
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.NORMAL
assert result.transition == AnomalyTransition.INCIDENT_RECOVERED
# True recovery was 4 ticks ago
assert result.ts == ts - timedelta(minutes=4)

# The last 4 RECOVERING ticks transitioned to NORMAL
for i in range(1, 5):
Expand All @@ -424,7 +435,7 @@ def test_tick_decisions_variable_incident():
"""
Tests an incident that slowly starts and slowly recovers.
"""
start = timezone.now().replace(second=0, microsecond=0)
start = timezone.now().replace(minute=0, second=0, microsecond=0)

test_metrics = [
# fmt: off
Expand Down Expand Up @@ -459,24 +470,29 @@ def test_tick_decisions_variable_incident():
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.NORMAL
assert result.transition is None
assert result.ts == ts

# Transition into anomalous state (-6)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.ABNORMAL
assert result.transition == AnomalyTransition.ABNORMALITY_STARTED
assert result.ts == ts

# Next 4 ticks (-7, -4, -3, -10) stay in anomaly
for _ in range(0, 4):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.ABNORMAL
assert result.transition is None
assert result.ts == ts

# Incident starts (-30)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.INCIDENT
assert result.transition == AnomalyTransition.INCIDENT_STARTED
# True incident start was 5 ticks ago
assert result.ts == ts - timedelta(minutes=5)

# The last 5 ABNORMAL ticks transitioned to INCIDENT
for i in range(1, 6):
Expand All @@ -487,24 +503,28 @@ def test_tick_decisions_variable_incident():
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.INCIDENT
assert result.transition is None
assert result.ts == ts

# Incident begins recovering (-4)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition == AnomalyTransition.INCIDENT_RECOVERING
assert result.ts == ts

# Incident continues to recover (-3)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition is None
assert result.ts == ts

# Incident has anomalous tick again (-6), not fully recovered
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.INCIDENT
assert result.transition == AnomalyTransition.INCIDENT_RECOVERY_FAILED
assert result.ts == ts

# The last 2 RECOVERING ticks transitioned back to incident
for i in range(1, 3):
Expand All @@ -515,18 +535,21 @@ def test_tick_decisions_variable_incident():
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition == AnomalyTransition.INCIDENT_RECOVERING
assert result.ts == ts

# Incident continues to recover (-1)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition is None
assert result.ts == ts

# Incident has incident tick again (-30), not fully recovered
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.INCIDENT
assert result.transition == AnomalyTransition.INCIDENT_RECOVERY_FAILED
assert result.ts == ts

# The last 2 RECOVERING ticks transitioned back to incident
for i in range(1, 3):
Expand All @@ -537,18 +560,22 @@ def test_tick_decisions_variable_incident():
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition == AnomalyTransition.INCIDENT_RECOVERING
assert result.ts == ts

# Incident continues to recover for the next 3 normal ticks (-2, -4, -4)
for _ in range(0, 3):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.RECOVERING
assert result.transition is None
assert result.ts == ts

# Incident recovers at the final 5th tick (-3)
for _ in range(0, 1):
result = make_clock_tick_decision(ts := ts + timedelta(minutes=1))
assert result.decision == TickAnomalyDecision.NORMAL
assert result.transition == AnomalyTransition.INCIDENT_RECOVERED
# True incident recovery was 4 ticks ago
assert result.ts == ts - timedelta(minutes=4)

# The last 4 RECOVERING ticks transitioned to NORMAL
for i in range(1, 5):
Expand Down