Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions policyengine_us_data/calibration/sanity_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

logger = logging.getLogger(__name__)

WEEKS_IN_YEAR = 52
STANDARD_HOURS_PER_WEEK = 40
OVERTIME_RATE_MULTIPLIER = 1.5
HOURLY_WAGE_INCOME_RELATIVE_TOLERANCE = 0.10
HOURLY_WAGE_INCOME_MISMATCH_SHARE_WARN_THRESHOLD = 0.25
HOURLY_WAGE_INCOME_MEAN_ABS_REL_ERROR_WARN_THRESHOLD = 0.20

KEY_MONETARY_VARS = [
"employment_income",
"adjusted_gross_income",
Expand All @@ -36,6 +43,162 @@
]


def _weighted_mean(values: np.ndarray, weights: np.ndarray) -> float:
if values.size == 0 or weights.sum() <= 0:
return 0.0
return float(np.average(values, weights=weights))


def _weighted_quantile(
values: np.ndarray,
weights: np.ndarray,
quantile: float,
) -> float:
if values.size == 0 or weights.sum() <= 0:
return 0.0

sorter = np.argsort(values)
values = values[sorter]
weights = weights[sorter]
cumulative_weights = np.cumsum(weights)
cutoff = quantile * cumulative_weights[-1]
return float(values[np.searchsorted(cumulative_weights, cutoff, side="left")])


def _format_hourly_wage_income_detail(
*,
comparable_count: int,
comparable_weight: float,
mismatch_count: int,
mismatch_share: float,
mean_abs_rel_error: float,
p90_abs_rel_error: float,
over_income_share: float,
tolerance: float,
) -> str:
return (
f"{mismatch_count:,}/{comparable_count:,} unweighted mismatches; "
f"{mismatch_share:.1%} weighted mismatch share at "
f">{tolerance:.0%} tolerance among {comparable_weight:,.0f} weighted workers; "
f"mean absolute relative gap {mean_abs_rel_error:.1%}; "
f"p90 absolute relative gap {p90_abs_rel_error:.1%}; "
f"{over_income_share:.1%} imply annual wages above employment_income "
f"by >{tolerance:.0%}"
)


def build_hourly_wage_income_consistency_diagnostics(
employment_income: np.ndarray,
hourly_wage: np.ndarray,
hours_worked_last_week: np.ndarray,
is_paid_hourly: np.ndarray,
weights: np.ndarray | None = None,
*,
relative_tolerance: float = HOURLY_WAGE_INCOME_RELATIVE_TOLERANCE,
mismatch_share_warn_threshold: float = (
HOURLY_WAGE_INCOME_MISMATCH_SHARE_WARN_THRESHOLD
),
mean_abs_rel_error_warn_threshold: float = (
HOURLY_WAGE_INCOME_MEAN_ABS_REL_ERROR_WARN_THRESHOLD
),
) -> List[dict]:
"""Compare hourly facts with annual employment income.

Warns when more than 25 percent of comparable hourly workers differ by
more than 10 percent, or when the weighted mean absolute relative gap
exceeds 20 percent. These thresholds flag broad inconsistencies while
allowing last-week hours and annual wages to differ for normal reasons.
"""
employment_income = np.asarray(employment_income, dtype=float)
hourly_wage = np.asarray(hourly_wage, dtype=float)
hours_worked_last_week = np.asarray(hours_worked_last_week, dtype=float)
is_paid_hourly = np.asarray(is_paid_hourly, dtype=bool)

if weights is None:
weights = np.ones_like(employment_income, dtype=float)
else:
weights = np.asarray(weights, dtype=float)

straight_time_hours = np.minimum(hours_worked_last_week, STANDARD_HOURS_PER_WEEK)
overtime_hours = np.maximum(hours_worked_last_week - STANDARD_HOURS_PER_WEEK, 0)
straight_time_equivalent_hours = WEEKS_IN_YEAR * (
straight_time_hours + overtime_hours * OVERTIME_RATE_MULTIPLIER
)
implied_annual_wages = hourly_wage * straight_time_equivalent_hours

