Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Declarative OTel histogram bucket views keyed by metric-name pattern.

PR #64207 standardised timer histograms on
:class:`~opentelemetry.sdk.metrics.view.ExponentialBucketHistogramAggregation`
at the instrument-type level. This module extends that idea to non-timer
histograms (``*_count``, ``*_duration``, ``*_delay`` and similar families):
bucket shape is declared once, here, instead of being chosen per call site.

The mapping is intentionally narrow. Patterns are simple suffix globs and
each one maps to a single named aggregation. Deployments that need a
different bucket layout can pass an override dict to
:func:`build_views_for_patterns`.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from opentelemetry.sdk.metrics.view import (
ExplicitBucketHistogramAggregation,
ExponentialBucketHistogramAggregation,
View,
)

if TYPE_CHECKING:
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation

# Latency-shaped metrics span milliseconds to minutes; exponential buckets
# adapt without hand-tuned boundaries.
LATENCY_BUCKETS: Aggregation = ExponentialBucketHistogramAggregation()

# Small unbounded counts (queue depths, retry attempts, fan-out sizes).
COUNT_BUCKETS: Aggregation = ExplicitBucketHistogramAggregation(
boundaries=(1, 2, 5, 10, 25, 50, 100, 250, 500, 1000),
)

# Large-range delays (schedule lag, dependency wait); seconds to hours.
DELAY_BUCKETS: Aggregation = ExplicitBucketHistogramAggregation(
boundaries=(1, 5, 15, 60, 300, 900, 1800, 3600, 7200, 21600),
)

DEFAULT_PATTERN_BUCKETS: dict[str, Aggregation] = {
"*_duration": LATENCY_BUCKETS,
"*_delay": DELAY_BUCKETS,
"*_count": COUNT_BUCKETS,
}


def build_views_for_patterns(
pattern_buckets: dict[str, Aggregation] | None = None,
) -> list[View]:
"""
Return one OTel ``View`` per ``(instrument-name pattern, aggregation)`` entry.

:param pattern_buckets: Optional override of the default pattern map. When
``None``, :data:`DEFAULT_PATTERN_BUCKETS` is used. Each key is a
``View.instrument_name`` glob (e.g. ``"*_duration"``) and each value
is the aggregation to apply.
:returns: A list of :class:`~opentelemetry.sdk.metrics.view.View` objects
suitable for passing to ``MeterProvider(views=...)``.
"""
mapping = DEFAULT_PATTERN_BUCKETS if pattern_buckets is None else pattern_buckets
return [
View(instrument_name=pattern, aggregation=aggregation) for pattern, aggregation in mapping.items()
]
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

from ..common import get_otel_data_exporter
from ..otel_env_config import load_metrics_env_config
from .histogram_buckets import build_views_for_patterns
from .protocols import Timer
from .validators import (
OTEL_NAME_MAX_LENGTH,
Expand Down Expand Up @@ -439,6 +440,11 @@ def get_otel_logger(
so that bucket boundaries adapt automatically to the observed data range. This avoids
the need to hand-tune explicit bucket boundaries for metrics that span very different
scales (milliseconds to hours).

Pattern-keyed views from
:mod:`~airflow_shared.observability.metrics.histogram_buckets` are layered on top of
that baseline so non-timer histograms (``*_count``, ``*_duration``, ``*_delay``) get
bucket shapes appropriate to each family.
"""
otel_env_config = load_metrics_env_config()

Expand Down Expand Up @@ -489,16 +495,22 @@ def get_otel_logger(
except (ImportError, AttributeError):
pass

# Per-instrument-type baseline: every histogram defaults to exponential
# buckets. Pattern-keyed views from histogram_buckets layer on top to
# give specific metric-name families their own bucket shape.
histogram_views: list[View] = [
View(
instrument_type=metrics.Histogram,
aggregation=ExponentialBucketHistogramAggregation(),
),
*build_views_for_patterns(),
]

metrics.set_meter_provider(
MeterProvider(
resource=resource,
metric_readers=readers,
views=[
View(
instrument_type=metrics.Histogram,
aggregation=ExponentialBucketHistogramAggregation(),
)
],
views=histogram_views,
shutdown_on_exit=False,
),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest
from opentelemetry.sdk.metrics.view import (
ExplicitBucketHistogramAggregation,
ExponentialBucketHistogramAggregation,
View,
)

from airflow_shared.observability.metrics.histogram_buckets import (
COUNT_BUCKETS,
DEFAULT_PATTERN_BUCKETS,
DELAY_BUCKETS,
LATENCY_BUCKETS,
build_views_for_patterns,
)


class TestHistogramBuckets:
def test_default_patterns_cover_expected_families(self):
assert set(DEFAULT_PATTERN_BUCKETS) == {"*_count", "*_duration", "*_delay"}

@pytest.mark.parametrize(
("pattern", "expected"),
[
("*_duration", LATENCY_BUCKETS),
("*_delay", DELAY_BUCKETS),
("*_count", COUNT_BUCKETS),
],
)
def test_pattern_maps_to_expected_aggregation(self, pattern, expected):
assert DEFAULT_PATTERN_BUCKETS[pattern] is expected

def test_latency_buckets_is_exponential(self):
assert isinstance(LATENCY_BUCKETS, ExponentialBucketHistogramAggregation)

def test_count_and_delay_buckets_are_explicit(self):
assert isinstance(COUNT_BUCKETS, ExplicitBucketHistogramAggregation)
assert isinstance(DELAY_BUCKETS, ExplicitBucketHistogramAggregation)

def test_build_views_returns_one_view_per_pattern(self):
views = build_views_for_patterns()

assert len(views) == len(DEFAULT_PATTERN_BUCKETS)
assert all(isinstance(v, View) for v in views)

names = {v._instrument_name for v in views}
assert names == set(DEFAULT_PATTERN_BUCKETS)

def test_build_views_resolves_duration_pattern_to_latency_buckets(self):
views = build_views_for_patterns()
duration_view = next(v for v in views if v._instrument_name == "*_duration")

assert duration_view._aggregation is LATENCY_BUCKETS

def test_build_views_accepts_custom_pattern_mapping(self):
custom = {"*_custom": ExplicitBucketHistogramAggregation(boundaries=(1, 2, 3))}

views = build_views_for_patterns(custom)

assert len(views) == 1
assert views[0]._instrument_name == "*_custom"
assert views[0]._aggregation is custom["*_custom"]
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,13 @@ def test_get_otel_logger_uses_exponential_histogram_view(self, mock_provider, mo

call_kwargs = mock_provider.call_args.kwargs
views = call_kwargs["views"]
assert len(views) == 1
view = views[0]
assert isinstance(view, View)
assert isinstance(view._aggregation, ExponentialBucketHistogramAggregation)
# First view is the instrument-type baseline; remaining views come from
# the pattern-keyed bucket map in histogram_buckets.
assert all(isinstance(v, View) for v in views)
baseline = views[0]
assert isinstance(baseline._aggregation, ExponentialBucketHistogramAggregation)
pattern_names = {v._instrument_name for v in views[1:]}
assert pattern_names == {"*_count", "*_duration", "*_delay"}

def test_atexit_flush_on_process_exit(self):
"""
Expand Down
Loading