From 38a58d2dc2875804eb0edde2497cb728daca3743 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Tue, 10 Oct 2023 11:43:02 +0200 Subject: [PATCH 1/5] fix to support file_row_number with parquet schema --- extension/parquet/include/parquet_reader.hpp | 3 ++ extension/parquet/parquet_extension.cpp | 29 +++++++++++++++++-- extension/parquet/parquet_reader.cpp | 1 + .../duckdb/common/multi_file_reader.hpp | 2 ++ 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/extension/parquet/include/parquet_reader.hpp b/extension/parquet/include/parquet_reader.hpp index 77d9500d52e..1cd1ce446c4 100644 --- a/extension/parquet/include/parquet_reader.hpp +++ b/extension/parquet/include/parquet_reader.hpp @@ -111,6 +111,9 @@ class ParquetReader { MultiFileReaderData reader_data; unique_ptr root_reader; + //! Index of the file_row_number column + idx_t file_row_number_idx = DConstants::INVALID_INDEX; + public: void InitializeScan(ParquetReaderScanState &state, vector groups_to_read); void Scan(ParquetReaderScanState &state, DataChunk &output); diff --git a/extension/parquet/parquet_extension.cpp b/extension/parquet/parquet_extension.cpp index a7eb6aef1a8..86a1ceac6b5 100644 --- a/extension/parquet/parquet_extension.cpp +++ b/extension/parquet/parquet_extension.cpp @@ -185,6 +185,21 @@ static MultiFileReaderBindData BindSchema(ClientContext &context, vector(context, result.files[0], options); + result.Initialize(std::move(reader)); + return bind_data; } @@ -234,11 +249,20 @@ static void InitializeParquetReader(ParquetReader &reader, const ParquetReadBind continue; } + // Handle any generate columns that are not in the schema (currently only file_row_number) + if (global_column_index >= parquet_options.schema.size()) { + if (bind_data.reader_bind.file_row_number_idx == global_column_index) { + reader_data.column_mapping.push_back(i); + reader_data.column_ids.push_back(reader.file_row_number_idx); + } + continue; + } + const auto &column_definition = parquet_options.schema[global_column_index]; auto it = field_id_to_column_index.find(column_definition.field_id); if (it == field_id_to_column_index.end()) { // field id not present in file, use default value - reader_data.constant_map.emplace_back(global_column_index, column_definition.default_value); + reader_data.constant_map.emplace_back(i, column_definition.default_value); continue; } @@ -249,7 +273,7 @@ static void InitializeParquetReader(ParquetReader &reader, const ParquetReadBind reader_data.cast_map[local_column_index] = column_definition.type; } - reader_data.column_mapping.push_back(global_column_index); + reader_data.column_mapping.push_back(i); reader_data.column_ids.push_back(local_column_index); } reader_data.empty_columns = reader_data.column_ids.empty(); @@ -384,6 +408,7 @@ class ParquetScanFunction { // a schema was supplied result->reader_bind = BindSchema(context, result->types, result->names, *result, parquet_options); } + if (return_types.empty()) { // no expected types - just copy the types return_types = result->types; diff --git a/extension/parquet/parquet_reader.cpp b/extension/parquet/parquet_reader.cpp index 9129bf766a4..2618bae5394 100644 --- a/extension/parquet/parquet_reader.cpp +++ b/extension/parquet/parquet_reader.cpp @@ -381,6 +381,7 @@ unique_ptr ParquetReader::CreateReader() { root_struct_reader.child_readers[column_idx] = std::move(cast_reader); } if (parquet_options.file_row_number) { + file_row_number_idx = root_struct_reader.child_readers.size(); root_struct_reader.child_readers.push_back( make_uniq(*this, LogicalType::BIGINT, SchemaElement(), next_file_idx, 0, 0)); } diff --git a/src/include/duckdb/common/multi_file_reader.hpp b/src/include/duckdb/common/multi_file_reader.hpp index d6a39a53f80..ca52810e8df 100644 --- a/src/include/duckdb/common/multi_file_reader.hpp +++ b/src/include/duckdb/common/multi_file_reader.hpp @@ -40,6 +40,8 @@ struct MultiFileReaderBindData { idx_t filename_idx = DConstants::INVALID_INDEX; //! The set of hive partitioning indexes (if any) vector hive_partitioning_indexes; + //! The index of the file_row_number column (if any) + idx_t file_row_number_idx = DConstants::INVALID_INDEX; DUCKDB_API void Serialize(Serializer &serializer) const; DUCKDB_API static MultiFileReaderBindData Deserialize(Deserializer &deserializer); From d3f1cebdebf7d1a86d2741a96e1b168b98f4b0f7 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Tue, 10 Oct 2023 12:01:54 +0200 Subject: [PATCH 2/5] add test for file_row_number with schema param --- test/parquet/test_parquet_schema.test | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/parquet/test_parquet_schema.test b/test/parquet/test_parquet_schema.test index 4be770610a3..a7712718a3d 100644 --- a/test/parquet/test_parquet_schema.test +++ b/test/parquet/test_parquet_schema.test @@ -183,6 +183,18 @@ FROM read_parquet('__TEST_DIR__/integers.parquet', schema=map { ---- 5 +# projection still, even with different generated columns +query III +SELECT file_row_number, filename[-16:], i4 +FROM read_parquet('__TEST_DIR__/integers.parquet', schema=map { + 1: {name: 'i1', type: 'BIGINT', default_value: NULL}, + 3: {name: 'i3', type: 'BIGINT', default_value: NULL}, + 4: {name: 'i4', type: 'BIGINT', default_value: 2}, + 5: {name: 'i5', type: 'BIGINT', default_value: NULL} + }, file_row_number=1, filename=1) +---- +0 integers.parquet 2 + # count(*) still ok query I SELECT count(*) From 3a91d5986c07d378d62f8bfdd9990d7f295cd4f2 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Tue, 10 Oct 2023 12:15:26 +0200 Subject: [PATCH 3/5] format --- extension/parquet/parquet_extension.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/parquet/parquet_extension.cpp b/extension/parquet/parquet_extension.cpp index 86a1ceac6b5..2eaadec0a52 100644 --- a/extension/parquet/parquet_extension.cpp +++ b/extension/parquet/parquet_extension.cpp @@ -250,7 +250,7 @@ static void InitializeParquetReader(ParquetReader &reader, const ParquetReadBind } // Handle any generate columns that are not in the schema (currently only file_row_number) - if (global_column_index >= parquet_options.schema.size()) { + if (global_column_index >= parquet_options.schema.size()) { if (bind_data.reader_bind.file_row_number_idx == global_column_index) { reader_data.column_mapping.push_back(i); reader_data.column_ids.push_back(reader.file_row_number_idx); From 6530e70e65062d35c988c8b6acb4fdb841bf04d2 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Wed, 18 Oct 2023 14:32:40 +0200 Subject: [PATCH 4/5] fix issue with passing a reference to destroyed object --- extension/parquet/include/parquet_reader.hpp | 2 ++ extension/parquet/parquet_extension.cpp | 4 ---- extension/parquet/parquet_reader.cpp | 4 +++- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/extension/parquet/include/parquet_reader.hpp b/extension/parquet/include/parquet_reader.hpp index 1cd1ce446c4..a7ffd59b863 100644 --- a/extension/parquet/include/parquet_reader.hpp +++ b/extension/parquet/include/parquet_reader.hpp @@ -113,6 +113,8 @@ class ParquetReader { //! Index of the file_row_number column idx_t file_row_number_idx = DConstants::INVALID_INDEX; + //! Parquet schema for the generated columns + vector generated_column_schema; public: void InitializeScan(ParquetReaderScanState &state, vector groups_to_read); diff --git a/extension/parquet/parquet_extension.cpp b/extension/parquet/parquet_extension.cpp index 2eaadec0a52..68f0179c33c 100644 --- a/extension/parquet/parquet_extension.cpp +++ b/extension/parquet/parquet_extension.cpp @@ -196,10 +196,6 @@ static MultiFileReaderBindData BindSchema(ClientContext &context, vector(context, result.files[0], options); - result.Initialize(std::move(reader)); - return bind_data; } diff --git a/extension/parquet/parquet_reader.cpp b/extension/parquet/parquet_reader.cpp index 2618bae5394..418ab40e732 100644 --- a/extension/parquet/parquet_reader.cpp +++ b/extension/parquet/parquet_reader.cpp @@ -382,8 +382,10 @@ unique_ptr ParquetReader::CreateReader() { } if (parquet_options.file_row_number) { file_row_number_idx = root_struct_reader.child_readers.size(); + + generated_column_schema.push_back(SchemaElement()); root_struct_reader.child_readers.push_back( - make_uniq(*this, LogicalType::BIGINT, SchemaElement(), next_file_idx, 0, 0)); + make_uniq(*this, LogicalType::BIGINT, generated_column_schema.back(), next_file_idx, 0, 0)); } return ret; From d59baf77543d9f17a1faee47eed9cb3444c1d9a3 Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Wed, 18 Oct 2023 14:39:04 +0200 Subject: [PATCH 5/5] format --- extension/parquet/parquet_reader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extension/parquet/parquet_reader.cpp b/extension/parquet/parquet_reader.cpp index 418ab40e732..842e5a6a959 100644 --- a/extension/parquet/parquet_reader.cpp +++ b/extension/parquet/parquet_reader.cpp @@ -384,8 +384,8 @@ unique_ptr ParquetReader::CreateReader() { file_row_number_idx = root_struct_reader.child_readers.size(); generated_column_schema.push_back(SchemaElement()); - root_struct_reader.child_readers.push_back( - make_uniq(*this, LogicalType::BIGINT, generated_column_schema.back(), next_file_idx, 0, 0)); + root_struct_reader.child_readers.push_back(make_uniq( + *this, LogicalType::BIGINT, generated_column_schema.back(), next_file_idx, 0, 0)); } return ret;