# Coursework Part 1: Detecting Spam with Spark

IN432 Big Data coursework 2017, part 1. Classifying messages to detect spam. 



## Task a) & b) Read some files and prepare a (f,w) RDD 


In [8]:
import re 

# USE DEPENDING ON DATASTORE
#prefix = '/data/tempstore/'
prefix = 'hdfs://saltdean.nsqdc.city.ac.uk/data/'

dirPath = prefix + 'spam/bare/part1'


def read_fw_RDD( argDir ): # package tasks a/b into a function for later use
    fwL_RDD = sc.wholeTextFiles(argDir) # read the files
    #print('Read {} files from directory {}'.format(fwL_RDD,argDir)) # status message for testing, can be disabled later on
    #print('file word count histogram') # the histogram can be useful for checking later 
    #print(fwL_RDD.map(lambda fwL: (len(fwL[1]))).histogram([0,10,100,1000,10000]))
    fw_RDD = fwL_RDD.flatMap(lambda f: [(f[0],w.lower()) for w in re.split('\W+', f[1])]) # prescreate tuples of (filename,word) by splitting the words in the file and lower-casing them
    
    return fw_RDD # A fw_RDD should be returned

fw_RDD = read_fw_RDD(dirPath) # for testing
print(fw_RDD.take(3)) # for testing

# result returns [(filename, word)....]

[('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-1msg1.txt', 'subject'), ('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-1msg1.txt', 're'), ('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-1msg1.txt', '2')]


## Task c) Normalised word count lists


In [9]:
from operator import add

def reGrpLst(fw_c): # reorganise the tuples
    fw, c= fw_c
    f,w = fw 
    return (f, [(w,c)]) # Function that will be called to present output in the format (filename, [(word, count)....])
 
def make_f_tfLn_RDD(argDir):  
    fw_RDD = read_fw_RDD( argDir ) # call function from task a & b
    f_wcL1_RDD=fw_RDD.map(lambda fw: (fw,1))  # add integer 1 to each file,word tuple
    f_wcL2_RDD=f_wcL1_RDD.map(lambda fw: fw) 
    f_wcL3_RDD=f_wcL2_RDD.reduceByKey(add)  #reduce and sum 
    f_wcL4_RDD=f_wcL3_RDD.map(reGrpLst) #call function that will reorganise tuple
    f_wcL5_RDD=f_wcL4_RDD.reduceByKey(add) # reduce and sum 
    f_wcL6_RDD=f_wcL5_RDD.map(lambda f_wcl: (f_wcl[0], sum([c for (w,c) in f_wcl[1]]))) #create RDD with (filename ([total count]))
    f_wcL7_RDD=f_wcL5_RDD.join(f_wcL6_RDD) #(join filename([total count] ) with filename ([word, count]...)
    f_wcLn_RDD=f_wcL7_RDD.map(lambda f_wcl: (f_wcl[0], [(w,c/f_wcl[1][1]) for (w,c) in f_wcl[1][0]])) #normalise the count for each word
    return f_wcLn_RDD

f_wcLn_RDD = make_f_tfLn_RDD( prefix + 'spam/bare/part1') # for testing
print(f_wcLn_RDD.take(1)) # for testing
wcLn = f_wcLn_RDD.take(1)[0][1] # get the first normalised word count list
print(sum([cn for (w,cn) in wcLn])) # the sum of normalised counts should be close to 1 


# Result returns [(filename), [(word, normalised count)....]]

[('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-550msg1.txt', [('query', 0.045454545454545456), ('annotext', 0.045454545454545456), ('anyone', 0.045454545454545456), ('internet', 0.045454545454545456), ('', 0.045454545454545456), ('michael', 0.045454545454545456), ('subject', 0.045454545454545456), ('thanks', 0.045454545454545456), ('greek', 0.045454545454545456), ('know', 0.045454545454545456), ('classical', 0.045454545454545456), ('dedicated', 0.045454545454545456), ('sikillian', 0.045454545454545456), ('latin', 0.045454545454545456), ('any', 0.045454545454545456), ('does', 0.045454545454545456), ('lists', 0.09090909090909091), ('or', 0.09090909090909091), ('bitnet', 0.045454545454545456), ('to', 0.045454545454545456)])]
0.9999999999999999


