In [None]:
import os

# Read the storage credential from the environment instead of embedding it in the notebook.
# ADLS_ACCOUNT_KEY must be set on the driver before this cell runs; a missing variable
# raises KeyError here rather than failing later with an authentication error.
# NOTE(review): the original placeholder value was "SAS", but this config key
# (fs.azure.account.key.*) expects the storage account access key — confirm which
# credential type is intended.
# NOTE(review): `spark` is used here before the session-creation cell below; this
# presumably relies on the notebook runtime pre-defining `spark` — confirm.
storage_account_key = os.environ["ADLS_ACCOUNT_KEY"]
spark.conf.set("fs.azure.account.key.adlsgen2strgacct.dfs.core.windows.net", storage_account_key)

In [None]:
# Import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build the Spark session for this notebook, or reuse one that is already active
spark = (
    SparkSession.builder
    .appName("DataCleaningAndJoining")
    .getOrCreate()
)

In [None]:
# Source CSV locations in the "data-input" container
orders_path = 'abfss://data-input@adlsgen2strgacct.dfs.core.windows.net/orders.csv'
product_path = 'abfss://data-input@adlsgen2strgacct.dfs.core.windows.net/product-supplier.csv'

# Load each CSV with its first row used as the column names.
# No schema is supplied here; column types are converted in a later cell.
orders_df = spark.read.option("header", "true").csv(orders_path)
product_df = spark.read.option("header", "true").csv(product_path)

In [None]:
# Preview the raw orders data: first 20 rows, long cell values truncated
orders_df.show(n=20, truncate=True)

+-----------+---------------+---------------------+-------------+---------+------------+----------------+---------------------------------+-------------------+
|Customer ID|Customer Status|Date Order was placed|Delivery Date| Order ID|  Product ID|Quantity Ordered|Total Retail Price for This Order|Cost Price Per Unit|
+-----------+---------------+---------------------+-------------+---------+------------+----------------+---------------------------------+-------------------+
|        579|         Silver|            01-Jan-17|    07-Jan-17|123002578|220101400106|               2|                             92.6|               20.7|
|       7574|         SILVER|            01-Jan-17|    05-Jan-17|123004074|210201000009|               1|                             21.7|               9.95|
|      28861|           Gold|            01-Jan-17|    04-Jan-17|123000871|230100500068|               1|                              1.7|                0.8|
|      43796|           Gold|           

In [None]:
# Preview the raw product/supplier data: first 20 rows, long cell values truncated
product_df.show(n=20, truncate=True)

+------------+------------+-----------------+--------------------+--------------------+----------------+--------------------+-----------+
|  Product ID|Product Line| Product Category|       Product Group|        Product Name|Supplier Country|       Supplier Name|Supplier ID|
+------------+------------+-----------------+--------------------+--------------------+----------------+--------------------+-----------+
|210100100001|    Children|Children Outdoors|Outdoor things, Kids|Boy's and Girl's ...|              NO|Scandinavian Clot...|         50|
|210100100002|    Children|Children Outdoors|Outdoor things, Kids|   Children's Jacket|              ES| Luna sastreria S.A.|       4742|
|210100100003|    Children|Children Outdoors|Outdoor things, Kids|Children's Jacket...|              NO|Scandinavian Clot...|         50|
|210100100004|    Children|Children Outdoors|Outdoor things, Kids| Children's Rain Set|              NO|Scandinavian Clot...|         50|
|210100100005|    Children|Childre

In [None]:
# Data cleaning: remove rows that are exact duplicates across every column

orders_df = orders_df.distinct()
product_df = product_df.distinct()

In [None]:
# Data cleaning: convert the numeric columns from their loaded type to proper numeric types.
# The two date columns and "Product ID" are left unchanged by this cell.

# Target type for each orders column that needs converting (applied in this order)
orders_casts = {
    "Customer ID": "int",
    "Order ID": "int",
    "Quantity Ordered": "int",
    "Total Retail Price for This Order": "double",
    "Cost Price Per Unit": "double",
}

orders_transform = orders_df
for column_name, target_type in orders_casts.items():
    orders_transform = orders_transform.withColumn(
        column_name, col(column_name).cast(target_type)
    )

# Only the supplier identifier needs converting on the product side
product_transform = product_df.withColumn(
    "Supplier ID", col("Supplier ID").cast("int")
)

In [None]:
# Print both schemas (orders first, then products) to confirm the converted column types

