In [1]:
#Creating a spark session
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName('COVID').getOrCreate()

In [2]:
#Importing the required libraries
%matplotlib inline


from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
import matplotlib.pyplot as plt; plt.rcdefaults()
import numpy as np
import os, tempfile
import csv
import string
import random

import re

In [3]:
#Loading the data
data = spark.read.csv('Corona_NLP_train.csv',inferSchema=True, header=True)

## Data Exploration

In [4]:
data.show()

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|            UserName|          ScreenName|            Location|             TweetAt|         Sentiment|       OriginalTweet|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|                3799|               48751|              London|          16-03-2020|           Neutral|@MeNyrbie @Phil_G...|
|                3800|               48752|                  UK|          16-03-2020|          Positive|advice Talk to yo...|
|                3801|               48753|           Vagabonds|          16-03-2020|          Positive|Coronavirus Austr...|
|                3802|               48754|                null|          16-03-2020|          Positive|My food stock is ...|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|              null|              

In [5]:
# Data Prepration

In [6]:
from pyspark.sql.functions import length

In [7]:
#Checking the length of tweets
data=data.withColumn('length', length(data['OriginalTweet']))

In [8]:
data.show()

+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------+
|            UserName|          ScreenName|            Location|             TweetAt|         Sentiment|       OriginalTweet|length|
+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------+
|                3799|               48751|              London|          16-03-2020|           Neutral|@MeNyrbie @Phil_G...|   111|
|                3800|               48752|                  UK|          16-03-2020|          Positive|advice Talk to yo...|   237|
|                3801|               48753|           Vagabonds|          16-03-2020|          Positive|Coronavirus Austr...|   131|
|                3802|               48754|                null|          16-03-2020|          Positive|My food stock is ...|    51|
|              PLEASE|         don't panic| THERE WILL BE EN...|     

In [9]:
#Counting the number of rows
data.count()

68046

In [10]:
#Viewing the various columns
data.columns

['UserName',
 'ScreenName',
 'Location',
 'TweetAt',
 'Sentiment',
 'OriginalTweet',
 'length']

In [11]:
# Check For Missing Values
data.toPandas().isnull().sum()

UserName             4
ScreenName       24072
Location         34547
TweetAt          26552
Sentiment        26752
OriginalTweet    26835
length           26835
dtype: int64

In [12]:
# Drop Missing Values
data = data.dropna(subset=('Sentiment'))
data = data.dropna(subset=('OriginalTweet'))

In [13]:
data = data.select('Sentiment', 'OriginalTweet')

In [14]:
data.show()

+------------------+--------------------+
|         Sentiment|       OriginalTweet|
+------------------+--------------------+
|           Neutral|@MeNyrbie @Phil_G...|
|          Positive|advice Talk to yo...|
|          Positive|Coronavirus Austr...|
|          Positive|My food stock is ...|
|Extremely Negative|Me, ready to go a...|
|          Positive|As news of the re...|
|          Positive|"Cashier at groce...|
|           Neutral|Was at the superm...|
|          Positive|Due to COVID-19 o...|
|          Negative|For corona preven...|
|           Neutral|All month there h...|
|Extremely Positive|Due to the Covid-...|
|Extremely Positive|#horningsea is a ...|
|          Positive|Me: I don't need ...|
|          Positive|ADARA Releases CO...|
|          Positive|Lines at the groc...|
|           Neutral|????? ????? ?????...|
|           Neutral|"@eyeonthearctic ...|
|Extremely Positive|Amazon Glitch Sty...|
|          Positive|For those who are...|
+------------------+--------------

In [15]:
#Total number of rows
data.count()

41211

## Cleaning Tweets

In [16]:
#Importing Libraries for cleaning the tweets

In [17]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [18]:
#Cleaning the tweet
data = data.withColumn('OriginalTweet', F.regexp_replace('OriginalTweet', r'http\S+', ''))
data = data.withColumn('OriginalTweet', F.regexp_replace('OriginalTweet', '@\w+', ''))
data = data.withColumn('OriginalTweet', F.regexp_replace('OriginalTweet', '#', ''))
data = data.withColumn('OriginalTweet', F.regexp_replace('OriginalTweet', 'RT', ''))
data = data.withColumn('OriginalTweet', F.regexp_replace('OriginalTweet', ':', ''))

