In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder.appName('CoronavirusNLP').getOrCreate()

In [3]:
data = spark.read.csv('Corona_NLP_train.csv', header = True)

In [55]:
data.show(7)

+--------------------+------------+--------------------+----------+--------------------+---------+------------+
|            UserName|  ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|Tweet_length|
+--------------------+------------+--------------------+----------+--------------------+---------+------------+
|                3799|       48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|         111|
|                3800|       48752|                  UK|16-03-2020|advice Talk to yo...| Positive|         237|
|                3801|       48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|         131|
|                3802|       48754|                null|16-03-2020|My food stock is ...|     null|          51|
|              PLEASE| don't panic| THERE WILL BE EN...|      null|                null|     null|        null|
|           Stay calm|  stay safe.|                null|      null|                null|     null|      

In [5]:
data.describe()

DataFrame[summary: string, UserName: string, ScreenName: string, Location: string, TweetAt: string, OriginalTweet: string, Sentiment: string]

In [6]:
data.columns

['UserName', 'ScreenName', 'Location', 'TweetAt', 'OriginalTweet', 'Sentiment']

In [7]:
print((data.count(),len(data.columns)))

(68046, 6)


# Data preparation

In [8]:
from pyspark.sql.functions import length

In [9]:
data = data.withColumn('Tweet_length', length(data['OriginalTweet']))

In [10]:
data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|Tweet_length|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|         111|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|         237|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|         131|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|          51|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|     

In [11]:
sentiments= ['Positive','Negative','Neutral','Extremely Positive','Extremely Negative']

In [13]:
df = data.filter(data.Sentiment.isin(sentiments))

In [14]:
df.select('Sentiment').distinct().show()

+------------------+
|         Sentiment|
+------------------+
|Extremely Negative|
|           Neutral|
|          Positive|
|          Negative|
|Extremely Positive|
+------------------+



In [15]:
df.select('Sentiment').distinct().count()

5

In [16]:
df.groupby('Sentiment').count().show()

+------------------+-----+
|         Sentiment|count|
+------------------+-----+
|Extremely Negative| 3751|
|           Neutral| 5224|
|          Positive| 7718|
|          Negative| 6857|
|Extremely Positive| 4412|
+------------------+-----+



In [17]:
df.groupby('Sentiment').mean().show()

+------------------+------------------+
|         Sentiment| avg(Tweet_length)|
+------------------+------------------+
|Extremely Negative| 209.6656891495601|
|           Neutral| 151.2949846860643|
|          Positive|193.66195905675045|
|          Negative| 189.6651596908269|
|Extremely Positive| 215.0605167724388|
+------------------+------------------+



In [18]:
df.groupby('Location').mean().show()

+--------------------+------------------+
|            Location| avg(Tweet_length)|
+--------------------+------------------+
|                 ...|             197.0|
| Mumbai, Maharashtra|154.66666666666666|
| Brisbane, Australia|             207.0|
|West Woofle-Dust ...|             157.0|
|   St Petersburg, FL|169.57142857142858|
| All across Michigan|             224.0|
|     Northumberland |             280.0|
|     stoke on trent |             187.0|
|some where around...|             126.0|
|           Bangalore|176.21052631578948|
|           Norn Iron|             244.0|
|Horsham, Pennsylv...|             189.0|
|       Shimla  India|              89.0|
|Ferrara, Emilia R...|             230.0|
|      Luton, England|             198.0|
|              Heaven|             198.0|
|       St George, UT|             188.0|
|Just to the left ...|             205.0|
|           Worcester|             258.5|
|      Nellore/Canada|             280.0|
+--------------------+------------

In [19]:
df.groupby('Location').count().show()

+--------------------+-----+
|            Location|count|
+--------------------+-----+
|                 ...|    1|
| Mumbai, Maharashtra|    3|
| Brisbane, Australia|    4|
|West Woofle-Dust ...|    1|
|   St Petersburg, FL|    7|
| All across Michigan|    1|
|     Northumberland |    1|
|     stoke on trent |    1|
|some where around...|    1|
|           Bangalore|   19|
|           Norn Iron|    1|
|Horsham, Pennsylv...|    1|
|       Shimla  India|    1|
|Ferrara, Emilia R...|    1|
|      Luton, England|    1|
|              Heaven|    1|
|       St George, UT|    1|
|Just to the left ...|    1|
|           Worcester|    2|
|      Nellore/Canada|    1|
+--------------------+-----+
only showing top 20 rows



In [20]:
df.show()

+--------+----------+--------------------+----------+--------------------+------------------+------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|Tweet_length|
+--------+----------+--------------------+----------+--------------------+------------------+------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|         111|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|         237|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|         131|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|         249|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|         184|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|         280|
|    3808|     48760|    BHAVNAGAR,GU

In [21]:
print((df.count(),len(df.columns)))

(27962, 7)


In [22]:
from pyspark.sql.functions import isnan,when,count,col

In [23]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+----------+--------+-------+-------------+---------+------------+
|UserName|ScreenName|Location|TweetAt|OriginalTweet|Sentiment|Tweet_length|
+--------+----------+--------+-------+-------------+---------+------------+
|       0|         0|    6152|      0|            0|        0|           0|
+--------+----------+--------+-------+-------------+---------+------------+



# Feature Transformation

In [25]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, RegexTokenizer

In [26]:
tokenizer = Tokenizer(inputCol = "OriginalTweet", outputCol = "token_text")
stopremove = StopWordsRemover(inputCol="token_text", outputCol = "stop_tokens")
#Cleaned version of Tokens
#Counting Occurnace of tokens
count_vec = CountVectorizer(inputCol = "stop_tokens", outputCol = "c_vec")
idf = IDF(inputCol = "c_vec", outputCol = "tf_idf")

