
# Taller (3h) — FastAPI + Scikit-Learn orientado a investigación
**Objetivo:** Diseñar, entrenar y servir un modelo de ML **investigando** las decisiones técnicas clave.  
**Entrega:** API funcional (FastAPI) + breve informe (markdown dentro del notebook) justificando decisiones.

> Filosofía del taller: menos receta, más criterio. No hay una única respuesta correcta; lo evaluado es la **calidad del razonamiento**, la **limpieza de la implementación** y la **capacidad de probar** la API.



## Agenda sugerida (3h)
1) **Planteo del problema y dataset** (30–40 min)  
2) **Entrenamiento y persistencia** (40–50 min)  
3) **Diseño del contrato y API** (45–55 min)  
4) **Pruebas, errores y mejoras** (30–35 min)

### Reglas
- No borres los encabezados `TODO`. Agrega tu código debajo de cada bloque indicado.
- Documenta tus decisiones en la sección **Bitácora** al final.
- Puedes trabajar en equipo, pero cada entrega debe ser individual y original.



---
## 1) Selección de dataset y formulación del problema (investigación)
Elige **uno**:
- A) `sklearn.datasets.load_wine` (clasificación 3 clases, baseline rápido)
- B) Un dataset tabular de UCI, Kaggle u otra fuente **citable** (debe ser pequeño/mediano y con licencia apta)
- C) Un CSV propio (explica origen y variables)

**Requisitos mínimos:**
- Problema de **clasificación** o **regresión** tabular
- Al menos **6 features numéricas** (puedes convertir categóricas)
- Justifica por qué es un buen caso para servir vía API

**Entrega (en esta celda, texto breve):** Describe el dataset, objetivo, variables y métrica principal.



### 1.1 Carga y exploración (TODO)
- Carga el dataset (pd.read_csv o loader de sklearn).
- Muestra `head()`, `describe()` y verifica nulos/outliers.
- Selecciona `X` (features) y `y` (target); explica tu elección.


In [None]:
# TODO: Carga y EDA mínima
import pandas as pd

# from sklearn.datasets import load_wine

# Ejemplo de estructura (reemplaza con tu fuente):
# df = pd.read_csv("TU_DATASET.csv")
# ---- TU CÓDIGO AQUÍ ----

# ---- DATASET DE PRUEBA- MODIFICAR ----
from sklearn.datasets import load_iris

iris_data = load_iris(as_frame=True)
df = iris_data.frame

#  Exploración
print("--- HEAD (Primeras 5 filas) ---")
print(df.head())
print("\n--- DESCRIBE (Estadísticas descriptivas) ---")
print(df.describe())
print("\n--- NULOS (Conteo por columna) ---")
print(df.isnull().sum())

--- HEAD (Primeras 5 filas) ---
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  \
0                5.1               3.5                1.4               0.2   
1                4.9               3.0                1.4               0.2   
2                4.7               3.2                1.3               0.2   
3                4.6               3.1                1.5               0.2   
4                5.0               3.6                1.4               0.2   

   target  
0       0  
1       0  
2       0  
3       0  
4       0  

--- DESCRIBE (Estadísticas descriptivas) ---
       sepal length (cm)  sepal width (cm)  petal length (cm)  \
count         150.000000        150.000000         150.000000   
mean            5.843333          3.057333           3.758000   
std             0.828066          0.435866           1.765298   
min             4.300000          2.000000           1.000000   
25%             5.100000          2.800000        


---
## 2) Entrenamiento y persistencia del modelo (investigación)
Toma decisiones y **justifícalas**:
- ¿Modelo base? (p. ej. `LogisticRegression`, `RandomForest`, `XGBoost` si lo instalas)
- ¿Preprocesamiento? (escala, imputación, OneHot, etc.)
- ¿Validación? (`train_test_split` vs `cross_val_score`)
- ¿Métrica? (clasificación: accuracy/F1; regresión: RMSE/MAE…)

**Requisito:** empaqueta tu flujo en un `Pipeline` de sklearn y **persiste** el modelo y columnas (joblib + JSON).


