# Stage 15 — Orchestration & System Design (Conceptual Overview)
This notebook demonstrates orchestration **concepts**: modular tasks, explicit I/O, logging, checkpoints, a tiny DAG runner, and a CLI-friendly entrypoint.

**Note:** We are *not* teaching Airflow/Prefect. We simulate the ideas in pure Python.

In [None]:
from __future__ import annotations

import argparse
import json
import logging
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, Dict, List

# Workspace layout: every demo artifact lives under ./stage15_demo, relative
# to the current working directory.
BASE = Path.cwd() / 'stage15_demo'
DATA = BASE / 'data'        # JSON checkpoints (raw prices, clean prices, model)
LOGS = BASE / 'logs'        # pipeline.log
REPORTS = BASE / 'reports'  # generated report.txt

for folder in (DATA, LOGS, REPORTS):
    folder.mkdir(exist_ok=True, parents=True)

# Route INFO-and-above records to both stdout and LOGS/pipeline.log.
# NOTE: basicConfig does nothing if the root logger already has handlers
# (e.g. when this cell is re-run), so changes here may need a kernel restart.
logging.basicConfig(
    handlers=(
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(LOGS / 'pipeline.log'),
    ),
    format='%(asctime)s | %(levelname)s | %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
logger.info('Folders ready at %s', BASE)

In [None]:
@dataclass
class TaskResult:
    """Outcome record returned by every pipeline task.

    Fields are, in order: the task's name, a success flag, the artifact path
    (as ``str``, or ``None`` when there is none) and a free-form message.
    """

    name: str  # task identifier; the tasks here pass their function name
    ok: bool  # success flag (every task in this notebook sets True)
    output_path: str | None = None  # where the task wrote its artifact
    message: str = ''  # optional note; not populated by the visible tasks

def write_json(path: Path, obj: dict):
    """Serialise *obj* as indented JSON to *path*, replacing the file atomically.

    Parent directories are created as needed. The payload is first written
    to a sibling ``<name>.tmp`` file, which is then renamed over *path*.
    Checkpoints in this pipeline are judged by file existence alone, so a
    half-written file must never appear under the final name: a crash
    mid-write would otherwise leave a truncated JSON that later runs accept
    as a valid checkpoint.

    Args:
        path: destination file (``str`` is also accepted).
        obj: JSON-serialisable object.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialise before touching the filesystem so a TypeError leaves no trace.
    payload = json.dumps(obj, indent=2)
    tmp_path = path.with_name(path.name + '.tmp')
    tmp_path.write_text(payload)
    # Path.replace overwrites an existing target; within one filesystem the
    # rename is atomic on POSIX (it delegates to os.replace).
    tmp_path.replace(path)

def checkpoint_exists(path: Path) -> bool:
    """Report whether a task's checkpoint is already on disk.

    Presence is the only criterion: any existing path counts as a hit,
    whatever its contents or age.
    """
    found = path.exists()
    return found

In [None]:
def ingest_prices(output_path: Path) -> TaskResult:
    """Simulate ingesting a batch of raw prices and checkpoint it as JSON.

    Writes ``{'timestamp': <ISO-8601 UTC>, 'prices': [...]}`` to
    *output_path* via ``write_json``.

    Returns:
        TaskResult for ``'ingest_prices'`` pointing at *output_path*.
    """
    logger.info('[ingest_prices] start')
    time.sleep(0.1)  # simulated I/O latency (demo only)
    data = {
        # Timezone-aware UTC: datetime.utcnow() is deprecated (Python 3.12)
        # and returns a naive datetime with no offset in its isoformat().
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'prices': [100.0, 101.2, 99.8, 102.1],
    }
    write_json(output_path, data)
    logger.info('[ingest_prices] wrote %s', output_path)
    return TaskResult('ingest_prices', True, str(output_path))

def clean_prices(input_path: Path, output_path: Path) -> TaskResult:
    """Drop null prices from the raw checkpoint and record their mean.

    Reads the JSON at *input_path* (keys ``'timestamp'`` and ``'prices'``).
    Writes ``{'timestamp', 'prices', 'mean'}`` to *output_path*.

    Raises:
        ValueError: if no non-null prices remain. The mean would be
            undefined, and the original code failed with an opaque
            ZeroDivisionError.
    """
    logger.info('[clean_prices] start')
    raw = json.loads(Path(input_path).read_text())
    prices = [p for p in raw['prices'] if p is not None]
    if not prices:
        raise ValueError(f'[clean_prices] no non-null prices in {input_path}')
    cleaned = {
        'timestamp': raw['timestamp'],
        'prices': prices,
        'mean': sum(prices) / len(prices),
    }
    write_json(output_path, cleaned)
    logger.info('[clean_prices] wrote %s', output_path)
    return TaskResult('clean_prices', True, str(output_path))

def fit_dummy_model(clean_path: Path, model_path: Path) -> TaskResult:
    """'Train' a mean predictor by copying the cleaned mean into a model file.

    Reads the cleaned JSON at *clean_path* and writes
    ``{'type': 'mean_predictor', 'mean': <mean>}`` to *model_path*.
    """
    logger.info('[fit_dummy_model] start')
    stats = json.loads(Path(clean_path).read_text())
    write_json(model_path, {'type': 'mean_predictor', 'mean': stats['mean']})
    logger.info('[fit_dummy_model] wrote %s', model_path)
    return TaskResult(name='fit_dummy_model', ok=True, output_path=str(model_path))

def generate_report(model_path: Path, report_path: Path) -> TaskResult:
    """Render a plain-text report describing the fitted model.

    Reads the model JSON at *model_path* and writes a two-line text report
    (UTC generation time plus the model dict) to *report_path*. The parent
    directory is created if missing, consistent with ``write_json``.
    """
    logger.info('[generate_report] start')
    model = json.loads(Path(model_path).read_text())
    # Timezone-aware UTC instead of the deprecated, naive datetime.utcnow().
    generated_at = datetime.now(timezone.utc).isoformat()
    text = f'Report generated at {generated_at}\nModel: {model}\n'
    report_path = Path(report_path)
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(text)
    logger.info('[generate_report] wrote %s', report_path)
    return TaskResult('generate_report', True, str(report_path))

In [None]:
# Pipeline graph: each node name maps to the nodes that must finish before it.
DAG: Dict[str, List[str]] = dict(
    ingest=[],
    clean=['ingest'],
    fit=['clean'],
    report=['fit'],
)

def topo_sort(dag: Dict[str, List[str]]) -> List[str]:
    """Return the nodes of *dag* in a dependency-respecting order.

    *dag* maps each node to the nodes it depends on. Nodes are released in
    "waves": every node whose dependencies are all satisfied is emitted in
    the dict's insertion order, then removed from the remaining graph. The
    input is not mutated.

    Raises:
        ValueError: if some nodes can never run, either because of a cycle
            or because they depend on a name that is not a key of *dag*. The
            message lists the stuck nodes and any unknown dependencies.
    """
    remaining = {node: set(deps) for node, deps in dag.items()}
    order: List[str] = []
    while remaining:
        wave = [node for node, deps in remaining.items() if not deps]
        if not wave:
            stuck = sorted(remaining)
            unknown = sorted(
                {d for deps in remaining.values() for d in deps} - set(dag)
            )
            raise ValueError(
                f'Cycle detected or unsatisfied deps: stuck={stuck}, '
                f'unknown deps={unknown}'
            )
        order.extend(wave)
        for node in wave:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(wave)
    return order

def retry(n_tries=3, delay=0.2):
    """Build a call-wrapper that retries a task on failure.

    ``retry(n, d)(fn, *args, **kwargs)`` calls ``fn(*args, **kwargs)`` up to
    *n_tries* times and returns the first successful result. Note this is a
    call-wrapper, not a decorator. Between attempts it sleeps
    ``delay * attempt`` seconds (linear back-off). It does not sleep after
    the final attempt, because no retry follows.

    Raises:
        RuntimeError: once all attempts fail. The last underlying exception
            is chained as ``__cause__`` so its traceback is not lost.
    """
    def wrap(fn: Callable, *args, **kwargs):
        last_exc = None
        for i in range(1, n_tries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                # Keep a reference: ``e`` is unbound when the except block ends.
                last_exc = e
                logger.exception('Attempt %s failed: %s', i, e)
                if i < n_tries:
                    time.sleep(delay * i)
        raise RuntimeError(f'Task failed after {n_tries} attempts') from last_exc
    return wrap

def run_pipeline(skip_existing=True):
    """Run every DAG node in topological order and return the report path.

    The execution order comes from ``topo_sort(DAG)`` rather than a
    hard-coded sequence.

    With *skip_existing*, a node is skipped only if all of these hold:
      * its checkpoint file already exists, and
      * none of its upstream dependencies ran during this invocation.
    The second rule prevents stale outputs. If, say, the raw-prices file was
    deleted and ingest re-runs, clean and fit must re-run too, instead of
    reusing checkpoints built from the old data.

    The report node has no checkpoint and always runs.

    Args:
        skip_existing: reuse existing checkpoints when safe (False forces a
            full rerun).

    Returns:
        The report path as a string.
    """
    ingest_out = DATA / 'prices_raw.json'
    clean_out = DATA / 'prices_clean.json'
    model_out = DATA / 'model.json'
    report_out = REPORTS / 'report.txt'

    # node -> (task callable, positional args, checkpoint path or None)
    tasks = {
        'ingest': (ingest_prices, (ingest_out,), ingest_out),
        'clean': (clean_prices, (ingest_out, clean_out), clean_out),
        'fit': (fit_dummy_model, (clean_out, model_out), model_out),
        'report': (generate_report, (model_out, report_out), None),
    }

    order = topo_sort(DAG)
    logger.info('Execution order: %s', order)

    missing = [name for name in order if name not in tasks]
    if missing:
        raise KeyError(f'No task registered for DAG node(s): {missing}')

    executed = set()
    for name in order:
        fn, args, checkpoint = tasks[name]
        upstream_ran = any(dep in executed for dep in DAG[name])
        if (skip_existing and checkpoint is not None
                and not upstream_ran and checkpoint_exists(checkpoint)):
            logger.info('[%s] checkpoint hit: %s', name, checkpoint)
            continue
        retry()(fn, *args)
        executed.add(name)
    return str(report_out)

# Logs a marker once this cell's DAG/runner definitions have executed.
logger.info('Pipeline ready.')

In [None]:
def main(argv=None):
    """Command-line entry point for the demo pipeline.

    Args:
        argv: arguments to parse. ``None`` lets argparse read ``sys.argv[1:]``.

    Without ``--run-all`` only a usage hint is logged. ``--no-skip`` makes
    the run ignore existing checkpoints.
    """
    cli = argparse.ArgumentParser(description='Stage15 pipeline demo')
    cli.add_argument('--run-all', action='store_true', help='Run the full pipeline')
    cli.add_argument('--no-skip', action='store_true', help='Ignore checkpoints')
    opts = cli.parse_args(argv)

    if not opts.run_all:
        logger.info('Use --run-all to execute the full demo pipeline.')
        return
    report = run_pipeline(skip_existing=not opts.no_skip)
    logger.info('Report at: %s', report)

if __name__ == '__main__':
    # In a Jupyter kernel, sys.argv typically holds kernel flags
    # (e.g. `-f <connection-file>`) that argparse would reject, so the demo
    # runs with a fixed argv there. From a terminal, honour the real CLI and
    # default to a full run when no flags are given.
    if 'ipykernel' in sys.modules:
        main(['--run-all'])
    else:
        main(sys.argv[1:] or ['--run-all'])