# Spark Project: Tweet Sentiment Analysis

# NOTE: This notebook was meant to work with Spark. I also had an ML component with NLP, but Kaggle could not handle it. The best course of action in this case would probably be to use a different platform, perhaps together with SystemML.

Coded by Luna McBride

The point of this project is to work with Apache Spark's features.

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import spacy # NLP
import re # regular expressions
import html # HTML content, like &amp;
from spacy.lang.en.stop_words import STOP_WORDS # stopwords

nlp = spacy.load('en_core_web_lg') #Load spacy, up here so I do not have to load it constantly

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/sentiment140/training.1600000.processed.noemoticon.csv


# Spark Handling

Source for spark handling in Kaggle: https://www.kaggle.com/tylerx/machine-learning-with-spark

In [2]:
!pip install -U sparkmagic #Install or update sparkmagic, for spark jupyter displays
!pip install -U pyspark #Install or update pyspark

Collecting sparkmagic
  Downloading sparkmagic-0.17.0.tar.gz (37 kB)
Collecting hdijupyterutils>=0.6
  Downloading hdijupyterutils-0.17.0.tar.gz (4.6 kB)
Collecting autovizwidget>=0.6
  Downloading autovizwidget-0.17.0.tar.gz (8.3 kB)
Collecting requests_kerberos>=0.8.0
  Downloading requests_kerberos-0.12.0-py2.py3-none-any.whl (14 kB)
Collecting pykerberos<2.0.0,>=1.1.8; sys_platform != "win32"
  Downloading pykerberos-1.2.1.tar.gz (24 kB)


Building wheels for collected packages: sparkmagic, hdijupyterutils, autovizwidget, pykerberos
  Building wheel for sparkmagic (setup.py) ... done
  Created wheel for sparkmagic: filename=sparkmagic-0.17.0-py3-none-any.whl size=60489 sha256=c9f27a6bda71ae002cfcba2b71c275fd7b2b9412eba4b3482ca94734da1db9c6
  Stored in directory: /root/.cache/pip/wheels/d8/b0/70/a7689cd13911f373341bc8c60f6fc195a6cc308b4118e00eb0
  Building wheel for hdijupyterutils (setup.py) ... done
  Created wheel for hdijupyterutils: filename=hdijupyterutils-0.17.0-py3-none-any.whl size=7696 sha256=527e32af7ac9a4de00c015e69ce4cbd78337c35d2296167f2ae6ea4e28e668c4
  Stored in directory: /root/.cache/pip/wheels/b6/30/53/dc0c8ec25a55ca1f0fc7a6251d36acfbb212d7c38562bd1ce5
  Building wheel for autovizwidget (setup.py) ... done
  Created wheel for autovizwidget: filename=autovizwidget-0.17.0-py3-none-any.whl size=14547 sha256=0daf34197a3112291866ce889decac87ab028d80cf9688838aaf0b33c81721e1

In [3]:
from pyspark.sql.types import * #Import spark types
from pyspark.sql.functions import * #Import spark functions

import pyspark
from pyspark.sql import SparkSession #Import the spark session
from pyspark import SparkContext #Import the spark context class
from pyspark.sql import SQLContext #Import the SQL context class

from pyspark.ml.feature import Tokenizer #Used to tokenize the tweet data
from pyspark.ml.feature import CountVectorizer #Used to make the data into vectors
from pyspark.ml import Pipeline #Build a pipeline
from pyspark.ml.classification import RandomForestClassifier #The chosen classifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator #Metrics

conf = pyspark.SparkConf().setAll([('spark.executor.memory', '16g'), ('spark.executor.cores', '1'), ('spark.cores.max', '1'), ('spark.driver.memory','16g')])
sc = SparkContext.getOrCreate(conf = conf) #Initialize the spark context
sqlContext = SQLContext.getOrCreate(sc) #Create an SQL Context
spark = SparkSession.builder.master("local[*]").getOrCreate() #Make a spark session

---

# Load Data

In [4]:
tweets = spark.read.csv("../input/sentiment140/training.1600000.processed.noemoticon.csv", inferSchema = True, header = False) #Read in the data
tweets.show(10) #Show the first 10 rows

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  0|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

In [5]:
labels = ["target", "id", "date", "flag", "user", "tweet", "cleanTweet"] #Column names

#Fix column names, as this dataset did not have a header
tweets = tweets.select(col("_c0").alias(labels[0]), col("_c1").alias(labels[1]), col("_c2").alias(labels[2]),
                      col("_c3").alias(labels[3]), col("_c4").alias(labels[4]), col("_c5").alias(labels[5]))
tweets.show(10) #Show the dataset

+------+----------+--------------------+--------+---------------+--------------------+
|target|        id|                date|    flag|           user|               tweet|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

---

# Check for Null Values

Source: https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe

In [6]:
#For each column, count cases where the column is NaN or Null
tweets.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in tweets.columns]).show() #Check for null values

+------+---+----+----+----+-----+
|target| id|date|flag|user|tweet|
+------+---+----+----+----+-----+
|     0|  0|   0|   0|   0|    0|
+------+---+----+----+----+-----+



There are no null values.
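
The comprehension in cell 6 builds one counter per column: `when(...)` produces a value only for rows that are NaN or null, and `count` skips the nulls that `when` leaves for every other row. A plain-Python sketch of the same counting logic follows. The `rows` data and the `is_missing` and `count_missing` helpers are hypothetical and not part of the notebook.

```python
import math

def is_missing(value):
    """Return True for None or a float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def count_missing(rows, columns):
    """Count missing values per column across a list of dict rows."""
    return {c: sum(1 for row in rows if is_missing(row[c])) for c in columns}

# Made-up rows for illustration only, not taken from the dataset
rows = [
    {"target": 0, "tweet": "Need a hug"},
    {"target": 4, "tweet": None},
    {"target": float("nan"), "tweet": "hello"},
]

missing = count_missing(rows, ["target", "tweet"])
print(missing)
```

A result of all zeros, as in the Spark output above, means no column needs imputation or row removal at this stage.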

---

# Sentiment Counts

In [7]:
tweets.groupBy("target").count().orderBy("count").show() #Check how many rows there are for each target value

+------+------+
|target| count|
+------+------+
|     0|800000|
|     4|800000|
+------+------+



There are an equal number of positive (4) and negative (0) tweets in the dataset. There are no neutral values.

---

# Fix the Sentiments

In [8]:
#Change tweet sentiment to 0 and 1 instead of 0 and 4
tweets = tweets.withColumn("target", \
              when(tweets["target"] == 4, 1).otherwise(tweets["target"]))

In [9]:
tweets.groupBy("target").count().orderBy("count").show() #Check how many rows there are for each target value

+------+------+
|target| count|
+------+------+
|     1|800000|
|     0|800000|
+------+------+
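
The `when(...).otherwise(...)` expression in cell 8 is a per-row conditional: rows labelled 4 become 1 and every other row keeps its value. The same rule in plain Python looks roughly like this. The `remap_target` helper and the `raw_targets` sample are hypothetical.

```python
from collections import Counter

def remap_target(target):
    """Map the positive label 4 to 1; leave any other label unchanged."""
    return 1 if target == 4 else target

# Made-up sample of labels for illustration only
raw_targets = [0, 4, 4, 0]

remapped = [remap_target(t) for t in raw_targets]
label_counts = Counter(remapped)
print(remapped, dict(label_counts))
```

Re-counting the labels after the remap, as cell 9 does, is a cheap way to confirm that no rows were lost or mislabelled.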



---

# Clean the Tweets

Reusing my clean tweets code from my Coronavirus tweet analysis: https://www.kaggle.com/lunamcbride24/coronavirus-tweet-processing

Source on adding the new column: https://stackoverflow.com/questions/48164206/pyspark-adding-a-column-from-a-list-of-values-using-a-udf

In [10]:
punctuations = """!()-[]{};:+'"\\,<>./?@#$%^&*_~Â""" #Punctuation characters to remove, including a weird A that will not process out any other way

#CleanTweets: parses the tweets and removes punctuation, stop words, digits, and links. The list is modified in place.
#Input: the list of tweets that need parsing
#Output: the parsed tweets (the same list object)
def cleanTweets(tweetParse):
    length = len(tweetParse)
    for i in range(0,length):
        tweet = tweetParse[i] #Putting the tweet into a variable so that it is not calling tweetParse[i] over and over
        tweet = html.unescape(tweet) #Removes leftover HTML elements, such as &amp;
        tweet = re.sub(r"@\w+", ' ', tweet) #Completely removes @'s, as other peoples' usernames mean nothing
        tweet = re.sub(r'http\S+', ' ', tweet) #Removes links, as links provide no data in tweet analysis in themselves
        tweet = re.sub(r"\d+\S+", ' ', tweet) #Removes a digit plus everything attached to it, such as "14" or "14th" (a lone single digit is left alone)
        tweet = ''.join([punc for punc in tweet if not punc in punctuations]) #Removes the punctuation defined above
        tweet = tweet.lower() #Turning the tweets lowercase real quick for later use
    
        tweetWord = tweet.split() #Splits the tweet into individual words
        tweetParse[i] = ''.join([word + " " for word in tweetWord if not nlp.vocab[word].is_stop]) #Drops stop words and rejoins the rest, each followed by a space
        
    return tweetParse #Returns the parsed tweets
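
To see the cleaning steps on one tweet without loading spaCy, here is a minimal standard-library sketch. It is not the notebook's function: it works on a single string, swaps in `string.punctuation` for the hand-written punctuation list, and uses a tiny hypothetical `STOP_WORDS` set in place of spaCy's vocabulary. The example tweet is made up.

```python
import html
import re
import string

# Assumption: string.punctuation stands in for the notebook's punctuation list
PUNCTUATION = set(string.punctuation)
# Assumption: a tiny hypothetical stop word list, not spaCy's
STOP_WORDS = {"a", "the", "is", "to", "that", "i", "my"}

def clean_tweet(tweet):
    """Clean one tweet: unescape HTML, drop mentions, links, numbers,
    punctuation and stop words, and lowercase the rest."""
    tweet = html.unescape(tweet)            # &amp; -> &
    tweet = re.sub(r"@\w+", " ", tweet)     # remove @mentions
    tweet = re.sub(r"http\S+", " ", tweet)  # remove links
    tweet = re.sub(r"\d+\S+", " ", tweet)   # remove numbers such as "14th"
    tweet = "".join(ch for ch in tweet if ch not in PUNCTUATION)
    words = tweet.lower().split()
    return " ".join(w for w in words if w not in STOP_WORDS)

example = clean_tweet("@mybirch I need a hug &amp; the 14th cookie http://t.co/abc !")
print(example)
```

A per-string function like this could in principle be wrapped with `pyspark.sql.functions.udf` and applied as a column expression, which would avoid collecting all tweets to the driver first. That variant is untested here.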

In [11]:
tweetText = tweets.select("tweet").collect() #Collect the tweet data
tweetText = [str(tweet.tweet) for tweet in tweetText] #Make the tweets strings
cleanTweet = cleanTweets(tweetText) #Clean the tweets

---

# Connect Clean Tweets with Sentiments

## Collect Sentiments

In [12]:
sentiment = tweets.select("target").collect() #Collect the sentiment data
sentiment = [int(tweet.target) for tweet in sentiment] #Make the sentiments into integers
print(sentiment[:10]) #Look at the list of sentiments

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


## Connect the Tweets and Sentiments

In [13]:
tweetSentiment = [] #A list to hold the combined tweet-sentiment pair
lengthClean = len(cleanTweet) #Get the length for looping

#For loop to combine clean tweet with its sentiment
for index in range(0, lengthClean):
    tweetSentiment.append((cleanTweet[index], sentiment[index])) #Add the combined pair to the tweetSentiment list

print(tweetSentiment[:10]) #Print a couple of the pairings

[('awww thats bummer shoulda got david carr day d ', 0), ('upset cant update facebook texting cry result school today blah ', 0), ('dived times ball managed save rest bounds ', 0), ('body feels itchy like fire ', 0), ('behaving im mad cant ', 0), ('crew ', 0), ('need hug ', 0), ('hey long time yes rains bit bit lol im fine thanks hows ', 0), ('nope didnt ', 0), ('que muera ', 0)]
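
The index loop in cell 13 can be written more compactly with `zip`, which pairs the two lists element by element. A small sketch using two of the cleaned tweets printed above; the variable names here are illustrative only.

```python
# Two cleaned tweets and their sentiments, copied from the output above
clean_tweets = ["need hug ", "nope didnt "]
sentiments = [0, 0]

# zip silently stops at the shorter list, so check the lengths first
if len(clean_tweets) != len(sentiments):
    raise ValueError("tweets and sentiments must have the same length")

pairs = list(zip(clean_tweets, sentiments))
print(pairs)
```

This relies on both lists being in the same row order, which is the same assumption the original loop makes about the two separate `collect()` calls.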


## Put into a PySpark Dataframe

In [14]:
tweetColumns = ["cleanTweet","trueTarget"] #Put the column names we want to use
sentDF = spark.createDataFrame(data = tweetSentiment, schema = tweetColumns) #Put the data into a new dataframe
sentDF.cache() #Cache the dataset
sentDF.show(10) #Take a peek at the new dataset

+--------------------+----------+
|          cleanTweet|trueTarget|
+--------------------+----------+
|awww thats bummer...|         0|
|upset cant update...|         0|
|dived times ball ...|         0|
|body feels itchy ...|         0|
|behaving im mad c...|         0|
|               crew |         0|
|           need hug |         0|
|hey long time yes...|         0|
|         nope didnt |         0|
|          que muera |         0|
+--------------------+----------+
only showing top 10 rows



---

# Fix cleanTweet Nulls

In [15]:
sentDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sentDF.columns]).show() #Check for null values

+----------+----------+
|cleanTweet|trueTarget|
+----------+----------+
|         1|         0|
+----------+----------+



In [16]:
#Take a peek at the counts of rows that are not flagged as NaN
print("Tweet: ", sentDF.filter(~isnan(sentDF.cleanTweet)).count(), "\nTarget: ", sentDF.filter(~isnan(sentDF.trueTarget)).count())

Tweet:  1599999 
Target:  1600000


In [17]:
sentDF = sentDF.where(~isnan(sentDF.cleanTweet)) #Remove the one row that isnan flags
sentDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in sentDF.columns]).show() #Check for null values

+----------+----------+
|cleanTweet|trueTarget|
+----------+----------+
|         0|         0|
+----------+----------+



---

# Train-Test Split the Data

In [18]:
Train_Test = sentDF.randomSplit([0.75, 0.25]) #Split the data 75-25
sentTrain = Train_Test[0] #Put the train data into its own variable
sentTest = Train_Test[1] #Put the test data into its own variable
print("Train: ", sentTrain.count(), "\nTest: ", sentTest.count()) #Print split numbers

Train:  1199942 
Test:  400057
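
The counts are close to, but not exactly, 1,200,000 and 400,000 because `randomSplit` assigns rows by weighted random sampling rather than cutting at a fixed index. It also accepts an optional `seed` argument, which cell 18 does not set, so the exact counts can differ between runs. The pure-Python sketch below imitates that behavior on made-up rows. The `random_split` helper is hypothetical and only approximates what Spark does internally.

```python
import random

def random_split(rows, train_weight, seed):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

# Made-up rows for illustration only
rows = list(range(1000))

train_a, test_a = random_split(rows, 0.75, seed=42)
train_b, test_b = random_split(rows, 0.75, seed=42)

# The sizes are only approximately 750 and 250, but the same seed repeats them
print(len(train_a), len(test_a))
print(train_a == train_b)
```

Fixing the seed makes the split repeatable, which matters when comparing models across runs.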


---

# Build Pipeline to Tokenize/Vectorize the Tweets

Source: https://classes.ischool.syr.edu/ist718/content/unit09/lab-sentiment_analysis/

In [19]:
tokenizer = Tokenizer().setInputCol("cleanTweet").setOutputCol("tokenTweet") #Build the tokenizer
vectorizer = CountVectorizer().setInputCol("tokenTweet").setOutputCol("features") #Build the vectors
forest = RandomForestClassifier(labelCol = "trueTarget", featuresCol="features", numTrees = 3, maxDepth = 16) #Build the forest classifier
mlPipe = Pipeline(stages = [tokenizer, vectorizer, forest]) #Build the pipeline to do all above steps
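
The first two pipeline stages turn each cleaned tweet into a vector of word counts. The sketch below shows roughly what that means in plain Python: lowercase and split on whitespace, build a vocabulary ordered by corpus frequency, then count each vocabulary term per document. It is a simplification, not Spark's implementation. The helper names, the alphabetical tie-breaking rule, and the third sample document are assumptions.

```python
from collections import Counter

def tokenize(text):
    """Lowercase and split on whitespace."""
    return text.lower().split()

def build_vocabulary(token_lists):
    """Map each term to an index, most frequent term first."""
    counts = Counter(token for tokens in token_lists for token in tokens)
    # Assumption: ties are broken alphabetically to keep the order deterministic
    ordered = sorted(counts, key=lambda term: (-counts[term], term))
    return {term: index for index, term in enumerate(ordered)}

def vectorize(tokens, vocabulary):
    """Count how often each vocabulary term appears in one document."""
    vector = [0] * len(vocabulary)
    for token in tokens:
        if token in vocabulary:
            vector[vocabulary[token]] += 1
    return vector

# First two documents come from the output above; the third is made up
docs = ["need hug ", "nope didnt ", "need need sleep "]

token_lists = [tokenize(d) for d in docs]
vocabulary = build_vocabulary(token_lists)
vectors = [vectorize(tokens, vocabulary) for tokens in token_lists]
print(vocabulary)
print(vectors)
```

With 1.6 million tweets the vocabulary, and therefore each feature vector, can become very large. Spark's `CountVectorizer` exposes `vocabSize` and `minDF` parameters to limit it, though whether that would avoid the failure in the next section is not something this notebook tests.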

---

# Build the Model

In [20]:
#model = mlPipe.fit(sentTrain) #Fit the data

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 44184)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.7/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.7/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.7/socketserver.py", line 720, in __init__
    self.handle()
  File "/opt/conda/lib/python3.7/site-packages/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/opt/conda/lib/python3.7/site-packages/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/opt/conda/lib/python3.7/site-packages/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(sel

Py4JError: An error occurred while calling o216.fit

In [None]:
#predictions = model.transform(sentTest) #Predict the data

In [None]:
#predictions.show(5) #Show the prediction table
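
The notebook imports `MulticlassClassificationEvaluator` for metrics but never reaches the evaluation step, because the fit fails. As a reference for what an accuracy metric would measure once predictions exist, here is a minimal plain-Python sketch. The `accuracy` helper and both label lists are made up for illustration and are not results from this model.

```python
def accuracy(true_labels, predicted_labels):
    """Return the fraction of predictions that match the true labels."""
    if len(true_labels) != len(predicted_labels):
        raise ValueError("label lists must have the same length")
    if not true_labels:
        raise ValueError("label lists must not be empty")
    correct = sum(1 for t, p in zip(true_labels, predicted_labels) if t == p)
    return correct / len(true_labels)

# Made-up labels for illustration only, not model output
true_labels = [0, 0, 1, 1]
predicted_labels = [0, 1, 1, 1]

score = accuracy(true_labels, predicted_labels)
print(score)
```

Because the dataset is evenly balanced between the two classes, a model that always predicts one class would score about 0.5 on this metric, which is a useful baseline to compare against.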