In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum as _sum


# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("ECommerce-Capstone")
    .getOrCreate()
)

# --- Customers (customer_id is unique in this sample) -----------------------
customers_cols = ["customer_id", "name", "city", "age"]
customers_data = [
    (1, "Rahul Sharma", "Bangalore", 28),
    (2, "Priya Singh", "Delhi", 32),
    (3, "Aman Kumar", "Hyderabad", 25),
    (4, "Sneha Reddy", "Chennai", 35),
    (5, "Arjun Mehta", "Mumbai", 30),
    (6, "Divya Nair", "Delhi", 29),
]
customers_df = spark.createDataFrame(customers_data, customers_cols)

# --- Products (price is a unit price; revenue below is price * quantity) ----
products_cols = ["product_id", "product_name", "category", "price"]
products_data = [
    (101, "Laptop", "Electronics", 55000),
    (102, "Mobile", "Electronics", 25000),
    (103, "Headphones", "Electronics", 3000),
    (104, "Chair", "Furniture", 5000),
    (105, "Book", "Stationery", 700),
    (106, "Shoes", "Fashion", 2500),
]
products_df = spark.createDataFrame(products_data, products_cols)

# --- Orders ------------------------------------------------------------------
# Order 1007 references customer_id 7, which has no row in customers_data,
# so it is an orphan order: joins against customers treat it differently
# depending on the join type.
orders_cols = ["order_id", "customer_id", "product_id", "quantity"]
orders_data = [
    (1001, 1, 101, 1),
    (1002, 2, 102, 2),
    (1003, 1, 103, 3),
    (1004, 3, 104, 1),
    (1005, 5, 105, 5),
    (1006, 6, 106, 2),
    (1007, 7, 101, 1),
]
orders_df = spark.createDataFrame(orders_data, orders_cols)

# Preview all three tables.
customers_df.show()
products_df.show()
orders_df.show()

+-----------+------------+---------+---+
|customer_id|        name|     city|age|
+-----------+------------+---------+---+
|          1|Rahul Sharma|Bangalore| 28|
|          2| Priya Singh|    Delhi| 32|
|          3|  Aman Kumar|Hyderabad| 25|
|          4| Sneha Reddy|  Chennai| 35|
|          5| Arjun Mehta|   Mumbai| 30|
|          6|  Divya Nair|    Delhi| 29|
+-----------+------------+---------+---+

+----------+------------+-----------+-----+
|product_id|product_name|   category|price|
+----------+------------+-----------+-----+
|       101|      Laptop|Electronics|55000|
|       102|      Mobile|Electronics|25000|
|       103|  Headphones|Electronics| 3000|
|       104|       Chair|  Furniture| 5000|
|       105|        Book| Stationery|  700|
|       106|       Shoes|    Fashion| 2500|
+----------+------------+-----------+-----+

+--------+-----------+----------+--------+
|order_id|customer_id|product_id|quantity|
+--------+-----------+----------+--------+
|    1001|         

In [19]:
# 1 Name and city of every customer.
customers_df.select(col("name"), col("city")).show()
# 2 The set of distinct product categories.
products_df.select("category").dropDuplicates().show()
# 3 Customers strictly older than 30.
customers_df.where(col("age") > 30).show()

+------------+---------+
|        name|     city|
+------------+---------+
|Rahul Sharma|Bangalore|
| Priya Singh|    Delhi|
|  Aman Kumar|Hyderabad|
| Sneha Reddy|  Chennai|
| Arjun Mehta|   Mumbai|
|  Divya Nair|    Delhi|
+------------+---------+

+-----------+
|   category|
+-----------+
|Electronics|
| Stationery|
|    Fashion|
|  Furniture|
+-----------+

+-----------+-----------+-------+---+
|customer_id|       name|   city|age|
+-----------+-----------+-------+---+
|          2|Priya Singh|  Delhi| 32|
|          4|Sneha Reddy|Chennai| 35|
+-----------+-----------+-------+---+



In [20]:
# 4 Order count per customer_id. This is computed from orders alone, so ids
#   without a customers row (e.g. 7) are counted too.
(
    orders_df
    .groupBy("customer_id")
    .count()
    .select("customer_id", col("count").alias("num_orders"))
    .show()
)
# 5 Average customer age per city.
(
    customers_df
    .groupBy("city")
    .agg(avg(col("age")).alias("avg_age"))
    .show()
)
# 6 Revenue (price * quantity) per product.
#   `orders_with_product` (orders left-joined to products) is reused by later cells.
orders_with_product = orders_df.join(products_df, on="product_id", how="left")
(
    orders_with_product
    .withColumn("revenue", col("price") * col("quantity"))
    .groupBy("product_id", "product_name")
    .agg(_sum("revenue").alias("total_revenue"))
    .show()
)

