# Transforming the Raw Data (Silver Layer)
## Setting Permissions

In [0]:
# Grab the secret key from the key vault that is stored with the databricks secret scope
service_credential = dbutils.secrets.get(scope="<scope>",key="<service-credential-key (key vault name)>")

# Connect to data lake with the secret key using OAuth 2.0
spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")




## Data Ingestion

In [0]:
# Import libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
bronze_folder = 'abfss://bronze@<storage-account>.dfs.core.windows.net/'
subfolders = {
    'territories': 'adventureworks-territories',
    'sales-2017': 'adventureworks-sales-2017',
    'sales-2016': 'adventureworks-sales-2016',
    'sales-2015': 'adventureworks-sales-2015',
    'returns': 'adventureworks-returns',
    'products': 'adventureworks-products',
    'product-subcategories': 'adventureworks-product-subcategories',
    'product-categories': 'adventureworks-product-categories',
    'customers': 'adventureworks-customers',
    'calendar': 'adventureworks-calendar'
}
sales_subfolders = ['sales-2017', 'sales-2016', 'sales-2015']
csv_config_options = {
    'header': True,
    'inferSchema': True
}

df_territories = spark.read.format('csv').options(**csv_config_options).load(bronze_folder + subfolders['territories'])

df_sales = spark.read.format('csv').options(**csv_config_options).load([bronze_folder + subfolders[subfolder] for subfolder in subfolders if subfolder in sales_subfolders])

df_returns = spark.read.format('csv').options(**csv_config_options).load(bronze_folder + subfolders['returns'])

df_products = spark.read.format('csv').options(**csv_config_options).load(bronze_folder + subfolders['products'])

df_products_subcat = spark.read.format('csv').options(**csv_config_options).load(bronze_folder + subfolders['product-subcategories'])

df_products_cat = spark.read.format('csv').options(**csv_config_options).load(bronze_folder + subfolders['product-categories'])

df_customers = spark.read.format('csv').options(**csv_config_options).load(bronze_folder + subfolders['customers'])

df_calendar = spark.read.format('csv').options(**csv_config_options).load(bronze_folder + subfolders['calendar'])

## Data Transformation

In [0]:
# Import libraries
from pyspark.sql import DataFrame, Column

In [0]:
# Environment variables
silver_folder = 'abfss://silver@<storage-account>.dfs.core.windows.net/'
# will save in the Paruqet format due to its column metadata and row chunks allowing for faster projection/filtering, which is what the end-user will be doing when they're analyzing the data

def write_store_parquet(data_frame: DataFrame, subfolder: str):
    data_frame.write\
        .format('parquet')\
        .mode('overwrite')\
        .option('path', silver_folder + subfolder)\
        .save()

def read_parquet(subfolder: str) -> DataFrame:
    return spark.read\
        .format('parquet')\
        .load(silver_folder + subfolder)
    


##### Calendar

In [0]:
# df_calendar.display()
# Transform: Month, Year cols for better granularity for PowerBI user

df_calendar = df_calendar.withColumn('Month', month('Date'))\
    .withColumn('Year', year('Date'))

# Save
write_store_parquet(df_calendar, subfolders['calendar'])
df_calendar.show()

+----------+-----+----+
|      Date|Month|Year|
+----------+-----+----+
|2015-01-01|    1|2015|
|2015-01-02|    1|2015|
|2015-01-03|    1|2015|
|2015-01-04|    1|2015|
|2015-01-05|    1|2015|
|2015-01-06|    1|2015|
|2015-01-07|    1|2015|
|2015-01-08|    1|2015|
|2015-01-09|    1|2015|
|2015-01-10|    1|2015|
|2015-01-11|    1|2015|
|2015-01-12|    1|2015|
|2015-01-13|    1|2015|
|2015-01-14|    1|2015|
|2015-01-15|    1|2015|
|2015-01-16|    1|2015|
|2015-01-17|    1|2015|
|2015-01-18|    1|2015|
|2015-01-19|    1|2015|
|2015-01-20|    1|2015|
+----------+-----+----+
only showing top 20 rows


##### Customer

In [0]:
# df_customers.display()
# Transform: Full name

# Create FullName column
combine_fullname = concat_ws(' ', df_customers.Prefix, df_customers.FirstName, df_customers.LastName)

# Add FullName column
df_customers = df_customers.withColumn('FullName', combine_fullname)
# Recall dataframes are immutable, so must edit metadata (columns) and create new Dataframe with it
columns = df_customers.columns
# Reposition FullName column
full_name_index = columns.index('FullName')
last_name_index = columns.index('LastName')
columns.insert(last_name_index + 1, columns.pop(full_name_index))
df_customers_reordered = df_customers.select(*columns)

