In [19]:
# =========================
# Minimaler Endlauf: Profilbericht α/β (zweistufig) – MIT DETAILWERTEN
# =========================

import os, numpy as np, pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from tensorflow.keras.models import load_model

# --- Paths ---
# Every artefact lives below the same project checkout, so the shared part of
# the path is spelled out once and the individual files are derived from it.
_PROJECT_SUBPATH = (
    "Users/JonasNiehus/Documents/Masterarbeit/Evaluation/"
    "Masterarbeit_Guetekriterien-sichere-und-interpetierbare-Hochrisiko-KI-Systeme"
)
_PROJECT_DIR = "C:/" + _PROJECT_SUBPATH
_RESULTS_DIR = _PROJECT_DIR + "/results"

DATA_PATH = _PROJECT_DIR + "/data/german.data"
MODEL_PATH = _PROJECT_DIR + "/models/german_credit_model.keras"
PROFILES_CSV = _RESULTS_DIR + "/diverse_profiles.csv"
REPORT_PATH = _RESULTS_DIR + "/profilbericht_alpha_beta_twostage_details.md"

# (optional) Also persist the alpha/beta detail records as CSV files?
WRITE_ALPHA_DETAILS_CSV = True
WRITE_BETA_DETAILS_CSV = True
# The two detail CSV paths are written with a lowercase drive letter.
_DETAILS_DIR = "c:/" + _PROJECT_SUBPATH + "/results"
ALPHA_DETAILS_CSV = _DETAILS_DIR + "/alpha_details.csv"
BETA_DETAILS_CSV = _DETAILS_DIR + "/beta_details.csv"

# =========================
# Laden & Vorverarbeitung
# =========================
# Read the raw data file: whitespace-separated, no header row, so the column
# names are assigned by position right below.
df = pd.read_csv(DATA_PATH, header=None, sep=r"\s+")
df.columns = [
    "Status_des_Girokontos", "Dauer_in_Monaten", "Kreditgeschichte", "Kreditverwendungszweck",
    "Kreditbetrag", "Sparkonto_Wertpapiere", "Beschäftigt_seit", "Ratenhöhe",
    "Familienstand_Geschlecht", "Weitere_Bürgen_Schuldner", "Wohnsitzdauer", "Vermögen", "Alter",
    "Andere_Ratenverpflichtungen", "Wohnsituation", "Anzahl_bestehender_Kredite", "Beruf",
    "Unterhaltspflichtige_Personen", "Telefon", "Ausländischer_Arbeiter", "Ziel"
]
# Recode the target column: 1 stays 1, 2 becomes 0. Any other value would map
# to NaN and make the integer cast fail.
df["Ziel"] = df["Ziel"].map({1: 1, 2: 0}).astype(int)

# Feature frame (all columns except the target) and the target as an array.
X_all = df.drop(columns=["Ziel"])
y_all = df["Ziel"].values

# Numeric / categorical split as used in the original setup
numerical_cols = [
    "Dauer_in_Monaten", "Kreditbetrag", "Ratenhöhe", "Wohnsitzdauer",
    "Alter", "Anzahl_bestehender_Kredite", "Unterhaltspflichtige_Personen"
]
# Every feature that is not listed as numeric is treated as categorical.
categorical_cols = [c for c in X_all.columns if c not in numerical_cols]

# Standardise numeric columns and one-hot encode categorical ones; categories
# not seen during fit are ignored at transform time instead of raising.
preprocessor = ColumnTransformer([
    ("num", StandardScaler(), numerical_cols),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_cols),
])
# NOTE(review): the preprocessor is fitted here on the complete data set;
# presumably this has to reproduce the preprocessing the saved model was
# trained with - confirm against the training code.
preprocessor.fit(X_all)

# --- Load model ---
# Fail early with a clear message instead of letting load_model raise.
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"Modelldatei fehlt: {MODEL_PATH}")
model = load_model(MODEL_PATH)

