Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns #35351

Merged
merged 5 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,18 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
" columns, requested metadata for column: ", i);
}

std::vector<SortingColumn> sorting_columns() const {
std::vector<SortingColumn> sorting_columns;
if (!row_group_->__isset.sorting_columns) {
return sorting_columns;
}
sorting_columns.resize(row_group_->sorting_columns.size());
for (size_t i = 0; i < sorting_columns.size(); ++i) {
sorting_columns[i] = FromThrift(row_group_->sorting_columns[i]);
}
return sorting_columns;
}

private:
const format::RowGroup* row_group_;
const SchemaDescriptor* schema_;
Expand Down Expand Up @@ -571,6 +583,10 @@ bool RowGroupMetaData::can_decompress() const {
return true;
}

std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const {
return impl_->sorting_columns();
}

// file metadata
class FileMetaData::FileMetaDataImpl {
public:
Expand Down Expand Up @@ -1684,6 +1700,15 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
total_compressed_size += column_builders_[i]->total_compressed_size();
}

const auto& sorting_columns = properties_->sorting_columns();
if (!sorting_columns.empty()) {
std::vector<format::SortingColumn> thrift_sorting_columns(sorting_columns.size());
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
for (size_t i = 0; i < sorting_columns.size(); ++i) {
thrift_sorting_columns[i] = ToThrift(sorting_columns[i]);
}
row_group_->__set_sorting_columns(std::move(thrift_sorting_columns));
}

row_group_->__set_file_offset(file_offset);
row_group_->__set_total_compressed_size(total_compressed_size);
row_group_->__set_total_byte_size(total_bytes_written);
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ class PARQUET_EXPORT RowGroupMetaData {
const SchemaDescriptor* schema() const;
// Indicate if all of the RowGroup's ColumnChunks can be decompressed.
bool can_decompress() const;
// Sorting columns of the row group if any.
std::vector<SortingColumn> sorting_columns() const;

private:
explicit RowGroupMetaData(
Expand Down
43 changes: 43 additions & 0 deletions cpp/src/parquet/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,49 @@ TEST(Metadata, TestReadPageIndex) {
}
}

TEST(Metadata, TestSortingColumns) {
schema::NodeVector fields;
fields.push_back(schema::Int32("sort_col", Repetition::REQUIRED));
fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));

auto schema = std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));

std::vector<SortingColumn> sorting_columns;
{
SortingColumn sorting_column;
sorting_column.column_idx = 0;
sorting_column.descending = false;
sorting_column.nulls_first = false;
sorting_columns.push_back(sorting_column);
}

auto sink = CreateOutputStream();
auto writer_props = parquet::WriterProperties::Builder()
.disable_dictionary()
->set_sorting_columns(sorting_columns)
->build();

EXPECT_EQ(sorting_columns, writer_props->sorting_columns());

auto file_writer = parquet::ParquetFileWriter::Open(sink, schema, writer_props);

auto row_group_writer = file_writer->AppendBufferedRowGroup();
row_group_writer->Close();
file_writer->Close();

PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);

ASSERT_NE(nullptr, file_reader->metadata());
ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
auto row_group_reader = file_reader->RowGroup(0);
auto* row_group_read_metadata = row_group_reader->metadata();
ASSERT_NE(nullptr, row_group_read_metadata);
EXPECT_EQ(sorting_columns, row_group_read_metadata->sorting_columns());
}

