In [1]:
from pyspark import SparkConf
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql import SparkSession, functions, types
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
# Local Spark session shared by all the cells below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Classification")
    .getOrCreate()
)


def readData(path, label):
    """Load a text file as a DataFrame and tag every row with a constant label.

    Parameters
    ----------
    path : str
        Location of the text file, passed straight to ``spark.read.text``.
    label : int
        Value written into the added ``label`` column for every row.

    Returns
    -------
    DataFrame
        The rows read from ``path`` plus a literal ``label`` column.
    """
    lines = spark.read.text(path)
    return lines.withColumn("label", functions.lit(label))


# Spam files are labelled 1, non-spam files 0.
pTrain = readData("./spam-datasets/spam_training.txt", 1)
nTrain = readData("./spam-datasets/nospam_training.txt", 0)
pTest = readData("./spam-datasets/spam_testing.txt", 1)
nTest = readData("./spam-datasets/nospam_testing.txt", 0)

# Merge both classes per split; cached because they are reused by later cells.
trainSet = pTrain.union(nTrain).cache()
testSet = pTest.union(nTest).cache()

In [4]:
# Delimiter pattern for RegexTokenizer: text matching any alternative is used
# as a separator. The alternatives are, in order: a single character that is
# not a letter, digit, hyphen or apostrophe; an http(s) URL; a "www." address;
# any word containing a digit.
# Written as a raw string so the regex escapes (\S, \w, \d, \., \-) reach the
# regex engine untouched; in a normal string they are invalid Python escape
# sequences that trigger a warning on recent interpreters. The resulting
# pattern text is identical to the previous one.
tokenizer = RegexTokenizer(
    pattern=r"[^a-z0-9A-Z\-']|https?://\S+|www\.\S+|\w*\d\w*",
    inputCol="value",
    outputCol="words_s",
)
# Drop stop words from the token list.
remover = StopWordsRemover(inputCol="words_s", outputCol="words")
# Hash tokens into a fixed-size term-frequency vector, then apply IDF weighting.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=16384)
idf = IDF(inputCol="rawFeatures", outputCol="features")
# The three classifiers being compared, all with default hyper-parameters.
lr = LogisticRegression()
nb = NaiveBayes()
svm = LinearSVC()

In [6]:
def pipelineFit(estimator):
    """Fit the shared preprocessing stages followed by ``estimator`` on ``trainSet``.

    Parameters
    ----------
    estimator
        Final pipeline stage, appended after the tokenizer, stop-word
        remover, hashing TF and IDF stages.

    Returns
    -------
    The fitted pipeline model returned by ``Pipeline.fit``.
    """
    stages = [tokenizer, remover, hashingTF, idf, estimator]
    return Pipeline(stages=stages).fit(trainSet)


# Fitted in the same order as before: logistic regression, naive Bayes, SVM.
lrModel, nbModel, svmModel = (pipelineFit(est) for est in (lr, nb, svm))

In [None]:
def predict(model):
    """Apply a fitted model to ``testSet`` and keep only prediction and label.

    Parameters
    ----------
    model
        Fitted pipeline model exposing ``transform``.

    Returns
    -------
    DataFrame
        The ``prediction`` and ``label`` columns of the transformed test set.
    """
    scored = model.transform(testSet)
    return scored.select("prediction", "label")


lrPredictions, nbPredictions, svmPredictions = map(
    predict, (lrModel, nbModel, svmModel)
)

In [None]:
def getMetric(predictions, metric, label=1.0):
    """Evaluate a single multiclass metric on a predictions DataFrame.

    Parameters
    ----------
    predictions : DataFrame
        Must contain the ``prediction`` and ``label`` columns.
    metric : str
        Metric name understood by ``MulticlassClassificationEvaluator``
        (e.g. ``"accuracy"``, ``"recallByLabel"``, ``"precisionByLabel"``).
    label : float, optional
        Class the per-label metrics are computed for. Defaults to 1.0, the
        label given to spam when the datasets are loaded. The evaluator's own
        default is 0.0, which would report precision/recall of the non-spam
        class instead.

    Returns
    -------
    float
        The metric value.
    """
    evaluator = MulticlassClassificationEvaluator(metricName=metric, metricLabel=label)
    return evaluator.evaluate(predictions)


def getMetrics(predictions, label=1.0):
    """Return ``(accuracy, recall, precision)`` for a predictions DataFrame.

    Recall and precision are the per-label variants computed for ``label``
    (the spam class by default).
    """
    return (
        getMetric(predictions, "accuracy", label),
        getMetric(predictions, "recallByLabel", label),
        getMetric(predictions, "precisionByLabel", label),
    )


lr_acc, lr_rec, lr_pre = getMetrics(lrPredictions)
nb_acc, nb_rec, nb_pre = getMetrics(nbPredictions)
svm_acc, svm_rec, svm_pre = getMetrics(svmPredictions)

In [None]:
def plot_bar(y, label, ylim=(0.9, 1.0)):
    """Draw one annotated bar per classifier for a single metric.

    Parameters
    ----------
    y : sequence of float
        Three scores expressed as fractions, in the order
        LogisticRegression, NaiveBayes, SVM.
    label : str
        Text for the y-axis.
    ylim : tuple of float, optional
        ``(lower, upper)`` limits of the y-axis. The lower limit is widened
        automatically when a score falls below it.
    """
    labels = ["LogisticRegression", "NaiveBayes", "SVM"]
    colors = ["red", "green", "blue"]

    lower, upper = ylim
    # A score under the lower limit would leave its bar invisible and its
    # annotation outside the axes, so widen the axis in that case.
    lowest = min(y, default=lower)
    if lowest < lower:
        lower = max(0.0, lowest - 0.05)

    fig, ax = plt.subplots()
    ax.bar(range(len(labels)), y, tick_label=labels, color=colors)
    ax.set_ylabel(label)
    for i, v in enumerate(y):
        # Percent formatting avoids float artifacts such as "7.000000000000001%"
        # that str(round(v, 4) * 100) can produce.
        ax.text(i, v, "{:.2%}".format(v), ha="center", va="bottom", fontweight="bold")
    ax.set_ylim(lower, upper)
    plt.show()

In [None]:
# One chart per metric; scores are ordered LogisticRegression, NaiveBayes, SVM.
for scores, metric_name in (
    ([lr_acc, nb_acc, svm_acc], "Accuracy"),
    ([lr_rec, nb_rec, svm_rec], "Recall"),
    ([lr_pre, nb_pre, svm_pre], "Precision"),
):
    plot_bar(scores, metric_name)