In [0]:
# Connection settings for the source (silver) and sink (gold) containers in Blob Storage

storage_name = "retailsalesblob"
source_container = "silver"
sink_container = "gold"

# SECURITY: the SAS token is a credential and must not live in notebook source
# (it ends up in version control, exports, and revision history). It is read at
# run time from a Databricks secret scope instead.
# NOTE(review): the scope/key names below are placeholders -- provision them
# (or point these constants at an existing scope) before running. Any token that
# was previously committed in this cell should be treated as leaked and revoked.
SECRET_SCOPE = "retail-sales"
SAS_TOKEN_KEY = "retailsalesblob-sas-token"
SAS_TOKEN = dbutils.secrets.get(scope=SECRET_SCOPE, key=SAS_TOKEN_KEY)

# Per-container Hadoop configuration entries that hand the SAS token to the
# wasbs driver; passed as `extra_configs` when the containers are mounted.
config_silver = {
    f"fs.azure.sas.{source_container}.{storage_name}.blob.core.windows.net": SAS_TOKEN
}
config_gold = {
    f"fs.azure.sas.{sink_container}.{storage_name}.blob.core.windows.net": SAS_TOKEN
}

# wasbs:// URIs of the two containers
source_uri = f"wasbs://{source_container}@{storage_name}.blob.core.windows.net/"
sink_uri = f"wasbs://{sink_container}@{storage_name}.blob.core.windows.net/"

In [0]:
# Mounting the silver and gold containers
#
# dbutils.fs.mount raises when the mount point is already in use, which made this
# cell fail on every re-run. Mount points that dbutils.fs.mounts() already
# reports are therefore skipped, so the cell can be executed repeatedly.
# To pick up a rotated SAS token, unmount the mount point first and re-run.

existing_mount_points = {mount.mountPoint for mount in dbutils.fs.mounts()}

for mount_source, mount_point, mount_config in (
    (source_uri, "/mnt/silver", config_silver),
    (sink_uri, "/mnt/gold", config_gold),
):
    if mount_point in existing_mount_points:
        continue
    dbutils.fs.mount(
        source=mount_source,
        mount_point=mount_point,
        extra_configs=mount_config,
    )

True

In [0]:
# Read the silver-layer CSV extract into a DataFrame.
# The first row is treated as the header; no schema is supplied or inferred here.

file_name = "part-00000-ddc6dba8-d570-4966-8d76-663a373fa651-c000.csv"

df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("/mnt/silver/" + file_name)
)
df.display()

Transaction ID,Date,Customer ID,Gender,Age,Product Category,Quantity,Price per Unit,Total Amount,CustDC,GenderDC
1,2023-11-24,CUST001,Male,34,Beauty,3,50,150,1,male
2,2023-02-27,CUST002,Female,26,Clothing,2,500,1000,2,female
3,2023-01-13,CUST003,Male,50,Electronics,1,30,30,3,male
4,2023-05-21,CUST004,Male,37,Clothing,1,500,500,4,male
5,2023-05-06,CUST005,Male,30,Beauty,2,50,100,5,male
6,2023-04-25,CUST006,Female,45,Beauty,1,30,30,6,female
7,2023-03-13,CUST007,Male,46,Clothing,2,25,50,7,male
8,2023-02-22,CUST008,Male,30,Electronics,4,25,100,8,male
9,2023-12-13,CUST009,Male,63,Electronics,2,300,600,9,male
10,2023-10-07,CUST010,Female,52,Clothing,4,50,200,10,female


In [0]:
# Swap the spaces in column names for underscores to avoid Delta write issues

for old_name, new_name in [
    ("Transaction ID", "Transaction_ID"),
    ("Customer ID", "Customer_ID"),
    ("Product Category", "Product_Category"),
    ("Price per Unit", "Price_per_Unit"),
    ("Total Amount", "Total_Amount"),
]:
    df = df.withColumnRenamed(old_name, new_name)

df.display()

Transaction_ID,Date,Customer_ID,Gender,Age,Product_Category,Quantity,Price_per_Unit,Total_Amount,CustDC,GenderDC
1,2023-11-24,CUST001,Male,34,Beauty,3,50,150,1,male
2,2023-02-27,CUST002,Female,26,Clothing,2,500,1000,2,female
3,2023-01-13,CUST003,Male,50,Electronics,1,30,30,3,male
4,2023-05-21,CUST004,Male,37,Clothing,1,500,500,4,male
5,2023-05-06,CUST005,Male,30,Beauty,2,50,100,5,male
6,2023-04-25,CUST006,Female,45,Beauty,1,30,30,6,female
7,2023-03-13,CUST007,Male,46,Clothing,2,25,50,7,male
8,2023-02-22,CUST008,Male,30,Electronics,4,25,100,8,male
9,2023-12-13,CUST009,Male,63,Electronics,2,300,600,9,male
10,2023-10-07,CUST010,Female,52,Clothing,4,50,200,10,female


