Add temporary per-variable provenance metadata to the Microplex spec schema.

Introduces the VariableCodeReference, VariableSystemProvenance and
VariableSpec models, adds their names to the module's public-name list
(presumably __all__ -- confirm against the full file), and adds an optional
`variables` mapping to MicroplexSpec that defaults to an empty dict.
Tests cover loading the metadata, rejecting a variable that declares none of
ecps / mp_legacy / mp_spec, and rejecting an unknown provenance field.

diff --git a/src/microplex/spec.py b/src/microplex/spec.py
index 673081c..3cc3d03 100644
--- a/src/microplex/spec.py
+++ b/src/microplex/spec.py
@@ -48,6 +48,9 @@
     "SplitTransform",
     "DeriveTransform",
     "TransformSpec",
+    "VariableCodeReference",
+    "VariableSystemProvenance",
+    "VariableSpec",
     "ArchTargetSpec",
     "TargetsSpec",
     "CalibrateSpec",
@@ -461,6 +464,98 @@ def kind(self) -> TransformKind:
         return TransformKind.SPLIT if self.split is not None else TransformKind.DERIVE
 
 
+class VariableCodeReference(_StrictModel):
+    """A source-code pointer that explains how one system builds a variable."""
+
+    path: str = Field(
+        ...,
+        min_length=1,
+        description="Repository-relative path, or package path, to the relevant code.",
+    )
+    lines: str | None = Field(
+        default=None,
+        min_length=1,
+        description="Human-readable line reference, e.g. '36,246-248'.",
+    )
+    symbol: str | None = Field(
+        default=None,
+        min_length=1,
+        description="Function, constant, or class name containing the behavior.",
+    )
+    summary: str | None = Field(
+        default=None,
+        min_length=1,
+        description="Short explanation of the referenced code path.",
+    )
+
+
+class VariableSystemProvenance(_StrictModel):
+    """How one implementation produces a spec variable.
+
+    This is intentionally descriptive rather than executable. Country packs use
+    it as a temporary audit scaffold while porting an incumbent data pipeline
+    into the declarative spec.
+    """
+
+    method: str = Field(
+        ..., min_length=1, description="Short method label, e.g. 'PUF QRF'."
+    )
+    code: list[VariableCodeReference] = Field(
+        default_factory=list,
+        description="Source-code references backing the method label.",
+    )
+    notes: str | None = Field(
+        default=None,
+        min_length=1,
+        description="Known caveats or divergences for this system.",
+    )
+
+
+class VariableSpec(_StrictModel):
+    """Temporary per-variable audit metadata.
+
+    ``variables:`` is not used by the runtime engine yet. It exists to make
+    country-pack specs self-auditing during migrations: every declared variable
+    can carry the eCPS/incumbent code path, the legacy-MP code path, and the
+    intended spec behavior side-by-side.
+    """
+
+    entity: str | None = Field(
+        default=None,
+        min_length=1,
+        description="Optional PolicyEngine entity label for the variable.",
+    )
+    role: str | None = Field(
+        default=None,
+        min_length=1,
+        description="Optional role/category, e.g. 'puf_imputed' or 'derived'.",
+    )
+    ecps: VariableSystemProvenance | None = Field(
+        default=None,
+        description="How the incumbent eCPS / production pipeline builds it.",
+    )
+    mp_legacy: VariableSystemProvenance | None = Field(
+        default=None,
+        description="How the legacy imperative Microplex pipeline builds it.",
+    )
+    mp_spec: VariableSystemProvenance | None = Field(
+        default=None,
+        description="How this declarative spec intends to build it.",
+    )
+    temporary: bool = Field(
+        default=True,
+        description="Marks this as a migration audit scaffold, not runtime logic.",
+    )
+
+    @model_validator(mode="after")
+    def _has_some_provenance(self) -> VariableSpec:
+        if self.ecps is None and self.mp_legacy is None and self.mp_spec is None:
+            raise ValueError(
+                "variable spec must declare at least one of ecps, mp_legacy, or mp_spec."
+            )
+        return self
+
+
 class ArchTargetSpec(_StrictModel):
     """Names the Arch target set to fetch and roll up."""
 
@@ -529,6 +624,7 @@ class MicroplexSpec(_StrictModel):
     spine: SpineSpec
     imputation: list[ImputationStep] = Field(default_factory=list)
     transforms: list[TransformSpec] = Field(default_factory=list)
+    variables: dict[str, VariableSpec] = Field(default_factory=dict)
     targets: TargetsSpec | None = None
     calibrate: CalibrateSpec | None = None
 
diff --git a/tests/spec/test_spec.py b/tests/spec/test_spec.py
index 34b8078..d449b2a 100644
--- a/tests/spec/test_spec.py
+++ b/tests/spec/test_spec.py
@@ -144,6 +144,51 @@ def test_load_spec_dict_roundtrip(self) -> None:
         assert spec.meta.country == "us"
         assert len(spec.imputation) == 1
         assert spec.imputation[0].at is ImputationPhase.HALVES
+        assert spec.variables == {}
+
+    def test_variable_provenance_metadata_loads(self) -> None:
+        data = _valid_spec_dict()
+        data["variables"] = {
+            "employment_income": {
+                "entity": "person",
+                "role": "puf_imputed",
+                "ecps": {
+                    "method": "PUF QRF",
+                    "code": [
+                        {
+                            "path": "policyengine_us_data/storage/enhanced_cps/puf_impute.py",
+                            "lines": "36,246-248",
+                            "symbol": "IMPUTED_VARIABLES",
+                        }
+                    ],
+                },
+                "mp_legacy": {
+                    "method": "PUF donor imputation",
+                    "code": [
+                        {
+                            "path": "src/microplex_us/data_sources/puf.py",
+                            "lines": "776-778",
+                            "summary": "maps PUF E00200 to employment_income",
+                        }
+                    ],
+                },
+                "mp_spec": {
+                    "method": "synthesize from puf onto synthetic_puf",
+                    "notes": "Synthetic half must not inherit CPS wage values.",
+                },
+            }
+        }
+
+        spec = load_spec_dict(data)
+
+        variable = spec.variables["employment_income"]
+        assert variable.entity == "person"
+        assert variable.role == "puf_imputed"
+        assert variable.temporary is True
+        assert variable.ecps is not None
+        assert variable.ecps.code[0].symbol == "IMPUTED_VARIABLES"
+        assert variable.mp_legacy is not None
+        assert variable.mp_legacy.code[0].path.endswith("puf.py")
 
     def test_base_phase_imputation_can_target_base_alias(self) -> None:
         data = _valid_spec_dict()
@@ -228,6 +273,22 @@ def test_unknown_source_field_rejected(self) -> None:
         with pytest.raises(SpecError, match="typo"):
             load_spec_dict(data)
 
+    def test_empty_variable_provenance_rejected(self) -> None:
+        data = _valid_spec_dict()
+        data["variables"] = {"employment_income": {"entity": "person"}}
+        with pytest.raises(SpecError, match="at least one of ecps"):
+            load_spec_dict(data)
+
+    def test_unknown_variable_provenance_field_rejected(self) -> None:
+        data = _valid_spec_dict()
+        data["variables"] = {
+            "employment_income": {
+                "ecps": {"method": "PUF QRF", "typo": "not allowed"}
+            }
+        }
+        with pytest.raises(SpecError, match="typo"):
+            load_spec_dict(data)
+
     def test_two_spine_sources_rejected(self) -> None:
         data = _valid_spec_dict()
         data["sources"]["puf"]["role"] = "spine"