In [107]:
import pandas as pd
import numpy as np
import datetime 
import config 

In [108]:
# get module variables
ROOT_DIR = config.ROOT_DIR

In [109]:
import findspark
findspark.init()

In [110]:
# spark session start to begin transforming data (processing layer)
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("adventure_works_analysis").getOrCreate()

#### Transform AdventureWorks_Products.csv from raw to processed

In [111]:
aw_products_dim = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Products.csv")

In [112]:
aw_products_dim.head()

Unnamed: 0,ProductKey,ProductSubcategoryKey,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductSize,ProductStyle,ProductCost,ProductPrice
0,214,31,HL-U509-R,"Sport-100 Helmet, Red",Sport-100,"Universal fit, well-vented, lightweight , snap...",Red,0,0,13.0863,34.99
1,215,31,HL-U509,"Sport-100 Helmet, Black",Sport-100,"Universal fit, well-vented, lightweight , snap...",Black,0,0,12.0278,33.6442
2,218,23,SO-B909-M,"Mountain Bike Socks, M",Mountain Bike Socks,Combination of natural and synthetic fibers st...,White,M,U,3.3963,9.5
3,219,23,SO-B909-L,"Mountain Bike Socks, L",Mountain Bike Socks,Combination of natural and synthetic fibers st...,White,L,U,3.3963,9.5
4,220,31,HL-U509-B,"Sport-100 Helmet, Blue",Sport-100,"Universal fit, well-vented, lightweight , snap...",Blue,0,0,12.0278,33.6442


