# Pre-processing Yelp Data and Joining With Dinesafe Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, MapType

# spark session
spark = SparkSession \
    .builder \
    .appName("Yelp and DineSafe Toronto") \
    .getOrCreate()
sc = spark.sparkContext

# Read datasets from Yelp
#review = spark.read.json('yelp_academic_dataset_review.json')
#tip = spark.read.json('yelp_academic_dataset_tip.json')
#user = spark.read.json('yelp_academic_dataset_user.json')
business = spark.read.json('yelp_academic_dataset_business.json')

# Explore Yelp datasets (see also https://www.yelp.com/dataset/documentation/main)
#review.printSchema()
#tip.printSchema()
#user.printSchema()

In [2]:
# Explore Yelp Business Dataset
business.printSchema()
business.show(5)
business.count()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

209393

In [3]:
# Create rating column in business based on stars
from pyspark.sql import functions as f
business = business.withColumn('rating', f.when(f.col('stars')<=2.5, 'bad').when(f.col('stars')>=4, 'good').otherwise('neutral'))
business.select(['stars','rating']).show(100, truncate=False)

+-----+-------+
|stars|rating |
+-----+-------+
|3.5  |neutral|
|5.0  |good   |
|5.0  |good   |
|2.5  |bad    |
|4.5  |good   |
|4.5  |good   |
|3.5  |neutral|
|5.0  |good   |
|4.5  |good   |
|3.0  |neutral|
|5.0  |good   |
|5.0  |good   |
|3.5  |neutral|
|2.5  |bad    |
|3.5  |neutral|
|5.0  |good   |
|4.5  |good   |
|4.5  |good   |
|4.5  |good   |
|4.5  |good   |
|3.0  |neutral|
|2.5  |bad    |
|4.0  |good   |
|3.5  |neutral|
|4.5  |good   |
|4.5  |good   |
|4.0  |good   |
|2.5  |bad    |
|4.5  |good   |
|4.0  |good   |
|3.0  |neutral|
|3.0  |neutral|
|2.5  |bad    |
|4.0  |good   |
|4.5  |good   |
|5.0  |good   |
|2.5  |bad    |
|3.0  |neutral|
|3.0  |neutral|
|3.5  |neutral|
|3.5  |neutral|
|4.5  |good   |
|3.0  |neutral|
|3.0  |neutral|
|4.0  |good   |
|3.5  |neutral|
|3.5  |neutral|
|3.5  |neutral|
|5.0  |good   |
|3.5  |neutral|
|3.0  |neutral|
|5.0  |good   |
|3.5  |neutral|
|4.5  |good   |
|3.0  |neutral|
|3.5  |neutral|
|2.5  |bad    |
|4.0  |good   |
|2.5  |bad    |
|3.0  |n

In [4]:
# Create data frame with Yelp establishments in Toronto (might miss other cities close to metropolitan area of Toronto)
from pyspark.sql.functions import *

toronto_business = business.filter(business.city.contains("Toronto"))
 
# lowercase and trim address; filter out records with no address
toronto_business = toronto_business.withColumn("address", lower(toronto_business["address"]))
toronto_business = toronto_business.withColumn("address", trim(toronto_business["address"]))
toronto_business = toronto_business.filter(toronto_business.address != "")

# lowercase and trim name; filter out records with no name
toronto_business = toronto_business.withColumn("name", lower(toronto_business["name"]))
toronto_business = toronto_business.withColumn("name", trim(toronto_business["name"]))
toronto_business = toronto_business.filter(toronto_business.address != "")

In [5]:
# explore Toronto businesses
toronto_business.printSchema()
toronto_business.show(5, truncate=False)
print("There are {} businesses in Yelp - Toronto".format(toronto_business.count()))

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [6]:
# Load DineSafe Data
dineSafe = spark.read.parquet('dineSafeParsed.parquet')

In [7]:
# Yelp longitude and latitude data are more granular than dineSafe data (see https://gis.stackexchange.com/questions/8650/measuring-accuracy-of-latitude-and-longitude)
# It seems that 4 decimal digits will be enough to identify businesses
# PROBLEM: businesses with same latitude-longitude coordinate but different floors
toronto_business.select("address","longitude","latitude").sort("address").show(10)
dineSafe.select("ds_address","ds_longitude","ds_latitude").sort("ds_address").show(10)

+--------------------+--------------+-------------+
|             address|     longitude|     latitude|
+--------------------+--------------+-------------+
|           , m5v 3w4|   -79.4105157|   43.6413544|
|            , unit 2|-79.4212992489|43.6417010198|
| , various locations|-79.3836021423|43.6531554223|
|1 - 2574 st. clai...|   -79.4893371|   43.6678619|
|1 - 3096 danforth...|   -79.2880336|   43.6914855|
|   1 - 473 church st|   -79.3804174|   43.6645109|
|1 - 5235 steeles ...|    -79.552379|   43.7688249|
|1 - 70 eglinton s...|     -79.29961|    43.723833|
|     1 adelaide st e|   -79.3776407|   43.6505593|
|     1 adelaide st e|   -79.3777975|   43.6501613|
+--------------------+--------------+-------------+
only showing top 10 rows

