The goal of the project is to build an ETL pipline that extract yelp data and weather data from S3, processes them using spark, and loads the data back in to s3 as dimensional tables. This would help yelp team find insights from the data, and the data can be used to provide business owners a resource to help them make strategic decisions.

This notebook is to explore the data and help design the data model.

In [1]:
#! /home/rick/anaconda3/envs/de_capstone/bin/python

import os
import datetime
import configparser

os.environ['SPARK_HOME'] = '/home/rick/spark-3.0.0-preview2-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON'] = '/home/rick/anaconda3/envs/udacity_de/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/rick/anaconda3/envs/udacity_de/bin/ipython'

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

import pandas as pd
import numpy as np

In [2]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [3]:
project_dir = '/mnt/data-ubuntu/Projects/data_engineering_projects/project_6_capstone'

In [4]:
# Import json as spark dataframe
df_review = spark.read.json(project_dir + '/data/yelp-dataset/yelp_academic_dataset_review.json')
df_business = spark.read.json(project_dir + '/data/yelp-dataset/yelp_academic_dataset_business.json')
df_checkin = spark.read.json(project_dir + '/data/yelp-dataset/yelp_academic_dataset_checkin.json')
df_tip = spark.read.json(project_dir + '/data/yelp-dataset/yelp_academic_dataset_tip.json')
df_user = spark.read.json(project_dir + '/data/yelp-dataset/yelp_academic_dataset_user.json')

## Check schema and update data type

In [5]:
df_review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [6]:
# Check total rows.
df_review.count()

8021122

In [7]:
# Check the data.
df_review.take(2)

[Row(business_id='-MhfebM0QIsKt87iDN-FNw', cool=0, date='2015-04-15 05:21:16', funny=0, review_id='xQY8N_XvtGbearJ5X4QryQ', stars=2.0, text='As someone who has worked with many museums, I was eager to visit this gallery on my most recent trip to Las Vegas. When I saw they would be showing infamous eggs of the House of Faberge from the Virginia Museum of Fine Arts (VMFA), I knew I had to go!\n\nTucked away near the gelateria and the garden, the Gallery is pretty much hidden from view. It\'s what real estate agents would call "cozy" or "charming" - basically any euphemism for small.\n\nThat being said, you can still see wonderful art at a gallery of any size, so why the two *s you ask? Let me tell you:\n\n* pricing for this, while relatively inexpensive for a Las Vegas attraction, is completely over the top. For the space and the amount of art you can fit in there, it is a bit much.\n* it\'s not kid friendly at all. Seriously, don\'t bring them.\n* the security is not trained properly fo

In [8]:
df_review.select('review_id') \
    .groupBy('review_id') \
    .count() \
    .sort(F.desc('count')) \
    .show()
# There is no duplicates in review_id

+--------------------+-----+
|           review_id|count|
+--------------------+-----+
|jdeXUl1zbo8Vsf0TE...|    1|
|1xk6Zpi5RIHhww0Oi...|    1|
|rbewsDf1t_kp_aybe...|    1|
|xFp1PMJRPvJfG4Kuz...|    1|
|s4OE2CR1ZBUOp4zcV...|    1|
|FGfB9royRBNIJGn7p...|    1|
|bNaTfd98aSrCHnZkI...|    1|
|ja65_A4GG_r6h5SFf...|    1|
|GR0B51lu8ZJYvrhys...|    1|
|M8G8bhMTJZCzeu58Z...|    1|
|u7DqzfN7AFaJr3WaI...|    1|
|bEF1LA2ljJYJ2B0eO...|    1|
|Rto7Hf2Yt6u3K4BAC...|    1|
|vxutX6TeYTYPVdhjm...|    1|
|yy2kR5UttSCG-3L6w...|    1|
|YoIMkh_TcMFnmL6tw...|    1|
|0RvujvqEgdEimjVjv...|    1|
|huFwb7WqheWETocTR...|    1|
|sbazF6-M3hSc1XnUG...|    1|
|3W6X1kzjEFmgVi0PI...|    1|
+--------------------+-----+
only showing top 20 rows



In [9]:
df_review.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|-MhfebM0QIsKt87iD...|   0|2015-04-15 05:21:16|    0|xQY8N_XvtGbearJ5X...|  2.0|As someone who ha...|     5|OwjRMXRC0KyPrIlcj...|
|lbrU8StCq3yDfr-QM...|   0|2013-12-07 03:16:52|    1|UmFMZ8PyXZTY2Qcwz...|  1.0|I am actually hor...|     1|nIJD_7ZXHq-FX8byP...|
|HQl28KMwrEKHqhFrr...|   0|2015-12-05 03:18:11|    0|LG2ZaYiOgpr2DK_90...|  5.0|I love Deagan's. ...|     1|V34qejxNsCbcgD8C0...|
|5JxlZaqCnk1MnbgRi...|   0|2011-05-27 05:30:52|    0|i6g_oA9Yf9Y31qt0w...|  1.0|Dismal, lukewarm,...|     0|ofKDkJKXSKZXu5xJN...|
|IS4cv902ykd8wj1TR...|   0|2017-01-14 21:56:57|    0|6TdNDKywdbjoTkize...|  4.0|Oh happy d

In [10]:
# Check missing values
df_review \
    .select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_review.columns)) \
    .show()
