In [1]:
################################################################################################################################
# Author        : - Abdullah Abdelhakeem                                                                                       #  
# Date          : - 24-9-2021                                                                                                  #  
# Version       : - v 0.0.1                                                                                                    #  
# Project       : - Real Time Streaming Sentiment Analysis (+ve , -ve , N) Twitter.                                                    #  
#                                                                                                                              # 
# Dependencies  : -   1- Tweepy            
#                     2- Apache Kafka      
#                     3- Apache Spark      
#                     4- kafka-python      
#                     5- pySpark           
#                     6- Delta Lake package                                                                                                         
#
#
#
#                                                                                                                              #
# Steps :                                                                                                                      # 
#        1- install Dependency                                                                                                 # 
#        2- Load Data                                                                                                          # 
#        3- Cleaning the Data                                                                                                  # 
#        4- PreProcessing the Data                                                                                             # 
#        5- Feature Selection                                                                                                  # 
#        6- Tokenize the data                                                                                               # 
#        7- Split the data 
#        8- Create an ML Pipeline
#        9- tarin the model(fit) and predict
#        10-Calculate Accuracy
#        11-save the model for deploying
#        12-load the model saved(check)
#                                                                                                                              #
###############################################################################################################################

In [2]:
################################################################################################################################ 
#                                                                                                                              #
# Requirements :                                                                                                               # 
#        1- Tweepy                                                                                                             # 
#        2- Apache Kafka                                                                                                       # 
#        3- Apache Spark                                                                                                       # 
#        4- kafka-python                                                                                                       # 
#        5- pySpark                                                                                                            # 
#        6- Delta Lake package                                                                                                 # 
#                                                                                                                              # 
#                                                                                                                              #          
###############################################################################################################################

In [3]:
'''
Reads 'Sentiment140' dataset, trains and saves the pipeline 
using SparkML
'''

import findspark
findspark.init()
import pyspark


from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf
import re

from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import StopWordsRemover, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pathlib import Path

from pyspark.sql.types import StringType, StructType, StructField, ArrayType
from pyspark.sql.functions import udf, from_json, col

from pyspark.ml import PipelineModel

import re
from datetime import datetime
from pathlib import Path

from tweepy import OAuthHandler, StreamListener
from tweepy import Stream, API
# from kafka import KafkaProducer

import json
from dateutil.parser import parse
import re

In [4]:
# # Include information necessary for Twitter API authentication
# Developer Account Take 15 days
# access_token = 'ACCESS_TOKEN'
# access_token_secret = 'ACCESS_TOKEN_SECRET'
# consumer_key = 'CONSUMER_KEY'
# consumer_secret = 'CONSUMER_SECRET'

In [5]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName("RDDApp").getOrCreate()
spark = SparkSession.builder.getOrCreate()
sc =spark.sparkContext

In [6]:
sqlcontext = SQLContext(sc)

# Load The Data

In [7]:
'''
Sentiment Analysis dataset
0 - the polarity of the tweet (0 = negative, 2 = neutral, 4 = positive)
1 - the id of the tweet 
2 - the date of the tweet 
3 - the query . If there is no query, then this value is NO_QUERY.
4 - the user that tweeted 
5 - the text of the tweet 
'''

raw_data = sqlcontext \
    .read \
    .format('csv') \
    .options(header=False) \
    .load("train.csv") \
    .selectExpr("_c0 as sentiment","_c1 as id","_c2 as date ","_c3 as query","_c4 as user","_c5 as message")


In [8]:
raw_data.show()

+---------+----------+--------------------+--------+---------------+--------------------+
|sentiment|        id|                date|   query|           user|             message|
+---------+----------+--------------------+--------+---------------+--------------------+
|        0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|        0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|        0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|        0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|        0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|        0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|        0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|        0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|        0

In [9]:
raw_data.printSchema()

root
 |-- sentiment: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user: string (nullable = true)
 |-- message: string (nullable = true)



In [10]:
df = sqlcontext \
    .read \
    .format('csv') \
    .options(header=False) \
    .load("train.csv") \
    .selectExpr("_c0 as sentiment", "_c5 as message")

In [11]:
df.show()

