In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergMiniProject") \
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
    .config("spark.sql.catalog.my_catalog.uri", "http://rest:8181") \
    .config("spark.sql.catalog.my_catalog.warehouse", "s3://warehouse/") \
    .config("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.my_catalog.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.my_catalog.s3.access-key-id", "admin") \
    .config("spark.sql.catalog.my_catalog.s3.secret-access-key", "password") \
    .config("spark.sql.catalog.my_catalog.s3.region", "us-east-1") \
    .getOrCreate()

spark

26/01/18 14:23:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
spark.sql("SHOW CATALOGS").show()


+-------------+
|      catalog|
+-------------+
|         demo|
|spark_catalog|
+-------------+



In [4]:
spark.sql("""
CREATE DATABASE IF NOT EXISTS demo
""")


DataFrame[]

In [9]:
spark.sql("SHOW DATABASES IN demo").show()

+----------+
| namespace|
+----------+
|default_db|
+----------+



In [8]:
spark.sql("""
CREATE DATABASE demo.default_db
""")

DataFrame[]

In [10]:
spark.sql("""
CREATE TABLE demo.default_db.users (
    user_id INT,
    name STRING,
    country STRING,
    created_at TIMESTAMP
)
USING iceberg
""")


DataFrame[]

In [11]:
spark.sql("""
INSERT INTO demo.default_db.users VALUES
(1, 'Ali', 'PK', current_timestamp()),
(2, 'John', 'US', current_timestamp())
""")


                                                                                

DataFrame[]

In [12]:
spark.sql("""
SELECT * FROM demo.default_db.users
""").show()

[Stage 1:>                                                          (0 + 1) / 1]

+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      1| Ali|     PK|2026-01-18 14:30:...|
|      2|John|     US|2026-01-18 14:30:...|
+-------+----+-------+--------------------+



                                                                                

In [13]:
spark.sql("""
SELECT 
  snapshot_id,
  parent_id,
  committed_at,
  operation
FROM demo.default_db.users.snapshots
ORDER BY committed_at
""").show(truncate=False)


+-------------------+---------+----------------------+---------+
|snapshot_id        |parent_id|committed_at          |operation|
+-------------------+---------+----------------------+---------+
|1607397796109514243|NULL     |2026-01-18 14:30:21.42|append   |
+-------------------+---------+----------------------+---------+



In [14]:
spark.sql("""
SELECT 
  snapshot_id,
  parent_id,
  committed_at,
  operation
FROM demo.default_db.users.snapshots
ORDER BY committed_at
""").show(truncate=False)


+-------------------+---------+----------------------+---------+
|snapshot_id        |parent_id|committed_at          |operation|
+-------------------+---------+----------------------+---------+
|1607397796109514243|NULL     |2026-01-18 14:30:21.42|append   |
+-------------------+---------+----------------------+---------+



In [15]:
spark.sql("""
INSERT INTO demo.default_db.users VALUES
(3, 'Sara', 'UK', current_timestamp())
""")


DataFrame[]

In [16]:
spark.sql("""
UPDATE demo.default_db.users
SET country = 'UAE'
WHERE user_id = 1
""")


                                                                                

DataFrame[]

In [17]:
spark.sql("""
DELETE FROM demo.default_db.users
WHERE user_id = 2
""")


DataFrame[]

In [18]:
spark.sql("""
SELECT 
  snapshot_id,
  committed_at,
  operation
FROM demo.default_db.users.snapshots
ORDER BY committed_at
""").show(truncate=False)


+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|1607397796109514243|2026-01-18 14:30:21.42 |append   |
|872424970877289686 |2026-01-18 14:32:02.38 |append   |
|5738472431389787117|2026-01-18 14:32:16.755|overwrite|
|3969833432834730559|2026-01-18 14:32:29.795|delete   |
+-------------------+-----------------------+---------+



In [21]:
spark.sql("""
SELECT * FROM demo.default_db.users
VERSION AS OF 5738472431389787117
""").show()


+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      1| Ali|    UAE|2026-01-18 14:30:...|
|      3|Sara|     UK|2026-01-18 14:32:...|
|      2|John|     US|2026-01-18 14:30:...|
+-------+----+-------+--------------------+



In [22]:
spark.sql("""
SELECT * FROM demo.default_db.users
TIMESTAMP AS OF '2026-01-18 14:31:00'
""").show()


+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      1| Ali|     PK|2026-01-18 14:30:...|
|      2|John|     US|2026-01-18 14:30:...|
+-------+----+-------+--------------------+



In [23]:
spark.sql("""
SELECT * FROM demo.default_db.users
VERSION AS OF 1607397796109514243
EXCEPT
SELECT * FROM demo.default_db.users
""").show()


[Stage 16:>                                                         (0 + 1) / 1]

+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      2|John|     US|2026-01-18 14:30:...|
|      1| Ali|     PK|2026-01-18 14:30:...|
+-------+----+-------+--------------------+



                                                                                

In [24]:
spark.sql("""
SELECT * FROM demo.default_db.users
""").show()

