## Source File Paths

In [0]:
# Locations of the raw input files; both sit in the same source-data directory.
sales_file_path = (
    '/Volumes/workspace/default/sales_project_source_data/'
    'sales.csv.txt'
)
menu_file_path = (
    '/Volumes/workspace/default/sales_project_source_data/'
    'menu.csv.txt'
)

### Loading the Sales Data to the sales_df DataFrame

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DateType

# Explicit schema for the sales file; every column is declared nullable.
sales_schema = StructType([
    StructField('Product_ID', IntegerType(), True),
    StructField('Customer_ID', StringType(), True),
    StructField('Order_Date', DateType(), True),
    StructField('Location', StringType(), True),
    StructField('Source_Order', StringType(), True)
])

# The explicit schema alone defines the column types, so the previously passed
# inferSchema option was redundant and has been dropped.
# NOTE(review): header=True makes the reader treat the first line of the file
# as column names. Confirm the file really starts with a header line;
# if it does not, the first sales record is silently discarded.
sales_df = spark.read.csv(sales_file_path, header=True, schema=sales_schema)
display(sales_df)

Product_ID,Customer_ID,Order_Date,Location,Source_Order
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy
3,B,2023-01-16,India,zomato


### Adding Year, Quarter, and Month Columns to the sales_df DataFrame

In [0]:
from pyspark.sql.functions import month, year, quarter

# Derive the calendar parts of Order_Date as separate columns so the KPIs
# below can group by year, quarter, or month directly.
sales_df = (
    sales_df
    .withColumn('Order_Year', year('Order_Date'))
    .withColumn('Order_Quarter', quarter('Order_Date'))
    .withColumn('Order_Month', month('Order_Date'))
)
sales_df.display()

Product_ID,Customer_ID,Order_Date,Location,Source_Order,Order_Year,Order_Quarter,Order_Month
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,1,2
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,1,2
3,B,2023-01-16,India,zomato,2023,1,1


### Loading the Menu Data to the menu_df DataFrame

In [0]:
from pyspark.sql.types import DecimalType

# Explicit schema for the menu file. Price is read as a string here and is
# converted to a decimal in a separate step.
menu_schema = StructType([
  StructField('Product_ID', IntegerType(), True),
  StructField('Product_Name', StringType(), True),
  StructField('Price', StringType(), True)
])

# The explicit schema alone defines the column types, so the previously passed
# inferSchema option was redundant and has been dropped.
# NOTE(review): header=True makes the reader treat the first line of the file
# as column names. The displayed menu contains Product_IDs 2-6 only, while the
# sales data references Product_ID 1 -- confirm the file really starts with a
# header line, otherwise the first menu item is being discarded and its sales
# drop out of every inner join on Product_ID.
menu_df = spark.read.csv(menu_file_path, header=True, schema=menu_schema)
display(menu_df)

Product_ID,Product_Name,Price
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# Replace the string Price column with a fixed-point decimal (precision 10,
# scale 2) so it can be summed in the KPI aggregations.
menu_df = menu_df.withColumn(
    'Price',
    menu_df['Price'].cast(DecimalType(10, 2)),
)
menu_df.display()

Product_ID,Product_Name,Price
2,Chowmin,150.0
3,sandwich,120.0
4,Dosa,110.0
5,Biryani,80.0
6,Pasta,180.0


## KPIs

### Total Amount Spent by Each Customer
- **Customer information is available in `sales_df`, and price details are available in `menu_df`.**
- **So we need to join `sales_df` and `menu_df` on `Product_ID` to compute this KPI.**

In [0]:
from pyspark.sql import functions as F

# Amount spent per customer: the price of every ordered item, summed by
# Customer_ID. The inner join keeps only sales rows whose Product_ID also
# appears in menu_df.
total_amount_spent_by_customer = (
    sales_df
    .join(menu_df, on=['Product_ID'], how='inner')
    .groupBy('Customer_ID')
    .agg(F.sum(F.col('Price')).alias('Total_Amount'))
    .sort('Customer_ID')
)

total_amount_spent_by_customer.display()

Customer_ID,Total_Amount
A,3960.0
B,3240.0
C,1800.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

### Total Amount Spent by Each Food Category

In [0]:
# Sales total per menu item: join each sale to its menu row, then sum the
# price within each Product_Name. Results are listed alphabetically.
total_spent_by_food_category = (
    sales_df
    .join(menu_df, 'Product_ID', 'inner')
    .groupBy(F.col('Product_Name'))
    .agg(F.sum('Price').alias('Total_Sales'))
    .sort(F.col('Product_Name').asc())
)

total_spent_by_food_category.display()

Product_Name,Total_Sales
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

### Total Amount of Sales in Each Month

In [0]:
# Sales total per calendar month number. Order_Month carries no year, so the
# same month of different years lands in one bucket.
total_sales_by_month = (
    sales_df
    .join(menu_df, on=['Product_ID'], how='inner')
    .groupBy('Order_Month')
    .agg(F.sum(F.col('Price')).alias('Total_Sales'))
    .sort('Order_Month')
)

total_sales_by_month.display()

