# 0. Introducción a Cron y Logger en Python

## 0.1. ¿Qué es **Cron**?

https://en.wikipedia.org/wiki/Cron

La utilidad de línea de comandos **cron** es un **programador de tareas** (job scheduler) en sistemas operativos basados en `Unix`. Su función es permitir a los usuarios, quienes mantienen y configuran entornos de software, programar trabajos (jobs), como comandos o scripts de shell, conocidos también como **cron jobs**, para ejecutarse periódicamente en tiempos, fechas o intervalos fijos.

Cron se utiliza típicamente para automatizar tareas de mantenimiento o administración del sistema. Sin embargo, su naturaleza general lo hace útil para otras actividades, como descargar archivos desde Internet o revisar correos electrónicos a intervalos regulares. 

**Cron** es más adecuado para tareas repetitivas.

El nombre de **cron** proviene de "Chronos", la palabra griega para tiempo.

```bash
# * * * * * <command to execute>
# | | | | |
# | | | | day of the week (0–6) (Sunday to Saturday; 7 is also Sunday on some systems)
# | | | month (1–12)             
# | | day of the month (1–31)
# | hour (0–23)
# minute (0–59)
```

### Vamos a jugar
https://crontab.guru/


## 0.2. ¿Qué es **Logger** en Python?

A medida que nuestros scripts se vuelven más complejos, necesitamos una forma de **monitorear** lo que está sucediendo en ellos, especialmente cuando algo sale mal. Aquí es donde entra **logging**. 

`Python` tiene un módulo integrado llamado `logging` que permite registrar eventos o mensajes en diferentes niveles: 

- **DEBUG**: Información detallada, usualmente para desarrolladores.
- **INFO**: Confirmación de que las cosas están funcionando como se esperaba.
- **WARNING**: Algo inesperado, pero no crítico.
- **ERROR**: Fallos debido a un problema en el programa.
- **CRITICAL**: Error grave, usualmente detiene el programa.

Para usar **logger** en un script de Python:

In [1]:
import logging

# Configuración básica de logging
logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Ejemplo de cómo registrar eventos
logging.debug("Este es un mensaje de depuración")
logging.info("Este es un mensaje informativo")
logging.warning("Este es una advertencia")
logging.error("Este es un mensaje de error")
logging.critical("Este es un error crítico")


2025-10-27 20:27:08,380 - DEBUG - Este es un mensaje de depuración
2025-10-27 20:27:08,381 - INFO - Este es un mensaje informativo
2025-10-27 20:27:08,382 - ERROR - Este es un mensaje de error
2025-10-27 20:27:08,383 - CRITICAL - Este es un error crítico


### Configuración de **Logger** en Python

El módulo `logging` de Python permite una gran flexibilidad para definir cómo y dónde se registran los mensajes. Algunas de las configuraciones más comunes son:

#### 1. **Nivel de Log** (`level`)
El nivel del log define la gravedad de los mensajes que se quieren capturar. Algunos niveles de log comunes incluyen:
- `DEBUG`: Información detallada, útil para depuración.
- `INFO`: Confirmaciones de que el programa está funcionando como se espera.
- `WARNING`: Indica que algo inesperado sucedió, pero el programa sigue funcionando.
- `ERROR`: Señala errores más graves, pero que no detienen la ejecución.
- `CRITICAL`: Errores graves que probablemente detendrán el programa.

Cuando configuramos un **nivel de log**, sólo se capturan los mensajes de ese nivel o superiores. Por ejemplo, si establecemos el nivel en `WARNING`, se registrarán los mensajes `WARNING`, `ERROR` y `CRITICAL`, pero no los `DEBUG` o `INFO`.

#### 2. **Formato del mensaje** (`format`)
El formato del mensaje permite personalizar cómo se muestran los logs. Algunos componentes útiles en el formato son:
- `%(asctime)s`: La fecha y hora en que se registró el mensaje.
- `%(levelname)s`: El nivel del log (DEBUG, INFO, WARNING, etc.).
- `%(message)s`: El mensaje que se ha registrado.
- `%(name)s`: El nombre del logger.
- `%(threadName)s`: El nombre del hilo desde donde se emitió el log.
- `%(processName)s`: El nombre del proceso que emitió el log.