+-----------+----------+
|customer_id|num_orders|
+-----------+----------+
|          1|         2|
|          2|         1|
|          7|         1|
|          6|         1|
|          5|         1|
|          3|         1|
+-----------+----------+

+---------+-------+
|     city|avg_age|
+---------+-------+
|Bangalore|   28.0|
|    Delhi|   30.5|
|Hyderabad|   25.0|
|  Chennai|   35.0|
|   Mumbai|   30.0|
+---------+-------+

+----------+------------+-------------+
|product_id|product_name|total_revenue|
+----------+------------+-------------+
|       103|  Headphones|         9000|
|       101|      Laptop|       110000|
|       102|      Mobile|        50000|
|       104|       Chair|         5000|
|       106|       Shoes|         5000|
|       105|        Book|         3500|
+----------+------------+-------------+



In [21]:
# 7 Each customer with the product name and quantity of every order they placed.
#   Left joins keep customers who never ordered; their product columns are NULL.
(
    customers_df
    .join(orders_df, "customer_id", "left")
    .join(products_df, "product_id", "left")
    .select("customer_id", "name", "product_name", "quantity")
    .show()
)
# 8 Every order, enriched with its product name and unit price.
(
    orders_df
    .join(products_df, "product_id", "left")
    .select("order_id", "customer_id", "product_id", "product_name", "price", "quantity")
    .show()
)
# 9 Customers with no orders at all.
customers_df.join(orders_df, "customer_id", "left_anti").show()
# 10 Products that were never ordered.
products_df.join(orders_df, "product_id", "left_anti").show()

+-----------+------------+------------+--------+
|customer_id|        name|product_name|quantity|
+-----------+------------+------------+--------+
|          1|Rahul Sharma|  Headphones|       3|
|          1|Rahul Sharma|      Laptop|       1|
|          3|  Aman Kumar|       Chair|       1|
|          2| Priya Singh|      Mobile|       2|
|          6|  Divya Nair|       Shoes|       2|
|          5| Arjun Mehta|        Book|       5|
|          4| Sneha Reddy|        NULL|    NULL|
+-----------+------------+------------+--------+

+--------+-----------+----------+------------+-----+--------+
|order_id|customer_id|product_id|product_name|price|quantity|
+--------+-----------+----------+------------+-----+--------+
|    1003|          1|       103|  Headphones| 3000|       3|
|    1001|          1|       101|      Laptop|55000|       1|
|    1002|          2|       102|      Mobile|25000|       2|
|    1004|          3|       104|       Chair| 5000|       1|
|    1006|          6|    

In [22]:
# 11 Top 3 most expensive products that were ordered at least once.
#    A left_semi join keeps each product row once if any order matches it, so
#    there is no fan-out over multiple orders and no .distinct() pass is needed.
(
    products_df.join(orders_df, on="product_id", how="left_semi")
    .select("product_id", "product_name", "price")
    # product_id breaks price ties so the top-3 cut is deterministic.
    .orderBy(col("price").desc(), col("product_id"))
    .limit(3)
    .show()
)

# Revenue per order line. This is derived once from `orders_with_product`
# (orders left-joined to products, built for #6) and shared by #12 and #13.
order_revenue = orders_with_product.withColumn("revenue", col("price") * col("quantity"))

# 12 Total revenue per product category.
(
    order_revenue
    .groupBy("category")
    .agg(_sum("revenue").alias("total_revenue"))
    .show()
)

# 13 Total spend per customer, highest first; customers with no orders show 0.
customers_spend = (
    order_revenue
    .groupBy("customer_id")
    .agg(_sum("revenue").alias("total_spent"))
)
(
    customers_df.join(customers_spend, on="customer_id", how="left")
    .fillna({"total_spent": 0})
    # customer_id breaks ties between equal totals so row order is reproducible.
    .orderBy(col("total_spent").desc(), col("customer_id"))
    .select("customer_id", "name", "city", "total_spent")
    .show()
)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|       101|      Laptop|55000|
|       102|      Mobile|25000|
|       104|       Chair| 5000|
+----------+------------+-----+

+-----------+-------------+
|   category|total_revenue|
+-----------+-------------+
| Stationery|         3500|
|    Fashion|         5000|
|Electronics|       169000|
|  Furniture|         5000|
+-----------+-------------+

