From 10398beb5244a1ef5cb0ee80adf40c269963e1a5 Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Tue, 21 Apr 2026 18:06:33 -0400 Subject: [PATCH] Fix topo sort for listed deps --- .../serialization/definitions/taskgroup.py | 12 ++++++++-- .../tests/unit/utils/test_task_group.py | 22 +++++++++++++++++++ .../src/airflow/sdk/definitions/taskgroup.py | 14 +++++++++--- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/serialization/definitions/taskgroup.py b/airflow-core/src/airflow/serialization/definitions/taskgroup.py index d971c303c7c53..9f5020cd57038 100644 --- a/airflow-core/src/airflow/serialization/definitions/taskgroup.py +++ b/airflow-core/src/airflow/serialization/definitions/taskgroup.py @@ -243,8 +243,16 @@ def topological_sort(self) -> list[DAGNode]: # We are already going to visit that TG break else: - del graph_unsorted[node.node_id] - graph_sorted.append(node) + # When list-based deps (e.g. `[b0, b1] >> a`) are used between TaskGroups, + # only upstream_group_ids is populated (not upstream_task_ids), so upstream_list + # is empty and the task-level check above won't block the node. Check group-level + # upstreams explicitly to handle this case. + for group_id in getattr(node, "upstream_group_ids", ()): + if group_id in graph_unsorted: + break + else: + del graph_unsorted[node.node_id] + graph_sorted.append(node) return graph_sorted def add(self, node: DAGNode) -> DAGNode: diff --git a/airflow-core/tests/unit/utils/test_task_group.py b/airflow-core/tests/unit/utils/test_task_group.py index 3b62ad75a7290..361043c68c8ba 100644 --- a/airflow-core/tests/unit/utils/test_task_group.py +++ b/airflow-core/tests/unit/utils/test_task_group.py @@ -1117,6 +1117,28 @@ def nested_topo(group): ] +def test_topological_group_dep_list_syntax(): + """List-based deps (`[b0, b1] >> a`) must produce the same topological order as individual deps.""" + logical_date = pendulum.parse("20200101") + with DAG("test_dag_list_dep", schedule=None, start_date=logical_date) as dag: + with TaskGroup("a") as tg_a: + EmptyOperator(task_id="task") + + groups = [] + for x in range(3): + with TaskGroup(f"b_{x}") as tg_b: + EmptyOperator(task_id="task") + groups.append(tg_b) + + groups >> tg_a # list-based dep — previously produced wrong order + + top_level = dag.task_group.topological_sort() + ids = [node.node_id for node in top_level] + a_idx = ids.index("a") + b_idxs = [ids.index(f"b_{x}") for x in range(3)] + assert all(b < a_idx for b in b_idxs), f"Expected all b_x before a in topological order, got: {ids}" + + def test_task_group_arrow_with_setup_group(): with DAG(dag_id="setup_group_teardown_group") as dag: with TaskGroup("group_1") as g1: diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py index c47b1f360aea6..a5b7615f3186b 100644 --- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py +++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py @@ -559,9 +559,17 @@ def topological_sort(self): # We are already going to visit that TG break else: - acyclic = True - del graph_unsorted[node.node_id] - graph_sorted.append(node) + # When list-based deps (e.g. `[b0, b1] >> a`) are used between TaskGroups, + # only upstream_group_ids is populated (not upstream_task_ids), so upstream_list + # is empty and the task-level check above won't block the node. Check group-level + # upstreams explicitly to handle this case. + for group_id in getattr(node, "upstream_group_ids", ()): + if group_id in graph_unsorted: + break + else: + acyclic = True + del graph_unsorted[node.node_id] + graph_sorted.append(node) if not acyclic: raise AirflowDagCycleException(f"A cyclic dependency occurred in dag: {self.dag_id}")