# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [9]:
from pathlib import Path
import pandas as pd
tasks = pd.DataFrame({
    'task': ['ingest', 'clean', 'feature_eng', 'train', 'evaluate', 'report'],
    'inputs': ['raw_data.csv', 'raw_data.csv', 'clean_data.csv', 'features.csv', 'model.pkl', 'results.json'],
    'outputs': ['raw_data.csv', 'clean_data.csv', 'features.csv', 'model.pkl', 'results.json', 'report.pdf'],
    'idempotent': [True, True, True, False, True, True]
})
tasks

| | task | inputs | outputs | idempotent |
|---|---|---|---|---|
| 0 | ingest | raw_data.csv | raw_data.csv | True |
| 1 | clean | raw_data.csv | clean_data.csv | True |
| 2 | feature_eng | clean_data.csv | features.csv | True |
| 3 | train | features.csv | model.pkl | False |
| 4 | evaluate | model.pkl | results.json | True |
| 5 | report | results.json | report.pdf | True |


## 2) Dependencies (DAG)
Describe dependencies and paste a small diagram if you have one.

In [10]:
dag = {
    'ingest': [],
    'clean': ['ingest'],
    'feature_eng': ['clean'],
    'train': ['feature_eng'],
    'evaluate': ['train'],
    'report': ['evaluate']
}
dag

{'ingest': [],
 'clean': ['ingest'],
 'feature_eng': ['clean'],
 'train': ['feature_eng'],
 'evaluate': ['train'],
 'report': ['evaluate']}

Linear pipeline: ingest → clean → feature_eng → train → evaluate → report
No parallel tasks. Each step depends on the previous one.
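
One way to check that the dependency mapping is a valid DAG and to derive a run order from it is a topological sort. The sketch below assumes Python 3.9+ (for the standard-library `graphlib` module); `execution_order` is a hypothetical helper name, not part of the starter.

```python
from graphlib import TopologicalSorter, CycleError

# Same mapping as the dag cell: task -> list of upstream tasks.
dag = {
    'ingest': [],
    'clean': ['ingest'],
    'feature_eng': ['clean'],
    'train': ['feature_eng'],
    'evaluate': ['train'],
    'report': ['evaluate'],
}

def execution_order(graph):
    """Return tasks in a valid run order; raise ValueError on a cycle."""
    try:
        # TopologicalSorter expects node -> predecessors, which matches `dag`.
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception args.
        raise ValueError(f'dependency cycle detected: {exc.args[1]}') from exc

order = execution_order(dag)
print(' -> '.join(order))
```

Because the pipeline is a single chain, only one order is possible; with branching dependencies the same call would return one of several valid orders.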

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [11]:
logging_plan = pd.DataFrame({
    'task': ['ingest', 'clean', 'feature_eng', 'train', 'evaluate', 'report'],
    'log_messages': [
        'start/end, rows loaded, source path',
        'start/end, rows before/after cleaning',
        'start/end, features created, output shape',
        'start/end, model type, training time',
        'start/end, metrics computed',
        'start/end, report generated'
    ],
    'checkpoint_artifact': [
        'raw_data.csv',
        'clean_data.csv',
        'features.csv',
        'model.pkl',
        'results.json',
        'report.pdf'
    ]
})
logging_plan

| | task | log_messages | checkpoint_artifact |
|---|---|---|---|
| 0 | ingest | start/end, rows loaded, source path | raw_data.csv |
| 1 | clean | start/end, rows before/after cleaning | clean_data.csv |
| 2 | feature_eng | start/end, features created, output shape | features.csv |
| 3 | train | start/end, model type, training time | model.pkl |
| 4 | evaluate | start/end, metrics computed | results.json |
| 5 | report | start/end, report generated | report.pdf |
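
A checkpoint is only useful if a rerun can skip work that already finished. The sketch below shows one possible way to do that: treat the checkpoint artifact as the "done" marker and log start/end around the task. The helper name `run_with_checkpoint`, the `force` flag, and the `pipeline` logger name are assumptions for illustration, not part of the starter.

```python
import logging
from pathlib import Path

logger = logging.getLogger('pipeline')

def run_with_checkpoint(task_name, artifact, fn, force=False):
    """Run fn() unless its checkpoint artifact already exists.

    Returns True if the task ran, False if it was skipped.
    """
    artifact = Path(artifact)
    if artifact.exists() and not force:
        logger.info('[%s] skip, checkpoint found at %s', task_name, artifact)
        return False
    logger.info('[%s] start', task_name)
    fn()
    # A task that claims success but wrote nothing should fail loudly.
    if not artifact.exists():
        raise RuntimeError(f'{task_name} finished without writing {artifact}')
    logger.info('[%s] end, wrote %s', task_name, artifact)
    return True
```

For a task marked non-idempotent in the task table, such as `train`, passing `force=True` makes the rerun explicit rather than silently reusing an old `model.pkl`.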


## 4) Right-Sizing Automation
Which parts will you automate now? Which stay manual? Why?

Automate now: ingest, clean, feature_eng. These are repeatable data steps.

Keep manual: train, evaluate, report. Training needs parameter tuning. Evaluation needs human review. Reports need custom analysis.

Reason: Start with the stable data pipeline, where the steps are repeatable. Add model automation later, once the requirements are clear.
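
The split above can be written down as data so that a runner knows where to stop. This is a minimal sketch, assuming the linear order from section 2; `PIPELINE`, `AUTOMATED`, and `plan_run` are hypothetical names.

```python
# Pipeline order from section 2 and the automate/manual split described above.
PIPELINE = ['ingest', 'clean', 'feature_eng', 'train', 'evaluate', 'report']
AUTOMATED = {'ingest', 'clean', 'feature_eng'}

def plan_run(pipeline, automated):
    """Split the pipeline into the automated prefix and the manual remainder."""
    for i, task in enumerate(pipeline):
        if task not in automated:
            # Stop at the first manual task: everything after it depends on it.
            return pipeline[:i], pipeline[i:]
    return list(pipeline), []

auto_tasks, manual_tasks = plan_run(PIPELINE, AUTOMATED)
print('run automatically:', auto_tasks)
print('hand off to a person:', manual_tasks)
```

Moving a task into `AUTOMATED` later extends the automated prefix without touching the runner.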

## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [12]:
import argparse, json, logging, sys
from datetime import datetime
import pandas as pd

def clean_data(input_path: str, output_path: str) -> None:
    """Read a CSV, drop rows with missing values, and write the result."""
    logging.info('[clean_data] start')
    df = pd.read_csv(input_path)
    initial_rows = len(df)
    # dropna() removes every row that has at least one missing value.
    df_clean = df.dropna()
    final_rows = len(df_clean)
    df_clean.to_csv(output_path, index=False)
    logging.info('[clean_data] %d -> %d rows, wrote %s', initial_rows, final_rows, output_path)

def main(argv=None):
    parser = argparse.ArgumentParser(description='Clean data task')
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    args = parser.parse_args(argv)
    logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)])
    clean_data(args.input, args.output)

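if __name__ == '__main__':
    # Notebook demo: argv is passed explicitly. In a script, call main() with
    # no arguments so argparse reads the command line instead.
    main(['--input', 'data/raw.csv', '--output', 'data/clean.csv'])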

INFO:root:[clean_data] start
INFO:root:[clean_data] 10 -> 10 rows, wrote data/clean.csv


### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [13]:
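import time

def retry(n_tries=3, delay=0.2):
    """Return a caller that retries fn with linear backoff.

    Usage: retry(3, 0.2)(fn, *args, **kwargs)
    """
    if n_tries < 1:
        raise ValueError('n_tries must be at least 1')

    def wrapper(fn, *args, **kwargs):
        for attempt in range(n_tries):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == n_tries - 1:
                    raise  # out of attempts: re-raise with the original traceback
                # Linear backoff: delay, 2 * delay, 3 * delay, ...
                time.sleep(delay * (attempt + 1))
    return wrapper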
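
To see the wrapper in action, the sketch below calls it on a task that fails twice before succeeding. `flaky_load` and the `calls` counter are hypothetical stand-ins for a real task, and `retry` is repeated in compact form only so the snippet runs on its own.

```python
import time

def retry(n_tries=3, delay=0.2):
    # Same call convention as the wrapper above: retry(...)(fn, *args, **kwargs).
    def wrapper(fn, *args, **kwargs):
        for attempt in range(n_tries):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == n_tries - 1:
                    raise
                time.sleep(delay * (attempt + 1))
    return wrapper

calls = {'count': 0}

def flaky_load(path):
    """Hypothetical task: fails on the first two calls, then succeeds."""
    calls['count'] += 1
    if calls['count'] < 3:
        raise IOError(f'temporary failure reading {path}')
    return f'loaded {path}'

# A short delay keeps the demo fast; the waits are 0.01 s and then 0.02 s.
result = retry(n_tries=3, delay=0.01)(flaky_load, 'data/raw.csv')
print(result, 'after', calls['count'], 'calls')
```

With `n_tries=2` the same call would exhaust its attempts and re-raise the `IOError`, which is the behavior an orchestrator needs in order to mark the task as failed.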