Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/parquet/column_writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,24 @@ void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compressio
this->SyncValuesOut();
}

template <>
void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compression,
int64_t num_rows) {
this->SetupValuesOut(num_rows);
this->ReadColumnFully(compression);
std::shared_ptr<CompareDefault<Int96Type>> compare;
compare = std::make_shared<CompareDefaultInt96>();
for (size_t i = 0; i < this->values_.size(); i++) {
if ((*compare)(this->values_[i], this->values_out_[i]) ||
(*compare)(this->values_out_[i], this->values_[i])) {
std::cout << "Failed at " << i << std::endl;
}
ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
}
ASSERT_EQ(this->values_, this->values_out_);
}

template <>
void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
int64_t total_values = static_cast<int64_t>(this->values_out_.size());
Expand Down
2 changes: 1 addition & 1 deletion src/parquet/metadata-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ TEST(ApplicationVersion, Basics) {

ASSERT_EQ(true, version.VersionLt(version1));

ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::SIGNED));
ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::UNKNOWN));
ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED));
ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
Expand Down
47 changes: 41 additions & 6 deletions src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const ApplicationVersion ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION =
template <typename DType>
static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
// If new fields max_value/min_value are set, then return them.
if (metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value) {
// If ColumnOrder is defined, return max_value and min_value
if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
return std::make_shared<TypedRowGroupStatistics<DType>>(
descr, metadata.statistics.min_value, metadata.statistics.max_value,
metadata.num_values - metadata.statistics.null_count,
Expand Down Expand Up @@ -310,6 +310,7 @@ class FileMetaData::FileMetaDataImpl {
}

InitSchema();
InitColumnOrders();
InitKeyValueMetadata();
}
~FileMetaDataImpl() {}
Expand Down Expand Up @@ -357,6 +358,23 @@ class FileMetaData::FileMetaDataImpl {
static_cast<int>(metadata_->schema.size()));
schema_.Init(converter.Convert());
}
void InitColumnOrders() {
// update ColumnOrder
std::vector<parquet::ColumnOrder> column_orders;
if (metadata_->__isset.column_orders) {
for (auto column_order : metadata_->column_orders) {
if (column_order.__isset.TYPE_ORDER) {
column_orders.push_back(ColumnOrder::type_defined_);
} else {
column_orders.push_back(ColumnOrder::undefined_);
}
}
} else {
column_orders.resize(schema_.num_columns(), ColumnOrder::undefined_);
}

schema_.updateColumnOrders(column_orders);
}
SchemaDescriptor schema_;
ApplicationVersion writer_version_;

Expand Down Expand Up @@ -495,10 +513,9 @@ bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
// Parquet cpp version 1.3.0 onwards stats are computed correctly for all types
if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION))) {
// Only SIGNED are valid
if (SortOrder::SIGNED != sort_order) return false;

// None of the current tools write INT96 Statistics correctly
if (col_type == Type::INT96) return false;
if (SortOrder::SIGNED != sort_order) {
return false;
}

// Statistics of other types are OK
if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
Expand All @@ -511,6 +528,11 @@ bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
return true;
}

// Unknown sort order has incorrect stats
if (SortOrder::UNKNOWN == sort_order) {
return false;
}

// PARQUET-251
if (VersionLt(PARQUET_251_FIXED_VERSION)) {
return false;
Expand Down Expand Up @@ -808,6 +830,19 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
}
metadata_->__set_version(file_version);
metadata_->__set_created_by(properties_->created_by());

// Users cannot set the `ColumnOrder` since we donot not have user defined sort order
// in the spec yet.
// We always default to `TYPE_DEFINED_ORDER`. We can expose it in
// the API once we have user defined sort orders in the Parquet format.
// TypeDefinedOrder implies choose SortOrder based on LogicalType/PhysicalType
format::TypeDefinedOrder type_defined_order;
format::ColumnOrder column_order;
column_order.__set_TYPE_ORDER(type_defined_order);
column_order.__isset.TYPE_ORDER = true;
metadata_->column_orders.resize(schema_->num_columns(), column_order);
metadata_->__isset.column_orders = true;

parquet::schema::SchemaFlattener flattener(
static_cast<parquet::schema::GroupNode*>(schema_->schema_root().get()),
&metadata_->schema);
Expand Down
Loading