# Post-image of the hunks that add the legacy ESA/JSA claimant-state proxy
# helpers to policyengine_uk_data/datasets/frs.py.
# NOTE(review): reconstructed from a whitespace-collapsed diff. The module
# docstring and any imports outside the visible hunk context are not shown
# here and must be kept from the real file.

from functools import lru_cache
from pathlib import Path

import numpy as np
import pandas as pd
from policyengine_uk import CountryTaxBenefitSystem
from policyengine_uk.data import UKSingleYearDataset
from policyengine_uk.variables.household.income.employment_status import (
    EmploymentStatus,
)

# NOTE(review): the hunk context truncates this import list after
# ``categorical``; the remaining imported names are not visible - confirm
# against the real file before applying.
from policyengine_uk_data.utils.datasets import (
    sum_to_entity,
    categorical,
)
from policyengine_uk_data.parameters import load_take_up_rate, load_parameter


LEGACY_JOBSEEKER_MIN_AGE = 18
HOURS_WORKED_WEEKS_PER_YEAR = 52
ESA_MIN_AGE = 16
ESA_HEALTH_EMPLOYMENT_STATUSES = (
    EmploymentStatus.LONG_TERM_DISABLED.name,
    EmploymentStatus.SHORT_TERM_DISABLED.name,
)


def _as_bool_mask(values) -> np.ndarray:
    """Return *values* as a boolean ndarray.

    The proxies combine their conditions with ``&``. On a non-boolean mask
    that operator is bitwise, so a truthy integer such as ``2`` would yield
    ``2 & True == 0`` and silently drop the row. Coercing first makes every
    truthy entry count as ``True``.
    """

    return np.asarray(values).astype(bool)


@lru_cache(maxsize=None)
def load_legacy_jobseeker_max_annual_hours(year: int) -> int:
    """Read the JSA single-claimant hours rule from policyengine-uk.

    Args:
        year: Period passed (as a string) to the
            ``gov.dwp.JSA.hours.single`` parameter.

    Returns:
        The weekly limit truncated to ``int`` and multiplied by
        ``HOURS_WORKED_WEEKS_PER_YEAR``.

    Results are memoised per ``year`` because each uncached call builds a
    fresh ``CountryTaxBenefitSystem``.
    """

    system = CountryTaxBenefitSystem()
    max_weekly_hours = int(system.parameters.gov.dwp.JSA.hours.single(str(year)))
    return max_weekly_hours * HOURS_WORKED_WEEKS_PER_YEAR


def derive_legacy_jobseeker_proxy(
    age,
    employment_status,
    hours_worked,
    current_education,
    employment_status_reported,
    state_pension_age,
    max_annual_hours,
) -> np.ndarray:
    """Approximate legacy JSA claimant-state from observed survey data.

    This is intentionally a proxy, not a legislative determination. It
    flags person-level records that have a reported employment status, are
    aged at least ``LEGACY_JOBSEEKER_MIN_AGE`` and below their state
    pension age, report ``"UNEMPLOYED"``, are ``"NOT_IN_EDUCATION"`` and
    work strictly fewer hours than ``max_annual_hours``.

    Args:
        age: Array-like of ages.
        employment_status: Array-like of employment status names.
        hours_worked: Array-like of hours, in the same (annual) unit as
            ``max_annual_hours``.
        current_education: Array-like of education status names.
        employment_status_reported: Array-like of truthy/falsy flags; it is
            coerced to ``bool`` so integer codes behave as a mask.
        state_pension_age: Array-like (or scalar) of state pension ages.
        max_annual_hours: Exclusive upper bound on ``hours_worked``.

    Returns:
        Boolean array, one entry per person.
    """

    age = np.asarray(age)
    employment_status = np.asarray(employment_status)
    hours_worked = np.asarray(hours_worked)
    current_education = np.asarray(current_education)
    reported = _as_bool_mask(employment_status_reported)
    state_pension_age = np.asarray(state_pension_age)

    return (
        reported
        & (age >= LEGACY_JOBSEEKER_MIN_AGE)
        & (age < state_pension_age)
        & (employment_status == "UNEMPLOYED")
        & (hours_worked < max_annual_hours)
        & (current_education == "NOT_IN_EDUCATION")
    )


