<a href="https://colab.research.google.com/github/RodGuarneros/BigData_yelp_Sentiment_Analysis/blob/main/naive_review.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
# Find the version of spark 3.0 form http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
#spark_version 3.0.2
spark_version = "spark-3.1.1"
os.environ["SPARK_VERSION"] = spark_version

# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [1 InRelease 0 B/3,0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connecting to ppa.                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connecting to ppa.0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Waiting for h                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Waiting for h                                                                               Get:4 https://developer.download.nvidia.com/comp

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [6]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show(truncate=False)

+--------+---------------------------------------------------------------------------------------------------------------+
|class   |text                                                                                                           |
+--------+---------------------------------------------------------------------------------------------------------------+
|positive|Wow... Loved this place.                                                                                       |
|negative|Crust is not good.                                                                                             |
|negative|Not tasty and the texture was just nasty.                                                                      |
|positive|Stopped by during the late May bank holiday off Rick Steve recommendation and loved it.                        |
|positive|The selection on the menu was great and so were the prices.                                                    |
|negative|Now I 

In [4]:
df.count()

1000

In [7]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



### Feature Transformations


In [10]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label') # zeros and ones
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [11]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [12]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [14]:
# Show label and resulting features
cleaned.select(['label', 'features', 'length']).show()

+-----+--------------------+------+
|label|            features|length|
+-----+--------------------+------+
|  1.0|(262145,[108541,1...|    24|
|  0.0|(262145,[49815,10...|    18|
|  0.0|(262145,[95889,97...|    41|
|  1.0|(262145,[9056,531...|    87|
|  1.0|(262145,[15370,67...|    59|
|  0.0|(262145,[19036,98...|    46|
|  0.0|(262145,[30950,48...|    37|
|  0.0|(262145,[15494,58...|   111|
|  1.0|(262145,[53777,95...|    25|
|  1.0|(262145,[107107,2...|    14|
|  1.0|(262145,[43756,99...|    24|
|  0.0|(262145,[127310,1...|    18|
|  0.0|(262145,[407,1903...|    99|
|  1.0|(262145,[18098,19...|    59|
|  0.0|(262145,[19036,23...|    62|
|  0.0|(262145,[19036,25...|    50|
|  1.0|(262145,[19633,21...|    19|
|  0.0|(262145,[27707,65...|    38|
|  0.0|(262145,[20891,27...|    51|
|  0.0|(262145,[8287,120...|    20|
+-----+--------------------+------+
only showing top 20 rows



In [15]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [17]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show()

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[19036,20...|(262144,[19036,20...|(262145,[19036,20...|[-874.20260546827...|[0.99999999999969...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.555070086084...|[0.64983564153590.

In [18]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.664437
