In [2]:
import findspark
findspark.init('/home/ubuntu/spark-3.0.0-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spam').getOrCreate()

In [3]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Read in data

In [4]:
data = spark.read.csv('../datasets_483_982_spam.csv', inferSchema=True, header=True)

In [5]:
data.printSchema()

root
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [6]:
data.count()

5574

In [7]:
data.show()

+----+--------------------+----+----+----+
|  v1|                  v2| _c2| _c3| _c4|
+----+--------------------+----+----+----+
| ham|Go until jurong p...|null|null|null|
| ham|Ok lar... Joking ...|null|null|null|
|spam|Free entry in 2 a...|null|null|null|
| ham|U dun say so earl...|null|null|null|
| ham|Nah I don't think...|null|null|null|
|spam|FreeMsg Hey there...|null|null|null|
| ham|Even my brother i...|null|null|null|
| ham|As per your reque...|null|null|null|
|spam|WINNER!! As a val...|null|null|null|
|spam|Had your mobile 1...|null|null|null|
| ham|I'm gonna be home...|null|null|null|
|spam|SIX chances to wi...|null|null|null|
|spam|URGENT! You have ...|null|null|null|
| ham|I've been searchi...|null|null|null|
| ham|I HAVE A DATE ON ...|null|null|null|
|spam|XXXMobileMovieClu...|null|null|null|
| ham|Oh k...i'm watchi...|null|null|null|
| ham|Eh u remember how...|null|null|null|
| ham|Fine if that��s t...|null|null|null|
|spam|England v Macedon...|null|null|null|
+----+-----

In [8]:
data.na.drop(thresh=3).show()
data.na.drop(thresh=3).count()

+----+--------------------+--------------------+--------------------+--------------------+
|  v1|                  v2|                 _c2|                 _c3|                 _c4|
+----+--------------------+--------------------+--------------------+--------------------+
|spam|"Your free ringto...|         PO Box 5249| MK17 92H. 450Ppw...|                null|
| ham| \Wen u miss someone| the person is de...|    why to miss them|" just Keep-in-to...|
| ham|\HEY HEY WERETHE ...| HOWU DOIN? FOUND...|                null|                null|
|spam|SMS. ac sun0819 p...|" wanted to say h...|                null|                null|
| ham|Height of Confide...|this wont even st...|                null|                null|
|spam|"Your free ringto...|         PO Box 5249| MK17 92H. 450Ppw...|                null|
| ham|"Edison has right...|                  GN|                  GE|            GNT:-)""|
| ham|"Height of \Oh sh...|           .;-):-D""|                null|                null|

49

## Clean Data

In [9]:
from pyspark.sql.functions import concat_ws, col

In [10]:
# Fill null columns with empty string
data = data.na.fill(' ')

In [11]:
# Combine all string columns with a SPACE separator

data = data.withColumn('full_text', concat_ws(' ',
    col('v2'),
    col('_c2'),
    col('_c3'),
    col('_c4'),
))

In [12]:
# Select only necessary columns and drop any rows with null values

data = data.select(['v1', 'full_text'])
data = data.na.drop()

## Tokenize Words

In [13]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

In [14]:
regexTokenizer = RegexTokenizer(inputCol="full_text", outputCol="words", pattern="[\\w]+", gaps=False)

In [15]:
tokenized = regexTokenizer.transform(data)

In [16]:
'''
If the regex pattern did not match any text in the `full_text`
column, that column will have a null value (or `None`). In order
to avoid conflict, we must drop those rows.
'''

tokenized = tokenized.na.drop()

## Remove Stop Words

In [17]:
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')

In [18]:
filtered_data = remover.transform(tokenized).select(['v1', 'filtered_words'])

## Hash Words in Word Vector

In [19]:
from pyspark.ml.feature import CountVectorizer, StringIndexer, VectorIndexer, HashingTF, IDF

In [20]:
indexer = StringIndexer(inputCol='v1', outputCol='label').fit(filtered_data).transform(filtered_data)

In [21]:
cv = CountVectorizer(inputCol="filtered_words", outputCol="features", vocabSize=100000)

final_data = cv.fit(indexer).transform(indexer)

In [22]:
final_data.show()

