In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [2]:
spark = SparkSession.builder.appName('coronatweets').getOrCreate() 

In [3]:
#Reading the dataset 
df=spark.read.csv('Corona_NLP_train.csv', header = True, inferSchema=True,sep= ',')

In [4]:
#viewing the dataset
df.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|            UserName|          ScreenName|            Location|           Sentiment|   TweetAt|       OriginalTweet|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|                3799|               48751|              London|             Neutral|16-03-2020|@MeNyrbie @Phil_G...|
|                3800|               48752|                  UK|            Positive|16-03-2020|advice Talk to yo...|
|                3801|               48753|           Vagabonds|            Positive|16-03-2020|Coronavirus Austr...|
|                3802|               48754|                null|            Positive|16-03-2020|My food stock is ...|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|      null|                null|
|           Stay calm|          stay safe.|             

In [5]:
df.printSchema()

root
 |-- UserName: string (nullable = true)
 |-- ScreenName: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Sentiment: string (nullable = true)
 |-- TweetAt: string (nullable = true)
 |-- OriginalTweet: string (nullable = true)



In [6]:
#getting to know the columns
df.columns

['UserName', 'ScreenName', 'Location', 'Sentiment', 'TweetAt', 'OriginalTweet']

In [7]:
#dropping the duplicate values
df = df.dropDuplicates()
print(df.count())

65074


In [8]:
#dropping null values
df = df.na.drop()
print(df.count())

32621


In [9]:
#defining sentiments of the dataset
sentiments = ['Positive','Negative','Neutral','Extremely Positive','Extremely Negative']

In [10]:
#filtering out data in the dataset
df = df.filter(df.Sentiment.isin(sentiments))

In [11]:
#counting different sentiments
df.select('Sentiment').distinct().count()

5

In [12]:
#showing different sentiments
df.select('Sentiment').distinct().show()

+------------------+
|         Sentiment|
+------------------+
|Extremely Negative|
|           Neutral|
|          Positive|
|          Negative|
|Extremely Positive|
+------------------+



In [14]:
df=df.withColumn('length', length(df['OriginalTweet']))

In [15]:
df.show()

+--------+----------+--------------------+------------------+----------+--------------------+------+
|UserName|ScreenName|            Location|         Sentiment|   TweetAt|       OriginalTweet|length|
+--------+----------+--------------------+------------------+----------+--------------------+------+
|    3926|     48878| ????? ???? ????????|          Negative|16-03-2020|#unpopularopinion...|   175|
|    4155|     49107|      Owensboro, KY |           Neutral|16-03-2020|Just online shopp...|    80|
|    4247|     49199|            New York|          Positive|16-03-2020|I know a lot of g...|   269|
|    4949|     49901|         Houston, TX|          Positive|17-03-2020|Our latest issue ...|   164|
|    5065|     50017|  Manchester, Europe|Extremely Positive|17-03-2020|If you are health...|   202|
|    5322|     50274|      Leeds, England|          Positive|17-03-2020|#COVID2019 local ...|   191|
|    5766|     50718|          upstate NY|          Negative|17-03-2020|Seeing those empt..

In [17]:
#calculating the mean length of different sentiments
df.groupby('Sentiment').mean().show()

+------------------+------------------+
|         Sentiment|       avg(length)|
+------------------+------------------+
|Extremely Negative|179.08476571697668|
|           Neutral|134.06076810889644|
|          Positive| 167.5731693929081|
|          Negative|165.74478227261014|
|Extremely Positive|183.49146433990896|
+------------------+------------------+



In [18]:
#Implementing tokenizer

tokenizer=Tokenizer(inputCol="OriginalTweet", outputCol="token_text")
stopremove=StopWordsRemover(inputCol="token_text", outputCol="stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens", outputCol="c_vec")
idf=IDF(inputCol="c_vec", outputCol="tf_idf")
label_to_num = StringIndexer(inputCol="sentiment", outputCol='label')

In [20]:
cleaned = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')

In [21]:
#Building the model
from pyspark.ml.classification import RandomForestClassifier
rf=RandomForestClassifier()

