From 8a4112c6e060c9a9a124eff5feef7abad65b9cfc Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 12 May 2026 17:41:38 +0900 Subject: [PATCH 1/3] Fix non-deterministic serialization of non-jsonable objects in template fields --- .../src/airflow/serialization/helpers.py | 111 ++-- .../unit/dags/test_dag_decorator_version.py | 63 ++ .../unit/models/test_renderedtifields.py | 4 +- .../serialization/test_dag_serialization.py | 39 ++ .../tests/unit/serialization/test_helpers.py | 544 ++++++++++++++++++ .../airflow/sdk/execution_time/task_runner.py | 122 ++-- .../execution_time/test_task_runner.py | 13 +- 7 files changed, 770 insertions(+), 126 deletions(-) create mode 100644 airflow-core/tests/unit/dags/test_dag_decorator_version.py diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index e2c8069a1164a..89d5bb4167228 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -19,87 +19,84 @@ from __future__ import annotations import contextlib +import inspect from typing import TYPE_CHECKING, Any from airflow._shared.module_loading import qualname from airflow._shared.secrets_masker import redact from airflow._shared.template_rendering import truncate_rendered_value from airflow.configuration import conf -from airflow.settings import json if TYPE_CHECKING: from airflow.partition_mappers.base import PartitionMapper from airflow.timetables.base import Timetable as CoreTimetable -def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float: +def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float | bool: """ Return a serializable representation of the templated field. - If ``templated_field`` is provided via a callable then - return the following serialized value: ```` + The walk has two responsibilities: - If ``templated_field`` contains a class or instance that requires recursive - templating, store them as strings. Otherwise simply return the field as-is. + 1. **Make the template_field JSON-encodable** — every container is rebuilt + with primitive leaves (str/int/float/bool/None), tuples and sets are + flattened to lists, and unsupported objects fall through to ``str()`` + so ``json.dumps`` never raises on the result. + 2. **Keep the output deterministic across parses** — callables are replaced + with their qualified name (never the default ```` + repr), dicts are key-sorted, and (frozen)sets are sorted by element so + the same input always produces the same string. """ - def is_jsonable(x): - try: - json.dumps(x) - except (TypeError, OverflowError): - return False - else: - return True - - def translate_tuples_to_lists(obj: Any): - """Recursively convert tuples to lists.""" - if isinstance(obj, tuple): - return [translate_tuples_to_lists(item) for item in obj] - if isinstance(obj, list): - return [translate_tuples_to_lists(item) for item in obj] - if isinstance(obj, dict): - return {key: translate_tuples_to_lists(value) for key, value in obj.items()} - return obj + def normalize_dict_key(key) -> str: + """Normalize a dict key to a serialized string type.""" + # Serialized template_field keys must all be strings, not a mix of types, so that + # downstream json.dumps(..., sort_keys=True) does not raise on mixed-type keys. + return str(serialize_object(key)) + + def serialize_object(obj): + """Recursively rewrite ``obj`` into a JSON-encodable, hash-stable structure.""" + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj - def sort_dict_recursively(obj: Any) -> Any: - """Recursively sort dictionaries to ensure consistent ordering.""" if isinstance(obj, dict): - return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())} - if isinstance(obj, list): - return [sort_dict_recursively(item) for item in obj] - if isinstance(obj, tuple): - return tuple(sort_dict_recursively(item) for item in obj) - return obj + # Serialize keys/values first so each key is a string and the output is hash-stable, + # then sort by the serialized key to prevents hash inconsistencies when dict ordering varies. + serialized_pairs = [(normalize_dict_key(k), serialize_object(v)) for k, v in obj.items()] + return dict(sorted(serialized_pairs, key=lambda kv: kv[0])) + + if isinstance(obj, (list, tuple)): + return [serialize_object(item) for item in obj] + + if isinstance(obj, (set, frozenset)): + # JSON has no set type → flatten to a list with deterministic ordering + # so hash randomization on element types cannot shift cross-process iteration order. + return sorted( + (serialize_object(item) for item in obj), + key=lambda x: (type(x).__name__, str(x)), + ) + + # Use inspect.getattr_static to bypass any custom __getattr__ / metaclass magic + if callable(inspect.getattr_static(obj, "serialize", None)): + return serialize_object(obj.serialize()) + + if callable(obj): + # Use qualified name; default repr embeds memory addresses, which would change the DAG hash on every parse + return f"" + + # Non-primitive objects without a serialize attribute are converted to str + # So they don't break json.dumps downstream + return str(obj) max_length = conf.getint("core", "max_templated_field_length") - if not is_jsonable(template_field): - try: - serialized = template_field.serialize() - except AttributeError: - if callable(template_field): - full_qualified_name = qualname(template_field, True) - serialized = f"" - else: - serialized = str(template_field) - if len(serialized) > max_length: - rendered = redact(serialized, name) - return truncate_rendered_value(str(rendered), max_length) - return serialized - if not template_field and not isinstance(template_field, tuple): - # Avoid unnecessary serialization steps for empty fields unless they are tuples - # and need to be converted to lists - return template_field - template_field = translate_tuples_to_lists(template_field) - # Sort dictionaries recursively to ensure consistent string representation - # This prevents hash inconsistencies when dict ordering varies - if isinstance(template_field, dict): - template_field = sort_dict_recursively(template_field) - serialized = str(template_field) - if len(serialized) > max_length: - rendered = redact(serialized, name) + serialized = serialize_object(template_field) + + if len(str(serialized)) > max_length: + rendered = redact(str(serialized), name) return truncate_rendered_value(str(rendered), max_length) - return template_field + + return serialized class TimetableNotRegistered(ValueError): diff --git a/airflow-core/tests/unit/dags/test_dag_decorator_version.py b/airflow-core/tests/unit/dags/test_dag_decorator_version.py new file mode 100644 index 0000000000000..35fd0c98bb9f0 --- /dev/null +++ b/airflow-core/tests/unit/dags/test_dag_decorator_version.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow.sdk import dag, task, task_group + + +@dag( + dag_id="TEST_DTM", + dag_display_name="TEST DTM", + schedule=None, + default_args={"owner": "airflow", "email": ""}, + start_date=datetime(2024, 1, 25), +) +def dtm_test( + exponent: int = 2, +): + + @task + def get_data(): + return [20, 100, 200, 222, 242, 272] + + @task + def to_exp(number: int, exponent: int) -> float: + return number**exponent + + @task + def trunc(number: float, digits: int) -> float: + return round(number / 22, digits) + + @task + def save(number: list[float]): + for n in number: + print(f"Got number: {n}") + + @task_group # type: ignore[type-var] + def transform(number: int, exponent: int) -> float: + a = to_exp(number, exponent) + b = trunc(a, 2) + return b + + data = get_data() + result = transform.partial(exponent=exponent).expand(number=data) + save(result) # type: ignore[arg-type] + + +instance = dtm_test() diff --git a/airflow-core/tests/unit/models/test_renderedtifields.py b/airflow-core/tests/unit/models/test_renderedtifields.py index d42ed06b033fe..37e6088494d13 100644 --- a/airflow-core/tests/unit/models/test_renderedtifields.py +++ b/airflow-core/tests/unit/models/test_renderedtifields.py @@ -116,11 +116,11 @@ def teardown_method(self): pytest.param([], [], id="list"), pytest.param({}, {}, id="empty_dict"), pytest.param((), [], id="empty_tuple"), - pytest.param(set(), "set()", id="empty_set"), + pytest.param(set(), [], id="empty_set"), pytest.param("test-string", "test-string", id="string"), pytest.param({"foo": "bar"}, {"foo": "bar"}, id="dict"), pytest.param(("foo", "bar"), ["foo", "bar"], id="tuple"), - pytest.param({"foo"}, "{'foo'}", id="set"), + pytest.param({"foo"}, ["foo"], id="set"), (date(2018, 12, 6), "2018-12-06"), pytest.param(datetime(2018, 12, 6, 10, 55), "2018-12-06 10:55:00+00:00", id="datetime"), pytest.param( diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index fd333d5d79918..0a40f67314e92 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -82,6 +82,7 @@ from airflow.serialization.serialized_objects import ( BaseSerialization, DagSerialization, + LazyDeserializedDAG, OperatorSerialization, _XComRef, ) @@ -114,6 +115,7 @@ cron_timetable, delta_timetable, ) +from unit.models import TEST_DAGS_FOLDER if TYPE_CHECKING: from airflow.sdk.definitions.context import Context @@ -704,6 +706,43 @@ def test_deserialization_across_process(self): for dag_id in stringified_dags: self.validate_deserialized_dag(stringified_dags[dag_id], dags[dag_id]) + @pytest.mark.db_test + @conf_vars({("core", "load_examples"): "false"}) + def test_reserialize_should_make_equal_hash_with_dag_processor(self): + dagbag1 = DagBag(TEST_DAGS_FOLDER / "test_dag_decorator_version.py") + hash_result1 = LazyDeserializedDAG.from_dag(next(iter(dagbag1.dags.values()))).hash + + dagbag2 = DagBag(TEST_DAGS_FOLDER / "test_dag_decorator_version.py") + hash_result2 = LazyDeserializedDAG.from_dag(next(iter(dagbag2.dags.values()))).hash + + assert hash_result1 == hash_result2 + + @pytest.mark.db_test + @conf_vars({("core", "load_examples"): "false"}) + def test_hash_succeeds_for_dag_with_mixed_primitive_key_template_field(self): + """SerializedDagModel.hash() must not raise on a template field whose dict has mixed-type primitive keys. + + Building the Dag twice via ``create_dag()`` produces independent Dag and + operator instances, so the hashes must also be equal across calls — + otherwise the serialization path is leaking non-deterministic state + (memory addresses, dict ordering, etc.) into the hash. + """ + from airflow.providers.standard.operators.python import PythonOperator + + def create_dag(): + with DAG(dag_id="dag_mixed_keys", schedule=None, start_date=datetime(2024, 1, 1)) as dag: + PythonOperator( + task_id="op", + python_callable=empty_function, + op_kwargs={"data": {1: "a", "b": "c", None: "z", 2: "d"}, empty_function: "t"}, + ) + return dag + + first_hash = LazyDeserializedDAG.from_dag(create_dag()).hash + second_hash = LazyDeserializedDAG.from_dag(create_dag()).hash + + assert first_hash == second_hash + @skip_if_force_lowest_dependencies_marker @pytest.mark.db_test def test_roundtrip_provider_example_dags(self): diff --git a/airflow-core/tests/unit/serialization/test_helpers.py b/airflow-core/tests/unit/serialization/test_helpers.py index 94fb26525c761..3e88cdab117b3 100644 --- a/airflow-core/tests/unit/serialization/test_helpers.py +++ b/airflow-core/tests/unit/serialization/test_helpers.py @@ -16,6 +16,10 @@ # under the License. from __future__ import annotations +import json + +import pytest + from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION from airflow.serialization.definitions.notset import NOTSET from airflow.serialization.helpers import serialize_template_field @@ -33,6 +37,16 @@ def test_serialize_template_field_with_very_small_max_length(monkeypatch): assert "Truncated. You can change this behaviour" in result +def test_serialize_template_field_truncation_kicks_in(monkeypatch): + """Long serialized output must be truncated with the standard message.""" + monkeypatch.setenv("AIRFLOW__CORE__MAX_TEMPLATED_FIELD_LENGTH", "20") + + long_value = {"k": "x" * 500} + result = serialize_template_field(long_value, "field") + + assert "Truncated. You can change this behaviour" in result + + def test_serialize_template_field_with_notset(): """NOTSET must serialize deterministically via serialize(), not str() fallback.""" result = serialize_template_field(NOTSET, "logical_date") @@ -51,3 +65,533 @@ def test_argnotset_repr_and_str(): assert str(NOTSET) == "NOTSET" assert repr(SET_DURING_EXECUTION) == "DYNAMIC (set during execution)" assert str(SET_DURING_EXECUTION) == "DYNAMIC (set during execution)" + + +def test_serialize_template_field_with_dict_value_callable(): + + def fn_returns_callable(): + def get_arg(): + pass + + return get_arg + + template_name = "op_kwargs" + + def make_value(): + return {"values": [3, 1, 2], "sort_key": lambda x: x} + + result1 = serialize_template_field(make_value(), template_name) + result2 = serialize_template_field(make_value(), template_name) + + assert result1 == result2 + + def make_value_nested(): + return { + "values": [3, 1, 2], + "sort_key_nested": {"b": lambda x: fn_returns_callable(), "a": "test"}, + } + + result1_nested = serialize_template_field(make_value_nested(), template_name) + result2_nested = serialize_template_field(make_value_nested(), template_name) + + assert result1_nested == result2_nested + + +def test_serialize_template_field_with_mixed_key_dict_and_callable(): + """Mixed-key dicts containing callables must serialize deterministically without TypeError.""" + template_name = "op_kwargs" + + def make_value(): + return {1: "a", "b": lambda x: x, 2: "c"} + + result1 = serialize_template_field(make_value(), template_name) + result2 = serialize_template_field(make_value(), template_name) + + assert result1 == result2 + assert any(isinstance(v, str) and "` repr leaks (which would break DAG hashing).""" + + def my_fn(): + pass + + value = { + "a": my_fn, + "b": [my_fn, {"c": my_fn}], + my_fn: "as-key", + ("tup",): my_fn, + } + result = serialize_template_field(value, "op_kwargs") + assert "at 0x" not in str(result) + + +def test_serialize_template_field_output_is_jsonable(): + """Whatever shape we pass in, the result must be directly JSON-encodable.""" + + def my_fn(): + pass + + value = { + "callable_value": my_fn, + "nested": {"list": [1, (2, 3), my_fn], "deep": {("k",): my_fn}}, + frozenset([1, 2]): [my_fn], + my_fn: {"x": 1}, + } + result = serialize_template_field(value, "op_kwargs") + json.dumps(result) + + +def test_serialize_template_field_deeply_nested_determinism(): + """Determinism across new instances of the same nested structure (key ordering must not matter).""" + + def my_fn(): + pass + + def make_a(): + return { + "z": [3, 2, 1], + "a": {"nested": my_fn, "items": (1, 2)}, + 10: ("x", "y"), + } + + def make_b(): + # Same content, different insertion order + return { + 10: ("x", "y"), + "a": {"items": (1, 2), "nested": my_fn}, + "z": [3, 2, 1], + } + + assert serialize_template_field(make_a(), "f") == serialize_template_field(make_b(), "f") + + +def test_serialize_template_field_bool_not_collapsed_to_int(): + """bool must be preserved as bool (Python treats True == 1, but JSON distinguishes them).""" + result = serialize_template_field({"flag": True, "count": 1}, "op_kwargs") + assert result["flag"] is True + assert result["count"] == 1 + assert type(result["flag"]) is bool + + +def test_serialize_template_field_none_preserved(): + """None must round-trip as None, not the string 'None'.""" + result = serialize_template_field({"x": None, "y": [None, 1]}, "op_kwargs") + assert result == {"x": None, "y": [None, 1]} + + +def test_serialize_template_field_list_with_callables_and_objects(): + """Lists must recursively serialize callables and objects without leaking repr.""" + + def my_fn(): + pass + + class Custom: + def serialize(self): + return "custom-serialized" + + result = serialize_template_field([1, my_fn, Custom(), (2, my_fn)], "field") + assert result[0] == 1 + assert result[1].startswith("``, so they collapse to the same + # serialized key. The assertion below targets stability across calls, + # not key preservation between the two lambdas. + return {(lambda x: x): "a", (lambda y: y): "b"} + + r1 = serialize_template_field(make_value(), "op_kwargs") + r2 = serialize_template_field(make_value(), "op_kwargs") + + assert r1 == r2 + assert "at 0x" not in str(r1) + + +def test_serialize_template_field_dict_with_serializable_keys_sort_by_serialized_form(): + """Custom objects whose .serialize() returns a stable string must be sorted by that string, not by repr.""" + + class StableId: + def __init__(self, name): + self.name = name + + def serialize(self): + return self.name + + # Insert in reverse alphabetical order — sorting by serialized form must reverse it. + r1 = serialize_template_field({StableId("zeta"): 1, StableId("alpha"): 2}, "op_kwargs") + r2 = serialize_template_field({StableId("alpha"): 2, StableId("zeta"): 1}, "op_kwargs") + + assert r1 == r2 + assert list(r1.keys()) == ["alpha", "zeta"] + + +@pytest.mark.parametrize( + ("value", "expected_keys"), + [ + ({1: "a", 2: "b"}, {"1", "2"}), + ({True: "a", False: "b"}, {"True", "False"}), + ({None: "a"}, {"None"}), + ({1.5: "a", 2.5: "b"}, {"1.5", "2.5"}), + ({1: "a", "b": "c"}, {"1", "b"}), + ], + ids=["int_keys", "bool_keys", "none_key", "float_keys", "mixed_int_str"], +) +def test_serialize_template_field_primitive_keys_coerced_to_string(value, expected_keys): + """All dict keys must be coerced to str so json.dumps(sort_keys=True) downstream cannot raise.""" + result = serialize_template_field(value, "op_kwargs") + assert set(result.keys()) == expected_keys + assert all(isinstance(k, str) for k in result) + + +def test_serialize_template_field_mixed_primitive_keys_jsonable_sort_keys(): + """Output of mixed-type primitive keys must survive ``json.dumps(..., sort_keys=True)``.""" + value = {1: "a", "b": "c", 2: "d", 3: True, None: "z", False: "y"} + result = serialize_template_field(value, "op_kwargs") + + json.dumps(result, sort_keys=True) + + +def test_serialize_template_field_mixed_primitive_keys_deterministic_across_calls(): + """Same input parsed twice must yield identical output once keys are stringified.""" + + def fn_a(): + pass + + def fn_b(): + pass + + def make_value(): + return {1: "a", "b": "c", 2: "d", None: "z", "test": fn_b, fn_a: 3.5} + + assert serialize_template_field(make_value(), "op_kwargs") == serialize_template_field( + make_value(), "op_kwargs" + ) + + +def test_serialize_template_field_nested_mixed_primitive_keys_jsonable(): + """Nested mixed-type primitive keys (dict inside dict) must also be coerced and jsonable.""" + value = {"outer": {1: "a", "b": "c", None: "z"}} + result = serialize_template_field(value, "op_kwargs") + + assert all(isinstance(k, str) for k in result["outer"]) + json.dumps(result, sort_keys=True) + + +def test_serialize_template_field_deeply_nested_dict_keys_recursively_normalized(): + """Every nested dict must apply key normalization and sorting recursively. + + Mixed-type primitive keys and callable keys appear at multiple depths; the + helper must stringify and sort them at each level so the full output is + deterministic across calls and safe for ``json.dumps(sort_keys=True)``. + """ + + def fn_inner(): + pass + + def make_value(): + return { + "level1": { + 1: "a", + fn_inner: { + None: "deep", + "nested_str": "value", + 2.5: {fn_inner: "deepest"}, + }, + "b": {3: "three", 4: "four"}, + }, + } + + r1 = serialize_template_field(make_value(), "op_kwargs") + r2 = serialize_template_field(make_value(), "op_kwargs") + + assert r1 == r2 + assert all(isinstance(k, str) for k in r1["level1"]) + callable_key = next(k for k in r1["level1"] if "fn_inner" in k) + inner = r1["level1"][callable_key] + assert all(isinstance(k, str) for k in inner) + float_key = next(k for k in inner if k == "2.5") + assert all(isinstance(k, str) for k in inner[float_key]) + assert "at 0x" not in str(r1) + json.dumps(r1, sort_keys=True) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 7c318fc499ed6..969bb242cd668 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1003,83 +1003,81 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]: return ti, ti.get_template_context(), log -def _serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float: +def _serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float | bool: """ Return a serializable representation of the templated field. - If ``templated_field`` contains a class or instance that requires recursive - templating, store them as strings. Otherwise simply return the field as-is. + The walk has two responsibilities: + + 1. **Make the template_field JSON-encodable** — every container is rebuilt + with primitive leaves (str/int/float/bool/None), tuples are flattened to + lists, and unsupported objects fall through to ``str()`` so ``json.dumps`` + never raises on the result. + 2. **Keep the output deterministic across parses** — callables are replaced + with their qualified name (never the default ```` + repr) and dicts are key-sorted so the same input always produces the + same string. Used sdk secrets masker to redact secrets in the serialized output. """ - import json + import inspect + from airflow.sdk._shared.module_loading import qualname from airflow.sdk._shared.secrets_masker import redact - def is_jsonable(x): - try: - json.dumps(x) - except (TypeError, OverflowError): - return False - else: - return True - - def translate_tuples_to_lists(obj: Any): - """Recursively convert tuples to lists.""" - if isinstance(obj, tuple): - return [translate_tuples_to_lists(item) for item in obj] - if isinstance(obj, list): - return [translate_tuples_to_lists(item) for item in obj] - if isinstance(obj, dict): - return {key: translate_tuples_to_lists(value) for key, value in obj.items()} - return obj + def normalize_dict_key(key) -> str: + """Normalize a dict key to a serialized string type.""" + # Serialized template_field keys must all be strings, not a mix of types, so that + # downstream json.dumps(..., sort_keys=True) does not raise on mixed-type keys. + return str(serialize_object(key)) + + def serialize_object(obj): + """Recursively rewrite ``obj`` into a JSON-encodable, hash-stable structure.""" + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj - def sort_dict_recursively(obj: Any) -> Any: - """Recursively sort dictionaries to ensure consistent ordering.""" if isinstance(obj, dict): - return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())} - if isinstance(obj, list): - return [sort_dict_recursively(item) for item in obj] - if isinstance(obj, tuple): - return tuple(sort_dict_recursively(item) for item in obj) - return obj - - def _fallback_serialization(obj): - """Serialize objects with to_dict() method (eg: k8s objects) for json.dumps() default parameter.""" - if hasattr(obj, "to_dict"): - return obj.to_dict() - raise TypeError(f"cannot serialize {obj}") + # Serialize keys/values first so each key is a string and the output is hash-stable, + # then sort by the serialized key. + serialized_pairs = [(normalize_dict_key(k), serialize_object(v)) for k, v in obj.items()] + return dict(sorted(serialized_pairs, key=lambda kv: kv[0])) + + if isinstance(obj, (list, tuple)): + return [serialize_object(item) for item in obj] + + if isinstance(obj, (set, frozenset)): + # JSON has no set type → flatten to a list with deterministic ordering + # so hash randomization on element types cannot shift cross-process iteration order. + return sorted( + (serialize_object(item) for item in obj), + key=lambda x: (type(x).__name__, str(x)), + ) + + # Use inspect.getattr_static to bypass any custom __getattr__ / metaclass magic + if callable(inspect.getattr_static(obj, "serialize", None)): + return serialize_object(obj.serialize()) + + # Kubernetes client objects (V1Pod, V1Container, ...) expose their content via to_dict() + if callable(inspect.getattr_static(obj, "to_dict", None)): + return serialize_object(obj.to_dict()) + + if callable(obj): + # Use qualified name; default repr embeds memory addresses, which would change the DAG hash on every parse + return f"" + + # Non-primitive objects without a serialize attribute are converted to str + # So they don't break json.dumps downstream + return str(obj) max_length = conf.getint("core", "max_templated_field_length") - if not is_jsonable(template_field): - try: - serialized = template_field.serialize() - except AttributeError: - # check if these objects can be converted to JSON serializable types - try: - serialized = json.dumps(template_field, default=_fallback_serialization) - except (TypeError, ValueError): - # fall back to string representation if not - serialized = str(template_field) - if len(serialized) > max_length: - rendered = redact(serialized, name) - return truncate_rendered_value(str(rendered), max_length) - return serialized - if not template_field and not isinstance(template_field, tuple): - # Avoid unnecessary serialization steps for empty fields unless they are tuples - # and need to be converted to lists - return template_field - template_field = translate_tuples_to_lists(template_field) - # Sort dictionaries recursively to ensure consistent string representation - # This prevents hash inconsistencies when dict ordering varies - if isinstance(template_field, dict): - template_field = sort_dict_recursively(template_field) - serialized = str(template_field) - if len(serialized) > max_length: - rendered = redact(serialized, name) + serialized = serialize_object(template_field) + + if len(str(serialized)) > max_length: + rendered = redact(str(serialized), name) return truncate_rendered_value(str(rendered), max_length) - return template_field + + return serialized def _serialize_rendered_fields(task: AbstractOperator) -> dict[str, JsonValue]: diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 723ca42d93aa6..b851d73f74428 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -1117,7 +1117,7 @@ def test_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comm ), pytest.param( {"my_tup": (1, 2), "my_set": {1, 2, 3}}, - {"my_tup": [1, 2], "my_set": "{1, 2, 3}"}, + {"my_tup": [1, 2], "my_set": [1, 2, 3]}, id="tuples_and_sets", ), pytest.param( @@ -3042,10 +3042,13 @@ def execute(self, context): rendered_fields = mock_supervisor_comms.send.mock_calls[0].kwargs["msg"].rendered_fields assert rendered_fields is not None - assert ( - rendered_fields["env_vars"] - == '[{"name": "var1", "value": "This is a test phrase.", "value_from": null}, {"name": "var2", "value": "***", "value_from": null}, {"name": "var3", "value": "***", "value_from": null}]' - ) + # K8s V1EnvVar objects expose .to_dict(); the recursive walk normalizes the list of objects + # into a list of plain dicts so the result is directly JSON-encodable and redact can mask secrets in nested values. + assert rendered_fields["env_vars"] == [ + {"name": "var1", "value": "This is a test phrase.", "value_from": None}, + {"name": "var2", "value": "***", "value_from": None}, + {"name": "var3", "value": "***", "value_from": None}, + ] def test_nested_template_field_renderer_respects_redaction( self, create_runtime_ti, mock_supervisor_comms From 422bd739c1b993bd9fd6af278df0596fb4332496 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 12 May 2026 17:50:48 +0900 Subject: [PATCH 2/3] fix logic --- airflow-core/src/airflow/serialization/helpers.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index 89d5bb4167228..5e47074df0104 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -80,6 +80,10 @@ def serialize_object(obj): if callable(inspect.getattr_static(obj, "serialize", None)): return serialize_object(obj.serialize()) + # Kubernetes client objects (V1Pod, V1Container, ...) expose their content via to_dict() + if callable(inspect.getattr_static(obj, "to_dict", None)): + return serialize_object(obj.to_dict()) + if callable(obj): # Use qualified name; default repr embeds memory addresses, which would change the DAG hash on every parse return f"" From 99d0a6a20bcbac264663f5fe10919e0317c6821a Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 12 May 2026 17:56:33 +0900 Subject: [PATCH 3/3] fix logic --- airflow-core/src/airflow/serialization/helpers.py | 2 +- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py index 5e47074df0104..45efc0bdec4b7 100644 --- a/airflow-core/src/airflow/serialization/helpers.py +++ b/airflow-core/src/airflow/serialization/helpers.py @@ -32,7 +32,7 @@ from airflow.timetables.base import Timetable as CoreTimetable -def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float | bool: +def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float | bool | None: """ Return a serializable representation of the templated field. diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 969bb242cd668..7c3ec5641f4ac 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1003,7 +1003,9 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]: return ti, ti.get_template_context(), log -def _serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float | bool: +def _serialize_template_field( + template_field: Any, name: str +) -> str | dict | list | int | float | bool | None: """ Return a serializable representation of the templated field.