Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/hashtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod utils;

pub use table0::Entry as HashtableEntry;
pub use traits::hash_join_fast_string_hash;
pub use traits::BloomHash;
pub use traits::EntryMutRefLike as HashtableEntryMutRefLike;
pub use traits::EntryRefLike as HashtableEntryRefLike;
pub use traits::FastHash;
Expand Down
138 changes: 138 additions & 0 deletions src/common/hashtable/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,25 @@ pub trait FastHash {
fn fast_hash(&self) -> u64;
}

/// A 64-bit hash used for probing split-block bloom filters.
///
/// Implementations should fully mix the input so that both halves of the
/// returned value are well distributed: the sbbf filter derives the block
/// index from the high bits (`hash >> 32`) and the in-block bit mask from
/// the low bits (`hash as u32`).
pub trait BloomHash {
    /// Returns a well-mixed 64-bit hash of `self`.
    fn bloom_hash(&self) -> u64;
}

/// Compress a 128-bit value (given as two 64-bit halves) into a 64-bit hash.
///
/// This is CityHash's `Hash128to64`: two rounds of a Murmur-inspired
/// multiply/xorshift step, followed by a final multiply, giving good
/// avalanche behaviour across all 128 input bits.
#[inline(always)]
fn hash128_to_64(low: u64, high: u64) -> u64 {
    const KMUL: u64 = 0x9ddf_ea08_eb38_2d69;
    // One multiply + xorshift mixing round, shared by both halves.
    let mix = |x: u64| -> u64 {
        let y = x.wrapping_mul(KMUL);
        y ^ (y >> 47)
    };
    let a = mix(low ^ high);
    mix(high ^ a).wrapping_mul(KMUL)
}

