<a href="https://colab.research.google.com/github/gaegoori/handsignproject/blob/datapreprocessing/%EA%B8%B0%ED%95%99%EA%B8%B0%EC%A0%84%EC%B2%98%EB%A6%AC_%EB%8D%B0%EC%9D%B4%ED%84%B0%EC%A6%9D%EA%B0%95.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Drive 마운트
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
%%bash
SRC="/content/drive/MyDrive/keypoints_all"
DST="/content/drive/MyDrive/keypoints_160F"

mkdir -p "$DST"

echo "정확한 WORD0001~0160 + REAL01~16 + F 버전만 추출 시작"

for z in "$SRC"/*.zip; do
    echo "ZIP 처리 중: $z"

    # 정확한 숫자 범위만 포함 (0001~0160)
    for i in $(seq -f "%04g" 1 160); do
        unzip -qq "$z" "*/NIA_SL_WORD${i}_REAL*_F*" -d "$DST"
    done
done

echo "추출 완료"
echo "저장 위치: $DST"

Process is terminated.


In [None]:
import os
from zipfile import ZipFile

zip_path = "/content/drive/MyDrive/01_real_word_morpheme.zip"   # 네가 업로드한 경로
extract_to = "/content/drive/MyDrive/morpheme_full"

os.makedirs(extract_to, exist_ok=True)

with ZipFile(zip_path, 'r') as zf:
    zf.extractall(extract_to)

print("압축해제 완료:", extract_to)

압축해제 완료: /content/drive/MyDrive/morpheme_full


In [None]:
import os, json, glob
from tqdm import tqdm

# 0. 경로 설정
# Keypoint 폴더의 루트 경로
keypoint_root = "/content/drive/MyDrive/keypoints_160F"
# Morpheme 파일의 루트 경로
morpheme_root = "/content/drive/MyDrive/morpheme_full/morpheme"
# 최종 JSON 파일이 저장될 경로
output_dir = "/content/drive/MyDrive/sign_data/fast_index"

os.makedirs(output_dir, exist_ok=True)

# ============================
# 1. WORD 0001 ~ 0160 자동 생성
# ============================
selected_words = {f"NIA_SL_WORD{str(i).zfill(4)}" for i in range(1, 161)}
print(f"선택된 단어 수: {len(selected_words)}개 (160개면 정상)")


# ============================
# 2. morpheme 파일 로드 및 REAL ID 딕셔너리 생성
# (NIA_SL_WORD0001_REAL02 → 파일 경로)
# ============================
morpheme_files = glob.glob(os.path.join(morpheme_root, "**/*.json"), recursive=True)
morpheme_dict = {}

def extract_real_id_from_morpheme(filename):
    # NIA_SL_WORD0001_REAL02_morpheme.json → NIA_SL_WORD0001_REAL02 추출
    name = os.path.basename(filename).replace("_morpheme.json", "")
    return name.rsplit("_", 1)[0]

for f in tqdm(morpheme_files, desc="Morpheme 파일 로드"):
    real_id = extract_real_id_from_morpheme(f)
    base_word = real_id.split("_REAL")[0] # NIA_SL_WORD0001

    # 선택된 단어에 포함되는 경우에만 딕셔너리에 추가
    if base_word in selected_words:
        # REAL ID를 고유 키로 사용하여 1:1 매칭 준비
        morpheme_dict[real_id] = f

print(f"필터링된 morpheme (REAL ID 기준) 개수: {len(morpheme_dict)}개")


# ============================
# 3. keypoint 폴더 로드 (F 버전만)
# ============================
keypoint_dirs = []

for root, dirs, files in os.walk(keypoint_root):
    for d in dirs:
        # 예: NIA_SL_WORD0001_REAL03_F 형식의 폴더만 선택
        if d.startswith("NIA_SL_WORD") and "_REAL" in d and d.endswith("_F"):
            keypoint_dirs.append(os.path.join(root, d))

print(f"선택된 keypoint 폴더 수: {len(keypoint_dirs)}개")


def extract_real_id_from_kp(folder_name):
    # NIA_SL_WORD0001_REAL03_F → NIA_SL_WORD0001_REAL03 추출
    return folder_name.rsplit("_", 1)[0]


# ============================
# 4. keypoint ↔ morpheme 1:1 매칭 및 fast_index 생성
# ============================
fast_index = []

for kp_path in tqdm(keypoint_dirs, desc="단어 매칭 중"):
    folder_name = os.path.basename(kp_path)

    # 1:1 매칭을 위한 REAL ID 추출
    real_id = extract_real_id_from_kp(folder_name)

    # base_word 추출 (선택된 단어 확인용)
    base_word = real_id.split("_REAL")[0]

    # base_word가 선택 목록에 있고, 해당 REAL ID의 morpheme 파일이 morpheme_dict에 있는 경우 매칭
    if base_word in selected_words and real_id in morpheme_dict:
        morpheme_path = morpheme_dict[real_id]

        fast_index.append({
            "word_id": base_word,
            "real_id": real_id, # 매칭된 고유 ID
            "keypoint_path": kp_path,
            "morpheme_path": morpheme_path
        })

