# Coursework Part 1: Detecting Spam with Spark

These are the tasks for IN432 Big Data coursework 2018, part 1.  

This coursework is about classification of e-mail messages as spam or non-spam in Spark. We will go through the whole process from loading preprocessing to training and testing classifiers in a distributed way in Spark. We wil use the techniques shown in the lextures and labs. I will also introduce here a few additional elements, such as the NLTK and some of the preprocessing and machine learning functions that come with Spark. You are not expected to need anything beyond the material handed out so far and in some cases the Spark documentation, to which I have put links in this document.  

The structure is similar to the lab sheets. I provide a code structure with gaps that you are supposed to file. In addition you should run 2 small experiments and comment on the results. The lines where you are supposed to add code or take another action are marked with ">>>" 
please leave the ">>>" in the text, comment out that line, and write your own code in the next line using a copy of that line as a starting point.

I have added numerous comments in text cells and the code cells to guid you through the program. Please read them carefully and ask if anything is unclear. 

Once you have completed the tasks, don't delete the outpus, but downlaod the notebook (outputs will be included) and upload it into the coursework submission area on Moodle. The coursework part counts for 25% or the total coursework.

## Load and prepare the data

We will use the lingspam dataset in this coursework (see [http://csmining.org/index.php/ling-spam-datasets.html](http://csmining.org/index.php/ling-spam-datasets.html) for more information).

The next cell is only needed if you haven't cloned the repository in week 2 or later (but it doesn't do harm to run it). 

In [1]:
%cd ~/notebook/work/
!git clone https://github.com/tweyde/City-Data-Science.git

/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work
fatal: destination path 'City-Data-Science' already exists and is not an empty directory.


In [2]:
%cd ~/notebook/work/City-Data-Science/
!git pull
%cd ./datasets/ 
#we need to use a magic command (starting with '%') here.
print(">>> Extracting the ling_spam dataset, this can take a moment.")
!tar -xf lingspam_public02.tar.gz
# '!' calls a program on the machine (the DSX service runs on Linux machines).
print(">>> Unzipping finished.")
# We now have a new dataset in directory 'bare'.
%cd lingspam_public/bare 
print(">>> pwd ")
!pwd
print(">>> ls ")
!ls
# the line before last of output should show "part1 part10 part2  part3  part4  part5  part6  part7 part8 part9"
%cd ..

/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work/City-Data-Science
Already up-to-date.
/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work/City-Data-Science/datasets
>>> Extracting the ling_spam dataset, this can take a moment.
>>> Unzipping finished.
/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work/City-Data-Science/datasets/lingspam_public/bare
>>> pwd 
/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work/City-Data-Science/datasets/lingspam_public/bare
>>> ls 
part1  part10  part2  part3  part4  part5  part6  part7  part8	part9
/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work/City-Data-Science/datasets/lingspam_public


In [3]:
!pwd

/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work/City-Data-Science/datasets/lingspam_public


### Tools for Troubleshooting

Normally, DSX works reliably, but there are two issues that have occured. We have solutions for them that you can use with the following cells. 
If other problems occur, reloading the page and/or restarting the Kernel can help. 

In [4]:
# try this in case of "sc undefined" or similar errors, should normally not be necessary.
from pyspark import SparkContext
sc = spark.sparkContext

In [5]:
# sometimes, when you have multiple notebooks open at the same time, you might get an error that the metastore_db is not accessible.
# We can not prevent this form happening on DSX (apart from not opening more than one notebook at a time).
# If it does happen you need to delete the metastore_db. The path of the metastore_db is in the error messages and it's typically 
# lond like this example: 
# /gpfs/global_fs01/sym_shared/YPProdSpark/user/s832-dfe96c6e1f1d61-70d619a53771/notebook/jupyter-rt/kernel-cdcf5f73-9afb-481d-ac40-a210a649eb69-20180222_154448/metastore_db
# once you have it, you can use it with !rm -Rf to delete it:
#!rm -Rf /gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/jupyter-rt/kernel-cdcf5f73-9afb-481d-ac40-a210a649eb69-20180222_154448/metastore_db

## Task 1) Read the dataset and create RDDs 
a) Start by reading the directory with text files from the file system (`~/notebook/work/City-Data-Science/datasets/bare`). Load all text files per dirctory (part1,part2, ... ,part10) using `wholeTextFiles()`, which creates one RDD per part, containing tuples (filename,text). This is a good choice as the text files are small. (5%)

b) We will use one of the RDDs as test set, the rest as training set. For the training set you need to create the union of the remaining RDDs. (5%)

b) Remove the path and extension from the filename using the regular expression provided (5%).

