In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import *

In [0]:
sales_path = "/FileStore/tables/sales_csv.txt"
menu_path = "/FileStore/tables/menu_csv.txt"

In [0]:
# We need to define a schema cause our txt files don't have headers
# We have to set the column name, de type of the data and True to avoid Null values
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("costumer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True)
])

# Creating a dataframe
sales_df = spark.read.format("csv").option("inferschema", "true").schema(schema).load(sales_path)
display(sales_df)

product_id,costumer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
sales_df = sales_df.withColumn("order_year", year(sales_df.order_date))
display(sales_df)

product_id,costumer_id,order_date,location,source_order,order_year
1,A,2023-01-01,India,Swiggy,2023
2,A,2022-01-01,India,Swiggy,2022
2,A,2023-01-07,India,Swiggy,2023
3,A,2023-01-10,India,Restaurant,2023
3,A,2022-01-11,India,Swiggy,2022
3,A,2023-01-11,India,Restaurant,2023
2,B,2022-02-01,India,Swiggy,2022
2,B,2023-01-02,India,Swiggy,2023
1,B,2023-01-04,India,Restaurant,2023
1,B,2023-02-11,India,Swiggy,2023


In [0]:
sales_df = sales_df.withColumn("order_month", month(sales_df.order_date))
display(sales_df)

product_id,costumer_id,order_date,location,source_order,order_year,order_month
1,A,2023-01-01,India,Swiggy,2023,1
2,A,2022-01-01,India,Swiggy,2022,1
2,A,2023-01-07,India,Swiggy,2023,1
3,A,2023-01-10,India,Restaurant,2023,1
3,A,2022-01-11,India,Swiggy,2022,1
3,A,2023-01-11,India,Restaurant,2023,1
2,B,2022-02-01,India,Swiggy,2022,2
2,B,2023-01-02,India,Swiggy,2023,1
1,B,2023-01-04,India,Restaurant,2023,1
1,B,2023-02-11,India,Swiggy,2023,2


In [0]:
sales_df = sales_df.withColumn("order_quarter", quarter(sales_df.order_date))
display(sales_df)

product_id,costumer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
# We need to define a schema cause our txt files don't have headers
# We have to set the column name, de type of the data and True to avoid Null values
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True)
])

# Creating a dataframe
menu_df = spark.read.format("csv").option("inferschema", "true").schema(schema).load(menu_path)
display(menu_df)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# We start by joining both tables using the column 'product_id', grouping them by the costumer_id and apply an aggregation for suming the price
total_amount_spent = (sales_df.join(menu_df, "product_id").groupBy("costumer_id").agg({"price": "sum"}).orderBy("costumer_id"))
display(total_amount_spent)

costumer_id,sum(price)
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Food category stands for each product_name
total_amount_category = (sales_df.join(menu_df, "product_id").groupBy("product_name").agg({"price": "sum"}).orderBy("product_name"))
display(total_amount_category)

product_name,sum(price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_month = (sales_df.join(menu_df, "product_id").groupBy("order_month").agg({"price": "sum"}).orderBy("order_month"))
display(total_sales_month)

order_month,sum(price)
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_year = (sales_df.join(menu_df, "product_id").groupBy("order_year").agg({"price": "sum"}).orderBy("order_year"))
display(total_sales_year)

order_year,sum(price)
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_quarter = (sales_df.join(menu_df, "product_id").groupBy("order_quarter").agg({"price": "sum"}).orderBy("order_quarter"))
display(total_sales_quarter)

order_quarter,sum(price)
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
product_sales = (sales_df.join(menu_df, "product_id").groupBy("product_name").agg(count("product_id").alias("product_count")).orderBy("product_count", ascending=False))
display(product_sales)

product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6
Biryani,6


Databricks visualization. Run in Databricks to view.

In [0]:
top_5_items = (sales_df.join(menu_df, "product_id").groupBy("product_name").agg(count("product_id").alias("product_count")).orderBy("product_count", ascending=False).limit(5))
display(top_5_items)

product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6


In [0]:
top_item = (sales_df.join(menu_df, "product_id").groupBy("product_name").agg(count("product_id").alias("product_count")).orderBy("product_count", ascending=False).limit(1))
display(top_item)

product_name,product_count
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
visits = sales_df.groupBy("costumer_id").agg(count("costumer_id")).orderBy(desc(count("costumer_id")), asc("costumer_id"))
display(visits)

costumer_id,count(costumer_id)
B,36
A,33
C,18
E,18
D,12


Databricks visualization. Run in Databricks to view.

In [0]:
# First we need to filter the orders where source order is Restaurant and the order date cannot be the same 
df_filtered = sales_df.filter(sales_df.source_order == "Restaurant")
# Number of orders each client makes on each day
visits_resturant = df_filtered.groupBy("costumer_id").agg(countDistinct("order_date")).orderBy(desc(countDistinct("order_date")), asc("costumer_id"))
display(visits_resturant)

costumer_id,count(order_date)
A,6
B,6
E,5
C,3
D,1


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_country = (sales_df.join(menu_df, "product_id").groupBy("location").agg(sum("price").alias("Total_sales")).orderBy("Total_sales", ascending = False))
display(total_sales_country)

location,Total_sales
UK,7020.0
India,4860.0
USA,2460.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_source = (sales_df.join(menu_df, "product_id").groupBy("source_order").agg(sum("price").alias("Total_sales")).orderBy("Total_sales", ascending = False))
display(total_sales_source)

source_order,Total_sales
Swiggy,6330.0
zomato,4920.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.