In [16]:
import json

import keras.saving
import numpy as np
from glob import glob
import os

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelBinarizer # One-Hot Encoding을 위해 사용

In [None]:
# -----------------------------------------------------
# Mediapipe 33 관절 인덱스에 따른 한국어 관절 이름 매핑 테이블
# JSON에 있는 관절 이름만 포함하며, 없는 관절은 None으로 처리됨
# -----------------------------------------------------
JOINT_NAME_TO_INDEX = {
    # Head & Face
    "코": 0, "왼쪽 눈": 1, "오른쪽 눈": 2,
    # "왼쪽 귀": 7, "오른쪽 귀": 8, # JSON에 있는 관절

    # Shoulders (index 5, 6, 9, 10은 눈/귀 사이의 중간점 등으로 JSON에 없음)
    "왼쪽 귀": 7, "오른쪽 귀": 8,
    "왼쪽 어깨": 11, "오른쪽 어깨": 12,

    # Arms
    "왼쪽 팔꿈치": 13, "오른쪽 팔꿈치": 14,
    "왼쪽 손목": 15, "오른쪽 손목": 16,

    # Hands (Mediapipe는 손가락 끝까지 포함. JSON 데이터와 매핑)
    "왼쪽 엄지 손가락": 17,  # 엄지 끝
    "오른쪽 엄지 손가락": 18,  # 엄지 끝
    "왼쪽 중지 손가락": 19,  # 중지 끝
    "오른쪽 중지 손가락": 20,  # 중지 끝
    # (21, 22는 새끼손가락 끝 등으로, JSON 데이터에 없음)

    # Hips & Trunk
    "왼쪽 엉덩이": 23, "오른쪽 엉덩이": 24,  # 엉덩이

    # Legs
    "왼쪽 무릎": 25, "오른쪽 무릎": 26,
    "왼쪽 발목": 27, "오른쪽 발목": 28,
    "왼쪽 뒷꿈치": 29, "오른쪽 뒷꿈치": 30,  # 발꿈치
    "왼쪽 엄지 발가락": 31, "오른쪽 엄지 발가락": 32,  # 엄지발가락 끝
    # "왼쪽 새끼 발가락" 및 "오른쪽 새끼 발가락"은 Mediapipe 인덱스 31, 32 다음으로 매핑될 수 있지만,
    # Mediapipe 표준 33개 인덱스를 초과하지 않도록 주의. (31/32에 엄지 발가락만 매핑하고 나머지는 제외)
    # JSON에 있는 '왼쪽 새끼 발가락', '오른쪽 새끼 발가락'은 33개 인덱스 밖이므로 제외하거나, 인덱스 31, 32와 가까운 다른 곳에 매핑해야 함

    # JSON에 있지만 Mediapipe 33에 포함 안되는 항목 (또는 다른 인덱스에 해당)
    # 여기서는 JSON에 명시된 '가운데 엉덩이'를 P_ref 계산 시 사용했으므로, 배열에는 0으로 남겨둡니다.
    # '목'은 인덱스 1 (코) 근처의 중간점으로 추정할 수 있으나, 표준을 위해 제외합니다.
}

def normalize(raw_skeleton):
    normalized_skeleton = []
    for frame in raw_skeleton:
        P_ref = (frame[23] + frame[24]) / 2
        P_prime = frame - P_ref
        L = np.linalg.norm(frame[11] - frame[12])
        if L > 1e-6:
            P_double_prime = P_prime / L
        else:
            P_double_prime = P_prime  # 정규화하지 않음

        normalized_skeleton.append(P_double_prime)
    return np.array(normalized_skeleton)

def load_skeleton_from_json(json_file):
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    pose_data = data["labelingInfo"][0]["pose"]["location"]

    skeleton_array = np.zeros((33, 2), dtype=np.float32)

    # JSON 데이터를 순회하며 매핑 및 저장
    for k_name, v_coords in pose_data.items():

        # 1. 한국어 이름이 매핑 테이블에 있는지 확인
        if k_name in JOINT_NAME_TO_INDEX:
            index = JOINT_NAME_TO_INDEX[k_name]

            # 2. 좌표를 float으로 변환
            x = float(v_coords["x"])
            y = float(v_coords["y"])

            # 3. 해당 인덱스에 저장
            skeleton_array[index, 0] = x
            skeleton_array[index, 1] = y

        # (주의: JSON에 '가운데 엉덩이'가 있지만, 이는 정규화 시 P_ref 계산에 사용될 뿐,
        # Mediapipe 표준 인덱스에는 명시적으로 존재하지 않으므로 배열에 직접 저장하지 않습니다.)

    return skeleton_array

