<link rel='stylesheet' href='../assets/css/main.css'/>

[<< back to main index](../README.md)

# Naive Bayes Spam Filtering

### Overview

We all hate spam, so developing a classifier to classify email as spam or not spam is useful.  

### Builds on
None

### Run time
approx. 20-30 minutes

### Notes

PySpark has a class called NaiveBayes that can be used to do Naive Bayes classification.

In [29]:
%matplotlib inline

import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

print('Spark UI running on http://18.208.221.237:' + sc.uiWebUrl.split(':')[2])

Spark UI running on http://18.208.221.237:4040


## Step 1: Let's load the dataframe

We will load the dataframe into spark.  Since the outcome label is "ham" or "spam", we'll just call it label.

In [30]:
t1 = time.perf_counter()

dataset = spark.read.format("csv").\
          option('header','true').\
          option('delimiter', '\t').\
          option('inferSchema', 'true').\
          load("/data/spam/SMSSpamCollection.txt")

t2 = time.perf_counter() 

print("read {:,} records in {:,.2f} ms".format(dataset.count(), (t2-t1)*1000))

dataset.printSchema()
dataset.show()

read 5,574 records in 128.62 ms
root
 |-- isspam: string (nullable = true)
 |-- text: string (nullable = true)

+------+--------------------+
|isspam|                text|
+------+--------------------+
|   ham|Go until jurong p...|
|   ham|Ok lar... Joking ...|
|  spam|Free entry in 2 a...|
|   ham|U dun say so earl...|
|   ham|Nah I don't think...|
|  spam|FreeMsg Hey there...|
|   ham|Even my brother i...|
|   ham|As per your reque...|
|  spam|WINNER!! As a val...|
|  spam|Had your mobile 1...|
|   ham|I'm gonna be home...|
|  spam|SIX chances to wi...|
|  spam|URGENT! You have ...|
|   ham|I've been searchi...|
|   ham|I HAVE A DATE ON ...|
|  spam|XXXMobileMovieClu...|
|   ham|Oh k...i'm watchi...|
|   ham|Eh u remember how...|
|   ham|Fine if thats th...|
|  spam|England v Macedon...|
+------+--------------------+
only showing top 20 rows



In [31]:
## Count spam/ham
dataset.groupby("isspam").count().show()

+------+-----+
|isspam|count|
+------+-----+
|   ham| 4827|
|  spam|  747|
+------+-----+



## Step 2: Vectorize using tf/idf

Let's use tf/idf for vecorization at first.  TF/IDF will take and count the instances of each term, and then divide by the total frequecy of that term in the entire dataset.  

This leads to very highly dimensional data, because every word in the document will lead to a dimension in the data.

In [32]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

## TODO : split the text into words
## Hint : outputCol = 'words'
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(dataset)
wordsData.show()