# --- Load profiles & align columns ---
profiles_df = pd.read_csv(PROFILES_CSV)
# Drop any columns the profile file has in addition to the model features.
extra_cols = [c for c in profiles_df.columns if c not in X_all.columns]
if extra_cols:
    profiles_df = profiles_df.drop(columns=extra_cols)
# Bring the columns into the same order as the training features (raises a
# KeyError if a feature column is missing from the profile file).
profiles_df = profiles_df[X_all.columns]
# Force the numeric features to float; entries that cannot be parsed become NaN.
profiles_df[numerical_cols] = profiles_df[numerical_cols].apply(pd.to_numeric, errors="coerce").astype(float)

# =========================
# Feste Feature-Gruppen (gemäß deiner schriftlichen Definition)
# =========================
# Economic features, split by how they are perturbed (numeric grid vs. category swap).
econ_num = [
    "Kreditbetrag",
    "Dauer_in_Monaten",
    "Ratenhöhe",
    "Anzahl_bestehender_Kredite",
]
econ_cat = [
    "Kreditgeschichte",
    "Kreditverwendungszweck",
    "Sparkonto_Wertpapiere",
    "Vermögen",
    "Status_des_Girokontos",
    "Beschäftigt_seit",
    "Andere_Ratenverpflichtungen",
    "Weitere_Bürgen_Schuldner",
]

# Socio-demographic features, split the same way.
socio_num = [
    "Alter",
    "Wohnsitzdauer",
    "Unterhaltspflichtige_Personen",
]
socio_cat = [
    "Familienstand_Geschlecht",
    "Wohnsituation",
    "Telefon",
    "Ausländischer_Arbeiter",
    "Beruf",
]

# =========================
# α / β – Funktionen (MIT DETAILS)
# =========================
def _predict_label_and_proba(model, preprocessor, row_df, threshold=0.5):
    """Score a single-row frame and return ``(label, probability)``.

    The row is passed through ``preprocessor.transform`` and then through
    ``model.predict``; element ``[0, 0]`` of the prediction is taken as the
    probability. The label is 1 when the probability is at least
    ``threshold`` and 0 otherwise.
    """
    encoded = preprocessor.transform(row_df)
    score = float(model.predict(encoded, verbose=0)[0, 0])
    return int(score >= threshold), score

def _numeric_grid_around(value, lo, hi, n_steps=9):
    """Return ``n_steps`` evenly spaced points in ``[lo, hi]`` without ``value``.

    Grid points that lie within 1e-12 of ``value`` are dropped so that the
    original value itself is never proposed as a candidate.
    """
    candidates = np.linspace(lo, hi, n_steps)
    differs_from_value = np.abs(candidates - value) > 1e-12
    return list(candidates[differs_from_value])

# ---- α: Notwendigkeit, mit exakten CF-Werten ----
def alpha_test_numeric(model, preprocessor, df_row, feature, bounds, threshold=0.5,
                       n_steps=9, clip_min=None):
    """Alpha (necessity) test for one numeric feature of a single-row frame.

    The feature is replaced by each point of an evenly spaced grid over
    ``bounds`` (the original value itself is left out, and candidates are
    raised to ``clip_min`` when it is given). A candidate counts as a
    counterfactual when the predicted label differs from the original one;
    among those, the one closest to the original value is reported.

    Returns a dict with the keys ``is_necessary``, ``original_label``,
    ``original_proba``, ``original_value``, ``cf_value``, ``delta_abs``,
    ``cf_label`` and ``cf_proba``. The ``cf_*`` and ``delta_abs`` entries are
    ``None`` when no candidate flips the label.
    """
    base_label, base_proba = _predict_label_and_proba(model, preprocessor, df_row, threshold)
    lower, upper = bounds
    start_value = float(df_row[feature].iloc[0])

    # Result template; it stays as is when nothing flips the prediction.
    outcome = {
        "is_necessary": False,
        "original_label": int(base_label),
        "original_proba": float(base_proba),
        "original_value": float(start_value),
        "cf_value": None,
        "delta_abs": None,
        "cf_label": None,
        "cf_proba": None,
    }

    for grid_point in _numeric_grid_around(start_value, lower, upper, n_steps=n_steps):
        if clip_min is None:
            candidate = float(grid_point)
        else:
            candidate = float(max(grid_point, clip_min))

        trial_row = df_row.copy()
        trial_row.at[df_row.index[0], feature] = np.float64(candidate)
        trial_label, trial_proba = _predict_label_and_proba(model, preprocessor, trial_row, threshold)
        if trial_label == base_label:
            continue

        distance = abs(candidate - start_value)
        # Keep the earlier counterfactual unless this one is strictly closer.
        if outcome["is_necessary"] and not (distance < outcome["delta_abs"]):
            continue

        outcome = {
            **outcome,
            "is_necessary": True,
            "cf_value": float(candidate),
            "delta_abs": float(distance),
            "cf_label": int(trial_label),
            "cf_proba": float(trial_proba),
        }

    return outcome

