From 933716e1b7fe934acfd5d4465c2a4c72fb57708a Mon Sep 17 00:00:00 2001
From: Renaud Bourassa
Date: Tue, 21 Apr 2026 16:51:35 -0400
Subject: [PATCH 1/2] feat: Add support for writing bloom filters

---
 pyiceberg/io/pyarrow.py                       | 146 ++++++++++++++++++-
 pyiceberg/table/__init__.py                   |   4 +
 pyiceberg/utils/properties.py                 |  35 +++++
 tests/integration/test_writes/test_writes.py  |  28 ++++
 tests/table/test_metadata.py                  |  36 +++++
 5 files changed, 243 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 8f22261f5d..f543901530 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -180,7 +180,12 @@ from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
 from pyiceberg.utils.decimal import unscaled_to_decimal
-from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
+from pyiceberg.utils.properties import (
+    get_first_property_value,
+    property_as_bool,
+    property_as_float,
+    property_as_int,
+)
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

@@ -2473,6 +2478,120 @@ def parquet_path_to_id_mapping(
     return result


+def id_to_parquet_path_mapping(schema: Schema) -> dict[int, str]:
+    """
+    Compute the mapping of Iceberg column ID to parquet column path.
+
+    Args:
+        schema (pyiceberg.schema.Schema): The current table schema.
+    """
+    result: dict[int, str] = {}
+    for pair in pre_order_visit(schema, ID2ParquetPathVisitor()):
+        result[pair.field_id] = pair.parquet_path
+    return result
+
+
+@dataclass(frozen=True)
+class BloomFilterOptions:
+    parquet_path: str
+    ndv: int | None
+    fpp: float | None
+
+
+class BloomFilterOptionsCollector(PreOrderSchemaVisitor[list[BloomFilterOptions]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: dict[str, str]
+
+    def __init__(self, schema: Schema, properties: dict[str, str], id_to_parquet_path_mapping: dict[int, str]):
+        self._schema = schema
+        self._properties = properties
+        self._id_to_parquet_path_mapping = id_to_parquet_path_mapping
+
+    def schema(
+        self, schema: Schema, struct_result: Callable[[], builtins.list[BloomFilterOptions]]
+    ) -> builtins.list[BloomFilterOptions]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: builtins.list[Callable[[], builtins.list[BloomFilterOptions]]]
+    ) -> builtins.list[BloomFilterOptions]:
+        return list(itertools.chain.from_iterable(result() for result in field_results))
+
+    def field(
+        self, field: NestedField, field_result: Callable[[], builtins.list[BloomFilterOptions]]
+    ) -> builtins.list[BloomFilterOptions]:
+        self._field_id = field.field_id
+        return field_result()
+
+    def list(
+        self, list_type: ListType, element_result: Callable[[], builtins.list[BloomFilterOptions]]
+    ) -> builtins.list[BloomFilterOptions]:
+        self._field_id = list_type.element_id
+        return element_result()
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], builtins.list[BloomFilterOptions]],
+        value_result: Callable[[], builtins.list[BloomFilterOptions]],
+    ) -> builtins.list[BloomFilterOptions]:
+        self._field_id = map_type.key_id
+        k = key_result()
+        self._field_id = map_type.value_id
+        v = value_result()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> builtins.list[BloomFilterOptions]:
+        from pyiceberg.table import TableProperties
+
+        column_name = self._schema.find_column_name(self._field_id)
+        if column_name is None:
+            return []
+
+        parquet_path = self._id_to_parquet_path_mapping.get(self._field_id)
+        if parquet_path is None:
+            return []
+
+        bloom_filter_enabled = property_as_bool(
+            self._properties, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.{column_name}", False
+        )
+        if not bloom_filter_enabled:
+            return []
+
+        bloom_filter_fpp = property_as_float(
+            self._properties, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX}.{column_name}", None
+        )
+        bloom_filter_ndv = property_as_int(
+            self._properties, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX}.{column_name}", None
+        )
+
+        return [BloomFilterOptions(parquet_path=parquet_path, ndv=bloom_filter_ndv, fpp=bloom_filter_fpp)]
+
+
+def get_bloom_filter_options(
+    schema: Schema,
+    table_properties: dict[str, str],
+) -> dict[str, dict[str, Any]]:
+    """
+    Get the bloom filter options from the table properties.
+
+    Args:
+        schema (pyiceberg.schema.Schema): The current table schema.
+        table_properties (dict[str, str]): The table properties.
+    """
+    bloom_filter_options = pre_order_visit(
+        schema, BloomFilterOptionsCollector(schema, table_properties, id_to_parquet_path_mapping(schema))
+    )
+    result: dict[str, dict[str, Any]] = {}
+    for bf_opts in bloom_filter_options:
+        result[bf_opts.parquet_path] = {
+            **({"ndv": bf_opts.ndv} if bf_opts.ndv is not None else {}),
+            **({"fpp": bf_opts.fpp} if bf_opts.fpp is not None else {}),
+        }
+    return result
+
+
 @dataclass(frozen=True)
 class DataFileStatistics:
     record_count: int
@@ -2668,7 +2787,6 @@ def data_file_statistics_from_parquet_metadata(
 def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = property_as_int(
         properties=table_metadata.properties,
         property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
@@ -2685,6 +2803,8 @@ def write_parquet(task: WriteTask) -> DataFile:
         else:
             file_schema = table_schema

+        parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties, file_schema)
+
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         batches = [
             _to_requested_schema(
@@ -2829,14 +2949,25 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
 PYARROW_UNCOMPRESSED_CODEC = "none"


-def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
+def _get_parquet_writer_kwargs(table_properties: Properties, file_schema: Schema) -> dict[str, Any]:
     from pyiceberg.table import TableProperties

-    for key_pattern in [
+    unsupported_key_patterns = [
         TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
-        f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
-    ]:
+    ]
+
+    from packaging import version
+
+    supports_bloom_filter_writes = version.parse(pyarrow.__version__) >= version.parse("24.0.0")
+    if not supports_bloom_filter_writes:
+        unsupported_key_patterns += [
+            f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
+            f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX}.*",
+            f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX}.*",
+        ]
+
+    for key_pattern in unsupported_key_patterns:
         if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
             warnings.warn(f"Parquet writer option(s) {unsupported_keys} not implemented", stacklevel=2)

@@ -2849,6 +2980,8 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
     if compression_codec == ICEBERG_UNCOMPRESSED_CODEC:
         compression_codec = PYARROW_UNCOMPRESSED_CODEC

+    bloom_filter_options = get_bloom_filter_options(file_schema, table_properties) if supports_bloom_filter_writes else {}
+
     return {
         "compression": compression_codec,
         "compression_level": compression_level,
@@ -2867,6 +3000,7 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
             property_name=TableProperties.PARQUET_PAGE_ROW_LIMIT,
             default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
         ),
+        **({"bloom_filter_options": bloom_filter_options} if bloom_filter_options else {}),
     }


diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index bb8765b651..9cfb83cbee 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -147,6 +147,10 @@ class TableProperties:

     PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"

+    PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX = "write.parquet.bloom-filter-fpp.column"
+
+    PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX = "write.parquet.bloom-filter-ndv.column"
+
     WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
     WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB

diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py
index 2a95b39a50..60b124c41a 100644
--- a/pyiceberg/utils/properties.py
+++ b/pyiceberg/utils/properties.py
@@ -66,6 +66,41 @@ def property_as_bool(
     return default


+def properties_as_int_dict(
+    properties: dict[str, str],
+    property_prefix: str,
+) -> dict[str, int]:
+    return {
+        key.removeprefix(property_prefix + "."): value
+        for key in properties.keys()
+        if key.startswith(property_prefix)
+        if (value := property_as_int(properties, key, None)) is not None
+    }
+
+
+def properties_as_float_dict(
+    properties: dict[str, str],
+    property_prefix: str,
+) -> dict[str, float]:
+    return {
+        key.removeprefix(property_prefix + "."): value
+        for key in properties.keys()
+        if key.startswith(property_prefix)
+        if (value := property_as_float(properties, key, None)) is not None
+    }
+
+
+def properties_as_bool_dict(
+    properties: dict[str, str],
+    property_prefix: str,
+) -> dict[str, bool]:
+    return {
+        key.removeprefix(property_prefix + "."): property_as_bool(properties, key, False)
+        for key in properties.keys()
+        if key.startswith(property_prefix)
+    }
+
+
 def get_first_property_value(
     properties: Properties,
     *property_names: str,
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 0a09867656..385642856c 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -30,11 +30,13 @@
 import fastavro
 import pandas as pd
 import pandas.testing
+import pyarrow
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.parquet as pq
 import pytest
 import pytz
+from packaging import version
 from pyarrow.fs import S3FileSystem
 from pydantic_core import ValidationError
 from pyspark.sql import SparkSession
@@ -68,6 +70,11 @@
 from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion
 from utils import TABLE_SCHEMA, _create_table

+skip_if_bloom_filter_not_supported = pytest.mark.skipif(
+    version.parse(pyarrow.__version__) < version.parse("24.0.0"),
+    reason="Requires pyarrow version >= 24.0.0",
+)
+

 @pytest.fixture(scope="session", autouse=True)
 def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
@@ -712,6 +719,27 @@ def test_write_parquet_unsupported_properties(
     tbl.append(arrow_table_with_null)


+@pytest.mark.integration
+@skip_if_bloom_filter_not_supported
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_parquet_bloom_filter_properties(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.write_parquet_bloom_filter_properties"
+
+    _create_table(
+        session_catalog,
+        identifier,
+        {
+            "format-version": format_version,
+            "write.parquet.bloom-filter-enabled.column.string": "true",
+            "write.parquet.bloom-filter-fpp.column.string": "0.1",
+            "write.parquet.bloom-filter-ndv.column.string": "100",
+        },
+        [arrow_table_with_null],
+    )
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_spark_writes_orc_pyiceberg_reads(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py
index c163c90626..f82cadd4fd 100644
--- a/tests/table/test_metadata.py
+++ b/tests/table/test_metadata.py
@@ -26,6 +26,7 @@
 import pytest

 from pyiceberg.exceptions import ValidationError
+from pyiceberg.io.pyarrow import get_bloom_filter_options
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromByteStream
@@ -876,3 +877,38 @@ def test_new_table_metadata_format_v2_with_v3_schema_fails(field_type: Primitive
         location="s3://some_v1_location/",
         properties={"format-version": "2"},
     )
+
+
+def test_get_bloom_filter_options() -> None:
+    schema = Schema(
+        NestedField(field_id=10, name="foo", field_type=StringType(), required=False),
+        NestedField(field_id=22, name="bar", field_type=IntegerType(), required=False),
+        NestedField(field_id=33, name="baz", field_type=BooleanType(), required=False),
+        NestedField(
+            field_id=34,
+            name="qux",
+            field_type=StructType(
+                NestedField(field_id=35, name="quux", field_type=StringType(), required=False),
+                NestedField(field_id=36, name="quuux", field_type=IntegerType(), required=False),
+            ),
+            required=False,
+        ),
+    )
+
+    table_properties = {
+        "write.parquet.bloom-filter-enabled.column.foo": "true",
+        "write.parquet.bloom-filter-fpp.column.foo": "0.01",
+        "write.parquet.bloom-filter-ndv.column.foo": "1000",
+        "write.parquet.bloom-filter-enabled.column.bar": "false",
+        "write.parquet.bloom-filter-fpp.column.bar": "0.02",
+        "write.parquet.bloom-filter-ndv.column.bar": "2000",
+        "write.parquet.bloom-filter-enabled.column.qux.quux": "true",
+        "write.parquet.bloom-filter-fpp.column.qux.quux": "0.03",
+        "write.parquet.bloom-filter-ndv.column.qux.quux": "3000",
+    }
+
+    bloom_filter_options = get_bloom_filter_options(schema, table_properties)
+    assert bloom_filter_options == {
+        "foo": {"fpp": 0.01, "ndv": 1000},
+        "qux.quux": {"fpp": 0.03, "ndv": 3000},
+    }

From b86268b40a14198b6c453c9648da2f2f48593058 Mon Sep 17 00:00:00 2001
From: Renaud Bourassa
Date: Tue, 21 Apr 2026 17:07:45 -0400
Subject: [PATCH 2/2] Remove unused functions

---
 pyiceberg/utils/properties.py | 35 -----------------------------------
 1 file changed, 35 deletions(-)

diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py
index 60b124c41a..2a95b39a50 100644
--- a/pyiceberg/utils/properties.py
+++ b/pyiceberg/utils/properties.py
@@ -66,41 +66,6 @@ def property_as_bool(
     return default


-def properties_as_int_dict(
-    properties: dict[str, str],
-    property_prefix: str,
-) -> dict[str, int]:
-    return {
-        key.removeprefix(property_prefix + "."): value
-        for key in properties.keys()
-        if key.startswith(property_prefix)
-        if (value := property_as_int(properties, key, None)) is not None
-    }
-
-
-def properties_as_float_dict(
-    properties: dict[str, str],
-    property_prefix: str,
-) -> dict[str, float]:
-    return {
-        key.removeprefix(property_prefix + "."): value
-        for key in properties.keys()
-        if key.startswith(property_prefix)
-        if (value := property_as_float(properties, key, None)) is not None
-    }
-
-
-def properties_as_bool_dict(
-    properties: dict[str, str],
-    property_prefix: str,
-) -> dict[str, bool]:
-    return {
-        key.removeprefix(property_prefix + "."): property_as_bool(properties, key, False)
-        for key in properties.keys()
-        if key.startswith(property_prefix)
-    }
-
-
 def get_first_property_value(
     properties: Properties,
     *property_names: str,
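
A usage sketch for the series above (illustrative, outside the patches): once
the new properties are set on a table, subsequent appends pick them up when
the Parquet data files are written. The catalog name, namespace, table, and
column name below are hypothetical, and pyarrow >= 24.0.0 is assumed:

    from pyiceberg.catalog import load_catalog
    from pyiceberg.schema import Schema
    from pyiceberg.types import NestedField, StringType

    # Assumes a catalog named "default" is configured (e.g. in .pyiceberg.yaml).
    catalog = load_catalog("default")
    table = catalog.create_table(
        "db.events",  # hypothetical namespace.table
        schema=Schema(
            NestedField(field_id=1, name="user_id", field_type=StringType(), required=False),
        ),
        properties={
            # Enable a Parquet bloom filter on the "user_id" column ...
            "write.parquet.bloom-filter-enabled.column.user_id": "true",
            # ... and optionally tune the false-positive probability and the
            # expected number of distinct values used to size the filter.
            "write.parquet.bloom-filter-fpp.column.user_id": "0.01",
            "write.parquet.bloom-filter-ndv.column.user_id": "100000",
        },
    )
    # Data files written from here on carry the bloom filter; on older
    # pyarrow versions the writer warns and the options are ignored.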