© 2025 Vanargo · License: MIT. See the `LICENSE` file in the repository root.

# --- Fairness & Explainability: setup --- #

**Goal.** Evaluate the fairness and quality of the final model on the test set, choose an operating threshold `t*` under fairness constraints, check group-wise calibration, and explain feature contributions.

**Inputs (from `01_data_loading_and_eda.ipynb` / `02_modeling.ipynb`):**
1. `X_test_enc`, `y_true_test`, `feature_names` - prepared in `02_modeling.ipynb`.
2. `y_proba_best` - test predictions from the best model.
3. `X_test_sens` - sensitive features (`sex`, `race`, `age_group`).

**Outputs (to `data/reports/figures_03/`, `data/artifacts`):**
1. Threshold sweep and Pareto: `accuracy_f1_vs_threshold.png`, `dp_vs_threshold.png`, `Pareto_f1_vs_dp.png`.
2. Group metrics at `t=0.5` and `t*`: CSV tables and bar charts.
3. Group calibration: `calibration_*.png` and aggregate ECE.
4. Explainability: top features (SHAP/permutation), dependence plots.
5. Auxiliary artifacts: selected `t*`, post-processing parameters (if `ThresholdOptimizer` applied).

**Notebook contents:**
1. Sanity checks: shapes, NaNs, `y_proba` distribution.
2. Baseline metrics at `t=0.5`.
3. Group fairness at `t=0.5`.
4. Threshold sweep -> select `t*` (quality <-> fairness trade-off).
5. Post-processing (`ThresholdOptimizer`: `demographic_parity` / `equalized_odds`).
6. Probability calibration by groups (reliability curves, ECE).
7. Explainability: global and local.
8. Risks, limitations, recommendations. Artifact export.

**Environment and paths requirements:**
1. `ROOT`, `ART_DIR`, `REPORTS_DIR`, `FIG_DIR_03 = REPORTS_DIR / 'figures_03'` must exist.
2. All paths and objects are loaded via the unified artifact loader from `02_modeling.ipynb`.

In [None]:
# --- Project paths bootstrаp --- #

from pathlib import Path

# attempt to use the standard project paths module #
try:
    from paths import (
        ART_DIR,
        DATA_DIR,
        INT_DIR,
        MODELS_DIR,
        NB_DIR,
        PROC_DIR,
        RAW_DIR,
        REPORTS_DIR,
        ROOT,
    )

    print(f"[paths] ROOT = {ROOT}")
except Exception as e:
    # fallback: minimal path setup without side effects
    ROOT = Path.cwd()
    DATA_DIR = ROOT / "data"
    RAW_DIR = ROOT / "raw"
    INT_DIR = ROOT / "interim"
    PROC_DIR = ROOT / "processed"
    ART_DIR = ROOT / "artifacts"
    REPORTS_DIR = ROOT / "reports"
    MODELS_DIR = ROOT / "models"
    NB_DIR = ROOT / "notebooks"
    print(f"[paths:fallback] ROOT = {ROOT} | reason: {e!r}")

In [None]:
# --- Fairness plotting utils & output dirs --- #

import matplotlib.pyplot as plt

# use paths.py; fallback if unavailable #
if "REPORTS_DIR" not in globals():
    try:
        from paths import REPORTS_DIR as _REPORTS_DIR
    except Exception:
        from pathlib import Path

        ROOT = globals().get("ROOT", Path.cwd())
        _REPORTS_DIR = ROOT / "data" / "reports"
    REPORTS_DIR = _REPORTS_DIR

FIG_DIR_03 = REPORTS_DIR / "figures_03"
FIG_DIR_03.mkdir(parents=True, exist_ok=True)


def _ensure_png(name: str) -> str:
    return name if name.lower().endswith(".png") else f"{name}.png"


def save_fig(name: str, fig=None, dpi: int = 200, close: bool = True):
    """
    Save a figure to `reports/figures_03/`
    Parameters:
        name: file name, may omit '.png';
        fig: matplotlib.figure.Figure or None -> current figure.
    """
    if not isinstance(name, str) or not name:
        raise TypeError('save_fig: "name" must be a non-empty figure')

    # import pyplot only when needed #
    if fig is None:
        import matplotlib.pyplot as plt

        fig = plt.gcf()
    else:
        if not hasattr(fig, "savefig"):
            raise TypeError('save_fig: "fig" must have a savefig method')

    fname = _ensure_png(name)
    path = FIG_DIR_03 / fname

    # save figure #
    fig.savefig(path, dpi=dpi, bbox_inches="tight")

    # close if requested #
    if close:
        try:
            import matplotlib.pyplot as plt

            plt.close(fig)
        except Exception:
            pass

    print(f"[saved] {path}")
    return Path

In [None]:
# --- Fairness & Explainability: setup --- #

import warnings

# centralized warning suppression #
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

FAIRLEARN_OK = False
THRESH_OPT_OK = False
SHAP_OK = False

# fairlearn and components #
try:
    from fairlearn.metrics import (
        demographic_parity_difference,
        equalized_odds_difference,
    )
    from fairlearn.postprocessing import ThresholdOptimizer

    FAIRLEARN_OK = True
    THRESH_OPT_OK = True
    print("[ok] fairlearn imported")
except Exception as e:
    print(f"[warn] fairlearn unavailable: {e!r}")

# SHAP (global explainability) #
try:
    import shap

    SHAP_OK = True
    print("[ok] shap imported.")
except Exception as e:
    print(f"[warn] shap unavailable: {e!r}")

# utility readiness flag for Explainability/Fairness #
EXPL_OK = FAIRLEARN_OK or SHAP_OK
print(f"[setup] FAIRLEARN_OK={FAIRLEARN_OK}, SHAP_OK={SHAP_OK}")

In [None]:
# --- Notebook preamble: silence & style --- #

# enable autoreload if IPython is available #
try:
    ip = get_ipython()
    if ip:
        ip.run_line_magic("load_ext", "autoreload")
        ip.run_line_magic("autoreload", "2")
except Exception:
    pass

import logging

# suppress moisy loggers #
for name in ("matplotlib", "numba"):
    try:
        logging.getLogger(name).setLevel(logging.ERROR)
    except Exception:
        pass

# unified plt style #
plt.rcParams.update(
    {
        "axes.spines.top": False,
        "axes.spines.right": False,
        "axes.grid": True,
        "grid.alpha": 0.25,
        "figure.figsize": (6.5, 4.0),
        "axes.titlesize": 13,
        "axes.labelsize": 11,
        "xtick.labelsize": 10,
        "ytick.labelsize": 10,
    }
)

print("[preamble] style set.")

In [None]:
# --- Paths & unified artifact loader --- #

# stdlib
import json
from collections.abc import Iterable
from pathlib import Path

# third-party
import joblib
import numpy as np
import pandas as pd

# first-party (with fallback to data/* structure)
try:
    from paths import ART_DIR, MODELS_DIR, REPORTS_DIR, ROOT as PATHS_ROOT  # noqa: I001
except Exception:
    # detect project root by marker files
    PATHS_ROOT = Path.cwd()
    _MARKERS = {".git", "pyproject.toml", "README.md"}
    while not any((PATHS_ROOT / m).exists() for m in _MARKERS) and PATHS_ROOT.parent != PATHS_ROOT:
        PATHS_ROOT = PATHS_ROOT.parent
    ART_DIR = PATHS_ROOT / "data" / "artifacts"
    MODELS_DIR = PATHS_ROOT / "data" / "models"
    REPORTS_DIR = PATHS_ROOT / "data" / "reports"

# derived locations #
_NB_DIR = PATHS_ROOT / "notebooks"
CANDIDATE_ART_DIRS = [ART_DIR, _NB_DIR / "data" / "artifacts"]
CANDIDATE_MODEL_DIRS = [MODELS_DIR, ART_DIR, _NB_DIR / "data" / "models"]
FIG_DIR_02 = REPORTS_DIR / "figures_02"
FIG_DIR_03 = REPORTS_DIR / "figures_03"


# helper: ensure length n for names #
def _ensure_len(cols: Iterable | None, n: int) -> list[str]:
    lst = list(cols) if isinstance(cols, list | tuple | set) else []
    if len(lst) == n:
        return [str(c) for c in lst]
    return [f"f{i}" for i in range(n)]


def _pick_file(names: list[str], dirs: list[Path]) -> Path | None:
    for d in dirs:
        for n in names:
            p = Path(d) / n
            if p.exists():
                return p
    return None


# loaders: core #
def load_feature_names():
    p = _pick_file(
        ["feature_names.npy", "feature_names.json", "feature_names.txt", "feature_names.csv"],
        CANDIDATE_ART_DIRS,
    )
    if p is None:
        return None, None
    if p.suffix == ".npy":
        return np.load(p, allow_pickle=True).tolist(), p
    if p.suffix == ".json":
        return json.loads(p.read_text(encoding="utf-8")), p
    if p.suffix == ".txt":
        return [
            line.strip() for line in p.read_text(encoding="utf-8").splitlines() if line.strip()
        ], p
    if p.suffix == ".csv":
        return pd.read_csv(p, header=None)[0].tolist(), p
    return None, None


