In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

### DATA ACCESS USING APP

In [0]:
# Configure client-credentials OAuth access to the destoragelake1 ADLS Gen2
# account for this Spark session. Keys are applied in the order listed.
# NOTE(review): the client id, client secret and tenant id below are
# placeholders; real values should come from a secret store rather than being
# pasted into the notebook.
adls_oauth_conf = {
    "fs.azure.account.auth.type.destoragelake1.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.destoragelake1.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.destoragelake1.dfs.core.windows.net": "<your-client-id>",
    "fs.azure.account.oauth2.client.secret.destoragelake1.dfs.core.windows.net": "<your-client-secret>",
    "fs.azure.account.oauth2.client.endpoint.destoragelake1.dfs.core.windows.net": "https://login.microsoftonline.com/<your-tenant-id>/oauth2/token",
}
for conf_key, conf_value in adls_oauth_conf.items():
    spark.conf.set(conf_key, conf_value)


### DATA LOADING

#### Reading Data

In [0]:
# Load the calendar CSV from the bronze container; the first row is treated
# as the header and column types are inferred from the data.
calendar_bronze_path = 'abfss://bronze@destoragelake1.dfs.core.windows.net/adventureWorks_calendar'
df_cal = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(calendar_bronze_path)
)

In [0]:
# Load the customers CSV from the bronze container; the first row is treated
# as the header and column types are inferred from the data.
customers_bronze_path = 'abfss://bronze@destoragelake1.dfs.core.windows.net/adventureworks-customers'
df_cus = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(customers_bronze_path)
)

In [0]:
# Load the product-categories CSV from the bronze container; the first row is
# treated as the header and column types are inferred from the data.
product_categories_bronze_path = 'abfss://bronze@destoragelake1.dfs.core.windows.net/adventureworks-product-categories'
df_procat = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(product_categories_bronze_path)
)

In [0]:
# Load the products CSV from the bronze container; the first row is treated
# as the header and column types are inferred from the data.
products_bronze_path = 'abfss://bronze@destoragelake1.dfs.core.windows.net/AdventureWorks_Products'
df_pro = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(products_bronze_path)
)

In [0]:
# Load the returns CSV from the bronze container; the first row is treated
# as the header and column types are inferred from the data.
returns_bronze_path = 'abfss://bronze@destoragelake1.dfs.core.windows.net/adventureworks_returns'
df_ret = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(returns_bronze_path)
)

In [0]:
# Load the sales CSV data from the bronze container into a single frame.
# The trailing '*' is a path glob, so every bronze entry whose name starts
# with 'adventureworks_sales' is read. The first row is treated as the header
# and column types are inferred from the data.
sales_bronze_glob = 'abfss://bronze@destoragelake1.dfs.core.windows.net/adventureworks_sales*'
df_sales = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(sales_bronze_glob)
)

In [0]:
# Load the territories CSV from the bronze container; the first row is
# treated as the header and column types are inferred from the data.
territories_bronze_path = 'abfss://bronze@destoragelake1.dfs.core.windows.net/adventureworks_territories'
df_ter = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(territories_bronze_path)
)

In [0]:
# Load the product-subcategories CSV from the bronze container; the first row
# is treated as the header and column types are inferred from the data.
subcategories_bronze_path = 'abfss://bronze@destoragelake1.dfs.core.windows.net/product_subcategories'
df_subcat = (
    spark.read
    .format('csv')
    .option("header", True)
    .option("inferSchema", True)
    .load(subcategories_bronze_path)
)

### TRANSFORMATIONS

#### Calendar

In [0]:
# Render the raw calendar frame in the notebook output for inspection.
display(df_cal)

In [0]:
# Add Month and Year columns derived from the Date column.
# The chain must start from df_cal (the calendar frame loaded earlier in this
# notebook); no frame named `df` is defined, so starting from `df` raises
# NameError on a fresh kernel.
df_cal = df_cal.withColumn('Month',month(col('Date')))\
            .withColumn('Year',year(col('Date')))

In [0]:
# Persist the calendar frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds to whatever already exists at the target
# path, so re-running this cell writes the same rows again — confirm intended.
calendar_silver_path = "abfss://silver@destoragelake1.dfs.core.windows.net/adventureworks_calendar"
(
    df_cal.write
    .format('parquet')
    .mode('append')
    .save(calendar_silver_path)
)

#### Customers

In [0]:
# Render the raw customers frame in the notebook output for inspection.
display(df_cus)

In [0]:
# Preview only: build fullName with concat() and display the result.
# The result is not assigned, so df_cus itself is left unchanged by this cell.
full_name_preview = concat(col('Prefix'), lit(' '), col('FirstName'), lit(' '), col('LastName'))
display(df_cus.withColumn("fullName", full_name_preview))

In [0]:
# Add fullName as Prefix, FirstName and LastName joined with single spaces.
# The last-name column is referenced as 'LastName', the same casing the
# concat() preview cell uses, so the lookup does not rely on Spark resolving
# column names case-insensitively.
df_cus = df_cus.withColumn('fullName',concat_ws(' ',col('Prefix'),col('FirstName'),col('LastName')))

