# Big Data and Data Analytics for Managers 

## Merlin John - 015027

Perform Text Classification on Coronavirus tweets using Py spark


Columns:

1) Location
2) Tweet At
3) Original Tweet
4) Label

In [76]:
from pyspark.sql import SparkSession

In [77]:
spark=SparkSession.builder.appName('nlpML').getOrCreate()

In [78]:
data=spark.read.csv('Corona_NLP_train.csv', header= True, inferSchema=True)

In [79]:
data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|                null|     null|
|           Stay calm|          stay safe.|                null|

In [None]:
data.printSchema()

In [80]:
data.describe().show()

+-------+--------------------+-------------------+--------------------+------------+--------------------+--------------------+
|summary|            UserName|         ScreenName|            Location|     TweetAt|       OriginalTweet|           Sentiment|
+-------+--------------------+-------------------+--------------------+------------+--------------------+--------------------+
|  count|               68042|              55629|               34247|       41735|               41383|               28617|
|   mean| 8.975066046523207E7| 150387.47623557984|    1.76800437504E10|        10.0|               682.0|              1016.0|
| stddev|  9.10012357766514E9|1.645384113616883E7|8.839999088543759E10|         NaN|  1176.0629234866644|  1418.4562030602144|
|    min|                 ...|                   |                    |            |      Coronavirus...| "" Well covid-19...|
|    max|Ø  With April 1 d...|         but for us|ï? ???????'? ????...| she says."|Ø  As buyers stoc...| when 

# Data Prepration

In [81]:
result_df = data.groupBy("Location").count().sort("Location", ascending=False)
result_df.show(10)

+--------------------+-----+
|            Location|count|
+--------------------+-----+
|ï? ???????'? ????...|    1|
|í ?í?? í? ? ?????...|    1|
|à l'échelle mondiale|    1|
|ÜT: 59.19408,17.6...|    1|
|ÜT: 54.975455,-1....|    1|
|ÜT: 53.839856,-0....|    1|
|ÜT: 51.56353,-0.0...|    2|
|ÜT: 51.560275,-0....|    1|
|ÜT: 51.512407,-0....|    1|
|ÜT: 51.4761159,-2...|    1|
+--------------------+-----+
only showing top 10 rows



In [82]:
from pyspark.sql.functions import length

In [83]:
data=data.withColumn('length', length(data['OriginalTweet']))

In [84]:
data.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------+
|            UserName|          ScreenName|            Location|             TweetAt|       OriginalTweet|Sentiment|length|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------+
|                3799|               48751|              London|          16-03-2020|@MeNyrbie @Phil_G...|  Neutral|   111|
|                3800|               48752|                  UK|          16-03-2020|advice Talk to yo...| Positive|   237|
|                3801|               48753|           Vagabonds|          16-03-2020|Coronavirus Austr...| Positive|   131|
|                3802|               48754|                null|          16-03-2020|My food stock is ...|     null|    51|
|              PLEASE|         don't panic| THERE WILL BE EN...|                null|                null|     null|  null|
|       

In [85]:
data.filter(data["Sentiment"]=="Negative").show(10)

+--------+----------+------------------+----------+--------------------+---------+------+
|UserName|ScreenName|          Location|   TweetAt|       OriginalTweet|Sentiment|length|
+--------+----------+------------------+----------+--------------------+---------+------+
|    3808|     48760|  BHAVNAGAR,GUJRAT|16-03-2020|For corona preven...| Negative|   267|
|    3823|     48775|  Downstage centre|16-03-2020|@10DowningStreet ...| Negative|   255|
|    3825|     48777|    Ketchum, Idaho|16-03-2020|In preparation fo...| Negative|   202|
|    3829|     48781|              null|16-03-2020|There Is of in th...| Negative|   114|
|    3837|     48789|              null|16-03-2020|my wife works ret...| Negative|   288|
|    3851|     48803|         Ogden, UT|16-03-2020|Why we stock up o...| Negative|   284|
|    3865|     48817|The European Union|16-03-2020|Seen in a Faceboo...| Negative|   259|
|    3866|     48818|   London, England|16-03-2020|@BobJLowe Sadly t...| Negative|   173|
|    3876|

In [86]:
df=data.drop("UserName","ScreenName","Location","TweetAt")

In [87]:
df.show(10)

+--------------------+---------+------+
|       OriginalTweet|Sentiment|length|
+--------------------+---------+------+
|@MeNyrbie @Phil_G...|  Neutral|   111|
|advice Talk to yo...| Positive|   237|
|Coronavirus Austr...| Positive|   131|
|My food stock is ...|     null|    51|
|                null|     null|  null|
|                null|     null|  null|
|                null|     null|  null|
|Me, ready to go a...|     null|    60|
|                null|     null|  null|
|                null|     null|  null|
+--------------------+---------+------+
only showing top 10 rows



In [88]:
from pyspark.sql.types import StructType, StructField,StringType, IntegerType,FloatType
df.na.drop().show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+------+
|OriginalTweet                                                                                                                                                                                                                                                                                   |Sentiment         |length|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+------+
|@MeNyrbie @Phil_Gahan @Chrisitv https://t.co/iFz

# Features Transformation

In [89]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

tokenizer=Tokenizer(inputCol="OriginalTweet", outputCol="token_text")
stopremove=StopWordsRemover(inputCol="token_text", outputCol="stop_tokens")
count_vec=CountVectorizer(inputCol="stop_tokens", outputCol="c_vec")
idf=IDF(inputCol="c_vec", outputCol="tf_idf")

