# Sample Iceberg Notebook

## Load Env. Variables - AWS Keys

We use `%%local` to run code inside the local kernel; everything else goes through Livy and runs on the Spark driver.

In [1]:
%%local
# This cell runs in the local notebook kernel (see the %%local magic above).
from minio import Minio
from dotenv import dotenv_values
import os

# Parse the key/value pairs of the .env file at the relative path ../.env.
# The AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY entries are read later when the
# MinIO client is built.
config = dotenv_values("../.env")

In [2]:
# Imports
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read & Perform Basic Operations

In [3]:
# Load the wine sample CSV from the s3a source bucket. The first row is used
# as the header and column types are inferred from the data.
wine_csv_reader = spark.read.option("header", True).option("inferSchema", True)
wine_df = wine_csv_reader.csv("s3a://samples-csv-src/wine.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
%%pretty
# Preview the first two rows of the dataframe read from the CSV.
wine_df.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rownames,country,alcohol,deaths,heart,liver
1,Australia,2.5,785,211,15.300000190734863
2,Austria,3.900000095367432,863,167,45.59999847412109


## Save Dataframe As Iceberg

Create a new bucket and save the dataframe into it as an Iceberg table registered in the `samples_pre` database.

In [5]:
%%local
client = Minio("minio:10000", 
    access_key=config["AWS_ACCESS_KEY_ID"], 
    secret_key=config["AWS_SECRET_ACCESS_KEY"],
    secure=False
)

# Make bucket
if not client.bucket_exists("samples-csv-pre"):
    client.make_bucket("samples-csv-pre")

print("Bucket Created: ", client.bucket_exists("samples-csv-pre"))

Bucket Created:  True


In [6]:
# Make sure the target database exists before writing into it.
spark.sql("CREATE DATABASE IF NOT EXISTS samples_pre")

# Write the dataframe as the Iceberg table samples_pre.wine in overwrite mode,
# with the table path pointing at the new bucket.
iceberg_writer = wine_df.write.format("iceberg").mode("overwrite")
iceberg_writer = iceberg_writer.option("path", "s3a://samples-csv-pre/wine")
iceberg_writer.saveAsTable("samples_pre.wine")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read Iceberg Table

In [7]:
# Load the same table through two access paths so they can be compared:
# the Iceberg data source reader and a plain SQL query.
iceberg_reader = spark.read.format("iceberg")
wine_ice_df = iceberg_reader.load("samples_pre.wine")
wine_hive_df = spark.sql("SELECT * FROM samples_pre.wine")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
%%pretty
# Preview two rows of the dataframe loaded through the Iceberg reader.
wine_ice_df.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rownames,country,alcohol,deaths,heart,liver
1,Australia,2.5,785,211,15.300000190734863
2,Austria,3.900000095367432,863,167,45.59999847412109


In [9]:
%%pretty
# Preview two rows of the dataframe loaded through the SQL query.
wine_hive_df.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rownames,country,alcohol,deaths,heart,liver
1,Australia,2.5,785,211,15.300000190734863
2,Austria,3.900000095367432,863,167,45.59999847412109


## Alter Schema

In [10]:
%%sql
-- Add a string column named new_col (with a column comment) to the table.
ALTER TABLE samples_pre.wine ADD COLUMNS (
    new_col STRING COMMENT 'Yaaaay'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [11]:
# Re-read the table and print its schema to check that the added column shows up.
altered_wine_df = spark.read.format("iceberg").load("samples_pre.wine")
altered_wine_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- rownames: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- heart: integer (nullable = true)
 |-- liver: double (nullable = true)
 |-- new_col: string (nullable = true)

Adding the column did not rewrite any data: the `snapshots` query below should still list only one snapshot_id.

In [12]:
%%sql
-- List the table's snapshots through its `snapshots` metadata table.
SELECT * FROM samples_pre.wine.snapshots

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

## ACID Transactions

In [13]:
%%sql
-- Set new_col to 'foo' on every row (there is no WHERE clause).
UPDATE samples_pre.wine SET new_col = 'foo';

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [14]:
%%sql
-- Append one row whose first column (rownames) is 777 and whose other six columns are NULL.
INSERT INTO samples_pre.wine VALUES (777, NULL, NULL, NULL, NULL, NULL, NULL);

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [15]:
%%sql
-- Delete the row(s) whose rownames equals 1.
DELETE FROM samples_pre.wine WHERE rownames = 1;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [16]:
%%sql
-- Check the effect of the statements above: rownames 1 was deleted and
-- rownames 777 was inserted, so only the 777 row is expected back.
SELECT * FROM samples_pre.wine WHERE rownames IN (1, 777)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [17]:
%%sql
-- List the snapshots again, now that UPDATE / INSERT / DELETE have run.
SELECT * FROM samples_pre.wine.snapshots

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

## Time Travel

In [20]:
# Resolve the snapshot to travel to at run time (the earliest one recorded in
# the table's `snapshots` metadata table) instead of pasting an ID by hand, so
# the cell runs unattended under Restart & Run All.
first_snapshot_id = (
    spark.sql("SELECT snapshot_id FROM samples_pre.wine.snapshots ORDER BY committed_at")
    .first()["snapshot_id"]
)

# Query the table as of that snapshot and show up to five rows.
spark.sql(
    f"SELECT * FROM samples_pre.wine VERSION AS OF {first_snapshot_id} LIMIT 5"
).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [21]:
# Resolve the earliest snapshot at run time from the table's `snapshots`
# metadata table; a hand-pasted placeholder here is not valid Python and stops
# the notebook from running top to bottom.
first_snapshot_id = (
    spark.sql("SELECT snapshot_id FROM samples_pre.wine.snapshots ORDER BY committed_at")
    .first()["snapshot_id"]
)

# Read the table as of that snapshot and print the resulting schema.
(
    spark.read.option("snapshot-id", first_snapshot_id)
    .format("iceberg")
    .load("samples_pre.wine")
    .printSchema()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- rownames: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- heart: integer (nullable = true)
 |-- liver: double (nullable = true)