Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/microplex/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
"SplitTransform",
"DeriveTransform",
"TransformSpec",
"VariableCodeReference",
"VariableSystemProvenance",
"VariableSpec",
"ArchTargetSpec",
"TargetsSpec",
"CalibrateSpec",
Expand Down Expand Up @@ -461,6 +464,98 @@ def kind(self) -> TransformKind:
return TransformKind.SPLIT if self.split is not None else TransformKind.DERIVE


class VariableCodeReference(_StrictModel):
    """A source-code pointer that explains how one system builds a variable.

    Only ``path`` is required. ``lines``, ``symbol`` and ``summary`` are
    optional refinements; each one, when supplied, must be a non-empty string.
    """

    # Required: where the referenced code lives.
    path: str = Field(
        ...,
        min_length=1,
        description="Repository-relative path, or package path, to the relevant code.",
    )

    # Optional refinements that narrow the pointer down within ``path``.
    lines: str | None = Field(
        None,
        min_length=1,
        description="Human-readable line reference, e.g. '36,246-248'.",
    )
    symbol: str | None = Field(
        None,
        min_length=1,
        description="Function, constant, or class name containing the behavior.",
    )
    summary: str | None = Field(
        None,
        min_length=1,
        description="Short explanation of the referenced code path.",
    )


class VariableSystemProvenance(_StrictModel):
    """How one implementation produces a spec variable.

    This record is descriptive, not executable. Country packs use it as a
    temporary audit scaffold while an incumbent data pipeline is being ported
    into the declarative spec.
    """

    # Required short label naming the technique.
    method: str = Field(
        ...,
        min_length=1,
        description="Short method label, e.g. 'PUF QRF'.",
    )

    # Zero or more code pointers; a fresh empty list is created per instance.
    code: list[VariableCodeReference] = Field(
        default_factory=list,
        description="Source-code references backing the method label.",
    )

    # Optional free-text remarks; must be non-empty when present.
    notes: str | None = Field(
        None,
        min_length=1,
        description="Known caveats or divergences for this system.",
    )


class VariableSpec(_StrictModel):
    """Temporary per-variable audit metadata.

    ``variables:`` is not used by the runtime engine yet. Its purpose is to
    make country-pack specs self-auditing during migrations: each declared
    variable can carry the eCPS/incumbent code path, the legacy-MP code path,
    and the intended spec behavior side-by-side.

    At least one of ``ecps``, ``mp_legacy`` or ``mp_spec`` must be provided;
    an entry with none of them is rejected during validation.
    """

    # Optional classification labels; must be non-empty when present.
    entity: str | None = Field(
        None,
        min_length=1,
        description="Optional PolicyEngine entity label for the variable.",
    )
    role: str | None = Field(
        None,
        min_length=1,
        description="Optional role/category, e.g. 'puf_imputed' or 'derived'.",
    )

    # One provenance slot per system being compared.
    ecps: VariableSystemProvenance | None = Field(
        None,
        description="How the incumbent eCPS / production pipeline builds it.",
    )
    mp_legacy: VariableSystemProvenance | None = Field(
        None,
        description="How the legacy imperative Microplex pipeline builds it.",
    )
    mp_spec: VariableSystemProvenance | None = Field(
        None,
        description="How this declarative spec intends to build it.",
    )

    # Defaults to True: the entry is audit scaffolding, not runtime logic.
    temporary: bool = Field(
        True,
        description="Marks this as a migration audit scaffold, not runtime logic.",
    )

    @model_validator(mode="after")
    def _has_some_provenance(self) -> VariableSpec:
        """Require at least one populated provenance slot.

        Returns:
            The validated instance, unchanged.

        Raises:
            ValueError: If ``ecps``, ``mp_legacy`` and ``mp_spec`` are all
                ``None``.
        """
        provenance_slots = (self.ecps, self.mp_legacy, self.mp_spec)
        if any(slot is not None for slot in provenance_slots):
            return self
        raise ValueError(
            "variable spec must declare at least one of ecps, mp_legacy, or mp_spec."
        )


