In [None]:
import pandas as pd
import os
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import RocCurveDisplay
from sklearn.metrics import classification_report, roc_curve, roc_auc_score
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
import shap
from lightgbm import LGBMClassifier
from xgboost import XGBClassifier
from typing import cast
import pickle

In [None]:
data_path = 'data/'
random_state = 42

In [None]:
df = pd.read_csv(os.path.join(data_path, 'data.csv'))
df['Bankrupt?'] = df['Bankrupt?'].astype(bool)
df.info()

In [None]:
colllll = ['Net Income to Total Assets',
'ROA(A) before interest and % after tax',
'Operating Gross Margin',
'Current Ratio',
'Total debt/Total net worth',
'Total Asset Turnover',
'Cash Flow Per Share']

In [None]:
df[colllll].describe()

In [None]:
df.head()

In [None]:
scaler = StandardScaler()
X = scaler.fit_transform(df[df.columns[1:]])
y = df['Bankrupt?'].to_numpy()
df_normalized = pd.DataFrame(np.column_stack((y, X)), columns=df.columns)

In [None]:
df_normalized.describe()

In [None]:
plt.figure(figsize=(12, 12))
sns.heatmap(df_normalized.corr(numeric_only=True))
plt.title('Correlation plot between columns')
plt.show()

In [None]:
# extracted from xgboost feature importance to have cleaner EDA
important_cols = ['ROA(C) before interest and depreciation before interest',
 'ROA(A) before interest and % after tax',
 'Operating Gross Margin',
 'ROA(B) before interest and depreciation after tax',
 'Realized Sales Gross Margin',
 'Research and development expense rate',
 'Cash Flow Per Share',
 'Pre-tax net Interest Rate',
 'After-tax net Interest Rate',
 'Continuous interest rate (after tax)',
 'Cash flow rate',
 'Operating Profit Rate',
 'Net Value Per Share (A)',
 'Non-industry income and expenditure/revenue',
 'Net Value Per Share (B)',
 'Operating Expense Rate',
 'Tax rate (A)',
 'Interest-bearing debt interest rate',
 'Net Value Per Share (C)',
 'Persistent EPS in the Last Four Seasons']
pred_column = ['Bankrupt?']

In [None]:
plt.figure(figsize=(12, 12))
sns.heatmap(df_normalized[pred_column + important_cols].corr(numeric_only=True), annot=True, fmt='.2f')
plt.title('Correlation plot between columns')
plt.show()

In [None]:
X_train, X_test, y_train, y_test = train_test_split(df_normalized[df_normalized.columns[1:]], y, test_size=0.1, random_state=random_state)

## Random Forest Classifier

In [None]:
roc_curves = []

In [None]:
rf_classifier = RandomForestClassifier()
rf_classifier.fit(X_train, y_train)

In [None]:
y_pred = rf_classifier.predict(X_test)

cm = confusion_matrix(y_test, y_pred, normalize='pred')
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.title("Random Forest Confusion Matrix")
plt.show()

In [None]:
y_proba = rf_classifier.predict_proba(X_test)[:, 1]

roc_curves.append(('RandForest', roc_curve(y_test, y_proba), roc_auc_score(y_test, y_proba)))
RocCurveDisplay.from_predictions(y_test, y_proba)
plt.title("Random Forest ROC Curve")
plt.show()

In [None]:
print(classification_report(y_test, y_pred))

## Support Vector Machine

In [None]:
svc = SVC(probability=True)
svc.fit(X_train, y_train)

In [None]:
y_pred = svc.predict(X_test)

cm = confusion_matrix(y_test, y_pred, normalize='pred')
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.title("Support Vector Machine Confusion Matrix")
plt.show()

In [None]:
y_proba = svc.predict_proba(X_test)[:, 1]

roc_curves.append(('SVM', roc_curve(y_test, y_proba), roc_auc_score(y_test, y_proba)))
RocCurveDisplay.from_predictions(y_test, y_proba)
plt.title("Support Vector Machine ROC Curve")
plt.show()

In [None]:
print(classification_report(y_test, y_pred))

## Logistic Regression

In [None]:
logr = LogisticRegression(max_iter=1000)

logr.fit(X_train, y_train)
y_pred = logr.predict(X_test)

In [None]:
cm = confusion_matrix(y_test, y_pred, normalize='pred')
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.title("Logistic Regression Confusion Matrix")
plt.show()

In [None]:
y_proba = logr.predict_proba(X_test)[:, 1]
y_proba_logr = y_proba

roc_curves.append(('LogReg', roc_curve(y_test, y_proba), roc_auc_score(y_test, y_proba)))
RocCurveDisplay.from_predictions(y_test, y_proba)
plt.title("Logistic Regression ROC Curve")
plt.show()

In [None]:
print(classification_report(y_test, y_pred))

This one seems to be the best

## XGBoost

In [None]:
xgb = XGBClassifier()
xgb.fit(X_train, y_train)

y_pred = xgb.predict(X_test)

In [None]:
cm = confusion_matrix(y_test, y_pred, normalize='pred')
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.title("XGBoost Confusion Matrix")
plt.show()

In [None]:
y_proba = xgb.predict_proba(X_test)[:, 1]

roc_curves.append(('XGBoost', roc_curve(y_test, y_proba), roc_auc_score(y_test, y_proba)))
RocCurveDisplay.from_predictions(y_test, y_proba)
plt.title("XGBoost ROC Curve")
plt.show()

In [None]:
print(classification_report(y_test, y_pred))

In [None]:
important_cols = df_normalized.columns[1:][np.argsort(xgb.feature_importances_)[::-1]][:20].to_list()
important_cols

## Simplified XGBoost model

In [None]:
xgb_s = XGBClassifier()
xgb_s.fit(X_train[important_cols], y_train)

