diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 3451152686f2..9b2604d2ee6c 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -19,7 +19,10 @@
 use crate::{DataFusionError, Result, ScalarValue};
 use arrow::array::ArrayRef;
+use arrow::compute;
 use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
+use arrow_array::types::UInt32Type;
+use arrow_array::PrimitiveArray;
 use sqlparser::ast::Ident;
 use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::{Parser, ParserError};
@@ -260,6 +263,24 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
     Ok(idents)
 }
 
+/// Construct a new Vec<ArrayRef> from the rows of the `arrays` at the `indices`.
+pub fn get_arrayref_at_indices(
+    arrays: &[ArrayRef],
+    indices: &PrimitiveArray<UInt32Type>,
+) -> Result<Vec<ArrayRef>> {
+    arrays
+        .iter()
+        .map(|array| {
+            compute::take(
+                array.as_ref(),
+                indices,
+                None, // None: no index check
+            )
+            .map_err(DataFusionError::ArrowError)
+        })
+        .collect()
+}
+
 pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
     parse_identifiers(s)
         .unwrap_or_default()
@@ -579,4 +600,37 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_get_arrayref_at_indices() -> Result<()> {
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 9., 10.])),
+            Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 4.0, 5.0])),
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 10., 11.0])),
+            Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 5., 0.0])),
+        ];
+
+        let row_indices_vec: Vec<Vec<u32>> = vec![
+            // Get rows 0 and 1
+            vec![0, 1],
+            // Get rows 0 and 2
+            vec![0, 2],
+            // Get rows 1 and 3
+            vec![1, 3],
+            // Get rows 2 and 4
+            vec![2, 4],
+        ];
+        for row_indices in row_indices_vec {
+            let indices = PrimitiveArray::from_iter_values(row_indices.iter().cloned());
+            let chunk = get_arrayref_at_indices(&arrays, &indices)?;
+            for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
+                for (idx, orig_idx) in row_indices.iter().enumerate() {
+                    let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?;
+                    let res2 = ScalarValue::try_from_array(arr_chunk, idx)?;
+                    assert_eq!(res1, res2);
+                }
+            }
+        }
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 612b707cc19e..42ba9f8cb381 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -40,12 +40,11 @@ use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 
 use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
-use arrow::array::{new_null_array, PrimitiveArray};
-use arrow::array::{Array, UInt32Builder};
+use arrow::array::{new_null_array, Array, ArrayRef, PrimitiveArray, UInt32Builder};
 use arrow::compute::cast;
 use arrow::datatypes::{DataType, Schema, UInt32Type};
