data: https://www.kaggle.com/pdunton/marvel-cinematic-universe-dialogue?select=mcu_subset.csv
data NRC : https://www.kaggle.com/andradaolteanu/bing-nrc-afinn-lexicons?select=NRC.csv

In [45]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, RegexTokenizer, PCA
from pyspark.mllib.regression import LabeledPoint
from IPython.display import Image
from pyspark.sql import SparkSession
import IPython
from pyspark.mllib.evaluation import MulticlassMetrics


# Collecting The Infinity Stones

### AKA Cleaning the dataset

![display image](https://media.giphy.com/media/3oxHQjRHcp4w9oi24M/giphy.gif)

In [26]:
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [27]:
spark = init_spark()
#read_csv = spark.read.csv('data/tweets.csv', inferSchema=True, header=True)
read_csv = spark.read.csv('data/Reddit_Data_utf8.csv', inferSchema=True, header=True)

In [28]:
#data = read_csv.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
data = read_csv.select("clean_comment", col("category").cast("Int").alias("label")).dropna().dropDuplicates().replace(-1,2)
data.show(10)

+--------------------+-----+
|       clean_comment|label|
+--------------------+-----+
|surprised modis i...|    1|
|     naga downs ing |    0|
| has been decided...|    0|
|how difficult was...|    1|
|yes now are going...|    0|
|every question yo...|    0|
|  the plot thickens |    0|
|this compelling y...|    2|
|video showing mh3...|    0|
|brace yourself th...|    0|
+--------------------+-----+
only showing top 10 rows



In [29]:
split = data.randomSplit([0.7, 0.3])
trainingData = split[0]
testingData = split[1]
print ("Training data has", split[0].count(), 'rows.')
print ("Testing data has", split[1].count(), 'rows.')

Training data has 25668 rows.
Testing data has 10871 rows.


## Cleaning The Data (Tokenizing and Stop Word Removing)

In [30]:
#inputCol = "SentimentText"
inputCol = "clean_comment"

tokenizer = RegexTokenizer(pattern=r'(?:\p{Punct}|\s)+', inputCol=inputCol, outputCol="Tokens")
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="NoStopWords")

token_train = tokenizer.transform(trainingData)
nosw_train = swr.transform(token_train)

token_test = tokenizer.transform(testingData)
nosw_test = swr.transform(token_test)

nosw_train.show(truncate=True, n=10)
nosw_test.show(truncate=True, n=10)

