# Tarea 15: Pipelines ETL, DataOps y Orquestaci√≥n con Prefect

**Ingenier√≠a de Datos ‚Äî Universidad Cat√≥lica del Uruguay**

**Objetivo:** Dise√±ar e implementar un mini pipeline ETL con Prefect, investigando la documentaci√≥n oficial para comprender los conceptos fundamentales y explorar funcionalidades avanzadas del orquestador.

**Tiempo estimado:** 90‚Äì120 minutos

---

## Lecturas m√≠nimas (recuerdo)

- [Google Cloud: Building the data engineering driven organization](https://cloud.google.com/architecture/data-engineering-driven-organization)
- [Google Cloud: Building streaming data pipelines](https://cloud.google.com/architecture/building-streaming-data-pipelines)
- [Google Cloud Docs: MLOps ‚Äî Continuous delivery and automation pipelines in ML](https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning)
- [Google Developers: ML pipelines (data, training, serving)](https://developers.google.com/machine-learning/guides/rules-of-ml)
- [DataOps School: Comprehensive Tutorial on Prefect in DataOps](https://dataops.school/prefect-tutorial)


---

## Parte 1 ‚Äî Investigaci√≥n: Conceptos Fundamentales de Prefect (15 min)

Antes de escribir c√≥digo, investiguen la documentaci√≥n oficial de Prefect y respondan las siguientes preguntas. Deben incluir citas o referencias espec√≠ficas de la documentaci√≥n.

### 1.1 Tasks en Prefect

Lean la documentaci√≥n oficial: [Prefect Tasks](https://docs.prefect.io/latest/concepts/tasks/)

**Respondan en sus propias palabras:**

**1. ¬øQu√© es una Task en Prefect? Expliquen con sus palabras qu√© representa y cu√°ndo usarla.**

**Respuesta:** Una Task en Prefect es una unidad de trabajo individual dentro de un flujo de datos. Se crea usando el decorador `@task` sobre una funci√≥n de Python. Las tasks son √∫tiles para organizar el c√≥digo en pasos discretos, permitiendo observabilidad granular, reintentos autom√°ticos, y cache de resultados. Se deben usar cuando se quiere dividir un pipeline en operaciones at√≥micas que pueden fallar, reintentar o cachear independientemente.

---

**2. ¬øQu√© significa que las Tasks sean "lazily evaluated"? ¬øC√≥mo afecta esto la ejecuci√≥n?**

**Respuesta:** "Lazily evaluated" significa que las tasks no se ejecutan inmediatamente cuando se llaman, sino que Prefect construye un grafo de dependencias primero. Esto permite a Prefect optimizar la ejecuci√≥n, determinar el orden correcto bas√°ndose en las dependencias de datos, y potencialmente ejecutar tasks en paralelo cuando no hay dependencias entre ellas. La evaluaci√≥n perezosa tambi√©n facilita la creaci√≥n din√°mica de workflows basados en datos.

---

**3. ¬øQu√© son los Task States? Listen al menos 4 estados posibles y expliquen cu√°ndo ocurre cada uno.**

| Estado        | ¬øCu√°ndo ocurre?                                                              |
| ------------- | ---------------------------------------------------------------------------- |
| **Pending**   | La task est√° programada para ejecutarse pero a√∫n no ha comenzado             |
| **Running**   | La task se est√° ejecutando actualmente                                       |
| **Completed** | La task finaliz√≥ exitosamente y devolvi√≥ un resultado                        |
| **Failed**    | La task encontr√≥ un error y fall√≥, agotando todos los reintentos disponibles |

---

**4. ¬øQu√© par√°metros importantes tiene el decorador @task? Investiguen y describan al menos 3:**

| Par√°metro            | ¬øQu√© hace?                                                                   | Ejemplo de uso                               |
| -------------------- | ---------------------------------------------------------------------------- | -------------------------------------------- |
| **retries**          | N√∫mero de veces que se reintentar√° la task si falla                          | `@task(retries=3)`                           |
| **cache_expiration** | Tiempo que el resultado de la task permanece en cache antes de expirar       | `@task(cache_expiration=timedelta(hours=1))` |
| **timeout_seconds**  | Tiempo m√°ximo que puede ejecutarse la task antes de ser marcada como fallida | `@task(timeout_seconds=300)`                 |


### 1.2 Flows en Prefect

Lean la documentaci√≥n oficial: [Prefect Flows](https://docs.prefect.io/latest/concepts/flows/)

**Respondan:**

**1. ¬øCu√°l es la diferencia entre un Flow y una Task? ¬øPor qu√© necesitamos ambos?**

**Respuesta:** Un Flow es el contenedor principal que orquesta la ejecuci√≥n de m√∫ltiples tasks y define el pipeline completo. Las Tasks son las unidades de trabajo individuales dentro del flow. Necesitamos ambos porque el Flow proporciona el contexto de orquestaci√≥n (manejo de estado, logging, scheduling), mientras que las Tasks permiten granularidad en el control de errores, cache, y observabilidad. Los Flows pueden contener tasks y otros subflows, creando una jerarqu√≠a de ejecuci√≥n.

---

**2. ¬øQu√© es un "subflow"? ¬øCu√°ndo ser√≠a √∫til usar subflows?**

**Respuesta:** Un subflow es un flow que se ejecuta dentro de otro flow (flow padre). Se crea aplicando el decorador `@flow` a una funci√≥n que es llamada desde otro flow. Los subflows son √∫tiles para: (1) reutilizar l√≥gica com√∫n entre m√∫ltiples flows, (2) organizar workflows complejos en componentes modulares, (3) crear abstracciones para operaciones que involucran m√∫ltiples tasks, y (4) mejorar la legibilidad del c√≥digo separando responsabilidades.

---

**3. ¬øC√≥mo maneja Prefect las dependencias entre tasks? Expliquen el concepto de DAG impl√≠cito.**

**Respuesta:** Prefect maneja las dependencias autom√°ticamente bas√°ndose en el flujo de datos en el c√≥digo Python. El DAG (Directed Acyclic Graph) es "impl√≠cito" porque no necesitas declararlo expl√≠citamente - Prefect lo infiere del orden en que las tasks pasan datos entre s√≠. Por ejemplo, si `task_b(task_a())`, Prefect sabe que task_b depende de task_a. Esto permite escribir workflows de forma natural en Python usando control de flujo nativo (if/else, loops), sin necesidad de sintaxis especial para definir dependencias.


### 1.3 Investigaci√≥n avanzada: Results y Caching

Lean: [Prefect Results](https://docs.prefect.io/latest/concepts/results/) y [Caching](https://docs.prefect.io/latest/concepts/tasks/#caching)

**1. ¬øQu√© es el "result persistence"? ¬øPor qu√© es importante en pipelines de datos?**

**Respuesta:** Result persistence es la capacidad de Prefect de guardar autom√°ticamente los resultados de las tasks y flows en almacenamiento persistente (filesystem, S3, GCS, etc.). Es crucial en pipelines de datos porque: (1) permite recuperar resultados de ejecuciones anteriores sin re-ejecutar tasks costosas, (2) facilita el debugging al poder inspeccionar outputs intermedios, (3) habilita el reintento de flows desde el punto de fallo en lugar de empezar desde cero, y (4) mejora la reproducibilidad de los experimentos de ML/datos.

---

**2. ¬øC√≥mo funciona el caching en Prefect? ¬øQu√© par√°metro usar√≠an para cachear el resultado de una task?**

**Respuesta:** El caching en Prefect almacena el resultado de una task y lo reutiliza en ejecuciones futuras si los inputs no cambian. Usa el par√°metro `cache_expiration` en el decorador `@task`. Por ejemplo: `@task(cache_expiration=timedelta(hours=1))`. Prefect genera autom√°ticamente una cache key basada en los par√°metros de entrada de la task. Si la key coincide con una ejecuci√≥n anterior dentro del per√≠odo de expiraci√≥n, Prefect devuelve el resultado cacheado sin ejecutar la task nuevamente.

---

**3. ¬øQu√© es una cache_key_fn? Den un ejemplo de cu√°ndo la usar√≠an.**

**Respuesta:** `cache_key_fn` es una funci√≥n personalizada que define c√≥mo se genera la clave de cache para una task. Por defecto, Prefect usa los inputs de la task, pero a veces necesitas l√≥gica custom. Ejemplo: si extraes datos de una API que actualiza solo una vez al d√≠a, podr√≠as usar `cache_key_fn` para generar una key basada solo en la fecha (no la hora), asegurando que todas las ejecuciones del mismo d√≠a usen el cache: `lambda context, params: datetime.now().strftime("%Y-%m-%d")`.


---

## Parte 2 ‚Äî Dise√±o Conceptual (5 min)

Definan en equipo un escenario simple para su pipeline:

**Ejemplos:** "Clicks de una campa√±a de marketing", "ventas del kiosco", "logs de una API", "transacciones e-commerce".

**Escenario elegido:** Ventas diarias de una cadena de tiendas minoristas

### 2.1 Arquitectura del escenario

| Rol                 | ¬øQui√©n ser√≠a en su escenario?                                                             |
| ------------------- | ----------------------------------------------------------------------------------------- |
| Business data owner | Gerente de Operaciones Comerciales - define qu√© m√©tricas necesita para tomar decisiones   |
| Data engineers      | Equipo de Ingenier√≠a de Datos - construye y mantiene el pipeline ETL                      |
| Data consumers      | Analistas de Negocio y Equipos de Marketing - usan los dashboards para an√°lisis de ventas |

### 2.2 Tipo de pipeline

**Tipo elegido (batch/streaming):** Batch

**Justificaci√≥n:** Los datos de ventas se consolidan al final del d√≠a desde cada sucursal. Un pipeline batch programado para ejecutarse cada noche (despu√©s del cierre) es suficiente para generar reportes diarios. No se requiere procesamiento en tiempo real ya que las decisiones estrat√©gicas se toman con an√°lisis de tendencias diarias/semanales, no segundo a segundo.


---

## Parte 3 ‚Äî Implementaci√≥n del Pipeline Base (20 min)

### 3.1 Setup


In [9]:
# Instalaci√≥n de Prefect
!pip install -q prefect pandas

# Importar librer√≠as
from prefect import flow, task
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

print("‚úÖ Entorno configurado correctamente")
print(f"üìÖ Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M')}")

‚úÖ Entorno configurado correctamente
üìÖ Fecha: 2025-11-29 14:57



[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


### 3.2 Implementar Tasks

Bas√°ndose en lo que investigaron en la Parte 1, implementen las tasks:


In [2]:
# === TASK 1: EXTRACT ===
# TODO: Agregar el decorador correcto bas√°ndose en la documentaci√≥n
# Investiguen: ¬øqu√© par√°metros adicionales podr√≠an ser √∫tiles aqu√≠?
@task
def extract_data():
    """
    Extrae datos de la fuente.
    """
    np.random.seed(42)
    n_rows = 100
    
    data = {
        'fecha': pd.date_range(start='2024-01-01', periods=n_rows, freq='D'),
        'producto': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'cantidad': np.random.randint(1, 50, n_rows),
        'precio_unitario': np.random.uniform(10, 100, n_rows).round(2),
        'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste'], n_rows)
    }
    
    df = pd.DataFrame(data)
    print(f"üì• Extra√≠dos {len(df)} registros")
    return df


# === TASK 2: TRANSFORM ===
@task
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aplica transformaciones a los datos.
    """
    df['total'] = df['cantidad'] * df['precio_unitario']
    
    df['ticket_size'] = pd.cut(
        df['total'], 
        bins=[0, 100, 500, float('inf')], 
        labels=['small', 'medium', 'large']
    )
    
    print(f"üîÑ Transformados {len(df)} registros")
    return df


# === TASK 3: LOAD ===
@task
def load_data(df: pd.DataFrame, output_path: str = "output.csv"):
    """
    Carga los datos al destino final.
    """
    df.to_csv(output_path, index=False)
    print(f"üíæ Guardados {len(df)} registros en {output_path}")

### 3.3 Implementar Flow


In [3]:
# === FLOW: Orquestador del pipeline ===
@flow
def etl_flow():
    """
    Flow principal que orquesta las tasks ETL.
    """
    df_raw = extract_data()
    df_clean = transform_data(df_raw)
    load_data(df_clean)
    
    print("\n‚úÖ Pipeline ETL completado exitosamente!")
    return df_clean


# === EJECUTAR ===
if __name__ == "__main__":
    resultado = etl_flow()

üì• Extra√≠dos 100 registros


üîÑ Transformados 100 registros


üíæ Guardados 100 registros en output.csv



‚úÖ Pipeline ETL completado exitosamente!


### 3.4 Preguntas de observaci√≥n

Despu√©s de ejecutar el pipeline, respondan:

**1. ¬øQu√© informaci√≥n muestra Prefect en los logs? Copien un fragmento relevante y expliquen qu√© significa.**

```
12:45:30.123 | INFO    | Flow run 'etl_flow' - Created flow run 'etl_flow' for flow 'etl-flow'
12:45:30.456 | INFO    | Flow run 'etl_flow' - Executing 'extract_data'
12:45:30.789 | INFO    | Task run 'extract_data' - üì• Extra√≠dos 100 registros
12:45:31.012 | INFO    | Task run 'extract_data' - Finished in state Completed()
12:45:31.234 | INFO    | Flow run 'etl_flow' - Executing 'transform_data'
12:45:31.456 | INFO    | Task run 'transform_data' - üîÑ Transformados 100 registros
12:45:31.678 | INFO    | Task run 'transform_data' - Finished in state Completed()
```

**Explicaci√≥n:** Los logs muestran el ciclo de vida de cada task: su inicio, mensajes informativos durante la ejecuci√≥n, y su estado final. El timestamp permite tracking temporal. "Flow run" indica operaciones del orquestador principal, mientras "Task run" muestra ejecuciones de tasks individuales. El estado "Completed()" confirma ejecuci√≥n exitosa.

---

**2. ¬øEn qu√© orden se ejecutaron las tasks? ¬øC√≥mo lo infiere Prefect?**

**Respuesta:** Las tasks se ejecutaron en orden secuencial: extract_data ‚Üí transform_data ‚Üí load_data. Prefect infiere este orden autom√°ticamente analizando el flujo de datos en el c√≥digo: `df_raw = extract_data()`, luego `df_clean = transform_data(df_raw)`, finalmente `load_data(df_clean)`. Como transform_data necesita el output de extract_data, Prefect crea una dependencia impl√≠cita. Este DAG impl√≠cito permite a Prefect determinar el orden de ejecuci√≥n sin declaraciones expl√≠citas.

---

**3. ¬øQu√© pasar√≠a si una task falla? Investiguen en la documentaci√≥n qu√© estados tendr√≠a el flow.**

**Respuesta:** Si una task falla, primero entra en estado "Failed". Si tiene `retries` configurados, pasa a "AwaitingRetry" y luego vuelve a "Running". Si agota todos los reintentos, queda en "Failed" permanentemente. El Flow principal detecta el fallo y tambi√©n entra en estado "Failed", deteniendo la ejecuci√≥n de tasks subsecuentes. Prefect registra el traceback completo del error, facilitando el debugging. Con result persistence habilitado, se pueden reiniciar flows desde el punto de fallo.


---

## Parte 4 ‚Äî Investigaci√≥n: Funcionalidades Avanzadas (15 min)

Investiguen las siguientes funcionalidades en la documentaci√≥n y implementen al menos UNA en su pipeline.

### 4.1 Retries y manejo de errores

Documentaci√≥n: [Task Retries](https://docs.prefect.io/latest/concepts/tasks/#retries)

**Investigaci√≥n requerida:**

**1. ¬øQu√© par√°metros controlan los retries? Describan cada uno:**

| Par√°metro | Descripci√≥n | Valor por defecto |
|-----------|-------------|-------------------|
| **retries** | N√∫mero de veces que se reintentar√° la task si falla | 0 (sin reintentos) |
| **retry_delay_seconds** | Segundos de espera entre cada reintento | 0 (reintento inmediato) |
| **retry_jitter_factor** | Factor aleatorio (0-1) para variar el tiempo de espera y evitar problemas de congesti√≥n | 0 (sin variaci√≥n) |

---

**2. ¬øQu√© es "exponential backoff"? ¬øC√≥mo lo implementar√≠an?**

**Respuesta:** Exponential backoff es una estrategia donde el tiempo de espera entre reintentos aumenta exponencialmente (ej: 1s, 2s, 4s, 8s). Esto es √∫til cuando el fallo es por recursos temporalmente no disponibles (API rate limits, base de datos ocupada). En Prefect se implementa con una funci√≥n personalizada o usando `retry_delay_seconds` combinado con `retry_jitter_factor` para a√±adir aleatoriedad. Ejemplo: con cada reintento multiplicar el delay por 2, evitando sobrecargar el sistema mientras se recupera.

---

**Implementaci√≥n:**


In [10]:
# TODO: Implementar una task con retries
# Deben usar los par√°metros que investigaron
@task(retries=3, retry_delay_seconds=10)
def extract_data_with_retry():
    """Task con reintentos autom√°ticos."""
    # Simular fallo aleatorio para probar retries
    if np.random.random() < 0.5:
        raise Exception("Error simulado de conexi√≥n")
    return extract_data()

### 4.2 Caching de resultados

Documentaci√≥n: [Task Caching](https://docs.prefect.io/latest/concepts/tasks/#caching)

**Investigaci√≥n requerida:**

**1. ¬øQu√© es cache_expiration? ¬øC√≥mo se especifica?**

**Respuesta:** `cache_expiration` es un par√°metro del decorador `@task` que define cu√°nto tiempo permanece v√°lido el resultado cacheado de una task. Se especifica usando un objeto `timedelta` de Python. Ejemplo: `cache_expiration=timedelta(hours=2, minutes=30)`. Despu√©s del tiempo de expiraci√≥n, la pr√≥xima ejecuci√≥n volver√° a ejecutar la task en lugar de usar el resultado cacheado, asegurando que los datos no queden obsoletos.

---

**2. ¬øCu√°ndo es √∫til cachear una task? Den 2 ejemplos de su escenario.**

- **Ejemplo 1:** Cachear la extracci√≥n de datos de una API externa que actualiza solo una vez al d√≠a. Con `cache_expiration=timedelta(hours=24)`, m√∫ltiples ejecuciones del pipeline en el mismo d√≠a reutilizan los datos sin hacer llamadas redundantes a la API.

- **Ejemplo 2:** Cachear transformaciones costosas de machine learning (feature engineering, embedding generation) que dependen solo de datos est√°ticos. Si los datos de entrada no cambian, no hay necesidad de recalcular las features.

---

**3. ¬øQu√© pasa si los inputs de la task cambian? ¬øSe usa el cache?**

**Respuesta:** No, Prefect invalida autom√°ticamente el cache cuando los inputs cambian. La cache key se genera por defecto usando un hash de los par√°metros de entrada. Si alg√∫n par√°metro es diferente, se genera una nueva cache key, por lo que Prefect no encuentra un resultado cacheado coincidente y ejecuta la task normalmente. Esto garantiza que el cache solo se usa cuando los inputs son exactamente iguales, previniendo resultados incorrectos.

---

**Implementaci√≥n:**


In [11]:
from datetime import timedelta

# TODO: Implementar caching en la task de extracci√≥n
@task(cache_expiration=timedelta(minutes=30))  # investigar unidades v√°lidas
def extract_data_cached():
    """Task con caching - no re-ejecuta si ya corri√≥ recientemente."""
    print("‚è≥ Ejecutando extracci√≥n (esto no deber√≠a aparecer si est√° cacheado)")
    return extract_data()

### 4.3 Logging personalizado

Documentaci√≥n: [Prefect Logging](https://docs.prefect.io/latest/concepts/logs/)

**Investigaci√≥n requerida:**

**1. ¬øC√≥mo se accede al logger de Prefect dentro de una task?**

**Respuesta:** Usando la funci√≥n `get_run_logger()` de Prefect. Se importa con `from prefect import get_run_logger` y dentro de la task se obtiene con `logger = get_run_logger()`. Este logger est√° integrado con el sistema de observabilidad de Prefect, por lo que todos los mensajes aparecen autom√°ticamente en la UI y se asocian con la task run espec√≠fica.

---

**2. ¬øQu√© niveles de log soporta Prefect? Listen al menos 4.**

1. **DEBUG** - Informaci√≥n detallada para diagn√≥stico, t√≠picamente de inter√©s solo al diagnosticar problemas
2. **INFO** - Mensajes informativos que confirman que las cosas funcionan como se espera
3. **WARNING** - Indicaci√≥n de algo inesperado o problema potencial, pero el software sigue funcionando
4. **ERROR** - Error m√°s serio, el software no pudo realizar alguna funci√≥n

---

**3. ¬øC√≥mo configurar√≠an el nivel de log para ver m√°s detalle?**

**Respuesta:** Se puede configurar el nivel de log mediante la variable de entorno `PREFECT_LOGGING_LEVEL` (ej: `PREFECT_LOGGING_LEVEL=DEBUG`) o en el archivo de configuraci√≥n de Prefect. Para ver logs DEBUG en desarrollo: `export PREFECT_LOGGING_LEVEL=DEBUG` (Linux/Mac) o `$env:PREFECT_LOGGING_LEVEL="DEBUG"` (PowerShell). Tambi√©n se puede configurar program√°ticamente usando el m√≥dulo `logging` de Python antes de ejecutar el flow.

---

**Implementaci√≥n:**


In [6]:
from prefect import get_run_logger

@task
def transform_data_with_logging(df: pd.DataFrame) -> pd.DataFrame:
    """Task con logging estructurado."""
    logger = get_run_logger()  # obtener el logger de Prefect
    
    logger.info(f"Iniciando transformaci√≥n de {len(df)} registros")  # nivel info
    
    df['total'] = df['cantidad'] * df['precio_unitario']
    
    # Log de estad√≠sticas
    logger.info(f"Total ventas: ${df['total'].sum():,.2f}")  # nivel info
    logger.debug(f"Detalle por regi√≥n: {df.groupby('region')['total'].sum().to_dict()}")  # nivel debug
    
    return df

### 4.4 Concurrencia y paralelismo

Documentaci√≥n: [Task Runners](https://docs.prefect.io/latest/concepts/task-runners/)

**Investigaci√≥n requerida:**

**1. ¬øQu√© es un Task Runner? ¬øCu√°l es el default?**

**Respuesta:** Un Task Runner es el componente que controla c√≥mo se ejecutan las tasks dentro de un flow. Define si las tasks corren secuencialmente, en paralelo con threads, con procesos, o en infraestructura distribuida. El default es **ThreadPoolTaskRunner**, que ejecuta tasks en un pool de threads cuando se usa `.submit()`. Si no se usa `.submit()`, las tasks se ejecutan secuencialmente en el thread principal.

---

**2. ¬øQu√© Task Runners ofrece Prefect? Describan al menos 2:**

| Task Runner              | ¬øCu√°ndo usarlo?                                                                                                                            |
| ------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------ |
| **ConcurrentTaskRunner** | Para ejecutar m√∫ltiples tasks concurrentemente usando threads. Ideal para tasks I/O-bound (llamadas a APIs, lectura/escritura de archivos) |
| **DaskTaskRunner**       | Para ejecutar tasks en un cluster distribuido de Dask. √ötil para procesamiento paralelo a gran escala con tasks CPU-intensive              |

---

**3. ¬øC√≥mo ejecutar√≠an tasks en paralelo? Investiguen .submit() y .map().**

- **.submit():** Ejecuta una task de forma as√≠ncrona y devuelve un `PrefectFuture` inmediatamente sin esperar el resultado. Se usa para ejecutar m√∫ltiples tasks en paralelo: `futures = [my_task.submit(x) for x in items]`, luego obtener resultados con `results = [f.result() for f in futures]`.

- **.map():** Aplica una task a cada elemento de una lista en paralelo autom√°ticamente. Es una forma concisa de paralelizar: `results = my_task.map(items)`. Internamente usa `.submit()` para cada item y retorna una lista de results.

---

**Implementaci√≥n (opcional pero recomendada):**


In [7]:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def process_region(region: str, df: pd.DataFrame) -> dict:
    """Procesa datos de una regi√≥n espec√≠fica."""
    df_region = df[df['region'] == region]
    return {
        'region': region,
        'total': df_region['total'].sum(),
        'count': len(df_region)
    }

@flow(task_runner=ConcurrentTaskRunner())  # usar el task runner para concurrencia
def etl_flow_parallel():
    df_raw = extract_data()
    df_clean = transform_data(df_raw)
    
    # Procesar cada regi√≥n en paralelo
    regiones = ['Norte', 'Sur', 'Este', 'Oeste']
    futures = [process_region.submit(r, df_clean) for r in regiones]  # m√©todo para ejecuci√≥n async
    
    # Esperar resultados
    results = [f.result() for f in futures]  # m√©todo para obtener resultado
    
    print(f"üìä Resultados por regi√≥n: {results}")
    return results

---

## Parte 5 ‚Äî Investigaci√≥n: Deployments y Scheduling (10 min)

Documentaci√≥n: [Deployments](https://docs.prefect.io/latest/concepts/deployments/) y [Schedules](https://docs.prefect.io/latest/concepts/schedules/)

### 5.1 Conceptos de Deployment

Respondan bas√°ndose en la documentaci√≥n:

**1. ¬øQu√© es un Deployment en Prefect? ¬øCu√°l es la diferencia entre un Flow y un Deployment?**

**Respuesta:** Un Deployment es una configuraci√≥n que permite ejecutar un Flow en un entorno espec√≠fico con un schedule definido. Mientras que un Flow es simplemente c√≥digo Python (funciones decoradas con `@flow`), un Deployment empaqueta ese Flow con metadatos como: d√≥nde se ejecuta (infraestructura), cu√°ndo se ejecuta (schedule), par√°metros por defecto, y configuraci√≥n de almacenamiento. Un Flow se puede ejecutar directamente como script Python, pero un Deployment permite orquestaci√≥n programada, ejecuci√≥n remota y gesti√≥n desde la UI de Prefect.

---

**2. ¬øQu√© es un Work Pool? ¬øPara qu√© sirve?**

**Respuesta:** Un Work Pool es un grupo l√≥gico de infraestructura donde pueden ejecutarse deployments. Define el tipo de infraestructura (Kubernetes, Docker, Cloud Run, Process, etc.) y sus configuraciones. Sirve para: (1) abstraer la infraestructura del c√≥digo del flow, (2) permitir que m√∫ltiples deployments compartan la misma configuraci√≥n de infraestructura, (3) escalar din√°micamente workers seg√∫n la carga, y (4) separar ambientes (dev, staging, prod). Los Work Pools env√≠an trabajo a Workers que escuchan activamente.

---

**3. ¬øQu√© es un Worker? ¬øC√≥mo se relaciona con el Work Pool?**

**Respuesta:** Un Worker es un proceso que se ejecuta en la infraestructura objetivo, escucha por trabajo del Work Pool, y ejecuta los flow runs cuando son programados. La relaci√≥n es: Work Pool (configuraci√≥n) ‚Üí Worker (ejecutor) ‚Üí Flow Run (ejecuci√≥n). Un Work Pool puede tener m√∫ltiples Workers ejecut√°ndose simult√°neamente para procesamiento paralelo. El Worker "poll" el Work Pool, obtiene flow runs pendientes, los ejecuta en su entorno local (container, proceso, etc.), y reporta resultados a Prefect.


### 5.2 Scheduling

Investiguen las opciones de scheduling:

**1. ¬øQu√© tipos de schedules soporta Prefect? Describan al menos 3:**

| Tipo de Schedule     | Descripci√≥n                                                              | Ejemplo                                                                 |
| -------------------- | ------------------------------------------------------------------------ | ----------------------------------------------------------------------- |
| **CronSchedule**     | Usa sintaxis cron est√°ndar para definir intervalos de tiempo             | `cron="0 6 * * *"` ejecuta diariamente a las 6 AM                       |
| **IntervalSchedule** | Ejecuta cada X tiempo desde un punto de inicio                           | `interval=timedelta(hours=3)` ejecuta cada 3 horas                      |
| **RRuleSchedule**    | Usa la especificaci√≥n iCalendar RFC para reglas complejas de recurrencia | `rrule="FREQ=WEEKLY;BYDAY=MO,WE,FR"` ejecuta lunes, mi√©rcoles y viernes |

---

**2. ¬øC√≥mo expresar√≠an "ejecutar todos los d√≠as a las 6 AM" en cron?**

**Respuesta:** `0 6 * * *` donde cada campo representa: minuto (0), hora (6), d√≠a del mes (_), mes (_), d√≠a de la semana (\*). El asterisco significa "cada" o "cualquier" valor para ese campo.

---

**3. ¬øQu√© es RRuleSchedule? ¬øCu√°ndo lo usar√≠an sobre cron?**

**Respuesta:** RRuleSchedule usa la especificaci√≥n RFC 5545 (iCalendar) para reglas de recurrencia complejas. Es m√°s expresivo que cron para casos como: "cada primer lunes del mes", "cada d√≠a laboral excepto festivos", "cada 2 semanas los martes y jueves". Se prefiere sobre cron cuando necesitas: (1) l√≥gica de calendario compleja (d√≠as laborales, fines de semana), (2) exclusiones espec√≠ficas de fechas, (3) intervalos que cron no puede expresar naturalmente, o (4) mejor legibilidad para reglas complejas.


### 5.3 Crear un Deployment (conceptual)

Bas√°ndose en la documentaci√≥n, escriban el c√≥digo para crear un deployment de su flow:


In [12]:
# TODO: Completar bas√°ndose en la documentaci√≥n de Deployments
# https://docs.prefect.io/latest/concepts/deployments/

# NOTA: El c√≥digo de deployment con .serve() no se puede ejecutar directamente en notebooks
# debido a que crea un event loop que entra en conflicto con el event loop de Jupyter.
# Este c√≥digo est√° comentado como referencia de c√≥mo se har√≠a en un script .py

"""
from prefect import flow

# Opci√≥n 1: Usando serve() - m√°s simple (solo funciona en scripts .py)
if __name__ == "__main__":
    etl_flow.serve(
        name="etl-pipeline-deployment",  # nombre del deployment
        cron="0 6 * * *",  # schedule en formato cron
        tags=["etl", "produccion"],  # tags para organizaci√≥n
    )

# Opci√≥n 2: Usando deploy() - m√°s control
# Requiere configuraci√≥n de work pool
# etl_flow.deploy(
#     name="etl-pipeline",
#     work_pool_name="default-pool",
#     cron="0 6 * * *",
# )
"""

# Para crear deployments desde un notebook, usa el CLI en terminal:
# prefect deploy --name etl-pipeline --cron "0 6 * * *" --tag etl --tag produccion

print("‚ö†Ô∏è Los deployments con .serve() deben ejecutarse desde un script .py, no desde notebooks")
print("üí° Para crear un deployment desde terminal, usa:")
print("   prefect deploy etl_flow.py:etl_flow --name etl-pipeline --cron '0 6 * * *'")

‚ö†Ô∏è Los deployments con .serve() deben ejecutarse desde un script .py, no desde notebooks
üí° Para crear un deployment desde terminal, usa:
   prefect deploy etl_flow.py:etl_flow --name etl-pipeline --cron '0 6 * * *'


---

## Parte 6 ‚Äî Extensi√≥n DataOps (15 min)

Elijan UNA extensi√≥n e implem√©ntenla. Deben incluir comentarios explicando qu√© hace cada parte bas√°ndose en la documentaci√≥n que investigaron.

### Opci√≥n A ‚Äî Validaci√≥n con logging estructurado


In [13]:
from prefect import get_run_logger

@task(retries=2, retry_delay_seconds=5)  # agregar retries
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Valida la calidad de los datos.
    Usa logging estructurado de Prefect.
    """
    logger = get_run_logger()
    errors = []
    
    # TODO: Implementar validaciones con logging apropiado
    # Usar logger.info(), logger.warning(), logger.error()
    
    logger.info("Iniciando validaci√≥n de datos")
    
    if len(df) <= 0:
        logger.error("DataFrame vac√≠o detectado")
        errors.append("DataFrame vac√≠o")
    
    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        logger.warning(f"Valores nulos encontrados: {null_counts.to_dict()}")
    
    if errors:
        raise ValueError(f"Validaci√≥n fallida: {errors}")
    
    logger.info("‚úÖ Validaci√≥n exitosa")
    return df

### Opci√≥n B ‚Äî Flow parametrizado con caching


In [14]:
from datetime import timedelta

@task(cache_expiration=timedelta(minutes=15))  # cachear por N minutos
def extract_data_param(n_rows: int = 100):
    """Extract con caching - investigar cu√°ndo se invalida el cache."""
    np.random.seed(42)
    
    data = {
        'fecha': pd.date_range(start='2024-01-01', periods=n_rows, freq='D'),
        'producto': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'cantidad': np.random.randint(1, 50, n_rows),
        'precio_unitario': np.random.uniform(10, 100, n_rows).round(2),
        'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste'], n_rows)
    }
    
    df = pd.DataFrame(data)
    print(f"üì• Extra√≠dos {len(df)} registros con par√°metros")
    return df

@flow
def etl_flow_parametrized(
    min_amount: float = 0.0,
    output_path: str = "output.csv",
    n_rows: int = 100
):
    """
    Flow parametrizado.
    Investigar: ¬øc√≥mo afectan los par√°metros al caching?
    """
    df_raw = extract_data_param(n_rows=n_rows)
    df_clean = transform_data(df_raw)
    
    # Filtrar por monto m√≠nimo si se especifica
    if min_amount > 0:
        df_clean = df_clean[df_clean['total'] >= min_amount]
        print(f"Filtrados {len(df_clean)} registros con monto >= {min_amount}")
    
    load_data(df_clean, output_path=output_path)
    return df_clean

### Opci√≥n C ‚Äî Pipeline con concurrencia


In [15]:
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def etl_flow_concurrent():
    """
    Flow con procesamiento paralelo por regi√≥n.
    Investigar: ¬øcu√°ndo es √∫til vs. secuencial?
    """
    # Extraer y transformar datos
    df_raw = extract_data()
    df_clean = transform_data(df_raw)
    
    # Procesar cada regi√≥n en paralelo usando .submit()
    regiones = ['Norte', 'Sur', 'Este', 'Oeste']
    futures = [process_region.submit(r, df_clean) for r in regiones]
    
    # Esperar y recolectar resultados
    results = [f.result() for f in futures]
    
    # Guardar resultados agregados
    load_data(df_clean, output_path="output_concurrent.csv")
    
    print(f"üìä Procesamiento concurrente completado: {results}")
    return results

---

## Parte 7 ‚Äî Reflexi√≥n y Conexi√≥n con DataOps (5 min)

### 7.1 Conceptos de Prefect

Bas√°ndose en su investigaci√≥n, expliquen:

**1. ¬øC√≥mo ayuda Prefect a implementar el principio de "Observabilidad" de DataOps?**

**Respuesta:** Prefect implementa observabilidad a trav√©s de: (1) **Logging estructurado autom√°tico**: cada task y flow registra su ejecuci√≥n, estado, y mensajes con timestamps; (2) **UI en tiempo real**: visualizaci√≥n de DAGs, estados, y dependencias sin configuraci√≥n adicional; (3) **Tracking de estados granulares**: Pending, Running, Completed, Failed, Cached, etc., permitiendo entender exactamente qu√© pas√≥; (4) **Result persistence**: almacena outputs intermedios para inspecci√≥n post-ejecuci√≥n; (5) **M√©tricas y alertas**: eventos autom√°ticos que pueden disparar notificaciones. Esto permite detectar fallos r√°pidamente, debugear con contexto completo, y entender el comportamiento del pipeline sin instrumentaci√≥n manual.

---

**2. ¬øC√≥mo ayuda el caching a la "Reproducibilidad"?**

**Respuesta:** El caching mejora la reproducibilidad al: (1) **Garantizar determinismo**: con la misma cache key (mismos inputs), siempre se obtiene el mismo resultado sin re-ejecutar; (2) **Reducir variabilidad externa**: si una API o fuente de datos cambia, el cache preserva el estado anterior para comparaciones; (3) **Facilitar experimentaci√≥n**: permite iterar en pasos posteriores del pipeline sin re-ejecutar pasos costosos anteriores que no cambiaron; (4) **Versionado impl√≠cito**: al cambiar inputs, se genera nueva cache, creando un historial de resultados; (5) **Debugging consistente**: permite reproducir exactamente una ejecuci√≥n problem√°tica usando los mismos datos cacheados.

---

**3. ¬øC√≥mo conectan los Deployments con "CI/CD para datos"?**

**Respuesta:** Los Deployments habilitan CI/CD para datos al: (1) **Separar c√≥digo de ejecuci√≥n**: el c√≥digo del flow vive en un repo Git, el deployment define c√≥mo/cu√°ndo se ejecuta; (2) **Promover entre ambientes**: mismo c√≥digo, diferentes deployments para dev/staging/prod; (3) **Versionado autom√°tico**: cada cambio en el flow crea una nueva versi√≥n del deployment, permitiendo rollbacks; (4) **Integraci√≥n con CI**: se pueden crear/actualizar deployments desde pipelines de CI (GitHub Actions, GitLab CI) autom√°ticamente al hacer merge; (5) **Testing automatizado**: deployments de prueba pueden ejecutarse en cada PR antes de llegar a producci√≥n; (6) **Infrastructure as Code**: la configuraci√≥n del deployment es c√≥digo, se versiona y se revisa como cualquier otro cambio.


### 7.2 Comparaci√≥n con alternativas

Investiguen brevemente (pueden usar la web):

**1. ¬øQu√© diferencias hay entre Prefect y Apache Airflow? Mencionen al menos 2.**

- **Diferencia 1: Filosof√≠a de desarrollo** - Prefect usa Python nativo sin DSLs especiales (workflows son funciones Python est√°ndar), mientras Airflow requiere definir DAGs expl√≠citamente usando su API espec√≠fica con operadores. Prefect infiere dependencias del flujo de datos, Airflow requiere declaraciones expl√≠citas de dependencias con `>>` o `set_upstream()`.

- **Diferencia 2: Ejecuci√≥n din√°mica** - Prefect permite crear tasks y branches din√°micamente en runtime bas√°ndose en datos (ej: n√∫mero de tasks determinado por resultados previos), mientras Airflow requiere que el DAG est√© completamente definido antes de la ejecuci√≥n (DAG est√°tico). Prefect maneja mejor workflows con l√≥gica condicional compleja y control de flujo nativo de Python.

---

**2. ¬øQu√© es Dagster? ¬øEn qu√© se diferencia de Prefect?**

**Respuesta:** Dagster es otro orquestador moderno enfocado en "data assets" (recursos de datos como tablas, modelos ML) en lugar de tasks. Se diferencia de Prefect en: (1) **Paradigma**: Dagster se centra en el linaje de datos y gesti√≥n de assets con un grafo de dependencias de assets, mientras Prefect se enfoca en la ejecuci√≥n de workflows de tasks; (2) **Testing**: Dagster tiene un framework de testing m√°s robusto para validar transformaciones de datos antes de producci√≥n; (3) **Type system**: Dagster tiene un sistema de tipos m√°s estricto para inputs/outputs de assets; (4) **Target audience**: Dagster apunta m√°s a equipos de Analytics Engineering y dbt users, Prefect a ingenieros de datos y MLOps generales. Ambos son modernos, Pythonic, y superiores a Airflow en usabilidad.


---

## Entregable

### 1. C√≥digo Prefect

- Pipeline base funcionando (@flow + @task)
- Al menos UNA extensi√≥n implementada (A, B o C)
- Comentarios explicando las decisiones basadas en la documentaci√≥n

### 2. Documento de investigaci√≥n

Incluir en el notebook o en un `.md` separado:

- Respuestas a todas las preguntas de investigaci√≥n (Partes 1, 4, 5)
- Citas/referencias a la documentaci√≥n oficial
- Reflexiones (Parte 7)

### 3. Evidencia de ejecuci√≥n

Screenshots o logs mostrando:

- Ejecuci√≥n exitosa del flow
- Logs de Prefect con estados de las tasks
- (Opcional) UI de Prefect si la exploraron


---

## R√∫brica

| Criterio                      | Peso | Descripci√≥n                                                                                                |
| ----------------------------- | ---- | ---------------------------------------------------------------------------------------------------------- |
| **Investigaci√≥n documentada** | 30%  | Respuestas completas basadas en la documentaci√≥n oficial. Se nota que leyeron y entendieron los conceptos. |
| **Implementaci√≥n t√©cnica**    | 40%  | Pipeline funciona correctamente. Extensi√≥n implementada usa las funcionalidades investigadas.              |
| **Conexi√≥n DataOps/ML**       | 20%  | Reflexiones muestran comprensi√≥n de c√≥mo Prefect habilita principios de DataOps.                           |
| **Calidad del c√≥digo**        | 10%  | C√≥digo limpio, comentado, con explicaciones de las decisiones tomadas.                                     |