def load_and_preprocess_sequence_data(data_root):
    X_sequences = []
    Y_labels = []

    label_map = {
        '기본준비': 0,
        '앞굽이하고 아래막고 지르기': 1,
        '앞굽이하고 아래막기': 2,
        '앞굽이하고 지르기': 3,
        '앞서고 아래막기': 4,
        '앞서고 안막기': 5,
        '앞서고 얼굴막기': 6,
        '앞서고 지르기': 7,
        '앞차고 앞서고 지르기': 8
    }

    FRAME_TYPES = ['S01', 'M01', 'E01']

    grouped_data = {}


    json_files = glob(os.path.join(data_root, '**', '*.json'), recursive=True)

    for file_path in json_files:

        # 파일 경로에서 정보 추출 (경로를 역순으로 파싱)
        parts = file_path.split(os.sep)

        # 동작명은 태극1장 다음 폴더명 (예: '기본준비')
        action_name = parts[-5]  # 태극 1장 폴더명에서 5번째 상위 폴더

        filename = parts[-1]  # 파일명 (P-001-001-A035-M-B2006-S-20211109-05-01-M01.json)

        # 파일명에서 정보 추출
        name_parts = filename.split('-')

        # 시도 ID: P-001-...-05 부분
        attempt_id = '-'.join(name_parts[0:8])

        # 시점 번호: 01, 02, ..., 08
        view_id = name_parts[-2]

        # 프레임 타입: M01.json -> M01
        frame_type = name_parts[-1].split('.')[0]

        if frame_type in FRAME_TYPES:
            key = (action_name, attempt_id, view_id)

            if key not in grouped_data:
                grouped_data[key] = {}

            grouped_data[key][frame_type] = file_path

    # ------------------
    # 2. 시퀀스 구성 및 정규화
    # ------------------

    for (action_name, attempt_id, view_id), paths in grouped_data.items():
        # M01(Start), M02(Middle), M03(End) 세 프레임 경로가 모두 있는지 확인
        if all(ft in paths for ft in FRAME_TYPES):

            # 각 프레임의 데이터를 로드합니다.
            frame_data = []
            for ft in FRAME_TYPES:
                # 사용자가 제공한 load_skeleton_from_json 함수 사용
                data = load_skeleton_from_json(paths[ft])
                frame_data.append(data)

            # (3, 33, 2) 형태의 시퀀스 생성
            raw_sequence = np.stack(frame_data, axis=0)

            # 정규화 수행
            normalized_sequence = normalize(raw_sequence)

            # 데이터와 레이블 저장
            X_sequences.append(normalized_sequence)
            Y_labels.append(label_map[action_name])

    return np.array(X_sequences), np.array(Y_labels), label_map


Code

Load, Normalize, Preprocess

In [None]:
ROOT_DIR = r"C:\Users\gony4\Desktop\태권도 태극 1장"
X_sequences, Y_labels, label_map = load_and_preprocess_sequence_data(ROOT_DIR)

In [None]:
unique, counts = np.unique(Y_labels, return_counts=True)
class_counts = dict(zip(unique, counts))

# 레이블 인덱스를 실제 동작명으로 다시 매핑 (출력 가독성 향상)
# label_map을 뒤집어서 {0: '기본준비', 1: '앞굽이하고 아래막고 지르기', ...} 형태로 만듭니다.
index_to_label = {v: k for k, v in label_map.items()}

print("--- 구분동작별 샘플 개수 ---")
for index, count in class_counts.items():
    label_name = index_to_label.get(index, f"Unknown Index {index}")
    print(f"[{index}] {label_name}: {count} 개")

print(f"\n총 샘플 수: {len(Y_labels)} 개")

In [None]:
X_FILE_NAME = 'X_taekwondo_sequence_3frame_33joint.npy'
Y_FILE_NAME = 'Y_taekwondo_labels.npy'

