# Yelp Data Analysis in Spark 2

This is a continuation of the previous notebook YelpAnalysisSpark1.ipnyb which can be found in this repository.
This notebook deals whith coming two files business.json and reviews.json to find average reviews from January to May in all the years for food categories near Toronto.

First we need to filter out the food businesses near Toronto with business Id and name which will be our left table

Second we need to find the reviews given from January to May.

Then join the two tables based on Business id 

Perform analysis on the joined table!


In [1]:
import findspark
findspark.init()
from pyspark import SparkContext as sc
import pandas as pd
import numpy as np
from pyspark.sql import SQLContext
import json
import pyspark
data_business = "../Data/business.json"
data_review = "../Data/review.json"

In [None]:
sc = sc(appName="Yelp")
sqlContext = SQLContext(sc)

In [2]:
#Load business data
testJsonData = sqlContext.read.json(data_business)

#Filter the attributes you want
LatLong = testJsonData.select(testJsonData.business_id, testJsonData.name, 
                              pyspark.sql.functions.explode(testJsonData.categories).alias("category"), 
                              testJsonData.latitude, testJsonData.longitude, testJsonData.stars, 
                              testJsonData.review_count)

#Register Temp Table
LatLong.registerTempTable("LatLong")

#Then run a SQL query to filter out the ones only the ones within 15km distance from Toronto Center
BusinessNearTorontofilt = sqlContext.sql("SELECT * FROM  LatLong WHERE acos(sin(0.763782941288) * sin(LatLong.latitude * 3.14 /180) + cos(0.763782941288) * cos(LatLong.latitude * 3.14 /180) * cos((LatLong.longitude * 3.14 /180) - (-1.38598479111))) * 6371 <= 15")
BusinessNearTorontofilt.registerTempTable("BusinessNearToronto")

#Then filter the business with food category
AggQuery = sqlContext.sql("SELECT business_id, name FROM BusinessNearToronto WHERE category = 'Food'")
AggQuery.registerTempTable("RightTable")

In [3]:
#We want only those business id which are in 15km radius of toronto with food category
AggQuery.show()

+--------------------+--------------------+
|         business_id|                name|
+--------------------+--------------------+
|v2WhjAB3PIBA8J8Vx...|    The Tea Emporium|
|L1XHTn7S-6har9UGA...|Papa Chang's Tea ...|
|kEq7eudoX5qdcaSLA...|Paris Bakery & Pa...|
|UZShf6G75npKCCjiH...|Fortinos Supermarket|
|8gzRJEmYJGTCdahRK...|       Tea & Kitchen|
|SRg6VOJLidr8Pw-Im...|Freshii Rutherfor...|
|zcWit_aSGR5wiunYB...|          Second Cup|
|o-0Nen1h78jiWZF_A...|  Toronto Distillery|
|N-FKBizx_wu3L8mvD...|Just Desserts Res...|
|nL_IofJ0sQVigmj3H...| Welcome Food Market|
|YHXczxm4W3BkGT-z7...|Chatime - Yonge &...|
|HnbWx7Q8P4-MXbxMp...|      Jimmy's Coffee|
|rBBYRHZk2Bs5IW-GL...|     Columbus Bakery|
|opnw3jgcHUsfHCv6Y...|  Aroma Espresso Bar|
|ExoqgA3aAz6bYidI0...|California Sandwi...|
|k1Nm4agS92jbY8ZKl...|     Pat East Market|
|PgtOActam5PdtWn45...|   Bruno's Valu-Mart|
|Cr9yQp1qrYOJTx1J-...|Seara Bakery & Pa...|
|FiRAbcgD7ja2suCv8...|     Crawford Market|
|dlLG3yMa9UtU5K0Vj...|  Purdys C

In [4]:
#Load Review Data
reviewData = sqlContext.read.json(data_review)
#Select Business, star  and month attributes
reviewDataattr = reviewData.select(reviewData.business_id, reviewData.stars, 
                                   pyspark.sql.functions.month(reviewData.date).alias("month"))
reviewDataattr.registerTempTable("reviewData")
#Filter the months before May
DateFilter = sqlContext.sql("SELECT business_id, stars FROM  reviewData WHERE month < 06")
DateFilter.registerTempTable("LeftTable")

In [5]:
JoinedTable = sqlContext.sql("SELECT LeftTable.business_id, LeftTable.stars, RightTable.name FROMLeftTable LEFT JOIN RightTable ON LeftTable.business_id = RightTable.business_id")
JoinedTable.registerTempTable("JoinedTable")

In [6]:
JoinedTable.show()

+--------------------+-----+----+
|         business_id|stars|name|
+--------------------+-----+----+
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    2|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    3|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    3|null|
|--9e1ONYQuAa-CB_R...|    1|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    2|null|
|--9e1ONYQuAa-CB_R...|    4|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    2|null|
|--9e1ONYQuAa-CB_R...|    4|null|
|--9e1ONYQuAa-CB_R...|    1|null|
|--9e1ONYQuAa-CB_R...|    5|null|
|--9e1ONYQuAa-CB_R...|    4|null|
|--9e1ONYQuAa-CB_R...|    5|null|
+--------------------+-----+----+
only showing top 20 rows



In [7]:
AggResult_Bottom_10 = sqlContext.sql("SELECT name, AVG(stars) as Stars FROM JoinedTable                                     GROUP BY name ORDER BY Stars LIMIT 10")

In [30]:
AggResult_Bottom_10.show()
AggResult_Bottom_10.coalesce(1).write.format('com.databricks.spark.csv').option("header", "true").save('Question5_bot')

+--------------------+-----+
|                name|Stars|
+--------------------+-----+
|    Sweet Fix Bakery|  1.0|
|     85 C Sugar Time|  1.0|
|        Castle Fruit|  1.0|
|  Alfredos Fine Food|  1.0|
|    Pfannkuchen Köln|  1.0|
|           Taco Bell|  1.0|
|     House of Cheung|  1.0|
|Northwest Kitchen...|  1.0|
|      G's Fine Foods|  1.0|
|               Doria|  1.0|
+--------------------+-----+



In [11]:
AggResult_Top_10 = sqlContext.sql("SELECT name, AVG(stars) as Stars FROM JoinedTable GROUP BY name                                  ORDER BY Stars DESC LIMIT 10")