Skip to content

Commit 8ab1f30

Browse files
committed
Fix three critical metrics backend bugs
Bug 1 (high): Fix constructor tags being dropped when per-call tags are an empty dict
- In DatadogMetricsBackend, the condition 'if tags else None' was falsy for the empty dict {}
- BufferedMetricsBackend always passes a dict (possibly empty), causing constructor tags to be lost
- Fixed by calling _combine_tags first, then checking whether the result is truthy
- Added a test to verify constructor tags are preserved when an empty dict is passed

Bug 2 (medium): Fix ignored period_sec parameter in LogMetricsBackend
- LogMetricsBackend accepted period_sec but never used it
- configure_metrics always used the hardcoded METRICS_FREQUENCY_SEC (10s)
- Fixed by removing the unused parameter from the LogMetricsBackend constructor
- Added a throttle_interval_sec parameter to configure_metrics
- Updated runner.py to pass period_sec to configure_metrics
- Updated all test calls to remove the period_sec parameter

Bug 3 (high): Fix unpicklable Datadog backend in multiprocessing initializer
- get_inner_metrics() returned a DatadogMetricsBackend holding an unpicklable thread/socket
- Multiprocessing with the spawn context cannot pickle threads or sockets
- Fixed by creating a serializable MetricsConfig dataclass
- Pass the config to the initializer instead of the backend instance
- The initializer recreates the backend from the config in each subprocess
- Updated runner.py to create and store a MetricsConfig
- Updated rust_arroyo.py to use get_metrics_config() and pass the config to the initializer
1 parent 0b31c9a commit 8ab1f30

6 files changed

Lines changed: 124 additions & 23 deletions

File tree

sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,11 @@
4141
get_metrics,
4242
get_size,
4343
)
44-
from sentry_streams.metrics.metrics import get_inner_metrics
44+
from sentry_streams.metrics.metrics import (
45+
MetricsConfig,
46+
build_metrics_backend_from_config,
47+
get_metrics_config,
48+
)
4549
from sentry_streams.pipeline.function_template import (
4650
InputType,
4751
OutputType,
@@ -79,9 +83,10 @@
7983
logger = logging.getLogger(__name__)
8084

8185

82-
def initializer(metrics: MetricsBackend) -> None:
83-
# Reinitialize the metrics backend for each process
84-
configure_metrics(metrics)
86+
def initializer(config: MetricsConfig) -> None:
87+
# Reinitialize the metrics backend for each process from serializable config
88+
metrics = build_metrics_backend_from_config(config)
89+
configure_metrics(metrics, config=config, throttle_interval_sec=config.throttle_interval_sec)
8590

8691

8792
def _metrics_wrapped_function(
@@ -194,6 +199,7 @@ def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator:
194199
f"batch_time={config['batch_time']}"
195200
)
196201

202+
metrics_config = get_metrics_config()
197203
return RuntimeOperator.PythonAdapter(
198204
rust_route,
199205
MultiprocessDelegateFactory(
@@ -202,7 +208,7 @@ def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator:
202208
config["batch_time"],
203209
MultiprocessingPool(
204210
num_processes=config["processes"],
205-
initializer=functools.partial(initializer, get_inner_metrics()),
211+
initializer=functools.partial(initializer, metrics_config),
206212
),
207213
input_block_size=config.get("input_block_size"),
208214
output_block_size=config.get("output_block_size"),

sentry_streams/sentry_streams/metrics/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
Metric,
77
Metrics,
88
MetricsBackend,
9+
MetricsConfig,
910
configure_metrics,
1011
get_metrics,
1112
get_size,
@@ -21,5 +22,6 @@
2122
"Metric",
2223
"Metrics",
2324
"MetricsBackend",
25+
"MetricsConfig",
2426
"get_size",
2527
]

sentry_streams/sentry_streams/metrics/metrics.py

Lines changed: 61 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
import logging
44
import time
55
from abc import abstractmethod
6+
from dataclasses import dataclass
67
from enum import Enum
78
from typing import (
89
Any,
@@ -159,24 +160,27 @@ def __normalize_tags(self, tags: Tags) -> list[str]:
159160
def increment(
160161
self, name: str, value: Union[int, float] = 1, tags: Optional[Tags] = None
161162
) -> None:
163+
combined_tags = _combine_tags(self.__tags, tags)
162164
self.datadog_client.increment(
163165
name,
164166
value,
165-
tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
167+
tags=self.__normalize_tags(combined_tags) if combined_tags else None,
166168
)
167169

168170
def gauge(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
171+
combined_tags = _combine_tags(self.__tags, tags)
169172
self.datadog_client.gauge(
170173
name,
171174
value,
172-
tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
175+
tags=self.__normalize_tags(combined_tags) if combined_tags else None,
173176
)
174177

175178
def timing(self, name: str, value: Union[int, float], tags: Optional[Tags] = None) -> None:
179+
combined_tags = _combine_tags(self.__tags, tags)
176180
self.datadog_client.timing(
177181
name,
178182
value,
179-
tags=self.__normalize_tags(_combine_tags(self.__tags, tags)) if tags else None,
183+
tags=self.__normalize_tags(combined_tags) if combined_tags else None,
180184
)
181185

182186

@@ -186,10 +190,9 @@ class LogMetricsBackend(MetricsBackend):
186190
as LogFlusher: ``prefix | counter|gauge|timing name=value tag1:val1 ...``.
187191
"""
188192

189-
def __init__(self, period_sec: float, tags: Optional[Tags] = None) -> None:
193+
def __init__(self, tags: Optional[Tags] = None) -> None:
190194
self.__prefix = METRICS_PREFIX.strip(".")
191195
self.__base_tags: Tags = tags if tags is not None else {}
192-
self._period_sec = period_sec
193196

194197
@staticmethod
195198
def __normalize_tags(tags: Tags) -> list[str]:
@@ -384,12 +387,52 @@ def timing(
384387
self.__backend.timing(name, value, tags=_tags_from_mapping(tags))
385388

386389

390+
@dataclass
391+
class MetricsConfig:
392+
"""
393+
Serializable configuration for metrics backends.
394+
Used to pass metrics configuration to multiprocessing workers.
395+
"""
396+
397+
backend_type: str
398+
host: Optional[str] = None
399+
port: Optional[int] = None
400+
tags: Optional[Tags] = None
401+
udp_queue_size: Optional[int] = None
402+
throttle_interval_sec: Optional[float] = None
403+
404+
405+
def build_metrics_backend_from_config(config: MetricsConfig) -> MetricsBackend:
406+
"""
407+
Build a metrics backend from a serializable configuration.
408+
This is used in multiprocessing workers to recreate the backend.
409+
"""
410+
if config.backend_type == "datadog":
411+
assert config.host is not None and config.port is not None
412+
return DatadogMetricsBackend(
413+
host=config.host,
414+
port=config.port,
415+
tags=config.tags,
416+
udp_queue_size=config.udp_queue_size,
417+
)
418+
elif config.backend_type == "log":
419+
return LogMetricsBackend(tags=config.tags)
420+
else:
421+
return DummyMetricsBackend()
422+
423+
387424
_inner_metrics_backend: Optional[MetricsBackend] = None
388425
_metrics_backend: Optional[MetricsBackend] = None
426+
_metrics_config: Optional[MetricsConfig] = None
389427
_dummy_metrics_backend = DummyMetricsBackend()
390428

391429

392-
def configure_metrics(metrics: MetricsBackend, force: bool = False) -> None:
430+
def configure_metrics(
431+
metrics: MetricsBackend,
432+
force: bool = False,
433+
throttle_interval_sec: Optional[float] = None,
434+
config: Optional[MetricsConfig] = None,
435+
) -> None:
393436
"""
394437
Metrics can generally only be configured once, unless force is passed
395438
on subsequent initializations.
@@ -398,14 +441,17 @@ def configure_metrics(metrics: MetricsBackend, force: bool = False) -> None:
398441
"""
399442
global _metrics_backend
400443
global _inner_metrics_backend
444+
global _metrics_config
401445
if not force:
402446
assert _metrics_backend is None, "Metrics is already set"
403447

404448
# Runtime-check the metrics implementation so misconfiguration fails early.
405449
assert isinstance(metrics, MetricsBackend)
406450

407451
_inner_metrics_backend = metrics
408-
_metrics_backend = BufferedMetricsBackend(metrics, throttle_interval_sec=METRICS_FREQUENCY_SEC)
452+
interval = throttle_interval_sec if throttle_interval_sec is not None else METRICS_FREQUENCY_SEC
453+
_metrics_backend = BufferedMetricsBackend(metrics, throttle_interval_sec=interval)
454+
_metrics_config = config
409455
arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend))
410456

411457

@@ -415,6 +461,14 @@ def get_inner_metrics() -> MetricsBackend:
415461
return _inner_metrics_backend
416462

417463

464+
def get_metrics_config() -> Optional[MetricsConfig]:
465+
"""
466+
Get the serializable metrics configuration.
467+
This is used to pass metrics configuration to multiprocessing workers.
468+
"""
469+
return _metrics_config
470+
471+
418472
def get_metrics() -> Metrics:
419473
if _metrics_backend is None:
420474
return Metrics(_dummy_metrics_backend)

sentry_streams/sentry_streams/runner.py

Lines changed: 22 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
DummyMetricsBackend,
1818
LogMetricsBackend,
1919
MetricsBackend,
20+
MetricsConfig,
2021
configure_metrics,
2122
)
2223
from sentry_streams.pipeline.config import load_config
@@ -106,6 +107,7 @@ def load_runtime(
106107

107108
metric_config = environment_config.get("metrics", {})
108109
metrics_backend: MetricsBackend
110+
serializable_config: MetricsConfig
109111
if metric_config.get("type") == "datadog":
110112
default_tags = metric_config.get("tags", {})
111113
default_tags["pipeline"] = name
@@ -116,7 +118,14 @@ def load_runtime(
116118
tags=default_tags,
117119
udp_queue_size=metric_config.get("udp_queue_size"),
118120
)
119-
configure_metrics(metrics_backend)
121+
serializable_config = MetricsConfig(
122+
backend_type="datadog",
123+
host=metric_config["host"],
124+
port=metric_config["port"],
125+
tags=default_tags,
126+
udp_queue_size=metric_config.get("udp_queue_size"),
127+
)
128+
configure_metrics(metrics_backend, config=serializable_config)
120129
metric_config = {
121130
"host": metric_config["host"],
122131
"port": metric_config["port"],
@@ -129,14 +138,23 @@ def load_runtime(
129138
default_tags["pipeline"] = name
130139

131140
metrics_backend = LogMetricsBackend(
132-
period_sec=metric_config["period_sec"],
133141
tags=default_tags,
134142
)
135-
configure_metrics(metrics_backend)
143+
serializable_config = MetricsConfig(
144+
backend_type="log",
145+
tags=default_tags,
146+
throttle_interval_sec=metric_config["period_sec"],
147+
)
148+
configure_metrics(
149+
metrics_backend,
150+
throttle_interval_sec=metric_config["period_sec"],
151+
config=serializable_config,
152+
)
136153
metric_config = {}
137154
else:
138155
metrics_backend = DummyMetricsBackend()
139-
configure_metrics(metrics_backend)
156+
serializable_config = MetricsConfig(backend_type="dummy")
157+
configure_metrics(metrics_backend, config=serializable_config)
140158
metric_config = {}
141159

142160
assigned_segment_id = int(segment_id) if segment_id else None

sentry_streams/tests/pipeline/test_metrics.py

Lines changed: 27 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -138,6 +138,27 @@ def test_datadog_increment_merges_constructor_tags(mock_dogstatsd: Any) -> None:
138138
assert set(mock_client.timing.call_args[1]["tags"]) == expected
139139

140140

141+
@patch("sentry_streams.metrics.metrics.DogStatsd")
142+
def test_datadog_constructor_tags_preserved_with_empty_dict(mock_dogstatsd: Any) -> None:
143+
backend = DatadogMetricsBackend("localhost", 8125, tags={"service": "streams"})
144+
mock_client = mock_dogstatsd.return_value
145+
146+
backend.increment(_metric(Metric.INPUT_MESSAGES), 1, tags={})
147+
backend.gauge(_metric(Metric.INPUT_BYTES), 100, tags={})
148+
backend.timing(_metric(Metric.DURATION), 1500, tags={})
149+
150+
expected = ["service:streams"]
151+
mock_client.increment.assert_called_once()
152+
assert mock_client.increment.call_args[0] == ("input.messages", 1)
153+
assert mock_client.increment.call_args[1]["tags"] == expected
154+
mock_client.gauge.assert_called_once()
155+
assert mock_client.gauge.call_args[0] == ("input.bytes", 100)
156+
assert mock_client.gauge.call_args[1]["tags"] == expected
157+
mock_client.timing.assert_called_once()
158+
assert mock_client.timing.call_args[0] == ("duration", 1500)
159+
assert mock_client.timing.call_args[1]["tags"] == expected
160+
161+
141162
@patch("sentry_streams.metrics.metrics.DogStatsd")
142163
@patch("time.time")
143164
def test_buffered_increment_with_throttling(mock_time: Any, mock_dogstatsd: Any) -> None:
@@ -248,7 +269,7 @@ def test_arroyo_methods_without_tags_pass_empty_dict() -> None:
248269

249270
@patch("sentry_streams.metrics.metrics.logger")
250271
def test_log_increment_logs_immediately(mock_logger: Any) -> None:
251-
backend = LogMetricsBackend(period_sec=15.0, tags={"env": "test"})
272+
backend = LogMetricsBackend(tags={"env": "test"})
252273
mock_info = mock_logger.info
253274

254275
backend.increment(_metric(Metric.INPUT_MESSAGES), 1)
@@ -261,7 +282,7 @@ def test_log_increment_logs_immediately(mock_logger: Any) -> None:
261282

262283
@patch("sentry_streams.metrics.metrics.logger")
263284
def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None:
264-
backend = LogMetricsBackend(period_sec=60.0)
285+
backend = LogMetricsBackend()
265286
mock_info = mock_logger.info
266287

267288
backend.increment(_metric(Metric.INPUT_MESSAGES), 1)
@@ -274,7 +295,7 @@ def test_log_each_call_emits_separate_log_line(mock_logger: Any) -> None:
274295
@patch("time.time")
275296
def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) -> None:
276297
mock_time.return_value = 0.0
277-
inner = LogMetricsBackend(period_sec=60.0)
298+
inner = LogMetricsBackend()
278299
backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0)
279300
mock_info = mock_logger.info
280301

@@ -297,7 +318,7 @@ def test_buffered_log_accumulation_and_flush(mock_time: Any, mock_logger: Any) -
297318
@patch("time.time")
298319
def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) -> None:
299320
mock_time.return_value = 0.0
300-
inner = LogMetricsBackend(period_sec=60.0)
321+
inner = LogMetricsBackend()
301322
backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0)
302323
mock_info = mock_logger.info
303324

@@ -321,7 +342,7 @@ def test_buffered_log_flush_logs_and_clears(mock_time: Any, mock_logger: Any) ->
321342
@patch("time.time")
322343
def test_buffered_log_throttled_flush(mock_time: Any, mock_logger: Any) -> None:
323344
mock_time.return_value = 0.0
324-
inner = LogMetricsBackend(period_sec=60.0)
345+
inner = LogMetricsBackend()
325346
backend = BufferedMetricsBackend(inner, throttle_interval_sec=10.0)
326347
mock_info = mock_logger.info
327348

@@ -337,7 +358,7 @@ def test_buffered_log_throttled_flush(mock_time: Any, mock_logger: Any) -> None:
337358
@patch("time.time")
338359
def test_buffered_log_global_tags_from_inner(mock_time: Any, mock_logger: Any) -> None:
339360
mock_time.return_value = 0.0
340-
inner = LogMetricsBackend(period_sec=60.0, tags={"service": "streams"})
361+
inner = LogMetricsBackend(tags={"service": "streams"})
341362
backend = BufferedMetricsBackend(inner, throttle_interval_sec=60.0)
342363
mock_info = mock_logger.info
343364

sentry_streams/uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)