In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from datetime import datetime
from pyspark.sql import Row

In [2]:
import os
import sys
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/jars/org.apache.iceberg_iceberg-spark-runtime-3.5_2.12-1.6.1.jar pyspark-shell'
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


from infra import get_spark_session

# Constant
CATALOG_NAME = 'optimus'
SCHEMA_NAME = 'bronze'
TABLE_NAME = 'customers'


# Initialize Spark with Iceberg Configuration, including Jar files and Catalog name
# NOTE: This config is used for Hadoop local file. If you want to use Iceberg in Object Storage, let's read the official document
def build_iceberg_conf(kwargs={}):
    catalog_name = kwargs.get("catalog_name", CATALOG_NAME)
    return (
        SparkConf()
        .setAppName("Thesis's Application")
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .set(f'spark.sql.catalog.{catalog_name}', 'org.apache.iceberg.spark.SparkCatalog')
        .set(f'spark.sql.catalog.{catalog_name}.type', 'hadoop')
        .set(f'spark.sql.catalog.{catalog_name}.warehouse', '/home/jovyan/warehouse')
        # .set(f'spark.sql.catalog.{catalog_name}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    )
spark = get_spark_session(build_iceberg_conf())

In [3]:
'''
    Part 1: Create Iceberg table in Local environment
'''
# This is the DDL for Demo table
iceberg_tbl = """
CREATE TABLE IF NOT EXISTS `{catalog_name}`.`{schema_name}`.`{table_name}`
    ( 
        customer_id BIGINT,
        name        STRING,
        email       STRING,
        country     STRING
    )
USING iceberg
PARTITIONED BY (`country`);
"""

spark.sql(iceberg_tbl.format(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        table_name=TABLE_NAME)
).show()

# Use Catalog optimus.bronze
spark.sql('USE optimus.bronze;').show()

++
||
++
++



In [17]:
spark.sql('select * from optimus.bronze.customers.history').show()

+---------------+-----------+---------+-------------------+
|made_current_at|snapshot_id|parent_id|is_current_ancestor|
+---------------+-----------+---------+-------------------+
+---------------+-----------+---------+-------------------+



In [19]:
spark.sql('''
    INSERT INTO optimus.bronze.customers VALUES 
        (1, 'Alice', 'alice@example.com', 'US'),
        (2, 'Bob',   'bob@example.com',   'CA'),
        (3, 'Carlos','carlos@example.com','US');
''')

DataFrame[]

In [20]:
# Query history
spark.sql('select * from optimus.bronze.customers.history').show()

+--------------------+-------------------+---------+-------------------+
|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2025-05-18 01:54:...|2965786013090896756|     NULL|               true|
+--------------------+-------------------+---------+-------------------+



In [21]:
# Query Snapshot
spark.sql('select * from optimus.bronze.customers.snapshots').show()

+--------------------+-------------------+---------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2025-05-18 01:54:...|2965786013090896756|     NULL|   append|/home/jovyan/ware...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+



In [24]:
# Query data file
spark.sql('select file_path, partition, record_count from optimus.bronze.customers.files').show()

+--------------------+---------+------------+
|           file_path|partition|record_count|
+--------------------+---------+------------+
|/home/jovyan/ware...|     {US}|           2|
|/home/jovyan/ware...|     {CA}|           1|
+--------------------+---------+------------+



In [25]:
# Upsert
spark.sql('''
MERGE INTO optimus.bronze.customers t
USING (
  SELECT 1 AS customer_id, 'Alice' AS name, 'alice@newdomain.com' AS email, 'US' AS country  -- updated Alice
  UNION ALL
  SELECT 4, 'Diana', 'diana@example.com', 'UK'                                              -- new customer
) s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED THEN 
  INSERT *;
''')

DataFrame[]

In [26]:
spark.sql('''SELECT customer_id, name, email, country FROM optimus.bronze.customers;''').show()

+-----------+------+-------------------+-------+
|customer_id|  name|              email|country|
+-----------+------+-------------------+-------+
|          2|   Bob|    bob@example.com|     CA|
|          4| Diana|  diana@example.com|     UK|
|          1| Alice|alice@newdomain.com|     US|
|          3|Carlos| carlos@example.com|     US|
+-----------+------+-------------------+-------+



In [32]:
# Query snapshot
spark.sql('select snapshot_id, parent_id, operation from optimus.bronze.customers.snapshots').show()

+-------------------+-------------------+---------+
|        snapshot_id|          parent_id|operation|
+-------------------+-------------------+---------+
|2965786013090896756|               NULL|   append|
|8003928851978538073|2965786013090896756|overwrite|
+-------------------+-------------------+---------+



In [33]:
# Row level deletion
spark.sql('''
DELETE FROM customers 
WHERE customer_id = 2;
''').show()

++
||
++
++



In [34]:
# Query snapshot
spark.sql('select snapshot_id, parent_id, operation from optimus.bronze.customers.snapshots').show()

+-------------------+-------------------+---------+
|        snapshot_id|          parent_id|operation|
+-------------------+-------------------+---------+
|2965786013090896756|               NULL|   append|
|8003928851978538073|2965786013090896756|overwrite|
|1334965724800134053|8003928851978538073|   delete|
+-------------------+-------------------+---------+



In [35]:
spark.sql('''SELECT customer_id, name, country 
FROM optimus.bronze.customers VERSION AS OF 8003928851978538073;''').show()

+-----------+------+-------+
|customer_id|  name|country|
+-----------+------+-------+
|          4| Diana|     UK|
|          1| Alice|     US|
|          3|Carlos|     US|
|          2|   Bob|     CA|
+-----------+------+-------+

