<a href="https://www.kaggle.com/code/aleksandrmorozov123/credit-risk-models?scriptVersionId=193634345" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# import the required libraries

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# get the data and summary statistics

df = pd.read_csv ('/kaggle/input/default-of-credit-card-clients-dataset/UCI_Credit_Card.csv')
df.describe().transpose ().round (2)

In [None]:
COLS_TO_PLOT = ["AGE", "LIMIT_BAL", "PAY_6"]
pair_plot = sns.pairplot(df[COLS_TO_PLOT], kind="reg", diag_kind="kde", height=4, 
                         plot_kws={"line_kws":{"color":"red"}})
pair_plot.fig.suptitle("Pairplot of selected variables")

In [None]:
pair_plot = sns.pairplot(data=df, x_vars=COLS_TO_PLOT, 
                         y_vars=COLS_TO_PLOT, hue="SEX",
                         height=4)
pair_plot.fig.suptitle("Pairplot of selected variables")

In [None]:
ax = sns.jointplot(data=df, x = "AGE", y= "LIMIT_BAL", hue="SEX", height=10)
ax.fig.suptitle("Age vs. limit balance")

**Function for plotting the correaltion heatmap**

In [None]:
def plot_correlation_matrix(corr_mat):
    sns.set(style="white")
    mask = np.zeros_like(corr_mat, dtype=bool)
    mask[np.triu_indices_from(mask)] = True
    fig, ax = plt.subplots()
    cmap = sns.diverging_palette(240, 10, n=9, as_cmap=True)
    sns.heatmap(corr_mat, mask=mask, cmap=cmap, 
                vmax=.3, center=0, square=True,
                linewidths=.5, cbar_kws={"shrink": .5}, ax=ax)
    ax.set_title("Correlation Matrix", fontsize=16)
    sns.set(style="darkgrid")
    
corr_mat = df.select_dtypes(include="number").corr()
plot_correlation_matrix(corr_mat)

In [None]:
ax = sns.boxplot(data=df, y="AGE", x="MARRIAGE", hue="SEX")
ax.set_title("Distribution of age")

In [None]:
ax = sns.violinplot(x="EDUCATION", y="LIMIT_BAL", hue="SEX", split=True, data=df)
ax.set_title("Distribution of limit balance per education level", fontsize=16)

In [None]:
ax = df.groupby("EDUCATION")["default.payment.next.month"].value_counts(normalize=True).unstack() \
.plot(kind="barh", stacked="True")
ax.set_title("Percentage of default per education level", fontsize=16)
ax.legend(title="Default", bbox_to_anchor=(1,1))

**Training the model**

In [None]:
from sklearn.model_selection import train_test_split

X = df.copy()
y = X.pop("default.payment.next.month")

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, shuffle = False)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, stratify = y, random_state = 42)

print("Target distribution - train")
print(y_train.value_counts(normalize = True).values)
print("Target distribution - test")
print(y_test.value_counts(normalize = True).values)

In [None]:
# define the size of the validation and test sets
VALID_SIZE = 0.1
TEST_SIZE = 0.2

# create the initial split - training and temp
X_train, X_temp, y_train, y_temp = train_test_split(X, y,
                                                    test_size=(VALID_SIZE + TEST_SIZE),
                                                    stratify=y,random_state=42)

# calculate the new test size
new_test_size = np.around(TEST_SIZE / (VALID_SIZE + TEST_SIZE), 2)

# create the valid and test sets
X_valid, X_test, y_valid, y_test = train_test_split(X_temp, y_temp,
                                                    test_size=new_test_size,
                                                    stratify=y_temp,
                                                    random_state=42)

print("Percentage of data in each set ----")
print(f"Train: {100 * len(X_train) / len(X):.2f}%")
print(f"Valid: {100 * len(X_valid) / len(X):.2f}%")
print(f"Test: {100 * len(X_test) / len(X):.2f}%")
print("")
print("Class distribution in each set ----")
print(f"Train: {y_train.value_counts(normalize=True).values}")
print(f"Valid: {y_valid.value_counts(normalize=True).values}")
print(f"Test: {y_test.value_counts(normalize=True).values}")