class ArchTargetSpec(_StrictModel):
"""Names the Arch target set to fetch and roll up."""

Expand Down Expand Up @@ -529,6 +624,7 @@ class MicroplexSpec(_StrictModel):
spine: SpineSpec
imputation: list[ImputationStep] = Field(default_factory=list)
transforms: list[TransformSpec] = Field(default_factory=list)
variables: dict[str, VariableSpec] = Field(default_factory=dict)
targets: TargetsSpec | None = None
calibrate: CalibrateSpec | None = None

Expand Down
61 changes: 61 additions & 0 deletions tests/spec/test_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,51 @@ def test_load_spec_dict_roundtrip(self) -> None:
assert spec.meta.country == "us"
assert len(spec.imputation) == 1
assert spec.imputation[0].at is ImputationPhase.HALVES
assert spec.variables == {}

def test_variable_provenance_metadata_loads(self) -> None:
    """A fully populated ``variables`` entry loads and exposes its fields."""
    # Provenance for each of the three systems, built separately for clarity.
    ecps_block = {
        "method": "PUF QRF",
        "code": [
            {
                "path": "policyengine_us_data/storage/enhanced_cps/puf_impute.py",
                "lines": "36,246-248",
                "symbol": "IMPUTED_VARIABLES",
            }
        ],
    }
    legacy_block = {
        "method": "PUF donor imputation",
        "code": [
            {
                "path": "src/microplex_us/data_sources/puf.py",
                "lines": "776-778",
                "summary": "maps PUF E00200 to employment_income",
            }
        ],
    }
    spec_block = {
        "method": "synthesize from puf onto synthetic_puf",
        "notes": "Synthetic half must not inherit CPS wage values.",
    }

    payload = _valid_spec_dict()
    payload["variables"] = {
        "employment_income": {
            "entity": "person",
            "role": "puf_imputed",
            "ecps": ecps_block,
            "mp_legacy": legacy_block,
            "mp_spec": spec_block,
        }
    }

    loaded = load_spec_dict(payload)
    entry = loaded.variables["employment_income"]

    # Scalar metadata round-trips, and ``temporary`` falls back to its default.
    assert entry.entity == "person"
    assert entry.role == "puf_imputed"
    assert entry.temporary is True

    # Nested provenance blocks are parsed into objects with code references.
    assert entry.ecps is not None
    assert entry.ecps.code[0].symbol == "IMPUTED_VARIABLES"
    assert entry.mp_legacy is not None
    assert entry.mp_legacy.code[0].path.endswith("puf.py")

def test_base_phase_imputation_can_target_base_alias(self) -> None:
data = _valid_spec_dict()
Expand Down Expand Up @@ -228,6 +273,22 @@ def test_unknown_source_field_rejected(self) -> None:
with pytest.raises(SpecError, match="typo"):
load_spec_dict(data)

def test_empty_variable_provenance_rejected(self) -> None:
    """A variable entry with no provenance block at all fails to load."""
    # Only ``entity`` is set: none of ecps / mp_legacy / mp_spec is present.
    entry_without_provenance = {"entity": "person"}
    payload = _valid_spec_dict()
    payload["variables"] = {"employment_income": entry_without_provenance}

    with pytest.raises(SpecError, match="at least one of ecps"):
        load_spec_dict(payload)

def test_unknown_variable_provenance_field_rejected(self) -> None:
    """An unrecognized key inside a provenance block is rejected by name."""
    # ``typo`` is not a declared provenance field.
    provenance_with_extra_key = {"method": "PUF QRF", "typo": "not allowed"}
    payload = _valid_spec_dict()
    payload["variables"] = {
        "employment_income": {"ecps": provenance_with_extra_key}
    }

    with pytest.raises(SpecError, match="typo"):
        load_spec_dict(payload)

def test_two_spine_sources_rejected(self) -> None:
data = _valid_spec_dict()
data["sources"]["puf"]["role"] = "spine"
Expand Down