### Transformation Layer Scripting

#### Data Access using an App Registration (Service Principal):

In [0]:
# Configure OAuth (client-credentials / service-principal) access to the
# ADLS Gen2 account `awstoragedlake` for this Spark session.
#
# The client secret is fetched from a Databricks secret scope at run time
# instead of being hardcoded: a secret stored in notebook source is readable by
# anyone with access to the notebook or its revision history. The secret that
# was previously embedded in this cell should be treated as compromised and
# rotated.
# NOTE(review): the scope/key names below are placeholders — point them at the
# secret scope that actually holds this app registration's client secret.
adls_client_secret = dbutils.secrets.get(scope="adls-scope", key="adls-client-secret")

spark.conf.set("fs.azure.account.auth.type.awstoragedlake.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.awstoragedlake.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.awstoragedlake.dfs.core.windows.net", "9b6a323d-7f20-4efd-87c7-fe8f844ca2a4")
spark.conf.set("fs.azure.account.oauth2.client.secret.awstoragedlake.dfs.core.windows.net", adls_client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.awstoragedlake.dfs.core.windows.net", "https://login.microsoftonline.com/70fb14d0-68e7-4e3c-a38c-9175b54c37cd/oauth2/token")

#### Data Loading:

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *                 

##### Reading Data

In [0]:
# Calendar: CSV from the bronze container; the first row is the header and
# column types are inferred by Spark.
df_calendar = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Calendar")
)


In [0]:
# Customers: CSV from the bronze container, header row present, schema inferred.
df_customers = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Customers")
)

In [0]:
# Product categories: CSV from the bronze container, header row present,
# schema inferred.
df_PC = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Product_Categories")
)

In [0]:
# Product sub-categories: CSV from the bronze container, header row present,
# schema inferred.
df_PsubC = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Product_Subcategories")
)

In [0]:
# Products: CSV from the bronze container, header row present, schema inferred.
df_products = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Products")
)

In [0]:
# Returns: CSV from the bronze container, header row present, schema inferred.
df_returns = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Returns")
)

In [0]:
# Sales: the glob pattern loads every bronze path starting with "Sales_" into a
# single DataFrame; header row present, schema inferred.
df_sales = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Sales_*")
)

In [0]:
# Territories: CSV from the bronze container, header row present, schema inferred.
df_territories = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze-container@awstoragedlake.dfs.core.windows.net/Territories")
)

### TRANSFORMATIONS:

#### Calendar Transformation

In [0]:
# Preview the calendar DataFrame.
display(df_calendar)

In [0]:
# Derive Month and Year columns from the Date column, then preview the result.
df_calendar = (
    df_calendar
    .withColumn("Month", month(col("Date")))
    .withColumn("Year", year(col("Date")))
)
display(df_calendar)

In [0]:
# Persist the transformed calendar data to the silver layer as Parquet.
# "overwrite" (instead of "append") keeps this cell idempotent: the notebook
# re-reads the full bronze Calendar dataset on every run, so appending would
# duplicate every row each time the notebook is re-executed.
(
    df_calendar.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Calendar")
    .save()
)

#### Customers

In [0]:
# Preview the customers DataFrame.
display(df_customers)

In [0]:
# Exploratory preview only (the result is not assigned back): build a FullName
# column by concatenating Prefix, FirstName and LastName with explicit
# single-space literals between them.
display(
    df_customers.withColumn(
        "FullName",
        concat(col("Prefix"), lit(" "), col("FirstName"), lit(" "), col("LastName")),
    )
)


In [0]:
# Preferred form: concat_ws takes the separator once and joins the three name
# parts with it. The result is stored in a column named "Full Name".
df_customers = df_customers.withColumn(
    "Full Name",
    concat_ws(" ", "Prefix", "FirstName", "LastName"),
)
display(df_customers)

In [0]:
# Persist the transformed customers data to the silver layer as Parquet.
# "overwrite" (instead of "append") keeps this cell idempotent: the full bronze
# Customers dataset is re-read on every run, so appending would duplicate every
# row each time the notebook is re-executed.
(
    df_customers.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Customers")
    .save()
)

#### Product Categories

In [0]:
# Copy the product categories (no transformation applied) to the silver layer
# as Parquet. "overwrite" (instead of "append") keeps this cell idempotent: the
# full bronze dataset is re-read on every run, so appending would duplicate
# every row each time the notebook is re-executed.
(
    df_PC.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Product_Categories")
    .save()
)

#### Sub Categories

