In [0]:
dbutils.fs.ls('/mnt/retail_project/silver/retail/customers/')

[FileInfo(path='dbfs:/mnt/retail_project/silver/retail/customers/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/retail_project/silver/retail/customers/part-00000-667d54e9-6d7d-4bf4-a17d-2e4906d8be00-c000.snappy.parquet', name='part-00000-667d54e9-6d7d-4bf4-a17d-2e4906d8be00-c000.snappy.parquet', size=3756, modificationTime=1762979305000),
 FileInfo(path='dbfs:/mnt/retail_project/silver/retail/customers/part-00000-88636c91-712f-4b71-bcae-61f7cb22704a-c000.snappy.parquet', name='part-00000-88636c91-712f-4b71-bcae-61f7cb22704a-c000.snappy.parquet', size=3756, modificationTime=1762979815000)]

In [0]:
from pyspark.sql.functions import *

# Loading cleaned and processed data from silver layer
silver_path = '/mnt/retail_project/silver/retail/'
df_cus = spark.read.format('delta').options(header=True, inferSchema=True).load(f'{silver_path}customers')
df_prod = spark.read.format('delta').options(header=True, inferSchema=True).load(f'{silver_path}products')
df_stores = spark.read.format('delta').options(header=True, inferSchema=True).load(f'{silver_path}stores')
df_tran = spark.read.format('delta').options(header=True, inferSchema=True).load(f'{silver_path}transactions')
display(df_tran)

transaction_id,customer_id,product_id,store_id,quantity,transaction_date,month,year,day
1,127,8,4,4,2025-03-31,3,2025,31
2,105,3,5,4,2024-11-12,11,2024,12
3,116,2,3,2,2025-05-01,5,2025,1
4,120,8,1,1,2024-11-02,11,2024,2
5,105,5,1,2,2025-03-17,3,2025,17
6,110,7,5,3,2025-01-04,1,2025,4
7,110,7,5,2,2025-01-01,1,2025,1
8,126,7,2,5,2025-06-08,6,2025,8
9,123,1,2,3,2024-10-08,10,2024,8
10,124,2,5,2,2024-08-27,8,2024,27


In [0]:
# This the default partitions on shuffle but I will decrease it since my data is not that big
spark.conf.get("spark.sql.shuffle.partitions")

'auto'

In [0]:
# Set the shuffle partitions tp 50.
spark.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.get("spark.sql.shuffle.partitions")

'50'

In [0]:


# Create DIMENSION TABLES for star schema
dim_cus = df_cus.select(
    col('customer_id').alias('CustomerKey'),
    col('customer_id').alias('CustomerID'),
    col('first_name').alias('FirstName'),
    col('last_name').alias('LastName'),
    col('email').alias('Email'),
    col('phone').alias('Phone'),
    col('FullName').alias('FullName')
).distinct()  

dim_prod = df_prod.select(
    col('product_id').alias('ProductKey'),
    col('product_id').alias('ProductID'),
    col('product_name').alias('ProductName'),
    col('category').alias('Category'),
    col('price').alias('Price')
).distinct() 

dim_stores = df_stores.select(
    col('store_id').alias('StoreKey'),
    col('store_id').alias('StoreID'),
    col('store_name').alias('StoreName'),
    col('location').alias('Location')
).distinct()

dim_tran = df_tran.select(
    col('transaction_id').alias('TransactionKey'),
    col('transaction_id').alias('TransactionID'),
    col('customer_id').alias('CustomerID'),
    col('product_id').alias('ProductID'),
    col('store_id').alias('StoreID'),
    col('quantity').alias('Quantity'),
    col('transaction_date').alias('TransactionDate'),
    col('month').alias('Month'),
    col('year').alias('Year'),
    col('day').alias('Day')
)

display(dim_tran)

# Saving the DIMENSION tables in gold layer
gold_base = '/mnt/retail_project/gold/retail/'