print(f"최종 매칭된 데이터 개수: {len(fast_index)}개")


# ============================
# 5. JSON 저장
# ============================
output_path = os.path.join(output_dir, "fast_match.json")
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(fast_index, f, indent=4, ensure_ascii=False)

print("\n 저장 완료:", output_path)

선택된 단어 수: 160개 (160개면 정상)


Morpheme 파일 로드: 100%|██████████| 159672/159672 [00:00<00:00, 353926.36it/s]


필터링된 morpheme (REAL ID 기준) 개수: 1760개
선택된 keypoint 폴더 수: 2560개


단어 매칭 중: 100%|██████████| 2560/2560 [00:00<00:00, 105017.59it/s]

최종 매칭된 데이터 개수: 1760개

 저장 완료: /content/drive/MyDrive/sign_data/fast_index/fast_match.json





In [None]:
!pip install ujson

Collecting ujson
  Downloading ujson-5.11.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (9.4 kB)
Downloading ujson-5.11.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (57 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.4/57.4 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ujson
Successfully installed ujson-5.11.0


In [None]:
import os, json, ujson, numpy as np
from tqdm import tqdm
from multiprocessing import Pool, cpu_count

# ============================
# 0. 경로 설정
# ============================
# 입력 파일: 이전 단계에서 생성된 1:1 매칭 인덱스 파일
fast_index_path = "/content/drive/MyDrive/sign_data/fast_index/fast_match.json"
# 출력 파일: Keypoint와 형태소 정보를 결합한 최종 데이터셋
output_path = "/content/drive/MyDrive/sign_data/processed/merged_dataset.json"

os.makedirs(os.path.dirname(output_path), exist_ok=True)

# 매칭 인덱스 로드
try:
    with open(fast_index_path, "r", encoding="utf-8") as f:
        pairs = json.load(f)
except FileNotFoundError:
    print(f"오류: fast_index 파일이 경로에 없습니다. 경로를 확인해주세요: {fast_index_path}")
    pairs = []
except Exception as e:
    print(f"오류: fast_index 파일 로드 중 예외 발생: {e}")
    pairs = []

print(f"처리할 매칭 쌍 개수: {len(pairs)}개")


# ============================
# 1. Keypoint 데이터 로드 및 정제 함수
# ============================
def load_person_data(fp):
    """프레임 하나 빠르게 읽고 (N, 3) Keypoint 배열 반환"""
    try:
        # ujson으로 파일 빠르게 로드
        with open(fp, "r") as f:
            kf = ujson.load(f)
    except:
        return None

    # 'people' 데이터 추출 및 형식 통일
    people = kf.get("people", [])
    if isinstance(people, dict):
        people = [people.get("0", people)]

    if not people:
        return None

    person = people[0]

    def safe_array(key):
        """키포인트를 안전하게 로드하고 (N, 3) 형태로 변환"""
        arr = person.get(key, [])
        if not arr:
            # 데이터가 없는 경우 (0, 3) 배열 반환
            return np.zeros((0, 3), dtype=np.float32)

        arr = np.array(arr, dtype=np.float32)
        # 2D 키포인트는 (N*3,) 형태이므로 (N, 3)으로 재구성
        return arr.reshape(-1, 3)

    # Keypoint 추출 및 연결 순서: Pose(25) + LeftHand(21) + RightHand(21) + Face(70) = 137 Keypoints
    p = safe_array("pose_keypoints_2d")
    lh = safe_array("hand_left_keypoints_2d")
    rh = safe_array("hand_right_keypoints_2d")
    face = safe_array("face_keypoints_2d")

    # 모든 키포인트 연결
    full = np.concatenate([p, lh, rh, face], axis=0)

    # 신뢰도(Confidence, 3번째 열)가 0.5 미만인 키포인트는 0으로 설정
    full[full[:, 2] < 0.5] = 0

    return full


# ============================
# 2. 병렬 처리 함수
# ============================
def process_item(item):
    """단어 1개의 Keypoint와 Morpheme을 결합하여 프레임 시퀀스 생성"""
    keypoint_dir = item["keypoint_path"]
    morpheme_path = item["morpheme_path"]
    fps = 30 # 초당 프레임 수 (NIA 수어 데이터셋 표준)
    real_id = item.get("real_id", "N/A") # real_id 가져오기

    # Morpheme 로드
    try:
        with open(morpheme_path, "r", encoding="utf-8") as f:
            mor = ujson.load(f)
        segments = mor.get("data", [])
        if not isinstance(segments, list):
             segments = [segments]
    except Exception:
        # print(f"Morpheme 로드 오류: {morpheme_path}")
        return []

    # Keypoint 프레임 파일 목록
    frame_files = sorted([
        os.path.join(keypoint_dir, f)
        for f in os.listdir(keypoint_dir)
        if f.endswith(".json")
    ])

    if not frame_files:
        return []

    # 모든 프레임 데이터 로드
    frames = []
    for fp in frame_files:
        full = load_person_data(fp)
        # 키포인트 개수 확인 (137개 = 25+21+21+70)
        if full is not None and full.shape[0] == 137:
            frames.append(full)

    if not frames:
        return []

    frames = np.stack(frames).astype(np.float32)
    results = []

    # 형태소 구간별 잘라서 저장
    for seg in segments:
        try:
            start_idx = int(seg["start"] * fps)
            end_idx = int(seg["end"] * fps)

            # 유효성 검사
            if start_idx >= len(frames) or end_idx <= start_idx:
                continue

            # 프레임 자르기
            cut = frames[start_idx:min(end_idx, len(frames))]

            results.append({
                "word_id": item["word_id"],
                "real_id": real_id,
                "label": seg["attributes"], # 형태소 레이블
                "frames": cut.tolist() # 넘파이 배열을 리스트로 변환
            })
        except Exception:
             # print(f"세그먼트 처리 오류: {real_id} - {seg}")
             continue

    return results


# ============================
# 3. 단어 전체를 병렬 처리 및 데이터 병합
# ============================
merged_data = []
if pairs:
    # CPU 코어 수만큼 Pool 생성
    core_count = cpu_count()
    print(f"사용 가능한 CPU 코어 수: {core_count}")
    with Pool(core_count) as P:
        # imap_unordered를 사용하여 결과를 비동기적으로 가져옴
        for res in tqdm(P.imap_unordered(process_item, pairs), total=len(pairs), desc="데이터 결합 중"):
            merged_data.extend(res)

# ============================
# 4. JSON 저장 (ujson 사용)
# ============================
if merged_data:
    try:
        print("\n데이터를 JSON 파일로 저장 중...")
        with open(output_path, "w") as f:
            ujson.dump(merged_data, f)

        print("\n 완료")
        print(f"최종 저장된 형태소 세그먼트 개수: {len(merged_data)}개")
        print(f"경로: {output_path}")
    except Exception as e:
        print(f"저장 오류: {e}")
else:
    print("\n 처리된 데이터가 없어 저장하지 않습니다.")

처리할 매칭 쌍 개수: 1760개
사용 가능한 CPU 코어 수: 2


데이터 결합 중: 100%|██████████| 1760/1760 [10:22:26<00:00, 21.22s/it]



데이터를 JSON 파일로 저장 중...

✅ 완료
최종 저장된 형태소 세그먼트 개수: 1757개
경로: /content/drive/MyDrive/sign_data/processed/merged_dataset.json


In [None]:
# 0. JSON 로딩 + 기본 구조 확인
import numpy as np
import json
from tensorflow.keras.preprocessing.sequence import pad_sequences

# 1. 파일 경로
file_path = '/content/drive/MyDrive/sign_data/processed/merged_dataset.json'

print("데이터 로드 시작...")
with open(file_path, 'r') as f:
    data_json = json.load(f)
print("로드 완료. 총 세그먼트 수:", len(data_json))

# 첫 항목 구조 확인
print("첫 샘플 키:", list(data_json[0].keys()))


데이터 로드 시작...
로드 완료. 총 세그먼트 수: 1757
첫 샘플 키: ['word_id', 'real_id', 'label', 'frames']


In [None]:
# 1. frames 데이터만 추출 → padding → raw_data 만들기
print("frames 추출 시작...")

keypoint_data_list = [item["frames"] for item in data_json]

print(f"총 {len(keypoint_data_list)}개 샘플 padding 중...")

padded_data = pad_sequences(keypoint_data_list, padding='post', dtype='float32')

raw_data = np.array(padded_data)

print("padding 완료. raw_data shape:", raw_data.shape)


frames 추출 시작...
총 1757개 샘플 padding 중...
padding 완료. raw_data shape: (1757, 106, 137, 3)


In [None]:
# 2. raw_data 4D → (N,T,F*3) 형태로 변환
# raw_data: (N, T, 137, 3)

try:
    N, T, P, C = raw_data.shape  # P=137, C=3
    F = P * C
    raw_data = raw_data.reshape(N, T, F)
    print("reshape 완료 →", raw_data.shape)
except:
    print("이미 (N,T,F) 형태일 가능성 있음.")
    raw_data = raw_data  # 그대로 사용


reshape 완료 → (1757, 106, 411)


In [None]:
# 3. Mid-Hip 기준 상대좌표 변환
def apply_midhip_relative(raw):
    print("Mid-Hip 상대좌표 변환 중...")
    N, T, F = raw.shape
    num_points = F // 3
    REF = 0  # Mid-Hip index

    ref_point = raw[:, :, REF*3:(REF+1)*3]
    ref_point_expand = np.tile(ref_point, (1,1,num_points))

    rel = raw - ref_point_expand
    print("완료:", rel.shape)
    return rel

X_midhip = apply_midhip_relative(raw_data)

Mid-Hip 상대좌표 변환 중...
완료: (1757, 106, 411)


In [None]:
# 4. 핵심 Keypoint 선택 (총 18개 → 54차원)
POSE_IDXS = [0,1,2,3,4,5,6,7]
HAND_IDXS = [4,8,12,16,20]

def select_keypoints(frame_137x3):
    pose = frame_137x3[:25]
    lh   = frame_137x3[25:46]
    rh   = frame_137x3[46:67]

    selected = []
    for idx in POSE_IDXS: selected.append(pose[idx])
    for idx in HAND_IDXS: selected.append(lh[idx])
    for idx in HAND_IDXS: selected.append(rh[idx])

    selected = np.array(selected)
    return selected.reshape(-1)  # 54

def apply_keypoint_selection(X):
    print("핵심 keypoint 선택 중...")
    N,T,F = X.shape
    num_points = F // 3
    X_reshaped = X.reshape(N,T,num_points,3)

    X_new = []
    for i in range(N):
        seq_new = []
        for t in range(T):
            seq_new.append(select_keypoints(X_reshaped[i,t]))
        X_new.append(seq_new)

    print("완료:", np.array(X_new).shape)
    return np.array(X_new)

X_selected = apply_keypoint_selection(X_midhip)

핵심 keypoint 선택 중...
완료: (1757, 106, 54)


In [None]:
# 5. confidence 0 구간 interpolation
def interpolate_low_confidence(frames, th=0.5):
    T,F = frames.shape
    P = F//3
    arr = frames.reshape(T,P,3)

    for p in range(P):
        for d in range(2):
            coords = arr[:,p,d]
            conf   = arr[:,p,2]

            low = np.where(conf < th)[0]
            for idx in low:
                prev = idx-1
                next = idx+1
                while prev>=0 and arr[prev,p,2]<th: prev-=1
                while next<T and arr[next,p,2]<th: next+=1
                if prev<0 or next>=T: continue
                arr[idx,p,d] = (arr[prev,p,d]+arr[next,p,d])/2
                arr[idx,p,2] = max(arr[prev,p,2], arr[next,p,2])

    return arr.reshape(T,F)

print("보간 시작...")
X_interp = np.zeros_like(X_selected)
for i in range(len(X_selected)):
    X_interp[i] = interpolate_low_confidence(X_selected[i])
print("보간 완료:", X_interp.shape)


보간 시작...
보간 완료: (1757, 106, 54)


In [None]:
# 6. Label 인코딩
from sklearn.preprocessing import LabelEncoder

labels_text = np.array([item["label"][0]["name"] for item in data_json])
encoder = LabelEncoder()
y = encoder.fit_transform(labels_text)

print("총 클래스:", len(encoder.classes_))


총 클래스: 163


In [None]:
# 7. Train/Test split (stratify 없음)
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X_interp, y,
    test_size=0.2,
    random_state=42,
    shuffle=True
)

print("Train:", X_train.shape)
print("Test :", X_test.shape)



Train: (1405, 106, 54)
Test : (352, 106, 54)


In [None]:
# 8. StandardScaler
from sklearn.preprocessing import StandardScaler

def scale_per_feature(X_train, X_test):
    scaler = StandardScaler()

    # F(특징) 기준 scaling
    X_train_scaled = scaler.fit_transform(
        X_train.reshape(-1, X_train.shape[-1])
    ).reshape(X_train.shape)

    X_test_scaled = scaler.transform(
        X_test.reshape(-1, X_test.shape[-1])
    ).reshape(X_test.shape)

    return X_train_scaled, X_test_scaled, scaler

X_train_scaled, X_test_scaled, scaler = scale_per_feature(X_train, X_test)
print("스케일링 완료")


스케일링 완료


In [None]:
# 모델 학습 준비 완료
X_train_final = X_train_scaled
X_test_final = X_test_scaled
y_train_final = y_train
y_test_final = y_test

print("학습 준비 완료")

학습 준비 완료


In [None]:
with open(file_path, 'r') as f:
    data_json = json.load(f)

print("로드 완료. 총 세그먼트 수:", len(data_json))

로드 완료. 총 세그먼트 수: 1757
