Optimize TaskGroup.topological_sort for reverse-declared Dags #67688
shahar1 wants to merge 2 commits into
645a59d to
8506a3f
pending = next_pending
return order

def _sort_via_pass_numbering(
This new branch has no test pinning its emission order against the sweep. The reverse-chain cases in test_topological_sort_shape_correctness route through here, but _assert_valid_topological_order only checks the result is a valid topological sort, not that it matches what _sweep_projection emits. The order-sensitive tests (test_topological_sort1/2, test_topological_nested_groups) use forward-declared DAGs and route through the sweep, so nothing pins the "both branches produce identical order" invariant this PR rests on.
Worth a test that builds a reverse-declared DAG and asserts _sort_via_pass_numbering and _sweep_projection return identical orders. I checked equivalence empirically across ~150k random DAGs and it holds, so this is a coverage gap rather than a bug, but it's the property most likely to silently break in a future refactor.
Added a reverse-declared regression that directly asserts _sort_via_pass_numbering() emits the same order as _sweep_projection(), so that invariant is now pinned independently of the validity-only shape tests.
if any(d > i for d in deps):
    nodes_with_back_edge += 1

if nodes_with_back_edge * 2 > n:
nodes_with_back_edge * 2 > n is a ratio over all children, so a long reverse chain padded with independent children silently stays on the O(N²) sweep, the exact case this PR targets.
Concretely: a reverse chain r0 >> r1 >> ... >> r_{L-1} on its own (n=L) has L-1 back-edge nodes, so (L-1)*2 > L is true for any L >= 3 and it correctly takes the fast path. Add ~L isolated or forward-declared tasks and now n≈2L while the back-edge count is still L-1, so the back-edge ratio drops below 0.5, nodes_with_back_edge * 2 > n is false, and the sort falls back to the quadratic sweep. For example, a 2000-node DAG with a 1000-long reverse chain plus 1000 independent tasks stays quadratic.
Worth dispatching on something the dilution can't defeat (an absolute back-edge count, or reverse-chain depth) in addition to / instead of the ratio, with a one-line comment on what shape the cutoff discriminates. Mirror the change in airflow-core/src/airflow/serialization/definitions/taskgroup.py:244 and add a padded-reverse-chain dispatch regression.
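To make the dilution concrete, here is a minimal sketch of the ratio-only check applied to the two shapes described above. The helper name `ratio_only_fast_path` is hypothetical; only the expression `nodes_with_back_edge * 2 > n` comes from the diff.

```python
def ratio_only_fast_path(nodes_with_back_edge: int, n: int) -> bool:
    """Hypothetical helper wrapping the ratio-only dispatch condition from the diff."""
    return nodes_with_back_edge * 2 > n


L = 1000
# Bare reverse chain: n = L children, L - 1 of them have a back edge.
bare_chain = ratio_only_fast_path(L - 1, L)
# Same chain padded with L independent children: n = 2L, back-edge count unchanged.
padded_chain = ratio_only_fast_path(L - 1, 2 * L)

print(bare_chain, padded_chain)  # prints: True False
```

The padded case misses the cutoff by only two (1998 versus 2000), so any padding of roughly the chain length or more is enough to defeat a ratio-only rule.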
Added an absolute back-edge cutoff (>= 32) and padded reverse-chain regressions; the diluted review case now takes the fast path.
A long reverse-declared run could fall back to the sweep when independent children diluted the back-edge ratio. Add an absolute back-edge cutoff and pin the order/dispatch invariants in both TaskGroup implementations.
8506a3f to
a948df8
Further optimization of TaskGroup.topological_sort to handle reverse-declared Dags (where many children are declared before their dependencies) efficiently. This is the follow-up to PR #67288.
Summary
Addresses the O(N²) worst-case behavior of the greedy-sweep approach on adversarial Dag shapes such as reverse-insertion chains.
Uses a hybrid strategy:
Both approaches emit the same order: level-by-legacy-pass, ties broken by insertion order.
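As an illustration of why the two approaches can agree, the sketch below gives one plausible reading of "level-by-legacy-pass, ties broken by insertion order". It is not the PR's code: `sweep_order` and `pass_numbering_order` are hypothetical stand-ins for `_sweep_projection` and `_sort_via_pass_numbering`, and the input format (`deps[i]` lists the insertion indices node `i` depends on) is an assumption. The key observation is that a node waits one extra pass only when a dependency is declared after it.

```python
from collections import deque


def sweep_order(deps: list[list[int]]) -> list[int]:
    """Greedy sweep: rescan pending nodes in insertion order until all are emitted."""
    emitted = [False] * len(deps)
    order: list[int] = []
    pending = list(range(len(deps)))
    while pending:
        next_pending = []
        for i in pending:
            if all(emitted[d] for d in deps[i]):
                emitted[i] = True
                order.append(i)
            else:
                next_pending.append(i)
        if len(next_pending) == len(pending):
            raise ValueError("cycle detected")
        pending = next_pending
    return order


def pass_numbering_order(deps: list[list[int]]) -> list[int]:
    """Compute the sweep pass each node would land in, then sort by (pass, index)."""
    n = len(deps)
    dependents: list[list[int]] = [[] for _ in range(n)]
    remaining = [len(ds) for ds in deps]
    for i, ds in enumerate(deps):
        for d in ds:
            dependents[d].append(i)
    pass_no = [0] * n
    ready = deque(i for i in range(n) if remaining[i] == 0)
    resolved = 0
    while ready:  # Kahn-style traversal so each dependency is final before use
        d = ready.popleft()
        resolved += 1
        for i in dependents[d]:
            # A dependency declared later (d > i) is emitted after the sweep
            # has already visited i in that pass, so i waits one more pass.
            bump = 1 if d > i else 0
            pass_no[i] = max(pass_no[i], pass_no[d] + bump)
            remaining[i] -= 1
            if remaining[i] == 0:
                ready.append(i)
    if resolved != n:
        raise ValueError("cycle detected")
    return sorted(range(n), key=lambda i: (pass_no[i], i))


# Reverse-declared chain of 5 nodes: node i depends on node i + 1.
reverse_chain = [[1], [2], [3], [4], []]
print(sweep_order(reverse_chain))           # prints: [4, 3, 2, 1, 0]
print(pass_numbering_order(reverse_chain))  # prints: [4, 3, 2, 1, 0]
```

On the reverse chain the sweep needs one pass per node, which is where the quadratic cost comes from, while the pass-numbering variant does a single traversal plus a sort.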
Benchmark Results
Run the benchmark with:
uv run --project task-sdk python dev/bench_topological_sort_comparison.py
See the gist for the benchmark script.
Reverse-Chain Speedup (Worst Case)
Padded Reverse-Chain (Review Case)
Performance Progression
The dispatcher now switches when a group is clearly back-heavy either by ratio or by an absolute back-edge count, so padded reverse-declared Dags no longer fall back to the quadratic sweep.
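A minimal sketch of such a dispatch rule follows, assuming the two conditions are combined with a plain `or`. The function and constant names are hypothetical, and the `deps[i]` index-list input format is an assumption; the `>= 32` cutoff and the `d > i` back-edge test are taken from the conversation above.

```python
BACK_EDGE_ABSOLUTE_CUTOFF = 32  # value from the review reply; the name is hypothetical


def count_back_edge_nodes(deps: list[list[int]]) -> int:
    """Count nodes with at least one dependency declared after them."""
    return sum(1 for i, ds in enumerate(deps) if any(d > i for d in ds))


def use_pass_numbering(deps: list[list[int]]) -> bool:
    """Pick the fast path when the group is back-heavy by ratio or by absolute count."""
    n = len(deps)
    back = count_back_edge_nodes(deps)
    return back * 2 > n or back >= BACK_EDGE_ABSOLUTE_CUTOFF


# Padded reverse chain: 1000 chained nodes (node i depends on node i + 1)
# followed by 1000 independent nodes.
chain_len, padding = 1000, 1000
padded = [[i + 1] for i in range(chain_len - 1)] + [[]] + [[] for _ in range(padding)]

print(count_back_edge_nodes(padded), len(padded), use_pass_numbering(padded))
# prints: 999 2000 True
```

The absolute count is what catches this shape: the ratio test alone fails (1998 is not greater than 2000), but 999 back-edge nodes is far above the cutoff.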
Test Plan
main
_sort_via_pass_numbering matches _sweep_projection
TaskGroup and SerializedTaskGroup
uv run --project task-sdk pytest task-sdk/tests/task_sdk/definitions/test_taskgroup.py -k 'reverse_declared_order_matches_sweep or padded_reverse_chain_uses_pass_numbering or topological_sort_shape_correctness' -xvs
uv run --project airflow-core pytest airflow-core/tests/unit/utils/test_task_group.py -k 'serialized_padded_reverse_chain_uses_pass_numbering or topological_sort_serialized_layered' -xvs
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Haiku 4.5) following the guidelines