# Company Inc - Sales Data Analysis

## Data Modeling

<br>

### This notebook applies a data modeling process to the original dataset, aiming to support seamless data processing and analysis. The resulting data model is presented below:

<div style="text-align: center;">
<br>
<img src="https://drive.google.com/uc?export=view&id=10WqmVpm5Bk6mWB7xePgdqqZOifSpR4q0"  width="60%" height="30%">
</div>

### 0. Libraries import

In [0]:
from pyspark.sql import functions as f #PySpark functions, such as column, maximum, and others
from pyspark.sql import types as t #Pyspark types, such as IntegerType, DoubleType, and others
from pyspark.sql.window import Window #Pyspark Window function to perform calculations across a specified range of rows within a DataFrame

### 1. Read the original dataset located in DBFS. The original dataset was downloaded from the following website:
https://www.kaggle.com/datasets/fatihilhan/global-superstore-dataset

In [0]:
#path in DBFS where the original dataset is stored as a CSV file
file_location = "/FileStore/Company_Inc/superstore.csv"

#load the CSV file with the Spark reader: comma delimiter, first row as header, and double quote as escape character
df = (
    spark.read
    .option("delimiter", ",")
    .option("header", True)
    .option("escape", "\"")
    .csv(file_location)
)

#rename every column, swapping the dot character "." for "_"
new_cols = [column.replace('.', '_') for column in df.columns]
df = df.toDF(*new_cols)

#display the results
display(df.head(5))

Category,City,Country,Customer_ID,Customer_Name,Discount,Market,记录数,Order_Date,Order_ID,Order_Priority,Product_ID,Product_Name,Profit,Quantity,Region,Row_ID,Sales,Segment,Ship_Date,Ship_Mode,Shipping_Cost,State,Sub_Category,Year,Market2,weeknum
Office Supplies,Los Angeles,United States,LS-172304,Lycoris Saunders,0,US,1,2011-01-07 00:00:00.000,CA-2011-130813,High,OFF-PA-10002005,Xerox 225,9.3312,3,West,36624,19,Consumer,2011-01-09 00:00:00.000,Second Class,4.37,California,Paper,2011,North America,2
Office Supplies,Los Angeles,United States,MV-174854,Mark Van Huff,0,US,1,2011-01-21 00:00:00.000,CA-2011-148614,Medium,OFF-PA-10002893,"Wirebound Service Call Books, 5 1/2"" x 4""",9.2928,2,West,37033,19,Consumer,2011-01-26 00:00:00.000,Standard Class,0.94,California,Paper,2011,North America,4
Office Supplies,Los Angeles,United States,CS-121304,Chad Sievert,0,US,1,2011-08-05 00:00:00.000,CA-2011-118962,Medium,OFF-PA-10000659,"Adams Phone Message Book, Professional, 400 Message Capacity, 5 3/6” x 11”",9.8418,3,West,31468,21,Consumer,2011-08-09 00:00:00.000,Standard Class,1.81,California,Paper,2011,North America,32
Office Supplies,Los Angeles,United States,CS-121304,Chad Sievert,0,US,1,2011-08-05 00:00:00.000,CA-2011-118962,Medium,OFF-PA-10001144,Xerox 1913,53.2608,2,West,31469,111,Consumer,2011-08-09 00:00:00.000,Standard Class,4.59,California,Paper,2011,North America,32
Office Supplies,Los Angeles,United States,AP-109154,Arthur Prichep,0,US,1,2011-09-29 00:00:00.000,CA-2011-146969,High,OFF-PA-10002105,Xerox 223,3.1104,1,West,32440,6,Consumer,2011-10-03 00:00:00.000,Standard Class,1.32,California,Paper,2011,North America,40


### 2. Prepare the dimension table "Market"

In [0]:
#single window over the whole dataframe, sorted by the dimension attributes
market_window = Window.partitionBy().orderBy("Market", "Market2")

#build the dimension table in one pass:
# - keep the unique (Market, Market2) pairs
# - number them sequentially to create the integer key
# - put the columns of interest in the correct order
df_market = (
    df.select("Market", "Market2")
    .drop_duplicates()
    .withColumn("Market_ID", f.row_number().over(market_window))
    .select("Market_ID", "Market", "Market2")
)

#display the results
display(df_market.head(5))

