# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [1]:
from pathlib import Path
import pandas as pd
# Task inventory, one record per pipeline step; each step's output is the
# next step's input.
tasks = pd.DataFrame(
    [
        # (task, inputs, outputs, idempotent)
        ('ingest', '/data/raw.ext', 'prices_raw.json', True),
        ('clean', 'prices_raw.json', 'prices_clean.json', True),
        ('train_or_score', 'prices_clean.json', 'model.json', True),
        ('report', 'model.json', 'report.txt', True),
    ],
    columns=['task', 'inputs', 'outputs', 'idempotent'],
)
tasks

Unnamed: 0,task,inputs,outputs,idempotent
0,ingest,/data/raw.ext,prices_raw.json,True
1,clean,prices_raw.json,prices_clean.json,True
2,train_or_score,prices_clean.json,model.json,True
3,report,model.json,report.txt,True


## 2) Dependencies (DAG)
Describe dependencies and paste a small diagram if you have one.

In [2]:
# Adjacency map: task name -> list of upstream tasks that must finish first.
dag = dict(
    ingest=[],
    clean=['ingest'],
    train_or_score=['clean'],
    report=['train_or_score'],
)
dag

{'ingest': [],
 'clean': ['ingest'],
 'train_or_score': ['clean'],
 'report': ['train_or_score']}

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [3]:
# One record per task: what gets logged and which artifact is the checkpoint.
logging_plan = pd.DataFrame([
    {
        'task': 'ingest',
        'log_messages': 'start/end, rows, source URI',
        'checkpoint_artifact': 'prices_raw.json',
    },
    {
        'task': 'clean',
        'log_messages': 'start/end, rows in/out',
        'checkpoint_artifact': 'prices_clean.json',
    },
    {
        'task': 'train_or_score',
        'log_messages': 'params, metrics',
        'checkpoint_artifact': 'model.json',
    },
    {
        'task': 'report',
        'log_messages': 'artifact path',
        'checkpoint_artifact': 'report.txt',
    },
])
logging_plan

Unnamed: 0,task,log_messages,checkpoint_artifact
0,ingest,"start/end, rows, source URI",prices_raw.json
1,clean,"start/end, rows in/out",prices_clean.json
2,train_or_score,"params, metrics",model.json
3,report,artifact path,report.txt


## 4) Right-Sizing Automation
Which parts will you automate now? Which stay manual? Why?

- Automate now: `ingest`, `clean`, `train_or_score` — these are deterministic, idempotent, and have clear inputs/outputs.
- Keep manual (for now): `report` narrative writing and visual polishing; automate only the metrics dump.
- Rationale: focusing automation on steps with clear schemas maximizes reproducibility and reduces toil; the narrative step benefits from human review.

## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [6]:
import argparse, json, logging, sys
from pathlib import Path
from datetime import datetime

def my_task(input_path: str, output_path: str) -> None:
    """
    Example task template: read JSON -> transform -> write JSON.
    Replace the 'result' construction with your real logic.

    Parameters
    ----------
    input_path : str
        Path to an existing JSON file; it is read as UTF-8.
    output_path : str
        Path of the JSON file to write (UTF-8). Missing parent directories
        are created and an existing file is overwritten.

    Raises
    ------
    FileNotFoundError
        If ``input_path`` does not exist.
    json.JSONDecodeError
        If the input file does not contain valid JSON.
    """
    # Local import: the surrounding cell only imports `datetime` itself.
    from datetime import timezone

    logging.info("[my_task] start")
    src = Path(input_path)
    if not src.exists():
        raise FileNotFoundError(f"Input not found: {src}")

    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's locale encoding.
    data = json.loads(src.read_text(encoding="utf-8"))
    is_list = isinstance(data, list)
    # --- TODO: your domain logic here ---
    result = {
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated and
        # returns a naive value); the ISO string carries a +00:00 offset.
        "run_at_utc": datetime.now(timezone.utc).isoformat(),
        # Non-list payloads (e.g. a single JSON object) count as one row.
        "rows_in": len(data) if is_list else 1,
        "note": "replace with real output",
        "data_preview": data[:3] if is_list else data,
    }
    # ------------------------------------
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(result, indent=2), encoding="utf-8")
    logging.info("[my_task] wrote %s", out)

def main(argv=None):
    """
    Command-line entry point around my_task.

    argv: list of argument strings handed to argparse; None lets argparse
    read sys.argv. Both --input and --output are required.

    Exits with status 1 (after logging the traceback) if the task raises.
    """
    cli = argparse.ArgumentParser(description="Homework task wrapper")
    required_flags = (
        ("--input", "Path to input JSON"),
        ("--output", "Path to output JSON"),
    )
    for flag, help_text in required_flags:
        cli.add_argument(flag, required=True, help=help_text)
    opts = cli.parse_args(argv)

    # Send INFO-and-above records to stdout.
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(level=logging.INFO, handlers=[stdout_handler])

    try:
        my_task(opts.input, opts.output)
    except Exception as exc:
        logging.exception("Task failed: %s", exc)
        sys.exit(1)

if __name__ == "__main__":
    # Entry-point guard. The body is intentionally a no-op, so running this
    # cell does not invoke main().
    # Example CLI (uncomment to run inside notebook):
    # main(["--input", "data/raw.ext", "--output", "data/out.json"])
    # NOTE(review): "data/raw.ext" looks like a placeholder; --input is
    # described as a JSON path, so point it at a real JSON file. Also note
    # that main() calls sys.exit(1) if the task raises.
    pass


### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [7]:
import time
from functools import wraps
from typing import Callable, Any

def retry(n_tries: int = 3, delay_s: float = 0.2, backoff_s: float = 0.2, exceptions=(Exception,)):
    """
    Decorator factory: re-invoke the wrapped callable when it raises, pausing
    a linearly growing amount of time between attempts.

    n_tries: total number of attempts before the last error is re-raised
    delay_s: pause (seconds) before the first retry
    backoff_s: extra pause (seconds) added for each further retry
    exceptions: tuple of exception classes that trigger a retry; anything
        else propagates immediately
    """
    def decorate(fn: Callable):
        @wraps(fn)
        def guarded(*args, **kwargs) -> Any:
            failures = 0
            while True:
                try:
                    return fn(*args, **kwargs)
                except exceptions:
                    failures += 1
                    if failures >= n_tries:
                        # Attempts exhausted: surface the most recent error.
                        raise
                # Pause is delay_s after the first failure, then grows by
                # backoff_s per failure; never sleep a negative duration.
                pause = delay_s + backoff_s * (failures - 1)
                time.sleep(max(0.0, pause))
        return guarded
    return decorate
