Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 139 additions & 3 deletions src/microplex_us/pipelines/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ def build_dashboard_payload(
actual_l0_runs = collect_actual_l0_objective_runs(artifact_root)
materialized_l0_scores = collect_materialized_policyengine_l0_scores(artifact_root)
artifact_gate_reports = collect_mp300k_artifact_gate_reports(artifact_root)
release_gate_reports = [
*artifact_gate_reports,
*_release_gate_reports_from_score_runs(
score_runs,
artifact_gate_reports,
),
]
run_contracts = collect_run_contracts(artifact_root)
active_logs = collect_recent_log_summaries(artifact_root)
tmux_sessions = collect_tmux_sessions() if include_tmux else []
Expand All @@ -116,7 +123,7 @@ def build_dashboard_payload(
"actual_l0_objective_runs": actual_l0_runs,
"materialized_policyengine_l0_scores": materialized_l0_scores,
"mp300k_artifact_gate_reports": artifact_gate_reports,
"release_readiness": build_release_readiness(artifact_gate_reports),
"release_readiness": build_release_readiness(release_gate_reports),
"run_contracts": run_contracts,
"active_logs": active_logs,
"tmux_sessions": tmux_sessions,
Expand Down Expand Up @@ -293,6 +300,114 @@ def collect_mp300k_artifact_gate_reports(
)


def _release_gate_reports_from_score_runs(
    score_runs: list[dict[str, Any]],
    artifact_gate_reports: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Derive fallback release-readiness rows from score runs with smoke data.

    Full gate reports take precedence: any score run whose ``artifact_dir``
    already appears in ``artifact_gate_reports`` is skipped. The fallback
    exists so that older candidate artifacts, which persisted PE-native scores
    and loader-smoke results before the full gate sidecar existed, still show
    up on the dashboard.

    Args:
        score_runs: Score-run rows; a row is used only if it has a non-empty
            ``artifact_dir``, a dict-valued ``release_smoke`` and a truthy
            ``record_count_tier``.
        artifact_gate_reports: Full gate report rows whose ``artifact_dir``
            values mark artifacts that must not get a fallback row.

    Returns:
        One report dict per qualifying score run, in input order, tagged with
        ``source_kind="score_release_smoke"``. ``full_gate_report`` is always
        listed among the unmeasured gates.
    """
    # Directories that already have a real gate report (compared as strings).
    dirs_with_gate_report = set()
    for gate_row in artifact_gate_reports:
        gate_dir = gate_row.get("artifact_dir")
        if gate_dir:
            dirs_with_gate_report.add(str(gate_dir))

    fallback_rows: list[dict[str, Any]] = []
    for score in score_runs:
        artifact_dir = str(score.get("artifact_dir") or "")
        if artifact_dir == "" or artifact_dir in dirs_with_gate_report:
            continue
        smoke = score.get("release_smoke")
        product = score.get("record_count_tier")
        if not (isinstance(smoke, dict) and product):
            continue

        size_ok = smoke.get("passes_file_size_ratio_2x")
        runtime_ok = smoke.get("passes_runtime_ratio_1_25x")
        beats_baseline = score.get("candidate_beats_baseline")

        # Only the literal booleans count as measured; anything else
        # (None, strings, numbers) is reported as unmeasured.
        gate_outcomes = {
            "artifact_size": size_ok,
            "runtime": runtime_ok,
            "ecps_comparison": beats_baseline,
        }
        failed = [
            gate for gate, outcome in gate_outcomes.items() if outcome is False
        ]
        # The full gate sidecar is by definition absent on this code path.
        unmeasured = ["full_gate_report"]
        unmeasured.extend(
            gate
            for gate, outcome in gate_outcomes.items()
            if outcome is not True and outcome is not False
        )

        row = {
            "artifact_path": smoke.get("artifact_path")
            or score.get("artifact_path"),
            "artifact_dir": artifact_dir,
            "artifact_id": Path(artifact_dir).name,
            "product": product,
            "period": score.get("period"),
            "status": _release_smoke_gate_status(failed, unmeasured),
            # 4 = full_gate_report + the three gates classified above.
            "passing_required_gate_count": 4 - len(failed) - len(unmeasured),
            "failed_required_gate_count": len(failed),
            "unmeasured_required_gate_count": len(unmeasured),
            "failed_required_gates": failed,
            "unmeasured_required_gates": unmeasured,
            "candidate_dataset_path": score.get("candidate_dataset"),
            "candidate_size_bytes": smoke.get("candidate_file_size_bytes"),
            "candidate_households": smoke.get("candidate_households"),
            "candidate_persons": None,
            "compatibility_status": "smoke_only",
            "artifact_size_status": _gate_bool_status(size_ok),
            "artifact_size_ratio": smoke.get("file_size_ratio"),
            "runtime_status": _gate_bool_status(runtime_ok),
            "runtime_ratio": smoke.get("median_runtime_ratio"),
            "ecps_comparison_status": _gate_bool_status(beats_baseline),
            "candidate_loss": score.get("candidate_loss"),
            "baseline_loss": score.get("baseline_loss"),
            "loss_delta": score.get("loss_delta"),
            "n_targets_kept": score.get("n_targets_kept"),
            "metric_runtime": score.get("metric_runtime"),
            "source_kind": "score_release_smoke",
        }
        fallback_rows.append(row)
    return fallback_rows


def _release_smoke_gate_status(
failed_required_gates: list[str],
unmeasured_required_gates: list[str],
) -> str:
if failed_required_gates:
return "failed"
if unmeasured_required_gates:
return "incomplete"
return "passed"


def _gate_bool_status(value: Any) -> str | None:
if value is True:
return "pass"
if value is False:
return "fail"
return None


def build_release_readiness(
artifact_gate_reports: list[dict[str, Any]],
) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -1627,8 +1742,10 @@ def _score_entries_from_payload(path: Path, payload: Any) -> list[dict[str, Any]
"loss_delta": _number_or_none(
summary.get("enhanced_cps_native_loss_delta")
),
"candidate_beats_baseline": bool(
summary.get("candidate_beats_baseline")
"candidate_beats_baseline": _candidate_beats_baseline(
summary,
candidate_loss,
baseline_loss,
),
"candidate_unweighted_msre": _number_or_none(
summary.get("candidate_unweighted_msre")
Expand Down Expand Up @@ -1700,6 +1817,25 @@ def _release_smoke_summary(artifact_dir: Path) -> dict[str, Any] | None:
}


def _candidate_beats_baseline(
summary: dict[str, Any],
candidate_loss: float,
baseline_loss: float,
) -> bool:
raw_value = summary.get("candidate_beats_baseline")
if isinstance(raw_value, bool):
return raw_value
if raw_value is None:
return candidate_loss < baseline_loss
if isinstance(raw_value, str):
lowered = raw_value.strip().lower()
if lowered in {"true", "1", "yes"}:
return True
if lowered in {"false", "0", "no"}:
return False
return bool(raw_value)


def _summarize_unified_diagnostics(path: Path) -> dict[str, Any] | None:
try:
with path.open(newline="") as file:
Expand Down
55 changes: 54 additions & 1 deletion tests/pipelines/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def test_dashboard_payload_reads_release_smoke_for_record_tiers(tmp_path):
"period": 2024,
"summary": {
"baseline_enhanced_cps_native_loss": 0.1664,
"candidate_beats_baseline": True,
"candidate_enhanced_cps_native_loss": 0.0936,
"enhanced_cps_native_loss_delta": -0.0728,
"n_targets_kept": 2818,
Expand Down Expand Up @@ -254,6 +253,7 @@ def test_dashboard_payload_reads_release_smoke_for_record_tiers(tmp_path):
assert score_run["release_smoke"]["median_runtime_ratio"] == 1.19
assert score_run["release_smoke"]["passes_file_size_ratio_2x"] is True
assert score_run["release_smoke"]["passes_runtime_ratio_1_25x"] is True
assert score_run["candidate_beats_baseline"] is True
current_best = next(
row
for row in payload["run_board"]["comparison_matrix"]
Expand All @@ -264,6 +264,21 @@ def test_dashboard_payload_reads_release_smoke_for_record_tiers(tmp_path):
assertions = payload["run_board"]["assertions"]
assert assertions["microplex_current_best_has_release_smoke"] is True
assert assertions["microplex_current_best_release_smoke_passes"] is True
readiness = payload["run_board"]["release_readiness"]
assert len(readiness) == 1
assert readiness[0]["product"] == "mp-120k"
assert readiness[0]["metric_runtime"] == "latest_policyengine_us"
assert readiness[0]["status"] == "incomplete"
assert readiness[0]["best_passing_artifact"] is None
assert readiness[0]["release_blockers"] == ["full_gate_report"]
assert readiness[0]["best_fit_artifact"]["artifact_id"] == (
"mp120k_latest_us_data_refit"
)
assert readiness[0]["best_fit_artifact"]["compatibility_status"] == (
"smoke_only"
)
assert readiness[0]["best_fit_artifact"]["candidate_households"] == 120_000
assert readiness[0]["best_fit_release_blockers"] == ["full_gate_report"]


def test_dashboard_payload_wires_materialized_pe_l0_score_jsons(tmp_path):
Expand Down Expand Up @@ -469,6 +484,41 @@ def test_dashboard_payload_reads_mp300k_artifact_gate_reports(tmp_path):
}
)
)
(gate_dir / "scores.json").write_text(
json.dumps(
[
{
"summary": {
"baseline_enhanced_cps_native_loss": 0.1664,
"candidate_enhanced_cps_native_loss": 0.0936,
"n_targets_kept": 2818,
},
"broad_loss": {
"candidate_dataset": str(gate_dir / "pe_l0_candidate.h5"),
"baseline_dataset": "enhanced_cps_2024.h5",
},
}
]
)
)
(gate_dir / "runtime_smoke_loader.json").write_text(
json.dumps(
{
"file_size_ratio": 1.36,
"median_runtime_ratio": 1.19,
"candidate": {
"file_size_bytes": 150_658_539,
"households": 120_000,
"median_elapsed_seconds": 0.137,
},
"baseline": {
"file_size_bytes": 110_717_166,
"households": 41_314,
"median_elapsed_seconds": 0.115,
},
}
)
)
blocked_dir = artifacts / "mp120k_better_fit_blocked"
blocked_dir.mkdir(parents=True)
(blocked_dir / "mp300k_artifact_gates.json").write_text(
Expand Down Expand Up @@ -542,6 +592,9 @@ def test_dashboard_payload_reads_mp300k_artifact_gate_reports(tmp_path):
assert readiness[0]["passed_artifact_count"] == 1
assert readiness[0]["failed_artifact_count"] == 1
assert readiness[0]["best_passing_artifact"]["artifact_id"] == "mp120k_release"
assert readiness[0]["best_passing_artifact"]["artifact_path"].endswith(
"mp300k_artifact_gates.json"
)
assert (
readiness[0]["best_fit_artifact"]["artifact_id"] == "mp120k_better_fit_blocked"
)
Expand Down
Loading