## Task d) Creating hashed feature vectors 


In [10]:
def hashing_vectorizer(word_count_list, N): #hash function that creates hash vector for  each tuple
    v = [0] * N 
    for word_count in word_count_list: 
        word, count = word_count # reorganise tupe by splitting  word and the count
        h = hash(word)  #create hash vector for each word in RDD
        v[h % N] = v[h % N] + count
    return v

def make_f_wVn_RDD(f_wcLn_RDD, argN):
    file_hash=f_wcLn_RDD.map(lambda f_wc: (f_wc[0], hashing_vectorizer(f_wc[1], argN))) # call hash function to create hash vectors for each word
    return file_hash
    
N=100
f_wVn_RDD = make_f_wVn_RDD(make_f_tfLn_RDD(dirPath),N) # for testing
print(f_wVn_RDD.take(1)[0][1]) # for testing
print( sum(f_wVn_RDD.take(1)[0][1])) # for testing

# Result is hashvectors

[0.09090909090909091, 0.09090909090909091, 0, 0.09090909090909091, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0.045454545454545456, 0, 0.045454545454545456, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0.045454545454545456, 0.045454545454545456, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0.09090909090909091, 0.045454545454545456, 0, 0, 0.045454545454545456]
0.9999999999999997


## Task e) Create Labeled Points

In [11]:
from pyspark.mllib.regression import LabeledPoint

def make_lp_RDD(f_tfLn_RDD,argN):
    file_hash = make_f_wVn_RDD(f_tfLn_RDD, argN)  # make a vector
    lp_RDD = file_hash.map(lambda x: LabeledPoint(1 if re.search('spmsg',x[0]) else 0, x[1])) #detect spam by filename  & transform into LabeledPoint objects
    return lp_RDD

lp_RDD = make_lp_RDD(make_f_tfLn_RDD(prefix + 'spam/bare/part1'),100)
print(lp_RDD.take(3)) 

#result in the format: LabelledPoint(0 if not spam or 1 if spam[Hashed vectors])

