Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/stage-contracts.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@ the machine-readable saved-run overlay for the stage taxonomy. It records the
canonical stages, status for the current run, artifact paths, diagnostics owned
by each stage, and the current resume posture.

## Legacy run-contract IDs

Older run-contract summaries and dashboard payloads used operational labels
such as `preflight`, `seed_build`, `donor_integration`,
`policyengine_materialization`, `calibration`, and `finalization`. New saved-run
views should report the canonical 9-stage IDs while preserving the old labels as
legacy provenance when present.

`canonicalize_us_pipeline_stage_id` maps those historical IDs into the stage
registry. The dashboard applies that mapping when reading `run_summary.json`, so
old and new runs sort into the same stage taxonomy instead of creating a second
parallel lifecycle.

## Resume artifacts

The first implementation is explicit rather than automatic. It writes reusable
Expand Down
55 changes: 51 additions & 4 deletions src/microplex_us/pipelines/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from pathlib import Path
from typing import Any

from microplex_us.pipelines.stage_contracts import (
canonicalize_us_pipeline_stage_id,
)

_ROOT = Path(__file__).resolve().parents[3]
_DEFAULT_ARTIFACT_ROOT = _ROOT / "artifacts"
_DEFAULT_OUTPUT_PATH = _DEFAULT_ARTIFACT_ROOT / "microplex_dashboard_current.json"
Expand Down Expand Up @@ -179,6 +183,14 @@ def collect_run_contracts(artifact_root: str | Path) -> list[dict[str, Any]]:
if not isinstance(summary, dict):
continue
manifest = _read_json(path.parent / "run_manifest.json") or {}
completed_stages = _canonicalize_run_contract_stage_list(
summary.get("completed_stages") or []
)
legacy_completed_stages = [
stage
for stage in summary.get("completed_stages") or []
if canonicalize_us_pipeline_stage_id(str(stage)) != stage
]
contracts.append(
{
"artifact_dir": str(path.parent),
Expand All @@ -189,15 +201,22 @@ def collect_run_contracts(artifact_root: str | Path) -> list[dict[str, Any]]:
"run_id": summary.get("run_id") or manifest.get("run_id"),
"attempt_id": summary.get("attempt_id") or manifest.get("attempt_id"),
"status": summary.get("status"),
"active": summary.get("active"),
"active": _canonicalize_run_contract_stage_ref(
summary.get("active")
),
"started_at": summary.get("started_at"),
"updated_at": summary.get("updated_at"),
"failed_at": summary.get("failed_at"),
"completed_at": summary.get("completed_at"),
"failed_event_id": summary.get("failed_event_id"),
"failure": summary.get("failure"),
"restart": summary.get("restart"),
"completed_stages": summary.get("completed_stages") or [],
"failure": _canonicalize_run_contract_stage_ref(
summary.get("failure")
),
"restart": _canonicalize_run_contract_stage_ref(
summary.get("restart")
),
"completed_stages": completed_stages,
"legacy_completed_stages": legacy_completed_stages,
}
)
return sorted(
Expand All @@ -207,6 +226,34 @@ def collect_run_contracts(artifact_root: str | Path) -> list[dict[str, Any]]:
)


def _canonicalize_run_contract_stage_list(value: Any) -> list[str]:
canonical_stages: list[str] = []
seen: set[str] = set()
if not isinstance(value, list | tuple):
return canonical_stages
for item in value:
canonical = canonicalize_us_pipeline_stage_id(str(item))
if canonical in seen:
continue
seen.add(canonical)
canonical_stages.append(canonical)
return canonical_stages


def _canonicalize_run_contract_stage_ref(value: Any) -> Any:
if not isinstance(value, dict):
return value
ref = dict(value)
stage_id = ref.get("stage_id")
if stage_id is None:
return ref
canonical = canonicalize_us_pipeline_stage_id(str(stage_id))
if canonical != stage_id and "legacy_stage_id" not in ref:
ref["legacy_stage_id"] = stage_id
ref["stage_id"] = canonical
return ref


def collect_mp300k_artifact_gate_reports(
artifact_root: str | Path,
) -> list[dict[str, Any]]:
Expand Down
53 changes: 53 additions & 0 deletions src/microplex_us/pipelines/stage_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,56 @@
"post_artifact_evidence",
]

US_CANONICAL_STAGE_IDS = (
"01_run_profile",
"02_source_loading",
"03_source_planning",
"04_seed_scaffold",
"05_donor_integration_synthesis",
"06_policyengine_entities",
"07_calibration",
"08_dataset_assembly",
"09_validation_benchmarking",
)

