diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0e62540d6d55..9d016a60f45f 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -30,7 +30,7 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow::{
     array::{
-        ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
+        ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
         DictionaryArray, FixedSizeBinaryArray, LargeStringArray, PrimitiveArray,
         Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
         Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
@@ -43,9 +43,8 @@ use arrow::{
     util::bit_util,
 };
 use futures::{ready, Stream, StreamExt, TryStreamExt};
-use hashbrown::raw::RawTable;
-use smallvec::smallvec;
 use std::fmt;
+use std::mem::size_of;
 use std::sync::Arc;
 use std::task::Poll;
 use std::{any::Any, usize, vec};
@@ -510,15 +509,16 @@ async fn collect_left_input(
         )
     })? / 7)
     .next_power_of_two();
-    // 32 bytes per `(u64, SmallVec<[u64; 1]>)`
+    // 16 bytes per `(u64, u64)`
     // + 1 byte for each bucket
-    // + 16 bytes fixed
-    let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16;
+    // + fixed size of JoinHashMap (RawTable + Vec)
+    let estimated_hastable_size =
+        16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();

     reservation.try_grow(estimated_hastable_size)?;
     metrics.build_mem_used.add(estimated_hastable_size);

-    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
+    let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
     let mut offset = 0;
     for batch in batches.iter() {
@@ -563,16 +563,24 @@ pub fn update_hash(
     // insert hashes to key of the hashmap
     for (row, hash_value) in hash_values.iter().enumerate() {
         let item = hash_map
-            .0
+            .map
             .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-        if let Some((_, indices)) = item {
-            indices.push((row + offset) as u64);
+        if let Some((_, index)) = item {
+            // Already exists: add index to next array
+            let prev_index = *index;
+            // Store new value inside hashmap
+            *index = (row + offset + 1) as u64;
+            // Update chained Vec at row + offset with previous value
+            hash_map.next[row + offset] = prev_index;
         } else {
-            hash_map.0.insert(
+            hash_map.map.insert(
                 *hash_value,
-                (*hash_value, smallvec![(row + offset) as u64]),
+                // store the value + 1, as 0 is reserved to mark the end of the list
+                (*hash_value, (row + offset + 1) as u64),
                 |(hash, _)| *hash,
             );
+            // the chained list at (row + offset) is already initialized with 0,
+            // meaning end of list
         }
     }
     Ok(())
 }
@@ -629,7 +637,6 @@ pub fn build_join_indices(
     random_state: &RandomState,
     null_equals_null: bool,
     hashes_buffer: &mut Vec<u64>,
-    offset: Option<usize>,
     build_side: JoinSide,
 ) -> Result<(UInt64Array, UInt32Array)> {
     // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
@@ -642,7 +649,6 @@ pub fn build_join_indices(
         random_state,
         null_equals_null,
         hashes_buffer,
-        offset,
     )?;
     if let Some(filter) = filter {
         // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
@@ -700,7 +706,6 @@ pub fn build_equal_condition_join_indices(
     random_state: &RandomState,
     null_equals_null: bool,
     hashes_buffer: &mut Vec<u64>,
-    offset: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = probe_on
         .iter()
@@ -719,7 +724,6 @@
     // Using a buffer builder to avoid slower normal builder
     let mut build_indices = UInt64BufferBuilder::new(0);
     let mut probe_indices = UInt32BufferBuilder::new(0);
-    let offset_value = offset.unwrap_or(0);
     // Visit all of the probe rows
     for (row, hash_value) in hash_values.iter().enumerate() {
         // Get the hash and find it in the build index
@@ -727,39 +731,37 @@
         // For every item on the build and probe we check if it matches
         // This possibly contains rows with hash collisions,
         // So we have to check here whether rows are equal or not
-        if let Some((_, indices)) = build_hashmap
-            .0
+        if let Some((_, index)) = build_hashmap
+            .map
             .get(*hash_value, |(hash, _)| *hash_value == *hash)
         {
-            for &i in indices {
-                // Check hash collisions
-                let offset_build_index = i as usize - offset_value;
+            let mut i = *index - 1;
+            loop {
                 // Check hash collisions
                 if equal_rows(
-                    offset_build_index,
+                    i as usize,
                     row,
                     &build_join_values,
                     &keys_values,
                     null_equals_null,
                 )? {
-                    build_indices.append(offset_build_index as u64);
+                    build_indices.append(i);
                     probe_indices.append(row as u32);
                 }
+                // Follow the chain to get the next index value
+                let next = build_hashmap.next[i as usize];
+                if next == 0 {
+                    // end of list
+                    break;
+                }
+                i = next - 1;
             }
         }
     }
-    let build = ArrayData::builder(DataType::UInt64)
-        .len(build_indices.len())
-        .add_buffer(build_indices.finish())
-        .build()?;
-    let probe = ArrayData::builder(DataType::UInt32)
-        .len(probe_indices.len())
-        .add_buffer(probe_indices.finish())
-        .build()?;

     Ok((
-        PrimitiveArray::<UInt64Type>::from(build),
-        PrimitiveArray::<UInt32Type>::from(probe),
+        PrimitiveArray::new(build_indices.finish().into(), None),
+        PrimitiveArray::new(probe_indices.finish().into(), None),
     ))
 }

@@ -830,7 +832,7 @@ macro_rules! equal_rows_elem_with_string_dict {
 /// Left and right row have equal values
 /// If more data types are supported here, please also add the data types in can_hash function
 /// to generate hash join logical plan.
-fn equal_rows(
+pub fn equal_rows(
     left: usize,
     right: usize,
     left_arrays: &[ArrayRef],
@@ -1157,7 +1159,6 @@ impl HashJoinStream {
                         &self.random_state,
                         self.null_equals_null,
                         &mut hashes_buffer,
-                        None,
                         JoinSide::Left,
                     );

@@ -1258,11 +1259,11 @@ mod tests {
     use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use smallvec::smallvec;

     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::Literal;
+    use hashbrown::raw::RawTable;

     use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
@@ -2616,8 +2617,10 @@ mod tests {
         create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;

         // Create hash collisions (same hashes)
-        hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
-        hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
+        hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
+        hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
+
+        let next = vec![2, 0];

         let right = build_table_i32(
             ("a", &vec![10, 20]),
@@ -2625,7 +2628,13 @@ mod tests {
             ("c", &vec![30, 40]),
         );

-        let left_data = (JoinHashMap(hashmap_left), left);
+        let left_data = (
+            JoinHashMap {
+                map: hashmap_left,
+                next,
+            },
+            left,
+        );
         let (l, r) = build_equal_condition_join_indices(
             &left_data.0,
             &left_data.1,
@@ -2635,7 +2644,6 @@ mod tests {
             &random_state,
             false,
             &mut vec![0; right.num_rows()],
-            None,
         )?;

         let mut left_ids = UInt64Builder::with_capacity(0);
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 992de86dfe17..1b9cbd543d73 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -36,24 +36,88 @@ use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
 use datafusion_common::Result;

 // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-//
-// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
-// to put the indices in a certain bucket.
 // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
 // we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
 // E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
 // As the key is a hash value, we need to check possible hash collisions in the probe stage
 // During this stage it might be the case that a row is contained the same hashmap value,
 // but the values don't match. Those are checked in the [equal_rows] macro
-// TODO: speed up collision check and move away from using a hashbrown HashMap
+// The indices (values) are stored in a separate chained list stored in the `Vec<u64>`.
+// The first value (+1) is stored in the hashmap, whereas the rest of the chain is stored in
+// the `next` array: position (value - 1) holds the previous value inserted for that hash.
+// The chain can be followed until the value "0" has been reached, meaning the end of the list.
+// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
+// See the example below:
+// Insert (1,1)
+// map:
+// ---------
+// | 1 | 2 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 0 | 0 |
+// ---------------------
+// Insert (2,2)
+// map:
+// ---------
+// | 1 | 2 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 0 | 0 |
+// ---------------------
+// Insert (1,3)
+// map:
+// ---------
+// | 1 | 4 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 2 | 0 |  <--- hash value 1 maps to 4,2 (which means indices values 3,1)
+// ---------------------
+// Insert (1,4)
+// map:
+// ---------
+// | 1 | 5 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 2 | 4 |  <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1)
+// ---------------------
+
+// TODO: speed up collision checks
 // https://github.com/apache/arrow-datafusion/issues/50
-pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+pub struct JoinHashMap {
+    // Stores hash value to first index
+    pub map: RawTable<(u64, u64)>,
+    // Stores indices in chained list data structure
+    pub next: Vec<u64>,
+}
+
+/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate
+/// and shrink the indices.
+pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);

 impl JoinHashMap {
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        JoinHashMap {
+            map: RawTable::with_capacity(capacity),
+            next: vec![0; capacity],
+        }
+    }
+}
+
+impl SymmetricJoinHashMap {
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        Self(RawTable::with_capacity(capacity))
+    }
+
     /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
     /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
     /// when necessary. You can adjust the scale_factor value to achieve the desired
-    /// ,balance between memory usage and performance.
+    /// balance between memory usage and performance.
     //
     // If you increase the scale_factor, the capacity will shrink less aggressively,
     // leading to potentially higher memory usage but fewer resizes.
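Aside, to make the scheme above concrete: the following minimal, self-contained sketch (not part of the patch) replays the `Insert (1,1)` through `Insert (1,4)` example from the comment block. `ChainedIndexMap` is a hypothetical stand-in for the new `JoinHashMap`, with `std::collections::HashMap` substituting for `hashbrown::raw::RawTable`.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the new JoinHashMap layout:
/// `map` holds hash -> (last inserted row index + 1), and
/// `next` chains the earlier row indices for that hash.
struct ChainedIndexMap {
    map: HashMap<u64, u64>,
    next: Vec<u64>,
}

impl ChainedIndexMap {
    fn with_capacity(rows: usize) -> Self {
        // `next` starts zeroed; 0 marks the end of every chain.
        Self {
            map: HashMap::with_capacity(rows),
            next: vec![0; rows],
        }
    }

    fn insert(&mut self, hash: u64, row: usize) {
        // Store row + 1 so that 0 stays reserved as the list terminator.
        if let Some(prev_head) = self.map.insert(hash, (row + 1) as u64) {
            // The previous head becomes the next link of the new head.
            self.next[row] = prev_head;
        }
    }

    /// Walk the chain for `hash`, yielding row indices newest-first.
    fn rows(&self, hash: u64) -> Vec<usize> {
        let mut out = Vec::new();
        let mut slot = self.map.get(&hash).copied().unwrap_or(0);
        while slot != 0 {
            out.push((slot - 1) as usize);
            slot = self.next[(slot - 1) as usize];
        }
        out
    }
}

fn main() {
    // Replay the doc example: (hash, row) pairs (1,1), (2,2), (1,3), (1,4).
    let mut m = ChainedIndexMap::with_capacity(5);
    m.insert(1, 1);
    m.insert(2, 2);
    m.insert(1, 3);
    m.insert(1, 4);
    assert_eq!(m.next, vec![0, 0, 0, 2, 4]); // matches the last diagram above
    assert_eq!(m.rows(1), vec![4, 3, 1]); // hash 1 -> rows 4, 3, 1, newest-first
}
```

The `+1` encoding is what lets the zero-initialized `next` allocation double as the list terminator: build-side insertion becomes two fixed-size writes per row, with no per-key `SmallVec` left to grow on this path.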
@@ -628,7 +692,7 @@ pub mod tests {
     #[test]
     fn test_shrink_if_necessary() {
         let scale_factor = 4;
-        let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100));
+        let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
         let data_size = 2000;
         let deleted_part = 3 * data_size / 4;
         // Add elements to the JoinHashMap
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 7eac619687b2..f2b750c0b692 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -41,10 +41,15 @@ use arrow::array::{
 use arrow::compute::concat_batches;
 use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder};
+use arrow_array::{UInt32Array, UInt64Array};
+use datafusion_physical_expr::hash_utils::create_hashes;
+use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::{select, BoxStream};
 use futures::{Stream, StreamExt};
-use hashbrown::{raw::RawTable, HashSet};
+use hashbrown::HashSet;
 use parking_lot::Mutex;
+use smallvec::smallvec;

 use datafusion_common::{utils::bisect, ScalarValue};
 use datafusion_execution::memory_pool::MemoryConsumer;
@@ -52,12 +57,10 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalB

 use crate::physical_plan::common::SharedMemoryReservation;
 use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
-use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
 use crate::physical_plan::{
     expressions::Column,
     expressions::PhysicalSortExpr,
     joins::{
-        hash_join::{build_join_indices, update_hash},
         hash_join_utils::{build_filter_input_order, SortedFilterExpr},
         utils::{
             build_batch_from_indices, build_join_schema, check_join_is_valid,
@@ -73,6 +76,10 @@ use datafusion_common::JoinType;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::TaskContext;

+use super::hash_join::equal_rows;
+use super::hash_join_utils::SymmetricJoinHashMap;
+use super::utils::apply_join_filter_to_indices;
+
 const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;

 /// A symmetric hash join with range conditions is when both streams are hashed on the
@@ -681,7 +688,7 @@ impl Stream for SymmetricHashJoinStream {

 fn prune_hash_values(
     prune_length: usize,
-    hashmap: &mut JoinHashMap,
+    hashmap: &mut SymmetricJoinHashMap,
     row_hash_values: &mut VecDeque<u64>,
     offset: u64,
 ) -> Result<()> {
@@ -1043,7 +1050,7 @@ struct OneSideHashJoiner {
     /// Columns from the side
     on: Vec<Column>,
     /// Hashmap
-    hashmap: JoinHashMap,
+    hashmap: SymmetricJoinHashMap,
     /// To optimize hash deleting in case of pruning, we hold them in memory
     row_hash_values: VecDeque<u64>,
     /// Reuse the hashes buffer
@@ -1076,7 +1083,7 @@ impl OneSideHashJoiner {
             build_side,
             input_buffer: RecordBatch::new_empty(schema),
             on,
-            hashmap: JoinHashMap(RawTable::with_capacity(0)),
+            hashmap: SymmetricJoinHashMap::with_capacity(0),
             row_hash_values: VecDeque::new(),
             hashes_buffer: vec![],
             visited_rows: HashSet::new(),
@@ -1085,6 +1092,39 @@ impl OneSideHashJoiner {
         }
     }

+    pub fn update_hash(
+        on: &[Column],
+        batch: &RecordBatch,
+        hash_map: &mut SymmetricJoinHashMap,
+        offset: usize,
+        random_state: &RandomState,
+        hashes_buffer: &mut Vec<u64>,
+    ) -> Result<()> {
+        // evaluate the keys
+        let keys_values = on
+            .iter()
+            .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        // calculate the hash values
+        let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+        // insert hashes to key of the hashmap
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            let item = hash_map
+                .0
+                .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+            if let Some((_, indices)) = item {
+                indices.push((row + offset) as u64);
+            } else {
+                hash_map.0.insert(
+                    *hash_value,
+                    (*hash_value, smallvec![(row + offset) as u64]),
+                    |(hash, _)| *hash,
+                );
+            }
+        }
+        Ok(())
+    }
+
     /// Updates the internal state of the [OneSideHashJoiner] with the incoming batch.
     ///
     /// # Arguments
@@ -1106,7 +1146,7 @@ impl OneSideHashJoiner {
         self.hashes_buffer.resize(batch.num_rows(), 0);
         // Get allocation_info before adding the item
         // Update the hashmap with the join key values and hashes of the incoming batch:
-        update_hash(
+        Self::update_hash(
             &self.on,
             batch,
             &mut self.hashmap,
@@ -1119,6 +1159,144 @@ impl OneSideHashJoiner {
         Ok(())
     }

+    /// Gets build and probe indices which satisfy the on condition (including
+    /// the equality condition and the join filter) in the join.
+    #[allow(clippy::too_many_arguments)]
+    pub fn build_join_indices(
+        probe_batch: &RecordBatch,
+        build_hashmap: &SymmetricJoinHashMap,
+        build_input_buffer: &RecordBatch,
+        on_build: &[Column],
+        on_probe: &[Column],
+        filter: Option<&JoinFilter>,
+        random_state: &RandomState,
+        null_equals_null: bool,
+        hashes_buffer: &mut Vec<u64>,
+        offset: Option<usize>,
+        build_side: JoinSide,
+    ) -> Result<(UInt64Array, UInt32Array)> {
+        // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
+        let (build_indices, probe_indices) = Self::build_equal_condition_join_indices(
+            build_hashmap,
+            build_input_buffer,
+            probe_batch,
+            on_build,
+            on_probe,
+            random_state,
+            null_equals_null,
+            hashes_buffer,
+            offset,
+        )?;
+        if let Some(filter) = filter {
+            // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
+            apply_join_filter_to_indices(
+                build_input_buffer,
+                probe_batch,
+                build_indices,
+                probe_indices,
+                filter,
+                build_side,
+            )
+        } else {
+            Ok((build_indices, probe_indices))
+        }
+    }
+
+    // Returns build/probe indices satisfying the equality condition.
+    // On LEFT.b1 = RIGHT.b2
+    // LEFT Table:
+    //  a1  b1  c1
+    //   1   1   10
+    //   3   3   30
+    //   5   5   50
+    //   7   7   70
+    //   9   8   90
+    //  11   8  110
+    //  13  10  130
+    // RIGHT Table:
+    //  a2  b2  c2
+    //   2   2   20
+    //   4   4   40
+    //   6   6   60
+    //   8   8   80
+    //  10  10  100
+    //  12  10  120
+    // The result is
+    // "+----+----+-----+----+----+-----+",
+    // "| a1 | b1 | c1  | a2 | b2 | c2  |",
+    // "+----+----+-----+----+----+-----+",
+    // "| 11 | 8  | 110 | 8  | 8  | 80  |",
+    // "| 13 | 10 | 130 | 10 | 10 | 100 |",
+    // "| 13 | 10 | 130 | 12 | 10 | 120 |",
+    // "| 9  | 8  | 90  | 8  | 8  | 80  |",
+    // "+----+----+-----+----+----+-----+"
+    // And the result of build and probe indices are:
+    // Build indices: 5, 6, 6, 4
+    // Probe indices: 3, 4, 5, 3
+    #[allow(clippy::too_many_arguments)]
+    pub fn build_equal_condition_join_indices(
+        build_hashmap: &SymmetricJoinHashMap,
+        build_input_buffer: &RecordBatch,
+        probe_batch: &RecordBatch,
+        build_on: &[Column],
+        probe_on: &[Column],
+        random_state: &RandomState,
+        null_equals_null: bool,
+        hashes_buffer: &mut Vec<u64>,
+        offset: Option<usize>,
+    ) -> Result<(UInt64Array, UInt32Array)> {
+        let keys_values = probe_on
+            .iter()
+            .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        let build_join_values = build_on
+            .iter()
+            .map(|c| {
+                Ok(c.evaluate(build_input_buffer)?
+                    .into_array(build_input_buffer.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        hashes_buffer.clear();
+        hashes_buffer.resize(probe_batch.num_rows(), 0);
+        let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+        // Using a buffer builder to avoid slower normal builder
+        let mut build_indices = UInt64BufferBuilder::new(0);
+        let mut probe_indices = UInt32BufferBuilder::new(0);
+        let offset_value = offset.unwrap_or(0);
+        // Visit all of the probe rows
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            // Get the hash and find it in the build index
+            // For every item on the build and probe we check if it matches
+            // This possibly contains rows with hash collisions,
+            // So we have to check here whether rows are equal or not
+            if let Some((_, indices)) = build_hashmap
+                .0
+                .get(*hash_value, |(hash, _)| *hash_value == *hash)
+            {
+                for &i in indices {
+                    // Subtract the offset to get the index into the build batch
+                    let offset_build_index = i as usize - offset_value;
+                    // Check hash collisions
+                    if equal_rows(
+                        offset_build_index,
+                        row,
+                        &build_join_values,
+                        &keys_values,
+                        null_equals_null,
+                    )? {
+                        build_indices.append(offset_build_index as u64);
+                        probe_indices.append(row as u32);
+                    }
+                }
+            }
+        }
+
+        Ok((
+            PrimitiveArray::new(build_indices.finish().into(), None),
+            PrimitiveArray::new(probe_indices.finish().into(), None),
+        ))
+    }
+
     /// This method performs a join between the build side input buffer and the probe side batch.
     ///
     /// # Arguments
@@ -1155,7 +1333,7 @@ impl OneSideHashJoiner {
         if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
             return Ok(None);
         }
-        let (build_indices, probe_indices) = build_join_indices(
+        let (build_indices, probe_indices) = Self::build_join_indices(
            probe_batch,
            &self.hashmap,
            &self.input_buffer,
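Aside: the probe-side counterpart of the chained list, i.e. the chain walk performed by the reworked `build_equal_condition_join_indices` in hash_join.rs, can be summarized by the standalone sketch below (not part of the patch; the symmetric join above deliberately keeps the `SmallVec`-based `SymmetricJoinHashMap`). `rows_equal` is a hypothetical closure standing in for the `equal_rows` collision check, and plain `Vec`s stand in for the Arrow buffer builders.

```rust
use std::collections::HashMap;

/// Collect matching (build, probe) row pairs by following the chained list,
/// assuming `map` and `next` were populated as in `update_hash`: `map` stores
/// hash -> (build row + 1), and `next[row]` links to the previous row + 1
/// (0 terminates the chain).
fn probe_chains(
    map: &HashMap<u64, u64>,
    next: &[u64],
    probe_hashes: &[u64],
    rows_equal: impl Fn(usize, usize) -> bool,
) -> (Vec<u64>, Vec<u32>) {
    let mut build_indices = Vec::new();
    let mut probe_indices = Vec::new();
    for (probe_row, hash) in probe_hashes.iter().enumerate() {
        // 0 means "no entry for this hash"; stored values are build row + 1.
        let mut slot = map.get(hash).copied().unwrap_or(0);
        while slot != 0 {
            let build_row = (slot - 1) as usize;
            // Hashes can collide, so the actual key values must be compared.
            if rows_equal(build_row, probe_row) {
                build_indices.push(build_row as u64);
                probe_indices.push(probe_row as u32);
            }
            slot = next[build_row];
        }
    }
    (build_indices, probe_indices)
}
```

Relative to the old `SmallVec<[u64; 1]>` values, this traversal costs one extra indexed load per visited row, while the build side drops per-key heap growth and halves the estimated per-entry footprint from 32 to 16 bytes, as the revised size estimate in `collect_left_input` reflects.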