In [1]:
import pyspark
from pyspark.sql import SparkSession 
from pyspark import SparkContext 
from pyspark.sql import SQLContext 

from pyspark.ml import Pipeline 

# SPARK-DEFAULT-CONF VARIABLES SETTING
conf = pyspark.SparkConf().setAll([('spark.executor.memory', '16g'), 
                                   ('spark.executor.cores', '1'), 
                                   ('spark.cores.max', '1'), 
                                   ('spark.driver.memory','16g')])
sc = SparkContext.getOrCreate(conf = conf) #Initialize the spark context
sqlContext = SQLContext.getOrCreate(sc) #Create an SQL Context
spark = SparkSession.builder.master("local[*]").getOrCreate() # Intializing a spark session at local instance

In [47]:
sqlContext = SQLContext(sc)
df = sqlContext.read.load("file:///home/nakul/Downloads/Corona_NLP_train.csv",
                         format="com.databricks.spark.csv",header=True,inferSchema=True)
df.show(5)

+--------+------------+--------------------+----------+--------------------+---------+
|UserName|  ScreenName|            Location|   TweetAt|       OriginalTweet|Sentiment|
+--------+------------+--------------------+----------+--------------------+---------+
|    3799|       48751|              London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|    3800|       48752|                  UK|16-03-2020|advice Talk to yo...| Positive|
|    3801|       48753|           Vagabonds|16-03-2020|Coronavirus Austr...| Positive|
|    3802|       48754|                null|16-03-2020|My food stock is ...|     null|
|  PLEASE| don't panic| THERE WILL BE EN...|      null|                null|     null|
+--------+------------+--------------------+----------+--------------------+---------+
only showing top 5 rows



In [5]:
# CHECKING FOR ALL THE NULL VALUES PRESENT IN THE DATA SEQUENTIALLY
from pyspark.sql.functions import col
for i in df.columns:
    print(i,":",df.where(col(i).isNull()).count())

UserName : 11150
ScreenName : 20019
Location : 35678
TweetAt : 30186
OriginalTweet : 30452
Sentiment : 39745


In [48]:
# DROPPING THE NULL VALUES
df=df.na.drop()
df.count()

16288

In [49]:
df.show()

+--------+----------+--------------------+----------+--------------------+------------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|
+--------+----------+--------------------+----------+--------------------+------------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|
|    3808|     48760|    BHAVNAGAR,GUJRAT|16-03-2020|For corona preven...|          Negative|
|    3809|     48761|      Makati, Manila|16-03-2020|All mon

In [16]:
df.show()
tweetDF1 = df.toPandas()

+--------+----------+--------------------+----------+--------------------+------------------+----------------+
|UserName|ScreenName|            Location|   TweetAt|       OriginalTweet|         Sentiment|target_Sentiment|
+--------+----------+--------------------+----------+--------------------+------------------+----------------+
|    3799|     48751|              London|16-03-2020|@MeNyrbie @Phil_G...|           Neutral|         Neutral|
|    3800|     48752|                  UK|16-03-2020|advice Talk to yo...|          Positive|        Positive|
|    3801|     48753|           Vagabonds|16-03-2020|Coronavirus Austr...|          Positive|        Positive|
|    3804|     48756|ÜT: 36.319708,-82...|16-03-2020|As news of the re...|          Positive|        Positive|
|    3805|     48757|35.926541,-78.753267|16-03-2020|"Cashier at groce...|          Positive|        Positive|
|    3807|     48759|     Atlanta, GA USA|16-03-2020|Due to COVID-19 o...|          Positive|        Positive|
|

In [17]:
# DEALING WITH THE IMPORTANT CATEGORICAL VALUES
clean_val = {"Neutral":"Neutral","Positive":"Positive","Negative":"Negative",
             "Extremely Positive":"Positive","Extremely Negative":"Negative"}
tweetDF1 = tweetDF1.replace(clean_val)

In [18]:
DF=spark.createDataFrame(tweetDF1)

In [19]:
DF=DF.withColumn("target", DF["Sentiment"])

In [20]:
DF.show(2)

