In [None]:
# eda.py
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split

# -------------------------------------------------------------------
# CONFIGURATION
# -------------------------------------------------------------------

# Directory that contains this script; the data file lives one level above it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, ".."))
DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")

# All EDA artefacts (plots, CSV reports) are written to <BASE_DIR>/EDA_wyniki.
SAVE_DIR = os.path.join(BASE_DIR, "EDA_wyniki")
os.makedirs(SAVE_DIR, exist_ok=True)

# -------------------------------------------------------------------
# DATA LOADING
# -------------------------------------------------------------------

df = pd.read_csv(DATA_PATH)

print("\n=== Podstawowe informacje ===")
# DataFrame.info() writes its report to stdout and returns None, so it must
# not be wrapped in print() (that would emit an extra "None" line).
df.info()
print(df.describe())

# ===================================================================
# 1) BAR PLOT: number of observations per value of the target
# ===================================================================

# Class counts ordered by class label (0 first, then 1).
default_counts = df["default"].value_counts().sort_index()

fig, ax = plt.subplots(figsize=(6, 4))
default_counts.plot(kind="bar", color=["green", "red"], ax=ax)
ax.set_title("Rozk≈Çad default√≥w")
ax.set_xlabel("Default (0 = good, 1 = bad)")
ax.set_ylabel("Liczba obserwacji")
fig.tight_layout()
fig.savefig(os.path.join(SAVE_DIR, "default_distribution.png"), dpi=150)
plt.close(fig)

# ===================================================================
# 2) Train / validation / test split + bar plot of the subset sizes
# ===================================================================

y = df["default"]
X = df.drop(columns=["default"])

# First hold out 40 % of the rows, then cut that hold-out in half, which gives
# a 60 / 20 / 20 split. Both cuts are stratified on the target.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.4, stratify=y, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42
)

sizes = {"Train": len(X_train), "Validation": len(X_val), "Test": len(X_test)}

fig, ax = plt.subplots(figsize=(6, 4))
ax.bar(list(sizes), list(sizes.values()), color=["blue", "orange", "green"])
ax.set_title("Liczno≈õƒá zbior√≥w: train / val / test")
ax.set_ylabel("Liczba obserwacji")
fig.tight_layout()
fig.savefig(os.path.join(SAVE_DIR, "train_val_test_counts.png"), dpi=150)
plt.close(fig)

# ===================================================================
# 3) Missing values
# ===================================================================

# Number of NaNs per column, restricted to columns that have at least one.
na_counts = df.isna().sum()
missing = na_counts[na_counts > 0]

print("\n=== Kolumny z brakami ===")
print(missing)

missing_report_path = os.path.join(SAVE_DIR, "missing_values.csv")
missing.to_csv(missing_report_path)

# ===================================================================
# 4) Correlations: heatmap of the 10 variables MOST correlated with default
# ===================================================================

# Numeric feature columns only (the target itself is excluded from the list).
numeric_cols = [
    c
    for c in df.select_dtypes(include=[np.number]).columns.tolist()
    if c != "default"
]

# Correlation of every numeric feature with the target.
corr_full = df[numeric_cols + ["default"]].corr()
corr_with_target = corr_full["default"].drop("default")

# Rank by absolute correlation so strong negative relations count as well.
ranked_by_abs_corr = corr_with_target.abs().sort_values(ascending=False)
top10_vars = ranked_by_abs_corr.head(10).index.tolist()

print("\nTop 10 zmiennych najbardziej skorelowanych z defaultem:")
print(top10_vars)

corr_top10 = df[top10_vars + ["default"]].corr()

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(corr_top10, annot=True, cmap="coolwarm", fmt=".2f", ax=ax)
ax.set_title("Heatmap korelacji ‚Äì top 10 zmiennych vs default")
fig.tight_layout()
fig.savefig(os.path.join(SAVE_DIR, "correlation_heatmap_top10.png"), dpi=150)
plt.close(fig)

# ===================================================================
# 5) Scatter plots for the strongest variables
#     - the 3 variables most correlated (in absolute value) with default
# ===================================================================

target_corr_top = corr_top10["default"].abs().sort_values(ascending=False)
# Remove the target by label instead of assuming it sorts to position 0:
# a feature with |corr| == 1 would tie with it and could take the first slot.
strong_vars = target_corr_top.drop("default").index[:3]

print("\nNajsilniej skorelowane zmienne z defaultem (do scatter√≥w):")
print(strong_vars)

# One scatter plot per unordered pair of the selected variables.
for i, v1 in enumerate(strong_vars):
    for v2 in strong_vars[i + 1:]:
        fig, ax = plt.subplots(figsize=(6, 5))
        sns.scatterplot(
            data=df,
            x=v1,
            y=v2,
            hue="default",
            palette={0: "green", 1: "red"},
            alpha=0.6,
            ax=ax,
        )
        ax.set_title(f"Scatter: {v1} vs {v2} (kolor=default)")
        fig.tight_layout()
        fig.savefig(os.path.join(SAVE_DIR, f"scatter_{v1}_{v2}.png"), dpi=150)
        plt.close(fig)

print("\nEDA zapisane do folderu:", SAVE_DIR)


In [None]:
# eda_transformers.py
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler




class InfinityReplacer(BaseEstimator, TransformerMixin):
    """Replace +inf / -inf with NaN.

    The transformer is stateless: ``fit`` learns nothing and ``transform``
    returns a new object, leaving its input untouched.
    """

    def fit(self, X, y=None):
        """Do nothing and return ``self`` (kept for the estimator API)."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with both infinities replaced by NaN."""
        infinities = [np.inf, -np.inf]
        cleaned = X.copy()
        return cleaned.replace(infinities, np.nan)

    def get_feature_names_out(self, input_features=None):
        """Return the input names unchanged, or None when none are given."""
        return None if input_features is None else np.array(input_features)


class HighMissingDropper(BaseEstimator, TransformerMixin):
    """Drop columns whose share of missing values exceeds a threshold.

    Parameters
    ----------
    missing_threshold : float, default 0.95
        A column is dropped when its fraction of nulls seen in ``fit`` is
        strictly greater than this value.

    Attributes
    ----------
    cols_to_drop_ : list
        Column names selected for removal during ``fit``.
    """

    def __init__(self, missing_threshold=0.95):
        self.missing_threshold = missing_threshold

    def fit(self, X, y=None):
        """Remember which columns of ``X`` are too sparse; prints a summary."""
        null_share = X.isnull().mean()
        too_sparse = null_share > self.missing_threshold
        self.cols_to_drop_ = null_share[too_sparse].index.tolist()
        if self.cols_to_drop_:
            print(f"üóëÔ∏è Zapamiƒôtano {len(self.cols_to_drop_)} kolumn do usuniƒôcia (braki > {self.missing_threshold*100:.0f}%)")
        return self

    def transform(self, X):
        """Return a copy of ``X`` without the remembered columns.

        Remembered columns that are absent from ``X`` are silently ignored.
        """
        trimmed = X.copy()
        return trimmed.drop(columns=self.cols_to_drop_, errors='ignore')

    def get_feature_names_out(self, input_features=None):
        """Return the input names minus the dropped ones (None if not given)."""
        if input_features is None:
            return None
        surviving = [name for name in input_features if name not in self.cols_to_drop_]
        return np.array(surviving)


class MissingIndicator(BaseEstimator, TransformerMixin):
    """Append ``<col>_missing`` 0/1 flags for columns that had nulls in ``fit``.

    Attributes
    ----------
    cols_with_missing_ : list
        Columns that contained at least one null in the data passed to ``fit``.
    """

    def fit(self, X, y=None):
        """Record which columns of ``X`` contain at least one null."""
        has_null = X.isnull().any()
        self.cols_with_missing_ = X.columns[has_null].tolist()
        return self

    def transform(self, X):
        """Return a copy of ``X`` extended with the integer indicator columns.

        A recorded column that is absent from ``X`` is skipped.
        """
        flagged = X.copy()
        for col in self.cols_with_missing_:
            if col not in flagged.columns:
                continue
            flagged[f"{col}_missing"] = flagged[col].isnull().astype(int)
        return flagged

    def get_feature_names_out(self, input_features=None):
        """Return the input names followed by the ``*_missing`` flag names."""
        if input_features is None:
            return None
        names = list(input_features)
        flag_names = [
            f"{col}_missing"
            for col in self.cols_with_missing_
            if col in input_features
        ]
        return np.array(names + flag_names)


class CustomImputer(BaseEstimator, TransformerMixin):
    """Impute numeric columns with the median and the others with the mode.

    Boolean columns are treated as neither numeric nor categorical and are
    left untouched.

    Attributes
    ----------
    num_cols_, cat_cols_ : list
        Columns handled by the numeric / categorical imputer.
    imputer_num_, imputer_cat_ : SimpleImputer or None
        Fitted imputers; None when there were no columns of that kind.
    """

    def __init__(self):
        self.imputer_num_ = None
        self.imputer_cat_ = None
        self.num_cols_ = None
        self.cat_cols_ = None

    def fit(self, X, y=None):
        """Detect column groups in ``X`` and fit one imputer per group."""
        bool_cols = X.select_dtypes(include=[bool]).columns.tolist()
        numeric_like = X.select_dtypes(include=[np.number]).columns.tolist()
        self.num_cols_ = [c for c in numeric_like if c not in bool_cols]

        self.cat_cols_ = X.select_dtypes(exclude=[np.number, np.bool_]).columns.tolist()

        if self.num_cols_:
            self.imputer_num_ = SimpleImputer(strategy="median")
            self.imputer_num_.fit(X[self.num_cols_])

        if self.cat_cols_:
            self.imputer_cat_ = SimpleImputer(strategy="most_frequent")
            self.imputer_cat_.fit(X[self.cat_cols_])

        return self

    def transform(self, X):
        """Return a copy of ``X`` with both column groups imputed."""
        filled = X.copy()

        # Numeric group first, then categorical - same order as in fit.
        fitted_groups = (
            (self.imputer_num_, self.num_cols_),
            (self.imputer_cat_, self.cat_cols_),
        )
        for imputer, cols in fitted_groups:
            if imputer is not None and len(cols) > 0:
                filled[cols] = imputer.transform(filled[cols])

        return filled

    def get_feature_names_out(self, input_features=None):
        """Return the input names unchanged, or None when none are given."""
        return None if input_features is None else np.array(input_features)


class Winsorizer(BaseEstimator, TransformerMixin):
    """Clip numeric columns to quantile-based limits learned in ``fit``.

    Boolean columns and columns whose name ends with ``_missing`` are skipped.

    Parameters
    ----------
    lower_q, upper_q : float
        Quantiles used as the lower / upper clipping limits.

    Attributes
    ----------
    limits_ : dict
        Maps column name to its ``(lower, upper)`` limits.
    """

    def __init__(self, lower_q=0.02, upper_q=0.98):
        self.lower_q = lower_q
        self.upper_q = upper_q

    def fit(self, X, y=None):
        """Compute the clipping limits for every eligible column of ``X``."""
        bool_cols = X.select_dtypes(include=[bool]).columns
        eligible = [
            col
            for col in X.select_dtypes(include=[np.number]).columns
            if not (col in bool_cols or col.endswith("_missing"))
        ]

        self.limits_ = {
            col: (X[col].quantile(self.lower_q), X[col].quantile(self.upper_q))
            for col in eligible
        }

        return self

    def transform(self, X):
        """Return a copy of ``X`` with each known column clipped to its limits."""
        clipped = X.copy()
        for col, bounds in self.limits_.items():
            if col not in clipped.columns:
                continue
            lower, upper = bounds
            clipped[col] = np.clip(clipped[col], lower, upper)
        return clipped

    def get_feature_names_out(self, input_features=None):
        """Return the input names unchanged, or None when none are given."""
        return None if input_features is None else np.array(input_features)


