In [0]:
from pyspark.sql import functions as F

# Widgets
dbutils.widgets.text(
    "source",
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
    "Source CSV Path"
)

dbutils.widgets.dropdown(
    "layer",
    "bronze",
    ["bronze", "silver", "gold"],
    "Pipeline Layer"
)

# Read widget values
source = dbutils.widgets.get("source")
layer = dbutils.widgets.get("layer")

In [0]:
BASE_PATH = "/Volumes/workspace/ecommerce/ecommerce_data"

BRONZE_PATH = f"{BASE_PATH}/bronze/events"
SILVER_PATH = f"{BASE_PATH}/silver/events"
GOLD_PATH   = f"{BASE_PATH}/gold/products"

In [0]:
# Bronze Layer (Raw → Delta)
def run_bronze():
    print("Running BRONZE layer")

    raw = spark.read.csv(
        source,
        header=True,
        inferSchema=True
    )

    raw.withColumn("ingestion_ts", F.current_timestamp()) \
       .write.format("delta") \
       .mode("overwrite") \
       .save(BRONZE_PATH)

    print("Bronze layer completed")

In [0]:
# Silver Layer (Clean + Deduplicate)
def run_silver():
    print("Running SILVER layer")

    bronze = spark.read.format("delta").load(BRONZE_PATH)

    silver = bronze.filter(F.col("price") > 0) \
        .filter(F.col("price") < 10000) \
        .dropDuplicates(["user_session", "event_time"]) \
        .withColumn("event_date", F.to_date("event_time")) \
        .withColumn(
            "price_tier",
            F.when(F.col("price") < 10, "budget")
             .when(F.col("price") < 50, "mid")
             .otherwise("premium")
        )

    silver.write.format("delta") \
        .mode("overwrite") \
        .save(SILVER_PATH)

    print("Silver layer completed")

In [0]:
def run_gold():
    print("Running GOLD layer")

    silver = spark.read.format("delta").load(SILVER_PATH)

    product_perf = silver.groupBy("product_id") \
        .agg(
            F.countDistinct(
                F.when(F.col("event_type") == "view", F.col("user_id"))
            ).alias("views"),

            F.countDistinct(
                F.when(F.col("event_type") == "purchase", F.col("user_id"))
            ).alias("purchases"),

            F.sum(
                F.when(F.col("event_type") == "purchase", F.col("price"))
            ).alias("revenue")
        ) \
        .withColumn(
            "conversion_rate",
            F.try_divide(F.col("purchases"), F.col("views")) * 100
        )

    product_perf.write.format("delta") \
        .mode("overwrite") \
        .save(GOLD_PATH)

    print("Gold layer completed")


In [0]:
def run_layer(layer_name):
    if layer_name == "bronze":
        run_bronze()
    elif layer_name == "silver":
        run_silver()
    elif layer_name == "gold":
        run_gold()
    else:
        raise ValueError(f"Unknown layer: {layer_name}")

# Execute selected layer
run_layer(layer)

Running BRONZE layer
Bronze layer completed


In [0]:
spark.read.format("delta").load(BRONZE_PATH).limit(5).display()

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,ingestion_ts
2019-11-01T00:00:00.000Z,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33,2026-01-15T09:33:49.989Z
2019-11-01T00:00:00.000Z,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283,2026-01-15T09:33:49.989Z
2019-11-01T00:00:01.000Z,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387,2026-01-15T09:33:49.989Z
2019-11-01T00:00:01.000Z,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f,2026-01-15T09:33:49.989Z
2019-11-01T00:00:01.000Z,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2,2026-01-15T09:33:49.989Z


In [0]:
spark.read.format("delta").load(SILVER_PATH).select(
    "price", "price_tier", "event_date"
).limit(5).display()

price,price_tier,event_date
1363.95,premium,2019-11-17
967.82,premium,2019-11-17
57.4,premium,2019-11-17
172.46,premium,2019-11-17
115.81,premium,2019-11-17


In [0]:
spark.read.format("delta").load(GOLD_PATH).orderBy(
    F.desc("revenue")
).limit(5).display()

product_id,views,purchases,revenue,conversion_rate
1005115,372008,21687,33030410.27000068,5.829713339498075
1005105,240053,10333,21684603.369999968,4.30446609707023
1004249,190107,10754,13543934.480000034,5.656814320356431
1005135,128820,4948,12654328.770000009,3.84101847539202
1004767,369924,28926,11004247.779999962,7.81944399390145