If the filename starts with 'spmsg' it is spam, otherwise it is not. We'll use that later to train a classifier. 

We will put the code in each cell into a function that we can reuse later. In this way we can develop the whole preprocessing with the smaller test set and apply it to the training set once we know that everything works. 

In [6]:
from pathlib import Path
import re

def makeTestTrainRDDs(pathString):
    """ Takes one of the four subdirectories of the lingspam dataset and returns two RDDs one each for testing and training. """
    # We should see10 parts that we can use for creating train and test sets.
    p = Path(pathString) # gets a path object representing the current directory path.
    dirs = list(p.iterdir()) # get the directories part1 ... part10. 
    #print(dirs) # Print to check that you have the right directory. You can comment this out when checked. 
    rddList = [] # create a list for the RDDs
    # now create an RDD for each 'part' directory and add them to rddList
    for d in dirs: # iterate through the directories
#>>>     rdd = ... #>>> # read the files in the directory 
        rdd = sc.wholeTextFiles(str(d.absolute()))  #>>> # read the files in the directory 
#>>>     ... #>>> append the RDD to the rddList
        rddList.append(rdd) #>>> append the RDD to the rddList
    print('len(rddList)',len(rddList))  # we should now have 10 RDDs in the list # just for testing
    #print(rddList[1].take(1)) # just for testing, comment out when it works.
    testRDD1 = rddList[9] # set the test set
    trainRDD1 = rddList[0] # start the training set from 0 and 
    # now loop over the range from 1 to 9(exclusive) to create a union of the remaining RDDs
    for i in range(1,9):
        trainRDD1 = trainRDD1.union(rddList[i]) #>>> create a union of the current and the next 
            # RDD in the list, so that in the end we have a union of all parts 0-8. (9 ist used as test set)
    # both RDDs should remove the paths and extensions from the filename. 
    #>>> This regular expression will do it: re.split('[/\.]', fn_txt[0])[-2]
    #>>> apply it to the filenames in train and test RDD with a lambda
#>>>    testRDD2 = testRDD1.map(lambda ...) 
    testRDD2 = testRDD1.map(lambda fn_txt: (re.split('[/\.]', fn_txt[0])[-2],[fn_txt[1]])) 
#>>>    trainRDD2 = trainRDD1.map(lambda ...) 
    trainRDD2 = trainRDD1.map(lambda fn_txt: (re.split('[/\.]', fn_txt[0])[-2],[fn_txt[1]]))
    return (trainRDD2,testRDD2)

# this makes sure we are in the right directory
%cd ~/notebook/work/City-Data-Science/datasets/lingspam_public/
# this should show "bare  lemm  lemm_stop  readme.txt  stop"
!ls
# the code below is for testing the function makeTestTrainRDDs
trainRDD_testRDD = makeTestTrainRDDs('bare') # read from the 'bare' directory - this takes a bit of time
(trainRDD,testRDD) = trainRDD_testRDD # unpack the returned tuple
print('created the RDDs') # notify the user, so that we can figure out where things went wrong if they do.
print('testRDD.count(): ',testRDD.count()) # should be 290 
print('trainRDD.count(): ',trainRDD.count()) # should be 2603 - commented out to save time
print('testRDD.getNumPartitions()',testRDD.getNumPartitions()) # normally 2 on DSX
print('testRDD.getStorageLevel()',testRDD.getStorageLevel()) # Serialized 1x Replicated on DSX
print('testRDD.take(1): ',testRDD.take(1)) # should be (filename,[tokens]) 
rdd1 = testRDD # use this for developemnt in the next tasks 