+--------+----------+--------+----------+--------------------+---------+----------------+--------+
|UserName|ScreenName|Location|   TweetAt|       OriginalTweet|Sentiment|target_Sentiment|  target|
+--------+----------+--------+----------+--------------------+---------+----------------+--------+
|    3799|     48751|  London|16-03-2020|@MeNyrbie @Phil_G...|  Neutral|         Neutral| Neutral|
|    3800|     48752|      UK|16-03-2020|advice Talk to yo...| Positive|        Positive|Positive|
+--------+----------+--------+----------+--------------------+---------+----------------+--------+
only showing top 2 rows



In [50]:
# DROPPING THE FEATURES THAT ARE NOT TO BE USED
DF=DF.drop(DF.TweetAt)
DF=DF.drop(DF.UserName)
DF=DF.drop(DF.ScreenName)
DF=DF.drop(DF.Sentiment)
DF=DF.drop(DF.Location)
data = DF

In [51]:
data.show()

+--------------------+----------------+--------+
|       OriginalTweet|target_Sentiment|  target|
+--------------------+----------------+--------+
|@MeNyrbie @Phil_G...|         Neutral| Neutral|
|advice Talk to yo...|        Positive|Positive|
|Coronavirus Austr...|        Positive|Positive|
|As news of the re...|        Positive|Positive|
|"Cashier at groce...|        Positive|Positive|
|Due to COVID-19 o...|        Positive|Positive|
|For corona preven...|        Negative|Negative|
|All month there h...|         Neutral| Neutral|
|#horningsea is a ...|        Positive|Positive|
|For those who are...|        Positive|Positive|
|with 100  nations...|        Negative|Negative|
|@10DowningStreet ...|        Negative|Negative|
|UK #consumer poll...|        Positive|Positive|
|In preparation fo...|        Negative|Negative|
|This morning I te...|        Negative|Negative|
|Went to the super...|         Neutral| Neutral|
|Worried about the...|        Positive|Positive|
|Now I can go to t..

In [31]:
# SPILTTING THE DATA INTO 3 SETS : 
# TRAINING SET
# TEST SET
# VALIDATION SET
(train_set, val_set, test_set) = data.randomSplit([0.98, 0.01, 0.01], seed = 2000)

In [52]:
# CREATING A PIPEPLINE OF 4 STAGES TO TRANFORM THE TWEET TEXT
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
# TOKENIZING THE TWEETS
tokenizer = Tokenizer(inputCol="OriginalTweet", outputCol="tokens")

# CALCULATING THE Term Frequency — Inverse Document Frequency FOR THE TWEET TEXT
hashtf = HashingTF(numFeatures=2**16, inputCol="tokens", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

# INDEXING THE TARGET SENTIMENT ACCORDING TO THE PROVIDED LABEL
label_stringIdx = StringIndexer(inputCol = "target_Sentiment", outputCol = "label")

# ASSEMBLING THE PIPELINE
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

In [54]:
pipelineFit = pipeline.fit(train_set)
train = pipelineFit.transform(train_set)

In [56]:
#transform model with validataion datasets
val = pipelineFit.transform(val_set)
train.show(5)

+--------------------+----------------+--------------------+--------------------+--------------------+-----+
|       OriginalTweet|target_Sentiment|              tokens|                  tf|            features|label|
+--------------------+----------------+--------------------+--------------------+--------------------+-----+
|    Police office...|        Positive|[, , , , police, ...|(65536,[1434,1511...|(65536,[1434,1511...|  0.0|
|   I told them th...|        Negative|[, , , i, told, t...|(65536,[1198,5660...|(65536,[1198,5660...|  1.0|
|  A revised rail ...|        Positive|[, , a, revised, ...|(65536,[463,1032,...|(65536,[463,1032,...|  0.0|
|  Add your favori...|        Positive|[, , add, your, f...|(65536,[19208,203...|(65536,[19208,203...|  0.0|
|  COVID 19 UPDATE...|        Positive|[, , covid, 19, u...|(65536,[3856,4629...|(65536,[3856,4629...|  0.0|
+--------------------+----------------+--------------------+--------------------+--------------------+-----+
only showing top 5 

In [57]:
# APPLING LOGISTIC REGRESSION ON THE TRAINING SET
from pyspark.ml.classification import LogisticRegression
LR = LogisticRegression(maxIter=100)
model = LR.fit(train)
predictions = model.transform(val)

In [59]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.6807
ROC-AUC: 0.7344


In [60]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.7344114219114218