# Iceberg Lab 
## Unit 7: Snapshot Management

In the previous unit, we -
1. Learned how to "Time Travel" in Iceberg Tables

In this unit, we will-
1. Learn about managing table snapshots 


### 1. Imports

In [1]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

import warnings
warnings.filterwarnings('ignore')

### 2. Create a Spark session powered by Cloud Dataproc 

In [2]:
spark = SparkSession.builder.appName('Loan Analysis').getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark

24/05/13 17:12:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### 3. Declare variables

In [3]:
project_id_output = !gcloud config list --format "value(core.project)" 2>/dev/null
PROJECT_ID = project_id_output[0]
print("PROJECT_ID: ", PROJECT_ID)

PROJECT_ID:  delta-lake-diy-lab


In [4]:
project_name_output = !gcloud projects describe $PROJECT_ID | grep name | cut -d':' -f2 | xargs
PROJECT_NAME = project_name_output[0]
print("PROJECT_NAME: ", PROJECT_NAME)

PROJECT_NAME:  delta-lake-diy-lab


In [5]:
project_number_output = !gcloud projects describe $PROJECT_ID | grep projectNumber | cut -d':' -f2 | xargs
PROJECT_NUMBER = project_number_output[0]
print("PROJECT_NUMBER: ", PROJECT_NUMBER)

PROJECT_NUMBER:  11002190840


In [24]:
DPMS_NAME=f"dll-hms-{PROJECT_NUMBER}"
LOCATION="us-central1"

metastore_dir = !gcloud metastore services describe $DPMS_NAME --location $LOCATION |grep 'hive.metastore.warehouse.dir'| cut -d':' -f2- | xargs 
HIVE_METASTORE_WAREHOUSE_DIR = metastore_dir[0]
print("HIVE_METASTORE_WAREHOUSE_DIR",HIVE_METASTORE_WAREHOUSE_DIR)

HIVE_METASTORE_WAREHOUSE_DIR gs://gcs-bucket-dll-hms-11002190840-e1664035-a2d9-4215-be46-45c40712/hive-warehouse


In [7]:
TABLE_NAME="loans_by_state_iceberg"
DB_NAME="loan_db"

#fully qualified table name
FQTN=f"{DB_NAME}.{TABLE_NAME}"

print("Fully quailified table name :",FQTN)

Fully quailified table name : loan_db.loans_by_state_iceberg


### 4. Snapshot Management

In [8]:
#Listing currently available snapshots for the table

spark.table("loan_db.loans_by_state_iceberg.snapshots").show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-05-13 16:40:...|9176687385630465169|               null|   append|gs://gcs-bucket-d...|{spark.app.id -> ...|
|2024-05-13 16:48:...|5233765456160638845|9176687385630465169|overwrite|gs://gcs-bucket-d...|{spark.app.id -> ...|
|2024-05-13 16:50:...|7272046516454809414|5233765456160638845|   append|gs://gcs-bucket-d...|{spark.app.id -> ...|
|2024-05-13 16:51:...|6697245229602575300|7272046516454809414|overwrite|gs://gcs-bucket-d...|{spark.app.id -> ...|
|2024-05-13 16:52:...|1795098683837972174|6697245229602575300|overwrite|gs://gcs-bucket-d...|{spark.app.id -> ...|
|2024-05-13 16:54:...|4282559450411521188|1795098683837972174|overwrite|gs://gcs

                                                                                

In [10]:
#Checking the Table state with few sample records

#Fetch current values for 4 states
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)

+----------+----------+
|addr_state|loan_count|
+----------+----------+
|AZ        |11111     |
|CA        |11111     |
|IA        |11111     |
|IN        |11111     |
+----------+----------+



#### a. rollback_to_snapshot

In [11]:
#Fetch the 5th snapshot update for this example
ROLLBACK_SNAPSHOT_ID = spark.sql(f"SELECT snapshot_id FROM \
(SELECT snapshot_id, ROW_NUMBER() OVER(ORDER BY committed_at ASC) rownum FROM {FQTN}.snapshots) \
a where a.rownum =2").collect()[0][0]

print("ROLLBACK_SNAPSHOT_ID ", ROLLBACK_SNAPSHOT_ID)

24/05/13 17:12:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 17:12:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 17:12:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 3:>                                                          (0 + 1) / 1]

ROLLBACK_SNAPSHOT_ID  5233765456160638845


                                                                                

In [12]:
#build snapshot rollback statement

SNPSHT_RLBK_STMNT = f"CALL spark_catalog.system.rollback_to_snapshot('{FQTN}',{ROLLBACK_SNAPSHOT_ID})"
print(SNPSHT_RLBK_STMNT)

spark.sql(f"{SNPSHT_RLBK_STMNT}").show()

CALL spark_catalog.system.rollback_to_snapshot('loan_db.loans_by_state_iceberg',5233765456160638845)
+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 4282559450411521188|5233765456160638845|
+--------------------+-------------------+



In [13]:
#Table current state has been updated to an older snaphsot version
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)


+----------+----------+
|addr_state|loan_count|
+----------+----------+
|CA        |62090     |
|IN        |7511      |
|IA        |1         |
+----------+----------+



#### b. rollback_to_timestamp

In [14]:
#Fetch a specific timestamp to revert table state to