# dim_cus.write.format('delta').mode('overwrite').save(f'{gold_base}customers/')
# dim_prod.write.format('delta').mode('overwrite').save(f'{gold_base}products/')
# dim_stores.write.format('delta').mode('overwrite').save(f'{gold_base}stores/')
# dim_tran.write.format('delta').mode('overwrite').save(f'{gold_base}transactions/')

TransactionKey,TransactionID,CustomerID,ProductID,StoreID,Quantity,TransactionDate,Month,Year,Day
1,1,127,8,4,4,2025-03-31,3,2025,31
2,2,105,3,5,4,2024-11-12,11,2024,12
3,3,116,2,3,2,2025-05-01,5,2025,1
4,4,120,8,1,1,2024-11-02,11,2024,2
5,5,105,5,1,2,2025-03-17,3,2025,17
6,6,110,7,5,3,2025-01-04,1,2025,4
7,7,110,7,5,2,2025-01-01,1,2025,1
8,8,126,7,2,5,2025-06-08,6,2025,8
9,9,123,1,2,3,2024-10-08,10,2024,8
10,10,124,2,5,2,2024-08-27,8,2024,27


In [0]:
df_tran.columns, df_cus.columns, df_prod.columns, df_stores.columns

(['transaction_id',
  'customer_id',
  'product_id',
  'store_id',
  'quantity',
  'transaction_date',
  'month',
  'year',
  'day'],
 ['customer_id', 'first_name', 'last_name', 'email', 'phone', 'FullName'],
 ['product_id', 'product_name', 'category', 'price'],
 ['store_id', 'store_name', 'location'])

In [0]:
# Joining all the tables first 
df_joined = df_tran.alias('t').join(df_cus,'customer_id') \
        .join(df_prod.alias('p'), 'product_id') \
        .join(df_stores, 'store_id') \
        .withColumn('total_amount', col('p.price') * col('t.quantity'))
display(df_joined)

store_id,product_id,customer_id,transaction_id,quantity,transaction_date,month,year,day,first_name,last_name,email,phone,FullName,product_name,category,price,store_name,location,total_amount
3,7,101,28,5,2024-11-15,11,2024,15,Ravi,Yadav,user101@example.com,9887654321,Ravi Yadav,Smartwatch,Electronics,4999.0,Tech World Outlet,Bangalore,24995.0
2,1,102,11,3,2024-08-11,8,2024,11,Nina,Joshi,user102@example.com,9876543210,Nina Joshi,Wireless Mouse,Electronics,799.99,High Street Store,Delhi,2399.9700000000003
3,1,103,18,4,2024-09-05,9,2024,5,Sonal,Sharma,user103@example.com,9865432109,Sonal Sharma,Wireless Mouse,Electronics,799.99,Tech World Outlet,Bangalore,3199.96
4,3,104,13,3,2025-05-04,5,2025,4,Karan,Patel,user104@example.com,9854321098,Karan Patel,Yoga Mat,Fitness,499.0,Downtown Mini Store,Pune,1497.0
5,1,105,21,3,2024-10-02,10,2024,2,Riya,Singh,user105@example.com,9843210987,Riya Singh,Wireless Mouse,Electronics,799.99,Mega Plaza,Chennai,2399.9700000000003
4,9,107,22,3,2024-11-16,11,2024,16,Priya,Kapoor,user107@example.com,9821098765,Priya Kapoor,Dumbbell Set,Fitness,1999.0,Downtown Mini Store,Pune,5997.0
4,5,108,12,1,2025-05-26,5,2025,26,Rahul,Verma,user108@example.com,9810987654,Rahul Verma,Notebook Set,Stationery,149.0,Downtown Mini Store,Pune,149.0
5,8,109,17,5,2024-07-10,7,2024,10,Pooja,Mehta,user109@example.com,9809876543,Pooja Mehta,Desk Organizer,Accessories,399.0,Mega Plaza,Chennai,1995.0
5,7,110,7,2,2025-01-01,1,2025,1,Deepak,Nair,user110@example.com,9798765432,Deepak Nair,Smartwatch,Electronics,4999.0,Mega Plaza,Chennai,9998.0
5,8,116,30,4,2025-03-16,3,2025,16,Rakesh,Kapoor,user116@example.com,9732109876,Rakesh Kapoor,Desk Organizer,Accessories,399.0,Mega Plaza,Chennai,1596.0


