# Apache Iceberg with Spark (Local Hadoop Catalog)

This notebook sets up a local Apache Iceberg data lake using Spark with a Hadoop catalog that writes metadata and data files to your local filesystem.

- Catalog name: `local`
- Warehouse path: `/Users/aliceguo/src/apache-iceberg-for-dummy/warehouse`
- No external services (no Hive Metastore, no MinIO) needed

You'll:
- Install dependencies
- Start a SparkSession with Iceberg runtime jars
- Create a namespace and table
- Write and read data using Iceberg



In [28]:
import sys, os
python_exe = sys.executable  # driver Python (3.12)

os.environ["PYSPARK_PYTHON"] = python_exe
os.environ["PYSPARK_DRIVER_PYTHON"] = python_exe

## Prerequisites

- Python 3.12
- Jupyter (`pip install notebook jupyterlab`)
- PySpark installed in this environment

If PySpark isn't installed, the cell below will install it alongside pandas and pyarrow.


In [29]:
%pip install -q pyspark==3.5.1 pandas pyarrow certifi
# then re-run the cell:
import pyspark; pyspark.__version__



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3.12 install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


'3.5.1'

## Configure Spark with Iceberg

We download Iceberg runtime jars matching your Spark version and launch a SparkSession pre-configured for a local Hadoop catalog.


In [30]:
import os
from pathlib import Path
from urllib.request import urlretrieve

WAREHOUSE = "/Users/aliceguo/src/apache-iceberg-for-dummy/warehouse"
Path(WAREHOUSE).mkdir(parents=True, exist_ok=True)

# Resolve Spark version at runtime
import pyspark
spark_version = pyspark.__version__
scala_suffix = "2.12"  # Spark 3.x uses Scala 2.12

# Choose Iceberg runtime version compatible with Spark 3.3/3.4/3.5
iceberg_version = os.environ.get("ICEBERG_VERSION", "1.6.1")

JARS_DIR = Path("/Users/aliceguo/src/apache-iceberg-for-dummy/.jars")
JARS_DIR.mkdir(parents=True, exist_ok=True)

base = "https://repo1.maven.org/maven2"
minor = ".".join(spark_version.split(".")[:2])
supported_minors = {"3.5", "3.4", "3.3"}
runtime_minor = minor if minor in supported_minors else "3.5"
artifact_group = f"org/apache/iceberg/iceberg-spark-runtime-{runtime_minor}_{scala_suffix}"
artifact_file = f"{iceberg_version}/iceberg-spark-runtime-{runtime_minor}_{scala_suffix}-{iceberg_version}.jar"

# Download runtime jar if missing
jar_dest = JARS_DIR / artifact_file.split("/")[-1]
if not jar_dest.exists():
    url = f"{base}/{artifact_group}/{artifact_file}"
    print("Downloading", url)
    urlretrieve(url, jar_dest)

extra_jars = ",".join(str(p) for p in JARS_DIR.glob("*.jar"))
print("Using Spark", spark_version, "with Iceberg runtime for", runtime_minor)
extra_jars

Using Spark 3.5.1 with Iceberg runtime for 3.5


'/Users/aliceguo/src/apache-iceberg-for-dummy/.jars/iceberg-spark-runtime-3.5_2.12-1.6.1.jar'

In [31]:
from pathlib import Path

WAREHOUSE = "/Users/aliceguo/src/apache-iceberg-for-dummy/warehouse"
Path(WAREHOUSE).mkdir(parents=True, exist_ok=True)

In [32]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("iceberg-local-hadoop")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.jars", extra_jars)
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", WAREHOUSE)
    .getOrCreate()
)

spark.version


'3.5.1'

## Create namespace and table

We'll use catalog `local` and namespace `demo`.


In [33]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.demo")
spark.sql("DROP TABLE IF EXISTS local.demo.events")
spark.sql(
    """
    CREATE TABLE local.demo.events (
      id BIGINT,
      category STRING,
      ts TIMESTAMP
    ) USING iceberg
    PARTITIONED BY (days(ts))
    """
)

spark.sql("SHOW TABLES IN local.demo").show(truncate=False)


