PARQUET-1780: [C++] Set ColumnMetadata.encoding_stats field
This solves PARQUET-1780:
the ColumnMetadata.encoding_stats field is left empty by the parquet-cpp implementation.
This leads to metadata mismatches between Parquet files generated by C++ and by parquet-mr (Java/Scala).

encoding_stats is a vector of **PageEncodingStats**.
PageEncodingStats has three attributes:

- page_type: the page type (data page or dictionary page)
- encoding: the encoding of the page
- count: the number of pages of this type with this encoding
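
In the C++ implementation this maps to a small public struct added to metadata.h by this change (see the diff below):

  /// \brief Public struct for Thrift PageEncodingStats in ColumnChunkMetaData
  struct PageEncodingStats {
    PageType::type page_type;  // DATA_PAGE or DICTIONARY_PAGE
    Encoding::type encoding;   // encoding used by pages of this type
    int32_t count;             // number of pages with this page_type/encoding pair
  };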

Of these three, the first two (page_type and encoding) can be derived from information already available at write time, but count requires adding some state to existing classes.
Modifications:
For the class **SerializedPageWriter**, the following two attributes were added, one counter map per page type:
  std::map<Encoding::type, int32_t> dict_encoding_stats_;
  std::map<Encoding::type, int32_t> data_encoding_stats_;
Each map records, per encoding, the number of pages of that type written with that encoding.
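
The counting scheme itself is simple. A minimal standalone sketch (illustrative only: the Encoding enum here is a stand-in for parquet::Encoding::type, and the page sequence is invented):

  #include <cassert>
  #include <cstdint>
  #include <map>

  // Stand-in for parquet::Encoding::type, for illustration only.
  enum class Encoding { PLAIN, RLE_DICTIONARY };

  int main() {
    std::map<Encoding, int32_t> data_encoding_stats;
    // operator[] value-initializes a missing count to 0, so every page
    // write can simply increment the counter for its encoding.
    ++data_encoding_stats[Encoding::RLE_DICTIONARY];  // dictionary-encoded data page
    ++data_encoding_stats[Encoding::RLE_DICTIONARY];  // another one
    ++data_encoding_stats[Encoding::PLAIN];           // plain fallback data page
    assert(data_encoding_stats[Encoding::RLE_DICTIONARY] == 2);
    assert(data_encoding_stats[Encoding::PLAIN] == 1);
    return 0;
  }

At Finish() time, each (encoding, count) entry is converted into one Thrift PageEncodingStats with the matching page type (see metadata.cc below).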

Closes #6370 from omega-gamage/PARQUET-1780 and squashes the following commits:

086af4e <Wes McKinney> Code review comments
a9c684b <Omega Gamage> Match the implementation with impala implementation
eae56fa <Wes McKinney> Simplify PageEncodingStats
54ac1eb <Omega Gamage> commit 9eecaaf Author: Omega Gamage <omega@bigstream.co> Date:   Tue Feb 18 14:23:08 2020 +0530

Lead-authored-by: Omega Gamage <omega@bigstream.co>
Co-authored-by: Wes McKinney <wesm+git@apache.org>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
omega-bigstream and wesm committed Mar 3, 2020
1 parent 21c4d4b commit b4acb0b
Showing 6 changed files with 116 additions and 17 deletions.
13 changes: 10 additions & 3 deletions cpp/src/parquet/column_writer.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <map>
#include <memory>
#include <string>
#include <utility>
@@ -226,6 +227,7 @@ class SerializedPageWriter : public PageWriter {

total_uncompressed_size_ += uncompressed_size + header_size;
total_compressed_size_ += output_data_len + header_size;
++dict_encoding_stats_[page.encoding()];

PARQUET_ASSIGN_OR_THROW(int64_t final_pos, sink_->Tell());
return final_pos - start_pos;
@@ -238,7 +240,8 @@
// index_page_offset = -1 since they are not supported
metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
total_compressed_size_, total_uncompressed_size_, has_dictionary,
fallback, meta_encryptor_);
fallback, dict_encoding_stats_, data_encoding_stats_,
meta_encryptor_);
// Write metadata at end of column chunk
metadata_->WriteTo(sink_.get());
}
@@ -310,7 +313,7 @@ class SerializedPageWriter : public PageWriter {
total_uncompressed_size_ += uncompressed_size + header_size;
total_compressed_size_ += output_data_len + header_size;
num_values_ += page.num_values();

++data_encoding_stats_[page.encoding()];
++page_ordinal_;
PARQUET_ASSIGN_OR_THROW(int64_t current_pos, sink_->Tell());
return current_pos - start_pos;
@@ -405,6 +408,9 @@ class SerializedPageWriter : public PageWriter {
std::shared_ptr<Encryptor> data_encryptor_;

std::shared_ptr<ResizableBuffer> encryption_buffer_;

std::map<Encoding::type, int32_t> dict_encoding_stats_;
std::map<Encoding::type, int32_t> data_encoding_stats_;
};

// This implementation of the PageWriter writes to the final sink on Close.
@@ -441,7 +447,8 @@ class BufferedPageWriter : public PageWriter {
metadata_->Finish(pager_->num_values(), dictionary_page_offset, -1,
pager_->data_page_offset() + final_position,
pager_->total_compressed_size(), pager_->total_uncompressed_size(),
has_dictionary, fallback, pager_->meta_encryptor_);
has_dictionary, fallback, pager_->dict_encoding_stats_,
pager_->data_encoding_stats_, pager_->meta_encryptor_);

// Write metadata at end of column chunk
metadata_->WriteTo(in_memory_sink_.get());
34 changes: 34 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
@@ -185,6 +185,31 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
{Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
ASSERT_EQ(encodings, expected);
}

std::vector<parquet::PageEncodingStats> encoding_stats =
this->metadata_encoding_stats();
if (this->type_num() == Type::BOOLEAN) {
ASSERT_EQ(encoding_stats[0].encoding, Encoding::PLAIN);
ASSERT_EQ(encoding_stats[0].page_type, PageType::DATA_PAGE);
} else if (version == ParquetVersion::PARQUET_1_0) {
std::vector<Encoding::type> expected(
{Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::PLAIN_DICTIONARY});
ASSERT_EQ(encoding_stats[0].encoding, expected[0]);
ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE);
for (size_t i = 1; i < encoding_stats.size(); i++) {
ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
}
} else {
std::vector<Encoding::type> expected(
{Encoding::PLAIN, Encoding::PLAIN, Encoding::RLE_DICTIONARY});
ASSERT_EQ(encoding_stats[0].encoding, expected[0]);
ASSERT_EQ(encoding_stats[0].page_type, PageType::DICTIONARY_PAGE);
for (size_t i = 1; i < encoding_stats.size(); i++) {
ASSERT_EQ(encoding_stats[i].encoding, expected[i]);
ASSERT_EQ(encoding_stats[i].page_type, PageType::DATA_PAGE);
}
}
}

void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
@@ -273,6 +298,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
return metadata_accessor->encodings();
}

std::vector<parquet::PageEncodingStats> metadata_encoding_stats() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
auto metadata_accessor =
ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
return metadata_accessor->encoding_stats();
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
51 changes: 42 additions & 9 deletions cpp/src/parquet/metadata.cc
@@ -214,6 +214,11 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
for (auto encoding : column_metadata_->encodings) {
encodings_.push_back(FromThrift(encoding));
}
for (auto encoding_stats : column_metadata_->encoding_stats) {
encoding_stats_.push_back({FromThrift(encoding_stats.page_type),
FromThrift(encoding_stats.encoding),
encoding_stats.count});
}
possible_stats_ = nullptr;
}
// column chunk
@@ -257,6 +262,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {

const std::vector<Encoding::type>& encodings() const { return encodings_; }

const std::vector<PageEncodingStats>& encoding_stats() const { return encoding_stats_; }

inline bool has_dictionary_page() const {
return column_metadata_->__isset.dictionary_page_offset;
}
@@ -293,6 +300,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
private:
mutable std::shared_ptr<Statistics> possible_stats_;
std::vector<Encoding::type> encodings_;
std::vector<PageEncodingStats> encoding_stats_;
const format::ColumnChunk* column_;
const format::ColumnMetaData* column_metadata_;
format::ColumnMetaData decrypted_metadata_;
@@ -367,6 +375,10 @@ const std::vector<Encoding::type>& ColumnChunkMetaData::encodings() const {
return impl_->encodings();
}

const std::vector<PageEncodingStats>& ColumnChunkMetaData::encoding_stats() const {
return impl_->encoding_stats();
}

int64_t ColumnChunkMetaData::total_uncompressed_size() const {
return impl_->total_uncompressed_size();
}
@@ -966,7 +978,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
void Finish(int64_t num_values, int64_t dictionary_page_offset,
int64_t index_page_offset, int64_t data_page_offset,
int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
bool dictionary_fallback, const std::shared_ptr<Encryptor>& encryptor) {
bool dictionary_fallback,
const std::map<Encoding::type, int32_t>& dict_encoding_stats,
const std::map<Encoding::type, int32_t>& data_encoding_stats,
const std::shared_ptr<Encryptor>& encryptor) {
if (dictionary_page_offset > 0) {
column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
@@ -1000,6 +1015,24 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
}
column_chunk_->meta_data.__set_encodings(thrift_encodings);
std::vector<format::PageEncodingStats> thrift_encoding_stats;
// Add dictionary page encoding stats
for (const auto& entry : dict_encoding_stats) {
format::PageEncodingStats dict_enc_stat;
dict_enc_stat.__set_page_type(format::PageType::DICTIONARY_PAGE);
dict_enc_stat.__set_encoding(ToThrift(entry.first));
dict_enc_stat.__set_count(entry.second);
thrift_encoding_stats.push_back(dict_enc_stat);
}
// Add data page encoding stats
for (const auto& entry : data_encoding_stats) {
format::PageEncodingStats data_enc_stat;
data_enc_stat.__set_page_type(format::PageType::DATA_PAGE);
data_enc_stat.__set_encoding(ToThrift(entry.first));
data_enc_stat.__set_count(entry.second);
thrift_encoding_stats.push_back(data_enc_stat);
}
column_chunk_->meta_data.__set_encoding_stats(thrift_encoding_stats);

const auto& encrypt_md =
properties_->column_encryption_properties(column_->path()->ToDotString());
@@ -1117,16 +1150,16 @@ void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
impl_->set_file_path(path);
}

void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
int64_t dictionary_page_offset,
int64_t index_page_offset,
int64_t data_page_offset, int64_t compressed_size,
int64_t uncompressed_size, bool has_dictionary,
bool dictionary_fallback,
const std::shared_ptr<Encryptor>& encryptor) {
void ColumnChunkMetaDataBuilder::Finish(
int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset,
int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size,
bool has_dictionary, bool dictionary_fallback,
const std::map<Encoding::type, int32_t>& dict_encoding_stats,
const std::map<Encoding::type, int32_t>& data_encoding_stats,
const std::shared_ptr<Encryptor>& encryptor) {
impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
compressed_size, uncompressed_size, has_dictionary, dictionary_fallback,
encryptor);
dict_encoding_stats, data_encoding_stats, encryptor);
}

void ColumnChunkMetaDataBuilder::WriteTo(::arrow::io::OutputStream* sink) {
11 changes: 11 additions & 0 deletions cpp/src/parquet/metadata.h
@@ -21,6 +21,7 @@
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/util/key_value_metadata.h"
@@ -119,6 +120,13 @@ class PARQUET_EXPORT ColumnCryptoMetaData {
std::unique_ptr<ColumnCryptoMetaDataImpl> impl_;
};

/// \brief Public struct for Thrift PageEncodingStats in ColumnChunkMetaData
struct PageEncodingStats {
PageType::type page_type;
Encoding::type encoding;
int32_t count;
};

class PARQUET_EXPORT ColumnChunkMetaData {
public:
// API convenience to get a MetaData accessor
@@ -150,6 +158,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
bool can_decompress() const;

const std::vector<Encoding::type>& encodings() const;
const std::vector<PageEncodingStats>& encoding_stats() const;
bool has_dictionary_page() const;
int64_t dictionary_page_offset() const;
int64_t data_page_offset() const;
@@ -320,6 +329,8 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
int64_t index_page_offset, int64_t data_page_offset,
int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
bool dictionary_fallback,
const std::map<Encoding::type, int32_t>& dict_encoding_stats_,
const std::map<Encoding::type, int32_t>& data_encoding_stats_,
const std::shared_ptr<Encryptor>& encryptor = NULLPTR);

// The metadata contents, suitable for passing to ColumnChunkMetaData::Make
20 changes: 15 additions & 5 deletions cpp/src/parquet/metadata_test.cc
@@ -35,18 +35,22 @@ std::unique_ptr<parquet::FileMetaData> GenerateTableMetaData(
EncodedStatistics stats_int, EncodedStatistics stats_float) {
auto f_builder = FileMetaDataBuilder::Make(&schema, props);
auto rg1_builder = f_builder->AppendRowGroup();

// Write the metadata
// rowgroup1 metadata
auto col1_builder = rg1_builder->NextColumnChunk();
auto col2_builder = rg1_builder->NextColumnChunk();
// column metadata
std::map<Encoding::type, int32_t> dict_encoding_stats({{Encoding::RLE_DICTIONARY, 1}});
std::map<Encoding::type, int32_t> data_encoding_stats(
{{Encoding::PLAIN, 1}, {Encoding::RLE, 1}});
stats_int.set_is_signed(true);
col1_builder->SetStatistics(stats_int);
stats_float.set_is_signed(true);
col2_builder->SetStatistics(stats_float);
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false, dict_encoding_stats,
data_encoding_stats);
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false, dict_encoding_stats,
data_encoding_stats);

rg1_builder->set_num_rows(nrows / 2);
rg1_builder->Finish(1024);
@@ -58,8 +62,10 @@ std::unique_ptr<parquet::FileMetaData> GenerateTableMetaData(
// column metadata
col1_builder->SetStatistics(stats_int);
col2_builder->SetStatistics(stats_float);
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false, dict_encoding_stats,
data_encoding_stats);
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false, dict_encoding_stats,
data_encoding_stats);

rg2_builder->set_num_rows(nrows / 2);
rg2_builder->Finish(1024);
@@ -155,6 +161,8 @@ TEST(Metadata, TestBuildAccess) {
ASSERT_EQ(24, rg1_column2->dictionary_page_offset());
ASSERT_EQ(10, rg1_column1->data_page_offset());
ASSERT_EQ(30, rg1_column2->data_page_offset());
ASSERT_EQ(3, rg1_column1->encoding_stats().size());
ASSERT_EQ(3, rg1_column2->encoding_stats().size());

auto rg2_accessor = f_accessors[loop_index]->RowGroup(1);
ASSERT_EQ(2, rg2_accessor->num_columns());
@@ -187,6 +195,8 @@ TEST(Metadata, TestBuildAccess) {
ASSERT_EQ(16, rg2_column2->dictionary_page_offset());
ASSERT_EQ(10, rg2_column1->data_page_offset());
ASSERT_EQ(26, rg2_column2->data_page_offset());
ASSERT_EQ(3, rg2_column1->encoding_stats().size());
ASSERT_EQ(3, rg2_column2->encoding_stats().size());

// Test FileMetaData::set_file_path
ASSERT_TRUE(rg2_column1->file_path().empty());
4 changes: 4 additions & 0 deletions cpp/src/parquet/thrift_internal.h
@@ -82,6 +82,10 @@ static inline Encoding::type FromThrift(format::Encoding::type type) {
return static_cast<Encoding::type>(type);
}

static inline PageType::type FromThrift(format::PageType::type type) {
return static_cast<PageType::type>(type);
}

static inline AadMetadata FromThrift(format::AesGcmV1 aesGcmV1) {
return AadMetadata{aesGcmV1.aad_prefix, aesGcmV1.aad_file_unique,
aesGcmV1.supply_aad_prefix};
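
With this change applied, the new field can be read back through the public metadata API. A minimal usage sketch (example.parquet is a hypothetical input file, not part of this change):

  #include <iostream>
  #include <memory>

  #include "parquet/file_reader.h"
  #include "parquet/metadata.h"

  int main() {
    // Open the file and walk down to the first column chunk's metadata.
    std::unique_ptr<parquet::ParquetFileReader> reader =
        parquet::ParquetFileReader::OpenFile("example.parquet");
    std::shared_ptr<parquet::FileMetaData> file_metadata = reader->metadata();
    std::unique_ptr<parquet::RowGroupMetaData> row_group = file_metadata->RowGroup(0);
    std::unique_ptr<parquet::ColumnChunkMetaData> column = row_group->ColumnChunk(0);
    // Print one line per (page_type, encoding) pair recorded by the writer.
    for (const parquet::PageEncodingStats& stats : column->encoding_stats()) {
      std::cout << "page_type=" << static_cast<int>(stats.page_type)
                << " encoding=" << static_cast<int>(stats.encoding)
                << " count=" << stats.count << std::endl;
    }
    return 0;
  }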
