diff --git a/shared/observability/src/airflow_shared/observability/metrics/histogram_buckets.py b/shared/observability/src/airflow_shared/observability/metrics/histogram_buckets.py new file mode 100644 index 0000000000000..a111b068acb13 --- /dev/null +++ b/shared/observability/src/airflow_shared/observability/metrics/histogram_buckets.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Declarative OTel histogram bucket views keyed by metric-name pattern. + +PR #64207 standardised timer histograms on +:class:`~opentelemetry.sdk.metrics.view.ExponentialBucketHistogramAggregation` +at the instrument-type level. This module extends that idea to non-timer +histograms (``*_count``, ``*_duration``, ``*_delay`` and similar families): +bucket shape is declared once, here, instead of being chosen per call site. + +The mapping is intentionally narrow. Patterns are simple suffix globs and +each one maps to a single named aggregation. Deployments that need a +different bucket layout can pass an override dict to +:func:`build_views_for_patterns`. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from opentelemetry.sdk.metrics.view import ( + ExplicitBucketHistogramAggregation, + ExponentialBucketHistogramAggregation, + View, +) + +if TYPE_CHECKING: + from opentelemetry.sdk.metrics._internal.aggregation import Aggregation + +# Latency-shaped metrics span milliseconds to minutes; exponential buckets +# adapt without hand-tuned boundaries. +LATENCY_BUCKETS: Aggregation = ExponentialBucketHistogramAggregation() + +# Small unbounded counts (queue depths, retry attempts, fan-out sizes). +COUNT_BUCKETS: Aggregation = ExplicitBucketHistogramAggregation( + boundaries=(1, 2, 5, 10, 25, 50, 100, 250, 500, 1000), +) + +# Large-range delays (schedule lag, dependency wait); seconds to hours. +DELAY_BUCKETS: Aggregation = ExplicitBucketHistogramAggregation( + boundaries=(1, 5, 15, 60, 300, 900, 1800, 3600, 7200, 21600), +) + +DEFAULT_PATTERN_BUCKETS: dict[str, Aggregation] = { + "*_duration": LATENCY_BUCKETS, + "*_delay": DELAY_BUCKETS, + "*_count": COUNT_BUCKETS, +} + + +def build_views_for_patterns( + pattern_buckets: dict[str, Aggregation] | None = None, +) -> list[View]: + """ + Return one OTel ``View`` per ``(instrument-name pattern, aggregation)`` entry. + + :param pattern_buckets: Optional override of the default pattern map. When + ``None``, :data:`DEFAULT_PATTERN_BUCKETS` is used. Each key is a + ``View.instrument_name`` glob (e.g. ``"*_duration"``) and each value + is the aggregation to apply. + :returns: A list of :class:`~opentelemetry.sdk.metrics.view.View` objects + suitable for passing to ``MeterProvider(views=...)``. + """ + mapping = DEFAULT_PATTERN_BUCKETS if pattern_buckets is None else pattern_buckets + return [ + View(instrument_name=pattern, aggregation=aggregation) for pattern, aggregation in mapping.items() + ] diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py index 5aaa77741f0e5..38142ff408865 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py +++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py @@ -35,6 +35,7 @@ from ..common import get_otel_data_exporter from ..otel_env_config import load_metrics_env_config +from .histogram_buckets import build_views_for_patterns from .protocols import Timer from .validators import ( OTEL_NAME_MAX_LENGTH, @@ -439,6 +440,11 @@ def get_otel_logger( so that bucket boundaries adapt automatically to the observed data range. This avoids the need to hand-tune explicit bucket boundaries for metrics that span very different scales (milliseconds to hours). + + Pattern-keyed views from + :mod:`~airflow_shared.observability.metrics.histogram_buckets` are layered on top of + that baseline so non-timer histograms (``*_count``, ``*_duration``, ``*_delay``) get + bucket shapes appropriate to each family. """ otel_env_config = load_metrics_env_config() @@ -489,16 +495,22 @@ def get_otel_logger( except (ImportError, AttributeError): pass + # Per-instrument-type baseline: every histogram defaults to exponential + # buckets. Pattern-keyed views from histogram_buckets layer on top to + # give specific metric-name families their own bucket shape. + histogram_views: list[View] = [ + View( + instrument_type=metrics.Histogram, + aggregation=ExponentialBucketHistogramAggregation(), + ), + *build_views_for_patterns(), + ] + metrics.set_meter_provider( MeterProvider( resource=resource, metric_readers=readers, - views=[ - View( - instrument_type=metrics.Histogram, - aggregation=ExponentialBucketHistogramAggregation(), - ) - ], + views=histogram_views, shutdown_on_exit=False, ), ) diff --git a/shared/observability/tests/observability/metrics/test_histogram_buckets.py b/shared/observability/tests/observability/metrics/test_histogram_buckets.py new file mode 100644 index 0000000000000..5a58fb98a3515 --- /dev/null +++ b/shared/observability/tests/observability/metrics/test_histogram_buckets.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest +from opentelemetry.sdk.metrics.view import ( + ExplicitBucketHistogramAggregation, + ExponentialBucketHistogramAggregation, + View, +) + +from airflow_shared.observability.metrics.histogram_buckets import ( + COUNT_BUCKETS, + DEFAULT_PATTERN_BUCKETS, + DELAY_BUCKETS, + LATENCY_BUCKETS, + build_views_for_patterns, +) + + +class TestHistogramBuckets: + def test_default_patterns_cover_expected_families(self): + assert set(DEFAULT_PATTERN_BUCKETS) == {"*_count", "*_duration", "*_delay"} + + @pytest.mark.parametrize( + ("pattern", "expected"), + [ + ("*_duration", LATENCY_BUCKETS), + ("*_delay", DELAY_BUCKETS), + ("*_count", COUNT_BUCKETS), + ], + ) + def test_pattern_maps_to_expected_aggregation(self, pattern, expected): + assert DEFAULT_PATTERN_BUCKETS[pattern] is expected + + def test_latency_buckets_is_exponential(self): + assert isinstance(LATENCY_BUCKETS, ExponentialBucketHistogramAggregation) + + def test_count_and_delay_buckets_are_explicit(self): + assert isinstance(COUNT_BUCKETS, ExplicitBucketHistogramAggregation) + assert isinstance(DELAY_BUCKETS, ExplicitBucketHistogramAggregation) + + def test_build_views_returns_one_view_per_pattern(self): + views = build_views_for_patterns() + + assert len(views) == len(DEFAULT_PATTERN_BUCKETS) + assert all(isinstance(v, View) for v in views) + + names = {v._instrument_name for v in views} + assert names == set(DEFAULT_PATTERN_BUCKETS) + + def test_build_views_resolves_duration_pattern_to_latency_buckets(self): + views = build_views_for_patterns() + duration_view = next(v for v in views if v._instrument_name == "*_duration") + + assert duration_view._aggregation is LATENCY_BUCKETS + + def test_build_views_accepts_custom_pattern_mapping(self): + custom = {"*_custom": ExplicitBucketHistogramAggregation(boundaries=(1, 2, 3))} + + views = build_views_for_patterns(custom) + + assert len(views) == 1 + assert views[0]._instrument_name == "*_custom" + assert views[0]._aggregation is custom["*_custom"] diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py index 4bc889d4f1377..18a0b7748593b 100644 --- a/shared/observability/tests/observability/metrics/test_otel_logger.py +++ b/shared/observability/tests/observability/metrics/test_otel_logger.py @@ -427,10 +427,13 @@ def test_get_otel_logger_uses_exponential_histogram_view(self, mock_provider, mo call_kwargs = mock_provider.call_args.kwargs views = call_kwargs["views"] - assert len(views) == 1 - view = views[0] - assert isinstance(view, View) - assert isinstance(view._aggregation, ExponentialBucketHistogramAggregation) + # First view is the instrument-type baseline; remaining views come from + # the pattern-keyed bucket map in histogram_buckets. + assert all(isinstance(v, View) for v in views) + baseline = views[0] + assert isinstance(baseline._aggregation, ExponentialBucketHistogramAggregation) + pattern_names = {v._instrument_name for v in views[1:]} + assert pattern_names == {"*_count", "*_duration", "*_delay"} def test_atexit_flush_on_process_exit(self): """