Create the raw and enriched tables by loading each dataset

In [0]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session and request the spark-excel package.
# NOTE(review): if a session is already running, getOrCreate() returns it and the
# spark.jars.packages setting may not be applied — confirm the library is available
# on the cluster.
spark = (
    SparkSession.builder
    .appName("ReadExcelWithHeader")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5")
    .getOrCreate()
)

# Location of the customer workbook
excel_file_path = "/FileStore/tables/Customer.xlsx"

# Read the workbook: first row is the header, column types are inferred
customer_loaded = (
    spark.read
    .format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(excel_file_path)
)

# Replace spaces in column names with underscores.
# toDF renames every column in one projection instead of one withColumnRenamed
# call per column.
df_customer_raw_enriched_table = customer_loaded.toDF(
    *[name.replace(" ", "_") for name in customer_loaded.columns]
)

# Persist as a Delta table. "overwrite" keeps the cell re-runnable: without an
# explicit mode, saveAsTable raises once the table already exists.
(
    df_customer_raw_enriched_table.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("df_customer_raw_enriched_table")
)

In [0]:
# Read the order data; multiLine lets a single JSON document span several lines
order_loaded = (
    spark.read
    .format("json")
    .option("multiLine", True)
    .load("/FileStore/tables/Order.json")
)

# Replace spaces in column names with underscores.
# toDF renames every column in one projection instead of one withColumnRenamed
# call per column.
df_order_raw_enriched_table = order_loaded.toDF(
    *[name.replace(" ", "_") for name in order_loaded.columns]
)

# Persist as a Delta table. "overwrite" keeps the cell re-runnable: without an
# explicit mode, saveAsTable raises once the table already exists.
(
    df_order_raw_enriched_table.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("df_order_raw_enriched_table")
)

In [0]:
# File location and type
file_location = "/FileStore/tables/Product.csv"
file_type = "csv"

# CSV options: schema inference is off, the first row is the header,
# and fields are comma-separated
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
product_loaded = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Replace spaces in column names with underscores.
# toDF renames every column in one projection instead of one withColumnRenamed
# call per column.
df_product_raw_enriched_table = product_loaded.toDF(
    *[name.replace(" ", "_") for name in product_loaded.columns]
)

# Persist as a Delta table. "overwrite" keeps the cell re-runnable: without an
# explicit mode, saveAsTable raises once the table already exists.
(
    df_product_raw_enriched_table.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("df_product_raw_enriched_table")
)

Create an enriched table which has:
a) Order information
   i) Profit rounded to 2 decimal places
b) Customer name and country
c) Product category and sub-category

In [0]:
# The same Product_ID can appear several times in the product dataset (e.g. for
# multiple regions), so keep only distinct (Product_ID, Category, Sub-Category)
# rows before joining to avoid duplicating order rows.
product_lookup = (
    df_product_raw_enriched_table
    .select("Product_ID", "Category", "`Sub-Category`")
    .distinct()
)

# Left-join orders to customers and to the product lookup, then keep the order
# fields, customer name/country, product category/sub-category and the profit
# rounded to 2 decimal places.
order_cust_product_enriched_table = (
    df_order_raw_enriched_table.alias("ord")
    .join(df_customer_raw_enriched_table.alias("cus"), "Customer_ID", "left")
    .join(product_lookup.alias("prd"), "Product_ID", "left")
    .selectExpr(
        "ord.Order_ID",
        "ord.Order_Date",
        "ord.Row_ID as order_row_id",
        "ord.Customer_ID",
        "cus.Customer_Name",
        "cus.Country",
        # Take the key from the order side so it stays populated even when the
        # product lookup has no matching row for the order.
        "ord.Product_ID",
        "prd.Category",
        "prd.`Sub-Category`",
        "round(ord.Profit, 2) as profit",
    )
)

# Persist as a Delta table. "overwrite" keeps the cell re-runnable: without an
# explicit mode, saveAsTable raises once the table already exists.
(
    order_cust_product_enriched_table.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("order_cust_product_enriched_table")
)

Create an aggregate table that shows profit by 
Year,
Product Category,
Product Sub Category,
Customer


In [0]:
# This solution uses the previously created enriched table: order_cust_product_enriched_table

