Connecting to the Silver container to retrieve the Cleaned data

In [0]:
dbutils.fs.ls("/mnt/silver")

[FileInfo(path='dbfs:/mnt/silver/df_customer/', name='df_customer/', size=0, modificationTime=1714712773000),
 FileInfo(path='dbfs:/mnt/silver/df_product/', name='df_product/', size=0, modificationTime=1714712776000),
 FileInfo(path='dbfs:/mnt/silver/df_store/', name='df_store/', size=0, modificationTime=1714712778000)]

Checking for data in Gold container

In [0]:
dbutils.fs.ls("/mnt/gold")

[]

Assigning paths of customer, product, store tables

In [0]:
customers_path = '/mnt/silver/df_customer/'
products_path = '/mnt/silver/df_product/'
store_path = '/mnt/silver/df_store/'

Loading Data into DataFrame from delta

In [0]:
new_df_customer = spark.read.format('delta').load(customers_path)
new_df_product = spark.read.format('delta').load(products_path)
new_df_store = spark.read.format('delta').load(store_path)

Understanding data in each table

In [0]:
display(new_df_customer.head(5))

Customer_ID,Customer_Name,Segment,Country,City,State,Postal_Code,Region,Age
CG/12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,42
DV/13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,47
SO/20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,19
BH/11710,Brosina Hoffman,Consumer,United States,Los Angeles,California,90032,West,39
AA/10480,Andrew Allen,Consumer,United States,Concord,North Carolina,28027,South,31


In [0]:
display(new_df_product.head(5))

Product_ID,Category,Sub_Category,Product_Name
FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase
FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back"
OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters by Universal
FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table
OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System


In [0]:
display(new_df_store.head(5))

Order_ID,Order_Date,Ship_Date,Ship_Mode,Customer_ID,Product_ID,Sales,Discount
CA-2017-152156,2017-11-08,2017-11-11,Second Class,CG/12520,FUR-BO-10001798,3929400.0,0.02
CA-2017-152156,2017-11-08,2017-11-11,Second Class,CG/12520,FUR-CH-10000454,10979100.0,0.01
CA-2017-138688,2017-06-12,2017-06-16,Second Class,DV/13045,OFF-LA-10000240,219300.0,0.01
US-2016-108966,2016-10-11,2016-10-18,Standard Class,SO/20335,FUR-TA-10000577,14363662.5,0.02
US-2016-108966,2016-10-11,2016-10-18,Standard Class,SO/20335,OFF-ST-10000760,335520.0,0.03


Normalizing Product Table

In [0]:
new_df_product.select("Category", "Sub_Category").distinct().count()

17

In [0]:
new_df_product.select("Category", "Sub_Category").distinct().show()

+---------------+------------+
|       Category|Sub_Category|
+---------------+------------+
|Office Supplies|  Appliances|
|Office Supplies|   Envelopes|
|      Furniture|      Chairs|
|Office Supplies|     Storage|
|Office Supplies|       Paper|
|Office Supplies|    Supplies|
|     Technology|     Copiers|
|Office Supplies|         Art|
|Office Supplies|     Binders|
|     Technology| Accessories|
|Office Supplies|   Fasteners|
|      Furniture|   Bookcases|
|     Technology|      Phones|
|Office Supplies|      Labels|
|     Technology|    Machines|
|      Furniture|      Tables|
|      Furniture| Furnishings|
+---------------+------------+



Creating Category Dimension Table

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

dim_category = (
    new_df_product.select("Category", "Sub_Category")
    .distinct()
    .withColumn("Category_ID", monotonically_increasing_id() + 1)
)

dim_category = dim_category.select("Category_ID", "Category", "Sub_Category")

In [0]:
display(dim_category)

Category_ID,Category,Sub_Category
1,Office Supplies,Appliances
2,Office Supplies,Envelopes
3,Furniture,Chairs
4,Office Supplies,Storage
5,Office Supplies,Paper
6,Office Supplies,Supplies
7,Technology,Copiers
8,Office Supplies,Art
9,Office Supplies,Binders
10,Technology,Accessories


In [0]:
new_df_product = (
    new_df_product.select("Product_ID", "Product_Name", "Category", "Sub_Category")
    .join(dim_category, ["Category", "Sub_Category"], "inner")
    .select("Product_ID", "Product_Name", "Category_ID")
)

