In [19]:
%%capture
! pip install pyarrow

In [20]:
import os
import pandas as pd
import numpy as np
from typing import List, Tuple

In [21]:
# Conversion of the RAND HRS Stata extract to Parquet, kept commented out for
# reference (presumably run once to create the Parquet file read below — TODO confirm):
# df = pd.read_stata('data/extracted/randhrs1992_2022v1.dta', convert_categoricals=False)
# df.to_parquet('data/extracted/randhrs1992_2022v1.parquet')
# df.columns = df.columns.str.upper()

# Load the Parquet copy of the longitudinal file and preview the first rows.
df = pd.read_parquet('data/extracted/randhrs1992_2022v1.parquet')
df.head()

Unnamed: 0,HHIDPN,S1HHIDPN,R1MSTAT,R1MPART,S1BMONTH,S1BYEAR,S1BDATE,S1BFLAG,S1COHBYR,S1HRSAMP,...,R8LBSATWLF,R9LBSATWLF,R10LBSATWLF,R11LBSATWLF,R12LBSATWLF,R13LBSATWLF,R14LBSATWLF,R15LBSATWLF,R16LBSATWLF,FILEVER
0,1010,0.0,5.0,0.0,,,,,,,...,,,,,,,,,,X
1,2010,0.0,7.0,0.0,,,,,,,...,,,,,,,,,,X
2,3010,3020.0,1.0,0.0,9.0,1938.0,-7778.0,0.0,3.0,1.0,...,5.4,,6.4,,,,,,,X
3,3020,3010.0,1.0,0.0,1.0,1936.0,-8752.0,0.0,3.0,1.0,...,4.6,,4.2,,2.4,,,,,X
4,10001010,0.0,8.0,0.0,,,,,,,...,,2.8,,4.8,,,,,,X


In [22]:
WAVE_YEARS = {
    1: 1992,  # HRS cohort entry
    2: 1993,  # AHEAD cohort entry (off-year)
    3: 1994,  # Off-year
    4: 1995,  # Off-year
    5: 1996,  # Biennial pattern begins
    6: 1998,  # CODA/WB cohorts enter
    7: 2000,
    8: 2002,
    9: 2004,  # Early Baby Boomer cohort enters
    10: 2006,
    11: 2008,
    12: 2010, # Mid Baby Boomer cohort enters
    13: 2012,
    14: 2014,
    15: 2016, # Late Baby Boomer cohort enters
    16: 2018,
    17: 2020, # COVID wave
    18: 2022  # Early Generation X cohort enters
}

def get_wave_year(wave: int) -> int | float:
    return WAVE_YEARS.get(wave, np.nan)

def get_variable_name(wave: int, var_base: str) -> str:
    """Build a respondent-level column name: 'R' + wave number + variable stem."""
    return "R{}{}".format(wave, var_base)

