# 导包和环境配置

In [1]:
import findspark
findspark.init()
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import SQLContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier,NaiveBayes,DecisionTreeClassifier,GBTClassifier,OneVsRest,LogisticRegression,LogisticRegressionSummary,MultilayerPerceptronClassifier
from pyspark.sql.types import IntegerType
sc = SparkContext("local", "first")
logFile = "file:///F:/FBDP/实验/实验四/logfile1.txt"  
logData = sc.textFile(logFile).cache()
spark = SparkSession.builder.config("spark.driver.memory", "16g").getOrCreate()

# 读入数据并填补缺失值

In [2]:
df=spark.read.options(header='True') .csv("file:///F:/FBDP/实验/实验四/train_data.csv")
df = df.na.fill(-1)
df = df.na.fill('-1')
df=df.withColumnRenamed("is_default", "label")

# 把categorical变量转换为数值(略显粗暴)

In [3]:
#class
class_indexer = StringIndexer(inputCol='class', outputCol='class_num').fit(df)
df = class_indexer.transform(df)
class_onehoter = OneHotEncoder(inputCol='class_num', outputCol='class_vector')
df = class_onehoter.transform(df)
#sub_class
sub_class_indexer = StringIndexer(inputCol='sub_class', outputCol='sub_class_num').fit(df)
df = sub_class_indexer.transform(df)
sub_class_onehoter = OneHotEncoder(inputCol='sub_class_num', outputCol='sub_class_vector')
df = sub_class_onehoter.transform(df)
#work_type
work_type_indexer = StringIndexer(inputCol='work_type', outputCol='work_type_num').fit(df)
df = work_type_indexer.transform(df)
work_type_onehoter = OneHotEncoder(inputCol='work_type_num', outputCol='work_type_vector')
df = work_type_onehoter.transform(df)
#employer_type
employer_type_indexer = StringIndexer(inputCol='employer_type', outputCol='employer_type_num').fit(df)
df = employer_type_indexer.transform(df)
employer_type_onehoter = OneHotEncoder(inputCol='employer_type_num', outputCol='employer_type_vector')
df = employer_type_onehoter.transform(df)
#industry
industry_indexer = StringIndexer(inputCol='industry', outputCol='industry_num').fit(df)
df = industry_indexer.transform(df)
industry_onehoter = OneHotEncoder(inputCol='industry_num', outputCol='industry_vector')
df = industry_onehoter.transform(df)
#work_year
work_year_indexer = StringIndexer(inputCol='work_year', outputCol='work_year_num').fit(df)
df = work_year_indexer.transform(df)
work_year_onehoter = OneHotEncoder(inputCol='work_year_num', outputCol='work_year_vector')
df = work_year_onehoter.transform(df)
#df.show(3)
#issue_date
issue_date_indexer = StringIndexer(inputCol='issue_date', outputCol='issue_date_num').fit(df)
df = issue_date_indexer.transform(df)
issue_date_onehoter = OneHotEncoder(inputCol='issue_date_num', outputCol='issue_date_vector')
df = issue_date_onehoter.transform(df)
#df.show(3)
#earlies_credit_mon
earlies_credit_mon_indexer = StringIndexer(inputCol='earlies_credit_mon', outputCol='earlies_credit_mon_num').fit(df)
df = earlies_credit_mon_indexer.transform(df)
earlies_credit_mon_onehoter = OneHotEncoder(inputCol='earlies_credit_mon_num', outputCol='earlies_credit_mon_vector')
df = earlies_credit_mon_onehoter.transform(df)
df.show(3)

+-------+-------+----------+------------+--------+---------------+-----+---------+---------+-------------+------------------------------+---------+-----------+-----------------+-------------+--------+----------+----------+---+---------+------+---------------+--------------+-----------+------------+----------------+------------+-------------------+------------------------+----------+----------+-------------------+------------------+-----+-----------+---+---+----+----+---+---+-----+---------+-------------+-------------+----------------+-------------+----------------+-----------------+--------------------+------------+---------------+-------------+----------------+--------------+-----------------+----------------------+-------------------------+
|loan_id|user_id|total_loan|year_of_loan|interest|monthly_payment|class|sub_class|work_type|employer_type|                      industry|work_year|house_exist|house_loan_status|censor_status|marriage|offsprings|issue_date|use|post_code|region|debt

