In [1]:
import gzip
import json
import os
import pickle
from typing import List, Tuple

import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    balanced_accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler

In [2]:
# Output artifacts: the gzip-compressed pickled model and the metrics file.
MODEL_FILENAME = "../files/models/model.pkl.gz"
METRICS_FILENAME = "../files/output/metrics.json"

# Zipped CSV inputs; both live in the same directory, resolved relative to
# the current working directory.
_INPUT_DIR = os.path.join("..", "files", "input")
TRAIN_PATH = os.path.join(_INPUT_DIR, "train_data.csv.zip")
TEST_PATH = os.path.join(_INPUT_DIR, "test_data.csv.zip")

In [3]:
# 1. CARGA
def load_raw_data() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read the compressed training and test CSV files.

    Returns:
        A ``(train, test)`` pair of DataFrames read from ``TRAIN_PATH`` and
        ``TEST_PATH`` respectively.

    Raises:
        FileNotFoundError: If either input file does not exist.
    """
    sources = (TRAIN_PATH, TEST_PATH)

    # Check both locations before reading anything, training file first.
    for path in sources:
        if not os.path.exists(path):
            raise FileNotFoundError(f"No encuentro {path}")

    train_frame, test_frame = (pd.read_csv(path) for path in sources)
    return train_frame, test_frame

In [4]:
# 1. LIMPIEZA

def clean_dataset(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a cleaned copy of a raw dataset (Step 1).

    Transformations, applied in this order:
    - Rename "default payment next month" to "default" (if present).
    - Drop the "ID" column (if present).
    - Collapse EDUCATION values greater than 4 into 4 ("others").
    - Drop rows whose MARRIAGE or EDUCATION value is 0.
    - Drop rows containing any missing value and renumber the index from 0.

    Every column-specific step is skipped when its column is absent, so a
    frame that lacks MARRIAGE or EDUCATION is still cleaned instead of
    raising.

    Args:
        df: Raw dataset. It is never modified.

    Returns:
        A new DataFrame with the transformations applied.
    """
    # Work on a copy: the EDUCATION assignment below writes in place, and
    # without the copy it would alter the caller's frame whenever neither the
    # rename nor the drop had already produced a new object.
    cleaned = df.copy()

    # Rename the target variable.
    if "default payment next month" in cleaned.columns:
        cleaned = cleaned.rename(columns={"default payment next month": "default"})

    # Remove the identifier column.
    if "ID" in cleaned.columns:
        cleaned = cleaned.drop(columns=["ID"])

    # EDUCATION > 4 -> 4
    if "EDUCATION" in cleaned.columns:
        cleaned.loc[cleaned["EDUCATION"] > 4, "EDUCATION"] = 4

    # Filter zero-coded rows column by column, each guarded by its own
    # presence check so one missing column cannot break or skip the other.
    for column in ("MARRIAGE", "EDUCATION"):
        if column in cleaned.columns:
            cleaned = cleaned[cleaned[column] != 0]

    # Remove rows with missing values.
    return cleaned.dropna(axis=0, how="any").reset_index(drop=True)


In [5]:
def split_xy(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
    """Split a cleaned dataset into features and target (Step 2).

    Args:
        df: Dataset containing a "default" column.

    Returns:
        ``(X, y)`` where ``X`` is ``df`` without the "default" column and
        ``y`` is that column cast to ``int``.

    Raises:
        RuntimeError: If the "default" column is missing.
    """
    target_column = "default"

    if target_column in df.columns:
        labels = df[target_column].astype(int)
        features = df.drop(columns=[target_column])
        return features, labels

    raise RuntimeError('La columna objetivo "default" no existe en el dataset.')

In [6]:
# 2. PIPELINE + GRID SEARCH
# ---------------------------------------------------------------------
def build_model(x_train: pd.DataFrame) -> GridSearchCV:
    """
    Assemble the pipeline and wrap it in a grid search (Steps 3 and 4).

    Pipeline stages:
      - ColumnTransformer: OneHotEncoder on the categorical columns and
        MinMaxScaler on every other column.
      - SelectKBest (``f_classif``) to keep the K best features.
      - LogisticRegression as the final estimator.

    Args:
        x_train: Training features; only its column names are inspected here
            to decide which columns are categorical and which are numeric.

    Returns:
        An unfitted GridSearchCV over ``select__k`` and ``logreg__C`` with
        ``cv=10`` and balanced accuracy as the score.
    """
    # Categorical columns: whichever of the known ones are actually present.
    known_categoricals = ("SEX", "EDUCATION", "MARRIAGE")
    categorical_features: List[str] = [
        name for name in known_categoricals if name in x_train.columns
    ]

    # Everything else is treated as numeric.
    numeric_features = [
        name for name in x_train.columns if name not in categorical_features
    ]

    encode_step = ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features)
    scale_step = ("num", MinMaxScaler(), numeric_features)

    # Standard logistic regression (no class weighting).
    classifier = LogisticRegression(
        max_iter=5000,
        solver="lbfgs",
        n_jobs=-1,
        class_weight=None,
    )

    # The selector sits after the preprocessor, so K counts columns of the
    # transformed matrix (one-hot columns plus scaled numeric columns).
    steps = [
        ("preprocessor", ColumnTransformer(transformers=[encode_step, scale_step])),
        ("select", SelectKBest(score_func=f_classif)),
        ("logreg", classifier),
    ]

    # Hyperparameter grid: number of selected features (K) and the
    # regularization parameter C. Adjust these values if the scores fall short.
    search_space = {
        "select__k": [1, 10, 13],
        "logreg__C": [0.1, 1.0, 10.0, 100.0],
    }

    return GridSearchCV(
        estimator=Pipeline(steps=steps, verbose=False),
        param_grid=search_space,
        cv=10,
        scoring="balanced_accuracy",
        n_jobs=-1,
        refit=True,
        verbose=0,
    )


