#### The Gold layer of a data lakehouse holds the final, refined, business-ready data. It is optimized for consumption by end users such as analysts, data scientists, and business stakeholders. Transformations at this layer enrich data with additional context, aggregate it for insights, enforce data quality and consistency, and structure it so that it is usable and performant for reporting, analytics, and machine learning.

In [0]:
from pyspark.sql import SparkSession, DataFrame  # Spark session and DataFrame types
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, when, lit, regexp_replace


## Data Reading - Silver

In [0]:
spark.conf.set("fs.azure.account.auth.type.adventureworksadls0.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.adventureworksadls0.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.adventureworksadls0.dfs.core.windows.net", "326a7877-806a-4d58-b021-9d60117c06f8")
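# The client secret is read from a Databricks secret scope instead of being hardcoded in the notebook.
# "adls-scope" and "sp-client-secret" are placeholder names, not values from the original notebook.
spark.conf.set("fs.azure.account.oauth2.client.secret.adventureworksadls0.dfs.core.windows.net", dbutils.secrets.get(scope="adls-scope", key="sp-client-secret"))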
spark.conf.set("fs.azure.account.oauth2.client.endpoint.adventureworksadls0.dfs.core.windows.net", "https://login.microsoftonline.com/26a6638c-cdb1-4861-8638-bb12bcd55165/oauth2/token")
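
The five settings above follow one key pattern per storage account. A minimal sketch, assuming a hypothetical helper `adls_oauth_conf` (not part of the original notebook), that builds them as a dictionary so that credentials can be passed in from a secret store rather than typed into the cell. The placeholder values are illustrative only.

```python
def adls_oauth_conf(account: str, client_id: str, client_secret: str, tenant_id: str) -> dict:
    """Build the Spark settings for OAuth (service principal) access to ADLS Gen2.

    Hypothetical helper: the key names mirror the spark.conf.set calls above.
    """
    host = f"{account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Placeholder values for illustration; real values would come from a secret scope.
conf = adls_oauth_conf("adventureworksadls0", "<client-id>", "<client-secret>", "<tenant-id>")
for key in conf:
    print(key)  # in the notebook, each pair would be applied with spark.conf.set(key, value)
```

Keeping the settings in one function makes it easier to point the same notebook at a different storage account without editing five lines by hand.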

## 1- Read Table : Address

In [0]:
# Delta tables carry their own schema, so the CSV-only "header" and "inferSchema" options are not needed.
df_address = spark.read.format("delta")\
        .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/Address")

display(df_address)

AddressID,Address,City,State,Country,PostalCode,rowguid,ModifiedDate
9,8713 Yosemite Ct.,Bothell,Washington,United States,98011,268af621-76d7-4c78-9441-144fd139821a,2006-07-01
11,1318 Lasalle Street,Bothell,Washington,United States,98011,981b3303-aca2-49c7-9a96-fb670785b269,2007-04-01
25,9178 Jumping St.,Dallas,Texas,United States,75201,c8df3bd9-48f0-4654-a8dd-14a67a84d3c6,2006-09-01
28,9228 Via Del Sol,Phoenix,Arizona,United States,85004,12ae5ee1-fc3e-468b-9b92-3b970b169774,2005-09-01
32,26910 Indela Road,Montreal,Quebec,Canada,H1Y 2H5,84a95f62-3ae8-4e7e-bbd5-5a6f00cd982d,2006-08-01
185,2681 Eagle Peak,Bellevue,Washington,United States,98004,7bccf442-2268-46cc-8472-14c44c14e98c,2006-09-01
297,7943 Walnut Ave,Renton,Washington,United States,98055,52410da4-2778-4b1d-a599-95746625ce6d,2006-08-01
445,6388 Lake City Way,Burnaby,British Columbia,Canada,V5A 3A6,53572f25-9133-4a8b-a065-102ff35416ee,2006-09-01
446,52560 Free Street,Toronto,Ontario,Canada,M4B 1V7,801a1dfc-5125-486b-aa84-ccbd2ec57ca4,2005-08-01
447,22580 Free Street,Toronto,Ontario,Canada,M4B 1V7,88cee379-dbb8-433b-b84e-a35e09435500,2006-08-01


### Load Transformed Address Table Data to Gold Layer/ SalesLT/ Address/ Address.delta

In [0]:
df_address.write.format('delta')\
            .mode('overwrite')\
            .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/Address")\
            .save()

## 2- Read Table : Customer

In [0]:
df_customer = spark.read.format("delta")\
                            .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/Customer")

display(df_customer)

CustomerID,CompanyName,SalesPerson,EmailAddress,Phone,ModifiedDate,FullName
1,A Bike Store,adventure-works\pamela0,orlando0@adventure-works.com,245-555-0173,2005-08-01,Orlando N. Gee
2,Progressive Sports,adventure-works\david8,keith0@adventure-works.com,170-555-0127,2006-08-01,Keith Harris
3,Advanced Bike Components,adventure-works\jillian0,donna0@adventure-works.com,279-555-0130,2005-09-01,Donna F. Carreras
4,Modular Cycle Systems,adventure-works\jillian0,janet1@adventure-works.com,710-555-0173,2006-07-01,Janet M. Gates
5,Metropolitan Sports Supply,adventure-works\shu0,lucy0@adventure-works.com,828-555-0186,2006-09-01,Lucy Harrington
6,Aerobic Exercise Company,adventure-works\linda3,rosmarie0@adventure-works.com,244-555-0112,2007-09-01,Rosmarie J. Carroll
7,Associated Bikes,adventure-works\shu0,dominic0@adventure-works.com,192-555-0173,2006-07-01,Dominic P. Gash
10,Rural Cycle Emporium,adventure-works\josé1,kathleen0@adventure-works.com,150-555-0127,2006-09-01,Kathleen M. Garza
11,Sharp Bikes,adventure-works\josé1,katherine0@adventure-works.com,926-555-0159,2005-08-01,Katherine Harding
12,Bikes and Motorbikes,adventure-works\garrett1,johnny0@adventure-works.com,112-555-0191,2006-08-01,Johnny A. Caprio


### Load Transformed Customer Table Data to Gold Layer/ SalesLT/ Customer/ Customer.delta

In [0]:
df_customer.write.format('delta')\
            .mode('overwrite')\
            .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/Customer")\
            .save()

## 3- Read Table : CustomerAddress

In [0]:
df_CustomerAddress = spark.read.format("delta")\
                            .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/CustomerAddress")

display(df_CustomerAddress)

CustomerID,AddressID,AddressType,ModifiedDate
29485,1086,Main Office,2007-09-01
29486,621,Main Office,2005-09-01
29489,1069,Main Office,2005-07-01
29490,887,Main Office,2006-09-01
29492,618,Main Office,2006-12-01
29494,537,Main Office,2005-09-01
29496,1072,Main Office,2007-09-01
29497,889,Main Office,2005-07-01
29499,527,Main Office,2006-09-01
29502,893,Main Office,2007-07-01


### Load Transformed CustomerAddress Table Data to Gold Layer/ SalesLT/ CustomerAddress/ CustomerAddress.delta

In [0]:
df_CustomerAddress.write.format('delta')\
                    .mode('overwrite')\
                    .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/CustomerAddress")\
                    .save()

## 4- Read Table : Product

In [0]:
df_product = spark.read.format("delta")\
                    .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/Product")

display(df_product)

ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,ModifiedDate
680,"HL Road Frame - Black, 58",FR-R92B-58,Black,1059.31,1431.5,58,1016.04,18,6,2002-06-01,2008-03-11
706,"HL Road Frame - Red, 58",FR-R92R-58,Red,1059.31,1431.5,58,1016.04,18,6,2002-06-01,2008-03-11
707,"Sport-100 Helmet, Red",HL-U509-R,Red,13.0863,34.99,Unknown,0.0,35,33,2005-07-01,2008-03-11
708,"Sport-100 Helmet, Black",HL-U509,Black,13.0863,34.99,Unknown,0.0,35,33,2005-07-01,2008-03-11
709,"Mountain Bike Socks, M",SO-B909-M,White,3.3963,9.5,M,0.0,27,18,2005-07-01,2008-03-11
710,"Mountain Bike Socks, L",SO-B909-L,White,3.3963,9.5,L,0.0,27,18,2005-07-01,2008-03-11
711,"Sport-100 Helmet, Blue",HL-U509-B,Blue,13.0863,34.99,Unknown,0.0,35,33,2005-07-01,2008-03-11
712,AWC Logo Cap,CA-1098,Multi,6.9223,8.99,Unknown,0.0,23,2,2005-07-01,2008-03-11
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,Multi,38.4923,49.99,S,0.0,25,11,2005-07-01,2008-03-11
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,Multi,38.4923,49.99,M,0.0,25,11,2005-07-01,2008-03-11


### Load Transformed Product Table Data to Gold Layer/ SalesLT/ Product/ Product.delta

In [0]:
df_product.write.format('delta')\
            .mode('overwrite')\
            .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/Product")\
            .save()

## 5- Read Table : ProductCategory

In [0]:
df_ProductCategory = spark.read.format("delta")\
                            .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductCategory")

display(df_ProductCategory)

ProductCategoryID,ParentProductCategoryID,Name,ModifiedDate
1,0,Bikes,2002-06-01
2,0,Components,2002-06-01
3,0,Clothing,2002-06-01
4,0,Accessories,2002-06-01
5,1,Mountain Bikes,2002-06-01
6,1,Road Bikes,2002-06-01
7,1,Touring Bikes,2002-06-01
8,2,Handlebars,2002-06-01
9,2,Bottom Brackets,2002-06-01
10,2,Brakes,2002-06-01


### Load Transformed ProductCategory Table Data to Gold Layer/ SalesLT/ ProductCategory/ ProductCategory.delta



In [0]:
df_ProductCategory.write.format('delta')\
                    .mode('overwrite')\
                    .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductCategory")\
                    .save()

## 6 - Read Table : ProductDescription

In [0]:
df_prod_desc = spark.read.format("delta")\
                    .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductDescription")

display(df_prod_desc)

ProductDescriptionID,Description,ModifiedDate
3,Chromoly steel.,2007-06-01
4,Aluminum alloy cups; large diameter spindle.,2007-06-01
5,Aluminum alloy cups and a hollow axle.,2007-06-01
8,"Suitable for any type of riding, on or off-road. Fits any budget. Smooth-shifting with a comfortable ride.",2007-06-01
64,"This bike delivers a high-level of performance on a budget. It is responsive and maneuverable, and offers peace-of-mind when you decide to go off-road.",2007-06-01
88,For true trail addicts. An extremely durable bike that will go anywhere and keep you in control on challenging terrain - without breaking your budget.,2007-06-01
128,Serious back-country riding. Perfect for all levels of competition. Uses the same HL Frame as the Mountain-100.,2008-03-11
168,"Top-of-the-line competition mountain bike. Performance-enhancing options include the innovative HL Frame, super-smooth front suspension, and traction for all terrain.",2007-06-01
170,Suitable for any type of off-road trip. Fits any budget.,2007-06-01
209,Entry level adult bike; offers a comfortable ride cross-country or down the block. Quick-release hubs and rims.,2007-06-01


### Load Transformed ProductDescription Table Data to Gold Layer/ SalesLT/ ProductDescription/ProductDescription.delta

In [0]:
df_prod_desc.write.format('delta')\
                .mode('overwrite')\
                .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductDescription")\
                .save()

## 7 - Read Table : ProductModel

In [0]:
df_prod_model = spark.read.format("delta")\
                        .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductModel")

display(df_prod_model)

ProductModelID,Name,ModifiedDate
1,Classic Vest,2007-06-01
2,Cycling Cap,2005-06-01
3,Full-Finger Gloves,2006-06-01
4,Half-Finger Gloves,2006-06-01
5,HL Mountain Frame,2005-06-01
6,HL Road Frame,2002-05-02
7,HL Touring Frame,2009-05-16
8,LL Mountain Frame,2006-11-20
9,LL Road Frame,2005-06-01
10,LL Touring Frame,2009-05-16


### Load Transformed ProductModel Table Data to Gold Layer/ SalesLT/ ProductModel/ ProductModel.delta

In [0]:
df_prod_model.write.format('delta')\
                .mode('overwrite')\
                .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductModel")\
                .save()

## 8- Read Table : ProductModelProductDescription

In [0]:
df_prod_mode_prod_desc = spark.read.format("delta")\
                                .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductModelProductDescription")

display(df_prod_mode_prod_desc)

ProductModelID,ProductDescriptionID,Culture,ModifiedDate
1,1199,en,2007-06-01
1,1467,ar,2007-06-01
1,1589,fr,2007-06-01
1,1712,th,2007-06-01
1,1838,he,2007-06-01
1,1965,zh-cht,2007-06-01
2,1210,en,2007-06-01
2,1476,ar,2007-06-01
2,1598,fr,2007-06-01
2,1721,th,2007-06-01


### Load Transformed ProductModelProductDescription Table Data to Gold Layer/ SalesLT/ ProductModelProductDescription/ ProductModelProductDescription.delta


In [0]:
df_prod_mode_prod_desc.write.format('delta')\
                        .mode('overwrite')\
                        .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductModelProductDescription")\
                        .save()

## 9- Read Table : SalesOrderDetail

In [0]:
df_sales_order_details = spark.read.format("delta")\
                            .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/SalesOrderDetail")

display(df_sales_order_details)

SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate
71774,110562,1,836,356.898,0.0,356.898,2008-06-01
71774,110563,1,822,356.898,0.0,356.898,2008-06-01
71776,110567,1,907,63.9,0.0,63.9,2008-06-01
71780,110616,4,905,218.454,0.0,873.816,2008-06-01
71780,110617,2,983,461.694,0.0,923.388,2008-06-01
71780,110618,6,988,112.998,0.4,406.7928,2008-06-01
71780,110619,2,748,818.7,0.0,1637.4,2008-06-01
71780,110620,1,990,323.994,0.0,323.994,2008-06-01
71780,110621,1,926,149.874,0.0,149.874,2008-06-01
71780,110622,1,743,809.76,0.0,809.76,2008-06-01


### Load Transformed SalesOrderDetail Table Data to Gold Layer/ SalesLT/ SalesOrderDetail/ SalesOrderDetail.delta

In [0]:
df_sales_order_details.write.format('delta')\
                        .mode('overwrite')\
                        .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/SalesOrderDetail")\
                        .save()

## 10- Read Table : SalesOrderHeader

In [0]:
df_sales_order_header = spark.read.format("delta")\
                                .load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/SalesOrderHeader")

display(df_sales_order_header)

SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ShipToAddressID,BillToAddressID,ShipMethod,SubTotal,TaxAmt,Freight,TotalDue,ModifiedDate
71774,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71774,PO348186287,10-4020-000609,29847,1092,1092,CARGO TRANSPORT 5,880.3484,70.4279,22.0087,972.785,2008-06-08
71776,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71776,PO19952192051,10-4020-000106,30072,640,640,CARGO TRANSPORT 5,78.81,6.3048,1.9703,87.0851,2008-06-08
71780,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71780,PO19604173239,10-4020-000340,30113,653,653,CARGO TRANSPORT 5,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71782,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71782,PO19372114749,10-4020-000582,29485,1086,1086,CARGO TRANSPORT 5,39785.3304,3182.8264,994.6333,43962.7901,2008-06-08
71783,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71783,PO19343113609,10-4020-000024,29957,992,992,CARGO TRANSPORT 5,83858.4261,6708.6741,2096.4607,92663.5609,2008-06-08
71784,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71784,PO19285135919,10-4020-000448,29736,659,659,CARGO TRANSPORT 5,108561.8317,8684.9465,2714.0458,119960.824,2008-06-08
71796,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71796,PO17052159664,10-4020-000420,29660,1058,1058,CARGO TRANSPORT 5,57634.6342,4610.7707,1440.8659,63686.2708,2008-06-08
71797,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71797,PO16501134889,10-4020-000142,29796,642,642,CARGO TRANSPORT 5,78029.6898,6242.3752,1950.7422,86222.8072,2008-06-08
71815,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71815,PO13021155785,10-4020-000276,30089,1034,1034,CARGO TRANSPORT 5,1141.5782,91.3263,28.5395,1261.444,2008-06-08
71816,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71816,PO12992180445,10-4020-000295,30027,1038,1038,CARGO TRANSPORT 5,3398.1659,271.8533,84.9541,3754.9733,2008-06-08


### Load Transformed SalesOrderHeader Table Data to Gold Layer/ SalesLT/ SalesOrderHeader/ SalesOrderHeader.delta


In [0]:
df_sales_order_header.write.format('delta')\
                         .mode('overwrite')\
                         .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/SalesOrderHeader")\
                         .save()
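
The ten read/write pairs above differ only in the table name, so they could be collapsed into one loop. A minimal sketch, assuming two hypothetical helpers (`layer_path` and `copy_silver_to_gold`, neither of which is in the original notebook) and assuming that every table is written with `overwrite`:

```python
# Table names taken from the sections above.
TABLES = [
    "Address", "Customer", "CustomerAddress", "Product", "ProductCategory",
    "ProductDescription", "ProductModel", "ProductModelProductDescription",
    "SalesOrderDetail", "SalesOrderHeader",
]


def layer_path(layer: str, table: str,
               account: str = "adventureworksadls0", schema: str = "SalesLT") -> str:
    """Return the abfss:// path of a table in the given container (hypothetical helper)."""
    return f"abfss://{layer}@{account}.dfs.core.windows.net/{schema}/{table}"


def copy_silver_to_gold(spark, tables=TABLES):
    """Copy each Silver Delta table to the same relative path in the Gold container."""
    for table in tables:
        df = spark.read.format("delta").load(layer_path("silver", table))
        df.write.format("delta").mode("overwrite").save(layer_path("gold", table))


print(layer_path("gold", "Address"))
# abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/Address
```

In the notebook, `copy_silver_to_gold(spark)` would replace the individual cells; the per-table `display` calls can still be run separately when a preview is wanted.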

# GOLD TRANSFORMATIONS

In [0]:
# %md
#  Understanding Gold Layer Requirements
#  The gold layer should contain:

#  Dimension Tables: Master/reference data that provides context (slow-changing)

#  Fact Tables: Transactional data with business metrics (fast-changing)

#  Aggregate Tables: Pre-computed summaries for performance

#  Bridge Tables: For many-to-many relationships (if needed)

In [0]:
# %md
#  Star Schema Design:

#  Dimension Tables (Conformed Dimensions):

#  DimCustomer (from Customer, CustomerAddress, Address tables)

#  DimProduct (from Product, ProductCategory, ProductModel tables)

#  DimDate (generate date dimension)

#  Fact Tables:

#  FactSales (from SalesOrderHeader, SalesOrderDetail tables)

#  Aggregate Tables:

#  SalesSummaryByCustomer (customer performance metrics)

#  SalesSummaryByProduct (product performance metrics)

#  SalesTrendsByTime (time-based aggregations)
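
DimDate is the only table in this plan that has no Silver source and has to be generated. A minimal sketch in plain Python, assuming a hypothetical generator `date_dimension_rows`; the column names and the `yyyyMMdd` integer key are illustrative choices, not taken from the notebook:

```python
from datetime import date, timedelta


def date_dimension_rows(start: date, end: date):
    """Yield one dict per calendar day from start to end, inclusive."""
    day = start
    while day <= end:
        yield {
            "date_key": day.year * 10000 + day.month * 100 + day.day,  # integer in yyyyMMdd form
            "full_date": day,
            "year": day.year,
            "quarter": (day.month - 1) // 3 + 1,
            "month": day.month,
            "day": day.day,
            "day_of_week": day.isoweekday(),      # 1 = Monday ... 7 = Sunday
            "is_weekend": day.isoweekday() >= 6,  # Saturday or Sunday
        }
        day += timedelta(days=1)


rows = list(date_dimension_rows(date(2008, 6, 1), date(2008, 6, 7)))
print(len(rows), rows[0]["date_key"], rows[0]["quarter"])
```

In the notebook, a list like `rows` could be passed to `spark.createDataFrame` and written to the Gold layer in the same way as the other dimensions.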


# Dimensions

## 1 - DimCustomer

In [0]:
# %md
#  DimCustomer : Transforming for Gold Layer in PySpark

#  Create the DimCustomer dimension table from the Silver Layer tables:

#  1 - Silver.Customer

#  2 - Silver.CustomerAddress

#  3 - Silver.Address

## Step 1 : Load Data from Silver Layer

In [0]:
silver = "abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/"

In [0]:
# Reuse the `silver` base path defined in the previous cell.
df_customer = spark.read.format("delta").load(silver + "Customer")
df_customer_address = spark.read.format("delta").load(silver + "CustomerAddress")
df_address = spark.read.format("delta").load(silver + "Address")


In [0]:
display(df_customer)
display(df_customer_address)
display(df_address)

CustomerID,CompanyName,SalesPerson,EmailAddress,Phone,ModifiedDate,FullName
1,A Bike Store,adventure-works\pamela0,orlando0@adventure-works.com,245-555-0173,2005-08-01,Orlando N. Gee
2,Progressive Sports,adventure-works\david8,keith0@adventure-works.com,170-555-0127,2006-08-01,Keith Harris
3,Advanced Bike Components,adventure-works\jillian0,donna0@adventure-works.com,279-555-0130,2005-09-01,Donna F. Carreras
4,Modular Cycle Systems,adventure-works\jillian0,janet1@adventure-works.com,710-555-0173,2006-07-01,Janet M. Gates
5,Metropolitan Sports Supply,adventure-works\shu0,lucy0@adventure-works.com,828-555-0186,2006-09-01,Lucy Harrington
6,Aerobic Exercise Company,adventure-works\linda3,rosmarie0@adventure-works.com,244-555-0112,2007-09-01,Rosmarie J. Carroll
7,Associated Bikes,adventure-works\shu0,dominic0@adventure-works.com,192-555-0173,2006-07-01,Dominic P. Gash
10,Rural Cycle Emporium,adventure-works\josé1,kathleen0@adventure-works.com,150-555-0127,2006-09-01,Kathleen M. Garza
11,Sharp Bikes,adventure-works\josé1,katherine0@adventure-works.com,926-555-0159,2005-08-01,Katherine Harding
12,Bikes and Motorbikes,adventure-works\garrett1,johnny0@adventure-works.com,112-555-0191,2006-08-01,Johnny A. Caprio


CustomerID,AddressID,AddressType,ModifiedDate
29485,1086,Main Office,2007-09-01
29486,621,Main Office,2005-09-01
29489,1069,Main Office,2005-07-01
29490,887,Main Office,2006-09-01
29492,618,Main Office,2006-12-01
29494,537,Main Office,2005-09-01
29496,1072,Main Office,2007-09-01
29497,889,Main Office,2005-07-01
29499,527,Main Office,2006-09-01
29502,893,Main Office,2007-07-01


AddressID,Address,City,State,Country,PostalCode,rowguid,ModifiedDate
9,8713 Yosemite Ct.,Bothell,Washington,United States,98011,268af621-76d7-4c78-9441-144fd139821a,2006-07-01
11,1318 Lasalle Street,Bothell,Washington,United States,98011,981b3303-aca2-49c7-9a96-fb670785b269,2007-04-01
25,9178 Jumping St.,Dallas,Texas,United States,75201,c8df3bd9-48f0-4654-a8dd-14a67a84d3c6,2006-09-01
28,9228 Via Del Sol,Phoenix,Arizona,United States,85004,12ae5ee1-fc3e-468b-9b92-3b970b169774,2005-09-01
32,26910 Indela Road,Montreal,Quebec,Canada,H1Y 2H5,84a95f62-3ae8-4e7e-bbd5-5a6f00cd982d,2006-08-01
185,2681 Eagle Peak,Bellevue,Washington,United States,98004,7bccf442-2268-46cc-8472-14c44c14e98c,2006-09-01
297,7943 Walnut Ave,Renton,Washington,United States,98055,52410da4-2778-4b1d-a599-95746625ce6d,2006-08-01
445,6388 Lake City Way,Burnaby,British Columbia,Canada,V5A 3A6,53572f25-9133-4a8b-a065-102ff35416ee,2006-09-01
446,52560 Free Street,Toronto,Ontario,Canada,M4B 1V7,801a1dfc-5125-486b-aa84-ccbd2ec57ca4,2005-08-01
447,22580 Free Street,Toronto,Ontario,Canada,M4B 1V7,88cee379-dbb8-433b-b84e-a35e09435500,2006-08-01


## Step 2 : Transformation : Join Tables to Create a Denormalized DimCustomer

In [0]:
df_dim_customer = df_customer \
    .join(df_customer_address, "CustomerID", "left") \
    .join(df_address, "AddressID", "left") \
    .select(
        df_customer["CustomerID"].alias("customer_key"),  # Source CustomerID; replaced by a generated surrogate key in Step 3
        df_customer["CompanyName"].alias("company_name"),
        df_customer["FullName"].alias("full_name"),
        df_customer["Phone"].alias("phone"),
        df_customer["EmailAddress"].alias("email"),
        df_address["Address"].alias("address"),
        df_address["City"].alias("city"),
        df_address["State"].alias("state"),
        df_address["Country"].alias("country"),
        df_address["PostalCode"].alias("postal_code"),
        df_customer["ModifiedDate"].alias("customer_modified_date")
    )


In [0]:
# %md
#  Explanation:

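# 1 - Left-joins Customer to CustomerAddress on CustomerID, so customers without an address are kept.

# 2 - Left-joins the result to Address on AddressID; a customer with several addresses yields one row per address.

# 3 - Selects only the columns needed for analytics.

# 4 - Aliases CustomerID as customer_key; Step 3 replaces this value with a generated surrogate key.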

In [0]:
display(df_dim_customer)

customer_key,company_name,full_name,phone,email,address,city,state,country,postal_code,customer_modified_date
29485,Professional Sales and Service,Catherine R. Abel,747-555-0171,catherine0@adventure-works.com,57251 Serene Blvd,Van Nuys,California,United States,91411,2009-05-16
29486,Riders Company,Kim Abercrombie,334-555-0137,kim2@adventure-works.com,Tanger Factory,Branch,Minnesota,United States,55056,2009-05-16
29489,Area Bike Accessories,Frances B. Adams,991-555-0183,frances0@adventure-works.com,6900 Sisk Road,Modesto,California,United States,95354,2009-05-16
29490,Bicycle Accessories and Kits,Margaret J. Smith,959-555-0151,margaret0@adventure-works.com,Lewiston Mall,Lewiston,Idaho,United States,83501,2009-05-16
29492,Valley Bicycle Specialists,Jay Adams,158-555-0142,jay1@adventure-works.com,Blue Ridge Mall,Kansas City,Missouri,United States,64106,2009-05-16
29494,Vinyl and Plastic Goods Corporation,Samuel N. Agcaoili,554-555-0110,samuel0@adventure-works.com,No. 25800-130 King Street West,Toronto,Ontario,Canada,M4B 1V5,2005-09-01
29496,Fun Toys and Bikes,Robert E. Ahlering,678-555-0175,robert1@adventure-works.com,6500 East Grant Road,Tucson,Arizona,United States,85701,2007-09-01
29497,Great Bikes,François Ferrier,571-555-0128,françois1@adventure-works.com,Eastridge Mall,Casper,Wyoming,United States,82601,2005-07-01
29499,Valley Toy Store,Amy E. Alberts,727-555-0115,amy1@adventure-works.com,252851 Rowan Place,Richmond,British Columbia,Canada,V6B 3P7,2006-09-01
29502,Major Sport Suppliers,Paul L. Alcorn,331-555-0162,paul2@adventure-works.com,White Mountain Mall,Rock Springs,Wyoming,United States,82901,2007-07-01


## Step 3 : Generate Surrogate Key for Customer

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

df_dim_customer = df_dim_customer.withColumn("customer_key", monotonically_increasing_id())

In [0]:
# %md
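# Explanation

# 1 - customer_key is a surrogate key that fact tables reference.

# 2 - It decouples the Gold layer from the primary keys of the source systems.

# 3 - monotonically_increasing_id() guarantees unique, increasing values, but not consecutive ones, and the values can change between runs.

# 4 - This step overwrites the CustomerID value held in customer_key; keep CustomerID in its own column if fact rows must later be matched to this dimension.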
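
The keys shown in the output below run 0, 1, 2, ... only because the sample rows sit in a single partition. The Spark documentation describes the generated value as the partition index in the upper 31 bits and the row position within the partition in the lower 33 bits. A minimal sketch of that layout in plain Python, using a hypothetical function `expected_monotonic_id` for illustration:

```python
def expected_monotonic_id(partition_index: int, row_in_partition: int) -> int:
    """Layout documented for monotonically_increasing_id():
    partition index in the upper 31 bits, row position in the lower 33 bits."""
    return (partition_index << 33) + row_in_partition


print([expected_monotonic_id(0, i) for i in range(3)])  # [0, 1, 2]
print([expected_monotonic_id(1, i) for i in range(3)])  # [8589934592, 8589934593, 8589934594]
```

If dense, repeatable keys are required, a common alternative is `row_number()` over a window ordered by the source key, at the cost of sorting all rows in one partition.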

## Step 4: Handle Missing & Null Values

In [0]:
df_dim_customer = df_dim_customer.fillna({
    "company_name": "Unknown",
    "phone": "Not Available",
    "email": "Not Provided",
    "city": "Unknown",
    "state": "Unknown",
    "country": "Unknown"
})

display(df_dim_customer)

customer_key,company_name,full_name,phone,email,address,city,state,country,postal_code,customer_modified_date
0,Professional Sales and Service,Catherine R. Abel,747-555-0171,catherine0@adventure-works.com,57251 Serene Blvd,Van Nuys,California,United States,91411,2009-05-16
1,Riders Company,Kim Abercrombie,334-555-0137,kim2@adventure-works.com,Tanger Factory,Branch,Minnesota,United States,55056,2009-05-16
2,Area Bike Accessories,Frances B. Adams,991-555-0183,frances0@adventure-works.com,6900 Sisk Road,Modesto,California,United States,95354,2009-05-16
3,Bicycle Accessories and Kits,Margaret J. Smith,959-555-0151,margaret0@adventure-works.com,Lewiston Mall,Lewiston,Idaho,United States,83501,2009-05-16
4,Valley Bicycle Specialists,Jay Adams,158-555-0142,jay1@adventure-works.com,Blue Ridge Mall,Kansas City,Missouri,United States,64106,2009-05-16
5,Vinyl and Plastic Goods Corporation,Samuel N. Agcaoili,554-555-0110,samuel0@adventure-works.com,No. 25800-130 King Street West,Toronto,Ontario,Canada,M4B 1V5,2005-09-01
6,Fun Toys and Bikes,Robert E. Ahlering,678-555-0175,robert1@adventure-works.com,6500 East Grant Road,Tucson,Arizona,United States,85701,2007-09-01
7,Great Bikes,François Ferrier,571-555-0128,françois1@adventure-works.com,Eastridge Mall,Casper,Wyoming,United States,82601,2005-07-01
8,Valley Toy Store,Amy E. Alberts,727-555-0115,amy1@adventure-works.com,252851 Rowan Place,Richmond,British Columbia,Canada,V6B 3P7,2006-09-01
9,Major Sport Suppliers,Paul L. Alcorn,331-555-0162,paul2@adventure-works.com,White Mountain Mall,Rock Springs,Wyoming,United States,82901,2007-07-01


In [0]:
# %md
# Why?

# 1 - Ensures data completeness in the Gold layer.

# 2 - Fills nulls in the listed columns with default placeholders; full_name, address, and postal_code are left unchanged.

## Step 5: Format Date Columns


In [0]:
from pyspark.sql.functions import to_date

df_dim_customer = df_dim_customer.withColumn("customer_modified_date", to_date("customer_modified_date", "yyyy-MM-dd"))

display(df_dim_customer)

customer_key,company_name,full_name,phone,email,address,city,state,country,postal_code,customer_modified_date
0,Professional Sales and Service,Catherine R. Abel,747-555-0171,catherine0@adventure-works.com,57251 Serene Blvd,Van Nuys,California,United States,91411,2009-05-16
1,Riders Company,Kim Abercrombie,334-555-0137,kim2@adventure-works.com,Tanger Factory,Branch,Minnesota,United States,55056,2009-05-16
2,Area Bike Accessories,Frances B. Adams,991-555-0183,frances0@adventure-works.com,6900 Sisk Road,Modesto,California,United States,95354,2009-05-16
3,Bicycle Accessories and Kits,Margaret J. Smith,959-555-0151,margaret0@adventure-works.com,Lewiston Mall,Lewiston,Idaho,United States,83501,2009-05-16
4,Valley Bicycle Specialists,Jay Adams,158-555-0142,jay1@adventure-works.com,Blue Ridge Mall,Kansas City,Missouri,United States,64106,2009-05-16
5,Vinyl and Plastic Goods Corporation,Samuel N. Agcaoili,554-555-0110,samuel0@adventure-works.com,No. 25800-130 King Street West,Toronto,Ontario,Canada,M4B 1V5,2005-09-01
6,Fun Toys and Bikes,Robert E. Ahlering,678-555-0175,robert1@adventure-works.com,6500 East Grant Road,Tucson,Arizona,United States,85701,2007-09-01
7,Great Bikes,François Ferrier,571-555-0128,françois1@adventure-works.com,Eastridge Mall,Casper,Wyoming,United States,82601,2005-07-01
8,Valley Toy Store,Amy E. Alberts,727-555-0115,amy1@adventure-works.com,252851 Rowan Place,Richmond,British Columbia,Canada,V6B 3P7,2006-09-01
9,Major Sport Suppliers,Paul L. Alcorn,331-555-0162,paul2@adventure-works.com,White Mountain Mall,Rock Springs,Wyoming,United States,82901,2007-07-01


In [0]:
# %md

# Why?

# Ensures customer_modified_date is stored as a date column in yyyy-MM-dd form for reporting.



## Step 6: Store DimCustomer as a Delta Table in the Gold Layer

In [0]:
df_dim_customer.write.format("delta")\
                .mode("overwrite")\
                .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/DimCustomer")\
                .save()

# Dimensions

## 2 - DimProduct

In [0]:
# %md
# Transforming DimProduct for Gold Layer in PySpark

# Create the `DimProduct` dimension table from the Silver Layer tables:

# 1 - Silver.Product

# 2 - Silver.ProductCategory

# 3 - Silver.ProductModel

## Step 1 : Load Data from Silver Layer

In [0]:
df_product = spark.read.format("delta").load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/Product")
df_product_category = spark.read.format("delta").load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductCategory")
df_product_model = spark.read.format("delta").load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/ProductModel")

display(df_product)
display(df_product_category)
display(df_product_model)


ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,ModifiedDate
680,"HL Road Frame - Black, 58",FR-R92B-58,Black,1059.31,1431.5,58,1016.04,18,6,2002-06-01,2008-03-11
706,"HL Road Frame - Red, 58",FR-R92R-58,Red,1059.31,1431.5,58,1016.04,18,6,2002-06-01,2008-03-11
707,"Sport-100 Helmet, Red",HL-U509-R,Red,13.0863,34.99,Unknown,0.0,35,33,2005-07-01,2008-03-11
708,"Sport-100 Helmet, Black",HL-U509,Black,13.0863,34.99,Unknown,0.0,35,33,2005-07-01,2008-03-11
709,"Mountain Bike Socks, M",SO-B909-M,White,3.3963,9.5,M,0.0,27,18,2005-07-01,2008-03-11
710,"Mountain Bike Socks, L",SO-B909-L,White,3.3963,9.5,L,0.0,27,18,2005-07-01,2008-03-11
711,"Sport-100 Helmet, Blue",HL-U509-B,Blue,13.0863,34.99,Unknown,0.0,35,33,2005-07-01,2008-03-11
712,AWC Logo Cap,CA-1098,Multi,6.9223,8.99,Unknown,0.0,23,2,2005-07-01,2008-03-11
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,Multi,38.4923,49.99,S,0.0,25,11,2005-07-01,2008-03-11
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,Multi,38.4923,49.99,M,0.0,25,11,2005-07-01,2008-03-11


ProductCategoryID,ParentProductCategoryID,Name,ModifiedDate
1,0,Bikes,2002-06-01
2,0,Components,2002-06-01
3,0,Clothing,2002-06-01
4,0,Accessories,2002-06-01
5,1,Mountain Bikes,2002-06-01
6,1,Road Bikes,2002-06-01
7,1,Touring Bikes,2002-06-01
8,2,Handlebars,2002-06-01
9,2,Bottom Brackets,2002-06-01
10,2,Brakes,2002-06-01


ProductModelID,Name,ModifiedDate
1,Classic Vest,2007-06-01
2,Cycling Cap,2005-06-01
3,Full-Finger Gloves,2006-06-01
4,Half-Finger Gloves,2006-06-01
5,HL Mountain Frame,2005-06-01
6,HL Road Frame,2002-05-02
7,HL Touring Frame,2009-05-16
8,LL Mountain Frame,2006-11-20
9,LL Road Frame,2005-06-01
10,LL Touring Frame,2009-05-16


## Step 2: Join Tables to Create a Denormalized DimProduct

In [0]:
df_dim_product = df_product \
    .join(df_product_category, "ProductCategoryID", "left") \
    .join(df_product_model, "ProductModelID", "left") \
    .select(
        df_product["ProductID"].alias("product_key"),  # Source ProductID, used here as the product key
        df_product["Name"].alias("product_name"),
        df_product_category["Name"].alias("category_name"),
        df_product_category["ProductCategoryID"].alias("product_category_key"),
        df_product_category["ParentProductCategoryID"].alias("parent_product_category_key"),
        df_product_category["ModifiedDate"].alias("product_category_modified_date"),
        df_product_model["Name"].alias("model_name"),
        df_product_model["ModifiedDate"].alias("product_model_modified_date"),
        df_product["ProductNumber"].alias("product_number"),
        df_product["Color"].alias("color"),
        df_product["Size"].alias("size"),
        df_product["StandardCost"].alias("standard_cost"),
        df_product["ListPrice"].alias("list_price"),
        df_product["Weight"].alias("weight"),
        df_product["SellStartDate"].alias("sell_start_date"),
        df_product["ModifiedDate"].alias("product_modified_date")
    )


In [0]:
display(df_dim_product)

product_key,product_name,category_name,product_category_key,parent_product_category_key,product_category_modified_date,model_name,product_model_modified_date,product_number,color,size,standard_cost,list_price,weight,sell_start_date,product_modified_date
680,"HL Road Frame - Black, 58",Road Frames,18,2,2002-06-01,HL Road Frame,2002-05-02,FR-R92B-58,Black,58,1059.31,1431.5,1016.04,2002-06-01,2008-03-11
706,"HL Road Frame - Red, 58",Road Frames,18,2,2002-06-01,HL Road Frame,2002-05-02,FR-R92R-58,Red,58,1059.31,1431.5,1016.04,2002-06-01,2008-03-11
707,"Sport-100 Helmet, Red",Helmets,35,4,2002-06-01,Sport-100,2005-06-01,HL-U509-R,Red,Unknown,13.0863,34.99,0.0,2005-07-01,2008-03-11
708,"Sport-100 Helmet, Black",Helmets,35,4,2002-06-01,Sport-100,2005-06-01,HL-U509,Black,Unknown,13.0863,34.99,0.0,2005-07-01,2008-03-11
709,"Mountain Bike Socks, M",Socks,27,3,2002-06-01,Mountain Bike Socks,2005-06-01,SO-B909-M,White,M,3.3963,9.5,0.0,2005-07-01,2008-03-11
710,"Mountain Bike Socks, L",Socks,27,3,2002-06-01,Mountain Bike Socks,2005-06-01,SO-B909-L,White,L,3.3963,9.5,0.0,2005-07-01,2008-03-11
711,"Sport-100 Helmet, Blue",Helmets,35,4,2002-06-01,Sport-100,2005-06-01,HL-U509-B,Blue,Unknown,13.0863,34.99,0.0,2005-07-01,2008-03-11
712,AWC Logo Cap,Caps,23,3,2002-06-01,Cycling Cap,2005-06-01,CA-1098,Multi,Unknown,6.9223,8.99,0.0,2005-07-01,2008-03-11
713,"Long-Sleeve Logo Jersey, S",Jerseys,25,3,2002-06-01,Long-Sleeve Logo Jersey,2005-06-01,LJ-0192-S,Multi,S,38.4923,49.99,0.0,2005-07-01,2008-03-11
714,"Long-Sleeve Logo Jersey, M",Jerseys,25,3,2002-06-01,Long-Sleeve Logo Jersey,2005-06-01,LJ-0192-M,Multi,M,38.4923,49.99,0.0,2005-07-01,2008-03-11


In [0]:
# Explanation:
# - Left-joins Product to ProductCategory on ProductCategoryID.
# - Left-joins the result to ProductModel on ProductModelID.
# - Keeps only the columns needed for analytics, renamed to snake_case.
# - Exposes ProductID as product_key, the column that fact tables join on.
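A left join keeps exactly one output row per product only if each ProductCategoryID and ProductModelID appears once on the right-hand side. The sketch below shows that check in plain Python, so it runs without a cluster, on a few rows copied from the ProductCategory output above. `find_duplicate_keys` is a hypothetical helper, and the duplicated row is invented for illustration.

```python
from collections import Counter

def find_duplicate_keys(rows, key):
    """Return the key values that occur more than once in rows."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)

# A few ProductCategory rows copied from the output above.
categories = [
    {"ProductCategoryID": 1, "Name": "Bikes"},
    {"ProductCategoryID": 2, "Name": "Components"},
    {"ProductCategoryID": 5, "Name": "Mountain Bikes"},
]

# Hypothetical bad input: category 2 loaded twice.
categories_with_dup = categories + [{"ProductCategoryID": 2, "Name": "Components"}]

clean_dups = find_duplicate_keys(categories, "ProductCategoryID")
bad_dups = find_duplicate_keys(categories_with_dup, "ProductCategoryID")
print(clean_dups, bad_dups)
```

A non-empty result means the join would multiply product rows, so the dimension would need deduplication before the join.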

## Step 3: Generate Surrogate Key for Product

In [0]:
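from pyspark.sql.functions import monotonically_increasing_id

# The generated IDs are unique but not guaranteed consecutive, and they depend on how the data is partitioned.
# This overwrites the ProductID held in product_key, while FactSales.product_key (built later) still stores
# ProductID, so the two tables no longer join on product_key unless the business key is kept in its own column.
df_dim_product = df_dim_product.withColumn("product_key", monotonically_increasing_id())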

In [0]:
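# Why?

# product_key is the surrogate key that fact tables reference.

# A key generated in the warehouse stays stable even if source business keys are reused or changed, which helps maintain data integrity.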
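One way to keep the dimension joinable is to store the business key next to the surrogate key and let fact rows look their key up by ProductID. The following is a minimal plain-Python sketch of that idea, not the notebook's method. `assign_surrogate_keys`, `UNKNOWN_KEY`, the `product_id` column, and the fact row with ProductID 999 are all assumptions made for illustration; the other ProductID values come from the sample output.

```python
def assign_surrogate_keys(business_keys, start=1):
    """Map each distinct business key to a dense surrogate key, ordered by business key."""
    return {bk: sk for sk, bk in enumerate(sorted(set(business_keys)), start=start)}

UNKNOWN_KEY = -1  # hypothetical placeholder for facts whose product is not in the dimension

product_ids = [707, 680, 708, 706]  # ProductID values from the sample output, in arbitrary order
key_map = assign_surrogate_keys(product_ids)

# Keep the business key next to the surrogate key in the dimension...
dim_rows = [{"product_key": sk, "product_id": bk} for bk, sk in sorted(key_map.items())]

# ...so that fact rows can resolve their surrogate key by ProductID.
fact_rows = [
    {"ProductID": 707, "OrderQty": 2},
    {"ProductID": 999, "OrderQty": 1},  # hypothetical product missing from the dimension
]
resolved = [
    {**row, "product_key": key_map.get(row["ProductID"], UNKNOWN_KEY)}
    for row in fact_rows
]
print(dim_rows)
print(resolved)
```

Sorting before numbering makes the assignment deterministic for a given set of business keys, which a partition-dependent ID generator does not promise.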



## Step 4: Handle Missing & Null Values

In [0]:
df_dim_product = df_dim_product.fillna({
    "category_name": "Unknown",
    "model_name": "Unknown",
    "color": "Not Specified",
    "size": "Not Specified",
    "weight": 0.0,
    "sell_start_date": "1900-01-01",
})


In [0]:
display(df_dim_product)

product_key,product_name,category_name,product_category_key,parent_product_category_key,product_category_modified_date,model_name,product_model_modified_date,product_number,color,size,standard_cost,list_price,weight,sell_start_date,product_modified_date
0,"HL Road Frame - Black, 58",Road Frames,18,2,2002-06-01,HL Road Frame,2002-05-02,FR-R92B-58,Black,58,1059.31,1431.5,1016.04,2002-06-01,2008-03-11
1,"HL Road Frame - Red, 58",Road Frames,18,2,2002-06-01,HL Road Frame,2002-05-02,FR-R92R-58,Red,58,1059.31,1431.5,1016.04,2002-06-01,2008-03-11
2,"Sport-100 Helmet, Red",Helmets,35,4,2002-06-01,Sport-100,2005-06-01,HL-U509-R,Red,Unknown,13.0863,34.99,0.0,2005-07-01,2008-03-11
3,"Sport-100 Helmet, Black",Helmets,35,4,2002-06-01,Sport-100,2005-06-01,HL-U509,Black,Unknown,13.0863,34.99,0.0,2005-07-01,2008-03-11
4,"Mountain Bike Socks, M",Socks,27,3,2002-06-01,Mountain Bike Socks,2005-06-01,SO-B909-M,White,M,3.3963,9.5,0.0,2005-07-01,2008-03-11
5,"Mountain Bike Socks, L",Socks,27,3,2002-06-01,Mountain Bike Socks,2005-06-01,SO-B909-L,White,L,3.3963,9.5,0.0,2005-07-01,2008-03-11
6,"Sport-100 Helmet, Blue",Helmets,35,4,2002-06-01,Sport-100,2005-06-01,HL-U509-B,Blue,Unknown,13.0863,34.99,0.0,2005-07-01,2008-03-11
7,AWC Logo Cap,Caps,23,3,2002-06-01,Cycling Cap,2005-06-01,CA-1098,Multi,Unknown,6.9223,8.99,0.0,2005-07-01,2008-03-11
8,"Long-Sleeve Logo Jersey, S",Jerseys,25,3,2002-06-01,Long-Sleeve Logo Jersey,2005-06-01,LJ-0192-S,Multi,S,38.4923,49.99,0.0,2005-07-01,2008-03-11
9,"Long-Sleeve Logo Jersey, M",Jerseys,25,3,2002-06-01,Long-Sleeve Logo Jersey,2005-06-01,LJ-0192-M,Multi,M,38.4923,49.99,0.0,2005-07-01,2008-03-11


In [0]:
# Why?

# Ensures data completeness in the Gold Layer.

# Uses default values for missing data.
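Defaults make the table complete, but they also hide how much data was missing. A small profile taken before filling keeps that information. The sketch below illustrates the idea in plain Python; `null_counts` and `fill_defaults` are hypothetical helpers, and the second row is invented to show a missing value.

```python
def null_counts(rows, columns):
    """Count missing (None) values per column."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}

def fill_defaults(rows, defaults):
    """Return copies of rows with None replaced by the column's default, if one is defined."""
    return [
        {c: (defaults[c] if v is None and c in defaults else v) for c, v in r.items()}
        for r in rows
    ]

rows = [
    {"product_name": "AWC Logo Cap", "color": "Multi", "model_name": "Cycling Cap"},
    {"product_name": "Hypothetical product", "color": None, "model_name": None},
]
defaults = {"color": "Not Specified", "model_name": "Unknown"}  # same defaults as the cell above

before = null_counts(rows, defaults)   # how many values each default will replace
filled = fill_defaults(rows, defaults)
after = null_counts(filled, defaults)  # should be zero for every defaulted column
print(before, after)
```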

## Step 5: Format Date Columns

In [0]:
from pyspark.sql.functions import to_date

df_dim_product = df_dim_product \
    .withColumn("sell_start_date", to_date("sell_start_date", "yyyy-MM-dd")) \
    .withColumn("product_modified_date", to_date("product_modified_date", "yyyy-MM-dd"))\
    .withColumn("product_category_modified_date", to_date("product_category_modified_date", "yyyy-MM-dd"))\
    .withColumn("product_model_modified_date", to_date("product_model_modified_date", "yyyy-MM-dd"))

In [0]:
# Why?

# Standardizes date formats for better analytics.
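Pattern-based parsing only standardizes values that already match the pattern. Depending on the Spark configuration, a string that does not match `yyyy-MM-dd` typically ends up as null or raises an error, so it is worth knowing how malformed input behaves. The plain-Python sketch below mirrors the "return nothing on a bad value" behavior; `parse_iso_date` is a hypothetical helper, and `SENTINEL` is the 1900-01-01 default used in the fill step.

```python
from datetime import date, datetime

SENTINEL = date(1900, 1, 1)  # default used for missing dates in the fill step

def parse_iso_date(text):
    """Parse a yyyy-MM-dd string; return None when the text is missing or malformed."""
    if text is None:
        return None
    try:
        return datetime.strptime(text, "%Y-%m-%d").date()
    except ValueError:
        return None

good = parse_iso_date("2005-07-01")      # value from the sample output
bad = parse_iso_date("07/01/2005")       # wrong layout, cannot be parsed
missing = parse_iso_date(None)
filled = parse_iso_date("1900-01-01")    # a defaulted value parses to the sentinel
print(good, bad, missing, filled == SENTINEL)
```

Counting how many values come back empty after parsing is a cheap check that the pattern matches the Silver data.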

## Step 6: Store as a Delta Table in the Gold Layer

In [0]:
df_dim_product.write.format("delta")\
                 .mode("overwrite")\
                 .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/DimProduct")\
                 .save()

# Facts

## Fact Sales

In [0]:
# Creating the FactSales fact table for the Gold layer in PySpark.
# It is built from two Silver layer tables:
# - Silver.SalesOrderHeader
# - Silver.SalesOrderDetail

## Step 1: Load Data from Silver Layer

In [0]:
df_sales_order_header = spark.read.format("delta").load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/SalesOrderHeader")
df_sales_order_detail = spark.read.format("delta").load("abfss://silver@adventureworksadls0.dfs.core.windows.net/SalesLT/SalesOrderDetail")

display(df_sales_order_header)
display(df_sales_order_detail)

SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ShipToAddressID,BillToAddressID,ShipMethod,SubTotal,TaxAmt,Freight,TotalDue,ModifiedDate
71774,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71774,PO348186287,10-4020-000609,29847,1092,1092,CARGO TRANSPORT 5,880.3484,70.4279,22.0087,972.785,2008-06-08
71776,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71776,PO19952192051,10-4020-000106,30072,640,640,CARGO TRANSPORT 5,78.81,6.3048,1.9703,87.0851,2008-06-08
71780,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71780,PO19604173239,10-4020-000340,30113,653,653,CARGO TRANSPORT 5,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71782,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71782,PO19372114749,10-4020-000582,29485,1086,1086,CARGO TRANSPORT 5,39785.3304,3182.8264,994.6333,43962.7901,2008-06-08
71783,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71783,PO19343113609,10-4020-000024,29957,992,992,CARGO TRANSPORT 5,83858.4261,6708.6741,2096.4607,92663.5609,2008-06-08
71784,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71784,PO19285135919,10-4020-000448,29736,659,659,CARGO TRANSPORT 5,108561.8317,8684.9465,2714.0458,119960.824,2008-06-08
71796,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71796,PO17052159664,10-4020-000420,29660,1058,1058,CARGO TRANSPORT 5,57634.6342,4610.7707,1440.8659,63686.2708,2008-06-08
71797,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71797,PO16501134889,10-4020-000142,29796,642,642,CARGO TRANSPORT 5,78029.6898,6242.3752,1950.7422,86222.8072,2008-06-08
71815,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71815,PO13021155785,10-4020-000276,30089,1034,1034,CARGO TRANSPORT 5,1141.5782,91.3263,28.5395,1261.444,2008-06-08
71816,2,2008-06-01,2008-06-13,2008-06-08,5,False,SO71816,PO12992180445,10-4020-000295,30027,1038,1038,CARGO TRANSPORT 5,3398.1659,271.8533,84.9541,3754.9733,2008-06-08


SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate
71774,110562,1,836,356.898,0.0,356.898,2008-06-01
71774,110563,1,822,356.898,0.0,356.898,2008-06-01
71776,110567,1,907,63.9,0.0,63.9,2008-06-01
71780,110616,4,905,218.454,0.0,873.816,2008-06-01
71780,110617,2,983,461.694,0.0,923.388,2008-06-01
71780,110618,6,988,112.998,0.4,406.7928,2008-06-01
71780,110619,2,748,818.7,0.0,1637.4,2008-06-01
71780,110620,1,990,323.994,0.0,323.994,2008-06-01
71780,110621,1,926,149.874,0.0,149.874,2008-06-01
71780,110622,1,743,809.76,0.0,809.76,2008-06-01


## Step 2: Join Tables to Create a Denormalized FactSales

In [0]:
df_fact_sales = df_sales_order_detail \
    .join(df_sales_order_header, "SalesOrderID", "inner") \
    .select(
        df_sales_order_detail["SalesOrderID"].alias("sales_order_key"),
        df_sales_order_detail["SalesOrderDetailID"].alias("sales_order_detail_key"),
        df_sales_order_header["CustomerID"].alias("customer_key"),
        df_sales_order_detail["ProductID"].alias("product_key"),
        df_sales_order_header["OrderDate"].alias("order_date"),
        df_sales_order_header["ShipDate"].alias("ship_date"),
        df_sales_order_header["DueDate"].alias("due_date"),
        df_sales_order_detail["OrderQty"].alias("order_quantity"),
        df_sales_order_detail["UnitPrice"].alias("unit_price"),
        df_sales_order_detail["UnitPriceDiscount"].alias("unit_price_discount"),
        df_sales_order_detail["LineTotal"].alias("gross_amount"),  # OrderQty * UnitPrice * (1 - UnitPriceDiscount), i.e. net of discount
        # The header columns below are order-level values, repeated on every line of the order.
        df_sales_order_header["SubTotal"].alias("total_amount"),
        df_sales_order_header["TaxAmt"].alias("tax_amount"),
        df_sales_order_header["Freight"].alias("freight_amount"),
        df_sales_order_header["TotalDue"].alias("total_due"),
        df_sales_order_header["ModifiedDate"].alias("sales_order_modified_date"),
    )


In [0]:
display(df_fact_sales)

sales_order_key,sales_order_detail_key,customer_key,product_key,order_date,ship_date,due_date,order_quantity,unit_price,unit_price_discount,gross_amount,total_amount,tax_amount,freight_amount,total_due,sales_order_modified_date
71774,110562,29847,836,2008-06-01,2008-06-08,2008-06-13,1,356.898,0.0,356.898,880.3484,70.4279,22.0087,972.785,2008-06-08
71774,110563,29847,822,2008-06-01,2008-06-08,2008-06-13,1,356.898,0.0,356.898,880.3484,70.4279,22.0087,972.785,2008-06-08
71776,110567,30072,907,2008-06-01,2008-06-08,2008-06-13,1,63.9,0.0,63.9,78.81,6.3048,1.9703,87.0851,2008-06-08
71780,110616,30113,905,2008-06-01,2008-06-08,2008-06-13,4,218.454,0.0,873.816,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71780,110617,30113,983,2008-06-01,2008-06-08,2008-06-13,2,461.694,0.0,923.388,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71780,110618,30113,988,2008-06-01,2008-06-08,2008-06-13,6,112.998,0.4,406.7928,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71780,110619,30113,748,2008-06-01,2008-06-08,2008-06-13,2,818.7,0.0,1637.4,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71780,110620,30113,990,2008-06-01,2008-06-08,2008-06-13,1,323.994,0.0,323.994,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71780,110621,30113,926,2008-06-01,2008-06-08,2008-06-13,1,149.874,0.0,149.874,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
71780,110622,30113,743,2008-06-01,2008-06-08,2008-06-13,1,809.76,0.0,809.76,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08


In [0]:
# Explanation:

# Inner-joins SalesOrderDetail with SalesOrderHeader on SalesOrderID, so the grain is one row per order line.

# Selects only the columns needed for analytics.

# Uses CustomerID and ProductID as foreign keys linking to DimCustomer and DimProduct.
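Foreign keys are only useful if every key in the fact table has a matching dimension row. The sketch below shows a simple orphan check in plain Python. `orphan_keys` is a hypothetical helper, the fact keys come from the FactSales sample output, and the dimension contents are invented so that one key is deliberately missing.

```python
def orphan_keys(fact_rows, fact_key, dim_keys):
    """Return fact key values that have no matching dimension row."""
    return sorted({row[fact_key] for row in fact_rows} - set(dim_keys))

# product_key values from the FactSales sample output.
fact_rows = [{"product_key": 836}, {"product_key": 822}, {"product_key": 907}]

# Hypothetical dimension contents: 907 is deliberately left out.
dim_product_ids = [822, 836]

missing = orphan_keys(fact_rows, "product_key", dim_product_ids)
print(missing)
```

An empty list means every fact row can be joined to the dimension; anything else points to a late-arriving or dropped dimension row, or to keys that were regenerated on one side only.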

## Step 3: Handle Missing & Null Values

In [0]:
df_fact_sales = df_fact_sales.fillna({
    "ship_date": "1900-01-01",
    "due_date": "1900-01-01",
    "unit_price_discount": 0.0,
    "freight_amount": 0.0,
    "tax_amount": 0.0
})


In [0]:
# Why?

# Ensures data completeness in the Gold Layer.

# Uses default values for missing data.

## Step 4: Format Date Columns

In [0]:
from pyspark.sql.functions import to_date

# Note: the last line adds a new modified_date column and leaves sales_order_modified_date in place,
# so both columns are written to the Gold table.
df_fact_sales = df_fact_sales \
    .withColumn("order_date", to_date("order_date", "yyyy-MM-dd")) \
    .withColumn("ship_date", to_date("ship_date", "yyyy-MM-dd")) \
    .withColumn("due_date", to_date("due_date", "yyyy-MM-dd")) \
    .withColumn("modified_date", to_date("sales_order_modified_date", "yyyy-MM-dd"))


In [0]:
# Why?

# Standardizes date formats for better analytics.


## Step 5: Store as a Delta Table in the Gold Layer

In [0]:
df_fact_sales.write.format("delta")\
                .mode("overwrite")\
                .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/FactSales")\
                .save()

# FactSalesAggregation

In [0]:
# Creating FactSalesAgg (aggregated fact table) for the Gold layer in PySpark.
# FactSalesAgg is an aggregated version of FactSales, built for faster analytics and reporting.
# It summarizes sales data at the customer and product level.

## Step 1: Load Data from Gold Layer (FactSales)

In [0]:
df_fact_sales = spark.read.format("delta")\
                         .load("abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/FactSales")

display(df_fact_sales)

sales_order_key,sales_order_detail_key,customer_key,product_key,order_date,ship_date,due_date,order_quantity,unit_price,unit_price_discount,gross_amount,total_amount,tax_amount,freight_amount,total_due,sales_order_modified_date,modified_date
71774,110562,29847,836,2008-06-01,2008-06-08,2008-06-13,1,356.898,0.0,356.898,880.3484,70.4279,22.0087,972.785,2008-06-08,2008-06-08
71774,110563,29847,822,2008-06-01,2008-06-08,2008-06-13,1,356.898,0.0,356.898,880.3484,70.4279,22.0087,972.785,2008-06-08,2008-06-08
71776,110567,30072,907,2008-06-01,2008-06-08,2008-06-13,1,63.9,0.0,63.9,78.81,6.3048,1.9703,87.0851,2008-06-08,2008-06-08
71780,110616,30113,905,2008-06-01,2008-06-08,2008-06-13,4,218.454,0.0,873.816,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08,2008-06-08
71780,110617,30113,983,2008-06-01,2008-06-08,2008-06-13,2,461.694,0.0,923.388,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08,2008-06-08
71780,110618,30113,988,2008-06-01,2008-06-08,2008-06-13,6,112.998,0.4,406.7928,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08,2008-06-08
71780,110619,30113,748,2008-06-01,2008-06-08,2008-06-13,2,818.7,0.0,1637.4,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08,2008-06-08
71780,110620,30113,990,2008-06-01,2008-06-08,2008-06-13,1,323.994,0.0,323.994,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08,2008-06-08
71780,110621,30113,926,2008-06-01,2008-06-08,2008-06-13,1,149.874,0.0,149.874,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08,2008-06-08
71780,110622,30113,743,2008-06-01,2008-06-08,2008-06-13,1,809.76,0.0,809.76,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08,2008-06-08


## Step 2: Perform Aggregations


In [0]:
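from pyspark.sql import functions as F  # namespaced import avoids shadowing Python's built-in sum and round

df_fact_sales_agg = df_fact_sales.groupBy("customer_key", "product_key") \
    .agg(
        F.countDistinct("sales_order_key").alias("total_orders"),  # distinct orders, not order lines
        F.sum("order_quantity").alias("total_quantity"),
        F.sum("gross_amount").alias("total_sales_amount"),
        # tax_amount and freight_amount are order-level values repeated on every line,
        # so these sums attribute the full order tax and freight to each product in the order.
        F.sum("tax_amount").alias("total_tax"),
        F.sum("freight_amount").alias("total_freight_amount"),
        F.round(F.avg("unit_price"), 2).alias("avg_unit_price"),
        F.round(F.avg("unit_price_discount"), 2).alias("avg_discount")
    )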


In [0]:
# Explanation:

# Groups data by customer_key and product_key.

# Calculates:
# ✔ total_orders → Number of orders per customer-product pair
# ✔ total_quantity → Total units ordered
# ✔ total_sales_amount → Total line revenue (sum of gross_amount, which holds LineTotal)
# ✔ total_tax → Sum of the order-level tax_amount carried on each line
# ✔ total_freight_amount → Sum of the order-level freight_amount carried on each line
# ✔ avg_unit_price → Mean unit price across order lines (not weighted by quantity)
# ✔ avg_discount → Mean discount rate across order lines
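Because FactSales repeats the order-level tax and freight on every order line, summing those columns over lines counts them once per line rather than once per order. The plain-Python sketch below shows the effect at customer level on three rows copied from the FactSales sample output, and also counts distinct orders per customer-product pair. The variable names are illustrative only.

```python
from collections import defaultdict

# Three order lines copied from the FactSales sample output.
rows = [
    {"sales_order_key": 71774, "customer_key": 29847, "product_key": 836, "tax_amount": 70.4279},
    {"sales_order_key": 71774, "customer_key": 29847, "product_key": 822, "tax_amount": 70.4279},
    {"sales_order_key": 71776, "customer_key": 30072, "product_key": 907, "tax_amount": 6.3048},
]

naive_tax = defaultdict(float)      # adds the order tax once per order line
tax_per_order = {}                  # keeps a single tax value per (customer, order)
orders_per_pair = defaultdict(set)  # distinct orders per (customer, product)

for r in rows:
    naive_tax[r["customer_key"]] += r["tax_amount"]
    tax_per_order[(r["customer_key"], r["sales_order_key"])] = r["tax_amount"]
    orders_per_pair[(r["customer_key"], r["product_key"])].add(r["sales_order_key"])

order_level_tax = defaultdict(float)  # adds the order tax once per order
for (customer, _order), tax in tax_per_order.items():
    order_level_tax[customer] += tax

print(round(naive_tax[29847], 4), round(order_level_tax[29847], 4))
print({pair: len(orders) for pair, orders in orders_per_pair.items()})
```

For customer 29847 the line-level sum is twice the tax on order 71774, because that order has two lines. At customer-product grain there is no single correct tax figure without an allocation rule, for example splitting each order's tax in proportion to the line totals.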

## Step 3: Optimize for Query Performance

In [0]:
df_fact_sales_agg = df_fact_sales_agg.repartition("customer_key")


In [0]:
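# Why?

# repartition("customer_key") controls how rows are distributed across tasks and output files at write time,
# keeping each customer's rows together. It does not create an on-disk partitioned table (that takes
# partitionBy() on the writer), so it does not by itself guarantee faster queries.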

## Step 4: Store as a Delta Table in the Gold Layer

In [0]:
df_fact_sales_agg.write.format("delta")\
                    .mode("overwrite")\
                    .option("path", "abfss://gold@adventureworksadls0.dfs.core.windows.net/SalesLT/FactSalesAggregation")\
                    .save()

In [0]:
# What NOT to Load to Gold Layer:
# - Raw transaction tables: these should remain in silver
# - Intermediate tables: used only for transformation pipelines
# - Staging tables: temporary tables used during ETL
# - Audit/log tables: should remain in silver or a separate audit layer

# Gold Layer Transformation Best Practices
#
# Business Naming Conventions:
# - Use clear business terms (e.g., "net_sales" instead of "amt_net")
# - Consistent naming across all tables
#
# Data Enrichment:
# - Add calculated business metrics
# - Create derived dimensions (e.g., customer segments)
# - Add time intelligence (YTD, QTD, MTD flags)
#
# Data Quality:
# - Add data quality checks as columns
# - Implement surrogate keys
# - Handle NULLs consistently
#
# Performance Optimization:
# - Proper partitioning (typically by date)
# - Z-ordering on frequently filtered columns
# - Materialized aggregates for common queries
#
# SCD Handling:
# - Type 2 for slowly changing dimensions
# - Effective/expiration dates for historical tracking
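The SCD Type 2 idea can be sketched in a few lines: when an attribute changes, the current row is closed with an expiration date and a new current row is appended. The plain-Python example below is a minimal illustration under stated assumptions, not a production merge. `apply_scd2`, `OPEN_END`, the `valid_from`/`valid_to`/`is_current` column names, and the price change itself are hypothetical; surrogate key assignment is left out to keep the sketch short.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)  # hypothetical marker for "still current"

def apply_scd2(dim_rows, product_id, new_attrs, change_date):
    """Expire the current row if its attributes changed, then append the new version."""
    rows = [dict(r) for r in dim_rows]  # work on copies, leave the input untouched
    current = next(
        (r for r in rows if r["product_id"] == product_id and r["is_current"]), None
    )
    if current is not None:
        if all(current.get(k) == v for k, v in new_attrs.items()):
            return rows  # nothing changed, keep history as it is
        current["valid_to"] = change_date
        current["is_current"] = False
    rows.append({
        "product_id": product_id, **new_attrs,
        "valid_from": change_date, "valid_to": OPEN_END, "is_current": True,
    })
    return rows

dim = [{
    "product_id": 712, "list_price": 8.99,
    "valid_from": date(2005, 7, 1), "valid_to": OPEN_END, "is_current": True,
}]

updated = apply_scd2(dim, 712, {"list_price": 9.99}, date(2008, 3, 11))    # hypothetical price change
unchanged = apply_scd2(dim, 712, {"list_price": 8.99}, date(2008, 3, 11))  # same price, no new row
print(len(updated), len(unchanged))
```

Queries for the current state filter on `is_current`, while point-in-time queries filter on the `valid_from`/`valid_to` range.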

# Implementation Roadmap
#
# First Load:
# - Create dimension tables
# - Create fact tables
# - Generate date dimension
#
# Incremental Loads:
# - Identify new/changed records in silver
# - Apply SCD logic to dimensions
# - Append new facts with proper keys
#
# Aggregates:
# - Build after initial load
# - Refresh based on business needs
#
# Optimization:
# - Analyze query patterns
# - Adjust partitioning and indexing
# - Vacuum and optimize regularly
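The roadmap mentions generating a date dimension. As a rough illustration of what such a table contains, the plain-Python sketch below builds one row per calendar day for the range covered by the sample orders (OrderDate 2008-06-01 through DueDate 2008-06-13). `build_date_dimension` and the column names are assumptions; a real date dimension would usually cover several years and carry more attributes.

```python
from datetime import date, timedelta

def build_date_dimension(start, end):
    """Return one row per calendar day from start to end, inclusive."""
    rows = []
    day = start
    while day <= end:
        rows.append({
            "date_key": day.year * 10000 + day.month * 100 + day.day,  # integer key in yyyymmdd form
            "date": day,
            "year": day.year,
            "quarter": (day.month - 1) // 3 + 1,
            "month": day.month,
            "day_of_week": day.isoweekday(),  # 1 = Monday ... 7 = Sunday
        })
        day += timedelta(days=1)
    return rows

dim_date = build_date_dimension(date(2008, 6, 1), date(2008, 6, 13))
print(len(dim_date), dim_date[0]["date_key"], dim_date[-1]["date_key"])
```

Fact tables can then carry `date_key` values for order, ship, and due dates and join to this table for year, quarter, and month rollups.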