TEST(ApplicationVersion, Basics) {
ApplicationVersion version("parquet-mr version 1.7.9");
ApplicationVersion version1("parquet-mr version 1.8.0");
Expand Down
25 changes: 23 additions & 2 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ class PARQUET_EXPORT WriterProperties {
return this->enable_statistics(path->ToDotString());
}

/// Define the sorting columns.
/// Default empty.
///
/// If sorting columns are set, user should ensure that records
/// are sorted by sorting columns. Otherwise, the storing data
/// will be inconsistent with sorting_columns metadata.
Builder* set_sorting_columns(std::vector<SortingColumn> sorting_columns) {
sorting_columns_ = std::move(sorting_columns);
return this;
}

/// Disable statistics for the column specified by `path`.
/// Default enabled.
Builder* disable_statistics(const std::string& path) {
Expand Down Expand Up @@ -578,7 +589,8 @@ class PARQUET_EXPORT WriterProperties {
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
column_properties, data_page_version_, store_decimal_as_integer_));
column_properties, data_page_version_, store_decimal_as_integer_,
std::move(sorting_columns_)));
}

private:
Expand All @@ -595,6 +607,9 @@ class PARQUET_EXPORT WriterProperties {

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

// If empty, there is no sorting columns.
std::vector<SortingColumn> sorting_columns_;

// Settings used for each column unless overridden in any of the maps below
ColumnProperties default_column_properties_;
std::unordered_map<std::string, Encoding::type> encodings_;
Expand Down Expand Up @@ -666,6 +681,8 @@ class PARQUET_EXPORT WriterProperties {
return column_properties(path).dictionary_enabled();
}

const std::vector<SortingColumn>& sorting_columns() const { return sorting_columns_; }

bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).statistics_enabled();
}
Expand Down Expand Up @@ -711,7 +728,8 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer)
ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
std::vector<SortingColumn> sorting_columns)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
Expand All @@ -723,6 +741,7 @@ class PARQUET_EXPORT WriterProperties {
store_decimal_as_integer_(store_short_decimal_as_integer),
page_checksum_enabled_(page_write_checksum_enabled),
file_encryption_properties_(file_encryption_properties),
sorting_columns_(std::move(sorting_columns)),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}

Expand All @@ -739,6 +758,8 @@ class PARQUET_EXPORT WriterProperties {

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

std::vector<SortingColumn> sorting_columns_;

ColumnProperties default_column_properties_;
std::unordered_map<std::string, ColumnProperties> column_properties_;
};
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/parquet/thrift_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ static inline EncryptionAlgorithm FromThrift(format::EncryptionAlgorithm encrypt
return encryption_algorithm;
}

static inline SortingColumn FromThrift(format::SortingColumn thrift_sorting_column) {
SortingColumn sorting_column;
sorting_column.column_idx = thrift_sorting_column.column_idx;
sorting_column.nulls_first = thrift_sorting_column.nulls_first;
sorting_column.descending = thrift_sorting_column.descending;
return sorting_column;
}

// ----------------------------------------------------------------------
// Convert Thrift enums from Parquet enums

Expand Down Expand Up @@ -307,6 +315,14 @@ static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) {
}
}

static inline format::SortingColumn ToThrift(SortingColumn sorting_column) {
format::SortingColumn thrift_sorting_column;
thrift_sorting_column.column_idx = sorting_column.column_idx;
thrift_sorting_column.descending = sorting_column.descending;
thrift_sorting_column.nulls_first = sorting_column.nulls_first;
return thrift_sorting_column;
}

static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
format::Statistics statistics;
if (stats.has_min) {
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,27 @@ struct BoundaryOrder {
};
};

/// \brief SortingColumn is a proxy around format::SortingColumn.
struct PARQUET_EXPORT SortingColumn {
// The column index (in this row group)
int32_t column_idx;

// If true, indicates this column is sorted in descending order.
bool descending;

// If true, nulls will come before non-null values, otherwise, nulls go at the end.
bool nulls_first;
};

inline bool operator==(const SortingColumn& left, const SortingColumn& right) {
return left.nulls_first == right.nulls_first && left.descending == right.descending &&
left.column_idx == right.column_idx;
}

inline bool operator!=(const SortingColumn& left, const SortingColumn& right) {
return !(left == right);
}

// ----------------------------------------------------------------------

struct ByteArray {
Expand Down