# Week 6 — Part 01: From scripts to pipelines (stages + artifacts)

**Estimated time:** 75–120 minutes

## Learning Objectives

- Define pipeline stages with clear inputs/outputs
- Understand how artifacts improve reproducibility and debugging
- Design stage contracts (inputs, outputs, invariants)
- Implement a simple pipeline skeleton with explicit stages


## Overview

A pipeline is an ordered sequence of stages, where each stage consumes what earlier stages produced.

Each stage should have:

- clear inputs
- clear outputs
- a single responsibility

This structure improves:

- debugging (you can isolate failures)
- reproducibility (you save intermediate artifacts)

---

## Underlying theory: pipelines make dataflow explicit

You can view a pipeline as a composition of functions:

$$
Y = (f_k \circ f_{k-1} \circ \cdots \circ f_1)(X)
$$
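To make the composition concrete, here is a minimal sketch that builds $f_k \circ \cdots \circ f_1$ with `functools.reduce`. The three toy stages (`parse`, `keep_positive`, `total`) and the `compose` helper are hypothetical illustrations, not part of the capstone code. Note that `compose` takes stages in application order ($f_1$ first), which reads more naturally for pipelines than the right-to-left order of $\circ$.

```python
from __future__ import annotations

from functools import reduce
from typing import Callable


# Hypothetical toy stages: each maps one intermediate value to the next.
def parse(x: str) -> list[int]:
    return [int(tok) for tok in x.split(",")]


def keep_positive(xs: list[int]) -> list[int]:
    return [v for v in xs if v > 0]


def total(xs: list[int]) -> int:
    return sum(xs)


def compose(*stages: Callable) -> Callable:
    """Return f_k ∘ ... ∘ f_1, applying the given stages left to right."""
    return lambda x: reduce(lambda acc, f: f(acc), stages, x)


pipeline = compose(parse, keep_positive, total)
result = pipeline("3,-1,4")  # parse -> [3, -1, 4], filter -> [3, 4], sum -> 7
print(result)  # 7
```

With no stages, `compose()` is the identity function, since `reduce` simply returns its initial value.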

Each stage should have a small, testable contract:

- **inputs**: what files/values it needs
- **outputs**: what artifacts it produces
- **invariants**: what must be true after it runs (schema, counts, non-empty, etc.)
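One way to make the three parts of a contract executable is to attach them to the stage and check them around each run. This is a sketch under assumptions: `StageContract`, `run_checked`, and the `rows`/`profile` keys are hypothetical names chosen for illustration, and invariants are modeled as plain predicates over the context.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable


@dataclass
class StageContract:
    name: str
    run: Callable[[dict], dict]
    inputs: list[str]   # context keys that must exist before the stage runs
    outputs: list[str]  # context keys the stage must add
    invariants: list[Callable[[dict], bool]] = field(default_factory=list)


def run_checked(stage: StageContract, ctx: dict) -> dict:
    missing_in = [k for k in stage.inputs if k not in ctx]
    if missing_in:
        raise KeyError(f"{stage.name}: missing inputs {missing_in}")
    ctx = stage.run(ctx)
    missing_out = [k for k in stage.outputs if k not in ctx]
    if missing_out:
        raise RuntimeError(f"{stage.name}: missing outputs {missing_out}")
    # Invariants: post-conditions that must hold after the stage runs.
    for i, check in enumerate(stage.invariants):
        if not check(ctx):
            raise ValueError(f"{stage.name}: invariant #{i} failed")
    return ctx


def profile(ctx: dict) -> dict:
    rows = ctx["rows"]
    n_cols = len(rows[0]) if rows else 0
    return {**ctx, "profile": {"n_rows": len(rows), "n_cols": n_cols}}


profile_stage = StageContract(
    name="profile",
    run=profile,
    inputs=["rows"],
    outputs=["profile"],
    invariants=[lambda c: c["profile"]["n_rows"] > 0],  # result must be non-empty
)

ctx = run_checked(profile_stage, {"rows": [[1, 2], [3, 4]]})
print(ctx["profile"])  # {'n_rows': 2, 'n_cols': 2}
```

Checking inputs before the run and invariants after it means a failure names the stage that broke the contract, rather than surfacing later as a confusing error in a downstream stage.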

Practical implications:

- bugs are easier to locate because you can inspect each stage’s artifact and bisect the pipeline
- you can cache and reuse artifacts (don’t redo expensive work unnecessarily)
- you can re-run only the stage you changed (faster iteration)
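Caching can be as simple as "if the artifact file already exists, load it instead of recomputing". The sketch below assumes JSON artifacts and uses a hypothetical `cached_stage` helper and `expensive_profile` stand-in; a temporary directory replaces `output/` so the example leaves nothing behind.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def cached_stage(out_path: Path, compute: Callable[[], dict]) -> dict:
    """Load the artifact if it exists; otherwise compute it and save it."""
    if out_path.exists():
        return json.loads(out_path.read_text(encoding="utf-8"))
    result = compute()
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result), encoding="utf-8")
    return result


calls = {"n": 0}  # counts how often the expensive work actually runs


def expensive_profile() -> dict:
    calls["n"] += 1
    return {"rows": 100, "cols": 5}


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "output" / "profile.json"
    first = cached_stage(path, expensive_profile)   # computes and writes the file
    second = cached_stage(path, expensive_profile)  # reads the saved file
    print(first == second, calls["n"])  # True 1
```

The cache key here is only the file path, so if an upstream stage or the input data changes you must delete the stale artifact yourself; hashing the inputs into the filename is a common refinement.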

A minimal skeleton: each `Stage` declares the context keys it must produce, and `run_pipeline` checks them after every stage.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class Stage:
    name: str
    run: Callable[[dict], dict]  # receives the shared context, returns the updated context
    outputs: list[str]  # context keys this stage promises to produce


def run_pipeline(stages: list[Stage], context: dict) -> dict:
    for stage in stages:
        print(f"running stage: {stage.name}")
        context = stage.run(context)
        # Enforce the output contract: fail fast if a promised key is missing.
        for key in stage.outputs:
            if key not in context:
                raise RuntimeError(f"stage {stage.name} did not produce output: {key}")
    return context


# Example stage stubs (replace with real logic)

def load_stage(ctx: dict) -> dict:
    ctx["loaded"] = "loaded_table_placeholder"
    return ctx


def profile_stage(ctx: dict) -> dict:
    ctx["profile"] = {"rows": 100, "cols": 5}
    return ctx


def compress_stage(ctx: dict) -> dict:
    ctx["compressed"] = {"sample_rows": 5}
    return ctx


def llm_stage(ctx: dict) -> dict:
    ctx["llm_raw"] = "raw text"
    ctx["llm_validated"] = {"summary": "..."}
    return ctx


def report_stage(ctx: dict) -> dict:
    ctx["report"] = {"title": "Demo Report"}
    return ctx


pipeline = [
    Stage("load", load_stage, ["loaded"]),
    Stage("profile", profile_stage, ["profile"]),
    Stage("compress", compress_stage, ["compressed"]),
    Stage("llm", llm_stage, ["llm_raw", "llm_validated"]),
    Stage("report", report_stage, ["report"]),
]

final_ctx = run_pipeline(pipeline, {})
print("final keys:", sorted(final_ctx.keys()))
```
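Re-running only part of the pipeline needs two things: a way to start at a named stage, and a context that already holds the outputs of the earlier stages (for example, restored from saved artifacts). The sketch below assumes a hypothetical `run_from` helper and redefines a minimal `Stage` so it runs on its own; the stubs record which stages actually executed.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class Stage:
    name: str
    run: Callable[[dict], dict]
    outputs: list[str]


def run_from(stages: list[Stage], context: dict, start: str) -> dict:
    """Run stages beginning at `start`, skipping everything before it."""
    names = [s.name for s in stages]
    if start not in names:
        raise ValueError(f"unknown stage: {start}")
    for stage in stages[names.index(start):]:
        context = stage.run(context)
        for key in stage.outputs:
            if key not in context:
                raise RuntimeError(f"stage {stage.name} did not produce output: {key}")
    return context


ran: list[str] = []  # records which stages executed


def make_stub(name: str, key: str) -> Callable[[dict], dict]:
    def _run(ctx: dict) -> dict:
        ran.append(name)
        return {**ctx, key: f"{name} output"}
    return _run


stages = [
    Stage("profile", make_stub("profile", "profile"), ["profile"]),
    Stage("llm", make_stub("llm", "llm_validated"), ["llm_validated"]),
    Stage("report", make_stub("report", "report"), ["report"]),
]

saved = {"profile": "profile output"}  # assumed restored from an earlier run
final = run_from(stages, saved, start="llm")
print(ran)  # ['llm', 'report']
```

Only the changed stage and everything downstream of it run; the profile stage is skipped because its output is already in the context.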

## Suggested capstone stages

Make the contract explicit for each stage:

1. **Load**
   - Inputs: `data/*.csv`
   - Outputs: an in-memory table, or `output/loaded.parquet` if you persist it
   - Pitfalls: dtype drift, missing columns, encoding issues

2. **Profile**
   - Outputs: `output/profile.json` (and optionally `output/profile.md`)

3. **Compress**
   - Outputs: `output/compressed_input.json`
   - Goal: fit the key information into the LLM’s context window

4. **LLM**
   - Outputs: `output/llm_raw.txt` and `output/llm_validated.json`

5. **Report**
   - Outputs: `output/report.json` and `output/report.md`
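The Load pitfalls (missing columns, dtype drift, encoding issues) can be caught at the stage boundary instead of several stages later. The sketch below uses only the standard-library `csv` module, not the capstone’s actual loader; `load_csv`, the `required`/`numeric` parameters, and the `id`/`amount` columns are hypothetical.

```python
import csv
import tempfile
from pathlib import Path


def load_csv(path: Path, required: list[str], numeric: list[str]) -> list[dict]:
    # Explicit encoding avoids platform-dependent defaults; newline="" is what csv expects.
    with path.open(encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        missing = [c for c in required if c not in (reader.fieldnames or [])]
        if missing:
            raise ValueError(f"{path.name}: missing columns {missing}")
        rows = []
        # Line numbers assume one physical line per record (no multi-line quoted fields).
        for line_no, row in enumerate(reader, start=2):  # line 1 is the header
            for col in numeric:
                try:
                    row[col] = float(row[col])  # short rows yield None -> TypeError
                except (TypeError, ValueError):
                    raise ValueError(
                        f"{path.name}:{line_no}: {col}={row[col]!r} is not numeric"
                    ) from None
            rows.append(row)
    return rows


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "sales.csv"
    good.write_text("id,amount\n1,9.5\n2,3\n", encoding="utf-8")
    rows = load_csv(good, required=["id", "amount"], numeric=["amount"])
    print(rows)  # [{'id': '1', 'amount': 9.5}, {'id': '2', 'amount': 3.0}]
```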

Rule of thumb: if a stage fails, you should still have the previous stage’s artifact saved.
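A simple way to honor this rule is to write each stage’s artifact immediately after the stage finishes, before the next one starts. In this sketch, `run_and_persist` is a hypothetical helper, each stage is assumed to store its output under its own name in the context, and the LLM failure is simulated with a `TimeoutError`.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Callable


def run_and_persist(stages: list[tuple[str, Callable[[dict], dict]]], out_dir: Path) -> dict:
    out_dir.mkdir(parents=True, exist_ok=True)
    ctx: dict = {}
    for name, fn in stages:
        ctx = fn(ctx)
        # Save this stage's output before moving on, so a later failure cannot lose it.
        (out_dir / f"{name}.json").write_text(json.dumps(ctx[name]), encoding="utf-8")
    return ctx


def profile(ctx: dict) -> dict:
    return {**ctx, "profile": {"rows": 100, "cols": 5}}


def compressed(ctx: dict) -> dict:
    return {**ctx, "compressed": {"sample_rows": 5}}


def llm(ctx: dict) -> dict:
    raise TimeoutError("simulated LLM failure")


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    try:
        run_and_persist([("profile", profile), ("compressed", compressed), ("llm", llm)], out)
    except TimeoutError as exc:
        print("pipeline failed:", exc)
    saved = sorted(p.name for p in out.glob("*.json"))
    print(saved)  # ['compressed.json', 'profile.json']
```

The LLM stage failed, but both earlier artifacts are on disk, so a retry can start from `compressed.json` instead of from the raw data.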

```python
from pathlib import Path


def write_artifact(path: Path, data: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(data, encoding="utf-8")


def stage_with_artifact(name: str, out_path: Path, payload: str) -> dict:
    write_artifact(out_path, payload)
    return {name: str(out_path)}


ctx = {}
ctx.update(stage_with_artifact("profile", Path("output/profile.json"), "{}"))
ctx.update(stage_with_artifact("compressed", Path("output/compressed_input.json"), "{}"))
print(ctx)
```
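Once `compressed_input.json` is saved, the LLM stage can be re-run on its own by reading that artifact instead of recomputing everything upstream. This is a sketch only: `fake_llm` is a hypothetical stand-in for a real LLM client, `llm_stage_from_disk` is an illustrative name, and the JSON parsing step takes the place of whatever schema validation your capstone uses.

```python
import json
import tempfile
from pathlib import Path


def fake_llm(prompt: str) -> str:
    # Hypothetical stand-in for a real LLM client call.
    return json.dumps({"summary": f"{len(prompt)} chars of input"})


def llm_stage_from_disk(out_dir: Path) -> dict:
    compressed = json.loads((out_dir / "compressed_input.json").read_text(encoding="utf-8"))
    raw = fake_llm(json.dumps(compressed))
    # Save the raw response before parsing, so a parse failure still leaves evidence.
    (out_dir / "llm_raw.txt").write_text(raw, encoding="utf-8")
    validated = json.loads(raw)  # real code would also check the expected schema here
    (out_dir / "llm_validated.json").write_text(json.dumps(validated), encoding="utf-8")
    return validated


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    # Simulate the artifact left behind by an earlier run of the compress stage.
    (out / "compressed_input.json").write_text(json.dumps({"sample_rows": 5}), encoding="utf-8")
    result = llm_stage_from_disk(out)
    names = sorted(p.name for p in out.iterdir())
    print(names)  # ['compressed_input.json', 'llm_raw.txt', 'llm_validated.json']
```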

```python
def validate_stage_outputs(ctx: dict, required: list[str]) -> None:
    missing = [k for k in required if k not in ctx]
    if missing:
        raise ValueError(f"missing outputs: {missing}")


# TODO: add schema checks (e.g., profile has expected keys)
print("Implement schema checks in validate_stage_outputs().")
```
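For the TODO above, one possible direction is a small key-and-type check per artifact, reported all at once. This is a sketch, not a required design: `PROFILE_SCHEMA` and `check_schema` are hypothetical, the extra `schemas` argument is an assumed extension of `validate_stage_outputs`, and a library such as `jsonschema` or `pydantic` would be the heavier-weight alternative.

```python
from __future__ import annotations

# Hypothetical expected schema for the profile artifact: key -> required Python type.
PROFILE_SCHEMA: dict[str, type] = {"rows": int, "cols": int}


def check_schema(obj: dict, schema: dict[str, type], label: str) -> None:
    problems = []
    for key, expected in schema.items():
        if key not in obj:
            problems.append(f"missing key {key!r}")
        elif not isinstance(obj[key], expected):  # note: bool also passes an int check
            problems.append(f"{key!r} is {type(obj[key]).__name__}, expected {expected.__name__}")
    if problems:
        raise ValueError(f"{label}: " + "; ".join(problems))


def validate_stage_outputs(
    ctx: dict, required: list[str], schemas: dict[str, dict[str, type]] | None = None
) -> None:
    missing = [k for k in required if k not in ctx]
    if missing:
        raise ValueError(f"missing outputs: {missing}")
    for key, schema in (schemas or {}).items():
        check_schema(ctx[key], schema, label=key)


# Passes: both keys present with the right types.
validate_stage_outputs({"profile": {"rows": 100, "cols": 5}}, ["profile"], {"profile": PROFILE_SCHEMA})

try:
    validate_stage_outputs({"profile": {"rows": "100"}}, ["profile"], {"profile": PROFILE_SCHEMA})
except ValueError as exc:
    error_message = str(exc)
    print(error_message)  # profile: 'rows' is str, expected int; missing key 'cols'
```

Collecting every problem before raising gives one complete error message per artifact instead of forcing a fix-and-rerun loop for each missing key.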

## Self-check

- If the LLM call fails, do you still have the profiling artifact?
- Can you re-run only the LLM stage with the saved compressed input?

## References

- The Twelve-Factor App (see the Config and Logs factors): https://12factor.net/