US_LEGACY_STAGE_ID_ALIASES = {
# Historical run_contract.py IDs from the US Microplex build path.
"preflight": "01_run_profile",
"source_loading": "02_source_loading",
"source_planning": "03_source_planning",
"seed_scaffold": "04_seed_scaffold",
"seed_build": "05_donor_integration_synthesis",
"donor_integration": "05_donor_integration_synthesis",
"synthesis": "05_donor_integration_synthesis",
"support_enforcement": "05_donor_integration_synthesis",
"policyengine_materialization": "06_policyengine_entities",
"target_build": "07_calibration",
"calibration": "07_calibration",
"dataset_assembly": "08_dataset_assembly",
"finalization": "08_dataset_assembly",
"validation": "09_validation_benchmarking",
"benchmark": "09_validation_benchmarking",
"scoring": "09_validation_benchmarking",
"policyengine_native_scores": "09_validation_benchmarking",
# Historical PE-US-data parity plan IDs used in Microplex docs/snapshots.
"source-contracts": "02_source_loading",
"cps-construction": "02_source_loading",
"puf-ingestion-uprating": "02_source_loading",
"extended-cps-qrf": "05_donor_integration_synthesis",
"family-imputation-parity": "05_donor_integration_synthesis",
"entity-export-parity": "06_policyengine_entities",
"weighting-backend": "07_calibration",
"targets-and-eval": "09_validation_benchmarking",
}


def canonicalize_us_pipeline_stage_id(stage_id: str) -> str:
"""Return the canonical US runtime stage ID for a current or legacy ID."""

if stage_id in US_CANONICAL_STAGE_IDS:
return stage_id
return US_LEGACY_STAGE_ID_ALIASES.get(stage_id, stage_id)


@dataclass(frozen=True)
class USStageArtifactContract:
Expand Down Expand Up @@ -435,10 +485,13 @@ def serialize_us_pipeline_stage_contracts() -> dict[str, object]:


__all__ = [
"US_CANONICAL_STAGE_IDS",
"US_LEGACY_STAGE_ID_ALIASES",
"US_STAGE_CONTRACT_VERSION",
"USPipelineStageContract",
"USStageArtifactContract",
"USStageValidationContract",
"canonicalize_us_pipeline_stage_id",
"default_us_pipeline_stage_contracts",
"get_us_pipeline_stage_contract",
"serialize_us_pipeline_stage_contracts",
Expand Down
23 changes: 20 additions & 3 deletions tests/pipelines/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,15 @@ def test_dashboard_payload_reads_run_contract_summaries(tmp_path):
"run_id": "contracted-run",
"attempt_id": "attempt-1",
"status": "running",
"active": None,
"active": {"stage_id": "policyengine_materialization"},
"started_at": "2026-05-28T00:00:00+00:00",
"updated_at": "2026-05-28T00:00:01+00:00",
"completed_stages": ["preflight"],
"completed_stages": ["preflight", "target_build", "calibration"],
"failure": {"stage_id": "donor_integration"},
"restart": {
"stage_id": "policyengine_materialization",
"checkpoint_ref": "checkpoint:post_microsim",
},
}
)
)
Expand All @@ -431,7 +436,19 @@ def test_dashboard_payload_reads_run_contract_summaries(tmp_path):
assert contracts[0]["status_source"] == "contract"
assert contracts[0]["run_id"] == "contracted-run"
assert contracts[0]["status"] == "running"
assert contracts[0]["completed_stages"] == ["preflight"]
assert contracts[0]["active"]["stage_id"] == "06_policyengine_entities"
assert contracts[0]["active"]["legacy_stage_id"] == "policyengine_materialization"
assert contracts[0]["failure"]["stage_id"] == "05_donor_integration_synthesis"
assert contracts[0]["restart"]["stage_id"] == "06_policyengine_entities"
assert contracts[0]["completed_stages"] == [
"01_run_profile",
"07_calibration",
]
assert contracts[0]["legacy_completed_stages"] == [
"preflight",
"target_build",
"calibration",
]


def test_dashboard_payload_reads_mp300k_artifact_gate_reports(tmp_path):
Expand Down
13 changes: 13 additions & 0 deletions tests/pipelines/test_stage_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from microplex_us.pipelines.stage_contracts import (
canonicalize_us_pipeline_stage_id,
default_us_pipeline_stage_contracts,
get_us_pipeline_stage_contract,
serialize_us_pipeline_stage_contracts,
Expand Down Expand Up @@ -56,3 +57,15 @@ def test_serialize_us_pipeline_stage_contracts_is_json_ready():
assert payload["contractVersion"] == "us-runtime-stages-v1"
assert len(payload["stages"]) == 9
assert payload["stages"][5]["id"] == "06_policyengine_entities"


def test_canonicalize_us_pipeline_stage_id_maps_legacy_runtime_ids():
assert (
canonicalize_us_pipeline_stage_id("policyengine_materialization")
== "06_policyengine_entities"
)
assert canonicalize_us_pipeline_stage_id("target_build") == "07_calibration"
assert canonicalize_us_pipeline_stage_id("finalization") == "08_dataset_assembly"
assert canonicalize_us_pipeline_stage_id("benchmark") == "09_validation_benchmarking"
assert canonicalize_us_pipeline_stage_id("08_dataset_assembly") == "08_dataset_assembly"
assert canonicalize_us_pipeline_stage_id("custom-stage") == "custom-stage"
Loading