In [0]:
from pyspark.sql import SparkSession 

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, LongType, StringType, DateType, IntegerType

In [0]:
from itertools import chain

In [0]:
import datetime
from datetime import timedelta

In [0]:
# Reuse the active Spark session if there is one, otherwise start a new one
# registered under this notebook's application name.
spark = (
    SparkSession.builder
    .appName('FactSales2Dimensionality')
    .getOrCreate()
)

In [0]:
# List the files under /FileStore/tables/ so the input CSV name can be confirmed
# before reading it (the listing is shown as this cell's output).
dbutils.fs.ls("/FileStore/tables/")

Out[5]: [FileInfo(path='dbfs:/FileStore/tables/an_utd_python_s3_us_west_1_amazonaws_com_yelp_labelled.txt', name='an_utd_python_s3_us_west_1_amazonaws_com_yelp_labelled.txt', size=61320, modificationTime=1687812855000),
 FileInfo(path='dbfs:/FileStore/tables/crime_data.csv', name='crime_data.csv', size=1440, modificationTime=1687816551000),
 FileInfo(path='dbfs:/FileStore/tables/fact_sales.csv', name='fact_sales.csv', size=2348, modificationTime=1749790542000),
 FileInfo(path='dbfs:/FileStore/tables/fact_sales_data_v2.csv', name='fact_sales_data_v2.csv', size=3113, modificationTime=1750096783000),
 FileInfo(path='dbfs:/FileStore/tables/students-1.csv', name='students-1.csv', size=94, modificationTime=1749433987000),
 FileInfo(path='dbfs:/FileStore/tables/students-2.csv', name='students-2.csv', size=94, modificationTime=1749437015000),
 FileInfo(path='dbfs:/FileStore/tables/students.csv', name='students.csv', size=94, modificationTime=1749433896000)]

In [0]:
# Load the raw sales CSV: the first row holds the column names and the column
# types are inferred from the data.
fact_sales_csv_path = "/FileStore/tables/fact_sales_data_v2.csv"
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(fact_sales_csv_path)
)

In [0]:
# Show the raw DataFrame as loaded, before any imputation.
display(df)

ProductCategory,ProductName,Brand,StoreRegion,StoreName,StoreType,SalesRep,Department,EmployeeRole,UnitsSold,UnitPrice,Discount,SaleDate
Furniture,T-shirt,BrandB,East,StoreX,Franchise,Martha Long,Electronics,Cashier,12.0,-1.0,5.0,2022-12-14
Clothing,Tablet,BrandC,East,StoreZ,Franchise,Martha Long,Home,Sales Associate,,272.49,,2023-02-24
Clothing,Tablet,BrandA,South,StoreX,Retail,Emily Vazquez,Apparel,Cashier,,484.75,15.0,2025-03-24
Electronics,Smartphone,BrandB,West,StoreY,Outlet,Charles Fields,Apparel,Cashier,,205.74,10.0,2023-09-30
Furniture,T-shirt,BrandC,East,StoreZ,Outlet,Wendy Castillo,Home,Manager,46.0,20.25,5.0,2022-10-14
Furniture,T-shirt,BrandC,South,StoreY,Retail,Wendy Castillo,Home,Manager,,361.06,10.0,2024-02-23
Clothing,T-shirt,BrandC,South,StoreY,Outlet,John Harris,Home,Cashier,37.0,492.65,5.0,2024-05-06
Electronics,Smartphone,BrandC,South,StoreX,Outlet,Charles Fields,Home,Sales Associate,37.0,293.87,15.0,2023-04-04
Clothing,Jeans,BrandA,South,StoreY,Retail,Wendy Castillo,Electronics,Manager,23.0,189.47,15.0,2022-12-26
Furniture,T-shirt,BrandB,East,StoreZ,Franchise,Charles Fields,Apparel,Manager,25.0,359.08,10.0,2022-10-28


We will replace null or invalid values as follows. For UnitPrice, we will use the mean or median UnitPrice of the same ProductName or ProductCategory. For UnitsSold, we will use the mean UnitsSold of the same ProductName or ProductCategory. For Discount, we will use the median Discount of the same ProductName or ProductCategory.

In [0]:
# Count how often each ProductName occurs within each ProductCategory.
# Rows with a null ProductName are excluded so they cannot become a candidate;
# the result has one row per (ProductCategory, ProductName) with a "count" column.
named_rows = df.where(F.col("ProductName").isNotNull())
product_mode_df = named_rows.groupBy("ProductCategory", "ProductName").count()

In [0]:
# Show the per-(ProductCategory, ProductName) occurrence counts.
display(product_mode_df)

ProductCategory,ProductName,count
Furniture,T-shirt,5
Clothing,T-shirt,1
Electronics,Desk,2
Furniture,Jeans,1
Electronics,Tablet,1
Electronics,T-shirt,1
Furniture,Smartphone,1
Electronics,Chair,1
Furniture,Desk,2
Clothing,Desk,3


In [0]:
# Order ProductNames within each ProductCategory by descending frequency.
# ProductName is a secondary sort key so that ties on "count" are broken
# deterministically; ordering by "count" alone lets row_number() pick an
# arbitrary name among equally frequent ones, which can change between runs.
window_spec = Window.partitionBy("ProductCategory").orderBy(
    F.desc("count"), F.asc("ProductName")
)

In [0]:
# Number the names inside each category according to window_spec, then keep
# only the first-ranked name per category as MostCommonProductName.
ranked_product_names = product_mode_df.withColumn(
    "rank", F.row_number().over(window_spec)
)
most_common_names = ranked_product_names.where(F.col("rank") == 1).select(
    "ProductCategory",
    F.col("ProductName").alias("MostCommonProductName"),
)

In [0]:
# Show the top-ranked ProductName chosen for each ProductCategory.
display(most_common_names)

ProductCategory,MostCommonProductName
Clothing,Desk
Electronics,Desk
Furniture,T-shirt


We can see that the most common ProductName for each ProductCategory may not be intuitive. For example:

"Clothing" → "Desk"

"Electronics" → "Desk"

So we will hardcode the mapping instead of using the statistically obtained values (our dataset is very small, so the hand-picked values appear more plausible than the computed ones).

In [0]:
# Hand-picked default ProductName for each ProductCategory, used when the
# ProductName is missing.
impute_mapping = dict(
    [
        ("Clothing", "T-shirt"),
        ("Electronics", "Smartphone"),
        ("Furniture", "Desk"),
    ]
)

In [0]:
# Map column built from impute_mapping (key, value, key, value, ...), looked up
# by the row's ProductCategory; the lookup yields null for unmapped categories.
category_default_name = F.create_map(
    *[F.lit(token) for pair in impute_mapping.items() for token in pair]
)[F.col("ProductCategory")]

# Rows that already have a ProductName keep it. Rows with a null ProductName
# get the category default, or "Unknown_Product" when the category is unmapped.
name_is_missing = F.col("ProductName").isNull()
fallback_name = F.coalesce(category_default_name, F.lit("Unknown_Product"))
df_imputed = df.withColumn(
    "ProductName",
    F.when(name_is_missing, fallback_name).otherwise(F.col("ProductName")),
)

In [0]:
# Show the DataFrame after null ProductName values have been filled in.
display(df_imputed)

ProductCategory,ProductName,Brand,StoreRegion,StoreName,StoreType,SalesRep,Department,EmployeeRole,UnitsSold,UnitPrice,Discount,SaleDate
Furniture,T-shirt,BrandB,East,StoreX,Franchise,Martha Long,Electronics,Cashier,12.0,-1.0,5.0,2022-12-14
Clothing,Tablet,BrandC,East,StoreZ,Franchise,Martha Long,Home,Sales Associate,,272.49,,2023-02-24
Clothing,Tablet,BrandA,South,StoreX,Retail,Emily Vazquez,Apparel,Cashier,,484.75,15.0,2025-03-24
Electronics,Smartphone,BrandB,West,StoreY,Outlet,Charles Fields,Apparel,Cashier,,205.74,10.0,2023-09-30
Furniture,T-shirt,BrandC,East,StoreZ,Outlet,Wendy Castillo,Home,Manager,46.0,20.25,5.0,2022-10-14
Furniture,T-shirt,BrandC,South,StoreY,Retail,Wendy Castillo,Home,Manager,,361.06,10.0,2024-02-23
Clothing,T-shirt,BrandC,South,StoreY,Outlet,John Harris,Home,Cashier,37.0,492.65,5.0,2024-05-06
Electronics,Smartphone,BrandC,South,StoreX,Outlet,Charles Fields,Home,Sales Associate,37.0,293.87,15.0,2023-04-04
Clothing,Jeans,BrandA,South,StoreY,Retail,Wendy Castillo,Electronics,Manager,23.0,189.47,15.0,2022-12-26
Furniture,T-shirt,BrandB,East,StoreZ,Franchise,Charles Fields,Apparel,Manager,25.0,359.08,10.0,2022-10-28


In [0]:
# Persist the ProductName-imputed data as a Delta table; "overwrite" replaces
# whatever is already stored at this path.
(
    df_imputed.write
    .format("delta")
    .mode("overwrite")
    .save("/delta/fact_sales_stage1")
)