Market_ID,Market,Market2
1,APAC,APAC
2,Africa,Africa
3,Canada,North America
4,EMEA,EMEA
5,EU,EU


### 3. Prepare the dimension table "Location"

In [0]:
#attributes that identify a location
location_cols = ["Country", "State", "Region", "City"]

#unique locations, numbered sequentially to create the integer key of the dimension table
location_window = Window.partitionBy().orderBy(*location_cols)
df_location = (
    df.select(*location_cols)
    .drop_duplicates()
    .withColumn("Location_ID", f.row_number().over(location_window))
)

#auxiliary dataframe relating each location to its market, used as a reference for the foreign key
df_aux = df.select(*location_cols, "Market", "Market2").drop_duplicates()

#attach the market attributes through the auxiliary dataframe, resolve the foreign key "Market_ID"
#against the "Market" dimension, and keep the columns of interest in the correct order
df_location = (
    df_location
    .join(df_aux, on=location_cols)
    .join(df_market, on=["Market", "Market2"])
    .select("Location_ID", "Market_ID", *location_cols)
)

#display the results
display(df_location.head(5))

Location_ID,Market_ID,Country,State,Region,City
3417,7,United States,Maryland,East,Laurel
3586,7,United States,Oklahoma,Central,Lawton
3388,7,United States,Iowa,Central,Iowa City
3804,2,Zambia,Copperbelt,Africa,Chingola
2118,2,Mali,Bamako,Africa,Bamako


### 4. Prepare the dimension table "Customer_Segment"

In [0]:
#single window over the whole dataframe, sorted by the segment name
segment_window = Window.partitionBy().orderBy("Segment")

#build the dimension table in one pass:
# - keep the unique segments
# - number them sequentially to create the integer key
# - put the columns of interest in the correct order
df_cust_seg = (
    df.select("Segment")
    .drop_duplicates()
    .withColumn("Customer_Segment_ID", f.row_number().over(segment_window))
    .select("Customer_Segment_ID", "Segment")
)

#display the results
display(df_cust_seg.head(5))

Customer_Segment_ID,Segment
1,Consumer
2,Corporate
3,Home Office


### 5. Prepare the dimension table "Customer"

In [0]:
#unique customers (the dataset already provides "Customer_ID", so no new key is generated)
df_cust = df.select("Customer_ID", "Customer_Name").drop_duplicates()

#auxiliary dataframe relating each customer to its segment, used as a reference for the foreign key
df_aux = df.select("Customer_ID", "Customer_Name", "Segment").drop_duplicates()

#attach the segment through the auxiliary dataframe, resolve the foreign key "Customer_Segment_ID"
#against the "Customer_Segment" dimension, and keep the columns of interest in the correct order
df_cust = (
    df_cust
    .join(df_aux, on=["Customer_ID", "Customer_Name"])
    .join(df_cust_seg, on=["Segment"])
    .select("Customer_ID", "Customer_Segment_ID", "Customer_Name")
)

#display the results
display(df_cust.head(5))

Customer_ID,Customer_Segment_ID,Customer_Name
JO-152804,1,Jas O'Carroll
DB-136154,1,Doug Bickford
JK-160904,1,Juliana Krohn
SJ-201252,3,Sanjit Jacobs
PA-190602,3,Pete Armstrong


### 6. Prepare the dimension table "Product_Category"

In [0]:
#single window over the whole dataframe, sorted by the dimension attributes
category_window = Window.partitionBy().orderBy("Category", "Sub_Category")

#build the dimension table in one pass:
# - keep the unique (Category, Sub_Category) pairs
# - number them sequentially to create the integer key
# - put the columns of interest in the correct order
df_prod_cat = (
    df.select("Category", "Sub_Category")
    .drop_duplicates()
    .withColumn("Product_Category_ID", f.row_number().over(category_window))
    .select("Product_Category_ID", "Category", "Sub_Category")
)

#display the results
display(df_prod_cat.head(5))

Product_Category_ID,Category,Sub_Category
1,Furniture,Bookcases
2,Furniture,Chairs
3,Furniture,Furnishings
4,Furniture,Tables
5,Office Supplies,Appliances


### 7. Prepare the dimension table "Product"

In [0]:
#keep only the columns needed to find the most recent name of each product
df_prod = df.select("Product_ID", "Product_Name", "Order_Date")

