In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import *

schema = StructType([
    StructField("product_id", IntegerType(),True),
    StructField("customer_id", StringType(),True),
    StructField("order_date", DateType(),True),
    StructField("location", StringType(),True),
    StructField("source_order", StringType(),True),
])

In [0]:
df = spark.read.format("csv").option("inferschema","true").schema(schema).load("/FileStore/tables/sales_csv.txt")
display(df)

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
df = df.withColumn("order_year",year(df.order_date))
df = df.withColumn("order_quater",quarter(df.order_date))
df = df.withColumn("order_month",month(df.order_date))
display(df)

product_id,customer_id,order_date,location,source_order,order_year,order_quater,order_month
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,1,2
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,1,2


In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import *

schema = StructType([
    StructField("product_id", IntegerType(),True),
    StructField("product_name", StringType(),True),
    StructField("price", StringType(),True),
])

menu_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("/FileStore/tables/menu_csv.txt")
display(menu_df)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

df.createOrReplaceTempView("orders")
menu_df.createOrReplaceTempView("menu")

# Run the SQL query using Spark SQL
result_df = spark.sql("""
    SELECT a.customer_id, SUM(b.price) as total_spent
    FROM orders a join menu b
    on a.product_id = b.product_id
    GROUP BY a.customer_id
    order by customer_id
""")

# Show the result
result_df.show()


# Join orders and menu DataFrames
joined_df = df.join(menu_df,df.product_id == menu_df.product_id, "inner")

# Group by customer_id and calculate total_spent
result_df1 = joined_df.groupBy("customer_id").agg(sum("price").alias("total_spent")).orderBy("customer_id")

display(result_df1)


+-----------+-----------+
|customer_id|total_spent|
+-----------+-----------+
|          A|     4260.0|
|          B|     4440.0|
|          C|     2400.0|
|          D|     1200.0|
|          E|     2040.0|
+-----------+-----------+



customer_id,total_spent
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Join orders and menu DataFrames
joined_df1 = df.join(menu_df,df.product_id == menu_df.product_id, "inner")

# Group by product_name and calculate total_spent
result_df1 = joined_df1.groupBy("product_name").agg(sum("price").alias("total_spent")).orderBy("product_name")

display(result_df1)

product_name,total_spent
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Group by order_month and calculate total_spent
result_df2 = joined_df1.groupBy("order_month").agg(sum("price").alias("revenue_per_month")).orderBy("order_month")
display(result_df2)


order_month,revenue_per_month
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Group by order_year and calculate total_spent
result_df2 = joined_df1.groupBy("order_year").agg(sum("price").alias("revenue_per_year")).orderBy("order_year")
display(result_df2)

order_year,revenue_per_year
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Group by order_quater and calculate total_spent
result_df2 = joined_df1.groupBy("order_quater").agg(sum("price").alias("revenue_per_quater")).orderBy("order_quater")
display(result_df2)

order_quater,revenue_per_quater
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
display(joined_df1)

product_id,customer_id,order_date,location,source_order,order_year,order_quater,order_month,product_id.1,product_name,price
1,A,2023-01-01,India,Swiggy,2023,1,1,1,PIZZA,100
2,A,2022-01-01,India,Swiggy,2022,1,1,2,Chowmin,150
2,A,2023-01-07,India,Swiggy,2023,1,1,2,Chowmin,150
3,A,2023-01-10,India,Restaurant,2023,1,1,3,sandwich,120
3,A,2022-01-11,India,Swiggy,2022,1,1,3,sandwich,120
3,A,2023-01-11,India,Restaurant,2023,1,1,3,sandwich,120
2,B,2022-02-01,India,Swiggy,2022,1,2,2,Chowmin,150
2,B,2023-01-02,India,Swiggy,2023,1,1,2,Chowmin,150
1,B,2023-01-04,India,Restaurant,2023,1,1,1,PIZZA,100
1,B,2023-02-11,India,Swiggy,2023,1,2,1,PIZZA,100


In [0]:
 mos_df = (df.join(menu_df,'product_id').groupBy("Product_id", "product_name")
    .agg(count("product_id").alias("count"))
    .orderBy('count', ascending=0)
    .drop("product_id")
    )
 display(mos_df)
 

product_name,count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6
Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
 mos_df = (df.join(menu_df,'product_id').groupBy("Product_id", "product_name")
    .agg(count("product_id").alias("count"))
    .orderBy('count', ascending=0)
    .drop("product_id").limit(1)
    )
 display(mos_df)

product_name,count
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
shoq_df = (df.filter(df.source_order=="Restaurant").groupBy("customer_id").agg(countDistinct("order_date")))
display(shoq_df)

customer_id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

In [0]:


 sales_by_country = (df.join(menu_df,'product_id').groupBy("location")
    .agg(sum("price").alias("total_sales"))
    .orderBy('total_sales', ascending=False)
    .drop("product_id")
    )
 display(sales_by_country)

location,total_sales
UK,7020.0
India,4860.0
USA,2460.0


Databricks visualization. Run in Databricks to view.

In [0]:
 sales_by_source_order = (df.join(menu_df,'product_id').groupBy("source_order")
    .agg(sum("price").alias("total_sales"))
    .orderBy('total_sales', ascending=False)
    .drop("product_id")
    )
 display(sales_by_source_order)

source_order,total_sales
Swiggy,6330.0
zomato,4920.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.