In [1]:
# train_drowsiness_lstm.py
import os, re, json, math, random, shutil
from pathlib import Path
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
from collections import Counter
import tensorflow as tf

In [2]:
# 기본값 설정

RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

FEATURE_COLS = [
    "EAR",
    "MAR",
    "yawn_rate_per_min",
    "blink_rate_per_min",
    "avg_blink_dur_sec",
    "longest_eye_closure_sec",
]

# level: -1,0,1,2,3  -> 총 5클래스. (-1 포함)
CLS_LEVELS = [-1, 0, 1, 2, 3]
LEVEL_TO_INDEX = {lvl: i for i, lvl in enumerate(CLS_LEVELS)}
INDEX_TO_LEVEL = {i: lvl for lvl, i in LEVEL_TO_INDEX.items()}

#### 유형 데이타셋 준비

In [3]:
# 데이타셋 구성을 위한 기본 함수들..

def _safe_symlink(src: Path, dst: Path):
    # 숨김 파일(. , ._ 시작)은 무시
    if src.name.startswith(".") or src.name.startswith("._"):
        return

    dst.parent.mkdir(parents=True, exist_ok=True)
    try:
        if dst.exists() or dst.is_symlink():
            dst.unlink()
        os.symlink(src.resolve(), dst)
    except Exception:
        # 윈도우/권한 이슈 대비: 복사로 폴백
        shutil.copy2(src, dst)

def scan_pairs(data_dir: Path) -> List[Tuple[Path, Path]]:
    """
    data_dir 아래의 CSV/JSON 쌍을 찾아 반환.
    파일명(확장자 제외)이 일치하면 쌍으로 간주.
    """
    csvs = {}
    jsons = {}
    for p in data_dir.glob("./*"):
        if p.is_file():
            stem = p.stem  # 파일명(확장자 제외)
            if p.suffix.lower() == ".csv":
                csvs[stem] = p
            elif p.suffix.lower() == ".json":
                jsons[stem] = p
    pairs = []
    for stem, csv_path in csvs.items():
        if stem in jsons:
            pairs.append((csv_path, jsons[stem]))
    return sorted(pairs)

def split_and_link(pairs: List[Tuple[Path, Path]], out_dir: Path, train_ratio=0.8):
    """
    쌍 목록을 8:2로 분할하고, out_dir/{train,val}에 심볼릭링크(또는 복사) 생성.
    """
    random.shuffle(pairs)
    n_total = len(pairs)
    n_train = int(round(n_total * train_ratio))
    train_pairs = pairs[:n_train]
    val_pairs = pairs[n_train:]

    for split_name, subset in [("train", train_pairs), ("val", val_pairs)]:
        for csv_path, json_path in subset:
            rel = csv_path.name  # 파일명만 사용
            dst_csv = out_dir / split_name / rel
            dst_json = out_dir / split_name / json_path.name
            _safe_symlink(csv_path, dst_csv)
            _safe_symlink(json_path, dst_json)
    return train_pairs, val_pairs

In [4]:
# 각자 데이터셋 디렉토리 설정
data_root = "./dataset/"
prepared_dir = "prepared-dataset"

In [5]:
data_root = Path(data_root).expanduser().resolve()
prep_dir = Path(prepared_dir).resolve()
# prep_train_dir = Path(prep_train_dir).expanduser().resolve()
# prep_val_dir = Path(prep_val_dir).expanduser().resolve()
prep_dir.mkdir(parents=True, exist_ok=True)

In [6]:
# train_pairs = scan_pairs(prep_train_dir)
# val_pairs = scan_pairs(prep_val_dir)
# print(f"학습: {len(train_pairs)} / 검증: {len(val_pairs)}")

In [7]:
# 스캔 & 분할 & 링크
pairs = scan_pairs(data_root)
print(f"총 파일쌍: {len(pairs)}")
train_pairs, val_pairs = split_and_link(pairs, prep_dir, train_ratio=0.8)
print(f"학습: {len(train_pairs)} / 검증: {len(val_pairs)}")

총 파일쌍: 204
학습: 163 / 검증: 41


#### 라벨 관련 함수들

