In [1]:
import pyspark
from pyspark.sql import SparkSession
import os


conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
    
    # first we will define the packages that we need. Iceberg Spark runtime
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        
    # This property allows us to add any extensions that we want to use
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    
    # configures a new catalog to a particular implementation of SparkCatalog
        .set('spark.sql.catalog.glue', 'org.apache.iceberg.spark.SparkCatalog')
    
    # particular type of catalog we are using
        .set('spark.sql.catalog.glue.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')
    
    # engine writes to the warehouse
        .set('spark.sql.catalog.glue.warehouse', 's3://ids-sample-iceberg-datasets/sampledb/')
    
    # changes IO impl of catalog, mainly for changing writing data to object storage
        .set('spark.sql.catalog.glue.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

Spark Running


In [2]:
# CREATE a new Iceberg table 'employees'
spark.sql(
    """CREATE TABLE IF NOT EXISTS glue.sampledb.employees
            (id BIGINT, name STRING, role STRING, salary double, some_feature STRING) USING iceberg"""
)

DataFrame[]

In [3]:
# INSERT some records
spark.sql("INSERT INTO glue.sampledb.employees values (1, 'Steve', 'Clerk', 22000, 20), (2, 'Gary', 'Sales', 18000, 10)")

DataFrame[]

In [4]:
spark.sql("SELECT * FROM glue.sampledb.employees").toPandas()

Unnamed: 0,id,name,role,salary,some_feature
0,1,Steve,Clerk,22000.0,20
1,2,Gary,Sales,18000.0,10


In [5]:
# CREATE a new BRANCH at this stage
spark.sql("ALTER TABLE glue.sampledb.employees CREATE BRANCH bobs_branch")

DataFrame[]

In [6]:
spark.sql("SELECT * FROM glue.sampledb.employees.refs").toPandas()

Unnamed: 0,name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep,max_snapshot_age_in_ms
0,main,BRANCH,6403931554192988008,,,
1,bobs_branch,BRANCH,6403931554192988008,,,


In [7]:
df = spark.sql("SELECT * FROM glue.sampledb.employees")
df.withColumn('some_feature', df.some_feature * 100).write.format("iceberg").option("branch", "bobs_branch").mode("overwrite").save("glue.sampledb.employees")

# Check the records in the BRANCH after records were updated
spark.sql("SELECT * FROM glue.sampledb.employees VERSION AS OF 'bobs_branch'").toPandas()

Unnamed: 0,id,name,role,salary,some_feature
0,1,Steve,Clerk,22000.0,2000.0
1,2,Gary,Sales,18000.0,1000.0


In [8]:
spark.sql("SELECT * FROM glue.sampledb.employees").toPandas()

Unnamed: 0,id,name,role,salary,some_feature
0,1,Steve,Clerk,22000.0,20
1,2,Gary,Sales,18000.0,10


In [9]:
# DROP the branch 
spark.sql("ALTER TABLE glue.sampledb.employees DROP BRANCH bobs_branch")
spark.sql("SELECT * FROM glue.sampledb.employees.refs").toPandas()

Unnamed: 0,name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep,max_snapshot_age_in_ms
0,main,BRANCH,6403931554192988008,,,


In [32]:
spark.sql("ALTER TABLE glue.sampledb.employees SET TBLPROPERTIES ('write.wap.enabled'='true')")

DataFrame[]

In [33]:
spark.sql("SET spark.wap.branch = bobs_branch")

DataFrame[key: string, value: string]

In [36]:
spark.sql("SET spark.wap.branch = bobs_branch; CALL glue.system.fast_forward('sampledb.employees', 'main', 'bobs_branch');")

ParseException: 
 Expected format is 'SET', 'SET key', or 'SET key=value'. If you want to include special characters in key, or include semicolon in value, please use quotes, e.g., SET `ke y`=`v;alue`.        (line 1, pos 0)

== SQL ==
SET spark.wap.branch = bobs_branch; CALL glue.system.fast_forward('sampledb.employees', 'main', 'bobs_branch');
^^^


In [28]:

spark.sql("CALL glue.system.rollback_to_snapshot('sampledb.employees', 3716474338610219678)")

Py4JJavaError: An error occurred while calling o39.sql.
: org.apache.iceberg.exceptions.ValidationException: Cannot roll back to snapshot, not an ancestor of the current state: 3716474338610219678
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.iceberg.SetSnapshotOperation.rollbackTo(SetSnapshotOperation.java:84)
	at org.apache.iceberg.SnapshotManager.rollbackTo(SnapshotManager.java:67)
	at org.apache.iceberg.spark.procedures.RollbackToSnapshotProcedure.lambda$call$0(RollbackToSnapshotProcedure.java:88)
	at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:104)
	at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:85)
	at org.apache.iceberg.spark.procedures.RollbackToSnapshotProcedure.call(RollbackToSnapshotProcedure.java:83)
	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
	at jdk.internal.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
