GH-35729: [C++][Parquet] Implement batch interface for BloomFilter in Parquet (#35731)

### Rationale for this change

To optimize #35691, we need batch execution for the BloomFilter.

### What changes are included in this PR?

* Add batch `InsertHashes` and `Hashes` methods to `BloomFilter` (see the usage sketch below)
* Make `BloomFilter::FindHash` faster by inlining the bitmask computation inside the loop

* Closes: #35729
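
A minimal sketch of how the new batch calls fit together is shown below. The batch `Hashes`/`InsertHashes` calls and the single-value `Hash`/`FindHash` methods are the interfaces touched by this PR; the `Init` and `OptimalNumOfBits` sizing helpers, the 1% false-positive rate, and the input data are illustrative assumptions about the surrounding API, not part of this diff.

```cpp
#include <cstdint>
#include <vector>

#include "parquet/bloom_filter.h"

// Sketch: batch-hash a column of int64 values and insert all hashes at once.
void BuildFilter(const std::vector<int64_t>& values) {
  if (values.empty()) return;

  parquet::BlockSplitBloomFilter filter;
  // Assumed setup: size the bitset for the number of distinct values at ~1% FPP.
  filter.Init(parquet::BlockSplitBloomFilter::OptimalNumOfBits(
                  static_cast<uint32_t>(values.size()), /*fpp=*/0.01) /
              8);

  // New in this PR: compute all hashes in one call, then insert them in one call.
  std::vector<uint64_t> hashes(values.size());
  filter.Hashes(values.data(), static_cast<int>(values.size()), hashes.data());
  filter.InsertHashes(hashes.data(), static_cast<int>(values.size()));

  // Point lookups still go through FindHash on a single hash value.
  bool maybe_present = filter.FindHash(filter.Hash(values.front()));
  (void)maybe_present;
}
```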

Lead-authored-by: mwish <maplewish117@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: mwish <1506118561@qq.com>
Co-authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
4 people committed May 30, 2023
1 parent 9dc63c2 commit 2695293
Showing 13 changed files with 624 additions and 91 deletions.
cpp/src/arrow/util/bitmap_writer.h (2 changes: 1 addition & 1 deletion)
@@ -124,7 +124,7 @@ class FirstTimeBitmapWriter {
// unset so no additional accounting is needed for when number_of_bits <
// bits_to_carry.
current_byte_ |= (word & bit_util::kPrecedingBitmask[bits_to_carry]) << bit_offset;
// Check if everything is transfered into current_byte_.
// Check if everything is transferred into current_byte_.
if (ARROW_PREDICT_FALSE(number_of_bits < bits_to_carry)) {
return;
}
cpp/src/parquet/CMakeLists.txt (1 change: 1 addition & 0 deletions)
@@ -401,6 +401,7 @@ endif()
add_parquet_test(file_deserialize_test SOURCES file_deserialize_test.cc test_util.cc)
add_parquet_test(schema_test)

add_parquet_benchmark(bloom_filter_benchmark)
add_parquet_benchmark(column_reader_benchmark)
add_parquet_benchmark(column_io_benchmark)
add_parquet_benchmark(encoding_benchmark)
cpp/src/parquet/bloom_filter.cc (44 changes: 19 additions & 25 deletions)
@@ -21,7 +21,10 @@

#include "arrow/result.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"

#include "generated/parquet_types.h"

#include "parquet/bloom_filter.h"
#include "parquet/exception.h"
#include "parquet/thrift_internal.h"
@@ -180,50 +183,41 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}

void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
for (int i = 0; i < kBitsSetPerBlock; ++i) {
block_mask.item[i] = key * SALT[i];
}

for (int i = 0; i < kBitsSetPerBlock; ++i) {
block_mask.item[i] = block_mask.item[i] >> 27;
}

for (int i = 0; i < kBitsSetPerBlock; ++i) {
block_mask.item[i] = UINT32_C(0x1) << block_mask.item[i];
}
}

bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
const uint32_t bucket_index =
static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
const uint32_t key = static_cast<uint32_t>(hash);
const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());

// Calculate mask for bucket.
BlockMask block_mask;
SetMask(key, block_mask);

for (int i = 0; i < kBitsSetPerBlock; ++i) {
if (0 == (bitset32[kBitsSetPerBlock * bucket_index + i] & block_mask.item[i])) {
// Calculate mask for key in the given bitset.
const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
if (ARROW_PREDICT_FALSE(0 ==
(bitset32[kBitsSetPerBlock * bucket_index + i] & mask))) {
return false;
}
}
return true;
}

void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) {
const uint32_t bucket_index =
static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
const uint32_t key = static_cast<uint32_t>(hash);
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());

// Calculate mask for bucket.
BlockMask block_mask;
SetMask(key, block_mask);

for (int i = 0; i < kBitsSetPerBlock; i++) {
bitset32[bucket_index * kBitsSetPerBlock + i] |= block_mask.item[i];
// Calculate mask for key in the given bitset.
const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
bitset32[bucket_index * kBitsSetPerBlock + i] |= mask;
}
}

void BlockSplitBloomFilter::InsertHash(uint64_t hash) { InsertHashImpl(hash); }

void BlockSplitBloomFilter::InsertHashes(const uint64_t* hashes, int num_values) {
for (int i = 0; i < num_values; ++i) {
InsertHashImpl(hashes[i]);
}
}

cpp/src/parquet/bloom_filter.h (104 changes: 97 additions & 7 deletions)
@@ -49,6 +49,11 @@ class PARQUET_EXPORT BloomFilter {
/// @param hash the hash of value to insert into Bloom filter.
virtual void InsertHash(uint64_t hash) = 0;

/// Insert elements into the set represented by the Bloom filter bitset.
/// @param hashes the hash values to insert into Bloom filter.
/// @param num_values the number of hash values to insert.
virtual void InsertHashes(const uint64_t* hashes, int num_values) = 0;

/// Write this Bloom filter to an output stream. A Bloom filter structure should
/// include bitset length, hash strategy, algorithm, and bitset.
///
@@ -101,7 +106,66 @@ class PARQUET_EXPORT BloomFilter {
/// @return hash result.
virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;

virtual ~BloomFilter() {}
/// Batch compute hashes for 32-bit values by using their plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values; its length should be equal to
/// num_values.
virtual void Hashes(const int32_t* values, int num_values, uint64_t* hashes) const = 0;

/// Batch compute hashes for 64-bit values by using their plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values; its length should be equal to
/// num_values.
virtual void Hashes(const int64_t* values, int num_values, uint64_t* hashes) const = 0;

/// Batch compute hashes for float values by using their plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values; its length should be equal to
/// num_values.
virtual void Hashes(const float* values, int num_values, uint64_t* hashes) const = 0;

/// Batch compute hashes for double values by using their plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values; its length should be equal to
/// num_values.
virtual void Hashes(const double* values, int num_values, uint64_t* hashes) const = 0;

/// Batch compute hashes for Int96 values by using their plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values; its length should be equal to
/// num_values.
virtual void Hashes(const Int96* values, int num_values, uint64_t* hashes) const = 0;

/// Batch compute hashes for ByteArray values by using their plain encoding result.
///
/// @param values a pointer to the values to hash.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values; its length should be equal to
/// num_values.
virtual void Hashes(const ByteArray* values, int num_values,
uint64_t* hashes) const = 0;

/// Batch compute hashes for fixed-length byte array values by using their plain
/// encoding result.
///
/// @param values a pointer to the values to hash.
/// @param type_len the value length.
/// @param num_values the number of values to hash.
/// @param hashes a pointer to the output hash values; its length should be equal to
/// num_values.
virtual void Hashes(const FLBA* values, uint32_t type_len, int num_values,
uint64_t* hashes) const = 0;

virtual ~BloomFilter() = default;

protected:
// Hash strategy available for Bloom filter.
@@ -200,19 +264,48 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {

bool FindHash(uint64_t hash) const override;
void InsertHash(uint64_t hash) override;
void InsertHashes(const uint64_t* hashes, int num_values) override;
void WriteTo(ArrowOutputStream* sink) const override;
uint32_t GetBitsetSize() const override { return num_bytes_; }

uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
uint64_t Hash(int64_t value) const override { return hasher_->Hash(value); }
uint64_t Hash(float value) const override { return hasher_->Hash(value); }
uint64_t Hash(double value) const override { return hasher_->Hash(value); }
uint64_t Hash(const Int96* value) const override { return hasher_->Hash(value); }
uint64_t Hash(const ByteArray* value) const override { return hasher_->Hash(value); }
uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
uint64_t Hash(const FLBA* value, uint32_t len) const override {
return hasher_->Hash(value, len);
}

void Hashes(const int32_t* values, int num_values, uint64_t* hashes) const override {
hasher_->Hashes(values, num_values, hashes);
}
void Hashes(const int64_t* values, int num_values, uint64_t* hashes) const override {
hasher_->Hashes(values, num_values, hashes);
}
void Hashes(const float* values, int num_values, uint64_t* hashes) const override {
hasher_->Hashes(values, num_values, hashes);
}
void Hashes(const double* values, int num_values, uint64_t* hashes) const override {
hasher_->Hashes(values, num_values, hashes);
}
void Hashes(const Int96* values, int num_values, uint64_t* hashes) const override {
hasher_->Hashes(values, num_values, hashes);
}
void Hashes(const ByteArray* values, int num_values, uint64_t* hashes) const override {
hasher_->Hashes(values, num_values, hashes);
}
void Hashes(const FLBA* values, uint32_t type_len, int num_values,
uint64_t* hashes) const override {
hasher_->Hashes(values, type_len, num_values, hashes);
}

uint64_t Hash(const int32_t* value) const { return hasher_->Hash(*value); }
uint64_t Hash(const int64_t* value) const { return hasher_->Hash(*value); }
uint64_t Hash(const float* value) const { return hasher_->Hash(*value); }
uint64_t Hash(const double* value) const { return hasher_->Hash(*value); }

/// Deserialize the Bloom filter from an input stream. It is used when reconstructing
/// a Bloom filter from a parquet filter.
///
@@ -223,6 +316,8 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
ArrowInputStream* input_stream);

private:
inline void InsertHashImpl(uint64_t hash);

// Bytes in a tiny Bloom filter block.
static constexpr int kBytesPerFilterBlock = 32;

@@ -240,11 +335,6 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};

/// Set bits in mask array according to input key.
/// @param key the value to calculate mask values.
/// @param mask the mask array is used to set inside a block
void SetMask(uint32_t key, BlockMask& mask) const;

// Memory pool to allocate aligned buffer for bitset
::arrow::MemoryPool* pool_;

