# Homework Starter — Stage 15: Orchestration & System Design
Complete the sections below. Keep your answers concise and focused on orchestration readiness.

## 1) Project Task Decomposition
List 4–8 tasks. Add more rows as needed.

In [11]:
import pandas as pd

tasks = pd.DataFrame([
    {'task': 'ingest_data', 'inputs': 'raw_data_source_url', 'outputs': 'data/raw/raw_data.csv', 'idempotent': True},
    {'task': 'validate_data', 'inputs': 'data/raw/raw_data.csv', 'outputs': 'reports/validation_report.json', 'idempotent': True},
    {'task': 'clean_data', 'inputs': 'data/raw/raw_data.csv', 'outputs': 'data/processed/cleaned_data.csv', 'idempotent': True},
    {'task': 'feature_engineering', 'inputs': 'data/processed/cleaned_data.csv', 'outputs': 'data/features/features.csv', 'idempotent': True},
    {'task': 'train_model', 'inputs': 'data/features/features.csv', 'outputs': 'model/model.pkl', 'idempotent': False},
    {'task': 'evaluate_model', 'inputs': ('model/model.pkl', 'data/features/features.csv'), 'outputs': 'reports/evaluation_metrics.json', 'idempotent': True},
    {'task': 'generate_report', 'inputs': 'reports/evaluation_metrics.json', 'outputs': 'reports/final_report.md', 'idempotent': True}
])

tasks

|   | task | inputs | outputs | idempotent |
|---|------|--------|---------|------------|
| 0 | ingest_data | raw_data_source_url | data/raw/raw_data.csv | True |
| 1 | validate_data | data/raw/raw_data.csv | reports/validation_report.json | True |
| 2 | clean_data | data/raw/raw_data.csv | data/processed/cleaned_data.csv | True |
| 3 | feature_engineering | data/processed/cleaned_data.csv | data/features/features.csv | True |
| 4 | train_model | data/features/features.csv | model/model.pkl | False |
| 5 | evaluate_model | (model/model.pkl, data/features/features.csv) | reports/evaluation_metrics.json | True |
| 6 | generate_report | reports/evaluation_metrics.json | reports/final_report.md | True |
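The `idempotent` column is a claim that can be spot-checked: running a task twice on the same input should leave byte-identical output. Below is a minimal standard-library sketch of such a check. `check_idempotent`, `drop_incomplete_rows` (a stand-in for `clean_data`) and `append_rows` (a deliberately non-idempotent counter-example) are hypothetical helpers for illustration, not part of the project.

```python
import csv
import hashlib
import tempfile
from pathlib import Path

def file_digest(path):
    """SHA-256 of a file's bytes."""
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()

def check_idempotent(task, input_path, output_path):
    """Run task(input_path, output_path) twice; True if both runs leave identical output bytes."""
    task(input_path, output_path)
    first = file_digest(output_path)
    task(input_path, output_path)
    return first == file_digest(output_path)

def drop_incomplete_rows(input_path, output_path):
    """Hypothetical stand-in for clean_data: overwrite the output with rows that have no empty cells."""
    with open(input_path, newline='') as f:
        header, *body = list(csv.reader(f))
    kept = [row for row in body if all(cell != '' for cell in row)]
    with open(output_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(kept)

def append_rows(input_path, output_path):
    """Counter-example: appending instead of overwriting means every rerun changes the output."""
    with open(output_path, 'a') as f:
        f.write(Path(input_path).read_text())

# Work in a temporary directory so no project files are touched.
workdir = Path(tempfile.mkdtemp())
raw = workdir / 'raw.csv'
raw.write_text('A,B\n1,5\n2,\n,7\n4,8\n')
print(check_idempotent(drop_incomplete_rows, raw, workdir / 'clean.csv'))  # True
print(check_idempotent(append_rows, raw, workdir / 'appended.csv'))        # False
```

Identical output across two consecutive runs is necessary but not sufficient for idempotency; it will not catch, for example, a task whose result depends on the current date.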


## 2) Dependencies (DAG)
Describe dependencies and paste a small diagram if you have one.

In [12]:
dag = {
    'ingest_data': [],
    'validate_data': ['ingest_data'],
    'clean_data': ['ingest_data'],
    'feature_engineering': ['clean_data'],
    'train_model': ['feature_engineering'],
    'evaluate_model': ['train_model', 'feature_engineering'],
    'generate_report': ['evaluate_model']
}
dag

{'ingest_data': [],
 'validate_data': ['ingest_data'],
 'clean_data': ['ingest_data'],
 'feature_engineering': ['clean_data'],
 'train_model': ['feature_engineering'],
 'evaluate_model': ['train_model', 'feature_engineering'],
 'generate_report': ['evaluate_model']}
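Because every task in Section 1 declares its inputs and outputs, the hand-written `dag` can be cross-checked against the task table. The sketch below uses a hypothetical helper, `derive_dependencies`, and plain Python lists instead of pandas. It treats a task as depending on whichever task produces one of its inputs and ignores inputs with no producer, such as `raw_data_source_url`.

```python
# Same rows as the Section 1 task table (idempotent flag omitted).
task_rows = [
    {'task': 'ingest_data', 'inputs': 'raw_data_source_url', 'outputs': 'data/raw/raw_data.csv'},
    {'task': 'validate_data', 'inputs': 'data/raw/raw_data.csv', 'outputs': 'reports/validation_report.json'},
    {'task': 'clean_data', 'inputs': 'data/raw/raw_data.csv', 'outputs': 'data/processed/cleaned_data.csv'},
    {'task': 'feature_engineering', 'inputs': 'data/processed/cleaned_data.csv', 'outputs': 'data/features/features.csv'},
    {'task': 'train_model', 'inputs': 'data/features/features.csv', 'outputs': 'model/model.pkl'},
    {'task': 'evaluate_model', 'inputs': ('model/model.pkl', 'data/features/features.csv'), 'outputs': 'reports/evaluation_metrics.json'},
    {'task': 'generate_report', 'inputs': 'reports/evaluation_metrics.json', 'outputs': 'reports/final_report.md'},
]

# The hand-written dependency map from the cell above.
dag = {
    'ingest_data': [],
    'validate_data': ['ingest_data'],
    'clean_data': ['ingest_data'],
    'feature_engineering': ['clean_data'],
    'train_model': ['feature_engineering'],
    'evaluate_model': ['train_model', 'feature_engineering'],
    'generate_report': ['evaluate_model'],
}

def derive_dependencies(rows):
    """Map each task to the tasks that produce its inputs; inputs with no producer are external and ignored."""
    producer = {row['outputs']: row['task'] for row in rows}
    deps = {}
    for row in rows:
        inputs = row['inputs']
        if isinstance(inputs, str):  # a single input is stored as a plain string
            inputs = (inputs,)
        deps[row['task']] = [producer[i] for i in inputs if i in producer]
    return deps

derived = derive_dependencies(task_rows)
print(derived['evaluate_model'])  # ['train_model', 'feature_engineering']
print(derived == dag)             # True
```

The comparison also makes one gap visible: no task lists `reports/validation_report.json` as an input, so, as the graph stands, a failed validation does not block `clean_data`.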

The dependency graph is mostly linear. The one branch point is right after `ingest_data`: `validate_data` and `clean_data` both depend only on it, so they can run in parallel.

**DAG Flow:**

`ingest_data` -> `validate_data` & `clean_data` (these two can run in parallel after ingestion).

`clean_data` -> `feature_engineering`

`feature_engineering` -> `train_model`

(`train_model`, `feature_engineering`) -> `evaluate_model`

`evaluate_model` -> `generate_report`

**Visual Sketch:**

    [ingest] --+--> [validate]
               |
               +--> [clean] --> [features] --> [train] --> [evaluate] --> [report]
                                    |                          ^
                                    +--------------------------+
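The parallel branch can be computed rather than read off by eye. Here is a minimal sketch using the standard-library `graphlib.TopologicalSorter` (Python 3.9+), which accepts exactly this node-to-predecessors mapping; `parallel_stages` is a hypothetical helper name.

```python
from graphlib import TopologicalSorter

# Same dependency map as above: each task maps to the tasks it depends on.
dag = {
    'ingest_data': [],
    'validate_data': ['ingest_data'],
    'clean_data': ['ingest_data'],
    'feature_engineering': ['clean_data'],
    'train_model': ['feature_engineering'],
    'evaluate_model': ['train_model', 'feature_engineering'],
    'generate_report': ['evaluate_model'],
}

def parallel_stages(graph):
    """Group tasks into stages; every task's dependencies sit in earlier stages."""
    ts = TopologicalSorter(graph)
    ts.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    stages = []
    while ts.is_active():
        ready = sorted(ts.get_ready())  # all tasks whose dependencies are done
        stages.append(ready)
        ts.done(*ready)
    return stages

for i, stage in enumerate(parallel_stages(dag), start=1):
    print(f'stage {i}: {stage}')
# stage 2 is ['clean_data', 'validate_data']; every other stage holds a single task
```

Each inner list is a batch whose tasks do not depend on one another, so an orchestrator could run them concurrently.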

## 3) Logging & Checkpoints Plan
Specify what you will log and where you will checkpoint for each task.

In [13]:
logging_plan = pd.DataFrame([
    {'task': 'ingest_data', 'log_messages': 'Start/end time, source URL, number of records ingested.', 'checkpoint_artifact': 'data/raw/raw_data.csv'},
    {'task': 'validate_data', 'log_messages': 'Start/end time, validation status (pass/fail), list of validation errors.', 'checkpoint_artifact': 'reports/validation_report.json'},
    {'task': 'clean_data', 'log_messages': 'Start/end time, input/output row count, number of nulls handled.', 'checkpoint_artifact': 'data/processed/cleaned_data.csv'},
    {'task': 'feature_engineering', 'log_messages': 'Start/end time, list of features created, output data shape.', 'checkpoint_artifact': 'data/features/features.csv'},
    {'task': 'train_model', 'log_messages': 'Start/end time, model parameters, training duration, final training score.', 'checkpoint_artifact': 'model/model.pkl'},
    {'task': 'evaluate_model', 'log_messages': 'Start/end time, evaluation metrics (MAE, R2), metric thresholds passed/failed.', 'checkpoint_artifact': 'reports/evaluation_metrics.json'},
    {'task': 'generate_report', 'log_messages': 'Start/end time, path to generated report file.', 'checkpoint_artifact': 'reports/final_report.md'}
])
logging_plan

|   | task | log_messages | checkpoint_artifact |
|---|------|--------------|---------------------|
| 0 | ingest_data | Start/end time, source URL, number of records ... | data/raw/raw_data.csv |
| 1 | validate_data | Start/end time, validation status (pass/fail),... | reports/validation_report.json |
| 2 | clean_data | Start/end time, input/output row count, number... | data/processed/cleaned_data.csv |
| 3 | feature_engineering | Start/end time, list of features created, outp... | data/features/features.csv |
| 4 | train_model | Start/end time, model parameters, training dur... | model/model.pkl |
| 5 | evaluate_model | Start/end time, evaluation metrics (MAE, R2), ... | reports/evaluation_metrics.json |
| 6 | generate_report | Start/end time, path to generated report file. | reports/final_report.md |
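The plan pairs each task with a checkpoint artifact. One minimal way to act on that, assuming "the artifact exists" is an acceptable signal that a task is done, is a decorator that logs start/end and skips the task when its artifact is already on disk. `checkpointed` and its `force` flag are hypothetical names, not taken from any orchestration library.

```python
import logging
import tempfile
import time
from functools import wraps
from pathlib import Path

def checkpointed(artifact):
    """Hypothetical helper: skip a task if its checkpoint artifact exists; otherwise run it and log timing."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, force=False, **kwargs):
            path = Path(artifact)
            if path.exists() and not force:
                logging.info('[%s] checkpoint %s exists; skipping', func.__name__, path)
                return path
            logging.info('[%s] start', func.__name__)
            start = time.perf_counter()
            func(*args, **kwargs)
            if not path.exists():  # the task must actually produce its checkpoint
                raise RuntimeError(f'{func.__name__} finished but did not write {path}')
            logging.info('[%s] end after %.2fs; wrote %s', func.__name__, time.perf_counter() - start, path)
            return path
        return wrapper
    return decorator

# Demo in a temporary directory so no real project files are touched.
workdir = Path(tempfile.mkdtemp())
report_path = workdir / 'reports' / 'final_report.md'
calls = []

@checkpointed(report_path)
def generate_report():
    calls.append('run')
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text('# Final report\n')

generate_report()            # runs and writes the artifact
generate_report()            # skipped: the checkpoint already exists
generate_report(force=True)  # reruns despite the checkpoint
print(len(calls))  # 2
```

Checking existence alone will not notice a stale artifact after upstream inputs change; comparing input hashes or modification times would be a natural next step.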


## 4) Right-Sizing Automation
Which parts will you automate now? Which stay manual? Why?

For the current project phase, our automation strategy focuses on balancing efficiency with the need for human oversight.

**What to Automate Now:**
We will automate the core data processing and model training pipeline, from `ingest_data` through `evaluate_model`. These tasks are repetitive, well specified, and form the backbone of our workflow; all of them except `train_model` are marked idempotent in the task table, so they can be rerun safely. Automating them ensures consistency, reduces manual error, and lets the data science team focus on analysis rather than execution. The pipeline can be scheduled to run weekly so the model is retrained on fresh data.
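Automating `ingest_data` through `evaluate_model` while leaving `generate_report` manual amounts to running the DAG minus one leaf task. Below is a minimal sketch, assuming each task can be wrapped as a zero-argument callable. `run_pipeline` and the recording stand-ins are hypothetical; a weekly scheduler would invoke something like this.

```python
from graphlib import TopologicalSorter

dag = {
    'ingest_data': [],
    'validate_data': ['ingest_data'],
    'clean_data': ['ingest_data'],
    'feature_engineering': ['clean_data'],
    'train_model': ['feature_engineering'],
    'evaluate_model': ['train_model', 'feature_engineering'],
    'generate_report': ['evaluate_model'],
}

def run_pipeline(dag, runners, skip=()):
    """Run every task in dependency order except those in `skip`.

    Assumes `skip` only holds tasks nothing else depends on (here: generate_report);
    this sketch does not check that.
    """
    order = [name for name in TopologicalSorter(dag).static_order() if name not in skip]
    for name in order:
        runners[name]()  # an exception stops the run; later tasks are not attempted
    return order

executed = []
# Hypothetical stand-ins for the real task functions: each just records its own name.
runners = {name: (lambda n=name: executed.append(n)) for name in dag}

order = run_pipeline(dag, runners, skip={'generate_report'})
print(order)
```

`static_order()` only guarantees a valid dependency order; the relative order of independent tasks such as `validate_data` and `clean_data` is not specified.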

**What to Keep Manual (For Now):**
The final `generate_report` task and the subsequent decision to deploy the newly trained model into production will remain manual. The evaluation metrics from the `evaluate_model` step will be automatically generated, but a data scientist must review them to provide context and interpretation. This human-in-the-loop step is critical to catch subtle model degradations or anomalies that automated thresholds might miss. Model deployment should only occur after this manual sign-off to mitigate business risk.
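The automated half of this hand-off could be a threshold gate on `reports/evaluation_metrics.json` that flags pass/fail for the reviewer, matching the "metric thresholds passed/failed" entry planned for `evaluate_model`'s log. The metric keys (`mae`, `r2`) and the limits below are assumptions for illustration. Passing the gate only marks a run as ready for review; it does not deploy anything.

```python
import json

# Hypothetical limits: MAE must not exceed 5.0 and R2 must not fall below 0.7.
THRESHOLDS = {'mae': ('max', 5.0), 'r2': ('min', 0.7)}

def gate(metrics, thresholds=THRESHOLDS):
    """Return (passed, failures) for a dict of metric values."""
    failures = []
    for name, (kind, limit) in thresholds.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f'{name}: missing')
        elif kind == 'max' and value > limit:
            failures.append(f'{name}={value} > {limit}')
        elif kind == 'min' and value < limit:
            failures.append(f'{name}={value} < {limit}')
    return (not failures, failures)

# In the pipeline this JSON would be read from reports/evaluation_metrics.json.
metrics = json.loads('{"mae": 6.1, "r2": 0.81}')
passed, failures = gate(metrics)
print(passed, failures)  # False ['mae=6.1 > 5.0']
```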

## 5) (Stretch) Refactor One Task into a Function + CLI
Use the templates below.

In [15]:
import argparse
import logging
import sys
from pathlib import Path

import numpy as np
import pandas as pd

def clean_data(input_path: str, output_path: str) -> None:
    """
    Read raw data from input_path, apply simple cleaning,
    and save the result to output_path.

    Cleaning logic: drop rows with any missing values.
    Errors are raised to the caller instead of exiting the process,
    so the function can be reused from a notebook or an orchestrator.
    """
    logging.info(f'[clean_data] start: Cleaning data from {input_path}')

    df = pd.read_csv(input_path)  # raises FileNotFoundError if the input is missing
    rows_before = len(df)

    # Simple cleaning: drop rows with nulls
    df_cleaned = df.dropna()
    rows_after = len(df_cleaned)

    # Ensure output directory exists
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    df_cleaned.to_csv(output_path, index=False)

    logging.info(f'[clean_data] completed. Rows before: {rows_before}, rows after: {rows_after}.')
    logging.info(f'[clean_data] wrote cleaned data to {output_path}')

def main(argv=None) -> int:
    """Parse command-line arguments, run the clean_data task, and return an exit code."""
    parser = argparse.ArgumentParser(description='Data cleaning task wrapper')
    parser.add_argument('--input', required=True, help='Path to the raw input CSV file.')
    parser.add_argument('--output', required=True, help='Path to save the cleaned output CSV file.')
    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s',
                        handlers=[logging.StreamHandler(sys.stdout)])

    try:
        clean_data(args.input, args.output)
    except FileNotFoundError:
        logging.error(f'[clean_data] error: Input file not found at {args.input}')
        return 1
    except Exception:
        logging.exception('[clean_data] an unexpected error occurred')  # includes the traceback
        return 1
    return 0

# In a standalone script (e.g. clean_data.py), finish with:
#     if __name__ == '__main__':
#         sys.exit(main())
# The guard is omitted here because __name__ is also '__main__' inside a notebook kernel.

# To demonstrate, create a dummy input file first.
# In a real scenario, this would already exist from the 'ingest' step.
Path('data/raw').mkdir(parents=True, exist_ok=True)
dummy_df = pd.DataFrame({'A': [1, 2, np.nan, 4], 'B': [5, np.nan, 7, 8]})
dummy_df.to_csv('data/raw/dummy_raw.csv', index=False)

# Simulate the CLI call within the notebook:
print("--- Simulating CLI call ---")
exit_code = main(['--input', 'data/raw/dummy_raw.csv', '--output', 'data/processed/dummy_cleaned.csv'])

--- Simulating CLI call ---
2025-08-28 02:12:03,209 - INFO - [clean_data] start: Cleaning data from data/raw/dummy_raw.csv
2025-08-28 02:12:03,216 - INFO - [clean_data] completed. Rows before: 4, rows after: 2.
2025-08-28 02:12:03,217 - INFO - [clean_data] wrote cleaned data to data/processed/dummy_cleaned.csv


### Optional: Simple Retry Wrapper (fill in)
Add a small retry with linear backoff to harden a task.

In [16]:
import functools
import logging
import time

def retry(n_tries=3, delay_s=1):
    """
    Decorator that retries a function if it raises, using linear backoff:
    the wait after the k-th failed attempt is k * delay_s seconds.
    """
    def decorator(func):
        @functools.wraps(func)  # keep func.__name__ for logging and introspection
        def wrapper(*args, **kwargs):
            for attempt in range(1, n_tries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt < n_tries:
                        wait = delay_s * attempt  # linear backoff
                        logging.warning(f"Attempt {attempt}/{n_tries} failed for {func.__name__}. Retrying in {wait}s... Error: {e}")
                        time.sleep(wait)
                    else:
                        logging.error(f"All {n_tries} attempts failed for {func.__name__}. Error: {e}")
                        raise  # Re-raise the last exception
        return wrapper
    return decorator

# --- Example Usage ---
@retry(n_tries=3, delay_s=0.5)
def sometimes_fail_task(attempt_to_succeed):
    """
    A dummy function that fails until a certain attempt number.
    """
    global attempt_counter
    attempt_counter += 1
    if attempt_counter < attempt_to_succeed:
        raise ValueError(f"Simulating failure on attempt {attempt_counter}")
    else:
        print(f"Succeeded on attempt {attempt_counter}!")
        return "Success"

# Test the retry wrapper
attempt_counter = 0
print("Testing retry wrapper (should succeed on 3rd try):")
sometimes_fail_task(attempt_to_succeed=3)

attempt_counter = 0
print("\nTesting retry wrapper (should fail all 3 times):")
try:
    sometimes_fail_task(attempt_to_succeed=4)
except ValueError as e:
    print(f"Caught expected exception after all retries: {e}")

Testing retry wrapper (should succeed on 3rd try):
Succeeded on attempt 3!

Testing retry wrapper (should fail all 3 times):
2025-08-28 02:12:13,888 - ERROR - All 3 attempts failed for sometimes_fail_task. Error: Simulating failure on attempt 3
Caught expected exception after all retries: Simulating failure on attempt 3