def derive_esa_health_condition_proxy(
    age,
    employment_status,
    employment_status_reported,
    state_pension_age,
) -> np.ndarray:
    """Approximate working-age ESA health-related claimant-state.

    This proxy relies only on person-level labour market status, not on
    current disability or incapacity benefit receipt. It is a dataset-side
    approximation for future modelling, not a direct observation of ESA
    legal entitlement or LCW/LCWRA status.

    Args:
        age: Array-like of ages.
        employment_status: Array-like of employment status names; a record
            qualifies when its value is in
            ``ESA_HEALTH_EMPLOYMENT_STATUSES``.
        employment_status_reported: Array-like of truthy/falsy flags,
            coerced to ``bool``.
        state_pension_age: Array-like (or scalar) of state pension ages.

    Returns:
        Boolean array, one entry per person.
    """

    age = np.asarray(age)
    employment_status = np.asarray(employment_status)
    reported = _as_bool_mask(employment_status_reported)
    state_pension_age = np.asarray(state_pension_age)
    disability_labour_market_state = np.isin(
        employment_status, ESA_HEALTH_EMPLOYMENT_STATUSES
    )

    return (
        reported
        & (age >= ESA_MIN_AGE)
        & (age < state_pension_age)
        & disability_labour_market_state
    )


def derive_esa_support_group_proxy(
    age,
    employment_status,
    hours_worked,
    esa_health_condition_proxy,
    employment_status_reported,
    state_pension_age,
) -> np.ndarray:
    """Approximate a severe-health ESA subgroup akin to support group.

    This is a stricter subset of ``esa_health_condition_proxy`` intended
    for future legacy ESA approximation work. It uses only non-receipt
    labour market signals already available in the survey: the person must
    be long-term disabled and report no hours worked.

    Args:
        age: Array-like of ages.
        employment_status: Array-like of employment status names.
        hours_worked: Array-like of hours; only ``<= 0`` qualifies.
        esa_health_condition_proxy: Array-like of truthy/falsy flags,
            coerced to ``bool``.
        employment_status_reported: Array-like of truthy/falsy flags,
            coerced to ``bool``.
        state_pension_age: Array-like (or scalar) of state pension ages.

    Returns:
        Boolean array, one entry per person.
    """

    age = np.asarray(age)
    employment_status = np.asarray(employment_status)
    hours_worked = np.asarray(hours_worked)
    health_proxy = _as_bool_mask(esa_health_condition_proxy)
    reported = _as_bool_mask(employment_status_reported)
    state_pension_age = np.asarray(state_pension_age)
    # Use the enum name, as ESA_HEALTH_EMPLOYMENT_STATUSES does, so the two
    # cannot drift apart.
    severe_health_evidence = (
        employment_status == EmploymentStatus.LONG_TERM_DISABLED.name
    ) & (hours_worked <= 0)

    return (
        reported
        & (age >= ESA_MIN_AGE)
        & (age < state_pension_age)
        & health_proxy
        & severe_health_evidence
    )


