In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
#Configuring Service Principal 

STORAGE_ACCOUNT_NAME = "adlsshivendra"
ADLS_CONTAINER_NAME = "raw"
CLIENT_ID = "4069d864-2ea2-4354-a634-71a8488be89e"
TENANT_ID = "4ac50105-0c66-404e-a107-7cbd8a9a6442"
CLIENT_SECRET = "o.O8Q~ARpL1hfZzcjC_yJZTPhhc-jTCDlI_BRdif"


#Setting the Spark configuration for this session
spark.conf.set(f"fs.azure.account.auth.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", CLIENT_ID)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", CLIENT_SECRET)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token")
print("Spark config set for Service Principal.")



Spark config set for Service Principal.


Bronze to Silver Transformations

In [0]:
#Loading all Bronze tables

df_orders = spark.table("bronze_orders")
df_customers = spark.table("bronze_customers")
df_products = spark.table("bronze_products")
df_stores = spark.table("bronze_stores")
df_payments = spark.table("bronze_payments")
df_inventory = spark.table("bronze_inventory")

In [0]:
# Checking Schema and Null in Orders Dataframe
df_orders.printSchema()
df_orders.select([
    sum(col(c).isNull().cast("int")).alias(c + "_nulls")
    for c in df_orders.columns
]).show()


root
 |-- OrderID: integer (nullable = true)
 |-- OrderDateTime: timestamp (nullable = true)
 |-- StoreID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- DiscountPct: integer (nullable = true)
 |-- PaymentID: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- Channel: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- OrderTotal: double (nullable = true)
 |-- Currency: string (nullable = true)

+-------------+-------------------+-------------+----------------+---------------+--------------+---------------+-----------------+---------------+-------------------+-------------+------------+----------------+--------------+
|OrderID_nulls|OrderDateTime_nulls|StoreID_nulls|CustomerID_nulls|ProductID_nulls|Quantity_nulls|UnitPrice_nulls|DiscountPct_nulls|PaymentID_nulls|PaymentMethod_nulls|Channel_nulls|St

In [0]:
# Checking Schema and Null in Customers Dataframe
df_customers.printSchema()
df_customers.select([
    sum(col(c).isNull().cast("int")).alias(c + "_nulls")
    for c in df_customers.columns
]).show()

