### IMPORT LIBRARIES

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

### CONFIGURE DATALAKE + DATABRICKS ACCESS

In [0]:
# Service-principal (OAuth client-credentials) settings for ADLS Gen2.
# This dict is handed to dbutils.fs.mount as `extra_configs` when the raw
# container is mounted further down in the notebook.
configs = {
    # Authenticate with OAuth using the client-credentials token provider.
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    # Application (client) id of the service principal.
    "fs.azure.account.oauth2.client.id": "4fb90b9b-dc4b-423e-b6a0-b69e7ec6c133",
    # The client secret is read from a Databricks secret scope, never hard-coded.
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="oinscope", key="oinSP-ID"),
    # Token endpoint of the Azure AD tenant.
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d190dabc-b32b-45d1-8fa5-dec58f0885a2/oauth2/token",
}

In [0]:
# Session-level OAuth settings scoped to the storageadlsdemooin account.
# Each entry is (Spark conf key, value); the keys carry the account host as a
# suffix so the settings apply only to that storage account.
oauth_session_settings = [
    ("fs.azure.account.auth.type.storageadlsdemooin.dfs.core.windows.net",
     "OAuth"),
    ("fs.azure.account.oauth.provider.type.storageadlsdemooin.dfs.core.windows.net",
     "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id.storageadlsdemooin.dfs.core.windows.net",
     "4fb90b9b-dc4b-423e-b6a0-b69e7ec6c133"),
    # Client secret comes from the Databricks secret scope.
    ("fs.azure.account.oauth2.client.secret.storageadlsdemooin.dfs.core.windows.net",
     dbutils.secrets.get("oinscope","oinSP-ID")),
    ("fs.azure.account.oauth2.client.endpoint.storageadlsdemooin.dfs.core.windows.net",
     "https://login.microsoftonline.com/d190dabc-b32b-45d1-8fa5-dec58f0885a2/oauth2/token"),
]

# Apply the settings in the order listed above.
for conf_key, conf_value in oauth_session_settings:
    spark.conf.set(conf_key, conf_value)

In [0]:
# Create one free-text notebook widget per parameter. Each widget's label is
# the same as its name and the default value is an empty string.
for widget_name in ("StorageAccountName", "ContainerName"):
    dbutils.widgets.text(name=widget_name, defaultValue="", label=widget_name)

### GET WIDGET VALUES

In [0]:
# Read the notebook parameters. Surrounding whitespace is stripped so a stray
# space typed into a widget does not end up inside the abfss:// URI.
storage_acc = dbutils.widgets.get("StorageAccountName").strip()
container = dbutils.widgets.get("ContainerName").strip()

# Both widgets default to an empty string. Fail here with a clear message
# instead of letting the mount step build a malformed
# "abfss://@.dfs.core.windows.net" source and fail with an obscure error.
if not storage_acc or not container:
    raise ValueError(
        "Both the 'StorageAccountName' and 'ContainerName' widgets must be set "
        "before running this notebook."
    )

### CREATE MOUNT POINT

In [0]:
# (Re)create the mount so it always reflects the current widget values.
#
# Previously an existing mount was only unmounted and never mounted again,
# which left nothing at the mount point on every second run and made the CSV
# read that follows fail. Now a stale mount is removed first and the mount is
# then always created.
mount_point = "/mnt/mount-raw-container"

if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source=f"abfss://{container}@{storage_acc}.dfs.core.windows.net",
    mount_point=mount_point,
    extra_configs=configs,  # service-principal OAuth settings defined above
)

### Read the CSV file into a Spark DataFrame

In [0]:
# Load the raw sales CSV from the mounted container. The first row is used as
# the header and Spark infers the column types from the data.
raw_df = (
    spark.read
    .option("header", "True")
    .option("inferSchema", "True")
    .csv("/mnt/mount-raw-container/sales_data_proj_2.csv")
)

# Render the loaded rows in the notebook.
display(raw_df)

Date,Product,Region,UnitsSold,UnitPrice
2025-01-01,Widget,A,100,20.0
2025-01-01,Widget,B,50,20.0
2025-01-02,Gadget,A,150,15.5
2025-01-03,Widget,A,200,20.0
2025-01-03,Gadget,C,75,15.5
2025-01-04,Widget,B,120,20.0


### Remove duplicates and rows with null values

In [0]:
# Drop exact duplicate rows first, then drop every row that still has a null
# in any column.
clean_df = (
    raw_df
    .dropDuplicates()
    .dropna(how="any")
)
display(clean_df)

Date,Product,Region,UnitsSold,UnitPrice
2025-01-03,Gadget,C,75,15.5
2025-01-01,Widget,B,50,20.0
2025-01-02,Gadget,A,150,15.5
2025-01-01,Widget,A,100,20.0
2025-01-03,Widget,A,200,20.0
2025-01-04,Widget,B,120,20.0


### Add a new column "TotalSale" calculated as UnitsSold * UnitPrice

In [0]:
# Derive the per-row revenue: TotalSale = UnitsSold * UnitPrice.
transformed_df = clean_df.withColumn(
    "TotalSale",
    clean_df["UnitsSold"] * clean_df["UnitPrice"],
)
display(transformed_df)

Date,Product,Region,UnitsSold,UnitPrice,TotalSale
2025-01-03,Gadget,C,75,15.5,1162.5
2025-01-01,Widget,B,50,20.0,1000.0
2025-01-02,Gadget,A,150,15.5,2325.0
2025-01-01,Widget,A,100,20.0,2000.0
2025-01-03,Widget,A,200,20.0,4000.0
2025-01-04,Widget,B,120,20.0,2400.0


### Write the transformed data back to ADLS in Parquet format

In [0]:
# Persist the transformed data as Parquet in the processed container,
# replacing whatever a previous run wrote to the same location.
transformed_df.write.parquet(
    "abfss://herovired-processed@storageadlsdemooin.dfs.core.windows.net/sales_data_transformed",
    mode="overwrite",
)