From 25929f33225f09c04a7c2dfebf7fdfda4a63d0d0 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 30 May 2026 13:57:13 -0400 Subject: [PATCH 1/3] Delegate PE tax-unit reconstruction to microunit (part of #113) Add microunit as a dependency and route the reconstruction-from-scratch tax-unit path through microunit.construct_tax_units when the person frame carries microunit's raw CPS input columns (PH_SEQ, A_LINENO, A_AGE, A_MARITL, A_SPOUSE, PEPAR1, PEPAR2, A_EXPRRP). When those columns are absent -- the current production case, since microplex's reconstruction frame collapses relationship_to_head and drops the spouse/parent pointers -- the new USMicroplexPipeline._build_policyengine_tax_units_via_microunit returns None and the legacy role-flag reconstruction runs unchanged. The authoritative-ID path (#112) is never routed here. Net effect is behavior-preserving on today's data: the delegation stays inert until an upstream change threads CPS columns through to entity construction. microunit IS eCPS's tax-unit engine, so activating the delegation converges microplex's tax units toward eCPS's; any resulting loss movement is an entity-convergence effect and must be interpreted as such, not as a quality win (see #113). Adds tests/pipelines/test_us_microunit_delegation.py (4 passing); ruff clean. Implementation produced by the parallel wire-microunit agent; verified (ruff + delegation tests) and committed here. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- pyproject.toml | 5 + src/microplex_us/pipelines/us.py | 199 ++++++++++++++++ .../pipelines/test_us_microunit_delegation.py | 224 ++++++++++++++++++ uv.lock | 12 + 4 files changed, 440 insertions(+) create mode 100644 tests/pipelines/test_us_microunit_delegation.py diff --git a/pyproject.toml b/pyproject.toml index 3019ee1..d638859 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,11 @@ policyengine = [ "microimpute==1.15.1 ; python_full_version >= '3.12' and python_full_version < '3.15'", "policyengine-us==1.715.2; python_version >= '3.11' and python_version < '3.15'", "spm-calculator>=0.3.1", + # Standalone tax-unit construction engine (the extraction of eCPS's + # tax-unit logic). Pinned to a pre-PyPI commit; used by the PolicyEngine + # pipeline to reconstruct tax units from CPS-like person frames (issue + # #113). Switch to a PyPI release once published. + "microunit @ git+https://github.com/PolicyEngine/microunit@d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1", ] [project.urls] diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index fcf9084..15f9367 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6262,12 +6262,193 @@ def _build_policyengine_tax_units( tax_units = pd.DataFrame(tax_unit_rows) return tax_units, person_rows + # Raw CPS ASEC columns that ``microunit.construct_tax_units`` consumes to + # reconstruct tax units. ``microunit`` is the standalone extraction of + # eCPS's tax-unit logic (issue #113); it is *source-agnostic* and expects + # this normalized CPS-like contract rather than microplex's collapsed + # ``relationship_to_head`` coding. We only delegate when the frame actually + # carries these columns, so the delegation is behavior-preserving on + # today's frames (which do not carry them) and only becomes active once an + # upstream change threads CPS columns through to entity construction. 
+ _MICROUNIT_REQUIRED_CPS_COLUMNS = ( + "PH_SEQ", + "A_LINENO", + "A_AGE", + "A_MARITL", + "A_SPOUSE", + "PEPAR1", + "PEPAR2", + "A_EXPRRP", + ) + + def _build_policyengine_tax_units_via_microunit( + self, + persons: pd.DataFrame, + *, + start_tax_unit_id: int = 0, + ) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None: + """Reconstruct tax units by delegating to ``microunit`` (issue #113). + + This is the *reconstruction-from-scratch* path. The authoritative-ID + path (#112, :meth:`_build_policyengine_tax_units_from_existing_ids`) is + handled separately and is never routed here. + + Delegation only happens when ``persons`` carries the raw CPS columns in + :attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`. ``microunit``'s logic genuinely + depends on marital status (``A_MARITL``), spouse/parent line pointers + (``A_SPOUSE``/``PEPAR1``/``PEPAR2``) and the CPS relationship recode + (``A_EXPRRP``); microplex's reconstruction-stage frame collapses + relationship into a 0/1/2/3 coding and drops the pointer columns, so a + *faithful* mapping is not possible from that frame. Rather than fabricate + microunit inputs (which would silently change behavior), we return + ``None`` when the columns are absent and let the caller fall back to the + legacy role-flag reconstruction. + + .. warning:: + ``microunit`` *is* eCPS's tax-unit construction. Routing microplex + through it makes microplex's constructed tax units **converge toward + eCPS's**. Any loss change from enabling this delegation is an + *entity-convergence* effect and must be interpreted as such, not as + a quality improvement. See issue #113. + + Returns the same ``(tax_units, person_rows, households)`` triple shape as + :meth:`_build_policyengine_tax_units_from_role_flags`, or ``None`` to + defer to the caller's fallback. 
+ """ + if "person_id" not in persons.columns or "household_id" not in persons.columns: + return None + if not set(self._MICROUNIT_REQUIRED_CPS_COLUMNS).issubset(persons.columns): + return None + + # Imported lazily to match this module's optional-dependency convention: + # ``microunit`` ships in the ``policyengine`` extra, and the base test + # suite must import this module without that extra installed. + from microunit import POLICYENGINE_MODE, construct_tax_units + + # microunit keys its CPS-style frame on (PH_SEQ, A_LINENO); resetting the + # index keeps row order so the returned per-person TAX_ID and role align + # positionally back onto person_rows. + person_rows = persons.reset_index(drop=True).copy() + person_assignments, tax_unit = construct_tax_units( + person_rows.copy(), + year=self._microunit_reference_year(person_rows), + mode=POLICYENGINE_MODE, + ) + + tax_id = pd.to_numeric(person_assignments["TAX_ID"], errors="coerce") + person_rows["tax_unit_id"] = ( + tax_id.to_numpy() + int(start_tax_unit_id) + ).astype(np.int64) + # microunit emits an authoritative per-person HEAD/SPOUSE/DEPENDENT role; + # use it directly for the filer/dependent split rather than re-deriving + # from the (possibly absent) collapsed relationship_to_head coding. 
+ person_rows["_microunit_role"] = [ + self._decode_microunit_bytes(role) + for role in person_assignments["tax_unit_role_input"].tolist() + ] + + filing_status_by_unit = { + int(row_tax_id) + int(start_tax_unit_id): self._decode_microunit_bytes( + filing_value + ) + for row_tax_id, filing_value in zip( + tax_unit["TAX_ID"].tolist(), + tax_unit["filing_status_input"].tolist(), + strict=True, + ) + } + + tax_unit_rows: list[dict[str, Any]] = [] + for unit_id, unit_persons in person_rows.groupby("tax_unit_id", sort=False): + ordered = unit_persons.sort_values( + ["_microunit_role", "age", "person_id"], + ascending=[True, False, True], + ).reset_index(drop=True) + is_filer = ordered["_microunit_role"].isin(["HEAD", "SPOUSE"]) + filer_ids = [ + int(person_id) for person_id in ordered.loc[is_filer, "person_id"] + ] + dependent_ids = [ + int(person_id) for person_id in ordered.loc[~is_filer, "person_id"] + ] + if not filer_ids: + filer_ids = [int(ordered.iloc[0]["person_id"])] + dependent_ids = [ + int(person_id) + for person_id in ordered["person_id"].tolist() + if int(person_id) not in filer_ids + ] + tax_unit_rows.append( + { + "tax_unit_id": int(unit_id), + "household_id": int(ordered.iloc[0]["household_id"]), + "filing_status": filing_status_by_unit.get(int(unit_id), "SINGLE"), + "member_ids": [ + int(person_id) for person_id in ordered["person_id"] + ], + "filer_ids": filer_ids, + "dependent_ids": dependent_ids, + "n_dependents": len(dependent_ids), + "total_income": float( + pd.to_numeric(ordered.get("income", 0.0), errors="coerce") + .fillna(0.0) + .sum() + ), + "tax_liability": 0.0, + **self._aggregate_policyengine_tax_unit_input_columns(ordered), + } + ) + + if not tax_unit_rows: + return None + + households = set(person_rows["household_id"].drop_duplicates().tolist()) + person_rows = person_rows.drop(columns=["_microunit_role"], errors="ignore") + return pd.DataFrame(tax_unit_rows), person_rows, households + + @staticmethod + def _decode_microunit_bytes(value: 
Any) -> str: + """Decode a ``microunit`` bytes-typed status/role into a string.""" + if isinstance(value, bytes): + return value.decode() + return str(value) + + def _microunit_reference_year(self, persons: pd.DataFrame) -> int: + """Year passed to ``microunit`` for its dependency income thresholds. + + Prefers an explicit ``year``/``tax_year`` column when the frame carries + one; otherwise falls back to the pipeline's configured reference year so + the only year-dependent behavior (the qualifying-relative gross income + limit) matches the rest of the pipeline. TODO(#113): thread the dataset + reference year through entity construction explicitly. + """ + for column in ("year", "tax_year"): + if column in persons.columns: + values = pd.to_numeric(persons[column], errors="coerce").dropna() + if not values.empty: + return int(values.iloc[0]) + configured = getattr(self.config, "reference_year", None) + if configured is not None: + return int(configured) + return 2024 + def _build_policyengine_tax_units_from_role_flags( self, persons: pd.DataFrame, *, start_tax_unit_id: int = 0, ) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None: + # Issue #113: when the frame carries microunit's CPS-style input + # columns, delegate the reconstruction to microunit. Otherwise fall + # through to the legacy role-flag reconstruction below (the current + # production path, since these columns are not yet threaded through). + microunit_result = self._build_policyengine_tax_units_via_microunit( + persons, + start_tax_unit_id=start_tax_unit_id, + ) + if microunit_result is not None: + return microunit_result + role_columns = { "is_tax_unit_head", "is_tax_unit_spouse", @@ -7326,6 +7507,16 @@ def _coerce_policyengine_status_code(self, value: Any) -> int | None: return int(numeric) def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame: + """Assign family and SPM units, preserving authoritative IDs when present. 
+ + NOT delegated to ``microunit`` in this pass (issue #113). At the pinned + commit ``microunit.units.spm.assign_spm_partition`` is documented as "a + conservative adapter, not yet the full Census-parity constructor" and is + not exported from microunit's public API, and microunit has no + family-unit constructor. The authoritative-ID fast path is preserved + here. TODO(#113): delegate once microunit grows a Census-parity + SPM/family constructor. + """ result = persons.copy() preserved_family_ids = self._normalized_complete_existing_group_ids( result, @@ -7399,6 +7590,14 @@ def _assign_marital_units( self, persons: pd.DataFrame, ) -> pd.DataFrame: + """Assign marital units, preserving authoritative IDs when present. + + NOT delegated to ``microunit`` in this pass (issue #113): microunit does + not construct marital units at the pinned commit (filing status is its + only marital-related output; there is no ``construct_marital_units``). + The authoritative-ID fast path is preserved here. TODO(#113): revisit if + microunit grows marital-unit support. + """ result = persons.copy() preserved_marital_unit_ids = self._normalized_complete_existing_group_ids( result, diff --git a/tests/pipelines/test_us_microunit_delegation.py b/tests/pipelines/test_us_microunit_delegation.py new file mode 100644 index 0000000..eb1992c --- /dev/null +++ b/tests/pipelines/test_us_microunit_delegation.py @@ -0,0 +1,224 @@ +"""Tests for delegating PolicyEngine tax-unit reconstruction to ``microunit``. + +These exercise the reconstruction-from-scratch path added in issue #113, where +``USMicroplexPipeline._build_policyengine_tax_units_from_role_flags`` delegates to +:func:`microunit.construct_tax_units` when the person frame carries the raw CPS +columns ``microunit`` requires. The authoritative-ID path (#112) and the legacy +role-flag fallback are validated by ``test_us_entities.py``. 
+ +``microunit`` ships in the ``policyengine`` optional dependency group, so these +tests skip when it is not installed. +""" + +from __future__ import annotations + +import pandas as pd +import pytest + +from microplex_us.pipelines.us import USMicroplexPipeline + +pytest.importorskip("microunit", reason="microunit is in the policyengine extra") + + +@pytest.fixture +def pipeline() -> USMicroplexPipeline: + return USMicroplexPipeline() + + +def _unit_field(tax_units: pd.DataFrame, unit_id: int, field: str): + """Read one tax unit's field robustly. + + ``tax_units`` has list-valued columns (``filer_ids`` etc.); ``DataFrame.iterrows`` + coerces a row to a single-dtype Series and mangles those, so we filter to the + single matching row and read the column's first (only) value instead. + """ + rows = tax_units.loc[tax_units["tax_unit_id"] == unit_id] + assert len(rows) == 1, f"expected exactly one unit {unit_id}, found {len(rows)}" + return rows[field].iloc[0] + + +def _cps_person_frame() -> pd.DataFrame: + """A two-household CPS-like frame microunit can consume directly. + + Household 10 is a married couple (lines 1 and 2) with one own child + (line 3, parent pointers to lines 1 and 2). Household 20 is a single adult. + The columns are the raw CPS ASEC names microunit expects; ``person_id`` and + ``household_id`` are microplex's own keys used to map results back. 
+ """ + return pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "relationship_to_head": 0, + "age": 40, + "income": 50000.0, + "PH_SEQ": 10, + "A_LINENO": 1, + "A_AGE": 40, + "A_MARITL": 1, + "A_SPOUSE": 2, + "PEPAR1": 0, + "PEPAR2": 0, + "A_EXPRRP": 1, + "WSAL_VAL": 50000, + }, + { + "person_id": 2, + "household_id": 10, + "relationship_to_head": 1, + "age": 38, + "income": 30000.0, + "PH_SEQ": 10, + "A_LINENO": 2, + "A_AGE": 38, + "A_MARITL": 1, + "A_SPOUSE": 1, + "PEPAR1": 0, + "PEPAR2": 0, + "A_EXPRRP": 3, + "WSAL_VAL": 30000, + }, + { + "person_id": 3, + "household_id": 10, + "relationship_to_head": 2, + "age": 10, + "income": 0.0, + "PH_SEQ": 10, + "A_LINENO": 3, + "A_AGE": 10, + "A_MARITL": 7, + "A_SPOUSE": 0, + "PEPAR1": 1, + "PEPAR2": 2, + "A_EXPRRP": 5, + "WSAL_VAL": 0, + }, + { + "person_id": 4, + "household_id": 20, + "relationship_to_head": 0, + "age": 25, + "income": 45000.0, + "PH_SEQ": 20, + "A_LINENO": 1, + "A_AGE": 25, + "A_MARITL": 7, + "A_SPOUSE": 0, + "PEPAR1": 0, + "PEPAR2": 0, + "A_EXPRRP": 1, + "WSAL_VAL": 45000, + }, + ] + ) + + +def test_microunit_path_groups_married_couple_with_child(pipeline): + persons = _cps_person_frame() + + result = pipeline._build_policyengine_tax_units_via_microunit(persons) + + assert result is not None, "microunit path must activate when CPS columns exist" + tax_units, person_rows, households = result + + # Two units: the married-with-child family and the single adult. + assert households == {10, 20} + assert person_rows["tax_unit_id"].nunique() == 2 + assert len(tax_units) == 2 + + # The three household-10 members share one unit; the single adult is alone. 
+ unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3] + assert unit_by_person[4] != unit_by_person[1] + + family_id = int(unit_by_person[1]) + single_id = int(unit_by_person[4]) + + assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" + assert int(_unit_field(tax_units, family_id, "n_dependents")) == 1 + assert sorted(_unit_field(tax_units, family_id, "filer_ids")) == [1, 2] + assert list(_unit_field(tax_units, family_id, "dependent_ids")) == [3] + + assert _unit_field(tax_units, single_id, "filing_status") == "SINGLE" + assert int(_unit_field(tax_units, single_id, "n_dependents")) == 0 + assert list(_unit_field(tax_units, single_id, "filer_ids")) == [4] + assert list(_unit_field(tax_units, single_id, "dependent_ids")) == [] + + # The temporary role column must not leak into the returned person frame. + assert "_microunit_role" not in person_rows.columns + + +def test_microunit_path_honors_start_tax_unit_id(pipeline): + persons = _cps_person_frame() + + result = pipeline._build_policyengine_tax_units_via_microunit( + persons, start_tax_unit_id=100 + ) + + assert result is not None + _, person_rows, _ = result + assert int(person_rows["tax_unit_id"].min()) >= 100 + + +def test_role_flags_entry_delegates_to_microunit_when_cps_columns_present(pipeline): + persons = _cps_person_frame() + + via_entry = pipeline._build_policyengine_tax_units_from_role_flags(persons) + via_direct = pipeline._build_policyengine_tax_units_via_microunit(persons) + + assert via_entry is not None + assert via_direct is not None + _, entry_persons, entry_households = via_entry + _, direct_persons, direct_households = via_direct + + # The entry method must route through microunit (identical assignments), + # not the legacy role-flag reconstruction, when CPS columns are present. 
+ assert entry_households == direct_households + pd.testing.assert_series_equal( + entry_persons.set_index("person_id")["tax_unit_id"], + direct_persons.set_index("person_id")["tax_unit_id"], + ) + + +def test_role_flags_entry_falls_back_without_cps_columns(pipeline): + # No CPS columns: only microplex role flags. microunit must NOT be used; + # the legacy role-flag reconstruction handles it (behavior-preserving). + persons = pd.DataFrame( + [ + { + "person_id": 1, + "household_id": 10, + "relationship_to_head": 0, + "age": 40, + "income": 50000.0, + "is_tax_unit_head": 1, + "is_tax_unit_spouse": 0, + "is_tax_unit_dependent": 0, + }, + { + "person_id": 2, + "household_id": 10, + "relationship_to_head": 1, + "age": 38, + "income": 30000.0, + "is_tax_unit_head": 0, + "is_tax_unit_spouse": 1, + "is_tax_unit_dependent": 0, + }, + ] + ) + + # The dedicated microunit adapter declines (missing CPS columns)... + assert pipeline._build_policyengine_tax_units_via_microunit(persons) is None + + # ...but the role-flag entry still succeeds via the legacy path. 
+ result = pipeline._build_policyengine_tax_units_from_role_flags(persons) + assert result is not None + tax_units, person_rows, households = result + assert households == {10} + assert person_rows["tax_unit_id"].nunique() == 1 + assert tax_units["filing_status"].iloc[0] == "JOINT" diff --git a/uv.lock b/uv.lock index d9c1424..9a0551b 100644 --- a/uv.lock +++ b/uv.lock @@ -1193,6 +1193,7 @@ docs = [ ] policyengine = [ { name = "microimpute", marker = "python_full_version < '3.15'" }, + { name = "microunit" }, { name = "policyengine-us", marker = "python_full_version < '3.15'" }, { name = "spm-calculator" }, ] @@ -1208,6 +1209,7 @@ requires-dist = [ { name = "jupyter-book", marker = "extra == 'docs'", specifier = ">=0.15,<0.16" }, { name = "microimpute", marker = "python_full_version >= '3.12' and python_full_version < '3.15' and extra == 'policyengine'", specifier = "==1.15.1" }, { name = "microplex", extras = ["calibrate"], git = "https://github.com/PolicyEngine/microplex.git?rev=1e0627182f9df40aacd7043c96956c2895bf9d30" }, + { name = "microunit", marker = "extra == 'policyengine'", git = "https://github.com/PolicyEngine/microunit?rev=d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1" }, { name = "policyengine-us", marker = "python_full_version >= '3.11' and python_full_version < '3.15' and extra == 'policyengine'", specifier = "==1.715.2" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0" }, { name = "requests", specifier = ">=2.31" }, @@ -1217,6 +1219,16 @@ requires-dist = [ ] provides-extras = ["dev", "docs", "r2", "policyengine"] +[[package]] +name = "microunit" +version = "0.1.0" +source = { git = "https://github.com/PolicyEngine/microunit?rev=d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1#d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1" } +dependencies = [ + { name = "numpy" }, + { name = "pandas" }, + { name = "pyyaml" }, +] + [[package]] name = "mpmath" version = "1.3.0" From 0fae65fb7b40befef01bf89329f84adcd46d1aee Mon Sep 17 00:00:00 2001 From: Max Ghenis 
Date: Sat, 30 May 2026 14:53:36 -0400 Subject: [PATCH 2/3] Depend on microunit from PyPI (>=0.1.0) instead of the git pin microunit 0.1.0 is now published to PyPI, so drop the pre-PyPI git+https commit pin in favor of a standard version constraint. Co-Authored-By: Claude Opus 4.8 (1M context) --- pyproject.toml | 7 +++---- uv.lock | 8 ++++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d638859..99b9ea3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,10 +36,9 @@ policyengine = [ "policyengine-us==1.715.2; python_version >= '3.11' and python_version < '3.15'", "spm-calculator>=0.3.1", # Standalone tax-unit construction engine (the extraction of eCPS's - # tax-unit logic). Pinned to a pre-PyPI commit; used by the PolicyEngine - # pipeline to reconstruct tax units from CPS-like person frames (issue - # #113). Switch to a PyPI release once published. - "microunit @ git+https://github.com/PolicyEngine/microunit@d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1", + # tax-unit logic), used by the PolicyEngine pipeline to reconstruct tax + # units from CPS-like person frames (issue #113). 
+ "microunit>=0.1.0", ] [project.urls] diff --git a/uv.lock b/uv.lock index 9a0551b..8dc040d 100644 --- a/uv.lock +++ b/uv.lock @@ -1209,7 +1209,7 @@ requires-dist = [ { name = "jupyter-book", marker = "extra == 'docs'", specifier = ">=0.15,<0.16" }, { name = "microimpute", marker = "python_full_version >= '3.12' and python_full_version < '3.15' and extra == 'policyengine'", specifier = "==1.15.1" }, { name = "microplex", extras = ["calibrate"], git = "https://github.com/PolicyEngine/microplex.git?rev=1e0627182f9df40aacd7043c96956c2895bf9d30" }, - { name = "microunit", marker = "extra == 'policyengine'", git = "https://github.com/PolicyEngine/microunit?rev=d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1" }, + { name = "microunit", marker = "extra == 'policyengine'", specifier = ">=0.1.0" }, { name = "policyengine-us", marker = "python_full_version >= '3.11' and python_full_version < '3.15' and extra == 'policyengine'", specifier = "==1.715.2" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0" }, { name = "requests", specifier = ">=2.31" }, @@ -1222,12 +1222,16 @@ provides-extras = ["dev", "docs", "r2", "policyengine"] [[package]] name = "microunit" version = "0.1.0" -source = { git = "https://github.com/PolicyEngine/microunit?rev=d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1#d3eccbbd33aa51f1c310bd6c2f37c9c3735beeb1" } +source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "numpy" }, { name = "pandas" }, { name = "pyyaml" }, ] +sdist = { url = "https://files.pythonhosted.org/packages/58/c1/6a8a1a1f7e90e41295e813808f170c71f0d20d36c6203722fd682d0a3387/microunit-0.1.0.tar.gz", hash = "sha256:a1e90f525e0a1a3921a3ed62ce291620bd45242f829cbd7892253dfff307eeb3", size = 21638, upload-time = "2026-05-30T18:51:35.59Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f1/cf/a38de31d10b1029923daa7f9271a78c965deb94519627f6dd4d9c3fbf359/microunit-0.1.0-py3-none-any.whl", hash = 
"sha256:1652fd43b57fb6fc803089d0da0fc4d28948d9e7d5e742e3327afd376e0a3060", size = 23581, upload-time = "2026-05-30T18:51:34.376Z" }, +] [[package]] name = "mpmath" From 652735c36d51d11f6eeb502f6403fbe8c8b8ed2c Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 30 May 2026 15:20:10 -0400 Subject: [PATCH 3/3] Harden microunit delegation: defensive filing-status normalize + order test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Route microunit's filing_status through _normalize_policyengine_filing_status so the delegated path cannot diverge from the legacy paths if microunit ever changes its spelling/casing (today the vocabularies already match). - Add a regression test feeding rows out of PH_SEQ/A_LINENO order, asserting correct unit/role/filing assignment — locks in microunit's input-row-order contract that the positional TAX_ID mapping relies on. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microplex_us/pipelines/us.py | 9 +++-- .../pipelines/test_us_microunit_delegation.py | 33 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 15f9367..55df713 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6347,9 +6347,14 @@ def _build_policyengine_tax_units_via_microunit( for role in person_assignments["tax_unit_role_input"].tolist() ] + # microunit emits the canonical filing-status vocabulary already, but + # normalize defensively so this path can never diverge from the legacy + # paths if microunit ever changes its spelling/casing. 
filing_status_by_unit = { - int(row_tax_id) + int(start_tax_unit_id): self._decode_microunit_bytes( - filing_value + int(row_tax_id) + int(start_tax_unit_id): ( + self._normalize_policyengine_filing_status( + self._decode_microunit_bytes(filing_value) + ) ) for row_tax_id, filing_value in zip( tax_unit["TAX_ID"].tolist(), diff --git a/tests/pipelines/test_us_microunit_delegation.py b/tests/pipelines/test_us_microunit_delegation.py index eb1992c..b2d6885 100644 --- a/tests/pipelines/test_us_microunit_delegation.py +++ b/tests/pipelines/test_us_microunit_delegation.py @@ -222,3 +222,36 @@ def test_role_flags_entry_falls_back_without_cps_columns(pipeline): assert households == {10} assert person_rows["tax_unit_id"].nunique() == 1 assert tax_units["filing_status"].iloc[0] == "JOINT" + + +def test_microunit_path_preserves_alignment_under_unsorted_input(pipeline): + """microunit groups internally by PH_SEQ but returns per-person results in + input row order; the delegation maps TAX_ID/role back positionally, so the + mapping must stay correct even when input rows are not pre-sorted by + PH_SEQ/A_LINENO. + + Regression guard: if a future microunit change stopped preserving input row + order, the positional mapping would misassign people to units/roles and this + test would fail while the pre-sorted fixture test still passed. + """ + persons = _cps_person_frame() + # Input order != PH_SEQ order: single adult (HH 20) first, then the HH 10 + # members out of A_LINENO order (2, 1, 3). + shuffled = persons.iloc[[3, 1, 0, 2]].reset_index(drop=True) + + result = pipeline._build_policyengine_tax_units_via_microunit(shuffled) + assert result is not None + tax_units, person_rows, _ = result + + unit_by_person = dict( + zip(person_rows["person_id"], person_rows["tax_unit_id"], strict=True) + ) + # Couple + child still share one unit; the single adult stays separate. 
+ assert unit_by_person[1] == unit_by_person[2] == unit_by_person[3] + assert unit_by_person[4] != unit_by_person[1] + + # Roles/filing must stay correctly aligned, not just the grouping. + family_id = int(unit_by_person[1]) + assert _unit_field(tax_units, family_id, "filing_status") == "JOINT" + assert sorted(_unit_field(tax_units, family_id, "filer_ids")) == [1, 2] + assert list(_unit_field(tax_units, family_id, "dependent_ids")) == [3]