In [None]:
# dealing with missing values
import missingno as msno
from sklearn.impute import SimpleImputer

X.info()

In [None]:
# check the missing values (we don't have this)
msno.matrix(X)

**Decision tree classifier**

In [None]:
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn import metrics

tree_classifier = DecisionTreeClassifier(random_state=42)
tree_classifier.fit(X_train, y_train)
y_pred = tree_classifier.predict(X_test)

plot_tree(tree_classifier, max_depth=3, fontsize=10)

In [None]:
plot_tree(tree_classifier,max_depth=2,
          feature_names=X_train.columns,
          class_names=["No default", "Default"],
          rounded=True, filled=True, fontsize=10
)

In [None]:
# function for performance evaluation report
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

def performance_evaluation_report(model, X_test, y_test, show_plot=False, labels=None, show_pr_curve=False):
    """
    Function for creating a performance report of a classification model.
    
    Parameters
    ----------
    model : scikit-learn estimator
        A fitted estimator for classification problems.
    X_test : pd.DataFrame
        DataFrame with features matching y_test
    y_test : array/pd.Series
        Target of a classification problem.
    show_plot : bool
        Flag whether to show the plot
    labels : list
        List with the class names.
    show_pr_curve : bool
        Flag whether to also show the PR-curve. For this to take effect, 
        show_plot must be True.
        
    Return
    ------
    stats : pd.Series
        A series with the most important evaluation metrics
    """

    y_pred = model.predict(X_test)
    y_pred_prob = model.predict_proba(X_test)[:, 1]

    cm = metrics.confusion_matrix(y_test, y_pred)
    tn, fp, fn, tp = cm.ravel()

    fpr, tpr, _ = metrics.roc_curve(y_test, y_pred_prob)
    roc_auc = metrics.auc(fpr, tpr)

    precision, recall, _ = metrics.precision_recall_curve(
        y_test, y_pred_prob)
    pr_auc = metrics.auc(recall, precision)

    if show_plot:

        if labels is None:
            labels = ["Negative", "Positive"]

        N_SUBPLOTS = 3 if show_pr_curve else 2
        PLOT_WIDTH = 20 if show_pr_curve else 12
        PLOT_HEIGHT = 5 if show_pr_curve else 6

        fig, ax = plt.subplots(
            1, N_SUBPLOTS, figsize=(PLOT_WIDTH, PLOT_HEIGHT))
        fig.suptitle("Performance Evaluation", fontsize=16)

        # plot 1: confusion matrix ----
        
        # preparing more descriptive labels for the confusion matrix
        cm_counts = [f"{val:0.0f}" for val in cm.flatten()]
        cm_percentages = [f"{val:.2%}" for val in cm.flatten()/np.sum(cm)]
        cm_labels = [f"{v1}\n{v2}" for v1, v2 in zip(cm_counts,cm_percentages)]
        cm_labels = np.asarray(cm_labels).reshape(2,2)

        sns.heatmap(cm, annot=cm_labels, fmt="", linewidths=.5, cmap="Greens", 
                    square=True, cbar=False, ax=ax[0],
                    annot_kws={"ha": "center", "va": "center"})
        ax[0].set(xlabel="Predicted label",
                  ylabel="Actual label", title="Confusion Matrix")
        ax[0].xaxis.set_ticklabels(labels)
        ax[0].yaxis.set_ticklabels(labels)

        # plot 2: ROC curve ----
        
        metrics.RocCurveDisplay.from_estimator(model, X_test, y_test, ax=ax[1], name="")
        ax[1].set_title("ROC Curve")
        ax[1].plot(fp/(fp+tn), tp/(tp+fn), "ro",
                   markersize=8, label="Decision Point")
        ax[1].plot([0, 1], [0, 1], "r--")
        
        # alternatively:
        # ax[1].plot(fpr, tpr, "b-", label=f"ROC-AUC = {roc_auc:.2f}")
        # ax[1].set(xlabel="False Positive Rate",
        #           ylabel="True Positive Rate", title="ROC Curve")
        # ax[1].plot(fp/(fp+tn), tp/(tp+fn), "ro",
        #            markersize=8, label="Decision Point")
        # ax[1].plot([0, 1], [0, 1], "r--")
        # ax[1].legend(loc="lower right")
        
        # plot 3: Precision-Recall curve ----

        if show_pr_curve:

            metrics.PrecisionRecallDisplay.from_estimator(model, X_test, y_test, ax=ax[2], name="")
            ax[2].set_title("Precision-Recall Curve")
            
            # alternatively:
            # ax[2].plot(recall, precision, label=f"PR-AUC = {pr_auc:.2f}")
            # ax[2].set(xlabel="Recall", ylabel="Precision",
            #           title="Precision-Recall Curve")
            # ax[2].legend()

    stats = {
        "accuracy": metrics.accuracy_score(y_test, y_pred),
        "precision": metrics.precision_score(y_test, y_pred),
        "recall": metrics.recall_score(y_test, y_pred),
        "specificity": (tn / (tn + fp)),
        "f1_score": metrics.f1_score(y_test, y_pred),
        "cohens_kappa": metrics.cohen_kappa_score(y_test, y_pred),
        "matthews_corr_coeff": metrics.matthews_corrcoef(y_test, y_pred),
        "roc_auc": roc_auc,
        "pr_auc": pr_auc,
        "average_precision": metrics.average_precision_score(y_test, y_pred_prob)
    }

    return stats

