<a href="https://colab.research.google.com/github/EmadSoheili/PySpark-UofT-NLP/blob/main/Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
# Install PySpark

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [16]:
# Start Spark session

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Yelp_NLP").getOrCreate()

In [17]:
# Load data into a df

from pyspark import SparkFiles

url ="https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-online/v2/module_17/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [18]:
# Create a length column

from pyspark.sql.functions import length

data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [19]:
# Import stop words library

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [20]:
# Create all NLP features

pos_neg_to_num = StringIndexer(inputCol='class', outputCol='label')
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
hashingTF = HashingTF(inputCol='stop_tokens', outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [21]:
# Create feature vectors

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [22]:
# Create and run pipeline

from pyspark.ml import Pipeline

data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [23]:
# Fit and transform the pipeline

cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[177414,2...|
|  0.0|(262145,[49815,23...|
|  0.0|(262145,[109649,1...|
|  1.0|(262145,[53101,68...|
|  1.0|(262145,[15370,77...|
|  0.0|(262145,[98142,13...|
|  0.0|(262145,[59172,22...|
|  0.0|(262145,[63420,85...|
|  1.0|(262145,[53777,17...|
|  1.0|(262145,[221827,2...|
|  1.0|(262145,[43756,22...|
|  0.0|(262145,[127310,1...|
|  0.0|(262145,[407,3153...|
|  1.0|(262145,[18098,93...|
|  0.0|(262145,[23071,12...|
|  0.0|(262145,[129941,1...|
|  1.0|(262145,[19633,21...|
|  0.0|(262145,[27707,65...|
|  0.0|(262145,[20891,27...|
|  0.0|(262145,[8287,208...|
+-----+--------------------+
only showing top 20 rows



In [24]:
# Break data into training and testing sets

training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [25]:
# Create a Naive Bayes model and fit training data

from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
predictor = nb.fit(training)

In [26]:
# Transform the model with the testing data

test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [31]:
# Check the accurracy of the model

from pyspark.ml.evaluation import BinaryClassificationEvaluator

acc_eval = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
acc = acc_eval.evaluate(test_results)
print('Accurcay: %f' % acc)

Accurcay: 0.700298
