In [1]:
import os

import numpy as np
import pandas as pd
import xgboost as xgb
from rich import print
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    precision_recall_curve,
    roc_auc_score
)
from sklearn.model_selection import (
    StratifiedKFold,
    cross_val_predict,
    cross_val_score,
    train_test_split
)
from sklearn.pipeline import Pipeline


# Allow up to 500 rows before pandas truncates displayed output.
pd.options.display.max_rows = 500

In [2]:
data_dir = "./data"

# Industries that get their own one-hot indicator column. A file whose industry
# token is not listed here ends up with zeros in all four columns.
INDUSTRIES = ("agriculture", "construction", "manufacture", "retail")

data_frames = []

# os.listdir returns entries in arbitrary order; sorting keeps the row order of
# the combined frame (and therefore any seeded split downstream) reproducible.
for file_name in sorted(os.listdir(data_dir)):
    if not file_name.endswith(".csv"):
        continue

    df = pd.read_csv(os.path.join(data_dir, file_name), sep=";", decimal=",", na_values="NA")
    # "Unnamed: 0" is presumably a saved index column; it is not a feature.
    if "Unnamed: 0" in df.columns:
        df = df.drop(columns=["Unnamed: 0"])

    # File names are expected to look like "<status>_<industry>[_...].csv".
    # Strip the extension before splitting so a two-token name such as
    # "bankrupt_retail.csv" yields "retail" rather than "retail.csv".
    parts = os.path.splitext(file_name)[0].split("_")
    if len(parts) < 2:
        raise ValueError(f"Cannot parse status and industry from file name: {file_name!r}")
    # Any first token that does not contain "nonbankrupt" is treated as bankrupt.
    bankruptcy_status = 0 if "nonbankrupt" in parts[0] else 1
    industry = parts[1]

    for industry_name in INDUSTRIES:
        df[f"industry_{industry_name}"] = 1 if industry == industry_name else 0
    df["bankruptcy_label"] = bankruptcy_status

    data_frames.append(df)

if not data_frames:
    raise ValueError(f"No .csv files found in {data_dir!r}")

combined_df = pd.concat(data_frames, ignore_index=True)


In [3]:
# Summary statistics of the combined frame, as a quick sanity check on the load.
summary_stats = combined_df.describe()
print(summary_stats)

In [4]:

def build_and_evaluate_model(df: pd.DataFrame) -> tuple:
    """Train an XGBoost bankruptcy classifier and print its evaluation.

    Steps:
      1. Split features/target and hold out 20% as a stratified test set.
      2. Derive ``scale_pos_weight`` (negatives / positives) from the training
         labels only, so the held-out labels do not influence the model.
      3. Score a median-imputation + XGBoost pipeline with 5-fold stratified
         cross-validation (ROC AUC) on the training set.
      4. Choose the decision threshold that maximises F1 on out-of-fold
         training predictions, so the test set is used for reporting only.
      5. Refit on the full training set and print the test AUC plus confusion
         matrices and classification reports for both ``predict()`` output and
         the tuned threshold.

    Args:
        df (pd.DataFrame): DataFrame containing feature columns and a target column
                           'bankruptcy_label' with values 0 and 1.

    Returns:
        tuple: (X_test, y_test, y_pred_proba) where X_test is the un-imputed
               held-out feature frame, y_test the matching labels, and
               y_pred_proba the predicted positive-class probabilities.

    Raises:
        ValueError: If 'bankruptcy_label' does not contain both classes.
    """
    X = df.drop(columns=["bankruptcy_label"])
    y = df["bankruptcy_label"]

    # Fail early with a clear message; a single-class target would otherwise
    # surface later as an opaque error from the metrics.
    if (y == 0).sum() == 0 or (y == 1).sum() == 0:
        raise ValueError(
            "bankruptcy_label must contain both classes (0 and 1) to train and evaluate."
        )

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, stratify=y, test_size=0.2, random_state=42
    )

    # Class weight comes from the training labels only (no test-set information).
    negatives = int((y_train == 0).sum())
    positives = int((y_train == 1).sum())
    scale_pos_weight = negatives / positives

    pipeline = Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("classifier", xgb.XGBClassifier(
            scale_pos_weight=scale_pos_weight,
            eval_metric="auc",
            random_state=42,
            n_jobs=-1
        ))
    ])

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = cross_val_score(
        pipeline, X_train, y_train, cv=cv, scoring="roc_auc", n_jobs=-1
    )

    # Tune the threshold on out-of-fold training predictions. Selecting it on
    # the test set and then reporting on that same set would overstate the
    # "optimal threshold" metrics.
    oof_proba = cross_val_predict(
        pipeline, X_train, y_train, cv=cv, method="predict_proba", n_jobs=-1
    )[:, 1]
    precision, recall, thresholds = precision_recall_curve(y_train, oof_proba)
    # precision/recall carry one trailing element with no matching threshold;
    # drop it so the argmax index is always valid for `thresholds`.
    precision, recall = precision[:-1], recall[:-1]
    f1_scores = 2 * precision * recall / (precision + recall + 1e-8)
    optimal_threshold = float(thresholds[np.argmax(f1_scores)])

    pipeline.fit(X_train, y_train)

    y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
    y_pred = pipeline.predict(X_test)
    test_auc = roc_auc_score(y_test, y_pred_proba)
    y_pred_optimal = (y_pred_proba >= optimal_threshold).astype(int)

    print("Cross-validation AUC scores:", cv_scores)
    print("Mean CV AUC score:", cv_scores.mean())
    print("Test AUC score:", test_auc)
    print("Optimal threshold based on F1 score:", optimal_threshold)
    print("Confusion Matrix (default threshold):\n", confusion_matrix(y_test, y_pred))
    print("Classification Report (default threshold):\n", classification_report(y_test, y_pred))
    print("Confusion Matrix (optimal threshold):\n", confusion_matrix(y_test, y_pred_optimal))
    print("Classification Report (optimal threshold):\n", classification_report(y_test, y_pred_optimal))

    return X_test, y_test, y_pred_proba


