<a href="https://colab.research.google.com/github/Coolinglass/Applied-Machine-Learning-Projects/blob/master/Spark_SQL_Data_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Reuse the active SparkSession if there is one, otherwise start one with default settings
spark = SparkSession.builder.getOrCreate()

In [None]:
# Import required libraries
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType

In [None]:
# Explicit schema for meal_info.csv: all three columns are nullable strings
finalSchema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ('meal_id', 'category', 'cuisine')
])

In [None]:
# Load the meal metadata CSV with the explicit schema; the first row is the header
Meal_info = spark.read.schema(finalSchema).csv('dataset/meal_info.csv', header=True)

In [None]:
# Print column names and types to confirm the explicit schema was applied
Meal_info.printSchema()

root
 |-- meal_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cuisine: string (nullable = true)



In [None]:
# Preview the meal metadata (show() prints only the top 20 rows)
Meal_info.show()

+-------+------------+-------+
|meal_id|    category|cuisine|
+-------+------------+-------+
|   1885|   Beverages|   Thai|
|   1993|   Beverages|   Thai|
|   2539|   Beverages|   Thai|
|   1248|   Beverages| Indian|
|   2631|   Beverages| Indian|
|   1311|      Extras|   Thai|
|   1062|   Beverages|Italian|
|   1778|   Beverages|Italian|
|   1803|      Extras|   Thai|
|   1198|      Extras|   Thai|
|   2707|   Beverages|Italian|
|   1847|        Soup|   Thai|
|   1438|        Soup|   Thai|
|   2494|        Soup|   Thai|
|   2760|Other Snacks|   Thai|
|   2490|       Salad|Italian|
|   1109|   Rice Bowl| Indian|
|   2290|   Rice Bowl| Indian|
|   1525|Other Snacks|   Thai|
|   2704|Other Snacks|   Thai|
+-------+------------+-------+
only showing top 20 rows



In [None]:
# Register the dataframe as a temporary view so it can be queried by name via spark.sql
Meal_info.createOrReplaceTempView("Meal_info")

In [None]:
# List the columns exposed by the Meal_info view
spark.sql("show columns from Meal_info").show()

+--------+
|col_name|
+--------+
| meal_id|
|category|
| cuisine|
+--------+



In [None]:
# Project every column of the meal metadata through the SQL view
spark.sql(
    "select meal_id, category, cuisine "
    "from Meal_info"
)

meal_id,category,cuisine
1885,Beverages,Thai
1993,Beverages,Thai
2539,Beverages,Thai
1248,Beverages,Indian
2631,Beverages,Indian
1311,Extras,Thai
1062,Beverages,Italian
1778,Beverages,Italian
1803,Extras,Thai
1198,Extras,Thai


In [None]:
# List each meal category once
spark.sql(
    "Select DISTINCT category "
    "FROM meal_info"
)

category
Salad
Desert
Biryani
Rice Bowl
Sandwich
Pizza
Beverages
Other Snacks
Soup
Starters


# Q1. Number of distinct meal categories and cuisines

In [None]:
# Number of distinct meal categories
spark.sql(
    "Select COUNT(DISTINCT category) as distinct_category "
    "FROM meal_info"
)

distinct_category
14


In [None]:
# Number of distinct cuisines
spark.sql(
    "Select COUNT(DISTINCT cuisine) as distinct_cuisine "
    "FROM meal_info"
)

distinct_cuisine
4


In [None]:
# Explicit schema for train.csv, listed as (column name, Spark type); every column is nullable.
# NOTE(review): id is read as a double, so it displays as e.g. 1379560.0 — an integer
# type may be more appropriate; confirm the id range in the CSV before changing it.
trainschema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('id', DoubleType()),
        ('week', IntegerType()),
        ('center_id', StringType()),
        ('meal_id', StringType()),
        ('checkout_price', DoubleType()),
        ('base_price', DoubleType()),
        ('emailer_for_promotion', StringType()),
        ('homepage_featured', StringType()),
        ('num_orders', IntegerType()),
    )
])

In [None]:
# Load the weekly order history with the explicit schema; the first row is the header
train = spark.read.schema(trainschema).csv('dataset/train.csv', header=True)

In [None]:
# Print column names and types to confirm the explicit train schema was applied
train.printSchema()

