###This jupyter notebook file contains all codes and related information to teach machine learning model for credit risk management system.

**Author : Umidjon Sattorov. Student at Skillbox platform and machine learning engineer.**

In [1]:
#Configuring google colab to work with pyspark
!pip install pyspark --quiet

In [1]:
#Configuring google colab to work with pyspark
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet
!apt install openjdk-8-jdk-headless &> /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 8g --executor-memory 8g pyspark-shell"

!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
get_ipython().system_raw('./ngrok http 4050 &')

In [2]:
#Building pyspark environment
from pyspark.sql import SparkSession


### Ваш код здесь ###
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('Credit Risk Management')\
        .config("spark.driver.memory", "8g")\
        .getOrCreate()

In [3]:
#Adding google drive to colab environment.
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [3]:
#Importing necessary modules and libraries
#Data processing
import pandas as pd
import numpy as np

#Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import missingno as msno
import plotly.express as px

#Preprocessing and feature engineering
from pyspark.sql.functions import monotonically_increasing_id, col
from pyspark.ml.feature import VectorAssembler
from sklearn.model_selection import train_test_split, RandomizedSearchCV, StratifiedGroupKFold
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import IsolationForest
from sklearn.impute import KNNImputer

#Modelling
from pyspark.ml.classification import MultilayerPerceptronClassifier

#Metrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [4]:
#Reading dataframe in pandas dataframe format
df = pd.read_csv("./data/train_full.csv")
df.head()

Unnamed: 0,id,rn,pre_since_opened,pre_since_confirmed,pre_loans_credit_limit,pre_loans_next_pay_summ,pre_loans_outstanding,pre_loans_total_overdue,pre_loans_max_overdue_sum,pre_loans_credit_cost_rate,...,total_enc_paym,deviation_pfterm,deviation_till_close_pfdates,total_overdues,is_zero_loans5,is_zero_loans530,is_zero_loans3060,is_zero_loans6090,is_zero_loans90,flag
0,0,5.5,8.1,7.6,9.6,3.1,2.9,0.0,2.0,8.0,...,42.7,0.4,0.7,39.0,1,1,1,1,1,0
1,1,7.5,11.428571,7.642857,8.142857,2.142857,2.928571,0.0,1.928571,3.714286,...,44.285714,1.285714,3.071429,39.0,1,1,1,1,1,0
2,2,2.0,8.333333,10.666667,1.666667,1.333333,3.0,0.0,1.666667,4.0,...,49.666667,-1.0,-4.0,34.333333,1,1,1,1,1,0
3,3,8.0,7.0,7.333333,9.866667,2.6,3.266667,0.0,2.0,4.466667,...,28.8,0.2,-2.933333,39.0,1,1,1,1,1,0
4,4,1.0,12.0,9.0,12.0,1.0,3.0,0.0,2.0,0.0,...,78.0,4.0,-10.0,39.0,1,1,1,1,1,0


In [5]:
#Dividing dataset into training and test sets
x = df.drop(columns = ['id', 'flag'])
y = df['flag']

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.3, random_state = 1)

In [6]:
#Using smote for balancing dataset
os = SMOTE(random_state = 1)

x_train_resampled, y_train_resampled = os.fit_resample(X = x_train, y = y_train)
y_train_resampled.value_counts()

flag
0    2025486
1    2025486
Name: count, dtype: int64

In [7]:
#Saving resulting dataframes to csv format to google drive
x_train_resampled.to_csv("./data/x_train_resampled.csv", index = False)
y_train_resampled.to_csv("./data/y_train_resampled.csv", index = False)
x_test.to_csv("./data/x_test.csv", index = False)
y_test.to_csv("./data/y_test.csv", index = False)
x_train.to_csv("./data/x_train.csv", index = False)
y_train.to_csv("./data/y_train.csv", index = False)

In [9]:
#Reading dataframe from csv file
x_train_resampled_sp = spark.read.option("header",True).option("delimiter",",").csv("./data/x_train_resampled.csv")
y_train_resampled_sp = spark.read.option("header",True).option("delimiter",",").csv("./data/y_train_resampled.csv")
x_test_sp = spark.read.option("header",True).option("delimiter",",").csv("./data/x_test.csv")
y_test_sp = spark.read.option("header",True).option("delimiter",",").csv("./data/y_test.csv")
x_train_sp = spark.read.option("header",True).option("delimiter",",").csv("./data/x_train.csv")
y_train_sp = spark.read.option("header",True).option("delimiter",",").csv("./data/y_train.csv")

