# Kevin Ferdinand 0358519 Capstone Code


## Import Modules and Install PySpark

In [1]:
!pip list

Package                 Version
----------------------- --------
absl-py                 1.3.0
aiohttp                 3.8.3
aiosignal               1.2.0
altair                  4.1.0
anyio                   3.5.0
appdirs                 1.4.4
argon2-cffi             21.3.0
argon2-cffi-bindings    21.2.0
asttokens               2.0.5
astunparse              1.6.3
async-timeout           4.0.2
attrs                   22.1.0
autovizwidget           0.20.0
Babel                   2.11.0
backcall                0.2.0
beautifulsoup4          4.12.2
bleach                  4.1.0
blinker                 1.4
Bottleneck              1.3.5
brotlipy                0.7.0
cachetools              4.2.2
certifi                 2023.5.7
cffi                    1.15.1
charset-normalizer      2.0.4
click                   8.0.4
colorama                0.4.6
comm                    0.1.2
contourpy               1.0.5
cryptography            39.0.1
cycler                  0.11.0
debugpy                 1

In [2]:
!pip install pyspark



In [3]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


In [4]:
import numpy as np
import pandas as pd
import tensorflow as tf
import seaborn as sns
from datetime import datetime
from matplotlib import pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, FloatType
from pyspark.sql.functions import when, expr
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.tree import GradientBoostedTrees
from pyspark.ml.classification import GBTClassifier
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F
from pyspark.sql.functions import col,count, explode, array, lit,mean, sum as spark_sum
from pyspark.ml import PipelineModel
import xgboost as xgb
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

## Create SparkSession and SparkContext Objects

In [None]:
spark = SparkSession.builder.master('local').appName('solution').config('spark.executor.memory', '8g').config('spark.driver.memory', '8g').getOrCreate()
sc = spark.sparkContext

### Load Data as RDDs - txt to Dataframe


In [None]:
# Load Each Txt File and use spark to convert it into a Pipeline RDD
loanOriginationRDD2017 = sc.textFile(data_dir+'sample_orig_2017.txt').map(lambda x: x.split('|'))
loanOriginationRDD2018 = sc.textFile(data_dir+'sample_orig_2018.txt').map(lambda x: x.split('|'))
loanOriginationRDD2019 = sc.textFile(data_dir+'sample_orig_2019.txt').map(lambda x: x.split('|'))
loanOriginationRDD2020 = sc.textFile(data_dir+'sample_orig_2020.txt').map(lambda x: x.split('|'))

In [None]:
#loanOriginationRDD = loanOriginationRDD2017.union(loanOriginationRDD2018).union(loanOriginationRDD2019).union(loanOriginationRDD2020)
loanOriginationRDD = loanOriginationRDD2018.union(loanOriginationRDD2019).union(loanOriginationRDD2020)

In [None]:
monthlyPerformanceRDD2017 = sc.textFile(data_dir+'sample_svcg_2017.txt').map(lambda x: x.split('|'))
monthlyPerformanceRDD2018 = sc.textFile(data_dir+'sample_svcg_2018.txt').map(lambda x: x.split('|'))
monthlyPerformanceRDD2019 = sc.textFile(data_dir+'sample_svcg_2019.txt').map(lambda x: x.split('|'))
monthlyPerformanceRDD2020 = sc.textFile(data_dir+'sample_svcg_2020.txt').map(lambda x: x.split('|'))

#monthlyPerformanceRDD = monthlyPerformanceRDD2017.union(monthlyPerformanceRDD2018).union(monthlyPerformanceRDD2019).union(monthlyPerformanceRDD2020)
monthlyPerformanceRDD = monthlyPerformanceRDD2018.union(monthlyPerformanceRDD2019).union(monthlyPerformanceRDD2020)


### Check Content

In [None]:
print('There are', loanOriginationRDD.count(), 'records in the loan origination dataset.')
print('There are', monthlyPerformanceRDD.count(), 'records in the monthly performance dataset.')

## Convert it into DataFrame

In [None]:
# Create the Loan Origination DataFrame from the Loan Origination RDD.
# Next, assign new column names to that DataFrame.
loanOriginationDF = spark.createDataFrame(loanOriginationRDD).toDF(
    'CREDIT_SCORE', 
    'FIRST_PAYMENT_DATE',
    'FIRST_TIME_HOMEBUYER_FLAG',
    'MATURITY_DATE',
    'METROPOLITAN_STATISTICAL_AREA',
    'MORTGAGE_INSURANCE_PERCENTAGE',
    'NUMBER_OF_UNITS',
    'OCCUPANCY_STATUS',
    'ORIGINAL_COMBINED_LOAN_TO_VALUE',
    'ORIGINAL_DEBT_TO_INCOME_RATIO',
    'ORIGINAL_UPB',
    'ORIGINAL_LOAN_TO_VALUE',
    'ORIGINAL_INTEREST_RATE',
    'CHANNEL',
    'PREPAYMENT_PENALTY_MORTGAGE_FLAG',
    'AMORTIZATION_TYPE',
    'PROPERTY_STATE',
    'PROPERTY_TYPE',
    'POSTAL_CODE',
    'LOAN_SEQUENCE_NUMBER',
    'LOAN_PURPOSE',
    'ORIGINAL_LOAN_TERM',
    'NUMBER_OF_BORROWERS',
    'SELLER_NAME',
    'SERVICER_NAME',
    'SUPER_CONFORMING_FLAG',
    'PRERELIEF_REFINANCE_LOAN_SEQUENCE_NUMBER',
    'PROGRAM_INDICATOR',
    'RELIEF_REFINANCE_INDICATOR',
    'PROPERTY_VALUATION_METHOD',
    'INTEREST_ONLY_INDICATOR')

