
# Detecting Spam with Spark

This notebook classifies e-mail messages as spam or non-spam with Spark. We go through the whole process, from loading and preprocessing the data to training and testing classifiers, all distributed in Spark.

## Load and prepare the data

In [5]:
%cd
!brew install openjdk@8
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz > /dev/null
!pip install -q findspark
import os
# point Spark to the JDK installed above and to the unpacked Spark directory (adjust both paths to your installation)
os.environ["JAVA_HOME"] = "/usr/local/Cellar/openjdk@8/1.8.0+282"
os.environ["SPARK_HOME"] = "/Users/Braganca/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init()

import pyspark
# get a spark context
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# get (or create) a SparkSession
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(spark)

/Users/Braganca
<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7f8052a29b10>


In [6]:
%cd /Users/Braganca/lingspam_public

!cat readme.txt # shows the content of the readme file, which explains the structure of the dataset

/Users/Braganca/lingspam_public
This directory contains the Ling-Spam corpus, as described in the 
paper:

I. Androutsopoulos, J. Koutsias, K.V. Chandrinos, George Paliouras, 
and C.D. Spyropoulos, "An Evaluation of Naive Bayesian Anti-Spam 
Filtering". In Potamias, G., Moustakis, V. and van Someren, M. (Eds.), 
Proceedings of the Workshop on Machine Learning in the New Information 
Age, 11th European Conference on Machine Learning (ECML 2000), 
Barcelona, Spain, pp. 9-17, 2000.

There are four subdirectories, corresponding to four versions of 
the corpus:

bare: Lemmatiser disabled, stop-list disabled.
lemm: Lemmatiser enabled, stop-list disabled.
lemm_stop: Lemmatiser enabled, stop-list enabled.
stop: Lemmatiser disabled, stop-list enabled.

Each one of these 4 directories contains 10 subdirectories (part1, 
..., part10). These correspond to the 10 partitions of the corpus 
that were used in the 10-fold experiments. In each repetition, one 
part was reserved for 

## Read the dataset and create RDDs 

Load all text files in each directory (`part1`, `part2`, ..., `part10`) with `wholeTextFiles()`, which creates one RDD per part containing `(filename, text)` tuples.
Use one of the RDDs as the test set and the union of the remaining nine RDDs as the training set. If a filename starts with `spmsg`, the message is spam; otherwise it is not.
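The filename handling described above can be tried out in plain Python, without Spark. This is a minimal sketch, assuming local paths of the form `file:/.../partN/<name>.txt` as returned by `wholeTextFiles()`; the helper names `message_key` and `is_spam` and the example paths are hypothetical and not part of the notebook.

```python
import re

def message_key(path):
    """Strip directories and the extension: 'file:/a/part3/x1.txt' -> 'x1'."""
    # Splitting on '/' and '.' leaves the bare filename as the second-to-last piece.
    return re.split(r'[/.]', path)[-2]

def is_spam(key):
    """Ling-Spam marks spam messages with filenames starting with 'spmsg'."""
    return key.startswith('spmsg')

if __name__ == '__main__':
    # made-up example paths
    for p in ['file:/data/bare/part3/8-1064msg1.txt', 'file:/data/bare/part10/spmsga1.txt']:
        k = message_key(p)
        print(k, is_spam(k))
```

Taking the second-to-last piece works as long as the filename itself contains exactly one dot (before the extension).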

In [7]:
from pathlib import Path
import re

def makeTestTrainRDDs(pathString):
    """ Takes one of the four subdirectories of the lingspam dataset and returns two RDDs, (trainRDD, testRDD). """
    # We expect 10 parts (part1 ... part10) that we can use for creating the train and test sets.
    p = Path(pathString) # path object for the chosen subdirectory, e.g. 'bare'
    # get the directories part1 ... part10, skipping any stray files (e.g. .DS_Store on macOS).
    # Note: iterdir() returns entries in filesystem order, not numeric order (see the output below).
    dirs = [d for d in p.iterdir() if d.is_dir()]
    #print(dirs) 
    rddList = []
    # create an RDD for each 'part' directory and add them to rddList
    print('creating RDDs')
    for d in dirs: # iterate through the directories
        dir_path = str(d.resolve())
        print(dir_path) 
        rdd = sc.wholeTextFiles(dir_path) # RDD of (filename, text) tuples
        rddList.append(rdd)
    #print('len(rddList)', len(rddList))  # should have 10 RDDs in the list # just for testing
    #print(rddList[1].take(1)) # just for testing

    testRDD1 = rddList[9] # the last directory listed (here part8, see the output below) becomes the test set
    trainRDD1 = rddList[0] # start the training set with the first directory listed and
    # create a union with the remaining RDDs (list indices 1-8)
    print('creating RDD union')
    for i in range(1, 9):
        trainRDD1 = trainRDD1.union(rddList[i]) 
    # both RDDs should remove the paths and extensions from the filename,
    # e.g. 'file:/.../part3/8-1064msg1.txt' -> '8-1064msg1'
    testRDD2 = testRDD1.map(lambda fn_txt: (re.split(r'[/.]', fn_txt[0])[-2], fn_txt[1]))
    trainRDD2 = trainRDD1.map(lambda fn_txt: (re.split(r'[/.]', fn_txt[0])[-2], fn_txt[1]))
    return (trainRDD2, testRDD2)

# make sure we are in the right directory
%cd /Users/Braganca/lingspam_public 
# this should show the directories "bare  lemm  lemm_stop  readme.txt  stop"
!ls 
# the code below is for testing the function makeTestTrainRDDs
trainRDD_testRDD = makeTestTrainRDDs('bare') # read from the 'bare' directory - this takes a bit of time
(trainRDD, testRDD) = trainRDD_testRDD # unpack the returned tuple
print('created the RDDs') 
print('testRDD.count(): ', testRDD.count()) 
#print('trainRDD.count(): ', trainRDD.count()) # commented out to save time as it takes some time to create RDD from all the files
print('testRDD.getNumPartitions():', testRDD.getNumPartitions()) 
print('testRDD.getStorageLevel():', testRDD.getStorageLevel()) # Serialized, 1x Replicated, expected to be (False, False, False, False, 1) 
print('testRDD.take(1): ', testRDD.take(1)) # should be (filename, message) 
rdd1 = testRDD # to be used later

/Users/Braganca/lingspam_public
[34mbare[m[m       [34mlemm[m[m       [34mlemm_stop[m[m  readme.txt [34mstop[m[m
creating RDDs
/Users/Braganca/lingspam_public/bare/part3
/Users/Braganca/lingspam_public/bare/part4
/Users/Braganca/lingspam_public/bare/part5
/Users/Braganca/lingspam_public/bare/part2
/Users/Braganca/lingspam_public/bare/part10
/Users/Braganca/lingspam_public/bare/part9
/Users/Braganca/lingspam_public/bare/part7
/Users/Braganca/lingspam_public/bare/part1
/Users/Braganca/lingspam_public/bare/part6
/Users/Braganca/lingspam_public/bare/part8
creating RDD union
created the RDDs
testRDD.count():  289
testRDD.getNumPartitions(): 2
testRDD.getStorageLevel(): Serialized 1x Replicated
testRDD.take(1):  [('8-1064msg1', 'Subject: re : 8 . 1044 , disc : grammar in schools\n\n( re message from : linguist @ linguistlist . org ) > > linguist list : vol-8 - 1044 . sat jul 12 1997 . issn : 1068-4875 . > > subject : 8 . 1044 , disc : grammar in schools > > i know and teach that 

## Tokenize and remove punctuation

Now we need to split the text into words (*tokenization*) and remove punctuation.

In [8]:
import nltk
import re

def tokenize(text):
    """ Apply the nltk.word_tokenize() method to our text, return the token list. """
    # Make sure the standard NLTK tokenizer model ('punkt') is available.
    # This must happen inside the function, as it needs to be done on every worker;
    # if we did the download outside this function, it would only be executed on the driver.
    # Checking first avoids repeating the download for every single message.
    try:
        nltk.data.find('tokenizers/punkt')
    except LookupError:
        nltk.download('punkt', quiet=True)
    return nltk.word_tokenize(text)
        
def removePunctuation(tokens):
    """ Remove punctuation characters from all tokens in a provided list. """
    # remove the listed punctuation characters from each token (':', '@', '>', '-' etc. are kept)
    tokens2 = [re.sub(r'[()\[\],.?!";_]', '', s) for s in tokens]
    return tokens2
    
def prepareTokenRDD(fn_txt_RDD):
    """ Take an RDD with (filename, text) elements and transform it into a (filename, [token ...]) RDD without punctuation characters. """
    rdd_vals2 = fn_txt_RDD.values() # It's convenient to process only the values. 
    rdd_vals3 = rdd_vals2.map(tokenize) # Create a tokenised version of the values by mapping
    rdd_vals4 = rdd_vals3.map(removePunctuation) # remove punctuation from the values
    rdd_kv = fn_txt_RDD.keys().zip(rdd_vals4) # zip the two RDDs together 

    # now remove any empty strings created by removing punctuation, and resulting entries without words left.
    rdd_kvr = rdd_kv.map(lambda x: (x[0], [s for s in x[1] if len(s) > 0]))
    # remove empty token lists 
    rdd_kvrf = rdd_kvr.filter(lambda x: len(x[1]) > 0)
    return rdd_kvrf

rdd2 = prepareTokenRDD(rdd1) # Use a small RDD for testing.
print(rdd2.take(5))

[('8-1064msg1', ['Subject', ':', 're', ':', '8', '1044', 'disc', ':', 'grammar', 'in', 'schools', 're', 'message', 'from', ':', 'linguist', '@', 'linguistlist', 'org', '>', '>', 'linguist', 'list', ':', 'vol-8', '-', '1044', 'sat', 'jul', '12', '1997', 'issn', ':', '1068-4875', '>', '>', 'subject', ':', '8', '1044', 'disc', ':', 'grammar', 'in', 'schools', '>', '>', 'i', 'know', 'and', 'teach', 'that', 'not', 'all', 'infinitives', 'contain', '`', 'to', "'", 'i', 'also', 'give', '>', 'the', 'students', 'examples', 'e', 'g', '`', 'i', 'asked', 'him', 'to', 'kindly', 'apologise', "'", 'where', '>', 'placing', 'the', 'adverb', 'anywhere', 'else', 'would', 'cause', 'ambiguity', '>', '>', 'jennifer', 'chew', 'an', 'example', 'i', 'once', 'concocted', 'to', 'justify', '``', 'splitting', 'the', 'infintive', '``', 'or', 'not', 'as', 'the', 'case', 'may', 'be', 'is', ':', 'a', 'after', 'a', 'heavy', 'meal', 'i', 'prepared', 'slowly', 'to', 'go', 'home', 'digesting', 'b', 'after', 'a', 'heavy', '
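The cleaning step can be checked in plain Python on a hand-written token list. The sketch below reuses the character class from `removePunctuation()` and the empty-token filter from `prepareTokenRDD()`; the function name `clean_tokens` is hypothetical. It also shows why tokens such as `':'`, `'@'` and `'>'` survive in the output above: those characters are not in the class.

```python
import re

# Same character class as removePunctuation(): ( ) [ ] , . ? ! " ; _
PUNCT = re.compile(r'[()\[\],.?!";_]')

def clean_tokens(tokens):
    """Remove punctuation characters from each token, then drop tokens that became empty."""
    stripped = [PUNCT.sub('', t) for t in tokens]
    return [t for t in stripped if len(t) > 0]

if __name__ == '__main__':
    print(clean_tokens(['Subject', ':', 're', '.', '(', 'e.g', ')', '>']))
    # ['Subject', ':', 're', 'eg', '>']
```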

## Create normalised TF.IDF vectors of defined size

In [9]:
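# use hashing to create a fixed-size vector from a word list (the "hashing trick")
def hashing_vectorize(text, N): # arguments: the list of tokens and the size of the output vector
    v = [0] * N  # create vector of 0s
    for word in text: # iterate through the words 
        # Python's built-in hash() of strings is randomized per process unless PYTHONHASHSEED
        # is fixed, so all workers must share the same seed to produce consistent vectors.
        h = hash(word)
        # add 1 at the hashed address 
        v[h % N] += 1
    return v # return hashed word vector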

from pyspark.mllib.feature import IDF, Normalizer

def normTFIDF(fn_tokens_RDD, vecDim):
    """ Take a (filename, [token ...]) RDD and return a (filename, normalised TF.IDF vector) RDD. """
    keysRDD = fn_tokens_RDD.keys()
    tokensRDD = fn_tokens_RDD.values()
    tfVecRDD = tokensRDD.map(lambda tokens: hashing_vectorize(tokens, vecDim)) # term-frequency vectors
    idf = IDF() # create IDF object
    idfModel = idf.fit(tfVecRDD) # calculate IDF values from the documents in this RDD
    tfIdfRDD = idfModel.transform(tfVecRDD)
    norm = Normalizer() # create a Normalizer object (L2 norm by default)
    normTfIdfRDD = norm.transform(tfIdfRDD)
    zippedRDD = keysRDD.zip(normTfIdfRDD) # re-attach the filenames
    return zippedRDD

testDim = 10 # too small for good accuracy, but OK for testing
rdd3 = normTFIDF(rdd2, testDim) # test normTFIDF function
print(rdd3.take(1)) # tuples with ('filename', [N-dim vector])

[('8-1064msg1', DenseVector([0.6009, 0.1198, 0.1819, 0.0, 0.0, 0.3999, 0.5172, 0.2085, 0.3472, 0.0]))]
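To make the arithmetic behind `IDF` and `Normalizer` concrete, here is a plain-Python sketch of the same computation. It assumes the smoothed formula given in the Spark MLlib documentation, idf(t) = log((m + 1) / (df(t) + 1)) for m documents, followed by L2 normalisation. One difference from `normTFIDF()` is deliberate: `normTFIDF()` fits a separate `IDF` model on whichever RDD it receives, so the test set gets IDF weights computed from the test data itself, whereas this sketch fits the weights on the training vectors only and reuses them for the test vectors. The function names and toy vectors are hypothetical.

```python
import math

def fit_idf(tf_vectors):
    """Smoothed IDF per dimension: log((m + 1) / (df + 1)), m = number of documents."""
    m = len(tf_vectors)
    dim = len(tf_vectors[0])
    df = [sum(1 for v in tf_vectors if v[j] > 0) for j in range(dim)]
    return [math.log((m + 1) / (df[j] + 1)) for j in range(dim)]

def tfidf_l2(tf_vector, idf):
    """Scale term frequencies by the IDF weights, then normalise to unit L2 length."""
    w = [tf * i for tf, i in zip(tf_vector, idf)]
    norm = math.sqrt(sum(x * x for x in w))
    return [x / norm for x in w] if norm > 0 else w

if __name__ == '__main__':
    train = [[2, 0, 1], [0, 1, 1], [1, 0, 1]]  # toy hashed TF vectors (N = 3)
    test = [[1, 1, 1]]
    idf = fit_idf(train)  # fitted on the training vectors only
    print([tfidf_l2(v, idf) for v in test])
```

A dimension that occurs in every training document (the third one here) gets weight log(1) = 0, so it no longer contributes to the vector.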


## Create Labelled Points 
Determine whether the file is spam (i.e. the filename starts with `spmsg`) and replace the filename by 1 (spam) or 0 (non-spam) accordingly.

In [10]:
from pyspark.mllib.regression import LabeledPoint

# create labelled points of vector size N out of the RDD with normalised (filename, tf.idf-vector) items
def makeLabeledPoints(fn_vec_RDD):  
    # determine the true class as encoded in the filename and represent as 1 (spam) or 0 (good) 
    cls_vec_RDD = fn_vec_RDD.map(lambda x: (1, x[1]) if x[0].startswith('spmsg') else (0,x[1]))
    # create the LabeledPoint objects with (class, vector) arguments
    lp_RDD = cls_vec_RDD.map(lambda cls_vec: LabeledPoint(cls_vec[0], cls_vec[1]) ) 
    return lp_RDD 

# for testing
testLpRDD = makeLabeledPoints(rdd3)
print(testLpRDD.take(1))

[LabeledPoint(0.0, [0.6009150344196178,0.11976690682157425,0.18186826591424235,0.0,0.0,0.39991491844182403,0.5172418880942757,0.2084831340968144,0.3471953532202236,0.0])]


## Complete the preprocessing 
Combine the steps above into a single preprocessing function.

In [11]:
# apply the preprocessing chain to the data
def preprocess(rawRDD, N):
    """ take a (filename,text) RDD and transform it into LabeledPoint objects 
        with class labels and a TF.IDF vector with N dimensions. 
    """
    toknRDD = prepareTokenRDD(rawRDD)
    normRDD = normTFIDF(toknRDD,N)
    lpRDD = makeLabeledPoints(normRDD)
    return lpRDD # return RDD with LabeledPoints

# this starts the whole process from a directory, N is the vector size
def loadAndPreprocess(directory, N):
    """ load lingspam data from a directory and create a training and test set of preprocessed data """
    ftrainRDD_testRDD = makeTestTrainRDDs(directory)
    (trainRDD,testRDD) = ftrainRDD_testRDD
    return (preprocess(trainRDD, N), preprocess(testRDD, N)) # apply the preprocessing function defined above

trainLpRDD = preprocess(trainRDD, testDim) # prepare the training data
print(trainLpRDD.take(1)) # check

train_test_LpRDD = loadAndPreprocess('lemm', 100)
(trainLpRDD, testLpRDD) = train_test_LpRDD
print(testLpRDD.take(1))
print(trainLpRDD.take(1))

[LabeledPoint(0.0, [0.2637866600493808,0.44748645332942294,0.6065785052932868,0.0,0.0,0.06051794242403623,0.376799936865379,0.2141387069181108,0.23374073852613333,0.34074074684210165])]
creating RDDs
/Users/Braganca/lingspam_public/lemm/part3
/Users/Braganca/lingspam_public/lemm/part4
/Users/Braganca/lingspam_public/lemm/part5
/Users/Braganca/lingspam_public/lemm/part2
/Users/Braganca/lingspam_public/lemm/part10
/Users/Braganca/lingspam_public/lemm/part9
/Users/Braganca/lingspam_public/lemm/part7
/Users/Braganca/lingspam_public/lemm/part1
/Users/Braganca/lingspam_public/lemm/part6
/Users/Braganca/lingspam_public/lemm/part8
creating RDD union
[LabeledPoint(0.0, [0.023025588154406303,0.03990474409476495,0.05543803033005656,0.07555383010020532,0.0,0.03827674183305801,0.37254955965631914,0.030259500929017198,0.047662478924632136,0.0,0.0,0.005040169037386492,0.4236126382119103,0.04430486406570511,0.06061866555976338,0.05871296012132274,0.0,0.045678416985632686,0.035150827322761063,0.0366154

## Train some classifiers 

Use the `LabeledPoint` objects to train three classifiers (*Logistic Regression*, *Naive Bayes*, and a *Support Vector Machine*) and calculate the accuracy of a model on the training set.

In [12]:
from pyspark.mllib.classification import NaiveBayes, LogisticRegressionWithLBFGS, SVMWithSGD
from pyspark import StorageLevel

def trainModel(lpRDD):
    """ Train 3 classifier models on the given RDD with LabeledPoint objects."""
    print('Starting to train the model')
    model1 = LogisticRegressionWithLBFGS.train(lpRDD)
    print('Trained LR (model1)')
    model2 = NaiveBayes.train(lpRDD)
    print('Trained NB (model2)')
    model3 = SVMWithSGD.train(lpRDD)
    print('Trained SVM (model3)')
    return [model1, model2, model3]

def testModel(model, lpRDD):
    """ Tests the classification accuracy of the given model on the given RDD with LabeledPoint objects. """
    lpRDD.persist(StorageLevel.MEMORY_ONLY) # keep the data in memory, as it is traversed more than once
    # Get the prediction and the ground truth label
    predictionAndLabel = lpRDD.map(lambda p: (model.predict(p.features), p.label))
    correct = predictionAndLabel.filter(lambda xv: xv[0] == xv[1]).count() 
    total = lpRDD.count() # count once and reuse
    accuracy = correct / total
    print('Accuracy {:.1%} (data items: {}, correct: {})'.format(accuracy, total, correct))
    return accuracy

models = trainModel(trainLpRDD) # just for testing
testModel(models[2], trainLpRDD) # just for testing

Starting to train the model
Trained LR (model1)
Trained NB (model2)
Trained SVM (model3)
Accuracy 83.4% (data items: 2604, correct: 2171)


0.8337173579109063
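Accuracy alone hides how the errors are distributed between the two classes. A minimal sketch, assuming the `(prediction, label)` pairs have been brought to the driver (for example with `predictionAndLabel.collect()` inside `testModel()`), computes a confusion matrix with spam (1.0) as the positive class. The helper name `spam_metrics` is hypothetical; `pyspark.mllib.evaluation.MulticlassMetrics` offers comparable figures directly on an RDD of `(prediction, label)` pairs.

```python
def spam_metrics(pred_label_pairs):
    """Confusion counts, precision and recall for the spam class (label 1.0)."""
    tp = sum(1 for p, l in pred_label_pairs if p == 1.0 and l == 1.0)
    fp = sum(1 for p, l in pred_label_pairs if p == 1.0 and l == 0.0)
    fn = sum(1 for p, l in pred_label_pairs if p == 0.0 and l == 1.0)
    tn = sum(1 for p, l in pred_label_pairs if p == 0.0 and l == 0.0)
    precision = tp / (tp + fp) if tp + fp > 0 else 0.0
    recall = tp / (tp + fn) if tp + fn > 0 else 0.0
    accuracy = (tp + tn) / len(pred_label_pairs)
    return {'tp': tp, 'fp': fp, 'fn': fn, 'tn': tn,
            'precision': precision, 'recall': recall, 'accuracy': accuracy}

if __name__ == '__main__':
    # A classifier that always predicts non-spam, on 3 non-spam and 1 spam message:
    pairs = [(0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0)]
    print(spam_metrics(pairs))  # accuracy 0.75, but recall 0.0
```

A model that never predicts spam can still reach a high accuracy on imbalanced data, which the recall for the spam class exposes immediately.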

## Automate training and testing

Automate the whole process, from reading the files through preprocessing and training to evaluating the models.

In [13]:
def trainTestModel(trainRDD, testRDD):
    """ Trains 3 models and tests them on training and test data. Returns a matrix with the training and testing (rows) accuracy values for all models (columns). """
    models = trainModel(trainRDD)
    results = [[], []] # training, test for the models
    for mdl in models:
        print('Training')
        results[0].append(testModel(mdl, trainRDD))
        print('Testing')
        results[1].append(testModel(mdl, testRDD))
    return results

def trainTestFolder(folder,N):
    """ Reads data from a folder, preprocesses the data, and trains and evaluates models on it. """
    print('Start loading and preprocessing') 
    train_test_LpRDD = loadAndPreprocess(folder,N) # create the RDDs
    print('Finished loading and preprocessing')
    (trainLpRDD, testLpRDD) = train_test_LpRDD # unpack the RDDs 
    return trainTestModel(trainLpRDD,testLpRDD) # train and test

trainTestFolder('lemm', 1000) 

Start loading and preprocessing
creating RDDs
/Users/Braganca/lingspam_public/lemm/part3
/Users/Braganca/lingspam_public/lemm/part4
/Users/Braganca/lingspam_public/lemm/part5
/Users/Braganca/lingspam_public/lemm/part2
/Users/Braganca/lingspam_public/lemm/part10
/Users/Braganca/lingspam_public/lemm/part9
/Users/Braganca/lingspam_public/lemm/part7
/Users/Braganca/lingspam_public/lemm/part1
/Users/Braganca/lingspam_public/lemm/part6
/Users/Braganca/lingspam_public/lemm/part8
creating RDD union
Finished loading and preprocessing
Starting to train the model
Trained LR (model1)
Trained NB (model2)
Trained SVM (model3)
Training
Accuracy 100.0% (data items: 2604, correct: 2604)
Testing
Accuracy 97.6% (data items: 289, correct: 282)
Training
Accuracy 94.6% (data items: 2604, correct: 2464)
Testing
Accuracy 90.0% (data items: 289, correct: 260)
Training
Accuracy 83.4% (data items: 2604, correct: 2171)
Testing
Accuracy 83.4% (data items: 289, correct: 241)


[[1.0, 0.946236559139785, 0.8337173579109063],
 [0.9757785467128027, 0.8996539792387543, 0.8339100346020761]]
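The nested list returned by `trainTestFolder()` is easier to read as a table. A small sketch, assuming the row order (training, test) from `trainTestModel()` and the model order (LR, NB, SVM) from `trainModel()`; `format_results` is a hypothetical helper, and the values are the ones printed above for `lemm` with N = 1000.

```python
def format_results(results, names=('LR', 'NB', 'SVM')):
    """Render [[train accuracies], [test accuracies]] as a small text table with percentages."""
    lines = ['{:<6}'.format('') + ''.join('{:>8}'.format(n) for n in names)]
    for row_name, row in zip(('train', 'test'), results):
        lines.append('{:<6}'.format(row_name) + ''.join('{:>8.1%}'.format(a) for a in row))
    return '\n'.join(lines)

if __name__ == '__main__':
    lemm_1000 = [[1.0, 0.946236559139785, 0.8337173579109063],
                 [0.9757785467128027, 0.8996539792387543, 0.8339100346020761]]
    print(format_results(lemm_1000))
```

The same helper can be applied to each `result['acc']` entry collected in the experiments below.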

## Run experiments 

In [14]:
folder = 'bare'
vectorsizes = [3, 30, 300, 3000, 30000]
print('EXPERIMENT 1: Testing different vector sizes')
results_vectorsizes = []
for n in vectorsizes:
    print('\nN = {}'.format(n))
    result = {'n': n, 't': folder}
    result['acc'] = trainTestFolder(folder, n)
    results_vectorsizes.append(result)

EXPERIMENT 1: Testing different vector sizes

N = 3
Start loading and preprocessing
creating RDDs
/Users/Braganca/lingspam_public/bare/part3
/Users/Braganca/lingspam_public/bare/part4
/Users/Braganca/lingspam_public/bare/part5
/Users/Braganca/lingspam_public/bare/part2
/Users/Braganca/lingspam_public/bare/part10
/Users/Braganca/lingspam_public/bare/part9
/Users/Braganca/lingspam_public/bare/part7
/Users/Braganca/lingspam_public/bare/part1
/Users/Braganca/lingspam_public/bare/part6
/Users/Braganca/lingspam_public/bare/part8
creating RDD union
Finished loading and preprocessing
Starting to train the model
Trained LR (model1)
Trained NB (model2)
Trained SVM (model3)
Training
Accuracy 83.4% (data items: 2604, correct: 2171)
Testing
Accuracy 83.4% (data items: 289, correct: 241)
Training
Accuracy 83.4% (data items: 2604, correct: 2171)
Testing
Accuracy 83.4% (data items: 289, correct: 241)
Training
Accuracy 83.4% (data items: 2604, correct: 2171)
Testing
Accuracy 83.4% (data items: 289, cor

In [15]:
n = 3000
typeFolders = ['bare', 'stop', 'lemm', 'lemm_stop']
print('EXPERIMENT 2: Testing different data types')
results_preprocessing = []
for folder in typeFolders:
    print('\nPath = {}'.format(folder))
    result = {'n': n, 't': folder}
    result['acc'] = trainTestFolder(folder, n)
    results_preprocessing.append(result)

EXPERIMENT 2: Testing different data types

Path = bare
Start loading and preprocessing
creating RDDs
/Users/Braganca/lingspam_public/bare/part3
/Users/Braganca/lingspam_public/bare/part4
/Users/Braganca/lingspam_public/bare/part5
/Users/Braganca/lingspam_public/bare/part2
/Users/Braganca/lingspam_public/bare/part10
/Users/Braganca/lingspam_public/bare/part9
/Users/Braganca/lingspam_public/bare/part7
/Users/Braganca/lingspam_public/bare/part1
/Users/Braganca/lingspam_public/bare/part6
/Users/Braganca/lingspam_public/bare/part8
creating RDD union
Finished loading and preprocessing
Starting to train the model
Trained LR (model1)
Trained NB (model2)
Trained SVM (model3)
Training
Accuracy 100.0% (data items: 2604, correct: 2604)
Testing
Accuracy 96.9% (data items: 289, correct: 280)
Training
Accuracy 97.6% (data items: 2604, correct: 2541)
Testing
Accuracy 94.5% (data items: 289, correct: 273)
Training
Accuracy 83.4% (data items: 2604, correct: 2171)
Testing
Accuracy 83.4% (data items: 289

Comment:

Normally we would expect the classifiers to perform very well on the training data and somewhat worse on the test data. For small vector sizes N (3, 30), however, the classifiers reach the same accuracy on the training set (83.4%) and on the test set (83.4%). This means that they do not distinguish between the two classes at all and label every message as non-spam, the majority class. For larger values of N (300 and above), two of the three models perform much better on the training set, Logistic Regression (100%) and Naive Bayes (97.4%), while the SVM model does not improve for any value of N.
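The 83.4% figure can be checked against a majority-class baseline: a classifier that labels every message as non-spam scores exactly the share of non-spam messages. A plain-Python sketch, assuming (as the comment above argues) that the 83.4% runs predicted non-spam for every message, so that 2171 of the 2604 training items and 241 of the 289 test items are non-spam; `majority_baseline` is a hypothetical helper.

```python
def majority_baseline(class_counts):
    """Accuracy of always predicting the most frequent class."""
    return max(class_counts) / sum(class_counts)

if __name__ == '__main__':
    train = majority_baseline([2171, 2604 - 2171])  # non-spam, spam
    test = majority_baseline([241, 289 - 241])
    print('train {:.1%}, test {:.1%}'.format(train, test))  # train 83.4%, test 83.4%
```

The recorded SVM accuracies (0.8337173579109063 on training, 0.8339100346020761 on test) equal these ratios, which is consistent with the SVM predicting non-spam throughout.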

This improvement can be attributed to the size of the hash vector. A larger vector produces fewer hash collisions, so more words keep a dimension of their own and the classifier can learn to distinguish the classes better. A smaller N causes more collisions between words, which loses information and gives the classifier less useful data to learn from, resulting in lower performance.
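The effect of N on collisions can be seen directly. This sketch uses `zlib.crc32` as a stand-in for Python's `hash()` so that the result is the same in every process (an assumption of the sketch; the notebook itself uses `hash()`), and counts how many distinct words land in an already occupied bucket. By the pigeonhole principle, V distinct words in N buckets produce at least V - N collisions. The word list is made up, and `bucket` and `count_collisions` are hypothetical helpers.

```python
import zlib

def bucket(word, N):
    """Deterministic hashing trick: crc32 of the UTF-8 bytes, modulo N."""
    return zlib.crc32(word.encode('utf-8')) % N

def count_collisions(words, N):
    """Number of distinct words that share a bucket with an earlier word."""
    vocab = set(words)
    used = {bucket(w, N) for w in vocab}
    return len(vocab) - len(used)

if __name__ == '__main__':
    words = ['free', 'money', 'grammar', 'linguist', 'offer', 'click',
             'conference', 'syntax', 'paper', 'win', 'workshop', 'subject']
    for N in (3, 30, 300, 3000):
        print(N, count_collisions(words, N))
```

With N = 3, at least 9 of these 12 words must share a bucket with another word, whereas with N in the thousands most words typically get a bucket of their own.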

Another important aspect is the version of the corpus used for training. We would generally expect the corpus without any preprocessing (`bare`) to give lower accuracy, but in our case the Naive Bayes model achieves a higher test accuracy on the `bare` corpus than on the other versions, and the Logistic Regression model shows similar results. This may point to other factors affecting the chosen models that need further inspection, such as class imbalance in the dataset or ineffective preprocessing.
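The readme notes that the ten parts were used for 10-fold experiments, whereas `makeTestTrainRDDs()` holds out a single part. A fold rotation can be sketched in plain Python over the part names; sorting by the numeric suffix also fixes the order, which `iterdir()` does not guarantee (compare the printed directory order above). `part_number` and `folds` are hypothetical helpers; in the notebook each name would map to an RDD from `sc.wholeTextFiles()`, and each training set would be the union of the other nine.

```python
import re

def part_number(name):
    """Numeric suffix of a directory name such as 'part10'."""
    return int(re.search(r'(\d+)$', name).group(1))

def folds(part_names):
    """Yield (test_part, train_parts) pairs; each part serves as the test set exactly once."""
    parts = sorted(part_names, key=part_number)  # part1, part2, ..., part10
    for i, test_part in enumerate(parts):
        yield test_part, parts[:i] + parts[i + 1:]

if __name__ == '__main__':
    listed = ['part3', 'part4', 'part5', 'part2', 'part10',
              'part9', 'part7', 'part1', 'part6', 'part8']  # order printed above
    for test_part, train_parts in folds(listed):
        print(test_part, len(train_parts))
```

Averaging the test accuracies over the ten folds gives an estimate that depends less on which single part happens to be held out.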

