In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, DateType, IntegerType, StringType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Sales Analysis") \
    .getOrCreate()

# Define the schema
schema = StructType([
    StructField("Product_id", IntegerType(), True),
    StructField("Customer_id", StringType(), True),
    StructField("Order_date", DateType(), True),
    StructField("Location", StringType(), True),
    StructField("Source_Order", StringType(), True)
])

# Read the CSV file with the defined schema
df = spark.read.format('csv') \
    .schema(schema) \
    .load('data/sales.csv')

# Show the first 5 rows of the DataFrame
df.show(5)


+----------+-----------+----------+--------+------------+
|Product_id|Customer_id|Order_date|Location|Source_Order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
+----------+-----------+----------+--------+------------+
only showing top 5 rows



In [4]:
# Add " year , Month , Quarter " Columns

from pyspark.sql.functions import year , month , quarter
df_t1 = df.withColumn('Year' , year(df['Order_date']))
df_t2 = df_t1.withColumn('month' , month(df['Order_date']))
df_t3 = df_t2.withColumn('quarter' , quarter(df['Order_date']))

df_t3.show(4)

+----------+-----------+----------+--------+------------+----+-----+-------+
|Product_id|Customer_id|Order_date|Location|Source_Order|Year|month|quarter|
+----------+-----------+----------+--------+------------+----+-----+-------+
|         1|          A|2023-01-01|   India|      Swiggy|2023|    1|      1|
|         2|          A|2022-01-01|   India|      Swiggy|2022|    1|      1|
|         2|          A|2023-01-07|   India|      Swiggy|2023|    1|      1|
|         3|          A|2023-01-10|   India|  Restaurant|2023|    1|      1|
+----------+-----------+----------+--------+------------+----+-----+-------+
only showing top 4 rows



In [51]:
# Importing Menu Data

schema = StructType([
  StructField("Product_id" , IntegerType() , True),
  StructField("Product_Name" , StringType() , True),
  StructField("Price" , StringType() , True),

  ])

Menu = spark.read.format('csv') \
  .schema(schema) \
  .load('data/menu.csv.')

Menu.show(5)

+----------+------------+-----+
|Product_id|Product_Name|Price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
+----------+------------+-----+
only showing top 5 rows



In [7]:
# Merge Sales & Menu

Sales = df_t3
merge = Sales.join(Menu , 'Product_id' , 'inner')
merge.show(4)

+----------+-----------+----------+--------+------------+----+-----+-------+------------+-----+
|Product_id|Customer_id|Order_date|Location|Source_Order|Year|month|quarter|Product_Name|Price|
+----------+-----------+----------+--------+------------+----+-----+-------+------------+-----+
|         1|          A|2023-01-01|   India|      Swiggy|2023|    1|      1|       PIZZA|  100|
|         2|          A|2022-01-01|   India|      Swiggy|2022|    1|      1|     Chowmin|  150|
|         2|          A|2023-01-07|   India|      Swiggy|2023|    1|      1|     Chowmin|  150|
|         3|          A|2023-01-10|   India|  Restaurant|2023|    1|      1|    sandwich|  120|
+----------+-----------+----------+--------+------------+----+-----+-------+------------+-----+
only showing top 4 rows



In [41]:
# Total amount Spent by Customer

from pyspark.sql.functions import sum, desc

total_amt_Customer = merge.groupBy('Customer_id').agg(sum('Price').alias('total')).orderBy(desc('total'))
total_amt_Customer.show()


+-----------+------+
|Customer_id| total|
+-----------+------+
|          B|4440.0|
|          A|4260.0|
|          C|2400.0|
|          E|2040.0|
|          D|1200.0|
+-----------+------+



In [42]:
# Total amount Spent by each food Category
total_amt_food = merge.groupBy('Product_Name').agg(sum('Price').alias('Total')).orderBy(desc('Total'))
total_amt_food.show()

+------------+------+
|Product_Name| Total|
+------------+------+
|    sandwich|5760.0|
|     Chowmin|3600.0|
|       PIZZA|2100.0|
|        Dosa|1320.0|
|       Pasta|1080.0|
|     Biryani| 480.0|
+------------+------+



In [43]:
# Total amount of Sales in each Month

total_amt_Month = merge.groupBy('month').agg(sum('Price').alias('Total')).orderBy('month')
total_amt_Month.show()

