In [8]:
import pandas as pd
import numpy as np
from pathlib import Path
from scipy.stats import uniform, randint
import matplotlib.pyplot as plt

from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from sklearn.model_selection import (
    train_test_split,
    RandomizedSearchCV,
    GridSearchCV,
    StratifiedKFold,
)
from sklearn.metrics import (
    precision_score,
    recall_score,
    roc_auc_score,
    confusion_matrix,
    ConfusionMatrixDisplay,
    precision_recall_curve,
    PrecisionRecallDisplay,
)
from sklearn.tree import DecisionTreeClassifier

RAW_DATA_PATH = "../data/data.xlsx"

CAT_FEATURES = [
    "PSP",
    "country",
    "card",
    "amount_bins",
    "interaction_psp_country",
    "interaction_psp_card",
    "interaction_psp_amount_bin",
    "interaction_psp_3D_secured",
]

CYCLICAL_FEATURES = {"day": 31, "dow": 7, "hour": 24}

PSP_COSTS = {
    "Moneycard": {"success": 5, "failure": 2},
    "Goldcard": {"success": 10, "failure": 5},
    "UK_Card": {"success": 3, "failure": 1},
    "Simplecard": {"success": 1, "failure": 0.5},
}

PARAM_DIST = {
    "learning_rate": uniform(0.01, 0.2),
    "max_iter": randint(100, 500),
    "max_depth": randint(3, 15),
    "l2_regularization": uniform(0, 1),
    "min_samples_leaf": randint(20, 100),
}

In [3]:
def engineer_features(data: pd.DataFrame) -> tuple[pd.DataFrame, OneHotEncoder]:
    """Drop duplicates and generate Features."""
    data = data.copy()
    data = data.drop_duplicates()

    # Informationen aus Zeitstempel extrahieren
    data["month"] = data.loc[:, "tmsp"].dt.month.astype("int64")
    data["week"] = data.loc[:, "tmsp"].dt.isocalendar().week.astype("int64")
    data["day"] = data.loc[:, "tmsp"].dt.day.astype("int64")
    data["dow"] = data.loc[:, "tmsp"].dt.dayofweek.astype("int64")
    data["hour"] = data.loc[:, "tmsp"].dt.hour.astype("int64")
    data["second"] = data.loc[:, "tmsp"].dt.second.astype("int64")
    data["is_weekend"] = data["dow"] >= 5
    data["is_business_hours"] = (data["hour"] >= 8) & (data["hour"] < 20)

    # Zeit-Features zyklisch kodieren
    # week und month nicht zyklisch kodieren da kein Zyklusübergang
    for key, value in CYCLICAL_FEATURES.items():
        data[f"{key}_sin"] = np.sin(2 * np.pi * data[key] / value)
        data[f"{key}_cos"] = np.cos(2 * np.pi * data[key] / value)

    # Kosten
    data["cost_if_success"] = data["PSP"].map(lambda psp: PSP_COSTS[psp]["success"])
    data["cost_if_failure"] = data["PSP"].map(lambda psp: PSP_COSTS[psp]["failure"])

    # Wiederholte Transaktionsversuche aufgrund fehlgeschlagener Transaktionen
    data["timedelta"] = data["tmsp"].diff().dt.total_seconds().fillna(0).astype("int64")
    cols_to_compare = ["country", "amount", "3D_secured", "card"]
    data["is_retry"] = (data[cols_to_compare] == data[cols_to_compare].shift(1)).all(axis=1)
    data["is_retry"] = data["is_retry"] & (data["timedelta"] <= 60)

    # Anzahl kontinuierlicher Retry Versuche
    retry_groups = (~data["is_retry"]).cumsum()
    data["retry_count"] = data.groupby(retry_groups)["is_retry"].cumsum().astype("int64")

    # Wechsel PSP bei Retry
    data["PSP_switch"] = data.groupby(retry_groups)["PSP"].transform(
        lambda x: (x != x.shift()).fillna(False).cumsum() > 0
    )

    data["amount_bins"] = pd.cut(
        data["amount"],
        bins=[0, 200, 400, float("inf")],
        labels=["amount_under_200", "amount_200_400", "amount_over_400"],
        right=False,
    )

    # Feature interaktion: PSP und Country
    data["interaction_psp_country"] = data["PSP"] + "_" + data["country"]
    data["interaction_psp_card"] = data["PSP"] + "_" + data["card"]
    data["interaction_psp_amount_bin"] = data["PSP"] + "_" + data["amount_bins"].astype(str)
    data["interaction_psp_3D_secured"] = data["PSP"] + "_" + data["3D_secured"].astype(str)

    # kategorische Merkmale encodieren
    ohc = train_ohc_encoder(data=data[CAT_FEATURES])
    encoded_array = ohc.transform(data[CAT_FEATURES])
    encoded_columns = ohc.get_feature_names_out(CAT_FEATURES)
    encoded_df = pd.DataFrame(encoded_array, columns=encoded_columns, index=data.index)
    data = pd.concat([data, encoded_df], axis=1)

    # Timestamp und nicht kategorische features entfernen
    cat_features = CAT_FEATURES + ["tmsp"]
    data = data.drop(columns=cat_features, axis=1)

    return (data, ohc)

