<a href="https://colab.research.google.com/github/E-DOR28/machine-learning-class/blob/main/Incorporating_restrictions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **🔧 Instalación e imports**

In [21]:
!pip -q install lightgbm xgboost fairlearn

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, accuracy_score
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
np.random.seed(42)

# Utilidad: generador de dataset sintético de crédito
def generar_dataset_credito(n=8000):
    """
    Dataset sintético con rasgos típicos de originación de crédito:
    - ingreso_mensual (COP)
    - ratio_deuda (0-1)
    - utilizacion_tc (0-1)
    - edad (años)
    - genero (0=f, 1=m) -- atributo sensible para fairness
    Targets:
    - default (binario)
    - loss_12m (pérdida continua en 12 meses, para cuantil/VaR)
    """
    ingreso = np.random.lognormal(mean=np.log(2.5e6), sigma=0.5, size=n)   # media ~2.5M COP
    ratio_deuda = np.clip(np.random.beta(2, 5, size=n), 0, 1)
    utiliz = np.clip(np.random.beta(1.5, 4, size=n), 0, 1)
    edad = np.random.normal(38, 8, size=n).clip(21, 70)
    genero = np.random.binomial(1, 0.5, size=n)  # 0/1 con 50-50

    # Logit de default: ingreso ↓ riesgo, deuda/utilización ↑ riesgo, edad leve efecto
    logit = (
        -4.0
        - 0.0000008 * ingreso
        + 2.2 * ratio_deuda
        + 1.6 * utiliz
        + 0.02 * (edad - 40)  # leve
        + 0.25 * genero       # sesgo indeseado (para fairness)
    )
    p_default = 1 / (1 + np.exp(-logit))
    default = np.random.binomial(1, p_default)

    # Pérdida 12m (continua). Mayor si hay default, depende de deuda/utilización/ingreso.
    base_loss = (300_000 + 1_200_000 * ratio_deuda + 900_000 * utiliz) * (1 + 0.0000003 * (5e6 - ingreso).clip(min=0))
    noise = np.random.lognormal(mean=np.log(100_000), sigma=0.6, size=n)
    loss_12m = (default * (base_loss + noise)).astype(float)

    df = pd.DataFrame({
        'ingreso_mensual': ingreso,
        'ratio_deuda': ratio_deuda,
        'utilizacion_tc': utiliz,
        'edad': edad,
        'genero': genero,
        'default': default,
        'loss_12m': loss_12m
    })
    return df

# Creamos un dataset global para reusar en los ejemplos
df = generar_dataset_credito(n=10000)
train, test = train_test_split(df, test_size=0.25, random_state=42, stratify=df['default'])
X_cols = ['ingreso_mensual', 'ratio_deuda', 'utilizacion_tc', 'edad']

print("\nVista previa (df.head(20)):")
display(df.head(20))



Vista previa (df.head(20)):


Unnamed: 0,ingreso_mensual,ratio_deuda,utilizacion_tc,edad,genero,default,loss_12m
0,3204794.0,0.188077,0.266305,44.400555,0,0,0.0
1,2333008.0,0.170389,0.202144,32.058653,1,0,0.0
2,3456080.0,0.511379,0.200332,41.0185,0,0,0.0
3,5353795.0,0.318793,0.590051,43.152315,0,0,0.0
4,2223792.0,0.384439,0.21486,30.433007,0,0,0.0
5,2223811.0,0.085098,0.195277,42.554367,0,0,0.0
6,5506323.0,0.461075,0.163688,34.026822,0,0,0.0
7,3669326.0,0.326354,0.065519,36.238515,1,0,0.0
8,1976947.0,0.061166,0.107352,45.812179,1,0,0.0
9,3279106.0,0.401143,0.197087,30.662996,1,0,0.0


# **1) Restricción de Monotonía (Crédito)**

Caso de uso (finanzas): En scoring de crédito de consumo, el riesgo de default no debe aumentar cuando el ingreso del cliente aumenta, manteniendo lo demás constante. Además, el riesgo debe aumentar cuando el ratio de deuda o la utilización de la tarjeta suben.

## 1.A) Monotonía con LightGBM (nativa)

In [18]:
import lightgbm as lgb
from sklearn.metrics import roc_auc_score

X_train = train[X_cols].copy()
y_train = train['default'].values
X_test = test[X_cols].copy()
y_test = test['default'].values

