From 4e722b00164ba3fbf5b200c32221dda263ec7d65 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Dani=C3=ABl=20Heres?=
Date: Sun, 19 Apr 2026 15:56:32 +0200
Subject: [PATCH] perf(aggregate): emit Partial batches at batch_size *
 target_partitions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

PartialAgg's output is about to be hash-repartitioned into P output
partitions, which slices each input batch into sub-batches of ~size/P
rows. With the current emission size of `batch_size`, downstream
operators see sub-batches of `batch_size / P` — small enough that a
CoalesceBatchesExec is inserted to re-accumulate them back to
`batch_size`.

Scale the Partial/PartialReduce emission size to
`batch_size * target_partitions` so the post-repartition sub-batches
land at ~`batch_size` directly. CoalesceBatchesExec then has nothing
to do; RepartitionExec does one hash + take pass over a larger batch
instead of many passes over small ones.

Other modes (Final / FinalPartitioned / Single / SinglePartitioned)
keep `batch_size` since their output goes to the final consumer which
expects conventional batch sizing.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../physical-plan/src/aggregates/row_hash.rs  | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 056a7f171a516..110eef4464550 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -386,6 +386,14 @@ pub(crate) struct GroupedHashAggregateStream {
     /// max rows in output RecordBatches
     batch_size: usize,
 
+    /// Max rows per emitted output batch. Matches `batch_size` for most
+    /// modes, but for `Partial` / `PartialReduce` — whose output feeds
+    /// a hash repartition that will split every input batch into `P`
+    /// sub-batches — we scale this to `batch_size * P` so the
+    /// post-repartition sub-batches already land at ~`batch_size` and
+    /// a downstream `CoalesceBatchesExec` has nothing to do.
+    emit_batch_size: usize,
+
     /// Optional soft limit on the number of `group_values` in a batch
     /// If the number of `group_values` in a single batch exceeds this value,
     /// the `GroupedHashAggregateStream` operation immediately switches to
@@ -470,6 +478,11 @@ impl GroupedHashAggregateStream {
         let agg_filter_expr = Arc::clone(&agg.filter_expr);
 
         let batch_size = context.session_config().batch_size();
+        let emit_batch_size = match agg.mode {
+            AggregateMode::Partial | AggregateMode::PartialReduce => batch_size
+                .saturating_mul(context.session_config().target_partitions().max(1)),
+            _ => batch_size,
+        };
         let input = agg.input.execute(partition, Arc::clone(context))?;
         let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
         let group_by_metrics = GroupByMetrics::new(&agg.metrics, partition);
@@ -675,6 +688,7 @@ impl GroupedHashAggregateStream {
             baseline_metrics,
             group_by_metrics,
             batch_size,
+            emit_batch_size,
             group_ordering,
             input_done: false,
             spill_state,
@@ -842,7 +856,7 @@ impl Stream for GroupedHashAggregateStream {
                 ExecutionState::ProducingOutput(batch) => {
                     // slice off a part of the batch, if needed
                     let output_batch;
-                    let size = self.batch_size;
+                    let size = self.emit_batch_size;
                     (self.exec_state, output_batch) = if batch.num_rows() <= size {
                         (
                             if self.input_done {
@@ -860,8 +874,7 @@ impl Stream for GroupedHashAggregateStream {
                             batch.clone(),
                         )
                     } else {
-                        // output first batch_size rows
-                        let size = self.batch_size;
+                        // output first `emit_batch_size` rows
                         let num_remaining = batch.num_rows() - size;
                         let remaining = batch.slice(size, num_remaining);
                         let output = batch.slice(0, size);