
# NHANES 1999–2023 Covariates Builder (Revised)

This notebook builds the following files into your `CONFIG.out_dir`:

- `cov_smk_1999_2023.parquet`
- `cov_alc_1999_2023.parquet`
- `cov_pa_1999_2023.parquet`
- `cov_bmx_1999_2023.parquet`
- `cov_clinical_1999_2023.parquet`
- `cov_household_1999_2023.parquet`
- `cov_core_1999_2023.parquet`

**Key fixes vs. your previous version**  
- The PA builder writes only `SEQN, LTPA, METSCORE, IMP` → prevents `_X/_Y` duplicates in CORE.  
- The CORE merge explicitly restricts columns from each piece, ensuring clean output.  
- `METSCORE` now flows correctly into CORE.


In [1]:

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional
import numpy as np, pandas as pd

# Widen the pandas display limits so wide frames are not truncated in cell output.
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 200)

# --- Base project folder (adjust if needed) ---
# NOTE(review): this is an absolute, machine-specific path and the Config
# defaults are all built from it, so it has to be edited before the notebook
# can run on another machine.
BASE = Path("/Users/dengshuyue/Desktop/SDOH/analysis")

@dataclass
class Config:
    """Paths and output file names shared by the covariate builders.

    Directory defaults are derived from the module-level ``BASE`` folder.
    The ``Optional[Path]`` fields default to ``None``; builders that read them
    fall back to looking for conventionally named files under ``interim_dir``.
    """
    # Directories: raw inputs, interim stacks, final covariate outputs.
    raw_dir: Path = BASE / "data"
    interim_dir: Path = BASE / "data" / "cov"
    out_dir: Path = BASE / "output"

    # Preferred inputs (set demo_9923 to your stack)
    # NOTE(review): the default file name is "demo9923.parquet" (no underscore),
    # while the fallback lookups in other cells search for "demo_9923.parquet";
    # confirm which spelling exists on disk.
    demo_9923: Path = BASE / "data" / "cov" / "demo9923.parquet"
    demo_9918: Optional[Path] = None

    # Optional preferred sources (leave None if you want auto-discovery)
    bmx_9923: Optional[Path] = None
    smk_9918: Optional[Path] = None
    smk_1923: Optional[Path] = None
    pa_9918_imputed: Optional[Path] = None
    pa_1923: Optional[Path] = None
    clinical_9918: Optional[Path] = None
    clinical_1923: Optional[Path] = None

    # Output file names (fixed 1999–2023 names), written under out_dir
    cov_smk: str       = "cov_smk_1999_2023.parquet"
    cov_alc: str       = "cov_alc_1999_2023.parquet"
    cov_pa: str        = "cov_pa_1999_2023.parquet"
    cov_bmx: str       = "cov_bmx_1999_2023.parquet"
    cov_clinical: str  = "cov_clinical_1999_2023.parquet"
    cov_household: str = "cov_household_1999_2023.parquet"
    cov_core: str      = "cov_core_1999_2023.parquet"

# Module-wide default configuration; used as the default `cfg` argument of the builders.
CONFIG = Config()

# --- helpers ---
# Numeric codes that nhanes_na() replaces with NaN.
# NOTE(review): presumably the NHANES "refused" / "don't know" sentinels. The set
# also contains small values (7, 9), so it should only be applied to variables
# where those cannot be genuine answers — confirm per variable.
NHANES_MISS = {7, 9, 77, 99, 777, 999, 7777, 9999, 77777, 99999}

def log(msg: str) -> None:
    """Print `msg` on its own line and flush stdout right away."""
    print(msg, end="\n", flush=True)

def ensure_dir(p: Path | str) -> None:
    Path(p).mkdir(parents=True, exist_ok=True)

def upper_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `df` whose column labels are upper-cased.

    Labels are passed through ``str`` before upper-casing, so non-string labels
    (for example integer labels from a headerless file) become their upper-cased
    string form instead of raising ``AttributeError``. The input frame is left
    untouched.
    """
    d = df.copy()
    d.columns = [str(c).upper() for c in d.columns]
    return d

def nhanes_na(s: pd.Series) -> pd.Series:
    """Coerce `s` to numeric and blank out every value listed in ``NHANES_MISS``.

    Values that cannot be parsed as numbers become NaN as well.
    """
    numeric = pd.to_numeric(s, errors="coerce")
    is_sentinel = numeric.isin(NHANES_MISS)
    return numeric.mask(is_sentinel)

def read_any(p: Path) -> pd.DataFrame:
    """Load a table from `p`: Parquet when the suffix is ``.parquet`` (any case), else CSV."""
    path = Path(p)
    if path.suffix.lower() == ".parquet":
        return pd.read_parquet(path)
    return pd.read_csv(path, low_memory=False)

def pick_first_existing(*cands: Optional[Path]) -> Optional[Path]:
    """Return the first candidate that is truthy and exists on disk, as a ``Path``.

    Falsy candidates (e.g. ``None``) are skipped; ``None`` is returned when no
    candidate exists.
    """
    existing = (Path(c) for c in cands if c and Path(c).exists())
    return next(existing, None)

def norm_seqn(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `df` with SEQN coerced to the nullable ``Int64`` dtype.

    Unparseable SEQN values become ``<NA>``; the input frame is not modified.
    """
    seqn = pd.to_numeric(df["SEQN"], errors="coerce").astype("Int64")
    return df.assign(SEQN=seqn)


## Smoking (SMK)

In [2]:

# Ordered smoking-status levels (NEVER < FORMER < CURRENT); build_smk applies
# these categories to the SMK_STATUS column before writing.
SMK_STATUS_CATS = pd.CategoricalDtype(["NEVER", "FORMER", "CURRENT"], ordered=True)

def build_smk(cfg: Config = CONFIG) -> pd.DataFrame:
    """Build the smoking covariate table and write it to ``cfg.out_dir / cfg.cov_smk``.

    Source lookup (first hit wins):
      1. ``smk_9923.parquet`` / ``smk_9923.csv`` under ``cfg.interim_dir``;
      2. otherwise ``cfg.smk_9918`` or ``smk_9918.(parquet|csv)``, with
         ``cfg.smk_1923`` or ``smk_1923.(parquet|csv)`` appended when present.

    When the source already carries SEQN, SMK_STATUS, CIGS_PER_DAY, PACK_YEARS
    and FORMER_SMOKER, those columns are used directly. Otherwise they are
    derived from whichever of SMK / SMQ020 / SMQ040, SMK_AVG / SMQ050Q+SMQ050U
    and PACK_YR / SMK_YR / SMD030 are present.

    Optional input columns that are absent are treated as all-missing. The
    previous ``pd.to_numeric(d.get(col))`` pattern collapsed to a scalar NaN
    for an absent column, so the following ``.map`` / ``.isna()`` call raised
    ``AttributeError`` before any fallback could run.

    Returns
    -------
    pd.DataFrame
        One row per SEQN with SEQN, SMK_STATUS (ordered categorical
        NEVER < FORMER < CURRENT), CIGS_PER_DAY, PACK_YEARS and
        FORMER_SMOKER (int8).

    Raises
    ------
    FileNotFoundError
        If neither a 1999–2023 nor a 1999–2018 smoking stack can be found.
    """
    ensure_dir(cfg.out_dir)

    # Fixed order so the written column layout does not depend on set iteration.
    std_cols = ["SEQN", "SMK_STATUS", "CIGS_PER_DAY", "PACK_YEARS", "FORMER_SMOKER"]

    def _num(frame: pd.DataFrame, col: str) -> pd.Series:
        """Numeric view of `col`, or an all-NaN Series when the column is absent."""
        if col in frame.columns:
            return pd.to_numeric(frame[col], errors="coerce")
        return pd.Series(np.nan, index=frame.index, dtype="float64")

    # 1) Locate sources (prefer combined 99–23; else 99–18 (+ optional 19–23)).
    #    The shared read_any helper matches the suffix case-insensitively.
    smk_9923 = pick_first_existing(
        cfg.interim_dir / "smk_9923.parquet",
        cfg.interim_dir / "smk_9923.csv",
    )
    if smk_9923:
        smk = upper_df(read_any(smk_9923))
        src_msg = f"using {smk_9923.name}"
    else:
        p9918 = pick_first_existing(cfg.smk_9918, cfg.interim_dir / "smk_9918.parquet", cfg.interim_dir / "smk_9918.csv")
        p1923 = pick_first_existing(cfg.smk_1923, cfg.interim_dir / "smk_1923.parquet", cfg.interim_dir / "smk_1923.csv")
        if p9918 is None:
            raise FileNotFoundError("Provide smk_9923 or smk_9918 (csv/parquet) under interim.")
        smk = upper_df(read_any(p9918))
        if p1923:
            smk = pd.concat([smk, upper_df(read_any(p1923))], ignore_index=True)
            src_msg = f"using {Path(p9918).name} + {Path(p1923).name}"
        else:
            src_msg = f"using {Path(p9918).name}"

    # 2) If standardized columns already exist, just use them
    have_std = set(std_cols).issubset(smk.columns)

    if have_std:
        smk_std = smk[std_cols].copy()
        smk_std["SMK_STATUS"] = smk_std["SMK_STATUS"].astype("string").str.upper()
        smk_std["FORMER_SMOKER"] = pd.to_numeric(smk_std["FORMER_SMOKER"], errors="coerce").fillna(0).astype("int8")
    else:
        # Drop index columns that CSV round-trips tend to leave behind.
        d = smk.drop(columns=[c for c in ("UNNAMED: 0", "INDEX") if c in smk.columns])
        out = pd.DataFrame({"SEQN": d["SEQN"]})

        # --- SMK_STATUS ---
        if "SMK_STATUS" in d.columns:
            smk_status = d["SMK_STATUS"].astype("string").str.strip().str.upper()
        else:
            # Pre-coded status: 1 = NEVER, 2 = FORMER, 3 = CURRENT.
            smk_status = _num(d, "SMK").map({1: "NEVER", 2: "FORMER", 3: "CURRENT"}).astype("string")
            if smk_status.isna().all() and (("SMQ020" in d.columns) or ("SMQ040" in d.columns)):
                ever = _num(d, "SMQ020")
                smq040 = _num(d, "SMQ040")
                # NOTE(review): everyone starts as NEVER, so rows with a missing
                # or refused SMQ020 are also labelled NEVER — confirm this is intended.
                smk_status = pd.Series("NEVER", index=d.index, dtype="string")
                smk_status = smk_status.mask(ever == 1, "FORMER")
                smk_status = smk_status.mask(smq040.isin([1, 2]), "CURRENT")
                smk_status = smk_status.mask(smq040 == 3, "FORMER")

        out["SMK_STATUS"] = smk_status.astype("string").str.upper()

        # --- CIGS_PER_DAY ---
        cigs = _num(d, "SMK_AVG")
        if cigs.isna().all():
            # NOTE(review): in the NHANES SMQ codebook SMQ050Q/SMQ050U appear to be
            # "time since quit" rather than a cigarette count — confirm that this
            # fallback really yields cigarettes per day.
            q = _num(d, "SMQ050Q")
            u = _num(d, "SMQ050U")
            cigs = q.where(u == 1).fillna((q / 7.0).where(u == 2)).fillna((q / 30.0).where(u == 3))
        out["CIGS_PER_DAY"] = cigs

        # --- PACK_YEARS ---
        pack_years = _num(d, "PACK_YR")
        if pack_years.isna().all():
            years = _num(d, "SMK_YR")
            if years.isna().all():
                # NOTE(review): SMD030 looks like "age started smoking" in the
                # codebook, not a duration — confirm before relying on this path.
                years = _num(d, "SMD030")
            # 20 cigarettes per pack.
            pack_years = (out["CIGS_PER_DAY"] / 20.0) * years
        out["PACK_YEARS"] = pack_years

        # --- FORMER_SMOKER ---
        out["FORMER_SMOKER"] = out["SMK_STATUS"].eq("FORMER").fillna(False).astype("int8")

        # Negative quantities are not meaningful; treat them as missing.
        out.loc[out["CIGS_PER_DAY"] < 0, "CIGS_PER_DAY"] = np.nan
        out.loc[out["PACK_YEARS"] < 0, "PACK_YEARS"] = np.nan

        smk_std = out

    # One row per SEQN (first occurrence wins), then the ordered categorical.
    smk_std = smk_std.drop_duplicates("SEQN").copy()
    smk_std["SMK_STATUS"] = smk_std["SMK_STATUS"].astype("category").cat.set_categories(SMK_STATUS_CATS.categories, ordered=True)
    outp = cfg.out_dir / cfg.cov_smk
    smk_std.to_parquet(outp, index=False)

    miss_rate = smk_std[["SMK_STATUS", "CIGS_PER_DAY", "PACK_YEARS"]].isna().mean().round(3).to_dict()
    log(f"✓ SMK → {outp} ({src_msg}); missing: {miss_rate}")
    return smk_std


