In [0]:
#BRONZE LAYER

df = spark.read.table("workspace.default.raw_customer_transactions")

In [0]:
%sql
use catalog `workspace`; select * from `default`.`raw_customer_transactions` limit 100;

transaction_id,customer_id,transaction_date,amount,product_category,region
T001,C1001,01/15/2023,150.75,Electronics,US
T002,C1002,01/16/2023,,Clothing,UK
T003,C1001,01/17/2023,-25.0,electronics,US
T004,C1003,01/18/2023,300.5,Books,
T005,C1002,01/18/2023,75.25,CLOTHING,UK
T006,C1004,01/19/2023,200.0,Books,US
T007,C1001,01/15/2023,150.75,Electronics,US
T008,C1005,01/20/2023,450.0,Home Appliances,US
T009,C1006,01/21/2023,89.99,electronics,UK
T010,C1003,01/22/2023,120.0,Books,US


In [0]:
#SILVER LAYER

from pyspark.sql.functions import lower, col, when, regexp_replace, to_date, avg

df = spark.read.table("workspace.default.raw_customer_transactions")

# df.printSchema()

#dropping duplicates
df = df.dropDuplicates()

#changing format of date and changing type from string to date
df = df.withColumn("transaction_date", to_date(regexp_replace(col("transaction_Date"), "-", "/"), "mm/dd/yyyy"))

#filling missing values of amount with average value based on its category, changing negative values to positive
df_avg = df.groupBy("product_category").agg(avg("amount").alias("avg_cat"))
df_joined = df.join(df_avg, "product_category", "inner")
df = df_joined.withColumn("amount", when(col("amount").isNull(), col("avg_cat"))
                           .when(col("amount")<0, -(col("amount")))
                           .otherwise(col("amount")))
df = df.drop("avg_cat")

#changing product category values to lower case
df = df.withColumn("product_category", lower(col("product_category")))

#filling missing values of region with unknown
df = df.withColumn("region", when(col("region").isNull(), "Unknown")
                    .otherwise(col("region")))

#saving dataframe to delta table and silver table is created
df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.cleaned_customer_transactions")


In [0]:
%sql
use catalog `workspace`; select * from `default`.`cleaned_customer_transactions` limit 100;

product_category,transaction_id,customer_id,transaction_date,amount,region
clothing,T041,C1006,2023-01-22,110.0,US
electronics,T007,C1001,2023-01-15,150.75,US
electronics,T017,C1001,2023-01-29,175.0,US
clothing,T002,C1002,2023-01-16,75.6875,UK
clothing,T034,C1009,2023-01-15,90.25,UK
home appliances,T015,C1009,2023-01-27,220.0,UK
clothing,T011,C1007,2023-01-23,10.5,Unknown
electronics,T033,C1008,2023-01-14,550.0,US
electronics,T001,C1001,2023-01-15,150.75,US
electronics,T046,C1001,2023-01-27,190.0,US


In [0]:
#GOLD LAYER

from pyspark.sql.functions import count, sum, max, min, collect_set

df = spark.read.table("workspace.default.cleaned_customer_transactions")

df_gold = df.groupBy("customer_id").agg(
    count("customer_id").alias("no_of_orders"),
    sum("amount").alias("total_amount"),
    max("amount").alias("max_amount"),
    min("amount").alias("min_amount"),
    count("transaction_id").alias("transactions"), 
    min("transaction_date").alias("first_purchased"),
    max("transaction_date").alias("last_purchased"),
    collect_set("product_category").alias("categories"),
    collect_set("region").alias("regions"),
)

df_gold.write.format("delta").mode("overwrite").saveAsTable("gold_customer_transactions")

In [0]:
%sql
use catalog `workspace`; select * from `default`.`gold_customer_transactions` limit 100;

customer_id,no_of_orders,total_amount,max_amount,min_amount,transactions,first_purchased,last_purchased,categories,regions
C1003,6,1621.25,340.0,120.0,6,2023-01-01,2023-01-31,List(books),"List(US, Unknown)"
C1010,4,509.95,210.0,50.0,4,2023-01-06,2023-01-28,"List(electronics, clothing, books)",List(US)
C1005,5,2330.0,600.0,380.0,5,2023-01-01,2023-01-21,"List(home appliances, electronics)","List(US, Unknown, UK)"
C1002,7,489.2375,85.0,45.0,7,2023-01-08,2023-01-30,List(clothing),List(UK)
C1007,4,230.75,95.25,10.5,4,2023-01-03,2023-01-23,"List(clothing, electronics, books)","List(US, UK, Unknown)"
C1006,4,290.49,110.0,15.0,4,2023-01-02,2023-01-22,"List(clothing, electronics)","List(US, UK)"
C1004,5,1059.0227272727273,267.5,125.0,5,2023-01-02,2023-01-26,"List(home appliances, books)",List(US)
C1009,4,510.75,220.0,20.0,4,2023-01-05,2023-01-27,"List(clothing, home appliances, books)",List(UK)
C1001,7,1058.0,200.75,25.0,7,2023-01-07,2023-01-29,List(electronics),List(US)
C1008,4,2040.25,750.25,320.0,4,2023-01-04,2023-01-25,"List(books, electronics)",List(US)
