In [1]:
! rm -rf metastore_db/

In [2]:
spark.stop()

In [3]:
spark = SparkSession \
    .builder \
    .appName("Prediction of Loan Payment") \
    .master("local[*]") \
    .config("spark.executor.cores","3")\
    .getOrCreate();

# 1. Data Understanding

## Collecting the current data from the LendingClub's Website.

In [4]:
#! rm -rf LoanStats_web.csv

In [5]:
#! touch LoanStats_web.csv

In [6]:
! ls -l /root/notebook/data/LoanStats_web.csv

-rw-r--r-- 1 root root 1160243241 Nov 22 18:37 /root/notebook/data/LoanStats_web.csv


In [7]:
! head -3 /root/notebook/data/LoanStats_web.csv

"id","member_id","loan_amnt","funded_amnt","funded_amnt_inv","term","int_rate","installment","grade","sub_grade","emp_title","emp_length","home_ownership","annual_inc","verification_status","issue_d","loan_status","pymnt_plan","url","desc","purpose","title","zip_code","addr_state","dti","delinq_2yrs","earliest_cr_line","inq_last_6mths","mths_since_last_delinq","mths_since_last_record","open_acc","pub_rec","revol_bal","revol_util","total_acc","initial_list_status","out_prncp","out_prncp_inv","total_pymnt","total_pymnt_inv","total_rec_prncp","total_rec_int","total_rec_late_fee","recoveries","collection_recovery_fee","last_pymnt_d","last_pymnt_amnt","next_pymnt_d","last_credit_pull_d","collections_12_mths_ex_med","mths_since_last_major_derog","policy_code","application_type","annual_inc_joint","dti_joint","verification_status_joint","acc_now_delinq","tot_coll_amt","tot_cur_bal","open_acc_6m","open_act_il","open_il_12m","open_il_24m","mths_since_rcnt_il","total_bal_il","il_util","open_rv_1

In [8]:
! wc -l /root/notebook/data/LoanStats_web.csv

1432493 /root/notebook/data/LoanStats_web.csv


In [9]:
raw_df = spark.read.format('csv').option('header','true').option('mode','DROPMALFORMED')\
.load('/root/notebook/data/LoanStats_web.csv')

In [10]:
raw201617_df = raw_df