def alpha_test_categorical(model, preprocessor, df_row, feature, all_categories, threshold=0.5):
    """Alpha (necessity) test for one categorical feature of a single-row frame.

    The feature is set to every entry of ``all_categories`` other than its
    current value, in the given order. The first category that changes the
    predicted label is reported as the counterfactual.

    Returns a dict with the keys ``is_necessary``, ``original_label``,
    ``original_proba``, ``original_value``, ``cf_value``, ``delta_abs``,
    ``cf_label`` and ``cf_proba``. ``delta_abs`` is always ``None`` because
    a distance is not meaningful for categories.
    """
    base_label, base_proba = _predict_label_and_proba(model, preprocessor, df_row, threshold)
    present = df_row[feature].iloc[0]
    row_key = df_row.index[0]

    # Result template; it stays as is when no category flips the prediction.
    outcome = {
        "is_necessary": False,
        "original_label": int(base_label),
        "original_proba": float(base_proba),
        "original_value": present,
        "cf_value": None,
        "delta_abs": None,
        "cf_label": None,
        "cf_proba": None,
    }

    for candidate in all_categories:
        if candidate == present:
            continue
        trial_row = df_row.copy()
        trial_row.at[row_key, feature] = candidate
        trial_label, trial_proba = _predict_label_and_proba(model, preprocessor, trial_row, threshold)
        if trial_label != base_label:
            outcome.update(
                is_necessary=True,
                cf_value=candidate,
                cf_label=int(trial_label),
                cf_proba=float(trial_proba),
            )
            break

    return outcome

def run_alpha_for_profiles(
    model, preprocessor, df_all, profiles_df,
    features_numeric, features_categorical,
    threshold=0.5, q_lo=0.05, q_hi=0.95, n_steps=9
):
    """Run the alpha (necessity) test for every profile and feature.

    Parameters:
        model, preprocessor: forwarded unchanged to the per-feature tests.
        df_all: reference data used to derive the search space. Numeric
            features are searched between their ``q_lo`` and ``q_hi``
            quantiles; categorical features use the sorted unique non-null
            values.
        profiles_df: one row per profile; the index labels identify profiles.
        features_numeric, features_categorical: feature names to test.
            ``None`` is treated like an empty list.
        threshold: decision threshold forwarded to the per-feature tests.
        n_steps: number of grid points for numeric features.

    Returns:
        ``(results_table, details)``. ``results_table`` is indexed by
        ``profile_index`` and holds 1/0 per feature (1 = a label-flipping
        counterfactual was found). ``details`` maps ``(profile_index,
        feature)`` to the full result dict of the per-feature test. When no
        features are given, an empty frame on the profile index and an empty
        dict are returned.
    """
    # Accept None for either list, consistent with run_beta_for_profiles.
    numeric_feats = list(features_numeric or [])
    categorical_feats = list(features_categorical or [])
    if not numeric_feats and not categorical_feats:
        return pd.DataFrame(index=profiles_df.index), {}

    num_bounds = {feat: (float(df_all[feat].quantile(q_lo)),
                         float(df_all[feat].quantile(q_hi)))
                  for feat in numeric_feats}
    cat_values = {feat: sorted(df_all[feat].dropna().unique().tolist())
                  for feat in categorical_feats}

    rows, details = [], {}
    for i in profiles_df.index:
        row = profiles_df.loc[[i]]
        row_result = {}
        for feat in numeric_feats:
            # Numeric candidates are clipped at 0.0 so no negative value is tried.
            res = alpha_test_numeric(model, preprocessor, row, feature=feat,
                                     bounds=num_bounds[feat], threshold=threshold,
                                     n_steps=n_steps, clip_min=0.0)
            row_result[feat] = int(res["is_necessary"])
            details[(i, feat)] = res

        for feat in categorical_feats:
            res = alpha_test_categorical(model, preprocessor, row, feature=feat,
                                         all_categories=cat_values[feat], threshold=threshold)
            row_result[feat] = int(res["is_necessary"])
            details[(i, feat)] = res

        row_result["profile_index"] = i
        rows.append(row_result)

    # Naming the columns explicitly keeps set_index working when profiles_df
    # has no rows; the order matches the insertion order of row_result.
    columns = list(dict.fromkeys([*numeric_feats, *categorical_feats, "profile_index"]))
    results_table = pd.DataFrame(rows, columns=columns).set_index("profile_index").sort_index()
    return results_table, details

