From d4ca6535a3cd978cd3b129ab5fcd1b6bc5a99999 Mon Sep 17 00:00:00 2001
From: Kevin Liu <kevin.jq.liu@gmail.com>
Date: Sat, 27 Jan 2024 10:46:37 -0800
Subject: [PATCH 01/21] s/"main"/MAIN_BRANCH

(cherry picked from commit ad1ec672bac7cc7cf5e74defcee8a37df6c316f4)
---
 pyiceberg/table/__init__.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index b43dc3206b..08f554bfaa 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -397,7 +397,7 @@ def set_ref_snapshot(
             ),
         )
 
-        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),)
+        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),)
         return self._apply(updates, requirements)
 
     def _set_ref_snapshot(
@@ -3205,10 +3205,10 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
+                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch"
                 ),
             ),
-            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
+            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),),
         )
 
     @property

From 0b7aaaf928f57941da4f28694d46fdf98884245a Mon Sep 17 00:00:00 2001
From: Kevin Liu <kevin.jq.liu@gmail.com>
Date: Sat, 27 Jan 2024 11:17:28 -0800
Subject: [PATCH 02/21] replace string literals

(cherry picked from commit 40cf10e0c5d3d2a5366b5b9b85191aa319e151d0)
---
 pyiceberg/cli/console.py    | 6 +++---
 pyiceberg/table/__init__.py | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py
index d1833df081..213883bc70 100644
--- a/pyiceberg/cli/console.py
+++ b/pyiceberg/cli/console.py
@@ -32,7 +32,7 @@
 from pyiceberg.catalog import Catalog, load_catalog
 from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
 from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
-from pyiceberg.table.refs import SnapshotRef
+from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
 
 DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1
 DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000
@@ -420,7 +420,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
     refs = table.refs()
     if type:
         type = type.lower()
-        if type not in {"branch", "tag"}:
+        if type not in {SnapshotRefType.BRANCH, SnapshotRefType.TAG}:
             raise ValueError(f"Type must be either branch or tag, got: {type}")
 
     relevant_refs = [
@@ -434,7 +434,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
 
 def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
     retention_properties = {}
-    if ref.snapshot_ref_type == "branch":
+    if ref.snapshot_ref_type == SnapshotRefType.BRANCH:
         default_min_snapshots_to_keep = table_properties.get(
             "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP
         )
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 08f554bfaa..395b8fa688 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -117,7 +117,7 @@
     NameMapping,
     update_mapping,
 )
-from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
+from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import (
     Operation,
     Snapshot,
@@ -813,7 +813,7 @@ class AddSnapshotUpdate(IcebergBaseModel):
 class SetSnapshotRefUpdate(IcebergBaseModel):
     action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref")
     ref_name: str = Field(alias="ref-name")
-    type: Literal["tag", "branch"]
+    type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH]
     snapshot_id: int = Field(alias="snapshot-id")
     max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)]
     max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)]
@@ -3205,7 +3205,7 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch"
+                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH
                 ),
             ),
             (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),),

From 23f04ec039f223c3995ac27dd75e72e8f21b178f Mon Sep 17 00:00:00 2001
From: Kevin Liu <kevin.jq.liu@gmail.com>
Date: Sat, 27 Jan 2024 11:20:48 -0800
Subject: [PATCH 03/21] default writes to main branch

(cherry picked from commit 6dbe68d8dcb282375f158fbcc0466b985092078e)
---
 pyiceberg/table/__init__.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 395b8fa688..5aac334764 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -397,7 +397,7 @@ def set_ref_snapshot(
             ),
         )
 
-        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),)
+        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),)
         return self._apply(updates, requirements)
 
     def _set_ref_snapshot(
@@ -1540,7 +1540,7 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
-    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None:
         """
         Shorthand API for appending a PyArrow table to the table.
 
@@ -3044,14 +3044,16 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     _added_data_files: List[DataFile]
     _manifest_num_counter: itertools.count[int]
     _deleted_data_files: Set[DataFile]
+    _branch: str
 
     def __init__(
         self,
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
-        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT
     ) -> None:
         super().__init__(transaction)
         self.commit_uuid = commit_uuid or uuid.uuid4()
@@ -3059,6 +3061,7 @@ def __init__(
         self._operation = operation
         self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
         # Since we only support the main branch for now
+        self._branch = branch
         self._parent_snapshot_id = (
             snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
         )
@@ -3205,10 +3208,10 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH
+                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH
                 ),
             ),
-            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),),
+            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),),
         )
 
     @property

From af6ff9aebad07cde3fdb746cfb1d3c73ddcedc83 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Thu, 18 Jul 2024 22:33:17 +0530
Subject: [PATCH 04/21] Added some more methods for branches

---
 pyiceberg/table/__init__.py | 31 +++++++++++++++++++++----------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 5aac334764..1cd0a2c04c 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1549,13 +1549,14 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         with self.transaction() as tx:
-            tx.append(df=df, snapshot_properties=snapshot_properties)
+            tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch)
 
     def overwrite(
         self,
         df: pa.Table,
         overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        branch: str = MAIN_BRANCH,
     ) -> None:
         """
         Shorthand for overwriting the table with a PyArrow table.
@@ -1573,7 +1574,7 @@ def overwrite(
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         with self.transaction() as tx:
-            tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)
+            tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch)
 
     def delete(
         self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
@@ -3053,14 +3054,13 @@ def __init__(
         io: FileIO,
         branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
-        snapshot_properties: Dict[str, str] = EMPTY_DICT
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         super().__init__(transaction)
         self.commit_uuid = commit_uuid or uuid.uuid4()
         self._io = io
         self._operation = operation
         self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
-        # Since we only support the main branch for now
         self._branch = branch
         self._parent_snapshot_id = (
             snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
@@ -3208,7 +3208,10 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH
+                    snapshot_id=self._snapshot_id,
+                    parent_snapshot_id=self._parent_snapshot_id,
+                    ref_name=self._branch,
+                    type=SnapshotRefType.BRANCH,
                 ),
             ),
             (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),),
@@ -3260,10 +3263,13 @@ def __init__(
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ):
-        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        super().__init__(
+            operation=operation, transaction=transaction, io=io, branch=branch, commit_uuid=commit_uuid, snapshot_properties=snapshot_properties
+        )
         self._predicate = AlwaysFalse()
 
     def _commit(self) -> UpdatesAndRequirements:
@@ -3419,10 +3425,11 @@ def __init__(
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
-        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
         self._target_size_bytes = PropertyUtil.property_as_int(
             self._transaction.table_metadata.properties,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
@@ -3538,21 +3545,23 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
 class UpdateSnapshot:
     _transaction: Transaction
     _io: FileIO
+    _branch: str
     _snapshot_properties: Dict[str, str]
 
-    def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def __init__(self, transaction: Transaction, io: FileIO, branch:str, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         self._transaction = transaction
         self._io = io
         self._snapshot_properties = snapshot_properties
+        self._branch = branch
 
     def fast_append(self) -> FastAppendFiles:
         return FastAppendFiles(
-            operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+            operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties
         )
 
     def merge_append(self) -> MergeAppendFiles:
         return MergeAppendFiles(
-            operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+            operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties
         )
 
     def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
@@ -3564,6 +3573,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
             transaction=self._transaction,
             io=self._io,
             snapshot_properties=self._snapshot_properties,
+            branch=self._branch
         )
 
     def delete(self) -> DeleteFiles:
@@ -3572,6 +3582,7 @@ def delete(self) -> DeleteFiles:
             transaction=self._transaction,
             io=self._io,
             snapshot_properties=self._snapshot_properties,
+            branch=self._branch
         )
 
 

From 6fbf3f18c994b4c2f8d3484ee5c43cf4c3d9991f Mon Sep 17 00:00:00 2001
From: Kevin Liu <kevin.jq.liu@gmail.com>
Date: Sat, 27 Jan 2024 10:46:37 -0800
Subject: [PATCH 05/21] s/"main"/MAIN_BRANCH

(cherry picked from commit ad1ec672bac7cc7cf5e74defcee8a37df6c316f4)
---
 pyiceberg/table/__init__.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 34e9d2c53b..cb6e4efde4 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -402,7 +402,7 @@ def set_ref_snapshot(
             ),
         )
 
-        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref="main"),)
+        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),)
         return self._apply(updates, requirements)
 
     def _set_ref_snapshot(
@@ -3227,10 +3227,10 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
+                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch"
                 ),
             ),
-            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
+            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),),
         )
 
     @property

From 8ce1509c6ca5e6aa38d615c99bcdb25051bcb4ad Mon Sep 17 00:00:00 2001
From: Kevin Liu <kevin.jq.liu@gmail.com>
Date: Sat, 27 Jan 2024 11:17:28 -0800
Subject: [PATCH 06/21] replace string literals

(cherry picked from commit 40cf10e0c5d3d2a5366b5b9b85191aa319e151d0)
---
 pyiceberg/cli/console.py    | 6 +++---
 pyiceberg/table/__init__.py | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py
index d1833df081..213883bc70 100644
--- a/pyiceberg/cli/console.py
+++ b/pyiceberg/cli/console.py
@@ -32,7 +32,7 @@
 from pyiceberg.catalog import Catalog, load_catalog
 from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
 from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
-from pyiceberg.table.refs import SnapshotRef
+from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
 
 DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1
 DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000
@@ -420,7 +420,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
     refs = table.refs()
     if type:
         type = type.lower()
-        if type not in {"branch", "tag"}:
+        if type not in {SnapshotRefType.BRANCH, SnapshotRefType.TAG}:
             raise ValueError(f"Type must be either branch or tag, got: {type}")
 
     relevant_refs = [
@@ -434,7 +434,7 @@ def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
 
 def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
     retention_properties = {}
-    if ref.snapshot_ref_type == "branch":
+    if ref.snapshot_ref_type == SnapshotRefType.BRANCH:
         default_min_snapshots_to_keep = table_properties.get(
             "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP
         )
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index cb6e4efde4..b8362338f4 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -115,7 +115,7 @@
     NameMapping,
     update_mapping,
 )
-from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
+from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import (
     Operation,
     Snapshot,
@@ -828,7 +828,7 @@ class AddSnapshotUpdate(IcebergBaseModel):
 class SetSnapshotRefUpdate(IcebergBaseModel):
     action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref")
     ref_name: str = Field(alias="ref-name")
-    type: Literal["tag", "branch"]
+    type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH]
     snapshot_id: int = Field(alias="snapshot-id")
     max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)]
     max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)]
@@ -3227,7 +3227,7 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type="branch"
+                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH
                 ),
             ),
             (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),),

From 6daf29e4459c9e64403e56eab07f07457bd392a9 Mon Sep 17 00:00:00 2001
From: Kevin Liu <kevin.jq.liu@gmail.com>
Date: Sat, 27 Jan 2024 11:20:48 -0800
Subject: [PATCH 07/21] default writes to main branch

(cherry picked from commit 6dbe68d8dcb282375f158fbcc0466b985092078e)
---
 pyiceberg/table/__init__.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index b8362338f4..595adb701e 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -402,7 +402,7 @@ def set_ref_snapshot(
             ),
         )
 
-        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=MAIN_BRANCH),)
+        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),)
         return self._apply(updates, requirements)
 
     def _set_ref_snapshot(
@@ -1562,7 +1562,7 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
-    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None:
         """
         Shorthand API for appending a PyArrow table to the table.
 
@@ -3066,14 +3066,16 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     _added_data_files: List[DataFile]
     _manifest_num_counter: itertools.count[int]
     _deleted_data_files: Set[DataFile]
+    _branch: str
 
     def __init__(
         self,
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
-        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT
     ) -> None:
         super().__init__(transaction)
         self.commit_uuid = commit_uuid or uuid.uuid4()
@@ -3081,6 +3083,7 @@ def __init__(
         self._operation = operation
         self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
         # Since we only support the main branch for now
+        self._branch = branch
         self._parent_snapshot_id = (
             snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
         )
@@ -3227,10 +3230,10 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=MAIN_BRANCH, type=SnapshotRefType.BRANCH
+                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH
                 ),
             ),
-            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=MAIN_BRANCH),),
+            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),),
         )
 
     @property

From 09321cdbe3ee36bff32cc389c5679308a2e6e83e Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Thu, 18 Jul 2024 22:33:17 +0530
Subject: [PATCH 08/21] Added some more methods for branches

---
 pyiceberg/table/__init__.py | 31 +++++++++++++++++++++----------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 595adb701e..d2a94b2e35 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1571,13 +1571,14 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         with self.transaction() as tx:
-            tx.append(df=df, snapshot_properties=snapshot_properties)
+            tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch)
 
     def overwrite(
         self,
         df: pa.Table,
         overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        branch: str = MAIN_BRANCH,
     ) -> None:
         """
         Shorthand for overwriting the table with a PyArrow table.
@@ -1595,7 +1596,7 @@ def overwrite(
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         with self.transaction() as tx:
-            tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties)
+            tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch)
 
     def delete(
         self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
@@ -3075,14 +3076,13 @@ def __init__(
         io: FileIO,
         branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
-        snapshot_properties: Dict[str, str] = EMPTY_DICT
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         super().__init__(transaction)
         self.commit_uuid = commit_uuid or uuid.uuid4()
         self._io = io
         self._operation = operation
         self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
-        # Since we only support the main branch for now
         self._branch = branch
         self._parent_snapshot_id = (
             snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
@@ -3230,7 +3230,10 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name=self._branch, type=SnapshotRefType.BRANCH
+                    snapshot_id=self._snapshot_id,
+                    parent_snapshot_id=self._parent_snapshot_id,
+                    ref_name=self._branch,
+                    type=SnapshotRefType.BRANCH,
                 ),
             ),
             (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),),