In [8]:
# -------------------------
# 라벨 JSON 해석 (유연 파서)
# -------------------------
def parse_label_json(json_path: Path) -> List[Tuple[int, int, int]]:
    """
    라벨 JSON을 다양한 스키마로 수용하여 [(start, end, level), ...] 리스트로 변환.
    - end는 inclusive로 간주 (start <= frame <= end)
    - 인식 실패 시 빈 리스트
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    segs = []

    def norm_one(d):
        # 키 변형 수용
        if isinstance(d, dict):
            # 유형 1: 명시 키
            s = d.get("start_frame", d.get("frame_start", d.get("start")))
            e = d.get("end_frame", d.get("frame_end", d.get("end")))
            lvl = d.get("level", d.get("lvl", d.get("label", d.get("state"))))
            if s is not None and e is not None and lvl is not None:
                return int(s), int(e), int(lvl)
            # 유형 2: frames: [s, e]
            if "frames" in d and isinstance(d["frames"], (list, tuple)) and len(d["frames"]) >= 2:
                s, e = d["frames"][:2]
                lvl = int(d.get("level", d.get("label", -1)))
                return int(s), int(e), int(lvl)
        return None

    if isinstance(data, list):
        for item in data:
            r = norm_one(item)
            if r: segs.append(r)
    elif isinstance(data, dict):
        # 유형 3: {"segments":[{...}, ...]}
        if "segments" in data and isinstance(data["segments"], list):
            for item in data["segments"]:
                r = norm_one(item)
                if r: segs.append(r)
        # 유형 4: {"ranges": {"100-200": 2, ...}}
        elif "ranges" in data and isinstance(data["ranges"], dict):
            for k, v in data["ranges"].items():
                m = re.match(r"^\s*(\d+)\s*[-:]\s*(\d+)\s*$", str(k))
                if m:
                    s, e = int(m.group(1)), int(m.group(2))
                    segs.append((s, e, int(v)))
        # 유형 5: {"100-200": 1, "201-260": 2}
        else:
            for k, v in data.items():
                m = re.match(r"^\s*(\d+)\s*[-:]\s*(\d+)\s*$", str(k))
                if m:
                    s, e = int(m.group(1)), int(m.group(2))
                    segs.append((s, e, int(v)))

    # 정렬 & 병합은 생략(중복은 뒤값 우선)
    segs.sort(key=lambda x: (x[0], x[1]))
    return segs

def build_frame_labels(max_frame: int, segments: List[Tuple[int,int,int]], default_level=-1) -> np.ndarray:
    """
    0..max_frame 범위에 대해 프레임별 level 배열 만들기.
    """
    labels = np.full((max_frame+1,), default_level, dtype=np.int32)
    for s, e, lvl in segments:
        s = max(0, s); e = min(max_frame, e)
        if e >= s:
            labels[s:e+1] = int(lvl)
    return labels


#### CSV 로딩 관련 함수

In [9]:
def infer_fps_from_csv(df: pd.DataFrame) -> float:
    """
    CSV의 frame과 time_sec로 FPS 추정.
    """
    df = df.sort_values("frame")
    if "time_sec" in df.columns and df["time_sec"].max() > 0:
        f_span = float(df["frame"].iloc[-1] - df["frame"].iloc[0])
        t_span = float(df["time_sec"].iloc[-1] - df["time_sec"].iloc[0])
        if t_span > 0 and f_span > 0:
            return max(1.0, round(f_span / t_span, 2))
    # fallback: frame 차분의 중앙값을 1프레임으로 간주
    return 30.0  # 보수적 기본값

def load_csv(csv_path: Path) -> pd.DataFrame:
    encodings_to_try = ["utf-8", "cp949", "euc-kr"]

    if csv_path.name.startswith(".") or csv_path.name.startswith("._"):
        return pd.DataFrame()  # 빈 데이터프레임 반환 (혹은 None 처리 가능)

    print("current csv_path:", csv_path)
    
    last_err = None
    for enc in encodings_to_try:
        try:
            df = pd.read_csv(csv_path, encoding=enc)
            break
        except UnicodeDecodeError as e:
            last_err = e
    else:
        # 모든 인코딩 실패 시 에러
        raise last_err

    # 필수 컬럼 체크
    must = ["frame", "time_sec"] + FEATURE_COLS
    missing = [c for c in must if c not in df.columns]
    if missing:
        raise ValueError(f"[{csv_path.name}] 누락 컬럼: {missing}")

    df = df.sort_values("frame").reset_index(drop=True)
    return df

#### 윈도윙 함수

In [10]:
def make_windows_for_pair(
    csv_path: Path, json_path: Path,
    window_sec=5.0, hop_sec=1.0,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    하나의 (CSV, JSON) 쌍에서 (X, y) 윈도우 생성
    - X: (num_win, seq_len, num_feat)
    - y: (num_win,)  # 윈도우 대표 레벨 (다수결)
    """
    df = load_csv(csv_path)

    fps = infer_fps_from_csv(df)
    win = max(1, int(round(window_sec * fps)))
    hop = max(1, int(round(hop_sec * fps)))

    # 프레임별 라벨
    segs = parse_label_json(json_path)
    max_frame = int(df["frame"].max())
    frame_labels = build_frame_labels(max_frame, segs, default_level=-1)

    # 특성 행렬
    feats = df[FEATURE_COLS].astype(float).copy()
    # NaN 보정(앞채움→뒤채움→0)
    feats = feats.ffill().bfill().fillna(0.0)
    feat_np = feats.to_numpy(dtype=np.float32)
    frames = df["frame"].to_numpy(dtype=np.int32)

    # 프레임 인덱스 → 라벨 매핑
    label_by_row = np.array([frame_labels[f] if f <= max_frame else -1 for f in frames], dtype=np.int32)

    X_list, y_list = [], []
    start = 0
    last_start = len(df) - win
    while start <= last_start:
        end = start + win
        seq = feat_np[start:end, :]  # (win, feat)
        seq_labels = label_by_row[start:end]  # (win,)

        # 윈도우 대표 라벨: 다수결(동점이면 마지막 프레임값)
        cnt = Counter(seq_labels.tolist())
        most = max(cnt.items(), key=lambda kv: (kv[1], kv[0]))[0]
        y = most

        X_list.append(seq)
        y_list.append(y)
        start += hop

    if not X_list:
        return np.empty((0, win, len(FEATURE_COLS)), dtype=np.float32), np.empty((0,), dtype=np.int32)

    X = np.stack(X_list, axis=0).astype(np.float32)
    y = np.array(y_list, dtype=np.int32)
    return X, y