# ---- β: Suffizienz, mit fixiertem Wert a + Zählung ----
def _predict_label_and_proba_batch(model, preprocessor, df, threshold=0.5):
    """Score all rows of ``df`` at once and return ``(labels, probabilities)``.

    The model output is flattened to one dimension; a row gets label 1 when
    its probability is at least ``threshold`` and 0 otherwise.
    """
    encoded = preprocessor.transform(df)
    scores = model.predict(encoded, verbose=0).reshape(-1)
    return (scores >= threshold).astype(int), scores

def beta_test_feature(
    model, preprocessor, df_all, df_row, feature, value=None,
    threshold=0.5, n_samples=2000, random_state=42
):
    """Beta (sufficiency) test for one feature of a single-row frame.

    ``n_samples`` rows are drawn with replacement from ``df_all`` (seeded by
    ``random_state``), ``feature`` is overwritten in all of them with the
    fixed value ``a`` (``value`` if given, otherwise the profile's own value),
    and beta is the share of those rows whose predicted label equals the
    label predicted for ``df_row``.

    Returns a dict with the keys ``feature``, ``a``, ``beta``, ``y_star``,
    ``proba_star``, ``n_samples``, ``n_same`` and ``n_diff``.
    """
    generator = np.random.default_rng(random_state)
    ref_label, ref_proba = _predict_label_and_proba(model, preprocessor, df_row, threshold)

    fixed = value if value is not None else df_row[feature].iloc[0]

    picks = generator.integers(0, df_all.shape[0], size=n_samples)
    background = df_all.iloc[picks].copy()

    # Numeric features are pinned as float, everything else as the raw value.
    if pd.api.types.is_numeric_dtype(df_all[feature]):
        fixed = float(fixed)
    background[feature] = fixed

    predicted, _ = _predict_label_and_proba_batch(model, preprocessor, background, threshold)
    agreement = (predicted == ref_label).astype(int)
    total = len(agreement)
    n_same = int(agreement.sum())
    n_diff = int(total - n_same)

    return {
        "feature": feature,
        "a": fixed,
        "beta": float(n_same / total),
        "y_star": int(ref_label),
        "proba_star": float(ref_proba),
        "n_samples": int(n_samples),
        "n_same": n_same,
        "n_diff": n_diff,
    }