+-----+------+
|month| Total|
+-----+------+
|    1|2960.0|
|    2|2730.0|
|    3| 910.0|
|    5|2960.0|
|    6|2960.0|
|    7| 910.0|
|   11| 910.0|
+-----+------+



In [44]:
# Yearly Sales

total_amt_Year = merge.groupBy('Year').agg(sum('Price').alias('Total')).orderBy('Year')
total_amt_Year.show()

+----+------+
|Year| Total|
+----+------+
|2022|4350.0|
|2023|9990.0|
+----+------+



In [45]:
# quarter Sales

total_amt_quarter = merge.groupBy('quarter').agg(sum('Price').alias('Total')).orderBy('quarter')
total_amt_quarter.show()

+-------+------+
|quarter| Total|
+-------+------+
|      1|6600.0|
|      2|5920.0|
|      3| 910.0|
|      4| 910.0|
+-------+------+



In [46]:
# How many times each product purchased ? 

product_purchased = merge.groupBy('Product_Name').agg(count('*').alias('cnt_orders')).orderBy(desc('cnt_orders'))
product_purchased.show()

+------------+----------+
|Product_Name|cnt_orders|
+------------+----------+
|    sandwich|        48|
|     Chowmin|        24|
|       PIZZA|        21|
|        Dosa|        12|
|       Pasta|         6|
|     Biryani|         6|
+------------+----------+



In [47]:
# Frequency of Customer visiting Restaurant

Customer_purchase1 = merge.filter(merge.Source_Order == 'Restaurant').groupBy('Customer_id').agg(countDistinct('Order_date').alias('cnt')).orderBy(desc('cnt'))
Customer_purchase1.show()

+-----------+---+
|Customer_id|cnt|
+-----------+---+
|          B|  6|
|          A|  6|
|          E|  5|
|          C|  3|
|          D|  1|
+-----------+---+



In [48]:
# Total Sales By each Country

Country_sales = merge.groupBy('Location').agg(sum('Price').alias('Total')).orderBy(desc('Total'))
Country_sales.show()

+--------+------+
|Location| Total|
+--------+------+
|      UK|7020.0|
|   India|4860.0|
|     USA|2460.0|
+--------+------+



In [49]:
# Total sales by Order Source

source_order_sales = merge.groupBy('Source_Order').agg(sum('Price').alias('Total')).orderBy(desc('Total'))
source_order_sales.show()

+------------+------+
|Source_Order| Total|
+------------+------+
|      Swiggy|6330.0|
|      zomato|4920.0|
|  Restaurant|3090.0|
+------------+------+



In [19]:
# Top 5 Customers by Total Spend
top_customers = merge.groupBy('Customer_id').agg(sum('Price').alias('Total_Spend')).orderBy(desc('Total_Spend')).limit(5)
top_customers.show()

+-----------+-----------+
|Customer_id|Total_Spend|
+-----------+-----------+
|          B|     4440.0|
|          A|     4260.0|
|          C|     2400.0|
|          E|     2040.0|
|          D|     1200.0|
+-----------+-----------+



In [29]:
# Average Spend per Order by Customer
from pyspark.sql.functions import avg, format_number

avg_spend_per_order = merge.groupBy('Customer_id') \
    .agg(format_number(avg('Price'), 2).alias('Avg_Spend')) \
    .orderBy(desc('Avg_Spend'))

avg_spend_per_order.show()

+-----------+---------+
|Customer_id|Avg_Spend|
+-----------+---------+
|          C|   133.33|
|          A|   129.09|
|          B|   123.33|
|          E|   113.33|
|          D|   100.00|
+-----------+---------+



In [21]:
# Product Popularity by Month
product_popularity_month = merge.groupBy('Product_Name', 'month').agg(count('*').alias('Order_Count')).orderBy('month', desc('Order_Count'))
product_popularity_month.show()