root
 |-- id: double (nullable = true)
 |-- week: integer (nullable = true)
 |-- center_id: string (nullable = true)
 |-- meal_id: string (nullable = true)
 |-- checkout_price: double (nullable = true)
 |-- base_price: double (nullable = true)
 |-- emailer_for_promotion: string (nullable = true)
 |-- homepage_featured: string (nullable = true)
 |-- num_orders: integer (nullable = true)



In [None]:
# Preview the first rows of the order data
train.show()

+---------+----+---------+-------+--------------+----------+---------------------+-----------------+----------+
|       id|week|center_id|meal_id|checkout_price|base_price|emailer_for_promotion|homepage_featured|num_orders|
+---------+----+---------+-------+--------------+----------+---------------------+-----------------+----------+
|1379560.0|   1|       55|   1885|        136.83|    152.29|                    0|                0|       177|
|1466964.0|   1|       55|   1993|        136.83|    135.83|                    0|                0|       270|
|1346989.0|   1|       55|   2539|        134.86|    135.86|                    0|                0|       189|
|1338232.0|   1|       55|   2139|         339.5|    437.53|                    0|                0|        54|
|1448490.0|   1|       55|   2631|         243.5|     242.5|                    0|                0|        40|
|1270037.0|   1|       55|   1248|        251.23|    252.23|                    0|                0|    

In [None]:
# Register the order data as a temporary view so it can be queried by name via spark.sql
train.createOrReplaceTempView("train")

# 2 Center_id with the highest num_orders

In [None]:
# Total orders per center, keeping only the center with the largest total
spark.sql(
    "SELECT center_id, SUM (num_orders) "
    "FROM train "
    "GROUP BY center_id "
    "ORDER BY SUM(num_orders) desc "
    "LIMIT 1"
)

center_id,sum(num_orders)
13,1742220


In [None]:
# Keep every order row and attach the meal's category/cuisine where meal_id matches
left_outer_join_df = train.join(Meal_info, on='meal_id', how='leftouter')

In [None]:
# Orders placed at center 13, enriched with the meal's category and cuisine
spark.sql(
    "SELECT * "
    "FROM train "
    "LEFT JOIN Meal_info ON train.meal_id = Meal_info.meal_id "
    "WHERE center_id = 13"
)

id,week,center_id,meal_id,checkout_price,base_price,emailer_for_promotion,homepage_featured,num_orders,meal_id.1,category,cuisine
1171094.0,1,13,1885,135.86,122.28,0,1,2132,1885,Beverages,Thai
1068455.0,1,13,1993,134.86,122.28,0,1,2418,1993,Beverages,Thai
1105491.0,1,13,2539,133.86,133.86,0,0,474,2539,Beverages,Thai
1486384.0,1,13,2139,337.62,437.53,0,0,123,2139,Beverages,Indian
1345938.0,1,13,2631,252.23,437.47,0,0,162,2631,Beverages,Indian
1277430.0,1,13,1248,253.23,251.23,0,0,121,1248,Beverages,Indian
1005971.0,1,13,1778,184.36,182.36,0,0,513,1778,Beverages,Italian
1217794.0,1,13,1062,185.33,185.33,0,0,998,1062,Beverages,Italian
1264572.0,1,13,2707,191.09,193.09,0,0,2078,2707,Beverages,Italian
1269641.0,1,13,1207,307.49,382.24,0,0,366,1207,Beverages,Continental


# 3 Top-selling cuisine at the center_id that has the highest num_orders

In [None]:
# Top-selling cuisine at the center with the highest total num_orders.
# The busiest center is derived by a scalar subquery rather than hard-coded,
# so the answer stays correct if the data changes; it also compares center_id
# string-to-string instead of relying on an implicit cast against an integer literal.
spark.sql(
    "SELECT cuisine, SUM(num_orders) "
    "FROM train "
    "LEFT JOIN Meal_info ON train.meal_id = Meal_info.meal_id "
    "WHERE train.center_id = ("
    "SELECT t.center_id FROM train AS t "
    "GROUP BY t.center_id "
    "ORDER BY SUM(t.num_orders) DESC "
    "LIMIT 1"
    ") "
    "GROUP BY cuisine "
    "ORDER BY sum(num_orders) desc "
    "LIMIT 1"
)

cuisine,sum(num_orders)
Thai,654724


