Allow file_row_number with parquet schema option #9290

Merged: 5 commits, merged on Oct 19, 2023 (changes shown from 3 commits)
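
In short: read_parquet can now combine the schema= option with the file_row_number (and filename) generated columns. A minimal sketch of the newly supported call, modeled on the test added at the bottom of this diff (the integers.parquet file and field id 4 are borrowed from that test):

SELECT file_row_number, i4
FROM read_parquet('integers.parquet', schema=map {
    4: {name: 'i4', type: 'BIGINT', default_value: 2}
}, file_row_number=1);
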
3 changes: 3 additions & 0 deletions extension/parquet/include/parquet_reader.hpp
@@ -111,6 +111,9 @@ class ParquetReader {
MultiFileReaderData reader_data;
unique_ptr<ColumnReader> root_reader;

//! Index of the file_row_number column
idx_t file_row_number_idx = DConstants::INVALID_INDEX;

public:
void InitializeScan(ParquetReaderScanState &state, vector<idx_t> groups_to_read);
void Scan(ParquetReaderScanState &state, DataChunk &output);
29 changes: 27 additions & 2 deletions extension/parquet/parquet_extension.cpp
@@ -185,6 +185,21 @@ static MultiFileReaderBindData BindSchema(ClientContext &context, vector<Logical
return_types = schema_col_types;
D_ASSERT(names.size() == return_types.size());

if (options.file_row_number) {
if (std::find(names.begin(), names.end(), "file_row_number") != names.end()) {
throw BinderException(
"Using file_row_number option on file with column named file_row_number is not supported");
}

bind_data.file_row_number_idx = names.size();
return_types.emplace_back(LogicalType::BIGINT);
names.emplace_back("file_row_number");
}

// Create initial reader
auto reader = make_shared<ParquetReader>(context, result.files[0], options);
result.Initialize(std::move(reader));

return bind_data;
}
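
For reference, the guard added above rejects the ambiguous case where the user-supplied schema itself contains a column named file_row_number. A hedged sketch of a query that should now fail at bind time (the file name is hypothetical):

-- expected error: Using file_row_number option on file with column named
-- file_row_number is not supported
SELECT *
FROM read_parquet('some_file.parquet', schema=map {
    1: {name: 'file_row_number', type: 'BIGINT', default_value: NULL}
}, file_row_number=1);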

@@ -234,11 +249,20 @@ static void InitializeParquetReader(ParquetReader &reader, const ParquetReadBind
continue;
}

// Handle any generated columns that are not in the schema (currently only file_row_number)
if (global_column_index >= parquet_options.schema.size()) {
if (bind_data.reader_bind.file_row_number_idx == global_column_index) {
reader_data.column_mapping.push_back(i);
reader_data.column_ids.push_back(reader.file_row_number_idx);
}
continue;
}

const auto &column_definition = parquet_options.schema[global_column_index];
auto it = field_id_to_column_index.find(column_definition.field_id);
if (it == field_id_to_column_index.end()) {
// field id not present in file, use default value
- reader_data.constant_map.emplace_back(global_column_index, column_definition.default_value);
+ reader_data.constant_map.emplace_back(i, column_definition.default_value);
continue;
}

@@ -249,7 +273,7 @@ static void InitializeParquetReader(ParquetReader &reader, const ParquetReadBind
reader_data.cast_map[local_column_index] = column_definition.type;
}

- reader_data.column_mapping.push_back(global_column_index);
+ reader_data.column_mapping.push_back(i);
reader_data.column_ids.push_back(local_column_index);
}
reader_data.empty_columns = reader_data.column_ids.empty();
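
The two mapping fixes in this hunk record the local projection slot i instead of the global schema position, so columns resolve correctly even when the projection reorders or drops schema columns. A sketch under the same assumptions as the new test, with the generated column placed mid-projection:

SELECT i4, file_row_number, filename
FROM read_parquet('integers.parquet', schema=map {
    4: {name: 'i4', type: 'BIGINT', default_value: 2}
}, file_row_number=1, filename=1);
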
@@ -384,6 +408,7 @@ class ParquetScanFunction {
// a schema was supplied
result->reader_bind = BindSchema(context, result->types, result->names, *result, parquet_options);
}

if (return_types.empty()) {
// no expected types - just copy the types
return_types = result->types;
1 change: 1 addition & 0 deletions extension/parquet/parquet_reader.cpp
@@ -381,6 +381,7 @@ unique_ptr<ColumnReader> ParquetReader::CreateReader() {
root_struct_reader.child_readers[column_idx] = std::move(cast_reader);
}
if (parquet_options.file_row_number) {
file_row_number_idx = root_struct_reader.child_readers.size();
root_struct_reader.child_readers.push_back(
make_uniq<RowNumberColumnReader>(*this, LogicalType::BIGINT, SchemaElement(), next_file_idx, 0, 0));
}
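
The new file_row_number_idx member records where the RowNumberColumnReader ends up among the root struct reader's children, so InitializeParquetReader can push it into column_ids (see the hunk above). For contrast, the option itself predates this PR, and plain usage without schema= is unchanged; a minimal sketch:

SELECT file_row_number, *
FROM read_parquet('integers.parquet', file_row_number=1);
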
2 changes: 2 additions & 0 deletions src/include/duckdb/common/multi_file_reader.hpp
@@ -40,6 +40,8 @@ struct MultiFileReaderBindData {
idx_t filename_idx = DConstants::INVALID_INDEX;
//! The set of hive partitioning indexes (if any)
vector<HivePartitioningIndex> hive_partitioning_indexes;
//! The index of the file_row_number column (if any)
idx_t file_row_number_idx = DConstants::INVALID_INDEX;

DUCKDB_API void Serialize(Serializer &serializer) const;
DUCKDB_API static MultiFileReaderBindData Deserialize(Deserializer &deserializer);
12 changes: 12 additions & 0 deletions test/parquet/test_parquet_schema.test
@@ -183,6 +183,18 @@ FROM read_parquet('__TEST_DIR__/integers.parquet', schema=map {
----
5

# projection still works, even with different generated columns
query III
SELECT file_row_number, filename[-16:], i4
FROM read_parquet('__TEST_DIR__/integers.parquet', schema=map {
1: {name: 'i1', type: 'BIGINT', default_value: NULL},
3: {name: 'i3', type: 'BIGINT', default_value: NULL},
4: {name: 'i4', type: 'BIGINT', default_value: 2},
5: {name: 'i5', type: 'BIGINT', default_value: NULL}
}, file_row_number=1, filename=1)
----
0 integers.parquet 2

# count(*) still ok
query I
SELECT count(*)