# 08: Airflow & Prefect Pipeline

This notebook demonstrates an ETL/ML pipeline that combines Apache Airflow and Prefect. Prefect tasks and flows handle ingestion, cleaning, training, and evaluation. Scheduling can come from a Prefect deployment or from an Airflow DAG. The notebook also covers metrics and logging.

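## Requirements
- Python 3.10+
- Apache Airflow (>=2.7; optional, only needed for the DAG section)
- Prefect 2.x (>=2.16, <3). The deployment commands used below (`prefect deployment build`, `prefect agent start`) are Prefect 2 CLI commands that were removed in Prefect 3.
- pandas, scikit-learn, requests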

Quick install:
```bash
# The constraints file must match both the Airflow version (2.7.3) and your Python version (3.10)
pip install "apache-airflow==2.7.3" --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt
pip install "prefect>=2.16,<3" pandas scikit-learn requests
```
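Because the deployment commands later in this notebook belong to Prefect 2.x, a quick version check can catch an accidental Prefect 3 install. This is a minimal sketch using only the standard library; the helper names `major_version`, `prefect_is_2x`, and `installed_version` are hypothetical, not part of Prefect or Airflow.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def major_version(version_string: str) -> int:
    """Return the leading integer of a version string such as '2.16.4'."""
    return int(version_string.split(".")[0])


def prefect_is_2x(version_string: str) -> bool:
    """True when the version belongs to the Prefect 2.x series assumed here."""
    return major_version(version_string) == 2


def installed_version(package: str) -> Optional[str]:
    """Installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Report what is installed and warn if Prefect is not 2.x
for pkg in ("apache-airflow", "prefect"):
    print(pkg, installed_version(pkg))

prefect_version = installed_version("prefect")
if prefect_version is not None and not prefect_is_2x(prefect_version):
    print("Warning: this notebook assumes Prefect 2.x (agents, `prefect deployment build`).")
```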

In [None]:
# Essential imports
import json
import logging
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

# Prefect
from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from prefect.deployments import run_deployment
from prefect.logging import get_logger as get_prefect_logger

# Airflow (optional when running this notebook)
try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    AIRFLOW_AVAILABLE = True
except ImportError:
    # Airflow is not installed; the DAG section below only prints its source
    AIRFLOW_AVAILABLE = False

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


## Sample dataset
We generate synthetic data for the flow: three numeric features and a binary target defined by a linear threshold rule.

In [None]:
import numpy as np
rng = np.random.default_rng(42)
N = 1000
df = pd.DataFrame({
    'feature_1': rng.normal(0, 1, N),
    'feature_2': rng.normal(5, 2, N),
    'feature_3': rng.integers(0, 10, N),
})
df['target'] = (df['feature_1'] + 0.3*df['feature_2'] - 0.1*df['feature_3'] > 2.0).astype(int)
df.head()
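The labeling rule is a thresholded linear score, so the expected share of positive labels can be estimated before training. This is a back-of-the-envelope sketch that assumes independent features and uses a normal approximation for the score (approximate, because `feature_3` is discrete uniform rather than normal).

```python
from math import sqrt
from statistics import NormalDist

# Moments of each feature as generated above
mean_f1, var_f1 = 0.0, 1.0                 # normal(0, 1)
mean_f2, var_f2 = 5.0, 2.0 ** 2            # normal(5, 2)
mean_f3, var_f3 = 4.5, (10 ** 2 - 1) / 12  # discrete uniform on 0..9

# Score used by the labeling rule: feature_1 + 0.3*feature_2 - 0.1*feature_3
score_mean = mean_f1 + 0.3 * mean_f2 - 0.1 * mean_f3
score_var = var_f1 + 0.3 ** 2 * var_f2 + 0.1 ** 2 * var_f3

# Normal approximation of P(score > 2.0), i.e. P(target == 1)
positive_rate = 1 - NormalDist(score_mean, sqrt(score_var)).cdf(2.0)
print(f"mean={score_mean:.2f} var={score_var:.4f} P(target=1)~{positive_rate:.3f}")
```

A positive rate of roughly one in five means the classes are imbalanced, so per-class metrics are worth reading alongside accuracy; `df['target'].mean()` gives the empirical rate for comparison.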


## Prefect Tasks
Each step (ingestion, cleaning, splitting, training, evaluation) is a Prefect task. Each task logs through `get_run_logger()`, so its messages are attached to the flow run and visible in the Prefect UI.

In [None]:
@task
def ingest_data() -> pd.DataFrame:
    logger = get_run_logger()
    logger.info("Ingesting data")
    return df.copy()

@task
def clean_data(data: pd.DataFrame) -> pd.DataFrame:
    logger = get_run_logger()
    logger.info("Cleaning data")
    data = data.dropna().reset_index(drop=True)
    return data

@task
def split_data(data: pd.DataFrame):
    logger = get_run_logger()
    logger.info("Splitting data")
    X = data[['feature_1','feature_2','feature_3']]
    y = data['target']
    return train_test_split(X, y, test_size=0.2, random_state=42)

@task
def train_model(X_train, y_train):
    logger = get_run_logger()
    logger.info("Training model")
    model = LogisticRegression(max_iter=500)
    model.fit(X_train, y_train)
    return model

@task
def evaluate(model, X_test, y_test) -> dict:
    logger = get_run_logger()
    logger.info("Evaluating model")
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, output_dict=True)
    logger.info(f"precision={report['weighted avg']['precision']:.3f} recall={report['weighted avg']['recall']:.3f}")
    return report
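`evaluate` logs the `weighted avg` row of `classification_report`. That row averages each class's metric weighted by the class's support (its number of true samples). The stdlib sketch below reproduces weighted precision on a toy example; it is meant to mirror scikit-learn's definition, treating a class that is never predicted as having precision 0 (scikit-learn's default `zero_division` behavior, which also emits a warning). The function name is hypothetical.

```python
from collections import Counter


def weighted_precision(y_true, y_pred):
    """Support-weighted mean of per-class precision (0 for a class never predicted)."""
    support = Counter(y_true)            # true samples per class
    total = sum(support.values())
    weighted = 0.0
    for label, n_true in support.items():
        # True labels of every sample predicted as `label`
        predicted = [t for t, p in zip(y_true, y_pred) if p == label]
        precision = predicted.count(label) / len(predicted) if predicted else 0.0
        weighted += precision * n_true / total
    return weighted


y_true = [0, 0, 0, 1, 1]
y_pred = [0, 0, 1, 1, 0]
# Class 0: precision 2/3, support 3; class 1: precision 1/2, support 2
print(weighted_precision(y_true, y_pred))
```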


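## Prefect flow with scheduling
The flow wires the tasks together: passing one task's return value into the next defines the execution order. Calling a task directly, as below, runs it to completion before the next line executes; `ConcurrentTaskRunner` only runs tasks concurrently when they are started with `.submit()`. Once deployed, the flow can run on a cron schedule.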

In [None]:
@flow(name="etl_ml_pipeline", task_runner=ConcurrentTaskRunner())
def etl_ml_pipeline():
    data = ingest_data()
    data = clean_data(data)
    X_train, X_test, y_train, y_test = split_data(data)
    model = train_model(X_train, y_train)
    metrics = evaluate(model, X_test, y_test)
    return metrics

if __name__ == "__main__":
    # Direct run. Jupyter also sets __name__ to "__main__", so this executes in the notebook.
    metrics = etl_ml_pipeline()
    print(json.dumps(metrics, indent=2)[:1000])


### Prefect deployment & scheduling
CLI example (run outside the notebook). `prefect deployment build` expects a Python file as its entrypoint, so export the notebook to a script first:
```bash
jupyter nbconvert --to script 08_pipeline_airflow_prefect.ipynb
prefect deployment build 08_pipeline_airflow_prefect.py:etl_ml_pipeline \
  -n etl-ml-daily -q default --cron "0 2 * * *" -o deployment.yaml
prefect deployment apply deployment.yaml
prefect agent start -q default
```
This schedules a daily run at 02:00 (Prefect interprets cron schedules in UTC by default). The agent must keep running to pick up scheduled runs from the `default` work queue.
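The cron expression `0 2 * * *` reads field by field as minute 0, hour 2, any day of the month, any month, any day of the week. The sketch below computes the next fire time for that daily `minute hour * * *` form only. It is a hypothetical helper for illustration, not Prefect's scheduler, and it assumes UTC timestamps (adding a day with `timedelta` is not DST-safe in other time zones).

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Next fire time of a 'minute hour * * *' cron expression, strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed (or is exactly now): use tomorrow's
        candidate += timedelta(days=1)
    return candidate


now = datetime(2025, 1, 1, 3, 15, tzinfo=timezone.utc)
print(next_daily_run(now))
```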

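## Airflow DAG
Minimal example in which Airflow triggers the Prefect deployment created above through the CLI. `prefect deployment run` only creates a flow run, which the Prefect agent then executes, so the Airflow task can succeed before the flow itself finishes. The Airflow worker needs the `prefect` CLI installed and configured for the same Prefect API. If Airflow is the scheduler, build the deployment without a schedule; otherwise the flow runs twice every day at 02:00.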

In [None]:
AIRFLOW_EXAMPLE = r'''
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='prefect_etl_ml_pipeline',
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule='0 2 * * *',  # Airflow 2.4+ name for the deprecated schedule_interval
    catchup=False,
) as dag:
    run_prefect = BashOperator(
        task_id='run_prefect_flow',
        # Needs the prefect CLI on the Airflow worker, configured for the same Prefect API
        bash_command='prefect deployment run etl_ml_pipeline/etl-ml-daily'
    )
'''
# Print the full DAG source; save it as a .py file in Airflow's dags folder to register it
print(AIRFLOW_EXAMPLE)
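Airflow only discovers DAGs from Python files in its DAGs folder, which by default is `$AIRFLOW_HOME/dags` with `AIRFLOW_HOME` defaulting to `~/airflow`. The sketch below checks that the DAG source is valid Python and writes it there. The helper names and the file name `prefect_etl_ml_pipeline.py` are hypothetical, and it assumes the default layout; if `[core] dags_folder` is configured differently, pass that path instead.

```python
import os
from pathlib import Path


def default_dags_dir() -> Path:
    """Airflow's default DAGs folder: $AIRFLOW_HOME/dags (AIRFLOW_HOME defaults to ~/airflow)."""
    airflow_home = Path(os.environ.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    return airflow_home / "dags"


def write_dag_file(source: str, dags_dir: Path,
                   filename: str = "prefect_etl_ml_pipeline.py") -> Path:
    """Validate `source` as Python, then save it where the scheduler can find it."""
    compile(source, filename, "exec")  # raises SyntaxError before anything is written
    dags_dir.mkdir(parents=True, exist_ok=True)
    target = dags_dir / filename
    target.write_text(source, encoding="utf-8")
    return target
```

In the notebook, `write_dag_file(AIRFLOW_EXAMPLE, default_dags_dir())` would save the DAG above. `compile` only checks syntax; import errors (for example a missing Airflow provider) still surface when the scheduler parses the file.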


## Metrics and Logs
- Prefect: the UI and structured task logs emitted through `get_run_logger()`.
- Airflow: the UI, XComs, and per-task logs.
- Save metrics as JSON for auditing.
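`persist_metrics` below overwrites a single JSON file on every run. For an audit trail, one alternative is to append one JSON object per run (JSON Lines) with a UTC timestamp, so earlier results are kept. This is a minimal stdlib sketch; the helper names, the file name `metrics_history.jsonl`, and the optional `run_id` (for example a Prefect flow-run ID passed in by the caller) are assumptions for illustration.

```python
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def append_metrics_record(metrics: dict, path: str = "metrics_history.jsonl",
                          run_id: Optional[str] = None) -> dict:
    """Append one JSON line per run so earlier results are never overwritten."""
    record = {
        "recorded_at": datetime.now(timezone.utc).isoformat(),
        "run_id": run_id,
        "metrics": metrics,
    }
    with Path(path).open("a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    return record


def read_metrics_history(path: str = "metrics_history.jsonl") -> list:
    """Load all recorded runs, oldest first, skipping blank lines."""
    with Path(path).open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```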

In [None]:
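@task
def persist_metrics(metrics: dict, path: str = 'metrics_prefect.json') -> str:
    # Overwrites `path` with the latest metrics (json is imported at the top)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(metrics, f, indent=2)
    get_run_logger().info(f"Saved metrics to {path}")
    return path

@flow
def etl_ml_pipeline_with_persistence():
    # etl_ml_pipeline runs as a subflow and returns the metrics dict
    m = etl_ml_pipeline()
    persist_metrics(m)
    return m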


## Conclusion
This notebook provides a template for orchestrating ETL/ML with Prefect and Airflow, covering scheduling, logging, and metrics persistence.