# KNHANES 2014–2017: Data Prep Pipeline (ALL ↔ PAM)

본 노트북은 분석 재현을 위해 **수집→정제→변환→병합** 4단계 파이프라인을
Colab/로컬 공용으로 제공합니다. 각 단계는 실행 시 **shape/columns 로그**를 출력합니다.

**산출물**
- `csv/2014_all.csv.gz` … `csv/2017_all.csv.gz` (존재 시 재생성하지 않음)
- `csv/health_2014_2017.csv.gz` (건강 표준 테이블)
- `csv/analysis_ready_expanded.csv.gz` (최종 분석 테이블)

> 참고: PAM 요약(`csv/20xx_pam.csv.gz`)이 없으면 health만 기반으로 저장합니다.


In [9]:
pip install pyreadstat

Collecting pyreadstat
  Downloading pyreadstat-1.3.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (1.2 kB)
Collecting narwhals>=2.0 (from pyreadstat)
  Downloading narwhals-2.2.0-py3-none-any.whl.metadata (11 kB)
Downloading pyreadstat-1.3.1-cp311-cp311-macosx_11_0_arm64.whl (587 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m587.6/587.6 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading narwhals-2.2.0-py3-none-any.whl (401 kB)
Installing collected packages: narwhals, pyreadstat
[2K  Attempting uninstall: narwhals
[2K    Found existing installation: narwhals 1.47.0
[2K    Uninstalling narwhals-1.47.0:
[2K      Successfully uninstalled narwhals-1.47.0
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [pyreadstat]
[1A[2KSuccessfully installed narwhals-2.2.0 pyreadstat-1.3.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mno

## 1) SAS7BDAT → CSV.GZ 변환

- SAS 원본(`hn14_all.sas7bdat` 등)이 있을 때만 변환합니다.
- 이미 `csv/*_all.csv.gz`가 있으면 스킵합니다.


In [None]:
from pathlib import Path
import os, gc
import pandas as pd

# 반드시 설치되어 있어야 합니다 (한 번만)
# pip install pyreadstat
import pyreadstat

ENCODING_TRY = [None, "cp949", "euc-kr", "latin1"]  # 한글 우선, 마지막은 넓은 범위

def read_sas7bdat_robust(sas_path: str):
    """
    pyreadstat로 sas7bdat을 읽되, 인코딩을 바꿔가며 재시도
    """
    last_err = None
    for enc in ENCODING_TRY:
        try:
            df, meta = pyreadstat.read_sas7bdat(sas_path, encoding=enc)
            print(f"  [OK] read_sas7bdat | encoding={enc}")
            return df
        except Exception as e:
            print(f"  [RETRY] encoding={enc} -> {type(e).__name__}: {e}")
            last_err = e
    raise last_err

def convert_sas_to_csv(sas_path: str, out_path: str, keep_cols=None):
    p = Path(sas_path)
    if not p.exists():
        print(f"[SKIP] Not found: {sas_path}")
        return False
    if p.stat().st_size == 0:
        print(f"[ERROR] Empty file: {sas_path}")
        return False

    print(f"\n[READ] {sas_path} (size={p.stat().st_size:,} bytes)")
    try:
        df = read_sas7bdat_robust(sas_path)
    except Exception as e:
        print(f"[ERROR] failed to read: {e}")
        return False

    # 필요한 열만 저장하고 싶으면 keep_cols로 제한
    if keep_cols:
        found = [c for c in keep_cols if c in df.columns]
        if found:
            df = df[found]
        else:
            print("  [WARN] keep_cols가 데이터에 없어 전체 컬럼 저장합니다.")

    print(f"  shape={df.shape} | cols={len(df.columns)}")
    print("  head:\n", df.head(3))

    Path(out_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_path, index=False, compression="gzip")
    print(f"[SAVE] -> {out_path} | rows={len(df)}")
    del df
    gc.collect()
    return True

# ----------------- 실행부 예시 -----------------
YEARS = [2014, 2015, 2016, 2017]
SAS_ALLS = {
    2014: "hn14_all.sas7bdat",
    2015: "hn15_all.sas7bdat",
    2016: "hn16_all.sas7bdat",
    2017: "hn17_all.sas7bdat",
}
SAS_PAMS = {
    2014: "hn14_pam.sas7bdat",
    2015: "hn15_pam.sas7bdat",
    2016: "hn16_pam.sas7bdat",
    2017: "hn17_pam.sas7bdat",
}
PATH_ALLS = {y: f"csv/{y}_all.csv.gz" for y in YEARS}
PATH_PAMS = {y: f"csv/{y}_pam.csv.gz" for y in YEARS}

print("Working dir:", os.getcwd())

# ALL 변환
for y in YEARS:
    convert_sas_to_csv(SAS_ALLS[y], PATH_ALLS[y])

# PAM 변환
for y in YEARS:
    convert_sas_to_csv(SAS_PAMS[y], PATH_PAMS[y])


## 2) ALL → 건강표준 테이블 (정제·변환)

- 컬럼 표준화(MAP_ALL)
- 파생: 비만/고혈압/당뇨, sex_female 등
- 로그: 전체 병합 shape, 저장 경로


In [13]:
def build_health_table():
    parts = []
    for y, p in PATH_ALLS.items():
        if os.path.exists(p):
            d = read_all_standardized(p); parts.append(d)
        else:
            print(f"[WARN] ALL not found, skip: {p}")
    if not parts:
        raise RuntimeError("No ALL files to build health table.")
    all_df = pd.concat(parts, ignore_index=True)
    print("ALL merged shape:", all_df.shape)
    print("ALL columns (head):", list(all_df.columns)[:12])

    dfh = all_df.copy()
    if "sex" in dfh: dfh["sex_female"] = (dfh["sex"] == 2).astype(int)
    if "age" in dfh: dfh["age"] = pd.to_numeric(dfh["age"], errors="coerce")

    # 안전하게 시리즈 생성
    SBP = dfh["SBP"] if "SBP" in dfh else pd.Series(np.nan, index=dfh.index)
    DBP = dfh["DBP"] if "DBP" in dfh else pd.Series(np.nan, index=dfh.index)
    htn_med = dfh["htn_med"] if "htn_med" in dfh else pd.Series(0, index=dfh.index)

    HbA1c = dfh["HbA1c"] if "HbA1c" in dfh else pd.Series(np.nan, index=dfh.index)
    dm_med = dfh["dm_med"] if "dm_med" in dfh else pd.Series(0, index=dfh.index)

    # Outcomes
    dfh["out_obesity"] = (dfh["BMI"] >= 30).astype(int) if "BMI" in dfh else pd.Series(np.nan, index=dfh.index)
    dfh["out_hypertension"] = ((SBP >= 140) | (DBP >= 90) | (htn_med == 1)).astype(int)
    dfh["out_diabetes"] = ((HbA1c >= 6.5) | (dm_med == 1)).astype(int)

    # Behavior
    dfh["smoker"] = (dfh["smoking"] > 0).astype(int) if "smoking" in dfh else pd.Series(np.nan, index=dfh.index)
    dfh["alcohol_freq"] = dfh["alcohol"] if "alcohol" in dfh else pd.Series(np.nan, index=dfh.index)
    if "kcal" in dfh: dfh["kcal"] = pd.to_numeric(dfh["kcal"], errors="coerce")

    keep = ["ID","year","sex_female","age","smoker","alcohol_freq","kcal",
            "BMI","SBP","DBP","HbA1c",
            "out_obesity","out_hypertension","out_diabetes",
            "incm","edu","wt_tot","psu","kstrata"]
    dfh = dfh[[c for c in keep if c in dfh.columns]].dropna(subset=["age"])

    outp = "csv/health_2014_2017.csv.gz"
    dfh.to_csv(outp, index=False, compression="gzip")
    print(f"[Saved] {outp} | shape={dfh.shape}")
    print("Health columns:", list(dfh.columns))
    return dfh


## 3) PAM 요약 불러오기 & 4) 병합

- 개인 단위(PID, year) 요약이 존재하면 병합, 없으면 health만 저장
- 해석 편의를 위한 스케일링: `mvpa10`(10분 단위), `sed10`(10%p 단위)


In [21]:
def read_pam_person():
    items = []
    for y, p in PATH_PAM.items():
        if os.path.exists(p):
            d = pd.read_csv(p)
            ren = {}
            for c in d.columns:
                cl = str(c).lower()
                if cl == "id": ren[c] = "ID"
                if cl == "year": ren[c] = "year"
            if ren: d = d.rename(columns=ren)
            need = ["ID","year","n_days","worn_min_day","mvpa_min_day","sed_ratio"]
            d = d[[c for c in need if c in d.columns]].copy()
            d["ID"] = d["ID"].astype(str)
            items.append(d)
        else:
            print(f"[PAM] 없음: {p}")
    if not items:
        return None
    act = pd.concat(items, ignore_index=True)
    print("PAM person shape:", act.shape)
    print("PAM columns:", list(act.columns))
    uniq = act["worn_min_day"].round(1).nunique() if "worn_min_day" in act else None
    if uniq is not None and uniq <= 2:
        print(f"[Note] worn_min_day 분산 낮음 (unique={uniq})")
    return act

def build_analysis_ready():
    act = read_pam_person()
    hlth = pd.read_csv("csv/health_2014_2017.csv.gz")
    for d in (act, hlth):
        if d is None: continue
        if "id" in d.columns and "ID" not in d.columns:
            d.rename(columns={"id":"ID"}, inplace=True)
        if "year" in d.columns:
            d["year"] = pd.to_numeric(d["year"], errors="coerce").astype("Int64")
        d["ID"] = d["ID"].astype(str)

    if act is not None:
        df = act.merge(hlth, on=["ID","year"], how="inner")
    else:
        print("[Note] PAM 요약 없음 → health만 사용")
        df = hlth.copy()

    if "mvpa_min_day" in df: df["mvpa10"] = df["mvpa_min_day"] / 10.0
    if "sed_ratio" in df:    df["sed10"]  = df["sed_ratio"] * 10.0

    # 가중치/설계 존재 확인
    if "wt_tot" not in df.columns:
        print("[HOTFIX] 가중치 미존재 → wt_tot=1.0 설정")
        df["wt_tot"] = 1.0
    if not {"psu","kstrata"}.issubset(df.columns):
        print("[HOTFIX] psu/kstrata 없음 → 설계부트 불가(후속분석 폴백)")

    outp = "csv/analysis_ready_expanded.csv.gz"
    df.to_csv(outp, index=False, compression="gzip")
    print(f"[Saved] {outp} | shape={df.shape}")
    print("Final columns (head):", list(df.columns)[:20])
    return df

df_final = build_analysis_ready()
df_final.head()

PAM person shape: (28392322, 2)
PAM columns: ['ID', 'year']
[Saved] csv/analysis_ready_expanded.csv.gz | shape=(24259522, 14)
Final columns (head): ['ID', 'year', 'sex_female', 'age', 'smoker', 'alcohol_freq', 'out_obesity', 'out_hypertension', 'out_diabetes', 'incm', 'edu', 'wt_tot', 'psu', 'kstrata']


Unnamed: 0,ID,year,sex_female,age,smoker,alcohol_freq,out_obesity,out_hypertension,out_diabetes,incm,edu,wt_tot,psu,kstrata
0,A209799515,2014,1,64.0,,,,0,0,3.0,3.0,5573.985554,,612.0
1,A209799515,2014,1,64.0,,,,0,0,3.0,3.0,5573.985554,,612.0
2,A209799515,2014,1,64.0,,,,0,0,3.0,3.0,5573.985554,,612.0
3,A209799515,2014,1,64.0,,,,0,0,3.0,3.0,5573.985554,,612.0
4,A209799515,2014,1,64.0,,,,0,0,3.0,3.0,5573.985554,,612.0


In [15]:
dfh = build_health_table()

ALL merged shape: (25131, 9)
ALL columns (head): ['ID', 'year', 'sex', 'age', 'incm', 'edu', 'wt_tot', 'psu', 'kstrata']
[Saved] csv/health_2014_2017.csv.gz | shape=(25131, 14)
Health columns: ['ID', 'year', 'sex_female', 'age', 'smoker', 'alcohol_freq', 'out_obesity', 'out_hypertension', 'out_diabetes', 'incm', 'edu', 'wt_tot', 'psu', 'kstrata']


In [1]:
import os, gc, math
from pathlib import Path
import numpy as np
import pandas as pd

# ========== 유틸 ==========
def _find_col(cols, candidates):
    """후보 목록 중 데이터프레임에 존재하는 첫 컬럼명을 반환(대소문자 무시)"""
    low = {c.lower(): c for c in cols}
    for cand in candidates:
        if cand.lower() in low:
            return low[cand.lower()]
    return None

def _to_num(s):
    return pd.to_numeric(s, errors="coerce")

# ========== 핵심: minute → person 요약 ==========
def build_activity_person(path_pam_map,
                          out_person_path="csv/activity_person_2014_2017.csv.gz",
                          chunk_size=2_000_000,
                          min_wear_per_day=600,   # ≥10h/day
                          min_valid_days=4):      # ≥4일
    """
    Minute-level PAM CSV(.gz)들을 읽어 개인 요약치(n_days, worn_min_day, mvpa_min_day, sed_ratio)로 변환.
    - 입력: path_pam_map = {2014: 'csv/2014_pam.csv.gz', ...}
    - 출력: out_person_path 저장 및 DataFrame 리턴
    """
    Path("csv").mkdir(exist_ok=True)
    per_year_person = []

    for year, path in path_pam_map.items():
        if not os.path.exists(path):
            print(f"[PAM] 파일 없음: {path} (연도 {year}) → 스킵")
            continue

        print(f"\n[PAM] {year} 처리 시작 → {path}")
        # 1) 컬럼 샘플링해서 스키마 파악
        sample = pd.read_csv(path, nrows=50)
        # 기본키
        id_col   = _find_col(sample.columns, ["ID", "id", "Id"])
        year_col = _find_col(sample.columns, ["year", "Year"])
        # 날짜/일자
        date_col = _find_col(sample.columns, ["date", "day", "mod_d", "DATE", "Day"])
        # 분 단위 지표(원-핫 또는 인디케이터)
        sed_col  = _find_col(sample.columns, ["sed", "sedentary", "SED"])
        lgt_col  = _find_col(sample.columns, ["light", "lgt", "LIGHT"])
        mod_col  = _find_col(sample.columns, ["mod", "moderate", "MOD"])
        vig_col  = _find_col(sample.columns, ["vig", "vigorous", "VIG"])
        # 혹시 per-minute 카운트가 있다면(없어도 됨)
        cnt_col  = _find_col(sample.columns, ["counts", "cpm", "cnt"])

        # 필수키 확인
        assert id_col and year_col, f"[{path}] ID/year 컬럼을 찾지 못했습니다."
        assert date_col, f"[{path}] 날짜(일자) 컬럼(date/day/mod_d 등)을 찾지 못했습니다."
        assert (sed_col or lgt_col or mod_col or vig_col), f"[{path}] sed/light/mod/vig 중 최소 1개가 필요합니다."

        usecols = [id_col, year_col, date_col]
        for c in [sed_col, lgt_col, mod_col, vig_col, cnt_col]:
            if c and c not in usecols:
                usecols.append(c)

        # 2) chunk 단위로 읽어서 '개인-일자' 집계
        daily_accum = []  # (ID, year, date, wear_min, sed_min, mvpa_min)

        total_rows = 0
        for chunk in pd.read_csv(path, usecols=usecols, chunksize=chunk_size):
            total_rows += len(chunk)
            # 타입/정리
            chunk[id_col]   = chunk[id_col].astype(str)
            chunk[year_col] = _to_num(chunk[year_col]).astype("Int64")

            # 날짜 → 날짜형(일 단위만 유지)
            # mod_d 처럼 문자열(YYYY.MM.DD. or YYYY-MM-DD 등)도 처리가능
            chunk[date_col] = pd.to_datetime(chunk[date_col], errors="coerce").dt.date

            # 분단위 지표 0/1 보정
            for c in [sed_col, lgt_col, mod_col, vig_col]:
                if c:
                    chunk[c] = _to_num(chunk[c]).fillna(0).clip(lower=0)
                    # 값이 0/1이 아니어도 0/1로 수렴(>0 → 1)
                    chunk[c] = (chunk[c] > 0).astype(int)

            # 착용 분(wear): sed|light|mod|vig 중 1이라도 1이면 착용으로 간주
            wear_arr = None
            for c in [sed_col, lgt_col, mod_col, vig_col]:
                if c:
                    wear_arr = chunk[c] if wear_arr is None else (wear_arr | (chunk[c] == 1))
            chunk["_wear"] = wear_arr.astype(int)

            # MVPA = mod + vig (분)
            mvpa_arr = None
            for c in [mod_col, vig_col]:
                if c:
                    mvpa_arr = chunk[c] if mvpa_arr is None else (mvpa_arr + chunk[c])
            if mvpa_arr is None:
                mvpa_arr = 0
            chunk["_mvpa"] = mvpa_arr

            # sed 분
            chunk["_sed"] = chunk[sed_col] if sed_col else 0

            # 일자 집계
            g = chunk.groupby([id_col, year_col, date_col], as_index=False).agg(
                wear_min=("_wear", "sum"),
                sed_min =("_sed",  "sum"),
                mvpa_min=("_mvpa", "sum"),
            )
            daily_accum.append(g)

            # 메모리
            del chunk
            gc.collect()

            print(f"  - processed rows: {total_rows:,}", end="\r")

        if not daily_accum:
            print(f"[WARN] {year}: daily_accum 비어있음 → 스킵")
            continue

        daily = pd.concat(daily_accum, ignore_index=True)
        # 3) 유효일 기준 필터(≥10h/day)
        daily = daily[daily["wear_min"] >= min_wear_per_day].copy()

        # 4) 개인 요약: 일수, 평균 착용분, 평균 MVPA, 좌식비율(총 sed / 총 wear)
        person = (
            daily
            .groupby([id_col, year_col], as_index=False)
            .agg(
                n_days     = ("wear_min", "size"),
                worn_total = ("wear_min", "sum"),
                sed_total  = ("sed_min",  "sum"),
                mvpa_total = ("mvpa_min", "sum"),
            )
        )
        person["worn_min_day"] = person["worn_total"] / person["n_days"]
        person["mvpa_min_day"] = person["mvpa_total"] / person["n_days"]
        person["sed_ratio"]    = np.where(person["worn_total"] > 0,
                                          person["sed_total"] / person["worn_total"],
                                          np.nan)
        # 유효일수 기준(≥4일) 필터
        person = person[person["n_days"] >= min_valid_days].copy()

        # 표준 컬럼명 정리
        person = person.rename(columns={id_col: "ID", year_col: "year"})
        person = person[["ID","year","n_days","worn_min_day","mvpa_min_day","sed_ratio"]].copy()
        person["ID"] = person["ID"].astype(str)
        person["year"] = person["year"].astype("Int64")

        # 연도별 저장(선택)
        out_y = f"csv/{year}_pam_person.csv.gz"
        person.to_csv(out_y, index=False, compression="gzip")
        print(f"[SAVE] {year} → {out_y} | rows={len(person):,}")

        per_year_person.append(person)

        # 큰 객체 해제
        del daily_accum, daily, person
        gc.collect()

    if not per_year_person:
        raise RuntimeError("모든 연도에서 person-level 요약이 생성되지 않았습니다.")

    act = pd.concat(per_year_person, ignore_index=True)
    act.to_csv(out_person_path, index=False, compression="gzip")
    print(f"[DONE] person-level 통합 저장 → {out_person_path} | rows={len(act):,}")
    print("Columns:", list(act.columns))
    return act

# ========== 기존 통합 단계와 연결(업데이트) ==========
def read_pam_person_safely():
    merged_path = "csv/activity_person_2014_2017.csv.gz"
    if os.path.exists(merged_path):
        d = pd.read_csv(merged_path, dtype={"ID":"string"})
        print(f"[LOAD] {merged_path} | shape={d.shape}")
        return d
    else:
        print("[MISS] person-level PAM 통합 파일 없음 → 즉시 생성")
        return build_activity_person(PATH_PAM)

def build_analysis_ready_updated():
    # 1) person-level PAM
    act = read_pam_person_safely()

    # 2) health 테이블
    hlth_path = "csv/health_2014_2017.csv.gz"
    assert os.path.exists(hlth_path), "health_2014_2017.csv.gz가 필요합니다. 먼저 build_health_table()을 실행하세요."
    hlth = pd.read_csv(hlth_path, dtype={"ID":"string"})
    # 타입 정리
    for d in (act, hlth):
        if "year" in d.columns:
            d["year"] = pd.to_numeric(d["year"], errors="coerce").astype("Int64")
        d["ID"] = d["ID"].astype(str)

    # 3) 병합
    df = act.merge(hlth, on=["ID","year"], how="inner")
    print("Merged shape:", df.shape)

    # 4) 노출 스케일
    if "mvpa_min_day" in df: df["mvpa10"] = df["mvpa_min_day"] / 10.0
    if "sed_ratio" in df:    df["sed10"]  = df["sed_ratio"] * 10.0

    # 5) 가중치/설계 폴백
    if "wt_tot" not in df.columns:
        print("[HOTFIX] 가중치 미존재 → wt_tot=1.0")
        df["wt_tot"] = 1.0
    if not {"psu","kstrata"}.issubset(df.columns):
        print("[HOTFIX] psu/kstrata 없음 → 설계부트 불가(후속분석 폴백)")

    outp = "csv/analysis_ready_expanded.csv.gz"
    df.to_csv(outp, index=False, compression="gzip")
    print(f"[SAVE] {outp} | shape={df.shape}")
    print("Cols(head):", list(df.columns)[:20])
    return df


## (참고) 우리가 겪었던 대표 이슈 & 가드

- **Series→bool 축소**로 `.astype(int)` 에러 → 연산 전 컬럼 존재 여부 확인
- **사분위수 분할 실패**(`Bin edges must be unique`) → 착용시간 분산 낮을 때 `duplicates="drop"` 또는 분할 스킵
- **열명 대소문자/이형** → lower-case 매칭으로 표준화
- **설계정보 부재** → survey 설계기반 SE 불가 → 폴백으로 단순가중/부트스트랩
- **디렉터리 누락** → `csv/`, `out/` 사전 생성
