## 机器学习：将数据导入并分成测试集与训练集

In [3]:
from pyspark.ml.classification import GBTClassificationModel
df=spark.sql("select * from user_erin.creditcard")
from pyspark.ml.feature import VectorAssembler
df_assembler = VectorAssembler(inputCols=['time','V1','V2','V3','V4','V5','V6','V7','V8','V9'
                                          ,'V10','V11','V12','V13','V14'
                                         ,'V15','V16','V17','V18','V19','V20'
                                         ,'V21','V22','V23','V24','V25','V26','V27','V28','amount'],outputCol='features')
df= df_assembler.transform(df)
df=df.withColumnRenamed('class', 'label')
(train, test) = df.randomSplit([0.8, 0.2],seed = 11)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 机器学习：GBTC模型的调参与预测
- 用于分类的梯度提升决策树模型。该模型属于集成模型（Ensemble methods）家族。集成模型结合多个弱预测模型而形成一个强健的模型。
- 运用TrainValidationSplit（TVS）来进行参数调优。TVS对于每个参数组合只进行一次评估，当训练数据集足够大时，开销较小，并且产生的结果可靠。TVS会创建单个 (training, test) pair。它使用trainRatio参数将数据集split成两部分。例如： trainRatio=0.8，TVS会生成一个训练集（80%）和一个测试集（20%）。TVS最后会使用Estimator、以及最好的ParamMap，对整个数据集进行拟合。
- 运用param_grid方程来定义TVS检测的参数，最后一共12种参数组合模型被测试。
- maxIter 设置的越大结果越精确，但是设置的越大也就意味着越耗时；一般建议10-20。
- maxDepth - 树的最大深度（例如深度0表示1个叶节点，深度1表示1个内部节点+ 2个叶节点）。 （默认值：3）
- maxBins - 用于分割特征的最大bin数量。 DecisionTree需要maxBins> = max类别。 （默认值：32）

In [4]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
import pyspark.sql.functions as f
gbt = GBTClassifier()
gbt_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName='areaUnderROC')
param_grid = ParamGridBuilder() \
            .addGrid(gbt.maxDepth, [ 2, 4, 6]) \
            .addGrid(gbt.maxIter,[10,20])\
            .addGrid(gbt.maxBins,[20,60])\
            .build()
tvs = TrainValidationSplit(estimator=gbt,
                           estimatorParamMaps=param_grid,
                           evaluator=gbt_evaluator,
                           trainRatio = 0.8)
model=tvs.fit(train)
model=model.bestModel
prediction=model.transform(test)
tp = prediction.filter((f.col('label')  == 1) & (f.col('prediction')  == 1)).count()
tn = prediction.filter((f.col('label')  == 0) & (f.col('prediction')  == 0)).count()
fp = prediction.filter((f.col('label')  == 0) & (f.col('prediction')  == 1)).count()
fn = prediction.filter((f.col('label')  == 1) & (f.col('prediction')  == 0)).count()
acc=float(tp+tn)/(tp+tn+fp+fn)
print( "Accuracy:" + str(acc))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy:0.9994012081506137

将训练完成的表现最佳模型存入hdfs以便于后续的使用。
将最佳模型的预测存入hive表中。

In [6]:
model.write().overwrite().save("hdfs:///user/erin/data/mllib/gbtcmodel") 
prediction.write.saveAsTable("user_erin.creditcard_prediction", format="orc", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…