In [119]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import os
import re
from functools import reduce
import pandas as pd

In [22]:
# Reuse the running SparkContext / SparkSession if one exists, otherwise create them.
# `sc` is used for the RDD-based loading below, `ss` for DataFrame and SQL work.
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

# Process business

In [17]:
# Read the raw business CSV as an RDD of text lines, asking for at least 24 partitions
business = sc.textFile(
    "s3n://usfca-distributedcomputing/yelp_business.csv",
    minPartitions=24,
)

In [18]:
# The first line of the file is the CSV header: remember it and drop it from the data
header = business.first()
data_rows = business.filter(lambda line: line != header)

# Clean data before splitting: remove every comma that is followed by an odd number
# of double quotes up to the end of the line (i.e. a comma inside a quoted field),
# so that a plain split(',') later yields one item per CSV column
quoted_comma = re.compile(',(?=[^"]*"[^"]*(?:"[^"]*"[^"]*)*$)')
business = data_rows.map(lambda line: quoted_comma.sub('', line))

In [19]:
def toDoubleSafe(v):
    """Convert a CSV field to ``float`` when it is a finite number, else keep it as text.

    Args:
        v: Raw field value, normally a string obtained by splitting a CSV line.

    Returns:
        float: when ``v`` parses as a finite number (``"4.0"`` -> ``4.0``).
        str: ``str(v)`` when ``v`` cannot be parsed (including the empty string),
            or when ``v`` is a string that parses to NaN / +-infinity
            (``"nan"``, ``"inf"``, ``"Infinity"``, ...).
    """
    import math  # local import keeps this cell self-contained

    try:
        number = float(v)
    except ValueError:
        return str(v)
    # float() also accepts the words "nan", "inf" and "infinity" (any case, optional
    # sign). Keep such strings as text so a free-text field spelling one of these
    # words is not silently turned into a non-finite double.
    if isinstance(v, str) and not math.isfinite(number):
        return str(v)
    return number

In [20]:
# Split every cleaned line on commas, then convert each field to a float when possible
business = (
    business
    .map(lambda line: line.split(','))
    .map(lambda fields: [toDoubleSafe(field) for field in fields])
)
business.take(2)

[['FYWN1wneV18bWNgQjJ2GNg',
  '"""Dental by Design"""',
  '',
  '"""4855 E Warner Rd Ste B9"""',
  'Ahwatukee',
  'AZ',
  85044.0,
  33.3306902,
  -111.9785992,
  4.0,
  22.0,
  1.0,
  'Dentists;General Dentistry;Health & Medical;Oral Surgeons;Cosmetic Dentists;Orthodontists'],
 ['He-G7vWjzVUysIKrfNbPUQ',
  '"""Stephen Szabo Salon"""',
  '',
  '"""3101 Washington Rd"""',
  'McMurray',
  'PA',
  15317.0,
  40.2916853,
  -80.1048999,
  3.0,
  11.0,
  1.0,
  "Hair Stylists;Hair Salons;Men's Hair Salons;Blow Dry/Out Services;Hair Extensions;Beauty & Spas"]]

In [23]:
# Build a DataFrame from the RDD of field lists. No schema is passed, so column
# names and types are inferred by Spark.
business_df = ss.createDataFrame(business)

In [24]:
# Preview the first 3 rows of the freshly created DataFrame
business_df.show(3)

+--------------------+--------------------+---+--------------------+---------+---+-------+----------+------------+---+----+---+--------------------+
|                  _1|                  _2| _3|                  _4|       _5| _6|     _7|        _8|          _9|_10| _11|_12|                 _13|
+--------------------+--------------------+---+--------------------+---------+---+-------+----------+------------+---+----+---+--------------------+
|FYWN1wneV18bWNgQj...|"""Dental by Desi...|   |"""4855 E Warner ...|Ahwatukee| AZ|85044.0|33.3306902|-111.9785992|4.0|22.0|1.0|Dentists;General ...|
|He-G7vWjzVUysIKrf...|"""Stephen Szabo ...|   |"""3101 Washingto...| McMurray| PA|15317.0|40.2916853| -80.1048999|3.0|11.0|1.0|Hair Stylists;Hai...|
|KQPW8lFf1y5BT2Mxi...|"""Western Motor ...|   |"""6025 N 27th Av...|  Phoenix| AZ|85017.0|33.5249025|-112.1153098|1.5|18.0|1.0|Departments of Mo...|
+--------------------+--------------------+---+--------------------+---------+---+-------+----------+-----

