
### SILVER LAYER SCRIPT

In [0]:
from pyspark.sql.functions import (
    col,
    lit,
    concat,
    concat_ws,
    count,
    current_date,
    current_timestamp,
    to_date,
    to_timestamp,
    date_format,
    year,
    month,
    dayofmonth,
    datediff,
    add_months,
    regexp_replace,
    split,
)

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DateType,
    TimestampType
)



## Data access using Application

#### For Hard-Coding the credentials

spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

In [0]:
# List the top-level entries of the bronze container.
# NOTE(review): the OAuth settings for this storage account are only set in
# the cells further down; on a fresh session this listing is likely to fail
# unless access is already configured elsewhere (e.g. at cluster level) — confirm.
dbutils.fs.ls(
    "abfss://bronze@jtdatalakestorage.dfs.core.windows.net/"
)


#### dbutils.secrets

In [0]:
# Show the secret keys registered under the 'jayeshscope' secret scope.
dbutils.secrets.list(
    scope="jayeshscope"
)

In [0]:
# Fetch the 'app-secret' value from the 'jayeshscope' scope as the cell result.
# NOTE(review): the value is not stored anywhere here; the cell only surfaces
# the secret as output (Databricks is expected to redact it — confirm), so it
# looks like an exploratory leftover.
dbutils.secrets.get(
    scope="jayeshscope",
    key="app-secret",
)

In [0]:
# Register the application's client secret (read from the secret scope) for
# the jtdatalakestorage account in the Spark session configuration.
spark.conf.set(
    "fs.azure.account.oauth2.client.secret.jtdatalakestorage.dfs.core.windows.net",
    dbutils.secrets.get(key="app-secret", scope="jayeshscope"),
)


In [0]:
# Client secret of the Azure AD application, pulled from the secret scope so
# it never appears in the notebook source.
service_credential = dbutils.secrets.get(key="app-secret", scope="jayeshscope")

# OAuth (client-credentials) settings for the jtdatalakestorage account,
# applied to the Spark session in the order listed.
_oauth_settings = (
    ("fs.azure.account.auth.type.jtdatalakestorage.dfs.core.windows.net",
     "OAuth"),
    ("fs.azure.account.oauth.provider.type.jtdatalakestorage.dfs.core.windows.net",
     "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    # Application (client) id of the registered app.
    ("fs.azure.account.oauth2.client.id.jtdatalakestorage.dfs.core.windows.net",
     "b4b287c7-0259-4593-aa7e-97fb218ff175"),
    ("fs.azure.account.oauth2.client.secret.jtdatalakestorage.dfs.core.windows.net",
     service_credential),
    # Token endpoint; the GUID in the URL is the directory (tenant) id.
    ("fs.azure.account.oauth2.client.endpoint.jtdatalakestorage.dfs.core.windows.net",
     "https://login.microsoftonline.com/09e368db-e8d2-4081-adb3-e98c48cee1de/oauth2/token"),
)
for _conf_key, _conf_value in _oauth_settings:
    spark.conf.set(_conf_key, _conf_value)


## DATA LOADING


## Reading data

In [0]:
# Read the calendar CSV data from the bronze container; the first row is the
# header and column types are inferred by Spark.
df_cal = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Calendar/")
)


In [0]:
# Read the customers CSV data from the bronze container (header row present,
# schema inferred).
df_cus = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Customers")
)

In [0]:
# Read the product categories CSV data from the bronze container (header row
# present, schema inferred).
df_prodcat = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Product_Categories")
)

In [0]:
# Read the products CSV data from the bronze container (header row present,
# schema inferred).
df_pro = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Products")
)

In [0]:
# Read the returns CSV data from the bronze container (header row present,
# schema inferred).
df_ret = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Returns")
)

In [0]:
# Read the sales CSV data from the bronze container. The trailing * in the
# path is a wildcard, so every bronze entry whose name starts with
# AdventureWorks_Sales is combined into this one DataFrame
# (presumably one folder per year — verify against the container).
df_sales = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Sales*")
)

In [0]:
# Read the territories CSV data from the bronze container (header row
# present, schema inferred).
df_ter = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Territories")
)

In [0]:
# Read the product subcategories CSV data from the bronze container (header
# row present, schema inferred). Unlike the other sources, this bronze folder
# has no "AdventureWorks_" prefix.
df_subcat = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("abfss://bronze@jtdatalakestorage.dfs.core.windows.net/Product_Subcategories")
)


## Transformations


## Calendar

In [0]:
# Preview the raw calendar data before any columns are added.
display(df_cal)

In [0]:
# Add Month and Year columns extracted from the Date column.
df_cal = (
    df_cal
    .withColumn("Month", month("Date"))
    .withColumn("Year", year("Date"))
)

# Preview the enriched calendar.
df_cal.display()


In [0]:
# Persist the calendar data as Parquet in the silver container.
# NOTE(review): "append" adds to whatever already exists at this path, so
# re-running the notebook writes the same rows again — confirm append
# (rather than overwrite) is intended.
(
    df_cal.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Calendar")
)


