In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.0'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.91.39)] [1 InRelease 8,192 B/88.7                                                                                Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [1 InRelease 46.0 kB/88.7 kB 52%] [Waiting for headers                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [1 InRelease 46.0 kB/88.7 kB 52%] [Waiting for headers0% [2 InRelease gpgv 3,626 B] [Waiting for headers] [1 InRelease 48.9 kB/88.7 k                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [2 InRelease gpgv 3,626 B] [4 InRelease 14.2 kB/88.7 kB 16%] [1 InRelease 54                                               

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Team4FinalProject").getOrCreate()

## Data Pipeline

In [4]:
# Read in data
df = spark.read.csv(r"/content/feedback1.csv", header=True)

df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)



In [5]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [6]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------------------+--------------------+------+
|               class|                text|length|
+--------------------+--------------------+------+
|            Positive|This shirt is ver...|   192|
|            Positive|Took a chance on ...|   296|
|            Positive|If this product w...|   132|
|fits nicely! i'm 5'4| 130lb and pregna...|    54|
|the tie can be fr...|                null|  null|
|            Positive|I love this shirt...|   443|
|            Positive|Very comfortable,...|    65|
|            Positive|"This top is so c...|   382|
|            Negative|Why do designers ...|   500|
|            Positive|I have a short to...|   234|
|zipper goes almos...|                null|  null|
|unlike another re...| i found it went ...|   100|
|            Positive|I am so drawn to ...|   113|
|            Negative|The zipper broke ...|   217|
|            Positive|I usually size up...|   254|
|            Positive|I purchased this ...|   500|
|            Positive|I tried t

In [7]:
data_df = data_df.dropna()
data_df.show()

+--------------------+--------------------+------+
|               class|                text|length|
+--------------------+--------------------+------+
|            Positive|This shirt is ver...|   192|
|            Positive|Took a chance on ...|   296|
|            Positive|If this product w...|   132|
|fits nicely! i'm 5'4| 130lb and pregna...|    54|
|            Positive|I love this shirt...|   443|
|            Positive|Very comfortable,...|    65|
|            Positive|"This top is so c...|   382|
|            Negative|Why do designers ...|   500|
|            Positive|I have a short to...|   234|
|unlike another re...| i found it went ...|   100|
|            Positive|I am so drawn to ...|   113|
|            Negative|The zipper broke ...|   217|
|            Positive|I usually size up...|   254|
|            Positive|I purchased this ...|   500|
|            Positive|I tried this on f...|   173|
|te blue is a ligh...|      almost neutral|    15|
|            Negative|Really cu

In [8]:
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [9]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [10]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

## Machine Learning Model

In [11]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [12]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[20805,42...|
|  0.0|(262145,[1578,312...|
|  0.0|(262145,[10049,31...|
| 11.0|(262145,[25882,47...|
|  0.0|(262145,[23087,23...|
|  0.0|(262145,[80218,87...|
|  0.0|(262145,[6346,181...|
|  1.0|(262145,[12917,22...|
|  0.0|(262145,[6346,157...|
| 52.0|(262145,[78820,87...|
|  0.0|(262145,[80393,93...|
|  1.0|(262145,[38668,44...|
|  0.0|(262145,[11125,18...|
|  0.0|(262145,[787,2785...|
|  0.0|(262145,[12524,40...|
| 39.0|(262145,[128590,1...|
|  1.0|(262145,[4631,234...|
|  0.0|(262145,[18176,19...|
|  0.0|(262145,[3121,634...|
|  0.0|(262145,[8538,157...|
+-----+--------------------+
only showing top 20 rows



In [13]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [14]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data

nb = NaiveBayes()
predictor = nb.fit(training)

In [15]:
# Transform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Negative|"I bought two, on...|   282|  1.0|["i, bought, two,...|["i, bought, two,...|(262144,[7777,828...|(262144,[7777,828...|(262145,[7777,828...|[-1697.2299095550...|[1.0,5.2179376037...|       0.0|
|Negative|"I usually wear a...|   242|  1.0|["i, usually, wea...|["i, usually, wea...|(262144,[19096,25...|(262144,[19096,25...|(262145,[19096,25...|[-1334.7501391923...|[1.0,2.8853390455.

In [38]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

acc_eval = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.500000
