In [0]:
%sql
-- Create the volume retail_project.default.retail_project_volume unless it already exists.
-- No EXTERNAL keyword or LOCATION is given, so no storage path is specified here.
-- NOTE(review): no later cell visible in this notebook references this volume - confirm it is still needed.
CREATE VOLUME IF NOT EXISTS retail_project.default.retail_project_volume;

In [0]:
# Smoke test: confirm the bronze transaction data in ADLS can be read.
#
# Security note: no storage account key is defined in this cell. The read below
# never passes a key to Spark, so a hard-coded key would only risk leaking a
# credential. Access presumably relies on workspace-level configuration
# (TODO confirm). If key-based auth is ever needed, fetch the key from a secret
# scope with dbutils.secrets.get(...) rather than pasting it into the notebook.
storage_account = "your storage account name"  # placeholder: replace with the real account name

# Read the Parquet folder using the abfss protocol
path = f"abfss://retail@{storage_account}.dfs.core.windows.net/bronze/transaction/"
df = spark.read.parquet(path)
display(df)


transaction_id,customer_id,product_id,store_id,quantity,transaction_date
1,301,1,1,1,2025-01-05
2,302,5,2,2,2025-01-08
3,303,8,5,1,2025-01-12
4,304,3,4,3,2025-01-18
5,305,6,1,1,2025-01-22
6,306,10,3,2,2025-02-02
7,307,7,5,1,2025-02-06
8,308,4,2,1,2025-02-10
9,309,9,4,2,2025-02-14
10,310,2,1,1,2025-02-20


In [0]:
%sql
-- Register an external volume named retail_project.default.bronze_data (skipped if it
-- already exists) whose storage location is the 'bronze/' folder of the 'retail'
-- container in the 'dataengprojectsa' storage account. Later cells read the data
-- through the /Volumes/retail_project/default/bronze_data/ path.
CREATE EXTERNAL VOLUME IF NOT EXISTS retail_project.default.bronze_data
LOCATION 'abfss://retail@dataengprojectsa.dfs.core.windows.net/bronze/'
COMMENT 'Linked to ADLS Gen2 Bronze folder';


In [0]:
# Inspect what the bronze_data volume exposes at its root
bronze_listing = dbutils.fs.ls("/Volumes/retail_project/default/bronze_data/")
display(bronze_listing)


path,name,size,modificationTime
dbfs:/Volumes/retail_project/default/bronze_data/customer/,customer/,0,1767330176000
dbfs:/Volumes/retail_project/default/bronze_data/product/,product/,0,1767330169000
dbfs:/Volumes/retail_project/default/bronze_data/store/,store/,0,1767330162000
dbfs:/Volumes/retail_project/default/bronze_data/transaction/,transaction/,0,1767330155000


In [0]:
# 1. Load the Parquet-backed bronze folders (transaction, store, product) into DataFrames
df_transactions = spark.read.format("parquet").load(
    "/Volumes/retail_project/default/bronze_data/transaction/"
)
df_store = spark.read.format("parquet").load(
    "/Volumes/retail_project/default/bronze_data/store/"
)
df_products = spark.read.format("parquet").load(
    "/Volumes/retail_project/default/bronze_data/product/"
)

# Preview the transactions as a quick sanity check
display(df_transactions)


transaction_id,customer_id,product_id,store_id,quantity,transaction_date
1,301,1,1,1,2025-01-05
2,302,5,2,2,2025-01-08
3,303,8,5,1,2025-01-12
4,304,3,4,3,2025-01-18
5,305,6,1,1,2025-01-22
6,306,10,3,2,2025-02-02
7,307,7,5,1,2025-02-06
8,308,4,2,1,2025-02-10
9,309,9,4,2,2025-02-14
10,310,2,1,1,2025-02-20


In [0]:
# 2. Read the customer data. It is read as Parquet (not JSON) from a nested
#    sub-path underneath the bronze 'customer/' folder of the volume.
df_customers = spark.read.parquet("/Volumes/retail_project/default/bronze_data/customer/ChuksJoy/Azure-Data-Engineering-medallion-architecture/refs/heads/main/")

# Preview the loaded customer rows
display(df_customers)


# https://dataengprojectsa.blob.core.windows.net/retail/bronze/customer/ChuksJoy/Azure-Data-Engineering-medallion-architecture/refs/heads/main/Customer.parquet

