In [0]:
# Azure Blob Storage alternative (disabled).
# Uploading files was not possible on Azure at the time, so this notebook reads
# from the Databricks FileStore instead. To use Azure, uncomment the lines below
# and fill in the storage account, container and SAS token.
# NOTE(review): do not paste a real SAS token into the notebook; a Databricks
# secret scope (dbutils.secrets.get) is the usual place for it - confirm setup.
# storage_account_name = "your_storage_account"
# container_name = "raw-data"
# sas_token = "your_sas_token"
# mount_point = f"/mnt/{container_name}"
# dbutils.fs.mount(
#     source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
#     mount_point=mount_point,
#     extra_configs={f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net": sas_token}
# )


In [0]:
# List everything currently under /FileStore/tables/ to confirm which files
# have been uploaded (the folder can also hold files from unrelated uploads).
display(dbutils.fs.ls("/FileStore/tables/"))


path,name,size,modificationTime
dbfs:/FileStore/tables/Baby_Names.csv,Baby_Names.csv,1316490,1716455484000
dbfs:/FileStore/tables/Book1-1.csv,Book1-1.csv,131,1727106499000
dbfs:/FileStore/tables/Book1.csv,Book1.csv,73,1727106073000
dbfs:/FileStore/tables/fact_averagecosts_dlm-1.gz,fact_averagecosts_dlm-1.gz,2219727,1741519789000
dbfs:/FileStore/tables/fact_averagecosts_dlm.gz,fact_averagecosts_dlm.gz,2219727,1741519580000
dbfs:/FileStore/tables/fact_transactions_dlm.gz,fact_transactions_dlm.gz,72346794,1741520450000
dbfs:/FileStore/tables/hier_clnd_dlm-1.gz,hier_clnd_dlm-1.gz,35281,1741519871000
dbfs:/FileStore/tables/hier_clnd_dlm.gz,hier_clnd_dlm.gz,35281,1741519581000
dbfs:/FileStore/tables/hier_hldy_dlm-1.gz,hier_hldy_dlm-1.gz,297,1741519871000
dbfs:/FileStore/tables/hier_hldy_dlm.gz,hier_hldy_dlm.gz,297,1741519582000


In [0]:
from pyspark.sql import SparkSession
import gzip

# Get (or create) the Spark session used by the rest of the notebook
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

# Folder in the Databricks FileStore that holds the uploaded extracts
databricks_data_path = "dbfs:/FileStore/tables/"

# Show the folder contents to verify the file paths; files left over from
# earlier, unrelated uploads may be listed as well
files = dbutils.fs.ls(databricks_data_path)
display(files)

# Sample extract to inspect: the gzip-compressed transactions fact file
sample_file = "dbfs:/FileStore/tables/fact_transactions_dlm.gz"

# The .gz path is handed straight to the CSV reader. The file is pipe-delimited,
# its first row is the header, and column types are inferred from the data.
df = spark.read.options(delimiter="|", header=True, inferSchema=True).csv(sample_file)

# Sanity check: print the inferred schema, then preview the first five rows
df.printSchema()
df.show(5)


path,name,size,modificationTime
dbfs:/FileStore/tables/Baby_Names.csv,Baby_Names.csv,1316490,1716455484000
dbfs:/FileStore/tables/Book1-1.csv,Book1-1.csv,131,1727106499000
dbfs:/FileStore/tables/Book1.csv,Book1.csv,73,1727106073000
dbfs:/FileStore/tables/fact_averagecosts_dlm-1.gz,fact_averagecosts_dlm-1.gz,2219727,1741519789000
dbfs:/FileStore/tables/fact_averagecosts_dlm.gz,fact_averagecosts_dlm.gz,2219727,1741519580000
dbfs:/FileStore/tables/fact_transactions_dlm.gz,fact_transactions_dlm.gz,72346794,1741520450000
dbfs:/FileStore/tables/hier_clnd_dlm-1.gz,hier_clnd_dlm-1.gz,35281,1741519871000
dbfs:/FileStore/tables/hier_clnd_dlm.gz,hier_clnd_dlm.gz,35281,1741519581000
dbfs:/FileStore/tables/hier_hldy_dlm-1.gz,hier_hldy_dlm-1.gz,297,1741519871000
dbfs:/FileStore/tables/hier_hldy_dlm.gz,hier_hldy_dlm.gz,297,1741519582000


