# Delta Tables - GOLD LAYER

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Root abfss:// URIs of the two storage containers used by this notebook:
# `silver` is where the monthly input data is read from, `gold` is where the
# KPI tables are written.
silver = "abfss://silver@ecommerceprojectdl.dfs.core.windows.net"
gold = "abfss://gold@ecommerceprojectdl.dfs.core.windows.net"

In [0]:
# October locations: the silver folder that is loaded next, plus a gold target path.
# NOTE(review): gold_path is never read in the visible cells before it is
# reassigned for November; the October write builds its own path
# (".../ecommerce_eventtype_kpi") instead - confirm which one is intended.
silver_path = silver + "/month_Oct"
gold_path = gold + "/ecommerce_clickstream_kpi"

In [0]:
# Read the October silver-layer data (stored in Delta format) into a DataFrame.
df_oct = spark.read.load(silver_path, format="delta")

In [0]:
# Preview the October silver rows to confirm the schema before aggregating.
df_oct.display()

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,event_date,event_time_only,main_category,sub_category,item_category
2019-10-01T00:00:37.000Z,view,1701111,2053013553031414015,computers.peripherals.monitor,acer,514.79,547028884,3ea7c620-a8d7-45c5-9ced-2e9874e2f549,2019-10-01,00:00:37,computers,peripherals,monitor
2019-10-01T00:18:25.000Z,view,22700084,2053013556168753601,unknown,force,241.94,529394039,41fbb210-93e8-4823-bb78-d6dee4340d45,2019-10-01,00:18:25,unknown,unknown,unknown
2019-10-01T00:30:16.000Z,view,1307004,2053013558920217191,computers.notebook,lenovo,290.61,551508458,eb9a8d4b-da50-43c5-b2c2-21fe9487a175,2019-10-01,00:30:16,computers,notebook,unknown
2019-10-01T02:15:31.000Z,view,3600666,2053013563810775923,appliances.kitchen.washer,samsung,319.14,534491202,08def6d1-ae28-4365-a247-bd318312eaba,2019-10-01,02:15:31,appliances,kitchen,washer
2019-10-01T02:18:43.000Z,view,1004749,2053013555631882655,electronics.smartphone,samsung,203.35,530162018,133bc099-f8d0-40c2-8d91-b49a322aeae2,2019-10-01,02:18:43,electronics,smartphone,unknown
2019-10-01T02:18:49.000Z,view,1005073,2053013555631882655,electronics.smartphone,samsung,1207.71,512694696,c7e588c7-78a9-4033-9326-562f7ad76eda,2019-10-01,02:18:49,electronics,smartphone,unknown
2019-10-01T02:18:55.000Z,view,12709740,2053013553559896355,unknown,yokohama,49.04,515905287,6f7bec35-afd9-4d42-a767-e283355c9346,2019-10-01,02:18:55,unknown,unknown,unknown
2019-10-01T02:19:36.000Z,view,4804055,2053013554658804075,electronics.audio.headphone,apple,189.91,517129864,df8afa49-66e0-4e54-b9ca-4be6de5c9a0f,2019-10-01,02:19:36,electronics,audio,headphone
2019-10-01T02:19:46.000Z,view,1004210,2053013555631882655,electronics.smartphone,samsung,95.21,513272701,96802e73-0fbc-42b1-8cbf-ccb5208cfd5e,2019-10-01,02:19:46,electronics,smartphone,unknown
2019-10-01T02:21:18.000Z,view,1004750,2053013555631882655,electronics.smartphone,samsung,197.43,515553990,4312ec52-14bf-4478-a9cf-32801fe327f7,2019-10-01,02:21:18,electronics,smartphone,unknown


In [0]:
from pyspark.sql import functions as F

# October 2019 KPIs, produced as a single-row DataFrame.

# Row-level predicates for the three event types the KPIs are built on.
is_purchase = F.col("event_type") == "purchase"
is_view = F.col("event_type") == "view"
is_cart = F.col("event_type") == "cart"

# Step 1: collapse the whole month into one row of raw totals.
oct_totals = df_oct.agg(
    F.count("*").alias("total_events"),
    F.countDistinct("user_id").alias("total_users"),
    F.countDistinct("user_session").alias("total_sessions"),
    # 1/0 flags summed up give the number of events of each type.
    F.sum(F.when(is_purchase, 1).otherwise(0)).alias("total_purchases"),
    # No otherwise(): non-purchase rows evaluate to NULL, which avg() skips,
    # so only purchase prices contribute to the mean.
    F.avg(F.when(is_purchase, F.col("price"))).alias("avg_purchase_price"),
    F.sum(F.when(is_view, 1).otherwise(0)).alias("total_views"),
    F.sum(F.when(is_cart, 1).otherwise(0)).alias("total_carts"),
)

