In [4]:
!python --version

Python 3.7.9


### Introduction

The Home Mortgage Disclosure Act (HMDA) is a law passed by the U.S. Congress in 1975. It requires banks, credit unions, and other financial institutions to publicly report data about the mortgages they process. Individuals and businesses can use this information to analyze trends, conduct research, and make financial decisions. The required disclosures include the type of loan applied for, the amount requested, the property type, the ethnicity of the borrower, and much more. HMDA was strengthened decades later by the Dodd-Frank Wall Street Reform and Consumer Protection Act, a response to the irresponsible mortgage lending by banks that helped cause the 2008 recession. Dodd-Frank required lenders to publicly report additional information such as loan terms, the age of the borrowers, credit score type, and interest rates, among others.

This notebook attempts to build machine learning classifiers that predict whether a loan is approved or denied, based on the historical data provided on the HMDA website. The dataset contains records from 2018 and 2019 and can be downloaded from the link in the References section at the bottom of the notebook.

### Importing Data & Libraries

In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

import os
import seaborn as sns
import pandas as pd
import numpy as np
import plotly
import cufflinks as cf

cf.go_offline()
pd.set_option('display.max_colwidth', None)

print(os.listdir('../loans'))

['.ipynb_checkpoints', '2018_lar.txt', '2018_ts.txt', '2019_lar.txt', '2019_ts.txt', 'CFPB Loans 2.ipynb', 'CFPB Loans.ipynb', 'model-benchmarks.txt', 'spark-warehouse']


In [6]:
spark = (SparkSession
        .builder 
        .master("local[*]") 
        .appName("cfpb-loans") 
        .getOrCreate())

spark

In [7]:
loans_2019 = (spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "|")
    .option("inferSchema", "true")
    .load("../loans/2019_lar.txt"))

loans_2018 = (spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "|")
    .option("inferSchema", "true")
    .load("../loans/2018_lar.txt"))

loans_2019.printSchema()

root
 |-- activity_year: integer (nullable = true)
 |-- lei: string (nullable = true)
 |-- derived_msa_md: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- county_code: string (nullable = true)
 |-- census_tract: string (nullable = true)
 |-- conforming_loan_limit: string (nullable = true)
 |-- derived_loan_product_type: string (nullable = true)
 |-- derived_dwelling_category: string (nullable = true)
 |-- derived_ethnicity: string (nullable = true)
 |-- derived_race: string (nullable = true)
 |-- derived_sex: string (nullable = true)
 |-- action_taken: integer (nullable = true)
 |-- purchaser_type: integer (nullable = true)
 |-- preapproval: integer (nullable = true)
 |-- loan_type: integer (nullable = true)
 |-- loan_purpose: integer (nullable = true)
 |-- lien_status: integer (nullable = true)
 |-- reverse_mortgage: integer (nullable = true)
 |-- open_end_line_of_credit: integer (nullable = true)
 |-- business_or_commercial_purpose: integer (nullable = true)


In [8]:
# union() stacks rows by position, so confirm both years share the same column layout first
assert loans_2019.columns == loans_2018.columns

loans = loans_2019.union(loans_2018)

print('2019 loans count: ', loans_2019.count(), '\n' 
      '2018 loans count: ', loans_2018.count(), '\n'
      'total loan count: ', loans.count())

2019 loans count:  17522258 
2018 loans count:  15128476 
total loan count:  32650734


In [9]:
loans.limit(5).toPandas()

Unnamed: 0,activity_year,lei,derived_msa_md,state_code,county_code,census_tract,conforming_loan_limit,derived_loan_product_type,derived_dwelling_category,derived_ethnicity,...,denial_reason_2,denial_reason_3,denial_reason_4,tract_population,tract_minority_population_percent,ffiec_msa_md_median_family_income,tract_to_msa_income_percentage,tract_owner_occupied_units,tract_one_to_four_family_homes,tract_median_age_of_housing_units
0,2019,549300LIZH7VW4DFJK44,44060,WA,53065,53065941000,C,FHA:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,,,,2204,85.53,67400,58,517,850,32
1,2019,549300LIZH7VW4DFJK44,46140,OK,40143,40143007633,C,FHA:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,,,,2609,20.35,68600,166,836,1035,32
2,2019,549300LIZH7VW4DFJK44,99999,OK,40147,40147000600,C,FHA:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,,,,7428,20.81,55800,150,2460,3036,38
3,2019,549300LIZH7VW4DFJK44,36420,OK,40109,40109108900,C,Conventional:First Lien,Single Family (1-4 Units):Site-Built,Not Hispanic or Latino,...,,,,4036,30.43,73100,86,1107,1547,33
4,2019,549300LIZH7VW4DFJK44,42140,NM,35049,35049940600,C,FHA:First Lien,Single Family (1-4 Units):Site-Built,Hispanic or Latino,...,,,,3673,80.56,73200,89,1057,1523,29


In [10]:
institutions = (spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "|")
    .option("inferSchema", "true")
    .load("../loans/2019_ts.txt"))

institutions.printSchema()
institutions.limit(5).toPandas()

root
 |-- activity_year: integer (nullable = true)
 |-- calendar_quarter: integer (nullable = true)
 |-- lei: string (nullable = true)
 |-- tax_id: string (nullable = true)
 |-- agency_code: integer (nullable = true)
 |-- respondent_name: string (nullable = true)
 |-- respondent_state: string (nullable = true)
 |-- respondent_city: string (nullable = true)
 |-- respondent_zip_code: string (nullable = true)
 |-- lar_count: integer (nullable = true)



Unnamed: 0,activity_year,calendar_quarter,lei,tax_id,agency_code,respondent_name,respondent_state,respondent_city,respondent_zip_code,lar_count
0,2019,4,5493005HN78XQ5US3306,72-0433983,5,CSE Federal Credit Union,LA,Sulphur,70665,180
1,2019,4,549300HNQDVXN8569N58,01-0079380,5,cPort Credit Union,ME,Portland,4103,146
2,2019,4,549300W2DSERWC01YR18,14-1155630,1,Walden Savings Bank,NY,MONTGOMERY,12549,545
3,2019,4,549300E3X7YDJIUENL65,36-6006909,5,NuMark Credit Union,IL,Joliet,60436,1007
4,2019,4,549300QTN66CFMVWLH03,41-0400045,5,MAYO EMPLOYEES FEDERAL CREDIT UNION,MN,ROCHESTER,55902,1136


In [11]:
from pyspark.sql import *

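# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
institutions.createOrReplaceTempView('Institutions')
loans_2019.createOrReplaceTempView('Loans2019')
loans_2018.createOrReplaceTempView('Loans2018')
loans.createOrReplaceTempView('Loans')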

### Exploration

In [12]:
query = """ SELECT COUNT(*)
            FROM Institutions """
spark.sql(query).show(truncate = False)

+--------+
|count(1)|
+--------+
|5525    |
+--------+



In [13]:
query = """ SELECT AVG(loan_amount) AS Average_Loan_2019
            FROM Loans2019 """
spark.sql(query).show(truncate = False)

query = """ SELECT AVG(loan_amount) AS Average_Loan_2018
            FROM Loans2018 """
spark.sql(query).show(truncate = False)

+-----------------+
|Average_Loan_2019|
+-----------------+
|268214.9277356263|
+-----------------+

+-----------------+
|Average_Loan_2018|
+-----------------+
|241037.3535739489|
+-----------------+



The above query shows the average loan amount applied for in each year.

In [14]:
query = """ WITH top10 AS (SELECT lei AS Institution_ID, AVG(loan_amount) AS Average_Loan
                           FROM Loans 
                           GROUP BY lei
                           ORDER BY Average_Loan DESC 
                           LIMIT 10) 
            
            SELECT top10.Institution_ID, Institutions.respondent_name AS Name, top10.Average_Loan
            FROM top10
            LEFT JOIN Institutions ON top10.Institution_ID = Institutions.lei """
spark.sql(query).show(truncate = False)

+--------------------+----------------------------------------------+--------------------+
|Institution_ID      |Name                                          |Average_Loan        |
+--------------------+----------------------------------------------+--------------------+
|1DU7IM20QESYGDO4HO54|The Northwestern Mutual Life Insurance Company|6.189032258064516E7 |
|C4BXATY60WC6XEOZDX54|Metropolitan Life Insurance Co                |5.688422535211267E7 |
|7LTWFZYICNSX8D621K86|Deutsche Bank AG                              |5.543406976744186E7 |
|B2S31CFVSWTN3FR00Q90|PGIM Real Estate Finance, LLC                 |3.611929824561404E7 |
|CUMYEZJOAF02RYZ1JJ85|Principal Financial Group                     |3.01139010989011E7  |
|549300J5WHZ3UBAKJW15|Prudential Affordable Mortgage Company, LLC   |2.909069536423841E7 |
|549300IBT3SCF2K09D58|CBRE Multifamily Capital, Inc.                |2.614407894736842E7 |
|549300RDGRWJXEQOEC49|Prudential Multifamily Mortgage, LLC.         |2.526874251497006E7 |

The above query shows the top 10 banks/financial institutions with the highest average loan amount applied for.

In [15]:
query = """ SELECT COUNT(*) AS total_count
            FROM Loans 
            WHERE action_taken IN (1, 2, 3, 5, 8) AND loan_purpose IN (1, 2, 31, 32) """

spark.sql(query).show(truncate = False)

+-----------+
|total_count|
+-----------+
|22237131   |
+-----------+



The above query shows the total number of loan applications counted as approved or denied, based on the 
`action_taken` values explained in the data definitions. Files closed for incompleteness (code 5) are included and 
are treated as denials in the cells that follow; applications withdrawn by the applicant (code 4) are excluded.

In [16]:
query = """ WITH x AS (
                SELECT *, 
                CASE
                    WHEN action_taken IN (3, 5) THEN 0
                    WHEN action_taken IN (1, 2, 8) THEN 1
                END AS approved
                FROM Loans 
                WHERE action_taken IN (1, 2, 3, 5, 8) AND loan_purpose IN (1, 2, 31, 32) AND 
                      derived_race NOT IN ('Race Not Available', 'Free Form Text Only') 
            ), y AS (
                SELECT derived_race, COUNT(approved) as cnt, approved
                FROM x 
                GROUP BY derived_race, approved 
                ORDER BY derived_race) 
            
            SELECT derived_race, approved, cnt / SUM(cnt) OVER(PARTITION BY derived_race) AS percentage
            FROM y """

spark.sql(query).show(truncate = False)

+-----------------------------------------+--------+-------------------+
|derived_race                             |approved|percentage         |
+-----------------------------------------+--------+-------------------+
|2 or more minority races                 |1       |0.5929314976494291 |
|2 or more minority races                 |0       |0.40706850235057085|
|American Indian or Alaska Native         |0       |0.4113604204568425 |
|American Indian or Alaska Native         |1       |0.5886395795431575 |
|Asian                                    |0       |0.23828686619630318|
|Asian                                    |1       |0.7617131338036969 |
|Black or African American                |0       |0.378879598130544  |
|Black or African American                |1       |0.621120401869456  |
|Joint                                    |0       |0.2139335192525951 |
|Joint                                    |1       |0.7860664807474049 |
|Native Hawaiian or Other Pacific Islander|1       

The above query shows the percentages of loan approval & denial for each race.

### Preprocessing

In [17]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

features = ['derived_race', 'derived_sex', 'purchaser_type', 'loan_type', 'loan_amount', 
            'loan_term', 'occupancy_type', 'income', 'debt_to_income_ratio', 'applicant_age', 
            'discount_points', 'business_or_commercial_purpose', 'state_code', 'preapproval', 'loan_purpose']
target = ['approved']

def check_approval(num):
    if num in (1, 2, 8):
        return 1
    elif num in (3, 5):
        return 0
    
def remove_features(column):
    return (column not in features) and (column not in target)
    
func = F.udf(check_approval, IntegerType())

query = """ SELECT * 
            FROM Loans 
            WHERE action_taken IN (1, 2, 3, 5, 8) AND loan_purpose IN (1, 2, 31, 32) """
filtered = spark.sql(query)
loans_adjusted = filtered.withColumn("approved", func("action_taken"))

dropped_features = list(filter(remove_features, loans_adjusted.columns))
df = loans_adjusted.drop(*dropped_features)
df.groupBy('approved').count().show()

+--------+--------+
|approved|   count|
+--------+--------+
|       1|16857273|
|       0| 5379858|
+--------+--------+



In [18]:
pd.DataFrame(df.take(5), columns = df.columns)

Unnamed: 0,state_code,derived_race,derived_sex,purchaser_type,preapproval,loan_type,loan_purpose,business_or_commercial_purpose,loan_amount,discount_points,loan_term,occupancy_type,income,debt_to_income_ratio,applicant_age,approved
0,WA,American Indian or Alaska Native,Joint,0,2,2,31,2,135000.0,,180,1,61,20%-<30%,55-64,0
1,OK,White,Joint,0,2,2,1,2,215000.0,,360,1,82,40,25-34,0
2,OK,Joint,Joint,0,2,2,2,2,35000.0,,180,1,125,30%-<36%,55-64,0
3,OK,White,Male,1,2,1,31,2,345000.0,4600.2,360,1,153,20%-<30%,25-34,1
4,AK,American Indian or Alaska Native,Female,2,2,2,1,2,135000.0,2461.37,360,1,34,36,55-64,1


In [19]:
from pyspark.sql.functions import isnan, when, count, col

df.createOrReplaceTempView('Filtered')
df.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- derived_race: string (nullable = true)
 |-- derived_sex: string (nullable = true)
 |-- purchaser_type: integer (nullable = true)
 |-- preapproval: integer (nullable = true)
 |-- loan_type: integer (nullable = true)
 |-- loan_purpose: integer (nullable = true)
 |-- business_or_commercial_purpose: integer (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- discount_points: string (nullable = true)
 |-- loan_term: string (nullable = true)
 |-- occupancy_type: integer (nullable = true)
 |-- income: string (nullable = true)
 |-- debt_to_income_ratio: string (nullable = true)
 |-- applicant_age: string (nullable = true)
 |-- approved: integer (nullable = true)



In [20]:
query = """ SELECT derived_race, derived_sex, purchaser_type, loan_type, 
                   business_or_commercial_purpose, INT(loan_amount), 
                   INT(discount_points), INT(loan_term), occupancy_type, 
                   INT(income), debt_to_income_ratio, applicant_age, 
                   state_code, preapproval, loan_purpose, approved
            FROM Filtered """

df = spark.sql(query)
df.createOrReplaceTempView('Filtered')

In [21]:
query = """ SELECT COUNT(*), 
            CASE
                WHEN debt_to_income_ratio IN ('30%-<36%', '36', '37', '38', '39') THEN '30%-<40%'
                WHEN debt_to_income_ratio IN ('40', '41', '42', '43', '44', '45', '46', '47', '48', '49') THEN '40%-<50%'
                ELSE debt_to_income_ratio
            END AS dtir
            FROM Filtered 
            GROUP BY dtir
            ORDER BY COUNT(*) """

spark.sql(query).show(truncate = False)

+--------+--------+
|count(1)|dtir    |
+--------+--------+
|712969  |Exempt  |
|1018561 |>60%    |
|1374671 |<20%    |
|1537139 |50%-60% |
|2536758 |NA      |
|3362287 |20%-<30%|
|5531171 |30%-<40%|
|6163575 |40%-<50%|
+--------+--------+
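
The `CASE` expression above folds the single-percent values and the `30%-<36%` band into two wider bands, leaving every other value untouched. A minimal plain-Python sketch of the same mapping, for illustration only (the function name `normalize_dtir` is hypothetical and does not appear elsewhere in this notebook):

```python
def normalize_dtir(value):
    """Collapse HMDA debt-to-income values into the wider bands used by the query."""
    if value == '30%-<36%' or value in {str(n) for n in range(36, 40)}:
        return '30%-<40%'
    if value in {str(n) for n in range(40, 50)}:
        return '40%-<50%'
    # Other values ('<20%', '20%-<30%', '50%-60%', '>60%', 'NA', 'Exempt') pass through unchanged
    return value

examples = ['36', '30%-<36%', '44', '20%-<30%', 'NA']
print([normalize_dtir(v) for v in examples])
```

Grouping the values this way gives every row a band of comparable width, which keeps the number of distinct categories small for the encoding steps later on.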



In [22]:
query = """ SELECT derived_race, derived_sex, purchaser_type, loan_type, 
                   business_or_commercial_purpose, loan_amount,  loan_term,  
                   occupancy_type, income, applicant_age, state_code, preapproval, loan_purpose, approved,
            CASE 
                WHEN discount_points IS NULL THEN 0
                ELSE discount_points
            END AS discount_pts,
            CASE
                WHEN debt_to_income_ratio IN ('30%-<36%', '36', '37', '38', '39') THEN '30%-<40%'
                WHEN debt_to_income_ratio IN ('40', '41', '42', '43', '44', '45', '46', '47', '48', '49') THEN '40%-<50%'
                ELSE debt_to_income_ratio
            END AS dtir
            FROM Filtered 
            WHERE loan_amount IS NOT NULL AND derived_race NOT IN ('Race Not Available', 'Free Form Text Only') AND 
                  loan_term IS NOT NULL AND applicant_age NOT IN (8888, 9999) AND income IS NOT NULL AND income > 0 """

df = spark.sql(query)
df.count()

17228367

In [23]:
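# Approximate share of approved loans in the filtered data
approved_ratio = 0.76

def balance_class_weights(labels):
    # Weight each class by the other class's share so both classes carry similar total weight
    return when(labels == 0, approved_ratio).otherwise(1 - approved_ratio)

df = df.withColumn('weights', balance_class_weights(col('approved')))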
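
The constant 0.76 is close to the approved share reported by the earlier `groupBy('approved').count()` output. As a rough sketch, the same kind of weights could be derived from the class counts instead of being hard-coded. This is plain Python, `class_weights` is a hypothetical helper, and the counts are copied from that earlier output (taken before the later null filtering), so the result is only approximate for the final data:

```python
def class_weights(n_approved, n_denied):
    """Weight each class by the other class's share of the data."""
    total = n_approved + n_denied
    approved_share = n_approved / total
    # Denied rows (label 0) get the larger weight because they are the minority class
    return {0: approved_share, 1: 1 - approved_share}

weights = class_weights(n_approved=16857273, n_denied=5379858)
print(weights)
```

With these weights, the count of each class multiplied by its weight comes out equal, so neither class dominates a weighted loss.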

In [24]:
df.printSchema()

root
 |-- derived_race: string (nullable = true)
 |-- derived_sex: string (nullable = true)
 |-- purchaser_type: integer (nullable = true)
 |-- loan_type: integer (nullable = true)
 |-- business_or_commercial_purpose: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- loan_term: integer (nullable = true)
 |-- occupancy_type: integer (nullable = true)
 |-- income: integer (nullable = true)
 |-- applicant_age: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- preapproval: integer (nullable = true)
 |-- loan_purpose: integer (nullable = true)
 |-- approved: integer (nullable = true)
 |-- discount_pts: integer (nullable = true)
 |-- dtir: string (nullable = true)
 |-- weights: double (nullable = false)



In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

assembler = VectorAssembler(inputCols = ['loan_amount', 'income', 'discount_pts'], outputCol = 'continuous_features_vector')
df = assembler.transform(df)

standard_scaler = StandardScaler().setInputCol('continuous_features_vector').setOutputCol('continuous_features_scaled')
df_scaled = standard_scaler.fit(df).transform(df)
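
With its default settings, Spark's `StandardScaler` divides each feature by its standard deviation and does not subtract the mean, which is why a zero `discount_pts` stays at 0.0 after scaling. A rough plain-Python sketch of that behaviour for a single column, assuming the sample (n-1) standard deviation; `scale_to_unit_std` is a hypothetical name and the values are just the first few loan amounts shown earlier:

```python
import statistics

def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation, without centering."""
    std = statistics.stdev(values)
    return [v / std for v in values]

loan_amounts = [135000.0, 215000.0, 35000.0, 345000.0, 135000.0]
scaled = scale_to_unit_std(loan_amounts)
print(scaled)
```

The numbers will not match the notebook's output, because the real scaler is fitted on all rows rather than on five of them.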

In [26]:
pd.DataFrame(df_scaled.take(5), columns = df_scaled.columns)

Unnamed: 0,derived_race,derived_sex,purchaser_type,loan_type,business_or_commercial_purpose,loan_amount,loan_term,occupancy_type,income,applicant_age,state_code,preapproval,loan_purpose,approved,discount_pts,dtir,weights,continuous_features_vector,continuous_features_scaled
0,American Indian or Alaska Native,Joint,0,2,2,135000,180,1,61,55-64,WA,2,31,0,0,20%-<30%,0.76,"[135000.0, 61.0, 0.0]","[0.1725648471579141, 0.01165464902967626, 0.0]"
1,White,Joint,0,2,2,215000,360,1,82,25-34,OK,2,1,0,0,40%-<50%,0.76,"[215000.0, 82.0, 0.0]","[0.2748254973255669, 0.01566690525300743, 0.0]"
2,Joint,Joint,0,2,2,35000,180,1,125,55-64,OK,2,2,0,0,30%-<40%,0.76,"[35000.0, 125.0, 0.0]","[0.0447390344483481, 0.023882477519828398, 0.0]"
3,White,Male,1,1,2,345000,360,1,153,25-34,OK,2,31,1,4600,20%-<30%,0.24,"[345000.0, 153.0, 4600.0]","[0.4409990538480027, 0.029232152484269962, 3.251283374541778]"
4,American Indian or Alaska Native,Female,2,2,2,135000,360,1,34,55-64,AK,2,1,1,2461,30%-<40%,0.24,"[135000.0, 34.0, 2461.0]","[0.1725648471579141, 0.006496033885393325, 1.7394366053798511]"


In [27]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer

categorical_columns_str = ['derived_race', 'derived_sex', 'applicant_age', 
                           'state_code', 'dtir']
categorical_columns_num = ['purchaser_type', 'loan_type', 'business_or_commercial_purpose',
                           'loan_term', 'preapproval', 'occupancy_type', 'loan_purpose']
stages = []

for column in categorical_columns_str:
    stringIndexer = StringIndexer(inputCol = column, outputCol = column + 'Index')
    stages += [stringIndexer]

Here, the string-valued categorical features are indexed (each unique string is mapped to a numeric index) in preparation for target/mean encoding. The numeric categorical columns are used as they are.
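
Target (mean) encoding replaces each category with the share of positive labels observed for that category. A minimal plain-Python sketch of the idea, with no smoothing or noise and with hypothetical toy rows (the name `target_encode` is an assumption for illustration, not the function used in this notebook):

```python
from collections import defaultdict

def target_encode(categories, labels):
    """Map each category to the mean of the binary label within that category."""
    positives = defaultdict(int)
    totals = defaultdict(int)
    for category, label in zip(categories, labels):
        totals[category] += 1
        positives[category] += label
    means = {category: positives[category] / totals[category] for category in totals}
    return [means[category] for category in categories], means

# Hypothetical toy rows: (derived_sex, approved)
sexes = ['Joint', 'Male', 'Joint', 'Female', 'Male', 'Joint']
approved = [1, 0, 1, 1, 1, 0]
encoded, means = target_encode(sexes, approved)
print(means)
```

Because the means are computed from the labels themselves, an encoding fitted on all rows carries label information into the features, which is worth keeping in mind when reading the evaluation results.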

In [28]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df_scaled)
df_scaled = pipelineModel.transform(df_scaled)

df_scaled = df_scaled.withColumn('label', col('approved'))
pd.DataFrame(df_scaled.take(5), columns=df_scaled.columns).transpose()

Unnamed: 0,0,1,2,3,4
derived_race,American Indian or Alaska Native,White,Joint,White,American Indian or Alaska Native
derived_sex,Joint,Joint,Joint,Male,Female
purchaser_type,0,0,0,1,2
loan_type,2,2,2,1,2
business_or_commercial_purpose,2,2,2,2,2
loan_amount,135000,215000,35000,345000,135000
loan_term,180,360,180,360,360
occupancy_type,1,1,1,1,1
income,61,82,125,153,34
applicant_age,55-64,25-34,55-64,25-34,55-64


In [29]:
from pyspark.sql.functions import create_map, lit
from itertools import chain
from functools import reduce
import random

categorical_features = [c + 'Index' for c in categorical_columns_str] + categorical_columns_num
df_scaled.createOrReplaceTempView('Testing')

def apply_normaldist_noise():
    noise = 1 + random.normalvariate(0, 0.07)
    
    return noise

def target_encoding(df):
    for column in categorical_features:
        encoded_column_name = column + '_enc'
        query = f""" WITH x AS (
                        SELECT COUNT(*) AS target_cnt, approved, {column}
                        FROM Testing
                        WHERE approved = 1
                        GROUP BY {column}, approved
                        ORDER BY {column}
                     ), y AS (
                        SELECT COUNT(*) AS feature_cnt, {column}
                        FROM Testing
                        GROUP BY {column}
                        ORDER BY {column} )

                     SELECT x.{column}, x.target_cnt / (y.feature_cnt) AS mean
                     FROM x
                     LEFT JOIN y ON x.{column} = y.{column}
                     ORDER BY {column} """

        aggregate_df = spark.sql(query)

        aggregate_dict = aggregate_df.rdd.map(lambda row: {row[column]: row['mean']}).collect()
        aggregate_dict = reduce(lambda a, b: {**a, **b}, aggregate_dict)
    
        mapping_expr = create_map([lit(x) for x in chain(*aggregate_dict.items())])
        df = df.withColumn(encoded_column_name, mapping_expr[col(column)])

#         # Add noise for L.O.O encoding
#         func = F.udf(apply_normaldist_noise, FloatType())
#         df = df.withColumn(encoded_column_name, col(encoded_column_name) * func())
        
    return df

df_scaled = target_encoding(df_scaled)
pd.DataFrame(df_scaled.take(5), columns=df_scaled.columns).transpose()

Unnamed: 0,0,1,2,3,4
derived_race,American Indian or Alaska Native,White,Joint,White,American Indian or Alaska Native
derived_sex,Joint,Joint,Joint,Male,Female
purchaser_type,0,0,0,1,2
loan_type,2,2,2,1,2
business_or_commercial_purpose,2,2,2,2,2
loan_amount,135000,215000,35000,345000,135000
loan_term,180,360,180,360,360
occupancy_type,1,1,1,1,1
income,61,82,125,153,34
applicant_age,55-64,25-34,55-64,25-34,55-64


In [30]:
encfeatures = list(map(lambda x:"{}{}".format(x, '_enc'), categorical_features))
df_scaled.select([count(when(col(c).isNull(), c)).alias(c) for c in encfeatures]).toPandas().T

Unnamed: 0,0
derived_raceIndex_enc,0
derived_sexIndex_enc,0
applicant_ageIndex_enc,0
state_codeIndex_enc,0
dtirIndex_enc,0
purchaser_type_enc,0
loan_type_enc,0
business_or_commercial_purpose_enc,0
loan_term_enc,138
preapproval_enc,0


In [31]:
df_scaled = df_scaled.dropna(subset='loan_term_enc')

assemblerInputs = encfeatures
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol = 'chisq_features')
df_scaled = assembler.transform(df_scaled)

df2 = df_scaled.select('chisq_features', 'label', 'weights')
pd.DataFrame(df2.take(5), columns=df2.columns).transpose()

Unnamed: 0,0,1,2,3,4
chisq_features,"[0.5859915548606365, 0.8069451212522806, 0.7273094658960028, 0.808127549157317, 0.8746725172972699, 0.5266690300283462, 0.7644677859281759, 0.7664958251165599, 0.7751510480738912, 0.7581354849861067, 0.7658341286078199, 0.7248737653961077]","[0.782999439926836, 0.8069451212522806, 0.8342565417713655, 0.7575528747655684, 0.8629681734737917, 0.5266690300283462, 0.7644677859281759, 0.7664958251165599, 0.7946107259792559, 0.7581354849861067, 0.7658341286078199, 0.8510934790255182]","[0.7882675639480726, 0.8069451212522806, 0.7273094658960028, 0.7575528747655684, 0.880346732052094, 0.5266690300283462, 0.7644677859281759, 0.7664958251165599, 0.7751510480738912, 0.7581354849861067, 0.7658341286078199, 0.552056992157519]","[0.782999439926836, 0.7425761525555532, 0.8342565417713655, 0.7575528747655684, 0.8746725172972699, 1.0, 0.7660516003450845, 0.7664958251165599, 0.7946107259792559, 0.7581354849861067, 0.7658341286078199, 0.7248737653961077]","[0.5859915548606365, 0.7328477104823524, 0.7273094658960028, 0.7989864253393665, 0.880346732052094, 1.0, 0.7644677859281759, 0.7664958251165599, 0.7946107259792559, 0.7581354849861067, 0.7658341286078199, 0.8510934790255182]"
label,0,0,0,1,1
weights,0.76,0.76,0.76,0.24,0.24


In [32]:
from pyspark.ml.stat import ChiSquareTest

chi_test = ChiSquareTest.test(df2, 'chisq_features', 'label').head()
print("p-values: " + str(chi_test.pValues))
print("degrees of freedom: " + str(chi_test.degreesOfFreedom))
print("statistics: " + str(chi_test.statistics))

p-values: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.8788319539341245e-07,0.0,0.0,0.0,0.0]
degrees of freedom: [6, 3, 6, 54, 7, 1, 3, 2, 357, 1, 2, 3]
statistics: [219644.72702182888,111644.78580594399,216554.18445850047,105364.29108818347,5327021.999998463,5391561.196502638,10369.434756809702,30.121422020958914,706411.6206296235,189581.3464073926,4043.911463637122,958460.8642184382]
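
For reference, the statistic reported for each feature is Pearson's chi-square computed on the contingency table of feature value against label. Since the label is binary, the degrees of freedom equal the number of distinct feature values minus one. A small plain-Python sketch on hypothetical counts (`chi_square` is an illustrative helper, not a Spark API):

```python
def chi_square(table):
    """Pearson chi-square statistic and degrees of freedom for a contingency table."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)
    statistic = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            # Expected count if the feature and the label were independent
            expected = row_totals[i] * col_totals[j] / total
            statistic += (observed - expected) ** 2 / expected
    dof = (len(table) - 1) * (len(table[0]) - 1)
    return statistic, dof

# Hypothetical counts: rows are feature values, columns are denied / approved
statistic, dof = chi_square([[10, 20], [20, 10]])
print(statistic, dof)
```

With tens of millions of rows, even weak associations produce very large statistics and p-values that round to 0.0, so the p-values alone say little about how useful a feature is.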


In [33]:
df_scaled = df_scaled.drop(col("features"))
encfeatures.append('continuous_features_scaled')

assembler2 = VectorAssembler(inputCols = encfeatures, outputCol = 'features')
df_scaled = assembler2.transform(df_scaled)

df2 = df_scaled.select('features', 'label', 'weights')
pd.DataFrame(df2.take(5), columns=df2.columns).transpose()

Unnamed: 0,0,1,2,3,4
features,"[0.5859915548606365, 0.8069451212522806, 0.7273094658960028, 0.808127549157317, 0.8746725172972699, 0.5266690300283462, 0.7644677859281759, 0.7664958251165599, 0.7751510480738912, 0.7581354849861067, 0.7658341286078199, 0.7248737653961077, 0.1725648471579141, 0.01165464902967626, 0.0]","[0.782999439926836, 0.8069451212522806, 0.8342565417713655, 0.7575528747655684, 0.8629681734737917, 0.5266690300283462, 0.7644677859281759, 0.7664958251165599, 0.7946107259792559, 0.7581354849861067, 0.7658341286078199, 0.8510934790255182, 0.2748254973255669, 0.01566690525300743, 0.0]","[0.7882675639480726, 0.8069451212522806, 0.7273094658960028, 0.7575528747655684, 0.880346732052094, 0.5266690300283462, 0.7644677859281759, 0.7664958251165599, 0.7751510480738912, 0.7581354849861067, 0.7658341286078199, 0.552056992157519, 0.0447390344483481, 0.023882477519828398, 0.0]","[0.782999439926836, 0.7425761525555532, 0.8342565417713655, 0.7575528747655684, 0.8746725172972699, 1.0, 0.7660516003450845, 0.7664958251165599, 0.7946107259792559, 0.7581354849861067, 0.7658341286078199, 0.7248737653961077, 0.4409990538480027, 0.029232152484269962, 3.251283374541778]","[0.5859915548606365, 0.7328477104823524, 0.7273094658960028, 0.7989864253393665, 0.880346732052094, 1.0, 0.7644677859281759, 0.7664958251165599, 0.7946107259792559, 0.7581354849861067, 0.7658341286078199, 0.8510934790255182, 0.1725648471579141, 0.006496033885393325, 1.7394366053798511]"
label,0,0,0,1,1
weights,0.76,0.76,0.76,0.24,0.24


In [34]:
train, test = df2.randomSplit([0.8, 0.2], seed = 7)

### Building Models

In [35]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn import metrics
import plotly.graph_objects as go
import plotly.figure_factory as ff

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes

def crossval_training(model):
    cv = CrossValidator(estimator = model, estimatorParamMaps = ParamGridBuilder().build(), 
                        evaluator = BinaryClassificationEvaluator(), 
                        numFolds = 5, parallelism = 4, seed = 7)
    cv_model = cv.fit(train)
    
    predictions = cv_model.transform(test)
    predictions_df = predictions.select('label', 'prediction').toPandas()
    report = metrics.classification_report(predictions_df['label'], predictions_df['prediction'])
    matrix = metrics.confusion_matrix(predictions_df['label'], predictions_df['prediction'])

    return {'model': cv_model, 'predictions': predictions, 'predictions_df': predictions_df, 
            'report': report, 'matrix': matrix}

def make_heatmap(matrix, title, colorscale):
    fig = ff.create_annotated_heatmap(matrix, 
                                      x = ['Predicted - Denied (0)', 'Predicted - Approved (1)'], 
                                      y = ['Actual - Denied (0)', 'Actual - Approved (1)'], 
                                      annotation_text = matrix, colorscale = colorscale)
    
    fig.update_layout(title = 'Loan Predictions: ' + title)
    fig.show()
    
def plot_roc_curve(model, title):
    summary = model.bestModel.summary
    roc = summary.roc.toPandas()

    fig = go.Figure(data = go.Scatter(x = roc['FPR'], y = roc['TPR']))
    fig.update_layout(title = 'Loan Predictions: ' + title)
    fig.show()
    
    print('AUC: ' + str(summary.areaUnderROC))

When choosing metrics to evaluate these models, it's important to take a couple of things into consideration. First of all, due to the heavy class imbalance, accuracy won't be a satisfactory metric. To illustrate (keeping roughly the 76-24 class ratio of this dataset), suppose there were 100 rows and we predicted all 76 approved (1) cases correctly and all 24 denied (0) cases incorrectly. The accuracy would be 76%. This seems like a solid number, but the model wasn't able to predict anything from the other class, so it's useless!

This is where precision and recall come in, as they are much more useful. Precision is defined as: true positives / (true positives + false positives). It measures how often the model is correct when it predicts the positive class. Recall, on the other hand, is defined as: true positives / (true positives + false negatives). It measures how many of the actual positives in the data the model finds. The F1-score balances the two: it is the harmonic mean of precision and recall, formally defined as 2 * ((precision * recall) / (precision + recall)).
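
The three definitions can be checked with a few lines of plain Python. This is a minimal sketch on hypothetical confusion counts for the approved (1) class; `precision_recall_f1` is an illustrative helper, not part of the notebook:

```python
def precision_recall_f1(tp, fp, fn):
    """Compute precision, recall and F1 from confusion-matrix counts."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * (precision * recall) / (precision + recall)
    return precision, recall, f1

# Hypothetical counts: 80 true positives, 10 false positives, 20 false negatives
precision, recall, f1 = precision_recall_f1(tp=80, fp=10, fn=20)
print(round(precision, 3), round(recall, 3), round(f1, 3))
```

Note that the F1-score always lies between precision and recall and sits closer to the smaller of the two.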

From the perspective of the bank, financial institution, or loan issuer, precision could be the preferred metric, as it matters most when the cost of false positives is high. For example, if a bank were to approve a loan that should have been denied, there is a chance that the borrower won't pay it back, which is a huge long-term cost. False negatives could hurt the bank in another way: repeatedly denying loans that should have been approved might bring the bank fewer customers and cost it potential profits. If one values the cost of false negatives more than that of false positives, recall is the metric to pay attention to.

In [59]:
# Logistic Regression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', weightCol = 'weights', 
                        maxIter = 10, regParam = 0.07, elasticNetParam = 0.4)

lr_result = crossval_training(lr)

In [60]:
make_heatmap(lr_result['matrix'], 'Logistic Regression Confusion Matrix', 'Greens')

In [61]:
print(lr_result['report'])

              precision    recall  f1-score   support

           0       0.68      0.63      0.65    804574
           1       0.89      0.91      0.90   2644083

    accuracy                           0.84   3448657
   macro avg       0.78      0.77      0.78   3448657
weighted avg       0.84      0.84      0.84   3448657



In [62]:
evaluator = BinaryClassificationEvaluator()
print('Logistic Regression areaUnderROC: {}'.format(evaluator.evaluate(lr_result['predictions'])))

Logistic Regression areaUnderROC: 0.9235463819860802
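
One way to read the area under the ROC curve is as the probability that a randomly chosen approved loan receives a higher score than a randomly chosen denied loan. A small plain-Python sketch of that pairwise interpretation, on hypothetical scores (`auc_by_pairs` is an illustrative helper and is far too slow for data of this size):

```python
def auc_by_pairs(scores, labels):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical model scores and true labels
scores = [0.9, 0.4, 0.35, 0.6, 0.5]
labels = [1, 1, 0, 1, 0]
auc = auc_by_pairs(scores, labels)
print(auc)
```

Unlike the precision and recall figures in the classification report, this measure does not depend on the threshold used to turn scores into predictions.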


In [41]:
# Random Forest

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', weightCol = 'weights')

rf_result = crossval_training(rf)

In [42]:
make_heatmap(rf_result['matrix'], 'Random Forest Confusion Matrix', 'Greens')

In [43]:
print(rf_result['report'])

              precision    recall  f1-score   support

           0       0.52      1.00      0.68    804574
           1       1.00      0.72      0.83   2644083

    accuracy                           0.78   3448657
   macro avg       0.76      0.86      0.76   3448657
weighted avg       0.89      0.78      0.80   3448657



In [44]:
evaluator = BinaryClassificationEvaluator()
print('Random Forest areaUnderROC: {}'.format(evaluator.evaluate(rf_result['predictions'])))

Random Forest areaUnderROC: 0.9479956947466625


In [45]:
# Gradient-Boosted Trees

gbt = GBTClassifier(maxIter=10, weightCol = 'weights')

gbt_result = crossval_training(gbt)

In [46]:
make_heatmap(gbt_result['matrix'], 'Gradient-Boosted Tree Confusion Matrix', 'Greens')

In [47]:
print(gbt_result['report'])

              precision    recall  f1-score   support

           0       0.57      0.97      0.72    804574
           1       0.99      0.78      0.87   2644083

    accuracy                           0.82   3448657
   macro avg       0.78      0.87      0.79   3448657
weighted avg       0.89      0.82      0.83   3448657



In [48]:
evaluator = BinaryClassificationEvaluator()
print('Gradient-Boosted Tree areaUnderROC: {}'.format(evaluator.evaluate(gbt_result['predictions'])))

Gradient-Boosted Tree areaUnderROC: 0.9527209056329249


In [49]:
# Naive Bayes

nb = NaiveBayes(weightCol = 'weights')

nb_result = crossval_training(nb)

In [50]:
make_heatmap(nb_result['matrix'], 'Naive Bayes Confusion Matrix', 'Greens')

In [51]:
print(nb_result['report'])

              precision    recall  f1-score   support

           0       0.30      1.00      0.46    804574
           1       1.00      0.28      0.44   2644083

    accuracy                           0.45   3448657
   macro avg       0.65      0.64      0.45   3448657
weighted avg       0.84      0.45      0.45   3448657



In [52]:
evaluator = BinaryClassificationEvaluator()
print('Naive Bayes areaUnderROC: {}'.format(evaluator.evaluate(nb_result['predictions'])))

Naive Bayes areaUnderROC: 0.11037348363590317


In [53]:
# Support Vector Machine

svm = LinearSVC(maxIter=10, regParam=0.05, weightCol = 'weights')

svm_result = crossval_training(svm)


Precision and F-score are ill-defined and being set to 0.0 in labels with no predicted samples. Use `zero_division` parameter to control this behavior.



In [54]:
make_heatmap(svm_result['matrix'], 'Support Vector Machine Confusion Matrix', 'Greens')

In [55]:
print(svm_result['report'])

              precision    recall  f1-score   support

           0       0.00      0.00      0.00    804574
           1       0.77      1.00      0.87   2644083

    accuracy                           0.77   3448657
   macro avg       0.38      0.50      0.43   3448657
weighted avg       0.59      0.77      0.67   3448657



In [56]:
evaluator = BinaryClassificationEvaluator()
print('Support Vector Machine areaUnderROC: {}'.format(evaluator.evaluate(svm_result['predictions'])))

Support Vector Machine areaUnderROC: 0.9224175483282917
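It may look odd that a model can report a high areaUnderROC while never predicting class 0. AUC only measures how well the raw scores rank positives above negatives; it does not depend on the cutoff used to turn scores into labels. The pure-Python sketch below illustrates this with hypothetical scores (it does not use the notebook's actual predictions, and `roc_auc` is an illustrative helper, not a Spark function). Whether this is what happened with the SVM here is an assumption that would need the raw scores to confirm.

```python
def roc_auc(labels, scores):
    """AUC as the chance that a random positive outscores a random negative."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    # Count pairwise wins; ties count as half a win
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


labels = [0, 0, 1, 1, 1, 1]
scores = [0.2, 0.6, 0.5, 0.7, 0.8, 0.9]  # hypothetical raw scores, all above 0

auc = roc_auc(labels, scores)

# With a cutoff of 0, every row is labeled 1, so no denial is ever predicted
preds = [1 if s > 0.0 else 0 for s in scores]
true_neg = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 0)
recall_0 = true_neg / labels.count(0)

print(auc, recall_0)  # prints: 0.875 0.0
```

The ranking is good (7 of the 8 positive/negative pairs are ordered correctly), yet recall for class 0 is zero because the cutoff sits below every score.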


### Conclusion

When it comes to evaluating the performance of the classifiers above, multiple angles must be considered. Most of them reached an accuracy at least in the high 70s (percent), but as stated earlier, this is certainly not the best standard to go by. Weight balancing had to be performed, since training the models without weights yielded high accuracies but terrible recall of the negative class (a maximum in the high 50s). Both the Decision Tree and Random Forest models surprisingly provided identical, yet disappointing, results. They had almost as many false negatives as true negatives and not a single false positive. The Gradient-Boosted Tree improved slightly, yet it and the other two tree-based models were heavily skewed toward predicting the negative/minority class (0 - loan denials), most likely due to the weighting. The Naive Bayes model was by far the worst, with an accuracy of 45% and a macro-averaged F1-score of 0.45. Looking at its confusion matrix/classification report, even though it correctly identified all the denied loans present in the data, it predicted far too many denials and not enough approvals. The Support Vector Machine failed in the opposite direction: it never predicted a denial, giving a recall of 0.00 for class 0. Surprisingly, simple logistic regression gave the best results, with an accuracy of 84% and a macro-averaged F1-score of 0.78 after tweaking some hyperparameters.
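For readers unfamiliar with weight balancing, the sketch below shows one common scheme: inverse class frequency, where each class gets the weight n_samples / (n_classes * class_count). This is an assumption for illustration only; the `weights` column used by the models is defined earlier in the notebook and may have been computed differently. The class counts are the supports from the classification reports above, and `balanced_class_weights` is a hypothetical helper.

```python
def balanced_class_weights(counts):
    """Map each class label to n_samples / (n_classes * class_count)."""
    total = sum(counts.values())
    n_classes = len(counts)
    return {label: total / (n_classes * n) for label, n in counts.items()}


# Supports from the classification reports: 0 = denied, 1 = approved
counts = {0: 804574, 1: 2644083}
weights = balanced_class_weights(counts)

for label, weight in weights.items():
    print(f"class {label}: weight {weight:.3f}")
```

Under this scheme the minority class (denials) receives a weight above 1 and the majority class a weight below 1, so that both classes contribute equally to the loss in total. That also hints at why a weighted model may start over-predicting denials.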

Besides model performance, there were several other drawbacks and areas for improvement. Currently, the dataset provides applicants' credit score types but not the scores themselves. Scores are hugely influential in determining whether or not to approve a loan/mortgage, but publishing them in future releases would be a privacy risk. A possible solution for the CFPB could be to divide the scores into several bins of specified ranges, which would keep the data reasonably anonymous.

Also, the Spark ML library itself was not the easiest to work with. First of all, there doesn't appear to be a native API to view confusion matrices. In this notebook, the predictions had to be converted into a Pandas DataFrame, processed in scikit-learn, and visualized as a heatmap with Plotly. Additionally, Spark ML doesn't natively support as wide a variety of feature encoding techniques as scikit-learn does (here, target/mean encoding & leave-one-out encoding had to be implemented separately). Furthermore, Spark ML could support more model types, such as XGBoost, or different kernel types for Support Vector Machines (polynomial, RBF, etc.).

Another item of note is the run time for this notebook, which could be sped up with better hardware. The whole process took many hours, with the tree-based models taking the longest to train. Inspecting Terminal during model training suggested that insufficient memory for caching the RDDs was a cause, though the notebook was also running on a weak, 4-year-old MacBook CPU. As a result, the notebook had to be rerun several times to get a complete output, as the kernel would occasionally die.
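Returning to the credit score binning idea raised above, a minimal sketch could look like the following. The bin edges, labels, and the `bin_score` helper are all hypothetical; they are not taken from the CFPB or from the HMDA data dictionary.

```python
from bisect import bisect_right

# Hypothetical bin edges and labels, for illustration only
EDGES = [580, 670, 740, 800]
LABELS = ["<580", "580-669", "670-739", "740-799", "800+"]


def bin_score(score):
    """Return the label of the range that contains the given score."""
    # bisect_right counts how many edges are <= score, which is the bin index
    return LABELS[bisect_right(EDGES, score)]


for score in (550, 580, 705, 799, 820):
    print(score, bin_score(score))
```

Publishing only the bin label would still let a model use creditworthiness as a feature while revealing much less about any individual applicant.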

References:

1. Data Dictionary/Definitions: https://ffiec.cfpb.gov/documentation/2019/lar-data-fields/
2. Spark MLlib Documentation: https://spark.apache.org/docs/latest/ml-guide.html