root
 |-- order_id: long (nullable = true)
 |-- line_id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- dt: timestamp (nullable = true)
 |-- pos_site_id: string (nullable = true)
 |-- sku_id: string (nullable = true)
 |-- fscldt_id: integer (nullable = true)
 |-- price_substate_id: string (nullable = true)
 |-- sales_units: integer (nullable = true)
 |-- sales_dollars: double (nullable = true)
 |-- discount_dollars: double (nullable = true)
 |-- original_order_id: long (nullable = true)
 |-- original_line_id: integer (nullable = true)

+---------+-------+----+-------------------+-----------+----------+---------+-----------------+-----------+-------------+----------------+-----------------+----------------+
| order_id|line_id|type|                 dt|pos_site_id|    sku_id|fscldt_id|price_substate_id|sales_units|sales_dollars|discount_dollars|original_order_id|original_line_id|
+---------+-------+----+-------------------+-----------+----------+---------+-------------

In [0]:
%sql
-- Create the project database if it does not already exist, then make it the
-- current database for the SQL statements that follow.
CREATE DATABASE IF NOT EXISTS retail_dw;
USE retail_dw;


In [0]:
# Source extract for the transactions fact table
transactions_path = "dbfs:/FileStore/tables/fact_transactions_dlm.gz"

# Read the pipe-delimited extract, taking column names from the header row.
# No schema is supplied or inferred, so the columns are loaded as strings.
df_transactions = spark.read.options(delimiter="|", header=True).csv(transactions_path)

# Persist the raw rows as the Bronze table, replacing any previous load
df_transactions.write.mode("overwrite").saveAsTable("retail_dw.bronze_transactions")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.bronze_transactions LIMIT 5").show()


+---------+-------+----+-------------------+-----------+----------+---------+-----------------+-----------+-------------+----------------+-----------------+----------------+
| order_id|line_id|type|                 dt|pos_site_id|    sku_id|fscldt_id|price_substate_id|sales_units|sales_dollars|discount_dollars|original_order_id|original_line_id|
+---------+-------+----+-------------------+-----------+----------+---------+-----------------+-----------+-------------+----------------+-----------------+----------------+
|164087401|      2|Sale|2016-01-31T06:17:01|    CATMAIN|2668940801| 20160131|               FP|          1|        58.95|               0|             null|            null|
|164087409|      4|Sale|2016-01-31T06:17:25|    CATMAIN|2920920601| 20160131|               FP|          1|        49.95|               0|             null|            null|
|164087440|      2|Sale|2016-01-31T06:19:28|   INETMAIN|0695690000| 20160131|               FP|          2|         37.9|         

In [0]:
# Source extract for the average-costs fact table
averagecosts_path = "dbfs:/FileStore/tables/fact_averagecosts_dlm.gz"

# Read the pipe-delimited extract, taking column names from the header row.
# No schema is supplied or inferred, so the columns are loaded as strings.
df_averagecosts = spark.read.options(delimiter="|", header=True).csv(averagecosts_path)

# Persist the raw rows as the Bronze table, replacing any previous load
df_averagecosts.write.mode("overwrite").saveAsTable("retail_dw.bronze_averagecosts")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.bronze_averagecosts LIMIT 5").show()


+---------+----------+-------------------------+-----------------------+
|fscldt_id|    sku_id|average_unit_standardcost|average_unit_landedcost|
+---------+----------+-------------------------+-----------------------+
| 20160131|0174410000|                    10.06|                  10.06|
| 20160201|0174410000|                    10.06|                  10.06|
| 20160202|0174410000|                    10.06|                  10.06|
| 20160203|0174410000|                    10.06|                  10.06|
| 20160204|0174410000|                    10.06|                  10.06|
+---------+----------+-------------------------+-----------------------+



In [0]:
# Bronze table name -> source extract in the FileStore.
# Covers the average-costs fact extract as well as the hier_* extracts.
file_mappings = {
    "bronze_averagecosts": "dbfs:/FileStore/tables/fact_averagecosts_dlm.gz",
    "bronze_clnd": "dbfs:/FileStore/tables/hier_clnd_dlm.gz",
    "bronze_hldy": "dbfs:/FileStore/tables/hier_hldy_dlm.gz",
    "bronze_invloc": "dbfs:/FileStore/tables/hier_invloc_dlm.gz",
    "bronze_invstatus": "dbfs:/FileStore/tables/hier_invstatus_dlm.gz",
    "bronze_possite": "dbfs:/FileStore/tables/hier_possite_dlm.gz",
    "bronze_pricestate": "dbfs:/FileStore/tables/hier_pricestate_dlm.gz",
    "bronze_prod": "dbfs:/FileStore/tables/hier_prod_dlm.gz",
    "bronze_rtlloc": "dbfs:/FileStore/tables/hier_rtlloc_dlm.gz",
}

