In [0]:
# import required libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import year, month, quarter, count,sum
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType

In [0]:
# Entry point for all DataFrame work in this notebook; getOrCreate() reuses an
# already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("End to End Pyspark Project")
    .getOrCreate()
)

In [0]:
# Load the sales records from CSV using an explicit schema (no inference),
# so order_date is parsed as a date and product_id as an integer.
schema = (
    StructType()
    .add("product_id", IntegerType())
    .add("customer_id", StringType())
    .add("order_date", DateType())
    .add("location", StringType())
    .add("source_order", StringType())
)

sales_df = (
    spark.read
    .schema(schema)
    .format("csv")
    .load("/FileStore/tables/sales.csv")
)
sales_df.show()

In [0]:
# Load the menu (product_id -> product_name, price) from CSV with an explicit
# schema; price is read as a float so it can be summed in the KPI cells.
schema = StructType(
    [
        StructField("product_id", IntegerType()),
        StructField("product_name", StringType()),
        StructField("price", FloatType()),
    ]
)

menu_df = (
    spark.read
    .format("csv")
    .schema(schema)
    .load("/FileStore/tables/menu.csv")
)
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|100.0|
|         2|     Chowmin|150.0|
|         3|    sandwich|120.0|
|         4|        Dosa|110.0|
|         5|     Biryani| 80.0|
|         6|       Pasta|180.0|
+----------+------------+-----+



In [0]:
# Several KPIs below are reported per year, quarter and month, so derive those
# calendar parts from order_date once here.
# withColumn adds a column (or replaces one with the same name), which keeps
# this cell safe to re-run.

sales_df = (
    sales_df
    .withColumn("year", year("order_date"))
    .withColumn("month", month("order_date"))
    .withColumn("quarter", quarter("order_date"))
)
sales_df.show()


+----------+-----------+----------+--------+------------+----+-----+-------+
|product_id|customer_id|order_date|location|source_order|year|month|quarter|
+----------+-----------+----------+--------+------------+----+-----+-------+
|         1|          A|2023-01-01|   India|      Swiggy|2023|    1|      1|
|         2|          A|2022-01-01|   India|      Swiggy|2022|    1|      1|
|         2|          A|2023-01-07|   India|      Swiggy|2023|    1|      1|
|         3|          A|2023-01-10|   India|  Restaurant|2023|    1|      1|
|         3|          A|2022-01-11|   India|      Swiggy|2022|    1|      1|
|         3|          A|2023-01-11|   India|  Restaurant|2023|    1|      1|
|         2|          B|2022-02-01|   India|      Swiggy|2022|    2|      1|
|         2|          B|2023-01-02|   India|      Swiggy|2023|    1|      1|
|         1|          B|2023-01-04|   India|  Restaurant|2023|    1|      1|
|         1|          B|2023-02-11|   India|      Swiggy|2023|    2|      1|

In [0]:
# KPI: total amount spent by each customer, largest total first.
# display() is used instead of show() so the result can be explored with the
# notebook's visualization feature.

customer_spend = (
    sales_df.join(menu_df, "product_id")
    .groupBy("customer_id")
    .sum("price")
    .sort("sum(price)", ascending=False)
)
display(customer_spend)

customer_id,sum(price)
B,4440.0
A,4260.0
C,2400.0
E,2040.0
D,1200.0


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: total amount spent on each menu item.
# Grouping includes product_id alongside product_name; only the name and the
# summed price are kept in the result.
food_category_spend = (
    sales_df.join(menu_df, "product_id")
    .groupBy("product_name", "product_id")
    .sum("price")
    .select("product_name", "sum(price)")
)
display(food_category_spend)

product_name,sum(price)
Chowmin,3600.0
PIZZA,2100.0
Pasta,1080.0
Biryani,480.0
sandwich,5760.0
Dosa,1320.0


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: total sales per calendar month, ordered by month number.
# Grouping is by month only, so the same month of different years is pooled.
monthly_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("month")
    .agg({"price": "sum"})
    .sort("month")
)
display(monthly_sales)

month,sum(price)
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: total sales per year, ordered chronologically.
yearly_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("year")
    .agg({"price": "sum"})
    .sort("year")
)
display(yearly_sales)

year,sum(price)
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: total sales per quarter (1-4), ordered by quarter number.
# Grouping is by quarter only, so the same quarter of different years is pooled.
quarterly_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("quarter")
    .agg({"price": "sum"})
    .sort("quarter")
)
display(quarterly_sales)

quarter,sum(price)
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: number of times each menu item was ordered, least-ordered first.
# The count() function is used (rather than a {"col": "count"} dict passed to
# agg) so the result column can be given a readable alias.
categorical_order_count = (
    sales_df.join(menu_df, "product_id")
    .groupBy("product_id", "product_name")
    .agg(count("product_id").alias("product_count"))
    .sort("product_count")
    .select("product_name", "product_count")
)
display(categorical_order_count)

product_name,product_count
Biryani,6
Pasta,6
Dosa,12
PIZZA,21
Chowmin,24
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: the five most-ordered menu items, most-ordered first.
# Order counts are computed per (product_id, product_name) and ranked in
# descending order before keeping the first five rows.

top_5_items = (
    sales_df.join(menu_df, "product_id")
    .groupBy("product_id", "product_name")
    .agg(count("product_id").alias("product_count"))
    # Sorting on product_count alone leaves the order of tied items undefined,
    # so which item lands in the last slot could change between runs.
    # product_name and product_id are added as tie-breakers to make the
    # ranking deterministic.
    .orderBy(
        ["product_count", "product_name", "product_id"],
        ascending=[False, True, True],
    )
    .select("product_name", "product_count")
    .limit(5)
)
display(top_5_items)

product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: the single most-ordered menu item.
# Order counts are computed per (product_id, product_name), ranked in
# descending order, and only the first row is kept.
top_item = (
    sales_df.join(menu_df, "product_id")
    .groupBy("product_id", "product_name")
    .agg(count("product_id").alias("product_count"))
    # If two items share the highest count, sorting on product_count alone
    # would pick one arbitrarily; product_name and product_id break the tie
    # so the result is reproducible.
    .orderBy(
        ["product_count", "product_name", "product_id"],
        ascending=[False, True, True],
    )
    .select("product_name", "product_count")
    .limit(1)
)
display(top_item)

product_name,product_count
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: how often each customer ordered at the restaurant itself.
# Only rows with source_order == "Restaurant" are kept, then the non-null
# order_date values are counted per customer.

customer_visit_freq = (
    sales_df
    .where(sales_df["source_order"] == "Restaurant")
    .groupBy("customer_id")
    .agg(count("order_date").alias("customer_visits"))
)
display(customer_visit_freq)

customer_id,customer_visits
E,6
B,6
D,3
C,3
A,9


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: total sales per location (country).
# F.sum is referenced through the functions module rather than the bare name
# `sum`, which the import cell rebinds over Python's builtin sum; the qualified
# form keeps this cell working even if `sum` is later rebound.
country_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("location")
    .agg(F.sum("price").alias("total_sales"))
)
display(country_sales)

location,total_sales
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.

In [0]:
# KPI: total sales per order source (the source_order column).
# F.sum is referenced through the functions module rather than the bare name
# `sum`, which the import cell rebinds over Python's builtin sum; the qualified
# form keeps this cell working even if `sum` is later rebound.
order_source_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("source_order")
    .agg(F.sum("price").alias("total_sales"))
)
display(order_source_sales)

source_order,total_sales
zomato,4920.0
Swiggy,6330.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.