
# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [1]:
from pathlib import Path
import pandas as pd
tasks = pd.DataFrame({
    'task': ['ingest', 'clean', 'train_or_score', 'report'],
    'inputs': ['/data/raw.ext', 'prices_raw.json', 'prices_clean.json', 'model.json'],
    'outputs': ['prices_raw.json', 'prices_clean.json', 'model.json', 'report.txt'],
    'idempotent': [True, True, True, True]
})
tasks

|   | task | inputs | outputs | idempotent |
|---|---|---|---|---|
| 0 | ingest | /data/raw.ext | prices_raw.json | True |
| 1 | clean | prices_raw.json | prices_clean.json | True |
| 2 | train_or_score | prices_clean.json | model.json | True |
| 3 | report | model.json | report.txt | True |
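Each task reads the artifact that the previous task writes, so the dependency graph can be derived from this table instead of being maintained by hand. The following is a minimal stdlib-only sketch. It assumes the table is restated as a list of dicts, and `TASKS` and `derive_dependencies` are hypothetical names. Inputs that no task produces, such as the raw source file, are treated as external and add no edge.

```python
# Hypothetical restatement of the task table as plain records
# (stdlib only, so the check runs without pandas).
TASKS = [
    {"task": "ingest", "inputs": ["/data/raw.ext"], "outputs": ["prices_raw.json"]},
    {"task": "clean", "inputs": ["prices_raw.json"], "outputs": ["prices_clean.json"]},
    {"task": "train_or_score", "inputs": ["prices_clean.json"], "outputs": ["model.json"]},
    {"task": "report", "inputs": ["model.json"], "outputs": ["report.txt"]},
]


def derive_dependencies(tasks):
    """Map each task to the upstream tasks that produce its inputs."""
    # Record which task writes each artifact; two writers for one file is an error.
    producer = {}
    for t in tasks:
        for out in t["outputs"]:
            if out in producer:
                raise ValueError(
                    f"{out!r} is written by both {producer[out]!r} and {t['task']!r}"
                )
            producer[out] = t["task"]

    # Inputs without a producer are external sources and add no dependency.
    deps = {}
    for t in tasks:
        upstream = {producer[i] for i in t["inputs"] if i in producer}
        deps[t["task"]] = sorted(upstream)
    return deps


print(derive_dependencies(TASKS))
# → {'ingest': [], 'clean': ['ingest'], 'train_or_score': ['clean'], 'report': ['train_or_score']}
```

The result matches the hand-written DAG in the next section. Comparing the two is a cheap consistency check whenever a task's inputs or outputs change.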


## 2) Dependencies (DAG)
Describe dependencies and paste a small diagram if you have one.

In [2]:
dag = {
    'ingest': [],
    'clean': ['ingest'],
    'train_or_score': ['clean'],
    'report': ['train_or_score']
}
dag

{'ingest': [],
 'clean': ['ingest'],
 'train_or_score': ['clean'],
 'report': ['train_or_score']}
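An orchestrator has to turn this mapping into a run order and reject cycles. This can be sketched with the standard library's `graphlib.TopologicalSorter` (Python 3.9+), which expects exactly this `{task: upstream_tasks}` shape. This is an illustration only. Schedulers such as Airflow perform this resolution internally.

```python
from graphlib import CycleError, TopologicalSorter

dag = {
    'ingest': [],
    'clean': ['ingest'],
    'train_or_score': ['clean'],
    'report': ['train_or_score'],
}

# TopologicalSorter takes {node: predecessors}, i.e. "task -> upstream tasks".
order = list(TopologicalSorter(dag).static_order())
print(order)
# → ['ingest', 'clean', 'train_or_score', 'report']

# A cycle (here, a hypothetical edge feeding report back into ingest)
# is rejected before anything runs.
cyclic = dict(dag, ingest=['report'])
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as e:
    print('cycle detected:', e.args[1])
```

Because the pipeline is a linear chain, only one valid order exists. With branching tasks, `TopologicalSorter` would also expose groups that are ready to run in parallel through `get_ready()`.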

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [3]:
logging_plan = pd.DataFrame({
    'task': ['ingest', 'clean', 'train_or_score', 'report'],
    'log_messages': [
        'start/end, rows, source URI',
        'start/end, rows in/out',
        'params, metrics',
        'artifact path'
    ],
    'checkpoint_artifact': [
        'prices_raw.json',
        'prices_clean.json',
        'model.json',
        'report.txt'
    ]
})
logging_plan

|   | task | log_messages | checkpoint_artifact |
|---|---|---|---|
| 0 | ingest | start/end, rows, source URI | prices_raw.json |
| 1 | clean | start/end, rows in/out | prices_clean.json |
| 2 | train_or_score | params, metrics | model.json |
| 3 | report | artifact path | report.txt |
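The plan above pairs each task with log messages and a checkpoint artifact. One common way to use those checkpoints is to skip a task whose artifact already exists, so a rerun resumes where it stopped. The following is a minimal sketch of that pattern. `run_with_checkpoint` and `fake_ingest` are hypothetical names, and the toy rows are illustrative rather than real data.

```python
import json
import logging
import tempfile
import time
from pathlib import Path

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('pipeline')


def run_with_checkpoint(name, fn, checkpoint: Path, force=False) -> bool:
    """Run fn(checkpoint) unless the checkpoint exists; return True if it ran."""
    if checkpoint.exists() and not force:
        log.info('[%s] skip: checkpoint %s exists', name, checkpoint)
        return False
    log.info('[%s] start', name)
    t0 = time.perf_counter()
    fn(checkpoint)  # the task is responsible for writing its checkpoint
    log.info('[%s] end in %.3fs, wrote %s', name, time.perf_counter() - t0, checkpoint)
    return True


def fake_ingest(out: Path) -> None:
    """Hypothetical stand-in for the real ingest step (toy rows only)."""
    out.write_text(json.dumps([{'price': 1.0}, {'price': 2.0}]))


with tempfile.TemporaryDirectory() as d:
    ckpt = Path(d) / 'prices_raw.json'
    first = run_with_checkpoint('ingest', fake_ingest, ckpt)   # runs and writes the file
    second = run_with_checkpoint('ingest', fake_ingest, ckpt)  # skipped on the rerun

print(first, second)  # → True False
```

With existence-based skipping, a half-written file looks like a finished checkpoint. Tasks should therefore write atomically, for example to a temporary name followed by `os.replace`, and `force=True` is available to rebuild a stale artifact on purpose.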


## 4) Right-Sizing Automation
Which parts will you automate now? Which stay manual? Why?

Automate now (deterministic, repeatable, low-judgment):
- ingest → a scheduled job (cron/Airflow) pulls the source and writes prices_raw.json; logs start/end, row counts, and source URI.
- clean → scripted transforms plus data-quality checks (nulls, schema, value ranges); writes prices_clean.json; fails and alerts on violations.
- score (the scoring half of train_or_score) → loads the saved model and batch-predicts daily (or serves via a /predict API); logs params and metrics; stores predictions and artifacts.
- monitoring & alerts → automatic checks across layers; breaches trigger alerts, and every check is written to a run log:
  - data: freshness (minutes since last update), null rate, schema hash
  - model: rolling MAE/AUC, calibration error, drift (e.g., PSI > 0.2)
  - system: p95 latency, error rate
  - business: approval-rate and bad-rate deltas
- artifact versioning → auto-saves the model (model.json/.pkl) and run metadata at the checkpoint paths listed in the logging plan.
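The data-layer and model-layer checks listed above can be expressed as small pure functions that a scheduled job evaluates against thresholds. The following is a minimal stdlib sketch. The function names are hypothetical. The schema hash covers column names only. The PSI uses equal-width bins over the reference range, with a small `eps` floor to avoid `log(0)`. These are assumptions, and quantile binning is an equally common choice. The PSI formula itself is the standard one: the sum over bins of (actual% − expected%) · ln(actual% / expected%).

```python
import hashlib
import json
import math


def null_rate(rows, column):
    """Fraction of rows where `column` is missing or None."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(column) is None) / len(rows)


def schema_hash(rows):
    """Short hash of the sorted column names; it changes when the schema does."""
    cols = sorted({k for r in rows for k in r})
    return hashlib.sha256(json.dumps(cols).encode()).hexdigest()[:12]


def psi(expected, actual, n_bins=10, eps=1e-6):
    """Population Stability Index using equal-width bins over the reference range."""
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / n_bins or 1.0  # guard against a constant reference column

    def proportions(values):
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), n_bins - 1)] += 1  # clip out-of-range values to edge bins
        return [max(c / len(values), eps) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ai, ei in zip(a, e))


reference = [i / 100 for i in range(1000)]  # illustrative values, not real prices
shifted = [x + 3.0 for x in reference]

print(null_rate([{'price': 1.0}, {'price': None}], 'price'))  # → 0.5
print(round(psi(reference, reference), 6))                    # → 0.0
print(psi(reference, shifted) > 0.2)                          # → True
```

A monitoring job would compare each value to its configured threshold (for example `psi(...) > 0.2`), raise an alert on a breach, and append the raw numbers to the run log so that trends stay visible even when nothing fires.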
Keep manual (requires judgment, changes often, or needs approval):
- feature ideation & outlier-policy changes → require domain input; risky to auto-tweak.
- model retraining decision → triggered by drift/metric alarms but approved manually (a human check guards against reacting to noisy periods and lets backtests be reviewed first).
- hyperparameter tuning & model selection → run as offline notebook experiments; review before promoting.
- thresholds for alerts/KPIs → set and revisit with stakeholders (business trade-offs).
- stakeholder reports → metric snapshots are auto-generated, but commentary and decision recommendations are written by a person.
- schema changes / new data sources → manual review and migration to avoid silent breakage.
Near-term cadence & triggers:
- Daily: ingest → clean → score → push metrics; alert on breaches.
- Weekly: manual review of dashboards; adjust thresholds if needed.
- Retrain: when drift exceeds its threshold or the rolling error rises X% over Y days, kick off the training pipeline, gate it on manual sign-off, then deploy.
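The retrain trigger above can be encoded as a function that only *proposes* a retrain and records why, which leaves the sign-off step with a person. The following is a minimal sketch. `propose_retrain` and `RetrainDecision` are hypothetical names. The defaults of 10% and 7 days are placeholder values for the unspecified X and Y, and the error is compared against a fixed baseline, which is also an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class RetrainDecision:
    propose: bool
    reasons: list = field(default_factory=list)


def propose_retrain(psi_value, daily_errors, baseline_error,
                    psi_threshold=0.2, pct_increase=0.10, window_days=7):
    """Propose (never perform) a retrain; deployment still needs manual sign-off.

    pct_increase and window_days stand in for the X% / Y days placeholders.
    """
    reasons = []
    if psi_value > psi_threshold:
        reasons.append(f"drift: PSI {psi_value:.2f} > {psi_threshold}")

    recent = daily_errors[-window_days:]
    if len(recent) == window_days:  # only judge a full window
        rolling = sum(recent) / window_days
        if rolling > baseline_error * (1 + pct_increase):
            reasons.append(
                f"error: rolling mean {rolling:.3f} is more than "
                f"{pct_increase:.0%} above baseline {baseline_error:.3f}"
            )
    return RetrainDecision(propose=bool(reasons), reasons=reasons)


print(propose_retrain(0.05, [1.0] * 7, baseline_error=1.0).propose)  # → False
print(propose_retrain(0.05, [1.2] * 7, baseline_error=1.0).propose)  # → True
```

Returning the reasons alongside the flag gives the reviewer the evidence for the alarm, which fits the split above: the trigger is automated, and the decision is not.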
Why this split?
Automate the stable, testable steps that benefit from speed and consistency; keep human control where context, risk, or strategy changes matter. This gives you reliability day-to-day and prudence at the key decision points.


## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [5]:
import argparse, json, logging, sys
from datetime import datetime, timezone
from pathlib import Path

def my_task(input_path: str, output_path: str) -> None:
    """Example task: read the input (if it exists) -> write a JSON result."""
    logging.info('[my_task] start')

    # Optional: read input (robust to non-JSON files).
    # Read the text once: calling f.read() after a failed json.load()
    # would return '' because json.load() has already consumed the file.
    payload = {}
    ip = Path(input_path)
    if ip.exists():
        text = ip.read_text()
        try:
            payload = json.loads(text)
        except json.JSONDecodeError:
            payload = {'raw_text': text}

    result = {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        'run_at': datetime.now(timezone.utc).isoformat(),
        'input_preview': str(payload)[:200],
        'note': 'replace with real output'
    }

    op = Path(output_path)
    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(json.dumps(result, indent=2))
    logging.info('[my_task] wrote %s', op)

def main(argv=None):
    parser = argparse.ArgumentParser(description='Homework task wrapper')
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args(argv)

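    # force=True (Python 3.8+) replaces any handlers already attached to the
    # root logger (e.g. by a notebook kernel); otherwise basicConfig is a no-op
    logging.basicConfig(level=logging.INFO,
                        handlers=[logging.StreamHandler(sys.stdout)],
                        force=True)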

    my_task(args.input, args.output)

if __name__ == '__main__':
    # Example simulated CLI in notebook:
    main(['--input', 'data/in.ext', '--output', 'data/out.json'])
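The same template can be applied to the `clean` step from the task table. The following sketch makes several assumptions: prices_raw.json holds a JSON list of objects with a `price` key, and "clean" means dropping rows whose price is missing or non-numeric. `clean_prices` and `clean_main` are hypothetical names, and `clean_main` is named so that it does not overwrite the template's `main` in the notebook. The function returns row counts so the caller can log rows in and out, as the logging plan specifies. The CLI returns a non-zero exit code on failure so that a scheduler can mark the task as failed.

```python
import argparse
import json
import logging
import sys
import tempfile
from pathlib import Path

log = logging.getLogger("clean")


def _is_number(v) -> bool:
    # bool is a subclass of int, so exclude it explicitly
    return isinstance(v, (int, float)) and not isinstance(v, bool)


def clean_prices(input_path: str, output_path: str) -> dict:
    """Drop rows with a missing or non-numeric 'price'; write the rest as JSON."""
    rows = json.loads(Path(input_path).read_text())
    kept = [r for r in rows if _is_number(r.get("price"))]
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(kept, indent=2))
    stats = {"rows_in": len(rows), "rows_out": len(kept)}
    log.info("[clean] %s -> %s %s", input_path, output_path, stats)
    return stats


def clean_main(argv=None) -> int:
    parser = argparse.ArgumentParser(description="clean: prices_raw.json -> prices_clean.json")
    parser.add_argument("--input", default="prices_raw.json")
    parser.add_argument("--output", default="prices_clean.json")
    args = parser.parse_args(argv)
    logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)])
    try:
        clean_prices(args.input, args.output)
    except (OSError, json.JSONDecodeError) as e:
        log.error("[clean] failed: %s", e)
        return 1  # non-zero exit code signals failure to the scheduler
    return 0


# Simulated CLI call on toy data in a temporary directory.
with tempfile.TemporaryDirectory() as d:
    raw = Path(d) / "prices_raw.json"
    raw.write_text(json.dumps([{"price": 1.5}, {"price": None}, {"price": "n/a"}]))
    code = clean_main(["--input", str(raw), "--output", str(Path(d) / "prices_clean.json")])
    kept = json.loads((Path(d) / "prices_clean.json").read_text())

print(code, kept)  # → 0 [{'price': 1.5}]
```

As a standalone script, the file would end with `if __name__ == '__main__': sys.exit(clean_main())` so that the exit code reaches cron or Airflow.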




### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [6]:
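import functools
import time

def retry(n_tries=3, delay=0.2):
    """Retry the wrapped function up to n_tries times with linear backoff."""
    if n_tries < 1:
        raise ValueError('n_tries must be >= 1')  # otherwise the wrapper silently returns None

    def decorator(fn):
        @functools.wraps(fn)  # keep fn's name/docstring for logs and debugging
        def wrapper(*args, **kwargs):
            for i in range(n_tries):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    if i < n_tries - 1:
                        wait = delay * (i + 1)  # linear backoff: delay, 2*delay, 3*delay, ...
                        print(f"[retry] Attempt {i+1} failed: {e}. Retrying in {wait:.1f}s...")
                        time.sleep(wait)
                    else:
                        print(f"[retry] Failed after {n_tries} attempts.")
                        raise
        return wrapper
    return decorator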
