# COURSEWORK 2: SPAM CLASSIFICATION 

This coursework is a classification task on the spam dataset. Spam filtering is typically tackled with Naive Bayes; however, this approach tests Logistic Regression as the machine learning classifier. The task is to distinguish spam documents from documents that are not spam. Because this is a binary classification problem (spam vs. not spam), Logistic Regression, a binary classifier, is a natural choice of algorithm.

In [1]:
import re 

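prefix = '/data/tempstore/' # filesystem
dirPath = prefix+'spam/bare/part[1-9]' # glob pattern matching the part1..part9 folders of the spam dataset
# `sc` (SparkContext) and `spark` (SparkSession) are assumed to be provided by the notebook environment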


Task a) Define a function that reads the files in a directory and creates a DataFrame of (text, label) rows

In [2]:
def read_dataframe_rows( argDir ):  
    ft_RDD = sc.wholeTextFiles(argDir) #read files 
    print('Read {} files from directory {}'.format(ft_RDD.count(),argDir)) 
    #Create an RDD through a lambda function: files whose name contains 'spmsg' are spam
    #and get label 1.0; all other files are legitimate messages and get label 0.0
    RDD_spam = ft_RDD.map(lambda ft: (ft[1], 0.0 if re.search('spmsg',ft[0]) is None else 1.0))
    
    DataFrame_rows = spark.createDataFrame(RDD_spam, schema=['text','label']) # create a DataFrame with text and labels
    
    return DataFrame_rows # return the labelled DataFrame
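
The labelling rule in `read_dataframe_rows` can be checked without Spark. Below is a minimal pure-Python sketch of the same mapping applied to `(path, content)` pairs, the shape `wholeTextFiles` yields. The file paths and texts are hypothetical examples, and the sketch assumes, as the regex does, that spam files carry `spmsg` in their names.

```python
import re

def label_file(path_and_text):
    """Mirror of the RDD map in read_dataframe_rows: (path, text) -> (text, label)."""
    path, text = path_and_text
    # 'spmsg' in the file name marks spam (1.0); anything else is legitimate (0.0).
    label = 0.0 if re.search('spmsg', path) is None else 1.0
    return (text, label)

# Hypothetical file paths and contents, for illustration only.
pairs = [
    ("file:/data/part1/spmsga1.txt", "Subject: win cash now"),
    ("file:/data/part1/3-1msg1.txt", "Subject: conference call"),
]
rows = [label_file(p) for p in pairs]
print(rows)
```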

Test the output of the function by printing the top 5 rows.

In [3]:
# Testing
DF_spam = read_dataframe_rows(dirPath)
DF_spam.show(5)

print("Testing completed") #print when output is completed 

Read 2602 files from directory /data/tempstore/spam/bare/part[1-9]
+--------------------+-----+
|                text|label|
+--------------------+-----+
|Subject: becoming...|  0.0|
|Subject: zero dow...|  1.0|
|Subject: how does...|  1.0|
|Subject: philosop...|  0.0|
|Subject: job - un...|  0.0|
+--------------------+-----+
only showing top 5 rows

Testing completed


Task b) Implement a machine learning pipeline in Spark.

In [4]:
# Import all the relevant libraries.
import time 
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer, IDF
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [5]:
# Create the training and test datasets with a split of 80% to 20%.
dataset = DF_spam
tr, te = dataset.randomSplit([0.8, 0.2], seed=1234)

## Logistic Regression 

Configure the ML pipeline. The Tokenizer is the feature transformer: it lowercases each document and splits it on whitespace into individual terms. HashingTF is the feature extractor: it maps each sequence of terms to a vector of term frequencies via the hashing trick, where each term's hash (modulo `numFeatures`) selects the vector index whose count is incremented. Finally, logistic regression is the classification algorithm, and all three are chained as stages of one Pipeline.
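
To make the two feature stages concrete, here is a minimal pure-Python sketch of tokenising and hashing term frequencies. It is an illustration, not Spark's code: `zlib.crc32` stands in for the MurmurHash3 function Spark's HashingTF uses, so the indices will not match Spark's, and `str.split()` only approximates the Tokenizer's whitespace splitting. Note that with only 100 slots, as in this notebook, distinct terms often collide and share a count.

```python
import zlib
from collections import Counter

def tokenize(text):
    # Roughly like Spark's Tokenizer: lowercase, then split on whitespace.
    return text.lower().split()

def hashing_tf(terms, num_features=100):
    """Sparse term-frequency vector as {index: count} using the hashing trick.

    zlib.crc32 is a stand-in hash; Spark uses MurmurHash3, so indices differ from Spark's.
    """
    counts = Counter()
    for term in terms:
        index = zlib.crc32(term.encode("utf-8")) % num_features
        counts[index] += 1  # colliding terms share (and add to) the same slot
    return dict(sorted(counts.items()))

terms = tokenize("Subject: FREE cash free offer")
vector = hashing_tf(terms, num_features=100)
print(terms)
print(vector)
```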

In [6]:
#tokenizer as feature transforming step
tokenizer = Tokenizer(inputCol="text", outputCol="words") 
#HashingTF as the feature extractor step
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") 
#Choose Logistic regression as the machine learning classifier
#(maxIter=20 here is overridden by the maxIter value in the parameter grid below)
logRes = LogisticRegression(maxIter=20)

#Finally the pipeline argument takes in the tokenizer (feature transforming), feature extracting 
#and the machine learning algorithm, which in this case is logistic regression.
pipeline = Pipeline(stages=[tokenizer, hashingTF, logRes]) 
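
To show what the classifier stage does with these features, here is a minimal sketch of binary logistic regression trained by plain batch gradient descent, with an L2 penalty weighted by `reg` playing the role of `regParam`. This is a simplified illustration, not Spark's optimiser (Spark uses quasi-Newton solvers such as L-BFGS), and the toy data are made up.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def train_logreg(X, y, reg=0.0, lr=0.5, iters=200):
    """Batch gradient descent on mean log-loss + (reg / 2) * ||w||^2 (bias not penalised)."""
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(iters):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):
            # Prediction error for one example: P(y=1 | x) - y.
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b) - yi
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        # The L2 term adds reg * w to the gradient, shrinking weights towards zero.
        w = [wj - lr * (gj + reg * wj) for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b
    return w, b

def predict_proba(w, b, x):
    return sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b)

# Toy, made-up 1-D data: negative values are "ham" (0), positive values are "spam" (1).
X = [[-2.0], [-1.0], [1.0], [2.0]]
y = [0, 0, 1, 1]
w_plain, b_plain = train_logreg(X, y, reg=0.0)
w_reg, b_reg = train_logreg(X, y, reg=0.1)
print(w_plain, w_reg)
```

With `regParam=0`, as in the grid below, nothing restrains the weights on separable data, whereas a positive penalty keeps them smaller.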


Here, Tasks c) and d) are done together: a grid search is set up and the performance of the pipeline is evaluated with TrainValidationSplit.

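A ParamGridBuilder constructs the grid for the search: each `addGrid` call takes a parameter of one pipeline stage (here HashingTF's `numFeatures` and LogisticRegression's `regParam` and `maxIter`) together with a list of values to try, and `build()` returns every combination. Because each list below holds a single value, this grid contains exactly one combination, so no real search takes place.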

In [7]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [100]) \
    .addGrid(logRes.regParam, [0]) \
    .addGrid(logRes.maxIter, [10]) \
    .build()
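
`build()` returns the Cartesian product of the value lists. A minimal pure-Python sketch of that expansion shows how many combinations each grid in this notebook contains; plain dicts keyed by parameter names stand in for Spark's `Param` objects, and the ordering of combinations may differ from Spark's.

```python
from itertools import product

def build_grid(grid):
    """Expand {param_name: [values]} into a list of {param_name: value} dicts."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

lr_grid = build_grid({"numFeatures": [100], "regParam": [0], "maxIter": [10]})
nb_grid = build_grid({"numFeatures": [100], "smoothing": [1.0, 0.5, 0]})
print(len(lr_grid), lr_grid)  # a single combination
print(len(nb_grid), nb_grid)  # three combinations
```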

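Define a BinaryClassificationEvaluator, since spam detection is a binary classification problem (spam vs. not spam). By default this evaluator reports the area under the ROC curve (`areaUnderROC`), not accuracy: 1.0 means every spam document is ranked above every non-spam document, and 0.5 is no better than a random ranking.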

In [8]:
binary_class_eval=BinaryClassificationEvaluator()
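
As a conceptual check of what this metric measures, the area under the ROC curve equals the probability that a randomly chosen positive document receives a higher score than a randomly chosen negative one, with ties counted as half. The sketch below counts pairs directly in pure Python; Spark computes the same quantity from the ROC curve rather than by enumerating pairs, and the scores here are made up.

```python
def area_under_roc(scores, labels):
    """AUC as P(score of random positive > score of random negative); ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            wins += 1.0 if p > n else 0.5 if p == n else 0.0
    return wins / (len(pos) * len(neg))

print(area_under_roc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
print(area_under_roc([0.9, 0.8, 0.2, 0.1], [0, 0, 1, 1]))   # 0.0: a fully inverted ranking
```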

TrainValidationSplit tries every parameter combination in the grid and selects the best model using an evaluator. It takes an estimator, a set of estimator ParamMaps and an evaluator.

In our case, the estimator is the pipeline, the estimator ParamMaps are the grid built above, and the evaluator is the binary classification evaluator.

`trainRatio=0.8` splits the data passed to `fit` into 80% for training and 20% for validation. Since we fit on `tr`, which is itself 80% of the data, roughly 64% of all documents are used for training and 16% for validation during the search. After choosing the best ParamMap, TrainValidationSplit refits the pipeline with it on all of `tr`.

In [9]:
train_val_split = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=binary_class_eval,
                           trainRatio=0.8)
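
The selection logic can be sketched in plain Python: one random split, one validation score per ParamMap, then a refit of the winning ParamMap on all the input rows. This is a toy illustration of the idea, not Spark's implementation (Spark uses `randomSplit`, so its split sizes are only approximately 80/20), and the threshold "estimator" and data are hypothetical.

```python
import random

def train_validation_split(rows, param_maps, fit, evaluate, train_ratio=0.8, seed=42):
    """Toy version of the TrainValidationSplit idea.

    Returns (final_model, metrics); the winning ParamMap is refitted on all rows.
    """
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_ratio)
    train, validation = shuffled[:cut], shuffled[cut:]
    # One validation metric per ParamMap, in grid order.
    metrics = [evaluate(fit(train, params), validation) for params in param_maps]
    best = max(range(len(metrics)), key=metrics.__getitem__)
    return fit(rows, param_maps[best]), metrics

# Hypothetical estimator: a threshold rule that ignores the training rows.
def fit(train_rows, params):
    t = params["threshold"]
    return lambda x: 1 if x >= t else 0

def accuracy(model, rows):
    return sum(model(x) == y for x, y in rows) / len(rows)

data = [(x, 1 if x >= 50 else 0) for x in range(100)]
grid = [{"threshold": 20}, {"threshold": 50}, {"threshold": 80}]
model, metrics = train_validation_split(data, grid, fit, accuracy)
print(metrics)
```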

Create a model by fitting the TrainValidationSplit on the training data.

In [10]:
model = train_val_split.fit(tr)

In [11]:
# Make predictions on training documents.
tr_Start_time = time.time() #start timer
prediction = model.transform(tr)
print("The best training model is ",model.bestModel)
print("best training model results: ", binary_class_eval.evaluate(prediction))
tr_End_time = time.time() #end timer
tr_TimeTaken = tr_End_time - tr_Start_time
print("Time taken is", tr_TimeTaken)

The best training model is  PipelineModel_4f188a4b744f898e969e
best training model results:  0.9944077047982728
Time taken is 16.751314163208008


In [12]:
# Make predictions on test documents. 
te_Start_time = time.time() #start timer
prediction = model.transform(te)
print("The best testing model is ", model.bestModel)
print("best testing model results: ", binary_class_eval.evaluate(prediction))
te_End_time = time.time() #end timer
te_TimeTaken = te_End_time - te_Start_time
print("Time taken is", te_TimeTaken)

The best testing model is  PipelineModel_4f188a4b744f898e969e
best testing model results:  0.9881434355118566
Time taken is 16.725611448287964


We can also make predictions on the test data and show the first 30 rows to get a closer view of the classification process. `model.transform` uses `model.bestModel`, the pipeline fitted with the best-performing parameter combination.

In [13]:
model.transform(te)\
    .select("features", "label", "prediction")\
    .show(30)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(100,[0,2,5,10,11...|  0.0|       0.0|
|(100,[1,3,7,10,18...|  1.0|       0.0|
|(100,[1,2,3,4,5,6...|  1.0|       1.0|
|(100,[0,1,3,4,5,6...|  0.0|       0.0|
|(100,[0,1,2,4,5,6...|  0.0|       0.0|
|(100,[1,6,9,10,11...|  0.0|       0.0|
|(100,[0,1,2,4,5,6...|  0.0|       0.0|
|(100,[0,1,4,6,10,...|  1.0|       0.0|
|(100,[0,1,2,3,4,7...|  1.0|       1.0|
|(100,[0,1,2,3,4,5...|  0.0|       0.0|
|(100,[1,4,5,6,9,1...|  0.0|       0.0|
|(100,[0,1,2,3,4,5...|  0.0|       0.0|
|(100,[4,5,7,9,10,...|  0.0|       0.0|
|(100,[0,1,3,4,7,1...|  0.0|       0.0|
|(100,[3,7,9,12,15...|  0.0|       0.0|
|(100,[0,1,2,3,4,5...|  0.0|       0.0|
|(100,[0,1,5,6,7,9...|  0.0|       0.0|
|(100,[0,1,3,4,5,7...|  0.0|       0.0|
|(100,[0,1,4,5,6,7...|  1.0|       1.0|
|(100,[0,1,4,5,7,8...|  1.0|       1.0|
|(100,[0,1,2,3,4,5...|  1.0|       1.0|
|(100,[0,1,2,3,4,5...|  0.0|       0.0|
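
Because the evaluator reports AUC, it can help to look at the thresholded predictions as well. Below is a minimal pure-Python sketch that counts true/false positives and negatives and computes accuracy and recall from the `(label, prediction)` pairs visible in the output above. These rows are only a small sample, so the figures say nothing reliable about the whole test set; in Spark, `MulticlassClassificationEvaluator(metricName="accuracy")` would compute accuracy over the full DataFrame.

```python
def confusion_counts(pairs):
    """Count (tp, fp, tn, fn) from (label, prediction) pairs, treating 1.0 as spam."""
    tp = sum(1 for y, p in pairs if y == 1.0 and p == 1.0)
    fp = sum(1 for y, p in pairs if y == 0.0 and p == 1.0)
    tn = sum(1 for y, p in pairs if y == 0.0 and p == 0.0)
    fn = sum(1 for y, p in pairs if y == 1.0 and p == 0.0)
    return tp, fp, tn, fn

# (label, prediction) pairs copied from the rows visible in the output above.
rows = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0),
        (0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0),
        (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0),
        (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (0.0, 0.0)]
tp, fp, tn, fn = confusion_counts(rows)
acc = (tp + tn) / len(rows)
recall = tp / (tp + fn)
print(tp, fp, tn, fn, round(acc, 3), round(recall, 3))  # 5 0 15 2 0.909 0.714
```

In this sample, two of the seven spam messages are predicted as non-spam, while no legitimate message is flagged as spam.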


### Testing the best parameters for Logistic Regression on the test set using the Pipeline.

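We now inspect the grid search results and then refit the pipeline with the best parameters. `validationMetrics` holds one validation score per ParamMap, in the same order as the grid, so `np.argmin` and `np.argmax` give the indices of the worst and best parameter combinations. Since this grid has only one combination, both lines print the same ParamMap.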

In [16]:
import numpy as np

evaluation = model.validationMetrics.copy()
minParams = np.argmin(evaluation)
print(paramGrid[minParams])
maxParams = np.argmax(evaluation)
print(paramGrid[maxParams])


{Param(parent='HashingTF_47adb92303a5e769dd2a', name='numFeatures', doc='number of features.'): 100, Param(parent='LogisticRegression_472889e41ab9172ffc8f', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='LogisticRegression_472889e41ab9172ffc8f', name='regParam', doc='regularization parameter (>= 0).'): 0}
{Param(parent='HashingTF_47adb92303a5e769dd2a', name='numFeatures', doc='number of features.'): 100, Param(parent='LogisticRegression_472889e41ab9172ffc8f', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent='LogisticRegression_472889e41ab9172ffc8f', name='regParam', doc='regularization parameter (>= 0).'): 0}
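
The two identical lines follow directly from how the indices are chosen: `argmin` and `argmax` return list positions, and a one-entry list has only position 0. A minimal pure-Python sketch of the same selection, with plain dicts standing in for Spark ParamMaps and a hypothetical metric value:

```python
def worst_and_best(param_maps, metrics):
    """Return (worst, best) ParamMaps; metrics[i] is the validation score of param_maps[i]."""
    if len(param_maps) != len(metrics):
        raise ValueError("one metric per ParamMap is required")
    worst = min(range(len(metrics)), key=metrics.__getitem__)
    best = max(range(len(metrics)), key=metrics.__getitem__)
    return param_maps[worst], param_maps[best]

single_grid = [{"numFeatures": 100, "regParam": 0, "maxIter": 10}]
print(worst_and_best(single_grid, [0.97]))  # 0.97 is a hypothetical metric value
```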


In [18]:
# we input the best parameters from the grid search into the pipeline
te_Start_time = time.time() #start timer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features",numFeatures=100)
logRes = LogisticRegression(maxIter=10,regParam=0) #ml algo
pipeline = Pipeline(stages=[tokenizer, hashingTF, logRes])  #pipeline
model = pipeline.fit(te) #use pipeline to fit model (note: fitted on the test set itself, so the score below is not a held-out score)
prediction = model.transform(te) 
binary_class_eval=BinaryClassificationEvaluator()
print("best testing model pipeline results: ", binary_class_eval.evaluate(prediction))
te_End_time = time.time() #end timer
te_TimeTaken = te_End_time - te_Start_time
print("Time taken is", te_TimeTaken)

best testing model pipeline results:  0.9993894993894994
Time taken is 31.141541242599487


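This cell takes longer than the earlier timed cells because it also fits the pipeline, whereas the earlier timings covered only `transform` and evaluation with an already fitted model (the grid search itself was never timed). The score is very high, an area under the ROC curve of about 0.999, but the pipeline was fitted on `te` and then evaluated on the same `te` rows, so this is a score on data the model has already seen rather than a held-out test score. The held-out figure is the 0.988 obtained by the TrainValidationSplit model on the test set.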
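
Why fitting and scoring on the same rows is optimistic can be seen with a deliberately extreme toy model that memorises its training texts: it scores perfectly on the rows it was fitted to but falls back to the majority class on anything unseen. The documents are hypothetical, and this illustrates resubstitution bias in general, not the behaviour of Spark's logistic regression.

```python
from collections import Counter

def fit_memoriser(rows):
    """Toy 'model' that memorises each training text; unseen texts get the majority label."""
    lookup = {text: label for text, label in rows}
    majority = Counter(label for _, label in rows).most_common(1)[0][0]
    return lambda text: lookup.get(text, majority)

def accuracy(model, rows):
    return sum(model(text) == label for text, label in rows) / len(rows)

# Hypothetical toy documents, for illustration only.
train = [("win cash", 1), ("cheap pills", 1), ("team meeting", 0), ("lunch plans", 0), ("exam notes", 0)]
test = [("free cash", 1), ("project meeting", 0)]

honest = accuracy(fit_memoriser(train), test)  # fit on train, score on test
leaky = accuracy(fit_memoriser(test), test)    # fit and score on the same rows
print(honest, leaky)  # 0.5 1.0
```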

## Naive Bayes

Now we test Naive Bayes and compare the results. As mentioned earlier, Naive Bayes is a popular algorithm typically used for this kind of classification. We use Spark's multinomial Naive Bayes model as the classifier.
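
For reference, a minimal pure-Python sketch of multinomial Naive Bayes with additive smoothing `alpha`, the role the `smoothing` parameter plays: each class gets a log prior plus per-term log probabilities (count + alpha) / (class total + alpha × vocabulary size). The toy documents are made up, and Spark works on hashed count vectors rather than a word vocabulary. The `alpha = 0` case shows why zero smoothing is fragile: a term never seen in a class makes that class impossible.

```python
import math
from collections import Counter

def train_multinomial_nb(docs, labels, alpha=1.0):
    """docs: token lists; labels: class ids. Returns (log_priors, log_theta) dicts."""
    vocab = sorted({t for d in docs for t in d})
    classes = sorted(set(labels))
    log_priors, log_theta = {}, {}
    for c in classes:
        class_docs = [d for d, y in zip(docs, labels) if y == c]
        # Smoothed class prior.
        log_priors[c] = math.log((len(class_docs) + alpha) / (len(docs) + len(classes) * alpha))
        counts = Counter(t for d in class_docs for t in d)
        denom = sum(counts.values()) + alpha * len(vocab)
        log_theta[c] = {}
        for t in vocab:
            num = counts[t] + alpha
            # With alpha = 0 a term never seen in class c gets probability 0, i.e. log = -inf.
            log_theta[c][t] = math.log(num / denom) if num > 0 else float("-inf")
    return log_priors, log_theta

def log_joint(model, doc):
    """Unnormalised log P(class) + sum of log P(term | class); unknown terms are skipped."""
    log_priors, log_theta = model
    return {c: log_priors[c] + sum(log_theta[c][t] for t in doc if t in log_theta[c])
            for c in log_priors}

docs = [["win", "cash", "now"], ["win", "prize"], ["meeting", "notes"], ["project", "meeting", "agenda"]]
labels = [1, 1, 0, 0]
smoothed = train_multinomial_nb(docs, labels, alpha=1.0)
unsmoothed = train_multinomial_nb(docs, labels, alpha=0.0)
print(log_joint(smoothed, ["win", "cash"]))
print(log_joint(unsmoothed, ["cash"]))  # class 0 score is -inf
```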

In [19]:
from pyspark.ml.classification import NaiveBayes 

Now we train the Naive Bayes model, renaming some variables to avoid confusion with the logistic regression ones. The pipeline reuses the `tokenizer` and `hashingTF` stages defined in the previous cell.

In [20]:
# Train a naive Bayes model.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb_pipeline = Pipeline(stages=[tokenizer, hashingTF, nb]) 

#change the param grid to add the Naive Bayes smoothing parameter.
nb_paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [100]) \
    .addGrid(nb.smoothing, [1.0, 0.5, 0]) \
    .build()

#feed in the new naive bayes pipeline and grid search in TrainValidationSplit
nb_train_val_split = TrainValidationSplit(estimator= nb_pipeline,
                           estimatorParamMaps= nb_paramGrid,
                           evaluator=binary_class_eval,
                           trainRatio=0.8)

#define the naive bayes model and fit to the training set
nb_model = nb_train_val_split.fit(tr)

nb_tr_Start_time = time.time() #start timer
tr_pr_nb = nb_model.transform(tr) #nb prediction on training set
print("The best training model is ", nb_model.bestModel)
print("best training model results: ", binary_class_eval.evaluate(tr_pr_nb))
nb_tr_End_time = time.time() #end timer
nb_tr_TimeTaken = nb_tr_End_time - nb_tr_Start_time
print("Time taken is", nb_tr_TimeTaken)

The best training model is  PipelineModel_4f1fb9d88f53f2dea8e4
best training model results:  0.4904551356283106
Time taken is 17.717833042144775


Now we evaluate the Naive Bayes TrainValidationSplit model on the test set.

In [21]:
nb_te_Start_time = time.time() #start timer 
te_pr_nb = nb_model.transform(te) #nb prediction on testing set
print("The best testing model is ", nb_model.bestModel)
print("best testing model results: ", binary_class_eval.evaluate(te_pr_nb))
nb_te_End_time = time.time() #end timer
nb_te_TimeTaken = nb_te_End_time - nb_te_Start_time
print("Time taken is", nb_te_TimeTaken)

The best testing model is  PipelineModel_4f1fb9d88f53f2dea8e4
best testing model results:  0.4674506779769934
Time taken is 17.51353645324707


### Testing the best parameters for Naive Bayes on the test set using the Pipeline.

As with logistic regression, we inspect `validationMetrics` to find the worst and best parameter combinations from the grid search on the training set, then apply the best ones through the pipeline.

In [22]:
import numpy as np

evaluation = nb_model.validationMetrics.copy()
minParams = np.argmin(evaluation)
print(nb_paramGrid[minParams])
maxParams = np.argmax(evaluation)
print(nb_paramGrid[maxParams])

{Param(parent='HashingTF_4114b145928cb2fd5112', name='numFeatures', doc='number of features.'): 100, Param(parent='NaiveBayes_4dbca88328aa1e4680bd', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 1.0}
{Param(parent='HashingTF_4114b145928cb2fd5112', name='numFeatures', doc='number of features.'): 100, Param(parent='NaiveBayes_4dbca88328aa1e4680bd', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 0}


Now we take the best parameters from the grid search and apply it through the pipeline.

In [23]:
nb_te_Start_time = time.time() #start timer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features",numFeatures=100)
nb = NaiveBayes(smoothing= 0, modelType="multinomial") #ML algo

nb_pipeline = Pipeline(stages=[tokenizer, hashingTF, nb]) 
nb_model = nb_pipeline.fit(te) #use pipeline to fit the model (note: fitted on the test set itself, so the score below is not a held-out score)

te_pr_nb = nb_model.transform(te) #nb prediction on testing set

print("best testing model results: ", binary_class_eval.evaluate(te_pr_nb))
nb_te_End_time = time.time() #end timer
nb_te_TimeTaken = nb_te_End_time - nb_te_Start_time
print("Time taken is", nb_te_TimeTaken)

best testing model results:  0.46928217980849546
Time taken is 31.52148461341858


The Naive Bayes result is surprisingly poor: an area under the ROC curve below 0.5, which is worse than a random ranking. As before, this cell takes longer because it also fits the pipeline (again on `te` itself); the time is similar to that of the corresponding logistic regression cell.
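
One possible factor worth checking, offered as a hypothesis rather than a diagnosis of these results: as we understand Spark's multinomial NaiveBayes, `rawPrediction` holds unnormalised log joint scores (log prior plus summed log term probabilities), and BinaryClassificationEvaluator by default ranks documents by element 1 of that vector. Such raw scores become more negative as documents get longer, so they can rank documents largely by length rather than by how spam-like they are. The pure-Python sketch below uses hypothetical probabilities to show a long spam message receiving a lower raw spam score than a short legitimate one, even though its posterior spam probability is higher. If this is the cause, evaluating on the posterior instead, for example with `BinaryClassificationEvaluator(rawPredictionCol="probability")`, would be one way to test it.

```python
import math

# Hypothetical per-class term probabilities and equal priors (not fitted to the spam data).
log_prior = {0: math.log(0.5), 1: math.log(0.5)}
log_theta = {
    1: {"free": math.log(0.4), "cash": math.log(0.4), "meeting": math.log(0.1), "notes": math.log(0.1)},
    0: {"free": math.log(0.1), "cash": math.log(0.1), "meeting": math.log(0.4), "notes": math.log(0.4)},
}

def raw_scores(doc):
    """Unnormalised log joint score per class [class 0, class 1], like a rawPrediction vector."""
    return [log_prior[c] + sum(log_theta[c][t] for t in doc) for c in (0, 1)]

def spam_probability(doc):
    """Posterior P(spam | doc): a two-class softmax of the raw scores."""
    raw0, raw1 = raw_scores(doc)
    return 1.0 / (1.0 + math.exp(raw0 - raw1))

long_spam = ["free", "cash"] * 3  # long spam message
short_ham = ["meeting"]           # short legitimate message

print(raw_scores(long_spam)[1], raw_scores(short_ham)[1])       # raw spam score ranks the ham higher
print(spam_probability(long_spam), spam_probability(short_ham))  # posterior ranks the spam higher
```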

## Comparison of both models and analysis of results

Surprisingly, the Naive Bayes models do not do as well as the logistic regression models. The best Naive Bayes models scored an area under the ROC curve below 0.5 on both the training and test sets, compared with roughly 0.99 for logistic regression, which suggests that Naive Bayes performed poorly in this pipeline. Perhaps the limited parameter search (only the smoothing value was varied) contributed to these poor results; Naive Bayes has few tunable parameters and is known for its simplicity. Perhaps the model was also too simple, whereas in practice ensembles or more sophisticated methods are often employed. Finally, the measured times are similar for both algorithms; note that these timings cover prediction and evaluation (plus fitting in the refit cells), not the grid search itself.