+--------------------+-----+--------------------+--------------------+
|       clean_comment|label|              Tokens|         NoStopWords|
+--------------------+-----+--------------------+--------------------+
| all the signs em...|    1|[all, the, signs,...|[signs, emergency...|
| assholery has fa...|    0|[assholery, has, ...|[assholery, face,...|
| because fucking ...|    0|[because, fucking...|[fucking, white, ...|
| bjp will proacti...|    2|[bjp, will, proac...|[bjp, proactive, ...|
| breaking news ma...|    0|[breaking, news, ...|[breaking, news, ...|
| couldn see havin...|    1|[couldn, see, hav...|[couldn, see, pos...|
| doing decent job...|    1|[doing, decent, j...|[decent, job, com...|
| don know ethical...|    1|[don, know, ethic...|[know, ethical, f...|
| explained this a...|    2|[explained, this,...|[explained, anecd...|
| feel like part r...|    0|[feel, like, part...|[feel, like, part...|
+--------------------+-----+--------------------+--------------------+
only s

## Hashing The Features using HashingTF

In [31]:
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
hash_train = hashTF.transform(nosw_train).select(
    'label', 'Tokens', 'features')

hash_test = hashTF.transform(nosw_test).select(
    'Label', 'Tokens', 'features')
hash_train.show(n=5)
hash_test.show(n=5)

+-----+--------------------+--------------------+
|label|              Tokens|            features|
+-----+--------------------+--------------------+
|    1|[all, the, signs,...|(262144,[2306,256...|
|    0|[assholery, has, ...|(262144,[13644,20...|
|    0|[because, fucking...|(262144,[41748,43...|
|    2|[bjp, will, proac...|(262144,[1141,670...|
|    0|[breaking, news, ...|(262144,[18981,27...|
+-----+--------------------+--------------------+
only showing top 5 rows

+-----+--------------------+--------------------+
|Label|              Tokens|            features|
+-----+--------------------+--------------------+
|    0|[5ppr, for, flex,...|(262144,[5972,147...|
|    0|[are, you, saying...|(262144,[160395,1...|
|    0|[back, vent, here...|(262144,[30297,13...|
|    1|[big, political, ...|(262144,[3657,334...|
|    2|[bunch, aap, supp...|(262144,[13468,33...|
+-----+--------------------+--------------------+
only showing top 5 rows



# Training 

In [32]:
mlor = (LogisticRegression()
       .setFamily("multinomial") )

In [33]:
model= mlor.fit(hash_train)

In [68]:
prediction = model.transform(hash_test)
prediction.show(100)

+-----+---------------------+--------------------+--------------------+--------------------+----------+
|Label|               Tokens|            features|       rawPrediction|         probability|prediction|
+-----+---------------------+--------------------+--------------------+--------------------+----------+
|    0| [5ppr, for, flex,...|(262144,[5972,147...|[41.1129939291239...|[1.0,4.9176315886...|       0.0|
|    0| [are, you, saying...|(262144,[160395,1...|[5.08306291156648...|[0.36750222290442...|       1.0|
|    0| [back, vent, here...|(262144,[30297,13...|[7.78969378741204...|[0.99979152794506...|       0.0|
|    1| [big, political, ...|(262144,[3657,334...|[2.30518583959471...|[4.53408415891803...|       1.0|
|    2| [bunch, aap, supp...|(262144,[13468,33...|[-2.6346333542633...|[3.09661797338132...|       1.0|
|    2| [don, see, how, t...|(262144,[8538,942...|[15.7602014236787...|[0.99999999807920...|       0.0|
|    2| [feel, bad, for, ...|(262144,[22370,61...|[-7.4183882970

# Accuracy model

In [69]:
predictionFinal = prediction.select(
    "Tokens", "prediction", "Label")
predictionFinal.show(n=100)

+---------------------+----------+-----+
|               Tokens|prediction|Label|
+---------------------+----------+-----+
| [5ppr, for, flex,...|       0.0|    0|
| [are, you, saying...|       1.0|    0|
| [back, vent, here...|       0.0|    0|
| [big, political, ...|       1.0|    1|
| [bunch, aap, supp...|       1.0|    2|
| [don, see, how, t...|       0.0|    2|
| [feel, bad, for, ...|       1.0|    2|
| [hai, aam, modi, ...|       0.0|    2|
| [have, similar, c...|       2.0|    0|
| [need, bjp, mukt,...|       1.0|    0|
| [never, visited, ...|       0.0|    0|
| [press, conferenc...|       1.0|    0|
| [should, finally,...|       0.0|    0|
| [teacher, back, t...|       0.0|    0|
| [that, what, the,...|       1.0|    1|
| [this, doesn, mak...|       1.0|    1|
| [true, then, bad,...|       1.0|    2|
| [unpopular, opini...|       2.0|    2|
| [was, museum, tod...|       2.0|    2|
| [will, make, sure...|       2.0|    1|
| [wouldn, surprise...|       1.0|    1|
| [aib, mainstre

In [70]:
match = predictionFinal.filter(predictionFinal['prediction'] == predictionFinal['label']).count()
total = predictionFinal.count()
print("Accuracy:", match/total)

Accuracy: 0.6787784012510348


In [75]:
temp = predictionFinal.drop('Tokens')

# predictionAndLabels = temp.rdd.map(lambda lp: (float(prediciton), lp.label))
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))

In [76]:
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics
precision = metrics.precision(1.0)
recall = metrics.recall(1.0)
f1Score = metrics.fMeasure(1.0)
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 830.0 failed 1 times, most recent failure: Lost task 0.0 in stage 830.0 (TID 29574, DESKTOP-CTIMH20, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\Ben\anaconda3\lib\site-packages\pyspark\rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-75-9f8004798c20>", line 3, in <lambda>
NameError: name 'prediciton' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:154)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:154)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\Ben\anaconda3\lib\site-packages\pyspark\rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "C:\Users\Ben\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-75-9f8004798c20>", line 3, in <lambda>
NameError: name 'prediciton' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:154)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


# Avengers Assemble

![display image](https://media.giphy.com/media/j2pWZpr5RlpCodOB0d/giphy.gif)

In [37]:
mcu_csv = spark.read.csv('data/mcu_subset.csv', inferSchema=True, header=True)
print("Lines of Dialogue:", mcu_csv.count())

Lines of Dialogue: 6509


In [38]:
data = mcu_csv.select("character","line")
data.show(n=10)

+------------+--------------------+
|   character|                line|
+------------+--------------------+
|  TONY STARK|Oh, I get it.  Yo...|
|  TONY STARK|Oh.  I see.  So i...|
|  TONY STARK|Good God, you’re ...|
|  TONY STARK|             Please.|
|  TONY STARK|Excellent questio...|
|  TONY STARK|      Join the club.|
|  TONY STARK|Are you aware tha...|
|JAMES RHODES|GET DOWN, TONY.  ...|
|JAMES RHODES|As Program Manage...|
|  TONY STARK|...you think we’r...|
+------------+--------------------+
only showing top 10 rows



In [39]:
t = Tokenizer(inputCol="line", outputCol="new_line")
swr_MCU = StopWordsRemover(inputCol=t.getOutputCol(), 
                       outputCol="new")
token_MCU = t.transform(data)
nosw_MCU = swr_MCU.transform(token_MCU)

nosw_MCU.show(n=10)

+------------+--------------------+--------------------+--------------------+
|   character|                line|            new_line|                 new|
+------------+--------------------+--------------------+--------------------+
|  TONY STARK|Oh, I get it.  Yo...|[oh,, i, get, it....|[oh,, get, it., ,...|
|  TONY STARK|Oh.  I see.  So i...|[oh., , i, see., ...|[oh., , see., , i...|
|  TONY STARK|Good God, you’re ...|[good, god,, you’...|[good, god,, you’...|
|  TONY STARK|             Please.|           [please.]|           [please.]|
|  TONY STARK|Excellent questio...|[excellent, quest...|[excellent, quest...|
|  TONY STARK|      Join the club.|  [join, the, club.]|       [join, club.]|
|  TONY STARK|Are you aware tha...|[are, you, aware,...|[aware, native, a...|
|JAMES RHODES|GET DOWN, TONY.  ...|[get, down,, tony...|[get, down,, tony...|
|JAMES RHODES|As Program Manage...|[as, program, man...|[program, manager...|
|  TONY STARK|...you think we’r...|[...you, think, w...|[...you,

In [40]:
hashTF = HashingTF(inputCol=swr_MCU.getOutputCol(), outputCol="features")
hash_MCU = hashTF.transform(nosw_MCU).select('new_line', 'features')
hash_MCU.show(n=3)

+--------------------+--------------------+
|            new_line|            features|
+--------------------+--------------------+
|[oh,, i, get, it....|(262144,[44954,84...|
|[oh., , i, see., ...|(262144,[8938,109...|
|[good, god,, you’...|(262144,[6808,353...|
+--------------------+--------------------+
only showing top 3 rows



In [41]:
prediction = model.transform(hash_MCU)
predictionFinal_mcu = prediction.select(
    "new_line", "prediction")
predictionFinal_mcu.show(n=300)

+--------------------+----------+
|            new_line|prediction|
+--------------------+----------+
|[oh,, i, get, it....|       0.0|
|[oh., , i, see., ...|       0.0|
|[good, god,, you’...|       0.0|
|           [please.]|       2.0|
|[excellent, quest...|       2.0|
|  [join, the, club.]|       0.0|
|[are, you, aware,...|       0.0|
|[get, down,, tony...|       0.0|
|[as, program, man...|       0.0|
|[...you, think, w...|       1.0|
|[hold, on, a, sec...|       2.0|
|[yeah., , they, s...|       1.0|
|[okay,, let’s, do...|       1.0|
|[a, lot, of, peop...|       0.0|
|[it, belongs, to,...|       0.0|
|[what’s, wrong, w...|       0.0|
|[hold, that, thou...|       0.0|
|[...you, just, bl...|       2.0|
|[yeah., , don’t, ...|       1.0|
|[everything’s, fu...|       1.0|
|[no., , you’re, n...|       0.0|
|[we’ve, got, a, h...|       0.0|
|[one, more, stop....|       0.0|
|[this, is, no, jo...|       1.0|
|[this, system, ha...|       1.0|
|[tony,, it’s, the...|       1.0|
|[...jim,, how

In [42]:
test = predictionFinal_mcu.groupBy('prediction').count()
test.show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 4188|
|       1.0| 1504|
|       2.0|  817|
+----------+-----+

