From 50dc8b0e568789d4d1f1c5db269a30a5420e3173 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 21 Nov 2025 15:06:17 +0800 Subject: [PATCH 1/4] init --- Cargo.lock | 1 - src/query/catalog/Cargo.toml | 1 - src/query/catalog/src/sbbf.rs | 52 +++---------------- .../hash_join/runtime_filter/convert.rs | 8 +-- .../fuse/src/pruning/expr_bloom_filter.rs | 18 +++---- 5 files changed, 21 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba85649b82c7a..eb9f95138b493 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3191,7 +3191,6 @@ dependencies = [ "serde_json", "sha2", "thrift", - "twox-hash 1.6.3", "typetag", ] diff --git a/src/query/catalog/Cargo.toml b/src/query/catalog/Cargo.toml index dd8ab00ed82b4..26c017b292494 100644 --- a/src/query/catalog/Cargo.toml +++ b/src/query/catalog/Cargo.toml @@ -39,7 +39,6 @@ serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } thrift = { workspace = true } -twox-hash = { workspace = true } typetag = { workspace = true } [dev-dependencies] diff --git a/src/query/catalog/src/sbbf.rs b/src/query/catalog/src/sbbf.rs index 5b0d050c3b355..e4d53278c85a6 100644 --- a/src/query/catalog/src/sbbf.rs +++ b/src/query/catalog/src/sbbf.rs @@ -73,10 +73,6 @@ //! [sbbf-paper]: https://arxiv.org/pdf/2101.01719 //! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf -use std::hash::Hasher; - -use twox_hash::XxHash64; - /// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach). const SALT: [u32; 8] = [ 0x47b6137b_u32, @@ -223,8 +219,8 @@ impl Sbbf { (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize } - /// Insert a hash into the filter - fn insert_hash(&mut self, hash: u64) { + /// Insert a hash into the filter. The caller must provide a well-distributed 64-bit hash. + pub fn insert_hash(&mut self, hash: u64) { let block_index = self.hash_to_block_index(hash); self.0[block_index].insert(hash as u32) } @@ -232,23 +228,11 @@ impl Sbbf { /// Check if a hash is in the filter. May return /// true for values that was never inserted ("false positive") /// but will always return false if a hash has not been inserted. - fn check_hash(&self, hash: u64) -> bool { + pub fn check_hash(&self, hash: u64) -> bool { let block_index = self.hash_to_block_index(hash); self.0[block_index].check(hash as u32) } - /// Insert a digest (u64 hash value) into the filter - pub fn insert_digest(&mut self, digest: u64) { - let hash = hash_u64(digest); - self.insert_hash(hash) - } - - /// Check if a digest is probably present or definitely absent in the filter - pub fn check_digest(&self, digest: u64) -> bool { - let hash = hash_u64(digest); - self.check_hash(hash) - } - /// Merge another bloom filter into this one (bitwise OR operation) /// Panics if the filters have different sizes pub fn union(&mut self, other: &Self) { @@ -271,30 +255,10 @@ impl Sbbf { } } -/// Per spec we use xxHash with seed=0 -const SEED: u64 = 0; - -/// Hash a u64 value using XxHash64 -#[inline] -fn hash_u64(value: u64) -> u64 { - let mut hasher = XxHash64::with_seed(SEED); - hasher.write_u64(value); - hasher.finish() -} - #[cfg(test)] mod tests { use super::*; - #[test] - fn test_hash_u64() { - let digest1 = 12345u64; - let digest2 = 67890u64; - assert_ne!(hash_u64(digest1), digest1); - assert_ne!(hash_u64(digest2), digest2); - assert_ne!(hash_u64(digest1), hash_u64(digest2)); - } - #[test] fn test_mask_set_quick_check() { for i in 0..1_000_000 { @@ -316,8 +280,8 @@ mod tests { fn test_sbbf_insert_and_check() { let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]); for i in 0..1_000_000 { - sbbf.insert_digest(i); - assert!(sbbf.check_digest(i)); + sbbf.insert_hash(i); + assert!(sbbf.check_hash(i)); } } @@ -325,18 +289,18 @@ mod tests { fn test_sbbf_union() { let mut filter1 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap(); for i in 0..50 { - filter1.insert_digest(i); + filter1.insert_hash(i); } let mut filter2 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap(); for i in 50..100 { - filter2.insert_digest(i); + filter2.insert_hash(i); } filter1.union(&filter2); for i in 0..100 { - assert!(filter1.check_digest(i)); + assert!(filter1.check_hash(i)); } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs index f58e01bcfaa83..f39386b88aabe 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs @@ -256,8 +256,8 @@ async fn build_bloom_filter( if total_items < 50000 { let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01) .map_err(|e| ErrorCode::Internal(e.to_string()))?; - for digest in bloom { - filter.insert_digest(digest); + for hash in bloom { + filter.insert_hash(hash); } return Ok(RuntimeFilterBloom { column_name, @@ -279,8 +279,8 @@ async fn build_bloom_filter( let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01) .map_err(|e| ErrorCode::Internal(e.to_string()))?; - for digest in chunk { - filter.insert_digest(digest); + for hash in chunk { + filter.insert_hash(hash); } Ok::(filter) }) diff --git a/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs b/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs index c3648b501fc00..177c5f307cb3a 100644 --- a/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs +++ b/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs @@ -49,7 +49,7 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Binary(col)) => col.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -64,14 +64,14 @@ impl<'a> ExprBloomFilter<'a> { | KeysState::Column(Column::Variant(col)) | KeysState::Column(Column::Bitmap(col)) => col.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; }), KeysState::Column(Column::String(col)) => col.iter().for_each(|key| { let hash = key.as_bytes().fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -85,7 +85,7 @@ impl<'a> ExprBloomFilter<'a> { KeysState::Column(Column::Number(NumberColumn::UInt8(c))) => { c.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -100,7 +100,7 @@ impl<'a> ExprBloomFilter<'a> { KeysState::Column(Column::Number(NumberColumn::UInt16(c))) => { c.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -115,7 +115,7 @@ impl<'a> ExprBloomFilter<'a> { KeysState::Column(Column::Number(NumberColumn::UInt32(c))) => { c.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -130,7 +130,7 @@ impl<'a> ExprBloomFilter<'a> { KeysState::Column(Column::Number(NumberColumn::UInt64(c))) => { c.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -144,7 +144,7 @@ impl<'a> ExprBloomFilter<'a> { match key_state { U128(c) => c.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -157,7 +157,7 @@ impl<'a> ExprBloomFilter<'a> { match key_state { U256(c) => c.iter().for_each(|key| { let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; From 174a6f1e8a70aa0f824a88720a895e7dabf77d46 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Tue, 25 Nov 2025 15:16:21 +0800 Subject: [PATCH 2/4] fix hash --- src/common/hashtable/src/lib.rs | 1 + src/common/hashtable/src/traits.rs | 129 ++++++++++++++++++ .../processors/transforms/hash_join/util.rs | 18 +-- .../fuse/src/pruning/expr_bloom_filter.rs | 20 +-- 4 files changed, 149 insertions(+), 19 deletions(-) diff --git a/src/common/hashtable/src/lib.rs b/src/common/hashtable/src/lib.rs index e975ea3d70d07..d35597478752c 100644 --- a/src/common/hashtable/src/lib.rs +++ b/src/common/hashtable/src/lib.rs @@ -44,6 +44,7 @@ mod utils; pub use table0::Entry as HashtableEntry; pub use traits::hash_join_fast_string_hash; +pub use traits::BloomHash; pub use traits::EntryMutRefLike as HashtableEntryMutRefLike; pub use traits::EntryRefLike as HashtableEntryRefLike; pub use traits::FastHash; diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs index 6be457a2fa37c..706e5f976c964 100644 --- a/src/common/hashtable/src/traits.rs +++ b/src/common/hashtable/src/traits.rs @@ -176,6 +176,10 @@ pub trait FastHash { fn fast_hash(&self) -> u64; } +pub trait BloomHash { + fn bloom_hash(&self) -> u64; +} + macro_rules! impl_fast_hash_for_primitive_types { ($t: ty) => { impl FastHash for $t { @@ -208,6 +212,32 @@ impl_fast_hash_for_primitive_types!(i32); impl_fast_hash_for_primitive_types!(u64); impl_fast_hash_for_primitive_types!(i64); +macro_rules! impl_bloom_hash_for_primitive_types { + ($t: ty) => { + impl BloomHash for $t { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + let mut hasher = *self as u64; + hasher ^= hasher >> 33; + hasher = hasher.wrapping_mul(0xff51afd7ed558ccd_u64); + hasher ^= hasher >> 33; + hasher = hasher.wrapping_mul(0xc4ceb9fe1a85ec53_u64); + hasher ^= hasher >> 33; + hasher + } + } + }; +} + +impl_bloom_hash_for_primitive_types!(u8); +impl_bloom_hash_for_primitive_types!(i8); +impl_bloom_hash_for_primitive_types!(u16); +impl_bloom_hash_for_primitive_types!(i16); +impl_bloom_hash_for_primitive_types!(u32); +impl_bloom_hash_for_primitive_types!(i32); +impl_bloom_hash_for_primitive_types!(u64); +impl_bloom_hash_for_primitive_types!(i64); + impl FastHash for u128 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -229,6 +259,19 @@ impl FastHash for u128 { } } +impl BloomHash for u128 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + use std::hash::BuildHasher; + use std::hash::Hasher; + + let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); + let mut hasher = state.build_hasher(); + hasher.write_u128(*self); + hasher.finish() + } +} + impl FastHash for i128 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -236,6 +279,13 @@ impl FastHash for i128 { } } +impl BloomHash for i128 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + (*self as u128).bloom_hash() + } +} + impl FastHash for i256 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -263,6 +313,21 @@ impl FastHash for i256 { } } +impl BloomHash for i256 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + use std::hash::BuildHasher; + use std::hash::Hasher; + + let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); + let mut hasher = state.build_hasher(); + for x in self.0 { + hasher.write_i128(x); + } + hasher.finish() + } +} + impl FastHash for U256 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -290,6 +355,21 @@ impl FastHash for U256 { } } +impl BloomHash for U256 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + use std::hash::BuildHasher; + use std::hash::Hasher; + + let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); + let mut hasher = state.build_hasher(); + for x in self.0 { + hasher.write_u128(x); + } + hasher.finish() + } +} + impl FastHash for bool { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -297,6 +377,13 @@ impl FastHash for bool { } } +impl BloomHash for bool { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + (*self as u8).bloom_hash() + } +} + impl FastHash for OrderedFloat { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -308,6 +395,17 @@ impl FastHash for OrderedFloat { } } +impl BloomHash for OrderedFloat { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + if self.is_nan() { + f32::NAN.to_bits().bloom_hash() + } else { + self.to_bits().bloom_hash() + } + } +} + impl FastHash for OrderedFloat { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -319,6 +417,17 @@ impl FastHash for OrderedFloat { } } +impl BloomHash for OrderedFloat { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + if self.is_nan() { + f64::NAN.to_bits().bloom_hash() + } else { + self.to_bits().bloom_hash() + } + } +} + // To avoid RUSTFLAGS="-C target-feature=+sse4.2" warning. #[allow(dead_code)] const SEEDS: [u64; 4] = [1, 1949, 2009, 9527]; @@ -358,6 +467,19 @@ impl FastHash for [u8] { } } +impl BloomHash for [u8] { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + use std::hash::BuildHasher; + use std::hash::Hasher; + + let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); + let mut hasher = state.build_hasher(); + hasher.write(self); + hasher.finish() + } +} + impl FastHash for str { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -365,6 +487,13 @@ impl FastHash for str { } } +impl BloomHash for str { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + self.as_bytes().bloom_hash() + } +} + // trick for unsized_hashtable impl FastHash for ([u64; N], NonZeroU64) { #[inline(always)] diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs index 497b3f1d576d3..e107e89a789d9 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs @@ -24,7 +24,7 @@ use databend_common_expression::ProjectedBlock; use databend_common_expression::RawExpr; use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; -use databend_common_hashtable::FastHash; +use databend_common_hashtable::BloomHash; pub(crate) fn build_schema_wrap_nullable(build_schema: &DataSchemaRef) -> DataSchemaRef { let mut nullable_field = Vec::with_capacity(build_schema.fields().len()); @@ -64,7 +64,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } HashMethodKind::SingleBinary(method) => { @@ -72,7 +72,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } HashMethodKind::KeysU8(method) => { @@ -80,7 +80,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } HashMethodKind::KeysU16(method) => { @@ -88,7 +88,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } HashMethodKind::KeysU32(method) => { @@ -96,7 +96,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } HashMethodKind::KeysU64(method) => { @@ -104,7 +104,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } HashMethodKind::KeysU128(method) => { @@ -112,7 +112,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } HashMethodKind::KeysU256(method) => { @@ -120,7 +120,7 @@ where hashes.extend( method .build_keys_iter(&keys_state)? - .map(|key| key.fast_hash()), + .map(|key| key.bloom_hash()), ); } } diff --git a/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs b/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs index 177c5f307cb3a..64fbf11ff5878 100644 --- a/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs +++ b/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs @@ -23,7 +23,7 @@ use databend_common_expression::HashMethodKind; use databend_common_expression::KeysState; use databend_common_expression::KeysState::U128; use databend_common_expression::KeysState::U256; -use databend_common_hashtable::FastHash; +use databend_common_hashtable::BloomHash; pub struct ExprBloomFilter<'a> { filter: &'a Sbbf, @@ -48,7 +48,7 @@ impl<'a> ExprBloomFilter<'a> { let key_state = method.build_keys_state(group_columns, num_rows)?; match key_state { KeysState::Column(Column::Binary(col)) => col.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } @@ -63,14 +63,14 @@ impl<'a> ExprBloomFilter<'a> { KeysState::Column(Column::Binary(col)) | KeysState::Column(Column::Variant(col)) | KeysState::Column(Column::Bitmap(col)) => col.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; }), KeysState::Column(Column::String(col)) => col.iter().for_each(|key| { - let hash = key.as_bytes().fast_hash(); + let hash = key.as_bytes().bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } @@ -84,7 +84,7 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt8(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } @@ -99,7 +99,7 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt16(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } @@ -114,7 +114,7 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt32(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } @@ -129,7 +129,7 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt64(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } @@ -143,7 +143,7 @@ impl<'a> ExprBloomFilter<'a> { let key_state = hash_method.build_keys_state(group_columns, num_rows)?; match key_state { U128(c) => c.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } @@ -156,7 +156,7 @@ impl<'a> ExprBloomFilter<'a> { let key_state = hash_method.build_keys_state(group_columns, num_rows)?; match key_state { U256(c) => c.iter().for_each(|key| { - let hash = key.fast_hash(); + let hash = key.bloom_hash(); if self.filter.check_hash(hash) { bitmap.set(idx, true); } From 54273f0732233aa3b76450bd21689fc89d7482b0 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Tue, 25 Nov 2025 15:34:09 +0800 Subject: [PATCH 3/4] fix --- .../hash_join/runtime_filter/builder.rs | 4 +- .../processors/transforms/hash_join/util.rs | 83 ++++++++++++++++++- 2 files changed, 84 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs index f79579bf09fc9..9af1bec4b542b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs @@ -33,7 +33,7 @@ use super::packet::JoinRuntimeFilterPacket; use super::packet::RuntimeFilterPacket; use super::packet::SerializableDomain; use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc; -use crate::pipelines::processors::transforms::hash_join::util::hash_by_method; +use crate::pipelines::processors::transforms::hash_join::util::hash_by_method_for_bloom; struct JoinRuntimeFilterPacketBuilder<'a> { build_key_column: Column, @@ -156,7 +156,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { let method = DataBlock::choose_hash_method_with_types(&[data_type.clone()])?; let mut hashes = Vec::with_capacity(num_rows); let key_columns = &[self.build_key_column.clone().into()]; - hash_by_method(&method, key_columns.into(), num_rows, &mut hashes)?; + hash_by_method_for_bloom(&method, key_columns.into(), num_rows, &mut hashes)?; Ok(hashes) } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs index e107e89a789d9..85f2c59996569 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs @@ -25,6 +25,7 @@ use databend_common_expression::RawExpr; use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_hashtable::BloomHash; +use databend_common_hashtable::FastHash; pub(crate) fn build_schema_wrap_nullable(build_schema: &DataSchemaRef) -> DataSchemaRef { let mut nullable_field = Vec::with_capacity(build_schema.fields().len()); @@ -48,13 +49,93 @@ pub(crate) fn probe_schema_wrap_nullable(probe_schema: &DataSchemaRef) -> DataSc DataSchemaRefExt::create(nullable_field) } -// Get row hash by HashMethod +// Get row hash by HashMethod (uses FastHash, i.e. the hashtable hash) pub fn hash_by_method( method: &HashMethodKind, columns: ProjectedBlock, num_rows: usize, hashes: &mut T, ) -> Result<()> +where + T: Extend, +{ + match method { + HashMethodKind::Serializer(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + HashMethodKind::SingleBinary(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + HashMethodKind::KeysU8(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + HashMethodKind::KeysU16(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + HashMethodKind::KeysU32(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + HashMethodKind::KeysU64(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + HashMethodKind::KeysU128(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + HashMethodKind::KeysU256(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.fast_hash()), + ); + } + } + Ok(()) +} + +// Get row hash for Bloom filter by HashMethod. This always uses BloomHash, +// which is independent of SSE4.2 and provides a well-distributed 64-bit hash. +pub fn hash_by_method_for_bloom( + method: &HashMethodKind, + columns: ProjectedBlock, + num_rows: usize, + hashes: &mut T, +) -> Result<()> where T: Extend, { From 6ca3f94dec6fc60a78b1d0d146f91ca3bd1ea1a4 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Tue, 25 Nov 2025 19:19:25 +0800 Subject: [PATCH 4/4] replace ahash --- src/common/hashtable/src/traits.rs | 51 ++++++++++++++++++------------ 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs index 706e5f976c964..4979e29b1593a 100644 --- a/src/common/hashtable/src/traits.rs +++ b/src/common/hashtable/src/traits.rs @@ -180,6 +180,21 @@ pub trait BloomHash { fn bloom_hash(&self) -> u64; } +/// Compress a 128-bit value into a 64-bit hash. +/// +/// This is the `Hash128to64` function from CityHash, a Murmur-inspired +/// mixing function with good avalanche properties. +#[inline(always)] +fn hash128_to_64(low: u64, high: u64) -> u64 { + const KMUL: u64 = 0x9ddf_ea08_eb38_2d69; + let mut a = (low ^ high).wrapping_mul(KMUL); + a ^= a >> 47; + let mut b = (high ^ a).wrapping_mul(KMUL); + b ^= b >> 47; + b = b.wrapping_mul(KMUL); + b +} + macro_rules! impl_fast_hash_for_primitive_types { ($t: ty) => { impl FastHash for $t { @@ -262,13 +277,9 @@ impl FastHash for u128 { impl BloomHash for u128 { #[inline(always)] fn bloom_hash(&self) -> u64 { - use std::hash::BuildHasher; - use std::hash::Hasher; - - let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); - let mut hasher = state.build_hasher(); - hasher.write_u128(*self); - hasher.finish() + let low = *self as u64; + let high = (self >> 64) as u64; + hash128_to_64(low, high) } } @@ -316,15 +327,14 @@ impl FastHash for i256 { impl BloomHash for i256 { #[inline(always)] fn bloom_hash(&self) -> u64 { - use std::hash::BuildHasher; - use std::hash::Hasher; - - let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); - let mut hasher = state.build_hasher(); + let mut low = 0_u64; + let mut high = 0_u64; for x in self.0 { - hasher.write_i128(x); + let v = x as u128; + low ^= v as u64; + high ^= (v >> 64) as u64; } - hasher.finish() + hash128_to_64(low, high) } } @@ -358,15 +368,14 @@ impl FastHash for U256 { impl BloomHash for U256 { #[inline(always)] fn bloom_hash(&self) -> u64 { - use std::hash::BuildHasher; - use std::hash::Hasher; - - let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); - let mut hasher = state.build_hasher(); + let mut low = 0_u64; + let mut high = 0_u64; for x in self.0 { - hasher.write_u128(x); + let v = x; + low ^= v as u64; + high ^= (v >> 64) as u64; } - hasher.finish() + hash128_to_64(low, high) } }