Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
Merge aa52d6f into d9c262a
Browse files Browse the repository at this point in the history
  • Loading branch information
Chen, Junjie committed Jun 28, 2018
2 parents d9c262a + aa52d6f commit c155cf7
Show file tree
Hide file tree
Showing 9 changed files with 1,098 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -674,13 +674,15 @@ set(LIBPARQUET_SRCS
src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
src/parquet/bloom.cc
src/parquet/column_reader.cc
src/parquet/column_scanner.cc
src/parquet/column_writer.cc
src/parquet/exception.cc
src/parquet/file_reader.cc
src/parquet/file_writer.cc
src/parquet/metadata.cc
src/parquet/murmur3.cc
src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
src/parquet/printer.cc
Expand Down
Binary file added data/bloom_filter.bin
Binary file not shown.
4 changes: 4 additions & 0 deletions src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# Headers: top level
install(FILES
bloom.h
column_reader.h
column_page.h
column_scanner.h
Expand All @@ -25,7 +26,9 @@ install(FILES
exception.h
file_reader.h
file_writer.h
hasher.h
metadata.h
murmur3.h
printer.h
properties.h
schema.h
Expand All @@ -50,6 +53,7 @@ install(FILES
"${CMAKE_CURRENT_BINARY_DIR}/parquet.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")

ADD_PARQUET_TEST(bloom-test)
ADD_PARQUET_TEST(column_reader-test)
ADD_PARQUET_TEST(column_scanner-test)
ADD_PARQUET_TEST(column_writer-test)
Expand Down
187 changes: 187 additions & 0 deletions src/parquet/bloom-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <algorithm>
#include <random>
#include <string>

#include "arrow/io/file.h"
#include "parquet/bloom.h"
#include "parquet/murmur3.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"

namespace parquet {
namespace test {
TEST(Murmur3Test, TestBloomFilter) {
int64_t result;
const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
MurmurHash3 murmur3;
murmur3.Hash_x64_128(bitset, 8, MurmurHash3::DEFAULT_SEED, &result);
EXPECT_EQ(result, -3850979349427597861l);
}

TEST(FindTest, TestBloomFilter) {
Bloom bloom(1024);

for (int i = 0; i < 10; i++) {
uint64_t hash_value = bloom.Hash(i);
bloom.InsertHash(hash_value);
}

// Serialize bloom filter to memory output stream
InMemoryOutputStream sink;
bloom.WriteTo(&sink);

// Deserialize bloom filter from memory
InMemoryInputStream source(sink.GetBuffer());
int64_t bytes_available;
uint32_t length =
*(reinterpret_cast<const uint32_t*>(source.Read(4, &bytes_available)));
ASSERT_EQ(static_cast<uint32_t>(bytes_available), 4);
EXPECT_EQ(length, 1024);

uint32_t hash = *(reinterpret_cast<const uint32_t*>(source.Read(4, &bytes_available)));
ASSERT_EQ(static_cast<uint32_t>(bytes_available), 4);
EXPECT_EQ(hash, 0);

uint32_t algo = *(reinterpret_cast<const uint32_t*>(source.Read(4, &bytes_available)));
ASSERT_EQ(static_cast<uint32_t>(bytes_available), 4);
EXPECT_EQ(algo, 0);

const uint8_t* bitset = source.Read(length, &bytes_available);
ASSERT_EQ(static_cast<uint32_t>(bytes_available), length);
Bloom de_bloom(bitset, length);
for (int i = 0; i < 10; i++) {
EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
}
}

TEST(FPPTest, TestBloomFilter) {
// The exist is a counter for Bloom filter check in this test, if Bloom filter
// FindHash return true, it is increased by one.
int exist = 0;

// Total count of elements that will be used
int total_count = 100000;

// Bloom filter fpp parameter
double fpp = 0.01;

// Character set used to generate random string
std::string charset("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz");
std::vector<std::string> members;
Bloom bloom(Bloom::OptimalNumOfBits(total_count, fpp));

// The uuid_seed is generated by "uuidgen -r"
std::string uuid_seed = "8de406aa-fb59-4195-a81c-5152af26433f";
std::seed_seq seed(uuid_seed.begin(), uuid_seed.end());
std::mt19937 generator(seed);

// Insert 100000 elements to bloom filter and serialize to memory
for (int i = 0; i < total_count; i++) {
std::shuffle(charset.begin(), charset.end(), generator);
std::string tmp = charset.substr(0, 10);
ByteArray byte_array(10, reinterpret_cast<const uint8_t*>(tmp.c_str()));
members.push_back(tmp);
bloom.InsertHash(bloom.Hash<const ByteArray*>(&byte_array));
}

InMemoryOutputStream sink;
bloom.WriteTo(&sink);

// Deserialize bloom filter from memory
InMemoryInputStream source(sink.GetBuffer());
int64_t bytes_available;
uint32_t length =
*(reinterpret_cast<const uint32_t*>(source.Read(4, &bytes_available)));
ASSERT_EQ(static_cast<uint32_t>(bytes_available), 4);

uint32_t hash = *(reinterpret_cast<const uint32_t*>(source.Read(4, &bytes_available)));
ASSERT_EQ(static_cast<uint32_t>(bytes_available), 4);
EXPECT_EQ(hash, 0);

uint32_t algo = *(reinterpret_cast<const uint32_t*>(source.Read(4, &bytes_available)));
ASSERT_EQ(static_cast<uint32_t>(bytes_available), 4);
EXPECT_EQ(algo, 0);

const uint8_t* bitset = source.Read(length, &bytes_available);
ASSERT_EQ(static_cast<uint32_t>(bytes_available), length);

Bloom de_bloom(bitset, length);
for (int i = 0; i < total_count; i++) {
ByteArray byte_array1(10, reinterpret_cast<const uint8_t*>(members[i].c_str()));
ASSERT_TRUE(de_bloom.FindHash(de_bloom.Hash<const ByteArray*>(&byte_array1)));
std::shuffle(charset.begin(), charset.end(), generator);
std::string tmp = charset.substr(0, 8);
ByteArray byte_array2(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));

if (de_bloom.FindHash(de_bloom.Hash<const ByteArray*>(&byte_array2))) {
exist++;
}
}

// The exist should be probably less than 1000 according default FPP 0.01.
EXPECT_TRUE(exist < total_count * fpp);
}

// The CompatibilityTest is use to test cross compatibility with parquet-mr, it read
// the Bloom filter binary generated by Bloom filter class in parquet-mr project and test
// whether the values inserted before could be filtered or not.

// The Bloom filter binary is generated by three steps in from Parquet-mr.
// Step 1: Construct a Bloom filter with 1024 entries.
// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter.
// Step 3: Call writeTo API to write to File.

TEST(CompatibilityTest, TestBloomFilter) {
int64_t bytesRead;
int length, hash, algorithm;
std::unique_ptr<uint8_t[]> bitset;

std::string testString[4] = {"hello", "parquet", "bloom", "filter"};
std::string data_dir(test::get_data_dir());
std::string bloomDataPath = data_dir + "/bloom_filter.bin";
std::shared_ptr<::arrow::io::ReadableFile> handle;

PARQUET_THROW_NOT_OK(::arrow::io::ReadableFile::Open(bloomDataPath, &handle));

handle->Read(4, &bytesRead, reinterpret_cast<void*>(&length));
ASSERT_EQ(bytesRead, 4);
handle->Read(4, &bytesRead, reinterpret_cast<void*>(&hash));
ASSERT_EQ(hash, 0);
handle->Read(4, &bytesRead, reinterpret_cast<void*>(&algorithm));
ASSERT_EQ(algorithm, 0);

bitset.reset(new uint8_t[length]);
handle->Read(length, &bytesRead, reinterpret_cast<void*>(bitset.get()));
ASSERT_TRUE(length == bytesRead);

Bloom bloom_filter(bitset.get(), length);

for (int i = 0; i < 4; i++) {
ByteArray tmp(static_cast<uint32_t>(testString[i].length()),
reinterpret_cast<const uint8_t*>(testString[i].c_str()));
EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash<const ByteArray*>(&tmp)));
}
}

} // namespace test

} // namespace parquet
158 changes: 158 additions & 0 deletions src/parquet/bloom.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "parquet/bloom.h"

