In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *

In [0]:
# File uploaded to /FileStore/tables/sales_csv.txt
# File uploaded to /FileStore/tables/menu_csv.txt
schema1 = StructType([
    StructField('Product_id',IntegerType(), True),
    StructField('Customer_id',StringType(), True),
    StructField('Order_date',DateType(), True),
    StructField('Location',StringType(), True),
    StructField('Source_order',StringType(), True)
])

schema2 = StructType([
    StructField('Product_id', IntegerType(), True),
    StructField('Product_name', StringType(), True),
    StructField('Price', StringType(), True)
    
])

df1 = spark.read.format('csv').option('inferschema', True).schema(schema1).load('/FileStore/tables/sales_csv.txt')
df2 = spark.read.format('csv').option('inferschema', True).schema(schema2).load('/FileStore/tables/menu_csv.txt')

In [0]:
df1.display()
df2.display()

Product_id,Customer_id,Order_date,Location,Source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


Product_id,Product_name,Price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
sales_df = df1.withColumn('order_year',year(col("Order_date")))
sales_df = sales_df.withColumn('order_month',month(col("Order_date")))
sales_df = sales_df.withColumn('order_quarter',quarter(col("Order_date")))
sales_df.display()
df2.display()

Product_id,Customer_id,Order_date,Location,Source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


Product_id,Product_name,Price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# Total amount spent by each customers
total_amount_spent = (sales_df.join(df2,'Product_id').groupBy('Customer_id').agg(sum('price').alias('total')).orderBy('Customer_id'))
total_amount_spent.display()


Customer_id,total
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


In [0]:
# Total amount spent by each category
total_amount_spent_category = sales_df.join(df2,'Product_id').groupBy('Product_name').agg(sum('price')).orderBy('Product_name')
total_amount_spent_category.display()


Product_name,sum(price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


In [0]:
# Total amount spent in each month
total_spent_month = sales_df.join(df2,'Product_id').groupBy('order_month').agg(sum('price').alias('total_spent')).orderBy('order_month')
total_spent_month.display()


order_month,sum(price)
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


In [0]:
# Total amount spent in each year
total_spent_year = sales_df.join(df2,'Product_id').groupBy('order_year').agg(sum('price').alias('total_spent')).orderBy('order_year')
total_spent_year.display()

order_year,total_spent
2022,4350.0
2023,9990.0


In [0]:
# Total amount spent in each quaterly
total_spent_quater = sales_df.join(df2,'Product_id').groupBy('order_quarter').agg(sum('price').alias('total_spent')).orderBy('order_quarter')
total_spent_quater.display()

order_quarter,total_spent
1,6600.0
2,5920.0
3,910.0
4,910.0


In [0]:
# how many time each item purches
total_order_catagory = (
    sales_df.join(df2,'Product_id')
    .groupBy('Product_name')
    .agg(count('product_id').alias('item_cnt'))
    .orderBy(col('item_cnt').asc())
    )
total_order_catagory.display()

Product_name,item_cnt
Pasta,6
Biryani,6
Dosa,12
PIZZA,21
Chowmin,24
sandwich,48


In [0]:
    # top 5 ordered item
top_five_item = (
    sales_df
    .join(df2, "Product_id")
    .groupBy("Product_name",'Product_id')
    .agg(count("Product_id").alias("product_count"))
    .orderBy(col("product_count").desc())
    .limit(5)
)
top_five_item.display()

Product_name,Product_id,product_count
sandwich,3,48
Chowmin,2,24
PIZZA,1,21
Dosa,4,12
Pasta,6,6


In [0]:
# frequncy of customer visited
frequncy_customer_visited = sales_df.groupBy(col('Customer_id'),col('Location')).agg(count('Customer_id')).orderBy(col("Customer_id"))
frequncy_customer_visited.display()


Customer_id,Location,count(Customer_id)
A,UK,15
A,India,18
B,UK,18
B,India,18
C,UK,9
C,India,3
C,USA,6
D,USA,6
D,UK,6
E,UK,9


In [0]:
# Top orderd item
top_ordered_item = (sales_df.join(df2,'product_id')
    .groupBy('Product_id','Product_name')
    .agg(count('Product_id').alias('total'))
    .orderBy(col('total').desc())
    .limit(1)
    
)
top_ordered_item.display()

Product_id,Product_name,total
3,sandwich,48


In [0]:
# Total sales by each country
total_sale_country = (sales_df.join(df2,'Product_id')
                      .groupBy('Location')
                      .agg(sum('price').alias('total'))
                      .orderBy(col('total').desc())
                      )
total_sale_country.display()

Location,total
UK,7020.0
India,4860.0
USA,2460.0


In [0]:
# Total sales by each order_source
total_sale_order_source = (
    sales_df.join(df2,'Product_id')
    .groupBy('Source_order')
    .agg(sum('price').alias('total'))
    .orderBy(col('total').desc())
    )
total_sale_order_source.display()

Source_order,total
Swiggy,6330.0
zomato,4920.0
Restaurant,3090.0
