Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 113 additions & 46 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,9 @@ def _select_ssi_takeup_by_age_amount(
person_ids = person_ids.reindex(index)
age_values = pd.to_numeric(ages.reindex(index), errors="coerce").fillna(0.0)
weight_values = (
pd.to_numeric(weights.reindex(index), errors="coerce").fillna(0.0).clip(lower=0.0)
pd.to_numeric(weights.reindex(index), errors="coerce")
.fillna(0.0)
.clip(lower=0.0)
)
reported_values = (
pd.to_numeric(reported_ssi.reindex(index), errors="coerce")
Expand Down Expand Up @@ -555,7 +557,9 @@ def _select_until_amount(candidate_mask: np.ndarray, amount: float) -> None:
group_summary[group_name] = {
"reported_amount": target_amount,
"reported_recipients": float(
weight_values.to_numpy(dtype=float)[group_mask & reported_positive].sum()
weight_values.to_numpy(dtype=float)[
group_mask & reported_positive
].sum()
),
"formula_all_takeup_amount": float(full_amount[group_mask].sum()),
"formula_all_takeup_recipients": float(
Expand Down Expand Up @@ -3980,9 +3984,7 @@ def _calibrate_policyengine_ssi_takeup_from_reported_amounts(
"missing_columns": missing_columns,
}
reported_ssi = (
pd.to_numeric(persons["ssi"], errors="coerce")
.fillna(0.0)
.clip(lower=0.0)
pd.to_numeric(persons["ssi"], errors="coerce").fillna(0.0).clip(lower=0.0)
)
if not reported_ssi.gt(0.0).any():
persons["takes_up_ssi_if_eligible"] = False
Expand Down Expand Up @@ -4126,6 +4128,7 @@ def build_policyengine_entity_tables(
).transform("sum")
persons = self._augment_policyengine_person_inputs(persons)
persons["relationship_to_head"] = self._normalize_relationship_to_head(persons)
persons = self._assign_policyengine_household_head_flag(persons)

households = self._build_policyengine_households(persons)
tax_units, persons = self._build_policyengine_tax_units(persons)
Expand Down Expand Up @@ -6270,7 +6273,10 @@ def _build_policyengine_tax_units_from_role_flags(
"is_tax_unit_spouse",
"is_tax_unit_dependent",
}
if not role_columns.issubset(persons.columns) or "person_id" not in persons.columns:
if (
not role_columns.issubset(persons.columns)
or "person_id" not in persons.columns
):
return None

person_rows = persons.copy()
Expand Down Expand Up @@ -6348,9 +6354,7 @@ def _build_policyengine_tax_units_from_role_flags(
"tax_unit_id": global_tax_unit_id,
"household_id": int(household_id),
"filing_status": filing_status,
"member_ids": [
int(person_id) for person_id in unit_person_ids
],
"member_ids": [int(person_id) for person_id in unit_person_ids],
"filer_ids": [head_id, *spouse_ids],
"dependent_ids": dependent_ids,
"n_dependents": len(dependent_ids),
Expand Down Expand Up @@ -6466,12 +6470,9 @@ def _build_policyengine_tax_units_from_existing_ids(
.nunique()
)
if bool((households_per_tax_unit > 1).any()):
normalized_tax_unit_id = (
pd.factorize(pd.MultiIndex.from_frame(tax_unit_key), sort=False)[
0
].astype(np.int64)
+ int(start_tax_unit_id)
)
normalized_tax_unit_id = pd.factorize(
pd.MultiIndex.from_frame(tax_unit_key), sort=False
)[0].astype(np.int64) + int(start_tax_unit_id)
person_rows["tax_unit_id"] = normalized_tax_unit_id
else:
raw_tax_unit_id = raw_tax_unit_id.astype(np.int64)
Expand Down Expand Up @@ -6562,9 +6563,7 @@ def _resolve_tax_unit_role_flags(
& (~head_flag | dependent_hint | ~head_hint)
)
resolved_spouse = (
spouse_flag
& ~resolved_dependent
& (~head_flag | spouse_hint | ~head_hint)
spouse_flag & ~resolved_dependent & (~head_flag | spouse_hint | ~head_hint)
)
resolved_head = head_flag & ~resolved_spouse & ~resolved_dependent
return resolved_head, resolved_spouse, resolved_dependent
Expand Down Expand Up @@ -6687,9 +6686,7 @@ def _cohere_tax_unit_role_flags_for_household(
},
index=spouse_pool,
)
.sort_values(
["source_spouse", "relationship", "age", "person_id"]
)
.sort_values(["source_spouse", "relationship", "age", "person_id"])
.index[0]
)
if spouse_index is not None:
Expand All @@ -6700,26 +6697,21 @@ def _cohere_tax_unit_role_flags_for_household(
available
& (
dependent_flag
| (
dependent_hint
& (age.lt(24) | income.le(0.0))
)
| (dependent_hint & (age.lt(24) | income.le(0.0)))
| (spouse_hint & income.le(0.0))
)
] = True

available = ~(coherent_head | coherent_spouse | coherent_dependent)
coherent_head.loc[
available
& age.ge(18)
& (head_flag | income.gt(0.0))
] = True
coherent_head.loc[available & age.ge(18) & (head_flag | income.gt(0.0))] = True

coherent_dependent.loc[
~(coherent_head | coherent_spouse | coherent_dependent)
& (age.lt(18) | dependent_hint | income.le(0.0))
] = True
coherent_head.loc[~(coherent_head | coherent_spouse | coherent_dependent)] = True
coherent_head.loc[~(coherent_head | coherent_spouse | coherent_dependent)] = (
True
)

result["_is_tax_unit_head_flag"] = coherent_head
result["_is_tax_unit_spouse_flag"] = coherent_spouse
Expand Down Expand Up @@ -6803,12 +6795,10 @@ def _assign_role_flag_spouses(
head_by_person_number = {
int(person_number.loc[index]): int(row["person_id"])
for index, row in household_persons.iterrows()
if int(row["person_id"]) in head_set
and int(person_number.loc[index]) > 0
if int(row["person_id"]) in head_set and int(person_number.loc[index]) > 0
}
row_by_person_id = {
int(row["person_id"]): index
for index, row in household_persons.iterrows()
int(row["person_id"]): index for index, row in household_persons.iterrows()
}
assigned_spouses: set[int] = set()

Expand Down Expand Up @@ -6969,9 +6959,7 @@ def _infer_policyengine_aca_takeup_for_tax_unit(
marketplace = pd.Series(False, index=unit_persons.index, dtype=bool)
for column in observed:
marketplace |= (
pd.to_numeric(unit_persons[column], errors="coerce")
.fillna(0.0)
.ne(0.0)
pd.to_numeric(unit_persons[column], errors="coerce").fillna(0.0).ne(0.0)
)
return bool(marketplace.any())

Expand Down Expand Up @@ -7339,6 +7327,19 @@ def _coerce_policyengine_status_code(self, value: Any) -> int | None:

def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame:
result = persons.copy()
preserved_family_ids = self._normalized_complete_existing_group_ids(
result,
"family_id",
)
preserved_spm_unit_ids = self._normalized_complete_existing_group_ids(
result,
"spm_unit_id",
)
if preserved_family_ids is not None and preserved_spm_unit_ids is not None:
result["family_id"] = preserved_family_ids
result["spm_unit_id"] = preserved_spm_unit_ids
return result

family_ids: dict[int, int] = {}
spm_unit_ids: dict[int, int] = {}
next_family_id = 0
Expand All @@ -7363,8 +7364,16 @@ def _assign_family_and_spm_units(self, persons: pd.DataFrame) -> pd.DataFrame:
family_ids[int(row.name)] = next_family_id
next_family_id += 1

result["family_id"] = result.index.map(family_ids).astype(np.int64)
result["spm_unit_id"] = result.index.map(spm_unit_ids).astype(np.int64)
result["family_id"] = (
preserved_family_ids
if preserved_family_ids is not None
else result.index.map(family_ids).astype(np.int64)
)
result["spm_unit_id"] = (
preserved_spm_unit_ids
if preserved_spm_unit_ids is not None
else result.index.map(spm_unit_ids).astype(np.int64)
)
return result

def _primary_family_member_mask(
Expand All @@ -7373,9 +7382,7 @@ def _primary_family_member_mask(
) -> pd.Series:
"""Identify people who belong to the household's primary family."""

relationship_primary = household_persons["relationship_to_head"].isin(
{0, 1, 2}
)
relationship_primary = household_persons["relationship_to_head"].isin({0, 1, 2})
if "family_relationship" not in household_persons.columns:
return relationship_primary

Expand All @@ -7393,6 +7400,14 @@ def _assign_marital_units(
persons: pd.DataFrame,
) -> pd.DataFrame:
result = persons.copy()
preserved_marital_unit_ids = self._normalized_complete_existing_group_ids(
result,
"marital_unit_id",
)
if preserved_marital_unit_ids is not None:
result["marital_unit_id"] = preserved_marital_unit_ids
return result

marital_unit_by_person: dict[int, int] = {}
next_marital_unit_id = 0

Expand Down Expand Up @@ -7421,6 +7436,60 @@ def _assign_marital_units(
)
return result

def _assign_policyengine_household_head_flag(
self,
persons: pd.DataFrame,
) -> pd.DataFrame:
result = persons.copy()
derived = (
pd.to_numeric(result["relationship_to_head"], errors="coerce")
.fillna(-1)
.eq(0)
)
if "is_household_head" not in result.columns:
result["is_household_head"] = derived
return result

existing = pd.to_numeric(result["is_household_head"], errors="coerce")
result["is_household_head"] = existing.where(existing.notna(), derived).gt(0.5)
return result

def _normalized_complete_existing_group_ids(
self,
persons: pd.DataFrame,
id_column: str,
) -> pd.Series | None:
if id_column not in persons.columns:
return None
raw_ids = persons[id_column]
if raw_ids.isna().any():
return None

raw_key = raw_ids.astype("string")
key = pd.DataFrame(
{
"household_id": persons["household_id"],
id_column: raw_key,
},
index=persons.index,
)
raw_numeric = pd.to_numeric(raw_ids, errors="coerce")
households_per_raw_id = key.groupby(id_column, dropna=False)[
"household_id"
].nunique()
must_factorize = raw_numeric.isna().any() or bool(
households_per_raw_id.gt(1).any()
)
if must_factorize:
return pd.Series(
pd.factorize(pd.MultiIndex.from_frame(key), sort=False)[0].astype(
np.int64
),
index=persons.index,
name=id_column,
)
return raw_numeric.astype(np.int64).rename(id_column)

def _collapse_group_table(
self,
persons: pd.DataFrame,
Expand Down Expand Up @@ -7742,9 +7811,7 @@ def has_any(*columns: str) -> bool:
)
if "is_blind" in result.columns:
result["is_blind"] = (
pd.to_numeric(result["is_blind"], errors="coerce")
.fillna(0.0)
.ne(0.0)
pd.to_numeric(result["is_blind"], errors="coerce").fillna(0.0).ne(0.0)
)
elif "difficulty_seeing" in result.columns:
result["is_blind"] = first_present("difficulty_seeing").gt(0.0)
Expand Down
Loading
Loading