
perf: reclaim capacity in take_n during OOM-triggered partial-aggregation emit#22205

Open
RyanJamesStewart wants to merge 1 commit into apache:main from RyanJamesStewart:perf/take_n-reclaim-capacity

Conversation

@RyanJamesStewart

Which issue does this PR close?

Closes #22164.

Rationale for this change

When partial hash aggregation hits the memory limit and switches to early-emit mode (EmitTo::First(n)), three of the four GroupColumn::take_n implementations extract the first n elements with drain(0..n).collect::<Vec<_>>():

  • bytes.rs:383 (self.offsets)
  • primitive.rs:270 (self.group_values)
  • bytes_view.rs:366 (self.views)

As the issue reporter noted: drain does not affect the Vec's capacity, so self.offsets / self.group_values / self.views keeps the pre-emit allocation with reduced length. The whole point of the OOM-triggered early emit is to release memory; with the current code the builder ends up holding the same buffer it had a moment before the emit, which defeats the signal.

(The fourth implementation, boolean.rs::take_n, already does the right thing via swap-then-truncate over BooleanBufferBuilder; out of scope.)

What changes are included in this PR?

For each of the three Vec-based call sites, replace drain(0..n).collect() with std::mem::take + Vec::split_off(n):

```rust
// before
let first_n = self.vec.drain(0..n).collect::<Vec<_>>();
//   self.vec : length = len - n, capacity = pre-emit capacity (stranded)
//   first_n  : freshly allocated, sized to n

// after
let mut first_n = std::mem::take(&mut self.vec);
self.vec = first_n.split_off(n);
//   self.vec : new allocation sized to len - n (capacity reclaimed)
//   first_n  : original allocation, length n, retains pre-emit capacity
```

Vec::split_off(at) keeps [0, at) in self with the original capacity unchanged and returns a new Vec for [at, len) sized to its actual length. Combined with mem::take, the retained side becomes the freshly-sized buffer and the emitted side owns the original allocation. That is the correct assignment for OOM-emit because:

  1. The emitted Vec is consumed immediately by ScalarBuffer::from(Vec<T>), which is zero-copy via Vec::into_raw_parts; the original buffer travels into the output ArrayRef and is freed when downstream consumers drop it.
  2. The retained Vec is what the builder keeps accumulating into. A freshly-sized buffer is what the issue's memory-pressure signal is asking for.

Allocation accounting. Same allocation count as before. mem::take swaps with Vec::new() (no allocation); split_off allocates exactly one Vec for the tail; the original drain(..n).collect() allocated exactly one Vec for the head. The fix changes which side gets the new allocation, not how many allocations happen. The win is correct capacity assignment, not reduced allocation work.

Bonus on bytes.rs. The drain-then-collect is immediately followed by first_n_offsets.push(offset_n) to close the offset range. With collect(), first_n_offsets has capacity exactly n, so the push triggered a reallocation. With split_off, first_n_offsets carries the pre-emit capacity, and the push fits without reallocating.

Diff is 16 insertions, 3 deletions across the three files.

Are these changes tested?

Yes, by existing coverage:

  • test_byte_take_n (bytes.rs)
  • test_byte_view_take_n, test_byte_view_take_n_partial_completed_nonzero_index (bytes_view.rs)
  • test_emit_first_n_for_vectorized_group_values, test_hashtable_modifying_in_emit_first_n exercise the partial-aggregation emit path end-to-end through the GroupColumn trait, covering primitive.rs's implementation.

cargo test -p datafusion-physical-plan --lib aggregates::group_values passes (26 tests, 0 failed). cargo clippy -p datafusion-physical-plan --lib -- -D warnings is clean.

No new test added: the existing tests pin the take_n behavior (output array correctness and remaining-side state), and the fix is behaviorally identical at that level. The change is in which Vec each side ends up holding. The only observable difference is heap reservation on the retained side, which is what the issue reports; Vec::capacity() is implementation-defined and asserting on it from a test would be flaky under allocator changes, so I have not added one. Happy to add a capacity assertion if you'd prefer it; flag and I'll push.

Are there any user-facing changes?

No. No API changes, no behavior change at the operator level. Purely an internal memory-pressure improvement on the partial-aggregation early-emit path.


AI-assisted. The fix shape (mem::take + split_off direction) and the implementation are mine; I used Claude to help survey the three call sites and check that I had not missed a take_n impl elsewhere in group_values/. The substantive comprehension steps I went through:

  • Verified the issue reporter's mechanism description against each of the three sites; the drain-capacity gap is real and the sites share the same shape.
  • Traced the emitted Vec's lifetime through ScalarBuffer::from(Vec<T>) -> Buffer::from(Vec<T>) -> Vec::into_raw_parts, which is what justifies routing the original allocation to the emitted side.
  • Verified the allocation count is unchanged, so the PR does not make a perf claim that would not hold up under benchmarks run on a non-OOM workload. The improvement is qualitative (memory-pressure semantics), not throughput.
  • Confirmed boolean.rs::take_n is already capacity-correct (different primitive, BooleanBufferBuilder swap-then-truncate) so the PR is correctly scoped to the three Vec-based sites.

Named unknown: I have not measured the heap-reservation improvement under an actual OOM-bound workload (e.g. clickbench_partitioned with a memory limit). The fix is justified by the mechanism analytically; an empirical peak-RSS confirmation belongs in a follow-up if you want the perf-evidence side strengthened.

perf: reclaim capacity in take_n during OOM-triggered partial-aggregation emit

Per apache#22164, the three Vec-based GroupColumn::take_n implementations
(bytes.rs, primitive.rs, bytes_view.rs) use drain(0..n).collect() to extract
the first-n elements. drain does not free capacity, so self.offsets /
self.group_values / self.views keeps the pre-emit allocation, defeating the
memory-pressure signal that triggered the early emit in the first place.

Replace with mem::take + Vec::split_off(n) so the retained side becomes the
freshly-sized buffer (capacity reclaimed) and the emitted side owns the
original allocation, which is consumed and dropped immediately downstream
(ScalarBuffer::from(Vec) is zero-copy via Vec::into_raw_parts).

Allocation count is unchanged; the win is correct capacity assignment.
Bytes.rs gets a secondary win: the subsequent first_n_offsets.push(offset_n)
no longer triggers a realloc, since first_n_offsets now carries the
pre-emit capacity rather than the exact-n capacity from collect().

boolean.rs::take_n is already capacity-correct via swap-then-truncate over
BooleanBufferBuilder; out of scope.

Tests: cargo test -p datafusion-physical-plan --lib aggregates::group_values
passes (26 tests including test_byte_take_n, test_byte_view_take_n,
test_byte_view_take_n_partial_completed_nonzero_index,
test_emit_first_n_for_vectorized_group_values).
github-actions bot added the physical-plan label (Changes to the physical-plan crate) on May 15, 2026
Successfully merging this pull request may close: "Extra memory allocated during partial aggregation early emit during OOM handling" (#22164)
