# 🧊 Apache Iceberg + Spark (Jupyter Quickstart)
This notebook is pre-configured to connect Spark to your **local Iceberg (Hadoop catalog)** on **MinIO**.

**Tip:** Make sure you've run your Docker stack and fetched JARs so they are mounted in the notebook container (see README).

In [4]:
# Reset cell (safe to run anytime once the saprk context is created )
# Drop the demo table/namespace if they already exist (keeps notebook idempotent)
spark.sql("DROP TABLE IF EXISTS local.airline.flights PURGE")
# Only drop namespace if empty (avoids errors)
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.airline")
try:
    spark.sql("DROP NAMESPACE local.airline")
    spark.sql("CREATE NAMESPACE local.airline")
except Exception:
    pass
print("Environment reset.")


Environment reset.


In [5]:
# Helper Functions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


iceberg_jars = ",".join([
    "/home/jovyan/extra-jars/iceberg-spark-runtime-3.5_2.12-1.6.1.jar",
    "/home/jovyan/extra-jars/hadoop-aws-3.3.4.jar",
    "/home/jovyan/extra-jars/aws-java-sdk-bundle-1.12.262.jar"
])

spark = (SparkSession.builder
    .appName("iceberg_fix")
    .config("spark.jars", iceberg_jars)
    .config("spark.driver.extraClassPath", iceberg_jars)
    .config("spark.executor.extraClassPath", iceberg_jars)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "s3a://warehouse/iceberg")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.access.key", "minio")
    .config("spark.hadoop.fs.s3a.secret.key", "minio123")
    .getOrCreate())


SEP = "─" * 80

def sql(q, title=None, truncate=False):
    if title:
        print(f"\n{SEP}\n{title}\n{SEP}")
    return spark.sql(q).show(truncate=truncate)

def scalar(q):
    """Return a single value from a single-row/col query."""
    df = spark.sql(q).limit(1).toPandas()
    return None if df.empty else list(df.iloc[0])[0]

def snapshots_df():
    return spark.sql("SELECT * FROM local.airline.flights.snapshots ORDER BY committed_at")

def print_snapshots(title="Snapshots"):
    print(f"\n{SEP}\n{title}\n{SEP}")
    snapshots_df().show(truncate=False)

def section(name):
    print(f"\n\n🧊 {name}\n")


In [6]:
section("Step 1 — Create + Insert")

sql("CREATE NAMESPACE IF NOT EXISTS local.airline")

sql("""
CREATE TABLE IF NOT EXISTS local.airline.flights (
  flight_id STRING,
  carrier   STRING,
  origin    STRING,
  dest      STRING,
  scheduled_dep_ts TIMESTAMP,
  actual_dep_ts    TIMESTAMP
) USING iceberg
""", "Create table")

sql("""
INSERT INTO local.airline.flights VALUES
('LH1234','LH','FRA','BOM',timestamp('2025-08-13 10:30:00'),timestamp('2025-08-13 10:29:00')),
('LH5678','LH','MUC','LHR',timestamp('2025-08-13 12:00:00'),timestamp('2025-08-13 12:07:00')),
('LH9001','LH','FRA','BER',timestamp('2025-08-13 09:10:00'),timestamp('2025-08-13 09:35:00'))
""", "Insert 3 rows")

sql("""
SELECT flight_id, origin, dest, scheduled_dep_ts, actual_dep_ts
FROM local.airline.flights
ORDER BY scheduled_dep_ts
""", "Current rows", truncate=False)

print_snapshots()




🧊 Step 1 — Create + Insert

++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Create table
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Insert 3 rows
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Current rows
────────────────────────────────────────────────────────────────────────────────
+---------+------+----+-------------------+-------------------+
|flight_id|origin|dest|scheduled_dep_ts   |actual_dep_ts      |
+---------+------+----+-------------------+-------------------+
|LH9001   |FRA   |BER |2025-08-13 09:10:00|2025-08-13 09:35:00|
|LH1234   |FRA   |BOM |2025-08-13 10:30:00|2025-08-13 10:29:00|
|LH5678   |MUC   |LHR |2025-08-13 12:00:00|2025-08-13 12:07:00|
+---------+------+

In [7]:
section("Step 2 — Partition & Evolve")

# Add a date column derived from scheduled departure
sql("ALTER TABLE local.airline.flights ADD COLUMN dep_date DATE", "Add dep_date column")
sql("UPDATE local.airline.flights SET dep_date = DATE(scheduled_dep_ts)", "Backfill dep_date")

# Evolve the partition spec: new writes will be partitioned by dep_date
sql("ALTER TABLE local.airline.flights ADD PARTITION FIELD dep_date", "Add partition field dep_date")

# Insert a couple of new flights (these will be written under the new partition spec)
sql("""
INSERT INTO local.airline.flights (flight_id, carrier, origin, dest, scheduled_dep_ts, actual_dep_ts, dep_date) VALUES
('LH1111','LH','TXL','CDG',timestamp('2025-08-14 08:00:00'),timestamp('2025-08-14 07:55:00'), DATE('2025-08-14')),
('LH2222','LH','CDG','TXL',timestamp('2025-08-14 19:20:00'),timestamp('2025-08-14 19:44:00'), DATE('2025-08-14'))
""", "Insert 2 more rows (new partition spec)")

sql("""
SELECT flight_id, origin, dest, dep_date
FROM local.airline.flights
ORDER BY dep_date, flight_id
""", "Rows w/ dep_date", truncate=False)

