In [1]:
from pyspark.sql.types import StringType, ArrayType, IntegerType
from pyspark.sql.functions import udf, col, regexp_replace, count
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml import Pipeline

from nltk.stem.snowball import SnowballStemmer
import re

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1580945031101_0002,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load and clean Yelp data

In [None]:
# Handle to the SparkContext behind the notebook-provided `spark` session.
sc = spark.sparkContext

# S3 locations of the Yelp input files and of the cleaned-review output.
bucketpath = 's3://aust-galv-aust-cap2/YelpData/'
rev_s3 = bucketpath + 'review.json'      # raw reviews, read by the loading cell below
bus_s3 = bucketpath + 'business.json'
cleaned_rev_s3 = bucketpath + 'review_cleaned'

In [2]:
# Read the raw review and business JSON files from S3 into DataFrames.
rev_df, business_df = (spark.read.json(s3_file) for s3_file in (rev_s3, bus_s3))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
def _sentiment_label(rating):
    """Return 1 when `rating` is at least 4.0, otherwise 0."""
    if rating >= 4.0:
        return 1
    return 0


# Spark UDF wrapping the labelling rule above; yields an integer column.
sentiment_udf = udf(_sentiment_label, IntegerType())

def clean_data_and_upload(s3_path, filename):
    """Read a review JSON file from S3, clean it, and save the result back to S3.

    The cleaned frame has three columns:
      * ``clean_text`` - the ``text`` column with every character that is
        neither a word character nor whitespace removed,
      * ``stars``      - copied through unchanged,
      * ``sentiment``  - ``sentiment_udf`` applied to ``stars``.

    Parameters
    ----------
    s3_path : str
        Prefix the file is read from; the cleaned data is written as JSON to
        ``s3_path + "review_cleaned/"``.
    filename : str
        Name of the raw review JSON file, appended to ``s3_path``.

    Returns
    -------
    The cleaned Spark DataFrame.

    Notes
    -----
    No save mode is set on the writer, so Spark's default applies (documented
    as failing when the target location already exists).
    """
    reviews_df = spark.read.json(s3_path + filename)

    # Select from the frame loaded above rather than a notebook-global frame,
    # so the function cleans exactly the file it was asked to read.
    cleaned_review_df = reviews_df.select(
        regexp_replace('text', r'[^\w\s]', '').alias('clean_text'),
        'stars',
        sentiment_udf('stars').alias('sentiment'))

    cleaned_review_df.write.format('json').save(s3_path + "review_cleaned/")
    reviews_df.unpersist()   # Release any cached blocks of the raw frame

    return cleaned_review_df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Clean the raw reviews (this also writes them to S3), then spread the
