Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 61 additions & 86 deletions src/sentry/statistical_detectors/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,15 @@
from datetime import datetime, timezone
from typing import Any, Callable, Mapping, MutableMapping, Optional, Tuple

from sentry.statistical_detectors.base import (
DetectorConfig,
DetectorPayload,
DetectorState,
TrendType,
)
import sentry_sdk

from sentry.statistical_detectors.base import DetectorPayload, DetectorState, TrendType
from sentry.utils import metrics
from sentry.utils.math import MovingAverage

logger = logging.getLogger("sentry.tasks.statistical_detectors.algorithm")


class DetectorAlgorithm(ABC):
@abstractmethod
def __init__(
self,
source: str,
kind: str,
state: DetectorState,
config: DetectorConfig,
):
...

@abstractmethod
def update(self, payload: DetectorPayload) -> Tuple[TrendType, float]:
...

@property
@abstractmethod
def state(self) -> DetectorState:
...


@dataclass(frozen=True)
class MovingAverageDetectorState(DetectorState):
timestamp: Optional[datetime]
Expand Down Expand Up @@ -111,85 +87,84 @@ def empty(cls) -> MovingAverageDetectorState:
)


@dataclass(frozen=True)
class MovingAverageDetectorConfig(DetectorConfig):
min_data_points: int
short_moving_avg_factory: Callable[[], MovingAverage]
long_moving_avg_factory: Callable[[], MovingAverage]
class DetectorAlgorithm(ABC):
@abstractmethod
def update(
self,
raw: Mapping[str | bytes, bytes | float | int | str],
payload: DetectorPayload,
) -> Tuple[TrendType, float, Optional[DetectorState]]:
...


class MovingAverageDetector(DetectorAlgorithm):
class MovingAverageRelativeChangeDetector(DetectorAlgorithm):
def __init__(
self,
source: str,
kind: str,
state: MovingAverageDetectorState,
config: MovingAverageDetectorConfig,
min_data_points: int,
moving_avg_short_factory: Callable[[], MovingAverage],
moving_avg_long_factory: Callable[[], MovingAverage],
threshold: float,
):
self.source = source
self.kind = kind
self.min_data_points = min_data_points
self.moving_avg_short_factory = moving_avg_short_factory
self.moving_avg_long_factory = moving_avg_long_factory
self.threshold = threshold

self.moving_avg_short = config.short_moving_avg_factory()
self.moving_avg_short.set(state.moving_avg_short, state.count)

self.moving_avg_long = config.long_moving_avg_factory()
self.moving_avg_long.set(state.moving_avg_long, state.count)

self.timestamp = state.timestamp
self.count = state.count
self.config = config

@property
def state(self) -> MovingAverageDetectorState:
return MovingAverageDetectorState(
timestamp=self.timestamp,
count=self.count,
moving_avg_short=self.moving_avg_short.value,
moving_avg_long=self.moving_avg_long.value,
)


@dataclass(frozen=True)
class MovingAverageRelativeChangeDetectorConfig(MovingAverageDetectorConfig):
threshold: float

def update(
self,
raw_state: Mapping[str | bytes, bytes | float | int | str],
payload: DetectorPayload,
) -> Tuple[TrendType, float, Optional[DetectorState]]:
try:
old = MovingAverageDetectorState.from_redis_dict(raw_state)
except Exception as e:
old = MovingAverageDetectorState.empty()

class MovingAverageRelativeChangeDetector(MovingAverageDetector):
config: MovingAverageRelativeChangeDetectorConfig
if raw_state:
# empty raw state implies that there was no
# previous state so no need to capture an exception
sentry_sdk.capture_exception(e)

def update(self, payload: DetectorPayload) -> Tuple[TrendType, float]:
if self.timestamp is not None and self.timestamp > payload.timestamp:
if old.timestamp is not None and old.timestamp > payload.timestamp:
# In the event that the payload's timestamp is before the last processed
# timestamp, we do not want to process this payload.
#
# This should not happen other than in some error state.
logger.warning(
"Trend detection out of order. Processing %s, but last processed was %s",
payload.timestamp.isoformat(),
self.timestamp.isoformat(),
old.timestamp.isoformat(),
)
return TrendType.Skipped, 0