+------+--------------------+--------------------+
|isspam|                text|               words|
+------+--------------------+--------------------+
|   ham|Go until jurong p...|[go, until, juron...|
|   ham|Ok lar... Joking ...|[ok, lar..., joki...|
|  spam|Free entry in 2 a...|[free, entry, in,...|
|   ham|U dun say so earl...|[u, dun, say, so,...|
|   ham|Nah I don't think...|[nah, i, don't, t...|
|  spam|FreeMsg Hey there...|[freemsg, hey, th...|
|   ham|Even my brother i...|[even, my, brothe...|
|   ham|As per your reque...|[as, per, your, r...|
|  spam|WINNER!! As a val...|[winner!!, as, a,...|
|  spam|Had your mobile 1...|[had, your, mobil...|
|   ham|I'm gonna be home...|[i'm, gonna, be, ...|
|  spam|SIX chances to wi...|[six, chances, to...|
|  spam|URGENT! You have ...|[urgent!, you, ha...|
|   ham|I've been searchi...|[i've, been, sear...|
|   ham|I HAVE A DATE ON ...|[i, have, a, date...|
|  spam|XXXMobileMovieClu...|[xxxmobilemoviecl...|
|   ham|Oh k...i'm watchi...|[o

In [33]:
## compute the hash of words
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show()


+------+--------------------+--------------------+--------------------+--------------------+
|isspam|                text|               words|         rawFeatures|            features|
+------+--------------------+--------------------+--------------------+--------------------+
|   ham|Go until jurong p...|[go, until, juron...|(20,[0,2,5,6,7,10...|(20,[0,2,5,6,7,10...|
|   ham|Ok lar... Joking ...|[ok, lar..., joki...|(20,[0,4,6,16,19]...|(20,[0,4,6,16,19]...|
|  spam|Free entry in 2 a...|[free, entry, in,...|(20,[0,4,5,6,7,8,...|(20,[0,4,5,6,7,8,...|
|   ham|U dun say so earl...|[u, dun, say, so,...|(20,[1,2,3,6,8,12...|(20,[1,2,3,6,8,12...|
|   ham|Nah I don't think...|[nah, i, don't, t...|(20,[0,3,4,6,8,9,...|(20,[0,3,4,6,8,9,...|
|  spam|FreeMsg Hey there...|[freemsg, hey, th...|(20,[0,2,5,6,7,8,...|(20,[0,2,5,6,7,8,...|
|   ham|Even my brother i...|[even, my, brothe...|(20,[1,3,6,7,8,10...|(20,[1,3,6,7,8,10...|
|   ham|As per your reque...|[as, per, your, r...|(20,[0,2,3,5,6,8,...

In [34]:
rescaledData.select("isspam", "text", "features").show()

+------+--------------------+--------------------+
|isspam|                text|            features|
+------+--------------------+--------------------+
|   ham|Go until jurong p...|(20,[0,2,5,6,7,10...|
|   ham|Ok lar... Joking ...|(20,[0,4,6,16,19]...|
|  spam|Free entry in 2 a...|(20,[0,4,5,6,7,8,...|
|   ham|U dun say so earl...|(20,[1,2,3,6,8,12...|
|   ham|Nah I don't think...|(20,[0,3,4,6,8,9,...|
|  spam|FreeMsg Hey there...|(20,[0,2,5,6,7,8,...|
|   ham|Even my brother i...|(20,[1,3,6,7,8,10...|
|   ham|As per your reque...|(20,[0,2,3,5,6,8,...|
|  spam|WINNER!! As a val...|(20,[0,1,3,4,5,6,...|
|  spam|Had your mobile 1...|(20,[0,2,3,4,6,7,...|
|   ham|I'm gonna be home...|(20,[0,1,3,5,6,8,...|
|  spam|SIX chances to wi...|(20,[0,1,2,3,6,8,...|
|  spam|URGENT! You have ...|(20,[0,2,4,5,6,7,...|
|   ham|I've been searchi...|(20,[0,1,2,3,5,7,...|
|   ham|I HAVE A DATE ON ...|(20,[1,2,4,9,10,1...|
|  spam|XXXMobileMovieClu...|(20,[1,3,5,6,7,8,...|
|   ham|Oh k...i'm watchi...|(2

## Step 3: Create a numeric label out of the string column "isspam."

In [35]:
from pyspark.ml.feature import StringIndexer

## TODO : Index 'isspam' column into 'label' column
## Hint : inputCol = 'isspam',   outputCol = 'label'
indexer = StringIndexer(inputCol="isspam", outputCol="label")
indexed = indexer.fit(rescaledData).transform(rescaledData)

indexed.select(['text', 'isspam', 'label', 'features']).show()


+--------------------+------+-----+--------------------+
|                text|isspam|label|            features|
+--------------------+------+-----+--------------------+
|Go until jurong p...|   ham|  0.0|(20,[0,2,5,6,7,10...|
|Ok lar... Joking ...|   ham|  0.0|(20,[0,4,6,16,19]...|
|Free entry in 2 a...|  spam|  1.0|(20,[0,4,5,6,7,8,...|
|U dun say so earl...|   ham|  0.0|(20,[1,2,3,6,8,12...|
|Nah I don't think...|   ham|  0.0|(20,[0,3,4,6,8,9,...|
|FreeMsg Hey there...|  spam|  1.0|(20,[0,2,5,6,7,8,...|
|Even my brother i...|   ham|  0.0|(20,[1,3,6,7,8,10...|
|As per your reque...|   ham|  0.0|(20,[0,2,3,5,6,8,...|
|WINNER!! As a val...|  spam|  1.0|(20,[0,1,3,4,5,6,...|
|Had your mobile 1...|  spam|  1.0|(20,[0,2,3,4,6,7,...|
|I'm gonna be home...|   ham|  0.0|(20,[0,1,3,5,6,8,...|
|SIX chances to wi...|  spam|  1.0|(20,[0,1,2,3,6,8,...|
|URGENT! You have ...|  spam|  1.0|(20,[0,2,4,5,6,7,...|
|I've been searchi...|   ham|  0.0|(20,[0,1,2,3,5,7,...|
|I HAVE A DATE ON ...|   ham|  

## Step 4: Split into training and test

We will split our dataset into training and test sets.

In [36]:
# TODO : Split the data into train and test into 80/20
(train, test) = indexed.randomSplit([80.0, 20.0])

print("training set count : ", train.count())
print("testing set count : ", test.count())

training set count :  4434
testing set count :  1140


## Step 5: Fit Naive Bayes model

In [37]:
from pyspark.ml.classification import NaiveBayes

## TODO : create the trainer and set its parameters
## Hint : NaiveBayes  (see the class name above)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
t1 = time.perf_counter()
## TODO : fit on training data (hint: train)
model = nb.fit(train)
t2 = time.perf_counter()

print("trained on {:,} records  in {:,.2f} ms".\
      format(train.count(), (t2-t1)*1000))

trained on 4,434 records  in 944.30 ms


## Step 6: Run test data

Let's call .transform on our model to do make predictions on our test data. The output should be contained in the "prediction" column, while the correct label will be there in the "label" column. 

We will be able to evaluate our results by comparing the results.

In [38]:
# select example rows to display.
## TODO : transform on test data (hint : test)
predictions = model.transform(test)
predictions.show()


+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|isspam|                text|               words|         rawFeatures|            features|label|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|   ham| &lt;#&gt;  in mc...|[, &lt;#&gt;, , i...|(20,[3,5,6,7,12,1...|(20,[3,5,6,7,12,1...|  0.0|[-18.221422993911...|[0.89041228332752...|       0.0|
|   ham| said kiss, kiss,...|[, said, kiss,, k...|(20,[0,1,3,4,5,6,...|(20,[0,1,3,4,5,6,...|  0.0|[-57.257499934769...|[0.94415845314779...|       0.0|
|   ham| says that he's q...|[, says, that, he...|(20,[0,2,3,4,5,6,...|(20,[0,2,3,4,5,6,...|  0.0|[-90.675695295924...|[0.95431026870009...|       0.0|
|   ham|"Speak only when ...|["speak, only, wh...|(20,[0,1,3,5,10,1...|(20,[0,1,3,5,10,1

## Step 7: Evaluate the model

Let's look at how our model performs.  We will do an accuracy measure.

In [39]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.85


Let us do a confusion matrix.

In [40]:
predictions.groupBy('label').pivot('prediction', [0,1]).count().na.fill(0).orderBy('label').show()

## Can you explain the confusion matrix

+-----+---+---+
|label|  0|  1|
+-----+---+---+
|  0.0|966|  4|
|  1.0|167|  3|
+-----+---+---+



## Step 8: Improve prediction results

We used too few features above, and got bad accuracy. Increase the number of features for HashingTF

## Step 9:  Run your own test

Now it's your turn!   Make a new dataframe with some sample test data of your own creation.  Make some "spammy" SMSes and some ordinary ones.  See how our spam filter does.

In [41]:
# TODO: make a dataframe with some of your own data.
mydata = pd.DataFrame({'text' : ['hey, can we meet 1 hr later?', 
                                'WINNER!  Click here to claim your prize !!!!',
                                'CHEAP DEGREEES !!', 
                                'your text here',
                                'test']
                         })

mydata2 = spark.createDataFrame(mydata)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
fv = tokenizer.transform(mydata2)
fv.show()

## NOTE : make sure this 'numFeatures' matches the 'numFeatures' in step-2
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
fv = hashingTF.transform(fv)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
fv = idfModel.transform(fv)
fv.show()

+--------------------+--------------------+
|                text|               words|
+--------------------+--------------------+
|hey, can we meet ...|[hey,, can, we, m...|
|WINNER!  Click he...|[winner!, , click...|
|   CHEAP DEGREEES !!|[cheap, degreees,...|
|      your text here|  [your, text, here]|
|                test|              [test]|
+--------------------+--------------------+



Py4JJavaError: An error occurred while calling o1074.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 120.0 failed 1 times, most recent failure: Lost task 0.0 in stage 120.0 (TID 1489, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$2: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$2: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.ArrayIndexOutOfBoundsException


In [42]:
predictions = model.transform(fv)
predictions.select(['text', 'prediction']).show()

Py4JJavaError: An error occurred while calling o1142.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 121.0 failed 1 times, most recent failure: Lost task 0.0 in stage 121.0 (TID 1490, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$2: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$2: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.lang.ArrayIndexOutOfBoundsException


## FUN : How will you defeat this algorithm? :-) 

If you are spammer, how can you defeat this algorithm?

<img src="../assets/images/come-tothe-dark-side-iin-we-have-cookies.png">

# BONUS: Word2Vec Instead of TF/IDF

We used the TF/IDF encoding. We might get better resu

lts if we use Word2Vec instead. Run with word2vec and see if you get a better accuracy rate.