def load_X_test_enc():
    for nm in ["X_test_enc.npy", "X_test_enc.csv", "X_test.npy"]:
        p = _pick_file([nm], CANDIDATE_ART_DIRS)
        if p:
            if p.suffix == ".npy":
                return np.load(p, allow_pickle=False), p
            if p.suffix == ".csv":
                return pd.read_csv(p).to_numpy(), p
    return None, None


def load_X_test_sens():
    p = _pick_file(
        ["X_test_sensitive.csv", "X_test_sens.csv", "X_test_sensitive.parquet"],
        CANDIDATE_ART_DIRS,
    )
    if p is None:
        return None, None
    if p.suffix == ".parquet":
        return pd.read_parquet(p), p
    if p.suffix == ".csv":
        return pd.read_csv(p), p
    return None, None


def load_y_true_pred_proba():
    y_true_p = _pick_file(["y_true_test.npy", "y_true.npy", "y_test.npy"], CANDIDATE_ART_DIRS)
    y_pred_p = _pick_file(["y_pred_best.npy", "y_pred.npy"], CANDIDATE_ART_DIRS)
    y_proba_p = _pick_file(["y_proba_best.npy", "y_proba.npy"], CANDIDATE_ART_DIRS)

    def _load(p):
        if p is None:
            return None
        if p.suffix == ".npy":
            return np.load(p, allow_pickle=False)
        if p.suffix == ".csv":
            return pd.read_csv(p, header=None).iloc[:, 0].to_numpy()
        return None

    return (_load(y_true_p), y_true_p, _load(y_pred_p), y_pred_p, _load(y_proba_p), y_proba_p)


def load_best_model():
    cand = _pick_file(
        [
            "model_best.joblib",
            "LGBM_best.joblib",
            "lgb_best.joblib",
            "XGBoost_ES_best.joblib",
            "XGBoost_ES.joblib",
        ],
        CANDIDATE_MODEL_DIRS,
    )
    if cand is None:
        return None, None
    try:
        return joblib.load(cand), cand
    except Exception as e:
        print(f"[warn] cannot load {cand}: {type(e).__name__}: {e}")
        return None, cand


# loaders: extras used in 03 #
def load_results_df():
    p = _pick_file(["results_df.csv"], CANDIDATE_ART_DIRS)
    return (pd.read_csv(p), p) if p and p.exists() else (None, None)


def load_results_summary():
    p = _pick_file(["results_summary.csv"], CANDIDATE_MODEL_DIRS)
    return (pd.read_csv(p), p) if p and p.exists() else (None, None)


def load_export_meta():
    p = _pick_file(["export_meta.json"], CANDIDATE_ART_DIRS)
    return (json.loads(p.read_text(encoding="utf-8")), p) if p and p.exists() else (None, None)


def load_test_groups():
    p = _pick_file(["test_groups.csv"], CANDIDATE_ART_DIRS)
    return (pd.read_csv(p), p) if p and p.exists() else (None, None)


def load_y_test_optional():
    p = _pick_file(["y_test.npy"], CANDIDATE_ART_DIRS)
    return (np.load(p, allow_pickle=False), p) if p and p.exists() else (None, None)


def load_y_pred_050_optional():
    p = _pick_file(["y_pred_050.npy"], CANDIDATE_ART_DIRS)
    return (np.load(p, allow_pickle=False), p) if p and p.exists() else (None, None)


def load_y_score_optional():
    p = _pick_file(["y_score.npy"], CANDIDATE_ART_DIRS)
    return (np.load(p, allow_pickle=False), p) if p and p.exists() else (None, None)


def list_figures02():
    if not FIG_DIR_02.exists():
        return []
    return sorted([p for p in FIG_DIR_02.glob("*.png")])


# unified summary #
def load_artifacts_summary():
    fn, p_fn = load_feature_names()
    Xte, p_xe = load_X_test_enc()
    Xsens, p_xs = load_X_test_sens()
    y_true, p_yt, y_pred, p_yp, y_proba, p_ypr = (*load_y_true_pred_proba(),)
    y_test_opt, p_ytest = load_y_test_optional()
    y_pred_050_opt, p_y050 = load_y_pred_050_optional()
    y_score_opt, p_ys = load_y_score_optional()
    results_df, p_resdf = load_results_df()
    results_sum, p_ressum = load_results_summary()
    meta, p_meta = load_export_meta()
    groups, p_groups = load_test_groups()
    mdl, p_m = load_best_model()
    figs02 = list_figures02()
    return {
        "feature_names": (fn, p_fn),
        "X_test_enc": (Xte, p_xe),
        "sensitive": (Xsens, p_xs),
        "y_true": (y_true, p_yt),
        "y_pred": (y_pred, p_yp),
        "y_proba": (y_proba, p_ypr),
        "y_test_opt": (y_test_opt, p_ytest),
        "y_pred_050_opt": (y_pred_050_opt, p_y050),
        "y_score_opt": (y_score_opt, p_ys),
        "results_df": (results_df, p_resdf),
        "results_summary": (results_sum, p_ressum),
        "export_meta": (meta, p_meta),
        "test_groups": (groups, p_groups),
        "model": (mdl, p_m),
        "figures02": (figs02, FIG_DIR_02 if figs02 else None),
    }

In [None]:
# --- Unified call after initialization cell --- #

A = load_artifacts_summary()

model, model_path = A["model"]
feature_names, feature_names_path = A["feature_names"]
X_test_enc, X_test_enc_path = A["X_test_enc"]
X_test_sensitive, sens_path = A["sensitive"]
results_df, results_path = A["results_df"]
test_groups, groups_path = A["test_groups"]
y_true, y_true_path = A["y_true"]
y_proba, y_proba_path = A["y_proba"]
y_pred, y_pred_path = A["y_pred"]

print("[ok] model:", model_path.name)
for name, p in [
    ("feature_names", feature_names_path),
    ("X_test_enc", X_test_enc_path),
    ("sensitive", sens_path),
    ("results_df", results_path),
    ("test_groups", groups_path),
    ("y_true", y_true_path),
    ("y_proba", y_proba_path),
    ("y_pred", y_pred_path),
]:
    print(f"[{'ok' if p else 'miss'}] {name}:", (p.name if p else None))

# aliases for backward compatibility #
X_test_sens = X_test_sensitive
y_true_test = y_true
y_proba_best = y_proba
y_pred_best = y_pred

In [None]:
# --- sanity-report from unified loader --- #

assert "A" in globals()

for name in [
    "model",
    "feature_names",
    "X_test_enc",
    "sensitive",
    "results_df",
    "results_summary",
    "test_groups",
    "y_true",
    "y_proba",
    "y_test_opt",
    "y_pred_050_opt",
    "y_score_opt",
    "export_meta",
    "figures02",
]:
    val, p = A[name]
    shape = getattr(val, "shape", None)
    if isinstance(val, list | tuple) and not shape:
        shape = f"len={len(val)}"
    print(f"[{name:>14}] path={getattr(p, 'name', None)} shape={shape}")

In [None]:
# --- Provenance from export_meta.json --- #

meta, p_meta = A["export_meta"]
if meta:
    print(f"[meta] best_model={meta.get('best_model')} | ts={meta.get('timestamp')}")
    arts = meta.get("artifacts") or []
    print(f"[meta] listed artifacts: {len(arts)}")
else:
    print("[meta] export_meta.json not found.")

In [None]:
# --- Leaderboard (results_summary.csv) --- #

res_sum, p_sum = A["results_summary"]

if res_sum is not None:
    df = res_sum.copy()
    # sort by test_f1 then roc_auc #
    cols = [
        c
        for c in [
            "model",
            "test_f1",
            "test_roc_auc",
            "test_precision",
            "test_recall",
            "test_accuracy",
        ]
        if c in df.columns
    ]
    df = df.sort_values(
        by=[c for c in ["test_f1", "test_roc_auc"] if c in df.columns], ascending=False
    )
    display(df[cols].head(10))
else:
    print("[skip] results_summary.csv absent.")

In [None]:
# --- Consistency check: y_true vs y_test --- #

y_true, _ = A["y_true"]
y_test_opt, p_ytest = A["y_test_opt"]
if y_true is not None and y_test_opt is not None:
    same = np.array_equal(y_true, y_test_opt)
    print(f"[check] y_true vs y_test: {'OK' if same else 'MISMATCH'}")
    if not same:
        raise RuntimeError("y_true_test.npy and y_test.npy differ.")
else:
    print("[check] y_test optional not found -> skip.")

# --- Sanity check: shapes, NaNs, y_proba distribution --- #

**Goal.** Verify input data consistency and correctness of probabilistic predictions before conducting fairness and explainability analysis.