In [23]:
def extract_wave_features(df: pd.DataFrame, wave: int, lags: int = 2) -> pd.DataFrame:
    """
    Build one row per respondent with demographics plus wave-specific measures.

    Parameters
    ----------
    df : wide frame with upper-case columns ('HHIDPN', 'RABYEAR', 'RAGENDER',
        'RARACEM', 'RAHISPAN', 'RAEDUC', 'RAEDEGRM' and 'R<wave><CODE>' columns).
    wave : wave whose values become the un-suffixed feature columns; must be a
        key of WAVE_YEARS (KeyError otherwise).
    lags : number of earlier waves to add as '<name>_lag<k>' columns.

    Returns
    -------
    DataFrame indexed like ``df``. The set of columns depends only on ``lags``:
    a measure whose source column is absent from ``df``, or whose lagged wave
    would be earlier than wave 1, is filled with NaN instead of being dropped,
    so downstream code can always rely on the '*_lag1' columns existing.
    """

    def numeric(col: str) -> pd.Series:
        # Static respondent attributes: unparseable values become NaN.
        return pd.to_numeric(df[col], errors='coerce')

    calendar_year = WAVE_YEARS[wave]
    birth_year = numeric('RABYEAR')
    # Approximate age: the wave's calendar year minus birth year.
    age = calendar_year - birth_year

    race = numeric('RARACEM')
    hisp = numeric('RAHISPAN')
    # First matching rule wins: Hispanic takes precedence over race. Anything
    # else, including respondents with missing race/ethnicity, is "Other".
    ethnicity = np.select(
        [(hisp == 1).to_numpy(), (race == 1).to_numpy(), (race == 2).to_numpy()],
        ["Hispanic", "White", "Black"],
        default="Other",
    )

    data = {
        # Base identifiers
        'person_id': df['HHIDPN'],
        'wave': wave,
        'calendar_year': calendar_year,
        # Demographics
        'birth_year': birth_year,
        'age': age,
        'age_squared': age ** 2,
        'female': numeric('RAGENDER') == 2,
        'ethnicity': pd.Categorical(
            ethnicity,
            categories=["White", "Black", "Hispanic", "Other"],
        ),
        # NOTE(review): RAEDUC looks like a categorical attainment code rather
        # than years of schooling (RAEDYRS?) — confirm against the codebook.
        'education_years': numeric('RAEDUC'),
        'college_plus': numeric('RAEDEGRM') >= 4,
    }

    # Output feature name -> RAND variable stem (the column is 'R<wave><stem>').
    variables = {
        'self_rated_health': 'SHLT',
        'bmi': 'BMI',
        'weight': 'WEIGHT',
        'height': 'HEIGHT',
        'mobility_limitations': 'MOBILA',
        'large_muscle_limitations': 'LGMUSA',
        'adl_limitations': 'ADL5A',
        'iadl_limitations': 'IADL5A',
        'fine_motor_limitations': 'FINEA',
        'cognition_score': 'COG27',
        'memory_recall': 'TR20',
        'immediate_recall': 'IMRC',
        'delayed_recall': 'DLRC',
        'serial7': 'SER7',
        'depression_score': 'CESD',
        'felt_depressed': 'DEPRES',
        'everything_effort': 'EFFORT',
        'restless_sleep': 'SLEEPR',
        'felt_lonely': 'FLONE',
        'ever_smoked': 'SMOKEV',
        'current_smoker': 'SMOKEN',
        'drinks_per_day': 'DRINKD',
        'drink_days_per_week': 'DRINKN',
        'vigorous_activity': 'VIGACT',
        'marital_status': 'MSTAT'
    }

    # lag 0 is the feature wave itself; lag k reads wave - k.
    for lag in range(lags + 1):
        source_wave = wave - lag
        suffix = f"_lag{lag}" if lag > 0 else ""

        for name, code in variables.items():
            col = f'R{source_wave}{code}'
            if source_wave >= 1 and col in df.columns:
                data[f'{name}{suffix}'] = pd.to_numeric(df[col], errors='raise')
            else:
                # Wave precedes the study or the variable was not collected in
                # that wave: keep the column, filled with NaN.
                data[f'{name}{suffix}'] = np.nan

    # Built in one go from the dict to avoid column-by-column fragmentation.
    features = pd.DataFrame(data)

    return features

In [24]:
import pandas as pd
import numpy as np

