diff --git a/CMakeLists.txt b/CMakeLists.txt index ec7d66b2..62182c4b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -296,7 +296,6 @@ endif() # Library config set(LIBPARQUET_SRCS - src/parquet/column/serialized-page.cc src/parquet/column/reader.cc src/parquet/column/scanner.cc diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc index b9076bf5..ce165880 100644 --- a/example/decode_benchmark.cc +++ b/example/decode_benchmark.cc @@ -20,7 +20,11 @@ #include #include "parquet/compression/codec.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/encodings/delta-bit-pack-encoding.h" +#include "parquet/encodings/delta-byte-array-encoding.h" +#include "parquet/encodings/delta-length-byte-array-encoding.h" #include "parquet/util/stopwatch.h" using namespace parquet_cpp; diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index 6a47917e..97547ce1 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -22,4 +22,5 @@ install(FILES types.h DESTINATION include/parquet) +ADD_PARQUET_TEST(public-api-test) ADD_PARQUET_TEST(reader-test) diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt index 32ec11c3..99b4ed21 100644 --- a/src/parquet/column/CMakeLists.txt +++ b/src/parquet/column/CMakeLists.txt @@ -20,10 +20,8 @@ install(FILES page.h levels.h reader.h - serialized-page.h scanner.h DESTINATION include/parquet/column) ADD_PARQUET_TEST(column-reader-test) ADD_PARQUET_TEST(levels-test) -ADD_PARQUET_TEST(serialized-page-test) diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index 84a36dba..0abdf79c 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include #include #include -#include -#include +#include #include #include @@ -28,15 +28,13 @@ #include "parquet/column/page.h" #include "parquet/column/reader.h" #include "parquet/column/test-util.h" - -#include "parquet/util/output.h" +#include "parquet/schema/descriptor.h" +#include "parquet/schema/types.h" #include "parquet/util/test-common.h" using std::string; using std::vector; using std::shared_ptr; -using parquet::FieldRepetitionType; -using parquet::SchemaElement; namespace parquet_cpp { diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc index 99cc21ee..6061d235 100644 --- a/src/parquet/column/levels-test.cc +++ b/src/parquet/column/levels-test.cc @@ -15,98 +15,94 @@ // specific language governing permissions and limitations // under the License. -#include -#include -#include +#include #include +#include #include -#include "parquet/thrift/parquet_types.h" #include "parquet/column/levels.h" +#include "parquet/types.h" using std::string; namespace parquet_cpp { -class TestLevels : public ::testing::Test { - public: - int GenerateLevels(int min_repeat_factor, int max_repeat_factor, - int max_level, std::vector& input_levels) { - int total_count = 0; - // for each repetition count upto max_repeat_factor - for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { - // repeat count increase by a factor of 2 for every iteration - int repeat_count = (1 << repeat); - // generate levels for repetition count upto the maximum level - int value = 0; - int bwidth = 0; - while (value <= max_level) { - for (int i = 0; i < repeat_count; i++) { - input_levels[total_count++] = value; - } - value = (2 << bwidth) - 1; - bwidth++; +int GenerateLevels(int min_repeat_factor, int max_repeat_factor, + int max_level, std::vector& input_levels) { + int total_count = 0; + // for each repetition count upto max_repeat_factor + for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { + // repeat count increase by a factor of 2 for every iteration + int repeat_count = (1 << repeat); + // generate levels for repetition count upto the maximum level + int value = 0; + int bwidth = 0; + while (value <= max_level) { + for (int i = 0; i < repeat_count; i++) { + input_levels[total_count++] = value; } + value = (2 << bwidth) - 1; + bwidth++; } - return total_count; } + return total_count; +} - void VerifyLevelsEncoding(parquet::Encoding::type encoding, int max_level, - std::vector& input_levels) { - LevelEncoder encoder; - LevelDecoder decoder; - int levels_count = 0; - std::vector output_levels; - std::vector bytes; - int num_levels = input_levels.size(); - output_levels.resize(num_levels); - bytes.resize(2 * num_levels); - ASSERT_EQ(num_levels, output_levels.size()); - ASSERT_EQ(2 * num_levels, bytes.size()); - // start encoding and decoding - if (encoding == parquet::Encoding::RLE) { - // leave space to write the rle length value - encoder.Init(encoding, max_level, num_levels, - bytes.data() + sizeof(uint32_t), bytes.size()); - - levels_count = encoder.Encode(num_levels, input_levels.data()); - (reinterpret_cast(bytes.data()))[0] = encoder.len(); - - } else { - encoder.Init(encoding, max_level, num_levels, - bytes.data(), bytes.size()); - levels_count = encoder.Encode(num_levels, input_levels.data()); - } +void VerifyLevelsEncoding(Encoding::type encoding, int max_level, + std::vector& input_levels) { + LevelEncoder encoder; + LevelDecoder decoder; + int levels_count = 0; + std::vector output_levels; + std::vector bytes; + int num_levels = input_levels.size(); + output_levels.resize(num_levels); + bytes.resize(2 * num_levels); + ASSERT_EQ(num_levels, output_levels.size()); + ASSERT_EQ(2 * num_levels, bytes.size()); + // start encoding and decoding + if (encoding == Encoding::RLE) { + // leave space to write the rle length value + encoder.Init(encoding, max_level, num_levels, + bytes.data() + sizeof(uint32_t), bytes.size()); + + levels_count = encoder.Encode(num_levels, input_levels.data()); + (reinterpret_cast(bytes.data()))[0] = encoder.len(); + + } else { + encoder.Init(encoding, max_level, num_levels, + bytes.data(), bytes.size()); + levels_count = encoder.Encode(num_levels, input_levels.data()); + } - ASSERT_EQ(num_levels, levels_count); + ASSERT_EQ(num_levels, levels_count); - decoder.Init(encoding, max_level, num_levels, bytes.data()); - levels_count = decoder.Decode(num_levels, output_levels.data()); + decoder.Init(encoding, max_level, num_levels, bytes.data()); + levels_count = decoder.Decode(num_levels, output_levels.data()); - ASSERT_EQ(num_levels, levels_count); + ASSERT_EQ(num_levels, levels_count); - for (int i = 0; i < num_levels; i++) { - EXPECT_EQ(input_levels[i], output_levels[i]); - } + for (int i = 0; i < num_levels; i++) { + EXPECT_EQ(input_levels[i], output_levels[i]); } -}; +} + +TEST(TestLevels, TestEncodeDecodeLevels) { + // test levels with maximum bit-width from 1 to 8 + // increase the repetition count for each iteration by a factor of 2 -// test levels with maximum bit-width from 1 to 8 -// increase the repetition count for each iteration by a factor of 2 -TEST_F(TestLevels, TestEncodeDecodeLevels) { int min_repeat_factor = 0; int max_repeat_factor = 7; // 128 int max_bit_width = 8; std::vector input_levels; - parquet::Encoding::type encodings[2] = {parquet::Encoding::RLE, - parquet::Encoding::BIT_PACKED}; + Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED}; // for each encoding for (int encode = 0; encode < 2; encode++) { - parquet::Encoding::type encoding = encodings[encode]; + Encoding::type encoding = encodings[encode]; // BIT_PACKED requires a sequence of atleast 8 - if (encoding == parquet::Encoding::BIT_PACKED) min_repeat_factor = 3; + if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3; // for each maximum bit-width for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) { diff --git a/src/parquet/column/levels.h b/src/parquet/column/levels.h index 40562230..18fd0bb5 100644 --- a/src/parquet/column/levels.h +++ b/src/parquet/column/levels.h @@ -18,9 +18,10 @@ #ifndef PARQUET_COLUMN_LEVELS_H #define PARQUET_COLUMN_LEVELS_H +#include + #include "parquet/exception.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/encodings/encodings.h" +#include "parquet/types.h" #include "parquet/util/rle-encoding.h" namespace parquet_cpp { @@ -30,16 +31,16 @@ class LevelEncoder { LevelEncoder() {} // Initialize the LevelEncoder. - void Init(parquet::Encoding::type encoding, int16_t max_level, + void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values, uint8_t* data, int data_size) { bit_width_ = BitUtil::Log2(max_level + 1); encoding_ = encoding; switch (encoding) { - case parquet::Encoding::RLE: { + case Encoding::RLE: { rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_)); break; } - case parquet::Encoding::BIT_PACKED: { + case Encoding::BIT_PACKED: { int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); bit_packed_encoder_.reset(new BitWriter(data, num_bytes)); break; @@ -56,7 +57,7 @@ class LevelEncoder { throw ParquetException("Level encoders are not initialized."); } - if (encoding_ == parquet::Encoding::RLE) { + if (encoding_ == Encoding::RLE) { for (size_t i = 0; i < batch_size; ++i) { if (!rle_encoder_->Put(*(levels + i))) { break; @@ -78,14 +79,16 @@ class LevelEncoder { } int32_t len() { - assert(encoding_ == parquet::Encoding::RLE); + if (encoding_ != Encoding::RLE) { + throw ParquetException("Only implemented for RLE encoding"); + } return rle_length_; } private: int bit_width_; int rle_length_; - parquet::Encoding::type encoding_; + Encoding::type encoding_; std::unique_ptr rle_encoder_; std::unique_ptr bit_packed_encoder_; }; @@ -96,20 +99,20 @@ class LevelDecoder { LevelDecoder() {} // Initialize the LevelDecoder and return the number of bytes consumed - size_t Init(parquet::Encoding::type encoding, int16_t max_level, + size_t Init(Encoding::type encoding, int16_t max_level, int num_buffered_values, const uint8_t* data) { uint32_t num_bytes = 0; uint32_t total_bytes = 0; bit_width_ = BitUtil::Log2(max_level + 1); encoding_ = encoding; switch (encoding) { - case parquet::Encoding::RLE: { + case Encoding::RLE: { num_bytes = *reinterpret_cast(data); const uint8_t* decoder_data = data + sizeof(uint32_t); rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_)); return sizeof(uint32_t) + num_bytes; } - case parquet::Encoding::BIT_PACKED: { + case Encoding::BIT_PACKED: { num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); bit_packed_decoder_.reset(new BitReader(data, num_bytes)); return num_bytes; @@ -127,7 +130,7 @@ class LevelDecoder { throw ParquetException("Level decoders are not initialized."); } - if (encoding_ == parquet::Encoding::RLE) { + if (encoding_ == Encoding::RLE) { for (size_t i = 0; i < batch_size; ++i) { if (!rle_decoder_->Get(levels + i)) { break; @@ -147,7 +150,7 @@ class LevelDecoder { private: int bit_width_; - parquet::Encoding::type encoding_; + Encoding::type encoding_; std::unique_ptr rle_decoder_; std::unique_ptr bit_packed_decoder_; }; diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h index f2740b61..3308a1c7 100644 --- a/src/parquet/column/page.h +++ b/src/parquet/column/page.h @@ -22,32 +22,28 @@ #ifndef PARQUET_COLUMN_PAGE_H #define PARQUET_COLUMN_PAGE_H -#include "parquet/thrift/parquet_types.h" +#include +#include + +#include "parquet/types.h" namespace parquet_cpp { -// Note: Copying the specific page header Thrift metadata to the Page object -// (instead of using a pointer) presently so that data pages can be -// decompressed and processed in parallel. We can turn the header members of -// these classes into pointers at some point, but the downside is that -// applications materializing multiple data pages at once will have to have a -// data container that manages the lifetime of the deserialized -// parquet::PageHeader structs. -// // TODO: Parallel processing is not yet safe because of memory-ownership // semantics (the PageReader may or may not own the memory referenced by a // page) +// +// TODO(wesm): In the future Parquet implementations may store the crc code +// in parquet::PageHeader. parquet-mr currently does not, so we also skip it +// here, both on the read and write path class Page { - // TODO(wesm): In the future Parquet implementations may store the crc code - // in parquet::PageHeader. parquet-mr currently does not, so we also skip it - // here, both on the read and write path public: - Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) : + Page(const uint8_t* buffer, int32_t buffer_size, PageType::type type) : buffer_(buffer), buffer_size_(buffer_size), type_(type) {} - parquet::PageType::type type() const { + PageType::type type() const { return type_; } @@ -57,71 +53,138 @@ class Page { } // @returns: the total size in bytes of the page's data buffer - size_t size() const { + int32_t size() const { return buffer_size_; } private: const uint8_t* buffer_; - size_t buffer_size_; + int32_t buffer_size_; - parquet::PageType::type type_; + PageType::type type_; }; class DataPage : public Page { public: - DataPage(const uint8_t* buffer, size_t buffer_size, - const parquet::DataPageHeader& header) : - Page(buffer, buffer_size, parquet::PageType::DATA_PAGE), - header_(header) {} - - size_t num_values() const { - return header_.num_values; + DataPage(const uint8_t* buffer, int32_t buffer_size, + int32_t num_values, Encoding::type encoding, + Encoding::type definition_level_encoding, + Encoding::type repetition_level_encoding) : + Page(buffer, buffer_size, PageType::DATA_PAGE), + num_values_(num_values), + encoding_(encoding), + definition_level_encoding_(definition_level_encoding), + repetition_level_encoding_(repetition_level_encoding) {} + + int32_t num_values() const { + return num_values_; } - parquet::Encoding::type encoding() const { - return header_.encoding; + Encoding::type encoding() const { + return encoding_; } - parquet::Encoding::type repetition_level_encoding() const { - return header_.repetition_level_encoding; + Encoding::type repetition_level_encoding() const { + return repetition_level_encoding_; } - parquet::Encoding::type definition_level_encoding() const { - return header_.definition_level_encoding; + Encoding::type definition_level_encoding() const { + return definition_level_encoding_; } private: - parquet::DataPageHeader header_; + int32_t num_values_; + Encoding::type encoding_; + Encoding::type definition_level_encoding_; + Encoding::type repetition_level_encoding_; + + // TODO(wesm): parquet::DataPageHeader.statistics }; class DataPageV2 : public Page { public: - DataPageV2(const uint8_t* buffer, size_t buffer_size, - const parquet::DataPageHeaderV2& header) : - Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2), - header_(header) {} + DataPageV2(const uint8_t* buffer, int32_t buffer_size, + int32_t num_values, int32_t num_nulls, int32_t num_rows, + Encoding::type encoding, + int32_t definition_levels_byte_length, + int32_t repetition_levels_byte_length, bool is_compressed = false) : + Page(buffer, buffer_size, PageType::DATA_PAGE_V2), + num_values_(num_values), + num_nulls_(num_nulls), + num_rows_(num_rows), + encoding_(encoding), + definition_levels_byte_length_(definition_levels_byte_length), + repetition_levels_byte_length_(repetition_levels_byte_length), + is_compressed_(is_compressed) {} + + int32_t num_values() const { + return num_values_; + } + + int32_t num_nulls() const { + return num_nulls_; + } + + int32_t num_rows() const { + return num_rows_; + } + + Encoding::type encoding() const { + return encoding_; + } + + int32_t definition_levels_byte_length() const { + return definition_levels_byte_length_; + } + + int32_t repetition_levels_byte_length() const { + return repetition_levels_byte_length_; + } + + bool is_compressed() const { + return is_compressed_; + } private: - parquet::DataPageHeaderV2 header_; + int32_t num_values_; + int32_t num_nulls_; + int32_t num_rows_; + Encoding::type encoding_; + int32_t definition_levels_byte_length_; + int32_t repetition_levels_byte_length_; + bool is_compressed_; + + // TODO(wesm): parquet::DataPageHeaderV2.statistics }; class DictionaryPage : public Page { public: - DictionaryPage(const uint8_t* buffer, size_t buffer_size, - const parquet::DictionaryPageHeader& header) : - Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE), - header_(header) {} + DictionaryPage(const uint8_t* buffer, int32_t buffer_size, + int32_t num_values, Encoding::type encoding, bool is_sorted = false) : + Page(buffer, buffer_size, PageType::DICTIONARY_PAGE), + num_values_(num_values), + encoding_(encoding), + is_sorted_(is_sorted) {} + + int32_t num_values() const { + return num_values_; + } + + Encoding::type encoding() const { + return encoding_; + } - size_t num_values() const { - return header_.num_values; + bool is_sorted() const { + return is_sorted_; } private: - parquet::DictionaryPageHeader header_; + int32_t num_values_; + Encoding::type encoding_; + bool is_sorted_; }; // Abstract page iterator interface. This way, we can feed column pages to the diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc index 878bd4ff..4ba06163 100644 --- a/src/parquet/column/reader.cc +++ b/src/parquet/column/reader.cc @@ -18,13 +18,13 @@ #include "parquet/column/reader.h" #include +#include #include -#include -#include #include "parquet/column/page.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/encodings/plain-encoding.h" namespace parquet_cpp { @@ -37,7 +37,7 @@ ColumnReader::ColumnReader(const ColumnDescriptor* descr, template void TypedColumnReader::ConfigureDictionary(const DictionaryPage* page) { - int encoding = static_cast(parquet::Encoding::RLE_DICTIONARY); + int encoding = static_cast(Encoding::RLE_DICTIONARY); auto it = decoders_.find(encoding); if (it != decoders_.end()) { @@ -61,9 +61,9 @@ void TypedColumnReader::ConfigureDictionary(const DictionaryPage* page) { // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. -static bool IsDictionaryIndexEncoding(const parquet::Encoding::type& e) { - return e == parquet::Encoding::RLE_DICTIONARY || - e == parquet::Encoding::PLAIN_DICTIONARY; +static bool IsDictionaryIndexEncoding(const Encoding::type& e) { + return e == Encoding::RLE_DICTIONARY || + e == Encoding::PLAIN_DICTIONARY; } template @@ -78,10 +78,10 @@ bool TypedColumnReader::ReadNewPage() { return false; } - if (current_page_->type() == parquet::PageType::DICTIONARY_PAGE) { + if (current_page_->type() == PageType::DICTIONARY_PAGE) { ConfigureDictionary(static_cast(current_page_.get())); continue; - } else if (current_page_->type() == parquet::PageType::DATA_PAGE) { + } else if (current_page_->type() == PageType::DATA_PAGE) { const DataPage* page = static_cast(current_page_.get()); // Read a data page. @@ -123,10 +123,10 @@ bool TypedColumnReader::ReadNewPage() { // Get a decoder object for this page or create a new decoder if this is the // first page with this encoding. - parquet::Encoding::type encoding = page->encoding(); + Encoding::type encoding = page->encoding(); if (IsDictionaryIndexEncoding(encoding)) { - encoding = parquet::Encoding::RLE_DICTIONARY; + encoding = Encoding::RLE_DICTIONARY; } auto it = decoders_.find(static_cast(encoding)); @@ -134,18 +134,18 @@ bool TypedColumnReader::ReadNewPage() { current_decoder_ = it->second.get(); } else { switch (encoding) { - case parquet::Encoding::PLAIN: { + case Encoding::PLAIN: { std::shared_ptr decoder(new PlainDecoder(descr_)); decoders_[static_cast(encoding)] = decoder; current_decoder_ = decoder.get(); break; } - case parquet::Encoding::RLE_DICTIONARY: + case Encoding::RLE_DICTIONARY: throw ParquetException("Dictionary page must be before data page."); - case parquet::Encoding::DELTA_BINARY_PACKED: - case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: - case parquet::Encoding::DELTA_BYTE_ARRAY: + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: ParquetException::NYI("Unsupported encoding"); default: diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 4585de81..d11a13c8 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -22,25 +22,17 @@ #include #include #include -#include #include -#include - -#include "parquet/exception.h" -#include "parquet/types.h" +#include "parquet/column/levels.h" #include "parquet/column/page.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/decoder.h" +#include "parquet/exception.h" #include "parquet/schema/descriptor.h" -#include "parquet/util/rle-encoding.h" -#include "parquet/column/levels.h" +#include "parquet/types.h" namespace parquet_cpp { - -class Codec; -class Scanner; - class ColumnReader { public: ColumnReader(const ColumnDescriptor*, std::unique_ptr); diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc index 58f14606..4a0b32f8 100644 --- a/src/parquet/column/scanner.cc +++ b/src/parquet/column/scanner.cc @@ -17,6 +17,7 @@ #include "parquet/column/scanner.h" +#include #include #include "parquet/column/reader.h" @@ -24,7 +25,7 @@ namespace parquet_cpp { std::shared_ptr Scanner::Make(std::shared_ptr col_reader, - size_t batch_size) { + int64_t batch_size) { switch (col_reader->type()) { case Type::BOOLEAN: return std::make_shared(col_reader, batch_size); diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h index 17fd5f66..512f540c 100644 --- a/src/parquet/column/scanner.h +++ b/src/parquet/column/scanner.h @@ -18,24 +18,26 @@ #ifndef PARQUET_COLUMN_SCANNER_H #define PARQUET_COLUMN_SCANNER_H +#include +#include #include #include #include #include #include "parquet/column/reader.h" - +#include "parquet/exception.h" #include "parquet/schema/descriptor.h" #include "parquet/types.h" namespace parquet_cpp { -static constexpr size_t DEFAULT_SCANNER_BATCH_SIZE = 128; +static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128; class Scanner { public: explicit Scanner(std::shared_ptr reader, - size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : batch_size_(batch_size), level_offset_(0), levels_buffered_(0), @@ -50,7 +52,7 @@ class Scanner { virtual ~Scanner() {} static std::shared_ptr Make(std::shared_ptr col_reader, - size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE); + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE); virtual void PrintNext(std::ostream& out, int width) = 0; @@ -62,14 +64,14 @@ class Scanner { return reader_->descr(); } - size_t batch_size() const { return batch_size_;} + int64_t batch_size() const { return batch_size_;} - void SetBatchSize(size_t batch_size) { + void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } protected: - size_t batch_size_; + int64_t batch_size_; std::vector def_levels_; std::vector rep_levels_; @@ -91,7 +93,7 @@ class TypedScanner : public Scanner { typedef typename type_traits::value_type T; explicit TypedScanner(std::shared_ptr reader, - size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : + int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) : Scanner(reader, batch_size) { typed_reader_ = static_cast*>(reader.get()); size_t value_byte_size = type_traits::value_byte_size; diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc deleted file mode 100644 index 56b73a70..00000000 --- a/src/parquet/column/serialized-page.cc +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/column/serialized-page.h" - -#include - -#include "parquet/exception.h" -#include "parquet/thrift/util.h" - -using parquet::PageType; - -namespace parquet_cpp { - -// ---------------------------------------------------------------------- -// SerializedPageReader deserializes Thrift metadata and pages that have been -// assembled in a serialized stream for storing in a Parquet files - -SerializedPageReader::SerializedPageReader(std::unique_ptr stream, - parquet::CompressionCodec::type codec) : - stream_(std::move(stream)) { - max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; - switch (codec) { - case parquet::CompressionCodec::UNCOMPRESSED: - break; - case parquet::CompressionCodec::SNAPPY: - decompressor_.reset(new SnappyCodec()); - break; - default: - ParquetException::NYI("Reading compressed data"); - } -} - - -std::shared_ptr SerializedPageReader::NextPage() { - // Loop here because there may be unhandled page types that we skip until - // finding a page that we do know what to do with - while (true) { - int64_t bytes_read = 0; - int64_t bytes_available = 0; - uint32_t header_size = 0; - const uint8_t* buffer; - uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE; - std::stringstream ss; - - // Page headers can be very large because of page statistics - // We try to deserialize a larger buffer progressively - // until a maximum allowed header limit - while (true) { - buffer = stream_->Peek(allowed_page_size, &bytes_available); - if (bytes_available == 0) { - return std::shared_ptr(nullptr); - } - - // This gets used, then set by DeserializeThriftMsg - header_size = bytes_available; - try { - DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); - break; - } catch (std::exception& e) { - // Failed to deserialize. Double the allowed page header size and try again - ss << e.what(); - allowed_page_size *= 2; - if (allowed_page_size > max_page_header_size_) { - ss << "Deserializing page header failed.\n"; - throw ParquetException(ss.str()); - } - } - } - // Advance the stream offset - stream_->Read(header_size, &bytes_read); - - int compressed_len = current_page_header_.compressed_page_size; - int uncompressed_len = current_page_header_.uncompressed_page_size; - - // Read the compressed data page. - buffer = stream_->Read(compressed_len, &bytes_read); - if (bytes_read != compressed_len) ParquetException::EofException(); - - // Uncompress it if we need to - if (decompressor_ != NULL) { - // Grow the uncompressed buffer if we need to. - if (uncompressed_len > decompression_buffer_.size()) { - decompression_buffer_.resize(uncompressed_len); - } - decompressor_->Decompress(compressed_len, buffer, uncompressed_len, - &decompression_buffer_[0]); - buffer = &decompression_buffer_[0]; - } - - if (current_page_header_.type == PageType::DICTIONARY_PAGE) { - return std::make_shared(buffer, uncompressed_len, - current_page_header_.dictionary_page_header); - } else if (current_page_header_.type == PageType::DATA_PAGE) { - return std::make_shared(buffer, uncompressed_len, - current_page_header_.data_page_header); - } else if (current_page_header_.type == PageType::DATA_PAGE_V2) { - ParquetException::NYI("data page v2"); - } else { - // We don't know what this page type is. We're allowed to skip non-data - // pages. - continue; - } - } - return std::shared_ptr(nullptr); -} - -} // namespace parquet_cpp diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h deleted file mode 100644 index 62bf66df..00000000 --- a/src/parquet/column/serialized-page.h +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// This module defines an abstract interface for iterating through pages in a -// Parquet column chunk within a row group. It could be extended in the future -// to iterate through all data pages in all chunks in a file. - -#ifndef PARQUET_COLUMN_SERIALIZED_PAGE_H -#define PARQUET_COLUMN_SERIALIZED_PAGE_H - -#include -#include - -#include "parquet/column/page.h" -#include "parquet/compression/codec.h" -#include "parquet/util/input.h" -#include "parquet/thrift/parquet_types.h" - -namespace parquet_cpp { - -// 16 MB is the default maximum page header size -static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024; -// 16 KB is the default expected page header size -static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024; -// This subclass delimits pages appearing in a serialized stream, each preceded -// by a serialized Thrift parquet::PageHeader indicating the type of each page -// and the page metadata. -class SerializedPageReader : public PageReader { - public: - SerializedPageReader(std::unique_ptr stream, - parquet::CompressionCodec::type codec); - - virtual ~SerializedPageReader() {} - - // Implement the PageReader interface - virtual std::shared_ptr NextPage(); - - void set_max_page_header_size(uint32_t size) { - max_page_header_size_ = size; - } - - private: - std::unique_ptr stream_; - - parquet::PageHeader current_page_header_; - std::shared_ptr current_page_; - - // Compression codec to use. - std::unique_ptr decompressor_; - std::vector decompression_buffer_; - // Maximum allowed page size - uint32_t max_page_header_size_; -}; - -} // namespace parquet_cpp - -#endif // PARQUET_COLUMN_SERIALIZED_PAGE_H diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 90dde3bf..b346fc2b 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -27,7 +27,14 @@ #include #include +#include "parquet/column/levels.h" #include "parquet/column/page.h" + +// Depended on by SerializedPageReader test utilities for now +#include "parquet/encodings/plain-encoding.h" +#include "parquet/thrift/util.h" +#include "parquet/util/input.h" + namespace parquet_cpp { namespace test { @@ -61,36 +68,38 @@ class DataPageBuilder { typedef typename type_traits::value_type T; // This class writes data and metadata to the passed inputs - explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) : + explicit DataPageBuilder(InMemoryOutputStream* sink) : sink_(sink), - header_(header), num_values_(0), + encoding_(Encoding::PLAIN), + definition_level_encoding_(Encoding::RLE), + repetition_level_encoding_(Encoding::RLE), have_def_levels_(false), have_rep_levels_(false), have_values_(false) { } - void AppendDefLevels(const std::vector& levels, - int16_t max_level, parquet::Encoding::type encoding) { + void AppendDefLevels(const std::vector& levels, int16_t max_level, + Encoding::type encoding = Encoding::RLE) { AppendLevels(levels, max_level, encoding); - num_values_ = std::max(levels.size(), num_values_); - header_->__set_definition_level_encoding(encoding); + num_values_ = std::max(static_cast(levels.size()), num_values_); + definition_level_encoding_ = encoding; have_def_levels_ = true; } - void AppendRepLevels(const std::vector& levels, - int16_t max_level, parquet::Encoding::type encoding) { + void AppendRepLevels(const std::vector& levels, int16_t max_level, + Encoding::type encoding = Encoding::RLE) { AppendLevels(levels, max_level, encoding); - num_values_ = std::max(levels.size(), num_values_); - header_->__set_repetition_level_encoding(encoding); + num_values_ = std::max(static_cast(levels.size()), num_values_); + repetition_level_encoding_ = encoding; have_rep_levels_ = true; } void AppendValues(const std::vector& values, - parquet::Encoding::type encoding) { - if (encoding != parquet::Encoding::PLAIN) { + Encoding::type encoding = Encoding::PLAIN) { + if (encoding != Encoding::PLAIN) { ParquetException::NYI("only plain encoding currently implemented"); } size_t bytes_to_encode = values.size() * sizeof(T); @@ -98,31 +107,43 @@ class DataPageBuilder { PlainEncoder encoder(nullptr); encoder.Encode(&values[0], values.size(), sink_); - num_values_ = std::max(values.size(), num_values_); - header_->__set_encoding(encoding); + num_values_ = std::max(static_cast(values.size()), num_values_); + encoding_ = encoding; have_values_ = true; } - void Finish() { - if (!have_values_) { - throw ParquetException("A data page must at least contain values"); - } - header_->__set_num_values(num_values_); + int32_t num_values() const { + return num_values_; + } + + Encoding::type encoding() const { + return encoding_; + } + + Encoding::type rep_level_encoding() const { + return repetition_level_encoding_; + } + + Encoding::type def_level_encoding() const { + return definition_level_encoding_; } private: InMemoryOutputStream* sink_; - parquet::DataPageHeader* header_; - size_t num_values_; + int32_t num_values_; + Encoding::type encoding_; + Encoding::type definition_level_encoding_; + Encoding::type repetition_level_encoding_; + bool have_def_levels_; bool have_rep_levels_; bool have_values_; // Used internally for both repetition and definition levels void AppendLevels(const std::vector& levels, int16_t max_level, - parquet::Encoding::type encoding) { - if (encoding != parquet::Encoding::RLE) { + Encoding::type encoding) { + if (encoding != Encoding::RLE) { ParquetException::NYI("only rle encoding currently implemented"); } @@ -152,32 +173,32 @@ static std::shared_ptr MakeDataPage(const std::vector& values, size_t num_values = values.size(); InMemoryOutputStream page_stream; - parquet::DataPageHeader page_header; - - test::DataPageBuilder page_builder(&page_stream, &page_header); + test::DataPageBuilder page_builder(&page_stream); if (!rep_levels.empty()) { - page_builder.AppendRepLevels(rep_levels, max_rep_level, - parquet::Encoding::RLE); + page_builder.AppendRepLevels(rep_levels, max_rep_level); } if (!def_levels.empty()) { - page_builder.AppendDefLevels(def_levels, max_def_level, - parquet::Encoding::RLE); + page_builder.AppendDefLevels(def_levels, max_def_level); } - page_builder.AppendValues(values, parquet::Encoding::PLAIN); - page_builder.Finish(); - - // Hand off the data stream to the passed std::vector + page_builder.AppendValues(values); page_stream.Transfer(out_buffer); - return std::make_shared(&(*out_buffer)[0], out_buffer->size(), page_header); + return std::make_shared(&(*out_buffer)[0], out_buffer->size(), + page_builder.num_values(), + page_builder.encoding(), + page_builder.def_level_encoding(), + page_builder.rep_level_encoding()); } + } // namespace test +// Utilities for testing the SerializedPageReader internally + static inline void InitDataPage(const parquet::Statistics& stat, - parquet::DataPageHeader& data_page, int nvalues) { + parquet::DataPageHeader& data_page, int32_t nvalues) { data_page.encoding = parquet::Encoding::PLAIN; data_page.definition_level_encoding = parquet::Encoding::RLE; data_page.repetition_level_encoding = parquet::Encoding::RLE; diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc index dfd50f6b..a131031a 100644 --- a/src/parquet/compression/lz4-codec.cc +++ b/src/parquet/compression/lz4-codec.cc @@ -18,6 +18,9 @@ #include "parquet/compression/codec.h" #include +#include + +#include "parquet/exception.h" namespace parquet_cpp { @@ -26,7 +29,7 @@ void Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, int64_t n = LZ4_decompress_fast(reinterpret_cast(input), reinterpret_cast(output_buffer), output_len); if (n != input_len) { - throw parquet_cpp::ParquetException("Corrupt lz4 compressed data."); + throw ParquetException("Corrupt lz4 compressed data."); } } diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc index 4135a153..91590dbf 100644 --- a/src/parquet/compression/snappy-codec.cc +++ b/src/parquet/compression/snappy-codec.cc @@ -18,6 +18,10 @@ #include "parquet/compression/codec.h" #include +#include +#include + +#include "parquet/exception.h" namespace parquet_cpp { diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt index 638fba0f..c9349afb 100644 --- a/src/parquet/encodings/CMakeLists.txt +++ b/src/parquet/encodings/CMakeLists.txt @@ -17,7 +17,8 @@ # Headers: encodings install(FILES - encodings.h + decoder.h + encoder.h delta-bit-pack-encoding.h delta-byte-array-encoding.h delta-length-byte-array-encoding.h diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/decoder.h similarity index 56% rename from src/parquet/encodings/encodings.h rename to src/parquet/encodings/decoder.h index 46c61b6c..55b29e8a 100644 --- a/src/parquet/encodings/encodings.h +++ b/src/parquet/encodings/decoder.h @@ -15,24 +15,18 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_ENCODINGS_ENCODINGS_H -#define PARQUET_ENCODINGS_ENCODINGS_H +#ifndef PARQUET_ENCODINGS_DECODER_H +#define PARQUET_ENCODINGS_DECODER_H #include #include "parquet/exception.h" #include "parquet/types.h" -#include "parquet/util/output.h" -#include "parquet/util/rle-encoding.h" -#include "parquet/util/bit-stream-utils.inline.h" - -#include "parquet/schema/descriptor.h" - -#include "parquet/thrift/parquet_types.h" - namespace parquet_cpp { +class ColumnDescriptor; + // The Decoder template is parameterized on parquet_cpp::Type::type template class Decoder { @@ -57,55 +51,20 @@ class Decoder { // the number of values left in this page. int values_left() const { return num_values_; } - const parquet::Encoding::type encoding() const { return encoding_; } + const Encoding::type encoding() const { return encoding_; } protected: explicit Decoder(const ColumnDescriptor* descr, - const parquet::Encoding::type& encoding) + const Encoding::type& encoding) : descr_(descr), encoding_(encoding), num_values_(0) {} // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY const ColumnDescriptor* descr_; - const parquet::Encoding::type encoding_; + const Encoding::type encoding_; int num_values_; }; - -// Base class for value encoders. Since encoders may or not have state (e.g., -// dictionary encoding) we use a class instance to maintain any state. -// -// TODO(wesm): Encode interface API is temporary -template -class Encoder { - public: - typedef typename type_traits::value_type T; - - virtual ~Encoder() {} - - // Subclasses should override the ones they support - virtual void Encode(const T* src, int num_values, OutputStream* dst) { - throw ParquetException("Encoder does not implement this type."); - } - - const parquet::Encoding::type encoding() const { return encoding_; } - - protected: - explicit Encoder(const ColumnDescriptor* descr, - const parquet::Encoding::type& encoding) - : descr_(descr), encoding_(encoding) {} - - // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY - const ColumnDescriptor* descr_; - const parquet::Encoding::type encoding_; -}; - } // namespace parquet_cpp -#include "parquet/encodings/plain-encoding.h" -#include "parquet/encodings/dictionary-encoding.h" -#include "parquet/encodings/delta-bit-pack-encoding.h" -#include "parquet/encodings/delta-length-byte-array-encoding.h" -#include "parquet/encodings/delta-byte-array-encoding.h" - -#endif // PARQUET_ENCODINGS_ENCODINGS_H +#endif // PARQUET_ENCODINGS_DECODER_H diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h index 4eb762bf..d512db9d 100644 --- a/src/parquet/encodings/delta-bit-pack-encoding.h +++ b/src/parquet/encodings/delta-bit-pack-encoding.h @@ -18,11 +18,13 @@ #ifndef PARQUET_DELTA_BIT_PACK_ENCODING_H #define PARQUET_DELTA_BIT_PACK_ENCODING_H -#include "parquet/encodings/encodings.h" - #include +#include #include +#include "parquet/encodings/decoder.h" +#include "parquet/util/bit-stream-utils.inline.h" + namespace parquet_cpp { template @@ -31,7 +33,7 @@ class DeltaBitPackDecoder : public Decoder { typedef typename type_traits::value_type T; explicit DeltaBitPackDecoder(const ColumnDescriptor* descr) - : Decoder(descr, parquet::Encoding::DELTA_BINARY_PACKED) { + : Decoder(descr, Encoding::DELTA_BINARY_PACKED) { if (TYPE != Type::INT32 && TYPE != Type::INT64) { throw ParquetException("Delta bit pack encoding should only be for integer data."); } diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h index 2763f161..01dceea9 100644 --- a/src/parquet/encodings/delta-byte-array-encoding.h +++ b/src/parquet/encodings/delta-byte-array-encoding.h @@ -18,16 +18,18 @@ #ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H #define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H -#include "parquet/encodings/encodings.h" - #include +#include "parquet/encodings/decoder.h" +#include "parquet/encodings/delta-length-byte-array-encoding.h" +#include "parquet/encodings/delta-bit-pack-encoding.h" + namespace parquet_cpp { class DeltaByteArrayDecoder : public Decoder { public: explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr) - : Decoder(descr, parquet::Encoding::DELTA_BYTE_ARRAY), + : Decoder(descr, Encoding::DELTA_BYTE_ARRAY), prefix_len_decoder_(nullptr), suffix_decoder_(nullptr) { } diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h index 0868924a..a1b4fd33 100644 --- a/src/parquet/encodings/delta-length-byte-array-encoding.h +++ b/src/parquet/encodings/delta-length-byte-array-encoding.h @@ -18,9 +18,12 @@ #ifndef PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H #define PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H -#include "parquet/encodings/encodings.h" - #include +#include +#include + +#include "parquet/encodings/decoder.h" +#include "parquet/encodings/delta-bit-pack-encoding.h" namespace parquet_cpp { @@ -28,7 +31,7 @@ class DeltaLengthByteArrayDecoder : public Decoder { public: explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr) : Decoder(descr, - parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY), + Encoding::DELTA_LENGTH_BYTE_ARRAY), len_decoder_(nullptr) { } diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index 0547eb39..b52aefb7 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -18,11 +18,14 @@ #ifndef PARQUET_DICTIONARY_ENCODING_H #define PARQUET_DICTIONARY_ENCODING_H -#include "parquet/encodings/encodings.h" - #include +#include #include +#include "parquet/encodings/decoder.h" +#include "parquet/encodings/encoder.h" +#include "parquet/util/rle-encoding.h" + namespace parquet_cpp { template @@ -35,7 +38,7 @@ class DictionaryDecoder : public Decoder { // dictionary decoder needs to copy the data out if necessary. DictionaryDecoder(const ColumnDescriptor* descr, Decoder* dictionary) - : Decoder(descr, parquet::Encoding::RLE_DICTIONARY) { + : Decoder(descr, Encoding::RLE_DICTIONARY) { Init(dictionary); } diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h new file mode 100644 index 00000000..50ba48f9 --- /dev/null +++ b/src/parquet/encodings/encoder.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_ENCODINGS_ENCODER_H +#define PARQUET_ENCODINGS_ENCODER_H + +#include + +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet_cpp { + +class ColumnDescriptor; +class OutputStream; + +// Base class for value encoders. Since encoders may or not have state (e.g., +// dictionary encoding) we use a class instance to maintain any state. +// +// TODO(wesm): Encode interface API is temporary +template +class Encoder { + public: + typedef typename type_traits::value_type T; + + virtual ~Encoder() {} + + // Subclasses should override the ones they support + virtual void Encode(const T* src, int num_values, OutputStream* dst) { + throw ParquetException("Encoder does not implement this type."); + } + + const Encoding::type encoding() const { return encoding_; } + + protected: + explicit Encoder(const ColumnDescriptor* descr, + const Encoding::type& encoding) + : descr_(descr), encoding_(encoding) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const ColumnDescriptor* descr_; + const Encoding::type encoding_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_ENCODINGS_ENCODER_H diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc index 16862b86..955a4155 100644 --- a/src/parquet/encodings/plain-encoding-test.cc +++ b/src/parquet/encodings/plain-encoding-test.cc @@ -16,13 +16,17 @@ // under the License. #include +#include #include #include #include -#include "parquet/util/test-common.h" -#include "parquet/encodings/encodings.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/types.h" +#include "parquet/util/bit-util.h" +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" using std::string; using std::vector; diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index a450eb48..78560fd1 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -18,11 +18,15 @@ #ifndef PARQUET_PLAIN_ENCODING_H #define PARQUET_PLAIN_ENCODING_H -#include "parquet/encodings/encodings.h" - #include #include +#include "parquet/encodings/decoder.h" +#include "parquet/encodings/encoder.h" +#include "parquet/schema/descriptor.h" +#include "parquet/util/bit-stream-utils.inline.h" +#include "parquet/util/output.h" + namespace parquet_cpp { // ---------------------------------------------------------------------- @@ -35,7 +39,7 @@ class PlainDecoder : public Decoder { using Decoder::num_values_; explicit PlainDecoder(const ColumnDescriptor* descr) : - Decoder(descr, parquet::Encoding::PLAIN), + Decoder(descr, Encoding::PLAIN), data_(NULL), len_(0) {} virtual void SetData(int num_values, const uint8_t* data, int len) { @@ -98,7 +102,7 @@ template <> class PlainDecoder : public Decoder { public: explicit PlainDecoder(const ColumnDescriptor* descr) : - Decoder(descr, parquet::Encoding::PLAIN) {} + Decoder(descr, Encoding::PLAIN) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -145,7 +149,7 @@ class PlainEncoder : public Encoder { typedef typename type_traits::value_type T; explicit PlainEncoder(const ColumnDescriptor* descr) : - Encoder(descr, parquet::Encoding::PLAIN) {} + Encoder(descr, Encoding::PLAIN) {} virtual void Encode(const T* src, int num_values, OutputStream* dst); }; @@ -154,7 +158,7 @@ template <> class PlainEncoder : public Encoder { public: explicit PlainEncoder(const ColumnDescriptor* descr) : - Encoder(descr, parquet::Encoding::PLAIN) {} + Encoder(descr, Encoding::PLAIN) {} virtual void Encode(const bool* src, int num_values, OutputStream* dst) { throw ParquetException("this API for encoding bools not implemented"); diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt index ef6ac01e..b7a65f19 100644 --- a/src/parquet/file/CMakeLists.txt +++ b/src/parquet/file/CMakeLists.txt @@ -18,3 +18,5 @@ install(FILES reader.h DESTINATION include/parquet/file) + +ADD_PARQUET_TEST(file-deserialize-test) diff --git a/src/parquet/column/serialized-page-test.cc b/src/parquet/file/file-deserialize-test.cc similarity index 90% rename from src/parquet/column/serialized-page-test.cc rename to src/parquet/file/file-deserialize-test.cc index 5c490218..e90889dd 100644 --- a/src/parquet/column/serialized-page-test.cc +++ b/src/parquet/file/file-deserialize-test.cc @@ -15,28 +15,30 @@ // specific language governing permissions and limitations // under the License. +#include + +#include #include -#include -#include +#include +#include +#include #include -#include - -#include "parquet/types.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/thrift/util.h" -#include "parquet/column/serialized-page.h" #include "parquet/column/page.h" -#include "parquet/column/reader.h" #include "parquet/column/test-util.h" +#include "parquet/file/reader-internal.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/thrift/util.h" +#include "parquet/types.h" +#include "parquet/util/input.h" namespace parquet_cpp { class TestSerializedPage : public ::testing::Test { public: void InitSerializedPageReader(const uint8_t* buffer, size_t header_size, - parquet::CompressionCodec::type codec) { + Compression::type codec) { std::unique_ptr stream; stream.reset(new InMemoryInputStream(buffer, header_size)); page_reader_.reset(new SerializedPageReader(std::move(stream), codec)); @@ -68,10 +70,10 @@ TEST_F(TestSerializedPage, TestLargePageHeaders) { ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); InitSerializedPageReader(reinterpret_cast(serialized_buffer.c_str()), - serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED); + serialized_buffer.length(), Compression::UNCOMPRESSED); std::shared_ptr current_page = page_reader_->NextPage(); - ASSERT_EQ(parquet::PageType::DATA_PAGE, current_page->type()); + ASSERT_EQ(PageType::DATA_PAGE, current_page->type()); const DataPage* page = static_cast(current_page.get()); ASSERT_EQ(num_values, page->num_values()); } @@ -99,7 +101,7 @@ TEST_F(TestSerializedPage, TestFailLargePageHeaders) { ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); InitSerializedPageReader(reinterpret_cast(serialized_buffer.c_str()), - serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED); + serialized_buffer.length(), Compression::UNCOMPRESSED); // Set the max page header size to 128 KB, which is less than the current header size page_reader_->set_max_page_header_size(max_header_size); diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 7b0a7195..47092a5f 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -17,16 +17,137 @@ #include "parquet/file/reader-internal.h" -#include +#include +#include +#include +#include #include -#include "parquet/column/serialized-page.h" +#include "parquet/column/page.h" +#include "parquet/compression/codec.h" +#include "parquet/exception.h" #include "parquet/schema/converter.h" +#include "parquet/schema/descriptor.h" +#include "parquet/schema/types.h" #include "parquet/thrift/util.h" +#include "parquet/types.h" #include "parquet/util/input.h" namespace parquet_cpp { +// ---------------------------------------------------------------------- +// SerializedPageReader deserializes Thrift metadata and pages that have been +// assembled in a serialized stream for storing in a Parquet files + +SerializedPageReader::SerializedPageReader(std::unique_ptr stream, + Compression::type codec) : + stream_(std::move(stream)) { + max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; + // TODO(wesm): add GZIP after PARQUET-456 + switch (codec) { + case Compression::UNCOMPRESSED: + break; + case Compression::SNAPPY: + decompressor_.reset(new SnappyCodec()); + break; + case Compression::LZO: + decompressor_.reset(new Lz4Codec()); + break; + default: + ParquetException::NYI("Reading compressed data"); + } +} + +std::shared_ptr SerializedPageReader::NextPage() { + // Loop here because there may be unhandled page types that we skip until + // finding a page that we do know what to do with + while (true) { + int64_t bytes_read = 0; + int64_t bytes_available = 0; + uint32_t header_size = 0; + const uint8_t* buffer; + uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE; + std::stringstream ss; + + // Page headers can be very large because of page statistics + // We try to deserialize a larger buffer progressively + // until a maximum allowed header limit + while (true) { + buffer = stream_->Peek(allowed_page_size, &bytes_available); + if (bytes_available == 0) { + return std::shared_ptr(nullptr); + } + + // This gets used, then set by DeserializeThriftMsg + header_size = bytes_available; + try { + DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); + break; + } catch (std::exception& e) { + // Failed to deserialize. Double the allowed page header size and try again + ss << e.what(); + allowed_page_size *= 2; + if (allowed_page_size > max_page_header_size_) { + ss << "Deserializing page header failed.\n"; + throw ParquetException(ss.str()); + } + } + } + // Advance the stream offset + stream_->Read(header_size, &bytes_read); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + + // Read the compressed data page. + buffer = stream_->Read(compressed_len, &bytes_read); + if (bytes_read != compressed_len) ParquetException::EofException(); + + // Uncompress it if we need to + if (decompressor_ != NULL) { + // Grow the uncompressed buffer if we need to. + if (uncompressed_len > decompression_buffer_.size()) { + decompression_buffer_.resize(uncompressed_len); + } + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + &decompression_buffer_[0]); + buffer = &decompression_buffer_[0]; + } + + if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) { + const parquet::DictionaryPageHeader& dict_header = + current_page_header_.dictionary_page_header; + + bool is_sorted = dict_header.__isset.is_sorted? dict_header.is_sorted : false; + + return std::make_shared(buffer, uncompressed_len, + dict_header.num_values, FromThrift(dict_header.encoding), + is_sorted); + } else if (current_page_header_.type == parquet::PageType::DATA_PAGE) { + const parquet::DataPageHeader& header = current_page_header_.data_page_header; + + return std::make_shared(buffer, uncompressed_len, + header.num_values, + FromThrift(header.encoding), + FromThrift(header.definition_level_encoding), + FromThrift(header.repetition_level_encoding)); + } else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) { + const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; + bool is_compressed = header.__isset.is_compressed? header.is_compressed : false; + return std::make_shared(buffer, uncompressed_len, + header.num_values, header.num_nulls, header.num_rows, + FromThrift(header.encoding), + header.definition_levels_byte_length, + header.repetition_levels_byte_length, is_compressed); + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. + continue; + } + } + return std::shared_ptr(nullptr); +} + // ---------------------------------------------------------------------- // SerializedRowGroup @@ -62,7 +183,7 @@ std::unique_ptr SerializedRowGroup::GetColumnPageReader(int i) { const ColumnDescriptor* descr = schema_->Column(i); return std::unique_ptr(new SerializedPageReader(std::move(input), - col.meta_data.codec)); + FromThrift(col.meta_data.codec))); } RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) { diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index 8ba105ea..b7e9154d 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -18,16 +18,57 @@ #ifndef PARQUET_FILE_READER_INTERNAL_H #define PARQUET_FILE_READER_INTERNAL_H -#include "parquet/file/reader.h" - +#include #include +#include -#include "parquet/schema/descriptor.h" -#include "parquet/util/input.h" +#include "parquet/column/page.h" +#include "parquet/compression/codec.h" +#include "parquet/file/reader.h" #include "parquet/thrift/parquet_types.h" +#include "parquet/types.h" +#include "parquet/util/input.h" namespace parquet_cpp { +class SchemaDescriptor; + +// 16 MB is the default maximum page header size +static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024; + +// 16 KB is the default expected page header size +static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024; + +// This subclass delimits pages appearing in a serialized stream, each preceded +// by a serialized Thrift parquet::PageHeader indicating the type of each page +// and the page metadata. +class SerializedPageReader : public PageReader { + public: + SerializedPageReader(std::unique_ptr stream, + Compression::type codec); + + virtual ~SerializedPageReader() {} + + // Implement the PageReader interface + virtual std::shared_ptr NextPage(); + + void set_max_page_header_size(uint32_t size) { + max_page_header_size_ = size; + } + + private: + std::unique_ptr stream_; + + parquet::PageHeader current_page_header_; + std::shared_ptr current_page_; + + // Compression codec to use. + std::unique_ptr decompressor_; + std::vector decompression_buffer_; + // Maximum allowed page size + uint32_t max_page_header_size_; +}; + // RowGroupReader::Contents implementation for the Parquet file specification class SerializedRowGroup : public RowGroupReader::Contents { public: diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 6ef59edc..9da0f098 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -18,17 +18,19 @@ #include "parquet/file/reader.h" #include -#include #include #include #include +#include #include +#include "parquet/column/page.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" - #include "parquet/exception.h" #include "parquet/file/reader-internal.h" +#include "parquet/util/input.h" +#include "parquet/types.h" using std::string; using std::vector; diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h index 3ff8697c..32ae4292 100644 --- a/src/parquet/file/reader.h +++ b/src/parquet/file/reader.h @@ -19,21 +19,17 @@ #define PARQUET_FILE_READER_H #include +#include #include #include -#include #include -#include "parquet/types.h" -#include "parquet/schema/descriptor.h" - -// TODO(wesm): Still depends on Thrift #include "parquet/column/page.h" +#include "parquet/schema/descriptor.h" namespace parquet_cpp { class ColumnReader; -class ParquetFileReader; struct RowGroupStatistics { int64_t num_values; diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h index b8624aea..ea8ab5ee 100644 --- a/src/parquet/parquet.h +++ b/src/parquet/parquet.h @@ -27,9 +27,19 @@ #include #include "parquet/exception.h" + +// Column reader API #include "parquet/column/reader.h" + +// File API #include "parquet/file/reader.h" +// Schemas +#include "parquet/schema/descriptor.h" +#include "parquet/schema/printer.h" +#include "parquet/schema/types.h" + +// IO #include "parquet/util/input.h" #include "parquet/util/output.h" diff --git a/src/parquet/public-api-test.cc b/src/parquet/public-api-test.cc new file mode 100644 index 00000000..41037146 --- /dev/null +++ b/src/parquet/public-api-test.cc @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "parquet/parquet.h" + +namespace parquet_cpp { + +TEST(TestPublicAPI, DoesNotIncludeThrift) { +#ifdef _THRIFT_THRIFT_H_ + FAIL() << "Thrift headers should not be in the public API"; +#endif +} + +} // namespace parquet_cpp diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc index 8599e7e3..97a5f79c 100644 --- a/src/parquet/reader-test.cc +++ b/src/parquet/reader-test.cc @@ -16,9 +16,9 @@ // under the License. #include +#include #include #include -#include #include #include @@ -26,7 +26,6 @@ #include "parquet/file/reader.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" -#include "parquet/util/input.h" using std::string; diff --git a/src/parquet/schema/CMakeLists.txt b/src/parquet/schema/CMakeLists.txt index 0902ccf0..8aa99696 100644 --- a/src/parquet/schema/CMakeLists.txt +++ b/src/parquet/schema/CMakeLists.txt @@ -17,9 +17,8 @@ # Headers: top level install(FILES - builder.h - converter.h descriptor.h + printer.h types.h DESTINATION include/parquet/schema) diff --git a/src/parquet/schema/converter.cc b/src/parquet/schema/converter.cc index 9b45cc9b..9eb59b0d 100644 --- a/src/parquet/schema/converter.cc +++ b/src/parquet/schema/converter.cc @@ -17,9 +17,10 @@ #include "parquet/schema/converter.h" -#include - #include "parquet/exception.h" +#include "parquet/schema/descriptor.h" +#include "parquet/schema/types.h" +#include "parquet/thrift/parquet_types.h" using parquet::SchemaElement; @@ -46,7 +47,7 @@ std::unique_ptr FlatSchemaConverter::Convert() { std::unique_ptr FlatSchemaConverter::NextNode() { const SchemaElement& element = Next(); - size_t node_id = next_id(); + int node_id = next_id(); const void* opaque_element = static_cast(&element); @@ -56,7 +57,7 @@ std::unique_ptr FlatSchemaConverter::NextNode() { } else { // Group NodeVector fields; - for (size_t i = 0; i < element.num_children; ++i) { + for (int i = 0; i < element.num_children; ++i) { std::unique_ptr field = NextNode(); fields.push_back(NodePtr(field.release())); } @@ -82,25 +83,6 @@ std::shared_ptr FromParquet(const std::vector& return descr; } -// ---------------------------------------------------------------------- -// Conversion back to Parquet metadata - -// TODO: decide later what to do with these. When converting back only need to -// write into a parquet::SchemaElement - -// FieldRepetitionType::type ToParquet(Repetition::type type) { -// return static_cast(type); -// } - -// parquet::ConvertedType::type ToParquet(LogicalType::type type) { -// // item 0 is NONE -// return static_cast(static_cast(type) - 1); -// } - -// parquet::Type::type ToParquet(Type::type type) { -// return static_cast(type); -// } - } // namespace schema } // namespace parquet_cpp diff --git a/src/parquet/schema/converter.h b/src/parquet/schema/converter.h index cde48c9e..055eb693 100644 --- a/src/parquet/schema/converter.h +++ b/src/parquet/schema/converter.h @@ -25,20 +25,20 @@ #ifndef PARQUET_SCHEMA_CONVERTER_H #define PARQUET_SCHEMA_CONVERTER_H -#include #include -#include #include -#include "parquet/schema/descriptor.h" -#include "parquet/schema/types.h" - -#include "parquet/thrift/parquet_types.h" +namespace parquet { class SchemaElement;} namespace parquet_cpp { +class SchemaDescriptor; + namespace schema { +class GroupNode; +class Node; + // ---------------------------------------------------------------------- // Conversion from Parquet Thrift metadata @@ -47,7 +47,7 @@ std::shared_ptr FromParquet( class FlatSchemaConverter { public: - FlatSchemaConverter(const parquet::SchemaElement* elements, size_t length) : + FlatSchemaConverter(const parquet::SchemaElement* elements, int length) : elements_(elements), length_(length), pos_(0), @@ -57,11 +57,11 @@ class FlatSchemaConverter { private: const parquet::SchemaElement* elements_; - size_t length_; - size_t pos_; - size_t current_id_; + int length_; + int pos_; + int current_id_; - size_t next_id() { + int next_id() { return current_id_++; } diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h index d27dcc1e..7991deae 100644 --- a/src/parquet/schema/descriptor.h +++ b/src/parquet/schema/descriptor.h @@ -19,12 +19,14 @@ #define PARQUET_SCHEMA_DESCRIPTOR_H #include +#include #include #include #include #include #include "parquet/schema/types.h" +#include "parquet/types.h" namespace parquet_cpp { diff --git a/src/parquet/schema/printer.cc b/src/parquet/schema/printer.cc index 9c43e8e2..2aa2940b 100644 --- a/src/parquet/schema/printer.cc +++ b/src/parquet/schema/printer.cc @@ -17,15 +17,19 @@ #include "parquet/schema/printer.h" +#include #include +#include "parquet/schema/types.h" +#include "parquet/types.h" + namespace parquet_cpp { namespace schema { class SchemaPrinter : public Node::Visitor { public: - explicit SchemaPrinter(std::ostream& stream, size_t indent_width) : + explicit SchemaPrinter(std::ostream& stream, int indent_width) : stream_(stream), indent_(0), indent_width_(2) {} @@ -40,8 +44,8 @@ class SchemaPrinter : public Node::Visitor { std::ostream& stream_; - size_t indent_; - size_t indent_width_; + int indent_; + int indent_width_; }; static void PrintRepLevel(Repetition::type repetition, std::ostream& stream) { @@ -103,7 +107,7 @@ void SchemaPrinter::Visit(const GroupNode* node) { stream_ << " group " << node->name() << " {" << std::endl; indent_ += indent_width_; - for (size_t i = 0; i < node->field_count(); ++i) { + for (int i = 0; i < node->field_count(); ++i) { node->field(i)->Visit(this); } indent_ -= indent_width_; @@ -129,7 +133,7 @@ void SchemaPrinter::Visit(const Node* node) { } void PrintSchema(const Node* schema, std::ostream& stream, - size_t indent_width) { + int indent_width) { SchemaPrinter printer(stream, indent_width); printer.Visit(schema); } diff --git a/src/parquet/schema/printer.h b/src/parquet/schema/printer.h index 535262f9..6df78d0e 100644 --- a/src/parquet/schema/printer.h +++ b/src/parquet/schema/printer.h @@ -20,16 +20,16 @@ #ifndef PARQUET_SCHEMA_PRINTER_H #define PARQUET_SCHEMA_PRINTER_H -#include "parquet/schema/types.h" - #include namespace parquet_cpp { namespace schema { +class Node; + void PrintSchema(const Node* schema, std::ostream& stream, - size_t indent_width = 2); + int indent_width = 2); } // namespace schema diff --git a/src/parquet/schema/schema-converter-test.cc b/src/parquet/schema/schema-converter-test.cc index f2dadf26..93cfd24f 100644 --- a/src/parquet/schema/schema-converter-test.cc +++ b/src/parquet/schema/schema-converter-test.cc @@ -15,17 +15,19 @@ // specific language governing permissions and limitations // under the License. -#include +#include +#include #include #include #include -#include "parquet/util/test-common.h" +#include "parquet/exception.h" #include "parquet/schema/converter.h" -#include "parquet/thrift/parquet_types.h" - #include "parquet/schema/test-util.h" +#include "parquet/schema/types.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/types.h" using std::string; using std::vector; diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc index 1328bed1..c63df544 100644 --- a/src/parquet/schema/schema-descriptor-test.cc +++ b/src/parquet/schema/schema-descriptor-test.cc @@ -18,16 +18,15 @@ // Schema / column descriptor correctness tests (from flat Parquet schemas) #include +#include #include #include #include -#include "parquet/util/test-common.h" -#include "parquet/schema/converter.h" +#include "parquet/exception.h" #include "parquet/schema/descriptor.h" - -#include "parquet/thrift/parquet_types.h" +#include "parquet/schema/types.h" using std::string; using std::vector; diff --git a/src/parquet/schema/schema-printer-test.cc b/src/parquet/schema/schema-printer-test.cc index c21429af..094829b5 100644 --- a/src/parquet/schema/schema-printer-test.cc +++ b/src/parquet/schema/schema-printer-test.cc @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. -#include +#include + +#include #include #include -#include -#include "parquet/util/test-common.h" - #include "parquet/schema/printer.h" -#include "parquet/schema/test-util.h" +#include "parquet/schema/types.h" +#include "parquet/types.h" using std::string; using std::vector; diff --git a/src/parquet/schema/schema-types-test.cc b/src/parquet/schema/schema-types-test.cc index 72d38c0e..cac7dc58 100644 --- a/src/parquet/schema/schema-types-test.cc +++ b/src/parquet/schema/schema-types-test.cc @@ -15,15 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include +#include + +#include #include #include -#include -#include "parquet/util/test-common.h" - -#include "parquet/schema/types.h" #include "parquet/schema/test-util.h" +#include "parquet/schema/types.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/types.h" using std::string; using std::vector; diff --git a/src/parquet/schema/types.cc b/src/parquet/schema/types.cc index e088eede..fae7c842 100644 --- a/src/parquet/schema/types.cc +++ b/src/parquet/schema/types.cc @@ -19,7 +19,9 @@ #include +#include "parquet/exception.h" #include "parquet/thrift/parquet_types.h" +#include "parquet/thrift/util.h" namespace parquet_cpp { @@ -72,7 +74,7 @@ bool GroupNode::EqualsInternal(const GroupNode* other) const { if (this->field_count() != other->field_count()) { return false; } - for (size_t i = 0; i < this->field_count(); ++i) { + for (int i = 0; i < this->field_count(); ++i) { if (!this->field(i)->Equals(other->field(i).get())) { return false; } @@ -94,19 +96,6 @@ void GroupNode::Visit(Node::Visitor* visitor) { // ---------------------------------------------------------------------- // Node construction from Parquet metadata -static Type::type ConvertEnum(parquet::Type::type type) { - return static_cast(type); -} - -static LogicalType::type ConvertEnum(parquet::ConvertedType::type type) { - // item 0 is NONE - return static_cast(static_cast(type) + 1); -} - -static Repetition::type ConvertEnum(parquet::FieldRepetitionType::type type) { - return static_cast(type); -} - struct NodeParams { explicit NodeParams(const std::string& name) : name(name) {} @@ -119,9 +108,9 @@ struct NodeParams { static inline NodeParams GetNodeParams(const parquet::SchemaElement* element) { NodeParams params(element->name); - params.repetition = ConvertEnum(element->repetition_type); + params.repetition = FromThrift(element->repetition_type); if (element->__isset.converted_type) { - params.logical_type = ConvertEnum(element->converted_type); + params.logical_type = FromThrift(element->converted_type); } else { params.logical_type = LogicalType::NONE; } @@ -145,7 +134,7 @@ std::unique_ptr PrimitiveNode::FromParquet(const void* opaque_element, std::unique_ptr result = std::unique_ptr( new PrimitiveNode(params.name, params.repetition, - ConvertEnum(element->type), params.logical_type, node_id)); + FromThrift(element->type), params.logical_type, node_id)); if (element->type == parquet::Type::FIXED_LEN_BYTE_ARRAY) { result->SetTypeLength(element->type_length); diff --git a/src/parquet/schema/types.h b/src/parquet/schema/types.h index 82db2330..83b9fd2c 100644 --- a/src/parquet/schema/types.h +++ b/src/parquet/schema/types.h @@ -26,7 +26,6 @@ #include #include -#include "parquet/exception.h" #include "parquet/types.h" #include "parquet/util/macros.h" @@ -254,11 +253,11 @@ class GroupNode : public Node { virtual bool Equals(const Node* other) const; - const NodePtr& field(size_t i) const { + const NodePtr& field(int i) const { return fields_[i]; } - size_t field_count() const { + int field_count() const { return fields_.size(); } diff --git a/src/parquet/thrift/serializer-test.cc b/src/parquet/thrift/serializer-test.cc index e89b1080..756fd100 100644 --- a/src/parquet/thrift/serializer-test.cc +++ b/src/parquet/thrift/serializer-test.cc @@ -15,18 +15,15 @@ // specific language governing permissions and limitations // under the License. -#include -#include -#include -#include - #include +#include +#include +#include + +#include "parquet/column/test-util.h" #include "parquet/thrift/parquet_types.h" #include "parquet/thrift/util.h" -#include "parquet/column/page.h" -#include "parquet/column/reader.h" -#include "parquet/column/test-util.h" using std::string; @@ -59,12 +56,12 @@ TEST_F(TestThrift, TestSerializerDeserializer) { uint32_t header_size = 1024; // Deserialize the serialized page buffer ASSERT_NO_THROW(DeserializeThriftMsg(reinterpret_cast(serialized_buffer.c_str()), - &header_size, &out_page_header)); + &header_size, &out_page_header)); ASSERT_LE(stats_size, header_size); ASSERT_GE(max_header_len, header_size); ASSERT_EQ(parquet::Encoding::PLAIN, out_page_header.data_page_header.encoding); - ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.definition_level_encoding); + ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.definition_level_encoding); ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.repetition_level_encoding); for(int i = 0; i < stats_size; i++){ EXPECT_EQ(i % 255, (reinterpret_cast diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h index a472dc27..8c341974 100644 --- a/src/parquet/thrift/util.h +++ b/src/parquet/thrift/util.h @@ -17,11 +17,39 @@ #include #include -#include "parquet/util/logging.h" #include "parquet/exception.h" +#include "parquet/util/logging.h" +#include "parquet/thrift/parquet_types.h" namespace parquet_cpp { +// ---------------------------------------------------------------------- +// Convert Thrift enums to / from parquet_cpp enums + +static inline Type::type FromThrift(parquet::Type::type type) { + return static_cast(type); +} + +static inline LogicalType::type FromThrift(parquet::ConvertedType::type type) { + // item 0 is NONE + return static_cast(static_cast(type) + 1); +} + +static inline Repetition::type FromThrift(parquet::FieldRepetitionType::type type) { + return static_cast(type); +} + +static inline Encoding::type FromThrift(parquet::Encoding::type type) { + return static_cast(type); +} + +static inline Compression::type FromThrift(parquet::CompressionCodec::type type) { + return static_cast(type); +} + +// ---------------------------------------------------------------------- +// Thrift struct serialization / deserialization utilities + // Deserialize a thrift message from buf/len. buf/len must at least contain // all the bytes needed to store the thrift message. On return, len will be // set to the actual length of the header. diff --git a/src/parquet/types.h b/src/parquet/types.h index 2d15cadc..8c5e1236 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -108,7 +108,6 @@ struct Encoding { // Compression, mirrors parquet::CompressionCodec struct Compression { enum type { - NONE, UNCOMPRESSED, SNAPPY, GZIP, diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h index eac53467..714911c1 100644 --- a/src/parquet/util/bit-util.h +++ b/src/parquet/util/bit-util.h @@ -26,7 +26,7 @@ #include #endif -#include +#include #include "parquet/util/compiler-util.h" #include "parquet/util/cpu-info.h" @@ -34,10 +34,35 @@ namespace parquet_cpp { -using boost::make_unsigned; +// TODO(wesm): The source from Impala was depending on boost::make_unsigned +// +// We add a partial stub implementation here + +template +struct make_unsigned { +}; + +template <> +struct make_unsigned { + typedef uint8_t type; +}; + +template <> +struct make_unsigned { + typedef uint16_t type; +}; + +template <> +struct make_unsigned { + typedef uint32_t type; +}; + +template <> +struct make_unsigned { + typedef uint64_t type; +}; /// Utility class to do standard bit tricks -/// TODO: is this in boost or something else like that? class BitUtil { public: /// Returns the ceil of value/divisor diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h index 4fd9cd78..d9b07fd9 100644 --- a/src/parquet/util/input.h +++ b/src/parquet/util/input.h @@ -18,6 +18,7 @@ #ifndef PARQUET_UTIL_INPUT_H #define PARQUET_UTIL_INPUT_H +#include #include #include #include diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc index 84f5b57e..5fbca4a4 100644 --- a/src/parquet/util/output-test.cc +++ b/src/parquet/util/output-test.cc @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. -#include - #include +#include +#include +#include + #include "parquet/util/output.h" #include "parquet/util/test-common.h" diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc index 9748a69e..f0c8989f 100644 --- a/src/parquet/util/output.cc +++ b/src/parquet/util/output.cc @@ -17,9 +17,7 @@ #include "parquet/util/output.h" -#include #include -#include #include "parquet/exception.h" diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h index e83b2619..be25abd9 100644 --- a/src/parquet/util/output.h +++ b/src/parquet/util/output.h @@ -19,7 +19,6 @@ #define PARQUET_UTIL_OUTPUT_H #include -#include #include namespace parquet_cpp {