# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [1]:
from pathlib import Path
import pandas as pd

tasks = pd.DataFrame({
    'task': [
        'ingest',          # fetch raw data (e.g., yfinance or API)
        'validate',        # schema + data quality checks
        'clean',           # remove NaNs, outliers, enrich features
        'feature_engineer',# compute MA/Volatility/Lags
        'train_or_score',  # train new model or score with existing
        'evaluate',        # metrics, drift detection
        'report'           # save final outputs for stakeholders
    ],
    'inputs': [
        '/data/raw.ext',        # external data source
        'prices_raw.json',      # output of ingest
        'prices_valid.json',    # after validation
        'prices_clean.json',    # after cleaning
        'features.json',        # engineered dataset
        'model.json',           # trained model
        'evaluation.json'       # eval results
    ],
    'outputs': [
        'prices_raw.json',      # stored raw data
        'prices_valid.json',    # validated dataset
        'prices_clean.json',    # cleaned dataset
        'features.json',        # engineered features
        'model.json',           # serialized model
        'evaluation.json',      # metrics + drift info
        'report.txt'            # human-readable report
    ],
    'idempotent': [
        True,   # ingest: safe to rerun (overwrites)
        True,   # validate: same output for same input
        True,   # clean: deterministic
        True,   # feature engineering: deterministic
        False,  # train_or_score: may change if randomness not fixed
        True,   # evaluate: deterministic given same model+data
        True    # report: deterministic given inputs
    ]
})

tasks

| | task | inputs | outputs | idempotent |
|---|---|---|---|---|
| 0 | ingest | /data/raw.ext | prices_raw.json | True |
| 1 | validate | prices_raw.json | prices_valid.json | True |
| 2 | clean | prices_valid.json | prices_clean.json | True |
| 3 | feature_engineer | prices_clean.json | features.json | True |
| 4 | train_or_score | features.json | model.json | False |
| 5 | evaluate | model.json | evaluation.json | True |
| 6 | report | evaluation.json | report.txt | True |


## 2) Dependencies (DAG)
Describe dependencies and paste a small diagram if you have one.

In [2]:
dag = {
    'ingest': [],
    'validate': ['ingest'],
    'clean': ['validate'],
    'feature_engineer': ['clean'],
    'train_or_score': ['feature_engineer'],
    'evaluate': ['train_or_score'],
    'report': ['evaluate']
}
dag

{'ingest': [],
 'validate': ['ingest'],
 'clean': ['validate'],
 'feature_engineer': ['clean'],
 'train_or_score': ['feature_engineer'],
 'evaluate': ['train_or_score'],
 'report': ['evaluate']}
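
A dependency dictionary in this shape (task mapped to its prerequisites) can be turned into an execution order with a topological sort. The sketch below assumes Python 3.9+ and uses the standard-library `graphlib` module; the name `execution_order` is only illustrative and is not part of the assignment template.

```python
from graphlib import TopologicalSorter

# Same mapping as above: each task lists the tasks it depends on.
dag = {
    'ingest': [],
    'validate': ['ingest'],
    'clean': ['validate'],
    'feature_engineer': ['clean'],
    'train_or_score': ['feature_engineer'],
    'evaluate': ['train_or_score'],
    'report': ['evaluate'],
}

# static_order() yields a task only after all of its dependencies.
# It raises graphlib.CycleError if the mapping contains a cycle.
execution_order = list(TopologicalSorter(dag).static_order())
print(execution_order)
```

Because this DAG is a single chain, there is only one valid order, running from `ingest` to `report`.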

The pipeline runs as a strictly linear sequence of tasks:

- **ingest**: fetches raw data and produces `prices_raw.json`.  
- **validate**: depends on `ingest`, checks schema/quality, outputs `prices_valid.json`.  
- **clean**: depends on `validate`, cleans the dataset, outputs `prices_clean.json`.  
- **feature_engineer**: depends on `clean`, creates moving averages, volatility, and lags, outputs `features.json`.  
- **train_or_score**: depends on `feature_engineer`, either trains a model or scores with an existing one, outputs `model.json` or `predictions.json`.  
- **evaluate**: depends on `train_or_score`, evaluates results, outputs `evaluation.json`.  
- **report**: depends on `evaluate`, generates a human-readable `report.txt`.  

### Idempotency
- `ingest`, `validate`, `clean`, `feature_engineer`, `evaluate`, and `report` are idempotent: rerunning them on the same inputs produces the same outputs (`ingest` overwrites `prices_raw.json` rather than appending).  
- `train_or_score` is idempotent only if random seeds and data windows are fixed.
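
One way to make `train_or_score` repeatable is to pass the seed in explicitly and draw all randomness from a local generator. The sketch below is a minimal illustration of that pattern only: the function body is a hypothetical stand-in for real model fitting, and the `seed` parameter and the returned dictionary layout are assumptions.

```python
import random

def train_or_score(features, seed=42):
    """Hypothetical stand-in for model fitting; only the seeding pattern matters."""
    rng = random.Random(seed)  # local generator, global random state is untouched
    # Pretend "training": draw one weight per feature from the seeded generator.
    weights = [rng.gauss(0.0, 1.0) for _ in features]
    return {"seed": seed, "weights": weights}

features = [0.1, 0.2, 0.3]
run_a = train_or_score(features, seed=42)
run_b = train_or_score(features, seed=42)
print(run_a == run_b)  # same seed and same inputs give the same result
```

Recording the seed in the output artifact, as done here, also makes a later run easier to reproduce.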

### Failure & Retry
- If **validate** fails, the workflow stops and reports the error.  
- **ingest**, **clean**, and **feature_engineer** can be retried with backoff on transient failures.  
- **train_or_score** retries once; if it still fails, the previous model is kept.  
- **evaluate** and **report** only run if their required inputs are available.
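
The stop-on-failure behavior described above can be sketched as a small runner that marks a task as failed and skips everything downstream of it. This is an illustration under assumptions, not a prescribed implementation: `run_pipeline`, the `task_fns` mapping, and the status strings are all hypothetical names, and retries are left out to keep the sketch short.

```python
import logging
from graphlib import TopologicalSorter

def run_pipeline(dag, task_fns):
    """Run tasks in dependency order; skip any task whose dependencies did not succeed."""
    status = {}
    for name in TopologicalSorter(dag).static_order():
        # A task runs only if every dependency finished with status "ok".
        if any(status.get(dep) != "ok" for dep in dag[name]):
            status[name] = "skipped"
            continue
        try:
            task_fns[name]()
            status[name] = "ok"
        except Exception:
            logging.exception("[%s] failed", name)
            status[name] = "failed"
    return status

dag = {
    "ingest": [],
    "validate": ["ingest"],
    "clean": ["validate"],
}

def failing_validate():
    raise ValueError("schema check failed")

status = run_pipeline(dag, {
    "ingest": lambda: None,
    "validate": failing_validate,
    "clean": lambda: None,
})
print(status)
```

In this toy run `validate` raises, so `clean` is never executed and is reported as skipped.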

### DAG Dictionary
```python
dag = {
    "ingest": [],
    "validate": ["ingest"],
    "clean": ["validate"],
    "feature_engineer": ["clean"],
    "train_or_score": ["feature_engineer"],
    "evaluate": ["train_or_score"],
    "report": ["evaluate"],
}

SyntaxError: invalid syntax (1301444899.py, line 1)

```mermaid
graph TD
    ingest --> validate
    validate --> clean
    clean --> feature_engineer
    feature_engineer --> train_or_score
    train_or_score --> evaluate
    evaluate --> report

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [12]:
logging_plan = pd.DataFrame({
    'task': ['ingest', 'clean', 'train_or_score', 'report'],
    'log_messages': [
        'start/end, rows, source URI',
        'start/end, rows in/out',
        'params, metrics',
        'artifact path'
    ],
    'checkpoint_artifact': [
        'prices_raw.json',
        'prices_clean.json',
        'model.json',
        'report.txt'
    ]
})
logging_plan

| | task | log_messages | checkpoint_artifact |
|---|---|---|---|
| 0 | ingest | start/end, rows, source URI | prices_raw.json |
| 1 | clean | start/end, rows in/out | prices_clean.json |
| 2 | train_or_score | params, metrics | model.json |
| 3 | report | artifact path | report.txt |
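
A plan like this could be wired into code with a small helper that logs start and end, records the row count, and reuses the checkpoint file when it already exists. The sketch below is one possible shape: `run_with_checkpoint` and the logger name `pipeline` are hypothetical, and it assumes the task returns a JSON-serializable list of rows.

```python
import json
import logging
from pathlib import Path

log = logging.getLogger("pipeline")

def run_with_checkpoint(task, checkpoint, fn):
    """Run fn() unless its checkpoint artifact exists; log start/end and row count."""
    path = Path(checkpoint)
    if path.exists():
        # Checkpoint hit: reuse the stored artifact instead of recomputing.
        log.info("[%s] checkpoint found at %s, skipping", task, path)
        return json.loads(path.read_text())
    log.info("[%s] start", task)
    rows = fn()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(rows))
    log.info("[%s] end rows=%d artifact=%s", task, len(rows), path)
    return rows
```

A call such as `run_with_checkpoint("ingest", "prices_raw.json", fetch_prices)` would then run a (hypothetical) `fetch_prices` only when `prices_raw.json` is missing. Deleting the file forces a rerun.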


## 4) Right-Sizing Automation

### Automate Now
- **Data ingestion (ingest)**: Automated via yfinance/API fetch. It is repetitive, deterministic, and benefits from scheduling (daily/weekly runs).  
- **Validation & cleaning (validate, clean)**: Automated checks for schema, nulls, duplicates, and outlier handling. These are rule-based and consistent across runs.  
- **Feature engineering (feature_engineer)**: Automated creation of MA5/MA10/MA20, volatility, and lag features. The logic is fixed, so running it programmatically avoids human error.  
- **Model training and scoring (train_or_score)**: Automated for reproducibility. Training can be scheduled periodically or triggered when new data arrives; scoring runs whenever predictions are requested.  
- **Evaluation (evaluate)**: Automated metrics (AUC, precision/recall, drift statistics) ensure objective, consistent tracking of model quality.

### Keep Manual (for now)
- **Reporting (report)**: While report generation can be scripted, the final interpretation and narrative for stakeholders benefit from human oversight. Analysts add context, explain business implications, and highlight risks that automation alone cannot.  
- **Retraining decisions**: Although training can be automated, the decision *when* to retrain (e.g., based on business cycles, data anomalies, or drift signals) should remain manual until confidence in monitoring thresholds is high.  
- **Feature updates**: Adding new domain-specific features should stay manual, since it requires domain expertise and experimentation.

### Why
- **Automated tasks** are repetitive, rules-driven, and benefit from consistency and scalability. They reduce human error and free up time for higher-level tasks.  
- **Manual tasks** involve judgment, interpretation, or creativity (e.g., report writing, deciding when new features are necessary). Leaving these human-in-the-loop ensures decisions align with business context and ethical considerations.  
- This balance avoids over-engineering early, while still giving reliable, reproducible automation where it adds the most value.

## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [15]:
import argparse, json, logging, sys
from datetime import datetime, timezone
from pathlib import Path

def my_task(input_path: str, output_path: str) -> None:
    """Template task: write a small JSON artifact to output_path.

    input_path is accepted for the CLI contract but is not read yet.
    """
    logging.info("[my_task] start")
    result = {
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12).
        "run_at": datetime.now(timezone.utc).isoformat(),
        "note": "replace with real output"
    }
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    Path(output_path).write_text(json.dumps(result, indent=2))
    logging.info("[my_task] wrote %s", output_path)

def main(argv=None):
    parser = argparse.ArgumentParser(description="Homework task wrapper")
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args(argv)  # argv=None falls back to sys.argv[1:]
    logging.basicConfig(level=logging.INFO,
                        handlers=[logging.StreamHandler(sys.stdout)])
    my_task(args.input, args.output)

# CLI entrypoint. Explicit arguments are passed so the cell runs inside a notebook;
# in a standalone script, call main() to read them from the command line.
if __name__ == "__main__":
    main(["--input", "data/in.ext", "--output", "data/out.json"])

INFO:root:[my_task] start
INFO:root:[my_task] wrote data/out.json


### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [None]:
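import time

def retry(n_tries=3, delay=0.2):
    """Return a caller that retries fn up to n_tries times with linear backoff."""
    def wrapper(fn, *args, **kwargs):
        for attempt in range(1, n_tries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == n_tries:
                    raise  # out of attempts: surface the last error
                time.sleep(delay * attempt)  # linear backoff: delay, 2*delay, ...
    return wrapper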