In [1]:
# List the raw input files available under /shared/dataset.
!ls /shared/dataset

business.json				 tip.json
checkin.json				 uscitiesv1.3.csv
Dataset_Challenge_Dataset_Agreement.pdf  user.json
photos.json				 Yelp_Dataset_Challenge_Round_11.pdf
review.json


In [2]:
import pyspark
# sc = pyspark.SparkContext('local[*]')
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
# Obtain the SparkContext via getOrCreate() so that re-running this cell in a
# live kernel reuses the existing context; a bare SparkContext() raises
# ValueError when a context is already running.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# The session attaches to the context created above.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting —
# confirm whether it can be removed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


In [16]:
from pyspark.sql import functions as F

In [3]:
# Read the raw business and review records into Spark DataFrames.
BUSINESS = spark.read.json(
    "/shared/dataset/business.json"
)
REVIEW = spark.read.json(
    "/shared/dataset/review.json"
)

In [4]:
# Expose both DataFrames to Spark SQL under the names "business" and "review".
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
BUSINESS.createOrReplaceTempView("business")
REVIEW.createOrReplaceTempView("review")

In [31]:
# Mean star rating over all reviews of each business, exposed as `real_stars`.
# Columns are referenced by name: the earlier form used `review.business_id`
# and `review.stars` on the right-hand side before `review` was assigned,
# which raises NameError on a fresh kernel.
review = (
    REVIEW
    .select("business_id", "stars")
    .groupBy("business_id")
    .agg(F.avg("stars").alias("real_stars"))
)

In [37]:
# Attach each business's mean review rating (`real_stars`).
# This is an inner join, so businesses without any review row are not kept.
# The right-hand join key is dropped so `business_id` appears only once.
business = (
    BUSINESS
    .join(review, on=BUSINESS.business_id == review.business_id, how="inner")
    .drop(review.business_id)
)

