In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#SILVER LAYER SCRIPT 

In [0]:
spark.conf.set(
  "fs.azure.account.auth.type.awprojectdatalakegen2.dfs.core.windows.net",
  "OAuth"
)

spark.conf.set(
  "fs.azure.account.oauth.provider.type.awprojectdatalakegen2.dfs.core.windows.net",
  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
)

spark.conf.set(
  "fs.azure.account.oauth2.client.id.awprojectdatalakegen2.dfs.core.windows.net",
  dbutils.secrets.get(scope="adls-scope", key="client-id")
)

spark.conf.set(
  "fs.azure.account.oauth2.client.secret.awprojectdatalakegen2.dfs.core.windows.net",
  dbutils.secrets.get(scope="adls-scope", key="client-secret")
)

spark.conf.set(
  "fs.azure.account.oauth2.client.endpoint.awprojectdatalakegen2.dfs.core.windows.net",
  dbutils.secrets.get(scope="adls-scope", key="tenant-endpoint")
)

## DATA LOADING 

#### Read General Data

In [0]:
df_cal = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Calendar")

In [0]:
df_cus = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Customers")

In [0]:
df_Prod_Cat = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Product_Categories")

In [0]:
df_Prod_Subcat = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/Product_Subcategories")

In [0]:
df_Prod = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Products")

In [0]:
df_ret = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Returns")

In [0]:
df_Sales2015 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Sales_2015")

In [0]:
df_Sales2016 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Sales_2016")

In [0]:
df_Sales2017 = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Sales_2017")

In [0]:
df_Sales= spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Sales*")

In [0]:
df_Territories = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("abfss://bronze@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Territories")

##TRANSFROMATION


###Calendar

In [0]:
df_cal.display()

In [0]:
df_cal = df_cal.withColumn('Month',month(col('Date')))\
            .withColumn('Year',year(col('Date')))
df_cal.display()

In [0]:
df_cal.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Calendar")\
            .save() 

###Customers

In [0]:
df_cus.display()

In [0]:
df_cus.withColumn("FullName",concat(col("Prefix"),lit(' '),col("FirstName"),lit(' '),col("LastName"))).display()


In [0]:
df_cus = df_cus.withColumn("FullName",concat_ws(' ',col("Prefix"),col("FirstName"),col("LastName")))


In [0]:
df_cus.display()


In [0]:
df_cus.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Customers")\
            .save()

#### Product SubCategories


In [0]:
df_Prod_Subcat.display()

In [0]:
df_Prod_Subcat.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Product_Subcategories")\
            .save()

####Products 

In [0]:
df_Prod.display()


In [0]:
df_Prod=df_Prod.withColumn("ProductSKU",split(col("ProductSKU"),"-")[0])\
                .withColumn("ProductName",split(col("ProductName")," ")[0])

In [0]:
df_Prod.display()


In [0]:
df_Prod.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Products")\
            .save()

###Returns 

In [0]:
df_ret.display()


In [0]:
df_ret.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Returns")\
            .save()

### Territories

In [0]:
df_Territories.display()

In [0]:
df_Territories.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Territories")\
            .save()

####SALES

In [0]:
df_Sales.display()


In [0]:
from pyspark.sql.functions import to_timestamp
df_Sales = df_Sales.withColumn("StockDate", to_timestamp("StockDate"))


In [0]:
from pyspark.sql.functions import regexp_replace
df_Sales = df_Sales.withColumn("OrderNumber",regexp_replace(col('OrderNumber'),"S","T"))


In [0]:
df_Sales = df_Sales.withColumn("multiply",col("OrderLineItem")*col("OrderQuantity"))

In [0]:
df_Sales.display()

#####SALES ANALYSIS

In [0]:
df_Sales.groupBy("OrderDate").agg(count("OrderNumber").alias("Total_Order")).display()

Databricks visualization. Run in Databricks to view.

In [0]:
df_Prod_Cat.display()


Databricks visualization. Run in Databricks to view.

In [0]:
df_Territories.display()


Databricks visualization. Run in Databricks to view.

In [0]:
df_Sales.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@awprojectdatalakegen2.dfs.core.windows.net/AdventureWorks_Sales")\
            .save()