/gpfs/global_fs01/sym_shared/YPProdSpark/user/s82c-284281499e6876-47e231f9ebd0/notebook/work/City-Data-Science/datasets/lingspam_public
bare  lemm  lemm_stop  readme.txt  stop
len(rddList) 10
created the RDDs
testRDD.count():  291
trainRDD.count():  2602
testRDD.getNumPartitions() 2
testRDD.getStorageLevel() Serialized 1x Replicated
testRDD.take(1):  [('9-66msg1', ["Subject: xth conference of nordic and general ling .\n\nthe tenth conference of nordic and general linguistics will be held in reykjavik , iceland , from saturday june 6 , to monday june 8 , 1998 . it is organized by the institute of linguistics , university of iceland . the deadline for pre-registration at a reduced price is january 31 , 1998 . pre - registration forms and further information can be found on our web site ( http : / / www . rhi . hi . is / ~ nordconf ) and can also be mailed or e-mailed upon request . papers on any linguistic topic are invited , especially papers on synchronic and diachronic aspects of the 

## Task 2) Tokenize and remove punctuation

Now we need to split the words, a process called *tokenization* by linguists, and remove punctuation. 

We will use the Python [Natural Language Toolkit](http://www.nltk.org) *NLTK* to do the tokenization (rather than splitting ourselves, as these specialist tools usually do that we can ourselves). We use the NLTK function word_tokenize, see here for a code example: [http://www.nltk.org/book/ch03.html](http://www.nltk.org/book/ch03.html). 5%

Then we will remove punctuation. There is no specific funtion for this, so we use a regular expression (see here for info [https://docs.python.org/3/library/re.html?highlight=re#module-re](https://docs.python.org/3/library/re.html?highlight=re#module-re)) in a list comprehension (here's a nice visual explanation: [http://treyhunner.com/2015/12/python-list-comprehensions-now-in-color/](http://treyhunner.com/2015/12/python-list-comprehensions-now-in-color/)). 5% 

We use a new technique here: we separate keys and values of the RDD, using the RDD functions `keys()` and `values()`, which yield each a new RDD. Then we process the values and *zip* them together with the keys again. See here for documentation: [http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.zip](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.zip).  We wrap the whole sequence into one function `prepareTokenRDD` for later use. 5%

In [7]:
import nltk
import re
from nltk.corpus import stopwords

def tokenize(text):
    """ Apply the nltk.word_tokenize() method to our text, return the token list. """
    nltk.download('punkt') # this loads the standard NLTK tokenizer model 
    # it is important that this is done here in the function, as it needs to be done on every worker.
    # If we do the download outside a this function, it would only be executed on the driver     
#>>>    return ... # use the nltk function word_tokenize
    return nltk.word_tokenize(text[0])
    
def removePunctuation(tokens):
    """ Remove punctuation characters from all tokens in a provided list. """
    # this will remove all punctiation from string s: re.sub('[()\[\],.?!";_]','',s)
#>>>    tokens2 =  [...] # use a list comprehension to remove punctuaton
    tokens2 = [re.sub('[()\[\],.?!";_]','',s) for s in tokens]
    return tokens2
    
def prepareTokenRDD(fn_txt_RDD):
    """ Take an RDD with (filename,text) elements and transform it into a (filename,[token ...]) RDD without punctuation characters. """
    rdd_vals2 = fn_txt_RDD.values() # It's convenient to process only the values. 
    rdd_vals3 = rdd_vals2.map(tokenize) # Create a tokenised version of the values by mapping
    rdd_vals4 = rdd_vals3.map(removePunctuation) # remove punctuation from the values
    rdd4 = fn_txt_RDD.keys().zip(rdd_vals4) # we zip the two RDDs together 
    # i.e. produce tuples with one itme from each RDD.
    # This works because we have only applied mappings to the values, 
    # therefore the items in both RDDs are still aligned.
    # now remove any empty value strings (i.e. length 0) that we may have created by removing punctiation.
    # >>> now remove any empty strings (i.e. length 0) that we may have 
    # created by removing punctuation, and resulting entries without words left.
    #rdd5 = rdd4. # remove empty strings using RDD.map and a lambda. TIP len(s) gives you the lenght of string. 
    rdd5 = rdd4.map(lambda fn_txt: (fn_txt[0], [token for token in fn_txt[1] if len(token)>0]))
    #rdd6 = rdd5. # remove items without tokens using RDD.filter and a lambda. 
    rdd6= rdd5.filter(lambda x: x[1] != [])           
    # >>> Question: why should this be filtering done after zipping the keys and values together?
    return rdd6
    
    # >>> Question: why should this be filtering done after zipping the keys and values together?
    # Answer: Filtering should be done after zipping because otherwise the items are no longer aligned between the values and keys


rdd2 = prepareTokenRDD(rdd1) # Use the test set for now, because it is smaller
print(rdd2.take(1)) # For checking result of task 2. 

[('9-66msg1', ['Subject', ':', 'xth', 'conference', 'of', 'nordic', 'and', 'general', 'ling', 'the', 'tenth', 'conference', 'of', 'nordic', 'and', 'general', 'linguistics', 'will', 'be', 'held', 'in', 'reykjavik', 'iceland', 'from', 'saturday', 'june', '6', 'to', 'monday', 'june', '8', '1998', 'it', 'is', 'organized', 'by', 'the', 'institute', 'of', 'linguistics', 'university', 'of', 'iceland', 'the', 'deadline', 'for', 'pre-registration', 'at', 'a', 'reduced', 'price', 'is', 'january', '31', '1998', 'pre', '-', 'registration', 'forms', 'and', 'further', 'information', 'can', 'be', 'found', 'on', 'our', 'web', 'site', 'http', ':', '/', '/', 'www', 'rhi', 'hi', 'is', '/', '~', 'nordconf', 'and', 'can', 'also', 'be', 'mailed', 'or', 'e-mailed', 'upon', 'request', 'papers', 'on', 'any', 'linguistic', 'topic', 'are', 'invited', 'especially', 'papers', 'on', 'synchronic', 'and', 'diachronic', 'aspects', 'of', 'the', 'nordic', 'languages', 'invited', 'speakers', ':', 'anders', 'holmberg', 't

Question: why should this be filtering done after zipping the keys and values together? <br>
Answer: Filtering should be done after zipping because otherwise the items are no longer aligned between the values and keys

## Task 3) Creating normalised TF.IDF vectors of defined dimensionality, measure the effect of caching.

We use the hashing trick to create fixed size TF vectors directly from the word list now (slightly different from the previous lab, where we used *(word,count)* pairs.). Write a bit of code as needed. (5%)

Then we'll use the IDF and Normalizer functions provided by Spark. They use a slightly different pattern than RDD.map and reduce, have a look at the examples here in the documentation for Normalizer  and IDF:
[http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.feature.Normalizer](http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.feature.Normalizer), [http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF](http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.feature.IDF) (5%)

We want control of the dimensionality in the `normTFIDF` function, so we introduce an argument into our functions that enables us to vary dimensionalty later. Here is also an opportunity to benefit from caching, i.e. persisting the RDD after use, so that it will not be recomputed.  (5%)

In [8]:
# use the hashing trick to create a fixed-size vector from a word list


def hashing_vectorize(text,N): # arguments: the list and the size of the output vector
    v = [0] * N  # create vector of 0s
    for word in text: # iterate through the words 
#>>>              # get the hash value 
        h = hash(word)
#>>>              # add 1 at the hashed address 
        v[h%N] +=1
    return v # return hashed word vector

#hashing_vectorize() # for testing

from pyspark.mllib.feature import IDF, Normalizer

def normTFIDF(fn_tokens_RDD, vecDim, caching=True):
    keysRDD = fn_tokens_RDD.keys()
    tokensRDD = fn_tokens_RDD.values()
    tfVecRDD = tokensRDD.map(lambda tokens: hashing_vectorize(tokens,vecDim)) #>>> passing the vecDim value. TIP: you need a lambda. 
    if caching:
        tfVecRDD.persist(StorageLevel.MEMORY_ONLY) # since we will read more than once, caching in Memory will make things quicker.
    idf = IDF() # create IDF object
    idfModel = idf.fit(tfVecRDD) # calculate IDF values
    tfIdfRDD = idfModel.transform(tfVecRDD) # 2nd pass needed (see lecture slides), transforms RDD
#>>>    norm = ... # create a Normalizer object like in the example linked above
    norm = Normalizer(p=2.0)
#>>>    normTfIdfRDD = norm. ... # and apply it to the tfIdfRDD 
    normTfIdfRDD = norm.transform(tfIdfRDD)
#>>>    zippedRDD = ... # zip the keys and values together
    zippedRDD = keysRDD.zip(normTfIdfRDD)
    return zippedRDD

testDim = 10 # too small for good accuracy, but OK for testing
rdd3 = normTFIDF(rdd2, testDim, True) # test our
print(rdd3.take(1)) # we should now have tuples with ('filename',[N-dim vector])
# e.g. [('9-1142msg1', DenseVector([0.0, 0.0, 0.0, 0.0, 0.4097, 0.0, 0.0, 0.0, 0.9122, 0.0]))]

[('9-66msg1', DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]))]


### Task 3a) Caching experiment

The normTFIDF let's us switch caching on or off. Write a bit of code that measures the effect of caching by takes the time for both options. Use the time function as shown in lecture 3, slide 47. Remember that you need to call an action on an RDD to trigger full execution. 

Add a short comment on the result (why is there an effect, why of the size that it is?). Remember that this is wall clock time, i.e. you may get noisy results. (10%)

In [9]:
#run a small experiment with caching set to True or False, 3 times each

from time import time

resCaching = [] # for storing results
resNoCache = [] # for storing results
for i in range(3): # 3 samples
#>>>  # start timer
    startTime = time()
    testRDD1 = normTFIDF(rdd2, testDim, True) # 
#>>>     # call an action on the RDD to force execution
    testRDD1.take(1)
#>>>  # end timer
    endTime = time()
    resCaching.append( endTime - startTime ) # calculate the difference
    
#>>>  # start timer
    startTime = time()
    testRDD2 = normTFIDF(rdd2, testDim, False) 
#>>>  # call an action to force execution   
    testRDD2.take(1)
#>>>  # end timer
    endTime = time()
    resNoCache.append( endTime - startTime )
    
#>>> meanTimeCaching = # calculate average times
    meanTimeCaching = sum(resCaching)/len(resCaching)
#>>> meanTimeNoCache = # calculate average times
    meanTimeNoCache = sum(resNoCache)/len(resNoCache)

print('Creating TF.IDF vectors, 3 trials - mean time with caching: ', meanTimeCaching, ', mean time without caching: ', meanTimeNoCache)
#>>> # add your results and comments here 
# reating TF.IDF vectors, 3 trials - mean time with caching:  13.617048660914103 , mean time without caching:  20.574780702590942 
# Without caching intermediate results are not stored in memory and have to be retrieved from disk, as reading from disk is slower than from memory
# wee see a improvement in performance when enabeling caching. As the normTFIDF performs two operations on the rdd, enabeling caching should speed up 
# the executiong time by a factor of roughly two. The wall-clock-time noise has an impact on the the measured time, hence our results only display an
# increase by a factor of 1.51.

Creating TF.IDF vectors, 3 trials - mean time with caching:  14.236143112182617 , mean time without caching:  19.288512309392292


## Task 4) Create LabeledPoints 

Determine whether the file is spam (i.e. the filename contains ’spmsg’) and replace the filename by a 1 (spam) or 0 (non-spam) accordingly. Use `RDD.map()` to create an RDD of LabeledPoint objects. See here [http://spark.apache.org/docs/2.1.0/mllib-linear-methods.html#logistic-regression](http://spark.apache.org/docs/2.1.0/mllib-linear-methods.html#logistic-regression) for an example, and here [http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint](http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint) for the `LabeledPoint` documentation. (10%)

There is a handy function of Python strings called startswith: e.g. 'abc'.startswith('ab) will return true. The relevant Python syntax here is a conditional expression: **``<a> if <yourCondition> else <b>``**, i.e. 1 if the filename starts with 'spmsg' and otherwise 0.

In [10]:
from pyspark.mllib.regression import LabeledPoint

# creatate labelled points of vector size N out of an RDD with normalised (filename [(word,count), ...]) items
def makeLabeledPoints(fn_vec_RDD): # RDD and N needed 
    # we determine the true class as encoded in the filename and represent as 1 (samp) or 0 (good)
#>>>    
    cls_vec_RDD = fn_vec_RDD.map(lambda fn: (1 if fn[0].startswith('spmsg') else 0, fn[1])) # use a conditional expression to get the class label (True or False)    
    # now we can create the LabeledPoint objects with (class,vector) arguments
    lp_RDD = cls_vec_RDD.map(lambda cls_vec: LabeledPoint(cls_vec[0],cls_vec[1]) ) 
    return lp_RDD 

# for testing
testLpRDD = makeLabeledPoints(rdd3) 
print(testLpRDD.take(1)) 
# should look similar to this: [LabeledPoint(0.0, [0.0,0.0,0.0,0.0,0.40968062880166006,0.0,0.0,0.0,0.9122290186048,0.0])]

[LabeledPoint(0.0, [0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0])]


## Task 5) Complete the preprocessing 

It will be useful to have a single function to do the preprocessing. So integrate everything here. (5%)

In [11]:
# now we can apply the preprocessing chain to the data loaded in task 1 
# N is for controlling the vector size
def preprocess(rawRDD,N):
    """ take a (filename,text) RDD and transform into LabelledPoint objects 
        with class labels and a TF.IDF vector with N dimensions. 
    """
    tokenRDD = prepareTokenRDD(rawRDD) # task 2
    tfIdfRDD = normTFIDF(tokenRDD,N) # task 3
    lpRDD = makeLabeledPoints(tfIdfRDD) # task 4
    return lpRDD # return RDD with LabeledPoints

# and with this we can start the whole process from a directory, N is again the vector size
def loadAndPreprocess(directory,N):
    """ load lingspam data from a directory and create a training and test set of preprocessed data """
#>>>    trainRDD_testRDD = ... # read from the directory using the function created in task 1
    trainRDD_testRDD = makeTestTrainRDDs(directory) # read from the directory using the function created in task 1

#>>>     # unpack the returned tuple
    (trainRDD,testRDD) = trainRDD_testRDD # unpack the returned tuple
    return (preprocess(trainRDD,N),preprocess(testRDD,N)) # apply the preprocessing funcion defined above

trainLpRDD = preprocess(trainRDD,testDim) # prepare the training data
print(testLpRDD.take(1)) # should look similar to previous cell's output

train_test_LpRDD = loadAndPreprocess('lemm',100) # let's re-run with another vector size
(trainLpRDD,testLpRDD) = train_test_LpRDD 
print(testLpRDD.take(1))
print(trainLpRDD.take(1))

[LabeledPoint(0.0, [0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0])]
len(rddList) 10
[LabeledPoint(0.0, [0.0173065598238,0.0450282043828,0.07637472436,0.0428900460529,0.0730278218162,0.13845247859,0.207468110614,0.0907447439736,0.099848432519,0.15975749203,0.0568299720098,0.175748800957,0.173650740836,0.0,0.0841990489641,0.0318533592129,0.0271491006488,0.022397244891,0.0288613186302,0.0650231550584,0.237523491514,0.0431219297594,0.0,0.134049457572,0.0,0.0831523206436,0.0985569335186,0.0236171609595,0.0658853314433,0.141869820965,0.138587201073,0.0767548415888,0.0817292485974,0.129408153043,0.160954127147,0.0,0.120059729193,0.116597153518,0.0420995244821,0.0692262392951,0.0546558190972,0.0692262392951,0.0293894980059,0.0880784281569,0.148602906238,0.14718449106,0.0453723719868,0.0364799780137,0.143740136859,0.0335358779938,0.179727178534,0.104190444502,0.0732586401789,0.0,0.0440392140784,0.0667245056621,0.0394544471286,0.106332536461,0.0526589109777,0.0547199670205,0.0335358779938,0.264932083

## Task 6) Train some classifiers 

Use the `LabeledPoint` objects to train a classifier, specifically the *LogisticRegression*, *Naive Bayes*, and *Support Vector Machine*. Calculate the accuracy of the model on the training set (again, follow this example [http://spark.apache.org/docs/2.1.0/ml-classification-regression.html#logistic-regression](http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression) and here is the documentation for the classifiers [LogisticRegressionWithLBFGS](http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS), [NaiveBayes](http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.NaiveBayes), [SVMWithSGD](http://spark.apache.org/docs/2.1.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.SVMWithSGD).  (10%) 

In [12]:
# Reset metastore_db if required
#!rm -rf /gpfs/global_fs01/sym_shared/YPProdSpark/user/sdbc-86020865f0bcf7-1f3bf7500109/notebook/jupyter-rt/kernel-4fc3b08d-9659-493a-8319-4e566bafcb3c-20180312_131858/metastore_db

In [13]:
from pyspark.mllib.classification import (NaiveBayes, LogisticRegressionWithLBFGS, SVMWithSGD) 
import numpy

# train the model with an (f,[(w,c), ...]) RDD. This is practical as we can reuse the function for TF.IDF
def trainModel(lpRDD):
    """ Train 3 classifier models on the given RDD with LabeledPoint objects. A list of trained model is returned. """
    lpRDD.persist(StorageLevel.MEMORY_ONLY) # not really needed as the Spark implementations ensure caching themselves. 
                    # Other implementations might not, however. 
    # Train a classifier model.
    print('Starting to train the model') # give some immediate feedback
    model1 = LogisticRegressionWithLBFGS.train(lpRDD) # this is the best model
    print('Trained LR (model1)')
    #print('type(model1)')
    model2 = NaiveBayes.train(lpRDD) # doesn't work well
    print('Trained NB (model2)')
    #print(type(model2))
    model3 = SVMWithSGD.train(lpRDD) # or this ...
    print('Trained SVM (model3)')
    return [model1,model2,model3]

def testModel(model, lpRDD):
    """ Tests the classificatio accuracy of the given model on the given RDD with LabeledPoint objects. """
    lpRDD.persist(StorageLevel.MEMORY_ONLY)
    # Make prediction and evaluate training set accuracy.
    # Get the prediction and the ground truth label
    predictionAndLabel = lpRDD.map(lambda p: (model.predict(p.features), p.label)) # get the prediction and ground truth (label) for each item.
    correct = predictionAndLabel.filter(lambda xv: xv[0] == xv[1]).count() # count the correct predictions 
#>>>    accuracy =  # and calculate the accuracy 
    accuracy = correct/(lpRDD.count())
    print('Accuracy {:.1%} (data items: {}, correct: {})'.format(accuracy,lpRDD.count(), correct)) # report to console
    return accuracy # and return the value  

models = trainModel(trainLpRDD) # just for testing
testModel(models[2], trainLpRDD) # just for testing

Starting to train the model


IllegalArgumentException: "Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"

## Task 7) Automate training and testing

We automate now the whole process from reading the files, through preprocessing, and training up to evaluating the models. In the end we have a single function that takes all the parameters we are interested in and produces trained models and an evaluation. (5%) 

In [21]:
# this function combines tasks f) and g)
# this method should take RDDs with (f,[(w,c), ...])
def trainTestModel(trainRDD,testRDD):
    """ Trains 3 models and tests them on training and test data. Returns a matrix the training and testing (rows) accuracy values for all models (columns). """
#>>> models = ... # train models on the training set
    models = trainModel(trainRDD) # train models on the training set
    #results = [[]]*2 # matrix for 2 modes (training/test) vs n models (currently 3)
    results = [[],[]] # matrix for 2 modes (training/test) vs n models (currently 3)
    for mdl in models:
        print('Training')
#>>>        results[0].append(...) # test the model on the training set
        results[0].append(testModel(mdl,trainRDD)) # test the model on the training set
        print('Testing')
#>>>        results[1].append(...) # test the model on the test set
        results[1].append(testModel(mdl,testRDD)) # test the model on the test set
    return results

def trainTestFolder(folder,N):
    """ Reads data from a folder, preproceses the data, and trains and evaluates models on it. """
    print('Start loading and preprocessing') 
    train_test_LpRDD = loadAndPreprocess(folder,N) # create the RDDs
    print('Finished loading and preprocessing')
    (trainLpRDD,testLpRDD) = train_test_LpRDD # unpack the RDDs 
    return trainTestModel(trainLpRDD,testLpRDD) # train and test

trainTestFolder('lemm',1000) 

Start loading and preprocessing
[PosixPath('lemm/part5'), PosixPath('lemm/part8'), PosixPath('lemm/part2'), PosixPath('lemm/part9'), PosixPath('lemm/part3'), PosixPath('lemm/part6'), PosixPath('lemm/part1'), PosixPath('lemm/part4'), PosixPath('lemm/part7'), PosixPath('lemm/part10')]
len(rddList) 10
[('file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/sdbc-86020865f0bcf7-1f3bf7500109/notebook/work/City-Data-Science/datasets/lingspam_public/lemm/part8/6-939msg2.txt', 'Subject: child language acquistion\n\ni be look for information on elicitation technique and grammaticality judgement for 2 - 4 year old child . i would be grateful for your help on thus s subject . cathy finlay . university of ulster .\n')]
Finished loading and preprocessing
Starting to train the model
Trained LR (model1)
Trained NB (model2)
Trained SVM (model3)
Training
Accuracy 100.0% (data items: 2602, correct: 2602)
Testing
Accuracy 97.6% (data items: 291, correct: 284)
Training
Accuracy 94.3% (data items: 2602, corre

[[1.0, 0.9431206764027671, 0.8339738662567256],
 [0.9759450171821306, 0.9209621993127147, 0.8316151202749141]]

## Task 8) Run experiments 

We have now a single function that allows us to vary the vector size easily. Test vector sizes 5, 50, 500, 5000, 50000 and examine the effect on the classification accuracy in Experiment 1.

Use the function from Task 7) to test different data types. The dataset has raw text in folder `bare`, lemmatised text in  `lemm` (similar to stemming, reduces to basic word forms), `stop` (with stopwords removed), and `lemm_stop` (lemmatised and stopwords removed). Test how the classification accuracy differs for these four data types in Experiment 2. Collect the results in a data structure that can be saved for later saving and analyis.

Comment on the results in a few sentences, considering the differences in performance between the different conditions as well as train an test values. 15%

In [22]:
from pyspark.sql import DataFrame

folder = 'bare'
N = numpy.array([3,30,300,3000,30000]) 
print('\nEXPERIMENT 1: Testing different vector sizes')
results = []
for n in N:
    print('N = {}'.format(n))
   # ... = trainTestFolder(folder,n)
    results.append(trainTestFolder(folder,n))
    #N = trainTestFolder(folder,n)
    
n = 3000
typeFolders = ['bare','stop','lemm','lemm_stop']
print('EXPERIMENT 2: Testing different data types')
for folder in typeFolders:
    print('Path = {}'.format(folder))
    #... = trainTestFolder(folder,n)
    results.append(trainTestFolder(folder,n))
    
# Add comments on the performance in a cell below. 


EXPERIMENT 1: Testing different vector sizes
N = 3
Start loading and preprocessing
[PosixPath('bare/part5'), PosixPath('bare/part8'), PosixPath('bare/part2'), PosixPath('bare/part9'), PosixPath('bare/part3'), PosixPath('bare/part6'), PosixPath('bare/part1'), PosixPath('bare/part4'), PosixPath('bare/part7'), PosixPath('bare/part10')]
len(rddList) 10
[('file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/sdbc-86020865f0bcf7-1f3bf7500109/notebook/work/City-Data-Science/datasets/lingspam_public/bare/part8/6-939msg2.txt', 'Subject: child language acquistion\n\ni am looking for information on elicitation techniques and grammaticality judgements for 2 - 4 year old children . i would be grateful for your help on thi s subject . cathy finlay . university of ulster .\n')]
Finished loading and preprocessing
Starting to train the model
Trained LR (model1)
Trained NB (model2)
Trained SVM (model3)
Training
Accuracy 83.4% (data items: 2602, correct: 2170)
Testing
Accuracy 83.2% (data items: 291, corr

Accuracy 93.8% (data items: 291, correct: 273)
Training
Accuracy 83.4% (data items: 2602, correct: 2170)
Testing
Accuracy 83.2% (data items: 291, correct: 242)
Path = lemm
Start loading and preprocessing
[PosixPath('lemm/part5'), PosixPath('lemm/part8'), PosixPath('lemm/part2'), PosixPath('lemm/part9'), PosixPath('lemm/part3'), PosixPath('lemm/part6'), PosixPath('lemm/part1'), PosixPath('lemm/part4'), PosixPath('lemm/part7'), PosixPath('lemm/part10')]
len(rddList) 10
[('file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/sdbc-86020865f0bcf7-1f3bf7500109/notebook/work/City-Data-Science/datasets/lingspam_public/lemm/part8/6-939msg2.txt', 'Subject: child language acquistion\n\ni be look for information on elicitation technique and grammaticality judgement for 2 - 4 year old child . i would be grateful for your help on thus s subject . cathy finlay . university of ulster .\n')]
Finished loading and preprocessing
Starting to train the model
Trained LR (model1)
Trained NB (model2)
Trained SVM

## Task 8) (Cont.) Comments on Performance




In [60]:
print(results)

[[[0.8339738662567256, 0.8339738662567256, 0.8339738662567256], [0.8316151202749141, 0.8316151202749141, 0.8316151202749141]], [[0.8547271329746349, 0.8339738662567256, 0.8339738662567256], [0.865979381443299, 0.8316151202749141, 0.8316151202749141]], [[1.0, 0.862413528055342, 0.8339738662567256], [0.9690721649484536, 0.8350515463917526, 0.8316151202749141]], [[1.0, 0.9750192159877018, 0.8339738662567256], [0.9759450171821306, 0.9587628865979382, 0.8316151202749141]], [[1.0, 0.8608762490392006, 0.834358186010761], [0.9828178694158075, 0.845360824742268, 0.8316151202749141]], [[1.0, 0.9750192159877018, 0.8339738662567256], [0.9759450171821306, 0.9587628865979382, 0.8316151202749141]], [[1.0, 0.968870099923136, 0.8339738662567256], [0.9759450171821306, 0.9381443298969072, 0.8316151202749141]], [[1.0, 0.9742505764796311, 0.8339738662567256], [0.9759450171821306, 0.9484536082474226, 0.8316151202749141]], [[1.0, 0.9692544196771714, 0.8339738662567256], [0.979381443298969, 0.9243986254295533

The best performing model on test set was found to be LogisticRegressionWithLBFGS with N = 30000 as vector size on the bare dataset, which resulted in 98.3% accuracy on test, and 100% accuracy on the training set. This shows that, with respect to the logistic regression model, an increase in the vector size leads to an increase in performance. In order to reach 100% in training accuracy, a vector size of 300 was required. This training accuracy score remained for increasing sizes of n.
One can infer from this that, with vector sizes smaller than 300, some valueable information is lost. 
However varying the dataset by way of vocabulary selection methods for dimension reduction, from bare, to stop, to lemm, to lemm_stop, did not have such a marked improvement on performance (with n = 3000, kept steady to enable data type comparisons). Accuracy scores remained at 97.6% for testing (100% for training) for the raw text in the bare folder, and for stop and lemm folders. But a slight improvement of 0.3% for testing can be seen for lemm_stop. This illustrates that Logistic Regression is not adversely affected in performance by noise in the data.
With regard to vector size, Naive Bayes performed best with n=3000 (on the bare set), with an accuracy score of 95.9% for testing and 97.5% for training. The reduction in performance after 3000 could indicate that Naive Bayes is more sensitive to collision, as an effect of hashing. Vocabularly selection had a negative effect on testing performance, with stop and lemm receiving worse scores (93.8% and 94.8% respectively) and lemmstop receiving the lowest of 92.4%. Although stop did make an improvement upon bare in training (by 0.4%).
In contrast, SVM received training and testing accuracy scores of 83.4% and 83.2%, respectively, for all vector size and vocabulary variations. Dimension reduction does not seem to have any material effect on the SVM model.