Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster Parquet Bloom Writer #3333

Merged
merged 1 commit into from
Dec 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 81 additions & 39 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,82 @@ const SALT: [u32; 8] = [

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
type Block = [u32; 8];

/// takes as its argument a single unsigned 32-bit integer and returns a block in which each
/// word has exactly one bit set.
fn mask(x: u32) -> Block {
let mut result = [0_u32; 8];
for i in 0..8 {
// wrapping instead of checking for overflow
let y = x.wrapping_mul(SALT[i]);
let y = y >> 27;
result[i] = 1 << y;
#[derive(Debug, Copy, Clone)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we used Copy and Clone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we use Copy in methods that return Block

struct Block([u32; 8]);
impl Block {
const ZERO: Block = Block([0; 8]);

/// takes as its argument a single unsigned 32-bit integer and returns a block in which each
/// word has exactly one bit set.
fn mask(x: u32) -> Self {
let mut result = [0_u32; 8];
for i in 0..8 {
// wrapping instead of checking for overflow
let y = x.wrapping_mul(SALT[i]);
let y = y >> 27;
result[i] = 1 << y;
}
Self(result)
}

#[inline]
#[cfg(target_endian = "little")]
fn to_le_bytes(self) -> [u8; 32] {
self.to_ne_bytes()
}

#[inline]
#[cfg(not(target_endian = "little"))]
fn to_le_bytes(self) -> [u8; 32] {
self.swap_bytes().to_ne_bytes()
}

#[inline]
fn to_ne_bytes(self) -> [u8; 32] {
unsafe { std::mem::transmute(self) }
}

#[inline]
#[cfg(not(target_endian = "little"))]
fn swap_bytes(mut self) -> Self {
self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
self
}

/// setting every bit in the block that was also set in the result from mask
fn insert(&mut self, hash: u32) {
let mask = Self::mask(hash);
for i in 0..8 {
self[i] |= mask[i];
}
}

/// returns true when every bit that is set in the result of mask is also set in the block.
fn check(&self, hash: u32) -> bool {
let mask = Self::mask(hash);
for i in 0..8 {
if self[i] & mask[i] == 0 {
return false;
}
}
true
}
result
}

/// setting every bit in the block that was also set in the result from mask
fn block_insert(block: &mut Block, hash: u32) {
let mask = mask(hash);
for i in 0..8 {
block[i] |= mask[i];
impl std::ops::Index<usize> for Block {
type Output = u32;

#[inline]
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)
}
}

/// returns true when every bit that is set in the result of mask is also set in the block.
fn block_check(block: &Block, hash: u32) -> bool {
let mask = mask(hash);
for i in 0..8 {
if block[i] & mask[i] == 0 {
return false;
}
impl std::ops::IndexMut<usize> for Block {
#[inline]
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
self.0.index_mut(index)
}
true
}

/// A split block Bloom filter. The creation of this structure is based on the
Expand Down Expand Up @@ -166,7 +210,7 @@ impl Sbbf {
let data = bitset
.chunks_exact(4 * 8)
.map(|chunk| {
let mut block = [0_u32; 8];
let mut block = Block::ZERO;
for (i, word) in chunk.chunks_exact(4).enumerate() {
block[i] = u32::from_le_bytes(word.try_into().unwrap());
}
Expand Down Expand Up @@ -194,14 +238,14 @@ impl Sbbf {
/// Write the bitset in serialized form to the writer.
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
for block in &self.0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also experimented with transmuting the entire Vec<Block> this did lead to a very slight additional speed boost of 2%, but I didn't think it justified its unsafe-ness

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is long slice for all blocks instead of Vec<Block>? Do you think it would be better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this only yielded a very slight speed boost of ~2%

for word in block {
writer.write_all(&word.to_le_bytes()).map_err(|e| {
writer
.write_all(block.to_le_bytes().as_slice())
Copy link
Member

@viirya viirya Dec 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think this doesn't write the bytes in same format as before. I guess the reason to write each word in a block sequentially is to follow the spec? The written bloom filter must be sure to be read and understand by other Parquet libraries. Otherwise I have thought to write all words in a bulk like that.

Copy link
Contributor Author

@tustvold tustvold Dec 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should write in the same order as the spec? It writes the integers first to last, with each 32-bit integer written little endian?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right, I have thought it wrongly on the order of sequence... 😅

.map_err(|e| {
ParquetError::General(format!(
"Could not write bloom filter bit set: {}",
e
))
})?;
}
}
Ok(())
}
Expand Down Expand Up @@ -271,8 +315,7 @@ impl Sbbf {
/// Insert a hash into the filter
fn insert_hash(&mut self, hash: u64) {
let block_index = self.hash_to_block_index(hash);
let block = &mut self.0[block_index];
block_insert(block, hash as u32);
self.0[block_index].insert(hash as u32)
}

/// Check if an [AsBytes] value is probably present or definitely absent in the filter
Expand All @@ -285,8 +328,7 @@ impl Sbbf {
/// but will always return false if a hash has not been inserted.
fn check_hash(&self, hash: u64) -> bool {
let block_index = self.hash_to_block_index(hash);
let block = &self.0[block_index];
block_check(block, hash as u32)
self.0[block_index].check(hash as u32)
}
}

Expand Down Expand Up @@ -316,23 +358,23 @@ mod tests {
#[test]
fn test_mask_set_quick_check() {
for i in 0..1_000_000 {
let result = mask(i);
assert!(result.iter().all(|&x| x.count_ones() == 1));
let result = Block::mask(i);
assert!(result.0.iter().all(|&x| x.count_ones() == 1));
}
}

#[test]
fn test_block_insert_and_check() {
for i in 0..1_000_000 {
let mut block = [0_u32; 8];
block_insert(&mut block, i);
assert!(block_check(&block, i));
let mut block = Block::ZERO;
block.insert(i);
assert!(block.check(i));
}
}

#[test]
fn test_sbbf_insert_and_check() {
let mut sbbf = Sbbf(vec![[0_u32; 8]; 1_000]);
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
for i in 0..1_000_000 {
sbbf.insert(&i);
assert!(sbbf.check(&i));
Expand Down