# ✈️ Notebook 6: Airflow Pipeline Development

**Author:** Amey Talkatkar | **Course:** MLOps with Agentic AI

## 🎯 Learning Objectives
- Understand Airflow DAG structure
- Create tasks with different operators
- Define task dependencies
- Use XCom for inter-task communication
- Build an end-to-end ML training pipeline
- Test and debug DAGs

## 🔥 The Problem
Current ML workflow:
1. Sarah manually runs data pull script at 2 AM
2. If it fails, she debugs for hours
3. Then runs training script
4. If training fails, starts over
5. Manually deploys if successful
6. Exhausted after 3 months

**Airflow Solution:** Automate everything! The pipeline runs at 2 AM on a schedule and retries on failure, so Sarah sleeps peacefully 😴

## ⚠️ Important Note

This notebook explains Airflow concepts and shows code examples. 

**To actually run:** Copy code to `~/airflow/dags/` and trigger via Airflow UI.

## Part 1: Airflow Basics

### What is a DAG?

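**DAG = Directed Acyclic Graph**

Each task is a node, each dependency is a directed edge, and no path may loop back to an earlier task. That guarantee is what lets the scheduler work out a valid run order.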

```
     [Start]
        |
        v
   [Pull Data]
        |
        v
  [Validate Data] --> If invalid, STOP
        |
        v
  [Feature Engineering]
        |
    +---+---+
    |   |   |
    v   v   v
  [LR][RF][XGB]  <-- Parallel execution
    |   |   |
    +---+---+
        |
        v
  [Compare Models]
        |
        v
  [Deploy Best]
```
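
The ordering idea behind the diagram can be sketched without Airflow at all. The snippet below is only an illustration, using Python's standard-library `graphlib` (Python 3.9+); the task names are hypothetical labels taken from the diagram, and this is not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names are illustrative labels for the boxes in the diagram.
upstream = {
    "pull_data": {"start"},
    "validate_data": {"pull_data"},
    "feature_engineering": {"validate_data"},
    "train_lr": {"feature_engineering"},
    "train_rf": {"feature_engineering"},
    "train_xgb": {"feature_engineering"},
    "compare_models": {"train_lr", "train_rf", "train_xgb"},
    "deploy_best": {"compare_models"},
}


def execution_waves(graph):
    """Group tasks into waves; tasks in the same wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose upstreams are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(upstream)
for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

The three training tasks land in the same wave, which is why they can run in parallel. If a dependency pointed back to an earlier task, `prepare()` would raise instead of producing an order.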

## Part 2: Simple DAG Example

In [None]:
# This code shows DAG structure (don't run in notebook)
example_code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Default arguments for all tasks
default_args = {
    'owner': 'amey',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# Define DAG
with DAG(
    'simple_ml_pipeline',
    default_args=default_args,
    description='Simple ML pipeline',
    schedule='@daily',  # Once per day at midnight; use '0 2 * * *' for a 2 AM run
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
) as dag:
    
    def load_data():
        print("Loading data...")
        return "data_loaded"
    
    def train_model():
        print("Training model...")
        return "model_trained"
    
    def deploy_model():
        print("Deploying model...")
        return "deployed"
    
    # Create tasks
    task_load = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
    )
    
    task_train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )
    
    task_deploy = PythonOperator(
        task_id='deploy_model',
        python_callable=deploy_model,
    )
    
    # Define dependencies
    task_load >> task_train >> task_deploy
'''

print("Simple DAG Example:")
print(example_code)

## Part 3: Task Dependencies

### Linear (Sequential)
```python
task_a >> task_b >> task_c
# Runs: A → B → C
```

### Parallel
```python
task_a >> [task_b, task_c, task_d] >> task_e
# Runs: A → (B, C, D in parallel) → E
```

### Branching
```python
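from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    # Return the task_id to run next; the other downstream branch is skipped
    rmse = context['ti'].xcom_pull(task_ids='train_model', key='rmse')
    if rmse < 2000:
        return 'deploy_task'
    else:
        return 'skip_task'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
)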
```

## Part 4: XCom (Cross-Communication)

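Pass small pieces of data (metrics, IDs, file paths) between tasks. XCom values are stored in Airflow's metadata database, so pass a path or run ID rather than a full DataFrame: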

In [None]:
xcom_example = '''
# Task 1: Push data to XCom
def task_push(**context):
    rmse = 1234.56
    context['ti'].xcom_push(key='model_rmse', value=rmse)
    return "rmse_calculated"

# Task 2: Pull data from XCom
def task_pull(**context):
    rmse = context['ti'].xcom_pull(
        task_ids='calculate_rmse',  # task_id of the operator that runs task_push
        key='model_rmse'
    )
    print(f"Model RMSE: {rmse}")
    
    if rmse < 1500:
        return "deploy"
    else:
        return "reject"
'''

print("XCom Example:")
print(xcom_example)
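
The push/pull handshake can be tried outside Airflow with a small stand-in for the task instance. This is a minimal sketch: `FakeTaskInstance` is a hypothetical helper written for this example (not an Airflow class), and the `decide` task name is made up. It only mimics the two calls used in the cell.

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for an Airflow task instance."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict keyed by (task_id, key)

    def xcom_push(self, key, value):
        # Values are stored under the task_id of the task that pushed them
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        # The puller names the upstream task and the key it wants
        return self._store.get((task_ids, key))


def task_push(**context):
    context["ti"].xcom_push(key="model_rmse", value=1234.56)
    return "rmse_calculated"


def task_pull(**context):
    rmse = context["ti"].xcom_pull(task_ids="calculate_rmse", key="model_rmse")
    return "deploy" if rmse < 1500 else "reject"


store = {}
push_result = task_push(ti=FakeTaskInstance("calculate_rmse", store))
decision = task_pull(ti=FakeTaskInstance("decide", store))
print(push_result, decision)
```

The same trick makes task callables easy to unit test, since they only depend on whatever `context['ti']` provides. In a real DAG, the value returned by a `PythonOperator` callable is also pushed to XCom by default, under the key `return_value`.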

## Part 5: Complete ML Training Pipeline

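A fuller DAG that combines the pieces above. Replace `/path/to/project` with your project root and set the `MLFLOW_TRACKING_URI` environment variable before running it: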

In [None]:
full_pipeline = '''
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import mlflow
import os

default_args = {
    'owner': 'amey',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': False,
}

# Project root; replace the placeholder with your repository path
PROJECT_DIR = '/path/to/project'

def dvc_pull_data(**context):
    """Pull latest data from DVC."""
    import subprocess
    result = subprocess.run(
        ['dvc', 'pull', 'data/raw/sales_data.csv.dvc'],
        capture_output=True,
        text=True,  # Decode stderr so the error message is readable
        cwd=PROJECT_DIR
    )
    if result.returncode != 0:
        raise RuntimeError(f"DVC pull failed: {result.stderr}")
    return "data_pulled"

def validate_data(**context):
    """Validate data quality."""
    # Build an absolute path: the worker's working directory may not be the project root
    df = pd.read_csv(os.path.join(PROJECT_DIR, 'data/raw/sales_data.csv'))
    
    # Check for missing values
    missing = df.isnull().sum().sum()
    if missing > 0:
        raise ValueError(f"Data has {missing} missing values!")
    
    # Check minimum rows
    if len(df) < 1000:
        raise ValueError(f"Not enough data: {len(df)} rows")
    
    context['ti'].xcom_push(key='num_rows', value=len(df))
    return "data_valid"

def train_model(**context):
    """Train model with MLflow tracking."""
    # Load data (assumes a preprocessing step has already written these files)
    processed = os.path.join(PROJECT_DIR, 'data/processed')
    X_train = pd.read_csv(os.path.join(processed, 'X_train.csv'))
    y_train = pd.read_csv(os.path.join(processed, 'y_train.csv')).squeeze()
    X_test = pd.read_csv(os.path.join(processed, 'X_test.csv'))
    y_test = pd.read_csv(os.path.join(processed, 'y_test.csv')).squeeze()
    
    # Set MLflow
    mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
    mlflow.set_experiment('airflow_pipeline')
    
    with mlflow.start_run(run_name='random_forest_airflow'):
        # Train
        model = RandomForestRegressor(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        model.fit(X_train, y_train)
        
        # Evaluate
        y_pred = model.predict(X_test)
        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        
        # Log to MLflow
        mlflow.log_params(model.get_params())
        mlflow.log_metric('rmse', rmse)
        mlflow.sklearn.log_model(model, 'model')
        
        # Push metrics to XCom
        context['ti'].xcom_push(key='rmse', value=float(rmse))
        context['ti'].xcom_push(key='run_id', value=mlflow.active_run().info.run_id)
        
    return "model_trained"

def evaluate_model(**context):
    """Decide if model is good enough."""
    ti = context['ti']
    rmse = ti.xcom_pull(task_ids='train_model', key='rmse')
    
    threshold = 2000  # Business requirement
    
    if rmse < threshold:
        print(f"✅ Model approved: RMSE {rmse:.2f} < {threshold}")
        return 'register_model'
    else:
        print(f"❌ Model rejected: RMSE {rmse:.2f} >= {threshold}")
        return 'skip_registration'

def register_model(**context):
    """Register model to MLflow registry."""
    ti = context['ti']
    run_id = ti.xcom_pull(task_ids='train_model', key='run_id')
    
    mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
    
    model_uri = f"runs:/{run_id}/model"
    mv = mlflow.register_model(model_uri, "sales_forecasting_model")
    
    # Transition to Staging (model stages are deprecated in newer MLflow releases in favor of aliases)
    client = mlflow.MlflowClient()
    client.transition_model_version_stage(
        name="sales_forecasting_model",
        version=mv.version,
        stage="Staging"
    )
    
    return "model_registered"

# Define DAG
with DAG(
    'ml_training_pipeline_complete',
    default_args=default_args,
    description='Complete ML training pipeline',
    schedule='@daily',  # Midnight; use '0 2 * * *' for a 2 AM run
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'production'],
) as dag:
    
    # Tasks
    start = EmptyOperator(task_id='start')
    
    pull_data = PythonOperator(
        task_id='dvc_pull_data',
        python_callable=dvc_pull_data,
    )
    
    validate = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
    )
    
    train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )
    
    evaluate = BranchPythonOperator(
        task_id='evaluate_model',
        python_callable=evaluate_model,
    )
    
    register = PythonOperator(
        task_id='register_model',
        python_callable=register_model,
    )
    
    skip = EmptyOperator(task_id='skip_registration')
    
    end = EmptyOperator(
        task_id='end',
        trigger_rule='none_failed_min_one_success'
    )
    
    # Dependencies
    start >> pull_data >> validate >> train >> evaluate
    evaluate >> [register, skip]
    [register, skip] >> end
'''

print("Complete ML Training Pipeline:")
print("(Copy this to ~/airflow/dags/ml_training_pipeline.py)\n")
print(full_pipeline)

## Part 6: Testing DAGs

Before deploying to production:

```bash
# Test if DAG has syntax errors
python ~/airflow/dags/ml_training_pipeline.py

# List DAGs
airflow dags list | grep ml_training

# Test specific task (use the dag_id from the DAG definition, not the file name)
airflow tasks test ml_training_pipeline_complete train_model 2024-01-01

# Trigger DAG manually
airflow dags trigger ml_training_pipeline_complete

# Check DAG runs
airflow dags list-runs -d ml_training_pipeline_complete
```

## Part 7: Monitoring & Debugging

### View Logs
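```bash
# Task logs are written under $AIRFLOW_HOME/logs by default
# (Airflow 2.3+ layout: dag_id=<dag>/run_id=<run>/task_id=<task>/attempt=<n>.log)
ls ~/airflow/logs/dag_id=ml_training_pipeline_complete/

# Or in UI: DAG → Run → Task → Logs
```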

### Common Issues

**Issue 1: Task fails intermittently**
```python
default_args = {
    'retries': 3,  # Retry 3 times
    'retry_delay': timedelta(minutes=5),
}
```
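
It helps to be precise about what `retries` counts: it is the number of additional attempts after the first one, so `retries=3` allows up to four attempts in total. The loop below is a plain-Python illustration of that counting, not Airflow's implementation; `run_with_retries` and `make_flaky` are hypothetical helpers written for this example.

```python
import time


def run_with_retries(task, retries, retry_delay_seconds=0.0):
    """Call task(); on failure, retry up to `retries` more times."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise  # retries exhausted: the task is marked failed
            time.sleep(retry_delay_seconds)  # plays the role of retry_delay


def make_flaky(failures_before_success):
    """Build a task that fails a fixed number of times, then succeeds."""
    state = {"calls": 0}

    def task():
        state["calls"] += 1
        if state["calls"] <= failures_before_success:
            raise ConnectionError("transient failure")
        return "ok"

    return task


# Three transient failures are absorbed by retries=3 (fourth attempt succeeds)
result, attempts = run_with_retries(make_flaky(3), retries=3)
print(result, attempts)
```

With `retries=2` the same flaky task would still fail, because the third attempt is the last one allowed.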

**Issue 2: Task stuck in running state**
```bash
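# Clear task state (the task is selected with --task-regex; the positional argument is the dag_id)
airflow tasks clear ml_training_pipeline_complete --task-regex train_model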
```

**Issue 3: DAG not showing up**
```bash
# Check import errors
airflow dags list-import-errors
```

## Part 8: Production Best Practices

### 1. Use Task Groups
```python
from airflow.utils.task_group import TaskGroup

with TaskGroup('training_group') as training:
    train_lr = PythonOperator(...)
    train_rf = PythonOperator(...)
    train_xgb = PythonOperator(...)
```

### 2. Error Notifications
```python
default_args = {
    'email': ['team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}
```

### 3. Resource Management
```python
# Limit parallel tasks on t3.small (2 GB RAM)
# In airflow.cfg, under [core]:
# parallelism = 4
# max_active_tasks_per_dag = 4  (named dag_concurrency before Airflow 2.2)
```

### 4. Idempotency
Tasks should be safe to re-run:
```python
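def save_results(df, ds):
    # ds: the run's logical date string, e.g. '2024-01-01' (available in the task context)
    # BAD: Append to file (duplicates on re-run)
    # df.to_csv('results.csv', mode='a')
    
    # GOOD: Overwrite a file keyed by the run's logical date,
    # so re-running the same date replaces the same file
    df.to_csv(f'results_{ds}.csv')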
```
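
One way to check idempotency is to run the same write twice for the same logical date and confirm nothing is duplicated. The sketch below uses only the standard library; the `store`/`sales` columns and the `save_results` signature are assumptions made for this example, and keying the file name on the logical date (rather than the wall clock) is one design choice among several.

```python
import csv
import tempfile
from pathlib import Path


def save_results(rows, ds, out_dir):
    """Write rows to a file named after the logical date, replacing any earlier copy."""
    out_path = Path(out_dir) / f"results_{ds.replace('-', '')}.csv"
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    with tmp_path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["store", "sales"])  # hypothetical columns
        writer.writerows(rows)
    tmp_path.replace(out_path)  # swap the finished file into place
    return out_path


with tempfile.TemporaryDirectory() as out_dir:
    rows = [("A", 10), ("B", 20)]
    first = save_results(rows, "2024-01-01", out_dir)
    second = save_results(rows, "2024-01-01", out_dir)  # simulated re-run
    line_count = len(first.read_text().splitlines())
    same_path = first == second
    file_count = len(list(Path(out_dir).iterdir()))

print(line_count, same_path, file_count)
```

Writing to a temporary file and renaming it means a task killed mid-write does not leave a half-written result behind for the next task to read.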

## ✅ Summary

### What We Learned:
1. ✅ **DAG Structure**: Python file defining workflow
2. ✅ **Task Types**: PythonOperator, BashOperator, BranchPythonOperator
3. ✅ **Dependencies**: >>, parallel execution
4. ✅ **XCom**: Pass data between tasks
5. ✅ **Scheduling**: @daily, cron expressions
6. ✅ **Error Handling**: Retries, failure alerts

### Real-World Benefits:
- 🚀 **Automation**: No manual intervention
- 🔄 **Reliability**: Automatic retries
- 📊 **Visibility**: See what's running/failed
- ⏰ **Scheduling**: Run at optimal times
- 👥 **Team Coordination**: Everyone sees pipeline status

### Before Airflow:
"Sarah, did you run the training script?"
"Sarah, why did it fail?"
"Sarah, can you run it again?"

### After Airflow:
Pipeline runs automatically every day at 2 AM.
If it fails, retries 3 times.
Sends alert if still fails.
Team checks dashboard for status.
Sarah sleeps peacefully. 😴

---

**Next:** `07_Complete_Pipeline_Walkthrough.ipynb` - Put it all together!

**© 2024 Amey Talkatkar**