def run_beta_for_profiles(
    model, preprocessor, df_all, profiles_df, features_numeric, features_categorical,
    threshold=0.5, n_samples=2000, random_state=42
):
    """Run the beta (sufficiency) test for every profile and feature.

    Parameters:
        model, preprocessor, df_all: forwarded unchanged to
            ``beta_test_feature``.
        profiles_df: one row per profile; the index labels identify profiles
            and are added to ``random_state`` to seed each profile.
        features_numeric, features_categorical: feature names to test.
            ``None`` is treated like an empty list.
        threshold, n_samples: forwarded to ``beta_test_feature``.
        random_state: base seed; profile ``i`` uses ``random_state + i``, so
            all features of one profile share the same seed.

    Returns:
        ``(beta_table, details)``. ``beta_table`` is indexed by
        ``profile_index`` and holds the beta value per feature. ``details``
        maps ``(profile_index, feature)`` to the full result dict. When no
        features are given, an empty frame on the profile index and an empty
        dict are returned.
    """
    features = [*(features_numeric or ()), *(features_categorical or ())]
    if not features:
        return pd.DataFrame(index=profiles_df.index), {}

    rows, details = [], {}
    for i in profiles_df.index:
        row = profiles_df.loc[[i]]
        row_result = {}
        for feat in features:
            res = beta_test_feature(
                model, preprocessor, df_all, row, feat,
                value=None, threshold=threshold,
                n_samples=n_samples, random_state=random_state + i,
            )
            row_result[feat] = res["beta"]
            details[(i, feat)] = res
        row_result["profile_index"] = i
        rows.append(row_result)

    # Naming the columns explicitly keeps set_index working when profiles_df
    # has no rows; the order matches the insertion order of row_result.
    columns = list(dict.fromkeys([*features, "profile_index"]))
    beta_table = pd.DataFrame(rows, columns=columns).set_index("profile_index").sort_index()
    return beta_table, details

# =========================
# Report-Builder (Markdown) – MIT DETAILSPALTEN
# =========================
def _fmt_prob(p):
    """Format a probability with three decimals; ``None`` becomes an em dash."""
    return "—" if p is None else f"{p:.3f}"

def _fmt_val(v):
    """Format a feature value for display.

    ``None`` becomes an em dash, booleans are printed as they are, anything
    convertible to a finite float is shown with four significant digits, and
    everything else (categories, strings, non-finite numbers) falls back to
    ``str(v)``.
    """
    if v is None:
        return "—"
    # Booleans convert to float without error, so they are handled first.
    if isinstance(v, (bool, np.bool_)):
        return str(v)
    try:
        number = float(v)
    except (TypeError, ValueError):
        # Not numeric: show the value as text.
        return str(v)
    # Infinities and NaN are shown in their plain string form.
    return f"{number:.4g}" if np.isfinite(number) else str(v)

def _alpha_detail_cell(alpha_det: dict):
    """Build the compact alpha-details text for one Markdown table cell.

    Output formats (separators are written as an escaped pipe):
    - no details available:        an em dash
    - no counterfactual found:     ``orig=<value> | keine CF-Änderung``
    - counterfactual found:
      ``orig=<value> → cf=<value> (Δ=<delta> | p:<old>→<new> | y:<old>→<new>)``
      where the delta is an em dash for categorical features.

    The text ends up inside a pipe-delimited Markdown table, where a bare
    ``|`` would be read as a column boundary and cut the cell apart; the
    separators are therefore emitted as ``\\|``, which renders as a literal
    pipe.
    """
    if not alpha_det:
        return "—"

    sep = " \\| "

    if not alpha_det.get("is_necessary", False):
        return f"orig={_fmt_val(alpha_det.get('original_value'))}{sep}keine CF-Änderung"

    # Necessary: a counterfactual value is available.
    orig_val = _fmt_val(alpha_det.get("original_value"))
    cf_val = _fmt_val(alpha_det.get("cf_value"))
    delta = _fmt_val(alpha_det.get("delta_abs"))  # em dash for categories
    p_o = _fmt_prob(alpha_det.get("original_proba"))
    p_n = _fmt_prob(alpha_det.get("cf_proba"))
    y_o = alpha_det.get("original_label", "—")
    y_n = alpha_det.get("cf_label", "—")

    return f"orig={orig_val} → cf={cf_val} (Δ={delta}{sep}p:{p_o}→{p_n}{sep}y:{y_o}→{y_n})"

