In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName('NLP').getOrCreate()

# Importing Libraries

In [3]:
import numpy as np

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline

In [4]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.feature import CountVectorizer

In [5]:
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time

# Exploring Data

In [6]:
df = spark.read.csv("Corona_NLP_train.csv", header=True, inferSchema=True)

In [7]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|                null|     null|
|           Stay calm|          stay safe.|                null|

In [8]:
df.select('TweetAt','OriginalTweet','Sentiment').show()

+--------------------+--------------------+---------+
|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+---------+
|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|          16-03-2020|advice Talk to yo...| Positive|
|          16-03-2020|Coronavirus Austr...| Positive|
|          16-03-2020|My food stock is ...|     null|
|                null|                null|     null|
|                null|                null|     null|
|                null|                null|     null|
|          16-03-2020|Me, ready to go a...|     null|
| don't panic. It ...|                null|     null|
|                null|                null|     null|
|          16-03-2020|As news of the re...| Positive|
|          16-03-2020|"Cashier at groce...| Positive|
|          16-03-2020|Was at the superm...|     null|
|                null|                null|     null|
|          16-03-2020|Due to COVID-19 o...| Positive|
|          16-03-2020|For co

In [9]:
df = df.select('TweetAt','OriginalTweet','Sentiment')

In [10]:
df.toPandas()['OriginalTweet'].isnull().sum()

26663

In [11]:
df = df.dropna(subset=('OriginalTweet'))

In [12]:
df.toPandas()['Sentiment'].isnull().sum()

12766

In [13]:
df = df.dropna(subset=('Sentiment'))

In [14]:
import pyspark.ml.feature
from pyspark.ml.feature import IDF

In [15]:
tokenizer = Tokenizer(inputCol='OriginalTweet' , outputCol='tweet')
stopwords_remover = StopWordsRemover(inputCol='tweet', outputCol='filter_tweet')
vectorizer = CountVectorizer(inputCol='filter_tweet' , outputCol='vector_tweet')
idf = IDF(inputCol='vector_tweet' , outputCol='vector_idf')

In [16]:
labelEncoder = StringIndexer(inputCol='Sentiment' , outputCol='label').fit(df)

In [17]:
labelEncoder.transform(df).show(10)

+----------+--------------------+------------------+-----+
|   TweetAt|       OriginalTweet|         Sentiment|label|
+----------+--------------------+------------------+-----+
|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|  2.0|
|16-03-2020|advice Talk to yo...|          Positive|  0.0|
|16-03-2020|Coronavirus Austr...|          Positive|  0.0|
|16-03-2020|As news of the re...|          Positive|  0.0|
|16-03-2020|"Cashier at groce...|          Positive|  0.0|
|16-03-2020|Due to COVID-19 o...|          Positive|  0.0|
|16-03-2020|For corona preven...|          Negative|  1.0|
|16-03-2020|All month there h...|           Neutral|  2.0|
|16-03-2020|#horningsea is a ...|Extremely Positive|  3.0|
|16-03-2020|ADARA Releases CO...|          Positive|  0.0|
+----------+--------------------+------------------+-----+
only showing top 10 rows



In [18]:
df = labelEncoder.transform(df)

In [19]:
train,test = df.randomSplit([0.6,0.4])

In [20]:
lr = LogisticRegression(featuresCol='vector_features' , labelCol='label')

In [21]:
pipeline = Pipeline(stages = [tokenizer, stopwords_remover, vectorizer, idf, lr])

In [22]:
lr_model = pipeline.fit(train)

IllegalArgumentException: vector_features does not exist. Available: TweetAt, OriginalTweet, Sentiment, label, tweet, filter_tweet, vector_tweet, vector_idf