#### 3. **Formato de fecha y hora** (`datefmt`)
El parámetro `datefmt` permite personalizar el formato de la fecha y hora que se incluye en los mensajes de log. Puedes usar cualquier formato de fecha y hora compatible con Python, como el que se usa en `strftime`. Esto es útil para ajustar el formato a las necesidades específicas de tu aplicación.

##### Códigos de Formato Comunes

Aquí tienes algunos códigos de formato comúnmente utilizados que puedes usar con `strftime`:

| Código | Significado                           | Ejemplo de Salida       |
|--------|---------------------------------------|--------------------------|
| `%Y`   | Año con siglo                         | `2024`                   |
| `%y`   | Año sin siglo (00-99)                | `24`                     |
| `%m`   | Mes como un decimal con ceros (01-12)| `09`                     |
| `%B`   | Nombre completo del mes               | `Septiembre`             |
| `%b`   | Nombre abreviado del mes              | `Sep`                    |
| `%d`   | Día del mes (01-31)                  | `24`                     |
| `%H`   | Hora (00-23)                         | `14`                     |
| `%I`   | Hora (01-12)                         | `02`                     |
| `%M`   | Minuto (00-59)                       | `05`                     |
| `%S`   | Segundo (00-59)                      | `30`                     |
| `%p`   | AM o PM                              | `PM`                     |
| `%A`   | Nombre completo del día de la semana | `Martes`                 |
| `%a`   | Nombre abreviado del día de la semana | `Mar`                    |
| `%j`   | Día del año (001-366)                | `267`                    |
| `%U`   | Número de semana del año (00-53)     | `39`                     |
| `%W`   | Número de semana del año (00-53), basado en lunes | `39`          |



In [2]:
import logging
import os

logger = logging.getLogger(__name__)
                           
logging.basicConfig(
    # filename='backup.log', 
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
     datefmt='%Y-%m-%d %H:%M:%S',
)

def hacer_respaldo():
    try:
        # Supongamos que aquí ocurre el proceso de respaldo
        logger.info("Respaldo iniciado")
        # Simulación de respaldo
        if os.path.exists('base_de_datos.db'):
            logger.info("Respaldo exitoso")
        else:
            logger.error("Error: La base de datos no existe")
    except Exception as e:
        logger.critical(f"Error crítico durante el respaldo: {e}")

# Ejecutamos la función
hacer_respaldo()

2025-10-27 20:36:27,948 - INFO - Respaldo iniciado
2025-10-27 20:36:27,950 - ERROR - Error: La base de datos no existe


Ahora sí, empecemos la clase de hoy

# 1. Machine Learning Pipeline a.k.a Workflow Orchestration

> Un pipeline de machine learning es una serie de pasos secuenciales y automatizados que se siguen para entrenar, evaluar y desplegar un modelo de machine learning. 
> El objetivo principal de un pipeline es automatizar el proceso repetitivo de transformar datos crudos en un modelo que pueda usarse en producción.

    
### Yo tratando de explicar el orden de ejecución de las celdas de mi `jupyter-notebook`:

 <img style="display: block; margin: auto;" src="./images/orchestration-meme.png" width="580" height="500">

Revisemos lo que hemos hecho hasta ahora con nuestro código...
1. Downloading data ----> Ingestion
2. Transforming the data ----> Filtering, removing outliers
3. Preparing data for ML ----> Feature Engineering
4. Hyper-parameter tunning ----> Best params
5. Train the final model ----> Best params
6. Registry the final model

**Problemas:**
- Un cuaderno gigante
- Sin muchas instrucciones
- Poco legible para cualquier persona
- No escalable ni mantenible
- Podríamos decir que es un `workflow`, ya que se debe ejecutar en un orden específico

In [None]:
def download_data(year, month):
    ...
    return df

def prepare_data(df):
    ...
    return df

def feature_engineering(df):
    ...
    return X, y

def find_best_model(X, y):
    ...
    return params

def train_model(X, y, params):
    ...
    return model

def main():
    df = download_data(2024,1)
    df = prepare_data(df)
    X, y = feature_engineering(df)
    model_params = find_best_model(X, y)
    model = train_model(X, y, model_params)

Mucho mejor, no?

Pero sigue teniendo problemas:
- Lo podemos agendar?
- Qué pasa si tengo múltiples archivos?
- O si no lo quiero ejecutar en mi máquina local?
- Qué pasa si una de las funciones falla? Si es solamente temporal el fallo?
- Y si queremos notificar que ese error ocurrió a algún administrador?

