diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index c8d4df20fcca..9807b8cfb95c 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1082,7 +1082,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rand", - "smallvec", "sqlparser", "tempfile", "tokio", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 913d3c84beef..4e41b4f32e3f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -85,7 +85,6 @@ parquet = { workspace = true } percent-encoding = "2.2.0" pin-project-lite = "^0.2.7" rand = "0.8" -smallvec = { version = "1.6", features = ["union"] } sqlparser = { workspace = true } tempfile = "3" tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 0928998ec841..f7d257e324ea 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -36,7 +36,7 @@ use crate::physical_plan::{ expressions::Column, expressions::PhysicalSortExpr, hash_utils::create_hashes, - joins::hash_join_utils::JoinHashMap, + joins::hash_join_utils::{JoinHashMap, JoinHashMapType}, joins::utils::{ adjust_right_output_partitioning, build_join_schema, check_join_is_valid, combine_join_equivalence_properties, estimate_join_statistics, @@ -53,29 +53,17 @@ use super::{ PartitionMode, }; +use arrow::array::{ + Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, + UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, +}; use arrow::buffer::BooleanBuffer; use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use arrow::{ - array::{ - Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Date32Array, Date64Array, - Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, - PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - UInt16Array, UInt32Array, UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, - UInt8Array, - }, - datatypes::{ - ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema, - SchemaRef, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, - }, - util::bit_util, -}; +use arrow::util::bit_util; use arrow_array::cast::downcast_array; use arrow_schema::ArrowError; -use datafusion_common::cast::{as_dictionary_array, as_string_array}; use datafusion_common::{ exec_err, internal_err, plan_err, DataFusionError, JoinType, Result, }; @@ -600,6 +588,7 @@ async fn collect_left_input( offset, &random_state, &mut hashes_buffer, + 0, )?; offset += batch.num_rows(); } @@ -612,14 +601,18 @@ async fn collect_left_input( /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, /// assuming that the [RecordBatch] corresponds to the `index`th -pub fn update_hash( +pub fn update_hash( on: &[Column], batch: &RecordBatch, - hash_map: &mut JoinHashMap, + hash_map: &mut T, offset: usize, random_state: &RandomState, hashes_buffer: &mut Vec, -) -> Result<()> { + deleted_offset: usize, +) -> Result<()> +where + T: JoinHashMapType, +{ 
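For reference (not part of the patch itself): `update_hash` is now generic over `JoinHashMapType` and takes a `deleted_offset`, so the symmetric hash join can keep calling it after pruning rows from the front of its buffer. Both map implementations share the same chained-list layout: the map stores, per hash value, the 1-based index of the newest matching row, and `next[i]` points back to the previous row with the same hash (0 terminates the chain). The sketch below is illustrative only; it uses `std::collections::HashMap` in place of `hashbrown::RawTable` and precomputed hashes in place of `create_hashes`, and all names are made up.

```rust
use std::collections::HashMap;

/// Illustrative sketch of the chained-list layout shared by `JoinHashMap`
/// and `PruningJoinHashMap` (simplified: std HashMap instead of RawTable).
fn insert_rows(
    map: &mut HashMap<u64, u64>, // hash value -> 1-based index of the newest row with that hash
    next: &mut Vec<u64>,         // next[i] = previous row (1-based) with the same hash; 0 = end of chain
    hashes: &[u64],              // one precomputed hash per incoming row
    offset: usize,               // rows already buffered before this batch
    deleted_offset: usize,       // rows already pruned from the front of `next`
) {
    // Grow the chain storage for the new rows (the patch's `extend_zero`;
    // a no-op for `JoinHashMap`, which pre-allocates `next`).
    next.resize(next.len() + hashes.len(), 0);
    for (row, hash) in hashes.iter().enumerate() {
        let new_index = (row + offset + 1) as u64; // +1: 0 is reserved for "end of chain"
        if let Some(newest) = map.get_mut(hash) {
            // Chain the previous newest row behind the new one.
            next[row + offset - deleted_offset] = *newest;
            *newest = new_index;
        } else {
            map.insert(*hash, new_index);
        }
    }
}

fn main() {
    let (mut map, mut next) = (HashMap::new(), Vec::new());
    // Three rows; the first and third share hash value 10.
    insert_rows(&mut map, &mut next, &[10, 20, 10], 0, 0);
    assert_eq!(map[&10], 3);         // newest row with hash 10 is row 2, stored 1-based as 3
    assert_eq!(next, vec![0, 0, 1]); // row 2 chains back to row 0 (stored as 1)
}
```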
// evaluate the keys let keys_values = on .iter() @@ -629,20 +622,22 @@ pub fn update_hash( // calculate the hash values let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; + // For usual JoinHashmap, the implementation is void. + hash_map.extend_zero(batch.num_rows()); + // insert hashes to key of the hashmap + let (mut_map, mut_list) = hash_map.get_mut(); for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map - .map - .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); + let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, index)) = item { // Already exists: add index to next array let prev_index = *index; // Store new value inside hashmap *index = (row + offset + 1) as u64; // Update chained Vec at row + offset with previous value - hash_map.next[row + offset] = prev_index; + mut_list[row + offset - deleted_offset] = prev_index; } else { - hash_map.map.insert( + mut_map.insert( *hash_value, // store the value + 1 as 0 value reserved for end of list (*hash_value, (row + offset + 1) as u64), @@ -725,8 +720,8 @@ impl RecordBatchStream for HashJoinStream { // Build indices: 4, 5, 6, 6 // Probe indices: 3, 3, 4, 5 #[allow(clippy::too_many_arguments)] -pub fn build_equal_condition_join_indices( - build_hashmap: &JoinHashMap, +pub fn build_equal_condition_join_indices( + build_hashmap: &T, build_input_buffer: &RecordBatch, probe_batch: &RecordBatch, build_on: &[Column], @@ -736,6 +731,7 @@ pub fn build_equal_condition_join_indices( hashes_buffer: &mut Vec, filter: Option<&JoinFilter>, build_side: JoinSide, + deleted_offset: Option, ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = probe_on .iter() @@ -783,22 +779,33 @@ pub fn build_equal_condition_join_indices( // (5,1) // // With this approach, the lexicographic order on both the probe side and the build side is preserved. + let hash_map = build_hashmap.get_map(); + let next_chain = build_hashmap.get_list(); for (row, hash_value) in hash_values.iter().enumerate().rev() { // Get the hash and find it in the build index // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, index)) = build_hashmap - .map - .get(*hash_value, |(hash, _)| *hash_value == *hash) + if let Some((_, index)) = + hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) { let mut i = *index - 1; loop { - build_indices.append(i); + let build_row_value = if let Some(offset) = deleted_offset { + // This arguments means that we prune the next index way before here. + if i < offset as u64 { + // End of the list due to pruning + break; + } + i - offset as u64 + } else { + i + }; + build_indices.append(build_row_value); probe_indices.append(row as u32); // Follow the chain to get the next index value - let next = build_hashmap.next[i as usize]; + let next = next_chain[build_row_value as usize]; if next == 0 { // end of list break; @@ -837,338 +844,6 @@ pub fn build_equal_condition_join_indices( ) } -macro_rules! 
equal_rows_elem { - ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{ - let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap(); - let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap(); - - match (left_array.is_null($left), right_array.is_null($right)) { - (false, false) => left_array.value($left) == right_array.value($right), - (true, true) => $null_equals_null, - _ => false, - } - }}; -} - -macro_rules! equal_rows_elem_with_string_dict { - ($key_array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{ - let left_array: &DictionaryArray<$key_array_type> = - as_dictionary_array::<$key_array_type>($l).unwrap(); - let right_array: &DictionaryArray<$key_array_type> = - as_dictionary_array::<$key_array_type>($r).unwrap(); - - let (left_values, left_values_index) = { - let keys_col = left_array.keys(); - if keys_col.is_valid($left) { - let values_index = keys_col - .value($left) - .to_usize() - .expect("Can not convert index to usize in dictionary"); - - ( - as_string_array(left_array.values()).unwrap(), - Some(values_index), - ) - } else { - (as_string_array(left_array.values()).unwrap(), None) - } - }; - let (right_values, right_values_index) = { - let keys_col = right_array.keys(); - if keys_col.is_valid($right) { - let values_index = keys_col - .value($right) - .to_usize() - .expect("Can not convert index to usize in dictionary"); - - ( - as_string_array(right_array.values()).unwrap(), - Some(values_index), - ) - } else { - (as_string_array(right_array.values()).unwrap(), None) - } - }; - - match (left_values_index, right_values_index) { - (Some(left_values_index), Some(right_values_index)) => { - left_values.value(left_values_index) - == right_values.value(right_values_index) - } - (None, None) => $null_equals_null, - _ => false, - } - }}; -} - -/// Left and right row have equal values -/// If more data types are supported here, please also add the data types in can_hash function -/// to generate hash join logical plan. 
-pub fn equal_rows( - left: usize, - right: usize, - left_arrays: &[ArrayRef], - right_arrays: &[ArrayRef], - null_equals_null: bool, -) -> Result { - let mut err = None; - let res = left_arrays - .iter() - .zip(right_arrays) - .all(|(l, r)| match l.data_type() { - DataType::Null => { - // lhs and rhs are both `DataType::Null`, so the equal result - // is dependent on `null_equals_null` - null_equals_null - } - DataType::Boolean => { - equal_rows_elem!(BooleanArray, l, r, left, right, null_equals_null) - } - DataType::Int8 => { - equal_rows_elem!(Int8Array, l, r, left, right, null_equals_null) - } - DataType::Int16 => { - equal_rows_elem!(Int16Array, l, r, left, right, null_equals_null) - } - DataType::Int32 => { - equal_rows_elem!(Int32Array, l, r, left, right, null_equals_null) - } - DataType::Int64 => { - equal_rows_elem!(Int64Array, l, r, left, right, null_equals_null) - } - DataType::UInt8 => { - equal_rows_elem!(UInt8Array, l, r, left, right, null_equals_null) - } - DataType::UInt16 => { - equal_rows_elem!(UInt16Array, l, r, left, right, null_equals_null) - } - DataType::UInt32 => { - equal_rows_elem!(UInt32Array, l, r, left, right, null_equals_null) - } - DataType::UInt64 => { - equal_rows_elem!(UInt64Array, l, r, left, right, null_equals_null) - } - DataType::Float32 => { - equal_rows_elem!(Float32Array, l, r, left, right, null_equals_null) - } - DataType::Float64 => { - equal_rows_elem!(Float64Array, l, r, left, right, null_equals_null) - } - DataType::Date32 => { - equal_rows_elem!(Date32Array, l, r, left, right, null_equals_null) - } - DataType::Date64 => { - equal_rows_elem!(Date64Array, l, r, left, right, null_equals_null) - } - DataType::Time32(time_unit) => match time_unit { - TimeUnit::Second => { - equal_rows_elem!(Time32SecondArray, l, r, left, right, null_equals_null) - } - TimeUnit::Millisecond => { - equal_rows_elem!(Time32MillisecondArray, l, r, left, right, null_equals_null) - } - _ => { - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - } - DataType::Time64(time_unit) => match time_unit { - TimeUnit::Microsecond => { - equal_rows_elem!(Time64MicrosecondArray, l, r, left, right, null_equals_null) - } - TimeUnit::Nanosecond => { - equal_rows_elem!(Time64NanosecondArray, l, r, left, right, null_equals_null) - } - _ => { - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - } - DataType::Timestamp(time_unit, None) => match time_unit { - TimeUnit::Second => { - equal_rows_elem!( - TimestampSecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - TimeUnit::Millisecond => { - equal_rows_elem!( - TimestampMillisecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - TimeUnit::Microsecond => { - equal_rows_elem!( - TimestampMicrosecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - TimeUnit::Nanosecond => { - equal_rows_elem!( - TimestampNanosecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - }, - DataType::Utf8 => { - equal_rows_elem!(StringArray, l, r, left, right, null_equals_null) - } - DataType::LargeUtf8 => { - equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null) - } - DataType::FixedSizeBinary(_) => { - equal_rows_elem!(FixedSizeBinaryArray, l, r, left, right, null_equals_null) - } - DataType::Decimal128(_, lscale) => match r.data_type() { - DataType::Decimal128(_, rscale) => { - if lscale == rscale { - equal_rows_elem!( - Decimal128Array, - l, - r, - left, - right, - null_equals_null - ) - } else { - err = Some(internal_err!( - 
"Inconsistent Decimal data type in hasher, the scale should be same" - )); - false - } - } - _ => { - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - }, - DataType::Dictionary(key_type, value_type) - if *value_type.as_ref() == DataType::Utf8 => - { - match key_type.as_ref() { - DataType::Int8 => { - equal_rows_elem_with_string_dict!( - Int8Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::Int16 => { - equal_rows_elem_with_string_dict!( - Int16Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::Int32 => { - equal_rows_elem_with_string_dict!( - Int32Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::Int64 => { - equal_rows_elem_with_string_dict!( - Int64Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt8 => { - equal_rows_elem_with_string_dict!( - UInt8Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt16 => { - equal_rows_elem_with_string_dict!( - UInt16Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt32 => { - equal_rows_elem_with_string_dict!( - UInt32Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt64 => { - equal_rows_elem_with_string_dict!( - UInt64Type, - l, - r, - left, - right, - null_equals_null - ) - } - _ => { - // should not happen - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - } - } - other => { - // This is internal because we should have caught this before. - err = Some(internal_err!( - "Unsupported data type in hasher: {other}" - )); - false - } - }); - - err.unwrap_or(Ok(res)) -} - // version of eq_dyn supporting equality on null arrays fn eq_dyn_null( left: &dyn Array, @@ -1297,6 +972,7 @@ impl HashJoinStream { &mut hashes_buffer, self.filter.as_ref(), JoinSide::Left, + None, ); let result = match left_right_indices { @@ -2759,6 +2435,7 @@ mod tests { &mut vec![0; right.num_rows()], None, JoinSide::Left, + None, )?; let mut left_ids = UInt64Builder::with_capacity(0); diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index 37790e6bb8a6..b80413b53d89 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -18,30 +18,29 @@ //! This file contains common subroutines for regular and symmetric hash join //! related functionality, used both in join calculations and optimization rules. 
-use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; +use std::fmt::{Debug, Formatter}; +use std::ops::IndexMut; use std::sync::Arc; use std::{fmt, usize}; -use arrow::datatypes::{ArrowNativeType, SchemaRef}; +use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; +use crate::physical_plan::ExecutionPlan; use arrow::compute::concat_batches; +use arrow::datatypes::{ArrowNativeType, SchemaRef}; use arrow_array::builder::BooleanBufferBuilder; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch}; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound}; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; + use hashbrown::raw::RawTable; use hashbrown::HashSet; use parking_lot::Mutex; -use smallvec::SmallVec; -use std::fmt::{Debug, Formatter}; - -use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; -use crate::physical_plan::ExecutionPlan; -use datafusion_common::Result; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, @@ -94,20 +93,15 @@ use datafusion_common::Result; // --------------------- // | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) // --------------------- - // TODO: speed up collision checks // https://github.com/apache/arrow-datafusion/issues/50 pub struct JoinHashMap { - // Stores hash value to first index + // Stores hash value to last row index pub map: RawTable<(u64, u64)>, // Stores indices in chained list data structure pub next: Vec, } -/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate -/// and shrink the indices. -pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>); - impl JoinHashMap { pub(crate) fn with_capacity(capacity: usize) -> Self { JoinHashMap { @@ -117,37 +111,201 @@ impl JoinHashMap { } } -impl SymmetricJoinHashMap { +/// Trait defining methods that must be implemented by a hash map type to be used for joins. +pub trait JoinHashMapType { + /// The type of list used to store the hash values. + type NextType: IndexMut; + /// Extend with zero + fn extend_zero(&mut self, len: usize); + /// Returns mutable references to the hash map and the next. + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType); + /// Returns a reference to the hash map. + fn get_map(&self) -> &RawTable<(u64, u64)>; + /// Returns a reference to the next. + fn get_list(&self) -> &Self::NextType; +} + +/// Implementation of `JoinHashMapType` for `JoinHashMap`. +impl JoinHashMapType for JoinHashMap { + type NextType = Vec; + + // Void implementation + fn extend_zero(&mut self, _: usize) {} + + /// Get mutable references to the hash map and the next. + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { + (&mut self.map, &mut self.next) + } + + /// Get a reference to the hash map. + fn get_map(&self) -> &RawTable<(u64, u64)> { + &self.map + } + + /// Get a reference to the next. 
+ fn get_list(&self) -> &Self::NextType { + &self.next + } +} + +/// Implementation of `JoinHashMapType` for `PruningJoinHashMap`. +impl JoinHashMapType for PruningJoinHashMap { + type NextType = VecDeque; + + // Extend with zero + fn extend_zero(&mut self, len: usize) { + self.next.resize(self.next.len() + len, 0) + } + + /// Get mutable references to the hash map and the next. + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { + (&mut self.map, &mut self.next) + } + + /// Get a reference to the hash map. + fn get_map(&self) -> &RawTable<(u64, u64)> { + &self.map + } + + /// Get a reference to the next. + fn get_list(&self) -> &Self::NextType { + &self.next + } +} + +impl fmt::Debug for JoinHashMap { + fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { + Ok(()) + } +} + +/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with +/// the capability of pruning elements in an efficient manner. This structure +/// is particularly useful for cases where it's necessary to remove elements +/// from the map based on their buffer order. +/// +/// # Example +/// +/// ``` text +/// Let's continue the example of `JoinHashMap` and then show how `PruningJoinHashMap` would +/// handle the pruning scenario. +/// +/// Insert the pair (1,4) into the `PruningJoinHashMap`: +/// map: +/// --------- +/// | 1 | 5 | +/// | 2 | 3 | +/// --------- +/// list: +/// --------------------- +/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) +/// --------------------- +/// +/// Now, let's prune 3 rows from `PruningJoinHashMap`: +/// map: +/// --------- +/// | 1 | 5 | +/// --------- +/// list: +/// --------- +/// | 2 | 4 | <--- hash value 1 maps to 2 (5 - 3), 1 (4 - 3), NA (2 - 3) (which means indices values 1,0) +/// --------- +/// +/// After pruning, the | 2 | 3 | entry is deleted from `PruningJoinHashMap` since +/// there are no values left for this key. +/// ``` +pub struct PruningJoinHashMap { + /// Stores hash value to last row index + pub map: RawTable<(u64, u64)>, + /// Stores indices in chained list data structure + pub next: VecDeque, +} + +impl PruningJoinHashMap { + /// Constructs a new `PruningJoinHashMap` with the given capacity. + /// Both the map and the list are pre-allocated with the provided capacity. + /// + /// # Arguments + /// * `capacity`: The initial capacity of the hash map. + /// + /// # Returns + /// A new instance of `PruningJoinHashMap`. pub(crate) fn with_capacity(capacity: usize) -> Self { - Self(RawTable::with_capacity(capacity)) + PruningJoinHashMap { + map: RawTable::with_capacity(capacity), + next: VecDeque::with_capacity(capacity), + } } - /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is. - /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25% - /// when necessary. You can adjust the scale_factor value to achieve the desired - /// balance between memory usage and performance. - // - // If you increase the scale_factor, the capacity will shrink less aggressively, - // leading to potentially higher memory usage but fewer resizes. - // Conversely, if you decrease the scale_factor, the capacity will shrink more aggressively, - // potentially leading to lower memory usage but more frequent resizing. + /// Shrinks the capacity of the hash map, if necessary, based on the + /// provided scale factor. 
+ /// + /// # Arguments + /// * `scale_factor`: The scale factor that determines how conservative the + /// shrinking strategy is. The capacity will be reduced by 1/`scale_factor` + /// when necessary. + /// + /// # Note + /// Increasing the scale factor results in less aggressive capacity shrinking, + /// leading to potentially higher memory usage but fewer resizes. Conversely, + /// decreasing the scale factor results in more aggressive capacity shrinking, + /// potentially leading to lower memory usage but more frequent resizing. pub(crate) fn shrink_if_necessary(&mut self, scale_factor: usize) { - let capacity = self.0.capacity(); - let len = self.0.len(); + let capacity = self.map.capacity(); - if capacity > scale_factor * len { + if capacity > scale_factor * self.map.len() { let new_capacity = (capacity * (scale_factor - 1)) / scale_factor; - self.0.shrink_to(new_capacity, |(hash, _)| *hash) + // Resize the map with the new capacity. + self.map.shrink_to(new_capacity, |(hash, _)| *hash) } } + /// Calculates the size of the `PruningJoinHashMap` in bytes. + /// + /// # Returns + /// The size of the hash map in bytes. pub(crate) fn size(&self) -> usize { - self.0.allocation_info().1.size() + self.map.allocation_info().1.size() + + self.next.capacity() * std::mem::size_of::() } -} -impl fmt::Debug for JoinHashMap { - fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { + /// Removes hash values from the map and the list based on the given pruning + /// length and deleting offset. + /// + /// # Arguments + /// * `prune_length`: The number of elements to remove from the list. + /// * `deleting_offset`: The offset used to determine which hash values to remove from the map. + /// + /// # Returns + /// A `Result` indicating whether the operation was successful. + pub(crate) fn prune_hash_values( + &mut self, + prune_length: usize, + deleting_offset: u64, + shrink_factor: usize, + ) -> Result<()> { + // Remove elements from the list based on the pruning length. + self.next.drain(0..prune_length); + + // Calculate the keys that should be removed from the map. + let removable_keys = unsafe { + self.map + .iter() + .map(|bucket| bucket.as_ref()) + .filter_map(|(hash, tail_index)| { + (*tail_index < prune_length as u64 + deleting_offset).then_some(*hash) + }) + .collect::>() + }; + + // Remove the keys from the map. + removable_keys.into_iter().for_each(|hash_value| { + self.map + .remove_entry(hash_value, |(hash, _)| hash_value == *hash); + }); + + // Shrink the map if necessary. 
+ self.shrink_if_necessary(shrink_factor); Ok(()) } } @@ -682,7 +840,6 @@ pub mod tests { use datafusion_common::ScalarValue; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{binary, cast, col, lit}; - use smallvec::smallvec; use std::sync::Arc; /// Filter expr for a + b > c + 10 AND a + b < c + 100 @@ -1020,40 +1177,40 @@ pub mod tests { #[test] fn test_shrink_if_necessary() { let scale_factor = 4; - let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100); + let mut join_hash_map = PruningJoinHashMap::with_capacity(100); let data_size = 2000; let deleted_part = 3 * data_size / 4; // Add elements to the JoinHashMap for hash_value in 0..data_size { - join_hash_map.0.insert( + join_hash_map.map.insert( hash_value, - (hash_value, smallvec![hash_value]), + (hash_value, hash_value), |(hash, _)| *hash, ); } - assert_eq!(join_hash_map.0.len(), data_size as usize); - assert!(join_hash_map.0.capacity() >= data_size as usize); + assert_eq!(join_hash_map.map.len(), data_size as usize); + assert!(join_hash_map.map.capacity() >= data_size as usize); // Remove some elements from the JoinHashMap for hash_value in 0..deleted_part { join_hash_map - .0 + .map .remove_entry(hash_value, |(hash, _)| hash_value == *hash); } - assert_eq!(join_hash_map.0.len(), (data_size - deleted_part) as usize); + assert_eq!(join_hash_map.map.len(), (data_size - deleted_part) as usize); // Old capacity - let old_capacity = join_hash_map.0.capacity(); + let old_capacity = join_hash_map.map.capacity(); // Test shrink_if_necessary join_hash_map.shrink_if_necessary(scale_factor); // The capacity should be reduced by the scale factor let new_expected_capacity = - join_hash_map.0.capacity() * (scale_factor - 1) / scale_factor; - assert!(join_hash_map.0.capacity() >= new_expected_capacity); - assert!(join_hash_map.0.capacity() <= old_capacity); + join_hash_map.map.capacity() * (scale_factor - 1) / scale_factor; + assert!(join_hash_map.map.capacity() >= new_expected_capacity); + assert!(join_hash_map.map.capacity() <= old_capacity); } } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 3309a39b61c3..1c664adfbb71 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -25,7 +25,6 @@ //! This plan uses the [OneSideHashJoiner] object to facilitate join calculations //! for both its children. 
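The hunks below drop the symmetric join's `SymmetricJoinHashMap`, its free-standing `prune_hash_values`, and the `row_hash_values` deque in favor of the shared `PruningJoinHashMap`. Because rows are inserted in increasing order, each map entry always points at its newest row, so pruning reduces to draining the front of the `next` list and removing keys whose newest row lies inside the pruned prefix. A simplified sketch over `std` collections follows (the patch itself works on `hashbrown::RawTable` and collects the removable keys before deleting them); it reproduces the worked example from the `PruningJoinHashMap` docs above.

```rust
use std::collections::{HashMap, VecDeque};

/// Illustrative sketch of pruning the oldest `prune_length` rows from a
/// chained-list join hash map. `map` stores, per hash, the 1-based index of
/// the newest row; `next` chains each row back to the previous one.
fn prune_hash_values(
    map: &mut HashMap<u64, u64>,
    next: &mut VecDeque<u64>,
    prune_length: usize,
    deleted_offset: u64,
) {
    // Drop the pruned prefix of the chain storage.
    next.drain(0..prune_length);
    // A key survives only if its newest (1-based) row index lies past the pruned prefix.
    map.retain(|_, newest| *newest > prune_length as u64 + deleted_offset);
}

fn main() {
    // As in the docs: hash 1 -> rows {1, 3, 4} (newest stored as 5), hash 2 -> row {2} (stored as 3).
    let mut map = HashMap::from([(1_u64, 5_u64), (2, 3)]);
    let mut next = VecDeque::from([0_u64, 0, 0, 2, 4]);
    prune_hash_values(&mut map, &mut next, 3, 0);
    assert_eq!(next, VecDeque::from([2, 4]));
    assert!(!map.contains_key(&2)); // its only row was pruned
    assert_eq!(map[&1], 5);         // hash 1 still has surviving rows
}
```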
-use std::collections::{HashMap, VecDeque}; use std::fmt; use std::fmt::Debug; use std::sync::Arc; @@ -33,29 +32,15 @@ use std::task::Poll; use std::vec; use std::{any::Any, usize}; -use ahash::RandomState; -use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder}; -use arrow::compute::concat_batches; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder}; -use arrow_array::{UInt32Array, UInt64Array}; -use datafusion_physical_expr::hash_utils::create_hashes; -use datafusion_physical_expr::PhysicalExpr; -use futures::stream::{select, BoxStream}; -use futures::{Stream, StreamExt}; -use hashbrown::HashSet; -use parking_lot::Mutex; -use smallvec::smallvec; - -use datafusion_execution::memory_pool::MemoryConsumer; -use datafusion_physical_expr::intervals::ExprIntervalGraph; - use crate::physical_plan::common::SharedMemoryReservation; +use crate::physical_plan::joins::hash_join::{ + build_equal_condition_join_indices, update_hash, +}; use crate::physical_plan::joins::hash_join_utils::{ build_filter_expression_graph, calculate_filter_expr_intervals, combine_two_batches, convert_sort_expr_with_filter_schema, get_pruning_anti_indices, get_pruning_semi_indices, record_visited_indices, IntervalCalculatorInnerState, + PruningJoinHashMap, }; use crate::physical_plan::joins::StreamJoinPartitionMode; use crate::physical_plan::DisplayAs; @@ -74,14 +59,23 @@ use crate::physical_plan::{ DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; + +use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder}; +use arrow::compute::concat_batches; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use datafusion_common::utils::bisect; use datafusion_common::{internal_err, plan_err, JoinType}; use datafusion_common::{DataFusionError, Result}; +use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; +use datafusion_physical_expr::intervals::ExprIntervalGraph; -use super::hash_join::equal_rows; -use super::hash_join_utils::SymmetricJoinHashMap; -use super::utils::apply_join_filter_to_indices; +use ahash::RandomState; +use futures::stream::{select, BoxStream}; +use futures::{Stream, StreamExt}; +use hashbrown::HashSet; +use parking_lot::Mutex; const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; @@ -620,41 +614,6 @@ impl Stream for SymmetricHashJoinStream { } } -fn prune_hash_values( - prune_length: usize, - hashmap: &mut SymmetricJoinHashMap, - row_hash_values: &mut VecDeque, - offset: u64, -) -> Result<()> { - // Create a (hash)-(row number set) map - let mut hash_value_map: HashMap> = HashMap::new(); - for index in 0..prune_length { - let hash_value = row_hash_values.pop_front().unwrap(); - if let Some(set) = hash_value_map.get_mut(&hash_value) { - set.insert(offset + index as u64); - } else { - let mut set = HashSet::new(); - set.insert(offset + index as u64); - hash_value_map.insert(hash_value, set); - } - } - for (hash_value, index_set) in hash_value_map.iter() { - if let Some((_, separation_chain)) = hashmap - .0 - .get_mut(*hash_value, |(hash, _)| hash_value == hash) - { - separation_chain.retain(|n| !index_set.contains(n)); - if separation_chain.is_empty() { - hashmap - .0 - .remove_entry(*hash_value, |(hash, _)| hash_value == hash); - } - } - } - hashmap.shrink_if_necessary(HASHMAP_SHRINK_SCALE_FACTOR); 
- Ok(()) -} - /// Determine the pruning length for `buffer`. /// /// This function evaluates the build side filter expression, converts the @@ -834,144 +793,6 @@ pub(crate) fn build_side_determined_results( } } -/// Gets build and probe indices which satisfy the on condition (including -/// the equality condition and the join filter) in the join. -#[allow(clippy::too_many_arguments)] -pub fn build_join_indices( - probe_batch: &RecordBatch, - build_hashmap: &SymmetricJoinHashMap, - build_input_buffer: &RecordBatch, - on_build: &[Column], - on_probe: &[Column], - filter: Option<&JoinFilter>, - random_state: &RandomState, - null_equals_null: bool, - hashes_buffer: &mut Vec, - offset: Option, - build_side: JoinSide, -) -> Result<(UInt64Array, UInt32Array)> { - // Get the indices that satisfy the equality condition, like `left.a1 = right.a2` - let (build_indices, probe_indices) = build_equal_condition_join_indices( - build_hashmap, - build_input_buffer, - probe_batch, - on_build, - on_probe, - random_state, - null_equals_null, - hashes_buffer, - offset, - )?; - if let Some(filter) = filter { - // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` - apply_join_filter_to_indices( - build_input_buffer, - probe_batch, - build_indices, - probe_indices, - filter, - build_side, - ) - } else { - Ok((build_indices, probe_indices)) - } -} - -// Returns build/probe indices satisfying the equality condition. -// On LEFT.b1 = RIGHT.b2 -// LEFT Table: -// a1 b1 c1 -// 1 1 10 -// 3 3 30 -// 5 5 50 -// 7 7 70 -// 9 8 90 -// 11 8 110 -// 13 10 130 -// RIGHT Table: -// a2 b2 c2 -// 2 2 20 -// 4 4 40 -// 6 6 60 -// 8 8 80 -// 10 10 100 -// 12 10 120 -// The result is -// "+----+----+-----+----+----+-----+", -// "| a1 | b1 | c1 | a2 | b2 | c2 |", -// "+----+----+-----+----+----+-----+", -// "| 11 | 8 | 110 | 8 | 8 | 80 |", -// "| 13 | 10 | 130 | 10 | 10 | 100 |", -// "| 13 | 10 | 130 | 12 | 10 | 120 |", -// "| 9 | 8 | 90 | 8 | 8 | 80 |", -// "+----+----+-----+----+----+-----+" -// And the result of build and probe indices are: -// Build indices: 5, 6, 6, 4 -// Probe indices: 3, 4, 5, 3 -#[allow(clippy::too_many_arguments)] -pub fn build_equal_condition_join_indices( - build_hashmap: &SymmetricJoinHashMap, - build_input_buffer: &RecordBatch, - probe_batch: &RecordBatch, - build_on: &[Column], - probe_on: &[Column], - random_state: &RandomState, - null_equals_null: bool, - hashes_buffer: &mut Vec, - offset: Option, -) -> Result<(UInt64Array, UInt32Array)> { - let keys_values = probe_on - .iter() - .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))) - .collect::>>()?; - let build_join_values = build_on - .iter() - .map(|c| { - Ok(c.evaluate(build_input_buffer)? 
- .into_array(build_input_buffer.num_rows())) - }) - .collect::>>()?; - hashes_buffer.clear(); - hashes_buffer.resize(probe_batch.num_rows(), 0); - let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - // Using a buffer builder to avoid slower normal builder - let mut build_indices = UInt64BufferBuilder::new(0); - let mut probe_indices = UInt32BufferBuilder::new(0); - let offset_value = offset.unwrap_or(0); - // Visit all of the probe rows - for (row, hash_value) in hash_values.iter().enumerate() { - // Get the hash and find it in the build index - // For every item on the build and probe we check if it matches - // This possibly contains rows with hash collisions, - // So we have to check here whether rows are equal or not - if let Some((_, indices)) = build_hashmap - .0 - .get(*hash_value, |(hash, _)| *hash_value == *hash) - { - for &i in indices { - // Check hash collisions - let offset_build_index = i as usize - offset_value; - // Check hash collisions - if equal_rows( - offset_build_index, - row, - &build_join_values, - &keys_values, - null_equals_null, - )? { - build_indices.append(offset_build_index as u64); - probe_indices.append(row as u32); - } - } - } - } - - Ok(( - PrimitiveArray::new(build_indices.finish().into(), None), - PrimitiveArray::new(probe_indices.finish().into(), None), - )) -} - /// This method performs a join between the build side input buffer and the probe side batch. /// /// # Arguments @@ -1006,18 +827,18 @@ pub(crate) fn join_with_probe_batch( if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 { return Ok(None); } - let (build_indices, probe_indices) = build_join_indices( - probe_batch, + let (build_indices, probe_indices) = build_equal_condition_join_indices( &build_hash_joiner.hashmap, &build_hash_joiner.input_buffer, + probe_batch, &build_hash_joiner.on, &probe_hash_joiner.on, - filter, random_state, null_equals_null, &mut build_hash_joiner.hashes_buffer, - Some(build_hash_joiner.deleted_offset), + filter, build_hash_joiner.build_side, + Some(build_hash_joiner.deleted_offset), )?; if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) { record_visited_indices( @@ -1063,9 +884,7 @@ pub struct OneSideHashJoiner { /// Columns from the side pub(crate) on: Vec, /// Hashmap - pub(crate) hashmap: SymmetricJoinHashMap, - /// To optimize hash deleting in case of pruning, we hold them in memory - row_hash_values: VecDeque, + pub(crate) hashmap: PruningJoinHashMap, /// Reuse the hashes buffer pub(crate) hashes_buffer: Vec, /// Matched rows @@ -1084,7 +903,6 @@ impl OneSideHashJoiner { size += self.input_buffer.get_array_memory_size(); size += std::mem::size_of_val(&self.on); size += self.hashmap.size(); - size += self.row_hash_values.capacity() * std::mem::size_of::(); size += self.hashes_buffer.capacity() * std::mem::size_of::(); size += self.visited_rows.capacity() * std::mem::size_of::(); size += std::mem::size_of_val(&self.offset); @@ -1096,8 +914,7 @@ impl OneSideHashJoiner { build_side, input_buffer: RecordBatch::new_empty(schema), on, - hashmap: SymmetricJoinHashMap::with_capacity(0), - row_hash_values: VecDeque::new(), + hashmap: PruningJoinHashMap::with_capacity(0), hashes_buffer: vec![], visited_rows: HashSet::new(), offset: 0, @@ -1105,39 +922,6 @@ impl OneSideHashJoiner { } } - pub fn update_hash( - on: &[Column], - batch: &RecordBatch, - hash_map: &mut SymmetricJoinHashMap, - offset: usize, - random_state: &RandomState, - hashes_buffer: &mut Vec, - ) -> Result<()> { - // evaluate 
the keys - let keys_values = on - .iter() - .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows()))) - .collect::>>()?; - // calculate the hash values - let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - // insert hashes to key of the hashmap - for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map - .0 - .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, indices)) = item { - indices.push((row + offset) as u64); - } else { - hash_map.0.insert( - *hash_value, - (*hash_value, smallvec![(row + offset) as u64]), - |(hash, _)| *hash, - ); - } - } - Ok(()) - } - /// Updates the internal state of the [OneSideHashJoiner] with the incoming batch. /// /// # Arguments @@ -1159,16 +943,15 @@ impl OneSideHashJoiner { self.hashes_buffer.resize(batch.num_rows(), 0); // Get allocation_info before adding the item // Update the hashmap with the join key values and hashes of the incoming batch: - Self::update_hash( + update_hash( &self.on, batch, &mut self.hashmap, self.offset, random_state, &mut self.hashes_buffer, + self.deleted_offset, )?; - // Add the hashes buffer to the hash value deque: - self.row_hash_values.extend(self.hashes_buffer.iter()); Ok(()) } @@ -1228,11 +1011,10 @@ impl OneSideHashJoiner { pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> { // Prune the hash values: - prune_hash_values( + self.hashmap.prune_hash_values( prune_length, - &mut self.hashmap, - &mut self.row_hash_values, self.deleted_offset as u64, + HASHMAP_SHRINK_SCALE_FACTOR, )?; // Remove pruned rows from the visited rows set: for row in self.deleted_offset..(self.deleted_offset + prune_length) { @@ -1448,7 +1230,7 @@ mod tests { }; use datafusion_common::ScalarValue; - const TABLE_SIZE: i32 = 100; + const TABLE_SIZE: i32 = 1000; pub async fn experiment( left: Arc, @@ -1630,6 +1412,61 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn join_all_one_ascending_numeric_v2() -> Result<()> { + let join_type = JoinType::Inner; + let cardinality = (4, 5); + let case_expr = 2; + let task_ctx = Arc::new(TaskContext::default()); + let (left_batch, right_batch) = build_sides_record_batches(1000, cardinality)?; + let left_schema = &left_batch.schema(); + let right_schema = &right_batch.schema(); + let left_sorted = vec![PhysicalSortExpr { + expr: col("la1", left_schema)?, + options: SortOptions::default(), + }]; + let right_sorted = vec![PhysicalSortExpr { + expr: col("ra1", right_schema)?, + options: SortOptions::default(), + }]; + let (left, right) = create_memory_table( + left_batch, + right_batch, + vec![left_sorted], + vec![right_sorted], + 13, + )?; + + let on = vec![( + Column::new_with_schema("lc1", left_schema)?, + Column::new_with_schema("rc1", right_schema)?, + )]; + + let intermediate_schema = Schema::new(vec![ + Field::new("left", DataType::Int32, true), + Field::new("right", DataType::Int32, true), + ]); + let filter_expr = join_expr_tests_fixture_i32( + case_expr, + col("left", &intermediate_schema)?, + col("right", &intermediate_schema)?, + ); + let column_indices = vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ]; + let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema); + + experiment(left, right, Some(filter), join_type, on, task_ctx).await?; + Ok(()) + } + #[rstest] #[tokio::test(flavor = "multi_thread")] async fn join_without_sort_information(