for frame in (orders_transform, product_transform):
    frame.printSchema()

root
 |-- Customer ID: integer (nullable = true)
 |-- Customer Status: string (nullable = true)
 |-- Date Order was placed: string (nullable = true)
 |-- Delivery Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Total Retail Price for This Order: double (nullable = true)
 |-- Cost Price Per Unit: double (nullable = true)

root
 |-- Product ID: string (nullable = true)
 |-- Product Line: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Product Group: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Supplier Country: string (nullable = true)
 |-- Supplier Name: string (nullable = true)
 |-- Supplier ID: integer (nullable = true)



In [None]:
# Attach product/supplier attributes to each order; the inner join keeps only
# orders whose "Product ID" also appears in the product data
joined_df = orders_transform.join(
    product_transform,
    on="Product ID",
    how="inner",
)

In [None]:
# Render the joined result for inspection.
# NOTE(review): `.display()` is presumably provided by the notebook runtime rather than
# core PySpark — confirm it is available wherever this notebook is executed.
joined_df.display()

Product ID,Customer ID,Customer Status,Date Order was placed,Delivery Date,Order ID,Quantity Ordered,Total Retail Price for This Order,Cost Price Per Unit,Product Line,Product Category,Product Group,Product Name,Supplier Country,Supplier Name,Supplier ID
220100100264,19956,Silver,19-Jan-17,19-Jan-17,123000408,1,57.4,28.8,Clothes & Shoes,Clothes,Eclipse Clothing,Big Guy Men's Sweatshirt w/Hood,US,Eclipse Inc,1303
220100500029,27161,Silver,30-Jan-17,30-Jan-17,123061491,1,206.9,91.1,Clothes & Shoes,Clothes,Leisure,Van Damme Women's Reversible Fleece,NL,Van Dammeren International,2995
230100200049,17143,SILVER,26-Jan-17,27-Jan-17,123039828,1,107.2,48.9,Outdoors,Outdoors,Backpacks,Tornado 55 Litre Red Women's Backpack,AU,Toto Outdoor Gear,10692
240800100006,33842,Gold,24-Jan-17,27-Jan-17,123031066,2,507.0,108.0,Sports,Winter Sports,Ski Dress,Men's Ski Jacket Vent Air Pro,NO,Scandinavian Clothing A/S,50
220100100172,27505,Gold,04-Feb-17,04-Feb-17,123087040,1,110.3,55.25,Clothes & Shoes,Clothes,Eclipse Clothing,Big Guy Men's Killer Pants,US,Eclipse Inc,1303
220100100272,51558,Platinum,30-Jan-17,30-Jan-17,123061755,1,22.8,11.5,Clothes & Shoes,Clothes,Eclipse Clothing,Big Guy Men's T-Shirt,US,Eclipse Inc,1303
220100100356,25884,GOLD,03-Mar-17,03-Mar-17,123039158,1,35.2,17.7,Clothes & Shoes,Clothes,Eclipse Clothing,Jogging Pants,US,Eclipse Inc,1303
220100100576,37670,Gold,13-Feb-17,13-Feb-17,123036892,2,107.0,26.85,Clothes & Shoes,Clothes,Eclipse Clothing,Woman's New L/Sleeve Thermal Blouse,US,Eclipse Inc,1303
220100900030,37925,Silver,08-Jan-17,13-Jan-17,123037734,2,48.8,9.8,Clothes & Shoes,Clothes,Osprey,Osprey Men'sTwill Bermuda,US,Triple Sportswear Inc,3664
220101400043,10192,GOLD,05-Feb-17,05-Feb-17,123096685,1,17.9,7.85,Clothes & Shoes,Clothes,Tracker Clothes,Swimming Trunks Planet,US,Dolphin Sportswear Inc,16292


In [None]:
# Post-join cleanup: drop rows that are exact duplicates across every column.
# NOTE(review): no missing-value handling is performed in this cell; only duplicates are removed.
joined_df = joined_df.distinct()

In [None]:
# Save the cleaned and joined DataFrame back to Azure Storage.
# Spark writes a directory of part files at this path rather than a single CSV file.
# mode("overwrite") replaces any previous output so the notebook can be re-run end to
# end; without it the default save mode raises an error when the path already exists.

joined_df.write.mode("overwrite").csv(
    "abfss://refined-data@adlsgen2strgacct.dfs.core.windows.net/cleaned_data.csv",
    header=True,
)