def _beta_detail_cell(beta_det: dict):
    """Build the compact beta-details text for one Markdown table cell.

    Shows the fixed value ``a`` (categorical values are fine), beta with four
    decimals, the same/different counts and the reference label with its
    probability. Missing details yield an em dash.

    The text ends up inside a pipe-delimited Markdown table, where a bare
    ``|`` would be read as a column boundary and cut the cell apart; the
    separators are therefore emitted as ``\\|``, which renders as a literal
    pipe.
    """
    if not beta_det:
        return "—"

    sep = " \\| "

    a_val = _fmt_val(beta_det.get("a"))
    beta = beta_det.get("beta", None)
    beta_s = f"{beta:.4f}" if isinstance(beta, (float, int, np.floating, np.integer)) else "—"
    n_same = beta_det.get("n_same", "—")
    n_diff = beta_det.get("n_diff", "—")
    y_star = beta_det.get("y_star", "—")
    p_star = _fmt_prob(beta_det.get("proba_star"))
    return f"a={a_val}{sep}β={beta_s}{sep}same/diff={n_same}/{n_diff}{sep}y*={y_star} (p={p_star})"


def _profile_feature_table_md(pid, feats, alpha, alpha_details, beta, beta_details,
                              suff_threshold, round_beta):
    """Render the alpha/beta table of one profile and feature group as Markdown."""
    rows = []
    for feat in feats:
        beta_value = float(beta.loc[pid, feat])
        rows.append({
            "Feature": feat,
            "alpha_notwendig": int(alpha.loc[pid, feat]),
            "beta": round(beta_value, round_beta),
            "α-Details (orig→cf)": _alpha_detail_cell(alpha_details.get((pid, feat), {})),
            "β-Details (fixierter Wert)": _beta_detail_cell(beta_details.get((pid, feat), {})),
            f"hinreichend (β≥{suff_threshold:.2f})": int(beta_value >= suff_threshold),
        })
    return pd.DataFrame(rows).to_markdown(index=False)


