From 213187ac7f45fcc71a2254273c2946e308f36be1 Mon Sep 17 00:00:00 2001
From: Filippo Pacifici
Date: Fri, 27 Mar 2026 18:34:05 -0700
Subject: [PATCH 1/9] Refactor metrics backend

---
 .../sentry_streams/metrics/__init__.py        |   2 +
 .../sentry_streams/metrics/metrics.py         | 332 ++++++++--------
 sentry_streams/sentry_streams/runner.py       |  22 +-
 sentry_streams/tests/pipeline/test_metrics.py | 368 ++++++++++--------
 4 files changed, 364 insertions(+), 360 deletions(-)

diff --git a/sentry_streams/sentry_streams/metrics/__init__.py b/sentry_streams/sentry_streams/metrics/__init__.py
index be7510c9..9c16987c 100644
--- a/sentry_streams/sentry_streams/metrics/__init__.py
+++ b/sentry_streams/sentry_streams/metrics/__init__.py
@@ -5,6 +5,7 @@
     LogMetricsBackend,
     Metric,
     Metrics,
+    MetricsBackend,
     configure_metrics,
     get_metrics,
     get_size,
@@ -19,5 +20,6 @@
     "METRICS_PREFIX",
     "Metric",
     "Metrics",
+    "MetricsBackend",
     "get_size",
 ]
diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py
index 89d3fe88..4214532c 100644
--- a/sentry_streams/sentry_streams/metrics/metrics.py
+++ b/sentry_streams/sentry_streams/metrics/metrics.py
@@ -6,7 +6,6 @@
 from enum import Enum
 from typing import (
     Any,
-    Iterable,
     Mapping,
     Optional,
     Protocol,
@@ -15,7 +14,6 @@
 )
 
 from arroyo.utils.metric_defs import MetricName as ArroyoMetricName
-from arroyo.utils.metrics import DummyMetricsBackend as ArroyoDummyMetricsBackend
 from arroyo.utils.metrics import configure_metrics as arroyo_configure_metrics
 from datadog.dogstatsd.base import DogStatsd
 
@@ -55,13 +53,14 @@ class Metric(Enum):
     ERRORS = "errors"
 
 
-@runtime_checkable
-class Metrics(Protocol):
+class Metrics:
     """
-    An abstract class that defines the interface for metrics backends.
+    Typed facade over a MetricsBackend: it translates Metric enum names to
+    strings and delegates each call to the wrapped backend.
     """
 
-    @abstractmethod
+    def __init__(self, backend: MetricsBackend) -> None:
+        self.__backend = backend
+
     def increment(
         self,
         name: Metric,
@@ -71,131 +70,179 @@ class Metrics(Protocol):
         """
         Increments a counter metric by a given value.
         """
-        raise NotImplementedError
+        self.__backend.increment(name.value, value, tags=tags)
 
-    @abstractmethod
     def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
         """
         Sets a gauge metric to the given value.
         """
-        raise NotImplementedError
+        self.__backend.gauge(name.value, value, tags=tags)
 
-    @abstractmethod
     def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
         """
         Records a timing metric.
         """
+        self.__backend.timing(name.value, value, tags=tags)
+
+
+@runtime_checkable
+class MetricsBackend(Protocol):
+    """
+    A protocol that defines the interface for metrics backends.
+    """
+
+    @abstractmethod
+    def increment(
+        self,
+        name: str,
+        value: Union[int, float] = 1,
+        tags: Optional[Tags] = None,
+    ) -> None:
+        """
+        Increments a counter metric by a given value.
+        """
        raise NotImplementedError
 
     @abstractmethod
-    def add_global_tags(self, tags: Tags) -> None:
+    def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
         """
-        Adds global tags to the metrics.
+        Sets a gauge metric to the given value.
         """
         raise NotImplementedError
 
     @abstractmethod
-    def remove_global_tags(self, tags: Tags) -> None:
+    def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
         """
-        Removes global tags from the metrics.
+        Records a timing metric.
""" raise NotImplementedError -class DummyMetricsBackend(Metrics): +class DummyMetricsBackend(MetricsBackend): """ Default metrics backend that does not record anything. """ def increment( self, - name: Metric, + name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None, ) -> None: pass - def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None: - pass - - def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None: - pass - - def add_global_tags(self, tags: Tags) -> None: + def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None: pass - def remove_global_tags(self, tags: Tags) -> None: + def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None: pass -BufferedMetric = tuple[Metric, float, list[str] | None] +def _combine_tags(base: Tags, tags: Optional[Tags] = None) -> Tags: + if tags is None: + return base + return { + **base, + **tags, + } -@runtime_checkable -class MetricsFlusher(Protocol): +class DatadogMetricsBackend(MetricsBackend): """ - Protocol for flushing buffered metrics to a destination (e.g. Datadog or log). + Datadog metrics backend. """ - def flush( + def __init__( self, - timers: Iterable[BufferedMetric], - counters: Iterable[BufferedMetric], - gauges: Iterable[BufferedMetric], + host: str, + port: int, + tags: Optional[Tags] = None, + udp_queue_size: Optional[int] = None, ) -> None: - """Write the buffered metrics to the destination.""" - ... + # Do not pass constant_tags to DogStatsd: BufferedMetricsBackend already + # adds tags to each metric. Passing both would duplicate tags. + self.datadog_client = DogStatsd( + host=host, + port=port, + namespace=METRICS_PREFIX.strip("."), + constant_tags=[], + ) + # ignore mypy because that method just is untyped, yet part of public API + self.datadog_client.enable_background_sender( # type: ignore[no-untyped-call] + sender_queue_size=udp_queue_size if udp_queue_size is not None else SENDER_QUEUE_SIZE, + sender_queue_timeout=SENDER_QUEUE_TIMEOUT, + ) + self.__tags: Tags = tags if tags is not None else {} + def __normalize_tags(self, tags: Tags) -> list[str]: + return [f"{key}:{value.replace('|', '_')}" for key, value in tags.items()] -class DatadogFlusher: - """Flusher that sends buffered metrics to a DogStatsd client.""" + def increment( + self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None + ) -> None: + self.datadog_client.increment( + name, + value, + tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None, + ) - def __init__(self, client: DogStatsd) -> None: - self.__client = client + def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None: + self.datadog_client.gauge( + name, + value, + tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None, + ) + + def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None: + self.datadog_client.timing( + name, + value, + tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None, + ) - def flush( - self, - timers: Iterable[BufferedMetric], - counters: Iterable[BufferedMetric], - gauges: Iterable[BufferedMetric], - ) -> None: - for name, value, tags in timers: - self.__client.timing(name.value, value, tags=tags or []) - for name, value, tags in counters: - self.__client.increment(name.value, value, tags=tags or []) - for name, value, tags in gauges: - self.__client.gauge(name.value, value, 
+class LogMetricsBackend(MetricsBackend):
+    """
+    Metrics backend that logs each update immediately, using the segment
+    format ``prefix | counter|gauge|timing name=value tag1:val1 ...``.
+    """
 
-class LogFlusher:
-    """Flusher that writes buffered metrics to a logger."""
+    def __init__(self, period_sec: float, tags: Optional[Tags] = None) -> None:
+        self.__prefix = METRICS_PREFIX.strip(".")
+        self.__base_tags: Tags = tags if tags is not None else {}
+        # Kept for configuration compatibility: this backend logs every
+        # update immediately rather than on a period.
+        self._period_sec = period_sec
 
-    def __init__(self, prefix: str) -> None:
-        self.__prefix = prefix
+    @staticmethod
+    def __normalize_tags(tags: Tags) -> list[str]:
+        return [f"{key}:{value.replace('|', '_')}" for key, value in tags.items()]
 
-    def flush(
-        self,
-        timers: Iterable[BufferedMetric],
-        counters: Iterable[BufferedMetric],
-        gauges: Iterable[BufferedMetric],
+    def __tag_strings(self, tags: Optional[Tags]) -> list[str]:
+        return self.__normalize_tags(_combine_tags(self.__base_tags, tags))
+
+    def __emit(
+        self, kind: str, name: str, value: Union[int, float], tags: Optional[Tags] = None
     ) -> None:
-        parts: list[str] = [self.__prefix]
-        for name, value, tags in timers:
-            tags_str = " ".join(tags) if tags else ""
-            parts.append(f"timing {name.value}={value} {tags_str}".strip())
-        for name, value, tags in counters:
-            tags_str = " ".join(tags) if tags else ""
-            parts.append(f"counter {name.value}={value} {tags_str}".strip())
-        for name, value, tags in gauges:
-            tags_str = " ".join(tags) if tags else ""
-            parts.append(f"gauge {name.value}={value} {tags_str}".strip())
-        if len(parts) > 1:
-            logger.info(" | ".join(parts))
-        else:
-            logger.info("No metrics to flush")
+        tag_list = self.__tag_strings(tags)
+        tags_str = " ".join(tag_list) if tag_list else ""
+        parts = [self.__prefix, f"{kind} {name}={value} {tags_str}".strip()]
+        logger.info(" | ".join(parts))
 
+    def increment(
+        self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None
+    ) -> None:
+        self.__emit("counter", name, value, tags)
 
-class BufferedMetricsBackend(Metrics):
+    def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        self.__emit("gauge", name, value, tags)
+
+    def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        self.__emit("timing", name, value, tags)
+
+
+BufferedMetric = tuple[str, float, Tags]
+
+
+class BufferedMetricsBackend(MetricsBackend):
     """
-    Metrics backend that buffers updates and periodically flushes
-    them via an injected flusher (e.g. Datadog or log).
+    Metrics backend that buffers updates and periodically flushes
+    them to the wrapped MetricsBackend (e.g. Datadog or log).
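Illustration, not part of the diff: after this refactor the layering is Metrics (enum-typed facade) -> BufferedMetricsBackend (aggregation and throttled flushing) -> a concrete MetricsBackend such as DatadogMetricsBackend or LogMetricsBackend. A minimal sketch of how the pieces compose, using only names introduced in this patch (hypothetical wiring, not code from the diff):

    from sentry_streams.metrics.metrics import (
        BufferedMetricsBackend,
        LogMetricsBackend,
        Metric,
        Metrics,
    )

    # Emission layer: writes one log line per metric update.
    inner = LogMetricsBackend(period_sec=5.0, tags={"pipeline": "errors"})
    # Buffering layer: counters and timers accumulate, gauges keep their
    # last value, and the buffer is forwarded to `inner` at most once per
    # throttle interval.
    buffered = BufferedMetricsBackend(inner, throttle_interval_sec=5.0)
    # Enum-typed facade used by pipeline code.
    metrics = Metrics(buffered)

    metrics.increment(Metric.INPUT_MESSAGES, 5)
    metrics.increment(Metric.INPUT_MESSAGES, 3)
    metrics.gauge(Metric.INPUT_BYTES, 200)
    # Within one interval the counter updates collapse to input.messages=8
    # (note that the very first update after construction flushes right
    # away, because __last_flush_time starts at 0.0).
    buffered.flush()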
@@ -203,84 +250,68 @@ class BufferedMetricsBackend(Metrics):
 
     def __init__(
         self,
-        flusher: MetricsFlusher,
+        backend: MetricsBackend,
         throttle_interval_sec: float,
-        tags: Optional[Tags] = None,
     ) -> None:
-        self.__flusher = flusher
         self.__throttle_interval_sec = throttle_interval_sec
-        self.tags = tags
-        self.__normalized_tags = self.__normalize_tags(tags) if tags is not None else []
         self.__timers: dict[int, BufferedMetric] = {}
         self.__counters: dict[int, BufferedMetric] = {}
         self.__gauges: dict[int, BufferedMetric] = {}
         self.__last_flush_time = 0.0
+        self.__backend = backend
 
     def __add_to_buffer(
         self,
         buffer: dict[int, BufferedMetric],
-        name: Metric,
+        name: str,
         value: Union[int, float],
-        tags: Optional[Tags] = None,
+        tags: Tags,
         replace: bool = False,
     ) -> None:
-        if tags is None:
-            key = hash(name)
-            normalized_tags = self.__normalized_tags
-        else:
-            normalized_tags = self.__normalize_tags(tags) + self.__normalized_tags
-            key = hash((name, frozenset(normalized_tags)))
+        # Callers always pass a dict (possibly empty), so there is no
+        # tags-is-None case to handle here.
+        normalized_tags = self.__normalize_tags(tags)
+        key = hash((name, frozenset(normalized_tags)))
         if key in buffer:
             new_value = buffer[key][1] + value if not replace else value
-            buffer[key] = (name, new_value, normalized_tags)
+            buffer[key] = (name, new_value, tags)
         else:
-            buffer[key] = (name, value, normalized_tags)
+            buffer[key] = (name, value, tags)
 
     def __normalize_tags(self, tags: Tags) -> list[str]:
         return [f"{key}:{value.replace('|', '_')}" for key, value in tags.items()]
 
     def increment(
         self,
-        name: Metric,
+        name: str,
         value: Union[int, float] = 1,
         tags: Optional[Tags] = None,
     ) -> None:
-        self.__add_to_buffer(self.__counters, name, value, tags)
+        self.__add_to_buffer(self.__counters, name, value, tags or {})
         self.__throttled_flush()
 
-    def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
-        self.__add_to_buffer(self.__gauges, name, value, tags, replace=True)
+    def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        self.__add_to_buffer(self.__gauges, name, value, tags or {}, replace=True)
         self.__throttled_flush()
 
-    def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
-        self.__add_to_buffer(self.__timers, name, value, tags)
+    def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        self.__add_to_buffer(self.__timers, name, value, tags or {})
         self.__throttled_flush()
 
-    def add_global_tags(self, tags: Tags) -> None:
-        if self.tags is None:
-            self.tags = tags
-        else:
-            self.tags.update(tags)
-
-        self.__normalized_tags = self.__normalize_tags(self.tags)
-
-    def remove_global_tags(self, tags: Tags) -> None:
-        if self.tags:
-            for tag in tags:
-                self.tags.pop(tag, None)
-            self.__normalized_tags = self.__normalize_tags(self.tags)
-
     def __throttled_flush(self) -> None:
         if time.time() - self.__last_flush_time >= self.__throttle_interval_sec:
             self.flush()
 
     def flush(self) -> None:
-        self.__flusher.flush(
-            self.__timers.values(),
-            self.__counters.values(),
-            self.__gauges.values(),
-        )
+        for name, value, tags in self.__timers.values():
+            self.__backend.timing(name, value, tags=tags)
+        for name, value, tags in self.__counters.values():
+            self.__backend.increment(name, value, tags=tags)
+        for name, value, tags in self.__gauges.values():
+            self.__backend.gauge(name, value, tags=tags)
+
         self.__reset()
 
     def __reset(self) -> None:
@@ -290,61 +321,19 @@ def __reset(self) -> None:
         self.__last_flush_time = time.time()
 
 
-class DatadogMetricsBackend(BufferedMetricsBackend):
-    """
-    Datadog metrics backend.
Buffers metrics and flushes to DogStatsd. - """ - - def __init__( - self, - host: str, - port: int, - prefix: str, - tags: Optional[Tags] = None, - udp_queue_size: Optional[int] = None, - ) -> None: - # Do not pass constant_tags to DogStatsd: BufferedMetricsBackend already - # adds tags to each metric. Passing both would duplicate tags. - self.datadog_client = DogStatsd( - host=host, - port=port, - namespace=prefix.strip("."), - constant_tags=[], - ) - # ignore mypy because that method just is untyped, yet part of public API - self.datadog_client.enable_background_sender( # type: ignore[no-untyped-call] - sender_queue_size=udp_queue_size if udp_queue_size is not None else SENDER_QUEUE_SIZE, - sender_queue_timeout=SENDER_QUEUE_TIMEOUT, - ) - flusher = DatadogFlusher(self.datadog_client) - super().__init__(flusher, METRICS_FREQUENCY_SEC, tags) - - -class LogMetricsBackend(BufferedMetricsBackend): - """ - Metrics backend that buffers metrics and periodically logs accumulated - values at a configurable interval. - """ +def _tags_from_mapping(tags: Optional[Mapping[str, str]]) -> Tags: + if not tags: + return {} + return dict(tags) - def __init__( - self, - period_sec: float, - tags: Optional[Tags] = None, - ) -> None: - flusher = LogFlusher(METRICS_PREFIX.strip(".")) - super().__init__(flusher, period_sec, tags) - -class ArroyoDatadogMetricsBackend: +class ArroyoMetricsBackend: """ - Arroyo wrapper around Datadog metrics backend. + Facade that adapts Arroyo metric names and tag mappings to MetricsBackend. """ - def __init__(self, datadog_client: DogStatsd) -> None: - self.__datadog_client = datadog_client - - def __normalize_tags(self, tags: Mapping[str, str]) -> list[str]: - return [f"{key}:{value.replace('|', '_')}" for key, value in tags.items()] + def __init__(self, backend: MetricsBackend) -> None: + self.__backend = backend def increment( self, @@ -352,9 +341,7 @@ def increment( value: Union[int, float] = 1, tags: Optional[Mapping[str, str]] = None, ) -> None: - self.__datadog_client.increment( - name, value, tags=self.__normalize_tags(tags) if tags else None - ) + self.__backend.increment(name, value, tags=_tags_from_mapping(tags)) def gauge( self, @@ -362,7 +349,7 @@ def gauge( value: Union[int, float], tags: Optional[Mapping[str, str]] = None, ) -> None: - self.__datadog_client.gauge(name, value, tags=self.__normalize_tags(tags) if tags else None) + self.__backend.gauge(name, value, tags=_tags_from_mapping(tags)) def timing( self, @@ -370,16 +357,14 @@ def timing( value: Union[int, float], tags: Optional[Mapping[str, str]] = None, ) -> None: - self.__datadog_client.timing( - name, value, tags=self.__normalize_tags(tags) if tags else None - ) + self.__backend.timing(name, value, tags=_tags_from_mapping(tags)) -_metrics_backend: Optional[Metrics] = None +_metrics_backend: Optional[MetricsBackend] = None _dummy_metrics_backend = DummyMetricsBackend() -def configure_metrics(metrics: Metrics, force: bool = False) -> None: +def configure_metrics(metrics: MetricsBackend, force: bool = False) -> None: """ Metrics can generally only be configured once, unless force is passed on subsequent initializations. @@ -391,18 +376,15 @@ def configure_metrics(metrics: Metrics, force: bool = False) -> None: # Perform a runtime check of metrics instance upon initialization of # this class to avoid errors down the line when it is used. 
- assert isinstance(metrics, Metrics) + assert isinstance(metrics, MetricsBackend) _metrics_backend = metrics - if isinstance(metrics, DatadogMetricsBackend): - arroyo_configure_metrics(ArroyoDatadogMetricsBackend(metrics.datadog_client)) - else: - arroyo_configure_metrics(ArroyoDummyMetricsBackend()) + arroyo_configure_metrics(ArroyoMetricsBackend(metrics)) def get_metrics() -> Metrics: if _metrics_backend is None: - return _dummy_metrics_backend - return _metrics_backend + return Metrics(_dummy_metrics_backend) + return Metrics(_metrics_backend) def get_size(obj: Any) -> int | None: diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 1f929e1d..381cd9ec 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -13,11 +13,10 @@ StreamT, ) from sentry_streams.metrics import ( - METRICS_PREFIX, DatadogMetricsBackend, DummyMetricsBackend, LogMetricsBackend, - Metrics, + MetricsBackend, configure_metrics, ) from sentry_streams.pipeline.config import load_config @@ -106,19 +105,18 @@ def load_runtime( validate_all_branches_have_sinks(pipeline) metric_config = environment_config.get("metrics", {}) - metrics: Metrics + metrics_backend: MetricsBackend if metric_config.get("type") == "datadog": default_tags = metric_config.get("tags", {}) default_tags["pipeline"] = name - metrics = DatadogMetricsBackend( + metrics_backend = DatadogMetricsBackend( metric_config["host"], metric_config["port"], - METRICS_PREFIX, - default_tags, - metric_config.get("udp_queue_size"), + tags=default_tags, + udp_queue_size=metric_config.get("udp_queue_size"), ) - configure_metrics(metrics) + configure_metrics(metrics_backend) metric_config = { "host": metric_config["host"], "port": metric_config["port"], @@ -130,15 +128,15 @@ def load_runtime( default_tags = metric_config.get("tags", {}) default_tags["pipeline"] = name - metrics = LogMetricsBackend( + metrics_backend = LogMetricsBackend( period_sec=metric_config["period_sec"], tags=default_tags, ) - configure_metrics(metrics) + configure_metrics(metrics_backend) metric_config = {} else: - metrics = DummyMetricsBackend() - configure_metrics(metrics) + metrics_backend = DummyMetricsBackend() + configure_metrics(metrics_backend) metric_config = {} assigned_segment_id = int(segment_id) if segment_id else None diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py index 6807728b..edb493e4 100644 --- a/sentry_streams/tests/pipeline/test_metrics.py +++ b/sentry_streams/tests/pipeline/test_metrics.py @@ -1,22 +1,28 @@ from typing import Any -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch import pytest from sentry_streams.metrics.metrics import ( METRICS_FREQUENCY_SEC, METRICS_PREFIX, - ArroyoDatadogMetricsBackend, + ArroyoMetricsBackend, + BufferedMetricsBackend, DatadogMetricsBackend, DummyMetricsBackend, LogMetricsBackend, Metric, + Metrics, configure_metrics, get_metrics, get_size, ) +def _metric(name: Metric) -> str: + return name.value + + class TestMetric: def test_metric_enum_values(self) -> None: assert Metric.INPUT_MESSAGES.value == "input.messages" @@ -30,292 +36,300 @@ def test_metric_enum_values(self) -> None: class TestDummyMetricsBackend: def test_increment(self) -> None: backend = DummyMetricsBackend() - backend.increment(Metric.INPUT_MESSAGES, 5) - backend.increment(Metric.INPUT_MESSAGES, tags={"key": "value"}) + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + 
backend.increment(_metric(Metric.INPUT_MESSAGES), tags={"key": "value"}) def test_gauge(self) -> None: backend = DummyMetricsBackend() - backend.gauge(Metric.INPUT_BYTES, 100) - backend.gauge(Metric.INPUT_BYTES, 200.5, tags={"key": "value"}) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.gauge(_metric(Metric.INPUT_BYTES), 200.5, tags={"key": "value"}) def test_timing(self) -> None: backend = DummyMetricsBackend() - backend.timing(Metric.DURATION, 1000) - backend.timing(Metric.DURATION, 1500.5, tags={"key": "value"}) + backend.timing(_metric(Metric.DURATION), 1000) + backend.timing(_metric(Metric.DURATION), 1500.5, tags={"key": "value"}) - def test_add_global_tags(self) -> None: - backend = DummyMetricsBackend() - backend.add_global_tags({"env": "test"}) - def test_remove_global_tags(self) -> None: - backend = DummyMetricsBackend() - backend.remove_global_tags({"env": "test"}) +class TestMetricsFacade: + def test_delegates_to_backend(self) -> None: + inner = MagicMock(spec=DummyMetricsBackend) + facade = Metrics(inner) + facade.increment(Metric.INPUT_MESSAGES, 5, tags={"k": "v"}) + inner.increment.assert_called_once_with("input.messages", 5, tags={"k": "v"}) + facade.gauge(Metric.INPUT_BYTES, 42) + inner.gauge.assert_called_once_with("input.bytes", 42, tags=None) + facade.timing(Metric.DURATION, 10.0, tags={}) + inner.timing.assert_called_once_with("duration", 10.0, tags={}) class TestDatadogMetricsBackend: @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_init_with_prefix_dot(self, mock_dogstatsd: Any) -> None: - DatadogMetricsBackend("localhost", 8125, "test.") + def test_init_namespace_is_metrics_prefix(self, mock_dogstatsd: Any) -> None: + DatadogMetricsBackend("localhost", 8125) mock_dogstatsd.assert_called_once_with( host="localhost", port=8125, - namespace="test", + namespace=METRICS_PREFIX.strip("."), constant_tags=[], ) @patch("sentry_streams.metrics.metrics.DogStatsd") def test_init_with_tags(self, mock_dogstatsd: Any) -> None: tags = {"env": "production", "service": "streams"} - DatadogMetricsBackend("localhost", 8125, "test", tags) - - # Tags are applied by BufferedMetricsBackend per metric, not as constant_tags + DatadogMetricsBackend("localhost", 8125, tags=tags) mock_dogstatsd.assert_called_once_with( host="localhost", port=8125, - namespace="test", + namespace=METRICS_PREFIX.strip("."), constant_tags=[], ) @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_increment_without_auto_flush(self, mock_time: Any, mock_dogstatsd: Any) -> None: - mock_time.return_value = 0.0 - backend = DatadogMetricsBackend("localhost", 8125, "test") + def test_increment(self, mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value - backend.increment(Metric.INPUT_MESSAGES, 5) - - mock_client.increment.assert_not_called() - - backend.flush() + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) - mock_client.increment.assert_called_once_with("input.messages", 5, tags=[]) + mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_increment_with_throttling(self, mock_time: Any, mock_dogstatsd: Any) -> None: - mock_time.side_effect = [METRICS_FREQUENCY_SEC + 1, METRICS_FREQUENCY_SEC + 2] - backend = DatadogMetricsBackend("localhost", 8125, "test") + def test_increment_with_tags(self, mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125) 
mock_client = mock_dogstatsd.return_value - backend.increment(Metric.INPUT_MESSAGES, 5) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "test"}) - mock_client.increment.assert_called_once_with("input.messages", 5, tags=[]) + mock_client.increment.assert_called_once_with("input.messages", 1, tags=["env:test"]) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_increment_with_tags(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125, "test") + def test_increment_merges_constructor_tags(self, mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) mock_client = mock_dogstatsd.return_value - tags = {"env": "test"} - backend.increment(Metric.INPUT_MESSAGES, 1, tags) - backend.flush() + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "test"}) - mock_client.increment.assert_called_once_with("input.messages", 1, tags=["env:test"]) + called_tags = mock_client.increment.call_args[1]["tags"] + assert set(called_tags) == {"service:streams", "env:test"} @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_increment_accumulation(self, mock_time: Any, mock_dogstatsd: Any) -> None: - mock_time.return_value = 0.0 - backend = DatadogMetricsBackend("localhost", 8125, "test") + def test_gauge(self, mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value - backend.increment(Metric.INPUT_MESSAGES, 5) - backend.increment(Metric.INPUT_MESSAGES, 3) - backend.flush() + backend.gauge(_metric(Metric.INPUT_BYTES), 100) - mock_client.increment.assert_called_once_with("input.messages", 8, tags=[]) + mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_gauge(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125, "test") + def test_timing(self, mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value - backend.gauge(Metric.INPUT_BYTES, 100) - backend.flush() + backend.timing(_metric(Metric.DURATION), 1500) + + mock_client.timing.assert_called_once_with("duration", 1500, tags=None) - mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=[]) +class TestBufferedMetricsBackend: @patch("sentry_streams.metrics.metrics.DogStatsd") @patch("time.time") - def test_gauge_replacement(self, mock_time: Any, mock_dogstatsd: Any) -> None: + def test_increment_without_auto_flush(self, mock_time: Any, mock_dogstatsd: Any) -> None: mock_time.return_value = 0.0 - backend = DatadogMetricsBackend("localhost", 8125, "test") + inner = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) + + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + + mock_client.increment.assert_not_called() - backend.gauge(Metric.INPUT_BYTES, 100) - backend.gauge(Metric.INPUT_BYTES, 200) backend.flush() - mock_client.gauge.assert_called_once_with("input.bytes", 200, tags=[]) + mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_timing(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125, "test") + @patch("time.time") + def test_increment_with_throttling(self, mock_time: Any, mock_dogstatsd: Any) -> 
None: + mock_time.side_effect = [METRICS_FREQUENCY_SEC + 1, METRICS_FREQUENCY_SEC + 2] + inner = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - backend.timing(Metric.DURATION, 1500) - backend.flush() + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) - mock_client.timing.assert_called_once_with("duration", 1500, tags=[]) + mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_add_global_tags_new(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125, "test") + @patch("time.time") + def test_increment_accumulation(self, mock_time: Any, mock_dogstatsd: Any) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value - tags = {"env": "production"} + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - backend.add_global_tags(tags) - backend.increment(Metric.INPUT_MESSAGES, 1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + backend.increment(_metric(Metric.INPUT_MESSAGES), 3) backend.flush() - assert backend.tags == tags - mock_client.increment.assert_called_once_with("input.messages", 1, tags=["env:production"]) + mock_client.increment.assert_called_once_with("input.messages", 8, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_add_global_tags_existing(self, mock_dogstatsd: Any) -> None: - initial_tags = {"service": "streams"} - backend = DatadogMetricsBackend("localhost", 8125, "test", initial_tags) + def test_gauge(self, mock_dogstatsd: Any) -> None: + inner = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value - new_tags = {"env": "production"} + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) - backend.add_global_tags(new_tags) - backend.increment(Metric.INPUT_MESSAGES, 1) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) backend.flush() - assert backend.tags == {"service": "streams", "env": "production"} - mock_client.increment.assert_called_once() - called_args = mock_client.increment.call_args - assert "service:streams" in called_args[1]["tags"] - assert "env:production" in called_args[1]["tags"] + mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_remove_global_tags(self, mock_dogstatsd: Any) -> None: - initial_tags = {"service": "streams", "env": "production"} - backend = DatadogMetricsBackend("localhost", 8125, "test", initial_tags) + @patch("time.time") + def test_gauge_replacement(self, mock_time: Any, mock_dogstatsd: Any) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125) mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - backend.remove_global_tags({"env": "production"}) - backend.increment(Metric.INPUT_MESSAGES, 1) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.gauge(_metric(Metric.INPUT_BYTES), 200) backend.flush() - assert backend.tags == {"service": "streams"} - mock_client.increment.assert_called_once_with("input.messages", 1, tags=["service:streams"]) + mock_client.gauge.assert_called_once_with("input.bytes", 200, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_remove_global_tags_nonexistent(self, 
mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125, "test") + def test_flush_all_metric_types(self, mock_dogstatsd: Any) -> None: + inner = DatadogMetricsBackend("localhost", 8125) + mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) - backend.remove_global_tags({"nonexistent": "tag"}) + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.timing(_metric(Metric.DURATION), 1000) - assert backend.tags is None + backend.flush() + + mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) + mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) + mock_client.timing.assert_called_once_with("duration", 1000, tags=None) @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_flush_all_metric_types(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125, "test") + @patch("time.time") + def test_buffered_wraps_datadog_with_constructor_tags( + self, mock_time: Any, mock_dogstatsd: Any + ) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - backend.increment(Metric.INPUT_MESSAGES, 5) - backend.gauge(Metric.INPUT_BYTES, 100) - backend.timing(Metric.DURATION, 1000) - + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "production"}) backend.flush() - mock_client.increment.assert_called_once_with("input.messages", 5, tags=[]) - mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=[]) - mock_client.timing.assert_called_once_with("duration", 1000, tags=[]) + called_tags = mock_client.increment.call_args[1]["tags"] + assert set(called_tags) == {"service:streams", "env:production"} -class TestArroyoDatadogMetricsBackend: - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_increment(self, mock_dogstatsd: Any) -> None: - mock_client = Mock() - backend = ArroyoDatadogMetricsBackend(mock_client) +class TestArroyoMetricsBackend: + def test_increment(self) -> None: + inner = Mock(spec=DummyMetricsBackend) + backend = ArroyoMetricsBackend(inner) - # Use a valid Arroyo metric name instead of "test.metric" backend.increment("arroyo.consumer.run.count", 5, {"env": "test"}) - mock_client.increment.assert_called_once_with( - "arroyo.consumer.run.count", 5, tags=["env:test"] + inner.increment.assert_called_once_with( + "arroyo.consumer.run.count", 5, tags={"env": "test"} ) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_gauge(self, mock_dogstatsd: Any) -> None: - mock_client = Mock() - backend = ArroyoDatadogMetricsBackend(mock_client) + def test_gauge(self) -> None: + inner = Mock(spec=DummyMetricsBackend) + backend = ArroyoMetricsBackend(inner) - # Use a valid Arroyo metric name instead of "test.metric" backend.gauge("arroyo.consumer.run.count", 100, {"env": "test"}) - mock_client.gauge.assert_called_once_with( - "arroyo.consumer.run.count", 100, tags=["env:test"] - ) + inner.gauge.assert_called_once_with("arroyo.consumer.run.count", 100, tags={"env": "test"}) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_timing(self, mock_dogstatsd: Any) -> None: - mock_client = Mock() - backend = ArroyoDatadogMetricsBackend(mock_client) + def test_timing(self) -> None: + inner = Mock(spec=DummyMetricsBackend) + backend = 
ArroyoMetricsBackend(inner) - # Use a valid Arroyo metric name instead of "test.metric" backend.timing("arroyo.consumer.poll.time", 1000, {"env": "test"}) - mock_client.timing.assert_called_once_with( - "arroyo.consumer.poll.time", 1000, tags=["env:test"] + inner.timing.assert_called_once_with( + "arroyo.consumer.poll.time", 1000, tags={"env": "test"} ) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_methods_without_tags(self, mock_dogstatsd: Any) -> None: - mock_client = Mock() - backend = ArroyoDatadogMetricsBackend(mock_client) + def test_methods_without_tags_pass_empty_dict(self) -> None: + inner = Mock(spec=DummyMetricsBackend) + backend = ArroyoMetricsBackend(inner) - # Use valid Arroyo metric names instead of "test.metric" backend.increment("arroyo.consumer.run.count") backend.gauge("arroyo.consumer.run.count", 100) backend.timing("arroyo.consumer.poll.time", 1000) - mock_client.increment.assert_called_once_with("arroyo.consumer.run.count", 1, tags=None) - mock_client.gauge.assert_called_once_with("arroyo.consumer.run.count", 100, tags=None) - mock_client.timing.assert_called_once_with("arroyo.consumer.poll.time", 1000, tags=None) + inner.increment.assert_called_once_with("arroyo.consumer.run.count", 1, tags={}) + inner.gauge.assert_called_once_with("arroyo.consumer.run.count", 100, tags={}) + inner.timing.assert_called_once_with("arroyo.consumer.poll.time", 1000, tags={}) class TestLogMetricsBackend: @patch("sentry_streams.metrics.metrics.logger") - def test_init(self, mock_logger: Any) -> None: + def test_increment_logs_immediately(self, mock_logger: Any) -> None: backend = LogMetricsBackend(period_sec=15.0, tags={"env": "test"}) - assert backend.tags == {"env": "test"} + mock_info = mock_logger.info + + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) + + mock_info.assert_called_once() + call_msg = mock_info.call_args[0][0] + assert "input.messages" in call_msg + assert "env:test" in call_msg + + @patch("sentry_streams.metrics.metrics.logger") + def test_each_call_emits_separate_log_line(self, mock_logger: Any) -> None: + backend = LogMetricsBackend(period_sec=60.0) + mock_info = mock_logger.info + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 2) + + assert mock_info.call_count == 2 + + +class TestBufferedMetricsBackendWithLogBackend: @patch("sentry_streams.metrics.metrics.logger") @patch("time.time") def test_buffer_accumulation_and_flush(self, mock_time: Any, mock_logger: Any) -> None: - mock_time.return_value = 0.0 # avoid throttled flush before explicit flush - backend = LogMetricsBackend(period_sec=60.0) + mock_time.return_value = 0.0 + inner = LogMetricsBackend(period_sec=60.0) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info - backend.increment(Metric.INPUT_MESSAGES, 5) - backend.increment(Metric.INPUT_MESSAGES, 3) - backend.gauge(Metric.INPUT_BYTES, 100) - backend.gauge(Metric.INPUT_BYTES, 200) - backend.timing(Metric.DURATION, 100) - backend.timing(Metric.DURATION, 50) + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + backend.increment(_metric(Metric.INPUT_MESSAGES), 3) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.gauge(_metric(Metric.INPUT_BYTES), 200) + backend.timing(_metric(Metric.DURATION), 100) + backend.timing(_metric(Metric.DURATION), 50) backend.flush() - mock_info.assert_called_once() - call_msg = mock_info.call_args[0][0] - assert "input.messages" in call_msg - assert "8" in call_msg or "counter 
input.messages=8" in call_msg - assert "input.bytes" in call_msg - assert "200" in call_msg - assert "duration" in call_msg - assert "150" in call_msg + assert mock_info.call_count == 3 + logged = [c[0][0] for c in mock_info.call_args_list] + assert any("timing" in m and "duration" in m and "150" in m for m in logged) + assert any("counter" in m and "input.messages" in m and "8" in m for m in logged) + assert any("gauge" in m and "input.bytes" in m and "200" in m for m in logged) @patch("sentry_streams.metrics.metrics.logger") @patch("time.time") def test_flush_logs_and_clears(self, mock_time: Any, mock_logger: Any) -> None: - mock_time.return_value = 0.0 # avoid throttled flush before explicit flush - backend = LogMetricsBackend(period_sec=60.0) + mock_time.return_value = 0.0 + inner = LogMetricsBackend(period_sec=60.0) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info - backend.increment(Metric.INPUT_MESSAGES, 1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) backend.flush() mock_info.assert_called_once() @@ -324,7 +338,7 @@ def test_flush_logs_and_clears(self, mock_time: Any, mock_logger: Any) -> None: assert "input.messages" in call_msg mock_info.reset_mock() - backend.increment(Metric.INPUT_MESSAGES, 2) + backend.increment(_metric(Metric.INPUT_MESSAGES), 2) backend.flush() mock_info.assert_called_once() call_msg = mock_info.call_args[0][0] @@ -334,30 +348,32 @@ def test_flush_logs_and_clears(self, mock_time: Any, mock_logger: Any) -> None: @patch("time.time") def test_throttled_flush(self, mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - backend = LogMetricsBackend(period_sec=10.0) + inner = LogMetricsBackend(period_sec=60.0) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=10.0) mock_info = mock_logger.info - backend.increment(Metric.INPUT_MESSAGES, 1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) mock_info.assert_not_called() mock_time.return_value = 11.0 - backend.increment(Metric.INPUT_MESSAGES, 1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) mock_info.assert_called_once() @patch("sentry_streams.metrics.metrics.logger") @patch("time.time") - def test_global_tags(self, mock_time: Any, mock_logger: Any) -> None: - mock_time.return_value = 0.0 # avoid throttled flush before explicit flush - backend = LogMetricsBackend(period_sec=60.0, tags={"service": "streams"}) + def test_global_tags_from_inner_log_backend(self, mock_time: Any, mock_logger: Any) -> None: + mock_time.return_value = 0.0 + inner = LogMetricsBackend(period_sec=60.0, tags={"service": "streams"}) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info - backend.add_global_tags({"env": "production"}) - backend.increment(Metric.INPUT_MESSAGES, 1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "production"}) backend.flush() mock_info.assert_called_once() call_msg = mock_info.call_args[0][0] - assert "service:streams" in call_msg or "env:production" in call_msg + assert "service:streams" in call_msg + assert "env:production" in call_msg class TestConfigureMetrics: @@ -374,21 +390,23 @@ def test_configure_metrics_dummy(self, mock_arroyo_configure: Any) -> None: from sentry_streams.metrics.metrics import _metrics_backend - assert _metrics_backend == backend + assert _metrics_backend is backend mock_arroyo_configure.assert_called_once() + arroyo_backend = mock_arroyo_configure.call_args[0][0] + assert isinstance(arroyo_backend, ArroyoMetricsBackend) 
@patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") @patch("sentry_streams.metrics.metrics.DogStatsd") def test_configure_metrics_datadog( self, mock_dogstatsd: Any, mock_arroyo_configure: Any ) -> None: - backend = DatadogMetricsBackend("localhost", 8125, "test") + backend = DatadogMetricsBackend("localhost", 8125) configure_metrics(backend) from sentry_streams.metrics.metrics import _metrics_backend - assert _metrics_backend == backend + assert _metrics_backend is backend mock_arroyo_configure.assert_called_once() def test_configure_metrics_already_set(self) -> None: @@ -410,13 +428,13 @@ def test_configure_metrics_force(self, mock_arroyo_configure: Any) -> None: from sentry_streams.metrics.metrics import _metrics_backend - assert _metrics_backend == backend2 + assert _metrics_backend is backend2 def test_configure_metrics_invalid_type(self) -> None: invalid_backend = "not_a_metrics_backend" with pytest.raises(AssertionError): - configure_metrics(invalid_backend) # type: ignore + configure_metrics(invalid_backend) # type: ignore[arg-type] class TestGetMetrics: @@ -427,7 +445,9 @@ def teardown_method(self) -> None: def test_get_metrics_none_configured(self) -> None: metrics = get_metrics() - assert isinstance(metrics, DummyMetricsBackend) + assert isinstance(metrics, Metrics) + inner = object.__getattribute__(metrics, "_Metrics__backend") + assert isinstance(inner, DummyMetricsBackend) @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") def test_get_metrics_configured(self, mock_arroyo_configure: Any) -> None: @@ -435,7 +455,9 @@ def test_get_metrics_configured(self, mock_arroyo_configure: Any) -> None: configure_metrics(backend) metrics = get_metrics() - assert metrics == backend + assert isinstance(metrics, Metrics) + inner = object.__getattribute__(metrics, "_Metrics__backend") + assert inner is backend class TestGetSize: From e6938641e8d7127d1976b1c14d63f7e6c3efd350 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 30 Mar 2026 11:16:49 +0200 Subject: [PATCH 2/9] Finish metrics implementation --- .../adapters/arroyo/rust_arroyo.py | 16 +- .../parallel_processing.yaml | 6 +- .../sentry_streams/metrics/metrics.py | 15 +- sentry_streams/tests/pipeline/test_metrics.py | 646 ++++++++---------- 4 files changed, 318 insertions(+), 365 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 8fc4a2d9..7c49f974 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -34,7 +34,14 @@ MultiProcessConfig, StepConfig, ) -from sentry_streams.metrics import Metric, get_metrics, get_size +from sentry_streams.metrics import ( + Metric, + MetricsBackend, + configure_metrics, + get_metrics, + get_size, +) +from sentry_streams.metrics.metrics import get_inner_metrics from sentry_streams.pipeline.function_template import ( InputType, OutputType, @@ -72,6 +79,11 @@ logger = logging.getLogger(__name__) +def initializer(metrics: MetricsBackend) -> None: + # Reinitialize the metrics backend for each process + configure_metrics(metrics) + + def _metrics_wrapped_function( step_name: str, application_function: Callable[[Message[Any]], Any], msg: Message[Any] ) -> Any: @@ -181,6 +193,7 @@ def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: f"batch_size={config['batch_size']}, " f"batch_time={config['batch_time']}" ) + return RuntimeOperator.PythonAdapter( rust_route, 
MultiprocessDelegateFactory( @@ -189,6 +202,7 @@ def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: config["batch_time"], MultiprocessingPool( num_processes=config["processes"], + initializer=functools.partial(initializer, get_inner_metrics()), ), input_block_size=config.get("input_block_size"), output_block_size=config.get("output_block_size"), diff --git a/sentry_streams/sentry_streams/deployment_config/parallel_processing.yaml b/sentry_streams/sentry_streams/deployment_config/parallel_processing.yaml index 4aad6e3a..80190ce4 100644 --- a/sentry_streams/sentry_streams/deployment_config/parallel_processing.yaml +++ b/sentry_streams/sentry_streams/deployment_config/parallel_processing.yaml @@ -4,7 +4,11 @@ # process. Though chains are parallelized over multiple # processes. env: {} - +metrics: + type: log + period_sec: 5 + tags: + pipeline: errors pipeline: segments: - steps_config: diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index 4214532c..9fc14c2e 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -360,6 +360,7 @@ def timing( self.__backend.timing(name, value, tags=_tags_from_mapping(tags)) +_inner_metrics_backend: Optional[MetricsBackend] = None _metrics_backend: Optional[MetricsBackend] = None _dummy_metrics_backend = DummyMetricsBackend() @@ -370,15 +371,23 @@ def configure_metrics(metrics: MetricsBackend, force: bool = False) -> None: on subsequent initializations. """ global _metrics_backend - + global _inner_metrics_backend if not force: assert _metrics_backend is None, "Metrics is already set" # Perform a runtime check of metrics instance upon initialization of # this class to avoid errors down the line when it is used. 
     assert isinstance(metrics, MetricsBackend)
-    _metrics_backend = metrics
-    arroyo_configure_metrics(ArroyoMetricsBackend(metrics))
+
+    _inner_metrics_backend = metrics
+    _metrics_backend = BufferedMetricsBackend(metrics, throttle_interval_sec=METRICS_FREQUENCY_SEC)
+    arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend))
+
+
+def get_inner_metrics() -> MetricsBackend:
+    if _inner_metrics_backend is None:
+        return _dummy_metrics_backend
+    return _inner_metrics_backend
 
 
 def get_metrics() -> Metrics:
diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py
index edb493e4..443ec9dd 100644
--- a/sentry_streams/tests/pipeline/test_metrics.py
+++ b/sentry_streams/tests/pipeline/test_metrics.py
@@ -1,8 +1,10 @@
+from collections.abc import Generator
 from typing import Any
 from unittest.mock import MagicMock, Mock, patch
 
 import pytest
 
+import sentry_streams.metrics.metrics as metrics_module
 from sentry_streams.metrics.metrics import (
     METRICS_FREQUENCY_SEC,
     METRICS_PREFIX,
@@ -14,8 +16,6 @@
     Metric,
     Metrics,
+    MetricsBackend,
     configure_metrics,
-    get_metrics,
-    get_size,
 )
 
 
@@ -23,453 +23,379 @@ def _metric(name: Metric) -> str:
     return name.value
 
 
-class TestMetric:
-    def test_metric_enum_values(self) -> None:
-        assert Metric.INPUT_MESSAGES.value == "input.messages"
-        assert Metric.INPUT_BYTES.value == "input.bytes"
-        assert Metric.OUTPUT_MESSAGES.value == "output.messages"
-        assert Metric.OUTPUT_BYTES.value == "output.bytes"
-        assert Metric.DURATION.value == "duration"
-        assert Metric.ERRORS.value == "errors"
+def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend:
+    # Reach through the name-mangled attribute to the wrapped backend.
+    return object.__getattribute__(buffered, "_BufferedMetricsBackend__backend")
 
 
-class TestDummyMetricsBackend:
-    def test_increment(self) -> None:
-        backend = DummyMetricsBackend()
-        backend.increment(_metric(Metric.INPUT_MESSAGES), 5)
-        backend.increment(_metric(Metric.INPUT_MESSAGES), tags={"key": "value"})
-
-    def test_gauge(self) -> None:
-        backend = DummyMetricsBackend()
-        backend.gauge(_metric(Metric.INPUT_BYTES), 100)
-        backend.gauge(_metric(Metric.INPUT_BYTES), 200.5, tags={"key": "value"})
+@pytest.fixture(autouse=True)
+def reset_metrics_backend() -> Generator[None, None, None]:
+    # Reset both globals so every test can call configure_metrics freshly.
+    metrics_module._metrics_backend = None
+    metrics_module._inner_metrics_backend = None
+    yield
+    metrics_module._metrics_backend = None
+    metrics_module._inner_metrics_backend = None
 
-    def test_timing(self) -> None:
-        backend = DummyMetricsBackend()
-        backend.timing(_metric(Metric.DURATION), 1000)
-        backend.timing(_metric(Metric.DURATION), 1500.5, tags={"key": "value"})
 
+def test_metric_enum_values() -> None:
+    assert Metric.INPUT_MESSAGES.value == "input.messages"
+    assert Metric.INPUT_BYTES.value == "input.bytes"
+    assert Metric.OUTPUT_MESSAGES.value == "output.messages"
+    assert Metric.OUTPUT_BYTES.value == "output.bytes"
+    assert Metric.DURATION.value == "duration"
+    assert Metric.ERRORS.value == "errors"
 
-class TestMetricsFacade:
-    def test_delegates_to_backend(self) -> None:
-        inner = MagicMock(spec=DummyMetricsBackend)
-        facade = Metrics(inner)
-        facade.increment(Metric.INPUT_MESSAGES, 5, tags={"k": "v"})
-        inner.increment.assert_called_once_with("input.messages", 5, tags={"k": "v"})
-        facade.gauge(Metric.INPUT_BYTES, 42)
-        inner.gauge.assert_called_once_with("input.bytes", 42, tags=None)
-        facade.timing(Metric.DURATION, 10.0, tags={})
-        inner.timing.assert_called_once_with("duration", 10.0, tags={})
 
+def test_dummy_metrics_backend_increment() -> None:
+    backend = DummyMetricsBackend()
+    backend.increment(_metric(Metric.INPUT_MESSAGES), 5)
+    backend.increment(_metric(Metric.INPUT_MESSAGES),
tags={"key": "value"}) -class TestDatadogMetricsBackend: - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_init_namespace_is_metrics_prefix(self, mock_dogstatsd: Any) -> None: - DatadogMetricsBackend("localhost", 8125) - mock_dogstatsd.assert_called_once_with( - host="localhost", - port=8125, - namespace=METRICS_PREFIX.strip("."), - constant_tags=[], - ) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_init_with_tags(self, mock_dogstatsd: Any) -> None: - tags = {"env": "production", "service": "streams"} - DatadogMetricsBackend("localhost", 8125, tags=tags) - mock_dogstatsd.assert_called_once_with( - host="localhost", - port=8125, - namespace=METRICS_PREFIX.strip("."), - constant_tags=[], - ) +def test_dummy_metrics_backend_gauge() -> None: + backend = DummyMetricsBackend() + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.gauge(_metric(Metric.INPUT_BYTES), 200.5, tags={"key": "value"}) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_increment(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value - backend.increment(_metric(Metric.INPUT_MESSAGES), 5) +def test_dummy_metrics_backend_timing() -> None: + backend = DummyMetricsBackend() + backend.timing(_metric(Metric.DURATION), 1000) + backend.timing(_metric(Metric.DURATION), 1500.5, tags={"key": "value"}) - mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_increment_with_tags(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value +def test_metrics_facade_delegates_to_backend() -> None: + inner = MagicMock(spec=DummyMetricsBackend) + facade = Metrics(inner) + facade.increment(Metric.INPUT_MESSAGES, 5, tags={"k": "v"}) + inner.increment.assert_called_once_with("input.messages", 5, tags={"k": "v"}) + facade.gauge(Metric.INPUT_BYTES, 42) + inner.gauge.assert_called_once_with("input.bytes", 42, tags=None) + facade.timing(Metric.DURATION, 10.0, tags={}) + inner.timing.assert_called_once_with("duration", 10.0, tags={}) - backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "test"}) - mock_client.increment.assert_called_once_with("input.messages", 1, tags=["env:test"]) +@patch("sentry_streams.metrics.metrics.DogStatsd") +def test_datadog_init_namespace_is_metrics_prefix(mock_dogstatsd: Any) -> None: + DatadogMetricsBackend("localhost", 8125) + mock_dogstatsd.assert_called_once_with( + host="localhost", + port=8125, + namespace=METRICS_PREFIX.strip("."), + constant_tags=[], + ) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_increment_merges_constructor_tags(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) - mock_client = mock_dogstatsd.return_value - backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "test"}) +@patch("sentry_streams.metrics.metrics.DogStatsd") +def test_datadog_increment(mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125) + mock_client = mock_dogstatsd.return_value - called_tags = mock_client.increment.call_args[1]["tags"] - assert set(called_tags) == {"service:streams", "env:test"} + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.timing(_metric(Metric.DURATION), 1500) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def 
test_gauge(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value + mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) + mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) + mock_client.timing.assert_called_once_with("duration", 1500, tags=None) - backend.gauge(_metric(Metric.INPUT_BYTES), 100) - - mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) - - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_timing(self, mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value - - backend.timing(_metric(Metric.DURATION), 1500) - - mock_client.timing.assert_called_once_with("duration", 1500, tags=None) +@patch("sentry_streams.metrics.metrics.DogStatsd") +def test_datadog_increment_with_tags(mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125) + mock_client = mock_dogstatsd.return_value + tags = {"env": "test"} -class TestBufferedMetricsBackend: - @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_increment_without_auto_flush(self, mock_time: Any, mock_dogstatsd: Any) -> None: - mock_time.return_value = 0.0 - inner = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value - backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags=tags) + backend.gauge(_metric(Metric.INPUT_BYTES), 100, tags=tags) + backend.timing(_metric(Metric.DURATION), 1500, tags=tags) - backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + mock_client.increment.assert_called_once_with("input.messages", 1, tags=["env:test"]) + mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=["env:test"]) + mock_client.timing.assert_called_once_with("duration", 1500, tags=["env:test"]) - mock_client.increment.assert_not_called() - backend.flush() +@patch("sentry_streams.metrics.metrics.DogStatsd") +def test_datadog_increment_merges_constructor_tags(mock_dogstatsd: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) + mock_client = mock_dogstatsd.return_value + tags = {"env": "test"} - mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags=tags) + backend.gauge(_metric(Metric.INPUT_BYTES), 100, tags=tags) + backend.timing(_metric(Metric.DURATION), 1500, tags=tags) - @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_increment_with_throttling(self, mock_time: Any, mock_dogstatsd: Any) -> None: - mock_time.side_effect = [METRICS_FREQUENCY_SEC + 1, METRICS_FREQUENCY_SEC + 2] - inner = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value - backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) + expected = {"service:streams", "env:test"} + mock_client.increment.assert_called_once() + assert mock_client.increment.call_args[0] == ("input.messages", 1) + assert set(mock_client.increment.call_args[1]["tags"]) == expected + mock_client.gauge.assert_called_once() + assert mock_client.gauge.call_args[0] == ("input.bytes", 100) + assert set(mock_client.gauge.call_args[1]["tags"]) == expected + mock_client.timing.assert_called_once() + assert mock_client.timing.call_args[0] == ("duration", 1500) + assert 
set(mock_client.timing.call_args[1]["tags"]) == expected - backend.increment(_metric(Metric.INPUT_MESSAGES), 5) - mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) +@patch("sentry_streams.metrics.metrics.DogStatsd") +@patch("time.time") +def test_buffered_increment_with_throttling(mock_time: Any, mock_dogstatsd: Any) -> None: + mock_time.side_effect = [METRICS_FREQUENCY_SEC + 1, METRICS_FREQUENCY_SEC + 2] + inner = DatadogMetricsBackend("localhost", 8125) + mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_increment_accumulation(self, mock_time: Any, mock_dogstatsd: Any) -> None: - mock_time.return_value = 0.0 - inner = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value - backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) - backend.increment(_metric(Metric.INPUT_MESSAGES), 5) - backend.increment(_metric(Metric.INPUT_MESSAGES), 3) - backend.flush() + mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) - mock_client.increment.assert_called_once_with("input.messages", 8, tags=None) - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_gauge(self, mock_dogstatsd: Any) -> None: - inner = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value - backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) +@patch("sentry_streams.metrics.metrics.DogStatsd") +@patch("time.time") +def test_buffered_increment_accumulation(mock_time: Any, mock_dogstatsd: Any) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125) + mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - backend.gauge(_metric(Metric.INPUT_BYTES), 100) - backend.flush() + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + backend.increment(_metric(Metric.INPUT_MESSAGES), 3) + backend.flush() - mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) + mock_client.increment.assert_called_once_with("input.messages", 8, tags=None) - @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_gauge_replacement(self, mock_time: Any, mock_dogstatsd: Any) -> None: - mock_time.return_value = 0.0 - inner = DatadogMetricsBackend("localhost", 8125) - mock_client = mock_dogstatsd.return_value - backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - backend.gauge(_metric(Metric.INPUT_BYTES), 100) - backend.gauge(_metric(Metric.INPUT_BYTES), 200) - backend.flush() +@patch("sentry_streams.metrics.metrics.DogStatsd") +@patch("time.time") +def test_buffered_gauge_replacement(mock_time: Any, mock_dogstatsd: Any) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125) + mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - mock_client.gauge.assert_called_once_with("input.bytes", 200, tags=None) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.gauge(_metric(Metric.INPUT_BYTES), 200) + backend.flush() - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_flush_all_metric_types(self, mock_dogstatsd: Any) -> None: - inner = DatadogMetricsBackend("localhost", 8125) 
- mock_client = mock_dogstatsd.return_value - backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) + mock_client.gauge.assert_called_once_with("input.bytes", 200, tags=None) - backend.increment(_metric(Metric.INPUT_MESSAGES), 5) - backend.gauge(_metric(Metric.INPUT_BYTES), 100) - backend.timing(_metric(Metric.DURATION), 1000) - backend.flush() +@patch("sentry_streams.metrics.metrics.DogStatsd") +@patch("time.time") +def test_buffered_flush_all_metric_types(mock_time: Any, mock_dogstatsd: Any) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125) + mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) - mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) - mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) - mock_client.timing.assert_called_once_with("duration", 1000, tags=None) + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.timing(_metric(Metric.DURATION), 1000) - @patch("sentry_streams.metrics.metrics.DogStatsd") - @patch("time.time") - def test_buffered_wraps_datadog_with_constructor_tags( - self, mock_time: Any, mock_dogstatsd: Any - ) -> None: - mock_time.return_value = 0.0 - inner = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) - mock_client = mock_dogstatsd.return_value - backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) + mock_client.increment.assert_not_called() + mock_client.gauge.assert_not_called() + mock_client.timing.assert_not_called() - backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "production"}) - backend.flush() + backend.flush() - called_tags = mock_client.increment.call_args[1]["tags"] - assert set(called_tags) == {"service:streams", "env:production"} + mock_client.increment.assert_called_once_with("input.messages", 5, tags=None) + mock_client.gauge.assert_called_once_with("input.bytes", 100, tags=None) + mock_client.timing.assert_called_once_with("duration", 1000, tags=None) -class TestArroyoMetricsBackend: - def test_increment(self) -> None: - inner = Mock(spec=DummyMetricsBackend) - backend = ArroyoMetricsBackend(inner) +@patch("sentry_streams.metrics.metrics.DogStatsd") +@patch("time.time") +def test_buffered_wraps_datadog_with_constructor_tags(mock_time: Any, mock_dogstatsd: Any) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) + mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) - backend.increment("arroyo.consumer.run.count", 5, {"env": "test"}) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "production"}) + backend.flush() - inner.increment.assert_called_once_with( - "arroyo.consumer.run.count", 5, tags={"env": "test"} - ) - - def test_gauge(self) -> None: - inner = Mock(spec=DummyMetricsBackend) - backend = ArroyoMetricsBackend(inner) - - backend.gauge("arroyo.consumer.run.count", 100, {"env": "test"}) - - inner.gauge.assert_called_once_with("arroyo.consumer.run.count", 100, tags={"env": "test"}) - - def test_timing(self) -> None: - inner = Mock(spec=DummyMetricsBackend) - backend = ArroyoMetricsBackend(inner) - - backend.timing("arroyo.consumer.poll.time", 1000, {"env": "test"}) - - inner.timing.assert_called_once_with( - "arroyo.consumer.poll.time", 1000, tags={"env": "test"} - ) - - def 
test_methods_without_tags_pass_empty_dict(self) -> None: - inner = Mock(spec=DummyMetricsBackend) - backend = ArroyoMetricsBackend(inner) - - backend.increment("arroyo.consumer.run.count") - backend.gauge("arroyo.consumer.run.count", 100) - backend.timing("arroyo.consumer.poll.time", 1000) - - inner.increment.assert_called_once_with("arroyo.consumer.run.count", 1, tags={}) - inner.gauge.assert_called_once_with("arroyo.consumer.run.count", 100, tags={}) - inner.timing.assert_called_once_with("arroyo.consumer.poll.time", 1000, tags={}) - - -class TestLogMetricsBackend: - @patch("sentry_streams.metrics.metrics.logger") - def test_increment_logs_immediately(self, mock_logger: Any) -> None: - backend = LogMetricsBackend(period_sec=15.0, tags={"env": "test"}) - mock_info = mock_logger.info - - backend.increment(_metric(Metric.INPUT_MESSAGES), 1) - - mock_info.assert_called_once() - call_msg = mock_info.call_args[0][0] - assert "input.messages" in call_msg - assert "env:test" in call_msg - - @patch("sentry_streams.metrics.metrics.logger") - def test_each_call_emits_separate_log_line(self, mock_logger: Any) -> None: - backend = LogMetricsBackend(period_sec=60.0) - mock_info = mock_logger.info + called_tags = mock_client.increment.call_args[1]["tags"] + assert set(called_tags) == {"service:streams", "env:production"} - backend.increment(_metric(Metric.INPUT_MESSAGES), 1) - backend.increment(_metric(Metric.INPUT_MESSAGES), 2) - assert mock_info.call_count == 2 +def test_arroyo_delegates_increment_gauge_timing_with_tags() -> None: + inner = Mock(spec=DummyMetricsBackend) + backend = ArroyoMetricsBackend(inner) + tags = {"env": "test"} + backend.increment("arroyo.consumer.run.count", 5, tags) + backend.gauge("arroyo.consumer.run.count", 100, tags) + backend.timing("arroyo.consumer.poll.time", 1000, tags) -class TestBufferedMetricsBackendWithLogBackend: - @patch("sentry_streams.metrics.metrics.logger") - @patch("time.time") - def test_buffer_accumulation_and_flush(self, mock_time: Any, mock_logger: Any) -> None: - mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0) - backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) - mock_info = mock_logger.info - - backend.increment(_metric(Metric.INPUT_MESSAGES), 5) - backend.increment(_metric(Metric.INPUT_MESSAGES), 3) - backend.gauge(_metric(Metric.INPUT_BYTES), 100) - backend.gauge(_metric(Metric.INPUT_BYTES), 200) - backend.timing(_metric(Metric.DURATION), 100) - backend.timing(_metric(Metric.DURATION), 50) - backend.flush() + inner.increment.assert_called_once_with("arroyo.consumer.run.count", 5, tags={"env": "test"}) + inner.gauge.assert_called_once_with("arroyo.consumer.run.count", 100, tags={"env": "test"}) + inner.timing.assert_called_once_with("arroyo.consumer.poll.time", 1000, tags={"env": "test"}) - assert mock_info.call_count == 3 - logged = [c[0][0] for c in mock_info.call_args_list] - assert any("timing" in m and "duration" in m and "150" in m for m in logged) - assert any("counter" in m and "input.messages" in m and "8" in m for m in logged) - assert any("gauge" in m and "input.bytes" in m and "200" in m for m in logged) - @patch("sentry_streams.metrics.metrics.logger") - @patch("time.time") - def test_flush_logs_and_clears(self, mock_time: Any, mock_logger: Any) -> None: - mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0) - backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) - mock_info = mock_logger.info +def test_arroyo_methods_without_tags_pass_empty_dict() -> None: + 
inner = Mock(spec=DummyMetricsBackend) + backend = ArroyoMetricsBackend(inner) - backend.increment(_metric(Metric.INPUT_MESSAGES), 1) - backend.flush() + backend.increment("arroyo.consumer.run.count") + backend.gauge("arroyo.consumer.run.count", 100) + backend.timing("arroyo.consumer.poll.time", 1000) - mock_info.assert_called_once() - call_msg = mock_info.call_args[0][0] - assert METRICS_PREFIX.split(".")[0] in call_msg - assert "input.messages" in call_msg + inner.increment.assert_called_once_with("arroyo.consumer.run.count", 1, tags={}) + inner.gauge.assert_called_once_with("arroyo.consumer.run.count", 100, tags={}) + inner.timing.assert_called_once_with("arroyo.consumer.poll.time", 1000, tags={}) - mock_info.reset_mock() - backend.increment(_metric(Metric.INPUT_MESSAGES), 2) - backend.flush() - mock_info.assert_called_once() - call_msg = mock_info.call_args[0][0] - assert "2" in call_msg - @patch("sentry_streams.metrics.metrics.logger") - @patch("time.time") - def test_throttled_flush(self, mock_time: Any, mock_logger: Any) -> None: - mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0) - backend = BufferedMetricsBackend(inner, throttle_interval_sec=10.0) - mock_info = mock_logger.info +@patch("sentry_streams.metrics.metrics.logger") +def test_log_increment_logs_immediately(mock_logger: Any) -> None: + backend = LogMetricsBackend(period_sec=15.0, tags={"env": "test"}) + mock_info = mock_logger.info - backend.increment(_metric(Metric.INPUT_MESSAGES), 1) - mock_info.assert_not_called() + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) - mock_time.return_value = 11.0 - backend.increment(_metric(Metric.INPUT_MESSAGES), 1) - mock_info.assert_called_once() + mock_info.assert_called_once() + call_msg = mock_info.call_args[0][0] + assert "input.messages" in call_msg + assert "env:test" in call_msg - @patch("sentry_streams.metrics.metrics.logger") - @patch("time.time") - def test_global_tags_from_inner_log_backend(self, mock_time: Any, mock_logger: Any) -> None: - mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0, tags={"service": "streams"}) - backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) - mock_info = mock_logger.info - backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "production"}) - backend.flush() +@patch("sentry_streams.metrics.metrics.logger") +def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None: + backend = LogMetricsBackend(period_sec=60.0) + mock_info = mock_logger.info - mock_info.assert_called_once() - call_msg = mock_info.call_args[0][0] - assert "service:streams" in call_msg - assert "env:production" in call_msg + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 2) + + assert mock_info.call_count == 2 -class TestConfigureMetrics: - def teardown_method(self) -> None: - import sentry_streams.metrics.metrics +@patch("sentry_streams.metrics.metrics.logger") +@patch("time.time") +def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) -> None: + mock_time.return_value = 0.0 + inner = LogMetricsBackend(period_sec=60.0) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) + mock_info = mock_logger.info - sentry_streams.metrics.metrics._metrics_backend = None + backend.increment(_metric(Metric.INPUT_MESSAGES), 5) + backend.increment(_metric(Metric.INPUT_MESSAGES), 3) + backend.gauge(_metric(Metric.INPUT_BYTES), 100) + backend.gauge(_metric(Metric.INPUT_BYTES), 200) + 
backend.timing(_metric(Metric.DURATION), 100) + backend.timing(_metric(Metric.DURATION), 50) + backend.flush() - @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") - def test_configure_metrics_dummy(self, mock_arroyo_configure: Any) -> None: - backend = DummyMetricsBackend() + assert mock_info.call_count == 3 + logged = [c[0][0] for c in mock_info.call_args_list] + assert any("timing" in m and "duration" in m and "150" in m for m in logged) + assert any("counter" in m and "input.messages" in m and "8" in m for m in logged) + assert any("gauge" in m and "input.bytes" in m and "200" in m for m in logged) - configure_metrics(backend) - from sentry_streams.metrics.metrics import _metrics_backend +@patch("sentry_streams.metrics.metrics.logger") +@patch("time.time") +def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) -> None: + mock_time.return_value = 0.0 + inner = LogMetricsBackend(period_sec=60.0) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) + mock_info = mock_logger.info - assert _metrics_backend is backend - mock_arroyo_configure.assert_called_once() - arroyo_backend = mock_arroyo_configure.call_args[0][0] - assert isinstance(arroyo_backend, ArroyoMetricsBackend) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) + backend.flush() - @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") - @patch("sentry_streams.metrics.metrics.DogStatsd") - def test_configure_metrics_datadog( - self, mock_dogstatsd: Any, mock_arroyo_configure: Any - ) -> None: - backend = DatadogMetricsBackend("localhost", 8125) - - configure_metrics(backend) + mock_info.assert_called_once() + call_msg = mock_info.call_args[0][0] + assert METRICS_PREFIX.split(".")[0] in call_msg + assert "input.messages" in call_msg - from sentry_streams.metrics.metrics import _metrics_backend + mock_info.reset_mock() + backend.increment(_metric(Metric.INPUT_MESSAGES), 2) + backend.flush() + mock_info.assert_called_once() + call_msg = mock_info.call_args[0][0] + assert "2" in call_msg - assert _metrics_backend is backend - mock_arroyo_configure.assert_called_once() - def test_configure_metrics_already_set(self) -> None: - backend1 = DummyMetricsBackend() - backend2 = DummyMetricsBackend() +@patch("sentry_streams.metrics.metrics.logger") +@patch("time.time") +def test_buffered_log_throttled_flush(mock_time: Any, mock_logger: Any) -> None: + mock_time.return_value = 0.0 + inner = LogMetricsBackend(period_sec=60.0) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=10.0) + mock_info = mock_logger.info - configure_metrics(backend1) + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) + mock_info.assert_not_called() - with pytest.raises(AssertionError, match="Metrics is already set"): - configure_metrics(backend2) + mock_time.return_value = 11.0 + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) + mock_info.assert_called_once() - @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") - def test_configure_metrics_force(self, mock_arroyo_configure: Any) -> None: - backend1 = DummyMetricsBackend() - backend2 = DummyMetricsBackend() - configure_metrics(backend1) - configure_metrics(backend2, force=True) +@patch("sentry_streams.metrics.metrics.logger") +@patch("time.time") +def test_buffered_log_global_tags_from_inner(mock_time: Any, mock_logger: Any) -> None: + mock_time.return_value = 0.0 + inner = LogMetricsBackend(period_sec=60.0, tags={"service": "streams"}) + backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) + 
mock_info = mock_logger.info - from sentry_streams.metrics.metrics import _metrics_backend + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={"env": "production"}) + backend.flush() - assert _metrics_backend is backend2 + mock_info.assert_called_once() + call_msg = mock_info.call_args[0][0] + assert "service:streams" in call_msg + assert "env:production" in call_msg - def test_configure_metrics_invalid_type(self) -> None: - invalid_backend = "not_a_metrics_backend" - with pytest.raises(AssertionError): - configure_metrics(invalid_backend) # type: ignore[arg-type] +@patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") +def test_configure_metrics_dummy(mock_arroyo_configure: Any) -> None: + backend = DummyMetricsBackend() + configure_metrics(backend) + + wrapped = metrics_module._metrics_backend + assert isinstance(wrapped, BufferedMetricsBackend) + assert _buffered_inner_backend(wrapped) is backend + assert ( + object.__getattribute__(wrapped, "_BufferedMetricsBackend__throttle_interval_sec") + == METRICS_FREQUENCY_SEC + ) + mock_arroyo_configure.assert_called_once() + arroyo_backend = mock_arroyo_configure.call_args[0][0] + assert isinstance(arroyo_backend, ArroyoMetricsBackend) + assert object.__getattribute__(arroyo_backend, "_ArroyoMetricsBackend__backend") is wrapped -class TestGetMetrics: - def teardown_method(self) -> None: - import sentry_streams.metrics.metrics - sentry_streams.metrics.metrics._metrics_backend = None +@patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") +@patch("sentry_streams.metrics.metrics.DogStatsd") +def test_configure_metrics_datadog(mock_dogstatsd: Any, mock_arroyo_configure: Any) -> None: + backend = DatadogMetricsBackend("localhost", 8125) + + configure_metrics(backend) + + wrapped = metrics_module._metrics_backend + assert isinstance(wrapped, BufferedMetricsBackend) + assert _buffered_inner_backend(wrapped) is backend + mock_arroyo_configure.assert_called_once() - def test_get_metrics_none_configured(self) -> None: - metrics = get_metrics() - assert isinstance(metrics, Metrics) - inner = object.__getattribute__(metrics, "_Metrics__backend") - assert isinstance(inner, DummyMetricsBackend) - @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") - def test_get_metrics_configured(self, mock_arroyo_configure: Any) -> None: - backend = DummyMetricsBackend() - configure_metrics(backend) +def test_configure_metrics_already_set() -> None: + backend1 = DummyMetricsBackend() + backend2 = DummyMetricsBackend() - metrics = get_metrics() - assert isinstance(metrics, Metrics) - inner = object.__getattribute__(metrics, "_Metrics__backend") - assert inner is backend + configure_metrics(backend1) + with pytest.raises(AssertionError, match="Metrics is already set"): + configure_metrics(backend2) -class TestGetSize: - def test_get_size_string(self) -> None: - assert get_size("hello") == 5 - assert get_size("") == 0 - def test_get_size_bytes(self) -> None: - assert get_size(b"hello") == 5 - assert get_size(b"") == 0 +@patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") +def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: + backend1 = DummyMetricsBackend() + backend2 = DummyMetricsBackend() - def test_get_size_other_types(self) -> None: - assert get_size(123) is None - assert get_size([1, 2, 3]) is None - assert get_size({"key": "value"}) is None + configure_metrics(backend1) + configure_metrics(backend2, force=True) + + wrapped = metrics_module._metrics_backend + assert isinstance(wrapped, 
BufferedMetricsBackend)
+    assert _buffered_inner_backend(wrapped) is backend2

From 24f9e1220a777dff3238edee3b0c047781aabac5 Mon Sep 17 00:00:00 2001
From: Filippo Pacifici
Date: Mon, 30 Mar 2026 11:29:37 +0200
Subject: [PATCH 3/9] docs(metrics): Clarify backends and streams Metrics
 adapter

Expand docstrings for MetricsBackend, Datadog, Buffered, and Arroyo
adapters. Move the enum-backed Metrics wrapper after helper types so
protocol and backends are documented before the application-level
facade.

Co-Authored-By: Cursor
Made-with: Cursor
---
 .../sentry_streams/metrics/metrics.py | 105 +++++++++++-------
 1 file changed, 65 insertions(+), 40 deletions(-)

diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py
index 9fc14c2e..c11b29cf 100644
--- a/sentry_streams/sentry_streams/metrics/metrics.py
+++ b/sentry_streams/sentry_streams/metrics/metrics.py
@@ -53,42 +53,15 @@ class Metric(Enum):
     ERRORS = "errors"
 
 
-class Metrics:
-    """
-    An abstract class that defines the interface for metrics backends.
-    """
-
-    def __init__(self, backend: MetricsBackend) -> None:
-        self.__backend = backend
-
-    def increment(
-        self,
-        name: Metric,
-        value: Union[int, float] = 1,
-        tags: Optional[Tags] = None,
-    ) -> None:
-        """
-        Increments a counter metric by a given value.
-        """
-        self.__backend.increment(name.value, value, tags=tags)
-
-    def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
-        """
-        Sets a gauge metric to the given value.
-        """
-        self.__backend.gauge(name.value, value, tags=tags)
-
-    def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
-        """
-        Records a timing metric.
-        """
-        self.__backend.timing(name.value, value, tags=tags)
-
-
 @runtime_checkable
 class MetricsBackend(Protocol):
     """
-    An abstract class that defines the interface for metrics backends.
+    Provides an interface to produce metrics of counter, gauge and timing types.
+
+    This can be implemented by different backends, such as Datadog, to
+    produce metrics on a real metrics platform.
+    It can also be wrapped in an adapter class that provides a client-specific
+    metrics interface, for example Arroyo metrics.
     """
 
     @abstractmethod
@@ -121,6 +94,7 @@ def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = Non
 class DummyMetricsBackend(MetricsBackend):
     """
     Default metrics backend that does not record anything.
+    Useful for tests.
     """
 
     def increment(
@@ -149,7 +123,12 @@ def _combine_tags(base: Tags, tags: Optional[Tags] = None) -> Tags:
 
 class DatadogMetricsBackend(MetricsBackend):
     """
-    Datadog metrics backend.
+    Backend to produce metrics to Datadog through the DogStatsd client.
+
+    For each metric produced, a call is made to the Datadog agent almost
+    immediately. Instances of this class can be provided with default tags
+    to be attached to each metric and with a prefix to be added to each
+    metric name.
     """
 
     def __init__(
@@ -167,7 +146,7 @@ def __init__(
             namespace=METRICS_PREFIX.strip("."),
             constant_tags=[],
         )
-        # ignore mypy because that method just is untyped, yet part of public API
+        # Ignore mypy: this method is untyped but is part of the public API.
        self.datadog_client.enable_background_sender(  # type: ignore[no-untyped-call]
            sender_queue_size=udp_queue_size if udp_queue_size is not None else SENDER_QUEUE_SIZE,
            sender_queue_timeout=SENDER_QUEUE_TIMEOUT,
        )
@@ -244,8 +223,17 @@ def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = Non
 
 class BufferedMetricsBackend(MetricsBackend):
     """
-    Metrics backend that buffers updates and periodically flushes them
-    via an injected flusher (e.g. Datadog or log).
+    This delegate class wraps a MetricsBackend and buffers metrics to flush them
+    periodically.
+
+    This kind of pattern is especially useful when we produce metrics at
+    high throughput or in a tight loop, so that we do not incur the overhead
+    of a backend call for each metric.
+
+    An alternative would be to use Datadog metrics sampling, but that
+    would only work on the Datadog backend. Moreover, this backend aggregates
+    the metrics to be produced rather than sampling them, so metrics that
+    are produced rarely are preserved.
     """
 
     def __init__(
@@ -327,9 +315,45 @@ def _tags_from_mapping(tags: Optional[Mapping[str, str]]) -> Tags:
     return dict(tags)
 
 
+class Metrics:
+    """
+    An adapter over a MetricsBackend for the Sentry Streams application.
+    The only behavior it adds over the backend is that metric names must
+    be defined in the Metric enum.
+    """
+
+    def __init__(self, backend: MetricsBackend) -> None:
+        self.__backend = backend
+
+    def increment(
+        self,
+        name: Metric,
+        value: Union[int, float] = 1,
+        tags: Optional[Tags] = None,
+    ) -> None:
+        """
+        Increments a counter metric by a given value.
+        """
+        self.__backend.increment(name.value, value, tags=tags)
+
+    def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        """
+        Sets a gauge metric to the given value.
+        """
+        self.__backend.gauge(name.value, value, tags=tags)
+
+    def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        """
+        Records a timing metric.
+        """
+        self.__backend.timing(name.value, value, tags=tags)
+
+
 class ArroyoMetricsBackend:
     """
-    Facade that adapts Arroyo metric names and tag mappings to MetricsBackend.
+    An adapter to the Metrics backend used in the Arroyo library. Arroyo allows
+    the client application to provide its own metrics implementation. The
+    implementation must comply with arroyo.utils.metrics.Metrics.
     """
 
     def __init__(self, backend: MetricsBackend) -> None:
@@ -369,14 +393,15 @@ def configure_metrics(metrics: MetricsBackend, force: bool = False) -> None:
     """
     Metrics can generally only be configured once, unless force is passed
     on subsequent initializations.
+
+    This method has to be called in each process the application uses.
     """
     global _metrics_backend
     global _inner_metrics_backend
 
     if not force:
         assert _metrics_backend is None, "Metrics is already set"
-    # Perform a runtime check of metrics instance upon initialization of
-    # this class to avoid errors down the line when it is used.
+    # Runtime-check the metrics implementation so misconfiguration fails early.
     assert isinstance(metrics, MetricsBackend)
 
     _inner_metrics_backend = metrics

From 0b31c9a140ddf807d5deca9dd602bdef280178f9 Mon Sep 17 00:00:00 2001
From: Filippo Pacifici
Date: Mon, 30 Mar 2026 11:48:03 +0200
Subject: [PATCH 4/9] test(metrics): Annotate buffered backend helper for
 strict mypy

Add return type and cast so __getattribute__ does not violate
no-any-return under --strict.
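
For reference, the pre-fix shape looked roughly like this (illustrative
sketch, not the exact test code; the error text is mypy's no-any-return
message):

    def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend:
        # mypy --strict: Returning Any from function declared to return
        # "MetricsBackend"  [no-any-return]
        return object.__getattribute__(buffered, "_BufferedMetricsBackend__backend")

Wrapping the value in cast(MetricsBackend, ...) narrows the Any without
changing runtime behavior.
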
Refs GH-285

Co-Authored-By: Cursor
Made-with: Cursor
---
 sentry_streams/tests/pipeline/test_metrics.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py
index 443ec9dd..855262e5 100644
--- a/sentry_streams/tests/pipeline/test_metrics.py
+++ b/sentry_streams/tests/pipeline/test_metrics.py
@@ -1,5 +1,5 @@
 from collections.abc import Generator
-from typing import Any
+from typing import Any, cast
 from unittest.mock import MagicMock, Mock, patch
 
 import pytest
@@ -15,6 +15,7 @@
     LogMetricsBackend,
     Metric,
     Metrics,
+    MetricsBackend,
     configure_metrics,
 )
 
@@ -23,8 +24,11 @@ def _metric(name: Metric) -> str:
     return name.value
 
 
-def _buffered_inner_backend(buffered: BufferedMetricsBackend):
-    return object.__getattribute__(buffered, "_BufferedMetricsBackend__backend")
+def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend:
+    return cast(
+        MetricsBackend,
+        object.__getattribute__(buffered, "_BufferedMetricsBackend__backend"),
+    )

From da1732bff2196021436d54f4cd0e21dc78eb51d1 Mon Sep 17 00:00:00 2001
From: Filippo Pacifici
Date: Mon, 30 Mar 2026 12:09:52 +0200
Subject: [PATCH 5/9] fix(metrics): Merge Datadog tags when per-call tags are
 empty dict

BufferedMetricsBackend flushes with `tags or {}`; an empty dict is
falsy, so the previous `if tags` guard skipped _combine_tags and
dropped the constructor tags. Always combine base tags with per-call
tags (None or a dict) and pass the normalized tags, or None when there
are no tags at all.

Refs GH-285

Co-Authored-By: Cursor
---
 .../sentry_streams/metrics/metrics.py         | 23 ++++++----------
 sentry_streams/tests/pipeline/test_metrics.py | 27 +++++++++++++++++++
 2 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py
index c11b29cf..1830c2d7 100644
--- a/sentry_streams/sentry_streams/metrics/metrics.py
+++ b/sentry_streams/sentry_streams/metrics/metrics.py
@@ -156,28 +156,21 @@ def __init__(
     def __normalize_tags(self, tags: Tags) -> list[str]:
         return [f"{key}:{value.replace('|', '_')}" for key, value in tags.items()]
 
+    def __datadog_tags_kw(self, tags: Optional[Tags]) -> Optional[list[str]]:
+        combined = _combine_tags(self.__tags, tags)
+        normalized = self.__normalize_tags(combined)
+        return normalized if normalized else None
+
     def increment(
         self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None
     ) -> None:
-        self.datadog_client.increment(
-            name,
-            value,
-            tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
-        )
+        self.datadog_client.increment(name, value, tags=self.__datadog_tags_kw(tags))
 
     def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
-        self.datadog_client.gauge(
-            name,
-            value,
-            tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
-        )
+        self.datadog_client.gauge(name, value, tags=self.__datadog_tags_kw(tags))
 
     def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
-        self.datadog_client.timing(
-            name,
-            value,
-            tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
-        )
+        self.datadog_client.timing(name, value, tags=self.__datadog_tags_kw(tags))
 
 
 class LogMetricsBackend(MetricsBackend):
diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py
index 855262e5..bb73b781 100644
--- 
a/sentry_streams/tests/pipeline/test_metrics.py +++ b/sentry_streams/tests/pipeline/test_metrics.py @@ -138,6 +138,16 @@ def test_datadog_increment_merges_constructor_tags(mock_dogstatsd: Any) -> None: assert set(mock_client.timing.call_args[1]["tags"]) == expected +@patch("sentry_streams.metrics.metrics.DogStatsd") +def test_datadog_preserves_constructor_tags_when_per_call_tags_empty_dict( + mock_dogstatsd: Any, +) -> None: + backend = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) + mock_client = mock_dogstatsd.return_value + backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={}) + mock_client.increment.assert_called_once_with("input.messages", 1, tags=["service:streams"]) + + @patch("sentry_streams.metrics.metrics.DogStatsd") @patch("time.time") def test_buffered_increment_with_throttling(mock_time: Any, mock_dogstatsd: Any) -> None: @@ -219,6 +229,23 @@ def test_buffered_wraps_datadog_with_constructor_tags(mock_time: Any, mock_dogst assert set(called_tags) == {"service:streams", "env:production"} +@patch("sentry_streams.metrics.metrics.DogStatsd") +@patch("time.time") +def test_buffered_flush_preserves_constructor_tags_when_buffer_had_only_empty_tags( + mock_time: Any, + mock_dogstatsd: Any, +) -> None: + mock_time.return_value = 0.0 + inner = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"}) + mock_client = mock_dogstatsd.return_value + backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) + + backend.increment(_metric(Metric.INPUT_MESSAGES), 1) + backend.flush() + + mock_client.increment.assert_called_once_with("input.messages", 1, tags=["service:streams"]) + + def test_arroyo_delegates_increment_gauge_timing_with_tags() -> None: inner = Mock(spec=DummyMetricsBackend) backend = ArroyoMetricsBackend(inner) From 195ea9fc00de3e8d2f2998b6ef89ced14826fd66 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 30 Mar 2026 12:18:33 +0200 Subject: [PATCH 6/9] ref(metrics): Configure metrics from picklable StreamMetricsConfig Runner builds StreamMetricsConfig from deployment YAML and calls configure_metrics with that object. Inner backends are created via build_metrics_backend so worker processes can call configure_metrics again with the same config under spawn multiprocessing. RustArroyoAdapter stores the config and passes it to MultiprocessingPool initializers instead of get_inner_metrics(). Log metrics honor metrics.period_sec as BufferedMetricsBackend throttle_interval_sec; LogMetricsBackend no longer takes an unused period_sec. Exports StreamMetricsConfig and build_metrics_backend from sentry_streams.metrics. 
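
Illustrative sketch of the spawn-safe flow (simplified; the real wiring
lives in runner.py and rust_arroyo.py in this diff, and the host, port,
and process count are example values):

    # Parent process: build the picklable config once and configure metrics.
    config = StreamMetricsConfig(backend="datadog", host="localhost", port=8125)
    configure_metrics(config)

    # Worker processes: the pool initializer receives the pickled config and
    # calls configure_metrics(config, force=True) in each spawned interpreter.
    pool = MultiprocessingPool(
        num_processes=4,
        initializer=functools.partial(initializer, config),
    )
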
Refs GH-285 Co-Authored-By: Cursor --- .../adapters/arroyo/rust_arroyo.py | 26 +++--- .../sentry_streams/adapters/loader.py | 5 +- .../sentry_streams/metrics/__init__.py | 4 + .../sentry_streams/metrics/metrics.py | 64 ++++++++++++--- sentry_streams/sentry_streams/runner.py | 58 ++++++------- sentry_streams/tests/pipeline/test_metrics.py | 81 ++++++++++++++----- sentry_streams/tests/test_load_runtime.py | 4 + 7 files changed, 173 insertions(+), 69 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 7c49f974..02735aad 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -36,12 +36,11 @@ ) from sentry_streams.metrics import ( Metric, - MetricsBackend, + StreamMetricsConfig, configure_metrics, get_metrics, get_size, ) -from sentry_streams.metrics.metrics import get_inner_metrics from sentry_streams.pipeline.function_template import ( InputType, OutputType, @@ -79,9 +78,8 @@ logger = logging.getLogger(__name__) -def initializer(metrics: MetricsBackend) -> None: - # Reinitialize the metrics backend for each process - configure_metrics(metrics) +def initializer(streams_metrics_config: StreamMetricsConfig) -> None: + configure_metrics(streams_metrics_config, force=True) def _metrics_wrapped_function( @@ -183,7 +181,11 @@ def build_kafka_producer_config( ) -def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: +def finalize_chain( + chains: TransformChains, + route: Route, + streams_metrics_config: StreamMetricsConfig, +) -> RuntimeOperator: rust_route = RustRoute(route.source, route.waypoints) config, func = chains.finalize(route) if config: @@ -202,7 +204,7 @@ def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: config["batch_time"], MultiprocessingPool( num_processes=config["processes"], - initializer=functools.partial(initializer, get_inner_metrics()), + initializer=functools.partial(initializer, streams_metrics_config), ), input_block_size=config.get("input_block_size"), output_block_size=config.get("output_block_size"), @@ -219,11 +221,13 @@ class RustArroyoAdapter(StreamAdapter[Route, Route]): def __init__( self, steps_config: Mapping[str, StepConfig], + streams_metrics_config: StreamMetricsConfig, metric_config: PyMetricConfig | None = None, write_healthcheck: bool = False, ) -> None: super().__init__() self.steps_config = steps_config + self.__streams_metrics_config = streams_metrics_config self.__metric_config = metric_config self.__write_healthcheck = write_healthcheck self.__consumers: MutableMapping[str, ArroyoConsumer] = {} @@ -234,18 +238,22 @@ def build( cls, config: PipelineConfig, metric_config: PyMetricConfig | None = None, + streams_metrics_config: StreamMetricsConfig | None = None, ) -> Self: steps_config = config["steps_config"] adapter_config = config.get("adapter_config") or {} arroyo_config = adapter_config.get("arroyo") or {} write_healthcheck = bool(arroyo_config.get("write_healthcheck", False)) + smc = streams_metrics_config or StreamMetricsConfig(backend="dummy") - return cls(steps_config, metric_config, write_healthcheck) + return cls(steps_config, smc, metric_config, write_healthcheck) def __close_chain(self, stream: Route) -> None: if self.__chains.exists(stream): logger.info(f"Closing transformation chain: {stream} and adding to pipeline") - self.__consumers[stream.source].add_step(finalize_chain(self.__chains, stream)) + 
self.__consumers[stream.source].add_step( + finalize_chain(self.__chains, stream, self.__streams_metrics_config) + ) def get_consumer(self, source: str) -> ArroyoConsumer: return self.__consumers[source] diff --git a/sentry_streams/sentry_streams/adapters/loader.py b/sentry_streams/sentry_streams/adapters/loader.py index 7024185a..bb9a2e20 100644 --- a/sentry_streams/sentry_streams/adapters/loader.py +++ b/sentry_streams/sentry_streams/adapters/loader.py @@ -4,6 +4,7 @@ from typing import Any, Optional, TypeVar, cast from sentry_streams.adapters.stream_adapter import PipelineConfig, StreamAdapter +from sentry_streams.metrics.metrics import StreamMetricsConfig Stream = TypeVar("Stream") Sink = TypeVar("Sink") @@ -14,6 +15,7 @@ def load_adapter( config: PipelineConfig, segment_id: Optional[int] = None, metric_config: Optional[dict[str, Any]] = None, + streams_metrics_config: Optional[StreamMetricsConfig] = None, ) -> StreamAdapter[Stream, Sink]: """ Loads a StreamAdapter to run a pipeline. @@ -71,9 +73,10 @@ def load_adapter( ) # TODO: Fix this type as above. + smc = streams_metrics_config or StreamMetricsConfig(backend="dummy") return cast( StreamAdapter[Stream, Sink], - RustArroyoAdapter.build(config, py_metric_config), + RustArroyoAdapter.build(config, py_metric_config, smc), ) else: mod, cls = adapter_type.rsplit(".", 1) diff --git a/sentry_streams/sentry_streams/metrics/__init__.py b/sentry_streams/sentry_streams/metrics/__init__.py index 9c16987c..64601ab5 100644 --- a/sentry_streams/sentry_streams/metrics/__init__.py +++ b/sentry_streams/sentry_streams/metrics/__init__.py @@ -6,12 +6,15 @@ Metric, Metrics, MetricsBackend, + StreamMetricsConfig, + build_metrics_backend, configure_metrics, get_metrics, get_size, ) __all__ = [ + "build_metrics_backend", "configure_metrics", "get_metrics", "DatadogMetricsBackend", @@ -21,5 +24,6 @@ "Metric", "Metrics", "MetricsBackend", + "StreamMetricsConfig", "get_size", ] diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index 1830c2d7..e905c186 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -3,9 +3,11 @@ import logging import time from abc import abstractmethod +from dataclasses import dataclass from enum import Enum from typing import ( Any, + Literal, Mapping, Optional, Protocol, @@ -20,9 +22,28 @@ Tags = dict[str, str] logger = logging.getLogger("sentry_streams.metrics.log_backend") - METRICS_FREQUENCY_SEC = 10 +MetricsBackendKind = Literal["dummy", "datadog", "log"] + + +@dataclass(frozen=True, slots=True) +class StreamMetricsConfig: + """ + Picklable metrics settings for the streaming process and multiprocessing workers. + + ``throttle_interval_sec`` is the flush interval for :class:`BufferedMetricsBackend`, + which wraps the inner backend configured via ``backend``. + """ + + backend: MetricsBackendKind + throttle_interval_sec: float = METRICS_FREQUENCY_SEC + host: Optional[str] = None + port: Optional[int] = None + tags: Optional[Tags] = None + udp_queue_size: Optional[int] = None + + # Single source of truth for the metrics namespace used by both Datadog and Log backends. METRICS_PREFIX = "streams.pipeline" @@ -179,10 +200,9 @@ class LogMetricsBackend(MetricsBackend): as LogFlusher: ``prefix | counter|gauge|timing name=value tag1:val1 ...``. 
""" - def __init__(self, period_sec: float, tags: Optional[Tags] = None) -> None: + def __init__(self, tags: Optional[Tags] = None) -> None: self.__prefix = METRICS_PREFIX.strip(".") self.__base_tags: Tags = tags if tags is not None else {} - self._period_sec = period_sec @staticmethod def __normalize_tags(tags: Tags) -> list[str]: @@ -379,26 +399,52 @@ def timing( _inner_metrics_backend: Optional[MetricsBackend] = None _metrics_backend: Optional[MetricsBackend] = None +_streams_metrics_config: Optional[StreamMetricsConfig] = None _dummy_metrics_backend = DummyMetricsBackend() -def configure_metrics(metrics: MetricsBackend, force: bool = False) -> None: +def build_metrics_backend(config: StreamMetricsConfig) -> MetricsBackend: + """Construct the inner (unbuffered) metrics backend from picklable settings.""" + if config.backend == "dummy": + return DummyMetricsBackend() + if config.backend == "datadog": + if config.host is None or config.port is None: + raise ValueError("datadog metrics require host and port") + tag_values = dict(config.tags) if config.tags else None + return DatadogMetricsBackend( + config.host, + config.port, + tags=tag_values, + udp_queue_size=config.udp_queue_size, + ) + if config.backend == "log": + tag_values = dict(config.tags) if config.tags else None + return LogMetricsBackend(tags=tag_values) + raise ValueError(f"Unknown metrics backend: {config.backend!r}") + + +def configure_metrics(config: StreamMetricsConfig, force: bool = False) -> None: """ Metrics can generally only be configured once, unless force is passed on subsequent initializations. This method has to be called for each process the application uses. + Accepts a picklable :class:`StreamMetricsConfig` so worker processes can + rebuild the same backends under ``spawn`` multiprocessing. """ global _metrics_backend global _inner_metrics_backend + global _streams_metrics_config if not force: assert _metrics_backend is None, "Metrics is already set" - # Runtime-check the metrics implementation so misconfiguration fails early. 
- assert isinstance(metrics, MetricsBackend) - - _inner_metrics_backend = metrics - _metrics_backend = BufferedMetricsBackend(metrics, throttle_interval_sec=METRICS_FREQUENCY_SEC) + inner = build_metrics_backend(config) + _streams_metrics_config = config + _inner_metrics_backend = inner + _metrics_backend = BufferedMetricsBackend( + inner, + throttle_interval_sec=config.throttle_interval_sec, + ) arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend)) diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 381cd9ec..169f6585 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -12,13 +12,7 @@ StreamSinkT, StreamT, ) -from sentry_streams.metrics import ( - DatadogMetricsBackend, - DummyMetricsBackend, - LogMetricsBackend, - MetricsBackend, - configure_metrics, -) +from sentry_streams.metrics import StreamMetricsConfig, configure_metrics from sentry_streams.pipeline.config import load_config from sentry_streams.pipeline.pipeline import ( Pipeline, @@ -104,43 +98,51 @@ def load_runtime( validate_all_branches_have_sinks(pipeline) - metric_config = environment_config.get("metrics", {}) - metrics_backend: MetricsBackend - if metric_config.get("type") == "datadog": - default_tags = metric_config.get("tags", {}) + metric_config_raw = environment_config.get("metrics", {}) + streams_config: StreamMetricsConfig + if metric_config_raw.get("type") == "datadog": + default_tags = dict(metric_config_raw.get("tags", {})) default_tags["pipeline"] = name - metrics_backend = DatadogMetricsBackend( - metric_config["host"], - metric_config["port"], + streams_config = StreamMetricsConfig( + backend="datadog", + host=metric_config_raw["host"], + port=int(metric_config_raw["port"]), tags=default_tags, - udp_queue_size=metric_config.get("udp_queue_size"), + udp_queue_size=metric_config_raw.get("udp_queue_size"), ) - configure_metrics(metrics_backend) + configure_metrics(streams_config) metric_config = { - "host": metric_config["host"], - "port": metric_config["port"], + "host": metric_config_raw["host"], + "port": metric_config_raw["port"], "tags": default_tags, - "flush_interval_ms": metric_config.get("flush_interval_ms"), - "udp_queue_size": metric_config.get("udp_queue_size"), + "flush_interval_ms": metric_config_raw.get("flush_interval_ms"), + "udp_queue_size": metric_config_raw.get("udp_queue_size"), } - elif metric_config.get("type") == "log": - default_tags = metric_config.get("tags", {}) + elif metric_config_raw.get("type") == "log": + default_tags = dict(metric_config_raw.get("tags", {})) default_tags["pipeline"] = name - metrics_backend = LogMetricsBackend( - period_sec=metric_config["period_sec"], + streams_config = StreamMetricsConfig( + backend="log", + throttle_interval_sec=float(metric_config_raw["period_sec"]), tags=default_tags, ) - configure_metrics(metrics_backend) + configure_metrics(streams_config) metric_config = {} else: - metrics_backend = DummyMetricsBackend() - configure_metrics(metrics_backend) + streams_config = StreamMetricsConfig(backend="dummy") + configure_metrics(streams_config) metric_config = {} assigned_segment_id = int(segment_id) if segment_id else None - runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config) + runtime: Any = load_adapter( + adapter, + environment_config, + assigned_segment_id, + metric_config, + streams_config, + ) translator = RuntimeTranslator(runtime) iterate_edges(pipeline, translator) diff --git 
a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py index bb73b781..581053e0 100644 --- a/sentry_streams/tests/pipeline/test_metrics.py +++ b/sentry_streams/tests/pipeline/test_metrics.py @@ -1,3 +1,4 @@ +import pickle from collections.abc import Generator from typing import Any, cast from unittest.mock import MagicMock, Mock, patch @@ -16,6 +17,8 @@ Metric, Metrics, MetricsBackend, + StreamMetricsConfig, + build_metrics_backend, configure_metrics, ) @@ -34,8 +37,12 @@ def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend: @pytest.fixture(autouse=True) def reset_metrics_backend() -> Generator[None, None, None]: metrics_module._metrics_backend = None + metrics_module._inner_metrics_backend = None + metrics_module._streams_metrics_config = None yield metrics_module._metrics_backend = None + metrics_module._inner_metrics_backend = None + metrics_module._streams_metrics_config = None def test_metric_enum_values() -> None: @@ -275,7 +282,7 @@ def test_arroyo_methods_without_tags_pass_empty_dict() -> None: @patch("sentry_streams.metrics.metrics.logger") def test_log_increment_logs_immediately(mock_logger: Any) -> None: - backend = LogMetricsBackend(period_sec=15.0, tags={"env": "test"}) + backend = LogMetricsBackend(tags={"env": "test"}) mock_info = mock_logger.info backend.increment(_metric(Metric.INPUT_MESSAGES), 1) @@ -288,7 +295,7 @@ def test_log_increment_logs_immediately(mock_logger: Any) -> None: @patch("sentry_streams.metrics.metrics.logger") def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None: - backend = LogMetricsBackend(period_sec=60.0) + backend = LogMetricsBackend() mock_info = mock_logger.info backend.increment(_metric(Metric.INPUT_MESSAGES), 1) @@ -301,7 +308,7 @@ def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None: @patch("time.time") def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0) + inner = LogMetricsBackend() backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info @@ -324,7 +331,7 @@ def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) - @patch("time.time") def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0) + inner = LogMetricsBackend() backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info @@ -348,7 +355,7 @@ def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) -> @patch("time.time") def test_buffered_log_throttled_flush(mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0) + inner = LogMetricsBackend() backend = BufferedMetricsBackend(inner, throttle_interval_sec=10.0) mock_info = mock_logger.info @@ -364,7 +371,7 @@ def test_buffered_log_throttled_flush(mock_time: Any, mock_logger: Any) -> None: @patch("time.time") def test_buffered_log_global_tags_from_inner(mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - inner = LogMetricsBackend(period_sec=60.0, tags={"service": "streams"}) + inner = LogMetricsBackend(tags={"service": "streams"}) backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info @@ -379,13 +386,13 @@ def test_buffered_log_global_tags_from_inner(mock_time: Any, mock_logger: 
Any) - @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") def test_configure_metrics_dummy(mock_arroyo_configure: Any) -> None: - backend = DummyMetricsBackend() + cfg = StreamMetricsConfig(backend="dummy") - configure_metrics(backend) + configure_metrics(cfg) wrapped = metrics_module._metrics_backend assert isinstance(wrapped, BufferedMetricsBackend) - assert _buffered_inner_backend(wrapped) is backend + assert isinstance(_buffered_inner_backend(wrapped), DummyMetricsBackend) assert ( object.__getattribute__(wrapped, "_BufferedMetricsBackend__throttle_interval_sec") == METRICS_FREQUENCY_SEC @@ -399,34 +406,64 @@ def test_configure_metrics_dummy(mock_arroyo_configure: Any) -> None: @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") @patch("sentry_streams.metrics.metrics.DogStatsd") def test_configure_metrics_datadog(mock_dogstatsd: Any, mock_arroyo_configure: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125) + cfg = StreamMetricsConfig(backend="datadog", host="localhost", port=8125) - configure_metrics(backend) + configure_metrics(cfg) wrapped = metrics_module._metrics_backend assert isinstance(wrapped, BufferedMetricsBackend) - assert _buffered_inner_backend(wrapped) is backend + assert isinstance(_buffered_inner_backend(wrapped), DatadogMetricsBackend) mock_arroyo_configure.assert_called_once() def test_configure_metrics_already_set() -> None: - backend1 = DummyMetricsBackend() - backend2 = DummyMetricsBackend() - - configure_metrics(backend1) + configure_metrics(StreamMetricsConfig(backend="dummy")) with pytest.raises(AssertionError, match="Metrics is already set"): - configure_metrics(backend2) + configure_metrics(StreamMetricsConfig(backend="dummy")) @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: - backend1 = DummyMetricsBackend() - backend2 = DummyMetricsBackend() + configure_metrics(StreamMetricsConfig(backend="dummy")) + configure_metrics( + StreamMetricsConfig(backend="datadog", host="localhost", port=8125), + force=True, + ) + + wrapped = metrics_module._metrics_backend + assert isinstance(wrapped, BufferedMetricsBackend) + assert isinstance(_buffered_inner_backend(wrapped), DatadogMetricsBackend) - configure_metrics(backend1) - configure_metrics(backend2, force=True) +@patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") +def test_configure_metrics_log_uses_config_throttle_interval( + mock_arroyo_configure: Any, +) -> None: + configure_metrics( + StreamMetricsConfig(backend="log", throttle_interval_sec=33.0, tags={"k": "v"}) + ) wrapped = metrics_module._metrics_backend assert isinstance(wrapped, BufferedMetricsBackend) - assert _buffered_inner_backend(wrapped) is backend2 + assert ( + object.__getattribute__(wrapped, "_BufferedMetricsBackend__throttle_interval_sec") == 33.0 + ) + + +@patch("sentry_streams.metrics.metrics.DogStatsd") +def test_stream_metrics_config_roundtrips_through_pickle(mock_dogstatsd: Any) -> None: + cfg = StreamMetricsConfig( + backend="datadog", + host="h", + port=8125, + tags={"service": "streams"}, + throttle_interval_sec=7.5, + ) + roundtripped = pickle.loads(pickle.dumps(cfg)) + assert roundtripped == cfg + assert isinstance(build_metrics_backend(roundtripped), DatadogMetricsBackend) + + +def test_build_metrics_backend_datadog_requires_host_and_port() -> None: + with pytest.raises(ValueError, match="host and port"): + build_metrics_backend(StreamMetricsConfig(backend="datadog", host="localhost")) diff 
--git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py
index 36724897..50c95b22 100644
--- a/sentry_streams/tests/test_load_runtime.py
+++ b/sentry_streams/tests/test_load_runtime.py
@@ -55,10 +55,14 @@ def reset_metrics_backend() -> Generator[None, None, None]:
 
     # Reset before test runs (setup)
     sentry_streams.metrics.metrics._metrics_backend = None
+    sentry_streams.metrics.metrics._inner_metrics_backend = None
+    sentry_streams.metrics.metrics._streams_metrics_config = None
     arroyo.utils.metrics._metrics_backend = None
 
     yield
 
     # Reset after test completes (teardown)
     sentry_streams.metrics.metrics._metrics_backend = None
+    sentry_streams.metrics.metrics._inner_metrics_backend = None
+    sentry_streams.metrics.metrics._streams_metrics_config = None
     arroyo.utils.metrics._metrics_backend = None

From fed0d61a293ded7e54b55c259d9ca3f5b0c300cd Mon Sep 17 00:00:00 2001
From: Filippo Pacifici
Date: Mon, 30 Mar 2026 16:22:09 +0200
Subject: [PATCH 7/9] ref(metrics): Improve config types

Replace the StreamMetricsConfig dataclass with per-backend TypedDicts
(DummyMetricsConfig, DatadogMetricsConfig, LogMetricsConfig) that
mirror the metrics section of the deployment YAML, and rename the
parameter to metrics_config throughout.
---
 .../adapters/arroyo/rust_arroyo.py | 23 ++--
 .../sentry_streams/adapters/loader.py | 8 +-
 sentry_streams/sentry_streams/config.json | 3 +-
 .../sentry_streams/metrics/__init__.py | 10 +-
 .../sentry_streams/metrics/metrics.py | 123 ++++++++++--------
 sentry_streams/sentry_streams/runner.py | 38 +++---
 .../tests/adapters/arroyo/test_rust_arroyo.py | 1 +
 sentry_streams/tests/pipeline/test_metrics.py | 82 +++++++-----
 sentry_streams/tests/test_runner.py | 12 +-
 9 files changed, 177 insertions(+), 123 deletions(-)

diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
index 02735aad..111e0f6c 100644
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -36,7 +36,7 @@
 )
 from sentry_streams.metrics import (
     Metric,
-    StreamMetricsConfig,
+    MetricsConfig,
     configure_metrics,
     get_metrics,
     get_size,
 )
@@ -78,8 +78,8 @@
 logger = logging.getLogger(__name__)
 
 
-def initializer(streams_metrics_config: StreamMetricsConfig) -> None:
-    configure_metrics(streams_metrics_config, force=True)
+def initializer(metrics_config: MetricsConfig) -> None:
+    configure_metrics(metrics_config, force=True)
 
 
 def _metrics_wrapped_function(
@@ -184,7 +184,7 @@ def build_kafka_producer_config(
 def finalize_chain(
     chains: TransformChains,
     route: Route,
-    streams_metrics_config: StreamMetricsConfig,
+    metrics_config: MetricsConfig,
 ) -> RuntimeOperator:
     rust_route = RustRoute(route.source, route.waypoints)
     config, func = chains.finalize(route)
     if config:
@@ -204,7 +204,7 @@ def finalize_chain(
             config["batch_time"],
             MultiprocessingPool(
                 num_processes=config["processes"],
-                initializer=functools.partial(initializer, streams_metrics_config),
+                initializer=functools.partial(initializer, metrics_config),
             ),
             input_block_size=config.get("input_block_size"),
             output_block_size=config.get("output_block_size"),
@@ -221,38 +221,37 @@ class RustArroyoAdapter(StreamAdapter[Route, Route]):
     def __init__(
         self,
         steps_config: Mapping[str, StepConfig],
-        streams_metrics_config: StreamMetricsConfig,
+        metrics_config: MetricsConfig,
         metric_config: PyMetricConfig | None = None,
         write_healthcheck: bool = False,
     ) -> None:
         super().__init__()
         self.steps_config = steps_config
-        self.__streams_metrics_config = streams_metrics_config
+        self.__metrics_config = metrics_config
         self.__metric_config = metric_config
         self.__write_healthcheck = write_healthcheck
         self.__consumers: MutableMapping[str, ArroyoConsumer] = {}
self.__chains = TransformChains() @classmethod - def build( + def build( # type: ignore[override] cls, config: PipelineConfig, + metrics_config: MetricsConfig, metric_config: PyMetricConfig | None = None, - streams_metrics_config: StreamMetricsConfig | None = None, ) -> Self: steps_config = config["steps_config"] adapter_config = config.get("adapter_config") or {} arroyo_config = adapter_config.get("arroyo") or {} write_healthcheck = bool(arroyo_config.get("write_healthcheck", False)) - smc = streams_metrics_config or StreamMetricsConfig(backend="dummy") - return cls(steps_config, smc, metric_config, write_healthcheck) + return cls(steps_config, metrics_config, metric_config, write_healthcheck) def __close_chain(self, stream: Route) -> None: if self.__chains.exists(stream): logger.info(f"Closing transformation chain: {stream} and adding to pipeline") self.__consumers[stream.source].add_step( - finalize_chain(self.__chains, stream, self.__streams_metrics_config) + finalize_chain(self.__chains, stream, self.__metrics_config) ) def get_consumer(self, source: str) -> ArroyoConsumer: diff --git a/sentry_streams/sentry_streams/adapters/loader.py b/sentry_streams/sentry_streams/adapters/loader.py index bb9a2e20..fca48473 100644 --- a/sentry_streams/sentry_streams/adapters/loader.py +++ b/sentry_streams/sentry_streams/adapters/loader.py @@ -4,7 +4,7 @@ from typing import Any, Optional, TypeVar, cast from sentry_streams.adapters.stream_adapter import PipelineConfig, StreamAdapter -from sentry_streams.metrics.metrics import StreamMetricsConfig +from sentry_streams.metrics.metrics import MetricsConfig Stream = TypeVar("Stream") Sink = TypeVar("Sink") @@ -15,7 +15,8 @@ def load_adapter( config: PipelineConfig, segment_id: Optional[int] = None, metric_config: Optional[dict[str, Any]] = None, - streams_metrics_config: Optional[StreamMetricsConfig] = None, + *, + metrics_config: MetricsConfig, ) -> StreamAdapter[Stream, Sink]: """ Loads a StreamAdapter to run a pipeline. @@ -73,10 +74,9 @@ def load_adapter( ) # TODO: Fix this type as above. 
- smc = streams_metrics_config or StreamMetricsConfig(backend="dummy") return cast( StreamAdapter[Stream, Sink], - RustArroyoAdapter.build(config, py_metric_config, smc), + RustArroyoAdapter.build(config, metrics_config, py_metric_config), ) else: mod, cls = adapter_type.rsplit(".", 1) diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index fcdd9034..f1e3be9f 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -110,7 +110,8 @@ }, "required": [ "type", - "period_sec" + "period_sec", + "tags" ] }, { diff --git a/sentry_streams/sentry_streams/metrics/__init__.py b/sentry_streams/sentry_streams/metrics/__init__.py index 64601ab5..5c98d04e 100644 --- a/sentry_streams/sentry_streams/metrics/__init__.py +++ b/sentry_streams/sentry_streams/metrics/__init__.py @@ -1,12 +1,15 @@ from sentry_streams.metrics.metrics import ( METRICS_PREFIX, DatadogMetricsBackend, + DatadogMetricsConfig, DummyMetricsBackend, + DummyMetricsConfig, LogMetricsBackend, + LogMetricsConfig, Metric, Metrics, MetricsBackend, - StreamMetricsConfig, + MetricsConfig, build_metrics_backend, configure_metrics, get_metrics, @@ -18,12 +21,15 @@ "configure_metrics", "get_metrics", "DatadogMetricsBackend", + "DatadogMetricsConfig", "DummyMetricsBackend", + "DummyMetricsConfig", "LogMetricsBackend", + "LogMetricsConfig", "METRICS_PREFIX", "Metric", "Metrics", "MetricsBackend", - "StreamMetricsConfig", + "MetricsConfig", "get_size", ] diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py index e905c186..83fca295 100644 --- a/sentry_streams/sentry_streams/metrics/metrics.py +++ b/sentry_streams/sentry_streams/metrics/metrics.py @@ -3,15 +3,17 @@ import logging import time from abc import abstractmethod -from dataclasses import dataclass from enum import Enum from typing import ( Any, Literal, Mapping, + NotRequired, Optional, Protocol, + TypedDict, Union, + cast, runtime_checkable, ) @@ -24,24 +26,32 @@ METRICS_FREQUENCY_SEC = 10 -MetricsBackendKind = Literal["dummy", "datadog", "log"] +class DummyMetricsConfig(TypedDict): + type: Literal["dummy"] -@dataclass(frozen=True, slots=True) -class StreamMetricsConfig: - """ - Picklable metrics settings for the streaming process and multiprocessing workers. - ``throttle_interval_sec`` is the flush interval for :class:`BufferedMetricsBackend`, - which wraps the inner backend configured via ``backend``. - """ +class DatadogMetricsConfig(TypedDict): + type: Literal["datadog"] + host: str + port: int + tags: Tags + udp_queue_size: NotRequired[int] + + +class LogMetricsConfig(TypedDict): + type: Literal["log"] + period_sec: float + tags: Tags + + +MetricsConfig = Union[DummyMetricsConfig, DatadogMetricsConfig, LogMetricsConfig] + - backend: MetricsBackendKind - throttle_interval_sec: float = METRICS_FREQUENCY_SEC - host: Optional[str] = None - port: Optional[int] = None - tags: Optional[Tags] = None - udp_queue_size: Optional[int] = None +def _buffer_throttle_interval_sec(config: MetricsConfig) -> float: + if config["type"] == "log": + return float(config["period_sec"]) + return METRICS_FREQUENCY_SEC # Single source of truth for the metrics namespace used by both Datadog and Log backends. @@ -83,6 +93,10 @@ class MetricsBackend(Protocol): produce metrics on a real channel or platform. This can be wrapped in an adapter class that provides a client-specific metrics interface, for example Arroyo metrics. 
+ + Concrete implementations define ``build(config)`` for their + :class:`MetricsConfig` variant; use :func:`build_metrics_backend` + to construct from an arbitrary config dict (discriminated by ``type``). """ @abstractmethod @@ -118,6 +132,10 @@ class DummyMetricsBackend(MetricsBackend): Useful for tests. """ + @staticmethod + def build(config: DummyMetricsConfig) -> DummyMetricsBackend: + return DummyMetricsBackend() + def increment( self, name: str, @@ -156,7 +174,7 @@ def __init__( self, host: str, port: int, - tags: Optional[Tags] = None, + tags: Tags, udp_queue_size: Optional[int] = None, ) -> None: # Do not pass constant_tags to DogStatsd: BufferedMetricsBackend already @@ -172,7 +190,21 @@ def __init__( sender_queue_size=udp_queue_size if udp_queue_size is not None else SENDER_QUEUE_SIZE, sender_queue_timeout=SENDER_QUEUE_TIMEOUT, ) - self.__tags: Tags = tags if tags is not None else {} + self.__tags = tags + + @staticmethod + def build(config: DatadogMetricsConfig) -> DatadogMetricsBackend: + host = config.get("host") + port = config.get("port") + if host is None or port is None: + raise ValueError("datadog metrics require host and port") + udp_queue_size = config.get("udp_queue_size") + return DatadogMetricsBackend( + host, + port, + tags=config["tags"], + udp_queue_size=udp_queue_size, + ) def __normalize_tags(self, tags: Tags) -> list[str]: return [f"{key}:{value.replace('|', '_')}" for key, value in tags.items()] @@ -200,9 +232,13 @@ class LogMetricsBackend(MetricsBackend): as LogFlusher: ``prefix | counter|gauge|timing name=value tag1:val1 ...``. """ - def __init__(self, tags: Optional[Tags] = None) -> None: + def __init__(self, tags: Tags) -> None: self.__prefix = METRICS_PREFIX.strip(".") - self.__base_tags: Tags = tags if tags is not None else {} + self.__base_tags = tags + + @staticmethod + def build(config: LogMetricsConfig) -> LogMetricsBackend: + return LogMetricsBackend(tags=config["tags"]) @staticmethod def __normalize_tags(tags: Tags) -> list[str]: @@ -397,63 +433,44 @@ def timing( self.__backend.timing(name, value, tags=_tags_from_mapping(tags)) -_inner_metrics_backend: Optional[MetricsBackend] = None _metrics_backend: Optional[MetricsBackend] = None -_streams_metrics_config: Optional[StreamMetricsConfig] = None _dummy_metrics_backend = DummyMetricsBackend() -def build_metrics_backend(config: StreamMetricsConfig) -> MetricsBackend: - """Construct the inner (unbuffered) metrics backend from picklable settings.""" - if config.backend == "dummy": - return DummyMetricsBackend() - if config.backend == "datadog": - if config.host is None or config.port is None: - raise ValueError("datadog metrics require host and port") - tag_values = dict(config.tags) if config.tags else None - return DatadogMetricsBackend( - config.host, - config.port, - tags=tag_values, - udp_queue_size=config.udp_queue_size, - ) - if config.backend == "log": - tag_values = dict(config.tags) if config.tags else None - return LogMetricsBackend(tags=tag_values) - raise ValueError(f"Unknown metrics backend: {config.backend!r}") +def build_metrics_backend(config: MetricsConfig) -> MetricsBackend: + """Construct the inner (unbuffered) metrics backend from a metrics config dict.""" + kind = config["type"] + if kind == "dummy": + return DummyMetricsBackend.build(cast(DummyMetricsConfig, config)) + if kind == "datadog": + return DatadogMetricsBackend.build(cast(DatadogMetricsConfig, config)) + if kind == "log": + return LogMetricsBackend.build(cast(LogMetricsConfig, config)) + raise ValueError(f"Unknown 
metrics type: {kind!r}") -def configure_metrics(config: StreamMetricsConfig, force: bool = False) -> None: +def configure_metrics(config: MetricsConfig, force: bool = False) -> None: """ Metrics can generally only be configured once, unless force is passed on subsequent initializations. This method has to be called for each process the application uses. - Accepts a picklable :class:`StreamMetricsConfig` so worker processes can - rebuild the same backends under ``spawn`` multiprocessing. + Accepts a picklable metrics config dict (``type`` discriminator matches + ``config.json``) so worker processes can rebuild the same backends under + ``spawn`` multiprocessing. """ global _metrics_backend - global _inner_metrics_backend - global _streams_metrics_config if not force: assert _metrics_backend is None, "Metrics is already set" inner = build_metrics_backend(config) - _streams_metrics_config = config - _inner_metrics_backend = inner _metrics_backend = BufferedMetricsBackend( inner, - throttle_interval_sec=config.throttle_interval_sec, + throttle_interval_sec=_buffer_throttle_interval_sec(config), ) arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend)) -def get_inner_metrics() -> MetricsBackend: - if _inner_metrics_backend is None: - return _dummy_metrics_backend - return _inner_metrics_backend - - def get_metrics() -> Metrics: if _metrics_backend is None: return Metrics(_dummy_metrics_backend) diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py index 169f6585..85362ef4 100644 --- a/sentry_streams/sentry_streams/runner.py +++ b/sentry_streams/sentry_streams/runner.py @@ -12,7 +12,11 @@ StreamSinkT, StreamT, ) -from sentry_streams.metrics import StreamMetricsConfig, configure_metrics +from sentry_streams.metrics import ( + DatadogMetricsConfig, + MetricsConfig, + configure_metrics, +) from sentry_streams.pipeline.config import load_config from sentry_streams.pipeline.pipeline import ( Pipeline, @@ -99,18 +103,20 @@ def load_runtime( validate_all_branches_have_sinks(pipeline) metric_config_raw = environment_config.get("metrics", {}) - streams_config: StreamMetricsConfig + streams_config: MetricsConfig if metric_config_raw.get("type") == "datadog": default_tags = dict(metric_config_raw.get("tags", {})) default_tags["pipeline"] = name - streams_config = StreamMetricsConfig( - backend="datadog", - host=metric_config_raw["host"], - port=int(metric_config_raw["port"]), - tags=default_tags, - udp_queue_size=metric_config_raw.get("udp_queue_size"), - ) + base_dd: DatadogMetricsConfig = { + "type": "datadog", + "host": metric_config_raw["host"], + "port": int(metric_config_raw["port"]), + "tags": default_tags, + } + if metric_config_raw.get("udp_queue_size") is not None: + base_dd["udp_queue_size"] = metric_config_raw["udp_queue_size"] + streams_config = cast(MetricsConfig, base_dd) configure_metrics(streams_config) metric_config = { "host": metric_config_raw["host"], @@ -123,15 +129,15 @@ def load_runtime( default_tags = dict(metric_config_raw.get("tags", {})) default_tags["pipeline"] = name - streams_config = StreamMetricsConfig( - backend="log", - throttle_interval_sec=float(metric_config_raw["period_sec"]), - tags=default_tags, - ) + streams_config = { + "type": "log", + "period_sec": float(metric_config_raw["period_sec"]), + "tags": default_tags, + } configure_metrics(streams_config) metric_config = {} else: - streams_config = StreamMetricsConfig(backend="dummy") + streams_config = {"type": "dummy"} configure_metrics(streams_config) metric_config = 
{} @@ -141,7 +147,7 @@ def load_runtime( environment_config, assigned_segment_id, metric_config, - streams_config, + metrics_config=streams_config, ) translator = RuntimeTranslator(runtime) diff --git a/sentry_streams/tests/adapters/arroyo/test_rust_arroyo.py b/sentry_streams/tests/adapters/arroyo/test_rust_arroyo.py index e3a1e2f6..49c34cd8 100644 --- a/sentry_streams/tests/adapters/arroyo/test_rust_arroyo.py +++ b/sentry_streams/tests/adapters/arroyo/test_rust_arroyo.py @@ -23,6 +23,7 @@ def test_rust_arroyo_adapter( "kafkasink": {"bootstrap_servers": bootstrap_servers, "additional_settings": {}}, }, }, + {"type": "dummy"}, ) iterate_edges(pipeline, RuntimeTranslator(adapter)) diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py index 581053e0..cdc4fb0e 100644 --- a/sentry_streams/tests/pipeline/test_metrics.py +++ b/sentry_streams/tests/pipeline/test_metrics.py @@ -12,12 +12,14 @@ ArroyoMetricsBackend, BufferedMetricsBackend, DatadogMetricsBackend, + DatadogMetricsConfig, DummyMetricsBackend, LogMetricsBackend, + LogMetricsConfig, Metric, Metrics, MetricsBackend, - StreamMetricsConfig, + MetricsConfig, build_metrics_backend, configure_metrics, ) @@ -37,12 +39,8 @@ def _buffered_inner_backend(buffered: BufferedMetricsBackend) -> MetricsBackend: @pytest.fixture(autouse=True) def reset_metrics_backend() -> Generator[None, None, None]: metrics_module._metrics_backend = None - metrics_module._inner_metrics_backend = None - metrics_module._streams_metrics_config = None yield metrics_module._metrics_backend = None - metrics_module._inner_metrics_backend = None - metrics_module._streams_metrics_config = None def test_metric_enum_values() -> None: @@ -85,7 +83,7 @@ def test_metrics_facade_delegates_to_backend() -> None: @patch("sentry_streams.metrics.metrics.DogStatsd") def test_datadog_init_namespace_is_metrics_prefix(mock_dogstatsd: Any) -> None: - DatadogMetricsBackend("localhost", 8125) + DatadogMetricsBackend("localhost", 8125, tags={}) mock_dogstatsd.assert_called_once_with( host="localhost", port=8125, @@ -96,7 +94,7 @@ def test_datadog_init_namespace_is_metrics_prefix(mock_dogstatsd: Any) -> None: @patch("sentry_streams.metrics.metrics.DogStatsd") def test_datadog_increment(mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125) + backend = DatadogMetricsBackend("localhost", 8125, tags={}) mock_client = mock_dogstatsd.return_value backend.increment(_metric(Metric.INPUT_MESSAGES), 5) @@ -110,7 +108,7 @@ def test_datadog_increment(mock_dogstatsd: Any) -> None: @patch("sentry_streams.metrics.metrics.DogStatsd") def test_datadog_increment_with_tags(mock_dogstatsd: Any) -> None: - backend = DatadogMetricsBackend("localhost", 8125) + backend = DatadogMetricsBackend("localhost", 8125, tags={}) mock_client = mock_dogstatsd.return_value tags = {"env": "test"} @@ -159,7 +157,7 @@ def test_datadog_preserves_constructor_tags_when_per_call_tags_empty_dict( @patch("time.time") def test_buffered_increment_with_throttling(mock_time: Any, mock_dogstatsd: Any) -> None: mock_time.side_effect = [METRICS_FREQUENCY_SEC + 1, METRICS_FREQUENCY_SEC + 2] - inner = DatadogMetricsBackend("localhost", 8125) + inner = DatadogMetricsBackend("localhost", 8125, tags={}) mock_client = mock_dogstatsd.return_value backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) @@ -172,7 +170,7 @@ def test_buffered_increment_with_throttling(mock_time: Any, mock_dogstatsd: Any) @patch("time.time") def 
test_buffered_increment_accumulation(mock_time: Any, mock_dogstatsd: Any) -> None: mock_time.return_value = 0.0 - inner = DatadogMetricsBackend("localhost", 8125) + inner = DatadogMetricsBackend("localhost", 8125, tags={}) mock_client = mock_dogstatsd.return_value backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) @@ -187,7 +185,7 @@ def test_buffered_increment_accumulation(mock_time: Any, mock_dogstatsd: Any) -> @patch("time.time") def test_buffered_gauge_replacement(mock_time: Any, mock_dogstatsd: Any) -> None: mock_time.return_value = 0.0 - inner = DatadogMetricsBackend("localhost", 8125) + inner = DatadogMetricsBackend("localhost", 8125, tags={}) mock_client = mock_dogstatsd.return_value backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC) @@ -202,7 +200,7 @@ def test_buffered_gauge_replacement(mock_time: Any, mock_dogstatsd: Any) -> None @patch("time.time") def test_buffered_flush_all_metric_types(mock_time: Any, mock_dogstatsd: Any) -> None: mock_time.return_value = 0.0 - inner = DatadogMetricsBackend("localhost", 8125) + inner = DatadogMetricsBackend("localhost", 8125, tags={}) mock_client = mock_dogstatsd.return_value backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) @@ -295,7 +293,7 @@ def test_log_increment_logs_immediately(mock_logger: Any) -> None: @patch("sentry_streams.metrics.metrics.logger") def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None: - backend = LogMetricsBackend() + backend = LogMetricsBackend(tags={}) mock_info = mock_logger.info backend.increment(_metric(Metric.INPUT_MESSAGES), 1) @@ -308,7 +306,7 @@ def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None: @patch("time.time") def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - inner = LogMetricsBackend() + inner = LogMetricsBackend(tags={}) backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info @@ -331,7 +329,7 @@ def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) - @patch("time.time") def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - inner = LogMetricsBackend() + inner = LogMetricsBackend(tags={}) backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0) mock_info = mock_logger.info @@ -355,7 +353,7 @@ def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) -> @patch("time.time") def test_buffered_log_throttled_flush(mock_time: Any, mock_logger: Any) -> None: mock_time.return_value = 0.0 - inner = LogMetricsBackend() + inner = LogMetricsBackend(tags={}) backend = BufferedMetricsBackend(inner, throttle_interval_sec=10.0) mock_info = mock_logger.info @@ -386,7 +384,7 @@ def test_buffered_log_global_tags_from_inner(mock_time: Any, mock_logger: Any) - @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") def test_configure_metrics_dummy(mock_arroyo_configure: Any) -> None: - cfg = StreamMetricsConfig(backend="dummy") + cfg: MetricsConfig = {"type": "dummy"} configure_metrics(cfg) @@ -406,7 +404,12 @@ def test_configure_metrics_dummy(mock_arroyo_configure: Any) -> None: @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") @patch("sentry_streams.metrics.metrics.DogStatsd") def test_configure_metrics_datadog(mock_dogstatsd: Any, mock_arroyo_configure: Any) -> None: - cfg = StreamMetricsConfig(backend="datadog", host="localhost", port=8125) 
+ cfg: DatadogMetricsConfig = { + "type": "datadog", + "host": "localhost", + "port": 8125, + "tags": {}, + } configure_metrics(cfg) @@ -417,17 +420,22 @@ def test_configure_metrics_datadog(mock_dogstatsd: Any, mock_arroyo_configure: A def test_configure_metrics_already_set() -> None: - configure_metrics(StreamMetricsConfig(backend="dummy")) + configure_metrics({"type": "dummy"}) with pytest.raises(AssertionError, match="Metrics is already set"): - configure_metrics(StreamMetricsConfig(backend="dummy")) + configure_metrics({"type": "dummy"}) @patch("sentry_streams.metrics.metrics.arroyo_configure_metrics") def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: - configure_metrics(StreamMetricsConfig(backend="dummy")) + configure_metrics({"type": "dummy"}) configure_metrics( - StreamMetricsConfig(backend="datadog", host="localhost", port=8125), + { + "type": "datadog", + "host": "localhost", + "port": 8125, + "tags": {}, + }, force=True, ) @@ -440,9 +448,7 @@ def test_configure_metrics_force(mock_arroyo_configure: Any) -> None: def test_configure_metrics_log_uses_config_throttle_interval( mock_arroyo_configure: Any, ) -> None: - configure_metrics( - StreamMetricsConfig(backend="log", throttle_interval_sec=33.0, tags={"k": "v"}) - ) + configure_metrics({"type": "log", "period_sec": 33.0, "tags": {}}) wrapped = metrics_module._metrics_backend assert isinstance(wrapped, BufferedMetricsBackend) assert ( @@ -452,18 +458,28 @@ def test_configure_metrics_log_uses_config_throttle_interval( @patch("sentry_streams.metrics.metrics.DogStatsd") def test_stream_metrics_config_roundtrips_through_pickle(mock_dogstatsd: Any) -> None: - cfg = StreamMetricsConfig( - backend="datadog", - host="h", - port=8125, - tags={"service": "streams"}, - throttle_interval_sec=7.5, - ) + cfg: dict[str, Any] = { + "type": "datadog", + "host": "h", + "port": 8125, + "tags": {"service": "streams"}, + } roundtripped = pickle.loads(pickle.dumps(cfg)) assert roundtripped == cfg assert isinstance(build_metrics_backend(roundtripped), DatadogMetricsBackend) +def test_build_metrics_backend_log_with_empty_tags() -> None: + cfg: LogMetricsConfig = {"type": "log", "period_sec": 1.0, "tags": {}} + backend = build_metrics_backend(cast(MetricsConfig, cfg)) + assert isinstance(backend, LogMetricsBackend) + + def test_build_metrics_backend_datadog_requires_host_and_port() -> None: with pytest.raises(ValueError, match="host and port"): - build_metrics_backend(StreamMetricsConfig(backend="datadog", host="localhost")) + build_metrics_backend( + cast( + MetricsConfig, + {"type": "datadog", "host": "localhost"}, + ) + ) diff --git a/sentry_streams/tests/test_runner.py b/sentry_streams/tests/test_runner.py index ce0c0807..cc49603c 100644 --- a/sentry_streams/tests/test_runner.py +++ b/sentry_streams/tests/test_runner.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any +from typing import Any, cast import pytest @@ -61,7 +61,15 @@ def create_pipeline() -> Pipeline[bytes]: def test_iterate_edges(create_pipeline: Pipeline[bytes]) -> None: dummy_config: PipelineConfig = {} - runtime: DummyAdapter[Any, Any] = load_adapter("dummy", dummy_config, None) # type: ignore + runtime = cast( + DummyAdapter[Any, Any], + load_adapter( + "dummy", + dummy_config, + None, + metrics_config={"type": "dummy"}, + ), + ) translator: RuntimeTranslator[Any, Any] = RuntimeTranslator(runtime) iterate_edges(create_pipeline, translator) assert runtime.input_streams == [ From fe39852afbc2bf102ec11f837f61cc9f004e9645 Mon Sep 17 00:00:00 2001 From: 

From fe39852afbc2bf102ec11f837f61cc9f004e9645 Mon Sep 17 00:00:00 2001
From: Filippo Pacifici
Date: Mon, 30 Mar 2026 16:34:54 +0200
Subject: [PATCH 8/9] ref(streams): Route adapter metrics through MetricsConfig only

Remove the duplicate metric_config dict from the runner. load_adapter
now takes metrics_config before optional segment_id; RustArroyoAdapter
builds PyMetricConfig in build_py_metrics_config from
DatadogMetricsConfig. Add optional flush_interval_ms to
DatadogMetricsConfig and config schema. Tighten log metrics tests and
reset fixtures for removed module globals.

Co-Authored-By: Cursor
Made-with: Cursor
---
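[ Note: a minimal usage sketch of the unified config path this patch
  introduces; the host/port/tag values below are illustrative only. ]

    from sentry_streams.adapters.arroyo.rust_arroyo import build_py_metrics_config
    from sentry_streams.metrics import MetricsConfig, configure_metrics

    cfg: MetricsConfig = {
        "type": "datadog",
        "host": "127.0.0.1",  # illustrative endpoint
        "port": 8125,
        "tags": {"pipeline": "example"},
    }
    # Python side: wraps DatadogMetricsBackend in BufferedMetricsBackend.
    configure_metrics(cfg)
    # Rust side: PyMetricConfig for the consumer; None for "log"/"dummy" configs.
    py_cfg = build_py_metrics_config(cfg)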
 .../adapters/arroyo/rust_arroyo.py            | 18 +++++++--
 .../sentry_streams/adapters/loader.py         | 19 ++--------
 sentry_streams/sentry_streams/config.json     |  4 ++
 .../sentry_streams/metrics/metrics.py         |  2 +
 sentry_streams/sentry_streams/runner.py       | 14 ++-----
 sentry_streams/tests/pipeline/test_metrics.py | 38 -------------------
 sentry_streams/tests/test_load_runtime.py     |  4 --
 sentry_streams/tests/test_runner.py           |  2 +-
 8 files changed, 27 insertions(+), 74 deletions(-)

diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
index 111e0f6c..44d532b9 100644
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -78,6 +78,18 @@
 logger = logging.getLogger(__name__)


+def build_py_metrics_config(cfg: MetricsConfig) -> PyMetricConfig | None:
+    """Build Rust-side DogStatsD config from the same metrics dict used by configure_metrics."""
+    if cfg["type"] != "datadog":
+        return None
+    return PyMetricConfig(
+        host=cfg["host"],
+        port=cfg["port"],
+        tags=dict(cfg["tags"]),
+        flush_interval_ms=cfg.get("flush_interval_ms"),
+    )
+
+
 def initializer(metrics_config: MetricsConfig) -> None:
     configure_metrics(metrics_config, force=True)

@@ -222,13 +234,12 @@ def __init__(
         self,
         steps_config: Mapping[str, StepConfig],
         metrics_config: MetricsConfig,
-        metric_config: PyMetricConfig | None = None,
         write_healthcheck: bool = False,
     ) -> None:
         super().__init__()
         self.steps_config = steps_config
         self.__metrics_config = metrics_config
-        self.__metric_config = metric_config
+        self.__metric_config = build_py_metrics_config(metrics_config)
         self.__write_healthcheck = write_healthcheck
         self.__consumers: MutableMapping[str, ArroyoConsumer] = {}
         self.__chains = TransformChains()
@@ -238,14 +249,13 @@ def build(  # type: ignore[override]
         cls,
         config: PipelineConfig,
         metrics_config: MetricsConfig,
-        metric_config: PyMetricConfig | None = None,
     ) -> Self:
         steps_config = config["steps_config"]
         adapter_config = config.get("adapter_config") or {}
         arroyo_config = adapter_config.get("arroyo") or {}
         write_healthcheck = bool(arroyo_config.get("write_healthcheck", False))

-        return cls(steps_config, metrics_config, metric_config, write_healthcheck)
+        return cls(steps_config, metrics_config, write_healthcheck)

     def __close_chain(self, stream: Route) -> None:
         if self.__chains.exists(stream):
diff --git a/sentry_streams/sentry_streams/adapters/loader.py b/sentry_streams/sentry_streams/adapters/loader.py
index fca48473..0bc04d53 100644
--- a/sentry_streams/sentry_streams/adapters/loader.py
+++ b/sentry_streams/sentry_streams/adapters/loader.py
@@ -1,7 +1,7 @@
 import importlib.util as utils
 import sys
 from importlib import import_module
-from typing import Any, Optional, TypeVar, cast
+from typing import Optional, TypeVar, cast

 from sentry_streams.adapters.stream_adapter import PipelineConfig, StreamAdapter
 from sentry_streams.metrics.metrics import MetricsConfig
@@ -13,10 +13,8 @@
 def load_adapter(
     adapter_type: str,
     config: PipelineConfig,
-    segment_id: Optional[int] = None,
-    metric_config: Optional[dict[str, Any]] = None,
-    *,
     metrics_config: MetricsConfig,
+    segment_id: Optional[int] = None,
 ) -> StreamAdapter[Stream, Sink]:
     """
     Loads a StreamAdapter to run a pipeline.
@@ -61,22 +59,11 @@
     if adapter_type == "rust_arroyo":
         from sentry_streams.adapters.arroyo import RustArroyoAdapter
-        from sentry_streams.rust_streams import PyMetricConfig
-
-        # Convert dict metric_config to PyMetricConfig if provided
-        py_metric_config = None
-        if metric_config:
-            py_metric_config = PyMetricConfig(
-                host=metric_config["host"],
-                port=metric_config["port"],
-                tags=metric_config.get("tags"),
-                flush_interval_ms=metric_config.get("flush_interval_ms"),
-            )

         # TODO: Fix this type as above.
         return cast(
             StreamAdapter[Stream, Sink],
-            RustArroyoAdapter.build(config, metrics_config, py_metric_config),
+            RustArroyoAdapter.build(config, metrics_config),
         )
     else:
         mod, cls = adapter_type.rsplit(".", 1)
diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json
index f1e3be9f..7f2d5b1c 100644
--- a/sentry_streams/sentry_streams/config.json
+++ b/sentry_streams/sentry_streams/config.json
@@ -82,6 +82,10 @@
       "udp_queue_size": {
         "type": "integer",
         "minimum": 0
+      },
+      "flush_interval_ms": {
+        "type": "integer",
+        "minimum": 1
       }
     },
     "required": [
diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py
index 83fca295..93ebca1f 100644
--- a/sentry_streams/sentry_streams/metrics/metrics.py
+++ b/sentry_streams/sentry_streams/metrics/metrics.py
@@ -37,6 +37,8 @@ class DatadogMetricsConfig(TypedDict):
     port: int
     tags: Tags
     udp_queue_size: NotRequired[int]
+    # Rust consumer DogStatsD flush interval (Python BufferedMetricsBackend uses METRICS_FREQUENCY_SEC).
+    flush_interval_ms: NotRequired[int]


 class LogMetricsConfig(TypedDict):
diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py
index 85362ef4..de2569c7 100644
--- a/sentry_streams/sentry_streams/runner.py
+++ b/sentry_streams/sentry_streams/runner.py
@@ -116,15 +116,10 @@ def load_runtime(
         }
         if metric_config_raw.get("udp_queue_size") is not None:
             base_dd["udp_queue_size"] = metric_config_raw["udp_queue_size"]
+        if metric_config_raw.get("flush_interval_ms") is not None:
+            base_dd["flush_interval_ms"] = int(metric_config_raw["flush_interval_ms"])
         streams_config = cast(MetricsConfig, base_dd)
         configure_metrics(streams_config)
-        metric_config = {
-            "host": metric_config_raw["host"],
-            "port": metric_config_raw["port"],
-            "tags": default_tags,
-            "flush_interval_ms": metric_config_raw.get("flush_interval_ms"),
-            "udp_queue_size": metric_config_raw.get("udp_queue_size"),
-        }
     elif metric_config_raw.get("type") == "log":
         default_tags = dict(metric_config_raw.get("tags", {}))
         default_tags["pipeline"] = name
@@ -135,19 +130,16 @@ def load_runtime(
             "tags": default_tags,
         }
         configure_metrics(streams_config)
-        metric_config = {}
     else:
         streams_config = {"type": "dummy"}
         configure_metrics(streams_config)
-        metric_config = {}

     assigned_segment_id = int(segment_id) if segment_id else None
     runtime: Any = load_adapter(
         adapter,
         environment_config,
+        streams_config,
         assigned_segment_id,
-        metric_config,
-        metrics_config=streams_config,
     )

     translator = RuntimeTranslator(runtime)
diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py
index cdc4fb0e..f014b162 100644
--- a/sentry_streams/tests/pipeline/test_metrics.py
+++ b/sentry_streams/tests/pipeline/test_metrics.py
@@ -1,4 +1,3 @@
-import pickle
 from collections.abc import Generator
 from typing import Any, cast
 from unittest.mock import MagicMock, Mock, patch
@@ -15,7 +14,6 @@
     DatadogMetricsConfig,
     DummyMetricsBackend,
     LogMetricsBackend,
-    LogMetricsConfig,
     Metric,
     Metrics,
     MetricsBackend,
@@ -234,23 +232,6 @@ def test_buffered_wraps_datadog_with_constructor_tags(mock_time: Any, mock_dogst
     assert set(called_tags) == {"service:streams", "env:production"}


-@patch("sentry_streams.metrics.metrics.DogStatsd")
-@patch("time.time")
-def test_buffered_flush_preserves_constructor_tags_when_buffer_had_only_empty_tags(
-    mock_time: Any,
-    mock_dogstatsd: Any,
-) -> None:
-    mock_time.return_value = 0.0
-    inner = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"})
-    mock_client = mock_dogstatsd.return_value
-    backend = BufferedMetricsBackend(inner, throttle_interval_sec=METRICS_FREQUENCY_SEC)
-
-    backend.increment(_metric(Metric.INPUT_MESSAGES), 1)
-    backend.flush()
-
-    mock_client.increment.assert_called_once_with("input.messages", 1, tags=["service:streams"])
-
-
 def test_arroyo_delegates_increment_gauge_timing_with_tags() -> None:
     inner = Mock(spec=DummyMetricsBackend)
     backend = ArroyoMetricsBackend(inner)
@@ -456,25 +437,6 @@ def test_configure_metrics_log_uses_config_throttle_interval(
     )


-@patch("sentry_streams.metrics.metrics.DogStatsd")
-def test_stream_metrics_config_roundtrips_through_pickle(mock_dogstatsd: Any) -> None:
-    cfg: dict[str, Any] = {
-        "type": "datadog",
-        "host": "h",
-        "port": 8125,
-        "tags": {"service": "streams"},
-    }
-    roundtripped = pickle.loads(pickle.dumps(cfg))
-    assert roundtripped == cfg
-    assert isinstance(build_metrics_backend(roundtripped), DatadogMetricsBackend)
-
-
-def test_build_metrics_backend_log_with_empty_tags() -> None:
-    cfg: LogMetricsConfig = {"type": "log", "period_sec": 1.0, "tags": {}}
-    backend = build_metrics_backend(cast(MetricsConfig, cfg))
-    assert isinstance(backend, LogMetricsBackend)
-
-
 def test_build_metrics_backend_datadog_requires_host_and_port() -> None:
     with pytest.raises(ValueError, match="host and port"):
         build_metrics_backend(
diff --git a/sentry_streams/tests/test_load_runtime.py b/sentry_streams/tests/test_load_runtime.py
index 50c95b22..36724897 100644
--- a/sentry_streams/tests/test_load_runtime.py
+++ b/sentry_streams/tests/test_load_runtime.py
@@ -55,14 +55,10 @@ def reset_metrics_backend() -> Generator[None, None, None]:
     # Reset before test runs (setup)
     sentry_streams.metrics.metrics._metrics_backend = None
-    sentry_streams.metrics.metrics._inner_metrics_backend = None
-    sentry_streams.metrics.metrics._streams_metrics_config = None
     arroyo.utils.metrics._metrics_backend = None

     yield

     # Reset after test completes (teardown)
     sentry_streams.metrics.metrics._metrics_backend = None
-    sentry_streams.metrics.metrics._inner_metrics_backend = None
-    sentry_streams.metrics.metrics._streams_metrics_config = None
     arroyo.utils.metrics._metrics_backend = None

diff --git a/sentry_streams/tests/test_runner.py b/sentry_streams/tests/test_runner.py
index cc49603c..0fcae5f5 100644
--- a/sentry_streams/tests/test_runner.py
+++ b/sentry_streams/tests/test_runner.py
@@ -66,8 +66,8 @@ def test_iterate_edges(create_pipeline: Pipeline[bytes]) -> None:
         load_adapter(
             "dummy",
             dummy_config,
+            {"type": "dummy"},
             None,
-            metrics_config={"type": "dummy"},
         ),
     )
     translator: RuntimeTranslator[Any, Any] = RuntimeTranslator(runtime)

From 63b66caef80af9f37482fbc26a219268e39aee67 Mon Sep 17 00:00:00 2001
From: Filippo Pacifici
Date: Mon, 30 Mar 2026 21:43:34 +0200
Subject: [PATCH 9/9] Address PR review comments: build PyMetricConfig lazily
 in source() and simplify buffer keying

---
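[ Sketch of the buffer keying this patch leaves in place: tags is always a
  Tags mapping after PATCH 7/9, so the name/tag pair can be hashed uniformly.
  The metric name and tag values below are illustrative only. ]

    normalized_tags = ["env:prod"]  # e.g. __normalize_tags({"env": "prod"})
    # Same name with different tag sets buffers under distinct keys.
    key = hash(("input.messages", frozenset(normalized_tags)))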
 .../sentry_streams/adapters/arroyo/rust_arroyo.py | 3 +--
 sentry_streams/sentry_streams/metrics/metrics.py  | 7 ++-----
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
index 44d532b9..f93c2b93 100644
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -239,7 +239,6 @@ def __init__(
         super().__init__()
         self.steps_config = steps_config
         self.__metrics_config = metrics_config
-        self.__metric_config = build_py_metrics_config(metrics_config)
         self.__write_healthcheck = write_healthcheck
         self.__consumers: MutableMapping[str, ArroyoConsumer] = {}
         self.__chains = TransformChains()
@@ -299,7 +298,7 @@ def source(self, step: Source[Any]) -> Route:
             ),
             topic=step.stream_name,
             schema=schema_name,
-            metric_config=self.__metric_config,
+            metric_config=build_py_metrics_config(self.__metrics_config),
             write_healthcheck=self.__write_healthcheck,
         )
         return Route(source_name, [])
diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py
index 93ebca1f..bc2a1f11 100644
--- a/sentry_streams/sentry_streams/metrics/metrics.py
+++ b/sentry_streams/sentry_streams/metrics/metrics.py
@@ -307,11 +307,8 @@ def __add_to_buffer(
         tags: Tags,
         replace: bool = False,
     ) -> None:
-        if tags is None:
-            key = hash(name)
-        else:
-            normalized_tags = self.__normalize_tags(tags)
-            key = hash((name, frozenset(normalized_tags)))
+        normalized_tags = self.__normalize_tags(tags)
+        key = hash((name, frozenset(normalized_tags)))
         if key in buffer:
             new_value = buffer[key][1] + value if not replace else value