In [2]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
import pandas as pd
from pyspark.sql import *
spark = SparkSession.builder.appName('Data_model').getOrCreate()
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import StructType, StructField, IntegerType

引入数据集

In [3]:
normalizer_df = spark.read.csv('/home/ubuntu/BDAS_yang124/Datasets/normalizer_df_pandas.csv',inferSchema=True,header=True)
normalizer_df.show(5)
normalizer_df.printSchema()

+---+-------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|_c0|Private|             Apps|           Accept|           Enroll|       F_Undergrad|       P_Undergrad|         Outstate|       Room_Board|         S_F_Ratio|           Expend|        Grad_Rate|
+---+-------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+
|  0|    Yes|7.415175109613295|7.117205503164344|6.582025138892826|7.9676267393338165|6.2878585601617845|8.914760527397261|8.101980731853192|2.9496883350525844|8.859647499714997|4.110873864173311|
|  1|    Yes|7.690286020676768|7.562681246721884|6.240275845170769| 7.895063498091573| 7.113142108707088|9.415808631610384|8.771990436532242| 2.580216829592325|9.261793653565098| 4.04305126783455|
|  2|    Yes|7.

In [4]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
inputCols=['Apps',
 'Accept',
 'Enroll',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'S_F_Ratio',
 'Expend',
 'Grad_Rate']
assembler = VectorAssembler(
  inputCols=inputCols,
outputCol="features")
output = assembler.transform(normalizer_df)
indexer = StringIndexer(inputCol="Private", outputCol="PrivateIndex")
final_data_all = indexer.fit(output).transform(output)
final_data_all.show(5)


final_data = final_data_all.selectExpr("features","`PrivateIndex` as label")

final_data.show(5)


+---+-------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+--------------------+------------+
|_c0|Private|             Apps|           Accept|           Enroll|       F_Undergrad|       P_Undergrad|         Outstate|       Room_Board|         S_F_Ratio|           Expend|        Grad_Rate|            features|PrivateIndex|
+---+-------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+--------------------+------------+
|  0|    Yes|7.415175109613295|7.117205503164344|6.582025138892826|7.9676267393338165|6.2878585601617845|8.914760527397261|8.101980731853192|2.9496883350525844|8.859647499714997|4.110873864173311|[7.41517510961329...|         0.0|
|  1|    Yes|7.690286020676768|7.562681246721884|6.240275845170769| 7.895063

In [5]:
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier,LogisticRegression,NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

分割数据用于选择最佳模型，获取最佳参数

In [6]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

需要使用测模型构建，并且不适用参数并获取其评价分数

In [7]:
lr = LogisticRegression(maxIter=10)
rfc = RandomForestClassifier(numTrees=10)
dtc = DecisionTreeClassifier()
nb = NaiveBayes()
lr_original_model=lr.fit(train_data)
rfc_original_model=rfc.fit(train_data)
dtc_original_model=dtc.fit(train_data)
nb_original_model=nb.fit(train_data)
lr_original_predictions=lr_original_model.transform(test_data)
rfc_original_predictions=rfc_original_model.transform(test_data)
dtc_original_predictions=dtc_original_model.transform(test_data)
nb_original_predictions=nb_original_model.transform(test_data)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_binary_eval = BinaryClassificationEvaluator()

print("DTC")
print(my_binary_eval.evaluate(dtc_original_predictions))
print("RFC")
print(my_binary_eval.evaluate(rfc_original_predictions))
print("LR")
print(my_binary_eval.evaluate(lr_original_predictions))
print("NB")
print(my_binary_eval.evaluate(nb_original_predictions))

DTC
0.8992599084623625
RFC
0.9784789171292236
LR
0.9738046547862493
NB
0.13253481351640864


In [8]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator( predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_original_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_original_predictions)
lr_acc = acc_evaluator.evaluate(lr_original_predictions)
nb_acc = acc_evaluator.evaluate(nb_original_predictions)

print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
print('An ensemble using LR has an accuracy of: {0:2.2f}%'.format(lr_acc*100))
print('-'*40)
print('An ensemble using NB has an accuracy of: {0:2.2f}%'.format(nb_acc*100))

----------------------------------------
A single decision tree has an accuracy of: 91.15%
----------------------------------------
A random forest ensemble has an accuracy of: 94.25%
----------------------------------------
An ensemble using LR has an accuracy of: 94.69%
----------------------------------------
An ensemble using NB has an accuracy of: 76.11%


参数选择，模型构建

In [9]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[lr])