# we also need to convert our labels in numbers
ham_samp_to_num = StringIndexer(inputCol="Sentiment", outputCol='label')


In [90]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

In [91]:
clean_up = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')

# Model

In [92]:
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, DecisionTreeClassifier

dtc=DecisionTreeClassifier(maxDepth=15)
rf=RandomForestClassifier(numTrees=200)
nb=NaiveBayes()

# Pipeline

In [93]:
from pyspark.ml import Pipeline
data_prep_pipeline= Pipeline(stages=[ham_samp_to_num, tokenizer, stopremove,count_vec, idf,clean_up])

In [94]:
cleaner=data_prep_pipeline.fit(df)

Py4JJavaError: An error occurred while calling o504.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 89.0 failed 1 times, most recent failure: Lost task 2.0 in stage 89.0 (TID 1743, master, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(Tokenizer$$Lambda$3170/1617889843: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer.$anonfun$createTransformFunc$1(Tokenizer.scala:40)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
	at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:233)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(Tokenizer$$Lambda$3170/1617889843: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer.$anonfun$createTransformFunc$1(Tokenizer.scala:40)
	... 19 more


In [97]:
clean_data=cleaner.transform(df)

NameError: name 'cleaner' is not defined

In [98]:
clean_data.show()

NameError: name 'clean_data' is not defined

In [28]:
clean_data=clean_data.select(['label', 'features'])

In [29]:
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
|  0.0|(13424,[0,70,80,1...|
|  0.0|(13424,[36,134,31...|
|  1.0|(13424,[10,60,139...|
|  0.0|(13424,[10,53,103...|
|  0.0|(13424,[125,184,4...|
|  1.0|(13424,[1,47,118,...|
|  1.0|(13424,[0,1,13,27...|
|  0.0|(13424,[18,43,120...|
|  1.0|(13424,[8,17,37,8...|
|  1.0|(13424,[13,30,47,...|
|  0.0|(13424,[39,96,217...|
|  0.0|(13424,[552,1697,...|
|  1.0|(13424,[30,109,11...|
|  0.0|(13424,[82,214,47...|
|  0.0|(13424,[0,2,49,13...|
|  0.0|(13424,[0,74,105,...|
|  1.0|(13424,[4,30,33,5...|
+-----+--------------------+
only showing top 20 rows



# ML Training

In [30]:
(training, testing)=clean_data.randomSplit([0.8,0.2])

### Decision Tree Classifier

In [63]:
spam_predictor=dtc.fit(training)

In [64]:
test_results=spam_predictor.transform(testing)

In [65]:
test_results.show()

+-----+--------------------+-------------+--------------------+----------+
|label|            features|rawPrediction|         probability|prediction|
+-----+--------------------+-------------+--------------------+----------+
|  0.0|(13424,[0,1,2,41,...|    [0.0,2.0]|           [0.0,1.0]|       1.0|
|  0.0|(13424,[0,1,5,15,...|    [6.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,5,20,...|    [0.0,1.0]|           [0.0,1.0]|       1.0|
|  0.0|(13424,[0,1,9,14,...|    [8.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,14,18...|    [4.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,20,27...|    [8.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,30,12...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,31,43...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,416,6...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,3657,...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,2,3,4,6.

In [66]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [67]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results)

In [68]:
print ("Accuracy of the model is::", acc)

Accuracy of the model is:: 0.9501735633322752


### Random Forest Classifier

In [63]:
spam_predictor=rf.fit(training)

In [64]:
test_results=spam_predictor.transform(testing)

In [65]:
test_results.show()

+-----+--------------------+-------------+--------------------+----------+
|label|            features|rawPrediction|         probability|prediction|
+-----+--------------------+-------------+--------------------+----------+
|  0.0|(13424,[0,1,2,41,...|    [0.0,2.0]|           [0.0,1.0]|       1.0|
|  0.0|(13424,[0,1,5,15,...|    [6.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,5,20,...|    [0.0,1.0]|           [0.0,1.0]|       1.0|
|  0.0|(13424,[0,1,9,14,...|    [8.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,14,18...|    [4.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,20,27...|    [8.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,30,12...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,31,43...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,416,6...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,3657,...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,2,3,4,6.

In [67]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results)

In [68]:
print ("Accuracy of the model is::", acc)

Accuracy of the model is:: 0.9501735633322752


### Naive Bayes

In [None]:
spam_predictor=nb.fit(training)

In [64]:
test_results=spam_predictor.transform(testing)

In [65]:
test_results.show()

+-----+--------------------+-------------+--------------------+----------+
|label|            features|rawPrediction|         probability|prediction|
+-----+--------------------+-------------+--------------------+----------+
|  0.0|(13424,[0,1,2,41,...|    [0.0,2.0]|           [0.0,1.0]|       1.0|
|  0.0|(13424,[0,1,5,15,...|    [6.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,5,20,...|    [0.0,1.0]|           [0.0,1.0]|       1.0|
|  0.0|(13424,[0,1,9,14,...|    [8.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,14,18...|    [4.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,20,27...|    [8.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,30,12...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,31,43...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,416,6...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,1,3657,...|  [106.0,0.0]|           [1.0,0.0]|       0.0|
|  0.0|(13424,[0,2,3,4,6.

In [67]:
acc_eval=MulticlassClassificationEvaluator()
acc=acc_eval.evaluate(test_results)

In [68]:
print ("Accuracy of the model is::", acc)

Accuracy of the model is:: 0.9501735633322752
