apacheGH-37111: [C++][Parquet] Dataset: Fixing Schema Cast (apache#37793)
### Rationale for this change

Parquet and Arrow each have their own schema representation:
1. Parquet has a `SchemaElement` [1]; it is language- and implementation-independent. Parquet Arrow extracts this schema and decodes it.
2. Parquet Arrow stores the Arrow schema (and, when present, `field_id`s) in `key_value_metadata` [2] when `store_schema` is enabled. When deserializing, it first parses the `SchemaElement`s [1], and if the self-describing `key_value_metadata` exists, it uses the `key_value_metadata` to override [1].

[1] https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L356
[2] https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L1033

The bug arises because, when the dataset module builds the `SchemaManifest`, it does not pass the `key_value_metadata` from the file `Metadata`, so the stored Arrow schema is never applied.

For duration columns, when `store_schema` is enabled, the writer stores the data with physical type `Int64` and records `::arrow::Duration` in `key_value_metadata`. Since there is no `equal(Duration, i64)` kernel, predicate evaluation fails with a NotImplemented error.

### What changes are included in this PR?

Pass the file's `key_value_metadata` to `SchemaManifest::Make` when the dataset builds the manifest.

### Are these changes tested?

Yes, a regression test is added in `file_parquet_test.cc`.

### Are there any user-facing changes?

No, this is a bug fix.

* Closes: apache#37111

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
mapleFU committed Sep 20, 2023
1 parent f52ebbb commit 008d277
Showing 2 changed files with 33 additions and 5 deletions.
7 changes: 4 additions & 3 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -104,11 +104,12 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
   return arrow_properties;
 }
 
-template <typename M>
 Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
-    const M& metadata, const parquet::ArrowReaderProperties& properties) {
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
   auto manifest = std::make_shared<SchemaManifest>();
-  const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata = nullptr;
+  const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata =
+      metadata.key_value_metadata();
   RETURN_NOT_OK(SchemaManifest::Make(metadata.schema(), key_value_metadata, properties,
                                      manifest.get()));
   return manifest;
31 changes: 29 additions & 2 deletions cpp/src/arrow/dataset/file_parquet_test.cc
@@ -65,11 +65,15 @@ class ParquetFormatHelper {
  public:
   using FormatType = ParquetFileFormat;
 
-  static Result<std::shared_ptr<Buffer>> Write(RecordBatchReader* reader) {
+  static Result<std::shared_ptr<Buffer>> Write(
+      RecordBatchReader* reader,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+          default_arrow_writer_properties()) {
     auto pool = ::arrow::default_memory_pool();
     std::shared_ptr<Buffer> out;
     auto sink = CreateOutputStream(pool);
-    RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink));
+    RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink, default_writer_properties(),
+                                         arrow_properties));
     return sink->Finish();
   }
   static std::shared_ptr<ParquetFileFormat> MakeFormat() {
@@ -703,6 +707,29 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringC
   CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a")));
 }
 
+TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingDurationColumn) {
+  // GH-37111: Parquet arrow stores writer schema and possible field_id in
+  // key_value_metadata when store_schema enabled. When storing `arrow::duration`, it will
+  // be stored as int64. This test ensures that dataset can parse the writer schema
+  // correctly.
+  auto table = TableFromJSON(schema({field("t", duration(TimeUnit::NANO))}),
+                             {
+                                 R"([{"t": 1}])",
+                                 R"([{"t": 2}, {"t": 3}])",
+                             });
+  TableBatchReader table_reader(*table);
+  ASSERT_OK_AND_ASSIGN(
+      auto buffer,
+      ParquetFormatHelper::Write(
+          &table_reader, ArrowWriterProperties::Builder().store_schema()->build()));
+  auto source = std::make_shared<FileSource>(buffer);
+  SetSchema({field("t", duration(TimeUnit::NANO))});
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  auto expr = equal(field_ref("t"), literal(::arrow::DurationScalar(1, TimeUnit::NANO)));
+  CountRowGroupsInFragment(fragment, {0}, expr);
+}
+
 // Tests projection with nested/indexed FieldRefs.
 // https://github.com/apache/arrow/issues/35579
 TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
