## Homework
Use Iceberg to create and update a product table. You can create some sample records for updating products.

In [16]:
%session_id_prefix native-iceberg-sql-
%glue_version 5.0
%number_of_workers 2
%worker_type G.1X
%idle_timeout 60
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
}

Setting session ID prefix to native-iceberg-sql-
Setting Glue version to: 5.0
Previous number of workers: 2
Setting new number of workers to: 2
Previous worker type: G.1X
Setting new worker type to: G.1X
Current idle_timeout is 60 minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions', '--datalake-formats': 'iceberg'}


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergGlueExample") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://imba-chien/iceberg-warehouse/") \
    .getOrCreate()

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Idle Timeout: 60
Session ID: native-iceberg-sql--0edfbc7f-45b8-4028-b878-01228c3e9efb
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--datalake-formats iceberg
Waiting for session native-iceberg-sql--0edfbc7f-45b8-4028-b878-01228c3e9efb to get into ready status...
Session native-iceberg-sql--0edfbc7f-45b8-4028-b878-01228c3e9efb has been created.



In [10]:
# Read the source products table and drop rows without a product_id.
products_df = spark.read.table("spark_catalog.imba.products").filter("product_id IS NOT NULL")




In [12]:
products_df.show(5, truncate=False)

+----------+-----------------------------------------------------------------+--------+-------------+
|product_id|product_name                                                     |aisle_id|department_id|
+----------+-----------------------------------------------------------------+--------+-------------+
|1         |Chocolate Sandwich Cookies                                       |61      |19           |
|2         |All-Seasons Salt                                                 |104     |13           |
|3         |Robust Golden Unsweetened Oolong Tea                             |94      |7            |
|4         |Smart Ones Classic Favorites Mini Rigatoni With Vodka Cream Sauce|38      |1            |
|5         |Green Chile Anytime Sauce                                        |5       |13           |
+----------+-----------------------------------------------------------------+--------+-------------+
only showing top 5 rows


In [19]:
import time
from pyspark.sql.functions import lit

# Epoch seconds (float) captured once, so every row gets the same value.
ut = time.time()

products_df = products_df.withColumn("updated_at", lit(ut))
products_df.show(5, truncate=False)


+----------+-----------------------------------------------------------------+--------+-------------+--------------------+
|product_id|product_name                                                     |aisle_id|department_id|updated_at          |
+----------+-----------------------------------------------------------------+--------+-------------+--------------------+
|1         |Chocolate Sandwich Cookies                                       |61      |19           |1.7525519196447296E9|
|2         |All-Seasons Salt                                                 |104     |13           |1.7525519196447296E9|
|3         |Robust Golden Unsweetened Oolong Tea                             |94      |7            |1.7525519196447296E9|
|4         |Smart Ones Classic Favorites Mini Rigatoni With Vodka Cream Sauce|38      |1            |1.7525519196447296E9|
|5         |Green Chile Anytime Sauce                                        |5       |13           |1.7525519196447296E9|
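+----------+-----------------------------------------------------------------+--------+-------------+--------------------+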
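
The `updated_at` column holds the output of `time.time()`, that is, seconds since the Unix epoch as a float, which is why it prints in scientific notation. As a minimal sketch in plain Python (no Spark session needed), the value shown above can be converted to a readable UTC timestamp. The variable names here are illustrative and not part of the notebook.

```python
from datetime import datetime, timezone

# Epoch seconds copied from the updated_at column in the output above.
updated_at_epoch = 1.7525519196447296e9

# Interpret the float as seconds since 1970-01-01 00:00:00 UTC.
updated_at_utc = datetime.fromtimestamp(updated_at_epoch, tz=timezone.utc)

# Print in a human-readable form.
print(updated_at_utc.strftime("%Y-%m-%d %H:%M:%S %Z"))
```

If a native timestamp column is preferred, `pyspark.sql.functions.current_timestamp()` is one option, though that would change the column type from the double shown in this notebook.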

In [20]:
products_df.writeTo("glue_catalog.imba_iceberg.products_iceberg") \
    .using("iceberg") \
    .tableProperty("format-version", "2") \
    .createOrReplace()




In [21]:
%%sql
SELECT * FROM glue_catalog.imba_iceberg.products_iceberg

+----------+--------------------+--------+-------------+--------------------+
|product_id|        product_name|aisle_id|department_id|          updated_at|
+----------+--------------------+--------+-------------+--------------------+
|         1|Chocolate Sandwic...|      61|           19|1.7525519196447296E9|
|         2|    All-Seasons Salt|     104|           13|1.7525519196447296E9|
|         3|Robust Golden Uns...|      94|            7|1.7525519196447296E9|
|         4|Smart Ones Classi...|      38|            1|1.7525519196447296E9|
|         5|Green Chile Anyti...|       5|           13|1.7525519196447296E9|
|         6|        Dry Nose Oil|      11|           11|1.7525519196447296E9|
|         7|Pure Coconut Wate...|      98|            7|1.7525519196447296E9|
|         8|Cut Russet Potato...|     116|            1|1.7525519196447296E9|
|         9|Light Strawberry ...|     120|           16|1.7525519196447296E9|
|        10|Sparkling Orange ...|     115|            7|1.752551

## Upsert records into Iceberg table

In [22]:
from pyspark.sql import Row

ut = time.time()

product_updates = [
    {'product_id': 31, 'product_name': 'Heater', 'aisle_id': 17, 'department_id': 19, 'updated_at': ut}, # Update
    {'product_id': 32, 'product_name': 'Chair', 'aisle_id': 131, 'department_id': 19, 'updated_at': ut} # Insert
]
# Build the DataFrame from an explicit list of Rows.
df_product_updates = spark.createDataFrame([Row(**x) for x in product_updates])




In [23]:
df_product_updates.createOrReplaceTempView("tmp_products_updates")




In [25]:
query = """
MERGE INTO glue_catalog.imba_iceberg.products_iceberg AS t
USING (SELECT * FROM tmp_products_updates) AS u
ON t.product_id = u.product_id
WHEN MATCHED THEN UPDATE SET t.product_name = u.product_name, t.aisle_id = u.aisle_id, t.department_id = u.department_id, t.updated_at = u.updated_at
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(query)

DataFrame[]
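
To make the effect of the `MERGE INTO` statement above easier to follow, here is a rough sketch of the same matched/not-matched logic in plain Python, using a dict keyed by `product_id` as a stand-in for the table. This only illustrates the semantics and is not how Iceberg executes the merge. The original name of product 31 is a hypothetical placeholder, since the notebook does not show it, and `merge_upsert` is an illustrative helper name.

```python
def merge_upsert(target, updates, key="product_id"):
    """Return a new dict with updates applied: update on key match, insert otherwise."""
    merged = {k: dict(row) for k, row in target.items()}  # copy so the input stays untouched
    for row in updates:
        k = row[key]
        if k in merged:
            merged[k].update(row)   # WHEN MATCHED THEN UPDATE SET ...
        else:
            merged[k] = dict(row)   # WHEN NOT MATCHED THEN INSERT *
    return merged


# Stand-in for the existing table; the product name is a hypothetical placeholder.
target = {
    31: {"product_id": 31, "product_name": "placeholder name", "aisle_id": 0,
         "department_id": 0, "updated_at": 1.7525519196447296e9},
}

# Same two records as product_updates in the notebook.
updates = [
    {"product_id": 31, "product_name": "Heater", "aisle_id": 17,
     "department_id": 19, "updated_at": 1.7525520311724865e9},  # matches -> update
    {"product_id": 32, "product_name": "Chair", "aisle_id": 131,
     "department_id": 19, "updated_at": 1.7525520311724865e9},  # no match -> insert
]

merged = merge_upsert(target, updates)
for product_id in sorted(merged):
    print(product_id, merged[product_id]["product_name"])
```

Product 31 already exists, so its columns are overwritten; product 32 has no match, so it is inserted as a new row.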


In [34]:
%%sql
SELECT * FROM glue_catalog.imba_iceberg.products_iceberg WHERE product_id == 31 OR product_id == 32

+----------+------------+--------+-------------+--------------------+
|product_id|product_name|aisle_id|department_id|          updated_at|
+----------+------------+--------+-------------+--------------------+
|        31|      Heater|      17|           19|1.7525520311724865E9|
|        32|       Chair|     131|           19|1.7525520311724865E9|
+----------+------------+--------+-------------+--------------------+


In [35]:
%%sql
DELETE FROM glue_catalog.imba_iceberg.products_iceberg WHERE product_id == 31

++
||
++
++


In [36]:
%%sql
SELECT * FROM glue_catalog.imba_iceberg.products_iceberg WHERE product_id > 30 AND product_id < 35

+----------+--------------------+--------+-------------+--------------------+
|product_id|        product_name|aisle_id|department_id|          updated_at|
+----------+--------------------+--------+-------------+--------------------+
|        33|Organic Spaghetti...|     131|            9|1.7525519196447296E9|
|        34|Peanut Butter Cereal|     121|           14|1.7525519196447296E9|
|        32|               Chair|     131|           19|1.7525520311724865E9|
+----------+--------------------+--------+-------------+--------------------+


## View History and Snapshots

In [37]:
%%sql
SELECT * FROM glue_catalog.imba_iceberg.products_iceberg.history

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2025-07-15 03:53:...|4784070957029174251|               NULL|              false|
|2025-07-15 03:59:...|3377752431076810215|               NULL|               true|
|2025-07-15 04:03:...|8525553201731189683|3377752431076810215|               true|
|2025-07-15 04:08:...|2771212789558647808|8525553201731189683|               true|
+--------------------+-------------------+-------------------+-------------------+
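
The `is_current_ancestor` column indicates whether a snapshot lies on the parent chain of the current snapshot. The sketch below reproduces that column in plain Python by walking `parent_id` links back from the latest snapshot, using the IDs copied from the output above. It assumes the last row is the current snapshot, and `current_ancestors` is an illustrative helper name. The first snapshot is not on the chain, possibly because the `createOrReplace` cell was run more than once, which would start a fresh lineage.

```python
# (snapshot_id, parent_id) pairs copied from the history output above,
# in made_current_at order; None stands for NULL.
history = [
    (4784070957029174251, None),
    (3377752431076810215, None),
    (8525553201731189683, 3377752431076810215),
    (2771212789558647808, 8525553201731189683),
]


def current_ancestors(history):
    """Collect the current snapshot and every snapshot reachable via parent_id."""
    parent_of = dict(history)
    current = history[-1][0]  # assumption: the last entry is the current snapshot
    ancestors = set()
    while current is not None:
        ancestors.add(current)
        current = parent_of.get(current)
    return ancestors


ancestors = current_ancestors(history)
for snapshot_id, _ in history:
    # Second value mirrors the is_current_ancestor column.
    print(snapshot_id, snapshot_id in ancestors)
```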


In [38]:
%%sql
SELECT * FROM glue_catalog.imba_iceberg.products_iceberg.snapshots

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-07-15 03:53:...|4784070957029174251|               NULL|   append|s3://imba-chien/i...|{spark.app.id -> ...|
|2025-07-15 03:59:...|3377752431076810215|               NULL|   append|s3://imba-chien/i...|{spark.app.id -> ...|
|2025-07-15 04:03:...|8525553201731189683|3377752431076810215|overwrite|s3://imba-chien/i...|{spark.app.id -> ...|
|2025-07-15 04:08:...|2771212789558647808|8525553201731189683|overwrite|s3://imba-chien/i...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+


In [39]:
%%sql
SELECT h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary["spark.app.id"] 
FROM glue_catalog.test.products_iceberg.history h 
JOIN glue_catalog.test.products_iceberg.snapshots s  
ON h.snapshot_id = s.snapshot_id 
ORDER BY made_current_at

+--------------------+---------+-------------------+-------------------+---------------------+
|     made_current_at|operation|        snapshot_id|is_current_ancestor|summary[spark.app.id]|
+--------------------+---------+-------------------+-------------------+---------------------+
|2025-07-15 03:32:...|   append|4023877369599476016|               true| spark-application...|
|2025-07-15 03:33:...|overwrite|3428421400707676683|               true| spark-application...|
|2025-07-15 03:35:...|overwrite|4928947529892970448|               true| spark-application...|
+--------------------+---------+-------------------+-------------------+---------------------+


In [18]:
%stop_session

Stopping session: native-iceberg-sql--0edfbc7f-45b8-4028-b878-01228c3e9efb
Stopped session.