Hay múltiples herramientas que vienen a solventar esos problemas:

- [Apache Airflow](https://airflow.apache.org/)
- [Prefect](https://www.prefect.io/)
- [Mage](https://www.mage.ai/)
- [Dagster](https://dagster.io/)
- [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/)
- [Scikitlearn Pipelines](https://scikit-learn.org/stable/modules/compose.html#pipeline)



# 2. Prefect

## 2.1. Definiciones en **Prefect**

### 2.1.1. **Task (Tarea)**
https://docs.prefect.io/3.0/develop/write-tasks

En **Prefect**, una `task` es la unidad más básica de trabajo en un flujo de Prefect. Una `task` representa una operación individual que se ejecuta dentro de un flujo de trabajo (`flow`). Puedes convertir cualquier función de Python en una `task` agregando el decorador `@task`. 

Las tasks pueden:
- **Tomar entradas, realizar un trabajo y devolver salidas**: Realizan operaciones con los datos que reciben y devuelven resultados.
- **Cachear su ejecución a través de múltiples invocaciones**: Evitar repetir cálculos si una task ya se ejecutó anteriormente con los mismos inputs.
- **Encapsular la lógica del flujo en unidades reutilizables**: Pueden ser utilizadas en diferentes flows.
- **Usar logging automático** para capturar detalles de ejecución, etiquetas (tags) y estado final.
- **Ejecutarse de forma concurrente**: Permiten paralelismo en la ejecución de tareas.
- **Definirse en el mismo archivo que el `flow` o importarse de módulos**.
- **Llamarse desde `flows` u otras `tasks`**.

**Ejemplo de una tarea:**

```python
from prefect import task

@task
def sumar(a, b):
    return a + b
```

### 2.1.2. **Flow (Flujo)**
https://docs.prefect.io/3.0/develop/write-flows

Un `flow` en **Prefect** es una colección de `tasks` que se ejecutan de manera organizada y coordinada. 

Un `flow` define cómo las `tasks` interactúan entre sí y permite orquestar la ejecución de múltiples `tasks` con reglas específicas, como dependencias, condicionales y paralelismo. 

Además, un `flow` puede manejar `tasks` de manera secuencial o en paralelo, así como gestionarlas en función de eventos externos.

Los flows se definen como funciones en Python, y pueden tomar entradas, realizar tareas y devolver resultados. Cualquier función Python puede convertirse en un flow de Prefect añadiendo el decorador `@flow`:
```python
from prefect import flow

@flow
def mi_flujo_principal():
    resultado = sumar(3, 4)
    print(f"El resultado es {resultado}")
```
#### Capacidades de los Flows en Prefect:

Cuando una función se convierte en un `flow`, adquiere las siguientes capacidades:

- **Seguimiento automático de metadatos** sobre las ejecuciones del flujo, como el tiempo de ejecución y el estado final.
- **Registro de cada estado** que el flujo alcanza, lo que permite observar y actuar sobre cada transición en la ejecución del flow.
- **Validación de tipos de los argumentos** de entrada como parámetros del flujo de trabajo.
- **Reintentos automáticos** en caso de fallo, con límites y retrasos configurables.
- **Timeouts** para evitar que los flujos de trabajo se ejecuten durante demasiado tiempo sin control.
- **Capacidad de despliegue**, lo que expone una API para interactuar con el flow de manera remota.

Los `flows` se identifican de forma única por su nombre. Puedes especificar un nombre para el `flow` utilizando el parámetro `name`:

```python
@flow(name="Mi Flujo")
def mi_flujo() -> str:
    return "¡Hola, mundo!"
```

Si no proporcionas un nombre, Prefect usará el nombre de la función del flow.

#### Ejecución de `Flows`:

Una ejecución de flow (**flow run**) es una ejecución individual de un `flow`.

Puedes ejecutar un `flow` llamándolo por su nombre de función, de la misma manera que lo harías con una función normal de `Python`. También puedes ejecutar un `flow` mediante:

- **Programadores externos**, como `cron`, para invocar la función del `flow`.
- **Desplegar el flow en Prefect Cloud** o en un servidor auto-hospedado de Prefect.
- **Iniciar una ejecución de flow a través de un cronograma**, la interfaz de usuario de Prefect, o la API de Prefect.

Sin importar cómo ejecutes el `flow`, `Prefect` monitorea su ejecución, capturando el estado para observabilidad. Además, puedes registrar una variedad de metadatos sobre las ejecuciones del flow para monitoreo, resolución de problemas y auditoría.



### 2.1.3. Diferencias entre `Task` y `Flow`:

- **Task**: Una tarea individual, pequeña y específica que ejecuta una operación.
- **Flow**: Un contenedor que organiza y coordina la ejecución de varias tasks, permitiendo su orquestación.

### 2.1.4. Ejemplo Completo de Task y Flow en Prefect:

```python
from prefect import task, flow

@task
def sumar(a, b):
    return a + b

@flow
def mi_flujo_principal():
    resultado = sumar(3, 4)
    print(f"El resultado es {resultado}")

# Ejecutar el flow
mi_flujo_principal()
```

## 2.2 Uso de `Prefect`

```bash
uv add prefect
prefect version
```

Ahora vamos a inicializar un servidor de `prefect`

```bash
prefect server start
```

Vamos a ver un `flow` sencillo

In [3]:
import httpx
from prefect import flow, task


@task(retries=4, retry_delay_seconds=1, log_prints=True)
def fetch_cat_fact():
    cat_fact = httpx.get("https://f3-vyx5c2hfpq-ue.a.run.app/")
    #An endpoint that is designed to fail sporadically
    if cat_fact.status_code >= 400:
        raise Exception()
    print(cat_fact.text)


@flow
def fetch():
    fetch_cat_fact()

In [7]:
fetch()

2025-10-27 21:19:43,866 - DEBUG - connect_tcp.started host='127.0.0.1' port=4200 local_address=None timeout=60.0 socket_options=None
2025-10-27 21:19:43,868 - DEBUG - connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1137aec00>
2025-10-27 21:19:43,870 - DEBUG - send_request_headers.started request=<Request [b'GET']>
2025-10-27 21:19:43,871 - DEBUG - send_request_headers.complete
2025-10-27 21:19:43,878 - DEBUG - send_request_body.started request=<Request [b'GET']>
2025-10-27 21:19:43,879 - DEBUG - send_request_body.complete
2025-10-27 21:19:43,879 - DEBUG - receive_response_headers.started request=<Request [b'GET']>
2025-10-27 21:19:43,881 - DEBUG - receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'date', b'Tue, 28 Oct 2025 03:19:43 GMT'), (b'server', b'uvicorn'), (b'content-length', b'8'), (b'content-type', b'application/json')])
2025-10-27 21:19:43,882 - INFO - HTTP Request: GET http://127.0.0.1:4200/api/admin/version "HTTP/1

Ahora veamos el concepto de `subflows`

In [8]:
from prefect import flow


@flow(name="Cat fact")
def fetch_cat_fact():
    """A flow that gets a cat fact"""
    return httpx.get("https://catfact.ninja/fact?max_length=140").json()["fact"]


@flow(name="Dog fact")
def fetch_dog_fact():
    """A flow that gets a dog fact"""
    return httpx.get(
        "https://dogapi.dog/api/v2/facts",
        headers={"accept": "application/json"},
    ).json()["data"][0]["attributes"]["body"]


@flow(name="Animals fact", log_prints=True)
def animal_facts():
    cat_fact = fetch_cat_fact()
    dog_fact = fetch_dog_fact()
    print(f"🐱: {cat_fact} \n🐶: {dog_fact}")

In [9]:
animal_facts()

2025-10-27 21:20:44,676 - DEBUG - connect_tcp.started host='127.0.0.1' port=4200 local_address=None timeout=60.0 socket_options=None
2025-10-27 21:20:44,677 - DEBUG - connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x1138b7c20>
2025-10-27 21:20:44,677 - DEBUG - send_request_headers.started request=<Request [b'GET']>
2025-10-27 21:20:44,678 - DEBUG - send_request_headers.complete
2025-10-27 21:20:44,678 - DEBUG - send_request_body.started request=<Request [b'GET']>
2025-10-27 21:20:44,679 - DEBUG - send_request_body.complete


2025-10-27 21:20:44,680 - DEBUG - receive_response_headers.started request=<Request [b'GET']>
2025-10-27 21:20:44,681 - DEBUG - receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'date', b'Tue, 28 Oct 2025 03:20:43 GMT'), (b'server', b'uvicorn'), (b'content-length', b'8'), (b'content-type', b'application/json')])
2025-10-27 21:20:44,682 - INFO - HTTP Request: GET http://127.0.0.1:4200/api/admin/version "HTTP/1.1 200 OK"
2025-10-27 21:20:44,683 - DEBUG - receive_response_body.started request=<Request [b'GET']>
2025-10-27 21:20:44,684 - DEBUG - receive_response_body.complete
2025-10-27 21:20:44,685 - DEBUG - response_closed.started
2025-10-27 21:20:44,685 - DEBUG - response_closed.complete
2025-10-27 21:20:44,687 - DEBUG - send_request_headers.started request=<Request [b'GET']>
2025-10-27 21:20:44,689 - DEBUG - send_request_headers.complete
2025-10-27 21:20:44,690 - DEBUG - send_request_body.started request=<Request [b'GET']>
2025-10-27 21:20:44,690 - DEBUG - sen