def add_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Append first-difference scores and 0/1 screening flags to ``df``.

    For each measure the current-wave column and its ``_lag1`` counterpart must
    be present. The input is not modified; the result is the original columns
    followed by the derived ones, attached with a single concat.

    Flags are produced by casting a boolean comparison to int, and comparisons
    against NaN are False, so a flag is 0 whenever its inputs are missing.
    """

    def delta(col):
        # Current wave minus previous wave.
        return df[col] - df[f'{col}_lag1']

    def decline(col):
        # Previous wave minus current wave, so a positive value means a drop.
        return df[f'{col}_lag1'] - df[col]

    def flag(mask):
        return mask.astype(int)

    # ---- change scores reused by several flags ----
    bmi_change = delta('bmi')
    weight_change = delta('weight')
    health_change = delta('self_rated_health')
    mobility_change = delta('mobility_limitations')
    adl_change = delta('adl_limitations')
    cognition_change = decline('cognition_score')
    memory_change = decline('memory_recall')
    depression_change = delta('depression_score')
    drinks_per_week = df['drinks_per_day'] * df['drink_days_per_week']

    # Percent change is undefined when the previous weight is exactly zero.
    prev_weight = df['weight_lag1']
    weight_change_pct = np.where(
        prev_weight == 0,
        np.nan,
        (weight_change / prev_weight) * 100,
    )

    # ---- current-wave conditions reused by several flags ----
    bmi = df['bmi']
    has_mobility_limit = df['mobility_limitations'] > 0
    depressed_now = df['depression_score'] >= 3
    poor_sleep = df['restless_sleep'] == 1
    not_smoking = df['current_smoker'] == 0
    inactive = df['vigorous_activity'] == 0

    derived = {
        # BMI & weight
        'bmi_change': bmi_change,
        'weight_change_kg': weight_change,
        'weight_change_pct': weight_change_pct,
        'obese': flag(bmi >= 30),
        'overweight': flag((bmi >= 25) & (bmi < 30)),
        'rapid_weight_gain': flag(bmi_change > 1),
        'rapid_weight_loss': flag(bmi_change < -1),
        # Self-rated health
        'health_change': health_change,
        'health_worsening': flag(health_change > 0),
        'health_crash': flag(health_change >= 2),
        # Functional status
        'mobility_change': mobility_change,
        'mobility_worsening': flag(mobility_change > 0),
        'new_mobility_problem': flag(
            has_mobility_limit & (df['mobility_limitations_lag1'] == 0)
        ),
        'any_mobility_limitation': flag(has_mobility_limit),
        'adl_change': adl_change,
        'adl_worsening': flag(adl_change > 0),
        'any_adl_limitation': flag(df['adl_limitations'] > 0),
        'iadl_change': delta('iadl_limitations'),
        'any_iadl_limitation': flag(df['iadl_limitations'] > 0),
        # Cognition (changes are lag minus current)
        'cognition_change': cognition_change,
        'cognition_worsening': flag(cognition_change > 0),
        'low_cognition': flag(df['cognition_score'] < 12),
        'memory_change': memory_change,
        'memory_worsening': flag(memory_change > 0),
        # Mental health
        'depression_change': depression_change,
        'depression_worsening': flag(depression_change > 0),
        'elevated_depression': flag(depressed_now),
        'chronic_depression': flag(
            depressed_now & (df['depression_score_lag1'] >= 3)
        ),
        # Health behaviors
        'sleep_problem': flag(poor_sleep),
        'sleep_change': delta('restless_sleep'),
        'new_sleep_problem': flag(poor_sleep & (df['restless_sleep_lag1'] == 0)),
        'former_smoker': flag((df['ever_smoked'] == 1) & not_smoking),
        'quit_smoking': flag(not_smoking & (df['current_smoker_lag1'] == 1)),
        'sedentary': flag(inactive),
        'stopped_activity': flag(inactive & (df['vigorous_activity_lag1'] == 1)),
        'drinks_per_week': drinks_per_week,
        'heavy_drinking': flag(drinks_per_week > 14),
    }

    # One concat for all derived columns instead of inserting them one by one.
    return pd.concat([df, pd.DataFrame(derived, index=df.index)], axis=1)


In [32]:
# Target name -> RAND variable stem; the per-wave column is 'R<wave><stem>'.
disease_map = {
    'diabetes':    'DIABE',
    'cvd':         'HEARTE',   # cardiovascular disease
    'stroke':      'STROKE',
    'lung':        'LUNGE',
    'cancer':      'CANCRE',
    'hibp':        'HIBPE',    # high blood pressure
    'arthritis':   'ARTHRE',
    'psychiatric': 'PSYCHE',   # psychiatric problems
    'memory':      'MEMRYE',
}


def create_targets(df: pd.DataFrame, feature_wave: int, outcome_wave: int):
    """
    Build incident-disease labels between ``feature_wave`` and ``outcome_wave``.

    For every entry of ``disease_map`` whose baseline and outcome columns both
    exist in ``df``, two columns are produced:

    * ``target_<name>``   - 1.0 when the baseline value is 0 and the outcome
      value is 1, otherwise 0.0; NaN when the outcome value is missing.
    * ``eligible_<name>`` - 1 when the baseline value is 0, otherwise 0.

    Diseases lacking either column are skipped. The result carries
    ``person_id`` (from 'HHIDPN') and shares ``df``'s index.
    """
    columns = {'person_id': df['HHIDPN']}

    for label, stem in disease_map.items():
        at_baseline = f'R{feature_wave}{stem}'
        at_outcome = f'R{outcome_wave}{stem}'

        if not (at_baseline in df.columns and at_outcome in df.columns):
            continue

        outcome = df[at_outcome]
        disease_free = df[at_baseline] == 0

        incident = (disease_free & (outcome == 1)).astype(float)
        # An unobserved outcome is unknown, not a negative case.
        incident.loc[outcome.isna()] = np.nan

        columns[f'target_{label}'] = incident
        columns[f'eligible_{label}'] = disease_free.astype(int)

    # Assemble every column at once so the frame is not fragmented.
    return pd.DataFrame(columns, index=df.index)


In [33]:
# Sanity check: report whether each disease indicator column is present,
# using wave 5 as a representative wave, and how many non-missing values it has.
print("\n=== Checking Disease Columns ===")
for disease, code in disease_map.items():
    col = f'R5{code}'
    exists = col in df.columns
    if not exists:
        print(f"❌ {disease:12} {col:15} NOT FOUND")
        continue
    count = df[col].notna().sum()
    print(f"✅ {disease:12} {col:15} exists, n={count:,}")


=== Checking Disease Columns ===
✅ diabetes     R5DIABE         exists, n=19,578
✅ cvd          R5HEARTE        exists, n=19,578
✅ stroke       R5STROKE        exists, n=19,578
✅ lung         R5LUNGE         exists, n=19,578
✅ cancer       R5CANCRE        exists, n=19,578
✅ hibp         R5HIBPE         exists, n=19,578
✅ arthritis    R5ARTHRE        exists, n=19,578
✅ psychiatric  R5PSYCHE        exists, n=19,578
✅ memory       R5MEMRYE        exists, n=19,578


In [26]:
def create_dataset(
    df: pd.DataFrame,
    feature_waves: List[int],
    prediction_horizon: int = 2,
    target: str = 'diabetes',
    max_wave: int = 16,
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Create a modeling dataset for a specific disease target.

    For each wave in ``feature_waves`` the features of that wave are paired
    with the disease status ``prediction_horizon`` waves later; the per-wave
    frames are stacked, then restricted to respondents who were disease-free
    at the feature wave and whose outcome is known.

    Parameters
    ----------
    df : wide RAND HRS frame. It is not modified; if its columns are lower-case
        an upper-cased view is used internally.
    feature_waves : waves to take features from.
    prediction_horizon : number of waves between features and outcome.
    target : disease name; 'eligible_<target>' / 'target_<target>' must be
        produced by ``create_targets``.
    max_wave : last wave that may serve as an outcome wave; feature waves whose
        outcome wave would exceed it are skipped.

    Returns
    -------
    (X, y) : X holds every column except 'wave' and the 'target_*' /
        'eligible_*' columns (so 'person_id' and 'calendar_year' are kept);
        y is the 'target_<target>' column. Both keep the row labels of the
        stacked dataset.

    Raises
    ------
    ValueError : no feature wave has an outcome wave within ``max_wave``.
    KeyError : the target's columns were not produced for any wave.
    """
    # Normalise naming on a shallow copy so the caller's frame keeps its own
    # column labels (assigning to df.columns directly would rename it in place).
    if 'hhidpn' in df.columns:
        df = df.copy(deep=False)
        df.columns = df.columns.str.upper()

    per_wave = []
    for wave in feature_waves:
        outcome_wave = wave + prediction_horizon
        if outcome_wave > max_wave:
            continue

        # 1. Wave features plus first-difference / flag features
        feats = add_temporal_features(extract_wave_features(df, wave))

        # 2. Target and eligibility flags for this specific horizon
        targs = create_targets(df, wave, outcome_wave)

        # 3. Join features and targets on the respondent ID
        per_wave.append(feats.merge(targs, on='person_id', how='inner'))

    if not per_wave:
        raise ValueError(
            f"No usable feature waves: every wave in {list(feature_waves)} plus "
            f"horizon {prediction_horizon} exceeds max_wave={max_wave}."
        )

    # Combine all selected waves into one master dataframe
    dataset = pd.concat(per_wave, ignore_index=True)

    # --- FILTERING FOR THE SPECIFIC TARGET ---
    eligible_col = f'eligible_{target}'
    target_col = f'target_{target}'

    missing = [c for c in (eligible_col, target_col) if c not in dataset.columns]
    if missing:
        raise KeyError(
            f"Columns {missing} not found for target {target!r}; "
            "check the target name and that its disease columns exist."
        )

    # Keep respondents who did NOT have the disease at the feature wave and
    # whose outcome is observed (not NaN).
    keep = (dataset[eligible_col] == 1) & dataset[target_col].notna()
    dataset = dataset.loc[keep]

    # --- FEATURE SELECTION ---
    # Drop every target/eligible flag to prevent leakage, plus the wave index.
    # 'person_id' and 'calendar_year' are deliberately kept in X.
    exclude_prefixes = ('target_', 'eligible_')
    metadata_cols = ['wave']

    feature_cols = [
        c for c in dataset.columns
        if not c.startswith(exclude_prefixes) and c not in metadata_cols
    ]

    X = dataset[feature_cols].copy()
    y = dataset[target_col].copy()

    return X, y

