Skip to content

Commit

Permalink
GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns (#…
Browse files Browse the repository at this point in the history
…35351)

### Rationale for this change

Allow read/set SortColumns in C++ parquet. Node that currently we didn't check sort columns, so user should ensure
that records don't violates the order

### What changes are included in this PR?

For RowGroupMetadata, add a SortColumns interface

### Are these changes tested?

* [x] tests

### Are there any user-facing changes?

User can read sort columns in the future

* Closes: #35331

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
mapleFU committed May 1, 2023
1 parent 3b48834 commit da6dbd4
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 2 deletions.
25 changes: 25 additions & 0 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,18 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
" columns, requested metadata for column: ", i);
}

std::vector<SortingColumn> sorting_columns() const {
std::vector<SortingColumn> sorting_columns;
if (!row_group_->__isset.sorting_columns) {
return sorting_columns;
}
sorting_columns.resize(row_group_->sorting_columns.size());
for (size_t i = 0; i < sorting_columns.size(); ++i) {
sorting_columns[i] = FromThrift(row_group_->sorting_columns[i]);
}
return sorting_columns;
}

private:
const format::RowGroup* row_group_;
const SchemaDescriptor* schema_;
Expand Down Expand Up @@ -571,6 +583,10 @@ bool RowGroupMetaData::can_decompress() const {
return true;
}

std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const {
return impl_->sorting_columns();
}

// file metadata
class FileMetaData::FileMetaDataImpl {
public:
Expand Down Expand Up @@ -1684,6 +1700,15 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
total_compressed_size += column_builders_[i]->total_compressed_size();
}

const auto& sorting_columns = properties_->sorting_columns();
if (!sorting_columns.empty()) {
std::vector<format::SortingColumn> thrift_sorting_columns(sorting_columns.size());
for (size_t i = 0; i < sorting_columns.size(); ++i) {
thrift_sorting_columns[i] = ToThrift(sorting_columns[i]);
}
row_group_->__set_sorting_columns(std::move(thrift_sorting_columns));
}

row_group_->__set_file_offset(file_offset);
row_group_->__set_total_compressed_size(total_compressed_size);
row_group_->__set_total_byte_size(total_bytes_written);
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ class PARQUET_EXPORT RowGroupMetaData {
const SchemaDescriptor* schema() const;
// Indicate if all of the RowGroup's ColumnChunks can be decompressed.
bool can_decompress() const;
// Sorting columns of the row group if any.
std::vector<SortingColumn> sorting_columns() const;

private:
explicit RowGroupMetaData(
Expand Down
43 changes: 43 additions & 0 deletions cpp/src/parquet/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,49 @@ TEST(Metadata, TestReadPageIndex) {
}
}

TEST(Metadata, TestSortingColumns) {
schema::NodeVector fields;
fields.push_back(schema::Int32("sort_col", Repetition::REQUIRED));
fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));

auto schema = std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));

std::vector<SortingColumn> sorting_columns;
{
SortingColumn sorting_column;
sorting_column.column_idx = 0;
sorting_column.descending = false;
sorting_column.nulls_first = false;
sorting_columns.push_back(sorting_column);
}

auto sink = CreateOutputStream();
auto writer_props = parquet::WriterProperties::Builder()
.disable_dictionary()
->set_sorting_columns(sorting_columns)
->build();

EXPECT_EQ(sorting_columns, writer_props->sorting_columns());

auto file_writer = parquet::ParquetFileWriter::Open(sink, schema, writer_props);

auto row_group_writer = file_writer->AppendBufferedRowGroup();
row_group_writer->Close();
file_writer->Close();

PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);

ASSERT_NE(nullptr, file_reader->metadata());
ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
auto row_group_reader = file_reader->RowGroup(0);
auto* row_group_read_metadata = row_group_reader->metadata();
ASSERT_NE(nullptr, row_group_read_metadata);
EXPECT_EQ(sorting_columns, row_group_read_metadata->sorting_columns());
}