# Load every extract as-is (pipe-delimited, header row) and replace the
# Bronze table of the same name, reporting each table once it is written
for table_name, source_path in file_mappings.items():
    df = spark.read.options(delimiter="|", header=True).csv(source_path)
    bronze_writer = df.write.mode("overwrite")
    bronze_writer.saveAsTable(f"retail_dw.{table_name}")
    print(f"✅ Loaded {table_name} into Databricks")


✅ Loaded bronze_averagecosts into Databricks
✅ Loaded bronze_clnd into Databricks
✅ Loaded bronze_hldy into Databricks
✅ Loaded bronze_invloc into Databricks
✅ Loaded bronze_invstatus into Databricks
✅ Loaded bronze_possite into Databricks
✅ Loaded bronze_pricestate into Databricks
✅ Loaded bronze_prod into Databricks
✅ Loaded bronze_rtlloc into Databricks


In [0]:
from pyspark.sql.functions import col

# Read the raw transactions from the Bronze layer
df_transactions = spark.table("retail_dw.bronze_transactions")

# Clean the data: convert the numeric columns first, then drop rows with a
# missing key.
# The order matters: line_id is both cast and used as a key. With ANSI mode
# disabled, a value that cannot be cast turns into null, so filtering nulls
# BEFORE the cast would let such rows reach the Silver table with a null key.
# NOTE(review): the remaining columns (e.g. order_id, dt, fscldt_id) are not
# cast here and keep whatever type the Bronze table gives them - confirm that
# is intended.
df_transactions_clean = (
    df_transactions
    .withColumn("line_id", col("line_id").cast("int"))
    .withColumn("sales_units", col("sales_units").cast("int"))
    .withColumn("sales_dollars", col("sales_dollars").cast("double"))
    .withColumn("discount_dollars", col("discount_dollars").cast("double"))
    .dropna(subset=["order_id", "line_id", "sku_id", "pos_site_id", "fscldt_id"])  # Remove nulls in key fields
)

# Save as the Silver table, replacing any previous version
df_transactions_clean.write.mode("overwrite").saveAsTable("retail_dw.silver_transactions")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.silver_transactions LIMIT 5").show()


+---------+-------+----+-------------------+-----------+----------+---------+-----------------+-----------+-------------+----------------+-----------------+----------------+
| order_id|line_id|type|                 dt|pos_site_id|    sku_id|fscldt_id|price_substate_id|sales_units|sales_dollars|discount_dollars|original_order_id|original_line_id|
+---------+-------+----+-------------------+-----------+----------+---------+-----------------+-----------+-------------+----------------+-----------------+----------------+
|164087401|      2|Sale|2016-01-31T06:17:01|    CATMAIN|2668940801| 20160131|               FP|          1|        58.95|             0.0|             null|            null|
|164087409|      4|Sale|2016-01-31T06:17:25|    CATMAIN|2920920601| 20160131|               FP|          1|        49.95|             0.0|             null|            null|
|164087440|      2|Sale|2016-01-31T06:19:28|   INETMAIN|0695690000| 20160131|               FP|          2|         37.9|         

In [0]:
# Read the raw average costs from the Bronze layer
df_averagecosts = spark.table("retail_dw.bronze_averagecosts")

# Clean the data: store both cost columns as doubles and discard rows that
# lack either key (fiscal date or SKU)
df_averagecosts_typed = (
    df_averagecosts
    .withColumn("average_unit_standardcost", col("average_unit_standardcost").cast("double"))
    .withColumn("average_unit_landedcost", col("average_unit_landedcost").cast("double"))
)
df_averagecosts_clean = df_averagecosts_typed.na.drop(subset=["fscldt_id", "sku_id"])

# Save as the Silver table, replacing any previous version
df_averagecosts_clean.write.mode("overwrite").saveAsTable("retail_dw.silver_averagecosts")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.silver_averagecosts LIMIT 5").show()


+---------+----------+-------------------------+-----------------------+
|fscldt_id|    sku_id|average_unit_standardcost|average_unit_landedcost|
+---------+----------+-------------------------+-----------------------+
| 20160131|0174410000|                    10.06|                  10.06|
| 20160201|0174410000|                    10.06|                  10.06|
| 20160202|0174410000|                    10.06|                  10.06|
| 20160203|0174410000|                    10.06|                  10.06|
| 20160204|0174410000|                    10.06|                  10.06|
+---------+----------+-------------------------+-----------------------+



In [0]:
# Read the raw product hierarchy from the Bronze layer
df_prod = spark.table("retail_dw.bronze_prod")

