# Apache Iceberg with PySpark – End-to-End Hands-on

This notebook demonstrates **Apache Iceberg** using **PySpark** on a local Windows setup.

### What you will learn
- Create Iceberg tables
- Append data safely
- Evolve the schema (add columns)
- Recover from an accidental overwrite
- Query earlier snapshots (time travel)

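⚠️ **Important:** Iceberg stores the table schema in its own metadata and tracks columns by ID, so a column added with `ALTER TABLE` is visible to every reader immediately. **No `mergeSchema=true` is required**, unlike a directory of plain Parquet files.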


In [1]:
import os
import sys
from pyspark.sql import SparkSession

# Spark 3.5.3 paths
spark_home = r"C:\spark\spark-3.5.3-bin-hadoop3"
sys.path.insert(0, spark_home + r"\python")
sys.path.insert(0, spark_home + r"\python\lib\py4j-0.10.9.7-src.zip")

# Python executable used by both the driver and the workers
python_exe = r"C:\Users\Raghava\AppData\Local\Programs\Python\Python310\python.exe"
os.environ["PYSPARK_PYTHON"] = python_exe
os.environ["PYSPARK_DRIVER_PYTHON"] = python_exe

In [2]:
spark = (
    SparkSession.builder
    .appName("IcebergLocalSetup-Spark353")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1"
    )
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    )
    .config(
        "spark.sql.catalog.local",
        "org.apache.iceberg.spark.SparkCatalog"
    )
    .config("spark.sql.catalog.local.type", "hadoop")
    .config(
        "spark.sql.catalog.local.warehouse",
        "file:///H:/spark_practice/with_Iceberg/warehouse"
    )
    .getOrCreate()
)
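
The catalog settings above all share the prefix `spark.sql.catalog.<name>`. As a minimal sketch, the helper below (`hadoop_catalog_conf` is a hypothetical name, not part of PySpark or Iceberg) builds the same key/value pairs for any catalog name and warehouse path, which makes the naming pattern easier to see.

```python
def hadoop_catalog_conf(catalog: str, warehouse: str) -> dict:
    """Return the Spark settings that register a Hadoop-type Iceberg catalog.

    Hypothetical helper for illustration; the keys mirror the builder
    calls in the cell above.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Enables Iceberg SQL extensions such as CALL procedures
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Registers the catalog under the chosen name
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        # "hadoop" keeps table metadata directly in the warehouse directory
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse,
    }


conf = hadoop_catalog_conf(
    "local", "file:///H:/spark_practice/with_Iceberg/warehouse"
)
for key, value in conf.items():
    print(f"{key} = {value}")
```

Each pair could then be passed to `SparkSession.builder.config(key, value)` in a loop before calling `getOrCreate()`.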

## Day 1 – Create Iceberg Table & Initial Load

In [3]:
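# SHOW CATALOGS lists only catalogs already loaded in this session;
# `local` is loaded lazily the first time it is referenced.
spark.sql("SHOW CATALOGS").show()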


+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+



In [4]:
spark.sql("CREATE DATABASE IF NOT EXISTS local.testdb")

DataFrame[]

In [5]:
df_day1 = spark.createDataFrame(
    [(1, "Ramesh"), (2, "Pavan")],
    ["id", "name"]
)

# create() fails if the table already exists, so run this cell only once
df_day1.writeTo("local.testdb.users") \
    .using("iceberg") \
    .create()


In [12]:
spark.table("local.testdb.users").show()


+---+------+
| id|  name|
+---+------+
|  1|Ramesh|
|  2| Pavan|
+---+------+



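## Day 2 – Schema Evolution (Add Column) & Append Data

Notice that we **do not use `mergeSchema=true`**. The new column is declared with `ALTER TABLE`, which Iceberg applies as a metadata-only change: existing data files are not rewritten, and rows written before the change read back `NULL` for `age`.

In [15]:
spark.sql("""
ALTER TABLE local.testdb.users
ADD COLUMN age INT
""")


DataFrame[]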
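
Adding a column is one of several schema changes that Iceberg handles in metadata alone. The sketch below only builds the DDL strings; the table `local.testdb.users_scratch` and the columns `city` and `home_city` are hypothetical, and the statements assume a scratch copy that already has an `age INT` column, so the tutorial table is left untouched.

```python
def schema_evolution_statements(table: str) -> list:
    """Build DDL statements that Iceberg applies as metadata-only changes.

    The column names are illustrative assumptions, not part of the tutorial.
    """
    return [
        # New columns read back as NULL for rows written earlier
        f"ALTER TABLE {table} ADD COLUMN city STRING",
        # Columns are tracked by ID, so a rename does not touch data files
        f"ALTER TABLE {table} RENAME COLUMN city TO home_city",
        # Widening INT to BIGINT is a permitted type promotion
        f"ALTER TABLE {table} ALTER COLUMN age TYPE BIGINT",
        f"ALTER TABLE {table} DROP COLUMN home_city",
    ]


statements = schema_evolution_statements("local.testdb.users_scratch")
for stmt in statements:
    print(stmt)
```

With a live session, each statement could be run with `spark.sql(stmt)`.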

In [16]:
df_day2 = spark.createDataFrame(
    [(3, "Akhil", 35), (4, "Nikhil", 28)],
    ["id", "name", "age"]
)


In [17]:
df_day2.writeTo("local.testdb.users").append()


In [18]:
spark.table("local.testdb.users").show()


+---+------+----+
| id|  name| age|
+---+------+----+
|  1|Ramesh|NULL|
|  3| Akhil|  35|
|  2| Pavan|NULL|
|  4|Nikhil|  28|
+---+------+----+



## Day 3 – Accidental Overwrite (Simulating a Production Mistake)

In [20]:
df_bad = spark.createDataFrame(
    [(99, "Hacker", 100)],
    ["id", "name", "age"]
)

# The table is unpartitioned, so overwritePartitions() replaces every row
df_bad.writeTo("local.testdb.users").overwritePartitions()



In [21]:
spark.table("local.testdb.users").show()


+---+------+---+
| id|  name|age|
+---+------+---+
| 99|Hacker|100|
+---+------+---+



## Day 4 – Time Travel & Recovery

Iceberg retains every snapshot until it is explicitly expired, so we can query an earlier table state or roll back to it.

In [22]:
spark.sql("""
SELECT snapshot_id, committed_at, operation
FROM local.testdb.users.snapshots
ORDER BY committed_at
""").show(truncate=False)


+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|5193484984745520434|2025-12-29 16:41:55.26 |append   |
|3451235462764937396|2025-12-29 16:56:29.823|append   |
|2047872761209547268|2025-12-29 17:01:34.409|overwrite|
+-------------------+-----------------------+---------+
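
Rather than copying a snapshot ID by hand, the rollback target can be picked from these rows. This is a minimal sketch: `last_snapshot_before_overwrite` is a hypothetical helper, the rows are copied from the output above, and it assumes a linear history sorted by `committed_at` (for branching histories the `parent_id` column would be a safer guide).

```python
from typing import Optional, Sequence, Tuple

# (snapshot_id, committed_at, operation), copied from the output above
SnapshotRow = Tuple[int, str, str]


def last_snapshot_before_overwrite(rows: Sequence[SnapshotRow]) -> Optional[int]:
    """Return the snapshot committed just before the most recent overwrite.

    Assumes rows are sorted by commit time, oldest first. Returns None if
    there is no overwrite or if the overwrite is the first snapshot.
    """
    previous = None
    target = None
    for snapshot_id, _committed_at, operation in rows:
        if operation == "overwrite":
            target = previous
        previous = snapshot_id
    return target


rows = [
    (5193484984745520434, "2025-12-29 16:41:55.26", "append"),
    (3451235462764937396, "2025-12-29 16:56:29.823", "append"),
    (2047872761209547268, "2025-12-29 17:01:34.409", "overwrite"),
]
print(last_snapshot_before_overwrite(rows))  # 3451235462764937396
```

In the notebook, `rows` could instead be built from the query result with `.collect()`.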



In [23]:
# Snapshot of the second append, the last good state before the overwrite
snapshot_id = 3451235462764937396

spark.read \
    .format("iceberg") \
    .option("snapshot-id", snapshot_id) \
    .load("local.testdb.users") \
    .show()


+---+------+----+
| id|  name| age|
+---+------+----+
|  1|Ramesh|NULL|
|  2| Pavan|NULL|
|  3| Akhil|  35|
|  4|Nikhil|  28|
+---+------+----+
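
The same read can also be written in SQL with the `VERSION AS OF` and `TIMESTAMP AS OF` clauses, which Spark has supported for Iceberg tables since 3.3. The sketch below only builds the query strings; the helper names are hypothetical, and the timestamp is an assumed value chosen to fall between the second append and the overwrite in the snapshot list.

```python
def version_as_of(table: str, snapshot_id: int) -> str:
    """Query the table as it was at a specific snapshot ID."""
    return f"SELECT * FROM {table} VERSION AS OF {snapshot_id}"


def timestamp_as_of(table: str, timestamp: str) -> str:
    """Query the table as it was at a point in time.

    The timestamp string is interpreted in the Spark session time zone.
    """
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


by_id = version_as_of("local.testdb.users", 3451235462764937396)
by_time = timestamp_as_of("local.testdb.users", "2025-12-29 17:00:00")
print(by_id)
print(by_time)
```

Either string could be passed to `spark.sql(...).show()` and should return the same four rows as the cell above.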



In [24]:
spark.sql(f"""
CALL local.system.rollback_to_snapshot(
  'testdb.users',
  {snapshot_id}
)
""")


DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]
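
Iceberg procedures also accept named arguments, and there is a companion procedure, `rollback_to_timestamp`, for rolling back by time instead of by ID. The helpers below are hypothetical and only build the `CALL` strings; the timestamp is an assumed value taken from just before the overwrite.

```python
def rollback_to_snapshot_call(catalog: str, table: str, snapshot_id: int) -> str:
    """Build a rollback_to_snapshot call using named arguments."""
    return (
        f"CALL {catalog}.system.rollback_to_snapshot("
        f"table => '{table}', snapshot_id => {snapshot_id})"
    )


def rollback_to_timestamp_call(catalog: str, table: str, timestamp: str) -> str:
    """Build a rollback_to_timestamp call.

    Rolls back to the snapshot that was current at the given time.
    """
    return (
        f"CALL {catalog}.system.rollback_to_timestamp("
        f"table => '{table}', timestamp => TIMESTAMP '{timestamp}')"
    )


print(rollback_to_snapshot_call("local", "testdb.users", 3451235462764937396))
print(rollback_to_timestamp_call("local", "testdb.users", "2025-12-29 17:00:00"))
```

Named arguments make the call harder to get wrong when a procedure takes several parameters; positional and named styles cannot be mixed in one call.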

In [25]:
spark.table("local.testdb.users").show()


+---+------+----+
| id|  name| age|
+---+------+----+
|  1|Ramesh|NULL|
|  2| Pavan|NULL|
|  3| Akhil|  35|
|  4|Nikhil|  28|
+---+------+----+



### Key Takeaways
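- Snapshots make a bad write recoverable: you can roll back as long as the earlier snapshot has not been expired
- Schema evolution is an explicit, **metadata-only** `ALTER TABLE`; existing data files are not rewritten
- **No `mergeSchema=true` required** once the column has been added to the table
- Always read data through the table, not the raw Parquet files: the warehouse directory still holds files from replaced or rolled-back snapshots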