### Extract DELINQUENT from Monthly Performance RDD

In [None]:
dictionary = {}
# Txt File states that 0 = no delinquency , 1 = have a delinquent 
# Extract these coreesponding to the Loan Sequence Number and check if there is any instance of delinquency
def fillDictionary(record):
    loanSequenceNumber = record[0]
    currentLoanDelinquencyStatus = record[3]
    delinquent = 0 if currentLoanDelinquencyStatus == '0' else 1
    if loanSequenceNumber in dictionary:
        if dictionary[loanSequenceNumber] == 0:
            dictionary[loanSequenceNumber] = delinquent
    else:
        dictionary[loanSequenceNumber] = delinquent

monthlyPerformanceData = monthlyPerformanceRDD.collect()

for record in monthlyPerformanceData:
    fillDictionary(record)
print('The dictionary contains', len(dictionary), 'key-value pairs.')

### Create Delinquent DataFrame

In [None]:
delinquentDF = spark.createDataFrame(dictionary.items(), schema=['LOAN_SEQUENCE_NUMBER', 'DELINQUENT'])
delinquentDF.printSchema()

Now Join them with the other file to get a pair of person data and delinquency

In [None]:
df = loanOriginationDF.join(delinquentDF, 'LOAN_SEQUENCE_NUMBER')  # inner join

loanInDelinquencyCount = df.filter(df['DELINQUENT'] == 1).count()
loanNotInDelinquencyCount = df.filter(df['DELINQUENT'] == 0).count()
loanCount = loanInDelinquencyCount + loanNotInDelinquencyCount

print('There are', loanInDelinquencyCount, 'loans in delinquency.')
print('There are', loanNotInDelinquencyCount, 'loans not in delinquency.')
print('There are', loanCount, 'available loans in total.')

### Drop Columns that is not important

In [None]:
columnsToDrop = [
    'FIRST_PAYMENT_DATE',
    'MATURITY_DATE',
    'MORTGAGE_INSURANCE_PERCENTAGE',
    'ORIGINAL_UPB',
    'PREPAYMENT_PENALTY_MORTGAGE_FLAG',
    'PROPERTY_STATE',
    'LOAN_SEQUENCE_NUMBER',
    'SELLER_NAME',
    'SERVICER_NAME',
    'SUPER_CONFORMING_FLAG',
    'PRERELIEF_REFINANCE_LOAN_SEQUENCE_NUMBER',
    'PROGRAM_INDICATOR',
    'RELIEF_REFINANCE_INDICATOR',
    'PROPERTY_VALUATION_METHOD',
    'INTEREST_ONLY_INDICATOR',
    'AMORTIZATION_TYPE'
]

df = df.drop(*columnsToDrop)

In [None]:
# Check Datatypes
df.printSchema()


Change some Datatypes to Double

In [None]:
df = df.withColumn('CREDIT_SCORE', df['CREDIT_SCORE'].cast(DoubleType()))
df = df.withColumn('METROPOLITAN_STATISTICAL_AREA', df['METROPOLITAN_STATISTICAL_AREA'].cast(DoubleType()))
df = df.withColumn('NUMBER_OF_UNITS', df['NUMBER_OF_UNITS'].cast(DoubleType()))
df = df.withColumn('ORIGINAL_COMBINED_LOAN_TO_VALUE', df['ORIGINAL_COMBINED_LOAN_TO_VALUE'].cast(DoubleType()))
df = df.withColumn('ORIGINAL_DEBT_TO_INCOME_RATIO', df['ORIGINAL_DEBT_TO_INCOME_RATIO'].cast(DoubleType()))
df = df.withColumn('ORIGINAL_LOAN_TO_VALUE', df['ORIGINAL_LOAN_TO_VALUE'].cast(DoubleType()))
df = df.withColumn('POSTAL_CODE', df['POSTAL_CODE'].cast(DoubleType()))
df = df.withColumn('ORIGINAL_LOAN_TERM', df['ORIGINAL_LOAN_TERM'].cast(DoubleType()))
df = df.withColumn('NUMBER_OF_BORROWERS', df['NUMBER_OF_BORROWERS'].cast(DoubleType()))
df = df.withColumn('ORIGINAL_INTEREST_RATE', df['ORIGINAL_INTEREST_RATE'].cast(DoubleType()))
df.printSchema()

In [None]:
nullCountDF = df.toPandas()

In [None]:
nullCountDF.isnull().sum()

### Make 2 functions to help analyze categorical and continuos data

In [None]:
# Analyse categorical features.
# the Dataframe with Selected Attributes, Selected Column Name , The Figure Size, Title , Rotation
def analyseCategoricalData(df, column, fig_size=(12, 7), title=None, rot=90):
    # Print Unique Values and Missing Values 
    print("Number of unique values: {}\n".format(len(df[column].unique())))
    print("Number of missing values: {}\n".format(df[column].isnull().sum()))
    # Count the Missing Values
    df = np.round(df[column].value_counts(normalize=True, ascending=False, dropna=False) * 100, 2)
    if True in df.index.isnull():
        df.index = df.index.fillna("Missing Values") 
    print(df)
    # Plot the Figure using pyplot 
    fig = plt.figure(figsize=fig_size)
    ax = df.plot.bar()
    for p in ax.patches:
        ax.annotate(str(p.get_height()), (p.get_x() * 1.005, p.get_height() * 1.005))
    plt.title(title)
    plt.xticks(rotation=rot)
    # Save Graph
    plt.savefig(column + '.png', dpi=fig.dpi)
    plt.show()

