- /FileStore/tables/sales_csv.txt 

- /FileStore/tables/menu_csv.txt

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
spark  = SparkSession.builder.getOrCreate()

In [0]:
spark

In [0]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Data Frames Creation


### DF1

In [0]:
df1schema= StructType(
                       [
                        StructField("Product_id", IntegerType(),False),
                        StructField("Customer_id", StringType(),False),
                        StructField("Order_date", DateType(),False),
                        StructField("Location", StringType(),False),   
                        StructField("Source_Order", StringType(),False)
                       ]
                    )


In [0]:
df1 = spark.read.csv("/FileStore/tables/sales_csv.txt", schema=df1schema, header=True)
df1.show()

+----------+-----------+----------+--------+------------+
|Product_id|Customer_id|Order_date|Location|Source_Order|
+----------+-----------+----------+--------+------------+
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|          C|2022-01-07|      UK|      zomato|
|         3|  

In [0]:
df1.printSchema()

root
 |-- Product_id: integer (nullable = true)
 |-- Customer_id: string (nullable = true)
 |-- Order_date: date (nullable = true)
 |-- Location: string (nullable = true)
 |-- Source_Order: string (nullable = true)



In [0]:
df1 = df1.withColumn("order_year", year(df1["Order_date"]))
df1 = df1.withColumn("order_month", month(df1["Order_date"]))
df1 = df1.withColumn("order_quarter", quarter(df1["Order_date"]))

In [0]:
df1.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|Product_id|Customer_id|Order_date|Location|Source_Order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|          1|            1|
|         1|          B|2023-01-04|   India|  Rest

### DF2

In [0]:
myschema = StructType(
                [
                    StructField("Product_id",IntegerType(), False),
                    StructField("Product_name",StringType(), False),
                    StructField("Price",StringType(), False)
                ]
    )

In [0]:
df2 = spark.read.csv(f"/FileStore/tables/menu_csv.txt", schema=myschema)

In [0]:
df2.show()

+----------+------------+-----+
|Product_id|Product_name|Price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



# KPIs Extraction

### 1- Total Amount Spent by Each Customer

In [0]:
joined_df = df1.join(df2, on="Product_id")
joined_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+------------+-----+
|Product_id|Customer_id|Order_date|Location|Source_Order|order_year|order_month|order_quarter|Product_name|Price|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+------------+-----+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|     Chowmin|  150|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|     Chowmin|  150|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|    sandwich|  120|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|    sandwich|  120|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|    sandwich|  120|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|        

In [0]:
TotalByCustomer = joined_df.groupBy(joined_df.Customer_id).agg(sum(joined_df.Price))
display(TotalByCustomer)

DataFrame[Customer_id: string, sum(Price): double]

### 2- Total Amount Spent by Each Customer on Each Food Category

In [0]:
TotalByCustomerPerProduct = joined_df.groupBy([joined_df.Customer_id, joined_df.Product_name]).agg(sum(joined_df.Price).alias("Total"))
TotalByCustomerPerProduct = TotalByCustomerPerProduct.sort([TotalByCustomerPerProduct.Customer_id,TotalByCustomerPerProduct.Product_name])
TotalByCustomerPerProduct.show()

+-----------+------------+------+
|Customer_id|Product_name| Total|
+-----------+------------+------+
|          A|     Chowmin|1800.0|
|          A|       PIZZA| 200.0|
|          A|    sandwich|2160.0|
|          B|     Chowmin|1800.0|
|          B|       PIZZA|1200.0|
|          B|    sandwich|1440.0|
|          C|       PIZZA| 600.0|
|          C|       Pasta|1080.0|
|          C|    sandwich| 720.0|
|          D|     Biryani| 480.0|
|          D|    sandwich| 720.0|
|          E|        Dosa|1320.0|
|          E|    sandwich| 720.0|
+-----------+------------+------+



### 3- Total Amount of Sales Per Month