In [4]:
def train_ohc_encoder(data: pd.DataFrame) -> OneHotEncoder:
    """Trains and saves a OneHotEncoder for categorical features."""
    one_hot_encoder = OneHotEncoder(sparse_output=False, handle_unknown="warn")
    one_hot_encoder.fit(data)
    return one_hot_encoder


def train_decision_tree(
    x_train: pd.DataFrame,
    y_train: pd.Series,
) -> DecisionTreeClassifier:
    """Trains a Decision Tree Classifier."""
    decision_tree_model = DecisionTreeClassifier(max_depth=5, random_state=42)
    decision_tree_model.fit(x_train, y_train)

    return decision_tree_model


def train_hgboost(
    x_train: pd.DataFrame,
    y_train: pd.Series,
) -> HistGradientBoostingClassifier:
    """Trains a HGBoost Classifier."""
    hgboost_model = HistGradientBoostingClassifier(random_state=42, class_weight="balanced")
    hgboost_model.fit(x_train, y_train)

    return hgboost_model


def tune_hyperparameters(x_train: pd.DataFrame, y_train: pd.DataFrame) -> HistGradientBoostingClassifier:
    """Run randomized search to find the best hyperparameters."""
    hgboost_model = HistGradientBoostingClassifier(random_state=42, class_weight="balanced")
    cv_strategy = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    random_search = RandomizedSearchCV(
        estimator=hgboost_model,
        param_distributions=PARAM_DIST,
        n_iter=200,
        cv=cv_strategy,
        scoring="f1",
        n_jobs=-1,
        random_state=42,
        verbose=2,
    )

    random_search.fit(x_train, y_train)

    print(f"Beste Parameter gefunden: {random_search.best_params_}")
    print(f"Bester CV f1-Score: {random_search.best_score_:.4f}")

    final_model = HistGradientBoostingClassifier(**random_search.best_params_, random_state=42, class_weight="balanced")
    final_model.fit(x_train, y_train)

    return final_model


def calculate_success_probability(model, features: pd.DataFrame) -> float:
    """Calculates the success probability for a given model and features."""
    return model.predict_proba(features)[:, 1]

In [5]:
processed_data["amount_bins"] = pd.cut(
    processed_data["amount"],
    bins=[0, 200, 400, float("inf")],
    labels=["amount_under_200", "amount_200_400", "amount_over_400"],
    right=False,
)
processed_data["amount_bins"].info()

NameError: name 'processed_data' is not defined

In [9]:
raw_data = pd.read_excel(RAW_DATA_PATH, index_col=0)

In [10]:
processed_data, ohc = engineer_features(data=raw_data)