macro_rules! impl_fast_hash_for_primitive_types {
($t: ty) => {
impl FastHash for $t {
Expand Down Expand Up @@ -208,6 +227,32 @@ impl_fast_hash_for_primitive_types!(i32);
impl_fast_hash_for_primitive_types!(u64);
impl_fast_hash_for_primitive_types!(i64);

/// Implements [`BloomHash`] for an integer primitive by widening the value
/// to `u64` with `as` (sign-extending for signed types) and applying the
/// MurmurHash3 64-bit finalizer (`fmix64`), whose two multiply/xorshift
/// rounds avalanche every input bit into every output bit.
macro_rules! impl_bloom_hash_for_primitive_types {
    ($t: ty) => {
        impl BloomHash for $t {
            #[inline(always)]
            fn bloom_hash(&self) -> u64 {
                let mut x = *self as u64;
                x = (x ^ (x >> 33)).wrapping_mul(0xff51afd7ed558ccd_u64);
                x = (x ^ (x >> 33)).wrapping_mul(0xc4ceb9fe1a85ec53_u64);
                x ^ (x >> 33)
            }
        }
    };
}

// Implement `BloomHash` for all integer primitives. Each value is widened
// to `u64` via `as` (sign-extending for signed types) before mixing, so a
// given numeric value produces the same bloom hash regardless of its
// declared width (e.g. `7_u8` and `7_u64` hash identically).
impl_bloom_hash_for_primitive_types!(u8);
impl_bloom_hash_for_primitive_types!(i8);
impl_bloom_hash_for_primitive_types!(u16);
impl_bloom_hash_for_primitive_types!(i16);
impl_bloom_hash_for_primitive_types!(u32);
impl_bloom_hash_for_primitive_types!(i32);
impl_bloom_hash_for_primitive_types!(u64);
impl_bloom_hash_for_primitive_types!(i64);

impl FastHash for u128 {
#[inline(always)]
fn fast_hash(&self) -> u64 {
Expand All @@ -229,13 +274,29 @@ impl FastHash for u128 {
}
}

/// Bloom hash for `u128`: split the value into its two 64-bit halves and
/// compress them with the CityHash `Hash128to64` mixer.
impl BloomHash for u128 {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        let value = *self;
        hash128_to_64(value as u64, (value >> 64) as u64)
    }
}

impl FastHash for i128 {
#[inline(always)]
fn fast_hash(&self) -> u64 {
(*self as u128).fast_hash()
}
}

/// Bloom hash for `i128`: reinterpret the bits as `u128` (a no-op cast)
/// and reuse that implementation.
impl BloomHash for i128 {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        let bits = *self as u128;
        bits.bloom_hash()
    }
}

impl FastHash for i256 {
#[inline(always)]
fn fast_hash(&self) -> u64 {
Expand Down Expand Up @@ -263,6 +324,20 @@ impl FastHash for i256 {
}
}

/// Bloom hash for `i256`: XOR-fold the constituent words into a single
/// 128-bit value, then compress its halves with `hash128_to_64`.
///
/// XOR of truncations equals truncation of the XOR, so folding first and
/// splitting afterwards yields the same low/high pair as accumulating the
/// halves separately.
impl BloomHash for i256 {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        let mut folded: u128 = 0;
        for x in self.0 {
            folded ^= x as u128;
        }
        hash128_to_64(folded as u64, (folded >> 64) as u64)
    }
}

impl FastHash for U256 {
#[inline(always)]
fn fast_hash(&self) -> u64 {
Expand Down Expand Up @@ -290,13 +365,34 @@ impl FastHash for U256 {
}
}

/// Bloom hash for `U256`: XOR-fold the constituent words into a single
/// 128-bit value, then compress its halves with `hash128_to_64`.
///
/// Folding first and splitting afterwards is equivalent to accumulating
/// the low/high halves separately, since XOR commutes with truncation.
impl BloomHash for U256 {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        let mut folded: u128 = 0;
        for word in self.0 {
            folded ^= word;
        }
        hash128_to_64(folded as u64, (folded >> 64) as u64)
    }
}

impl FastHash for bool {
#[inline(always)]
fn fast_hash(&self) -> u64 {
(*self as u8).fast_hash()
}
}

/// Bloom hash for `bool`: hash as the byte 0 or 1.
impl BloomHash for bool {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        // `u8::from(true) == 1`, `u8::from(false) == 0` — identical to
        // `*self as u8`.
        u8::from(*self).bloom_hash()
    }
}

impl FastHash for OrderedFloat<f32> {
#[inline(always)]
fn fast_hash(&self) -> u64 {
Expand All @@ -308,6 +404,17 @@ impl FastHash for OrderedFloat<f32> {
}
}

/// Bloom hash for `OrderedFloat<f32>`.
///
/// Values that compare equal must produce the same hash, otherwise the
/// bloom filter can report a false negative and wrongly prune matching
/// rows. Two canonicalizations are therefore applied:
/// - every NaN bit pattern hashes as the standard `f32::NAN` (all NaNs
///   compare equal under `OrderedFloat`);
/// - `-0.0` hashes as `+0.0` (they compare equal but differ in the sign
///   bit, so their raw bit patterns would hash differently).
impl BloomHash for OrderedFloat<f32> {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        let bits = if self.is_nan() {
            f32::NAN.to_bits()
        } else if self.0 == 0.0 {
            // Collapse -0.0 and +0.0 to a single bit pattern.
            0.0f32.to_bits()
        } else {
            self.to_bits()
        };
        bits.bloom_hash()
    }
}

impl FastHash for OrderedFloat<f64> {
#[inline(always)]
fn fast_hash(&self) -> u64 {
Expand All @@ -319,6 +426,17 @@ impl FastHash for OrderedFloat<f64> {
}
}

/// Bloom hash for `OrderedFloat<f64>`.
///
/// Values that compare equal must produce the same hash, otherwise the
/// bloom filter can report a false negative and wrongly prune matching
/// rows. Two canonicalizations are therefore applied:
/// - every NaN bit pattern hashes as the standard `f64::NAN` (all NaNs
///   compare equal under `OrderedFloat`);
/// - `-0.0` hashes as `+0.0` (they compare equal but differ in the sign
///   bit, so their raw bit patterns would hash differently).
impl BloomHash for OrderedFloat<f64> {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        let bits = if self.is_nan() {
            f64::NAN.to_bits()
        } else if self.0 == 0.0 {
            // Collapse -0.0 and +0.0 to a single bit pattern.
            0.0f64.to_bits()
        } else {
            self.to_bits()
        };
        bits.bloom_hash()
    }
}

// To avoid RUSTFLAGS="-C target-feature=+sse4.2" warning.
#[allow(dead_code)]
const SEEDS: [u64; 4] = [1, 1949, 2009, 9527];
Expand Down Expand Up @@ -358,13 +476,33 @@ impl FastHash for [u8] {
}
}

/// Bloom hash for byte slices, using ahash with fixed seeds.
///
/// The seeds are compile-time constants, so the hash is deterministic
/// across processes (unlike ahash's default randomly-seeded state).
impl BloomHash for [u8] {
    #[inline(always)]
    fn bloom_hash(&self) -> u64 {
        use std::hash::BuildHasher;
        use std::hash::Hasher;

        let state = ahash::RandomState::with_seeds(SEEDS[0], SEEDS[1], SEEDS[2], SEEDS[3]);
        let mut h = state.build_hasher();
        Hasher::write(&mut h, self);
        h.finish()
    }
}

impl FastHash for str {
#[inline(always)]
fn fast_hash(&self) -> u64 {
self.as_bytes().fast_hash()
}
}

impl BloomHash for str {
#[inline(always)]
fn bloom_hash(&self) -> u64 {
self.as_bytes().bloom_hash()
}
}