# There is no missing values. It's a great news.

+-----------+----+----+-----+---------+-----+----+------+-------+
|business_id|cool|date|funny|review_id|stars|text|useful|user_id|
+-----------+----+----+-----+---------+-----+----+------+-------+
|          0|   0|   0|    0|        0|    0|   0|     0|      0|
+-----------+----+----+-----+---------+-----+----+------+-------+



In [11]:
# Change the date column to timestamp
df_review = df_review \
    .withColumn('time_stamp',
                F.unix_timestamp('date', 'yyyy-MM-dd HH:mm:ss') \
                    .cast(T.TimestampType())) \
    .drop('date')

In [12]:
df_review.take(1)

[Row(business_id='-MhfebM0QIsKt87iDN-FNw', cool=0, funny=0, review_id='xQY8N_XvtGbearJ5X4QryQ', stars=2.0, text='As someone who has worked with many museums, I was eager to visit this gallery on my most recent trip to Las Vegas. When I saw they would be showing infamous eggs of the House of Faberge from the Virginia Museum of Fine Arts (VMFA), I knew I had to go!\n\nTucked away near the gelateria and the garden, the Gallery is pretty much hidden from view. It\'s what real estate agents would call "cozy" or "charming" - basically any euphemism for small.\n\nThat being said, you can still see wonderful art at a gallery of any size, so why the two *s you ask? Let me tell you:\n\n* pricing for this, while relatively inexpensive for a Las Vegas attraction, is completely over the top. For the space and the amount of art you can fit in there, it is a bit much.\n* it\'s not kid friendly at all. Seriously, don\'t bring them.\n* the security is not trained properly for the show. When the curatin

The next steps for the review:
    1. Feature engineering based on the text.
    2. Process the reveiw text for word cloud.(eg. Split the words)

### Data Cleansing for business

In [13]:
def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if (c[1][:6] == 'struct')]

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

In [14]:
# flatten the df_business
df_business = flatten_df(df_business)
df_business.printSchema()

root
 |-- address: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- attributes_AcceptsInsurance: string (nullable = true)
 |-- attributes_AgesAllowed: string (nullable = true)
 |-- attributes_Alcohol: string (nullable = true)
 |-- attributes_Ambience: string (nullable = true)
 |-- attributes_BYOB: string (nullable = true)
 |-- attributes_BYOBCorkage: string (nullable = true)
 |-- attributes_BestNights: string (nullable = true)
 |-- attributes_BikeParking: string (nullable = true)
 |-- attributes_BusinessAcceptsBitcoin: string (nullable = true)
 |-- attributes_BusinessAcceptsCred

In [15]:
# The attributes_BusinessParking still contains a bunch of information
json_schema = T.StructType([T.StructField("garage", T.StringType()), 
                            T.StructField("street", T.StringType()),
                            T.StructField("validated", T.StringType()),
                            T.StructField("lot", T.StringType()),
                            T.StructField("valet", T.StringType())
                           ])

In [16]:
# Convert the attributes_BusinessParking to json format and update the schema
df_business = df_business \
    .withColumn('attributes_BusinessParking', F.regexp_replace('attributes_BusinessParking', '\'', '\"')) \
    .withColumn('attributes_BusinessParking', F.regexp_replace('attributes_BusinessParking', 'False', '"False"')) \
    .withColumn('attributes_BusinessParking', F.regexp_replace('attributes_BusinessParking', 'True', '"True"')) \
    .withColumn('attributes_BusinessParking', F.regexp_replace('attributes_BusinessParking', ' ', ''))

In [17]:
df_business = df_business.withColumn('attributes_BusinessParking', F.from_json('attributes_BusinessParking', json_schema))
df_business = flatten_df(df_business)
df_business.printSchema()

root
 |-- address: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- attributes_AcceptsInsurance: string (nullable = true)
 |-- attributes_AgesAllowed: string (nullable = true)
 |-- attributes_Alcohol: string (nullable = true)
 |-- attributes_Ambience: string (nullable = true)
 |-- attributes_BYOB: string (nullable = true)
 |-- attributes_BYOBCorkage: string (nullable = true)
 |-- attributes_BestNights: string (nullable = true)
 |-- attributes_BikeParking: string (nullable = true)
 |-- attributes_BusinessAcceptsBitcoin: string (nullable = true)
 |-- attributes_BusinessAcceptsCred

In [18]:
df_business.take(1)

[Row(address='10913 Bailey Rd', business_id='f9NumwFMBDn751xgFiRbNA', categories='Active Life, Gun/Rifle Ranges, Guns & Ammo, Shopping', city='Cornelius', is_open=1, latitude=35.4627242, longitude=-80.8526119, name='The Range At Lake Norman', postal_code='28031', review_count=36, stars=3.5, state='NC', attributes_AcceptsInsurance=None, attributes_AgesAllowed=None, attributes_Alcohol=None, attributes_Ambience=None, attributes_BYOB=None, attributes_BYOBCorkage=None, attributes_BestNights=None, attributes_BikeParking='True', attributes_BusinessAcceptsBitcoin=None, attributes_BusinessAcceptsCreditCards='True', attributes_ByAppointmentOnly='False', attributes_Caters=None, attributes_CoatCheck=None, attributes_Corkage=None, attributes_DietaryRestrictions=None, attributes_DogsAllowed=None, attributes_DriveThru=None, attributes_GoodForDancing=None, attributes_GoodForKids='False', attributes_GoodForMeal=None, attributes_HairSpecializesIn=None, attributes_HappyHour=None, attributes_HasTV=None, a

In [19]:
def convert_bool(df):
    attribs = [c[0] for c in df.dtypes if ('attributes' in c[0])]
    for a in attribs:
        df = df.withColumn(a,
                           df[a].cast(T.BooleanType()))
    return df

In [20]:
# Update columns with correct datatype
# Convert is_open to boolean
df_business = df_business.withColumn('is_open',
                                     df_business['is_open'] \
                                     .cast(T.BooleanType()))
df_business = convert_bool(df_business)

In [21]:
df_business.take(1)

[Row(address='10913 Bailey Rd', business_id='f9NumwFMBDn751xgFiRbNA', categories='Active Life, Gun/Rifle Ranges, Guns & Ammo, Shopping', city='Cornelius', is_open=True, latitude=35.4627242, longitude=-80.8526119, name='The Range At Lake Norman', postal_code='28031', review_count=36, stars=3.5, state='NC', attributes_AcceptsInsurance=None, attributes_AgesAllowed=None, attributes_Alcohol=None, attributes_Ambience=None, attributes_BYOB=None, attributes_BYOBCorkage=None, attributes_BestNights=None, attributes_BikeParking=True, attributes_BusinessAcceptsBitcoin=None, attributes_BusinessAcceptsCreditCards=True, attributes_ByAppointmentOnly=False, attributes_Caters=None, attributes_CoatCheck=None, attributes_Corkage=None, attributes_DietaryRestrictions=None, attributes_DogsAllowed=None, attributes_DriveThru=None, attributes_GoodForDancing=None, attributes_GoodForKids=False, attributes_GoodForMeal=None, attributes_HairSpecializesIn=None, attributes_HappyHour=None, attributes_HasTV=None, attrib

In [22]:
# Since it's just a subset of yelp's data, 
# let's check out which state has the most data 
# and we will focus on that state in order to make a meaningful data model
df_business \
    .groupBy('state') \
    .count() \
    .sort(F.desc('count')) \
    .show()

# Since AZ has the most data, we will include all data in AZ.

+-----+-----+
|state|count|
+-----+-----+
|   AZ|60803|
|   NV|39084|
|   ON|36627|
|   OH|16392|
|   NC|16218|
|   PA|12376|
|   QC|10233|
|   AB| 8682|
|   WI| 5525|
|   IL| 2034|
|   SC| 1328|
|   CA|   23|
|   NY|   22|
|   TX|    6|
|   WA|    5|
|   AL|    3|
|   GA|    3|
|   FL|    3|
|   BC|    2|
|   MI|    2|
+-----+-----+
only showing top 20 rows



In [23]:
# Check total rows.
df_business.count()

209393

In [24]:
df_business = df_business.filter(F.col('state') == 'AZ')

In [25]:
# We will do feature engineering for hours columns later, first lets check if there are any weird formats.
df_business.select('hours_Monday') \
    .dropDuplicates() \
    .withColumn('length', F.length('hours_Monday')) \
    .sort(F.desc('length')) \
    .show()

+------------+------+
|hours_Monday|length|
+------------+------+
| 10:30-19:30|    11|
| 14:30-19:30|    11|
| 17:30-22:30|    11|
| 15:30-21:30|    11|
| 10:45-21:15|    11|
| 16:30-19:45|    11|
| 10:30-22:15|    11|
| 19:30-14:30|    11|
| 16:15-19:15|    11|
| 13:30-18:30|    11|
| 11:30-18:30|    11|
| 17:30-21:30|    11|
| 14:30-18:30|    11|
| 11:15-21:45|    11|
| 12:45-21:45|    11|
| 14:30-20:30|    11|
| 12:30-21:30|    11|
| 16:45-20:30|    11|
| 10:30-18:30|    11|
| 10:30-15:30|    11|
+------------+------+
only showing top 20 rows



In [26]:
# Check missing values
df_business \
    .select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_business.columns)) \
    .take(1)

[Row(address=0, business_id=0, categories=161, city=0, is_open=0, latitude=0, longitude=0, name=0, postal_code=0, review_count=0, stars=0, state=0, attributes_AcceptsInsurance=56813, attributes_AgesAllowed=60803, attributes_Alcohol=60803, attributes_Ambience=60803, attributes_BYOB=60617, attributes_BYOBCorkage=60803, attributes_BestNights=60803, attributes_BikeParking=38262, attributes_BusinessAcceptsBitcoin=53907, attributes_BusinessAcceptsCreditCards=12931, attributes_ByAppointmentOnly=37137, attributes_Caters=50853, attributes_CoatCheck=59604, attributes_Corkage=60430, attributes_DietaryRestrictions=60803, attributes_DogsAllowed=55939, attributes_DriveThru=59482, attributes_GoodForDancing=59655, attributes_GoodForKids=45563, attributes_GoodForMeal=60803, attributes_HairSpecializesIn=60803, attributes_HappyHour=56447, attributes_HasTV=50203, attributes_Music=60803, attributes_NoiseLevel=60803, attributes_Open24Hours=60799, attributes_OutdoorSeating=48764, attributes_RestaurantsAttire

In [27]:
# Since there are too many missing values in attributes columns we will drop all these columns
df_business = df_business.drop(*(c[0] for c in df_business.dtypes if ('attributes' in c[0])))

We need to pay attentation to categories, when we create my data model.

The column categories contains some subcategories. We will need to get a full categories list from yelp API and reorganize the categories. 

The next steps:
    1. Feature engineering for hours. Open time, close time, open duration.
    2. Get full category list, look up the main category for the business.
    3. Create new features for number of check-in, number of reviews, number of tips.

### Data cleansing for df_checkin

In [28]:
df_checkin.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [29]:
df_checkin.take(1)

[Row(business_id='--1UhMGODdWsrMastO9DZw', date='2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02, 2019-03-19 22:04:48')]

In [30]:
# We will create a column to count the number of check-in for each business id
df_checkin = df_checkin \
    .withColumn('splitted', F.split('date', ', ')) \
    .withColumn('num_checkin', F.size('splitted'))

In [31]:
df_checkin.take(1)

[Row(business_id='--1UhMGODdWsrMastO9DZw', date='2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02, 2019-03-19 22:04:48', splitted=['2016-04-26 19:49:16', '2016-08-30 18:36:57', '2016-10-15 02:45:18', '2016-11-18 01:54:50', '2017-04-20 18:39:06', '2017-05-03 17:58:02', '2019-03-19 22:04:48'], num_checkin=7)]

We will not go any further with the checkin at this stage.

###  Data cleansing for df_tip

In [32]:
df_tip.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



In [33]:
# We will not use this table for our project.
df_tip.take(2)

[Row(business_id='UYX5zL_Xj9WEc_Wp-FrqHw', compliment_count=0, date='2013-11-26 18:20:08', text='Here for a quick mtg', user_id='hf27xTME3EiCp6NL6VtWZQ'),
 Row(business_id='Ch3HkwQYv1YKw_FO06vBWA', compliment_count=0, date='2014-06-15 22:26:45', text='Cucumber strawberry refresher', user_id='uEvusDwoSymbJJ0auR3muQ')]

### Data cleansing for df_user

In [34]:
df_user.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [35]:
df_user.take(1)

[Row(average_stars=3.57, compliment_cool=22, compliment_cute=0, compliment_funny=22, compliment_hot=3, compliment_list=1, compliment_more=2, compliment_note=11, compliment_photos=0, compliment_plain=15, compliment_profile=1, compliment_writer=10, cool=227, elite='', fans=14, friends='oeMvJh94PiGQnx_6GlndPQ, wm1z1PaJKvHgSDRKfwhfDg, IkRib6Xs91PPW7pon7VVig, A8Aq8f0-XvLBcyMk2GJdJQ, eEZM1kogR7eL4GOBZyPvBA, e1o1LN7ez5ckCpQeAab4iw, _HrJVzFaRFUhPva8cwBjpQ, pZeGZGzX-ROT_D5lam5uNg, 0S6EI51ej5J7dgYz3-O0lA, woDt8raW-AorxQM_tIE2eA, hWUnSE5gKXNe7bDc8uAG9A, c_3LDSO2RHwZ94_Q6j_O7w, -uv1wDiaplY6eXXS0VwQiA, QFjqxXn3acDC7hckFGUKMg, ErOqapICmHPTN8YobZIcfQ, mJLRvqLOKhqEdkgt9iEaCQ, VKX7jlScJSA-ja5hYRw12Q, ijIC9w5PRcj3dWVlanjZeg, CIZGlEw-Bp0rmkP8M6yQ9Q, OC6fT5WZ8EU7tEVJ3bzPBQ, UZSDGTDpycDzrlfUlyw2dQ, deL6e_z9xqZTIODKqnvRXQ, 5mG2ENw2PylIWElqHSMGqg, Uh5Kug2fvDd51RYmsNZkGg, 4dI4uoShugD9z84fYupelQ, EQpFHqGT9Tk6YSwORTtwpg, o4EGL2-ICGmRJzJ3GxB-vw, s8gK7sdVzJcYKcPv2dkZXw, vOYVZgb_GVe-kdtjQwSUHw, wBbjgHsrKr7BsPBrQwJ

In [36]:
# Check total rows.
df_user.count()

1968703

In [37]:
# Column elit. array of integers, the years the user was elite
# Create a column number of elite, and number of friends
def count_num(astr):
    return len(list(filter(None, astr.split(','))))
colSize = F.udf(lambda x: count_num(x), T.IntegerType())

# Change type for yelping_since
df_user = df_user \
    .withColumn('yelping_since',
                F.unix_timestamp('yelping_since', 'yyyy-MM-dd HH:mm:ss') \
                    .cast(T.TimestampType())) \
    .withColumn('year', F.year('yelping_since').alias('year')) \
    .withColumn('month', F.month('yelping_since').alias('month')) \
    .withColumn('num_elite', colSize('elite')) \
    .withColumn('num_friends', colSize('friends'))

In [38]:
df_user.select('elite', 'num_elite', 'num_friends').show(5)

+--------------------+---------+-----------+
|               elite|num_elite|num_friends|
+--------------------+---------+-----------+
|                    |        0|         45|
|2008,2009,2010,20...|        6|        213|
|                2010|        1|         35|
|                2009|        1|        173|
|2009,2010,2011,20...|        9|        895|
+--------------------+---------+-----------+
only showing top 5 rows



In [39]:
# We will not include friends in our model, because we will not be using it.
df_user = df_user.select('user_id', 
                         'name', 
                         'average_stars', 
                         'compliment_cool',
                         'compliment_cute',
                         'compliment_funny',
                         'compliment_hot',
                         'compliment_list',
                         'compliment_more',
                         'compliment_note',
                         'compliment_photos', 
                         'compliment_plain',
                         'compliment_profile',
                         'compliment_writer',
                         'cool',
                         'fans',
                         'funny',
                         'review_count',
                         'useful',
                         'num_elite',
                         'num_friends',
                         #'friends',
                         'yelping_since')

In [40]:
# Check missing values
df_user \
    .select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_user.columns)) \
    .take(1)
#There are no missing values in df_user

[Row(user_id=0, name=0, average_stars=0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, fans=0, funny=0, review_count=0, useful=0, num_elite=0, num_friends=0, yelping_since=0)]

## More data source

### Yelp category from yelp API

We will get a full list of yelp's category for main category and the level 1 subcategory.

In [41]:
df_category = spark.read.csv(project_dir + '/data/yelp-dataset/yelp_category.csv', header = True)

In [42]:
df_category.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategories: string (nullable = true)



In [43]:
# change data type and rename index
df_category = df_category \
    .withColumn('index', F.col('_c0').cast('integer')) \
    .drop('_c0')

In [44]:
# Create table for category level 
w = Window.partitionBy('subcategories').orderBy('category')
df_cat=df_category.select('subcategories', 'category') \
    .dropna(how = 'any') \
    .withColumn('rn', F.row_number().over(w)) \
    .filter(F.col('rn')==1) \
    .drop('rn')
df_subcat = df_category.select(df_category['subcategories'].alias('category')) \
    .dropDuplicates() \
    .withColumn('level', F.lit(2)) \
    .select('category', 'level')

# Create category table by business_id
df_business_cat = df_business \
    .select('business_id',F.col('categories').alias('category'), 'state') \
    .withColumn('category', F.split('category', ', ')) \
    .withColumn('category', F.explode('category'))

# Filter out all rows except ones with level 2 (subcategories).
df_business_cat = df_business_cat \
    .join(df_subcat, on='category', how='left_outer') \
    .filter(F.col('level'). isNotNull())

# Look up the first main category based on each subcategory
df_business_cat = df_business_cat.select('business_id', 
                                         F.col('category').alias('subcategories'),
                                         'state') \
    .join(df_cat, on= 'subcategories', how='left_outer') \
    .select('business_id', 'category', 'subcategories', 'state')


# Create join table for table business and table category.
df_bussiness_cat_join = df_business_cat \
    .join(df_category, on =['category', 'subcategories'], how = 'left_outer') \
    .select('business_id', 'index', 'state') \
    .withColumn('state', F.when(F.col('state') == '', None).otherwise(F.col('state'))) \
    .withColumn('state', F.when(F.col('state').isNull(), 'OTHER').otherwise(F.col('state')))

In [45]:
df_bussiness_cat_join.take(10)

[Row(business_id='Yzvjg0SayhoZgCljUJRF9Q', index=15, state='AZ'),
 Row(business_id='51M2Kk903DFYI6gnB5I6SQ', index=86, state='AZ'),
 Row(business_id='51M2Kk903DFYI6gnB5I6SQ', index=340, state='AZ'),
 Row(business_id='51M2Kk903DFYI6gnB5I6SQ', index=460, state='AZ'),
 Row(business_id='51M2Kk903DFYI6gnB5I6SQ', index=258, state='AZ'),
 Row(business_id='cKyLV5oWZJ2NudWgqs8VZw', index=78, state='AZ'),
 Row(business_id='cKyLV5oWZJ2NudWgqs8VZw', index=664, state='AZ'),
 Row(business_id='cKyLV5oWZJ2NudWgqs8VZw', index=955, state='AZ'),
 Row(business_id='ScYkbYNkDgCneBrD9vqhCQ', index=78, state='AZ'),
 Row(business_id='ScYkbYNkDgCneBrD9vqhCQ', index=664, state='AZ')]

Now we can map each business with it's category and subcategory, so we can compare it with it's competitors or with the category average.

There are businesses with multiple categories. Need to pay attention to this when we analyze the data.

### geo_location data and weather data

The idea here is that we can use the histical weather data to find out if there is a relationship between reviews and weather.

In [46]:
#Geo_location
df_geo = spark.read.csv(project_dir + '/data/yelp-dataset/geo_location.csv', header = True)

In [47]:
df_geo.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- latit_s: string (nullable = true)
 |-- longi_s: string (nullable = true)
 |-- station: string (nullable = true)



In [48]:
df_geo.show(100)

+---+-------+-------+-------+
|_c0|latit_s|longi_s|station|
+---+-------+-------+-------+
|  0|   33.6| -111.9|   KPHX|
|  1|   33.4| -111.7|   KIWA|
|  2|   33.4| -111.8|   KIWA|
|  3|   33.3| -112.0|   KPHX|
|  4|   33.4| -111.9|   KPHX|
|  5|   33.3| -111.7|   KIWA|
|  6|   33.5| -112.2|   KPHX|
|  7|   33.6| -112.0|   KPHX|
|  8|   33.6| -112.4|   KPHX|
|  9|   33.5| -112.1|   KPHX|
| 10|   33.4| -112.0|   KPHX|
| 11|   33.6| -112.2|   KPHX|
| 12|   33.8| -112.1|   KPHX|
| 13|   33.6| -112.1|   KPHX|
| 14|   33.5| -111.9|   KPHX|
| 15|   33.3| -111.9|   KPHX|
| 16|   33.5| -112.4|   KPHX|
| 17|   33.5| -112.0|   KPHX|
| 18|   33.3| -111.8|   KIWA|
| 19|   33.2| -111.8|   KIWA|
| 20|   33.7| -111.9|   KPHX|
| 21|   33.4| -112.1|   KPHX|
| 22|   33.6| -111.7|   KIWA|
| 23|   33.7| -112.2|   KPHX|
| 24|   33.7| -112.1|   KPHX|
| 25|   33.7| -112.0|   KPHX|
| 26|   33.4| -111.6|   KIWA|
| 27|   33.5| -112.3|   KPHX|
| 28|   33.5| -111.7|   KIWA|
| 29|   33.6| -111.8|   KPHX|
| 30|   33

In [49]:
# Change data type
df_geo = df_geo \
    .withColumn('latit_s', F.col('latit_s').cast('double')) \
    .withColumn('longi_s', F.col('longi_s').cast('double'))

In [50]:
df_weather = spark.read.csv(project_dir + '/data/weather-data/*.csv', header = True)

In [51]:
df_weather.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- station: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- dew_point_f: string (nullable = true)
 |-- humidity%: string (nullable = true)
 |-- pressure: string (nullable = true)
 |-- precip_hrly: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- uv_level: string (nullable = true)
 |-- feels_like: string (nullable = true)



In [52]:
# Count number of rows
df_weather.count()

684525

In [53]:
# Check missing values
df_weather \
    .select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_weather.columns)) \
    .show()
# All columns except station, station_name,date and hour contians a lot of missing values.
# Since we some data is back to 2004, the data is missing maybe because the station was not there back then.

+---+-------+------------+----+----+------+-----------+---------+--------+-----------+---------+--------+----------+
|_c0|station|station_name|date|hour|  temp|dew_point_f|humidity%|pressure|precip_hrly|condition|uv_level|feels_like|
+---+-------+------------+----+----+------+-----------+---------+--------+-----------+---------+--------+----------+
|  0|      0|           0|   0|   0|193052|     522279|   227074|  196499|     447853|   117688|  443358|    257220|
+---+-------+------------+----+----+------+-----------+---------+--------+-----------+---------+--------+----------+



In [54]:
# Change data type for df_weather

df_weather = df_weather \
    .drop('_c0') \
    .dropDuplicates() \
    .withColumn('date', F.date_format(F.col('date'),'yyyy-MM-dd').cast('date')) \
    .withColumn('hour', F.col('hour').cast('integer')) \
    .withColumn('temp', F.col('temp').cast('double')) \
    .withColumn('dew_point_f', F.col('dew_point_f').cast('double')) \
    .withColumn('humidity%', F.col('humidity%').cast('double')) \
    .withColumn('pressure', F.col('pressure').cast('double')) \
    .withColumn('precip_hrly', F.col('precip_hrly').cast('double')) \
    .withColumn('feels_like', F.col('feels_like').cast('double')) \
    .withColumn('rn', F.row_number().over(Window.partitionBy(['station', 'date', 'hour']).orderBy('station_name'))) \
    .filter(F.col('rn')==1) \
    .drop('rn') \
    .withColumn('temp', F.lit('ABC')) \
    .withColumn('id', F.row_number().over(Window.partitionBy('temp').orderBy(F.lit('A')))) \
    .drop('temp')

In [55]:
df_weather.take(10)

[Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 1, 6), hour=11, dew_point_f=None, humidity%=44.0, pressure=None, precip_hrly=None, condition=None, uv_level=None, feels_like=None, id=1),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 1, 18), hour=23, dew_point_f=None, humidity%=28.0, pressure=None, precip_hrly=None, condition=None, uv_level=None, feels_like=None, id=2),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 1, 31), hour=10, dew_point_f=None, humidity%=None, pressure=None, precip_hrly=None, condition='Fair', uv_level=None, feels_like=None, id=3),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 2, 1), hour=5, dew_point_f=None, humidity%=66.0, pressure=None, precip_hrly=None, condition=None, uv_level=None, feels_like=None, id=4),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 2, 2), hour=6, dew_point_f=None, humidity%=None, pressure=None, precip

