# 05 - Inference Smoke Test

Este notebook valida que los modelos campeones (EI, IE, ZE, EZ) cargan correctamente desde `training/reports/model_registry.json` y pueden inferir sobre muestras reales.

Checks principales:
- Carga de `TabularPredictor` por transicion.
- `predict` devuelve el mismo numero de filas que la entrada.
- `predict_proba` devuelve probabilidades validas en `[0, 1]`.
- Exporta un reporte consolidado a `training/reports/inference_smoke_test.csv`.

In [None]:
import json
from pathlib import Path
import pandas as pd
from autogluon.tabular import TabularPredictor

RANDOM_STATE = 42
SAMPLE_SIZE = 300


def find_project_root():
    cwd = Path.cwd().resolve()
    for candidate in [cwd, *cwd.parents]:
        if (candidate / "training").exists() and (candidate / "data").exists():
            return candidate
    raise FileNotFoundError("Could not detect project root containing training/ and data/")


project_root = find_project_root()
registry_path = project_root / "training/reports/model_registry.json"
report_path = project_root / "training/reports/inference_smoke_test.csv"

data_path_map = {
    "EI": "data/data_ei.csv",
    "IE": "data/data_ie.csv",
    "ZE": "data/data_ze.csv",
    "EZ": "data/data_ez.csv",
}

project_root, registry_path, report_path

In [None]:
if not registry_path.exists():
    raise FileNotFoundError(
        f"Registry not found at {registry_path}. Run training/src/model_selection.py first."
    )

with registry_path.open("r", encoding="utf-8") as f:
    registry = json.load(f)

models_cfg = registry.get("models", {})
if not models_cfg:
    raise ValueError("Registry has no models configured under 'models'.")

expected = {"EI", "IE", "ZE", "EZ"}
missing = expected - set(models_cfg.keys())
if missing:
    print(f"Warning: missing transitions in registry: {sorted(missing)}")

registry_df = pd.DataFrame(
    [
        {
            "transition": transition,
            "model_path": cfg.get("path"),
            "registry_f1": cfg.get("f1"),
        }
        for transition, cfg in sorted(models_cfg.items())
    ]
)
registry_df

In [None]:
def normalize_binary(values):
    s = pd.Series(values).copy()
    mapped = s.astype(str).str.strip().str.lower().map(
        {
            "1": 1,
            "0": 0,
            "true": 1,
            "false": 0,
            "yes": 1,
            "no": 0,
        }
    )
    if mapped.isna().any():
        numeric = pd.to_numeric(s, errors="coerce")
        if numeric.notna().all():
            mapped = numeric.astype(int)
    if mapped.isna().any():
        raise ValueError("Could not normalize binary values to 0/1")
    return mapped.astype(int).reset_index(drop=True)


def get_positive_proba(proba_output):
    if isinstance(proba_output, pd.Series):
        return pd.to_numeric(proba_output, errors="coerce").astype(float).reset_index(drop=True)

    if isinstance(proba_output, pd.DataFrame):
        for col in [1, "1", True, "True", "true", "positive", "pos", "label_1"]:
            if col in proba_output.columns:
                return pd.to_numeric(proba_output[col], errors="coerce").astype(float).reset_index(drop=True)

        if proba_output.shape[1] == 2:
            return pd.to_numeric(proba_output.iloc[:, 1], errors="coerce").astype(float).reset_index(drop=True)

        return pd.to_numeric(proba_output.max(axis=1), errors="coerce").astype(float).reset_index(drop=True)

    raise TypeError(f"Unsupported probability output type: {type(proba_output)}")


def sample_transition_dataframe(transition, sample_size=SAMPLE_SIZE, random_state=RANDOM_STATE):
    csv_path = project_root / data_path_map[transition]
    if not csv_path.exists():
        raise FileNotFoundError(f"CSV for {transition} not found at {csv_path}")

    df = pd.read_csv(csv_path)
    feature_cols = [c for c in df.columns if c.startswith("B")]
    if not feature_cols:
        raise ValueError(f"No feature columns (B*) found in {csv_path}")

    X = df[feature_cols].copy()
    y = normalize_binary(df["label"])

    if len(X) > sample_size:
        sample_idx = X.sample(n=sample_size, random_state=random_state).index
        X = X.loc[sample_idx].reset_index(drop=True)
        y = y.loc[sample_idx].reset_index(drop=True)
    else:
        X = X.reset_index(drop=True)
        y = y.reset_index(drop=True)

    return X, y, csv_path