# result across 20 partitions.
review_df = clean_data_and_upload(bucketpath, "review.json").repartition(20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Expose the cleaned reviews to Spark SQL and preview the first five rows.
review_df.createOrReplaceTempView("review")

review_preview_query = """SELECT *
                      FROM review
                      LIMIT 5"""
result = spark.sql(review_preview_query)
result.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+-----+
|          clean_text|sentiment|stars|
+--------------------+---------+-----+
|Sugar Factory is ...| negative|  3.0|
|The food is still...| negative|  1.0|
|This review is fo...| positive|  5.0|
|A disclaimer Im a...| negative|  3.0|
|Wow  Keep in mind...| negative|  2.0|
+--------------------+---------+-----+

## Explore business dataset
Note: most of the exploratory data analysis (EDA) has been removed for the sake of brevity.

In [None]:
# Register the business data as a SQL view and display its first five rows.
business_df.createOrReplaceTempView("business")
business_preview = spark.sql("""SELECT *
             FROM business
             LIMIT 5""")
business_preview.show()

In [None]:
# Count the businesses listed in each city.
city_count_query = """SELECT city, COUNT(*) as count
                        FROM business
                        GROUP BY city"""
result = spark.sql(city_count_query)
result.show()

In [5]:
# Report how many cleaned reviews there are, then show one example row.
n_reviews = review_df.count()
print(n_reviews)
review_df.take(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

6685900
[Row(clean_text='Sugar Factory is an ok experience The food lacked flavor The service started out great but gets noticeably bad the last 75 of your time there The music is really loud The tables are small while the plates are huge my friend had to put her plate in her lap because there was no where else on the dinky table The prices for the Goblets are high so look for check in deals on Yelp and other voucher websites or skip them altogether Id go back with the realization that their Happy Hour sucks \n\nWe went after reading about the happy hour and what a ripoff Id have been more pleased if the Goblets were 50 off end of story Inside the server said buy one Goblet and get select apps for 3 I figured it was a great deal and thought you could choose 2 or 3 apps for 3 each I mean the Goblet is the price of an entree at dinner for goodness sake Well you could only choose ONE app at that price PER Goblet I chose the burger thinking I was getting a great deal Not so much The burger

In [15]:
# Per-review word counts of the cleaned text (whitespace-separated tokens).
# Built in this cell so the histogram does not rely on a `wordcount_df`
# left over from an earlier kernel session.
count_words_udf = udf(lambda text: len(text.split()), IntegerType())
wordcount_df = review_df.select(count_words_udf('clean_text').alias('n_words'))

# Number of reviews for each word count, restricted to counts below 10.
wordcount_df.groupBy('n_words') \
  .agg(count('n_words').alias('count')) \
  .filter(col('n_words')<10).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+
|n_words|count|
+-------+-----+
|      1|  628|
|      6| 2031|
|      3|  477|
|      5| 1115|
|      9| 5257|
|      4|  770|
|      8| 3944|
|      7| 2829|
|      2|  499|
|      0|  546|
+-------+-----+

In [4]:
# Keep only reviews with more than 5 words. This drops roughly 4,000 of the
# ~7 million reviews.

def _n_words(string):
    """Number of whitespace-separated tokens in `string`."""
    return len(string.split())


wordcount_udf = udf(_n_words, IntegerType())

has_enough_words = wordcount_udf('clean_text') > 5
review_data = review_df.select('clean_text', 'stars', 'sentiment').where(has_enough_words)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Natural Language Processing

In [5]:
# Step-by-step text preprocessing: tokenize -> remove stop words -> stem ->
# hashed term frequencies -> TF-IDF, followed by an 80/20 train/test split.

# Tokenizes data by word
tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")
tokenized_data = tokenizer.transform(review_data)

# Removes stop words (Spark's default English list)
stopwords = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol="words", outputCol="stopped_words", stopWords=stopwords)
stopremoved_data = remover.transform(tokenized_data).select("clean_text", "sentiment", "stopped_words")

# Stem words. Spark ML has no stemming stage here, so NLTK's Snowball stemmer
# is applied to every token through a Python UDF.
stemmer = SnowballStemmer(language="english")
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
stemmed_data = stopremoved_data.select("clean_text", "sentiment", stemmer_udf("stopped_words").alias("stemmed_words"))

# Gets tfidf for all words. HashingTF is a Transformer which takes sets of terms and converts
# those sets into fixed-length feature vectors.
hashing_TF = HashingTF(inputCol="stemmed_words", outputCol="raw_features")
tf_data = hashing_TF.transform(stemmed_data).select("clean_text", "sentiment", "raw_features")
# minDocFreq=50 is passed to IDF as its minimum document frequency.
idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=50)
# NOTE(review): the IDF statistics are fitted on the full dataset before the
# split below, so test documents influence the document frequencies.
# Consider fitting on the training split only.
idf_model = idf.fit(tf_data)
tfidf_data = idf_model.transform(tf_data).select("clean_text", "sentiment", "features")

# Fixed seed so the 80/20 split is repeatable from run to run.
train, test = tfidf_data.randomSplit([.8, .2], seed=42)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Peek at a single TF-IDF-featurised review.
tfidf_sample = tfidf_data.take(1)
tfidf_sample

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(clean_text='The sandwiches are great Like some of the other reviews the sides and also the cookies are better left alone I always have a hard time picking something off the menu cause its so big I dont have a favorite yet but I have ordered the Shea stadium the most \nThe sandwich makers are OK some what friendly but for the most part dont talk very much \nThe other day we went in for some food and the TV was blaring a sports game so loud we really had a hard time hearing each other \nGood thing the man behind the counter noticed this after a little bit and turned it down with out anyone asking to do so \nThe bathrooms are OK and the seating isnt the best \nI think Im going to have to try all the subs before I find my favorite or just have like 6 or 7 favorites', sentiment=0, features=SparseVector(262144, {1353: 1.6814, 2325: 2.6672, 2437: 6.2655, 8804: 3.2286, 11137: 5.1736, 13957: 2.1708, 18659: 3.703, 20376: 4.8544, 21294: 4.2003, 24918: 3.9971, 25615: 2.7694, 30006: 2.8128, 35

## Making an NLP Pipeline
Note: Spark ML has no built-in stemmer, so this pipeline omits the stemming step used in the previous section.

In [5]:
# Preprocessing stages, chained by column name:
#   clean_text -> words -> stopped_words -> raw_features -> features
stopwords = StopWordsRemover.loadDefaultStopWords('english')

tokenizer = Tokenizer(outputCol="words", inputCol="clean_text")
remover = StopWordsRemover(stopWords=stopwords, outputCol="stopped_words", inputCol="words")
hashing_TF = HashingTF(outputCol="raw_features", inputCol="stopped_words")
idf = IDF(minDocFreq=2, outputCol="features", inputCol="raw_features")

nlp_stages = [tokenizer, remover, hashing_TF, idf]
pipeline = Pipeline(stages=nlp_stages)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Preprocessing and splitting the data
Note: under normal circumstances I would use cross-validation to evaluate the model, but due to time constraints a standard train/test split was used instead.

In [6]:
# Split the raw reviews first so the held-out 20% never influences the fitted
# preprocessing (the IDF stage learns its statistics from the data it is fit
# on). A fixed seed keeps the split repeatable.
train_reviews, test_reviews = review_data.randomSplit([.8, .2], seed=42)

# Fit the pipeline to training documents.
processed_model = pipeline.fit(train_reviews)

# Whole dataset pushed through the train-fitted pipeline; kept under its
# original name for ad-hoc inspection.
processed_data = processed_model.transform(review_data)

# Apply the same fitted preprocessing to both splits.
train = processed_model.transform(train_reviews)
test = processed_model.transform(test_reviews)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
def _confusion_cell_udf(actual, predicted):
    """Build a UDF that yields 1 when (label, prediction) equals (actual, predicted), else 0."""
    return udf(lambda ytrue, ypred: 1 if (ytrue == actual and ypred == predicted) else 0,
               IntegerType())


# One indicator UDF per cell of the binary confusion matrix.
truepos_udf = _confusion_cell_udf(1., 1.)
falsepos_udf = _confusion_cell_udf(0., 1.)
falseneg_udf = _confusion_cell_udf(1., 0.)
trueneg_udf = _confusion_cell_udf(0., 0.)


def get_model_scores(df):
    """Print and return accuracy, precision and recall for a scored DataFrame.

    Parameters
    ----------
    df : Spark DataFrame
        Model output containing a "sentiment" column (true label) and a
        "prediction" column (predicted label).

    Returns
    -------
    tuple
        ``(accuracy, precision, recall)``. A score whose denominator is zero
        (for example precision when nothing was predicted positive) is
        reported as 0.0 instead of raising ``ZeroDivisionError``.
    """
    # Build the confusion matrix from the frame passed in, so the output of
    # any model can be scored with this function.
    confusion_matrix = df.select(truepos_udf("sentiment", "prediction").alias("tp"),
                                 falsepos_udf("sentiment", "prediction").alias("fp"),
                                 falseneg_udf("sentiment", "prediction").alias("fn"),
                                 trueneg_udf("sentiment", "prediction").alias("tn")) \
                         .groupBy().sum().collect()

    # The sums come back as None when `df` has no rows; count those as zero.
    tp, fp, fn, tn = (cell or 0 for cell in confusion_matrix[0])
    total = tp + fp + fn + tn

    def ratio(numerator, denominator):
        # Guarded division: an empty denominator yields 0.0.
        return numerator / denominator if denominator else 0.0

    accuracy = ratio(tp + tn, total)
    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)

    print(f'Test sample size: {total}')
    print(f'Accuracy:  {accuracy:1.2%}')
    print(f'Precision: {precision:1.2%}')
    print(f'Recall:    {recall:1.2%}')

    return accuracy, precision, recall

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Training NaiveBayes and RandomForest

In [None]:
# Multinomial Naive Bayes on the TF-IDF features: fit on the training split,
# then add predictions to the test split.
nb_params = dict(featuresCol='features', labelCol='sentiment',
                 smoothing=1.0, modelType='multinomial')
nb = NaiveBayes(**nb_params)
nb_model = nb.fit(train)
nb_result = nb_model.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Print the Naive Bayes test metrics; the returned (accuracy, precision,
# recall) tuple is assigned to `_` and not used further.
_ = get_model_scores(nb_result)

In [24]:
# Compare star rating, true label and predicted label for 15 test reviews.
nb_preview = nb_result.select('stars', 'sentiment', 'prediction')
nb_preview.take(15)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(stars=5.0, sentiment=1, prediction=1.0), Row(stars=3.0, sentiment=0, prediction=0.0), Row(stars=1.0, sentiment=0, prediction=0.0), Row(stars=1.0, sentiment=0, prediction=0.0), Row(stars=5.0, sentiment=1, prediction=1.0), Row(stars=1.0, sentiment=0, prediction=0.0), Row(stars=3.0, sentiment=0, prediction=0.0), Row(stars=5.0, sentiment=1, prediction=0.0), Row(stars=5.0, sentiment=1, prediction=1.0), Row(stars=4.0, sentiment=1, prediction=1.0), Row(stars=5.0, sentiment=1, prediction=1.0), Row(stars=1.0, sentiment=0, prediction=0.0), Row(stars=1.0, sentiment=0, prediction=0.0), Row(stars=1.0, sentiment=0, prediction=0.0), Row(stars=1.0, sentiment=0, prediction=0.0)]

In [None]:
# Random forest with two trees on the same features: fit on the training
# split, then add predictions to the test split.
rfc_params = dict(featuresCol='features', labelCol='sentiment',
                  numTrees=2, featureSubsetStrategy='sqrt')
rfc = RandomForestClassifier(**rfc_params)
rfc_model = rfc.fit(train)
rfc_result = rfc_model.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Print test metrics for the random-forest predictions; the returned tuple is
# assigned to `_` and not used further.
# NOTE(review): verify that get_model_scores reads its `df` argument rather
# than a notebook-global frame, otherwise these are not the forest's scores.
_ = get_model_scores(rfc_result)

## Results

NaiveBayes result: 
* Accuracy: 84.8%
* Precision: 88.4%
* Recall: 88.5%
* Relatively short train time

RandomForest result: 
* Accuracy: 87.7%
* Precision: 87.7%
* Recall: 94.6%
* Much longer train time