# Step 2: derive the ratio columns from the totals and tag the row with its month.
# - conversion_rate is purchase *events* per distinct session (not the share of
#   sessions that contain a purchase).
# - Only the two funnel ratios are rounded (4 decimals); the first two are left
#   at full precision.
df_gold_oct = (
    oct_totals
    .withColumn("conversion_rate", F.col("total_purchases") / F.col("total_sessions"))
    .withColumn("avg_events_per_session", F.col("total_events") / F.col("total_sessions"))
    .withColumn("view_to_cart_rate", F.round(F.col("total_carts") / F.col("total_views"), 4))
    .withColumn("cart_to_purchase_rate", F.round(F.col("total_purchases") / F.col("total_carts"), 4))
    .withColumn("month", F.lit("2019-10"))
)

In [0]:
# Show the single-row October KPI result.
display(df_gold_oct)

total_events,total_users,total_sessions,total_purchases,avg_purchase_price,total_views,total_carts,conversion_rate,avg_events_per_session,view_to_cart_rate,cart_to_purchase_rate,month
5996221,710032,1289469,104888,319.75067062009003,5767752,123581,0.081342009773015,4.650147463800991,0.0214,0.8487,2019-10


In [0]:
%sql
-- Ensure the gold schema exists in ecommerce_catalog before the KPI tables are
-- registered in it (no-op when it is already there).
CREATE SCHEMA IF NOT EXISTS ecommerce_catalog.gold;

In [0]:
# Persist the October KPI row as a Delta table: the data files go to the gold
# container path given below and the table is registered under
# ecommerce_catalog.gold. "overwrite" replaces whatever the table held before.
# NOTE(review): the target is named "ecommerce_eventtype_kpi", whereas the
# October gold_path defined earlier points at ".../ecommerce_clickstream_kpi"
# and the November table is "ecommerce_clickstream_kpi_nov" - confirm that this
# name is intentional before relying on it.
oct_kpi_writer = (
    df_gold_oct.write
    .format("delta")
    .mode("overwrite")
    .option("path", f"{gold}/ecommerce_eventtype_kpi")
)
oct_kpi_writer.saveAsTable("ecommerce_catalog.gold.ecommerce_eventtype_kpi")

In [0]:
# November locations. Note the input variable is called path_silver here
# (October used silver_path), and gold_path is reassigned, replacing the
# October value.
# NOTE(review): the November write cell builds the same gold folder string
# itself rather than reading gold_path.
path_silver = "{}/month_Nov".format(silver)
gold_path = "{}/ecommerce_clickstream_kpi_nov".format(gold)

In [0]:
# Read the November silver-layer data (stored in Delta format) into a DataFrame.
df_nov = spark.read.load(path_silver, format="delta")

In [0]:
from pyspark.sql import functions as F

# November 2019 KPIs, produced as a single-row DataFrame.

# Column handle shared by every event-type test below.
nov_event_type = F.col("event_type")

# Raw month-level totals: one aggregate row over all November events.
nov_totals = df_nov.agg(
    F.count("*").alias("total_events"),
    F.countDistinct("user_id").alias("total_users"),
    F.countDistinct("user_session").alias("total_sessions"),
    # Summing a 1/0 flag counts the events of the given type.
    F.sum(F.when(nov_event_type == "purchase", 1).otherwise(0)).alias("total_purchases"),
    # Without otherwise(), non-purchase rows are NULL and avg() ignores them.
    F.avg(F.when(nov_event_type == "purchase", F.col("price"))).alias("avg_purchase_price"),
    F.sum(F.when(nov_event_type == "view", 1).otherwise(0)).alias("total_views"),
    F.sum(F.when(nov_event_type == "cart", 1).otherwise(0)).alias("total_carts"),
)

# Keep every total and append the derived ratio columns plus the month label,
# in the same order as the October table.
# NOTE(review): these ratios divide raw event counts, so they are not bounded
# funnel rates - the displayed November result has cart_to_purchase_rate =
# 1.1936, i.e. more purchase events than cart events. conversion_rate is
# likewise purchase events per distinct session.
df_gold_nov = nov_totals.select(
    "*",
    (F.col("total_purchases") / F.col("total_sessions")).alias("conversion_rate"),
    (F.col("total_events") / F.col("total_sessions")).alias("avg_events_per_session"),
    F.round(F.col("total_carts") / F.col("total_views"), 4).alias("view_to_cart_rate"),
    F.round(F.col("total_purchases") / F.col("total_carts"), 4).alias("cart_to_purchase_rate"),
    F.lit("2019-11").alias("month"),
)

In [0]:
# Persist the November KPI row as a Delta table: data files go to the gold
# container folder below and the table is registered under
# ecommerce_catalog.gold. "overwrite" replaces any previous contents.
df_gold_nov.write.saveAsTable(
    "ecommerce_catalog.gold.ecommerce_clickstream_kpi_nov",
    format="delta",
    mode="overwrite",
    path=f"{gold}/ecommerce_clickstream_kpi_nov",
)

In [0]:
# Show the single-row November KPI result.
df_gold_nov.display()

total_events,total_users,total_sessions,total_purchases,avg_purchase_price,total_views,total_carts,conversion_rate,avg_events_per_session,view_to_cart_rate,cart_to_purchase_rate,month
5997478,748770,1331322,89797,299.07843680746566,5832448,75233,0.0674494975670799,4.5049041479071175,0.0129,1.1936,2019-11
