In [0]:
#/FileStore/tables/menu_csv-2.txt

#/FileStore/tables/sales_csv-2.txt

In [0]:
#from pyspark.context import SparkContext
#from pyspark.sql.session import SparkSession
#sc = SparkContext('local')
#spark = SparkSession(sc)

In [0]:
spark = SparkSession.builder.appName("Sales_Analysis").getOrCreate()

# Sales Dataframe

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema= StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True)
])

sales_df=spark.read.format("csv").option("inferschema","true").schema(schema).load("/FileStore/tables/sales_csv-2.txt")
display(sales_df)

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


# Deriving year, month, quarter

In [0]:
from pyspark.sql.functions import month,year,quarter

sales_df=sales_df.withColumn("order_month",month(sales_df.order_date))
sales_df=sales_df.withColumn("order_quarter",quarter(sales_df.order_date))
sales_df=sales_df.withColumn("order_year",year(sales_df.order_date))
display(sales_df)

product_id,customer_id,order_date,location,source_order,order_month,order_quarter,order_year
1,A,2023-01-01,India,Swiggy,1,1,2023
2,A,2022-01-01,India,Swiggy,1,1,2022
2,A,2023-01-07,India,Swiggy,1,1,2023
3,A,2023-01-10,India,Restaurant,1,1,2023
3,A,2022-01-11,India,Swiggy,1,1,2022
3,A,2023-01-11,India,Restaurant,1,1,2023
2,B,2022-02-01,India,Swiggy,2,1,2022
2,B,2023-01-02,India,Swiggy,1,1,2023
1,B,2023-01-04,India,Restaurant,1,1,2023
1,B,2023-02-11,India,Swiggy,2,1,2023


# Menu Dataframe

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema= StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("price", StringType(), True),

])

menu_df=spark.read.format("csv").option("inferschema","true").schema(schema).load("/FileStore/tables/menu_csv-2.txt")
display(menu_df)

product_id,customer_id,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


# Total Amount spent by each customer

In [0]:
from pyspark.sql.functions import sum

# Assuming menu_df has a column named 'price'
total_amount_spent_each_customer = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.customer_id") \
    .agg(sum("m.price").alias("total_spent")) \
    .orderBy("s.customer_id")

display(total_amount_spent_each_customer)

customer_id,total_spent
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


# Total Amount spent by each food category (Product_id)

In [0]:
from pyspark.sql.functions import sum

# Assuming menu_df has a column named 'price'
total_amount_spent_food_category = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.product_id") \
    .agg(sum("m.price").alias("total_spent")) \
    .orderBy("s.product_id")

display(total_amount_spent_food_category)

product_id,total_spent
1,2100.0
2,3600.0
3,5760.0
4,1320.0
5,480.0
6,1080.0


# Total Amount of sales in each month

In [0]:
from pyspark.sql.functions import sum

# Assuming menu_df has a column named 'price'
total_amount_spent_each_month = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.order_month") \
    .agg(sum("m.price").alias("total_spent")) \
    .orderBy("s.order_month")

display(total_amount_spent_each_month)

order_month,total_spent
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


# Total Amount of sales in each quarter

In [0]:
from pyspark.sql.functions import sum

# Assuming menu_df has a column named 'price'
total_amount_spent_each_quarter = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.order_quarter") \
    .agg(sum("m.price").alias("total_spent")) \
    .orderBy("s.order_quarter")

display(total_amount_spent_each_quarter)

order_quarter,total_spent
1,6600.0
2,5920.0
3,910.0
4,910.0


# Total Amount of sales in each year

In [0]:
from pyspark.sql.functions import sum

# Assuming menu_df has a column named 'price'
total_amount_spent_each_year = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.order_year") \
    .agg(sum("m.price").alias("total_spent")) \
    .orderBy("s.order_year")

display(total_amount_spent_each_year)

order_year,total_spent
2022,4350.0
2023,9990.0


# How many times each product purchased

In [0]:
from pyspark.sql.functions import count

# Correcting the aggregation logic
total_purchase_each_product = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.product_id") \
    .agg(count("m.product_id").alias("total_purchases")) \
    .orderBy("total_purchases", ascending=False)

display(total_purchase_each_product)


product_id,total_purchases
3,48
2,24
1,21
4,12
6,6
5,6


# Top 5 ordered items

In [0]:
from pyspark.sql.functions import count

# Correcting the aggregation logic
top_5_product = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.product_id") \
    .agg(count("m.product_id").alias("total_purchases")) \
    .orderBy("total_purchases", ascending=False) \
    .limit(5)

display(top_5_product)


product_id,total_purchases
3,48
2,24
1,21
4,12
6,6


# Top ordered item

In [0]:
from pyspark.sql.functions import count

# Correcting the aggregation logic
top_product = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.product_id") \
    .agg(count("m.product_id").alias("total_purchases")) \
    .orderBy("total_purchases", ascending=False) \
    .limit(1)

display(top_product)

product_id,total_purchases
3,48


# Frequency of the customer visited to restaurant

In [0]:
from pyspark.sql.functions import countDistinct

# Correcting the aggregation logic
Frequency_customer_visit = (sales_df.filter(sales_df.source_order=='Restaurant') \
    .groupBy("customer_id") \
    .agg(countDistinct("order_date").alias("Frequency_customer_visit"))\
    .orderBy("Frequency_customer_visit", ascending=False))

display(Frequency_customer_visit)

customer_id,Frequency_customer_visit
B,6
A,6
E,5
C,3
D,1


# Top Sales by each country

In [0]:
from pyspark.sql.functions import sum

# Correcting the aggregation logic
Top_sales_countrywise = sales_df.alias("s").join(menu_df.alias("m"), "product_id") \
    .groupBy("s.location") \
    .agg(sum("m.price").alias("Top_sales_countrywise")) \
    .orderBy("Top_sales_countrywise", ascending=False) \

display(Top_sales_countrywise)

location,Top_sales_countrywise
UK,7020.0
India,4860.0
USA,2460.0
