In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, to_date, sum as _sum, countDistinct, count, when, round
)

In [2]:
# Build (or reuse) the notebook's Spark session. The job is named "KPI_Job"
# and requests 4g for executors, 4g for the driver and 2 cores per executor.
spark = (
    SparkSession.builder
    .appName("KPI_Job")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.cores", "2")
    .getOrCreate()
)

In [3]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [8]:
# Load the first order-items CSV part. The header row supplies the column
# names and Spark infers each column's type from the data.
order_items_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../Data/order_items/order_items_part1.csv")
)
# Optional, for quick iteration on a small sample (disabled by default):
# order_items_df = order_items_df.limit(1000)

In [9]:
# Preview the first 10 order-item rows.
order_items_df.show(n=10)

+---+--------+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+----------+
| id|order_id|user_id|product_id|   status|         created_at|         shipped_at|       delivered_at|        returned_at|sale_price|
+---+--------+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+----------+
|  1|       1|   9116|      5585| returned|2025-03-30 17:09:26|2025-03-31 05:09:26|2025-04-02 03:09:26|2025-04-07 03:09:26|     81.64|
|  2|       2|   2897|      9376|delivered|2025-03-29 16:28:59|2025-03-29 18:28:59|2025-04-01 07:28:59|               NULL|    112.97|
|  3|       3|   4173|      7218| returned|2025-03-14 11:47:52|2025-03-15 11:47:52|2025-03-16 14:47:52|2025-03-19 14:47:52|     82.71|
|  4|       4|   9360|      4807|delivered|2025-04-02 19:56:36|2025-04-03 19:56:36|2025-04-06 02:56:36|               NULL|     53.62|
|  5|       5|   6930|      6443|delivered|2025-03-09 0

In [5]:
# Load the first orders CSV part with a header row and inferred column types.
orders_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../Data/orders/orders_part1.csv")
)

In [10]:
# Preview the orders frame using show()'s default row limit.
orders_df.show()

+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+
|order_id|user_id|   status|         created_at|        returned_at|         shipped_at|       delivered_at|num_of_item|
+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+
|       1|   9116| returned|2025-03-30 16:14:26|2025-04-03 10:14:26|2025-03-31 06:14:26|2025-04-02 10:14:26|          1|
|       2|   2897|delivered|2025-03-29 15:39:59|               NULL|2025-03-31 13:39:59|2025-04-02 17:39:59|          1|
|       3|   4173| returned|2025-03-14 11:46:52|2025-03-19 04:46:52|2025-03-15 12:46:52|2025-03-18 04:46:52|          1|
|       4|   9360|delivered|2025-04-02 19:08:36|               NULL|2025-04-03 20:08:36|2025-04-05 22:08:36|          1|
|       5|   6930|delivered|2025-03-09 08:02:56|               NULL|2025-03-11 00:02:56|2025-03-12 16:02:56|          3|
|       6|   8130|delivered|2025

In [6]:
# Load the products CSV with a header row and inferred column types.
products_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../Data/products.csv")
)

In [11]:
# Preview the products frame using show()'s default row limit.
products_df.show()

+---+------------+-----+--------------+--------------------+--------+------------+-------------+
| id|         sku| cost|      category|                name|   brand|retail_price|   department|
+---+------------+-----+--------------+--------------------+--------+------------+-------------+
|  1|eHZ-67752426|15.58|        Beauty|Down-sized stable...|  Globex|       53.95|Personal Care|
|  2|jlT-35226003|14.74|        Beauty|Organized nationa...|Umbrella|       53.07|Personal Care|
|  3|WHI-96258902| 8.02|Home & Kitchen|Focused discrete ...|Umbrella|       17.24|         Home|
|  4|nLG-40948677|58.32|   Electronics|Future-proofed re...| Initech|       95.54|         Tech|
|  5|teg-89693441|47.67|      Clothing|Optimized respons...|   Stark|       65.19|      Fashion|
|  6|phO-34055321|20.17|        Sports|Up-sized holistic...|  Globex|       44.19|     Outdoors|
|  7|FeZ-14558143|36.98|        Sports|Fundamental async...|   Stark|       46.15|     Outdoors|
|  8|LVl-60282100|62.35|      

In [13]:
# Derive the calendar date of each order from its creation timestamp.
# withColumn replaces an existing column of the same name, so re-running this
# cell is safe.
orders_df = orders_df.withColumn("order_date", to_date(col("created_at")))

# Keep sale_price as a 64-bit double. Casting to "float" (32-bit) narrows the
# monetary values and introduces single-precision rounding error (e.g. 81.64
# becomes 81.63999938964844 once widened again), which then shows up in any
# revenue sum or average computed from this column.
order_items_df = order_items_df.withColumn(
    "sale_price", col("sale_price").cast("double")
)

In [15]:
# Enrich every order item with its order's date and its product's category,
# then add a 0/1 flag marking returned items.
joined_category_df = (
    order_items_df
    # Default (inner) join on order_id; only order_date is pulled from orders,
    # so the remaining `status` column is the item-level one.
    .join(orders_df.select("order_id", "order_date"), on="order_id")
    # The products key `id` is renamed so it lines up with the items' product_id.
    .join(
        products_df.select(col("id").alias("product_id"), "category"),
        on="product_id",
    )
    .withColumn(
        "is_returned",
        when(col("status") == "returned", 1).otherwise(0),
    )
)

In [16]:
# Preview the joined frame (items + order_date + category + is_returned).
joined_category_df.show()

+----------+--------+---+-------+---------+-------------------+-------------------+-------------------+-------------------+----------+----------+--------------+-----------+
|product_id|order_id| id|user_id|   status|         created_at|         shipped_at|       delivered_at|        returned_at|sale_price|order_date|      category|is_returned|
+----------+--------+---+-------+---------+-------------------+-------------------+-------------------+-------------------+----------+----------+--------------+-----------+
|      5585|       1|  1|   9116| returned|2025-03-30 17:09:26|2025-03-31 05:09:26|2025-04-02 03:09:26|2025-04-07 03:09:26|     81.64|2025-03-30|      Clothing|          1|
|      9376|       2|  2|   2897|delivered|2025-03-29 16:28:59|2025-03-29 18:28:59|2025-04-01 07:28:59|               NULL|    112.97|2025-03-29|   Electronics|          0|
|      7218|       3|  3|   4173| returned|2025-03-14 11:47:52|2025-03-15 11:47:52|2025-03-16 14:47:52|2025-03-19 14:47:52|     82.71|2