# numpy.save를 사용하여 배열을 바이너리 파일로 저장
np.save(X_FILE_NAME, X_sequences)
np.save(Y_FILE_NAME, Y_labels)

print(f"✅ X 데이터가 '{X_FILE_NAME}' 파일로 저장되었습니다.")
print(f"✅ Y 데이터가 '{Y_FILE_NAME}' 파일로 저장되었습니다.")

In [17]:
# 저장된 파일 로드 (다음 세션 시작 시 사용)
X_FILE_NAME = 'X_taekwondo_sequence_3frame_33joint.npy'
Y_FILE_NAME = 'Y_taekwondo_labels.npy'

X_data = np.load(X_FILE_NAME)
Y_data = np.load(Y_FILE_NAME)

label_map = {
        '기본준비': 0,
        '앞굽이하고 아래막고 지르기': 1,
        '앞굽이하고 아래막기': 2,
        '앞굽이하고 지르기': 3,
        '앞서고 아래막기': 4,
        '앞서고 안막기': 5,
        '앞서고 얼굴막기': 6,
        '앞서고 지르기': 7,
        '앞차고 앞서고 지르기': 8
    }

print(f"✅ 데이터 로드 완료. X_data shape: {X_data.shape}")

# 이제 X_data_loaded와 Y_data_loaded를 사용하여 모델 학습을 바로 시작할 수 있습니다.

✅ 데이터 로드 완료. X_data shape: (19983, 3, 33, 2)


LSTM

In [18]:
# X_data: (N, 3, 33, 2) 형태의 정규화된 스켈레톤 시퀀스
# Y_data: (N,) 형태의 정수형 레이블

# 1. 입력 데이터 형태 변환 (Flatten the features)
# (N, 3, 33, 2) -> (N, 3, 66)
N = X_data.shape[0]
X_processed = X_data.reshape(N, 3, 33 * 2)

# 2. 레이블 One-Hot Encoding
# (N,) -> (N, N_classes)
N_CLASSES = len(label_map)
lb = LabelBinarizer()
Y_one_hot = lb.fit_transform(Y_data)
# Y_one_hot.shape는 (N, 9)가 됩니다.

# 3. 학습 및 검증 데이터 분리 (80% 학습, 20% 검증)
X_train, X_val, Y_train, Y_val = train_test_split(
    X_processed, Y_one_hot, test_size=0.2, random_state=42, stratify=Y_data # stratify로 클래스 비율 유지
)

# 4. 클래스 가중치 계산 (Class Weighting)
# 클래스 불균형 문제를 해결하기 위해 사용
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(Y_data),
    y=Y_data
)
class_weight_dict = dict(enumerate(class_weights))

print(f"✅ 최종 학습 데이터 형태: {X_train.shape}")
print(f"✅ 클래스 가중치 딕셔너리: {class_weight_dict}")

✅ 최종 학습 데이터 형태: (15986, 3, 66)
✅ 클래스 가중치 딕셔너리: {0: np.float64(0.5374808359557814), 1: np.float64(2.7963895885810244), 2: np.float64(1.3613325158389535), 3: np.float64(0.842951151607188), 4: np.float64(0.6147102251753415), 5: np.float64(0.8312741794583801), 6: np.float64(1.4399048854301773), 7: np.float64(1.6325980392156862), 8: np.float64(1.380804311774461)}


In [27]:
TIMESTEPS = X_train.shape[1] # 3
INPUT_DIM = X_train.shape[2] # 66

model = Sequential([
    # Input Shape: (timesteps, input_dim) = (3, 66)
    # return_sequences=False: 시퀀스 전체가 아닌 마지막 시점의 출력만 사용
    tf.keras.layers.InputLayer(shape=(TIMESTEPS, INPUT_DIM)),

    LSTM(units=128),

    # 과적합 방지를 위한 Dropout
    Dropout(0.3),

    # 분류를 위한 Dense Layer
    Dense(units=64, activation='relu'),

    # 최종 출력 Layer: 클래스 개수만큼의 유닛, softmax로 확률 출력
    Dense(N_CLASSES, activation='softmax')
])

# 모델 컴파일
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy', # 다중 분류에 적합한 손실 함수
    metrics=['accuracy']
)

