## Start Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("spark://iceberg-sandbox-spark-master:7077")
    .appName("iceberg-sandbox")
    .config("spark.driver.host", "iceberg-sandbox-jupyter")
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.port", "7078")
    .config("spark.blockManager.port", "7079")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg_jdbc", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg_jdbc.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog")
    .config("spark.sql.catalog.iceberg_jdbc.uri", "jdbc:postgresql://iceberg-sandbox-postgres:5432/iceberg-jdbc-catalog")
    .config("spark.sql.catalog.iceberg_jdbc.jdbc.user", "iceberg")
    .config("spark.sql.catalog.iceberg_jdbc.jdbc.password", "iceberg")
    .config("spark.sql.catalog.iceberg_jdbc.jdbc.driver", "org.postgresql.Driver")
    .config("spark.sql.catalog.iceberg_jdbc.warehouse", "s3a://iceberg/warehouse")
    .config("spark.hadoop.fs.s3a.endpoint", "http://iceberg-sandbox-minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "1g")
    .config("spark.cores.max", "1")
    .config("spark.ui.showConsoleProgress", "true")
    .getOrCreate()
)


## Iceberg Features: 1. Time Travel

### Check Snapshots

In [2]:
spark.sql("""
SELECT * 
FROM iceberg_jdbc.ecommerce.orders.snapshots
""").show(truncate=False)

+-----------------------+-------------------+---------+---------+----------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id|operation|manifest_list                                                                                                         |summary                                                                                                                                                                                                                                                                                                           |
+---------------

### Delete data

In [3]:
spark.sql("""DELETE FROM iceberg_jdbc.ecommerce.orders
WHERE order_status = 'canceled';""")

DataFrame[]

### Check Available Snapshots

In [4]:
spark.sql("""SELECT snapshot_id, committed_at, operation
FROM iceberg_jdbc.ecommerce.orders.snapshots
ORDER BY committed_at desc;
""").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|3487823717667428865|2026-02-23 06:59:37.708|overwrite|
|7428214545948977217|2026-02-23 06:58:02.876|append   |
|546889620057850498 |2026-02-23 04:40:24.048|append   |
|6223667570380203935|2026-02-23 04:38:24.072|append   |
+-------------------+-----------------------+---------+



In [10]:
df_snapshot = spark.sql("""
SELECT snapshot_id
FROM (
    SELECT 
        snapshot_id,
        committed_at,
        operation,
        ROW_NUMBER() OVER (ORDER BY committed_at DESC) as rn
    FROM iceberg_jdbc.ecommerce.orders.snapshots
) t
WHERE rn = 2
""")

# Extract value into Python variable
snapshot_id = df_snapshot.collect()[0]["snapshot_id"]

print(snapshot_id)

7428214545948977217


### Check the snapshot data

In [11]:
spark.sql(f"""SELECT * FROM iceberg_jdbc.ecommerce.orders VERSION AS OF {snapshot_id}""").show()

+------------+------------+------------+------------------------+-------------------+-------------------------+-----------------------------+
|    order_id| customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_timestamp|order_estimated_delivery_date|
+------------+------------+------------+------------------------+-------------------+-------------------------+-----------------------------+
|Axfy13Hk4PIk|hCT0x9JiGXBQ|   delivered|     2017-10-22 18:57:54|2017-10-22 19:14:13|      2017-10-26 22:19:52|                   2017-11-09|
|v6px92oS8cLG|PxA7fv9spyhx|   delivered|     2018-06-20 21:40:31|2018-06-20 22:20:20|      2018-07-03 22:51:22|                   2018-07-24|
|Ulpf9skrhjfm|g3nXeJkGI0Qw|   delivered|     2018-02-16 16:19:31|2018-02-17 16:15:35|      2018-02-27 01:29:50|                   2018-03-08|
|bwJVWupf2keN|EOEsCQ6QlpIg|   delivered|     2018-08-18 18:04:29|2018-08-18 18:15:16|      2018-08-27 20:03:51|                   2018-09-19|
|Dd0Qn

### Rollback the table

In [12]:
spark.sql(f"""
CALL iceberg_jdbc.system.rollback_to_snapshot(
  table => 'ecommerce.orders',
  snapshot_id => {snapshot_id}
)
""")

DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

### Check the table Again

In [13]:
spark.sql("""
SELECT * from iceberg_jdbc.ecommerce.orders where order_status = 'canceled'
""").show(truncate=False)

+------------+------------+------------+------------------------+-------------------+-------------------------+-----------------------------+
|order_id    |customer_id |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_timestamp|order_estimated_delivery_date|
+------------+------------+------------+------------------------+-------------------+-------------------------+-----------------------------+
|P5R6jr1qZdh4|FrEvnEiMKGpr|canceled    |2017-07-24 11:38:43     |2017-07-24 11:50:18|NULL                     |2017-08-07                   |
|2HQ26ShSPhCA|uvuFFZDOAlU7|canceled    |2017-07-29 12:56:17     |2017-07-29 13:05:18|NULL                     |2017-08-18                   |
|1By8LOosrvF2|grsbZ5L1H5ty|canceled    |2017-11-06 15:47:20     |2017-11-07 07:30:29|NULL                     |2017-11-28                   |
|70G4cTVwm38h|WnCydkx96ul3|canceled    |2018-07-04 16:05:56     |2018-07-06 02:55:16|NULL                     |2018-08-22                   |
|aDxB5

### Check Iceberg Time Travel Retention (Currently Unlimited / Default) 

In [14]:
spark.sql("""
SHOW TBLPROPERTIES iceberg_jdbc.ecommerce.orders;
""").show(truncate=False)

+-------------------------------+-------------------+
|key                            |value              |
+-------------------------------+-------------------+
|current-snapshot-id            |7428214545948977217|
|format                         |iceberg/parquet    |
|format-version                 |2                  |
|write.parquet.compression-codec|zstd               |
+-------------------------------+-------------------+



### Set the Snapshots Time Travel Retention

- Keep snapshots up to 7 days old
- Always keep at least 5 snapshots
- Even if older than 7 days

In [15]:
spark.sql("""
ALTER TABLE iceberg_jdbc.ecommerce.orders
SET TBLPROPERTIES (
  'history.expire.max-snapshot-age-ms'='604800000',
  'history.expire.min-snapshots-to-keep'='5'
);
""")

DataFrame[]

### Check Iceberg Time Travel Retention (Altered)

In [16]:
spark.sql("""
SHOW TBLPROPERTIES iceberg_jdbc.ecommerce.orders;
""").show(truncate=False)

+------------------------------------+-------------------+
|key                                 |value              |
+------------------------------------+-------------------+
|current-snapshot-id                 |7428214545948977217|
|format                              |iceberg/parquet    |
|format-version                      |2                  |
|history.expire.max-snapshot-age-ms  |604800000          |
|history.expire.min-snapshots-to-keep|5                  |
|write.parquet.compression-codec     |zstd               |
+------------------------------------+-------------------+



### Enforce / Apply the Retention Time

Iceberg doesnt automatically delete expired snapshot, it should be run manually or automatically via scheduler. and it cant execute the query one for all.

In [17]:
spark.sql("""
CALL iceberg_jdbc.system.expire_snapshots(
  table => 'ecommerce.orders'
)
""").show(truncate=False)

+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|deleted_data_files_count|deleted_position_delete_files_count|deleted_equality_delete_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|deleted_statistics_files_count|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|0                       |0                                  |0                                  |0                           |0                           |0                             |
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+



### Enforece / Apply the Snapshot Retention For All Iceberg Table

In [18]:
schemas = spark.sql("SHOW DATABASES IN iceberg_jdbc").collect()

for s in schemas:
    schema = s.namespace

    tables = spark.sql(f"SHOW TABLES IN iceberg_jdbc.{schema}").collect()

    for t in tables:
        table_name = f"{schema}.{t.tableName}"

        spark.sql(f"""
            CALL iceberg_jdbc.system.expire_snapshots(
              table => '{table_name}'
            )
        """)


## Iceberg Features: 2. Merge

### Create Temp Data