+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      1| Ali|    UAE|2026-01-18 14:30:...|
|      3|Sara|     UK|2026-01-18 14:32:...|
+-------+----+-------+--------------------+



In [25]:
spark.sql("""
DESCRIBE TABLE demo.default_db.users
""").show(truncate=False)


+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|user_id   |int      |NULL   |
|name      |string   |NULL   |
|country   |string   |NULL   |
|created_at|timestamp|NULL   |
+----------+---------+-------+



In [26]:
spark.sql("""
ALTER TABLE demo.default_db.users
ADD COLUMN email STRING
""")


DataFrame[]

In [27]:
spark.sql("""
DESCRIBE TABLE demo.default_db.users
""").show(truncate=False)


+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|user_id   |int      |NULL   |
|name      |string   |NULL   |
|country   |string   |NULL   |
|created_at|timestamp|NULL   |
|email     |string   |NULL   |
+----------+---------+-------+



In [28]:
spark.sql("""
SELECT snapshot_id, committed_at, operation
FROM demo.default_db.users.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)


+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|3969833432834730559|2026-01-18 14:32:29.795|delete   |
|5738472431389787117|2026-01-18 14:32:16.755|overwrite|
|872424970877289686 |2026-01-18 14:32:02.38 |append   |
|1607397796109514243|2026-01-18 14:30:21.42 |append   |
+-------------------+-----------------------+---------+



In [29]:
spark.sql("""
ALTER TABLE demo.default_db.users
RENAME COLUMN name TO full_name
""")


DataFrame[]

In [30]:
spark.sql("""
DESCRIBE TABLE demo.default_db.users
""").show(truncate=False)


+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|user_id   |int      |NULL   |
|full_name |string   |NULL   |
|country   |string   |NULL   |
|created_at|timestamp|NULL   |
|email     |string   |NULL   |
+----------+---------+-------+



In [31]:
spark.sql("""
SELECT snapshot_id, operation
FROM demo.default_db.users.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)


+-------------------+---------+
|snapshot_id        |operation|
+-------------------+---------+
|3969833432834730559|delete   |
|5738472431389787117|overwrite|
|872424970877289686 |append   |
|1607397796109514243|append   |
+-------------------+---------+



In [32]:
spark.sql("""
ALTER TABLE demo.default_db.users
ALTER COLUMN country AFTER full_name
""")


DataFrame[]

In [33]:
spark.sql("""
DESCRIBE TABLE demo.default_db.users
""").show(truncate=False)


+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|user_id   |int      |NULL   |
|full_name |string   |NULL   |
|country   |string   |NULL   |
|created_at|timestamp|NULL   |
|email     |string   |NULL   |
+----------+---------+-------+



In [34]:
spark.sql("""
SELECT user_id, full_name, country, email
FROM demo.default_db.users
""").show()


+-------+---------+-------+-----+
|user_id|full_name|country|email|
+-------+---------+-------+-----+
|      3|     Sara|     UK| NULL|
|      1|      Ali|    UAE| NULL|
+-------+---------+-------+-----+



In [35]:
spark.sql("""
SELECT * FROM demo.default_db.users
VERSION AS OF 872424970877289686
""").show()


+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      3|Sara|     UK|2026-01-18 14:32:...|
|      1| Ali|     PK|2026-01-18 14:30:...|
|      2|John|     US|2026-01-18 14:30:...|
+-------+----+-------+--------------------+



In [36]:
spark.sql("""
SELECT * FROM demo.default_db.users
VERSION AS OF 1607397796109514243
""").show()


+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      1| Ali|     PK|2026-01-18 14:30:...|
|      2|John|     US|2026-01-18 14:30:...|
+-------+----+-------+--------------------+



In [37]:
spark.sql("""
SELECT * FROM demo.default_db.users
TIMESTAMP AS OF '2026-01-18 14:31:00'
""").show()


+-------+----+-------+--------------------+
|user_id|name|country|          created_at|
+-------+----+-------+--------------------+
|      1| Ali|     PK|2026-01-18 14:30:...|
|      2|John|     US|2026-01-18 14:30:...|
+-------+----+-------+--------------------+



In [38]:
spark.sql("SELECT * FROM demo.default_db.users.snapshots").show(truncate=False)


+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                |summary                                                                                                                                                                                                                                                                  

In [39]:
spark.sql("DESCRIBE TABLE demo.default_db.users").show()


+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|   user_id|      int|   NULL|
| full_name|   string|   NULL|
|   country|   string|   NULL|
|created_at|timestamp|   NULL|
|     email|   string|   NULL|
+----------+---------+-------+



In [41]:
spark.sql("""
SELECT *
FROM demo.default_db.users.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)

+-----------------------+-------------------+-------------------+---------+-------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                |summary                                                                                                                                                                                                                                                                  

In [42]:
spark.sql("""
SELECT snapshot_id, committed_at, operation, manifest_list
FROM demo.default_db.users.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)