old_moving_avg_short = self.moving_avg_short.value
old_moving_avg_long = self.moving_avg_long.value

self.moving_avg_short.update(payload.value)
self.moving_avg_long.update(payload.value)
self.timestamp = payload.timestamp
self.count += 1
return TrendType.Skipped, 0, None

moving_avg_short = self.moving_avg_short_factory()
moving_avg_long = self.moving_avg_long_factory()

new = MovingAverageDetectorState(
timestamp=payload.timestamp,
count=old.count + 1,
moving_avg_short=moving_avg_short.update(
old.count, old.moving_avg_short, payload.value
),
moving_avg_long=moving_avg_long.update(old.count, old.moving_avg_long, payload.value),
)

# The heuristic isn't stable initially, so ensure we have a minimum
# number of data points before looking for a regression.
stablized = self.count > self.config.min_data_points
stablized = new.count > self.min_data_points

score = abs(self.moving_avg_short.value - self.moving_avg_long.value)
score = abs(new.moving_avg_short - new.moving_avg_long)

try:
relative_change_old = (old_moving_avg_short - old_moving_avg_long) / abs(
old_moving_avg_long
relative_change_old = (old.moving_avg_short - old.moving_avg_long) / abs(
old.moving_avg_long
)
relative_change_new = (self.moving_avg_short.value - self.moving_avg_long.value) / abs(
self.moving_avg_long.value
relative_change_new = (new.moving_avg_short - new.moving_avg_long) / abs(
new.moving_avg_long
)

metrics.distribution(
Expand All @@ -203,16 +178,16 @@ def update(self, payload: DetectorPayload) -> Tuple[TrendType, float]:

if (
stablized
and relative_change_old < self.config.threshold
and relative_change_new > self.config.threshold
and relative_change_old < self.threshold
and relative_change_new > self.threshold
):
return TrendType.Regressed, score
return TrendType.Regressed, score, new

elif (
stablized
and relative_change_old > -self.config.threshold
and relative_change_new < -self.config.threshold
and relative_change_old > -self.threshold
and relative_change_new < -self.threshold
):
return TrendType.Improved, score
return TrendType.Improved, score, new

return TrendType.Unchanged, score
return TrendType.Unchanged, score, new
5 changes: 0 additions & 5 deletions src/sentry/statistical_detectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,3 @@ def should_escalate(
@abstractmethod
def empty(cls) -> DetectorState:
...


@dataclass(frozen=True)
class DetectorConfig(ABC):
...
45 changes: 14 additions & 31 deletions src/sentry/statistical_detectors/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
from sentry.search.events.fields import get_function_alias
from sentry.seer.utils import BreakpointData, detect_breakpoints
from sentry.statistical_detectors.algorithm import DetectorAlgorithm
from sentry.statistical_detectors.base import (
DetectorConfig,
DetectorPayload,
DetectorState,
TrendType,
)
from sentry.statistical_detectors.base import DetectorPayload, DetectorState, TrendType
from sentry.statistical_detectors.issue_platform_adapter import fingerprint_regression
from sentry.statistical_detectors.store import DetectorStore
from sentry.utils import metrics
Expand All @@ -50,16 +45,18 @@ class RegressionDetector(ABC):
source: str
kind: str
regression_type: RegressionType
config: DetectorConfig
state_cls: type[DetectorState]
detector_cls: type[DetectorAlgorithm]
min_change: int
resolution_rel_threshold: float
escalation_rel_threshold: float

@classmethod
@abstractmethod
def make_detector_store(cls) -> DetectorStore:
def detector_algorithm_factory(cls) -> DetectorAlgorithm:
...

@classmethod
@abstractmethod
def detector_store_factory(cls) -> DetectorStore:
...

@classmethod
Expand Down Expand Up @@ -96,7 +93,8 @@ def detect_trends(
regressed_count = 0
improved_count = 0

store = cls.make_detector_store()
algorithm = cls.detector_algorithm_factory()
store = cls.detector_store_factory()

for payloads in chunked(cls.all_payloads(projects, start), 100):
total_count += len(payloads)
Expand All @@ -106,37 +104,22 @@ def detect_trends(
states = []

for raw_state, payload in zip(raw_states, payloads):
try:
state = cls.state_cls.from_redis_dict(raw_state)
except Exception as e:
state = cls.state_cls.empty()

if raw_state:
# empty raw state implies that there was no
# previous state so no need to capture an exception
sentry_sdk.capture_exception(e)

algorithm = cls.detector_cls(cls.source, cls.kind, state, cls.config)
trend_type, score = algorithm.update(payload)

# the trend type is Skipped if no update happened,
# pass None to indicate we do not need to update the state
states.append(
None if trend_type == TrendType.Skipped else algorithm.state.to_redis_dict()
)
unique_project_ids.add(payload.project_id)

trend_type, score, new_state = algorithm.update(raw_state, payload)

if trend_type == TrendType.Regressed:
regressed_count += 1
elif trend_type == TrendType.Improved:
improved_count += 1

unique_project_ids.add(payload.project_id)
states.append(None if new_state is None else new_state.to_redis_dict())

yield TrendBundle(
type=trend_type,
score=score,
payload=payload,
state=algorithm.state,
state=new_state,
)

store.bulk_write_states(payloads, states)
Expand Down
45 changes: 25 additions & 20 deletions src/sentry/tasks/statistical_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.referrer import Referrer
from sentry.statistical_detectors.algorithm import (
MovingAverageDetectorState,
DetectorAlgorithm,
MovingAverageRelativeChangeDetector,
MovingAverageRelativeChangeDetectorConfig,
)
from sentry.statistical_detectors.base import DetectorPayload
from sentry.statistical_detectors.detector import RegressionDetector
Expand Down Expand Up @@ -162,20 +161,23 @@ class EndpointRegressionDetector(RegressionDetector):
source = "transaction"
kind = "endpoint"
regression_type = RegressionType.ENDPOINT
config = MovingAverageRelativeChangeDetectorConfig(
min_data_points=6,
short_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 21),
long_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 41),
threshold=0.2,
)
state_cls = MovingAverageDetectorState
detector_cls = MovingAverageRelativeChangeDetector
min_change = 200 # 200ms in ms
resolution_rel_threshold = 0.1
escalation_rel_threshold = 0.3

@classmethod
def make_detector_store(cls) -> DetectorStore:
def detector_algorithm_factory(cls) -> DetectorAlgorithm:
return MovingAverageRelativeChangeDetector(
source=cls.source,
kind=cls.kind,
min_data_points=6,
moving_avg_short_factory=lambda: ExponentialMovingAverage(2 / 21),
moving_avg_long_factory=lambda: ExponentialMovingAverage(2 / 41),
threshold=0.2,
)

@classmethod
def detector_store_factory(cls) -> DetectorStore:
return RedisDetectorStore(regression_type=RegressionType.ENDPOINT)

@classmethod
Expand All @@ -200,20 +202,23 @@ class FunctionRegressionDetector(RegressionDetector):
source = "profile"
kind = "function"
regression_type = RegressionType.FUNCTION
config = MovingAverageRelativeChangeDetectorConfig(
min_data_points=6,
short_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 21),
long_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 41),
threshold=0.2,
)
state_cls = MovingAverageDetectorState
detector_cls = MovingAverageRelativeChangeDetector
min_change = 100_000_000 # 100ms in ns
resolution_rel_threshold = 0.1
escalation_rel_threshold = 0.3

@classmethod
def make_detector_store(cls) -> DetectorStore:
def detector_algorithm_factory(cls) -> DetectorAlgorithm:
return MovingAverageRelativeChangeDetector(
source=cls.source,
kind=cls.kind,
min_data_points=6,
moving_avg_short_factory=lambda: ExponentialMovingAverage(2 / 21),
moving_avg_long_factory=lambda: ExponentialMovingAverage(2 / 41),
threshold=0.2,
)

@classmethod
def detector_store_factory(cls) -> DetectorStore:
return RedisDetectorStore(regression_type=RegressionType.FUNCTION)

@classmethod
Expand Down
Loading