In [19]:
spark.sql("""
CREATE OR REPLACE TEMP VIEW orders_stage AS
SELECT
  'P5R6jr1qZdh4' AS order_id,
  'cust_001' AS customer_id,
  'delivered' AS order_status,
  current_timestamp() AS order_purchase_timestamp,
  current_timestamp() AS order_approved_at,
  current_timestamp() AS order_delivered_timestamp,
  current_date() AS order_estimated_delivery_date

UNION ALL

SELECT
  'NEW_ORDER_123' AS order_id,
  'cust_999' AS customer_id,
  'processing' AS order_status,
  current_timestamp() AS order_purchase_timestamp,
  current_timestamp() AS order_approved_at,
  NULL AS order_delivered_timestamp,
  current_date() AS order_estimated_delivery_date
""")

spark.sql("SELECT * FROM orders_stage").show(truncate=False)


+-------------+-----------+------------+--------------------------+--------------------------+--------------------------+-----------------------------+
|order_id     |customer_id|order_status|order_purchase_timestamp  |order_approved_at         |order_delivered_timestamp |order_estimated_delivery_date|
+-------------+-----------+------------+--------------------------+--------------------------+--------------------------+-----------------------------+
|P5R6jr1qZdh4 |cust_001   |delivered   |2026-02-23 07:05:23.164715|2026-02-23 07:05:23.164715|2026-02-23 07:05:23.164715|2026-02-23                   |
|NEW_ORDER_123|cust_999   |processing  |2026-02-23 07:05:23.164715|2026-02-23 07:05:23.164715|NULL                      |2026-02-23                   |
+-------------+-----------+------------+--------------------------+--------------------------+--------------------------+-----------------------------+



### UPSERT Using MERGE

In [20]:
spark.sql("""
MERGE INTO iceberg_jdbc.ecommerce.orders t
USING orders_stage s
ON t.order_id = s.order_id

WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")


DataFrame[]

### Check Upserted Data

In [21]:
spark.sql("SELECT * FROM iceberg_jdbc.ecommerce.orders where order_id in ('P5R6jr1qZdh4', 'NEW_ORDER_123')").show(truncate=False)

+-------------+-----------+------------+--------------------------+--------------------------+--------------------------+-----------------------------+
|order_id     |customer_id|order_status|order_purchase_timestamp  |order_approved_at         |order_delivered_timestamp |order_estimated_delivery_date|
+-------------+-----------+------------+--------------------------+--------------------------+--------------------------+-----------------------------+
|NEW_ORDER_123|cust_999   |processing  |2026-02-23 07:05:29.541247|2026-02-23 07:05:29.541247|NULL                      |2026-02-23                   |
|P5R6jr1qZdh4 |cust_001   |delivered   |2026-02-23 07:05:29.541247|2026-02-23 07:05:29.541247|2026-02-23 07:05:29.541247|2026-02-23                   |
+-------------+-----------+------------+--------------------------+--------------------------+--------------------------+-----------------------------+



### Check the Latest Snapshot

In [23]:
spark.sql("""
SELECT snapshot_id, committed_at, operation
FROM iceberg_jdbc.ecommerce.orders.snapshots
ORDER BY committed_at DESC;
""").show(truncate=False)

+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|6977739499014344028|2026-02-23 07:05:32.408|overwrite|
|3487823717667428865|2026-02-23 06:59:37.708|overwrite|
|7428214545948977217|2026-02-23 06:58:02.876|append   |
|546889620057850498 |2026-02-23 04:40:24.048|append   |
|6223667570380203935|2026-02-23 04:38:24.072|append   |
+-------------------+-----------------------+---------+



In [24]:
df_snapshot = spark.sql("""
SELECT snapshot_id
FROM (
    SELECT 
        snapshot_id,
        committed_at,
        operation,
        ROW_NUMBER() OVER (ORDER BY committed_at DESC) as rn
    FROM iceberg_jdbc.ecommerce.orders.snapshots
) t
WHERE rn = 3
""")

# Extract value into Python variable
snapshot_id = df_snapshot.collect()[0]["snapshot_id"]

print(snapshot_id)

7428214545948977217


### Rollback the table snapshot

In [25]:
spark.sql(f"""
CALL iceberg_jdbc.system.rollback_to_snapshot(
  table => 'ecommerce.orders',
  snapshot_id => {snapshot_id}
)
""").show(truncate=False)

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
|6977739499014344028 |7428214545948977217|
+--------------------+-------------------+



## Iceberg Features: 3. Schema Evolution

In [None]:
##spark.stop()