**Filepaths:**

/FileStore/tables/menu_csv.txt

/FileStore/tables/sales_csv.txt

**Schema Structure:**

1) Sales data (one row per customer order)

- Product ID
- Customer ID
- Order Date
- Location
- Source order

2) Product data

- Product ID
- Product Name
- Price

### Sales DF

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

# Explicit schema for the sales file; every column is declared nullable.
schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('customer_id', StringType(), True),
    StructField('order_date', DateType(), True),
    StructField('location', StringType(), True),
    StructField('source_order', StringType(), True),
])

# The schema above is passed to the reader, so no 'inferschema' option is set:
# schema inference is only needed when no schema is supplied.
sales_df = (
    spark.read
    .format('csv')
    .schema(schema)
    .load('/FileStore/tables/sales_csv.txt')
)
display(sales_df)

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
from pyspark.sql.functions import month, year, quarter

# Add the year, month number and quarter of each order date as three extra
# columns, in that order.
sales_df = (
    sales_df
    .withColumn('order_year', year('order_date'))
    .withColumn('order_month', month('order_date'))
    .withColumn('order_quarter', quarter('order_date'))
)
display(sales_df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


### Menu df

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

# Explicit schema for the menu file; every column is declared nullable.
# NOTE(review): price is declared as a string, so the sums computed over it in
# later cells depend on Spark converting it to a number - consider declaring a
# numeric type here instead.
schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('price', StringType(), True),
])

# The schema above is passed to the reader, so no 'inferschema' option is set:
# schema inference is only needed when no schema is supplied.
product_df = (
    spark.read
    .format('csv')
    .schema(schema)
    .load('/FileStore/tables/menu_csv.txt')
)
display(product_df)

product_id,product_name,price
1,PIZZA,100
2,Noodles,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


Total Amount spent by each customer

In [0]:
# Attach the menu price to every sale, then add the prices up per customer.
sales_with_price = sales_df.join(product_df, on='product_id')
spend_by_customer = sales_with_price.groupBy('customer_id').agg({'price': 'sum'})

# Give the aggregate a readable name and list customers alphabetically.
total_amount_spent = (
    spend_by_customer
    .withColumnRenamed('sum(price)', 'total_spent')
    .sort('customer_id')
)
display(total_amount_spent)
    

customer_id,total_spent
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

Total amount spent on each product (menu item)

In [0]:
# Revenue per menu item: sum the price of every sale, keyed by product name.
# NOTE: this rebinds total_amount_spent, so the per-customer result assigned
# to that name earlier is no longer reachable once this cell has run.
sales_with_price = sales_df.join(product_df, on='product_id')
total_amount_spent = (
    sales_with_price
    .groupBy('product_name')
    .agg({'price': 'sum'})
    # Two columns come out of the aggregation: the key and sum(price).
    .toDF('product_name', 'total')
    .sort('product_name')
)
display(total_amount_spent)
    

product_name,total
Biryani,480.0
Dosa,1320.0
Noodles,3600.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

Total Amount of Sales for each month

In [0]:
from pyspark.sql.functions import when, col
# Month names in calendar order: position i holds the name of month i + 1.
month_names = [
    'January', 'February', 'March', 'April', 'May', 'June',
    'July', 'August', 'September', 'October', 'November', 'December',
]

# Build one WHEN branch per month number. There is no otherwise() branch, so
# an order_month that matches none of the twelve numbers gets a null name.
month_name_column = None
for month_number, month_label in enumerate(month_names, start=1):
    is_this_month = col('order_month') == month_number
    if month_name_column is None:
        month_name_column = when(is_this_month, month_label)
    else:
        month_name_column = month_name_column.when(is_this_month, month_label)

# Sum prices per month number (months from all years are combined, because
# only order_month is grouped on), sort chronologically by that number, and
# keep just the readable name and the total.
monthly_totals = (
    sales_df
    .join(product_df, on='product_id')
    .groupBy('order_month')
    .agg({'price': 'sum'})
)
total_per_month = (
    monthly_totals
    .withColumnRenamed('sum(price)', 'total')
    .withColumn('month_name', month_name_column)
    .sort('order_month')
    .select('month_name', 'total')
)

display(total_per_month)



month_name,total
January,2960.0
February,2730.0
March,910.0
May,2960.0
June,2960.0
July,910.0
November,910.0


Databricks visualization. Run in Databricks to view.

Yearly Sales

In [0]:
# Total sales value per calendar year, oldest year first.
sales_with_price = sales_df.join(product_df, on='product_id')
yearly_totals = sales_with_price.groupBy('order_year').agg({'price': 'sum'})
total_per_year = (
    yearly_totals
    .withColumnRenamed('sum(price)', 'total')
    .sort('order_year')
)
display(total_per_year)

order_year,total
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Total sales value per quarter number (quarters from all years are combined,
# because only order_quarter is grouped on), in ascending quarter order.
sales_with_price = sales_df.join(product_df, on='product_id')
quarterly_totals = sales_with_price.groupBy('order_quarter').agg({'price': 'sum'})
total_per_quarter = (
    quarterly_totals
    .withColumnRenamed('sum(price)', 'total')
    .sort('order_quarter')
)
display(total_per_quarter)

order_quarter,total
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

Number of times each product was purchased

In [0]:
# How many sales rows exist for each menu item, least ordered first.
sales_with_names = sales_df.join(product_df, on='product_id')
purchase_counts = sales_with_names.groupBy('product_name').agg({'product_id': 'count'})
order_per_product = (
    purchase_counts
    .withColumnRenamed('count(product_id)', 'total')
    .sort('total')
)
display(order_per_product)

product_name,total
Pasta,6
Biryani,6
Dosa,12
PIZZA,21
Noodles,24
sandwich,48


Databricks visualization. Run in Databricks to view.

Most ordered item

In [0]:
from pyspark.sql.functions import count
# Find the single most frequently ordered menu item.
# NOTE: this rebinds order_per_product, replacing the full per-product table
# assigned to that name by the previous cell.
order_per_product = (
    sales_df
    .join(product_df, 'product_id')
    .groupBy('product_name')
    .agg(count('product_id').alias('product_count'))
    # Highest count first. Ties are broken alphabetically by product name so
    # that the one row kept by limit(1) is the same on every run.
    .orderBy(['product_count', 'product_name'], ascending=[False, True])
    .limit(1)
)

display(order_per_product)

product_name,product_count
sandwich,48


Frequency of customer visits to the restaurant (distinct order dates per customer)

In [0]:
from pyspark.sql.functions import countDistinct

# Keep only rows whose source_order is exactly 'Restaurant', then count the
# distinct order dates per customer, listed alphabetically by customer.
restaurant_orders = sales_df.where(sales_df['source_order'] == 'Restaurant')
customer_count = (
    restaurant_orders
    .groupBy('customer_id')
    .agg(countDistinct('order_date'))
    .sort('customer_id')
)
display(customer_count)

customer_id,count(order_date)
A,6
B,6
C,3
D,1
E,5


Databricks visualization. Run in Databricks to view.

Total Sales per country

In [0]:
# Total sales value per location; the aggregate keeps its default column
# name and the rows are not sorted.
sales_with_price = sales_df.join(product_df, on='product_id')
country_sales = sales_with_price.groupBy('location').agg({'price': 'sum'})
display(country_sales)

location,sum(price)
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Total sales value per order source (the source_order column); the aggregate
# keeps its default column name and the rows are not sorted.
sales_with_price = sales_df.join(product_df, on='product_id')
country_source = sales_with_price.groupBy('source_order').agg({'price': 'sum'})
display(country_source)

source_order,sum(price)
zomato,4920.0
Swiggy,6330.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.