diff --git a/Cargo.lock b/Cargo.lock index 3578dc0c12f76..36a5afa1133da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3473,7 +3473,6 @@ dependencies = [ "serde_json", "sha2", "thrift", - "twox-hash 1.6.3", "typetag", ] diff --git a/src/common/hashtable/src/lib.rs b/src/common/hashtable/src/lib.rs index e975ea3d70d07..d35597478752c 100644 --- a/src/common/hashtable/src/lib.rs +++ b/src/common/hashtable/src/lib.rs @@ -44,6 +44,7 @@ mod utils; pub use table0::Entry as HashtableEntry; pub use traits::hash_join_fast_string_hash; +pub use traits::BloomHash; pub use traits::EntryMutRefLike as HashtableEntryMutRefLike; pub use traits::EntryRefLike as HashtableEntryRefLike; pub use traits::FastHash; diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs index 6be457a2fa37c..4979e29b1593a 100644 --- a/src/common/hashtable/src/traits.rs +++ b/src/common/hashtable/src/traits.rs @@ -176,6 +176,25 @@ pub trait FastHash { fn fast_hash(&self) -> u64; } +pub trait BloomHash { + fn bloom_hash(&self) -> u64; +} + +/// Compress a 128-bit value into a 64-bit hash. +/// +/// This is the `Hash128to64` function from CityHash, a Murmur-inspired +/// mixing function with good avalanche properties. +#[inline(always)] +fn hash128_to_64(low: u64, high: u64) -> u64 { + const KMUL: u64 = 0x9ddf_ea08_eb38_2d69; + let mut a = (low ^ high).wrapping_mul(KMUL); + a ^= a >> 47; + let mut b = (high ^ a).wrapping_mul(KMUL); + b ^= b >> 47; + b = b.wrapping_mul(KMUL); + b +} + macro_rules! impl_fast_hash_for_primitive_types { ($t: ty) => { impl FastHash for $t { @@ -208,6 +227,32 @@ impl_fast_hash_for_primitive_types!(i32); impl_fast_hash_for_primitive_types!(u64); impl_fast_hash_for_primitive_types!(i64); +macro_rules! impl_bloom_hash_for_primitive_types { + ($t: ty) => { + impl BloomHash for $t { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + let mut hasher = *self as u64; + hasher ^= hasher >> 33; + hasher = hasher.wrapping_mul(0xff51afd7ed558ccd_u64); + hasher ^= hasher >> 33; + hasher = hasher.wrapping_mul(0xc4ceb9fe1a85ec53_u64); + hasher ^= hasher >> 33; + hasher + } + } + }; +} + +impl_bloom_hash_for_primitive_types!(u8); +impl_bloom_hash_for_primitive_types!(i8); +impl_bloom_hash_for_primitive_types!(u16); +impl_bloom_hash_for_primitive_types!(i16); +impl_bloom_hash_for_primitive_types!(u32); +impl_bloom_hash_for_primitive_types!(i32); +impl_bloom_hash_for_primitive_types!(u64); +impl_bloom_hash_for_primitive_types!(i64); + impl FastHash for u128 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -229,6 +274,15 @@ impl FastHash for u128 { } } +impl BloomHash for u128 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + let low = *self as u64; + let high = (self >> 64) as u64; + hash128_to_64(low, high) + } +} + impl FastHash for i128 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -236,6 +290,13 @@ impl FastHash for i128 { } } +impl BloomHash for i128 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + (*self as u128).bloom_hash() + } +} + impl FastHash for i256 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -263,6 +324,20 @@ impl FastHash for i256 { } } +impl BloomHash for i256 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + let mut low = 0_u64; + let mut high = 0_u64; + for x in self.0 { + let v = x as u128; + low ^= v as u64; + high ^= (v >> 64) as u64; + } + hash128_to_64(low, high) + } +} + impl FastHash for U256 { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -290,6 +365,20 @@ impl FastHash for U256 { } } +impl BloomHash for U256 { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + let mut low = 0_u64; + let mut high = 0_u64; + for x in self.0 { + let v = x; + low ^= v as u64; + high ^= (v >> 64) as u64; + } + hash128_to_64(low, high) + } +} + impl FastHash for bool { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -297,6 +386,13 @@ impl FastHash for bool { } } +impl BloomHash for bool { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + (*self as u8).bloom_hash() + } +} + impl FastHash for OrderedFloat { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -308,6 +404,17 @@ impl FastHash for OrderedFloat { } } +impl BloomHash for OrderedFloat { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + if self.is_nan() { + f32::NAN.to_bits().bloom_hash() + } else { + self.to_bits().bloom_hash() + } + } +} + impl FastHash for OrderedFloat { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -319,6 +426,17 @@ impl FastHash for OrderedFloat { } } +impl BloomHash for OrderedFloat { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + if self.is_nan() { + f64::NAN.to_bits().bloom_hash() + } else { + self.to_bits().bloom_hash() + } + } +} + // To avoid RUSTFLAGS="-C target-feature=+sse4.2" warning. #[allow(dead_code)] const SEEDS: [u64; 4] = [1, 1949, 2009, 9527]; @@ -358,6 +476,19 @@ impl FastHash for [u8] { } } +impl BloomHash for [u8] { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + use std::hash::BuildHasher; + use std::hash::Hasher; + + let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]); + let mut hasher = state.build_hasher(); + hasher.write(self); + hasher.finish() + } +} + impl FastHash for str { #[inline(always)] fn fast_hash(&self) -> u64 { @@ -365,6 +496,13 @@ impl FastHash for str { } } +impl BloomHash for str { + #[inline(always)] + fn bloom_hash(&self) -> u64 { + self.as_bytes().bloom_hash() + } +} + // trick for unsized_hashtable impl FastHash for ([u64; N], NonZeroU64) { #[inline(always)] diff --git a/src/query/catalog/Cargo.toml b/src/query/catalog/Cargo.toml index dd8ab00ed82b4..26c017b292494 100644 --- a/src/query/catalog/Cargo.toml +++ b/src/query/catalog/Cargo.toml @@ -39,7 +39,6 @@ serde = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } thrift = { workspace = true } -twox-hash = { workspace = true } typetag = { workspace = true } [dev-dependencies] diff --git a/src/query/catalog/src/sbbf.rs b/src/query/catalog/src/sbbf.rs index 5b0d050c3b355..e4d53278c85a6 100644 --- a/src/query/catalog/src/sbbf.rs +++ b/src/query/catalog/src/sbbf.rs @@ -73,10 +73,6 @@ //! [sbbf-paper]: https://arxiv.org/pdf/2101.01719 //! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf -use std::hash::Hasher; - -use twox_hash::XxHash64; - /// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach). const SALT: [u32; 8] = [ 0x47b6137b_u32, @@ -223,8 +219,8 @@ impl Sbbf { (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize } - /// Insert a hash into the filter - fn insert_hash(&mut self, hash: u64) { + /// Insert a hash into the filter. The caller must provide a well-distributed 64-bit hash. + pub fn insert_hash(&mut self, hash: u64) { let block_index = self.hash_to_block_index(hash); self.0[block_index].insert(hash as u32) } @@ -232,23 +228,11 @@ impl Sbbf { /// Check if a hash is in the filter. May return /// true for values that was never inserted ("false positive") /// but will always return false if a hash has not been inserted. - fn check_hash(&self, hash: u64) -> bool { + pub fn check_hash(&self, hash: u64) -> bool { let block_index = self.hash_to_block_index(hash); self.0[block_index].check(hash as u32) } - /// Insert a digest (u64 hash value) into the filter - pub fn insert_digest(&mut self, digest: u64) { - let hash = hash_u64(digest); - self.insert_hash(hash) - } - - /// Check if a digest is probably present or definitely absent in the filter - pub fn check_digest(&self, digest: u64) -> bool { - let hash = hash_u64(digest); - self.check_hash(hash) - } - /// Merge another bloom filter into this one (bitwise OR operation) /// Panics if the filters have different sizes pub fn union(&mut self, other: &Self) { @@ -271,30 +255,10 @@ impl Sbbf { } } -/// Per spec we use xxHash with seed=0 -const SEED: u64 = 0; - -/// Hash a u64 value using XxHash64 -#[inline] -fn hash_u64(value: u64) -> u64 { - let mut hasher = XxHash64::with_seed(SEED); - hasher.write_u64(value); - hasher.finish() -} - #[cfg(test)] mod tests { use super::*; - #[test] - fn test_hash_u64() { - let digest1 = 12345u64; - let digest2 = 67890u64; - assert_ne!(hash_u64(digest1), digest1); - assert_ne!(hash_u64(digest2), digest2); - assert_ne!(hash_u64(digest1), hash_u64(digest2)); - } - #[test] fn test_mask_set_quick_check() { for i in 0..1_000_000 { @@ -316,8 +280,8 @@ mod tests { fn test_sbbf_insert_and_check() { let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]); for i in 0..1_000_000 { - sbbf.insert_digest(i); - assert!(sbbf.check_digest(i)); + sbbf.insert_hash(i); + assert!(sbbf.check_hash(i)); } } @@ -325,18 +289,18 @@ mod tests { fn test_sbbf_union() { let mut filter1 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap(); for i in 0..50 { - filter1.insert_digest(i); + filter1.insert_hash(i); } let mut filter2 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap(); for i in 50..100 { - filter2.insert_digest(i); + filter2.insert_hash(i); } filter1.union(&filter2); for i in 0..100 { - assert!(filter1.check_digest(i)); + assert!(filter1.check_hash(i)); } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs index f79579bf09fc9..9af1bec4b542b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs @@ -33,7 +33,7 @@ use super::packet::JoinRuntimeFilterPacket; use super::packet::RuntimeFilterPacket; use super::packet::SerializableDomain; use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc; -use crate::pipelines::processors::transforms::hash_join::util::hash_by_method; +use crate::pipelines::processors::transforms::hash_join::util::hash_by_method_for_bloom; struct JoinRuntimeFilterPacketBuilder<'a> { build_key_column: Column, @@ -156,7 +156,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> { let method = DataBlock::choose_hash_method_with_types(&[data_type.clone()])?; let mut hashes = Vec::with_capacity(num_rows); let key_columns = &[self.build_key_column.clone().into()]; - hash_by_method(&method, key_columns.into(), num_rows, &mut hashes)?; + hash_by_method_for_bloom(&method, key_columns.into(), num_rows, &mut hashes)?; Ok(hashes) } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs index f58e01bcfaa83..f39386b88aabe 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/convert.rs @@ -256,8 +256,8 @@ async fn build_bloom_filter( if total_items < 50000 { let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01) .map_err(|e| ErrorCode::Internal(e.to_string()))?; - for digest in bloom { - filter.insert_digest(digest); + for hash in bloom { + filter.insert_hash(hash); } return Ok(RuntimeFilterBloom { column_name, @@ -279,8 +279,8 @@ async fn build_bloom_filter( let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01) .map_err(|e| ErrorCode::Internal(e.to_string()))?; - for digest in chunk { - filter.insert_digest(digest); + for hash in chunk { + filter.insert_hash(hash); } Ok::(filter) }) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs index 497b3f1d576d3..85f2c59996569 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs @@ -24,6 +24,7 @@ use databend_common_expression::ProjectedBlock; use databend_common_expression::RawExpr; use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_hashtable::BloomHash; use databend_common_hashtable::FastHash; pub(crate) fn build_schema_wrap_nullable(build_schema: &DataSchemaRef) -> DataSchemaRef { @@ -48,7 +49,7 @@ pub(crate) fn probe_schema_wrap_nullable(probe_schema: &DataSchemaRef) -> DataSc DataSchemaRefExt::create(nullable_field) } -// Get row hash by HashMethod +// Get row hash by HashMethod (uses FastHash, i.e. the hashtable hash) pub fn hash_by_method( method: &HashMethodKind, columns: ProjectedBlock, @@ -127,6 +128,86 @@ where Ok(()) } +// Get row hash for Bloom filter by HashMethod. This always uses BloomHash, +// which is independent of SSE4.2 and provides a well-distributed 64-bit hash. +pub fn hash_by_method_for_bloom( + method: &HashMethodKind, + columns: ProjectedBlock, + num_rows: usize, + hashes: &mut T, +) -> Result<()> +where + T: Extend, +{ + match method { + HashMethodKind::Serializer(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + HashMethodKind::SingleBinary(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + HashMethodKind::KeysU8(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + HashMethodKind::KeysU16(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + HashMethodKind::KeysU32(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + HashMethodKind::KeysU64(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + HashMethodKind::KeysU128(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + HashMethodKind::KeysU256(method) => { + let keys_state = method.build_keys_state(columns, num_rows)?; + hashes.extend( + method + .build_keys_iter(&keys_state)? + .map(|key| key.bloom_hash()), + ); + } + } + Ok(()) +} + pub(crate) fn min_max_filter( min: Scalar, max: Scalar, diff --git a/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs b/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs index c3648b501fc00..64fbf11ff5878 100644 --- a/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs +++ b/src/query/storages/fuse/src/pruning/expr_bloom_filter.rs @@ -23,7 +23,7 @@ use databend_common_expression::HashMethodKind; use databend_common_expression::KeysState; use databend_common_expression::KeysState::U128; use databend_common_expression::KeysState::U256; -use databend_common_hashtable::FastHash; +use databend_common_hashtable::BloomHash; pub struct ExprBloomFilter<'a> { filter: &'a Sbbf, @@ -48,8 +48,8 @@ impl<'a> ExprBloomFilter<'a> { let key_state = method.build_keys_state(group_columns, num_rows)?; match key_state { KeysState::Column(Column::Binary(col)) => col.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -63,15 +63,15 @@ impl<'a> ExprBloomFilter<'a> { KeysState::Column(Column::Binary(col)) | KeysState::Column(Column::Variant(col)) | KeysState::Column(Column::Bitmap(col)) => col.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; }), KeysState::Column(Column::String(col)) => col.iter().for_each(|key| { - let hash = key.as_bytes().fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.as_bytes().bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -84,8 +84,8 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt8(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -99,8 +99,8 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt16(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -114,8 +114,8 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt32(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -129,8 +129,8 @@ impl<'a> ExprBloomFilter<'a> { match key_state { KeysState::Column(Column::Number(NumberColumn::UInt64(c))) => { c.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -143,8 +143,8 @@ impl<'a> ExprBloomFilter<'a> { let key_state = hash_method.build_keys_state(group_columns, num_rows)?; match key_state { U128(c) => c.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1; @@ -156,8 +156,8 @@ impl<'a> ExprBloomFilter<'a> { let key_state = hash_method.build_keys_state(group_columns, num_rows)?; match key_state { U256(c) => c.iter().for_each(|key| { - let hash = key.fast_hash(); - if self.filter.check_digest(hash) { + let hash = key.bloom_hash(); + if self.filter.check_hash(hash) { bitmap.set(idx, true); } idx += 1;