# A Spam Classifier with PySpark
This adventure builds a spam classifier against the CSDMC2010 SPAM corpus, which has 4327 labeled observations. The dataset is no longer available from its original home, but you can download it from [this GitHub repo](https://github.com/hexgnu/spam_filter/tree/master/data).

This exercise is loosely based on a lab from the Hortonworks *HDP Analyst: Data Science* course.

We will read the files with standard Python code, then create a Resilient Distributed Dataset from the data that have been loaded into the driver's memory.

In [None]:
spamFiles = []
notSpamFiles = [] 

# Each line starts with a one-character label followed by a file path.
# A label of 1 marks ham (not spam); a label of 0 marks spam.
with open('/root/ds/labs/Lab10.3/spamClassData/label/SPAMTrain.txt', 'r') as f:
    for line in f:
        if int(line[0]) == 1:
            r = line[2:]
            notSpamFiles.append('/root/ds/labs/Lab10.3/spamClassData/'+r.rstrip('\n'))
        elif int(line[0]) == 0:
            r = line[2:]
            spamFiles.append('/root/ds/labs/Lab10.3/spamClassData/'+r.rstrip('\n'))
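
The cell above reads the label by character position. A minimal sketch of the same parsing with `split`, assuming the label and the file name are separated by whitespace; `parse_label_line` and the sample lines are hypothetical and not part of the lab:

```python
def parse_label_line(line):
    """Split one '<label> <filename>' line into (is_spam, filename)."""
    label, filename = line.strip().split(None, 1)
    # Convention taken from the cell above: 0 marks spam, 1 marks ham.
    return int(label) == 0, filename

# Made-up sample lines, for illustration only.
sample_lines = ["1 first_message.eml\n", "0 second_message.eml\n"]
parsed = [parse_label_line(l) for l in sample_lines]
print(parsed)
```

Splitting on whitespace keeps working if a label ever grows past one character, which the positional slicing would not.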

In [28]:
import io

def loadFiles(paths):
    emails = []
    for path in paths:
        with io.open(path,"r", encoding='iso-8859-1') as f:
            emails.append(f.read())
    return emails

spams = loadFiles(spamFiles)
hams = loadFiles(notSpamFiles)
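
A likely reason for reading the files as `iso-8859-1` is that this encoding assigns a character to every possible byte value, so decoding cannot fail on mail with unknown or mixed encodings. A small sketch of the difference, using made-up bytes rather than a message from the corpus:

```python
raw = b"Caf\xe9 offer \xa3100"  # made-up bytes, not from the corpus

# ISO-8859-1 maps each of the 256 byte values to one character,
# so this decode cannot raise.
text = raw.decode("iso-8859-1")

# The same bytes are not valid UTF-8, so a strict UTF-8 decode fails.
try:
    raw.decode("utf-8")
    utf8_ok = True
except UnicodeDecodeError:
    utf8_ok = False

print(len(text) == len(raw))
print(utf8_ok)
```

The trade-off is that text which really was UTF-8 will be decoded into the wrong characters instead of raising an error.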

## Preprocessing Pipeline
This pipeline is built with code I previously published. I should really turn this into a library!

In [30]:
from collections import defaultdict
from nltk import download, pos_tag
from nltk.tokenize import wordpunct_tokenize
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords, wordnet
import string
import re
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import Pipeline

class TransformerBase(BaseEstimator, TransformerMixin):
    '''
    Provides a no-op fit() method for Transformers that only need
    a transform method
    '''
    
    def __init__(self):
        pass
    
    def fit(self, X, y=None, **fit_params):
        return self

class LowerCaser(TransformerBase):
    
    def transform(self, X, **fit_params):
        for i in range(len(X)):
            X[i] = X[i].lower()
        return X    

class Tokenizer(TransformerBase):

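    def transform(self, X, **fit_params):
        for i in range(len(X)):
            # Drop multi-character tokens that end in a period (e.g. '...').
            # Filtering into a new list avoids skipping tokens, which happens
            # when items are removed from a list while iterating over it.
            X[i] = [tok for tok in wordpunct_tokenize(X[i])
                    if not (tok.endswith('.') and len(tok) > 1)]
        return X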
    
def remove_listed_tokens(X, removal_list):
    '''
    If you immediately remove a token as you iterate forward through a list,
    you skip over the next token. This function instead builds a list of tokens
    to be removed, then removes them at the end.
    
    Parameters
      X - list of lists
      removal_list - string of tokens (e.g., punctuation), or list of strings (e.g., stopwords)
    '''
    for doc in X:
        removals = []
        for tok in doc:
            if tok in removal_list:
                removals.append(tok)
        for p in removals:
            doc.remove(p)
    return X
        
class StopWordRemover(TransformerBase):
    
    def __init__(self):
        download("stopwords")
        
    def transform(self, X, **fit_params):
        return remove_listed_tokens(X, stopwords.words('english'))
    
class Stringizer(TransformerBase):
    def transform(self, X, **fit_params):
        for i in range(len(X)):
            X[i] = ' '.join(X[i])
        return X
    
class Lemmatizer(TransformerBase):
    
    def __init__(self):
        self.treenet_map = defaultdict(str)
        self.treenet_map['N'] = wordnet.NOUN
        self.treenet_map['R'] = wordnet.ADV
        self.treenet_map['V'] = wordnet.VERB
        self.treenet_map['J'] = wordnet.ADJ
        
    def transform(self, X, **fit_params):
        lemmatizer = WordNetLemmatizer()
        for i in range(len(X)):
            doc = X[i][:]
            X[i] = [] # a list of lemmatized tokens
            for tok, pos in pos_tag(doc):
                wordnet_pos = self.treenet_map[pos[0]]
                if not wordnet_pos:
                    X[i].append(tok) # use tok without any lemmatizing if not a recognized POS
                else:
                    X[i].append(lemmatizer.lemmatize(tok, wordnet_pos))
        return X

class PunctuationRemover(TransformerBase):
    
    def __init__(self, exceptions = ''):
        self.exceptions = exceptions
        
    def transform(self, X, **fit_params):
        if not self.exceptions:
            punc = string.punctuation
        else:
            # don't remove these chars; they may convey emotion
            # (re.escape keeps characters such as ']' or '^' from breaking the character class)
            retained_punc = re.compile('[' + re.escape(self.exceptions) + ']')
            punc = retained_punc.sub('', string.punctuation)
        return remove_listed_tokens(X, punc)
    
    def get_params(self, deep=True):
        return {'exceptions': self.exceptions}
    
    def set_params(self, **parameters):
        for parm, value in parameters.items():
            setattr(self, parm, value)
        return self

pipeline = Pipeline([('lower', LowerCaser()),
                     ('tokenize', Tokenizer()),
                     ('lemmatize', Lemmatizer()),
                     ('stopwords', StopWordRemover()),
                     ('punc', PunctuationRemover()),
                     ('stringize', Stringizer())])

spams = pipeline.transform(spams)
hams = pipeline.transform(hams)
print(len(spams))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
1378
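
The docstring of `remove_listed_tokens` warns that removing a token while iterating forward through a list skips the next token. A small sketch of that pitfall next to a filtering alternative; the token list is made up:

```python
tokens = ["hello", "!", "!", "world", "?"]
punctuation = {"!", "?"}

# Buggy: each removal shifts the later items one position to the left,
# so the item right after a removed token is never visited.
buggy = list(tokens)
for tok in buggy:
    if tok in punctuation:
        buggy.remove(tok)

# Safe: build a new list that keeps only the wanted tokens.
safe = [tok for tok in tokens if tok not in punctuation]

print(buggy)  # one "!" survives
print(safe)
```

Collecting the removals first, as `remove_listed_tokens` does, and filtering into a new list both avoid the skipped element.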


## Spark MLlib
Because this notebook was created and run on a vastly underpowered, single-node cluster, the SparkConf must throttle the resource usage. The string data will be:
1. parallelized into RDDs
2. split into tokens and hashed into sparse term-frequency vectors with `pyspark.mllib.feature.HashingTF`

Since a model from `pyspark.mllib` will process the data, we must use the `mllib` version of `HashingTF` rather than the `pyspark.ml` version.
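
To make the hashing step less of a black box, here is a rough pure-Python sketch of the hashing trick: each token is hashed to a bucket index, and the vector stores one count per occupied bucket. This is not Spark's implementation. The function name `hashing_tf`, the use of `zlib.crc32` as the hash, and the tiny bucket count are all assumptions made for illustration.

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=1 << 20):
    """Map tokens to a sparse {bucket_index: count} term-frequency vector."""
    counts = Counter()
    for tok in tokens:
        # Stand-in hash: crc32 is deterministic across runs. Spark's own
        # hash function may differ; only the modulo bucketing matters here.
        index = zlib.crc32(tok.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)

# A tiny bucket count keeps the output readable; it also makes collisions likely.
vec = hashing_tf("free money free offer".split(" "), num_features=16)
print(vec)
```

Nothing stores the term-to-index mapping, so a bucket index cannot be traced back to the word or words that produced it.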

We will then attach a label to each feature vector by wrapping it in a `LabeledPoint`, so that the `LogisticRegressionWithSGD` classifier can train on it. After splitting the observations into training and test sets, we cache the training set, because stochastic gradient descent makes repeated passes over it.

In [None]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().set('spark.executor.instances', 1). \
 set("spark.executor.memory", "4g")
sc = SparkContext("yarn-client", conf=conf)
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD

spam = sc.parallelize(spams)
ham = sc.parallelize(hams)
tf = HashingTF()

spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
hamFeatures = ham.map(lambda email: tf.transform(email.split(" ")))

labeledSpam = spamFeatures.map(lambda features: LabeledPoint(1, features))
labeledHam = hamFeatures.map(lambda features: LabeledPoint(0, features))

observations = labeledSpam.union(labeledHam)


In [34]:
trainingSet, testSet = observations.randomSplit([4,1])
trainingSet.cache()
model = LogisticRegressionWithSGD.train(trainingSet)
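
The weights passed to `randomSplit` are normalized, so `[4,1]` asks for roughly 80% training data and 20% test data. Each observation is assigned at random, so the split sizes are approximate, not exact. A pure-Python sketch of that idea, assuming a simple per-item draw; `random_split` is a hypothetical helper, not Spark's sampler:

```python
import random

def random_split(items, weights, seed=None):
    """Assign each item to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative boundaries, e.g. [4, 1] -> [0.8, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()  # uniform draw in [0, 1)
        for k, bound in enumerate(bounds):
            if u < bound:
                splits[k].append(item)
                break
    return splits

train, test = random_split(range(4327), [4, 1], seed=42)
print(len(train))
print(len(test))
```

Passing a seed makes the split repeatable, which helps when comparing models across runs.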

## Model Validation
Let's test how well the model performs on the held-out test set.

In [35]:
# Pair each true label with the model's prediction on the held-out test set
LabelsAndPredictions = testSet.map(lambda v: (v.label, model.predict(v.features)))
LabelsAndPredictions.cache()
# Fraction of test observations whose prediction differs from the label
errorRate = LabelsAndPredictions.filter(lambda lp: lp[0] != lp[1]).count() / float(LabelsAndPredictions.count())
print("Test Error = " + str(errorRate))

Test Error = 0.026875699888
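
A single error rate does not say whether the mistakes are spam that slipped through or ham that was flagged. A minimal sketch of counting both kinds of error from (label, prediction) pairs like the ones built above; the pairs and the helper `confusion_counts` are made up for illustration and are not results from this model:

```python
def confusion_counts(pairs, positive=1.0):
    """Return (tp, fp, tn, fn), treating `positive` as the spam label."""
    tp = fp = tn = fn = 0
    for label, pred in pairs:
        if pred == positive:
            if label == positive:
                tp += 1
            else:
                fp += 1  # ham flagged as spam
        else:
            if label == positive:
                fn += 1  # spam that slipped through
            else:
                tn += 1
    return tp, fp, tn, fn

# Made-up (label, prediction) pairs, not output from the model above.
pairs = [(1.0, 1), (1.0, 0), (0.0, 0), (0.0, 0), (0.0, 1), (1.0, 1)]
tp, fp, tn, fn = confusion_counts(pairs)
error_rate = (fp + fn) / float(len(pairs))
precision = tp / float(tp + fp)
recall = tp / float(tp + fn)
print((tp, fp, tn, fn))
```

On the real data the same counts could be gathered by collecting the pairs to the driver, since the test set here is small.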


Unfortunately, `pyspark.mllib.feature.HashingTF` does not provide a way to inspect the terms in the sparse feature vector. If you want to understand feature importance, you must use `pyspark.ml.feature.CountVectorizer` with a model from the `pyspark.ml` namespace. Since `pyspark.mllib` models such as `LogisticRegressionWithSGD` cannot even be saved to disk at this time, the only reasonable way to use PySpark in production data science is to use `pyspark.ml` rather than `pyspark.mllib`.
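
The difference comes down to whether the term-to-index mapping is kept. A pure-Python sketch of a vocabulary-based count vectorizer shows why its features can be read back as words. This only illustrates the idea and is not the `pyspark.ml` implementation; `fit_vocabulary`, `vectorize`, and the sample documents are hypothetical.

```python
from collections import Counter

def fit_vocabulary(docs):
    """Give each distinct term an index, most frequent first (ties alphabetical)."""
    totals = Counter(tok for doc in docs for tok in doc)
    ordered = sorted(totals, key=lambda t: (-totals[t], t))
    return {term: i for i, term in enumerate(ordered)}

def vectorize(doc, vocab):
    """Count known terms in one document as a sparse {index: count} dict."""
    return dict(Counter(vocab[tok] for tok in doc if tok in vocab))

# Made-up tokenized documents.
docs = [["free", "money", "free"], ["meeting", "money"]]
vocab = fit_vocabulary(docs)
index_to_term = {i: term for term, i in vocab.items()}

vec = vectorize(docs[0], vocab)
# Because the vocabulary is stored, indices translate back into terms.
readable = {index_to_term[i]: count for i, count in vec.items()}
print(readable)
```

With such a mapping, a model's per-feature weights can be lined up against the terms they belong to.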