In [0]:
# Root folder of the e-commerce data volume and the cleaned events table inside it.
base_path = "/Volumes/workspace/ecommerce/ecommerce_data"
events_delta_path = base_path + "/events_cleaned"

# Read the table's current snapshot, then print its schema and a 5-row preview.
events_delta = (
    spark.read
    .format("delta")
    .load(events_delta_path)
)
events_delta.printSchema()
events_delta.show(n=5, truncate=False)

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

+-------------------+----------+----------+-------------------+----------------------+--------+------+---------+------------------------------------+
|event_time         |event_type|product_id|category_id        |category_code         |brand   |price |user_id  |user_session                        |
+-------------------+----------+----------+-------------------+----------------------+--------+------+---------+------------------------------------+
|2019-10-13 06:29:47|view      |22700193  |2053013556168753601|NULL                  |stels   |77.22 |537333502|1a1e1d4a-4a39-4c11-ac98-92fce3eb3de0|
|2019-10-13 06:37:14

## Time travel: see old versions

In [0]:
# List the commits recorded for the events table (version, commit time, operation).
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, events_delta_path)
(
    dt.history()
    .select("version", "timestamp", "operation")
    .show(truncate=False)
)

+-------+-------------------+---------+
|version|timestamp          |operation|
+-------+-------------------+---------+
|0      |2026-01-13 03:06:48|WRITE    |
+-------+-------------------+---------+



In [0]:
# Time travel by version number: load the table as it was at version 0.
v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load(events_delta_path)
)

# Counting forces a full read of that snapshot.
v0_row_count = v0.count()
print("Version 0 row count:", v0_row_count)

Version 0 row count: 42413557


In [0]:
# Time travel by timestamp.
#
# The timestamp is read from the table's own history (oldest listed commit)
# instead of being hard-coded: a literal such as "2026-01-13 03:06:48" only
# works for one specific table instance and fails with a "timestamp before the
# earliest version" error as soon as the table is rebuilt at a later date.
# The cast to STRING happens in Spark so the value is rendered in the same
# session time zone that `timestampAsOf` uses to parse it.
first_commit = (
    dt.history()
    .orderBy("version")
    .selectExpr("version", "CAST(timestamp AS STRING) AS commit_ts")
    .first()
)
if first_commit is None:
    raise ValueError(f"No commits found in the history of {events_delta_path}")

# NOTE: despite its name, `yesterday_df` is simply the snapshot as of the
# oldest commit; the name is kept so later references keep working.
yesterday_df = (
    spark.read.format("delta")
    .option("timestampAsOf", first_commit["commit_ts"])
    .load(events_delta_path)
)
yesterday_df.show(truncate=False)


+-------------------+----------+----------+-------------------+--------------------------------+--------+------+---------+------------------------------------+
|event_time         |event_type|product_id|category_id        |category_code                   |brand   |price |user_id  |user_session                        |
+-------------------+----------+----------+-------------------+--------------------------------+--------+------+---------+------------------------------------+
|2019-10-13 06:29:47|view      |22700193  |2053013556168753601|NULL                            |stels   |77.22 |537333502|1a1e1d4a-4a39-4c11-ac98-92fce3eb3de0|
|2019-10-13 06:37:14|view      |15100148  |2053013557024391671|NULL                            |lider   |378.36|521327059|0a8ee6fd-8e12-4181-adb6-069288a717e2|
|2019-10-13 06:37:25|view      |1003310   |2053013555631882655|electronics.smartphone          |apple   |696.13|544301403|3c9ef31b-2f74-4d7d-9e6e-5c40e3cd5157|
|2019-10-13 06:37:47|view      |1004838 

## MERGE: incremental upserts on events

In [0]:
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import StructType

# Grab one existing event and use it as the template for both synthetic rows.
sample_row = events_delta.take(1)[0].asDict()

# Row 1: same session and event time as the template, with the price raised
# by 10 -- this one is meant to hit the "matched" branch of the MERGE.
update_row = {**sample_row, "price": float(sample_row["price"]) + 10.0}

# Row 2: identical to the template except for a "_NEW" suffix on user_session,
# so it will not match any existing session.
new_row = {**sample_row, "user_session": sample_row["user_session"] + "_NEW"}

# Build the source DataFrame with the table's own schema so column names and
# types line up with the merge target.
schema: StructType = events_delta.schema
updates = spark.createDataFrame([update_row, new_row], schema=schema)
updates.show(truncate=False)

+-------------------+----------+----------+-------------------+-------------+-----+-----+---------+----------------------------------------+
|event_time         |event_type|product_id|category_id        |category_code|brand|price|user_id  |user_session                            |
+-------------------+----------+----------+-------------------+-------------+-----+-----+---------+----------------------------------------+
|2019-10-13 06:29:47|view      |22700193  |2053013556168753601|NULL         |stels|87.22|537333502|1a1e1d4a-4a39-4c11-ac98-92fce3eb3de0    |
|2019-10-13 06:29:47|view      |22700193  |2053013556168753601|NULL         |stels|77.22|537333502|1a1e1d4a-4a39-4c11-ac98-92fce3eb3de0_NEW|
+-------------------+----------+----------+-------------------+-------------+-----+-----+---------+----------------------------------------+



In [0]:
from delta.tables import DeltaTable

# Upsert `updates` (alias s) into the events table (alias t): rows that match
# on session + event time are overwritten column-for-column, the rest are
# inserted.
# NOTE(review): this assumes (user_session, event_time) identifies a single
# event -- confirm, since every matching target row gets overwritten.
dt = DeltaTable.forPath(spark, events_delta_path)
merge_condition = "t.user_session = s.user_session AND t.event_time = s.event_time"

(
    dt.alias("t")
    .merge(updates.alias("s"), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Show the commit log again, this time with each operation's metrics.
(
    dt.history()
    .select("version", "timestamp", "operation", "operationMetrics")
    .show(truncate=False)
)

+-------+-------------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp          |operation|operationMetrics                                                                                                                                                                                                                                                   

In [0]:
# Compare row counts immediately before the most recent MERGE and at the
# latest version of the table.
#
# The pre-merge snapshot is looked up in the table history rather than assumed
# to be `latest_version - 1`: that assumption breaks when another commit lands
# after the MERGE, and yields an invalid version (-1) when only version 0
# exists.
history_df = dt.history()
latest_version = history_df.agg(F.max("version")).collect()[0][0]
merge_version = (
    history_df.where(F.col("operation") == "MERGE")
    .agg(F.max("version"))
    .collect()[0][0]
)
if merge_version is None:
    raise ValueError(
        "No MERGE commit found in the table history; run the MERGE cell first."
    )

# Snapshot right before the MERGE was committed.
before = (
    spark.read.format("delta")
    .option("versionAsOf", merge_version - 1)
    .load(events_delta_path)
)

# Snapshot at the newest version.
after = (
    spark.read.format("delta")
    .option("versionAsOf", latest_version)
    .load(events_delta_path)
)

print("Before row count:", before.count())
print("After row count:", after.count())


Before row count: 42413557
After row count: 42413558


## OPTIMIZE & ZORDER (for performance)

In [0]:
# Derive the target path from `base_path` (same pattern as `events_delta_path`)
# so the volume location is defined in exactly one place.
events_delta_optimized_path = f"{base_path}/events_optimized"

# Write a full copy of the events data as a separate Delta table at that path,
# replacing whatever is already there.
(
    events_delta.write.format("delta")
    .mode("overwrite")
    .save(events_delta_optimized_path)
)


In [0]:
# Save the same data under a catalog table name so the SQL cells can address
# it by name; any existing contents are overwritten.
events_delta.write.saveAsTable(
    "workspace.ecommerce.events_optimized",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- Rewrite the table's data files with OPTIMIZE and cluster them with Z-ordering
-- on event_type and user_id.
-- NOTE(review): presumably chosen because queries filter on these two columns;
-- confirm against the actual query patterns.
OPTIMIZE workspace.ecommerce.events_optimized
ZORDER BY (event_type, user_id);

path,metrics
,"List(26, 22, List(27057445, 66836611, 5.32657355E7, 26, 1384909123), List(23079590, 68613874, 6.631459327272727E7, 22, 1458921052), 0, List(minCubeSize(107374182400), List(0, 0), List(22, 1458921052), 0, List(22, 1458921052), 1, null), null, 0, 1, 22, 0, false, 0, 0, 1768357558560, 1768357581292, 8, 1, null, List(0, 0), null, 9, 9, 79163, 0, null)"


In [0]:
%sql
-- Clean up the table's storage with a retention window of 168 hours (7 days).
-- NOTE(review): VACUUM is expected to limit time travel to versions inside the
-- retention window -- confirm this is acceptable before running it on shared tables.
VACUUM workspace.ecommerce.events_optimized RETAIN 168 HOURS;

path
