# Yelp Data Analysis in Spark 1

### The analysis was performed on business.json

This work has five objectives:

1. Learning to work with hierarchical JSON data in SparkSQL
2. Performing grouped aggregation on the data using SparkSQL
3. Pivoting the data. This may not be the best use case for pivoting, but in general many use cases require it
4. Filtering data based on attribute values
5. Performing geospatial analysis

A continuation of this notebook, YelpAnalysisSpark2.ipynb, joins two datasets.

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
import pandas as pd
import os
import numpy as np
from pyspark.sql import SQLContext
import json
import pyspark

#Build the path to business.json
data_path = ['../Data']
Business_filepath = os.sep.join(data_path + ['business.json'])

#Create the Spark context and the SQL context (the class is no longer shadowed by its own instance)
sc = SparkContext(appName="Yelp")
sqlContext = SQLContext(sc)


#Load Business data
Business_data = sqlContext.read.json(Business_filepath)

In [2]:
Business_data.show()

+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+--------------+--------------------+-----------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|            city|               hours|is_open|     latitude|     longitude|                name|     neighborhood|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+--------------+--------------------+-----------------+-----------+------------+-----+-----+
|     691 Richmond Rd|[null,null,null,n...|YDf95gJZaq05wvo7h...|[Shopping, Shoppi...|Richmond Heights|[10:00-21:00,10:0...|      1|   41.5417162|   -81.4931165|Richmond Town Square|                 |      44143|          17|  2.0|   OH|
|      2824 Milton Rd|[null,null,null,[...|mLwM-h2Yh
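
The nested columns in the output above (`attributes`, `categories`, `hours`) come from the hierarchical JSON. As a rough illustration of that shape, the plain-Python sketch below parses one made-up record written as a single JSON object on one line, which is the layout `read.json` expects by default. The record values, the `Monday` key, and the `get_nested` helper are assumptions for illustration; they are not taken from the Yelp data or the Spark API.

```python
import json

# One made-up business record, stored as a single JSON object on one line.
# Field names follow the columns printed above; the values are invented.
line = ('{"name": "Example Taqueria", "city": "Toronto", "stars": 4.0, '
        '"categories": ["Mexican", "Restaurants"], '
        '"attributes": {"RestaurantsTakeOut": true}, '
        '"hours": {"Monday": "10:00-21:00"}}')

record = json.loads(line)


def get_nested(obj, dotted_path):
    """Hypothetical helper: follow a dotted path such as 'attributes.RestaurantsTakeOut'."""
    for key in dotted_path.split("."):
        if not isinstance(obj, dict) or key not in obj:
            return None  # missing levels behave like a null field
        obj = obj[key]
    return obj


takeout = get_nested(record, "attributes.RestaurantsTakeOut")
first_category = record["categories"][0]
```

The dotted path mirrors how the later cells reach into the struct, for example `attributes.RestaurantsTakeOut`.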

## Finding average review count and stars by city and category

In [3]:
#Select the columns needed and explode categories into one row per category
Business = Business_data.select(pyspark.sql.functions.explode(Business_data.categories).alias("category"), 
                               Business_data.state,  Business_data.city, Business_data.stars, Business_data.review_count)
#Register as temp table
Business.registerTempTable("Business_Agg")

#Run the SQL Query
result = sqlContext.sql("SELECT Business_Agg.city, Business_Agg.category,\
AVG(Business_Agg.review_count) As AverageReview,\
AVG(Business_Agg.stars) as AverageStars FROM Business_Agg GROUP BY Business_Agg.city, Business_Agg.category")

#saving the result in a csv file
result.coalesce(1).write.format('com.databricks.spark.csv').option("header", "true").save('Question1')
#Check the result
result.show()

+----------------+--------------------+------------------+------------------+
|            city|            category|     AverageReview|      AverageStars|
+----------------+--------------------+------------------+------------------+
|Richmond Heights|            Shopping|              10.5|               3.5|
|         Madison|    Laundry Services| 6.851851851851852| 2.962962962962963|
|          Elyria|             Doctors|               3.0|               2.5|
|            Mesa|  Auto Customization|12.857142857142858|              4.25|
|         Phoenix|                Pets| 21.04076086956522| 4.073369565217392|
|         Toronto|  Financial Services|          6.296875|           2.96875|
|           Tempe|     Hotels & Travel| 29.19298245614035| 3.258771929824561|
|        Surprise|            Shopping|12.532374100719425|3.5719424460431655|
|       Henderson|               Pizza|  96.3804347826087|3.3532608695652173|
|       Etobicoke|      Sporting Goods| 6.583333333333333|      
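
To make the explode-then-aggregate step concrete, here is a minimal plain-Python sketch of the same idea on made-up records. It is not Spark code: the function name and the sample data are illustrative assumptions, and it only mirrors what `explode` followed by `GROUP BY city, category` with `AVG` computes.

```python
from collections import defaultdict


def average_by_city_and_category(businesses):
    """Return {(city, category): (average review count, average stars)}."""
    # (city, category) -> [review total, star total, row count]
    totals = defaultdict(lambda: [0.0, 0.0, 0])
    for business in businesses:
        # "Explode": one (city, category) row per entry in the categories list.
        # A missing or empty list contributes no rows.
        for category in business.get("categories") or []:
            acc = totals[(business["city"], category)]
            acc[0] += business["review_count"]
            acc[1] += business["stars"]
            acc[2] += 1
    return {key: (reviews / n, stars / n)
            for key, (reviews, stars, n) in totals.items()}


# Invented sample records, not taken from business.json.
sample = [
    {"city": "Mesa", "categories": ["Pizza", "Restaurants"], "stars": 4.0, "review_count": 10},
    {"city": "Mesa", "categories": ["Pizza"], "stars": 3.0, "review_count": 20},
    {"city": "Tempe", "categories": None, "stars": 5.0, "review_count": 7},
]
averages = average_by_city_and_category(sample)
```

Note that a business listed under several categories contributes to the average of each one, which is also true of the SQL query above.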

## Pivoting the table

In [4]:
BusinessPivot = Business_data.select(pyspark.sql.functions.explode(Business_data.categories).alias("category"),\
                                     Business_data.state,  Business_data.city, Business_data.stars, \
                                     Business_data.review_count)

BusinessPivot.registerTempTable("AggreCate")

pivot_table = BusinessPivot.groupBy("city", "State").pivot("category").avg("stars")
#pivot_table.coalesce(1).write.format('com.databricks.spark.csv').option("header", "true").save('Question2')
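
As a rough picture of what the pivot produces, the plain-Python sketch below builds the same kind of table on made-up rows: one row per (city, state), one column per distinct category, and the average stars in each cell. The function name and sample rows are illustrative assumptions, not part of the notebook's data.

```python
from collections import defaultdict


def pivot_average_stars(rows):
    """Return (columns, table): one row per (city, state), one column per category."""
    # (city, state, category) -> [star total, count]
    sums = defaultdict(lambda: [0.0, 0])
    for row in rows:
        acc = sums[(row["city"], row["state"], row["category"])]
        acc[0] += row["stars"]
        acc[1] += 1

    # Distinct categories become the column headers (sorted for a stable order).
    columns = sorted({category for (_, _, category) in sums})

    table = {}
    for (city, state, category), (total, count) in sums.items():
        # Cells with no matching business stay None, like nulls in a pivoted DataFrame.
        cells = table.setdefault((city, state), {c: None for c in columns})
        cells[category] = total / count
    return columns, table


# Invented, already-exploded rows (one category per row).
rows = [
    {"city": "Mesa", "state": "AZ", "category": "Pizza", "stars": 4.0},
    {"city": "Mesa", "state": "AZ", "category": "Pizza", "stars": 3.0},
    {"city": "Mesa", "state": "AZ", "category": "Pets", "stars": 5.0},
    {"city": "Toronto", "state": "ON", "category": "Pizza", "stars": 2.5},
]
columns, table = pivot_average_stars(rows)
```

Because every distinct category becomes a column, the result gets very wide on data with many categories, which is one reason this may not be the best use case for pivoting.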

## Filter out data based on attribute values

In [5]:
#Flatten the category array
CategoryExplo = Business_data.select(pyspark.sql.functions.explode(Business_data.categories).alias("category"),\
                                    Business_data.attributes, Business_data.stars)

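#Pull the nested RestaurantsTakeOut attribute out of the attributes struct (the Mexican/takeout filter is applied in the query below)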
CategoryAtt = CategoryExplo.select(CategoryExplo.attributes.RestaurantsTakeOut.alias("takeout"),\
                                   CategoryExplo.category, CategoryExplo.stars)

CategoryAtt.registerTempTable("CategoryAtt")

#Run the query on the table
MexicanTakeout = "SELECT category, AVG(stars) AS Stars FROM CategoryAtt WHERE category = 'Mexican' \
AND takeout = True GROUP BY category"
RatingMexicanTakeO = sqlContext.sql(MexicanTakeout)

RatingMexicanTakeO.show()

+--------+-----------------+
|category|            Stars|
+--------+-----------------+
| Mexican|3.436754507628294|
+--------+-----------------+
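
The filter logic can be checked on a small scale with a plain-Python sketch like the one below. It assumes, as the query does, that `RestaurantsTakeOut` is a boolean nested under `attributes`; the function name and sample records are invented for illustration. Businesses whose attribute is missing are excluded, which matches how `takeout = True` treats nulls in SQL.

```python
def average_stars(businesses, category, attribute):
    """Average stars of businesses in `category` whose nested `attribute` is True."""
    stars = [
        b["stars"]
        for b in businesses
        if category in (b.get("categories") or [])
        and (b.get("attributes") or {}).get(attribute) is True
    ]
    # No matching business: return None rather than dividing by zero.
    return sum(stars) / len(stars) if stars else None


# Invented sample records, not taken from business.json.
businesses = [
    {"categories": ["Mexican", "Restaurants"], "attributes": {"RestaurantsTakeOut": True}, "stars": 4.0},
    {"categories": ["Mexican"], "attributes": {"RestaurantsTakeOut": True}, "stars": 3.0},
    {"categories": ["Mexican"], "attributes": {"RestaurantsTakeOut": False}, "stars": 1.0},
    {"categories": ["Mexican"], "attributes": None, "stars": 5.0},
    {"categories": ["Pizza"], "attributes": {"RestaurantsTakeOut": True}, "stars": 2.0},
]
mexican_takeout = average_stars(businesses, "Mexican", "RestaurantsTakeOut")
```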



In [6]:
RatingMexicanTakeO.coalesce(1).write.format('com.databricks.spark.csv').option("header", "true").save('Question3')

## Geospatial Analysis
### Filtering businesses near Toronto, specifically within 15 km of the Toronto center

In [7]:
#latitude, longitude, exploded(category)
LatLong = Business_data.select(pyspark.sql.functions.explode(Business_data.categories).alias("category"), 
                              Business_data.latitude, Business_data.longitude, Business_data.stars,
                              Business_data.review_count)

#Register Temp Table
LatLong.registerTempTable("LatLong")

#Then run a SQL query to keep only the businesses within 15 km of the Toronto center
BusinessNearTorontofilt = sqlContext.sql("SELECT * FROM  LatLong WHERE \
acos(sin(0.763782941288) * sin(LatLong.latitude * 3.14159 /180) + \
cos(0.763782941288) * cos(LatLong.latitude * 3.14159 /180) * cos((LatLong.longitude * 3.14159 /180) \
- (-1.38598479111))) * 6371 <= 15")

#Then apply the aggregate function on BusinessNearTorontofilt.
BusinessNearTorontofilt.registerTempTable("BusinessNearToronto")
AggQuery = sqlContext.sql("SELECT category,AVG(BusinessNearToronto.stars) AS stars_avg,\
AVG(BusinessNearToronto.review_count) AS Review_count_avg FROM BusinessNearToronto GROUP BY category ORDER BY CATEGORY")

AggQuery.show()

+--------------------+------------------+------------------+
|            category|         stars_avg|  Review_count_avg|
+--------------------+------------------+------------------+
|         3D Printing|              3.25|               6.0|
|          Acai Bowls|               4.5|               9.0|
|         Accessories| 3.508130081300813| 7.800813008130081|
|         Accountants|             3.875| 8.333333333333334|
|      Acne Treatment|             2.875|              10.0|
|         Active Life| 3.795514511873351|12.275725593667547|
|         Acupuncture|  4.30827067669173| 8.774436090225564|
|               Adult|3.8181818181818183| 8.363636363636363|
|     Adult Education|2.6666666666666665| 4.166666666666667|
| Adult Entertainment|2.7954545454545454|10.454545454545455|
|         Advertising|               3.0| 4.666666666666667|
|              Afghan|3.6964285714285716|23.321428571428573|
|             African|3.8461538461538463|26.333333333333332|
|            Airlines|  
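
The `WHERE` clause in the query above is the spherical law of cosines: it converts each business's latitude and longitude to radians, computes the central angle to a fixed reference point, and multiplies by an Earth radius of 6371 km. The sketch below restates that in plain Python so the arithmetic can be checked outside Spark. The reference constants are copied from the query (they correspond to roughly 43.76°N, 79.41°W); the function names are illustrative, and `math.radians` is used in place of the query's `3.14159 / 180` approximation, so results can differ very slightly.

```python
import math

EARTH_RADIUS_KM = 6371.0
# Reference point in radians, copied from the SQL query above.
TORONTO_LAT_RAD = 0.763782941288
TORONTO_LON_RAD = -1.38598479111


def distance_km(lat_deg, lon_deg,
                ref_lat_rad=TORONTO_LAT_RAD, ref_lon_rad=TORONTO_LON_RAD):
    """Great-circle distance to the reference point via the spherical law of cosines."""
    lat = math.radians(lat_deg)
    lon = math.radians(lon_deg)
    cos_angle = (math.sin(ref_lat_rad) * math.sin(lat)
                 + math.cos(ref_lat_rad) * math.cos(lat) * math.cos(lon - ref_lon_rad))
    # Clamp to [-1, 1] so rounding error cannot push acos out of its domain.
    cos_angle = max(-1.0, min(1.0, cos_angle))
    return math.acos(cos_angle) * EARTH_RADIUS_KM


def within_radius(lat_deg, lon_deg, radius_km=15.0):
    """True when the point lies within radius_km of the reference point."""
    return distance_km(lat_deg, lon_deg) <= radius_km


# The first business in the data (Richmond Heights, OH) is far outside the radius.
richmond_heights_nearby = within_radius(41.5417162, -81.4931165)
```

The clamp is a small addition that the SQL version does not have; it only matters for points that coincide with the reference point up to rounding.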

In [8]:
AggQuery.count()

791

In [10]:
AggQuery.coalesce(1).write.format('com.databricks.spark.csv').option("header", "true").save('Question4')