## 0 初始处理

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import mllib
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.classification import SVMWithSGD, LogisticRegressionWithSGD, NaiveBayes
import pyspark.mllib.evaluation as ev

# Configure a local Spark application named "miniProject"; the "local[*]"
# master runs Spark inside this process.
conf = SparkConf()
conf.setAppName("miniProject")
conf.setMaster("local[*]")

# Reuse the active SparkContext if there is one, otherwise create it from conf.
sc = SparkContext.getOrCreate(conf=conf)

In [2]:
# Read the training CSV as an RDD of text lines.
data = sc.textFile(name="train_after.csv")

In [3]:
# Convert every CSV line into the LabeledPoint format used by the MLlib models:
# the last column is the label and all preceding columns are the features.
# Each line is split only once, and the fields are converted to float
# explicitly instead of relying on implicit string coercion.
labeled_data = data.map(lambda line: line.split(",")).map(
    lambda fields: LabeledPoint(float(fields[-1]), [float(v) for v in fields[:-1]])
)
print(labeled_data.take(10))

[LabeledPoint(0.0, [362112.0,4.0,1.0,2618.0]), LabeledPoint(0.0, [232320.0,4.0,1.0,1168.0]), LabeledPoint(0.0, [232320.0,4.0,1.0,4270.0]), LabeledPoint(0.0, [36480.0,6.0,1.0,4730.0]), LabeledPoint(0.0, [37248.0,3.0,1.0,2615.0]), LabeledPoint(0.0, [299904.0,6.0,1.0,1742.0]), LabeledPoint(0.0, [104064.0,6.0,1.0,3870.0]), LabeledPoint(0.0, [104064.0,6.0,1.0,669.0]), LabeledPoint(0.0, [105600.0,6.0,1.0,1487.0]), LabeledPoint(0.0, [40320.0,4.0,1.0,4173.0])]


In [4]:
# Split the data into training and test sets at a 3:1 ratio.
# A fixed seed makes the split reproducible between runs. Both subsets are
# cached because every model below re-reads them (the SGD-based trainers
# iterate over the training set many times); without caching, each pass would
# re-read and re-parse the CSV file.
train, test = labeled_data.randomSplit([0.75, 0.25], seed=42)
train.cache()
test.cache()

In [5]:
# Evaluation helper defined up front so that every model below can reuse it.
def eva(result):
    """Print ROC AUC, PR AUC and accuracy for an RDD of (prediction, label) pairs.

    Args:
        result: RDD of ``(prediction, label)`` pairs. It is passed unchanged to
            ``BinaryClassificationMetrics``; for the accuracy, a pair counts as
            correct when its two elements are equal.

    Raises:
        ValueError: if ``result`` contains no elements.
    """
    # The RDD is scanned several times below (count, filter and the metric
    # computations), so keep it in memory unless the caller already cached it.
    cached_here = not result.is_cached
    if cached_here:
        result.cache()
    try:
        N = result.count()
        if N == 0:
            # Fail with a clear message instead of a bare ZeroDivisionError.
            raise ValueError("eva() needs at least one (prediction, label) pair")
        metrics = ev.BinaryClassificationMetrics(result)
        accuracy = 1.0 * result.filter(lambda p: p[0] == p[1]).count() / N
        print("【Area Under ROC】{0:.2f}".format(metrics.areaUnderROC))
        print("【Area Under PR】{0:.2f}".format(metrics.areaUnderPR))
        print("【Accuracy】{0:.2f}".format(accuracy))
    finally:
        # Only release the cache if this function created it.
        if cached_here:
            result.unpersist()

In [6]:
# Helper that turns a predicted score into the corresponding 0 or 1 prediction.
def trans(x, threshold=0.5):
    """Convert a predicted score into a hard 0/1 class label.

    Args:
        x: Predicted score to binarize.
        threshold: Cut-off value. Scores strictly greater than it map to 1,
            everything else maps to 0. Defaults to 0.5, the value that was
            previously hard-coded.

    Returns:
        int: 1 if ``x > threshold``, otherwise 0.
    """
    return 1 if x > threshold else 0

## 1 决策树

In [7]:
# Fit a decision-tree regressor on the training set; the empty dict declares
# no categorical features.
model_1 = DecisionTree.trainRegressor(train, categoricalFeaturesInfo={})

# Score the feature vectors of the test set and show a sample of the raw output.
y_pred_1 = model_1.predict(test.map(lambda point: point.features))
print(y_pred_1.take(10))

# Threshold the raw scores with trans() and store the resulting labels as floats.
y_pred_1 = y_pred_1.map(trans).map(float)

# Ground-truth labels of the test set (the misspelled name is kept because the
# later model cells refer to it).
y_ture = test.map(lambda point: point.label)

# Pair every prediction with its true label as (prediction, label).
result_1 = y_pred_1.zip(y_ture)
print(result_1.take(10))

[0.04611837048424289, 0.08620689655172414, 0.06111111111111111, 0.08288770053475936, 0.032148900169204735, 0.08620689655172414, 0.04611837048424289, 0.04611837048424289, 0.04611837048424289, 0.032148900169204735]
[(0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0)]


In [8]:
# Print the evaluation metrics for the decision-tree predictions.
eva(result_1)

【Area Under ROC】0.50
【Area Under PR】0.03
【Accuracy】0.94


## 2 支持向量机SVM

In [9]:
# Train an SVM with SGD, running 1000 iterations.
numIterations = 1000
model_2 = SVMWithSGD.train(train, iterations=numIterations)

# Predict on the test feature vectors and store the predictions as floats.
y_pred_2 = model_2.predict(test.map(lambda point: point.features))
y_pred_2 = y_pred_2.map(float)

# Pair every prediction with its true label as (prediction, label).
result_2 = y_pred_2.zip(y_ture)
print(result_2.take(10))

[(0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0)]


In [10]:
# Print the evaluation metrics for the SVM predictions.
eva(result_2)

【Area Under ROC】0.50
【Area Under PR】0.06
【Accuracy】0.94


## 3 LogisticRegression

In [11]:
# Train a logistic-regression model with SGD.
# NOTE: the run uses 10 iterations; an earlier comment here said 100, which
# did not match the code.
numIterations = 10
model_3 = LogisticRegressionWithSGD.train(train, iterations=numIterations)

# Predict on the test feature vectors and store the predictions as floats.
y_pred_3 = model_3.predict(test.map(lambda point: point.features))
y_pred_3 = y_pred_3.map(float)

# Pair every prediction with its true label as (prediction, label).
result_3 = y_pred_3.zip(y_ture)
print(result_3.take(10))

[(0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0)]


In [12]:
# Print the evaluation metrics for the logistic-regression predictions.
eva(result_3)

【Area Under ROC】0.50
【Area Under PR】0.06
【Accuracy】0.94


## 4 朴素贝叶斯

In [13]:
# Train a naive Bayes model with a smoothing parameter of 1.0.
model_4 = NaiveBayes.train(train, lambda_=1.0)

# Predict on the test feature vectors and store the predictions as floats.
y_pred_4 = model_4.predict(test.map(lambda point: point.features))
y_pred_4 = y_pred_4.map(float)

# Pair every prediction with its true label as (prediction, label).
result_4 = y_pred_4.zip(y_ture)
print(result_4.take(10))

[(0.0, 0.0), (1.0, 0.0), (1.0, 0.0), (0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (1.0, 0.0), (0.0, 0.0), (1.0, 0.0)]


In [14]:
# Print the evaluation metrics for the naive Bayes predictions.
eva(result_4)

【Area Under ROC】0.51
【Area Under PR】0.07
【Accuracy】0.55
