
## Item 2

In [1]:
import numpy as np
import pandas as pd
import yaml
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from joblib import Parallel, delayed

Se procede a cargar el dataset.

In [2]:
def load_dataset():
    data = fetch_california_housing()
    return data

Se sigue con el preprocesamiento, que asegura que los datos estén en una forma adecuada para ser alimentados a los modelos. Se escalan las características, ya que los modelos basados en gradientes (como SVM y Regresión Logística) se benefician de tener datos escalados.

In [3]:
def make_preprocessor(df, target, cfg):
    y = df[target]
    X = df.drop(columns=[target])

    num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in X.columns if c not in num_cols]

    transformers = []

    if num_cols:
        steps_num = [("imputer", SimpleImputer(strategy=cfg["impute_strategy_num"]))]
        if cfg["scale_numeric"]:
            steps_num.append(("scaler", StandardScaler()))
        transformers.append(("num", Pipeline(steps=steps_num), num_cols))

    if cat_cols and cfg["one_hot_encode"]:
        transformers.append(("cat", Pipeline(steps=[
                                 ("imputer", SimpleImputer(strategy=cfg["impute_strategy_cat"])),
                                 ("ohe", OneHotEncoder(handle_unknown="ignore", sparse=False))
                             ]), cat_cols))
    pre = ColumnTransformer(transformers=transformers)
    return X, y, pre

La función build_sgd_classifier se encarga de crear un modelo de SGDClassifier configurado con los hiperparámetros proporcionados en el archivo YAML. Usa partial_fit para inicializar el modelo y permitir que se entrene de manera incremental. Es ideal para entrenar el modelo de manera eficiente en un entorno de aprendizaje en línea o entrenamiento por lotes.

In [4]:
def build_sgd_classifier(cfg, classes_):
    clf = SGDClassifier(
        loss=cfg["loss"],
        learning_rate=cfg["learning_rate"],
        eta0=cfg["eta0"],
        alpha=cfg.get("alpha", 0.0001),
        penalty=cfg.get("penalty", "l2"),
        random_state=cfg.get("seed", 42),
        early_stopping=False,      
        warm_start=True,
        max_iter=1,                
        tol=None
    )
    clf.partial_fit(np.zeros((1, classes_.shape[0])), classes=classes_)
    return clf


Luego se entrena el modelo SGDClassifier por una única época usando lotes de tamaño configurable. Es una forma eficiente de entrenar el modelo de manera incremental, lo cual es útil cuando estás ajustando varios modelos con diferentes configuraciones o en datasets grandes.

In [5]:
def train_one_epoch(clf, X_train, y_train, batch_size=256, shuffle=True, classes_=None):
    n = X_train.shape[0]
    idx = np.arange(n)
    if shuffle:
        np.random.shuffle(idx)
    for start in range(0, n, batch_size):
        end = min(start + batch_size, n)
        batch_idx = idx[start:end]
        clf.partial_fit(X_train[batch_idx], y_train[batch_idx], classes=classes_)
    return clf

Después, se evalúa el desempeño de un modelo durante el entrenamiento, basándose en métricas específicas. El resultado de esta evaluación se utiliza para decidir qué configuraciones de modelo deben ser descartadas (culling) durante el proceso de entrenamiento.

In [6]:
def score_for_culling(model_type, clf, X, y, scoring="neg_mean_squared_error"):
    if scoring == "neg_mean_squared_error":
        y_pred = clf.predict(X)
        return -mean_squared_error(y, y_pred)
    elif scoring == "r2":
        y_pred = clf.predict(X)
        return r2_score(y, y_pred)
    else:
        return r2_score(y, clf.predict(X))

Luego se entrenar un solo modelo por una época utilizando un lote de datos (batch), para luego devolver el modelo entrenado junto con su nombre. Es una función auxiliar que se usa en procesos de entrenamiento en paralelo, especialmente cuando entrenamos múltiples modelos con diferentes configuraciones de hiperparámetros.

In [7]:
def _train_epoch_job(model_pack, X_train, y_train, batch_size, shuffle, classes_):
    name, clf = model_pack["name"], model_pack["model"]
    clf = train_one_epoch(clf, X_train, y_train, batch_size=batch_size, shuffle=shuffle, classes_=classes_)
    return {"name": name, "model": clf}


La siguiente función entrena múltiples configuraciones de modelos en paralelo y realiza un proceso de "culling" (eliminación de modelos con peor desempeño) cada cierto número de épocas. El objetivo es mantener solo las mejores configuraciones de modelos y entrenarlas hasta el final.

In [8]:
def train_with_culling(
    model_type, configs, X_train, y_train, classes_,
    batch_size=256, shuffle=True, max_epochs=50, eval_every=5, scoring="neg_mean_squared_error", n_jobs=-1):
    active = []
    for cfg in configs:
        clf = build_sgd_classifier(cfg, classes_)
        active.append({"name": cfg["name"], "cfg": cfg, "model": clf, "history": []})

    epoch = 0
    while epoch < max_epochs and len(active) > 1:
        results = Parallel(n_jobs=n_jobs)(delayed(_train_epoch_job)(m, X_train, y_train, batch_size, shuffle, classes_) for m in active)
        for i, res in enumerate(results):
            active[i]["model"] = res["model"]
        epoch += 1
        if epoch % eval_every == 0:
            scores = []
            for m in active:
                s = score_for_culling(model_type, m["model"], X_train, y_train, scoring=scoring)
                m["history"].append({"epoch": epoch, "score": s})
                scores.append((m["name"], s))
            worst = min(scores, key=lambda x: x[1])
            active = [m for m in active if m["name"] != worst[0]]
    
    active.sort(key=lambda m: m["history"][-1]["score"], reverse=True)
    return active

La siguiente función se encarga de evaluar el desempeño de un modelo utilizando el conjunto de test (datos que el modelo no ha visto durante el entrenamiento). Calcula dos métricas comunes en  regresión, MSE (Error Cuadrático Medio) y R² (Coeficiente de Determinación).

In [9]:
def test_metrics(clf, X_test, y_test):
    y_pred = clf.predict(X_test)
    return {
        "MSE": mean_squared_error(y_test, y_pred),
        "R2": r2_score(y_test, y_pred)
    }

Seguido de esto, se comienza la función main.

In [None]:
Se carga yaml y datos a utilizar, preprocesando los datos.

In [12]:
def main():
    with open("configs/experiment.yaml", "r") as f:
        cfg = yaml.safe_load(f)

    data = load_dataset()
    X = data.data
    y = data.target
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    classes_ = np.unique(y_train)