customer_id,first_name,last_name,email,phone,city,registration_date
301,Amina,Bello,amina.bello@example.com,8031234501,Lagos,2023-05-14
302,Funke,Adeyemi,funke.adeyemi@example.com,8124567892,Ibadan,2023-07-22
303,Zainab,Sadiq,zainab.sadiq@example.com,7039871245,Abuja,2023-10-05
304,Chioma,Okafor,chioma.okafor@example.com,9051237864,Onitsha,2024-01-18
305,Sade,Olawale,sade.olawale@example.com,8067893412,Abeokuta,2023-11-02
306,Blessing,Eze,blessing.eze@example.com,8190345678,Enugu,2024-02-10
307,Hadiza,Musa,hadiza.musa@example.com,7084561239,Kano,2023-06-27
308,Ifeoma,Nwoye,ifeoma.nwoye@example.com,9033456781,Owerri,2024-03-04
309,Yewande,Ajayi,yewande.ajayi@example.com,8051239876,Akure,2023-08-19
310,Rahma,Abdul,rahma.abdul@example.com,8167894523,Ilorin,2024-04-11


SILVER LAYER - DATA CLEANING

In [0]:
from pyspark.sql.functions import col
# Silver-layer typing: cast key/measure columns and keep only the fields used downstream.

# Transactions: every identifier and the quantity become int; the date column becomes a date.
transaction_int_fields = ["transaction_id", "customer_id", "product_id", "store_id", "quantity"]
df_transactions = df_transactions.select(
    *[col(field).cast("int") for field in transaction_int_fields],
    col("transaction_date").cast("date"),
)

# Products: int key, double price, descriptive columns passed through unchanged.
df_products = df_products.select(
    col("product_id").cast("int"),
    "product_name",
    "category",
    col("price").cast("double"),
)

# Stores: int key, descriptive columns passed through unchanged.
df_store = df_store.select(
    col("store_id").cast("int"),
    "store_name",
    "location",
)

# Customers: keep a subset of columns (no casts applied) and one row per customer_id.
customer_fields = ["customer_id", "first_name", "last_name", "email", "city", "registration_date"]
df_customers = df_customers.select(*customer_fields).dropDuplicates(["customer_id"])


In [0]:
# Enrich each transaction with its customer, product and store attributes
# (each join uses the default join type), then derive the per-row total.
line_total = col("quantity") * col("price")

df_silver = (
    df_transactions
    .join(df_customers, on="customer_id")
    .join(df_products, on="product_id")
    .join(df_store, on="store_id")
    .withColumn("total_amount", line_total)
)


In [0]:
# Preview the joined silver DataFrame, including the derived total_amount column
display(df_silver)

store_id,product_id,customer_id,transaction_id,quantity,transaction_date,first_name,last_name,email,city,registration_date,product_name,category,price,store_name,location,total_amount
1,6,305,5,1,2025-01-22,Sade,Olawale,sade.olawale@example.com,Abeokuta,2023-11-02,Skincare Gift Box,Beauty,21999.0,Elegance Boutique,Lagos,21999.0
1,8,314,14,1,2025-03-16,Fatima,Shehu,fatima.shehu@example.com,Zaria,2024-02-22,Perfume Eau de Parfum,Fragrance,29999.0,Elegance Boutique,Lagos,29999.0
4,2,325,25,2,2025-05-22,Omolara,Ajiboye,omolara.ajiboye@example.com,Osogbo,2023-12-27,Heeled Sandals,Footwear,17999.0,Urban Chic Outlet,Ibadan,35998.0
5,12,321,21,1,2025-05-02,Bimpe,Alade,bimpe.alade@example.com,Ijebu Ode,2023-06-11,Spa Robe,Wellness,19999.0,The Feminine Hub,Lekki,19999.0
3,10,306,6,2,2025-02-02,Blessing,Eze,blessing.eze@example.com,Enugu,2024-02-10,Statement Earrings,Jewelry,9999.0,Style & Grace,Port Harcourt,19998.0
5,8,303,3,1,2025-01-12,Zainab,Sadiq,zainab.sadiq@example.com,Abuja,2023-10-05,Perfume Eau de Parfum,Fragrance,29999.0,The Feminine Hub,Lekki,29999.0
2,5,313,13,3,2025-03-11,Kemi,Ojo,kemi.ojo@example.com,Ife,2023-09-29,Lipstick Collection,Beauty,15999.0,Glow Beauty Store,Abuja,47997.0
2,4,323,23,1,2025-05-10,Esther,Daniels,esther.daniels@example.com,Uyo,2023-08-02,Makeup Brush Set,Beauty,12999.0,Glow Beauty Store,Abuja,12999.0
2,7,318,18,1,2025-04-10,Halima,Garba,halima.garba@example.com,Yola,2024-03-18,Yoga Leggings,Activewear,13999.0,Glow Beauty Store,Abuja,13999.0
4,9,320,20,1,2025-04-21,Safiya,Ibrahim,safiya.ibrahim@example.com,Gombe,2024-04-29,Hair Styling Kit,Beauty,18999.0,Urban Chic Outlet,Ibadan,18999.0