In [11]:
raw201617_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: string (nullable = true)
 |-- funded_amnt: string (nullable = true)
 |-- funded_amnt_inv: string (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string 

### Numbers of Fields [1]

In [12]:
len(raw201617_df.columns)

144

In [13]:
rawweb_df = raw201617_df.select('id',
 'member_id',
 'loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'term',
 'int_rate',
 'installment',
 'grade',
 'sub_grade',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'verification_status',
 'issue_d',
 'loan_status',
 'pymnt_plan',
 'url',
 'desc',
 'purpose',
 'title',
 'zip_code',
 'addr_state',
 'dti',
 'delinq_2yrs',
 'earliest_cr_line',
 'inq_last_6mths',
 'mths_since_last_delinq',
 'mths_since_last_record',
 'open_acc',
 'pub_rec',
 'revol_bal',
 'revol_util',
 'total_acc',
 'initial_list_status',
 'out_prncp',
 'out_prncp_inv',
 'total_pymnt',
 'total_pymnt_inv',
 'total_rec_prncp',
 'total_rec_int',
 'total_rec_late_fee',
 'recoveries',
 'collection_recovery_fee',
 'last_pymnt_d',
 'last_pymnt_amnt',
 'next_pymnt_d',
 'last_credit_pull_d',
 'collections_12_mths_ex_med',
 'mths_since_last_major_derog',
 'policy_code',
 'application_type',
 'annual_inc_joint',
 'dti_joint',
 'verification_status_joint',
 'acc_now_delinq',
 'tot_coll_amt',
 'tot_cur_bal',
 'open_acc_6m',
 #'open_il_6m',
 'open_il_12m',
 'open_il_24m',
 'mths_since_rcnt_il',
 'total_bal_il',
 'il_util',
 'open_rv_12m',
 'open_rv_24m',
 'max_bal_bc',
 'all_util',
 'total_rev_hi_lim',
 'inq_fi',
 'total_cu_tl',
 'inq_last_12m')

### Numbers of Fields after attributes selections [1]

In [14]:
len(rawweb_df.columns)

73

In [15]:
rawweb_df.select(['issue_d']).distinct().show()

+--------+
| issue_d|
+--------+
|Oct-2016|
|Feb-2019|
|Mar-2018|
|Sep-2018|
|Jan-2016|
|Jul-2016|
|Aug-2016|
|Feb-2017|
|Nov-2018|
|Jul-2018|
|Apr-2018|
|Dec-2018|
|May-2016|
|Jan-2018|
|Aug-2018|
|Jan-2019|
|Apr-2016|
|Sep-2016|
|Oct-2018|
|Feb-2018|
+--------+
only showing top 20 rows



In [16]:
raw_df = rawweb_df

In [17]:
raw_df.count()

1432439

In [18]:
df_colfam5 = raw_df.select("loan_amnt","term","int_rate","installment","grade","emp_length",\
                           "home_ownership","annual_inc","verification_status","loan_status",\
                           "purpose","addr_state","dti","delinq_2yrs","earliest_cr_line",\
                           "inq_last_6mths","open_acc","pub_rec","revol_bal","revol_util","total_acc",\
                           "last_credit_pull_d")

### Numbers of Fields after final attributes selections [1]

In [19]:
len(df_colfam5.columns)

22

In [20]:
! pip install pandas

[33mYou are using pip version 9.0.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [21]:
import pandas as pd
pd.DataFrame(df_colfam5.describe().take(6), columns=df_colfam5.describe().columns).transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loan_amnt,1432439,15370.392945179516,9646.028077240426,1000,9975
term,1432439,,,36 months,60 months
int_rate,1432439,,,5.31%,30.99%
installment,1432439,456.6879090138958,281.71351826414445,100.01,999.98
grade,1432439,,,A,G
emp_length,1432439,,,1 year,
home_ownership,1432439,,,ANY,RENT
annual_inc,1432439,81034.59412687029,134183.40313571738,0,99999.84
verification_status,1432439,,,Not Verified,Verified


df_colfam5.select(['last_credit_pull_d']).distinct().\
orderBy(['last_credit_pull_d'],ascending=1).show()

df_colfam5.select(['last_credit_pull_d']).\
filter(df_colfam5.last_credit_pull_d == '').count()

df_colfam5.select(['last_credit_pull_d']).\
filter(df_colfam5['last_credit_pull_d'].isNull()).count()

In [22]:
df_colfam5.groupby('home_ownership').count().show()

+--------------+------+
|home_ownership| count|
+--------------+------+
|           OWN|170888|
|          RENT|556960|
|      MORTGAGE|701646|
|           ANY|  2940|
|          NONE|     5|
+--------------+------+



# 2. Data Preparation

Divide this process into 2 parts. The first is a business oriented preparation that turn many business rules to be programming's logics. Its result benefits many tasks related to the Business Intelligence and other descriptive analytics. The second one is a data science oriented preparation that turn many requirements of data science to be programming's logics. Its result benefits many tasks related to the Predictive analytics.

Asumption: Business oriented preparation: No missing values, ONLY month, Correct data types.

## Data Cleansing: Remove missing values

In [23]:
df_no_missing = df_colfam5.dropna(how='any')

## Check for a schema

In [24]:
df_no_missing.printSchema()

root
 |-- loan_amnt: string (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- open_acc: string (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- revol_bal: string (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- total_acc: string (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)



In [25]:
df_no_missing_fitmem = df_no_missing.repartition(60)

In [26]:
df_no_missing_cached = df_no_missing_fitmem.cache()

In [27]:
df_no_missing_cached.registerTempTable("df")

## Number of data rows

In [28]:
df_no_missing_cached.count()

1429645

## Data Transformation: Remove Sign of Percent and Extract Month.

In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

### Remove Sign of Percent

In [30]:
def f_removepercentsign(origin):
    return origin.rstrip('%')

In [31]:
removepercentsign = udf(lambda x: f_removepercentsign(x),StringType())

### Extract Month

In [32]:
def f_extractmonth(origin):
    return origin.split('-')[0]

In [33]:
extractmonth = udf(lambda x: f_extractmonth(x),StringType())

In [34]:
extractterm = udf(lambda x: x.replace('months',''),StringType())

In [35]:
#extractemp_length = udf(lambda x: x.replace('year',''),StringType())

In [36]:
from pyspark.sql.functions import col

In [37]:
dfWithCrunch = df_no_missing_cached.\
withColumn('revol_util',removepercentsign(col('revol_util')).cast(DoubleType())).\
withColumn('int_rate',removepercentsign(col('int_rate')).cast(DoubleType())).\
withColumn('earliest_cr_line',extractmonth(col('earliest_cr_line')).cast(StringType())).\
withColumn('last_credit_pull_d',extractmonth(col('last_credit_pull_d')).cast(StringType())).\
withColumn('dti',col('dti').cast(DoubleType())).\
withColumn('loan_amnt',col('loan_amnt').cast(DoubleType())).\
withColumn('revol_bal',col('revol_bal').cast(DoubleType())).\
withColumn('term',extractterm(col('term')).cast(DoubleType())).\
withColumn('installment',col('installment').cast(DoubleType())).\
withColumn('open_acc',col('open_acc').cast(DoubleType())).\
withColumn('total_acc',col('total_acc').cast(DoubleType())).\
withColumn('pub_rec',col('pub_rec').cast(DoubleType())).\
withColumn('annual_inc',col('annual_inc').cast(DoubleType()))

In [38]:
#dfWithCrunch.select(['last_credit_pull_d']).printSchema()

In [39]:
#dfWithCrunch.groupby('emp_length').count().show()

In [40]:
#dfWithCrunch.count()

In [41]:
dfWithCrunch.select(['loan_amnt','int_rate','dti','home_ownership','grade','term']).show()

+---------+--------+-----+--------------+-----+----+
|loan_amnt|int_rate|  dti|home_ownership|grade|term|
+---------+--------+-----+--------------+-----+----+
|  20625.0|   10.75|24.16|      MORTGAGE|    B|36.0|
|  12875.0|   13.67| 6.49|      MORTGAGE|    C|36.0|
|  16300.0|    9.75|24.74|           OWN|    B|36.0|
|  10000.0|   14.46|33.73|          RENT|    C|36.0|
|   7200.0|    8.39| 8.08|          RENT|    B|36.0|
|  30000.0|   10.75|22.14|      MORTGAGE|    B|60.0|
|  15000.0|    9.75|15.91|          RENT|    B|60.0|
|   8000.0|    9.75|17.66|          RENT|    B|36.0|
|  10000.0|    6.97|17.24|      MORTGAGE|    A|36.0|
|   6500.0|   11.47| 16.0|          RENT|    B|36.0|
|  17650.0|   19.53|13.63|          RENT|    D|36.0|
|  10000.0|   12.99| 9.75|          RENT|    C|36.0|
|   7000.0|   11.47|21.36|      MORTGAGE|    B|36.0|
|  25000.0|   18.25|36.29|           OWN|    D|36.0|
|   8500.0|    6.49|17.23|      MORTGAGE|    A|36.0|
|  23675.0|   14.46|21.42|          RENT|    C

In [42]:
#dfWithCrunch.select(['loan_amnt','int_rate','dti','home_ownership','grade']).printSchema()

In [43]:
dfWithCrunch.select(['loan_amnt','annual_inc']).describe().show()

+-------+------------------+-----------------+
|summary|         loan_amnt|       annual_inc|
+-------+------------------+-----------------+
|  count|           1429645|          1429645|
|   mean|15366.699302973815|81111.66344812173|
| stddev| 9641.851365996221|134222.5400492037|
|    min|            1000.0|              0.0|
|    max|           40000.0|            6.1E7|
+-------+------------------+-----------------+



In [44]:
rawhive_df = dfWithCrunch.repartition(60).cache()
#rawhive_df.registerTempTable("crunched_data")

In [45]:
len(rawhive_df.columns)

22

In [46]:
rawhive_df.printSchema()

root
 |-- loan_amnt: double (nullable = true)
 |-- term: double (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- open_acc: double (nullable = true)
 |-- pub_rec: double (nullable = true)
 |-- revol_bal: double (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: double (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)



In [47]:
#rawhive_df.select(['loan_amnt','int_rate','dti','home_ownership','grade']).printSchema()

In [48]:
rawhive_df.count()

1429645

### Data Transformation: Normalization of "annual_inc" and "loan_amnt"

In [49]:
from pyspark.sql.functions import *

max_annual_inc = rawhive_df.select(max('annual_inc')).collect()[0][0]

min_annual_inc = rawhive_df.select(min('annual_inc')).collect()[0][0]

#sqlContext.udf.register("t_annual_inc", lambda x: ((x-min_annual_inc)/(max_annual_inc-min_annual_inc)))

In [50]:
def t_annual_inc(origin):
    return ((origin-min_annual_inc)/(max_annual_inc-min_annual_inc))

In [51]:
normalized_annual_inc = udf(lambda x: t_annual_inc(x),DoubleType())

In [52]:
max_loan_amnt = rawhive_df.select(max('loan_amnt')).collect()[0][0]

min_loan_amnt = rawhive_df.select(min('loan_amnt')).collect()[0][0]

#sqlContext.udf.register("t_loan_amnt", lambda x: ((x-min_loan_amnt)/(max_loan_amnt-min_loan_amnt)))

In [53]:
def t_loan_amnt(origin):
    return ((origin-min_loan_amnt)/(max_loan_amnt-min_loan_amnt))

In [54]:
normalized_loan_amnt = udf(lambda x: t_loan_amnt(x),DoubleType())

In [55]:
normalized_df = rawhive_df.withColumn('loan_amnt',normalized_loan_amnt(col('loan_amnt'))).\
withColumn('annual_inc',normalized_annual_inc(col('annual_inc')))

In [56]:
normalized_df.printSchema()

root
 |-- loan_amnt: double (nullable = true)
 |-- term: double (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- open_acc: double (nullable = true)
 |-- pub_rec: double (nullable = true)
 |-- revol_bal: double (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: double (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)



### Number of data rows that are only "Fully Paid" and "Charged Off"

In [57]:
from pyspark.sql.functions import col

In [58]:
normalized_df.select(col('loan_status')).groupBy('loan_status').count().show()

+------------------+------+
|       loan_status| count|
+------------------+------+
|        Fully Paid|582923|
|           Default|  1416|
|   In Grace Period|  6125|
|       Charged Off|152817|
|Late (31-120 days)| 19405|
|           Current|661982|
| Late (16-30 days)|  4977|
+------------------+------+



In [59]:
normalized_df.select(['loan_amnt','term','int_rate','installment','grade','annual_inc','revol_bal','loan_status']).\
orderBy(['loan_amnt','term','int_rate','installment','grade','annual_inc']).show()

+---------+----+--------+-----------+-----+--------------------+---------+-----------+
|loan_amnt|term|int_rate|installment|grade|          annual_inc|revol_bal|loan_status|
+---------+----+--------+-----------+-----+--------------------+---------+-----------+
|      0.0|36.0|    5.31|      30.12|    A|4.098360655737705E-4|   3215.0|    Current|
|      0.0|36.0|    5.31|      30.12|    A|5.081967213114754E-4|  25993.0| Fully Paid|
|      0.0|36.0|    5.31|      30.12|    A|5.901639344262295E-4|   3311.0| Fully Paid|
|      0.0|36.0|    5.31|      30.12|    A|6.065573770491804E-4|   1605.0| Fully Paid|
|      0.0|36.0|    5.31|      30.12|    A|6.557377049180328E-4|   1016.0| Fully Paid|
|      0.0|36.0|    5.31|      30.12|    A|6.557377049180328E-4|   1329.0|    Current|
|      0.0|36.0|    5.31|      30.12|    A|7.180327868852459E-4|    804.0|    Current|
|      0.0|36.0|    5.31|      30.12|    A|9.344262295081968E-4|    858.0| Fully Paid|
|      0.0|36.0|    5.31|      30.12|    A|

In [60]:
normalized_df.select(['loan_amnt','annual_inc']).describe().show()

+-------+-------------------+--------------------+
|summary|          loan_amnt|          annual_inc|
+-------+-------------------+--------------------+
|  count|            1429645|             1429645|
|   mean| 0.3683769052044868|0.001329699400788...|
| stddev|0.24722695810246717|0.002200369509003...|
|    min|                0.0|                 0.0|
|    max|                1.0|                 1.0|
+-------+-------------------+--------------------+



In [61]:
normalized_filtered_df = normalized_df.filter(col('loan_status') == 'Fully Paid').\
union(normalized_df.filter(col('loan_status') == 'Charged Off'))

In [62]:
data = normalized_filtered_df.repartition(60).cache()

### Drop Null

In [63]:
data_no_missing_df = data.dropna(how='any')

In [64]:
data_no_missing_df.count()

735740

In [65]:
df_no_missing_cached.select(['loan_status']).distinct().show(100)

+------------------+
|       loan_status|
+------------------+
|        Fully Paid|
|           Default|
|   In Grace Period|
|       Charged Off|
|Late (31-120 days)|
|           Current|
| Late (16-30 days)|
+------------------+



In [66]:
data_no_missing_df.printSchema()

root
 |-- loan_amnt: double (nullable = true)
 |-- term: double (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- open_acc: double (nullable = true)
 |-- pub_rec: double (nullable = true)
 |-- revol_bal: double (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: double (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)



# 3. Data Modeling

In [67]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder,\
VectorIndexer, QuantileDiscretizer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression, GBTClassifier, NaiveBayes, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.clustering import *

In [68]:
installmentDiscretizer = QuantileDiscretizer(numBuckets=5,inputCol='installment',\
                                             outputCol='installmentDiscreted')

In [69]:
labelIndexer = StringIndexer(inputCol='loan_status',outputCol='indexedLabel')

In [70]:
homeIndexer = StringIndexer(inputCol='home_ownership',outputCol='homeIndexed')
homeOneHotEncoder = OneHotEncoder(dropLast=False,inputCol='homeIndexed',outputCol='homeVec')

In [71]:
gradeIndexer = StringIndexer(inputCol='grade',outputCol='gradeIndexed')
gradeOneHotEncoder = OneHotEncoder(dropLast=False,inputCol='gradeIndexed',outputCol='gradeVec')

In [72]:
purposeIndexer = StringIndexer(inputCol='purpose',outputCol='purposeIndexed')
purposeOneHotEncoder = OneHotEncoder(dropLast=False,inputCol='purposeIndexed',\
                                     outputCol='purposeVec')

In [73]:
emp_lengthIndexer = StringIndexer(inputCol='emp_length',outputCol='emp_lengthIndexed')
emp_lengthOneHotEncoder = OneHotEncoder(dropLast=False,inputCol='emp_lengthIndexed',\
                                     outputCol='emp_lengthVec')

In [74]:
featureAssembler = VectorAssembler(inputCols=[\
                                              'loan_amnt',\
                                              'int_rate',\
                                              #'revol_bal',\
                                              'annual_inc',\
                                              'emp_lengthVec',\
                                              'installmentDiscreted',\
                                              'dti',\
                                              'homeVec',\
                                              'gradeVec',\
                                              'open_acc',\
                                              'term'],\
                                   outputCol='features')

In [77]:
data_no_missing_df_fully_paid = data_no_missing_df.filter(col('loan_status') == 'Fully Paid').sample(True, 0.3, 42)

In [78]:
data_no_missing_df_charge_off = data_no_missing_df.filter(col('loan_status') == 'Charged Off')

In [79]:
final_data_no_missing_df = data_no_missing_df_fully_paid.union(data_no_missing_df_charge_off)

In [80]:
training , test = final_data_no_missing_df.filter(col('loan_amnt') > 0).randomSplit([0.8,0.2])

In [81]:
training.select('loan_amnt').describe().show()

+-------+-------------------+
|summary|          loan_amnt|
+-------+-------------------+
|  count|             260745|
|   mean| 0.3588826542879378|
| stddev|0.23803869280248646|
|    min|6.41025641025641E-4|
|    max|                1.0|
+-------+-------------------+



In [82]:
training.select(['revol_bal']).\
describe().show()

+-------+------------------+
|summary|         revol_bal|
+-------+------------------+
|  count|            260745|
|   mean|15877.743385299815|
| stddev| 21578.74699967469|
|    min|               0.0|
|    max|         1137891.0|
+-------+------------------+



In [83]:
training.select(['loan_amnt','term','int_rate','installment','annual_inc']).\
describe().show()

+-------+-------------------+------------------+------------------+-----------------+--------------------+
|summary|          loan_amnt|              term|          int_rate|      installment|          annual_inc|
+-------+-------------------+------------------+------------------+-----------------+--------------------+
|  count|             260745|            260745|            260745|           260745|              260745|
|   mean| 0.3588826542879378| 42.04323764597596|14.057881838578194|461.3462111258122|0.001280400949626...|
| stddev|0.23803869280248646|10.417168446356694| 5.397254156773198|284.4969347563382|0.001374141626438...|
|    min|6.41025641025641E-4|              36.0|              5.31|            33.05|                 0.0|
|    max|                1.0|              60.0|             30.99|          1715.42| 0.15693560655737704|
+-------+-------------------+------------------+------------------+-----------------+--------------------+



In [84]:
training

DataFrame[loan_amnt: double, term: double, int_rate: double, installment: double, grade: string, emp_length: string, home_ownership: string, annual_inc: double, verification_status: string, loan_status: string, purpose: string, addr_state: string, dti: double, delinq_2yrs: string, earliest_cr_line: string, inq_last_6mths: string, open_acc: double, pub_rec: double, revol_bal: double, revol_util: double, total_acc: double, last_credit_pull_d: string]

In [85]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

pipeline = Pipeline(stages=[gradeIndexer,gradeOneHotEncoder,\
                                    homeIndexer,homeOneHotEncoder,\
                                    purposeIndexer,purposeOneHotEncoder,\
                                    emp_lengthIndexer,emp_lengthOneHotEncoder,\
                                    #home_ownershipIndexer,home_ownershipOneHotEncoder,\
                                    installmentDiscretizer,\
                                    labelIndexer,featureAssembler])

In [86]:
train_df_features = pipeline.fit(training).transform(training)

### MLP: กำหนด Pipeline เพื่อ Train Network

In [87]:
layers = [
    train_df_features.schema["features"].metadata["ml_attr"]["num_attrs"],
    20, 10, 2
]

In [88]:
layers

[30, 20, 10, 2]

In [90]:
#layers = [30, 5, 4, 2]

clf = MultilayerPerceptronClassifier(labelCol='indexedLabel',featuresCol='features'\
                                     ,layers=layers)

In [91]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

pipeline = Pipeline(stages=[gradeIndexer,gradeOneHotEncoder,\
                                    homeIndexer,homeOneHotEncoder,\
                                    purposeIndexer,purposeOneHotEncoder,\
                                    emp_lengthIndexer,emp_lengthOneHotEncoder,\
                                    #home_ownershipIndexer,home_ownershipOneHotEncoder,\
                                    installmentDiscretizer,\
                                    labelIndexer,featureAssembler,clf])

In [92]:
x1 = 'stepSize'
x2 = 'maxIter'
paramGrid = ParamGridBuilder() \
    .addGrid(getattr(clf,x1), [0.1, 0.2]) \
    .addGrid(getattr(clf,x2),[5,10])\
    .build()
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                                          predictionCol='prediction', metricName='f1')
crossval = CrossValidator(estimator=pipeline,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=evaluator,
                                      numFolds=2)
cvModel = crossval.fit(training)

model1 = pipeline_dt.fit(training)

In [93]:
training.select(['loan_amnt','int_rate','dti','home_ownership','grade']).describe().show()

+-------+-------------------+------------------+------------------+--------------+------+
|summary|          loan_amnt|          int_rate|               dti|home_ownership| grade|
+-------+-------------------+------------------+------------------+--------------+------+
|  count|             260745|            260745|            260745|        260745|260745|
|   mean| 0.3588826542879378|14.057881838578194|19.258327906575396|          null|  null|
| stddev|0.23803869280248646| 5.397254156773198|14.335118991928999|          null|  null|
|    min|6.41025641025641E-4|              5.31|              -1.0|           ANY|     A|
|    max|                1.0|             30.99|             999.0|          RENT|     G|
+-------+-------------------+------------------+------------------+--------------+------+



In [94]:
training.select(['loan_amnt','int_rate','dti','home_ownership','grade']).show()

+--------------------+--------+-----+--------------+-----+
|           loan_amnt|int_rate|  dti|home_ownership|grade|
+--------------------+--------+-----+--------------+-----+
|0.001282051282051282|   19.53| 8.15|      MORTGAGE|    D|
|0.005128205128205128|    9.16| 6.35|          RENT|    B|
|0.005128205128205128|   10.75|  3.6|          RENT|    B|
|0.005128205128205128|   13.49|13.11|          RENT|    C|
|0.005128205128205128|   13.49|13.11|          RENT|    C|
|0.005128205128205128|   14.46|28.42|          RENT|    C|
|0.005128205128205128|   14.49|  8.5|           OWN|    C|
|0.005128205128205128|   15.99| 6.51|      MORTGAGE|    C|
|0.005128205128205128|   16.99| 15.6|          RENT|    D|
|0.005128205128205128|   17.99| 4.95|          RENT|    D|
|0.007692307692307693|   10.49|26.32|      MORTGAGE|    B|
| 0.01282051282051282|    5.32|18.29|      MORTGAGE|    A|
| 0.01282051282051282|    7.49| 2.95|      MORTGAGE|    A|
| 0.01282051282051282|    8.99|13.75|      MORTGAGE|    

# 4. Evaluation of Models

In [97]:
result = cvModel.transform(test)

In [98]:
test.count()

65335

In [99]:
training.count()

260745

In [100]:
training.groupBy('loan_status').count().show()

+-----------+------+
|loan_status| count|
+-----------+------+
| Fully Paid|139151|
|Charged Off|121594|
+-----------+------+



In [101]:
(0.25*349664)

87416.0

In [102]:
training.filter(training.loan_status == 'Fully Paid').count()

139151

In [103]:
training.filter(training.loan_status == 'Charged Off').count()

121594

In [104]:
result.select('loan_status','indexedLabel','prediction').filter(result.loan_status != 'Fully Paid').show(10)

+-----------+------------+----------+
|loan_status|indexedLabel|prediction|
+-----------+------------+----------+
|Charged Off|         1.0|       0.0|
|Charged Off|         1.0|       0.0|
|Charged Off|         1.0|       0.0|
|Charged Off|         1.0|       1.0|
|Charged Off|         1.0|       1.0|
|Charged Off|         1.0|       0.0|
|Charged Off|         1.0|       1.0|
|Charged Off|         1.0|       1.0|
|Charged Off|         1.0|       1.0|
|Charged Off|         1.0|       0.0|
+-----------+------------+----------+
only showing top 10 rows



In [105]:
result.select('loan_status','indexedLabel','prediction').filter(result.loan_status != 'Fully Paid').count()

30709

In [106]:
result.select('loan_status','indexedLabel','prediction').filter(result.loan_status != 'Fully Paid').\
filter(result.indexedLabel == result.prediction).count()

14637

In [107]:
result.select('loan_status','indexedLabel','prediction').filter(result.loan_status != 'Fully Paid').\
filter(result.indexedLabel != result.prediction).count()

16072

In [108]:
result.select('loan_status','indexedLabel','prediction').filter(result.loan_status == 'Fully Paid').show(20)

+-----------+------------+----------+
|loan_status|indexedLabel|prediction|
+-----------+------------+----------+
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       1.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       0.0|
| Fully Paid|         0.0|       0.0|
+-----------+------------+----------+
only showing top 20 rows



In [109]:
result.select('loan_status','indexedLabel','prediction').filter(result.indexedLabel != result.prediction).\
filter(result.loan_status == 'Fully Paid').count()

10916

In [110]:
result.select('loan_status','indexedLabel','prediction').filter(result.indexedLabel != result.prediction).filter(result.loan_status == 'Charged Off').count()

16072

## Calculation for Confusion Matrix.

In [111]:
def eval_metrics(lap):

    tp = float(len(lap[(lap['indexedLabel']==0) & (lap['prediction']==0)]))

    tn = float(len(lap[(lap['indexedLabel']==1) & (lap['prediction']==1)]))

    fp = float(len(lap[(lap['indexedLabel']==1) & (lap['prediction']==0)]))

    fn = float(len(lap[(lap['indexedLabel']==0) & (lap['prediction']==1)]))

    positivepredictivevalue = tp / (tp+fp)
    negativepredictivevalue = tn / (tn+fn)

    sensitivity = tp / (tp+fn)
    specificity = tn / (tn+fp)

    accuracy = (tp+tn) / (tp+tn+fp+fn)

    return {'PPV': positivepredictivevalue, 'NPV':negativepredictivevalue,'Sensitivity': sensitivity, 'Specificity': specificity, 'Accuracy': accuracy}

In [112]:
! pip install pandas

[33mYou are using pip version 9.0.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [113]:
lap = result.select("indexedLabel", "prediction").toPandas()

In [114]:
m = eval_metrics(lap)

In [115]:
m

{'Accuracy': 0.5869289048748756,
 'NPV': 0.5728094548585293,
 'PPV': 0.5959981901362426,
 'Sensitivity': 0.684745566915035,
 'Specificity': 0.4766355140186916}

In [116]:
tp = float(len(lap[(lap['indexedLabel']==0) & (lap['prediction']==0)])) #tp - Full paid

In [117]:
tn = float(len(lap[(lap['indexedLabel']==1) & (lap['prediction']==1)])) #tn - Charged off

In [118]:
fp = float(len(lap[(lap['indexedLabel']==1) & (lap['prediction']==0)])) #fp

In [119]:
fn = float(len(lap[(lap['indexedLabel']==0) & (lap['prediction']==1)])) #fn

In [120]:
! pip install plotly

[33mYou are using pip version 9.0.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [121]:
! pip install chart-studio

[33mYou are using pip version 9.0.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [122]:
#model1.save('./model_24Nov2019/')

In [124]:
#model1

In [125]:
! ls -l ./model_24Nov2019/stages/

total 28
drwxr-xr-x 4 root root 4096 Nov 24 23:20 0_StringIndexer_484eae119774ef396f49
drwxr-xr-x 3 root root 4096 Nov 24 23:20 1_OneHotEncoder_4e1d9fd76b68ea04742c
drwxr-xr-x 4 root root 4096 Nov 24 23:20 2_StringIndexer_4e979f712c33c84c5864
drwxr-xr-x 3 root root 4096 Nov 24 23:20 3_OneHotEncoder_4c0c8f93c5951c01730b
drwxr-xr-x 4 root root 4096 Nov 24 23:20 4_StringIndexer_49cb99cdfa7a82111308
drwxr-xr-x 3 root root 4096 Nov 24 23:20 5_VectorAssembler_46f8866472577f94716e
drwxr-xr-x 4 root root 4096 Nov 24 23:20 6_DecisionTreeClassifier_4ecf9b9c170d7161b14c


In [126]:
m

{'Accuracy': 0.5869289048748756,
 'NPV': 0.5728094548585293,
 'PPV': 0.5959981901362426,
 'Sensitivity': 0.684745566915035,
 'Specificity': 0.4766355140186916}

# Jan2020

In [None]:
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.feature import Bucketizer

In [None]:
def get_model(df,categoricalCols,continuousCols,discretedCols,split_range,labelCol):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    
    
    labelIndexer = StringIndexer(inputCol=labelCol,\
                             outputCol='indexedLabel',\
                             handleInvalid='keep')

    indexers = [ StringIndexer(handleInvalid='keep',\
                               inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]

    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()))
                 for indexer in indexers ]
    discretizers = [ Bucketizer(inputCol=d, outputCol="{0}_discretized".format(d)\
                 ,splits=split_range)
                 for d in discretedCols ]
    
    
    featureCols = ['features']
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols +\
                                [discretizer.getOutputCol() for discretizer in discretizers], \
                                outputCol='features')
    
    
    
    ml_algorithm = DecisionTreeClassifier(featuresCol='features',\
                                labelCol='indexedLabel')
    


    
    pipeline = Pipeline(stages=[labelIndexer] + indexers + encoders + discretizers + \
                        [assembler] + [ml_algorithm])
    
    

    model=pipeline.fit(df)
    #data = model.transform(df)

    #data = data.withColumn('label',col(labelCol))

    return model

In [None]:
training

In [None]:
training.select('dti').describe().show()

In [None]:
catcols = [\
           'emp_length',\
           'home_ownership',\
           'verification_status',\
           'purpose',\
           'addr_state'\
          ]

num_cols = [\
            #'dti',\
            #'loan_amnt',\
            'int_rate',\
            'installment',\
            #'annual_inc',\
            #'revol_bal',\
            #'delinq_2yrs',\
            'pub_rec',\
            #'revol_util'\
            'total_acc'\
           ]

discols = [\
           #'pub_rec',\
           'dti',\
           #'installment'\
          ]



labelCol = 'loan_status'

splits = [-1.0, 0.0, 40.0, 80.0, 120.0, 160.0, float("inf")]

In [None]:
ml_model = get_model(training,catcols,num_cols,discols, splits, labelCol)

In [None]:
ml_model

In [None]:
ml_model.stages

In [None]:
get_bucket = ml_model.stages[11]

In [None]:
get_bucket.getOutputCol()

In [None]:
get_bucket.transform(training).groupBy(get_bucket.getOutputCol()).count().show()

In [None]:
lap2 = ml_model.transform(test).select("indexedLabel", "prediction").toPandas()

In [None]:
m2 = eval_metrics(lap2)

In [None]:
m2

In [None]:
training.groupby('loan_status').count().show()

In [None]:
test.groupby('loan_status').count().show()

In [None]:
training.count()

In [None]:
training.count()

In [None]:
training.groupby('loan_status').count().show()

In [None]:
test.groupBy('loan_status').count().show()

In [None]:
test_result_df = ml_model.transform(test)

In [None]:
test_result_df.filter(col('indexedLabel') != col('prediction')).count()

In [None]:
test_result_df.filter(col('indexedLabel') != col('prediction')).filter(col('loan_status') == 'Charged Off').\
groupBy(['loan_status','indexedLabel','prediction']).count().show()

In [None]:
test_result_df.filter(col('loan_status') == 'Charged Off').count()

In [None]:
13903/30629