paramGrid = ParamGridBuilder().addGrid(lr.regParam,[0.0001, 0.001, 0.01, 0.1, 0.25, 0.5, 0.75, 1]).build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)  

lr_best_model = crossval.fit(train_data)
params = lr_best_model.getEstimatorParamMaps()
avgMetrics = lr_best_model.avgMetrics
all_params = list(zip(params, avgMetrics))
best_param = sorted(all_params, key=lambda x: x[1], reverse=True)[0]

print(best_param)


({Param(parent='LogisticRegression_48f2a6224606139a780e', name='regParam', doc='regularization parameter (>= 0).'): 0.0001}, 0.9594424043096944)


In [10]:
dtc = DecisionTreeClassifier()
pipeline = Pipeline(stages=[dtc])
paramGrid = ParamGridBuilder().addGrid(dtc.maxDepth,[5, 10]).addGrid(dtc.minInfoGain,[0,0.1,0.5,0.8, 1]).build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)  

dtc_best_Model = crossval.fit(train_data)


params = dtc_best_Model.getEstimatorParamMaps()
avgMetrics = dtc_best_Model.avgMetrics
all_params = list(zip(params, avgMetrics))
best_param = sorted(all_params, key=lambda x: x[1], reverse=True)[0]

print(best_param)



({Param(parent='DecisionTreeClassifier_4f60a93d4b2a50f1a1e8', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='DecisionTreeClassifier_4f60a93d4b2a50f1a1e8', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0}, 0.9053637460376946)


In [11]:
rfc = RandomForestClassifier(numTrees=10)
pipeline = Pipeline(stages=[rfc])
paramGrid = ParamGridBuilder().addGrid(dtc.maxDepth,[5, 10]).addGrid(dtc.minInfoGain,[0,0.1,0.5,0.8, 1]).build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)  

rfc_best_Model = crossval.fit(train_data)


params = rfc_best_Model.getEstimatorParamMaps()
avgMetrics = rfc_best_Model.avgMetrics
all_params = list(zip(params, avgMetrics))
best_param = sorted(all_params, key=lambda x: x[1], reverse=True)[0]

print(best_param)



({Param(parent='DecisionTreeClassifier_4f60a93d4b2a50f1a1e8', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='DecisionTreeClassifier_4f60a93d4b2a50f1a1e8', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0}, 0.9590244172833468)


In [12]:
nb = NaiveBayes()
pipeline = Pipeline(stages=[nb])
paramGrid = ParamGridBuilder().addGrid(nb.smoothing,[1,3,5,8, 10,15]).build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)  

nb_best_Model = crossval.fit(train_data)


params = nb_best_Model.getEstimatorParamMaps()
avgMetrics = nb_best_Model.avgMetrics
all_params = list(zip(params, avgMetrics))
best_param = sorted(all_params, key=lambda x: x[1], reverse=True)[0]

print(best_param)


({Param(parent='NaiveBayes_4183ade2e25b44e97f3a', name='smoothing', doc='The smoothing parameter, should be >= 0, default is 1.0'): 5}, 0.17847727785936476)


参数选择后的模型进行评分

In [13]:
lr_best_predictions=lr_best_model.transform(test_data)
rfc_best_predictions=rfc_best_Model.transform(test_data)
dtc_best_predictions=dtc_best_Model.transform(test_data)
nb_best_predictions=nb_best_Model.transform(test_data)

