## Installing importing the necessary packages

In [3]:
# Install pyspark
!pip install pyspark



In [4]:
# Importing packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, DateType, TimestampType, BooleanType, DecimalType
from pyspark.sql.window import Window

### Initialize Spark Session

In [5]:
spark = SparkSession.builder.appName("e_commerce_shop").getOrCreate()

### Loading data

In [33]:
from re import S
# Defining custom schemas for the data

# Products Schema
products_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("sku", StringType(), True),
    StructField("cost", FloatType(), True),
    StructField("category", StringType(), True),
    StructField("name", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("retail_price", FloatType(), True),
    StructField("department", StringType(), True)
])

# Orders Schema
orders_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("returned_at", TimestampType(), True),
    StructField("shipped_at", TimestampType(), True),
    StructField("delivered_at", TimestampType(), True),
    StructField("num_of_item", IntegerType(), True)
])

# Order Items Schema
order_items_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("status", StringType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("shipped_at", TimestampType(), True),
    StructField("delivered_at", TimestampType(), True),
    StructField("returned_at", TimestampType(), True),
    StructField("sale_price", FloatType(), True)
])

In [42]:
# Loading the data
products_df = spark.read.csv("drive/MyDrive/Colab Notebooks/e_commerce_data/products.csv", header=True, schema=products_schema)
orders_df = spark.read.csv("drive/MyDrive/Colab Notebooks/e_commerce_data/orders/", header=True, schema=orders_schema)
order_items_df = spark.read.csv("drive/MyDrive/Colab Notebooks/e_commerce_data/order_items/", header=True, schema=order_items_schema)

In [43]:
# Confirming Schemas
print("Products Schema")
products_df.printSchema()
print("\nOrders Schema")
orders_df.printSchema()
print("\nOrder Items Schema")
order_items_df.printSchema()

Products Schema
root
 |-- id: integer (nullable = true)
 |-- sku: string (nullable = true)
 |-- cost: float (nullable = true)
 |-- category: string (nullable = true)
 |-- name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- retail_price: float (nullable = true)
 |-- department: string (nullable = true)


Orders Schema
root
 |-- order_id: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- returned_at: timestamp (nullable = true)
 |-- shipped_at: timestamp (nullable = true)
 |-- delivered_at: timestamp (nullable = true)
 |-- num_of_item: integer (nullable = true)


