In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import(col, to_date, sum as _sum, avg,
    countDistinct, count, when, round
)



In [None]:
spark = SparkSession.builder\
                    .appName('E_kpis')\
                    .config("spark.executor.memory", "4g")\
                    .config("spark.driver.memory", "4g")\
                    .config("spark.executor.cores", "2")\
                    .getOrCreate()

In [None]:
order_items_df1 = spark.read.option('header', 'true')\
                      .option('inferSchema', 'true')\
                      .csv('/content/order_items_part1.csv')
order_items_df1.show(10)

+---+--------+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+----------+
| id|order_id|user_id|product_id|   status|         created_at|         shipped_at|       delivered_at|        returned_at|sale_price|
+---+--------+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+----------+
|  1|       1|   9116|      5585| returned|2025-03-30 17:09:26|2025-03-31 05:09:26|2025-04-02 03:09:26|2025-04-07 03:09:26|     81.64|
|  2|       2|   2897|      9376|delivered|2025-03-29 16:28:59|2025-03-29 18:28:59|2025-04-01 07:28:59|               NULL|    112.97|
|  3|       3|   4173|      7218| returned|2025-03-14 11:47:52|2025-03-15 11:47:52|2025-03-16 14:47:52|2025-03-19 14:47:52|     82.71|
|  4|       4|   9360|      4807|delivered|2025-04-02 19:56:36|2025-04-03 19:56:36|2025-04-06 02:56:36|               NULL|     53.62|
|  5|       5|   6930|      6443|delivered|2025-03-09 0

In [None]:
order_items_df = spark.read.csv('/content/order_items_part1.csv', header=True, inferSchema=True)

In [None]:
order_items_df.describe().show(10)
# order_items_df.show(10)

+-------+------------------+------------------+-----------------+------------------+---------+------------------+
|summary|                id|          order_id|          user_id|        product_id|   status|        sale_price|
+-------+------------------+------------------+-----------------+------------------+---------+------------------+
|  count|              1500|              1500|             1500|              1500|     1500|              1500|
|   mean|             750.5|264.78133333333335|5443.942666666667| 4963.900666666666|     NULL| 78.61090666666671|
| stddev|433.15701541127095|151.22896653232306|2516.201102596006|2896.5809296803145|     NULL|31.339009915326574|
|    min|                 1|                 1|             1004|                 1|delivered|              9.45|
|    max|              1500|               524|             9981|              9996| returned|            163.58|
+-------+------------------+------------------+-----------------+------------------+----

In [None]:
order_df = spark.read.csv('/content/orders_part1.csv', header=True, inferSchema=True)

In [None]:
order_df.show(10)

+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+
|order_id|user_id|   status|         created_at|        returned_at|         shipped_at|       delivered_at|num_of_item|
+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+
|       1|   9116| returned|2025-03-30 16:14:26|2025-04-03 10:14:26|2025-03-31 06:14:26|2025-04-02 10:14:26|          1|
|       2|   2897|delivered|2025-03-29 15:39:59|               NULL|2025-03-31 13:39:59|2025-04-02 17:39:59|          1|
|       3|   4173| returned|2025-03-14 11:46:52|2025-03-19 04:46:52|2025-03-15 12:46:52|2025-03-18 04:46:52|          1|
|       4|   9360|delivered|2025-04-02 19:08:36|               NULL|2025-04-03 20:08:36|2025-04-05 22:08:36|          1|
|       5|   6930|delivered|2025-03-09 08:02:56|               NULL|2025-03-11 00:02:56|2025-03-12 16:02:56|          3|
|       6|   8130|delivered|2025

In [None]:
product_df = spark.read.csv('/content/products.csv', header=True, inferSchema=True)

In [None]:
product_df.show(10)

+---+------------+-----+--------------+--------------------+--------+------------+-------------+
| id|         sku| cost|      category|                name|   brand|retail_price|   department|
+---+------------+-----+--------------+--------------------+--------+------------+-------------+
|  1|eHZ-67752426|15.58|        Beauty|Down-sized stable...|  Globex|       53.95|Personal Care|
|  2|jlT-35226003|14.74|        Beauty|Organized nationa...|Umbrella|       53.07|Personal Care|
|  3|WHI-96258902| 8.02|Home & Kitchen|Focused discrete ...|Umbrella|       17.24|         Home|
|  4|nLG-40948677|58.32|   Electronics|Future-proofed re...| Initech|       95.54|         Tech|
|  5|teg-89693441|47.67|      Clothing|Optimized respons...|   Stark|       65.19|      Fashion|
|  6|phO-34055321|20.17|        Sports|Up-sized holistic...|  Globex|       44.19|     Outdoors|
|  7|FeZ-14558143|36.98|        Sports|Fundamental async...|   Stark|       46.15|     Outdoors|
|  8|LVl-60282100|62.35|      

In [None]:
order_df = order_df.withColumn('order_date', to_date(col('created_at')))
order_items_df = order_items_df.withColumn("sale_price", col("sale_price").cast("float"))
order_df.show(10)

