#Load Existing Delta Table

In [0]:
from delta.tables import DeltaTable

delta_path = "/Volumes/workspace/ecommerce/ecommerce_delta"

deltaTable = DeltaTable.forPath(spark, delta_path)

print("Delta table loaded successfully")

Delta table loaded successfully


#Create Incremental Update Data (MERGE Source)

In [0]:
from pyspark.sql import Row
from datetime import datetime

updates_data = [
    Row(
        user_session="sess_101",
        user_id=1001,
        event_type="purchase",
        event_time=datetime(2024, 1, 10, 12, 30),
        product_id=501,
        price=120.0
    ),
    Row(
        user_session="sess_202",
        user_id=1002,
        event_type="cart",
        event_time=datetime(2024, 1, 10, 13, 15),
        product_id=305,
        price=75.5
    )
]

updates_df = spark.createDataFrame(updates_data)

updates_df.show(truncate=False)


+------------+-------+----------+-------------------+----------+-----+
|user_session|user_id|event_type|event_time         |product_id|price|
+------------+-------+----------+-------------------+----------+-----+
|sess_101    |1001   |purchase  |2024-01-10 12:30:00|501       |120.0|
|sess_202    |1002   |cart      |2024-01-10 13:15:00|305       |75.5 |
+------------+-------+----------+-------------------+----------+-----+



#Perform Incremental MERGE (Upsert into Delta Table)

In [0]:
deltaTable.alias("t").merge(
    updates_df.alias("s"),
    "t.user_session = s.user_session AND t.event_time = s.event_time"
).whenMatchedUpdate(set={
    "user_id": "s.user_id",
    "event_type": "s.event_type",
    "product_id": "s.product_id",
    "price": "s.price"
}).whenNotMatchedInsert(values={
    "user_session": "s.user_session",
    "user_id": "s.user_id",
    "event_type": "s.event_type",
    "event_time": "s.event_time",
    "product_id": "s.product_id",
    "price": "s.price",
    "category_id": "NULL"
}).execute()


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

#Inspect Delta Table History (Version Log)

In [0]:
spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").show(truncate=False)


+-------+-------------------+--------------+-----------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+------------------------+-----------+-----------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#Time Travel by Version (Pre-MERGE Snapshot)

In [0]:
v0_df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load(delta_path)

v0_df.count()
v0_df.filter("user_session IN ('sess_101', 'sess_202')").show()



67501979

#Time Travel by Timestamp (Point-in-Time Query)

In [0]:
before_merge_df = spark.read.format("delta") \
    .option("timestampAsOf", "2026-01-13 10:05:00") \
    .load(delta_path)

before_merge_df.count()
before_merge_df.filter("user_session IN ('sess_101', 'sess_202')").show()



67501979

#OPTIMIZE Delta Table (File Compaction)

In [0]:
spark.sql(f"""
OPTIMIZE delta.`{delta_path}`
""")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

#OPTIMIZE with ZORDER (Query-Aware Data Layout)

In [0]:
spark.sql(f"""
OPTIMIZE delta.`{delta_path}`
ZORDER BY (event_type, user_id)
""")


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

#VACUUM Delta Table (Cleanup Old Files)

In [0]:
spark.sql(f"""
VACUUM delta.`{delta_path}` RETAIN 168 HOURS
""")

DataFrame[path: string]