In [113]:
aw_products_dim.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 293 entries, 0 to 292
Data columns (total 11 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   ProductKey             293 non-null    int64  
 1   ProductSubcategoryKey  293 non-null    int64  
 2   ProductSKU             293 non-null    object 
 3   ProductName            293 non-null    object 
 4   ModelName              293 non-null    object 
 5   ProductDescription     293 non-null    object 
 6   ProductColor           243 non-null    object 
 7   ProductSize            293 non-null    object 
 8   ProductStyle           293 non-null    object 
 9   ProductCost            293 non-null    float64
 10  ProductPrice           293 non-null    float64
dtypes: float64(2), int64(2), object(7)
memory usage: 25.3+ KB


In [114]:
aw_products_dim.drop(columns="ProductSize", inplace=True)
aw_products_dim.sort_values(by="ProductKey", inplace=True)

In [115]:
# create new column DiscountPrice with 15% discount
aw_products_dim["DiscountPrice"] = aw_products_dim["ProductPrice"] * 0.85

# limit numeric columns ProductCost and ProductPrice to 2 decimal places 
aw_products_dim.loc[:, ["ProductCost", "ProductPrice", "DiscountPrice"]] = aw_products_dim[["ProductCost", "ProductPrice", "DiscountPrice"]].round(2)

In [116]:
# Extract letters before second '-' in ProductSKU column
product_sku_split = aw_products_dim["ProductSKU"].str.split("-", expand=True)
aw_products_dim["SKUType"] = product_sku_split[0].str.cat(product_sku_split[1], sep="-")

In [117]:
# replace 0s in ProductStyle column with null values
aw_products_dim["ProductStyle"].replace("0", np.nan, inplace=True)

In [118]:
aw_products_dim.head()

Unnamed: 0,ProductKey,ProductSubcategoryKey,ProductSKU,ProductName,ModelName,ProductDescription,ProductColor,ProductStyle,ProductCost,ProductPrice,DiscountPrice,SKUType
0,214,31,HL-U509-R,"Sport-100 Helmet, Red",Sport-100,"Universal fit, well-vented, lightweight , snap...",Red,,13.09,34.99,29.74,HL-U509
1,215,31,HL-U509,"Sport-100 Helmet, Black",Sport-100,"Universal fit, well-vented, lightweight , snap...",Black,,12.03,33.64,28.6,HL-U509
2,218,23,SO-B909-M,"Mountain Bike Socks, M",Mountain Bike Socks,Combination of natural and synthetic fibers st...,White,U,3.4,9.5,8.07,SO-B909
3,219,23,SO-B909-L,"Mountain Bike Socks, L",Mountain Bike Socks,Combination of natural and synthetic fibers st...,White,U,3.4,9.5,8.07,SO-B909
4,220,31,HL-U509-B,"Sport-100 Helmet, Blue",Sport-100,"Universal fit, well-vented, lightweight , snap...",Blue,,12.03,33.64,28.6,HL-U509


In [119]:
aw_products_dim.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 293 entries, 0 to 292
Data columns (total 12 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   ProductKey             293 non-null    int64  
 1   ProductSubcategoryKey  293 non-null    int64  
 2   ProductSKU             293 non-null    object 
 3   ProductName            293 non-null    object 
 4   ModelName              293 non-null    object 
 5   ProductDescription     293 non-null    object 
 6   ProductColor           243 non-null    object 
 7   ProductStyle           209 non-null    object 
 8   ProductCost            293 non-null    float64
 9   ProductPrice           293 non-null    float64
 10  DiscountPrice          293 non-null    float64
 11  SKUType                293 non-null    object 
dtypes: float64(3), int64(2), object(7)
memory usage: 29.8+ KB


In [120]:
aw_products_dim.to_csv(rf"{ROOT_DIR}/storage/processed/aw_products_dim.csv", index=False)

#### Transform AdventureWorks_Customers.csv from raw to processed

In [121]:
aw_customers_dim = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Customers.csv")

In [122]:
aw_customers_dim.head()

Unnamed: 0,CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner
0,11000,MR.,JON,YANG,4/8/1966,M,M,jon24@adventure-works.com,"$90,000",2,Bachelors,Professional,Y
1,11001,MR.,EUGENE,HUANG,5/14/1965,S,M,eugene10@adventure-works.com,"$60,000",3,Bachelors,Professional,N
2,11002,MR.,RUBEN,TORRES,8/12/1965,M,M,ruben35@adventure-works.com,"$60,000",3,Bachelors,Professional,Y
3,11003,MS.,CHRISTY,ZHU,2/15/1968,S,F,christy12@adventure-works.com,"$70,000",0,Bachelors,Professional,N
4,11004,MRS.,ELIZABETH,JOHNSON,8/8/1968,S,F,elizabeth5@adventure-works.com,"$80,000",5,Bachelors,Professional,Y


In [123]:
aw_customers_dim.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18148 entries, 0 to 18147
Data columns (total 13 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   CustomerKey     18148 non-null  int64 
 1   Prefix          18018 non-null  object
 2   FirstName       18148 non-null  object
 3   LastName        18148 non-null  object
 4   BirthDate       18148 non-null  object
 5   MaritalStatus   18148 non-null  object
 6   Gender          18018 non-null  object
 7   EmailAddress    18148 non-null  object
 8   AnnualIncome    18148 non-null  object
 9   TotalChildren   18148 non-null  int64 
 10  EducationLevel  18148 non-null  object
 11  Occupation      18148 non-null  object
 12  HomeOwner       18148 non-null  object
dtypes: int64(2), object(11)
memory usage: 1.8+ MB


In [124]:
# use series string methods with transform to capitalize (proper case) Prefix, FirstName, LastName
aw_customers_dim.loc[:, ["Prefix", "FirstName", "LastName"]] = aw_customers_dim.loc[:, ["Prefix", "FirstName", "LastName"]].transform(lambda x: x.str.capitalize())

In [125]:
# concatenate 
aw_customers_dim["Full_Name"] = aw_customers_dim["Prefix"].str.cat(aw_customers_dim["FirstName"], sep=" ").str.cat(aw_customers_dim["LastName"], sep=" ")

In [126]:
email_split = aw_customers_dim["EmailAddress"].str.split("@", expand=True)
aw_customers_dim["UserName"] = email_split[0]
aw_customers_dim["Domain"] = email_split[1].str.split(".com", expand=True)[0].str.replace("-", " ").str.title()

In [127]:
aw_customers_dim["AnnualIncome"] = aw_customers_dim["AnnualIncome"].str.split("$", expand=True)[1].str.replace(",", "").astype(float)

In [128]:
aw_customers_dim["BirthDate"] = pd.to_datetime(aw_customers_dim["BirthDate"], format="%m/%d/%Y")
aw_customers_dim["Birth_Year"] = aw_customers_dim["BirthDate"].dt.year
aw_customers_dim["Current_Age"] = (datetime.datetime.now() - aw_customers_dim["BirthDate"]).astype("m8[Y]")

In [129]:
aw_customers_dim["Parent"] = np.where(aw_customers_dim["TotalChildren"] > 0, "Yes", "No")

In [130]:
aw_customers_dim.head()

Unnamed: 0,CustomerKey,Prefix,FirstName,LastName,BirthDate,MaritalStatus,Gender,EmailAddress,AnnualIncome,TotalChildren,EducationLevel,Occupation,HomeOwner,Full_Name,UserName,Domain,Birth_Year,Current_Age,Parent
0,11000,Mr.,Jon,Yang,1966-04-08,M,M,jon24@adventure-works.com,90000.0,2,Bachelors,Professional,Y,Mr. Jon Yang,jon24,Adventure Works,1966,57.0,Yes
1,11001,Mr.,Eugene,Huang,1965-05-14,S,M,eugene10@adventure-works.com,60000.0,3,Bachelors,Professional,N,Mr. Eugene Huang,eugene10,Adventure Works,1965,57.0,Yes
2,11002,Mr.,Ruben,Torres,1965-08-12,M,M,ruben35@adventure-works.com,60000.0,3,Bachelors,Professional,Y,Mr. Ruben Torres,ruben35,Adventure Works,1965,57.0,Yes
3,11003,Ms.,Christy,Zhu,1968-02-15,S,F,christy12@adventure-works.com,70000.0,0,Bachelors,Professional,N,Ms. Christy Zhu,christy12,Adventure Works,1968,55.0,No
4,11004,Mrs.,Elizabeth,Johnson,1968-08-08,S,F,elizabeth5@adventure-works.com,80000.0,5,Bachelors,Professional,Y,Mrs. Elizabeth Johnson,elizabeth5,Adventure Works,1968,54.0,Yes


In [131]:
aw_customers_dim.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18148 entries, 0 to 18147
Data columns (total 19 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   CustomerKey     18148 non-null  int64         
 1   Prefix          18018 non-null  object        
 2   FirstName       18148 non-null  object        
 3   LastName        18148 non-null  object        
 4   BirthDate       18148 non-null  datetime64[ns]
 5   MaritalStatus   18148 non-null  object        
 6   Gender          18018 non-null  object        
 7   EmailAddress    18148 non-null  object        
 8   AnnualIncome    18148 non-null  float64       
 9   TotalChildren   18148 non-null  int64         
 10  EducationLevel  18148 non-null  object        
 11  Occupation      18148 non-null  object        
 12  HomeOwner       18148 non-null  object        
 13  Full_Name       18018 non-null  object        
 14  UserName        18148 non-null  object        
 15  Do

In [132]:
aw_customers_dim.to_csv(rf"{ROOT_DIR}/storage/processed/aw_customers_dim.csv", index=False)

#### Transform AdventureWorks_Calendar

In [133]:
aw_calendar_dim = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Calendar.csv")

# convert Date column to datetime[ns] dtype
aw_calendar_dim["Date"] = pd.to_datetime(aw_calendar_dim["Date"], format="%m/%d/%Y")

In [134]:
aw_calendar_dim.head()

Unnamed: 0,Date
0,2015-01-01
1,2015-01-02
2,2015-01-03
3,2015-01-04
4,2015-01-05


In [135]:
aw_calendar_dim.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 912 entries, 0 to 911
Data columns (total 1 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   Date    912 non-null    datetime64[ns]
dtypes: datetime64[ns](1)
memory usage: 7.2 KB


In [136]:
aw_calendar_dim["Day_Name"] = aw_calendar_dim["Date"].dt.day_name()
aw_calendar_dim["Month_Name"] = aw_calendar_dim["Date"].dt.month_name()
aw_calendar_dim["Year"] = aw_calendar_dim["Date"].dt.year
aw_calendar_dim["Start_of_Year"] = aw_calendar_dim["Date"].dt.to_period("Y").dt.to_timestamp()
aw_calendar_dim["Start_of_Month"] = aw_calendar_dim["Date"].dt.to_period("M").dt.to_timestamp()
aw_calendar_dim["Start_of_Week"] = aw_calendar_dim["Date"].dt.to_period("W").dt.start_time

In [137]:
aw_calendar_dim["Day_Of_Week"] = aw_calendar_dim["Date"].dt.dayofweek + 1 # Monday = 1, Sunday = 7
aw_calendar_dim["Weekend"] = np.where((aw_calendar_dim["Day_Of_Week"] == 6) | (aw_calendar_dim["Day_Of_Week"] == 7), "Weekend", "Weekday")

In [138]:
aw_calendar_dim.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 912 entries, 0 to 911
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   Date            912 non-null    datetime64[ns]
 1   Day_Name        912 non-null    object        
 2   Month_Name      912 non-null    object        
 3   Year            912 non-null    int64         
 4   Start_of_Year   912 non-null    datetime64[ns]
 5   Start_of_Month  912 non-null    datetime64[ns]
 6   Start_of_Week   912 non-null    datetime64[ns]
 7   Day_Of_Week     912 non-null    int64         
 8   Weekend         912 non-null    object        
dtypes: datetime64[ns](4), int64(2), object(3)
memory usage: 64.2+ KB


In [139]:
aw_calendar_dim.head()

Unnamed: 0,Date,Day_Name,Month_Name,Year,Start_of_Year,Start_of_Month,Start_of_Week,Day_Of_Week,Weekend
0,2015-01-01,Thursday,January,2015,2015-01-01,2015-01-01,2014-12-29,4,Weekday
1,2015-01-02,Friday,January,2015,2015-01-01,2015-01-01,2014-12-29,5,Weekday
2,2015-01-03,Saturday,January,2015,2015-01-01,2015-01-01,2014-12-29,6,Weekend
3,2015-01-04,Sunday,January,2015,2015-01-01,2015-01-01,2014-12-29,7,Weekend
4,2015-01-05,Monday,January,2015,2015-01-01,2015-01-01,2015-01-05,1,Weekday


In [140]:
aw_calendar_dim.to_csv(rf"{ROOT_DIR}/storage/processed/aw_calendar_dim.csv", index=False)

#### Concatenate AdventureWorks_Sales 2015, 2016 and 2017 along vertical axis (equivalent to sql union operation)

In [141]:
aw_sales_2015 = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Sales_2015.csv")
aw_sales_2016 = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Sales_2016.csv")
aw_sales_2017 = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Sales_2017.csv")

In [142]:
aw_sales_fact = pd.concat([aw_sales_2015, aw_sales_2016, aw_sales_2017], ignore_index=True)
aw_sales_fact["QuantityType"] = np.where(aw_sales_fact["OrderQuantity"] > 1, "Multiple Items", "Single Item")

In [143]:
aw_sales_fact["OrderDate"] = pd.to_datetime(aw_sales_fact["OrderDate"], format="%m/%d/%Y")
aw_sales_fact["StockDate"] = pd.to_datetime(aw_sales_fact["StockDate"], format="%m/%d/%Y")

In [144]:
aw_sales_fact.head()

Unnamed: 0,OrderDate,StockDate,OrderNumber,ProductKey,CustomerKey,TerritoryKey,OrderLineItem,OrderQuantity,QuantityType
0,2015-01-01,2001-09-21,SO45080,332,14657,1,1,1,Single Item
1,2015-01-01,2001-12-05,SO45079,312,29255,4,1,1,Single Item
2,2015-01-01,2001-10-29,SO45082,350,11455,9,1,1,Single Item
3,2015-01-01,2001-11-16,SO45081,338,26782,6,1,1,Single Item
4,2015-01-02,2001-12-15,SO45083,312,14947,10,1,1,Single Item


In [145]:
aw_sales_fact.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 56046 entries, 0 to 56045
Data columns (total 9 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   OrderDate      56046 non-null  datetime64[ns]
 1   StockDate      56046 non-null  datetime64[ns]
 2   OrderNumber    56046 non-null  object        
 3   ProductKey     56046 non-null  int64         
 4   CustomerKey    56046 non-null  int64         
 5   TerritoryKey   56046 non-null  int64         
 6   OrderLineItem  56046 non-null  int64         
 7   OrderQuantity  56046 non-null  int64         
 8   QuantityType   56046 non-null  object        
dtypes: datetime64[ns](2), int64(5), object(2)
memory usage: 3.8+ MB


In [146]:
aw_sales_fact.to_csv(rf"{ROOT_DIR}/storage/processed/aw_sales_fact.csv", index=False)

#### Transform AdventureWorks_Territories

In [147]:
aw_territories_dim = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Territories.csv")

In [148]:
aw_territories_dim.head()

Unnamed: 0,SalesTerritoryKey,Region,Country,Continent
0,1,Northwest,United States,North America
1,2,Northeast,United States,North America
2,3,Central,United States,North America
3,4,Southwest,United States,North America
4,5,Southeast,United States,North America


In [149]:
aw_territories_dim.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 4 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   SalesTerritoryKey  10 non-null     int64 
 1   Region             10 non-null     object
 2   Country            10 non-null     object
 3   Continent          10 non-null     object
dtypes: int64(1), object(3)
memory usage: 448.0+ bytes


In [150]:
aw_territories_dim.to_csv(rf"{ROOT_DIR}/storage/processed/aw_territories_dim.csv", index=False)

#### Transform AdventureWorks_Product_Categories.csv and AdventureWorks_Product_Subcategories.csv

In [151]:
aw_product_categories_dim = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Product_Categories.csv")

In [152]:
aw_product_categories_dim

Unnamed: 0,ProductCategoryKey,CategoryName
0,1,Bikes
1,2,Components
2,3,Clothing
3,4,Accessories


In [153]:
aw_product_categories_dim.to_csv(rf"{ROOT_DIR}/storage/processed/aw_product_categories_dim.csv", index=False)

In [154]:
aw_product_subcategories_dim = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Product_SubCategories.csv")

In [155]:
aw_product_subcategories_dim.head()

Unnamed: 0,ProductSubcategoryKey,SubcategoryName,ProductCategoryKey
0,1,Mountain Bikes,1
1,2,Road Bikes,1
2,3,Touring Bikes,1
3,4,Handlebars,2
4,5,Bottom Brackets,2


In [156]:
aw_product_subcategories_dim.to_csv(rf"{ROOT_DIR}/storage/processed/aw_product_subcategories_dim.csv", index=False)

#### Transform AdventureWorks_Returns.csv

In [157]:
aw_returns_fact = pd.read_csv(rf"{ROOT_DIR}/storage/raw/AdventureWorks_Returns.csv")

aw_returns_fact["ReturnDate"] = pd.to_datetime(aw_returns_fact["ReturnDate"], format="%m/%d/%Y")

In [158]:
aw_returns_fact.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1809 entries, 0 to 1808
Data columns (total 4 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   ReturnDate      1809 non-null   datetime64[ns]
 1   TerritoryKey    1809 non-null   int64         
 2   ProductKey      1809 non-null   int64         
 3   ReturnQuantity  1809 non-null   int64         
dtypes: datetime64[ns](1), int64(3)
memory usage: 56.7 KB


In [159]:
aw_returns_fact.to_csv(rf"{ROOT_DIR}/storage/processed/aw_returns_fact.csv", index=False)