### IMPORT LIBRARIES

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

### CONFIGURE DATABRICKS + ADLS CONNECTION

In [0]:
# Fetch the tenant id from the "oinscope" secret scope; it is interpolated into
# the OAuth token endpoint URL of the storage configuration.
tenant_id = dbutils.secrets.get(scope="oinscope", key="oinSP-tenant-id")

In [0]:
# Hadoop ABFS settings for OAuth client-credentials authentication.
# The client id and client secret are read from the "oinscope" secret scope and
# the token endpoint is built from `tenant_id`. This mapping is handed to
# dbutils.fs.mount as `extra_configs`.
configs = dict([
    ("fs.azure.account.auth.type", "OAuth"),
    ("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id", dbutils.secrets.get(scope="oinscope", key="oinSP-client-id")),
    ("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope="oinscope", key="oinSP-ID")),
    ("fs.azure.account.oauth2.client.endpoint", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"),
])

### CREATE WIDGETS

In [0]:
# Declare the two text widgets the notebook is parameterized by. Both start
# empty; their values are read back in the next cell.
dbutils.widgets.text(name="storage-acc", defaultValue="", label="Storage_Account")
dbutils.widgets.text(name="container", defaultValue="", label="Container")

### GET WIDGETS VALUE FROM PIPELINE PARAMETER

In [0]:
# Read the widget values that parameterize the storage location.
# Both widgets default to "", which would otherwise only surface later as an
# invalid abfss:// source when mounting, so surrounding whitespace is stripped
# and an empty value fails fast with a message naming the missing parameter(s).
storage_acc = dbutils.widgets.get("storage-acc").strip()
container = dbutils.widgets.get("container").strip()

_missing_widgets = [
    widget_name
    for widget_name, widget_value in (("storage-acc", storage_acc), ("container", container))
    if not widget_value
]
if _missing_widgets:
    raise ValueError(
        "Missing required notebook parameter(s): " + ", ".join(_missing_widgets)
    )

### MOUNT DATA LAKE STORAGE

In [0]:
# Mount the source container at /mnt/mountbronze. Any existing mount at that
# point is removed first, then the container named by the `container` and
# `storage_acc` values is mounted with the OAuth settings in `configs`.
if any(mount_info.mountPoint == "/mnt/mountbronze" for mount_info in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/mountbronze")

dbutils.fs.mount(
    source=f"abfss://{container}@{storage_acc}.dfs.core.windows.net/",
    mount_point="/mnt/mountbronze",
    extra_configs=configs,
)

### READ CALENDAR DATA

In [0]:
# Load the calendar CSV from the bronze mount: the first row is the header,
# column types are inferred, and PERMISSIVE mode keeps malformed records
# instead of failing the read.
calender_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true", "mode": "PERMISSIVE"})
    .load("/mnt/mountbronze/Calender/AdventureWorks_Calendar.csv")
)

In [0]:
# Load the customers CSV from the bronze mount: the first row is the header,
# column types are inferred, and PERMISSIVE mode keeps malformed records
# instead of failing the read.
customer_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true", "mode": "PERMISSIVE"})
    .load("/mnt/mountbronze/Customers/AdventureWorks_Customers.csv")
)

In [0]:
# Load the product categories CSV from the bronze mount: the first row is the
# header, column types are inferred, and PERMISSIVE mode keeps malformed
# records instead of failing the read.
product_cat_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true", "mode": "PERMISSIVE"})
    .load("/mnt/mountbronze/Product_Categories/AdventureWorks_Product_Categories.csv")
)

In [0]:
# Load the product subcategories CSV from the bronze mount: the first row is
# the header, column types are inferred, and PERMISSIVE mode keeps malformed
# records instead of failing the read.
product_subcat_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true", "mode": "PERMISSIVE"})
    .load("/mnt/mountbronze/Product_Subcategories/AdventureWorks_Product_Subcategories.csv")
)

