From 55a5a7d6bf66346ddffc06b4c9d0be7463d2feba Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 21 Aug 2023 13:33:15 +0300 Subject: [PATCH 01/11] Initial for tracking --- .../physical_plan/joins/hash_join_utils.rs | 278 +++++++++++++++++- .../joins/symmetric_hash_join.rs | 207 ++++++++----- 2 files changed, 416 insertions(+), 69 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index 37790e6bb8a6..bb35daf3eaf8 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -18,7 +18,7 @@ //! This file contains common subroutines for regular and symmetric hash join //! related functionality, used both in join calculations and optimization rules. -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::{fmt, usize}; @@ -38,10 +38,12 @@ use hashbrown::HashSet; use parking_lot::Mutex; use smallvec::SmallVec; use std::fmt::{Debug, Formatter}; +use ahash::RandomState; use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; use crate::physical_plan::ExecutionPlan; use datafusion_common::Result; +use datafusion_physical_expr::hash_utils::create_hashes; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, @@ -104,6 +106,229 @@ pub struct JoinHashMap { pub next: Vec, } +pub struct JoinHashMapv2 { + // Stores hash value to head and tail + pub map: RawTable<(u64, (u64, u64))>, + // Stores indices in chained list data structure + pub list: VecDeque, +} + +impl JoinHashMapv2 { + pub(crate) fn with_capacity(capacity: usize) -> Self { + JoinHashMapv2 { + map: RawTable::with_capacity(capacity), + list: VecDeque::with_capacity(capacity), + } + } + + pub(crate) fn size(&self) -> usize{ + 0 + } +} + +pub struct JoinHashMapv3 { + // Stores hash value to head and tail + pub map: RawTable<(u64, u64)>, + // Stores indices in chained list data structure + pub list: VecDeque, +} + +impl JoinHashMapv3 { + pub(crate) fn with_capacity(capacity: usize) -> Self { + JoinHashMapv3 { + map: RawTable::with_capacity(capacity), + list: VecDeque::with_capacity(capacity), + } + } + + pub(crate) fn size(&self) -> usize{ + 0 + } +} + +#[derive(Default, Debug, Copy, Clone)] +pub struct Node { + pub(crate) prev_index: u64, + pub(crate) next_index: u64 +} + +// /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, +// /// assuming that the [RecordBatch] corresponds to the `index`th +// pub fn update_hash( +// on: &[Column], +// batch: &RecordBatch, +// hash_map: &mut JoinHashMapv2, +// random_state: &RandomState, +// hashes_buffer: &mut Vec, +// offset: usize +// ) -> Result<()> { +// // evaluate the keys +// let keys_values = on +// .iter() +// .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows()))) +// .collect::>>()?; +// let hash_values = datafusion_physical_expr::hash_utils::create_hashes(&keys_values, random_state, hashes_buffer)?; +// hash_map.list.resize(hash_map.list.len() + batch.num_rows(), Node::default()); +// // insert hashes to key of the hashmap +// for (row, hash_value) in hash_values.iter().enumerate() { +// let item = hash_map +// .map +// .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); +// if let Some((_, indices)) = item { +// // Already exists: add index to next array +// let (_, tail) = indices; +// +// // Update previous node if exist +// hash_map.list[(*tail - 1) as usize].next_index = (row + offset + 1) as u64; +// +// // Update chained Vec at row + offset with previous value +// hash_map.list[row + offset].prev_index = *tail; +// // Store new value inside hashmap +// indices.1 = (row + offset + 1) as u64; +// // println!("hash_map.next new key {:?} indices {:?}",hash_map.list, indices); +// } else { +// // println!("hash_map.next new key {:?}",hash_map.list); +// hash_map.map.insert( +// *hash_value, +// // store the value + 1 as 0 value reserved for end of list +// (*hash_value, ((row + offset + 1) as u64, (row + offset + 1) as u64)), +// |(hash, _)| *hash, +// ); +// // chained list at (row + offset) is already initialized with 0 +// // meaning end of list +// } +// } +// Ok(()) +// } +// +// #[allow(clippy::too_many_arguments)] +// pub fn build_equal_condition_join_indices_v2( +// build_hashmap: &JoinHashMapv2, +// build_input_buffer: &RecordBatch, +// probe_batch: &RecordBatch, +// build_on: &[Column], +// probe_on: &[Column], +// random_state: &RandomState, +// null_equals_null: bool, +// hashes_buffer: &mut Vec, +// filter: Option<&JoinFilter>, +// build_side: JoinSide, +// deleted_offset: Option, +// ) -> Result<(arrow_array::UInt64Array, arrow_array::UInt32Array)> { +// let keys_values = probe_on +// .iter() +// .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))) +// .collect::>>()?; +// let build_join_values = build_on +// .iter() +// .map(|c| { +// Ok(c.evaluate(build_input_buffer)? +// .into_array(build_input_buffer.num_rows())) +// }) +// .collect::>>()?; +// hashes_buffer.clear(); +// hashes_buffer.resize(probe_batch.num_rows(), 0); +// let offset_value = deleted_offset.unwrap_or(0); +// let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; +// // Using a buffer builder to avoid slower normal builder +// let mut build_indices = arrow_array::builder::UInt64BufferBuilder::new(0); +// let mut probe_indices = arrow_array::builder::UInt32BufferBuilder::new(0); +// // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: +// // Build Indices: [5, 4, 3] +// // Probe Indices: [1, 1, 1] +// // +// // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. +// // Let's consider probe rows [0,1] as an example: +// // +// // When the probe iteration sequence is reversed, the following pairings can be derived: +// // +// // For probe row 1: +// // (5, 1) +// // (4, 1) +// // (3, 1) +// // +// // For probe row 0: +// // (5, 0) +// // (4, 0) +// // (3, 0) +// // +// // After reversing both sets of indices, we obtain reversed indices: +// // +// // (3,0) +// // (4,0) +// // (5,0) +// // (3,1) +// // (4,1) +// // (5,1) +// // +// // With this approach, the lexicographic order on both the probe side and the build side is preserved. +// for (row, hash_value) in hash_values.iter().enumerate().rev() { +// // Get the hash and find it in the build index +// +// // For every item on the build and probe we check if it matches +// // This possibly contains rows with hash collisions, +// // So we have to check here whether rows are equal or not +// if let Some((_, index)) = build_hashmap +// .map +// .get(*hash_value, |(hash, _)| *hash_value == *hash) +// { +// let mut i = *index - 1; +// loop { +// build_indices.append(i - offset_value as u64); +// probe_indices.append(row as u32); +// // Follow the chain to get the next index value +// let next = build_hashmap.list[i as usize].prev_index; +// if next == 0 { +// // end of list +// break; +// } +// i = next - 1; +// } +// } +// } +// // Reversing both sets of indices +// build_indices.as_slice_mut().reverse(); +// probe_indices.as_slice_mut().reverse(); +// +// let left: arrow_array::UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); +// let right: arrow_array::UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); +// +// let (left, right) = if let Some(filter) = filter { +// // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` +// crate::physical_plan::joins::utils::apply_join_filter_to_indices( +// build_input_buffer, +// probe_batch, +// left, +// right, +// filter, +// build_side, +// )? +// } else { +// (left, right) +// }; +// +// crate::physical_plan::joins::hash_join::equal_rows_arr( +// &left, +// &right, +// &build_join_values, +// &keys_values, +// null_equals_null, +// ) +// } +// +fn prune_hash_values( + prune_length: usize, + hashmap: &mut JoinHashMapv3, + row_hash_values: &mut VecDeque, + deleting_offset: u64, +) -> Result<()> { + // Create a (hash)-(row number set) map + for _ in 0..prune_length { + hashmap.list.pop_front() + } + Ok(()) +} + /// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate /// and shrink the indices. pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>); @@ -684,6 +909,8 @@ pub mod tests { use datafusion_physical_expr::expressions::{binary, cast, col, lit}; use smallvec::smallvec; use std::sync::Arc; + use arrow_array::{Float64Array, UInt32Array}; + use crate::test::build_table_i32; /// Filter expr for a + b > c + 10 AND a + b < c + 100 pub(crate) fn complicated_filter( @@ -1056,4 +1283,53 @@ pub mod tests { assert!(join_hash_map.0.capacity() >= new_expected_capacity); assert!(join_hash_map.0.capacity() <= old_capacity); } + + fn create_test_schema() -> Result { + let a = Field::new("a", DataType::Int32, true); + let b = Field::new("b", DataType::Int32, false); + let c = Field::new("c", DataType::Int32, true); + + let schema = Arc::new(Schema::new(vec![a, b, c])); + Ok(schema) + } + + // #[test] + // fn update_sym_hash() -> Result<()>{ + // + // let left = build_table_i32( + // ("a", &vec![1, 1]), + // ("x", &vec![100, 200]), + // ("y", &vec![200, 300]), + // ); + // + // let right = build_table_i32( + // ("a", &vec![10, 20]), + // ("b", &vec![0, 0]), + // ("c", &vec![30, 40]), + // ); + // + // let on = vec![ + // Column::new_with_schema("a", &left.schema())?, + // ]; + // + // let random_state = RandomState::with_seeds(0, 0, 0, 0); + // let hashes_buff = &mut vec![0; left.num_rows()]; + // let mut hash_map = JoinHashMapv2{ + // map: RawTable::with_capacity(2), + // list: VecDeque::new(), + // }; + // let mut offset = 0; + // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); + // offset += left.num_rows(); + // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); + // offset += left.num_rows(); + // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); + // offset += left.num_rows(); + // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); + // + // + // Ok(()) + // + // + // } } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 3309a39b61c3..36652c866ec9 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -52,11 +52,7 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::intervals::ExprIntervalGraph; use crate::physical_plan::common::SharedMemoryReservation; -use crate::physical_plan::joins::hash_join_utils::{ - build_filter_expression_graph, calculate_filter_expr_intervals, combine_two_batches, - convert_sort_expr_with_filter_schema, get_pruning_anti_indices, - get_pruning_semi_indices, record_visited_indices, IntervalCalculatorInnerState, -}; +use crate::physical_plan::joins::hash_join_utils::{build_filter_expression_graph, calculate_filter_expr_intervals, combine_two_batches, convert_sort_expr_with_filter_schema, get_pruning_anti_indices, get_pruning_semi_indices, record_visited_indices, IntervalCalculatorInnerState, JoinHashMapv2, Node}; use crate::physical_plan::joins::StreamJoinPartitionMode; use crate::physical_plan::DisplayAs; use crate::physical_plan::{ @@ -622,7 +618,7 @@ impl Stream for SymmetricHashJoinStream { fn prune_hash_values( prune_length: usize, - hashmap: &mut SymmetricJoinHashMap, + hashmap: &mut JoinHashMapv2, row_hash_values: &mut VecDeque, offset: u64, ) -> Result<()> { @@ -638,23 +634,40 @@ fn prune_hash_values( hash_value_map.insert(hash_value, set); } } + let mut removed_hashes = vec![]; for (hash_value, index_set) in hash_value_map.iter() { - if let Some((_, separation_chain)) = hashmap - .0 + if let Some((_, indices)) = hashmap + .map .get_mut(*hash_value, |(hash, _)| hash_value == hash) { - separation_chain.retain(|n| !index_set.contains(n)); - if separation_chain.is_empty() { - hashmap - .0 - .remove_entry(*hash_value, |(hash, _)| hash_value == hash); + let (start_index, _) = indices; + let mut start_index_temp = *start_index as usize - 1; + let prune_lengtg_per_key = index_set.len(); + for _ in 0..prune_lengtg_per_key { + let next_index = hashmap.list[start_index_temp].next_index; + if next_index == 0 { + removed_hashes.push(*hash_value); + break; + }else { + hashmap.list[start_index_temp].next_index = 0; + start_index_temp = next_index as usize - 1; + } } + indices.0 = start_index_temp as u64 + 1; + // Update the starting index. } } - hashmap.shrink_if_necessary(HASHMAP_SHRINK_SCALE_FACTOR); + + for hash_value in removed_hashes{ + hashmap.map.remove_entry(hash_value, |(hash, _)| hash_value == *hash); + } + + for _ in 0..prune_length { + hashmap.list.pop_front(); + + } Ok(()) } - /// Determine the pruning length for `buffer`. /// /// This function evaluates the build side filter expression, converts the @@ -839,7 +852,7 @@ pub(crate) fn build_side_determined_results( #[allow(clippy::too_many_arguments)] pub fn build_join_indices( probe_batch: &RecordBatch, - build_hashmap: &SymmetricJoinHashMap, + build_hashmap: &JoinHashMapv2, build_input_buffer: &RecordBatch, on_build: &[Column], on_probe: &[Column], @@ -851,7 +864,7 @@ pub fn build_join_indices( build_side: JoinSide, ) -> Result<(UInt64Array, UInt32Array)> { // Get the indices that satisfy the equality condition, like `left.a1 = right.a2` - let (build_indices, probe_indices) = build_equal_condition_join_indices( + build_equal_condition_join_indices( build_hashmap, build_input_buffer, probe_batch, @@ -860,21 +873,11 @@ pub fn build_join_indices( random_state, null_equals_null, hashes_buffer, + filter, + build_side, offset, - )?; - if let Some(filter) = filter { - // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` - apply_join_filter_to_indices( - build_input_buffer, - probe_batch, - build_indices, - probe_indices, - filter, - build_side, - ) - } else { - Ok((build_indices, probe_indices)) - } + + ) } // Returns build/probe indices satisfying the equality condition. @@ -910,7 +913,7 @@ pub fn build_join_indices( // Probe indices: 3, 4, 5, 3 #[allow(clippy::too_many_arguments)] pub fn build_equal_condition_join_indices( - build_hashmap: &SymmetricJoinHashMap, + build_hashmap: &JoinHashMapv2, build_input_buffer: &RecordBatch, probe_batch: &RecordBatch, build_on: &[Column], @@ -918,7 +921,9 @@ pub fn build_equal_condition_join_indices( random_state: &RandomState, null_equals_null: bool, hashes_buffer: &mut Vec, - offset: Option, + filter: Option<&JoinFilter>, + build_side: JoinSide, + deleted_offset: Option, ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = probe_on .iter() @@ -933,43 +938,93 @@ pub fn build_equal_condition_join_indices( .collect::>>()?; hashes_buffer.clear(); hashes_buffer.resize(probe_batch.num_rows(), 0); + let offset_value = deleted_offset.unwrap_or(0); let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // Using a buffer builder to avoid slower normal builder - let mut build_indices = UInt64BufferBuilder::new(0); - let mut probe_indices = UInt32BufferBuilder::new(0); - let offset_value = offset.unwrap_or(0); - // Visit all of the probe rows - for (row, hash_value) in hash_values.iter().enumerate() { + let mut build_indices = arrow_array::builder::UInt64BufferBuilder::new(0); + let mut probe_indices = arrow_array::builder::UInt32BufferBuilder::new(0); + // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: + // Build Indices: [5, 4, 3] + // Probe Indices: [1, 1, 1] + // + // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. + // Let's consider probe rows [0,1] as an example: + // + // When the probe iteration sequence is reversed, the following pairings can be derived: + // + // For probe row 1: + // (5, 1) + // (4, 1) + // (3, 1) + // + // For probe row 0: + // (5, 0) + // (4, 0) + // (3, 0) + // + // After reversing both sets of indices, we obtain reversed indices: + // + // (3,0) + // (4,0) + // (5,0) + // (3,1) + // (4,1) + // (5,1) + // + // With this approach, the lexicographic order on both the probe side and the build side is preserved. + for (row, hash_value) in hash_values.iter().enumerate().rev() { // Get the hash and find it in the build index + // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, indices)) = build_hashmap - .0 + if let Some((_, index)) = build_hashmap + .map .get(*hash_value, |(hash, _)| *hash_value == *hash) { - for &i in indices { - // Check hash collisions - let offset_build_index = i as usize - offset_value; - // Check hash collisions - if equal_rows( - offset_build_index, - row, - &build_join_values, - &keys_values, - null_equals_null, - )? { - build_indices.append(offset_build_index as u64); - probe_indices.append(row as u32); + let (prev_index, _) = index; + let mut i = *prev_index - 1; + loop { + build_indices.append(i - offset_value as u64); + probe_indices.append(row as u32); + // Follow the chain to get the next index value + let next = build_hashmap.list[i as usize].prev_index; + if next == 0 { + // end of list + break; } + i = next - 1; } } } + // Reversing both sets of indices + build_indices.as_slice_mut().reverse(); + probe_indices.as_slice_mut().reverse(); + + let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); + let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); + + let (left, right) = if let Some(filter) = filter { + // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` + apply_join_filter_to_indices( + build_input_buffer, + probe_batch, + left, + right, + filter, + build_side, + )? + } else { + (left, right) + }; - Ok(( - PrimitiveArray::new(build_indices.finish().into(), None), - PrimitiveArray::new(probe_indices.finish().into(), None), - )) + crate::physical_plan::joins::hash_join::equal_rows_arr( + &left, + &right, + &build_join_values, + &keys_values, + null_equals_null, + ) } /// This method performs a join between the build side input buffer and the probe side batch. @@ -1063,7 +1118,7 @@ pub struct OneSideHashJoiner { /// Columns from the side pub(crate) on: Vec, /// Hashmap - pub(crate) hashmap: SymmetricJoinHashMap, + pub(crate) hashmap: JoinHashMapv2, /// To optimize hash deleting in case of pruning, we hold them in memory row_hash_values: VecDeque, /// Reuse the hashes buffer @@ -1096,7 +1151,7 @@ impl OneSideHashJoiner { build_side, input_buffer: RecordBatch::new_empty(schema), on, - hashmap: SymmetricJoinHashMap::with_capacity(0), + hashmap: JoinHashMapv2::with_capacity(0), row_hash_values: VecDeque::new(), hashes_buffer: vec![], visited_rows: HashSet::new(), @@ -1105,34 +1160,50 @@ impl OneSideHashJoiner { } } + /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, + /// assuming that the [RecordBatch] corresponds to the `index`th pub fn update_hash( on: &[Column], batch: &RecordBatch, - hash_map: &mut SymmetricJoinHashMap, - offset: usize, + hash_map: &mut JoinHashMapv2, random_state: &RandomState, hashes_buffer: &mut Vec, + offset: usize ) -> Result<()> { // evaluate the keys let keys_values = on .iter() .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows()))) .collect::>>()?; - // calculate the hash values - let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; + let hash_values = datafusion_physical_expr::hash_utils::create_hashes(&keys_values, random_state, hashes_buffer)?; + hash_map.list.resize(hash_map.list.len() + batch.num_rows(), Node::default()); // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { let item = hash_map - .0 + .map .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, indices)) = item { - indices.push((row + offset) as u64); + // Already exists: add index to next array + let (_, tail) = indices; + + // Update previous node if exist + hash_map.list[(*tail - 1) as usize].next_index = (row + offset + 1) as u64; + + // Update chained Vec at row + offset with previous value + hash_map.list[row + offset].prev_index = *tail; + // Store new value inside hashmap + indices.1 = (row + offset + 1) as u64; + // println!("hash_map.next new key {:?} indices {:?}",hash_map.list, indices); } else { - hash_map.0.insert( + // println!("hash_map.next new key {:?}",hash_map.list); + hash_map.map.insert( *hash_value, - (*hash_value, smallvec![(row + offset) as u64]), + // store the value + 1 as 0 value reserved for end of list + (*hash_value, ((row + offset + 1) as u64, (row + offset + 1) as u64)), |(hash, _)| *hash, ); + // chained list at (row + offset) is already initialized with 0 + // meaning end of list } } Ok(()) @@ -1163,9 +1234,9 @@ impl OneSideHashJoiner { &self.on, batch, &mut self.hashmap, - self.offset, random_state, &mut self.hashes_buffer, + self.offset, )?; // Add the hashes buffer to the hash value deque: self.row_hash_values.extend(self.hashes_buffer.iter()); From 03d6bcd85f9a7d1a6db77de35c1d5b4b5e1859e9 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 21 Aug 2023 15:44:19 +0300 Subject: [PATCH 02/11] Before clippy --- .../core/src/physical_plan/joins/hash_join.rs | 346 +------------- .../physical_plan/joins/hash_join_utils.rs | 434 ++++++------------ .../joins/symmetric_hash_join.rs | 247 +++++----- .../src/physical_plan/joins/test_utils.rs | 2 +- 4 files changed, 240 insertions(+), 789 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index ff341ae2116d..50d04f9323b6 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -58,24 +58,17 @@ use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder}; use arrow::record_batch::RecordBatch; use arrow::{ array::{ - Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Date32Array, Date64Array, - Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, - PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, - UInt16Array, UInt32Array, UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, - UInt8Array, + Array, ArrayRef, BooleanArray, BooleanBufferBuilder, + PrimitiveArray, UInt32Array, UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, }, datatypes::{ - ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema, - SchemaRef, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + DataType, Schema, + SchemaRef }, util::bit_util, }; use arrow_array::cast::downcast_array; use arrow_schema::ArrowError; -use datafusion_common::cast::{as_dictionary_array, as_string_array}; use datafusion_common::{internal_err, plan_err, DataFusionError, JoinType, Result}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; @@ -835,337 +828,6 @@ pub fn build_equal_condition_join_indices( ) } -macro_rules! equal_rows_elem { - ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{ - let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap(); - let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap(); - - match (left_array.is_null($left), right_array.is_null($right)) { - (false, false) => left_array.value($left) == right_array.value($right), - (true, true) => $null_equals_null, - _ => false, - } - }}; -} - -macro_rules! equal_rows_elem_with_string_dict { - ($key_array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{ - let left_array: &DictionaryArray<$key_array_type> = - as_dictionary_array::<$key_array_type>($l).unwrap(); - let right_array: &DictionaryArray<$key_array_type> = - as_dictionary_array::<$key_array_type>($r).unwrap(); - - let (left_values, left_values_index) = { - let keys_col = left_array.keys(); - if keys_col.is_valid($left) { - let values_index = keys_col - .value($left) - .to_usize() - .expect("Can not convert index to usize in dictionary"); - - ( - as_string_array(left_array.values()).unwrap(), - Some(values_index), - ) - } else { - (as_string_array(left_array.values()).unwrap(), None) - } - }; - let (right_values, right_values_index) = { - let keys_col = right_array.keys(); - if keys_col.is_valid($right) { - let values_index = keys_col - .value($right) - .to_usize() - .expect("Can not convert index to usize in dictionary"); - - ( - as_string_array(right_array.values()).unwrap(), - Some(values_index), - ) - } else { - (as_string_array(right_array.values()).unwrap(), None) - } - }; - - match (left_values_index, right_values_index) { - (Some(left_values_index), Some(right_values_index)) => { - left_values.value(left_values_index) - == right_values.value(right_values_index) - } - (None, None) => $null_equals_null, - _ => false, - } - }}; -} - -/// Left and right row have equal values -/// If more data types are supported here, please also add the data types in can_hash function -/// to generate hash join logical plan. -pub fn equal_rows( - left: usize, - right: usize, - left_arrays: &[ArrayRef], - right_arrays: &[ArrayRef], - null_equals_null: bool, -) -> Result { - let mut err = None; - let res = left_arrays - .iter() - .zip(right_arrays) - .all(|(l, r)| match l.data_type() { - DataType::Null => { - // lhs and rhs are both `DataType::Null`, so the equal result - // is dependent on `null_equals_null` - null_equals_null - } - DataType::Boolean => { - equal_rows_elem!(BooleanArray, l, r, left, right, null_equals_null) - } - DataType::Int8 => { - equal_rows_elem!(Int8Array, l, r, left, right, null_equals_null) - } - DataType::Int16 => { - equal_rows_elem!(Int16Array, l, r, left, right, null_equals_null) - } - DataType::Int32 => { - equal_rows_elem!(Int32Array, l, r, left, right, null_equals_null) - } - DataType::Int64 => { - equal_rows_elem!(Int64Array, l, r, left, right, null_equals_null) - } - DataType::UInt8 => { - equal_rows_elem!(UInt8Array, l, r, left, right, null_equals_null) - } - DataType::UInt16 => { - equal_rows_elem!(UInt16Array, l, r, left, right, null_equals_null) - } - DataType::UInt32 => { - equal_rows_elem!(UInt32Array, l, r, left, right, null_equals_null) - } - DataType::UInt64 => { - equal_rows_elem!(UInt64Array, l, r, left, right, null_equals_null) - } - DataType::Float32 => { - equal_rows_elem!(Float32Array, l, r, left, right, null_equals_null) - } - DataType::Float64 => { - equal_rows_elem!(Float64Array, l, r, left, right, null_equals_null) - } - DataType::Date32 => { - equal_rows_elem!(Date32Array, l, r, left, right, null_equals_null) - } - DataType::Date64 => { - equal_rows_elem!(Date64Array, l, r, left, right, null_equals_null) - } - DataType::Time32(time_unit) => match time_unit { - TimeUnit::Second => { - equal_rows_elem!(Time32SecondArray, l, r, left, right, null_equals_null) - } - TimeUnit::Millisecond => { - equal_rows_elem!(Time32MillisecondArray, l, r, left, right, null_equals_null) - } - _ => { - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - } - DataType::Time64(time_unit) => match time_unit { - TimeUnit::Microsecond => { - equal_rows_elem!(Time64MicrosecondArray, l, r, left, right, null_equals_null) - } - TimeUnit::Nanosecond => { - equal_rows_elem!(Time64NanosecondArray, l, r, left, right, null_equals_null) - } - _ => { - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - } - DataType::Timestamp(time_unit, None) => match time_unit { - TimeUnit::Second => { - equal_rows_elem!( - TimestampSecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - TimeUnit::Millisecond => { - equal_rows_elem!( - TimestampMillisecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - TimeUnit::Microsecond => { - equal_rows_elem!( - TimestampMicrosecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - TimeUnit::Nanosecond => { - equal_rows_elem!( - TimestampNanosecondArray, - l, - r, - left, - right, - null_equals_null - ) - } - }, - DataType::Utf8 => { - equal_rows_elem!(StringArray, l, r, left, right, null_equals_null) - } - DataType::LargeUtf8 => { - equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null) - } - DataType::FixedSizeBinary(_) => { - equal_rows_elem!(FixedSizeBinaryArray, l, r, left, right, null_equals_null) - } - DataType::Decimal128(_, lscale) => match r.data_type() { - DataType::Decimal128(_, rscale) => { - if lscale == rscale { - equal_rows_elem!( - Decimal128Array, - l, - r, - left, - right, - null_equals_null - ) - } else { - err = Some(internal_err!( - "Inconsistent Decimal data type in hasher, the scale should be same" - )); - false - } - } - _ => { - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - }, - DataType::Dictionary(key_type, value_type) - if *value_type.as_ref() == DataType::Utf8 => - { - match key_type.as_ref() { - DataType::Int8 => { - equal_rows_elem_with_string_dict!( - Int8Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::Int16 => { - equal_rows_elem_with_string_dict!( - Int16Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::Int32 => { - equal_rows_elem_with_string_dict!( - Int32Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::Int64 => { - equal_rows_elem_with_string_dict!( - Int64Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt8 => { - equal_rows_elem_with_string_dict!( - UInt8Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt16 => { - equal_rows_elem_with_string_dict!( - UInt16Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt32 => { - equal_rows_elem_with_string_dict!( - UInt32Type, - l, - r, - left, - right, - null_equals_null - ) - } - DataType::UInt64 => { - equal_rows_elem_with_string_dict!( - UInt64Type, - l, - r, - left, - right, - null_equals_null - ) - } - _ => { - // should not happen - err = Some(internal_err!( - "Unsupported data type in hasher" - )); - false - } - } - } - other => { - // This is internal because we should have caught this before. - err = Some(internal_err!( - "Unsupported data type in hasher: {other}" - )); - false - } - }); - - err.unwrap_or(Ok(res)) -} // version of eq_dyn supporting equality on null arrays fn eq_dyn_null( diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index bb35daf3eaf8..6f3ad5fa2237 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -36,14 +36,11 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use hashbrown::raw::RawTable; use hashbrown::HashSet; use parking_lot::Mutex; -use smallvec::SmallVec; use std::fmt::{Debug, Formatter}; -use ahash::RandomState; use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; use crate::physical_plan::ExecutionPlan; use datafusion_common::Result; -use datafusion_physical_expr::hash_utils::create_hashes; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, @@ -100,279 +97,156 @@ use datafusion_physical_expr::hash_utils::create_hashes; // TODO: speed up collision checks // https://github.com/apache/arrow-datafusion/issues/50 pub struct JoinHashMap { - // Stores hash value to first index + // Stores hash value to last row index pub map: RawTable<(u64, u64)>, // Stores indices in chained list data structure pub next: Vec, } -pub struct JoinHashMapv2 { - // Stores hash value to head and tail - pub map: RawTable<(u64, (u64, u64))>, - // Stores indices in chained list data structure - pub list: VecDeque, -} - -impl JoinHashMapv2 { +impl JoinHashMap { pub(crate) fn with_capacity(capacity: usize) -> Self { - JoinHashMapv2 { + JoinHashMap { map: RawTable::with_capacity(capacity), - list: VecDeque::with_capacity(capacity), + next: vec![0; capacity], } } +} - pub(crate) fn size(&self) -> usize{ - 0 +impl fmt::Debug for JoinHashMap { + fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { + Ok(()) } } -pub struct JoinHashMapv3 { - // Stores hash value to head and tail +/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with the capability of pruning +/// elements in an efficient manner. This structure is particularly useful for cases where +/// it's necessary to remove elements from the map based on their buffer order. +/// +/// +/// # Example +/// +/// ``` text +/// Let's continue the example of `JoinHashMap` and then show how `PruningJoinHashMap` would handle the pruning scenario. +/// +/// Insert the pair (1,4) into the `PruningJoinHashMap`: +/// map: +/// --------- +/// | 1 | 5 | +/// | 2 | 3 | +/// --------- +/// list: +/// --------------------- +/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) +/// --------------------- +/// +/// Now, let's prune 3 rows from `PruningJoinHashMap`: +/// map: +/// --------- +/// | 1 | 5 | +/// --------- +/// list: +/// --------- +/// | 2 | 4 | <--- hash value 1 maps to 2 (5 - 3), 1 (4 - 3), NA (2 - 3) (which means indices values 1,0) +/// --------- +/// +/// After pruning, the | 2 | 3 | entry is deleted from `PruningJoinHashMap` since there are no values left for this key. +/// ``` +pub struct PruningJoinHashMap { + /// Stores hash value to last row index pub map: RawTable<(u64, u64)>, - // Stores indices in chained list data structure + /// Stores indices in chained list data structure pub list: VecDeque, } -impl JoinHashMapv3 { +impl PruningJoinHashMap { + /// Constructs a new `PruningJoinHashMap` with the given capacity. + /// Both the map and the list are pre-allocated with the provided capacity. + /// + /// # Arguments + /// * `capacity`: The initial capacity of the hash map. + /// + /// # Returns + /// A new instance of `PruningJoinHashMap`. pub(crate) fn with_capacity(capacity: usize) -> Self { - JoinHashMapv3 { + PruningJoinHashMap { map: RawTable::with_capacity(capacity), list: VecDeque::with_capacity(capacity), } } - pub(crate) fn size(&self) -> usize{ - 0 - } -} - -#[derive(Default, Debug, Copy, Clone)] -pub struct Node { - pub(crate) prev_index: u64, - pub(crate) next_index: u64 -} - -// /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, -// /// assuming that the [RecordBatch] corresponds to the `index`th -// pub fn update_hash( -// on: &[Column], -// batch: &RecordBatch, -// hash_map: &mut JoinHashMapv2, -// random_state: &RandomState, -// hashes_buffer: &mut Vec, -// offset: usize -// ) -> Result<()> { -// // evaluate the keys -// let keys_values = on -// .iter() -// .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows()))) -// .collect::>>()?; -// let hash_values = datafusion_physical_expr::hash_utils::create_hashes(&keys_values, random_state, hashes_buffer)?; -// hash_map.list.resize(hash_map.list.len() + batch.num_rows(), Node::default()); -// // insert hashes to key of the hashmap -// for (row, hash_value) in hash_values.iter().enumerate() { -// let item = hash_map -// .map -// .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); -// if let Some((_, indices)) = item { -// // Already exists: add index to next array -// let (_, tail) = indices; -// -// // Update previous node if exist -// hash_map.list[(*tail - 1) as usize].next_index = (row + offset + 1) as u64; -// -// // Update chained Vec at row + offset with previous value -// hash_map.list[row + offset].prev_index = *tail; -// // Store new value inside hashmap -// indices.1 = (row + offset + 1) as u64; -// // println!("hash_map.next new key {:?} indices {:?}",hash_map.list, indices); -// } else { -// // println!("hash_map.next new key {:?}",hash_map.list); -// hash_map.map.insert( -// *hash_value, -// // store the value + 1 as 0 value reserved for end of list -// (*hash_value, ((row + offset + 1) as u64, (row + offset + 1) as u64)), -// |(hash, _)| *hash, -// ); -// // chained list at (row + offset) is already initialized with 0 -// // meaning end of list -// } -// } -// Ok(()) -// } -// -// #[allow(clippy::too_many_arguments)] -// pub fn build_equal_condition_join_indices_v2( -// build_hashmap: &JoinHashMapv2, -// build_input_buffer: &RecordBatch, -// probe_batch: &RecordBatch, -// build_on: &[Column], -// probe_on: &[Column], -// random_state: &RandomState, -// null_equals_null: bool, -// hashes_buffer: &mut Vec, -// filter: Option<&JoinFilter>, -// build_side: JoinSide, -// deleted_offset: Option, -// ) -> Result<(arrow_array::UInt64Array, arrow_array::UInt32Array)> { -// let keys_values = probe_on -// .iter() -// .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))) -// .collect::>>()?; -// let build_join_values = build_on -// .iter() -// .map(|c| { -// Ok(c.evaluate(build_input_buffer)? -// .into_array(build_input_buffer.num_rows())) -// }) -// .collect::>>()?; -// hashes_buffer.clear(); -// hashes_buffer.resize(probe_batch.num_rows(), 0); -// let offset_value = deleted_offset.unwrap_or(0); -// let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; -// // Using a buffer builder to avoid slower normal builder -// let mut build_indices = arrow_array::builder::UInt64BufferBuilder::new(0); -// let mut probe_indices = arrow_array::builder::UInt32BufferBuilder::new(0); -// // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: -// // Build Indices: [5, 4, 3] -// // Probe Indices: [1, 1, 1] -// // -// // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. -// // Let's consider probe rows [0,1] as an example: -// // -// // When the probe iteration sequence is reversed, the following pairings can be derived: -// // -// // For probe row 1: -// // (5, 1) -// // (4, 1) -// // (3, 1) -// // -// // For probe row 0: -// // (5, 0) -// // (4, 0) -// // (3, 0) -// // -// // After reversing both sets of indices, we obtain reversed indices: -// // -// // (3,0) -// // (4,0) -// // (5,0) -// // (3,1) -// // (4,1) -// // (5,1) -// // -// // With this approach, the lexicographic order on both the probe side and the build side is preserved. -// for (row, hash_value) in hash_values.iter().enumerate().rev() { -// // Get the hash and find it in the build index -// -// // For every item on the build and probe we check if it matches -// // This possibly contains rows with hash collisions, -// // So we have to check here whether rows are equal or not -// if let Some((_, index)) = build_hashmap -// .map -// .get(*hash_value, |(hash, _)| *hash_value == *hash) -// { -// let mut i = *index - 1; -// loop { -// build_indices.append(i - offset_value as u64); -// probe_indices.append(row as u32); -// // Follow the chain to get the next index value -// let next = build_hashmap.list[i as usize].prev_index; -// if next == 0 { -// // end of list -// break; -// } -// i = next - 1; -// } -// } -// } -// // Reversing both sets of indices -// build_indices.as_slice_mut().reverse(); -// probe_indices.as_slice_mut().reverse(); -// -// let left: arrow_array::UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); -// let right: arrow_array::UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); -// -// let (left, right) = if let Some(filter) = filter { -// // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` -// crate::physical_plan::joins::utils::apply_join_filter_to_indices( -// build_input_buffer, -// probe_batch, -// left, -// right, -// filter, -// build_side, -// )? -// } else { -// (left, right) -// }; -// -// crate::physical_plan::joins::hash_join::equal_rows_arr( -// &left, -// &right, -// &build_join_values, -// &keys_values, -// null_equals_null, -// ) -// } -// -fn prune_hash_values( - prune_length: usize, - hashmap: &mut JoinHashMapv3, - row_hash_values: &mut VecDeque, - deleting_offset: u64, -) -> Result<()> { - // Create a (hash)-(row number set) map - for _ in 0..prune_length { - hashmap.list.pop_front() - } - Ok(()) -} - -/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate -/// and shrink the indices. -pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>); - -impl JoinHashMap { - pub(crate) fn with_capacity(capacity: usize) -> Self { - JoinHashMap { - map: RawTable::with_capacity(capacity), - next: vec![0; capacity], - } - } -} - -impl SymmetricJoinHashMap { - pub(crate) fn with_capacity(capacity: usize) -> Self { - Self(RawTable::with_capacity(capacity)) - } - - /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is. - /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25% - /// when necessary. You can adjust the scale_factor value to achieve the desired - /// balance between memory usage and performance. - // - // If you increase the scale_factor, the capacity will shrink less aggressively, - // leading to potentially higher memory usage but fewer resizes. - // Conversely, if you decrease the scale_factor, the capacity will shrink more aggressively, - // potentially leading to lower memory usage but more frequent resizing. + /// Shrinks the capacity of the hash map if necessary based on the provided scale factor. + /// + /// # Arguments + /// * `scale_factor`: The scale factor that determines how conservative the shrinking strategy is. + /// The capacity will be reduced by 1/scale_factor when necessary. + /// + /// # Note + /// Increasing the scale factor results in less aggressive capacity shrinking, + /// leading to potentially higher memory usage but fewer resizes. Conversely, + /// decreasing the scale factor results in more aggressive capacity shrinking, + /// potentially leading to lower memory usage but more frequent resizing. pub(crate) fn shrink_if_necessary(&mut self, scale_factor: usize) { - let capacity = self.0.capacity(); - let len = self.0.len(); + let capacity = self.map.capacity(); + let len = self.map.len(); if capacity > scale_factor * len { let new_capacity = (capacity * (scale_factor - 1)) / scale_factor; - self.0.shrink_to(new_capacity, |(hash, _)| *hash) + // Resize the map with the new capacity. + self.map.shrink_to(new_capacity, |(hash, _)| *hash) } } + /// Calculates the size of the `PruningJoinHashMap` in bytes. + /// + /// # Returns + /// The size of the hash map in bytes. pub(crate) fn size(&self) -> usize { - self.0.allocation_info().1.size() + self.map.allocation_info().1.size() + + self.list.capacity() * std::mem::size_of::() } -} -impl fmt::Debug for JoinHashMap { - fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { + /// Removes hash values from the map and list based on the given pruning length and deleting offset. + /// + /// + /// + /// # Arguments + /// * `prune_length`: The number of elements to remove from the list. + /// * `deleting_offset`: The offset used to determine which hash values to remove from the map. + /// + /// # Returns + /// A `Result` indicating whether the operation was successful. + pub(crate) fn prune_hash_values( + &mut self, + prune_length: usize, + deleting_offset: u64, + shrink_factor: usize + ) -> Result<()> { + // Remove elements from the list based on the pruning length. + self.list.drain(0..prune_length); + + // Calculate the keys that should be removed from the map. + let removable_keys = unsafe { + self.map + .iter() + .map(|bucket| bucket.as_ref()) + .filter_map(|(hash, tail_index)| { + if *tail_index < prune_length as u64 + deleting_offset { + Some(*hash) + } else { + None + } + }) + .collect::>() + }; + + // Remove the keys from the map. + removable_keys.into_iter().for_each(|hash_value| { + self.map + .remove_entry(hash_value, |(hash, _)| hash_value == *hash); + }); + + // Shrink the map if necessary. + self.shrink_if_necessary(shrink_factor); Ok(()) } } @@ -907,10 +781,7 @@ pub mod tests { use datafusion_common::ScalarValue; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{binary, cast, col, lit}; - use smallvec::smallvec; use std::sync::Arc; - use arrow_array::{Float64Array, UInt32Array}; - use crate::test::build_table_i32; /// Filter expr for a + b > c + 10 AND a + b < c + 100 pub(crate) fn complicated_filter( @@ -1247,89 +1118,40 @@ pub mod tests { #[test] fn test_shrink_if_necessary() { let scale_factor = 4; - let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100); + let mut join_hash_map = PruningJoinHashMap::with_capacity(100); let data_size = 2000; let deleted_part = 3 * data_size / 4; // Add elements to the JoinHashMap for hash_value in 0..data_size { - join_hash_map.0.insert( + join_hash_map.map.insert( hash_value, - (hash_value, smallvec![hash_value]), + (hash_value, hash_value), |(hash, _)| *hash, ); } - assert_eq!(join_hash_map.0.len(), data_size as usize); - assert!(join_hash_map.0.capacity() >= data_size as usize); + assert_eq!(join_hash_map.map.len(), data_size as usize); + assert!(join_hash_map.map.capacity() >= data_size as usize); // Remove some elements from the JoinHashMap for hash_value in 0..deleted_part { join_hash_map - .0 + .map .remove_entry(hash_value, |(hash, _)| hash_value == *hash); } - assert_eq!(join_hash_map.0.len(), (data_size - deleted_part) as usize); + assert_eq!(join_hash_map.map.len(), (data_size - deleted_part) as usize); // Old capacity - let old_capacity = join_hash_map.0.capacity(); + let old_capacity = join_hash_map.map.capacity(); // Test shrink_if_necessary join_hash_map.shrink_if_necessary(scale_factor); // The capacity should be reduced by the scale factor let new_expected_capacity = - join_hash_map.0.capacity() * (scale_factor - 1) / scale_factor; - assert!(join_hash_map.0.capacity() >= new_expected_capacity); - assert!(join_hash_map.0.capacity() <= old_capacity); + join_hash_map.map.capacity() * (scale_factor - 1) / scale_factor; + assert!(join_hash_map.map.capacity() >= new_expected_capacity); + assert!(join_hash_map.map.capacity() <= old_capacity); } - - fn create_test_schema() -> Result { - let a = Field::new("a", DataType::Int32, true); - let b = Field::new("b", DataType::Int32, false); - let c = Field::new("c", DataType::Int32, true); - - let schema = Arc::new(Schema::new(vec![a, b, c])); - Ok(schema) - } - - // #[test] - // fn update_sym_hash() -> Result<()>{ - // - // let left = build_table_i32( - // ("a", &vec![1, 1]), - // ("x", &vec![100, 200]), - // ("y", &vec![200, 300]), - // ); - // - // let right = build_table_i32( - // ("a", &vec![10, 20]), - // ("b", &vec![0, 0]), - // ("c", &vec![30, 40]), - // ); - // - // let on = vec![ - // Column::new_with_schema("a", &left.schema())?, - // ]; - // - // let random_state = RandomState::with_seeds(0, 0, 0, 0); - // let hashes_buff = &mut vec![0; left.num_rows()]; - // let mut hash_map = JoinHashMapv2{ - // map: RawTable::with_capacity(2), - // list: VecDeque::new(), - // }; - // let mut offset = 0; - // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); - // offset += left.num_rows(); - // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); - // offset += left.num_rows(); - // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); - // offset += left.num_rows(); - // update_hash(&on, &left, &mut hash_map, &random_state, hashes_buff, offset).unwrap(); - // - // - // Ok(()) - // - // - // } } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 36652c866ec9..79b3382a4903 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -25,7 +25,6 @@ //! This plan uses the [OneSideHashJoiner] object to facilitate join calculations //! for both its children. -use std::collections::{HashMap, VecDeque}; use std::fmt; use std::fmt::Debug; use std::sync::Arc; @@ -46,13 +45,18 @@ use futures::stream::{select, BoxStream}; use futures::{Stream, StreamExt}; use hashbrown::HashSet; use parking_lot::Mutex; -use smallvec::smallvec; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::intervals::ExprIntervalGraph; use crate::physical_plan::common::SharedMemoryReservation; -use crate::physical_plan::joins::hash_join_utils::{build_filter_expression_graph, calculate_filter_expr_intervals, combine_two_batches, convert_sort_expr_with_filter_schema, get_pruning_anti_indices, get_pruning_semi_indices, record_visited_indices, IntervalCalculatorInnerState, JoinHashMapv2, Node}; +use crate::physical_plan::joins::hash_join::equal_rows_arr; +use crate::physical_plan::joins::hash_join_utils::{ + build_filter_expression_graph, calculate_filter_expr_intervals, combine_two_batches, + convert_sort_expr_with_filter_schema, get_pruning_anti_indices, + get_pruning_semi_indices, record_visited_indices, IntervalCalculatorInnerState, + PruningJoinHashMap, +}; use crate::physical_plan::joins::StreamJoinPartitionMode; use crate::physical_plan::DisplayAs; use crate::physical_plan::{ @@ -75,8 +79,6 @@ use datafusion_common::{internal_err, plan_err, JoinType}; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::TaskContext; -use super::hash_join::equal_rows; -use super::hash_join_utils::SymmetricJoinHashMap; use super::utils::apply_join_filter_to_indices; const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; @@ -616,58 +618,6 @@ impl Stream for SymmetricHashJoinStream { } } -fn prune_hash_values( - prune_length: usize, - hashmap: &mut JoinHashMapv2, - row_hash_values: &mut VecDeque, - offset: u64, -) -> Result<()> { - // Create a (hash)-(row number set) map - let mut hash_value_map: HashMap> = HashMap::new(); - for index in 0..prune_length { - let hash_value = row_hash_values.pop_front().unwrap(); - if let Some(set) = hash_value_map.get_mut(&hash_value) { - set.insert(offset + index as u64); - } else { - let mut set = HashSet::new(); - set.insert(offset + index as u64); - hash_value_map.insert(hash_value, set); - } - } - let mut removed_hashes = vec![]; - for (hash_value, index_set) in hash_value_map.iter() { - if let Some((_, indices)) = hashmap - .map - .get_mut(*hash_value, |(hash, _)| hash_value == hash) - { - let (start_index, _) = indices; - let mut start_index_temp = *start_index as usize - 1; - let prune_lengtg_per_key = index_set.len(); - for _ in 0..prune_lengtg_per_key { - let next_index = hashmap.list[start_index_temp].next_index; - if next_index == 0 { - removed_hashes.push(*hash_value); - break; - }else { - hashmap.list[start_index_temp].next_index = 0; - start_index_temp = next_index as usize - 1; - } - } - indices.0 = start_index_temp as u64 + 1; - // Update the starting index. - } - } - - for hash_value in removed_hashes{ - hashmap.map.remove_entry(hash_value, |(hash, _)| hash_value == *hash); - } - - for _ in 0..prune_length { - hashmap.list.pop_front(); - - } - Ok(()) -} /// Determine the pruning length for `buffer`. /// /// This function evaluates the build side filter expression, converts the @@ -847,39 +797,6 @@ pub(crate) fn build_side_determined_results( } } -/// Gets build and probe indices which satisfy the on condition (including -/// the equality condition and the join filter) in the join. -#[allow(clippy::too_many_arguments)] -pub fn build_join_indices( - probe_batch: &RecordBatch, - build_hashmap: &JoinHashMapv2, - build_input_buffer: &RecordBatch, - on_build: &[Column], - on_probe: &[Column], - filter: Option<&JoinFilter>, - random_state: &RandomState, - null_equals_null: bool, - hashes_buffer: &mut Vec, - offset: Option, - build_side: JoinSide, -) -> Result<(UInt64Array, UInt32Array)> { - // Get the indices that satisfy the equality condition, like `left.a1 = right.a2` - build_equal_condition_join_indices( - build_hashmap, - build_input_buffer, - probe_batch, - on_build, - on_probe, - random_state, - null_equals_null, - hashes_buffer, - filter, - build_side, - offset, - - ) -} - // Returns build/probe indices satisfying the equality condition. // On LEFT.b1 = RIGHT.b2 // LEFT Table: @@ -912,24 +829,24 @@ pub fn build_join_indices( // Build indices: 5, 6, 6, 4 // Probe indices: 3, 4, 5, 3 #[allow(clippy::too_many_arguments)] -pub fn build_equal_condition_join_indices( - build_hashmap: &JoinHashMapv2, - build_input_buffer: &RecordBatch, +pub fn build_join_indices( probe_batch: &RecordBatch, - build_on: &[Column], - probe_on: &[Column], + build_hashmap: &PruningJoinHashMap, + build_input_buffer: &RecordBatch, + on_build: &[Column], + on_probe: &[Column], + filter: Option<&JoinFilter>, random_state: &RandomState, null_equals_null: bool, hashes_buffer: &mut Vec, - filter: Option<&JoinFilter>, - build_side: JoinSide, deleted_offset: Option, + build_side: JoinSide, ) -> Result<(UInt64Array, UInt32Array)> { - let keys_values = probe_on + let keys_values = on_probe .iter() .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))) .collect::>>()?; - let build_join_values = build_on + let build_join_values = on_build .iter() .map(|c| { Ok(c.evaluate(build_input_buffer)? @@ -941,8 +858,8 @@ pub fn build_equal_condition_join_indices( let offset_value = deleted_offset.unwrap_or(0); let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // Using a buffer builder to avoid slower normal builder - let mut build_indices = arrow_array::builder::UInt64BufferBuilder::new(0); - let mut probe_indices = arrow_array::builder::UInt32BufferBuilder::new(0); + let mut build_indices = UInt64BufferBuilder::new(0); + let mut probe_indices = UInt32BufferBuilder::new(0); // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: // Build Indices: [5, 4, 3] // Probe Indices: [1, 1, 1] @@ -972,7 +889,7 @@ pub fn build_equal_condition_join_indices( // (5,1) // // With this approach, the lexicographic order on both the probe side and the build side is preserved. - for (row, hash_value) in hash_values.iter().enumerate().rev() { + for (probe_row, hash_value) in hash_values.iter().enumerate().rev() { // Get the hash and find it in the build index // For every item on the build and probe we check if it matches @@ -982,13 +899,18 @@ pub fn build_equal_condition_join_indices( .map .get(*hash_value, |(hash, _)| *hash_value == *hash) { - let (prev_index, _) = index; - let mut i = *prev_index - 1; + let mut i = *index - 1; loop { - build_indices.append(i - offset_value as u64); - probe_indices.append(row as u32); + // This arguments means that we prune the next index way before here. + if i < offset_value as u64 { + // End of the list due to pruning; + break; + } + let build_row_value = i - offset_value as u64; + build_indices.append(build_row_value); + probe_indices.append(probe_row as u32); // Follow the chain to get the next index value - let next = build_hashmap.list[i as usize].prev_index; + let next = build_hashmap.list[build_row_value as usize]; if next == 0 { // end of list break; @@ -1018,7 +940,7 @@ pub fn build_equal_condition_join_indices( (left, right) }; - crate::physical_plan::joins::hash_join::equal_rows_arr( + equal_rows_arr( &left, &right, &build_join_values, @@ -1118,9 +1040,7 @@ pub struct OneSideHashJoiner { /// Columns from the side pub(crate) on: Vec, /// Hashmap - pub(crate) hashmap: JoinHashMapv2, - /// To optimize hash deleting in case of pruning, we hold them in memory - row_hash_values: VecDeque, + pub(crate) hashmap: PruningJoinHashMap, /// Reuse the hashes buffer pub(crate) hashes_buffer: Vec, /// Matched rows @@ -1139,7 +1059,6 @@ impl OneSideHashJoiner { size += self.input_buffer.get_array_memory_size(); size += std::mem::size_of_val(&self.on); size += self.hashmap.size(); - size += self.row_hash_values.capacity() * std::mem::size_of::(); size += self.hashes_buffer.capacity() * std::mem::size_of::(); size += self.visited_rows.capacity() * std::mem::size_of::(); size += std::mem::size_of_val(&self.offset); @@ -1151,8 +1070,7 @@ impl OneSideHashJoiner { build_side, input_buffer: RecordBatch::new_empty(schema), on, - hashmap: JoinHashMapv2::with_capacity(0), - row_hash_values: VecDeque::new(), + hashmap: PruningJoinHashMap::with_capacity(0), hashes_buffer: vec![], visited_rows: HashSet::new(), offset: 0, @@ -1165,41 +1083,40 @@ impl OneSideHashJoiner { pub fn update_hash( on: &[Column], batch: &RecordBatch, - hash_map: &mut JoinHashMapv2, + hash_map: &mut PruningJoinHashMap, + offset: usize, + deleted_offset: usize, random_state: &RandomState, hashes_buffer: &mut Vec, - offset: usize ) -> Result<()> { // evaluate the keys let keys_values = on .iter() .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows()))) .collect::>>()?; - let hash_values = datafusion_physical_expr::hash_utils::create_hashes(&keys_values, random_state, hashes_buffer)?; - hash_map.list.resize(hash_map.list.len() + batch.num_rows(), Node::default()); + + // calculate the hash values + let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; + hash_map + .list + .resize(hash_map.list.len() + batch.num_rows(), 0); // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { let item = hash_map .map .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, indices)) = item { + if let Some((_, index)) = item { // Already exists: add index to next array - let (_, tail) = indices; - - // Update previous node if exist - hash_map.list[(*tail - 1) as usize].next_index = (row + offset + 1) as u64; - - // Update chained Vec at row + offset with previous value - hash_map.list[row + offset].prev_index = *tail; + let prev_index = *index; // Store new value inside hashmap - indices.1 = (row + offset + 1) as u64; - // println!("hash_map.next new key {:?} indices {:?}",hash_map.list, indices); + *index = (row + offset + 1) as u64; + // Update chained Vec at row + offset with previous value + hash_map.list[row + offset - deleted_offset] = prev_index; } else { - // println!("hash_map.next new key {:?}",hash_map.list); hash_map.map.insert( *hash_value, // store the value + 1 as 0 value reserved for end of list - (*hash_value, ((row + offset + 1) as u64, (row + offset + 1) as u64)), + (*hash_value, (row + offset + 1) as u64), |(hash, _)| *hash, ); // chained list at (row + offset) is already initialized with 0 @@ -1234,12 +1151,11 @@ impl OneSideHashJoiner { &self.on, batch, &mut self.hashmap, + self.offset, + self.deleted_offset, random_state, &mut self.hashes_buffer, - self.offset, )?; - // Add the hashes buffer to the hash value deque: - self.row_hash_values.extend(self.hashes_buffer.iter()); Ok(()) } @@ -1299,12 +1215,8 @@ impl OneSideHashJoiner { pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> { // Prune the hash values: - prune_hash_values( - prune_length, - &mut self.hashmap, - &mut self.row_hash_values, - self.deleted_offset as u64, - )?; + self.hashmap + .prune_hash_values(prune_length, self.deleted_offset as u64, HASHMAP_SHRINK_SCALE_FACTOR)?; // Remove pruned rows from the visited rows set: for row in self.deleted_offset..(self.deleted_offset + prune_length) { self.visited_rows.remove(&row); @@ -1519,7 +1431,7 @@ mod tests { }; use datafusion_common::ScalarValue; - const TABLE_SIZE: i32 = 100; + const TABLE_SIZE: i32 = 1000; pub async fn experiment( left: Arc, @@ -1701,6 +1613,61 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn join_all_one_ascending_numeric_v2() -> Result<()> { + let join_type = JoinType::Inner; + let cardinality = (4, 5); + let case_expr = 2; + let task_ctx = Arc::new(TaskContext::default()); + let (left_batch, right_batch) = build_sides_record_batches(1000, cardinality)?; + let left_schema = &left_batch.schema(); + let right_schema = &right_batch.schema(); + let left_sorted = vec![PhysicalSortExpr { + expr: col("la1", left_schema)?, + options: SortOptions::default(), + }]; + let right_sorted = vec![PhysicalSortExpr { + expr: col("ra1", right_schema)?, + options: SortOptions::default(), + }]; + let (left, right) = create_memory_table( + left_batch, + right_batch, + vec![left_sorted], + vec![right_sorted], + 13, + )?; + + let on = vec![( + Column::new_with_schema("lc1", left_schema)?, + Column::new_with_schema("rc1", right_schema)?, + )]; + + let intermediate_schema = Schema::new(vec![ + Field::new("left", DataType::Int32, true), + Field::new("right", DataType::Int32, true), + ]); + let filter_expr = join_expr_tests_fixture_i32( + case_expr, + col("left", &intermediate_schema)?, + col("right", &intermediate_schema)?, + ); + let column_indices = vec![ + ColumnIndex { + index: 0, + side: JoinSide::Left, + }, + ColumnIndex { + index: 0, + side: JoinSide::Right, + }, + ]; + let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema); + + experiment(left, right, Some(filter), join_type, on, task_ctx).await?; + Ok(()) + } + #[rstest] #[tokio::test(flavor = "multi_thread")] async fn join_without_sort_information( diff --git a/datafusion/core/src/physical_plan/joins/test_utils.rs b/datafusion/core/src/physical_plan/joins/test_utils.rs index 44610ab09a38..3fbb0db45655 100644 --- a/datafusion/core/src/physical_plan/joins/test_utils.rs +++ b/datafusion/core/src/physical_plan/joins/test_utils.rs @@ -73,7 +73,7 @@ pub async fn partitioned_sym_join_with_filter( null_equals_null: bool, context: Arc, ) -> Result> { - let partition_count = 4; + let partition_count = 1; let left_expr = on .iter() From 32e5d7a2c73e34ec74244e08b5138e4c3e105eaa Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 21 Aug 2023 22:22:39 +0300 Subject: [PATCH 03/11] Single implementation --- .../core/src/physical_plan/joins/hash_join.rs | 57 +++-- .../physical_plan/joins/hash_join_utils.rs | 59 ++++- .../joins/symmetric_hash_join.rs | 229 ++---------------- .../src/physical_plan/joins/test_utils.rs | 2 +- 4 files changed, 109 insertions(+), 238 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 50d04f9323b6..2d070cd00da0 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -58,13 +58,10 @@ use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder}; use arrow::record_batch::RecordBatch; use arrow::{ array::{ - Array, ArrayRef, BooleanArray, BooleanBufferBuilder, - PrimitiveArray, UInt32Array, UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, - }, - datatypes::{ - DataType, Schema, - SchemaRef + Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, + UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, }, + datatypes::{DataType, Schema, SchemaRef}, util::bit_util, }; use arrow_array::cast::downcast_array; @@ -74,6 +71,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::OrderingEquivalenceProperties; +use crate::physical_plan::joins::hash_join_utils::JoinHashMapType; use ahash::RandomState; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -584,13 +582,14 @@ async fn collect_left_input( for batch in batches.iter() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); - update_hash( + update_hash::( &on_left, batch, &mut hashmap, offset, &random_state, &mut hashes_buffer, + 0, )?; offset += batch.num_rows(); } @@ -603,13 +602,14 @@ async fn collect_left_input( /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, /// assuming that the [RecordBatch] corresponds to the `index`th -pub fn update_hash( +pub fn update_hash( on: &[Column], batch: &RecordBatch, - hash_map: &mut JoinHashMap, + hash_map: &mut T, offset: usize, random_state: &RandomState, hashes_buffer: &mut Vec, + deleted_offset: usize, ) -> Result<()> { // evaluate the keys let keys_values = on @@ -619,11 +619,14 @@ pub fn update_hash( // calculate the hash values let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; + if EXTEND_CHAIN { + hash_map.extend(batch.num_rows(), 0) + } // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { let item = hash_map - .map + .get_mut_map() .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, index)) = item { // Already exists: add index to next array @@ -631,9 +634,9 @@ pub fn update_hash( // Store new value inside hashmap *index = (row + offset + 1) as u64; // Update chained Vec at row + offset with previous value - hash_map.next[row + offset] = prev_index; + hash_map.get_mut_list()[row + offset - deleted_offset] = prev_index; } else { - hash_map.map.insert( + hash_map.get_mut_map().insert( *hash_value, // store the value + 1 as 0 value reserved for end of list (*hash_value, (row + offset + 1) as u64), @@ -716,8 +719,8 @@ impl RecordBatchStream for HashJoinStream { // Build indices: 4, 5, 6, 6 // Probe indices: 3, 3, 4, 5 #[allow(clippy::too_many_arguments)] -pub fn build_equal_condition_join_indices( - build_hashmap: &JoinHashMap, +pub fn build_equal_condition_join_indices( + build_hashmap: &T, build_input_buffer: &RecordBatch, probe_batch: &RecordBatch, build_on: &[Column], @@ -727,6 +730,7 @@ pub fn build_equal_condition_join_indices( hashes_buffer: &mut Vec, filter: Option<&JoinFilter>, build_side: JoinSide, + deleted_offset: Option, ) -> Result<(UInt64Array, UInt32Array)> { let keys_values = probe_on .iter() @@ -741,6 +745,7 @@ pub fn build_equal_condition_join_indices( .collect::>>()?; hashes_buffer.clear(); hashes_buffer.resize(probe_batch.num_rows(), 0); + let deleted_offset_value = deleted_offset.unwrap_or(0); let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // Using a buffer builder to avoid slower normal builder let mut build_indices = UInt64BufferBuilder::new(0); @@ -774,22 +779,33 @@ pub fn build_equal_condition_join_indices( // (5,1) // // With this approach, the lexicographic order on both the probe side and the build side is preserved. + let hash_map = build_hashmap.get_map(); + let next_chain = build_hashmap.get_list(); for (row, hash_value) in hash_values.iter().enumerate().rev() { // Get the hash and find it in the build index // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, index)) = build_hashmap - .map - .get(*hash_value, |(hash, _)| *hash_value == *hash) + if let Some((_, index)) = + hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash) { let mut i = *index - 1; loop { - build_indices.append(i); + let build_row_value = if let Some(offset) = deleted_offset { + // This arguments means that we prune the next index way before here. + if i < offset as u64 { + // End of the list due to pruning; + break; + } + i - deleted_offset_value as u64 + } else { + i + }; + build_indices.append(build_row_value); probe_indices.append(row as u32); // Follow the chain to get the next index value - let next = build_hashmap.next[i as usize]; + let next = next_chain[build_row_value as usize]; if next == 0 { // end of list break; @@ -828,7 +844,6 @@ pub fn build_equal_condition_join_indices( ) } - // version of eq_dyn supporting equality on null arrays fn eq_dyn_null( left: &dyn Array, @@ -957,6 +972,7 @@ impl HashJoinStream { &mut hashes_buffer, self.filter.as_ref(), JoinSide::Left, + None, ); let result = match left_right_indices { @@ -2419,6 +2435,7 @@ mod tests { &mut vec![0; right.num_rows()], None, JoinSide::Left, + None, )?; let mut left_ids = UInt64Builder::with_capacity(0); diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index 6f3ad5fa2237..d05c1c3d9a38 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -37,6 +37,7 @@ use hashbrown::raw::RawTable; use hashbrown::HashSet; use parking_lot::Mutex; use std::fmt::{Debug, Formatter}; +use std::ops::IndexMut; use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; use crate::physical_plan::ExecutionPlan; @@ -93,7 +94,6 @@ use datafusion_common::Result; // --------------------- // | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) // --------------------- - // TODO: speed up collision checks // https://github.com/apache/arrow-datafusion/issues/50 pub struct JoinHashMap { @@ -112,6 +112,61 @@ impl JoinHashMap { } } +pub trait JoinHashMapType { + type ListType: IndexMut; + fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)>; + fn extend(&mut self, len: usize, value: u64); + fn get_map(&self) -> &RawTable<(u64, u64)>; + fn get_mut_list(&mut self) -> &mut Self::ListType; + fn get_list(&self) -> &Self::ListType; +} + +impl JoinHashMapType for JoinHashMap { + type ListType = Vec; + fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { + &mut self.map + } + + fn extend(&mut self, _: usize, _: u64) { + unimplemented!() + } + + fn get_map(&self) -> &RawTable<(u64, u64)> { + &self.map + } + + fn get_mut_list(&mut self) -> &mut Self::ListType { + &mut self.next + } + + fn get_list(&self) -> &Self::ListType { + &self.next + } +} + +impl JoinHashMapType for PruningJoinHashMap { + type ListType = VecDeque; + fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { + &mut self.map + } + + fn extend(&mut self, len: usize, value: u64) { + self.list.resize(self.list.len() + len, value) + } + + fn get_map(&self) -> &RawTable<(u64, u64)> { + &self.map + } + + fn get_mut_list(&mut self) -> &mut Self::ListType { + &mut self.list + } + + fn get_list(&self) -> &Self::ListType { + &self.list + } +} + impl fmt::Debug for JoinHashMap { fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { Ok(()) @@ -219,7 +274,7 @@ impl PruningJoinHashMap { &mut self, prune_length: usize, deleting_offset: u64, - shrink_factor: usize + shrink_factor: usize, ) -> Result<()> { // Remove elements from the list based on the pruning length. self.list.drain(0..prune_length); diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 79b3382a4903..9b27402cc6e8 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -37,10 +37,6 @@ use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveB use arrow::compute::concat_batches; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder}; -use arrow_array::{UInt32Array, UInt64Array}; -use datafusion_physical_expr::hash_utils::create_hashes; -use datafusion_physical_expr::PhysicalExpr; use futures::stream::{select, BoxStream}; use futures::{Stream, StreamExt}; use hashbrown::HashSet; @@ -50,7 +46,9 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr::intervals::ExprIntervalGraph; use crate::physical_plan::common::SharedMemoryReservation; -use crate::physical_plan::joins::hash_join::equal_rows_arr; +use crate::physical_plan::joins::hash_join::{ + build_equal_condition_join_indices, update_hash, +}; use crate::physical_plan::joins::hash_join_utils::{ build_filter_expression_graph, calculate_filter_expr_intervals, combine_two_batches, convert_sort_expr_with_filter_schema, get_pruning_anti_indices, @@ -79,8 +77,6 @@ use datafusion_common::{internal_err, plan_err, JoinType}; use datafusion_common::{DataFusionError, Result}; use datafusion_execution::TaskContext; -use super::utils::apply_join_filter_to_indices; - const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; /// A symmetric hash join with range conditions is when both streams are hashed on the @@ -797,158 +793,6 @@ pub(crate) fn build_side_determined_results( } } -// Returns build/probe indices satisfying the equality condition. -// On LEFT.b1 = RIGHT.b2 -// LEFT Table: -// a1 b1 c1 -// 1 1 10 -// 3 3 30 -// 5 5 50 -// 7 7 70 -// 9 8 90 -// 11 8 110 -// 13 10 130 -// RIGHT Table: -// a2 b2 c2 -// 2 2 20 -// 4 4 40 -// 6 6 60 -// 8 8 80 -// 10 10 100 -// 12 10 120 -// The result is -// "+----+----+-----+----+----+-----+", -// "| a1 | b1 | c1 | a2 | b2 | c2 |", -// "+----+----+-----+----+----+-----+", -// "| 11 | 8 | 110 | 8 | 8 | 80 |", -// "| 13 | 10 | 130 | 10 | 10 | 100 |", -// "| 13 | 10 | 130 | 12 | 10 | 120 |", -// "| 9 | 8 | 90 | 8 | 8 | 80 |", -// "+----+----+-----+----+----+-----+" -// And the result of build and probe indices are: -// Build indices: 5, 6, 6, 4 -// Probe indices: 3, 4, 5, 3 -#[allow(clippy::too_many_arguments)] -pub fn build_join_indices( - probe_batch: &RecordBatch, - build_hashmap: &PruningJoinHashMap, - build_input_buffer: &RecordBatch, - on_build: &[Column], - on_probe: &[Column], - filter: Option<&JoinFilter>, - random_state: &RandomState, - null_equals_null: bool, - hashes_buffer: &mut Vec, - deleted_offset: Option, - build_side: JoinSide, -) -> Result<(UInt64Array, UInt32Array)> { - let keys_values = on_probe - .iter() - .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))) - .collect::>>()?; - let build_join_values = on_build - .iter() - .map(|c| { - Ok(c.evaluate(build_input_buffer)? - .into_array(build_input_buffer.num_rows())) - }) - .collect::>>()?; - hashes_buffer.clear(); - hashes_buffer.resize(probe_batch.num_rows(), 0); - let offset_value = deleted_offset.unwrap_or(0); - let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - // Using a buffer builder to avoid slower normal builder - let mut build_indices = UInt64BufferBuilder::new(0); - let mut probe_indices = UInt32BufferBuilder::new(0); - // The chained list algorithm generates build indices for each probe row in a reversed sequence as such: - // Build Indices: [5, 4, 3] - // Probe Indices: [1, 1, 1] - // - // This affects the output sequence. Hypothetically, it's possible to preserve the lexicographic order on the build side. - // Let's consider probe rows [0,1] as an example: - // - // When the probe iteration sequence is reversed, the following pairings can be derived: - // - // For probe row 1: - // (5, 1) - // (4, 1) - // (3, 1) - // - // For probe row 0: - // (5, 0) - // (4, 0) - // (3, 0) - // - // After reversing both sets of indices, we obtain reversed indices: - // - // (3,0) - // (4,0) - // (5,0) - // (3,1) - // (4,1) - // (5,1) - // - // With this approach, the lexicographic order on both the probe side and the build side is preserved. - for (probe_row, hash_value) in hash_values.iter().enumerate().rev() { - // Get the hash and find it in the build index - - // For every item on the build and probe we check if it matches - // This possibly contains rows with hash collisions, - // So we have to check here whether rows are equal or not - if let Some((_, index)) = build_hashmap - .map - .get(*hash_value, |(hash, _)| *hash_value == *hash) - { - let mut i = *index - 1; - loop { - // This arguments means that we prune the next index way before here. - if i < offset_value as u64 { - // End of the list due to pruning; - break; - } - let build_row_value = i - offset_value as u64; - build_indices.append(build_row_value); - probe_indices.append(probe_row as u32); - // Follow the chain to get the next index value - let next = build_hashmap.list[build_row_value as usize]; - if next == 0 { - // end of list - break; - } - i = next - 1; - } - } - } - // Reversing both sets of indices - build_indices.as_slice_mut().reverse(); - probe_indices.as_slice_mut().reverse(); - - let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None); - let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None); - - let (left, right) = if let Some(filter) = filter { - // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10` - apply_join_filter_to_indices( - build_input_buffer, - probe_batch, - left, - right, - filter, - build_side, - )? - } else { - (left, right) - }; - - equal_rows_arr( - &left, - &right, - &build_join_values, - &keys_values, - null_equals_null, - ) -} - /// This method performs a join between the build side input buffer and the probe side batch. /// /// # Arguments @@ -983,18 +827,18 @@ pub(crate) fn join_with_probe_batch( if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 { return Ok(None); } - let (build_indices, probe_indices) = build_join_indices( - probe_batch, + let (build_indices, probe_indices) = build_equal_condition_join_indices( &build_hash_joiner.hashmap, &build_hash_joiner.input_buffer, + probe_batch, &build_hash_joiner.on, &probe_hash_joiner.on, - filter, random_state, null_equals_null, &mut build_hash_joiner.hashes_buffer, - Some(build_hash_joiner.deleted_offset), + filter, build_hash_joiner.build_side, + Some(build_hash_joiner.deleted_offset), )?; if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) { record_visited_indices( @@ -1078,54 +922,6 @@ impl OneSideHashJoiner { } } - /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, - /// assuming that the [RecordBatch] corresponds to the `index`th - pub fn update_hash( - on: &[Column], - batch: &RecordBatch, - hash_map: &mut PruningJoinHashMap, - offset: usize, - deleted_offset: usize, - random_state: &RandomState, - hashes_buffer: &mut Vec, - ) -> Result<()> { - // evaluate the keys - let keys_values = on - .iter() - .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows()))) - .collect::>>()?; - - // calculate the hash values - let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - hash_map - .list - .resize(hash_map.list.len() + batch.num_rows(), 0); - // insert hashes to key of the hashmap - for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map - .map - .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, index)) = item { - // Already exists: add index to next array - let prev_index = *index; - // Store new value inside hashmap - *index = (row + offset + 1) as u64; - // Update chained Vec at row + offset with previous value - hash_map.list[row + offset - deleted_offset] = prev_index; - } else { - hash_map.map.insert( - *hash_value, - // store the value + 1 as 0 value reserved for end of list - (*hash_value, (row + offset + 1) as u64), - |(hash, _)| *hash, - ); - // chained list at (row + offset) is already initialized with 0 - // meaning end of list - } - } - Ok(()) - } - /// Updates the internal state of the [OneSideHashJoiner] with the incoming batch. /// /// # Arguments @@ -1147,14 +943,14 @@ impl OneSideHashJoiner { self.hashes_buffer.resize(batch.num_rows(), 0); // Get allocation_info before adding the item // Update the hashmap with the join key values and hashes of the incoming batch: - Self::update_hash( + update_hash::( &self.on, batch, &mut self.hashmap, self.offset, - self.deleted_offset, random_state, &mut self.hashes_buffer, + self.deleted_offset, )?; Ok(()) } @@ -1215,8 +1011,11 @@ impl OneSideHashJoiner { pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> { // Prune the hash values: - self.hashmap - .prune_hash_values(prune_length, self.deleted_offset as u64, HASHMAP_SHRINK_SCALE_FACTOR)?; + self.hashmap.prune_hash_values( + prune_length, + self.deleted_offset as u64, + HASHMAP_SHRINK_SCALE_FACTOR, + )?; // Remove pruned rows from the visited rows set: for row in self.deleted_offset..(self.deleted_offset + prune_length) { self.visited_rows.remove(&row); diff --git a/datafusion/core/src/physical_plan/joins/test_utils.rs b/datafusion/core/src/physical_plan/joins/test_utils.rs index 3fbb0db45655..44610ab09a38 100644 --- a/datafusion/core/src/physical_plan/joins/test_utils.rs +++ b/datafusion/core/src/physical_plan/joins/test_utils.rs @@ -73,7 +73,7 @@ pub async fn partitioned_sym_join_with_filter( null_equals_null: bool, context: Arc, ) -> Result> { - let partition_count = 1; + let partition_count = 4; let left_expr = on .iter() From e9deddffb39a35b85428634a7d9d03e0cdc2948f Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 22 Aug 2023 00:06:02 +0300 Subject: [PATCH 04/11] As any mut impl. --- .../core/src/physical_plan/joins/hash_join.rs | 18 ++-- .../physical_plan/joins/hash_join_utils.rs | 68 +++++++++------ .../joins/symmetric_hash_join.rs | 2 +- .../core/src/physical_plan/joins/utils.rs | 85 +++++++++++++++++++ 4 files changed, 142 insertions(+), 31 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 2d070cd00da0..80b884331002 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -71,7 +71,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::OrderingEquivalenceProperties; -use crate::physical_plan::joins::hash_join_utils::JoinHashMapType; +use crate::physical_plan::joins::hash_join_utils::{JoinHashMapType, PruningJoinHashMap}; use ahash::RandomState; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -582,7 +582,7 @@ async fn collect_left_input( for batch in batches.iter() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); - update_hash::( + update_hash( &on_left, batch, &mut hashmap, @@ -602,7 +602,7 @@ async fn collect_left_input( /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, /// assuming that the [RecordBatch] corresponds to the `index`th -pub fn update_hash( +pub fn update_hash( on: &[Column], batch: &RecordBatch, hash_map: &mut T, @@ -610,7 +610,10 @@ pub fn update_hash( random_state: &RandomState, hashes_buffer: &mut Vec, deleted_offset: usize, -) -> Result<()> { +) -> Result<()> +where + T: JoinHashMapType, +{ // evaluate the keys let keys_values = on .iter() @@ -619,8 +622,11 @@ pub fn update_hash( // calculate the hash values let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - if EXTEND_CHAIN { - hash_map.extend(batch.num_rows(), 0) + + if let Some(hash_map) = hash_map.as_any_mut().downcast_mut::() { + hash_map + .next + .resize(hash_map.next.len() + batch.num_rows(), 0); } // insert hashes to key of the hashmap diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index d05c1c3d9a38..941e130aebf4 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -18,6 +18,7 @@ //! This file contains common subroutines for regular and symmetric hash join //! related functionality, used both in join calculations and optimization rules. +use std::any::Any; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::{fmt, usize}; @@ -112,58 +113,77 @@ impl JoinHashMap { } } +/// Trait defining methods that must be implemented by a hash map type to be used for joins. pub trait JoinHashMapType { - type ListType: IndexMut; + /// The type of list used to store the hash values. + type NextType: IndexMut; + /// Returns a mutable reference to `self` as a `dyn Any` for dynamic downcasting. + fn as_any_mut(&mut self) -> &mut dyn Any; + /// Returns a mutable reference to the hash map. fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)>; - fn extend(&mut self, len: usize, value: u64); + /// Returns a reference to the hash map. fn get_map(&self) -> &RawTable<(u64, u64)>; - fn get_mut_list(&mut self) -> &mut Self::ListType; - fn get_list(&self) -> &Self::ListType; + /// Returns a mutable reference to the next. + fn get_mut_list(&mut self) -> &mut Self::NextType; + /// Returns a reference to the next. + fn get_list(&self) -> &Self::NextType; } +/// Implementation of `JoinHashMapType` for `JoinHashMap`. impl JoinHashMapType for JoinHashMap { - type ListType = Vec; - fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { - &mut self.map + type NextType = Vec; + + fn as_any_mut(&mut self) -> &mut dyn Any { + self } - fn extend(&mut self, _: usize, _: u64) { - unimplemented!() + /// Get a mutable reference to the hash map. + fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { + &mut self.map } + /// Get a reference to the hash map. fn get_map(&self) -> &RawTable<(u64, u64)> { &self.map } - fn get_mut_list(&mut self) -> &mut Self::ListType { + /// Get a mutable reference to the next. + fn get_mut_list(&mut self) -> &mut Self::NextType { &mut self.next } - fn get_list(&self) -> &Self::ListType { + /// Get a reference to the next. + fn get_list(&self) -> &Self::NextType { &self.next } } +/// Implementation of `JoinHashMapType` for `PruningJoinHashMap`. impl JoinHashMapType for PruningJoinHashMap { - type ListType = VecDeque; - fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { - &mut self.map + type NextType = VecDeque; + + fn as_any_mut(&mut self) -> &mut dyn Any { + self } - fn extend(&mut self, len: usize, value: u64) { - self.list.resize(self.list.len() + len, value) + /// Get a mutable reference to the hash map. + fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { + &mut self.map } + /// Get a reference to the hash map. fn get_map(&self) -> &RawTable<(u64, u64)> { &self.map } - fn get_mut_list(&mut self) -> &mut Self::ListType { - &mut self.list + /// Get a mutable reference to the next. + fn get_mut_list(&mut self) -> &mut Self::NextType { + &mut self.next } - fn get_list(&self) -> &Self::ListType { - &self.list + /// Get a reference to the next. + fn get_list(&self) -> &Self::NextType { + &self.next } } @@ -210,7 +230,7 @@ pub struct PruningJoinHashMap { /// Stores hash value to last row index pub map: RawTable<(u64, u64)>, /// Stores indices in chained list data structure - pub list: VecDeque, + pub next: VecDeque, } impl PruningJoinHashMap { @@ -225,7 +245,7 @@ impl PruningJoinHashMap { pub(crate) fn with_capacity(capacity: usize) -> Self { PruningJoinHashMap { map: RawTable::with_capacity(capacity), - list: VecDeque::with_capacity(capacity), + next: VecDeque::with_capacity(capacity), } } @@ -257,7 +277,7 @@ impl PruningJoinHashMap { /// The size of the hash map in bytes. pub(crate) fn size(&self) -> usize { self.map.allocation_info().1.size() - + self.list.capacity() * std::mem::size_of::() + + self.next.capacity() * std::mem::size_of::() } /// Removes hash values from the map and list based on the given pruning length and deleting offset. @@ -277,7 +297,7 @@ impl PruningJoinHashMap { shrink_factor: usize, ) -> Result<()> { // Remove elements from the list based on the pruning length. - self.list.drain(0..prune_length); + self.next.drain(0..prune_length); // Calculate the keys that should be removed from the map. let removable_keys = unsafe { diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 9b27402cc6e8..e7f2753ff797 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -943,7 +943,7 @@ impl OneSideHashJoiner { self.hashes_buffer.resize(batch.num_rows(), 0); // Get allocation_info before adding the item // Update the hashmap with the join key values and hashes of the incoming batch: - update_hash::( + update_hash( &self.on, batch, &mut self.hashmap, diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index dd23e39e2774..7a2cf434fd1d 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -2092,3 +2092,88 @@ mod tests { Ok(()) } } + +/// Updates sorted filter expressions with corresponding node indices from the +/// expression interval graph. +/// +/// This function iterates through the provided sorted filter expressions, +/// gathers the corresponding node indices from the expression interval graph, +/// and then updates the sorted expressions with these indices. It ensures +/// that these sorted expressions are aligned with the structure of the graph. +fn update_sorted_exprs_with_node_indices( + graph: &mut ExprIntervalGraph, + sorted_exprs: &mut [SortedFilterExpr], +) { + // Extract filter expressions from the sorted expressions: + let filter_exprs = sorted_exprs + .iter() + .map(|expr| expr.filter_expr().clone()) + .collect::>(); + + // Gather corresponding node indices for the extracted filter expressions from the graph: + let child_node_indices = graph.gather_node_indices(&filter_exprs); + + // Iterate through the sorted expressions and the gathered node indices: + for (sorted_expr, (_, index)) in sorted_exprs.iter_mut().zip(child_node_indices) { + // Update each sorted expression with the corresponding node index: + sorted_expr.set_node_index(index); + } +} + +/// Prepares and sorts expressions based on a given filter, left and right execution plans, and sort expressions. +/// +/// # Arguments +/// +/// * `filter` - The join filter to base the sorting on. +/// * `left` - The left execution plan. +/// * `right` - The right execution plan. +/// * `left_sort_exprs` - The expressions to sort on the left side. +/// * `right_sort_exprs` - The expressions to sort on the right side. +/// +/// # Returns +/// +/// * A tuple consisting of the sorted filter expression for the left and right sides, and an expression interval graph. +pub fn prepare_sorted_exprs( + filter: &JoinFilter, + left: &Arc, + right: &Arc, + left_sort_exprs: &[PhysicalSortExpr], + right_sort_exprs: &[PhysicalSortExpr], +) -> Result<(SortedFilterExpr, SortedFilterExpr, ExprIntervalGraph)> { + // Build the filter order for the left side + let err = + || DataFusionError::Plan("Filter does not include the child order".to_owned()); + + let left_temp_sorted_filter_expr = build_filter_input_order( + JoinSide::Left, + filter, + &left.schema(), + &left_sort_exprs[0], + )? + .ok_or_else(err)?; + + // Build the filter order for the right side + let right_temp_sorted_filter_expr = build_filter_input_order( + JoinSide::Right, + filter, + &right.schema(), + &right_sort_exprs[0], + )? + .ok_or_else(err)?; + + // Collect the sorted expressions + let mut sorted_exprs = + vec![left_temp_sorted_filter_expr, right_temp_sorted_filter_expr]; + + // Build the expression interval graph + let mut graph = ExprIntervalGraph::try_new(filter.expression().clone())?; + + // Update sorted expressions with node indices + update_sorted_exprs_with_node_indices(&mut graph, &mut sorted_exprs); + + // Swap and remove to get the final sorted filter expressions + let right_sorted_filter_expr = sorted_exprs.swap_remove(1); + let left_sorted_filter_expr = sorted_exprs.swap_remove(0); + + Ok((left_sorted_filter_expr, right_sorted_filter_expr, graph)) +} From 482f262a7bbe4b758de32d59b769943a4bd9254d Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Mon, 21 Aug 2023 22:05:42 -0400 Subject: [PATCH 05/11] Review changes --- .../core/src/physical_plan/joins/hash_join.rs | 31 +++----- .../physical_plan/joins/hash_join_utils.rs | 79 ++++++++----------- .../joins/symmetric_hash_join.rs | 26 +++--- 3 files changed, 58 insertions(+), 78 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 80b884331002..f8137ba26526 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -36,7 +36,7 @@ use crate::physical_plan::{ expressions::Column, expressions::PhysicalSortExpr, hash_utils::create_hashes, - joins::hash_join_utils::JoinHashMap, + joins::hash_join_utils::{JoinHashMap, JoinHashMapType, PruningJoinHashMap}, joins::utils::{ adjust_right_output_partitioning, build_join_schema, check_join_is_valid, combine_join_equivalence_properties, estimate_join_statistics, @@ -53,17 +53,15 @@ use super::{ PartitionMode, }; +use arrow::array::{ + Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, + UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, +}; use arrow::buffer::BooleanBuffer; use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use arrow::{ - array::{ - Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, - UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder, - }, - datatypes::{DataType, Schema, SchemaRef}, - util::bit_util, -}; +use arrow::util::bit_util; use arrow_array::cast::downcast_array; use arrow_schema::ArrowError; use datafusion_common::{internal_err, plan_err, DataFusionError, JoinType, Result}; @@ -71,7 +69,6 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::OrderingEquivalenceProperties; -use crate::physical_plan::joins::hash_join_utils::{JoinHashMapType, PruningJoinHashMap}; use ahash::RandomState; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -630,19 +627,18 @@ where } // insert hashes to key of the hashmap + let (mut_map, mut_list) = hash_map.get_mut(); for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map - .get_mut_map() - .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); + let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash); if let Some((_, index)) = item { // Already exists: add index to next array let prev_index = *index; // Store new value inside hashmap *index = (row + offset + 1) as u64; // Update chained Vec at row + offset with previous value - hash_map.get_mut_list()[row + offset - deleted_offset] = prev_index; + mut_list[row + offset - deleted_offset] = prev_index; } else { - hash_map.get_mut_map().insert( + mut_map.insert( *hash_value, // store the value + 1 as 0 value reserved for end of list (*hash_value, (row + offset + 1) as u64), @@ -751,7 +747,6 @@ pub fn build_equal_condition_join_indices( .collect::>>()?; hashes_buffer.clear(); hashes_buffer.resize(probe_batch.num_rows(), 0); - let deleted_offset_value = deleted_offset.unwrap_or(0); let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // Using a buffer builder to avoid slower normal builder let mut build_indices = UInt64BufferBuilder::new(0); @@ -801,10 +796,10 @@ pub fn build_equal_condition_join_indices( let build_row_value = if let Some(offset) = deleted_offset { // This arguments means that we prune the next index way before here. if i < offset as u64 { - // End of the list due to pruning; + // End of the list due to pruning break; } - i - deleted_offset_value as u64 + i - offset as u64 } else { i }; diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index 941e130aebf4..f08363068720 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -20,29 +20,28 @@ use std::any::Any; use std::collections::{HashMap, VecDeque}; +use std::fmt::{Debug, Formatter}; +use std::ops::IndexMut; use std::sync::Arc; use std::{fmt, usize}; -use arrow::datatypes::{ArrowNativeType, SchemaRef}; +use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; +use crate::physical_plan::ExecutionPlan; use arrow::compute::concat_batches; +use arrow::datatypes::{ArrowNativeType, SchemaRef}; use arrow_array::builder::BooleanBufferBuilder; use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch}; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{DataFusionError, ScalarValue}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound}; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; + use hashbrown::raw::RawTable; use hashbrown::HashSet; use parking_lot::Mutex; -use std::fmt::{Debug, Formatter}; -use std::ops::IndexMut; - -use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; -use crate::physical_plan::ExecutionPlan; -use datafusion_common::Result; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, @@ -119,12 +118,10 @@ pub trait JoinHashMapType { type NextType: IndexMut; /// Returns a mutable reference to `self` as a `dyn Any` for dynamic downcasting. fn as_any_mut(&mut self) -> &mut dyn Any; - /// Returns a mutable reference to the hash map. - fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)>; + /// Returns mutable references to the hash map and the next. + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType); /// Returns a reference to the hash map. fn get_map(&self) -> &RawTable<(u64, u64)>; - /// Returns a mutable reference to the next. - fn get_mut_list(&mut self) -> &mut Self::NextType; /// Returns a reference to the next. fn get_list(&self) -> &Self::NextType; } @@ -137,9 +134,9 @@ impl JoinHashMapType for JoinHashMap { self } - /// Get a mutable reference to the hash map. - fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { - &mut self.map + /// Get mutable references to the hash map and the next. + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { + (&mut self.map, &mut self.next) } /// Get a reference to the hash map. @@ -147,11 +144,6 @@ impl JoinHashMapType for JoinHashMap { &self.map } - /// Get a mutable reference to the next. - fn get_mut_list(&mut self) -> &mut Self::NextType { - &mut self.next - } - /// Get a reference to the next. fn get_list(&self) -> &Self::NextType { &self.next @@ -166,9 +158,9 @@ impl JoinHashMapType for PruningJoinHashMap { self } - /// Get a mutable reference to the hash map. - fn get_mut_map(&mut self) -> &mut RawTable<(u64, u64)> { - &mut self.map + /// Get mutable references to the hash map and the next. + fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { + (&mut self.map, &mut self.next) } /// Get a reference to the hash map. @@ -176,11 +168,6 @@ impl JoinHashMapType for PruningJoinHashMap { &self.map } - /// Get a mutable reference to the next. - fn get_mut_list(&mut self) -> &mut Self::NextType { - &mut self.next - } - /// Get a reference to the next. fn get_list(&self) -> &Self::NextType { &self.next @@ -193,15 +180,16 @@ impl fmt::Debug for JoinHashMap { } } -/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with the capability of pruning -/// elements in an efficient manner. This structure is particularly useful for cases where -/// it's necessary to remove elements from the map based on their buffer order. -/// +/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with +/// the capability of pruning elements in an efficient manner. This structure +/// is particularly useful for cases where it's necessary to remove elements +/// from the map based on their buffer order. /// /// # Example /// /// ``` text -/// Let's continue the example of `JoinHashMap` and then show how `PruningJoinHashMap` would handle the pruning scenario. +/// Let's continue the example of `JoinHashMap` and then show how `PruningJoinHashMap` would +/// handle the pruning scenario. /// /// Insert the pair (1,4) into the `PruningJoinHashMap`: /// map: @@ -224,7 +212,8 @@ impl fmt::Debug for JoinHashMap { /// | 2 | 4 | <--- hash value 1 maps to 2 (5 - 3), 1 (4 - 3), NA (2 - 3) (which means indices values 1,0) /// --------- /// -/// After pruning, the | 2 | 3 | entry is deleted from `PruningJoinHashMap` since there are no values left for this key. +/// After pruning, the | 2 | 3 | entry is deleted from `PruningJoinHashMap` since +/// there are no values left for this key. /// ``` pub struct PruningJoinHashMap { /// Stores hash value to last row index @@ -249,11 +238,13 @@ impl PruningJoinHashMap { } } - /// Shrinks the capacity of the hash map if necessary based on the provided scale factor. + /// Shrinks the capacity of the hash map, if necessary, based on the + /// provided scale factor. /// /// # Arguments - /// * `scale_factor`: The scale factor that determines how conservative the shrinking strategy is. - /// The capacity will be reduced by 1/scale_factor when necessary. + /// * `scale_factor`: The scale factor that determines how conservative the + /// shrinking strategy is. The capacity will be reduced by 1/`scale_factor` + /// when necessary. /// /// # Note /// Increasing the scale factor results in less aggressive capacity shrinking, @@ -262,9 +253,8 @@ impl PruningJoinHashMap { /// potentially leading to lower memory usage but more frequent resizing. pub(crate) fn shrink_if_necessary(&mut self, scale_factor: usize) { let capacity = self.map.capacity(); - let len = self.map.len(); - if capacity > scale_factor * len { + if capacity > scale_factor * self.map.len() { let new_capacity = (capacity * (scale_factor - 1)) / scale_factor; // Resize the map with the new capacity. self.map.shrink_to(new_capacity, |(hash, _)| *hash) @@ -280,9 +270,8 @@ impl PruningJoinHashMap { + self.next.capacity() * std::mem::size_of::() } - /// Removes hash values from the map and list based on the given pruning length and deleting offset. - /// - /// + /// Removes hash values from the map and the list based on the given pruning + /// length and deleting offset. /// /// # Arguments /// * `prune_length`: The number of elements to remove from the list. @@ -305,11 +294,7 @@ impl PruningJoinHashMap { .iter() .map(|bucket| bucket.as_ref()) .filter_map(|(hash, tail_index)| { - if *tail_index < prune_length as u64 + deleting_offset { - Some(*hash) - } else { - None - } + (*tail_index < prune_length as u64 + deleting_offset).then_some(*hash) }) .collect::>() }; diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index e7f2753ff797..1c664adfbb71 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -32,19 +32,6 @@ use std::task::Poll; use std::vec; use std::{any::Any, usize}; -use ahash::RandomState; -use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder}; -use arrow::compute::concat_batches; -use arrow::datatypes::{Schema, SchemaRef}; -use arrow::record_batch::RecordBatch; -use futures::stream::{select, BoxStream}; -use futures::{Stream, StreamExt}; -use hashbrown::HashSet; -use parking_lot::Mutex; - -use datafusion_execution::memory_pool::MemoryConsumer; -use datafusion_physical_expr::intervals::ExprIntervalGraph; - use crate::physical_plan::common::SharedMemoryReservation; use crate::physical_plan::joins::hash_join::{ build_equal_condition_join_indices, update_hash, @@ -72,10 +59,23 @@ use crate::physical_plan::{ DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; + +use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder}; +use arrow::compute::concat_batches; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use datafusion_common::utils::bisect; use datafusion_common::{internal_err, plan_err, JoinType}; use datafusion_common::{DataFusionError, Result}; +use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; +use datafusion_physical_expr::intervals::ExprIntervalGraph; + +use ahash::RandomState; +use futures::stream::{select, BoxStream}; +use futures::{Stream, StreamExt}; +use hashbrown::HashSet; +use parking_lot::Mutex; const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4; From 284f49516fe34aa6efda06fb970db608c84e2f80 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 22 Aug 2023 12:57:38 +0300 Subject: [PATCH 06/11] Leftovers --- .../core/src/physical_plan/joins/utils.rs | 85 ------------------- 1 file changed, 85 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index 7a2cf434fd1d..dd23e39e2774 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -2092,88 +2092,3 @@ mod tests { Ok(()) } } - -/// Updates sorted filter expressions with corresponding node indices from the -/// expression interval graph. -/// -/// This function iterates through the provided sorted filter expressions, -/// gathers the corresponding node indices from the expression interval graph, -/// and then updates the sorted expressions with these indices. It ensures -/// that these sorted expressions are aligned with the structure of the graph. -fn update_sorted_exprs_with_node_indices( - graph: &mut ExprIntervalGraph, - sorted_exprs: &mut [SortedFilterExpr], -) { - // Extract filter expressions from the sorted expressions: - let filter_exprs = sorted_exprs - .iter() - .map(|expr| expr.filter_expr().clone()) - .collect::>(); - - // Gather corresponding node indices for the extracted filter expressions from the graph: - let child_node_indices = graph.gather_node_indices(&filter_exprs); - - // Iterate through the sorted expressions and the gathered node indices: - for (sorted_expr, (_, index)) in sorted_exprs.iter_mut().zip(child_node_indices) { - // Update each sorted expression with the corresponding node index: - sorted_expr.set_node_index(index); - } -} - -/// Prepares and sorts expressions based on a given filter, left and right execution plans, and sort expressions. -/// -/// # Arguments -/// -/// * `filter` - The join filter to base the sorting on. -/// * `left` - The left execution plan. -/// * `right` - The right execution plan. -/// * `left_sort_exprs` - The expressions to sort on the left side. -/// * `right_sort_exprs` - The expressions to sort on the right side. -/// -/// # Returns -/// -/// * A tuple consisting of the sorted filter expression for the left and right sides, and an expression interval graph. -pub fn prepare_sorted_exprs( - filter: &JoinFilter, - left: &Arc, - right: &Arc, - left_sort_exprs: &[PhysicalSortExpr], - right_sort_exprs: &[PhysicalSortExpr], -) -> Result<(SortedFilterExpr, SortedFilterExpr, ExprIntervalGraph)> { - // Build the filter order for the left side - let err = - || DataFusionError::Plan("Filter does not include the child order".to_owned()); - - let left_temp_sorted_filter_expr = build_filter_input_order( - JoinSide::Left, - filter, - &left.schema(), - &left_sort_exprs[0], - )? - .ok_or_else(err)?; - - // Build the filter order for the right side - let right_temp_sorted_filter_expr = build_filter_input_order( - JoinSide::Right, - filter, - &right.schema(), - &right_sort_exprs[0], - )? - .ok_or_else(err)?; - - // Collect the sorted expressions - let mut sorted_exprs = - vec![left_temp_sorted_filter_expr, right_temp_sorted_filter_expr]; - - // Build the expression interval graph - let mut graph = ExprIntervalGraph::try_new(filter.expression().clone())?; - - // Update sorted expressions with node indices - update_sorted_exprs_with_node_indices(&mut graph, &mut sorted_exprs); - - // Swap and remove to get the final sorted filter expressions - let right_sorted_filter_expr = sorted_exprs.swap_remove(1); - let left_sorted_filter_expr = sorted_exprs.swap_remove(0); - - Ok((left_sorted_filter_expr, right_sorted_filter_expr, graph)) -} From 6695d6037b560ea95402f672eea7d670c9f56c67 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 22 Aug 2023 17:15:04 +0300 Subject: [PATCH 07/11] Update Cargo.toml --- datafusion/core/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 913d3c84beef..4e41b4f32e3f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -85,7 +85,6 @@ parquet = { workspace = true } percent-encoding = "2.2.0" pin-project-lite = "^0.2.7" rand = "0.8" -smallvec = { version = "1.6", features = ["union"] } sqlparser = { workspace = true } tempfile = "3" tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } From 45101d6cf34e7469b610fb0ab767f2fced67bbe4 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Tue, 22 Aug 2023 17:29:06 +0300 Subject: [PATCH 08/11] Void implementation --- datafusion-cli/Cargo.lock | 1 - .../core/src/physical_plan/joins/hash_join.rs | 9 +++------ .../src/physical_plan/joins/hash_join_utils.rs | 14 +++++++------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index cca667b11425..127a13c75ddd 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1082,7 +1082,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rand", - "smallvec", "sqlparser", "tempfile", "tokio", diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index f8137ba26526..92997d837175 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -36,7 +36,7 @@ use crate::physical_plan::{ expressions::Column, expressions::PhysicalSortExpr, hash_utils::create_hashes, - joins::hash_join_utils::{JoinHashMap, JoinHashMapType, PruningJoinHashMap}, + joins::hash_join_utils::{JoinHashMap, JoinHashMapType}, joins::utils::{ adjust_right_output_partitioning, build_join_schema, check_join_is_valid, combine_join_equivalence_properties, estimate_join_statistics, @@ -620,11 +620,8 @@ where // calculate the hash values let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; - if let Some(hash_map) = hash_map.as_any_mut().downcast_mut::() { - hash_map - .next - .resize(hash_map.next.len() + batch.num_rows(), 0); - } + // For usual JoinHashmap, the implementation is void. + hash_map.extend_with_value(batch.num_rows(), 0); // insert hashes to key of the hashmap let (mut_map, mut_list) = hash_map.get_mut(); diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index f08363068720..1422c97bd200 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -18,7 +18,6 @@ //! This file contains common subroutines for regular and symmetric hash join //! related functionality, used both in join calculations and optimization rules. -use std::any::Any; use std::collections::{HashMap, VecDeque}; use std::fmt::{Debug, Formatter}; use std::ops::IndexMut; @@ -41,6 +40,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use hashbrown::raw::RawTable; use hashbrown::HashSet; +use itertools::repeat_n; use parking_lot::Mutex; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. @@ -117,7 +117,7 @@ pub trait JoinHashMapType { /// The type of list used to store the hash values. type NextType: IndexMut; /// Returns a mutable reference to `self` as a `dyn Any` for dynamic downcasting. - fn as_any_mut(&mut self) -> &mut dyn Any; + fn extend_with_value(&mut self, len: usize, value: u64); /// Returns mutable references to the hash map and the next. fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType); /// Returns a reference to the hash map. @@ -130,9 +130,8 @@ pub trait JoinHashMapType { impl JoinHashMapType for JoinHashMap { type NextType = Vec; - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } + // Void implementation + fn extend_with_value(&mut self, _: usize, _: u64) {} /// Get mutable references to the hash map and the next. fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { @@ -154,8 +153,9 @@ impl JoinHashMapType for JoinHashMap { impl JoinHashMapType for PruningJoinHashMap { type NextType = VecDeque; - fn as_any_mut(&mut self) -> &mut dyn Any { - self + // Extend with value + fn extend_with_value(&mut self, len: usize, value: u64) { + self.next.extend(repeat_n(value, len)) } /// Get mutable references to the hash map and the next. From 6401b1bddf322b660a29ab27cf4cb32009253274 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Metehan=20Y=C4=B1ld=C4=B1r=C4=B1m?= <100111937+metesynnada@users.noreply.github.com> Date: Wed, 23 Aug 2023 17:09:57 +0300 Subject: [PATCH 09/11] Update datafusion/core/src/physical_plan/joins/hash_join_utils.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Daniƫl Heres --- datafusion/core/src/physical_plan/joins/hash_join_utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index 1422c97bd200..fb9e5a95f1d0 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -154,8 +154,8 @@ impl JoinHashMapType for PruningJoinHashMap { type NextType = VecDeque; // Extend with value - fn extend_with_value(&mut self, len: usize, value: u64) { - self.next.extend(repeat_n(value, len)) + fn extend_zero(&mut self, len: usize) { + self.next.resize(self.next.len() + len, 0) } /// Get mutable references to the hash map and the next. From 681fade9c61dd4de796af8d11317e6db26eccac2 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Wed, 23 Aug 2023 17:38:13 +0300 Subject: [PATCH 10/11] Code corrections --- datafusion/core/src/physical_plan/joins/hash_join.rs | 2 +- .../core/src/physical_plan/joins/hash_join_utils.rs | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 92997d837175..9351a04b07c6 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -621,7 +621,7 @@ where let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // For usual JoinHashmap, the implementation is void. - hash_map.extend_with_value(batch.num_rows(), 0); + hash_map.extend_zero(batch.num_rows()); // insert hashes to key of the hashmap let (mut_map, mut_list) = hash_map.get_mut(); diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index fb9e5a95f1d0..b80413b53d89 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -40,7 +40,6 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use hashbrown::raw::RawTable; use hashbrown::HashSet; -use itertools::repeat_n; use parking_lot::Mutex; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. @@ -116,8 +115,8 @@ impl JoinHashMap { pub trait JoinHashMapType { /// The type of list used to store the hash values. type NextType: IndexMut; - /// Returns a mutable reference to `self` as a `dyn Any` for dynamic downcasting. - fn extend_with_value(&mut self, len: usize, value: u64); + /// Extend with zero + fn extend_zero(&mut self, len: usize); /// Returns mutable references to the hash map and the next. fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType); /// Returns a reference to the hash map. @@ -131,7 +130,7 @@ impl JoinHashMapType for JoinHashMap { type NextType = Vec; // Void implementation - fn extend_with_value(&mut self, _: usize, _: u64) {} + fn extend_zero(&mut self, _: usize) {} /// Get mutable references to the hash map and the next. fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) { @@ -153,7 +152,7 @@ impl JoinHashMapType for JoinHashMap { impl JoinHashMapType for PruningJoinHashMap { type NextType = VecDeque; - // Extend with value + // Extend with zero fn extend_zero(&mut self, len: usize) { self.next.resize(self.next.len() + len, 0) } From e7943356dffef596ebc0ef48f17ec6e435cc7cca Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Wed, 23 Aug 2023 14:52:01 -0400 Subject: [PATCH 11/11] Remove unused imports --- datafusion/core/src/physical_plan/joins/hash_join.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 040a6c2e7207..f7d257e324ea 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -64,7 +64,6 @@ use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use arrow_array::cast::downcast_array; use arrow_schema::ArrowError; -use datafusion_common::cast::{as_dictionary_array, as_string_array}; use datafusion_common::{ exec_err, internal_err, plan_err, DataFusionError, JoinType, Result, };