// trick for unsized_hashtable
impl<const N: usize> FastHash for ([u64; N], NonZeroU64) {
#[inline(always)]
Expand Down
1 change: 0 additions & 1 deletion src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
thrift = { workspace = true }
twox-hash = { workspace = true }
typetag = { workspace = true }

[dev-dependencies]
Expand Down
52 changes: 8 additions & 44 deletions src/query/catalog/src/sbbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf

use std::hash::Hasher;

use twox_hash::XxHash64;

/// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
0x47b6137b_u32,
Expand Down Expand Up @@ -223,32 +219,20 @@ impl Sbbf {
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
}

/// Insert a hash into the filter
fn insert_hash(&mut self, hash: u64) {
/// Insert a hash into the filter. The caller must provide a well-distributed 64-bit hash.
pub fn insert_hash(&mut self, hash: u64) {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].insert(hash as u32)
}

/// Check if a hash is in the filter. May return
/// true for values that was never inserted ("false positive")
/// but will always return false if a hash has not been inserted.
fn check_hash(&self, hash: u64) -> bool {
pub fn check_hash(&self, hash: u64) -> bool {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].check(hash as u32)
}

/// Insert a digest (u64 hash value) into the filter
pub fn insert_digest(&mut self, digest: u64) {
let hash = hash_u64(digest);
self.insert_hash(hash)
}

/// Check if a digest is probably present or definitely absent in the filter
pub fn check_digest(&self, digest: u64) -> bool {
let hash = hash_u64(digest);
self.check_hash(hash)
}

/// Merge another bloom filter into this one (bitwise OR operation)
/// Panics if the filters have different sizes
pub fn union(&mut self, other: &Self) {
Expand All @@ -271,30 +255,10 @@ impl Sbbf {
}
}

/// Per spec we use xxHash with seed=0
const SEED: u64 = 0;

/// Hash a u64 value using XxHash64
#[inline]
fn hash_u64(value: u64) -> u64 {
let mut hasher = XxHash64::with_seed(SEED);
hasher.write_u64(value);
hasher.finish()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_hash_u64() {
let digest1 = 12345u64;
let digest2 = 67890u64;
assert_ne!(hash_u64(digest1), digest1);
assert_ne!(hash_u64(digest2), digest2);
assert_ne!(hash_u64(digest1), hash_u64(digest2));
}

#[test]
fn test_mask_set_quick_check() {
for i in 0..1_000_000 {
Expand All @@ -316,27 +280,27 @@ mod tests {
fn test_sbbf_insert_and_check() {
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
for i in 0..1_000_000 {
sbbf.insert_digest(i);
assert!(sbbf.check_digest(i));
sbbf.insert_hash(i);
assert!(sbbf.check_hash(i));
}
}

#[test]
fn test_sbbf_union() {
let mut filter1 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap();
for i in 0..50 {
filter1.insert_digest(i);
filter1.insert_hash(i);
}

let mut filter2 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap();
for i in 50..100 {
filter2.insert_digest(i);
filter2.insert_hash(i);
}

filter1.union(&filter2);

for i in 0..100 {
assert!(filter1.check_digest(i));
assert!(filter1.check_hash(i));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use super::packet::JoinRuntimeFilterPacket;
use super::packet::RuntimeFilterPacket;
use super::packet::SerializableDomain;
use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc;
use crate::pipelines::processors::transforms::hash_join::util::hash_by_method;
use crate::pipelines::processors::transforms::hash_join::util::hash_by_method_for_bloom;

struct JoinRuntimeFilterPacketBuilder<'a> {
build_key_column: Column,
Expand Down Expand Up @@ -156,7 +156,7 @@ impl<'a> JoinRuntimeFilterPacketBuilder<'a> {
let method = DataBlock::choose_hash_method_with_types(&[data_type.clone()])?;
let mut hashes = Vec::with_capacity(num_rows);
let key_columns = &[self.build_key_column.clone().into()];
hash_by_method(&method, key_columns.into(), num_rows, &mut hashes)?;
hash_by_method_for_bloom(&method, key_columns.into(), num_rows, &mut hashes)?;
Ok(hashes)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ async fn build_bloom_filter(
if total_items < 50000 {
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
.map_err(|e| ErrorCode::Internal(e.to_string()))?;
for digest in bloom {
filter.insert_digest(digest);
for hash in bloom {
filter.insert_hash(hash);
}
return Ok(RuntimeFilterBloom {
column_name,
Expand All @@ -279,8 +279,8 @@ async fn build_bloom_filter(
let mut filter = Sbbf::new_with_ndv_fpp(total_items as u64, 0.01)
.map_err(|e| ErrorCode::Internal(e.to_string()))?;

for digest in chunk {
filter.insert_digest(digest);
for hash in chunk {
filter.insert_hash(hash);
}
Ok::<Sbbf, ErrorCode>(filter)
})
Expand Down
Loading
Loading