From ab1cc59cf7d5a09091f204b87578566978205f53 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 19 Jul 2023 11:24:47 -0400
Subject: [PATCH 1/3] Don't store hashes in GroupOrdering

---
 .../physical_plan/aggregates/order/full.rs    | 29 ++++--------
 .../src/physical_plan/aggregates/order/mod.rs |  9 ++----
 .../physical_plan/aggregates/order/partial.rs | 30 ++++---------
 .../src/physical_plan/aggregates/row_hash.rs  | 18 +++++------
 4 files changed, 22 insertions(+), 64 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/order/full.rs b/datafusion/core/src/physical_plan/aggregates/order/full.rs
index d95433a998f7..69b308da7c8c 100644
--- a/datafusion/core/src/physical_plan/aggregates/order/full.rs
+++ b/datafusion/core/src/physical_plan/aggregates/order/full.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_execution::memory_pool::proxy::VecAllocExt;
-
 use crate::physical_expr::EmitTo;
 
 /// Tracks grouping state when the data is ordered entirely by its
@@ -58,8 +56,6 @@ use crate::physical_expr::EmitTo;
 #[derive(Debug)]
 pub(crate) struct GroupOrderingFull {
     state: State,
-    /// Hash values for groups in 0..current
-    hashes: Vec<u64>,
 }
 
 #[derive(Debug)]
@@ -79,7 +75,6 @@ impl GroupOrderingFull {
     pub fn new() -> Self {
         Self {
             state: State::Start,
-            hashes: vec![],
         }
     }
 
@@ -101,19 +96,17 @@ impl GroupOrderingFull {
     }
 
     /// remove the first n groups from the internal state, shifting
-    /// all existing indexes down by `n`. Returns stored hash values
-    pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+    /// all existing indexes down by `n`
+    pub fn remove_groups(&mut self, n: usize) {
         match &mut self.state {
             State::Start => panic!("invalid state: start"),
             State::InProgress { current } => {
                 // shift down by n
                 assert!(*current >= n);
                 *current -= n;
-                self.hashes.drain(0..n);
             }
             State::Complete { .. } => panic!("invalid state: complete"),
-        };
-        &self.hashes
+        }
     }
 
     /// Note that the input is complete so any outstanding groups are done as well
@@ -123,20 +116,8 @@ impl GroupOrderingFull {
 
     /// Called when new groups are added in a batch. See documentation
     /// on [`super::GroupOrdering::new_groups`]
-    pub fn new_groups(
-        &mut self,
-        group_indices: &[usize],
-        batch_hashes: &[u64],
-        total_num_groups: usize,
-    ) {
+    pub fn new_groups(&mut self, total_num_groups: usize) {
         assert_ne!(total_num_groups, 0);
-        assert_eq!(group_indices.len(), batch_hashes.len());
-
-        // copy any hash values
-        self.hashes.resize(total_num_groups, 0);
-        for (&group_index, &hash) in group_indices.iter().zip(batch_hashes.iter()) {
-            self.hashes[group_index] = hash;
-        }
 
         // Update state
         let max_group_index = total_num_groups - 1;
@@ -158,6 +139,6 @@ impl GroupOrderingFull {
     }
 
     pub(crate) fn size(&self) -> usize {
-        std::mem::size_of::<Self>() + self.hashes.allocated_size()
+        std::mem::size_of::<Self>()
     }
 }
diff --git a/datafusion/core/src/physical_plan/aggregates/order/mod.rs b/datafusion/core/src/physical_plan/aggregates/order/mod.rs
index 4e1da3531952..81bf38aac3a6 100644
--- a/datafusion/core/src/physical_plan/aggregates/order/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/order/mod.rs
@@ -84,9 +84,9 @@ impl GroupOrdering {
 
     /// remove the first n groups from the internal state, shifting
     /// all existing indexes down by `n`. Returns stored hash values
-    pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+    pub fn remove_groups(&mut self, n: usize) {
         match self {
-            GroupOrdering::None => &[],
+            GroupOrdering::None => {}
             GroupOrdering::Partial(partial) => partial.remove_groups(n),
             GroupOrdering::Full(full) => full.remove_groups(n),
         }
@@ -106,7 +106,6 @@
         &mut self,
         batch_group_values: &[ArrayRef],
         group_indices: &[usize],
-        batch_hashes: &[u64],
         total_num_groups: usize,
     ) -> Result<()> {
         match self {
@@ -115,13 +114,11 @@ impl GroupOrdering {
                 partial.new_groups(
                     batch_group_values,
                     group_indices,
-                    batch_hashes,
                     total_num_groups,
                 )?;
             }
-
             GroupOrdering::Full(full) => {
-                full.new_groups(group_indices, batch_hashes, total_num_groups);
+                full.new_groups(total_num_groups);
             }
         };
         Ok(())
diff --git a/datafusion/core/src/physical_plan/aggregates/order/partial.rs b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
index be8cd5967113..ac32c69fd568 100644
--- a/datafusion/core/src/physical_plan/aggregates/order/partial.rs
+++ b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
@@ -71,9 +71,6 @@ pub(crate) struct GroupOrderingPartial {
     /// Converter for the sort key (used on the group columns
     /// specified in `order_indexes`)
     row_converter: RowConverter,
-
-    /// Hash values for groups in 0..completed
-    hashes: Vec<u64>,
 }
 
 #[derive(Debug, Default)]
@@ -127,7 +124,6 @@ impl GroupOrderingPartial {
             state: State::Start,
             order_indices: order_indices.to_vec(),
             row_converter: RowConverter::new(fields)?,
-            hashes: vec![],
         })
     }
 
@@ -167,8 +163,8 @@ impl GroupOrderingPartial {
     }
 
     /// remove the first n groups from the internal state, shifting
-    /// all existing indexes down by `n`. Returns stored hash values
-    pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+    /// all existing indexes down by `n`
+    pub fn remove_groups(&mut self, n: usize) {
         match &mut self.state {
             State::Taken => unreachable!("State previously taken"),
             State::Start => panic!("invalid state: start"),
@@ -182,12 +178,9 @@ impl GroupOrderingPartial {
                 *current -= n;
                 assert!(*current_sort >= n);
                 *current_sort -= n;
-                // Note sort_key stays the same, we are just translating group indexes
-                self.hashes.drain(0..n);
             }
             State::Complete { .. } => panic!("invalid state: complete"),
-        };
-        &self.hashes
+        }
     }
 
     /// Note that the input is complete so any outstanding groups are done as well
@@ -204,18 +197,15 @@ impl GroupOrderingPartial {
         &mut self,
         batch_group_values: &[ArrayRef],
         group_indices: &[usize],
-        batch_hashes: &[u64],
         total_num_groups: usize,
     ) -> Result<()> {
         assert!(total_num_groups > 0);
         assert!(!batch_group_values.is_empty());
-        assert_eq!(group_indices.len(), batch_hashes.len());
 
         let max_group_index = total_num_groups - 1;
 
         // compute the sort key values for each group
         let sort_keys = self.compute_sort_keys(batch_group_values)?;
-        assert_eq!(sort_keys.num_rows(), batch_hashes.len());
 
         let old_state = std::mem::take(&mut self.state);
         let (mut current_sort, mut sort_key) = match &old_state {
@@ -231,16 +221,9 @@ impl GroupOrderingPartial {
             }
         };
 
-        // copy any hash values, and find latest sort key
-        self.hashes.resize(total_num_groups, 0);
-        let iter = group_indices
-            .iter()
-            .zip(batch_hashes.iter())
-            .zip(sort_keys.iter());
-
-        for ((&group_index, &hash), group_sort_key) in iter {
-            self.hashes[group_index] = hash;
-
+        // Find latest sort key
+        let iter = group_indices.iter().zip(sort_keys.iter());
+        for (&group_index, group_sort_key) in iter {
             // Has this group seen a new sort_key?
             if sort_key != group_sort_key {
                 current_sort = group_index;
@@ -262,6 +245,5 @@ impl GroupOrderingPartial {
         std::mem::size_of::<Self>()
             + self.order_indices.allocated_size()
             + self.row_converter.size()
-            + self.hashes.allocated_size()
     }
 }
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index b48e8f38e9dc..b3d8ead5e1e1 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -485,7 +485,6 @@ impl GroupedHashAggregateStream {
                 self.group_ordering.new_groups(
                     group_values,
                     group_indices,
-                    batch_hashes,
                     total_num_groups,
                 )?;
             }
@@ -624,15 +623,14 @@ impl GroupedHashAggregateStream {
                 }
                 std::mem::swap(&mut new_group_values, &mut self.group_values);
 
-                // rebuild hash table (maybe we should remove the
-                // entries for each group that was emitted rather than
-                // rebuilding the whole thing
-
-                let hashes = self.group_ordering.remove_groups(n);
-                assert_eq!(hashes.len(), self.group_values.num_rows());
-                self.map.clear();
-                for (idx, &hash) in hashes.iter().enumerate() {
-                    self.map.insert(hash, (hash, idx), |(hash, _)| *hash);
+                self.group_ordering.remove_groups(n);
+                // SAFETY: self.map outlives iterator and is not modified concurrently
+                unsafe {
+                    for bucket in self.map.iter() {
+                        if bucket.as_ref().1 < n {
+                            self.map.erase(bucket);
+                        }
+                    }
                 }
             }
         };
From 0a6f23bf5bd456d47c36fecb0b232df5e05ce1dc Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 19 Jul 2023 14:08:17 -0400
Subject: [PATCH 2/3] Update group IDs

---
 datafusion/core/src/physical_plan/aggregates/row_hash.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index b3d8ead5e1e1..ab17e5a75eb1 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -627,8 +627,9 @@ impl GroupedHashAggregateStream {
                 // SAFETY: self.map outlives iterator and is not modified concurrently
                 unsafe {
                     for bucket in self.map.iter() {
-                        if bucket.as_ref().1 < n {
-                            self.map.erase(bucket);
+                        match bucket.as_ref().1.checked_sub(n) {
+                            None => self.map.erase(bucket),
+                            Some(sub) => bucket.as_mut().1 = sub,
                         }
                     }
                 }
From 99b3eebd27c64f35601d04259a3c18665e175f37 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 19 Jul 2023 15:08:45 -0400
Subject: [PATCH 3/3] Review feedback

---
 datafusion/core/src/physical_plan/aggregates/row_hash.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index ab17e5a75eb1..59ffbe5cf1a2 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -627,9 +627,12 @@ impl GroupedHashAggregateStream {
                 // SAFETY: self.map outlives iterator and is not modified concurrently
                 unsafe {
                     for bucket in self.map.iter() {
+                        // Decrement group index by n
                         match bucket.as_ref().1.checked_sub(n) {
-                            None => self.map.erase(bucket),
+                            // Group index was >= n, shift value down
                             Some(sub) => bucket.as_mut().1 = sub,
+                            // Group index was < n, so remove from table
+                            None => self.map.erase(bucket),
                         }
                     }
                 }
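
After patch 1, GroupOrderingFull carries no per-group data at all: its ordering state is a single index, so removing the first n emitted groups reduces to a bounds-checked subtraction. The following condensed, self-contained sketch of that state machine is distilled from full.rs above; the main driver and the literal group counts are invented for illustration.

// Condensed from full.rs after patch 1: the ordering state is one index,
// so removing the first `n` groups is a subtraction, not a Vec drain.
#[derive(Debug)]
#[allow(dead_code)]
enum State {
    Start,
    InProgress { current: usize },
    Complete,
}

#[derive(Debug)]
struct GroupOrderingFull {
    state: State,
}

impl GroupOrderingFull {
    fn new() -> Self {
        Self { state: State::Start }
    }

    /// Called when `total_num_groups` groups have been seen; the most
    /// recently created group is the current (still open) one
    fn new_groups(&mut self, total_num_groups: usize) {
        assert_ne!(total_num_groups, 0);
        self.state = State::InProgress {
            current: total_num_groups - 1,
        };
    }

    /// Remove the first n groups, shifting all group indexes down by `n`
    fn remove_groups(&mut self, n: usize) {
        match &mut self.state {
            State::Start => panic!("invalid state: start"),
            State::InProgress { current } => {
                assert!(*current >= n);
                *current -= n;
            }
            State::Complete => panic!("invalid state: complete"),
        }
    }
}

fn main() {
    let mut ordering = GroupOrderingFull::new();
    ordering.new_groups(10); // groups 0..=9 seen; current = 9
    ordering.remove_groups(4); // first 4 groups emitted; current becomes 5
    println!("{ordering:?}"); // InProgress { current: 5 }
}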
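
The row_hash.rs side is where the stored hashes were actually consumed, and where the one subtle bug in the series lived: patch 1 erases the map entries of emitted groups but leaves each surviving (hash, group_index) entry pointing at its old index, and patch 2 fixes that by shifting every survivor down by n (patch 3 then reorders the match arms and comments them). Below is a standalone sketch of the final erase-and-shift pass; it assumes hashbrown 0.14 with the `raw` feature enabled (the crate DataFusion's RawTable comes from), and the four hash values are made up for illustration.

use hashbrown::raw::RawTable;

fn main() {
    // Table entries are (hash, group_index) pairs, as in row_hash.rs
    let mut map: RawTable<(u64, usize)> = RawTable::new();
    let hashes: [u64; 4] = [10, 20, 30, 40];
    for (group_index, &hash) in hashes.iter().enumerate() {
        map.insert(hash, (hash, group_index), |(h, _)| *h);
    }

    // Emit the first n groups: erase their entries, shift the rest down
    let n = 2;
    // SAFETY: the iterator does not outlive `map`, and `erase` is the
    // only structural modification performed while iterating
    unsafe {
        for bucket in map.iter() {
            match bucket.as_ref().1.checked_sub(n) {
                // Group index was >= n: keep the entry, shifted down by n
                Some(shifted) => bucket.as_mut().1 = shifted,
                // Group index was < n: the group was emitted, drop the entry
                None => map.erase(bucket),
            }
        }
    }

    assert_eq!(map.len(), 2);
    // The surviving groups (hashes 30 and 40) now map to indexes 0 and 1
    assert!(unsafe { map.iter().all(|bucket| bucket.as_ref().1 < 2) });
}

Rewriting buckets in place keeps the table's allocation and stored hashes intact, where the pre-patch code cleared the whole map and re-inserted every surviving group from the separate hash vector.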