timestamp_val = spark.sql("select committed_at from (SELECT committed_at, ROW_NUMBER() OVER(ORDER BY committed_at ASC) rownum from loan_db.loans_by_state_iceberg.snapshots) a where a.rownum =3").collect()[0][0]
print("Rolling back to timestamp", timestamp_val)

24/05/13 17:13:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 17:13:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 17:13:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Rolling back to timestamp 2024-05-13 16:50:19.235000


In [16]:
tmstmp_rlbk_stmnt = f"CALL spark_catalog.system.rollback_to_timestamp('loan_db.loans_by_state_iceberg',TIMESTAMP '{timestamp_val}')"
print(tmstmp_rlbk_stmnt)

spark.sql(tmstmp_rlbk_stmnt).show()

CALL spark_catalog.system.rollback_to_timestamp('loan_db.loans_by_state_iceberg',TIMESTAMP '2024-05-13 16:50:19.235000')
+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 5233765456160638845|5233765456160638845|
+--------------------+-------------------+



In [17]:
#Table current state
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)

+----------+----------+
|addr_state|loan_count|
+----------+----------+
|CA        |62090     |
|IN        |7511      |
|IA        |1         |
+----------+----------+



**NOTE:**


Both rollback_to_snapshot and rollback_to_timestamp can only switch the snapshot if the updating snapshot_id is an ancestor of the current snapshot (older than current_snapshot). 


In [18]:
#Fetching a younger snapshot which is not an ancestor of the current snapshot

newer_snpsht_id = spark.sql("select snapshot_id from (SELECT snapshot_id, ROW_NUMBER() OVER(ORDER BY committed_at DESC) rownum from loan_db.loans_by_state_iceberg.snapshots) a where a.rownum =1").collect()[0][0]
print("Rolling back to snapshot id ", newer_snpsht_id)

Rolling back to snapshot id  4282559450411521188


24/05/13 17:13:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 17:13:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/13 17:13:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [19]:
snpsht_rlbk_stmmt = f"CALL spark_catalog.system.rollback_to_snapshot('loan_db.loans_by_state_iceberg',{newer_snpsht_id})"
print(snpsht_rlbk_stmmt)

spark.sql(snpsht_rlbk_stmmt).show()

CALL spark_catalog.system.rollback_to_snapshot('loan_db.loans_by_state_iceberg',4282559450411521188)


Py4JJavaError: An error occurred while calling o88.sql.
: org.apache.iceberg.exceptions.ValidationException: Cannot roll back to snapshot, not an ancestor of the current state: 4282559450411521188
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.iceberg.SetSnapshotOperation.rollbackTo(SetSnapshotOperation.java:84)
	at org.apache.iceberg.SnapshotManager.rollbackTo(SnapshotManager.java:67)
	at org.apache.iceberg.spark.procedures.RollbackToSnapshotProcedure.lambda$call$0(RollbackToSnapshotProcedure.java:88)
	at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
	at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:88)
	at org.apache.iceberg.spark.procedures.RollbackToSnapshotProcedure.call(RollbackToSnapshotProcedure.java:83)
	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)


As seen above we get the error _"Cannot roll back to snapshot, not an ancestor of the current state"_

We can workaround this problem using another procedure called **_set_current_snapshot_**

#### c. set_current_snapshot

In [20]:
set_snpsht_stmnt = f"CALL spark_catalog.system.set_current_snapshot('loan_db.loans_by_state_iceberg',{newer_snpsht_id})"
print(set_snpsht_stmnt)

spark.sql(set_snpsht_stmnt).show(truncate=False)

CALL spark_catalog.system.set_current_snapshot('loan_db.loans_by_state_iceberg',4282559450411521188)
+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
|5233765456160638845 |4282559450411521188|
+--------------------+-------------------+



In [21]:
#Table current state modified to newer_snapshot_id
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)


[Stage 12:>                                                         (0 + 1) / 1]

+----------+----------+
|addr_state|loan_count|
+----------+----------+
|AZ        |11111     |
|CA        |11111     |
|IA        |11111     |
|IN        |11111     |
+----------+----------+



                                                                                

In [25]:
!gsutil ls -r  {HIVE_METASTORE_WAREHOUSE_DIR}/loan_db.db/{TABLE_NAME}/

gs://gcs-bucket-dll-hms-11002190840-e1664035-a2d9-4215-be46-45c40712/hive-warehouse/loan_db.db/loans_by_state_iceberg/:

gs://gcs-bucket-dll-hms-11002190840-e1664035-a2d9-4215-be46-45c40712/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/:
gs://gcs-bucket-dll-hms-11002190840-e1664035-a2d9-4215-be46-45c40712/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-2-525c32e9-c437-4d4c-a7a0-c6fac91bba85-0-00001.parquet
gs://gcs-bucket-dll-hms-11002190840-e1664035-a2d9-4215-be46-45c40712/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-204-10f4d0c1-80ac-4e51-acf9-b918ef057a8d-0-00001.parquet
gs://gcs-bucket-dll-hms-11002190840-e1664035-a2d9-4215-be46-45c40712/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-409-0e90780c-f8a8-40ff-a14d-6defc2157af7-0-00001.parquet
gs://gcs-bucket-dll-hms-11002190840-e1664035-a2d9-4215-be46-45c40712/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-441-eb0fbb23-c499-40cf-9192-c8bc95b48657-0-00001.parquet
gs://gcs-buc

### THIS CONCLUDES THIS LAB. PROCEED TO THE NEXT NOTEBOOK.