Ahora vamos a hacerlo para nuestro pipeline de entrenamiento en nuestro proyecto `nyc-taxi-time-prediction`

- Vamos a crear una nueva rama `feat/training_orchestration`. 
- Vamos a crear un nuevo directorio `training pipeline`
- crear un archivo llamado `02-train_pipeline_prefect.py`

In [None]:
import os
import math
import optuna
import pathlib
import pickle
import mlflow
import pathlib
import pandas as pd
import xgboost as xgb
from dotenv import load_dotenv
from optuna.samplers import TPESampler
from mlflow.models.signature import infer_signature
from sklearn.metrics import root_mean_squared_error
from sklearn.feature_extraction import DictVectorizer
from prefect import flow, task

@task(name="Read Data")
def read_data(file_path: str) -> pd.DataFrame:
    """Read data into DataFrame"""
    df = pd.read_parquet(file_path)

    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)

    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df

@task(name="Add Features")
def add_features(df_train: pd.DataFrame, df_val: pd.DataFrame):
    """Add features to the model"""
    df_train["PU_DO"] = df_train["PULocationID"] + "_" + df_train["DOLocationID"]
    df_val["PU_DO"] = df_val["PULocationID"] + "_" + df_val["DOLocationID"]

    categorical = ["PU_DO"]  #'PULocationID', 'DOLocationID']
    numerical = ["trip_distance"]

    dv = DictVectorizer()

    train_dicts = df_train[categorical + numerical].to_dict(orient="records")
    X_train = dv.fit_transform(train_dicts)

    val_dicts = df_val[categorical + numerical].to_dict(orient="records")
    X_val = dv.transform(val_dicts)

    y_train = df_train["duration"].values
    y_val = df_val["duration"].values
    return X_train, X_val, y_train, y_val, dv