def add_legacy_benefit_proxies(
    pe_person: pd.DataFrame,
    employment_status_reported,
    state_pension_age,
    legacy_jobseeker_max_annual_hours,
) -> pd.DataFrame:
    """Populate person-scoped ESA/JSA proxy columns on the person frame.

    These remain person-level by design because the claimant-state inputs
    they approximate attach to individuals. Downstream benunit-level legacy
    benefit models should aggregate them explicitly rather than assuming the
    raw survey contains a benunit claimant-state field.

    Args:
        pe_person: Person frame providing ``age``, ``employment_status``,
            ``hours_worked`` and ``current_education``. It is modified in
            place; pass a copy to keep the original untouched.
        employment_status_reported: Per-person truthy/falsy flags.
        state_pension_age: Per-person state pension ages.
        legacy_jobseeker_max_annual_hours: Exclusive annual hours bound for
            the jobseeker proxy.

    Returns:
        The same ``pe_person`` object with ``legacy_jobseeker_proxy``,
        ``esa_health_condition_proxy`` and ``esa_support_group_proxy``
        columns added.
    """

    pe_person["legacy_jobseeker_proxy"] = derive_legacy_jobseeker_proxy(
        age=pe_person["age"],
        employment_status=pe_person["employment_status"],
        hours_worked=pe_person["hours_worked"],
        current_education=pe_person["current_education"],
        employment_status_reported=employment_status_reported,
        state_pension_age=state_pension_age,
        max_annual_hours=legacy_jobseeker_max_annual_hours,
    )
    pe_person["esa_health_condition_proxy"] = derive_esa_health_condition_proxy(
        age=pe_person["age"],
        employment_status=pe_person["employment_status"],
        employment_status_reported=employment_status_reported,
        state_pension_age=state_pension_age,
    )
    # The support-group proxy reads the health proxy column written above,
    # so this assignment must come last.
    pe_person["esa_support_group_proxy"] = derive_esa_support_group_proxy(
        age=pe_person["age"],
        employment_status=pe_person["employment_status"],
        hours_worked=pe_person["hours_worked"],
        esa_health_condition_proxy=pe_person["esa_health_condition_proxy"],
        employment_status_reported=employment_status_reported,
        state_pension_age=state_pension_age,
    )
    return pe_person


def apply_legacy_benefit_proxies(
    pe_person: pd.DataFrame, sim, year: int, employment_status_reported
) -> pd.DataFrame:
    """Attach legacy ESA/JSA proxies using post-build simulation context.

    Args:
        pe_person: Person frame; modified in place and returned.
        sim: Object whose ``calculate("state_pension_age", year)`` returns
            something exposing ``.values``.
        year: Period used for both the state pension age calculation and
            the JSA hours parameter lookup.
        employment_status_reported: Per-person truthy/falsy flags.

    Returns:
        ``pe_person`` with the three proxy columns added.
    """

    state_pension_age = sim.calculate("state_pension_age", year).values
    legacy_jobseeker_max_annual_hours = load_legacy_jobseeker_max_annual_hours(year)
    return add_legacy_benefit_proxies(
        pe_person,
        employment_status_reported=employment_status_reported,
        state_pension_age=state_pension_age,
        legacy_jobseeker_max_annual_hours=legacy_jobseeker_max_annual_hours,
    )


def attach_legacy_benefit_proxies_from_frs_person(
    pe_person: pd.DataFrame, person: pd.DataFrame, sim, year: int
) -> pd.DataFrame:
    """Bridge raw FRS person fields into the proxy derivation hook.

    A person counts as having a reported employment status when the raw
    ``empstati`` value is present and greater than zero.

    Args:
        pe_person: Person frame; modified in place and returned.
        person: Raw FRS person table providing ``empstati``. Rows are
            matched to ``pe_person`` by position, not by index.
        sim: Simulation passed through to ``apply_legacy_benefit_proxies``.
        year: Period passed through to ``apply_legacy_benefit_proxies``.

    Returns:
        ``pe_person`` with the three proxy columns added.

    Raises:
        ValueError: If ``person`` and ``pe_person`` have different lengths.
    """

    # Fail loudly on misalignment: a length-1 mask would otherwise be
    # broadcast across every row without any error.
    if len(person) != len(pe_person):
        raise ValueError(
            "person and pe_person must have the same number of rows "
            f"(got {len(person)} and {len(pe_person)})"
        )

    employment_status_reported = person.empstati.fillna(0).to_numpy() > 0
    return apply_legacy_benefit_proxies(
        pe_person,
        sim,
        year,
        employment_status_reported=employment_status_reported,
    )
