###**Set up environment**

In [26]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

[33m0% [Working][0m            Hit:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.38)] [Co[0m                                                                               Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
                                                                               Get:3 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
[33m0% [3 InRelease 15.6 kB/114 kB 14%] [Connecting to security.ubuntu.com (91.189.[0m                                                                               Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
[33m0% [3 InRelease 15.6 kB/114 kB 14%] [Connecting to security.ubuntu.com (91.189.[0m                                                                               Hit:5 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
[33m0% [3 InRelease 15.6 kB

In [27]:
import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession.builder \
       .appName("Credit Risk Analysis") \
       .config("spark.driver.memory", "8g") \
       .config("spark.executor.memory", "8g") \
       .getOrCreate()


spark

In [28]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


##**Import Libraries**

In [52]:
from pyspark.sql.functions import count, when, col, isnan, when, count, isnan, to_date, dayofmonth, month, year
from pyspark.sql.types import StructType,StructField, DateType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

##**Read in Data**

In [30]:
filepath = '/content/drive/My Drive/data.csv'

In [31]:
# Define a custom date format
dateFormat = "dd/MM/yyyy"

schema = StructType([
  StructField("issue_d", DateType()),
  StructField("last_pymnt_d", DateType()),
  StructField("earliest_cr_line", DateType()),
  StructField("last_credit_pull_d", DateType()),
])

df = spark.read.csv(filepath, header = True, inferSchema = True, dateFormat=dateFormat)

##**Exploratory Data Analysis**

In [32]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- member_id: integer (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest

In [33]:
df.describe().show()

+-------+--------------------+--------------------+------------------+------------------+------------------+----------+------------------+------------------+------+---------+--------------------+----------+--------------+-----------------+-------------------+---------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+----------------------+----------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+-----------------+--------------------+--------------------+------------------+--------------------+------------------+-----------------------+------------------+------------------+------------------+------------------+--------------------------+---------------------------+------------------+-----------------+-------------

In [34]:
# Identity missing values and outliers
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show(vertical=True)

-RECORD 0-----------------------------
 id                          | 0      
 member_id                   | 0      
 loan_amnt                   | 0      
 funded_amnt                 | 0      
 funded_amnt_inv             | 0      
 term                        | 0      
 int_rate                    | 0      
 installment                 | 0      
 grade                       | 0      
 sub_grade                   | 0      
 emp_title                   | 49439  
 emp_length                  | 0      
 home_ownership              | 0      
 annual_inc                  | 0      
 verification_status         | 0      
 issue_d                     | 0      
 pymnt_plan                  | 0      
 desc                        | 734155 
 purpose                     | 1      
 title                       | 33     
 zip_code                    | 1      
 addr_state                  | 1      
 dti                         | 1      
 delinq_2yrs                 | 1      
 earliest_cr_line        

In [35]:
df.columns

['id',
 'member_id',
 'loan_amnt',
 'funded_amnt',
 'funded_amnt_inv',
 'term',
 'int_rate',
 'installment',
 'grade',
 'sub_grade',
 'emp_title',
 'emp_length',
 'home_ownership',
 'annual_inc',
 'verification_status',
 'issue_d',
 'pymnt_plan',
 'desc',
 'purpose',
 'title',
 'zip_code',
 'addr_state',
 'dti',
 'delinq_2yrs',
 'earliest_cr_line',
 'inq_last_6mths',
 'mths_since_last_delinq',
 'mths_since_last_record',
 'open_acc',
 'pub_rec',
 'revol_bal',
 'revol_util',
 'total_acc',
 'initial_list_status',
 'out_prncp',
 'out_prncp_inv',
 'total_pymnt',
 'total_pymnt_inv',
 'total_rec_prncp',
 'total_rec_int',
 'total_rec_late_fee',
 'recoveries',
 'collection_recovery_fee',
 'last_pymnt_d',
 'last_pymnt_amnt',
 'next_pymnt_d',
 'last_credit_pull_d',
 'collections_12_mths_ex_med',
 'mths_since_last_major_derog',
 'policy_code',
 'application_type',
 'annual_inc_joint',
 'dti_joint',
 'verification_status_joint',
 'acc_now_delinq',
 'tot_coll_amt',
 'tot_cur_bal',
 'open_acc_6m',
 '

In [36]:
# Drop columns which I think is unnecessary after checking the data dictionary
df = df.drop('id','member_id','desc','emp_length','emp_title','title','zip_code','addr_state')

#### Show percentage of missing values for each column

In [37]:
def calculate_missing_values(df):
    missing_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()
    missing_percentages = (missing_counts / df.count()) * 100
    styled_output = missing_percentages.style.format("{:.2f}%").background_gradient(cmap='Reds', axis=1)
    return styled_output

calculate_missing_values(df)

Unnamed: 0,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,home_ownership,annual_inc,verification_status,issue_d,pymnt_plan,purpose,dti,delinq_2yrs,earliest_cr_line,inq_last_6mths,mths_since_last_delinq,mths_since_last_record,open_acc,pub_rec,revol_bal,revol_util,total_acc,initial_list_status,out_prncp,out_prncp_inv,total_pymnt,total_pymnt_inv,total_rec_prncp,total_rec_int,total_rec_late_fee,recoveries,collection_recovery_fee,last_pymnt_d,last_pymnt_amnt,next_pymnt_d,last_credit_pull_d,collections_12_mths_ex_med,mths_since_last_major_derog,policy_code,application_type,annual_inc_joint,dti_joint,verification_status_joint,acc_now_delinq,tot_coll_amt,tot_cur_bal,open_acc_6m,open_il_6m,open_il_12m,open_il_24m,mths_since_rcnt_il,total_bal_il,il_util,open_rv_12m,open_rv_24m,max_bal_bc,all_util,total_rev_hi_lim,inq_fi,total_cu_tl,inq_last_12m,default_ind
0,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,51.37%,84.65%,0.01%,0.01%,0.01%,0.06%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,0.00%,1.04%,0.00%,29.53%,0.01%,0.01%,75.08%,0.01%,0.01%,99.93%,99.93%,99.94%,0.02%,7.85%,7.85%,98.44%,98.44%,98.44%,98.44%,98.49%,98.44%,98.64%,98.45%,98.45%,98.45%,98.45%,7.86%,98.45%,98.45%,98.45%,0.03%


#### Drop columns where it is missing more than 25% of data

In [38]:
# Threshold for dropping columns
threshold = 0.25
for col in df.columns:
    missing_percentage = df.filter(df[col].isNull() | isnan(df[col])).count() / df.count()
    if missing_percentage > threshold:
        df = df.drop(col)

In [39]:
#calculate_missing_values(df)

#### Drop remaining rows which consists of missing values

In [40]:
df = df.dropna()
print('Number of rows: ', df.count())
print('Number of columns: ', len(df.columns))

Number of rows:  779489
Number of columns:  44


#### Convert date columns to date

In [42]:
date_columns = ['issue_d', 'last_pymnt_d', 'earliest_cr_line', 'last_credit_pull_d']
date_format = "dd-MM-yyyy"

for col_name in date_columns:
    # Convert column to date type
    df = df.withColumn(col_name, to_date(df[col_name], date_format))

    # Extract the components of the date
    df = df.withColumn(f"{col_name}_day", dayofmonth(df[col_name]))
    df = df.withColumn(f"{col_name}_month", month(df[col_name]))
    df = df.withColumn(f"{col_name}_year", year(df[col_name]))

In [43]:
# Drop date column
df = df.drop(*date_columns)

In [44]:
df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- inq_last_6mths: string (nullable = true)
 |-- open_acc: string (nullable = true)
 |-- pub_rec: string (nullable = true)
 |-- revol_bal: string (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- total_acc: string (nullable = true)
 |-- initial_list_status: string (nullable = true)
 |-- out_prncp: string (nullable = true)
 |-- out_prncp_inv: string

#### Encode categorical columns using String Index and OneHotEncoder. StringIndex is used to assign a unique value to each categorical column and the One hot encoder transform the values into a binary vector. I used a Pipeline to automate the the encoding process. Then I use Vector assembler to combine all my features into a single vector

In [45]:
# Get numerical column names
numerical_columns = [column[0] for column in df.dtypes if column[1] in ['int', 'double']]

# Remove 'label_ind' from numerical_columns
if 'label_ind' in numerical_columns:
    numerical_columns.remove('label_ind')

# Get categorical columns
categorical_columns = [column[0] for column in df.dtypes if column[1] == 'string']

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in categorical_columns]

encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_vec") for column in categorical_columns]

# Add numerical columns directly to feature vector
assembler_input = [encoder.getOutputCol() for encoder in encoders] + numerical_columns

# Use VectorAssembler to combine all the OneHotEncoded and numerical columns into one vector column
assembler = VectorAssembler(inputCols=assembler_input, outputCol="features")

# Set up the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit the pipeline to your data
model = pipeline.fit(df)

# Transform your data
df_transformed = model.transform(df)


In [46]:
df_transformed = df_transformed.select("default_ind", "features")

#### Using stratified sampling, split the dataframe into training and testing sets

In [47]:
label = "default_ind"

# Had to set a sample size from the df as the training time was too long, causing my spark connection to keep crashing
sample_ratio = 0.2

# Do stratified sampling for train-test split
fractions = df_transformed.select(label).distinct().withColumn("fraction", F.lit(sample_ratio)).rdd.collectAsMap()
train_df = df_transformed.sampleBy(label, fractions, seed=42)
test_df = df_transformed.subtract(train_df)

#### Logistic Regression for training

In [48]:
# Define Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol=label)

In [49]:
# Fit the model to the data
lr_model = lr.fit(train_df)

In [50]:
# Make predictions on test data
lr_predictions = lr_model.transform(test_df)

#### Evaluate results

In [56]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol="default_ind")

# Compute metrics
accuracy = multi_evaluator.evaluate(lr_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(lr_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(lr_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(lr_predictions, {multi_evaluator.metricName: "f1"})

print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F1 score: ", f1)

Accuracy:  0.9746453883643107
Precision:  0.9728561652819363
Recall:  0.9746453883643107
F1 score:  0.9714755756779909


#### Save the pipeline for future use

In [57]:
model.save('lr_Pipeline')