In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('yelp').getOrCreate()

In [2]:
from pyspark import SparkFiles

spark.sparkContext.addFile("https://s3.amazonaws.com/zepl-trilogy-test/yelp_reviews.tsv")
df = spark.read.csv(SparkFiles.get("yelp_reviews.tsv"), sep="\t", header=True)
df.show(truncate=100)

print('Number of records: ', df.count())

+--------+----------------------------------------------------------------------------------------------------+
|   class|                                                                                                text|
+--------+----------------------------------------------------------------------------------------------------+
|positive|                                                                            Wow... Loved this place.|
|negative|                                                                                  Crust is not good.|
|negative|                                                           Not tasty and the texture was just nasty.|
|positive|             Stopped by during the late May bank holiday off Rick Steve recommendation and loved it.|
|positive|                                         The selection on the menu was great and so were the prices.|
|negative|                                                      Now I am getting angry and I want my dam

In [3]:
from pyspark.ml.feature import StringIndexer, Tokenizer, StopWordsRemover, HashingTF, IDF

# adjust class column
pos_neg_transformer = StringIndexer(inputCol="class", outputCol="label")

# adjust text column
token_transformer = Tokenizer(inputCol="text", outputCol="tokens")
stopwords_transformer = StopWordsRemover(inputCol="tokens", outputCol="stop_tokens")
hashing_transformer = HashingTF(inputCol="stop_tokens", outputCol="hash_tokens")
idf_transformer = IDF(inputCol="hash_tokens", outputCol="idf_tokens")

In [4]:
pos_neg_transformer.fit(df).transform(df).show()

+--------+--------------------+-----+
|   class|                text|label|
+--------+--------------------+-----+
|positive|Wow... Loved this...|  0.0|
|negative|  Crust is not good.|  1.0|
|negative|Not tasty and the...|  1.0|
|positive|Stopped by during...|  0.0|
|positive|The selection on ...|  0.0|
|negative|Now I am getting ...|  1.0|
|negative|Honeslty it didn'...|  1.0|
|negative|The potatoes were...|  1.0|
|positive|The fries were gr...|  0.0|
|positive|      A great touch.|  0.0|
|positive|Service was very ...|  0.0|
|negative|  Would not go back.|  1.0|
|negative|The cashier had n...|  1.0|
|positive|I tried the Cape ...|  0.0|
|negative|I was disgusted b...|  1.0|
|negative|I was shocked bec...|  1.0|
|positive| Highly recommended.|  0.0|
|negative|Waitress was a li...|  1.0|
|negative|This place is not...|  1.0|
|negative|did not like at all.|  1.0|
+--------+--------------------+-----+
only showing top 20 rows



In [5]:
from pyspark.sql.functions import length

# add length feature
df = df.withColumn('length', length(df['text']))
df.show(truncate=90)

+--------+------------------------------------------------------------------------------------------+------+
|   class|                                                                                      text|length|
+--------+------------------------------------------------------------------------------------------+------+
|positive|                                                                  Wow... Loved this place.|    24|
|negative|                                                                        Crust is not good.|    18|
|negative|                                                 Not tasty and the texture was just nasty.|    41|
|positive|   Stopped by during the late May bank holiday off Rick Steve recommendation and loved it.|    87|
|positive|                               The selection on the menu was great and so were the prices.|    59|
|negative|                                            Now I am getting angry and I want my damn pho.|    46|
|negative|         

In [6]:
from pyspark.ml.feature import VectorAssembler

# create feature vectors
features_transformer = VectorAssembler(inputCols=['idf_tokens', 'length'], outputCol='features')

In [7]:
from pyspark.ml import Pipeline

# create a data processing Pipeline
data_prep_pipeline = Pipeline(stages=[
    pos_neg_transformer, 
    token_transformer, 
    stopwords_transformer, 
    hashing_transformer, 
    idf_transformer, 
    features_transformer
])

In [8]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(df)
cleaned = cleaner.transform(df)
cleaned.show(truncate=10)

+--------+----------+------+-----+----------+-----------+-----------+----------+----------+
|   class|      text|length|label|    tokens|stop_tokens|hash_tokens|idf_tokens|  features|
+--------+----------+------+-----+----------+-----------+-----------+----------+----------+
|positive|Wow... ...|    24|  0.0|[wow......| [wow......| (262144...|(262144...|(262145...|
|negative|Crust i...|    18|  1.0|[crust,...| [crust,...| (262144...|(262144...|(262145...|
|negative|Not tas...|    41|  1.0|[not, t...| [tasty,...| (262144...|(262144...|(262145...|
|positive|Stopped...|    87|  0.0|[stoppe...| [stoppe...| (262144...|(262144...|(262145...|
|positive|The sel...|    59|  0.0|[the, s...| [select...| (262144...|(262144...|(262145...|
|negative|Now I a...|    46|  1.0|[now, i...| [gettin...| (262144...|(262144...|(262145...|
|negative|Honeslt...|    37|  1.0|[honesl...| [honesl...| (262144...|(262144...|(262145...|
|negative|The pot...|   111|  1.0|[the, p...| [potato...| (262144...|(262144...|

In [9]:
cleaned.select(['class', 'label', 'features']).show(truncate=90)

+--------+-----+------------------------------------------------------------------------------------------+
|   class|label|                                                                                  features|
+--------+-----+------------------------------------------------------------------------------------------+
|positive|  0.0|(262145,[33933,69654,123604,262144],[4.51085950651685,6.215607598755275,3.8642323415917...|
|negative|  1.0|               (262145,[150903,153353,262144],[3.7732605633860707,5.810142490647111,18.0])|
|negative|  1.0|(262145,[63367,115881,227406,262144],[5.52246041819533,6.215607598755275,4.962844630259...|
|positive|  0.0|(262145,[6286,27293,33933,53101,68727,76515,90362,140586,146390,188822,262144],[6.21560...|
|positive|  0.0|(262145,[6979,91184,138356,151571,262144],[5.52246041819533,4.829313237635384,2.9197707...|
|negative|  1.0|(262145,[24661,34140,98142,190256,255299,262144],[6.215607598755275,4.829313237635384,5...|
|negative|  1.0|(262145,[101

In [10]:
from pyspark.ml.classification import NaiveBayes

# split training and testing data
train, test = cleaned.randomSplit([0.75, 0.25])

# create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(train)

In [11]:
# tranform the model with the testing data
test_results = predictor.transform(test)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|              tokens|         stop_tokens|         hash_tokens|          idf_tokens|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"It was extremely...|    51|  1.0|["it, was, extrem...|["it, extremely, ...|(262144,[7388,163...|(262144,[7388,163...|(262145,[7388,163...|[-463.83432142333...|[0.00884727968197...|       1.0|
|negative|"The servers went...|    97|  1.0|["the, servers, w...|["the, servers, w...|(262144,[67991,76...|(262144,[67991,76...|(262145,[67991,76...|[-910.02214478209...|[6.95199911010009.

In [12]:
test_results.select('class', 'label', 'prediction').show()

+--------+-----+----------+
|   class|label|prediction|
+--------+-----+----------+
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       0.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       0.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
|negative|  1.0|       1.0|
+--------+-----+----------+
only showing top 20 rows



In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# use the ClassificationEvaluator to check accuracy
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)

print(f"Accuracy of model at predicting reviews was: {acc}")

Accuracy of model at predicting reviews was: 0.7202232901709829
