diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index 0beb0f3df0..f3cfabf1a5 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -1241,7 +1241,8 @@ def visit_less_than(self, term: BoundTerm, literal: LiteralValue) -> bool: if not isinstance(field.field_type, PrimitiveType): raise ValueError(f"Expected PrimitiveType: {field.field_type}") - if lower_bound_bytes := self.lower_bounds.get(field_id): + lower_bound_bytes = self.lower_bounds.get(field_id) + if lower_bound_bytes is not None: lower_bound = from_bytes(field.field_type, lower_bound_bytes) if self._is_nan(lower_bound): @@ -1263,7 +1264,8 @@ def visit_less_than_or_equal(self, term: BoundTerm, literal: LiteralValue) -> bo if not isinstance(field.field_type, PrimitiveType): raise ValueError(f"Expected PrimitiveType: {field.field_type}") - if lower_bound_bytes := self.lower_bounds.get(field_id): + lower_bound_bytes = self.lower_bounds.get(field_id) + if lower_bound_bytes is not None: lower_bound = from_bytes(field.field_type, lower_bound_bytes) if self._is_nan(lower_bound): # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more. @@ -1284,7 +1286,8 @@ def visit_greater_than(self, term: BoundTerm, literal: LiteralValue) -> bool: if not isinstance(field.field_type, PrimitiveType): raise ValueError(f"Expected PrimitiveType: {field.field_type}") - if upper_bound_bytes := self.upper_bounds.get(field_id): + upper_bound_bytes = self.upper_bounds.get(field_id) + if upper_bound_bytes is not None: upper_bound = from_bytes(field.field_type, upper_bound_bytes) if upper_bound <= literal.value: if self._is_nan(upper_bound): @@ -1305,7 +1308,8 @@ def visit_greater_than_or_equal(self, term: BoundTerm, literal: LiteralValue) -> if not isinstance(field.field_type, PrimitiveType): raise ValueError(f"Expected PrimitiveType: {field.field_type}") - if upper_bound_bytes := self.upper_bounds.get(field_id): + upper_bound_bytes = self.upper_bounds.get(field_id) + if upper_bound_bytes is not None: upper_bound = from_bytes(field.field_type, upper_bound_bytes) if upper_bound < literal.value: if self._is_nan(upper_bound): @@ -1326,7 +1330,8 @@ def visit_equal(self, term: BoundTerm, literal: LiteralValue) -> bool: if not isinstance(field.field_type, PrimitiveType): raise ValueError(f"Expected PrimitiveType: {field.field_type}") - if lower_bound_bytes := self.lower_bounds.get(field_id): + lower_bound_bytes = self.lower_bounds.get(field_id) + if lower_bound_bytes is not None: lower_bound = from_bytes(field.field_type, lower_bound_bytes) if self._is_nan(lower_bound): # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more. @@ -1335,7 +1340,8 @@ def visit_equal(self, term: BoundTerm, literal: LiteralValue) -> bool: if lower_bound > literal.value: return ROWS_CANNOT_MATCH - if upper_bound_bytes := self.upper_bounds.get(field_id): + upper_bound_bytes = self.upper_bounds.get(field_id) + if upper_bound_bytes is not None: upper_bound = from_bytes(field.field_type, upper_bound_bytes) if self._is_nan(upper_bound): # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more. @@ -1363,7 +1369,8 @@ def visit_in(self, term: BoundTerm, literals: set[L]) -> bool: if not isinstance(field.field_type, PrimitiveType): raise ValueError(f"Expected PrimitiveType: {field.field_type}") - if lower_bound_bytes := self.lower_bounds.get(field_id): + lower_bound_bytes = self.lower_bounds.get(field_id) + if lower_bound_bytes is not None: lower_bound = from_bytes(field.field_type, lower_bound_bytes) if self._is_nan(lower_bound): # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more. @@ -1373,7 +1380,8 @@ def visit_in(self, term: BoundTerm, literals: set[L]) -> bool: if len(literals) == 0: return ROWS_CANNOT_MATCH - if upper_bound_bytes := self.upper_bounds.get(field_id): + upper_bound_bytes = self.upper_bounds.get(field_id) + if upper_bound_bytes is not None: upper_bound = from_bytes(field.field_type, upper_bound_bytes) # this is different from Java, here NaN is always larger if self._is_nan(upper_bound): @@ -1403,14 +1411,16 @@ def visit_starts_with(self, term: BoundTerm, literal: LiteralValue) -> bool: prefix = str(literal.value) len_prefix = len(prefix) - if lower_bound_bytes := self.lower_bounds.get(field_id): + lower_bound_bytes = self.lower_bounds.get(field_id) + if lower_bound_bytes is not None: lower_bound = str(from_bytes(field.field_type, lower_bound_bytes)) # truncate lower bound so that its length is not greater than the length of prefix if lower_bound and lower_bound[:len_prefix] > prefix: return ROWS_CANNOT_MATCH - if upper_bound_bytes := self.upper_bounds.get(field_id): + upper_bound_bytes = self.upper_bounds.get(field_id) + if upper_bound_bytes is not None: upper_bound = str(from_bytes(field.field_type, upper_bound_bytes)) # truncate upper bound so that its length is not greater than the length of prefix @@ -1434,7 +1444,9 @@ def visit_not_starts_with(self, term: BoundTerm, literal: LiteralValue) -> bool: # not_starts_with will match unless all values must start with the prefix. This happens when # the lower and upper bounds both start with the prefix. - if (lower_bound_bytes := self.lower_bounds.get(field_id)) and (upper_bound_bytes := self.upper_bounds.get(field_id)): + lower_bound_bytes = self.lower_bounds.get(field_id) + upper_bound_bytes = self.upper_bounds.get(field_id) + if lower_bound_bytes is not None and upper_bound_bytes is not None: lower_bound = str(from_bytes(field.field_type, lower_bound_bytes)) upper_bound = str(from_bytes(field.field_type, upper_bound_bytes)) @@ -1558,7 +1570,8 @@ def visit_less_than(self, term: BoundTerm, literal: LiteralValue) -> bool: if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id): return ROWS_MIGHT_NOT_MATCH - if upper_bytes := self.upper_bounds.get(field_id): + upper_bytes = self.upper_bounds.get(field_id) + if upper_bytes is not None: field = self._get_field(field_id) upper = _from_byte_buffer(field.field_type, upper_bytes) @@ -1575,7 +1588,8 @@ def visit_less_than_or_equal(self, term: BoundTerm, literal: LiteralValue) -> bo if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id): return ROWS_MIGHT_NOT_MATCH - if upper_bytes := self.upper_bounds.get(field_id): + upper_bytes = self.upper_bounds.get(field_id) + if upper_bytes is not None: field = self._get_field(field_id) upper = _from_byte_buffer(field.field_type, upper_bytes) @@ -1592,7 +1606,8 @@ def visit_greater_than(self, term: BoundTerm, literal: LiteralValue) -> bool: if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id): return ROWS_MIGHT_NOT_MATCH - if lower_bytes := self.lower_bounds.get(field_id): + lower_bytes = self.lower_bounds.get(field_id) + if lower_bytes is not None: field = self._get_field(field_id) lower = _from_byte_buffer(field.field_type, lower_bytes) @@ -1613,7 +1628,8 @@ def visit_greater_than_or_equal(self, term: BoundTerm, literal: LiteralValue) -> if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id): return ROWS_MIGHT_NOT_MATCH - if lower_bytes := self.lower_bounds.get(field_id): + lower_bytes = self.lower_bounds.get(field_id) + if lower_bytes is not None: field = self._get_field(field_id) lower = _from_byte_buffer(field.field_type, lower_bytes) @@ -1634,7 +1650,9 @@ def visit_equal(self, term: BoundTerm, literal: LiteralValue) -> bool: if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id): return ROWS_MIGHT_NOT_MATCH - if (lower_bytes := self.lower_bounds.get(field_id)) and (upper_bytes := self.upper_bounds.get(field_id)): + lower_bytes = self.lower_bounds.get(field_id) + upper_bytes = self.upper_bounds.get(field_id) + if lower_bytes is not None and upper_bytes is not None: field = self._get_field(field_id) lower = _from_byte_buffer(field.field_type, lower_bytes) upper = _from_byte_buffer(field.field_type, upper_bytes) @@ -1655,7 +1673,8 @@ def visit_not_equal(self, term: BoundTerm, literal: LiteralValue) -> bool: field = self._get_field(field_id) - if lower_bytes := self.lower_bounds.get(field_id): + lower_bytes = self.lower_bounds.get(field_id) + if lower_bytes is not None: lower = _from_byte_buffer(field.field_type, lower_bytes) if self._is_nan(lower): @@ -1666,7 +1685,8 @@ def visit_not_equal(self, term: BoundTerm, literal: LiteralValue) -> bool: if lower > literal.value: return ROWS_MUST_MATCH - if upper_bytes := self.upper_bounds.get(field_id): + upper_bytes = self.upper_bounds.get(field_id) + if upper_bytes is not None: upper = _from_byte_buffer(field.field_type, upper_bytes) if upper < literal.value: @@ -1682,7 +1702,9 @@ def visit_in(self, term: BoundTerm, literals: set[L]) -> bool: field = self._get_field(field_id) - if (lower_bytes := self.lower_bounds.get(field_id)) and (upper_bytes := self.upper_bounds.get(field_id)): + lower_bytes = self.lower_bounds.get(field_id) + upper_bytes = self.upper_bounds.get(field_id) + if lower_bytes is not None and upper_bytes is not None: # similar to the implementation in eq, first check if the lower bound is in the set lower = _from_byte_buffer(field.field_type, lower_bytes) if lower not in literals: @@ -1711,7 +1733,8 @@ def visit_not_in(self, term: BoundTerm, literals: set[L]) -> bool: field = self._get_field(field_id) - if lower_bytes := self.lower_bounds.get(field_id): + lower_bytes = self.lower_bounds.get(field_id) + if lower_bytes is not None: lower = _from_byte_buffer(field.field_type, lower_bytes) if self._is_nan(lower): @@ -1723,7 +1746,8 @@ def visit_not_in(self, term: BoundTerm, literals: set[L]) -> bool: if len(literals) == 0: return ROWS_MUST_MATCH - if upper_bytes := self.upper_bounds.get(field_id): + upper_bytes = self.upper_bounds.get(field_id) + if upper_bytes is not None: upper = _from_byte_buffer(field.field_type, upper_bytes) literals = {val for val in literals if upper >= val} diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index db9a035bdc..4ec7a73afe 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1605,7 +1605,8 @@ def _get_column_projection_values( for field_id in project_schema_diff: for partition_field in partition_spec.fields_by_source_id(field_id): if isinstance(partition_field.transform, IdentityTransform): - if partition_value := accessors[partition_field.field_id].get(file.partition): + partition_value = accessors[partition_field.field_id].get(file.partition) + if partition_value is not None: projected_missing_fields[field_id] = partition_value return projected_missing_fields @@ -2010,7 +2011,8 @@ def struct( elif field.optional or field.initial_default is not None: # When an optional field is added, or when a required field with a non-null initial default is added arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids) - if projected_value := self._projected_missing_fields.get(field.field_id): + projected_value = self._projected_missing_fields.get(field.field_id) + if projected_value is not None: field_arrays.append(pa.repeat(pa.scalar(projected_value, type=arrow_type), len(struct_array))) elif field.initial_default is None: field_arrays.append(pa.nulls(len(struct_array), type=arrow_type)) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 5da343ccb6..54504d70df 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -38,6 +38,10 @@ ALWAYS_TRUE = AlwaysTrue() +def _readable_bound(field_type: PrimitiveType, bound: bytes | None) -> Any | None: + return from_bytes(field_type, bound) if bound is not None else None + + class InspectTable: tbl: Table @@ -180,12 +184,8 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: "null_value_count": null_value_counts.get(field.field_id), "nan_value_count": nan_value_counts.get(field.field_id), # Makes them readable - "lower_bound": from_bytes(field.field_type, lower_bound) - if (lower_bound := lower_bounds.get(field.field_id)) - else None, - "upper_bound": from_bytes(field.field_type, upper_bound) - if (upper_bound := upper_bounds.get(field.field_id)) - else None, + "lower_bound": _readable_bound(field.field_type, lower_bounds.get(field.field_id)), + "upper_bound": _readable_bound(field.field_type, upper_bounds.get(field.field_id)), } for field in self.tbl.metadata.schema().fields } @@ -570,12 +570,8 @@ def _get_files_from_manifest( "value_count": value_counts.get(field.field_id), "null_value_count": null_value_counts.get(field.field_id), "nan_value_count": nan_value_counts.get(field.field_id), - "lower_bound": from_bytes(field.field_type, lower_bound) - if (lower_bound := lower_bounds.get(field.field_id)) - else None, - "upper_bound": from_bytes(field.field_type, upper_bound) - if (upper_bound := upper_bounds.get(field.field_id)) - else None, + "lower_bound": _readable_bound(field.field_type, lower_bounds.get(field.field_id)), + "upper_bound": _readable_bound(field.field_type, upper_bounds.get(field.field_id)), } for field in self.tbl.metadata.schema().fields } diff --git a/tests/expressions/test_evaluator.py b/tests/expressions/test_evaluator.py index b8e4d87044..ea1bef0a7d 100644 --- a/tests/expressions/test_evaluator.py +++ b/tests/expressions/test_evaluator.py @@ -900,6 +900,62 @@ def test_string_starts_with( # assert not should_read, "Should not read: range doesn't match" +def test_inclusive_metrics_evaluator_uses_empty_byte_lower_bound() -> None: + schema = Schema(NestedField(1, "empty_string", StringType(), required=True)) + data_file = DataFile.from_args( + file_path="file.parquet", + file_format=FileFormat.PARQUET, + partition={}, + record_count=10, + file_size_in_bytes=1, + value_counts={1: 10}, + null_value_counts={1: 0}, + nan_value_counts=None, + lower_bounds={1: to_bytes(StringType(), "")}, + upper_bounds={1: to_bytes(StringType(), "")}, + ) + + # Lower-bound branch: LessThan reads lower_bound only. + should_read = _InclusiveMetricsEvaluator(schema, LessThan("empty_string", "")).eval(data_file) + assert not should_read, "Should not read: lower bound is present and equal to the literal" + + # Upper-bound branch: GreaterThan reads upper_bound only. + should_read = _InclusiveMetricsEvaluator(schema, GreaterThan("empty_string", "abc")).eval(data_file) + assert not should_read, "Should not read: upper bound '' is not greater than 'abc'" + + # Both-bounds branch: EqualTo reads lower_bound and upper_bound. + should_read = _InclusiveMetricsEvaluator(schema, EqualTo("empty_string", "abc")).eval(data_file) + assert not should_read, "Should not read: 'abc' falls outside ['', '']" + + +def test_strict_metrics_evaluator_uses_empty_byte_bounds() -> None: + schema = Schema(NestedField(1, "empty_string", StringType(), required=True)) + data_file = DataFile.from_args( + file_path="file.parquet", + file_format=FileFormat.PARQUET, + partition={}, + record_count=10, + file_size_in_bytes=1, + value_counts={1: 10}, + null_value_counts={1: 0}, + nan_value_counts=None, + lower_bounds={1: to_bytes(StringType(), "")}, + upper_bounds={1: to_bytes(StringType(), "")}, + ) + + # Both-bounds branch: EqualTo reads lower_bound and upper_bound. + should_read = _StrictMetricsEvaluator(schema, EqualTo("empty_string", "")).eval(data_file) + assert should_read, "Should match: lower and upper bounds are present and equal to the literal" + + # Upper-bound branch: LessThan reads upper_bound only. + should_read = _StrictMetricsEvaluator(schema, LessThan("empty_string", "a")).eval(data_file) + assert should_read, "Should match: upper bound '' is strictly less than 'a'" + + # Both-bounds branch: NotEqualTo reads lower_bound and upper_bound. + should_read = _StrictMetricsEvaluator(schema, NotEqualTo("empty_string", "abc")).eval(data_file) + assert should_read, "Should match: 'abc' falls outside ['', '']" + + def test_string_not_starts_with( schema_data_file: Schema, data_file: DataFile, data_file_2: DataFile, data_file_3: DataFile, data_file_4: DataFile ) -> None: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index a05b295fc1..2f36661a1f 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1371,6 +1371,75 @@ def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCa assert len(table.scan(row_filter="partition_id = -1").to_arrow()) == 0 +@pytest.mark.parametrize( + "partition_field_type, arrow_partition_type, partition_value", + [ + (IntegerType(), pa.int32(), 0), + (StringType(), pa.large_string(), ""), + (IntegerType(), pa.int32(), None), + ], +) +def test_identity_transform_column_projection_with_falsy_value( + tmp_path: str, + catalog: InMemoryCatalog, + partition_field_type: PrimitiveType, + arrow_partition_type: pa.DataType, + partition_value: Any, +) -> None: + """Partition value projection must preserve falsy values (0, "") and still render None as null.""" + schema = Schema( + NestedField(1, "other_field", StringType(), required=False), + NestedField(2, "partition_col", partition_field_type, required=False), + ) + partition_spec = PartitionSpec( + PartitionField(2, 1000, IdentityTransform(), "partition_col"), + ) + + catalog.create_namespace("default") + table = catalog.create_table( + f"default.test_projection_partition_{partition_value!r}", + schema=schema, + partition_spec=partition_spec, + properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, + ) + + file_data = pa.array(["foo", "bar"], type=pa.string()) + file_loc = f"{tmp_path}/test.parquet" + pq.write_table(pa.table([file_data], names=["other_field"]), file_loc) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=pq.read_metadata(file_loc), + stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + + unpartitioned_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_loc, + file_format=FileFormat.PARQUET, + partition=Record(partition_value), + file_size_in_bytes=os.path.getsize(file_loc), + sort_order_id=None, + spec_id=table.metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + with table.transaction() as transaction: + with transaction.update_snapshot().overwrite() as update: + update.append_data_file(unpartitioned_file) + + expected_schema = pa.schema([("other_field", pa.string()), ("partition_col", arrow_partition_type)]) + assert table.scan().to_arrow() == pa.table( + { + "other_field": ["foo", "bar"], + "partition_col": [partition_value, partition_value], + }, + schema=expected_schema, + ) + + def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryCatalog) -> None: # Test by adding a non-partitioned data file to a multi-partitioned table, verifying partition value # projection from manifest metadata. diff --git a/tests/table/test_inspect.py b/tests/table/test_inspect.py new file mode 100644 index 0000000000..c325af2033 --- /dev/null +++ b/tests/table/test_inspect.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pathlib import PosixPath + +import pyarrow as pa +import pytest + +from pyiceberg.conversions import to_bytes +from pyiceberg.schema import Schema +from pyiceberg.table.inspect import _readable_bound +from pyiceberg.types import NestedField, StringType +from tests.catalog.test_base import InMemoryCatalog + + +def test_readable_bound_with_empty_bytes() -> None: + assert _readable_bound(StringType(), to_bytes(StringType(), "")) == "" + + +def test_readable_bound_without_bound() -> None: + assert _readable_bound(StringType(), None) is None + + +@pytest.fixture +def catalog(tmp_path: PosixPath) -> InMemoryCatalog: + cat = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix()) + cat.create_namespace("default") + return cat + + +def test_inspect_entries_and_files_render_empty_string_bound(catalog: InMemoryCatalog) -> None: + schema = Schema(NestedField(1, "s", StringType(), required=False)) + tbl = catalog.create_table("default.empty_string_bound", schema) + tbl.append(pa.table({"s": [""]}, schema=pa.schema([pa.field("s", pa.large_string(), nullable=True)]))) + + entries_metrics = tbl.inspect.entries().to_pydict()["readable_metrics"][0]["s"] + assert entries_metrics["lower_bound"] == "" + assert entries_metrics["upper_bound"] == "" + + files_metrics = tbl.inspect.files().to_pydict()["readable_metrics"][0]["s"] + assert files_metrics["lower_bound"] == "" + assert files_metrics["upper_bound"] == "" + + +def test_inspect_entries_and_files_render_null_bound(catalog: InMemoryCatalog) -> None: + schema = Schema(NestedField(1, "s", StringType(), required=False)) + tbl = catalog.create_table("default.null_bound", schema) + tbl.append(pa.table({"s": [None]}, schema=pa.schema([pa.field("s", pa.large_string(), nullable=True)]))) + + entries_metrics = tbl.inspect.entries().to_pydict()["readable_metrics"][0]["s"] + assert entries_metrics["lower_bound"] is None + assert entries_metrics["upper_bound"] is None + + files_metrics = tbl.inspect.files().to_pydict()["readable_metrics"][0]["s"] + assert files_metrics["lower_bound"] is None + assert files_metrics["upper_bound"] is None