Skip to content
48 changes: 39 additions & 9 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@
MultiProcessConfig,
StepConfig,
)
from sentry_streams.metrics import Metric, get_metrics, get_size
from sentry_streams.metrics import (
Metric,
MetricsConfig,
configure_metrics,
get_metrics,
get_size,
)
from sentry_streams.pipeline.function_template import (
InputType,
OutputType,
Expand Down Expand Up @@ -72,6 +78,22 @@
logger = logging.getLogger(__name__)


def build_py_metrics_config(cfg: MetricsConfig) -> PyMetricConfig | None:
    """Derive the Rust-side DogStatsD config from the Python metrics config.

    Takes the same metrics dict that is handed to ``configure_metrics``.

    Args:
        cfg: Metrics configuration mapping. Its ``"type"`` key selects the
            backend; for ``"datadog"`` the ``"host"``, ``"port"`` and
            ``"tags"`` keys are read directly and ``"flush_interval_ms"`` is
            read optionally.

    Returns:
        A ``PyMetricConfig`` when ``cfg["type"]`` is ``"datadog"``, otherwise
        ``None``.

    Raises:
        KeyError: If ``"type"`` is missing, or if a datadog config lacks
            ``"host"``, ``"port"`` or ``"tags"``.
    """
    if cfg["type"] == "datadog":
        # Read the fields in the order host, port, tags, flush interval so a
        # malformed config fails on the same key regardless of refactoring.
        host = cfg["host"]
        port = cfg["port"]
        # Shallow copy: the Rust config gets its own dict rather than a
        # reference to the caller's mapping.
        tags = dict(cfg["tags"])
        # Optional key; ``None`` is forwarded when it is absent.
        flush_interval_ms = cfg.get("flush_interval_ms")
        return PyMetricConfig(
            host=host,
            port=port,
            tags=tags,
            flush_interval_ms=flush_interval_ms,
        )
    return None


def initializer(metrics_config: MetricsConfig) -> None:
    """Configure metrics from ``metrics_config`` with ``force=True``.

    ``finalize_chain`` binds this function to the pipeline's metrics config
    with ``functools.partial`` and passes it as the ``initializer`` of the
    ``MultiprocessingPool``.

    NOTE(review): presumably the pool runs this once in every worker process
    and ``force=True`` replaces any already-configured backend -- confirm
    against ``MultiprocessingPool`` and ``configure_metrics``.

    Args:
        metrics_config: Metrics configuration forwarded unchanged to
            ``configure_metrics``.
    """
    configure_metrics(metrics_config, force=True)


def _metrics_wrapped_function(
step_name: str, application_function: Callable[[Message[Any]], Any], msg: Message[Any]
) -> Any:
Expand Down Expand Up @@ -171,7 +193,11 @@ def build_kafka_producer_config(
)


def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator:
def finalize_chain(
chains: TransformChains,
route: Route,
metrics_config: MetricsConfig,
) -> RuntimeOperator:
rust_route = RustRoute(route.source, route.waypoints)
config, func = chains.finalize(route)
if config:
Expand All @@ -181,6 +207,7 @@ def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator:
f"batch_size={config['batch_size']}, "
f"batch_time={config['batch_time']}"
)

