In [0]:
#/FileStore/tables/sales_csv.txt --sales
#/FileStore/tables/menu_csv.txt --menu

In [0]:
   #Sales DataFrame

   from pyspark.sql.types import StructType,StructField,IntegerType,DateType,StringType
   Schema = StructType([
       StructField("Product_id",IntegerType(),True),
       StructField("Customer_id",StringType(),True),
       StructField("Order_date",DateType(),True),
       StructField("Location",StringType(),True),
       StructField("Source_order",StringType(),True)
   ])

   SalesDf = spark.read.format("csv").option("inferschema","true").schema(Schema).load("/FileStore/tables/sales_csv.txt")
   display(SalesDf)



Product_id,Customer_id,Order_date,Location,Source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
# Derive

from pyspark.sql.functions import month,year,quarter

SalesDf = SalesDf.withColumn("Order_year",year(SalesDf.Order_date))
SalesDf = SalesDf.withColumn("Order_month",month(SalesDf.Order_date))
SalesDf = SalesDf.withColumn("Order_quarter",quarter(SalesDf.Order_date))

display(SalesDf)

Product_id,Customer_id,Order_date,Location,Source_order,Order_year,Order_month,Order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
#Menu DF

Schema = StructType([
       StructField("Product_id",IntegerType(),True),
       StructField("Product_Name",StringType(),True),
       StructField("Price",StringType(),True)
   ])

MenuDf = spark.read.format("csv").option("inferschema","true").schema(Schema).load("/FileStore/tables/menu_csv.txt")
display(MenuDf)

Product_id,Product_Name,Price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
total_amount_spend = (SalesDf.join(MenuDf,'Product_id').groupBy('Customer_id').agg({'Price':'sum'}).orderBy('Customer_id'))
display(total_amount_spend)

Customer_id,sum(Price)
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_per_food = (SalesDf.join(MenuDf,'Product_id').groupBy('Product_name').agg({'Price':'sum'}).orderBy('Product_name'))
display(total_sales_per_food)

Product_name,sum(Price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_in_each_month= (SalesDf.join(MenuDf,'Product_id').groupBy('Order_month').agg({'Price':'sum'}))
display(total_sales_in_each_month)

Order_month,sum(Price)
1,2960.0
6,2960.0
3,910.0
5,2960.0
7,910.0
11,910.0
2,2730.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_in_each_year= (SalesDf.join(MenuDf,'Product_id').groupBy('Order_year').agg({'Price':'sum'}))
display(total_sales_in_each_year)

Order_year,sum(Price)
2023,9990.0
2022,4350.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_in_each_Quarter= (SalesDf.join(MenuDf,'Product_id').groupBy('Order_quarter').agg({'Price':'sum'}).orderBy('Order_quarter'))
display(total_sales_in_each_Quarter)

Order_quarter,sum(Price)
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count
item_got_purchased = (SalesDf.join(MenuDf,'Product_id').groupBy('Product_id','Product_name').agg(count('Product_id').alias('Product_count')).orderBy('Product_count',ascending = 0))
display(item_got_purchased)

Product_id,Product_name,Product_count
3,sandwich,48
2,Chowmin,24
1,PIZZA,21
4,Dosa,12
5,Biryani,6
6,Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
5item_got_purchased = (SalesDf.join(MenuDf,'Product_id').groupBy('Product_id','Product_name').agg(count('Product_id').alias('Product_count')).orderBy('Product_count',ascending = 0)
                      .drop('product_id')
                      .limit(5))
display(5item_got_purchased)

[0;36m  File [0;32m<command-1558819179496857>:1[0;36m[0m
[0;31m    5item_got_purchased = (SalesDf.join(MenuDf,'Product_id').groupBy('Product_id','Product_name').agg(count('Product_id').alias('Product_count')).orderBy('Product_count',ascending = 0)[0m
[0m     ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
top_item_purchased = (SalesDf.join(MenuDf,'Product_id').groupBy('Product_id','Product_name').agg(count('Product_id').alias('Product_count')).orderBy('Product_count',ascending = 0)
                      .drop('product_id')
                      .limit(1))
display(top_item_purchased)



In [0]:
from pyspark.sql.functions import countDistinct

Restrovisit = (SalesDf.filter(SalesDf.Source_order=='Restaurant').groupBy('Customer_id').agg(countDistinct('Order_date')).orderBy('Customer_id'))
display(Restrovisit)



In [0]:
sales_by_country= (SalesDf.join(MenuDf,'Product_id').groupBy('Location').agg({'Price':'sum'}).orderBy('Location'))
display(sales_by_country)



In [0]:
sales_by_source= (SalesDf.join(MenuDf,'Product_id').groupBy('Source_order').agg({'Price':'sum'}))
display(sales_by_source)

