# Delta Tables - GOLD LAYER

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# abfss:// roots of the silver (read from) and gold (written to) containers;
# both containers sit on the same storage-account host.
_STORAGE_HOST = "ecommerceprojectdl.dfs.core.windows.net"
silver = f"abfss://silver@{_STORAGE_HOST}"
gold = f"abfss://gold@{_STORAGE_HOST}"

In [0]:
# October input folder inside the silver container.
silver_path = silver + "/month_Oct"
# NOTE(review): in the cells visible here gold_path is never read; the October
# write targets f"{gold}/ecommerce_eventtype_kpi" instead. Confirm which
# location is the intended one.
gold_path = gold + "/ecommerce_clickstream_kpi"

In [0]:
# Read the October silver data (stored in Delta format) into a DataFrame.
df_oct = spark.read.load(silver_path, format="delta")

In [0]:
from pyspark.sql import functions as F

# Collapse the October events into a single row of clickstream KPIs.
df_gold_oct = df_oct.agg(
    F.count("*").alias("total_events"),
    F.countDistinct("user_id").alias("total_users"),
    F.countDistinct("user_session").alias("total_sessions"),
    # Per-event-type counters: each row contributes 1 when it matches, else 0.
    F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("total_purchases"),
    # No otherwise(): non-purchase rows become NULL and are skipped by avg().
    F.avg(F.when(F.col("event_type") == "purchase", F.col("price"))).alias("avg_purchase_price"),
    F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("total_views"),
    F.sum(F.when(F.col("event_type") == "cart", 1).otherwise(0)).alias("total_carts"),
)

# Derived ratios: (output column, numerator, denominator, rounding digits or
# None for no rounding). Order here is the column order in the result.
kpi_ratio_specs = [
    ("conversion_rate", "total_purchases", "total_sessions", None),
    ("avg_events_per_session", "total_events", "total_sessions", None),
    ("view_to_cart_rate", "total_carts", "total_views", 4),
    ("cart_to_purchase_rate", "total_purchases", "total_carts", 4),
]
for kpi_name, numerator, denominator, digits in kpi_ratio_specs:
    # Guard the division: with spark.sql.ansi.enabled=true a zero denominator
    # raises a divide-by-zero error. when() without otherwise() yields NULL
    # for a zero/NULL denominator, which is what plain `/` returns with ANSI
    # mode off, so results are unchanged there.
    kpi_ratio = F.when(
        F.col(denominator) != 0,
        F.col(numerator) / F.col(denominator),
    )
    if digits is not None:
        kpi_ratio = F.round(kpi_ratio, digits)
    df_gold_oct = df_gold_oct.withColumn(kpi_name, kpi_ratio)

# Tag the row with the month it summarises.
df_gold_oct = df_gold_oct.withColumn("month", F.lit("2019-10"))

In [0]:
%sql
-- Ensure the gold schema exists in ecommerce_catalog; no-op if it already does.
CREATE SCHEMA IF NOT EXISTS ecommerce_catalog.gold;

In [0]:
# Save the October KPI row as a Delta table at an explicit storage path,
# replacing whatever the table held before.
# NOTE(review): table and folder are named "eventtype_kpi" although the data is
# the clickstream KPI row and the November table is "clickstream_kpi_nov" -
# confirm the name is intentional.
oct_table = "ecommerce_catalog.gold.ecommerce_eventtype_kpi"
oct_location = f"{gold}/ecommerce_eventtype_kpi"
(
    df_gold_oct.write.format("delta")
    .mode("overwrite")
    .option("path", oct_location)
    .saveAsTable(oct_table)
)

In [0]:
# November input folder inside the silver container.
path_silver = silver + "/month_Nov"
# Rebinds gold_path to the November output folder; the November write below
# spells out the same location literally rather than reading this variable.
gold_path = gold + "/ecommerce_clickstream_kpi_nov"

In [0]:
# Read the November silver data (stored in Delta format) into a DataFrame.
df_nov = spark.read.load(path_silver, format="delta")

In [0]:
from pyspark.sql import functions as F

# Collapse the November events into a single row of clickstream KPIs.
df_gold_nov = df_nov.agg(
    F.count("*").alias("total_events"),
    F.countDistinct("user_id").alias("total_users"),
    F.countDistinct("user_session").alias("total_sessions"),
    # Per-event-type counters: each row contributes 1 when it matches, else 0.
    F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("total_purchases"),
    # No otherwise(): non-purchase rows become NULL and are skipped by avg().
    F.avg(F.when(F.col("event_type") == "purchase", F.col("price"))).alias("avg_purchase_price"),
    F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("total_views"),
    F.sum(F.when(F.col("event_type") == "cart", 1).otherwise(0)).alias("total_carts"),
)

# Derived ratios: (output column, numerator, denominator, rounding digits or
# None for no rounding). Order here is the column order in the result.
kpi_ratio_specs = [
    ("conversion_rate", "total_purchases", "total_sessions", None),
    ("avg_events_per_session", "total_events", "total_sessions", None),
    ("view_to_cart_rate", "total_carts", "total_views", 4),
    ("cart_to_purchase_rate", "total_purchases", "total_carts", 4),
]
for kpi_name, numerator, denominator, digits in kpi_ratio_specs:
    # Guard the division: with spark.sql.ansi.enabled=true a zero denominator
    # raises a divide-by-zero error. when() without otherwise() yields NULL
    # for a zero/NULL denominator, which is what plain `/` returns with ANSI
    # mode off, so results are unchanged there.
    kpi_ratio = F.when(
        F.col(denominator) != 0,
        F.col(numerator) / F.col(denominator),
    )
    if digits is not None:
        kpi_ratio = F.round(kpi_ratio, digits)
    df_gold_nov = df_gold_nov.withColumn(kpi_name, kpi_ratio)

# Tag the row with the month it summarises.
df_gold_nov = df_gold_nov.withColumn("month", F.lit("2019-11"))

In [0]:
# Save the November KPI row as a Delta table at an explicit storage path,
# replacing whatever the table held before.
nov_table = "ecommerce_catalog.gold.ecommerce_clickstream_kpi_nov"
nov_location = f"{gold}/ecommerce_clickstream_kpi_nov"
(
    df_gold_nov.write.format("delta")
    .mode("overwrite")
    .option("path", nov_location)
    .saveAsTable(nov_table)
)

In [0]:
# Show the November KPI row in the notebook output for a visual check.
display(df_gold_nov)