# 把string列转为int

In [4]:
tmpCols=['total_loan', 'year_of_loan', 'interest', 'monthly_payment', 'class_vector','sub_class_vector','work_type_vector','employer_type_vector','industry_vector','issue_date_vector','earlies_credit_mon_vector','house_exist','house_loan_status','censor_status','marriage','offsprings','use','post_code','region','debt_loan_ratio','del_in_18month','scoring_low','scoring_high','pub_dero_bankrup','early_return','early_return_amount','early_return_amount_3mon','recircle_b','recircle_u','initial_list_status','title','policy_code','f0','f1','f2','f3','f4','f5']
for i in tmpCols:
    if "vector" in i:
        print("")
    else:
        df = df.withColumn(i, df[i].cast('double'))
#df = df.withColumn("total_loan", df["total_loan"].cast(IntegerType()))
df = df.withColumn('label', df['label'].cast(IntegerType()))
df.show(3)








+-------+-------+----------+------------+--------+---------------+-----+---------+---------+-------------+------------------------------+---------+-----------+-----------------+-------------+--------+----------+----------+---+---------+------+---------------+--------------+-----------+------------+----------------+------------+-------------------+------------------------+----------+----------+-------------------+------------------+-----+-----------+----+----+----+----+----+----+-----+---------+-------------+-------------+----------------+-------------+----------------+-----------------+--------------------+------------+---------------+-------------+----------------+--------------+-----------------+----------------------+-------------------------+
|loan_id|user_id|total_loan|year_of_loan|interest|monthly_payment|class|sub_class|work_type|employer_type|                      industry|work_year|house_exist|house_loan_status|censor_status|marriage|offsprings|issue_date|use|post_code|

# 把输入特征合并到一列

In [5]:
data = df.drop('label')
feas = data.columns
df_assembler = VectorAssembler(inputCols=['total_loan', 'year_of_loan', 'interest', 'monthly_payment', 'class_vector','sub_class_vector','work_type_vector','work_year_vector','employer_type_vector','industry_vector','issue_date_vector','earlies_credit_mon_vector','house_exist','house_loan_status','censor_status','marriage','offsprings','use','post_code','region','debt_loan_ratio','del_in_18month','scoring_low','scoring_high','pub_dero_bankrup','early_return','early_return_amount','early_return_amount_3mon','recircle_b','recircle_u','initial_list_status','title','policy_code','f0','f1','f2','f3','f4','f5'],outputCol='features')
print(df_assembler)
data = df_assembler.transform(df)
data.show()

VectorAssembler_1b84fdf45a05
+-------+-------+----------+------------+--------+---------------+-----+---------+---------+--------------+------------------------------+---------+-----------+-----------------+-------------+--------+----------+----------+---+---------+------+---------------+--------------+-----------+------------+----------------+------------+-------------------+------------------------+----------+----------+-------------------+------------------+-------+-----------+----+----+----+----+----+----+-----+---------+-------------+-------------+----------------+-------------+----------------+-----------------+--------------------+------------+---------------+-------------+----------------+--------------+-----------------+----------------------+-------------------------+--------------------+
|loan_id|user_id|total_loan|year_of_loan|interest|monthly_payment|class|sub_class|work_type| employer_type|                      industry|work_year|house_exist|house_loan_status|censor_statu

# 划分数据集（8：2）

In [6]:
data_set = data.select(['features', 'label'])
train_df, test_df = data_set.randomSplit([0.8, 0.2])
#print(' train_df shape : (%d , %d)'%(train_df.count(), len(train_df.columns)))
#print(' test_df  shape: :(%d , %d)'%(test_df.count(), len(test_df.columns)))

# 训练RFC模型

In [8]:
train_df.show(5)
RFC_reg = RandomForestClassifier(maxDepth=2).fit(train_df)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    0|
+--------------------+-----+
only showing top 5 rows



# LinearSVC

In [9]:
train_df.show(5)
RFC_reg = LinearSVC().fit(train_df)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    0|
+--------------------+-----+
only showing top 5 rows



# GBTClassifier

In [10]:
train_df.show(5)
RFC_reg = GBTClassifier().fit(train_df)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    0|
+--------------------+-----+
only showing top 5 rows



