![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("My Test").getOrCreate()

spark

To make the notebook safe to rerun, let's create the database if it is missing and drop the table if it exists, so that every run starts fresh.

In [5]:
%%sql

CREATE DATABASE IF NOT EXISTS working

25/01/05 11:55:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [6]:
%%sql

DROP TABLE IF EXISTS working.emp_hours_week

In [7]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField

schema = StructType([
    StructField("employee-name", StringType(), True),
    StructField("working-hours", DoubleType(), True),
    StructField("KW", IntegerType(), True)
])

df = (
    spark.read.option("header", True)
    .option("delimiter", ";")
    .schema(schema)
    .csv("/home/iceberg/raw/data.csv")
)

# Create the table and load the CSV rows into it
df.write.saveAsTable("working.emp_hours_week")
# df.show()

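The reader options in the cell above follow a simple parsing model: a header row, fields separated by `;`, and each column cast to the type given in the schema. The following is a minimal standard-library sketch of that model, not what Spark does internally. The contents of `/home/iceberg/raw/data.csv` are not shown in this notebook, so `SAMPLE` is an assumed excerpt built from rows that appear in the table output later on, and `parse_rows` and `CASTS` are hypothetical names.

```python
import csv
import io

# Assumed excerpt of the raw file; the rows mirror values from the table output.
SAMPLE = "employee-name;working-hours;KW\nStefan;42.5;1\nStefan;38.5;2\nHans;100.0;1\n"

# Column name -> Python type, mirroring the StructType defined above.
CASTS = {"employee-name": str, "working-hours": float, "KW": int}


def parse_rows(text):
    """Parse semicolon-delimited text with a header row and cast each field."""
    reader = csv.DictReader(io.StringIO(text), delimiter=";")
    return [{name: CASTS[name](value) for name, value in row.items()} for row in reader]


rows = parse_rows(SAMPLE)
print(rows[0])
```

Unlike this sketch, which raises an error on a value it cannot cast, Spark's default CSV mode is permissive and stores `null` for fields that do not match the schema.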

In [8]:
%%sql

DESCRIBE EXTENDED working.emp_hours_week

col_name,data_type,comment
employee-name,string,
working-hours,double,
KW,int,
,,
# Metadata Columns,,
_spec_id,int,
_partition,struct<>,
_file,string,
_pos,bigint,
_deleted,boolean,


In [12]:
%%sql

SELECT `employee-name`, sum(`working-hours`)
FROM working.emp_hours_week
GROUP BY `employee-name` 

employee-name,sum(working-hours)
Stefan,129.0
Gerda,67.5
Hans,100.0
Torsten,153.69


In [13]:
%%sql

ALTER TABLE working.emp_hours_week ADD COLUMN good_one BOOLEAN DEFAULT false

In [14]:
%%sql
    
UPDATE working.emp_hours_week
SET good_one = true
WHERE `working-hours` > 40

In [16]:
%%sql

SELECT `employee-name`, sum(`working-hours`)
FROM working.emp_hours_week
WHERE good_one = true
GROUP BY `employee-name` 

employee-name,sum(working-hours)
Stefan,90.5
Hans,100.0
Torsten,124.79


In [17]:
%%sql
    
-- Count the data files tracked in the table's metadata
SELECT count(*) FROM working.emp_hours_week.files

count(1)
1


In [18]:
%%sql
INSERT INTO working.emp_hours_week (`employee-name`, `working-hours`, KW, good_one)
VALUES
    ('Linda', 53.6, 1, true),
    ('Linda', 49.1, 2, true),
    ('Linda', 39.89, 3, false)

In [19]:
%%sql
-- Count the data files again after the insert
SELECT count(*) FROM working.emp_hours_week.files

count(1)
4


## Partitioning

Let's partition the table by `KW` (calendar week).

In [21]:
%%sql

ALTER TABLE working.emp_hours_week
ADD PARTITION FIELD KW

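Adding `KW` as an identity partition field means that rows written from now on are grouped by their `KW` value. Data files written before the change are not rewritten and keep their old layout. The grouping idea can be sketched in plain Python as follows. `group_by_partition` is a hypothetical helper, and the rows are a small excerpt of the values shown in the table output further down.

```python
from collections import defaultdict

# Rows as (employee-name, working-hours, KW), taken from the table output in this notebook.
rows = [
    ("Stefan", 42.5, 1),
    ("Stefan", 38.5, 2),
    ("Stefan", 48.0, 3),
    ("Hans", 100.0, 1),
]


def group_by_partition(rows, key_index=2):
    """Group rows by an identity partition value (here the KW column)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_index]].append(row)
    return dict(groups)


partitions = group_by_partition(rows)
for kw, members in sorted(partitions.items()):
    print(kw, [name for name, _, _ in members])
```

A query that filters on `KW` then only needs to read the groups that match the filter.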
## Metadata Tables

Iceberg tables contain very rich metadata that can be easily queried. For example, you can retrieve the manifest list for any snapshot, simply by querying the table's `snapshots` table.

In [23]:
%%sql

SELECT snapshot_id, manifest_list
FROM working.emp_hours_week.snapshots

snapshot_id,manifest_list
8813771131025432153,s3://warehouse/working/emp_hours_week/metadata/snap-8813771131025432153-1-d7cbdbf4-302f-4faa-9324-998261ae71d6.avro
3632032178207886299,s3://warehouse/working/emp_hours_week/metadata/snap-3632032178207886299-1-22ca0a5a-8c8d-4788-97c8-24a506d7ad0d.avro
1099080088407263793,s3://warehouse/working/emp_hours_week/metadata/snap-1099080088407263793-1-d3fc9865-4237-4835-a562-83d8d8c38cde.avro

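Each `manifest_list` path in the output above embeds the ID of the snapshot it belongs to. The sketch below extracts that ID from a path. The `snap-<snapshot_id>-<attempt>-<uuid>.avro` pattern is inferred from the paths shown here, the field names in the regular expression are assumptions, and `snapshot_id_from_manifest_list` is a hypothetical helper.

```python
import re

# Pattern inferred from the paths in the output above; the group names are assumptions.
SNAP_FILE = re.compile(
    r"snap-(?P<snapshot_id>\d+)-(?P<attempt>\d+)-(?P<uuid>[0-9a-f-]{36})\.avro$"
)


def snapshot_id_from_manifest_list(path):
    """Return the snapshot ID embedded in a manifest list file name, or None."""
    match = SNAP_FILE.search(path)
    return int(match.group("snapshot_id")) if match else None


path = (
    "s3://warehouse/working/emp_hours_week/metadata/"
    "snap-8813771131025432153-1-d7cbdbf4-302f-4faa-9324-998261ae71d6.avro"
)
print(snapshot_id_from_manifest_list(path))  # 8813771131025432153
```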

## Time Travel

The `history` table lists all snapshots and the parent snapshot each one derives from. The `is_current_ancestor` flag lets you know whether a snapshot is part of the linear history of the table's current snapshot.

In [24]:
%%sql

SELECT *
FROM working.emp_hours_week.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2025-01-05 11:55:44.671000,8813771131025432153,,True
2025-01-05 11:59:03.624000,3632032178207886299,8.813771131025432e+18,True
2025-01-05 12:01:59.055000,1099080088407263793,3.6320321782078863e+18,True

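The `parent_id` values above are shown in scientific notation. A likely cause (an assumption about the notebook's display layer) is that the column contains an empty value, so the integers are shown as 64-bit floats. A float cannot represent every 64-bit integer, so a value copied from that column is not a usable snapshot ID. A small sketch of the effect:

```python
# Snapshot ID of the first commit, taken from the history output above.
snapshot_id = 8813771131025432153

# Converting to a 64-bit float rounds to the nearest representable value.
as_float = float(snapshot_id)
round_tripped = int(as_float)

print(as_float)                      # scientific notation, as in the parent_id column
print(round_tripped == snapshot_id)  # False: the low digits were lost
print(abs(round_tripped - snapshot_id) <= 512)  # True: floats this large are 1024 apart
```

To get exact IDs, read them from the `snapshot_id` column, or cast `parent_id` to a string in the query.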

You can time-travel by rolling the table back to any snapshot in its history with the `rollback_to_snapshot` procedure. Let's revert the table to its original state by rolling back to the very first snapshot ID.

In [26]:
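# Look up the oldest snapshot in the table's history (the first commit).
history_df = spark.sql("SELECT snapshot_id FROM working.emp_hours_week.history ORDER BY made_current_at")
original_snapshot = history_df.head().snapshot_id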
spark.sql(f"CALL system.rollback_to_snapshot('working.emp_hours_week', {original_snapshot})")
original_snapshot

8813771131025432153

In [27]:
%%sql

SELECT * FROM working.emp_hours_week

employee-name,working-hours,KW,good_one
Stefan,42.5,1,
Stefan,38.5,2,
Stefan,48.0,3,
Torsten,57.25,1,
Torsten,67.54,2,
Torsten,28.9,3,
Gerda,20.0,1,
Gerda,25.0,2,
Gerda,22.5,3,
Hans,100.0,1,


Another look at the history table shows that the original state of the table has been added as a new entry
with the original snapshot ID.

In [28]:
%%sql

SELECT *
FROM working.emp_hours_week.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2025-01-05 11:55:44.671000,8813771131025432153,,True
2025-01-05 11:59:03.624000,3632032178207886299,8.813771131025432e+18,False
2025-01-05 12:01:59.055000,1099080088407263793,3.6320321782078863e+18,False
2025-01-05 12:36:01.064000,8813771131025432153,,True

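The `is_current_ancestor` values in both history outputs can be reproduced by following parent links from the current snapshot. The sketch below is a plain-Python model of that idea, not Iceberg's implementation. `ancestors` and `flag_history` are hypothetical helpers, and the parent IDs are inferred from the rounded values shown in the `parent_id` column.

```python
# snapshot_id -> parent_id, inferred from the history output (None marks the root).
PARENTS = {
    8813771131025432153: None,
    3632032178207886299: 8813771131025432153,
    1099080088407263793: 3632032178207886299,
}


def ancestors(current, parents):
    """Return the current snapshot plus every snapshot reachable via parent links."""
    seen = set()
    while current is not None and current not in seen:
        seen.add(current)
        current = parents.get(current)
    return seen


def flag_history(history, current, parents):
    """Pair each history entry with whether it is an ancestor of the current snapshot."""
    lineage = ancestors(current, parents)
    return [(snapshot_id, snapshot_id in lineage) for snapshot_id in history]


# Before the rollback the newest snapshot is current; afterwards the first one is.
before = flag_history(list(PARENTS), 1099080088407263793, PARENTS)
after = flag_history(list(PARENTS) + [8813771131025432153], 8813771131025432153, PARENTS)
print([flag for _, flag in before])  # [True, True, True]
print([flag for _, flag in after])   # [True, False, False, True]
```

After the rollback, the two later snapshots still exist in the table's metadata, but they are no longer on the path from the current snapshot back to the root.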

In [None]:
%%sql

-- ToDo: revert Time Travel
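
One possible way to fill in the ToDo above, offered as a sketch rather than the author's intended solution: after the rollback, the newest snapshot is no longer an ancestor of the current one, so `rollback_to_snapshot` may reject it. Iceberg's `set_current_snapshot` procedure can point the table at any snapshot that still exists. The helper `set_current_snapshot_sql` is hypothetical and only builds the statement. The `system.` prefix follows the rollback cell above and assumes the same default catalog.

```python
# Newest snapshot ID, taken from the history output above.
LATEST_SNAPSHOT = 1099080088407263793


def set_current_snapshot_sql(table, snapshot_id):
    """Build a CALL statement for Iceberg's set_current_snapshot procedure."""
    # Reject floats: a rounded value such as 1.09e+18 is not a valid snapshot ID.
    if not isinstance(snapshot_id, int):
        raise TypeError("snapshot_id must be an exact integer, not a rounded float")
    return f"CALL system.set_current_snapshot('{table}', {snapshot_id})"


statement = set_current_snapshot_sql("working.emp_hours_week", LATEST_SNAPSHOT)
print(statement)
# In the notebook, run it with the existing session: spark.sql(statement)
```

Querying `working.emp_hours_week.history` afterwards should show a new entry for the restored snapshot.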