7 changes: 6 additions & 1 deletion src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp
@@ -133,7 +133,12 @@ std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElem
         return element.name;
     const auto & map = column_mapper->getFieldIdToClickHouseName();
     if (!element.__isset.field_id)
-        throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Missing field_id for column {}", element.name);
+    {
+        /// Does Iceberg require that parquet files have field ids?
+        /// Our Iceberg writer currently doesn't write them.
+        //throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Missing field_id for column {}", element.name);
+        return element.name;
+    }
     auto it = map.find(element.field_id);
     if (it == map.end())
         throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id);
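To make the new fallback concrete, here is a small self-contained sketch of the behaviour this hunk introduces: a missing field_id now falls back to the column name stored in the parquet file instead of throwing, while a field_id unknown to the datalake metadata is still an error. The SchemaElement struct and resolveColumnName function below are simplified stand-ins, not the real parq::SchemaElement or ColumnMapper API:

    #include <cstdint>
    #include <map>
    #include <optional>
    #include <stdexcept>
    #include <string>

    /// Simplified stand-in for parq::SchemaElement.
    struct SchemaElement
    {
        std::string name;
        std::optional<int32_t> field_id; /// unset when the writer didn't emit field ids
    };

    /// Mirrors the new logic in useColumnMapperIfNeeded.
    std::string resolveColumnName(const SchemaElement & element, const std::map<int32_t, std::string> & field_id_to_name)
    {
        if (!element.field_id)
            return element.name; /// no field_id: trust the file's own column name
        auto it = field_id_to_name.find(*element.field_id);
        if (it == field_id_to_name.end())
            throw std::runtime_error("column " + element.name + ": field_id not in datalake metadata");
        return it->second; /// the current ClickHouse-side name for this field
    }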
22 changes: 20 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -203,7 +203,26 @@ StorageObjectStorage::StorageObjectStorage(
             sample_path);
     }
 
-    supports_prewhere = FormatFactory::instance().checkIfFormatSupportsPrewhere(configuration->getFormat(), context, format_settings);
+    /// TODO: Known problems with datalake prewhere:
+    /// * If the Iceberg table went through schema evolution, columns read from the file may
+    ///   need to be renamed or typecast before applying prewhere. There's already a mechanism
+    ///   for telling the parquet reader to rename columns (ColumnMapper), and the reader
+    ///   already casts columns to the requested types. But, oddly, the Iceberg reader uses
+    ///   those mechanisms to request the *old* name and type of each column, then renames and
+    ///   casts in a separate step outside the parquet reader. We should probably change this
+    ///   and delete that additional code (first sketch below).
+    /// * Delta Lake can have "partition columns": columns with constant values specified in
+    ///   the metadata and not present in the parquet file. Like hive partitioning, but in
+    ///   metadata files instead of the path. Currently these columns are added to the block
+    ///   outside the parquet reader, so if one appears in the prewhere expression, the reader
+    ///   fails with a "no column in block" error. Unlike hive partitioning, we can't (?) just
+    ///   return these columns from supportedPrewhereColumns(), because the Delta Lake metadata
+    ///   hasn't been read yet at the time of that call. So we should probably pass these
+    ///   columns to the parquet reader instead of adding them outside (second sketch below).
+    /// * There's a bug in StorageObjectStorageSource::createReader: it copies FormatFilterInfo
+    ///   but, for no apparent reason, unsets prewhere_info and row_level_filter_info. It should
+    ///   probably just copy those fields like the others (third sketch below).
+    supports_prewhere = !configuration->isDataLakeConfiguration() && FormatFactory::instance().checkIfFormatSupportsPrewhere(configuration->getFormat(), context, format_settings);
 
     StorageInMemoryMetadata metadata;
     metadata.setColumns(columns);
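The first TODO item suggests asking the parquet reader for the evolved name and type up front, so no separate rename/cast step is needed afterwards. A minimal sketch of that idea; RequestedColumn, makeRequest, and the example columns are hypothetical, not the real ColumnMapper API:

    #include <cstdint>
    #include <string>
    #include <vector>

    /// Hypothetical, simplified request descriptor.
    struct RequestedColumn
    {
        int32_t field_id;        /// stable id from Iceberg metadata
        std::string output_name; /// *current* name after schema evolution
        std::string output_type; /// *current* type; the reader casts while decoding
    };

    /// Proposed flow: the reader produces evolved names/types directly.
    std::vector<RequestedColumn> makeRequest()
    {
        return {
            {1, "user_id", "UInt64"},     /// was "id" / Int32 before evolution
            {2, "signup_date", "Date32"}, /// was "date" / Date before evolution
        };
    }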
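For the second item, a sketch of what "pass these columns to the parquet reader" could look like: if the reader itself materializes the constant partition columns from the Delta Lake metadata, a prewhere expression referencing them can be evaluated inside the reader instead of failing. The Chunk alias and addPartitionColumns are toy stand-ins for ClickHouse's Block machinery:

    #include <cstddef>
    #include <map>
    #include <string>
    #include <vector>

    /// Toy stand-in for a decoded chunk: column name -> per-row values.
    using Chunk = std::map<std::string, std::vector<std::string>>;

    /// Materialize the constant "partition columns" inside the reader, before
    /// prewhere runs, so "no column in block" can no longer happen.
    void addPartitionColumns(Chunk & chunk, const std::map<std::string, std::string> & partition_values, size_t num_rows)
    {
        for (const auto & [name, value] : partition_values)
            if (!chunk.contains(name))
                chunk.emplace(name, std::vector<std::string>(num_rows, value));
    }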
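And for the third item, the suggested fix is simply to stop resetting the two fields when copying. A trimmed-down sketch; only the two fields named in the comment are shown, and the clone helper is hypothetical:

    #include <memory>

    struct PrewhereInfo;        /// opaque here; real definitions live elsewhere
    struct RowLevelFilterInfo;

    /// Trimmed-down stand-in for FormatFilterInfo.
    struct FormatFilterInfo
    {
        std::shared_ptr<PrewhereInfo> prewhere_info;
        std::shared_ptr<RowLevelFilterInfo> row_level_filter_info;
        /// ... other fields, copied as-is ...

        FormatFilterInfo clone() const
        {
            FormatFilterInfo copy = *this; /// copies all fields, shared_ptrs included
            /// The current code reportedly then does something like:
            ///     copy.prewhere_info = nullptr;
            ///     copy.row_level_filter_info = nullptr;
            /// Proposed fix: don't; keep those fields like the others.
            return copy;
        }
    };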
@@ -707,4 +726,3 @@ void StorageObjectStorage::checkAlterIsPossible(const AlterCommands & commands,
 }
 
 }
-
3 changes: 2 additions & 1 deletion tests/integration/test_storage_iceberg/test.py
@@ -875,7 +875,8 @@ def test_position_deletes_out_of_order(started_cluster, use_roaring_bitmaps):
 
     create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, additional_settings=["input_format_parquet_use_native_reader_v3=1", f"use_roaring_bitmap_iceberg_positional_deletes={use_roaring_bitmaps}"])
 
-    assert get_array(instance.query(f"SELECT id FROM {TABLE_NAME} PREWHERE NOT sleepEachRow(1/100) order by id")) == list(range(10, 103)) + [104]
+    # TODO: Replace WHERE with PREWHERE when we add prewhere support for datalakes.
+    assert get_array(instance.query(f"SELECT id FROM {TABLE_NAME} WHERE NOT sleepEachRow(1/100) order by id")) == list(range(10, 103)) + [104]
 
     instance.query(f"DROP TABLE {TABLE_NAME}")
 