In [32]:
# Replace the current column names with the names found in the CSV header line,
# matching them by position
old_columns = business_df.schema.names
new_columns = header.split(',')

for position, current_name in enumerate(old_columns):
    business_df = business_df.withColumnRenamed(current_name, new_columns[position])

In [33]:
# Preview the first 3 rows again to check the renamed columns
business_df.show(3)

+--------------------+--------------------+------------+--------------------+---------+-----+-----------+----------+------------+-----+------------+-------+--------------------+
|         business_id|                name|neighborhood|             address|     city|state|postal_code|  latitude|   longitude|stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+--------------------+---------+-----+-----------+----------+------------+-----+------------+-------+--------------------+
|FYWN1wneV18bWNgQj...|"""Dental by Desi...|            |"""4855 E Warner ...|Ahwatukee|   AZ|    85044.0|33.3306902|-111.9785992|  4.0|        22.0|    1.0|Dentists;General ...|
|He-G7vWjzVUysIKrf...|"""Stephen Szabo ...|            |"""3101 Washingto...| McMurray|   PA|    15317.0|40.2916853| -80.1048999|  3.0|        11.0|    1.0|Hair Stylists;Hai...|
|KQPW8lFf1y5BT2Mxi...|"""Western Motor ...|            |"""6025 N 27th Av...|  Phoenix|   AZ|    85017.0|33.52

In [34]:
# Show the column names and the types Spark inferred for them
business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- is_open: double (nullable = true)
 |-- categories: string (nullable = true)



In [41]:
# Keep only the businesses that are open and whose category list mentions 'Restaurant'
is_restaurant = business_df['categories'].contains('Restaurant')
is_open = business_df['is_open'] == 1
restaurants_df = business_df.filter(is_restaurant & is_open)
restaurants_df.show(2)

