In [34]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ColabSpark").getOrCreate()
print("✅ Spark Version:", spark.version)

✅ Spark Version: 3.5.1


In [46]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# creating the schema for the sales data
sales_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True)
])

In [47]:
sales_df = spark.read.option("inferschema", "true")\
                      .schema(sales_schema)\
                      .csv("./sales.csv.txt")

sales_df.show()


+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

In [48]:
from pyspark.sql.functions import year, month, quarter

# Extracting year month and quater from order_date
new_sales_df = sales_df.withColumn("order_year", year(sales_df.order_date))
new_sales_df = new_sales_df.withColumn("order_month", month(sales_df.order_date))
new_sales_df = new_sales_df.withColumn("order_quater", quarter(sales_df.order_date))

new_sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quater|
+----------+-----------+----------+--------+------------+----------+-----------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|           1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|           1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|           1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|           1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|           1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|           1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|           1|
|         2|          B|2023-01-02|   India|      Swiggy|   

In [49]:
# creating the menu data schema
menu_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True)
])

In [50]:
menu_df = spark.read.schema(menu_schema).csv("./menu.csv.txt")
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



In [51]:
joined_table = new_sales_df.join(menu_df,
                                 "product_id",
                                 "inner")
joined_table.show()

+----------+-----------+----------+--------+------------+----------+-----------+------------+------------+-----+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quater|product_name|price|
+----------+-----------+----------+--------+------------+----------+-----------+------------+------------+-----+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|           1|       PIZZA|  100|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|           1|     Chowmin|  150|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|           1|     Chowmin|  150|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|           1|    sandwich|  120|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|           1|    sandwich|  120|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|           1|   

In [52]:
# total sales by customer
from pyspark.sql.functions import sum

joined_table.groupBy("customer_id").agg(sum("price").alias("total spent")).orderBy("customer_id").show()

+-----------+-----------+
|customer_id|total spent|
+-----------+-----------+
|          A|     4260.0|
|          B|     4440.0|
|          C|     2400.0|
|          D|     1200.0|
|          E|     2040.0|
+-----------+-----------+



In [53]:
# total product revenue
joined_table.groupBy("product_name").agg(sum("price").alias("total sales")).orderBy("product_name").show()

+------------+-----------+
|product_name|total sales|
+------------+-----------+
|     Biryani|      480.0|
|     Chowmin|     3600.0|
|        Dosa|     1320.0|
|       PIZZA|     2100.0|
|       Pasta|     1080.0|
|    sandwich|     5760.0|
+------------+-----------+



In [26]:
# Total sales per month
joined_table.groupBy("order_month").agg({"price":"sum"}).orderBy("order_month").show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          5|    2960.0|
|          6|    2960.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



In [28]:
# Yearly Sales
joined_table.groupBy("order_year").agg({"price":"sum"}).orderBy("order_year").show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



In [32]:
from pyspark.sql.functions import count

# Count of product sales
joined_table.groupBy("product_id", "product_name")\
            .agg(count("product_id")\
            .alias("product_count"))\
            .orderBy("product_count", ascending=0)\
            .drop("product_id")\
            .limit(5)\
            .show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
+------------+-------------+



In [42]:
# showing the number of different dates a customer orders at the resturant
from pyspark.sql.functions import countDistinct
df = sales_df.filter(sales_df.source_order == 'Restaurant')\
                              .groupBy("customer_id")\
                              .agg(countDistinct('order_date'))\
                              .show()

+-----------+--------------------------+
|customer_id|count(DISTINCT order_date)|
+-----------+--------------------------+
|          E|                         5|
|          B|                         6|
|          D|                         1|
|          C|                         3|
|          A|                         6|
+-----------+--------------------------+