In [0]:
display(new_df_product)

Product_ID,Product_Name,Category_ID
FUR-BO-10001798,Bush Somerset Collection Bookcase,12
FUR-CH-10000454,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back",3
OFF-LA-10000240,Self-Adhesive Address Labels for Typewriters by Universal,14
FUR-TA-10000577,Bretford CR4500 Series Slim Rectangular Table,16
OFF-ST-10000760,Eldon Fold 'N Roll Cart System,4
FUR-FU-10001487,"Eldon Expressions Wood and Plastic Desk Accessories, Cherry Wood",17
OFF-AR-10002833,Newell 322,8
TEC-PH-10002275,Mitel 5320 IP Phone VoIP phone,13
OFF-BI-10003910,DXL Angle-View Binders with Locking Rings by Samsill,9
OFF-AP-10002892,Belkin F5C206VTEL 6 Outlet Surge,1


Normalizing Customer Table

In [0]:
display(new_df_customer.select("Country", "Region","State", "City", "Postal_Code").distinct())

Country,Region,State,City,Postal_Code
United States,West,Arizona,Gilbert,85234
United States,West,California,Los Angeles,90045
United States,South,Virginia,Charlottesville,22901
United States,East,Maryland,Baltimore,21215
United States,East,Ohio,Grove City,43123
United States,East,New York,New York City,10011
United States,East,New York,Niagara Falls,14304
United States,Central,Minnesota,Minneapolis,55407
United States,West,Washington,Seattle,98115
United States,Central,Texas,Austin,78745


Creating Address and Customer Segment Dimension Tables

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

dim_address = (
    new_df_customer.select("Country", "Region","State", "City", "Postal_Code")
    .distinct()
    .withColumn("Address_ID", monotonically_increasing_id() + 1)
)

dim_address = dim_address.select("Address_ID", "Country", "Region","State", "City", "Postal_Code")

dim_cust_segment = (
    new_df_customer.select("Segment")
    .distinct()
    .withColumn("Customer_Segment_ID", monotonically_increasing_id() + 1)
)

dim_cust_segment = dim_cust_segment.select("Customer_Segment_ID", "Segment")

In [0]:
display(dim_address)

Address_ID,Country,Region,State,City,Postal_Code
1,United States,West,Arizona,Gilbert,85234
2,United States,West,California,Los Angeles,90045
3,United States,South,Virginia,Charlottesville,22901
4,United States,East,Maryland,Baltimore,21215
5,United States,East,Ohio,Grove City,43123
6,United States,East,New York,New York City,10011
7,United States,East,New York,Niagara Falls,14304
8,United States,Central,Minnesota,Minneapolis,55407
9,United States,West,Washington,Seattle,98115
10,United States,Central,Texas,Austin,78745


In [0]:
display(dim_cust_segment)

Customer_Segment_ID,Segment
1,Consumer
2,Home Office
3,Corporate


In [0]:
display(new_df_customer)

Customer_ID,Customer_Name,Segment,Country,City,State,Postal_Code,Region,Age
CG/12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,42
DV/13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,47
SO/20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,19
BH/11710,Brosina Hoffman,Consumer,United States,Los Angeles,California,90032,West,39
AA/10480,Andrew Allen,Consumer,United States,Concord,North Carolina,28027,South,31
IM/15070,Irene Maddox,Consumer,United States,Seattle,Washington,98103,West,45
HP/14815,Harold Pawlan,Home Office,United States,Fort Worth,Texas,76106,Central,38
PK/19075,Pete Kriz,Consumer,United States,Madison,Wisconsin,53711,Central,38
AG/10270,Alejandro Grove,Consumer,United States,West Jordan,Utah,84084,West,36
ZD/21925,Zuschuss Donatelli,Consumer,United States,San Francisco,California,94109,West,19


In [0]:
new_df_customer = (
    new_df_customer.select("Customer_ID", "Customer_Name", "Segment", "Country", "City", "State","Postal_Code", "Region", "Age")
    .join(dim_address, ["Country", "Region","State", "City", "Postal_Code"], "inner")
    .join(dim_cust_segment, "Segment", "inner")
    .select("Customer_ID", "Customer_Name", "Age", "Customer_Segment_ID","Address_ID")
)