In [None]:
LABELS = ["No Default", "Default"]
tree_perf = performance_evaluation_report(tree_classifier,
                                          X_test,y_test, labels=LABELS,
                                          show_plot=True)

**Evaluation metrics of classification**

In [None]:
y_pred_prob = tree_classifier.predict_proba(X_test)[:, 1]
precision, recall, _ = metrics.precision_recall_curve(y_test, y_pred_prob)

ax = plt.subplot()
ax.plot(recall, precision,
        label=f"PR-AUC = {metrics.auc(recall, precision):.2f}")
ax.set(title="Precision-Recall Curve", xlabel="Recall", ylabel="Precision")
ax.legend()

In [None]:
ax = metrics.PrecisionRecallDisplay.from_estimator(tree_classifier, X_test, y_test)
ax.ax_.set_title("Precision-Recall Curve")

**Creating pipeline**

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline


df = pd.read_csv("/kaggle/input/default-of-credit-card-clients-dataset/UCI_Credit_Card.csv", na_values="")
X = df.copy()
y = X.pop("default.payment.next.month")
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.2,
                                                    stratify=y,
                                                    random_state=42
)

num_features = X_train.columns.to_list()
num_pipeline = Pipeline(steps=[("imputer", SimpleImputer(strategy="median"))])

In [None]:
preprocessor = ColumnTransformer(transformers=[("numerical", num_pipeline, num_features)],
                                 remainder="drop")

dec_tree = DecisionTreeClassifier(random_state=42)
tree_pipeline = Pipeline(steps=[("preprocessor", preprocessor),
                                ("classifier", dec_tree)
])

tree_pipeline.fit(X_train, y_train)

