# Email Spam Classification Pipeline

## 1. Read labeled email data
Labels: 1 = ham, 0 = spam

In [1]:
data = spark.read.parquet("train_parquet/")
data.cache()
data.show(5)

+--------+--------------------+-----+
|email_id|                text|label|
+--------+--------------------+-----+
|       1|One of a kind Mon...|    0|
|      10|Re: What to choos...|    1|
|     100|Strictly Private....|    0|
|    1000|Re: Flash is open...|    1|
|    1001|Re: Alsa/Redhat 8...|    1|
+--------+--------------------+-----+
only showing top 5 rows



In [2]:
print("Training data: %d" % data.count())

Training data: 2500


## 2. Create a Spark ML pipeline consisting of:
1. **Tokenizer** - extract tokens from raw text
2. **Count vectorizer** - convert tokens to term-frequency vectors
3. **IDF** - rescale term-frequency vectors by inverse document frequency to produce TF-IDF features
4. **Logistic Regression** - fit a binary classifier on the TF-IDF features
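
Stages 2 and 3 above can be sketched in plain Python to show what the feature vectors contain. This is an illustration only, not Spark's implementation: the helper names are hypothetical, the three toy documents are made up, and the smoothed formula `log((m + 1) / (df + 1))` is assumed to match the one described in the Spark ML documentation for `IDF`.

```python
import math
from collections import Counter

def inverse_document_frequencies(tokenized_docs, min_doc_freq=0):
    """Smoothed IDF per term: log((m + 1) / (df + 1)), where m is the number
    of documents and df the number of documents containing the term.
    Terms seen in fewer than min_doc_freq documents get weight 0 (assumed
    to mirror the minDocFreq parameter)."""
    n_docs = len(tokenized_docs)
    doc_freq = Counter()
    for tokens in tokenized_docs:
        doc_freq.update(set(tokens))  # count each term once per document
    return {
        term: math.log((n_docs + 1.0) / (df + 1.0)) if df >= min_doc_freq else 0.0
        for term, df in doc_freq.items()
    }

def tf_idf(tokens, idf):
    """Multiply raw term counts (the term-frequency vector) by the IDF weights."""
    return {term: count * idf.get(term, 0.0) for term, count in Counter(tokens).items()}

# Toy corpus (hypothetical data, not taken from the training set)
docs = [["free", "money", "free"], ["re", "alsa", "money"], ["re", "flash"]]
idf = inverse_document_frequencies(docs)
weights = tf_idf(docs[0], idf)
print(weights)
```

Terms that occur in many documents, such as `money` here, receive a smaller weight than terms concentrated in a few documents, such as `free`.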

In [3]:
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="[^a-zA-Z_0-9]+")
cv = CountVectorizer(inputCol="words", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features")
lr = LogisticRegression(maxIter=30)
pipeline = Pipeline(stages=[tokenizer, cv, idf, lr])
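
To see what the tokenizer pattern does, the sketch below approximates it with the standard `re` module. It assumes the tokenizer treats the pattern as a separator, lowercases the text, and drops empty tokens. `regex_tokenize` is a hypothetical helper, not part of PySpark.

```python
import re

# Same pattern as the RegexTokenizer above: any run of characters that are
# not letters, digits, or underscores is treated as a separator.
PATTERN = r"[^a-zA-Z_0-9]+"

def regex_tokenize(text, pattern=PATTERN, min_token_length=1):
    """Plain-Python approximation of a gap-splitting, lowercasing tokenizer."""
    lowered = text.lower()
    # re.split can yield empty strings at the edges; the length filter drops them
    return [tok for tok in re.split(pattern, lowered) if len(tok) >= min_token_length]

tokens = regex_tokenize("Re: Alsa/Redhat 8")
print(tokens)
```

Punctuation and whitespace never appear in the tokens, while digits and underscores are kept as part of words.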

## 3. Use K-fold Cross-Validation for Pipeline Model Selection

In [5]:
# The evaluator's default metric is area under the ROC curve
auc_eval = BinaryClassificationEvaluator()
grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [1e-3, 1e-2]) \
    .addGrid(lr.elasticNetParam, [0.25, 0.0]) \
    .addGrid(cv.vocabSize, [10000, 50000]) \
    .addGrid(idf.minDocFreq, [0, 3]) \
    .build()
cross_val = CrossValidator(estimator=pipeline, evaluator=auc_eval, estimatorParamMaps=grid, numFolds=3)
pipeline_model = cross_val.fit(data)
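
The cost of this search grows with the product of the grid sizes. The plain-Python sketch below counts the fits implied by the grid above. It assumes one pipeline fit per parameter combination per fold, plus a final refit of the best combination on the full data. The dictionary keys are plain strings standing in for the Spark `Param` objects.

```python
from itertools import product

# Same values as the ParamGridBuilder call above, keyed by plain strings
param_values = {
    "regParam": [1e-3, 1e-2],
    "elasticNetParam": [0.25, 0.0],
    "vocabSize": [10000, 50000],
    "minDocFreq": [0, 3],
}

names = list(param_values)
# Cartesian product of all value lists: one dict per candidate configuration
combinations = [
    dict(zip(names, values))
    for values in product(*(param_values[name] for name in names))
]

num_folds = 3
cv_fits = len(combinations) * num_folds  # one fit per combination per fold
total_fits = cv_fits + 1                 # plus the assumed final refit on all data

print("combinations: %d, cross-validation fits: %d, total fits: %d"
      % (len(combinations), cv_fits, total_fits))
```

Adding one more two-valued parameter would double every count, so the grid is worth keeping small on larger data sets.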

In [12]:
# Pair each parameter map with its mean cross-validated metric, best first
scores = sorted(zip(grid, pipeline_model.avgMetrics), key=lambda x: x[1], reverse=True)
print("Cross-validation scores:")
for params, score in scores:
    print("regParam: %s; elasticNet: %s, vocabSize: %s, minDocFreq: %s - ROC score: %s" %
          (params[lr.regParam], params[lr.elasticNetParam], params[cv.vocabSize], params[idf.minDocFreq], score))

Cross-validation scores:
regParam: 0.001; elasticNet: 0.25, vocabSize: 50000, minDocFreq: 0 - ROC score: 0.994703388226
regParam: 0.01; elasticNet: 0.25, vocabSize: 10000, minDocFreq: 3 - ROC score: 0.994337609943
regParam: 0.01; elasticNet: 0.25, vocabSize: 10000, minDocFreq: 0 - ROC score: 0.994124213595
regParam: 0.001; elasticNet: 0.25, vocabSize: 10000, minDocFreq: 3 - ROC score: 0.993912311874
regParam: 0.001; elasticNet: 0.25, vocabSize: 10000, minDocFreq: 0 - ROC score: 0.993836705568
regParam: 0.001; elasticNet: 0.25, vocabSize: 50000, minDocFreq: 3 - ROC score: 0.993431752622
regParam: 0.01; elasticNet: 0.25, vocabSize: 50000, minDocFreq: 3 - ROC score: 0.993430946302
regParam: 0.01; elasticNet: 0.0, vocabSize: 10000, minDocFreq: 3 - ROC score: 0.993413552489
regParam: 0.001; elasticNet: 0.0, vocabSize: 10000, minDocFreq: 0 - ROC score: 0.993098352005
regParam: 0.001; elasticNet: 0.0, vocabSize: 10000, minDocFreq: 3 - ROC score: 0.993038119979
regParam: 0.01; elasticNet: 0.25
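
The ROC score listed for each configuration is the area under the ROC curve (AUC), averaged over the folds. As a rough illustration of what that number measures, the sketch below computes AUC as the fraction of (positive, negative) pairs that the classifier ranks correctly. It is a pairwise formulation for small inputs, not how Spark computes the metric, and the labels and scores are made-up values.

```python
def area_under_roc(labels, scores):
    """AUC as the probability that a randomly chosen positive example
    receives a higher score than a randomly chosen negative one.
    Ties count as half a correct ranking."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    correct = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                correct += 1.0
            elif p == n:
                correct += 0.5
    return correct / (len(positives) * len(negatives))

# Hypothetical labels (1 = ham, 0 = spam) and predicted ham probabilities
labels = [0, 1, 0, 1, 1]
scores = [0.1, 0.9, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
print("AUC: %.4f" % auc)
```

A score of 1.0 means every ham email is ranked above every spam email, and 0.5 corresponds to random ranking.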

## 4. Use Best PipelineModel to Score Data

In [13]:
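print(pipeline.getStages())        # the unfitted pipeline stages
print(pipeline_model)              # the fitted CrossValidatorModel
print(pipeline_model.bestModel)    # the PipelineModel refit with the best parameters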

[RegexTokenizer_422cb14cc91d8cb75da0, CountVectorizer_4609bd0606c0a106abc5, IDF_47339268d1d054dd59fd, LogisticRegression_4df9a2979bd5b2311e20]
CrossValidatorModel_4ce8b81002e7a3303cd4
PipelineModel_421f8a7fcb8a4ae48f3d


In [14]:
# example pipeline output
data.show(5)
pipeline_model.transform(data).select("text", "words", "features", "label", "prediction").show(5)

+--------+--------------------+-----+
|email_id|                text|label|
+--------+--------------------+-----+
|       1|One of a kind Mon...|    0|
|      10|Re: What to choos...|    1|
|     100|Strictly Private....|    0|
|    1000|Re: Flash is open...|    1|
|    1001|Re: Alsa/Redhat 8...|    1|
+--------+--------------------+-----+
only showing top 5 rows

+--------------------+--------------------+--------------------+-----+----------+
|                text|               words|            features|label|prediction|
+--------------------+--------------------+--------------------+-----+----------+
|One of a kind Mon...|[one, of, a, kind...|(50000,[0,1,2,3,4...|    0|       0.0|
|Re: What to choos...|[re, what, to, ch...|(50000,[0,1,2,3,4...|    1|       1.0|
|Strictly Private....|[strictly, privat...|(50000,[0,1,2,3,4...|    0|       0.0|
|Re: Flash is open...|[re, flash, is, o...|(50000,[0,1,2,3,4...|    1|       1.0|
|Re: Alsa/Redhat 8...|[re, alsa, redhat...|(50000,[0,1,2,3,