def analyseContinuousData(df, column, xlabel="", figSize=(12, 7)):
    # Print Missing Values and Statistics such as Mean, std,count ....
    print("Number of missing values: {}\n".format(df[column].isnull().sum()))
    print('Descriptive statistics:\n')
    print(df[column].describe())

    # Plot the graphs Make it as density values to better represent it
    fig, ax = plt.subplots()
    df.loc[df.DELINQUENT == True, column].plot.hist(bins=50, density=True, ax=ax, alpha=0.4, label='Delinquent')
    df.loc[df.DELINQUENT == False, column].plot.hist(bins=50, density=True, ax=ax, alpha=0.4, label='Non-Delinquent')
    plt.title("Delinquent vs. Non-Delinquent")
    plt.xlabel(xlabel)
    plt.ylabel('Density Value')
    plt.legend()
    # Save Graph
    plt.savefig(column + '.png', dpi=fig.dpi)
    plt.show()

## Exploratory Data Analysis

### Explore Correlation Heatmap

In [None]:
pandas_df = df.toPandas()

In [None]:
# Calculate the correlation matrix
correlation_matrix = pandas_df.corr()

# Create a heatmap using seaborn
plt.figure(figsize=(10, 8))  # Set the size of the heatmap
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm')

# Add labels and title
plt.xlabel('Attributes')
plt.ylabel('Attributes')
plt.title('Correlation Heatmap')

# Display the heatmap
plt.show()

In [None]:
correlation_matrix

### Explore CREDIT_SCORE

In [None]:
# A credit score is unavailable when its value is 9999.
print('The number of loan records with unavailable credit scores:', df.filter(df['CREDIT_SCORE'] == 9999).count())

In [None]:
# Replace the values of unavailable credit scores with None.
df = df.withColumn('CREDIT_SCORE', when(df['CREDIT_SCORE'] != 9999, df['CREDIT_SCORE']).otherwise(None))

In [None]:
# Analyse the CREDIT_SCORE column.
pandasDF = df.select(df['CREDIT_SCORE'], df['DELINQUENT']).toPandas()
analyseContinuousData(pandasDF, column='CREDIT_SCORE', xlabel='Credit Score')

In [None]:
# Replace the values of unavailable credit scores with 700.
df = df.withColumn('CREDIT_SCORE', when(df['CREDIT_SCORE'].isNotNull(), df['CREDIT_SCORE']).otherwise(700.0))

In [None]:
df = df.withColumn('CREDIT_SCORE_CLASS',
                   when(df['CREDIT_SCORE'].between(800, 850), 'Excellent')
                   .when(df['CREDIT_SCORE'].between(740, 799), 'Very Good')
                   .when(df['CREDIT_SCORE'].between(670, 739), 'Good')
                   .when(df['CREDIT_SCORE'].between(580, 669), 'Fair')
                   .when(df['CREDIT_SCORE'].between(300, 579), 'Poor')
                   .otherwise('Unknown'))

In [None]:
# Count unique values and their frequency in the 'attribute' column
unique_values_count = df.groupBy('CREDIT_SCORE_CLASS').agg(count('*').alias('count'))
# Display the result
unique_values_count.show()

In [None]:
pandasDF = df.select(df['CREDIT_SCORE_CLASS'], df['DELINQUENT']).toPandas()
analyseCategoricalData(
    pandasDF,
    column='CREDIT_SCORE_CLASS',
    fig_size=(8, 6),
    title='Credit Score Class',
    rot=0
)

In [None]:
pd.crosstab(pandasDF.CREDIT_SCORE_CLASS, pandasDF.DELINQUENT, normalize='index')

### Explore First Time Homebuyer Flag

In [None]:
# A first-time homebuyer flag is unavailable when its value is '9'.

print('The number of loan records with unavailable first-time homebuyer flags:', df.filter(df['FIRST_TIME_HOMEBUYER_FLAG'] == '9').count())

In [None]:
# Analyse the FIRST_TIME_HOMEBUYER_FLAG column.
pandasDF = df.select(df['FIRST_TIME_HOMEBUYER_FLAG'], df['DELINQUENT']).toPandas()
analyseCategoricalData(
    pandasDF,
    column='FIRST_TIME_HOMEBUYER_FLAG',
    fig_size=(8, 6),
    title='First Time Homebuyer Flag',
    rot=0
)
pd.crosstab(pandasDF.FIRST_TIME_HOMEBUYER_FLAG, pandasDF.DELINQUENT, normalize='index')

### Explore Original Interest Rate

In [None]:
print('Number of loan records with unavailable ORIGINAL_INTEREST_RATE:', df.filter(df['ORIGINAL_INTEREST_RATE'].isNull()).count())

In [None]:
# Analyse the ORIGINAL_INTEREST_RATE column.
pandasDF = df.select(df['ORIGINAL_INTEREST_RATE'], df['DELINQUENT']).toPandas()
analyseContinuousData(pandasDF, column='ORIGINAL_INTEREST_RATE', xlabel='ORIGINAL_INTEREST_RATE')


### Explore METROPOLITAN_STATISTICAL_AREA

In [None]:
# MSA is codes that corresponds to a certain area and have some information about them
print('Number of unique MSA codes:', df.select(df['METROPOLITAN_STATISTICAL_AREA']).distinct().count())
print('Number of loan records with unavailable MSA codes:', df.filter(df['METROPOLITAN_STATISTICAL_AREA'].isNull()).count())