In [0]:
# Render the customers frame again, now including the fullName column.
display(df_cus)

In [0]:
# Persist the customers frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds to whatever already exists at the target
# path, so re-running this cell writes the same rows again — confirm intended.
customers_silver_path = "abfss://silver@destoragelake1.dfs.core.windows.net/adventureworks_customers"
(
    df_cus.write
    .format('parquet')
    .mode('append')
    .save(customers_silver_path)
)

#### Sub Categories

In [0]:
# Render the product-subcategories frame in the notebook output for inspection.
display(df_subcat)

In [0]:
# Persist the subcategories frame, unmodified, to the silver container as Parquet.
# NOTE(review): the target folder is spelled 'adventureworks_subategories'
# (no 'c' after 'sub'); it looks like a typo, but it is kept as-is because
# readers of this path are not visible here — confirm before renaming.
# NOTE(review): mode 'append' adds to whatever already exists at the target
# path, so re-running this cell writes the same rows again — confirm intended.
subcategories_silver_path = "abfss://silver@destoragelake1.dfs.core.windows.net/adventureworks_subategories"
(
    df_subcat.write
    .format('parquet')
    .mode('append')
    .save(subcategories_silver_path)
)

#### Products

In [0]:
# Render the raw products frame in the notebook output for inspection.
display(df_pro)

In [0]:
# Overwrite ProductSKU with the part before its first '-' and ProductName
# with the part before its first space.
sku_prefix = split(col('ProductSKU'), '-')[0]
name_first_token = split(col('ProductName'), ' ')[0]
df_pro = (
    df_pro
    .withColumn('ProductSKU', sku_prefix)
    .withColumn('ProductName', name_first_token)
)

In [0]:
# Render the products frame again after the ProductSKU/ProductName rewrite.
display(df_pro)

In [0]:
# Persist the products frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds to whatever already exists at the target
# path, so re-running this cell writes the same rows again — confirm intended.
products_silver_path = "abfss://silver@destoragelake1.dfs.core.windows.net/adventureworks_products"
(
    df_pro.write
    .format('parquet')
    .mode('append')
    .save(products_silver_path)
)

#### Returns

In [0]:
# Render the returns frame in the notebook output for inspection.
display(df_ret)

In [0]:
# Persist the returns frame, unmodified, to the silver container as Parquet.
# NOTE(review): mode 'append' adds to whatever already exists at the target
# path, so re-running this cell writes the same rows again — confirm intended.
returns_silver_path = "abfss://silver@destoragelake1.dfs.core.windows.net/adventureworks_returns"
(
    df_ret.write
    .format('parquet')
    .mode('append')
    .save(returns_silver_path)
)

#### Territories

In [0]:
# Render the territories frame in the notebook output for inspection.
display(df_ter)

In [0]:
# Persist the territories frame, unmodified, to the silver container as Parquet.
# NOTE(review): mode 'append' adds to whatever already exists at the target
# path, so re-running this cell writes the same rows again — confirm intended.
territories_silver_path = "abfss://silver@destoragelake1.dfs.core.windows.net/adventureworks_territories"
(
    df_ter.write
    .format('parquet')
    .mode('append')
    .save(territories_silver_path)
)

In [0]:
# Render the raw sales frame in the notebook output for inspection.
display(df_sales)

In [0]:
# Replace StockDate with its timestamp-typed conversion.
stock_date_ts = to_timestamp('StockDate')
df_sales = df_sales.withColumn('StockDate', stock_date_ts)

In [0]:
# Rewrite OrderNumber, replacing every match of the pattern 'S' with 'T'.
order_number_relabelled = regexp_replace(col('OrderNumber'), 'S', 'T')
df_sales = df_sales.withColumn('OrderNumber', order_number_relabelled)

In [0]:
# Add a 'multiply' column holding OrderLineItem * OrderQuantity.
line_item_times_quantity = col('OrderLineItem') * col('OrderQuantity')
df_sales = df_sales.withColumn('multiply', line_item_times_quantity)

In [0]:
# Render the sales frame again after the StockDate, OrderNumber and multiply changes.
display(df_sales)

In [0]:
# Persist the transformed sales frame to the silver container as Parquet.
# NOTE(review): mode 'append' adds to whatever already exists at the target
# path, so re-running this cell writes the same rows again — confirm intended.
sales_silver_path = "abfss://silver@destoragelake1.dfs.core.windows.net/adventureworks_sales"
(
    df_sales.write
    .format('parquet')
    .mode('append')
    .save(sales_silver_path)
)

#### Sales Analysis

In [0]:
# Count OrderNumber values per OrderDate and display the result as 'total_order'.
orders_per_date = (
    df_sales
    .groupBy('OrderDate')
    .agg(count('OrderNumber').alias('total_order'))
)
display(orders_per_date)

In [0]:
# Render the product-categories frame in the notebook output for inspection.
# NOTE(review): in the cells visible here df_procat is loaded and displayed
# but never written to the silver container — confirm whether that is intended.
display(df_procat)

In [0]:
# Render the territories frame in the notebook output (it is also shown in
# the Territories section).
display(df_ter)