+----+--------------------+-----+--------------------+
|  v1|      filtered_words|label|            features|
+----+--------------------+-----+--------------------+
| ham|[go, jurong, poin...|  0.0|(8624,[11,16,37,6...|
| ham|[ok, lar, joking,...|  0.0|(8624,[0,9,248,37...|
|spam|[free, entry, 2, ...|  1.0|(8624,[2,10,23,24...|
| ham|[u, dun, say, ear...|  0.0|(8624,[0,57,84,86...|
| ham|[nah, think, goes...|  0.0|(8624,[53,139,369...|
|spam|[freemsg, hey, da...|  1.0|(8624,[9,15,21,26...|
| ham|[even, brother, l...|  0.0|(8624,[15,130,288...|
| ham|[per, request, me...|  0.0|(8624,[149,162,30...|
|spam|[winner, valued, ...|  1.0|(8624,[1,65,82,15...|
|spam|[mobile, 11, mont...|  1.0|(8624,[0,1,10,31,...|
| ham|[m, gonna, home, ...|  0.0|(8624,[3,22,29,34...|
|spam|[six, chances, wi...|  1.0|(8624,[6,18,21,24...|
|spam|[urgent, won, 1, ...|  1.0|(8624,[10,24,26,5...|
| ham|[ve, searching, r...|  0.0|(8624,[45,77,85,1...|
| ham|      [date, sunday]|  0.0|(8624,[482,679],[...|
|spam|[xxx

## Create Train and Test Data

* 60/40 split, Train/Test

In [23]:
train_data, test_data = final_data.randomSplit([0.6, 0.4])

## Create and Train Model

In [24]:
nb = NaiveBayes()

# train the model
model = nb.fit(train_data)

In [25]:
# select example rows to display.
predictions = model.transform(test_data)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

+---+--------------------+-----+--------------------+--------------------+--------------------+----------+
| v1|      filtered_words|label|            features|       rawPrediction|         probability|prediction|
+---+--------------------+-----+--------------------+--------------------+--------------------+----------+
|ham|                  []|  0.0|        (8624,[],[])|[-0.1502082941297...|[0.86052871467639...|       0.0|
|ham|                  []|  0.0|        (8624,[],[])|[-0.1502082941297...|[0.86052871467639...|       0.0|
|ham|                  []|  0.0|        (8624,[],[])|[-0.1502082941297...|[0.86052871467639...|       0.0|
|ham|                  []|  0.0|        (8624,[],[])|[-0.1502082941297...|[0.86052871467639...|       0.0|
|ham|                  []|  0.0|        (8624,[],[])|[-0.1502082941297...|[0.86052871467639...|       0.0|
|ham|[1, number, 2, go...|  0.0|(8624,[2,5,26,43,...|[-91.373094987353...|[0.99995448134812...|       0.0|
|ham|[1, reach, home, ...|  0.0|(8624

In [26]:
print(f'Test set accuracy = {accuracy*100}%')

Test set accuracy = 97.76902887139107%


## Prediction Analysis

In [27]:
from pyspark.sql.functions import array_contains, explode

### Common Spam Words

In this section, the goal is to ensure the model is working correctly. By checking the words that appear most frequently in spam emails and words that appear most frequently in *predicted* spam emails, we can verify that the model is behaving as expected (to a degree). 


#### Words that Occur Most Frequently in Spam Emails

In [28]:
# Filter data for spam-only emails
spam_emails = final_data.filter(final_data.label == 1)

# Split out list of words and create a row for every word in the list in each row
spam_words = spam_emails.select(final_data.v1, explode(final_data.filtered_words).alias('word'))

# Group by words and count
spam_words_count = spam_words.groupBy('word').count()

# Order by most frequent words in spam emails
spam_words_count.orderBy(spam_words_count['count'].desc()).show()

+------+-----+
|  word|count|
+------+-----+
|  call|  355|
|  free|  224|
|     2|  207|
|     u|  174|
|   txt|  163|
|    ur|  144|
|     4|  137|
|mobile|  127|
|  text|  125|
|  stop|  123|
| claim|  113|
|     1|  112|
| reply|  104|
|   www|   98|
| prize|   93|
|   get|   86|
|   won|   76|
|  cash|   76|
|    uk|   74|
|  150p|   71|
+------+-----+
only showing top 20 rows



#### Words that Occur Most Frequently in Spam Predictions

In [29]:
# Filter predictions for spam only predictions
spam_predictions = predictions.filter(predictions.prediction == 1)

# Split out list of words and create a row for every word in the list in each row
spam_pred_words = spam_predictions.select(predictions.v1, explode(predictions.filtered_words).alias('word'))

# Group by words and count
spam_pred_words_count = spam_pred_words.groupBy('word').count()

# Order by most frequent words in spam predicitons
spam_pred_words_count.orderBy(spam_pred_words_count['count'].desc()).show()

+------+-----+
|  word|count|
+------+-----+
|  call|  129|
|  free|   94|
|     2|   74|
|     u|   67|
|   txt|   62|
|    ur|   53|
|     4|   52|
|mobile|   51|
| claim|   49|
|  text|   47|
| prize|   41|
| reply|   41|
|   www|   39|
|  stop|   39|
|     1|   37|
| nokia|   35|
|   won|   32|
|   new|   32|
|   get|   31|
|    uk|   30|
+------+-----+
only showing top 20 rows



____