class NumericScaler(BaseEstimator, TransformerMixin):
    """Standardise numeric columns, skipping booleans and ``*_missing`` flags.

    Attributes
    ----------
    num_cols_ : list
        Columns that are scaled.
    scaler_ : StandardScaler or None
        Fitted scaler; None when no column qualified.
    """

    def __init__(self):
        self.scaler_ = None
        self.num_cols_ = None

    def fit(self, X, y=None):
        """Select the columns to scale and fit a ``StandardScaler`` on them."""
        bool_cols = X.select_dtypes(include=[bool]).columns.tolist()
        candidates = X.select_dtypes(include=[np.number]).columns.tolist()
        self.num_cols_ = [
            col
            for col in candidates
            if not (col in bool_cols or col.endswith("_missing"))
        ]

        if self.num_cols_:
            self.scaler_ = StandardScaler()
            self.scaler_.fit(X[self.num_cols_])
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the selected columns standardised."""
        scaled = X.copy()
        nothing_to_scale = self.scaler_ is None or len(self.num_cols_) == 0
        if nothing_to_scale:
            return scaled
        scaled[self.num_cols_] = self.scaler_.transform(scaled[self.num_cols_])
        return scaled

    def get_feature_names_out(self, input_features=None):
        """Return the input names unchanged, or None when none are given."""
        return None if input_features is None else np.array(input_features)


class OneHotEncoder(BaseEstimator, TransformerMixin):
    """One-hot encode every non-numeric column with ``pandas.get_dummies``.

    The output column layout is frozen in ``fit``: at transform time missing
    dummy columns are added as zeros and unseen ones are discarded.

    Attributes
    ----------
    cat_cols_ : list
        Non-numeric columns found in ``fit``.
    encoded_cols_ : list
        Column names (and order) of the encoded output.
    """

    def __init__(self):
        self.cat_cols_ = None
        self.encoded_cols_ = None

    def fit(self, X, y=None):
        """Record the categorical columns and the resulting output layout."""
        self.cat_cols_ = X.select_dtypes(exclude=[np.number]).columns.tolist()
        if len(self.cat_cols_) == 0:
            # Nothing to encode: the output layout is the input layout.
            self.encoded_cols_ = X.columns.tolist()
            return self

        dummies = pd.get_dummies(X, columns=self.cat_cols_, prefix=self.cat_cols_)
        self.encoded_cols_ = dummies.columns.tolist()
        return self

    def transform(self, X):
        """Return ``X`` encoded and aligned to the layout learned in ``fit``."""
        encoded = X.copy()
        if len(self.cat_cols_) == 0:
            return encoded

        encoded = pd.get_dummies(encoded, columns=self.cat_cols_, prefix=self.cat_cols_)
        # Categories seen in fit but absent here still need their (all-zero) column.
        absent = [col for col in self.encoded_cols_ if col not in encoded.columns]
        for col in absent:
            encoded[col] = 0
        # Selecting by the stored list also drops dummies for unseen categories.
        return encoded[self.encoded_cols_]

    def get_feature_names_out(self, input_features=None):
        """Return the output column names learned in ``fit``."""
        return np.array(self.encoded_cols_)


class LowVarianceDropper(BaseEstimator, TransformerMixin):
    """Drop numeric / boolean columns whose variance is below a threshold.

    Parameters
    ----------
    var_threshold : float, default 0.01
        Columns with variance strictly below this value are dropped.

    Attributes
    ----------
    low_var_cols_ : list
        Column names selected for removal during ``fit``.
    """

    def __init__(self, var_threshold=0.01):
        self.var_threshold = var_threshold

    def fit(self, X, y=None):
        """Remember the low-variance columns of ``X``; prints a summary."""
        candidates = X.select_dtypes(include=[np.number, np.bool_]).columns
        col_variance = X[candidates].var(numeric_only=True)
        is_low = col_variance < self.var_threshold
        self.low_var_cols_ = col_variance[is_low].index.tolist()
        if self.low_var_cols_:
            print(f"‚ö†Ô∏è Zapamiƒôtano {len(self.low_var_cols_)} kolumn o niskiej wariancji (< {self.var_threshold})")
        return self

    def transform(self, X):
        """Return ``X`` without the remembered columns (absent ones ignored)."""
        to_remove = self.low_var_cols_
        return X.drop(columns=to_remove, errors='ignore')

    def get_feature_names_out(self, input_features=None):
        """Return the input names minus the dropped ones (None if not given)."""
        if input_features is None:
            return None
        surviving = [name for name in input_features if name not in self.low_var_cols_]
        return np.array(surviving)


class HighCorrelationDropper(BaseEstimator, TransformerMixin):
    """Drop columns that are strongly correlated with an earlier column.

    Parameters
    ----------
    corr_threshold : float, default 0.9
        A column is dropped when its absolute correlation with any column
        positioned before it is strictly greater than this value.

    Attributes
    ----------
    high_corr_cols_ : list
        Column names selected for removal during ``fit``.
    """

    def __init__(self, corr_threshold=0.9):
        self.corr_threshold = corr_threshold

    def fit(self, X, y=None):
        """Remember the redundant columns of ``X``; prints a summary."""
        candidates = X.select_dtypes(include=[np.number, np.bool_]).columns
        abs_corr = X[candidates].corr().abs()

        # Keep only the strict upper triangle so that each pair is inspected
        # once and, of a correlated pair, only the later column is flagged.
        strict_upper = np.triu(np.ones(abs_corr.shape), k=1).astype(bool)
        upper = abs_corr.where(strict_upper)

        exceeds = (upper > self.corr_threshold).any(axis=0)
        self.high_corr_cols_ = exceeds[exceeds].index.tolist()
        if self.high_corr_cols_:
            print(f"üîÅ Zapamiƒôtano {len(self.high_corr_cols_)} kolumn z wysokƒÖ korelacjƒÖ (> {self.corr_threshold})")
        return self

    def transform(self, X):
        """Return ``X`` without the remembered columns (absent ones ignored)."""
        to_remove = self.high_corr_cols_
        return X.drop(columns=to_remove, errors='ignore')

    def get_feature_names_out(self, input_features=None):
        """Return the input names minus the dropped ones (None if not given)."""
        if input_features is None:
            return None
        surviving = [name for name in input_features if name not in self.high_corr_cols_]
        return np.array(surviving)


class WoETransformer(BaseEstimator, TransformerMixin):
    """
    Replace numeric features with their Weight of Evidence (WoE) w.r.t. y.

    Target convention:
    - y = 1 -> 'bad' (default)
    - y = 0 -> 'good' (no default)

    Fitting works in two steps for every numeric column:
    1) the non-missing values are split into up to ``n_bins`` quantile bins
       (missing values get their own "MISSING" bin),
    2) WoE = ln(share of goods / share of bads) is computed per bin and the
       bin -> WoE mapping is stored.

    Attributes set by ``fit``:
    - num_cols_   : numeric columns that are transformed
    - bin_edges_  : column -> array of bin edges (None if the column was all-NaN)
    - woe_maps_   : column -> {bin: WoE}
    - iv_         : column -> Information Value
    - total_good_ / total_bad_ : global counts of y == 0 / y == 1
    """

    def __init__(self, n_bins=5, eps=0.5):
        """
        n_bins: number of quantile bins (not counting the missing-value bin)
        eps: smoothing added to the good/bad counts to avoid WoE = +/- inf
        """
        self.n_bins = n_bins
        self.eps = eps

    def fit(self, X, y):
        """Learn bin edges and the bin -> WoE mapping for each numeric column.

        ``y`` is matched to the rows of ``X`` by position, so it may be a
        list, an ndarray or a Series with any index.

        Raises:
            ValueError: if ``y`` and ``X`` have different lengths.
        """
        # Align y with X positionally. Building pd.Series(y) and relying on
        # index labels would silently misalign an ndarray/list target with a
        # shuffled X index (e.g. after train_test_split).
        y_values = np.asarray(y).ravel()
        if len(y_values) != len(X):
            raise ValueError(
                f"X and y have inconsistent lengths: {len(X)} != {len(y_values)}"
            )
        y = pd.Series(y_values, index=X.index)

        # only numeric columns are transformed
        self.num_cols_ = X.select_dtypes(include=[np.number]).columns.tolist()

        # global class counts
        self.total_good_ = (y == 0).sum()
        self.total_bad_ = (y == 1).sum()

        self.bin_edges_ = {}
        self.woe_maps_ = {}
        self.iv_ = {}

        for col in self.num_cols_:
            col_data = X[col]
            df_tmp = pd.DataFrame({"x": col_data, "y": y})

            # missing values go to a dedicated bin
            missing_mask = df_tmp["x"].isna()

            if (~missing_mask).sum() == 0:
                # column is entirely empty -> neutral WoE
                self.bin_edges_[col] = None
                self.woe_maps_[col] = {"MISSING": 0.0}
                self.iv_[col] = 0.0
                continue

            # quantile binning on the non-missing values
            try:
                # retbins=True -> also return the bin edges
                _, bins = pd.qcut(
                    df_tmp.loc[~missing_mask, "x"],
                    q=self.n_bins,
                    duplicates="drop",
                    retbins=True
                )
            except ValueError:
                # qcut could not build the bins: fall back to the unique
                # values as edges, or to one tiny bin around a constant value
                bins = np.unique(df_tmp.loc[~missing_mask, "x"])
                if bins.size == 1:
                    bins = np.array([bins[0] - 1e-6, bins[0] + 1e-6])

            self.bin_edges_[col] = bins

            # assign every row to its bin
            df_tmp["bin"] = pd.cut(
                df_tmp["x"],
                bins=bins,
                include_lowest=True
            )
            # object dtype so that the "MISSING" label can be stored alongside
            # the Interval labels
            df_tmp["bin"] = df_tmp["bin"].astype(object)

            df_tmp.loc[missing_mask, "bin"] = "MISSING"

            # smoothed good/bad counts per bin
            grouped = df_tmp.groupby("bin")["y"]
            good = (grouped.apply(lambda s: (s == 0).sum()) + self.eps)
            bad = (grouped.apply(lambda s: (s == 1).sum()) + self.eps)

            # the denominators get the same total smoothing mass as the numerators
            dist_good = good / (self.total_good_ + self.eps * len(good))
            dist_bad = bad / (self.total_bad_ + self.eps * len(bad))

            woe = np.log(dist_good / dist_bad)

            # store the mapping: bin -> WoE
            woe_map = woe.to_dict()
            self.woe_maps_[col] = woe_map

            # Information Value of the variable (kept for reporting)
            iv_col = ((dist_good - dist_bad) * woe).sum()
            self.iv_[col] = iv_col

        return self

    def transform(self, X):
        """Return a copy of ``X`` with each fitted numeric column replaced by WoE.

        Values that fall outside the fitted bin edges, or into a bin that had
        no WoE computed during ``fit``, are mapped to 0.0.
        """
        X = X.copy()

        for col in self.num_cols_:
            if col not in X.columns:
                continue

            col_data = X[col]
            bins = self.bin_edges_[col]
            woe_map = self.woe_maps_[col]

            if bins is not None:
                binned = pd.cut(
                    col_data,
                    bins=bins,
                    include_lowest=True
                ).astype(object)
            else:
                # the column was entirely missing during fit
                binned = pd.Series(["MISSING"] * len(X), index=X.index, dtype=object)

            # missing -> "MISSING"
            binned[col_data.isna()] = "MISSING"

            # map bins to WoE; unknown bins -> 0.0
            X[col] = binned.map(woe_map).fillna(0.0).astype(float)

        return X

    def get_feature_names_out(self, input_features=None):
        """Return the input names unchanged (WoE does not rename columns)."""
        if input_features is None:
            return None
        return np.array(input_features)


class WoEDirectionalityFilter(BaseEstimator, TransformerMixin):
    """
    Keep only the features that are clearly negatively correlated with y.

    Intended for WoE-encoded features, where a higher WoE should mean fewer
    defaults: for every column the correlation with the target is computed
    (Spearman by default) and the column is kept only if that correlation is
    strictly below ``min_corr``.

    Attributes set by ``fit``:
    - corrs_        : column -> correlation with the target (NaN if it could
                      not be computed)
    - cols_to_keep_ : columns that pass the filter
    """

    def __init__(self, min_corr=-0.01, method="spearman", verbose=True):
        """
        min_corr : float
            negative-correlation threshold; a column is kept only when
            corr < min_corr (strict), e.g. -0.01 keeps corr = -0.02 and
            drops corr = -0.01
        method : {"spearman", "pearson"}
        verbose : bool
            print a one-line summary after fitting
        """
        self.min_corr = min_corr
        self.method = method
        self.verbose = verbose

    def fit(self, X, y):
        """Compute per-column correlations with ``y`` and select the columns.

        ``y`` is matched to the rows of ``X`` by position.

        Raises:
            ValueError: if ``y`` and ``X`` have different lengths.
        """
        # make sure we work on a DataFrame with column names
        if isinstance(X, pd.DataFrame):
            X_df = X
        else:
            X_df = pd.DataFrame(X, columns=[f"x_{i}" for i in range(X.shape[1])])

        # Series.corr aligns on index labels, so give y the index of X to get
        # a positional match; otherwise an ndarray target (or an ndarray X
        # paired with a Series target) would be correlated on the wrong rows.
        y_values = np.asarray(y).ravel()
        if len(y_values) != len(X_df):
            raise ValueError(
                f"X and y have inconsistent lengths: {len(X_df)} != {len(y_values)}"
            )
        y_series = pd.Series(y_values, index=X_df.index)

        self.corrs_ = {}
        for col in X_df.columns:
            try:
                c = X_df[col].corr(y_series, method=self.method)
            except Exception:
                # Deliberate best effort: a column whose correlation cannot be
                # computed is recorded as NaN and dropped by the filter below.
                c = np.nan
            self.corrs_[col] = c

        # keep the columns with a clearly negative correlation
        self.cols_to_keep_ = [
            col for col, c in self.corrs_.items()
            if pd.notna(c) and c < self.min_corr
        ]

        if self.verbose:
            total = X_df.shape[1]
            kept = len(self.cols_to_keep_)
            dropped = total - kept
            print(
                f"üßπ WoEDirectionalityFilter: zachowano {kept}/{total} kolumn, "
                f"usuniƒôto {dropped} (corr >= {self.min_corr:.3f})"
            )

        return self

    def transform(self, X):
        """Return only the columns selected in ``fit``."""
        if isinstance(X, pd.DataFrame):
            X_df = X
        else:
            # X is a plain matrix - assume the same column order as in fit
            X_df = pd.DataFrame(X, columns=list(self.corrs_.keys()))

        return X_df[self.cols_to_keep_]

    def get_feature_names_out(self, input_features=None):
        """Return the names of the columns that passed the filter."""
        return np.array(self.cols_to_keep_)


class DropColumnsTransformer(BaseEstimator, TransformerMixin):
    """
    Drop an explicitly given set of columns.

    The set can be supplied either
    - directly, as a list in ``columns``, or
    - as ``columns_path``: a CSV file whose ``feature_col`` column
      (default 'feature') holds the names to drop.

    ``columns`` takes precedence over ``columns_path``. With neither given, or
    when the CSV file does not exist, the transformer is a no-op.
    """

    def __init__(self, columns=None, columns_path=None, feature_col="feature"):
        self.columns = columns
        self.columns_path = columns_path
        self.feature_col = feature_col
        self.columns_ = None

    def fit(self, X, y=None):
        """Resolve the list of columns to drop into ``columns_``.

        Raises:
            ValueError: if the CSV exists but lacks the ``feature_col`` column.
        """
        # Explicit list wins.
        if self.columns is not None:
            self.columns_ = list(self.columns)
            return self

        # Nothing configured at all -> no-op.
        if self.columns_path is None:
            self.columns_ = []
            return self

        # Otherwise read the names from the CSV; a missing file is tolerated.
        try:
            listed = pd.read_csv(self.columns_path)
        except FileNotFoundError:
            print(
                f"‚ö†Ô∏è DropColumnsTransformer: nie znaleziono pliku {self.columns_path}. "
                "Nie usuwamy ≈ºadnych kolumn."
            )
            self.columns_ = []
            return self

        if self.feature_col not in listed.columns:
            raise ValueError(
                f"Plik {self.columns_path} nie zawiera kolumny '{self.feature_col}' "
                "z nazwami cech."
            )

        self.columns_ = listed[self.feature_col].astype(str).tolist()
        if self.columns_:
            print(
                f"üßπ DropColumnsTransformer: zapamiƒôtano {len(self.columns_)} kolumn "
                f"do usuniƒôcia z pliku {self.columns_path}"
            )
        else:
            print(
                f"üßπ DropColumnsTransformer: plik {self.columns_path} jest pusty ‚Äì "
                "nie usuwamy ≈ºadnych kolumn."
            )
        return self

    def transform(self, X):
        """Return a copy of ``X`` without the resolved columns (absent ones ignored)."""
        result = X.copy()
        if self.columns_:
            result = result.drop(columns=self.columns_, errors="ignore")
        return result

    def get_feature_names_out(self, input_features=None):
        """Return the input names minus the dropped ones (None if not given)."""
        if input_features is None:
            return None
        if not self.columns_:
            return np.array(input_features)
        surviving = [name for name in input_features if name not in self.columns_]
        return np.array(surviving)


In [None]:
# dopasowanie_pipeline.py
import os
import sys
import pandas as pd
import numpy as np
import joblib
# ...reszta import√≥w (sklearn, transformers itd.)

# ───────────── PATH CONFIGURATION ─────────────
BASE_DIR = os.path.dirname(os.path.abspath(__file__))        # directory of this script, e.g. .../IWUM-Projekt-1/EDA
PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, "..")) # its parent, e.g. .../IWUM-Projekt-1

# Make sure eda_transformers.py, which sits next to this script, is always importable.
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")
PREPROC_DIR = os.path.join(BASE_DIR, "preprocesing_pipelines")  # spelling must match the existing folder name exactly
os.makedirs(PREPROC_DIR, exist_ok=True)

# CSV files listing features that the logit pipeline drops after WoE encoding.
INTERP_LOGIT_DIR = os.path.join(
    PROJECT_ROOT,
    "Modele_interpretowalne",
    "interpretowalnosc_logit",
)
FEATURES_TO_DROP_PATH = os.path.join(INTERP_LOGIT_DIR, "logit_features_to_drop.csv")
FEATURES_TO_DROP_PATH_K5 = os.path.join(INTERP_LOGIT_DIR, "drop_columns_k5.csv")


from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

from eda_transformers import (
    InfinityReplacer,
    HighMissingDropper,
    MissingIndicator,
    CustomImputer,
    Winsorizer,
    LowVarianceDropper,
    HighCorrelationDropper,
    OneHotEncoder,
    NumericScaler,   # used by the black-box pipeline only
    WoETransformer,   # defined in eda_transformers.py
    WoEDirectionalityFilter,  # needed to keep the logit interpretable
    DropColumnsTransformer
)

import joblib


# ========= PIPELINE DLA DRZEWA DECYZYJNEGO =========

def create_tree_preprocessing_pipeline(
    missing_threshold: float = 0.95,
    lower_q: float = 0.02,
    upper_q: float = 0.98,
    var_threshold: float = 0.01,
    corr_threshold: float = 0.9,
) -> Pipeline:
    """
    Build the (unfitted) preprocessing pipeline for the decision tree.

    Steps, in order:
    - one-hot encoding of the categorical variables
    - inf -> NaN
    - drop columns with a missing share above ``missing_threshold``
    - add missing-value indicator columns
    - imputation (median / most frequent)
    - winsorisation at the ``lower_q`` / ``upper_q`` quantiles
    - drop columns with variance below ``var_threshold``
    - drop columns correlated above ``corr_threshold``

    No scaling step is included.
    """
    return Pipeline(
        steps=[
            ("one_hot", OneHotEncoder()),
            ("inf_replacer", InfinityReplacer()),
            ("drop_high_missing", HighMissingDropper(missing_threshold=missing_threshold)),
            ("missing_indicator", MissingIndicator()),
            ("imputer", CustomImputer()),
            ("winsorizer", Winsorizer(lower_q=lower_q, upper_q=upper_q)),
            ("drop_low_variance", LowVarianceDropper(var_threshold=var_threshold)),
            ("drop_high_corr", HighCorrelationDropper(corr_threshold=corr_threshold)),
        ]
    )


# ========= PIPELINE DLA REGRESJI LOGISTYCZNEJ (WoE) =========

def create_logit_preprocessing_pipeline(
    missing_threshold: float = 0.95,
    lower_q: float = 0.02,
    upper_q: float = 0.98,
    var_threshold: float = 0.01,
    corr_threshold: float = 0.9,
    n_bins: int = 5,
) -> Pipeline:
    """
    Build the (unfitted) preprocessing pipeline for WoE logistic regression.

    Steps, in order:
    - one-hot encoding of the categorical variables
    - inf -> NaN
    - drop columns with a missing share above ``missing_threshold``
    - add missing-value indicator columns
    - imputation (median / most frequent)
    - winsorisation at the ``lower_q`` / ``upper_q`` quantiles
    - drop columns with variance below ``var_threshold``
    - drop columns correlated above ``corr_threshold``
      (note: this runs on the raw values, BEFORE the WoE encoding)
    - WoETransformer: quantile binning into ``n_bins`` bins + WoE
    - WoEDirectionalityFilter: keep columns with Spearman corr < -0.01
    - drop the features listed in FEATURES_TO_DROP_PATH
    - drop the features listed in FEATURES_TO_DROP_PATH_K5

    No scaling step is included.
    """
    # Cleaning steps that operate on the raw feature values.
    raw_value_steps = [
        ("one_hot", OneHotEncoder()),
        ("inf_replacer", InfinityReplacer()),
        ("drop_high_missing", HighMissingDropper(missing_threshold=missing_threshold)),
        ("missing_indicator", MissingIndicator()),
        ("imputer", CustomImputer()),
        ("winsorizer", Winsorizer(lower_q=lower_q, upper_q=upper_q)),
        ("drop_low_variance", LowVarianceDropper(var_threshold=var_threshold)),
        ("drop_high_corr", HighCorrelationDropper(corr_threshold=corr_threshold)),
    ]
    # WoE encoding followed by the logit-specific feature selection.
    woe_steps = [
        ("woe", WoETransformer(n_bins=n_bins)),
        ("woe_directionality", WoEDirectionalityFilter(min_corr=-0.01, method="spearman")),
        ("drop_bad_for_logit", DropColumnsTransformer(columns_path=FEATURES_TO_DROP_PATH)),
        ("drop_unnessesary_for_logit", DropColumnsTransformer(columns_path=FEATURES_TO_DROP_PATH_K5)),
    ]

    return Pipeline(raw_value_steps + woe_steps)


# ========= PIPELINE DLA MODELI NIEINTERPRETOWALNYCH (XGBoost, LightGBM, MLP) =========

def create_blackbox_preprocessing_pipeline(
    missing_threshold: float = 0.95,
    lower_q: float = 0.02,
    upper_q: float = 0.98,
    var_threshold: float = 0.01,
    corr_threshold: float = 0.9,
) -> Pipeline:
    """
    Build the (unfitted) preprocessing pipeline for the non-interpretable
    models (XGBoost, LightGBM, MLP).

    Steps, in order:
    - one-hot encoding of the categorical variables
    - inf -> NaN
    - drop columns with a missing share above ``missing_threshold``
    - add missing-value indicator columns
    - imputation (median / most frequent)
    - winsorisation at the ``lower_q`` / ``upper_q`` quantiles
    - drop columns with variance below ``var_threshold``
    - drop columns correlated above ``corr_threshold``
    - NumericScaler (standardisation, added for the MLP)
    """
    return Pipeline(
        steps=[
            ("one_hot", OneHotEncoder()),
            ("inf_replacer", InfinityReplacer()),
            ("drop_high_missing", HighMissingDropper(missing_threshold=missing_threshold)),
            ("missing_indicator", MissingIndicator()),
            ("imputer", CustomImputer()),
            ("winsorizer", Winsorizer(lower_q=lower_q, upper_q=upper_q)),
            ("drop_low_variance", LowVarianceDropper(var_threshold=var_threshold)),
            ("drop_high_corr", HighCorrelationDropper(corr_threshold=corr_threshold)),
            # Scaling is the only difference from the tree pipeline.
            ("scaler", NumericScaler()),
        ]
    )

# ========= MAIN BLOCK: DATA SPLIT + FITTING THE PIPELINES =========

if __name__ == "__main__":
    # 1. Load the data; the target column is assumed to be 'default'.
    df = pd.read_csv(DATA_PATH)
    y = df["default"]
    X = df.drop(columns=["default"])

    print("Rozmiar pe≈Çnego zbioru:", X.shape)

    # 2. Stratified 60 / 20 / 20 split with a fixed seed: hold out 40 %,
    #    then cut the hold-out in half into validation and test.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.4, stratify=y, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42
    )

    print("Train:", X_train.shape, "Val:", X_val.shape, "Test:", X_test.shape)

    # 3. Build the three pipelines from one shared set of hyper-parameters.
    shared_params = {
        "missing_threshold": 0.95,
        "lower_q": 0.02,
        "upper_q": 0.98,
        "var_threshold": 0.01,
        "corr_threshold": 0.9,
    }
    tree_pipeline = create_tree_preprocessing_pipeline(**shared_params)
    logit_pipeline = create_logit_preprocessing_pipeline(**shared_params, n_bins=5)
    blackbox_pipeline = create_blackbox_preprocessing_pipeline(**shared_params)

    # 4. Fit every pipeline on the training set only.
    print("\n Fitowanie pipeline‚Äôu dla drzewa na zbiorze treningowym...")
    X_train_tree = tree_pipeline.fit_transform(X_train, y_train)
    print("Kszta≈Çt po przetworzeniu (drzewo):", X_train_tree.shape)

    print("\n Fitowanie pipeline‚Äôu dla logitu (WoE) na zbiorze treningowym...")
    X_train_logit = logit_pipeline.fit_transform(X_train, y_train)
    print("Kszta≈Çt po przetworzeniu (logit+WoE):", X_train_logit.shape)

    print("\n Fitowanie pipeline'u dla modeli nieinterpretowalnych na zbiorze treningowym...")
    X_train_blackbox = blackbox_pipeline.fit_transform(X_train, y_train)
    print("Kszta≈Çt po przetworzeniu (blackbox):", X_train_blackbox.shape)

    # 5. Persist the fitted pipelines.
    fitted_pipelines = (
        (tree_pipeline, "preprocessing_tree.pkl"),
        (logit_pipeline, "preprocessing_logit_woe.pkl"),
        (blackbox_pipeline, "preprocessing_blackbox.pkl"),
    )
    for fitted_pipeline, file_name in fitted_pipelines:
        joblib.dump(fitted_pipeline, os.path.join(PREPROC_DIR, file_name))

    print("\n Zapisano pipeline'y:")
    print("   - preprocessing_tree.pkl")
    print("   - preprocessing_logit_woe.pkl")
    print("   - preprocessing_blackbox.pkl")


In [None]:
# modele_interpretowalne.py
import os
import pandas as pd
import numpy as np
import joblib
# reszta import√≥w sklearn, warnings, itd.

# ───────────── PATH CONFIGURATION ─────────────
# Directory containing this script; every other path is derived from it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))        # .../IWUM-Projekt-1/Modele_interpretowalne
# Parent of BASE_DIR, treated as the project root.
PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, "..")) # .../IWUM-Projekt-1

# Input CSV, expected directly in the project root.
DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")
# Directory the fitted preprocessing pipelines are loaded from.
# NOTE(review): "preprocesing" (single "s") must match the directory name used
# by the script that saves the pipelines - confirm before renaming.
PREPROC_DIR = os.path.join(PROJECT_ROOT, "EDA", "preprocesing_pipelines")

# Output locations for the trained models and for the evaluation CSV files.
MODELS_DIR = os.path.join(BASE_DIR, "models")
RESULTS_DIR = os.path.join(BASE_DIR, "model_results")

# Create the output directories at import time (no error if they exist).
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)

from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    log_loss,
    brier_score_loss,
)
from scipy.stats import ks_2samp
import warnings

# Install a blanket "ignore" filter: with no category/module restriction it
# matches every warning raised in this process from here on.
# NOTE(review): this presumably also hides solver convergence warnings emitted
# during the grid search - confirm that silencing them is intended.
warnings.filterwarnings("ignore")

# =====================================================================
#                            CUSTOM METRICS
# =====================================================================

def gini_from_auc(auc):
    """Convert a ROC-AUC value into the Gini coefficient (2 * AUC - 1)."""
    doubled = 2 * auc
    return doubled - 1

def calculate_ks_statistic(y_true, y_pred_proba):
    """Two-sample Kolmogorov-Smirnov statistic between the score distributions
    of the positive (label 1) and negative (label 0) class.

    Parameters
    ----------
    y_true : array-like of 0/1 labels.
    y_pred_proba : array-like of scores, same length as ``y_true``.

    Returns
    -------
    The KS statistic from ``scipy.stats.ks_2samp``, or ``np.nan`` when one of
    the classes has no observations.

    Raises
    ------
    ValueError
        If the two inputs have different lengths.
    """
    # Work on plain arrays so labels and scores are paired by position.
    # Going through a DataFrame would align two pandas Series on their index
    # and silently produce NaN scores when the indices differ.
    labels = np.asarray(y_true).ravel()
    scores = np.asarray(y_pred_proba).ravel()
    if labels.shape[0] != scores.shape[0]:
        raise ValueError(
            "y_true and y_pred_proba must have the same length "
            f"(got {labels.shape[0]} and {scores.shape[0]})"
        )

    pos_probs = scores[labels == 1]
    neg_probs = scores[labels == 0]

    # KS is undefined if either class is absent.
    if pos_probs.size == 0 or neg_probs.size == 0:
        return np.nan

    # ks_2samp orders the samples itself, so no pre-sorting is needed.
    ks_stat, _ = ks_2samp(pos_probs, neg_probs)
    return ks_stat


# =====================================================================
#                        HYPERPARAMETER GRIDS
# =====================================================================

def create_logistic_regression_grid():
    """
    Build the base LogisticRegression estimator and its hyper-parameter grid.

    The grid is a list of sub-grids so that only valid penalty/solver pairs
    are searched:
    - L2 with lbfgs / newton-cg
    - L1 with liblinear / saga
    - elastic net with saga (adds l1_ratio)

    Returns a ``(base_model, param_grid)`` tuple.
    """
    def _sub_grid(penalty, solvers, **extra):
        # Every sub-grid shares the same C and class_weight candidates; fresh
        # lists are created per call so the sub-grids never alias each other.
        grid = {"penalty": [penalty], "solver": list(solvers)}
        grid.update(extra)
        grid["C"] = [0.01, 0.1, 1.0, 10.0]
        grid["class_weight"] = [None, "balanced"]
        return grid

    param_grid = [
        _sub_grid("l2", ("lbfgs", "newton-cg")),
        _sub_grid("l1", ("liblinear", "saga")),
        _sub_grid("elasticnet", ("saga",), l1_ratio=[0.3, 0.5, 0.7]),
    ]

    base_model = LogisticRegression(n_jobs=-1, random_state=42, max_iter=1000)
    return base_model, param_grid


def create_decision_tree_grid():
    """
    Build the DecisionTreeClassifier and the grid searched for it.

    The grid covers depth, minimum split/leaf sizes, split criterion,
    class weighting and cost-complexity pruning strength (ccp_alpha).

    Returns a ``(model, param_grid)`` tuple.
    """
    size_options = (20, 50, 100)  # shared candidates for split and leaf sizes

    param_grid = dict(
        max_depth=[3, 4, 5, 7, 10],
        min_samples_split=list(size_options),
        min_samples_leaf=list(size_options),
        criterion=["gini", "entropy"],
        class_weight=[None, "balanced"],
        ccp_alpha=[0.0, 0.001, 0.01],
    )

    return DecisionTreeClassifier(random_state=42), param_grid


# =====================================================================
#                           EWALUACJA MODELI
# =====================================================================

def evaluate_model(model, X, y, model_name="Model", dataset_name="val"):
    """Score a fitted classifier on one dataset and return its metrics.

    The returned dict holds the identifying labels (``model_name``,
    ``dataset``) followed by ROC-AUC, Gini, PR-AUC, KS, log-loss and Brier
    score, all computed from the second column of ``model.predict_proba(X)``.
    """
    # Second predict_proba column, i.e. the positive class for 0/1 labels.
    scores = model.predict_proba(X)[:, 1]
    auc = roc_auc_score(y, scores)

    metrics = {"model_name": model_name, "dataset": dataset_name}
    metrics["roc_auc"] = auc
    metrics["gini"] = 2 * auc - 1
    metrics["pr_auc"] = average_precision_score(y, scores)
    metrics["ks"] = calculate_ks_statistic(y, scores)
    metrics["log_loss"] = log_loss(y, scores)
    metrics["brier"] = brier_score_loss(y, scores)
    return metrics

def print_evaluation_table(results):
    """Print the collected metric dicts as a framed table and return them
    as a DataFrame (one row per dict)."""
    table = pd.DataFrame(results)
    rule = "=" * 70

    print("\n" + rule)
    print("              WYNIKI MODELI (VAL + TEST)")
    print(rule)
    print(table.to_string(index=False))
    print(rule)

    return table


# =====================================================================
#                         GRIDSEARCH DLA MODELU
# =====================================================================

def train_with_gridsearch(
    model, param_grid, X_train, y_train, model_name="Model", cv=5
):
    """Run a ROC-AUC-scored GridSearchCV for ``model`` on the training data.

    Prints a banner, the best parameters and the best cross-validated
    ROC-AUC, then returns ``(best_estimator, fitted_grid_search)``.
    """
    banner = "=" * 80
    print("\n" + banner)
    print(f" GridSearch: {model_name}")
    print(banner)

    # Model selection uses plain ROC-AUC only; no custom scorers.
    search_kwargs = dict(
        estimator=model,
        param_grid=param_grid,
        scoring="roc_auc",
        cv=cv,
        n_jobs=-1,
        verbose=1,
        return_train_score=False,
    )
    search = GridSearchCV(**search_kwargs)
    search.fit(X_train, y_train)

    print("\nNajlepsze parametry:")
    print(search.best_params_)
    print(f"Najlepszy ROC-AUC CV: {search.best_score_:.4f}")

    return search.best_estimator_, search


# =====================================================================
#                                MAIN
# =====================================================================

def main():
    """Train, evaluate and persist the two interpretable models.

    Steps:
      1. load the CSV and rebuild the stratified 60/20/20 split,
      2. load the saved tree and logit (WoE) preprocessing pipelines and
         transform train/val/test with them,
      3. grid-search a logistic regression and a decision tree on train,
      4. evaluate both on val and test,
      5. save the models, the metric table and the grid-search results,
      6. print the logit intercept and per-feature coefficients.

    Returns ``(best_logit, best_tree, df_results)``.
    """
    print(" Wczytywanie danych...")
    df = pd.read_csv(DATA_PATH)

    X = df.drop(columns=["default"])
    y = df["default"]

    # 60/20/20 split: 40% is held out first, then halved into val and test.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.4, random_state=42, stratify=y
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
    )

    print(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")

    print("\n ≈Åadowanie pipeline‚Äô√≥w...")
    tree_preproc = joblib.load(os.path.join(PREPROC_DIR, "preprocessing_tree.pkl"))
    logit_preproc = joblib.load(os.path.join(PREPROC_DIR, "preprocessing_logit_woe.pkl"))

    # The pipelines are only applied here (transform), never re-fitted.
    print("\n Transformacja danych dla drzewa...")
    X_train_tree = tree_preproc.transform(X_train)
    X_val_tree = tree_preproc.transform(X_val)
    X_test_tree = tree_preproc.transform(X_test)

    print("\n Transformacja danych dla logitu...")
    X_train_logit = logit_preproc.transform(X_train)
    X_val_logit = logit_preproc.transform(X_val)
    X_test_logit = logit_preproc.transform(X_test)

    # ---------------- grid search: logistic regression ----------------
    logit_model, logit_grid = create_logistic_regression_grid()
    best_logit, gs_logit = train_with_gridsearch(
        logit_model, logit_grid, X_train_logit, y_train, "Logit (WoE)", cv=5
    )

    # ---------------- grid search: decision tree ----------------
    tree_model, tree_grid = create_decision_tree_grid()
    best_tree, gs_tree = train_with_gridsearch(
        tree_model, tree_grid, X_train_tree, y_train, "Decision Tree", cv=5
    )

    # ---------------- evaluation on val and test ----------------
    results = []

    # logistic regression
    results.append(evaluate_model(best_logit, X_val_logit, y_val, "Logit_WoE", "val"))
    results.append(evaluate_model(best_logit, X_test_logit, y_test, "Logit_WoE", "test"))

    # decision tree
    results.append(evaluate_model(best_tree, X_val_tree, y_val, "DecisionTree", "val"))
    results.append(evaluate_model(best_tree, X_test_tree, y_test, "DecisionTree", "test"))

    df_results = print_evaluation_table(results)

    # ---------------- persistence ----------------
    print("\n Zapisujemy modele...")

    joblib.dump(best_logit, os.path.join(MODELS_DIR, "best_logistic_regression_woe.pkl"))
    joblib.dump(best_tree, os.path.join(MODELS_DIR, "best_decision_tree.pkl"))

    df_results.to_csv(
        os.path.join(RESULTS_DIR, "model_evaluation_results.csv"),
        index=False,
    )

    pd.DataFrame(gs_logit.cv_results_).to_csv(
        os.path.join(RESULTS_DIR, "grid_results_logit_woe.csv"),
        index=False,
    )
    pd.DataFrame(gs_tree.cv_results_).to_csv(
        os.path.join(RESULTS_DIR, "grid_results_tree.csv"),
        index=False,
    )

    print("\n================ BETA COEFFICIENTS ================\n")

    # Feature names are read from the transformed training matrix itself.
    # The pipeline's get_feature_names_out() is deliberately not called: its
    # result was never used, and it raises when a pipeline step does not
    # implement that method, which would abort the run after training.
    X_logit_df = pd.DataFrame(X_train_logit)
    feature_names = list(X_logit_df.columns)

    # coefficients of the best logistic regression
    betas = best_logit.coef_[0]
    intercept = best_logit.intercept_[0]

    print(f"Intercept (Œ≤0): {intercept:.6f}\n")

    for fname, beta in zip(feature_names, betas):
        # str() keeps the "40s" format valid when the transformed matrix has
        # integer column labels (e.g. the pipeline returned a plain ndarray).
        print(f"{str(fname):40s}  Œ≤ = {beta:.6f}")

    print("Zapisano wszystkie modele i wyniki.")

    return best_logit, best_tree, df_results


if __name__ == "__main__":
    main()


In [None]:
#ocena_jakosci_modelow_wykresy.py
import os
import pandas as pd
import numpy as np
import joblib
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    roc_curve,
    roc_auc_score,
    precision_recall_curve,
    average_precision_score,
    brier_score_loss,
)
from sklearn.calibration import calibration_curve

# ───────────── PATH CONFIGURATION ─────────────
# Directory containing this script; every other path is derived from it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))        # .../IWUM-Projekt-1/Modele_interpretowalne
# Parent of BASE_DIR, treated as the project root.
PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, "..")) # .../IWUM-Projekt-1

# Input CSV, expected directly in the project root.
DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")
# Directory the fitted preprocessing pipelines are loaded from.
PREPROC_DIR = os.path.join(PROJECT_ROOT, "EDA", "preprocesing_pipelines")
# Directory the trained models are loaded from.
MODELS_DIR = os.path.join(BASE_DIR, "models")
# Output directory for the quality-assessment plots.
PLOTS_DIR = os.path.join(BASE_DIR, "wykresy_oceny_jakosci")

# Create the plot directory at import time (no error if it exists).
os.makedirs(PLOTS_DIR, exist_ok=True)


# =====================================================================
#                 FUNKCJE POMOCNICZE / METRYKI DODATKOWE
# =====================================================================

def calculate_ks_statistic(y_true, y_pred_proba):
    """Two-sample Kolmogorov-Smirnov statistic between the score distributions
    of the positive (label 1) and negative (label 0) class.

    Returns ``np.nan`` when one of the classes has no observations and raises
    ``ValueError`` when the two inputs differ in length.
    """
    # Pair labels and scores by position. Building a DataFrame from two
    # pandas Series would align them on their index and silently introduce
    # NaN scores when the indices differ.
    labels = np.asarray(y_true).ravel()
    scores = np.asarray(y_pred_proba).ravel()
    if labels.shape[0] != scores.shape[0]:
        raise ValueError(
            "y_true and y_pred_proba must have the same length "
            f"(got {labels.shape[0]} and {scores.shape[0]})"
        )

    pos = scores[labels == 1]
    neg = scores[labels == 0]

    # KS is undefined if either class is absent.
    if pos.size == 0 or neg.size == 0:
        return np.nan

    # Imported lazily, as in the original, so scipy is only needed when KS
    # is actually computed. ks_2samp orders the samples itself.
    from scipy.stats import ks_2samp
    ks_stat, _ = ks_2samp(pos, neg)
    return ks_stat


# =====================================================================
#                            WYKRESY
# =====================================================================

def plot_roc(models, savepath=None):
    """Draw validation and test ROC curves for every model on one figure.

    ``models`` is an iterable of ``(name, y_val, p_val, y_test, p_test)``
    tuples. The figure is written to ``savepath`` (default:
    ``roc_logit_tree.png`` inside PLOTS_DIR) and then closed.
    """
    target = os.path.join(PLOTS_DIR, "roc_logit_tree.png") if savepath is None else savepath

    plt.figure(figsize=(7, 6))
    for name, y_val, p_val, y_test, p_test in models:
        # Validation curve first, then test, so the legend keeps that order.
        for split, y_true, scores in (("Val", y_val, p_val), ("Test", y_test, p_test)):
            fpr, tpr, _ = roc_curve(y_true, scores)
            auc = roc_auc_score(y_true, scores)
            plt.plot(fpr, tpr, label=f"{name} ‚Äì {split} (AUC={auc:.3f})")

    # Diagonal reference line of a random classifier.
    plt.plot([0, 1], [0, 1], "--", label="Losowy model")
    plt.xlabel("FPR")
    plt.ylabel("TPR")
    plt.title("ROC curve ‚Äì Logit (WoE) vs Decision Tree")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(target, dpi=150)
    plt.close()


def plot_pr(models, savepath=None):
    """Draw validation and test precision-recall curves for every model.

    ``models`` is a sequence of ``(name, y_val, p_val, y_test, p_test)``
    tuples. A dashed baseline is drawn at the positive rate of the first
    model's validation labels. The baseline is skipped when ``models`` is
    empty, so an empty input yields an empty figure rather than an
    IndexError. The figure is written to ``savepath`` (default:
    ``pr_logit_tree.png`` inside PLOTS_DIR) and then closed.
    """
    if savepath is None:
        savepath = os.path.join(PLOTS_DIR, "pr_logit_tree.png")

    plt.figure(figsize=(7, 6))

    for name, y_val, p_val, y_test, p_test in models:
        # Validation curve first, then test, so the legend keeps that order.
        for split, y_true, scores in (("Val", y_val, p_val), ("Test", y_test, p_test)):
            precision, recall, _ = precision_recall_curve(y_true, scores)
            ap = average_precision_score(y_true, scores)
            plt.plot(recall, precision, label=f"{name} ‚Äì {split} (AP={ap:.3f})")

    if len(models) > 0:
        # np.mean accepts lists and arrays as well as pandas Series, unlike
        # calling .mean() on the labels directly.
        baseline = float(np.mean(np.asarray(models[0][1])))
        plt.hlines(baseline, 0, 1, linestyles="--", label=f"Baseline ({baseline:.3f})")

    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("PR curve ‚Äì Logit (WoE) vs Decision Tree")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(savepath, dpi=150)
    plt.close()


def plot_calibration(models, savepath=None, n_bins=10):
    """Draw reliability curves (validation and test) for every model.

    ``models`` is an iterable of ``(name, y_val, p_val, y_test, p_test)``
    tuples and ``n_bins`` is forwarded to ``calibration_curve``. The figure
    is written to ``savepath`` (default: ``calibration_logit_tree.png``
    inside PLOTS_DIR) and then closed.
    """
    target = (
        os.path.join(PLOTS_DIR, "calibration_logit_tree.png")
        if savepath is None
        else savepath
    )

    plt.figure(figsize=(7, 6))
    # Diagonal = perfectly calibrated predictions.
    plt.plot([0, 1], [0, 1], "--", label="Idealna kalibracja")

    for name, y_val, p_val, y_test, p_test in models:
        series = (
            ("Val", "o-", y_val, p_val),
            ("Test", "s-", y_test, p_test),
        )
        for split, marker, y_true, scores in series:
            observed, predicted = calibration_curve(y_true, scores, n_bins=n_bins)
            plt.plot(predicted, observed, marker, label=f"{name} ‚Äì {split}")

    plt.xlabel("≈örednie przewidziane PD (bin)")
    plt.ylabel("Rzeczywista czƒôsto≈õƒá default√≥w")
    plt.title("Calibration ‚Äì Logit (WoE) vs Decision Tree")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig(target, dpi=150)
    plt.close()


def plot_hist(models, out_dir=None):
    """Save one predicted-PD histogram per model and dataset, with the
    good (y == 0) and bad (y == 1) observations overlaid as densities.

    Files are named ``hist_<model>_<dataset>.png`` and written to
    ``out_dir`` (default: PLOTS_DIR).
    """
    target_dir = PLOTS_DIR if out_dir is None else out_dir

    for name, y_val, p_val, y_test, p_test in models:
        for ds_name, y, p in (("val", y_val, p_val), ("test", y_test, p_test)):
            plt.figure(figsize=(7, 6))
            scored = pd.DataFrame({"y": y, "p": p})

            # Good class first, then bad, so colours and legend order match.
            for cls, cls_label in ((0, "Good"), (1, "Bad")):
                plt.hist(
                    scored.loc[scored["y"] == cls, "p"],
                    bins=20,
                    alpha=0.6,
                    density=True,
                    label=cls_label,
                )

            plt.xlabel("Przewidywane PD")
            plt.ylabel("Gƒôsto≈õƒá")
            plt.title(f"Histogram PD ‚Äì {name} ‚Äì {ds_name}")
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()

            plt.savefig(os.path.join(target_dir, f"hist_{name}_{ds_name}.png"), dpi=150)
            plt.close()


# =====================================================================
#                                MAIN
# =====================================================================

def main():
    """Load data, models and pipelines, score val/test and draw all plots.

    Rebuilds the stratified 60/20/20 split, transforms the validation and
    test parts with each model's own pipeline, collects the predicted PDs
    and passes them to the ROC, PR, calibration and histogram plotters.
    """
    print(" Wczytywanie danych i modeli...")
    data = pd.read_csv(DATA_PATH)
    X, y = data.drop(columns=["default"]), data["default"]

    # trained models
    logit = joblib.load(os.path.join(MODELS_DIR, "best_logistic_regression_woe.pkl"))
    tree = joblib.load(os.path.join(MODELS_DIR, "best_decision_tree.pkl"))

    # fitted preprocessing pipelines
    preproc_logit = joblib.load(os.path.join(PREPROC_DIR, "preprocessing_logit_woe.pkl"))
    preproc_tree = joblib.load(os.path.join(PREPROC_DIR, "preprocessing_tree.pkl"))

    # Same split parameters as the other scripts; the train part is unused here.
    _, X_temp, _, y_temp = train_test_split(
        X, y, test_size=0.4, random_state=42, stratify=y
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
    )

    # Transform first (logit, then tree), predict afterwards.
    holdout = {"val": X_val, "test": X_test}
    logit_inputs = {part: preproc_logit.transform(frame) for part, frame in holdout.items()}
    tree_inputs = {part: preproc_tree.transform(frame) for part, frame in holdout.items()}

    def _positive_proba(model, features):
        # second predict_proba column, used as the predicted PD
        return model.predict_proba(features)[:, 1]

    p_val_logit = _positive_proba(logit, logit_inputs["val"])
    p_test_logit = _positive_proba(logit, logit_inputs["test"])
    p_val_tree = _positive_proba(tree, tree_inputs["val"])
    p_test_tree = _positive_proba(tree, tree_inputs["test"])

    # One tuple per model, in the layout the plotting functions expect.
    MODELE = [
        ("Logit", y_val, p_val_logit, y_test, p_test_logit),
        ("Tree",  y_val, p_val_tree,  y_test, p_test_tree),
    ]

    plot_steps = (
        (" Rysujƒô ROC...", plot_roc),
        (" Rysujƒô PR...", plot_pr),
        (" Rysujƒô calibration...", plot_calibration),
        (" Rysujƒô histogramy PD...", plot_hist),
    )
    for message, draw in plot_steps:
        print(message)
        draw(MODELE)

    print(" Wygenerowano wykresy dla logitu i drzewca!")
    print(f"   Pliki zapisane w: {PLOTS_DIR}")


if __name__ == "__main__":
    main()


In [None]:
#interpretowalnosc_regresja_logistyczna.py
import os
import numpy as np
import pandas as pd
import joblib
import matplotlib.pyplot as plt

# ============================================================
#                PATH CONFIGURATION
# ============================================================

# Ten plik zak≈Çadamy, ≈ºe le≈ºy w:
#   .../IWUM-Projekt-1/Modele_interpretowalne/interpretowalnosc_regresja_logistyczna.py

# Directory containing this script; every other path is derived from it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))        # .../Modele_interpretowalne
# Parent of BASE_DIR, treated as the project root.
PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, "..")) # .../IWUM-Projekt-1

# Trained models are read from MODELS_DIR; all interpretability outputs of
# this script go under INTERP_DIR.
MODELS_DIR = os.path.join(BASE_DIR, "models")
INTERP_DIR = os.path.join(BASE_DIR, "interpretowalnosc_logit")

# Input CSV and the saved logit (WoE) preprocessing pipeline.
DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")
PREPROC_DIR = os.path.join(PROJECT_ROOT, "EDA", "preprocesing_pipelines")
PREPROC_LOGIT_PATH = os.path.join(PREPROC_DIR, "preprocessing_logit_woe.pkl")

# Sub-folders for the individual plot families.
WYKRESY_DIR = os.path.join(INTERP_DIR, "waznosc_cech")
PDP_DIR = os.path.join(INTERP_DIR, "PDP")
ICE_DIR = os.path.join(INTERP_DIR, "ICE")
WOE_PROFILS = os.path.join(INTERP_DIR, "woe_profils")

# Create every output directory at import time (no error if they exist).
# makedirs also creates missing parents, so INTERP_DIR already exists after
# the first call.
os.makedirs(WOE_PROFILS, exist_ok=True)
os.makedirs(INTERP_DIR, exist_ok=True)
os.makedirs(WYKRESY_DIR, exist_ok=True)
os.makedirs(PDP_DIR, exist_ok=True)
os.makedirs(ICE_DIR, exist_ok=True)

# Output folder for local (per-observation) interpretability artefacts.
INTERP_LOCAL_DIR = os.path.join(INTERP_DIR, "interpretowalnosc_lokalna")
os.makedirs(INTERP_LOCAL_DIR, exist_ok=True)

# ============================================================
#                LOGIT COEFFICIENT ANALYSIS
# ============================================================

def load_logit_model():
    """Load the saved logistic-regression model from MODELS_DIR.

    Raises FileNotFoundError if the pickle file does not exist.
    """
    model_path = os.path.join(MODELS_DIR, "best_logistic_regression_woe.pkl")
    if os.path.exists(model_path):
        return joblib.load(model_path)
    raise FileNotFoundError(f"Nie znaleziono modelu logitu pod ≈õcie≈ºkƒÖ: {model_path}")


def load_logit_preprocessor():
    """Load the saved logit preprocessing pipeline from PREPROC_LOGIT_PATH.

    Raises FileNotFoundError if the pickle file does not exist.
    """
    pipeline_missing = not os.path.exists(PREPROC_LOGIT_PATH)
    if pipeline_missing:
        raise FileNotFoundError(f"Nie znaleziono pipeline'u logitu: {PREPROC_LOGIT_PATH}")
    preprocessor = joblib.load(PREPROC_LOGIT_PATH)
    return preprocessor


def extract_coefficients(logit):
    """
    Tabulate the coefficients of a fitted logistic regression.

    Returns ``(df_coef, intercept)`` where ``df_coef`` has one row per
    feature, sorted by descending |beta|, with the columns:
        - feature     (from ``feature_names_in_``, else ``x_<i>``)
        - beta
        - abs_beta
        - sign        ("positive" / "negative" / "zero")
        - odds_ratio  (exp(beta))
    """
    betas = logit.coef_.ravel()
    beta0 = float(logit.intercept_[0])

    # Fall back to positional names when the model was not fitted on named columns.
    if not hasattr(logit, "feature_names_in_"):
        names = np.array([f"x_{i}" for i in range(len(betas))])
    else:
        names = np.array(logit.feature_names_in_)

    table = pd.DataFrame({"feature": names, "beta": betas})
    table["abs_beta"] = table["beta"].abs()

    is_positive = table["beta"] > 0
    is_negative = table["beta"] < 0
    non_positive_label = np.where(is_negative, "negative", "zero")
    table["sign"] = np.where(is_positive, "positive", non_positive_label)

    table["odds_ratio"] = np.exp(table["beta"])

    ranked = table.sort_values("abs_beta", ascending=False)
    return ranked.reset_index(drop=True), beta0


def summarize_signs(df_coef, intercept):
    """Print the intercept, the number of features and how many coefficients
    are positive, negative or zero, followed by a one-line verdict on
    whether the coefficient signs are all negative, all positive or mixed."""
    signs = df_coef["sign"]
    n_total = len(df_coef)
    n_pos, n_neg, n_zero = (
        (signs == label).sum() for label in ("positive", "negative", "zero")
    )

    header = [
        "\n============================",
        "   PODSUMOWANIE WSP√ì≈ÅCZYNNIK√ìW LOGITU",
        "============================",
        f"Intercept (Œ≤0): {intercept:.4f}",
        f"Liczba cech: {n_total}",
        f"  ‚Ä¢ beta > 0  (positive): {n_pos}",
        f"  ‚Ä¢ beta < 0  (negative): {n_neg}",
        f"  ‚Ä¢ beta = 0  (zero):     {n_zero}",
    ]
    for line in header:
        print(line)

    only_negative = n_pos == 0 and n_neg > 0
    only_positive = n_neg == 0 and n_pos > 0

    if only_negative:
        print("\n Wszystkie niezerowe bety sƒÖ ujemne ‚Äì kierunek wp≈Çywu jest sp√≥jny z WoE.")
        return
    if only_positive:
        print("\n Wszystkie niezerowe bety sƒÖ dodatnie ‚Äì to oznacza odwrotnƒÖ konwencjƒô WoE.")
        return

    # Everything else (mixed signs, or no non-zero coefficient at all).
    print("\n Mamy mieszane znaki beta ‚Äì warto sprawdziƒá, kt√≥re cechy majƒÖ 'dziwny' kierunek.")
    print("   (np. problem z binningiem, korelacjami lub zmiennymi pomocniczymi).")


def save_coefficients(df_coef):
    """Write the coefficient table to ``coefficients_logit.csv`` in INTERP_DIR
    (without the index) and report the destination on stdout."""
    destination = os.path.join(INTERP_DIR, "coefficients_logit.csv")
    df_coef.to_csv(destination, index=False)
    print(f"\n Zapisano tabelƒô wsp√≥≈Çczynnik√≥w do: {destination}")


# ============================================================
#                POBRANIE I PRZETWORZENIE DANYCH
# ============================================================

def load_and_prepare_data(preproc_logit, logit):
    """Read the full CSV, run the features through the WoE pipeline and
    return ``(X_woe_df, y)``.

    ``X_woe_df`` is a DataFrame whose columns are ``logit.feature_names_in_``;
    ``y`` is the ``default`` column cast to int.
    """
    raw = pd.read_csv(DATA_PATH)
    target = raw["default"].astype(int)
    features = raw.drop(columns=["default"])

    transformed = preproc_logit.transform(features)

    # Force a DataFrame labelled with the feature names the model was fitted on.
    X_woe_df = pd.DataFrame(transformed, columns=logit.feature_names_in_)
    return X_woe_df, target


# ============================================================
#                       PROFILE WoE
# ============================================================

def plot_woe_profile(X_woe, y, feature, save_path):
    """
    Plot and save the WoE profile of a single feature:
       - x axis: WoE value (one point per distinct value, i.e. per bin)
       - y axis: default rate among observations with that WoE value
       - dashed horizontal line: mean of ``y``
       - dotted vertical line at WoE = 0
    """
    paired = pd.DataFrame({"woe": X_woe[feature], "y": y})

    # Each distinct WoE value corresponds to one bin of the feature.
    per_bin = paired.groupby("woe").agg(events=("y", "sum"), total=("y", "count"))
    per_bin = per_bin.reset_index().sort_values("woe")
    per_bin["dr"] = per_bin["events"] / per_bin["total"]

    overall_rate = y.mean()

    plt.figure(figsize=(7, 5))
    plt.plot(per_bin["woe"], per_bin["dr"], "o-", label="default rate")
    plt.axhline(
        overall_rate,
        color="tab:blue",
        linestyle="--",
        label=f"≈öredni default (train) = {overall_rate:.3f}",
    )
    plt.axvline(0.0, color="tab:blue", linestyle=":", label="WoE = 0")

    plt.xlabel("Warto≈õƒá WoE (po binningu)")
    plt.ylabel("Czƒôsto≈õƒá default√≥w w binie")
    plt.title(f"Profil WoE ‚Äì {feature}")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.savefig(save_path, dpi=150)
    plt.close()
    print(f"   ‚ûú zapisano profil WoE: {save_path}")


def generate_woe_profiles(df_coef, X_woe, y):
    """Draw WoE profiles for:
       - the first feature in ``df_coef`` with beta > 0 (if any),
       - the first 9 features in ``df_coef`` with beta < 0.

    NOTE(review): "first" means largest |beta| only if ``df_coef`` is sorted
    by descending |beta|, as extract_coefficients returns it.
    """
    sign = df_coef["sign"]

    # Single profile for a positive-beta feature, when one exists.
    positive_features = df_coef.loc[sign == "positive", "feature"]
    if positive_features.empty:
        print("\n Brak cech z dodatniƒÖ betƒÖ ‚Äì nie rysujƒô osobnego profilu dla beta > 0.")
    else:
        pos_feat = positive_features.iloc[0]
        pos_path = os.path.join(
            INTERP_DIR, "woe_profils", f"woe_profile_positive_beta_{pos_feat}.png"
        )
        print(f"\n Profil WoE dla cechy z dodatniƒÖ betƒÖ: {pos_feat}")
        plot_woe_profile(X_woe, y, pos_feat, pos_path)

    # Profiles for the first 9 negative-beta features.
    negative_features = df_coef.loc[sign == "negative", "feature"].head(9)
    print("\n Profile WoE dla 9 cech z najwiƒôkszym |beta| (beta < 0):")
    for feat in negative_features:
        neg_path = os.path.join(
            INTERP_DIR, "woe_profils", f"woe_profile_top_negative_{feat}.png"
        )
        plot_woe_profile(X_woe, y, feat, neg_path)

# ============================================================
#      BIN-SIZE DIAGNOSTICS FOR THE WOE PROFILES
# ============================================================

from sklearn.model_selection import train_test_split


def get_train_split():
    """
    Rebuild the training part of the 60/20/20 split (first stage only:
    test_size=0.4, random_state=42, stratified on the target).

    Returns ``(X_train, y_train)``; the held-out 40% is discarded because
    only the training distribution is inspected here.
    """
    frame = pd.read_csv(DATA_PATH)
    target = frame["default"]
    features = frame.drop(columns=["default"])

    parts = train_test_split(
        features, target, test_size=0.4, random_state=42, stratify=target
    )
    # parts = (X_train, X_rest, y_train, y_rest)
    return parts[0], parts[2]


def load_woe_transformer():
    """
    Load the logit preprocessing pipeline and return its step named 'woe'.

    Raises FileNotFoundError if the pickle is missing and ValueError if the
    pipeline has no 'woe' step.
    """
    preproc_path = os.path.join(PREPROC_DIR, "preprocessing_logit_woe.pkl")
    if not os.path.exists(preproc_path):
        raise FileNotFoundError(f"Nie znaleziono pipeline'u WoE pod ≈õcie≈ºkƒÖ: {preproc_path}")

    steps = joblib.load(preproc_path).named_steps
    if "woe" in steps:
        return steps["woe"]
    raise ValueError("W pipeline'ie nie ma kroku o nazwie 'woe'.")


def compute_bin_table_for_feature(feature, X_train, y_train, woe_tr, min_count=50):
    """
    Count good/bad observations per WoE bin of ``feature``.

    The bins are cut from ``woe_tr.bin_edges_[feature]`` (right-closed, lowest
    edge included). The returned DataFrame has one row per bin with:
        bin, good, bad, total, default_rate, low_count_flag
    where ``low_count_flag`` marks bins with fewer than ``min_count`` rows.

    Raises KeyError if the transformer has no edges for ``feature``.
    """
    edge_map = woe_tr.bin_edges_
    if feature not in edge_map:
        raise KeyError(f"Brak granic bin√≥w dla cechy '{feature}' w woe.bin_edges_")

    column = X_train[feature]

    # Assign every training value to its WoE bin.
    binned = pd.cut(
        column,
        bins=np.array(edge_map[feature]),
        right=True,
        include_lowest=True,
    )

    # Cross-tabulate bins against the target; 0 -> good, 1 -> bad.
    counts = pd.crosstab(binned, y_train).rename(columns={0: "good", 1: "bad"})
    for class_label in ("good", "bad"):
        # A class absent from y_train still gets a (zero) column.
        if class_label not in counts.columns:
            counts[class_label] = 0

    counts["total"] = counts["good"] + counts["bad"]
    # Empty bins give NaN instead of a division by zero.
    counts["default_rate"] = counts["bad"] / counts["total"].replace(0, np.nan)

    counts = counts.reset_index().rename(columns={column.name: "bin"})

    # Flag sparsely populated bins.
    counts["low_count_flag"] = counts["total"] < min_count

    return counts

def diagnose_bin_sizes(df_coef, n_top=5, min_count=50):
    """
    Check whether odd behaviour at the ends of the WoE profiles may come
    from very small extreme bins.

    For the first ``n_top`` negative-beta features in ``df_coef`` and for
    every positive-beta feature, the per-bin count table is printed, saved
    to its own CSV and appended to one combined CSV in INTERP_DIR. Features
    without bin edges in the WoE transformer are skipped with a warning.
    """
    print("\n Diagnostyka liczno≈õci bin√≥w WoE...")

    # Training data and the fitted WoE step provide the bin edges and counts.
    X_train, y_train = get_train_split()
    woe_tr = load_woe_transformer()

    sign = df_coef["sign"]
    negative_top = df_coef.loc[sign == "negative"].head(n_top)["feature"].tolist()
    positive_all = df_coef.loc[sign == "positive"]["feature"].tolist()

    shown_columns = ["bin", "total", "good", "bad", "default_rate", "low_count_flag"]
    collected = []

    for feat in negative_top + positive_all:
        try:
            tbl = compute_bin_table_for_feature(
                feat, X_train, y_train, woe_tr, min_count=min_count
            )
        except KeyError as e:
            print(f" [WARN] Pomijam {feat}: {e}")
        else:
            tbl["feature"] = feat
            collected.append(tbl)

            # Short console summary for this feature.
            print(f"\n Cechy bin√≥w ‚Äì {feat}:")
            print(tbl[shown_columns].to_string(index=False))

            # One CSV per feature.
            tbl.to_csv(
                os.path.join(INTERP_DIR, f"woe_bin_counts_{feat}.csv"),
                index=False,
            )

    if not collected:
        print("\n Nie uda≈Ço siƒô zbudowaƒá ≈ºadnej tabeli bin√≥w ‚Äì sprawd≈∫ nazwy cech i WoE.")
        return

    out_path_all = os.path.join(INTERP_DIR, "woe_bin_counts_all_checked_features.csv")
    pd.concat(collected, ignore_index=True).to_csv(out_path_all, index=False)
    print(f"\n Zapisano zbiorczƒÖ tabelƒô liczno≈õci bin√≥w do: {out_path_all}")



# ============================================================
#                RANKING CECH I CONTRIBUTION PLOT
# ============================================================

def plot_beta_importance(df_coef, top_n=9):
    """Save a horizontal bar chart of the first ``top_n`` rows of ``df_coef``
    (red bars for beta > 0, green otherwise) to WYKRESY_DIR."""
    # Reverse so the first row of df_coef ends up at the top of the chart.
    selected = df_coef.head(top_n).iloc[::-1]
    bar_colors = np.where(selected["beta"] > 0, "tab:red", "tab:green").tolist()

    plt.figure(figsize=(8, 6))
    plt.barh(selected["feature"], selected["beta"], color=bar_colors)
    plt.axvline(0, color="black", linewidth=1)
    plt.xlabel("Warto≈õƒá wsp√≥≈Çczynnika Œ≤")
    plt.title(f"Top {top_n} cech wg |Œ≤|")
    plt.tight_layout()

    destination = os.path.join(WYKRESY_DIR, f"beta_importance_top{top_n}.png")
    plt.savefig(destination, dpi=150)
    plt.close()
    print(f"\n Zapisano wykres wa≈ºno≈õci cech: {destination}")


def plot_contribution_for_top_case(df_coef, intercept, X_woe, y, logit):
    """Save a contribution plot for the observation with the highest
    predicted PD.

    Shows the 15 largest |beta_j * x_j| terms of that observation; the title
    reports its PD and the intercept-only PD. ``y`` is accepted but not used.
    """
    predicted = logit.predict_proba(X_woe)[:, 1]
    riskiest = X_woe.iloc[np.argmax(predicted)]
    coefficients = df_coef.set_index("feature")["beta"]

    # Per-feature contribution to the linear predictor (aligned on feature name).
    contribution = riskiest * coefficients
    ranked = contribution.to_frame("contribution")
    ranked["abs_contrib"] = ranked["contribution"].abs()
    ranked = ranked.sort_values("abs_contrib", ascending=False).head(15)
    ranked = ranked.iloc[::-1]  # barh draws bottom-up

    def _sigmoid(z):
        return 1 / (1 + np.exp(-z))

    # Linear predictor and PD for this observation, plus the intercept-only PD.
    logit_val = intercept + (riskiest * coefficients).sum()
    pd_val = _sigmoid(logit_val)
    base_pd = _sigmoid(intercept)

    plt.figure(figsize=(8, 6))
    bar_colors = [
        "tab:red" if value > 0 else "tab:green" for value in ranked["contribution"]
    ]
    plt.barh(ranked.index, ranked["contribution"], color=bar_colors)
    plt.axvline(0, color="black", linewidth=1)
    plt.xlabel("Wk≈Çad do logitu (Œ≤_j * x_j)")
    plt.title(
        f"Contribution plot ‚Äì top case (PD={pd_val:.3f}, base PD={base_pd:.3f})"
    )
    plt.tight_layout()

    destination = os.path.join(WYKRESY_DIR, "contribution_top_case.png")
    plt.savefig(destination, dpi=150)
    plt.close()
    print(f" Zapisano contribution plot: {destination}")


# ============================================================
#                     PDP i ICE DLA TOP CECH
# ============================================================

def compute_pdp_ice_for_feature(X_woe, y, logit, feature,
                                grid_size=20, ice_samples=50, random_state=42):
    """Compute PDP and ICE curves of the predicted PD for a single feature.

    The grid spans the 5th to 95th percentile of ``X_woe[feature]`` with
    ``grid_size`` evenly spaced points.

    * PDP: for every grid value the feature is overwritten in all rows and the
      mean positive-class probability is recorded.
    * ICE: ``min(ice_samples, len(X_woe))`` rows are drawn without replacement
      (seeded by ``random_state``) and their individual probabilities are
      tracked along the grid.

    Args:
        X_woe: Feature DataFrame accepted by ``logit.predict_proba``.
        y: Not used by this function.
        logit: Fitted classifier exposing ``predict_proba``.
        feature: Column of ``X_woe`` to vary.
        grid_size: Number of grid points.
        ice_samples: Maximum number of rows used for ICE curves.
        random_state: Seed for the ICE row sampling.

    Returns:
        Tuple ``(grid, pdp_values, ice_curves)`` where ``grid`` and
        ``pdp_values`` are 1-D arrays of length ``grid_size`` and
        ``ice_curves`` is a list of 1-D arrays (one per sampled row).
    """
    rng = np.random.RandomState(random_state)

    # Restrict the grid to the "middle" of the distribution.
    low, high = np.percentile(X_woe[feature].values, [5, 95])
    grid = np.linspace(low, high, grid_size)

    # PDP: one working copy is enough because only `feature` is overwritten.
    X_mod = X_woe.copy()
    pdp_values = []
    for v in grid:
        X_mod[feature] = v
        pdp_values.append(logit.predict_proba(X_mod)[:, 1].mean())
    pdp_values = np.array(pdp_values)

    # ICE: predict all sampled rows at once for each grid value instead of
    # calling predict_proba separately for every (row, grid value) pair.
    n_samples = min(ice_samples, len(X_woe))
    sample_idx = rng.choice(len(X_woe), size=n_samples, replace=False)

    ice_curves = []
    if n_samples > 0:
        X_ice = X_woe.iloc[sample_idx].copy()
        ice_matrix = np.empty((n_samples, len(grid)), dtype=float)
        for col, v in enumerate(grid):
            X_ice[feature] = v
            ice_matrix[:, col] = logit.predict_proba(X_ice)[:, 1]
        # One row of the matrix == one ICE curve, in sampling order.
        ice_curves = list(ice_matrix)

    return grid, pdp_values, ice_curves


def plot_pdp(grid, pdp_values, feature):
    """Save a partial-dependence line plot for ``feature``.

    Args:
        grid: x-values (feature values) of the curve.
        pdp_values: Mean PD for each grid value.
        feature: Feature name; used in the axis label, the title and, verbatim,
            in the output file name ``pdp_<feature>.png`` inside ``PDP_DIR``.
    """
    fig, ax = plt.subplots(figsize=(7, 5))
    ax.plot(grid, pdp_values, "-o")
    ax.set_xlabel(f"{feature} (WoE)")
    ax.set_ylabel("≈örednie PD")
    ax.set_title(f"PDP ‚Äì {feature}")
    ax.grid(True, alpha=0.3)
    fig.tight_layout()

    out_path = os.path.join(PDP_DIR, f"pdp_{feature}.png")
    fig.savefig(out_path, dpi=150)
    plt.close(fig)
    print(f"   ‚ûú zapisano PDP: {out_path}")


def plot_ice(grid, ice_curves, feature):
    """Save an ICE plot (one translucent line per observation) for ``feature``.

    Args:
        grid: x-values (feature values) shared by all curves.
        ice_curves: Iterable of per-observation PD curves over ``grid``.
        feature: Feature name; used in the axis label, the title and, verbatim,
            in the output file name ``ice_<feature>.png`` inside ``ICE_DIR``.
    """
    fig, ax = plt.subplots(figsize=(7, 5))
    for single_curve in ice_curves:
        ax.plot(grid, single_curve, alpha=0.2, color="tab:blue")
    ax.set_xlabel(f"{feature} (WoE)")
    ax.set_ylabel("PD")
    ax.set_title(f"ICE ‚Äì {feature}")
    ax.grid(True, alpha=0.3)
    fig.tight_layout()

    out_path = os.path.join(ICE_DIR, f"ice_{feature}.png")
    fig.savefig(out_path, dpi=150)
    plt.close(fig)
    print(f"   ‚ûú zapisano ICE: {out_path}")


def generate_pdp_ice_for_top_features(df_coef, X_woe, y, logit, top_n=9):
    """Produce PDP and ICE plots for the first ``top_n`` rows of ``df_coef``.

    The function takes ``df_coef.head(top_n)`` as-is, so ``df_coef`` is
    expected to arrive already ordered by |beta| (TODO confirm with the caller).

    Args:
        df_coef: DataFrame with a ``feature`` column.
        X_woe: Feature DataFrame forwarded to ``compute_pdp_ice_for_feature``.
        y: Forwarded to ``compute_pdp_ice_for_feature``.
        logit: Fitted classifier forwarded to ``compute_pdp_ice_for_feature``.
        top_n: Number of leading features to plot.
    """
    selected_features = df_coef.head(top_n)["feature"]
    print(f"\n PDP i ICE dla top {top_n} cech wg |beta|:")

    for feat in selected_features:
        print(f"   ‚Ä¢ {feat}")
        curves = compute_pdp_ice_for_feature(X_woe, y, logit, feature=feat)
        grid, pdp_vals, ice_curves = curves
        plot_pdp(grid, pdp_vals, feat)
        plot_ice(grid, ice_curves, feat)



# ============================================================
#             LOKALNA INTERPRETACJA – 9 PRZYPADKÓW
# ============================================================

# Output folder for the local-interpretation CSV files; created on import.
INTERP_LOCAL_DIR = os.path.join(INTERP_DIR, "interpretowalnosc_lokalna")
os.makedirs(INTERP_LOCAL_DIR, exist_ok=True)

# Location of the pickled WoE preprocessing pipeline used by the logit model.
LOGIT_PREPROC_PATH = os.path.join(PREPROC_DIR, "preprocessing_logit_woe.pkl")


def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of ``x`` (scalar or array-like)."""
    denominator = 1.0 + np.exp(-x)
    return 1.0 / denominator