TEST(ApplicationVersion, Basics) {
ApplicationVersion version("parquet-mr version 1.7.9");
ApplicationVersion version1("parquet-mr version 1.8.0");
Expand Down
25 changes: 23 additions & 2 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ class PARQUET_EXPORT WriterProperties {
return this->enable_statistics(path->ToDotString());
}

/// Define the sorting columns.
/// Default empty.
///
/// If sorting columns are set, user should ensure that records
/// are sorted by sorting columns. Otherwise, the storing data
/// will be inconsistent with sorting_columns metadata.
Builder* set_sorting_columns(std::vector<SortingColumn> sorting_columns) {
sorting_columns_ = std::move(sorting_columns);
return this;
}

/// Disable statistics for the column specified by `path`.
/// Default enabled.
Builder* disable_statistics(const std::string& path) {
Expand Down Expand Up @@ -578,7 +589,8 @@ class PARQUET_EXPORT WriterProperties {
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
column_properties, data_page_version_, store_decimal_as_integer_));
column_properties, data_page_version_, store_decimal_as_integer_,
std::move(sorting_columns_)));
}

private:
Expand All @@ -595,6 +607,9 @@ class PARQUET_EXPORT WriterProperties {

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

// If empty, there is no sorting columns.
std::vector<SortingColumn> sorting_columns_;

// Settings used for each column unless overridden in any of the maps below
ColumnProperties default_column_properties_;
std::unordered_map<std::string, Encoding::type> encodings_;
Expand Down Expand Up @@ -666,6 +681,8 @@ class PARQUET_EXPORT WriterProperties {
return column_properties(path).dictionary_enabled();
}

const std::vector<SortingColumn>& sorting_columns() const { return sorting_columns_; }

bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).statistics_enabled();
}
Expand Down Expand Up @@ -711,7 +728,8 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer)
ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
std::vector<SortingColumn> sorting_columns)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
Expand All @@ -723,6 +741,7 @@ class PARQUET_EXPORT WriterProperties {
store_decimal_as_integer_(store_short_decimal_as_integer),
page_checksum_enabled_(page_write_checksum_enabled),
file_encryption_properties_(file_encryption_properties),
sorting_columns_(std::move(sorting_columns)),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}

Expand All @@ -739,6 +758,8 @@ class PARQUET_EXPORT WriterProperties {

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

std::vector<SortingColumn> sorting_columns_;

ColumnProperties default_column_properties_;
std::unordered_map<std::string, ColumnProperties> column_properties_;
};
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/parquet/thrift_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ static inline EncryptionAlgorithm FromThrift(format::EncryptionAlgorithm encrypt
return encryption_algorithm;
}

static inline SortingColumn FromThrift(format::SortingColumn thrift_sorting_column) {
SortingColumn sorting_column;
sorting_column.column_idx = thrift_sorting_column.column_idx;
sorting_column.nulls_first = thrift_sorting_column.nulls_first;
sorting_column.descending = thrift_sorting_column.descending;
return sorting_column;
}

// ----------------------------------------------------------------------
// Convert Thrift enums from Parquet enums

Expand Down Expand Up @@ -307,6 +315,14 @@ static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) {
}
}

static inline format::SortingColumn ToThrift(SortingColumn sorting_column) {
format::SortingColumn thrift_sorting_column;
thrift_sorting_column.column_idx = sorting_column.column_idx;
thrift_sorting_column.descending = sorting_column.descending;
thrift_sorting_column.nulls_first = sorting_column.nulls_first;
return thrift_sorting_column;
}

static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
format::Statistics statistics;
if (stats.has_min) {
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,27 @@ struct BoundaryOrder {
};
};

/// \brief SortingColumn is a proxy around format::SortingColumn.
struct PARQUET_EXPORT SortingColumn {
// The column index (in this row group)
int32_t column_idx;

// If true, indicates this column is sorted in descending order.
bool descending;

// If true, nulls will come before non-null values, otherwise, nulls go at the end.
bool nulls_first;
};

inline bool operator==(const SortingColumn& left, const SortingColumn& right) {
return left.nulls_first == right.nulls_first && left.descending == right.descending &&
left.column_idx == right.column_idx;
}

inline bool operator!=(const SortingColumn& left, const SortingColumn& right) {
return !(left == right);
}

// ----------------------------------------------------------------------

struct ByteArray {
Expand Down

0 comments on commit da6dbd4

Please sign in to comment.