In [None]:
LABELS = ["No Default", "Default"]
tree_perf = performance_evaluation_report(tree_pipeline, X_test,
                                          y_test, labels=LABELS,
                                          show_plot=True)

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class OutlierRemover(BaseEstimator, TransformerMixin):
    def __init__(self, n_std=3):
        self.n_std = n_std
    def fit(self, X, y = None):
        if np.isnan(X).any(axis=None):
            raise ValueError("""Missing values in the array! Please remove them.""")
        mean_vec = np.mean(X, axis=0)
        std_vec = np.std(X, axis=0)
        self.upper_band_ = pd.Series(mean_vec + self.n_std * std_vec)
        self.upper_band_ = (
        self.upper_band_.to_frame().transpose())
        self.lower_band_ = pd.Series(
        mean_vec - self.n_std * std_vec)
        self.lower_band_ = (
        self.lower_band_.to_frame().transpose())
        self.n_features_ = len(self.upper_band_.columns)
        return self

    def transform(self, X, y = None):
        X_copy = pd.DataFrame(X.copy())
        upper_band = pd.concat([self.upper_band_] * len(X_copy),
                               ignore_index=True)
        lower_band = pd.concat([self.lower_band_] * len(X_copy),
                               ignore_index=True)
        X_copy[X_copy >= upper_band] = upper_band
        X_copy[X_copy <= lower_band] = lower_band
        return X_copy.values

In [None]:
num_pipeline = Pipeline(steps=[("imputer", SimpleImputer(strategy="median")),("outliers", OutlierRemover())])
preprocessor = ColumnTransformer(transformers=[("numerical", num_pipeline, num_features)],remainder="drop")
dec_tree = DecisionTreeClassifier(random_state=42)
tree_pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", dec_tree)])
tree_pipeline.fit(X_train, y_train)

tree_perf = performance_evaluation_report(tree_pipeline, X_test,
                                          y_test, labels=LABELS,
                                          show_plot=True)

In [None]:
tree_pipeline.named_steps
tree_pipeline.named_steps["classifier"]
(tree_pipeline.named_steps["preprocessor"].named_transformers_["numerical"]["outliers"].upper_band_)

In [None]:
from sklearn.model_selection import (GridSearchCV, cross_val_score,
                                     RandomizedSearchCV, cross_validate,
                                     StratifiedKFold)
from sklearn import metrics
k_fold = StratifiedKFold(5, shuffle=True, random_state=42)
cross_val_score(tree_pipeline, X_train, y_train, cv=k_fold)

In [None]:
cv_scores = cross_validate(tree_pipeline, X_train, y_train, cv=k_fold,
                           scoring=["accuracy", "precision", "recall","roc_auc"])
pd.DataFrame(cv_scores)

In [None]:
param_grid = {
    "classifier__criterion": ["entropy", "gini"],
    "classifier__max_depth": range(3, 11),
    "classifier__min_samples_leaf": range(2, 11),
    "preprocessor__numerical__outliers__n_std": [3, 4]
}

In [None]:
classifier_gs = GridSearchCV(tree_pipeline, param_grid,scoring="recall", 
                             cv=k_fold, n_jobs=-1, verbose=1)
classifier_gs.fit(X_train, y_train)

In [None]:
LABELS = ["No Default", "Default"]
tree_gs_perf = performance_evaluation_report(
    classifier_gs, X_test,
    y_test, labels=LABELS,
    show_plot=True
)

In [None]:
classifier_rs = RandomizedSearchCV(tree_pipeline, param_grid, scoring="recall", 
                                   cv=k_fold, n_jobs=-1, verbose=1,
                                   n_iter=100, random_state=42)
classifier_rs.fit(X_train, y_train)
print(f"Best parameters: {classifier_rs.best_params_}")
print(f"Recall (Training set): {classifier_rs.best_score_:.4f}")
print(f"Recall (Test set): {metrics.recall_score(y_test, classifier_rs.predict(X_test)):.4f}")

In [None]:
from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import HalvingGridSearchCV

classifier_sh = HalvingGridSearchCV(tree_pipeline, param_grid,
                                    scoring="recall", cv=k_fold,
                                    n_jobs=-1, verbose=1,
                                    min_resources="exhaust", factor=3)
classifier_sh.fit(X_train, y_train)

In [None]:
from sklearn.ensemble import RandomForestClassifier

