# Colab ML Pipeline Agent (LangChain + DeepSeek)

Por: Ricardo Urdaneta


Creamos un agente que, dado un dataset, genera código Python para entrenamiento, evaluación y reporte (AutoML simplificado).

Librerías: **langchain-deepseek** (LLM), **scikit-learn** (ML), **evalcards** (reportes).

In [2]:
# Paso 1: instalación rápida
!pip -q install -U langchain-deepseek langchain scikit-learn evalcards python-dotenv
print("Dependencias instaladas")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.5/9.5 MB[0m [31m79.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.0/76.0 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m449.6/449.6 kB[0m [31m25.8 MB/s[0m eta [36m0:00:00[0m
[?25h✅ Dependencias instaladas


Configuramos **DEEPSEEK_API_KEY** de forma segura y probamos una llamada mínima con ChatDeepSeek.

Espera ver impreso OK; si aparece, la conexión está lista. (DeepSeek es OpenAI-compatible y expone https://api.deepseek.com; el paquete langchain-deepseek simplifica el uso desde LangChain).

In [4]:
# Paso 2: setear API Key y prueba mínima
from getpass import getpass
import os

# Prompt for the DeepSeek key with getpass and store it in this process's
# environment under DEEPSEEK_API_KEY.
os.environ["DEEPSEEK_API_KEY"] = getpass("Ingresa tu DEEPSEEK_API_KEY: ")

# Smoke test: one minimal LangChain + DeepSeek call whose reply is printed.
try:
    # Imported inside the try so an import failure is reported by the handler below.
    from langchain_deepseek import ChatDeepSeek

    # NOTE(review): presumably ChatDeepSeek picks the key up from the
    # environment variable set above; it is not passed explicitly here.
    llm = ChatDeepSeek(model="deepseek-chat", temperature=0.0)
    resp = llm.invoke("Responde exactamente con: OK")
    print(resp.content.strip())
except Exception as e:
    # Best-effort check: report the failure instead of aborting the cell.
    print("Error al conectar con DeepSeek:", e)

🔑 Ingresa tu DEEPSEEK_API_KEY: ··········
OK


Sube tu archivo .csv

La lectura intenta inferir el separador automáticamente.

In [15]:
# Paso 3: Carga de CSV (sube tu archivo)
from google.colab import files
import io, pandas as pd

up = files.upload()  # pick your .csv; the result is indexed by file name below
if not up:
    # Without this guard an empty upload surfaced as a bare StopIteration.
    raise ValueError("No se subió ningún archivo; vuelve a ejecutar la celda y selecciona un .csv.")
fname = next(iter(up))
try:
    # sep=None asks pandas to sniff the delimiter, which needs engine="python".
    # low_memory is a C-engine option: passing it together with the python
    # engine raises ValueError, so the sniffing attempt used to fail every
    # time and the comma fallback below was always taken.
    df = pd.read_csv(io.BytesIO(up[fname]), sep=None, engine="python")
except Exception as sniff_error:
    # Fall back to the default comma separator if sniffing fails, and say so.
    print(f"No se pudo inferir el separador ({sniff_error}); se usa coma por defecto.")
    df = pd.read_csv(io.BytesIO(up[fname]), low_memory=False)

DATASET_NAME = fname
print(f"Cargado: {DATASET_NAME} | Shape: {df.shape}")
display(df.head(3))

Saving train.csv to train (1).csv
Cargado: train (1).csv | Shape: (8693, 14)


Unnamed: 0,PassengerId,HomePlanet,CryoSleep,Cabin,Destination,Age,VIP,RoomService,FoodCourt,ShoppingMall,Spa,VRDeck,Name,Transported
0,0001_01,Europa,False,B/0/P,TRAPPIST-1e,39.0,False,0.0,0.0,0.0,0.0,0.0,Maham Ofracculy,False
1,0002_01,Earth,False,F/0/S,TRAPPIST-1e,24.0,False,109.0,9.0,25.0,549.0,44.0,Juanna Vines,True
2,0003_01,Europa,False,A/0/S,TRAPPIST-1e,58.0,True,43.0,3576.0,0.0,6715.0,49.0,Altark Susent,False


Detectamos:

columnas numéricas y categóricas,

nulos por columna,

columnas “ID-like”,

candidatos a target,

tipo de problema sugerido (classification o regression).

In [16]:
# Paso 4: Sniffing automático
import numpy as np
import re

# Guard: this cell needs the `df` created by the loading cell.
assert 'df' in globals(), "Primero ejecuta la celda de carga para crear df."

# 1) Column types: numeric dtypes vs. everything else (treated as categorical).
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
cat_cols = [c for c in df.columns if c not in num_cols]

# 2) Nulls: per-column missing ratio, highest first; keep the top 10 columns
#    that have at least one missing value.
na_ratio = df.isna().mean().sort_values(ascending=False)
na_top10 = na_ratio[na_ratio > 0].head(10)

# 3) ID-like columns: the name suggests an identifier, or almost every value is distinct.
def looks_like_id(name, series):
    """Return True when a column looks like a row identifier.

    A column qualifies when its lower-cased name contains one of the tokens
    id/uuid/guid/index/idx delimited by underscores or the string boundaries,
    or when more than 98% of its rows hold distinct non-null values.

    Args:
        name: Column label; converted with ``str`` before matching.
        series: Column values (anything exposing ``nunique`` and ``len``).

    Returns:
        bool: whether the column is considered ID-like.
    """
    total_rows = len(series)
    # Guard the denominator so an empty column yields a ratio of 0.
    denominator = total_rows if total_rows > 1 else 1
    distinct_share = series.nunique(dropna=True) / denominator

    normalized_name = str(name).lower()
    if re.search(r"(?:^|_)(id|uuid|guid|index|idx)(?:$|_)", normalized_name) is not None:
        return True
    return distinct_share > 0.98

# Columns flagged as identifiers are excluded from the cardinality-based candidates below.
id_like_cols = [c for c in df.columns if looks_like_id(c, df[c])]

# 4) Target candidates (simple heuristic)
candidate_names = {"target","label","labels","y","class","clase","categoria","price","precio","score","rating"}
# First by name. Labels go through str() (as looks_like_id does) so that
# non-string column labels do not raise AttributeError on .lower().
name_hits = [c for c in df.columns if str(c).lower() in candidate_names]
# Then by position (the last column is often the target in teaching datasets)
pos_hits = [df.columns[-1]] if df.shape[1] > 0 else []
# Then by cardinality (categorical with few categories, or continuous numeric)
card_hits = []
for c in df.columns:
    if c in id_like_cols:
        continue
    try:
        nunq = df[c].nunique(dropna=True)
    except Exception:
        # Best effort: a column whose distinct values cannot be counted is
        # simply not proposed as a candidate.
        continue
    if c in num_cols:
        if nunq >= 20:  # continuous -> good candidate for regression
            card_hits.append(c)
    elif 2 <= nunq <= 50:  # discrete -> good candidate for classification
        card_hits.append(c)

# Priority order: name > position > cardinality, de-duplicated keeping the
# first occurrence (dict keys preserve insertion order).
SUGGESTED_TARGETS = list(dict.fromkeys([*name_hits, *pos_hits, *card_hits]))

# 5) Suggested problem type, derived from the first (highest-priority) candidate
problem_type = "unknown"
if SUGGESTED_TARGETS:
    t = SUGGESTED_TARGETS[0]
    nunq = df[t].nunique(dropna=True)
    if t in num_cols:
        # simple threshold: few distinct values probably means classification (e.g. 0/1)
        problem_type = "classification" if nunq <= 20 else "regression"
    else:
        # a non-numeric target is always treated as classification
        problem_type = "classification"

# Print a concise summary (lists are truncated to their first 10 entries)
print("Dataset:", DATASET_NAME)
print(f"Filas: {len(df)} | Columnas: {df.shape[1]}")
print(f"Numéricas ({len(num_cols)}): {num_cols[:10]}{' ...' if len(num_cols)>10 else ''}")
print(f"Categóricas ({len(cat_cols)}): {cat_cols[:10]}{' ...' if len(cat_cols)>10 else ''}\n")

if len(na_ratio[na_ratio>0]) == 0:
    print("Nulos: sin valores faltantes detectados.")
else:
    print("Nulos (proporción top 10):")
    display(na_top10.to_frame("null_ratio"))

print(f"\nID-like columns ({len(id_like_cols)}): {id_like_cols if id_like_cols else 'Ninguna'}")
print("\nCandidatos a TARGET (ordenados por heurística):", SUGGESTED_TARGETS if SUGGESTED_TARGETS else "No se encontraron candidatos claros")
print("Tipo de problema sugerido:", problem_type)

# Globals handed over to the next cell
TARGET = None          # set in the next cell (stays None until a target is confirmed there)
PROBLEM_TYPE = problem_type
print("\nSniffing completado.")

Dataset: train (1).csv
Filas: 8693 | Columnas: 14
Numéricas (6): ['Age', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']
Categóricas (8): ['PassengerId', 'HomePlanet', 'CryoSleep', 'Cabin', 'Destination', 'VIP', 'Name', 'Transported']

Nulos (proporción top 10):


Unnamed: 0,null_ratio
CryoSleep,0.024963
ShoppingMall,0.023927
VIP,0.023352
HomePlanet,0.023122
Name,0.023007
Cabin,0.022892
VRDeck,0.021627
Spa,0.021051
FoodCourt,0.021051
Destination,0.020936



ID-like columns (1): ['PassengerId']

Candidatos a TARGET (ordenados por heurística): ['Transported', 'HomePlanet', 'CryoSleep', 'Destination', 'Age', 'VIP', 'RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']
Tipo de problema sugerido: classification

Sniffing completado.



• Elige la variable objetivo (TARGET).

• Se eliminan columnas irrelevantes como 'ID' o similares.

• Se ajusta el tipo de problema (classification o regression).

In [17]:
# Paso 5: selección del target y limpieza avanzada
import ipywidgets as widgets
from IPython.display import display, clear_output
import numpy as np
import re

# Guard: the target-selection cell needs the `df` global from the loading cell.
assert 'df' in globals(), "Primero carga y analiza el dataset."

# --- Detection of irrelevant columns ---
def detect_irrelevant_columns(df):
    """List the columns of ``df`` that look useless as model features.

    A column is flagged when any of the following holds, checked in order:
      1. its lower-cased name contains an identifier-like token
         (id, uuid, guid, index, idx, key, code, serial, registro, row)
         delimited by underscores or the string boundaries;
      2. it has at most one distinct non-null value;
      3. more than 98% of its rows are distinct values;
      4. more than 90% of its values are missing.

    Args:
        df: DataFrame to inspect.

    Returns:
        Sorted list of the flagged column labels, without duplicates.
    """
    id_name_re = re.compile(
        r"(?:^|_)(id|uuid|guid|index|idx|key|code|serial|registro|row)(?:$|_)"
    )

    def _is_irrelevant(label):
        # Name-based rule first: it needs no look at the data.
        if id_name_re.search(str(label).lower()):
            return True
        column = df[label]
        distinct = column.nunique(dropna=True)
        if distinct <= 1:  # constant (or entirely missing) column
            return True
        if distinct / len(column) > 0.98:  # nearly all values unique
            return True
        return column.isna().mean() > 0.9  # mostly missing

    flagged = {label for label in df.columns if _is_irrelevant(label)}
    return sorted(flagged)

# Computed once, before any target is chosen; the confirm handler later
# excludes the selected target from this list.
irrelevantes = detect_irrelevant_columns(df)

# --- Widget used to pick the target ---
target_dropdown = widgets.Dropdown(
    options=df.columns.tolist(),
    # Pre-select the top heuristic suggestion when there is one.
    value=SUGGESTED_TARGETS[0] if SUGGESTED_TARGETS else None,
    description='Target:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='60%')
)
display(target_dropdown)

button = widgets.Button(description="Confirmar TARGET", button_style='success')
# NOTE(review): `output` is displayed but nothing in this cell writes to it.
output = widgets.Output()
display(button, output)

def on_confirm(b):
    """Button callback: fix the target, drop irrelevant columns, set the problem type.

    Reads the dropdown selection into the global ``TARGET``, removes the
    columns listed in ``irrelevantes`` (never the target itself) from the
    global ``df``, and stores "classification" or "regression" in the global
    ``PROBLEM_TYPE``. A numeric target with more than 20 distinct values is
    regression; everything else is classification.

    Args:
        b: The clicked button (unused).
    """
    global TARGET, PROBLEM_TYPE, df

    clear_output(wait=True)
    TARGET = target_dropdown.value
    print(f"Target seleccionado: {TARGET}")

    # Drop irrelevant columns, keeping the target even if it was flagged.
    droppable = [col for col in irrelevantes if col != TARGET]
    if not droppable:
        print("No se detectaron columnas irrelevantes para eliminar.")
    else:
        df = df.drop(columns=droppable, errors='ignore')
        print(f"Columnas irrelevantes eliminadas ({len(droppable)}): {droppable}")

    # Numeric columns are recomputed on the cleaned frame.
    numeric_after_cleanup = df.select_dtypes(include=[np.number]).columns.tolist()
    distinct_targets = df[TARGET].nunique(dropna=True)
    if TARGET in numeric_after_cleanup and distinct_targets > 20:
        PROBLEM_TYPE = "regression"
    else:
        PROBLEM_TYPE = "classification"

    print(f"Tipo de problema detectado: {PROBLEM_TYPE}")
    print(f"Distribución de {TARGET}:")
    if PROBLEM_TYPE == "classification":
        target_summary = df[TARGET].value_counts().head(10)
    else:
        target_summary = df[TARGET].describe()
    display(target_summary)

    print(f"Columnas restantes después de limpieza: {df.shape[1]}")

button.on_click(on_confirm)



Target seleccionado: Transported
Columnas irrelevantes eliminadas (1): ['PassengerId']
Tipo de problema detectado: classification
Distribución de Transported:


Unnamed: 0_level_0,count
Transported,Unnamed: 1_level_1
True,4378
False,4315


Columnas restantes después de limpieza: 13


Se arma un resumen compacto del dataset (columnas num/cat, cardinalidades clave, tipo de problema, target) para que el LLM diseñe un plan.

In [18]:
# Paso 6A: preparar contexto compacto del dataset para el LLM (planner)
import numpy as np
import json

# Guard: the summary needs `df` and the `PROBLEM_TYPE` set by the previous cells.
assert 'df' in globals() and 'PROBLEM_TYPE' in globals(), "Asegúrate de haber corrido los pasos previos."

def _cardinality_stats(series, max_show=5):
    """Summarise a column's cardinality for the LLM prompt.

    Args:
        series: Column to describe.
        max_show: Maximum number of example values to include.

    Returns:
        dict with ``"unique"`` (count of distinct non-null values, as int) and
        ``"sample_values"`` (up to ``max_show`` distinct non-null values, in
        order of first appearance, converted to strings).
    """
    distinct_values = series.dropna().unique()
    examples = distinct_values[:max_show].tolist()
    return {
        "unique": int(series.nunique(dropna=True)),
        "sample_values": list(map(str, examples)),
    }

# Column types and basic cardinalities, recomputed on the current `df`
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
cat_cols = [c for c in df.columns if c not in num_cols]

# NOTE(review): the target column is not excluded here, so it appears in
# whichever of these lists matches its dtype.
cat_info = {c: _cardinality_stats(df[c]) for c in cat_cols}
num_info = {c: {"missing_ratio": float(df[c].isna().mean())} for c in num_cols}

# Compact, JSON-serialisable description of the dataset that is embedded in
# the planner prompt.
DATA_SUMMARY = {
    "rows": int(len(df)),
    "cols": int(df.shape[1]),
    "target": TARGET,
    "problem_type": PROBLEM_TYPE,  # "classification" | "regression"
    "numeric_cols": num_cols,
    "categorical_cols": cat_cols,
    "categorical_cardinality": cat_info,
    "numeric_missing": num_info,
}

print("Contexto listo para el plan.")
# Preview only: the two per-column dictionaries are replaced by a short
# placeholder so the printed JSON stays small.
print(json.dumps({
    k: (v if k not in ("categorical_cardinality","numeric_missing") else f"...{len(v)} columns...")
    for k, v in DATA_SUMMARY.items()
}, indent=2))

Contexto listo para el plan.
{
  "rows": 8693,
  "cols": 13,
  "target": "Transported",
  "problem_type": "classification",
  "numeric_cols": [
    "Age",
    "RoomService",
    "FoodCourt",
    "ShoppingMall",
    "Spa",
    "VRDeck"
  ],
  "categorical_cols": [
    "HomePlanet",
    "CryoSleep",
    "Cabin",
    "Destination",
    "VIP",
    "Name",
    "Transported"
  ],
  "categorical_cardinality": "...7 columns...",
  "numeric_missing": "...6 columns..."
}


Le pedimos al LLM un plan JSON (estricto) con:

**split:** test_size, random_state, stratify (si aplica).

**preprocessing:** imputación, escalado numérico, encoding de categóricas, manejo de alta cardinalidad.

**models:** 2–3 modelos baseline y una búsqueda ligera (param_grid simple).

**cv:** esquema de validación (KFold/StratifiedKFold), n_splits.

**metrics:** lista de métricas (reg: MAE/RMSE/R^2; cla: accuracy/F1/AUC según binaria/multiclase).

**evalcards:** título y flags para incluir curvas/tablas.

In [19]:
# Paso 6B: invocar a DeepSeek para obtener el plan JSON del pipeline
from langchain_deepseek import ChatDeepSeek
import json

# Guard: the API key must already be in the environment (set in step 2).
# `os` is not imported in this cell; it comes from the earlier cell.
assert "DEEPSEEK_API_KEY" in os.environ, "Falta la clave DEEPSEEK_API_KEY (ver Paso 2)."

# temperature=0.0, same setting as the smoke test.
llm = ChatDeepSeek(model="deepseek-chat", temperature=0.0)

# System message: ask for a bare JSON document with no surrounding text.
SYSTEM_MSG = (
    "Eres un arquitecto de pipelines de ML. Devuelve SOLO un JSON válido y compacto, "
    "sin texto adicional, siguiendo el esquema solicitado."
)

# Planner prompt: dataset summary + metric rules + the JSON schema the reply
# must follow. Doubled braces are literal braces inside the f-string.
# The template must itself use JSON literals: the max_depth example used the
# Python literal None, which json.loads rejects if the model echoes it back,
# so it is written as null (matching the "scaler" entry of the same template).
USER_INSTRUCTIONS = f"""
Diseña un plan de entrenamiento de ML para un dataset con el siguiente resumen:

{json.dumps(DATA_SUMMARY, ensure_ascii=False)}

Requisitos:
- Usa convenciones Scikit-Learn puras para que luego pueda implementarse con Pipeline/ColumnTransformer.
- Ajusta según problem_type = "{PROBLEM_TYPE}".
- Si classification binaria: usar metrics: ['accuracy','f1','roc_auc'].
- Si classification multiclase: usar metrics: ['accuracy','f1_macro'] (evitar ROC AUC si no es apropiado).
- Si regression: usar metrics: ['mae','rmse','r2'].

Devuelve SOLO este JSON (sin comentarios ni texto extra):
{{
  "split": {{
    "test_size": 0.2,
    "random_state": 42,
    "stratify": true | false
  }},
  "preprocessing": {{
    "numeric": {{"imputer": "median", "scaler": "standard" | "minmax" | null}},
    "categorical": {{
      "imputer": "most_frequent",
      "encoder": "onehot",
      "handle_high_cardinality": {{"threshold": 50, "strategy": "hashing" | "target" | "frequency"}}
    }},
    "drop_low_variance": true | false
  }},
  "models": [
    {{
      "name": "BaselineRegressorOrClassifier",
      "estimator": "LinearRegression|Ridge|LogisticRegression|RandomForest|XGBoost|LightGBM|GradientBoosting",
      "param_grid": {{}}
    }},
    {{
      "name": "SecondModel",
      "estimator": "RandomForest|GradientBoosting|XGBoost|LightGBM",
      "param_grid": {{"n_estimators": [100, 300], "max_depth": [null, 6, 10]}}
    }}
  ],
  "cv": {{"strategy": "KFold|StratifiedKFold", "n_splits": 5, "shuffle": true, "random_state": 42}},
  "metrics": ["..."],
  "evalcards": {{"title": "Mi modelo", "include_threshold_curves": true}}
}}
"""

# Ask the model for the plan and parse the reply into the global PLAN.
try:
    resp = llm.invoke([
        {"role":"system","content":SYSTEM_MSG},
        {"role":"user","content":USER_INSTRUCTIONS}
    ])
    raw = resp.content.strip()
    # Tolerant parsing: keep only the span from the first "{" to the last "}"
    raw = raw[raw.find("{") : raw.rfind("}")+1]  # trims any text around the JSON object
    PLAN = json.loads(raw)
    print("Plan recibido y parseado.\n")
    print(json.dumps(PLAN, indent=2, ensure_ascii=False))
except Exception as e:
    # NOTE(review): on failure PLAN is not assigned here, so it stays
    # undefined or keeps the value from a previous run of this cell.
    print("No se pudo obtener/parsear el plan:", e)


Plan recibido y parseado.

{
  "split": {
    "test_size": 0.2,
    "random_state": 42,
    "stratify": true
  },
  "preprocessing": {
    "numeric": {
      "imputer": "median",
      "scaler": "standard"
    },
    "categorical": {
      "imputer": "most_frequent",
      "encoder": "onehot",
      "handle_high_cardinality": {
        "threshold": 50,
        "strategy": "frequency"
      }
    },
    "drop_low_variance": false
  },
  "models": [
    {
      "name": "LogisticRegression",
      "estimator": "LogisticRegression",
      "param_grid": {
        "C": [
          0.1,
          1.0,
          10.0
        ],
        "solver": [
          "liblinear"
        ]
      }
    },
    {
      "name": "RandomForest",
      "estimator": "RandomForest",
      "param_grid": {
        "n_estimators": [
          100,
          300
        ],
        "max_depth": [
          null,
          6,
          10
        ],
        "min_samples_split": [
          2,
          5
        ]
    

Implementar el plan

**(Pipeline + GridSearch)**

In [21]:
# 7A.1 — Implementación del plan con scikit-learn

import numpy as np, pandas as pd
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold, GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
from sklearn.feature_selection import VarianceThreshold
from sklearn.base import BaseEstimator, TransformerMixin, clone

from sklearn.linear_model import Ridge, LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier, GradientBoostingRegressor, GradientBoostingClassifier

# Guards: the training cell needs the plan, the confirmed target/problem type and the data.
assert 'PLAN' in globals() and 'TARGET' in globals() and 'PROBLEM_TYPE' in globals(), "Faltan PLAN/TARGET/PROBLEM_TYPE"
assert 'df' in globals(), "Falta df"

# Utils
def get_estimator(name: str, problem_type: str):
    """Map an estimator name from the plan to a scikit-learn estimator instance.

    Matching is by case-insensitive substring. "xgboost" and "lightgbm" are
    served by scikit-learn's gradient boosting, and any unrecognised name
    falls back to a random forest.

    Args:
        name: Estimator name from the plan (``None`` is treated as "").
        problem_type: "regression" selects regressors; any other value
            selects classifiers.

    Returns:
        A new, unfitted estimator.
    """
    key = (name or "").lower()
    is_regression = problem_type == "regression"

    # Linear families differ per problem type; "ridge" is tested before "linear".
    if is_regression:
        if "ridge" in key:
            return Ridge()
        if "linear" in key:
            return LinearRegression()
    elif "logistic" in key:
        return LogisticRegression(max_iter=1000, random_state=42)

    forest_cls = RandomForestRegressor if is_regression else RandomForestClassifier
    if "randomforest" in key:
        return forest_cls(random_state=42, n_jobs=-1)

    if any(alias in key for alias in ("gradientboost", "xgboost", "lightgbm")):
        boosting_cls = GradientBoostingRegressor if is_regression else GradientBoostingClassifier
        return boosting_cls(random_state=42)

    # fallback
    return forest_cls(random_state=42, n_jobs=-1)

class FrequencyEncoder(BaseEstimator, TransformerMixin):
    """Encode categorical columns as the relative frequency of each category.

    ``fit`` learns, per column, the share of rows holding each category;
    ``transform`` replaces every value with that share. Categories not seen
    during ``fit`` and missing values are encoded as 0.0.

    Args:
        cols: Column labels, in the order of the columns of the ``X`` passed
            to ``fit`` / ``transform``.

    Attributes:
        maps_: dict of column label -> Series of category frequencies. It is
            created by ``fit``.
    """

    def __init__(self, cols):
        # Only constructor parameters are stored here. The learned state
        # (maps_) is created in fit, so that the trailing-underscore attribute
        # exists only on a fitted encoder, as scikit-learn's fitted-state
        # convention expects. `cols` is stored as-is (no copy) so clone()
        # keeps working.
        self.cols = cols

    def fit(self, X, y=None):
        """Learn the per-column category frequencies from ``X``; returns ``self``."""
        frame = pd.DataFrame(X, columns=self.cols)
        self.maps_ = {c: frame[c].value_counts(normalize=True) for c in self.cols}
        return self

    def transform(self, X):
        """Return a float array of shape (n_rows, len(cols)) with the encoded values."""
        frame = pd.DataFrame(X, columns=self.cols)
        # Build a fresh frame instead of assigning into `frame`, which is
        # constructed from the caller's X and may share data with it.
        encoded = pd.DataFrame(
            {c: frame[c].map(self.maps_[c]).fillna(0.0) for c in self.cols},
            index=frame.index,
            columns=self.cols,
        )
        return encoded.to_numpy(dtype=float)

def nunique_safe(series_or_col):
    """Count distinct non-null values, returning 0 if counting fails.

    Args:
        series_or_col: Object expected to expose ``nunique(dropna=True)``.

    Returns:
        int: the distinct count, or 0 when the call or the int conversion
        raises any ``Exception``.
    """
    try:
        count = series_or_col.nunique(dropna=True)
        result = int(count)
    except Exception:
        result = 0
    return result

# Feature columns (everything except TARGET)
all_cols = df.columns.tolist()
assert TARGET in all_cols, f"TARGET '{TARGET}' no está en df.columns"
feature_cols = [c for c in all_cols if c != TARGET]

# Column lists named by the plan win when present; otherwise infer them from dtypes.
plan_num = PLAN.get("numeric_cols", [])
plan_cat = PLAN.get("categorical_cols", [])
if plan_num or plan_cat:
    num_cols = [c for c in plan_num if c in feature_cols]
    cat_cols = [c for c in plan_cat if c in feature_cols]
else:
    num_cols = df[feature_cols].select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in feature_cols if c not in num_cols]

# High vs. low cardinality categoricals (threshold from the plan, default 50)
prep_cfg = PLAN.get("preprocessing", {})
cat_cfg  = prep_cfg.get("categorical", {}) if prep_cfg else {}
high_thr = int(cat_cfg.get("handle_high_cardinality", {}).get("threshold", 50) or 50)

# Cardinality is measured on the full df (before the train/test split).
high_card_cols = [c for c in cat_cols if nunique_safe(df[c]) > high_thr]
low_card_cols  = [c for c in cat_cols if c not in high_card_cols]

# Rough one-hot width estimate used to choose sparse vs. dense output (heuristic)
approx_ohe_dim = sum(max(0, min(nunique_safe(df[c]), 50) - 1) for c in low_card_cols)
want_sparse_ohe = (approx_ohe_dim > 2000) or (len(df) > 200_000)

# Numeric preprocessing: imputer, then the scaler named by the plan (if any)
num_imputer = SimpleImputer(strategy=prep_cfg.get("numeric", {}).get("imputer", "median"))
scaler_name = prep_cfg.get("numeric", {}).get("scaler", None)
if scaler_name == "standard":
    num_scaler = StandardScaler()
elif scaler_name == "minmax":
    num_scaler = MinMaxScaler()
else:
    # Any other value (including null in the plan) disables scaling.
    num_scaler = "passthrough"
num_pipe = Pipeline([("imputer", num_imputer), ("scaler", num_scaler)])

# Categorical preprocessing
cat_imputer = SimpleImputer(strategy=cat_cfg.get("imputer", "most_frequent")) if cat_cfg else SimpleImputer(strategy="most_frequent")
try:
    low_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=want_sparse_ohe)
except TypeError:
    # Retry with the `sparse` keyword when `sparse_output` is rejected
    # (presumably an older scikit-learn release).
    low_encoder = OneHotEncoder(handle_unknown="ignore", sparse=want_sparse_ohe)

# Only non-empty column groups get a transformer.
transformers = []
if num_cols:
    transformers.append(("num", num_pipe, num_cols))
if low_card_cols:
    transformers.append(("cat_low", Pipeline([("imputer", cat_imputer), ("onehot", low_encoder)]), low_card_cols))
if high_card_cols:
    # High-cardinality columns always use frequency encoding here; the
    # "strategy" entry of the plan is not read.
    transformers.append(("cat_high", Pipeline([("freq", FrequencyEncoder(high_card_cols))]), high_card_cols))

pre = ColumnTransformer(transformers=transformers, remainder="drop")

# Optional VarianceThreshold step after the column transformer
drop_low_var = bool(prep_cfg.get("drop_low_variance", False))
final_pre = Pipeline([("pre", pre), ("var", VarianceThreshold(0.0))]) if drop_low_var else pre

# Train/test split
split = PLAN.get("split", {})
test_size     = float(split.get("test_size", 0.2))
random_state  = int(split.get("random_state", 42))
use_stratify  = bool(split.get("stratify", PROBLEM_TYPE == "classification"))

X = df[feature_cols].copy()
y = df[TARGET].copy()
# Stratification is only applied for classification, whatever the plan says.
stratify_arg = y if (use_stratify and PROBLEM_TYPE == "classification") else None

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=test_size, random_state=random_state, stratify=stratify_arg
)

# Cross-validation splitter built from the plan's "cv" section
cv_cfg = PLAN.get("cv") or {}  # tolerate a missing or null section
n_splits = int(cv_cfg.get("n_splits", 5))
shuffle  = bool(cv_cfg.get("shuffle", True))
_cv_seed = cv_cfg.get("random_state", 42)
cv_rs    = 42 if _cv_seed is None else int(_cv_seed)  # a null seed in the plan falls back to 42
# KFold / StratifiedKFold raise ValueError when random_state is set while
# shuffle=False, so the seed is only forwarded when shuffling is enabled.
cv_random_state = cv_rs if shuffle else None
if PROBLEM_TYPE == "classification" and (use_stratify or str(cv_cfg.get("strategy","")).lower().startswith("strat")):
    CV = StratifiedKFold(n_splits=n_splits, shuffle=shuffle, random_state=cv_random_state)
else:
    CV = KFold(n_splits=n_splits, shuffle=shuffle, random_state=cv_random_state)

# Sanitize the grids coming from the PLAN
def sanitize_grid_for(estimator, raw_grid: dict):
    """Turn a plan ``param_grid`` into a grid GridSearchCV can use on the pipeline.

    - Keeps only hyperparameters that ``estimator.get_params()`` knows.
    - Coerces every value to a non-empty list: the plan is LLM-produced JSON,
      so a parameter may arrive as a bare scalar (e.g. ``"C": 1.0``) or as an
      empty list, which GridSearchCV rejects.
    - For LogisticRegression, forces a small safe grid (l2 / lbfgs) and only
      takes ``C`` from the plan, avoiding invalid solver/penalty combinations.
    - If nothing valid remains, applies a reasonable default grid.

    Args:
        estimator: Unfitted estimator that will sit in the "est" pipeline step.
        raw_grid: Mapping of parameter name -> candidate values. ``None`` or a
            non-dict value is treated as an empty grid.

    Returns:
        dict whose keys are prefixed with ``est__``. It may be empty, in which
        case the estimator is used with its current parameters.
    """
    from sklearn.linear_model import LogisticRegression

    def _as_candidates(value):
        """Return ``value`` as a list of candidates (scalars become one-element lists)."""
        if isinstance(value, (str, bytes)) or not hasattr(value, "__iter__"):
            return [value]
        return list(value)

    grid_in = raw_grid if isinstance(raw_grid, dict) else {}
    valid = set(estimator.get_params().keys())
    clean = {}

    if isinstance(estimator, LogisticRegression):
        # Small, safe grid (avoids invalid solver/penalty combinations)
        c_values = _as_candidates(grid_in["C"]) if grid_in.get("C") is not None else []
        base = {
            "C": c_values or [1.0, 0.5, 2.0],
            "penalty": ["l2"],
            "solver": ["lbfgs"],
            "max_iter": [1000],
        }
        for k, v in base.items():
            if k in valid:
                clean[f"est__{k}"] = v
        return clean

    # General case: keep valid parameters only; invalid names are ignored silently
    for k, v in grid_in.items():
        if k not in valid:
            continue
        candidates = _as_candidates(v)
        if candidates:  # an empty candidate list would make the grid unusable
            clean[f"est__{k}"] = candidates

    # Defaults when nothing usable is left
    if not clean:
        if hasattr(estimator, "n_estimators"):
            clean = {"est__n_estimators": [200, 400]}
        elif isinstance(estimator, Ridge):
            clean = {"est__alpha": [1.0, 0.1, 10.0]}
        else:
            clean = {}  # no grid

    return clean

# Build the candidate list: (display name, pipeline, parameter grid)
models_plan = PLAN.get("models", [])
candidates = []
if not models_plan:
    # Minimal fallback when the PLAN lists no models
    default_est = LogisticRegression(max_iter=1000, random_state=42) if PROBLEM_TYPE=="classification" else Ridge()
    candidates = [("Baseline", Pipeline([("prep", final_pre), ("est", default_est)]), sanitize_grid_for(default_est, {}))]
else:
    for m in models_plan:
        est = get_estimator(m.get("estimator",""), PROBLEM_TYPE)
        grid = sanitize_grid_for(est, m.get("param_grid", {}))
        # Every candidate pipeline references the same `final_pre` object.
        pipe = Pipeline([("prep", final_pre), ("est", est)])
        candidates.append((m.get("name", est.__class__.__name__), pipe, grid))

# Primary metric: the first metric of the plan drives model selection
metrics = PLAN.get("metrics", []) or (["mae","rmse","r2"] if PROBLEM_TYPE=="regression" else ["accuracy","f1"])
primary_metric = metrics[0]
# Plan metric name -> scorer string handed to GridSearchCV
SCORING_MAP = {
    "accuracy": "accuracy", "f1": "f1", "f1_macro": "f1_macro", "roc_auc": "roc_auc",
    "mae": "neg_mean_absolute_error", "rmse": "neg_root_mean_squared_error", "r2": "r2",
}
# NOTE(review): an unknown metric name silently falls back to accuracy / r2,
# while the final print below still labels the score with `primary_metric`.
scoring = SCORING_MAP.get(primary_metric, "accuracy" if PROBLEM_TYPE=="classification" else "r2")

# Train every candidate and keep the one with the highest CV score
best = None
best_name, best_est = None, None

for name, pipe, grid in candidates:
    gs = GridSearchCV(
        estimator=pipe,
        param_grid=grid,
        scoring=scoring,
        cv=CV,
        n_jobs=-1,
        refit=True,
        verbose=0
    )
    try:
        gs.fit(X_train, y_train)
        if (best is None) or (gs.best_score_ > best.best_score_):
            # best_est is an unfitted clone; BEST_MODEL below uses the refit estimator.
            best, best_name, best_est = gs, name, clone(gs.best_estimator_)
    except Exception as e:
        # If a candidate fails for any reason, report it and move on to the next one
        print(f"⚠️  Ignorando candidato '{name}' por error: {e}")

if best is None:
    raise RuntimeError("No se pudo entrenar ningún candidato. Revisa el PLAN o el preprocesamiento.")

# Winning search's refit pipeline; later cells use it for evaluation, saving and inference.
BEST_MODEL = best.best_estimator_
print(f"Modelo ganador: {best_name} | score_cv({primary_metric}) = {best.best_score_:.5f}")
print(f"OHE sparse={want_sparse_ohe} | num={len(num_cols)} | cat_low={len(low_card_cols)} | cat_high={len(high_card_cols)}")

Modelo ganador: RandomForest | score_cv(accuracy) = 0.79278
OHE sparse=False | num=6 | cat_low=4 | cat_high=2


Evaluación con Evalcards

In [24]:
# Paso 7B: Evaluación final y reporte con Evalcards
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, classification_report
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

from evalcards import make_report

# Guard: the evaluation needs the model selected by cell 7A.
assert 'BEST_MODEL' in globals(), "Primero ejecuta la celda 7A."

y_pred = BEST_MODEL.predict(X_test)

# Metrics requested by the plan, with per-problem defaults when it lists none
metrics = PLAN.get("metrics", []) or (["mae","rmse","r2"] if PROBLEM_TYPE=="regression" else ["accuracy","f1"])
results = {}

y_proba = None
if PROBLEM_TYPE == "classification":
    is_binary = len(np.unique(y_test)) == 2
    # Use probabilities when the model exposes them
    if hasattr(BEST_MODEL, "predict_proba"):
        proba = BEST_MODEL.predict_proba(X_test)
        # binary: keep the second probability column; multiclass: keep the full matrix
        y_proba = proba[:, 1] if proba.shape[1] == 2 else proba
    for m in metrics:
        if m == "accuracy":
            results[m] = float(accuracy_score(y_test, y_pred))
        elif m in ("f1","f1_macro"):
            avg = "binary" if (is_binary and m=="f1") else "macro"
            results[m] = float(f1_score(y_test, y_pred, average=avg))
        elif m == "roc_auc" and y_proba is not None and is_binary:
            results[m] = float(roc_auc_score(y_test, y_proba))
else:
    # regression
    for m in metrics:
        if m == "mae":
            results[m] = float(mean_absolute_error(y_test, y_pred))
        elif m == "rmse":
            # mean_squared_error(..., squared=False) was deprecated in
            # scikit-learn 1.4 and removed in 1.6; taking the square root of
            # the MSE works on every version.
            results[m] = float(np.sqrt(mean_squared_error(y_test, y_pred)))
        elif m == "r2":
            results[m] = float(r2_score(y_test, y_pred))
print("Métricas (test):", results)

# Generate the report with Evalcards (title taken from the plan, with a default)
title = PLAN.get("evalcards", {}).get("title", "Mi modelo")
report_path = make_report(
    y_true=y_test,
    y_pred=y_pred,
    y_proba=y_proba,  # optional: None unless classification probabilities were computed above
    path="reporte_modelo.md",
    title=title
)
print("Reporte generado:", report_path)

# Quick look at the first 5 predictions.
# `pd` is not imported in this cell; it comes from an earlier cell.
preview = pd.DataFrame({
    "y_true": y_test[:5].values if hasattr(y_test, "values") else y_test[:5],
    "y_pred": y_pred[:5]
})
display(preview)

Métricas (test): {'accuracy': 0.79700977573318, 'f1': 0.8006775832862789, 'roc_auc': 0.8693266295232199}
Reporte generado: /content/evalcards_reports/reporte_modelo.md


Unnamed: 0,y_true,y_pred
0,True,True
1,False,True
2,False,False
3,True,True
4,False,False


Guardar modelo y artefactos

In [25]:
# Paso 8A: Guardar modelo y artefactos mínimos
import os, json
from joblib import dump

# Guard: everything that gets persisted must already exist.
assert 'BEST_MODEL' in globals() and 'PLAN' in globals() and 'df' in globals() and 'TARGET' in globals()

ART_DIR = "artifacts"
os.makedirs(ART_DIR, exist_ok=True)

# 1) Full model (the pipeline, preprocessing included)
dump(BEST_MODEL, f"{ART_DIR}/model.joblib")

# 2) Input columns and target. FEATURES is also the schema the inference
#    helper aligns new data to.
FEATURES = [c for c in df.columns if c != TARGET]
with open(f"{ART_DIR}/columns.json", "w", encoding="utf-8") as f:
    json.dump({"features": FEATURES, "target": TARGET}, f, ensure_ascii=False, indent=2)

# 3) Plan (for transparency / reproducibility)
with open(f"{ART_DIR}/plan.json", "w", encoding="utf-8") as f:
    json.dump(PLAN, f, ensure_ascii=False, indent=2)

print("Guardado en 'artifacts':", os.listdir(ART_DIR))

Guardado en 'artifacts': ['columns.json', 'plan.json', 'model.joblib']


Inferencia y ejemplo rápido.

In [27]:
# Paso 8B: Inferencia
import numpy as np
import pandas as pd

# Guard: inference needs the trained model and the FEATURES list from the saving cell.
assert 'BEST_MODEL' in globals() and 'FEATURES' in globals()

def predict_batch(df_new: pd.DataFrame, return_proba: bool = True):
    """Predict on new rows after aligning them to the training schema.

    The input is aligned to ``FEATURES``: missing columns are added as NaN,
    extra columns are ignored, and columns are put in training order. The
    alignment is done on a copy, so ``df_new`` itself is never modified
    (it used to gain the missing columns as a side effect).

    Args:
        df_new: Rows to score.
        return_proba: When True and the model exposes ``predict_proba``, add
            probability columns to the result.

    Returns:
        tuple ``(pred_df, info)``:
          - ``pred_df``: DataFrame with a fresh 0..n-1 index and a ``y_pred``
            column, plus ``y_proba`` (second probability column) for
            two-column outputs or ``proba_<i>`` columns when there are more.
          - ``info``: dict with ``missing_cols_added`` and
            ``ignored_extra_cols``.
    """
    # missing and extra columns relative to the training schema
    missing = [c for c in FEATURES if c not in df_new.columns]
    extra   = [c for c in df_new.columns if c not in FEATURES]

    # Work on a copy of the known columns, fill the gaps with NaN, then reorder.
    present = [c for c in FEATURES if c in df_new.columns]
    X_infer = df_new[present].copy()
    for c in missing:
        X_infer[c] = np.nan
    X_infer = X_infer[FEATURES]

    # prediction
    y_pred = BEST_MODEL.predict(X_infer)
    out = pd.DataFrame({"y_pred": y_pred})

    # probabilities (only when requested and supported by the model)
    if return_proba and hasattr(BEST_MODEL, "predict_proba"):
        proba = BEST_MODEL.predict_proba(X_infer)
        if proba.ndim == 2 and proba.shape[1] == 2:
            out["y_proba"] = proba[:, 1]
        elif proba.ndim == 2 and proba.shape[1] > 2:
            for i in range(proba.shape[1]):
                out[f"proba_{i}"] = proba[:, i]

    info = {"missing_cols_added": missing, "ignored_extra_cols": extra}
    return out, info

# --- Example with 10 rows of the test set (requires X_test / y_test from the training cell)
try:
    preview_X = X_test.head(10).copy()
    pred_df, info = predict_batch(preview_X, return_proba=True)
    print("Alineación:", info)
    # Indexes are reset so y_true lines up with the 0..n-1 index of pred_df.
    display(pd.concat([y_test.head(10).reset_index(drop=True).rename("y_true"), pred_df], axis=1))

    # Save a sample of predictions for the repo: the whole test set is
    # scored, then only the first 100 rows are written.
    import os
    os.makedirs("artifacts", exist_ok=True)
    sample_path = "artifacts/preview_predictions.csv"
    pd.concat(
        [y_test.reset_index(drop=True).rename("y_true"),
         predict_batch(X_test, True)[0]], axis=1
    ).head(100).to_csv(sample_path, index=False)
    print("Muestra guardada en:", sample_path)
except Exception as e:
    # Best effort: the example is optional, so any failure is only reported.
    print("Nota: si no existen X_test/y_test en este entorno, omite el ejemplo. Error:", e)

Alineación: {'missing_cols_added': [], 'ignored_extra_cols': []}


Unnamed: 0,y_true,y_pred,y_proba
0,True,True,0.998412
1,False,True,0.586183
2,False,False,0.120399
3,True,True,0.755503
4,False,False,0.379708
5,False,True,0.66876
6,True,True,0.82079
7,True,True,0.647792
8,False,False,0.112971
9,True,True,0.521931


Muestra guardada en: artifacts/preview_predictions.csv