# Save
write_store_parquet(df_customers_reordered, subfolders['customers'])
df_customers_reordered.show()

+-----------+------+---------+--------+--------------------+----------+-------------+------+--------------------+------------+-------------+---------------+--------------+---------+
|CustomerKey|Prefix|FirstName|LastName|            FullName| BirthDate|MaritalStatus|Gender|        EmailAddress|AnnualIncome|TotalChildren| EducationLevel|    Occupation|HomeOwner|
+-----------+------+---------+--------+--------------------+----------+-------------+------+--------------------+------------+-------------+---------------+--------------+---------+
|      11000|   MR.|      JON|    YANG|        MR. JON YANG|1966-04-08|            M|     M|jon24@adventure-w...|    $90,000 |            2|      Bachelors|  Professional|        Y|
|      11001|   MR.|   EUGENE|   HUANG|    MR. EUGENE HUANG|1965-05-14|            S|     M|eugene10@adventur...|    $60,000 |            3|      Bachelors|  Professional|        N|
|      11002|   MR.|    RUBEN|  TORRES|    MR. RUBEN TORRES|1965-08-12|            M|     

##### Product Categories

In [0]:
# df_products_cat.display()
write_store_parquet(df_products_cat, subfolders['product-categories'])
df_products_cat.show()

+------------------+------------+
|ProductCategoryKey|CategoryName|
+------------------+------------+
|                 1|       Bikes|
|                 2|  Components|
|                 3|    Clothing|
|                 4| Accessories|
+------------------+------------+



##### Product SubCategories

In [0]:
# df_products_subcat.display()
write_store_parquet(df_products_subcat, subfolders['product-subcategories'])
df_products_subcat.show()

+---------------------+---------------+------------------+
|ProductSubcategoryKey|SubcategoryName|ProductCategoryKey|
+---------------------+---------------+------------------+
|                    1| Mountain Bikes|                 1|
|                    2|     Road Bikes|                 1|
|                    3|  Touring Bikes|                 1|
|                    4|     Handlebars|                 2|
|                    5|Bottom Brackets|                 2|
|                    6|         Brakes|                 2|
|                    7|         Chains|                 2|
|                    8|      Cranksets|                 2|
|                    9|    Derailleurs|                 2|
|                   10|          Forks|                 2|
|                   11|       Headsets|                 2|
|                   12|Mountain Frames|                 2|
|                   13|         Pedals|                 2|
|                   14|    Road Frames|                 

##### Product


In [0]:
# Create function for repositioning columns
def reposition_columns(data_frame: DataFrame, target_col: str, curr_col: str)-> DataFrame:
    columns = data_frame.columns
    # Reposition target column
    target_col_index = columns.index(target_col)
    curr_pos_index = columns.index(curr_col)
    columns.insert(target_col_index + 1, columns.pop(curr_pos_index))
    reordered_data_frame = data_frame.select(*columns)
    return reordered_data_frame

In [0]:
# df_products.display()
# Transform: 
#   SKUcategory = first acronym (e.g. HL), looks like it has to do with region/language as there are multiple rows with it
#   Modelcateogry = first word/acronym (e.g. Sport), looks like it repeats and can be used for further analysis

# Create SKUcategory
ProductSKU_split = split(df_products.ProductSKU, '-')
SKUcategory = ProductSKU_split[0]
# Create Modelcategory
ModelName_split = split(df_products.ModelName, '[- ]')
Modelcategory = ModelName_split[0]

# Reposition columns
df_products = df_products.withColumn('SKUcategory', SKUcategory)\
    .withColumn('Modelcategory', Modelcategory)
reordered_SKUcategory = reposition_columns(df_products, 'ProductSKU', 'SKUcategory')
reordered_Modelcategory = reposition_columns(reordered_SKUcategory, 'ModelName', 'Modelcategory')

# Save
write_store_parquet(reordered_Modelcategory, subfolders['products'])
reordered_Modelcategory.show()

+----------+---------------------+----------+-----------+--------------------+--------------------+-------------+--------------------+------------+-----------+------------+-----------+------------+
|ProductKey|ProductSubcategoryKey|ProductSKU|SKUcategory|         ProductName|           ModelName|Modelcategory|  ProductDescription|ProductColor|ProductSize|ProductStyle|ProductCost|ProductPrice|
+----------+---------------------+----------+-----------+--------------------+--------------------+-------------+--------------------+------------+-----------+------------+-----------+------------+
|       214|                   31| HL-U509-R|         HL|Sport-100 Helmet,...|           Sport-100|        Sport|Universal fit, we...|         Red|          0|           0|    13.0863|       34.99|
|       215|                   31|   HL-U509|         HL|Sport-100 Helmet,...|           Sport-100|        Sport|Universal fit, we...|       Black|          0|           0|    12.0278|     33.6442|
|       21