In [0]:
from pyspark.sql.functions import split, round, sum
# Load the enriched table and derive the year in a single assignment, so re-running
# the cell always starts from the stored table.
# Year is the third "/"-separated component of Order_Date.
enriched_table = (
    spark.read.format("delta").table("order_cust_product_enriched_table")
    .withColumn("Year", split("Order_Date", "/")[2])
)

# Total profit per Year, Category, Sub-Category and Customer_ID, rounded to
# 2 decimal places. round/sum are the pyspark.sql.functions versions imported at
# the top of this cell, not the Python builtins.
df_profit_by_year_prd_cat_agg = (
    enriched_table
    .groupBy("Year", "Category", "`Sub-Category`", "Customer_ID")
    .agg(round(sum("profit"), 2).alias("Total_Profit"))
)

# Persist as a Delta table. "overwrite" keeps the cell re-runnable: without an
# explicit mode, saveAsTable raises once the table already exists.
(
    df_profit_by_year_prd_cat_agg.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("df_profit_by_year_prd_cat_agg")
)


Using SQL output the following aggregates
Profit by Year,
Profit by Year + Product Category,
Profit by Customer,
Profit by Customer + Year


In [0]:
# These results are based on the previously created source data table.

In [0]:
%sql
-- Profit by year, computed from the raw order table.
-- Each profit value is rounded to 2 decimal places before it is summed.
-- Year is the third "/"-separated component of Order_Date.
WITH rounded_orders AS (
  SELECT
    Order_Date,
    ROUND(profit, 2) AS profit
  FROM df_order_raw_enriched_table
)
SELECT
  SPLIT(Order_Date, "/")[2] AS Year,
  ROUND(SUM(profit), 2) AS Total_Profit
FROM rounded_orders
GROUP BY SPLIT(Order_Date, "/")[2]

Year,Total_Profit
2016,65073.58
2017,111085.31
2014,39185.89
2015,63073.3


In [0]:
# The enriched master table results match the source dataset (compare the cell above with the cell below). Because order_cust_product_enriched_table stores profit rounded to 2 decimal places for each row, the source-dataset query also rounds each profit to 2 decimal places before aggregating so that the two results are comparable.

In [0]:
%sql
-- Profit by year, computed from the enriched order/customer/product table.
-- Year is the third "/"-separated component of Order_Date.
SELECT
  SPLIT(Order_Date, "/")[2] AS Year,
  ROUND(SUM(profit), 2) AS Total_Profit
FROM order_cust_product_enriched_table
GROUP BY SPLIT(Order_Date, "/")[2]

Year,Total_Profit
2016,65073.58
2017,111085.31
2014,39185.89
2015,63073.3


In [0]:
%sql
-- Profit by year and product category, computed from the raw order and product tables.
-- The same Product_ID can appear more than once in the product data (e.g. for multiple
-- regions), so only distinct (Product_ID, Category) pairs are joined.
-- Some orders reference a Product_ID that is missing from the product table, so rows with
-- a NULL Product_Category can appear; add a NULL filter if those should be excluded.
WITH ord AS (
  SELECT
    Order_Date,
    Product_ID,
    ROUND(profit, 2) AS profit
  FROM df_order_raw_enriched_table
),
prd AS (
  SELECT DISTINCT
    Product_ID,
    Category
  FROM df_product_raw_enriched_table
)
SELECT
  SPLIT(ord.Order_Date, "/")[2] AS Year,
  prd.Category AS Product_Category,
  ROUND(SUM(ord.profit), 2) AS Total_Profit
FROM ord
LEFT JOIN prd
  ON ord.Product_ID = prd.Product_ID
GROUP BY
  SPLIT(ord.Order_Date, "/")[2],
  prd.Category

Year,Product_Category,Total_Profit
2016,,404.44
2017,,488.51
2014,,523.13
2016,Office Supplies,34555.74
2016,Furniture,6889.56
2015,Technology,34943.43
2015,Furniture,3027.2
2014,Office Supplies,22500.43
2016,Technology,23223.84
2015,,583.16


In [0]:
# The enriched master table results match the source dataset (compare the cell above with the cell below). Because order_cust_product_enriched_table stores profit rounded to 2 decimal places for each row, the source-dataset query also rounds each profit to 2 decimal places before aggregating so that the two results are comparable.

In [0]:
%sql
-- Profit by year and product category, computed from the enriched table.
-- Year is the third "/"-separated component of Order_Date.
SELECT
  SPLIT(Order_Date, "/")[2] AS Year,
  Category AS Product_Category,
  ROUND(SUM(profit), 2) AS Total_Profit
