# Parte 1 ‚Äî Investigaci√≥n: Conceptos Fundamentales de Prefect

## 1.1 Tasks en Prefect

### 1. ¬øQu√© es una Task en Prefect?
Una **Task** es una unidad de trabajo dentro de un pipeline. B√°sicamente es una funci√≥n que hace algo espec√≠fico (extraer datos, transformar, validar, guardar, etc.) y Prefect la puede monitorear, reintentar si falla y registrar su estado.  
La usamos cuando queremos dividir el pipeline en pasos claros y reutilizables.

### 2. ¬øQu√© significa que las Tasks sean ‚Äúlazily evaluated‚Äù?  
Significa que **no se ejecutan cuando las llam√°s en el c√≥digo**, sino reci√©n cuando corre el *flow*.  
Prefect primero arma el grafo de dependencias (el DAG) y despu√©s ejecuta todo en el orden correcto.  
Esto permite que el pipeline sea m√°s eficiente y flexible.

### 3. ¬øQu√© son los Task States?  
Son los **estados** que indican en qu√© etapa est√° una Task durante su ejecuci√≥n.

| Estado      | ¬øCu√°ndo ocurre? |
|-------------|------------------|
| **PENDING** | La task est√° lista, pero todav√≠a no empez√≥ a ejecutarse. |
| **RUNNING** | La task se est√° ejecutando en ese momento. |
| **COMPLETED** | Termin√≥ bien sin errores. |
| **FAILED** | Fall√≥ durante la ejecuci√≥n. |
| **RETRYING** | Fall√≥ pero tiene reintentos configurados y volver√° a intentarse. |

## 1.2 Flows en Prefect

### 1. ¬øCu√°l es la diferencia entre un Flow y una Task? ¬øPor qu√© necesitamos ambos?

Un **Flow** es el ‚Äúpipeline completo‚Äù: coordina y ordena todos los pasos.  
Una **Task** es un paso individual dentro de ese pipeline.

Necesitamos ambos porque:
- Las Tasks dividen el trabajo en partes peque√±as y reutilizables.
- El Flow decide el orden, manejo de dependencias y ejecuci√≥n general.

### 2. ¬øQu√© es un "subflow"? ¬øCu√°ndo ser√≠a √∫til usar subflows?

Un **subflow** es un Flow que se llama desde dentro de otro Flow.  
Sirve para modularizar el pipeline, reutilizar partes completas o separar l√≥gicas que se repiten (por ejemplo: `extract_and_validate()`).

### 3. ¬øC√≥mo maneja Prefect las dependencias entre tasks? (DAG impl√≠cito)

Prefect crea el DAG **autom√°ticamente**:  
cuando una Task usa el resultado de otra (`transform(extract())`), Prefect detecta esa dependencia sin que la declares expl√≠citamente.

As√≠ arma un **DAG impl√≠cito**, ejecutando cada Task en el orden correcto seg√∫n lo que necesita.


## 1.3 Investigaci√≥n avanzada: Results y Caching

### 1. ¬øQu√© es el "result persistence"? ¬øPor qu√© es importante en pipelines de datos?

Es la capacidad de Prefect de **guardar y recuperar resultados de tasks** (por ejemplo en archivos, S3, bases de datos o memoria).

Es importante porque:
- Evita recalcular pasos costosos
- Permite reanudar un pipeline que fall√≥
- Facilita debugging y auditor√≠a
- Mejora reproducibilidad

### 2. ¬øC√≥mo funciona el caching en Prefect? ¬øQu√© par√°metro usar√≠an para cachear el resultado de una task?

Prefect evita ejecutar una task si ya tiene un **resultado v√°lido en cach√©** generado con los mismos par√°metros.

Se habilita usando:
- `cache_expiration` (tiempo de validez)
- **o** `cache_key_fn` (clave de cach√© personalizada)

### 3. ¬øQu√© es una `cache_key_fn`? Den un ejemplo de cu√°ndo la usar√≠an.

Es una funci√≥n que genera **manual y expl√≠citamente** la clave de cach√© de una task.  
Sirve cuando no quer√©s que el cach√© dependa de *todos* los par√°metros, sino solo de algunos.

**Ejemplo:**  
Cachear una extracci√≥n diaria usando solo la fecha:

```python
def key_fn(_, date):
    return f"extract-{date}"
```

As√≠, si ya extrajiste los datos del d√≠a 2025-10-01, no se vuelve a ejecutar la tarea.

# Parte 2 ‚Äî Dise√±o Conceptual

