Merge pull request #9290 from samansmink/parquet-schema-param-fix
Allow file_row_number with parquet schema option
Mytherin committed Oct 19, 2023
2 parents 8e698ae + d59baf7 commit 2f4425c
Showing 5 changed files with 47 additions and 4 deletions.
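
In short: the schema= parameter binds columns by field id, and generated columns such as file_row_number live outside that schema. Before this patch the two could not be combined: the reader filled column_mapping with global schema indices, which go wrong as soon as the projection drops or reorders columns. The patch (1) makes BindSchema append a synthetic BIGINT file_row_number column after the schema columns, (2) teaches InitializeParquetReader to recognize that column among the projected global column ids, and (3) switches column_mapping and constant_map over to the projection position i, so generated columns and default values land in the correct output slots.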
extension/parquet/include/parquet_reader.hpp (5 additions & 0 deletions)

@@ -111,6 +111,11 @@ class ParquetReader {
 	MultiFileReaderData reader_data;
 	unique_ptr<ColumnReader> root_reader;
 
+	//! Index of the file_row_number column
+	idx_t file_row_number_idx = DConstants::INVALID_INDEX;
+	//! Parquet schema for the generated columns
+	vector<duckdb_parquet::format::SchemaElement> generated_column_schema;
+
 public:
 	void InitializeScan(ParquetReaderScanState &state, vector<idx_t> groups_to_read);
 	void Scan(ParquetReaderScanState &state, DataChunk &output);
extension/parquet/parquet_extension.cpp (23 additions & 2 deletions)

@@ -185,6 +185,17 @@ static MultiFileReaderBindData BindSchema(ClientContext &context, vector<Logical
 	return_types = schema_col_types;
 	D_ASSERT(names.size() == return_types.size());
 
+	if (options.file_row_number) {
+		if (std::find(names.begin(), names.end(), "file_row_number") != names.end()) {
+			throw BinderException(
+			    "Using file_row_number option on file with column named file_row_number is not supported");
+		}
+
+		bind_data.file_row_number_idx = names.size();
+		return_types.emplace_back(LogicalType::BIGINT);
+		names.emplace_back("file_row_number");
+	}
+
 	return bind_data;
 }
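
Design note: the appended column's global index is simply names.size() at bind time, recorded in bind_data.file_row_number_idx. Because it is placed after every schema column, InitializeParquetReader below can detect generated columns with a plain global_column_index >= schema.size() check before comparing against this recorded index.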

@@ -234,11 +245,20 @@ static void InitializeParquetReader(ParquetReader &reader, const ParquetReadBind
 			continue;
 		}
 
+		// Handle any generated columns that are not in the schema (currently only file_row_number)
+		if (global_column_index >= parquet_options.schema.size()) {
+			if (bind_data.reader_bind.file_row_number_idx == global_column_index) {
+				reader_data.column_mapping.push_back(i);
+				reader_data.column_ids.push_back(reader.file_row_number_idx);
+			}
+			continue;
+		}
+
 		const auto &column_definition = parquet_options.schema[global_column_index];
 		auto it = field_id_to_column_index.find(column_definition.field_id);
 		if (it == field_id_to_column_index.end()) {
 			// field id not present in file, use default value
-			reader_data.constant_map.emplace_back(global_column_index, column_definition.default_value);
+			reader_data.constant_map.emplace_back(i, column_definition.default_value);
 			continue;
 		}

@@ -249,7 +269,7 @@
 			reader_data.cast_map[local_column_index] = column_definition.type;
 		}
 
-		reader_data.column_mapping.push_back(global_column_index);
+		reader_data.column_mapping.push_back(i);
 		reader_data.column_ids.push_back(local_column_index);
 	}
 	reader_data.empty_columns = reader_data.column_ids.empty();
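
Why push i rather than global_column_index? column_mapping records, for each column the reader actually produces, its slot in the output chunk; the two indices only coincide when the projection neither drops nor reorders columns. A self-contained sketch of the remapping under toy assumptions, not DuckDB code (the reader's file_row_number child index is faked as 99, and the file is assumed to contain field ids 1-3 only):

// Standalone sketch of the index remapping this hunk fixes. All values are
// toy stand-ins: four bound schema columns plus a generated file_row_number
// at global index 4; the projection is "SELECT file_row_number, i3".
#include <cstddef>
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
	const std::size_t schema_size = 4;         // i1, i3, i4, i5
	const std::size_t file_row_number_idx = 4; // appended by BindSchema
	const std::vector<std::size_t> global_column_ids = {4, 1};

	// field id -> column position inside the physical file (hypothetical:
	// the file contains field ids 1, 2, 3 only).
	const std::unordered_map<int, std::size_t> field_id_to_column_index = {{1, 0}, {2, 1}, {3, 2}};
	const std::vector<int> schema_field_ids = {1, 3, 4, 5};

	std::vector<std::size_t> column_mapping; // slot in the output chunk
	std::vector<std::size_t> column_ids;     // column inside the reader

	for (std::size_t i = 0; i < global_column_ids.size(); i++) {
		auto global_column_index = global_column_ids[i];
		if (global_column_index >= schema_size) {
			// generated column (currently only file_row_number)
			if (global_column_index == file_row_number_idx) {
				column_mapping.push_back(i); // the fix: projection position, not global index
				column_ids.push_back(99);    // stand-in for reader.file_row_number_idx
			}
			continue;
		}
		auto it = field_id_to_column_index.find(schema_field_ids[global_column_index]);
		if (it == field_id_to_column_index.end()) {
			// absent from the file: default value, now keyed by i as well
			continue;
		}
		column_mapping.push_back(i);
		column_ids.push_back(it->second);
	}

	// Prints "output slot 0 <- reader column 99" and "output slot 1 <- reader
	// column 2"; with the old push_back(global_column_index), the slots would
	// wrongly have been 4 and 1.
	for (std::size_t i = 0; i < column_mapping.size(); i++) {
		std::cout << "output slot " << column_mapping[i] << " <- reader column " << column_ids[i] << "\n";
	}
	return 0;
}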
@@ -384,6 +404,7 @@ class ParquetScanFunction {
 			// a schema was supplied
 			result->reader_bind = BindSchema(context, result->types, result->names, *result, parquet_options);
 		}
+
 		if (return_types.empty()) {
 			// no expected types - just copy the types
 			return_types = result->types;
extension/parquet/parquet_reader.cpp (5 additions & 2 deletions)

@@ -381,8 +381,11 @@ unique_ptr<ColumnReader> ParquetReader::CreateReader() {
 		root_struct_reader.child_readers[column_idx] = std::move(cast_reader);
 	}
 	if (parquet_options.file_row_number) {
-		root_struct_reader.child_readers.push_back(
-		    make_uniq<RowNumberColumnReader>(*this, LogicalType::BIGINT, SchemaElement(), next_file_idx, 0, 0));
+		file_row_number_idx = root_struct_reader.child_readers.size();
+
+		generated_column_schema.push_back(SchemaElement());
+		root_struct_reader.child_readers.push_back(make_uniq<RowNumberColumnReader>(
+		    *this, LogicalType::BIGINT, generated_column_schema.back(), next_file_idx, 0, 0));
 	}
 
 	return ret;
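
A likely reason for the new generated_column_schema member: as far as I can tell, DuckDB's ColumnReader stores a reference to the SchemaElement it receives, so the old code handed RowNumberColumnReader a reference to a temporary that was destroyed at the end of the statement. Note also that file_row_number_idx is read off child_readers.size() before the push_back, which is exactly the index the new reader will occupy. A minimal sketch of the hazard and the fix, with toy types standing in for DuckDB's:

// Toy illustration (not DuckDB's classes) of the lifetime issue the
// generated_column_schema vector appears to solve.
#include <iostream>
#include <memory>
#include <vector>

struct SchemaElement {
	int field_id = 0;
};

struct ColumnReader {
	const SchemaElement &schema; // readers keep a reference, not a copy
	explicit ColumnReader(const SchemaElement &schema_p) : schema(schema_p) {}
};

int main() {
	// Hazard: a temporary SchemaElement{} dies at the end of the full
	// expression, leaving the reader's stored reference dangling.
	// auto bad = std::make_unique<ColumnReader>(SchemaElement{}); // UB if used later

	// Fix: park the element in a container with a stable address owned by the
	// enclosing object and pass a reference to that. (Valid as long as the
	// vector is not resized afterwards; here exactly one element is pushed.)
	std::vector<SchemaElement> generated_column_schema;
	generated_column_schema.push_back(SchemaElement{});
	auto ok = std::make_unique<ColumnReader>(generated_column_schema.back());
	std::cout << ok->schema.field_id << "\n"; // safe: the vector outlives the reader
	return 0;
}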
src/include/duckdb/common/multi_file_reader.hpp (2 additions & 0 deletions)

@@ -40,6 +40,8 @@ struct MultiFileReaderBindData {
 	idx_t filename_idx = DConstants::INVALID_INDEX;
 	//! The set of hive partitioning indexes (if any)
 	vector<HivePartitioningIndex> hive_partitioning_indexes;
+	//! The index of the file_row_number column (if any)
+	idx_t file_row_number_idx = DConstants::INVALID_INDEX;
 
 	DUCKDB_API void Serialize(Serializer &serializer) const;
 	DUCKDB_API static MultiFileReaderBindData Deserialize(Deserializer &deserializer);
test/parquet/test_parquet_schema.test (12 additions & 0 deletions)

@@ -183,6 +183,18 @@ FROM read_parquet('__TEST_DIR__/integers.parquet', schema=map {
 ----
 5
 
+# projection still works, even with different generated columns
+query III
+SELECT file_row_number, filename[-16:], i4
+FROM read_parquet('__TEST_DIR__/integers.parquet', schema=map {
+    1: {name: 'i1', type: 'BIGINT', default_value: NULL},
+    3: {name: 'i3', type: 'BIGINT', default_value: NULL},
+    4: {name: 'i4', type: 'BIGINT', default_value: 2},
+    5: {name: 'i5', type: 'BIGINT', default_value: NULL}
+}, file_row_number=1, filename=1)
+----
+0	integers.parquet	2
+
 # count(*) still ok
 query I
 SELECT count(*)
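
The single expected row exercises all three column sources at once, in projected rather than bound order: the generated file_row_number (0), the filename constant, and i4 falling back to its default_value of 2, which suggests field id 4 is not present in integers.parquet.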