def build_two_stage_profile_report_md(
    alpha_econ: pd.DataFrame,
    alpha_econ_details: dict,
    beta_econ: pd.DataFrame,
    beta_econ_details: dict,
    econ_features_all: list,
    alpha_socio: pd.DataFrame,
    alpha_socio_details: dict,
    beta_socio: pd.DataFrame,
    beta_socio_details: dict,
    socio_features_all: list,
    suff_threshold: float = 0.7,
    round_beta: int = 4,
    out_path: str = REPORT_PATH
):
    """Write the two-stage alpha/beta profile report as a Markdown file.

    The report consists of a header, an overview table with the number of
    necessary (alpha) and sufficient (beta >= ``suff_threshold``) features
    per profile and stage, and one detail table per profile for the economic
    and the socio-demographic features.

    Parameters:
        alpha_econ, beta_econ, alpha_socio, beta_socio: result tables indexed
            by profile; all four must share the same index.
        alpha_econ_details, beta_econ_details, alpha_socio_details,
        beta_socio_details: dicts keyed by ``(profile_index, feature)`` with
            the per-feature detail records.
        econ_features_all, socio_features_all: feature names per stage; only
            features present in both the alpha and the beta table are used.
        suff_threshold: beta value from which a feature counts as sufficient.
        round_beta: number of decimals for the beta column.
        out_path: target file; its directory is created when missing.

    Raises:
        ValueError: if the four result tables do not share the same index.
    """
    # Explicit check instead of assert so it also runs under "python -O".
    idx_ref = alpha_econ.index
    for table_name, table in (
        ("beta_econ", beta_econ),
        ("alpha_socio", alpha_socio),
        ("beta_socio", beta_socio),
    ):
        if not idx_ref.equals(table.index):
            raise ValueError(
                f"Index of {table_name} does not match the index of alpha_econ"
            )

    econ_feats = [f for f in econ_features_all if f in alpha_econ.columns and f in beta_econ.columns]
    socio_feats = [f for f in socio_features_all if f in alpha_socio.columns and f in beta_socio.columns]

    md = []
    md.append("# Profilbericht (zweistufig): α/β je Profil – Ökonomisch vs. Soziodemographisch\n")
    md.append(f"- **Anzahl Profile:** {len(idx_ref)}")
    md.append(f"- **Hinreichend-Schwelle:** β ≥ {suff_threshold}")
    md.append(f"- **Ökonomische Features:** {', '.join(econ_feats) if econ_feats else '(keine)'}")
    md.append(f"- **Soziodemographische Features:** {', '.join(socio_feats) if socio_feats else '(keine)'}\n")

    # Overview per profile (counts)
    overview_rows = []
    for pid in idx_ref:
        overview_rows.append({
            "profile_index": pid,
            "econ_alpha_cnt": int(alpha_econ.loc[pid, econ_feats].sum()) if econ_feats else 0,
            "econ_hinr_cnt": int((beta_econ.loc[pid, econ_feats] >= suff_threshold).sum()) if econ_feats else 0,
            "socio_alpha_cnt": int(alpha_socio.loc[pid, socio_feats].sum()) if socio_feats else 0,
            "socio_hinr_cnt": int((beta_socio.loc[pid, socio_feats] >= suff_threshold).sum()) if socio_feats else 0,
        })
    # Explicit columns keep set_index working when there are no profiles.
    overview_columns = ["profile_index", "econ_alpha_cnt", "econ_hinr_cnt",
                        "socio_alpha_cnt", "socio_hinr_cnt"]
    overview_df = pd.DataFrame(overview_rows, columns=overview_columns).set_index("profile_index")
    md.append("## Überblick (Anzahl notwendiger / hinreichender Features je Stufe)\n")
    md.append(overview_df.to_markdown())
    md.append("\n---\n")

    # Details per profile
    md.append("## Details je Profil\n")
    for pid in idx_ref:
        md.append(f"\n### Profil {pid}\n")

        if econ_feats:
            md.append("\n**Ökonomische Features**\n")
            md.append(_profile_feature_table_md(
                pid, econ_feats, alpha_econ, alpha_econ_details,
                beta_econ, beta_econ_details, suff_threshold, round_beta,
            ))

        if socio_feats:
            md.append("\n**Soziodemographische/sonstige Features**\n")
            md.append(_profile_feature_table_md(
                pid, socio_feats, alpha_socio, alpha_socio_details,
                beta_socio, beta_socio_details, suff_threshold, round_beta,
            ))

        md.append("\n---")

    # os.makedirs("") raises, so only create a directory when the path names one.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        f.write("\n".join(md))

# =========================
# AUSFÜHRUNG: Stufe 1 & 2 + Bericht
# =========================
# Stage 1 (economic features): alpha table + details, then beta table + details.
alpha_econ,  alpha_econ_details  = run_alpha_for_profiles(
    model, preprocessor, df_all=X_all, profiles_df=profiles_df,
    features_numeric=econ_num, features_categorical=econ_cat,
    threshold=0.5, q_lo=0.05, q_hi=0.95, n_steps=9
)
beta_econ,   beta_econ_details   = run_beta_for_profiles(
    model, preprocessor, df_all=X_all, profiles_df=profiles_df,
    features_numeric=econ_num, features_categorical=econ_cat,
    threshold=0.5, n_samples=2000, random_state=42
)
# Stage 2 (socio-demographic features): same procedure; the beta run uses a
# different base seed (4242) than the economic stage (42).
alpha_socio, alpha_socio_details = run_alpha_for_profiles(
    model, preprocessor, df_all=X_all, profiles_df=profiles_df,
    features_numeric=socio_num, features_categorical=socio_cat,
    threshold=0.5, q_lo=0.05, q_hi=0.95, n_steps=9
)
beta_socio,  beta_socio_details  = run_beta_for_profiles(
    model, preprocessor, df_all=X_all, profiles_df=profiles_df,
    features_numeric=socio_num, features_categorical=socio_cat,
    threshold=0.5, n_samples=2000, random_state=4242
)

