In [0]:
# Importing Libraries
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('customer_id', StringType(), True),
    StructField('order_date', DateType(), True),
    StructField('location', StringType(), True),
    StructField('source_order', StringType(), True),
])

In [0]:
# Creating a DataFrame(Sales)
sales_df = spark.read.csv('/FileStore/tables/sales_csv.txt', schema = schema,header = False)
display(sales_df)

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


#Extracting Year, Month, Quater


In [0]:
# importing Neccessary Libraries
from pyspark.sql.functions import month, year, quarter

sales_df = sales_df.withColumn('order_year', year(sales_df.order_date))\
    .withColumn('order_month', month(sales_df.order_date))\
    .withColumn('order_quarter', quarter(sales_df.order_date))

display(sales_df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
#Creating a DataFrame(menu)

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType

schema1 = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('Price', DoubleType(), True)
])

menu_df = spark.read.csv('/FileStore/tables/menu_csv.txt', schema = schema1,header = False)
display(menu_df)

product_id,product_name,Price
1,PIZZA,100.0
2,Chowmin,150.0
3,sandwich,120.0
4,Dosa,110.0
5,Biryani,80.0
6,Pasta,180.0


#Total amount spent by each customer

In [0]:
from pyspark.sql import functions as F

total_amount = (sales_df.join(menu_df, 'product_id')\
    .groupby('customer_id')\
        .agg(F.sum('price').alias('total_amount'))\
            .orderBy('customer_id'))

display(total_amount)

customer_id,total_amount
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


#Total amount spent by each food category 

In [0]:
total_amount_category = (sales_df.join(menu_df, 'product_id')\
    .groupby('product_name')\
        .agg(F.sum('price').alias('total_amount'))\
            .orderBy('product_name'))

display(total_amount_category)

product_name,total_amount
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

#Total amount of sales each month

In [0]:
total_sales_by_Month = (sales_df.join(menu_df, 'product_id')\
    .groupby('order_month')\
        .agg(F.sum('price').alias('total_sales'))\
            .orderBy('order_month'))

display(total_sales_by_Month)

order_month,total_sales
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

#Calculating Yearly Sales

In [0]:
df1 = (sales_df.join(menu_df, 'product_id').groupby('order_year').agg(F.sum('price')).orderBy('order_year'))
display(df1)

order_year,sum(price)
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

#Calculating Quarterly Sales

In [0]:
df2 = (sales_df.join(menu_df, 'product_id').groupby('order_quarter').agg(F.sum('price')).orderBy('order_quarter'))
display(df2)

order_quarter,sum(price)
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

#How many times each Product purchased

In [0]:
product_counts = (sales_df.join(menu_df, 'product_id')\
    .groupby('product_id', 'product_name')\
        .agg(F.count('product_id').alias('product_count'))\
            .orderBy('product_count',ascending= False))

display(product_counts)

product_id,product_name,product_count
3,sandwich,48
2,Chowmin,24
1,PIZZA,21
4,Dosa,12
5,Biryani,6
6,Pasta,6


Databricks visualization. Run in Databricks to view.

#Frequency of the customer visiting restuarant

In [0]:
from pyspark.sql.functions import countDistinct
df = sales_df.filter(sales_df.source_order == 'Restaurant')\
    .groupby('customer_id')\
        .agg(countDistinct('order_date'))\
            .orderBy('customer_id')
display(df)

customer_id,count(order_date)
A,6
B,6
C,3
D,1
E,5


Databricks visualization. Run in Databricks to view.

#Total sales by each country 


In [0]:
total_sales_country = (sales_df.join(menu_df, 'product_id')\
    .groupby('location')\
        .agg(F.sum('price').alias('total'))\
            .orderBy('location'))

display(total_sales_country)

location,total
India,4860.0
UK,7020.0
USA,2460.0


Databricks visualization. Run in Databricks to view.