In [14]:
import findspark
findspark.init()
from pyspark import SparkContext as sc
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import json
import pyspark

#data_path = ['../Data']
#Business_filepath = os.sep.join(['yelp_academic_dataset_business.json'])




#Load Business data
#Business_data = sqlContext.read.json(Business_filepath)

In [15]:
#Initializing Spark Context
spark = SparkSession.builder.appName("Yelp Data Analysis").getOrCreate()

In [16]:
#Reading JSON data
Business_data = spark.read.json("Data/yelp_academic_dataset_business.json")
Reviews_data = spark.read.json("Data/yelp_academic_dataset_review.json")
User_data = spark.read.json("Data/yelp_academic_dataset_user.json")

In [17]:
Business_data.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [18]:
Reviews_data.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [19]:
User_data.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [20]:
COVID_data.printSchema()

root
 |-- Call To Action enabled: string (nullable = true)
 |-- Covid Banner: string (nullable = true)
 |-- Grubhub enabled: string (nullable = true)
 |-- Request a Quote Enabled: string (nullable = true)
 |-- Temporary Closed Until: string (nullable = true)
 |-- Virtual Services Offered: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- delivery or takeout: string (nullable = true)
 |-- highlights: string (nullable = true)



In [21]:
COVID_data = COVID_data.withColumnRenamed('Call To Action enabled', 'Call_To_Action_enabled')\
                        .withColumnRenamed('Covid Banner', 'Covid_Banner')\
                        .withColumnRenamed('Grubhub enabled', 'Grubhub_enabled')\
                        .withColumnRenamed('Request a Quote Enabled', 'Request_a_Quote_Enabled')\
                        .withColumnRenamed('Temporary Closed Until', 'Temporary_Closed_Until')\
                        .withColumnRenamed('Virtual Services Offered', 'Virtual_Services_Offered')\
                        .withColumnRenamed('delivery or takeout', 'delivery_or_takeout')\

In [30]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import col

#Filter out the attributes that you need
#arr_categories = Business_data.loc[:,'categories'].values

Business = Business_data.select(explode(split(col('categories'), "\s+")).alias("category"),Business_data.state,  Business_data.city, Business_data.stars, Business_data.review_count)

#Register as temp table
Business.registerTempTable("Business_Agg")

#Run the SQL Query
result = sqlContext.sql("SELECT Business_Agg.city, Business_Agg.category,\
AVG(Business_Agg.review_count) As AverageReview,\
AVG(Business_Agg.stars) as AverageStars FROM Business_Agg GROUP BY Business_Agg.city, Business_Agg.category")

#saving the result in a csv file
result.coalesce(1).write.format('com.databricks.spark.csv').option("header", "true").save('Question1')
#Check the result
result.show()

+-------------------+-------------+------------------+------------------+
|               city|     category|     AverageReview|      AverageStars|
+-------------------+-------------+------------------+------------------+
|         Scottsdale|       Health|17.955139125496878| 4.034923339011925|
|          Beachwood|         Arts|             8.875|            4.3125|
|        Scarborough|    Shopping,| 7.475806451612903|3.0524193548387095|
|        Rocky River|     Spirits,|              31.0|             3.625|
|              Parma|(Traditional)|48.285714285714285| 3.357142857142857|
|          East York|      Donuts,|               3.0|               1.5|
|            Phoenix|    Services,|24.197680157946692|3.6712117472852914|
|             Peoria|      Fishing|              29.5|              4.75|
|          Las Vegas|         Vape|24.819875776397517| 4.062111801242236|
|           Glendale|      Massage|15.084745762711865| 4.338983050847458|
|             Peoria|         Nail| 34

In [32]:
#Flatten the category array
CategoryExplo = Business_data.select(explode(split(col('categories'), "\s+")).alias("category"),\
                                    Business_data.attributes, Business_data.stars)

#Filter out Mexican and TakeOut
CategoryAtt = CategoryExplo.select(CategoryExplo.attributes.RestaurantsTakeOut.alias("takeout"),\
                                   CategoryExplo.category, CategoryExplo.stars)

CategoryAtt.registerTempTable("CategoryAtt")

#Run the query on the table
MexicanTakeout = "SELECT category, AVG(stars) AS Stars FROM CategoryAtt WHERE category = 'Mexican' \
AND takeout = True GROUP BY category"
RatingMexicanTakeO = sqlContext.sql(MexicanTakeout)

RatingMexicanTakeO.show()

+--------+------------------+
|category|             Stars|
+--------+------------------+
| Mexican|3.3838582677165356|
+--------+------------------+



In [33]:
RatingMexicanTakeO.coalesce(1).write.format('com.databricks.spark.csv').option("header", "true").save('Question3')