In [1]:
import pyspark
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import PCA
sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("train_data .csv")
df = df.select('total_loan','year_of_loan','interest','monthly_payment','is_default','sub_class','work_type','employer_type','industry','work_year','house_exist','house_loan_status','censor_status','marriage','offsprings','use','region','debt_loan_ratio','del_in_18month','scoring_low','scoring_high','early_return','policy_code','f0','f1','f2','f3','f4','f5')

## 缺失值处理

In [2]:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['total_loan','year_of_loan','interest','monthly_payment','house_exist','house_loan_status','censor_status','marriage','offsprings','use','region','debt_loan_ratio','del_in_18month','scoring_low','scoring_high','early_return','f0','f1','f2','f3','f4','f5'], outputCols=['total_loan','year_of_loan','interest','monthly_payment','house_exist','house_loan_status','censor_status','marriage','offsprings','use','region','debt_loan_ratio','del_in_18month','scoring_low','scoring_high','early_return','f0','f1','f2','f3','f4','f5'])
model = imputer.fit(df)
model.transform(df).show()
df = df.na.fill('0')

+----------+------------+--------+---------------+----------+---------+---------+--------------+------------------------------+---------+-----------+-----------------+-------------+--------+----------+---+------+---------------+--------------+-----------+------------+------------+-----------+-----------------+--------------------+---------------+------------------+-----------------+------------------+
|total_loan|year_of_loan|interest|monthly_payment|is_default|sub_class|work_type| employer_type|                      industry|work_year|house_exist|house_loan_status|censor_status|marriage|offsprings|use|region|debt_loan_ratio|del_in_18month|scoring_low|scoring_high|early_return|policy_code|               f0|                  f1|             f2|                f3|               f4|                f5|
+----------+------------+--------+---------------+----------+---------+---------+--------------+------------------------------+---------+-----------+-----------------+-------------+--------+

In [3]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="work_year", outputCol="work_year_t")
df = indexer.fit(df).transform(df)



In [4]:
supervised = RFormula(formula="is_default ~ total_loan+year_of_loan+interest+monthly_payment+sub_class+work_type+employer_type+industry+house_exist+house_loan_status+censor_status+marriage+offsprings+del_in_18month+scoring_low+scoring_high+early_return+use+region")
fittedRF = supervised.fit(df)
preparedDF = fittedRF.transform(df)

pca = PCA(k=62, inputCol="features", outputCol="newfeatures")
model = pca.fit(preparedDF)
result = model.transform(preparedDF)
train, test = result.randomSplit([0.8, 0.2])
train.show(5)

+----------+------------+--------+---------------+----------+---------+---------+--------------+--------------------+---------+-----------+-----------------+-------------+--------+----------+---+------+---------------+--------------+-----------+------------+------------+-----------+----+----+----+----+----+----+-----------+--------------------+-----+--------------------+
|total_loan|year_of_loan|interest|monthly_payment|is_default|sub_class|work_type| employer_type|            industry|work_year|house_exist|house_loan_status|censor_status|marriage|offsprings|use|region|debt_loan_ratio|del_in_18month|scoring_low|scoring_high|early_return|policy_code|  f0|  f1|  f2|  f3|  f4|  f5|work_year_t|            features|label|         newfeatures|
+----------+------------+--------+---------------+----------+---------+---------+--------------+--------------------+---------+-----------+-----------------+-------------+--------+----------+---+------+---------------+--------------+-----------+-------

In [5]:
lr = LogisticRegression(labelCol="label",featuresCol="newfeatures")
fittedLR = lr.fit(train)
fittedLR.transform(test).select("label", "prediction").show(30)

trainingSummary = fittedLR.summary
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

test_result = fittedLR.transform(test).select("label", "prediction")
TT = test_result.where("label=1 and prediction=1").count()
TF = test_result.where("label=0 and prediction=0").count()
total = test_result.count()
accuracy = (TT+TF)/total
print(accuracy)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 30 rows

+--------------------+-------------------+
|                 FPR|                TPR|
+--------------------+-------------------+
|                 0.0|                0.0|
|0.002422792970780909|0.04047439759036144|
|0.005370697722782572|0.07883617804551539|
|0.008724134345429967|0.11556643239625167|
|0.012389518560881772|0.151

In [6]:
from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(featuresCol="features",numTrees=200, labelCol="label", seed=7)
rfModel = rf.fit(train)

test_result = rfModel.transform(test).select("label", "prediction")
TT = test_result.where("label=1 and prediction=1").count()
TF = test_result.where("label=0 and prediction=0").count()
total = test_result.count()
accuracy = (TT+TF)/total
print(accuracy)
print("模型特征重要性:{}".format(rfModel.featureImportances))
print("模型特征数:{}".format(rfModel.numFeatures))

0.7979850297400254
模型特征重要性:(71,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70],[0.006181310609526548,0.0675159704512101,0.09887863330243986,0.0030000767835973025,0.00021165850995721818,5.106080585409422e-05,9.60837183752774e-06,0.0007030622953706993,0.00032577962148022454,6.097801043259874e-05,7.167195102917475e-05,3.7405782628854225e-05,0.00162512452479944,6.506779783142316e-06,0.009208998080697918,0.009104772152015737,3.884036932070571e-05,8.885916219164366e-05,0.019962863112429452,6.97183452979049e-05,0.009639269329733705,0.014209030972299039,0.00021928895780609845,3.709813263677134e-05,0.002830028024613019,0.0013152511183747242,0.005271059913405066,0.00023050863568421516,0.0007119109786264047,0.00403589248836222,0.0022426496097276974,0.00016935290734600028,0.00011776645988826489,1.9770916883436202e-05,1.924532595807776e-05,4.03777

In [7]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
rfModel = rf.fit(train)

test_result = rfModel.transform(test).select("label", "prediction")
TT = test_result.where("label=1 and prediction=1").count()
TF = test_result.where("label=0 and prediction=0").count()
total = test_result.count()
accuracy = (TT+TF)/total
print(accuracy)

0.7979850297400254