## 2.1 Arquitectura del escenario

**Escenario elegido:** Transacciones diarias de un e-commerce.

| Rol                 | ¬øQui√©n ser√≠a en su escenario? |
|--------------------|-------------------------------|
| Business data owner | Equipo de ventas del e-commerce |
| Data engineers      | Equipo de ingenier√≠a de datos que mantiene el pipeline ETL |
| Data consumers      | Analistas de negocio y dashboards de reporting |

## 2.2 Tipo de pipeline

- **Tipo elegido (batch/streaming):** Batch  
- **Justificaci√≥n:**  
  Las ventas se procesan de manera diaria. No necesitamos procesamiento en tiempo real, y el enfoque batch es m√°s eficiente para an√°lisis agregados, validaci√≥n de calidad de datos y reintentos controlados. Adem√°s, simplifica el dise√±o del ETL y reduce costos operativos.


# Parte 3 ‚Äî Implementaci√≥n del Pipeline Base

In [None]:
# 3.1 Setup

# Instalaci√≥n de Prefect
!pip install -q prefect pandas

# Importar librer√≠as
from prefect import flow, task
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

print("‚úÖ Entorno configurado correctamente")
print(f"üìÖ Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M')}")

‚úÖ Entorno configurado correctamente
üìÖ Fecha: 2025-12-02 21:31


In [None]:
# 3.2 Implementar Tasks

# === TASK 1: EXTRACT ===
# Usamos tags y log_prints para que todo quede registrado en Prefect

@task(tags=["extract"], log_prints=True)
def extract_data():
    """
    Extrae datos de la fuente.
    """
    np.random.seed(42)
    n_rows = 100

    data = {
        'fecha': pd.date_range(start='2024-01-01', periods=n_rows, freq='D'),
        'producto': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'cantidad': np.random.randint(1, 50, n_rows),
        'precio_unitario': np.random.uniform(10, 100, n_rows).round(2),
        'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste'], n_rows)
    }

    df = pd.DataFrame(data)
    print(f"üì• Extra√≠dos {len(df)} registros")
    return df