In [10]:
x_train_resampled_spark = x_train_resampled_sp.withColumn("index", monotonically_increasing_id())
y_train_resampled_spark = y_train_resampled_sp.withColumn("index", monotonically_increasing_id())

resampled_spark_df = x_train_resampled_spark.join(y_train_resampled_spark, on="index").drop("index")
resampled_spark_df.show()

+----+------------------+-------------------+----------------------+-----------------------+---------------------+-----------------------+-------------------------+--------------------------+------------------+------------------+-----------------------------+-----------------------+---------------------+---------------------+------------------+------------------+------------------+-------------------+----------------------------+------------------+--------------+----------------+-----------------+-----------------+---------------+----+
|  rn|  pre_since_opened|pre_since_confirmed|pre_loans_credit_limit|pre_loans_next_pay_summ|pre_loans_outstanding|pre_loans_total_overdue|pre_loans_max_overdue_sum|pre_loans_credit_cost_rate|    pre_over2limit| pre_maxover2limit|enc_loans_account_holder_type|enc_loans_credit_status|enc_loans_credit_type|enc_loans_account_cur|       pclose_flag|       fclose_flag|    total_enc_paym|   deviation_pfterm|deviation_till_close_pfdates|    total_overdues|is_zer

In [11]:
resampled_spark_df_formatted = resampled_spark_df.select(
    col("rn").cast("Double"),
    col("pre_since_confirmed").cast("Double"),
    col("pre_loans_credit_limit").cast("Double"),
    col("pre_loans_next_pay_summ").cast("Double"),
    col("pre_loans_outstanding").cast("Double"),
    col("pre_loans_total_overdue").cast("Double"),
    col("pre_loans_max_overdue_sum").cast("Double"),
    col("pre_loans_credit_cost_rate").cast("Double"),
    col("pre_over2limit").cast("Double"),
    col("pre_maxover2limit").cast("Double"),
    col("enc_loans_account_holder_type").cast("Double"),
    col("enc_loans_credit_status").cast("Double"),
    col("enc_loans_credit_type").cast("Double"),
    col("enc_loans_account_cur").cast("Int"),
    col("pclose_flag").cast("Int"),
    col("fclose_flag").cast("Int"),
    col("total_enc_paym").cast("Double"),
    col("deviation_pfterm").cast("Double"),
    col("deviation_till_close_pfdates").cast("Double"),
    col("total_overdues").cast("Double"),
    col("is_zero_loans5").cast("Int"),
    col("is_zero_loans530").cast("Int"),
    col("is_zero_loans3060").cast("Int"),
    col("is_zero_loans6090").cast("Int"),
    col("is_zero_loans90").cast("Int"),
    col("flag").cast("Int")
)

In [12]:
# Assemble features into a vector
feature_columns = resampled_spark_df_formatted.columns
feature_columns.remove('flag')
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_data_assembled = assembler.transform(resampled_spark_df_formatted).select("features", "flag")
train_data_assembled.show()