##### Returns

In [0]:
# df_returns.display()
# Note: This is a fact table, so must be as granular as possible i.e. usually don't delete any columns since they're mostly numeric data, just add

# Transform: Add the Price and Cost columns

new_df_products = read_parquet(subfolders['products'])
columns_to_add = new_df_products.select(new_df_products.ProductKey, new_df_products.ProductPrice, new_df_products.ProductCost)
# Combine the tables by using a left join to retain all historical sales records, even if no matching product is found
df_returns = df_returns.join(
    columns_to_add, 
    on='ProductKey', 
    how='left'
)

write_store_parquet(df_returns, subfolders['returns'])
df_returns.show()

+----------+----------+------------+--------------+------------+-----------+
|ProductKey|ReturnDate|TerritoryKey|ReturnQuantity|ProductPrice|ProductCost|
+----------+----------+------------+--------------+------------+-----------+
|       312|2015-01-18|           9|             1|     3578.27|  2171.2942|
|       310|2015-01-18|          10|             1|     3578.27|  2171.2942|
|       346|2015-01-21|           8|             1|     3399.99|  1912.1544|
|       311|2015-01-22|           4|             1|     3578.27|  2171.2942|
|       312|2015-02-02|           6|             1|     3578.27|  2171.2942|
|       312|2015-02-15|           1|             1|     3578.27|  2171.2942|
|       311|2015-02-19|           9|             1|     3578.27|  2171.2942|
|       314|2015-02-24|           8|             1|     3578.27|  2171.2942|
|       350|2015-03-08|           8|             1|     3374.99|  1898.0944|
|       350|2015-03-13|           9|             1|     3374.99|  1898.0944|

##### Territories

In [0]:
# df_territories.display()

write_store_parquet(df_territories, subfolders['territories'])

##### Sales

In [0]:
# df_sales.display()
# print(df_products.count())
# print(df_sales.count())
# print(df_products.columns)
# print(df_sales.columns)
# Transform:
#   AdjHistoricTotalCost = total cost of sold products if it were made today
#   AdjHistoricTotalRevenue = total revenue of sold products if it were made today
#   TimeStamp_StockDate = timestamp, just because I wanted to play around with some functions :P
#   TimeStamp_OrderDate

## Create Historically Adjusted columns
df_sales_potential = df_sales.join(
    new_df_products, 
    on='ProductKey', 
    how='left'
)
df_sales_potential = df_sales_potential.withColumn('AdjHistoricTotalCost', df_sales_potential.OrderQuantity * df_sales_potential.ProductCost)
df_sales_potential = df_sales_potential.withColumn('AdjHistoricTotalRevenue', df_sales_potential.OrderQuantity * df_sales_potential.ProductPrice)

## Create timestamp columns
df_sales_potential = df_sales_potential.withColumn('TimeStamp_StockDate', to_timestamp('StockDate'))
df_sales_potential = df_sales_potential.withColumn('TimeStamp_OrderDate', to_timestamp('OrderDate'))


# Save
write_store_parquet(df_sales_potential, 'sales')
df_sales_potential.show()

+----------+----------+----------+-----------+-----------+------------+-------------+-------------+---------------------+----------+-----------+--------------------+--------------------+-------------+--------------------+------------+-----------+------------+-----------+------------+--------------------+-----------------------+-------------------+-------------------+
|ProductKey| OrderDate| StockDate|OrderNumber|CustomerKey|TerritoryKey|OrderLineItem|OrderQuantity|ProductSubcategoryKey|ProductSKU|SKUcategory|         ProductName|           ModelName|Modelcategory|  ProductDescription|ProductColor|ProductSize|ProductStyle|ProductCost|ProductPrice|AdjHistoricTotalCost|AdjHistoricTotalRevenue|TimeStamp_StockDate|TimeStamp_OrderDate|
+----------+----------+----------+-----------+-----------+------------+-------------+-------------+---------------------+----------+-----------+--------------------+--------------------+-------------+--------------------+------------+-----------+------------+-

###### Ad-hoc sales analysis

In [0]:
df_total_orders = df_sales_potential.groupBy('OrderDate').agg(count('OrderNumber')).orderBy('OrderDate').alias('TotalOrders')
# df_total_orders.display()

OrderDate,count(OrderNumber)
2015-01-01,4
2015-01-02,4
2015-01-03,8
2015-01-04,5
2015-01-05,3
2015-01-06,6
2015-01-07,4
2015-01-08,8
2015-01-09,4
2015-01-10,4


Databricks visualization. Run in Databricks to view.