# Name: Chioma Tagbo

### Spark SQL Project
##### 2nd October 2022

In [65]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Reuse the running SparkContext if there is one (e.g. when this cell is re-run).
sc = SparkContext.getOrCreate()
# Obtain the session through the documented builder entry point instead of
# calling the SparkSession constructor directly; getOrCreate() returns the
# existing session when there is one and otherwise creates a new one.
spark = SparkSession.builder.getOrCreate()

In [66]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [67]:
# Explicit schema for the weekly orders CSV. All columns are nullable strings
# except the two price columns (double) and num_orders (integer).
schema_train = (
    StructType()
    .add("id", StringType(), True)
    .add("week", StringType(), True)
    .add("center_id", StringType(), True)
    .add("meal_id", StringType(), True)
    .add("checkout_price", DoubleType(), True)
    .add("base_price", DoubleType(), True)
    .add("emailer_for_promotion", StringType(), True)
    .add("homepage_featured", StringType(), True)
    .add("num_orders", IntegerType(), True)
)

In [68]:
# Explicit schema for the meal info CSV: three nullable string columns.
schema_meal = (
    StructType()
    .add("meal_id", StringType(), True)
    .add("category", StringType(), True)
    .add("cuisine", StringType(), True)
)

In [69]:
# Explicit schema for the fulfilment center CSV: nullable string columns plus
# op_area, which is read as a double.
schema_fulfilment = (
    StructType()
    .add("center_id", StringType(), True)
    .add("city_code", StringType(), True)
    .add("region_code", StringType(), True)
    .add("center_type", StringType(), True)
    .add("op_area", DoubleType(), True)
)

In [70]:
# Read the weekly orders CSV (first line is a header row) using schema_train
# rather than letting Spark infer the column types.
weekly_orders_info = (
    spark.read
    .schema(schema_train)
    .option('header', 'True')
    .format('csv')
    .load('Weekly_orders.csv')
)

In [71]:
# Print the column names, types and nullability of the orders DataFrame.
weekly_orders_info.printSchema()

root
 |-- id: string (nullable = true)
 |-- week: string (nullable = true)
 |-- center_id: string (nullable = true)
 |-- meal_id: string (nullable = true)
 |-- checkout_price: double (nullable = true)
 |-- base_price: double (nullable = true)
 |-- emailer_for_promotion: string (nullable = true)
 |-- homepage_featured: string (nullable = true)
 |-- num_orders: integer (nullable = true)



In [72]:
# Read the meal info CSV (first line is a header row) using schema_meal.
meal_data = (
    spark.read
    .schema(schema_meal)
    .option('header', 'True')
    .format('csv')
    .load('meal_info.csv')
)

In [73]:
# Print the column names, types and nullability of the meal DataFrame.
meal_data.printSchema()

root
 |-- meal_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cuisine: string (nullable = true)



In [74]:
# Read the fulfilment center CSV (first line is a header row) using
# schema_fulfilment.
fulfilment_center_data = (
    spark.read
    .schema(schema_fulfilment)
    .option('header', 'True')
    .format('csv')
    .load('fulfilment_center_info.csv')
)

In [75]:
# Print the column names, types and nullability of the fulfilment center DataFrame.
fulfilment_center_data.printSchema()

root
 |-- center_id: string (nullable = true)
 |-- city_code: string (nullable = true)
 |-- region_code: string (nullable = true)
 |-- center_type: string (nullable = true)
 |-- op_area: double (nullable = true)



## Question 1
How many distinct meal categories and cuisines are there?

In [76]:
# Make the meal data queryable from Spark SQL under the name "meal_table".
meal_data.createOrReplaceTempView("meal_table")
# Single-row result: count of distinct categories and count of distinct cuisines.
# NOTE: the "Cusine_count" alias spelling is kept as-is so the column header
# matches the recorded output.
spark.sql(
    "select count(distinct category) as Category_count, "
    "count(distinct cuisine) as Cusine_count "
    "from meal_table"
).show()

+--------------+------------+
|Category_count|Cusine_count|
+--------------+------------+
|            14|           4|
+--------------+------------+



## Question 2
 Which center_id has the highest num_orders?

In [77]:
# Make the orders data queryable from Spark SQL under the name "orders_table".
weekly_orders_info.createOrReplaceTempView("orders_table")
# Total num_orders per center, largest first; show(1) prints only the top row.
spark.sql(
    "select center_id, sum(num_orders) as Total_orders "
    "from orders_table "
    "group by center_id "
    "order by total_orders desc"
).show(1)

+---------+------------+
|center_id|Total_orders|
+---------+------------+
|       13|     1742220|
+---------+------------+
only showing top 1 row



## Question 3
What is the top selling cuisine at the center_id that had the highest num_orders?