In [0]:
# Load the products CSV from the bronze mount: the first row is the header,
# column types are inferred, and PERMISSIVE mode keeps malformed records
# instead of failing the read.
products_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true", "mode": "PERMISSIVE"})
    .load("/mnt/mountbronze/Products/AdventureWorks_Products.csv")
)

In [0]:
# Load the returns CSV from the bronze mount: the first row is the header,
# column types are inferred, and PERMISSIVE mode keeps malformed records
# instead of failing the read.
returns_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true", "mode": "PERMISSIVE"})
    .load("/mnt/mountbronze/Returns/AdventureWorks_Returns.csv")
)

In [0]:
# Load the territories CSV from the bronze mount: the first row is the header,
# column types are inferred, and PERMISSIVE mode keeps malformed records
# instead of failing the read.
territories_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true", "mode": "PERMISSIVE"})
    .load("/mnt/mountbronze/Territories/AdventureWorks_Territories.csv")
)

In [0]:
## READ EVERY PATH UNDER THE BRONZE MOUNT WHOSE NAME STARTS WITH "Sales"

# The glob /mnt/mountbronze/Sales* selects all matching paths at the mount root
# and loads them into a single DataFrame (header row, inferred column types).
sales_df = (
    spark.read
    .format("csv")
    .options(**{"header": "true", "inferSchema": "true"})
    .load("/mnt/mountbronze/Sales*")
)


# Sanity check: list the distinct order years present in the loaded data.
(
    sales_df
    .select(year("OrderDate"))
    .distinct()
    .display()
)

### TRANSFORMATIONS 

### CALENDAR

In [0]:
## DERIVE YEAR AND MONTH FROM THE DATE COLUMN

# Adds two columns, "Year" and "Month", both extracted from "Date".
calender_df = (
    calender_df
    .withColumn("Year", year("Date"))
    .withColumn("Month", month("Date"))
)

### CUSTOMER

In [0]:
## CREATE Full_name BY CONCATENATING Prefix, FirstName, LastName AND DROP THE ORIGINAL COLUMNS

# concat_ws joins the three parts with a single space and initcap capitalizes
# each word of the result.
# The source columns are dropped by *name*: DataFrame.drop only accepts several
# Column objects on PySpark 3.4+, and raises TypeError on older runtimes,
# whereas string names work on every version.
customer_df = (
    customer_df
    .withColumn(
        "Full_name",
        initcap(concat_ws(' ', col("Prefix"), col("FirstName"), col("LastName"))),
    )
    .drop("Prefix", "LastName", "FirstName")
)

### PRODUCTS

In [0]:
# Keep only the leading token of each value: the part of ProductSKU before the
# first '-' and the part of ProductName before the first space.
products_df = (
    products_df
    .withColumn("ProductSKU", split(col("ProductSKU"), '-').getItem(0))
    .withColumn("ProductName", split(col("ProductName"), ' ').getItem(0))
)

### MOUNT SILVER CONTAINER

