In [0]:
## These files are present in DBFS location
/FileStore/tables/sales_csv.txt   

/FileStore/tables/menu_csv.txt

In [0]:
from pyspark.sql.types import StructType,StructField, IntegerType,StringType,DateType

schema = StructType([
        StructField("product_Id",IntegerType(),True),
        StructField("customer_Id",StringType(),True),
        StructField("order_Date",DateType(),True),
        StructField("location",StringType(),True),
        StructField("source_Order",StringType(),True),

])

In [0]:
sales_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("/FileStore/tables/sales_csv.txt")

display(sales_df)

product_Id,customer_Id,order_Date,location,source_Order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
from pyspark.sql.functions import month,year,quarter

## withColumn helps to add another column to dataframe
sales_df = sales_df.withColumn("order_year",year(sales_df.order_Date))

display(sales_df)

product_Id,customer_Id,order_Date,location,source_Order,order_year
1,A,2023-01-01,India,Swiggy,2023
2,A,2022-01-01,India,Swiggy,2022
2,A,2023-01-07,India,Swiggy,2023
3,A,2023-01-10,India,Restaurant,2023
3,A,2022-01-11,India,Swiggy,2022
3,A,2023-01-11,India,Restaurant,2023
2,B,2022-02-01,India,Swiggy,2022
2,B,2023-01-02,India,Swiggy,2023
1,B,2023-01-04,India,Restaurant,2023
1,B,2023-02-11,India,Swiggy,2023


In [0]:
sales_df = sales_df.withColumn("order_month",month(sales_df.order_Date))
sales_df = sales_df.withColumn("order_quarter",quarter(sales_df.order_Date))

display(sales_df)

product_Id,customer_Id,order_Date,location,source_Order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
from pyspark.sql.types import StructType,StructField, IntegerType,StringType,DateType

schema_2 = StructType([
        StructField("product_Id",IntegerType(),True),
        StructField("product_Name",StringType(),True),
        StructField("price",StringType(),True)

])

In [0]:
menu_df = spark.read.format("csv").option("inferschema","true").schema(schema_2).load("/FileStore/tables/menu_csv.txt")

display(menu_df)

product_Id,product_Name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
total_amount_spent = (sales_df.join(menu_df,"product_Id").groupBy("customer_Id").agg({"price":"sum"}).orderBy("customer_Id")
                      )

display(total_amount_spent)                      

customer_Id,sum(price)
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
## Creating table from dataframe for use in SQL

sales_df.write.mode("overwrite").saveAsTable("default.sales_df")

In [0]:
menu_df.write.mode("overwrite").saveAsTable("default.menu_df")

In [0]:
%sql
SELECT count(*) FROM default.sales_df;
SELECT count(*) FROM default.menu_df;

count(1)
6


In [0]:
%sql
SELECT 
    s.customer_Id,
    SUM(m.price) AS total_amount_spent
FROM 
    sales_df s
JOIN 
    menu_df m
ON 
    s.product_Id = m.product_Id
GROUP BY 
    s.customer_Id
ORDER BY 
    s.customer_Id;

In [0]:
total_sales_category = (sales_df.join(menu_df,"product_Id").groupBy("product_Name").agg({"price":"sum"}).orderBy("product_Name")
                      )

display(total_sales_category)                      

product_Name,sum(price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_month = (sales_df.join(menu_df,"product_Id").groupBy("order_month").agg({"price":"sum"}).orderBy("order_month")
                      )

display(total_sales_month)                      

order_month,sum(price)
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
yearly_sales = (sales_df.join(menu_df,"product_Id").groupBy("order_year").agg({"price":"sum"}).orderBy("order_year")
                      )

display(yearly_sales)                      

order_year,sum(price)
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
quarterly_sales = (sales_df.join(menu_df,"product_Id").groupBy("order_quarter").agg({"price":"sum"}).orderBy("order_quarter")
                      )

display(quarterly_sales)                      

order_quarter,sum(price)
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

times_purchased = (sales_df.join(menu_df,"product_Id").groupBy("product_Id","product_Name")
                   .agg(count("product_Id").alias("product_count"))
                   .orderBy('product_count',ascending =0)
                   
                   )

display(times_purchased)                   

product_Id,product_Name,product_count
3,sandwich,48
2,Chowmin,24
1,PIZZA,21
4,Dosa,12
5,Biryani,6
6,Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

top_5 = (sales_df.join(menu_df,"product_Id").groupBy("product_Id","product_Name")
                   .agg(count("product_Id").alias("product_count"))
                   .orderBy('product_count',ascending =0)
                   .drop('product_Id').limit(5)
                   
                   )

display(top_5)                   

product_Name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import countDistinct

cust_freq = (sales_df.filter(sales_df.source_Order=='Restaurant')
            .groupBy("customer_Id")
            .agg(countDistinct("order_date"))                   
                   )

display(cust_freq)                   

customer_Id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_country = (sales_df.join(menu_df,"product_Id").groupBy("location").agg({"price":"sum"})
                      )

display(total_sales_country)

location,sum(price)
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_order = (sales_df.join(menu_df,"product_Id").groupBy("source_Order")
                     .agg({"price":"sum"})
                      )

display(total_sales_order)

source_Order,sum(price)
zomato,4920.0
Swiggy,6330.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.