In [11]:
# Install PySpark in Google Colab
!pip install pyspark

# Import required libraries
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession.builder \
    .appName("RetailPro Analytics") \
    .master("local[*]") \
    .getOrCreate()

# Verify Spark is working
print(f"Spark Version: {spark.version}")
print("Spark session created successfully!")

Spark Version: 3.5.1
Spark session created successfully!


In [12]:
# Download the CSV file from GitHub
!wget https://raw.githubusercontent.com/futurexskill/bigdata/master/ecommerce_orders.csv

# Load the downloaded CSV file into a Spark DataFrame
orders_df = spark.read.csv(
    "ecommerce_orders.csv",
    header=True,
    inferSchema=True
)

# Display the first few rows
orders_df.show(5)

# Show the schema
orders_df.printSchema()

# Count total records
print(f"Total orders: {orders_df.count()}")

--2025-12-02 13:42:39--  https://raw.githubusercontent.com/futurexskill/bigdata/master/ecommerce_orders.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8804 (8.6K) [text/plain]
Saving to: ‘ecommerce_orders.csv.1’


2025-12-02 13:42:39 (18.6 MB/s) - ‘ecommerce_orders.csv.1’ saved [8804/8804]

+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|order_id|order_date|customer_id|product_name|   category|quantity|unit_price|discount_percent|total_amount|country|   status|payment_method|shipping_days|
+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|  

In [13]:
print(f"Total orders in dataset: {orders_df.count()}")


Total orders in dataset: 100


In [14]:
unique_customers = orders_df.select("customer_id").distinct().count()
print(f"Unique customers: {unique_customers}")

Unique customers: 46


In [15]:
from pyspark.sql.functions import min, max

date_range = orders_df.select(
    min("order_date").alias("earliest_order"),
    max("order_date").alias("latest_order")
)
date_range.show()

+--------------+------------+
|earliest_order|latest_order|
+--------------+------------+
|    2025-01-15|  2025-11-30|
+--------------+------------+



In [16]:
orders_df.groupBy("category").count().orderBy("count", ascending=False).show()


+-----------+-----+
|   category|count|
+-----------+-----+
|Electronics|   44|
|  Furniture|   24|
| Stationery|   16|
|Accessories|   16|
+-----------+-----+



In [17]:
orders_df.groupBy("country").count().orderBy("count", ascending=False).show()


+---------+-----+
|  country|count|
+---------+-----+
|  Germany|   13|
|      USA|   13|
|       UK|   13|
|   Canada|   13|
|   France|   12|
|    India|   12|
|    Japan|   12|
|Australia|   12|
+---------+-----+



In [18]:
from pyspark.sql.functions import col, sum as _sum, when

# Check for nulls in each column
orders_df.select([
    _sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in orders_df.columns
]).show()

+--------+----------+-----------+------------+--------+--------+----------+----------------+------------+-------+------+--------------+-------------+
|order_id|order_date|customer_id|product_name|category|quantity|unit_price|discount_percent|total_amount|country|status|payment_method|shipping_days|
+--------+----------+-----------+------------+--------+--------+----------+----------------+------------+-------+------+--------------+-------------+
|       0|         0|          0|           0|       0|       0|         0|               0|           0|      0|     0|             0|           19|
+--------+----------+-----------+------------+--------+--------+----------+----------------+------------+-------+------+--------------+-------------+