In [38]:
# Inspect the schema of the joined frame (still contains nested structs).
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: boolean (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: struct (nullable = true)
 |    |    |-- casual: boolean (nullable = true)
 |    |    |-- classy: boolean (nullable = true)
 |    |    |-- divey: boolean (nullable = true)
 |    |    |-- hipster: boolean (nullable = true)
 |    |    |-- intimate: boolean (nullable = true)
 |    |    |-- romantic: boolean (nullable = true)
 |    |    |-- touristy: boolean (nullable = true)
 |    |    |-- trendy: boolean (nullable = true)
 |    |    |-- upscale: boolean (nullable = true)
 |    |-- BYOB: boolean (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: struct (nullable = true)
 |    |    |-- friday: boolean (nullable = true)
 |    |    |-- monday: boolean (nullable = true)
 |    |    |-- saturday: boolean (nullab

In [39]:
from pyspark.sql import functions as F

def flatten_df(nested_df):
    """Expand one level of struct columns of ``nested_df`` into flat columns.

    Every column whose dtype string starts with ``struct`` is replaced by one
    column per direct field, named ``<parent>_<field>``. Non-struct columns
    are kept unchanged and come first in the result, followed by the expanded
    fields. Fields that are themselves structs stay structs, so the function
    has to be applied again to expand deeper nesting.

    Args:
        nested_df: DataFrame that may contain struct-typed columns.

    Returns:
        A new DataFrame with the first struct level expanded.
    """
    plain_names = []
    struct_names = []
    for name, dtype in nested_df.dtypes:
        if dtype.startswith('struct'):
            struct_names.append(name)
        else:
            plain_names.append(name)

    expanded = []
    for parent in struct_names:
        # Selecting "<parent>.*" makes Spark enumerate the struct's fields.
        for field in nested_df.select(parent + '.*').columns:
            expanded.append(
                F.col(parent + '.' + field).alias(parent + '_' + field)
            )

    return nested_df.select(plain_names + expanded)

In [40]:
# flatten_df expands a single struct level per call. `attributes` holds
# structs nested two levels deep (e.g. attributes.Ambience.casual), so it is
# applied twice to obtain a fully flat frame.
business_ = flatten_df(flatten_df(business))

In [63]:
from pyspark.sql.functions import array_contains
# Keep only the businesses whose `categories` array contains 'Restaurants'.
business_ = business_.filter(
    array_contains('categories', 'Restaurants')
)

In [64]:
# Number of rows currently in the flattened business frame.
business_.count()

54618

In [65]:
# Inspect the flattened schema: struct fields now appear as attributes_* columns.
business_.printSchema()

root
 |-- address: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- real_stars: double (nullable = true)
 |-- attributes_AcceptsInsurance: boolean (nullable = true)
 |-- attributes_AgesAllowed: string (nullable = true)
 |-- attributes_Alcohol: string (nullable = true)
 |-- attributes_BYOB: boolean (nullable = true)
 |-- attributes_BYOBCorkage: string (nullable = true)
 |-- attributes_BikeParking: boolean (nullable = true)
 |-- attributes_BusinessAcceptsBitcoin: boolean (nullable = true)
 |

In [66]:
# Collect the flattened frame to the driver as a pandas DataFrame and write it to CSV.
# NOTE(review): `categories` is an array column and is exported without an
# explicit array-to-string conversion — confirm the CSV representation is what
# downstream readers expect.
business_.toPandas().to_csv('/shared/business_.csv')

In [67]:
# List /shared to check which exported files are present.
! ls /shared

business_clean.csv  flights_train.csv  review.csv
business_.csv	    Homework 4.ipynb   SunSpots_KNN.csv
business.csv	    input.json	       SunSpots_moving_avg.csv
dataset		    lib		       sunspots.txt
flights_test.csv    output	       yelp_dataset.tar


In [68]:
# Flattened attribute columns that are removed from the business frame in a
# later cell.
# NOTE(review): the name suggests these columns carry (almost) no data for
# restaurants — confirm against the data before relying on that.
empty_columns = [
    # Top-level attributes
    "attributes_AcceptsInsurance",
    "attributes_AgesAllowed",
    "attributes_BYOB",
    "attributes_BYOBCorkage",
    "attributes_BusinessAcceptsBitcoin",
    "attributes_ByAppointmentOnly",
    "attributes_CoatCheck",
    "attributes_Corkage",
    "attributes_DogsAllowed",
    "attributes_DriveThru",
    "attributes_GoodForDancing",
    "attributes_HappyHour",
    "attributes_Open24Hours",
    "attributes_RestaurantsCounterService",
    "attributes_Smoking",
    # attributes.BestNights.*
    "attributes_BestNights_friday",
    "attributes_BestNights_monday",
    "attributes_BestNights_saturday",
    "attributes_BestNights_sunday",
    "attributes_BestNights_thursday",
    "attributes_BestNights_tuesday",
    "attributes_BestNights_wednesday",
    # attributes.DietaryRestrictions.*
    "attributes_DietaryRestrictions_dairy",
    "attributes_DietaryRestrictions_gluten",
    "attributes_DietaryRestrictions_halal",
    "attributes_DietaryRestrictions_kosher",
    "attributes_DietaryRestrictions_soy",
    "attributes_DietaryRestrictions_vegan",
    "attributes_DietaryRestrictions_vegetarian",
    # attributes.HairSpecializesIn.*
    "attributes_HairSpecializesIn_africanamerican",
    "attributes_HairSpecializesIn_asian",
    "attributes_HairSpecializesIn_coloring",
    "attributes_HairSpecializesIn_curly",
    "attributes_HairSpecializesIn_extensions",
    "attributes_HairSpecializesIn_kids",
    "attributes_HairSpecializesIn_perms",
    "attributes_HairSpecializesIn_straightperms",
    # attributes.Music.*
    "attributes_Music_background_music",
    "attributes_Music_dj",
    "attributes_Music_jukebox",
    "attributes_Music_karaoke",
    "attributes_Music_live",
    "attributes_Music_no_music",
    "attributes_Music_video",
]

In [69]:
# Remove every column listed in `empty_columns` in one call. DataFrame.drop
# accepts several column names at once and is a no-op for names that are not
# in the schema, so this matches dropping them one at a time.
business_clean = business_.drop(*empty_columns)

In [71]:
# Collect the pruned frame to the driver as pandas and write it to CSV.
business_clean.toPandas().to_csv('/shared/business_clean.csv')

In [70]:
# Inspect the schema that remains after the `empty_columns` were dropped.
business_clean.printSchema()

root
 |-- address: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- real_stars: double (nullable = true)
 |-- attributes_Alcohol: string (nullable = true)
 |-- attributes_BikeParking: boolean (nullable = true)
 |-- attributes_BusinessAcceptsCreditCards: boolean (nullable = true)
 |-- attributes_Caters: boolean (nullable = true)
 |-- attributes_GoodForKids: boolean (nullable = true)
 |-- attributes_HasTV: boolean (nullable = true)
 |-- attributes_NoiseLevel: string (nullable = true)
 |-- at

In [150]:
# Count the distinct category labels that occur in `business_clean`.
# Note: despite its name, `categories` holds that integer count, not the labels.
categories = (
    business_clean
    .select(F.explode("categories"))
    .distinct()
    .count()
)

In [105]:
# Subset of `business_clean` whose `categories` array contains 'Chinese',
# also exported to CSV via pandas.
business_chinese = business_clean.filter(
    array_contains('categories', 'Chinese')
)
business_chinese.toPandas().to_csv(
    '/shared/business_chinese.csv'
)

In [106]:
# Subset of `business_clean` whose `categories` array contains 'Japanese',
# also exported to CSV via pandas.
business_japanese = business_clean.filter(
    array_contains('categories', 'Japanese')
)
business_japanese.toPandas().to_csv(
    '/shared/business_japanese.csv'
)

In [154]:
# One row per (category, state) pair: number of businesses (`cnt`) and the
# mean of their `real_stars` (`stars`).
# Pairs with fewer than 10 businesses are discarded, as is the 'Restaurants'
# category itself.
state_cate = (
    business_
    .select(
        F.explode("categories").alias("category"),
        "state",
        "real_stars",
    )
    .groupBy("category", "state")
    .agg(
        F.count("*").alias("cnt"),
        F.avg("real_stars").alias("stars"),
    )
    .filter("cnt >= 10")
    .filter("category != 'Restaurants'")
)

In [148]:
# Show the schema and a 10-row sample of the per-(category, state) aggregate.
# NOTE(review): the execution counts suggest this cell last ran before
# `state_cate` was last redefined — re-run it so the displayed sample matches
# the current definition.
state_cate.printSchema()
state_cate.limit(10).show()

root
 |-- category: string (nullable = true)
 |-- state: string (nullable = true)
 |-- cnt: long (nullable = false)
 |-- stars: double (nullable = true)

+--------------------+-----+-----+------------------+
|            category|state|  cnt|             stars|
+--------------------+-----+-----+------------------+
|               Irish|   ON|   51| 3.192984389454761|
|American (Traditi...|   IL|   93| 3.348595856145919|
|              Indian|  MLN|   10| 3.735530303030303|
|          Sushi Bars|   NV|  296|3.8125834959361637|
|             Italian|   NV|  483|3.4920416712470894|
|           Creperies|   NV|   25| 3.761921396785193|
|            Southern|   ON|   53|3.7080587532207407|
|           Argentine|   ON|   14|3.5758414238808536|
|        Cajun/Creole|   NV|   43|3.6520770959104234|
|         Restaurants|   ON|13501|3.3769128277346017|
+--------------------+-----+-----+------------------+



In [137]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

In [160]:
# Rank the categories inside each state by mean rating, best first.
window = Window.partitionBy('state').orderBy(col('stars').desc())

# Keep ranks 1-10 per state. rank() gives tied rows the same rank, so a state
# can contribute more than ten rows when ratings tie.
top_10_each_state = (
    state_cate
    .withColumn('rank', rank().over(window))
    .filter(col('rank') <= 10)
)

In [161]:
# Collect the per-state ranking to the driver as pandas and write it to CSV.
top_10_each_state.toPandas().to_csv('/shared/top_10_each_state.csv')

In [173]:
# Per (state, city): number of businesses in `business_chinese` (`cnt`) and
# their mean `real_stars` (`stars`). Cities with fewer than 10 such businesses
# are discarded; the rest are sorted from highest to lowest mean rating.
top_cities_chinese = (
    business_chinese
    .select('state', 'city', 'real_stars')
    .groupBy('state', 'city')
    .agg(
        F.count('*').alias('cnt'),
        F.avg('real_stars').alias('stars'),
    )
    .filter('cnt >= 10')
    .orderBy(F.col('stars').desc())
)

In [175]:
# Collect the city ranking to the driver as pandas and write it to CSV.
top_cities_chinese.toPandas().to_csv('/shared/top_cities_chinese.csv')