Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/microplex_us/pipelines/export_lineage_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,15 @@ def _add_pipeline_constructed_evidence(
"CPS weekly-hours export support",
("A_HRS1", "hours_worked", "hours_worked_last_week"),
),
"selected_marketplace_plan_benchmark_ratio": (
"PE-US ACA selected Marketplace plan ratio construction",
(
"health_insurance_premiums_without_medicare_part_b",
"takes_up_aca_if_eligible",
"aca_ptc",
"slcsp",
),
),
"self_employment_income_before_lsr": (
"USMicroplexPipeline income normalizer",
("self_employment_income",),
Expand Down
79 changes: 79 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
build_policyengine_us_export_variable_maps,
build_policyengine_us_time_period_arrays,
compile_supported_policyengine_us_household_linear_constraints,
compute_marketplace_plan_benchmark_ratio,
filter_supported_policyengine_us_targets,
infer_policyengine_us_variable_bindings,
load_us_pipeline_checkpoint,
Expand Down Expand Up @@ -5736,6 +5737,10 @@ def export_policyengine_dataset(
tables = result.policyengine_tables or self.build_policyengine_entity_tables(
result.calibrated_data
)
tables = self._attach_policyengine_marketplace_plan_benchmark_ratio(
tables,
target_period=export_period,
)
tax_benefit_system = self._resolve_policyengine_tax_benefit_system()
export_maps = build_policyengine_us_export_variable_maps(
tables,
Expand Down Expand Up @@ -9329,6 +9334,11 @@ def _aggregate_policyengine_tax_unit_input_columns(
aggregated[column] = float(nonzero_values.iloc[0])
continue
aggregated[column] = float(values.sum())
for column in ("health_insurance_premiums_without_medicare_part_b",):
if column not in unit_persons.columns:
continue
values = pd.to_numeric(unit_persons[column], errors="coerce").fillna(0.0)
aggregated[column] = float(values.clip(lower=0.0).sum())
for child_count_column in ("eitc_children", "eitc_child_count"):
if child_count_column not in unit_persons.columns:
continue
Expand Down Expand Up @@ -9539,6 +9549,75 @@ def _attach_policyengine_tax_unit_takeup_inputs(
result = self._attach_policyengine_eitc_takeup(result)
return self._attach_policyengine_voluntary_filing(result)

def _attach_policyengine_marketplace_plan_benchmark_ratio(
    self,
    tables: PolicyEngineUSEntityTableBundle,
    *,
    target_period: int,
) -> PolicyEngineUSEntityTableBundle:
    """Add the selected Marketplace plan benchmark ratio to the tax units.

    The ratio is only derived when the tax-unit table is non-empty and
    carries both the reported premium column and the ACA take-up flag.
    ``aca_ptc`` and ``slcsp`` are materialized through PolicyEngine when the
    table does not already hold them.

    Args:
        tables: Entity table bundle whose ``tax_units`` frame is enriched.
        target_period: Period passed to the materialization call; it is also
            the dataset year when the config does not set one.

    Returns:
        ``tables`` unchanged when the prerequisites are absent (or the
        materialized bundle has no tax units), the materialized bundle when
        an intermediate could still not be produced, and otherwise a new
        bundle whose ``tax_units`` copy carries the
        ``selected_marketplace_plan_benchmark_ratio`` column.
    """
    premium_column = "health_insurance_premiums_without_medicare_part_b"
    takeup_column = "takes_up_aca_if_eligible"

    unit_frame = tables.tax_units
    if unit_frame is None or unit_frame.empty:
        return tables
    if any(
        name not in unit_frame.columns for name in (premium_column, takeup_column)
    ):
        return tables

    # PE-computed intermediates that still have to be produced.
    absent = {
        name for name in ("aca_ptc", "slcsp") if name not in unit_frame.columns
    }
    source_tables = tables
    if absent:
        source_tables = materialize_policyengine_us_variables_safely(
            tables,
            variables=tuple(sorted(absent)),
            period=target_period,
            dataset_year=self.config.policyengine_dataset_year or target_period,
            simulation_cls=self.config.policyengine_simulation_cls,
            direct_override_variables=self.config.policyengine_direct_override_variables,
            batch_size=self.config.policyengine_materialize_batch_size,
        ).tables
        unit_frame = source_tables.tax_units
        if unit_frame is None:
            return tables
        unresolved = sorted(absent.difference(unit_frame.columns))
        if unresolved:
            LOGGER.warning(
                "Could not derive selected Marketplace plan benchmark ratio; "
                "missing PE intermediate(s): %s",
                ", ".join(unresolved),
            )
            return source_tables

    # Work on a copy so the frame held by the incoming bundle is not mutated.
    enriched = unit_frame.copy()
    premiums = pd.to_numeric(enriched[premium_column], errors="coerce").fillna(0.0)
    credits = pd.to_numeric(enriched["aca_ptc"], errors="coerce").fillna(0.0)
    benchmarks = pd.to_numeric(enriched["slcsp"], errors="coerce").fillna(0.0)
    takes_up = self._normal_bool_series(
        enriched[takeup_column],
        index=enriched.index,
    )
    enriched["selected_marketplace_plan_benchmark_ratio"] = (
        compute_marketplace_plan_benchmark_ratio(
            reported_premium=premiums,
            aca_ptc=credits,
            slcsp=benchmarks,
            takes_up_aca=takes_up,
        )
    )
    return PolicyEngineUSEntityTableBundle(
        households=source_tables.households,
        persons=source_tables.persons,
        tax_units=enriched,
        spm_units=source_tables.spm_units,
        families=source_tables.families,
        marital_units=source_tables.marital_units,
    )

def _attach_policyengine_simple_tax_unit_takeup(
self,
tax_units: pd.DataFrame,
Expand Down
76 changes: 74 additions & 2 deletions src/microplex_us/policyengine/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,47 @@ class PolicyEngineUSVariableMaterializationResult:
| POLICYENGINE_US_DATA_OVERRIDABLE_COMPUTED_EXPORT_VARIABLES
)

# Lower and upper bounds applied to the backed-out plan-to-benchmark ratio.
MARKETPLACE_PLAN_BENCHMARK_RATIO_MIN = 0.5
MARKETPLACE_PLAN_BENCHMARK_RATIO_MAX = 1.5
# Tax-unit column that carries the derived ratio.
MARKETPLACE_PLAN_BENCHMARK_RATIO_COLUMN = "selected_marketplace_plan_benchmark_ratio"
# Tax-unit columns that must all be present before the ratio is derived.
MARKETPLACE_PLAN_BENCHMARK_RATIO_SOURCE_COLUMNS: frozenset[str] = frozenset(
    {
        "health_insurance_premiums_without_medicare_part_b",
        "aca_ptc",
        "slcsp",
        "takes_up_aca_if_eligible",
    }
)


def compute_marketplace_plan_benchmark_ratio(
    *,
    reported_premium: Any,
    aca_ptc: Any,
    slcsp: Any,
    takes_up_aca: Any,
) -> np.ndarray:
    """Back out selected Marketplace plan cost relative to SLCSP.

    ``selected_marketplace_plan_benchmark_ratio`` is a persisted eCPS input,
    not SLCSP itself. PE-US still computes SLCSP from geography and family
    composition; this ratio carries the selected-plan-to-benchmark adjustment
    for tax units modeled as taking up Marketplace coverage.

    Args:
        reported_premium: Array-like of reported premium amounts.
        aca_ptc: Array-like of ACA premium tax credit amounts.
        slcsp: Array-like of benchmark (SLCSP) amounts.
        takes_up_aca: Array-like of take-up flags, coerced to ``bool``.

    Returns:
        ``(reported_premium + aca_ptc) / slcsp`` clipped to
        ``[MARKETPLACE_PLAN_BENCHMARK_RATIO_MIN,
        MARKETPLACE_PLAN_BENCHMARK_RATIO_MAX]`` where the unit takes up
        coverage and has a positive benchmark, and ``1.0`` everywhere else.
        The result never contains NaN.
    """
    reported = np.asarray(reported_premium, dtype=float)
    ptc = np.asarray(aca_ptc, dtype=float)
    benchmark = np.asarray(slcsp, dtype=float)
    takeup = np.asarray(takes_up_aca, dtype=bool)

    # A missing amount counts as zero so it cannot leak NaN into the
    # persisted ratio; a NaN benchmark already fails the ``> 0`` test below.
    reported = np.where(np.isnan(reported), 0.0, reported)
    ptc = np.where(np.isnan(ptc), 0.0, ptc)

    with np.errstate(divide="ignore", invalid="ignore"):
        has_benchmark = benchmark > 0
        # Non-positive benchmarks are divided by 1.0 purely to keep the
        # arithmetic defined; those rows are replaced with 1.0 afterwards.
        raw = (reported + ptc) / np.where(has_benchmark, benchmark, 1.0)
    clipped = np.clip(
        raw,
        MARKETPLACE_PLAN_BENCHMARK_RATIO_MIN,
        MARKETPLACE_PLAN_BENCHMARK_RATIO_MAX,
    )
    # An undefined ratio (e.g. inf / inf) falls back to the neutral 1.0.
    applicable = takeup & has_benchmark & ~np.isnan(clipped)
    return np.where(applicable, clipped, 1.0)


def compute_policyengine_us_definition_hash(
constraints: tuple[PolicyEngineUSConstraint, ...] | list[PolicyEngineUSConstraint],
Expand Down Expand Up @@ -3050,14 +3091,15 @@ def build_policyengine_us_export_variable_maps(
)
household_table = _with_policyengine_household_export_derivatives(tables.households)
person_table = _with_policyengine_person_export_derivatives(tables.persons)
tax_unit_table = _with_policyengine_tax_unit_export_derivatives(tables.tax_units)
table_specs = (
(
"household",
household_table,
{"household_id", "household_weight", "weight"},
),
("person", person_table, {"person_id", "household_id"}),
("tax_unit", tables.tax_units, {"tax_unit_id", "household_id"}),
("tax_unit", tax_unit_table, {"tax_unit_id", "household_id"}),
("spm_unit", tables.spm_units, {"spm_unit_id", "household_id"}),
("family", tables.families, {"family_id", "household_id"}),
)
Expand Down Expand Up @@ -3141,6 +3183,7 @@ def build_policyengine_us_time_period_arrays(
tables.persons,
period=int(period),
)
tax_unit_table = _with_policyengine_tax_unit_export_derivatives(tables.tax_units)
persons = _prepare_person_export_table(
person_table,
person_id_column=person_id_column,
Expand Down Expand Up @@ -3184,7 +3227,7 @@ def build_policyengine_us_time_period_arrays(
(
"tax_unit",
"tax_unit_id",
tables.tax_units,
tax_unit_table,
tax_unit_variable_map,
"household",
),
Expand Down Expand Up @@ -3651,6 +3694,35 @@ def _with_policyengine_person_export_derivatives(
return person_table


def _with_policyengine_tax_unit_export_derivatives(
    tax_units: pd.DataFrame | None,
) -> pd.DataFrame | None:
    """Return tax units with the Marketplace plan benchmark ratio attached.

    The input is handed back untouched when it is ``None`` or lacks any of
    ``MARKETPLACE_PLAN_BENCHMARK_RATIO_SOURCE_COLUMNS``. Otherwise a copy is
    returned with ``MARKETPLACE_PLAN_BENCHMARK_RATIO_COLUMN`` (re)computed
    from those source columns.
    """
    if tax_units is None or not (
        MARKETPLACE_PLAN_BENCHMARK_RATIO_SOURCE_COLUMNS.issubset(tax_units.columns)
    ):
        return tax_units

    # Copy first so the caller's frame is left unmodified.
    derived = tax_units.copy()

    def _amount(column: str) -> pd.Series:
        # Unparseable or missing amounts are treated as zero.
        return pd.to_numeric(derived[column], errors="coerce").fillna(0.0)

    ratio = compute_marketplace_plan_benchmark_ratio(
        reported_premium=_amount("health_insurance_premiums_without_medicare_part_b"),
        aca_ptc=_amount("aca_ptc"),
        slcsp=_amount("slcsp"),
        takes_up_aca=_truthy_series(
            derived["takes_up_aca_if_eligible"],
            index=derived.index,
        ),
    )
    derived[MARKETPLACE_PLAN_BENCHMARK_RATIO_COLUMN] = ratio
    return derived


def _derive_has_tin_for_export(persons: pd.DataFrame) -> pd.Series:
"""Mirror PE-US has_tin default while honoring MP's SSN-card type signal."""
if "ssn_card_type" not in persons.columns:
Expand Down
21 changes: 7 additions & 14 deletions tests/pipelines/test_export_lineage_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,19 @@ def test_export_lineage_manifest_tracks_source_backed_blocks():
"home_mortgage_interest",
"reported_has_medicaid_health_coverage_at_interview",
"ssn_card_type",
"selected_marketplace_plan_benchmark_ratio",
"weekly_hours_worked_before_lsr",
):
assert columns[column]["has_source_lineage"]

assert (
columns["selected_marketplace_plan_benchmark_ratio"]["export_path_status"]
== "default_only"
)
assert not columns["selected_marketplace_plan_benchmark_ratio"][
"has_source_lineage"
]


def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_path):
contract_path = tmp_path / "contract.json"
contract_path.write_text(
json.dumps(
{
"required": [
"selected_marketplace_plan_benchmark_ratio",
"is_wic_at_nutritional_risk",
"weekly_hours_worked_before_lsr",
],
"forbidden": [],
Expand All @@ -49,8 +42,8 @@ def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_pa
)
baseline_path = tmp_path / "baseline.h5"
with h5py.File(baseline_path, "w") as handle:
selected = handle.create_group("selected_marketplace_plan_benchmark_ratio")
selected.create_dataset("2024", data=np.array([0.8, 1.0]))
wic = handle.create_group("is_wic_at_nutritional_risk")
wic.create_dataset("2024", data=np.array([False, True]))
weekly_hours = handle.create_group("weekly_hours_worked_before_lsr")
weekly_hours.create_dataset("2024", data=np.array([0.0, 40.0]))

Expand All @@ -61,9 +54,9 @@ def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_pa

issues = {issue["column"]: issue for issue in payload["issues"]}
assert issues == {
"selected_marketplace_plan_benchmark_ratio": {
"column": "selected_marketplace_plan_benchmark_ratio",
"ecps_support_requirement": "numeric_nonzero",
"is_wic_at_nutritional_risk": {
"column": "is_wic_at_nutritional_risk",
"ecps_support_requirement": "categorical_variation",
"export_path_status": "default_only",
"issue": "ecps_populated_export_has_no_source_lineage",
}
Expand Down
12 changes: 6 additions & 6 deletions tests/pipelines/test_mp300k_artifact_gates.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,14 @@ def test_export_lineage_gate_rejects_ecps_populated_default_only_column(tmp_path
)
_add_period_dataset(
candidate_dataset,
"selected_marketplace_plan_benchmark_ratio",
[1.0, 1.0],
"is_wic_at_nutritional_risk",
[False, True],
)
baseline_dataset = _write_minimal_policyengine_dataset(tmp_path / "baseline.h5")
_add_period_dataset(
baseline_dataset,
"selected_marketplace_plan_benchmark_ratio",
[0.8, 1.2],
"is_wic_at_nutritional_risk",
[False, True],
)
benchmark_manifest = tmp_path / "benchmark_manifest.json"
_write_benchmark_manifest(benchmark_manifest)
Expand All @@ -469,8 +469,8 @@ def test_export_lineage_gate_rejects_ecps_populated_default_only_column(tmp_path
assert lineage_gate["status"] == "fail"
assert lineage_gate["details"]["issues"] == [
{
"column": "selected_marketplace_plan_benchmark_ratio",
"ecps_support_requirement": "numeric_nonzero",
"column": "is_wic_at_nutritional_risk",
"ecps_support_requirement": "categorical_variation",
"export_path_status": "default_only",
"issue": "ecps_populated_export_has_no_source_lineage",
}
Expand Down
Loading
Loading