# Keep the SKU -> style/colour -> style -> sub-category -> category -> department
# id/label columns and collapse fully identical rows
df_prod_clean = (
    df_prod
    .select(
        "sku_id", "sku_label",
        "stylclr_id", "stylclr_label",
        "styl_id", "styl_label",
        "subcat_id", "subcat_label",
        "cat_id", "cat_label",
        "dept_id", "dept_label",
    )
    .distinct()
)

# Save as the Silver table, replacing any previous version
df_prod_clean.write.mode("overwrite").saveAsTable("retail_dw.silver_prod")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.silver_prod LIMIT 5").show()


+----------+--------------------+----------+--------------------+-------+--------------------+---------+-------------------+------+--------------+-------+----------+
|    sku_id|           sku_label|stylclr_id|       stylclr_label|styl_id|          styl_label|subcat_id|       subcat_label|cat_id|     cat_label|dept_id|dept_label|
+----------+--------------------+----------+--------------------+-------+--------------------+---------+-------------------+------+--------------+-------+----------+
|2759930501|ESSENTIAL LOUNGE ...|   2759930|ESSENTIAL LOUNGE ...|  27599|ESSENTIAL LOUNGE TOP|      405|               TOPS|  1000|          TOPS|   2000|   APPAREL|
|0806490000|SCENTUAL SUEDES B...|   0806490|SCENTUAL SUEDES B...|  08064|SCENTUAL SUEDES B...|      526|FRAGRANCE / PERFUME|  1017|     FRAGRANCE|   2003|    BEAUTY|
|0723420000|GLAMOLASH MASCARA...|   0723420|GLAMOLASH MASCARA...|  07234|GLAMOLASH MASCARA...|      532|          COSMETICS|  1019|        MAKEUP|   2003|    BEAUTY|
|036

In [0]:
# Read the raw point-of-sale site hierarchy from the Bronze layer
df_possite = spark.table("retail_dw.bronze_possite")

# Collapse fully identical rows; all columns are kept
df_possite_clean = df_possite.distinct()

# Save as the Silver table, replacing any previous version
df_possite_clean.write.mode("overwrite").saveAsTable("retail_dw.silver_possite")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.silver_possite LIMIT 5").show()


+-------+----------------+----------+-------------+-------+----------+
|site_id|      site_label|subchnl_id|subchnl_label|chnl_id|chnl_label|
+-------+----------------+----------+-------------+-------+----------+
|    118|  Evergreen Walk|    RTLPOS|   Retail POS| RTLPOS|Retail POS|
|    150|Kierland Commons|    RTLPOS|   Retail POS| RTLPOS|Retail POS|
|CATMAIN|    Catalog Main|   CATMAIN| Catalog Main|    CAT|   Catalog|
|    173|   Paddock Shops|    RTLPOS|   Retail POS| RTLPOS|Retail POS|
|    137|Greenway Station|    RTLPOS|   Retail POS| RTLPOS|Retail POS|
+-------+----------------+----------+-------------+-------+----------+



In [0]:
# Read the raw holiday hierarchy from the Bronze layer
df_hldy = spark.table("retail_dw.bronze_hldy")

# Collapse fully identical rows; all columns are kept
df_hldy_clean = df_hldy.distinct()

# Save as the Silver table, replacing any previous version
df_hldy_clean.write.mode("overwrite").saveAsTable("retail_dw.silver_hldy")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.silver_hldy LIMIT 5").show()


+------------+------------+
|     hldy_id|  hldy_label|
+------------+------------+
|Memorial_Day|Memorial Day|
| Mothers_Day|Mother's Day|
|   Halloween|   Halloween|
|Election_Day|Election Day|
|Columbus_Day|Columbus Day|
+------------+------------+



Why Normalization is Not Needed for Certain Tables

    hier.invloc (Inventory Locations) – No Normalization Needed
        Inventory locations are not referenced by either fact table loaded in this notebook (transactions and average costs), so they are not involved in the transactional data processing.

    hier.rtlloc (Retail Locations) – No Normalization Needed
        Like inventory locations, retail locations are not referenced by the fact tables, so normalizing them would not improve efficiency.

    hier.invstatus (Inventory Status) – No Normalization Needed
        Inventory status is not referenced by the transaction data, so normalizing it is unnecessary.


In [0]:
from pyspark.sql.functions import sum

# Cleaned transactions from the Silver layer
df_sales = spark.table("retail_dw.silver_transactions")

# Total units, dollars and discounts for every combination of site, SKU,
# fiscal date, price sub-state and transaction type.
# NOTE(review): the grouping key is fscldt_id, whose values look like daily
# fiscal dates, so despite the "weekly" name this appears to produce one row
# per fiscal date - confirm whether a roll-up to fiscal week is intended.
sales_by_key = df_sales.groupBy(
    "pos_site_id", "sku_id", "fscldt_id", "price_substate_id", "type"
)
df_weekly_sales = sales_by_key.agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars"),
)

