In [0]:
%scala
// /dbfs/FileStore/tables/sales_csv.txt
// /dbfs/FileStore/tables/menu_csv.txt

## Sales Dataframe

In [0]:
%python
from pyspark.sql.types import StructType,StructField, IntegerType, StringType, DateType

schema = StructType([
  StructField("product_id",IntegerType(), True),
  StructField("customer_id",StringType(), True),
  StructField("order_date",DateType(), True),
  StructField("location",StringType(), True),
  StructField("source_order",StringType(), True)
])

sales_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("dbfs:/FileStore/tables/sales_csv.txt")

In [0]:
%python
sales_df.display()

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


#### Adding year month quater 

In [0]:
from pyspark.sql.functions import month, year, quarter

sales_df = sales_df.withColumn("order_year",year(sales_df.order_date)).withColumn("order_month", month(sales_df.order_date)).withColumn("order_quarter", quarter(sales_df.order_date))


In [0]:
display(sales_df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


## Menu DataFrame

In [0]:
%python
from pyspark.sql.types import StructType,StructField, IntegerType, StringType, DateType

schema = StructType([
  StructField("product_id",IntegerType(), True),
  StructField("product_name",StringType(), True),
  StructField("price",StringType(), True)
])

menu_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("dbfs:/FileStore/tables/menu_csv.txt")

In [0]:
display(menu_df)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


# KPI 

In [0]:
# Joining both the dataframe 

df = sales_df.join(menu_df, sales_df.product_id == menu_df.product_id )
display(df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter,product_id.1,product_name,price
1,A,2023-01-01,India,Swiggy,2023,1,1,1,PIZZA,100
2,A,2022-01-01,India,Swiggy,2022,1,1,2,Chowmin,150
2,A,2023-01-07,India,Swiggy,2023,1,1,2,Chowmin,150
3,A,2023-01-10,India,Restaurant,2023,1,1,3,sandwich,120
3,A,2022-01-11,India,Swiggy,2022,1,1,3,sandwich,120
3,A,2023-01-11,India,Restaurant,2023,1,1,3,sandwich,120
2,B,2022-02-01,India,Swiggy,2022,2,1,2,Chowmin,150
2,B,2023-01-02,India,Swiggy,2023,1,1,2,Chowmin,150
1,B,2023-01-04,India,Restaurant,2023,1,1,1,PIZZA,100
1,B,2023-02-11,India,Swiggy,2023,2,1,1,PIZZA,100


#### we need to convert price col from string to integer 

In [0]:
df = df.withColumn("price", df["price"].cast(IntegerType()))

In [0]:
display(df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter,product_id.1,product_name,price
1,A,2023-01-01,India,Swiggy,2023,1,1,1,PIZZA,100
2,A,2022-01-01,India,Swiggy,2022,1,1,2,Chowmin,150
2,A,2023-01-07,India,Swiggy,2023,1,1,2,Chowmin,150
3,A,2023-01-10,India,Restaurant,2023,1,1,3,sandwich,120
3,A,2022-01-11,India,Swiggy,2022,1,1,3,sandwich,120
3,A,2023-01-11,India,Restaurant,2023,1,1,3,sandwich,120
2,B,2022-02-01,India,Swiggy,2022,2,1,2,Chowmin,150
2,B,2023-01-02,India,Swiggy,2023,1,1,2,Chowmin,150
1,B,2023-01-04,India,Restaurant,2023,1,1,1,PIZZA,100
1,B,2023-02-11,India,Swiggy,2023,2,1,1,PIZZA,100


### Total Amount Spent by each Customer

In [0]:
total_amt_spend = (df.groupBy('customer_id')
              .sum('price')
              .orderBy('customer_id',ascending=True))
total_amt_spend.display()

customer_id,sum(price)
A,4260
B,4440
C,2400
D,1200
E,2040


Databricks visualization. Run in Databricks to view.

### Total Amount Spent by each food category

In [0]:
total_amt_spend_food_cat = (df.groupBy("product_name")
                            .sum("price")
                            .withColumnRenamed("sum(price)", "total_amount_spent")
                            .orderBy("total_amount_spent",ascending=True))
total_amt_spend_food_cat.display()

product_name,total_amount_spent
Biryani,480
Pasta,1080
Dosa,1320
PIZZA,2100
Chowmin,3600
sandwich,5760


Databricks visualization. Run in Databricks to view.

### Total Amount of sales in each month

In [0]:
total_amt_month = (df.groupBy("order_month")
                   .sum("price")
                   .withColumnRenamed("sum(price)", "total_amount_month_wise")
                   )
total_amt_month.display()             

order_month,total_amount_month_wise
1,2960
6,2960
3,910
5,2960
7,910
11,910
2,2730


Databricks visualization. Run in Databricks to view.

### Yearly Sales 

In [0]:
total_amt_year = (df.groupBy("order_year")
                   .sum("price")
                   .withColumnRenamed("sum(price)", "total_amount_month_wise")
                   )
total_amt_year.display()       

order_year,total_amount_month_wise
2023,9990
2022,4350


Databricks visualization. Run in Databricks to view.

### Quaterly Sales 

In [0]:
total_amt_quarter = (df.groupBy("order_quarter")
                   .sum("price")
                   .withColumnRenamed("sum(price)", "total_amount_quarter_wise")
                   .orderBy("order_quarter",acsending=True)
                   )
total_amt_quarter.display()       

order_quarter,total_amount_quarter_wise
1,6600
2,5920
3,910
4,910


Databricks visualization. Run in Databricks to view.

### how many times each products purchased

In [0]:
df.show(n=2)


+----------+-----------+----------+--------+------------+----------+-----------+-------------+----------+------------+-----+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|product_id|product_name|price|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+----------+------------+-----+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|         1|       PIZZA|  100|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|         2|     Chowmin|  150|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+----------+------------+-----+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import count

most_df = (df.groupBy("product_name")
           .agg(count("product_name").alias("count_product_name"))
           .orderBy("count_product_name",ascending=False)
           )

most_df.display()

product_name,count_product_name
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6
Biryani,6


Databricks visualization. Run in Databricks to view.

### Top 5 ordered items

In [0]:
most_df = (df.groupBy("product_name")
           .agg(count("product_name").alias("count_product_name"))
           .orderBy("count_product_name",ascending=False)
           .limit(5)
           )

most_df.display()

product_name,count_product_name
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6


### Top ordered items

In [0]:
most_df = (df.groupBy("product_name")
           .agg(count("product_name").alias("count_product_name"))
           .orderBy("count_product_name",ascending=False)
           .limit(1)
           )

most_df.display()

product_name,count_product_name
sandwich,48


Databricks visualization. Run in Databricks to view.

### frequecy of customer visisted 

In [0]:
display(df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter,product_id.1,product_name,price
1,A,2023-01-01,India,Swiggy,2023,1,1,1,PIZZA,100
2,A,2022-01-01,India,Swiggy,2022,1,1,2,Chowmin,150
2,A,2023-01-07,India,Swiggy,2023,1,1,2,Chowmin,150
3,A,2023-01-10,India,Restaurant,2023,1,1,3,sandwich,120
3,A,2022-01-11,India,Swiggy,2022,1,1,3,sandwich,120
3,A,2023-01-11,India,Restaurant,2023,1,1,3,sandwich,120
2,B,2022-02-01,India,Swiggy,2022,2,1,2,Chowmin,150
2,B,2023-01-02,India,Swiggy,2023,1,1,2,Chowmin,150
1,B,2023-01-04,India,Restaurant,2023,1,1,1,PIZZA,100
1,B,2023-02-11,India,Swiggy,2023,2,1,1,PIZZA,100


In [0]:
from pyspark.sql.functions import countDistinct

freq_cust_visited = (
    df.filter(df.source_order == "Restaurant")
            .groupBy("customer_id")
            .agg(countDistinct("order_date")))
freq_cust_visited.display()

customer_id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

### total sales by each country 

In [0]:
df.columns

Out[75]: ['product_id',
 'customer_id',
 'order_date',
 'location',
 'source_order',
 'order_year',
 'order_month',
 'order_quarter',
 'product_id',
 'product_name',
 'price']

In [0]:
total_sales_country = (df.groupBy("location")
                       .sum("price")
                       .withColumnRenamed("sum(price)","total_amount_country"))
total_sales_country.display()

location,total_amount_country
India,4860
USA,2460
UK,7020


Databricks visualization. Run in Databricks to view.

### Total sales by order source

In [0]:
df.columns

Out[79]: ['product_id',
 'customer_id',
 'order_date',
 'location',
 'source_order',
 'order_year',
 'order_month',
 'order_quarter',
 'product_id',
 'product_name',
 'price']

In [0]:
total_sales_source_order = (df.groupBy("source_order")
                       .sum("price")
                       .withColumnRenamed("sum(price)","total_amount_source_order"))
total_sales_source_order.display()

source_order,total_amount_source_order
zomato,4920
Swiggy,6330
Restaurant,3090


Databricks visualization. Run in Databricks to view.