perf(aggregate): emit Partial batches at batch_size * target_partitions#21732
perf(aggregate): emit Partial batches at batch_size * target_partitions#21732Dandandan wants to merge 1 commit intoapache:mainfrom
Conversation
PartialAgg's output is about to be hash-repartitioned into P output partitions, which slices each input batch into sub-batches of ~size/P rows. With the current emission size of `batch_size`, downstream operators see sub-batches of `batch_size / P` — small enough that a CoalesceBatchesExec is inserted to re-accumulate them back to `batch_size`. Scale the Partial/PartialReduce emission size to `batch_size * target_partitions` so the post-repartition sub-batches land at ~`batch_size` directly. CoalesceBatchesExec then has nothing to do; RepartitionExec does one hash + take pass over a larger batch instead of many passes over small ones. Other modes (Final / FinalPartitioned / Single / SinglePartitioned) keep `batch_size` since their output goes to the final consumer which expects conventional batch sizing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing emit-at-batch-size-times-partitions (4e722b0) to 3b5008a (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing emit-at-batch-size-times-partitions (4e722b0) to 3b5008a (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing emit-at-batch-size-times-partitions (4e722b0) to 3b5008a (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Summary
Scale
AggregateExec's per-batch emission size forPartial/PartialReducemodes tobatch_size * target_partitionsinstead ofbatch_size.Motivation
A typical multi-phase aggregation plan looks like:
RepartitionExechash-partitions each input batch into P output sub-batches, so with PartialAgg emitting atbatch_size, each sub-batch lands at ~batch_size / P.CoalesceBatchesExecis inserted specifically to re-accumulate these tiny sub-batches back tobatch_sizebefore FinalAgg consumes them.This PR flips the approach: emit bigger upstream so the repartition naturally produces right-sized sub-batches on its own.
concat_batchescopies on the hot path.batch_sizeas expected.Other aggregate modes (
Final,FinalPartitioned,Single,SinglePartitioned) are unchanged since their output goes to the final consumer which expects conventional batch sizing.Tradeoffs
Test plan
cargo check -p datafusion-physical-plancargo clippy -p datafusion-physical-plan --all-targets -- -D warningscargo fmt --all -- --checkcargo test -p datafusion-physical-plan --lib aggregates::(86 passed)🤖 Generated with Claude Code