root
 |-- CustomerID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: long (nullable = true)
 |-- LoyaltyPoints: integer (nullable = true)
 |-- Tier: string (nullable = true)
 |-- SignupDate: date (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- PostalCode: integer (nullable = true)

+----------------+---------------+--------------+-----------+-----------+-------------------+----------+----------------+----------+-----------+----------------+
|CustomerID_nulls|FirstName_nulls|LastName_nulls|Email_nulls|Phone_nulls|LoyaltyPoints_nulls|Tier_nulls|SignupDate_nulls|City_nulls|State_nulls|PostalCode_nulls|
+----------------+---------------+--------------+-----------+-----------+-------------------+----------+----------------+----------+-----------+----------------+
|               0|              0|             0|          0|          0| 

In [0]:
# Checking Schema and Null in Products Dataframe
df_products.printSchema()
df_products.select([
    sum(col(c).isNull().cast("int")).alias(c + "_nulls")
    for c in df_products.columns
]).show()

root
 |-- ProductID: integer (nullable = true)
 |-- SKU: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Subcategory: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Cost: double (nullable = true)
 |-- Supplier: string (nullable = true)
 |-- Discontinued: boolean (nullable = true)

+---------------+---------+-----------------+--------------+-----------------+---------------+----------+--------------+------------------+
|ProductID_nulls|SKU_nulls|ProductName_nulls|Category_nulls|Subcategory_nulls|UnitPrice_nulls|Cost_nulls|Supplier_nulls|Discontinued_nulls|
+---------------+---------+-----------------+--------------+-----------------+---------------+----------+--------------+------------------+
|              0|        0|                0|             0|                0|              0|         0|             0|                 0|
+---------------+---------+-----------------+--------------+---------

In [0]:
# Checking Schema and Null in stores Dataframe
df_stores.printSchema()
df_stores.select([
    sum(col(c).isNull().cast("int")).alias(c + "_nulls")
    for c in df_stores.columns
]).show()

root
 |-- StoreID: integer (nullable = true)
 |-- StoreName: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- OpenDate: date (nullable = true)
 |-- SquareFeet: integer (nullable = true)

+-------------+---------------+------------+----------+-----------+--------------+----------------+
|StoreID_nulls|StoreName_nulls|Region_nulls|City_nulls|State_nulls|OpenDate_nulls|SquareFeet_nulls|
+-------------+---------------+------------+----------+-----------+--------------+----------------+
|            0|              0|           0|         0|          0|             0|               0|
+-------------+---------------+------------+----------+-----------+--------------+----------------+



In [0]:
# Checking Schema and Null in Inventory Dataframe
df_inventory.printSchema()
df_inventory.select([
    sum(col(c).isNull().cast("int")).alias(c + "_nulls")
    for c in df_inventory.columns
]).show()

root
 |-- SnapshotDate: date (nullable = true)
 |-- StoreID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- OnHandQty: integer (nullable = true)
 |-- ReorderPoint: integer (nullable = true)

+------------------+-------------+---------------+---------------+------------------+
|SnapshotDate_nulls|StoreID_nulls|ProductID_nulls|OnHandQty_nulls|ReorderPoint_nulls|
+------------------+-------------+---------------+---------------+------------------+
|                 0|            0|              0|              0|                 0|
+------------------+-------------+---------------+---------------+------------------+



In [0]:
# Checking Schema and Null in Payments Dataframe
df_payments.printSchema()
df_payments.select([
    sum(col(c).isNull().cast("int")).alias(c + "_nulls")
    for c in df_payments.columns
]).show()


root
 |-- PaymentID: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Method: string (nullable = true)
 |-- Gateway: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- AuthCode: string (nullable = true)
 |-- FraudScore: double (nullable = true)

+---------------+-------------+------------+--------------+------------+-------------+------------+--------------+----------------+
|PaymentID_nulls|OrderID_nulls|Amount_nulls|Currency_nulls|Method_nulls|Gateway_nulls|Status_nulls|AuthCode_nulls|FraudScore_nulls|
+---------------+-------------+------------+--------------+------------+-------------+------------+--------------+----------------+
|              0|            0|           0|             0|           0|            0|           0|             0|               0|
+---------------+-------------+------------+--------------+------------+-------------+------------+-----------

In [0]:
# Masking Email and Phone no. of customers
df_silver_customers = df_customers \
    .withColumn("Email", regexp_replace(col("Email"), r"^(.*)@", "XXXX@")) \
    .withColumn("Phone", regexp_replace(col("Phone").cast("string"), r"\d(?=\d{4})", "X"))

df_silver_customers.show(15, truncate=False)

#ADLS silver path for customer table
silver_customers_path = f"abfss://silver@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/silver_customers"

# Saving the silver customers table into ADLS silver
df_silver_customers.write.mode("overwrite") \
    .format("delta") \
    .option("path", silver_customers_path) \
    .saveAsTable("silver_customers")



+----------+---------+--------+----------------+------------+-------------+------+----------+-----------+-----+----------+
|CustomerID|FirstName|LastName|Email           |Phone       |LoyaltyPoints|Tier  |SignupDate|City       |State|PostalCode|
+----------+---------+--------+----------------+------------+-------------+------+----------+-----------+-----+----------+
|1         |Vivaan   |Singh   |XXXX@example.com|XXXXXXXX7923|547          |Silver|2023-12-29|Nagpur     |IN   |432043    |
|2         |Pooja    |Verma   |XXXX@example.com|XXXXXXXX0525|559          |Bronze|2021-05-21|Mumbai     |IN   |761820    |
|3         |Advika   |Patel   |XXXX@example.com|XXXXXXXX3256|456          |Bronze|2021-07-05|Ahmedabad  |IN   |203225    |
|4         |Ananya   |Chettri |XXXX@example.com|XXXXXXXX6825|490          |Silver|2022-09-06|Jaipur     |IN   |130353    |
|5         |Vihaan   |Singh   |XXXX@example.com|XXXXXXXX5019|336          |Bronze|2021-04-25|Bhopal     |IN   |475876    |
|6         |Riya

In [0]:
# 1. Creating silver_stores and saving it to ADLS silver
df_silver_stores = df_stores

#ADLS silver path for stores table
silver_stores_path = f"abfss://silver@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/silver_stores"
df_silver_stores.write.mode("overwrite") \
    .format("delta") \
    .option("path", silver_stores_path) \
    .saveAsTable("silver_stores")

# 2. Creating silver_products and saving it to ADLS silver
df_silver_products = df_products

#ADLS silver path for products table
silver_products_path = f"abfss://silver@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/silver_products"
df_silver_products.write.mode("overwrite") \
    .format("delta") \
    .option("path", silver_products_path) \
    .saveAsTable("silver_products")

# 3. Creating silver_payments and saving it to ADLS silver
df_silver_payments = df_payments

#ADLS silver path for products table
silver_payments_path = f"abfss://silver@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/silver_payments"
df_silver_products.write.mode("overwrite") \
    .format("delta") \
    .option("path", silver_payments_path) \
    .saveAsTable("silver_payments")


In [0]:
# Creating Enriched Fact Table (Silver)

# Cleansing and rename the base fact table 
df_orders_renamed = df_orders \
    .withColumn("DiscountPct", coalesce(col("DiscountPct"), lit(0))) \
    .withColumnRenamed("UnitPrice", "SaleUnitPrice") \
    .withColumnRenamed("Status", "OrderStatus") \
    .withColumnRenamed("Currency", "OrderCurrency")

# Rename conflicting columns on ALL dimension tables
df_stores_renamed = df_stores \
    .withColumnRenamed("City", "StoreCity") \
    .withColumnRenamed("State", "StoreState")
    
df_customers_renamed = df_customers \
    .withColumnRenamed("City", "CustomerCity") \
    .withColumnRenamed("State", "CustomerState")
    
df_products_renamed = df_products \
    .withColumnRenamed("UnitPrice", "ProductListPrice")
    
df_payments_renamed = df_payments \
    .withColumnRenamed("Status", "PaymentStatus") \
    .withColumnRenamed("Currency", "PaymentCurrency")

# Joining all the renamed tables
df_silver_fact_orders = df_orders_renamed \
    .join(df_payments_renamed, ["OrderID", "PaymentID"]) \
    .join(df_stores_renamed, "StoreID") \
    .join(df_products_renamed, "ProductID") \
    .join(df_customers_renamed, "CustomerID")

# Defining the ADLS path
silver_fact_path = f"abfss://silver@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/silver_fact_orders"


# Writing to ADLS Silver
df_silver_fact_orders.write.mode("overwrite") \
    .format("delta") \
    .option("path", silver_fact_path) \
    .saveAsTable("silver_fact_orders")

In [0]:
df_silver_fact_orders.show(10,truncate=False)

+----------+---------+-------+-------+-----------+-------------------+--------+-------------+-----------+-------------+-------+-----------+----------+-------------+------+---------------+------+--------+-------------+--------+----------+---------+-------+---------+----------+----------+----------+--------+-----------------------+--------+------------+----------------+-----+------------+------------+---------+--------+-----------------------------+------------+-------------+------+----------+------------+-------------+----------+
|CustomerID|ProductID|StoreID|OrderID|PaymentID  |OrderDateTime      |Quantity|SaleUnitPrice|DiscountPct|PaymentMethod|Channel|OrderStatus|OrderTotal|OrderCurrency|Amount|PaymentCurrency|Method|Gateway |PaymentStatus|AuthCode|FraudScore|StoreName|Region |StoreCity|StoreState|OpenDate  |SquareFeet|SKU     |ProductName            |Category|Subcategory |ProductListPrice|Cost |Supplier    |Discontinued|FirstName|LastName|Email                        |Phone       |L

Gold Aggregations

In [0]:
# 1. Daily Sales Summary
df_silver_base = spark.table("silver_fact_orders")
df_gold_daily_sales = df_silver_base \
    .withColumn("OrderDate", date_format(col("OrderDateTime"), "yyyy-MM-dd")) \
    .groupBy("OrderDate", "StoreName", "Region", "Category") \
    .agg(
        round(sum("OrderTotal"),2).alias("TotalRevenue"),
        sum("Quantity").alias("TotalQuantity")
    )

df_gold_daily_sales.show(10,truncate=False)

# Defining path and saving to Gold container
gold_path = f"abfss://gold@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/gold_daily_sales_summary"
spark.sql("DROP TABLE IF EXISTS gold_daily_sales_summary")
df_gold_daily_sales.write.mode("overwrite") \
    .format("delta") \
    .option("path", gold_path) \
    .saveAsTable("gold_daily_sales_summary")


+----------+---------+-------+-----------+------------+-------------+
|OrderDate |StoreName|Region |Category   |TotalRevenue|TotalQuantity|
+----------+---------+-------+-----------+------------+-------------+
|2025-05-06|Store-006|North  |Electronics|271.72      |4            |
|2025-06-17|Store-010|West   |Grocery    |446.33      |7            |
|2025-06-13|Store-011|North  |Electronics|1557.32     |46           |
|2025-04-07|Store-010|West   |Home       |446.53      |9            |
|2025-07-05|Store-005|West   |Apparel    |386.69      |7            |
|2025-07-08|Store-008|Central|Electronics|404.41      |6            |
|2025-07-12|Store-008|Central|Health     |156.2       |3            |
|2025-06-12|Store-012|West   |Electronics|366.42      |5            |
|2025-07-07|Store-001|North  |Home       |250.95      |4            |
|2025-05-06|Store-002|East   |Apparel    |129.96      |3            |
+----------+---------+-------+-----------+------------+-------------+
only showing top 10 

In [0]:
# 2. Monthly Product Sales 

df_gold_product_sales = df_silver_base \
    .withColumn("OrderYear", year(col("OrderDateTime"))) \
    .withColumn("OrderMonth", month(col("OrderDateTime"))) \
    .groupBy("OrderYear", "OrderMonth", "ProductID", "ProductName", "Category", "Subcategory") \
    .agg(
        sum("Quantity").alias("TotalQuantitySold"),
        round(sum("OrderTotal"),2).alias("TotalRevenue")
    )

df_gold_product_sales.show(10,truncate=False)

# Defining path and saving to Gold container
gold_path = f"abfss://gold@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/gold_monthly_product_sales"
spark.sql("DROP TABLE IF EXISTS gold_monthly_product_sales")
df_gold_product_sales.write.mode("overwrite") \
    .format("delta") \
    .option("path", gold_path) \
    .saveAsTable("gold_monthly_product_sales")


+---------+----------+---------+---------------------------+-----------+------------+-----------------+------------+
|OrderYear|OrderMonth|ProductID|ProductName                |Category   |Subcategory |TotalQuantitySold|TotalRevenue|
+---------+----------+---------+---------------------------+-----------+------------+-----------------+------------+
|2025     |5         |143      |Electronics-Audio-143      |Electronics|Audio       |35               |2990.0      |
|2025     |8         |229      |Health-OTC-229             |Health     |OTC         |20               |882.0       |
|2025     |8         |200      |Grocery-Beverages-200      |Grocery    |Beverages   |24               |1935.2      |
|2025     |5         |68       |Electronics-Audio-068      |Electronics|Audio       |21               |1435.01     |
|2025     |7         |184      |Health-PersonalCare-184    |Health     |PersonalCare|32               |1992.15     |
|2025     |6         |211      |Home-Kitchen-211           |Home

In [0]:
# 3. Sales by Channel & Payment 

df_gold_channel_payment = df_silver_base \
    .groupBy("Channel", "PaymentMethod") \
    .agg(
        count("OrderID").alias("OrderCount"),
        round(sum("OrderTotal"),2).alias("TotalRevenue")
    )

df_gold_channel_payment.show(truncate=False)

# Defining path and saving to Gold container
gold_path = f"abfss://gold@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/gold_sales_by_channel_payment"
spark.sql("DROP TABLE IF EXISTS gold_sales_by_channel_payment")
df_gold_channel_payment.write.mode("overwrite") \
    .format("delta") \
    .option("path", gold_path) \
    .saveAsTable("gold_sales_by_channel_payment")


+-------+-------------+----------+------------+
|Channel|PaymentMethod|OrderCount|TotalRevenue|
+-------+-------------+----------+------------+
|POS    |CARD         |10608     |898448.09   |
|POS    |UPI          |6286      |519853.15   |
|Online |WALLET       |937       |77980.79    |
|Online |CARD         |4492      |393439.34   |
|Online |UPI          |2635      |224738.6    |
|Online |CASH         |911       |72565.02    |
|POS    |CASH         |2042      |180323.42   |
|POS    |WALLET       |2089      |174415.62   |
+-------+-------------+----------+------------+



In [0]:
# 4. Customer Tier Analysis 

df_gold_customer_tier = df_silver_base \
    .groupBy("Tier") \
    .agg(
        count("OrderID").alias("OrderCount"),
        round(sum("OrderTotal"),2).alias("TotalRevenue"),
        round(avg("OrderTotal"),2).alias("AverageOrderValue")
    )

df_gold_customer_tier.show(truncate=False)

# Defining path and saving to Gold container
gold_path = f"abfss://gold@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/gold_customer_tier_analysis"
spark.sql("DROP TABLE IF EXISTS gold_customer_tier_analysis")
df_gold_customer_tier.write.mode("overwrite") \
    .format("delta") \
    .option("path", gold_path) \
    .saveAsTable("gold_customer_tier_analysis")


+--------+----------+------------+-----------------+
|Tier    |OrderCount|TotalRevenue|AverageOrderValue|
+--------+----------+------------+-----------------+
|Platinum|1426      |120278.03   |84.35            |
|Silver  |8706      |738069.96   |84.78            |
|Gold    |4384      |379517.32   |86.57            |
|Bronze  |15484     |1303898.72  |84.21            |
+--------+----------+------------+-----------------+



In [0]:
#  5. Daily Fraud Summary 

df_gold_fraud_summary = df_silver_base \
    .withColumn("OrderDate", date_format(col("OrderDateTime"), "yyyy-MM-dd")) \
    .groupBy("OrderDate", "StoreName") \
    .agg(
        count("OrderID").alias("TotalOrders"),
        round(avg("FraudScore"),5).alias("AverageFraudScore"),
        sum(when(col("FraudScore") > 0.75, 1).otherwise(0)).alias("HighFraudOrderCount")
    )

df_gold_fraud_summary.desc("AverageFraudScore")show(truncate=False)

# Defining path and saving to Gold container
gold_path = f"abfss://gold@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/gold_daily_fraud_summary"
spark.sql("DROP TABLE IF EXISTS gold_daily_fraud_summary")
df_gold_fraud_summary.write.mode("overwrite") \
    .format("delta") \
    .option("path", gold_path) \
    .saveAsTable("gold_daily_fraud_summary")


+----------+---------+-----------+-----------------+-------------------+
|OrderDate |StoreName|TotalOrders|AverageFraudScore|HighFraudOrderCount|
+----------+---------+-----------+-----------------+-------------------+
|2025-07-24|Store-012|18         |0.1265           |0                  |
|2025-04-18|Store-007|22         |0.13127          |0                  |
|2025-07-27|Store-002|16         |0.10956          |0                  |
|2025-05-03|Store-002|20         |0.0972           |0                  |
|2025-07-05|Store-001|23         |0.10322          |0                  |
|2025-04-28|Store-009|21         |0.11524          |0                  |
|2025-06-30|Store-002|17         |0.11382          |0                  |
|2025-07-16|Store-010|18         |0.09789          |0                  |
|2025-06-15|Store-011|10         |0.0925           |0                  |
|2025-08-17|Store-003|15         |0.09613          |0                  |
|2025-05-18|Store-010|21         |0.13652          

In [0]:
# Creating Initial Inventory Table (for Streaming)

# Read from bronze_inventory 
df_inventory_snapshot = df_inventory

# Find the latest snapshot for each item
windowSpec = Window.partitionBy("StoreID", "ProductID").orderBy(desc("SnapshotDate"))

df_latest_inventory = df_inventory_snapshot \
    .withColumn("row_num", row_number().over(windowSpec)) \
    .filter("row_num = 1") \
    .drop("row_num", "SnapshotDate") # We only need the current quantity

# 3. Defining the ADLS path
gold_inventory_path = f"abfss://gold@{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net/gold_current_inventory"

# 4. Drop old registration and save
print("Dropping old table registration (if any)...")
spark.sql("DROP TABLE IF EXISTS gold_current_inventory")

print(f"Writing to new location: {gold_inventory_path}")
df_latest_inventory.write.mode("overwrite") \
    .format("delta") \
    .option("path", gold_inventory_path) \
    .saveAsTable("gold_current_inventory")

print("`gold_current_inventory` table created successfully.")
display(df_latest_inventory)

Dropping old table registration (if any)...
Writing to new location: abfss://gold@adlsshivendra.dfs.core.windows.net/gold_current_inventory
`gold_current_inventory` table created successfully.


StoreID,ProductID,OnHandQty,ReorderPoint
1,1,50,40
1,2,158,26
1,3,154,34
1,4,121,30
1,5,114,36
1,6,139,44
1,7,139,35
1,8,174,31
1,9,139,33
1,10,155,49


In [0]:
%sql
SELECT * FROM gold_current_inventory
where ProductID = 1
order by StoreID;

StoreID,ProductID,OnHandQty,ReorderPoint
1,1,50,40
2,1,133,42
3,1,137,32
4,1,127,34
5,1,125,26
6,1,166,52
7,1,94,39
8,1,119,29
9,1,103,50
10,1,110,43
