## Step 1: Data Ingestion

#### Establishing a JDBC connection to the Azure SQL server

In [0]:
# Connection settings for the Azure SQL source database.
jdbchostname = "advaithsql.database.windows.net"
jdbcport = "1433"
jdbcdatabase = "Advaithsqldb"
jdbcuser = "advaith"

# The password is fetched from a secret scope at run time instead of being hardcoded.
jdbcpwd = dbutils.secrets.get(scope="my-scope", key="azsqlpwd")

# Assemble the SQL Server JDBC URL; the credentials are carried inside the URL itself,
# so readers only need to pass `jdbcurl` (no separate user/password options).
jdbcurl = (
    f"jdbc:sqlserver://{jdbchostname}:{jdbcport}"
    f";database={jdbcdatabase}"
    f";user={jdbcuser}"
    f";password={jdbcpwd}"
)

Read the product dimension table (`dbo.dim_product`)

In [0]:
# Load the product dimension table from Azure SQL through the JDBC data source.
df_product = (
    spark.read
    .format("jdbc")
    .option("url", jdbcurl)
    .option("dbtable", "dbo.dim_product")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
display(df_product)

product_id,product_name,category,price,manufacturer,color,weight
1,Laptop,Electronics,800.0,Dell,Silver,2.5
2,Smartphone,Electronics,500.0,Samsung,Black,0.3
3,Headphones,Electronics,50.0,Sony,White,0.1
4,WashingMachine,Electronics,1000.0,LG,,
5,AC,Electronics,2000.0,Voltas,,


Read the sales fact table (`dbo.fact_product`)

In [0]:
# Load the sales fact table from Azure SQL through the same JDBC connection URL.
df_sales = (
    spark.read
    .format("jdbc")
    .option("url", jdbcurl)
    .option("dbtable", "dbo.fact_product")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
display(df_sales)

sale_date,product_id,quantity_sold,revenue
2024-04-01,1,10,8000.0
2024-04-01,2,20,10000.0
2024-04-02,1,5,4000.0
2024-04-02,1,3,2400.0
2024-04-02,3,15,750.0
2024-04-03,2,10,5000.0
2024-04-02,2,15,1750.0
2024-04-03,2,10,5400.0
2024-04-03,4,2,1200.0
2024-04-03,5,6,6600.0


## Step 2: Pre-processing

- Remove nulls
- Remove duplicates
- Convert Data types
- Missing values - Impute, default
- Data validations
- Outliers
- Filter irrelevant data
- Transformations

In [0]:
# Impute missing product attributes with fixed defaults:
# a null `color` becomes "White" and a null `weight` becomes 1.
df_product_cleansed = df_product.fillna(
    {
        "color": "White",
        "weight": 1,
    }
)

display(df_product_cleansed)

product_id,product_name,category,price,manufacturer,color,weight
1,Laptop,Electronics,800.0,Dell,Silver,2.5
2,Smartphone,Electronics,500.0,Samsung,Black,0.3
3,Headphones,Electronics,50.0,Sony,White,0.1
4,WashingMachine,Electronics,1000.0,LG,White,1.0
5,AC,Electronics,2000.0,Voltas,White,1.0


In [0]:
# Inspect the raw sales rows as loaded, before any cleansing is applied.
display(df_sales)

sale_date,product_id,quantity_sold,revenue
2024-04-01,1,10,8000.0
2024-04-01,2,20,10000.0
2024-04-02,1,5,4000.0
2024-04-02,1,3,2400.0
2024-04-02,3,15,750.0
2024-04-03,2,10,5000.0
2024-04-02,2,15,1750.0
2024-04-03,2,10,5400.0
2024-04-03,4,2,1200.0
2024-04-03,5,6,6600.0


In [0]:
# Keep a single copy of each sales row; rows count as duplicates only when
# every column matches, since no column subset is given.
df_sales_cleansed = df_sales.distinct()
display(df_sales_cleansed)

sale_date,product_id,quantity_sold,revenue
2024-04-02,1,5,4000.0
2024-04-03,4,2,200.0
2024-04-03,4,2,1200.0
2024-04-02,3,15,750.0
2024-04-03,2,10,5400.0
2024-04-01,2,20,10000.0
2024-04-01,1,10,8000.0
2024-04-03,5,6,2600.0
2024-04-02,2,15,1750.0
2024-04-03,2,10,5000.0


Join the two tables and get the required columns

In [0]:
# Left join from products to sales on product_id, so every product row is kept
# even when it has no matching sale (its revenue is then null).
# Columns are selected through their parent DataFrame because both sides
# carry a `product_id` column.
df_joined = (
    df_product_cleansed
    .join(
        df_sales_cleansed,
        on=df_product_cleansed.product_id == df_sales_cleansed.product_id,
        how="left",
    )
    .select(
        df_product_cleansed.product_id,
        df_product_cleansed.product_name,
        df_product_cleansed.price,
        df_product_cleansed.manufacturer,
        df_sales_cleansed.revenue,
    )
)

display(df_joined)

product_id,product_name,price,manufacturer,revenue
1,Laptop,800.0,Dell,2400.0
1,Laptop,800.0,Dell,8000.0
1,Laptop,800.0,Dell,4000.0
3,Headphones,50.0,Sony,750.0
5,AC,2000.0,Voltas,6600.0
5,AC,2000.0,Voltas,2600.0
4,WashingMachine,1000.0,LG,1200.0
4,WashingMachine,1000.0,LG,200.0
2,Smartphone,500.0,Samsung,5000.0
2,Smartphone,500.0,Samsung,1750.0


Group the joined table by product to compute the total revenue per product

In [0]:
# Sum revenue per product, then rename Spark's generated "sum(revenue)"
# column to the reporting name "Total Revenue".
df_grouped = (
    df_joined
    .groupBy("product_id", "product_name", "manufacturer")
    .sum("revenue")
    .withColumnRenamed("sum(revenue)", "Total Revenue")
)

display(df_grouped)

product_id,product_name,manufacturer,Total Revenue
1,Laptop,Dell,14400.0
3,Headphones,Sony,750.0
5,AC,Voltas,9200.0
4,WashingMachine,LG,1400.0
2,Smartphone,Samsung,22150.0


## Step 3: Load Data to ADLS

In [0]:
# Persist the per-product revenue summary as CSV under /mnt/datasets.
# Without an explicit save mode Spark uses "errorifexists", so this cell would fail on
# every re-run once the target directory exists; overwrite keeps the notebook re-runnable.
# Note: no header option is set, so the CSV part files carry no column-name row.
df_grouped.write.format("csv").mode("overwrite").save("/mnt/datasets/electronics_products_info")

In [0]:
%fs ls /mnt/datasets

path,name,size,modificationTime
dbfs:/mnt/datasets/books.csv,books.csv,808,1752685047000
dbfs:/mnt/datasets/cars.csv,cars.csv,20751,1752685047000
dbfs:/mnt/datasets/electronics_products_info/,electronics_products_info/,0,1753317175000


In [0]:
from datetime import datetime

# Tag the export with the current local wall-clock time (second resolution)
# so that each run writes to its own directory.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

output_path = f"/mnt/datasets/electronics_products_info_{timestamp}"

# Sort by product_id, collapse to one partition so a single CSV part file is produced,
# and write it with a header row.
(
    df_grouped
    .orderBy("product_id")
    .coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save(output_path)
)

In [0]:
%fs ls /mnt/datasets

path,name,size,modificationTime
dbfs:/mnt/datasets/books.csv,books.csv,808,1752685047000
dbfs:/mnt/datasets/cars.csv,cars.csv,20751,1752685047000
dbfs:/mnt/datasets/electronics_products_info/,electronics_products_info/,0,1753317175000
dbfs:/mnt/datasets/electronics_products_info_20250724_004305/,electronics_products_info_20250724_004305/,0,1753317787000
dbfs:/mnt/datasets/electronics_products_info_20250724_004554/,electronics_products_info_20250724_004554/,0,1753317956000
