## Read input

In [1]:
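# getOrCreate() returns the SparkContext that is already running (e.g. in a PySpark
# notebook) instead of failing, because only one SparkContext may be active at a time.
# Background: http://stackoverflow.com/questions/28999332/how-to-access-sparkcontext-in-pyspark-script
from pyspark import SparkContext

sc = SparkContext.getOrCreate()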

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark1 = SparkSession.builder.appName('Sentiment Analysis').getOrCreate()

In [4]:
review = spark1.read.json("yelp-dataset/yelp_academic_dataset_review.json")

In [5]:
business = spark1.read.json("yelp-dataset/yelp_academic_dataset_business.json")

In [6]:
review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [7]:
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [8]:
review.show(3)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|ujmEBvifdJM6h6RLv...|   0|2013-05-07 04:34:36|    1|Q1sbwvVQXV2734tPg...|  1.0|Total bill for th...|     6|hG7b0MtEbXx5QzbzE...|
|NZnhc2sEQy3RmzKTZ...|   0|2017-01-14 21:30:33|    0|GJXCdrto3ASJOqKeV...|  5.0|I *adore* Travis ...|     0|yXQM5uF2jS6es16SJ...|
|WTqjgwHlXbSFevF32...|   0|2016-11-09 20:09:03|    0|2TzJjDVDEuAW6MR5V...|  5.0|I have to say tha...|     3|n6-Gk65cPZL6Uz8qR...|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
only showing top 3 rows



In [9]:
business.show(3)

+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------+-------------+-------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|       city|               hours|is_open|     latitude|    longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------+-------------+-------------+--------------------+-----------+------------+-----+-----+
|2818 E Camino Ace...|[,,,,,,,,,,,,,,,,...|1SWheh84yJXfytovI...|   Golf, Active Life|    Phoenix|                null|      0|   33.5221425| -112.0184807|Arizona Biltmore ...|      85016|           5|  3.0|   AZ|
|30 Eglinton Avenue W|[,, u'full_bar', ...|QXAEGFB4oINsVuTFx...|Specialty Food, R...|Mississauga|[9:0-1:0, 9:0-0:0...|      1|43.6054989743|-79.6522

## Data analysis

In [10]:
from pyspark.sql.functions import col

Let's see the 20 most common business categories in the Yelp dataset.

In [11]:
business.groupBy("categories").count().orderBy(col("count").desc()).show()

+--------------------+-----+
|          categories|count|
+--------------------+-----+
|  Restaurants, Pizza| 1042|
|Nail Salons, Beau...| 1031|
|  Pizza, Restaurants|  993|
|Beauty & Spas, Na...|  947|
|  Food, Coffee & Tea|  888|
|Mexican, Restaurants|  885|
|  Coffee & Tea, Food|  865|
|Restaurants, Mexican|  853|
|Chinese, Restaurants|  840|
|Hair Salons, Beau...|  831|
|Beauty & Spas, Ha...|  819|
|Restaurants, Chinese|  789|
|Automotive, Auto ...|  585|
|Auto Repair, Auto...|  534|
|       Food, Grocery|  492|
|       Grocery, Food|  491|
|                null|  482|
|Restaurants, Italian|  474|
|Italian, Restaurants|  446|
|Banks & Credit Un...|  439|
+--------------------+-----+
only showing top 20 rows
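The categories column holds one comma-separated string per business, so the same set of categories listed in a different order is counted as a separate group (for example "Restaurants, Pizza" with 1042 and "Pizza, Restaurants" with 993 above). A minimal plain-Python sketch of one fix, assuming we want counts per individual category: split each string on commas, strip whitespace, and count each category once per business. In Spark the same idea would be `split` followed by `explode` and `trim` from `pyspark.sql.functions`. The function names and sample strings below are hypothetical.

```python
from collections import Counter
from typing import Iterable, Optional


def split_categories(raw: Optional[str]) -> list[str]:
    """Turn 'Restaurants, Pizza' into ['Restaurants', 'Pizza']; None becomes []."""
    if raw is None:
        return []
    return [c.strip() for c in raw.split(",") if c.strip()]


def count_categories(rows: Iterable[Optional[str]]) -> Counter:
    """Count each individual category once per business (the split + explode idea)."""
    counts: Counter = Counter()
    for raw in rows:
        counts.update(set(split_categories(raw)))
    return counts


# Made-up rows, not taken from the Yelp data.
sample = ["Restaurants, Pizza", "Pizza, Restaurants", "Food, Coffee & Tea", None]
print(count_categories(sample))
```

With this normalization, both orderings of "Restaurants, Pizza" contribute to the same "Pizza" and "Restaurants" counts.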



What is the rating distribution in the Yelp dataset?

In [12]:
review.groupBy("stars").count().orderBy(col("stars")).show()

+-----+-------+
|stars|  count|
+-----+-------+
|  1.0|1002159|
|  2.0| 542394|
|  3.0| 739280|
|  4.0|1468985|
|  5.0|2933082|
+-----+-------+



As we can see, 5 stars is by far the most common rating, followed by 4 and 1 stars; 3- and 2-star ratings are the least common.
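To put numbers on this, the counts in the table above can be turned into shares. The short sketch below only does that arithmetic; it also checks that dropping the 3-star reviews leaves 5,946,620 rows and that 4- and 5-star reviews add up to 4,402,067, the counts that appear later in this notebook.

```python
# Review counts per star rating, copied from the groupBy("stars") output above.
counts = {1.0: 1002159, 2.0: 542394, 3.0: 739280, 4.0: 1468985, 5.0: 2933082}

total = sum(counts.values())
for stars, n in sorted(counts.items()):
    print(f"{stars:.0f} stars: {n / total:.1%}")

# Rows left after df.filter(df['stars'] != 3), and the 'Good' (4-5 star) subset.
without_3 = total - counts[3.0]
good = counts[4.0] + counts[5.0]
print(total, without_3, good)
```

This gives roughly 43.9% five-star, 22.0% four-star, 15.0% one-star, 11.1% three-star and 8.1% two-star reviews out of 6,685,900.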

Let's check the 10 categories with the lowest and the highest average rating (stars):

In [13]:
business.groupBy("categories").avg("stars").orderBy(col("avg(stars)")).show(10)

+--------------------+----------+
|          categories|avg(stars)|
+--------------------+----------+
|Occupational Ther...|       1.0|
|Dive Bars, Bars, ...|       1.0|
|Hair Removal, Bea...|       1.0|
|Beauty & Spas, Sk...|       1.0|
|Shopping, Arts & ...|       1.0|
|Heating & Air Con...|       1.0|
|Medical Spas, Ski...|       1.0|
|Food, Automotive,...|       1.0|
|Home Services, Ro...|       1.0|
|Dance Studios, Fi...|       1.0|
+--------------------+----------+
only showing top 10 rows



In [14]:
business.groupBy("categories").avg("stars").orderBy(col("avg(stars)").desc()).show(10)

+--------------------+----------+
|          categories|avg(stars)|
+--------------------+----------+
|Beauty & Spas, Ha...|       5.0|
|Flowers & Gifts, ...|       5.0|
|Tires, Oil Change...|       5.0|
|Notaries, Printin...|       5.0|
|Chocolatiers & Sh...|       5.0|
|Automotive, Auto ...|       5.0|
|Interval Training...|       5.0|
|Coffee & Tea, Loc...|       5.0|
|Beauty & Spas, Ha...|       5.0|
|Automotive, Local...|       5.0|
+--------------------+----------+
only showing top 10 rows
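Every category string in these two tables has an average of exactly 1.0 or 5.0, which suggests (not verified here) that these groups contain very few businesses: each distinct combination of categories is its own group, so many groups are tiny. One way to make the ranking more meaningful is to also count the businesses per group and keep only groups above a minimum size. In Spark this would be `groupBy("categories").agg(count("*"), avg("stars"))` followed by a `filter`; the plain-Python sketch below shows the logic on made-up rows, with a hypothetical threshold `min_count`.

```python
from collections import defaultdict


def avg_stars_by_group(rows, min_count):
    """rows: (category, stars) pairs. Returns {category: (count, mean)} for groups with count >= min_count."""
    totals = defaultdict(lambda: [0, 0.0])  # category -> [count, sum of stars]
    for category, stars in rows:
        totals[category][0] += 1
        totals[category][1] += stars
    return {cat: (n, s / n) for cat, (n, s) in totals.items() if n >= min_count}


# Made-up rows: a one-business group with a perfect score and a five-business group.
rows = [("Category A", 5.0)] + [("Category B", s) for s in (3.0, 4.0, 4.0, 5.0, 2.0)]
print(avg_stars_by_group(rows, min_count=1))  # Category A (5.0) ranks above Category B (3.6)
print(avg_stars_by_group(rows, min_count=3))  # only Category B is left
```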



Another analysis we could do is to join the business and review data frames on 'business_id' and then look at the average, maximum, and minimum ratings that businesses in each category receive. However, since the goal of this project is sentiment analysis, we leave that idea for a future project.
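That merge would be a join on business_id followed by a per-category aggregation (in Spark, `review.join(business, "business_id")` and then `groupBy` with `avg`, `max` and `min`). Note that both data frames have a `stars` column (the review's rating and the business's overall rating), so one of them would need renaming before the join. A plain-Python sketch of the same logic, on made-up data with hypothetical IDs:

```python
from collections import defaultdict

# Made-up data: business_id -> category, and (business_id, review stars) pairs.
businesses = {"b1": "Pizza", "b2": "Pizza", "b3": "Golf"}
reviews = [("b1", 5.0), ("b1", 1.0), ("b2", 4.0), ("b3", 3.0), ("b9", 2.0)]

# Inner join on business_id: the review for the unknown business b9 is dropped.
ratings_by_category = defaultdict(list)
for business_id, stars in reviews:
    if business_id in businesses:
        ratings_by_category[businesses[business_id]].append(stars)

# Per-category (average, max, min) of the review ratings.
summary = {
    cat: (sum(r) / len(r), max(r), min(r))
    for cat, r in ratings_by_category.items()
}
print(summary)
```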

## Data cleaning & preprocessing for ML

In [None]:
# From the review data frame, keep only the text and stars columns. Then discard 3-star ratings, mark 4-5 stars as positive,
# and mark 1-2 stars as negative

In [15]:
to_select = ['stars', 'text']
df = review.select(to_select)
df.show(5)

+-----+--------------------+
|stars|                text|
+-----+--------------------+
|  1.0|Total bill for th...|
|  5.0|I *adore* Travis ...|
|  5.0|I have to say tha...|
|  5.0|Went in for a lun...|
|  1.0|Today was my seco...|
+-----+--------------------+
only showing top 5 rows



In [16]:
df = df.dropna(how='any')

In [17]:
df = df.filter(df['stars'] != 3)

In [18]:
df.describe().show()

+-------+------------------+------------------------------------+
|summary|             stars|                                text|
+-------+------------------+------------------------------------+
|  count|           5946620|                             5946620|
|   mean|3.8052367563422584|                                null|
| stddev|1.5286847482110935|                                null|
|    min|               1.0|                                   !|
|    max|               5.0|５　感動しました！大好きです。
４...|
+-------+------------------+------------------------------------+



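From the describe() output we can see that not every review is in English: the (lexicographically) largest text value is a Japanese review, roughly "5 I was so moved! I love it." We need to figure out some way to remove non-English reviews.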
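A crude first pass, assuming reviews written mostly in non-Latin scripts are the main problem: keep a review only if most of its letters are ASCII. This is a heuristic, not language detection; it cannot tell English from other languages written in unaccented Latin script, and accented text may be dropped. A dedicated language-identification library would be more accurate. The function name and the 0.9 threshold are hypothetical; in Spark the function could be wrapped in a `udf` returning `BooleanType` and used in `filter`.

```python
def mostly_ascii(text: str, threshold: float = 0.9) -> bool:
    """Return True if at least `threshold` of the alphabetic characters are ASCII."""
    letters = [ch for ch in text if ch.isalpha()]
    if not letters:
        return True  # no letters at all (e.g. "!"): nothing to judge, keep it
    ascii_letters = sum(1 for ch in letters if ch.isascii())
    return ascii_letters / len(letters) >= threshold


print(mostly_ascii("I *adore* Travis"))             # True
print(mostly_ascii("５　感動しました！大好きです。"))  # False
```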

In [19]:
df.dtypes

[('stars', 'double'), ('text', 'string')]

In [20]:
def convert(x):
    if x > 3:
        return 'Good'
    else:
        return 'Bad'

In [21]:
from pyspark.sql.types import *

In [22]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [23]:
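from pyspark.sql.functions import udf

# Wrap convert() as a Spark UDF that returns a string.
# (The built-in when(col('stars') > 3, 'Good').otherwise('Bad') would give the same result
# without calling Python for every row.)
udfConvert = udf(convert, StringType())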

In [24]:
df2 = df.withColumn("Sentiment", udfConvert('stars'))

In [25]:
df2.printSchema()

root
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- Sentiment: string (nullable = true)



In [26]:
df2.show()

+-----+--------------------+---------+
|stars|                text|Sentiment|
+-----+--------------------+---------+
|  1.0|Total bill for th...|      Bad|
|  5.0|I *adore* Travis ...|     Good|
|  5.0|I have to say tha...|     Good|
|  5.0|Went in for a lun...|     Good|
|  1.0|Today was my seco...|      Bad|
|  4.0|I'll be the first...|     Good|
|  1.0|This place has go...|      Bad|
|  2.0|I was really look...|      Bad|
|  4.0|Like walking back...|     Good|
|  1.0|Walked in around ...|      Bad|
|  4.0|Wow. So surprised...|     Good|
|  4.0|Michael from Red ...|     Good|
|  1.0|I cannot believe ...|      Bad|
|  5.0|You can't really ...|     Good|
|  4.0|Great lunch today...|     Good|
|  5.0|We've been a huge...|     Good|
|  5.0|Our family LOVES ...|     Good|
|  5.0|If you are lookin...|     Good|
|  4.0|The food is alway...|     Good|
|  5.0|Pick any meat on ...|     Good|
+-----+--------------------+---------+
only showing top 20 rows



In [None]:
# Put the data through a pipeline: RegexTokenizer, StopWordsRemover, CountVectorizer. Then do a train-test split (stratified if possible)

In [27]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

# tokenizer
tokenizer = RegexTokenizer(inputCol="text", outputCol="words")

# stop words remover
swRemover = StopWordsRemover(inputCol="words", outputCol="filtered")

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features",
                              vocabSize=10000, minDF=5)
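By default `RegexTokenizer` lowercases the text and splits on whitespace (`pattern="\\s+"` with `gaps=True`), so punctuation stays attached to words; this is why tokens such as `*adore*` and `lunch.` show up in the pipeline output. Passing something like `pattern="\\W+"` would split on non-word characters instead. The plain `re` sketch below (hypothetical helper name) only mimics the two splitting rules to show the difference; it is not Spark's implementation.

```python
import re


def regex_tokenize(text: str, pattern: str = r"\s+") -> list[str]:
    """Lowercase, split on `pattern` (gaps mode), and drop empty tokens."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]


sentence = "I *adore* Travis. Went in for a lunch."
print(regex_tokenize(sentence))          # whitespace split, punctuation kept
print(regex_tokenize(sentence, r"\W+"))  # non-word split, punctuation dropped
```

One trade-off: `\W+` also splits contractions, so "don't" becomes "don" and "t".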

In [41]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

pipeline = Pipeline(stages=[tokenizer, swRemover, countVectors])

# Fit the pipeline to data
pipelineFit = pipeline.fit(df2)
data = pipelineFit.transform(df2)
data.show(5)

+-----+--------------------+---------+--------------------+--------------------+--------------------+
|stars|                text|Sentiment|               words|            filtered|            features|
+-----+--------------------+---------+--------------------+--------------------+--------------------+
|  1.0|Total bill for th...|      Bad|[total, bill, for...|[total, bill, hor...|(10000,[16,113,14...|
|  5.0|I *adore* Travis ...|     Good|[i, *adore*, trav...|[*adore*, travis,...|(10000,[1,3,4,5,6...|
|  5.0|I have to say tha...|     Good|[i, have, to, say...|[say, office, rea...|(10000,[1,3,10,26...|
|  5.0|Went in for a lun...|     Good|[went, in, for, a...|[went, lunch., st...|(10000,[1,9,22,34...|
|  1.0|Today was my seco...|      Bad|[today, was, my, ...|[today, second, t...|(10000,[3,7,8,12,...|
+-----+--------------------+---------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [29]:
data.count()

5946620

In [31]:
data.filter(data['Sentiment']=='Good').count()

4402067

In [32]:
4402067/5946620

0.7402637128318271

As we can see, the dataset is skewed towards good reviews: about 74% of the reviews are good ones, so a model that always predicts Good would already be about 74% accurate.
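One common way to compensate for this imbalance, sketched here as an option rather than something used in this notebook, is to give each row a weight inversely proportional to its class frequency and pass that column to the model through `weightCol` (available on Spark's `LogisticRegression`, among others). The arithmetic, using the counts above:

```python
# Class counts from this notebook: all rows after removing 3-star reviews, and the 'Good' subset.
n_total = 5946620
n_good = 4402067
n_bad = n_total - n_good

# "Balanced" weights: n_total / (n_classes * n_class), so each class contributes equally.
n_classes = 2
w_good = n_total / (n_classes * n_good)
w_bad = n_total / (n_classes * n_bad)
print(f"Good weight: {w_good:.3f}, Bad weight: {w_bad:.3f}")
```

This gives weights of roughly 0.675 for Good and 1.925 for Bad; in Spark such a column could be added with `when`/`otherwise` before fitting.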

In [42]:
# Only use Sentiment and features columns in our models, use stars to 
data2 = data.select(['Sentiment', 'features'])
data2.show(5)

+---------+--------------------+
|Sentiment|            features|
+---------+--------------------+
|      Bad|(10000,[16,113,14...|
|     Good|(10000,[1,3,4,5,6...|
|     Good|(10000,[1,3,10,26...|
|     Good|(10000,[1,9,22,34...|
|      Bad|(10000,[3,7,8,12,...|
+---------+--------------------+
only showing top 5 rows



In [47]:
def convertNum(x):
    # Numeric label for the classifiers: Good -> 0, Bad -> 1 (Bad is the "positive" class)
    if x == 'Good':
        return 0
    else:
        return 1

In [48]:
udfConvertNum = udf(convertNum, IntegerType())
data3 = data2.withColumn("label", udfConvertNum('Sentiment'))

In [56]:
data3.count()

5946620

Due to limited computing power, randomSplit failed on my machine when doing the train-test split. Therefore, I use a subset (20k rows) of the data for modeling, of which 25% (5k rows) are held out as a test set.

In [82]:
dataset = data3.limit(20000)

In [69]:
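# Import the module under an alias instead of `import *`, which would shadow
# Python built-ins such as sum, min and max
from pyspark.sql import functions as F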

In [83]:
train = dataset.limit(15000)

In [84]:
test = dataset.subtract(train)
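Two caveats about this split: `limit()` returns whichever rows Spark produces first rather than a random sample, and `subtract()` is a set difference (EXCEPT DISTINCT), so duplicate rows are dropped from the test set. A possible alternative, assuming the full-size `randomSplit` was the bottleneck, is to draw a small random sample first with `DataFrame.sample(fraction=..., seed=...)` (or `sampleBy` for per-class fractions) and then call `randomSplit([0.75, 0.25], seed=...)` on that sample. The plain-Python sketch below shows the stratified version of the idea: split each class separately so both sets keep the roughly 74/26 class ratio. The function name and seed are hypothetical.

```python
import random
from collections import defaultdict


def stratified_split(labels, test_fraction=0.25, seed=42):
    """Return (train_idx, test_idx), sampling test_fraction of each label separately."""
    by_label = defaultdict(list)
    for i, label in enumerate(labels):
        by_label[label].append(i)

    rng = random.Random(seed)
    train_idx, test_idx = [], []
    for idx in by_label.values():
        rng.shuffle(idx)
        n_test = round(len(idx) * test_fraction)
        test_idx.extend(idx[:n_test])
        train_idx.extend(idx[n_test:])
    return sorted(train_idx), sorted(test_idx)


# Toy labels with the notebook's convention: 0 = Good, 1 = Bad (74% / 26%).
labels = [0] * 740 + [1] * 260
train_idx, test_idx = stratified_split(labels)
print(len(train_idx), len(test_idx))                     # 750 250
print(sum(labels[i] for i in test_idx) / len(test_idx))  # 0.26
```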

## Machine Learning 

### Logistic Regression

In [85]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model, only use features as input
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(train)

In [86]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(test)

In [80]:
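selected = predictions.select("label", "prediction", "probability")
# In a plain Jupyter notebook, display() on a Spark DataFrame only prints its schema
# (see the output below); selected.show(5) would print the first rows instead.
display(selected)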

DataFrame[label: int, prediction: double, probability: vector]

In [87]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model; the default metric is the area under the ROC curve (AUC), not accuracy
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.9555549697644004

### Naive Bayes

In [93]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1, featuresCol = 'features', labelCol = 'label')
nbModel = nb.fit(train)

In [94]:
predictions3 = nbModel.transform(test)

In [95]:
evaluator.evaluate(predictions3)

0.3998445778876712

The Naive Bayes classifier performs poorly on the test set: an AUC of 0.40 is even below the 0.5 of random guessing. This is plausible because the training data has only 15k rows while there are 10k features, and such high dimensionality can make the classifier perform very poorly. If we could run the model on the whole dataset, the AUC could improve.

### Decision Tree, Random Forest

In [96]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5)

# Train model with Training Data
dtModel = dt.fit(train)

In [97]:
display(dtModel)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_fc1fb79898c3) of depth 5 with 55 nodes

In [100]:
# Make predictions on test data using the Transformer.transform() method.
predictions4 = dtModel.transform(test)

In [101]:
evaluator.evaluate(predictions4)

0.5007754234830791

## Diagnostics & Model improvement

### Logistic Regression Grid Search Cross Validation

In [88]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [90]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)

In [91]:
# Use the test set to measure the AUC of our model on new data
predictions2 = cvModel.transform(test)

In [92]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions2)

0.9710756707455382

### Decision Tree Grid Search Cross Validation

In [102]:
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [103]:
# Create 5-fold CrossValidator
cvdt = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvdtModel = cvdt.fit(train)
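`ParamGridBuilder.build()` returns the Cartesian product of the listed values, and `CrossValidator` fits one model per combination per fold, then refits the best combination on the full training set. A quick count of how many fits the two searches in this notebook imply (plain Python, mirroring the two grids):

```python
from itertools import product

# Same value lists as the ParamGridBuilder calls in this notebook.
lr_grid = list(product([0.01, 0.5, 2.0],   # regParam
                       [0.0, 0.5, 1.0],    # elasticNetParam
                       [1, 5, 10]))        # maxIter
dt_grid = list(product([1, 2, 6, 10],      # maxDepth
                       [20, 40, 80]))      # maxBins

num_folds = 5
for name, grid in [("LogisticRegression", lr_grid), ("DecisionTree", dt_grid)]:
    fits = len(grid) * num_folds + 1  # +1 for the final refit of the best params
    print(f"{name}: {len(grid)} combinations -> {fits} model fits")
```

That is 136 fits for logistic regression and 61 for the decision tree. After fitting, `cvModel.avgMetrics` holds the mean cross-validated metric for each combination, in the same order as the param grid, and `cvModel.bestModel` is the refitted winner.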

In [104]:
predictions5 = cvdtModel.transform(test)

In [105]:
# Evaluate best model
evaluator.evaluate(predictions5)

0.4377063112554501

The performance decreases (AUC drops from 0.50 to 0.44)! My hypothesis is that the tree selected by cross-validation overfits the training data and therefore performs worse on the hold-out test set.
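Another factor worth checking, as a hypothesis alongside overfitting: `BinaryClassificationEvaluator` ranks rows by the second element of its `rawPredictionCol` (here `rawPrediction`). For logistic regression that column holds margins, which rank rows consistently with the predicted probability. For a Spark decision tree, however, `rawPrediction` holds the class counts of the leaf a row lands in, and for Naive Bayes it holds unnormalized log-probabilities, so these scores are not directly comparable across rows. Re-evaluating those two models with `BinaryClassificationEvaluator(rawPredictionCol="probability")` might give different numbers. The toy example below (made-up leaves, plain Python, hypothetical helper `roc_auc`) shows how ranking by raw leaf counts instead of leaf probabilities can flip the AUC.

```python
def roc_auc(labels, scores):
    """AUC as the chance a random positive (label 1) outscores a random negative; ties count 1/2."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Two made-up tree leaves, as [count of label 0, count of label 1] from training.
big_leaf = [90, 10]   # large leaf, mostly label 0 (Good)
small_leaf = [2, 8]   # small leaf, mostly label 1 (Bad)

# Test rows: three label-0 rows land in big_leaf, three label-1 rows in small_leaf.
labels = [0, 0, 0, 1, 1, 1]
leaves = [big_leaf] * 3 + [small_leaf] * 3

raw_scores = [leaf[1] for leaf in leaves]               # raw count of label 1 in the leaf
prob_scores = [leaf[1] / sum(leaf) for leaf in leaves]  # fraction of label 1 in the leaf

print(roc_auc(labels, raw_scores))   # 0.0: 10 > 8, so the big leaf outranks the small one
print(roc_auc(labels, prob_scores))  # 1.0: 0.8 > 0.1
```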