# Save as the Gold table, replacing any previous version
df_weekly_sales.write.mode("overwrite").saveAsTable("retail_dw.mview_weekly_sales")

# Spot-check a few rows of the saved table
spark.sql("SELECT * FROM retail_dw.mview_weekly_sales LIMIT 5").show()


+-----------+----------+---------+-----------------+----+-----------------+-------------------+----------------------+
|pos_site_id|    sku_id|fscldt_id|price_substate_id|type|total_sales_units|total_sales_dollars|total_discount_dollars|
+-----------+----------+---------+-----------------+----+-----------------+-------------------+----------------------+
|        129|0174450000| 20160201|               FP|Sale|                1|              28.75|                   0.0|
|    CATMAIN|0403020000| 20160201|               FP|Sale|                8| 183.59999999999997|                   0.0|
|    CATMAIN|1085190075| 20160202|               FP|Sale|                1|               75.0|                   0.0|
|        132|0693790000| 20160202|              MD3|Sale|                3| 38.910000000000004|                   0.0|
|    CATMAIN|2785131701| 20160203|               FP|Sale|                1|              69.95|                   0.0|
+-----------+----------+---------+--------------

In [0]:
from pyspark.sql.functions import max

# Namespaced access to the Spark SQL functions, so this cell does not depend on
# an earlier cell having replaced Python's built-in sum with the Spark one
from pyspark.sql import functions as F

# Incremental refresh of the Gold table: aggregate only the transactions that
# are newer than what the table already contains, then merge them in.

# Highest fiscal date already present in the Gold table.
# MAX() over an empty table yields NULL, which arrives here as None.
last_processed_date = spark.sql("SELECT MAX(fscldt_id) FROM retail_dw.mview_weekly_sales").collect()[0][0]

# Keep only transactions after the last processed fiscal date. When the Gold
# table is empty there is no cut-off, so every transaction is treated as new
# (previously None was interpolated into the SQL text as the string 'None',
# which silently matched nothing).
# NOTE(review): rows that arrive late for a fiscal date <= last_processed_date
# are never picked up by this strict ">" filter - confirm that is acceptable.
df_latest_sales = spark.table("retail_dw.silver_transactions")
if last_processed_date is not None:
    df_latest_sales = df_latest_sales.filter(F.col("fscldt_id") > last_processed_date)

# Aggregate the new transactions with the same keys and measures as the Gold table
df_new_aggregates = (
    df_latest_sales
    .groupBy("pos_site_id", "sku_id", "fscldt_id", "price_substate_id", "type")
    .agg(
        F.sum("sales_units").alias("total_sales_units"),
        F.sum("sales_dollars").alias("total_sales_dollars"),
        F.sum("discount_dollars").alias("total_discount_dollars")
    )
)

# Merge the new aggregates with the existing rows. Columns are matched by name
# rather than by position, so a different column order in the stored table
# cannot silently mix up measures.
df_final = (
    df_new_aggregates
    .unionByName(spark.table("retail_dw.mview_weekly_sales"))
    .dropDuplicates()
)

# Overwrite the Gold table with the merged result.
# NOTE(review): this rewrites a table that is also being read in the same job;
# it relies on the table format supporting that (it ran successfully here).
# Appending df_new_aggregates would avoid the full rewrite - consider it.
df_final.write.mode("overwrite").saveAsTable("retail_dw.mview_weekly_sales")

# Spot-check the most recent fiscal dates
spark.sql("SELECT * FROM retail_dw.mview_weekly_sales ORDER BY fscldt_id DESC LIMIT 5").show()


+-----------+----------+---------+-----------------+----+-----------------+-------------------+----------------------+
|pos_site_id|    sku_id|fscldt_id|price_substate_id|type|total_sales_units|total_sales_dollars|total_discount_dollars|
+-----------+----------+---------+-----------------+----+-----------------+-------------------+----------------------+
|    INETOUT|2AG3220601| 20200423|               FP|Sale|                5|              100.0|                   0.0|
|    INETOUT|2BT1320701| 20200423|               FP|Sale|                4|               80.0|                   0.0|
|    INETOUT|2CG3330701| 20200423|              MD3|Sale|                1|              19.97|                   0.0|
|    CATMAIN|0882281000| 20200423|               FP|Sale|                1|               22.0|                   0.0|
|    INETOUT|2920940601| 20200423|               FP|Sale|                1|               20.0|                   0.0|
+-----------+----------+---------+--------------