In [3]:

# ---------- Alcohol builder with optional CDC fetch (revised) ----------
import pandas as pd, numpy as np
from pathlib import Path

def _read_any(p: Path) -> pd.DataFrame:
    p = Path(p)
    if p.suffix.lower() == ".parquet":
        return pd.read_parquet(p)
    return pd.read_csv(p, low_memory=False)

def _clean_num(s: pd.Series) -> pd.Series:
    NH_MISS = {7, 9, 77, 99, 777, 999, 7777, 9999, 77777, 99999}
    s = pd.to_numeric(s, errors="coerce")
    return s.mask(s.isin(NH_MISS))

def _download(url: str, dest: Path, timeout=90):
    """Stream `url` into `dest` and return `dest`.

    The payload is first written to a sibling ``<dest>.downloading`` file and
    only moved into place once the transfer has finished, so `dest` never holds
    a partial download.

    Changes from the previous version:
      * the temporary file is removed when the request or the write fails
        (it used to be left behind on any error);
      * ``Path.replace`` is used for the final move, which overwrites an
        existing `dest` on every platform (``rename`` fails on Windows when
        the target already exists).

    Parameters
    ----------
    url : str
        Address to fetch.
    dest : Path
        Final location; its parent directory is created if needed.
    timeout : int or float, default 90
        Passed through to ``requests.get``.

    Raises
    ------
    requests.HTTPError
        From ``raise_for_status`` on a non-success response; other ``requests``
        or OS errors propagate unchanged.
    """
    import requests  # local import: only needed when fetching from the web

    dest = Path(dest)
    dest.parent.mkdir(parents=True, exist_ok=True)
    headers = {"User-Agent": "nhanes-fetch/1.0"}
    tmp = dest.with_suffix(dest.suffix + ".downloading")
    try:
        with requests.get(url, headers=headers, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(1 << 15):  # 32 KiB chunks
                    if chunk:
                        f.write(chunk)
        tmp.replace(dest)
    finally:
        # After a successful move the temp file is gone; on failure clean it up.
        if tmp.exists():
            tmp.unlink()
    return dest

def _read_xpt(p: Path) -> pd.DataFrame:
    """Read a SAS transport (XPT) file and upper-case its column names.

    ``pyreadstat.read_xport`` is tried first; if importing or calling it raises
    any exception, ``pandas.read_sas(..., format="xport")`` is used instead.
    """
    try:
        import pyreadstat
        frame, _meta = pyreadstat.read_xport(p)
    except Exception:
        # Best-effort fallback when pyreadstat is missing or cannot parse the file.
        frame = pd.read_sas(p, format="xport")
    frame.columns = [name.upper() for name in frame.columns]
    return frame

def _ensure_demo_sex(cfg) -> pd.DataFrame:
    """Return a SEQN / RIAGENDR table with one row per SEQN.

    Local files are tried first, in this order: ``cfg.demo_9923``,
    ``cfg.demo_9918``, ``demo_9923.parquet`` and ``demo_9918.parquet`` under
    ``cfg.interim_dir``, and finally the ``demo_riagendr_min.parquet`` cache
    that this function writes. The first existing file that contains both SEQN
    and RIAGENDR is used.

    If no local file qualifies, the per-cycle DEMO XPT files are downloaded
    (or reused when already on disk under ``cfg.interim_dir``), stacked,
    de-duplicated on SEQN and written to the cache.

    Changes from the previous version:
      * the cache file is now consulted, so the XPT files are not re-read on
        every call;
      * every existing local candidate is inspected (previously only the first
        existing file was, even when it lacked RIAGENDR);
      * SEQN is required alongside RIAGENDR before a local file is accepted.

    Raises
    ------
    RuntimeError
        If no DEMO file with SEQN and RIAGENDR could be obtained.
    """
    store = Path(cfg.interim_dir)
    cache = store / "demo_riagendr_min.parquet"

    for cand in [
        getattr(cfg, "demo_9923", None),
        getattr(cfg, "demo_9918", None),
        store / "demo_9923.parquet",
        store / "demo_9918.parquet",
        cache,
    ]:
        if not cand or not Path(cand).exists():
            continue
        demo = upper_df(_read_any(Path(cand)))
        if {"SEQN", "RIAGENDR"}.issubset(demo.columns):
            return demo[["SEQN", "RIAGENDR"]].drop_duplicates("SEQN")

    # Two mirror URLs per cycle; the second is tried only if the first fails.
    DEMO_URLS = {
        "1999-2000": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/1999/DataFiles/DEMO.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/1999-2000/DEMO.XPT",
        ],
        "2001-2002": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2001/DataFiles/DEMO_B.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2001-2002/DEMO_B.XPT",
        ],
        "2003-2004": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2003/DataFiles/DEMO_C.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2003-2004/DEMO_C.XPT",
        ],
        "2005-2006": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2005/DataFiles/DEMO_D.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2005-2006/DEMO_D.XPT",
        ],
        "2007-2008": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2007/DataFiles/DEMO_E.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2007-2008/DEMO_E.XPT",
        ],
        "2009-2010": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2009/DataFiles/DEMO_F.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2009-2010/DEMO_F.XPT",
        ],
        "2011-2012": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2011/DataFiles/DEMO_G.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2011-2012/DEMO_G.XPT",
        ],
        "2013-2014": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2013/DataFiles/DEMO_H.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2013-2014/DEMO_H.XPT",
        ],
        "2015-2016": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2015/DataFiles/DEMO_I.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2015-2016/DEMO_I.XPT",
        ],
        "2017-2018": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2017/DataFiles/DEMO_J.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2017-2018/DEMO_J.XPT",
        ],
        "2017-March 2020 (pre-pandemic)": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2017/DataFiles/P_DEMO.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2017-2020/P_DEMO.XPT",
        ],
        "August 2021–August 2023": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2021/DataFiles/DEMO_L.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2021-2023/DEMO_L.XPT",
        ],
    }
    parts = []
    for cyc, urls in DEMO_URLS.items():
        got = None
        for u in urls:
            try:
                dst = store / Path(u).name
                if not dst.exists():
                    print(f"⬇️ DEMO {cyc} → {dst.name}")
                    _download(u, dst)
                got = dst; break
            except Exception as e:
                # Best effort: report and move on to the mirror URL.
                print("  ⚠️", e)
        if got is None:
            continue
        df = _read_xpt(got)
        if {"SEQN", "RIAGENDR"}.issubset(df.columns):
            parts.append(df[["SEQN", "RIAGENDR"]])
    if not parts:
        raise RuntimeError("Could not build minimal DEMO (RIAGENDR).")

    # Cycle files can share respondents; keep the first occurrence of each SEQN.
    demo = pd.concat(parts, ignore_index=True).drop_duplicates("SEQN")
    store.mkdir(parents=True, exist_ok=True)
    demo.to_parquet(cache, index=False)
    return demo

