# Tarea 15: Pipelines ETL, DataOps y Orquestaci√≥n con Prefect




## Parte 1 ‚Äî Investigaci√≥n: Conceptos Fundamentales de Prefect (15 min)

Antes de escribir c√≥digo, investigamos la documentaci√≥n oficial de Prefect y respondemos las siguientes preguntas. **Deben incluirse citas o referencias espec√≠ficas de la documentaci√≥n.**


### 1.1 Tasks en Prefect

**Documentaci√≥n oficial:** [Prefect Tasks](https://docs.prefect.io/latest/concepts/tasks/)

#### 1. ¬øQu√© es una Task en Prefect?

**Respuesta:**

Una **Task** en Prefect es una unidad de trabajo individual que representa una operaci√≥n espec√≠fica dentro de un pipeline. Seg√∫n la [documentaci√≥n oficial de Prefect](https://docs.prefect.io/latest/concepts/tasks/), una task es "a discrete piece of work that can be tracked and retried independently". 

Las tasks son funciones Python decoradas con `@task` que encapsulan una l√≥gica de trabajo espec√≠fica (por ejemplo, extraer datos, transformar un DataFrame, cargar en una base de datos). Son la unidad b√°sica de ejecuci√≥n en Prefect y pueden tener dependencias entre ellas, lo que permite que Prefect construya autom√°ticamente un DAG (grafo ac√≠clico dirigido) de ejecuci√≥n.

**Cu√°ndo usarla:** Se usa cuando necesitas dividir un flujo de trabajo en pasos discretos, reutilizables y rastreables. Cada task debe realizar una acci√≥n espec√≠fica y bien definida.


#### 2. ¬øQu√© significa que las Tasks sean "lazily evaluated"?

**Respuesta:**

La evaluaci√≥n diferida ("lazy evaluation") significa que las tasks **no se ejecutan inmediatamente** cuando se definen o cuando se llama a la funci√≥n decorada. En su lugar, Prefect construye una representaci√≥n del grafo de dependencias primero.

Cuando llamas a una task dentro de un flow, Prefect no ejecuta el c√≥digo de la task de inmediato. En lugar de eso, registra la llamada y las dependencias. La ejecuci√≥n real ocurre cuando se ejecuta el flow completo, momento en el cual Prefect resuelve las dependencias y ejecuta las tasks en el orden correcto.

Esto permite que Prefect optimice la ejecuci√≥n, maneje dependencias complejas, y construya el DAG antes de ejecutar cualquier cosa. Seg√∫n la [documentaci√≥n de Prefect](https://docs.prefect.io/latest/concepts/flows/), esto es parte del concepto de "imperative orchestration" donde defines el flujo de manera imperativa pero Prefect lo ejecuta de manera declarativa.


#### 3. ¬øQu√© son los Task States?

**Respuesta:**

Los **Task States** representan el estado actual de una task durante su ciclo de vida. Seg√∫n la [documentaci√≥n de Prefect sobre States](https://docs.prefect.io/latest/concepts/states/), algunos estados posibles son:

| Estado | ¬øCu√°ndo ocurre? |
|--------|----------------|
| **PENDING** | La task est√° esperando ser ejecutada. Estado inicial cuando se crea la task y est√° en cola. |
| **RUNNING** | La task est√° actualmente en ejecuci√≥n. Ocurre cuando el c√≥digo de la task est√° siendo procesado activamente. |
| **COMPLETED** | La task se ejecut√≥ exitosamente y retorn√≥ un resultado. Ocurre cuando la ejecuci√≥n termina sin errores. |
| **FAILED** | La task fall√≥ durante su ejecuci√≥n debido a una excepci√≥n. Ocurre cuando se lanza una excepci√≥n no capturada. |
| **RETRYING** | La task est√° siendo reintentada despu√©s de un fallo. Ocurre cuando se configura `retries` y la task falla, antes del siguiente intento. |
| **CANCELLED** | La task fue cancelada antes de completarse. Ocurre cuando el flow es cancelado o interrumpido externamente. |
| **CRASHED** | La task fall√≥ de manera inesperada o no manejada. Estado de fallo cr√≠tico cuando no hay manejo de errores. |


#### 4. ¬øQu√© par√°metros importantes tiene el decorador `@task`?

**Respuesta:**


Seg√∫n la [documentaci√≥n de Prefect Tasks](https://docs.prefect.io/latest/api-ref/prefect/tasks/), algunos par√°metros importantes son:

| Par√°metro | ¬øQu√© hace? | Ejemplo de uso |
|-----------|------------|----------------|
| **`retries`** | Define cu√°ntas veces reintentar la task si falla. Por defecto es 0. | `@task(retries=3)` - La task intentar√° hasta 3 veces antes de fallar definitivamente. |
| **`retry_delay_seconds`** | Tiempo de espera entre reintentos en segundos. √ötil para tareas que pueden fallar temporalmente. | `@task(retries=2, retry_delay_seconds=5)` - Espera 5 segundos entre reintentos. |
| **`timeout_seconds`** | Tiempo m√°ximo de ejecuci√≥n para la task. Si excede, se cancela autom√°ticamente. | `@task(timeout_seconds=300)` - La task debe completarse en 5 minutos o se cancela. |
| **`cache_key_fn`** | Funci√≥n personalizada para generar la clave de cach√©. Permite controlar cu√°ndo se cachea un resultado. | `@task(cache_key_fn=lambda task, context: context['parameters']['date'])` - Cachea basado en un par√°metro espec√≠fico. |
| **`cache_expiration`** | Duraci√≥n para la cual el resultado cacheado es v√°lido. Puede ser un timedelta. | `@task(cache_expiration=timedelta(hours=1))` - El cache expira despu√©s de 1 hora. |
| **`tags`** | Etiquetas para organizar y filtrar tasks. √ötil para monitoreo y organizaci√≥n. | `@task(tags=["extract", "production"])` - Etiqueta la task para filtrado y organizaci√≥n. |
| **`log_prints`** | Si es True, captura los prints como logs de Prefect. Facilita el debugging. | `@task(log_prints=True)` - Los prints se registran autom√°ticamente en los logs de Prefect. |


### 1.2 Flows en Prefect

**Documentaci√≥n oficial:** [Prefect Flows](https://docs.prefect.io/latest/concepts/flows/)

#### 1. ¬øCu√°l es la diferencia entre un Flow y una Task? ¬øPor qu√© necesitamos ambos?

**Respuesta:**

Seg√∫n la [documentaci√≥n de Prefect](https://docs.prefect.io/latest/concepts/flows/), un **Flow** es un contenedor que orquesta y coordina la ejecuci√≥n de m√∫ltiples Tasks. Mientras que una Task representa una unidad de trabajo individual, un Flow representa el flujo de trabajo completo que conecta m√∫ltiples Tasks.

**Diferencias clave:**
- **Task**: Unidad b√°sica de trabajo, realiza una acci√≥n espec√≠fica
- **Flow**: Orquesta m√∫ltiples tasks, define las dependencias y el orden de ejecuci√≥n

**¬øPor qu√© necesitamos ambos?**
- Las Tasks nos permiten dividir el trabajo en componentes reutilizables y rastreables
- Los Flows nos permiten coordinar las Tasks, manejar dependencias autom√°ticamente, y proporcionar un contexto de ejecuci√≥n unificado. Sin Flows, las Tasks ser√≠an funciones aisladas sin orquestaci√≥n.


#### 2. ¬øQu√© es un "subflow"? ¬øCu√°ndo ser√≠a √∫til usar subflows?

**Respuesta:**

Un **subflow** es un Flow que se puede llamar desde dentro de otro Flow. Seg√∫n la [documentaci√≥n de Prefect sobre subflows](https://docs.prefect.io/latest/concepts/flows/#composing-flows), cuando llamas a un flow decorado con `@flow` desde dentro de otro flow, autom√°ticamente se convierte en un subflow.

**Cu√°ndo ser√≠a √∫til:**
- **Modularidad**: Dividir un pipeline grande en m√≥dulos m√°s peque√±os y manejables
- **Reutilizaci√≥n**: Reutilizar flujos comunes en m√∫ltiples pipelines principales
- **Organizaci√≥n**: Organizar l√≥gicamente grupos relacionados de tasks
- **Debugging**: Facilitar el debugging al aislar secciones del pipeline
- **Testing**: Probar secciones espec√≠ficas del pipeline de manera independiente

Por ejemplo, podr√≠as tener un subflow `extract_and_validate()` que se use en m√∫ltiples pipelines ETL diferentes.


#### 3. ¬øC√≥mo maneja Prefect las dependencias entre tasks? Expliquen el concepto de DAG impl√≠cito.

**Respuesta:**

Prefect maneja las dependencias entre tasks de manera **autom√°tica e impl√≠cita** a trav√©s de lo que se conoce como "DAG impl√≠cito". Seg√∫n la [documentaci√≥n de Prefect](https://docs.prefect.io/latest/concepts/flows/#task-dependencies), Prefect detecta autom√°ticamente las dependencias bas√°ndose en c√≥mo se llaman las tasks dentro de un flow.

**C√≥mo funciona:**
1. Cuando llamas a una task dentro de un flow y pasas el resultado de otra task como par√°metro, Prefect autom√°ticamente detecta esa dependencia
2. Prefect construye un DAG (grafo ac√≠clico dirigido) bas√°ndose en estas dependencias impl√≠citas
3. Las tasks se ejecutan en el orden correcto seg√∫n sus dependencias, no necesariamente en el orden en que aparecen en el c√≥digo

**Ejemplo:**
```python
@flow
def my_flow():
    data = extract_data()  # Task 1
    transformed = transform_data(data)  # Task 2 depende de Task 1
    load_data(transformed)  # Task 3 depende de Task 2
```

Prefect detecta autom√°ticamente que `transform_data` depende de `extract_data`, y `load_data` depende de `transform_data`, construyendo un DAG sin necesidad de especificarlo expl√≠citamente.


### 1.3 Investigaci√≥n avanzada: Results y Caching

**Documentaci√≥n oficial:** [Prefect Results](https://docs.prefect.io/latest/concepts/results/) y [Caching](https://docs.prefect.io/latest/concepts/tasks/#caching)

#### 1. ¬øQu√© es el "result persistence"? ¬øPor qu√© es importante en pipelines de datos?

**Respuesta:**

El **result persistence** se refiere a la capacidad de Prefect de almacenar y recuperar los resultados de las tasks. Seg√∫n la [documentaci√≥n de Prefect sobre Results](https://docs.prefect.io/latest/concepts/results/), Prefect puede persistir los resultados de las tasks en diferentes backends (local, S3, GCS, etc.).

**¬øPor qu√© es importante en pipelines de datos?**
- **Resiliencia**: Si un flow falla a mitad de camino, no necesitas re-ejecutar todas las tasks desde el inicio
- **Debugging**: Puedes inspeccionar los resultados intermedios para entender qu√© sali√≥ mal
- **Reproducibilidad**: Puedes reproducir exactamente el mismo estado de ejecuci√≥n
- **Eficiencia**: En caso de fallo, solo necesitas re-ejecutar desde el punto de fallo
- **Auditor√≠a**: Permite rastrear y validar los resultados de cada etapa del pipeline


#### 2. ¬øC√≥mo funciona el caching en Prefect? ¬øQu√© par√°metro usar√≠an para cachear el resultado de una task?

**Respuesta:**

El **caching** en Prefect permite que las tasks eviten la re-ejecuci√≥n si ya se ejecutaron previamente con los mismos par√°metros. Seg√∫n la [documentaci√≥n de Prefect sobre Caching](https://docs.prefect.io/latest/concepts/tasks/#caching), Prefect genera autom√°ticamente una clave de cach√© bas√°ndose en los par√°metros de entrada de la task.

**Par√°metros para cachear:**
- **`cache_key_fn`**: Funci√≥n personalizada para generar la clave de cach√©
- **`cache_expiration`**: Duraci√≥n de validez del cach√© (ej: `timedelta(hours=1)`)
- **`refresh_cache`**: Forzar la invalidaci√≥n del cach√©

**Ejemplo:**
```python
@task(cache_expiration=timedelta(hours=1))
def extract_data(date: str):
    # Esta task solo se ejecutar√° una vez por hora para la misma fecha
    return fetch_data(date)
```

Si la misma task se llama con los mismos par√°metros dentro de la ventana de expiraci√≥n, Prefect retornar√° el resultado cacheado en lugar de re-ejecutar la task.


#### 3. ¬øQu√© es una `cache_key_fn`? Den un ejemplo de cu√°ndo la usar√≠an.

**Respuesta:**

Una **`cache_key_fn`** es una funci√≥n personalizada que se usa para generar la clave de cach√© de una task. Seg√∫n la [documentaci√≥n de Prefect](https://docs.prefect.io/latest/concepts/tasks/#cache-key-functions), permite controlar exactamente qu√© factores determinan si una task debe ser re-ejecutada o usar el cach√©.

**Ejemplo de uso:**

```python
@task(
    cache_key_fn=lambda task, context: f"{context['parameters']['date']}_{context['parameters']['region']}"
)
def extract_regional_data(date: str, region: str, api_key: str):
    # Esta task se cachea bas√°ndose en date y region, pero NO en api_key
    return fetch_data(date, region, api_key)
```

**Cu√°ndo usarla:**
- Cuando solo quieres cachear bas√°ndote en **algunos par√°metros** (no todos)
- Cuando necesitas incluir factores externos (como la hora del d√≠a, estado de archivos) en la clave de cach√©
- Cuando quieres invalidar el cach√© bas√°ndote en l√≥gica de negocio compleja
- Por ejemplo: cachear datos de ventas solo bas√°ndote en la fecha, ignorando otros par√°metros como IDs de usuario que no afectan el resultado


## Parte 2 ‚Äî Dise√±o Conceptual (5 min)

Definimos un **escenario simple** para nuestro pipeline.


### 2.1 Arquitectura del escenario

**Escenario elegido**: Ventas de un e-commerce con datos de transacciones diarias

| Rol | ¬øQui√©n ser√≠a en nuestro escenario? |
|-----|-----------------------------------|
| **Business data owner** | Equipo de ventas/negocios que genera las transacciones diarias en la plataforma e-commerce |
| **Data engineers** | Equipo de ingenier√≠a de datos que construye y mantiene el pipeline ETL para procesar y cargar las ventas |
| **Data consumers** | Analistas de negocio, cient√≠ficos de datos y dashboards que consumen los datos procesados para an√°lisis y reportes |


### 2.2 Tipo de pipeline

* **Tipo elegido**: Batch
* **Justificaci√≥n**: Las ventas se procesan diariamente en lotes. No necesitamos procesamiento en tiempo real para este caso de uso inicial. El procesamiento batch es m√°s eficiente para an√°lisis agregados y permite procesar grandes vol√∫menes de datos de manera controlada. Adem√°s, facilita la validaci√≥n y el manejo de errores antes de cargar los datos procesados.


## Parte 3 ‚Äî Implementaci√≥n del Pipeline Base (20 min)


### 3.1 Setup

In [1]:
# Instalaci√≥n de Prefect
# !pip install -q prefect pandas

# Importar librer√≠as
from prefect import flow, task
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

print("‚úÖ Entorno configurado correctamente")
print(f"üìÖ Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M')}")


ModuleNotFoundError: No module named 'prefect'

### 3.2 Implementar Tasks

Bas√°ndose en lo que investigamos en la Parte 1, implementamos las tasks con los decoradores y par√°metros correctos.


In [None]:
# === TASK 1: EXTRACT ===
# Decorador @task con par√°metros √∫tiles: tags para organizaci√≥n y log_prints para capturar prints
@task(tags=["extract", "data-source"], log_prints=True)
def extract_data():
    """
    Extrae datos de la fuente.
    Simula la extracci√≥n de datos de ventas de un e-commerce.
    """
    np.random.seed(42)
    n_rows = 100

    data = {
        'fecha': pd.date_range(start='2024-01-01', periods=n_rows, freq='D'),
        'producto': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'cantidad': np.random.randint(1, 50, n_rows),
        'precio_unitario': np.random.uniform(10, 100, n_rows).round(2),
        'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste'], n_rows)
    }

    df = pd.DataFrame(data)
    print(f"üì• Extra√≠dos {len(df)} registros")
    return df


In [None]:
# === TASK 2: TRANSFORM ===
# Decorador @task con tags para organizaci√≥n
@task(tags=["transform", "data-processing"], log_prints=True)
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aplica transformaciones a los datos.
    Calcula totales, categor√≠as de ticket size, y limpia los datos.
    """
    # Calcular total
    df['total'] = df['cantidad'] * df['precio_unitario']
    
    # Categorizar por ticket size
    df['ticket_size'] = pd.cut(
        df['total'],
        bins=[0, 100, 500, 1000, float('inf')],
        labels=['Bajo', 'Medio', 'Alto', 'Muy Alto']
    )
    
    # Agregar mes y d√≠a de la semana para an√°lisis
    df['mes'] = df['fecha'].dt.month
    df['dia_semana'] = df['fecha'].dt.day_name()
    
    print(f"üîÑ Transformados {len(df)} registros")
    print(f"   Total vendido: ${df['total'].sum():,.2f}")
    
    return df


In [None]:
# === TASK 3: LOAD ===
# Decorador @task con tags y retries por si falla la escritura
@task(tags=["load", "data-output"], log_prints=True, retries=2, retry_delay_seconds=3)
def load_data(df: pd.DataFrame, output_path: str = "ventas_procesadas.csv") -> str:
    """
    Carga los datos transformados en el destino.
    En producci√≥n, esto podr√≠a ser una base de datos, data warehouse, etc.
    """
    df.to_csv(output_path, index=False)
    print(f"üíæ Datos cargados en: {output_path}")
    print(f"   Registros guardados: {len(df)}")
    return output_path


### 3.3 Implementar Flow

Ahora creamos el flow principal que orquesta las tasks.


In [None]:
# === FLOW PRINCIPAL ===
@flow(name="ETL Pipeline Ventas", log_prints=True)
def etl_flow():
    """
    Flow principal que orquesta el pipeline ETL completo.
    Prefect detecta autom√°ticamente las dependencias entre tasks:
    - transform_data depende de extract_data
    - load_data depende de transform_data
    """
    print("üöÄ Iniciando pipeline ETL...")
    
    # Paso 1: Extraer datos
    df_raw = extract_data()
    
    # Paso 2: Transformar datos (depende de extract_data)
    df_transformed = transform_data(df_raw)
    
    # Paso 3: Cargar datos (depende de transform_data)
    output_file = load_data(df_transformed)
    
    print(f"‚úÖ Pipeline completado. Archivo generado: {output_file}")
    return output_file


### 3.4 Ejecutar el Flow

Ejecutamos el flow para ver c√≥mo funciona Prefect:


In [None]:
# Ejecutar el flow
if __name__ == "__main__":
    result = etl_flow()
    print(f"\nüéâ Flow ejecutado exitosamente. Resultado: {result}")


In [None]:
# Ejecutar el flow (en notebook)
result = etl_flow()
print(f"\nüéâ Flow ejecutado exitosamente. Resultado: {result}")


### 3.4 Preguntas de observaci√≥n

#### 1. ¬øQu√© observan en los logs de Prefect?
**Respuesta:** Los logs muestran el estado de cada task (PENDING ‚Üí RUNNING ‚Üí COMPLETED), los mensajes de print capturados por `log_prints=True`, y un resumen del flujo de ejecuci√≥n. Prefect proporciona informaci√≥n detallada sobre el tiempo de ejecuci√≥n de cada task y el flujo completo.

#### 2. ¬øC√≥mo se construy√≥ el DAG?
**Respuesta:** Prefect construy√≥ el DAG autom√°ticamente bas√°ndose en las dependencias impl√≠citas detectadas cuando pasamos el resultado de una task como par√°metro de otra. El DAG es: `extract_data` ‚Üí `transform_data` ‚Üí `load_data`.

#### 3. ¬øQu√© pasar√≠a si una task falla?
**Respuesta:** Si una task falla, Prefect marca el estado como FAILED y detiene la ejecuci√≥n del flow (a menos que configuremos manejo de errores). Las tasks dependientes no se ejecutar√°n. Con `retries` configurado, Prefect reintentar√° la task antes de marcar como fallida.


## Parte 4 ‚Äî Investigaci√≥n: Funcionalidades Avanzadas (15 min)


### 4.1 Retries y manejo de errores

**Investigaci√≥n:** [Prefect Retries](https://docs.prefect.io/latest/concepts/tasks/#retries)

**Respuestas:**

1. **¬øC√≥mo funcionan los retries?** Prefect permite configurar reintentos autom√°ticos cuando una task falla. Se configuran con `@task(retries=N, retry_delay_seconds=X)` donde N es el n√∫mero de reintentos y X es el tiempo de espera entre intentos.

2. **¬øCu√°ndo usar retries?** √ötil para operaciones que pueden fallar temporalmente (APIs, conexiones de red, operaciones de I/O). No es recomendable para errores de l√≥gica que siempre fallar√°n.

3. **Ya lo implementamos:** En la task `load_data()` configuramos `retries=2, retry_delay_seconds=3` para manejar posibles fallos en la escritura del archivo.


### 4.2 Caching de resultados

**Investigaci√≥n:** [Prefect Caching](https://docs.prefect.io/latest/concepts/tasks/#caching)

**Respuestas:**

1. **¬øC√≥mo funciona el caching?** Prefect genera autom√°ticamente una clave de cach√© basada en los par√°metros de entrada. Si la misma task se ejecuta con los mismos par√°metros, retorna el resultado cacheado en lugar de re-ejecutar.

2. **Par√°metros clave:** `cache_expiration` (timedelta) y `cache_key_fn` (funci√≥n personalizada).

3. **Cu√°ndo usar:** Para tasks costosas que pueden reutilizar resultados (extracciones de APIs, procesamiento pesado) cuando los par√°metros no cambian.


### 4.3 Logging personalizado

**Investigaci√≥n:** [Prefect Logging](https://docs.prefect.io/latest/concepts/logs/)

**Respuestas:**

1. **¬øC√≥mo usar logging?** Se puede usar `get_run_logger()` dentro de una task para obtener un logger estructurado, o configurar `log_prints=True` para capturar autom√°ticamente los prints.

2. **Ya lo implementamos:** Usamos `log_prints=True` en todas nuestras tasks para que los prints se registren como logs estructurados de Prefect.

3. **Ventajas:** Logs centralizados, niveles de log (INFO, WARNING, ERROR), y integraci√≥n con la UI de Prefect para debugging.


### 4.4 Concurrencia y paralelismo

**Investigaci√≥n:** [Prefect Task Runners](https://docs.prefect.io/latest/concepts/task-runners/)

**Respuestas:**

1. **¬øC√≥mo funciona?** Prefect soporta ejecuci√≥n concurrente usando `ConcurrentTaskRunner()` como par√°metro del flow. Las tasks independientes pueden ejecutarse en paralelo.

2. **Cu√°ndo usar:** Cuando tienes m√∫ltiples tasks que no dependen una de otra y quieres reducir el tiempo total de ejecuci√≥n.

3. **Ejemplo:** Si procesamos datos por regi√≥n, podemos usar `futures = [process_region.submit(r) for r in regions]` para ejecutar en paralelo.


## Parte 5 ‚Äî Investigaci√≥n: Deployments y Scheduling (10 min)

**Documentaci√≥n:** [Deployments](https://docs.prefect.io/latest/concepts/deployments/) y [Schedules](https://docs.prefect.io/latest/concepts/schedules/)


### 5.1 Conceptos de Deployment

**Respuestas basadas en la documentaci√≥n:**

1. **¬øQu√© es un Deployment en Prefect?** Un Deployment es una configuraci√≥n que permite ejecutar un Flow de manera programada o bajo demanda. Seg√∫n la [documentaci√≥n](https://docs.prefect.io/latest/concepts/deployments/), un Deployment conecta un Flow con una configuraci√≥n de ejecuci√≥n espec√≠fica (scheduling, work pool, par√°metros). La diferencia con un Flow es que un Flow es el c√≥digo, mientras que un Deployment es la "instalaci√≥n" del Flow en Prefect Cloud/Server para ejecutarlo.

2. **¬øQu√© es un Work Pool?** Un Work Pool es un grupo de workers que pueden ejecutar flows. Seg√∫n la [documentaci√≥n](https://docs.prefect.io/latest/concepts/work-pools/), permite organizar d√≥nde y c√≥mo se ejecutan los flows (local, servidor, cloud). Es una abstracci√≥n para manejar la infraestructura de ejecuci√≥n.

3. **¬øQu√© es un Worker?** Un Worker es un proceso que ejecuta flows desde un Work Pool. Los Workers se conectan a un Work Pool y "toman" trabajos (flows) para ejecutarlos. La relaci√≥n es: Deployment ‚Üí Work Pool ‚Üí Worker ‚Üí Ejecuci√≥n del Flow.


### 5.2 Scheduling

**Tipos de schedules soportados por Prefect:**

| Tipo de Schedule | Descripci√≥n | Ejemplo |
|-----------------|-------------|---------|
| **CronSchedule** | Usa sintaxis cron para definir horarios recurrentes | `cron="0 6 * * *"` - Todos los d√≠as a las 6 AM |
| **IntervalSchedule** | Ejecuta a intervalos fijos de tiempo | `interval=timedelta(hours=1)` - Cada hora |
| **RRuleSchedule** | Usa reglas RFC 5545 (iCalendar) para horarios complejos | √ötil para horarios de negocio, excluir fines de semana |
| **Manual** | Sin schedule, solo ejecuci√≥n manual | Para ejecuci√≥n bajo demanda |

**1. ¬øC√≥mo expresar√≠an "ejecutar todos los d√≠as a las 6 AM" en cron?**
**Respuesta:** `"0 6 * * *"` - El formato cron es: minuto hora d√≠a_mes mes d√≠a_semana

**2. ¬øQu√© es `RRuleSchedule`?** 
**Respuesta:** RRuleSchedule permite definir horarios complejos usando reglas RFC 5545, como "cada lunes y mi√©rcoles a las 9 AM, excepto feriados" o "√∫ltimo d√≠a del mes". Es m√°s flexible que cron para casos de negocio complejos.


### 5.3 Crear un Deployment (conceptual)

Bas√°ndose en la documentaci√≥n, c√≥digo para crear un deployment del flow:


In [None]:
# TODO: C√≥digo conceptual para crear un deployment
# https://docs.prefect.io/latest/concepts/deployments/

from prefect import flow
from prefect.schedules import CronSchedule

# Opci√≥n 1: Usando serve() - m√°s simple para desarrollo
if __name__ == "__main__":
    etl_flow.serve(
        name="etl-ventas-daily",  # nombre del deployment
        cron="0 6 * * *",  # schedule: todos los d√≠as a las 6 AM
        tags=["production", "etl", "ventas"],  # tags para organizaci√≥n
    )

# Opci√≥n 2: Usando deploy() - m√°s control para producci√≥n
# etl_flow.deploy(
#     name="etl-ventas-daily",
#     work_pool_name="default",  # o un work pool espec√≠fico
#     cron="0 6 * * *",
#     tags=["production", "etl"],
# )


## Parte 6 ‚Äî Extensi√≥n DataOps (15 min)

Elegimos implementar la **Opci√≥n A ‚Äî Validaci√≥n con logging estructurado**, ya que es fundamental para la calidad de datos en pipelines ETL.


In [None]:
# === OPCI√ìN A: VALIDACI√ìN CON LOGGING ESTRUCTURADO ===
from prefect import get_run_logger

@task(retries=1, retry_delay_seconds=2, tags=["validation", "data-quality"], log_prints=True)
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Valida la calidad de los datos usando logging estructurado de Prefect.
    Esta task valida que los datos cumplan con las expectativas antes de continuar.
    
    Basado en: https://docs.prefect.io/latest/concepts/logs/
    """
    logger = get_run_logger()  # Obtiene el logger estructurado de Prefect
    errors = []

    logger.info("Iniciando validaci√≥n de datos")
    logger.info(f"DataFrame recibido con {len(df)} registros y {len(df.columns)} columnas")

    # Validaci√≥n 1: DataFrame no vac√≠o
    if len(df) <= 0:
        logger.error("DataFrame vac√≠o detectado - No se pueden procesar datos vac√≠os")
        errors.append("DataFrame vac√≠o")
    else:
        logger.info(f"‚úÖ Validaci√≥n de cantidad de registros: OK ({len(df)} registros)")

    # Validaci√≥n 2: Valores nulos
    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        logger.warning(f"Valores nulos encontrados: {null_counts[null_counts > 0].to_dict()}")
        # No agregamos a errors si hay nulos, solo lo registramos como warning
        # En producci√≥n, podr√≠as decidir si esto es cr√≠tico o no
    else:
        logger.info("‚úÖ Validaci√≥n de valores nulos: OK (sin nulos)")

    # Validaci√≥n 3: Columnas requeridas
    required_columns = ['fecha', 'producto', 'cantidad', 'precio_unitario', 'total']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logger.error(f"Columnas requeridas faltantes: {missing_columns}")
        errors.append(f"Columnas faltantes: {missing_columns}")
    else:
        logger.info(f"‚úÖ Validaci√≥n de columnas requeridas: OK")

    # Validaci√≥n 4: Tipos de datos b√°sicos
    if 'total' in df.columns:
        if df['total'].dtype not in [float, 'float64', 'float32', int, 'int64', 'int32']:
            logger.error(f"Tipo de dato incorrecto para 'total': {df['total'].dtype}")
            errors.append("Tipo de dato incorrecto en columna 'total'")
        else:
            logger.info("‚úÖ Validaci√≥n de tipos de datos: OK")

    # Validaci√≥n 5: Valores negativos en campos que no deber√≠an tenerlos
    if 'cantidad' in df.columns:
        negative_qty = (df['cantidad'] < 0).sum()
        if negative_qty > 0:
            logger.warning(f"Se encontraron {negative_qty} registros con cantidad negativa")
            # Podr√≠as decidir si esto es un error cr√≠tico o solo un warning

    if errors:
        logger.error(f"Validaci√≥n fallida con {len(errors)} error(es) cr√≠tico(s)")
        raise ValueError(f"Validaci√≥n fallida: {errors}")

    logger.info("‚úÖ Validaci√≥n exitosa - Todos los checks pasaron")
    return df


Ahora actualizamos el flow para incluir la validaci√≥n:


In [None]:
# === FLOW ACTUALIZADO CON VALIDACI√ìN ===
@flow(name="ETL Pipeline Ventas con Validaci√≥n", log_prints=True)
def etl_flow_with_validation():
    """
    Flow principal que orquesta el pipeline ETL completo con validaci√≥n de calidad.
    Incluye validaci√≥n entre extract y transform para asegurar calidad de datos.
    """
    print("üöÄ Iniciando pipeline ETL con validaci√≥n...")
    
    # Paso 1: Extraer datos
    df_raw = extract_data()
    
    # Paso 2: Validar datos (EXTENSI√ìN DATAOPS)
    # Esta validaci√≥n asegura que los datos cumplen con las expectativas antes de transformar
    df_validated = validate_data(df_raw)
    
    # Paso 3: Transformar datos (solo si la validaci√≥n pasa)
    df_transformed = transform_data(df_validated)
    
    # Paso 4: Cargar datos
    output_file = load_data(df_transformed)
    
    print(f"‚úÖ Pipeline completado con validaci√≥n. Archivo generado: {output_file}")
    return output_file


In [None]:
# Ejecutar el flow con validaci√≥n
result = etl_flow_with_validation()
print(f"\nüéâ Flow con validaci√≥n ejecutado exitosamente. Resultado: {result}")


## Parte 7 ‚Äî Reflexi√≥n y Conexi√≥n con DataOps (5 min)


### 7.1 Conceptos de Prefect

**1. ¬øC√≥mo ayuda Prefect a implementar el principio de "Observabilidad" de DataOps?**

**Respuesta:** Prefect implementa observabilidad de m√∫ltiples formas:
- **Logging estructurado**: Los logs de Prefect capturan autom√°ticamente el estado, tiempo de ejecuci√≥n y mensajes de cada task
- **Estados de ejecuci√≥n**: Cada task tiene estados claros (PENDING, RUNNING, COMPLETED, FAILED) que se pueden monitorear
- **UI centralizada**: Prefect Cloud/Server proporciona una interfaz visual para monitorear todos los flows y tasks en tiempo real
- **M√©tricas**: Prefect rastrea m√©tricas como tiempo de ejecuci√≥n, tasa de √©xito, y frecuencias de fallo
- **Notificaciones**: Permite configurar alertas cuando los flows fallan

Esto permite que los equipos de datos tengan visibilidad completa sobre sus pipelines, facilitando el debugging y la detecci√≥n temprana de problemas.

**2. ¬øC√≥mo ayuda el caching a la "Reproducibilidad"?**

**Respuesta:** El caching en Prefect ayuda a la reproducibilidad de varias formas:
- **Resultados consistentes**: Al cachear resultados basados en par√°metros, aseguras que las mismas entradas produzcan los mismos resultados
- **Traza de ejecuciones**: Prefect mantiene un historial de ejecuciones cacheadas, permitiendo reproducir estados anteriores
- **Puntos de recuperaci√≥n**: Si un pipeline falla, puedes reutilizar resultados cacheados de tasks exitosas anteriores
- **Validaci√≥n**: Permite comparar resultados actuales con resultados cacheados para detectar cambios inesperados

Esto es fundamental en DataOps donde necesitas garantizar que los pipelines produzcan resultados consistentes y reproducibles.

**3. ¬øC√≥mo conectan los Deployments con "CI/CD para datos"?**

**Respuesta:** Los Deployments en Prefect son el equivalente a "releases" en CI/CD tradicional:
- **Versionado**: Los deployments permiten versionar y desplegar diferentes versiones de flows
- **Automatizaci√≥n**: Los schedules permiten automatizar la ejecuci√≥n de pipelines, similar a pipelines de CI/CD
- **Ambientes**: Puedes tener diferentes deployments para desarrollo, staging y producci√≥n
- **Rollback**: Puedes cambiar qu√© versi√≥n de un flow est√° activa, permitiendo rollback r√°pido
- **Integraci√≥n con CI/CD**: Prefect puede integrarse con sistemas de CI/CD (GitHub Actions, GitLab CI) para desplegar autom√°ticamente cambios en los flows

Esto permite aplicar pr√°cticas de DevOps a los pipelines de datos, facilitando despliegues seguros y controlados.


### 7.2 Comparaci√≥n con alternativas

**1. ¬øQu√© diferencias hay entre Prefect y Apache Airflow? Mencionen al menos 2.**

**Diferencia 1:** **Filosof√≠a de dise√±o**: Airflow usa DAGs expl√≠citos definidos en Python usando operadores, mientras que Prefect usa "DAGs impl√≠citos" donde las dependencias se detectan autom√°ticamente bas√°ndose en c√≥mo se llaman las tasks. Prefect es m√°s "Python puro" y menos verboso.

**Diferencia 2:** **Curva de aprendizaje**: Prefect tiene una curva de aprendizaje m√°s suave porque es m√°s simple de usar - solo decoras funciones con `@task` y `@flow`. Airflow requiere entender conceptos como DAGs, Operators, XComs, etc., lo que lo hace m√°s complejo para empezar.

**Otra diferencia importante:** Prefect est√° dise√±ado para ser m√°s moderno y orientado a Python nativo, mientras que Airflow fue dise√±ado originalmente m√°s como un sistema de workflow basado en DAGs.

**2. ¬øQu√© es Dagster? ¬øEn qu√© se diferencia de Prefect?**

**Respuesta:** Dagster es otra herramienta de orquestaci√≥n de pipelines de datos moderna, similar a Prefect. Las principales diferencias son:

- **Enfoque en assets**: Dagster est√° m√°s orientado a "assets" (datos y modelos) como primitivos de primera clase, mientras que Prefect est√° m√°s orientado a "tasks" y "flows"
- **Integraci√≥n con dataframes**: Dagster tiene integraci√≥n m√°s profunda con bibliotecas de datos como pandas, pyspark, etc.
- **Software-defined assets**: Dagster permite definir pipelines bas√°ndose en qu√© datos (assets) necesitas, no solo en qu√© tareas ejecutar
- **Ecosistema**: Prefect tiene un ecosistema m√°s amplio y comunidad m√°s grande, mientras que Dagster est√° m√°s enfocado en equipos de datos grandes

Ambas son alternativas modernas a Airflow, pero Dagster tiene un enfoque m√°s "data-centric" mientras que Prefect es m√°s "workflow-centric".


## Conclusiones

En esta tarea aprendimos:

1. **Conceptos fundamentales** de Prefect: Tasks, Flows, DAGs impl√≠citos, estados, y caching
2. **Implementaci√≥n pr√°ctica** de un pipeline ETL completo con Prefect
3. **Funcionalidades avanzadas**: Retries, logging estructurado, validaci√≥n de datos
4. **Deployments y scheduling**: C√≥mo desplegar y programar pipelines en producci√≥n
5. **Conexi√≥n con DataOps**: C√≥mo Prefect habilita observabilidad, reproducibilidad y CI/CD para datos

Prefect es una herramienta poderosa para orquestar pipelines de datos de manera moderna, con una curva de aprendizaje suave y excelente integraci√≥n con el ecosistema Python.

---

**Referencias:**
- [Documentaci√≥n oficial de Prefect](https://docs.prefect.io/)
- [Prefect Concepts Overview](https://docs.prefect.io/latest/concepts/)
- [Prefect GitHub Examples](https://github.com/PrefectHQ/prefect)
