-
Notifications
You must be signed in to change notification settings - Fork 791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Faster Parquet Bloom Writer #3333
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,38 +49,82 @@ const SALT: [u32; 8] = [ | |
|
||
/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits. | ||
/// Each word is thought of as an array of bits; each bit is either "set" or "not set". | ||
type Block = [u32; 8]; | ||
|
||
/// takes as its argument a single unsigned 32-bit integer and returns a block in which each | ||
/// word has exactly one bit set. | ||
fn mask(x: u32) -> Block { | ||
let mut result = [0_u32; 8]; | ||
for i in 0..8 { | ||
// wrapping instead of checking for overflow | ||
let y = x.wrapping_mul(SALT[i]); | ||
let y = y >> 27; | ||
result[i] = 1 << y; | ||
#[derive(Debug, Copy, Clone)] | ||
struct Block([u32; 8]); | ||
impl Block { | ||
const ZERO: Block = Block([0; 8]); | ||
|
||
/// takes as its argument a single unsigned 32-bit integer and returns a block in which each | ||
/// word has exactly one bit set. | ||
fn mask(x: u32) -> Self { | ||
let mut result = [0_u32; 8]; | ||
for i in 0..8 { | ||
// wrapping instead of checking for overflow | ||
let y = x.wrapping_mul(SALT[i]); | ||
let y = y >> 27; | ||
result[i] = 1 << y; | ||
} | ||
Self(result) | ||
} | ||
|
||
#[inline] | ||
#[cfg(target_endian = "little")] | ||
fn to_le_bytes(self) -> [u8; 32] { | ||
self.to_ne_bytes() | ||
} | ||
|
||
#[inline] | ||
#[cfg(not(target_endian = "little"))] | ||
fn to_le_bytes(self) -> [u8; 32] { | ||
self.swap_bytes().to_ne_bytes() | ||
} | ||
|
||
#[inline] | ||
fn to_ne_bytes(self) -> [u8; 32] { | ||
unsafe { std::mem::transmute(self) } | ||
} | ||
|
||
#[inline] | ||
#[cfg(not(target_endian = "little"))] | ||
fn swap_bytes(mut self) -> Self { | ||
self.0.iter_mut().for_each(|x| *x = x.swap_bytes()); | ||
self | ||
} | ||
|
||
/// setting every bit in the block that was also set in the result from mask | ||
fn insert(&mut self, hash: u32) { | ||
let mask = Self::mask(hash); | ||
for i in 0..8 { | ||
self[i] |= mask[i]; | ||
} | ||
} | ||
|
||
/// returns true when every bit that is set in the result of mask is also set in the block. | ||
fn check(&self, hash: u32) -> bool { | ||
let mask = Self::mask(hash); | ||
for i in 0..8 { | ||
if self[i] & mask[i] == 0 { | ||
return false; | ||
} | ||
} | ||
true | ||
} | ||
result | ||
} | ||
|
||
/// setting every bit in the block that was also set in the result from mask | ||
fn block_insert(block: &mut Block, hash: u32) { | ||
let mask = mask(hash); | ||
for i in 0..8 { | ||
block[i] |= mask[i]; | ||
impl std::ops::Index<usize> for Block { | ||
type Output = u32; | ||
|
||
#[inline] | ||
fn index(&self, index: usize) -> &Self::Output { | ||
self.0.index(index) | ||
} | ||
} | ||
|
||
/// returns true when every bit that is set in the result of mask is also set in the block. | ||
fn block_check(block: &Block, hash: u32) -> bool { | ||
let mask = mask(hash); | ||
for i in 0..8 { | ||
if block[i] & mask[i] == 0 { | ||
return false; | ||
} | ||
impl std::ops::IndexMut<usize> for Block { | ||
#[inline] | ||
fn index_mut(&mut self, index: usize) -> &mut Self::Output { | ||
self.0.index_mut(index) | ||
} | ||
true | ||
} | ||
|
||
/// A split block Bloom filter. The creation of this structure is based on the | ||
|
@@ -166,7 +210,7 @@ impl Sbbf { | |
let data = bitset | ||
.chunks_exact(4 * 8) | ||
.map(|chunk| { | ||
let mut block = [0_u32; 8]; | ||
let mut block = Block::ZERO; | ||
for (i, word) in chunk.chunks_exact(4).enumerate() { | ||
block[i] = u32::from_le_bytes(word.try_into().unwrap()); | ||
} | ||
|
@@ -194,14 +238,14 @@ impl Sbbf { | |
/// Write the bitset in serialized form to the writer. | ||
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> { | ||
for block in &self.0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also experimented with transmuting the entire There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it is long slice for all blocks instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found this only yielded a very slight speed boost of ~2% |
||
for word in block { | ||
writer.write_all(&word.to_le_bytes()).map_err(|e| { | ||
writer | ||
.write_all(block.to_le_bytes().as_slice()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I think this doesn't write the bytes in same format as before. I guess the reason to write each word in a block sequentially is to follow the spec? The written bloom filter must be sure to be read and understand by other Parquet libraries. Otherwise I have thought to write all words in a bulk like that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should write in the same order as the spec? It writes the integers first to last, with each 32-bit integer written little endian? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, right, I have thought it wrongly on the order of sequence... 😅 |
||
.map_err(|e| { | ||
ParquetError::General(format!( | ||
"Could not write bloom filter bit set: {}", | ||
e | ||
)) | ||
})?; | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
@@ -271,8 +315,7 @@ impl Sbbf { | |
/// Insert a hash into the filter | ||
fn insert_hash(&mut self, hash: u64) { | ||
let block_index = self.hash_to_block_index(hash); | ||
let block = &mut self.0[block_index]; | ||
block_insert(block, hash as u32); | ||
self.0[block_index].insert(hash as u32) | ||
} | ||
|
||
/// Check if an [AsBytes] value is probably present or definitely absent in the filter | ||
|
@@ -285,8 +328,7 @@ impl Sbbf { | |
/// but will always return false if a hash has not been inserted. | ||
fn check_hash(&self, hash: u64) -> bool { | ||
let block_index = self.hash_to_block_index(hash); | ||
let block = &self.0[block_index]; | ||
block_check(block, hash as u32) | ||
self.0[block_index].check(hash as u32) | ||
} | ||
} | ||
|
||
|
@@ -316,23 +358,23 @@ mod tests { | |
#[test] | ||
fn test_mask_set_quick_check() { | ||
for i in 0..1_000_000 { | ||
let result = mask(i); | ||
assert!(result.iter().all(|&x| x.count_ones() == 1)); | ||
let result = Block::mask(i); | ||
assert!(result.0.iter().all(|&x| x.count_ones() == 1)); | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_block_insert_and_check() { | ||
for i in 0..1_000_000 { | ||
let mut block = [0_u32; 8]; | ||
block_insert(&mut block, i); | ||
assert!(block_check(&block, i)); | ||
let mut block = Block::ZERO; | ||
block.insert(i); | ||
assert!(block.check(i)); | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_sbbf_insert_and_check() { | ||
let mut sbbf = Sbbf(vec![[0_u32; 8]; 1_000]); | ||
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]); | ||
for i in 0..1_000_000 { | ||
sbbf.insert(&i); | ||
assert!(sbbf.check(&i)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we used
Copy
andClone
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we use Copy in methods that return
Block