+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+----------+
|order_id|user_id|   status|         created_at|        returned_at|         shipped_at|       delivered_at|num_of_item|order_date|
+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+----------+
|       1|   9116| returned|2025-03-30 16:14:26|2025-04-03 10:14:26|2025-03-31 06:14:26|2025-04-02 10:14:26|          1|2025-03-30|
|       2|   2897|delivered|2025-03-29 15:39:59|               NULL|2025-03-31 13:39:59|2025-04-02 17:39:59|          1|2025-03-29|
|       3|   4173| returned|2025-03-14 11:46:52|2025-03-19 04:46:52|2025-03-15 12:46:52|2025-03-18 04:46:52|          1|2025-03-14|
|       4|   9360|delivered|2025-04-02 19:08:36|               NULL|2025-04-03 20:08:36|2025-04-05 22:08:36|          1|2025-04-02|
|       5|   6930|delivered|2025-03-09 08:02:56|               NULL|2025-03-

In [None]:
joint_category_df = (
            order_items_df
            .join(order_df.select("order_id", "order_date"), on="order_id")
            .join(product_df.select(col("id").alias("product_id"), "category"), on="product_id")
            .withColumn("is_returned", when(col("status") == "returned", 1).otherwise(0))
        )

In [None]:
joint_category_df.show()

+----------+--------+---+-------+---------+-------------------+-------------------+-------------------+-------------------+----------+----------+--------------+-----------+
|product_id|order_id| id|user_id|   status|         created_at|         shipped_at|       delivered_at|        returned_at|sale_price|order_date|      category|is_returned|
+----------+--------+---+-------+---------+-------------------+-------------------+-------------------+-------------------+----------+----------+--------------+-----------+
|      5585|       1|  1|   9116| returned|2025-03-30 17:09:26|2025-03-31 05:09:26|2025-04-02 03:09:26|2025-04-07 03:09:26|     81.64|2025-03-30|      Clothing|          1|
|      9376|       2|  2|   2897|delivered|2025-03-29 16:28:59|2025-03-29 18:28:59|2025-04-01 07:28:59|               NULL|    112.97|2025-03-29|   Electronics|          0|
|      7218|       3|  3|   4173| returned|2025-03-14 11:47:52|2025-03-15 11:47:52|2025-03-16 14:47:52|2025-03-19 14:47:52|     82.71|2

## First Kpi

In [None]:
category_kpi_df = (
                  joint_category_df.groupBy("category", "order_date")\
                  .agg(
                      round(_sum("sale_price"), 2).alias("daily_revenue"),
                      round(_sum("sale_price") / countDistinct("order_id"), 2).alias("avg_order_vslue"),
                      round(_sum("is_returned") / countDistinct("order_id"), 4).alias("avg_return_rate")
                  ))

In [None]:
category_kpi_df.show()

+--------------+----------+-------------+---------------+---------------+
|      category|order_date|daily_revenue|avg_order_vslue|avg_return_rate|
+--------------+----------+-------------+---------------+---------------+
|          Toys|2025-04-06|       413.42|          59.06|         0.1429|
|Home & Kitchen|2025-03-30|       220.37|          55.09|            0.5|
|          Toys|2025-03-21|       575.94|          95.99|            0.0|
|   Electronics|2025-03-09|      1187.99|           99.0|         0.0833|
|      Clothing|2025-03-11|       359.25|          71.85|            0.2|
|   Electronics|2025-03-17|       375.24|          93.81|            0.0|
|        Sports|2025-03-19|       715.61|          71.56|            0.2|
|      Clothing|2025-03-15|       410.53|          82.11|            0.2|
|Home & Kitchen|2025-03-15|       784.42|         112.06|         0.1429|
|        Sports|2025-04-01|       444.26|          74.04|         0.3333|
|      Clothing|2025-04-04|       315.

In [None]:
category_kpi_df.count()

210

## Second kpi


In [None]:
joint_order_df = (
                  order_items_df
                  .join(order_df.select("order_id", "order_date"), on="order_id")
                  .withColumn("is_returned", when(col("status") == "returned", 1).otherwise(0))
)

In [None]:
order_kpi_df = (
    joint_order_df.groupBy("order_date")\
    .agg(
        countDistinct("order_id").alias("total_orders"),
        round(_sum("sale_price"), 2).alias("total_revenue"),
        count("id").alias("total_items_sold"),
        round(_sum("is_returned") / countDistinct("order_id"), 4).alias("return_rate"),
        countDistinct("user_id").alias("unique_customers")
    )
    )


In [35]:
order_kpi_df.show()

+----------+------------+-------------+----------------+-----------+----------------+
|order_date|total_orders|total_revenue|total_items_sold|return_rate|unique_customers|
+----------+------------+-------------+----------------+-----------+----------------+
|2025-03-23|          16|      3712.99|              47|     0.4375|              16|
|2025-03-16|          13|      2333.34|              32|     0.4615|              13|
|2025-03-08|          21|      4351.86|              55|      0.381|              21|
|2025-03-12|          18|      3879.97|              50|     0.6111|              18|
|2025-03-13|          13|       2777.5|              37|     0.6154|              13|
|2025-03-31|          14|      3981.04|              48|     1.0714|              14|
|2025-03-19|          18|      4229.11|              54|     0.2778|              18|
|2025-04-05|          19|      4322.24|              60|     0.4737|              19|
|2025-03-10|          11|      2539.75|              3

In [36]:
order_kpi_df.count()

30