
ref(metrics): Refactor metrics backend to make Arroyo metrics work #285

Merged
fpacifici merged 9 commits into main from fpacifici/add_missing_metrics
Mar 30, 2026

Conversation

@fpacifici
Collaborator

@fpacifici fpacifici commented Mar 30, 2026

The need for this came from the realization that Python Arroyo metrics were not being correctly recorded from the streaming applications: none of the general tags were applied.
It turned out that we had created the Streams metrics backend correctly, but we re-initialized the Datadog backend when setting up Arroyo metrics, losing the tags at that point.
Moreover, we did not initialize metrics at all in the subprocesses.

While trying to fix the issue I found that the structure of our metrics backend was not sound: DatadogBackend was inherently a BufferedMetricsBackend, which is not needed when passing it to Arroyo (Arroyo buffers on its own).
So I made some structural changes.

classDiagram-v2
    MetricsBackend
    DatadogMetricsBackend
    LogMetricsBackend
    DummyMetricsBackend

    MetricsBackend <|-- DatadogMetricsBackend
    MetricsBackend <|-- LogMetricsBackend
    MetricsBackend <|-- DummyMetricsBackend
    MetricsBackend <|-- BufferedMetricsBackend
    
    BufferedMetricsBackend --> MetricsBackend

    Metrics --> MetricsBackend
    ArroyoMetrics --> MetricsBackend

Metrics and ArroyoMetrics are the entrypoints for producing metrics.
MetricsBackend is the general interface for all the backends.
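The diagram above can be rendered as a minimal, hypothetical sketch in code: only increment is shown (the real backends also expose gauge and timing), and the names mirror the diagram rather than the actual sentry_streams implementation.

```python
from abc import ABC, abstractmethod
from typing import Optional

Tags = dict[str, str]


class MetricsBackend(ABC):
    """General interface implemented by every backend."""

    @abstractmethod
    def increment(self, name: str, value: float = 1, tags: Optional[Tags] = None) -> None: ...


class DummyMetricsBackend(MetricsBackend):
    """No-op backend used when metrics are not configured."""

    def increment(self, name: str, value: float = 1, tags: Optional[Tags] = None) -> None:
        pass


class BufferedMetricsBackend(MetricsBackend):
    """Both implements MetricsBackend and wraps one (the two arrows in the
    diagram), so callers can be handed either the buffered or the raw backend."""

    def __init__(self, inner: MetricsBackend) -> None:
        self.inner = inner
        self.buffer: dict[str, float] = {}

    def increment(self, name: str, value: float = 1, tags: Optional[Tags] = None) -> None:
        # Accumulate locally instead of emitting immediately.
        self.buffer[name] = self.buffer.get(name, 0.0) + value

    def flush(self) -> None:
        # Emit the accumulated values through the wrapped backend.
        for name, value in self.buffer.items():
            self.inner.increment(name, value)
        self.buffer.clear()


class Metrics:
    """Application-facing entrypoint delegating to a MetricsBackend."""

    def __init__(self, backend: MetricsBackend) -> None:
        self.backend = backend

    def increment(self, name: str, value: float = 1, tags: Optional[Tags] = None) -> None:
        self.backend.increment(name, value, tags)
```

The key point of the refactor is visible in the sketch: BufferedMetricsBackend is a decorator over the interface rather than a property baked into DatadogMetricsBackend, so the raw backend can be passed to Arroyo unbuffered.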

Made with Cursor
(no, this time cursor just wrote some tests, the implementation was a huge amount of hallucinations so in the end I had to do the coding).

fpacifici and others added 3 commits March 27, 2026 18:34
Expand docstrings for MetricsBackend, Datadog, Buffered, and Arroyo adapters.
Move the enum-backed Metrics wrapper after helper types so protocol and
backends are documented before the application-level facade.

Co-Authored-By: Cursor <cursoragent@cursor.com>
Made-with: Cursor
@github-actions

github-actions Bot commented Mar 30, 2026

Semver Impact of This PR

🟢 Patch (bug fixes)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


Internal Changes 🔧

  • (metrics) Refactor metrics backend to make Arroyo metrics work by fpacifici in #285

🤖 This preview updates automatically when you update the PR.

@fpacifici fpacifici changed the title ref(metrics): Refactor backends and complete runtime wiring ref(metrics): Refactor metrics backend to make Arroyo metrics work Mar 30, 2026
Add return type and cast so __getattribute__ does not violate no-any-return under --strict.

Refs GH-285

Co-Authored-By: Cursor <cursoragent@cursor.com>
Made-with: Cursor
@fpacifici fpacifici marked this pull request as ready for review March 30, 2026 09:48
@fpacifici fpacifici requested a review from a team as a code owner March 30, 2026 09:48

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for all 3 issues found in the latest run.

  • ✅ Fixed: Constructor tags silently dropped when per-call tags absent
    • Fixed by calling _combine_tags first and checking the combined result for truthiness instead of checking the input tags parameter, ensuring constructor tags are preserved even when per-call tags is an empty dict.
  • ✅ Fixed: User-configured period_sec parameter is silently ignored
    • Fixed by removing the unused period_sec parameter from LogMetricsBackend constructor and adding throttle_interval_sec parameter to configure_metrics, which is now properly passed from runner.py.
  • ✅ Fixed: Unpicklable Datadog backend passed to spawn-based multiprocessing initializer
    • Fixed by creating a serializable MetricsConfig dataclass that is passed to the multiprocessing initializer instead of the unpicklable backend instance, with the backend being reconstructed in each subprocess.
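The third fix rests on a general Python constraint: an object holding a live socket cannot cross a spawn-based process boundary, while a plain dataclass of primitives pickles cleanly and can be used to rebuild the backend in each worker. A minimal sketch of that idea, with illustrative names rather than the actual sentry_streams classes:

```python
import pickle
import socket
from dataclasses import dataclass
from typing import Optional


class DatadogLikeBackend:
    """Holds a live UDP socket, so instances cannot be pickled."""

    def __init__(self, host: str, port: int) -> None:
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addr = (host, port)


@dataclass
class MetricsConfig:
    """Plain data: safe to send to a multiprocessing initializer."""

    backend_type: str
    host: Optional[str] = None
    port: Optional[int] = None


def build_backend(config: MetricsConfig) -> DatadogLikeBackend:
    # Called inside each worker process to reconstruct the backend
    # instead of shipping the live instance across the boundary.
    return DatadogLikeBackend(config.host or "localhost", config.port or 8125)


config = MetricsConfig("datadog", "localhost", 8125)
payload = pickle.dumps(config)  # fine: plain data round-trips

unpicklable = False
try:
    pickle.dumps(DatadogLikeBackend("localhost", 8125))
except TypeError:
    unpicklable = True  # sockets cannot be pickled
```

This mirrors the shape of the autofix: serialize a config, not a backend, and rebuild per process.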

Preview (8ab1f30160)
diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -41,7 +41,11 @@
     get_metrics,
     get_size,
 )
-from sentry_streams.metrics.metrics import get_inner_metrics
+from sentry_streams.metrics.metrics import (
+    MetricsConfig,
+    build_metrics_backend_from_config,
+    get_metrics_config,
+)
 from sentry_streams.pipeline.function_template import (
     InputType,
     OutputType,
@@ -79,9 +83,10 @@
 logger = logging.getLogger(__name__)
 
 
-def initializer(metrics: MetricsBackend) -> None:
-    # Reinitialize the metrics backend for each process
-    configure_metrics(metrics)
+def initializer(config: MetricsConfig) -> None:
+    # Reinitialize the metrics backend for each process from serializable config
+    metrics = build_metrics_backend_from_config(config)
+    configure_metrics(metrics, config=config, throttle_interval_sec=config.throttle_interval_sec)
 
 
 def _metrics_wrapped_function(
@@ -194,6 +199,7 @@
             f"batch_time={config['batch_time']}"
         )
 
+        metrics_config = get_metrics_config()
         return RuntimeOperator.PythonAdapter(
             rust_route,
             MultiprocessDelegateFactory(
@@ -202,7 +208,7 @@
                 config["batch_time"],
                 MultiprocessingPool(
                     num_processes=config["processes"],
-                    initializer=functools.partial(initializer, get_inner_metrics()),
+                    initializer=functools.partial(initializer, metrics_config),
                 ),
                 input_block_size=config.get("input_block_size"),
                 output_block_size=config.get("output_block_size"),

diff --git a/sentry_streams/sentry_streams/metrics/__init__.py b/sentry_streams/sentry_streams/metrics/__init__.py
--- a/sentry_streams/sentry_streams/metrics/__init__.py
+++ b/sentry_streams/sentry_streams/metrics/__init__.py
@@ -6,6 +6,7 @@
     Metric,
     Metrics,
     MetricsBackend,
+    MetricsConfig,
     configure_metrics,
     get_metrics,
     get_size,
@@ -21,5 +22,6 @@
     "Metric",
     "Metrics",
     "MetricsBackend",
+    "MetricsConfig",
     "get_size",
 ]

diff --git a/sentry_streams/sentry_streams/metrics/metrics.py b/sentry_streams/sentry_streams/metrics/metrics.py
--- a/sentry_streams/sentry_streams/metrics/metrics.py
+++ b/sentry_streams/sentry_streams/metrics/metrics.py
@@ -3,6 +3,7 @@
 import logging
 import time
 from abc import abstractmethod
+from dataclasses import dataclass
 from enum import Enum
 from typing import (
     Any,
@@ -159,24 +160,27 @@
     def increment(
         self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None
     ) -> None:
+        combined_tags = _combine_tags(self.__tags, tags)
         self.datadog_client.increment(
             name,
             value,
-            tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
+            tags=self.__normalize_tags(combined_tags) if combined_tags else None,
         )
 
     def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        combined_tags = _combine_tags(self.__tags, tags)
         self.datadog_client.gauge(
             name,
             value,
-            tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
+            tags=self.__normalize_tags(combined_tags) if combined_tags else None,
         )
 
     def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
+        combined_tags = _combine_tags(self.__tags, tags)
         self.datadog_client.timing(
             name,
             value,
-            tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
+            tags=self.__normalize_tags(combined_tags) if combined_tags else None,
         )
 
 
@@ -186,10 +190,9 @@
     as LogFlusher: ``prefix | counter|gauge|timing name=value tag1:val1 ...``.
     """
 
-    def __init__(self, period_sec: float, tags: Optional[Tags] = None) -> None:
+    def __init__(self, tags: Optional[Tags] = None) -> None:
         self.__prefix = METRICS_PREFIX.strip(".")
         self.__base_tags: Tags = tags if tags is not None else {}
-        self._period_sec = period_sec
 
     @staticmethod
     def __normalize_tags(tags: Tags) -> list[str]:
@@ -384,12 +387,52 @@
         self.__backend.timing(name, value, tags=_tags_from_mapping(tags))
 
 
+@dataclass
+class MetricsConfig:
+    """
+    Serializable configuration for metrics backends.
+    Used to pass metrics configuration to multiprocessing workers.
+    """
+
+    backend_type: str
+    host: Optional[str] = None
+    port: Optional[int] = None
+    tags: Optional[Tags] = None
+    udp_queue_size: Optional[int] = None
+    throttle_interval_sec: Optional[float] = None
+
+
+def build_metrics_backend_from_config(config: MetricsConfig) -> MetricsBackend:
+    """
+    Build a metrics backend from a serializable configuration.
+    This is used in multiprocessing workers to recreate the backend.
+    """
+    if config.backend_type == "datadog":
+        assert config.host is not None and config.port is not None
+        return DatadogMetricsBackend(
+            host=config.host,
+            port=config.port,
+            tags=config.tags,
+            udp_queue_size=config.udp_queue_size,
+        )
+    elif config.backend_type == "log":
+        return LogMetricsBackend(tags=config.tags)
+    else:
+        return DummyMetricsBackend()
+
+
 _inner_metrics_backend: Optional[MetricsBackend] = None
 _metrics_backend: Optional[MetricsBackend] = None
+_metrics_config: Optional[MetricsConfig] = None
 _dummy_metrics_backend = DummyMetricsBackend()
 
 
-def configure_metrics(metrics: MetricsBackend, force: bool = False) -> None:
+def configure_metrics(
+    metrics: MetricsBackend,
+    force: bool = False,
+    throttle_interval_sec: Optional[float] = None,
+    config: Optional[MetricsConfig] = None,
+) -> None:
     """
     Metrics can generally only be configured once, unless force is passed
     on subsequent initializations.
@@ -398,6 +441,7 @@
     """
     global _metrics_backend
     global _inner_metrics_backend
+    global _metrics_config
     if not force:
         assert _metrics_backend is None, "Metrics is already set"
 
@@ -405,7 +449,9 @@
     assert isinstance(metrics, MetricsBackend)
 
     _inner_metrics_backend = metrics
-    _metrics_backend = BufferedMetricsBackend(metrics, throttle_interval_sec=METRICS_FREQUENCY_SEC)
+    interval = throttle_interval_sec if throttle_interval_sec is not None else METRICS_FREQUENCY_SEC
+    _metrics_backend = BufferedMetricsBackend(metrics, throttle_interval_sec=interval)
+    _metrics_config = config
     arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend))
 
 
@@ -415,6 +461,14 @@
     return _inner_metrics_backend
 
 
+def get_metrics_config() -> Optional[MetricsConfig]:
+    """
+    Get the serializable metrics configuration.
+    This is used to pass metrics configuration to multiprocessing workers.
+    """
+    return _metrics_config
+
+
 def get_metrics() -> Metrics:
     if _metrics_backend is None:
         return Metrics(_dummy_metrics_backend)

diff --git a/sentry_streams/sentry_streams/runner.py b/sentry_streams/sentry_streams/runner.py
--- a/sentry_streams/sentry_streams/runner.py
+++ b/sentry_streams/sentry_streams/runner.py
@@ -17,6 +17,7 @@
     DummyMetricsBackend,
     LogMetricsBackend,
     MetricsBackend,
+    MetricsConfig,
     configure_metrics,
 )
 from sentry_streams.pipeline.config import load_config
@@ -106,6 +107,7 @@
 
     metric_config = environment_config.get("metrics", {})
     metrics_backend: MetricsBackend
+    serializable_config: MetricsConfig
     if metric_config.get("type") == "datadog":
         default_tags = metric_config.get("tags", {})
         default_tags["pipeline"] = name
@@ -116,7 +118,14 @@
             tags=default_tags,
             udp_queue_size=metric_config.get("udp_queue_size"),
         )
-        configure_metrics(metrics_backend)
+        serializable_config = MetricsConfig(
+            backend_type="datadog",
+            host=metric_config["host"],
+            port=metric_config["port"],
+            tags=default_tags,
+            udp_queue_size=metric_config.get("udp_queue_size"),
+        )
+        configure_metrics(metrics_backend, config=serializable_config)
         metric_config = {
             "host": metric_config["host"],
             "port": metric_config["port"],
@@ -129,14 +138,23 @@
         default_tags["pipeline"] = name
 
         metrics_backend = LogMetricsBackend(
-            period_sec=metric_config["period_sec"],
             tags=default_tags,
         )
-        configure_metrics(metrics_backend)
+        serializable_config = MetricsConfig(
+            backend_type="log",
+            tags=default_tags,
+            throttle_interval_sec=metric_config["period_sec"],
+        )
+        configure_metrics(
+            metrics_backend,
+            throttle_interval_sec=metric_config["period_sec"],
+            config=serializable_config,
+        )
         metric_config = {}
     else:
         metrics_backend = DummyMetricsBackend()
-        configure_metrics(metrics_backend)
+        serializable_config = MetricsConfig(backend_type="dummy")
+        configure_metrics(metrics_backend, config=serializable_config)
         metric_config = {}
 
     assigned_segment_id = int(segment_id) if segment_id else None

diff --git a/sentry_streams/tests/pipeline/test_metrics.py b/sentry_streams/tests/pipeline/test_metrics.py
--- a/sentry_streams/tests/pipeline/test_metrics.py
+++ b/sentry_streams/tests/pipeline/test_metrics.py
@@ -139,6 +139,27 @@
 
 
 @patch("sentry_streams.metrics.metrics.DogStatsd")
+def test_datadog_constructor_tags_preserved_with_empty_dict(mock_dogstatsd: Any) -> None:
+    backend = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"})
+    mock_client = mock_dogstatsd.return_value
+
+    backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={})
+    backend.gauge(_metric(Metric.INPUT_BYTES), 100, tags={})
+    backend.timing(_metric(Metric.DURATION), 1500, tags={})
+
+    expected = ["service:streams"]
+    mock_client.increment.assert_called_once()
+    assert mock_client.increment.call_args[0] == ("input.messages", 1)
+    assert mock_client.increment.call_args[1]["tags"] == expected
+    mock_client.gauge.assert_called_once()
+    assert mock_client.gauge.call_args[0] == ("input.bytes", 100)
+    assert mock_client.gauge.call_args[1]["tags"] == expected
+    mock_client.timing.assert_called_once()
+    assert mock_client.timing.call_args[0] == ("duration", 1500)
+    assert mock_client.timing.call_args[1]["tags"] == expected
+
+
+@patch("sentry_streams.metrics.metrics.DogStatsd")
 @patch("time.time")
 def test_buffered_increment_with_throttling(mock_time: Any, mock_dogstatsd: Any) -> None:
     mock_time.side_effect = [METRICS_FREQUENCY_SEC + 1, METRICS_FREQUENCY_SEC + 2]
@@ -248,7 +269,7 @@
 
 @patch("sentry_streams.metrics.metrics.logger")
 def test_log_increment_logs_immediately(mock_logger: Any) -> None:
-    backend = LogMetricsBackend(period_sec=15.0, tags={"env": "test"})
+    backend = LogMetricsBackend(tags={"env": "test"})
     mock_info = mock_logger.info
 
     backend.increment(_metric(Metric.INPUT_MESSAGES), 1)
@@ -261,7 +282,7 @@
 
 @patch("sentry_streams.metrics.metrics.logger")
 def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None:
-    backend = LogMetricsBackend(period_sec=60.0)
+    backend = LogMetricsBackend()
     mock_info = mock_logger.info
 
     backend.increment(_metric(Metric.INPUT_MESSAGES), 1)
@@ -274,7 +295,7 @@
 @patch("time.time")
 def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) -> None:
     mock_time.return_value = 0.0
-    inner = LogMetricsBackend(period_sec=60.0)
+    inner = LogMetricsBackend()
     backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0)
     mock_info = mock_logger.info
 
@@ -297,7 +318,7 @@
 @patch("time.time")
 def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) -> None:
     mock_time.return_value = 0.0
-    inner = LogMetricsBackend(period_sec=60.0)
+    inner = LogMetricsBackend()
     backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0)
     mock_info = mock_logger.info
 
@@ -321,7 +342,7 @@
 @patch("time.time")
 def test_buffered_log_throttled_flush(mock_time: Any, mock_logger: Any) -> None:
     mock_time.return_value = 0.0
-    inner = LogMetricsBackend(period_sec=60.0)
+    inner = LogMetricsBackend()
     backend = BufferedMetricsBackend(inner, throttle_interval_sec=10.0)
     mock_info = mock_logger.info
 
@@ -337,7 +358,7 @@
 @patch("time.time")
 def test_buffered_log_global_tags_from_inner(mock_time: Any, mock_logger: Any) -> None:
     mock_time.return_value = 0.0
-    inner = LogMetricsBackend(period_sec=60.0, tags={"service": "streams"})
+    inner = LogMetricsBackend(tags={"service": "streams"})
     backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0)
     mock_info = mock_logger.info
 

diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock
--- a/sentry_streams/uv.lock
+++ b/sentry_streams/uv.lock
@@ -893,7 +893,7 @@
 
 [[package]]
 name = "sentry-streams"
-version = "0.0.40"
+version = "0.0.41"
 source = { editable = "." }
 dependencies = [
     { name = "click" },


fpacifici and others added 2 commits March 30, 2026 12:09
BufferedMetricsBackend flushes with tags or {}, which is falsy; the previous if tags guard skipped _combine_tags and dropped constructor tags.

Always combine base tags with per-call tags (None or dict) and pass normalized tags, or None when there are no tags at all.

Refs GH-285

Co-Authored-By: Cursor <cursoragent@cursor.com>
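The guard bug this commit describes can be reproduced in miniature: an empty per-call dict is falsy, so a guard on the input skips tag combination entirely, while a guard on the combined result preserves the constructor tags. Names here are illustrative, not the actual sentry_streams code.

```python
base_tags = {"service": "streams"}


def combine(base, per_call):
    # Merge constructor tags with per-call tags (per_call may be None or {}).
    merged = dict(base)
    merged.update(per_call or {})
    return merged


def buggy_tags(per_call):
    # Old guard: when per_call is {} (falsy), combining is skipped
    # and the constructor tags are silently dropped.
    return combine(base_tags, per_call) if per_call else None


def fixed_tags(per_call):
    # New guard: combine first, then check the combined result.
    combined = combine(base_tags, per_call)
    return combined if combined else None
```

With an empty dict, the buggy version returns None while the fixed version still yields the constructor tags.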
Runner builds StreamMetricsConfig from deployment YAML and calls configure_metrics with that object. Inner backends are created via build_metrics_backend so worker processes can call configure_metrics again with the same config under spawn multiprocessing.

RustArroyoAdapter stores the config and passes it to MultiprocessingPool initializers instead of get_inner_metrics(). Log metrics honor metrics.period_sec as BufferedMetricsBackend throttle_interval_sec; LogMetricsBackend no longer takes an unused period_sec.

Exports StreamMetricsConfig and build_metrics_backend from sentry_streams.metrics.

Refs GH-285

Co-Authored-By: Cursor <cursoragent@cursor.com>
inner,
throttle_interval_sec=config.throttle_interval_sec,
)
arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend))

Arroyo metrics needlessly wrapped in BufferedMetricsBackend

Medium Severity

configure_metrics passes _metrics_backend (a BufferedMetricsBackend) to ArroyoMetricsBackend, but the inner (unbuffered) backend was created specifically for this purpose — it's stored in _inner_metrics_backend and exposed via the unused get_inner_metrics(). The old code passed the DogStatsd client directly to Arroyo, bypassing buffering. Now Arroyo timing metrics are accumulated (summed) inside the buffer before flush, corrupting their semantics. The line likely needs ArroyoMetricsBackend(inner) instead of ArroyoMetricsBackend(_metrics_backend).
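The semantic corruption described here is easy to demonstrate: a buffer that accumulates by summing turns several timing observations into one inflated value at flush. A small sketch, assuming sum-based accumulation as the comment suggests (illustrative classes, not the real implementation):

```python
class DirectBackend:
    """Records each timing observation individually, like an unbuffered backend."""

    def __init__(self) -> None:
        self.timings: list[float] = []

    def timing(self, name: str, value: float) -> None:
        self.timings.append(value)


class SummingBuffer:
    """Accumulates values by key before flushing, as the review describes."""

    def __init__(self, inner: DirectBackend) -> None:
        self.inner = inner
        self.buffer: dict[str, float] = {}

    def timing(self, name: str, value: float) -> None:
        self.buffer[name] = self.buffer.get(name, 0.0) + value

    def flush(self) -> None:
        for name, value in self.buffer.items():
            self.inner.timing(name, value)
        self.buffer.clear()


direct, buffered_inner = DirectBackend(), DirectBackend()
buffered = SummingBuffer(buffered_inner)
for v in (10.0, 20.0, 30.0):
    direct.timing("duration", v)
    buffered.timing("duration", v)
buffered.flush()
```

The direct backend sees three observations; the buffered one sees a single summed value, which is meaningless as a timing distribution.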


fpacifici and others added 2 commits March 30, 2026 16:22
Remove the duplicate metric_config dict from the runner. load_adapter now takes metrics_config before optional segment_id; RustArroyoAdapter builds PyMetricConfig in build_py_metrics_config from DatadogMetricsConfig.

Add optional flush_interval_ms to DatadogMetricsConfig and config schema. Tighten log metrics tests and reset fixtures for removed module globals.

Co-Authored-By: Cursor <cursoragent@cursor.com>
Made-with: Cursor
Member

@evanh evanh left a comment


This looks clean to me, thanks for this!


@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).


Member

@bmckerry bmckerry left a comment


Tiny nit + bugbot comment

Comment on lines +241 to +242
self.__metrics_config = metrics_config
self.__metric_config = build_py_metrics_config(metrics_config)
Member

@bmckerry bmckerry Mar 30, 2026


Could we rename self.__metric_config to distinguish it from self.__metrics_config (maybe self.__py_metrics_config if I'm understanding this right)?

Collaborator Author


I thought I had removed this

@fpacifici fpacifici force-pushed the fpacifici/add_missing_metrics branch from 7dc48a1 to 63b66ca Compare March 30, 2026 19:45
@fpacifici fpacifici merged commit 9a9448f into main Mar 30, 2026
23 checks passed
@fpacifici fpacifici deleted the fpacifici/add_missing_metrics branch March 30, 2026 19:51