base_mask = (
is_paid_hourly
& (hourly_wage > 0)
& (hours_worked_last_week > 0)
& (employment_income > 0)
& np.isfinite(implied_annual_wages)
& np.isfinite(employment_income)
& np.isfinite(weights)
& (weights > 0)
)

results = []
subsets = [
("hourly_wage_income_consistency", base_mask),
(
"hourly_wage_income_consistency_overtime",
base_mask & (overtime_hours > 0),
),
]

for check_name, mask in subsets:
if not mask.any():
results.append(
{
"check": check_name,
"status": "SKIP",
"detail": "no comparable hourly workers",
}
)
continue

rel_gap = (
implied_annual_wages[mask] - employment_income[mask]
) / employment_income[mask]
subset_weights = weights[mask]
mismatch = np.abs(rel_gap) >= relative_tolerance
over_income = rel_gap >= relative_tolerance
mismatch_share = _weighted_mean(mismatch.astype(float), subset_weights)
mean_abs_rel_error = _weighted_mean(np.abs(rel_gap), subset_weights)
p90_abs_rel_error = _weighted_quantile(
np.abs(rel_gap),
subset_weights,
0.9,
)
over_income_share = _weighted_mean(
over_income.astype(float),
subset_weights,
)

warn = (
mismatch_share > mismatch_share_warn_threshold
or mean_abs_rel_error > mean_abs_rel_error_warn_threshold
)
results.append(
{
"check": check_name,
"status": "WARN" if warn else "PASS",
"detail": _format_hourly_wage_income_detail(
comparable_count=int(mask.sum()),
comparable_weight=float(subset_weights.sum()),
mismatch_count=int(mismatch.sum()),
mismatch_share=mismatch_share,
mean_abs_rel_error=mean_abs_rel_error,
p90_abs_rel_error=p90_abs_rel_error,
over_income_share=over_income_share,
tolerance=relative_tolerance,
),
}
)

return results


def run_sanity_checks(
h5_path: str,
period: int = 2024,
Expand All @@ -61,6 +224,32 @@ def _get(f, path):
except KeyError:
return None

def _get_person_weights(f, period, person_count, household_weights):
if household_weights is None:
return None
if len(household_weights) == person_count:
return household_weights

person_hh_arr = _get(f, f"person_household_id/{period}")
if person_hh_arr is None:
person_hh_arr = _get(f, "person_household_id")
hh_id_arr = _get(f, f"household_id/{period}")
if hh_id_arr is None:
hh_id_arr = _get(f, "household_id")
if person_hh_arr is None or hh_id_arr is None:
return None
if len(hh_id_arr) != len(household_weights):
return None

household_weight_by_id = dict(zip(hh_id_arr.tolist(), household_weights))
try:
return np.array(
[household_weight_by_id[hh_id] for hh_id in person_hh_arr.tolist()],
dtype=float,
)
except KeyError:
return None

with h5py.File(h5_path, "r") as f:
# 1. Weight non-negativity
w_key = f"household_weight/{period}"
Expand Down Expand Up @@ -249,6 +438,48 @@ def _get(f, path):
}
)

employment_income = _get(f, f"employment_income/{period}")
hourly_wage = _get(f, f"hourly_wage/{period}")
hours_worked_last_week = _get(f, f"hours_worked_last_week/{period}")
is_paid_hourly = _get(f, f"is_paid_hourly/{period}")
hourly_inputs = [
employment_income,
hourly_wage,
hours_worked_last_week,
is_paid_hourly,
]
if any(value is None for value in hourly_inputs):
results.append(
{
"check": "hourly_wage_income_consistency",
"status": "SKIP",
"detail": "missing one or more hourly wage consistency inputs",
}
)
results.append(
{
"check": "hourly_wage_income_consistency_overtime",
"status": "SKIP",
"detail": "missing one or more hourly wage consistency inputs",
}
)
else:
person_weights = _get_person_weights(
f,
period,
len(employment_income),
weights,
)
results.extend(
build_hourly_wage_income_consistency_diagnostics(
employment_income=employment_income,
hourly_wage=hourly_wage,
hours_worked_last_week=hours_worked_last_week,
is_paid_hourly=is_paid_hourly,
weights=person_weights,
)
)

return results


