### Install the iceberg jar and pip install pyspark

In [1]:
!pip install pyspark
!wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.3.1/iceberg-spark-runtime-3.4_2.12-1.3.1.jar


--2025-05-06 08:09:38--  https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.3.1/iceberg-spark-runtime-3.4_2.12-1.3.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 28536568 (27M) [application/java-archive]
Saving to: ‘iceberg-spark-runtime-3.4_2.12-1.3.1.jar’


2025-05-06 08:09:39 (151 MB/s) - ‘iceberg-spark-runtime-3.4_2.12-1.3.1.jar’ saved [28536568/28536568]



#### Check the folder where the Iceberg jar has been saved

Initialize the Spark session

In [3]:
from pyspark.sql import SparkSession

# Session settings: ship the Iceberg runtime jar and register a Hadoop-type catalog
# named "iceberg" whose warehouse directory is /content/iceberg_warehouse.
_session_conf = {
    "spark.jars": "/content/iceberg-spark-runtime-3.4_2.12-1.3.1.jar",
    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg.type": "hadoop",
    "spark.sql.catalog.iceberg.warehouse": "/content/iceberg_warehouse",
}

# Apply the settings one by one, in the order listed above.
_builder = SparkSession.builder.appName("IcebergDemo")
for _key, _value in _session_conf.items():
    _builder = _builder.config(_key, _value)

# Reuse an already-running session if there is one, otherwise start a new one.
spark = _builder.getOrCreate()


### You are now set to create the Iceberg table and then inspect the underlying files

In [4]:
# DDL for an unpartitioned Iceberg table `employee` in the `tutorial` namespace
# of the `iceberg` catalog.
create_employee_ddl = """
    CREATE TABLE iceberg.tutorial.employee (
        id INT,
        name STRING,
        department STRING,
        salary INT
    )
    USING iceberg
"""
spark.sql(create_employee_ddl)


DataFrame[]

### Here you will see only the metadata folder: so far only the DDL has run, so there is no data folder yet because no data has been inserted

In [6]:
# Recursively list everything under the employee table's directory in the warehouse.
!ls -R /content/iceberg_warehouse/tutorial/employee/

/content/iceberg_warehouse/tutorial/employee/:
metadata

/content/iceberg_warehouse/tutorial/employee/metadata:
v1.metadata.json  version-hint.text


### Let's insert some data

In [7]:
from pyspark.sql import Row

# Three sample employees to load into iceberg.tutorial.employee.
data = [
    Row(id=1, name="Alice", department="HR", salary=5000),
    Row(id=2, name="Bob", department="Engineering", salary=7000),
    Row(id=3, name="Charlie", department="Marketing", salary=6000)
]

# Spell out the column types so the DataFrame matches the table DDL (id INT, salary INT).
# Without a schema, Spark infers Python ints as LongType (bigint), so the append would
# depend on an implicit bigint -> int conversion.
employee_schema = "id INT, name STRING, department STRING, salary INT"
df = spark.createDataFrame(data, schema=employee_schema)

# Append the rows to the Iceberg table through the DataFrameWriterV2 API.
df.writeTo("iceberg.tutorial.employee").append()


In [8]:
!ls -R /content/iceberg_warehouse/tutorial/employee/
## In this step you will see that the data folder has now been created as well

/content/iceberg_warehouse/tutorial/employee/:
data  metadata

/content/iceberg_warehouse/tutorial/employee/data:
00000-0-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet
00001-1-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet

/content/iceberg_warehouse/tutorial/employee/metadata:
d2c54ef6-ace4-46d9-982f-8a560fa04b02-m0.avro
snap-6599416823092334606-1-d2c54ef6-ace4-46d9-982f-8a560fa04b02.avro
v1.metadata.json
v2.metadata.json
version-hint.text


In [9]:
# Row-level UPDATE: set Bob's salary to 8000.
update_bob_salary_sql = """
    UPDATE iceberg.tutorial.employee
    SET salary = 8000
    WHERE name = 'Bob'
"""
spark.sql(update_bob_salary_sql)