# Definimos la dirección de monotonía por columna en el mismo orden de X_cols:
# ingreso_mensual: -1  (a mayor ingreso, MENOR prob de default)
# ratio_deuda:     +1  (a mayor ratio de deuda, MAYOR prob de default)
# utilizacion_tc:  +1  (a mayor utilización, MAYOR prob de default)
# edad:            +1  (supongamos leve aumento de riesgo con edad)
monotone = [-1, +1, +1, +1]

lgb_clf = lgb.LGBMClassifier(
    objective='binary',
    n_estimators=400,
    learning_rate=0.05,
    max_depth=-1,
    monotone_constraints=monotone,
    random_state=42
)
lgb_clf.fit(X_train, y_train)

# Evaluación rápida
proba = lgb_clf.predict_proba(X_test)[:,1]
print("AUC con monotonía:", roc_auc_score(y_test, proba))

# Verificación práctica de monotonía respecto a ingreso para una fila fija:
def chequear_monotonia_ingreso(modelo, fila_idx=0):
    base = X_test.iloc[[fila_idx]].copy()
    ingresos = np.linspace(X_test['ingreso_mensual'].quantile(0.05),
                           X_test['ingreso_mensual'].quantile(0.95), 25)
    preds = []
    for inc in ingresos:
        base_mod = base.copy()
        base_mod['ingreso_mensual'] = inc
        preds.append(modelo.predict_proba(base_mod)[:,1][0])
    # Debe ser NO creciente si la restricción se cumple (monotone -1)
    violaciones = np.sum(np.diff(preds) > 1e-8)
    return ingresos, preds, violaciones

ingresos, preds, viol = chequear_monotonia_ingreso(lgb_clf, fila_idx=0)
print(f"Violaciones de monotonía vs ingreso (esperado 0): {viol}")


[LightGBM] [Info] Number of positive: 91, number of negative: 7409
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000423 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1020
[LightGBM] [Info] Number of data points in the train set: 7500, number of used features: 4
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.012133 -> initscore=-4.399591
[LightGBM] [Info] Start training from score -4.399591
AUC con monotonía: 0.7776653171390013
Violaciones de monotonía vs ingreso (esperado 0): 0


## 1.B) Manual (posprocesado monotónico por cumulative-min sobre la secuencia ordenada)

Entrenamos un modelo sin restricciones y luego forzamos que, al ordenar por ingreso, las predicciones sean no crecientes aplicando un “cumulative min” (idea simple emparentada con PAV).

In [19]:
from xgboost import XGBClassifier

# Modelo base sin restricciones
xgb = XGBClassifier(
    max_depth=4, n_estimators=400, learning_rate=0.05, subsample=0.9,
    colsample_bytree=0.9, objective='binary:logistic', random_state=42, eval_metric='logloss'
)
xgb.fit(X_train, y_train)
proba_base = xgb.predict_proba(X_test)[:,1]
print("AUC base (sin restricción):", roc_auc_score(y_test, proba_base))

# 1) Ordenamos por ingreso y tomamos las probabilidades
order = np.argsort(X_test['ingreso_mensual'].values)
proba_sorted = proba_base[order]

# 2) Enforce no-increasing: cada valor no puede superar al anterior
proba_sorted_monotone = proba_sorted.copy()
for i in range(1, len(proba_sorted_monotone)):
    if proba_sorted_monotone[i] > proba_sorted_monotone[i-1]:
        proba_sorted_monotone[i] = proba_sorted_monotone[i-1]

# 3) Restauramos el orden original
proba_monotone = np.empty_like(proba_base)
proba_monotone[order] = proba_sorted_monotone

print("AUC tras posprocesado monotónico:", roc_auc_score(y_test, proba_monotone))

# Chequeo de violaciones tras posprocesado (debe ser 0)
violaciones = 0
prev = -1
# NOTA: como forzamos no-creciente, revisamos directamente secuencia ordenada:
for i in range(1, len(proba_sorted_monotone)):
    if proba_sorted_monotone[i] - proba_sorted_monotone[i-1] > 1e-8:
        violaciones += 1
print("Violaciones tras forzar monotonía (esperado 0):", violaciones)


AUC base (sin restricción): 0.7083940620782725
AUC tras posprocesado monotónico: 0.7248110661268554
Violaciones tras forzar monotonía (esperado 0): 0


# **2) Percentiles / Cuantiles (Riesgo / Capital económico)**