+------------+-----+-----------+
|Product_Name|month|Order_Count|
+------------+-----+-----------+
|    sandwich|    1|         10|
|     Chowmin|    1|          6|
|       PIZZA|    1|          5|
|       Pasta|    1|          2|
|    sandwich|    2|          9|
|        Dosa|    2|          6|
|       PIZZA|    2|          3|
|     Chowmin|    2|          3|
|     Biryani|    2|          3|
|    sandwich|    3|          3|
|        Dosa|    3|          2|
|     Chowmin|    3|          1|
|     Biryani|    3|          1|
|       PIZZA|    3|          1|
|    sandwich|    5|         10|
|     Chowmin|    5|          6|
|       PIZZA|    5|          5|
|       Pasta|    5|          2|
|    sandwich|    6|         10|
|     Chowmin|    6|          6|
+------------+-----+-----------+
only showing top 20 rows



In [22]:
# Sales Trend Over Time (Daily Sales)
from pyspark.sql.functions import to_date
daily_sales = merge.withColumn('Order_Date', to_date('Order_date')).groupBy('Order_Date').agg(sum('Price').alias('Total_Sales')).orderBy('Order_Date')
daily_sales.show()

+----------+-----------+
|Order_Date|Total_Sales|
+----------+-----------+
|2022-01-01|      300.0|
|2022-01-07|      180.0|
|2022-01-11|      240.0|
|2022-02-01|      350.0|
|2022-02-05|      350.0|
|2022-02-06|      350.0|
|2022-03-01|      260.0|
|2022-03-16|      120.0|
|2022-05-05|      300.0|
|2022-05-07|      180.0|
|2022-05-11|      240.0|
|2022-06-06|      300.0|
|2022-06-11|      420.0|
|2022-07-05|      260.0|
|2022-07-16|      120.0|
|2022-11-06|      260.0|
|2022-11-16|      120.0|
|2023-01-01|      540.0|
|2023-01-02|      300.0|
|2023-01-04|      200.0|
+----------+-----------+
only showing top 20 rows



In [30]:
# Percentage Contribution by Each Food Category
from pyspark.sql.functions import sum, desc, format_number

# Calculate the total sales
total_sales = merge.agg(sum('Price').alias('Total_Sales')).collect()[0]['Total_Sales']

# Calculate the contribution percentage for each food category
contribution = merge.groupBy('Product_Name') \
    .agg(format_number((sum('Price')/total_sales*100), 2).alias('Contribution_Percentage')) \
    .orderBy(desc('Contribution_Percentage'))

contribution.show()


+------------+-----------------------+
|Product_Name|Contribution_Percentage|
+------------+-----------------------+
|        Dosa|                   9.21|
|       Pasta|                   7.53|
|    sandwich|                  40.17|
|     Biryani|                   3.35|
|     Chowmin|                  25.10|
|       PIZZA|                  14.64|
+------------+-----------------------+



In [25]:
# Sales Comparison Between Different Order Sources
source_comparison = merge.groupBy('Source_Order').agg(sum('Price').alias('Total_Sales')).orderBy(desc('Total_Sales'))
source_comparison.show()

+------------+-----------+
|Source_Order|Total_Sales|
+------------+-----------+
|      Swiggy|     6330.0|
|      zomato|     4920.0|
|  Restaurant|     3090.0|
+------------+-----------+



In [26]:
# Month-wise Sales Growth Rate
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = Window.orderBy("month")
monthly_sales = merge.groupBy('month').agg(sum('Price').alias('Total_Sales')).orderBy('month')
monthly_sales = monthly_sales.withColumn("Previous_Sales", lag(monthly_sales['Total_Sales']).over(windowSpec))
monthly_sales = monthly_sales.withColumn("Growth_Rate", ((monthly_sales['Total_Sales'] - monthly_sales['Previous_Sales']) / monthly_sales['Previous_Sales']) * 100)
monthly_sales.show()

+-----+-----------+--------------+------------------+
|month|Total_Sales|Previous_Sales|       Growth_Rate|
+-----+-----------+--------------+------------------+
|    1|     2960.0|          NULL|              NULL|
|    2|     2730.0|        2960.0| -7.77027027027027|
|    3|      910.0|        2730.0|-66.66666666666666|
|    5|     2960.0|         910.0|225.27472527472528|
|    6|     2960.0|        2960.0|               0.0|
|    7|      910.0|        2960.0|-69.25675675675676|
|   11|      910.0|         910.0|               0.0|
+-----+-----------+--------------+------------------+



In [28]:
# Customer Retention Analysis
retention = merge.groupBy('Customer_id', 'Year').agg(countDistinct('Order_date').alias('Orders_Per_Year')).orderBy('Customer_id', 'Year')
retention.show()