WRITE TO THE ADLS SILVER FOLDER

In [0]:
# 1. Target folder for the Silver layer inside the 'retail' container
#    (make sure the 'silver' folder exists there)
adls_silver_path = "abfss://retail@dataengprojectsa.dfs.core.windows.net/silver/"

# 2. Persist the joined DataFrame to Azure in Delta format, overwriting existing data
silver_writer = df_silver.write.format("delta").mode("overwrite")
silver_writer.save(adls_silver_path)

print(f"Data successfully written to ADLS at: {adls_silver_path}")


Data successfully written to ADLS at: abfss://retail@dataengprojectsa.dfs.core.windows.net/silver/


In [0]:
# ADLS location backing the table; it must be a path your External Location authorizes
silver_location = "abfss://retail@dataengprojectsa.dfs.core.windows.net/silver/"

# Register the Delta data at that location as a catalog table (no-op if it already exists)
create_silver_table_sql = f"""
CREATE TABLE IF NOT EXISTS retail_project.default.retail_silver_cleaned
USING DELTA
LOCATION '{silver_location}'
"""
spark.sql(create_silver_table_sql)



DataFrame[]

In [0]:
%%sql 

-- Read back all rows and columns of the registered silver table to confirm it is queryable
SELECT * FROM retail_project.default.retail_silver_cleaned;


store_id,product_id,customer_id,transaction_id,quantity,transaction_date,first_name,last_name,email,city,registration_date,product_name,category,price,store_name,location,total_amount
1,6,305,5,1,2025-01-22,Sade,Olawale,sade.olawale@example.com,Abeokuta,2023-11-02,Skincare Gift Box,Beauty,21999.0,Elegance Boutique,Lagos,21999.0
1,8,314,14,1,2025-03-16,Fatima,Shehu,fatima.shehu@example.com,Zaria,2024-02-22,Perfume Eau de Parfum,Fragrance,29999.0,Elegance Boutique,Lagos,29999.0
4,2,325,25,2,2025-05-22,Omolara,Ajiboye,omolara.ajiboye@example.com,Osogbo,2023-12-27,Heeled Sandals,Footwear,17999.0,Urban Chic Outlet,Ibadan,35998.0
5,12,321,21,1,2025-05-02,Bimpe,Alade,bimpe.alade@example.com,Ijebu Ode,2023-06-11,Spa Robe,Wellness,19999.0,The Feminine Hub,Lekki,19999.0
3,10,306,6,2,2025-02-02,Blessing,Eze,blessing.eze@example.com,Enugu,2024-02-10,Statement Earrings,Jewelry,9999.0,Style & Grace,Port Harcourt,19998.0
5,8,303,3,1,2025-01-12,Zainab,Sadiq,zainab.sadiq@example.com,Abuja,2023-10-05,Perfume Eau de Parfum,Fragrance,29999.0,The Feminine Hub,Lekki,29999.0
2,5,313,13,3,2025-03-11,Kemi,Ojo,kemi.ojo@example.com,Ife,2023-09-29,Lipstick Collection,Beauty,15999.0,Glow Beauty Store,Abuja,47997.0
2,4,323,23,1,2025-05-10,Esther,Daniels,esther.daniels@example.com,Uyo,2023-08-02,Makeup Brush Set,Beauty,12999.0,Glow Beauty Store,Abuja,12999.0
2,7,318,18,1,2025-04-10,Halima,Garba,halima.garba@example.com,Yola,2024-03-18,Yoga Leggings,Activewear,13999.0,Glow Beauty Store,Abuja,13999.0
4,9,320,20,1,2025-04-21,Safiya,Ibrahim,safiya.ibrahim@example.com,Gombe,2024-04-29,Hair Styling Kit,Beauty,18999.0,Urban Chic Outlet,Ibadan,18999.0