#calculate the last order date by Product_ID
df_prod = df_prod.withColumn("Order_Date_last", f.max("Order_Date").over(Window.partitionBy("Product_ID")))

#filter the dataframe with only the last order date to get the last Product Name for each Product ID
df_prod = df_prod.filter(f.col("Order_Date_last")==f.col("Order_Date"))

#keep one row per Product ID; in the few cases where the Product Name changed on the last order date,
#take the alphabetically first name. drop_duplicates(subset=["Product_ID"]) would keep an arbitrary row,
#so the chosen name could differ each time this lazy dataframe is evaluated (display, joins, write)
df_prod = df_prod.groupBy("Product_ID").agg(f.min("Product_Name").alias("Product_Name"))

#create an auxiliary dataframe to use as a reference for the foreign key
df_aux = df.select("Product_ID", "Product_Name", "Category", "Sub_Category").drop_duplicates()

#combine the results between the dimension table and auxiliary dataframe
df_prod = df_prod.join(df_aux, on=["Product_ID", "Product_Name"])

#get the foreign key using a join with the related table
df_prod = df_prod.join(df_prod_cat, on=["Category", "Sub_Category"])

#select the columns of interest in the correct order
df_prod = df_prod.select("Product_ID", "Product_Category_ID", "Product_Name")

#display the results
display(df_prod.head(5))

Product_ID,Product_Category_ID,Product_Name
OFF-PA-10003971,11,Xerox 1965
OFF-PA-10004948,11,Xerox 190
OFF-AR-10003477,6,4009 Highlighters
OFF-BI-10000069,7,"GBC Prepunched Paper, 19-Hole, for Binding Systems, 24-lb"
OFF-EN-10002140,8,"Jiffy Business Envelopes, Recycled"


### 8. Prepare the dimension table "Order_Priority"

In [0]:
#single window over the whole dataframe, sorted by the priority name
priority_window = Window.partitionBy().orderBy("Order_Priority")

#build the dimension table in one pass:
# - keep the unique order priorities
# - number them sequentially to create the integer key
# - put the columns of interest in the correct order
df_ord_prio = (
    df.select("Order_Priority")
    .drop_duplicates()
    .withColumn("Order_Priority_ID", f.row_number().over(priority_window))
    .select("Order_Priority_ID", "Order_Priority")
)

#display the results
display(df_ord_prio.head(5))

Order_Priority_ID,Order_Priority
1,Critical
2,High
3,Low
4,Medium


### 9. Prepare the dimension table "Calendar_Date"

In [0]:
#single window over the whole dataframe, sorted by date
calendar_window = Window.partitionBy().orderBy("Date")

#build the dimension table in one pass:
# - start from a one-row dataframe holding a constant value
# - explode a daily date sequence from 2000-01-01 to 2100-01-01 into one row per day
# - number the days sequentially to create the integer key
# - put the columns of interest in the correct order
df_calendar = (
    spark.createDataFrame([(1,)], ["id"])
    .withColumn("Date", f.explode(f.expr("sequence(to_date('2000-01-01'), to_date('2100-01-01'), interval 1 day)")))
    .withColumn("Date_ID", f.row_number().over(calendar_window))
    .select("Date_ID", "Date")
)

#display the results
display(df_calendar.head(5))

Date_ID,Date
1,2000-01-01
2,2000-01-02
3,2000-01-03
4,2000-01-04
5,2000-01-05


### 10. Prepare the dimension table "Ship_Mode"

In [0]:
#single window over the whole dataframe, sorted by the ship mode name
ship_mode_window = Window.partitionBy().orderBy("Ship_Mode")

#build the dimension table in one pass:
# - keep the unique ship modes
# - number them sequentially to create the integer key
# - put the columns of interest in the correct order
df_ship_mode = (
    df.select("Ship_Mode")
    .drop_duplicates()
    .withColumn("Ship_Mode_ID", f.row_number().over(ship_mode_window))
    .select("Ship_Mode_ID", "Ship_Mode")
)

#display the results
display(df_ship_mode.head(5))

Ship_Mode_ID,Ship_Mode
1,First Class
2,Same Day
3,Second Class
4,Standard Class


### 11. Prepare the dimension table "Ship"

In [0]:
#attributes that identify a shipment
ship_cols = ["Ship_Date", "Ship_Mode", "Shipping_Cost"]