def compute_binary_metrics(y_true, y_pred):
    y_true = normalize_binary(y_true)
    y_pred = normalize_binary(y_pred)

    tp = int(((y_pred == 1) & (y_true == 1)).sum())
    tn = int(((y_pred == 0) & (y_true == 0)).sum())
    fp = int(((y_pred == 1) & (y_true == 0)).sum())
    fn = int(((y_pred == 0) & (y_true == 1)).sum())

    accuracy = (tp + tn) / max(tp + tn + fp + fn, 1)
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = (2 * precision * recall) / max(precision + recall, 1e-12)

    return {
        "accuracy": float(accuracy),
        "precision": float(precision),
        "recall": float(recall),
        "f1": float(f1),
    }

In [None]:
rows = []

for transition, cfg in sorted(models_cfg.items()):
    row = {
        "transition": transition,
        "status": "FAIL",
        "model_path": "",
        "data_csv": "",
        "n_rows": 0,
        "predict_ok": False,
        "proba_ok": False,
        "pred_classes": "",
        "proba_min": None,
        "proba_max": None,
        "proba_mean": None,
        "accuracy": None,
        "precision": None,
        "recall": None,
        "f1_smoke": None,
        "registry_f1": cfg.get("f1"),
        "error": "",
    }

    try:
        if transition not in data_path_map:
            raise KeyError(f"No CSV mapping configured for transition '{transition}'")

        model_path_raw = cfg.get("path")
        if not model_path_raw:
            raise ValueError(f"Model path missing for transition '{transition}'")

        model_path = Path(model_path_raw)
        if not model_path.is_absolute():
            model_path = project_root / model_path

        predictor = TabularPredictor.load(str(model_path))
        X, y_true, csv_path = sample_transition_dataframe(transition)

        preds_raw = predictor.predict(X, as_pandas=True)
        y_pred = normalize_binary(preds_raw)

        proba_raw = predictor.predict_proba(X, as_pandas=True)
        y_score = get_positive_proba(proba_raw)

        predict_ok = len(y_pred) == len(X)
        proba_ok = bool(y_score.notna().all() and ((y_score >= 0) & (y_score <= 1)).all())

        metrics = compute_binary_metrics(y_true, y_pred)

        row.update(
            {
                "status": "PASS" if (predict_ok and proba_ok) else "FAIL",
                "model_path": str(model_path),
                "data_csv": str(csv_path),
                "n_rows": int(len(X)),
                "predict_ok": bool(predict_ok),
                "proba_ok": bool(proba_ok),
                "pred_classes": ",".join(map(str, sorted(y_pred.unique().tolist()))),
                "proba_min": float(y_score.min()),
                "proba_max": float(y_score.max()),
                "proba_mean": float(y_score.mean()),
                "accuracy": metrics["accuracy"],
                "precision": metrics["precision"],
                "recall": metrics["recall"],
                "f1_smoke": metrics["f1"],
                "error": "",
            }
        )
    except Exception as exc:
        row["error"] = repr(exc)

    rows.append(row)

results_df = pd.DataFrame(rows).sort_values("transition").reset_index(drop=True)
results_df

In [None]:
report_path.parent.mkdir(parents=True, exist_ok=True)
results_df.to_csv(report_path, index=False)
print(f"Smoke test report saved to: {report_path}")

summary_cols = [
    "transition",
    "status",
    "n_rows",
    "predict_ok",
    "proba_ok",
    "accuracy",
    "f1_smoke",
    "registry_f1",
    "error",
]
results_df[summary_cols]

if (results_df["status"] != "PASS").any():
    failed = results_df.loc[results_df["status"] != "PASS", "transition"].tolist()
    raise RuntimeError(f"Smoke test failed for transitions: {failed}")

print("Smoke test completed successfully for all transitions.")