In [7]:
# 3. MÉTRICAS
# ---------------------------------------------------------------------
def compute_classification_metrics(y_true, y_pred, dataset_name: str) -> dict:
    """Build the metrics record for one dataset (Step 6).

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        dataset_name: Value stored under the "dataset" key (e.g. "train").

    Returns:
        A dict with "type", "dataset", "precision", "balanced_accuracy",
        "recall" and "f1_score", the scores converted to plain floats.
        Precision, recall and F1 are computed with ``zero_division=0``.
    """
    record = {"type": "metrics", "dataset": dataset_name}

    # Keys are added in the same order the record is later serialized.
    record["precision"] = float(precision_score(y_true, y_pred, zero_division=0))
    record["balanced_accuracy"] = float(balanced_accuracy_score(y_true, y_pred))
    record["recall"] = float(recall_score(y_true, y_pred, zero_division=0))
    record["f1_score"] = float(f1_score(y_true, y_pred, zero_division=0))

    return record


In [8]:
def compute_confusion_matrix_dict(y_true, y_pred, dataset_name: str) -> dict:
    """Build the confusion-matrix record for one dataset (Step 7).

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        dataset_name: Value stored under the "dataset" key (e.g. "train").

    Returns:
        A dict with "type", "dataset", and one nested dict per true class
        ("true_0", "true_1") holding the "predicted_0" / "predicted_1" counts
        as plain ints.
    """
    # labels=[0, 1] fixes the layout: rows are true classes, columns are
    # predicted classes, both ordered 0 then 1.
    matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])
    (true_neg, false_pos), (false_neg, true_pos) = matrix.tolist()

    row_for_true_0 = {"predicted_0": int(true_neg), "predicted_1": int(false_pos)}
    row_for_true_1 = {"predicted_0": int(false_neg), "predicted_1": int(true_pos)}

    return {
        "type": "cm_matrix",
        "dataset": dataset_name,
        "true_0": row_for_true_0,
        "true_1": row_for_true_1,
    }


In [9]:
# 4. GUARDAR MODELO Y MÉTRICAS
# ---------------------------------------------------------------------
def save_model(model) -> None:
    """Pickle ``model`` into the gzip file at ``MODEL_FILENAME``.

    The parent directory is created first if it does not exist yet.

    Args:
        model: The object to serialize (the fitted estimator in this file).
    """
    target_dir = os.path.dirname(MODEL_FILENAME)
    os.makedirs(target_dir, exist_ok=True)

    with gzip.open(MODEL_FILENAME, "wb") as archive:
        pickle.dump(model, archive)


def save_metrics(records) -> None:
    """
    Write ``records`` to ``METRICS_FILENAME``, one JSON object per line.

    The parent directory is created first if it does not exist yet, and any
    existing file is overwritten.

    Args:
        records: List of dicts. Order expected by the test:
          0 -> train metrics
          1 -> test metrics
          2 -> train confusion matrix
          3 -> test confusion matrix
    """
    output_dir = os.path.dirname(METRICS_FILENAME)
    os.makedirs(output_dir, exist_ok=True)

    with open(METRICS_FILENAME, "w", encoding="utf-8") as handle:
        handle.writelines(json.dumps(record) + "\n" for record in records)

In [10]:
# 5. MAIN
# ---------------------------------------------------------------------
def main() -> None:
    """Run the whole workflow: load, clean, fit, persist and evaluate."""
    # Step 1: load and clean.
    raw_train, raw_test = load_raw_data()
    clean_train = clean_dataset(raw_train)
    clean_test = clean_dataset(raw_test)

    # Step 2: split into features and target.
    x_train, y_train = split_xy(clean_train)
    x_test, y_test = split_xy(clean_test)

    # Steps 3 and 4: pipeline + grid search.
    model = build_model(x_train)
    model.fit(x_train, y_train)

    # Step 5: persist the fitted model.
    save_model(model)

    # Predictions for both splits, keyed by dataset name; the insertion order
    # (train, then test) drives the order of the records below.
    evaluated = {
        "train": (y_train, model.predict(x_train)),
        "test": (y_test, model.predict(x_test)),
    }

    # Step 6: classification metrics.
    metric_records = [
        compute_classification_metrics(truth, predicted, name)
        for name, (truth, predicted) in evaluated.items()
    ]

    # Step 7: confusion matrices.
    matrix_records = [
        compute_confusion_matrix_dict(truth, predicted, name)
        for name, (truth, predicted) in evaluated.items()
    ]

    # Write everything to the metrics file: metrics first, then matrices.
    save_metrics(metric_records + matrix_records)


# Entry point: run the whole pipeline only when this code is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()