From d21f99a216d46d07bd0f3aac43f36d2f60348ed6 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Tue, 21 Apr 2026 19:49:45 -0700 Subject: [PATCH] perf: build partition filter with balanced tree to avoid RecursionError --- pyiceberg/table/__init__.py | 24 ++++++++++++------------ tests/table/test_init.py | 13 +++++++++++++ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index bb8765b651..7f1524642b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -367,20 +367,20 @@ def _build_partition_predicate( A predicate matching any of the input partition records. """ partition_fields = [schema.find_field(field.source_id).name for field in spec.fields] + if not partition_records or not partition_fields: + return AlwaysFalse() - expr: BooleanExpression = AlwaysFalse() + per_record_exprs: list[BooleanExpression] = [] for partition_record in partition_records: - match_partition_expression: BooleanExpression = AlwaysTrue() - - for pos, partition_field in enumerate(partition_fields): - predicate = ( - EqualTo(Reference(partition_field), partition_record[pos]) - if partition_record[pos] is not None - else IsNull(Reference(partition_field)) - ) - match_partition_expression = And(match_partition_expression, predicate) - expr = Or(expr, match_partition_expression) - return expr + predicates: list[BooleanExpression] = [ + EqualTo(Reference(partition_field), partition_record[pos]) + if partition_record[pos] is not None + else IsNull(Reference(partition_field)) + for pos, partition_field in enumerate(partition_fields) + ] + per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0]) + + return Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0] def _append_snapshot_producer( self, snapshot_properties: dict[str, str], branch: str | None = MAIN_BRANCH diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 30c4a3a45a..0c4ea258f3 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -32,6 +32,7 @@ EqualTo, In, ) +from pyiceberg.expressions.visitors import bind from pyiceberg.io import PY_IO_IMPL, load_file_io from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -90,6 +91,7 @@ BucketTransform, IdentityTransform, ) +from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, BooleanType, @@ -1753,3 +1755,14 @@ def test_check_uuid_passes_when_match(table_v2: Table, example_table_metadata_v2 new_metadata = TableMetadataV2(**example_table_metadata_v2) # Should not raise with same uuid Table._check_uuid(table_v2.metadata, new_metadata) + + +def test_build_large_partition_predicate(table_v2: Table) -> None: + with table_v2.transaction() as tx: + expr = tx._build_partition_predicate( + partition_records={Record(i) for i in range(5000)}, + spec=table_v2.metadata.spec(), + schema=table_v2.metadata.schema(), + ) + + bind(table_v2.metadata.schema(), expr, case_sensitive=True)