def evaluate_by_industry(X_test: pd.DataFrame, y_test: pd.Series, y_pred_proba: np.ndarray,
                         threshold: float = 0.5) -> None:
    """Print per-industry performance metrics for the test set.

    Rebuilds an industry label from the one-hot ``industry_*`` columns, groups
    the test rows by that label, and prints the support, ROC AUC, confusion
    matrix and classification report for each group.

    Args:
        X_test (pd.DataFrame): Test features DataFrame including industry one-hot columns.
            Missing ``industry_*`` columns are tolerated; rows matching no
            indicator are labelled "unknown".
        y_test (pd.Series): True labels (0/1) for the test set, in X_test row order.
        y_pred_proba (np.ndarray): Predicted probabilities for the positive class,
            in X_test row order.
        threshold (float): Decision threshold for converting probabilities to class labels.
    """
    industry_names = ["agriculture", "construction", "manufacture", "retail"]

    # One boolean mask per industry. np.select picks the first mask that is
    # True for a row, matching the original check order.
    n_rows = len(X_test)
    conditions = [
        (X_test[f"industry_{name}"] == 1).to_numpy()
        if f"industry_{name}" in X_test.columns
        else np.zeros(n_rows, dtype=bool)
        for name in industry_names
    ]
    industries = np.select(conditions, industry_names, default="unknown")

    # Accept any array-like inputs, not only Series / ndarray.
    y_true = np.asarray(y_test)
    y_pred_proba = np.asarray(y_pred_proba)
    # Compute predicted labels based on the chosen threshold.
    y_pred = (y_pred_proba >= threshold).astype(int)

    eval_df = pd.DataFrame({
        "industry": industries,
        "y_true": y_true,
        "y_pred": y_pred,
        "y_pred_proba": y_pred_proba
    })

    for industry, group in eval_df.groupby("industry"):
        print(f"\n--- Industry: {industry} ---")
        print("Support:", len(group))
        try:
            industry_auc = roc_auc_score(group["y_true"], group["y_pred_proba"])
        except ValueError:
            # AUC is undefined when the group holds a single class; report None
            # for that industry instead of aborting the whole loop.
            industry_auc = None
        print("ROC AUC:", industry_auc)
        print("Confusion Matrix:")
        # Fixed labels keep the matrix 2x2 even if a group lacks one class.
        print(confusion_matrix(group["y_true"], group["y_pred"], labels=[0, 1]))
        print("Classification Report:")
        # zero_division=0 keeps the same reported values but silences the
        # undefined-metric warning for classes that are never predicted.
        print(classification_report(group["y_true"], group["y_pred"], zero_division=0))


# Train and evaluate on the combined data, then break the held-out results
# down by industry at a 0.5 decision threshold.
X_test, y_test, y_pred_proba = build_and_evaluate_model(df=combined_df)
evaluate_by_industry(
    X_test=X_test,
    y_test=y_test,
    y_pred_proba=y_pred_proba,
    threshold=0.5,
)