In [0]:
Sales_per_month=joined_df.groupBy((joined_df.order_month).alias("Month")).agg(sum(joined_df.Price).alias("Total")).sort(joined_df.order_month)
Sales_per_month.show()

+-----+------+
|Month| Total|
+-----+------+
|    1|2860.0|
|    2|2730.0|
|    3| 910.0|
|    5|2960.0|
|    6|2960.0|
|    7| 910.0|
|   11| 910.0|
+-----+------+



### 4-Yearly Sales

In [0]:
Sales_per_year=joined_df.groupBy((joined_df.order_year).alias("Year")).agg(sum(joined_df.Price).alias("Total")).sort(joined_df.order_year)
Sales_per_year.show()

+----+------+
|Year| Total|
+----+------+
|2022|4350.0|
|2023|9890.0|
+----+------+



### 5-Quarterly Sales

In [0]:
Sales_per_quarter=joined_df.groupBy((joined_df.order_quarter).alias("Quarter")).agg(sum(joined_df.Price).alias("Total")).sort(joined_df.order_quarter)
Sales_per_quarter.show()

+-------+------+
|Quarter| Total|
+-------+------+
|      1|6500.0|
|      2|5920.0|
|      3| 910.0|
|      4| 910.0|
+-------+------+



### 6- Total Number Of Orders by Each Category

In [0]:
noOrdersPerProduct = joined_df.groupBy(joined_df.Product_name).agg(count(joined_df.Price).alias("Number Of Orders"))
noOrdersPerProduct = noOrdersPerProduct.sort(noOrdersPerProduct["Number Of Orders"].desc())
noOrdersPerProduct.show()

+------------+----------------+
|Product_name|Number Of Orders|
+------------+----------------+
|    sandwich|              48|
|     Chowmin|              24|
|       PIZZA|              20|
|        Dosa|              12|
|       Pasta|               6|
|     Biryani|               6|
+------------+----------------+



### 7- TOP 5 Ordered Items

In [0]:
noOrdersPerProduct_top5 = joined_df.groupBy(joined_df.Product_name).agg(count(joined_df.Price).alias("Number Of Orders"))
noOrdersPerProduct_top5 = noOrdersPerProduct_top5.sort(noOrdersPerProduct_top5["Number Of Orders"].desc())
noOrdersPerProduct_top5 = noOrdersPerProduct_top5.limit(5)
noOrdersPerProduct.show()

+------------+----------------+
|Product_name|Number Of Orders|
+------------+----------------+
|    sandwich|              48|
|     Chowmin|              24|
|       PIZZA|              20|
|        Dosa|              12|
|       Pasta|               6|
|     Biryani|               6|
+------------+----------------+



### 8- Customer Visit Frequency

In [0]:
customer_freq = joined_df.groupBy(joined_df.Customer_id).agg(countDistinct(joined_df.Order_date).alias("Number Of Visits"))
customer_freq = customer_freq.sort(customer_freq["Number Of Visits"].desc())
customer_freq.show()

+-----------+----------------+
|Customer_id|Number Of Visits|
+-----------+----------------+
|          B|              22|
|          A|              16|
|          E|              14|
|          D|              10|
|          C|               9|
+-----------+----------------+



### 9- Total Sales Per Country

In [0]:
Sales_per_month=joined_df.groupBy((joined_df.Location).alias("Country")).agg(sum(joined_df.Price).alias("Total")).sort(joined_df.Location)
Sales_per_month.show()

+-------+------+
|Country| Total|
+-------+------+
|  India|4760.0|
|     UK|7020.0|
|    USA|2460.0|
+-------+------+



### 10- Total Sales Per Order_Source

In [0]:
Sales_per_month=joined_df.groupBy((joined_df.Source_Order).alias("Order Source")).agg(sum(joined_df.Price).alias("Total")).sort(joined_df.Source_Order)
Sales_per_month.show()

+------------+------+
|Order Source| Total|
+------------+------+
|  Restaurant|3090.0|
|      Swiggy|6230.0|
|      zomato|4920.0|
+------------+------+