def _download_and_stack_alq(cfg) -> pd.DataFrame:
    """Download the per-cycle ALQ XPT files, stack them and cache the result.

    Files are stored under ``cfg.interim_dir / "alcohol"`` and reused when
    already present. From each file only SEQN, a CYCLE label and whichever of
    ALQ110, ALQ151, ALQ120Q, ALQ120U, ALQ130 exist are kept. The stacked frame
    is written to ``cfg.interim_dir / "alq_9923.parquet"`` and returned.

    Changes from the previous version:
      * files without a SEQN column are skipped instead of contributing
        unidentifiable rows;
      * the stack is de-duplicated on SEQN (first occurrence wins), mirroring
        the DEMO fetcher, so overlapping cycle files cannot produce two rows
        for one respondent.

    NOTE(review): cycles whose ALQ file lacks ALQ120Q / ALQ120U / ALQ130
    contribute rows with no drinking-frequency information; confirm whether
    newer-cycle variables need to be added to the kept columns.

    Raises
    ------
    RuntimeError
        If no ALQ file could be obtained.
    """
    ALC_STORE = Path(cfg.interim_dir) / "alcohol"
    ALC_STORE.mkdir(parents=True, exist_ok=True)
    # Mirror URLs per cycle; later entries are tried only if earlier ones fail.
    ALQ_URLS = {
        "1999-2000": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/1999/DataFiles/ALQ.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/1999-2000/ALQ.XPT",
        ],
        "2001-2002": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2001/DataFiles/ALQ_B.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2001-2002/ALQ_B.XPT",
        ],
        "2003-2004": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2003/DataFiles/ALQ_C.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2003-2004/ALQ_C.XPT",
        ],
        "2005-2006": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2005/DataFiles/ALQ_D.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2005-2006/ALQ_D.XPT",
        ],
        "2007-2008": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2007/DataFiles/ALQ_E.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2007-2008/ALQ_E.XPT",
        ],
        "2009-2010": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2009/DataFiles/ALQ_F.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2009-2010/ALQ_F.XPT",
        ],
        "2011-2012": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2011/DataFiles/ALQ_G.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2011-2012/ALQ_G.XPT",
        ],
        "2013-2014": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2013/DataFiles/ALQ_H.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2013-2014/ALQ_H.XPT",
        ],
        "2015-2016": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2015/DataFiles/ALQ_I.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2015-2016/ALQ_I.XPT",
        ],
        "2017-2018": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2017/DataFiles/ALQ_J.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2017-2018/ALQ_J.XPT",
        ],
        "2017-March 2020 (pre-pandemic)": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2017/DataFiles/P_ALQ.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2017-2020/P_ALQ.XPT",
        ],
        "August 2021–August 2023": [
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2021/DataFiles/ALQ_L.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2021-2023/ALQ_L.XPT",
            "https://wwwn.cdc.gov/Nchs/Data/Nhanes/Public/2021/DataFiles/ALQ_Q.xpt",
            "https://wwwn.cdc.gov/Nchs/Nhanes/2021-2023/ALQ_Q.XPT",
        ],
    }
    wanted = ["SEQN", "CYCLE", "ALQ110", "ALQ151", "ALQ120Q", "ALQ120U", "ALQ130"]
    parts = []
    for cycle, urls in ALQ_URLS.items():
        got = None
        for u in urls:
            try:
                dst = ALC_STORE / Path(u).name
                if not dst.exists():
                    print(f"⬇️ ALQ {cycle} → {dst.name}")
                    _download(u, dst)
                got = dst; break
            except Exception as e:
                # Best effort: report and move on to the next mirror.
                print("  ⚠️", e)
        if got is None:
            continue
        df = _read_xpt(got)
        if "SEQN" not in df.columns:
            # Rows without a respondent id cannot be merged downstream.
            continue
        df["CYCLE"] = cycle
        keep = [c for c in wanted if c in df.columns]
        parts.append(df[keep])
    if not parts:
        raise RuntimeError("No ALQ data available (download failed).")

    # Overlapping cycle files may repeat a respondent; keep the first occurrence.
    alq = pd.concat(parts, ignore_index=True).drop_duplicates("SEQN").reset_index(drop=True)
    stacked = Path(cfg.interim_dir) / "alq_9923.parquet"
    stacked.parent.mkdir(parents=True, exist_ok=True)
    alq.to_parquet(stacked, index=False)
    return alq

def _drinks_per_day_from_alq(alq: pd.DataFrame) -> pd.Series:
    d = upper_df(alq)
    count = _clean_num(d.get("ALQ120Q", pd.Series(np.nan, index=d.index)))
    unit  = d.get("ALQ120U", pd.Series(np.nan, index=d.index))
    per_year = pd.Series(np.nan, index=d.index, dtype="float")
    per_year = per_year.where(~(unit == 1), 365.0)
    per_year = per_year.where(~(unit == 2), 52.142)
    per_year = per_year.where(~(unit == 3), 12.0)
    per_year = per_year.where(~(unit == 4), 1.0)
    occasions_per_year = count * per_year
    drinks_per_occasion = _clean_num(d.get("ALQ130", pd.Series(np.nan, index=d.index)))
    dpd = (occasions_per_year * drinks_per_occasion) / 365.0
    return dpd.where(occasions_per_year.notna() & drinks_per_occasion.notna())

def _categorize_alcohol(dpd: pd.Series, sex: pd.Series, lifetime_lt12: pd.Series | None) -> pd.Series:
    dpd  = pd.to_numeric(dpd, errors="coerce").reset_index(drop=True)
    sexM = pd.to_numeric(sex, errors="coerce").map({1: "M", 2: "F"}).astype("string").reset_index(drop=True)
    if lifetime_lt12 is None:
        life = pd.Series(pd.NA, index=dpd.index)
    else:
        life = pd.to_numeric(lifetime_lt12, errors="coerce").reset_index(drop=True)
    none_mask     = dpd.isna() | (dpd < 0.03) | (life == 1)
    heavy_mask    = ((sexM == "F") & (dpd >= 1.0)) | ((sexM == "M") & (dpd >= 2.0))
    moderate_mask = (~none_mask) & (~heavy_mask)
    cat = pd.Series("NONE", index=dpd.index, dtype="string")
    cat.loc[moderate_mask] = "MODERATE"
    cat.loc[heavy_mask]    = "HEAVY"
    return pd.Categorical(cat, categories=["NONE", "MODERATE", "HEAVY"], ordered=True)

def build_alc(cfg: "Config" = CONFIG, allow_fetch: bool = True, overwrite: bool = False) -> pd.DataFrame:
    """Build the alcohol covariate table and write it to ``cfg.out_dir / cfg.cov_alc``.

    Parameters
    ----------
    cfg : Config
        Supplies ``interim_dir``, ``out_dir`` and ``cov_alc``.
    allow_fetch : bool, default True
        When no local ``alq_9923.parquet`` / ``alq_9918.parquet`` exists under
        ``cfg.interim_dir``, download the ALQ files; if False, raise instead.
    overwrite : bool, default False
        New, backward-compatible flag. With the default, an existing output
        file is returned as-is (the previous behaviour); pass True to rebuild.

    Changes from the previous version:
      * the ALQ stack is restricted to rows with a SEQN and de-duplicated on
        SEQN, so the output has one row per respondent like the other builders;
      * SEQN is normalised to ``Int64`` and the sex lookup is keyed on integer
        SEQN so float- and int-typed inputs match reliably;
      * a stack without SEQN raises a clear ``KeyError``.

    Returns
    -------
    pd.DataFrame
        Columns SEQN, DRINKS_PER_DAY, ALCOHOL_CAT (ordered categorical).

    Raises
    ------
    FileNotFoundError
        No local ALQ stack and ``allow_fetch`` is False.
    KeyError
        The ALQ stack has no SEQN column.
    """
    ensure_dir(cfg.out_dir)
    out_path = Path(cfg.out_dir) / cfg.cov_alc
    if out_path.exists() and not overwrite:
        return pd.read_parquet(out_path)

    alq_path = pick_first_existing(
        Path(cfg.interim_dir) / "alq_9923.parquet",
        Path(cfg.interim_dir) / "alq_9918.parquet",
    )
    if alq_path:
        alq = _read_any(alq_path)
    elif allow_fetch:
        alq = _download_and_stack_alq(cfg)
    else:
        raise FileNotFoundError("ALQ stack not found (alq_9923 / alq_9918). Set allow_fetch=True to download from CDC.")

    alq = upper_df(alq)
    if "SEQN" not in alq.columns:
        raise KeyError("ALQ stack lacks SEQN.")
    alq["SEQN"] = pd.to_numeric(alq["SEQN"], errors="coerce").astype("Int64")
    # One row per respondent; the reset keeps later positional alignment valid.
    alq = alq.dropna(subset=["SEQN"]).drop_duplicates("SEQN").reset_index(drop=True)

    dpd = _drinks_per_day_from_alq(alq)

    # Lifetime flag: 1 where the item was answered with code 2.
    # NOTE(review): ALQ151 is only used when ALQ110 is absent; confirm that
    # ALQ151 == 2 really identifies lifetime abstainers in the cycles where
    # it appears — the two items may not ask the same question.
    life = None
    if "ALQ110" in alq.columns:
        life = (pd.to_numeric(alq["ALQ110"], errors="coerce") == 2).astype("Int8")
    elif "ALQ151" in alq.columns:
        life = (pd.to_numeric(alq["ALQ151"], errors="coerce") == 2).astype("Int8")

    demo = upper_df(_ensure_demo_sex(cfg))
    demo["SEQN"] = pd.to_numeric(demo["SEQN"], errors="coerce")
    demo = demo.dropna(subset=["SEQN"]).drop_duplicates("SEQN")
    sex_by_seqn = demo.set_index(demo["SEQN"].astype("int64"))["RIAGENDR"]
    # Row order follows `alq`; _categorize_alcohol aligns its inputs by position.
    sex = sex_by_seqn.reindex(alq["SEQN"].astype("int64"))

    alc_cat = _categorize_alcohol(dpd, sex=sex, lifetime_lt12=life)

    out = pd.DataFrame({
        "SEQN": alq["SEQN"].reset_index(drop=True),
        "DRINKS_PER_DAY": pd.to_numeric(dpd, errors="coerce").reset_index(drop=True),
        "ALCOHOL_CAT": alc_cat,
    })
    out.to_parquet(out_path, index=False)
    log(f"✓ ALC → {out_path}")
    return out