DataFrame[store_id: int, product_id: int, customer_id: int, transaction_id: int, quantity: int, transaction_date: date, first_name: string, last_name: string, email: string, city: string, registration_date: string, product_name: string, category: string, price: double, store_name: string, location: string, total_amount: double]

In [0]:
# List the silver folder to check whether it contains a _delta_log folder
silver_listing = dbutils.fs.ls("abfss://retail@dataengprojectsa.dfs.core.windows.net/silver/")
display(silver_listing)


path,name,size,modificationTime
abfss://retail@dataengprojectsa.dfs.core.windows.net/silver/_delta_log/,_delta_log/,0,1767350021000
abfss://retail@dataengprojectsa.dfs.core.windows.net/silver/part-00000-195258f9-7fc0-4662-bf15-b9d904583975.c000.snappy.parquet,part-00000-195258f9-7fc0-4662-bf15-b9d904583975.c000.snappy.parquet,6761,1767350022000


In [0]:

# ADLS folder holding the Silver-layer Delta data
silver_path = "abfss://retail@dataengprojectsa.dfs.core.windows.net/silver/"

# Load that folder back into a DataFrame using the Delta format
silver_df = spark.read.load(silver_path, format="delta")


In [0]:
# Preview the silver data as loaded back from the Delta folder
display(silver_df)


store_id,product_id,customer_id,transaction_id,quantity,transaction_date,first_name,last_name,email,city,registration_date,product_name,category,price,store_name,location,total_amount
1,6,305,5,1,2025-01-22,Sade,Olawale,sade.olawale@example.com,Abeokuta,2023-11-02,Skincare Gift Box,Beauty,21999.0,Elegance Boutique,Lagos,21999.0
1,8,314,14,1,2025-03-16,Fatima,Shehu,fatima.shehu@example.com,Zaria,2024-02-22,Perfume Eau de Parfum,Fragrance,29999.0,Elegance Boutique,Lagos,29999.0
4,2,325,25,2,2025-05-22,Omolara,Ajiboye,omolara.ajiboye@example.com,Osogbo,2023-12-27,Heeled Sandals,Footwear,17999.0,Urban Chic Outlet,Ibadan,35998.0
5,12,321,21,1,2025-05-02,Bimpe,Alade,bimpe.alade@example.com,Ijebu Ode,2023-06-11,Spa Robe,Wellness,19999.0,The Feminine Hub,Lekki,19999.0
3,10,306,6,2,2025-02-02,Blessing,Eze,blessing.eze@example.com,Enugu,2024-02-10,Statement Earrings,Jewelry,9999.0,Style & Grace,Port Harcourt,19998.0
5,8,303,3,1,2025-01-12,Zainab,Sadiq,zainab.sadiq@example.com,Abuja,2023-10-05,Perfume Eau de Parfum,Fragrance,29999.0,The Feminine Hub,Lekki,29999.0
2,5,313,13,3,2025-03-11,Kemi,Ojo,kemi.ojo@example.com,Ife,2023-09-29,Lipstick Collection,Beauty,15999.0,Glow Beauty Store,Abuja,47997.0
2,4,323,23,1,2025-05-10,Esther,Daniels,esther.daniels@example.com,Uyo,2023-08-02,Makeup Brush Set,Beauty,12999.0,Glow Beauty Store,Abuja,12999.0
2,7,318,18,1,2025-04-10,Halima,Garba,halima.garba@example.com,Yola,2024-03-18,Yoga Leggings,Activewear,13999.0,Glow Beauty Store,Abuja,13999.0
4,9,320,20,1,2025-04-21,Safiya,Ibrahim,safiya.ibrahim@example.com,Gombe,2024-04-29,Hair Styling Kit,Beauty,18999.0,Urban Chic Outlet,Ibadan,18999.0


GOLD LAYER

In [0]:
from pyspark.sql.functions import sum, countDistinct, avg

In [0]:
# Columns that define one row of the gold summary: date, product and store attributes
gold_group_keys = [
    "transaction_date",
    "product_id", "product_name", "category",
    "store_id", "store_name", "location",
]

