Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/67684.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix ``ONE_FAILED``/``ONE_SUCCESS``/``ONE_DONE`` trigger rules inside a mapped task group being evaluated against every upstream instance instead of the upstream instance sharing the task's own map index, which wrongly triggered the rule for every expanded instance when only one upstream had failed (or succeeded).
9 changes: 8 additions & 1 deletion airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,14 @@ def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None:
assert task.dag
assert task.task_group

if is_mapped(task.task_group):
# Only the not-yet-expanded summary ti (map_index < 0) needs the broad
# "depend on every upstream ti" behavior, so a fast-triggered rule
# (ONE_SUCCESS / ONE_FAILED / ONE_DONE) does not skip it before the
# mapped task group has expanded (see #34023). Once the ti is expanded,
# each instance must depend on the upstream instance(s) that share its
# map index, otherwise a single upstream failure would wrongly trigger
# every expanded instance (see #50210).
if is_mapped(task.task_group) and ti.map_index < 0:
is_fast_triggered = task.trigger_rule in (TR.ONE_SUCCESS, TR.ONE_FAILED, TR.ONE_DONE)
if is_fast_triggered and upstream_id not in set(
_iter_expansion_dependencies(task_group=task.task_group)
Expand Down
108 changes: 108 additions & 0 deletions airflow-core/tests/unit/models/test_mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,114 @@ def t3(a):
assert not ti3.state


def test_one_failed_trigger_rule_in_mapped_task_group_is_per_index(dag_maker):
"""Regression test for #50210.

A task with the ``ONE_FAILED`` trigger rule inside a mapped task group must
be evaluated against the upstream instance that shares its own map index,
not against every upstream instance of the group. Otherwise a single failed
upstream instance would wrongly trigger the rule for every expanded instance.
"""
with dag_maker(dag_id="test_one_failed_in_mapped_task_group") as dag:

@task
def divide(i):
return 30 / i

@task(trigger_rule=TriggerRule.ONE_FAILED)
def report_failure(i):
pass

@task
def report_success(i):
pass

@task
def gen_examples():
return [0, 1, 2, 3]

@task_group
def divide_and_report(i):
divide(i) >> [report_success(i), report_failure(i)]

divide_and_report.expand(i=gen_examples())

dr = dag.test()

states: dict[str, dict[int, str | None]] = defaultdict(dict)
for ti in dr.get_task_instances():
states[ti.task_id][ti.map_index] = ti.state

# divide(0) fails (ZeroDivisionError); the rest succeed.
assert states["divide_and_report.divide"] == {0: "failed", 1: "success", 2: "success", 3: "success"}
# Only report_failure sharing divide(0)'s map index should run; the rest are skipped.
assert states["divide_and_report.report_failure"] == {
0: "success",
1: "skipped",
2: "skipped",
3: "skipped",
}
# report_success mirrors the opposite: it is upstream_failed only where divide failed.
assert states["divide_and_report.report_success"] == {
0: "upstream_failed",
1: "success",
2: "success",
3: "success",
}


def test_one_failed_trigger_rule_runs_on_indirect_failure_in_mapped_task_group(dag_maker):
"""Regression test for #34023.

A ``ONE_FAILED`` task at the end of a chain inside a mapped task group must
still run for every expanded instance whose (indirect) upstream failed, and
must not be skipped prematurely before the group has expanded. This guards
the end-to-end outcome of the fix that the per-index change in #50210 builds on.
"""
with dag_maker(dag_id="test_one_failed_indirect_in_mapped_task_group") as dag:

@task
def get_records():
return ["a", "b", "c"]

@task
def submit_job(record):
pass

@task
def fake_sensor(record):
raise RuntimeError("boo")

@task
def deliver_record(record):
pass

@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record):
pass

@task_group(group_id="deliver_records")
def deliver_record_task_group(record):
(
submit_job(record)
>> fake_sensor(record)
>> deliver_record(record)
>> handle_failed_delivery(record)
)

deliver_record_task_group.expand(record=get_records())

dr = dag.test()

states: dict[str, dict[int, str | None]] = defaultdict(dict)
for ti in dr.get_task_instances():
states[ti.task_id][ti.map_index] = ti.state

# fake_sensor fails for every index, so handle_failed_delivery must run everywhere.
assert states["deliver_records.fake_sensor"] == {0: "failed", 1: "failed", 2: "failed"}
assert states["deliver_records.handle_failed_delivery"] == {0: "success", 1: "success", 2: "success"}


def test_mapped_operator_retry_delay_default(dag_maker):
"""
Test that MappedOperator.retry_delay returns default value when not explicitly set.
Expand Down
Loading