param_grid = [
    {"classifier": [RandomForestClassifier(random_state=42)],
     "classifier__n_estimators": np.linspace(100, 500, 10, dtype=int),
     "classifier__max_depth": range(3, 11),
     "preprocessor__numerical__outliers__n_std": [3, 4]},
    {"classifier": [DecisionTreeClassifier(random_state=42)],
     "classifier__criterion": ["entropy", "gini"],
     "classifier__max_depth": range(3, 11),
     "classifier__min_samples_leaf": range(2, 11),
     "preprocessor__numerical__outliers__n_std": [3, 4]}
]

classifier_gs_2 = GridSearchCV(tree_pipeline, param_grid,
                               scoring="recall", cv=k_fold,
                               n_jobs=-1, verbose=1)
classifier_gs_2.fit(X_train, y_train)

print(f"Best parameters: {classifier_gs_2.best_params_}")
print(f"Recall (Training set): {classifier_gs_2.best_score_:.4f}")
print(f"Recall (Test set): {metrics.recall_score(y_test, classifier_gs_2.predict(X_test)):.4f}")

In [None]:
pd.DataFrame(classifier_gs_2.cv_results_).sort_values("rank_test_score")

**More advanced methods for training the model**

In [None]:
from sklearn.ensemble import (RandomForestClassifier, GradientBoostingClassifier)
from xgboost.sklearn import XGBClassifier
from lightgbm import LGBMClassifier

rf = RandomForestClassifier(random_state=42)
rf_pipeline = Pipeline(steps=[("preprocessor", preprocessor),
                              ("classifier", rf)])
rf_pipeline.fit(X_train, y_train)

rf_perf = performance_evaluation_report(rf_pipeline, X_test,
                                        y_test, labels=LABELS,
                                        show_plot=True,
                                        show_pr_curve=True)

In [None]:
gbt = GradientBoostingClassifier(random_state=42)
gbt_pipeline = Pipeline(
steps=[("preprocessor", preprocessor), ("classifier", gbt)])
gbt_pipeline.fit(X_train, y_train)

gbt_perf = performance_evaluation_report(gbt_pipeline, X_test,
                                         y_test, labels=LABELS,
                                         show_plot=True,
                                         show_pr_curve=True)

In [None]:
xgb = XGBClassifier(random_state=42)
xgb_pipeline = Pipeline(
steps=[("preprocessor", preprocessor), ("classifier", xgb)])
xgb_pipeline.fit(X_train, y_train)

xgb_perf = performance_evaluation_report(xgb_pipeline, X_test,
                                         y_test, labels=LABELS,
                                         show_plot=True,
                                         show_pr_curve=True)

In [None]:
lgbm = LGBMClassifier(random_state=42)
lgbm_pipeline = Pipeline(steps=[("preprocessor", preprocessor),("classifier", lgbm)])
lgbm_pipeline.fit(X_train, y_train)

lgbm_pipeline.fit(X_train, y_train)
lgbm_perf = performance_evaluation_report(lgbm_pipeline, X_test,
                                          y_test, labels=LABELS,
                                          show_plot=True,
                                          show_pr_curve=True)

**Handling with imbalanced data**

In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import RobustScaler
from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN
from imblearn.under_sampling import RandomUnderSampler
from imblearn.ensemble import BalancedRandomForestClassifier

RANDOM_STATE = 42
df = pd.read_csv("/kaggle/input/fraud-detection/fraudTrain.csv")
X = df.copy().drop(columns=["trans_date_trans_time"])
y = X.pop("Class")
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=RANDOM_STATE
)

robust_scaler = RobustScaler()
X_train = robust_scaler.fit_transform(X_train)
X_test = robust_scaler.transform(X_test)
rf = RandomForestClassifier(
random_state=RANDOM_STATE, n_jobs=-1
)
rf.fit(X_train, y_train)

rus = RandomUnderSampler(random_state=RANDOM_STATE)
X_rus, y_rus = rus.fit_resample(X_train, y_train)
rf.fit(X_rus, y_rus)
rf_rus_perf = performance_evaluation_report(rf, X_test, y_test)

KeyError: 'Class'