In [110]:
# Create a Spark session
from pyspark.sql import SparkSession

In [111]:
spark=SparkSession.builder.appName('Problem 1').getOrCreate()

In [112]:
# Load the dataset
df=spark.read.csv('Corona_NLP_train.csv', header=True, inferSchema=True)

In [113]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|                null|     null|
|           Stay calm|          stay safe.|                null|

In [114]:
# List of columns
df.columns

['UserName', 'ScreenName', 'Location', 'TweetAt', 'OriginalTweet', 'Sentiment']

In [115]:
# Select columns
df.select('OriginalTweet','Sentiment').show(5)

+--------------------+---------+
|       OriginalTweet|Sentiment|
+--------------------+---------+
|@MeNyrbie @Phil_G...|  Neutral|
|advice Talk to yo...| Positive|
|Coronavirus Austr...| Positive|
|My food stock is ...|     null|
|                null|     null|
+--------------------+---------+
only showing top 5 rows



In [116]:
# Value Counts
df.groupby('Sentiment').count().show()

+--------------------+-----+
|           Sentiment|count|
+--------------------+-----+
|    online education|    1|
| potatoes &amp; v...|    1|
| only a few preli...|    1|
| Vaccines and Tre...|    1|
|              #virus|    1|
| consumer and mor...|    1|
|"" as shoppers ac...|    1|
|      Mumbai or Pune|    1|
|000 tests for COV...|    1|
| spot the scams""...|    1|
| claiming the new...|    1|
| we are confronte...|    1|
| they should add ...|    1|
| as lower oil pri...|    1|
|  closed the borders|    1|
| IFB vice preside...|    1|
|            teachers|    1|
| but we have to a...|    1|
| the company prod...|    1|
| a stock market n...|    1|
+--------------------+-----+
only showing top 20 rows



In [117]:
# Check for Missing values
df.toPandas()['Sentiment'].isnull().sum()

39429

In [118]:
# Drop Missing values
df = df.dropna(subset=('Sentiment'))

In [119]:
# Check for Missing values
df.toPandas()['Sentiment'].isnull().sum()

0

In [120]:
df.show(5)

+--------+----------+--------------------+----------+--------------------+---------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|
+--------+----------+--------------------+----------+--------------------+---------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...| Positive|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...| Positive|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...| Positive|
+--------+----------+--------------------+----------+--------------------+---------+
only showing top 5 rows



# Data Preparation

In [121]:
from pyspark.sql.functions import length

In [122]:
df1 = df.withColumn('length',length(df['OriginalTweet']))

In [123]:
df1.show()

+--------+----------+--------------------+----------+--------------------+------------------+------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|length|
+--------+----------+--------------------+----------+--------------------+------------------+------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|   111|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|   237|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|   131|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|   249|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|   184|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|   280|
|    3808|     48760|    BHAVNAGAR,GUJRAT|16-03-2020|For corona preven...|          Negativ

# Feature Extraction

In [124]:
# Load the libraries
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

In [125]:
df1.show(5)

+--------+----------+--------------------+----------+--------------------+---------+------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|length|
+--------+----------+--------------------+----------+--------------------+---------+------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|   111|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...| Positive|   237|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|   131|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...| Positive|   249|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...| Positive|   184|
+--------+----------+--------------------+----------+--------------------+---------+------+
only showing top 5 rows



In [126]:
# Stages for the Pipeline
tokenizer = Tokenizer(inputCol='OriginalTweet',outputCol='mytokens')
stopwords_remover = StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens')
vectorizer = CountVectorizer(inputCol='filtered_tokens',outputCol='rawFeatures')
idf = IDF(inputCol='rawFeatures', outputCol='vectorizedFeatures')

In [127]:
#Label Encoding
labelencoder = StringIndexer(inputCol='Sentiment',outputCol='label').fit(df1)

In [128]:
labelencoder.transform(df1).show(5)

+--------+----------+--------------------+----------+--------------------+---------+------+-----+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|length|label|
+--------+----------+--------------------+----------+--------------------+---------+------+-----+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|   111|  2.0|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...| Positive|   237|  0.0|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|   131|  0.0|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...| Positive|   249|  0.0|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...| Positive|   184|  0.0|
+--------+----------+--------------------+----------+--------------------+---------+------+-----+
only showing top 5 rows



In [129]:
labelencoder.labels

['Positive',
 'Negative',
 'Neutral',
 'Extremely Positive',
 'Extremely Negative',
 'social distancing',
 ' N. Y. - April 10',
 ' Corona Virus',
 ' Stay with us',
 ' but we also need to change other activities that demand more and more forest land',
 ' delivery',
 ' ecological collapse',
 ' however',
 ' just ""selfish pigs who only think about themselves"".',
 ' not going to the pharmacy',
 ' of course',
 ' or click the links"" #AlwaysWatchingOutForYou',
 ' state governors say they\x92re now bidding against federal agencies and each other for scarce supplies',
 ' supermarket workers',
 ' "" Well covid-19...""  She rolled her eyes and "',
 ' ""1 lakh beds already prepared for COVID-19 testing."" and ""India has enough medicine and food stock""',
 ' ""We\'ll be\x85 https://t.co/e49zUcJV70"',
 ' ""Who needs BlackFriday anyway with steep discounts to cream it when you got COVID?"" #GreedyBastards https://t.co/cz7v14RzzA"',
 ' ""You stay at home for Us"". ',
 ' ""You\'d better stock up!""'

In [130]:
# Dict of Labels
label_dict = {'Positive':0.0,
 'Negative':1.0,
 'Neutral':2.0,
 'Extremely Positive':3.0,
 'Extremely Negative':4.0}

In [131]:
df1.show()

+--------+----------+--------------------+----------+--------------------+------------------+------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|length|
+--------+----------+--------------------+----------+--------------------+------------------+------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|   111|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|   237|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|   131|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|   249|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|   184|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|   280|
|    3808|     48760|    BHAVNAGAR,GUJRAT|16-03-2020|For corona preven...|          Negativ

In [132]:
df1 = labelencoder.transform(df1)

In [133]:
df1.show(5)

+--------+----------+--------------------+----------+--------------------+---------+------+-----+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|length|label|
+--------+----------+--------------------+----------+--------------------+---------+------+-----+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|   111|  2.0|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...| Positive|   237|  0.0|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|   131|  0.0|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...| Positive|   249|  0.0|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...| Positive|   184|  0.0|
+--------+----------+--------------------+----------+--------------------+---------+------+-----+
only showing top 5 rows



In [134]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [135]:
clean_up = VectorAssembler(inputCols=['vectorizedFeatures','length'],outputCol='features')

#  Model 

In [137]:
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, DecisionTreeClassifier
nb=NaiveBayes()
rfc=RandomForestClassifier(numTrees=350)
dtc=DecisionTreeClassifier(maxDepth=30)

# Building the Pipeline

In [138]:
from pyspark.ml import Pipeline

In [158]:
Pipeline = Pipeline(stages= [tokenizer,stopwords_remover,vectorizer,idf,labelencoder,clean_up])

In [159]:
pipelineModel = df_Pipeline.fit(df1)

In [160]:
clean_data = pipelineModel.transform(df1)

IllegalArgumentException: requirement failed: Output column label already exists.