@task(name="Hyperparameter Tunning")
def hyper_parameter_tunning(X_train, X_val, y_train, y_val, dv):
    
    mlflow.xgboost.autolog()
    
    training_dataset = mlflow.data.from_numpy(X_train.data, targets=y_train, name="green_tripdata_2024-01")
    
    validation_dataset = mlflow.data.from_numpy(X_val.data, targets=y_val, name="green_tripdata_2024-02")
    
    train = xgb.DMatrix(X_train, label=y_train)
    
    valid = xgb.DMatrix(X_val, label=y_val)
    
    # ------------------------------------------------------------
    # Definir la función objetivo para Optuna
    #    - Recibe un `trial`, que se usa para proponer hiperparámetros.
    #    - Entrena un modelo con esos hiperparámetros.
    #    - Calcula la métrica de validación (RMSE) y la retorna (Optuna la minimizará).
    #    - Abrimos un run anidado de MLflow para registrar cada trial.
    # ------------------------------------------------------------
    def objective(trial: optuna.trial.Trial):
        # Hiperparámetros MUESTREADOS por Optuna en CADA trial.
        # Nota: usamos log=True para emular rangos log-uniformes (similar a loguniform).
        params = {
            "max_depth": trial.suggest_int("max_depth", 4, 100),
            "learning_rate": trial.suggest_float("learning_rate", math.exp(-3), 1.0, log=True),
            "reg_alpha": trial.suggest_float("reg_alpha",   math.exp(-5), math.exp(-1), log=True),
            "reg_lambda": trial.suggest_float("reg_lambda", math.exp(-6), math.exp(-1), log=True),
            "min_child_weight": trial.suggest_float("min_child_weight", math.exp(-1), math.exp(3), log=True),
            "objective": "reg:squarederror",  
            "seed": 42,                      
        }

        # Run anidado para dejar rastro de cada trial en MLflow
        with mlflow.start_run(nested=True):
            mlflow.set_tag("model_family", "xgboost")  # etiqueta informativa
            mlflow.log_params(params)                  # registra hiperparámetros del trial

            # Entrenamiento con early stopping en el conjunto de validación
            booster = xgb.train(
                params=params,
                dtrain=train,
                num_boost_round=100,
                evals=[(valid, "validation")],
                early_stopping_rounds=10,
            )

            # Predicción y métrica en validación
            y_pred = booster.predict(valid)
            rmse = root_mean_squared_error(y_val, y_pred)

            # Registrar la métrica principal
            mlflow.log_metric("rmse", rmse)

            # La "signature" describe la estructura esperada de entrada y salida del modelo:
            # incluye los nombres, tipos y forma (shape) de las variables de entrada y el tipo de salida.
            # MLflow la usa para validar datos en inferencia y documentar el modelo en el Model Registry.
            signature = infer_signature(X_val, y_pred)

            # Guardar el modelo del trial como artefacto en MLflow.
            mlflow.xgboost.log_model(
                booster,
                name="model",
                input_example=X_val[:5],
                signature=signature
            )

        # Optuna minimiza el valor retornado
        return rmse

    # ------------------------------------------------------------
    # Crear el estudio de Optuna
    #    - Usamos TPE (Tree-structured Parzen Estimator) como sampler.
    #    - direction="minimize" porque queremos minimizar el RMSE.
    # ------------------------------------------------------------
    sampler = TPESampler(seed=42)
    study = optuna.create_study(direction="minimize", sampler=sampler)

    # ------------------------------------------------------------
    # Ejecutar la optimización (n_trials = número de intentos)
    #    - Cada trial ejecuta la función objetivo con un set distinto de hiperparámetros.
    #    - Abrimos un run "padre" para agrupar toda la búsqueda.
    # ------------------------------------------------------------
    with mlflow.start_run(run_name="XGBoost Hyperparameter Optimization (Optuna)", nested=True):
        study.optimize(objective, n_trials=3)

    # --------------------------------------------------------
    # Recuperar y registrar los mejores hiperparámetros
    # --------------------------------------------------------
    best_params = study.best_params
    # Asegurar tipos/campos fijos (por claridad y consistencia)
    best_params["max_depth"] = int(best_params["max_depth"])
    best_params["seed"] = 42
    best_params["objective"] = "reg:squarederror"

    return best_params