@@ -3282,10 +3285,13 @@ def __init__(
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ):
-        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        super().__init__(
+            operation=operation, transaction=transaction, io=io, branch=branch, commit_uuid=commit_uuid, snapshot_properties=snapshot_properties
+        )
         self._predicate = AlwaysFalse()
 
     def _commit(self) -> UpdatesAndRequirements:
@@ -3441,10 +3447,11 @@ def __init__(
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
-        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
         self._target_size_bytes = PropertyUtil.property_as_int(
             self._transaction.table_metadata.properties,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
@@ -3560,21 +3567,23 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
 class UpdateSnapshot:
     _transaction: Transaction
     _io: FileIO
+    _branch: str
     _snapshot_properties: Dict[str, str]
 
-    def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def __init__(self, transaction: Transaction, io: FileIO, branch:str, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         self._transaction = transaction
         self._io = io
         self._snapshot_properties = snapshot_properties
+        self._branch = branch
 
     def fast_append(self) -> FastAppendFiles:
         return FastAppendFiles(
-            operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+            operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties
         )
 
     def merge_append(self) -> MergeAppendFiles:
         return MergeAppendFiles(
-            operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+            operation=Operation.APPEND, transaction=self._transaction, io=self._io,branch=self._branch, snapshot_properties=self._snapshot_properties
         )
 
     def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
@@ -3586,6 +3595,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
             transaction=self._transaction,
             io=self._io,
             snapshot_properties=self._snapshot_properties,
+            branch=self._branch
         )
 
     def delete(self) -> DeleteFiles:
@@ -3594,6 +3604,7 @@ def delete(self) -> DeleteFiles:
             transaction=self._transaction,
             io=self._io,
             snapshot_properties=self._snapshot_properties,
+            branch=self._branch
         )
 
 

From 45b01a68e1d4572494082a09b49883ecdf00b329 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Sun, 13 Oct 2024 04:50:17 +0530
Subject: [PATCH 09/21] Updated antries for branches

---
 pyiceberg/table/__init__.py        | 32 +++++++++++++++----------
 pyiceberg/table/update/__init__.py |  4 ++--
 pyiceberg/table/update/snapshot.py | 38 +++++++++++++++++++++++-------
 3 files changed, 52 insertions(+), 22 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 52dca937f8..5f4f24f40a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -78,7 +78,7 @@
 from pyiceberg.table.name_mapping import (
     NameMapping,
 )
-from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
+from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
 from pyiceberg.table.snapshots import (
     Snapshot,
     SnapshotLogEntry,
@@ -402,21 +402,22 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
             name_mapping=self.table_metadata.name_mapping(),
         )
 
-    def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
+    def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> UpdateSnapshot:
         """Create a new UpdateSnapshot to produce a new snapshot for the table.
 
         Returns:
             A new UpdateSnapshot
         """
-        return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)
+        return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties)
 
-    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
         """
         Shorthand API for appending a PyArrow table to a table transaction.
 
         Args:
             df: The Arrow dataframe that will be appended to overwrite the table
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the overwrite operation
         """
         try:
             import pyarrow as pa
@@ -444,7 +445,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
             TableProperties.MANIFEST_MERGE_ENABLED,
             TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
         )
-        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
+        update_snapshot = self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties)
         append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
 
         with append_method() as append_files:
@@ -461,6 +462,7 @@ def overwrite(
         df: pa.Table,
         overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        branch: str = MAIN_BRANCH,
     ) -> None:
         """
         Shorthand for adding a table overwrite with a PyArrow table to the transaction.
@@ -476,6 +478,7 @@ def overwrite(
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial overwrite
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the overwrite operation
         """
         try:
             import pyarrow as pa
