perf(repartition): drive input + completion from a single task #21788
Dandandan wants to merge 1 commit into apache:main from repartition-single-task-per-input
Conversation
Previously each RepartitionExec input partition spawned two tokio tasks:
`pull_from_input` (the worker) and `wait_for_task` (a shepherd that only
awaited the worker's `JoinHandle` and forwarded EOS or the joined error
to the per-output senders). Observing ClickBench under tokio-console
showed a perfect 1:1 shepherd/worker split with shepherds at ~4 polls /
<30µs busy — pure overhead that doubled the task count per
RepartitionExec and added a cross-task waker round-trip per partition.
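The old shape can be illustrated with a std-thread analogy (this is a hedged sketch, not DataFusion's actual tokio code: `old_two_task_layout` is a hypothetical name, and std `thread`/`mpsc` stand in for tokio tasks and the async output channels):

```rust
use std::sync::mpsc;
use std::thread;

// Analogy for the old layout: the worker does the real pulling, while a
// separate "shepherd" does nothing but join the worker and forward the
// completion marker to the output channel.
fn old_two_task_layout(tx: mpsc::Sender<Option<String>>) {
    // worker: stands in for pull_from_input; here it just completes cleanly
    let worker = thread::spawn(|| -> Result<(), String> { Ok(()) });
    // shepherd: one join + one forward, then it exits -- pure overhead
    thread::spawn(move || {
        let marker = match worker.join() {
            Ok(Ok(())) => None,                          // EOS
            Ok(Err(e)) => Some(e),                       // worker error
            Err(_) => Some("task panicked".to_string()), // worker panic
        };
        let _ = tx.send(marker);
    });
}
```

The shepherd's entire lifetime is the handful of polls seen in the tokio-console capture: wait for the join, send one marker, exit.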
Fold the forwarder into the same task by wrapping `pull_from_input` in
`AssertUnwindSafe(..).catch_unwind()` and emitting `None` / error /
synthesized-panic markers to `senders` after the inner future resolves.
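The inline forwarding step can be sketched like this (a minimal stand-in: std `mpsc` in place of DataFusion's async per-output senders, and a plain `String` in place of `DataFusionError`; `forward_completion` is an illustrative name):

```rust
use std::sync::mpsc;

// After the worker body resolves, the same task walks the per-output
// senders and forwards the outcome: None as the end-of-stream marker on
// success, or a clone of the error to every output.
fn forward_completion(
    senders: &[mpsc::Sender<Option<String>>],
    outcome: &Result<(), String>,
) {
    for tx in senders {
        let marker = match outcome {
            Ok(()) => None,            // normal end-of-stream marker
            Err(e) => Some(e.clone()), // propagate the error to each output
        };
        // a dropped receiver just means that consumer finished early
        let _ = tx.send(marker);
    }
}
```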
Panic safety is preserved: a panic is surfaced as
`DataFusionError::Context("Join Error", Execution("task panicked: …"))`,
mirroring the previous `JoinError` wrapping so existing callers see an
equivalent error shape.
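The panic-to-error conversion follows the usual catch_unwind pattern. A synchronous sketch (the real code applies `catch_unwind` to the `pull_from_input` future; `ExecError` and `run_guarded` are hypothetical stand-ins for `DataFusionError::Execution` and `drive_input`):

```rust
use std::panic::{self, AssertUnwindSafe};

// Stand-in error type so the sketch is self-contained.
#[derive(Debug, PartialEq)]
struct ExecError(String);

// Run the worker under catch_unwind; on panic, synthesize an error that
// mirrors the shape the old JoinError-forwarding shepherd produced.
fn run_guarded<F>(worker: F) -> Result<(), ExecError>
where
    F: FnOnce() -> Result<(), ExecError>,
{
    match panic::catch_unwind(AssertUnwindSafe(worker)) {
        Ok(result) => result, // normal completion or ordinary error
        Err(payload) => {
            // pull a printable message out of the panic payload via
            // downcast_ref on the common &str / String payload types
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            Err(ExecError(format!("task panicked: {msg}")))
        }
    }
}
```

`AssertUnwindSafe` is needed because the worker closes over channel senders that are not `UnwindSafe`; it is sound here since the senders are dropped right after the error is forwarded.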
Net effect: tasks spawned per RepartitionExec drops from 2·N to N
(where N = input partitions).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🤖 Benchmark runs (GKE) compared repartition-single-task-per-input (d52306d) to 9a1ed57 (merge-base) using clickbench_partitioned, tpcds, and tpch. The clickbench_partitioned and tpcds runs completed; one requested run hit the 7200s job deadline before finishing.
Which issue does this PR close?
Rationale for this change
Each `RepartitionExec` input partition currently spawns two tokio tasks:

- `pull_from_input` — the worker that pulls batches from the input stream and routes them to per-output senders.
- `wait_for_task` — a shepherd that only awaits the worker's `JoinHandle` and forwards EOS (`None`) or the joined error to each output sender.

Observing a ClickBench run under tokio-console shows the shepherd is pure overhead: across ~300 captured `SpawnedTask` rows, it's an exact 1:1 split between workers (polls ≈ 300–600, busy ≈ hundreds of ms) and shepherds (polls ≈ 3–5, busy ≈ 4–30 µs). That doubles the number of tokio tasks created per `RepartitionExec` and adds a cross-task waker round-trip per partition for no useful work. Paired rows from the capture show the same total lifetime with radically different workloads.
What changes are included in this PR?

- Add `RepartitionExec::drive_input`, which wraps `pull_from_input` in `AssertUnwindSafe(..).catch_unwind()` and, after the inner future resolves, forwards `None` / the error / a synthesized panic error to each output sender from the same task.
- Remove `RepartitionExec::wait_for_task`.
- `RepartitionExecState::consume_input_streams` now spawns exactly one task per input partition.

Panic safety is preserved: a panic from `pull_from_input` is surfaced as `DataFusionError::Context("Join Error", Execution("task panicked: …"))`, mirroring the previous `JoinError`-wrapping path closely enough for existing callers.

Net effect: tokio tasks spawned per `RepartitionExec` drop from `2 · N` to `N` (where `N` = input partitions), with one fewer cross-task waker hop per partition.

Are these changes tested?
Yes — covered by existing repartition tests in `datafusion/physical-plan/src/repartition/mod.rs`:

- `cargo test -p datafusion-physical-plan --lib repartition::` → 41/41 pass, including:
  - `error_for_input_exec` (exercises panic propagation from input)
  - `repartition_with_error_in_stream` (exercises error propagation)
- `cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings` → clean.
- `cargo fmt --check` → clean.

Are there any user-facing changes?

No public API changes. The only behavior difference is the wording of panic errors surfaced through repartition: they now carry `"task panicked: {msg}"` (synthesized from the panic payload via `downcast_ref`) instead of the exact `JoinError` `Display` output, but remain wrapped in `DataFusionError::Context("Join Error", …)` as before.