**Verification steps:**
1. **Shapes.** Ensure that the lengths of `X_test_enc`, `X_test_sens`, `y_true_test`, and `y_proba_best` match. Any mismatch indicates inconsistency between features, labels, and model outputs.  
2. **Missing values.** Check for `NaN`s in `y_proba_best` and in sensitive features. Their presence may distort fairness metrics and probability distributions.  
3. **Probability distribution:**
   - Print basic statistics (`min`, `max`, `mean`, `std`);
   - Plot a histogram of `y_proba_best`;
   - Visually assess class balance — check if the probabilities are skewed toward 0 or 1.  
4. **Group slices.** If `X_test_sens` includes `sex`, `race`, or `age_group`, compare mean values of `y_proba_best` across these groups. This serves as an early indicator of possible model bias.

**Interpretation:**
1. Matching shapes and no missing values confirm artifact consistency.  
2. A balanced `y_proba_best` distribution suggests that the model is not overconfident.  
3. Group-level differences at this stage are diagnostic signals only, not statistically significant fairness metrics.

**Expected result.** A histogram of `y_proba_best` is plotted, shapes are validated, and no `NaN`s are present. The data is confirmed suitable for fairness metric computation.

In [None]:
# --- sanity-check: shapes, NaNs, y_proba distribution --- #

# check presence of artifacts from A #
assert all(k in globals() for k in ["y_true_test", "y_pred_best", "y_proba_best", "X_test_sens"]), (
    "No aliases from unified loader."
)

# size consistency #
n = len(y_true_test)
assert len(y_pred_best) == n and len(y_proba_best) == n, "Lengths of y_* do not match"
assert len(X_test_sens) == n, "Length of X_test_sens does not match y"

# NaN and valid range #
assert np.isfinite(y_proba_best).all(), "NaN/Inf present in y_proba_best"
assert (y_proba_best >= 0).all() and (y_proba_best <= 1).all(), "y_proba_best outside [0, 1]"

# probability histogram #
plt.figure()
plt.hist(y_proba_best, bins=30)
plt.xlabel("y_proba_best")
plt.ylabel("count")
plt.title("Distribution of predicted probabilties")
plt.tight_layout()
save_fig("hist_y_proba_best.png")
plt.show()

# baseline selection rate by groups (t = 0.5) #
t = 0.5
y_pred_05 = (y_proba_best >= t).astype("int8")

pred_ser = pd.Series(y_pred_05, index=X_test_sens.index, name="y_pred_05")

for gcol in [c for c in ["sex", "race", "age_group"] if c in X_test_sens.columns]:
    grp = X_test_sens[gcol].fillna("NA")
    rates = pred_ser.groupby(grp).mean().sort_values(ascending=False)
    print(f"[{gcol}] selection rate @ t = 0.5:")
    print(rates, "\n")

# --- Defining Sensitive Features and Groups --- #

**Goal.** Specify the list of sensitive features used to assess model fairness and ensure groups are correctly prepared for downstream metrics.

**Feature selection.** For the *Census Income Classifier* task, the sensitive features are:
1. `sex` - binary (Male/Female).
2. `race` - categorical (White/Black/Asian-Pac-Islander/Amer-Indian-Eskimo/Other).
3. `age_group` - discretized age (e.g., 17–29, 30–44, 45–59, 60+), if present.

**Data handling:**
1. Verify that these columns exist in `X_test_sens`.
2. Replace missing values with the `'NA'` category to keep groups complete.
3. Cast categorical features to `category` dtype with a fixed category order to avoid accidental alphabetical ordering in comparisons.
4. Optionally aggregate low-count categories into `'Other'`.

**Interpretation.** Properly defining sensitive features ensures that fairness metrics (Demographic Parity, Equalized Odds, etc.) are computed on comparable groups and remain interpretable.

**Expected result.** A DataFrame `X_test_sens` with processed sensitive features, ready for use in fairness analysis blocks.

In [None]:
# --- Sensitive features and groups --- #

# ensure X_test_sens from unified loader #
assert "X_test_sens" in globals(), "X_test_sens must come from unified loader."

CAND_SENSITIVE = ["sex", "race", "age_group"]
SENSITIVE = [c for c in CAND_SENSITIVE if c in X_test_sens.columns]
print("Using SENSITIVE:", SENSITIVE)

# meaningful order for age_group #
if "age_group" in SENSITIVE:
    desired_order = ["18-25", "26-45", "46-65", "65+"]
    try:
        X_test_sens["age_group"] = pd.Categorical(
            X_test_sens["age_group"], categories=desired_order, ordered=True
        )
    except Exception as e:
        print("Could not set order for age_group:", e)

# collect unique values for each sensitive variable #
GROUP_VALUES = {}
for col in SENSITIVE:
    vals = X_test_sens[col].dropna().unique().tolist()
    if (col == "age_group") and hasattr(X_test_sens[col], "cat"):
        vals = [v for v in X_test_sens[col].cat.categories if v in set(X_test_sens[col].dropna())]
        GROUP_VALUES[col] = list(vals)

# report group presence and sample sizes #
for col in SENSITIVE:
    print(f"\n[{col}] groups and sample sizes:")
    df_sizes = (
        X_test_sens[col]
        .value_counts(dropna=False)
        .rename_axis("group")
        .reset_index(name="n")
        .assign(share=lambda d: d["n"] / len(X_test_sens))
    )
    display(df_sizes)

print("\nGROUP_VALUES =", GROUP_VALUES)

# primary sensitive variable #
PRIMARY_SENS = SENSITIVE[0] if SENSITIVE else None
print("PRIMARY_SENS:", PRIMARY_SENS)

# --- Baseline: metrics at a common threshold (0.5) --- #

**Goal.** Compute key performance metrics at a fixed threshold `t=0.5`, used as the baseline for analyzing the *quality <-> fairness* trade-off.

**Evaluation metrics:**
1. Accuracy - share of correctly classified observations.
2. Precision - correctness of positive predictions.
3. Recall - coverage of the positive class.
4. F1-score - harmonic mean of Precision and Recall.
5. ROC-AUC - ability to separate classes independent of threshold.

**Interpretation.** Threshold `0.5` is a "neutral" cutoff not adjusted for class imbalance or fairness requirements. The results serve as:
1. A starting point for selecting the optimal threshold `t*`.
2. A control baseline when comparing with post-processing (`ThresholdOptimizer`).
3. A reference to assess the impact of fairness constraints on quality.

**Expected result.** A table or dict with Accuracy, Precision, Recall, F1, and ROC-AUC at `t=0.5`. These will be used to build the threshold curve and the Pareto plot.

In [None]:
# Baseline metrics at t = 0.5 #

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# binary predictions at threshold 0.5 #
y_pred_05 = (y_proba_best >= 0.5).astype(int)

base_metrics = {
    "accuracy": accuracy_score(y_true_test, y_pred_05),
    "f1": f1_score(y_true_test, y_pred_05),
    "precision": precision_score(y_true_test, y_pred_05, zero_division=0),
    "roc_auc": roc_auc_score(y_true_test, y_proba_best),
    "recall": recall_score(y_true_test, y_pred_05, zero_division=0),
}

print("Baseline metrics (t = 0.5):")
display(pd.DataFrame([base_metrics]).round(4))

# --- Fairness across groups at t = 0.5 --- #

**Goal.** Evaluate how the model’s predictive performance differs across sensitive groups at the baseline threshold `t=0.5`.

**Metrics used:**
1. **Selection Rate (SR)** - proportion of samples predicted as positive in each group.  
2. **True Positive Rate (TPR)** and **False Positive Rate (FPR)** - sensitivity and false positive rate by group.  
3. **Precision, Recall, F1** - classic performance metrics computed separately per sensitive group.  
4. **Demographic Parity Difference** - disparity in positive prediction rates between groups.

**Interpretation:**
1. Differences in SR and TPR/FPR indicate potential model bias across subpopulations.  
2. If significant disparities are observed (e.g., one group with high SR and another with low SR), fairness constraints may be needed during post-processing.  
3. Threshold `0.5` serves as a reference point to gauge the magnitude of imbalance before optimizing for the threshold `t*`.

**Visualization and outputs:**
1. Group-wise metric tables are saved as CSV files in `reports/figures_03/`.  
2. Bar charts are generated for each metric (`bar_f1_by_*.png`, `bar_precision_by_*.png`, `bar_recall_by_*.png`).  
3. Confusion matrices per group can also be visualized to inspect error types.

**Expected result.** Summary tables and visualizations of fairness metrics at `t=0.5`. These serve as the foundation for optimal threshold selection in the next step.

In [None]:
# --- Group fairness metrics at t = 0.5 --- #

from fairlearn.metrics import demographic_parity_ratio, equalized_odds_difference

t = 0.5
y_pred = (y_proba_best >= t).astype("int8")

# DP/EOD @ t=0.5 #
# choose the sensitive feature #
sf_col = globals().get("PRIMARY_SENS", None) or "sex"
sf = X_test_sens[sf_col]