def load_logit_preproc():
    """Load the logit (WoE) preprocessing pipeline from ``LOGIT_PREPROC_PATH``.

    Returns:
        The object deserialized by ``joblib.load``.

    Raises:
        FileNotFoundError: If ``LOGIT_PREPROC_PATH`` does not exist.
    """
    pipeline_found = os.path.exists(LOGIT_PREPROC_PATH)
    if pipeline_found:
        return joblib.load(LOGIT_PREPROC_PATH)
    raise FileNotFoundError(f"Nie znaleziono pipeline'u logitu pod: {LOGIT_PREPROC_PATH}")


def get_data_splits_for_local():
    """Rebuild the stratified 60/20/20 train/validation/test split from ``DATA_PATH``.

    The CSV is read, the ``default`` column becomes the target, 40% of rows are
    held out first and then halved into validation and test. Both splits use
    ``random_state=42`` and stratify on the target.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    frame = pd.read_csv(DATA_PATH)
    target = frame["default"]
    features = frame.drop(columns=["default"])

    # Stage 1: 60% train vs. 40% hold-out.
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        features, target, test_size=0.4, random_state=42, stratify=target
    )
    # Stage 2: hold-out halved into validation and test.
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=0.5, random_state=42, stratify=y_holdout
    )
    return X_train, X_val, X_test, y_train, y_val, y_test


def transform_to_feature_df(preproc_logit, logit, X):
    """Run ``X`` through ``preproc_logit`` and return a DataFrame for ``logit``.

    The result has its columns in the order of ``logit.feature_names_in_``.
    When the transformer returns a plain (or sparse) array, the row index of
    ``X`` is restored so that label-based lookups (``.loc``) with the original
    observation labels keep pointing at the right rows.

    Args:
        preproc_logit: Fitted transformer exposing ``transform``.
        logit: Fitted model exposing ``feature_names_in_``.
        X: Raw feature data (typically a DataFrame).

    Returns:
        DataFrame of transformed features, columns ordered like the model's.
    """
    X_tr = preproc_logit.transform(X)
    columns = list(logit.feature_names_in_)

    if isinstance(X_tr, pd.DataFrame):
        # Only make sure the column order matches the model.
        return X_tr.loc[:, columns]

    # Sparse matrices cannot be wrapped directly with named columns.
    if hasattr(X_tr, "toarray"):
        X_tr = X_tr.toarray()

    # Keep the original row labels when they are available and consistent;
    # otherwise fall back to the default RangeIndex.
    index = getattr(X, "index", None)
    if index is not None and len(index) != len(X_tr):
        index = None

    return pd.DataFrame(X_tr, columns=columns, index=index)


def select_9_cases_evenly_by_pd(X_test_tr, y_test, logit):
    """Pick up to 9 distinct test observations spread evenly over the PD scale.

    For each of the 9 quantile levels ``0.05, 0.1625, ..., 0.95`` of the
    predicted PD, the not-yet-selected observation whose PD is closest to that
    quantile is chosen. If the test set holds fewer than 9 observations, fewer
    (but never duplicated) labels are returned.

    Args:
        X_test_tr: Transformed test features passed to ``logit.predict_proba``.
        y_test: Series whose index supplies the observation labels; it must be
            row-aligned with ``X_test_tr``.
        logit: Fitted classifier exposing ``predict_proba``.

    Returns:
        Tuple ``(selected_idx, s)`` where ``selected_idx`` is the list of
        chosen labels from ``y_test.index`` and ``s`` is the Series of
        predicted PDs indexed like ``y_test``.
    """
    p_test = logit.predict_proba(X_test_tr)[:, 1]
    s = pd.Series(p_test, index=y_test.index)

    selected_idx = []
    for q in np.linspace(0.05, 0.95, 9):
        target = s.quantile(q)
        # Candidates ordered by distance to the quantile; computed once and
        # reused for the "next closest" fallback.
        ranked = (s - target).abs().sort_values().index
        idx = next((cand for cand in ranked if cand not in selected_idx), None)
        if idx is not None:
            selected_idx.append(idx)

    return selected_idx, s


def decompose_logit_for_case(idx, x_row, y_true, beta, intercept, feature_names):
    """Split the logit of one observation into per-feature contributions.

    Args:
        idx: Observation label (stored as ``int`` in the metadata).
        x_row: Series of feature values for the observation.
        y_true: True class of the observation (stored as ``int``).
        beta: Array of model coefficients, positionally matching ``x_row``.
        intercept: Model intercept.
        feature_names: Feature names, positionally matching ``beta``.

    Returns:
        Tuple ``(df_contrib, meta)``:
          - ``df_contrib``: one row per feature with ``feature``, ``x_value``,
            ``beta``, ``contribution`` and ``abs_contribution``, sorted by
            ``abs_contribution`` descending;
          - ``meta``: dict with ``index``, ``y_true``, ``logit`` and ``pd``.
    """
    feature_values = x_row.values.astype(float)
    coefficients = beta.astype(float)

    per_feature = feature_values * coefficients
    case_logit = intercept + per_feature.sum()

    table = pd.DataFrame({
        "feature": feature_names,
        "x_value": feature_values,
        "beta": coefficients,
        "contribution": per_feature,
    })
    table = table.assign(abs_contribution=table["contribution"].abs())

    # Most influential features first.
    ranked = table.sort_values("abs_contribution", ascending=False).reset_index(drop=True)

    summary = {}
    summary["index"] = int(idx)
    summary["y_true"] = int(y_true)
    summary["logit"] = float(case_logit)
    summary["pd"] = float(sigmoid(case_logit))
    return ranked, summary


def compute_local_decomposition_for_9_cases(logit, df_coef):
    """Compute and persist the local logit decomposition for 9 test cases.

    Steps:
      - load the WoE preprocessing pipeline and rebuild the data split,
      - transform the test set and pick cases spread over the PD scale,
      - decompose each case's logit into per-feature contributions,
      - write two CSV files into ``INTERP_LOCAL_DIR``:
          * ``local_cases_meta.csv`` - one row per case
            (case_id, original_index, y_true, logit, pd),
          * ``local_cases_top10_contributions.csv`` - the 9 largest
            contributions per case in long format (the historical "top10"
            file name is kept because the plotting script reads it).

    Args:
        logit: Fitted model exposing ``coef_``, ``intercept_``,
            ``feature_names_in_`` and ``predict_proba``.
        df_coef: DataFrame with ``feature``, ``abs_beta`` and ``sign`` columns.
    """
    print("\n Liczƒô lokalnƒÖ interpretacjƒô (9 przypadk√≥w)...")

    preproc_logit = load_logit_preproc()
    # Only the test split is explained here.
    _, _, X_test, _, _, y_test = get_data_splits_for_local()
    X_test_tr = transform_to_feature_df(preproc_logit, logit, X_test)

    # The lookups below are label-based (.loc) with labels taken from y_test,
    # so the transformed frame must carry the original row labels. Transformers
    # that return plain arrays lose them; rows are still in the same order, so
    # the index can be restored positionally.
    if not X_test_tr.index.equals(X_test.index):
        X_test_tr = X_test_tr.copy()
        X_test_tr.index = X_test.index

    beta = logit.coef_.ravel()
    intercept = float(logit.intercept_[0])
    feature_names = np.array(logit.feature_names_in_)

    # Select the cases.
    selected_idx, _ = select_9_cases_evenly_by_pd(X_test_tr, y_test, logit)
    print(f"   Wybrane indeksy testu: {selected_idx}")

    meta_rows = []
    top_rows = []

    for case_id, idx in enumerate(selected_idx, start=1):
        x_row = X_test_tr.loc[idx, :]
        y_true = y_test.loc[idx]

        df_contrib, meta = decompose_logit_for_case(
            idx=idx,
            x_row=x_row,
            y_true=y_true,
            beta=beta,
            intercept=intercept,
            feature_names=feature_names,
        )

        # Attach global coefficient information (abs_beta, sign).
        df_contrib = df_contrib.merge(
            df_coef[["feature", "abs_beta", "sign"]],
            how="left",
            on="feature",
        )

        # Keep the 9 largest contributions (df_contrib is already sorted).
        df_top = df_contrib.head(9).copy()
        df_top["case_id"] = case_id
        df_top["original_index"] = int(idx)
        df_top["rank"] = np.arange(1, len(df_top) + 1)
        top_rows.append(df_top)

        meta["case_id"] = case_id
        meta["original_index"] = int(idx)
        meta_rows.append(meta)

    # Case-level summary.
    df_meta = pd.DataFrame(meta_rows)[
        ["case_id", "original_index", "y_true", "logit", "pd"]
    ]
    meta_path = os.path.join(INTERP_LOCAL_DIR, "local_cases_meta.csv")
    df_meta.to_csv(meta_path, index=False)
    print(f" Zapisano podsumowanie 9 przypadk√≥w ‚Üí {meta_path}")

    # Per-feature contributions (long format).
    df_all_top = pd.concat(top_rows, ignore_index=True)
    contrib_path = os.path.join(INTERP_LOCAL_DIR, "local_cases_top10_contributions.csv")
    df_all_top.to_csv(contrib_path, index=False)
    print(f" Zapisano top 9 wk≈Çad√≥w cech dla 9 przypadk√≥w ‚Üí {contrib_path}")


# ============================================================
#                           MAIN
# ============================================================

def main():
    """Run the logit interpretability pipeline end to end.

    Loads the model and its preprocessor, extracts and saves coefficients,
    prepares the WoE data, generates the global plots (WoE profiles, beta
    ranking, contribution plot, PDP/ICE), runs the bin-size diagnostics and
    finally computes the local decomposition for 9 cases.
    """
    
    print(" ≈Åadowanie modelu logit (WoE)...")
    logit = load_logit_model()
    preproc_logit = load_logit_preprocessor()

    print(" Ekstrakcja wsp√≥≈Çczynnik√≥w...")
    df_coef, intercept = extract_coefficients(logit)

    summarize_signs(df_coef, intercept)
    save_coefficients(df_coef)

    print("\nTop 9 cech wg |beta|:")
    print(df_coef.head(9).to_string(index=False))

    print("\n Przygotowywanie danych (WoE)...")
    X_woe, y = load_and_prepare_data(preproc_logit, logit)

    # ---------- WoE profiles ----------
    generate_woe_profiles(df_coef, X_woe, y)

    # ---------- Feature ranking ----------
    plot_beta_importance(df_coef, top_n=9)

    # ---------- Contribution plot ----------
    plot_contribution_for_top_case(df_coef, intercept, X_woe, y, logit)

    # ---------- PDP + ICE ----------
    generate_pdp_ice_for_top_features(df_coef, X_woe, y, logit, top_n=9)

    print("\n Zako≈Ñczono generowanie wykres√≥w interpretowalno≈õci logitu.")
    
    diagnose_bin_sizes(df_coef, n_top=9, min_count=50)
    
    # Local interpretation - 9 observations
    compute_local_decomposition_for_9_cases(logit, df_coef)

# Run the pipeline only when executed as a script.
if __name__ == "__main__":
    main()


In [None]:
#interpretowalnosc_logit/interpretowalnosc_lokalna/wykresy_interpretacji_lokalnej.py
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


# ============================================================
#           KONFIGURACJA ŚCIEŻEK I FOLDERÓW WYJŚCIOWYCH
# ============================================================

# Directory containing this script; inputs and outputs live next to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # .../interpretowalnosc_lokalna

# Input CSV files, expected in the same directory as this script.
META_PATH = os.path.join(BASE_DIR, "local_cases_meta.csv")
CONTRIB_PATH = os.path.join(BASE_DIR, "local_cases_top10_contributions.csv")

# Output folders: one for per-case plots, one for the combined grid plot.
PLOTS_CASE_DIR = os.path.join(BASE_DIR, "wykresy_case")
PLOTS_GRID_DIR = os.path.join(BASE_DIR, "wykresy_zbiorcze")

# Created on import so the plotting functions can write immediately.
os.makedirs(PLOTS_CASE_DIR, exist_ok=True)
os.makedirs(PLOTS_GRID_DIR, exist_ok=True)


# ============================================================
#                Wczytanie danych lokalnych
# ============================================================

def load_local_data():
    """Read the local-interpretation CSV files and normalise ``case_id`` to int.

    Returns:
        Tuple ``(df_meta, df_contrib)`` read from ``META_PATH`` and
        ``CONTRIB_PATH``.

    Raises:
        FileNotFoundError: If either input file is missing (the meta file is
            checked first).
    """
    required_files = (
        (META_PATH, f"Nie znaleziono pliku meta: {META_PATH}"),
        (CONTRIB_PATH, f"Nie znaleziono pliku z wk≈Çadami: {CONTRIB_PATH}"),
    )
    for path, message in required_files:
        if not os.path.exists(path):
            raise FileNotFoundError(message)

    df_meta = pd.read_csv(META_PATH)
    df_contrib = pd.read_csv(CONTRIB_PATH)

    # Both frames are joined on case_id later, so force a common integer dtype.
    for frame in (df_meta, df_contrib):
        frame["case_id"] = frame["case_id"].astype(int)

    return df_meta, df_contrib


# ============================================================
#         Wykresy pojedyncze – top 9 wkładów dla case
# ============================================================

def plot_single_case_bar(df_case, meta_row, save_path):
    """Save a horizontal bar chart of feature contributions for one case.

    Args:
        df_case: Rows of the contributions table for a single ``case_id``;
            must contain ``feature``, ``contribution`` and ``abs_contribution``.
        meta_row: Series with ``case_id``, ``original_index``, ``y_true``,
            ``logit`` and ``pd`` for the same case.
        save_path: Destination path of the PNG file.
    """
    # Ascending order puts the most influential feature at the top of a barh chart.
    ordered = df_case.sort_values("abs_contribution", ascending=True).copy()
    features = ordered["feature"]
    contrib = ordered["contribution"]

    # Red for non-negative contributions, blue for negative ones.
    bar_colors = ["#d62728" if value >= 0 else "#1f77b4" for value in contrib]

    case_id = int(meta_row["case_id"])
    idx = int(meta_row["original_index"])
    y_true = int(meta_row["y_true"])
    logit_val = meta_row["logit"]
    pd_val = meta_row["pd"]

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.barh(features, contrib, color=bar_colors)
    ax.axvline(0, color="black", linewidth=1)
    ax.set_title(
        f"Case {case_id} (idx={idx}, y={y_true})\n"
        f"logit={logit_val:.3f}, PD={pd_val:.3%}"
    )
    ax.set_xlabel("Wk≈Çad do logitu (beta * x)")
    ax.set_ylabel("Cecha")
    ax.grid(axis="x", alpha=0.3)
    fig.tight_layout()
    fig.savefig(save_path, dpi=150)
    plt.close(fig)


def generate_per_case_plots(df_meta, df_contrib):
    """Write one contribution bar chart per case into ``PLOTS_CASE_DIR``.

    Cases are processed in ascending ``case_id`` order; each file is named
    ``case_<case_id>_idx_<original_index>_top9_contributions.png``.

    Args:
        df_meta: Case metadata (one row per case).
        df_contrib: Long-format contributions with a ``case_id`` column.
    """
    ordered_meta = df_meta.sort_values("case_id")
    for _, meta_row in ordered_meta.iterrows():
        case_id = int(meta_row["case_id"])
        idx = int(meta_row["original_index"])

        belongs_to_case = df_contrib["case_id"] == case_id
        df_case = df_contrib[belongs_to_case].copy()

        save_path = os.path.join(
            PLOTS_CASE_DIR, f"case_{case_id}_idx_{idx}_top9_contributions.png"
        )

        plot_single_case_bar(df_case, meta_row, save_path)
        print(f"üíæ Zapisano wykres dla case {case_id} ‚Üí {save_path}")


# ============================================================
#     Zbiorczy wykres 3×3 – 9 case'ów, kolor = gradient po PD
# ============================================================

def plot_grid_cases(df_meta, df_contrib, n_per_case=9):
    """Save a combined grid of contribution bar charts, one subplot per case.

    Each subplot shows the ``n_per_case`` largest absolute contributions of a
    case; all bars of a case share one colour taken from the ``viridis``
    colormap according to the case's PD. The grid has 3 columns and at least
    3 rows (exactly 3x3 for up to 9 cases); extra rows are added when more
    cases are supplied, and unused axes are hidden.

    Args:
        df_meta: Case metadata with ``case_id``, ``original_index``,
            ``y_true`` and ``pd`` columns.
        df_contrib: Long-format contributions with ``case_id``, ``feature``,
            ``contribution`` and ``abs_contribution`` columns.
        n_per_case: Number of features shown per case.

    Raises:
        ValueError: If ``df_meta`` has no rows.
    """
    if len(df_meta) == 0:
        raise ValueError("df_meta is empty: there are no cases to plot")

    # Colour scale over the PD range of the supplied cases.
    pd_vals = df_meta["pd"].values
    norm = plt.Normalize(pd_vals.min(), pd_vals.max())
    # plt.cm.get_cmap was removed from recent Matplotlib; pyplot.get_cmap
    # is available in old and new versions alike.
    cmap = plt.get_cmap("viridis")

    cases_sorted = df_meta.sort_values("case_id").reset_index(drop=True)
    n_cases = len(cases_sorted)

    n_cols = 3
    n_rows = max(3, -(-n_cases // n_cols))  # ceil division, never below 3x3
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
    axes = axes.ravel()

    for ax, (_, meta_row) in zip(axes, cases_sorted.iterrows()):
        case_id = int(meta_row["case_id"])
        idx = int(meta_row["original_index"])
        y_true = int(meta_row["y_true"])
        pd_val = meta_row["pd"]

        df_case = df_contrib[df_contrib["case_id"] == case_id].copy()
        # Ascending sort + tail keeps the largest bars and draws them on top.
        df_case = df_case.sort_values("abs_contribution", ascending=True).tail(n_per_case)

        # One colour per case, taken from the PD gradient.
        ax.barh(df_case["feature"], df_case["contribution"], color=cmap(norm(pd_val)))
        ax.axvline(0, color="black", linewidth=0.8)

        ax.set_title(
            f"Case {case_id} (idx={idx}, y={y_true})\n"
            f"PD={pd_val:.2%}"
        )
        ax.set_xlabel("Wk≈Çad do logitu")
        ax.set_ylabel("Cecha")
        ax.grid(axis="x", alpha=0.3)

    # Hide axes that received no case.
    for ax in axes[n_cases:]:
        ax.axis("off")

    # Colour bar describing the PD scale.
    cbar_ax = fig.add_axes([0.92, 0.15, 0.02, 0.7])
    sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
    sm.set_array([])
    cbar = fig.colorbar(sm, cax=cbar_ax)
    cbar.set_label("PD (prawdopodobie≈Ñstwo defaultu)")

    fig.suptitle("Lokalna interpretacja ‚Äì top 10 wk≈Çad√≥w (9 przypadk√≥w)", fontsize=14)
    fig.tight_layout(rect=[0.03, 0.03, 0.9, 0.95])

    out_path = os.path.join(PLOTS_GRID_DIR, "grid_3x3_cases_top10_contributions.png")
    fig.savefig(out_path, dpi=150)
    plt.close(fig)
    print(f"üíæ Zapisano zbiorczy wykres 3x3 ‚Üí {out_path}")


# ============================================================
#                                MAIN
# ============================================================

def main():
    """Load the local-interpretation CSVs and render per-case and grid plots."""
    print("üìÇ Wczytywanie danych lokalnej interpretacji...")
    df_meta, df_contrib = load_local_data()

    # One PNG per case.
    print("üñºÔ∏è Rysujƒô wykresy pojedynczych case'√≥w...")
    generate_per_case_plots(df_meta, df_contrib)

    # Single combined figure with all cases.
    print("üñºÔ∏è Rysujƒô zbiorczy wykres 3x3 z gradientem po PD...")
    plot_grid_cases(df_meta, df_contrib, n_per_case=9)

    print("‚úÖ Gotowe ‚Äì lokalna interpretacja zwizualizowana.")


# Run only when executed as a script.
if __name__ == "__main__":
    main()


In [None]:
#modele_nieinterpretowalne.py
import os
import sys
import pandas as pd
import numpy as np
import joblib
import warnings
import re
import ast
warnings.filterwarnings("ignore")

from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    log_loss,
    brier_score_loss,
)
from sklearn.neural_network import MLPClassifier
from scipy.stats import ks_2samp, randint, uniform
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
import shap
import lime
import lime.lime_tabular
import matplotlib.pyplot as plt

# KONFIGURACJA SCIEZEK
# Directory of this script and the project root one level above it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, ".."))

# Add the EDA folder to sys.path (appended only if not already present).
# NOTE(review): presumably needed so joblib can import the classes pickled
# inside the preprocessing pipeline - confirm.
EDA_DIR = os.path.join(PROJECT_ROOT, "EDA")
if EDA_DIR not in sys.path:
    sys.path.append(EDA_DIR)

# Input data, pickled preprocessing pipelines, and output locations.
DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")
PREPROC_DIR = os.path.join(PROJECT_ROOT, "EDA", "preprocesing_pipelines")
MODELS_DIR = os.path.join(BASE_DIR, "models_blackbox")
RESULTS_DIR = os.path.join(BASE_DIR, "blackbox_results")
SHAP_DIR = os.path.join(RESULTS_DIR, "shap_plots")
LIME_DIR = os.path.join(RESULTS_DIR, "lime_explanations")

# Output folders are created on import.
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(SHAP_DIR, exist_ok=True)
os.makedirs(LIME_DIR, exist_ok=True)
def calculate_ks_statistic(y_true, y_pred_proba):
    """Return the two-sample Kolmogorov-Smirnov statistic between class scores.

    Scores of observations labelled 1 are compared with scores of observations
    labelled 0 using ``scipy.stats.ks_2samp``. Inputs are matched by position
    (not by pandas index), so a Series and an array, or two Series with
    different indexes, are handled consistently.

    Args:
        y_true: Binary labels (0/1), array-like.
        y_pred_proba: Predicted scores, array-like of the same length.

    Returns:
        The KS statistic, or ``np.nan`` when one of the classes is absent.

    Raises:
        ValueError: If the two inputs differ in length.
    """
    labels = np.asarray(y_true).ravel()
    scores = np.asarray(y_pred_proba, dtype=float).ravel()
    if labels.shape[0] != scores.shape[0]:
        raise ValueError(
            f"y_true and y_pred_proba must have the same length "
            f"({labels.shape[0]} != {scores.shape[0]})"
        )

    pos_probs = scores[labels == 1]
    neg_probs = scores[labels == 0]
    if pos_probs.size == 0 or neg_probs.size == 0:
        return np.nan

    # ks_2samp sorts internally, so no pre-sorting is required.
    ks_stat, _ = ks_2samp(pos_probs, neg_probs)
    return ks_stat

def create_xgboost_grid():
    """Build an XGBoost classifier and its randomized-search space.

    ``scipy.stats.uniform(loc, scale)`` samples from ``[loc, loc + scale]`` and
    ``randint(low, high)`` from the integers ``low .. high - 1``; e.g.
    ``uniform(0.6, 0.4)`` covers 0.6 to 1.0.

    Returns:
        Tuple ``(model, param_distributions)``.
    """
    param_distributions = {
        "n_estimators": randint(100, 500),
        "max_depth": randint(3, 10),
        "learning_rate": uniform(0.01, 0.2),
        "min_child_weight": randint(1, 10),
        "subsample": uniform(0.6, 0.4),
        "colsample_bytree": uniform(0.6, 0.4),
        "reg_alpha": uniform(0, 1),
        "reg_lambda": uniform(0, 2),
        "scale_pos_weight": [1, 2, 3],
    }

    # Settings held constant across all sampled candidates.
    fixed_settings = {
        "random_state": 42,
        "n_jobs": -1,
        "eval_metric": "logloss",
        "use_label_encoder": False,
    }
    model = XGBClassifier(**fixed_settings)
    return model, param_distributions


def create_lightgbm_grid():
    """Build a LightGBM classifier and its randomized-search space.

    ``scipy.stats.uniform(loc, scale)`` samples from ``[loc, loc + scale]`` and
    ``randint(low, high)`` from the integers ``low .. high - 1``.

    Returns:
        Tuple ``(model, param_distributions)``.
    """
    param_distributions = {
        "n_estimators": randint(100, 500),
        "num_leaves": randint(20, 100),
        "max_depth": randint(3, 10),
        "learning_rate": uniform(0.01, 0.2),
        "min_data_in_leaf": randint(20, 200),
        "feature_fraction": uniform(0.6, 0.4),
        "bagging_fraction": uniform(0.6, 0.4),
        "bagging_freq": [5],
        "lambda_l1": uniform(0, 1),
        "lambda_l2": uniform(0, 2),
        "scale_pos_weight": [1, 2, 3],
    }

    # Settings held constant across all sampled candidates.
    fixed_settings = {"random_state": 42, "n_jobs": -1, "verbose": -1}
    model = LGBMClassifier(**fixed_settings)
    return model, param_distributions


def create_mlp_grid():
    """Build an MLP classifier (with early stopping) and its randomized-search space.

    ``scipy.stats.uniform(loc, scale)`` samples from ``[loc, loc + scale]``.

    Returns:
        Tuple ``(model, param_distributions)``.
    """
    # One- and two-layer architectures to choose from.
    architectures = [(50,), (100,), (150,), (50, 25), (100, 50), (150, 75)]
    param_distributions = {
        "hidden_layer_sizes": architectures,
        "activation": ["relu", "tanh"],
        "alpha": uniform(0.0001, 0.01),
        "learning_rate_init": uniform(0.001, 0.01),
        "batch_size": [32, 64, 128],
    }

    # Settings held constant across all sampled candidates.
    fixed_settings = {
        "random_state": 42,
        "max_iter": 1000,
        "early_stopping": True,
        "validation_fraction": 0.2,
    }
    model = MLPClassifier(**fixed_settings)
    return model, param_distributions


def evaluate_model(model, X, y, model_name="Model", dataset_name="val"):
    """Score ``model`` on ``(X, y)`` and return the metrics as a dict.

    Args:
        model: Fitted classifier exposing ``predict_proba``.
        X: Feature matrix.
        y: True binary labels.
        model_name: Label stored under ``model_name``.
        dataset_name: Label stored under ``dataset``.

    Returns:
        Dict with ``model_name``, ``dataset``, ``roc_auc``, ``pr_auc``, ``ks``,
        ``log_loss`` and ``brier``, all computed from the positive-class
        probability.
    """
    positive_scores = model.predict_proba(X)[:, 1]

    metric_functions = (
        ("roc_auc", roc_auc_score),
        ("pr_auc", average_precision_score),
        ("ks", calculate_ks_statistic),
        ("log_loss", log_loss),
        ("brier", brier_score_loss),
    )

    report = {"model_name": model_name, "dataset": dataset_name}
    for key, metric in metric_functions:
        report[key] = metric(y, positive_scores)
    return report


def print_evaluation_table(results):
    """Print the evaluation results as a framed table and return them as a DataFrame.

    Args:
        results: Iterable of per-model metric dicts.

    Returns:
        DataFrame built from ``results``.
    """
    table = pd.DataFrame(results)
    rule = "=" * 70

    print("\n" + rule)
    print(" WYNIKI MODELI BLACK-BOX (VAL + TEST)")
    for line in (rule, table.to_string(index=False), rule):
        print(line)
    return table


def train_with_randomized_search(model, param_distributions, X_train, y_train, model_name="Model", n_iter=50, cv=5):
    """Tune ``model`` with ``RandomizedSearchCV`` (ROC-AUC scoring) and report the result.

    Prints the best parameters, the best cross-validated ROC-AUC, and the
    train/validation gap of the best candidate.

    Args:
        model: Estimator to tune.
        param_distributions: Search space for ``RandomizedSearchCV``.
        X_train: Training features.
        y_train: Training labels.
        model_name: Name used in the printed header.
        n_iter: Number of sampled parameter settings.
        cv: Cross-validation specification passed through to the search.

    Returns:
        Tuple ``(best_estimator, fitted_search)``.
    """
    banner = "=" * 80
    print("\n" + banner)
    print(f"Tuning {model_name} z RandomizedSearchCV")
    print(banner)

    search_settings = {
        "estimator": model,
        "param_distributions": param_distributions,
        "n_iter": n_iter,
        "scoring": "roc_auc",
        "cv": cv,
        "n_jobs": -1,
        "verbose": 1,
        "random_state": 42,
        "return_train_score": True,
    }
    rs = RandomizedSearchCV(**search_settings)
    rs.fit(X_train, y_train)

    print("\nNajlepsze parametry:")
    print(rs.best_params_)
    print(f"Najlepszy ROC-AUC CV: {rs.best_score_:.4f}")

    # Mean train vs. mean validation score of the winning candidate.
    cv_table = pd.DataFrame(rs.cv_results_)
    train_score = cv_table.at[rs.best_index_, "mean_train_score"]
    val_score = cv_table.at[rs.best_index_, "mean_test_score"]
    print(f"Train ROC-AUC: {train_score:.4f}, Val ROC-AUC: {val_score:.4f}")
    print(f"Overfitting gap: {train_score - val_score:.4f}")
    return rs.best_estimator_, rs

import re
import matplotlib.pyplot as plt

def generate_shap_explanations(model, X_train, X_test, feature_names, model_name):
    """Compute SHAP values on the first 500 test rows and save the standard plots.

    Tree models (objects exposing ``get_booster`` or ``booster_``) use
    ``shap.TreeExplainer``; everything else uses ``shap.KernelExplainer`` on
    ``predict_proba`` with a 100-row background sample of ``X_train``.
    Written to ``SHAP_DIR``: a bar summary, a beeswarm summary and dependence
    plots for the 3 features with the largest mean |SHAP|.

    Args:
        model: Fitted classifier.
        X_train: Training features (background data for KernelExplainer).
        X_test: Test features; only the first 500 rows are explained.
        feature_names: Feature names, positionally matching the columns.
        model_name: Prefix of the output file names.

    Returns:
        Tuple ``(explainer, shap_values_class)`` where ``shap_values_class``
        holds the SHAP values of the positive class.
    """
    print(f"\nGenerowanie wyjasnien SHAP dla {model_name}...")

    # XGBoost may store base_score as a string such as "[5E-1]"; normalise it
    # to a plain float on the booster before building the explainer.
    if hasattr(model, "get_booster"):
        booster = model.get_booster()
        base_score = booster.attr("base_score")
        if base_score is not None:
            if isinstance(base_score, str):
                try:
                    base_score = ast.literal_eval(base_score)
                except (ValueError, SyntaxError):
                    # Not a Python literal; float() below gets the raw string.
                    pass
            if isinstance(base_score, (list, tuple, np.ndarray)):
                base_score = base_score[0]
            base_score = float(base_score)
            booster.set_param("base_score", base_score)

    if hasattr(model, "get_booster") or hasattr(model, "booster_"):
        explainer = shap.TreeExplainer(model)
    else:
        background = shap.sample(X_train, 100)
        explainer = shap.KernelExplainer(model.predict_proba, background)

    # Explain a fixed-size slice to bound the runtime; reused by every plot.
    X_explain = X_test[:500]
    shap_values = explainer.shap_values(X_explain)

    # shap returns either a per-class list, a 3-D array (..., n_classes),
    # or a single 2-D array depending on explainer and version.
    if isinstance(shap_values, list):
        shap_values_class = shap_values[1]  # positive class
    elif len(shap_values.shape) == 3:
        shap_values_class = shap_values[..., 1]  # positive class
    else:
        shap_values_class = shap_values

    mean_abs_shap = np.abs(shap_values_class).mean(axis=0)
    top_features_idx = np.argsort(mean_abs_shap)[-3:][::-1]

    # Summary plot (bar).
    plt.figure(figsize=(10, 6))
    shap.summary_plot(shap_values_class, X_explain, feature_names=feature_names, plot_type="bar", show=False)
    plt.tight_layout()
    plt.savefig(os.path.join(SHAP_DIR, f"{model_name}_summary_bar.png"), dpi=150)
    plt.close()

    # Summary plot (beeswarm).
    plt.figure(figsize=(10, 8))
    shap.summary_plot(shap_values_class, X_explain, feature_names=feature_names, show=False)
    plt.tight_layout()
    plt.savefig(os.path.join(SHAP_DIR, f"{model_name}_beeswarm.png"), dpi=150)
    plt.close()

    # Dependence plots for the 3 most important features.
    for idx in top_features_idx:
        shap.dependence_plot(idx, shap_values_class, X_explain, feature_names=feature_names, interaction_index=None, show=False)
        plt.tight_layout()
        # Feature names can contain path separators or other characters that
        # are invalid in file names; ordinary names are left unchanged.
        safe_name = re.sub(r'[\\/:*?"<>|]+', "_", str(feature_names[idx]))
        plt.savefig(os.path.join(SHAP_DIR, f"{model_name}_dependence_{safe_name}.png"), dpi=150)
        plt.close()

    print(f"Wykresy SHAP zapisane w {SHAP_DIR}")

    return explainer, shap_values_class


def generate_lime_explanations(model, X_train, X_test, y_test, feature_names, model_name, n_instances=5):
    """Explain representative test instances with LIME and save the results.

    The first true positive, true negative, false positive and false negative
    (in that order, when present) are explained, capped at ``n_instances``.
    For each one an HTML file and a PNG are written to ``LIME_DIR``; a CSV
    summary of all explanations is written as well.

    Args:
        model: Fitted classifier exposing ``predict`` and ``predict_proba``.
        X_train: Training features (LIME background data).
        X_test: Test features.
        y_test: True test labels, positionally aligned with ``X_test``.
        feature_names: Feature names, positionally matching the columns.
        model_name: Prefix of the output file names.
        n_instances: Maximum number of instances to explain (at most 4 are
            ever available, so the default of 5 keeps all of them).

    Returns:
        List of dicts, one per explained instance.
    """
    print(f"\nGenerowanie wyjasnien LIME dla {model_name}...")

    # LIME needs dense numpy arrays.
    if hasattr(X_train, "toarray"):
        X_train = X_train.toarray()
    if hasattr(X_test, "toarray"):
        X_test = X_test.toarray()

    if hasattr(X_train, "values"):
        X_train = X_train.values
    if hasattr(X_test, "values"):
        X_test = X_test.values

    feature_names = list(feature_names)
    # Positional array: y_test[idx] below must not be a label-based Series lookup.
    y_test = np.asarray(y_test).ravel()

    explainer = lime.lime_tabular.LimeTabularExplainer(
        training_data=X_train,
        feature_names=feature_names,
        class_names=["No Default", "Default"],
        mode="classification",
        random_state=42,
        discretize_continuous=True
    )

    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    # (label, true class, predicted class) for each confusion-matrix cell.
    confusion_cells = (
        ("True_Positive", 1, 1),
        ("True_Negative", 0, 0),
        ("False_Positive", 0, 1),
        ("False_Negative", 1, 0),
    )
    selected = []
    for label, true_cls, pred_cls in confusion_cells:
        hits = np.flatnonzero((y_test == true_cls) & (y_pred == pred_cls))
        if hits.size > 0:
            selected.append((int(hits[0]), label))

    if n_instances is not None:
        selected = selected[:max(0, int(n_instances))]

    lime_explanations = []

    for idx, label in selected:
        # explain_instance expects a single instance as a 1-D array.
        exp = explainer.explain_instance(X_test[idx], model.predict_proba, num_features=10)

        exp.save_to_file(os.path.join(LIME_DIR, f"{model_name}_{label}_instance_{idx}.html"))

        # Save and close the explicit figure rather than relying on pyplot state.
        fig = exp.as_pyplot_figure()
        fig.tight_layout()
        fig.savefig(os.path.join(LIME_DIR, f"{model_name}_{label}_instance_{idx}.png"), dpi=150, bbox_inches="tight")
        plt.close(fig)

        lime_explanations.append({
            "instance_idx": idx,
            "label": label,
            "true_class": y_test[idx],
            "predicted_class": y_pred[idx],
            "predicted_proba": y_pred_proba[idx],
            "explanation": exp.as_list(),
        })

    lime_df = pd.DataFrame(lime_explanations)
    lime_df.to_csv(os.path.join(LIME_DIR, f"{model_name}_lime_explanations.csv"), index=False)
    print(f"Wyjasnienia LIME zapisane w {LIME_DIR}")

    return lime_explanations


def main():
    """Train, evaluate, explain and persist the black-box models.

    Pipeline: load the data and rebuild the 60/20/20 stratified split, apply
    the pickled black-box preprocessing pipeline, tune XGBoost, LightGBM and
    an MLP with randomized search, evaluate each on validation and test,
    generate SHAP and LIME explanations, then save models and result tables.

    Returns:
        Tuple ``(best_xgb, best_lgbm, best_mlp, df_results)``.
    """
    print("Wczytywanie danych...")
    df = pd.read_csv(DATA_PATH)
    X = df.drop(columns=["default"])
    y = df["default"]
    # 60% train, then the remaining 40% halved into validation and test.
    X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4, random_state=42, stratify=y)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)
    print(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
    print("\nLadowanie pipeline preprocessingu dla modeli nieinterpretowalnych...")
    # The pipeline is loaded already fitted; it is only applied here (transform).
    preproc = joblib.load(os.path.join(PREPROC_DIR, "preprocessing_blackbox.pkl"))
    print("\nTransformacja danych...")
    X_train_proc = preproc.transform(X_train)
    X_val_proc = preproc.transform(X_val)
    X_test_proc = preproc.transform(X_test)
    feature_names = preproc.get_feature_names_out()
    
    # Hyper-parameter tuning; all three models are tuned on the training split.
    xgb_model, xgb_grid = create_xgboost_grid()
    best_xgb, rs_xgb = train_with_randomized_search(xgb_model, xgb_grid, X_train_proc, y_train, "XGBoost", n_iter=50, cv=5)
    
    lgbm_model, lgbm_grid = create_lightgbm_grid()
    best_lgbm, rs_lgbm = train_with_randomized_search(lgbm_model, lgbm_grid, X_train_proc, y_train, "LightGBM", n_iter=50, cv=5)
    
    mlp_model, mlp_grid = create_mlp_grid()
    best_mlp, rs_mlp = train_with_randomized_search(mlp_model, mlp_grid, X_train_proc, y_train, "MLP", n_iter=30, cv=5)
    
    # Evaluation on validation and test for every tuned model.
    results = []
    results.append(evaluate_model(best_xgb, X_val_proc, y_val, "XGBoost", "val"))
    results.append(evaluate_model(best_xgb, X_test_proc, y_test, "XGBoost", "test"))
    results.append(evaluate_model(best_lgbm, X_val_proc, y_val, "LightGBM", "val"))
    results.append(evaluate_model(best_lgbm, X_test_proc, y_test, "LightGBM", "test"))
    results.append(evaluate_model(best_mlp, X_val_proc, y_val, "MLP", "val"))
    results.append(evaluate_model(best_mlp, X_test_proc, y_test, "MLP", "test"))
    df_results = print_evaluation_table(results)
    
    # SHAP explanations (return values are not used further in this function).
    print("\n" + "=" * 80)
    print("GENEROWANIE WYSJASNIEN SHAP")
    print("=" * 80)
    shap_xgb = generate_shap_explanations(best_xgb, X_train_proc, X_test_proc, feature_names, "XGBoost")
    shap_lgbm = generate_shap_explanations(best_lgbm, X_train_proc, X_test_proc, feature_names, "LightGBM")
    shap_mlp = generate_shap_explanations(best_mlp, X_train_proc, X_test_proc, feature_names, "MLP")
    
    # LIME explanations; y_test is passed as a plain array so it is indexed by position.
    print("\n" + "=" * 80)
    print("GENEROWANIE WYJASNIEN LIME")
    print("=" * 80)
    lime_xgb = generate_lime_explanations(best_xgb, X_train_proc, X_test_proc, y_test.values, feature_names, "XGBoost")
    lime_lgbm = generate_lime_explanations(best_lgbm, X_train_proc, X_test_proc, y_test.values, feature_names, "LightGBM")
    lime_mlp = generate_lime_explanations(best_mlp, X_train_proc, X_test_proc, y_test.values, feature_names, "MLP")
    
    # Persist the tuned models, the evaluation table and the full search logs.
    print("\nZapisywanie modeli i wynikow...")
    joblib.dump(best_xgb, os.path.join(MODELS_DIR, "best_xgboost.pkl"))
    joblib.dump(best_lgbm, os.path.join(MODELS_DIR, "best_lightgbm.pkl"))
    joblib.dump(best_mlp, os.path.join(MODELS_DIR, "best_mlp.pkl"))
    df_results.to_csv(os.path.join(RESULTS_DIR, "blackbox_evaluation_results.csv"), index=False)
    pd.DataFrame(rs_xgb.cv_results_).to_csv(os.path.join(RESULTS_DIR, "grid_results_xgboost.csv"), index=False)
    pd.DataFrame(rs_lgbm.cv_results_).to_csv(os.path.join(RESULTS_DIR, "grid_results_lightgbm.csv"), index=False)
    pd.DataFrame(rs_mlp.cv_results_).to_csv(os.path.join(RESULTS_DIR, "grid_results_mlp.csv"), index=False)
    
    print("Zapisano wszystkie modele black-box i wyniki.")
    print(f"\nWyniki w: {RESULTS_DIR}")
    print(f"Wykresy SHAP w: {SHAP_DIR}")
    print(f"Wyjasnienia LIME w: {LIME_DIR}")
    return best_xgb, best_lgbm, best_mlp, df_results

# Run only when executed as a script.
if __name__ == "__main__":
    main()

In [None]:
# kalibracja.py
import os
import sys
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings

from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.linear_model import LogisticRegression
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import brier_score_loss
from sklearn.calibration import calibration_curve
from sklearn.model_selection import train_test_split
from scipy.optimize import brentq

# Silence every warning category for the rest of the process.
warnings.filterwarnings("ignore")

# =============================================================================
# 1. PATH CONFIGURATION
# =============================================================================
# Directory containing this script; the project root is its parent.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, ".."))

# [FIX] Put the EDA folder on sys.path so joblib can see the class
# definitions when it loads the saved objects.
EDA_DIR = os.path.join(PROJECT_ROOT, "EDA")
if EDA_DIR not in sys.path:
    sys.path.insert(0, EDA_DIR)

# Input dataset and the folders holding the saved preprocessing pipelines
# and trained models.
DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")
PREPROC_DIR = os.path.join(PROJECT_ROOT, "EDA", "preprocesing_pipelines")
MODELS_INTERP_DIR = os.path.join(PROJECT_ROOT, "Modele_interpretowalne", "models")
MODELS_BLACKBOX_DIR = os.path.join(PROJECT_ROOT, "Modele_nieinterpretowalne", "models_blackbox")

# Outputs are written next to this script; the plot folder is created at
# import time.
OUTPUT_DIR = CURRENT_DIR
IMG_DIR = os.path.join(OUTPUT_DIR, "plots_separate") # New folder for the separate plots
os.makedirs(IMG_DIR, exist_ok=True)

# Target portfolio-average PD: passed to CalibrationInTheLarge in main() and
# drawn as the reference line in plot_single_histogram().
TARGET_MEAN_PD = 0.04

# =============================================================================
# 2. CALIBRATOR CLASSES
# =============================================================================

class BetaCalibration(BaseEstimator, ClassifierMixin):
    """Calibrate probabilities with a logistic model on ``ln(p)`` and ``-ln(1 - p)``.

    The raw probability ``p`` is mapped to the two features
    ``[ln(p), -ln(1 - p)]`` and a logistic regression with a very large ``C``
    (i.e. negligible regularisation) is fitted on them.
    """

    def __init__(self):
        self.lr = LogisticRegression(C=999999999, solver='lbfgs')

    @staticmethod
    def _beta_features(X_probs):
        """Return the ``[ln(p), -ln(1 - p)]`` feature matrix for raw probabilities."""
        eps = 1e-15
        # Clip away exact 0/1 so both logarithms stay finite.
        p = np.clip(np.asarray(X_probs, dtype=float), eps, 1 - eps)
        return np.column_stack([np.log(p), -np.log(1 - p)])

    def fit(self, X_probs, y):
        """Fit the calibrator.

        Parameters
        ----------
        X_probs : array-like
            Uncalibrated probabilities of the positive class.
        y : array-like
            Binary labels aligned with ``X_probs``.

        Returns
        -------
        self
        """
        self.lr.fit(self._beta_features(X_probs), y)
        return self

    def predict_proba(self, X_probs):
        """Return the fitted logistic model's ``predict_proba`` for ``X_probs``.

        The same feature transform as in ``fit`` is applied first. Callers in
        this file read the calibrated probability from column 1.
        """
        return self.lr.predict_proba(self._beta_features(X_probs))


class CalibrationInTheLarge(BaseEstimator, ClassifierMixin):
    """Shift probabilities by a constant logit offset so their mean hits a target.

    ``fit`` searches for ``delta`` such that
    ``mean(sigmoid(logit(p) + delta)) == target_mean`` on the data it is
    given; ``predict_proba`` applies that same shift to new probabilities.
    """

    def __init__(self, target_mean=0.04):
        self.target_mean = target_mean
        self.delta = 0.0

    @staticmethod
    def _to_logits(X_probs):
        """Convert probabilities to logits, clipping away exact 0/1 first."""
        eps = 1e-15
        p = np.clip(np.asarray(X_probs, dtype=float), eps, 1 - eps)
        return np.log(p / (1 - p))

    def fit(self, X_probs, y=None):
        """Find the logit shift that moves the mean probability to ``target_mean``.

        Parameters
        ----------
        X_probs : array-like
            Probabilities whose mean should be moved.
        y : ignored
            Accepted only for API symmetry with the other calibrators.

        Returns
        -------
        self
            With ``self.delta`` set. If no root can be bracketed, ``delta``
            stays 0.0 (identity shift) and a message is printed.
        """
        logits = self._to_logits(X_probs)

        def objective(delta):
            shifted_logits = logits + delta
            shifted_probs = 1 / (1 + np.exp(-shifted_logits))
            return np.mean(shifted_probs) - self.target_mean

        self.delta = 0.0
        # Try the original bracket first so results are unchanged whenever it
        # works. The clipped logits lie within about +/-34.6, so +/-50 brackets
        # a root for any sensible target; brentq raises ValueError when the
        # objective has the same sign at both ends.
        for half_width in (10, 50):
            try:
                self.delta = brentq(objective, -half_width, half_width)
            except ValueError:
                continue
            break
        else:
            print(
                "![WARN] CalibrationInTheLarge: no logit shift in [-50, 50] "
                f"reaches target mean {self.target_mean}; leaving delta = 0.0"
            )
        return self

    def predict_proba(self, X_probs):
        """Apply the fitted shift.

        Returns
        -------
        numpy.ndarray
            Two columns: ``1 - p_shifted`` and ``p_shifted``.
        """
        shifted_logits = self._to_logits(X_probs) + self.delta
        new_probs = 1 / (1 + np.exp(-shifted_logits))
        return np.column_stack([1 - new_probs, new_probs])

# =============================================================================
# 3. FUNKCJE POMOCNICZE - METRYKI I WYKRESY (ZMODYFIKOWANE)
# =============================================================================

def compute_metrics(y_true, y_prob, model_name="Model"):
    """Compute calibration metrics for a vector of predicted probabilities.

    Parameters
    ----------
    y_true : array-like
        Binary outcomes (0/1).
    y_prob : array-like
        Predicted probabilities of the positive class, aligned with ``y_true``.
    model_name : str
        Label stored under the ``"Method"`` key.

    Returns
    -------
    dict
        ``Method``; ``Avg_PD`` (mean prediction); ``ECE`` (count-weighted mean
        absolute gap between average prediction and observed rate over 10
        equal-width bins on [0, 1]); ``ACE`` (unweighted mean of the same gap
        over up to 10 quantile buckets, NaN if bucketing fails); ``Brier``;
        ``Rel`` (count-weighted mean squared gap between bin prediction and
        bin observed rate); ``Res`` (count-weighted mean squared gap between
        bin observed rate and the overall base rate).
    """
    # Work on plain arrays so boolean masks index positionally regardless of
    # any pandas index carried by the inputs.
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob, dtype=float)

    n = len(y_true)
    base_prob = np.mean(y_true)
    brier = brier_score_loss(y_true, y_prob)

    n_bins = 10
    bins = np.linspace(0, 1, n_bins + 1)
    # np.digitize returns n_bins + 1 for p == 1.0 (the right edge is open),
    # which would fall outside the loop below and silently drop the sample;
    # clip so such values land in the last bin.
    bin_indices = np.clip(np.digitize(y_prob, bins) - 1, 0, n_bins - 1)

    reliability = 0.0
    resolution = 0.0
    ece = 0.0

    for i in range(n_bins):
        mask = bin_indices == i
        count = np.sum(mask)
        if count == 0:
            continue
        prob_avg = np.mean(y_prob[mask])
        true_avg = np.mean(y_true[mask])
        reliability += count * (prob_avg - true_avg)**2
        resolution += count * (true_avg - base_prob)**2
        ece += np.abs(prob_avg - true_avg) * (count / n)

    reliability /= n
    resolution /= n

    try:
        df = pd.DataFrame({'y': y_true, 'p': y_prob})
        df['bucket'] = pd.qcut(df['p'], n_bins, duplicates='drop')
        # observed=True: only buckets that contain data contribute.
        per_bucket = df.groupby('bucket', observed=True)[['p', 'y']].mean()
        ace = (per_bucket['p'] - per_bucket['y']).abs().mean()
    except Exception:
        # Best effort: quantile bucketing can fail on degenerate score
        # distributions; report NaN instead of aborting the whole table.
        ace = np.nan

    return {
        "Method": model_name,
        "Avg_PD": np.mean(y_prob),
        "ECE": ece,
        "ACE": ace,
        "Brier": brier,
        "Rel": reliability,
        "Res": resolution
    }

def plot_single_reliability(y_true, y_prob, title, filename):
    """Save a reliability diagram with the score histogram on a second y-axis.

    The figure is written to ``IMG_DIR/filename`` and then closed.
    """
    fig, ax1 = plt.subplots(figsize=(7, 6))

    # --- Left axis: reliability curve against the perfect-calibration diagonal ---
    frac_pos, mean_pred = calibration_curve(y_true, y_prob, n_bins=10)
    ax1.plot([0, 1], [0, 1], "k:", label="Perfect", alpha=0.6)
    ax1.plot(
        mean_pred,
        frac_pos,
        "s-",
        label="Model",
        color='navy',
        linewidth=2,
        markersize=6,
    )
    ax1.set_xlabel("Mean Predicted Probability")
    ax1.set_ylabel("Fraction of Positives (Reliability)", color='navy')
    ax1.tick_params(axis='y', labelcolor='navy')
    ax1.set_ylim([-0.05, 1.05])
    ax1.grid(True, alpha=0.3)

    # --- Right axis: translucent histogram of the predictions (shares X) ---
    ax2 = ax1.twinx()
    ax2.hist(
        y_prob,
        range=(0, 1),
        bins=10,
        histtype="stepfilled",
        color="gray",
        alpha=0.2,
        label="Distribution",
    )
    ax2.set_ylabel("Count (Histogram)", color="gray")
    ax2.tick_params(axis='y', labelcolor="gray")

    # One legend combining the entries of both axes.
    handles, labels = [], []
    for axis in (ax1, ax2):
        axis_handles, axis_labels = axis.get_legend_handles_labels()
        handles += axis_handles
        labels += axis_labels
    ax1.legend(handles, labels, loc="upper left")

    plt.title(title)

    plt.savefig(os.path.join(IMG_DIR, filename), dpi=150, bbox_inches='tight')
    plt.close()

def plot_single_histogram(y_prob, title, filename):
    """Save a density histogram of one probability series.

    A dashed vertical line marks ``TARGET_MEAN_PD``. The figure is written to
    ``IMG_DIR/filename`` and then closed.
    """
    fig, ax = plt.subplots(figsize=(8, 5))

    ax.hist(
        y_prob,
        bins=50,
        alpha=0.7,
        color='steelblue',
        edgecolor='black',
        label="PD Distribution",
        density=True,
    )

    # Reference line at the target mean PD.
    ax.axvline(
        TARGET_MEAN_PD,
        color='red',
        linestyle='--',
        linewidth=2,
        label=f'Target {TARGET_MEAN_PD}',
    )

    ax.set_title(title)
    ax.set_xlabel("Predicted Probability (PD)")
    ax.set_ylabel("Density")
    ax.legend()
    ax.grid(True, alpha=0.3)

    fig.savefig(os.path.join(IMG_DIR, filename), dpi=150, bbox_inches='tight')
    plt.close(fig)

def find_file(directory, pattern, allow_fallback=True):
    """Locate a ``.pkl`` file in ``directory`` whose name contains ``pattern``.

    Parameters
    ----------
    directory : str
        Folder to search (not recursive).
    pattern : str
        Substring matched case-insensitively against the file name.
    allow_fallback : bool, default True
        When no name matches, return the first ``.pkl`` file in the folder
        instead of ``None``. NOTE: with the fallback enabled, an expression
        such as ``find_file(d, "a") or find_file(d, "b")`` never reaches the
        second call as long as the folder holds any ``.pkl`` file, and the
        file returned may belong to a different model. Pass ``False`` to get
        ``None`` on a miss.

    Returns
    -------
    str or None
        Full path of the chosen file, or ``None`` if ``directory`` is not an
        existing directory or no suitable file is found.
    """
    if not os.path.isdir(directory):
        return None

    # Sort so the result does not depend on the arbitrary os.listdir order.
    pkls = sorted(f for f in os.listdir(directory) if f.endswith('.pkl'))

    needle = pattern.lower()
    for f in pkls:
        if needle in f.lower():
            return os.path.join(directory, f)

    if allow_fallback and pkls:
        return os.path.join(directory, pkls[0])
    return None

# =============================================================================
# 4. MAIN
# =============================================================================

def main():
    """Calibrate the saved Logit and XGBoost models and report calibration quality.

    Steps: load the dataset and rebuild the stratified 60/20/20 split, load
    the saved preprocessing pipelines and models, score validation and test
    data, fit each calibrator on the validation scores, evaluate on the test
    scores, save one reliability plot and one histogram per model/method into
    ``IMG_DIR`` and write the metrics table to ``wyniki_kalibracji.csv`` in
    ``OUTPUT_DIR``.

    Raises
    ------
    FileNotFoundError
        If the dataset at ``DATA_PATH`` does not exist.
    """
    print(">>> [1/6] Wczytywanie danych...")
    if not os.path.exists(DATA_PATH):
        raise FileNotFoundError(f"Brak pliku: {DATA_PATH}")

    df = pd.read_csv(DATA_PATH)
    X = df.drop(columns=["default"])
    y = df["default"]

    # Same seed and proportions as the split used by the other scripts in this
    # file; only the validation and test parts are needed here.
    _, X_temp, _, y_temp = train_test_split(X, y, test_size=0.4, random_state=42, stratify=y)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

    print(">>> [2/6] Wczytywanie Preprocessingu...")

    # Logit: apply the saved pipeline if present, otherwise use raw features.
    path_pre_logit = os.path.join(PREPROC_DIR, "preprocessing_logit_woe.pkl")
    if os.path.exists(path_pre_logit):
        try:
            pre_logit = joblib.load(path_pre_logit)
            X_val_logit = pre_logit.transform(X_val)
            X_test_logit = pre_logit.transform(X_test)
        except Exception as e:
            print(f"![ERROR] Logit Preproc: {e}")
            return
    else:
        X_val_logit, X_test_logit = X_val, X_test

    # Black-box: same pattern with its own pipeline.
    path_pre_bb = os.path.join(PREPROC_DIR, "preprocessing_blackbox.pkl")
    if os.path.exists(path_pre_bb):
        try:
            pre_bb = joblib.load(path_pre_bb)
            X_val_bb = pre_bb.transform(X_val)
            X_test_bb = pre_bb.transform(X_test)
        except Exception as e:
            print(f"![ERROR] Blackbox Preproc: {e}")
            return
    else:
        X_val_bb, X_test_bb = X_val, X_test

    print(">>> [3/6] Wczytywanie Modeli...")

    # Logit. A missing model is represented by all-zero score vectors, which
    # the calibration loop below treats as "skip this model".
    path_logit = find_file(MODELS_INTERP_DIR, "logit") or find_file(MODELS_INTERP_DIR, "logistic")
    if path_logit:
        model_logit = joblib.load(path_logit)
        p_val_logit = model_logit.predict_proba(X_val_logit)[:, 1]
        p_test_logit = model_logit.predict_proba(X_test_logit)[:, 1]
    else:
        print("![ERROR] Brak modelu Logit.")
        p_val_logit, p_test_logit = np.zeros(len(y_val)), np.zeros(len(y_test))

    # XGBoost
    path_xgb = find_file(MODELS_BLACKBOX_DIR, "xgboost") or find_file(MODELS_BLACKBOX_DIR, "boost")
    if path_xgb:
        model_xgb = joblib.load(path_xgb)
        try:
            p_val_xgb = model_xgb.predict_proba(X_val_bb)[:, 1]
            p_test_xgb = model_xgb.predict_proba(X_test_bb)[:, 1]
        except Exception:
            # Retry with plain arrays if the model rejects the container type.
            # Exception (not a bare except) keeps Ctrl+C / SystemExit working.
            p_val_xgb = model_xgb.predict_proba(np.array(X_val_bb))[:, 1]
            p_test_xgb = model_xgb.predict_proba(np.array(X_test_bb))[:, 1]
    else:
        print("![ERROR] Brak modelu XGBoost.")
        p_val_xgb, p_test_xgb = np.zeros(len(y_val)), np.zeros(len(y_test))

    models_to_calibrate = [
        ("Logit", p_val_logit, p_test_logit),
        ("XGBoost", p_val_xgb, p_test_xgb)
    ]

    results_table = []

    print(">>> [4/6] Kalibracja...")

    for name, p_val, p_test in models_to_calibrate:
        # All-zero validation scores mark a model that could not be loaded.
        if np.sum(p_val) == 0: continue

        print(f"   ... Przetwarzanie: {name}")

        # Method name -> test-set probabilities, used for the plots below.
        methods_map = {}

        # 0. Original (uncalibrated) scores
        methods_map["Original"] = p_test
        results_table.append(compute_metrics(y_test, p_test, f"{name}_Original"))

        # 1. Platt: logistic regression fitted on the raw probability
        platt = LogisticRegression(C=99999, solver='lbfgs')
        platt.fit(p_val.reshape(-1, 1), y_val)
        p_test_platt = platt.predict_proba(p_test.reshape(-1, 1))[:, 1]
        methods_map["Platt"] = p_test_platt
        results_table.append(compute_metrics(y_test, p_test_platt, f"{name}_Platt"))

        # 2. Isotonic regression
        iso = IsotonicRegression(out_of_bounds='clip')
        iso.fit(p_val, y_val)
        p_test_iso = iso.predict(p_test)
        methods_map["Isotonic"] = p_test_iso
        results_table.append(compute_metrics(y_test, p_test_iso, f"{name}_Isotonic"))

        # 3. Beta calibration
        beta = BetaCalibration()
        beta.fit(p_val, y_val)
        p_test_beta = beta.predict_proba(p_test)[:, 1]
        methods_map["Beta"] = p_test_beta
        results_table.append(compute_metrics(y_test, p_test_beta, f"{name}_Beta"))

        # 4. Isotonic followed by a shift of the mean to TARGET_MEAN_PD; the
        #    shift is fitted on the isotonic validation scores.
        p_val_iso = iso.predict(p_val)
        cal_large = CalibrationInTheLarge(target_mean=TARGET_MEAN_PD)
        cal_large.fit(p_val_iso)
        p_test_final = cal_large.predict_proba(p_test_iso)[:, 1]
        methods_map["Iso_Large4%"] = p_test_final
        results_table.append(compute_metrics(y_test, p_test_final, f"{name}_Iso+Large4%"))

        # >>> One reliability plot and one histogram per method <<<
        print(f"       Generowanie wykres√≥w w {IMG_DIR}...")
        for method_name, prob_arr in methods_map.items():
            # File-system-safe method name
            safe_method = method_name.replace(" ", "").replace("+", "_").replace("%", "")

            # 1. Reliability curve
            plot_single_reliability(
                y_test,
                prob_arr,
                title=f"Reliability: {name} - {method_name}",
                filename=f"rel_{name}_{safe_method}.png"
            )

            # 2. Histogram
            plot_single_histogram(
                prob_arr,
                title=f"PD Hist: {name} - {method_name}",
                filename=f"hist_{name}_{safe_method}.png"
            )

    print(">>> [5/6] Zapis tabeli wynik√≥w...")
    df_results = pd.DataFrame(results_table)
    cols = ["Avg_PD", "ECE", "ACE", "Brier", "Rel", "Res"]
    for c in cols:
        # The table has no columns at all if every model was skipped.
        if c in df_results.columns:
            df_results[c] = df_results[c].round(5)

    csv_path = os.path.join(OUTPUT_DIR, "wyniki_kalibracji.csv")
    df_results.to_csv(csv_path, index=False)

    print("\n" + "="*80)
    print(df_results.to_string(index=False))
    print("="*80)
    print(f"Wykresy (ka≈ºdy osobno) zapisano w: {IMG_DIR}")
    print(">>> Zako≈Ñczono.")

# Run the calibration pipeline only when executed as a script.
if __name__ == "__main__":
    main()


In [None]:
# ratingi.py

"""
Pipeline that:
- loads the models (WoE logit + XGBoost),
- computes PD on train/val/test,
- builds ratings (AAA...CCC) from PD,
- generates rating tables and decision tables.

