From a25065c9bedf21f4b82736c3292e3c64967e14a1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 5 Aug 2021 19:24:25 +0200 Subject: [PATCH 1/4] Use rawtable API --- datafusion/src/physical_plan/hash_join.rs | 103 +++++++--------------- 1 file changed, 33 insertions(+), 70 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 1a174bb11d10..45cfaa6af4ed 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -29,8 +29,8 @@ use arrow::{ datatypes::{UInt32Type, UInt64Type}, }; use smallvec::{smallvec, SmallVec}; +use std::sync::Arc; use std::{any::Any, usize}; -use std::{hash::Hasher, sync::Arc}; use std::{time::Instant, vec}; use async_trait::async_trait; @@ -49,6 +49,8 @@ use arrow::array::{ UInt64Array, UInt8Array, }; +use hashbrown::raw::RawTable; + use super::expressions::Column; use super::hash_utils::create_hashes; use super::{ @@ -65,6 +67,7 @@ use super::{ use crate::physical_plan::coalesce_batches::concat_batches; use crate::physical_plan::{PhysicalExpr, SQLMetric}; use log::debug; +use std::fmt; // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value. // @@ -78,7 +81,14 @@ use log::debug; // but the values don't match. Those are checked in the [equal_rows] macro // TODO: speed up collission check and move away from using a hashbrown HashMap // https://github.com/apache/arrow-datafusion/issues/50 -type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>; +struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>); + +impl fmt::Debug for JoinHashMap { + fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result { + Ok(()) + } +} + type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>; /// join execution plan executes partitions in parallel and combines them into a set of @@ -303,10 +313,8 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let mut hashmap = JoinHashMap::with_capacity_and_hasher( - num_rows, - IdHashBuilder {}, - ); + let mut hashmap = + JoinHashMap(RawTable::with_capacity(num_rows)); let mut hashes_buffer = Vec::new(); let mut offset = 0; for batch in batches.iter() { @@ -358,8 +366,7 @@ impl ExecutionPlan for HashJoinExec { Ok(acc) }) .await?; - let mut hashmap = - JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {}); + let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows)); let mut hashes_buffer = Vec::new(); let mut offset = 0; for batch in batches.iter() { @@ -460,7 +467,7 @@ impl ExecutionPlan for HashJoinExec { fn update_hash( on: &[Column], batch: &RecordBatch, - hash: &mut JoinHashMap, + hash_map: &mut JoinHashMap, offset: usize, random_state: &RandomState, hashes_buffer: &mut Vec, @@ -476,18 +483,16 @@ fn update_hash( // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { - match hash.raw_entry_mut().from_hash(*hash_value, |_| true) { - hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => { - entry.get_mut().push((row + offset) as u64); - } - hashbrown::hash_map::RawEntryMut::Vacant(entry) => { - entry.insert_hashed_nocheck( - *hash_value, - (), - smallvec![(row + offset) as u64], - ); - } - }; + let item = hash_map.0.get_mut(*hash_value, |(h, _)| *h == *hash_value); + if let Some((_, matches)) = item { + matches.push((row + offset) as u64); + } else { + hash_map.0.insert( + *hash_value, + (*hash_value, smallvec![(row + offset) as u64]), + |(h, _)| *h, + ); + } } Ok(()) } @@ -678,7 +683,7 @@ fn build_join_indexes( // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not if let Some((_, indices)) = - left.raw_entry().from_hash(*hash_value, |_| true) + left.0.get(*hash_value, |(h, _)| *h == *hash_value) { for &i in indices { // Check hash collisions @@ -710,7 +715,7 @@ fn build_join_indexes( // First visit all of the rows for (row, hash_value) in hash_values.iter().enumerate() { if let Some((_, indices)) = - left.raw_entry().from_hash(*hash_value, |_| true) + left.0.get(*hash_value, |(h, _)| *h == *hash_value) { for &i in indices { // Collision check @@ -728,7 +733,7 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); for (row, hash_value) in hash_values.iter().enumerate() { - match left.raw_entry().from_hash(*hash_value, |_| true) { + match left.0.get(*hash_value, |(h, _)| *h == *hash_value) { Some((_, indices)) => { for &i in indices { if equal_rows( @@ -755,38 +760,6 @@ fn build_join_indexes( } } } -use core::hash::BuildHasher; - -/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing -/// it when inserting/indexing or regrowing the `HashMap` -struct IdHasher { - hash: u64, -} - -impl Hasher for IdHasher { - fn finish(&self) -> u64 { - self.hash - } - - fn write_u64(&mut self, i: u64) { - self.hash = i; - } - - fn write(&mut self, _bytes: &[u8]) { - unreachable!("IdHasher should only be used for u64 keys") - } -} - -#[derive(Debug)] -struct IdHashBuilder {} - -impl BuildHasher for IdHashBuilder { - type Hasher = IdHasher; - - fn build_hasher(&self) -> Self::Hasher { - IdHasher { hash: 0 } - } -} macro_rules! equal_rows_elem { ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {{ @@ -1776,7 +1749,7 @@ mod tests { #[test] fn join_with_hash_collision() -> Result<()> { - let mut hashmap_left = HashMap::with_capacity_and_hasher(2, IdHashBuilder {}); + let mut hashmap_left = RawTable::with_capacity(2); let left = build_table_i32( ("a", &vec![10, 20]), ("x", &vec![100, 200]), @@ -1789,18 +1762,8 @@ mod tests { create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions - match hashmap_left.raw_entry_mut().from_hash(hashes[0], |_| true) { - hashbrown::hash_map::RawEntryMut::Vacant(entry) => { - entry.insert_hashed_nocheck(hashes[0], (), smallvec![0, 1]) - } - _ => unreachable!("Hash should not be vacant"), - }; - match hashmap_left.raw_entry_mut().from_hash(hashes[1], |_| true) { - hashbrown::hash_map::RawEntryMut::Vacant(entry) => { - entry.insert_hashed_nocheck(hashes[1], (), smallvec![0, 1]) - } - _ => unreachable!("Hash should not be vacant"), - }; + hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h); let right = build_table_i32( ("a", &vec![10, 20]), @@ -1808,7 +1771,7 @@ mod tests { ("c", &vec![30, 40]), ); - let left_data = JoinLeftData::new((hashmap_left, left)); + let left_data = JoinLeftData::new((JoinHashMap(hashmap_left), left)); let (l, r) = build_join_indexes( &left_data, &right, From ff7dd76f5e15e10b1b4bf63a13a58f4068489a41 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Thu, 5 Aug 2021 19:44:21 +0200 Subject: [PATCH 2/4] Avoid changes --- datafusion/src/physical_plan/hash_join.rs | 32 +++++++++-------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 45cfaa6af4ed..d94b5d5d1aea 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -81,7 +81,7 @@ use std::fmt; // but the values don't match. Those are checked in the [equal_rows] macro // TODO: speed up collission check and move away from using a hashbrown HashMap // https://github.com/apache/arrow-datafusion/issues/50 -struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>); +struct JoinHashMap(RawTable>); impl fmt::Debug for JoinHashMap { fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -483,15 +483,13 @@ fn update_hash( // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map.0.get_mut(*hash_value, |(h, _)| *h == *hash_value); - if let Some((_, matches)) = item { - matches.push((row + offset) as u64); + let item = hash_map.0.get_mut(*hash_value, |_| true); + if let Some(indices) = item { + indices.push((row + offset) as u64); } else { - hash_map.0.insert( - *hash_value, - (*hash_value, smallvec![(row + offset) as u64]), - |(h, _)| *h, - ); + hash_map + .0 + .insert(*hash_value, smallvec![(row + offset) as u64], |_| 0); } } Ok(()) @@ -682,9 +680,7 @@ fn build_join_indexes( // For every item on the left and right we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, indices)) = - left.0.get(*hash_value, |(h, _)| *h == *hash_value) - { + if let Some(indices) = left.0.get(*hash_value, |_| true) { for &i in indices { // Check hash collisions if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -714,9 +710,7 @@ fn build_join_indexes( // First visit all of the rows for (row, hash_value) in hash_values.iter().enumerate() { - if let Some((_, indices)) = - left.0.get(*hash_value, |(h, _)| *h == *hash_value) - { + if let Some(indices) = left.0.get(*hash_value, |_| true) { for &i in indices { // Collision check if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -733,8 +727,8 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); for (row, hash_value) in hash_values.iter().enumerate() { - match left.0.get(*hash_value, |(h, _)| *h == *hash_value) { - Some((_, indices)) => { + match left.0.get(*hash_value, |_| true) { + Some(indices) => { for &i in indices { if equal_rows( i as usize, @@ -1762,8 +1756,8 @@ mod tests { create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions - hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); - hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[0], smallvec![0, 1], |_| 0); + hashmap_left.insert(hashes[1], smallvec![0, 1], |_| 0); let right = build_table_i32( ("a", &vec![10, 20]), From 086b47fb58a9664653cf52260d9c59faf9a93bd1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 6 Aug 2021 07:08:47 +0200 Subject: [PATCH 3/4] Check on hash again --- datafusion/src/physical_plan/hash_join.rs | 34 ++++++++++++++--------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index d94b5d5d1aea..50ccb76c7742 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -81,7 +81,7 @@ use std::fmt; // but the values don't match. Those are checked in the [equal_rows] macro // TODO: speed up collission check and move away from using a hashbrown HashMap // https://github.com/apache/arrow-datafusion/issues/50 -struct JoinHashMap(RawTable>); +struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>); impl fmt::Debug for JoinHashMap { fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -483,13 +483,17 @@ fn update_hash( // insert hashes to key of the hashmap for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map.0.get_mut(*hash_value, |_| true); - if let Some(indices) = item { + let item = hash_map + .0 + .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); + if let Some((_, indices)) = item { indices.push((row + offset) as u64); } else { - hash_map - .0 - .insert(*hash_value, smallvec![(row + offset) as u64], |_| 0); + hash_map.0.insert( + *hash_value, + (*hash_value, smallvec![(row + offset) as u64]), + |(hash, _)| *hash, + ); } } Ok(()) @@ -680,7 +684,9 @@ fn build_join_indexes( // For every item on the left and right we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some(indices) = left.0.get(*hash_value, |_| true) { + if let Some((_, indices)) = + left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) + { for &i in indices { // Check hash collisions if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -710,7 +716,9 @@ fn build_join_indexes( // First visit all of the rows for (row, hash_value) in hash_values.iter().enumerate() { - if let Some(indices) = left.0.get(*hash_value, |_| true) { + if let Some((_, indices)) = + left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) + { for &i in indices { // Collision check if equal_rows(i as usize, row, &left_join_values, &keys_values)? { @@ -727,8 +735,8 @@ fn build_join_indexes( let mut right_indices = UInt32Builder::new(0); for (row, hash_value) in hash_values.iter().enumerate() { - match left.0.get(*hash_value, |_| true) { - Some(indices) => { + match left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) { + Some((_, indices)) => { for &i in indices { if equal_rows( i as usize, @@ -1755,9 +1763,9 @@ mod tests { let hashes = create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; - // Create hash collisions - hashmap_left.insert(hashes[0], smallvec![0, 1], |_| 0); - hashmap_left.insert(hashes[1], smallvec![0, 1], |_| 0); + // Create hash collisions (same hashes) + hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[1], (hashes[0], smallvec![0, 1]), |(h, _)| *h); let right = build_table_i32( ("a", &vec![10, 20]), From 7f233d3eddd56c7a1e80eb671d7d483527ca98c1 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 6 Aug 2021 14:20:29 +0200 Subject: [PATCH 4/4] Test fix --- datafusion/src/physical_plan/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 50ccb76c7742..1a57c404e96e 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1765,7 +1765,7 @@ mod tests { // Create hash collisions (same hashes) hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); - hashmap_left.insert(hashes[1], (hashes[0], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h); let right = build_table_i32( ("a", &vec![10, 20]),