# ---------- end alcohol builder ----------


## Physical Activity 99-23

In [4]:
def derive_ltpa_from_paq(paq: pd.DataFrame) -> pd.DataFrame:
    """Derive weekly leisure-time physical activity from PAQ questionnaire columns.

    Output columns:
      SEQN     – copied from the input
      LTPA     – MET-hours/week
      METSCORE – MET-minutes/week (rounded)
      IMP      – 1 where no usable answer exists and the respondent did not
                 report "none" (i.e. the value would need imputing), else 0

    Handles:
      • 2007–2018 & P files: PAQ650/665 (yes/no), PAQ655/670 (days), PAD660/675 (minutes)
      • 2021–2023 (PAQ_L): PAD790Q/U + PAD800 (moderate), PAD810Q/U + PAD820 (vigorous)

    Moderate activity is weighted 4 METs and vigorous activity 8 METs.

    Fixes relative to the previous version:
      * Frequency units for the new schema are converted to times per week
        correctly: month → ×12/52 and year → ×1/52. They were inverted
        (×52/12 and ×52), which inflated monthly and yearly reports.
      * A column that is absent is treated as all-missing; previously
        ``pd.to_numeric(paq.get(col, np.nan))`` returned a scalar and the
        following ``.mask`` call raised ``AttributeError``.
      * Unit codes stored as bytes (as some XPT readers return them) are
        decoded before matching.

    Raises
    ------
    KeyError
        If `paq` has no SEQN column.
    """
    paq = paq.copy()
    paq.columns = [str(c).upper() for c in paq.columns]
    out = pd.DataFrame({"SEQN": paq["SEQN"]})

    def num(col):
        """Numeric column, or an all-NaN Series when the column is absent."""
        if col not in paq.columns:
            return pd.Series(np.nan, index=paq.index, dtype="float64")
        return pd.to_numeric(paq[col], errors="coerce")

    def clean_days(s):
        # 77 / 99 are the refused / don't-know codes of the day-count items.
        return s.mask(s.isin([77, 99]))

    def clean_min(s):
        # 7777 / 9999 are the refused / don't-know codes of the minute / frequency items.
        return s.mask(s.isin([7777, 9999]))

    cols = set(paq.columns)

    # ===== New schema (PAQ_L 2021–2023) =====
    if {"PAD790Q", "PAD790U", "PAD800", "PAD810Q", "PAD810U", "PAD820"}.issubset(cols):
        # Reported "times per <unit>" → times per week.
        to_per_week = {"D": 7.0, "W": 1.0, "M": 12.0 / 52.0, "Y": 1.0 / 52.0}

        def per_week(q_col, u_col):
            q = clean_min(num(q_col))
            raw_unit = paq[u_col].map(
                lambda v: v.decode("utf-8", "ignore") if isinstance(v, (bytes, bytearray)) else v
            )
            u = raw_unit.astype(str).str.strip().str.upper()
            return q * u.map(to_per_week)

        mod_week = per_week("PAD790Q", "PAD790U")
        vig_week = per_week("PAD810Q", "PAD810U")
        mod_min_occ = clean_min(num("PAD800"))
        vig_min_occ = clean_min(num("PAD820"))

        mod_met_min = 4.0 * mod_week.fillna(0) * mod_min_occ.fillna(0)
        vig_met_min = 8.0 * vig_week.fillna(0) * vig_min_occ.fillna(0)
        total_met_min = mod_met_min + vig_met_min

        # An intensity counts only when both frequency and duration are known.
        has_mod = mod_week.notna() & mod_min_occ.notna()
        has_vig = vig_week.notna() & vig_min_occ.notna()
        has_any = has_mod | has_vig

        # Explicit zero frequency for both intensities is a true zero, not missing.
        none_reported = (num("PAD790Q") == 0) & (num("PAD810Q") == 0)

    # ===== Old schema (2007–2018 & P files) =====
    else:
        any_vig = num("PAQ650")  # 1 yes / 2 no
        any_mod = num("PAQ665")  # 1 yes / 2 no

        vig_days = clean_days(num("PAQ655"))
        vig_min = clean_min(num("PAD660"))
        mod_days = clean_days(num("PAQ670"))
        mod_min = clean_min(num("PAD675"))

        vig_met_min = 8.0 * vig_days.fillna(0) * vig_min.fillna(0)
        mod_met_min = 4.0 * mod_days.fillna(0) * mod_min.fillna(0)
        total_met_min = vig_met_min + mod_met_min

        has_any = (vig_days.notna() & vig_min.notna()) | (mod_days.notna() & mod_min.notna())
        # "No" to both screener questions is a true zero, not missing.
        none_reported = (any_vig == 2) & (any_mod == 2)

    # Missing unless something usable was reported; explicit "none" becomes 0.
    total_met_min = total_met_min.where(has_any, np.nan).where(~none_reported, 0)

    imp = pd.Series(0, index=paq.index, dtype="int8")
    imp[(~has_any) & (~none_reported)] = 1

    out["LTPA"] = (total_met_min / 60.0).clip(lower=0)       # MET-h/week
    out["METSCORE"] = total_met_min.clip(lower=0).round(0)   # MET-min/week
    out["IMP"] = imp
    return out


In [14]:
from pathlib import Path

def ensure_totalpa_9923(cfg: Config, overwrite: bool = False) -> Path:
    """Make sure ``totalpa_9923_imputed.parquet`` exists under ``cfg.interim_dir``.

    Resolution order when `overwrite` is False:
      1. the Parquet file already exists → return it;
      2. only the CSV exists → convert it to Parquet and return that.
    Otherwise the file is built from ``totalpa_9918_imputed.csv`` by calling
    ``make_9923_from_9918(old_csv, new_csv)`` and the result is written as
    Parquet with SEQN coerced to ``Int64``.

    NOTE(review): ``make_9923_from_9918`` is not defined in the visible part of
    this notebook; it must already be in the kernel namespace (presumably it
    also writes the CSV it is handed — confirm).

    Returns the path of the Parquet file.

    Raises
    ------
    FileNotFoundError
        If a build is required but ``totalpa_9918_imputed.csv`` is missing.
    """
    interim = Path(cfg.interim_dir)
    parquet_path = interim / "totalpa_9923_imputed.parquet"
    csv_path = interim / "totalpa_9923_imputed.csv"
    interim.mkdir(parents=True, exist_ok=True)

    if not overwrite:
        if parquet_path.exists():
            return parquet_path
        if csv_path.exists():
            # Mirror the CSV as Parquet for faster loads.
            pd.read_csv(csv_path).to_parquet(parquet_path, index=False)
            return parquet_path

    # ---- build from the 1999–2018 stack ----
    legacy_csv = interim / "totalpa_9918_imputed.csv"
    if not legacy_csv.exists():
        raise FileNotFoundError(f"Missing {legacy_csv}. Provide your 1999–2018 PA stack first.")

    combined = make_9923_from_9918(str(legacy_csv), str(csv_path))
    if "SEQN" in combined.columns:
        seqn_numeric = pd.to_numeric(combined["SEQN"], errors="coerce")
        combined["SEQN"] = seqn_numeric.astype("Int64")
    combined.to_parquet(parquet_path, index=False)
    print(f"✓ Wrote {parquet_path}")
    return parquet_path