In [0]:
# Import statements for the transformations

from pyspark.sql.functions import datediff, min, max, to_date, col, round, to_date, date_format, sum as _sum, avg, countDistinct, when, count, current_date, lit, date_sub
from pyspark.sql.window import Window

In [0]:
# Monthly revenue per product category

# Parse the transaction date, then derive a "yyyy-MM" bucket to group on
df = (
    df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))
      .withColumn("YearMonth", date_format(col("Date"), "yyyy-MM"))
)

# Revenue summed per (month, category) and rounded to 2 decimals
# (`round` here is the Spark SQL function imported above, not the builtin)
revenue_per_group = round(_sum(col("Total_Amount")), 2).alias("Monthly_Revenue")

monthly_revenue = (
    df.groupBy("YearMonth", "Product_Category")
      .agg(revenue_per_group)
      # oldest month first; within a month, highest revenue first
      .orderBy(col("YearMonth").asc(), col("Monthly_Revenue").desc())
)

monthly_revenue.display()

monthly_revenue.write.format("delta").mode("overwrite").save("/mnt/gold/monthly_revenue")


YearMonth,Product_Category,Monthly_Revenue
2023-01,Clothing,13125.0
2023-01,Beauty,12430.0
2023-01,Electronics,9895.0
2023-02,Electronics,15465.0
2023-02,Clothing,14560.0
2023-02,Beauty,14035.0
2023-03,Clothing,15065.0
2023-03,Beauty,10545.0
2023-03,Electronics,3380.0
2023-04,Clothing,13940.0


In [0]:
# High-value customers: customers whose summed spend exceeds a threshold

spend_threshold = 900  # minimum Total_Spent (exclusive) to qualify

# Only rows with a positive amount contribute to the per-customer totals
positive_sales = df.filter(col("Total_Amount") > 0)

high_value_cust = (
    positive_sales
    .groupBy("CustDC")
    .agg(
        countDistinct("Transaction_ID").alias("Total_Transactions"),
        _sum("Total_Amount").alias("Total_Spent"),
        avg("Total_Amount").alias("Avg_Spend_Per_Txn"),
    )
    .filter(col("Total_Spent") > spend_threshold)
    .orderBy(col("Total_Spent").desc())
)

high_value_cust.display()

high_value_cust.write.format("delta").mode("overwrite").save("/mnt/gold/high_value_customers")


CustDC,Total_Transactions,Total_Spent,Avg_Spend_Per_Txn
447,1,2000.0,2000.0
124,1,2000.0,2000.0
577,1,2000.0,2000.0
743,1,2000.0,2000.0
15,1,2000.0,2000.0
700,1,2000.0,2000.0
155,1,2000.0,2000.0
595,1,2000.0,2000.0
626,1,2000.0,2000.0
970,1,2000.0,2000.0


In [0]:
# Customer Demographics Heatmap: transaction count and revenue per (age group, gender)

# Bucket ages into decades. The conditions are evaluated in order and the first
# match wins.
# NOTE(review): a row whose Age is NULL (or not numeric) matches none of the
# conditions and therefore lands in the "70+" bucket -- confirm that is acceptable.
df = df.withColumn(
    "Age_Group",
    when(col("Age") < 10, "<10")
    .when((col("Age") >= 10) & (col("Age") < 20), "10-19")
    .when((col("Age") >= 20) & (col("Age") < 30), "20-29")
    .when((col("Age") >= 30) & (col("Age") < 40), "30-39")
    .when((col("Age") >= 40) & (col("Age") < 50), "40-49")
    .when((col("Age") >= 50) & (col("Age") < 60), "50-59")
    .when((col("Age") >= 60) & (col("Age") < 70), "60-69")
    .otherwise("70+")
)

demo_heatmap = (
    df.groupBy("Age_Group", "GenderDC")
      .agg(
          # Count transactions by their ID, consistent with the high-value
          # customer aggregation. A distinct count over "*" compares whole rows
          # instead, so it depends on every column of df and leaves out any row
          # that has a NULL in some column.
          countDistinct("Transaction_ID").alias("Total_Transactions"),
          _sum("Total_Amount").alias("Total_Revenue")
      )
      .orderBy(col("Total_Revenue").desc())
)

demo_heatmap.display()

demo_heatmap.write.format("delta").mode("overwrite").save("/mnt/gold/demo_heatmap")