In [14]:
# TODO: split, pipeline, training, evaluación y persistencia
from pathlib import Path
import json, joblib
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import numpy as np

ARTIFACTS_DIR = Path("artifacts")
ARTIFACTS_DIR.mkdir(exist_ok=True)

# Variables esperadas:
# X, y = ...
# pipeline = ...
# métricas calculadas en 'metrics' (dict)

# ---- TU CÓDIGO AQUÍ ----

# ---- DATASET DE PRUEBA- MODIFICAR ----
from sklearn.neighbors import KNeighborsClassifier

# 3. Selección de X (features) y y (target)
X = df.drop(columns=["target"])  # quedan todas las columnas de medidas de la flor.
# y (Target): La columna 'target' a predecir.
y = df["target"]

# split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
# pipeline
pipeline = Pipeline(
    [("scaler", StandardScaler()), ("model", KNeighborsClassifier(n_neighbors=5))]
)

# training
pipeline.fit(X_train, y_train)

# eval
y_pred = pipeline.predict(X_test)
test_accuracy = accuracy_score(y_test, y_pred)

# ---- DATASET DE PRUEBA- MODIFICAR END----

#  diccionario de métricas
metrics = {
    "test_accuracy": round(test_accuracy, 4),
    "model_type": "KNeighborsClassifier",
    "preprocessing": "StandardScaler",
}
print(f"\nMétricas de prueba: Accuracy = {metrics['test_accuracy']}")


# TODO: Persistir
# persiste modelo
joblib.dump(pipeline, ARTIFACTS_DIR / "model.joblib")
# persiste orden de cols
json.dump(
    {"feature_columns": list(X.columns)},
    open(ARTIFACTS_DIR / "feature_columns.json", "w"),
)
# persiste metadatos y metricas
json.dump({"metrics": metrics}, open(ARTIFACTS_DIR / "metadata.json", "w"))


Métricas de prueba: Accuracy = 0.9333



---
## 3) Diseño del contrato de la API (investigación)
Define **endpoints mínimos**:
- `GET /health` (estado, versión del modelo, métrica)
- `POST /predict` (un registro)
- `POST /predict-batch` (lista de registros)

**Decisiones a justificar:**
- ¿Qué validaciones aplicas en `Pydantic`? (rango, tipos, campos extra)
- ¿Cómo garantizas el **orden** de columnas?
- ¿Qué devuelves además de la predicción? (probabilidades, latencia, advertencias)


In [11]:
# TODO: Generar un esquema dinámico a partir de las columnas persistidas (opcional pero recomendado)
# Carga feature_columns.json y crea un dict para generar ejemplos de payload
import json, os
from pathlib import Path

ARTIFACTS_DIR = Path("artifacts")
feat_path = ARTIFACTS_DIR / "feature_columns.json"


# ---- TU CÓDIGO AQUÍ ----
# 1. Cargar la lista de columnas (el contrato de entrada)
if not feat_path.exists():
       raise FileNotFoundError(
        f"El archivo de columnas no existe en {feat_path}. Ejecuta la Sección 2 (entrenamiento) primero."
    )

with open(feat_path, 'r') as f:
    column_data = json.load(f)

# La lista de features en el orden de entrenamiento
feature_columns = column_data.get("feature_columns", [])

# 2. Construye payload de ejemplo (para usar en pruebas)
sample_payload = {}

# Valores típicos de medidas del Iris están entre 4.0 y 8.0 cm
EXAMPLE_VALUE = 5.5 
for col in feature_columns:
    sample_payload[col] = EXAMPLE_VALUE 
    
# ---- FIN TU CÓDIGO AQUÍ ----


---
## 4) Implementación de FastAPI (investigación)
Crea un archivo `app.py` con:
- Carga perezosa de `model.joblib` y `feature_columns.json`
- Esquemas Pydantic (v2)
- Endpoints `/health`, `/predict`, `/predict-batch`
- Manejo de errores con `HTTPException` y mensajes claros

