In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FoodSalesApp") \
    .getOrCreate()

In [2]:
spark

Sales Dataframe

In [4]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType
schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order",StringType(),True)
])

sales_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("sales.csv")
display(sales_df)

DataFrame[product_id: int, customer_id: string, order_date: date, location: string, source_order: string]

In [5]:
sales_df.show()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

In [7]:
## adding year month and quarter
from pyspark.sql.functions import month, year,quarter
sales_df = sales_df.withColumn("order_year",year(sales_df.order_date))

sales_df = sales_df.withColumn("order_month",month(sales_df.order_date))
sales_df = sales_df.withColumn("order_quarter",quarter(sales_df.order_date))
sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

Menu Dataframe

In [15]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,FloatType
schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",FloatType(),True)
])

menu_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("menu.csv")
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|      null|        Name| null|
|         1|       PIZZA|100.0|
|         2|     Chowmin|150.0|
|         3|    sandwich|120.0|
|         4|        Dosa|110.0|
|         5|     Biryani| 80.0|
|         6|       Pasta|180.0|
+----------+------------+-----+



In [16]:
## Total Amount spent by each customer
total_amount_spent = (sales_df.join(menu_df,'product_id').groupBy('customer_id').agg({'price':'sum'}).orderBy('customer_id'))
total_amount_spent.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



In [19]:
## Total Amount spent by each food category
total_amount_spent_food = (sales_df.join(menu_df,'product_id').groupBy('product_name').agg({'price':'sum'}).orderBy('product_name'))
total_amount_spent_food.show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       PIZZA|    2100.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



In [20]:
## Total Amount of Sales in each month
sales_by_month = (sales_df.join(menu_df,'product_id').groupBy('order_month').agg({'price':'sum'}).orderBy('order_month'))
sales_by_month.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          5|    2960.0|
|          6|    2960.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



In [21]:
## Yearly sales
yearly_sales = (sales_df.join(menu_df,'product_id').groupBy('order_year').agg({'price':'sum'}).orderBy('order_year'))
yearly_sales.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



In [23]:
## Quarterly sales
quarterly_sales = (sales_df.join(menu_df,'product_id').groupBy('order_quarter').agg({'price':'sum'}).orderBy('order_quarter'))
quarterly_sales.show()

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            2|    5920.0|
|            3|     910.0|
|            4|     910.0|
+-------------+----------+



In [33]:
## How many times each product purchased
product_order_count = sales_df.join(menu_df,'product_id').groupBy('product_name').agg({'product_id':'count'}).withColumnRenamed('count(product_id)', 'product_count').orderBy('product_count',ascending=0)
product_order_count.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



In [34]:
## Top 3 ordered items
product_order_count.limit(3).show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
+------------+-------------+



In [35]:
## No of visits by customer to restaurant
from pyspark.sql.functions import countDistinct
visits_to_restaurant = sales_df.filter(sales_df.source_order=='Restaurant').groupBy('customer_id').agg(countDistinct('order_date'))
visits_to_restaurant.show()

+-----------+-----------------+
|customer_id|count(order_date)|
+-----------+-----------------+
|          E|                5|
|          B|                6|
|          D|                1|
|          C|                3|
|          A|                6|
+-----------+-----------------+



In [36]:
## total sales by each country
sales_by_country = sales_df.join(menu_df,'product_id').groupBy('location').agg({'price':'sum'})
sales_by_country.show()

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|     USA|    2460.0|
|      UK|    7020.0|
+--------+----------+