+ + def create_frs( raw_frs_folder: str, year: int, @@ -744,7 +936,6 @@ def determine_education_level(fted_val, typeed2_val, age_val): sim = Microsimulation(dataset=dataset) region = sim.populations["benunit"].household("region", dataset.time_period) lha_category = sim.calculate("LHA_category", year) - brma = np.empty(len(region), dtype=object) # Sample from a random BRMA in the region, weighted by the number of observations in each BRMA @@ -808,6 +999,13 @@ def determine_education_level(fted_val, typeed2_val, age_val): paragraph_3 | paragraph_4 | paragraph_5 ) + # Dataset-side claimant-state approximations for future legacy ESA/JSA + # modelling. These are explicit proxies based on observed survey + # conditions, not legislative determinations. + pe_person = attach_legacy_benefit_proxies_from_frs_person( + pe_person, person, sim, year + ) + # Generate stochastic take-up decisions # All randomness is generated here in the data package using take-up rates # stored in YAML parameter files. 
# Post-image of the new file
# policyengine_uk_data/tests/test_legacy_benefit_proxies.py.
# NOTE(review): reconstructed from a whitespace-collapsed diff.

from types import SimpleNamespace

import numpy as np
import pandas as pd
import policyengine_uk
import policyengine_uk_data.datasets.frs as frs_module

from policyengine_uk_data.datasets.frs import (
    add_legacy_benefit_proxies,
    attach_legacy_benefit_proxies_from_frs_person,
    apply_legacy_benefit_proxies,
    create_frs,
    derive_esa_health_condition_proxy,
    derive_esa_support_group_proxy,
    derive_legacy_jobseeker_proxy,
    load_legacy_jobseeker_max_annual_hours,
)


class FakeSim:
    """Minimal simulation stand-in that only serves ``state_pension_age``."""

    def __init__(self, state_pension_age):
        self._state_pension_age = np.asarray(state_pension_age)

    def calculate(self, variable, period):
        # Any other variable means the code under test changed its inputs.
        assert variable == "state_pension_age"
        return pd.Series(self._state_pension_age)


def test_legacy_jobseeker_proxy_tracks_unemployed_working_age_low_hours():
    """One record per rule: age bounds, hours bound, status, education, mask."""
    max_annual_hours = load_legacy_jobseeker_max_annual_hours(2025)
    result = derive_legacy_jobseeker_proxy(
        age=np.array([18, 30, 30, 66, 17, 25, 25, 66, 30, 30]),
        employment_status=np.array(
            [
                "UNEMPLOYED",
                "UNEMPLOYED",
                "UNEMPLOYED",
                "UNEMPLOYED",
                "UNEMPLOYED",
                "STUDENT",
                "CARER",
                "UNEMPLOYED",
                "UNEMPLOYED",
                "UNEMPLOYED",
            ]
        ),
        hours_worked=np.array([0, 12 * 52, 16 * 52, 0, 0, 0, 0, 0, 0, 0]),
        current_education=np.array(
            [
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
                "TERTIARY",
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
                "UPPER_SECONDARY",
                "NOT_IN_EDUCATION",
            ]
        ),
        employment_status_reported=np.array(
            [True, True, True, True, True, True, True, True, True, False]
        ),
        state_pension_age=np.array([66, 66, 66, 66, 66, 66, 66, 67, 66, 66]),
        max_annual_hours=max_annual_hours,
    )

    assert result.tolist() == [
        True,
        True,
        False,
        False,
        False,
        False,
        False,
        True,
        False,
        False,
    ]


def test_esa_health_condition_proxy_uses_disabled_employment_states():
    """Only reported, working-age, disabled-status records are flagged."""
    result = derive_esa_health_condition_proxy(
        age=np.array([16, 45, 45, 66, 45]),
        employment_status=np.array(
            [
                "LONG_TERM_DISABLED",
                "SHORT_TERM_DISABLED",
                "FT_EMPLOYED",
                "LONG_TERM_DISABLED",
                "LONG_TERM_DISABLED",
            ]
        ),
        employment_status_reported=np.array([True, True, True, True, False]),
        state_pension_age=np.array([66, 66, 66, 66, 66]),
    )

    assert result.tolist() == [True, True, False, False, False]


def test_esa_support_group_proxy_is_stricter_subset_of_health_proxy():
    """Support group needs long-term disability and zero hours on top."""
    health_proxy = np.array([True, True, True, False, True])
    result = derive_esa_support_group_proxy(
        age=np.array([16, 45, 45, 66, 45]),
        employment_status=np.array(
            [
                "LONG_TERM_DISABLED",
                "SHORT_TERM_DISABLED",
                "LONG_TERM_DISABLED",
                "FT_EMPLOYED",
                "LONG_TERM_DISABLED",
            ]
        ),
        hours_worked=np.array([0, 0, 12 * 52, 0, 0]),
        esa_health_condition_proxy=health_proxy,
        employment_status_reported=np.array([True, True, True, True, False]),
        state_pension_age=np.array([66, 66, 66, 66, 66]),
    )

    assert result.tolist() == [True, False, False, False, False]


def test_add_legacy_benefit_proxies_wires_all_three_columns():
    """All three proxy columns are written to the person frame."""
    # The disability and benefit-receipt columns are present but are not
    # passed to the proxy derivations.
    pe_person = pd.DataFrame(
        {
            "age": [18, 45, 45, 66],
            "employment_status": [
                "UNEMPLOYED",
                "LONG_TERM_DISABLED",
                "SHORT_TERM_DISABLED",
                "LONG_TERM_DISABLED",
            ],
            "hours_worked": [0, 0, 12 * 52, 0],
            "current_education": [
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
                "NOT_IN_EDUCATION",
            ],
            "is_disabled_for_benefits": [False, True, False, True],
            "is_severely_disabled_for_benefits": [False, False, True, True],
            "esa_income_reported": [0.0, 0.0, 100.0, 0.0],
            "esa_contrib_reported": [0.0, 0.0, 0.0, 0.0],
            "incapacity_benefit_reported": [0.0, 0.0, 0.0, 0.0],
            "sda_reported": [0.0, 0.0, 0.0, 0.0],
        }
    )

    result = add_legacy_benefit_proxies(
        pe_person.copy(),
        employment_status_reported=np.array([True, True, True, False]),
        state_pension_age=np.array([66, 66, 66, 66]),
        legacy_jobseeker_max_annual_hours=load_legacy_jobseeker_max_annual_hours(2025),
    )

    assert result["legacy_jobseeker_proxy"].tolist() == [True, False, False, False]
    assert result["esa_health_condition_proxy"].tolist() == [False, True, True, False]
    assert result["esa_support_group_proxy"].tolist() == [False, True, False, False]


def test_legacy_jobseeker_hours_limit_matches_policyengine_uk_parameter():
    """The annualised limit equals 16 weekly hours times 52 weeks."""
    assert load_legacy_jobseeker_max_annual_hours(2025) == 16 * 52


def test_apply_legacy_benefit_proxies_uses_sim_state_pension_age():
    """Two identical 66-year-olds differ only by simulated pension age."""
    pe_person = pd.DataFrame(
        {
            "age": [66, 66],
            "employment_status": ["UNEMPLOYED", "UNEMPLOYED"],
            "hours_worked": [0, 0],
            "current_education": ["NOT_IN_EDUCATION", "NOT_IN_EDUCATION"],
        }
    )

    result = apply_legacy_benefit_proxies(
        pe_person.copy(),
        FakeSim([66, 67]),
        2025,
        employment_status_reported=np.array([True, True]),
    )

    assert result["legacy_jobseeker_proxy"].tolist() == [False, True]


def test_attach_legacy_benefit_proxies_from_frs_person_uses_empstati_mask():
    """A missing ``empstati`` switches every proxy off for that person."""
    pe_person = pd.DataFrame(
        {
            "age": [30, 30],
            "employment_status": ["UNEMPLOYED", "LONG_TERM_DISABLED"],
            "hours_worked": [12 * 52, 0],
            "current_education": ["NOT_IN_EDUCATION", "NOT_IN_EDUCATION"],
        }
    )
    person = pd.DataFrame({"empstati": [1, np.nan]})

    result = attach_legacy_benefit_proxies_from_frs_person(
        pe_person.copy(),
        person,
        FakeSim([66, 66]),
        2025,
    )

    assert result["legacy_jobseeker_proxy"].tolist() == [True, False]
    assert result["esa_health_condition_proxy"].tolist() == [False, False]
    assert result["esa_support_group_proxy"].tolist() == [False, False]


class FakeBenunitPopulation:
    """Benunit population stub returning single-household projections."""

    def __init__(self, dataset):
        self.dataset = dataset

    def household(self, variable, period):
        if variable == "region":
            return np.array(["LONDON"])
        if variable == "household_id":
            return np.array([100])
        raise KeyError(variable)


def _fake_parameters(year):
    """Return a parameter tree exposing only the two rates the stub serves."""
    return SimpleNamespace(
        gov=SimpleNamespace(
            dwp=SimpleNamespace(
                dla=SimpleNamespace(self_care=SimpleNamespace(higher=1)),
                pip=SimpleNamespace(daily_living=SimpleNamespace(enhanced=1)),
            )
        )
    )


class FakeMicrosimulation:
    """Microsimulation stub for a one-person, one-household dataset."""

    def __init__(self, dataset):
        self.dataset = dataset
        self.populations = {"benunit": FakeBenunitPopulation(dataset)}
        # ``parameters(year)`` yields a nested attribute tree, built with
        # SimpleNamespace instead of ad-hoc ``type(...)`` classes.
        self.tax_benefit_system = SimpleNamespace(parameters=_fake_parameters)

    def calculate(self, variable, year=None):
        if variable == "LHA_category":
            return np.array(["A"])
        if variable == "household_id":
            return np.array([100])
        if variable == "state_pension_age":
            return pd.Series([66])
        raise KeyError(variable)


def _stub_frs_helpers(monkeypatch):
    """Replace frs-module helpers with trivial zero-returning stand-ins."""
    monkeypatch.setattr(frs_module, "load_take_up_rate", lambda *args, **kwargs: 0.0)
    monkeypatch.setattr(frs_module, "load_parameter", lambda *args, **kwargs: 0.0)
    monkeypatch.setattr(
        frs_module, "sum_to_entity", lambda values, ids, index: np.zeros(len(index))
    )
    monkeypatch.setattr(
        frs_module,
        "sum_from_positive_fields",
        lambda table, fields: np.zeros(len(table)),
    )
    monkeypatch.setattr(
        frs_module,
        "sum_positive_variables",
        lambda variables: (
            np.sum(np.vstack([np.asarray(v) for v in variables]), axis=0)
            if variables
            else 0
        ),
    )
    monkeypatch.setattr(
        frs_module,
        "fill_with_mean",
        lambda table, indicator, amount: np.zeros(len(table)),
    )


def _make_adult_table():
    """Return a one-row raw ``adult`` table for a 30-year-old."""
    return pd.DataFrame(
        [
            {
                "sernum": 100,
                "benunit": 1,
                "person": 1,
                "accssamt": 0,
                "adema": 0,
                "ademaamt": 0,
                "age": 30,
                "age80": 30,
                "cvpay": 0,
                "educft": 0,
                "educqual": 0,
                "eduma": 0,
                "edumaamt": 0,
                "empstati": 8,
                "fsbval": 0,
                "fsfvval": 0,
                "fsmval": 0,
                "fted": 0,
                "heartval": 0,
                "hrpid": 1,
                "inearns": 0,
                "marital": 0,
                "mntamt1": 0,
                "mntamt2": 0,
                "mntus1": 0,
                "mntusam1": 0,
                "redamt": 0,
                "royyr1": 0,
                "seincam2": 0,
                "sex": 1,
                "slrepamt": 0,
                "smpadj": 0,
                "sspadj": 0,
                "tothours": 0,
                "tuborr": 0,
                "typeed2": 0,
                "uperson": 1,
                "allpay2": 0,
                "royyr2": 0,
                "royyr3": 0,
                "royyr4": 0,
                "chamtern": 0,
                "chamttst": 0,
                "apamt": 0,
                "apdamt": 0,
                "pareamt": 0,
                "allpay3": 0,
                "allpay4": 0,
                "grtdir1": 0,
                "grtdir2": 0,
            }
        ]
    )


def _make_household_table():
    """Return a one-row raw ``househol`` table matching the adult's sernum."""
    return pd.DataFrame(
        [
            {
                "sernum": 100,
                "adulth": 1,
                "bedroom6": 1,
                "csewamt": 0,
                "ctannual": 0,
                "ctband": 1,
                "ctrebamt": 0,
                "cwatamtd": 0,
                "gross4": 0,
                "gvtregno": 1,
                "hhrent": 0,
                "mortint": 0,
                "ptentyp2": 0,
                "rt2rebam": 0,
                "struins": 0,
                "subrent": 0,
                "tentyp2": 0,
                "typeacc": 0,
                "watsewrt": 0,
                "niratlia": 0,
                **{f"chrgamt{i}": 0 for i in range(1, 10)},
            }
        ]
    )


def _make_raw_frs_tables():
    """Return the raw tables written to disk, keyed by ``.tab`` file stem."""
    adult = _make_adult_table()
    return {
        "adult": adult,
        # No children: an empty table with the adult schema.
        "child": pd.DataFrame(columns=adult.columns),
        "benunit": pd.DataFrame([{"sernum": 100, "benunit": 1, "famtypb2": 1}]),
        "househol": _make_household_table(),
        "pension": pd.DataFrame(
            columns=[
                "person",
                "sernum",
                "penoth",
                "penpay",
                "poamt",
                "poinc",
                "ptamt",
                "ptinc",
            ]
        ),
        "oddjob": pd.DataFrame(columns=["person", "sernum", "ojamt", "ojnow"]),
        "accounts": pd.DataFrame(
            columns=["person", "sernum", "accint", "acctax", "invtax", "account"]
        ),
        "job": pd.DataFrame(columns=["person", "sernum", "deduc1", "spnamt", "salsac"]),
        "benefits": pd.DataFrame(
            columns=["person", "sernum", "benamt", "benefit", "var2"]
        ),
        "maint": pd.DataFrame(columns=["person", "sernum", "mramt", "mruamt", "mrus"]),
        "penprov": pd.DataFrame(columns=["person", "sernum", "penamt", "stemppen"]),
        "chldcare": pd.DataFrame(
            columns=["person", "sernum", "chamt", "cost", "registrd"]
        ),
        "extchild": pd.DataFrame(columns=["sernum", "nhhamt"]),
        "mortgage": pd.DataFrame(
            columns=["sernum", "borramt", "mortend", "rmamt", "rmort"]
        ),
    }


def test_create_frs_smoke_includes_legacy_proxy_columns(tmp_path, monkeypatch):
    """``create_frs`` on a one-person fixture emits the three proxy columns."""
    # Capture the real reader before patching so the fake can delegate.
    original_read_csv = frs_module.pd.read_csv

    def fake_read_csv(path, *args, **kwargs):
        # Only the LHA rents lookup is faked; the .tab fixtures are read
        # for real.
        if str(path).endswith("lha_list_of_rents.csv.gz"):
            return pd.DataFrame(
                {"region": ["LONDON"], "lha_category": ["A"], "brma": ["BRMA1"]}
            )
        return original_read_csv(path, *args, **kwargs)

    monkeypatch.setattr(policyengine_uk, "Microsimulation", FakeMicrosimulation)
    monkeypatch.setattr(frs_module.pd, "read_csv", fake_read_csv)
    _stub_frs_helpers(monkeypatch)

    for name, table in _make_raw_frs_tables().items():
        table.to_csv(tmp_path / f"{name}.tab", sep="\t", index=False)

    dataset = create_frs(tmp_path, 2025)

    assert {
        "legacy_jobseeker_proxy",
        "esa_health_condition_proxy",
        "esa_support_group_proxy",
    }.issubset(dataset.person.columns)