# Silver Layer Research Notebook

## Import Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Access Data Lake using Credentials


In [0]:
# Configure OAuth (service-principal) access from Databricks to the ADLS Gen2
# storage account "awdatastoragedatalake" (the Data Lake).
#
# Values to adapt for your environment:
#   <storage-account>  : storage account name (part of every config key below)
#   <application-id>   : client ID of the service principal
#   <directory-id>     : tenant ID (part of the token endpoint URL)
#   service_credential : client secret of the service principal
#
# The client secret is read from a Databricks secret scope rather than being
# written into the notebook, so it is not exposed to anyone who can read or
# export this file. The scope/key names below are placeholders - TODO confirm
# they match the secret scope provisioned in your workspace.
# NOTE(review): the secret that was previously hard-coded in this cell should
# be treated as leaked and rotated.
service_credential = dbutils.secrets.get(scope="adls-secret-scope", key="service-credential")

spark.conf.set("fs.azure.account.auth.type.awdatastoragedatalake.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.awdatastoragedatalake.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.awdatastoragedatalake.dfs.core.windows.net", "979a0794-0963-4b3c-b078-566c59d89823")
spark.conf.set("fs.azure.account.oauth2.client.secret.awdatastoragedatalake.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.awdatastoragedatalake.dfs.core.windows.net", "https://login.microsoftonline.com/aa232db2-7a78-4414-a529-33db9124cba7/oauth2/token")

## Load Data

In [0]:
# Read the AdventureWorks_Calendar CSV data from the bronze container
# (first row is the header; column types are inferred by Spark).
df_cal = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Calendar")
)

In [0]:
# Read the AdventureWorks_Customers CSV data from the bronze container
# (first row is the header; column types are inferred by Spark).
df_custom = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Customers")
)

In [0]:
# Read the AdventureWorks_Product_Categories CSV data from the bronze container
# (first row is the header; column types are inferred by Spark).
df_prod_cat = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Product_Categories")
)

In [0]:
# Read the AdventureWorks_Products CSV data from the bronze container
# (first row is the header; column types are inferred by Spark).
df_prod = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Products")
)

In [0]:
# Read the AdventureWorks_Returns CSV data from the bronze container
# (first row is the header; column types are inferred by Spark).
df_returns = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Returns")
)

In [0]:
# Load every bronze path matching 'AdventureWorks_Sales*' (the Sales_2015,
# Sales_2016 and Sales_2017 datasets) into a single DataFrame.
# The first row is the header; column types are inferred by Spark.
df_sales = (
    spark.read.format('csv')
    .options(header=True, inferSchema=True)
    .load('abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Sales*')
)
     

In [0]:
# Read the AdventureWorks_Territories CSV data from the bronze container
# (first row is the header; column types are inferred by Spark).
df_terr = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Territories")
)

In [0]:
# Read the Product_Subcategories CSV data from the bronze container
# (first row is the header; column types are inferred by Spark).
df_sub = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@awdatastoragedatalake.dfs.core.windows.net/Product_Subcategories")
)

## Transformation

#### 1.Calendar dataset

In [0]:
# Derive 'Month' and 'Year' columns from the 'Date' column.
df_cal = (
    df_cal
    .withColumn('Month', month('Date'))
    .withColumn('Year', year('Date'))
)

In [0]:
# Write the transformed calendar data to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_cal.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Calendar")
)

#### 2.Customer dataset

In [0]:
# Build a new 'Full Name' column from 'Prefix', 'FirstName' and 'LastName',
# separated by single spaces.
# concat_ws is used instead of concat: concat returns NULL for the whole result
# as soon as any input is NULL (a customer without a Prefix would end up with a
# NULL 'Full Name'), whereas concat_ws skips NULL parts and joins the rest.
df_custom = df_custom.withColumn(
    'Full Name',
    concat_ws(' ', col('Prefix'), col('FirstName'), col('LastName')),
)

In [0]:
# Write the transformed customer data to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_custom.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Customers")
)

#### 3.Product Categories dataset

In [0]:
# No transformation is applied to the product categories data.
# Write it to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_prod_cat.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Product_Categories")
)

#### 4.Products dataset

In [0]:
# Keep only the first token of each value:
#   - 'productSKU'  -> the part before the first "-"
#   - 'productName' -> the part before the first " "
# NOTE(review): the column names are written as 'productSKU'/'productName';
# presumably this relies on Spark's default case-insensitive column
# resolution - verify against the source schema.
df_prod = (
    df_prod
    .withColumn("productSKU", split(col("productSKU"), "-").getItem(0))
    .withColumn("productName", split(col("productName"), " ").getItem(0))
)