In [0]:
# Copy the product sub-categories (no transformation applied) to the silver
# layer as Parquet. "overwrite" (instead of "append") keeps this cell
# idempotent: the full bronze dataset is re-read on every run, so appending
# would duplicate every row each time the notebook is re-executed.
(
    df_PsubC.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Product_Subcategories")
    .save()
)

#### Products

In [0]:
# Preview the products DataFrame.
display(df_products)

In [0]:
# Keep only the leading token of each field: the part of ProductSKU before the
# first "-" and the part of ProductName before the first space.
df_products = (
    df_products
    .withColumn("ProductSKU", split(col("ProductSKU"), "-").getItem(0))
    .withColumn("ProductName", split(col("ProductName"), " ").getItem(0))
)
display(df_products)

In [0]:
# Persist the transformed products data to the silver layer as Parquet.
# "overwrite" (instead of "append") keeps this cell idempotent: the full bronze
# Products dataset is re-read on every run, so appending would duplicate every
# row each time the notebook is re-executed.
(
    df_products.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Products")
    .save()
)

#### Returns

In [0]:
# Preview the returns DataFrame.
display(df_returns)

In [0]:
# Copy the returns data (no transformation applied) to the silver layer as
# Parquet. "overwrite" (instead of "append") keeps this cell idempotent: the
# full bronze Returns dataset is re-read on every run, so appending would
# duplicate every row each time the notebook is re-executed.
(
    df_returns.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Returns")
    .save()
)

#### Territory

In [0]:
# Preview the territories DataFrame.
display(df_territories)

In [0]:
# Remap selected Region values to compass labels; any value not listed below is
# left unchanged by the otherwise() branch.
region_col = col("Region")
df_territories = df_territories.withColumn(
    "Region",
    when(region_col == "Canada", "South")
    .when(region_col == "France", "North")
    .when(region_col == "Germany", "East")
    .when(region_col == "Australia", "West")
    .when(region_col == "United Kingdom", "East")
    .otherwise(region_col),
)
display(df_territories)

In [0]:
# Persist the transformed territories data to the silver layer as Parquet.
# "overwrite" (instead of "append") keeps this cell idempotent: the full bronze
# Territories dataset is re-read on every run, so appending would duplicate
# every row each time the notebook is re-executed.
(
    df_territories.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Territories")
    .save()
)

#### Sales Data

In [0]:
# Preview the sales DataFrame.
display(df_sales)

In [0]:
# Convert StockDate to a timestamp, parsing it with the yyyy-MM-dd pattern.
df_sales = df_sales.withColumn(
    "StockDate",
    to_timestamp("StockDate", "yyyy-MM-dd"),
)
display(df_sales)

In [0]:
# Replace the leading "S" of OrderNumber with "H".
# The pattern is anchored with "^" so that only the first character is
# rewritten; the unanchored pattern "S" would replace every "S" anywhere in the
# value, which contradicts the stated intent of changing the first letter only.
df_sales = df_sales.withColumn("OrderNumber", regexp_replace(col("OrderNumber"), "^S", "H"))
df_sales.display()

In [0]:
# Add a "Multiply" column holding OrderLineItem * OrderQuantity, then drop the
# "OrderTotal" column (DataFrame.drop ignores a column name that is not in the
# schema) and preview the result.
df_sales = (
    df_sales
    .withColumn("Multiply", col("OrderLineItem") * col("OrderQuantity"))
    .drop("OrderTotal")
)
display(df_sales)


#### Sales Analysis

In [0]:
# Aggregate sales per OrderDate, counting the OrderNumber values for each day.
# The alias is applied to the aggregate *column*; calling .alias() on the
# DataFrame returned by agg() only names the DataFrame and leaves the column
# header as "count(OrderNumber)".
df_sales.groupBy("OrderDate").agg(count("OrderNumber").alias("Count of Orders")).display()

In [0]:
# Preview the product-categories table.
# TODO: the category-wise sales aggregation itself is not implemented yet; this
# cell only displays the categories.
display(df_PC)

In [0]:
# Preview the territories table.
# TODO: the count of regions per country is not computed yet; this cell only
# displays the territories.
display(df_territories)

In [0]:
# Persist the transformed sales data to the silver layer as Parquet.
# "overwrite" (instead of "append") keeps this cell idempotent: every bronze
# Sales_* dataset is re-read on each run, so appending would duplicate every
# row each time the notebook is re-executed.
(
    df_sales.write.format("parquet")
    .mode("overwrite")
    .option("path", "abfss://silver-container@awstoragedlake.dfs.core.windows.net/Sales")
    .save()
)