DataFrame[]

In [10]:
!ls -R /content/iceberg_warehouse/tutorial/employee/

### You will see that both the metadata folder and the data folder have received new files

/content/iceberg_warehouse/tutorial/employee/:
data  metadata

/content/iceberg_warehouse/tutorial/employee/data:
00000-0-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet
00000-5-49e0028a-9803-4673-bf5b-dd2adcdc3286-00001.parquet
00001-1-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet

/content/iceberg_warehouse/tutorial/employee/metadata:
29074bd8-df94-46fb-8510-271ea6d8c46a-m0.avro
29074bd8-df94-46fb-8510-271ea6d8c46a-m1.avro
d2c54ef6-ace4-46d9-982f-8a560fa04b02-m0.avro
snap-2953705924203826168-1-29074bd8-df94-46fb-8510-271ea6d8c46a.avro
snap-6599416823092334606-1-d2c54ef6-ace4-46d9-982f-8a560fa04b02.avro
v1.metadata.json
v2.metadata.json
v3.metadata.json
version-hint.text


In [11]:
# Row-level DELETE: remove the employee named Charlie.
delete_charlie_sql = """
    DELETE FROM iceberg.tutorial.employee
    WHERE name = 'Charlie'
"""
spark.sql(delete_charlie_sql)


DataFrame[]

# 1. Time travel

In [12]:
# Query the table's "snapshots" metadata table and print it without truncating columns.
employee_snapshots = spark.sql("SELECT * FROM iceberg.tutorial.employee.snapshots")
employee_snapshots.show(truncate=False)


+-----------------------+-------------------+-------------------+---------+--------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                             |summary                                                                                                                                                                                                                                        

In [14]:
# Current state of the table (after the UPDATE and the DELETE above).
employee_current = spark.sql("select * from iceberg.tutorial.employee")
employee_current.show()

+---+-----+-----------+------+
| id| name| department|salary|
+---+-----+-----------+------+
|  2|  Bob|Engineering|  8000|
|  1|Alice|         HR|  5000|
+---+-----+-----------+------+



In [16]:
# Time travel to the table state right after the UPDATE, i.e. before Charlie was deleted.
# Snapshot ids are generated afresh on every run, so a hard-coded id only works in the
# session that produced it; look the id up from the "snapshots" metadata table instead.
snapshot_rows = spark.sql("""
    SELECT snapshot_id
    FROM iceberg.tutorial.employee.snapshots
    ORDER BY committed_at
""").collect()

# Commits so far, oldest first: INSERT, UPDATE, DELETE -> the UPDATE is the second snapshot.
if len(snapshot_rows) < 2:
    raise RuntimeError(
        "Expected at least two snapshots on iceberg.tutorial.employee; "
        "run the INSERT and UPDATE cells first."
    )
update_snapshot_id = int(snapshot_rows[1]["snapshot_id"])

spark.sql(f"""
    SELECT * FROM iceberg.tutorial.employee
    VERSION AS OF {update_snapshot_id}
""").show()



+---+-------+-----------+------+
| id|   name| department|salary|
+---+-------+-----------+------+
|  2|    Bob|Engineering|  8000|
|  3|Charlie|  Marketing|  6000|
|  1|  Alice|         HR|  5000|
+---+-------+-----------+------+



# 2. Schema Evolution (ADD Column)

In [17]:
# Schema evolution: add a nullable integer column `age` to the existing table.
add_age_column_sql = """
    ALTER TABLE iceberg.tutorial.employee ADD COLUMN age INT
"""
spark.sql(add_age_column_sql)



DataFrame[]

In [18]:
# Recursively list the employee table's directory again, now that the schema has changed.
!ls -R /content/iceberg_warehouse/tutorial/employee/

/content/iceberg_warehouse/tutorial/employee/:
data  metadata

/content/iceberg_warehouse/tutorial/employee/data:
00000-0-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet
00000-5-49e0028a-9803-4673-bf5b-dd2adcdc3286-00001.parquet
00000-9-83dfe384-2001-48bd-b447-9fcac6266030-00001.parquet
00001-1-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet

/content/iceberg_warehouse/tutorial/employee/metadata:
29074bd8-df94-46fb-8510-271ea6d8c46a-m0.avro
29074bd8-df94-46fb-8510-271ea6d8c46a-m1.avro
8d8b521a-cf6f-43f1-83a0-47ad206e78a2-m0.avro
8d8b521a-cf6f-43f1-83a0-47ad206e78a2-m1.avro
d2c54ef6-ace4-46d9-982f-8a560fa04b02-m0.avro
snap-2953705924203826168-1-29074bd8-df94-46fb-8510-271ea6d8c46a.avro
snap-3117810971636079597-1-8d8b521a-cf6f-43f1-83a0-47ad206e78a2.avro
snap-6599416823092334606-1-d2c54ef6-ace4-46d9-982f-8a560fa04b02.avro
v1.metadata.json
v2.metadata.json
v3.metadata.json
v4.metadata.json
v5.metadata.json
version-hint.text


In [19]:
# Insert a row that uses the evolved schema; values are positional
# (id, name, department, salary, age).
insert_david_sql = """
    INSERT INTO iceberg.tutorial.employee VALUES (4, 'David', 'Finance', 6500, 35)
"""
spark.sql(insert_david_sql)


DataFrame[]

In [20]:
# Recursively list the employee table's directory after inserting the new row.
!ls -R /content/iceberg_warehouse/tutorial/employee/

/content/iceberg_warehouse/tutorial/employee/:
data  metadata

/content/iceberg_warehouse/tutorial/employee/data:
00000-0-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet
00000-14-aab62923-345f-46b1-9450-0ac701c881f9-00001.parquet
00000-5-49e0028a-9803-4673-bf5b-dd2adcdc3286-00001.parquet
00000-9-83dfe384-2001-48bd-b447-9fcac6266030-00001.parquet
00001-1-01bce7bc-1a11-40e1-a6b6-2743ee1b3b60-00001.parquet

/content/iceberg_warehouse/tutorial/employee/metadata:
29074bd8-df94-46fb-8510-271ea6d8c46a-m0.avro
29074bd8-df94-46fb-8510-271ea6d8c46a-m1.avro
4063661f-fa39-4b56-ada5-2a537dc3be14-m0.avro
8d8b521a-cf6f-43f1-83a0-47ad206e78a2-m0.avro
8d8b521a-cf6f-43f1-83a0-47ad206e78a2-m1.avro
d2c54ef6-ace4-46d9-982f-8a560fa04b02-m0.avro
snap-2953705924203826168-1-29074bd8-df94-46fb-8510-271ea6d8c46a.avro
snap-3117810971636079597-1-8d8b521a-cf6f-43f1-83a0-47ad206e78a2.avro
snap-6021783865302088004-1-4063661f-fa39-4b56-ada5-2a537dc3be14.avro
snap-6599416823092334606-1-d2c54ef6-ace4-46d9-982f-8a560f

In [22]:
# Schema evolution in action: only the newly inserted row carries a value for `age`;
# rows written before the column was added come back as NULL, and no analysis
# exception is raised.
employee_after_evolution = spark.sql("select * from iceberg.tutorial.employee")
employee_after_evolution.show()

+---+-----+-----------+------+----+
| id| name| department|salary| age|
+---+-----+-----------+------+----+
|  4|David|    Finance|  6500|  35|
|  2|  Bob|Engineering|  8000|NULL|
|  1|Alice|         HR|  5000|NULL|
+---+-----+-----------+------+----+



# 3: Partitioning (Optional but Important)



In [24]:
# DDL for a second table with the original four columns, partitioned by `department`.
create_partitioned_ddl = """
    CREATE TABLE iceberg.tutorial.employee_partitioned (
        id INT,
        name STRING,
        department STRING,
        salary INT
    )
    USING iceberg
    PARTITIONED BY (department)
"""
spark.sql(create_partitioned_ddl)



DataFrame[]

