perf(repartition): use SPSC channels + select_all in non-preserve-order mode#21678
Dandandan wants to merge 1 commit
…er mode

Previously the non-preserve-order path used a single MPSC channel per output partition shared by all N input senders. All input tasks contended on that channel's state mutex on every send. In addition, a single gate coordinated backpressure across all senders to that output.

Switch to the same partition_aware_channels layout used by the preserve-order path, with one SPSC channel per (input, output) pair, and merge the resulting N per-input streams on the consumer side with `futures::stream::select_all` (first-ready, unordered). Coalescing is lifted out of the per-input streams and done once on the merged output to preserve the existing batch-size contract.

Benefits:
- No contention on a shared per-output channel state mutex
- No shared gate across inputs (per-input backpressure instead)
- Consumer behavior (batch sizes, ordering semantics) preserved

Existing repartition tests (41) including spill/drop/ordering pass unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run benchmarks
🤖 Benchmark running (GKE): comparing repartition-buffer-based (38bec6c) to 3b5008a (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing repartition-buffer-based (38bec6c) to 3b5008a (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing repartition-buffer-based (38bec6c) to 3b5008a (merge-base) using tpch
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
Which issue does this PR close?
Rationale for this change
In the non-preserve-order path, `RepartitionExec` currently creates one MPSC channel per output partition, shared by all N input senders. On every send, each input task must contend on that channel's state mutex, and a single gate coordinates backpressure across all senders to the same output. This shared-channel state mutex is one of the hottest locks in parallel query execution and scales poorly with input parallelism.
What changes are included in this PR?
Use `partition_aware_channels` (which the preserve-order path already uses) for non-preserve-order too. Each (input, output) pair gets its own SPSC channel — no shared senders, no cross-input contention on sends. On the consumer side, merge the N per-input streams with `futures::stream::select_all` (unordered first-ready) instead of `StreamingMergeBuilder`. Coalescing is lifted out of the per-input streams and applied once on the merged output via a small `CoalescingOutputStream` wrapper so observable batch sizes are unchanged.
This is a proof-of-concept to see whether removing MPSC contention shows a measurable benefit on ClickBench before considering more invasive changes (e.g. replacing the channel transport entirely).
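The layout described above can be sketched with the standard library alone. This is a rough illustration under several assumptions, not the code in this PR: `std::sync::mpsc` channels (each with a single sender) stand in for DataFusion's own channels, a `try_recv` polling loop stands in for `futures::stream::select_all`, `Batch` is a plain `Vec<u32>` rather than an Arrow `RecordBatch`, and the names `per_pair_channels`, `merge_and_coalesce`, and `run_demo` are hypothetical. Backpressure, gates, and spilling are not modeled.

```rust
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread;

/// Stand-in for a record batch: just a vector of row values.
type Batch = Vec<u32>;

/// Build one channel per (input, output) pair (hypothetical helper).
/// Returns `senders[input][output]` and `receivers[output][input]`.
fn per_pair_channels(
    n_inputs: usize,
    n_outputs: usize,
) -> (Vec<Vec<Sender<Batch>>>, Vec<Vec<Receiver<Batch>>>) {
    let mut senders: Vec<Vec<Sender<Batch>>> = (0..n_inputs).map(|_| Vec::new()).collect();
    let mut receivers: Vec<Vec<Receiver<Batch>>> = (0..n_outputs).map(|_| Vec::new()).collect();
    for input in 0..n_inputs {
        for output in 0..n_outputs {
            // Each sender is never cloned, so every channel has exactly one producer.
            let (tx, rx) = channel();
            senders[input].push(tx);
            receivers[output].push(rx);
        }
    }
    (senders, receivers)
}

/// Merge one output's per-input receivers in first-ready, unordered fashion,
/// then coalesce once on the merged stream into batches of `target` rows.
/// The polling loop is only a stand-in for an async `select_all`.
fn merge_and_coalesce(mut inputs: Vec<Receiver<Batch>>, target: usize) -> Vec<Batch> {
    assert!(target > 0, "target batch size must be positive");
    let mut out = Vec::new();
    let mut buf: Batch = Vec::new();
    while !inputs.is_empty() {
        let mut progressed = false;
        // Poll every input once; drop inputs whose sender is gone and drained.
        inputs.retain(|rx| match rx.try_recv() {
            Ok(batch) => {
                progressed = true;
                buf.extend(batch);
                // Emit full batches as soon as enough rows are buffered.
                while buf.len() >= target {
                    let rest = buf.split_off(target);
                    out.push(std::mem::replace(&mut buf, rest));
                }
                true
            }
            Err(TryRecvError::Empty) => true,
            Err(TryRecvError::Disconnected) => false,
        });
        if !progressed {
            thread::yield_now();
        }
    }
    // Flush the final, possibly short, batch.
    if !buf.is_empty() {
        out.push(buf);
    }
    out
}

/// Three inputs, two outputs: each input sends ten single-row batches,
/// routed to an output by `value % n_outputs`.
fn run_demo() -> Vec<Vec<Batch>> {
    let (n_inputs, n_outputs, target) = (3, 2, 4);
    let (senders, receivers) = per_pair_channels(n_inputs, n_outputs);
    let producers: Vec<_> = senders
        .into_iter()
        .enumerate()
        .map(|(input, txs)| {
            thread::spawn(move || {
                for row in 0..10u32 {
                    let value = input as u32 * 100 + row;
                    txs[value as usize % txs.len()].send(vec![value]).unwrap();
                }
                // `txs` is dropped here, which closes this input's channels.
            })
        })
        .collect();
    let consumers: Vec<_> = receivers
        .into_iter()
        .map(|rxs| thread::spawn(move || merge_and_coalesce(rxs, target)))
        .collect();
    for p in producers {
        p.join().unwrap();
    }
    consumers.into_iter().map(|c| c.join().unwrap()).collect()
}
```

Calling `run_demo()` returns, for each output, the coalesced batches it received. Row order across inputs is not deterministic, which matches the unordered first-ready semantics, but batch sizes are: because coalescing happens after the merge, each output sees full `target`-sized batches followed by at most one short batch, regardless of how many inputs fed it.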
Are these changes tested?
Covered by the existing repartition test suite (41 tests pass), including spill, dropped-output-stream, delayed-stream, and ordering-preservation.
Are there any user-facing changes?
No — same memory semantics, same batch sizes, same ordering guarantees.
🤖 Generated with Claude Code