# Coursework Part 1: Detecting Spam with Spark

IN432 Big Data coursework 2017, part 1. Classifying messages to detect spam. 

The overall goal is to transform the data into so that we can build a classifier. 
Then some aspects of the data and it's preparation will be explored. We will 
specifically study the effect of 
* the size of training set 
* the size of the representation vector, and 
* the preprocessing with stopword removal and/or lemmatisation.

The team task also addresses the creating of TF.IDF vectors.

Please add comments aobut the your findings at the end. 

> There is a new bit of code at the end of this file that you can use in case of "sc undefined" errors. This tries to create a new SparContext. After using it you will need to run all code cells again from the beginning.    

## Task a) & b) Read some files and prepare a (f,w) RDD 
a) Start by reading the directory with text files from the distributed file system (e.g. `hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1`), and loading all text files using wholeTextFiles(), which loads the text per file, i.e. tuples (f,t). (5%)

> Note: while HDFS is not available, please use the files in `\data\tempstore\`. A prefix mechanism has been added to this version to make it easy to switch. 

b) Split the text into words (lower case), creating a (file,word) RDD. (10%)

For both tasks you can use the code from the labs. Don't remove finals 's' (we have already lemmatised data to work with later). 

It is very useful to package the code into a function that takes a directory as an argument and returns an RDD with (f,w) structure, e.g. `read_fw_RDD`.

Please write two lines of code at the end of the cell that run a little example and print some output. This is for you to test your code during developemnt and for us to mark your work. You can comment them out after you have verified that your code works. I have left some of mine there, feel free to use those

In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
import re 
# USE DEPENDING ON DATASTORE
#prefix = '/data/tempstore/'
#prefix = 'hdfs://saltdean.nsqdc.city.ac.uk/data/'
prefix  = '/data/tempstore/'

dirPath = prefix + 'spam/bare/part1'



def splitFileWords(filenameContent): # your splitting function
    f,c = filenameContent # split the input tuple  
    fwLst = [] # the new list for (filename,word) tuples
    wLst = re.split('\W+',c) # <<< now create a word list wLst
    
    for w in wLst: # iterate through the list
        fwLst.append((f,w.lower())) # <<< and append (f,w) to the 
        
    return fwLst #return a list of (f,w) tuples 



def read_fw_RDD(dirPath): # package tasks a/b into a function for later use
#     # ... #<<< task a) read the files
    ft_RDD = sc.wholeTextFiles(dirPath) #<<< add code to create an RDD with wholeTextFiles
    
    print('Read {} files from directory {}'.format(dirPath, ft_RDD.count())) # status message for testing, can be disabled later on

    print('file word count histogram') # the histogram can be useful for checking later 
    print(ft_RDD.map(lambda fwL: (len(fwL[1]))).histogram([0,10,100,1000,10000]))


#     ... #<<< task b) split words
    fw_RDD = ft_RDD.flatMap(splitFileWords)
    return fw_RDD # A fw_RDD should be returned

fw_RDD = read_fw_RDD(dirPath) # for testing
fw_RDD.take(3) # for testing





# words = text.flatMap(lambda x: re.split('\W+',x)) # split words, break lists
# words1 = words.map(lambda x: (stripFinalS(x),1)) # lower case, (w,1) pairs
# wordCount = words1.reduceByKey(add) # reduce and add up counts
# stopwlst = ['the','a','in','of','on','at','for','by','I','you','me'] # stopword list
# freqWords = wordCount.filter(lambda x:  x[1] >= 5 or x in stopwlst ) # remove rare words
# stopWords = freqWords.filter(lambda x:  x[0] in stopwlst ) # keep only stopwords
# output = stopWords.collect() # collect results
# for (word, count) in output: # iterate over (w,c) pairs
#     print("%s: %i" % (word, count)) #  … and print

Read /data/tempstore/spam/bare/part1 files from directory 289
file word count histogram
([0, 10, 100, 1000, 10000], [0, 0, 107, 177])


[('file:/data/tempstore/spam/bare/part1/3-437msg3.txt', 'subject'),
 ('file:/data/tempstore/spam/bare/part1/3-437msg3.txt', 'becoming'),
 ('file:/data/tempstore/spam/bare/part1/3-437msg3.txt', 'a')]

## Task c) Normalised word count lists
Use the code from the labs to generate the `[(word,count), ...]` list per file and to create a word frequency vector. 

Normalise the term frequency (TF) vector by the total word count per file. (15%)

For this you can mostly reuse the lab code. The interesting part here is the normalisation. For normalisation we need to total word count per file. You can use a nested list comprehension for this (go through the (w,c) list and divide each c by the sum of all c, which you can get with a list). Alternatively, you can write a function where you can create local variables, e.g. for the number of words per file.  

Another option is to use a separate RDD with (f,twc), where 'twc' is for total word count, and which you can create from the (f,[(w,c), ... ]) RDD. 

This new RDD can then be joined with the (f,[(w,c), ... ]) RDD and then the (w,c) list be normalised in a list comprehension. 

Again, put your code into a function, and add a short test that can be commented out.

In [2]:
from operator import add

def reGrpLst(fw_c): # we get a nested tuple
    fw,c = fw_c
    f,w = fw
    return (f,[(w,c)]) # return (f,[(w,c)]) structure. Can be used verbatim, if your variable names match.
 
def make_f_tfLn_RDD(argDir): 
    
    fw_RDD = read_fw_RDD(argDir)
    
    #<<< change (f,w) to ((f,w),1)
    fw_1_RDD = fw_RDD.map(lambda fw: (fw, 1))
    fw_c_RDD = fw_1_RDD.reduceByKey(add) 
    f_wcL_RDD = fw_c_RDD.map(reGrpLst) #as above
    f_wcL2_RDD = f_wcL_RDD.reduceByKey(add)
    
    
    fw_d_RDD = fw_RDD.map(lambda x: (x[0], 1)).reduceByKey(add)
    f_wcLn_RDD = f_wcL2_RDD.join(fw_d_RDD) #this returns (f, ([w,c], [w,c]..., dl))
    
    #You need to divide c by dl ...
    #f_wcLn_RDD = f_wcLn_RDD.map( ...)

    return f_wcLn_RDD



f_wcLn_RDD = make_f_tfLn_RDD( prefix + 'spam/bare/part1') # for testing
print(f_wcLn_RDD.take(1)) # for testing

#wcLn = f_wcLn_RDD.take(1)[0][1] # get the first normalised word count list
#print(sum([cn for (w,cn) in wcLn])) # the sum of normalised counts should be close to 1 

Read /data/tempstore/spam/bare/part1 files from directory 289
file word count histogram
([0, 10, 100, 1000, 10000], [0, 0, 107, 177])
[('file:/data/tempstore/spam/bare/part1/spmsga129.txt', ([('please', 1), ('99', 1), ('3', 1), ('a', 3), ('surrey', 1), ('fresh', 1), ('pound', 2), ('supplied', 1), ('po', 1), ('the', 13), ('59', 1), ('about', 1), ('net', 1), ('get', 1), ('post', 2), ('international', 1), ('152', 1), ('80', 1), ('dispatched', 1), ('discs', 1), ('date', 1), ('text', 1), ('box', 1), ('137', 1), ('then', 1), ('5', 2), ('120', 1), ('packing', 1), ('sites', 1), ('15', 1), ('lists', 3), ('how', 1), ('92', 1), ('email', 15), ('either', 1), ('who', 1), ('made', 1), ('data', 1), ('and', 11), ('you', 9), ('enclose', 1), ('number', 1), ('fully', 1), ('address', 3), ('come', 2), ('42', 1), ('it', 1), ('packages', 1), ('178', 1), ('cleared', 1), ('by', 2), ('unsolicited', 1), ('2', 1), ('direct', 1), ('exchange', 1), ('code', 1), ('expensive', 1), ('dollars', 3), ('that', 1), ('payabl

## Task d) Creating hashed feature vectors 
Use the hashing trick to create fixed size TF vectors. (10%)

Use the code from the week 2 lecture to create the hash vectors.

As before, make it a function and add a short test.

In [3]:
def hashing_vectorizer(word_count_list, N): 
    # use the code from the lecture
    v = [0] * N # create fixed size vector of 0s for word_count in word_count_list:
    for word_count in word_count_list:
        word,count = word_count_list # unpack tuple
        h = hash(word) # get hash value
        v[h % N] = v[h % N] + count # add count 
    return v # return hashed word vector
    
    
def make_f_wVn_RDD(f_wcLn_RDD, argN):
    # apply hashing_vectorizer in a lambda, this is only a one-liner
    print(f_wcLn_RDD.take(3))
    f_wVn_RDD = f_wcLn_RDD.map(lambda f_wc: (f_wc[0],hashing_vectorizer(f_wc[1],argN)))
    print(f_wVn_RDD.take(3))
    return f_wVn_RDD


N=100
f_wVn_RDD = make_f_wVn_RDD(make_f_tfLn_RDD(dirPath),N) # for testing
print(f_wVn_RDD.take(1)[0][1]) # for testing
print( sum(f_wVn_RDD.take(1)[0][1])) # for testing

Read /data/tempstore/spam/bare/part1 files from directory 289
file word count histogram
([0, 10, 100, 1000, 10000], [0, 0, 107, 177])
[('file:/data/tempstore/spam/bare/part1/spmsga129.txt', ([('opened', 1), ('unsure', 1), ('programs', 2), ('road', 1), ('54', 1), ('', 1), ('files', 2), ('have', 1), ('work', 2), ('country', 2), ('disc', 2), ('started', 1), ('payment', 3), ('102', 1), ('england', 1), ('isp', 2), ('own', 2), ('4', 1), ('of', 4), ('do', 1), ('purchase', 1), ('below', 1), ('form', 1), ('fill', 1), ('friendly', 1), ('normally', 1), ('new', 1), ('ask', 2), ('documents', 1), ('free', 1), ('sent', 1), ('addresses', 12), ('days', 1), ('name', 1), ('8', 1), ('cd', 2), ('type', 1), ('bank', 1), ('only', 1), ('are', 5), ('but', 1), ('i', 1), ('9', 1), ('us', 3), ('mailing', 2), ('other', 1), ('just', 1), ('1xt', 1), ('tick', 2), ('3xd', 1), ('details', 2), ('calcuated', 1), ('them', 1), ('quoted', 1), ('remove', 1), ('000', 18), ('1', 1), ('in', 6), ('subject', 1), ('xx', 1), ('amou

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 30, 10.207.1.85): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-3-37240242e93b>", line 14, in <lambda>
  File "<ipython-input-3-37240242e93b>", line 6, in hashing_vectorizer
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1306, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-3-37240242e93b>", line 14, in <lambda>
  File "<ipython-input-3-37240242e93b>", line 6, in hashing_vectorizer
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


## Task e) Create Labeled Points

Determine whether the file is spam (i.e. the filename contains ’spmsg’) and replace the filename by a 1 (spam) or 0 (ham) accordingly. Use map() to create an RDD of LabeledPoint objects. 

See here [http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression](http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression) for an example, and here [http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint) for the `LabeledPoint` documentation. (15%)

It's useful to take the RDD with normalised word lists as input. 

For finding the spam messages use `re.search()` see here[https://docs.python.org/3/library/re.html?highlight=re%20search#re.search](https://docs.python.org/3/library/re.html?highlight=re%20search#re.search) for documentation. Search for 'spmsg' in the filename and check whether the result is `None`. The relevan syntax here is <b>`0 if <yourCondition> else 1`</b>, i.e. 0 if 'spmsg' is not in the filename (not spam) and 1 if it is (it's spam).

In [None]:
from pyspark.mllib.regression import LabeledPoint

def make_lp_RDD(f_tfLn_RDD,argN):
    
    lp_RDD = f_wVec_RDD.map(lambda f_wVec: LabeledPoint(1 if re.search(f_wVec[0],'spmsg') is not None else 0,f_wVec[1]))     #<<< make a vector
    #<<< detect spam by filename 
    #<<< transform into LabeledPoint objects
    return lp_RDD

#lp_RDD = make_lp_RNN(make_f_tfLn_RDD(prefix + 'spam/bare/part1'),100)
#print(lp_RDD.take(3))

## Task f) Train a classifier 

Use the `LabeledPoint` objects to train the `LogisticRegression` and calculate the accuracy of the model on the training set (again, follow this example [http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression](http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression) and here is the documentation [http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS).  (15%) 

It's useful to make a function that takes as argument a normalised word list again (because we can later also use it with TF.IDF values).  

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, NaiveBayes
from pyspark.mllib.util import MLUtils

path = prefix + 'spam/stop/part1'

N=100
def trainModel(f_wcL_RDD,N):
    ... #<<< get the training data as LabeledPoint objects.
    print('training data items: {}, correct: {}'.format(trainData.count(), correct)) # output raw numbers
    print('training accuracy {:.1%}'.format(accuracy)) # and accuracy
    return model 

f_wcLn_RDD = make_f_wcLn_RDD(path) # for testing
model = trainModel(f_wcLn_RDD,N) # for testing

## Task g) Test the classifier

Use the files from \texttt{.../data/extra/spam/bare/part10} and prepare them like in task~a)-e) (use the function you created in task e) and before. Then use the trained model to predict the label for each vector you have and compare it to the original to test the performance of your classifier. (10\%) 

In [None]:
def testModel(model,f_wcL_RDD,N):
    #<<< like with trainModel, transform the data and evaluate it.
    print('test data items: {}, correct:{}'.format(testData.count(),correct))
    print('testing accuracy {:.1%}'.format(accuracy))

testModel(model,make_f_wcLn_RDD('hdfs://saltdean/data/spam/stop/part10'),N) # for testing

## Task h) Run experiments 

Package the whole classifier training and evaluation in one function. Then apply it to the files from `/data/extra/spam/lemm`, `/data/extra/spam/stop` and `/data/extra/spam/lemm_stop` in addition to `/data/extra/spam/bare`  and evaluate the accuracy of your classifier. 

Comment on the effect of *lemmatisation* and *stopword removal* on classification accuracy. Further, evaluate the use of larger training sets and the effect of different vector sizes. Print out the results of your experiments in readable form. (20%) 

You need to create one small fuction that combines tasks f) and g), and then apply it to different datasets sizes, vector sizes, and different preprocessings. 

The combination of the part1-part9 datasets can be achieved by using 'glob' patterns in the filename ('part[1-9]'). This is a feature of the Hadoop filesystem and not well documented in Spark (or anywhere else). You can find a description of its Python implementation here: [https://docs.python.org/3/library/glob.html](https://docs.python.org/3/library/glob.html). You can also supply multiple comma-separated paths, but you'll need to test what works, when you use this feature. Recursive patterns don't seem to work.

Alternatively, you can create unions of RDDs for each part. However, this seems to lead to slower execution. With the latter, it is useful to created arrays of the directory names (part1, ...). When you work with unions, it may be useful to start with an empty RDD. That can be created with `sc.parallelize([])`.

A useful tool for creatng multiple long paths variants is the use of the Python string format() method as used below. There is a good set of example here: [https://docs.python.org/3/library/string.html#format-examples](https://docs.python.org/3/library/string.html#format-examples) and the specification is here: [https://docs.python.org/3/library/string.html#format-specification-mini-language](https://docs.python.org/3/library/string.html#format-specification-mini-language).


In [None]:
# this function combines tasks f) and g)
def trainTestModel(trainPaths,testPath,N):
    ... #<<< just combine training ans testing here
    
# prepare the part directories and the path
dirPattern = 'hdfs://saltdean/data/spam/bare/part[1-{}]' # the {} can be filled by 'dirPattern.format(i)' 
# create the path for the test set
testPath = 'hdfs://saltdean/data/spam/bare/part10'

print('EXPERIMENT 1: Testing different training set sizes')
print('Path = {}, N = {}'.format(dirPattern,N)) # using format to make sure we record the parameters of the experiment
#<<< make the test set, it will be constant for this experiment
#<<< loop over i the number of parts for training (1-9)
    trainPath = dirPattern.format(i) # in the loop you can create a path like this
    print(trainPath) #just for testing, remove later
    #<<< create the trainRDD (using your make_f_tfLn_RDD method)
    trainTestModel(trainPaths,testPath,N)

print('\nEXPERIMENT 2: Testing different vector sizes')
#<<< loop over different values for N. 3,10,30,100,300, ... is a good pattern
    print('=== N = ',N)
    trainTestModel(trainPaths,testPath,N)

N = 100 # change to what you feel is a good compromise between computation and accuracy
# the dictionary below helps associate description and paths.
setDict = {'No preprocessing': prefix + 'spam/bare/',
           'Stopwords removed': prefix + 'spam/stop/',
           'Lemmatised': prefix + 'spam/lemm/',
           'Lemmatised and stopwords removed': prefix + 'spam/lemm_stop/'}

print('\nEXPERIMENT 3: Testing differently preprocessed data sets')
print('training on parts 1-9, N = {}'.format(N))
for sp in setDict:
    print('=== ',sp)
    trainPath = setDict[sp] + 'part[1-9]'
    #<<< make the test (part1-9) and training data (part10) RDDs and evaluate 

print('\n====== Done ======')

## Task i) (Task for pairs) TF.IDF vectors
You need to address this task if you are working as a pair. 

Calculate the IDF values for each word and generate fixed size TF.IDF vectors for each document (word frequencies still normalised by total document word count). Also evaluate the use of TF.IDF compared to normalised word counts in terms of accuracy. (25%)

To calculate the IDF values you need to create an RDD (w,f) pairs. You can use the function `RDD.distinct()` to remove duplicates and reorganise to create (w,[f, ...]) lists. The length of the list is the document frequency and can be used to calculate the IDF.





In [None]:
from operator import add
from math import log

trainPath = prefix + 'spam/lemm_stop/part[1-9]'
testPath = prefix + 'spam/lemm_stop/part10'

def make_f_wtfiL_RDD(path):
    # Calculuate the IDFs
    fw_RDD = read_fw_RDD(path)
    #<<< keep only unique (f,w) pairs
    #<<< (f,w) -> (w,[f])
    #<<< join the lists of files with reduceByKey
    vocSize = wfL_RDD.count() # calculate the vocabulary size
    print('vocSize: {}'.format(vocSize)) 
    # calculate the IDF values per word by using len() on the list of files
    print('wIdf_RDD.count(): ',wIdf_RDD.count()) # for testing
    print(wIdf_RDD.take(2)) # for testing

    # Get the normalise word counts (TFs) and organise by word (w,(f,cn))
    f_wcLn_RDD = make_f_wcLn_RDD(path) # create the normalised word count lists 
    #print('f_wcLn_RDD: ',f_wcLn_RDD.map(
    #        lambda x: sum([c for (w,c) in x[1]]).histogram([0,10,100,1000,10000]))) # check for the per-file word counts
    #<<<< create a list of tuples [(w,(f,cn)), ..] and use flatmap 
    print('w_fcn_RDD.count(): {}'.format(w_fcn_RDD.count())) # for testing
    print(w_fcn_RDD.take(2)) # for testing

    # now we can join the IFDs and TFs by the words (w,(f,cn)) join (w,idf) to (w,((f,cn),idf))
    #<<< use RDD.join()
    print( 'w_fcnIdf_RDD.count(): ', w_fcnIdf_RDD.count())
    print( w_fcnIdf_RDD.take(2))

    # we have doubly nested tuples (w,((f,cn),idf)) in the RDD, 
    # but they let us calculate the TF.IDF per file and word (f,[(w,cn*idf)]).
    #<<<< map to (f,[(w,cn*idf)])
    print('f_wtfiL_RDD.count()', f_wtfiL_RDD.count())
    print(str(f_wtfiL_RDD.take(2)))

    # with that we can reduce by key (files) to get [(w,tfidf), ...] lists per file.
    #<<< reduceByKey
    print('# of files with TF.IDF vectors: {}'.format(f_wtfiL2_RDD.count()))
    print(f_wtfiL2_RDD.take(2)))

    return f_wtfiL2_RDD


N=100 # choose a value yourself 
print('N: {}, trainPath: {}'.format(N,trainPath))
#<<< you can now apply trainModel and test Model to RDDs created with make_f_wtfiL_RDD()


## Appendix
This code is just needed if you have an error message "sc undefined". In that case run the code below and try again. You will have to run all code cells from the beginning again, as the new context has no information about what happened before.   

In [None]:
# try this in case of "sc undefined" errors

from pyspark import SparkContext

try: 
    sc.stop()
    print('Stopped existing SparkContext')
except Exception as e: 
    print(e)

try: 
    sc = SparkContext(appName='Coursework part 1')
    print('Created new SparkContext')
except Exception as e: 
    print(e)
print('Proterties of sc: ',list(sc.getConf().getAll()))