In [19]:
data.show()

+------------------+--------------------+
|         Sentiment|       OriginalTweet|
+------------------+--------------------+
|           Neutral|           and  and |
|          Positive|advice Talk to yo...|
|          Positive|Coronavirus Austr...|
|          Positive|My food stock is ...|
|Extremely Negative|Me, ready to go a...|
|          Positive|As news of the re...|
|          Positive|"Cashier at groce...|
|           Neutral|Was at the superm...|
|          Positive|Due to COVID-19 o...|
|          Negative|For corona preven...|
|           Neutral|All month there h...|
|Extremely Positive|Due to the Covid-...|
|Extremely Positive|horningsea is a c...|
|          Positive|Me I don't need t...|
|          Positive|ADARA Releases CO...|
|          Positive|Lines at the groc...|
|           Neutral|????? ????? ?????...|
|           Neutral|" 16MAR20 Russia ...|
|Extremely Positive|Amazon Glitch Sty...|
|          Positive|For those who are...|
+------------------+--------------

In [20]:
#Converting the columns to lower case
from pyspark.sql.functions import lower, col

In [21]:
data = data.withColumn('Sentiment', lower(col('Sentiment')))
data = data.withColumn('OriginalTweet', lower(col('OriginalTweet')))

In [22]:
data.show()

+------------------+--------------------+
|         Sentiment|       OriginalTweet|
+------------------+--------------------+
|           neutral|           and  and |
|          positive|advice talk to yo...|
|          positive|coronavirus austr...|
|          positive|my food stock is ...|
|extremely negative|me, ready to go a...|
|          positive|as news of the re...|
|          positive|"cashier at groce...|
|           neutral|was at the superm...|
|          positive|due to covid-19 o...|
|          negative|for corona preven...|
|           neutral|all month there h...|
|extremely positive|due to the covid-...|
|extremely positive|horningsea is a c...|
|          positive|me i don't need t...|
|          positive|adara releases co...|
|          positive|lines at the groc...|
|           neutral|????? ????? ?????...|
|           neutral|" 16mar20 russia ...|
|extremely positive|amazon glitch sty...|
|          positive|for those who are...|
+------------------+--------------

## Performing Features Transformation

In [23]:
# Features Transformation
import pyspark.ml.feature

In [24]:
#Creating various stages for pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

tokenizer=Tokenizer(inputCol="OriginalTweet", outputCol="token_tweet")
stopremove=StopWordsRemover(inputCol="token_tweet", outputCol="stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens", outputCol="c_vec")
idf=IDF(inputCol="c_vec", outputCol="tf_idf")

# Converting the labels/sentiment to numbers(Label Encoding)
labelEncoder = StringIndexer(inputCol="Sentiment", outputCol='label')

In [25]:
data.show(10)

+------------------+--------------------+
|         Sentiment|       OriginalTweet|
+------------------+--------------------+
|           neutral|           and  and |
|          positive|advice talk to yo...|
|          positive|coronavirus austr...|
|          positive|my food stock is ...|
|extremely negative|me, ready to go a...|
|          positive|as news of the re...|
|          positive|"cashier at groce...|
|           neutral|was at the superm...|
|          positive|due to covid-19 o...|
|          negative|for corona preven...|
+------------------+--------------------+
only showing top 10 rows



In [26]:
#Transforming the list of columns into a single vector column features
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
clean_up = VectorAssembler(inputCols=['tf_idf'], outputCol='features')

## Building the Pipeline 

In [27]:
from pyspark.ml import Pipeline
data_pipeline= Pipeline(stages=[labelEncoder, tokenizer, stopremove,count_vec, idf,clean_up])

In [28]:
data_pipeline_fit=data_pipeline.fit(data)

In [29]:
data_final=data_pipeline_fit.transform(data)

