In [1]:
from pyspark.sql import SparkSession
import os

In [2]:
ICEBERG_REST = "http://iceberg-rest:8181"
MINIO_ENDPOINT = "http://minio:9000"
AWS_REGION = "us-east-1"
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", "minioadmin")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", "minioadmin")

In [6]:
spark = (
    SparkSession.builder
    .appName("Iceberg REST via MinIO")
    .master("spark://spark:7077")  # point to the standalone master
    .config("spark.driver.host", "jupyter")  # driver is this container; resolvable by workers
    # Optional but useful:
    # .config("spark.driver.port", "7078")  # pin a port if you want (not needed if all on bridge)
    # .config("spark.blockManager.port", "7079")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "s3://spark-events/")
    .config("spark.sql.catalog.ice", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.ice.type", "rest")
    .config("spark.sql.catalog.ice.uri", ICEBERG_REST)
    .config("spark.sql.catalog.ice.warehouse", "s3://warehouse/")
    .config("spark.sql.catalog.ice.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.ice.s3.endpoint", MINIO_ENDPOINT)
    .config("spark.sql.catalog.ice.s3.path-style-access", "true")
    .config("spark.sql.catalog.ice.s3.region", AWS_REGION)
    .config("spark.sql.catalog.ice.s3.access-key-id", AWS_ACCESS_KEY_ID)
    .config("spark.sql.catalog.ice.s3.secret-access-key", AWS_SECRET_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)


In [7]:
print("Spark version:", spark.version)

Spark version: 3.5.3


In [8]:
print(spark.sparkContext.master) # should be spark://spark:7077
print(spark.sparkContext.uiWebUrl) # link to the app UI

spark://spark:7077
http://jupyter:4040


In [9]:
spark.sql("SHOW NAMESPACES IN ice").show(truncate=False)

+---------+
|namespace|
+---------+
+---------+



In [11]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS ice.demo")

DataFrame[]

In [12]:
spark.sql("SHOW NAMESPACES IN ice").show(truncate=False)

+---------+
|namespace|
+---------+
|demo     |
+---------+



In [29]:
%env AWS_REGION=us-east-1
%env AWS_ACCESS_KEY_ID=minioadmin
%env AWS_SECRET_ACCESS_KEY=minioadmin

env: AWS_REGION=us-east-1
env: AWS_ACCESS_KEY_ID=minioadmin
env: AWS_SECRET_ACCESS_KEY=minioadmin


In [13]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS ice.demo.customers (
        id BIGINT,
        name STRING,
        email STRING
    )
    USING iceberg
    PARTITIONED BY (email)
""")

DataFrame[]

In [14]:
spark.sql("""
    INSERT INTO ice.demo.customers VALUES
      (1, 'Alice Smith', 'alice@example.com'),
      (2, 'Bob Johnson', 'bob@example.com'),
      (3, 'Carol Adams', 'carol@example.com')
""")

DataFrame[]

Select the customers

In [15]:
spark.sql("SELECT * FROM ice.demo.customers").show()

+---+-----------+-----------------+
| id|       name|            email|
+---+-----------+-----------------+
|  3|Carol Adams|carol@example.com|
|  1|Alice Smith|alice@example.com|
|  2|Bob Johnson|  bob@example.com|
+---+-----------+-----------------+



Select the customers with an `o`

In [16]:
spark.sql("SELECT * FROM ice.demo.customers WHERE name like '%o%'").show()

+---+-----------+-----------------+
| id|       name|            email|
+---+-----------+-----------------+
|  3|Carol Adams|carol@example.com|
|  2|Bob Johnson|  bob@example.com|
+---+-----------+-----------------+



Let's add some more data to our DataLake

In [17]:
spark.sql("""
    INSERT INTO ice.demo.customers VALUES
      (4,  'Diego Ramirez',       'diego.ramirez@example.com'),
      (5,  'Maya Patel',          'maya.patel@example.com'),
      (6,  'Liam O’Connor',       'liam.oconnor@example.com'),
      (7,  'Sofia Almeida',       'sofia.almeida@example.com'),
      (8,  'Noah Williams',       'noah.williams@example.com'),
      (9,  'Ava Thompson',        'ava.thompson@example.com'),
      (10, 'Ethan Chen',          'ethan.chen@example.com'),
      (11, 'Olivia Garcia',       'olivia.garcia@example.com'),
      (12, 'Lucas Martin',        'lucas.martin@example.com'),
      (13, 'Emma Robinson',       'emma.robinson@example.com'),
      (14, 'Benjamin Kim',        'benjamin.kim@example.com'),
      (15, 'Isabella Rossi',      'isabella.rossi@example.com'),
      (16, 'James Nguyen',        'james.nguyen@example.com'),
      (17, 'Mila Novak',          'mila.novak@example.com'),
      (18, 'Henry Scott',         'henry.scott@example.com'),
      (19, 'Aria Johnson',        'aria.johnson@example.com'),
      (20, 'Daniela Costa',       'daniela.costa@example.com'),
      (21, 'Jack Wilson',         'jack.wilson@example.com'),
      (22, 'Zoe King',            'zoe.king@example.com'),
      (23, 'Oliver Brown',        'oliver.brown@example.com')
""")

DataFrame[]

Let's run our query

In [18]:
spark.sql("SELECT * FROM ice.demo.customers WHERE name like '%o%'").show()

+---+--------------+--------------------+
| id|          name|               email|
+---+--------------+--------------------+
| 22|      Zoe King|zoe.king@example.com|
| 21|   Jack Wilson|jack.wilson@examp...|
| 15|Isabella Rossi|isabella.rossi@ex...|
| 18|   Henry Scott|henry.scott@examp...|
|  7| Sofia Almeida|sofia.almeida@exa...|
| 19|  Aria Johnson|aria.johnson@exam...|
| 20| Daniela Costa|daniela.costa@exa...|
|  4| Diego Ramirez|diego.ramirez@exa...|
|  8| Noah Williams|noah.williams@exa...|
| 17|    Mila Novak|mila.novak@exampl...|
|  9|  Ava Thompson|ava.thompson@exam...|
| 13| Emma Robinson|emma.robinson@exa...|
|  6| Liam O’Connor|liam.oconnor@exam...|
| 23|  Oliver Brown|oliver.brown@exam...|
|  3|   Carol Adams|   carol@example.com|
|  2|   Bob Johnson|     bob@example.com|
+---+--------------+--------------------+



Let's view the snapshots, we had two batches, done at different times. So we should see two snapshots

In [19]:
spark.sql("""
  SELECT snapshot_id, committed_at, operation
  FROM ice.demo.customers.snapshots
  ORDER BY committed_at
""").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|6418764763303810672|2025-09-10 18:57:49.224|append   |
|5585728760866818996|2025-09-10 18:58:00.554|append   |
+-------------------+-----------------------+---------+



Let's now view the lineage of the data

In [20]:
spark.sql("""
  SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor
  FROM ice.demo.customers.history
  ORDER BY made_current_at
""").show(truncate=False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-09-10 18:57:49.224|6418764763303810672|NULL               |true               |
|2025-09-10 18:58:00.554|5585728760866818996|6418764763303810672|true               |
+-----------------------+-------------------+-------------------+-------------------+



Now, let's go back in time by reverting the timestamp

In [21]:
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Replace the following timestamp with one after the initial timestamp
ts = "2025-09-10 18:57:49.225"

df_ts = spark.sql(f"""
  SELECT *
  FROM ice.demo.customers
  TIMESTAMP AS OF '{ts}'
""")
df_ts.show()

+---+-----------+-----------------+
| id|       name|            email|
+---+-----------+-----------------+
|  3|Carol Adams|carol@example.com|
|  1|Alice Smith|alice@example.com|
|  2|Bob Johnson|  bob@example.com|
+---+-----------+-----------------+



In [26]:
spark.sql("""SELECT snapshot_id, committed_at, operation
FROM ice.demo.customers.snapshots
ORDER BY committed_at DESC
LIMIT 5;""").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|5585728760866818996|2025-09-10 18:58:00.554|append   |
|6418764763303810672|2025-09-10 18:57:49.224|append   |
+-------------------+-----------------------+---------+



In [29]:
snapshots = spark.sql("""
  SELECT snapshot_id, committed_at
  FROM ice.demo.customers.snapshots
  ORDER BY committed_at DESC
""")
snapshots.show()


+-------------------+--------------------+
|        snapshot_id|        committed_at|
+-------------------+--------------------+
|5585728760866818996|2025-09-10 18:58:...|
|6418764763303810672|2025-09-10 18:57:...|
+-------------------+--------------------+



In [30]:
spark.stop()