In [22]:
#Building pipeline
from pyspark.ml import Pipeline
df_prep_pipeline= Pipeline(stages=[label_to_num, tokenizer, stopremove,count_vec, idf,cleaned])

In [23]:
#fitting the model on the data
cleaned_df= df_prep_pipeline.fit(df)

In [24]:
#transforming the data
cleaned_df=cleaned_df.transform(df)

In [25]:
#viewing the cleaned data 
cleaned_df.show()

+--------+----------+--------------------+------------------+----------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|UserName|ScreenName|            Location|         sentiment|   TweetAt|       OriginalTweet|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+--------+----------+--------------------+------------------+----------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|    3926|     48878| ????? ???? ????????|          Negative|16-03-2020|#unpopularopinion...|   175|  1.0|[#unpopularopinio...|[#unpopularopinio...|(80619,[5,56,60,8...|(80619,[5,56,60,8...|(80620,[5,56,60,8...|
|    4155|     49107|      Owensboro, KY |           Neutral|16-03-2020|Just online shopp...|    80|  2.0|[just, online, sh...|[online, shopping...|(806

In [26]:
cleaned_df=cleaned_df.select(['label', 'features'])

In [27]:
#viewing columns
cleaned_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(80620,[5,56,60,8...|
|  2.0|(80620,[6,13,14,8...|
|  0.0|(80620,[0,3,7,17,...|
|  0.0|(80620,[2,10,15,3...|
|  3.0|(80620,[0,5,6,17,...|
|  0.0|(80620,[5,16,19,4...|
|  1.0|(80620,[3,7,16,56...|
|  0.0|(80620,[38,45,116...|
|  2.0|(80620,[9,59,91,2...|
|  1.0|(80620,[3,7,45,56...|
|  1.0|(80620,[0,7,47,59...|
|  2.0|(80620,[0,3,8,12,...|
|  4.0|(80620,[0,3,7,17,...|
|  0.0|(80620,[0,13,14,1...|
|  2.0|(80620,[27806,506...|
|  3.0|(80620,[1,6,11,36...|
|  2.0|(80620,[6,665,118...|
|  3.0|(80620,[4,6,51,54...|
|  2.0|(80620,[3,7,8,31,...|
|  2.0|(80620,[0,4,21,36...|
+-----+--------------------+
only showing top 20 rows



In [28]:
#Training the dataset
#Spilting the data into train and test
(training, testing)=cleaned_df.randomSplit([0.7,0.3])

In [40]:
#Applying Random Forest
#fitting the model on the dataset
spam_predictor_rf=rf.fit(training)

In [41]:
#transforming the model
test_results_rf=spam_predictor_rf.transform(testing)

In [42]:
#viewing the results of the test data
test_results_rf.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(80620,[0,1,3,7,6...|[5.60104606499630...|[0.28005230324981...|       0.0|
|  0.0|(80620,[0,2,12,70...|[5.53855401644105...|[0.27692770082205...|       0.0|
|  0.0|(80620,[0,3,12,21...|[5.69880669421629...|[0.28494033471081...|       0.0|
|  0.0|(80620,[0,5,8,23,...|[5.56385121263016...|[0.27819256063150...|       0.0|
|  0.0|(80620,[0,9,12,10...|[5.36301256333227...|[0.26815062816661...|       0.0|
|  0.0|(80620,[0,9,12,12...|[5.55481383646610...|[0.27774069182330...|       0.0|
|  0.0|(80620,[0,12,24,3...|[5.6518393691567,...|[0.28259196845783...|       0.0|
|  0.0|(80620,[0,13,14,1...|[5.55481383646610...|[0.27774069182330...|       0.0|
|  0.0|(80620,[0,13,46,8...|[5.46099572704180...|[0.27304978635209...|       0.0|
|  0.0|(80620,[0

In [43]:
#evaluating the accuracy of the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
 metricName="accuracy")
accuracy_rf = evaluator.evaluate(test_results_rf)
print("Test set accuracy = " + str(accuracy_rf))

Test set accuracy = 0.28523215381468603
