Fix missing implicit downstream dependencies for mapping sources in MappedTaskGroups #59561
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Ensure that tasks within a
MappedTaskGroupwhich are implicitly dependent on mapping sources are added to the downstream depdencies of the mapping source, even when they are not explicitly defined as consumers of the mapping input.This change ensures implicit dependencies of the mapping source within a
MappedTaskGroupare consistently reflected at the task level.Rationale
Previously, only tasks within a
MappedTaskGroupthat explicitly consumed the mapping input were registered as downstream dependencies of the mapping source.For example, in the task group definition below, the mapped task created by calling the mapping function explicitly consumes the mapping function passed into the argument
y:task_group_task = process_something(x, y)As a result, this task was correctly registered as downstream of the mapping source.
However, this was not the case for other tasks within the same MappedTaskGroup (but not immediately downstream of the mapping source) that did not directly consume the mapping input:
task_group_end = EmptyOperator (task_id ='end')These relied solely on task-to-task relationships within the group:
'task_group_task >> task_group_end `
Even though the entire task group was downstream of the mapping source:
dag_level_start >> the_list >> mapped_group >> dag_level_endTasks such
task_group_enddid not inherit an implicit task-level dependency on the mapping source. Their dependency on the mapping input was only represented at the group level.This could cause implicit dependencies to be skipped during DAG evaluation under certain trigger rules (such as
NONE_FAILED_MIN_ONE_SUCCESS), particularly when multiple parallel task streams within theMappedTaskGroupconverge on a single downstream task outside the group such asdag_level_endAs a result, DAG runs could be incorrectly marked as failed or skipped despite valid upstream execution.
Please refer to issue #59289 for more context. This PR was opened in response to that. The author of the issue reported the bug in Airflow 3.0.6 but I can confirm that the same issue is present in Airflow 3.1.5 after reproducing the bug.
Tests
Added a test covering
MappedTaskGroupbehavior to ensure all tasks within the group are registered as downstream dependencies of the mapping source, including tasks that do not explicitly consume the mapping input.Closes: #59289