Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions sentry_streams/sentry_streams/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,29 @@
"port"
]
},
{
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": [
"log"
]
},
"period_sec": {
"type": "number",
"minimum": 0.1
},
"tags": {
"type": "object",
"additionalProperties": true
}
},
"required": [
"type",
"period_sec"
]
},
{
"type": "object",
"properties": {
Expand Down
6 changes: 6 additions & 0 deletions sentry_streams/sentry_streams/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from sentry_streams.metrics.metrics import (
METRICS_PREFIX,
DatadogMetricsBackend,
DummyMetricsBackend,
LogMetricsBackend,
Metric,
Metrics,
configure_metrics,
get_metrics,
get_size,
Expand All @@ -12,6 +15,9 @@
"get_metrics",
"DatadogMetricsBackend",
"DummyMetricsBackend",
"LogMetricsBackend",
"METRICS_PREFIX",
"Metric",
"Metrics",
"get_size",
]
191 changes: 149 additions & 42 deletions sentry_streams/sentry_streams/metrics/metrics.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
from __future__ import annotations

import logging
import time
from abc import abstractmethod
from enum import Enum
from typing import Any, Mapping, Optional, Protocol, Union, runtime_checkable
from typing import (
Any,
Iterable,
Mapping,
Optional,
Protocol,
Union,
runtime_checkable,
)

from arroyo.utils.metric_defs import MetricName as ArroyoMetricName
from arroyo.utils.metrics import DummyMetricsBackend as ArroyoDummyMetricsBackend
from arroyo.utils.metrics import configure_metrics as arroyo_configure_metrics
from datadog.dogstatsd.base import DogStatsd

Tags = dict[str, str]
logger = logging.getLogger("sentry_streams.metrics.log_backend")


METRICS_FREQUENCY_SEC = 10

# Single source of truth for the metrics namespace used by both Datadog and Log backends.
METRICS_PREFIX = "streams.pipeline"

# max number of (UDP) packets in the dogstatsd queue. 0 means unlimited.
SENDER_QUEUE_SIZE = 100000
# do not block process shutdown on metrics.
Expand All @@ -24,22 +37,22 @@
class Metric(Enum):
    """
    Canonical metric names emitted by pipeline steps.

    Values are relative names; the configured backend prepends the shared
    METRICS_PREFIX namespace so Datadog and log output agree.
    """

    # This counts how many messages were input into the step in the pipeline.
    # Tags: step, pipeline
    INPUT_MESSAGES = "input.messages"
    # This counts how many bytes were input into the step in the pipeline.
    # Tags: step, pipeline
    INPUT_BYTES = "input.bytes"
    # This counts how many messages were output from the step in the pipeline. Useful for filter/batch steps.
    # Tags: step, pipeline
    OUTPUT_MESSAGES = "output.messages"
    # This counts how many bytes were output from the step in the pipeline. Useful for filter/batch steps.
    # Tags: step, pipeline
    OUTPUT_BYTES = "output.bytes"
    # This times how long the application code in the step took to run.
    # Tags: step, pipeline
    DURATION = "duration"
    # This counts how many errors were encountered in the step in the pipeline.
    # Tags: step, pipeline, error_type
    ERRORS = "errors"


@runtime_checkable
Expand Down Expand Up @@ -118,39 +131,90 @@ def remove_global_tags(self, tags: Tags) -> None:
BufferedMetric = tuple[Metric, float, list[str] | None]


class DatadogMetricsBackend(Metrics):
@runtime_checkable
class MetricsFlusher(Protocol):
    """
    Protocol for flushing buffered metrics to a destination (e.g. Datadog or log).

    Implementations (DatadogFlusher, LogFlusher) receive the triples that
    BufferedMetricsBackend accumulated since the last flush.
    """

    def flush(
        self,
        timers: Iterable[BufferedMetric],
        counters: Iterable[BufferedMetric],
        gauges: Iterable[BufferedMetric],
    ) -> None:
        """Write the buffered metrics to the destination.

        Each iterable yields BufferedMetric triples:
        (Metric, value, tag list or None).
        """
        ...


class DatadogFlusher:
    """Flusher that forwards buffered metrics to a DogStatsd client.

    Satisfies the MetricsFlusher protocol; each buffered triple is emitted
    with the matching DogStatsd call (timing / increment / gauge).
    """

    def __init__(self, client: DogStatsd) -> None:
        self.__statsd = client

    def flush(
        self,
        timers: Iterable[BufferedMetric],
        counters: Iterable[BufferedMetric],
        gauges: Iterable[BufferedMetric],
    ) -> None:
        # Pair each buffer with its emitter; order (timers, counters, gauges)
        # matches the protocol signature.
        dispatch = (
            (self.__statsd.timing, timers),
            (self.__statsd.increment, counters),
            (self.__statsd.gauge, gauges),
        )
        for emit, bucket in dispatch:
            for metric, amount, metric_tags in bucket:
                emit(metric.value, amount, tags=metric_tags or [])


class LogFlusher:
    """Flusher that writes buffered metrics to the module logger.

    Renders every buffered triple as "<kind> <name>=<value> <tags>" and logs
    them all as a single pipe-separated INFO line, prefixed with the metrics
    namespace.
    """

    def __init__(self, prefix: str) -> None:
        self.__prefix = prefix

    def flush(
        self,
        timers: Iterable[BufferedMetric],
        counters: Iterable[BufferedMetric],
        gauges: Iterable[BufferedMetric],
    ) -> None:
        rendered: list[str] = []
        for kind, bucket in (("timing", timers), ("counter", counters), ("gauge", gauges)):
            for metric, amount, metric_tags in bucket:
                joined_tags = " ".join(metric_tags) if metric_tags else ""
                # .strip() drops the trailing space when there are no tags.
                rendered.append(f"{kind} {metric.value}={amount} {joined_tags}".strip())
        if rendered:
            logger.info(" | ".join([self.__prefix] + rendered))
        else:
            logger.info("No metrics to flush")
Comment thread
cursor[bot] marked this conversation as resolved.


class BufferedMetricsBackend(Metrics):
"""
Metrics backend that buffers updates and periodically flushes them
via an injected flusher (e.g. Datadog or log).
"""

def __init__(
    self,
    flusher: MetricsFlusher,
    throttle_interval_sec: float,
    tags: Optional[Tags] = None,
) -> None:
    """
    :param flusher: destination the buffered metrics are written to on flush
        (e.g. DatadogFlusher or LogFlusher).
    :param throttle_interval_sec: minimum number of seconds between flushes.
    :param tags: global tags attached to every metric.
    """
    self.__flusher = flusher
    self.__throttle_interval_sec = throttle_interval_sec
    self.tags = tags
    # Normalized once up front; kept in sync by add/remove_global_tags.
    self.__normalized_tags = self.__normalize_tags(tags) if tags is not None else []
    # Buffers keyed by an int key computed in __add_to_buffer.
    self.__timers: dict[int, BufferedMetric] = {}
    self.__counters: dict[int, BufferedMetric] = {}
    self.__gauges: dict[int, BufferedMetric] = {}
    # 0.0 makes the first recorded metric trigger an immediate flush.
    self.__last_flush_time = 0.0

def __add_to_buffer(
self,
Expand Down Expand Up @@ -183,15 +247,15 @@ def increment(
tags: Optional[Tags] = None,
) -> None:
self.__add_to_buffer(self.__counters, name, value, tags)
self.__throttled_record()
self.__throttled_flush()

def gauge(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
    """Record a gauge value, then flush if the throttle interval has elapsed."""
    # Gauges keep only the latest value, hence replace=True.
    self.__add_to_buffer(self.__gauges, name, value, tags, replace=True)
    self.__throttled_flush()

def timing(self, name: Metric, value: Union[int, float], tags: Optional[Tags] = None) -> None:
    """Record a timing observation, then flush if the throttle interval has elapsed."""
    self.__add_to_buffer(self.__timers, name, value, tags)
    self.__throttled_flush()

def add_global_tags(self, tags: Tags) -> None:
if self.tags is None:
Expand All @@ -207,25 +271,68 @@ def remove_global_tags(self, tags: Tags) -> None:
self.tags.pop(tag, None)
self.__normalized_tags = self.__normalize_tags(self.tags)

def __throttled_flush(self) -> None:
    """Flush the buffers when at least the throttle interval has passed."""
    if time.time() - self.__last_flush_time >= self.__throttle_interval_sec:
        self.flush()

def flush(self) -> None:
    """Hand every buffered timer/counter/gauge to the flusher, then reset."""
    self.__flusher.flush(
        self.__timers.values(),
        self.__counters.values(),
        self.__gauges.values(),
    )
    self.__reset()

def __reset(self) -> None:
    """Clear all buffered metrics and restart the flush interval clock."""
    self.__timers.clear()
    self.__counters.clear()
    self.__gauges.clear()
    self.__last_flush_time = time.time()

def __throttled_record(self) -> None:
if time.time() - self.__last_record_time > METRICS_FREQUENCY_SEC:
self.flush()

class DatadogMetricsBackend(BufferedMetricsBackend):
    """
    Datadog metrics backend. Buffers metrics and flushes to DogStatsd.
    """

    def __init__(
        self,
        host: str,
        port: int,
        prefix: str,
        tags: Optional[Tags] = None,
        udp_queue_size: Optional[int] = None,
    ) -> None:
        # Tags are attached per-metric by BufferedMetricsBackend; giving the
        # client constant_tags as well would duplicate them on every metric.
        client = DogStatsd(
            host=host,
            port=port,
            namespace=prefix.strip("."),
            constant_tags=[],
        )
        queue_size = SENDER_QUEUE_SIZE if udp_queue_size is None else udp_queue_size
        # ignore mypy because that method just is untyped, yet part of public API
        client.enable_background_sender(  # type: ignore[no-untyped-call]
            sender_queue_size=queue_size,
            sender_queue_timeout=SENDER_QUEUE_TIMEOUT,
        )
        self.datadog_client = client
        super().__init__(DatadogFlusher(client), METRICS_FREQUENCY_SEC, tags)
Comment thread
fpacifici marked this conversation as resolved.


class LogMetricsBackend(BufferedMetricsBackend):
    """
    Metrics backend that buffers metrics and periodically logs accumulated
    values at a configurable interval.
    """

    def __init__(self, period_sec: float, tags: Optional[Tags] = None) -> None:
        # Reuse the shared namespace so logged names line up with Datadog's.
        log_flusher = LogFlusher(METRICS_PREFIX.strip("."))
        super().__init__(log_flusher, period_sec, tags)


class ArroyoDatadogMetricsBackend:
Expand Down
19 changes: 17 additions & 2 deletions sentry_streams/sentry_streams/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
StreamT,
)
from sentry_streams.metrics import (
METRICS_PREFIX,
DatadogMetricsBackend,
DummyMetricsBackend,
LogMetricsBackend,
Metrics,
configure_metrics,
)
from sentry_streams.pipeline.config import load_config
Expand Down Expand Up @@ -103,14 +106,15 @@ def load_runtime(
validate_all_branches_have_sinks(pipeline)

metric_config = environment_config.get("metrics", {})
metrics: Metrics
if metric_config.get("type") == "datadog":
default_tags = metric_config.get("tags", {})
default_tags["pipeline"] = name

metrics = DatadogMetricsBackend(
metric_config["host"],
metric_config["port"],
"sentry_streams",
METRICS_PREFIX,
default_tags,
metric_config.get("udp_queue_size"),
)
Expand All @@ -122,8 +126,19 @@ def load_runtime(
"flush_interval_ms": metric_config.get("flush_interval_ms"),
"udp_queue_size": metric_config.get("udp_queue_size"),
}
elif metric_config.get("type") == "log":
default_tags = metric_config.get("tags", {})
default_tags["pipeline"] = name

metrics = LogMetricsBackend(
period_sec=metric_config["period_sec"],
tags=default_tags,
)
configure_metrics(metrics)
metric_config = {}
else:
configure_metrics(DummyMetricsBackend())
metrics = DummyMetricsBackend()
configure_metrics(metrics)
metric_config = {}

assigned_segment_id = int(segment_id) if segment_id else None
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ struct MetricsFacadeRecorder;

impl Recorder for MetricsFacadeRecorder {
fn record_metric(&self, metric: Metric<'_>) {
let key = format!("{}", metric.key);
let key = format!("streams.pipeline.{}", metric.key);
let value_f64 = match metric.value {
sentry_arroyo::metrics::MetricValue::I64(v) => v as f64,
sentry_arroyo::metrics::MetricValue::U64(v) => v as f64,
Expand Down
Loading
Loading