#include <algorithm>
#include <cmath>
#include <cstdint>

#include "arrow/util/bit-util.h"

#include "parquet/exception.h"
#include "parquet/murmur3.h"
#include "parquet/util/logging.h"

namespace parquet {
constexpr uint32_t Bloom::SALT[8];

Bloom::Bloom(uint32_t num_bytes)
: num_bytes_(num_bytes),
hash_strategy_(HashStrategy::MURMUR3_X64_128),
algorithm_(Algorithm::BLOCK) {
InitBitset(num_bytes);

switch (hash_strategy_) {
case HashStrategy::MURMUR3_X64_128:
this->hasher_.reset(new MurmurHash3());
break;
default:
throw parquet::ParquetException("Unknown hash strategy.");
}
}

void Bloom::InitBitset(uint32_t num_bytes) {
if (num_bytes < BYTES_PER_FILTER_BLOCK) {
num_bytes = BYTES_PER_FILTER_BLOCK;
}

if (num_bytes > DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES) {
num_bytes = DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES;
}

// Get next power of 2 if it is not power of 2.
if ((num_bytes & (num_bytes - 1)) != 0) {
num_bytes = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bytes));
}

this->bitset_.reset(new uint32_t[num_bytes / 4]);
memset(this->bitset_.get(), 0, num_bytes);
}

