In [25]:
# Define the AWS env variables if you are using AWS Auth:
%env AWS_REGION= region
%env AWS_ACCESS_KEY_ID= secret
%env AWS_SECRET_ACCESS_KEY= key

env: AWS_REGION=region
env: AWS_ACCESS_KEY_ID=secret
env: AWS_SECRET_ACCESS_KEY=key


In [2]:
import pyspark
from pyspark.sql import SparkSession
import os


conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
    
    # first we will define the packages that we need. Iceberg Spark runtime
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        
    # This property allows us to add any extensions that we want to use
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    
    # configures a new catalog to a particular implementation of SparkCatalog
        .set('spark.sql.catalog.glue', 'org.apache.iceberg.spark.SparkCatalog')
    
    # particular type of catalog we are using
        .set('spark.sql.catalog.glue.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')
    
    # engine writes to the warehouse
        .set('spark.sql.catalog.glue.warehouse', 's3://bucket/warehouse/')
    
    # changes IO impl of catalog, mainly for changing writing data to object storage
        .set('spark.sql.catalog.glue.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c16a9c54-1c5f-4638-b9ec-98dfe4333ea0;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.2.0 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found software.amazon.awssdk#annotations;2.17.178 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found software.amazon.awssdk#http-client-spi;2.17.178 in central
	found software

23/03/27 01:04:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/27 01:05:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark Running


In [3]:
# CREATE a new Iceberg table 'employees'
spark.sql(
    """CREATE TABLE IF NOT EXISTS glue.test.employees
            (id BIGINT, name STRING, role STRING, salary double) USING iceberg"""
)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


DataFrame[]

In [4]:
# INSERT some records
spark.sql("INSERT INTO glue.test.employees values (1, 'Harry', 'Software Engineer', 25000), (2, 'John', 'Marketing Ops', 17000)")

                                                                                

DataFrame[]

In [6]:
# CREATE a TAG to save this version of the table
spark.sql("ALTER TABLE glue.test.employees CREATE TAG june_data RETAIN 10 DAYS")

23/03/27 01:14:47 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up


DataFrame[]

In [7]:
# INSERT some new records to the main table
spark.sql("INSERT INTO glue.test.employees values (3, 'Alan', 'Product Manager', 23000), (4, 'Mary', 'Designer', 20000)")

                                                                                

DataFrame[]

In [9]:
# SELECT data based on the tag
spark.sql("SELECT * FROM glue.test.employees VERSION AS OF 'june_data'").toPandas()

                                                                                

Unnamed: 0,id,name,role,salary
0,1,Harry,Software Engineer,25000.0
1,2,John,Marketing Ops,17000.0


In [12]:
# SELECT data from the main table
spark.sql("SELECT * FROM glue.test.employees").toPandas()

                                                                                

Unnamed: 0,id,name,role,salary
0,3,Alan,Product Manager,23000.0
1,4,Mary,Designer,20000.0
2,1,Harry,Software Engineer,25000.0
3,2,John,Marketing Ops,17000.0


In [16]:
# CREATE a new BRANCH at this stage
spark.sql("ALTER TABLE glue.test.employees CREATE BRANCH ML_exp")

23/03/27 16:28:30 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up


DataFrame[]

In [19]:
# Check the records in this new branch
spark.sql("SELECT * FROM glue.test.employees VERSION AS OF 'ML_exp'").toPandas()

                                                                                

Unnamed: 0,id,name,role,salary
0,3,Alan,Product Manager,23000.0
1,4,Mary,Designer,20000.0
2,1,Harry,Software Engineer,25000.0
3,2,John,Marketing Ops,17000.0


In [13]:
# INSERT some new records as part of the experiment
schema = spark.table("glue.test.employees").schema
data = [
    (6, "Troy", "CMO", 30000.0),
    (7, "Raine", "UX", 21000.0),
    (8, "Harry", "QA", 22000.0)
  ]
df = spark.createDataFrame(data, schema)

In [14]:
df.toPandas()

                                                                                

Unnamed: 0,id,name,role,salary
0,6,Troy,CMO,30000.0
1,7,Raine,UX,21000.0
2,8,Harry,QA,22000.0


In [15]:
df.write.format("iceberg").option("branch", "ML_exp").mode("append").save("glue.test.employees")

                                                                                

In [21]:
# Check the records in the BRANCH after new records were ingested
spark.sql("SELECT * FROM glue.test.employees VERSION AS OF 'ML_exp'").toPandas()

                                                                                

Unnamed: 0,id,name,role,salary
0,6,Troy,CMO,30000.0
1,7,Raine,UX,21000.0
2,8,Harry,QA,22000.0
3,3,Alan,Product Manager,23000.0
4,1,Harry,Software Engineer,25000.0
5,2,John,Marketing Ops,17000.0
6,4,Mary,Designer,20000.0


In [22]:
# Check the main table to ensure there are no IMPACTS
spark.sql("SELECT * FROM glue.test.employees").toPandas()

                                                                                

Unnamed: 0,id,name,role,salary
0,3,Alan,Product Manager,23000.0
1,1,Harry,Software Engineer,25000.0
2,2,John,Marketing Ops,17000.0
3,4,Mary,Designer,20000.0


In [24]:
# Metadata table to check all branches & tags in this table
spark.sql("SELECT * FROM glue.test.employees.refs").toPandas()

Unnamed: 0,name,type,snapshot_id,max_reference_age_in_ms,min_snapshots_to_keep,max_snapshot_age_in_ms
0,ML_new,BRANCH,3445466716202011002,,,
1,main,BRANCH,3445466716202011002,,,
2,ML_exp,BRANCH,7952319340804434585,,,
3,june_data,TAG,3107412191213040005,864000000.0,,


In [None]:
# DROP the branch if you want to discard
spark.sql("ALTER TABLE glue.test.employees DROP BRANCH ML_exp")