# Silver Layer Script

#### Importing Pyspark Functions and Types

In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *


### Data Access Using App

In [0]:
# Reference (OAuth 2.0 with a service principal):
# https://learn.microsoft.com/en-us/azure/databricks/connect/storage/azure-storage

# Credentials needed:
# Application (client) ID        --> <APPLICATION_CLIENT_ID>
# Tenant ID                      --> <TENANT_ID>
# Storage Account (Storage Name) --> <STORAGE_ACCOUNT_NAME>
# Secret value                   --> <CLIENT_SECRET_VALUE>
#
# NOTE(review): replace the placeholders below before running. Prefer reading the
# secret from a Databricks secret scope (dbutils.secrets.get(scope=..., key=...))
# instead of pasting it into the notebook.
STORAGE_ACCOUNT_NAME = "<STORAGE_ACCOUNT_NAME>"
APPLICATION_CLIENT_ID = "<APPLICATION_CLIENT_ID>"
TENANT_ID = "<TENANT_ID>"
CLIENT_SECRET_VALUE = "<CLIENT_SECRET_VALUE>"

# Every ABFS setting is scoped to the storage account's DFS endpoint.
ACCOUNT_HOST = f"{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net"

# Authenticate to the storage account with OAuth client credentials.
spark.conf.set(f"fs.azure.account.auth.type.{ACCOUNT_HOST}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{ACCOUNT_HOST}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(f"fs.azure.account.oauth2.client.id.{ACCOUNT_HOST}", APPLICATION_CLIENT_ID)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{ACCOUNT_HOST}", CLIENT_SECRET_VALUE)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{ACCOUNT_HOST}",
    f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token",
)

### Data Loading

#### Reading Data

In [0]:
# Read the Calendar CSV from the bronze layer: first row is the header, column types are inferred.
df_cal = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Calendar')
)

In [0]:
# Read the Customers CSV from the bronze layer: first row is the header, column types are inferred.
df_cust = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Customers')
)

In [0]:
# Read the Product Categories CSV from the bronze layer: first row is the header, column types are inferred.
df_pcat = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Product_Categories')
)

In [0]:
# Read the Product Subcategories CSV from the bronze layer: first row is the header, column types are inferred.
df_psbcat = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Product_Subcategories')
)

In [0]:
# Read the Products CSV from the bronze layer: first row is the header, column types are inferred.
df_prod = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Products')
)

In [0]:
# Read the Returns CSV from the bronze layer: first row is the header, column types are inferred.
df_ret = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Returns')
)

In [0]:
# Read every Sales CSV from the bronze layer in one pass.
# The files share one naming convention and differ only by year, so the `*`
# wildcard in the path matches all of them.
df_sales = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Sales/AdventureWorks_Sales_*.csv')
)

In [0]:
# Read the Territories CSV from the bronze layer: first row is the header, column types are inferred.
df_terr = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze-raw@adventureworksdl.dfs.core.windows.net/AdventureWorks_Territories')
)

### Transformations

##### Calendar

In [0]:
# Preview the calendar data as loaded from bronze.
display(df_cal)

In [0]:
# Add Month and Year columns derived from the calendar's Date column.
# The source frame must be df_cal; no frame named `df` is defined in this notebook.
df_cal = (
    df_cal
    .withColumn("Month", month(col('Date')))
    .withColumn("Year", year(col('Date')))
)
df_cal.display()

In [0]:
# Push the calendar data to the silver layer in Parquet format (columnar rather than row-based storage).
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_cal.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Calendar')
    .save()
)

##### Customers

In [0]:
# Preview the customer data as loaded from bronze.
display(df_cust)

In [0]:
# Preview only (df_cust is not reassigned): build fullName with concat().
# NOTE: concat() yields NULL when any input is NULL (e.g. a missing Prefix);
# the cell that follows uses concat_ws(), which skips NULL inputs.
display(
    df_cust.withColumn(
        'fullName',
        concat(col("Prefix"), lit(' '), col("FirstName"), lit(' '), col("LastName")),
    )
)



In [0]:
# Build a space-separated fullName column from Prefix, FirstName and LastName.
df_cust = df_cust.withColumn(
    'fullName',
    concat_ws(' ', "Prefix", "FirstName", "LastName"),
)

In [0]:
# Inspect the customer data now that fullName has been added.
display(df_cust)


In [0]:
# Push the customer data to the silver layer in Parquet format (columnar rather than row-based storage).
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_cust.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Customers')
    .save()
)

##### Product Categories

In [0]:
# Push the product categories to the silver layer as Parquet.
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_pcat.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Product_Categories')
    .save()
)

##### Product Subcategories

In [0]:
# Preview the product subcategories as loaded from bronze.
display(df_psbcat)

In [0]:
# Push the product subcategories to the silver layer as Parquet.
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_psbcat.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Product_Subcategories')
    .save()
)

##### Products

In [0]:
# Preview the product data as loaded from bronze.
display(df_prod)

In [0]:
# Keep only the leading segment of ProductSKU (text before the first '-')
# and the first space-separated word of ProductName.
df_prod = (
    df_prod
    .withColumn('ProductSKU', split(col('ProductSKU'), '-').getItem(0))
    .withColumn('ProductName', split(col('ProductName'), ' ').getItem(0))
)
    

