Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 250 additions & 21 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -6286,23 +6286,33 @@ def _build_policyengine_tax_units_via_microunit(
persons: pd.DataFrame,
*,
start_tax_unit_id: int = 0,
allow_normalized_adapter: bool | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame, set[Any]] | None:
"""Reconstruct tax units by delegating to ``microunit`` (issue #113).

This is the *reconstruction-from-scratch* path. The authoritative-ID
path (#112, :meth:`_build_policyengine_tax_units_from_existing_ids`) is
handled separately and is never routed here.

Delegation only happens when ``persons`` carries the raw CPS columns in
:attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`. ``microunit``'s logic genuinely
depends on marital status (``A_MARITL``), spouse/parent line pointers
(``A_SPOUSE``/``PEPAR1``/``PEPAR2``) and the CPS relationship recode
(``A_EXPRRP``); microplex's reconstruction-stage frame collapses
relationship into a 0/1/2/3 coding and drops the pointer columns, so a
*faithful* mapping is not possible from that frame. Rather than fabricate
microunit inputs (which would silently change behavior), we return
``None`` when the columns are absent and let the caller fall back to the
legacy role-flag reconstruction.
This is microplex's **default** tax-unit constructor for CPS-derived
frames. ``microunit`` is the rules-based engine that *replaces* the
unreliable CPS-provided ``tax_unit_id`` (Census ``TAX_ID``): when the
frame carries the real CPS pointer fields, the high-fidelity adapter
(#115) builds microunit's CPS contract and microunit re-partitions each
household from scratch, intentionally overriding any incoming
``tax_unit_id``. The ``policyengine_prefer_existing_tax_unit_ids`` /
:meth:`_build_policyengine_tax_units_from_existing_ids` path is a
**fallback** for the households this method does not construct -- it runs
*after* this one, on the remaining households -- not a parallel
authority. SPM/family/marital group IDs are preserved separately (#112)
and are not touched here, so "keep the source SPM units, replace the tax
units" holds.

Delegation runs when ``persons`` carries the raw CPS columns in
:attr:`_MICROUNIT_REQUIRED_CPS_COLUMNS`, or can synthesize them: the
high-fidelity adapter (#115) is used by DEFAULT when the real
``person_number``/``spouse_person_number``/``family_relationship`` fields
are present (the production candidate carries them); the coarse
``relationship_to_head``-only heuristic stays opt-in. When neither the
raw columns nor the high-fidelity fields are available (and the coarse
heuristic is not enabled), we return ``None`` and let the caller fall
back to the legacy role-flag reconstruction.

.. warning::
``microunit`` *is* eCPS's tax-unit construction. Routing microplex
Expand All @@ -6317,8 +6327,28 @@ def _build_policyengine_tax_units_via_microunit(
"""
if "person_id" not in persons.columns or "household_id" not in persons.columns:
return None
cps_frame = persons
if not set(self._MICROUNIT_REQUIRED_CPS_COLUMNS).issubset(persons.columns):
return None
# microunit is microplex's required tax-unit engine (#113). When the
# raw CPS columns are absent, synthesize its CPS contract from the
# normalized frame. The high-fidelity path (real person_number /
# spouse_person_number / family_relationship, which the production
# candidate carries) is used by DEFAULT; the coarse
# relationship_to_head-only heuristic stays opt-in via the config flag
# so minimal frames don't silently get the lossy reconstruction.
has_high_fidelity_fields = {
"person_number",
"family_relationship",
}.issubset(persons.columns)
if allow_normalized_adapter is None:
allow_normalized_adapter = has_high_fidelity_fields or bool(
getattr(self.config, "microunit_construct_from_normalized", False)
)
if not allow_normalized_adapter:
return None
cps_frame = self._microunit_cps_frame_from_normalized(persons)
if cps_frame is None:
return None