FROM order_cust_product_enriched_table
GROUP BY
  SPLIT(Order_Date, "/")[2],
  Category

Year,Product_Category,Total_Profit
2016,,404.44
2017,,488.51
2014,,523.13
2016,Office Supplies,34555.74
2016,Furniture,6889.56
2015,Technology,34943.43
2015,Furniture,3027.2
2014,Office Supplies,22500.43
2016,Technology,23223.84
2015,,583.16


In [0]:
%sql
-- Profit by customer, computed from the raw order table.
-- Each profit value is rounded to 2 decimal places before it is summed.
WITH ord AS (
  SELECT
    Customer_ID,
    ROUND(profit, 2) AS profit
  FROM df_order_raw_enriched_table
)
SELECT
  Customer_ID,
  ROUND(SUM(profit), 2) AS Total_Profit
FROM ord
GROUP BY Customer_ID

Customer_ID,Total_Profit
VW-21775,-874.66
RR-19315,-73.83
PB-19210,21.9
MS-17530,84.02
EM-13960,102.3
MY-17380,319.14
KH-16630,727.51
BD-11500,1142.12
SW-20275,332.76
AH-10690,1296.89


In [0]:
# The enriched master table results match the source dataset (compare the cell above with the cell below). Because order_cust_product_enriched_table stores profit rounded to 2 decimal places for each row, the source-dataset query also rounds each profit to 2 decimal places before aggregating so that the two results are comparable.

In [0]:
%sql
-- Profit by customer, computed from the enriched table.
-- Profit is rounded to 2 decimal places per row before it is summed.
WITH ord AS (
  SELECT
    Customer_ID,
    ROUND(profit, 2) AS profit
  FROM order_cust_product_enriched_table
)
SELECT
  Customer_ID,
  ROUND(SUM(profit), 2) AS Total_Profit
FROM ord
GROUP BY Customer_ID

Customer_ID,Total_Profit
VW-21775,-874.66
RR-19315,-73.83
PB-19210,21.9
MS-17530,84.02
EM-13960,102.3
MY-17380,319.14
KH-16630,727.51
BD-11500,1142.12
SW-20275,332.76
AH-10690,1296.89


In [0]:
%sql
-- Profit by customer and year, computed from the raw order table.
-- Each profit value is rounded to 2 decimal places before it is summed.
-- Year is the third "/"-separated component of Order_Date.
WITH ord AS (
  SELECT
    Customer_ID,
    Order_Date,
    ROUND(profit, 2) AS profit
  FROM df_order_raw_enriched_table
)
SELECT
  Customer_ID,
  SPLIT(Order_Date, "/")[2] AS Year,
  ROUND(SUM(ord.profit), 2) AS Total_Profit
FROM ord
GROUP BY
  Customer_ID,
  SPLIT(Order_Date, "/")[2]

Customer_ID,Year,Total_Profit
SC-20380,2017,270.29
CK-12325,2014,326.42
JH-15910,2016,63.17
JF-15355,2015,161.11
AS-10240,2016,23.18
ED-13885,2016,185.14
BP-11230,2014,27.56
CV-12805,2016,-35.39
TH-21235,2014,-2.93
EH-13945,2015,10.62


In [0]:
# The enriched master table results match the source dataset (compare the cell above with the cell below). Because order_cust_product_enriched_table stores profit rounded to 2 decimal places for each row, the source-dataset query also rounds each profit to 2 decimal places before aggregating so that the two results are comparable.

In [0]:
%sql
-- Profit by customer and year, computed from the enriched table.
-- Year is the third "/"-separated component of Order_Date.
SELECT
  Customer_ID,
  SPLIT(Order_Date, "/")[2] AS Year,
  ROUND(SUM(ord.profit), 2) AS Total_Profit
FROM order_cust_product_enriched_table ord
GROUP BY
  Customer_ID,
  SPLIT(Order_Date, "/")[2]

Customer_ID,Year,Total_Profit
SC-20380,2017,270.29
CK-12325,2014,326.42
JH-15910,2016,63.17
JF-15355,2015,161.11
AS-10240,2016,23.18
ED-13885,2016,185.14
BP-11230,2014,27.56
CV-12805,2016,-35.39
TH-21235,2014,-2.93
EH-13945,2015,10.62
