<a href="https://colab.research.google.com/github/emariecovey/Amazon_Vine_Analysis/blob/main/NLP_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
### SPARK INSTALLATION - run at beginning of all colab notebooks (along with a new spark session in next block)

import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()


0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [10% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [W0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (185.125.190.36                                                                               Get:4 https://developer.download.nvidia.com/compute/cuda/repos/u

In [None]:
# Load the Yelp reviews into Spark and import the text-feature transformers.
# (Later cells chain these transformers into a Pipeline, where each stage
# consumes the output of the stage before it.)
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, StringIndexer

# Create the Spark session, or reuse it if one is already running
spark = (
    SparkSession.builder
    .appName("Yelp_NLP")
    .getOrCreate()
)

# Register the remote CSV with Spark so SparkFiles can resolve a local copy of it
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-online/module_16/yelp_reviews.csv"
spark.sparkContext.addFile(url)

# The first row of the file holds the column names
local_csv = SparkFiles.get(url.rsplit("/", 1)[-1])
df = spark.read.csv(local_csv, header=True, sep=",")

df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import length

# Character count of each review's text, kept as an extra numeric feature for the model
review_length = length(df["text"])
data_df = df.withColumn("length", review_length)

data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [None]:
# Define the text-processing stages; they are chained into a Pipeline in the next cell.

# Map the string 'class' column to a numeric 'label'. 'alphabetAsc' pins the mapping to
# alphabetical order (presumably 'negative' -> 0.0, 'positive' -> 1.0 for this dataset).
# The default ('frequencyDesc') orders by class counts, so the mapping could flip
# depending on which rows the indexer is fit on.
pos_neg_to_num = StringIndexer(inputCol='class', outputCol='label', stringOrderType='alphabetAsc')
# Lower-case each review and split it on whitespace into a list of tokens
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
# Remove common (default English) stop words from the token lists
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
# Hash the remaining tokens into a sparse term-frequency vector
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
# Re-weight the term frequencies by inverse document frequency
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [None]:
# Combine the IDF vector (output of the last text stage) with the review length,
# then train a Naive Bayes classifier and apply it to held-out reviews.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes

# Build the single 'features' vector the classifier consumes
clean_up = VectorAssembler(inputCols=["idf_token", "length"], outputCol="features")

# A Pipeline runs its stages in order, each consuming the previous stage's output
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

# Split the RAW data first (70% training / 30% testing; 21 is a fixed seed so the split
# is reproducible), then fit the preparation pipeline on the training split only.
# Fitting on the full dataset would let IDF document frequencies (and label indexing)
# learned from the test rows leak into the training features and inflate the evaluation.
training_raw, testing_raw = data_df.randomSplit([0.7, 0.3], seed=21)
cleaner = data_prep_pipeline.fit(training_raw)
training = cleaner.transform(training_raw)
testing = cleaner.transform(testing_raw)
# Whole dataset transformed with the training-fitted pipeline (kept for any later use)
cleaned = cleaner.transform(data_df)

# Naive Bayes is a family of classifiers based on Bayes' theorem, which gives the
# probability of an event given new conditions or information related to it.
nb = NaiveBayes()
predictor = nb.fit(training)

# Apply the trained model to the held-out test set
test_results = predictor.transform(testing)
test_results.show(5)

# The 'prediction' column uses the same 0.0/1.0 encoding as 'label', which is set by
# pos_neg_to_num (StringIndexer); inspect cleaner.stages[0].labelsArray to see which class is which.
# New reviews can be scored by passing them through cleaner.transform() and then predictor.transform().


+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [None]:
# Evaluate the classifier on the held-out test set.
# BinaryClassificationEvaluator reports area under the ROC curve (its default metric),
# not accuracy, so accuracy is computed with MulticlassClassificationEvaluator instead.
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Fraction of test reviews whose predicted label matches the true label
acc_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was %f" % acc)

# Area under ROC, ranking reviews by the predicted class-1 probability rather than the
# hard 0/1 prediction (a 0/1 score collapses the ROC curve to a single operating point)
auc_eval = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
auc = auc_eval.evaluate(test_results)
print("Area under ROC curve was %f" % auc)

Accuracy of model at predicting reviews was 0.700298