# === TASK 2: TRANSFORM ===
@task(tags=["transform"], log_prints=True)
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aplica transformaciones a los datos.
    """
    df['total'] = df['cantidad'] * df['precio_unitario']

    df['ticket_size'] = pd.cut(
        df['total'],
        bins=[0, 100, 500, float('inf')],
        labels=['small', 'medium', 'large']
    )

    print(f"üîÑ Transformados {len(df)} registros")
    return df


# === TASK 3: LOAD ===
@task(tags=["load"], log_prints=True, retries=2, retry_delay_seconds=3)
def load_data(df: pd.DataFrame, output_path: str = "output.csv"):
    """
    Carga los datos al destino final.
    """
    df.to_csv(output_path, index=False)
    print(f"üíæ Guardados {len(df)} registros en {output_path}")


In [5]:
# 3.3 Implementar Flow
 
# === FLOW: Orquestador del pipeline ===

@flow(name="ETL Pipeline Ventas", log_prints=True)
def etl_flow():
    """
    Flow principal que orquesta las tasks ETL.
    """
    df_raw = extract_data()
    df_clean = transform_data(df_raw)
    load_data(df_clean)

    print("\n‚úÖ Pipeline ETL completado exitosamente!")
    return df_clean


# === EJECUTAR ===
if __name__ == "__main__":
    resultado = etl_flow()


## 3.4 Preguntas de observaci√≥n

### 1. ¬øQu√© informaci√≥n muestra Prefect en los logs? Copien un fragmento y expl√≠quenlo.

**Fragmento real de mis logs:**

INFO | Flow run 'intelligent-swift' - Beginning flow run 'ETL Pipeline Ventas'
INFO | Task run 'extract_data-daa' - üì• Extra√≠dos 100 registros
INFO | Task run 'extract_data-daa' - Finished in state Completed()
INFO | Task run 'transform_data-c9c' - üîÑ Transformados 100 registros
INFO | Task run 'transform_data-c9c' - Finished in state Completed()
INFO | Task run 'load_data-49f' - üíæ Guardados 100 registros en output.csv
INFO | Task run 'load_data-49f' - Finished in state Completed()
INFO | Flow run 'intelligent-swift' - Finished in state Completed()


**Explicaci√≥n:**  
Los logs muestran cada paso del pipeline: el inicio del flow, la ejecuci√≥n de cada task, los prints internos y el estado final de cada una (`Completed`). Tambi√©n muestra cu√°ntos registros se procesaron y cu√°ndo finaliza todo el ETL. Es informaci√≥n clave para monitorear el pipeline y detectar errores.

### 2. ¬øEn qu√© orden se ejecutaron las tasks? ¬øC√≥mo lo infiere Prefect?

El orden fue:

1. `extract_data`
2. `transform_data`
3. `load_data`

Prefect lo infiere autom√°ticamente porque construye un **DAG impl√≠cito**:  
cada task recibe como par√°metro la salida de la anterior, por lo que entiende que esas dependencias deben respetarse y ejecuta en ese orden sin que uno lo tenga que especificar manualmente.

### 3. ¬øQu√© pasar√≠a si una task falla? ¬øQu√© estados tendr√≠a el flow?

Si una task falla:

- La task entra en estado **FAILED** o **RETRYING** si tiene reintentos.
- Todas las tasks que dependan de ella no se ejecutan.
- El flow completo termina en **FAILED** o **CRASHED**, seg√∫n el tipo de error.

Prefect detiene autom√°ticamente el pipeline para evitar resultados inconsistentes.


# Parte 4 ‚Äî Investigaci√≥n: Funcionalidades Avanzadas

## 4.1 Retries y manejo de errores

### 1. ¬øQu√© par√°metros controlan los retries?

| Par√°metro | Descripci√≥n | Valor por defecto |
|----------|-------------|-------------------|
| `retries` | N√∫mero de veces que se intentar√° repetir la task si falla. | `0` |
| `retry_delay_seconds` | Tiempo fijo entre reintentos. | `0` |
| `retry_jitter_factor` | Factor que agrega aleatoriedad al delay para evitar colisiones. | `0` |

### 2. ¬øQu√© es exponential backoff?

Exponential backoff es una estrategia donde cada reintento espera el doble que el anterior  
(1s ‚Üí 2s ‚Üí 4s ‚Üí 8s‚Ä¶).  
Sirve para no saturar APIs/servidores cuando hay fallos repetidos.

En Prefect se puede lograr usando `retry_delay_seconds` junto con `retry_jitter_factor`,  
o implementando un delay creciente dentro de la l√≥gica de la task.

In [6]:
@task(retries=3, retry_delay_seconds=2, log_prints=True)
def extract_data_with_retry():
    """Task con reintentos autom√°ticos."""
    # Simular fallo aleatorio para probar retries
    if np.random.random() < 0.5:
        raise Exception("Error simulado de conexi√≥n")
    return "datos extra√≠dos"

In [7]:
@flow
def test_retry_flow():
    return extract_data_with_retry()

test_retry_flow()

'datos extra√≠dos'

## 4.2 Caching de resultados

### 1. ¬øQu√© es `cache_expiration`? ¬øC√≥mo se especifica?

`cache_expiration` define cu√°nto tiempo es v√°lido el resultado cacheado de una task.
Si la task corre nuevamente dentro de ese tiempo, Prefect no la re-ejecuta y devuelve el resultado almacenado.

Se especifica usando un `timedelta`, por ejemplo:

```python
@task(cache_expiration=timedelta(minutes=10))
```

### 2. ¬øCu√°ndo es √∫til cachear una task? Den 2 ejemplos de su escenario.

**Ejemplo 1:**
Cuando la extracci√≥n de datos es costosa (por ejemplo, acceder a una API lenta o pagada).

**Ejemplo 2:**
Cuando los datos no cambian todo el tiempo (por ejemplo, ventas del d√≠a que solo se actualizan una vez por d√≠a).

### 3. ¬øQu√© pasa si los inputs de la task cambian? ¬øSe usa el cache?

**No.**
Si los inputs de la task cambian, Prefect genera una clave de cach√© distinta y **vuelve a ejecutar la task.**
El cache solo se usa cuando los par√°metros de entrada y el tiempo de expiraci√≥n coinciden.

In [8]:
from datetime import timedelta

# Implementaci√≥n del caching en la task de extracci√≥n
@task(cache_expiration=timedelta(minutes=5), log_prints=True)
def extract_data_cached():
    """Task con caching ‚Äî no re-ejecuta si ya corri√≥ recientemente."""
    print("‚è≥ Ejecutando extracci√≥n (esto NO deber√≠a aparecer si est√° cacheado)")
    return extract_data()

In [11]:
@flow
def test_cache_flow():
    return extract_data_cached()

test_cache_flow()
test_cache_flow()   # segunda ejecuci√≥n ‚Üí deber√≠a evitar la extracci√≥n


Unnamed: 0,fecha,producto,cantidad,precio_unitario,region
0,2024-01-01,C,18,93.67,Oeste
1,2024-01-02,D,26,82.73,Norte
2,2024-01-03,A,44,67.01,Norte
3,2024-01-04,C,34,88.43,Oeste
4,2024-01-05,C,10,82.33,Norte
...,...,...,...,...,...
95,2024-04-05,B,13,69.72,Sur
96,2024-04-06,B,32,10.46,Oeste
97,2024-04-07,D,7,24.47,Este
98,2024-04-08,B,22,59.39,Sur


## 4.3 Logging personalizado

### 1. ¬øC√≥mo se accede al logger de Prefect dentro de una task?

Usando la funci√≥n:

```python
from prefect import get_run_logger
logger = get_run_logger()
```

### 2. ¬øQu√© niveles de log soporta Prefect? Listen al menos 4.

1. logger.debug()
2. logger.info()
3. logger.warning()
4. logger.error()

### 3. ¬øC√≥mo configurar√≠an el nivel de log para ver m√°s detalle?

En el decorador de la task o del flow, usando:

```python
@flow(log_prints=True)
```

In [12]:
from prefect import get_run_logger

@task
def transform_data_with_logging(df: pd.DataFrame) -> pd.DataFrame:
    """Task con logging estructurado."""
    
    logger = get_run_logger()   # obtener el logger de Prefect

    logger.info(f"Iniciando transformaci√≥n de {len(df)} registros")  # nivel info

    df['total'] = df['cantidad'] * df['precio_unitario']

    # Log de estad√≠sticas
    logger.info(f"Total ventas: ${df['total'].sum():,.2f}")  # nivel info
    logger.info(f"Detalle por regi√≥n: {df.groupby('region')['total'].sum().to_dict()}")

    return df


## 4.4 Concurrencia y paralelismo

### 1. ¬øQu√© es un Task Runner? ¬øCu√°l es el default?

Un **Task Runner** es el componente que define c√≥mo se ejecutan las tasks dentro de un flow:
- secuencialmente,
- en paralelo,
- con hilos,
- con procesos, etc.

El **default** en Prefect 2.x es: `ConcurrentTaskRunner`.

### 2. ¬øQu√© Task Runners ofrece Prefect? Describan al menos 2:

| Task Runner | ¬øCu√°ndo usarlo? |
|----------|-------------|
| `ConcurrentTaskRunner` | Para ejecutar tasks en paralelo usando threads. Ideal para I/O (APIs, lectura/escritura). |
| `SequentialTaskRunner` | Para ejecutar tasks una por una. √ötil para debugging o pipelines sin paralelismo. |

### 3. ¬øC√≥mo ejecutar√≠an tasks en paralelo? Investiguen .submit() y .map().

**.submit():**
Ejecuta una task as√≠ncrona, devolviendo un Future.
Permite lanzar varias tasks a la vez sin esperar que terminen.

**.map():**
Ejecuta la misma task varias veces en paralelo, una por cada elemento de una lista.

In [13]:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def process_region(region: str, df: pd.DataFrame) -> dict:
    """Procesa datos de una regi√≥n espec√≠fica."""
    df_region = df[df['region'] == region]
    return {
        'region': region,
        'total': df_region['total'].sum(),
        'count': len(df_region)
    }

@flow(task_runner=ConcurrentTaskRunner())  # usar concurrencia real
def etl_flow_parallel():
    df_raw = extract_data()
    df_clean = transform_data(df_raw)

    # Procesar cada regi√≥n en paralelo
    regiones = ['Norte', 'Sur', 'Este', 'Oeste']
    futures = [process_region.submit(r, df_clean) for r in regiones]  # m√©todo async

    # Esperar resultados
    results = [f.result() for f in futures]  # obtener resultados

    print(f"üìä Resultados por regi√≥n: {results}")
    return results

# Parte 5 ‚Äî Investigaci√≥n: Deployments y Scheduling

## 5.1 Conceptos de Deployment

### 1. ¬øQu√© es un Deployment en Prefect? ¬øCu√°l es la diferencia entre un Flow y un Deployment?

Un **Deployment** es una *configuraci√≥n ejecutable* de un Flow: incluye c√≥mo, cu√°ndo y d√≥nde se ejecuta.
El **Flow** es el *c√≥digo Python* que define la l√≥gica del pipeline.
El **Deployment** es la *instalaci√≥n* del flujo en Prefect, con par√°metros como:

* scheduling
* work pool
* versi√≥n
* par√°metros del flow
* infraestructura usada
* almacenamiento

En resumen:
*Flow = l√≥gica*
*Deployment = ejecuci√≥n programada y versionada del flow*

### 2. ¬øQu√© es un Work Pool? ¬øPara qu√© sirve?

Un **Work Pool** es un grupo l√≥gico que administra *qu√© infraestructura* se usar√° para ejecutar los deployments.
Sirve para:

* organizar distintos entornos (dev, staging, prod)
* conectar Workers con Flows
* administrar colas de ejecuci√≥n
* separar cargas de trabajo por equipo o tipo de tarea

Es, b√°sicamente, el ‚Äúpuente‚Äù entre los deployments y los workers.

### 3. ¬øQu√© es un Worker? ¬øC√≥mo se relaciona con el Work Pool?

Un **Worker** es un proceso que:

* se registra en un Work Pool
* escucha tareas pendientes
* toma deployments y los ejecuta

La relaci√≥n es:

*Deployment ‚Üí Work Pool ‚Üí Worker ‚Üí Ejecuta el Flow*

El Deployment se env√≠a al Work Pool, el Work Pool lo pone en cola, y los Workers conectados lo levantan y ejecutan.

## 5.2 Scheduling

### 1. ¬øQu√© tipos de schedules soporta Prefect?

Describan al menos 3:

| Tipo de Schedule     | Descripci√≥n                                                                             | Ejemplo                                               |
| -------------------- | --------------------------------------------------------------------------------------- | ----------------------------------------------------- |
| **CronSchedule**     | Usa sintaxis cron para ejecutar flows en horarios repetitivos.                          | `"0 6 * * *"` ‚Üí todos los d√≠as a las 6 AM             |
| **IntervalSchedule** | Ejecuta el flow cada cierto intervalo fijo.                                             | `timedelta(hours=1)` ‚Üí cada 1 hora                    |
| **RRuleSchedule**    | Usa reglas RFC 5545 para horarios complejos (frecuencias, excepciones, m√∫ltiples d√≠as). | ‚ÄúCada lunes y mi√©rcoles a las 9 AM, excepto feriados‚Äù |


### 2. ¬øC√≥mo expresar√≠an "ejecutar todos los d√≠as a las 6 AM" en cron?

```cron
0 6 * * *
```

### 3. ¬øQu√© es un `RRuleSchedule`? ¬øCu√°ndo lo usar√≠an sobre cron?

`RRuleSchedule` implementa las reglas RFC 5545 (iCalendar), permitiendo programaciones **mucho m√°s complejas** que cron:

* horarios irregulares
* m√∫ltiples d√≠as espec√≠ficos
* intervalos combinados
* exclusiones (no correr feriados)
* reglas encadenadas

Lo usar√≠a en lugar de cron cuando necesito l√≥gica de calendario m√°s avanzada que cron *no puede expresar f√°cilmente*, como:

> ‚ÄúCorrer el pipeline s√≥lo los d√≠as h√°biles del mes, a las 9 AM, excluyendo feriados que se definan aparte.‚Äù


## 5.3 Crear un Deployment (conceptual)

In [15]:
# Completar bas√°ndose en la documentaci√≥n de Deployments
# https://docs.prefect.io/latest/concepts/deployments/

# Opci√≥n 1: Usando serve() - m√°s simple
# if __name__ == "__main__":
#     etl_flow.serve(
#         name="etl-diario-ecommerce",  # nombre del deployment
#         cron="0 6 * * *",  # schedule en formato cron
#         tags=["etl", "ecommerce"],  # tags para organizaci√≥n
#     )

# Opci√≥n 2: Usando deploy() - m√°s control
etl_flow.deploy(
    name="etl-diario-ecommerce",
    work_pool_name="local-pool",
    cron="0 6 * * *",
)

<coroutine object sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call at 0x1611609a0>

# Parte 6 ‚Äî Extensi√≥n DataOps

## Opci√≥n A ‚Äî Validaci√≥n con logging estructurado

In [16]:
from prefect import task, get_run_logger

@task(
    retries=3,                   # La task se reintenta 3 veces si falla
    retry_delay_seconds=2        # Espera 2 segundos entre reintentos
)
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Valida la calidad de los datos usando logging estructurado.
    Incluye validaciones b√°sicas y reporta errores antes de continuar.
    """

    logger = get_run_logger()
    errors = []

    # --- Inicio del proceso de validaci√≥n ---
    logger.info("üîç Iniciando validaci√≥n de datos")

    # 1) Validaci√≥n: DataFrame vac√≠o
    if len(df) == 0:
        logger.error("‚ùå DataFrame vac√≠o detectado")
        errors.append("DataFrame vac√≠o")

    # 2) Validaci√≥n: valores nulos por columna
    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        logger.warning(f"‚ö†Ô∏è Valores nulos encontrados: {null_counts.to_dict()}")

    # 3) Validaci√≥n: columnas esperadas
    columnas_esperadas = {"fecha", "producto", "cantidad", "precio_unitario", "region", "total"}
    columnas_actuales = set(df.columns)

    if columnas_esperadas - columnas_actuales:
        faltantes = columnas_esperadas - columnas_actuales
        logger.error(f"‚ùå Columnas faltantes: {faltantes}")
        errors.append(f"Columnas faltantes: {faltantes}")

    # Si hubo errores ‚Üí cortar ejecuci√≥n
    if errors:
        raise ValueError(f"Validaci√≥n fallida: {errors}")

    # --- Fin de validaci√≥n ---
    logger.info("‚úÖ Validaci√≥n exitosa")
    return df

