diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index ff6d134..92f005c 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -6033,18 +6033,28 @@ def _build_policyengine_tax_units_from_role_flags( return None person_rows = persons.copy() - person_rows["_is_tax_unit_head_flag"] = self._role_flag_series( + raw_head_flag = self._role_flag_series( person_rows, "is_tax_unit_head", ) - person_rows["_is_tax_unit_spouse_flag"] = self._role_flag_series( + raw_spouse_flag = self._role_flag_series( person_rows, "is_tax_unit_spouse", ) - person_rows["_is_tax_unit_dependent_flag"] = self._role_flag_series( + raw_dependent_flag = self._role_flag_series( person_rows, "is_tax_unit_dependent", ) + ( + person_rows["_is_tax_unit_head_flag"], + person_rows["_is_tax_unit_spouse_flag"], + person_rows["_is_tax_unit_dependent_flag"], + ) = self._resolve_tax_unit_role_flags( + person_rows, + head_flag=raw_head_flag, + spouse_flag=raw_spouse_flag, + dependent_flag=raw_dependent_flag, + ) tax_unit_rows: list[dict[str, Any]] = [] person_to_tax_unit: dict[int, int] = {} @@ -6071,7 +6081,9 @@ def _build_policyengine_tax_units_from_role_flags( for head_id in head_ids: spouse_ids = head_to_spouses.get(head_id, []) dependent_ids = head_to_dependents.get(head_id, []) - unit_person_ids = [head_id, *spouse_ids, *dependent_ids] + unit_person_ids = list( + dict.fromkeys([head_id, *spouse_ids, *dependent_ids]) + ) unit_persons = ordered.loc[ ordered["person_id"].astype(int).isin(unit_person_ids) ].copy() @@ -6276,6 +6288,45 @@ def _role_flag_series(self, frame: pd.DataFrame, column: str) -> pd.Series: return pd.Series(False, index=frame.index, dtype=bool) return pd.to_numeric(frame[column], errors="coerce").fillna(0.0).gt(0.5) + def _resolve_tax_unit_role_flags( + self, + frame: pd.DataFrame, + *, + head_flag: pd.Series, + spouse_flag: pd.Series, + dependent_flag: pd.Series, + ) -> tuple[pd.Series, pd.Series, pd.Series]: + relationship = ( + pd.to_numeric(frame["relationship_to_head"], errors="coerce") + .fillna(-1) + .astype(int) + if "relationship_to_head" in frame.columns + else pd.Series(-1, index=frame.index, dtype=int) + ) + family_relationship = ( + pd.to_numeric(frame["family_relationship"], errors="coerce") + .fillna(-1) + .astype(int) + if "family_relationship" in frame.columns + else pd.Series(-1, index=frame.index, dtype=int) + ) + head_hint = relationship.eq(0) | family_relationship.isin([0, 1]) + spouse_hint = relationship.eq(1) | family_relationship.eq(2) + dependent_hint = relationship.isin([2, 3]) | family_relationship.isin([3, 4]) + + resolved_dependent = ( + dependent_flag + & (~spouse_flag | dependent_hint | ~spouse_hint) + & (~head_flag | dependent_hint | ~head_hint) + ) + resolved_spouse = ( + spouse_flag + & ~resolved_dependent + & (~head_flag | spouse_hint | ~head_hint) + ) + resolved_head = head_flag & ~resolved_spouse & ~resolved_dependent + return resolved_head, resolved_spouse, resolved_dependent + def _assign_role_flag_spouses( self, household_persons: pd.DataFrame, diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 36a94d0..ce199ad 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1339,6 +1339,108 @@ def test_build_policyengine_entity_tables_prefers_tax_unit_role_flags_over_bad_i assert tax_units.iloc[0]["filing_status"] == "JOINT" assert tax_units.iloc[0]["n_dependents"] == 1 + def test_build_policyengine_entity_tables_resolves_spouse_head_role_conflicts( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + population = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 10], + "tax_unit_id": [100, 101], + "weight": [1.0, 1.0], + "age": [45, 43], + "income": [60_000.0, 15_000.0], + "relationship_to_head": [0, 1], + "family_relationship": [1, 2], + "person_number": [1, 2], + "spouse_person_number": [2, 1], + "tax_unit_is_joint": [1.0, 1.0], + "tax_unit_count_dependents": [0.0, 0.0], + "is_tax_unit_head": [1.0, 1.0], + "is_tax_unit_spouse": [0.0, 1.0], + "is_tax_unit_dependent": [0.0, 0.0], + "state_fips": [6, 6], + "tenure": [1, 1], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + person_rows = tables.persons.sort_values("person_id").reset_index(drop=True) + tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + + assert len(tax_units) == 1 + assert person_rows["tax_unit_id"].nunique() == 1 + assert tax_units.iloc[0]["filing_status"] == "JOINT" + + def test_build_policyengine_entity_tables_resolves_dependent_head_role_conflicts( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + population = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 10], + "tax_unit_id": [100, 101], + "weight": [1.0, 1.0], + "age": [45, 12], + "income": [60_000.0, 0.0], + "relationship_to_head": [0, 2], + "family_relationship": [1, 3], + "person_number": [1, 2], + "spouse_person_number": [0, 0], + "tax_unit_is_joint": [0.0, 0.0], + "tax_unit_count_dependents": [1.0, 1.0], + "is_tax_unit_head": [1.0, 1.0], + "is_tax_unit_spouse": [0.0, 0.0], + "is_tax_unit_dependent": [0.0, 1.0], + "state_fips": [6, 6], + "tenure": [1, 1], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + person_rows = tables.persons.sort_values("person_id").reset_index(drop=True) + tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + + assert len(tax_units) == 1 + assert person_rows["tax_unit_id"].nunique() == 1 + assert tax_units.iloc[0]["filing_status"] == "HEAD_OF_HOUSEHOLD" + assert tax_units.iloc[0]["n_dependents"] == 1 + + def test_build_policyengine_entity_tables_resolves_spouse_dependent_role_conflicts( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + population = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 10], + "tax_unit_id": [100, 100], + "weight": [1.0, 1.0], + "age": [45, 12], + "income": [60_000.0, 0.0], + "relationship_to_head": [0, 2], + "family_relationship": [1, 3], + "person_number": [1, 2], + "spouse_person_number": [0, 0], + "tax_unit_is_joint": [0.0, 1.0], + "tax_unit_count_dependents": [1.0, 1.0], + "is_tax_unit_head": [1.0, 0.0], + "is_tax_unit_spouse": [0.0, 1.0], + "is_tax_unit_dependent": [0.0, 1.0], + "state_fips": [6, 6], + "tenure": [1, 1], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + tax_units = tables.tax_units.sort_values("tax_unit_id").reset_index(drop=True) + + assert len(tax_units) == 1 + assert tax_units.iloc[0]["filing_status"] == "HEAD_OF_HOUSEHOLD" + assert tax_units.iloc[0]["n_dependents"] == 1 + def test_build_policyengine_entity_tables_preserves_tax_unit_agi_inputs(self): pipeline = USMicroplexPipeline( USMicroplexBuildConfig(policyengine_prefer_existing_tax_unit_ids=True)