# **Ciencia de Datos e Inteligencia Artificial para la industria del software**

# *Curso MLOps*

## **Edición 2023**

# 3. Orquestación

## 3.1 Introducción a la Orquestación de Flujos de Trabajo

Orquestar un flujo de trabajo de ML puede ser bastante desafiante. Hay muchos puntos potenciales de interrupción en el flujo, como indican las cruces rojas en la imagen:

![Flujo Típico](https://stephen137.github.io/posts/MLOps_Zoomcamp_Module_3/ML_Ops_Zoomcamp_Module_3_files/figure-html/7985c817-ae3f-4b06-b7ad-c4a5be6efb81-2-5c4a30a6-4a39-4bd2-a17a-5533062dd67e.PNG)

Afortunadamente, existen herramientas disponibles para solucionar estos problemas. Prefect proporciona un enfoque moderno, sólido y de fácil aplicación para la orquestación de flujos de trabajo, simplificando la gestión de *pipelines* de datos complejas y permitiendo una ejecución eficiente y confiable, así como el monitoreo y la visualización de los flujos de trabajo.

A continuación se muestra una captura de pantalla de la interfaz de usuario de Prefect que ilustra cómo se podría orquestar un flujo típico:

![Interfaz de Usuario de Prefect](https://stephen137.github.io/posts/MLOps_Zoomcamp_Module_3/ML_Ops_Zoomcamp_Module_3_files/figure-html/7985c817-ae3f-4b06-b7ad-c4a5be6efb81-1-2d954052-efcf-49dc-9214-2453d17ea473.PNG)

## 3.2 Introducción a Prefect

![Prefect](https://stephen137.github.io/posts/MLOps_Zoomcamp_Module_3/ML_Ops_Zoomcamp_Module_3_files/figure-html/cec5f8e8-85d2-4966-8e95-6a8af1e133a9-1-c372cec2-c169-43e5-bddc-af4cc2126188.PNG)

Prefect es una herramienta de orquestación de flujos de trabajo que permite a los desarrolladores construir, observar y reaccionar ante *pipelines* de datos.

Es la forma más sencilla de transformar cualquier función Python en una unidad de trabajo que puede ser observada y orquestada. Sólo tiene que incertar el código Python y colocar algunos decoradores.

Prefect hace que sea fácil añadir reintentos automáticos, almacenamiento en caché y registro a tus funciones Python. Sólo tienes que decorar tu código con decoradores de flujo y tareas y estarás volando. 🛫

Prefect permite automatizar procesos, su objetivo principal es facilitar la gestión y ejecución de flujos de trabajo complejos y distribuidos. 

#### Beneficios

* Facilita la orquestación de flujos de trabajo complejos: Prefect permite **definir y gestionar flujos de trabajo** de manera programática utilizando código Python. Esto hace que sea más fácil representar y controlar flujos de trabajo complejos, donde las tareas deben ejecutarse en un orden específico o incluso en paralelo.

* Control de flujo y manejo de errores: Con Prefect, puedes definir reglas de flujo y **manejo de errores de manera más eficiente**. Puedes especificar qué hacer **si una tarea falla, cómo reintentarla o cómo continuar con tareas alternativas en función de condiciones específicas**.

* Programación distribuida: Prefect está diseñado para ejecutar flujos de trabajo distribuidos, lo que significa que puedes **aprovechar recursos de cómputo distribuidos**, como clústeres de máquinas o plataformas de contenedores, para ejecutar tareas en paralelo y de manera escalable.

* Monitorización y seguimiento: Prefect proporciona herramientas para **monitorear y realizar un seguimiento detallado** de la ejecución de flujos de trabajo. Esto es valioso para el diagnóstico de problemas y la mejora de la eficiencia de los flujos de trabajo.

* Flexibilidad y extensibilidad: Puedes **integrar Prefect con otras herramientas y servicios**, lo que lo hace altamente flexible y adecuado para su uso en diversos entornos y situaciones. También es extensible, lo que significa que puedes personalizar y ampliar su funcionalidad según tus necesidades específicas.

* Reproducibilidad y versionamiento: Prefect promueve las mejores prácticas de **reproducibilidad y versionamiento** al permitirte definir flujos de trabajo como código, lo que facilita la reproducción de experimentos y el seguimiento de cambios en tus flujos de trabajo a lo largo del tiempo.

En el contexto de MLOps, Prefect se utiliza comúnmente para orquestar flujos de trabajo relacionados con la preparación de datos, entrenamiento de modelos, evaluación de modelos, despliegue y monitorización. Su capacidad para gestionar flujos de trabajo complejos y distribuidos es especialmente útil en proyectos de Machine Learning donde se requiere coordinar múltiples etapas y recursos.

#### Alternativas a Prefect

* **Apache Airflow**: Es una de las herramientas más conocidas para orquestación de flujos de trabajo. Airflow permite definir flujos de trabajo como DAGs (Grafos Acíclicos Dirigidos) y ofrece una amplia gama de conectores y operadores para integrarse con diversas tecnologías y servicios.

* **Luigi**: Desarrollado por Spotify, Luigi es otra herramienta de orquestación de flujos de trabajo. Al igual que Prefect, se utiliza para definir flujos de trabajo en Python y proporciona control de flujo y manejo de errores.

* **Celery**: Aunque inicialmente se diseñó para la cola de tareas y la ejecución paralela en aplicaciones web, Celery se puede utilizar para orquestación de flujos de trabajo cuando se combina con otras bibliotecas y herramientas.

* **Kubeflow Pipelines**: Si estás trabajando en un entorno de Kubernetes, Kubeflow Pipelines es una excelente opción. Está diseñado específicamente para la orquestación de flujos de trabajo en Kubernetes y es ampliamente utilizado en el contexto de la automatización de Machine Learning.

* **AWS Step Functions**: Si estás en la nube de Amazon, AWS Step Functions es una opción para orquestar flujos de trabajo en AWS. Puedes utilizarlo para coordinar servicios y recursos de AWS en tus flujos de trabajo.

* **Microsoft Azure Data Factory**: Si trabajas en el entorno de Azure, Azure Data Factory es una herramienta que te permite crear, programar y orquestar flujos de trabajo de datos en la nube.

* **Google Cloud Composer**: Si utilizas Google Cloud, Cloud Composer es una plataforma gestionada que se basa en Apache Airflow y está diseñada para la orquestación de flujos de trabajo en Google Cloud.

* **Rundeck**: Rundeck es una herramienta de orquestación de flujos de trabajo que se centra en la automatización de tareas operativas y la gestión de infraestructura.

* **DAGsHub**: Una plataforma para gestionar flujos de trabajo de ML y Data Science que permite a los equipos colaborar en la orquestación y reproducción de flujos de trabajo.

* **Tekton**: Si estás construyendo flujos de trabajo de CI/CD en entornos de contenedores, Tekton es una opción popular que se integra bien con Kubernetes.

La elección de la herramienta dependerá de tus necesidades específicas, los servicios y tecnologías que utilices, y tus preferencias personales. Cada una de estas herramientas tiene sus propias características y ventajas, por lo que es importante evaluarlas en función de tus requerimientos y restricciones particulares.

### 3.2.1 ¿Por qué usar Prefect?

![Razones para Usar Prefect](https://stephen137.github.io/posts/MLOps_Zoomcamp_Module_3/ML_Ops_Zoomcamp_Module_3_files/figure-html/24f13b5a-1120-4a68-b62f-ce0ef243ea04-1-6f845297-335f-49cb-867c-c22357338b3b.PNG)

* **Facilidad de uso:**
Prefect proporciona una interfaz de usuario amigable y una API basada en Python, lo que facilita la definición y gestión de flujos de trabajo. Le permite escribir flujos de trabajo como código, aprovechando su conocimiento e infraestructura Python existente.

* **Flexibilidad:**
Prefect ofrece un marco flexible y extensible para definir flujos de trabajo. Admite dependencias complejas y le permite manejar flujos de trabajo dinámicos impulsados por datos. Puede crear, actualizar y versionar fácilmente sus flujos de trabajo a medida que evolucionan sus necesidades.

* **Tolerancia a fallos:**
Prefect proporciona tolerancia a fallos incorporada y mecanismos de reintento. Maneja las fallas de manera elegante al volver a intentar automáticamente las tareas fallidas y recuperarse de los errores. Puede configurar manejo de errores personalizados y notificaciones para garantizar que sus flujos de trabajo se ejecuten de manera confiable.

* **Monitoreo y observabilidad:**
Prefect ofrece características completas de monitoreo y observabilidad. Proporciona un panel basado en web donde puede visualizar y seguir el estado de sus flujos de trabajo, inspeccionar detalles a nivel de tarea y monitorear métricas de ejecución. Esto permite una fácil depuración, optimización de rendimiento y solución de problemas.

* **Escalabilidad:**
Prefect está diseñado para escalar horizontalmente, lo que le permite ejecutar flujos de trabajo en una infraestructura distribuida. Admite ejecución paralela y distribuida, lo que le permite ejecutar tareas de manera concurrente en múltiples máquinas o contenedores. Esto lo hace adecuado para el procesamiento de datos a gran escala y flujos de trabajo complejos.

* **Integración y extensibilidad:**
Prefect se integra perfectamente con varias tecnologías y servicios, como bases de datos, colas de mensajes, plataformas en la nube y más. Proporciona una amplia variedad de bibliotecas de tareas y le permite extender su funcionalidad mediante definiciones de tareas personalizadas y ganchos. Esto le permite integrar Prefect en su conjunto de tecnologías existente y aprovechar el poder de su ecosistema.

* **Visibilidad y colaboración en flujos de trabajo:**
Prefect promueve la colaboración y la visibilidad entre equipos. Ofrece características como control de versiones, compartición y edición colaborativa de flujos de trabajo. Puede compartir y reutilizar fácilmente flujos de trabajo en proyectos, lo que permite una mejor colaboración y compartición de conocimientos dentro de su organización.

![Flujos de Trabajo en Prefect](https://stephen137.github.io/posts/MLOps_Zoomcamp_Module_3/ML_Ops_Zoomcamp_Module_3_files/figure-html/48700e8c-7f1e-4f4b-8653-84f1c6f446b3-1-598ca6c1-9e24-4144-a8c0-a5eeb2f364a6.PNG)

![Flujo Principal y Subflujo en Prefect](https://stephen137.github.io/posts/MLOps_Zoomcamp_Module_3/ML_Ops_Zoomcamp_Module_3_files/figure-html/bae279ae-826a-4ff0-8d76-6ef536bda595-1-65cf645f-3220-4309-a291-897449294d68.PNG)

![Ejemplo de Flujo en Prefect](https://stephen137.github.io/posts/MLOps_Zoomcamp_Module_3/ML_Ops_Zoomcamp_Module_3_files/figure-html/e340c1ac-f033-4ebf-ac57-c3109c692f49-1-953ecfb5-7726-457f-bdff-270d188d7b83.PNG)

En el ejemplo anterior, tenemos un flujo principal llamado "Hello Flow" que llama al subflujo "Subflow".

## Ejemplo introductio 1

### a. Clonar el repositorio de Prefect en GitHub

Para clonar el repositorio de Prefect, navegue hasta el directorio donde desea clonar y ejecute el siguiente comando desde la línea de comandos:

In [6]:
!git clone https://github.com/discdiver/prefect-mlops-zoomcamp.git

fatal: destination path 'prefect-mlops-zoomcamp' already exists and is not an empty directory.


### b. Configurar un entorno Conda

Ahora, desde dentro de nuestro repo clonado, vamos a crear un entorno conda usando lo siguiente:

```bash
conda create -n prefect-ops python==3.9.12
```
y activamos el entorno usando :

```bash
conda activate prefect-ops
```   
Comprueba rápidamente que estamos utilizando la versión correcta de Python :

```bash
python -V
```
A continuación, pip install las dependencias incluidas en el archivo requirements.txt :

```bash
conda install pip
```

```bash
pip install -r requirements.txt
```
Para ver el contenido del .txt, escribe en la terminal

```bash
cat requirements.txt
```

### c. Iniciar un servidor Prefect

Podemos iniciar un servidor Prefect desde la línea de comandos:

```bash
prefect server start
```

Vamos a tomar la URL de la API y asegurarnos de que la aplicamos a nuestra configuración de Prefect para que estemos apuntando a la URL correcta de la API.

Dentro de una nueva terminal, navega al mismo directorio que antes, activa el entorno conda y establece la URL de la API:

```bash
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
```

Bien, ahora naveguemos a la carpeta 3.2 donde se encuentran los scripts que usaremos para ilustrar:

```bash
cat_facts.py
```

En caso de algun error: 

```bash
pip install --upgrade pydantic prefect
```

A continuacion definiremos una 

In [1]:
import httpx
from prefect import flow, task


@task(retries=4, retry_delay_seconds=0.1, log_prints=True) # decorator
def fetch_cat_fact():
    cat_fact = httpx.get("https://f3-vyx5c2hfpq-ue.a.run.app/")
    #An endpoint that is designed to fail sporadically
    if cat_fact.status_code >= 400:
        raise Exception()
    print(cat_fact.text) # this will be logged


@flow
def fetch():
    fetch_cat_fact()


if __name__ == "__main__":
    fetch()

Este código está utilizando la biblioteca Prefect para definir un flujo de trabajo simple que se encarga de realizar una solicitud HTTP a una página web y luego imprimir el contenido de la respuesta. A continuación la estructura y lo que está haciendo este código:

1. Importaciones:
   ```python
   import httpx
   from prefect import flow, task
   ```
   - `httpx` es una biblioteca que se utiliza para realizar solicitudes HTTP.
   - `prefect` es la biblioteca de orquestación de flujos de trabajo que se está utilizando para definir el flujo de trabajo.

2. Decorador `@task`:
   ```python
   @task(retries=4, retry_delay_seconds=0.1, log_prints=True)
   ```
   - Esto define una tarea llamada `fetch_cat_fact`. El decorador `@task` se utiliza para marcar esta función como una tarea que forma parte del flujo de trabajo Prefect.
   - `retries=4` indica que esta tarea se volverá a intentar hasta 4 veces en caso de que falle.
   - `retry_delay_seconds=0.1` establece un retraso de 0.1 segundos entre reintentos.
   - `log_prints=True` permite que los mensajes de registro generados dentro de esta tarea se impriman en la salida estándar.

3. Función `fetch_cat_fact()`:
   ```python
   def fetch_cat_fact():
       cat_fact = httpx.get("https://f3-vyx5c2hfpq-ue.a.run.app/")
       if cat_fact.status_code >= 400:
           raise Exception()
       print(cat_fact.text)
   ```
   - Esta función realiza una solicitud HTTP a la URL "https://f3-vyx5c2hfpq-ue.a.run.app/" utilizando `httpx.get()`.
   - Luego, verifica si el código de estado de la respuesta es mayor o igual a 400, lo que generalmente indica un error HTTP. Si es así, se lanza una excepción.
   - Finalmente, imprime el contenido de la respuesta en la salida estándar.

4. Función `fetch()`:
   ```python
   @flow
   def fetch():
       fetch_cat_fact()
   ```
   - Esta función `fetch()` se marca como un flujo de trabajo Prefect utilizando el decorador `@flow`.
   - En este caso, el flujo de trabajo es bastante simple y consiste en llamar a la tarea `fetch_cat_fact()`.

5. Ejecución del flujo de trabajo:
   ```python
   if __name__ == "__main__":
       fetch()
   ```
   - Esta sección verifica si el script se está ejecutando como el programa principal (`__name__ == "__main__"`).
   - Si es así, se inicia la ejecución del flujo de trabajo `fetch()`.

En resumen, este código define un flujo de trabajo Prefect que realiza una solicitud HTTP a una URL y maneja posibles errores HTTP. La respuesta de la solicitud se imprime en la salida estándar. La estructura es típica de cómo se definen y ejecutan flujos de trabajo utilizando la biblioteca Prefect en Python.

La función que llama a la API ha sido decorada con un decorador de tareas que ha sido configurado con los argumentos retries, retry_delay_seconds, y log_prints.

Podemos ejecutar el flujo desde la línea de comandos utilizando:

```bash
python cat_facts.py
```

y seguir la ejecución en directo desde la UPI de Prefect.

Como podemos ver, el flujo falló un par de veces, pero el argumento de reintentos incluido en el decorador funcionó. Y ya tenemos nuestro hecho del gato registrado:

"A diferencia de los perros, los gatos no han sufrido grandes cambios durante su proceso de domesticación".

**Ejecutemos ahora el otro script:**

```bash
python cat_dog_facts.py
```

In [2]:
import httpx
from prefect import flow

@flow
def fetch_cat_fact():
    '''A flow that gets a cat fact'''
    return httpx.get("https://catfact.ninja/fact?max_length=140").json()["fact"]

@flow
def fetch_dog_fact():
    '''A flow that gets a dog fact'''
    return httpx.get(
        "https://dogapi.dog/api/v2/facts",
        headers={"accept": "application/json"},
    ).json()["data"][0]["attributes"]["body"] # index into the JSON file to retrieve the "body" - see below for example of JSON format

@flow(log_prints=True)
def animal_facts():
    cat_fact = fetch_cat_fact()
    dog_fact = fetch_dog_fact()
    print(f"🐱: {cat_fact} \n🐶: {dog_fact}")

if __name__ == "__main__":
    animal_facts()

Este script utiliza Subflows. El flujo principal `animal_facts` llama a `fetch_cat_fact` y luego a `fetch_dog_fact`. Ten en cuenta que debido a `log_prints=True`, la salida de los flujos se imprime y registra.

🐱: En la versión original italiana de La Cenicienta, la figura benévola del hada madrina era un gato.

🐶: La canción de The Beatles "A Day in the Life" tiene un silbido extra agudo, audible solo para los perros. Fue grabado por Paul McCartney para el disfrute de su perro pastor de Shetland.



## 3.3 Flujo de Trabajo de Prefect

### 3.3.1 Predicción ride duration trip (Notebook del Módulo 2)

En la primera clase construimos un modelo simple de predicción de duración de viaje en taxi en la ciudad de Nueva York. El código fue compilado deliberadamente en un cuaderno Jupyter, el cual involucra el proceso de pensamiento y los pasos generales involucrados en la exploración y preprocesamiento de datos crudos para el entrenamiento de aprendizaje automático. Si bien un cuaderno Jupyter es adecuado para la experimentación interna, cuando se trata de producción/despliegue, quizás necesitamos algo más robusto y escalable.

Cuando el código está disperso en un cuaderno Jupyter, las cosas pueden volverse rápidamente desordenadas, especialmente cuando comienzas a iterar sobre diferentes modelos y parámetros. Puede haber una sensación de que estás perdiendo el control sobre tu experimento. Un primer paso hacia la mejora es unificar todo en un solo script de Python.

### 3.3.2 Predicción de la duración del viaje en un script de Python

Un primer paso hacia la mejora es unificar todo en un solo script de Python:

`orchestrate_pre_prefect.py`

Este script unifica todos los procedimientos necesarios para entrenar el modelo de aprendizaje automatico desarrollado preivamente.

   ```python
import pathlib
import pickle
import pandas as pd
import numpy as np
import scipy
import sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error
import mlflow
import xgboost as xgb
from prefect import flow, task


def read_data(filename: str) -> pd.DataFrame:
    """Read data into DataFrame"""
    df = pd.read_parquet(filename)

    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)

    df["duration"] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df


def add_features(
    df_train: pd.DataFrame, df_val: pd.DataFrame
) -> tuple(
    [
        scipy.sparse._csr.csr_matrix,
        scipy.sparse._csr.csr_matrix,
        np.ndarray,
        np.ndarray,
        sklearn.feature_extraction.DictVectorizer,
    ]
):
    """Add features to the model"""
    df_train["PU_DO"] = df_train["PULocationID"] + "_" + df_train["DOLocationID"]
    df_val["PU_DO"] = df_val["PULocationID"] + "_" + df_val["DOLocationID"]

    categorical = ["PU_DO"]  #'PULocationID', 'DOLocationID']
    numerical = ["trip_distance"]

    dv = DictVectorizer()

    train_dicts = df_train[categorical + numerical].to_dict(orient="records")
    X_train = dv.fit_transform(train_dicts)

    val_dicts = df_val[categorical + numerical].to_dict(orient="records")
    X_val = dv.transform(val_dicts)

    y_train = df_train["duration"].values
    y_val = df_val["duration"].values
    return X_train, X_val, y_train, y_val, dv


def train_best_model(
    X_train: scipy.sparse._csr.csr_matrix,
    X_val: scipy.sparse._csr.csr_matrix,
    y_train: np.ndarray,
    y_val: np.ndarray,
    dv: sklearn.feature_extraction.DictVectorizer,
) -> None:
    """train a model with best hyperparams and write everything out"""

    with mlflow.start_run():
        train = xgb.DMatrix(X_train, label=y_train)
        valid = xgb.DMatrix(X_val, label=y_val)

        best_params = {
        'learning_rate': 0.4434065752589766,
        'max_depth': 81,
        'min_child_weight': 10.423237853746643,
        'objective': 'reg:linear',
        'reg_alpha': 0.2630756846813668,
        'reg_lambda': 0.1220536223877784,
        'seed': 42    
        }

        mlflow.log_params(best_params)

        booster = xgb.train(
            params=best_params,
            dtrain=train,
            num_boost_round=3,
            evals=[(valid, "validation")],
            early_stopping_rounds=3,
        )

        y_pred = booster.predict(valid)
        rmse = mean_squared_error(y_val, y_pred, squared=False)
        mlflow.log_metric("rmse", rmse)

        pathlib.Path("models").mkdir(exist_ok=True)
        with open("models/preprocessor.b", "wb") as f_out:
            pickle.dump(dv, f_out)
        mlflow.log_artifact("models/preprocessor.b", artifact_path="preprocessor")

        mlflow.xgboost.log_model(booster, artifact_path="models_mlflow")
    return None


def main_flow(
    train_path: str = "./data/yellow_tripdata_2022-01.parquet",
    val_path: str = "./data/yellow_tripdata_2022-02.parquet",
) -> None:
    """The main training pipeline"""

    # MLflow settings
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    mlflow.set_experiment("nyc-taxi-experiment")

    # Load
    df_train = read_data(train_path)
    df_val = read_data(val_path)

    # Transform
    X_train, X_val, y_train, y_val, dv = add_features(df_train, df_val)

    # Train
    train_best_model(X_train, X_val, y_train, y_val, dv)


if __name__ == "__main__":
    main_flow()
   ```

Primero, navega al directorio (3.3) donde se encuentra `orchestrate_pre_prefect.py` y ejecuta desde la línea de comandos:

```bash
python orchestrate_pre_prefect.py
```

* *Si hay algun error borrar la DB de Mlflow* (alembic)

Podemos agregar más refinamiento utilizando Prefect. Veamos esto en acción.

### 3.3.2 Aprovechando Prefect para mejorar el script (**orquestación**)

Podemos continuar construyendo sobre el script de Python agregando tareas y decoradores de flujo:

`orchestrate.py`

Explicación codigo: 

```python
import pathlib
import pickle
import pandas as pd
import numpy as np
import scipy
import sklearn
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import mean_squared_error
import mlflow
import xgboost as xgb
from prefect import flow, task
```

En esta parte, se realizan importaciones de bibliotecas y módulos necesarios para el flujo de trabajo. `prefect` se importa junto con otras bibliotecas de Python que se utilizarán más adelante.

```python
@task(retries=3, retry_delay_seconds=2)
def read_data(filename: str) -> pd.DataFrame:
    """Read data into DataFrame"""
    # ... (Carga y procesamiento de datos) ...
    return df
```

- `@task` es un decorador que marca la función `read_data` como una tarea Prefect. Esta tarea carga un archivo de datos en formato parquet, realiza varias transformaciones en el DataFrame y devuelve el DataFrame procesado. También tiene configuración de reintentos en caso de fallo.

```python
@task
def add_features(
    df_train: pd.DataFrame, df_val: pd.DataFrame
) -> tuple(
    [
        scipy.sparse._csr.csr_matrix,
        scipy.sparse._csr.csr_matrix,
        np.ndarray,
        np.ndarray,
        sklearn.feature_extraction.DictVectorizer,
    ]
):
    """Add features to the model"""
    # ... (Agrega características al conjunto de datos y realiza la vectorización de características) ...
    return X_train, X_val, y_train, y_val, dv
```

- La función `add_features` es otra tarea Prefect que agrega características al conjunto de datos y utiliza `DictVectorizer` para transformar características categóricas en una representación numérica. Devuelve varias matrices y objetos relacionados.

```python
@task(log_prints=True)
def train_best_model(
    X_train: scipy.sparse._csr.csr_matrix,
    X_val: scipy.sparse._csr.csr_matrix,
    y_train: np.ndarray,
    y_val: np.ndarray,
    dv: sklearn.feature_extraction.DictVectorizer,
) -> None:
    """train a model with best hyperparams and write everything out"""
    # ... (Entrenamiento de un modelo XGBoost, registro de parámetros y métricas con MLflow) ...
    return None
```

- `train_best_model` es la tercera tarea Prefect en el flujo de trabajo. Entrena un modelo XGBoost con hiperparámetros específicos, registra los parámetros y métricas de rendimiento con MLflow, y guarda el modelo entrenado y otros artefactos en el sistema de archivos.

```python
@flow
def main_flow(
    train_path: str = "./data/yellow_tripdata_2022-01.parquet",
    val_path: str = "./data/yellow_tripdata_2022-02.parquet",
) -> None:
    """The main training pipeline"""
    # ... (Configuración de MLflow, carga de datos, transformación de características y entrenamiento de modelo) ...

if __name__ == "__main__":
    main_flow()
```

- `main_flow` es la función que define el flujo de trabajo principal. Configura MLflow, carga datos de archivos parquet, realiza transformaciones de características y entrena un modelo utilizando las tareas Prefect definidas anteriormente.

- La sección final verifica si el script se está ejecutando como el programa principal (`if __name__ == "__main__"`) y, si es así, ejecuta el flujo de trabajo principal llamando a `main_flow()`.

En resumen, este código utiliza Prefect para definir un flujo de trabajo que incluye tareas para cargar y procesar datos, transformar características y entrenar un modelo de aprendizaje automático. Prefect permite organizar y ejecutar estas tareas de manera eficiente, facilitando la automatización y el seguimiento de todo el proceso.

Primero navegue hasta el directorio donde se encuentra orchestrate.py y ejecute desde la línea de comandos :

```bash
python orchestrate.py
```
Y podemos visualizar la ejecución usando el Prefect UI.

* Si llegase a ocurrir un error borrar **mlflow.db**

Una vez ejecutado explorar la corrida en la API. 