# Policy implementation to use our best Model

**The goal of the policy is:**

To translate model scores into consistent, auditable lending decisions (**REJECT / REVIEW / APPROVE**) that **maximize expected business value** (loss avoided − costs) **subject to operational constraints** (review capacity) and **risk appetite**, while decoupling model training from business rules.

Concretely, the policy aims to:

* **Align decisions with economics:** pick thresholds that balance default loss vs. profit from performing loans.
* **Control workload:** set top-k cuts so review queues match capacity.
* **Ensure consistency & auditability:** fixed, versioned thresholds independent of model weights.
* **Support adaptability:** adjust thresholds as costs, capacity, or risk appetite change—no retrain needed.
* **Promote fairness & compliance:** enable segment-specific cuts or guardrails, and transparent rationale logging.


In [29]:
import json, joblib, numpy as np, pandas as pd
from pathlib import Path

## Get the model and thresholds

In [30]:
MODEL_DIR = Path("./saved_models")        # <- adjust if needed

# Pick the most recently modified model artifact. Fail with an actionable
# message when nothing matches, instead of max()'s "empty sequence" ValueError.
_model_candidates = list(MODEL_DIR.glob("best_model_recall_focus_xgb_*.joblib"))
if not _model_candidates:
    raise FileNotFoundError(
        f"No model file matching 'best_model_recall_focus_xgb_*.joblib' in {MODEL_DIR.resolve()}"
    )
MODEL_PATH = max(_model_candidates, key=lambda p: p.stat().st_mtime)
META_PATH  = MODEL_DIR / "best_model_metadata.json"
POLICY_PATH = MODEL_DIR / "best_model_policy.json"   # separate policy file (optional but preferred)

print("Loading model:", MODEL_PATH.name)
# NOTE: joblib.load unpickles the file; only load artifacts from a trusted source.
best_model = joblib.load(MODEL_PATH)
with open(META_PATH, "r", encoding="utf-8") as f:
    meta = json.load(f)

# Column lists recorded at training time; the helpers below read them as globals.
feature_set    = meta["feature_set"]
num_cols_meta  = meta["numeric_columns"]
cat_cols_meta  = meta["categorical_columns"]

# Some training runs saved a top-k review threshold in the report:
review_k       = float(meta.get("review_k", 0.20))
_report        = meta.get("report", {})
# Round before int(): 0.29 * 100 == 28.999999999999996 would otherwise truncate
# to 28 and miss the "Threshold@top_29%" key.
topk_thr_in_meta = _report.get(f"Threshold@top_{int(round(review_k * 100))}%")
if topk_thr_in_meta is None:
    # Fall back to the truncated key in case the report was written that way.
    topk_thr_in_meta = _report.get(f"Threshold@top_{int(review_k*100)}%")

# -------------------------------
# 1) Load policy thresholds (prefers policy.json, then metadata, then fallback)
# -------------------------------
def load_policy_thresholds():
    """
    Resolve the decision thresholds, trying sources from most to least preferred.

    Order: the policy JSON file (if it exists), then thresholds embedded in the
    model metadata, then the top-k review cut taken from the metadata report,
    and finally a fixed 0.5 review cut.

    Returns a dict with keys "thr_reject" (float or None), "thr_review" (float)
    and "source" (a label naming where the values came from).
    """
    def _both_cuts(thresholds):
        # A source only counts when it defines *both* cuts.
        reject_cut = thresholds.get("thr_reject")
        review_cut = thresholds.get("thr_review")
        if reject_cut is None or review_cut is None:
            return None
        return float(reject_cut), float(review_cut)

    # Source 1: dedicated policy file
    if POLICY_PATH.exists():
        with POLICY_PATH.open("r", encoding="utf-8") as fh:
            policy_doc = json.load(fh)
        cuts = _both_cuts(policy_doc.get("thresholds", {}))
        if cuts is not None:
            return {"thr_reject": cuts[0], "thr_review": cuts[1], "source": POLICY_PATH.name}

    # Source 2: thresholds stored alongside the model metadata
    cuts = _both_cuts(meta.get("policy", {}).get("thresholds", {}))
    if cuts is not None:
        return {"thr_reject": cuts[0], "thr_review": cuts[1], "source": "metadata"}

    # Source 4 (last resort, not recommended): no top-k cut recorded either
    if topk_thr_in_meta is None:
        return {"thr_reject": None, "thr_review": 0.5, "source": "default_0.5"}

    # Source 3: review-only cut from top-k (2-band policy)
    return {"thr_reject": None, "thr_review": float(topk_thr_in_meta), "source": "meta_topk_only"}