#unique shipments, numbered sequentially to create the integer key of the dimension table,
#then joined with the "Ship_Mode" dimension to get the foreign key "Ship_Mode_ID"
df_ship = (
    df.select(*ship_cols)
    .drop_duplicates()
    .withColumn("Ship_ID", f.row_number().over(Window.partitionBy().orderBy(*ship_cols)))
    .join(df_ship_mode, on=["Ship_Mode"])
)

#match the ship date against the "Calendar_Date" dimension to get the foreign key "Date_ID"
cond = df_ship["Ship_Date"] == df_calendar["Date"]

#resolve the date key, keep the columns of interest in the correct order,
#and expose the date key under the name "Ship_Date"
df_ship = (
    df_ship
    .join(df_calendar, on=cond)
    .select("Ship_ID", "Ship_Mode_ID", "Date_ID", "Shipping_Cost")
    .withColumnRenamed("Date_ID", "Ship_Date")
)

#display the results
display(df_ship.head(5))

Ship_ID,Ship_Mode_ID,Ship_Date,Shipping_Cost
1,2,4021,125.32
2,2,4021,7.46
3,3,4023,4.82
4,3,4023,8.17
5,4,4024,24.1


### 12. Prepare the fact table "Order"

#### 12.1 Combine the fact table "Order" with the dimension table "Location"

In [0]:
#drop duplicates to get all the unique values
df_order = df.select("Order_ID", "Quantity", "Sales", "Discount", "Profit", "Country", "State", "Region", "City", "Market", "Market2", "Order_Date", "Ship_Date", "Ship_Mode", "Shipping_Cost", "Customer_ID", "Customer_Name", "Segment", "Product_ID", "Product_Name","Order_Priority").drop_duplicates()

#keep the original order identifier under the name "Order_Local_ID", freeing "Order_ID" for the new key
df_order = df_order.withColumnRenamed("Order_ID", "Order_Local_ID")

#create an incremental integer to be the key of the fact table.
#Many rows share the same (Order_Date, Customer_ID), so every remaining column is added as a tie-breaker:
#the rows are distinct, which makes the ordering total and the key identical on every evaluation of this
#lazy dataframe (display, joins, write) instead of depending on how Spark happens to arrange the ties
tie_breakers = [column for column in df_order.columns if column not in ("Order_Date", "Customer_ID")]
df_order = df_order.withColumn("Order_ID", f.row_number().over(Window.partitionBy().orderBy("Order_Date", "Customer_ID", *tie_breakers)))

#create an auxiliary dataframe to use as a reference for the foreign key
df_aux = df_location.join(df_market, on=["Market_ID"])

#combine the results between the fact table and the auxiliary dataframe
df_order = df_order.join(df_aux, on=["Country", "State", "Region", "City", "Market", "Market2"])

#drop the unnecessary columns in the fact table
df_order = df_order.drop("Country", "State", "Region", "City", "Market", "Market2", "Market_ID")

#### 12.2 Combine the fact table "Order" with the dimension table "Calendar_Date"

In [0]:
#match the order date of the fact table against the date of the "Calendar_Date" dimension
cond = df_order["Order_Date"] == df_calendar["Date"]

#resolve the date key, drop the original date columns,
#and expose the date key under the name "Order_Date"
df_order = (
    df_order
    .join(df_calendar, on=cond)
    .drop("Order_Date", "Date")
    .withColumnRenamed("Date_ID", "Order_Date")
)

#### 12.3 Combine the fact table "Order" with the dimension table "Ship"

In [0]:
#auxiliary dataframe: the "Ship" dimension together with the ship mode name, used as a reference for the foreign key
df_aux = df_ship.join(df_ship_mode, on=["Ship_Mode_ID"])

#in the "Ship" dimension the column "Ship_Date" holds a date key, so match it against "Date_ID"
cond = df_aux["Ship_Date"] == df_calendar["Date_ID"]

#bring back the calendar date, drop the key columns, and expose the date under the name "Ship_Date"
df_aux = (
    df_aux
    .join(df_calendar, on=cond)
    .drop("Ship_Date", "Date_ID")
    .withColumnRenamed("Date", "Ship_Date")
)

#combine the fact table with the auxiliary dataframe to get "Ship_ID",
#then drop the shipment columns that are no longer necessary in the fact table
df_order = (
    df_order
    .join(df_aux, on=["Ship_Date", "Ship_Mode", "Shipping_Cost"])
    .drop("Ship_Mode_ID", "Ship_Date", "Ship_Mode", "Shipping_Cost")
)