+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|demo     |events   |false      |
+---------+---------+-----------+



In [22]:
from datetime import datetime
from pyspark.sql import functions as F

rows = [
    (1, "alpha", datetime(2024, 1, 1, 10, 0, 0)),
    (2, "beta", datetime(2024, 1, 2, 12, 30, 0)),
    (3, "alpha", datetime(2024, 1, 2, 13, 45, 0)),
]

df = spark.createDataFrame(rows, ["id", "category", "ts"]) 

df.writeTo("local.demo.events").append()

spark.table("local.demo.events").orderBy("id").show()


+---+--------+-------------------+
| id|category|                 ts|
+---+--------+-------------------+
|  1|   alpha|2024-01-01 10:00:00|
|  2|    beta|2024-01-02 12:30:00|
|  3|   alpha|2024-01-02 13:45:00|
+---+--------+-------------------+



In [39]:
# Table statistics for local.demo.events

# Row count from the table
row_count = spark.table("local.demo.events").count()
print(f"row_count: {row_count}")


row_count: 0


In [41]:
# Data files summary (count, total rows, total bytes)
spark.sql(
    """
    SELECT
      count(*)                AS num_data_files,
      sum(record_count)       AS total_rows_in_files,
      sum(file_size_in_bytes) AS total_bytes
    FROM local.demo.events.files
    """
).show(truncate=False)

+--------------+-------------------+-----------+
|num_data_files|total_rows_in_files|total_bytes|
+--------------+-------------------+-----------+
|0             |NULL               |NULL       |
+--------------+-------------------+-----------+



In [43]:
# Partition-level stats (if partitioned)
spark.sql("SELECT * FROM local.demo.events.partitions").show(truncate=False)


+---------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+---------------+------------------------+
|partition|spec_id|record_count|file_count|total_data_file_size_in_bytes|position_delete_record_count|position_delete_file_count|equality_delete_record_count|equality_delete_file_count|last_updated_at|last_updated_snapshot_id|
+---------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+---------------+------------------------+
+---------+-------+------------+----------+-----------------------------+----------------------------+--------------------------+----------------------------+--------------------------+---------------+------------------------+



In [44]:
spark.sql("DESCRIBE TABLE local.demo.events.history").show(truncate=False)
spark.sql("DESCRIBE TABLE local.demo.events.snapshots").show(truncate=False)

+-------------------+---------+-------+
|col_name           |data_type|comment|
+-------------------+---------+-------+
|made_current_at    |timestamp|NULL   |
|snapshot_id        |bigint   |NULL   |
|parent_id          |bigint   |NULL   |
|is_current_ancestor|boolean  |NULL   |
+-------------------+---------+-------+

+-------------+------------------+-------+
|col_name     |data_type         |comment|
+-------------+------------------+-------+
|committed_at |timestamp         |NULL   |
|snapshot_id  |bigint            |NULL   |
|parent_id    |bigint            |NULL   |
|operation    |string            |NULL   |
|manifest_list|string            |NULL   |
|summary      |map<string,string>|NULL   |
+-------------+------------------+-------+



In [37]:
# History: when snapshots became current
spark.sql("""
  SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor
  FROM local.demo.events.history
  ORDER BY made_current_at
""").show(truncate=False)


+---------------+-----------+---------+-------------------+
|made_current_at|snapshot_id|parent_id|is_current_ancestor|
+---------------+-----------+---------+-------------------+
+---------------+-----------+---------+-------------------+

+-----------+---------+------------+---------+
|snapshot_id|parent_id|committed_at|operation|
+-----------+---------+------------+---------+
+-----------+---------+------------+---------+



In [50]:
spark.sql("INSERT INTO local.demo.events VALUES (4, 'gamma', timestamp '2024-01-03 09:00:00')")
spark.sql("SELECT * FROM local.demo.events ORDER BY id").show()

+---+--------+-------------------+
| id|category|                 ts|
+---+--------+-------------------+
|  4|   gamma|2024-01-03 09:00:00|
|  4|   gamma|2024-01-03 09:00:00|
|  4|   gamma|2024-01-03 09:00:00|
|  4|   gamma|2024-01-03 09:00:00|
|  4|   gamma|2024-01-03 09:00:00|
+---+--------+-------------------+