In [0]:
# Write the transformed products data to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_prod.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Products")
)

#### 5.AdventureWorks_Returns

In [0]:
# No transformation is applied to the returns data.
# Write it to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_returns.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Returns")
)

#### 6.Product Sales 2015, 2016, 2017 dataset

In [0]:
# Apply three transformations to the sales data:
#   1. 'StockDate' is converted to a timestamp.
#   2. In 'OrderNumber', every 'S' is replaced with 'T' (regex replace, so all
#      occurrences are affected, not only the first character).
#   3. A new column 'OrderQuantity * OrderLineItem' holds the product of
#      'OrderQuantity' and 'OrderLineItem'.
df_sales = (
    df_sales
    .withColumn('StockDate', to_timestamp(col('StockDate')))
    .withColumn('OrderNumber', regexp_replace(col('OrderNumber'), 'S', 'T'))
    .withColumn('OrderQuantity * OrderLineItem', col('OrderQuantity') * col('OrderLineItem'))
)

In [0]:
# Write the transformed sales data to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_sales.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/Product_Sales")
)

#### 7.AdventureWorks_Territories

In [0]:
# No transformation is applied to the territories data.
# Write it to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_terr.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/AdventureWorks_Territories")
)

#### 8.Product Subcategories

In [0]:
# No transformation is applied to the product subcategories data.
# Write it to the silver container as Parquet.
# Mode is "append", so re-running this cell writes the rows again.
(
    df_sub.write
    .mode("append")
    .format("parquet")
    .save("abfss://silver@awdatastoragedatalake.dfs.core.windows.net/Product_Subcategories")
)

## Data Analysis

#### 1.Sales Data Analysis

In [0]:
# Print the first 10 rows of the transformed sales data without truncating cell values.
df_sales.show(10, False)

+----------+-------------------+-----------+----------+-----------+------------+-------------+-------------+-----------------------------+
|OrderDate |StockDate          |OrderNumber|ProductKey|CustomerKey|TerritoryKey|OrderLineItem|OrderQuantity|OrderQuantity * OrderLineItem|
+----------+-------------------+-----------+----------+-----------+------------+-------------+-------------+-----------------------------+
|2017-01-01|2003-12-13 00:00:00|TO61285    |529       |23791      |1           |2            |2            |4                            |
|2017-01-01|2003-09-24 00:00:00|TO61285    |214       |23791      |1           |3            |1            |3                            |
|2017-01-01|2003-09-04 00:00:00|TO61285    |540       |23791      |1           |1            |1            |1                            |
|2017-01-01|2003-09-28 00:00:00|TO61301    |529       |16747      |1           |2            |2            |4                            |
|2017-01-01|2003-10-21 00:0

In [0]:
## How many orders were received each day?
# Counts the non-null 'OrderNumber' values per 'OrderDate'.
# NOTE(review): this counts rows, and the same OrderNumber can appear on several
# rows (one per line item), so the figure is line items per day rather than
# distinct orders - confirm which one is intended.
(
    df_sales
    .groupBy("OrderDate")
    .agg(count('OrderNumber').alias('Total_Orders'))
    .display()
)

OrderDate,Total_Orders
2017-01-06,151
2017-01-27,142
2017-02-26,119
2017-01-24,173
2017-06-29,172
2017-02-16,124
2017-04-09,140
2017-02-28,162
2017-03-28,149
2017-06-30,136


Databricks visualization. Run in Databricks to view.

##### 

#### 2.Product Categories Data Analysis

In [0]:
# Render the full product categories table.
display(df_prod_cat)

ProductCategoryKey,CategoryName
1,Bikes
2,Components
3,Clothing
4,Accessories


Databricks visualization. Run in Databricks to view.

#### 3.Product Return Data Analysis

In [0]:
# Display the returns data as loaded; no aggregation is applied in this cell.
# NOTE(review): the question this cell was meant to answer is "how many product
# items were returned each day?" - that would need a groupBy on 'ReturnDate'
# with a sum of 'ReturnQuantity'. TODO confirm intent.
df_returns.display()

ReturnDate,TerritoryKey,ProductKey,ReturnQuantity
2015-01-18,9,312,1
2015-01-18,10,310,1
2015-01-21,8,346,1
2015-01-22,4,311,1
2015-02-02,6,312,1
2015-02-15,1,312,1
2015-02-19,9,311,1
2015-02-24,8,314,1
2015-03-08,8,350,1
2015-03-13,9,350,1


Databricks visualization. Run in Databricks to view.