Order_Month,Total_Sales
1,2460.0
2,2430.0
3,810.0
5,2460.0
6,2460.0
7,810.0
11,810.0


Databricks visualization. Run in Databricks to view.

### Yearly Sales

In [0]:
# Sales total per order year, oldest year first.
yearly_sales = (
    sales_df
    .join(menu_df, 'Product_ID', 'inner')
    .groupBy(F.col('Order_Year'))
    .agg(F.sum('Price').alias('Total_Sales'))
    .sort(F.col('Order_Year').asc())
)

yearly_sales.display()

Order_Year,Total_Sales
2022,4350.0
2023,7890.0


Databricks visualization. Run in Databricks to view.

### Quarterly Sales

In [0]:
# Sales total per quarter number (1-4). Order_Quarter carries no year, so the
# same quarter of different years lands in one bucket.
quarterly_sales = (
    sales_df
    .join(menu_df, on='Product_ID', how='inner')
    .groupBy('Order_Quarter')
    .agg(F.sum(F.col('Price')).alias('Total_Sales'))
    .sort('Order_Quarter')
)

quarterly_sales.display()

Order_Quarter,Total_Sales
1,5700.0
2,4920.0
3,810.0
4,810.0


Databricks visualization. Run in Databricks to view.

### Total Number of Orders by Each Category

In [0]:
# Number of orders per menu item, most-ordered item first. Counting
# Product_ID counts the joined sales rows in each Product_Name group.
total_orders_by_category = (
    sales_df
    .join(menu_df, 'Product_ID', 'inner')
    .groupBy('Product_Name')
    .agg(F.count(F.col('Product_ID')).alias('Total_Orders'))
    .sort(F.col('Total_Orders').desc())
)

total_orders_by_category.display()

Product_Name,Total_Orders
sandwich,48
Chowmin,24
Dosa,12
Biryani,6
Pasta,6


Databricks visualization. Run in Databricks to view.

### Top 3 Ordered Items

In [0]:
# The three most frequently ordered menu items.
# The limit is applied inside the definition so the variable actually holds
# three rows, as its name says, instead of the full ranking.
top_3_ordered_items = (
    sales_df.join(menu_df, on = 'Product_ID', how = 'inner')
    .groupBy('Product_Name').agg(F.count('Product_ID').alias('Total_Orders'))
    # Secondary sort on Product_Name breaks ties in Total_Orders, so the rows
    # kept by limit() are the same on every run.
    .orderBy(F.desc('Total_Orders'), F.asc('Product_Name'))
    .limit(3)
)

top_3_ordered_items.display()

Product_Name,Total_Orders
sandwich,48
Chowmin,24
Dosa,12


Databricks visualization. Run in Databricks to view.

### Most Ordered Item

In [0]:
# The single most frequently ordered menu item.
most_ordered_item = (
    sales_df.join(menu_df, "Product_ID", "inner")
    .groupBy('Product_Name').agg(F.count('Product_ID').alias('Total_Order'))
    # Secondary sort on Product_Name breaks ties in Total_Order, so limit(1)
    # returns the same row on every run when several items share the top count.
    .orderBy(F.desc('Total_Order'), F.asc('Product_Name'))
    .limit(1)
)

most_ordered_item.display()

Product_Name,Total_Order
sandwich,48


Databricks visualization. Run in Databricks to view.

### Frequency of Customer Visits to the Restaurant

In [0]:
from pyspark.sql.functions import countDistinct

# Restaurant visit frequency per customer: the number of distinct order dates
# among sales whose Source_Order is exactly 'Restaurant' (case-sensitive).
frequency_of_customers = (
    sales_df.filter(col('Source_Order') == 'Restaurant')
    .groupBy('Customer_ID').agg(F.countDistinct('Order_Date').alias('Frequency'))
    # Sort by customer so the output order is stable, like the other KPIs.
    .orderBy('Customer_ID')
)

frequency_of_customers.display()

Customer_ID,Frequency
A,6
C,3
B,6
E,5
D,1


Databricks visualization. Run in Databricks to view.

### Total Sales by Country

In [0]:
# Sales total per Location value, listed alphabetically.
total_sales_by_country = (
    sales_df
    .join(menu_df, on=['Product_ID'], how='inner')
    .groupBy(F.col('Location'))
    .agg(F.sum('Price').alias('Total_Sales'))
    .sort(F.col('Location').asc())
)

total_sales_by_country.display()

Location,Total_Sales
India,3960.0
UK,6120.0
USA,2160.0


Databricks visualization. Run in Databricks to view.

### Total Sales by Order Source

In [0]:
# Sales total per ordering channel (the Source_Order column), listed
# alphabetically by channel name.
sales_by_order_source = (
    sales_df
    .join(menu_df, on='Product_ID', how='inner')
    .groupBy('Source_Order')
    .agg(F.sum(F.col('Price')).alias('Total_Sales'))
    .sort('Source_Order')
)

sales_by_order_source.display()

Source_Order,Total_Sales
Restaurant,2490.0
Swiggy,4830.0
zomato,4920.0


Databricks visualization. Run in Databricks to view.