In [1]:
from pyspark.sql import SparkSession #Import the spark session
from pyspark.ml.feature import StringIndexer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.sql.functions import count, udf, when, isnan, isnull, col
from pyspark.sql.types import IntegerType
from pyspark.ml.linalg import Vector

from pyspark.ml import Pipeline #Build a pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import warnings
warnings.filterwarnings("ignore")

In [2]:
#Creating a spark session
spark= SparkSession.builder.appName('Problem1').getOrCreate()

In [3]:
df=spark.read.csv('Corona_NLP_train.csv', header=True, inferSchema=True)

In [4]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|                null|     null|
|           Stay calm|          stay safe.|                null|

In [5]:
#calculate the null values in column
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+----------+--------+-------+-------------+---------+
|UserName|ScreenName|Location|TweetAt|OriginalTweet|Sentiment|
+--------+----------+--------+-------+-------------+---------+
|       4|     12417|   33799|  26311|        26663|    39429|
+--------+----------+--------+-------+-------------+---------+



In [6]:
#drop the rows having null values
df=df.na.drop(how="any")

In [7]:
df_SI = [StringIndexer(inputCol="Sentiment", outputCol="Target").fit(df)]

In [8]:
pipeline = Pipeline(stages=df_SI)
df1 = pipeline.fit(df).transform(df)

In [9]:
df1.show()

+--------+----------+--------------------+----------+--------------------+------------------+------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|Target|
+--------+----------+--------------------+----------+--------------------+------------------+------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|   2.0|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|   0.0|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|   0.0|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|   0.0|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|   0.0|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|   0.0|
|    3808|     48760|    BHAVNAGAR,GUJRAT|16-03-2020|For corona preven...|          Negativ

In [10]:
#decode_map = {0: "Neutral", 1: "Positive",1:"Extremely Positive",2:"Extremely Negative",2:"Negative"}
def decode_sentiment(label):
    if label == "Positive" or label == "Extremely Positive":
        return "Positive"
    elif label == "Negative" or label == "Extremely Negative":
        return "Negative"
    else:
        return "Neutral"

In [11]:
#making udf
stringNumber = udf(lambda m: decode_sentiment(m))

In [12]:
df.select('Sentiment').distinct().collect()