In [20]:
def build_pa(cfg: Config = CONFIG) -> pd.DataFrame:
    """Build PA covariates and write them to ``cfg.out_dir / cfg.cov_pa``.

    The source is the first existing of ``totalpa_9923_imputed.csv``,
    ``totalpa_9923_imputed.parquet`` (both under ``cfg.interim_dir``) and
    ``cfg.pa_9918_imputed``. The CSV is preferred because some Parquet dumps
    are stubs without LTPA/METSCORE.

    Outputs only SEQN, LTPA, METSCORE, IMP, one row per SEQN.

    Changes from the previous version:
      * when only METSCORE is present, LTPA is derived as METSCORE / 60,
        mirroring the existing LTPA → METSCORE (× 60) fallback; LTPA used to
        stay entirely missing in that case;
      * IMP is ``Int8`` whether or not the source has an imputation flag.

    NOTE(review): duplicate SEQNs are collapsed with a per-column ``max``, so
    LTPA, METSCORE and IMP of one output row can originate from different
    source rows — confirm this is the intended rule for repeated SEQNs.

    Raises
    ------
    FileNotFoundError
        No PA source file exists.
    KeyError
        The source has no SEQN column.
    ValueError
        The source has neither an LTPA nor a METSCORE column.
    RuntimeError
        Every LTPA and METSCORE value is missing.
    """
    ensure_dir(cfg.out_dir)

    # --- prefer CSV first; then Parquet; then legacy 9918 if provided
    src = pick_first_existing(
        Path(cfg.interim_dir) / "totalpa_9923_imputed.csv",
        Path(cfg.interim_dir) / "totalpa_9923_imputed.parquet",
        getattr(cfg, "pa_9918_imputed", None),
    )
    if src is None:
        raise FileNotFoundError(
            f"PA source not found under {cfg.interim_dir} "
            "(need totalpa_9923_imputed.csv or .parquet; or set CONFIG.pa_9918_imputed)."
        )

    pa = upper_df(read_any(src))
    if "SEQN" not in pa.columns:
        raise KeyError(f"PA file lacks SEQN: {src}")

    # --- flexible column picking (handles different headers)
    def pick_one(cands):
        """First column of `pa`, in file order, whose name is in `cands`."""
        for c in pa.columns:
            if c.upper() in cands:
                return c
        return None

    c_ltpa = pick_one({"LTPA", "LTPA_MET_HR_WK", "LTPA_MET_HOURS_WEEK"})
    c_mets = pick_one({"METSCORE", "MET_MIN_WEEK", "MET_MIN_WK", "METMINWEEK"})
    c_imp  = pick_one({"IMP", "IMPUTED", "PA_IMPUTED_FLAG", "LTPA_IMPUTED_FLAG", "IMPUTED_FLAG"})

    # If neither metric exists, fail loudly
    if c_ltpa is None and c_mets is None:
        raise ValueError(
            f"PA file {src} has no LTPA/METSCORE columns. "
            f"Columns present include: {list(pa.columns)[:20]} ..."
        )

    ltpa = pd.to_numeric(pa[c_ltpa], errors="coerce") if c_ltpa else None
    mets = pd.to_numeric(pa[c_mets], errors="coerce") if c_mets else None
    # Fill whichever metric is absent from the other (MET-hours ↔ MET-minutes).
    if ltpa is None:
        ltpa = mets / 60.0
    if mets is None:
        mets = ltpa * 60.0

    if c_imp:
        imp = pd.to_numeric(pa[c_imp], errors="coerce").fillna(0).astype("Int8")
    else:
        imp = pd.Series(0, index=pa.index, dtype="Int8")

    out = pd.DataFrame({
        "SEQN": pd.to_numeric(pa["SEQN"], errors="coerce").astype("Int64"),
        "LTPA": ltpa,
        "METSCORE": mets,
        "IMP": imp,
    })

    # one row per SEQN
    out = (out.groupby("SEQN", as_index=False)
              .agg({"LTPA": "max", "METSCORE": "max", "IMP": "max"}))

    # sanity: catch the “all NA” case (usually wrong source file)
    if out[["LTPA", "METSCORE"]].isna().all().all():
        raise RuntimeError(
            f"All PA values are NA from source {src}. "
            "Likely a stub Parquet — use the CSV or rebuild the 99–23 file."
        )

    outp = Path(cfg.out_dir) / cfg.cov_pa
    out.to_parquet(outp, index=False)
    log(f"✓ PA  → {outp} (source: {Path(src).name})")
    return out


In [21]:
# rebuild PA then core
# NOTE: build_core is defined in a later cell of this notebook, so that cell
# must already have been executed; on a fresh kernel run from top to bottom
# this cell raises NameError.
build_pa(CONFIG)
build_core(CONFIG)

# Reload the merged CORE file and report the percentage of missing PA values.
core = pd.read_parquet(CONFIG.out_dir / CONFIG.cov_core)
print((core[["LTPA","METSCORE"]].isna().mean()*100).round(2))  # % NA should not be ~100%


✓ PA  → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_pa_1999_2023.parquet (source: totalpa_9923_imputed.csv)
✓ CORE → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_core_1999_2023.parquet
LTPA        43.46
METSCORE    43.46
dtype: float64


#### Preview the PA output

In [22]:
# quick peek
# Reload the PA covariate file written by build_pa, show its first rows and
# the share of missing values per column.
pa = pd.read_parquet(Path(CONFIG.out_dir) / CONFIG.cov_pa)
display(pa.head(10))
print(pa.isna().mean())

Unnamed: 0,SEQN,LTPA,METSCORE,IMP
0,2,0.0,60.0,1
1,5,41.066667,1920.0,1
2,7,3.033333,0.0,1
3,10,0.0,8160.0,1
4,12,5.6,0.0,1
5,13,0.0,300.0,1
6,14,30.8,10320.0,1
7,15,38.577778,1680.0,1
8,16,48.533333,5040.0,1
9,20,0.466667,0.0,1


SEQN        0.000000
LTPA        0.001358
METSCORE    0.001358
IMP         0.000000
dtype: float64


In [30]:
# check SEQN range
# Uses the `pa` frame loaded in the preview cell: smallest and largest five
# SEQN values, to see which ID range the PA file spans.
print(pa.sort_values("SEQN")[["SEQN"]].head(5))
print(pa.sort_values("SEQN")[["SEQN"]].tail(5))

   SEQN
0     2
1     5
2     7
3    10
4    12
         SEQN
72922  142305
72923  142307
72924  142308
72925  142309
72926  142310


## BMX 

In [23]:

def build_bmx(cfg: Config = CONFIG) -> pd.DataFrame:
    """Load anthropometrics (BMX), fill in BMI where missing, and write the result.

    The source is ``cfg.bmx_9923`` when set and present, otherwise the first
    existing of ``bmx_9923.(parquet|csv)`` / ``bmx_9918.(parquet|csv)`` under
    ``cfg.interim_dir``. BMI is taken from BMXBMI when available and otherwise
    computed as weight (BMXWT) / (height (BMXHT) / 100)².

    Changes from the previous version:
      * the output is de-duplicated on SEQN (first occurrence wins), matching
        the other builders, so merging on SEQN cannot multiply rows;
      * a height of zero or below no longer produces an infinite or nonsensical
        BMI — BMI stays missing for those rows;
      * files are read with the shared ``read_any`` helper, which matches the
        suffix case-insensitively.

    Returns
    -------
    pd.DataFrame
        Columns SEQN, BMXWT, BMXHT, BMI; written to ``cfg.out_dir / cfg.cov_bmx``.

    Raises
    ------
    FileNotFoundError
        No BMX source exists.
    ValueError
        SEQN, BMXWT or BMXHT is missing from the source.
    """
    ensure_dir(cfg.out_dir)

    src = pick_first_existing(
        cfg.bmx_9923,
        cfg.interim_dir / "bmx_9923.parquet",
        cfg.interim_dir / "bmx_9923.csv",
        cfg.interim_dir / "bmx_9918.parquet",
        cfg.interim_dir / "bmx_9918.csv",
    )
    if src is None:
        raise FileNotFoundError("BMX not found. Put bmx_9923/bmx_9918 under interim, or set CONFIG.bmx_9923.")

    bmx = upper_df(read_any(src))
    for need in ["SEQN", "BMXWT", "BMXHT"]:
        if need not in bmx.columns:
            raise ValueError(f"BMX table missing required column: {need}")

    wt = pd.to_numeric(bmx["BMXWT"], errors="coerce")
    ht_cm = pd.to_numeric(bmx["BMXHT"], errors="coerce")
    if "BMXBMI" in bmx.columns:
        bmi_src = pd.to_numeric(bmx["BMXBMI"], errors="coerce")
    else:
        bmi_src = pd.Series(np.nan, index=bmx.index, dtype="float")

    # Compute BMI only from positive heights, then use it to fill gaps in BMXBMI.
    ht_m = (ht_cm / 100.0).where(ht_cm > 0)
    bmi = bmi_src.fillna(wt / ht_m ** 2)

    out = pd.DataFrame({
        "SEQN": bmx["SEQN"],
        "BMXWT": wt,
        "BMXHT": ht_cm,
        "BMI": pd.to_numeric(bmi, errors="coerce"),
    }).drop_duplicates("SEQN")

    outp = cfg.out_dir / cfg.cov_bmx
    out.to_parquet(outp, index=False)
    log(f"✓ BMX → {outp} (source: {src.name})")
    return out


In [24]:

@dataclass
class ClinicalThresholds:
    """Numeric cut-offs for deriving clinical indicator flags."""
    # Blood-pressure cut-offs: build_clinical flags HTN when SBP >= htn_sbp
    # or DBP >= htn_dbp.
    htn_sbp: float = 140.0
    htn_dbp: float = 90.0
    # Diabetes cut-offs; build_clinical does not reference these two fields.
    a1c_diabetes: float = 6.5
    fpg_diabetes: float = 126.0  # mg/dL

# Default thresholds, used as the default `thr` argument of build_clinical.
THR = ClinicalThresholds()