In [51]:
# Snapshots: includes committed_at and operation
spark.sql("""
  SELECT snapshot_id, parent_id, committed_at, operation
  FROM local.demo.events.snapshots
  ORDER BY committed_at
""").show(truncate=False)

+-------------------+-------------------+-----------------------+---------+
|snapshot_id        |parent_id          |committed_at           |operation|
+-------------------+-------------------+-----------------------+---------+
|2120259642793238659|NULL               |2025-10-13 20:30:18.203|append   |
|6813048920963727275|2120259642793238659|2025-10-13 20:31:16.44 |append   |
|5834103816349075604|6813048920963727275|2025-10-13 20:31:46.413|append   |
|8803006309687028399|5834103816349075604|2025-10-13 20:32:08.918|append   |
|2816266580316681037|8803006309687028399|2025-10-13 20:32:22.227|append   |
+-------------------+-------------------+-----------------------+---------+



In [55]:
# Delete records and observe snapshot operation change

# Delete example: remove all rows with id = 4
spark.sql("DELETE FROM local.demo.events WHERE id = 4")

# Show current table contents
spark.table("local.demo.events").orderBy("id", "ts").show(truncate=False)

# Show most recent snapshots with operations
spark.sql(
    """
    SELECT snapshot_id, parent_id, committed_at, operation
    FROM local.demo.events.snapshots
    ORDER BY committed_at DESC
    LIMIT 5
    """
).show(truncate=False)

# Show added/removed counts on the latest snapshot
spark.sql(
    """
    SELECT
      committed_at,
      operation,
      CAST(coalesce(summary['added-records'],   '0') AS BIGINT)   AS added_records,
      CAST(coalesce(summary['removed-records'], '0') AS BIGINT)   AS removed_records,
      CAST(coalesce(summary['added-data-files'],'0') AS BIGINT)   AS added_files,
      CAST(coalesce(summary['removed-data-files'],'0') AS BIGINT) AS removed_files
    FROM local.demo.events.snapshots
    ORDER BY committed_at DESC
    LIMIT 1
    """
).show(truncate=False)


+---+--------+---+
|id |category|ts |
+---+--------+---+
+---+--------+---+

+-------------------+-------------------+-----------------------+---------+
|snapshot_id        |parent_id          |committed_at           |operation|
+-------------------+-------------------+-----------------------+---------+
|5292537171771044943|2816266580316681037|2025-10-13 21:05:01.881|delete   |
|2816266580316681037|8803006309687028399|2025-10-13 20:32:22.227|append   |
|8803006309687028399|5834103816349075604|2025-10-13 20:32:08.918|append   |
|5834103816349075604|6813048920963727275|2025-10-13 20:31:46.413|append   |
|6813048920963727275|2120259642793238659|2025-10-13 20:31:16.44 |append   |
+-------------------+-------------------+-----------------------+---------+

+-----------------------+---------+-------------+---------------+-----------+-------------+
|committed_at           |operation|added_records|removed_records|added_files|removed_files|
+-----------------------+---------+-------------+-----

In [62]:
# Generate many small data files for compaction demo

from datetime import datetime, timedelta
import random

# Keep one shuffle partition so each write produces a single small file
spark.conf.set("spark.sql.shuffle.partitions", "1")

random.seed(42)

def make_batch(batch_idx: int, num_rows: int = 200):
    base_ts = datetime(2024, 1, 1) + timedelta(days=batch_idx % 7)
    rows = [
        (
            batch_idx * 1_000_000 + i,
            random.choice(["alpha", "beta", "gamma", "delta"]),
            base_ts + timedelta(minutes=i)
        )
        for i in range(num_rows)
    ]
    return spark.createDataFrame(rows, ["id", "category", "ts"]) 

# Write many small files via multiple appends
for b in range(30):  # increase if you want more files
    make_batch(b, num_rows=100).coalesce(1).writeTo("local.demo.events").append()