replace null with 0 and keep non null as usuall


In [None]:
df = df.withColumn('METROPOLITAN_STATISTICAL_AREA', when(df['METROPOLITAN_STATISTICAL_AREA'].isNotNull(), df['METROPOLITAN_STATISTICAL_AREA']).otherwise(0.0))

In [None]:
pandasDF = df.select(df['METROPOLITAN_STATISTICAL_AREA'], df['DELINQUENT']).toPandas()
analyseContinuousData(pandasDF, column='METROPOLITAN_STATISTICAL_AREA', xlabel='MSA')
pd.crosstab(pandasDF.METROPOLITAN_STATISTICAL_AREA, pandasDF.DELINQUENT, normalize='index').sort_values(by=1, ascending=False).head()

### Explore NUMBER_OF_UNITS

In [None]:
# The number of units is unavailable when its value is 99.
print('The number of loan records with the unavailable number of units:', df.filter(df['NUMBER_OF_UNITS'] == 99).count())

In [None]:
# Analyse the NUMBER_OF_UNITS column.

pandasDF = df.select(df['NUMBER_OF_UNITS'], df['DELINQUENT']).toPandas()
analyseCategoricalData(
    pandasDF,
    column='NUMBER_OF_UNITS',
    fig_size=(8, 6),
    title='Number of Units',
    rot=0
)
pd.crosstab(pandasDF.NUMBER_OF_UNITS, pandasDF.DELINQUENT, normalize='index').sort_values(by=1, ascending=False)

### Explore OCCUPANCY_STATUS

In [None]:
# An occupancy status is unavailable when its value is '9'.
print('The number of loan records with unavailable occupancy status:', df.filter(df['OCCUPANCY_STATUS'] == '9').count())

In [None]:
# Analyse the OCCUPANCY_STATUS column.
pandasDF = df.select(df['OCCUPANCY_STATUS'], df['DELINQUENT']).toPandas()
analyseCategoricalData(
    pandasDF,
    column='OCCUPANCY_STATUS',
    fig_size=(8, 6),
    title='Occupancy Status',
    rot=0
)
pd.crosstab(pandasDF.OCCUPANCY_STATUS, pandasDF.DELINQUENT, normalize='index').sort_values(by=1, ascending=False)

### Explore ORIGINAL_COMBINED_LOAN_TO_VALUE of property


In [None]:
# An original combined LTV is unavailable when its value is 999.
print('The number of loan records with unavailable original combined LTVs:', df.filter(df['ORIGINAL_COMBINED_LOAN_TO_VALUE'] == 999).count())

In [None]:
# Replace unavailable values with None for the EDA
df = df.withColumn('ORIGINAL_COMBINED_LOAN_TO_VALUE', when(df['ORIGINAL_COMBINED_LOAN_TO_VALUE'] != 999, df['ORIGINAL_COMBINED_LOAN_TO_VALUE']).otherwise(None))

In [None]:
# Analyse the ORIGINAL_COMBINED_LOAN_TO_VALUE column.
pandasDF = df.select(df['ORIGINAL_COMBINED_LOAN_TO_VALUE'], df['DELINQUENT']).toPandas()
analyseContinuousData(pandasDF, column='ORIGINAL_COMBINED_LOAN_TO_VALUE', xlabel='Original Combined Loan-to-Value (LTV)')

In [None]:
# Replace the missing values with the median for training
df = df.withColumn('ORIGINAL_COMBINED_LOAN_TO_VALUE', when(df['ORIGINAL_COMBINED_LOAN_TO_VALUE'].isNotNull(), df['ORIGINAL_COMBINED_LOAN_TO_VALUE']).otherwise(80))

### Explore ORIGINAL_DEBT_TO_INCOME_RATIO

In [None]:
# An original combined LTV is unavailable when its value is 999.
print('The number of loan records with unavailable original DTIR:', df.filter(df['ORIGINAL_DEBT_TO_INCOME_RATIO'] == 999).count())

In [None]:
# Replace unavailable values with None.
df = df \
.withColumn('ORIGINAL_DEBT_TO_INCOME_RATIO', when(df['ORIGINAL_DEBT_TO_INCOME_RATIO'] != 999, df['ORIGINAL_DEBT_TO_INCOME_RATIO']).otherwise(None))

In [None]:
# Analyse the ORIGINAL_DEBT_TO_INCOME_RATIO column.
pandasDF = df.select(df['ORIGINAL_DEBT_TO_INCOME_RATIO'], df['DELINQUENT']).toPandas()
analyseContinuousData(pandasDF, column='ORIGINAL_DEBT_TO_INCOME_RATIO', xlabel='Original Debt-to-Income (DTI) Ratio')

In [None]:
# Replace unavailable values with 999 to check the density values
df = df \
.withColumn('ORIGINAL_DEBT_TO_INCOME_RATIO', when(df['ORIGINAL_DEBT_TO_INCOME_RATIO'].isNotNull(), df['ORIGINAL_DEBT_TO_INCOME_RATIO']).otherwise(999))

In [None]:
# Create a new colomn named ORIGINAL_DEBT_TO_INCOME_RATIO_Class so it will not mix
df = df.withColumn('ORIGINAL_DEBT_TO_INCOME_RATIO_Class', when((df['ORIGINAL_DEBT_TO_INCOME_RATIO'] >= 35) , "High").otherwise("low"))