# Show partition metadata (may be empty until writes exist under new spec)
sql("SELECT * FROM local.airline.flights.partitions", "Partition metadata")
print_snapshots()




🧊 Step 2 — Partition & Evolve


────────────────────────────────────────────────────────────────────────────────
Add dep_date column
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Backfill dep_date
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Add partition field dep_date
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Insert 2 more rows (new partition spec)
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Rows w/ dep_date
─────────────────────────────────────────────────────────

In [8]:
section("Step 3 — MERGE Upsert")

# Capture a "before-merge" snapshot id for time travel later
before_merge_id = scalar("""
SELECT snapshot_id FROM local.airline.flights.snapshots
ORDER BY committed_at DESC
""")

sql("""
MERGE INTO local.airline.flights t
USING (
  SELECT 'LH9001' AS flight_id, 'LH' AS carrier, 'FRA' AS origin, 'BER' AS dest,
         timestamp('2025-08-13 09:10:00') AS scheduled_dep_ts,
         timestamp('2025-08-13 09:20:00') AS actual_dep_ts,
         DATE('2025-08-13') AS dep_date
) s
ON t.flight_id = s.flight_id AND DATE(t.scheduled_dep_ts) = s.dep_date
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""", "Apply MERGE (correct delay)")

sql("""
SELECT flight_id, scheduled_dep_ts, actual_dep_ts
FROM local.airline.flights
WHERE flight_id='LH9001'
""", "Row after MERGE", truncate=False)

print_snapshots("Snapshots (after MERGE)")
print(f"Saved before_merge_id = {before_merge_id}")




🧊 Step 3 — MERGE Upsert


────────────────────────────────────────────────────────────────────────────────
Apply MERGE (correct delay)
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Row after MERGE
────────────────────────────────────────────────────────────────────────────────
+---------+-------------------+-------------------+
|flight_id|scheduled_dep_ts   |actual_dep_ts      |
+---------+-------------------+-------------------+
|LH9001   |2025-08-13 09:10:00|2025-08-13 09:20:00|
+---------+-------------------+-------------------+


────────────────────────────────────────────────────────────────────────────────
Snapshots (after MERGE)
────────────────────────────────────────────────────────────────────────────────
+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------

In [9]:
section("Step 4 — Time Travel")

# If we captured a snapshot ID, read the table as of that snapshot
if before_merge_id is not None:
    sql(f"""
    SELECT flight_id, scheduled_dep_ts, actual_dep_ts
    FROM local.airline.flights VERSION AS OF {int(before_merge_id)}
    WHERE flight_id='LH9001'
    """, f"State BEFORE MERGE (VERSION AS OF {before_merge_id})", truncate=False)
else:
    print("No before_merge snapshot captured; skipping VERSION AS OF demo.")

# Show current (latest) for comparison
sql("""
SELECT flight_id, scheduled_dep_ts, actual_dep_ts
FROM local.airline.flights
WHERE flight_id='LH9001'
""", "Current (latest) state", truncate=False)




🧊 Step 4 — Time Travel


────────────────────────────────────────────────────────────────────────────────
State BEFORE MERGE (VERSION AS OF 7081131151258605231)
────────────────────────────────────────────────────────────────────────────────
+---------+-------------------+-------------------+
|flight_id|scheduled_dep_ts   |actual_dep_ts      |
+---------+-------------------+-------------------+
|LH9001   |2025-08-13 09:10:00|2025-08-13 09:35:00|
+---------+-------------------+-------------------+


────────────────────────────────────────────────────────────────────────────────
Current (latest) state
────────────────────────────────────────────────────────────────────────────────
+---------+-------------------+-------------------+
|flight_id|scheduled_dep_ts   |actual_dep_ts      |
+---------+-------------------+-------------------+
|LH9001   |2025-08-13 09:10:00|2025-08-13 09:20:00|
+---------+-------------------+-------------------+



In [10]:
section("Step 5 — Schema Evolution")

# Add a new column
sql("ALTER TABLE local.airline.flights ADD COLUMN aircraft_tail STRING", "Add new column aircraft_tail")

# Rename a column (dest -> destination)
sql("ALTER TABLE local.airline.flights RENAME COLUMN dest TO destination", "Rename dest -> destination")

# Drop a column (carrier)
sql("ALTER TABLE local.airline.flights DROP COLUMN carrier", "Drop column carrier")

# Insert a new row using the evolved schema (explicit column list)
sql("""
INSERT INTO local.airline.flights
(flight_id, origin, destination, scheduled_dep_ts, actual_dep_ts, dep_date, aircraft_tail)
VALUES
('LH3333','FRA','LHR',timestamp('2025-08-15 07:30:00'),timestamp('2025-08-15 07:37:00'), DATE('2025-08-15'), 'D-AIAB')
""", "Insert row with evolved schema")

sql("""
SELECT * FROM local.airline.flights
ORDER BY scheduled_dep_ts
""", "All rows after schema evolution", truncate=False)




🧊 Step 5 — Schema Evolution


────────────────────────────────────────────────────────────────────────────────
Add new column aircraft_tail
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Rename dest -> destination
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Drop column carrier
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
Insert row with evolved schema
────────────────────────────────────────────────────────────────────────────────
++
||
++
++


────────────────────────────────────────────────────────────────────────────────
All rows after schema evolution
────────────────────────────────────────────