In [0]:
dbutils.fs.ls("/Volumes/workspace/ecommerce/ecommerce_data/delta/events")

[FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta/events/_delta_log/', name='_delta_log/', size=0, modificationTime=1768486110667),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta/events/deletion_vector_04e7331d-8d88-4402-abac-5ae80eb3902e.bin', name='deletion_vector_04e7331d-8d88-4402-abac-5ae80eb3902e.bin', size=2711, modificationTime=1768482892000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta/events/deletion_vector_0579c13c-d10a-4aa2-8682-bb5cb3a3200c.bin', name='deletion_vector_0579c13c-d10a-4aa2-8682-bb5cb3a3200c.bin', size=2541, modificationTime=1768482892000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta/events/deletion_vector_0919426e-b5d4-4ed8-8cc5-9d6d46e82f82.bin', name='deletion_vector_0919426e-b5d4-4ed8-8cc5-9d6d46e82f82.bin', size=2541, modificationTime=1768268952000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce/ecommerce_data/delta/events/deletion_vector_14b6fe0f-c451-4

In [0]:
from pyspark.sql import functions as F

# BRONZE: Raw ingestion
raw = (
    spark.read
    .csv(
        "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
        header=True,
        inferSchema=True
    )
)

(
    raw.withColumn("ingestion_ts", F.current_timestamp())
       .write
       .format("delta")
       .mode("overwrite")
       .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze")
)


In [0]:
# SILVER: Cleaned data
bronze = spark.read.format("delta").load("/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze")
silver = bronze.filter(F.col("price") > 0) \
    .filter(F.col("price") < 10000) \
    .dropDuplicates(["user_session", "event_time"]) \
    .withColumn("event_date", F.to_date("event_time")) \
    .withColumn("price_tier",
        F.when(F.col("price") < 10, "budget")
         .when(F.col("price") < 50, "mid")
         .otherwise("premium"))
silver.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/ecommerce_data/delta/silver")

In [0]:
display(silver.limit(10))

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,ingestion_ts,event_date,price_tier
2019-10-13T06:26:49.000Z,view,6300680,2053013554834964853,appliances.kitchen.kettle,redmond,28.01,516613556,5263b662-8458-4617-ba04-0d1307b0c0d1,2026-01-15T14:09:37.956Z,2019-10-13,mid
2019-10-13T06:26:52.000Z,view,1004870,2053013555631882655,electronics.smartphone,samsung,285.4,520314753,06874646-bab8-4004-a526-abd74615cc13,2026-01-15T14:09:37.956Z,2019-10-13,premium
2019-10-13T06:27:06.000Z,view,1005205,2053013555631882655,electronics.smartphone,oppo,256.89,514323585,a83ac9bb-9e18-45b6-93d7-77d07c8a1aab,2026-01-15T14:09:37.956Z,2019-10-13,premium
2019-10-13T06:29:33.000Z,view,13102070,2053013553526341921,,skad,285.52,543019089,cd85f9a1-da56-402d-98dc-8d6c951e4a34,2026-01-15T14:09:37.956Z,2019-10-13,premium
2019-10-13T06:30:51.000Z,view,10300416,2053013553115300101,,bruder,38.59,559694866,34acb39b-c0df-496d-98a3-95cf43364ea4,2026-01-15T14:09:37.956Z,2019-10-13,mid
2019-10-13T06:32:42.000Z,view,3601505,2053013563810775923,appliances.kitchen.washer,samsung,490.36,512736428,892051a0-f960-4d8e-8a8b-6fd3bb9bb452,2026-01-15T14:09:37.956Z,2019-10-13,premium
2019-10-13T06:32:42.000Z,view,17200726,2053013559792632471,furniture.living_room.sofa,rals,811.66,518116501,2dc76863-c32a-4f5e-b212-e7d298c63209,2026-01-15T14:09:37.956Z,2019-10-13,premium
2019-10-13T06:32:54.000Z,view,1801901,2053013554415534427,electronics.video.tv,samsung,635.49,515773729,a7cfecab-ab71-448a-b549-333465a30390,2026-01-15T14:09:37.956Z,2019-10-13,premium
2019-10-13T06:34:39.000Z,view,15700137,2053013559733912211,,,257.15,516979460,6cde2578-2fc6-4aaa-933e-d7b33c381616,2026-01-15T14:09:37.956Z,2019-10-13,premium
2019-10-13T06:35:20.000Z,view,15902427,2053013558190408249,,polaris,28.95,543427549,b1421386-e9ff-45cb-9314-0fcdde7c86a2,2026-01-15T14:09:37.956Z,2019-10-13,mid


In [0]:
silver.select("event_type", "price", "user_id").show(10, truncate=False)

+----------+------+---------+
|event_type|price |user_id  |
+----------+------+---------+
|view      |28.01 |516613556|
|view      |285.4 |520314753|
|view      |256.89|514323585|
|view      |285.52|543019089|
|view      |38.59 |559694866|
|view      |490.36|512736428|
|view      |811.66|518116501|
|view      |635.49|515773729|
|view      |257.15|516979460|
|view      |28.95 |543427549|
+----------+------+---------+
only showing top 10 rows


In [0]:
silver.select("price") \
      .where(F.expr("try_cast(price as double) IS NULL") & F.col("price").isNotNull()) \
      .distinct() \
      .show(200, False)

+-----+
|price|
+-----+
+-----+



In [0]:
# GOLD: Aggregates
silver = spark.read.format("delta").load("/Volumes/workspace/ecommerce/ecommerce_data/delta/silver")

product_perf = (
    silver
    .groupBy("product_id", "brand")
    .agg(
        F.countDistinct(
            F.when(F.col("event_type") == "view", F.col("user_id"))
        ).alias("views"),
        F.countDistinct(
            F.when(F.col("event_type") == "purchase", F.col("user_id"))
        ).alias("purchases"),
        F.sum(
            F.when(F.col("event_type") == "purchase", F.col("price_double"))
        ).alias("revenue")
    )
    .withColumn(
        "conversion_rate",
        F.when(F.col("views") > 0,
               (F.col("purchases") / F.col("views")) * 100
        ).otherwise(F.lit(0))
    )
)


