### HADOOP COMMANDS
### 1. STARTING HADOOP
##### allstart.sh
### 2. COPYING THE CORONA FILE FROM LOCAL TO HADOOP
#### hadoop fs -copyFromLocal Corona_NLP_train.csv
### 3. OPENING JUPYTER NOTEBOOK 
##### pysparknb

## Resham Vyas
## 015028

### Importing Libraries

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName('nlp').getOrCreate()

In [3]:
import numpy as np

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline

In [4]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.feature import CountVectorizer

In [5]:
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time

### Import Data

In [6]:
df = spark.read.csv("Corona_NLP_train.csv", sep=",", header=True, inferSchema=True)

### Data Exploration

In [7]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|                null|     null|
|           Stay calm|          stay safe.|                null|

In [8]:
df.columns

['UserName', 'ScreenName', 'Location', 'TweetAt', 'OriginalTweet', 'Sentiment']

In [9]:
len(df.columns)

6

In [10]:
df.printSchema()

root
 |-- UserName: string (nullable = true)
 |-- ScreenName: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- TweetAt: string (nullable = true)
 |-- OriginalTweet: string (nullable = true)
 |-- Sentiment: string (nullable = true)



In [11]:
df.head()

Row(UserName='3799', ScreenName='48751', Location='London', TweetAt='16-03-2020', OriginalTweet='@MeNyrbie @Phil_Gahan @Chrisitv https://t.co/iFz9FAn2Pa and https://t.co/xX6ghGFzCC and https://t.co/I2NlzdxNo8', Sentiment='Neutral')

In [12]:
df.tail(3)

[Row(UserName='44954', ScreenName='89906', Location=None, TweetAt='14-04-2020', OriginalTweet='Is it wrong that the smell of hand sanitizer is starting to turn me on?', Sentiment=None),
 Row(UserName='#coronavirus #COVID19 #coronavirus"', ScreenName='Neutral', Location=None, TweetAt=None, OriginalTweet=None, Sentiment=None),
 Row(UserName='44955', ScreenName='89907', Location='i love you so much || he/him', TweetAt='14-04-2020', OriginalTweet="@TartiiCat Well new/used Rift S are going for $700.00 on Amazon rn although the normal market price is usually $400.00 . Prices are really crazy right now for vr headsets since HL Alex was announced and it's only been worse with COVID-19. Up to you whethe", Sentiment='Negative')]

In [13]:
df.count()

68046

In [14]:
df.describe().show()

+-------+--------------------+-------------------+--------------------+------------+--------------------+--------------------+
|summary|            UserName|         ScreenName|            Location|     TweetAt|       OriginalTweet|           Sentiment|
+-------+--------------------+-------------------+--------------------+------------+--------------------+--------------------+
|  count|               68042|              55629|               34247|       41735|               41383|               28617|
|   mean| 8.975066046523207E7| 150387.47623557984|    1.76800437504E10|        10.0|               682.0|              1016.0|
| stddev|  9.10012357766514E9|1.645384113616883E7|8.839999088543759E10|         NaN|  1176.0629234866644|  1418.4562030602144|
|    min|                 ...|                   |                    |            |      Coronavirus...| "" Well covid-19...|
|    max|Ø  With April 1 d...|         but for us|ï? ???????'? ????...| she says."|Ø  As buyers stoc...| when 

In [15]:
df.select('TweetAt','OriginalTweet','Sentiment').show()

+--------------------+--------------------+---------+
|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+---------+
|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|          16-03-2020|advice Talk to yo...| Positive|
|          16-03-2020|Coronavirus Austr...| Positive|
|          16-03-2020|My food stock is ...|     null|
|                null|                null|     null|
|                null|                null|     null|
|                null|                null|     null|
|          16-03-2020|Me, ready to go a...|     null|
| don't panic. It ...|                null|     null|
|                null|                null|     null|
|          16-03-2020|As news of the re...| Positive|
|          16-03-2020|"Cashier at groce...| Positive|
|          16-03-2020|Was at the superm...|     null|
|                null|                null|     null|
|          16-03-2020|Due to COVID-19 o...| Positive|
|          16-03-2020|For co

In [16]:
df = df.select('TweetAt','OriginalTweet','Sentiment')

In [17]:
df.toPandas()['OriginalTweet'].isnull().sum()

26663

In [18]:
df = df.dropna(subset=('OriginalTweet'))

In [19]:
df.show()

+----------+--------------------+------------------+
|   TweetAt|       OriginalTweet|         Sentiment|
+----------+--------------------+------------------+
|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|
|16-03-2020|advice Talk to yo...|          Positive|
|16-03-2020|Coronavirus Austr...|          Positive|
|16-03-2020|My food stock is ...|              null|
|16-03-2020|Me, ready to go a...|              null|
|16-03-2020|As news of the re...|          Positive|
|16-03-2020|"Cashier at groce...|          Positive|
|16-03-2020|Was at the superm...|              null|
|16-03-2020|Due to COVID-19 o...|          Positive|
|16-03-2020|For corona preven...|          Negative|
|16-03-2020|All month there h...|           Neutral|
|16-03-2020|Due to the Covid-...|              null|
|16-03-2020|#horningsea is a ...|Extremely Positive|
|16-03-2020|Me: I don't need ...|              null|
|16-03-2020|ADARA Releases CO...|          Positive|
|16-03-2020|Lines at the groc...|             

In [20]:
df.toPandas()['Sentiment'].isnull().sum()

12766

In [21]:
df = df.dropna(subset=('Sentiment'))

In [22]:
df.show()

