## Reference Material

[EMR Iceberg Documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg.html)


## Configuration

Before running the cells below, confirm that the EMR cluster attached to this notebook was created with the following in its software settings:

```json
[
  {
    "classification": "iceberg-defaults",
    "properties": {"iceberg.enabled": true}
  }
]
```
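
If you create the cluster programmatically rather than in the console, the same classification goes in the `Configurations` list passed to boto3's EMR `run_job_flow`. The sketch below assumes boto3 and assumes the rest of the cluster definition (release label, instances, roles) is supplied elsewhere. `iceberg_configurations` is a hypothetical helper. The EMR API types `Properties` as a string-to-string map, so the value here is the string `"true"`.

```python
import json


def iceberg_configurations() -> list:
    """Hypothetical helper: EMR configuration list that enables Iceberg on a cluster."""
    return [
        {
            "Classification": "iceberg-defaults",
            # EMR configuration property values are strings, hence "true" rather than True
            "Properties": {"iceberg.enabled": "true"},
        }
    ]


print(json.dumps(iceberg_configurations(), indent=2))

# With boto3 (not run here), pass it alongside the other required arguments:
# boto3.client("emr").run_job_flow(..., Configurations=iceberg_configurations())
```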

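Update the S3 path in the `spark.sql.catalog.dev.warehouse` property of the configuration below. The `-f` flag drops any existing Spark session and starts a new one so that the new settings take effect.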

In [1]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.type":"hadoop",
    "spark.sql.catalog.dev.warehouse":"s3://<<update-s3-bucket>>/iceberg/"
    }
}
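
The `%%configure` body is plain JSON. If you switch between buckets often, you can generate it instead of editing it by hand. The sketch below uses a hypothetical helper, `iceberg_session_conf`. It reproduces the four properties above and only substitutes the bucket name and catalog name. `my-example-bucket` is a placeholder.

```python
import json


def iceberg_session_conf(bucket: str, catalog: str = "dev") -> dict:
    """Hypothetical helper: build the %%configure body for an Iceberg Hadoop catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "conf": {
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            prefix: "org.apache.iceberg.spark.SparkCatalog",
            f"{prefix}.type": "hadoop",
            f"{prefix}.warehouse": f"s3://{bucket}/iceberg/",
        }
    }


# Paste the printed JSON after "%%configure -f" in a notebook cell
print(json.dumps(iceberg_session_conf("my-example-bucket"), indent=4))
```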

Set a variable to the name of the S3 bucket to read from and write to. Use the same bucket as in the warehouse path above.

In [2]:
s3_bucket_name = "<<update-s3-bucket>>"
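
A Hadoop catalog places each table under `<warehouse>/<namespace>/<table>`. This means `dev.db.iceberg_table` lives at `s3://<bucket>/iceberg/db/iceberg_table`, which is the path the CREATE TABLE cell builds from `s3_bucket_name`. The small sketch below shows that mapping. `hadoop_table_location` is a hypothetical helper. The trailing-slash handling is an assumption, chosen so that the result matches the path used in this notebook.

```python
def hadoop_table_location(warehouse: str, namespace: str, table: str) -> str:
    """Hypothetical helper: default table path in an Iceberg Hadoop catalog."""
    # Drop a trailing slash so "s3://b/iceberg/" and "s3://b/iceberg" give the same result
    base = warehouse.rstrip("/")
    # Each namespace level becomes a directory, followed by the table name
    return "/".join([base, *namespace.split("."), table])


print(hadoop_table_location("s3://my-example-bucket/iceberg/", "db", "iceberg_table"))
```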

SparkSession available as 'spark'.

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime

## Create an Iceberg Table

In [4]:
# Uncomment to drop the table if you need to start over
# spark.sql(" DROP TABLE dev.db.iceberg_table ")

In [4]:
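# Create an Iceberg table; column types match the DataFrame schema used below.
# The Hadoop catalog expects LOCATION to equal its default path <warehouse>/db/iceberg_table.
spark.sql(f" CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id int, name string, create_date string, last_update_time timestamp) USING iceberg LOCATION 's3://{s3_bucket_name}/iceberg/db/iceberg_table' ")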
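
Column types matter for queries such as `ORDER BY id` later in this notebook. A `string` column sorts as text, not as numbers, and Spark compares strings lexicographically. Plain Python string sorting shows the same effect. The ids in the sketch are illustrative: this notebook only uses single-digit ids, and for those both orders agree.

```python
# Illustrative ids; the notebook's own ids are 1-7, where both orders coincide
ids = [1, 2, 10, 3]

numeric_order = sorted(ids)
string_order = sorted(str(i) for i in ids)

print(numeric_order)  # [1, 2, 3, 10]
print(string_order)   # ['1', '10', '2', '3']
```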

In [5]:
# List the tables in the dev.db namespace
spark.sql(" SHOW TABLES IN dev.db ").show()

+---------+-------------+
|namespace|    tableName|
+---------+-------------+
|       db|iceberg_table|
+---------+-------------+

## Create a DataFrame

In [6]:
data = [
        (1, "Chris", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (2, "Will", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (3, "Emma", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (4, "John", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (5, "Eric", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (6, "Adam", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False)    
])

inputDF = spark.createDataFrame(data=data,schema=schema)
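
The six rows above differ only in `id` and `name`, so you can also generate them from a list of names. The sketch below is plain Python. `datetime(2020, 1, 1)` produces the same value as the `strptime` call used above. The resulting list has the same shape as `data`, so it can be passed to `spark.createDataFrame(data=data, schema=schema)` unchanged.

```python
from datetime import datetime

names = ["Chris", "Will", "Emma", "John", "Eric", "Adam"]
created = "2020-01-01"
updated = datetime(2020, 1, 1)  # same value as strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')

# (id, name, create_date, last_update_time), with ids starting at 1
data = [(i, name, created, updated) for i, name in enumerate(names, start=1)]

print(data[0])
# (1, 'Chris', '2020-01-01', datetime.datetime(2020, 1, 1, 0, 0))
```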

## Write Data to Iceberg Table

In [7]:
inputDF.writeTo("dev.db.iceberg_table").append()

## Read Data from Iceberg Table

In [8]:
spark.sql(" SELECT * FROM dev.db.iceberg_table ORDER BY id ").show()

+---+-----+-----------+-------------------+
| id| name|create_date|   last_update_time|
+---+-----+-----------+-------------------+
|  1|Chris| 2020-01-01|2020-01-01 00:00:00|
|  2| Will| 2020-01-01|2020-01-01 00:00:00|
|  3| Emma| 2020-01-01|2020-01-01 00:00:00|
|  4| John| 2020-01-01|2020-01-01 00:00:00|
|  5| Eric| 2020-01-01|2020-01-01 00:00:00|
|  6| Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----+-----------+-------------------+

## Update and Delete Data in Iceberg Table

In [9]:
data = [
        (1, "Christopher", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "update"),
        (3, "Emmeline", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "update"),
        (5, "Eric", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "delete"),
        (7, "Bill", "2020-01-02", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "append")
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False),
        StructField("change_type", StringType(), False)
])

mergeDF = spark.createDataFrame(data=data,schema=schema)

mergeDF.createOrReplaceTempView("mergeTable")
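
Before running the MERGE below, you can predict its result. The plain-Python model here mimics the MERGE's two `WHEN MATCHED` clauses on this notebook's rows. It sketches the row-level semantics only, not how Iceberg executes a merge, and it assumes each `id` appears at most once in the source, as it does here. The optional `insert_unmatched` flag is a hypothetical extension that models a `WHEN NOT MATCHED ... THEN INSERT` clause. The MERGE in this notebook does not include that clause.

```python
from datetime import datetime

JAN1, JAN2 = datetime(2020, 1, 1), datetime(2020, 1, 2)

# Target rows: id -> (name, create_date, last_update_time)
target = {i: (n, "2020-01-01", JAN1)
          for i, n in enumerate(["Chris", "Will", "Emma", "John", "Eric", "Adam"], start=1)}

# Source rows from mergeTable: (id, name, create_date, last_update_time, change_type)
source = [
    (1, "Christopher", "2020-01-01", JAN2, "update"),
    (3, "Emmeline", "2020-01-01", JAN2, "update"),
    (5, "Eric", "2020-01-01", JAN2, "delete"),
    (7, "Bill", "2020-01-02", JAN2, "append"),
]


def apply_merge(target, source, insert_unmatched=False):
    """Model of MERGE INTO target USING source ON t.id = s.id (one source row per id)."""
    result = dict(target)  # leave the input untouched
    for sid, name, create_date, updated, change in source:
        if sid in result:  # WHEN MATCHED
            if change == "update":
                # UPDATE SET t.name = s.name, t.last_update_time = s.last_update_time
                result[sid] = (name, result[sid][1], updated)
            elif change == "delete":
                del result[sid]
        elif insert_unmatched and change == "append":
            # Hypothetical WHEN NOT MATCHED ... THEN INSERT clause
            result[sid] = (name, create_date, updated)
    return dict(sorted(result.items()))


for row_id, row in apply_merge(target, source).items():
    print(row_id, row[0], row[2])
```

With the default `insert_unmatched=False`, the model keeps ids 1, 2, 3, 4 and 6, and renames ids 1 and 3. Id 7 (Bill) is not added.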

In [12]:
# Uncomment to drop the temporary view (mergeTable is a temp view, not a table)
# spark.catalog.dropTempView("mergeTable")

In [10]:
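# Apply mergeTable to the Iceberg table: rows marked 'update' overwrite name and
# last_update_time, and rows marked 'delete' are removed. There is no WHEN NOT MATCHED
# clause, so the 'append' row (id 7) is not inserted.
spark.sql("""
    MERGE INTO dev.db.iceberg_table t
    USING (SELECT * FROM mergeTable) s
    ON t.id = s.id
    WHEN MATCHED AND s.change_type = 'update' THEN
        UPDATE SET t.name = s.name, t.last_update_time = s.last_update_time
    WHEN MATCHED AND s.change_type = 'delete' THEN DELETE
""")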

In [11]:
spark.sql(" SELECT * FROM dev.db.iceberg_table ORDER BY id ").show()

+---+-----------+-----------+-------------------+
| id|       name|create_date|   last_update_time|
+---+-----------+-----------+-------------------+
|  1|Christopher| 2020-01-01|2020-01-02 00:00:00|
|  2|       Will| 2020-01-01|2020-01-01 00:00:00|
|  3|   Emmeline| 2020-01-01|2020-01-02 00:00:00|
|  4|       John| 2020-01-01|2020-01-01 00:00:00|
|  6|       Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----------+-----------+-------------------+
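
Bill (id 7, change_type `append`) does not appear in the result. The MERGE has no `WHEN NOT MATCHED` clause, so source rows without a matching id are ignored. To insert them as well, Spark SQL's MERGE accepts a conditional `WHEN NOT MATCHED ... THEN INSERT` clause. The statement below is a sketch and was not run in this notebook; the variable name `merge_with_insert` is hypothetical. Listing the target columns explicitly keeps the source-only `change_type` column out of the insert.

```python
# Hypothetical variant of the MERGE above that also inserts rows marked 'append'.
# Not run in this notebook; pass it to spark.sql(...) to try it.
merge_with_insert = """
    MERGE INTO dev.db.iceberg_table t
    USING (SELECT * FROM mergeTable) s
    ON t.id = s.id
    WHEN MATCHED AND s.change_type = 'update' THEN
        UPDATE SET t.name = s.name, t.last_update_time = s.last_update_time
    WHEN MATCHED AND s.change_type = 'delete' THEN DELETE
    WHEN NOT MATCHED AND s.change_type = 'append' THEN
        INSERT (id, name, create_date, last_update_time)
        VALUES (s.id, s.name, s.create_date, s.last_update_time)
"""

print(merge_with_insert)
```

The update and delete clauses are unchanged. If you run this after the original MERGE, the table contents should change only by the new row for id 7.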

## Snapshots

In [14]:
spark.sql(" SELECT * FROM dev.db.iceberg_table.snapshots ").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2023-03-22 15:21:...|5920941198207654327|               null|   append|s3://iceberg-woek...|{spark.app.id -> ...|
|2023-03-22 15:23:...|9217764525599659710|5920941198207654327|overwrite|s3://iceberg-woek...|{spark.app.id -> ...|
|2023-03-22 15:24:...|1815855305192887753|9217764525599659710|   append|s3://iceberg-woek...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
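
Each commit to an Iceberg table adds a snapshot whose `parent_id` points at the previous snapshot, so the `snapshots` output forms a chain. The sketch below walks that chain from the newest snapshot back to the first. It uses the `snapshot_id`, `parent_id`, and `operation` values shown above, copied by hand as plain Python data. It assumes a linear history with no rollbacks or branches. On a cluster, you would collect the same columns from `dev.db.iceberg_table.snapshots` instead.

```python
# (snapshot_id, parent_id, operation) copied from the snapshots output above
snapshots = [
    (5920941198207654327, None, "append"),
    (9217764525599659710, 5920941198207654327, "overwrite"),
    (1815855305192887753, 9217764525599659710, "append"),
]


def lineage(snapshots, head):
    """Follow parent_id links from `head` back to the root snapshot."""
    by_id = {sid: (parent, op) for sid, parent, op in snapshots}
    chain = []
    current = head
    while current is not None:
        parent, op = by_id[current]
        chain.append((current, op))
        current = parent
    return chain


# In a linear history, the newest snapshot is the one that is no other snapshot's parent
parents = {parent for _, parent, _ in snapshots}
head = next(sid for sid, _, _ in snapshots if sid not in parents)

for sid, op in lineage(snapshots, head):
    print(sid, op)
```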