In [0]:
display(new_df_customer)

Customer_ID,Customer_Name,Age,Customer_Segment_ID,Address_ID
PF/19120,Peter Fuller,46,1,1
AO/10810,Anthony O'Donnell,37,3,2
AS/10090,Adam Shillingsburg,40,1,3
BF/11080,Bart Folk,38,1,4
CK/12595,Clytie Kelty,35,1,5
BS/11365,Bill Shonely,46,3,6
NP/18685,Nora Pelletier,39,2,7
AC/10450,Amy Cox,31,1,8
MH/18025,Michelle Huthwaite,34,1,9
RD/19930,Russell D'Ascenzo,33,1,10


In [0]:
new_df_customer.where(new_df_customer["Customer_Name"] == 'Harold Pawlan').show()

+-----------+-------------+---+-------------------+----------+
|Customer_ID|Customer_Name|Age|Customer_Segment_ID|Address_ID|
+-----------+-------------+---+-------------------+----------+
|   HP/14815|Harold Pawlan| 38|                  2|       135|
+-----------+-------------+---+-------------------+----------+



Normalizing Store Table

In [0]:
display(new_df_store)

Order_ID,Order_Date,Ship_Date,Ship_Mode,Customer_ID,Product_ID,Sales,Discount
CA-2017-152156,2017-11-08,2017-11-11,Second Class,CG/12520,FUR-BO-10001798,3929400.0,0.02
CA-2017-152156,2017-11-08,2017-11-11,Second Class,CG/12520,FUR-CH-10000454,10979100.0,0.01
CA-2017-138688,2017-06-12,2017-06-16,Second Class,DV/13045,OFF-LA-10000240,219300.0,0.01
US-2016-108966,2016-10-11,2016-10-18,Standard Class,SO/20335,FUR-TA-10000577,14363662.5,0.02
US-2016-108966,2016-10-11,2016-10-18,Standard Class,SO/20335,OFF-ST-10000760,335520.0,0.03
CA-2015-115812,2015-06-09,2015-06-14,Standard Class,BH/11710,FUR-FU-10001487,732900.0,0.02
CA-2015-115812,2015-06-09,2015-06-14,Standard Class,BH/11710,OFF-AR-10002833,109200.0,0.02
CA-2015-115812,2015-06-09,2015-06-14,Standard Class,BH/11710,TEC-PH-10002275,13607280.0,0.02
CA-2015-115812,2015-06-09,2015-06-14,Standard Class,BH/11710,OFF-BI-10003910,277560.0,0.02
CA-2015-115812,2015-06-09,2015-06-14,Standard Class,BH/11710,OFF-AP-10002892,1723500.0,0.01


Creating Date Dimension

In [0]:
from pyspark.sql.functions import min, max
min_date = new_df_store.select(min("Order_Date"), min("Ship_Date")).collect()
min_order_date = min_date[0][0]
min_ship_date = min_date[0][1]

max_date = new_df_store.select(max("Order_Date"), max("Ship_Date")).collect()
max_order_date = max_date[0][0]
max_ship_date = max_date[0][1]

start_date = min_order_date if min_order_date < min_ship_date else min_ship_date
end_date = max_order_date if max_order_date > max_ship_date else max_ship_date
print('Start Date: ', start_date)
print('End Date: ', end_date)

Start Date:  2015-01-03
End Date:  2019-01-05


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

dateDimSchema = StructType([
    StructField("date", DateType(), False),
    StructField("year", IntegerType(), False),
    StructField("quarter", IntegerType(), False),
    StructField("month", IntegerType(), False),
    StructField("month_name", StringType(), False),
    StructField("day", IntegerType(), False),
    StructField("day_of_week", IntegerType(), False),
    StructField("day_of_week_name", StringType(), False)
])

dates = spark.sql(
    f"""
    SELECT
        to_date(date_sub(date_add('{start_date}', row_number() OVER (ORDER BY 'a')), 1)) AS date
    FROM
        (SELECT explode(sequence(0, datediff('{end_date}', '{start_date}'))) AS row_num) a
    """
)