We assume the input models already return "PD" (ultimately: calibrated).
For now, PD from the uncalibrated logit can be used.
"""
import matplotlib.pyplot as plt

import os
import numpy as np
import pandas as pd
import joblib

from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

# ============================================================
#                   PATH CONFIGURATION
# ============================================================

BASE_DIR = os.path.dirname(os.path.abspath(__file__))        # .../IWUM-Projekt-1/Ratingi
PROJECT_ROOT = os.path.abspath(os.path.join(BASE_DIR, "..")) # .../IWUM-Projekt-1

# Input dataset, expected in the project root.
DATA_PATH = os.path.join(PROJECT_ROOT, "zbi√≥r_7.csv")

# Interpretable logit model and its WoE preprocessing pipeline
LOGIT_MODEL_PATH = os.path.join(
    PROJECT_ROOT,
    "Modele_interpretowalne",
    "models",
    "best_logistic_regression_woe.pkl",
)
LOGIT_PREPROC_PATH = os.path.join(
    PROJECT_ROOT,
    "EDA",
    "preprocesing_pipelines",
    "preprocessing_logit_woe.pkl",
)

# Black-box XGBoost model (the folder layout is assumed to mirror the repo)
XGB_MODEL_PATH = os.path.join(
    PROJECT_ROOT,
    "Modele_nieinterpretowalne",
    "models_blackbox",
    "best_xgboost.pkl",
)
# Preprocessing pipeline for XGBoost.
# NOTE(review): the original TODO asked to add this path once XGBoost had its
# own pipeline; a path is set here, so the TODO looks done - confirm.
XGB_PREPROC_PATH = os.path.join(
    PROJECT_ROOT,
    "EDA",
    "preprocesing_pipelines",
    "preprocessing_blackbox.pkl",  
)

# Output folder for tables and plots; created at import time.
RESULTS_DIR = os.path.join(BASE_DIR, "rating_results")
os.makedirs(RESULTS_DIR, exist_ok=True)

# Rating names in order of increasing risk (AAA = best, CCC = worst)
RATING_LABELS = ["AAA", "AA", "A", "BBB", "BB", "B", "CCC"]


# ============================================================
#                     Wczytanie danych
# ============================================================

def load_data():
    """
    Read the full dataset from DATA_PATH and split it 60/20/20 into
    train/val/test, stratified on the "default" column.

    Returns (X_train, y_train, X_val, y_val, X_test, y_test).
    """
    frame = pd.read_csv(DATA_PATH)
    target = frame["default"]
    features = frame.drop(columns=["default"])

    # First cut: 60% train, 40% hold-out.
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        features, target, test_size=0.4, random_state=42, stratify=target
    )
    # Second cut: the hold-out is halved into validation and test.
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=0.5, random_state=42, stratify=y_holdout
    )

    return X_train, y_train, X_val, y_val, X_test, y_test


def load_models():
    """
    Load the saved estimators with joblib.

    The logit model and its WoE pipeline are always loaded. The XGBoost model
    and its pipeline are each loaded only if their file exists, otherwise the
    corresponding slot is None.

    Returns (logit_model, logit_preproc, xgb_model, xgb_preproc).
    """
    # Logit model and pipeline
    logit_model = joblib.load(LOGIT_MODEL_PATH)
    logit_preproc = joblib.load(LOGIT_PREPROC_PATH)

    # Optional XGBoost artefacts
    xgb_model = joblib.load(XGB_MODEL_PATH) if os.path.exists(XGB_MODEL_PATH) else None
    xgb_preproc = joblib.load(XGB_PREPROC_PATH) if os.path.exists(XGB_PREPROC_PATH) else None

    return logit_model, logit_preproc, xgb_model, xgb_preproc


# ============================================================
#                Predykcja PD dla modeli
# ============================================================

def predict_pd_logit(logit_model, logit_preproc, X):
    """
    Return the logit model's predicted PD for X.

    X is first passed through ``logit_preproc.transform``; the PD is column 1
    of ``logit_model.predict_proba``.
    """
    return logit_model.predict_proba(logit_preproc.transform(X))[:, 1]


def predict_pd_xgb(xgb_model, xgb_preproc, X):
    """
    Return the XGBoost model's predicted PD for X, or None if there is no model.

    X is transformed with ``xgb_preproc`` when one is given. The PD is column 1
    of ``predict_proba`` when the model has that method, otherwise the raw
    output of ``predict``.
    """
    if xgb_model is None:
        return None

    features = X if xgb_preproc is None else xgb_preproc.transform(X)

    if not hasattr(xgb_model, "predict_proba"):
        # some implementations return the PD directly from predict()
        return xgb_model.predict(features)
    return xgb_model.predict_proba(features)[:, 1]


# ============================================================
#           Building ratings from PD
# ============================================================

def build_rating_bins_by_quantiles(pd_train, n_classes=7):
    """
    Derive rating thresholds from equally spaced quantiles of the TRAIN PDs.

    Returns a sorted array of unique edges whose first element is 0.0 and
    last element is 1.0. If quantiles coincide, fewer than n_classes + 1
    edges come back and a warning is printed.
    """
    levels = np.linspace(0, 1, n_classes + 1)
    edges = np.quantile(pd_train, levels)

    # Stretch the outer edges so the bins cover the whole [0, 1] range.
    edges[0], edges[-1] = 0.0, 1.0

    # Collapse duplicated thresholds; that leaves fewer rating slots, and
    # the label list is shortened accordingly when ratings are assigned.
    edges = np.unique(edges)
    if len(edges) <= n_classes:
        print("‚ö†Ô∏è Ostrze≈ºenie: duplikujƒÖce siƒô progi rating√≥w (ma≈Ço zr√≥≈ºnicowane PD).")
    return edges


def assign_ratings(pd_hat, bin_edges, labels):
    """
    Map PDs to rating labels using the given thresholds.

    pd_hat   : vector of PDs
    bin_edges: increasing interval edges
    labels   : list[str], e.g. ["AAA", "AA", ..., "CCC"]

    Intervals are closed on the left, [b_i, b_{i+1}), except the last one,
    which also includes its upper edge so that a PD equal to the top edge
    (e.g. exactly 1.0) gets the worst rating instead of NaN. Values outside
    [bin_edges[0], bin_edges[-1]] map to NaN.

    If there are fewer intervals than labels (duplicated thresholds were
    collapsed), only the first labels are used.

    Returns whatever pd.cut returns for the input: a Categorical for
    array-like input, a categorical Series for Series input.
    """
    # Work on a copy so the caller's edges are never modified.
    edges = np.array(bin_edges, dtype=float)

    n_intervals = len(edges) - 1
    if n_intervals != len(labels):
        labels = labels[:n_intervals]

    # With right=False the last interval is [b_{n-1}, b_n), so a PD equal to
    # b_n would fall outside every bin. Move the top edge up by one ULP to
    # make the last interval effectively closed on the right.
    edges[-1] = np.nextafter(edges[-1], np.inf)

    ratings = pd.cut(
        pd_hat,
        bins=edges,
        labels=labels,
        right=False,
        include_lowest=True,
    )
    return ratings


def rating_summary(y_true, pd_hat, ratings, model_name, dataset_name):
    """
    Summarise a rating assignment per rating grade:
    - n_obs:    number of observations
    - n_bad:    number of bad clients (sum of y)
    - bad_rate: mean of y
    - avg_pd:   mean predicted PD

    y_true, pd_hat and ratings are matched by position. When ratings is
    categorical, every category gets a row, including grades with no
    observations (n_obs = 0).

    Prints the table and returns it as a DataFrame.
    """
    df = pd.DataFrame({
        # Plain arrays: combine by position, not by any pandas index that
        # y_true may carry from the train/test split.
        "y": np.asarray(y_true),
        "pd": np.asarray(pd_hat),
        "rating": ratings,
    })

    summary = (
        # observed=False is stated explicitly so empty grades stay in the
        # table whatever the installed pandas uses as its default.
        df.groupby("rating", observed=False)
          .agg(
              n_obs=("y", "size"),
              n_bad=("y", "sum"),
              bad_rate=("y", "mean"),
              avg_pd=("pd", "mean"),
          )
          .reset_index()
    )

    print("\n" + "=" * 70)
    print(f"RATING SUMMARY ‚Äì {model_name} ‚Äì {dataset_name}")
    print("=" * 70)
    print(summary.to_string(index=False))
    print("=" * 70)

    return summary


# ============================================================
#          Functions for decision thresholds / decision tables
# ============================================================


# Module-level payoff constants, per unit of loan.
# NOTE(review): expected_profit() and build_cost_curve() take the first two
# as parameters with their own defaults and recompute the last two locally,
# so these globals look unused in this section - confirm before relying on
# or removing them.
profit_good_accepted=0.15   # +15% on a good loan
loss_bad_accepted=0.5      # -50% on a bad loan
cost_reject_good= 0.06     # 0.4 * 0.15: lost share of the potential profit; stored as a positive magnitude here, while expected_profit() derives it with a negative sign
profit_reject_bad=0.2      # 0.4 * 0.50: avoided share of the potential loss


def expected_profit(
    y_true,
    pd_hat,
    threshold,
    profit_good_accepted=0.15,   # +15% on a good loan
    loss_bad_accepted=0.50,      # -50% on a bad loan
    frac_aux=0.4                 # fraction used for lost profit / avoided loss
):
    """
    Total portfolio payoff for a given PD cut-off.

    Conventions:
    - y_true = 0 -> good client
    - y_true = 1 -> bad client (default)
    - a client is accepted when PD <= threshold

    Payoff per client:
    - good & accepted   -> +profit_good_accepted
    - bad  & accepted   -> -loss_bad_accepted
    - good & rejected   -> -frac_aux * profit_good_accepted
    - bad  & rejected   -> +frac_aux * loss_bad_accepted
    """
    labels = np.asarray(y_true)
    scores = np.asarray(pd_hat)

    # Payoffs of the two rejection outcomes, derived from the base payoffs.
    cost_reject_good = -frac_aux * profit_good_accepted   # e.g. -0.06
    profit_reject_bad = frac_aux * loss_bad_accepted      # e.g. +0.20

    accepted = scores <= threshold
    rejected = ~accepted
    is_good = labels == 0
    is_bad = labels == 1

    # Head-count of each decision/outcome combination.
    accepted_good = np.sum(accepted & is_good)
    accepted_bad = np.sum(accepted & is_bad)
    rejected_good = np.sum(rejected & is_good)
    rejected_bad = np.sum(rejected & is_bad)

    return (
        accepted_good * profit_good_accepted
        - accepted_bad * loss_bad_accepted
        + rejected_bad * profit_reject_bad
        + rejected_good * cost_reject_good
    )


def decision_table(y_true, pd_hat, thresholds):
    """
    Build a decision table over a range of PD cut-offs.

    A client is accepted when PD <= threshold; y_true == 1 is a bad client
    (default), y_true == 0 a good one. One row per threshold:
    - threshold
    - accept_rate / reject_rate: share of accepted / rejected clients
    - bad_rate_accepted: bad clients among the accepted (NaN if none accepted)
    - bad_rate_rejected: bad clients among the rejected (NaN if none rejected)
    - TP, FP, FN, TN: counts where "predicted positive" means ACCEPTED and
      "actual positive" means BAD, i.e.
        TP = bad  & accepted,  FP = good & accepted,
        FN = bad  & rejected,  TN = good & rejected.

    Returns a DataFrame.
    """
    y_true = np.asarray(y_true)
    pd_hat = np.asarray(pd_hat)

    good = (y_true == 0)
    bad = (y_true == 1)
    n = len(y_true)

    rows = []
    for thr in thresholds:
        accept = pd_hat <= thr
        reject = ~accept

        # Counted with boolean masks (as in expected_profit) so the four
        # cells are always defined, even if only one class or one decision
        # is present.
        tp = int(np.sum(accept & bad))
        fp = int(np.sum(accept & good))
        fn = int(np.sum(reject & bad))
        tn = int(np.sum(reject & good))

        accepted = tp + fp
        rejected = tn + fn

        row = {
            "threshold": thr,
            "accept_rate": accepted / n if n > 0 else np.nan,
            "reject_rate": rejected / n if n > 0 else np.nan,
            # Bad clients among the accepted are TP under the convention
            # above; FP would give the share of good clients instead.
            "bad_rate_accepted": tp / accepted if accepted > 0 else np.nan,
            "bad_rate_rejected": fn / rejected if rejected > 0 else np.nan,
            "TP": tp,
            "FP": fp,
            "FN": fn,
            "TN": tn,
        }
        rows.append(row)

    df = pd.DataFrame(rows)
    return df

def build_cost_curve(
    y_true,
    pd_hat,
    thresholds,
    model_name="Model",
    save_path=None,
    profit_good_accepted=0.15,
    loss_bad_accepted=0.50,
    frac_aux=0.4
):
    """
    Evaluate expected_profit() at every PD threshold.

    Returns a DataFrame with columns "threshold" and "expected_profit".
    When save_path is given, the curve is also plotted and saved there.
    """
    payoff_kwargs = {
        "profit_good_accepted": profit_good_accepted,
        "loss_bad_accepted": loss_bad_accepted,
        "frac_aux": frac_aux,
    }
    profits = [
        expected_profit(y_true, pd_hat, thr, **payoff_kwargs)
        for thr in thresholds
    ]

    curve_df = pd.DataFrame({
        "threshold": thresholds,
        "expected_profit": profits,
    })

    # No plot requested: the table is all there is to produce.
    if save_path is None:
        return curve_df

    plt.figure()
    plt.plot(thresholds, profits, marker="o")
    plt.xlabel("Pr√≥g PD (akceptujemy je≈õli PD ‚â§ pr√≥g)")
    plt.ylabel("Oczekiwany zysk (jednostki umowne)")
    plt.title(f"Cost curve ‚Äì {model_name}")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(save_path, dpi=150)
    plt.close()

    return curve_df


# ============================================================
#                           MAIN
# ============================================================

def main():
    """
    Run the rating pipeline end to end.

    Loads data and models, scores every split, derives rating thresholds from
    the logit TRAIN PDs, prints rating summaries and decision tables, saves
    the cost-curve plots and writes all tables as CSV files to RESULTS_DIR.
    The XGBoost parts are skipped when that model is not available.
    """
    # 1. Data, keyed by split name (insertion order: train, val, test)
    X_train, y_train, X_val, y_val, X_test, y_test = load_data()
    splits = {
        "train": (X_train, y_train),
        "val": (X_val, y_val),
        "test": (X_test, y_test),
    }
    eval_splits = ("val", "test")

    # 2. Models
    logit_model, logit_preproc, xgb_model, xgb_preproc = load_models()

    # 3. Logit PDs (calibrated PDs can be plugged in here later)
    pd_logit = {
        split: predict_pd_logit(logit_model, logit_preproc, X_part)
        for split, (X_part, _) in splits.items()
    }

    # 4. (optional) XGBoost PDs; every entry is None when there is no model
    if xgb_model is not None:
        pd_xgb = {
            split: predict_pd_xgb(xgb_model, xgb_preproc, X_part)
            for split, (X_part, _) in splits.items()
        }
    else:
        pd_xgb = dict.fromkeys(splits)

    # 5. Rating thresholds from the logit TRAIN PDs
    bin_edges = build_rating_bins_by_quantiles(
        pd_logit["train"],
        n_classes=len(RATING_LABELS)
    )

    def summarize(pd_by_split, model_name):
        """Assign ratings on every split, then print and collect the summaries."""
        ratings = {
            split: assign_ratings(pd_by_split[split], bin_edges, RATING_LABELS)
            for split in splits
        }
        return {
            split: rating_summary(
                splits[split][1], pd_by_split[split], ratings[split],
                model_name=model_name,
                dataset_name=split.upper(),
            )
            for split in splits
        }

    # 6-7. Ratings and rating summaries for the logit
    summary_logit = summarize(pd_logit, "Logit_WoE")

    # 8. (optional) same for XGBoost, using THE SAME PD thresholds
    summary_xgb = None
    if pd_xgb["train"] is not None:
        summary_xgb = summarize(pd_xgb, "XGBoost")

    # 9. Decision tables on validation and test
    thresholds = np.linspace(0.02, 0.98, 50)  # PD range to analyse

    def decisions_for(pd_by_split, display_name):
        """Build the val/test decision tables, then print them."""
        tables = {
            split: decision_table(splits[split][1], pd_by_split[split], thresholds)
            for split in eval_splits
        }
        for split, table in tables.items():
            print(f"\nDECISION TABLE ‚Äì {display_name} ‚Äì {split.upper()}")
            print(table.to_string(index=False))
        return tables

    decision_logit = decisions_for(pd_logit, "Logit")

    # 9b. Decision tables for XGBoost (if the model exists)
    decision_xgb = None
    if pd_xgb["val"] is not None:
        decision_xgb = decisions_for(pd_xgb, "XGBoost")

    def cost_curves_for(pd_by_split, model_name, file_tag):
        """Save the val/test cost-curve plots for one model."""
        for split in eval_splits:
            build_cost_curve(
                splits[split][1], pd_by_split[split], thresholds,
                model_name=model_name,
                save_path=os.path.join(RESULTS_DIR, f"cost_curve_{file_tag}_{split}.png"),
            )

    # 9c. Cost curves: logit first, then XGBoost (if present)
    cost_curves_for(pd_logit, "Logit_WoE", "logit")
    if pd_xgb["val"] is not None:
        cost_curves_for(pd_xgb, "XGBoost", "xgb")

    def save_tables(tables, stem, file_tag):
        """Write each table to RESULTS_DIR as <stem>_<file_tag>_<split>.csv."""
        for split, table in tables.items():
            table.to_csv(
                os.path.join(RESULTS_DIR, f"{stem}_{file_tag}_{split}.csv"),
                index=False,
            )

    # 10. CSV export (for the report / Excel)
    save_tables(summary_logit, "rating_summary", "logit")
    save_tables(decision_logit, "decision_table", "logit")

    if summary_xgb is not None:
        save_tables(summary_xgb, "rating_summary", "xgb")
        if decision_xgb is not None:
            save_tables(decision_xgb, "decision_table", "xgb")

    print("\nZapisano tabele ratingowe i decyzyjne do:", RESULTS_DIR)


# Run the rating pipeline only when executed as a script.
if __name__ == "__main__":
    main()