# Measures computed for each group of silver rows
gold_metrics = [
    sum("quantity").alias("total_quantity_sold"),
    sum("total_amount").alias("total_sales_amount"),
    countDistinct("transaction_id").alias("number_of_transactions"),
    avg("total_amount").alias("average_transaction_value"),
]

gold_df = silver_df.groupBy(*gold_group_keys).agg(*gold_metrics)

In [0]:

# Preview the aggregated gold DataFrame
display(gold_df)

transaction_date,product_id,product_name,category,store_id,store_name,location,total_quantity_sold,total_sales_amount,number_of_transactions,average_transaction_value
2025-02-10,4,Makeup Brush Set,Beauty,2,Glow Beauty Store,Abuja,1,12999.0,1,12999.0
2025-05-02,12,Spa Robe,Wellness,5,The Feminine Hub,Lekki,1,19999.0,1,19999.0
2025-01-22,6,Skincare Gift Box,Beauty,1,Elegance Boutique,Lagos,1,21999.0,1,21999.0
2025-03-22,1,Luxury Handbag,Fashion,4,Urban Chic Outlet,Ibadan,2,49998.0,1,49998.0
2025-04-21,9,Hair Styling Kit,Beauty,4,Urban Chic Outlet,Ibadan,1,18999.0,1,18999.0
2025-02-06,7,Yoga Leggings,Activewear,5,The Feminine Hub,Lekki,1,13999.0,1,13999.0
2025-04-15,3,Silk Scarf,Accessories,1,Elegance Boutique,Lagos,4,35996.0,1,35996.0
2025-04-01,6,Skincare Gift Box,Beauty,3,Style & Grace,Port Harcourt,1,21999.0,1,21999.0
2025-03-03,11,Ankara Maxi Dress,Fashion,3,Style & Grace,Port Harcourt,1,27999.0,1,27999.0
2025-05-22,2,Heeled Sandals,Footwear,4,Urban Chic Outlet,Ibadan,2,35998.0,1,35998.0


In [0]:
# ADLS folder that holds the Gold layer
gold_path = "abfss://retail@dataengprojectsa.dfs.core.windows.net/gold/"

# Persist the aggregated DataFrame to ADLS in Delta format, overwriting existing data
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .save(gold_path)
)


In [0]:
# Register the Delta data at gold_path as a catalog table (no-op if it already exists)
create_gold_table_sql = f"""
CREATE TABLE IF NOT EXISTS retail_project.default.retail_gold_sales_summary
USING DELTA
LOCATION '{gold_path}'
"""
spark.sql(create_gold_table_sql)

DataFrame[]

In [0]:
%%sql
-- Read back all rows and columns of the registered gold summary table to confirm it is queryable
SELECT * FROM retail_project.default.retail_gold_sales_summary

transaction_date,product_id,product_name,category,store_id,store_name,location,total_quantity_sold,total_sales_amount,number_of_transactions,average_transaction_value
2025-02-10,4,Makeup Brush Set,Beauty,2,Glow Beauty Store,Abuja,1,12999.0,1,12999.0
2025-05-02,12,Spa Robe,Wellness,5,The Feminine Hub,Lekki,1,19999.0,1,19999.0
2025-01-22,6,Skincare Gift Box,Beauty,1,Elegance Boutique,Lagos,1,21999.0,1,21999.0
2025-03-22,1,Luxury Handbag,Fashion,4,Urban Chic Outlet,Ibadan,2,49998.0,1,49998.0
2025-04-21,9,Hair Styling Kit,Beauty,4,Urban Chic Outlet,Ibadan,1,18999.0,1,18999.0
2025-02-06,7,Yoga Leggings,Activewear,5,The Feminine Hub,Lekki,1,13999.0,1,13999.0
2025-04-15,3,Silk Scarf,Accessories,1,Elegance Boutique,Lagos,4,35996.0,1,35996.0
2025-04-01,6,Skincare Gift Box,Beauty,3,Style & Grace,Port Harcourt,1,21999.0,1,21999.0
2025-03-03,11,Ankara Maxi Dress,Fashion,3,Style & Grace,Port Harcourt,1,27999.0,1,27999.0
2025-05-22,2,Heeled Sandals,Footwear,4,Urban Chic Outlet,Ibadan,2,35998.0,1,35998.0


DataFrame[transaction_date: date, product_id: int, product_name: string, category: string, store_id: int, store_name: string, location: string, total_quantity_sold: bigint, total_sales_amount: double, number_of_transactions: bigint, average_transaction_value: double]