Expand Down
25 changes: 25 additions & 0 deletions tests/integration/test_enhanced_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,31 @@ def test_ecps_mean_employment_income_reasonable(ecps_sim):
)


def test_ecps_hourly_wage_income_consistency_diagnostic_runs(ecps_sim):
from policyengine_us_data.calibration.sanity_checks import (
build_hourly_wage_income_consistency_diagnostics,
)

diagnostics = build_hourly_wage_income_consistency_diagnostics(
employment_income=ecps_sim.calculate("employment_income", map_to="person"),
hourly_wage=ecps_sim.calculate("hourly_wage", map_to="person"),
hours_worked_last_week=ecps_sim.calculate(
"hours_worked_last_week",
map_to="person",
),
is_paid_hourly=ecps_sim.calculate("is_paid_hourly", map_to="person"),
weights=ecps_sim.calculate("household_weight", map_to="person"),
)

by_check = {diagnostic["check"]: diagnostic for diagnostic in diagnostics}
for check in (
"hourly_wage_income_consistency",
"hourly_wage_income_consistency_overtime",
):
assert by_check[check]["status"] in {"PASS", "WARN"}
assert "weighted mismatch share" in by_check[check]["detail"]


def test_ecps_file_size():
from policyengine_us_data.storage import STORAGE_FOLDER

Expand Down
62 changes: 62 additions & 0 deletions tests/unit/calibration/test_hourly_wage_income_consistency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import h5py
import numpy as np

from policyengine_us_data.calibration.sanity_checks import (
build_hourly_wage_income_consistency_diagnostics,
run_sanity_checks,
)


def _write_period_dataset(h5, name, values, period=2024):
group = h5.create_group(name)
group.create_dataset(str(period), data=np.asarray(values))


def test_hourly_wage_income_consistency_warns_on_overtime_mismatch():
diagnostics = build_hourly_wage_income_consistency_diagnostics(
employment_income=np.array([65_000.0, 57_200.0, 40_000.0, 65_000.0]),
hourly_wage=np.array([25.0, 20.0, 0.0, 25.0]),
hours_worked_last_week=np.array([50.0, 50.0, 50.0, 50.0]),
is_paid_hourly=np.array([True, True, True, False]),
weights=np.array([2.0, 1.0, 1.0, 1.0]),
)

by_check = {diagnostic["check"]: diagnostic for diagnostic in diagnostics}

assert by_check["hourly_wage_income_consistency"]["status"] == "WARN"
assert by_check["hourly_wage_income_consistency_overtime"]["status"] == "WARN"
assert (
"66.7% weighted mismatch share"
in by_check["hourly_wage_income_consistency_overtime"]["detail"]
)
assert (
"imply annual wages above employment_income"
in by_check["hourly_wage_income_consistency_overtime"]["detail"]
)


def test_hourly_wage_income_consistency_passes_when_hourly_facts_reconcile():
diagnostics = build_hourly_wage_income_consistency_diagnostics(
employment_income=np.array([57_200.0, 41_600.0]),
hourly_wage=np.array([20.0, 20.0]),
hours_worked_last_week=np.array([50.0, 40.0]),
is_paid_hourly=np.array([True, True]),
)

assert [diagnostic["status"] for diagnostic in diagnostics] == ["PASS", "PASS"]


def test_run_sanity_checks_adds_hourly_wage_income_consistency(tmp_path):
h5_path = tmp_path / "sample.h5"
with h5py.File(h5_path, "w") as h5:
_write_period_dataset(h5, "household_weight", [2.0, 1.0])
_write_period_dataset(h5, "employment_income", [65_000.0, 57_200.0])
_write_period_dataset(h5, "hourly_wage", [25.0, 20.0])
_write_period_dataset(h5, "hours_worked_last_week", [50.0, 50.0])
_write_period_dataset(h5, "is_paid_hourly", [True, True])

diagnostics = run_sanity_checks(str(h5_path), period=2024)
by_check = {diagnostic["check"]: diagnostic for diagnostic in diagnostics}

assert by_check["hourly_wage_income_consistency"]["status"] == "WARN"
assert by_check["hourly_wage_income_consistency_overtime"]["status"] == "WARN"
Loading