**Pistas** (no copiar/pegar sin entender):
- `model = joblib.load(...)`
- `class Sample(BaseModel): ...`
- `model.predict` y/o `model.predict_proba`
- Retornar JSON con `dict | BaseModel`

**Requisito:** esta celda **debe** escribir `app.py` con al menos la estructura básica.


In [18]:
# TODO: Escribir app.py
from textwrap import dedent
from pathlib import Path


app_code = dedent(
    """
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, ConfigDict, Field
from typing import List, Dict
import joblib, json, time
from pathlib import Path
import numpy as np

APP_VERSION = "0.1.0"
ARTIFACTS_DIR = Path("artifacts")
MODEL_PATH = ARTIFACTS_DIR / "model.joblib"
COLUMNS_PATH = ARTIFACTS_DIR / "feature_columns.json"
META_PATH = ARTIFACTS_DIR / "metadata.json"

app = FastAPI(title="ML API", version=APP_VERSION)

# Variables globales que se cargarán al inicio
model = None
columns = []
meta = {}
label_map = {0: "Setosa", 1: "Versicolor", 2: "Virginica"} #traduce la etiqueta en predicciones


# TODO: Define tus modelos Pydantic aquí
class Sample(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # ---- COMPLETA CON TUS CAMPOS SEGÚN feature_columns.json ----
    
    sepal_length_cm: float = Field(..., ge=3.0, le=8.5, description="Longitud del sépalo en cm")
    sepal_width_cm: float = Field(..., ge=1.5, le=5.0, description="Ancho del sépalo en cm")
    petal_length_cm: float = Field(..., ge=0.5, le=7.5, description="Longitud del pétalo en cm")
    petal_width_cm: float = Field(..., ge=0.0, le=3.0, description="Ancho del pétalo en cm")


class PredictionOut(BaseModel):
    # ---- COMPLETA: qué devuelves? label / score / probs / latencia ----
    label_id: int = Field(..., description="ID de la especie predicha (0, 1, 2).")
    label_name: str = Field(..., description="Nombre de la especie predicha (Setosa, Versicolor, Virginica).")
    score: float = Field(..., ge=0.0, le=1.0, description="Probabilidad o confianza de la predicción.")
    latency_ms: float = Field(..., gt=0, description="Tiempo de procesamiento de la predicción en milisegundos.")
    pass

@app.on_event("startup")
def load_artifacts():
    global model, columns, meta
    if not MODEL_PATH.exists():
        raise RuntimeError("Modelo no encontrado. Entrena y exporta primero.")
    model = joblib.load(MODEL_PATH)
    columns = json.loads(COLUMNS_PATH.read_text())["feature_columns"]
    meta = json.loads(META_PATH.read_text()) if META_PATH.exists() else {}


def prepare_data_for_model(data_samples: List[Sample], columns: List[str]) -> np.ndarray:
    # Convertir cada Sample a un diccionario
    data_dicts = [sample.model_dump() for sample in data_samples]
    
    # Crear la matriz NumPy en el orden correcto
    X_input = []
    for d in data_dicts:
        # Asegura que los valores se extraigan en el orden de 'columns'
        row = [d[col] for col in columns]
        X_input.append(row)
        
    return np.array(X_input)

#ENDPOINTS 

@app.get("/health")
def health():
    return {
        "status": "ok",
        "version": APP_VERSION,
        "metrics": meta.get("metrics"),
        "n_features": len(columns),
        "model_pipeline_steps": [name for name, _ in model.steps] if model else "N/A" #estructura interna del modelo
        }

@app.post("/predict")
def predict(sample: Sample):
    try:
        start = time.perf_counter()
        # TODO: transforma 'sample' a lista en el orden de 'columns'
        # y haz model.predict / predict_proba
        # ---- TU CÓDIGO AQUÍ ----
        
        X_input = prepare_data_for_model([sample], columns)
        #   predicciones
        prediction_id = model.predict(X_input)[0]
        prediction_proba = model.predict_proba(X_input)[0]
        
        # Calcular la confianza 
        score = np.max(prediction_proba)
        
        # latencia y formatear salida
        latency_ms = (time.perf_counter() - start) * 1000
        
        # con el mapa se obtiene el nombre de la etiqueta
        label_name = label_map.get(prediction_id, "Desconocida")
        
        return PredictionOut(
            label_id=int(prediction_id),
            label_name=label_name,
            score=round(score, 4),
            latency_ms=round(latency_ms, 3)
        )
        
    except Exception as e:
        print(f"Error en /predict: {e}")
        raise HTTPException(status_code=400, detail=f"Error en el procesamiento de la predicción: {str(e)}")


@app.post("/predict-batch")
def predict_batch(samples: List[Sample]):
    try:
        # TODO: similar a /predict pero para lista
        # ---- TU CÓDIGO AQUÍ ----
        
        start = time.perf_counter()
        X_input = prepare_data_for_model(samples, columns)
        
        #  predicciones
        prediction_ids = model.predict(X_input)
        prediction_probas = model.predict_proba(X_input)
        
        results = []
        for id_pred, probas in zip(prediction_ids, prediction_probas):
            score = np.max(probas)
            label_name = label_map.get(id_pred, "Desconocida")
            
            results.append(PredictionOut(
                label_id=int(id_pred),
                label_name=label_name,
                score=round(score, 4),
                latency_ms=0.0 # Se dejará en 0.0 para evitar complejidad en el batch
            ))

        # Reemplaza latency_ms individual con el tiempo total del batch
        total_latency_ms = (time.perf_counter() - start) * 1000
        
        # Para el batch, se actualiza el primer elemento 
        if results:
            results[0].latency_ms = round(total_latency_ms, 3)
            
        return results
        
    except Exception as e:
        print(f"Error en /predict-batch: {e}")
        raise HTTPException(status_code=400, detail=f"Error en el procesamiento del lote: {str(e)}")

"""
)