POLICY = load_policy_thresholds()



Loading model: best_model_recall_focus_xgb_OptionA_recallAtK.joblib


## Helper Functions

In [31]:
def tieaware_threshold_at_k(probs, k: float) -> float:
    """
    Return the score of the last item inside the top ``k`` fraction of ``probs``.

    ``probs`` is flattened and ranked in descending order (stable sort); the
    cut covers ``round(k * n)`` items, clamped to the range ``[1, n]``.
    Selecting with ``score >= threshold`` therefore picks at least that many
    items, and more when several scores tie with the threshold.

    Raises:
        ValueError: if ``probs`` is empty (there is no score to return).
    """
    probs = np.asarray(probs, dtype=float).ravel()
    n = len(probs)
    if n == 0:
        raise ValueError("probs must contain at least one score")
    # At least one item, at most all of them.
    kcount = min(n, max(1, int(round(k * n))))
    idx = np.argsort(-probs, kind="mergesort")   # stable descending sort
    return float(probs[idx[kcount - 1]])

# ------------------------------------
# Input normalization utilities
# ------------------------------------
def parse_percent(x):
    """
    Convert a percent-like value ("7.5%", "7.5", 7.5) to a float.

    The "%" sign is simply removed; the number is not divided by 100.
    Returns NaN for None, a float NaN, or text that is not a number.
    """
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return np.nan
    s = str(x).strip().replace("%", "")
    try:
        return float(s)
    except ValueError:
        # Only a failed number parse is expected here; anything else should surface.
        return np.nan

def parse_term(x):
    """
    Extract the numeric loan term from a value such as "36 months" or "60".

    All digit and "." characters in the text are concatenated and parsed as
    one float, e.g. "36 months" -> 36.0. Returns NaN for None or when no
    parsable number remains.
    """
    if x is None:
        return np.nan
    s = str(x)
    try:
        return float("".join(ch for ch in s if ch.isdigit() or ch == "."))
    except ValueError:
        # Nothing numeric left (e.g. "months" -> ""), or a malformed number.
        return np.nan

def parse_emp_length(x):
    """
    Convert an employment-length label to a number of years.

    "10+ years" -> 10.0 ; "< 1 year" -> 0.5 ; "3 years" -> 3.0.
    Returns NaN for None, for the placeholders "n/a", "na", "none" and "",
    and for text without a parsable number.
    """
    if x is None:
        return np.nan
    s = str(x).strip().lower()
    if s in {"n/a", "na", "none", ""}:
        return np.nan
    if s.startswith("<"):
        return 0.5      # "less than one year" is mapped to half a year
    if "10+" in s:
        return 10.0     # open-ended top bucket is capped at 10
    digits = "".join(ch for ch in s if ch.isdigit() or ch == ".")
    try:
        return float(digits)
    except ValueError:
        # No digits in the label (e.g. "unknown"), or a malformed number.
        return np.nan

def normalize_payload(payload: dict) -> pd.DataFrame:
    """
    Map raw fields -> one-row DataFrame expected by the model.
    Only features used by the model are required.

    Percent-like strings, the loan term and the employment length are parsed
    into numbers. Columns follow ``feature_set``, plus the engineered
    ``emp_length_num`` / ``term_num`` when they are listed in the numeric
    metadata but not in ``feature_set``. Numeric columns are coerced with
    ``pd.to_numeric`` (unparsable -> NaN); missing categoricals become "Unknown".
    """
    # Metadata lists may be missing/None; treat that as "no columns of this kind".
    num_cols = list(num_cols_meta or [])
    cat_cols = list(cat_cols_meta or [])

    dti_val = payload.get("dti")
    row = {
        # numeric
        "loan_amnt": payload.get("loan_amnt"),
        "int_rate": parse_percent(payload.get("int_rate")),
        "fico_range_low": payload.get("fico_range_low"),
        "fico_range_high": payload.get("fico_range_high"),
        "annual_inc": payload.get("annual_inc"),
        # dti is only parsed when it arrives as a "%" string; otherwise passed through
        "dti": parse_percent(dti_val) if isinstance(dti_val, str) and "%" in dti_val else dti_val,
        "revol_util": parse_percent(payload.get("revol_util")),
        "emp_length_num": parse_emp_length(payload.get("emp_length")),
        "term_num": parse_term(payload.get("term")),
        # categoricals
        "grade": payload.get("grade"),
        "sub_grade": payload.get("sub_grade"),
        "home_ownership": payload.get("home_ownership"),
        "verification_status": payload.get("verification_status"),
        "purpose": payload.get("purpose"),
    }

    # ensure engineered extras if training used them but they're not listed in feature_set
    model_feats = list(feature_set)
    for extra in ("emp_length_num", "term_num"):
        if extra in num_cols and extra not in model_feats:
            model_feats.append(extra)

    prepared = {f: row.get(f, np.nan) for f in model_feats}
    df = pd.DataFrame([prepared], columns=model_feats)

    # numeric coercion (only for columns actually present in the frame)
    for c in num_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    # NOTE: no median imputation here. On a one-row frame the column median of a
    # missing value is itself NaN, so such a fill cannot change anything; missing
    # numerics stay NaN for the model to handle.

    for c in cat_cols:
        if c in df.columns:
            df[c] = df[c].astype("object").fillna("Unknown")
    return df