In [None]:
# Analyse the ORIGINAL_DEBT_TO_INCOME_RATIO_Class colomn.
pandasDF = df.select(df['ORIGINAL_DEBT_TO_INCOME_RATIO_Class'], df['DELINQUENT']).toPandas()
analyseCategoricalData(
    pandasDF,
    column='ORIGINAL_DEBT_TO_INCOME_RATIO_Class',
    fig_size=(8, 6),
    title='ORIGINAL_DEBT_TO_INCOME_RATIO_Class',
    rot=0
)
pd.crosstab(pandasDF.ORIGINAL_DEBT_TO_INCOME_RATIO_Class, pandasDF.DELINQUENT, normalize='index')

In [None]:
# Count unique values and their frequency in the 'attribute' column
unique_values_count = df.groupBy('ORIGINAL_DEBT_TO_INCOME_RATIO_Class').agg(count('*').alias('count'))

# Display the result
unique_values_count.show()

### Explore ORIGINAL_LOAN_TO_VALUE of the property

In [None]:
# An original LTV is unavailable when its value is 999.
print('The number of loan records with unavailable original LTVs:', df.filter(df['ORIGINAL_LOAN_TO_VALUE'] == 999).count())

In [None]:
# Replace unavailable values with None.
df = df.withColumn('ORIGINAL_LOAN_TO_VALUE', when(df['ORIGINAL_LOAN_TO_VALUE'] != 999, df['ORIGINAL_LOAN_TO_VALUE']).otherwise(None))

In [None]:
# Analyse the ORIGINAL_LOAN_TO_VALUE column.
pandasDF = df.select(df['ORIGINAL_LOAN_TO_VALUE'], df['DELINQUENT']).toPandas()
analyseContinuousData(pandasDF, column='ORIGINAL_LOAN_TO_VALUE', xlabel='Original Loan-to-Value (LTV)')

In [None]:
# Replace  the missing values with the median.
df = df.withColumn('ORIGINAL_LOAN_TO_VALUE', when(df['ORIGINAL_LOAN_TO_VALUE'].isNotNull(), df['ORIGINAL_LOAN_TO_VALUE']).otherwise(80))

### Explore CHANNEL

In [None]:
# Analyse the CHANNEL column. 
pandasDF = df.select(df['CHANNEL'], df['DELINQUENT']).toPandas()
analyseCategoricalData(pandasDF, column='CHANNEL', title='Channel')
pd.crosstab(pandasDF.CHANNEL, pandasDF.DELINQUENT, normalize='index').sort_values(by=1, ascending=False)

### Explore PROPERTY_TYPE

SF = Single Family
CO Condomonium
MH Manufactured Housing
CP: Commercial Property
PU: Planned Unit Development

In [None]:
# Analyse the PROPERTY_TYPE #SIngle Family,.

pandasDF = df.select(df['PROPERTY_TYPE'], df['DELINQUENT']).toPandas()
analyseCategoricalData(pandasDF, column='PROPERTY_TYPE',title='Property Type')
pd.crosstab(pandasDF.PROPERTY_TYPE, pandasDF.DELINQUENT, normalize='index').sort_values(by=1, ascending=False)

### Explore POSTAL_CODE

In [None]:
print('Number of loan records with unavailable postal codes:', df.filter(df['POSTAL_CODE'].isNull()).count())

### Explore LOAN_PURPOSE

In [None]:
# Analyse the LOAN_PURPOSE column
pandasDF = df.select(df['LOAN_PURPOSE'], df['DELINQUENT']).toPandas()
analyseCategoricalData(pandasDF, column='LOAN_PURPOSE',title='Loan Purpose')
pd.crosstab(pandasDF.LOAN_PURPOSE, pandasDF.DELINQUENT, normalize='index').sort_values(by=1, ascending=False)

### Explore ORIGINAL_LOAN_TERM

In [None]:
# Analyse the ORIGINAL_LOAN_TERM column (Months).
pandasDF = df.select(df['ORIGINAL_LOAN_TERM'], df['DELINQUENT']).toPandas()
analyseContinuousData(pandasDF, column='ORIGINAL_LOAN_TERM',xlabel='Original Loan Term')

In [None]:
# Replace mIssing with mean of 327
df = df.withColumn('ORIGINAL_LOAN_TERM', when(df['ORIGINAL_LOAN_TERM'].isNotNull(), df['ORIGINAL_LOAN_TERM']).otherwise(327))

### Explore NUMBER_OF_BORROWERS

In [None]:
# The number of borrowers is unavailable when its value is 99.
print('The number of loan records with unavailable borrower numbers:', df.filter(df['NUMBER_OF_BORROWERS'] == 99).count())

In [None]:
df.select(df['NUMBER_OF_BORROWERS']).distinct().show()

In [None]:
# Cast its data type to StringType.
df = df.withColumn('NUMBER_OF_BORROWERS', df['NUMBER_OF_BORROWERS'].cast(StringType()))

In [None]:
# Analyse the NUMBER_OF_BORROWERS column.
pandasDF = df.select(df['NUMBER_OF_BORROWERS'], df['DELINQUENT']).toPandas()
analyseCategoricalData(pandasDF, column='NUMBER_OF_BORROWERS',title='Number of Borrowers')
pd.crosstab(pandasDF.NUMBER_OF_BORROWERS, pandasDF.DELINQUENT, normalize='index').sort_values(by=1, ascending=False)

In [None]:
# Cast its data type back to DoubleType.
df = df.withColumn('NUMBER_OF_BORROWERS', df['NUMBER_OF_BORROWERS'].cast(DoubleType()))

##  EDA done


In [None]:
df.printSchema()

Check amount of Denlinquents and Non Delinquents, OverSampling

In [None]:
df.count()