def build_clinical(cfg: Config = CONFIG, thr: ClinicalThresholds = THR) -> pd.DataFrame:
    """Build the clinical covariate table and write it to ``cfg.out_dir / cfg.cov_clinical``.

    Source lookup: ``clinical_9923.(parquet|csv)`` under ``cfg.interim_dir``;
    otherwise ``cfg.clinical_9918`` / ``clinical_9918.(parquet|csv)`` /
    ``nhanes_primary_anal_full_singleimputation_v2.(parquet|csv)``, with
    ``cfg.clinical_1923`` / ``clinical_1923.(parquet|csv)`` appended if present.

    Derived only when absent from the source:
      * BMI_CLAS – UNDER (<18.5), NORMAL (<25), OVER (<30), OBESE (otherwise),
        from the BMX output file if it exists, else from a BMI column;
      * HTN – 1 if a diagnosis or medication column is > 0, or SBP / DBP reach
        ``thr.htn_sbp`` / ``thr.htn_dbp``;
      * HIGH_CHOL – 1 if TCHOL >= 240, LDL >= 160, or a cholesterol-medication
        column is > 0.

    Fixes relative to the previous version:
      * absent optional columns (SBP, DBP, TCHOL, LDL, BMI) are treated as
        all-missing Series. ``pd.to_numeric(clin.get(col, np.nan))`` used to
        return a scalar, which crashed HIGH_CHOL when both TCHOL and LDL were
        absent and crashed BMI_CLAS when neither BMX nor BMI was available;
      * BMI_CLAS is built on ``clin``'s own index rather than a fresh
        positional index;
      * the BMX lookup is de-duplicated on SEQN before mapping, so duplicate
        SEQNs in the BMX file no longer raise.

    NOTE(review): a row with no measurement and no diagnosis information gets
    HTN / HIGH_CHOL = 0 rather than missing, and diagnosis / medication columns
    count as positive whenever they are > 0 (so a raw 1 = yes / 2 = no coding
    would flag "no" as positive) — confirm both against the source coding.

    Returns
    -------
    pd.DataFrame
        SEQN, BMI_CLAS, DIABETES, HTN, HIGH_CHOL, CVD, CANCER, SBP, DBP, TCHOL,
        HDL, LDL, TG; columns absent from the source are filled with NaN.

    Raises
    ------
    FileNotFoundError
        No clinical source exists.
    """
    ensure_dir(cfg.out_dir)

    clin_9923 = pick_first_existing(cfg.interim_dir / "clinical_9923.parquet", cfg.interim_dir / "clinical_9923.csv")
    if clin_9923:
        clin = read_any(clin_9923)
    else:
        p9918 = pick_first_existing(
            cfg.clinical_9918,
            cfg.interim_dir / "clinical_9918.parquet",
            cfg.interim_dir / "clinical_9918.csv",
            cfg.interim_dir / "nhanes_primary_anal_full_singleimputation_v2.parquet",
            cfg.interim_dir / "nhanes_primary_anal_full_singleimputation_v2.csv",
        )
        p1923 = pick_first_existing(cfg.clinical_1923, cfg.interim_dir / "clinical_1923.parquet", cfg.interim_dir / "clinical_1923.csv")
        if p9918 is None:
            raise FileNotFoundError("Provide clinical_9923 or clinical_9918 under interim.")
        clin = read_any(p9918)
        if p1923:
            clin = pd.concat([clin, read_any(p1923)], ignore_index=True)

    clin = upper_df(clin)

    def _num(col: str) -> pd.Series:
        """Numeric view of `col` on clin's index; all-NaN when the column is absent."""
        if col in clin.columns:
            return pd.to_numeric(clin[col], errors="coerce")
        return pd.Series(np.nan, index=clin.index, dtype="float64")

    # Derive BMI_CLAS if missing
    if "BMI_CLAS" not in clin.columns:
        bmi_src = None
        bmx_path = cfg.out_dir / cfg.cov_bmx
        if bmx_path.exists():
            bmx = upper_df(pd.read_parquet(bmx_path))
            if {"SEQN", "BMI"}.issubset(bmx.columns) and "SEQN" in clin.columns:
                # Series.map needs a uniquely indexed lookup.
                lookup = bmx.drop_duplicates("SEQN").set_index("SEQN")["BMI"]
                bmi_src = pd.to_numeric(clin["SEQN"].map(lookup), errors="coerce").astype(float)
        if bmi_src is None:
            bmi_src = _num("BMI")

        # Assign from the top band down so each lower cut-off overrides the one above.
        bmi_clas = pd.Series(pd.NA, index=clin.index, dtype="string")
        bmi_clas[bmi_src.notna()] = "OBESE"
        bmi_clas[bmi_src < 30] = "OVER"
        bmi_clas[bmi_src < 25] = "NORMAL"
        bmi_clas[bmi_src < 18.5] = "UNDER"
        clin["BMI_CLAS"] = bmi_clas

    # Derive HTN if missing
    if "HTN" not in clin.columns:
        sbp = _num("SBP")
        dbp = _num("DBP")
        diag_col = next((c for c in clin.columns if (("HTN" in c or "HYPERT" in c) and "MED" not in c and c != "HTN")), None)
        med_col  = next((c for c in clin.columns if ("MED" in c and ("BP" in c or "HYPER" in c))), None)
        htn = pd.Series(False, index=clin.index)
        if diag_col:
            htn = htn | (pd.to_numeric(clin[diag_col], errors="coerce") > 0)
        if med_col:
            htn = htn | (pd.to_numeric(clin[med_col], errors="coerce") > 0)
        htn = htn | (sbp >= thr.htn_sbp) | (dbp >= thr.htn_dbp)
        clin["HTN"] = htn.astype("Int8")

    # Derive HIGH_CHOL if missing
    if "HIGH_CHOL" not in clin.columns:
        tch = _num("TCHOL")
        ldl = _num("LDL")
        med_col = next((c for c in clin.columns if ("CHOL" in c and "MED" in c)), None)
        high = (tch >= 240) | (ldl >= 160)
        if med_col:
            high = high | (pd.to_numeric(clin[med_col], errors="coerce") > 0)
        clin["HIGH_CHOL"] = high.astype("Int8")

    keep = ["SEQN", "BMI_CLAS", "DIABETES", "HTN", "HIGH_CHOL", "CVD", "CANCER", "SBP", "DBP", "TCHOL", "HDL", "LDL", "TG"]
    for k in keep:
        if k not in clin.columns:
            clin[k] = pd.Series(np.nan, index=clin.index)

    out = clin[keep].copy()
    # Binary indicators are stored as nullable Int8 so missing stays distinct from 0.
    for b in ["DIABETES", "HTN", "HIGH_CHOL", "CVD", "CANCER"]:
        out[b] = pd.to_numeric(out[b], errors="coerce").astype("Int8")

    outp = cfg.out_dir / cfg.cov_clinical
    out.to_parquet(outp, index=False)
    log(f"✓ CLN → {outp}")
    return out


In [25]:

# DEMO columns that get_survey_core requires and returns, in this order.
SURVEY_KEEP = ["SEQN", "SDDSRVYR", "SDMVPSU", "SDMVSTRA", "WTMEC2YR"]

def build_household(cfg: Config = CONFIG) -> pd.DataFrame:
    """Write the household-size covariate file and return its contents.

    Reads the DEMO stack at ``cfg.demo_9923``, keeps ``SEQN`` and ``DMDHHSIZ``
    with one row per ``SEQN`` (first occurrence wins), and saves the result to
    ``cfg.out_dir / cfg.cov_household``.

    Raises:
        ValueError: the DEMO stack has no ``DMDHHSIZ`` column.
    """
    ensure_dir(cfg.out_dir)

    # upper_df is defined elsewhere in the notebook (presumably it upper-cases column names).
    demo_stack = upper_df(pd.read_parquet(cfg.demo_9923))
    if "DMDHHSIZ" not in demo_stack.columns:
        raise ValueError("DMDHHSIZ not found in DEMO stack.")

    household = demo_stack.loc[:, ["SEQN", "DMDHHSIZ"]].drop_duplicates(subset="SEQN")

    target = cfg.out_dir / cfg.cov_household
    household.to_parquet(target, index=False)
    log(f"✓ HH  → {target}")
    return household

def get_survey_core(cfg: Config = CONFIG) -> pd.DataFrame:
    """Return the survey fields listed in ``SURVEY_KEEP``, one row per ``SEQN``.

    The DEMO table is looked up in this order: ``cfg.demo_9923`` if set and
    present on disk, then ``cfg.demo_9918`` under the same condition, then
    whatever ``pick_first_existing`` returns for the two candidate files in
    ``cfg.interim_dir``.

    Raises:
        FileNotFoundError: no DEMO table could be located.
        ValueError: the DEMO table lacks one or more ``SURVEY_KEEP`` columns.
    """
    if cfg.demo_9923 and Path(cfg.demo_9923).exists():
        demo_path = cfg.demo_9923
    elif cfg.demo_9918 and Path(cfg.demo_9918).exists():
        demo_path = cfg.demo_9918
    else:
        # NOTE(review): these fallback names contain an underscore ("demo_9923.parquet"),
        # while the Config default is "demo9923.parquet" — confirm both spellings are intended.
        demo_path = pick_first_existing(
            cfg.interim_dir / "demo_9923.parquet",
            cfg.interim_dir / "demo_9918.parquet",
        )

    if demo_path is None:
        raise FileNotFoundError("Could not find DEMO table.")

    demo_stack = upper_df(pd.read_parquet(demo_path))

    absent = [field for field in SURVEY_KEEP if field not in demo_stack.columns]
    if absent:
        raise ValueError(f"Missing survey fields in DEMO: {absent}")

    survey_fields = demo_stack[SURVEY_KEEP]
    return survey_fields.drop_duplicates("SEQN").copy()