In [None]:
# Explicit schema for fulfilment_center_info.csv, listed as (column name, Spark type);
# every column is nullable and only op_area is numeric
fulfilmentcenter = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('center_id', StringType()),
        ('city_code', StringType()),
        ('region_code', StringType()),
        ('center_type', StringType()),
        ('op_area', DoubleType()),
    )
])

In [None]:
# Load the fulfilment-center metadata with the explicit schema; the first row is the header
Fulfilment_center = spark.read.schema(fulfilmentcenter).csv(
    'dataset/fulfilment_center_info.csv',
    header=True,
)

In [None]:
# Register the center metadata as a temporary view so it can be queried by name via spark.sql
Fulfilment_center.createOrReplaceTempView("Fulfilment_center")

In [None]:
# List the columns exposed by the Fulfilment_center view
spark.sql("show columns from Fulfilment_center").show()

+-----------+
|   col_name|
+-----------+
|  center_id|
|  city_code|
|region_code|
|center_type|
|    op_area|
+-----------+



In [None]:
# Look at the location, size and type attributes of each fulfilment center
spark.sql(
    "SELECT city_code, region_Code,op_area, Center_type "
    "FROM Fulfilment_Center"
)

city_code,region_Code,op_area,Center_type
679,56,3.7,TYPE_A
590,56,6.7,TYPE_B
590,56,4.0,TYPE_C
648,34,4.1,TYPE_A
632,34,3.6,TYPE_C
553,77,4.4,TYPE_A
593,77,3.9,TYPE_A
693,34,2.8,TYPE_C
526,34,4.1,TYPE_A
562,77,3.8,TYPE_B


# 4 Average op_area per center_type

In [None]:
# Mean operating area for each center type, smallest average first
spark.sql(
    "SELECT Center_type, AVG(op_area) "
    "FROM Fulfilment_Center "
    "GROUP BY Center_Type "
    "ORDER BY AVG(op_area)"
)

Center_type,avg(op_area)
TYPE_C,3.1578947368421044
TYPE_A,4.076744186046512
TYPE_B,4.7733333333333325


# 5 Center_type with the highest revenue

In [None]:
# Revenue is checkout_price * num_orders summed per center type; keep the largest
spark.sql(
    "SELECT Center_type, sum(checkout_price*num_orders) AS revenue "
    "FROM train "
    "LEFT JOIN Fulfilment_Center ON train.center_id = Fulfilment_Center.center_id "
    "GROUP BY Center_type "
    "ORDER BY revenue desc "
    "LIMIT 1"
)

Center_type,revenue
TYPE_A,7276203201.869873


# 6 Top-ordered cuisine based on number of orders

In [None]:
# Total orders per cuisine across all centers, keeping only the most-ordered cuisine
spark.sql(
    "SELECT cuisine, sum(num_orders) "
    "FROM train "
    "LEFT JOIN Meal_info ON train.meal_id = Meal_info.meal_id "
    "GROUP BY cuisine "
    "ORDER BY sum(num_orders) desc "
    "LIMIT 1"
)

cuisine,sum(num_orders)
Italian,17166334


# 7 num_orders per cuisine per week

In [None]:
# Weekly order totals for each cuisine, sorted by cuisine and then week
spark.sql(
    "SELECT cuisine,week,sum(num_orders) "
    "FROM train "
    "LEFT JOIN Meal_info ON train.meal_id = Meal_info.meal_id "
    "GROUP BY cuisine, week "
    "ORDER BY cuisine, week"
)

cuisine,week,sum(num_orders)
Continental,1,146020
Continental,2,133570
Continental,3,97977
Continental,4,118819
Continental,5,116077
Continental,6,107160
Continental,7,77051
Continental,8,43897
Continental,9,152678
Continental,10,156051


In [None]:
# Center with the largest number of discounted order rows (checkout_price below base_price).
# COUNT(<boolean expr>) counts every row where the expression is non-NULL — true or
# false — so it would just return the row count per center. Summing a CASE that yields
# 1 only for discounted rows counts the rows that actually satisfy the condition.
spark.sql(
    "SELECT center_id, "
    "SUM(CASE WHEN checkout_price < base_price THEN 1 ELSE 0 END) AS discount "
    "FROM train "
    "GROUP BY center_id "
    "ORDER BY discount desc "
    "LIMIT 1"
)

center_id,discount
13,2834
