diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index e407be5e390dc..9d7996d0961b3 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -380,7 +380,12 @@ where // Given offsets like [0, 2, 4, 5] and n = 1, we expect to get // offsets [0, 2, 3]. We first create two offsets for first_n as [0, 2] and the remaining as [2, 4, 5]. // And we shift the offset starting from 0 for the remaining one, [2, 4, 5] -> [0, 2, 3]. - let mut first_n_offsets = self.offsets.drain(0..n).collect::>(); + // + // `split_off` reclaims capacity on the retained `self.offsets` (sized to `len - n`) + // instead of leaving it at the pre-emit capacity; the emitted side owns the original + // allocation, which is freed when the output `ArrayRef` is dropped downstream. + let mut first_n_offsets = std::mem::take(&mut self.offsets); + self.offsets = first_n_offsets.split_off(n); let offset_n = *self.offsets.first().unwrap(); self.offsets .iter_mut() diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index 9267cf4f27f35..8645f786ee5b8 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -363,7 +363,11 @@ impl ByteViewGroupValueBuilder { // // - Shift the `buffer index` of remaining non-inlined `views` // - let first_n_views = self.views.drain(0..n).collect::>(); + // `split_off` reclaims capacity on the retained `self.views` (sized to `len - n`); + // `drain(..n).collect()` would leave it at the pre-emit capacity, which defeats + // the OOM-emit signal. + let mut first_n_views = std::mem::take(&mut self.views); + self.views = first_n_views.split_off(n); let last_non_inlined_view = first_n_views .iter() diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index bdc06fa553de5..4dd8efdb1a4e3 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -267,7 +267,11 @@ impl GroupColumn } fn take_n(&mut self, n: usize) -> ArrayRef { - let first_n = self.group_values.drain(0..n).collect::>(); + // `split_off` reclaims capacity on the retained `self.group_values` (sized to + // `len - n`); `drain(..n).collect()` would leave it at the pre-emit capacity, + // which defeats the OOM-emit signal. + let mut first_n = std::mem::take(&mut self.group_values); + self.group_values = first_n.split_off(n); let first_n_nulls = if NULLABLE { self.nulls.take_n(n) } else { None };