y_pred = xgb_s.predict(X_test[important_cols])

In [None]:
cm = confusion_matrix(y_test, y_pred, normalize='pred')
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.title("XGBoost (Simplified) Confusion Matrix")
plt.show()

In [None]:
y_proba = xgb_s.predict_proba(X_test[important_cols])[:, 1]

roc_curves.append(('XGBoost_s', roc_curve(y_test, y_proba), roc_auc_score(y_test, y_proba)))
RocCurveDisplay.from_predictions(y_test, y_proba)
plt.title("XGBoost (Simplified) ROC Curve")
plt.show()

In [None]:
print(classification_report(y_test, y_pred))

In [None]:
for name, curve, auc in roc_curves:
    plt.plot(curve[0],curve[1],label=f"{name} {auc=:.3f}")

plt.title('ROC Curves of all tried models')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.show()

## XAI

In [None]:
explainer = shap.LinearExplainer(
    logr,
    X_test,
    model_output='raw'
)
shap_values = explainer.shap_values(X_test)
shap_values

In [None]:
shap.summary_plot(shap_values, X_test)

In [None]:
bankrupt_predicted_test_idxs = np.argwhere(y_proba_logr > 0.5).flatten()
bankrupt_predicted_test_idxs

In [None]:
y_proba_logr[bankrupt_predicted_test_idxs]

In [None]:
baseline_logit = explainer.expected_value
baseline_prob = 1 / (1 + np.exp(-baseline_logit))
baseline_logit, baseline_prob

In [None]:
shap_values_arr = np.array(shap_values)
bankrupt_logit = np.sum(shap_values_arr[bankrupt_predicted_test_idxs[1]]) + baseline_logit
bankrupt_prob = 1 / (1 + np.exp(-bankrupt_logit))
bankrupt_logit, bankrupt_prob

In [None]:
shap_values_prob = np.zeros((len(shap_values_arr), 2))
for i, entry_values in enumerate(shap_values_arr):
    val = np.sum(shap_values_arr[i])
    shap_values_prob[i, 0] = val + baseline_logit
    shap_values_prob[i, 1] = 1 / (1 + np.exp(-val-baseline_logit))

In [None]:
xmin = -20; xmax = 20
ymin = 0; ymax = 1
plt.figure(figsize=(14, 8))
plt.hlines([0.5], xmin=xmin, xmax=xmax, colors=['r'], linestyles='dashed', label='Threshold')
plt.scatter(shap_values_prob[:, 0], shap_values_prob[:, 1], label='Test sample probabilities')
plt.title('Shap Values vs probability of test sample')
plt.xticks(range(-20, 26, 5))
plt.yticks(np.arange(0, 1.1, 0.1))
plt.xlabel('Shap value')
plt.ylabel('Probability')
plt.legend()
plt.show()

In [None]:
# snippet from gemini to debug explainer problem
# For binary classification, shap_values is a list: [shap_values_class_0, shap_values_class_1]
if isinstance(shap_values, list):
    # We plot the contributions toward the positive class (logit of P(Y=1))
    shap_values_positive = shap_values[1]
else:
    # If the model only had one output (e.g., a custom wrapper), use it directly
    shap_values_positive = shap_values

# 2. Generate the modern bar plot
# The bar plot aggregates the mean absolute SHAP value for each feature.
shap.plots.bar(
    shap.Explanation(
        shap_values_positive, 
        base_values=explainer.expected_value, 
        data=X_test.values, 
        feature_names=X_test.columns.tolist()
    )
)

In [None]:
# generated with gemini from a sample of code, then refactored it to a function
def plot_waterfall(explainer, shap_values, X_test, i):
    # 1. Select the SHAP values for the positive class (Class 1) and the specific sample
    if isinstance(shap_values, list):
        # Get the row corresponding to sample 'i' from the positive class array
        shap_values_i = shap_values[1][i, :]
    else:
        shap_values_i = shap_values[i, :]

    # 2. Get the feature values for that specific sample
    data_i = X_test.iloc[i].values

    # 3. Create the Explanation object for the single sample
    # The base_values and feature_names come from the overall explainer/dataset
    explanation_i = shap.Explanation(
        values=shap_values_i,
        base_values=explainer.expected_value,
        data=data_i,
        feature_names=X_test.columns.tolist()
    )

    # 4. Generate the modern waterfall plot
    shap.plots.waterfall(explanation_i)

In [None]:
plot_waterfall(explainer, shap_values, X_test, bankrupt_predicted_test_idxs[0])

In [None]:
plot_waterfall(explainer, shap_values, X_test, 5)

## Saving data and models

In [None]:
if isinstance(X_test, np.ndarray):
    X_test = pd.DataFrame(X_test, columns=df.columns[1:]) 

X_test.to_csv('assets/X_test.csv', index=False)
np.save('assets/y_test.npy', y_test)

with open('assets/logreg_model.pkl', 'wb') as f:
    pickle.dump(logr, f)

model_probas = {
    'Random Forest': rf_classifier.predict_proba(X_test)[:, 1],
    'SVM': svc.predict_proba(X_test)[:, 1],
    'Logistic Regression': logr.predict_proba(X_test)[:, 1],
    'XGBoost': xgb.predict_proba(X_test)[:, 1],
    'XGBoost (Simplified)': xgb_s.predict_proba(X_test[important_cols])[:, 1]
}

with open('assets/model_comparison_probas.pkl', 'wb') as f:
    pickle.dump(model_probas, f)

df.sample(1000, random_state=42).to_csv('assets/df_sample.csv', index=False)

In [None]:
with open('assets/scaler.pkl', 'wb') as f:
    pickle.dump(scaler, f)

In [None]:
df_normalized[df_normalized.columns[1:]].median().to_csv('assets/median_values.csv')