# SILVER LAYER SCRIPT

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Data Access Using an Azure AD App (Service Principal)

In [0]:
# Configure OAuth (client-credentials) access to the ADLS Gen2 account for this Spark session.
# The client secret is fetched from a Databricks secret scope at run time instead of being
# embedded in the notebook source, where anyone with read access to the notebook could copy it.
# NOTE(review): the scope and key names below are placeholders -- create them (or point these
# at an existing scope) before running, and rotate the secret that was previously stored here
# in plain text.
_adls_account = "opapastoragedatalake.dfs.core.windows.net"
_adls_client_secret = dbutils.secrets.get(scope="adventureworks", key="adls-client-secret")

spark.conf.set(f"fs.azure.account.auth.type.{_adls_account}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{_adls_account}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{_adls_account}",
    "e97b0685-f0ff-4d3b-929a-a6c5d36663ed",
)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{_adls_account}", _adls_client_secret)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{_adls_account}",
    "https://login.microsoftonline.com/6e2cc058-79af-4df8-92e6-ee6e0d1dea3e/oauth2/token",
)

### Data Loading

**Reading data**

In [0]:
# Load the raw calendar CSV from the bronze container (first row is the header, types inferred).
# The variable keeps its existing spelling because later cells refer to it by this name.
df_calender = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Calendar")
)

In [0]:
# Load the raw customer CSV from the bronze container (first row is the header, types inferred).
df_customers = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Customers/")
)


In [0]:
# Load the raw product-category CSV from the bronze container (header row, types inferred).
df_procat = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Product_Categories/")
)


In [0]:
# Load the raw product CSV from the bronze container (header row, types inferred).
df_products = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Products/")
)


In [0]:
# Load the raw returns CSV from the bronze container (header row, types inferred).
df_returns = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Returns/")
)


In [0]:
# Load the raw sales CSVs from the bronze container. The trailing wildcard in the path picks up
# every location whose name starts with "AdventureWorks_Sales" and reads them into one frame.
df_sales = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Sales*/")
)


In [0]:
# Load the raw sales-territory CSV from the bronze container (header row, types inferred).
df_territories = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Territories/")
)


In [0]:
# Load the raw product-subcategory CSV from the bronze container (header row, types inferred).
df_subcat = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("abfss://bronze@opapastoragedatalake.dfs.core.windows.net/Product_Subcategories/")
)


### **Transformations**

**Calendar**

In [0]:
# Preview the calendar rows as loaded from bronze.
display(df_calender)

In [0]:
# Derive numeric Month and Year columns from the Date column, then preview the result.
df_calender = (
    df_calender
    .withColumn("Month", month("Date"))
    .withColumn("Year", year("Date"))
)
display(df_calender)

In [0]:
# Persist the calendar frame to the silver container as Parquet, replacing any existing output.
(
    df_calender.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Calendar")
)

**Customers**

In [0]:
# Preview the customer rows as loaded from bronze.
display(df_customers)

In [0]:
# Build the silver customer frame:
#   "Full Name"      - Prefix, FirstName and LastName joined by single spaces (concat_ws skips nulls).
#   "Age"            - completed years between BirthDate and today.
#   "Children"       - TotalChildren cast to int.
#   "Marital Status" - 'M' -> Married, missing value -> Unknown, anything else -> Single.
#   "Gender"         - 'M' -> Male, 'NA' or missing value -> Unknown, anything else -> Female.
# FirstName, LastName, MaritalStatus and TotalChildren are dropped once their replacements exist.
df_customers = (
    df_customers
    .withColumn("Full Name", concat_ws(" ", col("Prefix"), col("FirstName"), col("LastName")))
    # months_between / 12 counts whole calendar years. The previous datediff / 365 ignored leap
    # days, so it could report the next age a few days before the actual birthday.
    # NOTE(review): assumes inferSchema parsed BirthDate as a date -- confirm; a non-ISO string
    # column would yield null ages (as it also did with datediff).
    .withColumn("Age", floor(months_between(current_date(), col("BirthDate")) / 12).cast("int"))
    .withColumn("Children", col("TotalChildren").cast("int"))
    # A null MaritalStatus used to fall through to "Single"; label it explicitly instead.
    .withColumn(
        "Marital Status",
        when(col("MaritalStatus") == "M", "Married")
        .when(col("MaritalStatus").isNull(), "Unknown")
        .otherwise("Single"),
    )
    # A null Gender used to fall through to 'Female'; treat it like the existing 'NA' marker.
    .withColumn(
        "Gender",
        expr(
            "case when Gender='M' then 'Male' "
            "when Gender='NA' or Gender is null then 'Unknown' "
            "else 'Female' end"
        ),
    )
)
df_customers = df_customers.drop("FirstName", "LastName", "MaritalStatus", "TotalChildren")
df_customers.display()