+-----------+----+---------------+
|Customer_id|Year|Orders_Per_Year|
+-----------+----+---------------+
|          A|2022|              6|
|          A|2023|             11|
|          B|2022|              6|
|          B|2023|             16|
|          C|2022|              3|
|          C|2023|              6|
|          D|2022|              6|
|          D|2023|              4|
|          E|2022|              3|
|          E|2023|             11|
+-----------+----+---------------+



In [32]:
# Calculate the total sales amount for each product.

total_sales_by_product = merge.groupBy('Product_Name') \
    .agg(format_number(sum('Price'), 2).alias('Total_Sales')) \
    .orderBy(desc('Total_Sales'))

total_sales_by_product.show()


+------------+-----------+
|Product_Name|Total_Sales|
+------------+-----------+
|    sandwich|   5,760.00|
|     Biryani|     480.00|
|     Chowmin|   3,600.00|
|       PIZZA|   2,100.00|
|        Dosa|   1,320.00|
|       Pasta|   1,080.00|
+------------+-----------+



In [33]:
##  Percentage Contribution by Location
# Calculate the percentage contribution of each location to the total sales.

# Calculate the total sales
total_sales = merge.agg(sum('Price').alias('Total_Sales')).collect()[0]['Total_Sales']

# Calculate the contribution percentage by location
contribution_by_location = merge.groupBy('Location') \
    .agg(format_number((sum('Price') / total_sales * 100), 2).alias('Contribution_Percentage')) \
    .orderBy(desc('Contribution_Percentage'))

contribution_by_location.show()


+--------+-----------------------+
|Location|Contribution_Percentage|
+--------+-----------------------+
|      UK|                  48.95|
|   India|                  33.89|
|     USA|                  17.15|
+--------+-----------------------+



In [35]:
# Calculate the average spend per customer for each month
from pyspark.sql.functions import avg

monthly_avg_spend = merge.groupBy('Customer_id', 'month') \
    .agg(format_number(avg('Price'), 2).alias('Avg_Spend')) \
    .orderBy('Customer_id', 'month')

monthly_avg_spend.show()


+-----------+-----+---------+
|Customer_id|month|Avg_Spend|
+-----------+-----+---------+
|          A|    1|   129.09|
|          A|    5|   129.09|
|          A|    6|   129.09|
|          B|    1|   123.33|
|          B|    2|   123.33|
|          B|    3|   123.33|
|          B|    5|   123.33|
|          B|    6|   123.33|
|          B|    7|   123.33|
|          B|   11|   123.33|
|          C|    1|   133.33|
|          C|    5|   133.33|
|          C|    6|   133.33|
|          D|    2|   100.00|
|          D|    3|   100.00|
|          D|    7|   100.00|
|          D|   11|   100.00|
|          E|    2|   113.33|
|          E|    3|   113.33|
|          E|    7|   113.33|
+-----------+-----+---------+
only showing top 20 rows



In [38]:
#Calculate the total sales and average price by order source.
from pyspark.sql.functions import avg

sales_by_source = merge.groupBy('Source_Order') \
    .agg(
        format_number(sum('Price'), 2).alias('Total_Sales'),
        format_number(avg('Price'), 2).alias('Avg_Price')
    ) \
    .orderBy(desc('Total_Sales'))

sales_by_source.show()


+------------+-----------+---------+
|Source_Order|Total_Sales|Avg_Price|
+------------+-----------+---------+
|      Swiggy|   6,330.00|   124.12|
|      zomato|   4,920.00|   126.15|
|  Restaurant|   3,090.00|   114.44|
+------------+-----------+---------+



In [37]:
#Find the top 5 most expensive products based on their average price.

top_expensive_products = merge.groupBy('Product_Name') \
    .agg(format_number(avg('Price'), 2).alias('Avg_Price')) \
    .orderBy(desc('Avg_Price')) \
    .limit(5)

top_expensive_products.show()


+------------+---------+
|Product_Name|Avg_Price|
+------------+---------+
|     Biryani|    80.00|
|       Pasta|   180.00|
|     Chowmin|   150.00|
|    sandwich|   120.00|
|        Dosa|   110.00|
+------------+---------+

