# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [None]:
from pathlib import Path
import pandas as pd

# Options Pricing Model Pipeline Tasks
tasks = pd.DataFrame({
    'task': ['data_fetch', 'data_validate', 'feature_engineer', 'model_train', 'model_evaluate', 'model_deploy', 'generate_report'],
    'inputs': [
        'API endpoints, config.py',
        'data/raw/options_data_YYYYMMDD.csv',
        'data/processed/validated_data.csv', 
        'data/processed/features_YYYYMMDD.csv',
        'model/options_pricing_model.pkl + held-out test data',
        'model/options_pricing_model.pkl',
        'all pipeline artifacts'
    ],
    'outputs': [
        'data/raw/options_data_YYYYMMDD.csv',
        'data/processed/validated_data.csv',
        'data/processed/features_YYYYMMDD.csv',
        'model/options_pricing_model.pkl',
        'reports/evaluation_metrics.json',
        'Flask API + Streamlit app',
        'reports/stakeholder_report.md'
    ],
    'idempotent': [True, True, True, False, True, False, True]
})

print("Options Pricing Model Pipeline Tasks:")
tasks
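The inputs and outputs columns can also double as a consistency check on the DAG. When one task's input path is another task's output, that handoff implies a dependency edge. The sketch below is limited to the four rows whose inputs are concrete file paths. External inputs such as 'API endpoints, config.py' are left out, and `task_io` and `infer_edges` are hypothetical names.

```python
# Task -> (input paths, output paths), copied from the file-based rows of the task table.
# External inputs (API endpoints, config.py) are omitted because no task produces them.
task_io = {
    'data_fetch': ([], ['data/raw/options_data_YYYYMMDD.csv']),
    'data_validate': (['data/raw/options_data_YYYYMMDD.csv'], ['data/processed/validated_data.csv']),
    'feature_engineer': (['data/processed/validated_data.csv'], ['data/processed/features_YYYYMMDD.csv']),
    'model_train': (['data/processed/features_YYYYMMDD.csv'], ['model/options_pricing_model.pkl']),
}

def infer_edges(task_io: dict[str, tuple[list[str], list[str]]]) -> dict[str, list[str]]:
    """Derive task -> upstream-task edges from shared artifact paths."""
    producer: dict[str, str] = {}
    for task, (_, outputs) in task_io.items():
        for path in outputs:
            if path in producer:  # two writers for one artifact would make reruns ambiguous
                raise ValueError(f'{path} is written by both {producer[path]} and {task}')
            producer[path] = task
    # Inputs with no known producer add no edge
    return {task: sorted({producer[p] for p in inputs if p in producer})
            for task, (inputs, _) in task_io.items()}

print(infer_edges(task_io))
```

For these four tasks the inferred edges match the hand-written `dag` dictionary in the next section. A mismatch there would signal that either the table or the DAG is out of date.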

## 2) Dependencies (DAG)

### Options Pricing Model Pipeline Dependencies

```
data_fetch
    ↓
data_validate
    ↓
feature_engineer
    ↓
model_train ──→ model_evaluate
    ↓              ↓
model_deploy ──→ generate_report
```

**Sequential Core**: Data flows through fetch → validate → engineer → train  
**Parallel Branches**: After training, evaluation and deployment can run in parallel, since each depends only on model_train  
**Report Generation**: Waits for both model_evaluate and model_deploy, so the summary covers every upstream result

In [None]:
# Options Pricing Model DAG Dependencies
dag = {
    'data_fetch': [],
    'data_validate': ['data_fetch'],
    'feature_engineer': ['data_validate'],
    'model_train': ['feature_engineer'],
    'model_evaluate': ['model_train'],
    'model_deploy': ['model_train'],
    'generate_report': ['model_evaluate', 'model_deploy']
}

print("Pipeline Dependencies:")
for task, deps in dag.items():
    deps_str = ', '.join(deps) if deps else 'None (root task)'
    print(f"{task}: depends on [{deps_str}]")
    
dag
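Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+) accepts exactly this task-to-predecessors mapping. It can therefore check the DAG for cycles and also show which tasks are free to run in parallel. The following is a minimal sketch, and `execution_batches` is a hypothetical helper name.

```python
from graphlib import TopologicalSorter

# Same mapping as the DAG cell: task -> list of upstream tasks
dag = {
    'data_fetch': [],
    'data_validate': ['data_fetch'],
    'feature_engineer': ['data_validate'],
    'model_train': ['feature_engineer'],
    'model_evaluate': ['model_train'],
    'model_deploy': ['model_train'],
    'generate_report': ['model_evaluate', 'model_deploy'],
}

def execution_batches(graph: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into stages; tasks within one stage do not depend on each other."""
    ts = TopologicalSorter(graph)
    ts.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    batches = []
    while ts.is_active():
        ready = sorted(ts.get_ready())  # sorted only for a stable display order
        batches.append(ready)
        ts.done(*ready)  # mark the stage finished so its dependents become ready
    return batches

for i, batch in enumerate(execution_batches(dag), start=1):
    print(f'stage {i}: {batch}')
```

Stage 5 contains both model_deploy and model_evaluate, matching the parallel branch in the diagram. generate_report only becomes ready after both are marked done.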

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [None]:
# Options Pricing Model Logging & Checkpoint Plan
logging_plan = pd.DataFrame({
    'task': ['data_fetch', 'data_validate', 'feature_engineer', 'model_train', 'model_evaluate', 'model_deploy', 'generate_report'],
    'log_location': [
        'logs/data_fetch_YYYYMMDD.log',
        'logs/data_validate_YYYYMMDD.log', 
        'logs/feature_eng_YYYYMMDD.log',
        'logs/model_train_YYYYMMDD.log',
        'logs/model_eval_YYYYMMDD.log',
        'logs/deployment_YYYYMMDD.log',
        'logs/reporting_YYYYMMDD.log'
    ],
    'log_messages': [
        'start/end, rows fetched, API response codes',
        'validation rules passed/failed, null rates',
        'features created, distribution stats',
        'hyperparameters, training metrics, convergence',
        'performance metrics, bootstrap results',
        'API health checks, service status',
        'artifacts processed, report sections'
    ],
    'checkpoint_artifact': [
        'data/raw/options_data_YYYYMMDD.csv',
        'data/processed/validated_data.csv',
        'data/processed/features_YYYYMMDD.csv',
        'model/options_pricing_model.pkl',
        'reports/evaluation_metrics.json',
        'deployment status log',
        'reports/stakeholder_report.md'
    ]
})

print("Logging & Checkpoint Strategy:")
logging_plan
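The plan pairs every task with a log file and a checkpoint artifact. The sketch below shows one way to wire the two together: each task gets its own dated log file, and a task is skipped when its checkpoint artifact already exists. Skipping is only safe for tasks that the task table marks idempotent. `get_task_logger`, `run_with_checkpoint` and the `pipeline.<task>` logger names are hypothetical. The log file names here use the full task name, whereas the plan above abbreviates some of them (e.g. `feature_eng`).

```python
import logging
from datetime import date
from pathlib import Path
from typing import Callable

def get_task_logger(task: str, log_dir: str = 'logs') -> logging.Logger:
    """Logger that appends to <log_dir>/<task>_YYYYMMDD.log."""
    logger = logging.getLogger(f'pipeline.{task}')  # hypothetical logger namespace
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # re-running a notebook cell must not stack duplicate handlers
        Path(log_dir).mkdir(parents=True, exist_ok=True)
        log_file = Path(log_dir) / f'{task}_{date.today():%Y%m%d}.log'
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s'))
        logger.addHandler(handler)
    return logger

def run_with_checkpoint(task: str, checkpoint: str, fn: Callable[[Path], None],
                        log_dir: str = 'logs') -> bool:
    """Run fn(path) unless the checkpoint artifact already exists.

    fn must write its artifact to the path it is given. Returns True if fn ran,
    False if the task was skipped.
    """
    logger = get_task_logger(task, log_dir)
    out = Path(checkpoint)
    if out.exists():
        logger.info('checkpoint %s exists; skipping', out)
        return False
    logger.info('start')
    out.parent.mkdir(parents=True, exist_ok=True)
    tmp = out.with_name(out.name + '.tmp')
    fn(tmp)
    tmp.replace(out)  # rename only after success, so a crash never leaves a half-written checkpoint
    logger.info('end: wrote %s', out)
    return True
```

Writing to a temporary file and renaming it afterwards means that "checkpoint exists" reliably implies "task completed". A rerun after a failure therefore resumes at the first task that did not finish.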

## 4) Right-Sizing Automation
Which parts will you automate now? Which stay manual? Why?

### Automation Strategy for Options Pricing Model

**Automate Now (High Value, Low Risk):**
- **data_fetch**: Scheduled daily after market close (6 PM ET); the same API calls run on every execution
- **data_validate**: Runs immediately after data_fetch, with automated alerts on failures; the rules are well defined
- **feature_engineer**: Deterministic transformations (moneyness, vol-time); safe to automate
- **generate_report**: Template-based reporting with a consistent output format

**Keep Manual (High Risk, Requires Judgment):**
- **model_train**: Requires hyperparameter tuning and performance validation
- **model_evaluate**: Results need human interpretation in a business context (current R² is only 15.6%)
- **model_deploy**: Production deployment requires careful validation and rollback planning

**Rationale**: Given the model's current limitations (low R², high uncertainty), human oversight is critical for the model-related tasks. The data-processing tasks follow well-understood rules, so automating them pays off immediately in consistency and reliability.

**Future Automation**: Consider automating model tasks when R² consistently exceeds 25% and comprehensive drift detection is implemented.
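The automate/manual split above could be enforced in a scheduler roughly as follows. The sketch rests on one assumption: a task runs unattended only if it is in the automated set or has been explicitly approved, and only if every upstream task also ran. `AUTOMATED`, `plan_run` and the `approved` set are hypothetical names.

```python
from graphlib import TopologicalSorter

# Dependency mapping from the DAG section: task -> upstream tasks
DAG = {
    'data_fetch': [],
    'data_validate': ['data_fetch'],
    'feature_engineer': ['data_validate'],
    'model_train': ['feature_engineer'],
    'model_evaluate': ['model_train'],
    'model_deploy': ['model_train'],
    'generate_report': ['model_evaluate', 'model_deploy'],
}

# Hypothetical policy set: the tasks listed under "Automate Now"
AUTOMATED = {'data_fetch', 'data_validate', 'feature_engineer', 'generate_report'}

def plan_run(dag, automated, approved=frozenset()):
    """Return (runnable, blocked) task lists in dependency order.

    A task is runnable if it is automated or explicitly approved AND every
    upstream task is runnable; otherwise it is blocked.
    """
    runnable, blocked, ok = [], [], set()
    for task in TopologicalSorter(dag).static_order():
        allowed = task in automated or task in approved
        if allowed and all(dep in ok for dep in dag[task]):
            ok.add(task)
            runnable.append(task)
        else:
            blocked.append(task)
    return runnable, blocked

runnable, blocked = plan_run(DAG, AUTOMATED)
print('run now:', runnable)
print('awaiting sign-off:', blocked)
```

With no approvals, only the three data tasks run. generate_report stays blocked even though it is automated, because its upstream model tasks still need a human to sign off.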

## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [None]:
import argparse, json, logging, sys
from datetime import datetime, timezone
from pathlib import Path

def my_task(input_path: str, output_path: str) -> None:
    '''Example task template: read → transform → write JSON.'''
    logging.info('[my_task] start')
    # TODO: implement your logic (read input_path, transform, build result)
    result = {'run_at': datetime.now(timezone.utc).isoformat(), 'note': 'replace with real output'}
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    Path(output_path).write_text(json.dumps(result, indent=2))
    logging.info('[my_task] wrote %s', output_path)

def main(argv=None):
    parser = argparse.ArgumentParser(description='Homework task wrapper')
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args(argv)
    # force=True replaces handlers the notebook kernel may already have installed
    logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)], force=True)
    my_task(args.input, args.output)

if __name__ == '__main__':
    # Example simulated CLI in notebook:
    main(['--input', 'data/in.ext', '--output', 'data/out.json'])
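Below, the template is applied to data_validate as one possible refactor. This is a sketch only. The null-rate rule, the 5% default threshold, the `--max-null-rate` flag, the logger name and `validate_options_data` are all assumptions; the project's actual validation rules would replace them. The logging plan's "validation rules passed/failed, null rates" is the basis for what gets reported.

```python
import argparse
import json
import logging
from pathlib import Path

import pandas as pd

logger = logging.getLogger('pipeline.data_validate')  # hypothetical logger name

def validate_options_data(input_path: str, output_path: str, max_null_rate: float = 0.05) -> dict:
    """Compute per-column null rates and fail any column above max_null_rate (assumed rule)."""
    logger.info('start: reading %s', input_path)
    df = pd.read_csv(input_path)
    null_rates = {col: round(float(rate), 4) for col, rate in df.isna().mean().items()}
    failed = sorted(col for col, rate in null_rates.items() if rate > max_null_rate)
    summary = {
        'rows': int(len(df)),
        'null_rates': null_rates,
        'failed_columns': failed,
        'passed': not failed,
    }
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    Path(output_path).write_text(json.dumps(summary, indent=2))
    logger.info('end: rows=%d passed=%s wrote %s', summary['rows'], summary['passed'], output_path)
    return summary

def main(argv=None) -> int:
    parser = argparse.ArgumentParser(description='Validate raw options data')
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    parser.add_argument('--max-null-rate', type=float, default=0.05)
    args = parser.parse_args(argv)
    logging.basicConfig(level=logging.INFO, force=True)
    summary = validate_options_data(args.input, args.output, args.max_null_rate)
    return 0 if summary['passed'] else 1  # non-zero exit lets a scheduler mark the task as failed
```

As a standalone script, the file would end with `if __name__ == '__main__': raise SystemExit(main())`, so that the exit code reaches the scheduler. In a notebook, call `main([...])` with an explicit argument list, as the template does.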

### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [None]:
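import time
def retry(n_tries=3, delay=0.2):
    '''Return a caller that runs fn(*args, **kwargs), retrying with linear backoff.'''
    def wrapper(fn, *args, **kwargs):
        for attempt in range(1, n_tries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == n_tries:
                    raise  # out of retries: re-raise the last error
                time.sleep(delay * attempt)  # linear backoff: delay, 2*delay, ...
    return wrapper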