Caso de uso (finanzas): Estimar el VaR 95% (cuantil 0.95) de la pérdida a 12 meses (loss_12m) para portafolio de tarjetas o microcrédito. El modelo debe respetar un cuantil objetivo (no “sobrepasarlo” sistemáticamente).

## 2.A) Con librería (Quantile Regression vía Gradient Boosting)

In [11]:
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_pinball_loss

Xr_train = train[X_cols].copy()
yr_train = train['loss_12m'].values
Xr_test  = test[X_cols].copy()
yr_test  = test['loss_12m'].values

# Objetivo cuantil (VaR 95%)
alpha = 0.95
gbr_q = GradientBoostingRegressor(loss='quantile', alpha=alpha, n_estimators=500, learning_rate=0.05, random_state=42)
gbr_q.fit(Xr_train, yr_train)

yhat_q95 = gbr_q.predict(Xr_test)

# Métrica pinball (debería ser baja si el cuantil está bien ajustado)
print("Pinball loss (alpha=0.95):", mean_pinball_loss(yr_test, yhat_q95, alpha=alpha))

# Chequeo de cobertura empírica: % de observaciones <= cuantil predicho (aprox. 95% esperado)
coverage = np.mean(yr_test <= yhat_q95)
print(f"Cobertura empírica ~ {coverage:.3f} (ideal ~ {alpha})")


Pinball loss (alpha=0.95): 24850.12845472449
Cobertura empírica ~ 0.988 (ideal ~ 0.95)


## 2.B) Cuantiles manuales (pinball loss + GD) para un modelo lineal

In [12]:
from sklearn.preprocessing import StandardScaler

alpha = 0.95
Xr = Xr_train.copy()
yr = yr_train.copy()

scaler = StandardScaler()
Xr_s = scaler.fit_transform(Xr)  # estandarizamos para estabilizar el GD

# Inicializamos pesos para modelo lineal: y_hat = Xw + b
w = np.zeros(Xr_s.shape[1])
b = 0.0

lr = 0.05     # tasa de aprendizaje
epochs = 400  # iteraciones

for ep in range(epochs):
    # Predicción
    yhat = Xr_s @ w + b

    # Residuo para pinball: r = y - yhat
    r = yr - yhat

    # Subgradiente dL/dyhat:
    # si r > 0 -> dL/dyhat = -alpha
    # si r < 0 -> dL/dyhat = 1 - alpha
    # si r = 0 -> cualquier valor entre [-alpha, 1-alpha]; usamos 0
    g = np.where(r > 0, -alpha, (1 - alpha))
    g[r == 0] = 0.0

    # Gradientes respecto a w y b (promedio)
    grad_w = (g[:, None] * Xr_s).mean(axis=0) * 2.0
    grad_b = g.mean() * 2.0

    # Paso de descenso de gradiente
    w -= lr * grad_w
    b -= lr * grad_b

# Inferencia en test
Xr_test_s = scaler.transform(Xr_test)
yhat_q95_lin = Xr_test_s @ w + b

# Cobertura empírica del cuantil
coverage_lin = np.mean(yr_test <= yhat_q95_lin)
print(f"Cobertura empírica (lineal-GD) ~ {coverage_lin:.3f} (ideal ~ {alpha})")

Cobertura empírica (lineal-GD) ~ 0.988 (ideal ~ 0.95)


# **3) Fairness / Equidad (Aprobación de crédito)**

Caso de uso (finanzas): En un clasificador de aprobación (aprobado/rechazado), queremos que la tasa de aprobación sea similar entre grupos sensibles (p. ej., género), salvo justificación de negocio/regulatoria. Aquí mostramos Demographic Parity (paridad de tasas de selección).

## 3.A) Fairness con Fairlearn (Demographic Parity)

In [16]:
# @title 3.A) Fairness con Fairlearn (Demographic Parity) — versión corregida
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, accuracy_score
from fairlearn.reductions import ExponentiatedGradient, DemographicParity

np.random.seed(42)  # reproducibilidad global

# Entradas (reutiliza tus variables ya creadas)
X_f_train = train[X_cols].copy()
y_f_train = train['default'].values
A_train   = train['genero'].values  # atributo sensible (0=f, 1=m)

X_f_test  = test[X_cols].copy()
y_f_test  = test['default'].values
A_test    = test['genero'].values

# Clasificador base (controla aquí la reproducibilidad)
base_clf = LogisticRegression(max_iter=1000, solver='liblinear', random_state=42)

