# Building Models to Predict Loan Default

## Table of Contents
* [Part 1: Data Loading, Transformation and Exploration](#part-1)
* [Part 2: Feature Extraction and ML Training](#part-2)
* [Part 3: Applicant Segmentation and Knowledge Sharing with K-Means](#part-3)

 


## Part 1: Data Loading, Transformation and Exploration <a class="anchor" name="part-1"></a>
### 1.1 Data Loading

In [None]:
# Import classes into program
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

# setting number of CPU cores to be used. * indicates all available cores
master = "local[*]"
# name of application
app_name = "Big Data App 2"
# setting spark config variable
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# SparkSession builder
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

In [None]:
from pyspark.sql.types import *

schema_previous_application = schema = StructType([
StructField("id_app",IntegerType(),True),
StructField("contract_type_previous",IntegerType(),True),
StructField("amt_annuity_previous",FloatType(),True),
StructField("amt_application",FloatType(),True),
StructField("amt_credit_previous",FloatType(),True),
StructField("amt_down_payment",FloatType(),True),
StructField("amt_goods_price_previous",FloatType(),True),
StructField("hour_appr_process_start",IntegerType(),True),
StructField("rate_down_payment",FloatType(),True),
StructField("rate_interest_primary",FloatType(),True),
StructField("rate_interest_privileged",FloatType(),True),
StructField("name_cash_loan_purpose",StringType(),True),
StructField("name_contract_status",StringType(),True),
StructField("days_decision",IntegerType(),True),
StructField("name_payment_type",StringType(),True),
StructField("code_rejection_reason",StringType(),True),
StructField("name_type_suite",StringType(),True),
StructField("name_client_type",StringType(),True),
StructField("name_goods_category",StringType(),True),
StructField("name_portfolio",StringType(),True),
StructField("name_product_type",StringType(),True),
StructField("channel_type",StringType(),True),
StructField("sellerplace_area",IntegerType(),True),
StructField("name_seller_industry",StringType(),True),
StructField("cnt_payment",FloatType(),True),
StructField("name_yield_group",StringType(),True),
StructField("product_combination",StringType(),True),
StructField("days_first_drawing",FloatType(),True),
StructField("days_first_due",FloatType(),True),
StructField("days_last_due_1st_version",FloatType(),True),
StructField("days_last_due",FloatType(),True),
StructField("days_termination",FloatType(),True),
StructField("nflag_insured_on_approval",FloatType(),True),
StructField("id",LongType (),True)
])

schema_application = schema = StructType([ 
StructField("id_app",IntegerType(),True),
StructField("target",IntegerType(),True),
StructField("contract_type",IntegerType(),True),
StructField("gender",StringType(),True),
StructField("own_car",StringType(),True),
StructField("own_property",StringType(),True),
StructField("num_of_children",IntegerType(),True),
StructField("income_total",FloatType(),True),
StructField("amt_credit",FloatType(),True),
StructField("amt_annuity",FloatType(),True),
StructField("amt_goods_price",FloatType(),True),
StructField("income_type",IntegerType(),True),
StructField("education_type",IntegerType(),True),
StructField("family_status",IntegerType(),True),
StructField("housing_type",IntegerType(),True),
StructField("region_population",FloatType(),True),
StructField("days_birth",IntegerType(),True),
StructField("days_employed",IntegerType(),True),
StructField("own_car_age",FloatType(),True),
StructField("flag_mobile",IntegerType(),True),
StructField("flag_emp_phone",IntegerType(),True),
StructField("flag_work_phone",IntegerType(),True),
StructField("flag_cont_mobile",IntegerType(),True),
StructField("flag_phone",IntegerType(),True),
StructField("flag_email",IntegerType(),True),
StructField("occupation_type",IntegerType(),True),
StructField("cnt_fam_members",FloatType(),True),
StructField("weekday_app_process_start",StringType(),True),
StructField("hour_app_process_start",IntegerType(),True),
StructField("organization_type",IntegerType(),True),
StructField("credit_score_1",FloatType(),True),
StructField("credit_score_2",FloatType(),True),
StructField("credit_score_3",FloatType(),True),
StructField("days_last_phone_change",FloatType(),True),
StructField("amt_credit_req_last_hour",FloatType(),True),
StructField("amt_credit_req_last_day",FloatType(),True),
StructField("amt_credit_req_last_week",FloatType(),True),
StructField("amt_credit_req_last_month",FloatType(),True),
StructField("amt_credit_req_last_quarter",FloatType(),True),
StructField("amt_credit_req_last_year",FloatType(),True)
])

In [None]:
df_application = spark.read.format( 
    "csv").schema(schema_application).option( 
    "header", True).load("data/application_data.csv") 

df_previous_application = spark.read.format( 
    "csv").schema(schema_previous_application).option( 
    "header", True).load("data/previous_application.csv") 

df_value_dict = spark.read.format( 
    "csv").option( 
    "header", True).load("data/value_dict.csv") 

df_loan_default = spark.read.format( 
    "csv").option( 
    "header", True).load("data/loan_default.csv") 

df_application.printSchema()
df_previous_application.printSchema()
df_value_dict.printSchema()
df_loan_default.printSchema()

### 1.2 Data Transformation and Feature Creation <a class="anchor" name="1.2"></a>
In this step, we’re going to perform data transformation and create some new features using existing information. 

- Adding a new column loan_to_income_ratio which is the ratio between the loan amount and the income of the applicant.

In [None]:
from pyspark.sql.functions import col

# Adding new column loan_to_income_ratio
df_application = df_application.withColumn("loan_to_income_ratio", (col("amt_credit")/col("income_total")))
# df_application.show()

- Performing age bucketing and creating a new string column called age_bucket with the values below:  
    age < 25: Y  
    25 <= age < 35: E  
    35 <= age < 45: M  
    45 <= age < 55: L  
    55 <= age < 65: N  
    age >= 65: R

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.types import IntegerType, DateType

# Creating a column with age in whole years, calculated from days_birth
# (days_birth is a negative day offset, so take the absolute value before flooring)
df_application = df_application.withColumn("age", F.floor(
                    F.abs(F.col("days_birth")) / 365.2425).cast(IntegerType()))

# Creating a column age_bucket using the values from age column
df_application = df_application.withColumn("age_bucket", when(col("age") < 25, "Y")
                                                        .when((col("age") >= 25) & (col("age") < 35), "E")
                                                    .when((col("age") >= 35) & (col("age") < 45), "M")
                                                    .when((col("age") >= 45) & (col("age") < 55), "L")
                                                    .when((col("age") >= 55) & (col("age") < 65), "N")
                                                    .when(col("age") >= 65, "R")
                                                    .otherwise(""))

# df_application.select("id_app","age","age_bucket").show()
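
The bucket boundaries above are easy to get wrong by one year, so a plain-Python mirror of the same rules can serve as a quick sanity check. This is only a sketch: the helper names `age_from_days_birth` and `age_bucket` are hypothetical and not part of the notebook, and it assumes `days_birth` is a negative day offset, as the use of `abs` in the Spark code suggests.

```python
def age_from_days_birth(days_birth):
    """Whole years of age from a day offset (assumed negative for past dates)."""
    return int(abs(days_birth) // 365.2425)


def age_bucket(age):
    """Map an age in whole years to the bucket codes used in the notebook."""
    if age < 25:
        return "Y"
    if age < 35:
        return "E"
    if age < 45:
        return "M"
    if age < 55:
        return "L"
    if age < 65:
        return "N"
    return "R"


# Boundary ages land in the upper bucket, matching the >= comparisons above
for sample_age in (24, 25, 44, 45, 64, 65):
    print(sample_age, age_bucket(sample_age))
```

Checking the boundary values (25, 35, 45, 55, 65) against the Spark output is usually enough to confirm the `when` chain is wired correctly.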

- Creating a new string column named credit_worthiness. It takes the average value of credit_score_1, credit_score_2 and credit_score_3. If the average > 0.7, credit_worthiness is set to “high”; if 0.4 <= average <= 0.7, it is set to “medium”; if the average < 0.4, it is set to “low”.

In [None]:
# Replacing missing credit scores with a neutral 0.5 before averaging
df_application = df_application.fillna({'credit_score_1': 0.5, 'credit_score_2': 0.5, 'credit_score_3': 0.5})

# Average of the three credit scores
df_application = df_application.withColumn("avg_credit", ((col("credit_score_1") + col("credit_score_2")\
                                                           + col("credit_score_3")) / 3))

# Bucketing the average: > 0.7 is high, 0.4 to 0.7 is medium, < 0.4 is low
df_application = df_application.withColumn("credit_worthiness", when(col("avg_credit") > 0.7, "high")
                                    .when((col("avg_credit") >= 0.4) & (col("avg_credit") <= 0.7), "medium")
                                    .when(col("avg_credit") < 0.4, "low")
                                    .otherwise(""))

df_application.select("id_app", "credit_score_1", "credit_score_2", "credit_score_3", "avg_credit",\
                      "credit_worthiness").show()
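
The same rule can be written in plain Python to check individual rows by hand. This is a minimal sketch, not the notebook's implementation: `credit_worthiness` is a hypothetical helper, and it assumes a missing score is represented as `None` and replaced by 0.5, as in the Spark code above.

```python
def credit_worthiness(score_1, score_2, score_3, default=0.5):
    """Return (average, label) for three credit scores; None counts as `default`."""
    scores = [default if s is None else s for s in (score_1, score_2, score_3)]
    average = sum(scores) / 3
    if average > 0.7:
        return average, "high"
    if average >= 0.4:
        return average, "medium"
    return average, "low"


print(credit_worthiness(0.9, 0.8, 0.9))     # all scores present
print(credit_worthiness(None, None, None))  # every score missing falls back to 0.5
```

Note that an applicant with no scores at all lands in "medium", which is a consequence of the 0.5 fill value rather than evidence about the applicant.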

- Creating 4 columns: num_of_prev_app (number of previous applications), num_of_approved_app (number of approved applications), total_credit (sum of credit of all approved previous applications) and total_credit_to_income_ratio (total credit / income).

In [None]:
from pyspark.sql.types import IntegerType

# Joining Application Data and Previous Application 
joined_df = df_application.join(df_previous_application, "id_app")

# Creating num_of_prev_app column
prev_app_count = joined_df.groupBy("id_app").count().withColumnRenamed("count", "num_of_prev_app")
joined_df = joined_df.join(prev_app_count, "id_app")
 
# Creating num_of_approved_app column by filtering and grouping
approved_applications = df_previous_application.filter(col("name_contract_status") == "Approved")
approved_applications_count = approved_applications.groupBy("id_app").count()\
.withColumnRenamed("count", "num_of_approved_app")
joined_df = joined_df.join(approved_applications_count, "id_app")

# Creating total_credit using approved applications
total_credit = approved_applications.groupBy("id_app").agg(F.sum("amt_credit_previous").alias("total_credit"))
joined_df = joined_df.join(total_credit, "id_app").dropDuplicates(["id_app"])

# Calculating total_credit_to_income_ratio using total_credit
joined_df = joined_df.withColumn("total_credit_to_income_ratio", (col("total_credit")/col("income_total")))

# joined_df.printSchema()

joined_df.select("id_app", "num_of_prev_app", "num_of_approved_app","total_credit", \
                 "total_credit_to_income_ratio").orderBy("id_app").show()
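
To make the meaning of the four columns concrete, here is a small plain-Python sketch of the same aggregation on a handful of rows. The function name `summarise_previous` and the tuple layout are assumptions made for illustration. Unlike the inner joins in the cell above, which drop applicants without an approved previous application, this sketch keeps them with a count of zero.

```python
from collections import defaultdict


def summarise_previous(previous_apps, income_by_app):
    """Aggregate previous applications per id_app.

    previous_apps: iterable of (id_app, name_contract_status, amt_credit_previous)
    income_by_app: dict mapping id_app to income_total
    """
    summary = defaultdict(lambda: {"num_of_prev_app": 0,
                                   "num_of_approved_app": 0,
                                   "total_credit": 0.0})
    for id_app, status, amt_credit in previous_apps:
        row = summary[id_app]
        row["num_of_prev_app"] += 1
        if status == "Approved":
            row["num_of_approved_app"] += 1
            row["total_credit"] += amt_credit or 0.0  # treat a missing amount as 0

    for id_app, row in summary.items():
        income = income_by_app.get(id_app)
        # Leave the ratio undefined when income is missing or zero
        row["total_credit_to_income_ratio"] = row["total_credit"] / income if income else None
    return dict(summary)


previous = [(1, "Approved", 1000.0), (1, "Refused", 500.0),
            (1, "Approved", 2000.0), (2, "Refused", 300.0)]
print(summarise_previous(previous, {1: 1500.0, 2: 1000.0}))
```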

- Replacing education_type, occupation_type, income_type and family_status with matching strings from value_dict.

In [None]:
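# Replacing each coded column with its matching string from value_dict
# (in value_dict, `value` holds the code and `key` holds the matching string)
mapped_df = df_application
for category in ["education_type", "occupation_type", "income_type", "family_status"]:
    lookup = df_value_dict.filter(col("category") == category) \
        .select(col("value").alias("code"), col("key").alias("label"))
    mapped_df = mapped_df.join(lookup, col(category) == col("code"), "left_outer") \
        .drop(category, "code") \
        .withColumnRenamed("label", category)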

- Join the loan_default data frame and add is_default to application data.

In [None]:
final_application_df = joined_df.join(df_loan_default, "id_app")

- Printing 10 records from the application_data data frame.

In [None]:
final_application_df.show(10)

### 1.3 Exploring the Data <a class="anchor" name="1.3"></a>
 With the transformed data frame from 1.2, we write code to show the basic statistics:
- For each numeric column, we show count, mean, stddev, min, max, 25 percentile, 50 percentile, and 75 percentile;  
- For each non-numeric column, we display the top 5 based on counts in descending order;  
- For each boolean column, we display the value and count(i.e., two rows in total).

In [None]:
numeric_columns1 = [col(column) for column in final_application_df.columns if \
                    final_application_df.schema[column].dataType in [IntegerType()]]
numeric_columns2 = [col(column) for column in final_application_df.columns if \
                    final_application_df.schema[column].dataType in [DoubleType()]]
numeric_columns3 = [col(column) for column in final_application_df.columns if \
                    final_application_df.schema[column].dataType in [FloatType()]]
non_numeric_columns = [col(column) for column in final_application_df.columns \
                       if final_application_df.schema[column].dataType not in \
                       [IntegerType(), FloatType(), DoubleType()]]
boolean_cols = ['is_default','own_car','own_property','flag_emp_phone',\
                'flag_work_phone','flag_phone','flag_email']

# summary() reports count, mean, stddev, min, 25%, 50%, 75% and max for each column
numeric_columns_stats = final_application_df.select(numeric_columns1).summary()
numeric_columns_stats.show()

numeric_columns_stats = final_application_df.select(numeric_columns2).summary()
numeric_columns_stats.show()

numeric_columns_stats = final_application_df.select(numeric_columns3).summary()
numeric_columns_stats.show()

for column in non_numeric_columns:
    top_values = final_application_df.groupBy(column).count().orderBy(col("count").desc()).limit(5)
    print(f"Top 5 values for {column} based on counts:")
    top_values.show(truncate=False)
    
# Display value and count for each boolean column
for col_name in boolean_cols:
    value_counts = final_application_df.groupBy(col_name).count()
    print(f"Counts for boolean column '{col_name}':")
    value_counts.show()

- Exploring the data frame and writing code to present two plots worthy of presentation.


In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Sample roughly 10% of the rows in Spark, then convert only the sample to pandas
df = final_application_df.sample(fraction=0.1, seed=1).toPandas()

# Numeric attribute to compare against the boolean is_default column
selected_attribute = 'total_credit_to_income_ratio'

plt.figure(figsize=(10, 6))

# Box plot of the numeric attribute for each is_default value
sns.boxplot(x='is_default', y=selected_attribute, data=df)

plt.title(f'Correlation between {selected_attribute} and is_default')
plt.ylabel(selected_attribute)
plt.xlabel('is_default')
plt.show()

The plot above compares total_credit_to_income_ratio with is_default. It suggests that the higher the total_credit_to_income_ratio, the more likely the applicant is to default. This also makes sense logically: a high total_credit_to_income_ratio indicates that the applicant has requested more credit relative to their income, which implies a longer repayment period and therefore a higher chance of defaulting. The majority of defaults occur when applicants apply for loans beyond their means, which is what a high total_credit_to_income_ratio indicates.

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Sample roughly 10% of the rows in Spark, then convert only the sample to pandas
df = final_application_df.sample(fraction=0.1, seed=1).toPandas()

# Grouped bar plot: mean avg_credit (with standard deviation) per credit_worthiness
# category, split by is_default. catplot creates its own figure.
g = sns.catplot(
    data=df, kind="bar",
    x="credit_worthiness", y="avg_credit", hue="is_default",
    errorbar="sd", palette="dark", alpha=.6, height=6
)

g.despine(left=True)
g.set_axis_labels("credit_worthiness", "avg_credit")
g.legend.set_title("")
plt.show()

The graph above plots defaults for each category of credit_worthiness. There are 3 categories of credit_worthiness: high, medium and low. As expected, there are more loan applicants with high credit worthiness than with low or medium. The graph also indicates that for high credit_worthiness, defaults account for less than 50% of the applicants, whereas for low and medium, defaults account for more than 50% of the applicants. This shows that credit scores are a worthy indicator of an applicant's financial situation and their likelihood of defaulting. Despite averaging the credit scores and grouping them into 3 categories, it is still possible to ascertain whether an applicant is expected to default. Credit scores are strong indicators of financial nous.

- We print the correlation matrix to find the columns that would be more or less useful for Feature extraction and training

In [None]:
from pyspark.ml.feature import StringIndexer

# Columns to index before computing their correlation with is_default
categorical_columns = ["sellerplace_area","name_seller_industry","cnt_payment",\
                    "name_yield_group","product_combination","days_first_drawing",\
                    "days_first_due","days_last_due_1st_version","days_last_due",\
                       "days_termination","nflag_insured_on_approval","id","num_of_prev_app",\
                       "num_of_approved_app","total_credit","total_credit_to_income_ratio","is_default"]
index_columns = [f"{column}_index" for column in categorical_columns]

# Index every column; "keep" puts nulls and unseen values in an extra bucket
indexers = StringIndexer(inputCols=categorical_columns, outputCols=index_columns)
indexers.setHandleInvalid("keep")
df_transformed = indexers.fit(final_application_df).transform(final_application_df)

check_df = df_transformed.select(index_columns)

# Keep about 1% of the rows so the correlation can be computed in pandas
_, sample_data = check_df.randomSplit([0.99, 0.01], seed=42)
plot1_pandas = sample_data.toPandas()

correlation_matrix = plot1_pandas.corr()

# Extract correlations between other columns and the target column
target_correlations = correlation_matrix['is_default_index']

# Print or analyze the correlation values
print(target_correlations)

## Part 2: Feature Extraction and ML Training <a class="anchor" name="part-2"></a>

### 2.1 Feature selection and preparation of the feature columns

- Based on the data exploration from 1.3 and considering the use case, we discuss the importance of those features. 


rate_interest_primary, rate_interest_privileged, occupation_type and cnt_fam_members can be removed,
as their correlation with is_default lies between -0.01 and 0.01, which shows extremely low dependence on those attributes.
Additionally, num_of_children, housing_type, days_birth, days_employed, age and channel_type,
along with their respective non-index columns, can also be removed, as their correlation is barely over 0.01.
We can also group multiple columns together to increase their correlation with is_default. In this example I will be
grouping together credit_worthiness, total_credit_to_income_ratio, income_total, amt_credit, previous application data and more.

- Code to create/transform the columns based on the discussion above. 


In [None]:
df = final_application_df.select("gender","income_type","education_type","family_status",\
                                 "housing_type","occupation_type","loan_to_income_ratio",\
                                 "age_bucket","credit_worthiness","avg_credit",\
                                 "num_of_approved_app","total_credit_to_income_ratio","is_default")

### 2.2 Preparing Spark ML Transformers/Estimators for features, labels, and models  <a class="anchor" name="2.2"></a>

- Code to create Transformers/Estimators for transforming/assembling the columns you selected above in 2.1, and creating ML model Estimators for Random Forest (RF) and Gradient-boosted tree (GBT) model.  


In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline


categorical_columns=["gender", "education_type", "income_type","family_status",\
                     "housing_type","occupation_type",\
                     "is_default"]
categorical_columns_no_default=["gender", "education_type", "income_type",\
                                "family_status","housing_type","occupation_type",\
                                ]

# Feature Transformation
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in categorical_columns]

assembler = VectorAssembler(
    inputCols= [f"{col}_index" for col in categorical_columns_no_default],
    outputCol="features"
)

# Model Estimators
rf = RandomForestClassifier(labelCol="is_default_index", featuresCol="features", numTrees=10)
gbt = GBTClassifier(labelCol="is_default_index", featuresCol="features", maxIter=10)

- Code to include the above Transformers/Estimators into two pipelines(RF and GBT).


In [None]:
pipeline_rf = Pipeline(stages=indexers + [assembler, rf])
pipeline_gbt = Pipeline(stages=indexers + [assembler, gbt])

### 2.3 Prepare, Train and Evaluate models  
- Code to split the data for training and testing purposes.

In [None]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

- Code to use the corresponding ML Pipelines to train the models on the training data. Then using the trained models we predict the testing data from 2.3.

In [None]:
model_rf = pipeline_rf.fit(train_data)
model_gbt  = pipeline_gbt.fit(train_data)

predictions_rf = model_rf.transform(test_data)
predictions_gbt = model_gbt.transform(test_data)

- For both models(RF and GBT) and testing data, we write code to display the count of TP/TN/FP/FN. Compute the AUC, accuracy, recall, and precision for the above-threshold/below-threshold label from each model testing result using pyspark MLlib/ML APIs.

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import matplotlib.pyplot as plt
import numpy as np

# For Random Forest
# TP, TN, FP, FN values. is_default_index comes from StringIndexer,
# so 1.0 is the less frequent is_default value and is treated as the positive class.

tn_rf = predictions_rf.filter('prediction = 0.0 AND is_default_index = 0.0').count()
tp_rf = predictions_rf.filter('prediction = 1.0 AND is_default_index = 1.0').count()
# False positive: predicted 1.0 but the label is 0.0
fp_rf = predictions_rf.filter('prediction = 1.0 AND is_default_index = 0.0').count()
# False negative: predicted 0.0 but the label is 1.0
fn_rf = predictions_rf.filter('prediction = 0.0 AND is_default_index = 1.0').count()

predictions_rf.groupBy('is_default_index', 'prediction').count().show()

# calculate metrics from the confusion matrix, guarding against empty denominators
accuracy_rf = (tn_rf+tp_rf)/(tn_rf+tp_rf+fn_rf+fp_rf)
precision_rf = tp_rf/(tp_rf+fp_rf) if (tp_rf+fp_rf) else 0.0
recall_rf = tp_rf/(tp_rf+fn_rf) if (tp_rf+fn_rf) else 0.0

auc_evaluator_rf = BinaryClassificationEvaluator(labelCol='is_default_index', metricName='areaUnderROC')
auc_rf = auc_evaluator_rf.evaluate(predictions_rf)

print("For Random Forest")
print(f"TP: {tp_rf}")
print(f"TN: {tn_rf}")
print(f"FP: {fp_rf}")
print(f"FN: {fn_rf}")

print(f"Accuracy: {accuracy_rf}")
print(f"Precision: {precision_rf}")
print(f"Recall: {recall_rf}")
print(f"Area Under ROC Curve: {auc_rf}")

# For Gradient Boosted Trees
# TP, TN, FP, FN values (1.0 is the positive class, as for Random Forest)

tn_gbt = predictions_gbt.filter('prediction = 0.0 AND is_default_index = 0.0').count()
tp_gbt = predictions_gbt.filter('prediction = 1.0 AND is_default_index = 1.0').count()
# False positive: predicted 1.0 but the label is 0.0
fp_gbt = predictions_gbt.filter('prediction = 1.0 AND is_default_index = 0.0').count()
# False negative: predicted 0.0 but the label is 1.0
fn_gbt = predictions_gbt.filter('prediction = 0.0 AND is_default_index = 1.0').count()

predictions_gbt.groupBy('is_default_index', 'prediction').count().show()

# calculate metrics from the confusion matrix, guarding against empty denominators
accuracy_gbt = (tn_gbt+tp_gbt)/(tn_gbt+tp_gbt+fn_gbt+fp_gbt)
precision_gbt = tp_gbt/(tp_gbt+fp_gbt) if (tp_gbt+fp_gbt) else 0.0
recall_gbt = tp_gbt/(tp_gbt+fn_gbt) if (tp_gbt+fn_gbt) else 0.0

auc_evaluator_gbt = BinaryClassificationEvaluator(labelCol='is_default_index', metricName='areaUnderROC')
auc_gbt = auc_evaluator_gbt.evaluate(predictions_gbt)


print("For Gradient Boosted Trees")
print(f"TP: {tp_gbt}")
print(f"TN: {tn_gbt}")
print(f"FP: {fp_gbt}")
print(f"FN: {fn_gbt}")

print(f"Accuracy: {accuracy_gbt}")
print(f"Precision: {precision_gbt}")
print(f"Recall: {recall_gbt}")
print(f"Area Under ROC Curve: {auc_gbt}")
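
Because the four counts are easy to mix up, it can help to keep the formulas in one small helper and test them on numbers worked out by hand. The sketch below is illustrative only: `classification_metrics` is a hypothetical function name, and the counts in the example are made up, not results from this notebook.

```python
def classification_metrics(tp, tn, fp, fn):
    """Accuracy, precision and recall from confusion-matrix counts.

    tp: predicted positive, actually positive
    tn: predicted negative, actually negative
    fp: predicted positive, actually negative
    fn: predicted negative, actually positive
    """
    total = tp + tn + fp + fn
    return {
        "accuracy": (tp + tn) / total if total else 0.0,
        "precision": tp / (tp + fp) if (tp + fp) else 0.0,
        "recall": tp / (tp + fn) if (tp + fn) else 0.0,
    }


# Made-up counts for illustration
print(classification_metrics(tp=8, tn=80, fp=2, fn=10))
```

On an imbalanced label such as loan default, a model that never predicts a default can still score high accuracy while precision and recall are both zero, which is why all three are reported alongside AUC.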

After comparing the results of both models, I have decided to save the Gradient Boosted Trees model, as it provides higher accuracy and precision and a greater area under the ROC curve.

In [None]:
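# overwrite() lets the cell be re-run without failing when the path already exists
model_gbt.write().overwrite().save('model/gbt_model2/')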

## Part 3: Applicant Segmentation and Knowledge Sharing with K-Means <a class="anchor" name="part-3"></a>  
- Utilize K-Means clustering/hyperparameter tuning to find the optimal K value and train the model.


In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Extracting required data
df_kmeans = final_application_df.select("income_type","avg_credit",\
                                        "total_credit_to_income_ratio","own_car",\
                                        "own_property","income_total","housing_type",\
                                        "occupation_type")

indexer = StringIndexer(inputCols=["income_type","own_car","own_property","housing_type",\
                                   "occupation_type"],outputCols=["income_type_index",\
                                    "own_car_index","own_property_index","housing_type_index",\
                                        "occupation_type_index"])
indexed_transformer = indexer.fit(df_kmeans)
indexed_data = indexed_transformer.transform(df_kmeans)

assembler = VectorAssembler(inputCols=["income_type_index","own_car_index","own_property_index",\
                                       "housing_type_index","occupation_type_index","avg_credit",\
                                       "total_credit_to_income_ratio","income_total"],outputCol='features')
assembled_data = assembler.transform(indexed_data)
evaluator = ClusteringEvaluator()

silhouette_arr=[]
for k in range(2,10):
    k_means= KMeans(featuresCol='features', k=k)
    model = k_means.fit(assembled_data)
    predictions = model.transform(assembled_data)
    silhouette = evaluator.evaluate(predictions)
    silhouette_arr.append(silhouette)
    print('No of clusters:',k,'Silhouette Score:',silhouette)

# Plot the silhouette score for each candidate k
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,10),silhouette_arr)
ax.set_xlabel('k')
ax.set_ylabel('silhouette score')
plt.show()


For the purposes of this exercise, I have decided to segment the applicants based on their finances. 
As a person's finances are not straightforward to determine, I have decided to use a few data points
to estimate an applicant's financial position. income_type, income_total and occupation_type help us figure out what 
the primary source of income is for each applicant. avg_credit gives the applicant's average credit score, which is useful 
in gauging their financial skills. total_credit_to_income_ratio tells us whether the person's credit 
is in line with their income. own_car and own_property are good indicators of their financial situation: they could 
tell us either that the applicant can afford luxuries or that they are in debt, and this can be further strengthened by housing_type.
We can group all of these data points to segment the applicants. Using K-Means, I have identified that the ideal 
number of clusters is 5. This has a silhouette score of 0.7544790092012427, which strikes the right balance between 
the number of segments and the silhouette score. Using this, we can segment the applicants into 5 categories, i.e. High, Mid-high, Mid,
Mid-low and Low finances. Naturally, MoLoCo can use this segmentation as a pre-approval process for loan amounts.
This helps hedge against the risk of lending high amounts to applicants who may never be able to pay off the loans.
Based on combinations of the above 8 factors, applicants can be grouped into the 5 categories.
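
For readers unfamiliar with the silhouette score used to pick k, the following plain-Python sketch computes it on a toy data set. It is an illustration under stated assumptions, not Spark's implementation: the helper names are hypothetical, it uses squared Euclidean distance (the default distance measure of Spark's `ClusteringEvaluator`), and it assumes at least two clusters.

```python
from statistics import mean


def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((x - y) ** 2 for x, y in zip(p, q))


def silhouette_score(points, labels):
    """Mean silhouette over all points; requires at least two distinct labels."""
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)

    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # convention for single-point clusters
            continue
        # a: mean distance to the other members of the point's own cluster
        # (the point itself contributes 0, hence the len(own) - 1 divisor)
        a = sum(squared_distance(point, other) for other in own) / (len(own) - 1)
        # b: mean distance to the nearest other cluster
        b = min(mean(squared_distance(point, other) for other in members)
                for other_label, members in clusters.items() if other_label != label)
        scores.append(0.0 if max(a, b) == 0 else (b - a) / max(a, b))
    return mean(scores)


toy_points = [(0, 0), (0, 1), (10, 10), (10, 11)]
print(silhouette_score(toy_points, [0, 0, 1, 1]))  # tight, well-separated clusters
print(silhouette_score(toy_points, [0, 1, 0, 1]))  # clusters that cut across the groups
```

Scores close to 1 mean points sit much nearer to their own cluster than to any other, while negative scores mean many points would fit better elsewhere. Since the features above are not scaled, large-valued columns such as income_total are likely to dominate the distances, which is worth keeping in mind when reading the score.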