## Merge all covariates into one core file

In [26]:
def build_core(cfg: Config = CONFIG) -> pd.DataFrame:
    """Merge the survey fields and every covariate file into one CORE table.

    The frame returned by ``get_survey_core`` is the left side of a chain of
    left joins on ``SEQN``.  Each covariate file under ``cfg.out_dir``
    contributes only the columns listed in ``pieces`` below.  The merged table
    is written to ``cfg.out_dir / cfg.cov_core`` and returned.

    Raises:
        KeyError: a covariate file lacks one of the columns it must provide.
        RuntimeError: a piece shares a non-key column with the table built so
            far, a merge changed the number of rows, or the merged table ends
            up with duplicate column names.
    """
    ensure_dir(cfg.out_dir)

    # (file name, columns taken from that file), listed in merge order.
    pieces = [
        (cfg.cov_smk, ["SEQN", "SMK_STATUS", "CIGS_PER_DAY", "PACK_YEARS", "FORMER_SMOKER"]),
        (cfg.cov_alc, ["SEQN", "DRINKS_PER_DAY", "ALCOHOL_CAT"]),
        (cfg.cov_pa, ["SEQN", "LTPA", "METSCORE", "IMP"]),
        (cfg.cov_bmx, ["SEQN", "BMXWT", "BMXHT", "BMI"]),
        (cfg.cov_clinical, ["SEQN", "BMI_CLAS", "DIABETES", "HTN", "HIGH_CHOL", "CVD", "CANCER",
                            "SBP", "DBP", "TCHOL", "HDL", "LDL", "TG"]),
        (cfg.cov_household, ["SEQN", "DMDHHSIZ"]),
    ]

    core = upper_df(get_survey_core(cfg))
    # ensure SEQN types align across every frame before joining
    core["SEQN"] = pd.to_numeric(core["SEQN"], errors="coerce").astype("Int64")

    for fname, wanted in pieces:
        piece = upper_df(pd.read_parquet(cfg.out_dir / fname))

        absent = [c for c in wanted if c not in piece.columns]
        if absent:
            raise KeyError(f"{fname} is missing required columns: {absent}")

        piece = piece[wanted].copy()
        piece["SEQN"] = pd.to_numeric(piece["SEQN"], errors="coerce").astype("Int64")

        # merge() would silently rename overlapping columns with _x/_y suffixes, which a
        # duplicate-name check after the merge cannot see — so catch the overlap up front.
        clash = [c for c in wanted if c != "SEQN" and c in core.columns]
        if clash:
            raise RuntimeError(f"Overlapping columns when merging {fname}: {clash}")

        # A left join can only add rows, and only when the right side repeats a key.
        n_before = len(core)
        core = core.merge(piece, on="SEQN", how="left")
        if len(core) != n_before:
            raise RuntimeError(
                f"Merging {fname} changed the row count from {n_before} to {len(core)}; "
                f"{fname} probably holds duplicate SEQN values."
            )

    # safety check for accidental duplicates (e.g. repeated names inside one piece)
    if core.columns.duplicated().any():
        dups = core.columns[core.columns.duplicated()].tolist()
        raise RuntimeError(f"Duplicate columns after merge: {dups}")

    outp = cfg.out_dir / cfg.cov_core
    core.to_parquet(outp, index=False)
    log(f"✓ CORE → {outp}")
    return core


In [27]:

from typing import Dict

def run_all(cfg: Config = CONFIG) -> Dict[str, pd.DataFrame]:
    """Run every covariate builder and return the resulting frames keyed by short name.

    Keys are ``smk``, ``alc``, ``pa``, ``bmx``, ``clinical``, ``household``
    and ``core``.
    """
    ensure_dir(cfg.out_dir)
    # A dict display evaluates its entries top to bottom, so the builders run in the
    # order written; build_core stays last because it reads the covariate files back
    # from cfg.out_dir.
    return {
        "smk": build_smk(cfg),
        "alc": build_alc(cfg),
        "pa": build_pa(cfg),
        "bmx": build_bmx(cfg),
        "clinical": build_clinical(cfg),
        "household": build_household(cfg),
        "core": build_core(cfg),
    }

def quick_checks(cfg: Config = CONFIG) -> pd.Series:
    """Summarise the CORE file on disk as a Series rounded to two decimals.

    Reports the row count, the number of distinct ``SEQN`` values, the
    percentage of missing values in LTPA, METSCORE, ALCOHOL_CAT and
    SMK_STATUS, and a 0/1 flag for the presence of the ``WTMEC2YR`` column.
    """
    core = pd.read_parquet(cfg.out_dir / cfg.cov_core)

    summary = {
        "n_rows": int(len(core)),
        "n_unique_seqn": int(core["SEQN"].nunique()),
    }
    # Share of missing values, in percent, for the covariates checked here.
    for col in ("LTPA", "METSCORE", "ALCOHOL_CAT", "SMK_STATUS"):
        summary[f"{col} %NA"] = float(core[col].isna().mean() * 100)
    summary["has_weights"] = int("WTMEC2YR" in core.columns)

    return pd.Series(summary).round(2)

# --- set DEMO path and RUN ---
# Point the shared CONFIG at the DEMO stack and stop immediately if the file is absent.
# NOTE(review): this is the same location as the Config default
# (BASE / "data" / "cov" / "demo9923.parquet"); consider deriving it from BASE
# instead of repeating the absolute path.
CONFIG.demo_9923 = Path("/Users/dengshuyue/Desktop/SDOH/analysis/data/cov/demo9923.parquet")
assert CONFIG.demo_9923.exists(), "demo9923.parquet not found"

# Optional: clean old core/pa artifacts if they exist
# (only the CORE and PA outputs are deleted here; the other covariate files are not).
for fn in (CONFIG.cov_core, CONFIG.cov_pa):
    f = Path(CONFIG.out_dir) / fn
    if f.exists():
        f.unlink()

# Run every builder, then print the summary that quick_checks reads back from the CORE file.
out = run_all(CONFIG)
print("Done. Files in:", CONFIG.out_dir)
print(quick_checks(CONFIG))


✓ SMK → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_smk_1999_2023.parquet (using smk_9918.csv); missing: {'SMK_STATUS': 0.001, 'CIGS_PER_DAY': 0.794, 'PACK_YEARS': 0.78}
✓ PA  → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_pa_1999_2023.parquet (source: totalpa_9923_imputed.csv)
✓ BMX → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_bmx_1999_2023.parquet (source: bmx_9918.csv)
✓ CLN → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_clinical_1999_2023.parquet
✓ HH  → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_household_1999_2023.parquet
✓ CORE → /Users/dengshuyue/Desktop/SDOH/analysis/output/cov_core_1999_2023.parquet
Done. Files in: /Users/dengshuyue/Desktop/SDOH/analysis/output
n_rows             128809.00
n_unique_seqn      128809.00
LTPA %NA               43.46
METSCORE %NA           43.46
ALCOHOL_CAT %NA        46.54
SMK_STATUS %NA         57.29
has_weights             1.00
dtype: float64


## check merged table (cov_core_1999_2023)

In [36]:
from pathlib import Path
import pandas as pd
import pyarrow.parquet as pq

# NOTE(review): hard-coded copy of the output folder; the Config default for out_dir
# (BASE / "output") resolves to the same location.
OUT = Path("/Users/dengshuyue/Desktop/SDOH/analysis/output")
p = OUT / "cov_core_1999_2023.parquet"

# Inspect schema and file metadata through pyarrow instead of reading the table into pandas
pf = pq.ParquetFile(p)
print(pf.metadata)          # file-level metadata (row/column counts, row groups, writer)
print(pf.schema)            # column names and their Parquet types

# Preview a fixed shortlist of columns; names missing from the file are dropped first
cols = ["SEQN","SDDSRVYR","WTMEC2YR","SMK_STATUS","ALCOHOL_CAT","LTPA","METSCORE",
        "BMXWT","BMXHT","BMI","BMI_CLAS","DIABETES","HTN","HIGH_CHOL","SBP","DBP"]
cols = [c for c in cols if c in pf.schema.names]  # keep only those present in the file
df = pd.read_parquet(p, columns=cols)
display(df.head(20))