#### 12.4 Combine the fact table "Order" with the dimension table "Customer"

In [0]:
#auxiliary dataframe: the "Customer" dimension together with the segment name, used as a reference for the foreign key
df_aux = df_cust.join(df_cust_seg, on=["Customer_Segment_ID"])

#combine the fact table with the auxiliary dataframe,
#then drop the customer columns that are no longer necessary in the fact table
df_order = (
    df_order
    .join(df_aux, on=["Customer_ID", "Customer_Name", "Segment"])
    .drop("Customer_Name", "Segment", "Customer_Segment_ID")
)

#### 12.5 Combine the fact table "Order" with the dimension table "Product"

In [0]:
#keep the fact rows whose Product_ID exists in the dimension table.
#The match uses Product_ID alone: the "Product" dimension stores only the latest Product_Name of each
#product, so also matching on Product_Name would silently discard every order placed under an earlier name.
#A left semi join only filters the fact table, so it can never duplicate fact rows.
df_order = df_order.join(df_prod.select("Product_ID"), on=["Product_ID"], how="left_semi")

#drop the unnecessary columns in the fact table
df_order = df_order.drop("Product_Name")

#### 12.6 Combine the fact table "Order" with the dimension table "Order_Priority"

In [0]:
#final layout of the fact table
fact_cols = ["Order_ID", "Order_Local_ID", "Location_ID", "Order_Date", "Ship_ID", "Customer_ID", "Order_Priority_ID", "Product_ID", "Quantity", "Sales", "Discount", "Profit"]

#combine the fact table with the "Order_Priority" dimension to get "Order_Priority_ID",
#drop the priority name, and keep the columns of interest in the correct order
df_order = (
    df_order
    .join(df_ord_prio, on=["Order_Priority"])
    .drop("Order_Priority")
    .select(*fact_cols)
)

#### 12.7 Convert some columns of the fact table "Order" to numeric types

In [0]:
#target type of each measure column: "Quantity" is cast to Integer;
#"Sales", "Discount", and "Profit" are cast to Double
numeric_types = {
    "Quantity": t.IntegerType(),
    "Sales": t.DoubleType(),
    "Discount": t.DoubleType(),
    "Profit": t.DoubleType(),
}

#cast the columns one by one, in the order listed above
for column_name, column_type in numeric_types.items():
    df_order = df_order.withColumn(column_name, f.col(column_name).cast(column_type))

#Display the results
display(df_order.head(5))

Order_ID,Order_Local_ID,Location_ID,Order_Date,Ship_ID,Customer_ID,Order_Priority_ID,Product_ID,Quantity,Sales,Discount,Profit
38745,CA-2014-100055,3417,5263,38618,MD-178604,4,OFF-AP-10001469,3,125.0,0.0,36.2877
38744,CA-2014-100055,3417,5263,38599,MD-178604,4,FUR-FU-10001473,2,27.0,0.0,9.8856
46528,CA-2014-130211,3586,5410,46110,BD-116204,2,OFF-ST-10000129,3,333.0,0.0,23.3163
46527,CA-2014-130211,3586,5410,46109,BD-116204,2,FUR-TA-10003748,2,249.0,0.0,54.7756
3062,US-2011-112949,3586,4189,3194,Co-126404,3,OFF-AP-10001005,6,472.0,0.0,155.727


### 13. Create the database "company" and save all the results as delta tables

In [0]:
%sql
-- create the database "company" to save all the results as delta tables;
-- "if not exists" lets the notebook be re-run without failing once the database is already there
create database if not exists company

In [0]:
#dataframes to persist, keyed by the name of their delta table inside the "company" database
tables = {
    "order": df_order,
    "market": df_market,
    "location": df_location,
    "calendar_date": df_calendar,
    "ship_mode": df_ship_mode,
    "ship": df_ship,
    "customer": df_cust,
    "customer_segment": df_cust_seg,
    "order_priority": df_ord_prio,
    "product": df_prod,
    "product_category": df_prod_cat,
}

#save every dataframe as a delta table, overwriting any previous version of the table
for table_name, table_df in tables.items():
    table_df.write.format("delta").mode("overwrite").saveAsTable("company." + table_name)