Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
Merge fd3ba23 into 049af6f
Browse files Browse the repository at this point in the history
  • Loading branch information
Chen, Junjie committed Jul 23, 2018
2 parents 049af6f + fd3ba23 commit 54d49a1
Show file tree
Hide file tree
Showing 12 changed files with 1,142 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -674,13 +674,16 @@ set(LIBPARQUET_SRCS
src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
src/parquet/bloom_filter.cc
src/parquet/bloom_filter_algorithm.cc
src/parquet/column_reader.cc
src/parquet/column_scanner.cc
src/parquet/column_writer.cc
src/parquet/exception.cc
src/parquet/file_reader.cc
src/parquet/file_writer.cc
src/parquet/metadata.cc
src/parquet/murmur3.cc
src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
src/parquet/printer.cc
Expand Down
Binary file added data/bloom_filter.bin
Binary file not shown.
5 changes: 5 additions & 0 deletions src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

# Headers: top level
install(FILES
bloom_filter.h
bloom_filter_algorithm.h
column_reader.h
column_page.h
column_scanner.h
Expand All @@ -25,7 +27,9 @@ install(FILES
exception.h
file_reader.h
file_writer.h
hasher.h
metadata.h
murmur3.h
printer.h
properties.h
schema.h
Expand All @@ -50,6 +54,7 @@ install(FILES
"${CMAKE_CURRENT_BINARY_DIR}/parquet.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")

ADD_PARQUET_TEST(bloom_filter-test)
ADD_PARQUET_TEST(column_reader-test)
ADD_PARQUET_TEST(column_scanner-test)
ADD_PARQUET_TEST(column_writer-test)
Expand Down
10 changes: 10 additions & 0 deletions src/parquet/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
The CompatibilityTest of bloom_filter-test.cc is used to test cross compatibility of
Bloom filters between parquet-mr and parquet-cpp. It reads the Bloom filter binary
generated by the Bloom filter class in the parquet-mr project and tests whether the
values inserted before could be filtered or not.

The Bloom filter binary is generated by three steps from Parquet-mr:
Step 1: Construct a Bloom filter with 1024 bytes of bitset.
Step 2: Insert hashes of "hello", "parquet", "bloom", "filter" strings to Bloom filter
by calling hash and insert APIs.
Step 3: Call writeTo API to write to File.
241 changes: 241 additions & 0 deletions src/parquet/bloom_filter-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <algorithm>
#include <random>
#include <string>

#include "arrow/io/file.h"
#include "parquet/bloom_filter.h"
#include "parquet/murmur3.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"

namespace parquet {
namespace test {
TEST(Murmur3Test, TestBloomFilter) {
uint64_t result;
const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
ByteArray byteArray(8, bitset);
MurmurHash3 murmur3;
result = murmur3.Hash(&byteArray);
EXPECT_EQ(result, UINT64_C(913737700387071329));
}

TEST(ConstructorTest, TestBloomFilter) {
EXPECT_NO_THROW(BloomFilter bloom_filter1(1000));

// It throws because the length does not match the bitset
std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
EXPECT_THROW(new BloomFilter(bitset1.get(), 0), ParquetException);

// It throws because the bitset is NULL
EXPECT_THROW(new BloomFilter(NULL, 1024), ParquetException);

std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]());

// It throws because the number of bytes of Bloom filter bitset must be a power of 2.
EXPECT_THROW(new BloomFilter(bitset2.get(), 1023), ParquetException);
}

// The BasicTest is used to test basic operations including InsertHash, FindHash and
// serializing and de-serializing.
TEST(BasicTest, TestBloomFilter) {
BloomFilter bloom_filter(1024);

for (int i = 0; i < 10; i++) {
uint64_t hash_value = bloom_filter.Hash(i);
bloom_filter.InsertHash(hash_value);
}

for (int i = 0; i < 10; i++) {
EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
}

// Serialize Bloom filter to memory output stream
InMemoryOutputStream sink;
bloom_filter.WriteTo(&sink);

// Deserialize Bloom filter from memory
InMemoryInputStream source(sink.GetBuffer());

BloomFilter* de_bloom = BloomFilter::Deserialize(&source);

for (int i = 0; i < 10; i++) {
EXPECT_TRUE(de_bloom->FindHash(de_bloom->Hash(i)));
}
delete de_bloom;
}

// Helper function to generate random string.
std::string GetRandomString(uint32_t length) {
// Character set used to generate random string
const std::string charset =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

// The uuid_seed was generated by "uuidgen -r"
const std::string uuid_seed = "8de406aa-fb59-4195-a81c-5152af26433f";
std::seed_seq seed(uuid_seed.begin(), uuid_seed.end());
std::mt19937 generator(seed);
std::uniform_int_distribution<uint32_t> dist(0, static_cast<int>(charset.size() - 1));
std::string ret = "";

for (uint32_t i = 0; i < length; i++) {
ret += charset[dist(generator)];
}

return ret;
}

TEST(FPPTest, TestBloomFilter) {
// It counts the number of times FindHash returns true.
int exist = 0;

// Total count of elements that will be used
const int total_count = 100000;

// Bloom filter fpp parameter
const double fpp = 0.01;

std::vector<std::string> members;
BloomFilter bloom_filter(BloomFilter::OptimalNumOfBits(total_count, fpp));

// Insert elements into the Bloom filter
for (int i = 0; i < total_count; i++) {
// Insert random string which length is 8
std::string tmp = GetRandomString(8);
const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));
members.push_back(tmp);
bloom_filter.InsertHash(bloom_filter.Hash(&byte_array));
}

for (int i = 0; i < total_count; i++) {
const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str()));
ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1)));
std::string tmp = GetRandomString(7);
const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str()));

