<a href="https://colab.research.google.com/github/SDCoulter/big_data_nlp/blob/main/08_PySpark_ML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
# Latest version of Spark.
spark_version = 'spark-3.1.2/'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java.
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

# Set environment variables.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-3.1.2-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.38)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.38)] [Co                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [3 InRelease 18.5 kB/88.7 kB 21%] [Connecting to security.ubuntu.com (91.1890% [2 InRelease gpgv 242 kB] [3 InRelease 18.5 kB/88.7 kB 21%] [Connecting to s                                                                               Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [2 InRelease gpgv 242 kB] [3 InRelease 35.9 kB/88.7 kB 40%] [Connecting to s0% [2 InRelease gpgv 242 kB] [Waiting for hea

In [2]:
# Start Spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Yelp_NLP").getOrCreate()

In [3]:
# Read in from S3 Buckets.
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# And show our DF.
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [4]:
# Import functions to create our pipeline.
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [5]:
# Create a new column with the length of each row.
from pyspark.sql.functions import length
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [6]:
# Create all the features to the dataset.
# For future ML work.
pos_neg_to_num = StringIndexer(inputCol='class', outputCol='label')
# Pipeline transformations.
tokenizer = Tokenizer(inputCol='text', outputCol='words')
remover = StopWordsRemover(inputCol='words', outputCol='filtered')
hashing = HashingTF(inputCol='filtered', outputCol='hashed_values')
idf = IDF(inputCol='hashed_values', outputCol='idf_values')

In [13]:
# Create a feature vecor containing the output of the IDFModel, and length.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
clean_up = VectorAssembler(inputCols=['idf_values', 'length'], outputCol='features')

In [14]:
# Create and run a data processing Pipeline.
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, remover, hashing, idf, clean_up])

In [15]:
# Fit and transform the Pipeline.
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [16]:
# Show lable and resulting features.
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[177414,2...|
|  0.0|(262145,[49815,23...|
|  0.0|(262145,[109649,1...|
|  1.0|(262145,[53101,68...|
|  1.0|(262145,[15370,77...|
|  0.0|(262145,[98142,13...|
|  0.0|(262145,[59172,22...|
|  0.0|(262145,[63420,85...|
|  1.0|(262145,[53777,17...|
|  1.0|(262145,[221827,2...|
|  1.0|(262145,[43756,22...|
|  0.0|(262145,[127310,1...|
|  0.0|(262145,[407,3153...|
|  1.0|(262145,[18098,93...|
|  0.0|(262145,[23071,12...|
|  0.0|(262145,[129941,1...|
|  1.0|(262145,[19633,21...|
|  0.0|(262145,[27707,65...|
|  0.0|(262145,[20891,27...|
|  0.0|(262145,[8287,208...|
+-----+--------------------+
only showing top 20 rows



In [17]:
# Break data down into a training set and testing set.
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [18]:
# Import Naive Bayes model and fit training data.
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()
predictor = nb.fit(training)

In [20]:
# Transform the model with the testing data.
test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|               words|            filtered|       hashed_values|          idf_values|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [22]:
test_results.select(['features', 'rawPrediction', 'probability', 'prediction']).show(5)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910...|       1.0|
|(262145,[65645,71...|[-620.40646705112...|[1.0,1.9205984091...|       0.0|
|(262145,[61899,66...|[-528.59562125515...|[0.99999999994873...|       0.0|
|(262145,[132270,1...|[-334.09599709326...|[0.99999999994185...|       0.0|
+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [24]:
# Import Binary Classifier to check accuracy of the model.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Choose required columns.
acc_eval = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
# Evaluate the test_results DataFrame and print out the result.
acc = acc_eval.evaluate(test_results)
print("Accuracy of the model to predict reviews is %f" % acc)

Accuracy of the model to predict reviews is 0.700298
