146 changes: 140 additions & 6 deletions pyiceberg/io/pyarrow.py
@@ -180,7 +180,12 @@
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import millis_to_datetime
from pyiceberg.utils.decimal import unscaled_to_decimal
from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
from pyiceberg.utils.properties import (
    get_first_property_value,
    property_as_bool,
    property_as_float,
    property_as_int,
)
from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

@@ -2473,6 +2478,120 @@ def parquet_path_to_id_mapping(
return result


def id_to_parquet_path_mapping(schema: Schema) -> dict[int, str]:
    """
    Compute the mapping of Iceberg column ID to Parquet column path.

    Args:
        schema (pyiceberg.schema.Schema): The current table schema.

    Returns:
        A dict mapping each field ID to its dotted Parquet column path.
    """
    result: dict[int, str] = {}
    for pair in pre_order_visit(schema, ID2ParquetPathVisitor()):
        result[pair.field_id] = pair.parquet_path
    return result
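# For example, a top-level string field "foo" with ID 1 yields {1: "foo"}, while a
# struct member keeps its dotted path, e.g. {35: "qux.quux"} (see the unit test at
# the bottom of this PR); this is effectively the inverse of parquet_path_to_id_mapping above.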


@dataclass(frozen=True)
class BloomFilterOptions:
    parquet_path: str
    ndv: int | None
    fpp: float | None


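# The collector visits the schema in pre-order; each container callback stores the
# child's field ID in _field_id before descending, so primitive() knows which leaf
# column it is resolving options for.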
class BloomFilterOptionsCollector(PreOrderSchemaVisitor[list[BloomFilterOptions]]):
    _field_id: int = 0
    _schema: Schema
    _properties: dict[str, str]
    _id_to_parquet_path_mapping: dict[int, str]

    def __init__(self, schema: Schema, properties: dict[str, str], id_to_parquet_path_mapping: dict[int, str]):
        self._schema = schema
        self._properties = properties
        self._id_to_parquet_path_mapping = id_to_parquet_path_mapping

    def schema(
        self, schema: Schema, struct_result: Callable[[], builtins.list[BloomFilterOptions]]
    ) -> builtins.list[BloomFilterOptions]:
        return struct_result()

    def struct(
        self, struct: StructType, field_results: builtins.list[Callable[[], builtins.list[BloomFilterOptions]]]
    ) -> builtins.list[BloomFilterOptions]:
        return list(itertools.chain(*[result() for result in field_results]))

    def field(
        self, field: NestedField, field_result: Callable[[], builtins.list[BloomFilterOptions]]
    ) -> builtins.list[BloomFilterOptions]:
        self._field_id = field.field_id
        return field_result()

    def list(
        self, list_type: ListType, element_result: Callable[[], builtins.list[BloomFilterOptions]]
    ) -> builtins.list[BloomFilterOptions]:
        self._field_id = list_type.element_id
        return element_result()

    def map(
        self,
        map_type: MapType,
        key_result: Callable[[], builtins.list[BloomFilterOptions]],
        value_result: Callable[[], builtins.list[BloomFilterOptions]],
    ) -> builtins.list[BloomFilterOptions]:
        self._field_id = map_type.key_id
        k = key_result()
        self._field_id = map_type.value_id
        v = value_result()
        return k + v

    def primitive(self, primitive: PrimitiveType) -> builtins.list[BloomFilterOptions]:
        from pyiceberg.table import TableProperties

        column_name = self._schema.find_column_name(self._field_id)
        if column_name is None:
            return []

        parquet_path = self._id_to_parquet_path_mapping.get(self._field_id)
        if parquet_path is None:
            return []

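        # Bloom filters are opt-in: only columns whose per-column enabled property is "true" produce options.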
        bloom_filter_enabled = property_as_bool(
            self._properties, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.{column_name}", False
        )
        if not bloom_filter_enabled:
            return []

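        # fpp and ndv are optional tuning parameters; properties left unset stay None and are dropped downstream.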
        bloom_filter_fpp = property_as_float(
            self._properties, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX}.{column_name}", None
        )
        bloom_filter_ndv = property_as_int(
            self._properties, f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX}.{column_name}", None
        )

        return [BloomFilterOptions(parquet_path=parquet_path, ndv=bloom_filter_ndv, fpp=bloom_filter_fpp)]


def get_bloom_filter_options(
    schema: Schema,
    table_properties: dict[str, str],
) -> dict[str, dict[str, Any]]:
    """
    Get the bloom filter options from the table properties.

    Args:
        schema (pyiceberg.schema.Schema): The current table schema.
        table_properties (dict[str, str]): The table properties.

    Returns:
        A dict keyed by Parquet column path; each value holds the optional
        "ndv" and "fpp" settings for that column's bloom filter.
    """
    bloom_filter_options = pre_order_visit(
        schema, BloomFilterOptionsCollector(schema, table_properties, id_to_parquet_path_mapping(schema))
    )
    result: dict[str, dict[str, Any]] = {}
    for bf_opts in bloom_filter_options:
        result[bf_opts.parquet_path] = {
            **({"ndv": bf_opts.ndv} if bf_opts.ndv is not None else {}),
            **({"fpp": bf_opts.fpp} if bf_opts.fpp is not None else {}),
        }
    return result


@dataclass(frozen=True)
class DataFileStatistics:
    record_count: int
@@ -2668,7 +2787,6 @@ def data_file_statistics_from_parquet_metadata(
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
    row_group_size = property_as_int(
        properties=table_metadata.properties,
        property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
@@ -2685,6 +2803,8 @@ def write_parquet(task: WriteTask) -> DataFile:
        else:
            file_schema = table_schema

        parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties, file_schema)

        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        batches = [
            _to_requested_schema(
@@ -2829,14 +2949,25 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
PYARROW_UNCOMPRESSED_CODEC = "none"


def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
def _get_parquet_writer_kwargs(table_properties: Properties, file_schema: Schema) -> dict[str, Any]:
    from pyiceberg.table import TableProperties

    for key_pattern in [
    unsupported_key_patterns = [
        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
        TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
        f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
    ]:
    ]

    from packaging import version

    MIN_PYARROW_VERSION_SUPPORTING_BLOOM_FILTER_WRITES = "24.0.0"
    if version.parse(pyarrow.__version__) < version.parse(MIN_PYARROW_VERSION_SUPPORTING_BLOOM_FILTER_WRITES):
        unsupported_key_patterns += [
            f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
            f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX}.*",
            f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX}.*",
        ]

    for key_pattern in unsupported_key_patterns:
        if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
            warnings.warn(f"Parquet writer option(s) {unsupported_keys} not implemented", stacklevel=2)
@@ -2849,6 +2980,8 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
    if compression_codec == ICEBERG_UNCOMPRESSED_CODEC:
        compression_codec = PYARROW_UNCOMPRESSED_CODEC

    bloom_filter_options = get_bloom_filter_options(file_schema, table_properties)

    return {
        "compression": compression_codec,
        "compression_level": compression_level,
@@ -2867,6 +3000,7 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
            property_name=TableProperties.PARQUET_PAGE_ROW_LIMIT,
            default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
        ),
        **({"bloom_filter_options": bloom_filter_options} if bloom_filter_options else {}),
    }


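For orientation, here is a minimal self-contained sketch of the new helper in action; the schema, column name, and property values below are illustrative, not part of the change:

from pyiceberg.io.pyarrow import get_bloom_filter_options
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

schema = Schema(NestedField(field_id=1, name="user_id", field_type=StringType(), required=False))
properties = {
    "write.parquet.bloom-filter-enabled.column.user_id": "true",
    "write.parquet.bloom-filter-ndv.column.user_id": "50000",
}

# Only opted-in columns appear in the result; "fpp" is omitted because it was never set.
assert get_bloom_filter_options(schema, properties) == {"user_id": {"ndv": 50000}}

On pyarrow >= 24 (per the version gate above) the resulting kwargs, including bloom_filter_options, are forwarded to the Parquet writer; on older pyarrow the bloom filter properties are stripped with a "not implemented" warning instead.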
4 changes: 4 additions & 0 deletions pyiceberg/table/__init__.py
@@ -147,6 +147,10 @@ class TableProperties:

    PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"

    PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX = "write.parquet.bloom-filter-fpp.column"

    PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX = "write.parquet.bloom-filter-ndv.column"
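    # Consumed as f"{PREFIX}.{column_name}", e.g. "write.parquet.bloom-filter-fpp.column.user_id" = "0.01".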

    WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
    WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB

28 changes: 28 additions & 0 deletions tests/integration/test_writes/test_writes.py
@@ -30,11 +30,13 @@
import fastavro
import pandas as pd
import pandas.testing
import pyarrow
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pytest
import pytz
from packaging import version
from pyarrow.fs import S3FileSystem
from pydantic_core import ValidationError
from pyspark.sql import SparkSession
@@ -68,6 +70,11 @@
from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion
from utils import TABLE_SCHEMA, _create_table

skip_if_bloom_filter_not_supported = pytest.mark.skipif(
    version.parse(pyarrow.__version__) < version.parse("24.0.0"),
    reason="Requires pyarrow version >= 24.0.0",
)


@pytest.fixture(scope="session", autouse=True)
def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
@@ -712,6 +719,27 @@ def test_write_parquet_unsupported_properties(
    tbl.append(arrow_table_with_null)


@pytest.mark.integration
@skip_if_bloom_filter_not_supported
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_parquet_bloom_filter_properties(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.write_parquet_bloom_filter_properties"

    _create_table(
        session_catalog,
        identifier,
        {
            "format-version": format_version,
            "write.parquet.bloom-filter-enabled.column.string": "true",
            "write.parquet.bloom-filter-fpp.column.string": "0.1",
            "write.parquet.bloom-filter-ndv.column.string": "100",
        },
        [arrow_table_with_null],
    )
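    # Creating and populating the table exercises the bloom filter write path end to end;
    # no footer-level assertions are made here.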


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_spark_writes_orc_pyiceberg_reads(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
36 changes: 36 additions & 0 deletions tests/table/test_metadata.py
@@ -26,6 +26,7 @@
import pytest

from pyiceberg.exceptions import ValidationError
from pyiceberg.io.pyarrow import get_bloom_filter_options
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromByteStream
@@ -876,3 +877,38 @@ def test_new_table_metadata_format_v2_with_v3_schema_fails(field_type: Primitive
        location="s3://some_v1_location/",
        properties={"format-version": "2"},
    )


def test_get_bloom_filter_options() -> None:
    schema = Schema(
        NestedField(field_id=10, name="foo", field_type=StringType(), required=False),
        NestedField(field_id=22, name="bar", field_type=IntegerType(), required=False),
        NestedField(field_id=33, name="baz", field_type=BooleanType(), required=False),
        NestedField(
            field_id=34,
            name="qux",
            field_type=StructType(
                NestedField(field_id=35, name="quux", field_type=StringType(), required=False),
                NestedField(field_id=36, name="quuux", field_type=IntegerType(), required=False),
            ),
            required=False,
        ),
    )

    table_properties = {
        "write.parquet.bloom-filter-enabled.column.foo": "true",
        "write.parquet.bloom-filter-fpp.column.foo": "0.01",
        "write.parquet.bloom-filter-ndv.column.foo": "1000",
        "write.parquet.bloom-filter-enabled.column.bar": "false",
        "write.parquet.bloom-filter-fpp.column.bar": "0.02",
        "write.parquet.bloom-filter-ndv.column.bar": "2000",
        "write.parquet.bloom-filter-enabled.column.qux.quux": "true",
        "write.parquet.bloom-filter-fpp.column.qux.quux": "0.03",
        "write.parquet.bloom-filter-ndv.column.qux.quux": "3000",
    }

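    # "bar" is explicitly disabled and "baz"/"quuux" are never enabled, so only "foo" and "qux.quux" are expected.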
    bloom_filter_options = get_bloom_filter_options(schema, table_properties)
    assert bloom_filter_options == {
        "foo": {"fpp": 0.01, "ndv": 1000},
        "qux.quux": {"fpp": 0.03, "ndv": 3000},
    }