+-----------+------------+---------+-----------+
|customer_id|        name|     city|total_spent|
+-----------+------------+---------+-----------+
|          1|Rahul Sharma|Bangalore|      64000|
|          2| Priya Singh|    Delhi|      50000|
|          3|  Aman Kumar|Hyderabad|       5000|
|          6|  Divya Nair|    Delhi|       5000|
|          5| Arjun Mehta|   Mumbai|       3500|
|          4| Sneha Reddy|  Chennai|          0|
+-----------+------------+---------+-----------+



In [23]:
# NOTE(review): this cell repeats the previous cell (In [22], questions 11-13)
# verbatim. Keep one copy and delete the other so Run All does not do the work twice.

# 11 Top 3 most expensive products that were ordered at least once.
#    A left_semi join keeps each product row once if any order matches it, so
#    there is no fan-out over multiple orders and no .distinct() pass is needed.
(
    products_df.join(orders_df, on="product_id", how="left_semi")
    .select("product_id", "product_name", "price")
    # product_id breaks price ties so the top-3 cut is deterministic.
    .orderBy(col("price").desc(), col("product_id"))
    .limit(3)
    .show()
)

# Revenue per order line. This is derived once from `orders_with_product`
# (orders left-joined to products, built for #6) and shared by #12 and #13.
order_revenue = orders_with_product.withColumn("revenue", col("price") * col("quantity"))

# 12 Total revenue per product category.
(
    order_revenue
    .groupBy("category")
    .agg(_sum("revenue").alias("total_revenue"))
    .show()
)

# 13 Total spend per customer, highest first; customers with no orders show 0.
customers_spend = (
    order_revenue
    .groupBy("customer_id")
    .agg(_sum("revenue").alias("total_spent"))
)
(
    customers_df.join(customers_spend, on="customer_id", how="left")
    .fillna({"total_spent": 0})
    # customer_id breaks ties between equal totals so row order is reproducible.
    .orderBy(col("total_spent").desc(), col("customer_id"))
    .select("customer_id", "name", "city", "total_spent")
    .show()
)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|       101|      Laptop|55000|
|       102|      Mobile|25000|
|       104|       Chair| 5000|
+----------+------------+-----+

+-----------+-------------+
|   category|total_revenue|
+-----------+-------------+
| Stationery|         3500|
|    Fashion|         5000|
|Electronics|       169000|
|  Furniture|         5000|
+-----------+-------------+

+-----------+------------+---------+-----------+
|customer_id|        name|     city|total_spent|
+-----------+------------+---------+-----------+
|          1|Rahul Sharma|Bangalore|      64000|
|          2| Priya Singh|    Delhi|      50000|
|          3|  Aman Kumar|Hyderabad|       5000|
|          6|  Divya Nair|    Delhi|       5000|
|          5| Arjun Mehta|   Mumbai|       3500|
|          4| Sneha Reddy|  Chennai|          0|
+-----------+------------+---------+-----------+



In [24]:
# 14 Register the DataFrames as temporary views so Spark SQL can query them by name.
customers_df.createOrReplaceTempView("customers")
products_df.createOrReplaceTempView("products")
orders_df.createOrReplaceTempView("orders")


# 15 Top 2 cities by revenue.
#    The inner JOIN on customers drops orders whose customer_id has no customer row.
#    `city` breaks revenue ties so that LIMIT returns a deterministic pair.
spark.sql("""
SELECT c.city, SUM(p.price * o.quantity) AS total_revenue
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
GROUP BY c.city
ORDER BY total_revenue DESC, city
LIMIT 2
""").show()


# 16 Customers whose total spend exceeds 50000, highest spenders first.
spark.sql("""
SELECT c.customer_id, c.name, SUM(p.price * o.quantity) AS total_spent
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
GROUP BY c.customer_id, c.name
HAVING total_spent > 50000
ORDER BY total_spent DESC, customer_id
""").show()


# 17 The single highest-revenue product category.
#    `category` breaks ties so LIMIT 1 always picks the same row.
spark.sql("""
SELECT p.category, SUM(p.price * o.quantity) AS total_revenue
FROM orders o
JOIN products p ON o.product_id = p.product_id
GROUP BY p.category
ORDER BY total_revenue DESC, category
LIMIT 1
""").show()




+---------+-------------+
|     city|total_revenue|
+---------+-------------+
|Bangalore|        64000|
|    Delhi|        55000|
+---------+-------------+

+-----------+------------+-----------+
|customer_id|        name|total_spent|
+-----------+------------+-----------+
|          1|Rahul Sharma|      64000|
+-----------+------------+-----------+

+-----------+-------------+
|   category|total_revenue|
+-----------+-------------+
|Electronics|       169000|
+-----------+-------------+

