In [1]:
import pyspark
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark import SparkContext
# Reuse the already-running SparkContext when there is one, so re-executing
# this cell in a live kernel does not fail with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
# Load the training CSV, treating the first row as a header and letting Spark
# infer the column types.
# NOTE(review): the file name contains a space before ".csv" -- confirm it
# matches the file on disk.
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("train_data .csv")



In [2]:
# Feature selection: keep the candidate predictor columns plus the
# `is_default` target column.
selected_columns = [
    'total_loan', 'year_of_loan', 'interest', 'monthly_payment',
    'class', 'sub_class', 'work_type', 'employer_type',
    'industry', 'work_year', 'house_exist', 'house_loan_status',
    'censor_status', 'marriage', 'offsprings', 'use', 'region',
    'debt_loan_ratio', 'del_in_18month', 'scoring_low', 'scoring_high',
    'pub_dero_bankrup', 'early_return', 'early_return_amount',
    'early_return_amount_3mon', 'recircle_b', 'recircle_u',
    'initial_list_status', 'policy_code',
    'f0', 'f1', 'f2', 'f3', 'f4', 'f5',
    'is_default',
]
df = df.select(*selected_columns)

In [4]:
from pyspark.ml.feature import RFormula
# Describe the model with an R-style formula: `is_default` is the response and
# the listed columns are the predictors. Transforming with the fitted formula
# appends the `features` and `label` columns used by the classifiers below.
supervised = RFormula(formula="is_default ~ total_loan + year_of_loan + interest + monthly_payment + sub_class + work_type + employer_type + industry+house_exist+house_loan_status+censor_status+marriage+offsprings+del_in_18month+scoring_low+scoring_high+early_return+use+region")
fittedRF = supervised.fit(df)
preparedDF = fittedRF.transform(df)
# 80/20 train/test split. The seed is fixed so that the split -- and every
# metric computed from it -- is the same after a kernel restart and full re-run.
train, test = preparedDF.randomSplit([0.8, 0.2], seed=42)
train.show(5)

+----------+------------+--------+---------------+-----+---------+---------+-------------+------------------+---------+-----------+-----------------+-------------+--------+----------+---+------+---------------+--------------+-----------+------------+----------------+------------+-------------------+------------------------+----------+----------+-------------------+-----------+----+----+----+----+----+----+----------+--------------------+-----+
|total_loan|year_of_loan|interest|monthly_payment|class|sub_class|work_type|employer_type|          industry|work_year|house_exist|house_loan_status|censor_status|marriage|offsprings|use|region|debt_loan_ratio|del_in_18month|scoring_low|scoring_high|pub_dero_bankrup|early_return|early_return_amount|early_return_amount_3mon|recircle_b|recircle_u|initial_list_status|policy_code|  f0|  f1|  f2|  f3|  f4|  f5|is_default|            features|label|
+----------+------------+--------+---------------+-----+---------+---------+-------------+--------------

In [5]:
# Fit a logistic regression model on the training split and preview its
# predictions on the test split.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")
fittedLR = lr.fit(train)
lr_preview = fittedLR.transform(test).select("label", "prediction")
lr_preview.show(30)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
+-----+----------+
only showing top 30 rows



In [10]:
# Keep the predictions and count each confusion-matrix outcome.
result = fittedLR.transform(test).select("label", "prediction")
# A single grouped aggregation replaces four separate filtered counts, each of
# which would re-run the model transform over the (uncached) test split.
outcome_counts = {
    (row["label"], row["prediction"]): row["count"]
    for row in result.groupBy("label", "prediction").count().collect()
}
# Keys are (label, prediction); a combination that never occurs counts as 0.
TP = outcome_counts.get((1, 1), 0)
TN = outcome_counts.get((0, 0), 0)
FP = outcome_counts.get((0, 1), 0)
FN = outcome_counts.get((1, 0), 0)
print(TP, TN, FP, FN)

4775 45168 2616 7195


In [12]:
# Compute accuracy, precision, recall and F1 score from the confusion counts.
# Each ratio falls back to 0.0 when its denominator is zero (for example when
# the model predicts no positives at all) instead of raising ZeroDivisionError.
n_scored = TP + TN + FP + FN
accuracy = (TP + TN) / n_scored if n_scored else 0.0
precision = TP / (TP + FP) if (TP + FP) else 0.0
recall = TP / (TP + FN) if (TP + FN) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
print('准确率为：', accuracy)
print('精准度为：', precision)
print('召回率为：', recall)
print('f1 score = ', f1)

准确率为： 0.835810154968705
精准度为： 0.6460560140711676
召回率为： 0.3989139515455305
f1 score =  0.4932596456794586