Path("app.py").write_text(app_code, encoding='utf-8')
"app.py escrito (plantilla con TODOs)"

'app.py escrito (plantilla con TODOs)'


### 4.1 Ejecutar el servidor
```bash
uvicorn app:app --reload --port 8000
```
Abre `http://127.0.0.1:8000/docs` y prueba manualmente. Registra resultados.



---
## 5) Pruebas y casos límite (investigación)
Define y ejecuta **al menos 6** pruebas:
- 3 válidas (predict y batch)
- 3 inválidas (campo faltante, tipo incorrecto, campo extra, etc.)

Incluye código de prueba y captura de respuestas.


In [None]:
# TODO: pruebas con requests (server debe estar corriendo)
# import requests, json
# payload_valido = {...}
# r = requests.post("http://127.0.0.1:8000/predict", json=payload_valido, timeout=5)
# print(r.status_code, r.json())
raise NotImplementedError("Escribe tus pruebas de cliente aquí")


---
## 6) (Opcional) Observabilidad y despliegue (investigación)
- Middleware de latencia y logger estructurado (añade header `X-Process-Time-ms`)
- Manejo de warnings cuando la entrada está fuera de rango esperado
- Dockerfile mínimo para empaquetar y correr localmente


In [None]:
# TODO (opcional): pega aquí snippets de middleware o Dockerfile que diseñes
# class TimingMiddleware(...): ...
# FROM python:3.11-slim
pass


---
## 7) Criterios de evaluación (rúbrica breve)
- **Justificación técnica (25%)**: dataset, métrica, modelo y preprocesamiento argumentados.
- **Calidad del pipeline (20%)**: reproducibilidad y limpieza (Pipeline, persistencia correcta).
- **Contrato y validaciones (25%)**: Pydantic coherente, errores claros, orden de features garantizado.
- **Pruebas (20%)**: variedad de casos, evidencia de resultados y manejo de fallos.
- **Código y documentación (10%)**: legibilidad, estructura y claridad de mensajes.

---
## Bitácora de decisiones (responde aquí)
- Dataset y objetivo:
- Selección de features/target:
- Modelo y preprocesamiento:
- Métrica principal y resultados:
- Decisiones de contrato (payload, validaciones, respuestas):
- Observabilidad y pruebas:
- Lecciones aprendidas:
