## Simple PySpark Case Study


### 1. Initialize Spark and load raw data


In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *
import os
import datetime

In [28]:
# 1.1 Build the SparkSession for this case study (getOrCreate reuses a session
#     if one is already active in the kernel).

session_builder = SparkSession.builder.appName("Insight2Profit Interview Case Study")
spark = session_builder.getOrCreate()

In [29]:
# 1.2 Explicit schemas for the three raw tables, so column types are controlled
#     rather than inferred (not mandatory, but good practice).
#     Each spec is (column name, Spark type, nullable).
# NOTE(review): the printSchema output in section 2.1 reports the key columns as
# nullable = true, so nullable=False does not appear to be enforced on read.

products_fields = [
    ("ProductID", IntegerType(), False),  # primary key
    ("ProductDesc", StringType(), True),
    ("ProductNumber", StringType(), True),
    ("MakeFlag", BooleanType(), True),
    ("Color", StringType(), True),
    ("SafetyStockLevel", IntegerType(), True),
    ("ReorderPoint", IntegerType(), True),
    ("StandardCost", DecimalType(12, 2), True),
    ("ListPrice", DecimalType(12, 2), True),
    ("Size", StringType(), True),
    ("SizeUnitMeasureCode", StringType(), True),
    ("Weight", DecimalType(10, 2), True),
    ("WeightUnitMeasureCode", StringType(), True),
    ("ProductCategoryName", StringType(), True),
    ("ProductSubCategoryName", StringType(), True),
]
products_schema = StructType([StructField(*spec) for spec in products_fields])

sales_order_header_fields = [
    ("SalesOrderID", IntegerType(), False),  # primary key
    ("OrderDate", DateType(), True),
    ("ShipDate", DateType(), True),
    ("OnlineOrderFlag", BooleanType(), True),
    ("AccountNumber", StringType(), True),
    ("CustomerID", IntegerType(), True),
    ("SalesPersonID", IntegerType(), True),
    ("Freight", DecimalType(12, 2), True),
]
sales_order_header_schema = StructType(
    [StructField(*spec) for spec in sales_order_header_fields]
)

sales_order_detail_fields = [
    ("SalesOrderID", IntegerType(), True),
    ("SalesOrderDetailID", IntegerType(), True),
    ("OrderQty", IntegerType(), True),
    ("ProductID", IntegerType(), True),
    ("UnitPrice", DecimalType(12, 2), True),
    ("UnitPriceDiscount", DecimalType(12, 2), True),
]
sales_order_detail_schema = StructType(
    [StructField(*spec) for spec in sales_order_detail_fields]
)

In [30]:
# 1.3 Load the raw CSV extracts into DataFrames carrying the `raw_` prefix.
#     The local machine stands in for the storage containers; the source files
#     live in a folder named `raw`.

product_path = "C:\\Users\\opeij\\Documents\\Insight2Profit Interview Case Study\\raw\\products.csv"
sales_order_header_path = "C:\\Users\\opeij\\Documents\\Insight2Profit Interview Case Study\\raw\\sales_order_header.csv"
sales_order_detail_path = "C:\\Users\\opeij\\Documents\\Insight2Profit Interview Case Study\\raw\\sales_order_detail.csv"

# Reader settings shared by all three files: the first row is a header and
# empty strings are read as NULL.
csv_read_options = dict(header=True, nullValue="")

raw_products = spark.read.csv(
    product_path, schema=products_schema, **csv_read_options
)
raw_sales_order_header = spark.read.csv(
    sales_order_header_path, schema=sales_order_header_schema, **csv_read_options
)
raw_sales_order_detail = spark.read.csv(
    sales_order_detail_path, schema=sales_order_detail_schema, **csv_read_options
)

### 2. Review schemas, cast types, identify keys and store intermediate data



In [31]:
# 2.1 Print the schema Spark applied to each raw table

for raw_frame in (raw_products, raw_sales_order_header, raw_sales_order_detail):
    raw_frame.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- ProductDesc: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: boolean (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: integer (nullable = true)
 |-- ReorderPoint: integer (nullable = true)
 |-- StandardCost: decimal(12,2) (nullable = true)
 |-- ListPrice: decimal(12,2) (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeUnitMeasureCode: string (nullable = true)
 |-- Weight: decimal(10,2) (nullable = true)
 |-- WeightUnitMeasureCode: string (nullable = true)
 |-- ProductCategoryName: string (nullable = true)
 |-- ProductSubCategoryName: string (nullable = true)

root
 |-- SalesOrderID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- OnlineOrderFlag: boolean (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- SalesPersonID: integer (nullable = 

In [None]:
# 2.2 The schema review shows PySpark already applied the schema defined in 1.2,
#     so no further type casting is required.
#     Preview the first five rows of each raw table.

raw_products.show(5)
raw_sales_order_header.show(5)
raw_sales_order_detail.show(5)

In [54]:
# 2.3 Summarise the primary and foreign keys of the three raw tables

key_summary_lines = [
    "Primary Keys:",
    "- raw_products: ProductID",
    "- raw_sales_order_header: SalesOrderID",
    "- raw_sales_order_detail: SalesOrderDetailID",
    "",
    "Foreign Keys:",
    "- raw_sales_order_detail.SalesOrderID -- raw_sales_order_header.SalesOrderID",
    "- raw_sales_order_detail.ProductID -- raw_products.ProductID",
]
print("\n".join(key_summary_lines))


Primary Keys:
- raw_products: ProductID
- raw_sales_order_header: SalesOrderID
- raw_sales_order_detail: SalesOrderDetailID

Foreign Keys:
- raw_sales_order_detail.SalesOrderID -- raw_sales_order_header.SalesOrderID
- raw_sales_order_detail.ProductID -- raw_products.ProductID


In [55]:
# 2.4 Data quality: remove rows whose primary key is NULL, then keep a single
#     row per primary key value.

# 2.4.1 Products, keyed on ProductID
products_with_key = raw_products.filter(col("ProductID").isNotNull())
clean_products = products_with_key.dropDuplicates(["ProductID"])

# 2.4.2 Sales Order Header, keyed on SalesOrderID
headers_with_key = raw_sales_order_header.filter(col("SalesOrderID").isNotNull())
clean_sales_order_header = headers_with_key.dropDuplicates(["SalesOrderID"])

# 2.4.3 Sales Order Detail, keyed on SalesOrderDetailID
details_with_key = raw_sales_order_detail.filter(col("SalesOrderDetailID").isNotNull())
clean_sales_order_detail = details_with_key.dropDuplicates(["SalesOrderDetailID"])



In [56]:
# 2.5 Build the cleaned tables that carry the `store_` prefix.
#     Each store table selects the full column list of its cleaned source, so
#     the persisted layout is explicit.
#
# Fix: the header and detail assignments were previously swapped
# (store_sales_order_detail held header rows and vice versa). Each store table
# is now derived from the cleaned table of the same name.

store_products = clean_products.select(
    "ProductID",
    "ProductDesc",
    "ProductNumber",
    "MakeFlag",
    "Color",
    "SafetyStockLevel",
    "ReorderPoint",
    "StandardCost",
    "ListPrice",
    "Size",
    "SizeUnitMeasureCode",
    "Weight",
    "WeightUnitMeasureCode",
    "ProductCategoryName",
    "ProductSubCategoryName",
)

store_sales_order_header = clean_sales_order_header.select(
    "SalesOrderID",
    "OrderDate",
    "ShipDate",
    "OnlineOrderFlag",
    "AccountNumber",
    "CustomerID",
    "SalesPersonID",
    "Freight",
)

store_sales_order_detail = clean_sales_order_detail.select(
    "SalesOrderID",
    "SalesOrderDetailID",
    "OrderQty",
    "ProductID",
    "UnitPrice",
    "UnitPriceDiscount",
)

In [37]:
# 2.6 Write the store tables to the local `processed` folder (silver layer) as CSV.
#
# Fix: all three tables were previously written to the same directory with
# mode("overwrite"), so each write replaced the one before it and only the last
# table remained on disk. Each table now gets its own sub-folder, and a header
# row is written so the column names are kept.
# Note: Spark writes a directory of part files, not a single .csv file.

processed_dir = "C:/Users/opeij/Documents/Insight2Profit Interview Case Study/processed"

store_tables = {
    "store_products": store_products,
    "store_sales_order_header": store_sales_order_header,
    "store_sales_order_detail": store_sales_order_detail,
}

for table_name, table_df in store_tables.items():
    table_df.write.mode("overwrite").csv(f"{processed_dir}/{table_name}", header=True)


### 3. Product master transformation to publish_product

In [57]:
# Build publish_product from store_products:
#   * Color: NULL or empty values become 'N/A'
#   * ProductCategoryName: when NULL or empty, derive it from ProductSubCategoryName

# Color with missing values replaced by 'N/A'
color_is_missing = col("Color").isNull() | (col("Color") == "")
color_filled = when(color_is_missing, lit("N/A")).otherwise(col("Color")).alias("Color")

# Category derived from the sub-category; rows matching no rule keep their
# original (missing) category value.
category_from_subcategory = (
    when(col("ProductSubCategoryName").isin(['Gloves', 'Shorts', 'Socks', 'Tights', 'Vests']),
         lit("Clothing"))
    .when(col("ProductSubCategoryName").isin(['Locks', 'Lights', 'Headsets', 'Helmets', 'Pedals', 'Pumps']),
          lit("Accessories"))
    .when(col("ProductSubCategoryName").contains("Frames") |
          col("ProductSubCategoryName").isin(['Wheels', 'Saddles']),
          lit("Components"))
    .otherwise(col("ProductCategoryName"))
)

# Apply the derivation only where the category is missing
category_is_missing = col("ProductCategoryName").isNull() | (col("ProductCategoryName") == "")
category_enriched = (
    when(category_is_missing, category_from_subcategory)
    .otherwise(col("ProductCategoryName"))
    .alias("ProductCategoryName")
)

publish_product = store_products.select(
    "ProductID",
    "ProductDesc",
    "ProductNumber",
    "MakeFlag",
    color_filled,
    "SafetyStockLevel",
    "ReorderPoint",
    "StandardCost",
    "ListPrice",
    "Size",
    "SizeUnitMeasureCode",
    "Weight",
    "WeightUnitMeasureCode",
    category_enriched,
    "ProductSubCategoryName",
)

In [58]:
# Confirm whether any ProductCategoryName values are still NULL in publish_product.
# The sub-category column is referenced with the exact casing used in the schema
# (ProductSubCategoryName) so the lookup does not depend on case-insensitive
# column resolution.

(
    publish_product
    .filter(col("ProductCategoryName").isNull())
    .select(
        col("ProductID"),
        col("ProductCategoryName"),
        col("ProductSubCategoryName"),
    )
    .show(100)
)


# Some ProductCategoryName values are still NULL because their sub-category is not
# covered by the ProductSubCategoryName rules provided.
# For this project they are left as they are. A further step would be to clarify
# whether Caps, Jerseys and Bib-Shorts qualify as Clothing, which would enrich the
# data further; anything that still cannot be determined could then be set to N/A.


+---------+-------------------+----------------------+
|ProductID|ProductCategoryName|ProductSubcategoryName|
+---------+-------------------+----------------------+
|      712|               NULL|                  Caps|
|      713|               NULL|               Jerseys|
|      714|               NULL|               Jerseys|
|      715|               NULL|               Jerseys|
|      716|               NULL|               Jerseys|
|      802|               NULL|                 Forks|
|      803|               NULL|                 Forks|
|      804|               NULL|                 Forks|
|      842|               NULL|              Panniers|
|      855|               NULL|            Bib-Shorts|
|      856|               NULL|            Bib-Shorts|
|      857|               NULL|            Bib-Shorts|
|      870|               NULL|     Bottles and Cages|
|      871|               NULL|     Bottles and Cages|
|      872|               NULL|     Bottles and Cages|
|      873

In [40]:
# Write publish_product to the local `final` folder (gold layer) as CSV.
#
# Fix: the table is written to its own `publish_product` sub-folder instead of
# the root of `final`, because publish_orders is also written under `final` with
# mode("overwrite") and a shared target directory lets one write replace the other.
# A header row is written so the column names are kept.
# Note: Spark writes a directory of part files, not a single publish_product.csv.

final_dir = "C:/Users/opeij/Documents/Insight2Profit Interview Case Study/final"
publish_product.write.mode("overwrite").csv(f"{final_dir}/publish_product", header=True)

### 4. Transform Sales Orders to publish_orders

In [50]:
# 4.1 Join SalesOrderDetail with SalesOrderHeader on SalesOrderID.
#     It is an inner join and every column is selected by name below, so the
#     result does not depend on which table is on the left.

joined_orders = store_sales_order_detail.join(
    store_sales_order_header,
    "SalesOrderID",
    "inner"
)

# 4.2 Apply transformations

# 4.2.1 LeadTimeInBusinessDays
# Count the Monday-Friday dates in the inclusive range [OrderDate, ShipDate]
# (dayofweek: 1 = Sunday, 7 = Saturday) and subtract 1, so an order shipped on
# the day it was placed has a lead time of 0.
# Fixes:
#   * sequence() raises when ShipDate is earlier than OrderDate with a positive
#     step, so those rows are now guarded and get NULL instead of failing the job.
#   * when no business day falls in the range (e.g. ordered Saturday, shipped
#     Sunday) the raw formula evaluates to -1; the result is floored at 0.
lead_time_not_computable = (
    col("OrderDate").isNull()
    | col("ShipDate").isNull()
    | (col("ShipDate") < col("OrderDate"))
)
business_days_between = expr(
    "greatest("
    "size(filter(sequence(OrderDate, ShipDate, interval 1 day), "
    "d -> dayofweek(d) NOT IN (1, 7))) - 1, "
    "0)"
)

publish_orders = joined_orders.select(
    # all fields from SalesOrderDetail
    col("SalesOrderID"),
    col("SalesOrderDetailID"),
    col("OrderQty"),
    col("ProductID"),
    col("UnitPrice"),
    col("UnitPriceDiscount"),

    # all fields from SalesOrderHeader except SalesOrderID (already included)
    col("OrderDate"),
    col("ShipDate"),
    col("OnlineOrderFlag"),
    col("AccountNumber"),
    col("CustomerID"),
    col("SalesPersonID"),
    col("Freight").alias("TotalOrderFreight"),  # renamed as required

    # calculated fields
    when(lead_time_not_computable, lit(None).cast("int"))
    .otherwise(business_days_between)
    .alias("LeadTimeInBusinessDays"),

    # 4.2.2 TotalLineExtendedPrice = OrderQty * (UnitPrice - UnitPriceDiscount)
    (col("OrderQty") * (col("UnitPrice") - col("UnitPriceDiscount"))).alias("TotalLineExtendedPrice")
)

In [None]:
# 4.3 Write publish_orders to the local `final` folder (gold layer) as CSV.
#
# Fix: the table is written to its own `publish_orders` sub-folder. Writing to
# the root of `final` with mode("overwrite") replaced the publish_product output
# that was written to the same directory earlier in the notebook.
# A header row is written so the column names are kept.
# Note: Spark writes a directory of part files, not a single .csv file.

final_dir = "C:/Users/opeij/Documents/Insight2Profit Interview Case Study/final"
publish_orders.write.mode("overwrite").csv(f"{final_dir}/publish_orders", header=True)

### 5. Analysis Questions
5.1 Which color generated the highest revenue each year?

In [52]:
# Revenue per line already exists in publish_orders as TotalLineExtendedPrice.
# Join orders to products to bring Color alongside OrderDate.

product_colors = publish_product.select("ProductID", "Color")
orders_with_color = publish_orders.join(product_colors, on="ProductID")

# Revenue by order year and color.
# `round` and `sum` here are the pyspark.sql.functions versions brought in by the
# wildcard import at the top of the notebook, not the Python builtins.

revenue_by_color = (
    orders_with_color
    .groupBy(year(col("OrderDate")).alias("Year"), col("Color"))
    .agg(round(sum("TotalLineExtendedPrice"), 2).alias("Revenue"))
)

# Rank colors within each year by revenue, highest first.
# Color is a secondary sort key so that a revenue tie resolves the same way on
# every run (row_number alone would pick an arbitrary row among ties).
window_spec = Window.partitionBy("Year").orderBy(col("Revenue").desc(), col("Color").asc())

top_color_each_year = (
    revenue_by_color
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)
    .select("Year", "Color", "Revenue")
)

formatted_top = top_color_each_year.withColumn(
    "Revenue",
    format_number("Revenue", 2)  # adds thousands separators, two decimals
)

# Sort by year so the displayed row order is stable
formatted_top.orderBy("Year").show(truncate=False)

+----+------+-------------+
|Year|Color |Revenue      |
+----+------+-------------+
|2021|Red   |6,019,613.15 |
|2022|Black |14,005,216.19|
|2023|Black |15,047,626.43|
|2024|Yellow|6,480,720.07 |
+----+------+-------------+



5.2 What is the average LeadTimeInBusinessDays by ProductCategoryName?

In [53]:
# Attach ProductCategoryName to every order line by joining on ProductID
product_categories = publish_product.select("ProductID", "ProductCategoryName")
orders_with_category = publish_orders.alias("o").join(product_categories, on="ProductID")

# Average lead time per category, using only rows where both the lead time and
# the category are known
has_lead_time_and_category = (
    col("LeadTimeInBusinessDays").isNotNull()
    & col("ProductCategoryName").isNotNull()
)

avg_leadtime = (
    orders_with_category
    .filter(has_lead_time_and_category)
    .groupBy("ProductCategoryName")
    .agg(round(avg("LeadTimeInBusinessDays"), 2).alias("AvgLeadTimeDays"))
    .orderBy("ProductCategoryName")
)

avg_leadtime.show()

+-------------------+---------------+
|ProductCategoryName|AvgLeadTimeDays|
+-------------------+---------------+
|        Accessories|            4.7|
|              Bikes|           4.67|
|           Clothing|           4.71|
|         Components|           4.67|
+-------------------+---------------+