# Imported lazily to match this module's optional-dependency convention:
# ``microunit`` ships in the ``policyengine`` extra, and the base test
Expand All @@ -6328,12 +6358,23 @@ def _build_policyengine_tax_units_via_microunit(
# microunit keys its CPS-style frame on (PH_SEQ, A_LINENO); resetting the
# index keeps row order so the returned per-person TAX_ID and role align
# positionally back onto person_rows.
person_rows = persons.reset_index(drop=True).copy()
person_assignments, tax_unit = construct_tax_units(
person_rows.copy(),
year=self._microunit_reference_year(person_rows),
mode=POLICYENGINE_MODE,
)
person_rows = cps_frame.reset_index(drop=True).copy()
try:
person_assignments, tax_unit = construct_tax_units(
person_rows.copy(),
year=self._microunit_reference_year(person_rows),
mode=POLICYENGINE_MODE,
)
except Exception:
# microunit raises on households it cannot resolve (e.g. no valid
# reference person). Never let that crash materialization — fall back
# to the caller's legacy reconstruction for the whole frame.
LOGGER.warning(
"microunit tax-unit construction failed; falling back to "
"legacy reconstruction",
exc_info=True,
)
return None

tax_id = pd.to_numeric(person_assignments["TAX_ID"], errors="coerce")
person_rows["tax_unit_id"] = (
Expand Down Expand Up @@ -6411,6 +6452,194 @@ def _build_policyengine_tax_units_via_microunit(
person_rows = person_rows.drop(columns=["_microunit_role"], errors="ignore")
return pd.DataFrame(tax_unit_rows), person_rows, households

def _microunit_cps_frame_from_cps_fields(
self,
persons: pd.DataFrame,
) -> pd.DataFrame:
"""High-fidelity (#115) build of microunit's CPS contract from the real
CPS-derived fields microplex carries at materialization: ``person_number``
(a 1-based within-household line number), ``spouse_person_number`` (a real
spouse line pointer), ``family_relationship`` (CPS A_FAMREL) and
``marital_status``. The household reference person (``person_number == 1``)
always anchors a valid head, so microunit never lacks one.

Only ``PEPAR1``/``PEPAR2`` remain heuristic: a child's parents are taken to
be the household reference person (line 1) and that person's spouse (#115).
"""
frame = persons.reset_index(drop=True).copy()
hh = pd.to_numeric(frame["household_id"], errors="coerce")
pernum = (
pd.to_numeric(frame["person_number"], errors="coerce").fillna(0).astype(int)
)
# ``family_relationship`` arrives in either CPS A_FAMREL 1-based coding
# (1=reference person, 2=spouse, 3=child, ...) or the optimizer's 0-based
# coding (0=head, 1=spouse, 2=child); the rest of the pipeline detects
# this per household (see ``_normalize_relationship_to_head`` and
# ``data_sources.cps``). The A_EXPRRP / parent-pointer mapping below
# expects the 1-based scheme, so shift any 0-based household up by one --
# otherwise a 0-based frame silently mis-codes children as spouses and
# drops their parent pointers.
famrel_raw = pd.to_numeric(frame["family_relationship"], errors="coerce")
zero_based_hh = (famrel_raw == 0).groupby(hh).transform("any").fillna(False)
famrel = famrel_raw.add(zero_based_hh.astype(int)).fillna(0).astype(int)
spouse_num = (
pd.to_numeric(frame.get("spouse_person_number", 0), errors="coerce")
.fillna(0)
.astype(int)
)

frame["PH_SEQ"] = hh.astype(np.int64)
frame["A_LINENO"] = pernum
frame["A_AGE"] = (
pd.to_numeric(frame["age"], errors="coerce").fillna(0).astype(int)
)
frame["A_SPOUSE"] = spouse_num

is_ref = pernum == 1
# A_EXPRRP (microunit.CPSRelationshipCode): reference person 1, spouse 3,
# own child 5, everyone else other-relative 10.
exprrp = pd.Series(10, index=frame.index, dtype=int)
exprrp[famrel == 2] = 3
exprrp[famrel == 3] = 5
exprrp[is_ref] = 1
frame["A_EXPRRP"] = exprrp

# A_MARITL: 1 married spouse present; 4 widowed; else 7 never-married.
marital = pd.Series(7, index=frame.index, dtype=int)
marital[spouse_num > 0] = 1
if "is_surviving_spouse" in frame.columns:
surviving = (
pd.to_numeric(frame["is_surviving_spouse"], errors="coerce").fillna(0)
> 0
)
marital[surviving & (spouse_num == 0)] = 4
frame["A_MARITL"] = marital

# PEPAR1/PEPAR2: a child's parents are heuristically the household
# reference person (line 1) and that reference person's spouse.
ref_spouse_line = frame.loc[is_ref].groupby(hh[is_ref])["A_SPOUSE"].first()
frame["PEPAR1"] = 0
frame["PEPAR2"] = 0
is_child = famrel == 3
frame.loc[is_child, "PEPAR1"] = 1
frame.loc[is_child, "PEPAR2"] = (
hh[is_child].map(ref_spouse_line).fillna(0).astype(int)
)

return frame

def _microunit_cps_frame_from_normalized(
self,
persons: pd.DataFrame,
) -> pd.DataFrame | None:
"""PROTOTYPE (issue #115): synthesize microunit's CPS-like input contract
from microplex's normalized person columns.

``microunit.construct_tax_units`` needs raw CPS columns (PH_SEQ/A_LINENO/
A_AGE/A_MARITL/A_SPOUSE/PEPAR1/PEPAR2/A_EXPRRP); at PolicyEngine
materialization microplex instead carries ``household_id``/``age``/
``relationship_to_head``. This builds the former from the latter, mirroring
the ACS->CPS mapping microunit documents as the consumer's responsibility.

.. warning::
HEURISTIC AND UNVALIDATED. The ``relationship_to_head`` -> ``A_EXPRRP``
and married -> ``A_MARITL`` maps are approximate, and ``PEPAR1``/
``PEPAR2`` are inferred by assuming a child's parents are the household
head and spouse. The fidelity of these maps must be validated against
the legacy reconstruction before this is trusted (see #115); it is
gated OFF by default.

Returns ``persons`` with the eight microunit CPS columns added, or ``None``
if the prerequisite normalized columns are absent.
"""
# Prefer the high-fidelity path when microplex carries the real CPS-derived
# pointer fields (person_number is a 1-based within-household line number;
# spouse_person_number a real spouse line pointer). Otherwise fall back to
# the coarse relationship_to_head heuristic below.
if {
"person_id",
"person_number",
"family_relationship",
"household_id",
"age",
}.issubset(persons.columns):
return self._microunit_cps_frame_from_cps_fields(persons)

required = {"person_id", "household_id", "age", "relationship_to_head"}
if not required.issubset(persons.columns):
return None

# CPS A_EXPRRP recode (microunit.CPSRelationshipCode): 1 reference person,
# 3 husband, 5 own child, 10 other relative.
exprrp_by_rel = {0: 1, 1: 3, 2: 5, 3: 10}

frame = persons.reset_index(drop=True).copy()
rel = (
pd.to_numeric(frame["relationship_to_head"], errors="coerce")
.fillna(3)
.astype(int)
)
age = pd.to_numeric(frame["age"], errors="coerce").fillna(0).astype(int)

# Per-household line numbers (1-based, unique within household): head
# first, then spouse, then everyone else oldest-first.
frame = frame.assign(_rel=rel.to_numpy(), _age=age.to_numpy())
frame = frame.sort_values(
["household_id", "_rel", "_age", "person_id"],
ascending=[True, True, False, True],
).reset_index(drop=True)
frame["A_LINENO"] = frame.groupby("household_id", sort=False).cumcount() + 1
# Guarantee exactly one household head (microunit requires a single
# reference person per PH_SEQ, else it raises). After the head-first sort
# the line-1 member is the most head-like; make it the head and demote
# any other rows that mapped to head (multi-family / headless households).
is_line1 = frame["A_LINENO"] == 1
frame.loc[is_line1, "_rel"] = 0
frame.loc[~is_line1 & (frame["_rel"] == 0), "_rel"] = 3
frame["PH_SEQ"] = pd.to_numeric(frame["household_id"], errors="coerce").astype(
np.int64
)
frame["A_AGE"] = frame["_age"]
frame["A_EXPRRP"] = frame["_rel"].map(exprrp_by_rel).fillna(10).astype(int)

# Head/spouse line numbers per household, for spouse pointers + marital.
head_line = (
frame.loc[frame["_rel"] == 0].groupby("household_id")["A_LINENO"].first()
)
spouse_line = (
frame.loc[frame["_rel"] == 1].groupby("household_id")["A_LINENO"].first()
)
hh = frame["household_id"]
is_head = frame["_rel"] == 0
is_spouse = frame["_rel"] == 1
is_child = frame["_rel"] == 2
has_spouse = hh.map(spouse_line).notna()

frame["A_SPOUSE"] = 0
frame.loc[is_head, "A_SPOUSE"] = (
hh[is_head].map(spouse_line).fillna(0).astype(int)
)
frame.loc[is_spouse, "A_SPOUSE"] = (
hh[is_spouse].map(head_line).fillna(0).astype(int)
)

# A_MARITL: 1 = married, spouse present (head/spouse of a couple); else
# 7 = never married. microunit only needs the married-vs-not distinction.
frame["A_MARITL"] = 7
frame.loc[(is_head | is_spouse) & has_spouse, "A_MARITL"] = 1

# PEPAR1/PEPAR2: assume a child's parents are the household head + spouse.
frame["PEPAR1"] = 0
frame["PEPAR2"] = 0
frame.loc[is_child, "PEPAR1"] = (
hh[is_child].map(head_line).fillna(0).astype(int)
)
frame.loc[is_child, "PEPAR2"] = (
hh[is_child].map(spouse_line).fillna(0).astype(int)
)

return frame.drop(columns=["_rel", "_age"])

@staticmethod
def _decode_microunit_bytes(value: Any) -> str:
"""Decode a ``microunit`` bytes-typed status/role into an upper string."""
Expand Down
Loading
Loading