### Check amount of Denlinquents and Non Delinquents, OverSampling

In [None]:
df.count()

In [None]:
# Split the dataframe into positive (DELINQUENT = 1) and negative (DELINQUENT = 0) examples
positive_df = df.filter(col("DELINQUENT") == 1)
negative_df = df.filter(col("DELINQUENT") == 0)
print(positive_df.count())
print(negative_df.count())

In [None]:
# Calculate the ratio of positive to negative examples
positive_count = positive_df.count()
negative_count = negative_df.count()
oversample_ratio = negative_count / positive_count
print(oversample_ratio)


In [None]:
# there is a high ratio of positive to negative, but for this scenario 2 should be enough
oversample_ratio = 2.0
# Oversample the positive examples
oversampled_positive_df = positive_df.sample(withReplacement=True, fraction=oversample_ratio, seed=42)

# Combine the oversampled positive examples with the original negative examples
balanced_df = negative_df.unionAll(oversampled_positive_df)


In [None]:
oversampled_positive_df.count()

In [None]:
balanced_df.count()

## Train a Gradient Boosting Decision Tree Model - Preparing Data

Since PySpark have problems, Change to Pandas Dataframe to use with XGBoost

In [None]:
pandas_df = balanced_df.toPandas()

In [None]:
shape = pandas_df.shape
print("shape:", shape)
schema = pandas_df.dtypes
print(schema)

In [None]:
null_columns = pandas_df.columns[pandas_df.isnull().any()]
if null_columns.empty:
    print("No null values found in any column.")
else:
    print("Null values found in the following columns:")
    for col in null_columns:
        print(col)

In [None]:
mappings = {}
# Label Encode And Save the Label Encoder 
label_encoder = LabelEncoder()
categorical_cols = ["FIRST_TIME_HOMEBUYER_FLAG", "OCCUPANCY_STATUS", "CHANNEL", "PROPERTY_TYPE", "LOAN_PURPOSE"]
for col in categorical_cols:
    pandas_df[col] = label_encoder.fit_transform(pandas_df[col])
    # Create a mapping dictionary
    mapping = dict(zip(range(len(label_encoder.classes_)), label_encoder.classes_))

    # Save the mapping dictionary to a CSV file
    mapping_df = pd.DataFrame.from_dict(mapping, orient='index', columns=[col])
    mapping_df.to_csv(f"{col}_mapping.csv", index_label='Encoded_Value')

    # Store the mapping dictionary in the mappings dictionary
    mappings[col] = mapping
    
    joblib.dump(label_encoder, f"{col}_mapping.joblib")
    

### Test the Label Encoder

In [None]:
# Sample data
data = [
    [450, 'N', 12345.0, 1, 'I', 74,20,10,2, 'R', 'SF', 54321.0, 'P', 360.0, 1.0],
    [200, 'Y', 23456.0, 2, 'P', 115,30, 100,3, 'C', 'CO', 65432.0, 'N', 240.0, 2.0],
    [650, 'N', 34567.0, 1, 'S', 100,40, 30,4, 'B', 'CP', 76543.0, 'C', 180.0, 1.0],
    [500, 'Y', 45678.0, 2, 'I', 50,50, 40, 5,'B', 'MH', 87654.0, 'C', 300.0, 3.0],
    [500, 'Y', 47664.0, 2, 'P', 69,10, 69,6, 'C', 'SF', 48300.0, 'N', 240.0, 1.0],
]

In [None]:
# Column names based on the given schema
columns = [
    'CREDIT_SCORE',
    'FIRST_TIME_HOMEBUYER_FLAG',
    'METROPOLITAN_STATISTICAL_AREA',
    'NUMBER_OF_UNITS', 
    'OCCUPANCY_STATUS',
    'ORIGINAL_COMBINED_LOAN_TO_VALUE',
    'ORIGINAL_DEBT_TO_INCOME_RATIO',
    'ORIGINAL_LOAN_TO_VALUE',
    'ORIGINAL_INTEREST_RATE',
    'CHANNEL',
    'PROPERTY_TYPE',
    'POSTAL_CODE',
    'LOAN_PURPOSE',
    'ORIGINAL_LOAN_TERM',
    'NUMBER_OF_BORROWERS',
    
]
test_DF = pd.DataFrame(data, columns=columns)

In [None]:
test_DF

In [None]:
for col in categorical_cols:
    label_encoder = joblib.load(f"{col}_mapping.joblib")
    test_DF[col] = label_encoder.fit_transform(test_DF[col])

In [None]:
test_DF

In [None]:
### Label Encoder Working as expected

In [None]:
### Split Data into Training and Testing

In [None]:
X = pandas_df.drop("DELINQUENT", axis=1)
y = pandas_df["DELINQUENT"]

In [None]:
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

### Train Decision Tree - Filled with Best Parameters from GridSearch

In [None]:
# Create a decision tree classifier
dtModel = DecisionTreeClassifier(
    criterion='gini',  
    max_depth=20,       # Maximum depth of the tree
    min_samples_split=2,  # Minimum number of samples required to split an internal node
    min_samples_leaf=1,   # Minimum number of samples required to be at a leaf node
    random_state=42      # Random seed for reproducibility
)
start_time = datetime.now()
# Train the decision tree classifier
dtModel.fit(X_train, y_train)
end_time = datetime.now()
training_time = end_time - start_time

print("Training time = %s" % training_time)

# Make predictions on the testing set
testpreds = dtModel.predict(X_test)
trainpreds = dtModel.predict(X_train)