Bloom::Bloom(const uint8_t* bitset, uint32_t num_bytes)
: num_bytes_(num_bytes),
hash_strategy_(HashStrategy::MURMUR3_X64_128),
algorithm_(Algorithm::BLOCK) {
this->bitset_.reset(new uint32_t[num_bytes / 4]);
memcpy(this->bitset_.get(), bitset, num_bytes);
switch (hash_strategy_) {
case HashStrategy::MURMUR3_X64_128:
this->hasher_.reset(new MurmurHash3());
break;
default:
throw parquet::ParquetException("Not supported hash strategy");
}
}

void Bloom::SetMask(uint32_t key, uint32_t mask[8]) {
for (int i = 0; i < 8; ++i) {
mask[i] = key * SALT[i];
}

for (int i = 0; i < 8; ++i) {
mask[i] = mask[i] >> 27;
}

for (int i = 0; i < 8; ++i) {
mask[i] = 0x1U << mask[i];
}
}

uint32_t Bloom::OptimalNumOfBits(uint32_t ndv, double fpp) {
DCHECK(fpp > 0.0 && fpp < 1.0);
const double M = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
const double MAX = Bloom::DEFAULT_MAXIMUM_BLOOM_FILTER_BYTES << 3;
int num_bits = static_cast<uint32_t>(M);

// Handle overflow.
if (M > MAX || M < 0) {
num_bits = static_cast<uint32_t>(MAX);
}

// Get next power of 2 if bits is not power of 2.
if ((num_bits & (num_bits - 1)) != 0) {
num_bits = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bits));
}

// Minimum
if (num_bits < (Bloom::BYTES_PER_FILTER_BLOCK << 3)) {
num_bits = Bloom::BYTES_PER_FILTER_BLOCK << 3;
}

return num_bits;
}

void Bloom::InsertHash(uint64_t hash) {
uint32_t* const bitset32 = bitset_.get();
const uint32_t bucketIndex =
static_cast<uint32_t>(hash >> 32) & (num_bytes_ / BYTES_PER_FILTER_BLOCK - 1);
uint32_t key = static_cast<uint32_t>(hash);

// Calculate mask for bucket.
uint32_t mask[8];
SetMask(key, mask);

for (int i = 0; i < 8; i++) {
bitset32[bucketIndex * 8 + i] |= mask[i];
}
}

bool Bloom::FindHash(uint64_t hash) {
uint32_t* const bitset32 = bitset_.get();
const uint32_t bucketIndex =
static_cast<uint32_t>((hash >> 32) & (num_bytes_ / BYTES_PER_FILTER_BLOCK - 1));
uint32_t key = static_cast<uint32_t>(hash);

// Calculate mask for bucket.
uint32_t mask[8];
SetMask(key, mask);

for (int i = 0; i < 8; ++i) {
if (0 == (bitset32[8 * bucketIndex + i] & mask[i])) {
return false;
}
}
return true;
}

void Bloom::WriteTo(OutputStream* sink) {
sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(uint32_t));
sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_), sizeof(uint32_t));
sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(uint32_t));
sink->Write(reinterpret_cast<const uint8_t*>(bitset_.get()), num_bytes_);
}

} // namespace parquet

0 comments on commit c155cf7

Please sign in to comment.