Corona_to_num = StringIndexer(inputCol = "Sentiment", outputCol = "label")

In [27]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [28]:
clean_up = VectorAssembler(inputCols =["tf_idf", "Tweet_length"], outputCol = "features")

# Model

In [30]:
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, DecisionTreeClassifier

In [31]:
NB= NaiveBayes()
RF = RandomForestClassifier(numTrees = 50)
DTC = DecisionTreeClassifier (maxDepth = 10)

# pipeline

In [33]:
from pyspark.ml import Pipeline

In [34]:
data_prep_pipeline = Pipeline(stages =[Corona_to_num,tokenizer, stopremove,count_vec,idf,clean_up])

In [35]:
cleaner = data_prep_pipeline.fit(df)

In [36]:
clean_data = cleaner.transform(df)

In [37]:
clean_data.show()

+--------+----------+--------------------+----------+--------------------+------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|Tweet_length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+--------+----------+--------------------+----------+--------------------+------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|         111|  2.0|[@menyrbie, @phil...|[@menyrbie, @phil...|(78305,[14499,289...|(78305,[14499,289...|(78306,[14499,289...|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|         237|  0.0|[advice, talk, t

In [38]:
clean_data = clean_data.select(['label','features'])

In [39]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|(78306,[14499,289...|
|  0.0|(78306,[13,14,133...|
|  0.0|(78306,[8,14,37,7...|
|  0.0|(78306,[7,8,31,47...|
|  0.0|(78306,[3,6,18,60...|
|  0.0|(78306,[1,6,8,13,...|
|  1.0|(78306,[11,13,14,...|
|  2.0|(78306,[48,70,147...|
|  3.0|(78306,[13,14,23,...|
|  0.0|(78306,[8,10,23,5...|
|  0.0|(78306,[4,8,24,38...|
|  4.0|(78306,[1,4,9,11,...|
|  1.0|(78306,[4,21,44,7...|
|  3.0|(78306,[10,37,54,...|
|  1.0|(78306,[4,8,24,33...|
|  4.0|(78306,[1,7,11,36...|
|  1.0|(78306,[1,4,7,34,...|
|  2.0|(78306,[5,47,48,6...|
|  0.0|(78306,[8,12,23,2...|
|  1.0|(78306,[6,28,33,9...|
+-----+--------------------+
only showing top 20 rows



# Machine Learning Training

In [41]:
(training,testing)=clean_data.randomSplit([0.8,0.2])

In [42]:
PredictNB = NB.fit(training)

In [43]:
PredictRF= RF.fit(training)

In [44]:
#testing the model

In [45]:
NB_results = PredictNB.transform(testing)

In [46]:
RF_results = PredictRF.transform(testing)

In [47]:
#DTC_results = PredictDTC.transform(testing)

In [48]:
NB_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(78306,[0,1,2,7,4...|[-1272.3084168820...|[4.69853735104021...|       4.0|
|  0.0|(78306,[0,1,2,7,1...|[-1201.6027268959...|[5.55082526493622...|       4.0|
|  0.0|(78306,[0,1,2,12,...|[-1147.6665627889...|[0.99999824007625...|       0.0|
|  0.0|(78306,[0,1,2,15,...|[-1686.3468091500...|[0.99999929069284...|       0.0|
|  0.0|(78306,[0,1,2,16,...|[-1389.9759104929...|[5.82135401080913...|       4.0|
|  0.0|(78306,[0,1,2,17,...|[-2266.4304863658...|[8.97525001185602...|       3.0|
|  0.0|(78306,[0,1,2,28,...|[-2044.3965287278...|[0.99999999999996...|       0.0|
|  0.0|(78306,[0,1,2,29,...|[-1851.1216944312...|[2.72418512189509...|       4.0|
|  0.0|(78306,[0,1,2,46,...|[-527.36864716010...|[9.26276069333105...|       1.0|
|  0.0|(78306,[0

In [49]:
RF_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(78306,[0,1,2,7,4...|[13.6613630145247...|[0.27322726029049...|       0.0|
|  0.0|(78306,[0,1,2,7,1...|[13.6638310330250...|[0.27327662066050...|       0.0|
|  0.0|(78306,[0,1,2,12,...|[14.0945301423600...|[0.28189060284720...|       0.0|
|  0.0|(78306,[0,1,2,15,...|[13.5695430379821...|[0.27139086075964...|       0.0|
|  0.0|(78306,[0,1,2,16,...|[13.0924331553794...|[0.26184866310758...|       1.0|
|  0.0|(78306,[0,1,2,17,...|[13.9214693829901...|[0.27842938765980...|       0.0|
|  0.0|(78306,[0,1,2,28,...|[14.1323023805481...|[0.28264604761096...|       0.0|
|  0.0|(78306,[0,1,2,29,...|[13.5130642983568...|[0.27026128596713...|       0.0|
|  0.0|(78306,[0,1,2,46,...|[13.6966723474252...|[0.27393344694850...|       0.0|
|  0.0|(78306,[0

In [50]:
#DTC_results.show()

In [51]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [52]:
eva = MulticlassClassificationEvaluator()
acc_NB = eva.evaluate(NB_results)

In [53]:
eva = MulticlassClassificationEvaluator()
acc_RF = eva.evaluate(RF_results)

In [54]:
print("accuracy of the NB and RF is ::", acc_NB, acc_RF)

accuracy of the NB and RF is :: 0.40332136698366144 0.12606511583689714