## Feature Engineering

Next step is to conduct feature engineering to create some features

### Table businesses

Derived features.

1. latit_s: latitude keeps one digit
2. longi_s: longitude keeps one digit
3. station: the cloest weather station where the business is located.

In [56]:
df_business = df_business \
    .withColumn('latit_s', F.round(F.col('latitude'),1)) \
    .withColumn('longi_s', F.round(F.col('longitude'),1)) \
    .join(df_geo, on=['latit_s', 'longi_s'], how = 'left') \
    .drop('_c0', 'categories')

In [57]:
df_business.take(2)

[Row(latit_s=33.6, longi_s=-111.9, address='8880 E Via Linda, Ste 107', business_id='Yzvjg0SayhoZgCljUJRF9Q', city='Scottsdale', is_open=True, latitude=33.5694041, longitude=-111.8902637, name='Carlos Santo, NMD', postal_code='85258', review_count=4, stars=5.0, state='AZ', hours_Friday=None, hours_Monday=None, hours_Saturday=None, hours_Sunday=None, hours_Thursday=None, hours_Tuesday=None, hours_Wednesday=None, station='KPHX'),
 Row(latit_s=33.4, longi_s=-111.7, address='4827 E Downing Cir', business_id='51M2Kk903DFYI6gnB5I6SQ', city='Mesa', is_open=True, latitude=33.4280652, longitude=-111.7266485, name='USE MY GUY SERVICES LLC', postal_code='85205', review_count=26, stars=4.5, state='AZ', hours_Friday='9:0-16:0', hours_Monday='0:0-0:0', hours_Saturday=None, hours_Sunday=None, hours_Thursday='9:0-16:0', hours_Tuesday='9:0-16:0', hours_Wednesday='9:0-16:0', station='KIWA')]

