In [0]:
# Sanity check: list the volumes in the workspace.ecommerce schema so we can confirm
# the `ecommerce_data` volume (read from below) is present.
volumes_listing = spark.sql("SHOW VOLUMES IN workspace.ecommerce")
volumes_listing.show(n=20, truncate=True)

+---------+--------------+
| database|   volume_name|
+---------+--------------+
|ecommerce|ecommerce_data|
+---------+--------------+



In [0]:
from pyspark.sql import functions as F
# Bronze layer: land the raw 2019-Nov CSV unchanged, stamped with the ingestion time,
# and persist it as Delta files under the volume.
raw_csv_path = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv"
bronze_path = "/Volumes/workspace/ecommerce/ecommerce_data/bronze_events"

raw = (
    spark.read
    .option("header", True)       # first line holds column names
    .option("inferSchema", True)  # let Spark pick column types (costs an extra pass over the file)
    .csv(raw_csv_path)
)

bronze = raw.withColumn("ingestion_ts", F.current_timestamp())
bronze.write.save(bronze_path, format="delta", mode="overwrite")

In [0]:
 
# Silver layer: cleanse and enrich the bronze events, then persist as Delta.
#  - keep only events with 0 < price < 10000
#  - drop repeated copies of the same event
#  - derive a calendar date and a coarse price tier
bronze = spark.read.format("delta").load("/Volumes/workspace/ecommerce/ecommerce_data/bronze_events")

# Identity of a single event: which session, when, what kind of event, and on which product.
# Keying on (user_session, event_time) alone collapses distinct events that share a timestamp
# (e.g. a view and a cart, or purchases of different products, logged in the same second),
# silently discarding real activity.
EVENT_KEY = ["user_session", "event_time", "event_type", "product_id"]

silver = (
    bronze
    .filter((F.col("price") > 0) & (F.col("price") < 10000))
    .dropDuplicates(EVENT_KEY)
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn(
        "price_tier",
        # The string values here are literals (tier labels), not column names.
        F.when(F.col("price") < 10, "budget")
        .when(F.col("price") < 50, "mid")
        .otherwise("premium"),
    )
)
silver.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/ecommerce_data/silver_events")

In [0]:
# Gold layer: per-product funnel metrics computed from the silver events, persisted as Delta.
#  - views:           number of distinct users with a "view" event on the product
#  - purchases:       number of distinct users with a "purchase" event on the product
#  - revenue:         sum of prices over purchase events
#  - conversion_rate: distinct purchasers per 100 distinct viewers
silver = spark.read.format("delta").load("/Volumes/workspace/ecommerce/ecommerce_data/silver_events")

is_view = F.col("event_type") == "view"
is_purchase = F.col("event_type") == "purchase"

product_perf = (
    silver
    .groupBy("product_id")
    .agg(
        # F.when(cond, "user_id") would produce the *literal* string "user_id", making
        # countDistinct at most 1 per product. Pass the column so distinct users are counted.
        # Non-matching rows become NULL, which countDistinct ignores.
        F.countDistinct(F.when(is_view, F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(is_purchase, F.col("user_id"))).alias("purchases"),
        # try_cast yields NULL (skipped by sum) for any price that cannot be read as a double.
        F.sum(F.when(is_purchase, F.expr("try_cast(price as double)"))).alias("revenue"),
    )
    # try_divide returns NULL instead of failing when views == 0.
    .withColumn("conversion_rate", F.expr("try_divide(purchases, views) * 100"))
)
product_perf.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/ecommerce_data/gold_products")