In [None]:
# Compute false positive rate, true positive rate, and thresholds
fpr_test, tpr_test, thresholds_test = roc_curve(y_test, testpreds)
fpr_train, tpr_train, thresholds_train = roc_curve(y_train, trainpreds)

# Compute AUC-ROC for training and testing data
auc_roc_train = auc(fpr_train, tpr_train)
auc_roc_test = auc(fpr_test, tpr_test)
print(auc_roc_train)
print(auc_roc_test)

# Plot the ROC curves for training and testing data
plt.figure(figsize=(8, 6))
plt.plot(fpr_train, tpr_train, label='Train ROC curve (AUC = %0.2f)' % auc_roc_train)
plt.plot(fpr_test, tpr_test, label='Test ROC curve (AUC = %0.2f)' % auc_roc_test)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Decision Tree ROC Graph ')
plt.legend(loc='lower right')
plt.show()

In [None]:
# Calculate evaluation metrics for Test
accuracy = accuracy_score(y_test, testpreds)
precision = precision_score(y_test, testpreds)
recall = recall_score(y_test, testpreds)
f1 = f1_score(y_test, testpreds)

# Create confusion matrix
confusion = confusion_matrix(y_test, testpreds)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

In [None]:
# Calculate evaluation metrics for Train
accuracy = accuracy_score(y_train, trainpreds)
precision = precision_score(y_train, trainpreds)
recall = recall_score(y_train, trainpreds)
f1 = f1_score(y_train, trainpreds)

# Create confusion matrix
confusion = confusion_matrix(y_train, trainpreds)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

### Now Random Forest - Filled with Best Parameters from GridSearch

In [None]:
# Create a Random Forest classifier with specified parameters
clf = RandomForestClassifier(
    n_estimators=100,    # Number of decision trees in the forest
    criterion='gini',    # Split criterion ('gini' or 'entropy')
    max_depth=20,      # Maximum depth of the trees
    min_samples_split=2, # Minimum number of samples required to split an internal node
    min_samples_leaf=1,  # Minimum number of samples required to be at a leaf node
    random_state=42      # Random seed for reproducibility
)

start_time = datetime.now()
# Train the Random Forest classifier
clf.fit(X_train, y_train)
end_time = datetime.now()
training_time = end_time - start_time
print("Training time = %s" % training_time)


In [None]:
# Make predictions on the testing set
testpreds = clf.predict(X_test)
trainpreds = clf.predict(X_train)


In [None]:
# Compute false positive rate, true positive rate, and thresholds
fpr_test, tpr_test, thresholds_test = roc_curve(y_test, testpreds)
fpr_train, tpr_train, thresholds_train = roc_curve(y_train, trainpreds)

# Compute AUC-ROC for training and testing data
auc_roc_train = auc(fpr_train, tpr_train)
auc_roc_test = auc(fpr_test, tpr_test)
print(auc_roc_train)
print(auc_roc_test)

# Plot the ROC curves for training and testing data
plt.figure(figsize=(8, 6))
plt.plot(fpr_train, tpr_train, label='Train ROC curve (AUC = %0.2f)' % auc_roc_train)
plt.plot(fpr_test, tpr_test, label='Test ROC curve (AUC = %0.2f)' % auc_roc_test)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Random Forest ROC Graph ')
plt.legend(loc='lower right')
plt.show()

In [None]:
# Calculate evaluation metrics for Test
accuracy = accuracy_score(y_test, testpreds)
precision = precision_score(y_test, testpreds)
recall = recall_score(y_test, testpreds)
f1 = f1_score(y_test, testpreds)

# Create confusion matrix
confusion = confusion_matrix(y_test, testpreds)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

In [None]:
# Calculate evaluation metrics for Train
accuracy = accuracy_score(y_train, trainpreds)
precision = precision_score(y_train, trainpreds)
recall = recall_score(y_train, trainpreds)
f1 = f1_score(y_train, trainpreds)

# Create confusion matrix
confusion = confusion_matrix(y_train, trainpreds)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

### XGBClassifier Decision Tree With Best Params


In [None]:
best_params = {'colsample_bytree': 1.0,
 'learning_rate': 0.1,
 'max_depth': 15,
 'n_estimators': 350,
 'subsample': 0.8}

In [None]:
start_time = datetime.now()
clf = xgb.XGBClassifier(**best_params)
clf.fit(X_train, y_train.astype(int))
end_time = datetime.now()
training_time = end_time - start_time
print("Training time = %s" % training_time)

In [None]:
# Make predictions on the testing set
testpreds = clf.predict(X_test)
trainpreds = clf.predict(X_train)

In [None]:
# Compute false positive rate, true positive rate, and thresholds
fpr_test, tpr_test, thresholds_test = roc_curve(y_test, testpreds)
fpr_train, tpr_train, thresholds_train = roc_curve(y_train, trainpreds)

# Compute AUC-ROC for training and testing data
auc_roc_train = auc(fpr_train, tpr_train)
auc_roc_test = auc(fpr_test, tpr_test)
print(auc_roc_train)
print(auc_roc_test)

# Plot the ROC curves for training and testing data
plt.figure(figsize=(8, 6))
plt.plot(fpr_train, tpr_train, label='Train ROC curve (AUC = %0.2f)' % auc_roc_train)
plt.plot(fpr_test, tpr_test, label='Test ROC curve (AUC = %0.2f)' % auc_roc_test)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('XGBoost ROC Graph ')
plt.legend(loc='lower right')
plt.show()

In [None]:

# Calculate evaluation metrics for test
accuracy = accuracy_score(y_test, testpreds)
precision = precision_score(y_test, testpreds)
recall = recall_score(y_test, testpreds)
f1 = f1_score(y_test, testpreds)