dp_diff = demographic_parity_difference(y_true=y_true_test, y_pred=y_pred_05, sensitive_features=sf)
dp_ratio = demographic_parity_ratio(y_true=y_true_test, y_pred=y_pred_05, sensitive_features=sf)
eod_diff = equalized_odds_difference(y_true=y_true_test, y_pred=y_pred_05, sensitive_features=sf)

print(
    "[fairness@0.5] "
    f"{sf_col}: DP diff={dp_diff:.4f}, "
    f"DP ratio={dp_ratio:.4f}, EOD diff={eod_diff:.4f}"
)

_agg = {
    "settings": "t=0.5",
    "sf_col": sf_col,
    "dp_diff": dp_diff,
    "dp_ratio": dp_ratio,
    "eod_diff": eod_diff,
}
pd.DataFrame([_agg]).to_csv(FIG_DIR_03 / "fairness_overview_t05.csv", index=False)


def _prec(y_true, y_pred):
    return precision_score(y_true, y_pred, zero_division=0)


def _rec(y_true, y_pred):
    return recall_score(y_true, y_pred, zero_division=0)


def _f1(y_true, y_pred):
    return f1_score(y_true, y_pred, zero_division=0)


def _acc(y_true, y_pred):
    return accuracy_score(y_true, y_pred)

In [None]:
# --- score diagnostics from y_score.npy --- #

from sklearn.calibration import calibration_curve

y_score_opt, _ = A["y_score_opt"]
y_true, _ = A["y_true"]
if y_score_opt is not None:
    y_score_norm = (y_score_opt - np.min(y_score_opt)) / (np.ptp(y_score_opt) + 1e-12)
    prob_true, prob_pred = calibration_curve(y_true, y_score_norm, n_bins=10, strategy="quantile")
    df_cal = pd.DataFrame({"preb_pred": prob_pred, "prob_true": prob_true})
    df_cal.to_csv(ART_DIR / "calibration_from_score_quantile.csv", index=False)
    print("[ok] saved:", ART_DIR / "calibration_from_score_quantile.csv")
else:
    print("[skip] y_score.npy absent.")

# --- Threshold curve and Pareto: quality vs Demographic Parity --- #

**Goal.** Find an operating classification threshold `t*` that balances model quality and fairness.

**Method:**
1. Run a threshold sweep: compute quality metrics (Accuracy, F1) and fairness metrics (Demographic Parity Difference/Ratio, Equalized Odds Difference) on a grid `t ∈ [0, 1]`.
2. Plot:
   - `accuracy_f1_vs_threshold.png` - quality vs threshold;
   - `dp_vs_threshold.png` - fairness vs threshold;
   - `Pareto_f1_vs_dp.png` - F1 vs Demographic Parity trade-off.
3. Choose `t*` at a Pareto-optimal point where improving one metric worsens the other.

**Interpretation:**
1. For biased models, reducing Demographic Parity Difference often lowers F1.
2. The optimal `t*` stabilizes this trade-off.
3. Use `t*` for confusion matrices, fairness metrics, and calibration plots.

**Visualization and outputs:**
1. Save plots to `reports/figures_03`.
2. Save a CSV of per-threshold metrics (Accuracy, F1, DP, EOD) for audit transparency.
3. Save the chosen threshold `t*` as `t_star.npy`.

**Expected result.** Threshold curves and a Pareto plot are produced. The operating threshold `t*` is selected and saved, reflecting an optimal balance between quality and fairness.

In [None]:
# --- Threshold sweep + Pareto + select t* + save figures --- #

# fill NaNs in sensitive series #
def _sens_fill(s: pd.Series) -> pd.Series:
    s = s.copy()
    if isinstance(s.dtype, pd.CategoricalDtype):
        if "NA" not in s.cat.categories:
            s = s.cat.add_categories(["NA"])
        return s.fillna("NA")
    else:
        return s.fillna("NA")


# which sensitive features are present in X_test_sens #
sens_cols = [c for c in ["sex", "race", "age_group"] if c in X_test_sens.columns]
if len(sens_cols) == 0:
    raise RuntimeError(
        "None of the expected sensitive features were found in X_test_sens: sex/race/age_group."
    )

# ensure y_true_test/y_proba_best are present #
assert "y_true_test" in globals() and "y_proba_best" in globals(), (
    "Нужны y_true_test и y_proba_best из 02_modeling."
)

# align indices to X_test_sens #
y_true = pd.Series(y_true_test, index=X_test_sens.index)
y_proba = pd.Series(y_proba_best, index=X_test_sens.index)

try:
    auc_once = roc_auc_score(y_true, y_proba)
except Exception:
    auc_once = np.nan

# build threshold table #
ts = np.round(np.arange(0.05, 0.95, 0.01), 2)
rows = []
for t in ts:
    # binarize probabilities on the same index
    y_pred_t = (y_proba >= t).astype(int)

    # base metrics on aligned series
    m_f1 = f1_score(y_true, y_pred_t, zero_division=0)
    m_pr = precision_score(y_true, y_pred_t, zero_division=0)
    m_rc = recall_score(y_true, y_pred_t, zero_division=0)
    m_acc = accuracy_score(y_true, y_pred_t)
    m_auc = auc_once

    # DP difference by max gam among available sensitive features
    dp_diffs = []
    for col in sens_cols:
        s = _sens_fill(X_test_sens[col])
        grp_name = s.name or col

        # categorize with a fixed group order
        if grp_name in GROUP_VALUES:
            s = pd.Series(
                pd.Categorical(s, categories=GROUP_VALUES[grp_name], ordered=False),
                index=y_pred_t.index,
                name=grp_name,
            )
        else:
            s = pd.Series(s, index=y_pred_t.index, name=grp_name)

        # upcoming pandas default: observed=True
        grp_rates = y_pred_t.groupby(s, observed=True).mean()

        # restore full group order
        if grp_name in GROUP_VALUES:
            grp_rates = grp_rates.reindex(GROUP_VALUES[grp_name])

        # compute disparate impact/difference
        dp = float(grp_rates.max() - grp_rates.min())
        dp_diffs.append(dp)

    # if all NaN -> return np.nan #
    dp_diff_max = float(np.nanmax(dp_diffs)) if len(dp_diffs) else np.nan

    # EOD #
    eod_diffs = []
    try:
        mask_pos = y_true == 1
        for col in sens_cols:
            s = _sens_fill(X_test_sens[col])
            grp_name = s.name or col

            if grp_name in GROUP_VALUES:
                s = pd.Series(
                    pd.Categorical(s, categories=GROUP_VALUES[grp_name], ordered=False),
                    index=y_true.index,
                    name=grp_name,
                )
                groups_iter = GROUP_VALUES[grp_name]
            else:
                s = pd.Series(s, index=y_true.index, name=grp_name)
                groups_iter = pd.unique(s)

            tprs = []
            for g in groups_iter:
                m = (s == g) & mask_pos
                if m.sum() > 0:
                    tprs.append((y_pred_t[m] == 1).mean())
            if len(tprs) > 1:
                eod_diffs.append(float(np.max(tprs) - np.min(tprs)))
        eod_diff_max = float(np.nanmax(eod_diffs)) if eod_diffs else np.nan
    except Exception:
        eod_diff_max = np.nan

    rows.append(
        {
            "t": t,
            "f1": m_f1,
            "precision": m_pr,
            "recall": m_rc,
            "accuracy": m_acc,
            "auc": m_auc,
            "dp_diff_max": dp_diff_max,
            "eod_diff_max": eod_diff_max,
        }
    )

scan_df = pd.DataFrame(rows)

# export csv #
ART_DIR = globals().get("ART_DIR", Path("data") / "artifacts")
ART_DIR.mkdir(parents=True, exist_ok=True)
out_csv = ART_DIR / "fairness_threshold_scan.csv"
scan_df.to_csv(out_csv, index=False)
print(f"Saved fairness_threshold_scan.csv -> {out_csv}")

# copy to reports #
REPORTS_DIR = globals().get("REPORTS_DIR", Path("data") / "reports")
FIG_DIR_03 = REPORTS_DIR / "figures_03"
FIG_DIR_03.mkdir(parents=True, exist_ok=True)
out_csv_rep = FIG_DIR_03 / "fairness_threshold_scan.csv"
scan_df.to_csv(out_csv_rep, index=False)
print(f"Copied to reports -> {out_csv_rep}")

# Pareto: F1 vs DP #
fig, ax = plt.subplots(figsize=(6, 5))
ax.scatter(scan_df["dp_diff_max"], scan_df["f1"], s=14, alpha=0.85)
ax.set_xlabel("Demographic Parity (max group diff)")
ax.set_ylabel("F1")
ax.set_title("Pareto: F1 vs DP")