[LabeledPoint(0.0, [0.0909090909091,0.0909090909091,0.0,0.0909090909091,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0454545454545,0.0,0.0454545454545,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0454545454545,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0909090909091,0.0454545454545,0.0,0.0,0.0454545454545]), LabeledPoint(0.0, [0.00691562932227,0.0138312586445,0.00968188105118,0.0235131396957,0.0,0.0124481327801,0.0179806362379,0.0138312586445,0.00276625172891,0.0193637621024,0.030428769018,0.00138312586445,0.00553250345781,0.00138312586445,0.00414937759336,0.00691562932227,0.00691562932227,0.0553250345781,0.00691562932227,0.0110650069156,0.00829875518672,0.016597

## Task f) Train a classifier 


In [12]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, NaiveBayes
from pyspark.mllib.util import MLUtils

path = prefix + 'spam/stop/part1'

N=100
def trainModel(f_wcL_RDD,N):
    trainData=make_lp_RDD(f_wcL_RDD, N) #call Labelled point function on trianing data
    model=LogisticRegressionWithLBFGS.train(trainData) #train Machine Learning algorithm
    predicted=trainData.map (lambda x: (x.label, model.predict(x.features))) #classify
    count=predicted.count()  #count inputs that are to be classified
    correct = predicted.filter(lambda lp: lp[0]==lp[1]).count() #count correctly classified items
    accuracy=correct/float(count) #calculate classification accuracy
    print('training data items: {}, correct: {}'.format(trainData.count(), correct)) # output raw numbers
    print('training accuracy {:.1%}'.format(accuracy)) # and accuracy
    return model

f_wcLn_RDD = make_f_tfLn_RDD(path) # for testing
model = trainModel(f_wcLn_RDD,N) # for testing

training data items: 289, correct: 289
training accuracy 100.0%


## Task g) Test the classifier


In [13]:
def testModel(model,f_wcL_RDD,N):
    testData=make_lp_RDD(f_wcL_RDD, N) #call labelled point function on test data
    predicted=testData.map(lambda x: (x.label, model.predict(x.features))) # classify test data
    count=predicted.count() #count total inputs to be classified
    correct=predicted.filter(lambda lp: lp[0]==lp[1]).count() # count number correctly classified
    accuracy = correct / float(count)   # calculate proportion correctly classified (accuracy)
    print('test data items: {}, correct:{}'.format(testData.count(),correct))
    print('testing accuracy {:.1%}'.format(accuracy))

testModel(model,make_f_tfLn_RDD('hdfs://saltdean/data/spam/stop/part10'),N) # for testing

test data items: 291, correct:219
testing accuracy 75.3%


## Task h) Run experiments 



In [15]:
# this function combines tasks f) and g)
def trainTestModel(trainPaths,testPath,N):
    model=trainModel(make_f_tfLn_RDD(trainPaths),N) #combine training and testing 
    testModel(model,make_f_tfLn_RDD(testPath),N)
    return model                 
                
# prepare the part directories and the path
dirPattern = 'hdfs://saltdean/data/spam/bare/part[1-{}]' # the {} can be filled by 'dirPattern.format(i)' 
# create the path for the test set
testPath = 'hdfs://saltdean/data/spam/bare/part10'

print('EXPERIMENT 1: Testing different training set sizes')
print('Path = {}, N = {}'.format(dirPattern,N)) # using format to make sure we record the parameters of the experiment
#<<< make the test set, it will be constant for this experiment
for i in range(1,10): # loop over i the number of parts for training (1-9)
    trainPaths = dirPattern.format(i) # in the loop you can create a path like this
    print(trainPaths) #just for testing, remove later
    trainTestModel(trainPaths,testPath,N) # create the trainRDD (using your make_f_tfLn_RDD method)
    

print('\nEXPERIMENT 2: Testing different vector sizes')
array= [3,10,30,100,300] #<<< loop over different values for N. 3,10,30,100,300, ... is a good pattern
for i in range (0,5):
    print('=== N = ', array[i])
    trainTestModel(trainPaths,testPath,array[i])

N = 100 # change to what you feel is a good compromise between computation and accuracy
# the dictionary below helps associate description and paths.
setDict = {'No preprocessing': prefix + 'spam/bare/',
           'Stopwords removed': prefix + 'spam/stop/',
           'Lemmatised': prefix + 'spam/lemm/',
           'Lemmatised and stopwords removed': prefix + 'spam/lemm_stop/'}

print('\nEXPERIMENT 3: Testing differently preprocessed data sets')
print('training on parts 1-9, N = {}'.format(N))
for sp in setDict:
    print('=== ',sp)
    trainPaths = setDict[sp] + 'part[1-9]' #make training data 1-9
    print(trainPaths)
    trainTestModel(trainPaths, testPath, N) # test data
    
print('\n====== Done ======')

EXPERIMENT 1: Testing different training set sizes
Path = hdfs://saltdean/data/spam/bare/part[1-{}], N = 100
hdfs://saltdean/data/spam/bare/part[1-1]
training data items: 289, correct: 289
training accuracy 100.0%
test data items: 291, correct:254
testing accuracy 87.3%
hdfs://saltdean/data/spam/bare/part[1-2]
training data items: 578, correct: 578
training accuracy 100.0%
test data items: 291, correct:266
testing accuracy 91.4%
hdfs://saltdean/data/spam/bare/part[1-3]
training data items: 867, correct: 867
training accuracy 100.0%
test data items: 291, correct:265
testing accuracy 91.1%
hdfs://saltdean/data/spam/bare/part[1-4]
training data items: 1156, correct: 1156
training accuracy 100.0%
test data items: 291, correct:265
testing accuracy 91.1%
hdfs://saltdean/data/spam/bare/part[1-5]
training data items: 1446, correct: 1446
training accuracy 100.0%
test data items: 291, correct:264
testing accuracy 90.7%
hdfs://saltdean/data/spam/bare/part[1-6]
training data items: 1735, correct: 