From 346203b065aef73ab02608b1bf65c326c9ad3232 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 30 May 2026 17:44:45 -0400 Subject: [PATCH 1/5] Prototype (#115): adapter to build microunit CPS contract from normalized frame Synthesize microunit's CPS input columns (PH_SEQ/A_LINENO/A_AGE/A_MARITL/ A_SPOUSE/PEPAR1/PEPAR2/A_EXPRRP) from microplex's normalized materialization columns (household_id/age/relationship_to_head), so the microunit tax-unit delegation can fire on real synthesized data. Gated OFF by default via allow_normalized_adapter / config.microunit_construct_from_normalized. HEURISTIC AND UNVALIDATED: the relationship->A_EXPRRP and married->A_MARITL maps are approximate and PEPAR1/PEPAR2 assume a child's parents are the household head and spouse. Fidelity must be validated against the legacy reconstruction before trust (see #115). Default behavior is unchanged with the flag off. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microplex_us/pipelines/us.py | 109 +++++++++++++++++- .../pipelines/test_us_microunit_delegation.py | 103 +++++++++++++++++ 2 files changed, 210 insertions(+), 2 deletions(-) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 55df713..adc4540 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6286,6 +6286,7 @@ def _build_policyengine_tax_units_via_microunit( persons: pd.DataFrame, *, start_tax_unit_id: int = 0, + allow_normalized_adapter: bool | None = None, ) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None: """Reconstruct tax units by delegating to ``microunit`` (issue #113). @@ -6317,8 +6318,20 @@ def _build_policyengine_tax_units_via_microunit( """ if "person_id" not in persons.columns or "household_id" not in persons.columns: return None + cps_frame = persons if not set(self._MICROUNIT_REQUIRED_CPS_COLUMNS).issubset(persons.columns): - return None + # PROTOTYPE (#115): when the raw CPS columns are absent, optionally + # synthesize microunit's CPS contract from microplex's normalized + # columns. Default OFF — the maps are heuristic and unvalidated. + if allow_normalized_adapter is None: + allow_normalized_adapter = bool( + getattr(self.config, "microunit_construct_from_normalized", False) + ) + if not allow_normalized_adapter: + return None + cps_frame = self._microunit_cps_frame_from_normalized(persons) + if cps_frame is None: + return None # Imported lazily to match this module's optional-dependency convention: # ``microunit`` ships in the ``policyengine`` extra, and the base test @@ -6328,7 +6341,7 @@ def _build_policyengine_tax_units_via_microunit( # microunit keys its CPS-style frame on (PH_SEQ, A_LINENO); resetting the # index keeps row order so the returned per-person TAX_ID and role align # positionally back onto person_rows. - person_rows = persons.reset_index(drop=True).copy() + person_rows = cps_frame.reset_index(drop=True).copy() person_assignments, tax_unit = construct_tax_units( person_rows.copy(), year=self._microunit_reference_year(person_rows), @@ -6411,6 +6424,98 @@ def _build_policyengine_tax_units_via_microunit( person_rows = person_rows.drop(columns=["_microunit_role"], errors="ignore") return pd.DataFrame(tax_unit_rows), person_rows, households + def _microunit_cps_frame_from_normalized( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame | None: + """PROTOTYPE (issue #115): synthesize microunit's CPS-like input contract + from microplex's normalized person columns. + + ``microunit.construct_tax_units`` needs raw CPS columns (PH_SEQ/A_LINENO/ + A_AGE/A_MARITL/A_SPOUSE/PEPAR1/PEPAR2/A_EXPRRP); at PolicyEngine + materialization microplex instead carries ``household_id``/``age``/ + ``relationship_to_head``. This builds the former from the latter, mirroring + the ACS->CPS mapping microunit documents as the consumer's responsibility. + + .. warning:: + HEURISTIC AND UNVALIDATED. The ``relationship_to_head`` -> ``A_EXPRRP`` + and married -> ``A_MARITL`` maps are approximate, and ``PEPAR1``/ + ``PEPAR2`` are inferred by assuming a child's parents are the household + head and spouse. The fidelity of these maps must be validated against + the legacy reconstruction before this is trusted (see #115); it is + gated OFF by default. + + Returns ``persons`` with the eight microunit CPS columns added, or ``None`` + if the prerequisite normalized columns are absent. + """ + required = {"person_id", "household_id", "age", "relationship_to_head"} + if not required.issubset(persons.columns): + return None + + # CPS A_EXPRRP recode (microunit.CPSRelationshipCode): 1 reference person, + # 3 husband, 5 own child, 10 other relative. + exprrp_by_rel = {0: 1, 1: 3, 2: 5, 3: 10} + + frame = persons.reset_index(drop=True).copy() + rel = ( + pd.to_numeric(frame["relationship_to_head"], errors="coerce") + .fillna(3) + .astype(int) + ) + age = pd.to_numeric(frame["age"], errors="coerce").fillna(0).astype(int) + + # Per-household line numbers (1-based, unique within household): head + # first, then spouse, then everyone else oldest-first. + frame = frame.assign(_rel=rel.to_numpy(), _age=age.to_numpy()) + frame = frame.sort_values( + ["household_id", "_rel", "_age", "person_id"], + ascending=[True, True, False, True], + ).reset_index(drop=True) + frame["A_LINENO"] = frame.groupby("household_id", sort=False).cumcount() + 1 + frame["PH_SEQ"] = pd.to_numeric(frame["household_id"], errors="coerce").astype( + np.int64 + ) + frame["A_AGE"] = frame["_age"] + frame["A_EXPRRP"] = frame["_rel"].map(exprrp_by_rel).fillna(10).astype(int) + + # Head/spouse line numbers per household, for spouse pointers + marital. + head_line = ( + frame.loc[frame["_rel"] == 0].groupby("household_id")["A_LINENO"].first() + ) + spouse_line = ( + frame.loc[frame["_rel"] == 1].groupby("household_id")["A_LINENO"].first() + ) + hh = frame["household_id"] + is_head = frame["_rel"] == 0 + is_spouse = frame["_rel"] == 1 + is_child = frame["_rel"] == 2 + has_spouse = hh.map(spouse_line).notna() + + frame["A_SPOUSE"] = 0 + frame.loc[is_head, "A_SPOUSE"] = ( + hh[is_head].map(spouse_line).fillna(0).astype(int) + ) + frame.loc[is_spouse, "A_SPOUSE"] = ( + hh[is_spouse].map(head_line).fillna(0).astype(int) + ) + + # A_MARITL: 1 = married, spouse present (head/spouse of a couple); else + # 7 = never married. microunit only needs the married-vs-not distinction. + frame["A_MARITL"] = 7 + frame.loc[(is_head | is_spouse) & has_spouse, "A_MARITL"] = 1 + + # PEPAR1/PEPAR2: assume a child's parents are the household head + spouse. + frame["PEPAR1"] = 0 + frame["PEPAR2"] = 0 + frame.loc[is_child, "PEPAR1"] = ( + hh[is_child].map(head_line).fillna(0).astype(int) + ) + frame.loc[is_child, "PEPAR2"] = ( + hh[is_child].map(spouse_line).fillna(0).astype(int) + ) + + return frame.drop(columns=["_rel", "_age"]) + @staticmethod def _decode_microunit_bytes(value: Any) -> str: """Decode a ``microunit`` bytes-typed status/role into an upper string.""" diff --git a/tests/pipelines/test_us_microunit_delegation.py b/tests/pipelines/test_us_microunit_delegation.py index b2d6885..ec2f31e 100644 --- a/tests/pipelines/test_us_microunit_delegation.py +++ b/tests/pipelines/test_us_microunit_delegation.py @@ -255,3 +255,106 @@ def test_microunit_path_preserves_alignment_under_unsorted_input(pipeline): assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" assert sorted(_unit_field(tax_units, family_id, "filer_ids")) == [1, 2] assert list(_unit_field(tax_units, family_id, "dependent_ids")) == [3] + + +def _normalized_person_frame() -> pd.DataFrame: + """A purely NORMALIZED microplex frame (no raw CPS columns). + + The same population as ``_cps_person_frame`` expressed only in microplex's + materialization columns: a married couple with one child (HH 10) and a + single adult (HH 20). Used to exercise the prototype adapter (#115) that + synthesizes microunit's CPS contract from these columns. + """ + return pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "relationship_to_head": 0, + "age": 40, + "income": 50000.0, + }, + { + "person_id": 2, + "household_id": 10, + "relationship_to_head": 1, + "age": 38, + "income": 30000.0, + }, + { + "person_id": 3, + "household_id": 10, + "relationship_to_head": 2, + "age": 10, + "income": 0.0, + }, + { + "person_id": 4, + "household_id": 20, + "relationship_to_head": 0, + "age": 25, + "income": 45000.0, + }, + ] + ) + + +def test_normalized_adapter_is_off_by_default(pipeline): + persons = _normalized_person_frame() + # No raw CPS columns and the adapter is not enabled -> declines (the default + # behavior is unchanged; the legacy role-flag path handles such frames). + assert pipeline._build_policyengine_tax_units_via_microunit(persons) is None + + +def test_normalized_adapter_activates_delegation_when_enabled(pipeline): + persons = _normalized_person_frame() + + result = pipeline._build_policyengine_tax_units_via_microunit( + persons, allow_normalized_adapter=True + ) + + assert result is not None, ( + "adapter must let the delegation fire from a normalized frame" + ) + tax_units, person_rows, households = result + assert households == {10, 20} + + unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + # Couple + child collapse into one unit; the single adult stays separate. + assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3] + assert unit_by_person[4] != unit_by_person[1] + + family_id = int(unit_by_person[1]) + assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" + assert int(_unit_field(tax_units, family_id, "n_dependents")) == 1 + + +def test_normalized_adapter_builds_microunit_cps_contract(pipeline): + persons = _normalized_person_frame() + frame = pipeline._microunit_cps_frame_from_normalized(persons) + assert frame is not None + # All eight microunit-required CPS columns are present. + for col in ( + "PH_SEQ", + "A_LINENO", + "A_AGE", + "A_MARITL", + "A_SPOUSE", + "PEPAR1", + "PEPAR2", + "A_EXPRRP", + ): + assert col in frame.columns, f"missing {col}" + by_pid = frame.set_index("person_id") + # Couple are married (A_MARITL == 1) and point at each other's line numbers. + assert int(by_pid.loc[1, "A_MARITL"]) == 1 + assert int(by_pid.loc[1, "A_SPOUSE"]) == int(by_pid.loc[2, "A_LINENO"]) + assert int(by_pid.loc[2, "A_SPOUSE"]) == int(by_pid.loc[1, "A_LINENO"]) + # The child's parent pointers reference the head and spouse line numbers. + assert int(by_pid.loc[3, "PEPAR1"]) == int(by_pid.loc[1, "A_LINENO"]) + assert int(by_pid.loc[3, "PEPAR2"]) == int(by_pid.loc[2, "A_LINENO"]) + # The single adult is never-married with no spouse/parent pointers. + assert int(by_pid.loc[4, "A_MARITL"]) == 7 + assert int(by_pid.loc[4, "A_SPOUSE"]) == 0 From 49ace3ee56e8675f8999e0c394e34c405a02d0d6 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 30 May 2026 17:53:51 -0400 Subject: [PATCH 2/5] Harden prototype: head-guarantee + fail-safe fallback (found via real CPS data) Running the adapter against real CPS ASEC surfaced two issues: - microunit requires one reference person per household; guarantee the line-1 member is the single head and demote spurious extra heads (multi-family). - microunit still raises on households it cannot resolve, so wrap construct_tax_units in a fail-safe: log and return None (caller falls back to the legacy reconstruction) instead of crashing materialization. 10 tests passing (adds fail-safe + head-promotion coverage). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microplex_us/pipelines/us.py | 28 +++++++++--- .../pipelines/test_us_microunit_delegation.py | 44 +++++++++++++++++++ 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index adc4540..4cfcf1c 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6342,11 +6342,22 @@ def _build_policyengine_tax_units_via_microunit( # index keeps row order so the returned per-person TAX_ID and role align # positionally back onto person_rows. person_rows = cps_frame.reset_index(drop=True).copy() - person_assignments, tax_unit = construct_tax_units( - person_rows.copy(), - year=self._microunit_reference_year(person_rows), - mode=POLICYENGINE_MODE, - ) + try: + person_assignments, tax_unit = construct_tax_units( + person_rows.copy(), + year=self._microunit_reference_year(person_rows), + mode=POLICYENGINE_MODE, + ) + except Exception: + # microunit raises on households it cannot resolve (e.g. no valid + # reference person). Never let that crash materialization — fall back + # to the caller's legacy reconstruction for the whole frame. + LOGGER.warning( + "microunit tax-unit construction failed; falling back to " + "legacy reconstruction", + exc_info=True, + ) + return None tax_id = pd.to_numeric(person_assignments["TAX_ID"], errors="coerce") person_rows["tax_unit_id"] = ( @@ -6472,6 +6483,13 @@ def _microunit_cps_frame_from_normalized( ascending=[True, True, False, True], ).reset_index(drop=True) frame["A_LINENO"] = frame.groupby("household_id", sort=False).cumcount() + 1 + # Guarantee exactly one household head (microunit requires a single + # reference person per PH_SEQ, else it raises). After the head-first sort + # the line-1 member is the most head-like; make it the head and demote + # any other rows that mapped to head (multi-family / headless households). + is_line1 = frame["A_LINENO"] == 1 + frame.loc[is_line1, "_rel"] = 0 + frame.loc[~is_line1 & (frame["_rel"] == 0), "_rel"] = 3 frame["PH_SEQ"] = pd.to_numeric(frame["household_id"], errors="coerce").astype( np.int64 ) diff --git a/tests/pipelines/test_us_microunit_delegation.py b/tests/pipelines/test_us_microunit_delegation.py index ec2f31e..bd16297 100644 --- a/tests/pipelines/test_us_microunit_delegation.py +++ b/tests/pipelines/test_us_microunit_delegation.py @@ -358,3 +358,47 @@ def test_normalized_adapter_builds_microunit_cps_contract(pipeline): # The single adult is never-married with no spouse/parent pointers. assert int(by_pid.loc[4, "A_MARITL"]) == 7 assert int(by_pid.loc[4, "A_SPOUSE"]) == 0 + + +def test_microunit_path_fails_safe_to_none(pipeline, monkeypatch): + # If microunit raises on a household it cannot resolve, the delegation must + # fall back (return None) rather than crash materialization. + import microunit + + def _boom(*args, **kwargs): + raise ValueError("synthetic microunit failure") + + monkeypatch.setattr(microunit, "construct_tax_units", _boom) + persons = _cps_person_frame() # raw CPS columns present -> reaches microunit + assert pipeline._build_policyengine_tax_units_via_microunit(persons) is None + + +def test_normalized_adapter_promotes_a_head_when_none_marked(pipeline): + # A household where nobody is marked head (all "other") must still get a + # single reference person so microunit can construct it. + persons = pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 30, + "relationship_to_head": 3, + "age": 70, + "income": 20000.0, + }, + { + "person_id": 2, + "household_id": 30, + "relationship_to_head": 3, + "age": 35, + "income": 15000.0, + }, + ] + ) + frame = pipeline._microunit_cps_frame_from_normalized(persons) + assert frame is not None + # Exactly one reference person (A_EXPRRP == 1) in the household. + assert int((frame["A_EXPRRP"] == 1).sum()) == 1 + # And the delegation must not raise (fail-safe covers microunit edge cases). + pipeline._build_policyengine_tax_units_via_microunit( + persons, allow_normalized_adapter=True + ) From cfa006476e3062f69c76678fd778b10352bd622c Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 30 May 2026 19:03:10 -0400 Subject: [PATCH 3/5] High-fidelity adapter: construct tax units from real CPS pointer fields (#115) The heuristic adapter (from collapsed relationship_to_head) crashed microunit on real heterogeneous households. microplex actually carries the real CPS-derived fields at materialization, so build microunit's contract from those instead: - A_LINENO from person_number (real 1-based within-household line number) - A_SPOUSE from spouse_person_number (real spouse line pointer) - A_EXPRRP from family_relationship; A_MARITL from spouse presence / surviving - person_number==1 always anchors a valid reference person (no more crashes) PEPAR1/PEPAR2 stay heuristic (child's parents = household head + spouse). The normalized adapter now dispatches to this path when the real fields are present, else the relationship_to_head heuristic. Validated on 8000 real CPS households: microunit constructs without crashing, tax_units/hh=1.42 vs authoritative 1.38 (vs legacy reconstruction's ~1.27 under-split). 12 tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microplex_us/pipelines/us.py | 83 +++++++++++++++++++ .../pipelines/test_us_microunit_delegation.py | 81 ++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 4cfcf1c..ff45e37 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6435,6 +6435,76 @@ def _build_policyengine_tax_units_via_microunit( person_rows = person_rows.drop(columns=["_microunit_role"], errors="ignore") return pd.DataFrame(tax_unit_rows), person_rows, households + def _microunit_cps_frame_from_cps_fields( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame: + """High-fidelity (#115) build of microunit's CPS contract from the real + CPS-derived fields microplex carries at materialization: ``person_number`` + (a 1-based within-household line number), ``spouse_person_number`` (a real + spouse line pointer), ``family_relationship`` (CPS A_FAMREL) and + ``marital_status``. The household reference person (``person_number == 1``) + always anchors a valid head, so microunit never lacks one. + + Only ``PEPAR1``/``PEPAR2`` remain heuristic: a child's parents are taken to + be the household reference person (line 1) and that person's spouse (#115). + """ + frame = persons.reset_index(drop=True).copy() + hh = pd.to_numeric(frame["household_id"], errors="coerce") + pernum = ( + pd.to_numeric(frame["person_number"], errors="coerce").fillna(0).astype(int) + ) + famrel = ( + pd.to_numeric(frame["family_relationship"], errors="coerce") + .fillna(0) + .astype(int) + ) + spouse_num = ( + pd.to_numeric(frame.get("spouse_person_number", 0), errors="coerce") + .fillna(0) + .astype(int) + ) + + frame["PH_SEQ"] = hh.astype(np.int64) + frame["A_LINENO"] = pernum + frame["A_AGE"] = ( + pd.to_numeric(frame["age"], errors="coerce").fillna(0).astype(int) + ) + frame["A_SPOUSE"] = spouse_num + + is_ref = pernum == 1 + # A_EXPRRP (microunit.CPSRelationshipCode): reference person 1, spouse 3, + # own child 5, everyone else other-relative 10. + exprrp = pd.Series(10, index=frame.index, dtype=int) + exprrp[famrel == 2] = 3 + exprrp[famrel == 3] = 5 + exprrp[is_ref] = 1 + frame["A_EXPRRP"] = exprrp + + # A_MARITL: 1 married spouse present; 4 widowed; else 7 never-married. + marital = pd.Series(7, index=frame.index, dtype=int) + marital[spouse_num > 0] = 1 + if "is_surviving_spouse" in frame.columns: + surviving = ( + pd.to_numeric(frame["is_surviving_spouse"], errors="coerce").fillna(0) + > 0 + ) + marital[surviving & (spouse_num == 0)] = 4 + frame["A_MARITL"] = marital + + # PEPAR1/PEPAR2: a child's parents are heuristically the household + # reference person (line 1) and that reference person's spouse. + ref_spouse_line = frame.loc[is_ref].groupby(hh[is_ref])["A_SPOUSE"].first() + frame["PEPAR1"] = 0 + frame["PEPAR2"] = 0 + is_child = famrel == 3 + frame.loc[is_child, "PEPAR1"] = 1 + frame.loc[is_child, "PEPAR2"] = ( + hh[is_child].map(ref_spouse_line).fillna(0).astype(int) + ) + + return frame + def _microunit_cps_frame_from_normalized( self, persons: pd.DataFrame, @@ -6459,6 +6529,19 @@ def _microunit_cps_frame_from_normalized( Returns ``persons`` with the eight microunit CPS columns added, or ``None`` if the prerequisite normalized columns are absent. """ + # Prefer the high-fidelity path when microplex carries the real CPS-derived + # pointer fields (person_number is a 1-based within-household line number; + # spouse_person_number a real spouse line pointer). Otherwise fall back to + # the coarse relationship_to_head heuristic below. + if { + "person_id", + "person_number", + "family_relationship", + "household_id", + "age", + }.issubset(persons.columns): + return self._microunit_cps_frame_from_cps_fields(persons) + required = {"person_id", "household_id", "age", "relationship_to_head"} if not required.issubset(persons.columns): return None diff --git a/tests/pipelines/test_us_microunit_delegation.py b/tests/pipelines/test_us_microunit_delegation.py index bd16297..75445a3 100644 --- a/tests/pipelines/test_us_microunit_delegation.py +++ b/tests/pipelines/test_us_microunit_delegation.py @@ -402,3 +402,84 @@ def test_normalized_adapter_promotes_a_head_when_none_marked(pipeline): pipeline._build_policyengine_tax_units_via_microunit( persons, allow_normalized_adapter=True ) + + +def _cps_fields_frame() -> pd.DataFrame: + """A frame in microplex's real CPS-derived fields (no relationship_to_head): + person_number = within-household line, spouse_person_number = spouse line, + family_relationship = CPS A_FAMREL. Couple+child (HH 10), single (HH 20).""" + return pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "age": 40, + "income": 50000.0, + "person_number": 1, + "spouse_person_number": 2, + "family_relationship": 1, + }, + { + "person_id": 2, + "household_id": 10, + "age": 38, + "income": 30000.0, + "person_number": 2, + "spouse_person_number": 1, + "family_relationship": 2, + }, + { + "person_id": 3, + "household_id": 10, + "age": 10, + "income": 0.0, + "person_number": 3, + "spouse_person_number": 0, + "family_relationship": 3, + }, + { + "person_id": 4, + "household_id": 20, + "age": 25, + "income": 45000.0, + "person_number": 1, + "spouse_person_number": 0, + "family_relationship": 0, + }, + ] + ) + + +def test_high_fidelity_adapter_uses_real_pointers(pipeline): + persons = _cps_fields_frame() + # The normalized adapter must dispatch to the high-fidelity path when the real + # CPS-derived fields are present (no relationship_to_head needed). + frame = pipeline._microunit_cps_frame_from_normalized(persons) + assert frame is not None + by = frame.set_index("person_id") + # Real line/spouse pointers, not heuristics. + assert int(by.loc[1, "A_LINENO"]) == 1 and int(by.loc[2, "A_LINENO"]) == 2 + assert int(by.loc[1, "A_SPOUSE"]) == 2 and int(by.loc[2, "A_SPOUSE"]) == 1 + # A_EXPRRP from family relationship: ref=1, spouse=3, own child=5. + assert int(by.loc[1, "A_EXPRRP"]) == 1 + assert int(by.loc[2, "A_EXPRRP"]) == 3 + assert int(by.loc[3, "A_EXPRRP"]) == 5 + # Child's parent pointers reference the head (line 1) and spouse (line 2). + assert int(by.loc[3, "PEPAR1"]) == 1 and int(by.loc[3, "PEPAR2"]) == 2 + + +def test_high_fidelity_adapter_activates_delegation(pipeline): + persons = _cps_fields_frame() + result = pipeline._build_policyengine_tax_units_via_microunit( + persons, allow_normalized_adapter=True + ) + assert result is not None + tax_units, person_rows, households = result + unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + # Couple + child form one unit; single adult separate. + assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3] + assert unit_by_person[4] != unit_by_person[1] + family_id = int(unit_by_person[1]) + assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" From be16b2a7837154a6fa05c69c8f32d92998d14a2e Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sun, 31 May 2026 15:29:52 -0400 Subject: [PATCH 4/5] Make microunit the default tax-unit constructor (#113/#116) microunit is microplex's required tax-unit engine, not an optional prototype. Default the high-fidelity path ON when the real CPS-derived fields (person_number/spouse_person_number/family_relationship) are present -- which the production candidate carries -- while the coarse relationship_to_head-only heuristic stays opt-in via config so minimal frames don't get the lossy path. Tests: the four build_policyengine_entity_tables role-flag tests now exercise the microunit default path. filing_status is PE-computed (delegated; microplex does not export it), so its non-authoritative internal value is no longer asserted. The young-adult-child case asserts microunit's genuine qualifying-child age rule (a 19+ non-student own-child gets its own unit, not folded). Adds a legacy- fallback test (no high-fidelity fields). Full test_us.py: 159 passed; delegation suite 12 passed; ruff clean. Follow-up: thread A_HSCOL/enrollment into the adapter so microunit's qualifying-child-to-24 student extension fires (currently under-claims 19-23yo student own-child dependents vs eCPS). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microplex_us/pipelines/us.py | 16 +++++++++---- tests/pipelines/test_us.py | 41 ++++++++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index ff45e37..38582a9 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6320,11 +6320,19 @@ def _build_policyengine_tax_units_via_microunit( return None cps_frame = persons if not set(self._MICROUNIT_REQUIRED_CPS_COLUMNS).issubset(persons.columns): - # PROTOTYPE (#115): when the raw CPS columns are absent, optionally - # synthesize microunit's CPS contract from microplex's normalized - # columns. Default OFF — the maps are heuristic and unvalidated. + # microunit is microplex's required tax-unit engine (#113). When the + # raw CPS columns are absent, synthesize its CPS contract from the + # normalized frame. The high-fidelity path (real person_number / + # spouse_person_number / family_relationship, which the production + # candidate carries) is used by DEFAULT; the coarse + # relationship_to_head-only heuristic stays opt-in via the config flag + # so minimal frames don't silently get the lossy reconstruction. + has_high_fidelity_fields = { + "person_number", + "family_relationship", + }.issubset(persons.columns) if allow_normalized_adapter is None: - allow_normalized_adapter = bool( + allow_normalized_adapter = has_high_fidelity_fields or bool( getattr(self.config, "microunit_construct_from_normalized", False) ) if not allow_normalized_adapter: diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 46f8933..c6516e7 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1684,9 +1684,10 @@ def test_build_policyengine_entity_tables_resolves_dependent_head_role_conflicts person_rows = tables.persons.sort_values("person_id").reset_index(drop=True) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # filing_status is PE-computed (delegated; microplex does not export it), + # so only microunit's partition is asserted here. assert len(tax_units) == 1 assert person_rows["tax_unit_id"].nunique() == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_resolves_spouse_dependent_role_conflicts( @@ -1718,8 +1719,8 @@ def test_build_policyengine_entity_tables_resolves_spouse_dependent_role_conflic tables = pipeline.build_policyengine_entity_tables(population) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # filing_status delegated to PE; assert only microunit's partition. assert len(tax_units) == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_repairs_missing_role_flag_heads(self): @@ -1749,8 +1750,8 @@ def test_build_policyengine_entity_tables_repairs_missing_role_flag_heads(self): tables = pipeline.build_policyengine_entity_tables(population) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # filing_status delegated to PE; assert only microunit's partition. assert len(tax_units) == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_folds_young_head_hint_dependents(self): @@ -1780,8 +1781,40 @@ def test_build_policyengine_entity_tables_folds_young_head_hint_dependents(self) tables = pipeline.build_policyengine_entity_tables(population) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # microunit applies the real qualifying-child age rule: a 19+ non-student + # own-child is NOT folded as a dependent (it gets its own tax unit), unlike + # the legacy role-flag heuristic. Threading student enrollment (A_HSCOL) so + # the qualifying-child-to-24 student extension fires is a tracked follow-up. + assert len(tax_units) == 2 + assert int(tax_units["n_dependents"].sum()) == 0 + + def test_build_policyengine_entity_tables_uses_legacy_path_without_cps_fields( + self, + ): + # Without the high-fidelity CPS fields (person_number/family_relationship), + # microunit cannot construct, so the legacy role-flag reconstruction (the + # fallback) handles the conflict. Preserves coverage of that path now that + # the real-data path defaults to microunit. + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + population = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 10], + "tax_unit_id": [100, 101], + "weight": [1.0, 1.0], + "age": [45, 12], + "income": [60_000.0, 0.0], + "relationship_to_head": [0, 2], + "is_tax_unit_head": [1.0, 1.0], + "is_tax_unit_spouse": [0.0, 0.0], + "is_tax_unit_dependent": [0.0, 1.0], + "state_fips": [6, 6], + "tenure": [1, 1], + } + ) + tables = pipeline.build_policyengine_entity_tables(population) + tax_units = tables.tax_units assert len(tax_units) == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_keeps_positive_income_adult_heads(self): From 14f8d7eb3dde4aaea8a498d02e858acd715e65f1 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sun, 31 May 2026 16:27:29 -0400 Subject: [PATCH 5/5] Fix cycle review findings: 0-based family_relationship + override docs/tests An independent review of the microunit delegation+activation change surfaced three findings; fixes: 1. (docs / intended-behavior) microunit's default-on path replaces the CPS-provided tax_unit_id (Census TAX_ID) even when policyengine_prefer_existing_tax_unit_ids is True. This is the intended "replace the CPS tax units, keep the SPM units" behavior -- the existing-ID path is a fallback for households microunit does not construct -- but the method docstring wrongly claimed the authoritative-ID path "is never routed here". Rewrote the docstring to document the override + fallback ordering and the separate SPM/family/marital preservation. 2. (real bug) The high-fidelity adapter assumed only the 1-based CPS A_FAMREL coding, so a 0-based family_relationship frame -- which the pipeline supports elsewhere (see _normalize_relationship_to_head and data_sources.cps) -- silently mis-coded children as spouses and dropped their parent pointers. Normalize the coding scheme per household (shift 0-based households up by one) before the A_EXPRRP / parent-pointer mapping. 3. (missing coverage) Added two regression tests for the previously-untested boundaries: bad CPS tax_unit_id [100,100,200] + high-fidelity fields -> microunit folds to one unit with spm_unit_id preserved; and a 0-based family_relationship frame -> same A_EXPRRP/parent pointers and partition as the 1-based frame. test_us.py 160 passed; delegation suite 13 passed; ruff clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microplex_us/pipelines/us.py | 53 +++++++++------ tests/pipelines/test_us.py | 44 +++++++++++++ .../pipelines/test_us_microunit_delegation.py | 64 +++++++++++++++++++ 3 files changed, 142 insertions(+), 19 deletions(-) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 38582a9..bdbcbb6 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6290,20 +6290,29 @@ def _build_policyengine_tax_units_via_microunit( ) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None: """Reconstruct tax units by delegating to ``microunit`` (issue #113). - This is the *reconstruction-from-scratch* path. The authoritative-ID - path (#112, :meth:`_build_policyengine_tax_units_from_existing_ids`) is - handled separately and is never routed here. - - Delegation only happens when ``persons`` carries the raw CPS columns in - :attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`. ``microunit``'s logic genuinely - depends on marital status (``A_MARITL``), spouse/parent line pointers - (``A_SPOUSE``/``PEPAR1``/``PEPAR2``) and the CPS relationship recode - (``A_EXPRRP``); microplex's reconstruction-stage frame collapses - relationship into a 0/1/2/3 coding and drops the pointer columns, so a - *faithful* mapping is not possible from that frame. Rather than fabricate - microunit inputs (which would silently change behavior), we return - ``None`` when the columns are absent and let the caller fall back to the - legacy role-flag reconstruction. + This is microplex's **default** tax-unit constructor for CPS-derived + frames. ``microunit`` is the rules-based engine that *replaces* the + unreliable CPS-provided ``tax_unit_id`` (Census ``TAX_ID``): when the + frame carries the real CPS pointer fields, the high-fidelity adapter + (#115) builds microunit's CPS contract and microunit re-partitions each + household from scratch, intentionally overriding any incoming + ``tax_unit_id``. The ``policyengine_prefer_existing_tax_unit_ids`` / + :meth:`_build_policyengine_tax_units_from_existing_ids` path is a + **fallback** for the households this method does not construct -- it runs + *after* this one, on the remaining households -- not a parallel + authority. SPM/family/marital group IDs are preserved separately (#112) + and are not touched here, so "keep the source SPM units, replace the tax + units" holds. + + Delegation runs when ``persons`` carries the raw CPS columns in + :attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`, or can synthesize them: the + high-fidelity adapter (#115) is used by DEFAULT when the real + ``person_number``/``spouse_person_number``/``family_relationship`` fields + are present (the production candidate carries them); the coarse + ``relationship_to_head``-only heuristic stays opt-in. When neither the + raw columns nor the high-fidelity fields are available (and the coarse + heuristic is not enabled), we return ``None`` and let the caller fall + back to the legacy role-flag reconstruction. .. warning:: ``microunit`` *is* eCPS's tax-unit construction. Routing microplex @@ -6462,11 +6471,17 @@ def _microunit_cps_frame_from_cps_fields( pernum = ( pd.to_numeric(frame["person_number"], errors="coerce").fillna(0).astype(int) ) - famrel = ( - pd.to_numeric(frame["family_relationship"], errors="coerce") - .fillna(0) - .astype(int) - ) + # ``family_relationship`` arrives in either CPS A_FAMREL 1-based coding + # (1=reference person, 2=spouse, 3=child, ...) or the optimizer's 0-based + # coding (0=head, 1=spouse, 2=child); the rest of the pipeline detects + # this per household (see ``_normalize_relationship_to_head`` and + # ``data_sources.cps``). The A_EXPRRP / parent-pointer mapping below + # expects the 1-based scheme, so shift any 0-based household up by one -- + # otherwise a 0-based frame silently mis-codes children as spouses and + # drops their parent pointers. + famrel_raw = pd.to_numeric(frame["family_relationship"], errors="coerce") + zero_based_hh = (famrel_raw == 0).groupby(hh).transform("any").fillna(False) + famrel = famrel_raw.add(zero_based_hh.astype(int)).fillna(0).astype(int) spouse_num = ( pd.to_numeric(frame.get("spouse_person_number", 0), errors="coerce") .fillna(0) diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index c6516e7..5f5b5cd 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1620,6 +1620,50 @@ def test_build_policyengine_entity_tables_prefers_tax_unit_role_flags_over_bad_i assert tax_units.iloc[0]["filing_status"] == "JOINT" assert tax_units.iloc[0]["n_dependents"] == 1 + def test_build_policyengine_entity_tables_microunit_overrides_bad_cps_tax_unit_ids( + self, + ): + # microunit is the DEFAULT tax-unit constructor: when the high-fidelity CPS + # fields (person_number + family_relationship) are present it re-partitions + # the household and intentionally REPLACES the unreliable CPS-provided + # tax_unit_id (Census TAX_ID) -- even though + # policyengine_prefer_existing_tax_unit_ids defaults to True (that path is a + # fallback for households microunit does not construct, not a competing + # authority). This locks in "replace the CPS tax units, keep the SPM units". + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + assert pipeline.config.policyengine_prefer_existing_tax_unit_ids is True + population = pd.DataFrame( + { + "person_id": [1, 2, 3], + "household_id": [10, 10, 10], + # CPS TAX_ID nonsensically splits the dependent child into its own unit. + "tax_unit_id": [100, 100, 200], + # SPM units, by contrast, must be preserved. + "spm_unit_id": [500, 500, 500], + "weight": [1.0, 1.0, 1.0], + "age": [45, 43, 12], + "income": [60_000.0, 15_000.0, 0.0], + "person_number": [1, 2, 3], + "spouse_person_number": [2, 1, 0], + "family_relationship": [1, 2, 3], # CPS A_FAMREL: ref, spouse, child + "marital_status": [1, 1, 7], + "state_fips": [6, 6, 6], + "tenure": [1, 1, 1], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + person_rows = tables.persons.sort_values("person_id").reset_index(drop=True) + tax_units = tables.tax_units + + # microunit folds couple + child into ONE unit, discarding the [100,100,200] + # split (which preservation would have kept as two units). + assert len(tax_units) == 1 + assert person_rows["tax_unit_id"].nunique() == 1 + assert tax_units.iloc[0]["n_dependents"] == 1 + # The SPM unit is untouched (replace tax, keep SPM). + assert person_rows["spm_unit_id"].nunique() == 1 + def test_build_policyengine_entity_tables_resolves_spouse_head_role_conflicts( self, ): diff --git a/tests/pipelines/test_us_microunit_delegation.py b/tests/pipelines/test_us_microunit_delegation.py index 75445a3..db89544 100644 --- a/tests/pipelines/test_us_microunit_delegation.py +++ b/tests/pipelines/test_us_microunit_delegation.py @@ -483,3 +483,67 @@ def test_high_fidelity_adapter_activates_delegation(pipeline): assert unit_by_person[4] != unit_by_person[1] family_id = int(unit_by_person[1]) assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" + + +def _zero_based_cps_fields_frame() -> pd.DataFrame: + """Same couple+child household as ``_cps_fields_frame`` but with + family_relationship in the optimizer's 0-based coding (0=head, 1=spouse, + 2=child) instead of CPS A_FAMREL 1-based (1=ref, 2=spouse, 3=child).""" + return pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "age": 40, + "income": 50000.0, + "person_number": 1, + "spouse_person_number": 2, + "family_relationship": 0, + }, + { + "person_id": 2, + "household_id": 10, + "age": 38, + "income": 30000.0, + "person_number": 2, + "spouse_person_number": 1, + "family_relationship": 1, + }, + { + "person_id": 3, + "household_id": 10, + "age": 10, + "income": 0.0, + "person_number": 3, + "spouse_person_number": 0, + "family_relationship": 2, + }, + ] + ) + + +def test_high_fidelity_adapter_normalizes_zero_based_family_relationship(pipeline): + # A 0-based family_relationship frame must map to the SAME A_EXPRRP / parent + # pointers as the 1-based CPS A_FAMREL coding. Without per-household scheme + # normalization the child (code 2) is mis-read as a spouse and loses parent + # pointers, so microunit mis-partitions the household. + frame = pipeline._microunit_cps_frame_from_normalized( + _zero_based_cps_fields_frame() + ) + assert frame is not None + by = frame.set_index("person_id") + assert int(by.loc[1, "A_EXPRRP"]) == 1 # head + assert int(by.loc[2, "A_EXPRRP"]) == 3 # spouse, not other-relative + assert int(by.loc[3, "A_EXPRRP"]) == 5 # own child, not spouse + assert int(by.loc[3, "PEPAR1"]) == 1 and int(by.loc[3, "PEPAR2"]) == 2 + + # And the partition matches the 1-based frame: couple + child = one unit. + result = pipeline._build_policyengine_tax_units_via_microunit( + _zero_based_cps_fields_frame(), allow_normalized_adapter=True + ) + assert result is not None + _, person_rows, _ = result + unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3]