In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, Tokenizer
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, count, isnan, udf, when
from pyspark.sql.types import StringType

# Build ONE SparkSession from a single config (including the master) and derive the
# SparkContext / legacy SQLContext from it. Creating the SparkContext first and then
# calling SparkSession.builder.master(...).getOrCreate() reuses the existing context,
# so the builder's master setting would be ignored.
conf = (
    pyspark.SparkConf()
    .setMaster("local[*]")
    .setAll([
        ('spark.executor.memory', '16g'),
        ('spark.executor.cores', '1'),
        # NOTE(review): spark.cores.max is a standalone/Mesos cluster setting; with
        # master local[*] all local cores are used. Use "local[1]" if a single core
        # was intended.
        ('spark.cores.max', '1'),
        ('spark.driver.memory', '16g'),
    ])
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# Legacy handle kept because later cells read data through `sqlContext`.
sqlContext = SQLContext.getOrCreate(sc)

In [None]:
file_path = "Corona_NLP_train.csv"

# Many tweets span several lines and contain doubled quotes ("") inside quoted
# fields. A plain CSV read splits such a tweet into several bogus rows with shifted
# columns and null Sentiment values. multiLine lets a quoted field contain newlines,
# and escape='"' makes "" decode as a literal quote (Spark's default escape is "\").
# NOTE(review): if non-ASCII characters come out garbled, the file may not be UTF-8;
# set the "encoding" option accordingly.
tweets = (
    sqlContext.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .csv(file_path)
)
tweets.show()


+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|                null|     null|
|           Stay calm|          stay safe.|                null|

In [None]:
# Count missing values per column. NaN can only occur in float/double columns, so
# isnan() is applied only there; other columns are checked for null alone. Calling
# isnan() on string columns relies on an implicit cast to double, which is a silent
# no-op by default and may raise when ANSI SQL mode is enabled.
float_columns = {name for name, dtype in tweets.dtypes if dtype in ("float", "double")}


def _is_missing(column_name):
    """Boolean Column: true where `column_name` is null, or NaN for float/double columns."""
    condition = col(column_name).isNull()
    if column_name in float_columns:
        condition = condition | isnan(col(column_name))
    return condition


tweets.select([count(when(_is_missing(c), c)).alias(c) for c in tweets.columns]).show()  # Check for null values

+--------+----------+--------+-------+-------------+---------+
|UserName|ScreenName|Location|TweetAt|OriginalTweet|Sentiment|
+--------+----------+--------+-------+-------------+---------+
|       4|     12417|   33799|  26311|        26663|    39429|
+--------+----------+--------+-------+-------------+---------+



In [None]:
# Drop only rows missing the columns the model actually uses. Location, TweetAt,
# UserName and ScreenName are discarded later, so a null there should not cost a
# training example.
tweets = tweets.na.drop(how="any", subset=["OriginalTweet", "Sentiment"])

In [None]:
# Preview the rows that survived the null filter (first 20 rows, truncated cells).
tweets.show(n=20)

+--------+----------+--------------------+----------+--------------------+------------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|
+--------+----------+--------------------+----------+--------------------+------------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|
|    3808|     48760|    BHAVNAGAR,GUJRAT|16-03-2020|For corona preven...|          Negative|
|    3809|     48761|      Makati, Manila|16-03-2020|All mon

In [None]:
# from pyspark.ml.feature import StringIndexer
# indexers = [StringIndexer(inputCol="Sentiment", outputCol="Target").fit(df)]

In [None]:
# pipeline = Pipeline(stages=indexers)
# df_r = pipeline.fit(tweets).transform(tweets)

In [None]:
# df_r.show()

In [None]:
# Collapse the dataset's five sentiment labels into three classes:
#   "Extremely Positive" / "Positive" -> "Positive"
#   "Extremely Negative" / "Negative" -> "Negative"
#   anything else ("Neutral", None, unexpected values) -> "Neutral"

def decode_sentiment(label):
    """Map a raw Sentiment label to "Positive", "Negative" or "Neutral".

    Args:
        label: Value of the ``Sentiment`` column (a string, or None for nulls).

    Returns:
        "Positive" for "Positive"/"Extremely Positive", "Negative" for
        "Negative"/"Extremely Negative", and "Neutral" for every other value.
    """
    positive_labels = ("Positive", "Extremely Positive")
    negative_labels = ("Negative", "Extremely Negative")
    if label in positive_labels:
        return "Positive"
    if label in negative_labels:
        return "Negative"
    return "Neutral"

In [None]:
# Spark UDF around decode_sentiment; produces a string label column
# (despite the name, the values are "Positive"/"Negative"/"Neutral", not numbers).
stringNumber = udf(decode_sentiment, returnType="string")

In [None]:
# tweets.select('Sentiment').distinct().collect()


In [None]:
# Add the three-class label alongside the original five-class Sentiment column.
tweets = tweets.withColumn(
    "target_Sentiment",
    stringNumber("Sentiment"),
)


In [None]:
# Spot-check that target_Sentiment was derived correctly.
tweets.show(n=2)

+--------+----------+--------+----------+--------------------+---------+----------------+
|UserName|ScreenName|Location|   TweetAt|       OriginalTweet|Sentiment|target_Sentiment|
+--------+----------+--------+----------+--------------------+---------+----------------+
|    3799|     48751|  London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|         Neutral|
|    3800|     48752|      UK|16-03-2020|advice Talk to yo...| Positive|        Positive|
+--------+----------+--------+----------+--------------------+---------+----------------+
only showing top 2 rows



In [None]:
# Keep only the tweet text and the collapsed label; the other columns are not model inputs.
drop_list = ["UserName", "ScreenName", "Location", "TweetAt", "Sentiment"]
data = tweets.drop(*drop_list)

In [None]:
# Preview the two-column modelling frame.
data.show(n=20)

+--------------------+----------------+
|       OriginalTweet|target_Sentiment|
+--------------------+----------------+
|@MeNyrbie @Phil_G...|         Neutral|
|advice Talk to yo...|        Positive|
|Coronavirus Austr...|        Positive|
|As news of the re...|        Positive|
|"Cashier at groce...|        Positive|
|Due to COVID-19 o...|        Positive|
|For corona preven...|        Negative|
|All month there h...|         Neutral|
|#horningsea is a ...|        Positive|
|For those who are...|        Positive|
|with 100  nations...|        Negative|
|@10DowningStreet ...|        Negative|
|UK #consumer poll...|        Positive|
|In preparation fo...|        Negative|
|This morning I te...|        Negative|
|Went to the super...|         Neutral|
|Worried about the...|        Positive|
|Now I can go to t...|        Positive|
|CHECK VIDEO ?? ht...|        Negative|
|Breaking Story: O...|         Neutral|
+--------------------+----------------+
only showing top 20 rows



#There are no null values left in `data`: rows missing a tweet or a sentiment were removed by the `na.drop` step above.

In [None]:
# Train / validation / test proportions; the fixed seed makes the split reproducible.
# NOTE(review): 1% each leaves only a small validation and test sample, and
# test_set is not used by any later cell shown here.
SPLIT_WEIGHTS = [0.98, 0.01, 0.01]
train_set, val_set, test_set = data.randomSplit(SPLIT_WEIGHTS, seed=2000)

In [None]:
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer, StringIndexer

# Lower-case the tweet and split it on runs of whitespace. RegexTokenizer's defaults
# (gaps=True, minTokenLength=1, toLowercase=True) discard the empty strings that the
# plain Tokenizer emitted for leading or repeated whitespace (e.g. [, , , police, ...]),
# which otherwise become a meaningless high-frequency feature.
tokenizer = RegexTokenizer(inputCol="OriginalTweet", outputCol="words", pattern="\\s+")

# Hashed term frequencies over 2**16 buckets, re-weighted by IDF; terms appearing in
# fewer than minDocFreq training documents are filtered out by IDF.
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)