model.summary()

In [28]:
# 학습 설정
EPOCHS = 50
BATCH_SIZE = 32

history = model.fit(
    X_train,
    Y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_val, Y_val),
    class_weight=class_weight_dict, # ⭐ 클래스 가중치 적용!
    verbose=1
)

print("\n✅ 모델 학습 완료.")

Epoch 1/50
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step - accuracy: 0.5563 - loss: 1.2122 - val_accuracy: 0.8596 - val_loss: 0.4260
Epoch 2/50
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.8494 - loss: 0.4221 - val_accuracy: 0.9054 - val_loss: 0.3034
Epoch 3/50
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.8875 - loss: 0.3114 - val_accuracy: 0.9072 - val_loss: 0.2534
Epoch 4/50
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.8978 - loss: 0.2738 - val_accuracy: 0.9432 - val_loss: 0.1729
Epoch 5/50
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9135 - loss: 0.2347 - val_accuracy: 0.9340 - val_loss: 0.1841
Epoch 6/50
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - accuracy: 0.9234 - loss: 0.2095 - val_accuracy: 0.9542 - val_loss: 0.1424
Epoch 7/50
[1m500/500[0m 

In [29]:
loss, accuracy = model.evaluate(X_val, Y_val, verbose=0)

print(f"✅ Validation Loss: {loss:.4f}")
print(f"✅ Validation Accuracy: {accuracy:.4f}")

✅ Validation Loss: 0.0710
✅ Validation Accuracy: 0.9770


In [30]:
from sklearn.metrics import classification_report, confusion_matrix
import pandas as pd

# 검증 데이터에 대한 예측 확률 얻기
Y_pred_probs = model.predict(X_val)

# 예측 확률을 정수형 레이블로 변환
Y_pred = np.argmax(Y_pred_probs, axis=1)

# 실제 레이블 (원-핫 인코딩 -> 정수형 레이블)
Y_true = np.argmax(Y_val, axis=1)

[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step


In [33]:
# label_map을 사용하여 Target names를 생성 (index_to_label을 사용해야 함)
# 가정: index_to_label = {0: '기본준비', 1: '앞굽이하고 아래막고 지르기', ...}
# 실제 label_map을 뒤집어서 사용해야 합니다.
index_to_label = {v: k for k, v in label_map.items()}
sorted_class_names = [index_to_label[i] for i in sorted(index_to_label.keys())]

print("\n--- 분류 보고서 (Classification Report) ---")
print(classification_report(Y_true, Y_pred, target_names=sorted_class_names))


--- 분류 보고서 (Classification Report) ---
                precision    recall  f1-score   support

          기본준비       1.00      0.99      0.99       826
앞굽이하고 아래막고 지르기       0.98      0.97      0.98       159
    앞굽이하고 아래막기       0.94      0.99      0.96       326
     앞굽이하고 지르기       0.96      0.99      0.97       527
      앞서고 아래막기       0.99      0.95      0.97       723
       앞서고 안막기       0.98      0.97      0.98       534
      앞서고 얼굴막기       0.99      1.00      0.99       308
       앞서고 지르기       0.90      0.94      0.92       272
   앞차고 앞서고 지르기       1.00      1.00      1.00       322

      accuracy                           0.98      3997
     macro avg       0.97      0.98      0.97      3997
  weighted avg       0.98      0.98      0.98      3997



In [35]:
import keras

# HDF5 형식으로 모델 저장
MODEL_SAVE_PATH = 'taekwondo_lstm_model.h5'
# model.save(MODEL_SAVE_PATH, save_format='h5')
keras.saving.save_model(model, MODEL_SAVE_PATH)

print(f"✅ 모델이 '{MODEL_SAVE_PATH}' 경로에 성공적으로 저장되었습니다.")



✅ 모델이 'taekwondo_lstm_model.h5' 경로에 성공적으로 저장되었습니다.


In [37]:
model.save('taekwondo_model.h5')
print("✅ 모델이 네이티브 Keras 형식(.keras)으로 저장되었습니다.")



✅ 모델이 네이티브 Keras 형식(.keras)으로 저장되었습니다.


In [None]:
tfjs_target_dir = 'tfjs_output'

tfjs.converters.save_keras_model(model, tfjs_target_dir)