In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Wait                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [2 In0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Hit:4 https://developer.download.nvidia.com/comp

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Hashing").getOrCreate()

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, StringIndexer

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://butlerdata-group2-project3.s3.us-east-2.amazonaws.com/Tweets.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Tweets.csv"), sep=",", header=True)

# Show DataFrame
df.show(60)

+--------------------+-----------------+----------------------------+--------------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            tweet_id|airline_sentiment|airline_sentiment_confidence|      negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|         tweet_coord|       tweet_created|      tweet_location|       user_timezone|
+--------------------+-----------------+----------------------------+--------------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  570306133677760513|          neutral|                         1.0| 

In [None]:
df = df.filter(df.airline != "null")

df.show(60)

+------------------+-----------------+----------------------------+--------------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|      negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|         tweet_coord|       tweet_created|      tweet_location|       user_timezone|
+------------------+-----------------+----------------------------+--------------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|570306133677760513|          neutral|                         1.0|         

In [None]:
from pyspark.sql.functions import length

data_df = df.withColumn('length', length(df['text']))
data_df.show()

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|      tweet_location|       user_timezone|length|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+------+
|570306133677760513|          neutral|                         1.0|          null|                  

In [None]:
# Tokenize DataFrame
tokened = Tokenizer(inputCol="text", outputCol="words")
tokened_transformed = tokened.transform(df)
tokened_transformed.show()

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|      tweet_location|       user_timezone|               words|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|570306133677760513|          neutral|                    

In [None]:
# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed_frame = remover.transform(tokened_transformed)
removed_frame.show(truncate=False)

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------------------+----------------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------+
|tweet_id          |airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|airline       |airline_sentiment_gold|name           |negativereason_gold|retweet_count|text                                                                    

In [None]:
# Remove stop words
stop_list = ["@united","@jetblue's","@","@VirginAmerica", "@virginamerica","@AmericanAir","@JetBlue","@jetblue","@DeltaAssist","@SouthwestAir","@united","@United","airlines","Airlines","@USAirways","@usairways"]
remover = StopWordsRemover(inputCol="filtered", outputCol="filtered_new", stopWords=stop_list)
removed_frame_new = remover.transform(removed_frame)
removed_frame_new.show(truncate=False)

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------------------+----------------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+
|tweet_id          |airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|airline       |airline_sentiment_gold|name 

In [None]:
# Run the hashing term frequency
hashing = HashingTF(inputCol="filtered", outputCol="hashedValues", numFeatures=pow(2,4))

# Transform into a DF
hashed_df = hashing.transform(removed_frame_new)
hashed_df.show()

+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|      tweet_location|       user_timezone|               words|            filtered|        filtered_new|        hashedValues|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+-----------------

In [None]:
# remove tweet_coord
hashed_clean_df = hashed_df.drop("tweet_coord", "negativereason", "negativereason_confidence","airline_sentiment_gold","negativereason_gold")
hashed_clean_df.count()

14658

In [None]:
hashed_clean_df = hashed_clean_df.filter(hashed_clean_df.text != "null")
hashed_clean_df.count()

14632

In [None]:
# Fit the IDF on the data set 
idf = IDF(inputCol="hashedValues", outputCol="features")
idfModel = idf.fit(hashed_clean_df)
rescaledData = idfModel.transform(hashed_clean_df)

In [None]:
# Display the DataFrame
rescaledData.show(truncate=False)

+------------------+-----------------+----------------------------+--------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.ml.feature import OneHotEncoder

In [None]:
pos_neg_to_num = StringIndexer(inputCol='airline_sentiment',outputCol='label')
removed_frame_new = pos_neg_to_num.fit(rescaledData)
removed_frame_new2 = removed_frame_new.transform(rescaledData)
removed_frame_new2.show()

+------------------+-----------------+----------------------------+--------------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|       airline|           name|retweet_count|                text|       tweet_created|      tweet_location|       user_timezone|               words|            filtered|        filtered_new|        hashedValues|            features|label|
+------------------+-----------------+----------------------------+--------------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|570306133677760513|          neutral|                         1.0|Virgin Am

In [None]:
onehot = OneHotEncoder(inputCol='label',outputCol='hotcode')
one_hot_test = onehot.fit(removed_frame_new2).transform(removed_frame_new2)
removed_frame_new3 = onehot.fit(removed_frame_new2)
removed_frame_new4 = removed_frame_new3.transform(removed_frame_new2)
# removed_frame_new4.show()
one_hot_test.show()

+------------------+-----------------+----------------------------+--------------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+-------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|       airline|           name|retweet_count|                text|       tweet_created|      tweet_location|       user_timezone|               words|            filtered|        filtered_new|        hashedValues|            features|label|      hotcode|
+------------------+-----------------+----------------------------+--------------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+-------------+
|570306133677760513|          neut

In [None]:
cleaned = one_hot_test

In [None]:
# Show label and resulting features
cleaned.select(['label', 'hotcode']).show()

+-----+-------------+
|label|      hotcode|
+-----+-------------+
|  1.0|(2,[1],[1.0])|
|  2.0|    (2,[],[])|
|  1.0|(2,[1],[1.0])|
|  0.0|(2,[0],[1.0])|
|  0.0|(2,[0],[1.0])|
|  0.0|(2,[0],[1.0])|
|  2.0|    (2,[],[])|
|  1.0|(2,[1],[1.0])|
|  2.0|    (2,[],[])|
|  2.0|    (2,[],[])|
|  1.0|(2,[1],[1.0])|
|  2.0|    (2,[],[])|
|  2.0|    (2,[],[])|
|  2.0|    (2,[],[])|
|  2.0|    (2,[],[])|
|  0.0|(2,[0],[1.0])|
|  2.0|    (2,[],[])|
|  0.0|(2,[0],[1.0])|
|  2.0|    (2,[],[])|
|  2.0|    (2,[],[])|
+-----+-------------+
only showing top 20 rows



In [None]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [None]:
predictor.save("sentiment_model.h5")

In [None]:
from tensorflow.keras.models import load_model
setiment_model_predictor = load_model("sentiment_model.h5")

OSError: ignored

# New Section

# New Section

In [None]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+------------------+-----------------+----------------------------+----------+---------------+-------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+-------------+--------------------+--------------------+----------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|   airline|           name|retweet_count|                text|       tweet_created|tweet_location|       user_timezone|               words|            filtered|        filtered_new|        hashedValues|            features|label|      hotcode|       rawPrediction|         probability|prediction|
+------------------+-----------------+----------------------------+----------+---------------+-------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+

In [None]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.483320


In [None]:
testing.show()

+------------------+-----------------+----------------------------+----------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+-------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|   airline|           name|retweet_count|                text|       tweet_created|      tweet_location|       user_timezone|               words|            filtered|        filtered_new|        hashedValues|            features|label|      hotcode|
+------------------+-----------------+----------------------------+----------+---------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+-------------+
|567617081336950784|         negative|        