# (optional) Persist the details as CSV - useful for traceability / audit.
# Each detail record is flattened into one row and tagged with its profile,
# feature and feature group.
if WRITE_ALPHA_DETAILS_CSV:
    rows = []
    for (pid, feat), d in alpha_econ_details.items():
        dd = d.copy(); dd.update({"profile_index": pid, "feature": feat, "gruppe": "oekonomisch"})
        rows.append(dd)
    for (pid, feat), d in alpha_socio_details.items():
        dd = d.copy(); dd.update({"profile_index": pid, "feature": feat, "gruppe": "sozio"})
        rows.append(dd)
    alpha_details_df = pd.DataFrame(rows)
    alpha_details_df.to_csv(ALPHA_DETAILS_CSV, index=False)

if WRITE_BETA_DETAILS_CSV:
    rows = []
    for (pid, feat), d in beta_econ_details.items():
        dd = d.copy(); dd.update({"profile_index": pid, "feature": feat, "gruppe": "oekonomisch"})
        rows.append(dd)
    for (pid, feat), d in beta_socio_details.items():
        dd = d.copy(); dd.update({"profile_index": pid, "feature": feat, "gruppe": "sozio"})
        rows.append(dd)
    beta_details_df = pd.DataFrame(rows)
    beta_details_df.to_csv(BETA_DETAILS_CSV, index=False)

# Generate the report (including the detail columns)
build_two_stage_profile_report_md(
    alpha_econ=alpha_econ, alpha_econ_details=alpha_econ_details,
    beta_econ=beta_econ,   beta_econ_details=beta_econ_details,
    econ_features_all=econ_num+econ_cat,
    alpha_socio=alpha_socio, alpha_socio_details=alpha_socio_details,
    beta_socio=beta_socio,   beta_socio_details=beta_socio_details,
    socio_features_all=socio_num+socio_cat,
    suff_threshold=0.7, round_beta=4, out_path=REPORT_PATH
)

# Console summary: the feature groups used and where the outputs were written.
print("Feature-Gruppen:")
print("Ökonomisch – numerisch:", econ_num)
print("Ökonomisch – kategorisch:", econ_cat)
print("Soziodemographisch – numerisch:", socio_num)
print("Soziodemographisch – kategorisch:", socio_cat)
print(f"💾 Profilbericht (mit Details) gespeichert unter: {REPORT_PATH}")
if WRITE_ALPHA_DETAILS_CSV:
    print(f"💾 Alpha-Details CSV: {ALPHA_DETAILS_CSV}")
if WRITE_BETA_DETAILS_CSV:
    print(f"💾 Beta-Details CSV:  {BETA_DETAILS_CSV}")

Feature-Gruppen:
Ökonomisch – numerisch: ['Kreditbetrag', 'Dauer_in_Monaten', 'Ratenhöhe', 'Anzahl_bestehender_Kredite']
Ökonomisch – kategorisch: ['Kreditgeschichte', 'Kreditverwendungszweck', 'Sparkonto_Wertpapiere', 'Vermögen', 'Status_des_Girokontos', 'Beschäftigt_seit', 'Andere_Ratenverpflichtungen', 'Weitere_Bürgen_Schuldner']
Soziodemographisch – numerisch: ['Alter', 'Wohnsitzdauer', 'Unterhaltspflichtige_Personen']
Soziodemographisch – kategorisch: ['Familienstand_Geschlecht', 'Wohnsituation', 'Telefon', 'Ausländischer_Arbeiter', 'Beruf']
💾 Profilbericht (mit Details) gespeichert unter: C:/Users/JonasNiehus/Documents/Masterarbeit/Evaluation/Masterarbeit_Guetekriterien-sichere-und-interpetierbare-Hochrisiko-KI-Systeme/results/profilbericht_alpha_beta_twostage_details.md
💾 Alpha-Details CSV: c:/Users/JonasNiehus/Documents/Masterarbeit/Evaluation/Masterarbeit_Guetekriterien-sichere-und-interpetierbare-Hochrisiko-KI-Systeme/results/alpha_details.csv
💾 Beta-Details CSV:  c:/Users/Jo