In [0]:
# Inspect the product data after shortening ProductSKU and ProductName.
display(df_prod)

In [0]:
# Push the product data to the silver layer as Parquet.
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_prod.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Products')
    .save()
)

##### Returns

In [0]:
# Preview the returns data as loaded from bronze.
display(df_ret)

In [0]:
# Push the returns data to the silver layer as Parquet.
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_ret.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Returns')
    .save()
)

##### Territories

In [0]:
# Preview the territories data as loaded from bronze.
display(df_terr)

In [0]:
# Push the territories data to the silver layer as Parquet.
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_terr.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Territories')
    .save()
)

##### Sales

In [0]:
# Preview the sales data as loaded from bronze.
display(df_sales)

In [0]:
# Convert the StockDate column to a timestamp type.
df_sales = df_sales.withColumn(
    'StockDate',
    to_timestamp(col('StockDate')),
)

In [0]:
# Rewrite OrderNumber with regexp_replace: every 'S' in the value becomes 'T'.
df_sales = df_sales.withColumn(
    'OrderNumber',
    regexp_replace('OrderNumber', 'S', 'T'),
)

In [0]:
# Add a Multiply column holding the product OrderLineItem * OrderQuantity.
df_sales = df_sales.withColumn(
    'Multiply',
    col('OrderLineItem') * col('OrderQuantity'),
)

In [0]:
# Inspect the sales data after the StockDate, OrderNumber and Multiply changes.
display(df_sales)

##### Analyses

In [0]:
# Count the non-null OrderNumber values for each OrderDate.
# NOTE(review): count() tallies rows, so if one order spans several rows (line
# items) this is a line-item count rather than a distinct-order count — confirm.
display(
    df_sales
    .groupBy('OrderDate')
    .agg(count('OrderNumber').alias('Total Daily Orders'))
)

##### Where can we find opportunity to increase share of wallet?

In [0]:
# Show the category, subcategory and product tables that link sales to categories.
display(df_pcat)
display(df_psbcat)
display(df_prod)

In [0]:
# Inner-join sales to products on ProductKey to bring in ProductSubcategoryKey.
df_wshare = df_sales.join(
    df_prod,
    'ProductKey',
    'inner',
)


In [0]:
# Inner-join to subcategories on ProductSubcategoryKey to bring in ProductCategoryKey.
df_wshare = df_wshare.join(
    df_psbcat,
    'ProductSubcategoryKey',
    'inner',
)

In [0]:
# Inner-join to categories on ProductCategoryKey to bring in CategoryName.
df_wshare = df_wshare.join(
    df_pcat,
    'ProductCategoryKey',
    'inner',
)

In [0]:
# Check the joined frame: order fields alongside the product/category keys.
display(
    df_wshare.select(
        'OrderDate',
        'OrderNumber',
        'CategoryName',
        'ProductKey',
        'ProductSubcategoryKey',
        'ProductCategoryKey',
    )
)

In [0]:
# Aggregate orders by category: count OrderNumber values per CategoryName.
display(
    df_wshare
    .groupBy('CategoryName')
    .agg(count('OrderNumber').alias('Total Orders Per Category'))
)

CategoryName,Total Orders Per Category
Bikes,13929
Clothing,8510
Accessories,33607


Databricks visualization. Run in Databricks to view.

An area for exploration would be Clothing.

##### Where can we pilot the new clothing strategy?

In [0]:
# Show the joined sales frame and the territories table before joining them.
display(df_wshare)
display(df_terr)

In [0]:
# Inner-join territories: sales' TerritoryKey matches territories' SalesTerritoryKey.
df_wshare = df_wshare.join(
    df_terr,
    df_wshare['TerritoryKey'] == df_terr['SalesTerritoryKey'],
    'inner',
)

In [0]:
# Check the territory join: order and category fields alongside Region and Country.
display(
    df_wshare.select(
        'OrderDate',
        'OrderNumber',
        'CategoryName',
        'ProductKey',
        'ProductSubcategoryKey',
        'ProductCategoryKey',
        'Region',
        'Country',
    )
)

In [0]:
# Restrict to the Clothing category, then count OrderNumber values per Region.
display(
    df_wshare
    .where(col('CategoryName') == 'Clothing')
    .groupBy('Region')
    .agg(count('OrderNumber').alias('Clothing Orders Per Region'))
)

Region,Clothing Orders Per Region
Germany,717
France,727
Northwest,1343
Southeast,7
Central,4
Canada,1204
Southwest,1810
Australia,1764
United Kingdom,930
Northeast,4


Databricks visualization. Run in Databricks to view.

The two largest markets are Australia and Southwest, US. These are two good areas for pilot testing new strategies as they represent two vastly different market makeups. This would allow the company to stress test strategies in different environments with opportunity for great learnings.

Another area for exploration: why is there such a vast difference in sales between the Southwest US and the other US regions? Is it a lack of market presence via physical locations, or a lack of brand awareness?

##### Write sales to silver layer

In [0]:

# Push the transformed sales data to the silver layer as Parquet.
# 'overwrite' keeps the write idempotent: the notebook reloads the whole bronze
# dataset on every run, so 'append' would duplicate every row on each re-run.
(
    df_sales.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'abfss://silver-transform@adventureworksdl.dfs.core.windows.net/AdventureWorks_Sales')
    .save()
)