diff --git a/src/sentry/statistical_detectors/algorithm.py b/src/sentry/statistical_detectors/algorithm.py index 492834876bd325..a4e42527d0763b 100644 --- a/src/sentry/statistical_detectors/algorithm.py +++ b/src/sentry/statistical_detectors/algorithm.py @@ -6,39 +6,15 @@ from datetime import datetime, timezone from typing import Any, Callable, Mapping, MutableMapping, Optional, Tuple -from sentry.statistical_detectors.base import ( - DetectorConfig, - DetectorPayload, - DetectorState, - TrendType, -) +import sentry_sdk + +from sentry.statistical_detectors.base import DetectorPayload, DetectorState, TrendType from sentry.utils import metrics from sentry.utils.math import MovingAverage logger = logging.getLogger("sentry.tasks.statistical_detectors.algorithm") -class DetectorAlgorithm(ABC): - @abstractmethod - def __init__( - self, - source: str, - kind: str, - state: DetectorState, - config: DetectorConfig, - ): - ... - - @abstractmethod - def update(self, payload: DetectorPayload) -> Tuple[TrendType, float]: - ... - - @property - @abstractmethod - def state(self) -> DetectorState: - ... - - @dataclass(frozen=True) class MovingAverageDetectorState(DetectorState): timestamp: Optional[datetime] @@ -111,54 +87,49 @@ def empty(cls) -> MovingAverageDetectorState: ) -@dataclass(frozen=True) -class MovingAverageDetectorConfig(DetectorConfig): - min_data_points: int - short_moving_avg_factory: Callable[[], MovingAverage] - long_moving_avg_factory: Callable[[], MovingAverage] +class DetectorAlgorithm(ABC): + @abstractmethod + def update( + self, + raw: Mapping[str | bytes, bytes | float | int | str], + payload: DetectorPayload, + ) -> Tuple[TrendType, float, Optional[DetectorState]]: + ... -class MovingAverageDetector(DetectorAlgorithm): +class MovingAverageRelativeChangeDetector(DetectorAlgorithm): def __init__( self, source: str, kind: str, - state: MovingAverageDetectorState, - config: MovingAverageDetectorConfig, + min_data_points: int, + moving_avg_short_factory: Callable[[], MovingAverage], + moving_avg_long_factory: Callable[[], MovingAverage], + threshold: float, ): self.source = source self.kind = kind + self.min_data_points = min_data_points + self.moving_avg_short_factory = moving_avg_short_factory + self.moving_avg_long_factory = moving_avg_long_factory + self.threshold = threshold - self.moving_avg_short = config.short_moving_avg_factory() - self.moving_avg_short.set(state.moving_avg_short, state.count) - - self.moving_avg_long = config.long_moving_avg_factory() - self.moving_avg_long.set(state.moving_avg_long, state.count) - - self.timestamp = state.timestamp - self.count = state.count - self.config = config - - @property - def state(self) -> MovingAverageDetectorState: - return MovingAverageDetectorState( - timestamp=self.timestamp, - count=self.count, - moving_avg_short=self.moving_avg_short.value, - moving_avg_long=self.moving_avg_long.value, - ) - - -@dataclass(frozen=True) -class MovingAverageRelativeChangeDetectorConfig(MovingAverageDetectorConfig): - threshold: float - + def update( + self, + raw_state: Mapping[str | bytes, bytes | float | int | str], + payload: DetectorPayload, + ) -> Tuple[TrendType, float, Optional[DetectorState]]: + try: + old = MovingAverageDetectorState.from_redis_dict(raw_state) + except Exception as e: + old = MovingAverageDetectorState.empty() -class MovingAverageRelativeChangeDetector(MovingAverageDetector): - config: MovingAverageRelativeChangeDetectorConfig + if raw_state: + # empty raw state implies that there was no + # previous state so no need to capture an exception + sentry_sdk.capture_exception(e) - def update(self, payload: DetectorPayload) -> Tuple[TrendType, float]: - if self.timestamp is not None and self.timestamp > payload.timestamp: + if old.timestamp is not None and old.timestamp > payload.timestamp: # In the event that the timestamp is before the payload's timestamps, # we do not want to process this payload. # @@ -166,30 +137,34 @@ def update(self, payload: DetectorPayload) -> Tuple[TrendType, float]: logger.warning( "Trend detection out of order. Processing %s, but last processed was %s", payload.timestamp.isoformat(), - self.timestamp.isoformat(), + old.timestamp.isoformat(), ) - return TrendType.Skipped, 0 - - old_moving_avg_short = self.moving_avg_short.value - old_moving_avg_long = self.moving_avg_long.value - - self.moving_avg_short.update(payload.value) - self.moving_avg_long.update(payload.value) - self.timestamp = payload.timestamp - self.count += 1 + return TrendType.Skipped, 0, None + + moving_avg_short = self.moving_avg_short_factory() + moving_avg_long = self.moving_avg_long_factory() + + new = MovingAverageDetectorState( + timestamp=payload.timestamp, + count=old.count + 1, + moving_avg_short=moving_avg_short.update( + old.count, old.moving_avg_short, payload.value + ), + moving_avg_long=moving_avg_long.update(old.count, old.moving_avg_long, payload.value), + ) # The heuristic isn't stable initially, so ensure we have a minimum # number of data points before looking for a regression. - stablized = self.count > self.config.min_data_points + stablized = new.count > self.min_data_points - score = abs(self.moving_avg_short.value - self.moving_avg_long.value) + score = abs(new.moving_avg_short - new.moving_avg_long) try: - relative_change_old = (old_moving_avg_short - old_moving_avg_long) / abs( - old_moving_avg_long + relative_change_old = (old.moving_avg_short - old.moving_avg_long) / abs( + old.moving_avg_long ) - relative_change_new = (self.moving_avg_short.value - self.moving_avg_long.value) / abs( - self.moving_avg_long.value + relative_change_new = (new.moving_avg_short - new.moving_avg_long) / abs( + new.moving_avg_long ) metrics.distribution( @@ -203,16 +178,16 @@ def update(self, payload: DetectorPayload) -> Tuple[TrendType, float]: if ( stablized - and relative_change_old < self.config.threshold - and relative_change_new > self.config.threshold + and relative_change_old < self.threshold + and relative_change_new > self.threshold ): - return TrendType.Regressed, score + return TrendType.Regressed, score, new elif ( stablized - and relative_change_old > -self.config.threshold - and relative_change_new < -self.config.threshold + and relative_change_old > -self.threshold + and relative_change_new < -self.threshold ): - return TrendType.Improved, score + return TrendType.Improved, score, new - return TrendType.Unchanged, score + return TrendType.Unchanged, score, new diff --git a/src/sentry/statistical_detectors/base.py b/src/sentry/statistical_detectors/base.py index 80c8282fd92d36..5682d173e1476d 100644 --- a/src/sentry/statistical_detectors/base.py +++ b/src/sentry/statistical_detectors/base.py @@ -49,8 +49,3 @@ def should_escalate( @abstractmethod def empty(cls) -> DetectorState: ... - - -@dataclass(frozen=True) -class DetectorConfig(ABC): - ... diff --git a/src/sentry/statistical_detectors/detector.py b/src/sentry/statistical_detectors/detector.py index 464e9cf22b1480..a9c698a726fc4c 100644 --- a/src/sentry/statistical_detectors/detector.py +++ b/src/sentry/statistical_detectors/detector.py @@ -23,12 +23,7 @@ from sentry.search.events.fields import get_function_alias from sentry.seer.utils import BreakpointData, detect_breakpoints from sentry.statistical_detectors.algorithm import DetectorAlgorithm -from sentry.statistical_detectors.base import ( - DetectorConfig, - DetectorPayload, - DetectorState, - TrendType, -) +from sentry.statistical_detectors.base import DetectorPayload, DetectorState, TrendType from sentry.statistical_detectors.issue_platform_adapter import fingerprint_regression from sentry.statistical_detectors.store import DetectorStore from sentry.utils import metrics @@ -50,16 +45,18 @@ class RegressionDetector(ABC): source: str kind: str regression_type: RegressionType - config: DetectorConfig - state_cls: type[DetectorState] - detector_cls: type[DetectorAlgorithm] min_change: int resolution_rel_threshold: float escalation_rel_threshold: float @classmethod @abstractmethod - def make_detector_store(cls) -> DetectorStore: + def detector_algorithm_factory(cls) -> DetectorAlgorithm: + ... + + @classmethod + @abstractmethod + def detector_store_factory(cls) -> DetectorStore: ... @classmethod @@ -96,7 +93,8 @@ def detect_trends( regressed_count = 0 improved_count = 0 - store = cls.make_detector_store() + algorithm = cls.detector_algorithm_factory() + store = cls.detector_store_factory() for payloads in chunked(cls.all_payloads(projects, start), 100): total_count += len(payloads) @@ -106,37 +104,22 @@ def detect_trends( states = [] for raw_state, payload in zip(raw_states, payloads): - try: - state = cls.state_cls.from_redis_dict(raw_state) - except Exception as e: - state = cls.state_cls.empty() - - if raw_state: - # empty raw state implies that there was no - # previous state so no need to capture an exception - sentry_sdk.capture_exception(e) - - algorithm = cls.detector_cls(cls.source, cls.kind, state, cls.config) - trend_type, score = algorithm.update(payload) - - # the trend type can be None if no update happened, - # pass None to indicate we do not need up update the state - states.append( - None if trend_type == TrendType.Skipped else algorithm.state.to_redis_dict() - ) + unique_project_ids.add(payload.project_id) + + trend_type, score, new_state = algorithm.update(raw_state, payload) if trend_type == TrendType.Regressed: regressed_count += 1 elif trend_type == TrendType.Improved: improved_count += 1 - unique_project_ids.add(payload.project_id) + states.append(None if new_state is None else new_state.to_redis_dict()) yield TrendBundle( type=trend_type, score=score, payload=payload, - state=algorithm.state, + state=new_state, ) store.bulk_write_states(payloads, states) diff --git a/src/sentry/tasks/statistical_detectors.py b/src/sentry/tasks/statistical_detectors.py index 275b236dadea34..89a2f87f7ca083 100644 --- a/src/sentry/tasks/statistical_detectors.py +++ b/src/sentry/tasks/statistical_detectors.py @@ -41,9 +41,8 @@ from sentry.snuba.metrics.naming_layer.mri import TransactionMRI from sentry.snuba.referrer import Referrer from sentry.statistical_detectors.algorithm import ( - MovingAverageDetectorState, + DetectorAlgorithm, MovingAverageRelativeChangeDetector, - MovingAverageRelativeChangeDetectorConfig, ) from sentry.statistical_detectors.base import DetectorPayload from sentry.statistical_detectors.detector import RegressionDetector @@ -162,20 +161,23 @@ class EndpointRegressionDetector(RegressionDetector): source = "transaction" kind = "endpoint" regression_type = RegressionType.ENDPOINT - config = MovingAverageRelativeChangeDetectorConfig( - min_data_points=6, - short_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 21), - long_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 41), - threshold=0.2, - ) - state_cls = MovingAverageDetectorState - detector_cls = MovingAverageRelativeChangeDetector min_change = 200 # 200ms in ms resolution_rel_threshold = 0.1 escalation_rel_threshold = 0.3 @classmethod - def make_detector_store(cls) -> DetectorStore: + def detector_algorithm_factory(cls) -> DetectorAlgorithm: + return MovingAverageRelativeChangeDetector( + source=cls.source, + kind=cls.kind, + min_data_points=6, + moving_avg_short_factory=lambda: ExponentialMovingAverage(2 / 21), + moving_avg_long_factory=lambda: ExponentialMovingAverage(2 / 41), + threshold=0.2, + ) + + @classmethod + def detector_store_factory(cls) -> DetectorStore: return RedisDetectorStore(regression_type=RegressionType.ENDPOINT) @classmethod @@ -200,20 +202,23 @@ class FunctionRegressionDetector(RegressionDetector): source = "profile" kind = "function" regression_type = RegressionType.FUNCTION - config = MovingAverageRelativeChangeDetectorConfig( - min_data_points=6, - short_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 21), - long_moving_avg_factory=lambda: ExponentialMovingAverage(2 / 41), - threshold=0.2, - ) - state_cls = MovingAverageDetectorState - detector_cls = MovingAverageRelativeChangeDetector min_change = 100_000_000 # 100ms in ns resolution_rel_threshold = 0.1 escalation_rel_threshold = 0.3 @classmethod - def make_detector_store(cls) -> DetectorStore: + def detector_algorithm_factory(cls) -> DetectorAlgorithm: + return MovingAverageRelativeChangeDetector( + source=cls.source, + kind=cls.kind, + min_data_points=6, + moving_avg_short_factory=lambda: ExponentialMovingAverage(2 / 21), + moving_avg_long_factory=lambda: ExponentialMovingAverage(2 / 41), + threshold=0.2, + ) + + @classmethod + def detector_store_factory(cls) -> DetectorStore: return RedisDetectorStore(regression_type=RegressionType.FUNCTION) @classmethod diff --git a/src/sentry/utils/math.py b/src/sentry/utils/math.py index a7825ccc2df8df..af40ba24779ba6 100644 --- a/src/sentry/utils/math.py +++ b/src/sentry/utils/math.py @@ -47,37 +47,18 @@ def nice_int(x): class MovingAverage(ABC): - def __init__(self): - self.value: float = 0 - self.n: int = 0 - - def set(self, value: float, n: int) -> None: - self.value = value - self.n = n - - def update(self, x: float) -> None: - if self.n == 0: - self.value = x - else: - weight = self.get_weight(self.n + 1) - self.value = x * weight + self.value * (1 - weight) - self.n += 1 - @abstractmethod - def get_weight(self, n: int) -> float: + def update(self, n: int, avg: float, value: float) -> float: raise NotImplementedError -class SimpleMovingAverage(MovingAverage): - def get_weight(self, n: int) -> float: - return 1 / n - - class ExponentialMovingAverage(MovingAverage): def __init__(self, weight: float): super().__init__() - assert weight < 1 + assert 0 < weight and weight < 1 self.weight = weight - def get_weight(self, n: int) -> float: - return self.weight + def update(self, n: int, avg: float, value: float) -> float: + if n == 0: + return value + return value * self.weight + avg * (1 - self.weight) diff --git a/tests/sentry/statistical_detectors/test_algorithm.py b/tests/sentry/statistical_detectors/test_algorithm.py index bac3b17299f045..303db55ee12d4c 100644 --- a/tests/sentry/statistical_detectors/test_algorithm.py +++ b/tests/sentry/statistical_detectors/test_algorithm.py @@ -1,11 +1,13 @@ +from __future__ import annotations + from datetime import datetime, timedelta, timezone +from typing import Mapping import pytest from sentry.statistical_detectors.algorithm import ( MovingAverageDetectorState, MovingAverageRelativeChangeDetector, - MovingAverageRelativeChangeDetectorConfig, ) from sentry.statistical_detectors.base import DetectorPayload, TrendType from sentry.utils.math import ExponentialMovingAverage @@ -234,7 +236,7 @@ def test_moving_average_detector_state_should_escalate( @pytest.mark.parametrize( - ["min_data_points", "short_moving_avg_factory", "long_moving_avg_factory", "threshold"], + ["min_data_points", "moving_avg_short_factory", "moving_avg_long_factory", "threshold"], [ pytest.param( 6, @@ -275,8 +277,8 @@ def test_moving_average_detector_state_should_escalate( ) def test_moving_average_relative_change_detector( min_data_points, - short_moving_avg_factory, - long_moving_avg_factory, + moving_avg_long_factory, + moving_avg_short_factory, threshold, values, regressed_indices, @@ -285,7 +287,7 @@ def test_moving_average_relative_change_detector( all_regressed = [] all_improved = [] - now = datetime.now() + now = datetime(2023, 8, 31, 11, 28, 52, tzinfo=timezone.utc) payloads = [ DetectorPayload( @@ -302,18 +304,19 @@ def test_moving_average_relative_change_detector( detector = MovingAverageRelativeChangeDetector( "transaction", "endpoint", - MovingAverageDetectorState.empty(), - MovingAverageRelativeChangeDetectorConfig( - min_data_points=min_data_points, - short_moving_avg_factory=short_moving_avg_factory, - long_moving_avg_factory=long_moving_avg_factory, - threshold=threshold, - ), + min_data_points=min_data_points, + moving_avg_short_factory=moving_avg_short_factory, + moving_avg_long_factory=moving_avg_long_factory, + threshold=threshold, ) + raw_state: Mapping[str | bytes, bytes | float | int | str] = {} + for payload in payloads: - trend_type, score = detector.update(payload) + trend_type, score, state = detector.update(raw_state, payload) assert score >= 0 + if state is not None: + raw_state = state.to_redis_dict() if trend_type == TrendType.Regressed: all_regressed.append(payload) elif trend_type == TrendType.Improved: diff --git a/tests/sentry/tasks/test_statistical_detectors.py b/tests/sentry/tasks/test_statistical_detectors.py index 0616af0822b4d6..e5714399feaf93 100644 --- a/tests/sentry/tasks/test_statistical_detectors.py +++ b/tests/sentry/tasks/test_statistical_detectors.py @@ -20,6 +20,7 @@ from sentry.sentry_metrics.use_case_id_registry import UseCaseID from sentry.snuba.discover import zerofill from sentry.snuba.metrics.naming_layer.mri import TransactionMRI +from sentry.statistical_detectors.algorithm import MovingAverageDetectorState from sentry.statistical_detectors.base import DetectorPayload, TrendType from sentry.statistical_detectors.detector import TrendBundle, generate_fingerprint from sentry.tasks.statistical_detectors import ( @@ -911,7 +912,7 @@ def get_trends(): value=100, timestamp=timestamp - timedelta(hours=1), ) - state = detector_cls.state_cls( + state = MovingAverageDetectorState( timestamp=timestamp - timedelta(hours=1), count=100, moving_avg_short=100, diff --git a/tests/sentry/utils/test_math.py b/tests/sentry/utils/test_math.py index 9dfa9bc1405382..b6f409350f37fb 100644 --- a/tests/sentry/utils/test_math.py +++ b/tests/sentry/utils/test_math.py @@ -1,6 +1,6 @@ import pytest -from sentry.utils.math import ExponentialMovingAverage, SimpleMovingAverage, nice_int +from sentry.utils.math import ExponentialMovingAverage, nice_int def linspace(start, stop, n): @@ -54,22 +54,6 @@ def test_nice_int(start, stop, expected): ) -@pytest.mark.parametrize( - "sequence,expected", - [ - ([], 0), - ([1], 1), - ([1 for _ in range(10)], 1), - ([i for i in range(10)], 4.5), - ], -) -def test_simple_moving_average(sequence, expected): - avg = SimpleMovingAverage() - for x in sequence: - avg.update(x) - assert avg.value == pytest.approx(expected) - - @pytest.mark.parametrize( "sequence,expected", [ @@ -81,6 +65,7 @@ def test_simple_moving_average(sequence, expected): ) def test_exponential_moving_average(sequence, expected): avg = ExponentialMovingAverage(2 / 11) - for x in sequence: - avg.update(x) - assert avg.value == pytest.approx(expected, abs=1e-3) + t = 0.0 + for i, x in enumerate(sequence): + t = avg.update(i, t, x) + assert t == pytest.approx(expected, abs=1e-3)