## Reference Material

[EMR Iceberg Documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg.html)


## Configuration

Before running the cells below, confirm that the EMR cluster attached to this notebook was created with the following classification in its software settings:

```[{"classification":"iceberg-defaults","properties":{"iceberg.enabled":true}}]```

Update the S3 path in the ```"spark.sql.catalog.dev.warehouse"``` entry of the configuration below so that it points to your own bucket.

In [None]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.type":"hadoop",
    "spark.sql.catalog.dev.warehouse":"s3://emr-studio-demo-s3bucket-g3frvpeeuanh/iceberg/"
    }
}

Set a variable to the name of the S3 bucket to read from and write to.

In [2]:
s3_bucket_name = "emr-studio-demo-s3bucket-g3frvpeeuanh"


Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 2 | application_1666181830864_0003 | pyspark | idle | Link | Link | | ✔ |



SparkSession available as 'spark'.


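The configuration cell above hard-codes the warehouse path, so the bucket name appears in two places. As a minimal sketch, the same settings could be derived from the bucket name instead. The helper name `iceberg_catalog_conf` and its `catalog` and `prefix` parameters are hypothetical, introduced only for illustration; the property keys and values are the ones used in the configuration cell.

```python
import json


def iceberg_catalog_conf(bucket: str, catalog: str = "dev", prefix: str = "iceberg") -> dict:
    """Return the Spark settings for a Hadoop-type Iceberg catalog stored in S3.

    Hypothetical helper: it only assembles the same keys shown in the
    %%configure cell, with the warehouse path built from the bucket name.
    """
    key = f"spark.sql.catalog.{catalog}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        key: "org.apache.iceberg.spark.SparkCatalog",
        f"{key}.type": "hadoop",
        f"{key}.warehouse": f"s3://{bucket}/{prefix}/",
    }


conf = iceberg_catalog_conf("emr-studio-demo-s3bucket-g3frvpeeuanh")

# The printed JSON has the same shape as the body of the %%configure -f cell.
print(json.dumps({"conf": conf}, indent=4))
```

Because `%%configure` is processed before the Spark session starts, the printed JSON would still need to be pasted into the configuration cell by hand; the sketch only helps keep the path and the bucket name consistent.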

In [3]:
# Import only the types used to build the DataFrame schemas below
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

from datetime import datetime


## Jupyter SQL Cell Magic

The primary interface to Iceberg is Spark SQL. When executing Spark SQL in a Jupyter notebook, add ```%%sql``` as the first line of a cell and write SQL in the rest of the cell. See the example below.

In [4]:
%%sql

SHOW DATABASES


If you do not want to use the ```%%sql``` magic, you can also issue SQL commands through PySpark with ```spark.sql()```. See the example below.

In [5]:
spark.sql(" SHOW DATABASES ").show()


+---------+
|namespace|
+---------+
|  default|
+---------+

The rest of this notebook uses a mix of PySpark and ```%%sql```.

## Create an Iceberg Table

Drop the table if necessary

In [6]:
# spark.sql(" DROP TABLE mergeTable ")


Create an Iceberg table

In [7]:
%%sql

CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (
    id string,
    name string,
    create_date string,
    last_update_time string
)


List the databases in the ```dev``` catalog

In [8]:
%%sql

SHOW DATABASES IN dev


List the tables in the ```dev.db``` database

In [9]:
%%sql

SHOW TABLES IN dev.db


## Create a DataFrame

In [10]:
data = [
        (1, "Chris", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (2, "Will", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (3, "Emma", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (4, "John", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (5, "Eric", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (6, "Adam", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False)    
])

inputDF = spark.createDataFrame(data=data,schema=schema)

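The table created earlier declares every column as ```string```, while the DataFrame above uses an integer ```id``` and a timestamp ```last_update_time```. Whether Spark casts these values on write or rejects the append depends on the session settings, so it can be worth comparing the two schemas first. The sketch below does that comparison in plain Python. The function name `schema_mismatches` is hypothetical, and the type lists are written out by hand from the cells above rather than read from a live session.

```python
def schema_mismatches(df_dtypes, table_dtypes):
    """Return {column: (dataframe_type, table_type)} for columns whose types differ."""
    table_types = dict(table_dtypes)
    return {
        name: (df_type, table_types.get(name))
        for name, df_type in df_dtypes
        if table_types.get(name) != df_type
    }


# Written out by hand from the cells above. On a live session the same
# (name, type) pairs could be read with inputDF.dtypes and
# spark.table("dev.db.iceberg_table").dtypes.
df_dtypes = [
    ("id", "int"),
    ("name", "string"),
    ("create_date", "string"),
    ("last_update_time", "timestamp"),
]
table_dtypes = [
    ("id", "string"),
    ("name", "string"),
    ("create_date", "string"),
    ("last_update_time", "string"),
]

mismatches = schema_mismatches(df_dtypes, table_dtypes)
print(mismatches)
```

Here the comparison reports ```id``` and ```last_update_time``` as the two columns whose types differ between the DataFrame and the table definition.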

## Write Data to Iceberg Table

In [11]:
inputDF.writeTo("dev.db.iceberg_table").append()


## Read Data from Iceberg Table

In [12]:
%%sql

SELECT * FROM dev.db.iceberg_table


## Update and Delete Data in the Iceberg Table

In [13]:
data = [
        (1, "Christopher", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "update"),
        (3, "Emmeline", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "update"),
        (5, "Eric", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "delete"),
        (7, "Bill", "2020-01-02", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "append")
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False),
        StructField("change_type", StringType(), False)
])

mergeDF = spark.createDataFrame(data=data,schema=schema)

mergeDF.createOrReplaceTempView("mergeTable")


In [14]:
# spark.sql(" DROP TABLE mergeTable ")


In [15]:
spark.sql(""" 
    MERGE INTO 
        dev.db.iceberg_table t 
    USING 
        (SELECT * FROM mergeTable) s 
    ON 
        t.id = s.id
    WHEN MATCHED AND s.change_type = 'update' THEN UPDATE SET t.name = s.name, t.last_update_time = s.last_update_time 
    WHEN MATCHED AND s.change_type = 'delete' THEN DELETE
""")


DataFrame[]
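To make the effect of the ```MERGE INTO``` statement easier to follow, the sketch below replays its two ```WHEN MATCHED``` clauses on in-memory rows. It is a plain-Python model of the semantics under simplifying assumptions (integer ids, only the ```name``` and ```last_update_time``` columns), not how Iceberg executes a merge. The function name `apply_merge` is hypothetical.

```python
def apply_merge(target, changes):
    """Apply the two WHEN MATCHED clauses of the MERGE statement to in-memory rows.

    target:  {id: {"name": ..., "last_update_time": ...}}
    changes: list of dicts with id, name, last_update_time, change_type
    """
    result = {row_id: dict(row) for row_id, row in target.items()}
    for change in changes:
        row_id = change["id"]
        if row_id not in result:
            # The statement has no WHEN NOT MATCHED clause,
            # so source rows without a matching id are ignored.
            continue
        if change["change_type"] == "update":
            result[row_id]["name"] = change["name"]
            result[row_id]["last_update_time"] = change["last_update_time"]
        elif change["change_type"] == "delete":
            del result[row_id]
    return result


names = ["Chris", "Will", "Emma", "John", "Eric", "Adam"]
target = {
    i: {"name": name, "last_update_time": "2020-01-01 00:00:00"}
    for i, name in enumerate(names, start=1)
}
changes = [
    {"id": 1, "name": "Christopher", "last_update_time": "2020-01-02 00:00:00", "change_type": "update"},
    {"id": 3, "name": "Emmeline", "last_update_time": "2020-01-02 00:00:00", "change_type": "update"},
    {"id": 5, "name": "Eric", "last_update_time": "2020-01-02 00:00:00", "change_type": "delete"},
    {"id": 7, "name": "Bill", "last_update_time": "2020-01-02 00:00:00", "change_type": "append"},
]

merged = apply_merge(target, changes)
print(sorted(merged))                         # [1, 2, 3, 4, 6]
print([merged[i]["name"] for i in sorted(merged)])
```

Rows 1 and 3 are renamed, row 5 is removed, and the row with id 7 is not inserted because the statement only has ```WHEN MATCHED``` clauses. ```MERGE INTO``` also supports a ```WHEN NOT MATCHED ... THEN INSERT``` clause, which this notebook does not use.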

In [16]:
%%sql

SELECT * FROM dev.db.iceberg_table ORDER BY id


## Snapshots

In [17]:
%%sql

SELECT * FROM dev.db.iceberg_table.snapshots

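Each row of the ```snapshots``` metadata table describes one commit, including its ```committed_at``` time and ```snapshot_id```. As a rough sketch, the snapshot that was current at a given moment can be picked out of those rows as shown below. The rows and the function name `snapshot_as_of` are hypothetical and chosen only for illustration; they are not the output of this notebook.

```python
from datetime import datetime


def snapshot_as_of(snapshots, as_of):
    """Return the id of the latest snapshot committed at or before `as_of`."""
    eligible = [s for s in snapshots if s["committed_at"] <= as_of]
    if not eligible:
        raise ValueError(f"no snapshot committed at or before {as_of}")
    return max(eligible, key=lambda s: s["committed_at"])["snapshot_id"]


# Hypothetical rows for illustration. On a live session they could come from
# [r.asDict() for r in spark.sql("SELECT * FROM dev.db.iceberg_table.snapshots").collect()]
snapshots = [
    {"committed_at": datetime(2022, 10, 19, 12, 57, 50), "snapshot_id": 101, "operation": "append"},
    {"committed_at": datetime(2022, 10, 19, 12, 58, 30), "snapshot_id": 102, "operation": "overwrite"},
    {"committed_at": datetime(2022, 10, 19, 12, 59, 10), "snapshot_id": 103, "operation": "append"},
]

print(snapshot_as_of(snapshots, datetime(2022, 10, 19, 12, 58, 45)))  # 102
```

A timestamp earlier than the first commit has no matching snapshot, which the sketch reports as a ```ValueError```.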

In [18]:
# Add a new record
data = [
        (7, "Bill", "2020-01-02", datetime.strptime('2020-01-03 00:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False)    
])

appendDF = spark.createDataFrame(data=data,schema=schema)

appendDF.writeTo("dev.db.iceberg_table").append()


## Time Travel

Query the current table as a point of comparison

In [19]:
%%sql

SELECT * FROM dev.db.iceberg_table


In [21]:
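%%sql

-- Replace the timestamp with a time taken from the committed_at column of your own snapshots table
SELECT * FROM dev.db.iceberg_table TIMESTAMP AS OF '2022-10-19 12:57:54.19'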


In [22]:
%%sql

-- Replace the number with a snapshot_id from your own snapshots table
SELECT * FROM dev.db.iceberg_table VERSION AS OF 5971921301147194984

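The timestamp and snapshot id in the two time travel queries above are specific to the run that produced this notebook. A small sketch of building the same statements from values looked up at run time is shown below. The helper names `version_as_of_sql` and `timestamp_as_of_sql` are hypothetical; the SQL they produce follows the ```VERSION AS OF``` and ```TIMESTAMP AS OF``` forms used in the cells above.

```python
from datetime import datetime

TABLE = "dev.db.iceberg_table"


def version_as_of_sql(table: str, snapshot_id: int) -> str:
    """Build a query that reads the table at a specific snapshot id."""
    # int() rejects anything that is not a whole number before it reaches the SQL text.
    return f"SELECT * FROM {table} VERSION AS OF {int(snapshot_id)}"


def timestamp_as_of_sql(table: str, as_of: datetime) -> str:
    """Build a query that reads the table as it was at `as_of` (second precision)."""
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{as_of:%Y-%m-%d %H:%M:%S}'"


print(version_as_of_sql(TABLE, 5971921301147194984))
print(timestamp_as_of_sql(TABLE, datetime(2022, 10, 19, 12, 57, 54)))
```

On a live session, either string could be passed to ```spark.sql(...)``` and displayed with ```.show()```, in the same way as the ```SHOW DATABASES``` example earlier in the notebook.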