In [1]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [2]:
spark = SparkSession.builder \
    .enableHiveSupport().getOrCreate()
df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("final_data.csv")
df_assembler = VectorAssembler(inputCols=[ 'total_loan','year_of_loan','interest','monthly_payment','class', 'sub_class', 'work_type', 'employer_type', 'industry',
 'work_year', 'house_exist','house_loan_status', 'censor_status','marriage', 'offsprings', 'use', 'post_code',
 'region', 'debt_loan_ratio', 'del_in_18month', 'scoring_low', 'scoring_high', 'pub_dero_bankrup', 'early_return',
 'early_return_amount', 'early_return_amount_3mon', 'recircle_b', 'recircle_u', 'initial_list_status', 'title',
 'policy_code', 'f0', 'f1', 'f2', 'f3', 'f4', 'f5', 'total_money'],outputCol='features')
labeled_df = df_assembler.transform(df)

                                                                                

In [3]:
labeled_df = labeled_df["is_default","features"]
data_set = labeled_df.select(['features', 'is_default'])
train_df, test_df = data_set.randomSplit([0.8, 0.2])

In [4]:
numTrees = 20
maxDepth = 15
impurity = 'entropy'
maxBins = 32
rf_classifier=RandomForestClassifier(labelCol='is_default', featuresCol="features",numTrees=numTrees,maxDepth=maxDepth,impurity=impurity,maxBins=maxBins).fit(train_df)  
rf_predictions=rf_classifier.transform(test_df)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='is_default')
print('AUC：'+str(numTrees)+str(maxDepth)+impurity+str(maxBins), evaluator.evaluate(rf_predictions))

21/12/19 14:31:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/12/19 14:32:20 WARN DAGScheduler: Broadcasting large task binary with size 1466.9 KiB
21/12/19 14:32:22 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
21/12/19 14:32:25 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
21/12/19 14:32:30 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
21/12/19 14:32:33 WARN DAGScheduler: Broadcasting large task binary with size 1049.4 KiB
21/12/19 14:32:35 WARN DAGScheduler: Broadcasting large task binary with size 8.6 MiB
21/12/19 14:32:39 WARN DAGScheduler: Broadcasting large task binary with size 1432.6 KiB
21/12/19 14:32:41 WARN DAGScheduler: Broadcasting large task binary with size 12.5 MiB
21/12/19 14:32:46 WARN DAGScheduler: Broadcasting large task binary with size 1876.2 KiB
21/12/19 14:32:50 WARN DAGSchedul

AUC：2015entropy32 0.8623369861102074


In [4]:
numTrees = 20
maxDepth = 18
impurity = 'entropy'
maxBins = 32
rf_classifier=RandomForestClassifier(labelCol='is_default', featuresCol="features",numTrees=numTrees,maxDepth=maxDepth,impurity=impurity,maxBins=maxBins).fit(train_df)  
rf_predictions=rf_classifier.transform(test_df)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='is_default')
print('AUC：'+str(numTrees)+str(maxDepth)+impurity+str(maxBins), evaluator.evaluate(rf_predictions))

21/12/18 22:06:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/12/18 22:06:49 WARN DAGScheduler: Broadcasting large task binary with size 1459.9 KiB
21/12/18 22:06:51 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
21/12/18 22:06:54 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
21/12/18 22:06:58 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
21/12/18 22:07:01 WARN DAGScheduler: Broadcasting large task binary with size 1056.9 KiB
21/12/18 22:07:03 WARN DAGScheduler: Broadcasting large task binary with size 8.7 MiB
21/12/18 22:07:06 WARN DAGScheduler: Broadcasting large task binary with size 1445.2 KiB
21/12/18 22:07:09 WARN DAGScheduler: Broadcasting large task binary with size 12.6 MiB
21/12/18 22:07:14 WARN DAGScheduler: Broadcasting large task binary with size 1892.4 KiB
21/12/18 22:07:17 WARN DAGSchedul

AUC：2018entropy32 0.8560613603199781


In [10]:
numTrees = 40
maxDepth = 15
impurity = 'entropy'
maxBins = 32
rf_classifier=RandomForestClassifier(labelCol='is_default', featuresCol="features",numTrees=numTrees,maxDepth=maxDepth,impurity=impurity,maxBins=maxBins).fit(train_df)  
rf_predictions=rf_classifier.transform(test_df)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='is_default')
print('AUC：'+str(numTrees)+str(maxDepth)+impurity+str(maxBins), evaluator.evaluate(rf_predictions))

21/12/18 22:26:58 WARN DAGScheduler: Broadcasting large task binary with size 1684.4 KiB
21/12/18 22:27:00 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
21/12/18 22:27:04 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
21/12/18 22:27:08 WARN DAGScheduler: Broadcasting large task binary with size 7.4 MiB
21/12/18 22:27:12 WARN DAGScheduler: Broadcasting large task binary with size 1496.9 KiB
21/12/18 22:27:14 WARN DAGScheduler: Broadcasting large task binary with size 11.6 MiB
21/12/18 22:27:19 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
21/12/18 22:27:22 WARN DAGScheduler: Broadcasting large task binary with size 17.6 MiB
21/12/18 22:27:31 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
21/12/18 22:27:35 WARN DAGScheduler: Broadcasting large task binary with size 25.6 MiB
21/12/18 22:29:20 WARN Executor: Issue communicating with driver in heartbeater]
org.apache.spark.rpc.RpcTimeoutException: Cannot r

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/clientserver.py", line 503, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/bf/.local/lib/python3.8/site-packages/IPython/core/interac

