From 4e3e575d784ea1c32f2a6e12e2757cf4f5f38c94 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Fri, 29 May 2026 13:25:02 +0300 Subject: [PATCH] [v3-2-test] Fix per-index evaluation of ONE_FAILED in mapped task groups (#67684) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A task using a "fast triggered" trigger rule (ONE_FAILED, ONE_SUCCESS, ONE_DONE) inside a mapped task group was evaluated against every expanded instance of its upstream, instead of the upstream instance sharing its own map index. As a result a single failed (or succeeded) upstream instance wrongly triggered the rule for every expanded instance of the task — e.g. a mapped ONE_FAILED reporting task ran for all map indexes when only one upstream had failed. The broad "depend on every upstream instance" behavior is only needed for the not-yet-expanded summary task instance (map_index < 0), so a fast trigger rule does not prematurely skip the task before the mapped task group expands (the case fixed in #34023). Restrict that special case to the summary instance; expanded instances now use the normal per-map-index upstream resolution. (cherry picked from commit 862b647facf4d6df67c63218257fd0dcbe11c315) Co-authored-by: Shahar Epstein <60007259+shahar1@users.noreply.github.com> closes: #50210 --- airflow-core/newsfragments/67684.bugfix.rst | 1 + .../airflow/ti_deps/deps/trigger_rule_dep.py | 9 +- .../tests/unit/models/test_mappedoperator.py | 108 ++++++++++++++++++ 3 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 airflow-core/newsfragments/67684.bugfix.rst diff --git a/airflow-core/newsfragments/67684.bugfix.rst b/airflow-core/newsfragments/67684.bugfix.rst new file mode 100644 index 0000000000000..9cf4ce96c85f3 --- /dev/null +++ b/airflow-core/newsfragments/67684.bugfix.rst @@ -0,0 +1 @@ +Fix ``ONE_FAILED``/``ONE_SUCCESS``/``ONE_DONE`` trigger rules inside a mapped task group being evaluated against every upstream instance instead of the upstream instance sharing the task's own map index, which wrongly triggered the rule for every expanded instance when only one upstream had failed (or succeeded). diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py index 4943913d3283a..eab9792ba32f2 100644 --- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py @@ -175,7 +175,14 @@ def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None: assert task.dag assert task.task_group - if is_mapped(task.task_group): + # Only the not-yet-expanded summary ti (map_index < 0) needs the broad + # "depend on every upstream ti" behavior, so a fast-triggered rule + # (ONE_SUCCESS / ONE_FAILED / ONE_DONE) does not skip it before the + # mapped task group has expanded (see #34023). Once the ti is expanded, + # each instance must depend on the upstream instance(s) that share its + # map index, otherwise a single upstream failure would wrongly trigger + # every expanded instance (see #50210). + if is_mapped(task.task_group) and ti.map_index < 0: is_fast_triggered = task.trigger_rule in (TR.ONE_SUCCESS, TR.ONE_FAILED, TR.ONE_DONE) if is_fast_triggered and upstream_id not in set( _iter_expansion_dependencies(task_group=task.task_group) diff --git a/airflow-core/tests/unit/models/test_mappedoperator.py b/airflow-core/tests/unit/models/test_mappedoperator.py index b5bbbd7f3a9df..e00a821e056d7 100644 --- a/airflow-core/tests/unit/models/test_mappedoperator.py +++ b/airflow-core/tests/unit/models/test_mappedoperator.py @@ -1601,6 +1601,114 @@ def t3(a): assert not ti3.state +def test_one_failed_trigger_rule_in_mapped_task_group_is_per_index(dag_maker): + """Regression test for #50210. + + A task with the ``ONE_FAILED`` trigger rule inside a mapped task group must + be evaluated against the upstream instance that shares its own map index, + not against every upstream instance of the group. Otherwise a single failed + upstream instance would wrongly trigger the rule for every expanded instance. + """ + with dag_maker(dag_id="test_one_failed_in_mapped_task_group") as dag: + + @task + def divide(i): + return 30 / i + + @task(trigger_rule=TriggerRule.ONE_FAILED) + def report_failure(i): + pass + + @task + def report_success(i): + pass + + @task + def gen_examples(): + return [0, 1, 2, 3] + + @task_group + def divide_and_report(i): + divide(i) >> [report_success(i), report_failure(i)] + + divide_and_report.expand(i=gen_examples()) + + dr = dag.test() + + states: dict[str, dict[int, str | None]] = defaultdict(dict) + for ti in dr.get_task_instances(): + states[ti.task_id][ti.map_index] = ti.state + + # divide(0) fails (ZeroDivisionError); the rest succeed. + assert states["divide_and_report.divide"] == {0: "failed", 1: "success", 2: "success", 3: "success"} + # Only report_failure sharing divide(0)'s map index should run; the rest are skipped. + assert states["divide_and_report.report_failure"] == { + 0: "success", + 1: "skipped", + 2: "skipped", + 3: "skipped", + } + # report_success mirrors the opposite: it is upstream_failed only where divide failed. + assert states["divide_and_report.report_success"] == { + 0: "upstream_failed", + 1: "success", + 2: "success", + 3: "success", + } + + +def test_one_failed_trigger_rule_runs_on_indirect_failure_in_mapped_task_group(dag_maker): + """Regression test for #34023. + + A ``ONE_FAILED`` task at the end of a chain inside a mapped task group must + still run for every expanded instance whose (indirect) upstream failed, and + must not be skipped prematurely before the group has expanded. This guards + the end-to-end outcome of the fix that the per-index change in #50210 builds on. + """ + with dag_maker(dag_id="test_one_failed_indirect_in_mapped_task_group") as dag: + + @task + def get_records(): + return ["a", "b", "c"] + + @task + def submit_job(record): + pass + + @task + def fake_sensor(record): + raise RuntimeError("boo") + + @task + def deliver_record(record): + pass + + @task(trigger_rule=TriggerRule.ONE_FAILED) + def handle_failed_delivery(record): + pass + + @task_group(group_id="deliver_records") + def deliver_record_task_group(record): + ( + submit_job(record) + >> fake_sensor(record) + >> deliver_record(record) + >> handle_failed_delivery(record) + ) + + deliver_record_task_group.expand(record=get_records()) + + dr = dag.test() + + states: dict[str, dict[int, str | None]] = defaultdict(dict) + for ti in dr.get_task_instances(): + states[ti.task_id][ti.map_index] = ti.state + + # fake_sensor fails for every index, so handle_failed_delivery must run everywhere. + assert states["deliver_records.fake_sensor"] == {0: "failed", 1: "failed", 2: "failed"} + assert states["deliver_records.handle_failed_delivery"] == {0: "success", 1: "success", 2: "success"} + + def test_mapped_operator_retry_delay_default(dag_maker): """ Test that MappedOperator.retry_delay returns default value when not explicitly set.