Order Items Schema
root
 |-- id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- shipped_at: timestamp (nullable = tru

In [44]:
# Displaying sample data from the data loaded
print("Products Data")
products_df.describe().show()
products_df.show(5)

print("\nOrders Data")
orders_df.describe().show()
orders_df.show(5)

print("\nOrder Items Data")
order_items_df.describe().show()
order_items_df.show(5)

Products Data
+-------+------------------+------------+------------------+--------+--------------------+-----+------------------+----------+
|summary|                id|         sku|              cost|category|                name|brand|      retail_price|department|
+-------+------------------+------------+------------------+--------+--------------------+-----+------------------+----------+
|  count|             10000|       10000|             10000|   10000|               10000| 9895|             10000|     10000|
|   mean|            5000.5|        NULL| 52.33561098270416|    NULL|                NULL| NULL| 79.76773598585129|      NULL|
| stddev|2886.8956799071675|        NULL|27.368669487259126|    NULL|                NULL| NULL|30.303686789476078|      NULL|
|    min|                 1|AAB-46471957|               5.0|  Beauty|Adaptive 24hour c...| Acme|             10.87|   Fashion|
|    max|             10000|zzs-44769856|             100.0|    Toys|Visionary zero-de...|Wonka| 

In [45]:
order_items_df.filter(col("order_id") == 1).show()

+---+--------+-------+----------+--------+-------------------+-------------------+-------------------+-------------------+----------+
| id|order_id|user_id|product_id|  status|         created_at|         shipped_at|       delivered_at|        returned_at|sale_price|
+---+--------+-------+----------+--------+-------------------+-------------------+-------------------+-------------------+----------+
|  1|       1|   9116|      5585|returned|2025-03-30 17:09:26|2025-03-31 05:09:26|2025-04-02 03:09:26|2025-04-07 03:09:26|     81.64|
+---+--------+-------+----------+--------+-------------------+-------------------+-------------------+-------------------+----------+



## KPI'S Computation

### Category-Level KPIs (Per Category, Per Day)

In [61]:
# Step 1: Join order_items_df with orders_df to get order details
order_items_with_orders = order_items_df.alias("oi").join(
    orders_df.alias("o"),
    col("oi.order_id") == col("o.order_id"),
    "inner"
)

# Step 2: Join with products_df to get the category
order_items_with_products = order_items_with_orders.join(
    products_df.alias("p"),
    col("oi.product_id") == col("p.id"),
    "inner"
)

# Step 3: Extract the order date (without time) and compute KPIs
category_kpis = (order_items_with_products
    .withColumn("order_date", to_date(col("oi.created_at")))  # Use the alias "oi"
    .groupBy("p.category", "order_date")
    .agg(
        # Daily revenue: Sum of sale_price for the category on that day
        round(sum("oi.sale_price"), 2).alias("daily_revenue"),

        # Avg order value: Average sale_price per order in the category
        round(avg("oi.sale_price"), 2).alias("avg_order_value"),

        # Avg return rate: Percentage of orders returned
        round(
            (count(when(col("oi.status") == "returned", 1)) / count("*")) * 100, 2
        ).alias("avg_return_rate")
    )
    .orderBy("category", "order_date")
)

# Step 4: Show the results
print("Category-Level KPIs (Per Category, Per Day):")
category_kpis.show(truncate=False)

Category-Level KPIs (Per Category, Per Day):
+--------+----------+-------------+---------------+---------------+
|category|order_date|daily_revenue|avg_order_value|avg_return_rate|
+--------+----------+-------------+---------------+---------------+
|Beauty  |2025-03-08|10540.85     |82.35          |16.41          |
|Beauty  |2025-03-09|12470.12     |75.12          |22.29          |
|Beauty  |2025-03-10|11263.91     |79.89          |20.57          |
|Beauty  |2025-03-11|10350.56     |77.24          |15.67          |
|Beauty  |2025-03-12|9827.83      |80.56          |20.49          |
|Beauty  |2025-03-13|10095.4      |78.87          |13.28          |
|Beauty  |2025-03-14|10087.23     |75.28          |13.43          |
|Beauty  |2025-03-15|8351.46      |76.62          |11.93          |
|Beauty  |2025-03-16|12711.93     |77.99          |17.79          |
|Beauty  |2025-03-17|10997.62     |83.32          |14.39          |
|Beauty  |2025-03-18|12457.01     |83.6           |18.12          |
|Be

In [55]:
order_items_with_products.show()

+-----+--------+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+----------+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+----+------------+-----+--------------+--------------------+-------+------------+-------------+
|   id|order_id|user_id|product_id|   status|         created_at|         shipped_at|       delivered_at|        returned_at|sale_price|order_id|user_id|   status|         created_at|        returned_at|         shipped_at|       delivered_at|num_of_item|  id|         sku| cost|      category|                name|  brand|retail_price|   department|
+-----+--------+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+----------+--------+-------+---------+-------------------+-------------------+-------------------+-------------------+-----------+----+------------+-----+--------------+----

### Order-Level KPIs (Per Day)

In [62]:
# Step 1: Join orders_df with order_items_df to get sale prices
orders_with_items = orders_df.alias("o").join(
    order_items_df.alias("oi"),
    col("o.order_id") == col("oi.order_id"),
    "inner"
)

# Step 2: Compute Order-Level KPIs
order_kpis = (orders_with_items
    .withColumn("order_date", to_date(col("o.created_at")))  # Extract date from orders.created_at
    .groupBy("order_date")
    .agg(
        # Total orders: Count of unique orders
        countDistinct("o.order_id").alias("total_orders"),

        # Total revenue: Sum of sale_price for the day
        round(sum("oi.sale_price"), 2).alias("total_revenue"),

        # Total items sold: Sum of num_of_item
        sum("o.num_of_item").alias("total_items_sold"),

        # Return rate: Percentage of orders returned
        round(
            (count(when(col("o.status") == "returned", 1)) / countDistinct("o.order_id")) * 100, 2
        ).alias("return_rate"),

        # Unique customers: Count of distinct user_id
        countDistinct("o.user_id").alias("unique_customers")
    )
    .orderBy("order_date")
)

# Step 3: Show the results
print("Order-Level KPIs (Per Day):")
order_kpis.show(truncate=False)

Order-Level KPIs (Per Day):
+----------+------------+-------------+----------------+-----------+----------------+
|order_date|total_orders|total_revenue|total_items_sold|return_rate|unique_customers|
+----------+------------+-------------+----------------+-----------+----------------+
|2025-03-08|286         |69188.47     |3262            |75.17      |281             |
|2025-03-09|311         |77044.39     |3608            |61.41      |306             |
|2025-03-10|281         |65621.51     |2998            |71.17      |280             |
|2025-03-11|285         |68304.72     |3152            |54.74      |279             |
|2025-03-12|276         |63064.54     |2751            |55.8       |271             |
|2025-03-13|289         |66228.69     |2982            |51.21      |285             |
|2025-03-14|292         |68190.83     |3159            |58.56      |286             |
|2025-03-15|292         |73359.79     |3449            |58.9       |282             |
|2025-03-16|315         |7