# Restricción de Fairness: Paridad Demográfica (tasa de aprobación similar entre grupos)
constraint = DemographicParity()

# ⚠️ Importante: NO pasar random_state aquí (no está soportado)
mitigator = ExponentiatedGradient(
    estimator=base_clf,
    constraints=constraint,
    eps=0.02,              # tolerancia de violación (~2%)
    # T=50, eta0=2.0, ... (dejar por defecto salvo que quieras tunear)
)

# Entrenamiento con atributo sensible
mitigator.fit(X_f_train, y_f_train, sensitive_features=A_train)

# --- Probabilidades "fair" ---
# Algunas versiones exponen _pmf_predict (probabilidad del positivo por mezcla aleatoria).
# Si no existe, usamos un fallback por Monte Carlo con predict().
if hasattr(mitigator, "_pmf_predict"):
    # Matriz [n_samples, 2] con P(y=0), P(y=1); tomamos la col de la clase positiva.
    yhat_prob_fair = mitigator._pmf_predict(X_f_test)[:, 1]
else:
    # Fallback determinístico aproximado a las probabilidades por aleatorización interna
    draws = 200  # sube si quieres mayor suavidad
    acc_prob = np.zeros(len(X_f_test), dtype=float)
    for _ in range(draws):
        acc_prob += mitigator.predict(X_f_test)  # 0/1
    yhat_prob_fair = acc_prob / draws

# Decisión final con umbral 0.5 (puedes calibrar si lo necesitas)
yhat_fair = (yhat_prob_fair >= 0.5).astype(int)

# Métricas: AUC/accuracy y paridad de tasas de aprobación por grupo
auc_fair = roc_auc_score(y_f_test, yhat_prob_fair)
acc_fair = accuracy_score(y_f_test, yhat_fair)

rate_fair_g0 = yhat_fair[A_test == 0].mean()
rate_fair_g1 = yhat_fair[A_test == 1].mean()

print(f"AUC (fair): {auc_fair:.3f} | Accuracy: {acc_fair:.3f}")
print(f"Tasa aprobación g=0: {rate_fair_g0:.3f} | g=1: {rate_fair_g1:.3f} (≈ paridad)")

AUC (fair): 0.500 | Accuracy: 0.988
Tasa aprobación g=0: 0.000 | g=1: 0.000 (≈ paridad)


## 3.B) Manual (umbrales por grupo para Demographic Parity)

Entrenamos un modelo sin fairness y ajustamos umbrales por grupo para igualar la tasa de aprobación.

In [17]:
from sklearn.linear_model import LogisticRegression

# Modelo base sin correcciones
clf = LogisticRegression(max_iter=1000, solver='liblinear', random_state=42)
clf.fit(X_f_train, y_f_train)
score = clf.predict_proba(X_f_test)[:,1]

# 1) Tasa de aprobación global con umbral 0.5 (referencia)
yhat_global = (score >= 0.5).astype(int)
rate_global = yhat_global.mean()
print(f"Tasa global con umbral 0.5: {rate_global:.3f}")

# 2) Elegimos una tasa objetivo común (p. ej., la tasa global con 0.5)
target_rate = rate_global

# 3) Umbral por grupo = cuantil 1 - target_rate del score de cada grupo
thr_g0 = np.quantile(score[A_test==0], 1 - target_rate)
thr_g1 = np.quantile(score[A_test==1], 1 - target_rate)

# 4) Predicción con umbral específico por grupo
yhat_parity = np.zeros_like(y_f_test)
yhat_parity[(A_test==0) & (score >= thr_g0)] = 1
yhat_parity[(A_test==1) & (score >= thr_g1)] = 1

# 5) Métricas y verificación de paridad
auc_base = roc_auc_score(y_f_test, score)
acc_par = accuracy_score(y_f_test, yhat_parity)
rate_g0 = yhat_parity[A_test==0].mean()
rate_g1 = yhat_parity[A_test==1].mean()

print(f"AUC base (sin fairness): {auc_base:.3f}")
print(f"Acc con paridad manual: {acc_par:.3f}")
print(f"Tasa aprobación grupo 0: {rate_g0:.3f} | grupo 1: {rate_g1:.3f} (forzada ≈ {target_rate:.3f})")

Tasa global con umbral 0.5: 0.000
AUC base (sin fairness): 0.728
Acc con paridad manual: 0.987
Tasa aprobación grupo 0: 0.001 | grupo 1: 0.001 (forzada ≈ 0.000)
