diff --git a/docs/next-run-plan.md b/docs/next-run-plan.md index 241a290..46f0c15 100644 --- a/docs/next-run-plan.md +++ b/docs/next-run-plan.md @@ -1,10 +1,17 @@ # Next v8 pipeline run plan +> Superseded for release-candidate builds as of 2026-06-06. PE-US-data PUF +> support clone rebuilds must use `--donor-imputer-backend regime_aware`, which +> routes through MicroImpute chained donor imputations. The older `qrf` and +> `zi_qrf` backends remain useful only for explicit non-release experiments with +> `puf_support_clone_enabled=False`; release-profile config now fails closed if +> either backend is requested. + ## Summary v7 (2026-04-18 12:19 PM, artifact `live_pe_us_data_rebuild_checkpoint_20260418_microcalibrate_modular`) uses the default `donor_imputer_backend="qrf"`. That path leaves `zero_inflated_vars` empty in `ColumnwiseQRFDonorImputer`, so the imputer fits no zero-classifier and the QRF runs `predict()` over all 3.37 M rows for every target column — including columns that are 99 % zero. -v8 should flip to `--donor-imputer-backend zi_qrf`, which activates the `ZERO_INFLATED_POSITIVE`-whitelist path. On whitelisted columns the imputer fits a `RandomForestClassifier` zero-gate, then only invokes QRF `predict()` on rows the gate sends to the positive branch. On a 97 %-zero column this cuts QRF predict to ~3 % of rows — a large wall-clock win on donor integration. +v8 originally planned to flip to `--donor-imputer-backend zi_qrf`, which activates the `ZERO_INFLATED_POSITIVE`-whitelist path. On whitelisted columns the imputer fits a `RandomForestClassifier` zero-gate, then only invokes QRF `predict()` on rows the gate sends to the positive branch. On a 97 %-zero column this cuts QRF predict to ~3 % of rows — a large wall-clock win on donor integration. That is no longer sufficient for MP/eCPS release candidates because it does not use MicroImpute chained imputations across related donor targets. ## What `zi_qrf` actually covers @@ -38,8 +45,8 @@ uv run python -m microplex_us.pipelines.pe_us_data_rebuild_checkpoint \ --targets-db /Users/maxghenis/PolicyEngine/policyengine-us-data-aca-agi-db/policyengine_us_data/storage/calibration/policy_data.db \ --policyengine-us-data-repo /Users/maxghenis/PolicyEngine/policyengine-us-data \ --calibration-backend microcalibrate \ - --donor-imputer-backend zi_qrf \ - --version-id microcalibrate-zi-qrf-v8 \ + --donor-imputer-backend regime_aware \ + --version-id microcalibrate-regime-aware-v8 \ --n-synthetic 100000 \ --defer-policyengine-harness \ --defer-policyengine-native-score \ diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild.py b/src/microplex_us/pipelines/pe_us_data_rebuild.py index 0db7e9c..63c5298 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild.py @@ -78,7 +78,7 @@ def default_policyengine_us_data_rebuild_config( policyengine_calibration_deferred_stage_min_full_oracle_capped_mean_abs_relative_error=None, policyengine_calibration_deferred_stage_top_family_count=7, policyengine_calibration_deferred_stage_top_geography_count=4, - donor_imputer_backend="qrf", + donor_imputer_backend="regime_aware", donor_imputer_condition_selection="pe_prespecified", donor_imputer_qrf_zero_threshold=0.05, donor_imputer_excluded_variables=(), @@ -91,7 +91,18 @@ def default_policyengine_us_data_rebuild_config( ), policyengine_prefer_existing_tax_unit_ids=True, ) - return replace(defaults, **overrides) + config = replace(defaults, **overrides) + if ( + config.puf_support_clone_enabled + and config.donor_imputer_backend != "regime_aware" + ): + raise ValueError( + "PE-US-data PUF support clone rebuilds require " + "donor_imputer_backend='regime_aware' so release candidates use " + "MicroImpute chained donor imputations. Set " + "puf_support_clone_enabled=False for legacy imputer experiments." + ) + return config def default_policyengine_us_data_rebuild_source_providers( diff --git a/tests/pipelines/test_pe_us_data_rebuild.py b/tests/pipelines/test_pe_us_data_rebuild.py index 59bc784..f60f4af 100644 --- a/tests/pipelines/test_pe_us_data_rebuild.py +++ b/tests/pipelines/test_pe_us_data_rebuild.py @@ -87,7 +87,7 @@ def test_default_policyengine_us_data_rebuild_config_uses_incumbent_defaults() - ) assert config.policyengine_calibration_deferred_stage_top_family_count == 7 assert config.policyengine_calibration_deferred_stage_top_geography_count == 4 - assert config.donor_imputer_backend == "qrf" + assert config.donor_imputer_backend == "regime_aware" assert config.donor_imputer_condition_selection == "pe_prespecified" assert config.donor_imputer_excluded_variables == () assert config.puf_support_clone_enabled is True @@ -101,6 +101,31 @@ def test_default_policyengine_us_data_rebuild_config_uses_incumbent_defaults() - assert config.cps_asec_source_year == 2022 +def test_default_policyengine_us_data_rebuild_config_rejects_legacy_imputer_for_puf_support_clone() -> ( + None +): + try: + default_policyengine_us_data_rebuild_config(donor_imputer_backend="qrf") + except ValueError as exc: + message = str(exc) + assert "PUF support clone rebuilds require" in message + assert "donor_imputer_backend='regime_aware'" in message + else: + raise AssertionError("Expected PUF support clone qrf rebuild to fail") + + +def test_default_policyengine_us_data_rebuild_config_allows_legacy_imputer_when_puf_support_clone_disabled() -> ( + None +): + config = default_policyengine_us_data_rebuild_config( + puf_support_clone_enabled=False, + donor_imputer_backend="qrf", + ) + + assert config.puf_support_clone_enabled is False + assert config.donor_imputer_backend == "qrf" + + def test_default_policyengine_us_data_rebuild_config_respects_calibration_support_override() -> ( None ): diff --git a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py index d5af537..49abbc8 100644 --- a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py +++ b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py @@ -69,7 +69,7 @@ def test_default_policyengine_us_data_rebuild_checkpoint_config_sets_pe_context( ) assert config.policyengine_calibration_deferred_stage_top_family_count == 7 assert config.policyengine_calibration_deferred_stage_top_geography_count == 4 - assert config.donor_imputer_backend == "qrf" + assert config.donor_imputer_backend == "regime_aware" assert config.donor_imputer_condition_selection == "pe_prespecified" assert config.donor_imputer_excluded_variables == () assert config.policyengine_baseline_dataset == "/tmp/enhanced_cps_2024.h5" @@ -89,6 +89,23 @@ def test_default_policyengine_us_data_rebuild_checkpoint_config_sets_pe_context( assert config.random_seed == 123 +def test_default_policyengine_us_data_rebuild_checkpoint_config_rejects_legacy_imputer_for_puf_support_clone() -> ( + None +): + try: + default_policyengine_us_data_rebuild_checkpoint_config( + policyengine_baseline_dataset="/tmp/enhanced_cps_2024.h5", + policyengine_targets_db="/tmp/policy_data.db", + donor_imputer_backend="qrf", + ) + except ValueError as exc: + message = str(exc) + assert "PUF support clone rebuilds require" in message + assert "donor_imputer_backend='regime_aware'" in message + else: + raise AssertionError("Expected checkpoint qrf rebuild to fail") + + def test_default_policyengine_us_data_rebuild_checkpoint_config_preserves_explicit_calibration_scope() -> ( None ): @@ -389,9 +406,12 @@ def _install_resume_stage_test_doubles(monkeypatch, artifact_root, captured) -> class FakeResumePipeline: def __init__(self, config=None, *, stage_runtime_writer=None): - self.config = config or default_policyengine_us_data_rebuild_checkpoint_config( - policyengine_baseline_dataset="/tmp/enhanced_cps_2024.h5", - policyengine_targets_db="/tmp/policy_data.db", + self.config = ( + config + or default_policyengine_us_data_rebuild_checkpoint_config( + policyengine_baseline_dataset="/tmp/enhanced_cps_2024.h5", + policyengine_targets_db="/tmp/policy_data.db", + ) ) self.stage_runtime_writer = stage_runtime_writer if stage_runtime_writer is not None: @@ -811,7 +831,10 @@ def test_stage_resume_preflight_reports_missing_policyengine_bundle_member( ) -> None: artifact_root = _write_complete_resume_artifact_root(tmp_path / "run-1") missing_member = ( - artifact_root / "stage_artifacts" / "06_policyengine_entities" / "persons.parquet" + artifact_root + / "stage_artifacts" + / "06_policyengine_entities" + / "persons.parquet" ) missing_member.unlink() @@ -822,10 +845,15 @@ def test_stage_resume_preflight_reports_missing_policyengine_bundle_member( assert not preflight.ok missing = {item.label: item for item in preflight.missing} - assert "06_policyengine_entities.pre_calibration_policyengine_entity_tables" in missing - assert missing[ - "06_policyengine_entities.pre_calibration_policyengine_entity_tables" - ].path == missing_member + assert ( + "06_policyengine_entities.pre_calibration_policyengine_entity_tables" in missing + ) + assert ( + missing[ + "06_policyengine_entities.pre_calibration_policyengine_entity_tables" + ].path + == missing_member + ) def test_run_policyengine_us_data_rebuild_checkpoint_builds_bundle_and_parity( @@ -2304,6 +2332,7 @@ def _write_complete_resume_artifact_root(artifact_root: Path) -> Path: "complete": True, "lifecycleStatus": "complete", "requiredOutputs": required_outputs, + "missingRequiredOutputs": [], "outputs": outputs, } )