my_binary_eval = BinaryClassificationEvaluator()

print("DTC_best")
print(my_binary_eval.evaluate(dtc_best_predictions))


print("RFC_best")
print(my_binary_eval.evaluate(rfc_best_predictions))

print("LR_best")
print(my_binary_eval.evaluate(lr_best_predictions))

print("NB_best")
print(my_binary_eval.evaluate(nb_best_predictions))

dtc_acc = acc_evaluator.evaluate(dtc_best_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_best_predictions)
lr_acc = acc_evaluator.evaluate(lr_best_predictions)
nb_acc = acc_evaluator.evaluate(nb_best_predictions)

print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
print('An ensemble using LR has an accuracy of: {0:2.2f}%'.format(lr_acc*100))
print('-'*40)
print('An ensemble using NB has an accuracy of: {0:2.2f}%'.format(nb_acc*100))

DTC_best
0.8992599084623625
RFC_best
0.9784789171292236
LR_best
0.9738046547862493
NB_best
0.13263219398188725
----------------------------------------
A single decision tree has an accuracy of: 91.15%
----------------------------------------
A random forest ensemble has an accuracy of: 94.25%
----------------------------------------
An ensemble using LR has an accuracy of: 94.69%
----------------------------------------
An ensemble using NB has an accuracy of: 76.11%


In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

def evaluate(predictionAndLabels):
    log = {}

    # Show Validation Score (AUROC)
    evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
    log['AUROC'] = "%f" % evaluator.evaluate(predictionAndLabels)    
    print("Area under ROC = {}".format(log['AUROC']))

    # Show Validation Score (AUPR)
    evaluator = BinaryClassificationEvaluator(metricName='areaUnderPR')
    log['AUPR'] = "%f" % evaluator.evaluate(predictionAndLabels)
    print("Area under PR = {}".format(log['AUPR']))

    # Metrics
    predictionRDD = predictionAndLabels.select(['label', 'prediction']) \
                            .rdd.map(lambda line: (line[1], line[0]))
    metrics = MulticlassMetrics(predictionRDD)

    # Confusion Matrix
    print(metrics.confusionMatrix().toArray())

    # Overall statistics
    log['precision'] = "%s" % metrics.precision()
    log['recall'] = "%s" % metrics.recall()
    log['F1 Measure'] = "%s" % metrics.fMeasure()
    print("[Overall]\tprecision = %s | recall = %s | F1 Measure = %s" % \
            (log['precision'], log['recall'], log['F1 Measure']))

    # Statistics by class
    labels = [0.0, 1.0]
    for label in sorted(labels):
        log[label] = {}
        log[label]['precision'] = "%s" % metrics.precision(label)
        log[label]['recall'] = "%s" % metrics.recall(label)
        log[label]['F1 Measure'] = "%s" % metrics.fMeasure(label, 
                                                           beta=1.0)
        print("[Class %s]\tprecision = %s | recall = %s | F1 Measure = %s" \
                  % (label, log[label]['precision'], 
                    log[label]['recall'], log[label]['F1 Measure']))

    return log

print("LR 模型")
lr_log = evaluate(lr_best_predictions)

print("DTC 模型")
evaluate(dtc_best_predictions)

print("RF 模型")
evaluate(rfc_best_predictions)

print("NB 模型")
evaluate(nb_best_predictions)




LR 模型
Area under ROC = 0.973805
Area under PR = 0.901227
[[158.   5.]
 [  7.  56.]]
[Overall]	precision = 0.9469026548672567 | recall = 0.9469026548672567 | F1 Measure = 0.9469026548672567
[Class 0.0]	precision = 0.9575757575757575 | recall = 0.9693251533742331 | F1 Measure = 0.9634146341463414
[Class 1.0]	precision = 0.9180327868852459 | recall = 0.8888888888888888 | F1 Measure = 0.9032258064516128




NameError: name 'opt' is not defined