dim_date = dates.withColumn("year", F.year("date")) \
    .withColumn("quarter", F.quarter("date")) \
    .withColumn("month", F.month("date")) \
    .withColumn("month_name", F.date_format("date", "MMMM")) \
    .withColumn("day", F.dayofmonth("date")) \
    .withColumn("day_of_week", F.dayofweek("date")) \
    .withColumn("day_of_week_name", F.date_format("date", "EEEE")) \
    .select("date", "year", "quarter", "month", "month_name", "day", "day_of_week", "day_of_week_name") \
    .withColumn("year", F.col("year").cast(IntegerType())) \
    .withColumn("quarter", F.col("quarter").cast(IntegerType())) \
    .withColumn("month", F.col("month").cast(IntegerType())) \
    .withColumn("day", F.col("day").cast(IntegerType())) \
    .withColumn("day_of_week", F.col("day_of_week").cast(IntegerType()))

display(dim_date)

date,year,quarter,month,month_name,day,day_of_week,day_of_week_name
2015-01-03,2015,1,1,January,3,7,Saturday
2015-01-04,2015,1,1,January,4,1,Sunday
2015-01-05,2015,1,1,January,5,2,Monday
2015-01-06,2015,1,1,January,6,3,Tuesday
2015-01-07,2015,1,1,January,7,4,Wednesday
2015-01-08,2015,1,1,January,8,5,Thursday
2015-01-09,2015,1,1,January,9,6,Friday
2015-01-10,2015,1,1,January,10,7,Saturday
2015-01-11,2015,1,1,January,11,1,Sunday
2015-01-12,2015,1,1,January,12,2,Monday


Creating Order Dimension Table

In [0]:
dim_order = new_df_store.select("Order_ID", "Order_Date", "Ship_Date", "Ship_Mode").distinct()
display(dim_order)

Order_ID,Order_Date,Ship_Date,Ship_Mode
CA-2015-106376,2015-12-05,2015-12-10,Standard Class
CA-2015-130092,2015-01-11,2015-01-14,First Class
CA-2018-133235,2018-08-01,2018-08-04,First Class
CA-2017-159212,2017-11-01,2017-11-05,Standard Class
CA-2017-106530,2017-05-08,2017-05-08,Same Day
CA-2017-140501,2017-06-23,2017-06-28,Standard Class
CA-2018-116225,2018-11-05,2018-11-09,Standard Class
CA-2017-164735,2017-04-14,2017-04-19,Second Class
CA-2018-118773,2018-02-09,2018-02-14,Standard Class
US-2016-163783,2016-12-27,2017-01-01,Standard Class


In [0]:
dim_order = (
    dim_order.select("Order_ID", "Order_Date", "Ship_Date", "Ship_Mode")
    .join(dim_date, dim_order["Order_Date"] == dim_date["date"], "inner")
    .select("Order_ID", "Order_Date", "Ship_Date", "Ship_Mode")
)

dim_order = (
    dim_order.select("Order_ID", "Order_Date", "Ship_Date", "Ship_Mode")
    .join(dim_date, dim_order["Ship_Date"] == dim_date["date"], "inner")
    .select("Order_ID", "Order_Date", "Ship_Date", "Ship_Mode")
)
display(dim_order)

Order_ID,Order_Date,Ship_Date,Ship_Mode
CA-2015-106376,2015-12-05,2015-12-10,Standard Class
CA-2015-130092,2015-01-11,2015-01-14,First Class
CA-2018-133235,2018-08-01,2018-08-04,First Class
CA-2017-159212,2017-11-01,2017-11-05,Standard Class
CA-2017-106530,2017-05-08,2017-05-08,Same Day
CA-2017-140501,2017-06-23,2017-06-28,Standard Class
CA-2018-116225,2018-11-05,2018-11-09,Standard Class
CA-2017-164735,2017-04-14,2017-04-19,Second Class
CA-2018-118773,2018-02-09,2018-02-14,Standard Class
US-2016-163783,2016-12-27,2017-01-01,Standard Class


In [0]:
new_df_store = (
    new_df_store.select("Order_ID", "Order_Date", "Ship_Date", "Ship_Mode", "Customer_ID", "Product_ID", "Sales", "Discount")
    .join(dim_order, "Order_ID", "inner")
    .select("Order_ID", "Customer_ID", "Product_ID", "Sales", "Discount")
)
display(new_df_store)