+---------+--------------------+
|sentiment|             message|
+---------+--------------------+
|        0|@switchfoot http:...|
|        0|is upset that he ...|
|        0|@Kenichan I dived...|
|        0|my whole body fee...|
|        0|@nationwideclass ...|
|        0|@Kwesidei not the...|
|        0|         Need a hug |
|        0|@LOLTrish hey  lo...|
|        0|@Tatiana_K nope t...|
|        0|@twittera que me ...|
|        0|spring break in p...|
|        0|I just re-pierced...|
|        0|@caregiving I cou...|
|        0|@octolinz16 It it...|
|        0|@smarrison i woul...|
|        0|@iamjazzyfizzle I...|
|        0|Hollis' death sce...|
|        0|about to file taxes |
|        0|@LettyA ahh ive a...|
|        0|@FakerPattyPattz ...|
+---------+--------------------+
only showing top 20 rows



# Tokenize the data

In [12]:
pre_process = udf(
    lambda x: re.sub(r'[^A-Za-z\n ]|(http\S+)|(www.\S+)', '', \
        x.lower().strip()).split(), ArrayType(StringType())
    )
df = df.withColumn("cleaned_data", pre_process(df.message)).dropna()

In [13]:
df.show()

+---------+--------------------+--------------------+
|sentiment|             message|        cleaned_data|
+---------+--------------------+--------------------+
|        0|@switchfoot http:...|[switchfoot, awww...|
|        0|is upset that he ...|[is, upset, that,...|
|        0|@Kenichan I dived...|[kenichan, i, div...|
|        0|my whole body fee...|[my, whole, body,...|
|        0|@nationwideclass ...|[nationwideclass,...|
|        0|@Kwesidei not the...|[kwesidei, not, t...|
|        0|         Need a hug |      [need, a, hug]|
|        0|@LOLTrish hey  lo...|[loltrish, hey, l...|
|        0|@Tatiana_K nope t...|[tatianak, nope, ...|
|        0|@twittera que me ...|[twittera, que, m...|
|        0|spring break in p...|[spring, break, i...|
|        0|I just re-pierced...|[i, just, repierc...|
|        0|@caregiving I cou...|[caregiving, i, c...|
|        0|@octolinz16 It it...|[octolinz, it, it...|
|        0|@smarrison i woul...|[smarrison, i, wo...|
|        0|@iamjazzyfizzle I

In [14]:
# df.select('cleaned_data').toPandas()

# Split the data into training and testing

In [15]:
train, test = df.randomSplit([0.8,0.2],seed = 100)

In [16]:
train.show()

+---------+--------------------+--------------------+
|sentiment|             message|        cleaned_data|
+---------+--------------------+--------------------+
|        0|           FUCK YOU!|         [fuck, you]|
|        0|          i want ...|[i, want, some, b...|
|        0|        my head f...|[my, head, feels,...|
|        0|      My current ...|[my, current, hea...|
|        0|      this weeken...|[this, weekend, h...|
|        0|            #canucks|           [canucks]|
|        0|     &lt;- but mu...|[lt, but, mustach...|
|        0|     I dont like ...|[i, dont, like, t...|
|        0|     I'll get on ...|[ill, get, on, it...|
|        0|     ok thats it ...|[ok, thats, it, y...|
|        0|     what the fuc...|[what, the, fuccc...|
|        0|    I just cut my...|[i, just, cut, my...|
|        0|    on the comput...|[on, the, compute...|
|        0|       wompppp wompp|    [wompppp, wompp]|
|        0|   *old me's dead...|[old, mes, dead, ...|
|        0|   ...  Headed to

In [17]:
import pyspark
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape

In [18]:
print(train.shape())

(1279865, 3)


In [19]:
test.show()

+---------+--------------------+--------------------+
|sentiment|             message|        cleaned_data|
+---------+--------------------+--------------------+
|        0|       i really2 ...|[i, really, dont,...|
|        0|     jb isnt show...|[jb, isnt, showin...|
|        0|    Not feeling i...|[not, feeling, it...|
|        0|   Boston Globe c...|[boston, globe, c...|
|        0|   My phone can u...|[my, phone, can, ...|
|        0|   hoping to see ...|[hoping, to, see,...|
|        0|   i'm so cold th...|[im, so, cold, th...|
|        0|   kinda but not ...|[kinda, but, not,...|
|        0|   the batt to my...|[the, batt, to, m...|
|        0|  *pout*  I want ...|[pout, i, want, s...|
|        0|  2 orders to fil...|[orders, to, fill...|
|        0|  Another expensi...|[another, expensi...|
|        0|  Just got to wor...|[just, got, to, w...|
|        0|  are there any p...|[are, there, any,...|
|        0|  but chilli praw...|[but, chilli, pra...|
|        0|  i was too slow 

In [20]:
test.shape()

(320135, 3)

# Create an ML Pipeline

In [21]:
# Peforms TF-IDF calculation and Logistic Regression
remover = StopWordsRemover(inputCol="cleaned_data", outputCol="words")
vector_tf = CountVectorizer(inputCol="words", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=3)
label_indexer = StringIndexer(inputCol = "sentiment", outputCol = "label")
lr_model = LogisticRegression(maxIter=100)

pipeline = Pipeline(stages=[remover, vector_tf, idf, label_indexer, lr_model])

# Fit the pipeline to the training dataframe

In [22]:
trained_model = pipeline.fit(train)

# Predicting the test dataframe 

In [23]:
'''
The labels are labelled with positive (4) as 0.0 
negative (0) as 1.0
'''
prediction_df = trained_model.transform(test)
prediction_df.printSchema()

root
 |-- sentiment: string (nullable = true)
 |-- message: string (nullable = true)
 |-- cleaned_data: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [24]:
prediction_df.select("message","features","label","prediction").show(100)

+--------------------+--------------------+-----+----------+
|             message|            features|label|prediction|
+--------------------+--------------------+-----+----------+
|       i really2 ...|(262144,[4,6,19,5...|  1.0|       1.0|
|     jb isnt show...|(262144,[180,1157...|  1.0|       1.0|
|    Not feeling i...|(262144,[7,55,99,...|  1.0|       1.0|
|   Boston Globe c...|(262144,[72,1096,...|  1.0|       1.0|
|   My phone can u...|(262144,[5,10,40,...|  1.0|       1.0|
|   hoping to see ...|(262144,[20,177,1...|  1.0|       0.0|
|   i'm so cold th...|(262144,[0,36,224...|  1.0|       1.0|
|   kinda but not ...|(262144,[19,327],...|  1.0|       0.0|
|   the batt to my...|(262144,[14,146,3...|  1.0|       1.0|
|  *pout*  I want ...|(262144,[23,63,56...|  1.0|       1.0|
|  2 orders to fil...|(262144,[1,5,56,6...|  1.0|       0.0|
|  Another expensi...|(262144,[77,106,3...|  1.0|       1.0|
|  Just got to wor...|(262144,[11,12,25...|  1.0|       0.0|
|  are there any p...|(2

In [25]:
prediction_df.select("features").show(1 , truncate=False)

+--------------------------------------------------------------------------------------------------------------+
|features                                                                                                      |
+--------------------------------------------------------------------------------------------------------------+
|(262144,[4,6,19,5432,182025],[3.0637558705620007,3.1999263774213245,3.5356513945264174,9.085532200495107,0.0])|
+--------------------------------------------------------------------------------------------------------------+
only showing top 1 row



# Calculate Accuracy

In [26]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = evaluator.evaluate(prediction_df)
print(accuracy)

0.8323899499569996


In [27]:
from pyspark.ml.evaluation import RegressionEvaluator
regEval = RegressionEvaluator(predictionCol ='prediction', labelCol='label',metricName='rmse')
regEval.evaluate(prediction_df)

0.47713096575915465

In [28]:
regEval_r2 = RegressionEvaluator(predictionCol ='prediction', labelCol='label',metricName='r2')
regEval_r2.evaluate(prediction_df)

0.08938394704275376

# Save the pipeline model

In [29]:
# trained_model.save("fittedpipeline_Model")

# Save the Model for Deploy

In [30]:
trained_model.write()\
             .overwrite() \
             .save("ModelDir")

# Load the Model Saved

In [31]:
# read pickled model via pipeline api
from pyspark.ml.pipeline import PipelineModel
persistedModel = PipelineModel.load("ModelDir")

In [32]:
# predict
predictionsDF = persistedModel.transform(test)

In [33]:
predictionsDF.select("message","features","label","prediction").show(5)

+--------------------+--------------------+-----+----------+
|             message|            features|label|prediction|
+--------------------+--------------------+-----+----------+
|       i really2 ...|(262144,[4,6,19,5...|  1.0|       1.0|
|     jb isnt show...|(262144,[180,1157...|  1.0|       1.0|
|    Not feeling i...|(262144,[7,55,99,...|  1.0|       1.0|
|   Boston Globe c...|(262144,[72,1096,...|  1.0|       1.0|
|   My phone can u...|(262144,[5,10,40,...|  1.0|       1.0|
+--------------------+--------------------+-----+----------+
only showing top 5 rows



# Thank you! Abdullah Abdelhakeem