# Validate number of files and row count
spark.sql(
    "SELECT COUNT(*) AS num_files, SUM(file_size_in_bytes) AS total_bytes FROM local.demo.events.files"
).show(truncate=False)

print("row_count:", spark.table("local.demo.events").count())


+---------+-----------+
|num_files|total_bytes|
+---------+-----------+
|30       |45038      |
+---------+-----------+

row_count: 3000


In [63]:
spark.sql(
    """
    SELECT snapshot_id, parent_id, committed_at, operation
    FROM local.demo.events.snapshots
    ORDER BY committed_at DESC
    LIMIT 5
    """
).show(truncate=False)

+-------------------+-------------------+-----------------------+---------+
|snapshot_id        |parent_id          |committed_at           |operation|
+-------------------+-------------------+-----------------------+---------+
|8409446886655369985|6318710171621420987|2025-10-13 21:19:10.05 |append   |
|6318710171621420987|7648811580166220859|2025-10-13 21:19:09.71 |append   |
|7648811580166220859|8071463243716675358|2025-10-13 21:19:09.209|append   |
|8071463243716675358|6464031408296887663|2025-10-13 21:19:08.679|append   |
|6464031408296887663|1491900029434595614|2025-10-13 21:19:08.077|append   |
+-------------------+-------------------+-----------------------+---------+



In [70]:
# Compact small data files (bin-pack) and inspect results
# Rewrite (bin-pack) data files to target size (128MB example)
spark.sql(
    """
    CALL local.system.rewrite_data_files(
      table => 'local.demo.events',
      options => map(
        'target-file-size-bytes','134217728',   -- 128MB
        'min-input-files','1'
      )
    )
    """
).show(truncate=False)

# Check files and snapshots after compaction
spark.sql(
    "SELECT COUNT(*) AS num_files, SUM(file_size_in_bytes) AS total_bytes FROM local.demo.events.files"
).show(truncate=False)

+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|0                         |0                     |0                    |0                      |
+--------------------------+----------------------+---------------------+-----------------------+

+---------+-----------+
|num_files|total_bytes|
+---------+-----------+
|7        |15357      |
+---------+-----------+



In [71]:
# Count snapshots by operation type
spark.sql(
    """
    SELECT operation, COUNT(*) AS num_snapshots
    FROM local.demo.events.snapshots
    GROUP BY operation
    ORDER BY operation
    """
).show(truncate=False)


+---------+-------------+
|operation|num_snapshots|
+---------+-------------+
|append   |35           |
|delete   |1            |
|replace  |1            |
+---------+-------------+



In [72]:
# Per-snapshot added/removed files and records (from snapshots.summary map)
spark.sql(
    """
    SELECT
      committed_at,
      operation,
      CAST(coalesce(summary['added-data-files'],   '0') AS BIGINT) AS added_files,
      CAST(coalesce(summary['removed-data-files'], '0') AS BIGINT) AS removed_files,
      CAST(coalesce(summary['added-records'],      '0') AS BIGINT) AS added_records,
      CAST(coalesce(summary['removed-records'],    '0') AS BIGINT) AS removed_records
    FROM local.demo.events.snapshots
    ORDER BY committed_at
    """
).show(truncate=False)


+-----------------------+---------+-----------+-------------+-------------+---------------+
|committed_at           |operation|added_files|removed_files|added_records|removed_records|
+-----------------------+---------+-----------+-------------+-------------+---------------+
|2025-10-13 20:30:18.203|append   |1          |0            |1            |0              |
|2025-10-13 20:31:16.44 |append   |1          |0            |1            |0              |
|2025-10-13 20:31:46.413|append   |1          |0            |1            |0              |
|2025-10-13 20:32:08.918|append   |1          |0            |1            |0              |
|2025-10-13 20:32:22.227|append   |1          |0            |1            |0              |
|2025-10-13 21:05:01.881|delete   |0          |0            |0            |0              |
|2025-10-13 21:18:56.601|append   |1          |0            |100          |0              |
|2025-10-13 21:18:57.028|append   |1          |0            |100          |0    