# highlight best-by-f1 point #
best_idx = int(scan_df["f1"].idxmax())
ax.scatter(
    [scan_df.loc[best_idx, "dp_diff_max"]], [scan_df.loc[best_idx, "f1"]], s=60, edgecolors="k"
)
for k in ["dp_diff_max", "f1", "t"]:
    ax.annotate(
        f"{k}={scan_df.loc[best_idx, k]:.3f}",
        (scan_df.loc[best_idx, "dp_diff_max"], scan_df.loc[best_idx, "f1"]),
        xytext=(10, 10),
        textcoords="offset points",
        fontsize=8,
    )
save_fig("Pareto_f1_vs_dp.png", fig)

# metrics vs threshold #
fig, ax = plt.subplots(figsize=(7, 5))
ax.plot(scan_df["t"], scan_df["accuracy"], label="accuracy")
ax.plot(scan_df["t"], scan_df["f1"], label="f1", linestyle="--")
ax.set_xlabel("threshold t")
ax.set_ylabel("score")
ax.set_title("Accuracy & F1 vs threshold")
ax.legend()
save_fig("accuracy_f1_vs_threshold.png", fig)

fig, ax = plt.subplots(figsize=(7, 5))
ax.plot(scan_df["t"], scan_df["dp_diff_max"])
ax.set_xlabel("threshold t")
ax.set_ylabel("DP max diff")
ax.set_title("DP (max group diff) vs threshold")
save_fig("dp_vs_threshold.png", fig)

# EOD #
if scan_df["eod_diff_max"].notna().any():
    fig, ax = plt.subplots(figsize=(7, 5))
    ax.plot(scan_df["t"], scan_df["eod_diff_max"])
    ax.set_xlabel("threshold t")
    ax.set_ylabel("EOD max diff")
    ax.set_title("EOD (max group diff) vs threshold")
    save_fig("eod_vs_threshold.png", fig)

# Pareto-front: F1 vs DP #
scan_df["dp_abs"] = scan_df["dp_diff_max"].abs()

pareto_idx = []
for i, r in scan_df.iterrows():
    dominated = (
        (scan_df["f1"] >= r["f1"])
        & (scan_df["dp_abs"] <= r["dp_abs"])
        & ((scan_df["f1"] > r["f1"]) | (scan_df["dp_abs"] < r["dp_abs"]))
    ).any()
    if not dominated:
        pareto_idx.append(i)
pareto_df = scan_df.loc[pareto_idx]

# selection rate: minimal |DP|, tie-breaker by maximal F1 #
t_star = float(pareto_df.sort_values(["dp_abs", "f1"], ascending=[True, False]).iloc[0]["t"])

# save t* as an artifact #
np.save(ART_DIR / "t_star.npy", np.array([t_star], dtype=float))

In [None]:
# --- Compare metrics: t* vs t-0.5 --- #

assert "t_star" in globals()
y_true, _ = A["y_true"]
y_proba_best, _ = A["y_proba"]
y_pred_050_opt, _ = A["y_pred_050_opt"]


def _metrics(y_true, y_pred, y_score=None):
    out = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, zero_division=0),
        "recall": recall_score(y_true, y_pred, zero_division=0),
        "f1": f1_score(y_true, y_pred, zero_division=0),
    }
    if y_score is not None:
        try:
            out["roc_auc"] = roc_auc_score(y_true, y_score)
        except Exception:
            pass
    return out


# t* #
y_pred_star = (y_proba_best >= float(t_star)).astype(int)
m_star = _metrics(y_true, y_pred_star, y_proba_best)

# t=0.5 #
if y_pred_050_opt is not None:
    m_050 = _metrics(y_true, y_pred_050_opt, y_proba_best)
    comp = pd.DataFrame([m_050, m_star], index=["t=0.5", "t*"])
    display(comp)
    comp.to_csv(ART_DIR / "metrics_t050_vs_tstar.csv", index=True)
    print("[ok] saved:", ART_DIR / "metrics_050_vs_tstar.csv")
else:
    print("[skip] y_pred_050.npy absent.")

In [None]:
# --- Group metrics @ t* -> CSV --- #

# input objects: #
# y_true: vector of true labels #
# y_proba_best: positive-class probabilities #
# t_star: selected threshold #
# X_test_sensitive: DataFrame with sensitive attributes #

from sklearn.metrics import precision_recall_fscore_support

assert "t_star" in globals(), "t_star not found"
assert "y_proba_best" in globals(), "y_proba_best not found"
assert "y_true" in globals(), "y_true not found"
assert "X_test_sensitive" in globals(), "X_test_sensitive not found"

# output directories #
ART_DIR = globals().get("ART_DIR", Path("data") / "artifacts")
FIG_DIR_03 = globals().get("REPORTS_DIR", Path("data") / "reports") / "figures_03"

ART_DIR.mkdir(parents=True, exist_ok=True)
FIG_DIR_03.mkdir(parents=True, exist_ok=True)

# predictions at t* #
y_pred_star = (y_proba_best >= float(t_star)).astype(int)

rows_metrics = []
rows_selrate = []

for col in X_test_sensitive.columns:
    s = X_test_sensitive[col]
    # drop NaNs from grouping
    for g in sorted(s.dropna().unique()):
        m = (s == g).values
        n = int(m.sum())
        if n == 0:
            continue
        yt = y_true[m]
        yp = y_pred_star[m]

        # metrics
        p, r, f1, _ = precision_recall_fscore_support(yt, yp, average="binary", zero_division=0)
        acc = accuracy_score(yt, yp)

        rows_metrics.append(
            {
                "threshold": float(t_star),
                "group_col": col,
                "group": g,
                "n": n,
                "precision": float(p),
                "recall": float(r),
                "f1": float(f1),
                "accuracy": float(acc),
            }
        )

        # selection rate
        sel = float(yp.mean()) if n > 0 else 0.0
        rows_selrate.append(
            {
                "threshold": float(t_star),
                "group_col": col,
                "group": g,
                "n": n,
                "selection_rate": sel,
            }
        )

df_metrics = pd.DataFrame(
    rows_metrics,
    columns=["threshold", "group_col", "group", "n", "precision", "recall", "f1", "accuracy"],
)
df_selrate = pd.DataFrame(
    rows_selrate, columns=["threshold", "group_col", "group", "n", "selection_rate"]
)

# save to report and duplicate to artifacts #
p1 = FIG_DIR_03 / "group_metrics_t_star.csv"
p2 = FIG_DIR_03 / "selection_rates_t_star.csv"
a1 = ART_DIR / "group_metrics_t_star.csv"
a2 = ART_DIR / "selection_rates_t_star.csv"

df_metrics.to_csv(p1, index=False)
df_selrate.to_csv(p2, index=False)
df_metrics.to_csv(a1, index=False)
df_selrate.to_csv(a2, index=False)

print(f"[ok] Saved: {p1}")
print(f"[ok] Saved: {p2}")
print(f"[ok] Saved: {a1}")
print(f"[ok] Saved: {a2}")

# --- Post-Processing: ThresholdOptimizer (DemographicParity / EqualizedOdds) --- #

**Goal.** Apply post-processing to adjust model decisions under fairness constraints and assess its impact on quality and fairness.

**Approach:**
1. Use `ThresholdOptimizer` from fairlearn to adapt the classification threshold separately for each subgroup of a sensitive feature.
2. Consider two constraint types:
   - **Demographic Parity** - equalize the share of positive outcomes across groups;
   - **Equalized Odds** - equalize sensitivity (TPR) and specificity (FPR) across groups.

**Comparison methodology:**
1. Run optimization separately for each constraint using the trained model as a black box (`estimator='prefitted'`).


In [None]:
# --- Post‑processing: ThresholdOptimizer (DP / EqOdds) --- #

from fairlearn.metrics import demographic_parity_difference, equalized_odds_difference

warnings.filterwarnings("ignore", category=UserWarning, message=".*sensitive features not unique.*")


# fill NaNs in sensitive attributes #
def _sens_fill(s):
    s = s.copy()
    if isinstance(s.dtype, pd.CategoricalDtype):
        if "NA" not in s.cat.categories:
            s = s.cat.add_categories(["NA"])
        return s.fillna("NA")
    else:
        return s.fillna("NA")


# choose an available sensitive attribute #
sens_cols = [c for c in ["sex", "race", "age_group"] if c in X_test_sens.columns]
if not sens_cols:
    print("No sensitive features - skipping ThresholdOptimizer")
