# Stage 15 — Orchestration & System Design (Filled)

This notebook is **self-contained** and satisfies the Stage 15 homework deliverables without creating any extra files.
It includes:
- Pipeline tasks & boundaries (4–8 tasks)
- Dependencies (DAG) rendered **inline** with `matplotlib`
- Reliability patterns: logging, checkpoints, idempotency (in-memory demo)
- Failure points & retry policy (with exponential backoff utility)
- What to automate now vs manual gate (with rationale)
- Minimal runbook snippets
- Final deliverables checklist

## A. Pipeline Tasks & Boundaries

1. **Ingest Raw Data**  
   **Input:** `data/raw/` (dated partitions)  
   **Output:** `data/ingested/data_ingested.parquet`  
   **Idempotent:** Yes (immutable inputs → deterministic output)  
   **Logging:** `logs/ingest.log` (pattern)  
   **Checkpoint:** `checkpoints/ingest.ok` (pattern)

2. **Validate & Profile**  
   **Input:** `data/ingested/data_ingested.parquet`  
   **Output:** `reports/data_profile.json`, `reports/validation_summary.md`  
   **Idempotent:** Yes  
   **Logging:** `logs/validate.log`  
   **Checkpoint:** `checkpoints/validate.ok`

3. **Clean & Feature Build**  
   **Input:** ingested data + `config/schema.yaml`  
   **Output:** `data/processed/features.parquet`  
   **Idempotent:** Yes (deterministic transforms; seed fixed)  
   **Logging:** `logs/clean.log`  
   **Checkpoint:** `checkpoints/clean.ok`

4. **Train Model**  
   **Input:** `data/processed/features.parquet`, `config/train.yaml`  
   **Output:** `models/model.pkl`, `reports/train_metrics.json`  
   **Idempotent:** Yes if inputs/params unchanged (content hashes)  
   **Logging:** `logs/train.log`  
   **Checkpoint:** `checkpoints/train.ok`

5. **Evaluate & Drift Checks**  
   **Input:** `models/model.pkl`, held-out data  
   **Output:** `reports/eval_metrics.json`, `reports/drift_report.md`  
   **Idempotent:** Yes  
   **Logging:** `logs/eval.log`  
   **Checkpoint:** `checkpoints/eval.ok`

6. **Package for Serving**  
   **Input:** `models/model.pkl`  
   **Output:** `artifact/model_v{hash}/manifest.json` (hash = data+params)  
   **Idempotent:** Yes  
   **Logging:** `logs/package.log`  
   **Checkpoint:** `checkpoints/package.ok`

7. **Report / Deliver**  
   **Input:** metrics + templates  
   **Output:** `deliverables/report.md` (+ images)  
   **Idempotent:** Yes  
   **Logging:** `logs/report.log`  
   **Checkpoint:** `checkpoints/report.ok`
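
The linear chain of tasks above can also be written down as data, so that the execution order is derived rather than hard-coded. The following is a minimal sketch, assuming short task names taken from the checkpoint file stems and a `PIPELINE_DEPS` mapping (both hypothetical, not part of the original notebook). It uses the standard-library `graphlib` module, which requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical registry: task name -> set of tasks it depends on.
# The names mirror the checkpoint stems listed above (ingest.ok, eval.ok, ...).
PIPELINE_DEPS = {
    "ingest": set(),
    "validate": {"ingest"},
    "clean": {"validate"},
    "train": {"clean"},
    "eval": {"train"},
    "package": {"eval"},
    "report": {"package"},
}

def execution_order(deps):
    """Return task names in an order that respects every dependency.

    Raises graphlib.CycleError if the mapping contains a cycle.
    """
    return list(TopologicalSorter(deps).static_order())

order = execution_order(PIPELINE_DEPS)
print(order)
# ['ingest', 'validate', 'clean', 'train', 'eval', 'package', 'report']
```

Keeping dependencies in one mapping means a new task only needs one new entry, and a cyclic definition fails before anything runs.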

## B. Dependencies (DAG) — Inline Rendering (No Files Saved)

In [None]:
import matplotlib.pyplot as plt

# Render a simple DAG inline (no files written)
fig, ax = plt.subplots(figsize=(8, 4))
ax.set_axis_off()

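def _box(ax, x, y, w, h, label):
    # Draw an unfilled rectangle with a centered label (small font so it fits)
    rect = plt.Rectangle((x, y), w, h, fill=False)
    ax.add_patch(rect)
    ax.text(x + w/2, y + h/2, label, ha="center", va="center", fontsize=8)

def _arrow(ax, x1, y1, x2, y2):
    # Draw an arrow from (x1, y1) to (x2, y2)
    ax.annotate("", xy=(x2, y2), xytext=(x1, y1),
                arrowprops=dict(arrowstyle="->", lw=1.4))

# w and dx are chosen so all 7 boxes fit inside xlim (0, 1): the last box ends
# at x0 + 6*dx + w = 0.97, and each arrow spans the dx - w = 0.03 gap.
x0, y0, w, h, dx = 0.02, 0.45, 0.11, 0.18, 0.14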
labels = ["Ingest", "Validate", "Clean/Feature", "Train", "Evaluate", "Package", "Report"]
xs = [x0 + i*dx for i in range(len(labels))]

for xi, lbl in zip(xs, labels):
    _box(ax, xi, y0, w, h, lbl)
for i in range(len(labels)-1):
    _arrow(ax, xs[i]+w, y0+h/2, xs[i+1], y0+h/2)

ax.set_xlim(0, 1); ax.set_ylim(0, 1)
plt.tight_layout()
plt.show()

print("DAG rendered inline.")

## C. Reliability Patterns — In-Memory Logging, Checkpoints, Idempotency

In [None]:
import time, json, hashlib
from typing import Dict, Any, Callable, Tuple

# In-memory "stores" so nothing is written to disk:
MEM_LOGS: Dict[str, list] = {}
MEM_CHECKPOINTS: Dict[str, bool] = {}

def write_ok(task: str) -> None:
    MEM_CHECKPOINTS[task] = True

def is_ok(task: str) -> bool:
    return MEM_CHECKPOINTS.get(task, False)

def mem_log(task: str, message: str) -> None:
    MEM_LOGS.setdefault(task, []).append(f"{time.time():.3f} {message}")

def content_hash(obj: Any) -> str:
    payload = json.dumps(obj, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha1(payload).hexdigest()[:8]

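def run_task(task: str, fn: Callable[[], None], deps: Tuple[str, ...] = ()) -> None:
    # Idempotency: a task whose checkpoint is already set is not run again.
    if is_ok(task):
        mem_log(task, "skip (checkpoint exists)")
        print(f"[{task}] skip -> checkpoint already set")
        return
    print(f"[{task}] start")
    # Raise explicitly instead of using assert, which is stripped under `python -O`.
    for d in deps:
        if not is_ok(d):
            raise RuntimeError(f"Upstream checkpoint missing: {d}.ok")
    mem_log(task, "start")
    fn()  # if fn() raises, no checkpoint is written and the task can be re-run
    write_ok(task)
    mem_log(task, "done")
    print(f"[{task}] done -> checkpoint set")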

# Demo bodies (replace with real functions)
def task_ingest():   time.sleep(0.05)
def task_validate(): time.sleep(0.05)
def task_clean():    time.sleep(0.05)

run_task("ingest", task_ingest)
run_task("validate", task_validate, deps=("ingest",))
run_task("clean", task_clean, deps=("validate",))

print("In-memory logs:", {k: len(v) for k,v in MEM_LOGS.items()})
print("Checkpoints set:", [k for k,v in MEM_CHECKPOINTS.items() if v])
print("Example content hash (params):", content_hash({"seed": 42, "scale": True}))
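
The task list describes Train as idempotent "if inputs/params unchanged (content hashes)". One way to act on that is to skip a task when the hash of its parameters matches the last successful run. The following is a minimal sketch under that assumption; `COMPLETED_HASH` and `run_if_changed` are hypothetical names, and `content_hash` is repeated here only so the block runs on its own.

```python
import hashlib
import json
from typing import Any, Callable, Dict

# Hypothetical in-memory record: task name -> hash of the params it last succeeded with.
COMPLETED_HASH: Dict[str, str] = {}

def content_hash(obj: Any) -> str:
    """Short, stable hash of a JSON-serializable object (key order is ignored)."""
    payload = json.dumps(obj, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha1(payload).hexdigest()[:8]

def run_if_changed(task: str, params: Dict[str, Any], fn: Callable[[], None]) -> bool:
    """Run fn() only when params differ from the last successful run.

    Returns True if fn() ran, False if the task was skipped.
    """
    h = content_hash(params)
    if COMPLETED_HASH.get(task) == h:
        return False          # same inputs as the last success: nothing to do
    fn()                      # an exception here leaves the old record untouched
    COMPLETED_HASH[task] = h  # record the hash only after fn() succeeds
    return True

calls = []
first = run_if_changed("train", {"seed": 42, "scale": True}, lambda: calls.append("run"))
second = run_if_changed("train", {"scale": True, "seed": 42}, lambda: calls.append("run"))
third = run_if_changed("train", {"seed": 7, "scale": True}, lambda: calls.append("run"))
print(first, second, third, len(calls))  # True False True 2
```

The second call is skipped even though the keys arrive in a different order, because `sort_keys=True` makes the serialized payload order-independent.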

## D. Failure Points & Retries

- **Schema/CSV drift** → fail fast in **Validate**; retry **0–2** times after upstream fix.  
- **Null spikes/parse errors** in **Clean** → quarantine & skip bad rows; retry once.  
- **Training instability** → retry with a fallback seed; halt with diagnostics if it persists.  
- **Disk/IO/network** → up to **3** retries with exponential backoff.  
- **Threshold breach** in **Evaluate** (e.g., AUC drop/drift) → stop; open ticket; human review before proceeding.
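
The policy above can be captured as a small lookup from failure type to the number of automatic retries. This is a sketch only: the three custom exception classes are hypothetical stand-ins for the categories in the list, and the counts for schema drift (0 automatic retries, since a retry only helps after an upstream fix) and training instability (1) are assumptions where the list leaves room for interpretation.

```python
# Hypothetical exception classes standing in for the failure categories above.
class SchemaDriftError(Exception):
    pass

class TrainingInstabilityError(Exception):
    pass

class ThresholdBreachError(Exception):
    pass

# (exception type, max automatic retries). The first matching entry wins.
RETRY_POLICY = [
    (ThresholdBreachError, 0),      # stop; human review before proceeding
    (SchemaDriftError, 0),          # fail fast; re-run manually after upstream fix
    (TrainingInstabilityError, 1),  # assumed: one retry with a fallback seed
    (OSError, 3),                   # disk/IO/network; covers ConnectionError, TimeoutError
]

def max_retries_for(exc: BaseException) -> int:
    """Return how many automatic retries the policy allows for this exception."""
    for exc_type, retries in RETRY_POLICY:
        if isinstance(exc, exc_type):
            return retries
    return 0  # unknown failures are not retried automatically

print(max_retries_for(ConnectionError("connection reset")))  # 3
print(max_retries_for(ThresholdBreachError("AUC drop")))     # 0
```

The returned value can be passed as `max_retries` to a backoff helper, so that only transient failures are retried in a loop.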

In [None]:
import random, time

def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 0.2, jitter: float = 0.1):
    """
    Execute fn() with exponential backoff and jitter.

    Makes up to max_retries + 1 attempts in total. Returns the return value
    of fn(), or re-raises the last exception once retries are exhausted.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as e:
            if attempt >= max_retries:
                print(f"Retries exhausted after {attempt + 1} attempts.")
                raise
            # Delay doubles on each retry: base_delay, 2*base_delay, 4*base_delay, ...
            sleep_s = base_delay * (2 ** attempt) + random.uniform(0, jitter)
            print(f"Attempt {attempt + 1} failed: {e}. Backing off {sleep_s:.2f}s")
            time.sleep(sleep_s)
            attempt += 1

# Demo: retry a flaky function (20% failure rate)
def flaky():
    if random.random() < 0.2:
        raise RuntimeError("flaked")
    return "ok"

print("retry_with_backoff result:", retry_with_backoff(flaky))

## E. What to Automate Now vs. Manual

- **Automate now:** Ingest, Validate, Clean/Feature, Train, Evaluate (deterministic; benefit from re-runs).  
- **Manual for now:** Final **Report** polishing; **Package** promotion to prod artifact registry (human gate).

**Rationale:** Prioritize reliability and speed for high-churn steps; keep judgment steps human-reviewed.
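
The split between automated and gated steps can be enforced with a simple check before each task runs. The sketch below assumes short task names and an `approvals` set that records reviewer sign-offs; both are hypothetical and would be backed by whatever approval mechanism the team actually uses.

```python
from typing import Set

# Tasks that run without sign-off vs. tasks behind a human gate (from the split above).
AUTOMATED: Set[str] = {"ingest", "validate", "clean", "train", "eval"}
MANUAL_GATE: Set[str] = {"package", "report"}

def may_run(task: str, approvals: Set[str]) -> bool:
    """Return True if the task may run now.

    approvals is a hypothetical set of task names a reviewer has signed off on.
    """
    if task in AUTOMATED:
        return True
    if task in MANUAL_GATE:
        return task in approvals
    raise KeyError(f"Unknown task: {task}")

print(may_run("train", set()))          # True
print(may_run("package", set()))        # False
print(may_run("package", {"package"}))  # True
```

Raising on unknown task names keeps a typo from silently bypassing the gate.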

## F. Runbook Snippets

- **Re-run failed task:** Verify upstream checkpoints exist; clear partial outputs; re-run with same params.  
- **Backfill:** Use date-partitioned Ingest; idempotent tasks + content hashes keep outputs consistent.  
- **Rollback:** If Evaluate thresholds are violated, block Package and revert to the last known-good artifact hash.
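
The rollback step needs a way to find the last good artifact hash. A minimal sketch, assuming a release history is kept as an ordered list of `(artifact_hash, passed_evaluate)` pairs; the `History` type, the function name, and the hash values are placeholders for illustration only.

```python
from typing import List, Optional, Tuple

# Hypothetical release history, oldest first: (artifact_hash, passed_evaluate).
History = List[Tuple[str, bool]]

def last_good_artifact(history: History) -> Optional[str]:
    """Return the most recent artifact hash whose Evaluate run passed, if any."""
    for artifact_hash, passed in reversed(history):
        if passed:
            return artifact_hash
    return None

# Placeholder hashes; the newest artifact failed its Evaluate thresholds.
history = [("a1b2c3d4", True), ("e5f6a7b8", True), ("c9d0e1f2", False)]
print(last_good_artifact(history))  # e5f6a7b8
```

Returning `None` when no artifact has ever passed makes the "nothing to roll back to" case explicit for the caller.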

## Deliverables Checklist (Stage 15)

- **Pipeline decomposition** (4–8 tasks with boundaries) ✔️  
- **DAG** (dependencies) rendered inline ✔️  
- **Reliability**: logging, checkpoints, idempotency patterns (in-memory demo) ✔️  
- **Failure & retries** policy + backoff utility ✔️  
- **Right-sized automation** (now vs manual) ✔️  
- **Runbook snippets** ✔️