Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,20 +367,20 @@ def _build_partition_predicate(
A predicate matching any of the input partition records.
"""
partition_fields = [schema.find_field(field.source_id).name for field in spec.fields]
if not partition_records or not partition_fields:
return AlwaysFalse()

expr: BooleanExpression = AlwaysFalse()
per_record_exprs: list[BooleanExpression] = []
for partition_record in partition_records:
match_partition_expression: BooleanExpression = AlwaysTrue()

for pos, partition_field in enumerate(partition_fields):
predicate = (
EqualTo(Reference(partition_field), partition_record[pos])
if partition_record[pos] is not None
else IsNull(Reference(partition_field))
)
match_partition_expression = And(match_partition_expression, predicate)
expr = Or(expr, match_partition_expression)
return expr
predicates: list[BooleanExpression] = [
EqualTo(Reference(partition_field), partition_record[pos])
if partition_record[pos] is not None
else IsNull(Reference(partition_field))
for pos, partition_field in enumerate(partition_fields)
]
per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0])

return Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0]

def _append_snapshot_producer(
self, snapshot_properties: dict[str, str], branch: str | None = MAIN_BRANCH
Expand Down
13 changes: 13 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
EqualTo,
In,
)
from pyiceberg.expressions.visitors import bind
from pyiceberg.io import PY_IO_IMPL, load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
Expand Down Expand Up @@ -90,6 +91,7 @@
BucketTransform,
IdentityTransform,
)
from pyiceberg.typedef import Record
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -1753,3 +1755,14 @@ def test_check_uuid_passes_when_match(table_v2: Table, example_table_metadata_v2
new_metadata = TableMetadataV2(**example_table_metadata_v2)
# Should not raise with same uuid
Table._check_uuid(table_v2.metadata, new_metadata)


def test_build_large_partition_predicate(table_v2: Table) -> None:
    """Build a partition predicate from 5000 partition records and bind it.

    The test makes no explicit assertions: it passes as long as neither
    building the predicate nor binding it to the table schema raises.
    NOTE(review): presumably this guards against failures caused by very
    large/deeply nested predicate expressions — confirm against the change
    that introduced this test.
    """
    with table_v2.transaction() as tx:
        # One single-value Record per integer in range(5000); passed as a set.
        expr = tx._build_partition_predicate(
            partition_records={Record(i) for i in range(5000)},
            spec=table_v2.metadata.spec(),
            schema=table_v2.metadata.schema(),
        )

    # Binding processes the whole expression against the schema; it must
    # complete without raising for the test to pass.
    bind(table_v2.metadata.schema(), expr, case_sensitive=True)
Loading