@task(name="Train Best Model")
def train_best_model(X_train, X_val, y_train, y_val, dv, best_params) -> None:
    """train a model with best hyperparams and write everything out"""

    with mlflow.start_run(run_name="Best model ever"):
        train = xgb.DMatrix(X_train, label=y_train)
        valid = xgb.DMatrix(X_val, label=y_val)

        mlflow.log_params(best_params)

        # Etiquetas del run "padre" (metadatos del experimento)
        mlflow.set_tags({
            "project": "NYC Taxi Time Prediction Project",
            "optimizer_engine": "optuna",
            "model_family": "xgboost",
            "feature_set_version": 1,
        })

        # --------------------------------------------------------
        # 7) Entrenar un modelo FINAL con los mejores hiperparámetros
        #    (normalmente se haría sobre train+val o con CV; aquí mantenemos el patrón original)
        # --------------------------------------------------------
        booster = xgb.train(
            params=best_params,
            dtrain=train,
            num_boost_round=100,
            evals=[(valid, "validation")],
            early_stopping_rounds=10,
        )

        # Evaluar y registrar la métrica final en validación
        y_pred = booster.predict(valid)
        rmse = root_mean_squared_error(y_val, y_pred)
        mlflow.log_metric("rmse", rmse)

        # --------------------------------------------------------
        # 8) Guardar artefactos adicionales (p. ej. el preprocesador)
        # --------------------------------------------------------
        pathlib.Path("preprocessor").mkdir(exist_ok=True)
        with open("preprocessor/preprocessor.b", "wb") as f_out:
            pickle.dump(dv, f_out)

        mlflow.log_artifact("preprocessor/preprocessor.b", artifact_path="preprocessor")

        # La "signature" describe la estructura esperada de entrada y salida del modelo:
        # incluye los nombres, tipos y forma (shape) de las variables de entrada y el tipo de salida.
        # MLflow la usa para validar datos en inferencia y documentar el modelo en el Model Registry.
        # Si X_val es la matriz dispersa (scipy.sparse) salida de DictVectorizer:
        feature_names = dv.get_feature_names_out()
        input_example = pd.DataFrame(X_val[:5].toarray(), columns=feature_names)

        # Para que las longitudes coincidan, usa el mismo slice en y_pred
        signature = infer_signature(input_example, y_val[:5])

        # Guardar el modelo del trial como artefacto en MLflow.
        mlflow.xgboost.log_model(
            booster,
            name="model",
            input_example=input_example,
            signature=signature,
        )
    return None