In [78]:
# Top-selling cuisine at the center with the highest total num_orders.
# The center is resolved inside the query (same aggregation as Question 2)
# instead of hard-coding its id, so the answer stays correct if the orders
# data changes. Requires the "orders_table" and "meal_table" temp views.
spark.sql("""with center_totals as (
           select center_id, sum(num_orders) as center_orders
           from orders_table
           group by center_id
       )
       select b.cuisine, sum(a.num_orders) as Total_orders
       from orders_table a
       left join meal_table b
       on a.meal_id = b.meal_id
       where a.center_id = (select center_id
                            from center_totals
                            order by center_orders desc
                            limit 1)
       group by b.cuisine
       order by Total_orders desc""").show(1)

+-------+------------+
|cuisine|Total_orders|
+-------+------------+
|   Thai|      654724|
+-------+------------+
only showing top 1 row



## Question 4
What is the average op_area per center_type?

In [82]:
# Make the fulfilment center data queryable as "fulfilment_center_table".
fulfilment_center_data.createOrReplaceTempView("fulfilment_center_table")
# Mean op_area for each center_type.
spark.sql(
    "select center_type, avg(op_area) Average_op_area "
    "from fulfilment_center_table "
    "group by center_type"
).show()

+-----------+------------------+
|center_type|   Average_op_area|
+-----------+------------------+
|     TYPE_C|3.1578947368421044|
|     TYPE_B|4.7733333333333325|
|     TYPE_A| 4.076744186046512|
+-----------+------------------+



## Question 5
Which center_type had the highest revenue? (Revenue is total sum of
checkout_price*num_orders)

In [83]:
# Revenue per center_type, where revenue is the sum of
# checkout_price * num_orders, highest first. Center ids are trimmed on both
# sides before being compared in the join. show(1) prints only the top row.
revenue_by_type_query = """select a.center_type, sum(b.checkout_price * b.num_orders) as Revenue
from fulfilment_center_table a
left join orders_table b
on trim(a.center_id) = trim(b.center_id)
group by center_type
order by Revenue desc"""
spark.sql(revenue_by_type_query).show(1)

+-----------+-------------------+
|center_type|            Revenue|
+-----------+-------------------+
|     TYPE_A|7.276203201869771E9|
+-----------+-------------------+
only showing top 1 row



## Question 6
Which is the top ordered cuisine in terms of num_orders?

In [87]:
# Total num_orders per cuisine across all centers, largest first;
# show(1) prints only the top cuisine.
orders_by_cuisine_query = """select cuisine, sum(num_orders) No_Of_Orders
from meal_table a
left join orders_table b
on a.meal_id = b.meal_id
group by cuisine
order by No_Of_Orders desc"""
spark.sql(orders_by_cuisine_query).show(1)

+-------+------------+
|cuisine|No_Of_Orders|
+-------+------------+
|Italian|    17166334|
+-------+------------+
only showing top 1 row



## Question 7

What are the num_orders per cuisine per week?

In [88]:
# Total num_orders for every (cuisine, week) pair.
# NOTE: the query has no ORDER BY, so the rows printed by show() (first 20 by
# default) come back in no guaranteed order.
orders_by_cuisine_week_query = """select a.cuisine, sum(b.num_orders) Total_Orders, b.week
from meal_table a
left join orders_table b
on a.meal_id = b.meal_id
group by a.cuisine, b.week """
spark.sql(orders_by_cuisine_week_query).show()

+-----------+------------+----+
|    cuisine|Total_Orders|week|
+-----------+------------+----+
|Continental|       30678|  61|
|       Thai|      255566|  51|
|       Thai|      300860|  48|
|Continental|      163067|  40|
|     Indian|      175317|   1|
|    Italian|      595946|  32|
|Continental|      146020|   1|
|Continental|       82776|  59|
|    Italian|      291732|  33|
|Continental|      133504|  54|
|     Indian|      171557|  30|
|    Italian|      228836|   1|
|Continental|       74798|  24|
|     Indian|      338423|  45|
|Continental|      289173|  52|
|    Italian|      277871|  46|
|    Italian|      171341|   7|
|       Thai|      207946|  28|
|     Indian|      140576|  36|
|Continental|      156051|  10|
+-----------+------------+----+
only showing top 20 rows



## Question 8
 Which center_id gave the highest number of discounts? (Discount is considered
when checkout_price is less than base_price)

In [89]:
# Number of discounted order rows per center, where a row counts as discounted
# when checkout_price is strictly below base_price. Largest count first;
# show(1) prints only the top center.
discounts_by_center_query = """select center_id, count(*) as Discount_count
            from orders_table 
            where checkout_price < base_price
            group by center_id 
            order by Discount_count desc
          """
spark.sql(discounts_by_center_query).show(1)

+---------+--------------+
|center_id|Discount_count|
+---------+--------------+
|       13|          1509|
+---------+--------------+
only showing top 1 row



## THANK YOU!