In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("Customer_id",StringType(),True),
    StructField("Order_date",DateType(),True),
    StructField("Location",StringType(),True),
    StructField("Source_Order",StringType(),True),


])

sales_df=spark.read.format('csv').option('inferschema','true').schema(schema).load('/FileStore/tables/sales_csv.txt')
display(sales_df)


product_id,Customer_id,Order_date,Location,Source_Order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
from pyspark.sql.functions import month,year,quarter

sales_df=sales_df.withColumn("orderyear",year(sales_df.Order_date))
display(sales_df)

product_id,Customer_id,Order_date,Location,Source_Order,orderyear
1,A,2023-01-01,India,Swiggy,2023
2,A,2022-01-01,India,Swiggy,2022
2,A,2023-01-07,India,Swiggy,2023
3,A,2023-01-10,India,Restaurant,2023
3,A,2022-01-11,India,Swiggy,2022
3,A,2023-01-11,India,Restaurant,2023
2,B,2022-02-01,India,Swiggy,2022
2,B,2023-01-02,India,Swiggy,2023
1,B,2023-01-04,India,Restaurant,2023
1,B,2023-02-11,India,Swiggy,2023


In [0]:
sales_df=sales_df.withColumn("ordermonth",month(sales_df.Order_date))
sales_df=sales_df.withColumn("orderquarter",quarter(sales_df.Order_date))
display(sales_df)

product_id,Customer_id,Order_date,Location,Source_Order,orderyear,ordermonth,orderquarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True)

])

menu_df=spark.read.format('csv').option('inferschema','true').schema(schema).load('/FileStore/tables/menu_csv.txt')
display(sales_df)

product_id,Customer_id,Order_date,Location,Source_Order,orderyear,ordermonth,orderquarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
# Calculate total amount spent by each customer
total_spent_amount = (sales_df.join(menu_df, 'product_id')
                      .groupBy('customer_id')
                      .agg({'price': 'sum'})
                      .withColumnRenamed('sum(price)', 'total_spent')
                      .orderBy('customer_id'))

display(total_spent_amount)

customer_id,total_spent
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Calculate total amount spent by each product
total_spent_amount = (sales_df.join(menu_df, 'product_id')
                      .groupBy('product_name')
                      .agg({'price': 'sum'})
                      .withColumnRenamed('sum(price)', 'total_spent')
                      .orderBy('product_name'))

display(total_spent_amount)

product_name,total_spent
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import year, sum, round

# Calculate yearly sales
yearly_sales = (sales_df.join(menu_df, 'product_id')
                .withColumn('order_year', year('Order_date')) 
                .groupBy('order_year')
                .agg(
                    round(sum('price'), 2).alias('total_sales'),
                    count('*').alias('total_orders')
                )
                .orderBy('order_year'))

display(yearly_sales)

order_year,total_sales,total_orders
2022,4350.0,33
2023,9990.0,84


Databricks visualization. Run in Databricks to view.

In [0]:

# Calculate total amount spent by each month
quarter_spent = (sales_df.join(menu_df, 'product_id')
                  .groupBy('ordermonth')
                  .agg(sum('price').alias('total_spent'))
                  .orderBy('ordermonth'))  # Order by quarter

display(quarter_spent)

ordermonth,total_spent
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:

# Calculate total amount spent by each quarter
quarter_spent = (sales_df.join(menu_df, 'product_id')
                  .groupBy('orderquarter')
                  .agg(sum('price').alias('total_spent'))
                  .orderBy('orderquarter'))  # Order by quarter

display(quarter_spent)

orderquarter,total_spent
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

# Count purchases per product 
product_purchases_with_names = (sales_df.join(menu_df, 'product_id')
                                .groupBy( 'product_name')
                                .agg(count('*').alias('purchase_count'))
                                .orderBy('purchase_count', ascending=False))

display(product_purchases_with_names)

product_name,purchase_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6
Biryani,6


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import countDistinct, desc

# Calculate restaurant visit frequency per customer
customer_visits = (sales_df.filter(sales_df.Source_Order == 'Restaurant')
                   .groupBy('Customer_id')
                   .agg(countDistinct('Order_date').alias('visit_count'))
                   .orderBy(desc('visit_count')))

display(customer_visits)

Customer_id,visit_count
B,6
A,6
E,5
C,3
D,1


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Total Sales By each Country
country_sales = (sales_df.join(menu_df, 'product_id')
                 .groupBy('Location')  
                 .agg(sum('price').alias('total_sales'))
                 .orderBy('total_sales', ascending=False))

display(country_sales)

Location,total_sales
UK,7020.0
India,4860.0
USA,2460.0


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import sum, round

# Calculate total sales by order source
source_sales = (sales_df.join(menu_df, 'product_id')
                .groupBy('Source_Order') 
                .agg(round(sum('price'), 2).alias('total_sales'))
                .orderBy('total_sales', ascending=False))

display(source_sales)

Source_Order,total_sales
Swiggy,6330.0
zomato,4920.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.