+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+----------+-----------+-----+------------+-------+--------------------+
|         business_id|                name|neighborhood|             address|          city|state|postal_code|  latitude|  longitude|stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+----------+-----------+-----+------------+-------+--------------------+
|PfOCPjBrlQAnz__NX...|"""Brick House Ta...|            |  """581 Howe Ave"""|Cuyahoga Falls|   OH|    44221.0|41.1195346|-81.4756898|  3.5|       116.0|    1.0|American (New);Ni...|
|o9eMRCWt5PkpLDE0g...|       """Messina"""|            |"""Richterstr. 11"""|     Stuttgart|   BW|    70567.0|   48.7272|    9.14795|  4.0|         5.0|    1.0| Italian;Restaurants|
+--------------------+--------------------+------------+--------------------+-------------

In [40]:
# Number of rows currently in restaurants_df
restaurants_df.count()

54630

In [43]:
# Keep only the variables of interest
kept_columns = ["business_id", "latitude", "longitude", "stars", "review_count"]
restaurants_df = restaurants_df.select(*kept_columns)
restaurants_df.show(3)

+--------------------+----------+-----------+-----+------------+
|         business_id|  latitude|  longitude|stars|review_count|
+--------------------+----------+-----------+-----+------------+
|PfOCPjBrlQAnz__NX...|41.1195346|-81.4756898|  3.5|       116.0|
|o9eMRCWt5PkpLDE0g...|   48.7272|    9.14795|  4.0|         5.0|
|fNMVV_ZX7CJSDWQGd...|35.2216474|-80.8393449|  3.5|         7.0|
+--------------------+----------+-----------+-----+------------+
only showing top 3 rows



In [54]:
# Print the number of null values in each kept column (expected: no null values)
for column_name in ("review_count", "business_id", "longitude", "latitude", "stars"):
    print(restaurants_df.filter(restaurants_df[column_name].isNull()).count())

0
0
0
0
0


# Process business attributes

In [79]:
# Load the business attributes CSV, using its first line as column names.
# inferSchema is not requested here, unlike for the check-in file.
attributes_df = ss.read.csv("s3n://usfca-distributedcomputing/yelp_business_attributes.csv", header=True)

In [80]:
# Restrict the attributes table to the flags used below, plus business_id
attribute_columns = [
    "BusinessAcceptsCreditCards",
    "BusinessParking_garage",
    "BusinessParking_street",
    "BusinessParking_lot",
    "BusinessParking_valet",
    "WheelchairAccessible",
    "BikeParking",
    "Alcohol",
    "HappyHour",
    "OutdoorSeating",
    "DogsAllowed",
    "business_id",
]
attributes_df = attributes_df.select(*attribute_columns)
attributes_df.printSchema()

root
 |-- BusinessAcceptsCreditCards: string (nullable = true)
 |-- BusinessParking_garage: string (nullable = true)
 |-- BusinessParking_street: string (nullable = true)
 |-- BusinessParking_lot: string (nullable = true)
 |-- BusinessParking_valet: string (nullable = true)
 |-- WheelchairAccessible: string (nullable = true)
 |-- BikeParking: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- HappyHour: string (nullable = true)
 |-- OutdoorSeating: string (nullable = true)
 |-- DogsAllowed: string (nullable = true)
 |-- business_id: string (nullable = true)



In [81]:
# Create a single "Parking" boolean column: true when any of the four car-parking
# flags equals 'True'. The four source columns are dropped afterwards.
parking_columns = ['BusinessParking_garage', 'BusinessParking_street',
                   'BusinessParking_lot', 'BusinessParking_valet']
has_parking = reduce(lambda left, right: left | right,
                     [attributes_df[name] == 'True' for name in parking_columns])
attributes_df = attributes_df.withColumn("Parking", has_parking)
attributes_df = attributes_df.drop(*parking_columns)

In [83]:
# Check the schema after adding "Parking" and dropping the four parking flags
attributes_df.printSchema()

root
 |-- BusinessAcceptsCreditCards: string (nullable = true)
 |-- WheelchairAccessible: string (nullable = true)
 |-- BikeParking: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- HappyHour: string (nullable = true)
 |-- OutdoorSeating: string (nullable = true)
 |-- DogsAllowed: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- Parking: boolean (nullable = true)



In [84]:
# Create boolean values for all variables: each new column is the comparison of the
# original string column with 'True'; the original string columns are then dropped.
# NOTE(review): 'Alcohol' is compared with 'True' like the other flags. Confirm the
# raw column really holds 'True'/'False'; if it is categorical, Alcohol_drinks can
# never be true.
flag_columns = [
    ('BusinessAcceptsCreditCards', "Accepts_Credit_Cards"),
    ('WheelchairAccessible', "Wheelchair_Accessible"),
    ('BikeParking', "Bike_Parking"),
    ('Alcohol', "Alcohol_drinks"),
    ('HappyHour', "Happy_Hour"),
    ('OutdoorSeating', "Outdoor_Seating"),
    ('DogsAllowed', "Dogs_Allowed"),
]

for source_name, flag_name in flag_columns:
    attributes_df = attributes_df.withColumn(flag_name,
                                             attributes_df[source_name] == 'True')

attributes_df = attributes_df.drop(*[pair[0] for pair in flag_columns])

In [85]:
# Check the schema once every attribute has been converted to a boolean column
attributes_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- Parking: boolean (nullable = true)
 |-- Accepts_Credit_Cards: boolean (nullable = true)
 |-- Wheelchair_Accessible: boolean (nullable = true)
 |-- Bike_Parking: boolean (nullable = true)
 |-- Alcohol_drinks: boolean (nullable = true)
 |-- Happy_Hour: boolean (nullable = true)
 |-- Outdoor_Seating: boolean (nullable = true)
 |-- Dogs_Allowed: boolean (nullable = true)



# Process Checkin

In [110]:
# Load the check-in CSV: the first line gives the column names and the column
# types are inferred from the data
checkin_df = ss.read.csv(
    "s3n://usfca-distributedcomputing/yelp_checkin.csv",
    header=True,
    inferSchema=True,
)
checkin_df.show(5)
checkin_df.printSchema()

+--------------------+-------+-----+--------+
|         business_id|weekday| hour|checkins|
+--------------------+-------+-----+--------+
|3Mc-LxcqeguOXOVT_...|    Tue| 0:00|      12|
|SVFx6_epO22bZTZnK...|    Wed| 0:00|       4|
|vW9aLivd4-IorAfSt...|    Tue|14:00|       1|
|tEzxhauTQddACyqdJ...|    Fri|19:00|       1|
|CEyZU32P-vtMhgqRC...|    Tue|17:00|       1|
+--------------------+-------+-----+--------+
only showing top 5 rows

root
 |-- business_id: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- checkins: integer (nullable = true)



In [111]:
# check null: print the number of null values in each check-in column
for column_name in ("business_id", "weekday", "hour", "checkins"):
    print(checkin_df.where(checkin_df[column_name].isNull()).count())

0
0
0
0


In [112]:
# Number of check-in rows per business_id, largest first; max 168 (7*24)
rows_per_business = checkin_df.groupBy("business_id").count()
id_ct = rows_per_business.sort("count", ascending=False)
id_ct.show(10)

+--------------------+-----+
|         business_id|count|
+--------------------+-----+
|DYAorbxOyubUB_wtQ...|  168|
|yfxDa8RFOvJPQh0rN...|  168|
|ccGIAOl08zHt2lcn_...|  168|
|IB8zLlGraOg9LU7qQ...|  168|
|ByFMv3p5X1aNeZhU6...|  168|
|YzuWj3u9BZRpxNmd1...|  168|
|VxCnyVYn-FFgv6F1E...|  168|
|DfgZlNgKwBvCpA_0a...|  168|
|p0iEUamJVp_QpaheE...|  168|
|IWKtGvVg4hqc9rWHj...|  168|
+--------------------+-----+
only showing top 10 rows



In [113]:
# Number of check-in rows per weekday, to check that the days are roughly
# evenly distributed
checkin_df.groupBy("weekday").count().show()

+-------+------+
|weekday| count|
+-------+------+
|    Sun|567181|
|    Mon|500373|
|    Thu|545489|
|    Sat|653551|
|    Wed|537730|
|    Tue|522104|
|    Fri|584790|
+-------+------+



In [115]:
# Persist the check-in data as table "checkin_df", stored at the given S3 path.
# mode("overwrite") makes this cell re-runnable: with the default save mode the
# write raises an error as soon as the table already exists.
checkin_df.write.mode("overwrite").option("path", "s3n://usfca-distributedcomputing/checkin_df").saveAsTable("checkin_df")

In [117]:
# keep ids with at least 7*8=56 occurrences: count the rows per business_id in the
# saved parquet data, then keep only the rows of the ids reaching that count
frequent_ids_query = "with checkin as (select * from parquet.`s3n://usfca-distributedcomputing/checkin_df`) \
                     , id_ct as (select business_id, count(*) as ct from checkin group by business_id having ct >= 56) \
                     select * from checkin where business_id in (select business_id from id_ct)"
checkin_v1 = ss.sql(frequent_ids_query)
print(checkin_v1.count())
checkin_v1.show()

2090061
+--------------------+-------+-----+--------+
|         business_id|weekday| hour|checkins|
+--------------------+-------+-----+--------+
|--9e1ONYQuAa-CB_R...|    Mon| 3:00|      46|
|--9e1ONYQuAa-CB_R...|    Wed|22:00|       2|
|--9e1ONYQuAa-CB_R...|    Fri| 0:00|      11|
|--9e1ONYQuAa-CB_R...|    Tue|14:00|       1|
|--9e1ONYQuAa-CB_R...|    Thu| 2:00|      53|
|--9e1ONYQuAa-CB_R...|    Sun| 4:00|     133|
|--9e1ONYQuAa-CB_R...|    Fri|17:00|       1|
|--9e1ONYQuAa-CB_R...|    Mon|21:00|       6|
|--9e1ONYQuAa-CB_R...|    Tue|20:00|       9|
|--9e1ONYQuAa-CB_R...|    Sat|22:00|      10|
|--9e1ONYQuAa-CB_R...|    Tue| 5:00|      17|
|--9e1ONYQuAa-CB_R...|    Sat|18:00|      12|
|--9e1ONYQuAa-CB_R...|    Fri|23:00|       3|
|--9e1ONYQuAa-CB_R...|    Wed|20:00|      10|
|--9e1ONYQuAa-CB_R...|    Sat|23:00|       6|
|--9e1ONYQuAa-CB_R...|    Sat| 0:00|      19|
|--9e1ONYQuAa-CB_R...|    Tue|19:00|      13|
|--9e1ONYQuAa-CB_R...|    Sun|13:00|       1|
|--9e1ONYQuAa-CB_R...|    

In [120]:
def is_weekend(day):
    """Label a weekday abbreviation as "Weekend" or "Weekday".

    Args:
        day: Day abbreviation such as "Mon" or "Sat" (matched exactly, case-sensitive).

    Returns:
        "Weekend" when ``day`` is "Sat" or "Sun", "Weekday" for any other value.
    """
    return "Weekend" if day in ("Sat", "Sun") else "Weekday"
    
# Wrap is_weekend as a Spark UDF so it can be applied to a DataFrame column.
# No return type is passed to udf().
check_is_weekend = udf(is_weekend)

In [121]:
# flag weekend: add a "weekend" column holding "Weekend" or "Weekday" for each row.
# The derived column is aliased directly instead of being renamed afterwards through
# its auto-generated name ("is_weekend(weekday)"): withColumnRenamed does nothing
# when that name does not match, which would silently leave no "weekend" column.
checkin_v1 = checkin_v1.select(
    "business_id", "hour", "checkins", "weekday",
    check_is_weekend("weekday").alias("weekend"),
)
checkin_v1.show(3)

+--------------------+-----+--------+-------+-------+
|         business_id| hour|checkins|weekday|weekend|
+--------------------+-----+--------+-------+-------+
|--9e1ONYQuAa-CB_R...| 3:00|      46|    Mon|Weekday|
|--9e1ONYQuAa-CB_R...|22:00|       2|    Wed|Weekday|
|--9e1ONYQuAa-CB_R...| 0:00|      11|    Fri|Weekday|
+--------------------+-----+--------+-------+-------+
only showing top 3 rows



In [122]:
# average hourly checkin numbers: one row per business_id, with the mean of
# "checkins" over its Weekday rows and over its Weekend rows as two columns.
# The pivot values are listed explicitly: the resulting columns are the same, but
# Spark no longer has to run an extra job to discover the distinct "weekend" values.
avg_hourly_checkin = (
    checkin_v1
    .groupBy("business_id")
    .pivot("weekend", ["Weekday", "Weekend"])
    .agg(avg("checkins"))
)
print(avg_hourly_checkin.count())
avg_hourly_checkin.show(5)

24450
+--------------------+------------------+------------------+
|         business_id|           Weekday|           Weekend|
+--------------------+------------------+------------------+
|6rFjfFSavabsIxEKu...|1.1428571428571428|2.6363636363636362|
|fYqr99HXATERIGXQm...| 3.511627906976744|2.9583333333333335|
|0bqV9uzFVz98Bn_RI...| 5.087719298245614| 4.208333333333333|
|V_maCS_uBRMjqa_BC...|1.9642857142857142|               1.7|
|0859wfd1BQHG46Zpw...| 8.036363636363637|              11.0|
+--------------------+------------------+------------------+
only showing top 5 rows



# Join the data

In [88]:
# Left outer join on business_id: every restaurant row is kept, with null attribute
# values when the restaurant has no match in attributes_df
joined_df = restaurants_df.join(attributes_df, on='business_id', how='left_outer')

In [92]:
# Preview the first 2 rows of the joined data
joined_df.show(2)

+--------------------+----------+-----------+-----+------------+-------+--------------------+---------------------+------------+--------------+----------+---------------+------------+
|         business_id|  latitude|  longitude|stars|review_count|Parking|Accepts_Credit_Cards|Wheelchair_Accessible|Bike_Parking|Alcohol_drinks|Happy_Hour|Outdoor_Seating|Dogs_Allowed|
+--------------------+----------+-----------+-----+------------+-------+--------------------+---------------------+------------+--------------+----------+---------------+------------+
|PfOCPjBrlQAnz__NX...|41.1195346|-81.4756898|  3.5|       116.0|  false|               false|                false|       false|         false|     false|          false|       false|
|o9eMRCWt5PkpLDE0g...|   48.7272|    9.14795|  4.0|         5.0|  false|               false|                false|       false|         false|     false|          false|       false|
+--------------------+----------+-----------+-----+------------+-------+--------

In [93]:
# Check the columns and types of the joined data
joined_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- Parking: boolean (nullable = true)
 |-- Accepts_Credit_Cards: boolean (nullable = true)
 |-- Wheelchair_Accessible: boolean (nullable = true)
 |-- Bike_Parking: boolean (nullable = true)
 |-- Alcohol_drinks: boolean (nullable = true)
 |-- Happy_Hour: boolean (nullable = true)
 |-- Outdoor_Seating: boolean (nullable = true)
 |-- Dogs_Allowed: boolean (nullable = true)



In [96]:
# Some restaurants from the business table are not in business_attributes, so the
# joined attribute columns contain null values that need to be treated.
# Count the rows per "Parking" value, most frequent first.
parking_counts = joined_df.groupBy("Parking").count()
parking_counts.sort("count", ascending=False).show()

+-------+-----+
|Parking|count|
+-------+-----+
|  false|28652|
|   true|10885|
|   null|  868|
+-------+-----+



In [108]:
# Replace null values by false (most common value).
# na.fill is called without a column subset, so it is not limited to "Parking":
# the other boolean attribute columns get their nulls replaced as well.
joined_df = joined_df.na.fill(False)

In [109]:
# Recount the rows per "Parking" value to confirm that no null group is left
parking_counts = joined_df.groupBy("Parking").count()
parking_counts.sort("count", ascending=False).show()

+-------+-----+
|Parking|count|
+-------+-----+
|  false|29520|
|   true|10885|
+-------+-----+



In [124]:
# Inner join with the check-in averages: only restaurants that have a row in
# avg_hourly_checkin are kept
joined_df = joined_df.join(avg_hourly_checkin, on='business_id', how='inner')

In [125]:
# Check the final schema, now including the Weekday / Weekend check-in averages
joined_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: double (nullable = true)
 |-- Parking: boolean (nullable = false)
 |-- Accepts_Credit_Cards: boolean (nullable = false)
 |-- Wheelchair_Accessible: boolean (nullable = false)
 |-- Bike_Parking: boolean (nullable = false)
 |-- Alcohol_drinks: boolean (nullable = false)
 |-- Happy_Hour: boolean (nullable = false)
 |-- Outdoor_Seating: boolean (nullable = false)
 |-- Dogs_Allowed: boolean (nullable = false)
 |-- Weekday: double (nullable = true)
 |-- Weekend: double (nullable = true)



# Save data

In [None]:
# Persist the final dataset as table "yelp_data", stored at the given S3 path.
# mode("overwrite") makes this cell re-runnable: with the default save mode the
# write raises an error as soon as the table already exists.
joined_df.write.mode("overwrite").option("path", "s3n://usfca-distributedcomputing/yelp_data").saveAsTable('yelp_data')

# Read data

In [None]:
# Read the saved dataset back directly from its parquet files and preview 3 rows
ss.sql("select * from parquet.`s3n://usfca-distributedcomputing/yelp_data`").show(3)