## Customers

In [0]:
# Preview the raw customer data.
display(df_cus)

In [0]:
# Build FullName as "<prefix> <FirstName> <LastName>" using plain concat with
# explicit single-space literals between the parts.
# NOTE(review): concat yields NULL when any part is NULL, and the following
# cell recomputes FullName with concat_ws, replacing this column.
df_cus = df_cus.withColumn(
    "FullName",
    concat(col("prefix"), lit(" "), col("FirstName"), lit(" "), col("LastName")),
)

display(df_cus)

In [0]:
# (Re)compute FullName by joining prefix, FirstName and LastName with a
# single-space separator via concat_ws; this overwrites any existing FullName.
df_cus = df_cus.withColumn(
    "FullName",
    concat_ws(" ", col("prefix"), col("FirstName"), col("LastName")),
)

display(df_cus)

In [0]:
# Persist the customer data (including FullName) as Parquet in the silver
# container.
# NOTE(review): "append" adds to whatever already exists at this path, so
# re-running the notebook writes the same rows again — confirm append
# (rather than overwrite) is intended.
(
    df_cus.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Customers")
)


## Sub Categories

In [0]:
# Preview the product subcategories data.
display(df_subcat)

In [0]:
# Persist the subcategories, unchanged, as Parquet in the silver container.
# NOTE(review): "append" adds to whatever already exists at this path, so
# re-running the notebook writes the same rows again — confirm append
# (rather than overwrite) is intended.
(
    df_subcat.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Subcategories")
)


#### Products

In [0]:
# Preview the raw product data.
display(df_pro)

In [0]:
# Keep only the leading token of two product columns:
#   ProductSKU  -> text before the first '-'
#   ProductName -> text before the first space
# Both columns are overwritten in place.
df_pro = (
    df_pro
    .withColumn("ProductSKU", split(col("ProductSKU"), "-")[0])
    .withColumn("ProductName", split(col("ProductName"), " ")[0])
)

In [0]:
# Preview the product data after the SKU/name split.
display(df_pro)

In [0]:
# Persist the transformed product data as Parquet in the silver container.
# NOTE(review): "append" adds to whatever already exists at this path, so
# re-running the notebook writes the same rows again — confirm append
# (rather than overwrite) is intended.
(
    df_pro.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Products")
)

In [0]:
# Preview the returns data.
display(df_ret)

In [0]:
# Persist the returns data, unchanged, as Parquet in the silver container.
# NOTE(review): "append" adds to whatever already exists at this path, so
# re-running the notebook writes the same rows again — confirm append
# (rather than overwrite) is intended.
(
    df_ret.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Returns")
)


#### Territories

In [0]:
# Preview the territories data.
display(df_ter)

In [0]:
# Persist the territories data, unchanged, as Parquet in the silver container.
# NOTE(review): "append" adds to whatever already exists at this path, so
# re-running the notebook writes the same rows again — confirm append
# (rather than overwrite) is intended.
(
    df_ter.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Territories")
)


### Sales

In [0]:
# Preview the raw sales data.
display(df_sales)

In [0]:
# Convert StockDate to a timestamp, overwriting the column in place.
df_sales = df_sales.withColumn(
    "StockDate",
    to_timestamp(col("StockDate")),
)

In [0]:
# Replace every 'J' in OrderNumber with 'T' (regex replace over the whole
# string), overwriting the column in place.
df_sales = df_sales.withColumn(
    "OrderNumber",
    regexp_replace("OrderNumber", "J", "T"),
)

In [0]:
# Add a 'multiply' column holding OrderLineItem * OrderQuantity.
# NOTE(review): OrderLineItem looks like a line index rather than a price —
# confirm this product is a meaningful measure before relying on it.
df_sales = df_sales.withColumn(
    "multiply",
    col("OrderLineItem") * col("OrderQuantity"),
)

In [0]:
# Preview the sales data after the transformations above.
display(df_sales)


#### Sales Analysis 

In [0]:
# Count the OrderNumber values recorded for each OrderDate and show the
# result as 'total_number'. Nothing is assigned; this is display-only.
(
    df_sales
    .groupBy("OrderDate")
    .agg(count("OrderNumber").alias("total_number"))
    .display()
)

In [0]:
# Preview the product categories data.
# NOTE(review): df_prodcat is loaded and displayed, but no cell visible in
# this notebook writes it to the silver container — confirm whether a write
# step is missing.
display(df_prodcat)

In [0]:
# Preview the territories data (same frame already shown in the Territories
# section; df_ter has not been modified since).
display(df_ter)

In [0]:
# Persist the transformed sales data as Parquet in the silver container.
# NOTE(review): "append" adds to whatever already exists at this path, so
# re-running the notebook writes the same rows again — confirm append
# (rather than overwrite) is intended.
(
    df_sales.write
    .format("parquet")
    .mode("append")
    .save("abfss://silver@jtdatalakestorage.dfs.core.windows.net/AdventureWorks_Sales")
)