Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 21 additions & 36 deletions src/microplex_us/data_sources/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,10 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame:
.otherwise(0.0)
)
ira_pool = pl.when(has_earned_income).then(remaining - dc_pool).otherwise(0.0)
traditional_401k_desired = dc_pool * (1 - ROTH_SHARE_OF_DC_CONTRIBUTIONS)
roth_401k_desired = dc_pool * ROTH_SHARE_OF_DC_CONTRIBUTIONS
traditional_ira_desired = ira_pool * TRADITIONAL_SHARE_OF_IRA_CONTRIBUTIONS
roth_ira_desired = ira_pool * (1 - TRADITIONAL_SHARE_OF_IRA_CONTRIBUTIONS)

derived_retirement_columns: list[pl.Expr] = []
if "self_employed_pension_contributions_desired" not in result.columns:
Expand All @@ -2098,28 +2102,20 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame:
# DC pool: traditional/Roth 401(k) split.
if "traditional_401k_contributions_desired" not in result.columns:
derived_retirement_columns.append(
(dc_pool * (1 - ROTH_SHARE_OF_DC_CONTRIBUTIONS)).alias(
"traditional_401k_contributions_desired"
)
traditional_401k_desired.alias("traditional_401k_contributions_desired")
)
if "roth_401k_contributions_desired" not in result.columns:
derived_retirement_columns.append(
(dc_pool * ROTH_SHARE_OF_DC_CONTRIBUTIONS).alias(
"roth_401k_contributions_desired"
)
roth_401k_desired.alias("roth_401k_contributions_desired")
)
# IRA pool: traditional/Roth IRA split.
if "traditional_ira_contributions_desired" not in result.columns:
derived_retirement_columns.append(
(ira_pool * TRADITIONAL_SHARE_OF_IRA_CONTRIBUTIONS).alias(
"traditional_ira_contributions_desired"
)
traditional_ira_desired.alias("traditional_ira_contributions_desired")
)
if "roth_ira_contributions_desired" not in result.columns:
derived_retirement_columns.append(
(ira_pool * (1 - TRADITIONAL_SHARE_OF_IRA_CONTRIBUTIONS)).alias(
"roth_ira_contributions_desired"
)
roth_ira_desired.alias("roth_ira_contributions_desired")
)
limit_year = max(
min(year, max(RETIREMENT_CONTRIBUTION_LIMITS_BY_YEAR)),
Expand All @@ -2133,52 +2129,41 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame:
limit_ira = pl.lit(float(limits["ira"])) + (
catch_up_eligible * float(limits["ira_catch_up"])
)
capped_se_pension = (
pl.when(has_se).then(retirement_contributions).otherwise(0.0)
)
capped_remaining_after_se = pl.max_horizontal(
retirement_contributions - capped_se_pension,
pl.lit(0.0),
)
capped_se_pension = se_pension
capped_traditional_401k = (
pl.when(has_wages)
.then(pl.min_horizontal(capped_remaining_after_se, limit_401k))
.then(pl.min_horizontal(traditional_401k_desired, limit_401k))
.otherwise(0.0)
)
capped_remaining_after_traditional_401k = pl.max_horizontal(
capped_remaining_after_se - capped_traditional_401k,
capped_remaining_401k_limit = pl.max_horizontal(
limit_401k - capped_traditional_401k,
pl.lit(0.0),
)
capped_roth_401k = (
pl.when(has_wages)
.then(
pl.min_horizontal(
capped_remaining_after_traditional_401k,
limit_401k,
roth_401k_desired,
capped_remaining_401k_limit,
)
)
.otherwise(0.0)
)
capped_remaining_after_roth_401k = pl.max_horizontal(
capped_remaining_after_traditional_401k - capped_roth_401k,
pl.lit(0.0),
)
capped_traditional_ira = (
pl.when(has_wages)
.then(pl.min_horizontal(capped_remaining_after_roth_401k, limit_ira))
pl.when(has_earned_income)
.then(pl.min_horizontal(traditional_ira_desired, limit_ira))
.otherwise(0.0)
)
capped_remaining_after_traditional_ira = pl.max_horizontal(
capped_remaining_after_roth_401k - capped_traditional_ira,
capped_remaining_ira_limit = pl.max_horizontal(
limit_ira - capped_traditional_ira,
pl.lit(0.0),
)
capped_roth_ira_limit = limit_ira - capped_traditional_ira
capped_roth_ira = (
pl.when(has_wages)
pl.when(has_earned_income)
.then(
pl.min_horizontal(
capped_remaining_after_traditional_ira,
capped_roth_ira_limit,
roth_ira_desired,
capped_remaining_ira_limit,
)
)
.otherwise(0.0)
Expand Down
69 changes: 69 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
from microplex.synthesizer import Synthesizer
from microplex.targets import TargetQuery, TargetSpec

from microplex_us.data_sources.cps import (
RETIREMENT_CATCH_UP_AGE,
RETIREMENT_CONTRIBUTION_LIMITS_BY_YEAR,
)
from microplex_us.data_sources.forbes import (
ForbesFixedSpine,
ForbesFixedSpineConfig,
Expand Down Expand Up @@ -11110,6 +11114,71 @@ def first_signed_or_present(*columns: str) -> pd.Series:
"hours_worked",
).clip(lower=0.0)

retirement_desired_columns = {
"self_employed_pension_contributions": (
"self_employed_pension_contributions_desired"
),
"traditional_401k_contributions": "traditional_401k_contributions_desired",
"roth_401k_contributions": "roth_401k_contributions_desired",
"traditional_ira_contributions": "traditional_ira_contributions_desired",
"roth_ira_contributions": "roth_ira_contributions_desired",
}
if all(
column in result.columns for column in retirement_desired_columns.values()
):
limit_year = max(
min(
self.config.policyengine_dataset_year or 2024,
max(RETIREMENT_CONTRIBUTION_LIMITS_BY_YEAR),
),
min(RETIREMENT_CONTRIBUTION_LIMITS_BY_YEAR),
)
limits = RETIREMENT_CONTRIBUTION_LIMITS_BY_YEAR[limit_year]
age = first_present("age")
catch_up_eligible = age.ge(RETIREMENT_CATCH_UP_AGE)
limit_401k = pd.Series(
float(limits["401k"]),
index=result.index,
dtype=float,
) + catch_up_eligible.astype(float) * float(limits["401k_catch_up"])
limit_ira = pd.Series(
float(limits["ira"]),
index=result.index,
dtype=float,
) + catch_up_eligible.astype(float) * float(limits["ira_catch_up"])

def capped_at(values: pd.Series, caps: pd.Series) -> pd.Series:
return pd.Series(
np.minimum(values.to_numpy(), caps.to_numpy()),
index=result.index,
dtype=float,
)

self_employed_pension = first_present(
"self_employed_pension_contributions_desired"
).clip(lower=0.0)
traditional_401k = capped_at(
first_present("traditional_401k_contributions_desired").clip(lower=0.0),
limit_401k,
)
roth_401k = capped_at(
first_present("roth_401k_contributions_desired").clip(lower=0.0),
(limit_401k - traditional_401k).clip(lower=0.0),
)
traditional_ira = capped_at(
first_present("traditional_ira_contributions_desired").clip(lower=0.0),
limit_ira,
)
roth_ira = capped_at(
first_present("roth_ira_contributions_desired").clip(lower=0.0),
(limit_ira - traditional_ira).clip(lower=0.0),
)
result["self_employed_pension_contributions"] = self_employed_pension
result["traditional_401k_contributions"] = traditional_401k
result["roth_401k_contributions"] = roth_401k
result["traditional_ira_contributions"] = traditional_ira
result["roth_ira_contributions"] = roth_ira

marital_status = (
pd.to_numeric(result["marital_status"], errors="coerce")
if "marital_status" in result.columns
Expand Down
46 changes: 34 additions & 12 deletions tests/data_sources/test_cps_retirement_contributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,32 @@ def _expected_capped_split(
*,
year: int,
) -> dict[str, float]:
"""Recompute eCPS' capped account-order split (cps.py:1500-1537)."""
"""Recompute the final account leaves by capping each desired pool."""
limits = RETIREMENT_CONTRIBUTION_LIMITS_BY_YEAR[year]
catch_up = age >= 50
limit_401k = limits["401k"] + catch_up * limits["401k_catch_up"]
limit_ira = limits["ira"] + catch_up * limits["ira_catch_up"]
se_pension = retcb if se > 0 else 0.0
remaining = max(retcb - se_pension, 0.0)
traditional_401k = min(remaining, limit_401k) if wages > 0 else 0.0
remaining = max(remaining - traditional_401k, 0.0)
roth_401k = min(remaining, limit_401k) if wages > 0 else 0.0
remaining = max(remaining - roth_401k, 0.0)
traditional_ira = min(remaining, limit_ira) if wages > 0 else 0.0
remaining = max(remaining - traditional_ira, 0.0)
roth_ira = min(remaining, limit_ira - traditional_ira) if wages > 0 else 0.0
desired = _expected_split(retcb, wages, se)
traditional_401k = min(
desired["traditional_401k_contributions_desired"],
limit_401k,
)
roth_401k = min(
desired["roth_401k_contributions_desired"],
max(limit_401k - traditional_401k, 0.0),
)
traditional_ira = min(
desired["traditional_ira_contributions_desired"],
limit_ira,
)
roth_ira = min(
desired["roth_ira_contributions_desired"],
max(limit_ira - traditional_ira, 0.0),
)
return {
"self_employed_pension_contributions": se_pension,
"self_employed_pension_contributions": desired[
"self_employed_pension_contributions_desired"
],
"traditional_401k_contributions": traditional_401k,
"roth_401k_contributions": roth_401k,
"traditional_ira_contributions": traditional_ira,
Expand Down Expand Up @@ -167,7 +177,7 @@ def test_split_matches_ecps_math_exactly():
assert got == expected[leaf], f"row {i} {leaf}: {got} != {expected[leaf]}"


def test_capped_split_matches_ecps_account_order_exactly():
def test_capped_split_matches_desired_account_pools_with_limits():
rows = [
{"wages": 50_000.0, "se": 0.0, "retcb": 10_000.0, "age": 40},
{"wages": 0.0, "se": 80_000.0, "retcb": 5_000.0, "age": 40},
Expand All @@ -187,6 +197,18 @@ def test_capped_split_matches_ecps_account_order_exactly():
assert got == expected[leaf], f"row {i} {leaf}: {got} != {expected[leaf]}"


def test_capped_split_preserves_ira_support_below_401k_limit():
rows = [
{"wages": 50_000.0, "se": 0.0, "retcb": 10_000.0, "age": 40},
]
out = _process_persons(_raw_person_frame(rows), 2024)

assert out["traditional_401k_contributions"].to_list()[0] > 0
assert out["roth_401k_contributions"].to_list()[0] > 0
assert out["traditional_ira_contributions"].to_list()[0] > 0
assert out["roth_ira_contributions"].to_list()[0] > 0


def test_five_leaves_reconcile_to_retcb_for_earned_income_records():
"""For anyone with earned income the five leaves sum back to RETCB_VAL."""
rows = [
Expand Down
42 changes: 42 additions & 0 deletions tests/policyengine/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -2879,6 +2879,48 @@ class FakeSystem:
"weekly_hours_worked_before_lsr",
}.issubset(columns)

def test_pipeline_export_uses_desired_retirement_leaves_for_final_inputs(self):
pipeline = USMicroplexPipeline(
USMicroplexBuildConfig(policyengine_dataset_year=2024)
)
persons = pd.DataFrame(
{
"person_id": [1, 2],
"household_id": [10, 11],
"age": [40, 52],
"self_employed_pension_contributions": [0.0, 99_000.0],
"traditional_401k_contributions": [10_000.0, 99_000.0],
"roth_401k_contributions": [0.0, 99_000.0],
"traditional_ira_contributions": [0.0, 99_000.0],
"roth_ira_contributions": [0.0, 99_000.0],
"self_employed_pension_contributions_desired": [0.0, 1_000.0],
"traditional_401k_contributions_desired": [7_718.0, 40_000.0],
"roth_401k_contributions_desired": [1_362.0, 5_000.0],
"traditional_ira_contributions_desired": [360.64, 10_000.0],
"roth_ira_contributions_desired": [559.36, 3_000.0],
}
)

result = pipeline._augment_policyengine_person_inputs(persons)

np.testing.assert_allclose(
result[
[
"self_employed_pension_contributions",
"traditional_401k_contributions",
"roth_401k_contributions",
"traditional_ira_contributions",
"roth_ira_contributions",
]
].to_numpy(),
np.array(
[
[0.0, 7_718.0, 1_362.0, 360.64, 559.36],
[1_000.0, 30_500.0, 0.0, 8_000.0, 0.0],
]
),
)

def test_projects_frame_and_writes_time_period_dataset(self, tmp_path):
frame = pd.DataFrame(
{
Expand Down
Loading