In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import count

In [0]:
/FileStore/tables/menu_csv.txt

/FileStore/tables/sales_csv.txt


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),  
    StructField("location", StringType(), True),    
    StructField("source_ordered", StringType(), True) 
])


In [0]:
sales_data = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .schema(schema) \
    .load("/FileStore/tables/sales_csv.txt")

# Display the DataFrame
display(sales_data)


product_id,customer_id,order_date,location,source_ordered
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
from pyspark.sql.functions import month, year, quarter

sales_data = sales_data.withColumn("order_year", year(sales_data.order_date)) \
                       .withColumn("order_month", month(sales_data.order_date)) \
                       .withColumn("order_quarter", quarter(sales_data.order_date))
display(sales_data)

product_id,customer_id,order_date,location,source_ordered,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("order_price", StringType(), True)  
   
])

In [0]:
menu_data = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .schema(schema) \
    .load("/FileStore/tables/menu_csv.txt")

# Display the DataFrame
display(menu_data)

product_id,product_name,order_price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
display(sales_data)

product_id,customer_id,order_date,location,source_ordered,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
data = (
    menu_data
    .join(sales_data, "product_id", "inner")
    .select(
        "product_id",
        "product_name",
        "order_price",
        "customer_id",
        "order_date",
        "location",
        "source_ordered",
        "order_year",
        "order_month",
        "order_quarter"
    )
)

In [0]:
TotalAmt_spent = data.groupBy("customer_id").agg(F.sum("order_price").alias("total_spent"))

TotalAmt_spent = TotalAmt_spent.orderBy(F.col("total_spent").desc())

display(TotalAmt_spent)

customer_id,total_spent
B,4440.0
A,4260.0
C,2400.0
E,2040.0
D,1200.0


Databricks visualization. Run in Databricks to view.

In [0]:
TotalAmt_cat = data.groupBy("product_name").agg(F.sum("order_price").alias("total_spent"))

TotalAmt_cat = TotalAmt_cat.orderBy(F.col("total_spent").desc())

display(TotalAmt_cat)

product_name,total_spent
sandwich,5760.0
Chowmin,3600.0
PIZZA,2100.0
Dosa,1320.0
Pasta,1080.0
Biryani,480.0


Databricks visualization. Run in Databricks to view.

In [0]:
Totalmth_sales = data.groupBy("order_month").agg(F.sum("order_price"))

display(Totalmth_sales)

order_month,sum(order_price)
1,2960.0
6,2960.0
3,910.0
5,2960.0
7,910.0
11,910.0
2,2730.0


Databricks visualization. Run in Databricks to view.

In [0]:
yearly_sales = data.groupBy("order_year").agg(F.sum("order_price"))

display(yearly_sales)

order_year,sum(order_price)
2023,9990.0
2022,4350.0


Databricks visualization. Run in Databricks to view.

In [0]:
quarter_sales = data.groupBy("order_quarter").agg(F.sum("order_price"))

display(quarter_sales)

order_quarter,sum(order_price)
1,6600.0
3,910.0
4,910.0
2,5920.0


Databricks visualization. Run in Databricks to view.

In [0]:
pro_purchase = data.groupBy("product_name", "product_id") \
                   .agg(F.count("product_id").alias("product_count")) \
                   .orderBy(F.desc("product_count")) \
                   .drop("product_id") 

display(pro_purchase)

product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6
Biryani,6


Databricks visualization. Run in Databricks to view.

In [0]:
top5 = data.groupBy("product_name", "product_id") \
                   .agg(F.count("product_id").alias("product_count")) \
                   .orderBy(F.desc("product_count")) \
                   .drop("product_id").limit(5) 

display(top5)

product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6


In [0]:
top = data.groupBy("product_name", "product_id") \
                   .agg(F.count("product_id").alias("product_count")) \
                   .orderBy(F.desc("product_count")) \
                   .drop("product_id").limit(1) 

display(top)

product_name,product_count
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
country_sales = data.groupBy("location").agg(F.sum("order_price"))

display(country_sales)

location,sum(order_price)
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.

In [0]:
Swiggy=(sales_data.filter(sales_data.source_ordered=="Swiggy").groupBy("customer_id").agg(countDistinct("order_date")))

display(Swiggy)

customer_id,count(order_date)
E,6
B,16
C,3
A,12


Databricks visualization. Run in Databricks to view.

In [0]:
order_source = data.groupBy("source_ordered").agg(F.sum("order_price").alias("total_order_price"))

display(order_source)

source_ordered,total_order_price
zomato,4920.0
Swiggy,6330.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.

In [0]:
display(data)

product_id,product_name,order_price,customer_id,order_date,location,source_ordered,order_year,order_month,order_quarter
1,PIZZA,100,A,2023-01-01,India,Swiggy,2023,1,1
2,Chowmin,150,A,2022-01-01,India,Swiggy,2022,1,1
2,Chowmin,150,A,2023-01-07,India,Swiggy,2023,1,1
3,sandwich,120,A,2023-01-10,India,Restaurant,2023,1,1
3,sandwich,120,A,2022-01-11,India,Swiggy,2022,1,1
3,sandwich,120,A,2023-01-11,India,Restaurant,2023,1,1
2,Chowmin,150,B,2022-02-01,India,Swiggy,2022,2,1
2,Chowmin,150,B,2023-01-02,India,Swiggy,2023,1,1
1,PIZZA,100,B,2023-01-04,India,Restaurant,2023,1,1
1,PIZZA,100,B,2023-02-11,India,Swiggy,2023,2,1