In [25]:
# Append the sample rows held in `df` (built in the earlier insert cell) to the partitioned table.
partitioned_writer = df.writeTo("iceberg.tutorial.employee_partitioned")
partitioned_writer.append()

In [26]:
# Recursively list the partitioned table's directory to inspect its on-disk layout.
!ls -R /content/iceberg_warehouse/tutorial/employee_partitioned/

/content/iceberg_warehouse/tutorial/employee_partitioned/:
data  metadata

/content/iceberg_warehouse/tutorial/employee_partitioned/data:
'department=Engineering'  'department=HR'  'department=Marketing'

'/content/iceberg_warehouse/tutorial/employee_partitioned/data/department=Engineering':
00000-21-41577e70-1b1e-4506-8f5a-7a7bab16e6fd-00001.parquet

'/content/iceberg_warehouse/tutorial/employee_partitioned/data/department=HR':
00000-21-41577e70-1b1e-4506-8f5a-7a7bab16e6fd-00002.parquet

'/content/iceberg_warehouse/tutorial/employee_partitioned/data/department=Marketing':
00000-21-41577e70-1b1e-4506-8f5a-7a7bab16e6fd-00003.parquet

/content/iceberg_warehouse/tutorial/employee_partitioned/metadata:
844ef08e-ffea-47ec-afdb-36e68b99bc2a-m0.avro
snap-4818441535154196469-1-844ef08e-ffea-47ec-afdb-36e68b99bc2a.avro
v1.metadata.json
v2.metadata.json
version-hint.text


# 4: ACID Transactions (UPDATE/DELETE)

In [27]:
# Read-only check: display the current rows of the employee table
# (this cell does not modify any data).
employee_rows_query = """
    select * from iceberg.tutorial.employee
"""
spark.sql(employee_rows_query).show()


+---+-----+-----------+------+----+
| id| name| department|salary| age|
+---+-----+-----------+------+----+
|  4|David|    Finance|  6500|  35|
|  2|  Bob|Engineering|  8000|NULL|
|  1|Alice|         HR|  5000|NULL|
+---+-----+-----------+------+----+



# 5: Merge (UPSERT)

In [31]:
# Upsert via MERGE: the single-row source is matched to the target on `id`.
# A matching target row is overwritten with all source columns (UPDATE SET *);
# a source row with no match is inserted as a new row (INSERT *).
merge_bob_sql = """
    MERGE INTO iceberg.tutorial.employee t
    USING (SELECT 2 AS id, 'Bob' AS name, 'Engineering' AS department, 9000 AS salary, 30 AS age) s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(merge_bob_sql)


DataFrame[]

In [32]:
# Read-only check: display the table after the MERGE to confirm the upsert took effect.
employee_after_merge_query = """
    select * from iceberg.tutorial.employee
"""
spark.sql(employee_after_merge_query).show()


+---+-----+-----------+------+----+
| id| name| department|salary| age|
+---+-----+-----------+------+----+
|  2|  Bob|Engineering|  9000|  30|
|  1|Alice|         HR|  5000|NULL|
|  4|David|    Finance|  6500|  35|
+---+-----+-----------+------+----+



# 6: Table History and Rollback

In [33]:
# Query the table's "history" metadata table and print it without truncating columns.
employee_history = spark.sql("SELECT * FROM iceberg.tutorial.employee.history")
employee_history.show(truncate=False)



+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-05-06 08:17:00.445|6599416823092334606|NULL               |true               |
|2025-05-06 08:18:08.738|2953705924203826168|6599416823092334606|true               |
|2025-05-06 08:18:54.275|3117810971636079597|2953705924203826168|true               |
|2025-05-06 08:31:01.993|6021783865302088004|3117810971636079597|true               |
|2025-05-06 08:36:37.835|2895367200101812802|6021783865302088004|true               |
|2025-05-06 08:37:30.937|4742875239200770694|2895367200101812802|true               |
|2025-05-06 08:39:25.993|5752421767766051467|4742875239200770694|true               |
+-----------------------+-------------------+-------------------+-------------------+