@@ -500,7 +503,7 @@ def overwrite(
 
         self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
 
-        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
+        with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
@@ -509,7 +512,12 @@ def overwrite(
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
-    def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def delete(
+        self,
+        delete_filter: Union[str, BooleanExpression],
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        branch: str = MAIN_BRANCH,
+    ) -> None:
         """
         Shorthand for deleting record from a table.
 
@@ -537,7 +545,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
         if isinstance(delete_filter, str):
             delete_filter = _parse_row_filter(delete_filter)
 
-        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+        with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).delete() as delete_snapshot:
             delete_snapshot.delete_by_predicate(delete_filter)
 
         # Check if there are any files that require an actual rewrite of a data file
@@ -585,7 +593,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
                     ))
 
             if len(replaced_files) > 0:
-                with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
+                with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).overwrite(
                     commit_uuid=commit_uuid
                 ) as overwrite_snapshot:
                     for original_data_file, replaced_data_files in replaced_files:
@@ -1003,7 +1011,7 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
-    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None:
+    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
         """
         Shorthand API for appending a PyArrow table to the table.
 
@@ -1012,7 +1020,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         with self.transaction() as tx:
-            tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch)
+            tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
 
     def overwrite(
         self,
@@ -1037,7 +1045,7 @@ def overwrite(
             snapshot_properties: Custom properties to be added to the snapshot summary
         """
         with self.transaction() as tx:
-            tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch)
+            tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)
 
     def delete(
         self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index 6e14046f9a..d02fdc39b6 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -30,7 +30,7 @@
 from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
-from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
+from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import (
     MetadataLogEntry,
     Snapshot,
@@ -136,7 +136,7 @@ class AddSnapshotUpdate(IcebergBaseModel):
 class SetSnapshotRefUpdate(IcebergBaseModel):
     action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref")
     ref_name: str = Field(alias="ref-name")
-    type: Literal["tag", "branch"]
+    type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH]
     snapshot_id: int = Field(alias="snapshot-id")
     max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)]
     max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)]
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 47e5fc55e3..3af3ba9a23 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -55,6 +55,7 @@
 from pyiceberg.partitioning import (
     PartitionSpec,
 )
+from pyiceberg.table.refs import SnapshotRefType
 from pyiceberg.table.snapshots import (
     Operation,
     Snapshot,
@@ -103,12 +104,14 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     _added_data_files: List[DataFile]
     _manifest_num_counter: itertools.count[int]
     _deleted_data_files: Set[DataFile]
+    _branch: str
 
     def __init__(
         self,
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
@@ -117,7 +120,7 @@ def __init__(
         self._io = io
         self._operation = operation
         self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
-        # Since we only support the main branch for now
+        self._branch = branch
         self._parent_snapshot_id = (
             snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
         )
@@ -272,10 +275,13 @@ def _commit(self) -> UpdatesAndRequirements:
             (
                 AddSnapshotUpdate(snapshot=snapshot),
                 SetSnapshotRefUpdate(
-                    snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
+                    snapshot_id=self._snapshot_id,
+                    parent_snapshot_id=self._parent_snapshot_id,
+                    ref_name=self._branch,
+                    type=SnapshotRefType.BRANCH,
                 ),
             ),
-            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
+            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),),
         )
 
     @property
@@ -324,10 +330,11 @@ def __init__(
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ):
-        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
         self._predicate = AlwaysFalse()
 
     def _commit(self) -> UpdatesAndRequirements:
@@ -482,12 +489,13 @@ def __init__(
         operation: Operation,
         transaction: Transaction,
         io: FileIO,
+        branch: str,
         commit_uuid: Optional[uuid.UUID] = None,
         snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         from pyiceberg.table import TableProperties
 
-        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
         self._target_size_bytes = property_as_int(
             self._transaction.table_metadata.properties,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
@@ -603,21 +611,33 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
 class UpdateSnapshot:
     _transaction: Transaction
     _io: FileIO
+    _branch: str
     _snapshot_properties: Dict[str, str]
 
-    def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def __init__(
+        self, transaction: Transaction, io: FileIO, branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT
+    ) -> None:
         self._transaction = transaction
         self._io = io
         self._snapshot_properties = snapshot_properties
+        self._branch = branch
 
     def fast_append(self) -> _FastAppendFiles:
         return _FastAppendFiles(
-            operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+            operation=Operation.APPEND,
+            transaction=self._transaction,
+            io=self._io,
+            branch=self._branch,
+            snapshot_properties=self._snapshot_properties,
         )
 
     def merge_append(self) -> _MergeAppendFiles:
         return _MergeAppendFiles(
-            operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+            operation=Operation.APPEND,
+            transaction=self._transaction,
+            io=self._io,
+            branch=self._branch,
+            snapshot_properties=self._snapshot_properties,
         )
 
     def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
@@ -628,6 +648,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
             else Operation.APPEND,
             transaction=self._transaction,
             io=self._io,
+            branch=self._branch,
             snapshot_properties=self._snapshot_properties,
         )
 
@@ -636,6 +657,7 @@ def delete(self) -> _DeleteFiles:
             operation=Operation.DELETE,
             transaction=self._transaction,
             io=self._io,
+            branch=self._branch,
             snapshot_properties=self._snapshot_properties,
         )
 

From 917b044b2a3e244a7a25919325cc619fd8992500 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Tue, 15 Oct 2024 05:23:32 +0530
Subject: [PATCH 10/21] Fixed some bugs

---
 pyiceberg/table/__init__.py        | 3 ++-
 pyiceberg/table/update/__init__.py | 2 ++
 pyiceberg/table/update/snapshot.py | 9 ++++++++-
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 5f4f24f40a..2260cd150b 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -417,7 +417,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
         Args:
             df: The Arrow dataframe that will be appended to overwrite the table
             snapshot_properties: Custom properties to be added to the snapshot summary
-            branch: Branch Reference to run the overwrite operation
+            branch: Branch Reference to run the append operation
         """
         try:
             import pyarrow as pa
@@ -529,6 +529,7 @@ def delete(
         Args:
             delete_filter: A boolean expression to delete rows from a table
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the delete operation
         """
         from pyiceberg.io.pyarrow import (
             ArrowScan,
diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index d02fdc39b6..03b9855307 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -597,6 +597,8 @@ class AssertRefSnapshotId(ValidatableTableRequirement):
     def validate(self, base_metadata: Optional[TableMetadata]) -> None:
         if base_metadata is None:
             raise CommitFailedException("Requirement failed: current table metadata is missing")
+        elif len(base_metadata.snapshots) == 0 and self.ref != MAIN_BRANCH:
+            raise CommitFailedException(f"Requirement failed: No snapshot available in table for ref: {self.ref}")
         elif snapshot_ref := base_metadata.refs.get(self.ref):
             ref_type = snapshot_ref.snapshot_ref_type
             if self.snapshot_id is None:
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 3af3ba9a23..902141fa5c 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -281,7 +281,14 @@ def _commit(self) -> UpdatesAndRequirements:
                     type=SnapshotRefType.BRANCH,
                 ),
             ),
-            (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),),
+            (
+                AssertRefSnapshotId(
+                    snapshot_id=self._transaction.table_metadata.refs[self._branch].snapshot_id
+                    if self._branch in self._transaction.table_metadata.refs
+                    else self._transaction.table_metadata.current_snapshot_id,
+                    ref=self._branch,
+                ),
+            ),
         )
 
     @property

From 398f6c0ee33cee4ba8564d62ede143de52c1243f Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Wed, 16 Oct 2024 05:29:23 +0530
Subject: [PATCH 11/21] Fixed bugs in delete and overwrite

---
 pyiceberg/table/__init__.py        | 14 ++++++++++----
 pyiceberg/table/update/snapshot.py |  4 ++--
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 2260cd150b..662a9795d2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -501,7 +501,7 @@ def overwrite(
             self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )
 
-        self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
+        self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch)
 
         with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
             # skip writing data files if the dataframe is empty
@@ -554,7 +554,7 @@ def delete(
             bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
             preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)
 
-            files = self._scan(row_filter=delete_filter).plan_files()
+            files = self._scan(row_filter=delete_filter).use_ref(branch).plan_files()
 
             commit_uuid = uuid.uuid4()
             counter = itertools.count(0)
@@ -1019,6 +1019,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
         Args:
             df: The Arrow dataframe that will be appended to overwrite the table
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the delete operation
         """
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
@@ -1044,12 +1045,16 @@ def overwrite(
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial overwrite
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the delete operation
         """
         with self.transaction() as tx:
             tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)
 
     def delete(
-        self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
+        self,
+        delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+        branch: str = MAIN_BRANCH,
     ) -> None:
         """
         Shorthand for deleting rows from the table.
@@ -1057,9 +1062,10 @@ def delete(
         Args:
             delete_filter: The predicate that used to remove rows
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the delete operation
         """
         with self.transaction() as tx:
-            tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
+            tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
 
     def add_files(
         self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 902141fa5c..135e4f5293 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -122,7 +122,7 @@ def __init__(
         self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
         self._branch = branch
         self._parent_snapshot_id = (
-            snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
+            snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._branch)) else None
         )
         self._added_data_files = []
         self._deleted_data_files = set()
@@ -548,7 +548,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
         """Determine if there are any existing manifest files."""
         existing_files = []
 
-        if snapshot := self._transaction.table_metadata.current_snapshot():
+        if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._branch):
             for manifest_file in snapshot.manifests(io=self._io):
                 entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
                 found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]

From b7b8ba0b945f592d771aca936d1ceafb78a478c5 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Wed, 16 Oct 2024 12:23:35 +0530
Subject: [PATCH 12/21] Added tests and some refactoring

---
 pyiceberg/table/__init__.py                  |  2 +-
 pyiceberg/table/update/snapshot.py           |  4 +-
 tests/integration/test_writes/test_writes.py | 80 +++++++++++++++++++-
 3 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 662a9795d2..d34f875b29 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -501,7 +501,7 @@ def overwrite(
             self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )
 
-        self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch)
+        self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)
 
         with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
             # skip writing data files if the dataframe is empty
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 135e4f5293..b3ef552c55 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -55,7 +55,7 @@
 from pyiceberg.partitioning import (
     PartitionSpec,
 )
-from pyiceberg.table.refs import SnapshotRefType
+from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType
 from pyiceberg.table.snapshots import (
     Operation,
     Snapshot,
@@ -622,7 +622,7 @@ class UpdateSnapshot:
     _snapshot_properties: Dict[str, str]
 
     def __init__(
-        self, transaction: Transaction, io: FileIO, branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT
+        self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH
     ) -> None:
         self._transaction = transaction
         self._io = io
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index fc2746c614..39e7758461 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -39,7 +39,7 @@
 from pyiceberg.catalog.hive import HiveCatalog
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
-from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
 from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -1015,7 +1015,8 @@ def test_table_write_schema_with_valid_nullability_diff(
         NestedField(field_id=1, name="long", field_type=LongType(), required=False),
     )
     other_schema = pa.schema((
-        pa.field("long", pa.int64(), nullable=False),  # can support writing required pyarrow field to optional Iceberg field
+        pa.field("long", pa.int64(), nullable=False),
+        # can support writing required pyarrow field to optional Iceberg field
     ))
     arrow_table = pa.Table.from_pydict(
         {
@@ -1062,7 +1063,8 @@ def test_table_write_schema_with_valid_upcast(
             pa.field("list", pa.large_list(pa.int64()), nullable=False),
             pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
             pa.field("double", pa.float64(), nullable=True),  # can support upcasting float to double
-            pa.field("uuid", pa.binary(length=16), nullable=True),  # can UUID is read as fixed length binary of length 16
+            pa.field("uuid", pa.binary(length=16), nullable=True),
+            # can UUID is read as fixed length binary of length 16
         ))
     )
     lhs = spark.table(f"{identifier}").toPandas()
@@ -1448,3 +1450,75 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) ->
             EqualTo("category", "A"),
         ),
     )
+
+
+@pytest.mark.integration
+def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.test_non_existing_branch"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [])
+    with pytest.raises(CommitFailedException, match="No snapshot available in table for ref:"):
+        tbl.append(arrow_table_with_null, branch="non_existing_branch")
+
+
+@pytest.mark.integration
+def test_append_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.test_existing_branch_append"
+    branch = "existing_branch"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
+
+    assert tbl.metadata.current_snapshot_id is not None
+
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()
+    tbl.append(arrow_table_with_null, branch=branch)
+
+    assert len(tbl.scan().use_ref(branch).to_arrow()) == 6
+    assert len(tbl.scan().to_arrow()) == 3
+    branch_snapshot = tbl.metadata.snapshot_by_name(branch)
+    assert branch_snapshot is not None
+    main_snapshot = tbl.metadata.snapshot_by_name("main")
+    assert main_snapshot is not None
+    assert branch_snapshot.parent_snapshot_id == main_snapshot.snapshot_id
+
+
+@pytest.mark.integration
+def test_delete_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.test_existing_branch_delete"
+    branch = "existing_branch"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
+
+    assert tbl.metadata.current_snapshot_id is not None
+
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()
+    tbl.delete(delete_filter="int = 9", branch=branch)
+
+    assert len(tbl.scan().use_ref(branch).to_arrow()) == 2
+    assert len(tbl.scan().to_arrow()) == 3
+    branch_snapshot = tbl.metadata.snapshot_by_name(branch)
+    assert branch_snapshot is not None
+    main_snapshot = tbl.metadata.snapshot_by_name("main")
+    assert main_snapshot is not None
+    assert branch_snapshot.parent_snapshot_id == main_snapshot.snapshot_id
+
+
+@pytest.mark.integration
+def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.test_existing_branch_overwrite"
+    branch = "existing_branch"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
+
+    assert tbl.metadata.current_snapshot_id is not None
+
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()
+    tbl.overwrite(arrow_table_with_null, branch=branch)
+
+    assert len(tbl.scan().use_ref(branch).to_arrow()) == 3
+    assert len(tbl.scan().to_arrow()) == 3
+    branch_snapshot = tbl.metadata.snapshot_by_name(branch)
+    assert branch_snapshot is not None and branch_snapshot.parent_snapshot_id is not None
+    delete_snapshot = tbl.metadata.snapshot_by_id(branch_snapshot.parent_snapshot_id)
+    assert delete_snapshot is not None
+    main_snapshot = tbl.metadata.snapshot_by_name("main")
+    assert main_snapshot is not None
+    assert (
+        delete_snapshot.parent_snapshot_id == main_snapshot.snapshot_id
+    )  # Currently overwrite is a delete followed by an append operation

From ee591b476becc77193b8bb28b67d3d237da81131 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Wed, 16 Oct 2024 13:44:38 +0530
Subject: [PATCH 13/21] Added another integration test

---
 tests/integration/test_writes/test_writes.py | 25 ++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 39e7758461..c6b4b5257b 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1522,3 +1522,28 @@ def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with
     assert (
         delete_snapshot.parent_snapshot_id == main_snapshot.snapshot_id
     )  # Currently overwrite is a delete followed by an append operation
+
+
+@pytest.mark.integration
+def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.test_concurrent_branch_operations"
+    branch1 = "existing_branch_1"
+    branch2 = "existing_branch_2"
+
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
+
+    assert tbl.metadata.current_snapshot_id is not None
+
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch1).commit()
+
+    tbl.delete("int = 9", branch=branch1)
+
+    tbl.append(arrow_table_with_null)
+
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch2).commit()
+
+    tbl.overwrite(arrow_table_with_null, branch=branch2)
+
+    assert len(tbl.scan().use_ref(branch1).to_arrow()) == 2
+    assert len(tbl.scan().use_ref(branch2).to_arrow()) == 3
+    assert len(tbl.scan().to_arrow()) == 6

From e81907d296f8fc2f2f9cc8917e67933120506b11 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Thu, 17 Oct 2024 01:59:16 +0530
Subject: [PATCH 14/21] Fixed bug: concurrent same name branch and tag writes

---
 pyiceberg/table/__init__.py        |  3 ++-
 pyiceberg/table/update/__init__.py |  3 +++
 pyiceberg/table/update/snapshot.py |  1 +
 tests/table/test_init.py           | 29 ++++++++++++++++++++++-------
 4 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index d34f875b29..3c14c565d7 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -349,7 +349,7 @@ def set_ref_snapshot(
             ),
         )
 
-        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name),)
+        requirements = (AssertRefSnapshotId(snapshot_id=parent_snapshot_id, ref=ref_name, ref_type=type),)
         return self._apply(updates, requirements)
 
     def _set_ref_snapshot(
@@ -380,6 +380,7 @@ def _set_ref_snapshot(
             AssertRefSnapshotId(
                 snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None,
                 ref=ref_name,
+                ref_type=type,
             ),
         )
 
diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index 03b9855307..8c511b9763 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -592,6 +592,7 @@ class AssertRefSnapshotId(ValidatableTableRequirement):
 
     type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id")
     ref: str = Field(...)
+    ref_type: SnapshotRefType = Field(...)
     snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id")
 
     def validate(self, base_metadata: Optional[TableMetadata]) -> None:
@@ -607,6 +608,8 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None:
                 raise CommitFailedException(
                     f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}"
                 )
+            elif ref_type != self.ref_type:
+                raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} can't be changed to type {self.ref_type}")
         elif self.snapshot_id is not None:
             raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")
 
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index b3ef552c55..638f3d4d7b 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -287,6 +287,7 @@ def _commit(self) -> UpdatesAndRequirements:
                     if self._branch in self._transaction.table_metadata.refs
                     else self._transaction.table_metadata.current_snapshot_id,
                     ref=self._branch,
+                    ref_type=SnapshotRefType.BRANCH,
                 ),
             ),
         )
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 1c4029a292..d0ae5e8da4 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -49,7 +49,7 @@
     _match_deletes_to_data_file,
 )
 from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
-from pyiceberg.table.refs import SnapshotRef
+from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import (
     MetadataLogEntry,
     Operation,
@@ -982,28 +982,43 @@ def test_assert_table_uuid(table_v2: Table) -> None:
 
 def test_assert_ref_snapshot_id(table_v2: Table) -> None:
     base_metadata = table_v2.metadata
-    AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata)
+    AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id, ref_type=SnapshotRefType.BRANCH).validate(
+        base_metadata
+    )
 
     with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
-        AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None)
+        AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(None)
 
     with pytest.raises(
         CommitFailedException,
         match="Requirement failed: branch main was created concurrently",
     ):
-        AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata)
+        AssertRefSnapshotId(ref="main", snapshot_id=None, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
 
     with pytest.raises(
         CommitFailedException,
         match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004",
     ):
-        AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata)
+        AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
+
+    with pytest.raises(
+        CommitFailedException,
+        match="Requirement failed: branch or tag not_exist_branch is missing, expected 1",
+    ):
+        AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
+
+    with pytest.raises(
+        CommitFailedException,
+        match="Requirement failed: branch or tag not_exist_tag is missing, expected 1",
+    ):
+        AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1, ref_type=SnapshotRefType.TAG).validate(base_metadata)
 
+    # existing Tag in metadata: test
     with pytest.raises(
         CommitFailedException,
-        match="Requirement failed: branch or tag not_exist is missing, expected 1",
+        match="Requirement failed: tag test can't be changed to type branch",
     ):
-        AssertRefSnapshotId(ref="not_exist", snapshot_id=1).validate(base_metadata)
+        AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
 
 
 def test_assert_last_assigned_field_id(table_v2: Table) -> None:

From bc6fb6883c1da195d40acded2c9d1684d2444975 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Thu, 14 Nov 2024 06:28:13 +0530
Subject: [PATCH 15/21] Added integration tests with spark

---
 tests/integration/test_writes/test_writes.py | 60 +++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index bdfc5f124f..35e2ccf668 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1648,7 +1648,7 @@ def test_overwrite_to_existing_branch(session_catalog: Catalog, arrow_table_with
 
 @pytest.mark.integration
 def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
-    identifier = "default.test_concurrent_branch_operations"
+    identifier = "default.test_intertwined_branch_operations"
     branch1 = "existing_branch_1"
     branch2 = "existing_branch_2"
 
@@ -1669,3 +1669,61 @@ def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_nu
     assert len(tbl.scan().use_ref(branch1).to_arrow()) == 2
     assert len(tbl.scan().use_ref(branch2).to_arrow()) == 3
     assert len(tbl.scan().to_arrow()) == 6
+
+
+@pytest.mark.integration
+def test_branch_spark_write_py_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None:
+    # Intialize table with branch
+    identifier = "default.test_branch_spark_write_py_read"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
+    branch = "existing_spark_branch"
+
+    # Create branch in Spark
+    spark.sql(f"ALTER TABLE {identifier} CREATE BRANCH {branch}")
+
+    # Spark Write
+    spark.sql(
+        f"""
+            DELETE FROM {identifier}.branch_{branch}
+            WHERE int = 9
+        """
+    )
+
+    # Refresh table to get new refs
+    tbl.refresh()
+
+    # Python Read
+    assert len(tbl.scan().to_arrow()) == 3
+    assert len(tbl.scan().use_ref(branch).to_arrow()) == 2
+
+
+@pytest.mark.integration
+def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None:
+    # Intialize table with branch
+    identifier = "default.test_branch_py_write_spark_read"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
+    branch = "existing_py_branch"
+
+    assert tbl.metadata.current_snapshot_id is not None
+
+    # Create branch
+    tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()
+
+    # Python Write
+    tbl.delete("int = 9", branch=branch)
+
+    # Spark Read
+    main_df = spark.sql(
+        f"""
+            SELECT *
+            FROM {identifier}
+        """
+    )
+    branch_df = spark.sql(
+        f"""
+            SELECT *
+            FROM {identifier}.branch_{branch}
+        """
+    )
+    assert main_df.count() == 3
+    assert branch_df.count() == 2

From 82e65e155aeda7f49395d381546512032e3f6a35 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Mon, 24 Feb 2025 01:47:55 +0530
Subject: [PATCH 16/21] Fixed comments for AssertSnapshotRef

---
 pyiceberg/table/__init__.py                  |  1 -
 pyiceberg/table/update/__init__.py           | 10 ++++++----
 pyiceberg/table/update/snapshot.py           |  1 -
 tests/integration/test_writes/test_writes.py |  4 ++--
 tests/table/test_init.py                     | 20 +++++++++-----------
 5 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 4913ac0baf..a0f420c2eb 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -341,7 +341,6 @@ def _set_ref_snapshot(
             AssertRefSnapshotId(
                 snapshot_id=self.table_metadata.refs[ref_name].snapshot_id if ref_name in self.table_metadata.refs else None,
                 ref=ref_name,
-                ref_type=type,
             ),
         )
 
diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index de54eded3e..9dfe28acb1 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -609,14 +609,16 @@ class AssertRefSnapshotId(ValidatableTableRequirement):
 
     type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id")
     ref: str = Field(...)
-    ref_type: SnapshotRefType = Field(...)
+    # ref_type: SnapshotRefType = Field(...)
     snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id")
 
     def validate(self, base_metadata: Optional[TableMetadata]) -> None:
         if base_metadata is None:
             raise CommitFailedException("Requirement failed: current table metadata is missing")
         elif len(base_metadata.snapshots) == 0 and self.ref != MAIN_BRANCH:
-            raise CommitFailedException(f"Requirement failed: No snapshot available in table for ref: {self.ref}")
+            raise CommitFailedException(
+                f"Requirement failed: Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."
+            )
         elif snapshot_ref := base_metadata.refs.get(self.ref):
             ref_type = snapshot_ref.snapshot_ref_type
             if self.snapshot_id is None:
@@ -625,8 +627,8 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None:
                 raise CommitFailedException(
                     f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}"
                 )
-            elif ref_type != self.ref_type:
-                raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} can't be changed to type {self.ref_type}")
+            elif ref_type == SnapshotRefType.TAG:
+                raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created")
         elif self.snapshot_id is not None:
             raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")
 
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 638f3d4d7b..b3ef552c55 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -287,7 +287,6 @@ def _commit(self) -> UpdatesAndRequirements:
                     if self._branch in self._transaction.table_metadata.refs
                     else self._transaction.table_metadata.current_snapshot_id,
                     ref=self._branch,
-                    ref_type=SnapshotRefType.BRANCH,
                 ),
             ),
         )
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 35e2ccf668..2624fcd6bb 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -44,7 +44,7 @@
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import TableProperties
+from pyiceberg.table import TableProperties, MAIN_BRANCH
 from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
 from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
 from pyiceberg.types import (
@@ -1578,7 +1578,7 @@ def test_abort_table_transaction_on_exception(
 def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.test_non_existing_branch"
     tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [])
-    with pytest.raises(CommitFailedException, match="No snapshot available in table for ref:"):
+    with pytest.raises(CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."):
         tbl.append(arrow_table_with_null, branch="non_existing_branch")
 
 
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index d0ae5e8da4..78b3d31aac 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -49,7 +49,7 @@
     _match_deletes_to_data_file,
 )
 from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
-from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
+from pyiceberg.table.refs import SnapshotRef
 from pyiceberg.table.snapshots import (
     MetadataLogEntry,
     Operation,
@@ -982,43 +982,41 @@ def test_assert_table_uuid(table_v2: Table) -> None:
 
 def test_assert_ref_snapshot_id(table_v2: Table) -> None:
     base_metadata = table_v2.metadata
-    AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id, ref_type=SnapshotRefType.BRANCH).validate(
-        base_metadata
-    )
+    AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata)
 
     with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
-        AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(None)
+        AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None)
 
     with pytest.raises(
         CommitFailedException,
         match="Requirement failed: branch main was created concurrently",
     ):
-        AssertRefSnapshotId(ref="main", snapshot_id=None, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
+        AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata)
 
     with pytest.raises(
         CommitFailedException,
         match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004",
     ):
-        AssertRefSnapshotId(ref="main", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
+        AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata)
 
     with pytest.raises(
         CommitFailedException,
         match="Requirement failed: branch or tag not_exist_branch is missing, expected 1",
     ):
-        AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
+        AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1).validate(base_metadata)
 
     with pytest.raises(
         CommitFailedException,
         match="Requirement failed: branch or tag not_exist_tag is missing, expected 1",
     ):
-        AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1, ref_type=SnapshotRefType.TAG).validate(base_metadata)
+        AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata)
 
     # existing Tag in metadata: test
     with pytest.raises(
         CommitFailedException,
-        match="Requirement failed: tag test can't be changed to type branch",
+        match="Requirement failed: TAG test can't be updated once created",
     ):
-        AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004, ref_type=SnapshotRefType.BRANCH).validate(base_metadata)
+        AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004).validate(base_metadata)
 
 
 def test_assert_last_assigned_field_id(table_v2: Table) -> None:

From 82e5b906206ab466775cbd44e5c2a7be66c922d1 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Mon, 24 Feb 2025 02:08:22 +0530
Subject: [PATCH 17/21] Fixed comments and linter issues

---
 tests/integration/test_writes/test_writes.py | 7 +++++--
 tests/table/test_init.py                     | 6 +++++-
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 2624fcd6bb..0af5b1e8a4 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -44,7 +44,8 @@
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import TableProperties, MAIN_BRANCH
+from pyiceberg.table import TableProperties
+from pyiceberg.table.refs import MAIN_BRANCH
 from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
 from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
 from pyiceberg.types import (
@@ -1578,7 +1579,9 @@ def test_abort_table_transaction_on_exception(
 def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.test_non_existing_branch"
     tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [])
-    with pytest.raises(CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."):
+    with pytest.raises(
+        CommitFailedException, match=f"Table has no snapshots and can only be written to the {MAIN_BRANCH} BRANCH."
+    ):
         tbl.append(arrow_table_with_null, branch="non_existing_branch")
 
 
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 78b3d31aac..7a416ac83c 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -49,7 +49,7 @@
     _match_deletes_to_data_file,
 )
 from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
-from pyiceberg.table.refs import SnapshotRef
+from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import (
     MetadataLogEntry,
     Operation,
@@ -1012,6 +1012,10 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None:
         AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata)
 
     # existing Tag in metadata: test
+    ref_tag = table_v2.refs().get("test")
+    assert ref_tag is not None
+    assert ref_tag.snapshot_ref_type == SnapshotRefType.TAG, "TAG test should be present in table to be tested"
+
     with pytest.raises(
         CommitFailedException,
         match="Requirement failed: TAG test can't be updated once created",

From 84d0971a100e66a6fd8118806498ab21e397dc6c Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Mon, 24 Feb 2025 02:11:55 +0530
Subject: [PATCH 18/21] Fixed comments

---
 pyiceberg/table/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index a0f420c2eb..b90936c532 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -988,7 +988,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
         Args:
             df: The Arrow dataframe that will be appended to overwrite the table
             snapshot_properties: Custom properties to be added to the snapshot summary
-            branch: Branch Reference to run the delete operation
+            branch: Branch Reference to run the append operation
         """
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
@@ -1014,7 +1014,7 @@ def overwrite(
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial overwrite
             snapshot_properties: Custom properties to be added to the snapshot summary
-            branch: Branch Reference to run the delete operation
+            branch: Branch Reference to run the overwrite operation
         """
         with self.transaction() as tx:
             tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)

From 3efe53cc35fe761c2e329599823a8b3accd4b327 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Mon, 24 Feb 2025 02:22:28 +0530
Subject: [PATCH 19/21] Fixed comments

---
 tests/table/test_init.py | 25 +++++++++++--------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 7a416ac83c..42c066ef01 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -49,7 +49,7 @@
     _match_deletes_to_data_file,
 )
 from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadataUtil, TableMetadataV2, _generate_snapshot_id
-from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
+from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import (
     MetadataLogEntry,
     Operation,
@@ -982,34 +982,31 @@ def test_assert_table_uuid(table_v2: Table) -> None:
 
 def test_assert_ref_snapshot_id(table_v2: Table) -> None:
     base_metadata = table_v2.metadata
-    AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata)
+    AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata)
 
     with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
-        AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None)
+        AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=1).validate(None)
 
     with pytest.raises(
         CommitFailedException,
-        match="Requirement failed: branch main was created concurrently",
+        match=f"Requirement failed: branch {MAIN_BRANCH} was created concurrently",
     ):
-        AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata)
+        AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=None).validate(base_metadata)
 
     with pytest.raises(
         CommitFailedException,
-        match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004",
+        match=f"Requirement failed: branch {MAIN_BRANCH} has changed: expected id 1, found 3055729675574597004",
     ):
-        AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata)
+        AssertRefSnapshotId(ref=MAIN_BRANCH, snapshot_id=1).validate(base_metadata)
 
-    with pytest.raises(
-        CommitFailedException,
-        match="Requirement failed: branch or tag not_exist_branch is missing, expected 1",
-    ):
-        AssertRefSnapshotId(ref="not_exist_branch", snapshot_id=1).validate(base_metadata)
+    non_existing_ref = "not_exist_branch_or_tag"
+    assert table_v2.refs().get("not_exist_branch_or_tag") is None
 
     with pytest.raises(
         CommitFailedException,
-        match="Requirement failed: branch or tag not_exist_tag is missing, expected 1",
+        match=f"Requirement failed: branch or tag {non_existing_ref} is missing, expected 1",
     ):
-        AssertRefSnapshotId(ref="not_exist_tag", snapshot_id=1).validate(base_metadata)
+        AssertRefSnapshotId(ref=non_existing_ref, snapshot_id=1).validate(base_metadata)
 
     # existing Tag in metadata: test
     ref_tag = table_v2.refs().get("test")

From dfedc63c04328d89fb61b0dcb5799ad39a94c4ff Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Mon, 24 Feb 2025 05:42:32 +0530
Subject: [PATCH 20/21] Fixed a bug in tests

---
 pyiceberg/table/update/__init__.py | 5 ++---
 tests/table/test_init.py           | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index 9dfe28acb1..671b3b9e94 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -609,7 +609,6 @@ class AssertRefSnapshotId(ValidatableTableRequirement):
 
     type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id")
     ref: str = Field(...)
-    # ref_type: SnapshotRefType = Field(...)
     snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id")
 
     def validate(self, base_metadata: Optional[TableMetadata]) -> None:
@@ -624,11 +623,11 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None:
             if self.snapshot_id is None:
                 raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently")
             elif self.snapshot_id != snapshot_ref.snapshot_id:
+                if ref_type == SnapshotRefType.TAG:
+                    raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created")
                 raise CommitFailedException(
                     f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}"
                 )
-            elif ref_type == SnapshotRefType.TAG:
-                raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created")
         elif self.snapshot_id is not None:
             raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")
 
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 42c066ef01..89e0577ae8 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -1017,7 +1017,7 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None:
         CommitFailedException,
         match="Requirement failed: TAG test can't be updated once created",
     ):
-        AssertRefSnapshotId(ref="test", snapshot_id=3051729675574597004).validate(base_metadata)
+        AssertRefSnapshotId(ref="test", snapshot_id=3055729675574597004).validate(base_metadata)
 
 
 def test_assert_last_assigned_field_id(table_v2: Table) -> None:

From 076a6d5c040ca19cdc58949e7c533418211dc027 Mon Sep 17 00:00:00 2001
From: Vinayak Jaiswal <vinayakjaiswal01@gmail.com>
Date: Mon, 24 Feb 2025 05:55:59 +0530
Subject: [PATCH 21/21] Fixed some more tests

---
 pyiceberg/table/update/__init__.py | 2 --
 tests/table/test_init.py           | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index 671b3b9e94..249de747de 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -623,8 +623,6 @@ def validate(self, base_metadata: Optional[TableMetadata]) -> None:
             if self.snapshot_id is None:
                 raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently")
             elif self.snapshot_id != snapshot_ref.snapshot_id:
-                if ref_type == SnapshotRefType.TAG:
-                    raise CommitFailedException(f"Requirement failed: TAG {self.ref} can't be updated once created")
                 raise CommitFailedException(
                     f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}"
                 )
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 89e0577ae8..15f27aba61 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -1015,7 +1015,7 @@ def test_assert_ref_snapshot_id(table_v2: Table) -> None:
 
     with pytest.raises(
         CommitFailedException,
-        match="Requirement failed: TAG test can't be updated once created",
+        match="Requirement failed: tag test has changed: expected id 3055729675574597004, found 3051729675574597004",
     ):
         AssertRefSnapshotId(ref="test", snapshot_id=3055729675574597004).validate(base_metadata)