In [0]:
# Step 1: Load the raw October 2019 events CSV (first row is the header,
# column types are inferred by scanning the data).
oct_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
)

# Step 2: Location of the Delta copy of the events.
# NOTE(review): the recorded run raised AnalysisException on the save below;
# the traceback is truncated, so the cause is not visible here. One thing to
# confirm is that the Unity Catalog volume `workspace.ecommerce.delta` exists
# (Volumes paths have the form /Volumes/<catalog>/<schema>/<volume>/...).
delta_path = "/Volumes/workspace/ecommerce/delta/ecommerce_events"

# Step 3: Persist as Delta, replacing whatever is already stored at the path.
oct_df.write.save(delta_path, format="delta", mode="overwrite")

# Step 4: Import DeltaTable and Spark functions
from delta.tables import DeltaTable
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Step 5: Obtain a DeltaTable handle for the path written above; the MERGE in
# the next steps is issued through this handle.
delta_table = DeltaTable.forPath(sparkSession=spark, path=delta_path)

# Step 6: Deduplicate the source on the merge key. MERGE fails when several
# source rows match the same target row, so every key may appear only once.
# The same key list drives both the dedup window and the MERGE condition so
# the two cannot drift apart.
merge_keys = ["user_session", "event_time", "product_id", "event_type"]

# Break ties on the non-key columns so the surviving row per key is
# deterministic. Ordering by event_time (itself a partition column, hence
# constant inside each window) left the choice arbitrary between runs.
# Fall back to the keys if there are no other columns, because row_number()
# requires an ordered window.
order_cols = [c for c in oct_df.columns if c not in merge_keys] or merge_keys
window_spec = Window.partitionBy(*merge_keys).orderBy(
    *[col(c).asc_nulls_last() for c in order_cols]
)

updates_df = (
    oct_df
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)
    .drop("rn")
)

# Step 7: Check count before merge
before_count = spark.read.format("delta").load(delta_path).count()
print("Before merge count of rows:", before_count)

# Step 8: Upsert the deduplicated rows. Null-safe equality (<=>) is required:
# with plain `=`, NULL = NULL evaluates to NULL (not true), so a source row
# whose key contains a NULL never matches its existing copy and would be
# inserted again on every run.
merge_condition = " AND ".join(f"t.{k} <=> s.{k}" for k in merge_keys)
(
    delta_table.alias("t")
    .merge(updates_df.alias("s"), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# Step 9: Check count after merge
after_count = spark.read.format("delta").load(delta_path).count()
print("After merge count of rows:", after_count)


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-8587327753182349>, line 15[0m
[1;32m      9[0m delta_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124m/Volumes/workspace/ecommerce/delta/ecommerce_events[39m[38;5;124m"[39m
[1;32m     11[0m [38;5;66;03m# Step 3: Save as Delta table (overwrite if exists)[39;00m
[1;32m     12[0m oct_df[38;5;241m.[39mwrite \
[1;32m     13[0m     [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m) \
[1;32m     14[0m     [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m) \
[0;32m---> 15[0m     [38;5;241m.[39msave(delta_path)
[1;32m     17[0m [38;5;66;03m# Step 4: Import DeltaTable and Spark functions[39;00m
[1;32m     18[0m [38;5;28;01mfrom[39;00m [38;5;21;01mdelta[39;00m[38;5;21;01m.[39;00m[38;5;21;01mtables[39;00m [

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Load table
delta_table = DeltaTable.forPath(spark, delta_path)

# a) Show Delta history (one row per commit)
spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").show(truncate=False)

# b) Query an old version by version number.
# NOTE: reading an old version can fail once VACUUM has deleted its files.
version_0_df = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load(delta_path)
)

print("Number of rows in version 0:", version_0_df.count())

# c) Query as of a timestamp
from datetime import datetime

# Set TIME_TRAVEL_TS to a "yyyy-MM-dd HH:mm:ss[.SSS]" string to pin a moment.
# When None, use the earliest retained commit's timestamp. A hard-coded literal
# can predate the table's first commit (e.g. after the table is recreated),
# which makes the time-travel read fail. The timestamp is formatted inside
# Spark so it is rendered and parsed back in the same session time zone, with
# millisecond precision so it is not truncated to before that commit.
TIME_TRAVEL_TS = None
as_of_ts = TIME_TRAVEL_TS or (
    delta_table.history()
    .agg(F.date_format(F.min("timestamp"), "yyyy-MM-dd HH:mm:ss.SSS").alias("ts"))
    .first()["ts"]
)

# Name kept for compatibility with any later cells; it holds the table as of
# `as_of_ts`, not necessarily "yesterday".
yesterday_df = (
    spark.read.format("delta")
    .option("timestampAsOf", as_of_ts)
    .load(delta_path)
)

print("Number of rows as of timestamp:", yesterday_df.count())




In [0]:
# Compact the table's files and Z-order them on columns that are commonly
# filtered, so queries on those columns can skip more files.
# NOTE(review): event_type looks low-cardinality; Z-ordering pays off most on
# high-cardinality filter columns. Confirm it is worth keeping in the list.
zorder_cols = ", ".join(["event_type", "user_id"])
spark.sql(f"OPTIMIZE delta.`{delta_path}` ZORDER BY ({zorder_cols})")



In [0]:
# Delete data files that are no longer referenced by any table version newer
# than the retention window. Address the table by path like every other cell:
# the previous `events_table` name is not defined anywhere in this notebook.
# 168 hours = 7 days. Time-travel reads of versions older than this window
# (such as versionAsOf 0 above) may fail after VACUUM removes their files.
VACUUM_RETENTION_HOURS = 168
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN {VACUUM_RETENTION_HOURS} HOURS")


