In [1]:
from pathlib import Path
import pandas as pd

# Pipeline task inventory: one record per stage, listed in execution order.
# NOTE(review): train_or_score reads data/processed/prices_with_tech_features_model*.csv
# and report reads reports/full_analysis_metrics.csv, but no task lists either file as an
# output — confirm which step produces them (a missing feature/metrics stage?).
# NOTE(review): ingest calls a live API, so idempotent=True there presumably assumes a
# fixed, fully historical date range in .env — confirm. The raw paths also hard-code AAPL
# even though the ticker comes from .env.
tasks = pd.DataFrame([
    dict(task='ingest',             # pull raw prices from the API
         inputs='.env (TICKER, date range)',
         outputs='data/raw/api_yfinance_AAPL_<ts>.csv',
         idempotent=True),
    dict(task='clean',              # preprocess / validate / save preprocessed prices
         inputs='data/raw/api_yfinance_AAPL_*.csv',
         outputs='data/processed/prices_preprocessed_<ts>.csv',
         idempotent=True),
    dict(task='train_or_score',     # train (or load) the model and score the holdout
         inputs='data/processed/prices_with_tech_features_model*.csv',
         outputs='model/model.pkl  (plus reports/holdout_predictions.csv)',
         idempotent=True),
    dict(task='report',             # write stakeholder-facing artifacts
         inputs='reports/full_analysis_metrics.csv',
         outputs='reports/summary.md  (plus reports/threshold_sweep.png)',
         idempotent=True),
])
tasks


Unnamed: 0,task,inputs,outputs,idempotent
0,ingest,".env (TICKER, date range)",data/raw/api_yfinance_AAPL_<ts>.csv,True
1,clean,data/raw/api_yfinance_AAPL_*.csv,data/processed/prices_preprocessed_<ts>.csv,True
2,train_or_score,data/processed/prices_with_tech_features_model...,model/model.pkl (plus reports/holdout_predict...,True
3,report,reports/full_analysis_metrics.csv,reports/summary.md (plus reports/threshold_sw...,True


In [2]:
# Task dependencies: each key maps to the list of tasks it waits on.
# The pipeline is a simple linear chain:
#   ingest --> clean --> train_or_score --> report
dag = dict(
    ingest=[],
    clean=['ingest'],
    train_or_score=['clean'],
    report=['train_or_score'],
)
dag


{'ingest': [],
 'clean': ['ingest'],
 'train_or_score': ['clean'],
 'report': ['train_or_score']}

In [3]:
# Logging plan: for each task, what gets logged and which artifact it writes as its checkpoint.
logging_plan = pd.DataFrame(
    [
        ('ingest',
         'start/end, params (ticker, dates), source, rows downloaded',
         'data/raw/api_yfinance_AAPL_<ts>.csv'),
        ('clean',
         'start/end, input file, rows in/out, NA counts, schema hash',
         'data/processed/prices_preprocessed_<ts>.csv'),
        ('train_or_score',
         'seed, features, threshold, holdout size, metrics (acc, f1, auc)',
         'model/model.pkl  & reports/holdout_predictions.csv'),
        ('report',
         'artifact paths written, figure names, summary bytes',
         'reports/summary.md  & reports/threshold_sweep.png'),
    ],
    columns=['task', 'log_messages', 'checkpoint_artifact'],
)
logging_plan


Unnamed: 0,task,log_messages,checkpoint_artifact
0,ingest,"start/end, params (ticker, dates), source, row...",data/raw/api_yfinance_AAPL_<ts>.csv
1,clean,"start/end, input file, rows in/out, NA counts,...",data/processed/prices_preprocessed_<ts>.csv
2,train_or_score,"seed, features, threshold, holdout size, metri...",model/model.pkl & reports/holdout_predictions...
3,report,"artifact paths written, figure names, summary ...",reports/summary.md & reports/threshold_sweep.png


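- **Automate now**: clean → train_or_score → report via one CLI (or the Flask `/run_full_analysis` endpoint). These steps are already coded and deterministic (given a fixed training seed); clean writes timestamped artifacts, while train_or_score and report write to fixed paths (`model/model.pkl`, `reports/summary.md`).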

- **Keep manual (for course scope)**: ingest (API limits/creds/one-off date ranges). It’s easy to re-run as needed, and manual control avoids accidental rate-limit hits.

- **Retry / failure policy**: If clean detects schema or NA issues, stop and log the error. If train_or_score yields AUC < 0.55, keep the last good `model.pkl` and flag the run in the logs. Report runs only if the metrics file exists.

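- **Idempotency**:
  - ingest and clean write new, timestamped outputs, so re-runs never overwrite earlier artifacts.
  - train_or_score and report write to fixed paths (`model/model.pkl`, `reports/summary.md`) and overwrite them on each run.
  - Re-runs are repeatable only if training passes an explicit seed (e.g., `random_state=42`). scikit-learn's default `random_state=None` draws from the unseeded global NumPy RNG, so the defaults do *not* fix a seed.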

In [None]:
import argparse, json, logging, sys
from datetime import datetime
from pathlib import Path
import pandas as pd

def _fmt_metric(value) -> str:
    """Format one metric value with 3 decimals for the stakeholder summary.

    Returns 'n/a' when the value is missing (None), NaN (e.g. an empty CSV cell),
    or not convertible to float. Applying ':.3f' directly to a 'n/a' string
    placeholder raises ValueError, so missing metrics must be handled here.
    """
    import math

    if value is None:
        return 'n/a'
    try:
        number = float(value)
    except (TypeError, ValueError):
        return 'n/a'
    return 'n/a' if math.isnan(number) else f'{number:.3f}'


def my_task(input_path: str, output_path: str) -> None:
    """Example task: read metrics → write summary.md (stakeholder-friendly).

    Reads the FIRST row of the metrics CSV at `input_path` and writes a Markdown
    summary to `output_path`, creating parent directories as needed. Metrics that
    are absent, empty, or non-numeric are rendered as 'n/a' rather than crashing.

    Args:
        input_path: path to a metrics CSV (columns such as accuracy, precision,
            recall, f1, roc_auc).
        output_path: destination path of the Markdown summary (overwritten).

    Raises:
        FileNotFoundError: if `input_path` does not exist.
        ValueError: if the metrics CSV contains no data rows.
    """
    # Local import: the file-level import only brings in the `datetime` class.
    from datetime import timezone

    logging.info('[my_task] start: reading %s', input_path)
    p = Path(input_path)
    if not p.exists():
        raise FileNotFoundError(f"Metrics file not found: {p}")

    df = pd.read_csv(p)
    if df.empty:
        # Without this guard, df.iloc[0] would fail with an opaque IndexError.
        raise ValueError(f"Metrics file has no data rows: {p}")
    row = df.iloc[0].to_dict()  # only the first row is summarized

    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12),
    # rendered with a trailing 'Z' as in the original output.
    run_at = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

    summary = f"""# Stakeholder Summary (Auto-generated)

**Run at:** {run_at}  
**Source metrics:** {p.as_posix()}

## Key Metrics
- Accuracy: {_fmt_metric(row.get('accuracy'))}
- Precision: {_fmt_metric(row.get('precision'))}
- Recall: {_fmt_metric(row.get('recall'))}
- F1: {_fmt_metric(row.get('f1'))}
- ROC AUC: {_fmt_metric(row.get('roc_auc'))}

## Notes
- Threshold and features documented inside model bundle.
- See `reports/threshold_sweep.png` for precision/recall trade-offs.
"""

    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(summary, encoding="utf-8")
    logging.info('[my_task] wrote %s', out)

def main(argv=None):
    """CLI entry point: parse --input/--output, route INFO logs to stdout, run my_task.

    Args:
        argv: argument list to parse; None means argparse reads sys.argv[1:].

    Both flags are required; argparse exits with status 2 if either is missing.
    """
    cli = argparse.ArgumentParser(description='Homework Stage 15 – Refactor demo')
    flag_specs = (
        ('--input', 'Path to metrics CSV (e.g., reports/full_analysis_metrics.csv)'),
        ('--output', 'Path to summary file (e.g., reports/summary.md)'),
    )
    for flag, help_text in flag_specs:
        cli.add_argument(flag, required=True, help=help_text)
    opts = cli.parse_args(argv)

    # Log to stdout (not stderr) so messages show up as regular cell/CLI output.
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=logging.INFO,
        handlers=[stdout_handler],
        format='%(asctime)s %(levelname)s %(message)s',
    )
    my_task(opts.input, opts.output)

if __name__ == '__main__':
    # Inside a Jupyter kernel, sys.argv typically holds the kernel's own flags
    # (e.g. `-f <connection-file>.json`), which argparse would reject, so fall back
    # to the demo paths there. Run as a script (`python stage15.py --input ... --output ...`),
    # the real command line is honored instead of being silently overridden.
    _NOTEBOOK_DEMO_ARGS = ['--input', '../reports/full_analysis_metrics.csv',
                           '--output', '../reports/summary.md']
    main(_NOTEBOOK_DEMO_ARGS if 'ipykernel' in sys.modules else None)


2025-08-28 17:22:40,107 INFO [my_task] start: reading ../reports/full_analysis_metrics.csv
2025-08-28 17:22:40,110 INFO [my_task] wrote ../reports/summary.md