**¬øQu√© hace esta task?**

La task `validate_data()` implementa reglas de calidad de datos y usa **logging estructurado de Prefect** para registrar:
* mensajes informativos (`logger.info`)
* advertencias (`logger.warning`)
* errores (`logger.error`)

Esto permite auditar qu√© pas√≥ con los datos y facilita debugging.

**Se agregan:**

```python
retries=3
retry_delay_seconds=2
```

porque en pipelines reales los fallos suelen ser temporales (e.g., un archivo no lleg√≥ todav√≠a).
Con estos par√°metros la task:
* intenta 3 veces
* espera 2 segundos entre cada intento

**Validaciones implementadas**
1. DataFrame vac√≠o
2. Valores nulos
3. Columnas faltantes
4. Si hay errores ‚Üí se corta el flow

# Parte 7 ‚Äî Reflexi√≥n y Conexi√≥n con DataOps (5 min)¬∂

## 7.1 Conceptos de Prefect

### 1. ¬øC√≥mo ayuda Prefect a implementar el principio de "Observabilidad" de DataOps?

Prefect registra autom√°ticamente cada etapa del pipeline: estados, logs, retries, tiempos de ejecuci√≥n y errores.
Esto permite **ver qu√© pas√≥, d√≥nde fall√≥ y por qu√©**, logrando trazabilidad completa sin agregar infraestructura extra.
En DataOps, esto es clave para detectar problemas r√°pido y medir la salud del pipeline.

