62 changes: 34 additions & 28 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -2,6 +2,7 @@

import functools
import logging
import random
import time
from dataclasses import replace
from typing import (
@@ -39,7 +40,6 @@
MetricsConfig,
configure_metrics,
get_metrics,
get_size,
)
from sentry_streams.pipeline.function_template import (
InputType,
@@ -97,47 +97,50 @@ def initializer(metrics_config: MetricsConfig) -> None:


def _metrics_wrapped_function(
step_name: str, application_function: Callable[[Message[Any]], Any], msg: Message[Any]
step_name: str,
sample_rate: float,
application_function: Callable[[Message[Any]], Any],
msg: Message[Any],
) -> Any:
"""
Module-level wrapper function for adding metrics to step functions.
This is defined at module level to be picklable for multiprocessing.
"""
msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
start_time = input_metrics(step_name, msg_size)
has_error = output_size = None
input_metrics(step_name, sample_rate)
has_error = None
start_time = time.time()
try:
result = application_function(msg)
output_size = get_size(result)
return result
except Exception as e:
has_error = str(e.__class__.__name__)
raise e
finally:
output_metrics(step_name, has_error, start_time, output_size)
output_metrics(step_name, has_error, start_time, sample_rate)


def input_metrics(name: str, message_size: int | None) -> float:
def input_metrics(name: str, sample_rate: float) -> None:
if random.random() > sample_rate:
return
metrics = get_metrics()
tags = {"step": name}
metrics.increment(Metric.INPUT_MESSAGES, tags=tags)
if message_size is not None:
metrics.increment(Metric.INPUT_BYTES, tags=tags, value=message_size)
return time.time()
metrics.increment(Metric.INPUT_MESSAGES, value=1 / sample_rate, tags=tags)


def output_metrics(
name: str, error: str | None, start_time: float, message_size: int | None
name: str,
error: str | None,
start_time: float,
sample_rate: float,
) -> None:
if random.random() > sample_rate:
return
metrics = get_metrics()
tags = {"step": name}
if error:
tags["error"] = error

Bug: The ERRORS metric is not scaled by 1/sample_rate to account for sampling, unlike INPUT_MESSAGES, leading to undercounted errors.
Severity: HIGH

Suggested Fix

In the output_metrics block, scale the ERRORS metric by 1 / sample_rate to match the handling of INPUT_MESSAGES. Change metrics.increment(Metric.ERRORS, tags=tags) to metrics.increment(Metric.ERRORS, value=1 / sample_rate, tags=tags).

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L141

Potential issue: The `ERRORS` metric in `output_metrics` is not scaled by `1/sample_rate`, while the `INPUT_MESSAGES` metric is. This inconsistency leads to a systematic undercounting of errors whenever sampling is active. The calculated error rate (`ERRORS`/`INPUT_MESSAGES`) will be lower than the true rate by a factor of `sample_rate`. This can cause monitoring dashboards and alerts to miss real production issues, making the system appear more reliable than it actually is. For instance, with a `sample_rate` of 0.1, only 10% of the actual errors will be reported.
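
A minimal sketch of the suggested change, assuming `get_metrics` and `Metric` are the module's existing helpers (their imports are omitted here, so this is illustrative rather than a drop-in patch):

```python
import random
import time


def output_metrics(
    name: str,
    error: str | None,
    start_time: float,
    sample_rate: float,
) -> None:
    # Sampled out: record nothing, as in the current code.
    if random.random() > sample_rate:
        return
    metrics = get_metrics()  # existing helper from the metrics module
    tags = {"step": name}
    if error:
        tags["error"] = error
        # Scale by 1 / sample_rate so the sampled error count extrapolates
        # to the true total, matching how INPUT_MESSAGES is handled.
        metrics.increment(Metric.ERRORS, value=1 / sample_rate, tags=tags)
    # OUTPUT_MESSAGES is left unscaled here, as in the PR; the same
    # 1 / sample_rate argument would arguably apply to it as well.
    metrics.increment(Metric.OUTPUT_MESSAGES, tags=tags)
    metrics.timing(Metric.DURATION, time.time() - start_time, tags=tags)
```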


metrics.increment(Metric.ERRORS, tags=tags)

metrics.increment(Metric.OUTPUT_MESSAGES, tags=tags)
if message_size is not None:
metrics.increment(Metric.OUTPUT_BYTES, tags=tags, value=message_size)

Error counter not scaled to compensate for sampling

High Severity

The Metric.ERRORS counter in output_metrics is not scaled by 1 / sample_rate, unlike Metric.INPUT_MESSAGES in input_metrics which correctly uses value=1 / sample_rate. Since output_metrics returns early based on random.random() > sample_rate, errors are both sampled down and recorded as a value of 1, leading to systematic under-reporting by a factor of 1/sample_rate (e.g., 10x with the default 0.1 rate). This makes error counts unreliable in monitoring dashboards.
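
To make the under-reporting concrete, here is a small standalone simulation (hypothetical event counts, not taken from the PR) comparing an unscaled counter with one scaled by 1 / sample_rate:

```python
import random


def simulate(events: int, error_rate: float, sample_rate: float) -> tuple[float, float]:
    """Count errors two ways when metric emission is randomly sampled."""
    unscaled = scaled = 0.0
    for _ in range(events):
        if random.random() >= error_rate:
            continue  # not an error
        if random.random() > sample_rate:
            continue  # sampled out: nothing is recorded at all
        unscaled += 1              # what the unscaled counter reports
        scaled += 1 / sample_rate  # extrapolated estimate of the true count
    return unscaled, scaled


random.seed(0)
unscaled, scaled = simulate(events=100_000, error_rate=0.05, sample_rate=0.1)
# Roughly 5_000 true errors: the unscaled counter lands near 500 (about 10x
# low), while the scaled counter lands near 5_000.
print(unscaled, scaled)
```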


metrics.timing(Metric.DURATION, time.time() - start_time, tags=tags)


@@ -237,13 +240,15 @@ def __init__(
steps_config: Mapping[str, StepConfig],
metrics_config: MetricsConfig,
write_healthcheck: bool = False,
metrics_sample_rate: float = 0.1,
) -> None:
super().__init__()
self.steps_config = steps_config
self.__metrics_config = metrics_config
self.__write_healthcheck = write_healthcheck
self.__consumers: MutableMapping[str, ArroyoConsumer] = {}
self.__chains = TransformChains()
self.__metrics_sample_rate = metrics_sample_rate

@classmethod
def build( # type: ignore[override]
@@ -255,8 +260,9 @@ def build( # type: ignore[override]
adapter_config = config.get("adapter_config") or {}
arroyo_config = adapter_config.get("arroyo") or {}
write_healthcheck = bool(arroyo_config.get("write_healthcheck", False))
metrics_sample_rate = float(arroyo_config.get("metrics_sample_rate", 0.1))

return cls(steps_config, metrics_config, write_healthcheck)
return cls(steps_config, metrics_config, write_healthcheck, metrics_sample_rate)

def __close_chain(self, stream: Route) -> None:
if self.__chains.exists(stream):
@@ -326,11 +332,12 @@ def sink(self, step: Sink[Any], stream: Route) -> Route:
# Fix this to wrap the actual step instead of just the object_generator.
# This will at least capture the number of calls to the step, if nothing else.
def wrapped_generator() -> str:
start_time = input_metrics(step.name, None)
start_time = time.time()
input_metrics(step.name, self.__metrics_sample_rate)
try:
return step.object_generator()
finally:
output_metrics(step.name, None, start_time, None)
output_metrics(step.name, None, start_time, self.__metrics_sample_rate)

logger.info(f"Adding GCS sink: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
@@ -381,7 +388,7 @@ def map(self, step: Map[Any, Any], stream: Route) -> Route:
application_function = step.resolved_function

wrapped_function = functools.partial(
_metrics_wrapped_function, step.name, application_function
_metrics_wrapped_function, step.name, self.__metrics_sample_rate, application_function
)

step = replace(step, function=wrapped_function)
@@ -450,18 +457,17 @@ def filter(self, step: Filter[Any], stream: Route) -> Route:
elif isinstance(step, PredicateFilter):

def filter_msg(msg: Message[Any]) -> bool:
msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
start_time = input_metrics(step.name, msg_size)
has_error = output_size = None
input_metrics(step.name, self.__metrics_sample_rate)
has_error = None
start_time = time.time()
try:
result = step.resolved_function(msg)
output_size = get_size(result)
return result
except Exception as e:
has_error = str(e.__class__.__name__)
raise e
finally:
output_metrics(step.name, has_error, start_time, output_size)
output_metrics(step.name, has_error, start_time, self.__metrics_sample_rate)

self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg))
return stream
@@ -537,8 +543,8 @@ def router(
route = RustRoute(stream.source, stream.waypoints)

def routing_function(msg: Message[Any]) -> str:
msg_size = get_size(msg.payload) if hasattr(msg, "payload") else None
start_time = input_metrics(step.name, msg_size)
input_metrics(step.name, self.__metrics_sample_rate)
start_time = time.time()
has_error = None
try:
waypoint = step.routing_function(msg)
Expand All @@ -548,7 +554,7 @@ def routing_function(msg: Message[Any]) -> str:
has_error = str(e.__class__.__name__)
raise e
finally:
output_metrics(step.name, has_error, start_time, None)
output_metrics(step.name, has_error, start_time, self.__metrics_sample_rate)

logger.info(f"Adding router: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
@@ -8,6 +8,7 @@ pipeline:
adapter_config:
arroyo:
write_healthcheck: true
metrics_sample_rate: 0.01
segments:
- steps_config:
myinput:
19 changes: 11 additions & 8 deletions sentry_streams/sentry_streams/metrics/metrics.py
@@ -432,7 +432,7 @@ def timing(
self.__backend.timing(name, value, tags=_tags_from_mapping(tags))


_metrics_backend: Optional[MetricsBackend] = None
_metrics: Optional[Metrics] = None
_dummy_metrics_backend = DummyMetricsBackend()


@@ -458,22 +458,25 @@ def configure_metrics(config: MetricsConfig, force: bool = False) -> None:
``config.json``) so worker processes can rebuild the same backends under
``spawn`` multiprocessing.
"""
global _metrics_backend
global _metrics
if not force:
assert _metrics_backend is None, "Metrics is already set"
assert _metrics is None, "Metrics is already set"

inner = build_metrics_backend(config)
_metrics_backend = BufferedMetricsBackend(
backend = BufferedMetricsBackend(
inner,
throttle_interval_sec=_buffer_throttle_interval_sec(config),
)
arroyo_configure_metrics(ArroyoMetricsBackend(_metrics_backend))
_metrics = Metrics(backend)
arroyo_configure_metrics(ArroyoMetricsBackend(backend))


def get_metrics() -> Metrics:
if _metrics_backend is None:
return Metrics(_dummy_metrics_backend)
return Metrics(_metrics_backend)
global _metrics
if _metrics is None:
_metrics = Metrics(_dummy_metrics_backend)

return _metrics

Caching dummy metrics breaks later configuration call

Medium Severity

get_metrics() now mutates the global _metrics by caching a dummy-backed Metrics instance when none is configured. Previously it returned a fresh throwaway object without side effects. If get_metrics() is ever called before configure_metrics(), the cached dummy will cause configure_metrics(force=False) to hit the assert _metrics is None guard and crash.
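
One possible mitigation, sketched here as an assumption rather than anything in the PR, is to return the dummy-backed instance without caching it, so a later `configure_metrics(force=False)` still sees `_metrics is None`:

```python
def get_metrics() -> Metrics:
    # No `global` statement and no assignment: the dummy-backed instance is
    # handed back but never cached, so configure_metrics() can still run later.
    if _metrics is None:
        return Metrics(_dummy_metrics_backend)
    return _metrics
```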


Comment on lines 474 to +479

Bug: The test fixture reset_metrics_backend was not updated after _metrics_backend was renamed to _metrics, causing state to leak between tests.
Severity: MEDIUM

Suggested Fix

In the reset_metrics_backend fixture located in test_metrics.py, update the line that resets the global variable to target the new name. Change metrics_module._metrics_backend = None to metrics_module._metrics = None.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: sentry_streams/sentry_streams/metrics/metrics.py#L474-L479

Potential issue: The global variable `_metrics_backend` was renamed to `_metrics`, but the test fixture `reset_metrics_backend` was not updated to reflect this change. The fixture continues to reset the old, non-existent `_metrics_backend` variable, causing the state of the new `_metrics` variable to leak between tests. This leads to test failures, particularly in functions like `configure_metrics()` which assert that `_metrics` is `None` before initialization. The broken test fixture compromises the integrity of the test suite, as tests will run with stale data from previous runs.
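
A sketch of the fixture update this comment asks for; the actual fixture body is not shown in the diff, so the shape below (autouse, yield-style) and the `sentry_streams.metrics.metrics` import path are assumptions:

```python
import pytest

import sentry_streams.metrics.metrics as metrics_module


@pytest.fixture(autouse=True)
def reset_metrics_backend():
    # Reset the renamed global so each test starts without a configured
    # Metrics instance (previously this line targeted _metrics_backend).
    metrics_module._metrics = None
    yield
    metrics_module._metrics = None
```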



def get_size(obj: Any) -> int | None: