In [0]:
%sql
-- Select the working catalog, make sure the schema exists, then create one
-- volume per pipeline layer. Every statement is idempotent, so this cell can
-- be re-run safely.
USE CATALOG workspace;

-- Create the schema before switching to it; USE SCHEMA fails if it is missing.
CREATE SCHEMA IF NOT EXISTS ecommerce;
USE SCHEMA ecommerce;

CREATE VOLUME IF NOT EXISTS workspace.ecommerce.raw;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.bronze;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.silver;
CREATE VOLUME IF NOT EXISTS workspace.ecommerce.gold;


In [0]:
from pyspark.sql import Row
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import random

# -----------------------------
# Generate synthetic raw data
# -----------------------------
# Builds 100 random e-commerce events (view / cart / purchase) as Spark Rows.
# Values are unseeded, so every run produces a different batch.
event_types = ["view", "cart", "purchase"]

# Read the clock once so every event_time is offset from the same instant.
generated_at = datetime.now()

data = []
for _ in range(100):
    # Draw the product number once so product_id and product_name always refer
    # to the same product. Independent draws would pair one id with many names
    # and split downstream groupBy("product_id", "product_name") aggregates.
    product_num = random.randint(1, 10)
    data.append(Row(
        user_id=f"user_{random.randint(1, 20)}",
        user_session=f"session_{random.randint(1, 30)}",
        product_id=f"product_{product_num}",
        product_name=f"Product {product_num}",
        event_type=random.choice(event_types),
        price=random.randint(5, 500),
        event_time=generated_at - timedelta(minutes=random.randint(1, 500))
    ))

raw_df = spark.createDataFrame(data)

# -----------------------------
# BRONZE write (append-only)
# -----------------------------
bronze_path = "/Volumes/workspace/ecommerce/bronze/events"

# Stamp each row with the time this batch was ingested.
bronze_df = raw_df.withColumn(
    "ingestion_ts", F.current_timestamp()
)

# Append mode: re-running this cell adds another batch to the bronze data
# rather than replacing what is already there.
bronze_df.write.format("delta") \
    .mode("append") \
    .save(bronze_path)


In [0]:
from pyspark.sql import functions as F

# Silver layer: read the bronze Delta data, keep rows with a usable price,
# de-duplicate events, and add the derived event_date / price_tier columns.
bronze_path = "/Volumes/workspace/ecommerce/bronze/events"
silver_path = "/Volumes/workspace/ecommerce/silver/events"

bronze_df = spark.read.format("delta").load(bronze_path)

price_col = F.col("price")

# A row is kept only when price is present and strictly between 0 and 10000.
has_valid_price = price_col.isNotNull() & (price_col > 0) & (price_col < 10000)

# Bucket price: < 50 -> "budget", < 200 -> "mid", everything else -> "premium".
price_tier_expr = (
    F.when(price_col < 50, "budget")
    .when(price_col < 200, "mid")
    .otherwise("premium")
)

# One row per (user_session, event_time, product_id) combination.
deduped_df = bronze_df.filter(has_valid_price).dropDuplicates(
    ["user_session", "event_time", "product_id"]
)

silver_df = (
    deduped_df
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn("price_tier", price_tier_expr)
)

# Overwrite: silver is rebuilt in full from bronze on every run.
silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer.save(silver_path)


In [0]:
from pyspark.sql import functions as F

# Gold layer: per-product performance metrics computed from the silver events.
silver_path = "/Volumes/workspace/ecommerce/silver/events"
gold_path = "/Volumes/workspace/ecommerce/gold/product_performance"

silver_df = spark.read.format("delta").load(silver_path)

is_view = F.col("event_type") == "view"
is_purchase = F.col("event_type") == "purchase"

gold_df = (
    silver_df
    .groupBy("product_id", "product_name")
    .agg(
        # when() without otherwise() yields NULL for non-matching rows and
        # countDistinct skips NULLs, so these count distinct users per event type.
        F.countDistinct(F.when(is_view, F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(is_purchase, F.col("user_id"))).alias("purchases"),
        # sum() over only NULLs returns NULL; coalesce so products with no
        # purchases report a revenue of 0 instead of NULL.
        F.coalesce(
            F.sum(F.when(is_purchase, F.col("price"))),
            F.lit(0)
        ).alias("revenue")
    )
    .withColumn(
        "conversion_rate",
        # Percentage of distinct viewers who purchased; 0 when there are no
        # views, which avoids dividing by zero.
        F.when(F.col("views") > 0,
               (F.col("purchases") / F.col("views")) * 100
        ).otherwise(0)
    )
)

# Overwrite: the gold table is rebuilt in full from silver on every run.
gold_df.write.format("delta") \
    .mode("overwrite") \
    .save(gold_path)