+--------------------+----+
|            features|flag|
+--------------------+----+
|[3.0,10.2,12.0,2....|   0|
|[8.5,8.8125,9.625...|   0|
|[10.5,9.35,11.5,3...|   0|
|[6.5,7.6666666666...|   0|
|[2.0,12.333333333...|   0|
|[3.0,5.6,7.2,2.0,...|   0|
|[5.5,7.4,9.3,2.4,...|   0|
|[6.0,6.0909090909...|   0|
|[7.5,9.6428571428...|   0|
|[12.5,6.291666666...|   0|
|[1.5,13.0,9.5,2.5...|   0|
|[6.0,9.1818181818...|   0|
|[3.0,5.6,13.2,2.0...|   0|
|[1.5,9.0,12.5,2.0...|   0|
|[2.5,11.0,9.25,2....|   0|
|[1.5,5.5,9.0,1.0,...|   0|
|[5.5,6.9,9.4,2.1,...|   0|
|[2.5,8.25,6.25,2....|   0|
|[5.5,8.1,8.2,1.9,...|   0|
|[8.0,9.2,10.66666...|   0|
+--------------------+----+
only showing top 20 rows



In [13]:
#Preparing test data
x_test_spark = x_test_sp.withColumn("index", monotonically_increasing_id())
y_test_spark = y_test_sp.withColumn("index", monotonically_increasing_id())

test_sp = x_test_spark.join(y_test_spark, on="index").drop("index")

test_sp_formatted = test_sp.select(
    col("rn").cast("Double"),
    col("pre_since_confirmed").cast("Double"),
    col("pre_loans_credit_limit").cast("Double"),
    col("pre_loans_next_pay_summ").cast("Double"),
    col("pre_loans_outstanding").cast("Double"),
    col("pre_loans_total_overdue").cast("Double"),
    col("pre_loans_max_overdue_sum").cast("Double"),
    col("pre_loans_credit_cost_rate").cast("Double"),
    col("pre_over2limit").cast("Double"),
    col("pre_maxover2limit").cast("Double"),
    col("enc_loans_account_holder_type").cast("Double"),
    col("enc_loans_credit_status").cast("Double"),
    col("enc_loans_credit_type").cast("Double"),
    col("enc_loans_account_cur").cast("Int"),
    col("pclose_flag").cast("Int"),
    col("fclose_flag").cast("Int"),
    col("total_enc_paym").cast("Double"),
    col("deviation_pfterm").cast("Double"),
    col("deviation_till_close_pfdates").cast("Double"),
    col("total_overdues").cast("Double"),
    col("is_zero_loans5").cast("Int"),
    col("is_zero_loans530").cast("Int"),
    col("is_zero_loans3060").cast("Int"),
    col("is_zero_loans6090").cast("Int"),
    col("is_zero_loans90").cast("Int"),
    col("flag").cast("Int")
)

test_data_assembled = assembler.transform(test_sp_formatted).select("features", "flag")
test_data_assembled.show()

+--------------------+----+
|            features|flag|
+--------------------+----+
|[2.5,6.75,7.0,3.5...|   0|
|[1.0,16.0,7.0,4.0...|   0|
|[4.0,5.7142857142...|   0|
|[2.5,10.0,14.75,2...|   0|
|[5.5,10.7,12.9,2....|   0|
|[5.0,5.5555555555...|   0|
|[1.5,9.0,9.5,1.5,...|   0|
|[6.5,9.3333333333...|   0|
|[13.0,8.56,8.56,2...|   0|
|[4.5,4.875,8.75,2...|   0|
|[2.5,7.25,4.75,1....|   0|
|[2.5,8.25,17.0,2....|   0|
|[5.0,7.7777777777...|   0|
|[11.5,8.863636363...|   0|
|[2.0,7.0,12.66666...|   0|
|[2.0,10.0,10.0,2....|   0|
|[7.5,8.8571428571...|   0|
|[1.5,9.0,11.0,2.5...|   0|
|[7.0,11.692307692...|   0|
|[9.5,8.5555555555...|   0|
+--------------------+----+
only showing top 20 rows



In [22]:
#Defining MLP classifier
layers_list = []
for i in range(50, 200, 5) : 
    for j in range(5, 200, 5) :
        layer = [len(feature_columns), i, j, 2]
        layers_list.append(layer)

def train_and_evaluate(layers) : 
    mlp = MultilayerPerceptronClassifier(layers = layers, seed = 1, labelCol = "flag", featuresCol = "features")

    mlp_model = mlp.fit(train_data_assembled)

    # Model evaluator
    evaluator = BinaryClassificationEvaluator(labelCol="flag", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

    # ROC AUC on the training data
    train_predictions = mlp_model.transform(train_data_assembled)
    train_auc = evaluator.evaluate(train_predictions)

    # ROC AUC on the test data
    test_predictions = mlp_model.transform(test_data_assembled)
    test_auc = evaluator.evaluate(test_predictions)

    return train_auc, test_auc

best_layers = None
best_train_auc = 0
best_test_auc = 0
best_auc_diff = float('inf')

for l in layers_list : 
    train_auc, test_auc = train_and_evaluate(l)
    auc_diff = abs(train_auc - test_auc)

    print(f"Layers: {l} - Train AUC: {train_auc} - Test AUC: {test_auc} - AUC Diff: {auc_diff}")

    if (train_auc + test_auc) / 2 > (best_train_auc + best_test_auc) / 2 and auc_diff < best_auc_diff:
        best_layers = l
        best_train_auc = train_auc
        best_test_auc = test_auc
        best_auc_diff = auc_diff

print(f"\nBest configuration - Layers: {best_layers} - Train AUC: {best_train_auc} - Test AUC: {best_test_auc} - AUC Diff: {best_auc_diff}")

Layers: [25, 50, 5, 2] - Train AUC: 0.5049078862141523 - Test AUC: 0.5842811469186086 - AUC Diff: 0.07937326070445627


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\User\AppData\Local\Programs\Python\Python312\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\User\AppData\Local\Programs\Python\Python312\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\User\AppData\Local\Programs\Python\Python312\Lib\socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 