In [0]:
df_joined.columns

['store_id',
 'product_id',
 'customer_id',
 'transaction_id',
 'quantity',
 'transaction_date',
 'month',
 'year',
 'day',
 'first_name',
 'last_name',
 'email',
 'phone',
 'FullName',
 'product_name',
 'category',
 'price',
 'store_name',
 'location',
 'total_amount']

In [0]:
# Create FACT TABLE

FACT_TABLE = df_joined.select(
    col('store_id'),
    col('product_id'),
    col('customer_id'),
    col('transaction_id'),
    col('total_amount')
)
display(FACT_TABLE)

# Saving the FACT TABLE in gold layer
gold_base = '/mnt/retail_project/gold/retail/'
# FACT_TABLE.write.format('delta').mode('overwrite').save(f'{gold_base}fact')

store_id,product_id,customer_id,transaction_id,total_amount
4,8,127,1,1596.0
5,3,105,2,1996.0
3,2,116,3,2598.98
1,8,120,4,399.0
1,5,105,5,298.0
5,7,110,6,14997.0
5,7,110,7,9998.0
2,7,126,8,24995.0
2,1,123,9,2399.9700000000003
5,2,124,10,2598.98


In [0]:
df_joined.columns

['store_id',
 'product_id',
 'customer_id',
 'transaction_id',
 'quantity',
 'transaction_date',
 'month',
 'year',
 'day',
 'first_name',
 'last_name',
 'email',
 'phone',
 'FullName',
 'product_name',
 'category',
 'price',
 'store_name',
 'location',
 'total_amount']

In [0]:
# Now  we will create aggregate table.
# We will look at the aggregated data of each store date wise
agg_table = df_joined.groupBy(
     col('transaction_date').alias('date'),
     col('location').alias('location'),
     col('store_id').alias('store_id'),
     col('category').alias('category')) \
    .agg(
        sum('total_amount').alias('total_sales'),
        count('transaction_id').alias('total_transactions'),
        count('customer_id').alias('total_customers'),
        sum('quantity').alias('total_quantity_sold'),
        avg('total_amount').alias('avg_transaction_value'),
        count('product_id').alias('total_products_sold')
    ) \
    .orderBy(col('transaction_date'), col('location'), col('store_id'), col('category'))

agg_table = agg_table.withColumn('date', col('date').cast('date'))

display(agg_table)
# Saving the AGG TABLE in gold layer
gold_base = '/mnt/retail_project/gold/retail/'
# agg_table.write.format('delta').mode('overwrite').save(f'{gold_base}agg')



date,location,store_id,category,total_sales,total_transactions,total_customers,total_quantity_sold,avg_transaction_value,total_products_sold
2024-07-10,Chennai,5,Accessories,1995.0,1,1,5,1995.0,1
2024-07-14,Mumbai,1,Electronics,3999.95,1,1,5,3999.95,1
2024-07-14,Pune,4,Accessories,1596.0,1,1,4,1596.0,1
2024-07-17,Chennai,5,Electronics,3199.96,1,1,4,3199.96,1
2024-07-30,Delhi,2,Stationery,149.0,1,1,1,149.0,1
2024-08-11,Delhi,2,Electronics,2399.9700000000003,1,1,3,2399.9700000000003,1
2024-08-27,Chennai,5,Electronics,2598.98,1,1,2,2598.98,1
2024-09-05,Bangalore,3,Electronics,3199.96,1,1,4,3199.96,1
2024-09-21,Delhi,2,Fitness,598.0,1,1,2,598.0,1
2024-10-02,Chennai,5,Electronics,2399.9700000000003,1,1,3,2399.9700000000003,1


In [0]:
dbutils.fs.unmount("/mnt/retail_project")


/mnt/retail_project has been unmounted.


True