In [27]:
# Export one modeling dataset per disease, as both CSV and Parquet.
feature_waves = [5, 6, 7, 8, 9, 10, 11, 12]
prediction_horizon = 2  # 4 years

# Used only for file and column naming (two years per wave step).
years = prediction_horizon * 2
save_dir = "data/processed"
os.makedirs(save_dir, exist_ok=True)

for disease in disease_map:

    print(f"\nCreating dataset for {disease}...")

    X, y = create_dataset(
        df=df,
        feature_waves=feature_waves,
        prediction_horizon=prediction_horizon,
        target=disease,
    )

    # Guard: collapse to a Series if a one-column frame came back.
    if isinstance(y, pd.DataFrame):
        y = y.iloc[:, 0]

    target_name = f"incident_{disease}_{years}yr"

    # Re-number both parts from zero so they line up row for row, with the
    # label as the last column.
    features_part = X.reset_index(drop=True)
    label_part = y.reset_index(drop=True).rename(target_name)
    dataset = pd.concat([features_part, label_part], axis=1)

    # Output locations
    csv_path = os.path.join(save_dir, f"randhrs_{disease}_{years}yr.csv")
    parquet_path = os.path.join(save_dir, f"randhrs_{disease}_{years}yr.parquet")

    dataset.to_csv(csv_path, index=False)
    dataset.to_parquet(parquet_path, index=False)

    print(
        f"{disease}: shape = {dataset.shape}, "
        f"positives = {dataset[target_name].sum()}, "
        f"saved to {csv_path} & {parquet_path}"
    )



Creating dataset for diabetes...


diabetes: shape = (99591, 122), positives = 6493.0, saved to data/processed/randhrs_diabetes_4yr.csv & data/processed/randhrs_diabetes_4yr.parquet

Creating dataset for cvd...
cvd: shape = (97871, 122), positives = 8020.0, saved to data/processed/randhrs_cvd_4yr.csv & data/processed/randhrs_cvd_4yr.parquet

Creating dataset for stroke...
stroke: shape = (113886, 122), positives = 3976.0, saved to data/processed/randhrs_stroke_4yr.csv & data/processed/randhrs_stroke_4yr.parquet

Creating dataset for lung...
lung: shape = (113082, 122), positives = 3811.0, saved to data/processed/randhrs_lung_4yr.csv & data/processed/randhrs_lung_4yr.parquet

Creating dataset for cancer...
cancer: shape = (107492, 122), positives = 5032.0, saved to data/processed/randhrs_cancer_4yr.csv & data/processed/randhrs_cancer_4yr.parquet

Creating dataset for hibp...
hibp: shape = (58085, 122), positives = 10302.0, saved to data/processed/randhrs_hibp_4yr.csv & data/processed/randhrs_hibp_4yr.parquet

Creating da