### 2. ¬øC√≥mo ayuda el caching a la "Reproducibilidad"?

El caching garantiza que, si los inputs no cambiaron, la task devuelve **exactamente el mismo resultado**.
Esto elimina variaciones accidentales, acelera re-ejecuciones y permite reconstruir outputs pasados de manera consistente.
En resumen: **mismos datos ‚Üí mismo resultado**, siempre.

### 3. ¬øC√≥mo conectan los Deployments con "CI/CD para datos"?

Un Deployment convierte un flow en un artefacto versionado, ejecutable y schedulable.
Esto permite integrarlo en pipelines de CI/CD: cuando se hace un commit, el deployment se actualiza, se testea y se despliega autom√°ticamente.
Es la forma en que Prefect permite **automatizar, versionar y publicar** pipelines como si fueran software.

## 7.2 Comparaci√≥n con alternativas

### 1. ¬øQu√© diferencias hay entre Prefect y Apache Airflow? Mencionen al menos 2.

Diferencia 1: Prefect usa un DAG **impl√≠cito**, construido seg√∫n el orden del c√≥digo; Airflow requiere declarar DAGs de forma expl√≠cita y m√°s verbosa.

Diferencia 2: Prefect tiene **mejor manejo de estados, retries y logs** sin configuraci√≥n adicional.
Airflow requiere m√°s setup y es m√°s r√≠gido en su arquitectura.

### 2. ¬øQu√© es Dagster? ¬øEn qu√© se diferencia de Prefect?

Dagster es un framework de orquestaci√≥n enfocado en data assets: permite definir datasets como entidades con dependencias expl√≠citas.
Prefect, en cambio, se centra en **flows y tasks**, con mucha m√°s flexibilidad y menos estructura obligatoria.