@flow(name="Main Flow")
def main_flow(year: int, month_train: str, month_val: str) -> None:
    """The main training pipeline"""
    
    train_path = f"../data/green_tripdata_{year}-{month_train}.parquet"
    val_path = f"../data/green_tripdata_{year}-{month_val}.parquet"
    
    load_dotenv(override=True)  # Carga las variables del archivo .env
    EXPERIMENT_NAME = "/Users/<tu_correo>/nyc-taxi-experiment-prefect"

    mlflow.set_tracking_uri("databricks")
    experiment = mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

    # Load
    df_train = read_data(train_path)
    df_val = read_data(val_path)

    # Transform
    X_train, X_val, y_train, y_val, dv = add_features(df_train, df_val)
    
    # Hyper-parameter Tunning
    best_params = hyper_parameter_tunning(X_train, X_val, y_train, y_val, dv)
    
    # Train
    train_best_model(X_train, X_val, y_train, y_val, dv, best_params)

if __name__ == "__main__":
    main_flow(year=2025, month_train="01", month_val="02")

## Actividad en clase:
### A los 5 primeros que logren terminar la siguiente actividad les subiré el puntaje a 100 de la tarea con menor calificación actualmente.

1. Crear una nueva task para agregar el modelo con mejor métrica al `model registry`.
2. El modelo debe tener como nombre `nyc-taxi-model-prefect` en el `model registry` para diferenciarlo del modelo que ya tenemos actualmente.
3. El registro debe ser automático, puede hacer uso de la función `mlflow.register_model` que ya saben utilizar de la tarea anterior.
4. Si recuerdan, esa función requiere un `run_uri`, el cual se conformaba algo como `f"runs:/{run_id}/model"`. Nosotros buscabamos manualmente el `run_id`, pero `mlflow` tiene manera de hacer una query y ordenar los resultados por métrica en orden ascendente y devolver dicho `run_id`.
5. Busque dicha función e impleméntela en la lógica para que sea de manera automática la obtención del `run_uri`. Use la documentación oficial de `mlflow`, saber leer y usar la documentación es una skill que como Data Scientist también debe tener. PROHIBIDO EL USO DE CHATGPT.
6. Asígnele a esa versión del modelo registrado, el alias de `@champion`.
7. Corra el pipeline y verifique que se haya registrado el modelo en el `model registry`.

## Tarea 6

1. Terminar la actividad anterior.
2. Crear un nuevo script llamado `train_challenger.py` dentro del directorio `training pipeline`
3. Convertir la tarea 5 en un `flow` (en donde entrenó dos modelos, al mejor le asignó el alias `@challenger` comparó el desempeño con el `@champion`).
4. El `flow` debe escoger quién es el nuevo `@champion` y asignarle el alias respectivo.
5. Recuerde definir las `task` que considere
6. Recuerde que los nombres de los `flow` deben ser únicos, que no interfiera con el `flow` del ejercicio anterior.
7. Use el mismo nombre del experimeto del ejercicio anterior `nyc-taxi-experiment-prefect`
8. Registre los modelos en el mismo `Model Registry` del ejercicio anterior `nyc-taxi-model-prefect`
9. Cree un `PR` con los cambios hecho en esta branch hacia `main` y hacer el merge.

## Fecha de entrega:
Martes 4 de Noviembre antes de la clase: 19:55