<pyarrow._parquet.FileMetaData object at 0x1258e5c10>
  created_by: parquet-cpp-arrow version 21.0.0
  num_columns: 30
  num_rows: 128809
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 14790
<pyarrow._parquet.ParquetSchema object at 0x11433c9c0>
required group field_id=-1 schema {
  optional int64 field_id=-1 SEQN;
  optional double field_id=-1 SDDSRVYR;
  optional double field_id=-1 SDMVPSU;
  optional double field_id=-1 SDMVSTRA;
  optional double field_id=-1 WTMEC2YR;
  optional binary field_id=-1 SMK_STATUS (String);
  optional double field_id=-1 CIGS_PER_DAY;
  optional double field_id=-1 PACK_YEARS;
  optional double field_id=-1 FORMER_SMOKER;
  optional double field_id=-1 DRINKS_PER_DAY;
  optional binary field_id=-1 ALCOHOL_CAT (String);
  optional double field_id=-1 LTPA;
  optional double field_id=-1 METSCORE;
  optional int32 field_id=-1 IMP (Int(bitWidth=8, isSigned=true));
  optional double field_id=-1 BMXWT;
  optional double field_id=-1 BMXHT;
  optional do

Unnamed: 0,SEQN,SDDSRVYR,WTMEC2YR,SMK_STATUS,ALCOHOL_CAT,LTPA,METSCORE,BMXWT,BMXHT,BMI,BMI_CLAS,DIABETES,HTN,HIGH_CHOL,SBP,DBP
0,1,1.0,10982.898896,,,,,12.5,91.6,14.897695,UNDER,0,0,0,91.333333,56.0
1,2,1.0,28325.384898,NEVER,MODERATE,0.0,60.0,75.4,174.0,24.904215,NORMAL,0,0,0,100.666667,56.666667
2,3,1.0,46192.256945,,,,,32.9,136.6,17.631713,UNDER,0,0,0,108.666667,62.0
3,4,1.0,10251.26002,,,,,13.3,,,,0,0,1,95.333333,61.333333
4,5,1.0,99445.065735,FORMER,HEAVY,41.066667,1920.0,92.5,178.3,29.096386,OVER,0,1,1,122.0,82.666667
5,6,1.0,39656.600444,,,,,59.2,162.0,22.557537,NORMAL,0,0,0,114.666667,68.0
6,7,1.0,25525.423409,FORMER,NONE,3.033333,0.0,78.0,162.9,29.393577,OVER,0,0,1,125.333333,80.0
7,8,1.0,31510.587866,,,,,40.7,162.0,15.508307,UNDER,0,0,0,100.666667,49.333333
8,9,1.0,7575.870247,,,,,45.5,156.9,18.482704,UNDER,0,0,0,109.333333,53.333333
9,10,1.0,22445.808572,CURRENT,MODERATE,0.0,8160.0,111.8,190.1,30.936955,OBESE,0,1,0,145.333333,96.0


#### Check which cycles are included

In [37]:
from pathlib import Path
import pandas as pd

OUT = Path("/Users/dengshuyue/Desktop/SDOH/analysis/output")
p = OUT / "cov_core_1999_2023.parquet"

# Read only the cycle column, then reduce it to the sorted distinct cycle codes.
cycles = (
    pd.read_parquet(p, columns=["SDDSRVYR"])["SDDSRVYR"]
      .dropna()
      .astype("Int64")      # stored as double; show the codes as integers
      .sort_values()
      .unique()
)
print("Cycles present:", cycles.tolist())


Cycles present: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 66]


#### Preview rows for a specific cycle (fast, column-pruned)

In [43]:
import pyarrow.dataset as ds
import pyarrow.compute as pc
import pandas as pd

# Columns to preview; names that are absent from the file are filtered out below.
cols = ["SEQN","SDDSRVYR","WTMEC2YR","SMK_STATUS","ALCOHOL_CAT","LTPA","METSCORE",
        "BMXWT","BMXHT","BMI","BMI_CLAS","DIABETES","HTN","HIGH_CHOL","SBP","DBP"]

target_cycle = 66  # SDDSRVYR code of the cycle to preview; change as needed

# NOTE: `p` (path to the CORE parquet file) is not defined in this cell; it must already
# exist in the kernel from one of the earlier check cells.
dataset = ds.dataset(str(p))  # single-file dataset is fine
# Keep only rows whose SDDSRVYR equals target_cycle, restricted to the shortlisted columns.
table = dataset.to_table(
    filter = pc.field("SDDSRVYR") == target_cycle,
    columns = [c for c in cols if c in dataset.schema.names],
)
df_cycle = table.to_pandas()

print(f"Cycle {target_cycle}: rows={len(df_cycle)}")
display(df_cycle.head(20))

## currently extend PA to 2023 
## also need to extend all other cov to 2023 later by the same method 
 


Cycle 66: rows=15560


Unnamed: 0,SEQN,SDDSRVYR,WTMEC2YR,SMK_STATUS,ALCOHOL_CAT,LTPA,METSCORE,BMXWT,BMXHT,BMI,BMI_CLAS,DIABETES,HTN,HIGH_CHOL,SBP,DBP
0,109263,66.0,,,,,,,,,,,,,,
1,109264,66.0,,,,,,,,,,,,,,
2,109265,66.0,,,,,,,,,,,,,,
3,109266,66.0,,,NONE,48.0,2880.0,,,,,,,,,
4,109267,66.0,,,,72.0,4320.0,,,,,,,,,
5,109268,66.0,,,,0.0,0.0,,,,,,,,,
6,109269,66.0,,,,,,,,,,,,,,
7,109270,66.0,,,,,,,,,,,,,,
8,109271,66.0,,,NONE,0.0,0.0,,,,,,,,,
9,109272,66.0,,,,,,,,,,,,,,


#### Notes 
- PA has currently been extended to 2023.
- All other covariates still need to be extended to 2023 later, using the same method.
 

#### check PA by cycle 

In [33]:
# check PA by cycle 
# 66 is 17-20

from pathlib import Path
import pandas as pd

core = pd.read_parquet(Path(CONFIG.out_dir) / CONFIG.cov_core)

# Row count per survey cycle
by_cycle = core.groupby("SDDSRVYR").size().rename("rows").sort_index()
print("All cycles in CORE:\n", by_cycle)

# Row count per cycle among rows that carry an LTPA value
pa_cycles = (
    core.loc[core["LTPA"].notna()]
        .groupby("SDDSRVYR")
        .size()
        .rename("rows_with_pa")
        .sort_index()
)
print("\nCycles with PA (non-missing LTPA):\n", pa_cycles)

# Percentage of each cycle's rows that have LTPA; cycles without any PA rows print as 0
cov = pa_cycles.div(by_cycle).mul(100).round(1).rename("pa_coverage_pct")
print("\nPA coverage by cycle (% of rows with LTPA):\n", cov.fillna(0))


All cycles in CORE:
 SDDSRVYR
1.0      9965
2.0     11039
3.0     10122
4.0     10348
5.0     10149
6.0     10537
7.0      9756
8.0     10175
9.0      9971
10.0     9254
12.0    11933
66.0    15560
Name: rows, dtype: int64

Cycles with PA (non-missing LTPA):
 SDDSRVYR
1.0     4880
2.0     5411
3.0     5041
4.0     4979
5.0     5935
6.0     6218
7.0     5560
8.0     5769
9.0     5719
10.0    5569
12.0    8070
66.0    9677
Name: rows_with_pa, dtype: int64

PA coverage by cycle (% of rows with LTPA):
 SDDSRVYR
1.0     49.0
2.0     49.0
3.0     49.8
4.0     48.1
5.0     58.5
6.0     59.0
7.0     57.0
8.0     56.7
9.0     57.4
10.0    60.2
12.0    67.6
66.0    62.2
Name: pa_coverage_pct, dtype: float64


#### check PA missing by age 

In [34]:
# check PA missing by age 
import pandas as pd
from pathlib import Path

# Re-read CORE from disk, plus the age column (RIDAGEYR) from the DEMO stack.
core = pd.read_parquet(Path(CONFIG.out_dir) / CONFIG.cov_core)
# NOTE(review): unlike get_survey_core(), DEMO is read here without upper_df() or
# drop_duplicates("SEQN"); this assumes the columns are already upper-case and that
# SEQN is unique in the file — confirm.
demo = pd.read_parquet(CONFIG.demo_9923)[["SEQN","RIDAGEYR","SDDSRVYR"]]

# attach age (joined on participant id and cycle; CORE rows without a DEMO match get NaN age)
core = core.merge(demo, on=["SEQN","SDDSRVYR"], how="left")

def coverage(df, label):
    """Tabulate PA coverage per survey cycle.

    Returns a frame indexed by ``SDDSRVYR`` (ascending) with three columns:
    ``rows`` (all rows in the cycle), ``rows_with_pa`` (rows whose ``LTPA`` is
    not missing) and ``pa_cov_pct_<label>`` (their ratio in percent, rounded to
    one decimal).  Cycles without any PA rows show 0 instead of NaN.
    """
    totals = df.groupby("SDDSRVYR").size().rename("rows")
    observed = df.loc[df["LTPA"].notna()].groupby("SDDSRVYR").size().rename("rows_with_pa")
    pct = observed.div(totals).mul(100).round(1).rename(f"pa_cov_pct_{label}")

    table = pd.concat([totals, observed, pct], axis=1)
    return table.fillna(0).sort_index()

# Coverage over every row of CORE, regardless of age
print("All ages:\n", coverage(core, "all"))

# Same table on age-restricted subsets (RIDAGEYR attached from DEMO above), to see how
# much of the missing PA is tied to age.
# Typical PAQ eligibility (NHANES self-report) is ≥12; many analyses use ≥18
print("\nAge ≥12:\n", coverage(core[core["RIDAGEYR"] >= 12], "age12+"))
print("\nAge ≥18:\n", coverage(core[core["RIDAGEYR"] >= 18], "age18+"))


All ages:
            rows  rows_with_pa  pa_cov_pct_all
SDDSRVYR                                     
1.0        9965          4880            49.0
2.0       11039          5411            49.0
3.0       10122          5041            49.8
4.0       10348          4979            48.1
5.0       10149          5935            58.5
6.0       10537          6218            59.0
7.0        9756          5560            57.0
8.0       10175          5769            56.7
9.0        9971          5719            57.4
10.0       9254          5569            60.2
12.0      11933          8070            67.6
66.0      15560          9677            62.2

Age ≥12:
            rows  rows_with_pa  pa_cov_pct_age12+
SDDSRVYR                                        
1.0        7295          4880               66.9
2.0        7898          5411               68.5
3.0        7344          5041               68.6
4.0        7267          4979               68.5
5.0        7173          5935           