In [1]:
from pyspark.ml.feature import IndexToString, Word2Vec, StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [2]:
# 设置运行环境

conf = SparkConf()
conf.setAppName("SpamMessageClassifier")
conf.setMaster("spark://172.19.96.58:7077")
sc.stop()
sc = SparkContext(conf = conf)
sqlCtx = SQLContext(sc)

In [3]:
# 从数据集读取手机短信作为处理数据源，在此基础上创建 DataFrame，
# 该 DataFrame 包含lableCol、contextCol 两个列

data_path = "SMSSpamCollection"
messageRDD = sc.textFile(data_path).map(lambda line:line.split("\t")).map(lambda line:[line[0],line[1].split()])

smsDF = sqlCtx.createDataFrame(messageRDD).toDF("labelCol", "contextCol")

In [4]:
# 将原始的文本标签{"Ham", "Spam"} 转换成数值类型

labelIndexer = StringIndexer(inputCol="labelCol", 
                             outputCol="indexedLabelCol").fit(smsDF)

# 使用 Word2Vec 将短信文本转化成数值型词向量
word2Vec = Word2Vec(inputCol="contextCol",
                   outputCol="featuresCol",
                   vectorSize=100,
                   minCount=1)

In [5]:
layers = [100,6,5,2]

# 使用 MultilayerPerceptronClassifier 训练一个多层感知模型

mpc = MultilayerPerceptronClassifier(featuresCol="featuresCol",
                                    labelCol="indexedLabelCol",
                                    predictionCol="predictionCol",
                                    maxIter=128,
                                    seed=1234,
                                    layers=layers,
                                    blockSize=512)

In [6]:
# 使用 IndexToString 将预测结果的数值标签转化成原始的文本标签

labelConverter = IndexToString(inputCol="predictionCol",
                              outputCol="predictedLabelCol",
                              labels=labelIndexer.labels)

In [7]:
# 将原始文本数据按照 8:2 的比例分成训练和测试数据集合
trainingData,testData = smsDF.randomSplit([0.8, 0.2])

In [8]:
# 使用 Pipeline 对数据进行处理和模型的训练
pipeline = Pipeline(stages = [labelIndexer, word2Vec, mpc, labelConverter])
model = pipeline.fit(trainingData)
preResultDF = model.transform(testData)

In [9]:
# 使用模型对测试数据进行分类处理并在屏幕打印 20 条数据
preResultDF.select(
    col("contextCol"),
    col("labelCol"),
    col("predictedLabelCol")).show(20)

+--------------------+--------+-----------------+
|          contextCol|labelCol|predictedLabelCol|
+--------------------+--------+-----------------+
|["Are, you, comin...|     ham|              ham|
|["EY!, CALM, DOWN...|     ham|              ham|
|["HELLO, U.CALL, ...|     ham|              ham|
|["Hi, its, Kate, ...|     ham|              ham|
|["Petey, boy, whe...|     ham|              ham|
|["Si.como, no?!li...|     ham|              ham|
|["YEH, I, AM, DEF...|     ham|              ham|
|[&lt;#&gt;, %of, ...|     ham|              ham|
|[(And, my, man, c...|     ham|              ham|
|[(I, should, add,...|     ham|              ham|
|[*, Was, a, nice,...|     ham|              ham|
|[1), Go, to, writ...|     ham|              ham|
|               [645]|     ham|              ham|
|[;-(, oh, well,, ...|     ham|              ham|
|[Abeg,, make, pro...|     ham|              ham|
|[Actually, i, del...|     ham|              ham|
|[Ahhh., Work., I,...|     ham|              ham|


In [10]:
# 测试数据集上测试模型的预测精确度
evaluator = MulticlassClassificationEvaluator(
    predictionCol="predictionCol",
    labelCol="indexedLabelCol")

predictionAccuracy = evaluator.evaluate(preResultDF)

print("Testing Accuracy is {:.4f}".format(predictionAccuracy*100) + "%.")



Testing Accuracy is 93.3183%.


In [11]:
sc.stop()