In [19]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- discount_percent: integer (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- country: string (nullable = true)
 |-- status: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- shipping_days: integer (nullable = true)



In [20]:
total_orders = orders_df.count()
print(f"Total orders: {total_orders}")

Total orders: 100


In [21]:
orders_df.describe().show()

+-------+------------------+------------------+------------+-----------+-----------------+------------------+-----------------+------------------+---------+---------+--------------+------------------+
|summary|          order_id|       customer_id|product_name|   category|         quantity|        unit_price| discount_percent|      total_amount|  country|   status|payment_method|     shipping_days|
+-------+------------------+------------------+------------+-----------+-----------------+------------------+-----------------+------------------+---------+---------+--------------+------------------+
|  count|               100|               100|         100|        100|              100|               100|              100|               100|      100|      100|           100|                81|
|   mean|            1050.5|           5026.84|        NULL|       NULL|             2.56|177.12999999999988|              9.2|236.62360000000004|     NULL|     NULL|          NULL|  6.54320987654

In [22]:
from pyspark.sql.functions import col

high_value_orders = orders_df.filter(col("total_amount") > 500)
high_value_orders.show(10)


+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|order_id|order_date|customer_id|product_name|   category|quantity|unit_price|discount_percent|total_amount|country|   status|payment_method|shipping_days|
+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|    1001|2025-01-15|       5023|      LAPTOP|Electronics|       2|    899.99|              10|     1619.98|    USA|Completed|   Credit Card|            5|
|    1007|2025-03-08|       5041|        DESK|  Furniture|       2|    349.99|               0|      699.98|  Japan|Completed|   Credit Card|            8|
|    1013|2025-04-20|       5011|      LAPTOP|Electronics|       1|    899.99|              20|      719.99| France|Completed|        PayPal|            3|
|    1025|2025-07-12|       5018|      LAPTOP|Electronics|      

In [23]:
electronics_high_value = orders_df.filter(
    (col("total_amount") > 500) &
    (col("total_amount") < 1000) &
    (col("category") == "Electronics")
)
electronics_high_value.show()


+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|order_id|order_date|customer_id|product_name|   category|quantity|unit_price|discount_percent|total_amount|country|   status|payment_method|shipping_days|
+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|    1013|2025-04-20|       5011|      LAPTOP|Electronics|       1|    899.99|              20|      719.99| France|Completed|        PayPal|            3|
|    1025|2025-07-12|       5018|      LAPTOP|Electronics|       1|    899.99|               0|      899.99|    USA|Completed|   Credit Card|            2|
|    1049|2025-03-01|       5019|      LAPTOP|Electronics|       1|    899.99|              10|      809.99|    USA|Completed|   Credit Card|            3|
|    1061|2025-08-06|       5042|      LAPTOP|Electronics|      

In [24]:
sales_report = orders_df.select("order_date", "product_name", "total_amount")
sales_report.show(10)


+----------+------------+------------+
|order_date|product_name|total_amount|
+----------+------------+------------+
|2025-01-15|      LAPTOP|     1619.98|
|2025-01-18|  HEADPHONES|       79.99|
|2025-02-03|    KEYBOARD|      127.47|
|2025-02-10|     MONITOR|      239.99|
|2025-02-14|       MOUSE|      142.45|
|2025-03-02|       CHAIR|      179.99|
|2025-03-08|        DESK|      699.98|
|2025-03-15|        LAMP|      113.97|
|2025-03-22|    NOTEBOOK|        49.9|
|2025-03-25|     PEN_SET|       46.76|
+----------+------------+------------+
only showing top 10 rows



In [25]:
sales_report_renamed = orders_df.select(
    col("order_date").alias("date"),
    col("product_name").alias("product"),
    col("total_amount").alias("revenue")
)
sales_report_renamed.show(10)

+----------+----------+-------+
|      date|   product|revenue|
+----------+----------+-------+
|2025-01-15|    LAPTOP|1619.98|
|2025-01-18|HEADPHONES|  79.99|
|2025-02-03|  KEYBOARD| 127.47|
|2025-02-10|   MONITOR| 239.99|
|2025-02-14|     MOUSE| 142.45|
|2025-03-02|     CHAIR| 179.99|
|2025-03-08|      DESK| 699.98|
|2025-03-15|      LAMP| 113.97|
|2025-03-22|  NOTEBOOK|   49.9|
|2025-03-25|   PEN_SET|  46.76|
+----------+----------+-------+
only showing top 10 rows



In [26]:
# Alternative approach
orders_renamed = orders_df.withColumnRenamed("total_amount", "revenue")
orders_renamed.show(5)

+--------+----------+-----------+------------+-----------+--------+----------+----------------+-------+-------+---------+--------------+-------------+
|order_id|order_date|customer_id|product_name|   category|quantity|unit_price|discount_percent|revenue|country|   status|payment_method|shipping_days|
+--------+----------+-----------+------------+-----------+--------+----------+----------------+-------+-------+---------+--------------+-------------+
|    1001|2025-01-15|       5023|      LAPTOP|Electronics|       2|    899.99|              10|1619.98|    USA|Completed|   Credit Card|            5|
|    1002|2025-01-18|       5012|  HEADPHONES|Electronics|       1|     79.99|               0|  79.99|     UK|Completed|        PayPal|            3|
|    1003|2025-02-03|       5045|    KEYBOARD|Electronics|       3|     49.99|              15| 127.47| Canada|  Shipped|    Debit Card|            7|
|    1004|2025-02-10|       5008|     MONITOR|Electronics|       1|    299.99|              20

In [27]:
top_orders = orders_df.orderBy(col("total_amount").desc())
top_orders.show(10)

+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|order_id|order_date|customer_id|product_name|   category|quantity|unit_price|discount_percent|total_amount|country|   status|payment_method|shipping_days|
+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|    1037|2025-10-06|       5046|      LAPTOP|Electronics|       2|    899.99|               5|     1709.98| France|Completed|   Credit Card|            4|
|    1073|2025-03-19|       5005|      LAPTOP|Electronics|       2|    899.99|               5|     1709.98|    USA|Completed|   Credit Card|            3|
|    1001|2025-01-15|       5023|      LAPTOP|Electronics|       2|    899.99|              10|     1619.98|    USA|Completed|   Credit Card|            5|
|    1025|2025-07-12|       5018|      LAPTOP|Electronics|      

In [28]:
orders_sorted = orders_df.orderBy(
    col("category"),
    col("total_amount").desc()
)
orders_sorted.show(15)

+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|order_id|order_date|customer_id|product_name|   category|quantity|unit_price|discount_percent|total_amount|country|   status|payment_method|shipping_days|
+--------+----------+-----------+------------+-----------+--------+----------+----------------+------------+-------+---------+--------------+-------------+
|    1035|2025-09-22|       5017|    BACKPACK|Accessories|       2|     59.99|              20|       95.98| Canada|Completed| Bank Transfer|            5|
|    1059|2025-07-09|       5050|    BACKPACK|Accessories|       2|     59.99|              20|       95.98| Canada|Completed| Bank Transfer|            6|
|    1095|2025-05-30|       5032|    BACKPACK|Accessories|       2|     59.99|              20|       95.98|  Japan|Completed| Bank Transfer|           12|
|    1024|2025-07-05|       5044|WATER_BOTTLE|Accessories|      

In [29]:
from pyspark.sql.functions import sum, count, avg

country_revenue = orders_df.groupBy("country") \
    .agg(sum("total_amount").alias("total_revenue")) \
    .orderBy(col("total_revenue").desc())

country_revenue.show()

+---------+------------------+
|  country|     total_revenue|
+---------+------------------+
|      USA| 6463.469999999999|
|   France|4647.0199999999995|
|   Canada|3101.2599999999998|
|    Japan|           2345.32|
|    India|           2129.65|
|       UK|           1678.76|
|  Germany|1670.7900000000002|
|Australia|           1626.09|
+---------+------------------+



In [30]:
# Code - Multiple aggregations
country_stats = orders_df.groupBy("country").agg(
    count("order_id").alias("total_orders"),
    sum("total_amount").alias("total_revenue"),
    avg("total_amount").alias("avg_order_value")
).orderBy(col("total_revenue").desc())

country_stats.show()

+---------+------------+------------------+------------------+
|  country|total_orders|     total_revenue|   avg_order_value|
+---------+------------+------------------+------------------+
|      USA|          13| 6463.469999999999|497.18999999999994|
|   France|          12|4647.0199999999995| 387.2516666666666|
|   Canada|          13|3101.2599999999998|238.55846153846153|
|    Japan|          12|           2345.32|195.44333333333336|
|    India|          12|           2129.65|177.47083333333333|
|       UK|          13|           1678.76|129.13538461538462|
|  Germany|          13|1670.7900000000002|128.52230769230772|
|Australia|          12|           1626.09|          135.5075|
+---------+------------+------------------+------------------+



In [31]:
from pyspark.sql.functions import round as spark_round

category_performance = orders_df.groupBy("category").agg(
    count("order_id").alias("total_orders"),
    sum("total_amount").alias("total_revenue"),
    spark_round(avg("total_amount"), 2).alias("avg_order_value")
).orderBy(col("total_revenue").desc())

category_performance.show()

+-----------+------------+-----------------+---------------+
|   category|total_orders|    total_revenue|avg_order_value|
+-----------+------------+-----------------+---------------+
|Electronics|          44|15439.69999999999|          350.9|
|  Furniture|          24|6648.569999999999|         277.02|
|Accessories|          16|          1028.64|          64.29|
| Stationery|          16|545.4499999999999|          34.09|
+-----------+------------+-----------------+---------------+



In [32]:
from pyspark.sql.functions import round as spark_round

orders_with_discount = orders_df.withColumn(
    "discount_amount",
    spark_round(col("total_amount") * col("discount_percent") / 100, 2)
)

orders_with_discount.select("product_name", "total_amount", "discount_percent", "discount_amount").show()


+------------+------------+----------------+---------------+
|product_name|total_amount|discount_percent|discount_amount|
+------------+------------+----------------+---------------+
|      LAPTOP|     1619.98|              10|          162.0|
|  HEADPHONES|       79.99|               0|            0.0|
|    KEYBOARD|      127.47|              15|          19.12|
|     MONITOR|      239.99|              20|           48.0|
|       MOUSE|      142.45|               5|           7.12|
|       CHAIR|      179.99|              10|           18.0|
|        DESK|      699.98|               0|            0.0|
|        LAMP|      113.97|               5|            5.7|
|    NOTEBOOK|        49.9|               0|            0.0|
|     PEN_SET|       46.76|              10|           4.68|
|    BACKPACK|       50.99|              15|           7.65|
|WATER_BOTTLE|       59.97|               0|            0.0|
|      LAPTOP|      719.99|              20|          144.0|
|  HEADPHONES|      151.

In [33]:
# Add actual revenue
orders_with_metrics = orders_with_discount.withColumn(
    "actual_revenue",
    spark_round(col("total_amount") - col("discount_amount"), 2)
)

orders_with_metrics.select("product_name", "total_amount", "discount_amount", "actual_revenue").show()


+------------+------------+---------------+--------------+
|product_name|total_amount|discount_amount|actual_revenue|
+------------+------------+---------------+--------------+
|      LAPTOP|     1619.98|          162.0|       1457.98|
|  HEADPHONES|       79.99|            0.0|         79.99|
|    KEYBOARD|      127.47|          19.12|        108.35|
|     MONITOR|      239.99|           48.0|        191.99|
|       MOUSE|      142.45|           7.12|        135.33|
|       CHAIR|      179.99|           18.0|        161.99|
|        DESK|      699.98|            0.0|        699.98|
|        LAMP|      113.97|            5.7|        108.27|
|    NOTEBOOK|        49.9|            0.0|          49.9|
|     PEN_SET|       46.76|           4.68|         42.08|
|    BACKPACK|       50.99|           7.65|         43.34|
|WATER_BOTTLE|       59.97|            0.0|         59.97|
|      LAPTOP|      719.99|          144.0|        575.99|
|  HEADPHONES|      151.98|            7.6|        144.3

In [34]:
from pyspark.sql.functions import month, year, dayofweek

orders_with_dates = orders_df.withColumn("order_month", month("order_date")) \
    .withColumn("order_year", year("order_date")) \
    .withColumn("day_of_week", dayofweek("order_date"))

orders_with_dates.select("order_date", "order_month", "order_year", "day_of_week", "total_amount").show()


+----------+-----------+----------+-----------+------------+
|order_date|order_month|order_year|day_of_week|total_amount|
+----------+-----------+----------+-----------+------------+
|2025-01-15|          1|      2025|          4|     1619.98|
|2025-01-18|          1|      2025|          7|       79.99|
|2025-02-03|          2|      2025|          2|      127.47|
|2025-02-10|          2|      2025|          2|      239.99|
|2025-02-14|          2|      2025|          6|      142.45|
|2025-03-02|          3|      2025|          1|      179.99|
|2025-03-08|          3|      2025|          7|      699.98|
|2025-03-15|          3|      2025|          7|      113.97|
|2025-03-22|          3|      2025|          7|        49.9|
|2025-03-25|          3|      2025|          3|       46.76|
|2025-04-05|          4|      2025|          7|       50.99|
|2025-04-12|          4|      2025|          7|       59.97|
|2025-04-20|          4|      2025|          1|      719.99|
|2025-04-28|          4|

In [35]:
# Monthly revenue trend
monthly_revenue = orders_with_dates.groupBy("order_year", "order_month").agg(
    sum("total_amount").alias("monthly_revenue"),
    count("order_id").alias("order_count")
).orderBy("order_year", "order_month")

monthly_revenue.show()


+----------+-----------+------------------+-----------+
|order_year|order_month|   monthly_revenue|order_count|
+----------+-----------+------------------+-----------+
|      2025|          1|           1759.07|          4|
|      2025|          2|            792.83|          7|
|      2025|          3|           3938.43|         11|
|      2025|          4|            1745.9|         10|
|      2025|          5|1792.3500000000001|          9|
|      2025|          6|1183.8400000000001|         10|
|      2025|          7|1603.1800000000003|          9|
|      2025|          8|           2838.88|          9|
|      2025|          9|2027.8600000000001|         12|
|      2025|         10|3743.3199999999997|          9|
|      2025|         11|2236.7000000000003|         10|
+----------+-----------+------------------+-----------+



In [36]:
# Code - Count missing values
from pyspark.sql.functions import when, col

missing_count = orders_df.select(
    count(when(col("shipping_days").isNull(), 1)).alias("missing_shipping_days")
)
missing_count.show()

+---------------------+
|missing_shipping_days|
+---------------------+
|                   19|
+---------------------+



In [37]:
orders_cleaned = orders_df.dropna(subset=["shipping_days"])
print(f"Orders after dropping nulls: {orders_cleaned.count()}")


Orders after dropping nulls: 81


In [38]:
# Fill missing values with average
avg_shipping = orders_df.select(avg("shipping_days")).collect()[0][0]
print(f"Average shipping days: {round(avg_shipping, 2)}")

orders_filled = orders_df.fillna({"shipping_days": round(avg_shipping, 0)})
orders_filled.select("order_id", "shipping_days").show()

Average shipping days: 6.54
+--------+-------------+
|order_id|shipping_days|
+--------+-------------+
|    1001|            5|
|    1002|            3|
|    1003|            7|
|    1004|            4|
|    1005|            7|
|    1006|           12|
|    1007|            8|
|    1008|           14|
|    1009|            2|
|    1010|            7|
|    1011|            6|
|    1012|            5|
|    1013|            3|
|    1014|            9|
|    1015|            7|
|    1016|           11|
|    1017|            4|
|    1018|            7|
|    1019|           13|
|    1020|            5|
+--------+-------------+
only showing top 20 rows



In [39]:
status_distribution = orders_df.groupBy("status").agg(
    count("order_id").alias("order_count"),
    sum("total_amount").alias("total_revenue")
).orderBy(col("order_count").desc())

status_distribution.show()

+---------+-----------+------------------+
|   status|order_count|     total_revenue|
+---------+-----------+------------------+
|Completed|         71|19594.179999999993|
|  Shipped|         12|2584.7200000000003|
|  Pending|          9|1224.3200000000002|
|Cancelled|          8|259.14000000000004|
+---------+-----------+------------------+



In [40]:
# Analyze cancelled orders
cancelled_orders = orders_df.filter(col("status") == "Cancelled")
cancelled_analysis = cancelled_orders.groupBy("category").agg(
    count("order_id").alias("cancelled_count"),
    spark_round(avg("total_amount"), 2).alias("avg_cancelled_value")
).orderBy(col("cancelled_count").desc())

cancelled_analysis.show()

+-----------+---------------+-------------------+
|   category|cancelled_count|avg_cancelled_value|
+-----------+---------------+-------------------+
| Stationery|              6|              26.19|
|Accessories|              2|              50.99|
+-----------+---------------+-------------------+



In [41]:
payment_analysis = orders_df.groupBy("payment_method").agg(
    count("order_id").alias("transaction_count"),
    sum("total_amount").alias("total_value"),
    spark_round(avg("total_amount"), 2).alias("avg_transaction_value")
).orderBy(col("total_value").desc())

payment_analysis.show()


+--------------+-----------------+------------------+---------------------+
|payment_method|transaction_count|       total_value|avg_transaction_value|
+--------------+-----------------+------------------+---------------------+
|   Credit Card|               26|11457.329999999998|               440.67|
| Bank Transfer|               23| 4720.719999999998|               205.25|
|        PayPal|               26| 4248.359999999999|                163.4|
|    Debit Card|               25|3235.9499999999994|               129.44|
+--------------+-----------------+------------------+---------------------+



In [42]:
# Code - Customer lifetime value
customer_value = orders_df.groupBy("customer_id").agg(
    count("order_id").alias("total_orders"),
    sum("total_amount").alias("lifetime_value"),
    spark_round(avg("total_amount"), 2).alias("avg_order_value")
).orderBy(col("lifetime_value").desc())

customer_value.show(20)

+-----------+------------+------------------+---------------+
|customer_id|total_orders|    lifetime_value|avg_order_value|
+-----------+------------+------------------+---------------+
|       5005|           2|           1949.97|         974.99|
|       5023|           2|           1739.95|         869.98|
|       5046|           2|           1735.96|         867.98|
|       5045|           3|           1282.45|         427.48|
|       5041|           3|           1272.46|         424.15|
|       5027|           2|           1209.96|         604.98|
|       5025|           2|           1064.98|         532.49|
|       5042|           2|           1051.97|         525.99|
|       5019|           3|           1001.02|         333.67|
|       5018|           2|            933.11|         466.56|
|       5011|           3|            851.95|         283.98|
|       5026|           2|            690.96|         345.48|
|       5031|           2| 652.0600000000001|         326.03|
|       

In [43]:
# Add customer segmentation
from pyspark.sql.functions import when

customer_segments = customer_value.withColumn(
    "customer_segment",
    when(col("lifetime_value") > 1000, "VIP")
    .when(col("lifetime_value") > 500, "High Value")
    .when(col("lifetime_value") > 200, "Regular")
    .otherwise("New")
)

customer_segments.show(20)

+-----------+------------+------------------+---------------+----------------+
|customer_id|total_orders|    lifetime_value|avg_order_value|customer_segment|
+-----------+------------+------------------+---------------+----------------+
|       5005|           2|           1949.97|         974.99|             VIP|
|       5023|           2|           1739.95|         869.98|             VIP|
|       5046|           2|           1735.96|         867.98|             VIP|
|       5045|           3|           1282.45|         427.48|             VIP|
|       5041|           3|           1272.46|         424.15|             VIP|
|       5027|           2|           1209.96|         604.98|             VIP|
|       5025|           2|           1064.98|         532.49|             VIP|
|       5042|           2|           1051.97|         525.99|             VIP|
|       5019|           3|           1001.02|         333.67|             VIP|
|       5018|           2|            933.11|       

In [44]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Calculate revenue per product
product_revenue = orders_df.groupBy("category", "product_name").agg(
    sum("total_amount").alias("product_revenue")
)


In [45]:

# Create window specification
window_spec = Window.partitionBy("category").orderBy(col("product_revenue").desc())

# Add ranking
ranked_products = product_revenue.withColumn(
    "rank_in_category",
    row_number().over(window_spec)
)

ranked_products.show(20)


+-----------+------------+------------------+----------------+
|   category|product_name|   product_revenue|rank_in_category|
+-----------+------------+------------------+----------------+
|Accessories|    BACKPACK|            536.89|               1|
|Accessories|WATER_BOTTLE|            491.75|               2|
|Electronics|      LAPTOP|          10079.88|               1|
|Electronics|     MONITOR|2504.8999999999996|               2|
|Electronics|  HEADPHONES|           1275.83|               3|
|Electronics|    KEYBOARD| 884.8100000000001|               4|
|Electronics|       MOUSE| 694.2800000000001|               5|
|  Furniture|        DESK|4024.8800000000006|               1|
|  Furniture|       CHAIR|            1819.9|               2|
|  Furniture|        LAMP| 803.7900000000001|               3|
| Stationery|    NOTEBOOK|            336.33|               1|
| Stationery|     PEN_SET|            209.12|               2|
+-----------+------------+------------------+----------

In [46]:
top_3_per_category = ranked_products.filter(col("rank_in_category") <= 3)
top_3_per_category.show()

+-----------+------------+------------------+----------------+
|   category|product_name|   product_revenue|rank_in_category|
+-----------+------------+------------------+----------------+
|Accessories|    BACKPACK|            536.89|               1|
|Accessories|WATER_BOTTLE|            491.75|               2|
|Electronics|      LAPTOP|          10079.88|               1|
|Electronics|     MONITOR|2504.8999999999996|               2|
|Electronics|  HEADPHONES|           1275.83|               3|
|  Furniture|        DESK|4024.8800000000006|               1|
|  Furniture|       CHAIR|            1819.9|               2|
|  Furniture|        LAMP| 803.7900000000001|               3|
| Stationery|    NOTEBOOK|            336.33|               1|
| Stationery|     PEN_SET|            209.12|               2|
+-----------+------------+------------------+----------------+



In [47]:
# Calculate overall business metrics
total_revenue = orders_df.agg(sum("total_amount")).collect()[0][0]
total_orders_count = orders_df.count()
avg_order_value = orders_df.agg(avg("total_amount")).collect()[0][0]
total_customers = orders_df.select("customer_id").distinct().count()


In [48]:

# Print dashboard
print("=" * 60)
print("RETAILPRO E-COMMERCE DASHBOARD")
print("=" * 60)
print(f"Total Revenue: ${total_revenue:,.2f}")
print(f"Total Orders: {total_orders_count:,}")
print(f"Average Order Value: ${avg_order_value:.2f}")
print(f"Total Customers: {total_customers:,}")
print(f"Revenue per Customer: ${total_revenue/total_customers:.2f}")
print("=" * 60)

RETAILPRO E-COMMERCE DASHBOARD
Total Revenue: $23,662.36
Total Orders: 100
Average Order Value: $236.62
Total Customers: 46
Revenue per Customer: $514.40


In [49]:
# Save top products to CSV
top_3_per_category.coalesce(1).write.mode("overwrite").csv("top_products_by_category", header=True)

# Save customer segments to Parquet (more efficient format)
customer_segments.write.mode("overwrite").parquet("customer_segments")

# Save country revenue to JSON
country_revenue.write.mode("overwrite").json("country_revenue")

In [50]:
!ls

country_revenue    ecommerce_orders.csv    sample_data
customer_segments  ecommerce_orders.csv.1  top_products_by_category


In [51]:
!cat ecommerce_orders.csv

order_id,order_date,customer_id,product_name,category,quantity,unit_price,discount_percent,total_amount,country,status,payment_method,shipping_days
1001,2025-01-15,5023,LAPTOP,Electronics,2,899.99,10,1619.98,USA,Completed,Credit Card,5
1002,2025-01-18,5012,HEADPHONES,Electronics,1,79.99,0,79.99,UK,Completed,PayPal,3
1003,2025-02-03,5045,KEYBOARD,Electronics,3,49.99,15,127.47,Canada,Shipped,Debit Card,7
1004,2025-02-10,5008,MONITOR,Electronics,1,299.99,20,239.99,Germany,Completed,Bank Transfer,4
1005,2025-02-14,5034,MOUSE,Electronics,5,29.99,5,142.45,France,Pending,Credit Card,
1006,2025-03-02,5019,CHAIR,Furniture,1,199.99,10,179.99,Australia,Completed,PayPal,12
1007,2025-03-08,5041,DESK,Furniture,2,349.99,0,699.98,Japan,Completed,Credit Card,8
1008,2025-03-15,5007,LAMP,Furniture,3,39.99,5,113.97,India,Completed,Debit Card,14
1009,2025-03-22,5028,NOTEBOOK,Stationery,10,4.99,0,49.90,USA,Completed,PayPal,2
1010,2025-03-25,5016,PEN_SET,Stationery,4,12.99,10,46.76,UK,Cancelled,Credit Card,