### Table reviews

Derived features.

1. word_split: a list of splitted words from column text
2. word_count: number of words in word_split
3. year: year of the review, store the data by partition by year and month
4. month: month of the review.
5. date: date of the review. date, hour, and station will be used as the foreign keys of the weather table.
6. hour: hour of the review.
7. station: the closest weather station the business is located.

In [58]:
df_review.take(2)

[Row(business_id='-MhfebM0QIsKt87iDN-FNw', cool=0, funny=0, review_id='xQY8N_XvtGbearJ5X4QryQ', stars=2.0, text='As someone who has worked with many museums, I was eager to visit this gallery on my most recent trip to Las Vegas. When I saw they would be showing infamous eggs of the House of Faberge from the Virginia Museum of Fine Arts (VMFA), I knew I had to go!\n\nTucked away near the gelateria and the garden, the Gallery is pretty much hidden from view. It\'s what real estate agents would call "cozy" or "charming" - basically any euphemism for small.\n\nThat being said, you can still see wonderful art at a gallery of any size, so why the two *s you ask? Let me tell you:\n\n* pricing for this, while relatively inexpensive for a Las Vegas attraction, is completely over the top. For the space and the amount of art you can fit in there, it is a bit much.\n* it\'s not kid friendly at all. Seriously, don\'t bring them.\n* the security is not trained properly for the show. When the curatin

In [59]:
concat_str = F.udf(lambda x : ','.join(x))

In [60]:
# Create new features and join table businesses to add column station.
df_review = df_review \
    .select(*(F.col(c) for c in df_review.columns), 
              F.year('time_stamp').alias('year'),
              F.month('time_stamp').alias('month'),
              F.date_format(F.col('time_stamp'),'yyyy-MM-dd').alias('date').cast('date'),
              F.hour('time_stamp').alias('hour')) \
    .join(df_business.select('business_id', 'station'), on = 'business_id', how = 'right') \
    .join(df_weather.select('id', 'station', 'date', 'hour'), on = ['station', 'date', 'hour'], how = 'left') \
    .drop('station', 'date', 'hour')

# Count number of words in the column text
df_review = df_review \
    .withColumn('word_split', F.split(F.regexp_replace('text', '[^a-zA-Z0-9-]+', ' '), ' ')) \
    .withColumn('word_count', F.size(F.col('word_split'))) \
    .withColumn('word_split', concat_str(F.col('word_split'))) \
    .withColumn('weather_id', F.col('id')) \
    .drop('text', 'id')

In [61]:
df_review.take(1)
#spark.catalog.clearCache()

[Row(business_id='-RRnldwSqCZT6OpuWKXolg', cool=0, funny=0, review_id='3Tw79e5L6bxqswzsovnzkg', stars=5.0, useful=2, user_id='DQGJIbXTgaugbKpsGTDxfA', time_stamp=datetime.datetime(2009, 12, 9, 0, 1, 34), year=2009, month=12, word_split='We,are,now,closed,but,thanks,to,the,good,folks,of,Phoenix,for,supporting,us,while,we,were,open,', word_count=20, weather_id=282256)]

### Table business_category_link

In [62]:
df_business_category_link = df_bussiness_cat_join
df_business_category_link.show()

+--------------------+-----+-----+
|         business_id|index|state|
+--------------------+-----+-----+
|Yzvjg0SayhoZgCljU...|   15|   AZ|
|51M2Kk903DFYI6gnB...|   86|   AZ|
|51M2Kk903DFYI6gnB...|  340|   AZ|
|51M2Kk903DFYI6gnB...|  460|   AZ|
|51M2Kk903DFYI6gnB...|  258|   AZ|
|cKyLV5oWZJ2NudWgq...|   78|   AZ|
|cKyLV5oWZJ2NudWgq...|  664|   AZ|
|cKyLV5oWZJ2NudWgq...|  955|   AZ|
|ScYkbYNkDgCneBrD9...|   78|   AZ|
|ScYkbYNkDgCneBrD9...|  664|   AZ|
|ScYkbYNkDgCneBrD9...|  939|   AZ|
|JjJs3o60uQCfctDjs...|  258|   AZ|
|nIEhsGbw0vJuYl05b...| 1004|   AZ|
|nIEhsGbw0vJuYl05b...|  397|   AZ|
|nIEhsGbw0vJuYl05b...|  362|   AZ|
|Vwo64kNYDjKi98gUU...| 1006|   AZ|
|Vwo64kNYDjKi98gUU...|   15|   AZ|
|x3Po6tJGb729u_HJP...|  377|   AZ|
|07cgbTbANYhVDfzTM...|   78|   AZ|
|DCsS3SgVFO56F6wRO...|  319|   AZ|
+--------------------+-----+-----+
only showing top 20 rows



### Table category

In [63]:
df_category.show(10)

+--------------------+--------------+-----+
|            category| subcategories|index|
+--------------------+--------------+-----+
|      Local Services|   3D Printing|    0|
|         Restaurants|       Italian|    1|
|           Nightlife|          Bars|    2|
|                Food|    Acai Bowls|    3|
|            Shopping|       Fashion|    4|
|Professional Serv...|   Accountants|    5|
|       Beauty & Spas|Acne Treatment|    6|
|         Active Life|          null|    7|
|    Health & Medical|   Acupuncture|    8|
|    Health & Medical|       Doctors|    9|
+--------------------+--------------+-----+
only showing top 10 rows



### Table users

In [64]:
df_user.take(1)

[Row(user_id='ntlvfPzc8eglqvk92iDIAw', name='Rafael', average_stars=3.57, compliment_cool=22, compliment_cute=0, compliment_funny=22, compliment_hot=3, compliment_list=1, compliment_more=2, compliment_note=11, compliment_photos=0, compliment_plain=15, compliment_profile=1, compliment_writer=10, cool=227, fans=14, funny=225, review_count=553, useful=628, num_elite=0, num_friends=45, yelping_since=datetime.datetime(2007, 7, 6, 3, 27, 11))]

### Table time

In [65]:
df_time = df_review \
    .select('time_stamp', 'year', 'month') \
    .dropDuplicates() \
    .select('time_stamp', 
            'year', 
            'month', 
            F.hour('time_stamp').alias('hour'),
            F.date_format(F.col('time_stamp'),'yyyy-MM-dd').alias('date').cast('date'),
            F.dayofmonth('time_stamp').alias('day'),
            F.quarter('time_stamp').alias('quarter'),
            F.dayofweek('time_stamp').alias('weekday'),
            F.dayofyear('time_stamp').alias('dayofyear'),
            F.weekofyear('time_stamp').alias('weekofyear')) \
    .sort('time_stamp')

In [66]:
df_time.show(5)

+-------------------+----+-----+----+----------+---+-------+-------+---------+----------+
|         time_stamp|year|month|hour|      date|day|quarter|weekday|dayofyear|weekofyear|
+-------------------+----+-----+----+----------+---+-------+-------+---------+----------+
|2004-12-20 03:08:59|2004|   12|   3|2004-12-20| 20|      4|      2|      355|        52|
|2004-12-24 20:51:20|2004|   12|  20|2004-12-24| 24|      4|      6|      359|        52|
|2005-02-02 06:53:48|2005|    2|   6|2005-02-02|  2|      1|      4|       33|         5|
|2005-03-08 22:19:43|2005|    3|  22|2005-03-08|  8|      1|      3|       67|        10|
|2005-03-09 05:09:00|2005|    3|   5|2005-03-09|  9|      1|      4|       68|        10|
+-------------------+----+-----+----+----------+---+-------+-------+---------+----------+
only showing top 5 rows



### Table weather

In [67]:
df_weather.take(5)

[Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 1, 6), hour=11, dew_point_f=None, humidity%=44.0, pressure=None, precip_hrly=None, condition=None, uv_level=None, feels_like=None, id=1),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 1, 18), hour=23, dew_point_f=None, humidity%=28.0, pressure=None, precip_hrly=None, condition=None, uv_level=None, feels_like=None, id=2),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 1, 31), hour=10, dew_point_f=None, humidity%=None, pressure=None, precip_hrly=None, condition='Fair', uv_level=None, feels_like=None, id=3),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 2, 1), hour=5, dew_point_f=None, humidity%=66.0, pressure=None, precip_hrly=None, condition=None, uv_level=None, feels_like=None, id=4),
 Row(station='KIWA', station_name='Mesa/Gateway', date=datetime.date(2004, 2, 2), hour=6, dew_point_f=None, humidity%=None, pressure=None, precip