# Create confusion matrix
confusion = confusion_matrix(y_test, testpreds)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

In [None]:
# Calculate evaluation metrics for Train
accuracy = accuracy_score(y_train, trainpreds)
precision = precision_score(y_train, trainpreds)
recall = recall_score(y_train, trainpreds)
f1 = f1_score(y_train, trainpreds)

# Create confusion matrix
confusion = confusion_matrix(y_train, trainpreds)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

In [None]:
# Too Big Plot Decision Tree
# plot_tree(clf, rankdir='LR', num_trees=1)
# plt.show()

## Save Model

In [None]:

clf.save_model("model.json")

In [None]:
# Get feature importance
importance = clf.feature_importances_

# Create a DataFrame to display feature importance
feature_importance_df = pd.DataFrame({'Feature': X.columns, 'Importance': importance})
feature_importance_df = feature_importance_df.sort_values(by='Importance', ascending=False)

# Print or display the feature importance
print(feature_importance_df)

In [None]:
stop

# Grid Search for best parameters


### Decision Tree

In [None]:
dt_param_grid = {
    'criterion': ['gini', 'entropy'],
    'max_depth': [5, 10, 15,20],
    'min_samples_split': [2, 5, 10,15],
    'min_samples_leaf': [1, 2, 4,8]
}
kfold = KFold(n_splits=5, shuffle=True, random_state=42)


In [None]:
# Create a Decision Tree classifier and perform grid search
dt_clf = DecisionTreeClassifier()
dt_grid_search = GridSearchCV(dt_clf, dt_param_grid, cv=kfold)

start_time = datetime.now()
dt_grid_search.fit(X_train, y_train)

end_time = datetime.now()
training_time = end_time - start_time
print("Training time = %s" % training_time)


In [None]:
# Get the best Decision Tree model and evaluate on the test set
dt_best_model = dt_grid_search.best_estimator_
dt_predictions = dt_best_model.predict(X_test)
dt_accuracy = accuracy_score(y_test, dt_predictions)
print("Decision Tree Accuracy:", dt_accuracy)

In [None]:
dt_predictions = clf.predict(X_test)
# Calculate evaluation metrics
accuracy = accuracy_score(y_test, dt_predictions)
precision = precision_score(y_test, dt_predictions)
recall = recall_score(y_test, dt_predictions)
f1 = f1_score(y_test, dt_predictions)

# Create confusion matrix
confusion = confusion_matrix(y_test, dt_predictions)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

In [None]:
# Print the Best Parameters
best_params = dt_grid_search.best_params_

In [None]:
best_params

In [None]:
### Random Forest

In [None]:
# Define the parameter grid for Random Forest
rf_param_grid = {
    'n_estimators': [100, 150, 200],
    'criterion': ['gini', 'entropy'],
    'max_depth': [5, 10,15,20],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}
kfold = KFold(n_splits=5, shuffle=True, random_state=42)


In [None]:
# Create a Random Forest classifier and perform grid search
rf_clf = RandomForestClassifier()
rf_grid_search = GridSearchCV(rf_clf, rf_param_grid, cv=kfold)
start_time = datetime.now()
rf_grid_search.fit(X_train, y_train)
end_time = datetime.now()
training_time = end_time - start_time
print("Training time = %s" % training_time)


In [None]:
# Get the best Random Forest model and evaluate on the test set
rf_best_model = rf_grid_search.best_estimator_
rf_predictions = rf_best_model.predict(X_test)
rf_accuracy = accuracy_score(y_test, rf_predictions)
print("Random Forest Accuracy:", rf_accuracy)

In [None]:
best_params = rf_grid_search.best_params_

In [None]:
best_params

### XGboost

In [None]:
# Specify the Parameters for XGBoost
param_grid = {
    'max_depth': [3, 5, 7,9,11,13,15],
    'learning_rate': [0.1, 0.01, 0.001],
    'n_estimators': [100,150, 200,250, 300,350,400],
    'subsample': [0.8, 1.0,1.2,1.4],
    'colsample_bytree': [0.8, 1.0,1.2,1.4]
}
kfold = KFold(n_splits=5, shuffle=True, random_state=42)


In [None]:
start_time = datetime.now()
model = xgb.XGBClassifier()
grid_search = GridSearchCV(model, param_grid, scoring='accuracy', cv=kfold)
grid_search.fit(X_train, y_train)
end_time = datetime.now()
training_time = end_time - start_time
print("Training time = %s" % training_time)

In [None]:
best_params = grid_search.best_params_
best_model = grid_search.best_estimator_


In [None]:
best_params

In [None]:

# Calculate evaluation metrics
accuracy = accuracy_score(y_test, preds)
precision = precision_score(y_test, preds)
recall = recall_score(y_test, preds)
f1 = f1_score(y_test, preds)

# Create confusion matrix
confusion = confusion_matrix(y_test, preds)
confusion_df = pd.DataFrame(confusion, columns=['Predicted 0', 'Predicted 1'], index=['Actual 0', 'Actual 1'])

# Print evaluation metrics and confusion matrix
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
print("Confusion Matrix:")
print(confusion_df)

In [None]:
# Get feature importance
importance = best_model.feature_importances_

# Create a DataFrame to display feature importance
feature_importance_df = pd.DataFrame({'Feature': X.columns, 'Importance': importance})
feature_importance_df = feature_importance_df.sort_values(by='Importance', ascending=False)

# Print or display the feature importance
print(feature_importance_df)