# Encode the three-class target as a numeric "label" column for the classifier.
label_stringIdx = StringIndexer(inputCol="target_Sentiment", outputCol="label")

pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

In [None]:
# Fit the tokenizer / TF-IDF / label indexer on the training split only,
# then featurize that same split.
pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(
    train_set
)

In [None]:
# Preview the featurized training rows, then featurize the validation split with the
# already-fitted pipeline (no refitting, so no validation data leaks into the IDF).
train_df.show(n=5)
val_df = pipelineFit.transform(val_set)

+--------------------+----------------+--------------------+--------------------+--------------------+-----+
|       OriginalTweet|target_Sentiment|               words|                  tf|            features|label|
+--------------------+----------------+--------------------+--------------------+--------------------+-----+
|    Police office...|        Positive|[, , , , police, ...|(65536,[1434,1511...|(65536,[1434,1511...|  0.0|
|   I told them th...|        Negative|[, , , i, told, t...|(65536,[1198,5660...|(65536,[1198,5660...|  1.0|
|  A revised rail ...|        Positive|[, , a, revised, ...|(65536,[463,1032,...|(65536,[463,1032,...|  0.0|
|  Add your favori...|        Positive|[, , add, your, f...|(65536,[19208,203...|(65536,[19208,203...|  0.0|
|  COVID 19 UPDATE...|        Positive|[, , covid, 19, u...|(65536,[3856,4629...|(65536,[3856,4629...|  0.0|
+--------------------+----------------+--------------------+--------------------+--------------------+-----+
only showing top 5 

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Logistic regression on the TF-IDF "features" column, predicting the indexed "label".
LR = LogisticRegression(maxIter=100)
model = LR.fit(train_df)
predictions = model.transform(val_df)

# Score the validation split so the model's quality is visible, not just its predictions.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
val_accuracy = evaluator.evaluate(predictions)
val_accuracy

In [None]:
import pandas as pd

# Hand-written tweets for a quick sanity check of model_predict
# (not the held-out test_set split, despite the variable names).
sample_tweets = [
    "i love to go shopping",
    'I hate the Master Chef US, its streaming this Friday on Fox #masterchef',
    'i love cooking',
]
test_data_sets = {'OriginalTweet': sample_tweets}

test_result = sqlContext.createDataFrame(pd.DataFrame(test_data_sets))

In [None]:
# Confirm the sample tweets were loaded into Spark.
test_result.show(n=20)

+--------------------+
|       OriginalTweet|
+--------------------+
|i love to go shop...|
|I hate the Master...|
|      i love cooking|
+--------------------+



In [None]:
def model_predict(test_, feature_pipeline=None, classifier=None):
    """Featurize raw tweets and run a fitted classifier on them.

    Args:
        test_: DataFrame of tweets. Presumably only the ``OriginalTweet`` column is
            required; the sample frame used with this function has just that column.
        feature_pipeline: Fitted feature transformer (anything with ``.transform``).
            Defaults to the notebook's global ``pipelineFit`` when None.
        classifier: Fitted model (anything with ``.transform``). Defaults to the
            notebook's global ``model`` when None.

    Returns:
        The classifier's output for the featurized rows. The ``prediction`` values are
        label indices assigned by the fitted StringIndexer, not sentiment strings.
    """
    # Resolve defaults at call time so the function still works after the globals are refit.
    if feature_pipeline is None:
        feature_pipeline = pipelineFit
    if classifier is None:
        classifier = model
    features = feature_pipeline.transform(test_)
    return classifier.transform(features)

In [None]:
# Run the hand-written examples through the fitted pipeline and classifier,
# then show the predicted label indices.
pred = model_predict(test_result)
pred.select(["prediction"]).show()

+----------+
|prediction|
+----------+
|       0.0|
|       1.0|
|       0.0|
+----------+