In [30]:
#Checking the data after the pipeline implementation 
data_final.show()

+------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|         Sentiment|       OriginalTweet|label|         token_tweet|         stop_tokens|               c_vec|              tf_idf|            features|
+------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|           neutral|           and  and |  2.0|[, , , , and, , and]|          [, , , , ]|   (67136,[0],[5.0])|(67136,[0],[6.380...|(67136,[0],[6.380...|
|          positive|advice talk to yo...|  0.0|[advice, talk, to...|[advice, talk, ne...|(67136,[14,15,132...|(67136,[14,15,132...|(67136,[14,15,132...|
|          positive|coronavirus austr...|  0.0|[coronavirus, aus...|[coronavirus, aus...|(67136,[1,6,15,68...|(67136,[1,6,15,68...|(67136,[1,6,15,68...|
|          positive|my food stock is ...|  0.0|[my, food, stock,...|[food, stock, 

In [31]:
data_final=data_final.select(['label', 'features'])

In [32]:
data_final.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|(67136,[0],[6.380...|
|  0.0|(67136,[14,15,132...|
|  0.0|(67136,[1,6,15,68...|
|  0.0|(67136,[5,33,35,1...|
|  4.0|(67136,[4,13,22,4...|
|  0.0|(67136,[0,6,8,25,...|
|  0.0|(67136,[3,7,20,59...|
|  2.0|(67136,[4,41,52,4...|
|  0.0|(67136,[0,6,7,14,...|
|  1.0|(67136,[11,14,15,...|
|  2.0|(67136,[55,70,78,...|
|  3.0|(67136,[5,6,24,32...|
|  3.0|(67136,[14,15,28,...|
|  0.0|(67136,[1,17,35,3...|
|  0.0|(67136,[6,9,28,49...|
|  0.0|(67136,[3,7,115,3...|
|  2.0|(67136,[79,633],[...|
|  2.0|(67136,[6,9,127,3...|
|  3.0|(67136,[3,322,399...|
|  0.0|(67136,[5,6,24,40...|
+-----+--------------------+
only showing top 20 rows



## Train Test Split

In [34]:
# Split the data into training and test sets
(training, testing)=data_final.randomSplit([0.7,0.3],seed=123 )

## Classification Models

In [35]:
#Creating 2 classification models
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier

rf=RandomForestClassifier(numTrees=50)
dtc=DecisionTreeClassifier(maxDepth=5)


In [40]:
#ML Training DecisionTreeClassifier model

In [41]:
#Fit Decision Tree Classification Model
sentiment_predictor_dtc=dtc.fit(training)

In [42]:
# Make predictions using Decision Tree Classification Model
test_results_dtc=sentiment_predictor_dtc.transform(testing)
test_results_dtc.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (67136,[],[])|[7784.0,6887.0,60...|[0.27963787900560...|       0.0|
|  0.0|       (6

In [43]:
# Accuracy of the DecisionTreeClassifier model

In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [45]:
acc_eval_dtc=MulticlassClassificationEvaluator()
acc_dtc=acc_eval_dtc.evaluate(test_results_dtc)

In [46]:
print ("Accuracy of the Decision Tree Classifier model is::", acc_dtc)

Accuracy of the Decision Tree Classifier model is:: 0.20345812658311346


In [36]:
#ML Training RandomForestClassifier model

In [37]:
#Fit RandomForestClassifier Model
sentiment_predictor_rf=rf.fit(training)

In [38]:
# Make predictions
test_results_rf=sentiment_predictor_rf.transform(testing)

In [39]:
test_results_rf.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (67136,[],[])|[13.8117830542577...|[0.27623566108515...|       0.0|
|  0.0|       (6

In [40]:
# Accuracy of the RandomForestClassifier model

In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [42]:
acc_eval_rf=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc_rf=acc_eval_rf.evaluate(test_results_rf)

print ("Accuracy of the model is::", acc_rf)

Accuracy of the model is:: 0.28174317071216853


In [44]:
print ("Accuracy of the model is::", acc_rf*100 ,"%")

Accuracy of the model is:: 28.174317071216855 %