#### numpy 데이타셋 만들기

In [11]:
window_sec = 10.
hop_sec = 1.

In [12]:
def build_numpy_dataset(pairs: List[Tuple[Path,Path]], window_sec=5.0, hop_sec=1.0):
    Xs, ys = [], []
    for csv_path, json_path in pairs:
        Xi, yi = make_windows_for_pair(csv_path, json_path, window_sec, hop_sec)
        if len(Xi) > 0:
            Xs.append(Xi); ys.append(yi)
    if not Xs:
        raise RuntimeError("윈도우가 생성되지 않았습니다. 입력 파일/라벨을 확인하세요.")

    max_len = max(X.shape[1] for X in Xs)
    Xs_padded = [
        np.pad(X, ((0,0),(0,max_len-X.shape[1]),(0,0)), mode="constant")
        if X.shape[1] < max_len else X
        for X in Xs
    ]
    
    X = np.concatenate(Xs_padded, axis=0)
    y = np.concatenate(ys, axis=0)
    return X, y

In [13]:
# 넘파이 데이터셋
Xtr, ytr = build_numpy_dataset(train_pairs, window_sec, hop_sec)
Xva, yva = build_numpy_dataset(val_pairs, window_sec, hop_sec)

current csv_path: D:\파이널 프로젝트 데이터\dataset\20-FemaleNoGlasses-Yawning-labeled.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\27-MaleGlasses-Normal-label.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\10-MaleNoGlasses-Normal-label.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\12-MaleGlasses-Normal-label.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\19-FemaleNoGlasses-Normal-labeled.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\43-MaleNoGlasses-Normal-label.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\gA_2_s5_2019-03-13T09;19;23+01;00_rgb_face-labeled.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\37-FemaleNoGlasses-Talking&Yawning-labeled.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\25-FemaleNoGlasses-Yawning-labeled.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\45-MaleNoGlasses-Yawning-label.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\33-FemaleNoGlasses-Normal-labeled.csv
current csv_path: D:\파이널 프로젝트 데이터\dataset\28-MaleGlasses-Yawning-label.csv
current csv_path: D:\파이널 프로젝트 데이터\datase

