# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [1]:
from pathlib import Path
import pandas as pd
# Task decomposition table, written one row per pipeline stage so each
# stage's input, output and idempotency flag can be read on a single line.
tasks = pd.DataFrame(
    [
        # (task, inputs, outputs, idempotent)
        ('ingest', '/data/raw.ext', 'prices_raw.json', True),
        ('clean', 'prices_raw.json', 'prices_clean.json', True),
        ('train_or_score', 'prices_clean.json', 'model.json', True),
        ('report', 'model.json', 'report.txt', True),
        ('deploy', 'report.txt', 'deployed_model.json', True),
    ],
    columns=['task', 'inputs', 'outputs', 'idempotent'],
)
tasks

Unnamed: 0,task,inputs,outputs,idempotent
0,ingest,/data/raw.ext,prices_raw.json,True
1,clean,prices_raw.json,prices_clean.json,True
2,train_or_score,prices_clean.json,model.json,True
3,report,model.json,report.txt,True
4,deploy,report.txt,deployed_model.json,True


## 2) Dependencies (DAG)
Describe dependencies and paste a small diagram if you have one.

In [2]:
# Dependency graph: each key is a task, each value lists the tasks that must
# finish before it can start. The pipeline is a single linear chain.
dag = dict(
    ingest=[],
    clean=['ingest'],
    train_or_score=['clean'],
    report=['train_or_score'],
    deploy=['report'],
)
dag

{'ingest': [],
 'clean': ['ingest'],
 'train_or_score': ['clean'],
 'report': ['train_or_score'],
 'deploy': ['report']}

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [3]:
# Logging / checkpoint plan, one row per task: what gets logged while the
# task runs and which artifact marks the task as checkpointed.
logging_plan = pd.DataFrame(
    [
        # (task, log_messages, checkpoint_artifact)
        ('ingest', 'start/end, rows, source URI', 'prices_raw.json'),
        ('clean', 'start/end, rows in/out', 'prices_clean.json'),
        ('train_or_score', 'params, metrics', 'model.json'),
        ('report', 'artifact path', 'report.txt'),
        ('deploy', 'deployment path', 'deployed_model.json'),
    ],
    columns=['task', 'log_messages', 'checkpoint_artifact'],
)
logging_plan

Unnamed: 0,task,log_messages,checkpoint_artifact
0,ingest,"start/end, rows, source URI",prices_raw.json
1,clean,"start/end, rows in/out",prices_clean.json
2,train_or_score,"params, metrics",model.json
3,report,artifact path,report.txt
4,deploy,deployment path,deployed_model.json


## 4) Right-Sizing Automation
Which parts will you automate now? Which stay manual? Why?

Automatable:
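Tasks that are repetitive, deterministic, and well-defined, such as ingest, clean, train_or_score, report, and deploy. These steps can log progress, save checkpoints, and generate outputs automatically. For deploy, only the mechanics are automated; the step runs after the manual authorization listed below.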

Manual:
Tasks requiring human oversight or decision-making, such as validating data sources, reviewing model metrics, approving reports, and authorizing production deployment. These ensure quality and correctness where automation alone may not suffice.

## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [None]:
import json, os
from pathlib import Path

# Base data directory for this stage. The default is the original location;
# set the STAGE15_DATA_DIR environment variable to run this cell on another
# machine without editing any path in the notebook.
DATA_DIR = Path(os.environ.get(
    'STAGE15_DATA_DIR',
    '/Users/aditya/Documents/bootcamp_aditya_shah/homework/stage15_orchestration_and_system_design/data',
))
RAW_PRICES_PATH = DATA_DIR / 'raw' / 'prices_raw.json'

# Create the raw-data folder up front so the write below cannot fail on a
# missing parent directory; exist_ok makes re-running the cell harmless.
RAW_PRICES_PATH.parent.mkdir(parents=True, exist_ok=True)

# Two-row fixture used as the raw input of the clean task.
dummy_data = [
    {"date": "2025-01-01", "price": 100},
    {"date": "2025-01-02", "price": 102},
]

# Explicit encoding so the file content does not depend on the platform default.
with open(RAW_PRICES_PATH, 'w', encoding='utf-8') as f:
    json.dump(dummy_data, f, indent=2)


In [None]:
import argparse, json, logging, sys
from datetime import datetime
from pathlib import Path
import pandas as pd

def clean_task(input_path: str, output_path: str) -> None:
    """Read a raw JSON file, apply the example cleaning rules, write the result.

    Cleaning consists of two steps applied in order: exact duplicate rows are
    dropped, then every remaining missing value is replaced with 0.

    Args:
        input_path: Location of the raw JSON file, read with ``pd.read_json``.
        output_path: Destination of the cleaned JSON file (records orientation,
            2-space indent). Missing parent directories are created.

    Returns:
        None. The result is the file written at ``output_path``; start and
        completion (with row counts before/after cleaning) are logged at INFO.

    Errors raised while reading or writing are not caught here and propagate
    to the caller.
    """
    logging.info('[clean_task] start')

    raw_frame = pd.read_json(input_path)

    # Example cleaning rules: de-duplicate first, then fill the gaps with 0.
    deduplicated = raw_frame.drop_duplicates()
    cleaned_frame = deduplicated.fillna(0)

    # Make sure the target folder exists before writing into it.
    target_dir = Path(output_path).parent
    target_dir.mkdir(parents=True, exist_ok=True)
    cleaned_frame.to_json(output_path, orient='records', indent=2)

    rows_in = len(raw_frame)
    rows_out = len(cleaned_frame)
    logging.info('[clean_task] wrote %s, rows in/out: %d/%d',
                 output_path, rows_in, rows_out)

def main(argv=None):
    """Command-line entry point for the clean task.

    Args:
        argv: List of command-line tokens to parse. When ``None``, argparse
            falls back to ``sys.argv[1:]``. Both ``--input`` and ``--output``
            are required; argparse exits the process if either is missing.

    Arguments are parsed before logging is configured, so a usage error exits
    without touching the logging setup.
    """
    cli = argparse.ArgumentParser(description='Clean task wrapper')
    for flag in ('--input', '--output'):
        cli.add_argument(flag, required=True)
    options = cli.parse_args(argv)

    # Send INFO-level records to stdout so they show up in the cell output.
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(level=logging.INFO, handlers=[stdout_handler])

    clean_task(options.input, options.output)

if __name__ == "__main__":
    # Example simulated CLI.
    # The data directory defaults to the original location but can be
    # redirected with the STAGE15_DATA_DIR environment variable, so the demo
    # runs on other machines without editing the notebook.
    import os

    data_dir = Path(os.environ.get(
        'STAGE15_DATA_DIR',
        '/Users/aditya/Documents/bootcamp_aditya_shah/homework/stage15_orchestration_and_system_design/data',
    ))
    main([
        '--input', str(data_dir / 'raw' / 'prices_raw.json'),
        '--output', str(data_dir / 'processed' / 'prices_clean.json'),
    ])


INFO:root:[clean_task] start
INFO:root:[clean_task] wrote /Users/aditya/Documents/bootcamp_aditya_shah/homework/stage15_orchestration_and_system_design/data/prices_clean.json, rows in/out: 2/2


### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [None]:
import time
import logging

def retry(n_tries=3, delay=0.2):
    """Decorator factory: retry a function up to ``n_tries`` times with linear backoff.

    Args:
        n_tries: Maximum number of calls to the wrapped function; must be >= 1.
        delay: Base pause in seconds. After failed attempt ``i`` (1-based) the
            wrapper sleeps ``delay * i`` before trying again. No pause is taken
            after the final attempt.

    Returns:
        A decorator. The decorated function returns the first successful
        result of the wrapped function.

    Raises:
        ValueError: At decoration time, if ``n_tries`` is smaller than 1.
        Exception: The exception raised by the last attempt is re-raised once
            all attempts have failed. Only ``Exception`` subclasses trigger a
            retry; other ``BaseException`` types propagate immediately.
    """
    import functools  # local import keeps this cell runnable on its own

    if n_tries < 1:
        # Fail early with a clear message instead of reaching the end of the
        # loop with nothing to re-raise.
        raise ValueError(f'n_tries must be >= 1, got {n_tries}')

    def decorator(fn):
        @functools.wraps(fn)  # preserve the wrapped function's name and docstring
        def wrapped(*args, **kwargs):
            for attempt in range(1, n_tries + 1):
                try:
                    logging.info('[retry] attempt %d for %s', attempt, fn.__name__)
                    return fn(*args, **kwargs)
                except Exception as exc:
                    logging.warning('[retry] failed attempt %d: %s', attempt, exc)
                    if attempt == n_tries:
                        logging.error('[retry] all %d attempts failed for %s', n_tries, fn.__name__)
                        # Bare raise keeps the original traceback; do not
                        # sleep first, since there is nothing left to wait for.
                        raise
                    time.sleep(delay * attempt)  # linear backoff
        return wrapped
    return decorator