+------------------+------------+-----------+
|        ds_address|ds_longitude|ds_latitude|
+------------------+------------+-----------+
|1 1/2 garfield ave|   -79.38331|   43.68731|
|   1 adelaide st e|   -79.37794|   43.65051|
|   1 adelaide s

In [8]:
# Join Toronto_Yelp AND DineSafe Datasets with various methods

# Join by address DFs toronto_business and dineSafe datasets
joinedByAddress = dineSafe.join(toronto_business, dineSafe.ds_address == toronto_business.address)

# Join by name DFs toronto_business and dineSafe datasets
joinedByName = dineSafe.join(toronto_business, dineSafe.ds_name == toronto_business.name)

# Join by coordinates DFs toronto_business and dineSafe datasets
joinedByCoordinates = dineSafe.join(toronto_business, [dineSafe.ds_latitude == toronto_business.latitude, dineSafe.ds_longitude == toronto_business.longitude])

# Join by name AND address DFs toronto_business and dineSafe datasets
joinedByNameAndAddress = dineSafe.join(toronto_business, [dineSafe.ds_address == toronto_business.address, dineSafe.ds_name == toronto_business.name])

In [9]:
# explore joined
print("There are {} matches in joinedByAddress dataset of dineSafe and Yelp".format(joinedByAddress.count()))
print("There are {} matches in joinedByName dataset of dineSafe and Yelp".format(joinedByName.count()))
print("There are {} matches in joinedByCoordinates dataset of dineSafe and Yelp".format(joinedByCoordinates.count()))
print("There are {} matches in joinedByNameAndAddress dataset of dineSafe and Yelp".format(joinedByNameAndAddress.count()))

There are 17280 matches in joinedByAddress dataset of dineSafe and Yelp
There are 276662 matches in joinedByName dataset of dineSafe and Yelp
There are 6 matches in joinedByCoordinates dataset of dineSafe and Yelp
There are 1401 matches in joinedByNameAndAddress dataset of dineSafe and Yelp


In [10]:
# Quality of matches: see some matches
vars_to_select = ["ds_address","address","ds_longitude","longitude","ds_latitude","latitude","ds_name","name"]

joinedByAddress.select(vars_to_select).show(15, truncate=False)
joinedByNameAndAddress.select(vars_to_select).show(15, truncate=False)
joinedByName.select(vars_to_select).show(15, truncate=False)

+--------------+--------------+------------+-----------+-----------+----------+-------------------------+----------------------------+
|ds_address    |address       |ds_longitude|longitude  |ds_latitude|latitude  |ds_name                  |name                        |
+--------------+--------------+------------+-----------+-----------+----------+-------------------------+----------------------------+
|100 bloor st e|100 bloor st e|-79.38459   |-79.3846434|43.67096   |43.6709361|the market by longo's    |the market by longo's       |
|100 bloor st e|100 bloor st e|-79.38459   |-79.3846434|43.67096   |43.6709361|the market by longo's    |the market by longo's       |
|100 bloor st e|100 bloor st e|-79.38459   |-79.3846434|43.67096   |43.6709361|the market by longo's    |the market by longo's       |
|100 bloor st e|100 bloor st e|-79.38459   |-79.3846434|43.67096   |43.6709361|the market by longo's    |the market by longo's       |
|100 bloor st e|100 bloor st e|-79.38459   |-79.3846434

In [11]:
# examine final data
joinedByNameAndAddress.printSchema()
joinedByNameAndAddress.show(10)
joinedByNameAndAddress.count()

root
 |-- ds_address: string (nullable = true)
 |-- ds_id: string (nullable = true)
 |-- ds_latitude: double (nullable = true)
 |-- ds_longitude: double (nullable = true)
 |-- ds_name: string (nullable = true)
 |-- ds_status: string (nullable = true)
 |-- ds_type: string (nullable = true)
 |-- ds_inspection: string (nullable = true)
 |-- ds_id_ins: string (nullable = true)
 |-- ds_pos_ins: string (nullable = true)
 |-- ds_date: string (nullable = true)
 |-- ds_status_ins: string (nullable = true)
 |-- ds_severity: string (nullable = true)
 |-- ds_action: string (nullable = true)
 |-- ds_subdatabase: string (nullable = true)
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-

1401

In [None]:
# Saving dataset as parquet file
joinedByNameAndAddress.write.option("compression", "gzip").parquet('data.parquet')

In [None]:
# Read in parquet file
dataRetrieved = spark.read.parquet('data.parquet')