In [11]:
# split features and target
y = processed_data["success"]
X = processed_data.drop(columns=["success"], axis=1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [13]:
# Train models
dtm = train_decision_tree(x_train=X_train, y_train=y_train)
hgbm = train_hgboost(x_train=X_train, y_train=y_train)
ohgbm = tune_hyperparameters(x_train=X_train, y_train=y_train)

Exception in thread Thread-4 (_readerthread):
Traceback (most recent call last):
  File [35m"C:\Users\Erik\Miniconda3\Lib\threading.py"[0m, line [35m1041[0m, in [35m_bootstrap_inner[0m
    [31mself.run[0m[1;31m()[0m
    [31m~~~~~~~~[0m[1;31m^^[0m
  File [35m"C:\Users\Erik\Miniconda3\Lib\site-packages\ipykernel\ipkernel.py"[0m, line [35m766[0m, in [35mrun_closure[0m
    [31m_threading_Thread_run[0m[1;31m(self)[0m
    [31m~~~~~~~~~~~~~~~~~~~~~[0m[1;31m^^^^^^[0m
  File [35m"C:\Users\Erik\Miniconda3\Lib\threading.py"[0m, line [35m992[0m, in [35mrun[0m
    [31mself._target[0m[1;31m(*self._args, **self._kwargs)[0m
    [31m~~~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"C:\Users\Erik\Miniconda3\Lib\subprocess.py"[0m, line [35m1611[0m, in [35m_readerthread[0m
    buffer.append([31mfh.read[0m[1;31m()[0m)
                  [31m~~~~~~~[0m[1;31m^^[0m
  File [35m"C:\Users\Erik\Miniconda3\Lib\encodings\cp1252.py"[0m, line [

Fitting 5 folds for each of 200 candidates, totalling 1000 fits
Beste Parameter gefunden: {'l2_regularization': np.float64(0.7215965507512772), 'learning_rate': np.float64(0.01961892879281754), 'max_depth': 10, 'max_iter': 366, 'min_samples_leaf': 33}
Bester CV f1-Score: 0.4238


In [14]:
models_to_evaluate = {
    "decision_tree_model": dtm,
    "hgboost_model": hgbm,
    "optimized_hgboost_model": ohgbm,
}

In [None]:
for name, model in models_to_evaluate.items():
    precision, recall, accuracy, f1, roc_auc, cm = get_scores(name, model, y_test)

In [None]:
def get_scores(
    name: str,
    model: DecisionTreeClassifier | HistGradientBoostingClassifier,
    y_true: pd.Series,
    x_test: pd.DataFrame,
):
    y_pred = model.predict(x_test)
    precision = precision_score(y_true=y_true, y_pred=y_pred)
    recall = recall_score(y_true=y_true, y_pred=y_pred)
    roc_auc = roc_auc_score(y_true=y_true, y_score=y_pred)
    cm = confusion_matrix(y_true=y_true, y_pred=y_pred)

    return precision, recall, roc_auc, cm

In [None]:
predictions = ohgbm.predict_proba(X_test)[:, 1]
precision, recall, _ = precision_recall_curve(y_test, predictions)
disp = PrecisionRecallDisplay(precision=precision, recall=recall)
disp.plot()

In [None]:
# Feature Importance
result_hgbm = permutation_importance(hgbm, X_test, y_test, n_repeats=10, random_state=42, n_jobs=-1)
result_ohgbm = permutation_importance(ohgbm, X_test, y_test, n_repeats=10, random_state=42, n_jobs=-1)
perm_importance_df = pd.DataFrame(
    {
        "Feature": X_test.columns,
        "dtm": dtm.feature_importances_.round(4),
        "hgbm": result_hgbm.importances_mean.round(4),
        "ohgbm": result_ohgbm.importances_mean.round(4),
    }
).set_index("Feature")

In [None]:
perm_importance_df.ohgbm.sort_values(ascending=False)

In [None]:
# Business Evaluation
print("\n--- Model Evaluation ---")
for name, model in models_to_evaluate.items():
    print(f"\nEvaluating {name}")
    evaluate_technical_performance(model=model, x_test=X_test, y_test=y_test)
    evaluate_business_impact(model=model, x_test=X_test, y_test=y_test, original_data=raw_data)
print("\n--- Evaluation complete ---")

In [None]:
def plot_confusion_matrix(
    x_test: pd.DataFrame,
    y_test: pd.Series,
    model: HistGradientBoostingClassifier | DecisionTreeClassifier,
) -> None:
    preds = model.predict(x_test)
    cm = confusion_matrix(y_test, preds)
    labels = [0, 1]
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    print(cm)
    disp.plot()


def evaluate_technical_performance(model, x_test: pd.DataFrame, y_test: pd.DataFrame) -> None:
    """Evaluates the model on the test set by calculating accuracy."""
    y_pred_proba = calculate_success_probability(model, x_test)
    score = roc_auc_score(y_test, y_pred_proba)
    print(f"ROC AUC Score: {score:.4f}")


def _calculate_actual_costs(choices, y_true) -> float:
    """
    Helper function to calculate the total actual cost for a series of PSP choices
    based on the true transaction outcomes.
    """
    total_cost = 0

    for index, psp_choice in choices.items():
        if psp_choice in PSP_COSTS:
            cost_dict = PSP_COSTS[psp_choice]
            # Use the true outcome (y_true) to determine the actual cost
            actual_cost = cost_dict["success"] if y_true.loc[index] else cost_dict["failure"]
            total_cost += actual_cost
    return total_cost


def evaluate_business_impact(model, x_test: pd.DataFrame, y_test: pd.DataFrame, original_data: pd.DataFrame) -> None:
    """Evaluates and compares the financial outcome of the model's routing strategy
    against the legacy system's strategy on the test set."""
    all_model_columns = x_test.columns.tolist()
    expected_costs_df = pd.DataFrame(index=x_test.index)

    for psp in PSP_COSTS:
        simulated_features = x_test.copy()

        for col in all_model_columns:
            if col.startswith("PSP_"):
                simulated_features[col] = 0
        simulated_features[f"PSP_{psp}"] = 1
        simulated_features = simulated_features.reindex(columns=all_model_columns, fill_value=0)
        prob_success = calculate_success_probability(model, simulated_features)

        expected_costs_df[psp] = (
            prob_success * PSP_COSTS[psp]["success"] + (1 - prob_success) * PSP_COSTS[psp]["failure"]
        )

    # Calculate Model Strategy Cost
    model_choices = expected_costs_df.idxmin(axis=1)
    total_cost_model = _calculate_actual_costs(model_choices, y_test)

    # Calculate Legacy System Cost
    legacy_choices = original_data.loc[x_test.index, "PSP"]
    total_cost_legacy = _calculate_actual_costs(legacy_choices, y_test)

    # Report the Financial Outcome
    savings = total_cost_legacy - total_cost_model
    savings_percent = (savings / total_cost_legacy) * 100 if total_cost_legacy > 0 else 0

    print(f"  Legacy System Cost: {total_cost_legacy:,.2f} €")
    print(f"  Model Strategy Cost: {total_cost_model:,.2f} €")
    print(f"  Savings: {savings:,.2f} € ({savings_percent:.2f}%)")

In [None]:
tmp = processed_data["is_retry"] & processed_data["success"]
tmp.value_counts()

In [None]:
processed_data["is_retry"].value_counts()

In [None]:
processed_data.groupby("is_retry").head()