Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class Transaction:
_autocommit: bool
_updates: tuple[TableUpdate, ...]
_requirements: tuple[TableRequirement, ...]
_table_metadata_cache: tuple[TableMetadata, tuple[TableUpdate, ...], TableMetadata] | None

def __init__(self, table: Table, autocommit: bool = False):
"""Open a transaction to stage and commit changes to a table.
Expand All @@ -223,10 +224,21 @@ def __init__(self, table: Table, autocommit: bool = False):
self._autocommit = autocommit
self._updates = ()
self._requirements = ()
self._table_metadata_cache = None

@property
def table_metadata(self) -> TableMetadata:
    """Return the table metadata with all staged updates applied.

    The result of ``update_table_metadata`` is memoized in
    ``self._table_metadata_cache`` as a ``(base, updates, result)`` triple.
    The cache is keyed on the *identity* of its two inputs, so it
    self-invalidates whenever ``self._updates`` is rebound to a different
    tuple or ``self._table.metadata`` yields a different object.

    Returns:
        The metadata obtained by applying ``self._updates`` to
        ``self._table.metadata``. Repeated reads with unchanged inputs
        return the same object instead of recomputing it.
    """
    base, updates = self._table.metadata, self._updates
    cached = self._table_metadata_cache
    if cached is not None:
        cached_base, cached_updates, cached_result = cached
        # Identity rather than equality: both checks are O(1), and a tuple
        # cannot change in place, so the same tuple object always holds the
        # same sequence of staged updates.
        if cached_base is base and cached_updates is updates:
            return cached_result
    # update_table_metadata replays every staged update via
    # model_copy(deep=True), so only pay for it when an input has changed.
    result = update_table_metadata(base, updates)
    self._table_metadata_cache = (base, updates, result)
    return result

def __enter__(self) -> Transaction:
"""Start a transaction to update the table."""
Expand Down
26 changes: 26 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -1766,3 +1766,29 @@ def test_build_large_partition_predicate(table_v2: Table) -> None:
)

bind(table_v2.metadata.schema(), expr, case_sensitive=True)


def test_transaction_table_metadata_cached(table_v2: Table) -> None:
    """Verify that ``Transaction.table_metadata`` memoizes its result.

    Repeated reads with no intervening change must hand back one and the same
    object after a single ``update_table_metadata`` call, and staging a new
    update must trigger exactly one further recomputation.
    """
    from unittest.mock import patch

    from pyiceberg.table.update import SetPropertiesUpdate, update_table_metadata

    repeat_reads = 10

    # wraps= keeps the real implementation running while recording every call.
    with patch("pyiceberg.table.update_table_metadata", wraps=update_table_metadata) as spy:
        transaction = table_v2.transaction()

        # Phase 1: nothing staged, so every read must hit the cache.
        before_stage = transaction.table_metadata
        for _ in range(repeat_reads):
            assert transaction.table_metadata is before_stage
        assert spy.call_count == 1, f"expected 1 recompute for repeated reads, got {spy.call_count}"

        # Phase 2: staging an update must invalidate the cache exactly once.
        staged_updates = (SetPropertiesUpdate(updates={"k": "v"}),)
        transaction._stage(staged_updates)
        after_stage = transaction.table_metadata
        assert after_stage is not before_stage
        assert after_stage.properties["k"] == "v"
        for _ in range(repeat_reads):
            assert transaction.table_metadata is after_stage
        assert spy.call_count == 2, f"expected 2 recomputes after one staged update, got {spy.call_count}"
Loading