ConnectionRefusedError: [Errno 111] Connection refused

In [4]:
numTrees_list = [20,50,100,150,200,300]
maxDepth_list = [10,15,20,30,40,50]
impurity_list = ['gini', 'entropy']
maxBins_list = [32]
for numTrees in numTrees_list:
    for maxDepth in maxDepth_list:
        for impurity in impurity_list:
            for maxBins in maxBins_list:
                rf_classifier=RandomForestClassifier(labelCol='is_default', featuresCol="features",numTrees=numTrees,maxDepth=maxDepth,impurity=impurity,maxBins=maxBins).fit(train_df)  
                rf_predictions=rf_classifier.transform(test_df)
                evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='is_default')
                print('AUC：'+str(numTrees)+str(maxDepth)+impurity+str(maxBins), evaluator.evaluate(rf_predictions))

21/12/18 21:25:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/12/18 21:26:07 WARN DAGScheduler: Broadcasting large task binary with size 1035.3 KiB
21/12/18 21:26:09 WARN DAGScheduler: Broadcasting large task binary with size 1733.8 KiB
                                                                                

AUC：2010gini32 0.8557574665077384


21/12/18 21:26:38 WARN DAGScheduler: Broadcasting large task binary with size 1468.7 KiB
                                                                                

AUC：2010entropy32 0.8517265259696734


21/12/18 21:27:03 WARN DAGScheduler: Broadcasting large task binary with size 1035.3 KiB
21/12/18 21:27:05 WARN DAGScheduler: Broadcasting large task binary with size 1733.8 KiB
21/12/18 21:27:08 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
21/12/18 21:27:11 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
21/12/18 21:27:15 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
21/12/18 21:27:19 WARN DAGScheduler: Broadcasting large task binary with size 1252.4 KiB
21/12/18 21:27:21 WARN DAGScheduler: Broadcasting large task binary with size 10.3 MiB
21/12/18 21:27:25 WARN DAGScheduler: Broadcasting large task binary with size 1707.2 KiB
21/12/18 21:27:29 WARN DAGScheduler: Broadcasting large task binary with size 15.0 MiB
21/12/18 21:27:34 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
21/12/18 21:27:38 WARN DAGScheduler: Broadcasting large task binary with size 9.4 MiB
                                        

AUC：2015gini32 0.862169467345574


21/12/18 21:28:02 WARN DAGScheduler: Broadcasting large task binary with size 1468.7 KiB
21/12/18 21:28:05 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
21/12/18 21:28:07 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
21/12/18 21:28:11 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB
21/12/18 21:28:14 WARN DAGScheduler: Broadcasting large task binary with size 1074.2 KiB
21/12/18 21:28:16 WARN DAGScheduler: Broadcasting large task binary with size 8.8 MiB
21/12/18 21:28:19 WARN DAGScheduler: Broadcasting large task binary with size 1474.5 KiB
21/12/18 21:28:22 WARN DAGScheduler: Broadcasting large task binary with size 12.8 MiB
21/12/18 21:28:27 WARN DAGScheduler: Broadcasting large task binary with size 1920.0 KiB
21/12/18 21:28:31 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
                                                                                

AUC：2015entropy32 0.8626899891879092


21/12/18 21:28:53 WARN DAGScheduler: Broadcasting large task binary with size 1035.3 KiB
21/12/18 21:28:55 WARN DAGScheduler: Broadcasting large task binary with size 1733.8 KiB
21/12/18 21:28:57 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
21/12/18 21:29:00 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
21/12/18 21:29:04 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
21/12/18 21:29:08 WARN DAGScheduler: Broadcasting large task binary with size 1252.4 KiB
21/12/18 21:29:10 WARN DAGScheduler: Broadcasting large task binary with size 10.3 MiB
21/12/18 21:29:14 WARN DAGScheduler: Broadcasting large task binary with size 1707.2 KiB
21/12/18 21:29:17 WARN DAGScheduler: Broadcasting large task binary with size 15.0 MiB
21/12/18 21:29:23 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
21/12/18 21:29:27 WARN DAGScheduler: Broadcasting large task binary with size 21.0 MiB
21/12/18 21:29:33 WARN DAGScheduler: Br

Py4JJavaError: An error occurred while calling o736.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 215.0 failed 1 times, most recent failure: Lost task 0.0 in stage 215.0 (TID 548) (192.168.0.105 executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22$adapted(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3596/1742404485.apply(Unknown Source)
	at scala.Array$.tabulate(Array.scala:418)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$21(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3567/1864117116.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.RDD$$Lambda$2603/905447321.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2377/774780918.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:663)
	at org.apache.spark.ml.tree.impl.RandomForest$.runBagged(RandomForest.scala:208)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:302)
	at org.apache.spark.ml.classification.RandomForestClassifier.$anonfun$train$1(RandomForestClassifier.scala:161)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:138)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22$adapted(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3596/1742404485.apply(Unknown Source)
	at scala.Array$.tabulate(Array.scala:418)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$21(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$3567/1864117116.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.RDD$$Lambda$2603/905447321.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2377/774780918.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
