From 7839569e8c44554d21c27253b8e013e52c308215 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 30 Apr 2026 13:37:58 -0700 Subject: [PATCH 1/3] perf: Hoist table_metadata at remaining repeat-access sites in snapshot update Follow-up to #2674. Transaction.table_metadata replays all staged updates via model_copy on every access; this applies the #2674 hoist pattern to three more sites in snapshot.py that still read the property more than once per invocation: - _SnapshotProducer._summary: hoist spec()/schema() out of the per-data-file loop - _DeleteFiles._compute_deletes: hoist table_metadata/schema once (was 3 accesses at method entry) - _MergeAppendFiles.__init__: 3 consecutive .properties reads -> 1 Adds a regression test asserting _summary() access count is independent of file count and _MergeAppendFiles.__init__ adds exactly one access over its superclass. --- pyiceberg/table/update/snapshot.py | 27 +++++++++------- tests/table/test_snapshots.py | 51 ++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..d75072872b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -228,6 +228,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: # avoid copying metadata for each data file table_metadata = self._transaction.table_metadata + schema = table_metadata.schema() + default_spec = table_metadata.spec() partition_summary_limit = int( table_metadata.properties.get( @@ -239,8 +241,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: for data_file in self._added_data_files: ssc.add_file( data_file=data_file, - partition_spec=table_metadata.spec(), - schema=table_metadata.schema(), + partition_spec=default_spec, + schema=schema, ) if len(self._deleted_data_files) > 0: @@ -249,7 +251,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: ssc.remove_file( data_file=data_file, partition_spec=specs[data_file.spec_id], - schema=table_metadata.schema(), + schema=schema, ) previous_snapshot = ( @@ -424,12 +426,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> data_file=entry.data_file, ) + # avoid copying metadata for each evaluator + table_metadata = self._transaction.table_metadata + schema = table_metadata.schema() + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator( - self.schema(), self._predicate, case_sensitive=self._case_sensitive - ).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - self.schema(), self._predicate, case_sensitive=self._case_sensitive + schema, self._predicate, case_sensitive=self._case_sensitive ).eval existing_manifests = [] @@ -441,7 +445,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Should be the current tip of the _target_branch parent_snapshot_id_for_delete_source = self._parent_snapshot_id if parent_snapshot_id_for_delete_source is not None: - snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) + snapshot = table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) if snapshot: # Ensure snapshot is found for manifest_file in snapshot.manifests(io=self._io): if manifest_file.content == ManifestContent.DATA: @@ -542,18 +546,19 @@ def __init__( from pyiceberg.table import TableProperties super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch) + table_properties = self._transaction.table_metadata.properties self._target_size_bytes = property_as_int( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, ) # type: ignore self._min_count_to_merge = property_as_int( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_MIN_MERGE_COUNT, TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, ) # type: ignore self._merge_enabled = property_as_bool( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index cfdc516227..2e4b4c3e81 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -551,3 +551,54 @@ def test_latest_ancestor_before_timestamp() -> None: result = latest_ancestor_before_timestamp(metadata, 1000) assert result is None + + +def test_snapshot_producer_bounded_metadata_access(table_v2: Table) -> None: + """Transaction.table_metadata recomputes via model_copy on every access, so the + snapshot producer must not read it once per item. Guards the hoisting introduced + in #2674 and extended here. + """ + from unittest.mock import patch + + from pyiceberg.table import Transaction + from pyiceberg.table.metadata import TableMetadata + from pyiceberg.table.update.snapshot import _FastAppendFiles, _MergeAppendFiles + + original_fget = Transaction.table_metadata.fget + call_count = 0 + + def counting(self: Transaction) -> TableMetadata: + nonlocal call_count + call_count += 1 + return original_fget(self) + + def make_file() -> DataFile: + return DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record()) + + txn = table_v2.transaction() + + with patch.object(Transaction, "table_metadata", property(counting)): + # _summary() access count must not scale with the number of data files + def summary_accesses(n_files: int) -> int: + append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) + for _ in range(n_files): + append.append_data_file(make_file()) + before = call_count + append._summary() + return call_count - before + + few, many = summary_accesses(10), summary_accesses(100) + assert few == many, f"_summary() table_metadata accesses scale with file count ({few} vs {many})" + assert many <= 2, f"_summary() accessed table_metadata {many} times; expected O(1)" + + # _MergeAppendFiles.__init__ should add exactly one access over _FastAppendFiles.__init__ + before = call_count + _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) + fast_init = call_count - before + before = call_count + _MergeAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) + merge_init = call_count - before + assert merge_init - fast_init == 1, ( + f"_MergeAppendFiles.__init__ made {merge_init - fast_init} extra table_metadata " + "accesses over its superclass; expected 1 (hoisted)" + ) From ef0c138e6692ca3db76a152acb1817e8fce287ec Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 30 Apr 2026 14:07:38 -0700 Subject: [PATCH 2/3] test: Spy on update_table_metadata instead of property descriptor Avoids mypy attr-defined on Transaction.table_metadata.fget by counting calls to the underlying update_table_metadata function (the actual expensive operation) via mock.patch with wraps. --- pyiceberg/table/update/snapshot.py | 27 +++++++--------- tests/table/test_snapshots.py | 49 ++++++++++++------------------ 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index d75072872b..37d120969a 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -228,8 +228,6 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: # avoid copying metadata for each data file table_metadata = self._transaction.table_metadata - schema = table_metadata.schema() - default_spec = table_metadata.spec() partition_summary_limit = int( table_metadata.properties.get( @@ -241,8 +239,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: for data_file in self._added_data_files: ssc.add_file( data_file=data_file, - partition_spec=default_spec, - schema=schema, + partition_spec=table_metadata.spec(), + schema=table_metadata.schema(), ) if len(self._deleted_data_files) > 0: @@ -251,7 +249,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: ssc.remove_file( data_file=data_file, partition_spec=specs[data_file.spec_id], - schema=schema, + schema=table_metadata.schema(), ) previous_snapshot = ( @@ -426,14 +424,12 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> data_file=entry.data_file, ) - # avoid copying metadata for each evaluator - table_metadata = self._transaction.table_metadata - schema = table_metadata.schema() - manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval + strict_metrics_evaluator = _StrictMetricsEvaluator( + self.schema(), self._predicate, case_sensitive=self._case_sensitive + ).eval inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - schema, self._predicate, case_sensitive=self._case_sensitive + self.schema(), self._predicate, case_sensitive=self._case_sensitive ).eval existing_manifests = [] @@ -445,7 +441,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Should be the current tip of the _target_branch parent_snapshot_id_for_delete_source = self._parent_snapshot_id if parent_snapshot_id_for_delete_source is not None: - snapshot = table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) + snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) if snapshot: # Ensure snapshot is found for manifest_file in snapshot.manifests(io=self._io): if manifest_file.content == ManifestContent.DATA: @@ -546,19 +542,18 @@ def __init__( from pyiceberg.table import TableProperties super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch) - table_properties = self._transaction.table_metadata.properties self._target_size_bytes = property_as_int( - table_properties, + self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, ) # type: ignore self._min_count_to_merge = property_as_int( - table_properties, + self._transaction.table_metadata.properties, TableProperties.MANIFEST_MIN_MERGE_COUNT, TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, ) # type: ignore self._merge_enabled = property_as_bool( - table_properties, + self._transaction.table_metadata.properties, TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 2e4b4c3e81..077027f7b9 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -554,51 +554,42 @@ def test_latest_ancestor_before_timestamp() -> None: def test_snapshot_producer_bounded_metadata_access(table_v2: Table) -> None: - """Transaction.table_metadata recomputes via model_copy on every access, so the - snapshot producer must not read it once per item. Guards the hoisting introduced - in #2674 and extended here. + """Transaction.table_metadata replays staged updates via update_table_metadata on + every access, so the snapshot producer must not read it once per item. Guards the + hoisting introduced in #2674 and extended here. """ - from unittest.mock import patch + from unittest import mock - from pyiceberg.table import Transaction - from pyiceberg.table.metadata import TableMetadata + from pyiceberg.table.update import update_table_metadata from pyiceberg.table.update.snapshot import _FastAppendFiles, _MergeAppendFiles - original_fget = Transaction.table_metadata.fget - call_count = 0 - - def counting(self: Transaction) -> TableMetadata: - nonlocal call_count - call_count += 1 - return original_fget(self) - def make_file() -> DataFile: return DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record()) txn = table_v2.transaction() - with patch.object(Transaction, "table_metadata", property(counting)): - # _summary() access count must not scale with the number of data files - def summary_accesses(n_files: int) -> int: + with mock.patch("pyiceberg.table.update_table_metadata", wraps=update_table_metadata) as spy: + # _summary() cost must not scale with the number of data files + def summary_calls(n_files: int) -> int: append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) for _ in range(n_files): append.append_data_file(make_file()) - before = call_count + spy.reset_mock() append._summary() - return call_count - before + return spy.call_count - few, many = summary_accesses(10), summary_accesses(100) - assert few == many, f"_summary() table_metadata accesses scale with file count ({few} vs {many})" - assert many <= 2, f"_summary() accessed table_metadata {many} times; expected O(1)" + few, many = summary_calls(10), summary_calls(100) + assert few == many, f"_summary() update_table_metadata calls scale with file count ({few} vs {many})" + assert many <= 2, f"_summary() triggered {many} update_table_metadata calls; expected O(1)" - # _MergeAppendFiles.__init__ should add exactly one access over _FastAppendFiles.__init__ - before = call_count + # _MergeAppendFiles.__init__ should add exactly one call over _FastAppendFiles.__init__ + spy.reset_mock() _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) - fast_init = call_count - before - before = call_count + fast_init = spy.call_count + spy.reset_mock() _MergeAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io) - merge_init = call_count - before + merge_init = spy.call_count assert merge_init - fast_init == 1, ( - f"_MergeAppendFiles.__init__ made {merge_init - fast_init} extra table_metadata " - "accesses over its superclass; expected 1 (hoisted)" + f"_MergeAppendFiles.__init__ made {merge_init - fast_init} extra update_table_metadata " + "calls over its superclass; expected 1 (hoisted)" ) From b221b5b8e00440c6953035ef89722afad0839b59 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Thu, 30 Apr 2026 14:08:28 -0700 Subject: [PATCH 3/3] Restore hoists in snapshot.py (accidentally reverted in prior commit) --- pyiceberg/table/update/snapshot.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..d75072872b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -228,6 +228,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: # avoid copying metadata for each data file table_metadata = self._transaction.table_metadata + schema = table_metadata.schema() + default_spec = table_metadata.spec() partition_summary_limit = int( table_metadata.properties.get( @@ -239,8 +241,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: for data_file in self._added_data_files: ssc.add_file( data_file=data_file, - partition_spec=table_metadata.spec(), - schema=table_metadata.schema(), + partition_spec=default_spec, + schema=schema, ) if len(self._deleted_data_files) > 0: @@ -249,7 +251,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary: ssc.remove_file( data_file=data_file, partition_spec=specs[data_file.spec_id], - schema=table_metadata.schema(), + schema=schema, ) previous_snapshot = ( @@ -424,12 +426,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> data_file=entry.data_file, ) + # avoid copying metadata for each evaluator + table_metadata = self._transaction.table_metadata + schema = table_metadata.schema() + manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - strict_metrics_evaluator = _StrictMetricsEvaluator( - self.schema(), self._predicate, case_sensitive=self._case_sensitive - ).eval + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval inclusive_metrics_evaluator = _InclusiveMetricsEvaluator( - self.schema(), self._predicate, case_sensitive=self._case_sensitive + schema, self._predicate, case_sensitive=self._case_sensitive ).eval existing_manifests = [] @@ -441,7 +445,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Should be the current tip of the _target_branch parent_snapshot_id_for_delete_source = self._parent_snapshot_id if parent_snapshot_id_for_delete_source is not None: - snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) + snapshot = table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source) if snapshot: # Ensure snapshot is found for manifest_file in snapshot.manifests(io=self._io): if manifest_file.content == ManifestContent.DATA: @@ -542,18 +546,19 @@ def __init__( from pyiceberg.table import TableProperties super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch) + table_properties = self._transaction.table_metadata.properties self._target_size_bytes = property_as_int( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, ) # type: ignore self._min_count_to_merge = property_as_int( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_MIN_MERGE_COUNT, TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, ) # type: ignore self._merge_enabled = property_as_bool( - self._transaction.table_metadata.properties, + table_properties, TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, )