Describe the bug
For partial aggregation with GroupOrdering::None, a single huge batch is produced after all the input RecordBatches have been consumed. This batch is then sliced to produce output batches of batch_size rows.
In row_hash.rs, ExecutionState::ProducingOutput(batch) slices the large batch:
let remaining = batch.slice(size, num_remaining);
let output = batch.slice(0, size);
Unfortunately, get_array_memory_size for each of these small RecordBatches returns the physical memory of the initial huge batch, which causes unnecessary spills in downstream operators.
Operators such as RepartitionExec use batch.get_array_memory_size() to decide whether or not to spill.
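To illustrate the effect, here is a minimal std-only sketch. SlicedColumn and reported_size are hypothetical names and a toy stand-in, not the arrow-rs types or API; the sketch only assumes that slicing is zero-copy and that the size is measured on the shared backing buffer.

```rust
use std::sync::Arc;

/// Toy stand-in for a sliced array (hypothetical, not the arrow-rs type):
/// a shared backing buffer plus an offset/length window into it.
#[derive(Clone)]
struct SlicedColumn {
    buffer: Arc<Vec<i64>>,
    offset: usize,
    len: usize,
}

impl SlicedColumn {
    fn new(values: Vec<i64>) -> Self {
        let len = values.len();
        Self { buffer: Arc::new(values), offset: 0, len }
    }

    /// Zero-copy slice: shares the buffer and only moves the window.
    fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(offset + len <= self.len, "slice out of bounds");
        Self {
            buffer: Arc::clone(&self.buffer),
            offset: self.offset + offset,
            len,
        }
    }

    /// The values visible through this window.
    fn values(&self) -> &[i64] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// Models the accounting described above: the size of the whole
    /// backing buffer, regardless of how small the window is.
    fn reported_size(&self) -> usize {
        self.buffer.len() * std::mem::size_of::<i64>()
    }
}

fn main() {
    let batch = SlicedColumn::new((0..1_000_000).collect());
    let size = 8192; // stands in for batch_size
    let num_remaining = batch.len - size;

    // Same pattern as in ProducingOutput.
    let remaining = batch.slice(size, num_remaining);
    let output = batch.slice(0, size);

    println!("output rows:      {}", output.values().len());
    println!("output reported:  {} bytes", output.reported_size());
    println!("remaining rows:   {}", remaining.values().len());
    println!("buffer owners:    {}", Arc::strong_count(&batch.buffer));
}
```

In this model the 8192-row output reports the same size as the full 1,000,000-row batch, because all three handles point at one buffer.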
Related to #19481
To Reproduce
See the linked PR: #22527
Expected behavior
Option 1. Avoid producing the initial huge RecordBatch
Avoid producing a large RecordBatch and slicing it afterwards; instead, emit RecordBatches of only batch_size rows at a time. This would also fix #18907.
The issue with this approach is described in #19906: Vec::drain becomes a bottleneck because it shifts all the remaining elements in the vector (though I'm not sure this is more significant than the performance impact of spilling).
Merging #15591 would probably make this solution more feasible. Alternatively, VecDeque could be considered: #19906 (comment)
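A rough std-only sketch of the trade-off mentioned above. The helper names emit_front_vec and emit_front_deque are hypothetical and operate on plain u64 values rather than on the actual group state; the point is only that draining the front of a Vec has to move the surviving elements down, while a VecDeque can advance its head instead.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: take up to `n` values from the front of a Vec.
/// When the drain is dropped, the elements left in the Vec are shifted
/// down to index 0, so each call costs O(remaining elements).
fn emit_front_vec(values: &mut Vec<u64>, n: usize) -> Vec<u64> {
    let n = n.min(values.len());
    values.drain(..n).collect()
}

/// Hypothetical helper: the same operation on a VecDeque. Draining a
/// prefix only needs to move the head of the ring buffer forward.
fn emit_front_deque(values: &mut VecDeque<u64>, n: usize) -> Vec<u64> {
    let n = n.min(values.len());
    values.drain(..n).collect()
}

fn main() {
    let batch_size = 4;
    let mut v: Vec<u64> = (0..10).collect();
    let mut d: VecDeque<u64> = (0..10).collect();

    // Emit batch_size values at a time until the state is empty.
    while !v.is_empty() {
        let from_vec = emit_front_vec(&mut v, batch_size);
        let from_deque = emit_front_deque(&mut d, batch_size);
        assert_eq!(from_vec, from_deque);
        println!("emitted {} values, {} left", from_vec.len(), v.len());
    }
}
```

Both helpers emit the same values in the same order; whether the difference in shifting cost matters in practice would need to be measured against the cost of the extra spills.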
Option 2. Fix memory accounting
Alternatively, fix the memory accounting for these small RecordBatches so that they report only the memory occupied by their slice instead of the entire underlying array.
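A back-of-the-envelope sketch of why the accounting matters. This is a simplified model, not the actual RepartitionExec logic: it assumes a downstream operator adds up the size each batch reports and spills once the total exceeds a limit. should_spill, slice_row_counts, and all the numbers are made up for illustration.

```rust
/// Row count of each slice when `total_rows` rows are cut into
/// chunks of at most `batch_size` rows (hypothetical helper).
fn slice_row_counts(total_rows: usize, batch_size: usize) -> Vec<usize> {
    (0..total_rows)
        .step_by(batch_size)
        .map(|start| batch_size.min(total_rows - start))
        .collect()
}

/// Hypothetical spill check: sum the reported sizes and compare
/// the total against a memory limit.
fn should_spill(reported_sizes: &[usize], memory_limit: usize) -> bool {
    reported_sizes.iter().sum::<usize>() > memory_limit
}

fn main() {
    let bytes_per_row = 8; // assumed: one 8-byte column
    let total_rows = 1_000_000;
    let batch_size = 8192;
    let memory_limit = 16_000_000; // assumed limit, in bytes

    let rows = slice_row_counts(total_rows, batch_size);

    // Current behaviour in this model: every slice reports the whole batch.
    let physical: Vec<usize> = rows.iter().map(|_| total_rows * bytes_per_row).collect();
    // Option 2 in this model: every slice reports only its own rows.
    let per_slice: Vec<usize> = rows.iter().map(|r| r * bytes_per_row).collect();

    println!("slices: {}", rows.len());
    println!("physical accounting spills:  {}", should_spill(&physical, memory_limit));
    println!("per-slice accounting spills: {}", should_spill(&per_slice, memory_limit));
}
```

Under these assumptions the same 8 MB of data is counted once with per-slice accounting but once per slice with physical accounting, which is what pushes the downstream operator over its limit.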
Additional context
No response