else:
    gcol = "sex" if "sex" in X_test_sens.columns else sens_cols[0]
    sf = _sens_fill(X_test_sens[gcol])

    class _ScoresEstimator:
        """A surrogate classifier that returns precomputed probabilities."""

        def __init__(self, scores):
            self.scores = np.asarray(scores)
            # to prevent check_fitted from raising errors when prefit=True
            self.fitted_ = True

        def get_params(self, deep=True):
            return {"scores": self.scores}

        def set_params(self, **params):
            # dimension retention
            return self

        def fit(self, X, y):
            self.fitted_ = True
            # dimension retention
            return self

        def predict_proba(self, X):
            # X is used inly for signature compatibility;
            # return precomputed probabilities
            p = self.scores
            return np.column_stack([1.0 - p, p])

    # create surrogate model and 'features' as indeces #
    X_idx = np.arange(len(y_true_test)).reshape(-1, 1)
    base_est = _ScoresEstimator(y_proba_best)

    # ThresholdOptimizer for demographic parity #
    postproc = ThresholdOptimizer(
        estimator=base_est,
        constraints="demographic_parity",
        predict_method="predict_proba",
        prefit=True,
    )
    # fit/predict on the same X_idx #
    postproc.fit(X=X_idx, y=y_true_test, sensitive_features=sf)
    y_pred_dp = postproc.predict(X=X_idx, sensitive_features=sf).astype("int8")

    # metrics after post-processing #
    def _prec(y_true, y_pred):
        return precision_score(y_true, y_pred, zero_division=0)

    def _rec(y_true, y_pred):
        return recall_score(y_true, y_pred, zero_division=0)

    def _f1(y_true, y_pred):
        return f1_score(y_true, y_pred, zero_division=0)

    dp = demographic_parity_difference(y_true=y_true_test, y_pred=y_pred_dp, sensitive_features=sf)
    eq = equalized_odds_difference(y_true=y_true_test, y_pred=y_pred_dp, sensitive_features=sf)

    print(
        f"[ThresholdOptimizer @ {gcol}] "
        f"acc={accuracy_score(y_true_test, y_pred_dp):.3f} "
        f"f1={_f1(y_true_test, y_pred_dp):.3f} "
        f"prec={_prec(y_true_test, y_pred_dp):.3f} "
        f"rec={_rec(y_true_test, y_pred_dp):.3f} "
        f"| DP diff={dp:+.3f} EqOdds diff={eq:+.3f}"
    )

# summary table of metrics: baseline vs t* ThresholdOptimizer #
rows = []


def _metrics_row(tag, y_pred_local, sf_local):
    return {
        "settings": tag,
        "accuracy": accuracy_score(y_true_test, y_pred_local),
        "precision": precision_score(y_true_test, y_pred_local, zero_division=0),
        "recall": recall_score(y_true_test, y_pred_local, zero_division=0),
        "f1": f1_score(y_true_test, y_pred_local, zero_division=0),
        "dp_diff": demographic_parity_difference(
            y_true=y_true_test, y_pred=y_pred_local, sensitive_features=sf_local
        ),
        "eod_diff": equalized_odds_difference(
            y_true=y_true_test, y_pred=y_pred_local, sensitive_features=sf_local
        ),
    }


# baseline t=0.5 #
y_pred_05 = (y_proba_best >= 0.5).astype(int)
rows.append(_metrics_row("baseline_t0.5", y_pred_05, sf))

# baseline t* #
if "t_star" in globals():
    y_pred_star = (y_proba_best >= t_star).astype(int)
    rows.append(_metrics_row("baseline_t*", y_pred_star, sf))

# post-processing DP #
rows.append(_metrics_row("ThresholdOptimizer_DP", y_pred_dp, sf))

postproc_df = pd.DataFrame(rows)
postproc_df.to_csv(FIG_DIR_03 / "metrics_postprocessing.csv", index=False)
print("[postproc] saved:", FIG_DIR_03 / "metrics_postprocessing.csv")

# --- Probability Calibration by Groups --- #

**Goal.** Assess the agreement between probabilistic predictions and empirical frequencies of the positive outcome across sensitive groups.

**Methodology:**
1. For each available group (`sex`, `race`, `age_group`), plot reliability curves over 10 equal probability bins.
2. Compute Expected Calibration Error (ECE) using 10 bins.
3. Ignore very small subgroups when plotting by setting `min_count=50`.

**Outputs:**
1. Plot files: `calibration_{group}.png` in `reports/figures_03/`.
2. ECE summary table: `calibration_ece_by_group.png` in `reports/figures_03/`.

**Interpretation:**
1. The line \(y=x\) corresponds to perfect calibration. Systematic deviations indicate overconfidence or underconfidence of the model within a given group.
2. ECE aggregates the weighted average deviation and serves as a compact numerical indicator of group-wise calibration.

In [None]:
# --- Calibration plots by groups + сохранение --- #

from sklearn.calibration import calibration_curve


def plot_reliability_by_groups(
    y_true, y_proba, group_series, n_bins=10, min_count=50, title="", save_name=None
):
    groups = group_series.value_counts().index.tolist()
    kept = []
    fig, ax = plt.subplots(figsize=(6, 6))
    for g in groups:
        mask = (group_series == g).to_numpy()
        if mask.sum() < min_count:
            continue
        prob_true, prob_pred = calibration_curve(
            y_true[mask], y_proba[mask], n_bins=n_bins, strategy="uniform"
        )
        ax.plot(prob_pred, prob_true, label=f"{g} (n={mask.sum()})")
        kept.append(g)
    ax.plot([0, 1], [0, 1], "--", linewidth=1)
    ax.set_xlabel("Predicted probability")
    ax.set_ylabel("Empirical positive rate")
    ax.set_title(title or f"Calibration by {group_series.name}")
    if kept:
        ax.legend()
    if save_name:
        save_fig(save_name, fig)


def compute_ece(y_true, y_proba, n_bins=10):
    # uniform binning by predicted probability #
    bins = np.linspace(0.0, 1.0, n_bins + 1)
    idx = np.digitize(y_proba, bins) - 1
    ece = 0.0
    N = len(y_true)
    for b in range(n_bins):
        mask = idx == b
        nb = mask.sum()
        if nb == 0:
            continue
        acc_b = y_true[mask].mean()
        conf_b = y_proba[mask].mean()
        ece += (nb / N) * abs(acc_b - conf_b)
    return float(ece)


# data availability check #
assert all(k in globals() for k in ["y_true_test", "y_proba_best", "X_test_sens"])

# run for all available sensitive attributes #
for col in [c for c in ["sex", "race", "age_group"] if c in X_test_sens.columns]:
    plot_reliability_by_groups(
        y_true=y_true_test,
        y_proba=y_proba_best,
        group_series=X_test_sens[col],
        n_bins=10,
        min_count=50,
        title=f"Calibration by {col}",
        save_name=f"calibration_{col}.png",
    )

# ECE by groups and save CSV #
ece_rows = []
for col in [c for c in ["sex", "race", "age_group"] if c in X_test_sens.columns]:
    s = X_test_sens[col]
    # compute per-category ECE within col
    for g in s.dropna().unique():
        m = (s == g).to_numpy()
        if m.sum() == 0:
            continue
        ece = compute_ece(y_true=y_true_test[m], y_proba=y_proba_best[m], n_bins=10)
        ece_rows.append({"group_col": col, "group": str(g), "n": int(m.sum()), "ece10": ece})

if ece_rows:
    pd.DataFrame(ece_rows).sort_values(["group_col", "group"]).to_csv(
        FIG_DIR_03 / "calibration_ece_by_group.csv", index=False
    )

In [None]:
# --- Group metrics bar charts + saving --- #

assert "t_star" in globals(), "Expected a chosen threshold t_star"
assert all(k in globals() for k in ["y_true_test", "y_proba_best", "X_test_sens"])
y_pred_star = (y_proba_best >= t_star).astype(int)


def _bar_metric_by_group(
    y_true, y_pred, group_series, metric_fn, metric_name: str, min_count=50, save_name=""
):
    recs = []
    for g, idx in (
        group_series.reset_index(drop=True)
        .groupby(group_series.reset_index(drop=True))
        .groups.items()
    ):
        idx = np.array(idx, dtype=int)
        if len(idx) < min_count:
            continue
        val = metric_fn(y_true[idx], y_pred[idx])
        recs.append({"group": str(g), metric_name: float(val), "n": int(len(idx))})
    if not recs:
        print(f"[fairness] skip {metric_name}: all groups < min_count")
        return
    df = pd.DataFrame(recs).sort_values(metric_name, ascending=False)

    fig, ax = plt.subplots(figsize=(7, 4))
    ax.bar(df["group"], df[metric_name])
    ax.set_title(f"{metric_name} by {group_series.name}")
    ax.set_ylabel(metric_name)
    ax.set_xlabel(group_series.name)
    for i, v in enumerate(df[metric_name].values):
        ax.text(i, v, f"{v:.2f}", ha="center", va="bottom", fontsize=8)
    save_fig(save_name, fig)


for col in [c for c in ["sex", "race", "age_group"] if c in X_test_sens.columns]:
    gs = X_test_sens[col]
    _bar_metric_by_group(
        y_true_test,
        y_pred_star,
        gs,
        precision_score,
        "precision",
        save_name=f"bar_precision_by_{col}.png",
    )
    _bar_metric_by_group(
        y_true_test, y_pred_star, gs, recall_score, "recall", save_name=f"bar_recall_by_{col}.png"
    )
    _bar_metric_by_group(
        y_true_test, y_pred_star, gs, f1_score, "f1", save_name=f"bar_f1_by_{col}.png"
    )

In [None]:
# --- Confusion matrices by group + saving --- #

