## Loading data with fixing datatype

In [None]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

schema = StructType([
    StructField('product_id',IntegerType(),True),
    StructField('customer_id',StringType(),True),
    StructField('order_date',DateType(),True),
    StructField('location',StringType(),True),
    StructField('source_order',StringType(),True)
])

sales_df = spark.read.format('csv').option('inferschema','true').schema(schema).load('/FileStore/tables/sales_csv-2.txt')
sales_df.show()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

## Creating new column 'order year'

In [None]:
from pyspark.sql.functions import year,month,quarter

sales_df = sales_df.withColumn('order_year',year(sales_df.order_date))
sales_df.show()

+----------+-----------+----------+--------+------------+----------+
|product_id|customer_id|order_date|location|source_order|order_year|
+----------+-----------+----------+--------+------------+----------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|
|         2|          B|2023-01-02|   India|      Swiggy|      2023|
|         1|          B|2023-01-04|   India|  Restaurant|      2023|
|         1|          B|2023-02-11|   India|      Swiggy|      2023|
|         3|          B|2023-01-16|   India|      zomato|      2023|
|         3|          B|2022-02-01

## Also create 'Order month',order quarter'

In [None]:
sales_df = sales_df.withColumn('order_month',month(sales_df.order_date))
sales_df = sales_df.withColumn('order_quarter',quarter(sales_df.order_date))
sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

## Loading another data 'menu_csv' 

In [None]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType

schema = StructType([
    StructField('product_id',IntegerType(),True),
    StructField('product_name',StringType(),True),
    StructField('price',StringType(),True)
])

menu_df = spark.read.format('csv').option('inferschema','true').schema(schema).load('/FileStore/tables/menu_csv.txt')
menu_df.show()
display(menu_df)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


## Total amount spent by each customer

In [None]:
total_amt_spent = (sales_df.join(menu_df,'product_id').groupBy('customer_id').agg({'price':'sum'})
                   .orderBy('customer_id'))
total_amt_spent.show()
display(total_amt_spent)

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



customer_id,sum(price)
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

## Total amount spent by each food category

In [None]:
total_food_spent = (sales_df.join(menu_df,'product_id').groupBy('product_name').agg({'price':'sum'}).orderBy('product_name'))
total_food_spent.show()
display(total_food_spent)

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       PIZZA|    2100.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



product_name,sum(price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

## Monthly total sales

In [None]:
monthly_sale = (sales_df.join(menu_df,'product_id').groupBy('order_month').agg({'price':'sum'}).orderBy('sum(price)',ascending=False))
monthly_sale.show()
display(monthly_sale)

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          6|    2960.0|
|          5|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



order_month,sum(price)
1,2960.0
6,2960.0
5,2960.0
2,2730.0
3,910.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

## Yearly sales

In [None]:
yearly_sale = (sales_df.join(menu_df,'product_id').groupBy('order_year').agg({'price':'sum'}).orderBy('order_year'))
yearly_sale.show()
display(yearly_sale)

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



order_year,sum(price)
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

## Quarterly sales

In [None]:
quarterly_sale = (sales_df.join(menu_df,'product_id').groupBy('order_quarter').agg({'price':'sum'}).orderBy('order_quarter'))
quarterly_sale.show()
display(quarterly_sale)

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            2|    5920.0|
|            3|     910.0|
|            4|     910.0|
+-------------+----------+



order_quarter,sum(price)
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

## How many times each product purchased

In [None]:
from pyspark.sql.functions import count

items_most = (sales_df.join(menu_df,'product_id').groupBy('product_id','product_name').agg(count('product_id').alias('product_count'))
              .orderBy('product_count',ascending=False).drop('product_id'))
items_most.show()
display(items_most)

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6
Pasta,6


Databricks visualization. Run in Databricks to view.

## Top 5 Order

In [None]:
top_5 = (sales_df.join(menu_df,'product_id').groupBy('product_id','product_name').agg(count('product_id').alias('product_count'))
         .orderBy('product_count',ascending=False).drop('product_id')
         .limit(5)
         )
top_5.show()
display(top_5)

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
+------------+-------------+



product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6


## Most ordered item

In [None]:
most_order = (sales_df.join(menu_df,'product_id').groupBy('product_id','product_name').agg(count('product_id').alias('product_count'))
              .orderBy('product_count',ascending=False).drop('product_id').limit(1))
most_order.show()
display(most_order)

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
+------------+-------------+



product_name,product_count
sandwich,48


## How many customers visiting restaurant

In [None]:
from pyspark.sql.functions import countDistinct

cust_visit = (sales_df.filter(sales_df.source_order=='Restaurant').groupBy('customer_id').agg(countDistinct('order_date')))
cust_visit.show()
display(cust_visit)

+-----------+-----------------+
|customer_id|count(order_date)|
+-----------+-----------------+
|          E|                5|
|          B|                6|
|          D|                1|
|          C|                3|
|          A|                6|
+-----------+-----------------+



customer_id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

## Total Sales by each country

In [None]:
country_sales = (sales_df.join(menu_df,'product_id').groupBy('location').agg({'price':'sum'}))
country_sales.show()
display(country_sales)

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|     USA|    2460.0|
|      UK|    7020.0|
+--------+----------+



location,sum(price)
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.

## Total sales by each order source

In [None]:
source_sales = (sales_df.join(menu_df,'product_id').groupBy('source_order').agg({'price':'sum'}))
source_sales.show()
display(source_sales)

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4920.0|
|      Swiggy|    6330.0|
|  Restaurant|    3090.0|
+------------+----------+



source_order,sum(price)
zomato,4920.0
Swiggy,6330.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.