+----------+--------------------+------------------+
|   TweetAt|       OriginalTweet|         Sentiment|
+----------+--------------------+------------------+
|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|
|16-03-2020|advice Talk to yo...|          Positive|
|16-03-2020|Coronavirus Austr...|          Positive|
|16-03-2020|As news of the re...|          Positive|
|16-03-2020|"Cashier at groce...|          Positive|
|16-03-2020|Due to COVID-19 o...|          Positive|
|16-03-2020|For corona preven...|          Negative|
|16-03-2020|All month there h...|           Neutral|
|16-03-2020|#horningsea is a ...|Extremely Positive|
|16-03-2020|ADARA Releases CO...|          Positive|
|16-03-2020|For those who are...|          Positive|
|16-03-2020|with 100  nations...|Extremely Negative|
|16-03-2020|@10DowningStreet ...|          Negative|
|16-03-2020|UK #consumer poll...|Extremely Positive|
|16-03-2020|In preparation fo...|          Negative|
|16-03-2020|This morning I te...|Extremely Neg

In [23]:
import re
from pyspark.sql.functions import regexp_replace

In [24]:
df.withColumn("OriginalTweet", regexp_replace(col("OriginalTweet"), "/[^0-9A-Za-z t]+/" , ""))

DataFrame[TweetAt: string, OriginalTweet: string, Sentiment: string]

### Feature Engineering

In [25]:
import pyspark.ml.feature
from pyspark.ml.feature import IDF

In [26]:
####Initialising the pipeline stages
tokenizer = Tokenizer(inputCol='OriginalTweet' , outputCol='words')
stopwords_remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')
vectorizer = CountVectorizer(inputCol='filtered_words' , outputCol='vector_words')
idf = IDF(inputCol='vector_words' , outputCol='vectorized_features')

In [27]:
####Adding Labels
labelEncoder = StringIndexer(inputCol='Sentiment' , outputCol='label').fit(df)

In [28]:
labelEncoder.transform(df).show(10)

+----------+--------------------+------------------+-----+
|   TweetAt|       OriginalTweet|         Sentiment|label|
+----------+--------------------+------------------+-----+
|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|  2.0|
|16-03-2020|advice Talk to yo...|          Positive|  0.0|
|16-03-2020|Coronavirus Austr...|          Positive|  0.0|
|16-03-2020|As news of the re...|          Positive|  0.0|
|16-03-2020|"Cashier at groce...|          Positive|  0.0|
|16-03-2020|Due to COVID-19 o...|          Positive|  0.0|
|16-03-2020|For corona preven...|          Negative|  1.0|
|16-03-2020|All month there h...|           Neutral|  2.0|
|16-03-2020|#horningsea is a ...|Extremely Positive|  3.0|
|16-03-2020|ADARA Releases CO...|          Positive|  0.0|
+----------+--------------------+------------------+-----+
only showing top 10 rows



In [29]:
df = labelEncoder.transform(df)

In [30]:
df.show()

+----------+--------------------+------------------+-----+
|   TweetAt|       OriginalTweet|         Sentiment|label|
+----------+--------------------+------------------+-----+
|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|  2.0|
|16-03-2020|advice Talk to yo...|          Positive|  0.0|
|16-03-2020|Coronavirus Austr...|          Positive|  0.0|
|16-03-2020|As news of the re...|          Positive|  0.0|
|16-03-2020|"Cashier at groce...|          Positive|  0.0|
|16-03-2020|Due to COVID-19 o...|          Positive|  0.0|
|16-03-2020|For corona preven...|          Negative|  1.0|
|16-03-2020|All month there h...|           Neutral|  2.0|
|16-03-2020|#horningsea is a ...|Extremely Positive|  3.0|
|16-03-2020|ADARA Releases CO...|          Positive|  0.0|
|16-03-2020|For those who are...|          Positive|  0.0|
|16-03-2020|with 100  nations...|Extremely Negative|  4.0|
|16-03-2020|@10DowningStreet ...|          Negative|  1.0|
|16-03-2020|UK #consumer poll...|Extremely Positive|  3.

In [31]:
### Positive = 0.0
### Negative = 1.0
### Neutral  = 2.0
### Extermely Positive = 3.0
### Extremely Negative = 4.0

### Model

In [32]:
train,test = df.randomSplit([0.6,0.4])

In [33]:
lr = LogisticRegression(featuresCol='vectorized_features' , labelCol='label')

### Pipeline

In [34]:
pipeline = Pipeline(stages = [tokenizer, stopwords_remover, vectorizer, idf, lr])

### Building Model

In [35]:
lr_model = pipeline.fit(train)

Py4JJavaError: An error occurred while calling o154.fit.
: java.lang.OutOfMemoryError: Java heap space
	at java.lang.reflect.Array.newInstance(Array.java:75)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1939)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1517)
	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:826)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:226)
	at org.apache.spark.broadcast.TorrentBroadcast$$Lambda$2061/295050171.apply(Unknown Source)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:223)
	at org.apache.spark.broadcast.TorrentBroadcast$$Lambda$2059/645461649.apply(Unknown Source)
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:218)
	at org.apache.spark.broadcast.TorrentBroadcast$$Lambda$2058/1363606530.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1343)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:218)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.ml.optim.aggregator.LogisticAggregator.<init>(LogisticAggregator.scala:194)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$13(LogisticRegression.scala:600)
	at org.apache.spark.ml.classification.LogisticRegression$$Lambda$3490/596126736.apply(Unknown Source)
	at org.apache.spark.ml.optim.loss.RDDLossFunction.calculate(RDDLossFunction.scala:58)
	at org.apache.spark.ml.optim.loss.RDDLossFunction.calculate(RDDLossFunction.scala:47)
