# Real-Time Sentiment Analysis Task using Spark for English comments in Twitter 

This notebook contains:

1- Data Analysis

2- Data Cleaning & NLP Processing

3- NLP Pipeline and ML Model Training & Testing, with about 77% accuracy on the test set

4- (Extra!) Pipeline Evaluation on Real-Life Conversations & Rotten Tomatoes Reviews

5- Real-Time Streaming Sentiment Analysis on real tweets tracked by keywords such as Egypt, USA, happy, sad, feeling and so on

6- Deployment (Bonus Part): a web app that takes a keyword and shows a table of the streamed tweets with their corresponding sentiment predictions


# 1- Import necessary packages 

In [None]:
import pyspark
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

from pyspark.sql import SparkSession 

import pandas as pd




import re 
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString,CountVectorizer 

from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline ,PipelineModel #Build a pipeline

from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp import DocumentAssembler


import os
import gc





Let's start the Spark session.

In [None]:
from pyspark.sql import SparkSession  # entry point for DataFrame operations
from pyspark import SparkContext  # imported for completeness; not used directly below
from pyspark.sql import SQLContext  # legacy SQL entry point; not used directly below

import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.executor.memory", "12g").config("spark.driver.memory", "12g")\
    .config("spark.memory.offHeap.enabled",True).config("spark.memory.offHeap.size","16g")\
    .config('spark.executor.cores', '3').config('spark.cores.max', '3')\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.2.3").getOrCreate()

# 2- Data Analysis and Preparation

Please note that the training file is not included in the submitted folder because of the submission size limit.

The dataset used: https://www.kaggle.com/kazanova/sentiment140?select=training.1600000.processed.noemoticon.csv

In [None]:
training_data = spark.read.csv(os.getcwd()+"/training_data.csv", inferSchema = True, header = False) #Read in the data
#training_data.show(10)


In [None]:
columns = ["target", "id", "date", "flag", "user", "tweet"]  

# Rename the six positional columns (_c0 ... _c5) in one call
training_data = training_data.toDF(*columns)
#training_data.show(10) 

The id, date, flag and user columns are not needed for our target analysis.

In [None]:
training_data = training_data.select('target' ,'tweet')
#training_data.show(10) 

Let's check the data for missing values.

In [None]:
training_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in training_data.columns]).show()


+------+-----+
|target|tweet|
+------+-----+
|     0|    0|
+------+-----+



We have no empty values, so no imputation is needed :)

Let's check the distribution of the target values.

In [None]:
training_data.groupBy("target").count().orderBy("count").show()

+------+------+
|target| count|
+------+------+
|     0|800000|
|     4|800000|
+------+------+



No neutral values!

Standardize the target values into 0s and 1s.

In [None]:
training_data = training_data.withColumn("target", when(training_data["target"] == 4, 1).otherwise(training_data["target"]))
training_data.groupBy("target").count().orderBy("count").show()

+------+------+
|target| count|
+------+------+
|     1|800000|
|     0|800000|
+------+------+



# The assumption here: there are no neutral values

# 1 => positive (Happy) & 0 => negative (Sad)

Let's learn more about the nature of our tweet data.

In [None]:
training_data.select("tweet").show(20,truncate= False)

+---------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                |
+---------------------------------------------------------------------------------------------------------------------+
|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D  |
|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!      |
|@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                            |
|my whole body feels itchy and like its on fire                                                                       |
|@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there.       |
|@Kwesidei not the whole crew           

# Tweets need more cleaning

Mentions, links, hashtag symbols and HTML entities have to be removed from our data.

In [None]:
training_data.count()

1600000

In [None]:

training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', r'http\S+', '')) 
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', r'@\w+', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '#', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', 'RT', ''))


training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&amp;', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&quot;', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&gt;', ''))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '&lt;', ''))


training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '-', ''))

training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '   ', ' '))
training_data = training_data.withColumn('tweet', F.regexp_replace('tweet', '  ', ' '))


training_data = training_data.filter((training_data.tweet!= ' ') &(training_data.tweet!= '')& (training_data.tweet!= '   '))
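The substitutions above can be tried on a single string before running them on 1.6 million rows. The following is a minimal sketch using Python's `re` module; the helper name `clean_tweet` is hypothetical, and it assumes that these simple patterns behave the same in Python's regex engine as in the Java engine used by `regexp_replace`.

```python
import re

# Same substitutions as the Spark cell above, applied in the same order.
CLEANING_STEPS = [
    (r"http\S+", ""),   # links
    (r"@\w+", ""),      # mentions
    (r"#", ""),         # hashtag symbol (the word itself is kept)
    (r"RT", ""),        # retweet marker
    (r"&amp;", ""),     # HTML entities
    (r"&quot;", ""),
    (r"&gt;", ""),
    (r"&lt;", ""),
    (r"-", ""),         # dashes
    (r"   ", " "),      # three spaces -> one
    (r"  ", " "),       # two spaces -> one
]

def clean_tweet(text):
    """Apply every cleaning step to one tweet (illustrative helper)."""
    for pattern, replacement in CLEANING_STEPS:
        text = re.sub(pattern, replacement, text)
    return text

sample = ("@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  "
          "You shoulda got David Carr of Third Day to do it. ;D")
print(repr(clean_tweet(sample)))
```

Note that the `RT` pattern is not anchored, so it also removes those two letters when they appear inside an upper-case word.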

In [None]:
training_data.count()

1597182

In [None]:
training_data.groupBy("target").count().orderBy("count").show()

+------+------+
|target| count|
+------+------+
|     0|798491|
|     1|798691|
+------+------+



Now we can randomly split our data into a train set (75%) and a test set (25%).

In [None]:
Train_Test_sets = training_data.randomSplit([0.75, 0.25])

In [None]:
train_set = Train_Test_sets[0] 
test_set = Train_Test_sets[1] 


In [None]:
training_data.select("tweet").show(15,truncate= False)

+--------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
| Awww, that's a bummer. You shoulda got David Carr of Third Day to do it. ;D                                  |
|is upset that he can't update his Facebook by texting it... and might cry as a result School today also. Blah!|
| I dived many times for the ball. Managed to save 50% The rest go out of bounds                               |
|my whole body feels itchy and like its on fire                                                                |
| no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there.                |
| not the whole crew                                                                            

The data is cleaner now.

# 3- Pipeline and Training SVM

The pipeline is based on Spark NLP annotators and consists of the nine stages described below.

# 3-a Turn tweets into documents (Document Assembler)

This is the basic first step for working with Spark NLP annotators.

In [None]:
document_assembler = DocumentAssembler() \
    .setInputCol("tweet") \
    .setOutputCol("document")


# 3-b Create sentences from documents (Sentence Detector)

In [None]:
dentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")


# 3-c Turn these sentences into tokens (Tokenizer)

In this stage, sentences are split into words.

In [None]:
tokenizer = Tokenizer() \
  .setInputCols(["sentence"]) \
  .setOutputCol("token")



# 3-d Remove stop words from tokens (Stop Words Cleaner)

Stop words such as I, you, me and so on are removed.

In [None]:
stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("token")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

# 3-e,f Remove punctuation and turn the annotated tokens into an array of tokens (Normalizer, Finisher)

In [None]:
normalizer = Normalizer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("normalized")\
    .setLowercase(True)

finisher = Finisher() \
    .setInputCols(["normalized"]) \
    .setOutputCols(["token_features"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)  # keep the intermediate annotation columns
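To make the effect of stages 3-c to 3-f concrete, here is a rough pure-Python approximation. It is only a sketch: the whitespace tokenizer, the three-word stop list and the `[^A-Za-z]` cleanup pattern are simplifying assumptions, not the exact behavior of the Spark NLP annotators.

```python
import re

# Hypothetical, tiny stop-word list for illustration only;
# the real annotator ships a much longer default list.
STOP_WORDS = {"i", "you", "me"}

def clean_tokens(text):
    """Tokenize, drop stop words, strip non-letters and lowercase."""
    tokens = text.split()  # crude whitespace tokenizer (assumption)
    # Case-insensitive stop-word removal, like setCaseSensitive(False)
    kept = [t for t in tokens if t.lower() not in STOP_WORDS]
    # Remove everything that is not a letter, then lowercase
    normalized = [re.sub(r"[^A-Za-z]", "", t).lower() for t in kept]
    # Tokens that were pure punctuation are now empty and are dropped
    return [t for t in normalized if t]

print(clean_tokens("I am SO happy today!!! :)"))
```

The resulting list of plain strings plays the role of the `token_features` column produced by the Finisher.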


# 3-g Hashing the tokens (hashingTF or Vectorization)

In [None]:
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures")  # term frequencies via feature hashing


In [None]:
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)  # inverse document frequency; terms in fewer than 5 documents get weight 0
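The two stages above can be illustrated with a small pure-Python sketch. It uses the smoothed formula documented for Spark's `IDF`, log((m + 1) / (df + 1)) with m the number of documents, but the hash function is a stand-in: `zlib.crc32` is an assumption made for illustration, while Spark uses MurmurHash3. The function names are hypothetical.

```python
import math
import zlib
from collections import Counter

NUM_BUCKETS = 1 << 18  # 262144, the default numFeatures of Spark's HashingTF

def bucket(token):
    # Stand-in hash for illustration; Spark uses MurmurHash3 instead.
    return zlib.crc32(token.encode("utf-8")) % NUM_BUCKETS

def hashing_tf(tokens):
    """Count how often each hash bucket occurs in one document."""
    return Counter(bucket(t) for t in tokens)

def fit_idf(tf_vectors, min_doc_freq=0):
    """Compute log((m + 1) / (df + 1)) per bucket; rare buckets get 0."""
    n_docs = len(tf_vectors)
    doc_freq = Counter()
    for tf in tf_vectors:
        doc_freq.update(tf.keys())  # each document counts once per bucket
    return {
        b: math.log((n_docs + 1) / (df + 1)) if df >= min_doc_freq else 0.0
        for b, df in doc_freq.items()
    }

def tf_idf(tf, idf):
    """Scale each term frequency by its inverse document frequency."""
    return {b: count * idf.get(b, 0.0) for b, count in tf.items()}

docs = [["happy", "day"], ["sad", "day"], ["happy", "happy", "day"]]
tf_vectors = [hashing_tf(d) for d in docs]
idf_weights = fit_idf(tf_vectors)
print(tf_idf(tf_vectors[2], idf_weights))
```

A token that appears in every document, such as `day` here, gets an IDF of zero and therefore contributes nothing to the feature vector.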

# 3-h Classification based on the hashed tokens using a Support Vector Machine

Our problem is a binary classification one. Many ML classifiers could be used, such as SVM, XGBoost, decision trees or random forests,

and a deep neural network might reach a higher accuracy. However, due to RAM and time constraints,

I chose an ML approach: a linear SVM.



In [None]:
SVC = LinearSVC(labelCol = "target", featuresCol="features",maxIter=13, regParam=0.2)
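For intuition, a linear SVM scores a feature vector with a weighted sum and is trained by minimizing the hinge loss. The sketch below shows both pieces in plain Python; the weights, bias and feature values are made-up numbers for illustration, not values learned by the model above.

```python
def decision_value(weights, bias, features):
    """Raw score w . x + b of a linear classifier."""
    return sum(w * x for w, x in zip(weights, features)) + bias

def predict_label(weights, bias, features, threshold=0.0):
    """Predict 1 when the raw score exceeds the threshold, else 0."""
    return 1 if decision_value(weights, bias, features) > threshold else 0

def hinge_loss(weights, bias, features, label):
    """Hinge loss max(0, 1 - y * score), with labels 0/1 mapped to -1/+1."""
    signed_label = 1.0 if label == 1 else -1.0
    return max(0.0, 1.0 - signed_label * decision_value(weights, bias, features))

# Made-up example values
weights, bias = [0.5, -1.0], 0.1
features = [2.0, 0.5]
print(predict_label(weights, bias, features))
print(hinge_loss(weights, bias, features, label=1))
print(hinge_loss(weights, bias, features, label=0))
```

A correctly classified example still incurs some loss while its score is inside the margin, which is what pushes the separating hyperplane away from the training points.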

Let's assemble the pipeline.

In [None]:
nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            dentence_detector,
            tokenizer,
            stopwords_cleaner,
            normalizer,
            finisher,
            hashingTF,
            idf,
            SVC])

In [None]:
pipeline_model = nlp_pipeline.fit(train_set)
print("Training finally Done !!!!")

Training finally Done !!!!


# 4- Evaluation on training and testing tweets sets

In [None]:
def evaluate(input_set):
    results=pipeline_model.transform(input_set)
    evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(results)
    print("Accuracy = %g" % (accuracy))
    print("Error = %g " % (1.0 - accuracy))
    return accuracy
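The `accuracy` metric used here is simply the fraction of rows whose prediction equals the label. A minimal plain-Python sketch of the same computation, with made-up labels and a hypothetical helper name:

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match their labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("cannot compute accuracy of an empty set")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Made-up example: three of the four predictions are right
labels = [1, 0, 1, 1]
predictions = [1, 0, 0, 1]
acc = accuracy(labels, predictions)
print("Accuracy = %g" % acc)
print("Error = %g" % (1.0 - acc))
```

Because both classes are almost perfectly balanced in this dataset, accuracy is a reasonable summary metric here.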

# THE ACCURACY

In [None]:
evaluate(train_set)

Accuracy = 0.792393
Error = 0.207607 


0.7923931156785957

In [None]:
evaluate(test_set)

Accuracy = 0.771817
Error = 0.228183 


0.7718168384056234

In [None]:
pipeline_model.save("/pipeline")

After many trials and experiments, I got good results: about 79.2% and 77.2% accuracy on the train and test sets, respectively.

I consider this an achievement, as the state-of-the-art pipeline achieves only about 80%.



# **** TO RUN MY PIPELINE ****

Kindly import the necessary packages and start a session as explained above, then resume execution from here instead of training the model yourself.

Please make sure that you point to the correct pipeline path.

In [None]:
pipeline_model=PipelineModel.load("/pipeline")

def predict(line):  # predict the sentiment of one tweet or line and output Happy or Sad
    sample_df = spark.createDataFrame([[str(line)]]).toDF('tweet')
    #-- preprocessing---
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', r'http\S+', '')) 
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', r'@\w+', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '#', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', 'RT', ''))


    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&amp;', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&quot;', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&gt;', ''))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '&lt;', ''))


    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '-', ''))

    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '   ', ' '))
    sample_df = sample_df.withColumn('tweet', F.regexp_replace('tweet', '  ', ' '))

    
    #---
    
    result = pipeline_model.transform(sample_df)
    sentiment = result.select('prediction').first()[0]
    if sentiment == 1:
        sentiment = "Happy"
        print(str(line) + " =====> " + "HAPPY")
    else:
        sentiment = "Sad"
        print(str(line) + " =====> " + "SAD")

    return line , sentiment

# On Real-Life Conversations

In [None]:
predict("Iam really happy right now.") # =>1
predict("Easy Task! ")# =>1
predict("I will be sad if not accepted") #=>0
predict("I am alone")# =>0
predict("My day was full of good events but at the end , a car hit me and broke my leg")# =>0
predict("Death.") #=>0
predict("I failed in my last exam") #=>0
predict("my dad bought me a new car") #=>1
predict("the new car my dad bought me was crashed :(") #=>0
predict("I am nervous") #=>0
predict("I helped many people today") #=>1


Iam really happy right now. =====> HAPPY
Easy Task!  =====> HAPPY
I will be sad if not accepted =====> HAPPY
I am alone =====> HAPPY
My day was full of good events but at the end , a car hit me and broke my leg =====> HAPPY
Death. =====> HAPPY
I failed in my last exam =====> HAPPY
my dad bought me a new car =====> HAPPY
the new car my dad bought me was crashed :( =====> HAPPY
I am nervous =====> HAPPY
I helped many people today =====> HAPPY


('I helped many people today', 'Happy')

--

All of them were predicted correctly!

dataset: https://www.kaggle.com/c/sentiment-analysis-on-movie-reviews/data


Only the training set of this dataset is used.

In [None]:
rotten_set = spark.read.csv(os.getcwd()+"/reviews.tsv", sep=r'\t', header=True)

In [None]:
rotten_set.show(10,truncate= True)

+--------------------+------+
|               tweet|target|
+--------------------+------+
|A series of escap...|   0.0|
|  good for the goose|   1.0|
|                good|   1.0|
|the gander , some...|   0.0|
|              amuses|   1.0|
|but none of which...|   0.0|
|none of which amo...|   0.0|
|This quiet , intr...|   1.0|
|This quiet , intr...|   1.0|
|quiet , introspec...|   1.0|
+--------------------+------+
only showing top 10 rows



In [None]:
rotten_set = rotten_set.select('Phrase' ,'Sentiment')

In [None]:
rotten_set.groupBy("Sentiment").count().orderBy("count").show()

+---------+-----+
|Sentiment|count|
+---------+-----+
|        0| 7072|
|        4| 9206|
|        1|27273|
|        3|32927|
|        2|79582|
+---------+-----+



In [None]:
rotten_set = rotten_set.withColumn("Sentiment", when(rotten_set["Sentiment"] ==1, 0).otherwise(rotten_set["Sentiment"]))
rotten_set = rotten_set.withColumn("Sentiment", when(rotten_set["Sentiment"] ==3, 4).otherwise(rotten_set["Sentiment"]))
rotten_set = rotten_set.withColumn("Sentiment", when(rotten_set["Sentiment"] ==4, 1).otherwise(rotten_set["Sentiment"]))
rotten_set = rotten_set.filter((rotten_set.Sentiment!= 2))
rotten_set.groupBy("Sentiment").count().orderBy("count").show()




+---------+-----+
|Sentiment|count|
+---------+-----+
|        0|34345|
|        1|42133|
+---------+-----+
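The relabeling above collapses the five-point scale into two classes and drops the neutral class. The sketch below reproduces that mapping in plain Python and checks it against the class counts printed earlier; the helper name `to_binary_sentiment` is hypothetical.

```python
def to_binary_sentiment(label):
    """Map the 0-4 scale to 0 (negative) or 1 (positive); None for neutral."""
    if label in (0, 1):
        return 0
    if label in (3, 4):
        return 1
    return None  # label 2 (neutral) is filtered out

# Class counts reported above for the five original labels
original_counts = {0: 7072, 1: 27273, 2: 79582, 3: 32927, 4: 9206}

binary_counts = {}
for label, count in original_counts.items():
    mapped = to_binary_sentiment(label)
    if mapped is not None:
        binary_counts[mapped] = binary_counts.get(mapped, 0) + count

print(binary_counts)  # {0: 34345, 1: 42133}
```

The totals match the table above, which confirms that the three chained `when` expressions do not leak rows between the classes.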



In [None]:
from pyspark.sql.types import DoubleType
rotten_set = rotten_set.select(col("Phrase").alias("tweet"), col("Sentiment").alias("target"))
rotten_set = rotten_set.withColumn("target", rotten_set.target.cast(DoubleType()))


In [None]:
evaluate(rotten_set)

Accuracy = 0.674874
Error = 0.325126 


0.6748738199220691

This is not a bad accuracy. The drop is due to the difference in nature between the pipeline's training data (tweets) and this data (movie review phrases).

# The Real-Time Streaming Sentiment Analysis

Now, real tweets are streamed and passed to the pipeline for prediction.

The tweets are tracked by keywords such as Egypt, USA, happy, sad, feeling and so on.

Every tweet is predicted individually and printed in this form: tweet ====> sentiment prediction
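Each message arrives from the stream as a raw JSON string, and not every message is guaranteed to carry a tweet text. The following is a minimal sketch of a defensive parsing step; `extract_text` is a hypothetical helper, not part of tweepy, and it assumes that a usable tweet has a non-empty string under the `text` key.

```python
import json

def extract_text(raw):
    """Return the tweet text from a raw JSON message, or None if absent."""
    try:
        msg = json.loads(raw)
    except (TypeError, ValueError):  # not a string, or not valid JSON
        return None
    if not isinstance(msg, dict):
        return None
    text = msg.get("text")
    if isinstance(text, str) and text.strip():
        return text
    return None

print(extract_text('{"text": "so happy today"}'))
print(extract_text('{"limit": {"track": 5}}'))
```

A listener could call such a helper first and only run the prediction when the result is not `None`, instead of swallowing every exception.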

# Keywords = {happy,sad,feeling,sentiment}

In [None]:
import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
import socket
import json

class TweetsListener(StreamListener):
    
    def on_data(self, data):
        msg = json.loads(data)
        try:
            predict(msg['text'])
        except Exception:  # skip messages without a 'text' field or failed predictions
            pass



    def on_error(self, status):
        print(status)
        return True

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
twitter_stream = Stream(auth, TweetsListener())
twitter_stream.filter(track = ["happy","sad","feeling","sentiment"], languages=["en"])

@FreeTrap2x Mfs don’t even know the definition of misogyny 😂😂sad =====> SAD
(to the tune of The Devil Went Down To Georgia)

🎶 Russell Greer went down to Vegas , he was lookin' for a whore to… https://t.co/r8Vfn5OH3J =====> SAD
RT @BayouBun: Bow Wow came with the steel chair when he said puffy ain’t got a artist wit a milli and happy =====> HAPPY
RT @MeganMorantWWE: I can’t wait for @YaOnlyLivvOnce vs @CarmellaWWE at #ExtremeRules 🙌 =====> HAPPY
It’s real sadboi hours! Hit me with your best sad songs, doesn’t matter what genre. Classical, orchestral, rap, hip hop, doesn’t matter. =====> SAD
RT @BTSupdate_7: Look at Jimin happily dancing and waving 😭😭😭🤗🤗

They are really happy ~

 https://t.co/qe0Em4Z811 =====> HAPPY
RT @DomainDoris: I’m not plotting mischief, I’m an innocent floof! Happy #Caturday pals! 🐾🥰🐾 #CatsOfTwitter #Cat #cats #pets #animals 💙 #Ca… =====> HAPPY
RT @AlwaysRamCharan: Introducing #Siddha's Love #Neelambari ! 
Wishing you all a very Happy Ugadi.
#Acharya
@KChiruTweet

# Keywords = {Egypt,USA}

In [None]:
import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
import socket
import json
class TweetsListener(StreamListener):
    def on_data(self, data):
        msg = json.loads(data)
        try:
            predict(msg['text'])
        except Exception:  # skip messages without a 'text' field or failed predictions
            pass



    def on_error(self, status):
        print(status)
        return True

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
twitter_stream = Stream(auth, TweetsListener())
twitter_stream.filter(track = ["egypt","usa"], languages=["en"])

RT @catturd2: 😂😂😂😂

#CatturdsBirthday is the #2 trend in the USA =====> HAPPY
RT @chapps: *Who* is this #handsome rogue? Wonderful details in the hair, and an appropriately Hadrianic beard for c. 125 AD. Sadly, a fune… =====> HAPPY
RT @taehyungpic: 📸#TAEHYUNG  KR 🛫 USA

have a safe flight ♡
FASHION ICON V https://t.co/cbmOlpnkfc =====> HAPPY
RT @taehyungpic: 📸#TAEHYUNG  KR 🛫 USA

have a safe flight ♡
FASHION ICON V https://t.co/cbmOlpnkfc =====> HAPPY
RT @UFOMJLeader: #AncientAliens Just said the Pyramid Text describes the Extraterrestrial gods as coming from Sirius and Orion. The Dogon c… =====> HAPPY
RT @TaehyungUSA: Taehyung Trends USA [NEW]

● Baby Prince V | Entertainment 

Another cute nickname for FASHION ICON V given by the media h… =====> HAPPY
@bwhite_ee @pugpolitics1 @MattBraynard One might even argue that an insurrection is an act of war against the USA a… https://t.co/janeh1DiQp =====> SAD
I’d salute, blare “GOD BLESS THE USA”, all the decorum. A new holiday. A blending of

# SIMPLE WEB APP (Bonus Part)

In [None]:
import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
import socket
import json
class TweetsListener(StreamListener):
    def on_data(self, data):
        msg = json.loads(data)
        try:
            tweet,feeling = predict(msg['text'])
            put_table([['tweet', 'sentiment'],[put_text(str(tweet)), put_text(str(feeling))]])
        except Exception:  # skip messages without a 'text' field or failed predictions
            pass



    def on_error(self, status):
        print(status)
        return True

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
twitter_stream = Stream(auth, TweetsListener())


In [None]:
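from pywebio.input import input
from pywebio.output import put_text, put_table

def stream_keyword(word):
    # Start streaming English tweets that match the keyword
    twitter_stream.filter(track=[str(word)], languages=["en"])

def main():
    # Ask the user for a keyword, then stream and display predictions
    word = input("Enter one keyword: ")
    stream_keyword(word)

if __name__ == '__main__':
    main()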

    


no one makes me happy like ateez does =====> HAPPY
RT @LucasTurnbloom: HOW TO CAT: “Spot” 
.
Happy #caturday everyone!
.
https://t.co/qG1IXo4wUe https://t.co/gPsbTYuxBE =====> HAPPY
@TrustWallet Happy birthday trust wallet and twt thank you for the Best wallet =====> HAPPY
RT @neiljedcastro: We all have our breaking points in this life. We all get tired of waiting and chasing for that dream to happen. We all g… =====> HAPPY
♡                                                       ♡
            put this on your profile
            and who… https://t.co/QLE9chbD3x =====> HAPPY
@tuechainz happy gday my boy 💪🏾 =====> HAPPY
RT @Gatorcwboyfan: 5 ain’t it, bottom line. We all wanted his patience to be rewarded. Fairytales don’t always have a happy ending. =====> HAPPY
RT @mygiorni: 🎶 Happy Birthday Laboon 🎶 

#ONEPIECE https://t.co/RZ3OkPxvZP =====> HAPPY
happy birthday to me im gonna eat king crab and drink g&amp;t’s all night =====> HAPPY
RT @razzparks: happy birthday @JawnRocha, an amazing 