Age_Group,GenderDC,Total_Transactions,Total_Revenue
50-59,female,112,50675.0
30-39,female,93,49245.0
20-29,male,104,48820.0
40-49,female,121,48305.0
20-29,female,105,48250.0
50-59,male,109,47665.0
30-39,male,98,47080.0
40-49,male,101,45060.0
60-69,male,60,23725.0
60-69,female,55,21090.0


In [0]:
# Top 3 product categories by quantity sold

# Number of categories to keep. The cell is specified as a "Top 3" report, so the
# limit is tied to that intent through a named constant instead of a bare literal.
TOP_N_PRODUCTS = 3

top_products = (
    df.groupBy("Product_Category")
      .agg(_sum("Quantity").alias("Total_Units_Sold"))
      .orderBy(col("Total_Units_Sold").desc())  # best sellers first
      .limit(TOP_N_PRODUCTS)
)

top_products.display()

top_products.write.format("delta").mode("overwrite").save("/mnt/gold/top_products")


Product_Category,Total_Units_Sold
Clothing,894.0
Electronics,849.0
Beauty,771.0


In [0]:
# Gender Revenue Share %: each gender's revenue as a percentage of total revenue

from pyspark.sql.functions import sum as _sum, round, col
from pyspark.sql import Window

# Revenue per gender
gender_rev = df.groupBy("GenderDC").agg(_sum("Total_Amount").alias("Revenue"))

# Window with no partitioning whose frame spans every row, so a sum over it
# yields the grand total on each row
window_all = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
total_revenue = _sum("Revenue").over(window_all)

# Percentage share, rounded to 2 decimals
share_pct = round(col("Revenue") * 100 / total_revenue, 2)
gender_share = gender_rev.withColumn("Revenue_Percentage", share_pct)

gender_share.display()

gender_share.write.format("delta").mode("overwrite").save("/mnt/gold/gender_revenue_share")


GenderDC,Revenue,Revenue_Percentage
female,232840.0,51.06
male,223160.0,48.94


In [0]:
# Recent customer activity: transactions dated within the 30 days leading up to
# a fixed reference date. Note that this keeps the rows that DO have a purchase
# in that window (it does not list inactive customers).

from pyspark.sql.functions import to_date, col, lit, date_sub

df = df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

# Reference date is hard-coded; the window runs from 30 days before it up to it
latest_date = to_date(lit("2023-12-31"), "yyyy-MM-dd")
start_date = date_sub(latest_date, 30)

# Both window bounds are inclusive
in_window = col("Date").between(start_date, latest_date)
recent_customers = df.filter(in_window)

recent_customers.display()

recent_customers.write.format("delta").mode("overwrite").save("/mnt/gold/recent_customers")


Transaction_ID,Date,Customer_ID,Gender,Age,Product_Category,Quantity,Price_per_Unit,Total_Amount,CustDC,GenderDC,YearMonth,Age_Group
9,2023-12-13,CUST009,Male,63,Electronics,2,300,600,9,male,2023-12,60-69
25,2023-12-26,CUST025,Female,64,Beauty,1,50,50,25,female,2023-12,60-69
34,2023-12-24,CUST034,Female,51,Clothing,3,50,150,34,female,2023-12,50-59
62,2023-12-27,CUST062,Male,18,Beauty,2,50,100,62,male,2023-12,10-19
65,2023-12-05,CUST065,Male,51,Electronics,4,500,2000,65,male,2023-12,50-59
80,2023-12-10,CUST080,Female,64,Clothing,2,30,60,80,female,2023-12,60-69
82,2023-12-26,CUST082,Female,32,Beauty,4,50,200,82,female,2023-12,30-39
83,2023-12-16,CUST083,Male,54,Electronics,2,50,100,83,male,2023-12,50-59
96,2023-12-19,CUST096,Female,44,Clothing,2,300,600,96,female,2023-12,40-49
99,2023-12-17,CUST099,Female,50,Electronics,4,300,1200,99,female,2023-12,50-59


In [0]:
# Read the monthly revenue Delta table back from the gold mount to check the upload

gold_monthly_revenue_path = "/mnt/gold/monthly_revenue"

monthly_revenue = spark.read.format("delta").load(gold_monthly_revenue_path)
monthly_revenue.display()

YearMonth,Product_Category,Monthly_Revenue
2023-01,Clothing,13125.0
2023-01,Beauty,12430.0
2023-01,Electronics,9895.0
2023-02,Electronics,15465.0
2023-02,Clothing,14560.0
2023-02,Beauty,14035.0
2023-03,Clothing,15065.0
2023-03,Beauty,10545.0
2023-03,Electronics,3380.0
2023-04,Clothing,13940.0