import seaborn as sns
from sklearn.metrics import confusion_matrix

assert "t_star" in globals()
assert all(k in globals() for k in ["y_true_test", "y_proba_best", "X_test_sens"])
y_pred_star = (y_proba_best >= t_star).astype(int)


def _plot_cm(y_true, y_pred, title=""):
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    fig, ax = plt.subplots(figsize=(4.5, 4))
    sns.heatmap(cm, annot=True, fmt="d", cbar=False, ax=ax)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    ax.set_title(title)
    ax.set_xticklabels(["0", "1"])
    ax.set_yticklabels(["0", "1"])
    return fig


for col in [c for c in ["sex", "race", "age_group"] if c in X_test_sens.columns]:
    s = X_test_sens[col]
    for g in s.dropna().unique():
        mask = (s == g).to_numpy()
        if mask.sum() < 50:
            continue
        fig = _plot_cm(
            y_true_test[mask], y_pred_star[mask], title=f"CM: {col}={g} (n={mask.sum()})"
        )
        safe_g = str(g).replace("/", "-").replace("\\", "-").replace(" ", "_")
        save_fig(f"cm_{col}_{safe_g}.png", fig)

In [None]:
# --- Baselines visuals from figures_02 --- #

figs02, dir02 = A["figures02"]
if figs02:
    print(f"[info] figures_02: {dir02} | count={len(figs02)}")
    for p in figs02[:6]:
        print(" -", p.name)
else:
    print("[skip] figures_02 not found.")

# --- Explainability: global (SHAP / alternatives) --- #

**Goal.** Identify which features contribute most to the model’s predictions and check for indirect dependencies on sensitive attributes.

**Methodology.** For the final model (LightGBM/XGBoost), we use `shap.TreeExplainer` to estimate each feature’s contribution to the predicted probability of the `>50k` class. SHAP provides consistent, additive attributions, which makes it the preferred method for explaining boosted trees.

**Global analysis results:**
1. Key socio-economic features (e.g., `marital-status_Married-civ-spouse`, `education-num`, `capital-gain`, `hours-per-week`, `age`) dominate the importance rankings.
2. Sensitive variables (`sex`, `race`, `age_group`) show low average impact, indicating no direct reliance on these attributes by the model.
3. Partial dependencies via proxy features (e.g., `occupation`, `marital-status`) may indirectly reflect group differences.

**Conclusion.** SHAP indicates the model primarily focuses on economic and demographic factors relevant to income rather than directly on sensitive attributes. This aligns with the fairness evaluation, which showed moderate but not critical group disparities.

In [None]:
# --- Explainability input selection via unified loader --- #

assert "A" in globals(), "Run the unified loader"
model, _ = A["model"]
XS, _ = A["X_test_enc"]
feature_names, _ = A["feature_names"]
test_groups, _ = A["test_groups"]

# direct use of loaded artifacts #
if XS is not None and feature_names is not None:
    clf_for_shap = getattr(model, "best_estimator_", model)
    print(
        "[ok] "
        f"XS: {getattr(XS, 'shape', None)}, "
        f"features: {len(feature_names)}, "
        f"model: {type(clf_for_shap).__name__}"
    )
else:
    # fallback inly if something is missing
    print(
        "[warn] No XS or feature_names from artifacts. "
        "Fallback recovery enabled (disabled per unified loader policy)."
    )
    raise RuntimeError("Required artifacts for explainability: X_test_enc and/or feature_names.")

In [None]:
# --- Dependence-plots for top features --- #

import re

import scipy.sparse as sp

rng = np.random.default_rng(42)

# search for artifacts in common project folders #
ART_DIRS = [
    Path("data") / "artifacts",
    Path("data") / "models",
    Path("notebooks") / "artifacts",
    Path("notebooks") / "models",
]


def _first_exists_any(names):
    names_lower = [n.lower() for n in names]
    for d in ART_DIRS:
        if not d.exists():
            continue
        for p in d.iterdir():
            if p.is_file() and p.name.lower() in names_lower:
                return p
    # recursive fallback
    roots = [Path(".").resolve()]
    roots += list(roots[0].parents)[:3]
    for root in roots:
        for p in root.rglob("*"):
            try:
                if p.is_file() and p.name.lower() in names_lower:
                    return p
            except PermissionError:
                continue
    return None


# model #
clf = None
p_model = _first_exists_any(["lgb_best.joblib", "LGBM_best.joblib", "model_best.joblib"])
if p_model is not None:
    model, model_path = A["model"]
    obj = model
    # if this is pipeline — pull out clf
    if hasattr(obj, "named_steps"):
        clf = obj.named_steps.get("clf", None) or obj
    else:
        clf = obj

# data: X_test_enc and feature_names #
X_test_enc, X_test_enc_path = A["X_test_enc"]
XS = X_test_enc

# load feature_names from artifacts #
feature_names, feature_names_path = A["feature_names"]

# safeguard when data are missing #
if XS is None:
    print("No XS/X_test_enc data: skipping dependence plots")
    raise SystemExit

# align feature_names to current XS matrix #
ncols = XS.shape[1]

# try to get names from the model (if not default f0..fN) #
try:
    if (
        "clf" in globals()
        and clf is not None
        and hasattr(clf, "booster_")
        and clf.booster_ is not None
    ):
        fnm = list(clf.booster_.feature_name())
        all_fnums = all(isinstance(x, str) and re.fullmatch(r"f\d+", x) for x in fnm)
        if isinstance(fnm, list) and len(fnm) == ncols and not all_fnums:
            feature_names = [str(x) for x in fnm]
            print("[info] feature_names taken from model booster")
except Exception:
    pass


# ensure matching length: trim/pad if needed #
def _ensure_len(cols, n):
    lst = list(cols) if isinstance(cols, list | np.ndarray | pd.Index) else []
    if len(lst) == n:
        return [str(c) for c in lst]
    if len(lst) > n:
        print(f"[warn] feature_names longer ({len(lst)}) than ncols={n}. Trimming.")
        return [str(c) for c in lst[:n]]
    print(
        f"[warn] feature_names shorter ({len(lst)}) than ncols={n}. Padding f{len(lst)}..f{n - 1}."
    )
    return [str(c) for c in (lst + [f"f{i}" for i in range(len(lst), n)])]


if (feature_names is None) or (
    isinstance(feature_names, (list | np.ndarray | pd.Index)) and len(feature_names) != ncols
):
    feature_names = _ensure_len(feature_names, ncols)

if XS is not None and not isinstance(XS, pd.DataFrame):
    if sp.issparse(XS):
        XS = pd.DataFrame.sparse.from_spmatrix(
            XS, columns=(feature_names if feature_names is not None else None)
        )
    else:
        XS = pd.DataFrame(XS, columns=(feature_names if feature_names is not None else None))

# extract classifier from pipeline if needed #
clf_for_shap = None
if "clf" in globals() and clf is not None and hasattr(clf, "predict_proba"):
    clf_for_shap = clf
else:
    g = globals()
    _pipe = g.get("pipe")
    if _pipe is not None and hasattr(_pipe, "named_steps"):
        clf_for_shap = _pipe.named_steps.get("clf", None)
        if clf_for_shap is None:
            for _, step in _pipe.named_steps.items():
                if hasattr(step, "predict_proba") or hasattr(step, "predict"):
                    clf_for_shap = step
                    break

# unified check and subsampling #
if clf_for_shap is None or XS is None:
    print("No suitable model/data for SHAP: skipping dependence plots")
else:
    # subsample
    idx = np.arange(len(XS))
    if len(idx) > 5000:
        idx = rng.choice(idx, size=5000, replace=False)
    XS_sub = XS.iloc[idx] if hasattr(XS, "iloc") else XS[idx, :]

    expl = shap.TreeExplainer(clf_for_shap)
    shap_vals = expl.shap_values(XS_sub)

    # binary class -> take contributions for the positive class
    if isinstance(shap_vals, list):
        try:
            classes_ = getattr(clf_for_shap, "classes_", [0, 1])
            pos_idx = list(classes_).index(1)
        except Exception:
            pos_idx = 1 if len(shap_vals) > 1 else 0
        shap_vals = shap_vals[pos_idx]

    # top-k
    topk = 10

    # beeswarm -> file
    shap.summary_plot(
        shap_vals,
        XS_sub,
        feature_names=(feature_names if feature_names is not None else None),
        show=False,
        max_display=topk,
    )
    plt.tight_layout()
    save_fig("shap_summary_top10.png", None)
    plt.close()

    # bar -> file
    shap.summary_plot(
        shap_vals,
        XS_sub,
        feature_names=(feature_names if feature_names is not None else None),
        plot_type="bar",
        show=False,
        max_display=topk,
    )
    plt.tight_layout()
    save_fig("shap_bar_top10.png", None)
    plt.close()

In [None]:
# --- Explainability: local (TP/FP/FN cases) --- #

assert all(k in globals() for k in ["y_true_test", "y_proba_best", "X_test_sens"])
thr = globals().get("t_star", 0.5)
y_pred_thr = (y_proba_best >= thr).astype("int8")