# 决策树

In [26]:
train_df.show(5)
RFC_reg = DecisionTreeClassifier().fit(train_df)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    1|
+--------------------+-----+
only showing top 5 rows



# OneVsRest

In [7]:
train_df.show(5)
lr = LogisticRegression(regParam=0.01)
ovr = OneVsRest(classifier=lr)
RFC_reg = ovr.fit(train_df)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
+--------------------+-----+
only showing top 5 rows



# NaiveBayes

In [7]:
train_df.show(5)
RFC_reg = NaiveBayes().fit(train_df)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    1|
|(926,[0,1,2,3,4,1...|    0|
|(926,[0,1,2,3,4,1...|    0|
+--------------------+-----+
only showing top 5 rows



Py4JJavaError: An error occurred while calling o596.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Naive Bayes requires nonnegative feature values but found (926,[0,1,2,3,4,11,44,48,59,64,102,449,899,900,901,902,903,905,906,907,908,909,910,911,915,916,919,920,921,922,923,924],[8500.0,3.0,12.49,284.32,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,3.0,567.0,9.0,7.06,1.0,675.0,679.0,1.0,2029.0,17.0,1.0,7.0,-1.0,25.0,17.0,13.0]).
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.classification.NaiveBayes$.requireNonnegativeValues(NaiveBayes.scala:235)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$4.apply(NaiveBayes.scala:144)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$4.apply(NaiveBayes.scala:144)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$7.apply(NaiveBayes.scala:168)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$7.apply(NaiveBayes.scala:166)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1.apply(NaiveBayes.scala:176)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1.apply(NaiveBayes.scala:129)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:185)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
	at org.apache.spark.ml.classification.NaiveBayes.trainWithLabelCheck(NaiveBayes.scala:129)
	at org.apache.spark.ml.classification.NaiveBayes.train(NaiveBayes.scala:118)
	at org.apache.spark.ml.classification.NaiveBayes.train(NaiveBayes.scala:78)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: Naive Bayes requires nonnegative feature values but found (926,[0,1,2,3,4,11,44,48,59,64,102,449,899,900,901,902,903,905,906,907,908,909,910,911,915,916,919,920,921,922,923,924],[8500.0,3.0,12.49,284.32,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,3.0,567.0,9.0,7.06,1.0,675.0,679.0,1.0,2029.0,17.0,1.0,7.0,-1.0,25.0,17.0,13.0]).
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.classification.NaiveBayes$.requireNonnegativeValues(NaiveBayes.scala:235)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$4.apply(NaiveBayes.scala:144)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$4.apply(NaiveBayes.scala:144)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$7.apply(NaiveBayes.scala:168)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$trainWithLabelCheck$1$$anonfun$7.apply(NaiveBayes.scala:166)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:189)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:188)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


# 在测试集上做预测

In [8]:
test0=test_df.drop('label')
RFC_reg.transform(test0).head().prediction

0.0

In [9]:
predictions = RFC_reg.transform(test_df)
predictions.show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    1|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    1|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    1|       1.0|
|(926,[0,1,2,3,4,1...|    1|       1.0|
|(926,[0,1,2,3,4,1...|    1|       1.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    1|       0.0|
|(926,[0,1,2,3,4,1...|    1|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
|(926,[0,1,2,3,4,1...|    0|       0.0|
+--------------------+-----+----------+
only showing top 20 rows



# 模型评估

In [10]:
tp=predictions.filter("label==1 and prediction==1").count()
tn=predictions.filter("label==0 and prediction==0").count()
fp=predictions.filter("label==0 and prediction==1").count()
fn=predictions.filter("label==1 and prediction==0").count()
print(tp)

4694


In [11]:
print(tn)
print(fp)
print(fn)

45706
2309
7494


In [12]:
print('test accuracy is : %f'%((tp+tn)/(tp+tn+fp+fn)))
recal=tp/(tp+fn)
prec=tp/(tp+fp)
print('test recall is : %f'%(recal))
print('test precision is : %f'%(prec))
print('test f1-score is : %f'%(2*recal*prec/(prec+recal)))

test accuracy is : 0.837168
test recall is : 0.385133
test precision is : 0.670284
test f1-score is : 0.489188


In [24]:
sc.stop()