return RuntimeOperator.PythonAdapter(
rust_route,
MultiprocessDelegateFactory(
Expand All @@ -189,6 +216,7 @@ def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator:
config["batch_time"],
MultiprocessingPool(
num_processes=config["processes"],
initializer=functools.partial(initializer, metrics_config),
),
input_block_size=config.get("input_block_size"),
output_block_size=config.get("output_block_size"),
Expand All @@ -205,33 +233,35 @@ class RustArroyoAdapter(StreamAdapter[Route, Route]):
def __init__(
self,
steps_config: Mapping[str, StepConfig],
metric_config: PyMetricConfig | None = None,
metrics_config: MetricsConfig,
write_healthcheck: bool = False,
) -> None:
super().__init__()
self.steps_config = steps_config
self.__metric_config = metric_config
self.__metrics_config = metrics_config
self.__write_healthcheck = write_healthcheck
self.__consumers: MutableMapping[str, ArroyoConsumer] = {}
self.__chains = TransformChains()

@classmethod
def build(
def build( # type: ignore[override]
cls,
config: PipelineConfig,
metric_config: PyMetricConfig | None = None,
metrics_config: MetricsConfig,
) -> Self:
steps_config = config["steps_config"]
adapter_config = config.get("adapter_config") or {}
arroyo_config = adapter_config.get("arroyo") or {}
write_healthcheck = bool(arroyo_config.get("write_healthcheck", False))

return cls(steps_config, metric_config, write_healthcheck)
return cls(steps_config, metrics_config, write_healthcheck)

def __close_chain(self, stream: Route) -> None:
if self.__chains.exists(stream):
logger.info(f"Closing transformation chain: {stream} and adding to pipeline")
self.__consumers[stream.source].add_step(finalize_chain(self.__chains, stream))
self.__consumers[stream.source].add_step(
finalize_chain(self.__chains, stream, self.__metrics_config)
)

def get_consumer(self, source: str) -> ArroyoConsumer:
return self.__consumers[source]
Expand Down Expand Up @@ -268,7 +298,7 @@ def source(self, step: Source[Any]) -> Route:
),
topic=step.stream_name,
schema=schema_name,
metric_config=self.__metric_config,
metric_config=build_py_metrics_config(self.__metrics_config),
write_healthcheck=self.__write_healthcheck,
)
return Route(source_name, [])
Expand Down
18 changes: 4 additions & 14 deletions sentry_streams/sentry_streams/adapters/loader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import importlib.util as utils
import sys
from importlib import import_module
from typing import Any, Optional, TypeVar, cast
from typing import Optional, TypeVar, cast

from sentry_streams.adapters.stream_adapter import PipelineConfig, StreamAdapter
from sentry_streams.metrics.metrics import MetricsConfig

Stream = TypeVar("Stream")
Sink = TypeVar("Sink")
Expand All @@ -12,8 +13,8 @@
def load_adapter(
adapter_type: str,
config: PipelineConfig,
metrics_config: MetricsConfig,
segment_id: Optional[int] = None,
metric_config: Optional[dict[str, Any]] = None,
) -> StreamAdapter[Stream, Sink]:
"""
Loads a StreamAdapter to run a pipeline.
Expand Down Expand Up @@ -58,22 +59,11 @@ def load_adapter(

if adapter_type == "rust_arroyo":
from sentry_streams.adapters.arroyo import RustArroyoAdapter
from sentry_streams.rust_streams import PyMetricConfig

# Convert dict metric_config to PyMetricConfig if provided
py_metric_config = None
if metric_config:
py_metric_config = PyMetricConfig(
host=metric_config["host"],
port=metric_config["port"],
tags=metric_config.get("tags"),
flush_interval_ms=metric_config.get("flush_interval_ms"),
)

# TODO: Fix this type as above.
return cast(
StreamAdapter[Stream, Sink],
RustArroyoAdapter.build(config, py_metric_config),
RustArroyoAdapter.build(config, metrics_config),
)
else:
mod, cls = adapter_type.rsplit(".", 1)
Expand Down
7 changes: 6 additions & 1 deletion sentry_streams/sentry_streams/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
"udp_queue_size": {
"type": "integer",
"minimum": 0
},
"flush_interval_ms": {
"type": "integer",
"minimum": 1
}
},
"required": [
Expand Down Expand Up @@ -110,7 +114,8 @@
},
"required": [
"type",
"period_sec"
"period_sec",
"tags"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
# process. Though chains are parallelized over multiple
# processes.
env: {}

metrics:
type: log
period_sec: 5
tags:
pipeline: errors
pipeline:
segments:
- steps_config:
Expand Down
12 changes: 12 additions & 0 deletions sentry_streams/sentry_streams/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
from sentry_streams.metrics.metrics import (
METRICS_PREFIX,
DatadogMetricsBackend,
DatadogMetricsConfig,
DummyMetricsBackend,
DummyMetricsConfig,
LogMetricsBackend,
LogMetricsConfig,
Metric,
Metrics,
MetricsBackend,
MetricsConfig,
build_metrics_backend,
configure_metrics,
get_metrics,
get_size,
)

# Public API of the metrics package. Every name listed here is imported from
# ``sentry_streams.metrics.metrics`` at the top of this module, so callers can
# import it from ``sentry_streams.metrics`` directly.
__all__ = [
    "build_metrics_backend",
    "configure_metrics",
    "get_metrics",
    "DatadogMetricsBackend",
    "DatadogMetricsConfig",
    "DummyMetricsBackend",
    "DummyMetricsConfig",
    "LogMetricsBackend",
    "LogMetricsConfig",
    "METRICS_PREFIX",
    "Metric",
    "Metrics",
    "MetricsBackend",
    "MetricsConfig",
    "get_size",
]
Loading
Loading