TP_idx = np.where((y_true_test == 1) & (y_pred_thr == 1))[0]
FP_idx = np.where((y_true_test == 0) & (y_pred_thr == 1))[0]
FN_idx = np.where((y_true_test == 1) & (y_pred_thr == 0))[0]

# use only artifacts from the unified loader #
XS, _ = A["X_test_enc"]
feature_names, _ = A["feature_names"]
model, _ = A["model"]

# extract classifier from best_estimator_/Pipeline #
obj = getattr(model, "best_estimator_", model)
if hasattr(obj, "named_steps"):
    clf_for_shap = obj.named_steps.get("clf", None)
    if clf_for_shap is None:
        for step in obj.named_steps.values():
            if hasattr(step, "predict_proba") or hasattr(step, "predict"):
                clf_for_shap = step
                break
else:
    clf_for_shap = obj

if clf_for_shap is None:
    raise RuntimeError("Failed to extract a slassifier from the Pipeline for SHAP")

if XS is None and feature_names is None:
    raise RuntimeError("No X_test_enc or feature_names from artifacts; skipping explainability")

# build SHAP plots #
expl = shap.TreeExplainer(clf_for_shap)


def _pick_some(arr, k=3):
    if len(arr) == 0:
        return []
    rng = np.random.default_rng(42)
    return arr if len(arr) <= k else rng.choice(arr, size=k, replace=False).tolist()


cases = [
    ("TP", _pick_some(TP_idx, 3)),
    ("FP", _pick_some(FP_idx, 3)),
    ("FN", _pick_some(FN_idx, 3)),
]

for tag, idxs in cases:
    if not idxs:
        print(f"{tag}: no cases - skipping")
        continue
    for i in idxs:
        if hasattr(XS, "iloc"):
            row = XS.iloc[[i]]
        elif sp.issparse(XS):
            row = XS[i]
        else:
            row = XS[i : i + 1]

        sv = expl.shap_values(row)

        # choose class and reshape to 2D
        sv_arr = sv[1] if isinstance(sv, list) else sv
        if getattr(sv_arr, "ndim", 1) == 1:
            sv_arr = sv_arr.reshape(1, -1)

        # LightGBM: last column is base value -> extract and trim
        if sv_arr.shape[1] == row.shape[1] + 1:
            base_val = sv_arr[0, -1]
            sv_arr = sv_arr[:, :-1]
        else:
            ev = expl.expected_value
            base_val = ev[1] if isinstance(ev, list | np.ndarray) else ev

        # features as 1D
        if hasattr(row, "values"):  # DataFrame
            feats_1d = row.values.reshape(-1)
        else:  # np/sparse
            feats_1d = np.asarray(row).reshape(-1)

        fig = shap.force_plot(
            base_value=base_val,
            shap_values=sv_arr.reshape(-1),
            features=feats_1d,
            feature_names=feature_names,
            matplotlib=True,
            show=False,
        )
        plt.suptitle(f"{tag} case idx={i} @ t={thr:.2f}")
        plt.tight_layout()
        plt.show(fig)

# --- Risks, Limitations, and Recommendations --- #

**Quality and Fairness.** The final model shows strong predictive performance (ROC-AUC around 0.9) with a moderate trade-off between accuracy and fairness. Without fairness correction, a gap in the share of positive predictions between genders is observed. Applying post-processing (ThresholdOptimizer with demographic parity or equalized odds) notably reduces this gap, at the cost of a slight F1 decrease - a typical outcome of balancing fairness and accuracy.

**Calibration and Interpretability.** For large groups, the model is well-calibrated: predicted probabilities align with observed positive rates. For smaller groups (by age or rare racial categories), calibration error increases due to limited data. Global SHAP analysis confirms that the model relies primarily on socio-economic features (`marital-status`, `education-num`, `capital-gain`, `hours-per-week`, `age`), while sensitive variables have minimal direct influence.

**Risks and Areas for Improvement:**
1. Small groups lead to unstable fairness metrics and weaker calibration.
2. Possible proxy factors (e.g., marital status or occupation) may partially encode sensitive attributes.
3. Stronger fairness regularization can noticeably reduce accuracy.

**Recommendations:**
1. For practical use, apply the model with a baseline threshold `t=0.5` or the optimized `t*`.
2. In high-stakes fairness scenarios, use ThresholdOptimizer with demographic parity control.
3. Examine the effect or aggregation of proxy features on fairness stability.
4. If needed, recalibrate predicted probabilities (Platt or Isotonic) and extend data coverage for small subgroups.

In [None]:
# --- Export artifacts from 03_fairness_and_explainability.ipynb --- #

ART_DIR = globals().get("ART_DIR", Path("data") / "artifacts")
REPORTS_DIR = globals().get("REPORTS_DIR", Path("data") / "reports")

ART_DIR.mkdir(parents=True, exist_ok=True)
(REPORTS_DIR / "figures_03").mkdir(parents=True, exist_ok=True)

# export threshold scan #
scan_obj = globals().get("scan_df", None)
try:
    if isinstance(scan_obj, pd.DataFrame) and len(scan_obj) > 0:
        out_csv = ART_DIR / "fairness_threshold_scan.csv"
        scan_obj.to_csv(out_csv, index=False)
        print(f"Saved fairness_threshold_scan.csv -> {out_csv}")

        # copy to reports for convenience
        out_csv_rep = REPORTS_DIR / "figures_03" / "fairness_threshold_scan.csv"
        try:
            scan_obj.to_csv(out_csv_rep, index=False)
            print(f"Copied to reports -> {out_csv_rep}")
        except Exception as e:
            print("[warn] copy to reports failed:", e)
    else:
        print("[info] scan_df missing or empty - export skipped")
except Exception as e:
    print("[warn] scan_df export:", e)

print("Export artifacts from 03_fairness_and_explainability.ipynb completed")

In [None]:
# --- Rediness check --- #

# base directories #
ROOT = globals().get("ROOT", Path("."))
ART_DIR = globals().get("ART_DIR", ROOT / "data" / "artifacts")
REPORTS_DIR = globals().get("REPORTS_DIR", ROOT / "data" / "reports")
FIG_DIR_03 = REPORTS_DIR / "figures_03"

# verify presence of key directories #
assert ART_DIR.exists(), f"Нет папки артефактов: {ART_DIR}"
FIG_DIR_03.mkdir(parents=True, exist_ok=True)

# required in-memory objects #
need = ["feature_names", "X_test_enc", "sensitive", "y_true", "y_proba", "y_pred"]
miss = [k for k in need if (k not in A) or (A.get(k, (None, None))[0] is None)]
if miss:
    raise RuntimeError(f"Missing required artifacts {miss}")

# expected files and patterns #
critical = [
    FIG_DIR_03 / "fairness_threshold_scan.csv",
    ART_DIR / "fairness_threshold_scan.csv",
    FIG_DIR_03 / "shap_summary_top10.png",
    FIG_DIR_03 / "shap_bar_top10.png",
    FIG_DIR_03 / "group_metrics_t_star.csv",
    FIG_DIR_03 / "selection_rates_t_star.csv",
]

patterns = [
    "Pareto_f1_vs_dp.png",
    "accuracy_f1_vs_threshold.png",
    "dp_vs_threshold.png",
    "eod_vs_threshold.png",
    "calibration_*.png",
    "bar_precision_by_*.png",
    "bar_recall_by_*.png",
    "bar_f1_by_*.png",
    "cm_*_*.png",
]

# check critical #
missing_critical = [str(p) for p in critical if not Path(p).exists()]
if missing_critical:
    raise AssertionError(f"Missing critical files: {missing_critical}")

# summary by patterns #
summary = {}
for pat in patterns:
    files = list(FIG_DIR_03.glob(pat))
    summary[pat] = len(files)

# print summary #
print("[fairness] report @", FIG_DIR_03)
for pat, cnt in summary.items():
    print(f"    {pat:28s} -> {cnt:3d}")

# extra logic: warnings #
warn = []

# if sensitive featrues exist, expect at least one var_* plot #
_sens_df = A.get("sensitive", (globals().get("X_test_sensitive", None), None))[0]
present_cols = [
    c for c in ["sex", "race", "age_group"] if (_sens_df is not None and c in _sens_df.columns)
]
for col in present_cols:
    for base in ["bar_precision_by_", "bar_recall_by_", "bar_f1_by_"]:
        if not list(FIG_DIR_03.glob(f"{base}{col}.png")):
            warn.append(f"Missing {base}{col}.png")

# at least one confusion matrix #
if summary.get("cm_*_*.png", 0) == 0:
    warn.append("No confusion matrices (cm_*_*.png)")

# at least one calibration_*.png #
if summary.get("calibration_*.png", 0) == 0:
    warn.append("No calibartion_*.png")

if warn:
    print("[fairness][warn]", "; ".join(warn))
else:
    print("[fairness] OK: full set of files generated")