## 3-band decision

In [32]:
def three_band_decision(prob: float, policy: dict, labels=("REJECT","REVIEW","APPROVE")) -> str:
    """
    3-band if both thresholds exist; else 2-band ( REVIEW / APPROVE ) using thr_review only.

    Args:
        prob: predicted probability of default.
        policy: dict with "thr_review" (required) and "thr_reject" (optional / None).
        labels: the (reject, review, approve) labels to return.

    Returns:
        labels[0] when prob >= thr_reject (if set), labels[1] when
        prob >= thr_review, otherwise labels[2].

    Raises:
        ValueError: if the policy has no "thr_review", or if ``prob`` is NaN.
            A NaN fails every ``>=`` test and would otherwise fall through
            to APPROVE.
    """
    thr_reject = policy.get("thr_reject")
    thr_review = policy.get("thr_review")
    if thr_review is None:
        raise ValueError("policy must define 'thr_review'")

    score = float(prob)
    if np.isnan(score):
        raise ValueError("prob is NaN; cannot derive a decision")

    if thr_reject is not None and score >= float(thr_reject):
        return labels[0]  # REJECT
    if score >= float(thr_review):
        return labels[1]  # REVIEW
    return labels[2]      # APPROVE



## Scoring + decision + explanation

In [33]:
def score_loan(payload: dict, policy_override: dict | None = None):
    """
    Score a single application and turn the score into a policy decision.

    The payload is normalized, scored with the loaded model, and banded with
    either the loaded POLICY or, when given, ``policy_override`` (intended for
    testing). The result dict holds: prob_default, decision, policy_source,
    thresholds_used and reasons_top. reasons_top is best-effort: the eight
    largest absolute SHAP contributions, or a single note entry if anything
    in the explanation step fails.
    """
    features = normalize_payload(payload)
    default_prob = float(best_model.predict_proba(features)[:,1][0])

    using_override = policy_override is not None
    active_policy = policy_override if using_override else POLICY

    def _rounded_cut(key):
        # Report a threshold rounded to 6 decimals, or None when it is not set.
        cut = active_policy.get(key)
        return None if cut is None else round(float(cut), 6)

    decision = three_band_decision(default_prob, active_policy)
    result = {
        "prob_default": round(default_prob, 6),
        "decision": decision,
        "policy_source": "override" if using_override else active_policy.get("source"),
        "thresholds_used": {
            "thr_reject": _rounded_cut("thr_reject"),
            "thr_review": _rounded_cut("thr_review"),
        },
    }

    # Explanations (best-effort): any failure is reported in the result, not raised.
    try:
        import shap
        preprocessor = best_model.named_steps["pre"]
        classifier = best_model.named_steps["clf"]

        transformed = preprocessor.transform(features)
        if hasattr(transformed, "toarray"):
            # densify sparse output before handing it to the explainer
            transformed = transformed.toarray()

        # assumes the second transformer of "pre" is the categorical encoder — TODO confirm
        if len(cat_cols_meta):
            encoder = preprocessor.transformers_[1][1]
        else:
            encoder = None
        if encoder is not None:
            encoded_names = list(encoder.get_feature_names_out(cat_cols_meta))
        else:
            encoded_names = []
        names = num_cols_meta + encoded_names

        shap_values = shap.TreeExplainer(classifier).shap_values(transformed)
        # a 2-D result is (rows, features); take the only row
        row_values = shap_values[0] if getattr(shap_values, "ndim", 1) == 2 else shap_values
        strongest = np.argsort(-np.abs(row_values))[:8]
        result["reasons_top"] = [
            {"feature": names[i], "shap": float(row_values[i])} for i in strongest
        ]
    except Exception as err:
        result["reasons_top"] = [{"note": f"SHAP unavailable: {err}"}]

    return result