[Row(Sentiment=' online education'),
 Row(Sentiment=' potatoes &amp; vegetables??'),
 Row(Sentiment=' only a few preliminary conclusions can be cautiously advanced. Firstly'),
 Row(Sentiment=' Vaccines and Treatments"": Because #COVID19 has never been seen in humans before'),
 Row(Sentiment=' #virus'),
 Row(Sentiment=' consumer and more. Please RT!'),
 Row(Sentiment=' Mumbai or Pune'),
 Row(Sentiment='000 tests for COVID-19 to 13 states....'),
 Row(Sentiment=' spot the scams"" https://t.co/UvLZ9lOO0v #FTC #scams #coronavirus"'),
 Row(Sentiment=' claiming the news station'),
 Row(Sentiment=' they should add ""moronic attacks"". https://t.co/Efyg77aw6L"'),
 Row(Sentiment=' as lower oil prices and a dip in economic activity amid the #coronavirus pandemic more than offset upward pressure from the lira\x92s depreciation."" @cagankoc\'s @economics report:'),
 Row(Sentiment=' IFB vice president: https://t.co/oe6lKTyjku https://t.co/YW7KJMEHPC"'),
 Row(Sentiment=' teachers'),
 Row(Sentiment=' 

In [13]:
#apply udf on df column
df=df.withColumn("target_Sentiment", stringNumber("Sentiment"))

In [14]:
df.show()

+--------+----------+--------------------+----------+--------------------+------------------+----------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|target_Sentiment|
+--------+----------+--------------------+----------+--------------------+------------------+----------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|         Neutral|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|        Positive|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|        Positive|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|        Positive|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|        Positive|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|        Positive|
|

In [15]:
drop_list =["UserName","ScreenName","Location","TweetAt","Sentiment"]
data = df.select([column for column in df.columns if column not in drop_list])

In [16]:
data.show()

+--------------------+----------------+
|       OriginalTweet|target_Sentiment|
+--------------------+----------------+
|@MeNyrbie @Phil_G...|         Neutral|
|advice Talk to yo...|        Positive|
|Coronavirus Austr...|        Positive|
|As news of the re...|        Positive|
|"Cashier at groce...|        Positive|
|Due to COVID-19 o...|        Positive|
|For corona preven...|        Negative|
|All month there h...|         Neutral|
|#horningsea is a ...|        Positive|
|For those who are...|        Positive|
|with 100  nations...|        Negative|
|@10DowningStreet ...|        Negative|
|UK #consumer poll...|        Positive|
|In preparation fo...|        Negative|
|This morning I te...|        Negative|
|Went to the super...|         Neutral|
|Worried about the...|        Positive|
|Now I can go to t...|        Positive|
|CHECK VIDEO ?? ht...|        Negative|
|Breaking Story: O...|         Neutral|
+--------------------+----------------+
only showing top 20 rows



In [17]:
data.describe().show()

+-------+--------------------+----------------+
|summary|       OriginalTweet|target_Sentiment|
+-------+--------------------+----------------+
|  count|               22358|           22358|
|   mean|                null|            null|
| stddev|                null|            null|
|    min|      Coronavirus...|        Negative|
|    max|Ça se peut?  Why ...|        Positive|
+-------+--------------------+----------------+



In [18]:
data.printSchema()

root
 |-- OriginalTweet: string (nullable = true)
 |-- target_Sentiment: string (nullable = true)



In [19]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+-------------+----------------+
|OriginalTweet|target_Sentiment|
+-------------+----------------+
|            0|               0|
+-------------+----------------+



In [20]:
tokenizer=RegexTokenizer(inputCol="OriginalTweet", outputCol="words", pattern="\\W")
stopremove=StopWordsRemover(inputCol="words", outputCol="stop_words")
hashtf = HashingTF(numFeatures=2**16, inputCol="stop_words", outputCol='tf')
idf=IDF(inputCol="tf", outputCol="idf_words")

label_stringId = StringIndexer(inputCol = "target_Sentiment", outputCol = "label")



In [21]:
clean_up = VectorAssembler(inputCols=['idf_words'], outputCol= 'features')

In [22]:
pipeline = Pipeline(stages=[tokenizer, stopremove, hashtf, idf, label_stringId, clean_up])

In [23]:
cleaner = pipeline.fit(data)

In [24]:
clean_data = cleaner.transform(data)

In [25]:
clean_data.show()

+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
|       OriginalTweet|target_Sentiment|               words|          stop_words|                  tf|           idf_words|label|            features|
+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+
|@MeNyrbie @Phil_G...|         Neutral|[menyrbie, phil_g...|[menyrbie, phil_g...|(65536,[8455,1542...|(65536,[8455,1542...|  2.0|(65536,[8455,1542...|
|advice Talk to yo...|        Positive|[advice, talk, to...|[advice, talk, ne...|(65536,[3564,3943...|(65536,[3564,3943...|  0.0|(65536,[3564,3943...|
|Coronavirus Austr...|        Positive|[coronavirus, aus...|[coronavirus, aus...|(65536,[2284,4482...|(65536,[2284,4482...|  0.0|(65536,[2284,4482...|
|As news of the re...|        Positive|[as, news, of, th...|[news, region, fi...|(65536,[308,3

In [26]:
clean_data=clean_data.select(['label','features'])

In [27]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|(65536,[8455,1542...|
|  0.0|(65536,[3564,3943...|
|  0.0|(65536,[2284,4482...|
|  0.0|(65536,[308,338,3...|
|  0.0|(65536,[3436,4447...|
|  0.0|(65536,[6122,1139...|
|  1.0|(65536,[298,721,1...|
|  2.0|(65536,[1434,4739...|
|  0.0|(65536,[7713,8615...|
|  0.0|(65536,[2544,1139...|
|  1.0|(65536,[5854,7032...|
|  1.0|(65536,[4260,6601...|
|  0.0|(65536,[1546,2171...|
|  1.0|(65536,[11395,131...|
|  1.0|(65536,[600,1483,...|
|  2.0|(65536,[298,1434,...|
|  0.0|(65536,[3639,3861...|
|  0.0|(65536,[1434,1165...|
|  1.0|(65536,[6397,7150...|
|  2.0|(65536,[7350,1003...|
+-----+--------------------+
only showing top 20 rows



# Model

In [28]:
nb=NaiveBayes()

# Machine Learning

In [29]:
(train_set, test_set) = clean_data.randomSplit([0.7, 0.3])

In [30]:
df_predictor=nb.fit(train_set)

In [31]:
test_results=df_predictor.transform(test_set)

In [32]:
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(65536,[7,251,263...|[-1655.2816646440...|[7.77668790059235...|       1.0|
|  0.0|(65536,[12,106,14...|[-1725.0451859823...|[1.0,2.3819166212...|       0.0|
|  0.0|(65536,[12,205,28...|[-1724.8047957561...|[0.98981871406833...|       0.0|
|  0.0|(65536,[12,649,71...|[-1496.0394748847...|[5.56224471339631...|       1.0|
|  0.0|(65536,[12,1487,2...|[-947.87710181834...|[0.99999999999461...|       0.0|
|  0.0|(65536,[12,2803,8...|[-1050.0668379266...|[4.16276904719195...|       1.0|
|  0.0|(65536,[12,3775,9...|[-881.21097101835...|[5.08921340612163...|       1.0|
|  0.0|(65536,[12,3861,6...|[-1123.6142014100...|[1.0,2.2138875717...|       0.0|
|  0.0|(65536,[12,5914,7...|[-1070.2853989254...|[1.0,6.2406339692...|       0.0|
|  0.0|(65536,[1

In [33]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results)

In [34]:
print("Accuracy of the model is::", acc)

Accuracy of the model is:: 0.59322863821896


# Logistic Regression

In [35]:
LR = LogisticRegression(maxIter=100)
model = LR.fit(train_set)
predictions = model.transform(test_set)

In [36]:
predictions.show(40)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(65536,[7,251,263...|[7.25863032460698...|[0.99990707794316...|       0.0|
|  0.0|(65536,[12,106,14...|[86.2276981431049...|[1.0,3.5471242557...|       0.0|
|  0.0|(65536,[12,205,28...|[51.6063404533206...|[0.20098844762330...|       1.0|
|  0.0|(65536,[12,649,71...|[13.8221454308885...|[2.34513161945049...|       1.0|
|  0.0|(65536,[12,1487,2...|[35.6737128258868...|[0.99999791730854...|       0.0|
|  0.0|(65536,[12,2803,8...|[2.22810001191648...|[6.41157667332719...|       1.0|
|  0.0|(65536,[12,3775,9...|[27.4913712612726...|[1.91043470917600...|       1.0|
|  0.0|(65536,[12,3861,6...|[85.4689953257695...|[1.0,7.2104401005...|       0.0|
|  0.0|(65536,[12,5914,7...|[70.4534159596876...|[1.0,9.1918087244...|       0.0|
|  0.0|(65536,[1

In [37]:
evaluator = BinaryClassificationEvaluator()
acc_LR=evaluator.evaluate(predictions)

In [38]:
print("Accuracy of the model is::", acc_LR)

Accuracy of the model is:: 0.7204163147465437


In [None]:
# Problem 1 completed