Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airflow-core/src/airflow/models/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
Expand Down Expand Up @@ -161,6 +162,19 @@ def get_metric_info(self, status: CallbackState, result: Any) -> dict:
# Remove the context (if exists) to keep the tags simple
tags["kwargs"] = {k: v for k, v in tags["kwargs"].items() if k != "context"}

# Metric backends (statsd, OpenTelemetry) require tag values to be primitives.
# OTel's aggregation key is built via ``frozenset(attributes.items())``, which
# raises ``TypeError: unhashable type: 'dict'`` if a value is a dict/list. The
# callback's ``result`` (passed in from a user callback) and ``kwargs`` are both
# frequently dicts, so coerce any non-primitive tag value to a JSON string before
# returning. Using ``default=str`` so values like ``datetime`` fall back cleanly.
tags = {
k: v
if isinstance(v, (str, int, float, bool)) or v is None
else json.dumps(v, default=str, sort_keys=True)
for k, v in tags.items()
}

prefix = self.data.get("prefix", "")
name = f"{prefix}.callback_{status}" if prefix else f"callback_{status}"

Expand Down
28 changes: 27 additions & 1 deletion airflow-core/tests/unit/models/test_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,36 @@ def test_get_metric_info(self):
assert metric_info["tags"] == {
"result": "0",
"path": TEST_ASYNC_CALLBACK.path,
"kwargs": {"email": "test@example.com"},
"kwargs": '{"email": "test@example.com"}',
"dag_id": TEST_DAG_ID,
}

def test_get_metric_info_dict_values_are_stringified(self):
"""
Regression for ``TypeError: unhashable type: 'dict'`` raised by OpenTelemetry's
``_view_instrument_match`` when callback metric tags contain dict/list values.

OTel builds its aggregation key as ``frozenset(attributes.items())``; any tag
value that isn't hashable (dict, list, set) crashes the triggerer when a
callback completes — e.g., deadline async callbacks whose ``result`` is a dict.
"""
callback = TriggererCallback(TEST_ASYNC_CALLBACK, prefix="deadline_alerts", dag_id=TEST_DAG_ID)
callback.data["kwargs"] = {"context": {"dag_id": TEST_DAG_ID}, "nested": {"a": 1}}

# ``result`` is a dict — exactly the case that surfaced in the deadline DAG.
metric_info = callback.get_metric_info(CallbackState.SUCCESS, {"output": [1, 2], "code": 0})

# Every tag value must be a primitive (str/int/float/bool/None) so OTel can hash it.
for k, v in metric_info["tags"].items():
assert isinstance(v, (str, int, float, bool)) or v is None, (
f"Tag {k!r}={v!r} is type {type(v).__name__}; must be primitive for OTel."
)
# ``frozenset(attributes.items())`` must not raise.
frozenset(metric_info["tags"].items())
# Stringified tag values must be sorted so equivalent kwargs in different
# insertion order collapse to one metric series (no needless cardinality split).
assert metric_info["tags"]["result"] == '{"code": 0, "output": [1, 2]}'


class TestTriggererCallback:
def test_polymorphic_serde(self, session):
Expand Down
Loading