In [0]:
# Mount the "silver" container of the same storage account at /mnt/mountSilver.
# Any existing mount at that point is removed first, then the container is
# mounted with the OAuth settings in `configs`.
if any(mount_info.mountPoint == "/mnt/mountSilver" for mount_info in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/mountSilver")

dbutils.fs.mount(
    source=f"abfss://silver@{storage_acc}.dfs.core.windows.net/",
    mount_point="/mnt/mountSilver",
    extra_configs=configs,
)

### WRITE CALENDAR DATA TO SILVER CONTAINER

In [0]:
# Persist the calendar data as Parquet in the silver mount, replacing any
# previous output at that path.
(
    calender_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/calender")
)

In [0]:
# Persist the customer data as Parquet in the silver mount, replacing any
# previous output at that path.
(
    customer_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/customer")
)

In [0]:
# Persist the product categories as Parquet in the silver mount, replacing any
# previous output at that path.
(
    product_cat_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/product_categories")
)

In [0]:
# Persist the product subcategories as Parquet in the silver mount, replacing
# any previous output at that path.
(
    product_subcat_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/product_subcategories")
)

In [0]:
# Persist the products data as Parquet in the silver mount, replacing any
# previous output at that path.
(
    products_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/products")
)

In [0]:
# Persist the returns data as Parquet in the silver mount, replacing any
# previous output at that path.
(
    returns_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/returns")
)

In [0]:
# Persist the territories data as Parquet in the silver mount, replacing any
# previous output at that path.
(
    territories_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/territories")
)

In [0]:
# Sales transformations:
#   - StockDate is converted to a timestamp,
#   - every "S" in OrderNumber is replaced with "T",
#   - Multiplied holds OrderLineItem * OrderQuantity.
sales_df = (
    sales_df
    .withColumn("StockDate", to_timestamp("StockDate"))
    .withColumn("OrderNumber", regexp_replace("OrderNumber", "S", "T"))
    .withColumn("Multiplied", col("OrderLineItem") * col("OrderQuantity"))
)

### SALES ANALYSIS

In [0]:
## How many orders we received on each date

# Count DISTINCT order numbers rather than rows: the sales data carries an
# OrderLineItem column, so a single order can presumably span several rows and
# a plain count() would report line items instead of orders.
# Results are shown newest date first.
(
    sales_df
    .groupby("OrderDate")
    .agg(countDistinct("OrderNumber").alias("No. of Orders"))
    .sort("OrderDate", ascending=False)
    .display()
)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
## Product category and orders

# Chain of LEFT joins starting from the categories:
#   categories -> subcategories -> products -> sales.
# Because every join is a left join, rows from the left side are kept even when
# nothing matches on the right (the right-hand columns are then null).
# Only the columns needed by the analyses that follow are selected.
orders_cat_df = (
    product_cat_df
    .join(product_subcat_df, on="ProductCategoryKey", how="left")
    .join(products_df, on="ProductSubcategoryKey", how="left")
    .join(sales_df, on="ProductKey", how="left")
    .select(
        "ProductCategoryKey",
        "CategoryName",
        "ProductSubcategoryKey",
        "SubcategoryName",
        "ProductKey",
        "ProductName",
        "ProductCost",
        "ProductPrice",
        "OrderDate",
        "OrderNumber",
        "OrderQuantity",
    )
)

Databricks visualization. Run in Databricks to view.

In [0]:
## Number of orders per product category

# Count DISTINCT order numbers rather than rows: the sales data carries an
# OrderLineItem column, so a single order can presumably contribute several
# rows and a plain count() would report line items instead of orders.
# Null order numbers (categories without sales after the left joins) are not
# counted, so such categories show 0. Results are sorted by count, highest first.
(
    orders_cat_df
    .groupby("CategoryName")
    .agg(countDistinct("OrderNumber").alias("Number of Orders"))
    .sort(col("Number of Orders"), ascending=False)
    .display()
)

Databricks visualization. Run in Databricks to view.

In [0]:
## Total sales amount per product category

# "Total Sales" is the sum of OrderQuantity * ProductPrice over each category's
# rows; results are sorted with the largest total first.
(
    orders_cat_df
    .groupby("CategoryName")
    .agg(sum(col("OrderQuantity") * col("ProductPrice")).alias("Total Sales"))
    .sort(col("Total Sales").desc())
    .display()
)

Databricks visualization. Run in Databricks to view.

In [0]:
## Number of regions in each country

# Counts the distinct Region values found for every Country.
(
    territories_df
    .groupBy("Country")
    .agg(countDistinct("Region").alias("No. of Regions"))
    .display()
)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Persist the transformed sales data as Parquet in the silver mount, replacing
# any previous output at that path.
(
    sales_df.write
    .mode("overwrite")
    .format("parquet")
    .save("/mnt/mountSilver/sales")
)