if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) {
exist++;
}
}

// The exist should be probably less than 1000 according default FPP 0.01.
EXPECT_TRUE(exist < total_count * fpp);
}

// The CompatibilityTest is used to test cross compatibility with parquet-mr, it reads
// the Bloom filter binary generated by the Bloom filter class in the parquet-mr project
// and tests whether the values inserted before could be filtered or not.

// The Bloom filter binary is generated by three steps in from Parquet-mr.
// Step 1: Construct a Bloom filter with 1024 bytes bitset.
// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter.
// Step 3: Call writeTo API to write to File.
TEST(CompatibilityTest, TestBloomFilter) {
const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
const std::string bloom_filter_test_binary =
std::string(test::get_data_dir()) + "/bloom_filter.bin";
std::shared_ptr<::arrow::io::ReadableFile> handle;

int64_t size;
PARQUET_THROW_NOT_OK(
::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle));
PARQUET_THROW_NOT_OK(handle->GetSize(&size));

// 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
EXPECT_EQ(size, 1036);

std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
std::shared_ptr<Buffer> buffer(new Buffer(bitset.get(), size));
handle->Read(size, &buffer);

InMemoryInputStream source(buffer);
BloomFilter* bloom_filter1 = BloomFilter::Deserialize(&source);

for (int i = 0; i < 4; i++) {
const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
EXPECT_TRUE(bloom_filter1->FindHash(bloom_filter1->Hash(&tmp)));
}

// The following is used to check whether the new created Bloom filter in parquet-cpp is
// byte-for-byte identical to file at bloom_data_path which is created from parquet-mr
// with same inserted hashes.
BloomFilter bloom_filter2(bloom_filter1->GetBitsetSize());
for (int i = 0; i < 4; i++) {
const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array));
}

// Serialize Bloom filter to memory output stream
InMemoryOutputStream sink;
bloom_filter2.WriteTo(&sink);
std::shared_ptr<Buffer> buffer1 = sink.GetBuffer();

handle->Seek(0);
handle->GetSize(&size);
std::shared_ptr<Buffer> buffer2;
handle->Read(size, &buffer2);

EXPECT_TRUE((*buffer1).Equals(*buffer2));
delete bloom_filter1;
}

// OptmialValueTest is used to test whether OptimalNumOfBits returns expected
// numbers according to formula:
// num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
// where ndv is the number of distinct values and fpp is the false positive probability.
// Also it is used to test whether OptimalNumOfBits returns value between
// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
TEST(OptimalValueTest, TestBloomFilter) {
EXPECT_EQ(BloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768));

EXPECT_EQ(BloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384));

EXPECT_EQ(BloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384));

EXPECT_EQ(BloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384));

// Boundary check
EXPECT_EQ(BloomFilter::OptimalNumOfBits(4, 0.01) / 8, UINT32_C(32));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01) / 8,
UINT32_C(134217728));

EXPECT_EQ(BloomFilter::OptimalNumOfBits(4, 0.25) / 8, UINT32_C(32));
EXPECT_EQ(BloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25) / 8,
UINT32_C(134217728));
}

} // namespace test

} // namespace parquet

0 comments on commit 54d49a1

Please sign in to comment.