-use arrow::{array::ArrayRef, compute};
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion_common::utils::get_arrayref_at_indices;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
@@ -294,6 +293,192 @@ impl RecordBatchStream for GroupedHashAggregateStream {
 }
 
 impl GroupedHashAggregateStream {
+    // Update the row_aggr_state according to group_by values (result of group_by_expressions)
+    fn update_group_state(
+        &mut self,
+        group_values: &[ArrayRef],
+        allocated: &mut usize,
+    ) -> Result<Vec<usize>> {
+        let group_rows = self.row_converter.convert_columns(group_values)?;
+        let n_rows = group_rows.num_rows();
+        // 1.1 construct the key from the group values
+        // 1.2 construct the mapping key if it does not exist
+        // 1.3 add the row's index to `indices`
+
+        // track which entries in `aggr_state` have rows in this batch to aggregate
+        let mut groups_with_rows = vec![];
+
+        // 1.1 Calculate the group keys for the group
values + let mut batch_hashes = vec![0; n_rows]; + create_hashes(group_values, &self.random_state, &mut batch_hashes)?; + + let RowAggregationState { + map: row_map, + group_states: row_group_states, + .. + } = &mut self.row_aggr_state; + + for (row, hash) in batch_hashes.into_iter().enumerate() { + let entry = row_map.get_mut(hash, |(_hash, group_idx)| { + // verify that a group that we are inserting with hash is + // actually the same key value as the group in + // existing_idx (aka group_values @ row) + let group_state = &row_group_states[*group_idx]; + group_rows.row(row) == group_state.group_by_values.row() + }); + + match entry { + // Existing entry for this group value + Some((_hash, group_idx)) => { + let group_state = &mut row_group_states[*group_idx]; + + // 1.3 + if group_state.indices.is_empty() { + groups_with_rows.push(*group_idx); + }; + + group_state.indices.push_accounted(row as u32, allocated); // remember this row + } + // 1.2 Need to create new entry + None => { + let accumulator_set = + aggregates::create_accumulators(&self.normal_aggr_expr)?; + // Add new entry to group_states and save newly created index + let group_state = RowGroupState { + group_by_values: group_rows.row(row).owned(), + aggregation_buffer: vec![ + 0; + self.row_aggr_layout.fixed_part_width() + ], + accumulator_set, + indices: vec![row as u32], // 1.3 + }; + let group_idx = row_group_states.len(); + + // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by + // `group_states` (see allocation down below) + *allocated += (std::mem::size_of::() + * group_state.group_by_values.as_ref().len()) + + (std::mem::size_of::() + * group_state.aggregation_buffer.capacity()) + + (std::mem::size_of::() * group_state.indices.capacity()); + + // Allocation done by normal accumulators + *allocated += (std::mem::size_of::>() + * group_state.accumulator_set.capacity()) + + group_state + .accumulator_set + .iter() + .map(|accu| accu.size()) + .sum::(); + + // 
for hasher function, use precomputed hash value + row_map.insert_accounted( + (hash, group_idx), + |(hash, _group_index)| *hash, + allocated, + ); + + row_group_states.push_accounted(group_state, allocated); + + groups_with_rows.push(group_idx); + } + }; + } + Ok(groups_with_rows) + } + + // Update the accumulator results, according to row_aggr_state. + fn update_accumulators( + &mut self, + groups_with_rows: &[usize], + offsets: &[usize], + row_values: &[Vec], + normal_values: &[Vec], + allocated: &mut usize, + ) -> Result<()> { + // 2.1 for each key in this batch + // 2.2 for each aggregation + // 2.3 `slice` from each of its arrays the keys' values + // 2.4 update / merge the accumulator with the values + // 2.5 clear indices + groups_with_rows + .iter() + .zip(offsets.windows(2)) + .try_for_each(|(group_idx, offsets)| { + let group_state = &mut self.row_aggr_state.group_states[*group_idx]; + // 2.2 + self.row_accumulators + .iter_mut() + .zip(row_values.iter()) + .map(|(accumulator, aggr_array)| { + ( + accumulator, + aggr_array + .iter() + .map(|array| { + // 2.3 + array.slice(offsets[0], offsets[1] - offsets[0]) + }) + .collect::>(), + ) + }) + .try_for_each(|(accumulator, values)| { + let mut state_accessor = + RowAccessor::new_from_layout(self.row_aggr_layout.clone()); + state_accessor + .point_to(0, group_state.aggregation_buffer.as_mut_slice()); + match self.mode { + AggregateMode::Partial => { + accumulator.update_batch(&values, &mut state_accessor) + } + AggregateMode::FinalPartitioned | AggregateMode::Final => { + // note: the aggregation here is over states, not values, thus the merge + accumulator.merge_batch(&values, &mut state_accessor) + } + } + }) + // 2.5 + .and(Ok(()))?; + // normal accumulators + group_state + .accumulator_set + .iter_mut() + .zip(normal_values.iter()) + .map(|(accumulator, aggr_array)| { + ( + accumulator, + aggr_array + .iter() + .map(|array| { + // 2.3 + array.slice(offsets[0], offsets[1] - offsets[0]) + }) + .collect::>(), 
+ ) + }) + .try_for_each(|(accumulator, values)| { + let size_pre = accumulator.size(); + let res = match self.mode { + AggregateMode::Partial => accumulator.update_batch(&values), + AggregateMode::FinalPartitioned | AggregateMode::Final => { + // note: the aggregation here is over states, not values, thus the merge + accumulator.merge_batch(&values) + } + }; + let size_post = accumulator.size(); + *allocated += size_post.saturating_sub(size_pre); + res + }) + // 2.5 + .and({ + group_state.indices.clear(); + Ok(()) + }) + })?; + Ok(()) + } + /// Perform group-by aggregation for the given [`RecordBatch`]. /// /// If successful, this returns the additional number of bytes that were allocated during this process. @@ -303,11 +488,6 @@ impl GroupedHashAggregateStream { let group_by_values = evaluate_group_by(&self.group_by, &batch)?; // Keep track of memory allocated: let mut allocated = 0usize; - let RowAggregationState { - map: row_map, - group_states: row_group_states, - .. - } = &mut self.row_aggr_state; // Evaluate the aggregation expressions. 
// We could evaluate them after the `take`, but since we need to evaluate all @@ -319,192 +499,31 @@ impl GroupedHashAggregateStream { let row_converter_size_pre = self.row_converter.size(); for group_values in &group_by_values { - let group_rows = self.row_converter.convert_columns(group_values)?; - - // 1.1 construct the key from the group values - // 1.2 construct the mapping key if it does not exist - // 1.3 add the row' index to `indices` - - // track which entries in `aggr_state` have rows in this batch to aggregate - let mut groups_with_rows = vec![]; - - // 1.1 Calculate the group keys for the group values - let mut batch_hashes = vec![0; batch.num_rows()]; - create_hashes(group_values, &self.random_state, &mut batch_hashes)?; - - for (row, hash) in batch_hashes.into_iter().enumerate() { - let entry = row_map.get_mut(hash, |(_hash, group_idx)| { - // verify that a group that we are inserting with hash is - // actually the same key value as the group in - // existing_idx (aka group_values @ row) - let group_state = &row_group_states[*group_idx]; - group_rows.row(row) == group_state.group_by_values.row() - }); - - match entry { - // Existing entry for this group value - Some((_hash, group_idx)) => { - let group_state = &mut row_group_states[*group_idx]; - - // 1.3 - if group_state.indices.is_empty() { - groups_with_rows.push(*group_idx); - }; - - group_state - .indices - .push_accounted(row as u32, &mut allocated); // remember this row - } - // 1.2 Need to create new entry - None => { - let accumulator_set = - aggregates::create_accumulators(&self.normal_aggr_expr)?; - // Add new entry to group_states and save newly created index - let group_state = RowGroupState { - group_by_values: group_rows.row(row).owned(), - aggregation_buffer: vec![ - 0; - self.row_aggr_layout - .fixed_part_width() - ], - accumulator_set, - indices: vec![row as u32], // 1.3 - }; - let group_idx = row_group_states.len(); - - // NOTE: do NOT include the `RowGroupState` struct size in 
here because this is captured by - // `group_states` (see allocation down below) - allocated += (std::mem::size_of::() - * group_state.group_by_values.as_ref().len()) - + (std::mem::size_of::() - * group_state.aggregation_buffer.capacity()) - + (std::mem::size_of::() - * group_state.indices.capacity()); - - // Allocation done by normal accumulators - allocated += (std::mem::size_of::>() - * group_state.accumulator_set.capacity()) - + group_state - .accumulator_set - .iter() - .map(|accu| accu.size()) - .sum::(); - - // for hasher function, use precomputed hash value - row_map.insert_accounted( - (hash, group_idx), - |(hash, _group_index)| *hash, - &mut allocated, - ); - - row_group_states.push_accounted(group_state, &mut allocated); - - groups_with_rows.push(group_idx); - } - }; - } + let groups_with_rows = + self.update_group_state(group_values, &mut allocated)?; // Collect all indices + offsets based on keys in this vec let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0); let mut offsets = vec![0]; let mut offset_so_far = 0; for &group_idx in groups_with_rows.iter() { - let indices = &row_group_states[group_idx].indices; + let indices = &self.row_aggr_state.group_states[group_idx].indices; batch_indices.append_slice(indices); offset_so_far += indices.len(); offsets.push(offset_so_far); } let batch_indices = batch_indices.finish(); - let row_values = get_at_indices(&row_aggr_input_values, &batch_indices); - let normal_values = get_at_indices(&normal_aggr_input_values, &batch_indices); - - // 2.1 for each key in this batch - // 2.2 for each aggregation - // 2.3 `slice` from each of its arrays the keys' values - // 2.4 update / merge the accumulator with the values - // 2.5 clear indices - groups_with_rows - .iter() - .zip(offsets.windows(2)) - .try_for_each(|(group_idx, offsets)| { - let group_state = &mut row_group_states[*group_idx]; - // 2.2 - self.row_accumulators - .iter_mut() - .zip(row_values.iter()) - .map(|(accumulator, aggr_array)| { - 
( - accumulator, - aggr_array - .iter() - .map(|array| { - // 2.3 - array.slice(offsets[0], offsets[1] - offsets[0]) - }) - .collect::>(), - ) - }) - .try_for_each(|(accumulator, values)| { - let mut state_accessor = RowAccessor::new_from_layout( - self.row_aggr_layout.clone(), - ); - state_accessor.point_to( - 0, - group_state.aggregation_buffer.as_mut_slice(), - ); - match self.mode { - AggregateMode::Partial => { - accumulator.update_batch(&values, &mut state_accessor) - } - AggregateMode::FinalPartitioned - | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values, &mut state_accessor) - } - } - }) - // 2.5 - .and(Ok(()))?; - // normal accumulators - group_state - .accumulator_set - .iter_mut() - .zip(normal_values.iter()) - .map(|(accumulator, aggr_array)| { - ( - accumulator, - aggr_array - .iter() - .map(|array| { - // 2.3 - array.slice(offsets[0], offsets[1] - offsets[0]) - }) - .collect::>(), - ) - }) - .try_for_each(|(accumulator, values)| { - let size_pre = accumulator.size(); - let res = match self.mode { - AggregateMode::Partial => { - accumulator.update_batch(&values) - } - AggregateMode::FinalPartitioned - | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - }; - let size_post = accumulator.size(); - allocated += size_post.saturating_sub(size_pre); - res - }) - // 2.5 - .and({ - group_state.indices.clear(); - Ok(()) - }) - })?; + let row_values = get_at_indices(&row_aggr_input_values, &batch_indices)?; + let normal_values = + get_at_indices(&normal_aggr_input_values, &batch_indices)?; + self.update_accumulators( + &groups_with_rows, + &offsets, + &row_values, + &normal_values, + &mut allocated, + )?; } allocated += self .row_converter @@ -699,23 +718,11 @@ fn read_as_batch(rows: &[Vec], schema: &Schema, row_type: RowType) -> Vec>], + input_values: &[Vec], batch_indices: 
&PrimitiveArray<UInt32Type>,
-) -> Vec<Vec<ArrayRef>> {
+) -> Result<Vec<Vec<ArrayRef>>> {
     input_values
         .iter()
-        .map(|array| {
-            array
-                .iter()
-                .map(|array| {
-                    compute::take(
-                        array.as_ref(),
-                        batch_indices,
-                        None, // None: no index check
-                    )
-                    .unwrap()
-                })
-                .collect()
-        })
+        .map(|array| get_arrayref_at_indices(array, batch_indices))
         .collect()
 }