In [18]:
from pyspark.ml.classification import GBTClassifier
# Fit a gradient-boosted tree classifier (maxIter=10) on the training split and
# score it on the test split.
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
gbtModel = gbt.fit(train)
result = gbtModel.transform(test).select("label", "prediction")
# A single grouped aggregation replaces four separate filtered counts, each of
# which would re-run the model transform over the (uncached) test split.
outcome_counts = {
    (row["label"], row["prediction"]): row["count"]
    for row in result.groupBy("label", "prediction").count().collect()
}
# Keys are (label, prediction); a combination that never occurs counts as 0.
TP = outcome_counts.get((1, 1), 0)
TN = outcome_counts.get((0, 0), 0)
FP = outcome_counts.get((0, 1), 0)
FN = outcome_counts.get((1, 0), 0)
# Each ratio falls back to 0.0 when its denominator is zero instead of raising
# ZeroDivisionError (e.g. a model that predicts no positives).
n_scored = TP + TN + FP + FN
accuracy = (TP + TN) / n_scored if n_scored else 0.0
precision = TP / (TP + FP) if (TP + FP) else 0.0
recall = TP / (TP + FN) if (TP + FN) else 0.0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
print('TP：', TP, '  TN：', TN, '  FP：', FP, '  FN：', FN)
print('准确率为：', accuracy)
print('精准度为：', precision)
print('召回率为：', recall)
print('f1 score = ', f1)

TP： 4429   TN： 45376   FP： 2408   FN： 7541
准确率为： 0.8335006861465342
精准度为： 0.6477987421383647
召回率为： 0.3700083542188805
f1 score =  0.470994842345935


In [22]:
def _evaluate_predictions(model, test):
    """Score a fitted binary classifier on ``test`` and return ``[accuracy, f1]``.

    ``model.transform(test)`` must yield "label" and "prediction" columns in
    which 1 marks the positive class and 0 the negative class.

    The confusion-matrix counts come from one grouped aggregation rather than
    four separate filtered counts. Any metric whose denominator is zero (for
    example precision when nothing is predicted positive) is reported as 0.0
    instead of raising ZeroDivisionError.
    """
    predictions = model.transform(test).select("label", "prediction")
    # Map (label, prediction) -> number of rows with that combination.
    counts = {
        (row["label"], row["prediction"]): row["count"]
        for row in predictions.groupBy("label", "prediction").count().collect()
    }
    TP = counts.get((1, 1), 0)
    TN = counts.get((0, 0), 0)
    FP = counts.get((0, 1), 0)
    FN = counts.get((1, 0), 0)

    n_scored = TP + TN + FP + FN
    accuracy = (TP + TN) / n_scored if n_scored else 0.0
    precision = TP / (TP + FP) if (TP + FP) else 0.0
    recall = TP / (TP + FN) if (TP + FN) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return [accuracy, f1]


def Logistic(train, test):
    """Fit logistic regression on ``train`` and return ``[accuracy, f1]`` on ``test``.

    Both inputs must carry the "features" and "label" columns.
    """
    lr = LogisticRegression(labelCol="label", featuresCol="features")
    return _evaluate_predictions(lr.fit(train), test)


def GBT(train, test):
    """Fit a GBT classifier (maxIter=10) on ``train`` and return ``[accuracy, f1]`` on ``test``.

    Both inputs must carry the "features" and "label" columns.
    """
    gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
    return _evaluate_predictions(gbt.fit(train), test)

In [25]:
# Repeat the evaluation over several random train/test splits and report the
# mean accuracy and F1 score of each model.
N_RUNS = 10  # number of splits; also the divisor for the averages below
LOG_acc = 0
LOG_f1 = 0
GBT_acc = 0
GBT_f1 = 0
for i in range(N_RUNS):
    # A distinct but fixed seed per run: the splits differ from run to run, yet
    # the whole experiment gives the same averages after a restart and re-run.
    train, test = preparedDF.randomSplit([0.8, 0.2], seed=i)
    LOG_lis = Logistic(train, test)
    GBT_lis = GBT(train, test)
    LOG_acc += LOG_lis[0]
    LOG_f1 += LOG_lis[1]
    GBT_acc += GBT_lis[0]
    GBT_f1 += GBT_lis[1]
# Turn the running sums into means.
LOG_acc /= N_RUNS
LOG_f1 /= N_RUNS
GBT_acc /= N_RUNS
GBT_f1 /= N_RUNS
print('逻辑回归的平均准确率为：', LOG_acc, '平均f1 score为：', LOG_f1)
print('GBT的平均准确率为：', GBT_acc, '平均f1 score为：', GBT_f1)

逻辑回归的平均准确率为： 0.836561214701522 平均f1 score为： 0.49984261019117165
GBT的平均准确率为： 0.833929930088191 平均f1 score为： 0.48826365228720814
