In [0]:
from pyspark.sql import functions as F
# Bronze layer: land the source events unchanged, stamped with the load time.
# The source is read with the Delta reader, which takes its schema from the
# table itself, so the CSV-only reader options "header" and "inferSchema"
# have no effect here and are omitted.
raw = (
    spark.read.format("delta")
    .load("/Volumes/workspace/ecommerce/ecommerce_data/events/")
)

# ingestion_ts records when this load ran; "overwrite" replaces the previous
# contents of bronze_tbl on each run.
(
    raw.withColumn("ingestion_ts", F.current_timestamp())
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_tbl")
)

In [0]:
# Preview the bronze table that was just written.
display(spark.table("bronze_tbl"))

In [0]:
# Silver layer: price-validated, de-duplicated events with derived columns.
bronze = spark.table("bronze_tbl")

# Columns that together identify a single event. Session + timestamp alone is
# not enough: different events in the same session can share a timestamp
# (for example several products bought in one checkout), and de-duplicating
# on only those two columns would keep one arbitrary row and drop the rest.
EVENT_KEY = ["user_session", "event_time", "event_type", "product_id"]

silver = (
    bronze
    # Keep only prices strictly between 0 and 10000. Rows with a NULL price
    # are dropped as well, because the comparison evaluates to NULL.
    .filter((F.col("price") > 0) & (F.col("price") < 10000))
    .dropDuplicates(EVENT_KEY)
    .withColumn("event_date", F.to_date("event_time"))
    # Bucket prices: < 10 -> budget, < 50 -> mid, everything else -> premium.
    .withColumn(
        "price_tier",
        F.when(F.col("price") < 10, "budget")
        .when(F.col("price") < 50, "mid")
        .otherwise("premium"),
    )
)

silver.write.format("delta").mode("overwrite").saveAsTable("silver_tbl")

In [0]:
# Preview the silver table that was just written.
display(spark.table("silver_tbl"))

In [0]:
# Gold layer: per-product engagement and revenue metrics built from silver.
# Re-reading the saved table keeps this cell runnable on its own.
silver = spark.table("silver_tbl")

is_view = F.col("event_type") == "view"
is_purchase = F.col("event_type") == "purchase"

product_perf = (
    silver
    .groupBy("product_id", "brand")
    .agg(
        # F.when without otherwise() yields NULL for non-matching rows, and
        # countDistinct ignores NULLs, so these count distinct users per
        # event type (unique viewers / unique purchasers).
        F.countDistinct(F.when(is_view, F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(is_purchase, F.col("user_id"))).alias("purchases"),
        # Sum of purchase prices. try_cast turns unparseable prices into NULL
        # instead of failing. A group with no purchase rows would sum to NULL,
        # so default it to 0.0 to match conversion_rate's explicit 0 default.
        F.coalesce(
            F.sum(F.when(is_purchase, F.expr("try_cast(price as double)"))),
            F.lit(0.0),
        ).alias("revenue"),
    )
    # Percentage of viewers who purchased; 0 when the product has no viewers
    # (avoids dividing by zero).
    .withColumn(
        "conversion_rate",
        F.when(
            F.col("views") > 0,
            (F.col("purchases") / F.col("views")) * 100,
        ).otherwise(0),
    )
)

product_perf.write.format("delta").mode("overwrite").saveAsTable("gold_tbl")


In [0]:
# Preview the gold table that was just written.
display(spark.table("gold_tbl"))