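This is a PySpark notebook (Python 3 kernel) that trains a random-forest classifier to separate spam from ham emails in the trec07p corpus.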

In [None]:
import email
import os
import warnings

from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Root of the trec07p corpus. Set the TREC07P_DIR environment variable to
# point at another copy instead of editing this machine-specific path.
TREC07P_DIR = os.environ.get('TREC07P_DIR', '/home/ml/Desktop/spam/trec07p')
# Directory holding the raw email files (inmail.1, inmail.2, ...).
DATA_DIR = os.path.join(TREC07P_DIR, 'data')
# Index file: one '<label> <path>' entry per line.
LABELS_FILE = os.path.join(TREC07P_DIR, 'full', 'index')
# Fraction of rows used for training; the remainder becomes the test set.
TRAINING_SET_RATIO = 0.7

In [None]:
# Map each email file name (e.g. 'inmail.1') to its label: 1 for ham, 0 for
# anything else. Each index line has the form '<label> <path>'.
labels = {}
with open(LABELS_FILE) as f:
    for line in f:
        fields = line.split()
        if not fields:
            # Tolerate blank lines (e.g. a trailing newline) instead of
            # failing the tuple unpacking below.
            continue
        label, key = fields
        labels[key.split('/')[-1]] = 1 if label.lower() == 'ham' else 0

In [None]:
def extract_email_text(path, encoding='utf-8', errors='ignore'):
    """Return the subject and plain-text body of a single raw email file.

    Args:
        path: path to one email file.
        encoding: text encoding used to read the file. Defaults to 'utf-8'
            rather than the platform's locale encoding.
        errors: codec error handler passed to ``open``. The default 'ignore'
            drops undecodable bytes instead of raising UnicodeDecodeError,
            since raw emails may contain bytes in other charsets.

    Returns:
        str: the subject and the body joined by a single newline; a missing
        subject or body becomes ''. Returns '' when the parsed message has
        no headers at all.
    """
    with open(path, encoding=encoding, errors=errors) as f:
        msg = email.message_from_file(f)
    # Message.__len__ counts headers, so this is true for a header-less message.
    if not msg:
        return ''

    # str() because a header value is not guaranteed to be a plain str
    # (it may be an email.header.Header object).
    subject = str(msg['Subject'] or '')

    # Body: the text payloads joined with spaces. Extraction stays
    # best-effort, but a failure is now reported instead of silently
    # producing an empty body for every email.
    body = ''
    try:
        parts = flatten_to_string(msg.get_payload())
        body = ' '.join(part for part in parts if isinstance(part, str))
    except Exception as exc:
        warnings.warn(
            'Could not extract email body ({}); using an empty body.'.format(
                type(exc).__name__))

    return subject + '\n' + body

In [None]:
def flatten_to_string(parts):
    """Collect the plain-text payload strings from an email payload tree.

    Args:
        parts: a value returned by ``Message.get_payload()`` (a str for a
            single-part message, a list of sub-messages for a multipart
            one), or a single Message object.

    Returns:
        list: the str payloads found, in document order. A Message
        contributes its payload only when its content type is text/plain;
        multipart messages are searched recursively. Any other value
        (e.g. None) contributes nothing.
    """
    ret = []
    if isinstance(parts, str):
        ret.append(parts)
    elif isinstance(parts, list):
        for part in parts:
            ret.extend(flatten_to_string(part))
    elif hasattr(parts, 'is_multipart'):
        if parts.is_multipart():
            # Nested container (e.g. multipart/alternative inside
            # multipart/mixed): descend into its sub-parts.
            ret.extend(flatten_to_string(parts.get_payload()))
        elif parts.get_content_type() == 'text/plain':
            ret.append(parts.get_payload())
    # Always hand back the list; callers iterate over the result.
    return ret

In [None]:
def read_email_files():
    """Load every labelled email as parallel lists of texts and labels.

    Files are read from DATA_DIR in numeric order inmail.1 ... inmail.N,
    where N is the number of entries in the global ``labels`` dict.

    Returns:
        tuple[list[str], list[float]]: the extracted email texts and the
        matching labels converted to float (1.0 for ham, 0.0 otherwise).
    """
    texts, targets = [], []
    for n in range(1, len(labels) + 1):
        name = 'inmail.' + str(n)
        texts.append(extract_email_text(os.path.join(DATA_DIR, name)))
        targets.append(float(labels[name]))
    return texts, targets

In [None]:
X, y = read_email_files()

# Explicit schema: a row id, the raw email text, and its label as a double
# (read_email_files returns the labels as floats).
schema = StructType([
    StructField('id', IntegerType(), nullable=False),
    StructField('email', StringType(), nullable=False),
    StructField('label', DoubleType(), nullable=False),
])

rows = [(row_id, text, label) for row_id, (text, label) in enumerate(zip(X, y))]
df = spark.createDataFrame(rows, schema)

In [None]:
# Sanity check: the printed schema should match the one declared when df was built.
df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- email: string (nullable = false)
 |-- label: double (nullable = false)

In [None]:
# Fixed seed for the random train/test split.
SPLIT_SEED = 123
split_weights = [TRAINING_SET_RATIO, 1 - TRAINING_SET_RATIO]
train, test = df.randomSplit(split_weights, seed=SPLIT_SEED)

In [None]:
tokenizer = Tokenizer()
vectorizer = CountVectorizer()
# Pin the forest's seed so training is repeatable across kernel restarts:
# PySpark derives the default seed from hash() of the class name, and Python
# randomizes str hashes per process unless PYTHONHASHSEED is set.
RF_SEED = 123
rfc = RandomForestClassifier(seed=RF_SEED)

# Stage order matters: raw text -> tokens -> term-count vectors -> forest.
pipeline = Pipeline(stages=[tokenizer, vectorizer, rfc])

In [None]:
# Column wiring per stage: email -> tokens -> vectors -> classifier.
NUM_TREES = 500

tokenizer_params = {tokenizer.inputCol: 'email', tokenizer.outputCol: 'tokens'}
vectorizer_params = {vectorizer.inputCol: 'tokens', vectorizer.outputCol: 'vectors'}
classifier_params = {
    rfc.featuresCol: 'vectors',
    rfc.labelCol: 'label',
    rfc.numTrees: NUM_TREES,
}

# Single param map passed to pipeline.fit, merged in stage order.
paramMap = {}
for stage_params in (tokenizer_params, vectorizer_params, classifier_params):
    paramMap.update(stage_params)

In [None]:
# Fit every pipeline stage on the training split; the entries in paramMap
# override the stages' own params for this fit.
model = pipeline.fit(train, params=paramMap)

In [None]:
# Score the held-out split with the fitted pipeline; the result carries the
# classifier's output columns (including 'rawPrediction', read by the evaluator).
prediction = model.transform(test)

In [None]:
# Label 1.0 means ham (see the labels cell), so ham is the positive class
# for both metrics below.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
scores = {}
for metric in ('areaUnderPR', 'areaUnderROC'):
    scores[metric] = evaluator.evaluate(prediction, {evaluator.metricName: metric})
pr_score = scores['areaUnderPR']
roc_score = scores['areaUnderROC']

print("Area under ROC curve score: {:.3f}".format(roc_score))
print("Area under precision/recall curve score: {:.3f}".format(pr_score))

Area under ROC curve score: 0.971
Area under precision/recall curve score: 0.958
