In [0]:
# BRONZE: Raw ingestion
import pyspark.sql.functions as F

# Source file and destination of the Bronze Delta table.
source_csv = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv"
bronze_path = "/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events"

# Read the CSV as-is: the first row supplies the column names and Spark
# infers each column's type from the data.
raw = spark.read.csv(
    source_csv,
    header=True,
    inferSchema=True,
)

# Stamp every row with the load time, then replace whatever the Bronze
# table currently holds ("overwrite" mode).
raw_stamped = raw.withColumn("ingestion_ts", F.current_timestamp())
(
    raw_stamped.write
    .format("delta")
    .mode("overwrite")
    .save(bronze_path)
)

In [0]:
# SILVER: Cleaned data
import pyspark.sql.functions as F

bronze = spark.read.format("delta").load(
    "/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events"
)

# Price bounds (exclusive) for a row to be kept, and the upper bounds
# (exclusive) of the "budget" and "mid" price tiers.
min_price, max_price = 0, 10000
budget_below, mid_below = 10, 50

price_col = F.col("price")

# Rows with a null price fail both comparisons and are filtered out too.
has_valid_price = (price_col > min_price) & (price_col < max_price)

# Deduplicate on the columns that identify one event. Using only
# (user_session, event_time) would merge different events that share a
# timestamp inside one session (e.g. two products, or a view and a purchase)
# and keep an arbitrary one of them, which skews the per-product view,
# purchase and revenue figures computed from this table downstream.
event_key = ["user_session", "event_time", "event_type", "product_id"]

silver = (
    bronze
    .filter(has_valid_price)
    .dropDuplicates(event_key)
    # Calendar date of the event, derived from its timestamp.
    .withColumn("event_date", F.to_date("event_time"))
    # Bucket each event by price; anything at or above mid_below is "premium".
    .withColumn(
        "price_tier",
        F.when(price_col < budget_below, "budget")
        .when(price_col < mid_below, "mid")
        .otherwise("premium"),
    )
)

# Replace the Silver Delta table with the cleaned result.
silver.write.format("delta").mode("overwrite").save(
    "/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events"
)

In [0]:
from pyspark.sql import functions as F

# GOLD: product-level performance, computed from the Silver events table.
silver = spark.read.format("delta").load(
    "/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events"
)

# Row-level predicates for the two event types aggregated below.
is_view = F.col("event_type") == "view"
is_purchase = F.col("event_type") == "purchase"

# One row per (product_id, brand): number of views, number of purchases,
# and the summed price of purchase events.
product_totals = silver.groupBy("product_id", "brand").agg(
    F.sum(F.when(is_view, 1).otherwise(0)).alias("views"),
    F.sum(F.when(is_purchase, 1).otherwise(0)).alias("purchases"),
    F.sum(F.when(is_purchase, F.col("price")).otherwise(0)).alias("revenue"),
)

# Purchases per 100 views. Products without any view get 0 so the ratio is
# never evaluated against a zero denominator.
purchases_per_100_views = (F.col("purchases") / F.col("views")) * 100
product_perf = product_totals.withColumn(
    "conversion_rate",
    F.when(F.col("views") > 0, purchases_per_100_views).otherwise(0),
)

# Preview a handful of rows.
display(product_perf.limit(5))