In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import lit
from delta.tables import DeltaTable
from pyspark.sql.functions import col, row_number

In [0]:
# Step 1: Read CSV
oct_df = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    inferSchema=True
)

In [0]:
# Step 2: Set Delta table path
delta_path = "/dbfs/tmp/ecommerce_events"

In [0]:
# Step 3: Save as Delta table (overwrite if exists)
oct_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("ecommerce_events")

In [0]:
# Step 5: Load Delta table
delta_table = DeltaTable.forName(spark, "ecommerce_events")

In [0]:
# Step 6: Deduplicate source to avoid multiple source row errors
window_spec = Window.partitionBy(
    "user_session",
    "event_time",
    "product_id",
    "event_type"
).orderBy(col("event_time").desc())

updates_df = (
    oct_df
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)
    .drop("rn")
)

In [0]:
# Step 7: Check count before merge
before_count = spark.read.format("delta").table("ecommerce_events").count()
print("Before merge count of rows:", before_count)

Before merge count of rows: 42448764


In [0]:
# Step 8: Perform MERGE
delta_table.alias("t").merge(
    updates_df.alias("s"),
    """
    t.user_session = s.user_session AND
    t.event_time = s.event_time AND
    t.product_id = s.product_id AND
    t.event_type = s.event_type
    """
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Step 9: Check count after merge
after_count = spark.read.format("delta").table("ecommerce_events").count()
print("After merge count of rows:", after_count)

After merge count of rows: 42448766


In [0]:
from delta.tables import DeltaTable

# Load table
delta_table = DeltaTable.forPath(spark, delta_path)

In [0]:
# a) Show Delta history
spark.sql("DESCRIBE HISTORY ecommerce_events").show(truncate=False)

+-------+-------------------+--------------+------------------------+---------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+------------------------+-----------+-----------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# b) Query an old version by version number
version_0_df = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .table("ecommerce_events")

print("Number of rows in version 0:", version_0_df.count())

Number of rows in version 0: 42448764


In [0]:
# c) Query as of a timestamp
from datetime import datetime

yesterday_df = spark.read.format("delta") \
    .option("timestampAsOf", "2026-01-13 16:02:24") \
    .table("ecommerce_events")

print("Number of rows as of timestamp:", yesterday_df.count())

Number of rows as of timestamp: 42448764


In [0]:
# Optimize Delta table with ZORDER by columns often queried
spark.sql("OPTIMIZE ecommerce_events ZORDER BY (event_type, user_id)")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# retention: 7 days (168 hours)
delta_table.vacuum(retentionHours=168)

[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-8375019326151658>, line 2[0m
[1;32m      1[0m [38;5;66;03m# retention: 7 days (168 hours)[39;00m
[0;32m----> 2[0m delta_table[38;5;241m.[39mvacuum(retentionHours[38;5;241m=[39m[38;5;241m168[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/delta/connect/tables.py:177[0m, in [0;36mDeltaTable.vacuum[0;34m(self, retentionHours)[0m
[1;32m    175[0m [38;5;28;01mdef[39;00m [38;5;21mvacuum[39m([38;5;28mself[39m, retentionHours: Optional[[38;5;28mfloat[39m] [38;5;241m=[39m [38;5;28;01mNone[39;00m) [38;5;241m-[39m[38;5;241m>[39m DataFrame:
[1;32m    176[0m     command [38;5;241m=[39m Vacuum([38;5;28mself[39m[38;5;241m.[39m_to_proto(), retentionHours)[38;5;241m.[39mcommand(session[38;5;241m=[39m[38;5;28mself[39m[38;5;241m.[39m_spark[38;5

In [0]:
# retention: 7 days (168 hours)
delta_table = DeltaTable.forName(spark, "ecommerce_events")
delta_table.vacuum(retentionHours=168)