In [0]:
# Persist the customer frame to the silver container as Parquet, replacing any existing output.
(
    df_customers.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Customers")
)

**Product Category**

In [0]:
# Preview the product-category rows as loaded from bronze.
display(df_procat)

ProductCategoryKey,CategoryName
1,Bikes
2,Components
3,Clothing
4,Accessories


In [0]:
# Persist the product categories to the silver container as Parquet, replacing any existing output.
(
    df_procat.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Product_Categories")
)


**Sub Category**

In [0]:
# Preview the product-subcategory rows as loaded from bronze.
display(df_subcat)

ProductSubcategoryKey,SubcategoryName,ProductCategoryKey
1,Mountain Bikes,1
2,Road Bikes,1
3,Touring Bikes,1
4,Handlebars,2
5,Bottom Brackets,2
6,Brakes,2
7,Chains,2
8,Cranksets,2
9,Derailleurs,2
10,Forks,2


In [0]:
# Persist the product subcategories to the silver container as Parquet, replacing any existing output.
(
    df_subcat.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/Product_Subcategories")
)


**Products**

In [0]:
# Preview the product rows as loaded from bronze.
display(df_products)

In [0]:
# Shorten ProductSKU to the part before its first '-' and ProductName to its first
# space-delimited word, then preview the result.
df_products = (
    df_products
    .withColumn("ProductSKU", split(col("ProductSKU"), "-").getItem(0))
    .withColumn("ProductName", split(col("ProductName"), " ").getItem(0))
)

display(df_products)

In [0]:
# Persist the product frame to the silver container as Parquet, replacing any existing output.
(
    df_products.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Products")
)


**Returns**

In [0]:
# Preview the returns rows as loaded from bronze.
display(df_returns)

In [0]:
# Persist the returns frame to the silver container as Parquet, replacing any existing output.
(
    df_returns.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Returns")
)


**Territories**

In [0]:
# Preview the sales-territory rows as loaded from bronze.
display(df_territories)

SalesTerritoryKey,Region,Country,Continent
1,Northwest,United States,North America
2,Northeast,United States,North America
3,Central,United States,North America
4,Southwest,United States,North America
5,Southeast,United States,North America
6,Canada,Canada,North America
7,France,France,Europe
8,Germany,Germany,Europe
9,Australia,Australia,Pacific
10,United Kingdom,United Kingdom,Europe


In [0]:
# Persist the territories frame to the silver container as Parquet, replacing any existing output.
(
    df_territories.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Territories")
)


**Sales**

In [0]:
# Preview the sales rows as loaded from bronze.
display(df_sales)

In [0]:
# Reshape the sales frame and preview it:
#   StockDate   - converted to a timestamp.
#   OrderNumber - every "S" replaced with "T".
#   Multiply    - OrderLineItem multiplied by OrderQuantity.
df_sales = (
    df_sales
    .withColumn("StockDate", to_timestamp("StockDate"))
    .withColumn("OrderNumber", regexp_replace("OrderNumber", "S", "T"))
    .withColumn("Multiply", col("OrderLineItem") * col("OrderQuantity"))
)

display(df_sales)

In [0]:
# Persist the sales frame to the silver container as Parquet, replacing any existing output.
(
    df_sales.write
    .format("parquet")
    .mode("overwrite")
    .save("abfss://silver@opapastoragedatalake.dfs.core.windows.net/AdventureWorks_Sales")
)

**Sales Analysis**

In [0]:
# Number of orders per OrderDate.
# The frame carries an OrderLineItem column, so presumably one OrderNumber can span several rows
# (one per order line) -- verify against the data. Counting distinct order numbers makes
# "Total Orders" a count of orders rather than of order lines.
(
    df_sales
    .groupBy("OrderDate")
    .agg(countDistinct("OrderNumber").alias("Total Orders"))
    .display()
)

In [0]:
# Show the product categories again in the analysis section.
display(df_procat)

ProductCategoryKey,CategoryName
1,Bikes
2,Components
3,Clothing
4,Accessories


Databricks visualization. Run in Databricks to view.