27 changes: 16 additions & 11 deletions pyiceberg/table/update/snapshot.py
@@ -228,6 +228,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
 
         # avoid copying metadata for each data file
         table_metadata = self._transaction.table_metadata
+        schema = table_metadata.schema()
+        default_spec = table_metadata.spec()
 
         partition_summary_limit = int(
             table_metadata.properties.get(
@@ -239,8 +241,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
         for data_file in self._added_data_files:
             ssc.add_file(
                 data_file=data_file,
-                partition_spec=table_metadata.spec(),
-                schema=table_metadata.schema(),
+                partition_spec=default_spec,
+                schema=schema,
             )
 
         if len(self._deleted_data_files) > 0:
@@ -249,7 +251,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
                 ssc.remove_file(
                     data_file=data_file,
                     partition_spec=specs[data_file.spec_id],
-                    schema=table_metadata.schema(),
+                    schema=schema,
                 )
 
         previous_snapshot = (
@@ -424,12 +426,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
                 data_file=entry.data_file,
             )
 
+        # avoid copying metadata for each evaluator
+        table_metadata = self._transaction.table_metadata
+        schema = table_metadata.schema()
+
         manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
-        strict_metrics_evaluator = _StrictMetricsEvaluator(
-            self.schema(), self._predicate, case_sensitive=self._case_sensitive
-        ).eval
+        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
         inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
-            self.schema(), self._predicate, case_sensitive=self._case_sensitive
+            schema, self._predicate, case_sensitive=self._case_sensitive
         ).eval
 
         existing_manifests = []
@@ -441,7 +445,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
         # Should be the current tip of the _target_branch
         parent_snapshot_id_for_delete_source = self._parent_snapshot_id
         if parent_snapshot_id_for_delete_source is not None:
-            snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source)
+            snapshot = table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source)
             if snapshot:  # Ensure snapshot is found
                 for manifest_file in snapshot.manifests(io=self._io):
                     if manifest_file.content == ManifestContent.DATA:
@@ -542,18 +546,19 @@ def __init__(
         from pyiceberg.table import TableProperties
 
         super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
+        table_properties = self._transaction.table_metadata.properties
         self._target_size_bytes = property_as_int(
-            self._transaction.table_metadata.properties,
+            table_properties,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
         )  # type: ignore
         self._min_count_to_merge = property_as_int(
-            self._transaction.table_metadata.properties,
+            table_properties,
             TableProperties.MANIFEST_MIN_MERGE_COUNT,
             TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
         )  # type: ignore
         self._merge_enabled = property_as_bool(
-            self._transaction.table_metadata.properties,
+            table_properties,
             TableProperties.MANIFEST_MERGE_ENABLED,
             TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
         )
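Every hunk above applies the same pattern: `Transaction.table_metadata` is a property that replays the transaction's staged updates on each access, so the producer reads it into a local once and derives `schema`, `default_spec`, and `table_properties` from that local instead of re-reading the property inside a loop. A minimal sketch of why this matters — the `Txn` and `add_all_*` names are made-up stand-ins for the real Transaction machinery, not PyIceberg APIs:

```python
# Illustrative sketch only: `Txn` mimics a property that rebuilds metadata
# (replays staged updates) on every access.
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Txn:
    updates: list[str] = field(default_factory=list)
    replays: int = 0  # how many times metadata was rebuilt

    @property
    def table_metadata(self) -> dict[str, Any]:
        # Like Transaction.table_metadata: replay all staged updates per access.
        self.replays += 1
        return {"schema": "s0", "n_updates": len(self.updates)}


def add_all_slow(txn: Txn, files: list[str]) -> None:
    for _f in files:
        _ = txn.table_metadata["schema"]  # one full replay per file


def add_all_fast(txn: Txn, files: list[str]) -> None:
    schema = txn.table_metadata["schema"]  # hoisted: one replay total
    for _f in files:
        _ = schema  # loop touches only the local


txn = Txn(updates=["u"] * 50)
add_all_slow(txn, ["f"] * 100)
print(txn.replays)  # 100
txn.replays = 0
add_all_fast(txn, ["f"] * 100)
print(txn.replays)  # 1
```

The test below pins exactly this property: the number of metadata rebuilds must stay flat as the number of appended files grows.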
42 changes: 42 additions & 0 deletions tests/table/test_snapshots.py
@@ -551,3 +551,45 @@ def test_latest_ancestor_before_timestamp() -> None:
 
     result = latest_ancestor_before_timestamp(metadata, 1000)
     assert result is None
+
+
+def test_snapshot_producer_bounded_metadata_access(table_v2: Table) -> None:
+    """Transaction.table_metadata replays staged updates via update_table_metadata on
+    every access, so the snapshot producer must read it a bounded number of times,
+    not once per item. Guards the hoisting introduced in #2674 and extended here.
+    """
+    from unittest import mock
+
+    from pyiceberg.table.update import update_table_metadata
+    from pyiceberg.table.update.snapshot import _FastAppendFiles, _MergeAppendFiles
+
+    def make_file() -> DataFile:
+        return DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record())
+
+    txn = table_v2.transaction()
+
+    with mock.patch("pyiceberg.table.update_table_metadata", wraps=update_table_metadata) as spy:
+        # _summary() cost must not scale with the number of data files
+        def summary_calls(n_files: int) -> int:
+            append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
+            for _ in range(n_files):
+                append.append_data_file(make_file())
+            spy.reset_mock()
+            append._summary()
+            return spy.call_count
+
+        few, many = summary_calls(10), summary_calls(100)
+        assert few == many, f"_summary() update_table_metadata call count scales with file count ({few} vs {many})"
+        assert many <= 2, f"_summary() triggered {many} update_table_metadata calls; expected O(1)"
+
+        # _MergeAppendFiles.__init__ should add exactly one call over _FastAppendFiles.__init__
+        spy.reset_mock()
+        _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
+        fast_init = spy.call_count
+        spy.reset_mock()
+        _MergeAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
+        merge_init = spy.call_count
+        assert merge_init - fast_init == 1, (
+            f"_MergeAppendFiles.__init__ made {merge_init - fast_init} extra update_table_metadata "
+            "calls over its superclass; expected 1 (hoisted)"
+        )
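One `unittest.mock` detail the test relies on: `mock.patch(target, wraps=real_fn)` installs a mock that records every call but delegates to the real function, so the code under test behaves normally while the spy counts invocations. A standalone demo of the technique, using `json.dumps` purely as a convenient patch target (nothing to do with PyIceberg):

```python
# Demo of mock.patch(..., wraps=...): behavior is preserved,
# but every call is recorded on the mock.
import json
from unittest import mock

with mock.patch("json.dumps", wraps=json.dumps) as spy:
    out = [json.dumps({"i": i}) for i in range(3)]

assert out == ['{"i": 0}', '{"i": 1}', '{"i": 2}']  # real results came through
assert spy.call_count == 3  # ...and the calls were counted
```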