Order_ID,Customer_ID,Product_ID,Sales,Discount
CA-2015-106376,BS/11590,TEC-PH-10002726,2519520.0,0.03
CA-2015-130092,SV/20365,FUR-FU-10000010,149100.0,0.03
CA-2018-133235,LH/16750,TEC-PH-10002660,4079400.0,0.03
CA-2017-159212,KM/16375,OFF-EN-10002230,3776850.0,0.02
CA-2017-106530,CL/12565,TEC-AC-10001465,871680.0,0.01
CA-2017-140501,IM/15070,OFF-FA-10004076,59400.0,0.01
CA-2018-116225,SV/20935,TEC-AC-10001432,5861250.0,0.01
CA-2017-164735,TB/21400,OFF-ST-10001558,1218000.0,0.02
CA-2018-118773,TP/21415,OFF-AP-10000055,194880.0,0.02
US-2016-163783,DR/12940,OFF-ST-10002957,190080.0,0.02


In [0]:
display(new_df_customer.head(5))
display(dim_address.head(5))
display(dim_cust_segment.head(5))
display(new_df_product.head(5))
display(dim_category.head(5))
display(new_df_store.head(5))
display(dim_order.head(5))
display(dim_date.head(5))

Customer_ID,Customer_Name,Age,Customer_Segment_ID,Address_ID
PF/19120,Peter Fuller,46,1,1
AO/10810,Anthony O'Donnell,37,3,2
AS/10090,Adam Shillingsburg,40,1,3
BF/11080,Bart Folk,38,1,4
CK/12595,Clytie Kelty,35,1,5


Address_ID,Country,Region,State,City,Postal_Code
1,United States,West,Arizona,Gilbert,85234
2,United States,West,California,Los Angeles,90045
3,United States,South,Virginia,Charlottesville,22901
4,United States,East,Maryland,Baltimore,21215
5,United States,East,Ohio,Grove City,43123


Customer_Segment_ID,Segment
1,Consumer
2,Home Office
3,Corporate


Product_ID,Product_Name,Category_ID
FUR-BO-10001798,Bush Somerset Collection Bookcase,12
FUR-CH-10000454,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back",3
OFF-LA-10000240,Self-Adhesive Address Labels for Typewriters by Universal,14
FUR-TA-10000577,Bretford CR4500 Series Slim Rectangular Table,16
OFF-ST-10000760,Eldon Fold 'N Roll Cart System,4


Category_ID,Category,Sub_Category
1,Office Supplies,Appliances
2,Office Supplies,Envelopes
3,Furniture,Chairs
4,Office Supplies,Storage
5,Office Supplies,Paper


Order_ID,Customer_ID,Product_ID,Sales,Discount
CA-2015-106376,BS/11590,TEC-PH-10002726,2519520.0,0.03
CA-2015-130092,SV/20365,FUR-FU-10000010,149100.0,0.03
CA-2018-133235,LH/16750,TEC-PH-10002660,4079400.0,0.03
CA-2017-159212,KM/16375,OFF-EN-10002230,3776850.0,0.02
CA-2017-106530,CL/12565,TEC-AC-10001465,871680.0,0.01


Order_ID,Order_Date,Ship_Date,Ship_Mode
CA-2015-106376,2015-12-05,2015-12-10,Standard Class
CA-2015-130092,2015-01-11,2015-01-14,First Class
CA-2018-133235,2018-08-01,2018-08-04,First Class
CA-2017-159212,2017-11-01,2017-11-05,Standard Class
CA-2017-106530,2017-05-08,2017-05-08,Same Day


date,year,quarter,month,month_name,day,day_of_week,day_of_week_name
2015-01-03,2015,1,1,January,3,7,Saturday
2015-01-04,2015,1,1,January,4,1,Sunday
2015-01-05,2015,1,1,January,5,2,Monday
2015-01-06,2015,1,1,January,6,3,Tuesday
2015-01-07,2015,1,1,January,7,4,Wednesday


Saving Data in Gold Container

In [0]:
dfs = [new_df_customer, dim_address, dim_cust_segment, new_df_product, dim_category, new_df_store, dim_order, dim_date]
df_names = ["customer", "address", "customer_segment", "product", "product_category", "sales_detail", "order_detail", "date"]

for i, df in enumerate(dfs, start=1):
  output_path = '/mnt/gold/' + df_names[i-1] + '/'
  df.write.format('delta').mode("overwrite").save(output_path)