+-------------------+-----------------------+---------+-------------------------------------------------------------------------------------------------------------+
|snapshot_id        |committed_at           |operation|manifest_list                                                                                                |
+-------------------+-----------------------+---------+-------------------------------------------------------------------------------------------------------------+
|3969833432834730559|2026-01-18 14:32:29.795|delete   |s3://warehouse/default_db/users/metadata/snap-3969833432834730559-1-cdee9361-9641-4b44-aab5-6b510afcd1eb.avro|
|5738472431389787117|2026-01-18 14:32:16.755|overwrite|s3://warehouse/default_db/users/metadata/snap-5738472431389787117-1-ea7a1e2e-b530-447e-91d8-c105efbef4b5.avro|
|872424970877289686 |2026-01-18 14:32:02.38 |append   |s3://warehouse/default_db/users/metadata/snap-872424970877289686-1-9f364735-6083-48b7-b354-47cb3be3e328.avro |
|160

In [43]:
spark.sql("""
SELECT manifest_list
FROM demo.default_db.users.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)

+-------------------------------------------------------------------------------------------------------------+
|manifest_list                                                                                                |
+-------------------------------------------------------------------------------------------------------------+
|s3://warehouse/default_db/users/metadata/snap-3969833432834730559-1-cdee9361-9641-4b44-aab5-6b510afcd1eb.avro|
|s3://warehouse/default_db/users/metadata/snap-5738472431389787117-1-ea7a1e2e-b530-447e-91d8-c105efbef4b5.avro|
|s3://warehouse/default_db/users/metadata/snap-872424970877289686-1-9f364735-6083-48b7-b354-47cb3be3e328.avro |
|s3://warehouse/default_db/users/metadata/snap-1607397796109514243-1-91668742-6b4f-46e2-9db3-fdc593103966.avro|
+-------------------------------------------------------------------------------------------------------------+



In [44]:
spark.sql("""
SELECT summary
FROM demo.default_db.users.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|summary                                                                                                                                                                                                                                                                                                                                                                    |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [45]:
spark.sql("""
SELECT
    committed_at,
    snapshot_id,
    parent_id,
    operation,
    summary['added-records'] AS added_records,
    summary['deleted-records'] AS deleted_records,
    summary['total-records'] AS total_records,
    summary['added-data-files'] AS added_files,
    summary['deleted-data-files'] AS deleted_files
FROM demo.default_db.users.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)


+-----------------------+-------------------+-------------------+---------+-------------+---------------+-------------+-----------+-------------+
|committed_at           |snapshot_id        |parent_id          |operation|added_records|deleted_records|total_records|added_files|deleted_files|
+-----------------------+-------------------+-------------------+---------+-------------+---------------+-------------+-----------+-------------+
|2026-01-18 14:32:29.795|3969833432834730559|5738472431389787117|delete   |NULL         |1              |2            |NULL       |1            |
|2026-01-18 14:32:16.755|5738472431389787117|872424970877289686 |overwrite|1            |1              |3            |1          |1            |
|2026-01-18 14:32:02.38 |872424970877289686 |1607397796109514243|append   |1            |NULL           |3            |1          |NULL         |
|2026-01-18 14:30:21.42 |1607397796109514243|NULL               |append   |2            |NULL           |2            |2    

In [46]:
snapshot_df = spark.table("demo.default_db.users.snapshots")
summary_keys = ["added-records", "deleted-records", "total-records", "added-data-files", "deleted-data-files"]
for key in summary_keys:
    snapshot_df = snapshot_df.withColumn(key.replace("-", "_"), snapshot_df.summary[key])
snapshot_df.select("committed_at", "snapshot_id", "operation", *[k.replace("-", "_") for k in summary_keys]).show(truncate=False)


TypeError: 'method' object is not subscriptable

In [47]:
from pyspark.sql.functions import col

# Load snapshots table
snapshot_df = spark.table("demo.default_db.users.snapshots")

# Keys you want to extract from summary map
summary_keys = ["added-records", "deleted-records", "total-records", "added-data-files", "deleted-data-files"]

# Extract each key using getItem()
for key in summary_keys:
    snapshot_df = snapshot_df.withColumn(key.replace("-", "_"), col("summary").getItem(key))

# Select columns and show neatly
snapshot_df.select(
    "committed_at",
    "snapshot_id",
    "parent_id",
    "operation",
    *[k.replace("-", "_") for k in summary_keys],
    "manifest_list"
).orderBy(col("committed_at").desc()).show(truncate=False)


+-----------------------+-------------------+-------------------+---------+-------------+---------------+-------------+----------------+------------------+-------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|added_records|deleted_records|total_records|added_data_files|deleted_data_files|manifest_list                                                                                                |
+-----------------------+-------------------+-------------------+---------+-------------+---------------+-------------+----------------+------------------+-------------------------------------------------------------------------------------------------------------+
|2026-01-18 14:32:29.795|3969833432834730559|5738472431389787117|delete   |NULL         |1              |2            |NULL            |1                 |s3://warehouse/default_db/users/metadata/snap-3