## Example usage

In [34]:
# Hand-built example payloads in the raw input format accepted by
# normalize_payload (percent strings, "NN months" terms, text employment
# lengths). Each is named after the decision band it is intended to hit.

# Low-risk profile: intended APPROVE candidate.
TEST_APPROVE = {
    "loan_amnt": 8000,
    "int_rate": "7.5%",
    "fico_range_low": 780, "fico_range_high": 784,
    "annual_inc": 120000,
    "dti": 6.0,
    "revol_util": "5%",
    "emp_length": "10+ years",
    "term": "36 months",
    "grade": "A", "sub_grade": "A1",
    "home_ownership": "MORTGAGE",
    "verification_status": "Not Verified",
    "purpose": "credit_card",
}

# Mid-risk profile: intended REVIEW candidate.
TEST_REVIEW = {
    "loan_amnt": 150000,
    "int_rate": "20.8%",
    "fico_range_low": 690, "fico_range_high": 694,
    "annual_inc": 55000,
    "dti": 12.0,
    "revol_util": "55%",
    "emp_length": "3 years",
    "term": "60 months",
    "grade": "E", "sub_grade": "E3",
    "home_ownership": "RENT",
    "verification_status": "Source Verified",
    "purpose": "debt_consolidation",
}

# High-risk profile: intended REJECT candidate.
TEST_REJECT = {
    "loan_amnt": 35000,
    "int_rate": "26.5%",
    "fico_range_low": 660, "fico_range_high": 664,
    "annual_inc": 30000,
    "dti": 39.0,
    "revol_util": "97%",
    "emp_length": "< 1 year",
    "term": "60 months",
    "grade": "G", "sub_grade": "G4",
    "home_ownership": "RENT",
    "verification_status": "Verified",
    "purpose": "small_business",
}

# Show which thresholds (and which source) are in effect before scoring.
print("\n--- Policy loaded ---")
print(POLICY)

# Case label -> payload; labels state the expected band, not a guaranteed outcome.
cases = {
    "APPROVE_candidate": TEST_APPROVE,
    "REVIEW_candidate":  TEST_REVIEW,
    "REJECT_candidate":  TEST_REJECT,
}

# Score every case with the loaded policy (no override) and keep the decision
# and probability per case for later inspection.
print("\n=== Decisions with REAL policy ===")
decisions = {}
probs = {}
for name, payload in cases.items():
    out = score_loan(payload)
    decisions[name] = out["decision"]
    probs[name] = out["prob_default"]
    print(f"\n{name} -> {out}")



--- Policy loaded ---
{'thr_reject': 0.7334313988685608, 'thr_review': 0.632895827293396, 'source': 'best_model_policy.json'}

=== Decisions with REAL policy ===

APPROVE_candidate -> {'prob_default': 0.160755, 'decision': 'APPROVE', 'policy_source': 'best_model_policy.json', 'thresholds_used': {'thr_reject': 0.733431, 'thr_review': 0.632896}, 'reasons_top': [{'feature': 'grade_A', 'shap': -0.5587438941001892}, {'feature': 'grade_G', 'shap': 0.5519490838050842}, {'feature': 'sub_grade_A3', 'shap': -0.31447115540504456}, {'feature': 'fico_range_low', 'shap': -0.2961795926094055}, {'feature': 'emp_length_num', 'shap': -0.26404523849487305}, {'feature': 'sub_grade_E3', 'shap': -0.2569412589073181}, {'feature': 'sub_grade_B1', 'shap': -0.2229282557964325}, {'feature': 'verification_status_Not Verified', 'shap': -0.21282769739627838}]}

REVIEW_candidate -> {'prob_default': 0.671523, 'decision': 'REVIEW', 'policy_source': 'best_model_policy.json', 'thresholds_used': {'thr_reject': 0.733431,