diff --git a/src/microplex_us/pipelines/export_lineage_manifest.py b/src/microplex_us/pipelines/export_lineage_manifest.py index 949efb7..4c1048c 100644 --- a/src/microplex_us/pipelines/export_lineage_manifest.py +++ b/src/microplex_us/pipelines/export_lineage_manifest.py @@ -528,6 +528,15 @@ def _add_pipeline_constructed_evidence( "CPS weekly-hours export support", ("A_HRS1", "hours_worked", "hours_worked_last_week"), ), + "selected_marketplace_plan_benchmark_ratio": ( + "PE-US ACA selected Marketplace plan ratio construction", + ( + "health_insurance_premiums_without_medicare_part_b", + "takes_up_aca_if_eligible", + "aca_ptc", + "slcsp", + ), + ), "self_employment_income_before_lsr": ( "USMicroplexPipeline income normalizer", ("self_employment_income",), diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 6e3c8dc..e6b6c9c 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -116,6 +116,7 @@ build_policyengine_us_export_variable_maps, build_policyengine_us_time_period_arrays, compile_supported_policyengine_us_household_linear_constraints, + compute_marketplace_plan_benchmark_ratio, filter_supported_policyengine_us_targets, infer_policyengine_us_variable_bindings, load_us_pipeline_checkpoint, @@ -5736,6 +5737,10 @@ def export_policyengine_dataset( tables = result.policyengine_tables or self.build_policyengine_entity_tables( result.calibrated_data ) + tables = self._attach_policyengine_marketplace_plan_benchmark_ratio( + tables, + target_period=export_period, + ) tax_benefit_system = self._resolve_policyengine_tax_benefit_system() export_maps = build_policyengine_us_export_variable_maps( tables, @@ -9329,6 +9334,11 @@ def _aggregate_policyengine_tax_unit_input_columns( aggregated[column] = float(nonzero_values.iloc[0]) continue aggregated[column] = float(values.sum()) + for column in ("health_insurance_premiums_without_medicare_part_b",): + if column not in unit_persons.columns: + continue + values = pd.to_numeric(unit_persons[column], errors="coerce").fillna(0.0) + aggregated[column] = float(values.clip(lower=0.0).sum()) for child_count_column in ("eitc_children", "eitc_child_count"): if child_count_column not in unit_persons.columns: continue @@ -9539,6 +9549,75 @@ def _attach_policyengine_tax_unit_takeup_inputs( result = self._attach_policyengine_eitc_takeup(result) return self._attach_policyengine_voluntary_filing(result) + def _attach_policyengine_marketplace_plan_benchmark_ratio( + self, + tables: PolicyEngineUSEntityTableBundle, + *, + target_period: int, + ) -> PolicyEngineUSEntityTableBundle: + """Derive eCPS's persisted selected Marketplace plan ratio input.""" + tax_units = tables.tax_units + if tax_units is None or tax_units.empty: + return tables + if not { + "health_insurance_premiums_without_medicare_part_b", + "takes_up_aca_if_eligible", + }.issubset(tax_units.columns): + return tables + + missing_intermediates = { + column for column in ("aca_ptc", "slcsp") if column not in tax_units.columns + } + materialized_tables = tables + if missing_intermediates: + materialization_result = materialize_policyengine_us_variables_safely( + tables, + variables=tuple(sorted(missing_intermediates)), + period=target_period, + dataset_year=self.config.policyengine_dataset_year or target_period, + simulation_cls=self.config.policyengine_simulation_cls, + direct_override_variables=self.config.policyengine_direct_override_variables, + batch_size=self.config.policyengine_materialize_batch_size, + ) + materialized_tables = materialization_result.tables + tax_units = materialized_tables.tax_units + if tax_units is None: + return tables + still_missing = sorted(missing_intermediates - set(tax_units.columns)) + if still_missing: + LOGGER.warning( + "Could not derive selected Marketplace plan benchmark ratio; " + "missing PE intermediate(s): %s", + ", ".join(still_missing), + ) + return materialized_tables + + tax_units = tax_units.copy() + tax_units["selected_marketplace_plan_benchmark_ratio"] = ( + compute_marketplace_plan_benchmark_ratio( + reported_premium=pd.to_numeric( + tax_units["health_insurance_premiums_without_medicare_part_b"], + errors="coerce", + ).fillna(0.0), + aca_ptc=pd.to_numeric(tax_units["aca_ptc"], errors="coerce").fillna( + 0.0 + ), + slcsp=pd.to_numeric(tax_units["slcsp"], errors="coerce").fillna(0.0), + takes_up_aca=self._normal_bool_series( + tax_units["takes_up_aca_if_eligible"], + index=tax_units.index, + ), + ) + ) + return PolicyEngineUSEntityTableBundle( + households=materialized_tables.households, + persons=materialized_tables.persons, + tax_units=tax_units, + spm_units=materialized_tables.spm_units, + families=materialized_tables.families, + marital_units=materialized_tables.marital_units, + ) + def _attach_policyengine_simple_tax_unit_takeup( self, tax_units: pd.DataFrame, diff --git a/src/microplex_us/policyengine/us.py b/src/microplex_us/policyengine/us.py index 25f9b91..f32dc55 100644 --- a/src/microplex_us/policyengine/us.py +++ b/src/microplex_us/policyengine/us.py @@ -794,6 +794,47 @@ class PolicyEngineUSVariableMaterializationResult: | POLICYENGINE_US_DATA_OVERRIDABLE_COMPUTED_EXPORT_VARIABLES ) +MARKETPLACE_PLAN_BENCHMARK_RATIO_MIN = 0.5 +MARKETPLACE_PLAN_BENCHMARK_RATIO_MAX = 1.5 +MARKETPLACE_PLAN_BENCHMARK_RATIO_COLUMN = "selected_marketplace_plan_benchmark_ratio" +MARKETPLACE_PLAN_BENCHMARK_RATIO_SOURCE_COLUMNS: frozenset[str] = frozenset( + { + "health_insurance_premiums_without_medicare_part_b", + "aca_ptc", + "slcsp", + "takes_up_aca_if_eligible", + } +) + + +def compute_marketplace_plan_benchmark_ratio( + *, + reported_premium: Any, + aca_ptc: Any, + slcsp: Any, + takes_up_aca: Any, +) -> np.ndarray: + """Back out selected Marketplace plan cost relative to SLCSP. + + ``selected_marketplace_plan_benchmark_ratio`` is a persisted eCPS input, + not SLCSP itself. PE-US still computes SLCSP from geography and family + composition; this ratio carries the selected-plan-to-benchmark adjustment + for tax units modeled as taking up Marketplace coverage. + """ + reported = np.asarray(reported_premium, dtype=float) + ptc = np.asarray(aca_ptc, dtype=float) + benchmark = np.asarray(slcsp, dtype=float) + takeup = np.asarray(takes_up_aca, dtype=bool) + with np.errstate(divide="ignore", invalid="ignore"): + raw = (reported + ptc) / np.where(benchmark > 0, benchmark, 1.0) + clipped = np.clip( + raw, + MARKETPLACE_PLAN_BENCHMARK_RATIO_MIN, + MARKETPLACE_PLAN_BENCHMARK_RATIO_MAX, + ) + applicable = takeup & (benchmark > 0) + return np.where(applicable, clipped, 1.0) + def compute_policyengine_us_definition_hash( constraints: tuple[PolicyEngineUSConstraint, ...] | list[PolicyEngineUSConstraint], @@ -3050,6 +3091,7 @@ def build_policyengine_us_export_variable_maps( ) household_table = _with_policyengine_household_export_derivatives(tables.households) person_table = _with_policyengine_person_export_derivatives(tables.persons) + tax_unit_table = _with_policyengine_tax_unit_export_derivatives(tables.tax_units) table_specs = ( ( "household", @@ -3057,7 +3099,7 @@ def build_policyengine_us_export_variable_maps( {"household_id", "household_weight", "weight"}, ), ("person", person_table, {"person_id", "household_id"}), - ("tax_unit", tables.tax_units, {"tax_unit_id", "household_id"}), + ("tax_unit", tax_unit_table, {"tax_unit_id", "household_id"}), ("spm_unit", tables.spm_units, {"spm_unit_id", "household_id"}), ("family", tables.families, {"family_id", "household_id"}), ) @@ -3141,6 +3183,7 @@ def build_policyengine_us_time_period_arrays( tables.persons, period=int(period), ) + tax_unit_table = _with_policyengine_tax_unit_export_derivatives(tables.tax_units) persons = _prepare_person_export_table( person_table, person_id_column=person_id_column, @@ -3184,7 +3227,7 @@ def build_policyengine_us_time_period_arrays( ( "tax_unit", "tax_unit_id", - tables.tax_units, + tax_unit_table, tax_unit_variable_map, "household", ), @@ -3651,6 +3694,35 @@ def _with_policyengine_person_export_derivatives( return person_table +def _with_policyengine_tax_unit_export_derivatives( + tax_units: pd.DataFrame | None, +) -> pd.DataFrame | None: + if tax_units is None: + return tax_units + + if not MARKETPLACE_PLAN_BENCHMARK_RATIO_SOURCE_COLUMNS.issubset(tax_units.columns): + return tax_units + + tax_unit_table = tax_units.copy() + tax_unit_table[MARKETPLACE_PLAN_BENCHMARK_RATIO_COLUMN] = ( + compute_marketplace_plan_benchmark_ratio( + reported_premium=pd.to_numeric( + tax_unit_table["health_insurance_premiums_without_medicare_part_b"], + errors="coerce", + ).fillna(0.0), + aca_ptc=pd.to_numeric(tax_unit_table["aca_ptc"], errors="coerce").fillna( + 0.0 + ), + slcsp=pd.to_numeric(tax_unit_table["slcsp"], errors="coerce").fillna(0.0), + takes_up_aca=_truthy_series( + tax_unit_table["takes_up_aca_if_eligible"], + index=tax_unit_table.index, + ), + ) + ) + return tax_unit_table + + def _derive_has_tin_for_export(persons: pd.DataFrame) -> pd.Series: """Mirror PE-US has_tin default while honoring MP's SSN-card type signal.""" if "ssn_card_type" not in persons.columns: diff --git a/tests/pipelines/test_export_lineage_manifest.py b/tests/pipelines/test_export_lineage_manifest.py index 976743e..171e77f 100644 --- a/tests/pipelines/test_export_lineage_manifest.py +++ b/tests/pipelines/test_export_lineage_manifest.py @@ -21,18 +21,11 @@ def test_export_lineage_manifest_tracks_source_backed_blocks(): "home_mortgage_interest", "reported_has_medicaid_health_coverage_at_interview", "ssn_card_type", + "selected_marketplace_plan_benchmark_ratio", "weekly_hours_worked_before_lsr", ): assert columns[column]["has_source_lineage"] - assert ( - columns["selected_marketplace_plan_benchmark_ratio"]["export_path_status"] - == "default_only" - ) - assert not columns["selected_marketplace_plan_benchmark_ratio"][ - "has_source_lineage" - ] - def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_path): contract_path = tmp_path / "contract.json" @@ -40,7 +33,7 @@ def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_pa json.dumps( { "required": [ - "selected_marketplace_plan_benchmark_ratio", + "is_wic_at_nutritional_risk", "weekly_hours_worked_before_lsr", ], "forbidden": [], @@ -49,8 +42,8 @@ def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_pa ) baseline_path = tmp_path / "baseline.h5" with h5py.File(baseline_path, "w") as handle: - selected = handle.create_group("selected_marketplace_plan_benchmark_ratio") - selected.create_dataset("2024", data=np.array([0.8, 1.0])) + wic = handle.create_group("is_wic_at_nutritional_risk") + wic.create_dataset("2024", data=np.array([False, True])) weekly_hours = handle.create_group("weekly_hours_worked_before_lsr") weekly_hours.create_dataset("2024", data=np.array([0.0, 40.0])) @@ -61,9 +54,9 @@ def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_pa issues = {issue["column"]: issue for issue in payload["issues"]} assert issues == { - "selected_marketplace_plan_benchmark_ratio": { - "column": "selected_marketplace_plan_benchmark_ratio", - "ecps_support_requirement": "numeric_nonzero", + "is_wic_at_nutritional_risk": { + "column": "is_wic_at_nutritional_risk", + "ecps_support_requirement": "categorical_variation", "export_path_status": "default_only", "issue": "ecps_populated_export_has_no_source_lineage", } diff --git a/tests/pipelines/test_mp300k_artifact_gates.py b/tests/pipelines/test_mp300k_artifact_gates.py index 2feb6d7..e3d963c 100644 --- a/tests/pipelines/test_mp300k_artifact_gates.py +++ b/tests/pipelines/test_mp300k_artifact_gates.py @@ -437,14 +437,14 @@ def test_export_lineage_gate_rejects_ecps_populated_default_only_column(tmp_path ) _add_period_dataset( candidate_dataset, - "selected_marketplace_plan_benchmark_ratio", - [1.0, 1.0], + "is_wic_at_nutritional_risk", + [False, True], ) baseline_dataset = _write_minimal_policyengine_dataset(tmp_path / "baseline.h5") _add_period_dataset( baseline_dataset, - "selected_marketplace_plan_benchmark_ratio", - [0.8, 1.2], + "is_wic_at_nutritional_risk", + [False, True], ) benchmark_manifest = tmp_path / "benchmark_manifest.json" _write_benchmark_manifest(benchmark_manifest) @@ -469,8 +469,8 @@ def test_export_lineage_gate_rejects_ecps_populated_default_only_column(tmp_path assert lineage_gate["status"] == "fail" assert lineage_gate["details"]["issues"] == [ { - "column": "selected_marketplace_plan_benchmark_ratio", - "ecps_support_requirement": "numeric_nonzero", + "column": "is_wic_at_nutritional_risk", + "ecps_support_requirement": "categorical_variation", "export_path_status": "default_only", "issue": "ecps_populated_export_has_no_source_lineage", } diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 42c6908..02fe434 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1230,6 +1230,7 @@ def test_build_policyengine_entity_tables_derives_tax_input_columns(self): "unemployment_compensation": [0.0, 150.0], "medicaid": [0.0, 1_250.0], "medicaid_enrolled": [False, True], + "health_insurance_premiums_without_medicare_part_b": [120.0, 80.0], "state_income_tax_paid": [400.0, 50.0], "filing_status": ["JOINT", "JOINT"], "relationship_to_head": [0, 1], @@ -1240,6 +1241,9 @@ def test_build_policyengine_entity_tables_derives_tax_input_columns(self): tables = pipeline.build_policyengine_entity_tables(population) person_rows = tables.persons.sort_values("person_id").reset_index(drop=True) + tax_unit_rows = tables.tax_units.sort_values("household_id").reset_index( + drop=True + ) assert person_rows["employment_income_before_lsr"].tolist() == [ 50_000.0, @@ -1268,6 +1272,10 @@ def test_build_policyengine_entity_tables_derives_tax_input_columns(self): assert person_rows["is_hispanic"].tolist() == [False, True] assert person_rows["medicaid"].tolist() == [0.0, 1_250.0] assert person_rows["medicaid_enrolled"].tolist() == [False, True] + assert ( + tax_unit_rows["health_insurance_premiums_without_medicare_part_b"].sum() + == 200.0 + ) assert person_rows["state_income_tax_reported"].tolist() == [400.0, 50.0] def test_build_policyengine_entity_tables_adds_deterministic_aca_takeup( @@ -1348,6 +1356,86 @@ def test_build_policyengine_entity_tables_preserves_explicit_aca_takeup(self): tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) assert tax_units["takes_up_aca_if_eligible"].tolist() == [True, False] + def test_attach_policyengine_marketplace_ratio_materializes_intermediates( + self, + monkeypatch, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + tables = PolicyEngineUSEntityTableBundle( + households=pd.DataFrame( + { + "household_id": [10, 20], + "household_weight": [1.0, 1.0], + } + ), + persons=pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 20], + "tax_unit_id": [100, 200], + } + ), + tax_units=pd.DataFrame( + { + "tax_unit_id": [100, 200], + "household_id": [10, 20], + "health_insurance_premiums_without_medicare_part_b": [ + 300.0, + 50.0, + ], + "takes_up_aca_if_eligible": [True, True], + } + ), + ) + + captured_variables: list[tuple[str, ...]] = [] + + def fake_materialize(tables_arg, *, variables, **kwargs): + captured_variables.append(tuple(variables)) + tax_units = tables_arg.tax_units.copy() + tax_units["aca_ptc"] = [700.0, 0.0] + tax_units["slcsp"] = [1_000.0, 1_000.0] + return PolicyEngineUSVariableMaterializationResult( + tables=PolicyEngineUSEntityTableBundle( + households=tables_arg.households, + persons=tables_arg.persons, + tax_units=tax_units, + spm_units=tables_arg.spm_units, + families=tables_arg.families, + marital_units=tables_arg.marital_units, + ), + bindings={ + "aca_ptc": PolicyEngineUSVariableBinding( + entity=EntityType.TAX_UNIT, + column="aca_ptc", + ), + "slcsp": PolicyEngineUSVariableBinding( + entity=EntityType.TAX_UNIT, + column="slcsp", + ), + }, + materialized_variables=tuple(variables), + ) + + monkeypatch.setattr( + us_pipeline_module, + "materialize_policyengine_us_variables_safely", + fake_materialize, + ) + + updated = pipeline._attach_policyengine_marketplace_plan_benchmark_ratio( + tables, + target_period=2024, + ) + + assert captured_variables == [("aca_ptc", "slcsp")] + np.testing.assert_allclose( + updated.tax_units["selected_marketplace_plan_benchmark_ratio"], + np.array([1.0, 0.5]), + ) + def test_build_policyengine_entity_tables_adds_ecps_stochastic_takeup_inputs( self, monkeypatch, diff --git a/tests/policyengine/test_us.py b/tests/policyengine/test_us.py index 6ec0761..04132d7 100644 --- a/tests/policyengine/test_us.py +++ b/tests/policyengine/test_us.py @@ -36,6 +36,7 @@ build_policyengine_us_export_variable_maps, build_policyengine_us_time_period_arrays, compile_policyengine_us_household_linear_constraints, + compute_marketplace_plan_benchmark_ratio, compute_policyengine_us_definition_hash, detect_policyengine_pseudo_inputs, materialize_policyengine_us_variables, @@ -2954,6 +2955,62 @@ def test_build_time_period_arrays_defaults_missing_ssn_card_type_to_citizen(self assert arrays["ssn_card_type"]["2024"].tolist() == [b"NONE", b"CITIZEN"] + def test_compute_marketplace_plan_benchmark_ratio_clips_marketplace_takers(self): + ratio = compute_marketplace_plan_benchmark_ratio( + reported_premium=np.array([300.0, 50.0, 500.0, 500.0]), + aca_ptc=np.array([700.0, 0.0, 0.0, 500.0]), + slcsp=np.array([1_000.0, 1_000.0, 0.0, 1_000.0]), + takes_up_aca=np.array([True, True, True, False]), + ) + + np.testing.assert_allclose(ratio, np.array([1.0, 0.5, 1.0, 1.0])) + + def test_build_time_period_arrays_derives_marketplace_plan_ratio(self): + tables = PolicyEngineUSEntityTableBundle( + households=pd.DataFrame( + { + "household_id": [10, 20, 30], + "household_weight": [1.0, 1.0, 1.0], + } + ), + persons=pd.DataFrame( + { + "person_id": [1, 2, 3], + "household_id": [10, 20, 30], + "tax_unit_id": [100, 200, 300], + } + ), + tax_units=pd.DataFrame( + { + "tax_unit_id": [100, 200, 300], + "household_id": [10, 20, 30], + "health_insurance_premiums_without_medicare_part_b": [ + 300.0, + 50.0, + 200.0, + ], + "aca_ptc": [700.0, 0.0, 100.0], + "slcsp": [1_000.0, 1_000.0, 0.0], + "takes_up_aca_if_eligible": [True, True, True], + } + ), + ) + + arrays = build_policyengine_us_time_period_arrays( + tables, + period=2024, + tax_unit_variable_map={ + "selected_marketplace_plan_benchmark_ratio": ( + "selected_marketplace_plan_benchmark_ratio" + ) + }, + ) + + np.testing.assert_allclose( + arrays["selected_marketplace_plan_benchmark_ratio"]["2024"], + np.array([1.0, 0.5, 1.0]), + ) + def test_build_time_period_arrays_defaults_absent_export_inputs(self): tables = PolicyEngineUSEntityTableBundle( households=pd.DataFrame( @@ -2999,6 +3056,9 @@ def test_build_time_period_arrays_defaults_absent_export_inputs(self): }, tax_unit_variable_map={ "first_home_mortgage_balance": "first_home_mortgage_balance", + "selected_marketplace_plan_benchmark_ratio": ( + "selected_marketplace_plan_benchmark_ratio" + ), "takes_up_aca_if_eligible": "takes_up_aca_if_eligible", "takes_up_eitc": "takes_up_eitc", "would_file_taxes_voluntarily": "would_file_taxes_voluntarily", @@ -3021,6 +3081,9 @@ def test_build_time_period_arrays_defaults_absent_export_inputs(self): assert arrays["weeks_unemployed"]["2024"].tolist() == [0] assert arrays["would_claim_wic"]["2024"].tolist() == [True] assert arrays["first_home_mortgage_balance"]["2024"].tolist() == [0.0] + assert arrays["selected_marketplace_plan_benchmark_ratio"]["2024"].tolist() == [ + 1.0 + ] assert arrays["takes_up_aca_if_eligible"]["2024"].tolist() == [True] assert arrays["takes_up_eitc"]["2024"].tolist() == [True] assert arrays["would_file_taxes_voluntarily"]["2024"].tolist() == [False]