---
## STEP 1: SCHEMA EVOLUTION Demo

Simulate adding new column `payment_method` to source data.
Iceberg handles schema changes automatically!

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

try:
    spark.stop()
    print("✅ Stop previous SparkSession.")
except NameError:
    pass

spark = SparkSession.builder \
    .appName("Lakehouse ETL eCommerces data") \
    .getOrCreate()
print("✅Started new sparksession.")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/04 16:31:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/04 16:31:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
26/02/04 16:31:39 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


✅Started new sparksession.


In [2]:
# Add new column to Bronze table (Schema Evolution)
spark.sql("""
    ALTER TABLE lakehouse.bronze.ecommerce_events 
    ADD COLUMN payment_method STRING
""")

print("Column 'payment_method' added!")
spark.sql("DESCRIBE lakehouse.bronze.ecommerce_events").show(truncate=False)

AnalysisException: [FIELDS_ALREADY_EXISTS] Cannot add column, because `payment_method` already exists in "STRUCT<event_time: TIMESTAMP, event_type: STRING, product_id: INT, category_id: BIGINT, category_code: STRING, brand: STRING, price: DOUBLE, user_id: INT, user_session: STRING, _ingestion_time: TIMESTAMP, _source_file: STRING, payment_method: STRING>".; line 2 pos 4;
AddColumns [QualifiedColType(None,payment_method,StringType,true,None,None,None)]
+- ResolvedTable org.apache.iceberg.spark.SparkCatalog@9a49fec, bronze.ecommerce_events, lakehouse.bronze.ecommerce_events, [event_time#0, event_type#1, product_id#2, category_id#3L, category_code#4, brand#5, price#6, user_id#7, user_session#8, _ingestion_time#9, _source_file#10, payment_method#11]


In [16]:


# Insert new data with the new column (simulating Day T+1 data)
from datetime import datetime

new_data = [
    (datetime(2019, 10, 2, 10, 0, 0), "purchase", 12345, 1234567890, "electronics.phone",
     "samsung", 599.99, 100001, "new-session-001", datetime.now(), "demo_day2.csv", "credit_card"),

    (datetime(2019, 10, 2, 11, 0, 0), "purchase", 12346, 1234567890, "electronics.phone",
     "apple", 999.99, 100002, "new-session-002", datetime.now(), "demo_day2.csv", "paypal"),
]


schema = spark.table("lakehouse.bronze.ecommerce_events").schema
df_new = spark.createDataFrame(new_data, schema)

df_new.writeTo("lakehouse.bronze.ecommerce_events").append()

# Verify - old data has NULL for payment_method, new data has values
spark.sql("""
    SELECT event_time, brand, price, payment_method 
    FROM lakehouse.bronze.ecommerce_events 
    WHERE payment_method IS NOT NULL OR brand = 'samsung'
    LIMIT 10
""").show(truncate=False)

                                                                                

+-------------------+-------+------+--------------+
|event_time         |brand  |price |payment_method|
+-------------------+-------+------+--------------+
|2019-10-02 10:00:00|samsung|599.99|credit_card   |
|2019-10-02 11:00:00|apple  |999.99|paypal        |
+-------------------+-------+------+--------------+



In [17]:
print(schema)

StructType([StructField('event_time', TimestampType(), True), StructField('event_type', StringType(), True), StructField('product_id', IntegerType(), True), StructField('category_id', LongType(), True), StructField('category_code', StringType(), True), StructField('brand', StringType(), True), StructField('price', DoubleType(), True), StructField('user_id', IntegerType(), True), StructField('user_session', StringType(), True), StructField('_ingestion_time', TimestampType(), True), StructField('_source_file', StringType(), True), StructField('payment_method', StringType(), True)])


---
## STEP 2: TIME TRAVEL Demo

Query historical snapshots of data.

In [19]:
# View table history
print("=== TABLE HISTORY ===")
spark.sql("SELECT * FROM lakehouse.bronze.ecommerce_events.history").show(truncate=False)

=== TABLE HISTORY ===
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2026-01-30 04:10:44.685|9127348836860691094|NULL               |true               |
|2026-01-30 04:19:26.313|6471009077848248100|9127348836860691094|true               |
+-----------------------+-------------------+-------------------+-------------------+



In [20]:
# View snapshots
print("=== SNAPSHOTS ===")
snapshots_df = spark.sql("SELECT snapshot_id, committed_at, operation FROM lakehouse.bronze.ecommerce_events.snapshots")
snapshots_df.show(truncate=False)

# Get first snapshot ID for time travel
first_snapshot = snapshots_df.orderBy("committed_at").first()["snapshot_id"]
print(f"First snapshot ID: {first_snapshot}")

=== SNAPSHOTS ===
+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|8345581712853693656|2026-01-30 04:04:51.168|append   |
|8961319236875262264|2026-01-30 04:08:08.245|append   |
|7909682360207467037|2026-01-30 04:09:21.43 |append   |
|4160572160808099697|2026-01-30 04:10:06.081|append   |
|9127348836860691094|2026-01-30 04:10:44.685|append   |
|6471009077848248100|2026-01-30 04:19:26.313|append   |
+-------------------+-----------------------+---------+

First snapshot ID: 8345581712853693656


In [21]:
#Cột count_at_snapshot là gì

# Query data at first snapshot (before schema evolution)
print("=== DATA AT FIRST SNAPSHOT (before new column) ===")
spark.sql(f"""
    SELECT COUNT(*) as count_at_snapshot   
    FROM lakehouse.bronze.ecommerce_events 
    VERSION AS OF {first_snapshot}
""").show()

print("=== CURRENT DATA (after inserts) ===")
spark.sql("SELECT COUNT(*) as current_count FROM lakehouse.bronze.ecommerce_events").show()

=== DATA AT FIRST SNAPSHOT (before new column) ===
+-----------------+
|count_at_snapshot|
+-----------------+
|          3533286|
+-----------------+

=== CURRENT DATA (after inserts) ===
+-------------+
|current_count|
+-------------+
|      4264754|
+-------------+

