diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 55df713..bdbcbb6 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6286,23 +6286,33 @@ def _build_policyengine_tax_units_via_microunit( persons: pd.DataFrame, *, start_tax_unit_id: int = 0, + allow_normalized_adapter: bool | None = None, ) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None: """Reconstruct tax units by delegating to ``microunit`` (issue #113). - This is the *reconstruction-from-scratch* path. The authoritative-ID - path (#112, :meth:`_build_policyengine_tax_units_from_existing_ids`) is - handled separately and is never routed here. - - Delegation only happens when ``persons`` carries the raw CPS columns in - :attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`. ``microunit``'s logic genuinely - depends on marital status (``A_MARITL``), spouse/parent line pointers - (``A_SPOUSE``/``PEPAR1``/``PEPAR2``) and the CPS relationship recode - (``A_EXPRRP``); microplex's reconstruction-stage frame collapses - relationship into a 0/1/2/3 coding and drops the pointer columns, so a - *faithful* mapping is not possible from that frame. Rather than fabricate - microunit inputs (which would silently change behavior), we return - ``None`` when the columns are absent and let the caller fall back to the - legacy role-flag reconstruction. + This is microplex's **default** tax-unit constructor for CPS-derived + frames. ``microunit`` is the rules-based engine that *replaces* the + unreliable CPS-provided ``tax_unit_id`` (Census ``TAX_ID``): when the + frame carries the real CPS pointer fields, the high-fidelity adapter + (#115) builds microunit's CPS contract and microunit re-partitions each + household from scratch, intentionally overriding any incoming + ``tax_unit_id``. 
The ``policyengine_prefer_existing_tax_unit_ids`` / + :meth:`_build_policyengine_tax_units_from_existing_ids` path is a + **fallback** for the households this method does not construct -- it runs + *after* this one, on the remaining households -- not a parallel + authority. SPM/family/marital group IDs are preserved separately (#112) + and are not touched here, so "keep the source SPM units, replace the tax + units" holds. + + Delegation runs when ``persons`` carries the raw CPS columns in + :attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`, or can synthesize them: the + high-fidelity adapter (#115) is used by DEFAULT when the real + ``person_number``/``spouse_person_number``/``family_relationship`` fields + are present (the production candidate carries them); the coarse + ``relationship_to_head``-only heuristic stays opt-in. When neither the + raw columns nor the high-fidelity fields are available (and the coarse + heuristic is not enabled), we return ``None`` and let the caller fall + back to the legacy role-flag reconstruction. .. warning:: ``microunit`` *is* eCPS's tax-unit construction. Routing microplex @@ -6317,8 +6327,28 @@ def _build_policyengine_tax_units_via_microunit( """ if "person_id" not in persons.columns or "household_id" not in persons.columns: return None + cps_frame = persons if not set(self._MICROUNIT_REQUIRED_CPS_COLUMNS).issubset(persons.columns): - return None + # microunit is microplex's required tax-unit engine (#113). When the + # raw CPS columns are absent, synthesize its CPS contract from the + # normalized frame. The high-fidelity path (real person_number / + # spouse_person_number / family_relationship, which the production + # candidate carries) is used by DEFAULT; the coarse + # relationship_to_head-only heuristic stays opt-in via the config flag + # so minimal frames don't silently get the lossy reconstruction. 
+ has_high_fidelity_fields = { + "person_number", + "family_relationship", + }.issubset(persons.columns) + if allow_normalized_adapter is None: + allow_normalized_adapter = has_high_fidelity_fields or bool( + getattr(self.config, "microunit_construct_from_normalized", False) + ) + if not allow_normalized_adapter: + return None + cps_frame = self._microunit_cps_frame_from_normalized(persons) + if cps_frame is None: + return None # Imported lazily to match this module's optional-dependency convention: # ``microunit`` ships in the ``policyengine`` extra, and the base test @@ -6328,12 +6358,23 @@ def _build_policyengine_tax_units_via_microunit( # microunit keys its CPS-style frame on (PH_SEQ, A_LINENO); resetting the # index keeps row order so the returned per-person TAX_ID and role align # positionally back onto person_rows. - person_rows = persons.reset_index(drop=True).copy() - person_assignments, tax_unit = construct_tax_units( - person_rows.copy(), - year=self._microunit_reference_year(person_rows), - mode=POLICYENGINE_MODE, - ) + person_rows = cps_frame.reset_index(drop=True).copy() + try: + person_assignments, tax_unit = construct_tax_units( + person_rows.copy(), + year=self._microunit_reference_year(person_rows), + mode=POLICYENGINE_MODE, + ) + except Exception: + # microunit raises on households it cannot resolve (e.g. no valid + # reference person). Never let that crash materialization — fall back + # to the caller's legacy reconstruction for the whole frame. 
+ LOGGER.warning( + "microunit tax-unit construction failed; falling back to " + "legacy reconstruction", + exc_info=True, + ) + return None tax_id = pd.to_numeric(person_assignments["TAX_ID"], errors="coerce") person_rows["tax_unit_id"] = ( @@ -6411,6 +6452,194 @@ def _build_policyengine_tax_units_via_microunit( person_rows = person_rows.drop(columns=["_microunit_role"], errors="ignore") return pd.DataFrame(tax_unit_rows), person_rows, households + def _microunit_cps_frame_from_cps_fields( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame: + """High-fidelity (#115) build of microunit's CPS contract from the real + CPS-derived fields microplex carries at materialization: ``person_number`` + (a 1-based within-household line number), ``spouse_person_number`` (a real + spouse line pointer), ``family_relationship`` (CPS A_FAMREL) and + ``marital_status``. The household reference person (``person_number == 1``) + always anchors a valid head, so microunit never lacks one. + + Only ``PEPAR1``/``PEPAR2`` remain heuristic: a child's parents are taken to + be the household reference person (line 1) and that person's spouse (#115). + """ + frame = persons.reset_index(drop=True).copy() + hh = pd.to_numeric(frame["household_id"], errors="coerce") + pernum = ( + pd.to_numeric(frame["person_number"], errors="coerce").fillna(0).astype(int) + ) + # ``family_relationship`` arrives in either CPS A_FAMREL 1-based coding + # (1=reference person, 2=spouse, 3=child, ...) or the optimizer's 0-based + # coding (0=head, 1=spouse, 2=child); the rest of the pipeline detects + # this per household (see ``_normalize_relationship_to_head`` and + # ``data_sources.cps``). The A_EXPRRP / parent-pointer mapping below + # expects the 1-based scheme, so shift any 0-based household up by one -- + # otherwise a 0-based frame silently mis-codes children as spouses and + # drops their parent pointers. 
+ famrel_raw = pd.to_numeric(frame["family_relationship"], errors="coerce") + zero_based_hh = (famrel_raw == 0).groupby(hh).transform("any").fillna(False) + famrel = famrel_raw.add(zero_based_hh.astype(int)).fillna(0).astype(int) + spouse_num = ( + pd.to_numeric(frame.get("spouse_person_number", 0), errors="coerce") + .fillna(0) + .astype(int) + ) + + frame["PH_SEQ"] = hh.astype(np.int64) + frame["A_LINENO"] = pernum + frame["A_AGE"] = ( + pd.to_numeric(frame["age"], errors="coerce").fillna(0).astype(int) + ) + frame["A_SPOUSE"] = spouse_num + + is_ref = pernum == 1 + # A_EXPRRP (microunit.CPSRelationshipCode): reference person 1, spouse 3, + # own child 5, everyone else other-relative 10. + exprrp = pd.Series(10, index=frame.index, dtype=int) + exprrp[famrel == 2] = 3 + exprrp[famrel == 3] = 5 + exprrp[is_ref] = 1 + frame["A_EXPRRP"] = exprrp + + # A_MARITL: 1 married spouse present; 4 widowed; else 7 never-married. + marital = pd.Series(7, index=frame.index, dtype=int) + marital[spouse_num > 0] = 1 + if "is_surviving_spouse" in frame.columns: + surviving = ( + pd.to_numeric(frame["is_surviving_spouse"], errors="coerce").fillna(0) + > 0 + ) + marital[surviving & (spouse_num == 0)] = 4 + frame["A_MARITL"] = marital + + # PEPAR1/PEPAR2: a child's parents are heuristically the household + # reference person (line 1) and that reference person's spouse. + ref_spouse_line = frame.loc[is_ref].groupby(hh[is_ref])["A_SPOUSE"].first() + frame["PEPAR1"] = 0 + frame["PEPAR2"] = 0 + is_child = famrel == 3 + frame.loc[is_child, "PEPAR1"] = 1 + frame.loc[is_child, "PEPAR2"] = ( + hh[is_child].map(ref_spouse_line).fillna(0).astype(int) + ) + + return frame + + def _microunit_cps_frame_from_normalized( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame | None: + """PROTOTYPE (issue #115): synthesize microunit's CPS-like input contract + from microplex's normalized person columns. 
+ + ``microunit.construct_tax_units`` needs raw CPS columns (PH_SEQ/A_LINENO/ + A_AGE/A_MARITL/A_SPOUSE/PEPAR1/PEPAR2/A_EXPRRP); at PolicyEngine + materialization microplex instead carries ``household_id``/``age``/ + ``relationship_to_head``. This builds the former from the latter, mirroring + the ACS->CPS mapping microunit documents as the consumer's responsibility. + + .. warning:: + HEURISTIC AND UNVALIDATED. The ``relationship_to_head`` -> ``A_EXPRRP`` + and married -> ``A_MARITL`` maps are approximate, and ``PEPAR1``/ + ``PEPAR2`` are inferred by assuming a child's parents are the household + head and spouse. The fidelity of these maps must be validated against + the legacy reconstruction before this is trusted (see #115); this coarse + ``relationship_to_head`` heuristic is opt-in (gated OFF by default). + + Returns a copy of ``persons`` with the eight microunit CPS columns added, or + ``None`` if the prerequisite normalized columns are absent. + """ + # Prefer the high-fidelity path when microplex carries the real CPS-derived + # pointer fields (person_number is a 1-based within-household line number; + # spouse_person_number a real spouse line pointer). Otherwise fall back to + # the coarse relationship_to_head heuristic below. + if { + "person_id", + "person_number", + "family_relationship", + "household_id", + "age", + }.issubset(persons.columns): + return self._microunit_cps_frame_from_cps_fields(persons) + + required = {"person_id", "household_id", "age", "relationship_to_head"} + if not required.issubset(persons.columns): + return None + + # CPS A_EXPRRP recode (microunit.CPSRelationshipCode): 1 reference person, + # 3 husband, 5 own child, 10 other relative. 
+ exprrp_by_rel = {0: 1, 1: 3, 2: 5, 3: 10} + + frame = persons.reset_index(drop=True).copy() + rel = ( + pd.to_numeric(frame["relationship_to_head"], errors="coerce") + .fillna(3) + .astype(int) + ) + age = pd.to_numeric(frame["age"], errors="coerce").fillna(0).astype(int) + + # Per-household line numbers (1-based, unique within household): head + # first, then spouse, then everyone else oldest-first. + frame = frame.assign(_rel=rel.to_numpy(), _age=age.to_numpy()) + frame = frame.sort_values( + ["household_id", "_rel", "_age", "person_id"], + ascending=[True, True, False, True], + ).reset_index(drop=True) + frame["A_LINENO"] = frame.groupby("household_id", sort=False).cumcount() + 1 + # Guarantee exactly one household head (microunit requires a single + # reference person per PH_SEQ, else it raises). After the head-first sort + # the line-1 member is the most head-like; make it the head and demote + # any other rows that mapped to head (multi-family / headless households). + is_line1 = frame["A_LINENO"] == 1 + frame.loc[is_line1, "_rel"] = 0 + frame.loc[~is_line1 & (frame["_rel"] == 0), "_rel"] = 3 + frame["PH_SEQ"] = pd.to_numeric(frame["household_id"], errors="coerce").astype( + np.int64 + ) + frame["A_AGE"] = frame["_age"] + frame["A_EXPRRP"] = frame["_rel"].map(exprrp_by_rel).fillna(10).astype(int) + + # Head/spouse line numbers per household, for spouse pointers + marital. 
+ head_line = ( + frame.loc[frame["_rel"] == 0].groupby("household_id")["A_LINENO"].first() + ) + spouse_line = ( + frame.loc[frame["_rel"] == 1].groupby("household_id")["A_LINENO"].first() + ) + hh = frame["household_id"] + is_head = frame["_rel"] == 0 + is_spouse = frame["_rel"] == 1 + is_child = frame["_rel"] == 2 + has_spouse = hh.map(spouse_line).notna() + + frame["A_SPOUSE"] = 0 + frame.loc[is_head, "A_SPOUSE"] = ( + hh[is_head].map(spouse_line).fillna(0).astype(int) + ) + frame.loc[is_spouse, "A_SPOUSE"] = ( + hh[is_spouse].map(head_line).fillna(0).astype(int) + ) + + # A_MARITL: 1 = married, spouse present (head/spouse of a couple); else + # 7 = never married. microunit only needs the married-vs-not distinction. + frame["A_MARITL"] = 7 + frame.loc[(is_head | is_spouse) & has_spouse, "A_MARITL"] = 1 + + # PEPAR1/PEPAR2: assume a child's parents are the household head + spouse. + frame["PEPAR1"] = 0 + frame["PEPAR2"] = 0 + frame.loc[is_child, "PEPAR1"] = ( + hh[is_child].map(head_line).fillna(0).astype(int) + ) + frame.loc[is_child, "PEPAR2"] = ( + hh[is_child].map(spouse_line).fillna(0).astype(int) + ) + + return frame.drop(columns=["_rel", "_age"]) + @staticmethod def _decode_microunit_bytes(value: Any) -> str: """Decode a ``microunit`` bytes-typed status/role into an upper string.""" diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 46f8933..5f5b5cd 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1620,6 +1620,50 @@ def test_build_policyengine_entity_tables_prefers_tax_unit_role_flags_over_bad_i assert tax_units.iloc[0]["filing_status"] == "JOINT" assert tax_units.iloc[0]["n_dependents"] == 1 + def test_build_policyengine_entity_tables_microunit_overrides_bad_cps_tax_unit_ids( + self, + ): + # microunit is the DEFAULT tax-unit constructor: when the high-fidelity CPS + # fields (person_number + family_relationship) are present it re-partitions + # the household and intentionally REPLACES the unreliable 
CPS-provided + # tax_unit_id (Census TAX_ID) -- even though + # policyengine_prefer_existing_tax_unit_ids defaults to True (that path is a + # fallback for households microunit does not construct, not a competing + # authority). This locks in "replace the CPS tax units, keep the SPM units". + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + assert pipeline.config.policyengine_prefer_existing_tax_unit_ids is True + population = pd.DataFrame( + { + "person_id": [1, 2, 3], + "household_id": [10, 10, 10], + # CPS TAX_ID nonsensically splits the dependent child into its own unit. + "tax_unit_id": [100, 100, 200], + # SPM units, by contrast, must be preserved. + "spm_unit_id": [500, 500, 500], + "weight": [1.0, 1.0, 1.0], + "age": [45, 43, 12], + "income": [60_000.0, 15_000.0, 0.0], + "person_number": [1, 2, 3], + "spouse_person_number": [2, 1, 0], + "family_relationship": [1, 2, 3], # CPS A_FAMREL: ref, spouse, child + "marital_status": [1, 1, 7], + "state_fips": [6, 6, 6], + "tenure": [1, 1, 1], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + person_rows = tables.persons.sort_values("person_id").reset_index(drop=True) + tax_units = tables.tax_units + + # microunit folds couple + child into ONE unit, discarding the [100,100,200] + # split (which preservation would have kept as two units). + assert len(tax_units) == 1 + assert person_rows["tax_unit_id"].nunique() == 1 + assert tax_units.iloc[0]["n_dependents"] == 1 + # The SPM unit is untouched (replace tax, keep SPM). 
+ assert person_rows["spm_unit_id"].nunique() == 1 + def test_build_policyengine_entity_tables_resolves_spouse_head_role_conflicts( self, ): @@ -1684,9 +1728,10 @@ def test_build_policyengine_entity_tables_resolves_dependent_head_role_conflicts person_rows = tables.persons.sort_values("person_id").reset_index(drop=True) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # filing_status is PE-computed (delegated; microplex does not export it), + # so only microunit's partition is asserted here. assert len(tax_units) == 1 assert person_rows["tax_unit_id"].nunique() == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_resolves_spouse_dependent_role_conflicts( @@ -1718,8 +1763,8 @@ def test_build_policyengine_entity_tables_resolves_spouse_dependent_role_conflic tables = pipeline.build_policyengine_entity_tables(population) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # filing_status delegated to PE; assert only microunit's partition. assert len(tax_units) == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_repairs_missing_role_flag_heads(self): @@ -1749,8 +1794,8 @@ def test_build_policyengine_entity_tables_repairs_missing_role_flag_heads(self): tables = pipeline.build_policyengine_entity_tables(population) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # filing_status delegated to PE; assert only microunit's partition. 
assert len(tax_units) == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_folds_young_head_hint_dependents(self): @@ -1780,8 +1825,40 @@ def test_build_policyengine_entity_tables_folds_young_head_hint_dependents(self) tables = pipeline.build_policyengine_entity_tables(population) tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + # microunit applies the real qualifying-child age rule: a 19+ non-student + # own-child is NOT folded as a dependent (it gets its own tax unit), unlike + # the legacy role-flag heuristic. Threading student enrollment (A_HSCOL) so + # the qualifying-child-to-24 student extension fires is a tracked follow-up. + assert len(tax_units) == 2 + assert int(tax_units["n_dependents"].sum()) == 0 + + def test_build_policyengine_entity_tables_uses_legacy_path_without_cps_fields( + self, + ): + # Without the high-fidelity CPS fields (person_number/family_relationship), + # microunit cannot construct, so the legacy role-flag reconstruction (the + # fallback) handles the conflict. Preserves coverage of that path now that + # the real-data path defaults to microunit. 
+ pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + population = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 10], + "tax_unit_id": [100, 101], + "weight": [1.0, 1.0], + "age": [45, 12], + "income": [60_000.0, 0.0], + "relationship_to_head": [0, 2], + "is_tax_unit_head": [1.0, 1.0], + "is_tax_unit_spouse": [0.0, 0.0], + "is_tax_unit_dependent": [0.0, 1.0], + "state_fips": [6, 6], + "tenure": [1, 1], + } + ) + tables = pipeline.build_policyengine_entity_tables(population) + tax_units = tables.tax_units assert len(tax_units) == 1 - assert tax_units.iloc[0]["filing_status"] == "SINGLE" assert tax_units.iloc[0]["n_dependents"] == 1 def test_build_policyengine_entity_tables_keeps_positive_income_adult_heads(self): diff --git a/tests/pipelines/test_us_microunit_delegation.py b/tests/pipelines/test_us_microunit_delegation.py index b2d6885..db89544 100644 --- a/tests/pipelines/test_us_microunit_delegation.py +++ b/tests/pipelines/test_us_microunit_delegation.py @@ -255,3 +255,295 @@ def test_microunit_path_preserves_alignment_under_unsorted_input(pipeline): assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" assert sorted(_unit_field(tax_units, family_id, "filer_ids")) == [1, 2] assert list(_unit_field(tax_units, family_id, "dependent_ids")) == [3] + + +def _normalized_person_frame() -> pd.DataFrame: + """A purely NORMALIZED microplex frame (no raw CPS columns). + + The same population as ``_cps_person_frame`` expressed only in microplex's + materialization columns: a married couple with one child (HH 10) and a + single adult (HH 20). Used to exercise the prototype adapter (#115) that + synthesizes microunit's CPS contract from these columns. 
+ """ + return pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "relationship_to_head": 0, + "age": 40, + "income": 50000.0, + }, + { + "person_id": 2, + "household_id": 10, + "relationship_to_head": 1, + "age": 38, + "income": 30000.0, + }, + { + "person_id": 3, + "household_id": 10, + "relationship_to_head": 2, + "age": 10, + "income": 0.0, + }, + { + "person_id": 4, + "household_id": 20, + "relationship_to_head": 0, + "age": 25, + "income": 45000.0, + }, + ] + ) + + +def test_normalized_adapter_is_off_by_default(pipeline): + persons = _normalized_person_frame() + # No raw CPS columns and the adapter is not enabled -> declines (the default + # behavior is unchanged; the legacy role-flag path handles such frames). + assert pipeline._build_policyengine_tax_units_via_microunit(persons) is None + + +def test_normalized_adapter_activates_delegation_when_enabled(pipeline): + persons = _normalized_person_frame() + + result = pipeline._build_policyengine_tax_units_via_microunit( + persons, allow_normalized_adapter=True + ) + + assert result is not None, ( + "adapter must let the delegation fire from a normalized frame" + ) + tax_units, person_rows, households = result + assert households == {10, 20} + + unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + # Couple + child collapse into one unit; the single adult stays separate. + assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3] + assert unit_by_person[4] != unit_by_person[1] + + family_id = int(unit_by_person[1]) + assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" + assert int(_unit_field(tax_units, family_id, "n_dependents")) == 1 + + +def test_normalized_adapter_builds_microunit_cps_contract(pipeline): + persons = _normalized_person_frame() + frame = pipeline._microunit_cps_frame_from_normalized(persons) + assert frame is not None + # All eight microunit-required CPS columns are present. 
+ for col in ( + "PH_SEQ", + "A_LINENO", + "A_AGE", + "A_MARITL", + "A_SPOUSE", + "PEPAR1", + "PEPAR2", + "A_EXPRRP", + ): + assert col in frame.columns, f"missing {col}" + by_pid = frame.set_index("person_id") + # Couple are married (A_MARITL == 1) and point at each other's line numbers. + assert int(by_pid.loc[1, "A_MARITL"]) == 1 + assert int(by_pid.loc[1, "A_SPOUSE"]) == int(by_pid.loc[2, "A_LINENO"]) + assert int(by_pid.loc[2, "A_SPOUSE"]) == int(by_pid.loc[1, "A_LINENO"]) + # The child's parent pointers reference the head and spouse line numbers. + assert int(by_pid.loc[3, "PEPAR1"]) == int(by_pid.loc[1, "A_LINENO"]) + assert int(by_pid.loc[3, "PEPAR2"]) == int(by_pid.loc[2, "A_LINENO"]) + # The single adult is never-married with no spouse/parent pointers. + assert int(by_pid.loc[4, "A_MARITL"]) == 7 + assert int(by_pid.loc[4, "A_SPOUSE"]) == 0 + + +def test_microunit_path_fails_safe_to_none(pipeline, monkeypatch): + # If microunit raises on a household it cannot resolve, the delegation must + # fall back (return None) rather than crash materialization. + import microunit + + def _boom(*args, **kwargs): + raise ValueError("synthetic microunit failure") + + monkeypatch.setattr(microunit, "construct_tax_units", _boom) + persons = _cps_person_frame() # raw CPS columns present -> reaches microunit + assert pipeline._build_policyengine_tax_units_via_microunit(persons) is None + + +def test_normalized_adapter_promotes_a_head_when_none_marked(pipeline): + # A household where nobody is marked head (all "other") must still get a + # single reference person so microunit can construct it. 
+ persons = pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 30, + "relationship_to_head": 3, + "age": 70, + "income": 20000.0, + }, + { + "person_id": 2, + "household_id": 30, + "relationship_to_head": 3, + "age": 35, + "income": 15000.0, + }, + ] + ) + frame = pipeline._microunit_cps_frame_from_normalized(persons) + assert frame is not None + # Exactly one reference person (A_EXPRRP == 1) in the household. + assert int((frame["A_EXPRRP"] == 1).sum()) == 1 + # And the delegation must not raise (fail-safe covers microunit edge cases). + pipeline._build_policyengine_tax_units_via_microunit( + persons, allow_normalized_adapter=True + ) + + +def _cps_fields_frame() -> pd.DataFrame: + """A frame in microplex's real CPS-derived fields (no relationship_to_head): + person_number = within-household line, spouse_person_number = spouse line, + family_relationship = CPS A_FAMREL. Couple+child (HH 10), single (HH 20).""" + return pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "age": 40, + "income": 50000.0, + "person_number": 1, + "spouse_person_number": 2, + "family_relationship": 1, + }, + { + "person_id": 2, + "household_id": 10, + "age": 38, + "income": 30000.0, + "person_number": 2, + "spouse_person_number": 1, + "family_relationship": 2, + }, + { + "person_id": 3, + "household_id": 10, + "age": 10, + "income": 0.0, + "person_number": 3, + "spouse_person_number": 0, + "family_relationship": 3, + }, + { + "person_id": 4, + "household_id": 20, + "age": 25, + "income": 45000.0, + "person_number": 1, + "spouse_person_number": 0, + "family_relationship": 0, + }, + ] + ) + + +def test_high_fidelity_adapter_uses_real_pointers(pipeline): + persons = _cps_fields_frame() + # The normalized adapter must dispatch to the high-fidelity path when the real + # CPS-derived fields are present (no relationship_to_head needed). 
+ frame = pipeline._microunit_cps_frame_from_normalized(persons) + assert frame is not None + by = frame.set_index("person_id") + # Real line/spouse pointers, not heuristics. + assert int(by.loc[1, "A_LINENO"]) == 1 and int(by.loc[2, "A_LINENO"]) == 2 + assert int(by.loc[1, "A_SPOUSE"]) == 2 and int(by.loc[2, "A_SPOUSE"]) == 1 + # A_EXPRRP from family relationship: ref=1, spouse=3, own child=5. + assert int(by.loc[1, "A_EXPRRP"]) == 1 + assert int(by.loc[2, "A_EXPRRP"]) == 3 + assert int(by.loc[3, "A_EXPRRP"]) == 5 + # Child's parent pointers reference the head (line 1) and spouse (line 2). + assert int(by.loc[3, "PEPAR1"]) == 1 and int(by.loc[3, "PEPAR2"]) == 2 + + +def test_high_fidelity_adapter_activates_delegation(pipeline): + persons = _cps_fields_frame() + result = pipeline._build_policyengine_tax_units_via_microunit( + persons, allow_normalized_adapter=True + ) + assert result is not None + tax_units, person_rows, households = result + unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + # Couple + child form one unit; single adult separate. 
+ assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3] + assert unit_by_person[4] != unit_by_person[1] + family_id = int(unit_by_person[1]) + assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" + + +def _zero_based_cps_fields_frame() -> pd.DataFrame: + """Same couple+child household as ``_cps_fields_frame`` but with + family_relationship in the optimizer's 0-based coding (0=head, 1=spouse, + 2=child) instead of CPS A_FAMREL 1-based (1=ref, 2=spouse, 3=child).""" + return pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "age": 40, + "income": 50000.0, + "person_number": 1, + "spouse_person_number": 2, + "family_relationship": 0, + }, + { + "person_id": 2, + "household_id": 10, + "age": 38, + "income": 30000.0, + "person_number": 2, + "spouse_person_number": 1, + "family_relationship": 1, + }, + { + "person_id": 3, + "household_id": 10, + "age": 10, + "income": 0.0, + "person_number": 3, + "spouse_person_number": 0, + "family_relationship": 2, + }, + ] + ) + + +def test_high_fidelity_adapter_normalizes_zero_based_family_relationship(pipeline): + # A 0-based family_relationship frame must map to the SAME A_EXPRRP / parent + # pointers as the 1-based CPS A_FAMREL coding. Without per-household scheme + # normalization the child (code 2) is mis-read as a spouse and loses parent + # pointers, so microunit mis-partitions the household. + frame = pipeline._microunit_cps_frame_from_normalized( + _zero_based_cps_fields_frame() + ) + assert frame is not None + by = frame.set_index("person_id") + assert int(by.loc[1, "A_EXPRRP"]) == 1 # head + assert int(by.loc[2, "A_EXPRRP"]) == 3 # spouse, not other-relative + assert int(by.loc[3, "A_EXPRRP"]) == 5 # own child, not spouse + assert int(by.loc[3, "PEPAR1"]) == 1 and int(by.loc[3, "PEPAR2"]) == 2 + + # And the partition matches the 1-based frame: couple + child = one unit. 
+ result = pipeline._build_policyengine_tax_units_via_microunit( + _zero_based_cps_fields_frame(), allow_normalized_adapter=True + ) + assert result is not None + _, person_rows, _ = result + unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3]