#### 스케일링..

In [14]:
def fit_standardizer(X: np.ndarray):
    """
    채널별 표준화 스케일러(평균/표준편차) 반환.
    """
    # X shape: (N, T, C)
    mean = X.reshape(-1, X.shape[-1]).mean(axis=0)
    std = X.reshape(-1, X.shape[-1]).std(axis=0)
    std = np.where(std < 1e-8, 1.0, std)
    return mean.astype(np.float32), std.astype(np.float32)

def apply_standardizer(X: np.ndarray, mean: np.ndarray, std: np.ndarray):
    return (X - mean) / std

In [15]:
# 스케일링 (학습 기반)
mean, std = fit_standardizer(Xtr)
Xtr_scaled = apply_standardizer(Xtr, mean, std)
Xva_scaled = apply_standardizer(Xva, mean, std)

### 산출물 검증 파트

In [16]:
npz = np.load("scaler_mean_std.npz")
npz_mean = npz['mean']
npz_std = npz['std']
npz.close()

print(npz_mean == mean)
print(npz_std == std)

[ True  True  True  True  True  True]
[ True  True  True  True  True  True]


In [17]:
npz_Xtr = apply_standardizer(Xtr, npz_mean, npz_std)
npz_Xva = apply_standardizer(Xva, npz_mean, npz_std)

In [18]:
def to_tf_dataset(X: np.ndarray, y: np.ndarray, batch=64, shuffle=True):
    # y를 인덱스로 변환(-1..3 → 0..4)
    y_idx = np.array([LEVEL_TO_INDEX[int(v)] for v in y], dtype=np.int32)
    ds = tf.data.Dataset.from_tensor_slices((X, y_idx))
    if shuffle:
        ds = ds.shuffle(min(len(X), 10000), seed=RANDOM_SEED)
    ds = ds.batch(batch).prefetch(tf.data.AUTOTUNE)
    return ds

In [19]:
# TF Dataset
batch = 64

ds_tr = to_tf_dataset(npz_Xtr, ytr, batch=batch, shuffle=True)
ds_va = to_tf_dataset(npz_Xva, yva, batch=batch, shuffle=False)

In [20]:
loaded_model = tf.keras.models.load_model("best_lstm.keras")
loaded_eval_res = loaded_model.evaluate(ds_va, verbose=0)
print(f"[VAL] loss={loaded_eval_res[0]:.4f}, acc={loaded_eval_res[1]:.4f}")

[VAL] loss=0.6963, acc=0.7313


In [21]:
loaded_model.summary()

In [23]:
# 예측 샘플 & 혼동행렬
y_true = np.array([y for _, y in ds_va.unbatch().as_numpy_iterator()])
# y_true = np.concatenate([y for _, y in ds_va.unbatch().as_numpy_iterator()], axis=0)
y_pred = []
for xb, _ in ds_va:
    pb = loaded_model.predict(xb, verbose=0)
    y_pred.append(np.argmax(pb, axis=1))
y_pred = np.concatenate(y_pred, axis=0)

# 간단 리포트
from sklearn.metrics import classification_report, confusion_matrix
labels = sorted(set(y_true) | set(y_pred))  # 실제 등장한 클래스
print(classification_report(
    y_true, y_pred,
    labels=labels,
    target_names=[str(INDEX_TO_LEVEL[i]) for i in labels]
))
print("Confusion matrix:\n", confusion_matrix(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.57      0.68      0.62       121
           1       0.69      0.66      0.67       220
           2       0.48      0.48      0.48       239
           3       0.87      0.85      0.86       678

    accuracy                           0.73      1258
   macro avg       0.65      0.67      0.66      1258
weighted avg       0.74      0.73      0.73      1258

Confusion matrix:
 [[ 82   5  34   0]
 [ 27 146  22  25]
 [ 12  54 115  58]
 [ 24   8  69 577]]
