In [None]:
# Path configuration
# NOTE(review): DATASET_PATH is an absolute, machine-specific Windows path; every
# other path below is derived from it, so this is the single value to change when
# running the notebook elsewhere.
DATASET_PATH = "C:\\Users\\chaeyeonhan\\OneDrive\\2025-1\\capstone_design"
# Directories holding the training / test videos.
VIDEO_PATH = f"{DATASET_PATH}/data/top100_videos_only"
TEST_VIDEO_PATH = f"{DATASET_PATH}/data/test_videos"
# Excel sheets with the labels for the training / test videos.
LABEL_EXCEL = f"{DATASET_PATH}/data/KETI_top100_clean_words.xlsx"
TEST_LABEL_EXCEL = f"{DATASET_PATH}/data/test_data.xlsx"

# 전처리


In [None]:
# 01. label 불러오기 및 영상번호 매핑
import pandas as pd
import os
import unicodedata

def load_label_info(label_excel_path):
    """Load the label sheet and build the label lookup tables.

    The sheet must contain a '파일명' (file name) column and a '한국어'
    (Korean word label) column.

    Args:
        label_excel_path: Path of the Excel file to read.

    Returns:
        A tuple ``(label_df, label_map, video_to_label)`` where
        * ``label_df`` is the loaded frame with an added '파일ID' column
          (the file name without its extension),
        * ``label_map`` maps each distinct NFC-normalised label to a
          contiguous integer index (labels sorted, indices 0..n-1),
        * ``video_to_label`` maps each file ID to its NFC-normalised label.
    """
    label_df = pd.read_excel(label_excel_path)
    label_df['파일ID'] = label_df['파일명'].apply(lambda x: os.path.splitext(x)[0])

    # Normalise once and reuse: the same word can appear in both NFC and NFD
    # form, and those must collapse to a single class.
    normalized_labels = [unicodedata.normalize("NFC", l) for l in label_df['한국어']]

    # Values are normalised so they can be looked up in label_map directly.
    video_to_label = dict(zip(label_df['파일ID'], normalized_labels))

    # De-duplicate AFTER normalisation; otherwise duplicate entries in the
    # sorted list would leave gaps in the class indices.
    labels = sorted(set(normalized_labels))
    label_map = {label: idx for idx, label in enumerate(labels)}
    return label_df, label_map, video_to_label

# Build the lookup tables for the training sheet and, separately, for the test sheet.
# NOTE(review): test_label_map is indexed independently of label_map, so the same
# word may get a different index in each; the code below only ever looks labels up
# in label_map.
label_df, label_map, video_to_label = load_label_info(LABEL_EXCEL)
test_label_df, test_label_map, test_video_to_label = load_label_info(TEST_LABEL_EXCEL)

In [None]:
# Sampling
def sample_paths(npy_files, target_len=60):
    """Return exactly ``target_len`` items drawn from ``npy_files``.

    Sequences that are long enough are sub-sampled at evenly spaced positions
    (first and last item always kept); shorter ones are padded at the end by
    repeating their last item.

    Args:
        npy_files: List of items (paths or already-loaded frames).
        target_len: Desired output length.

    Returns:
        A new list of length ``target_len``.
    """
    n_available = len(npy_files)

    if n_available < target_len:
        # Too short: repeat the final element until the target length is reached.
        missing = target_len - n_available
        return npy_files + [npy_files[-1]] * missing

    # Long enough: pick evenly spaced positions, truncated to integers.
    picks = np.linspace(0, n_available - 1, target_len).astype(int)
    return [npy_files[k] for k in picks]

In [None]:
#샘플링할 때 datalist만드는 함수
import numpy as np
import os
import unicodedata

# Keypoint (.npy) directories for train / test, derived from DATASET_PATH so the
# dataset root is configured in exactly one place. With the Windows root used in
# this notebook, os.path.join yields the same backslash-separated strings as the
# previous hard-coded literals.
MP_DATA_PATH = os.path.join(DATASET_PATH, "data", "MP_Data")
MP_DATA_PATH_TEST = os.path.join(DATASET_PATH, "data", "MP_Data_Test")

def process_datalist(data_path, sequence_length=60, label_to_idx=None):
    """Collect ``(frame_paths, label_index)`` pairs from a keypoint directory tree.

    Expected layout: ``data_path/<label>/<video_id>/*.npy``.

    Args:
        data_path: Root directory containing one sub-directory per label.
        sequence_length: Number of frame paths kept per video (via ``sample_paths``).
        label_to_idx: Optional mapping from NFC-normalised label to class index.
            Defaults to the notebook-level ``label_map``.

    Returns:
        A list of ``(selected_files, label_idx)`` tuples, one per video that has
        at least one ``.npy`` file and a known label. Entries are produced in
        sorted directory order so that a seeded split is reproducible.
    """
    import re

    if label_to_idx is None:
        label_to_idx = label_map

    def _natural_key(path):
        # Sort "2.npy" before "10.npy": split the file name into alternating
        # text / digit chunks and compare digit chunks numerically.
        # Presumably frame files are named by frame index - TODO confirm.
        chunks = re.split(r'(\d+)', os.path.basename(path))
        return [int(c) if c.isdigit() else c for c in chunks]

    data_list = []
    for label_dir in sorted(os.listdir(data_path)):
        # Build the path from the name as it exists on disk; the normalised
        # form is used only for the label lookup.
        label_path = os.path.join(data_path, label_dir)
        if not os.path.isdir(label_path):
            continue

        label = unicodedata.normalize("NFC", label_dir)
        label_idx = label_to_idx.get(label)  # same for every video of this label

        for video_id in sorted(os.listdir(label_path)):
            video_path = os.path.join(label_path, video_id)
            if not os.path.isdir(video_path):
                # Stray files inside a label folder are not videos.
                continue

            npy_files = sorted(
                (os.path.join(video_path, f) for f in os.listdir(video_path) if f.endswith(".npy")),
                key=_natural_key,
            )
            if len(npy_files) == 0:
                continue

            if label_idx is None:
                print(f"Warning: Label '{label}' not found in label_map. Skipping this video.")
                continue

            selected_files = sample_paths(npy_files, target_len=sequence_length)
            data_list.append((selected_files, label_idx))

    return data_list

# Build the (frame paths, label index) lists for the training and test trees.
# Both calls resolve labels through the training label_map.
train_data = process_datalist(MP_DATA_PATH)
test_data = process_datalist(MP_DATA_PATH_TEST)

In [None]:
#데이터 증강
import os
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, Input, Masking, Attention, Bidirectional,GlobalAveragePooling1D, BatchNormalization
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping

### 증강 함수들
def add_gaussian_noise(data, std=0.01):
    """Return ``data`` plus zero-mean Gaussian noise with standard deviation ``std``.

    The noise has the same shape as ``data``; the input array is not modified.
    """
    noise = np.random.normal(0, std, data.shape)
    return data + noise

def time_masking(data, max_mask_ratio=0.2):
    """Zero out randomly chosen frames (rows along axis 0) of ``data``.

    Exactly ``int(num_frames * max_mask_ratio)`` distinct frames are masked on
    every call. A masked copy is returned; the input is left untouched.
    """
    n_frames = data.shape[0]
    n_to_mask = int(n_frames * max_mask_ratio)
    # Sample without replacement so the masked frames are all different.
    drop_idx = np.random.choice(n_frames, n_to_mask, replace=False)

    result = data.copy()
    result[drop_idx] = 0
    return result

def normalize_frames(frames):
    """Standardise every frame independently.

    Each frame has its own mean subtracted and is divided by its own standard
    deviation (plus 1e-6 to avoid division by zero).

    Returns:
        A list with one normalised array per input frame.
    """
    normalized = []
    for frame in frames:
        centered = frame - np.mean(frame)
        normalized.append(centered / (np.std(frame) + 1e-6))
    return normalized

def smooth_labels(labels, smoothing=0.1):
    """Apply label smoothing to a 2-D label matrix of shape (samples, classes).

    Every entry is scaled by ``1 - smoothing`` and then ``smoothing / n_classes``
    is added, where ``n_classes`` is the size of axis 1.
    """
    uniform_share = smoothing / labels.shape[1]
    return labels * (1 - smoothing) + uniform_share

# Speed distortion (time warping)
def time_warping(frames, stretch_factor=0.1):
    """Resample ``frames`` along time to a randomly stretched or compressed length.

    A factor is drawn uniformly from ``[1 - stretch_factor, 1 + stretch_factor]``;
    the output has ``int(len(frames) * factor)`` frames taken at evenly spaced
    (integer-truncated) positions of the input.

    Returns:
        A NumPy array of the selected frames.
    """
    n_frames = len(frames)
    scale = 1 + np.random.uniform(-stretch_factor, stretch_factor)
    new_len = int(n_frames * scale)

    positions = np.linspace(0, n_frames - 1, new_len).astype(int)
    # Guard against any position falling outside the valid frame range.
    positions = positions.clip(0, n_frames - 1)
    return np.asarray(frames)[positions]

# Add a small random perturbation to every keypoint value
def jitter(frames, sigma=0.01):
    """Return ``frames`` with element-wise Gaussian noise (mean 0, std ``sigma``) added."""
    perturbation = np.random.normal(0, sigma, frames.shape)
    return frames + perturbation


# Shift frames in time
def temporal_shift(seq, shift_max=5):
    """Shift a 2-D sequence (frames, features) forward or backward in time.

    A shift is drawn uniformly from ``[-shift_max, shift_max]``. Positive values
    delay the sequence (the first frame is repeated at the start), negative
    values advance it (the last frame is repeated at the end). The output has
    the same shape as the input.

    The shift is clamped to ``len(seq) - 1`` so at least one real frame always
    remains to pad from; without the clamp a sequence no longer than the drawn
    shift would be sliced to empty and ``np.pad(mode='edge')`` would raise.
    """
    shift = np.random.randint(-shift_max, shift_max + 1)

    # Never shift the whole sequence out of the window.
    limit = max(len(seq) - 1, 0)
    shift = int(np.clip(shift, -limit, limit))

    if shift > 0:
        return np.pad(seq[:-shift], ((shift, 0), (0, 0)), mode='edge')
    elif shift < 0:
        return np.pad(seq[-shift:], ((0, -shift), (0, 0)), mode='edge')
    else:
        return seq


### Dataset construction with integrated augmentation
def make_augmented_dataset(split_data, augment_multiplier=10, noise_std=0.01, sequence_length=60):
    """Load keypoint sequences and append randomly augmented copies.

    For every ``(npy_files, label)`` entry the frames are loaded, brought to
    ``sequence_length`` frames with ``sample_paths``, normalised per frame and
    stacked. Then ``augment_multiplier`` augmented variants are produced, each
    by applying one or two randomly chosen augmentations (Gaussian noise,
    time masking, temporal shift, jitter) followed by the same length
    correction and normalisation.

    Args:
        split_data: Iterable of ``(list_of_npy_paths, label_index)`` tuples.
        augment_multiplier: Number of augmented copies per original sequence
            (0 disables augmentation).
        noise_std: Standard deviation passed to ``add_gaussian_noise``.
        sequence_length: Number of frames per output sequence.

    Returns:
        ``(X, y)`` as NumPy arrays; ``X`` holds the stacked sequences and ``y``
        the matching label indices.
    """
    def _gaussian_noise(arr):
        # Bind the caller-supplied std so the noise_std argument takes effect.
        return add_gaussian_noise(arr, std=noise_std)

    # Loop-invariant, so built once.
    aug_funcs = [_gaussian_noise, time_masking, temporal_shift, jitter]

    X, y = [], []
    for npy_files, label in tqdm(split_data):
        frames = [np.load(npy_file) for npy_file in npy_files]
        frames = sample_paths(frames, target_len=sequence_length)
        frames = normalize_frames(frames)

        # (sequence_length, n_features); the original author notes (60, 1629).
        original = np.stack(frames)
        X.append(original)
        y.append(label)

        for _ in range(augment_multiplier):
            aug = original.copy()

            # Pick 1 or 2 distinct augmentations and apply them in the drawn order.
            n_selected = np.random.randint(1, 3)
            selected = np.random.choice(len(aug_funcs), size=n_selected, replace=False)
            for func_idx in selected:
                aug = aug_funcs[func_idx](aug)

            # Restore the frame count, then re-normalise.
            aug = sample_paths(list(aug), target_len=sequence_length)
            aug = np.stack(normalize_frames(aug))

            X.append(aug)
            y.append(label)

    return np.array(X), np.array(y)




# 학습

## PCA의 최적 K값 찾기

In [None]:
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import numpy as np

def find_optimal_k_with_pca(X, threshold=0.95):
    """Find the smallest number of principal components reaching ``threshold``.

    ``X`` is a 3-D array ``(N, T, D)``; it is flattened to ``(N*T, D)`` so every
    frame is one PCA sample. The cumulative explained-variance curve is plotted
    with the threshold and the chosen ``k`` marked.

    Args:
        X: Array of shape ``(N, T, D)``.
        threshold: Target cumulative explained-variance ratio.

    Returns:
        ``(optimal_k, pca)`` - the 1-based component count and the fitted PCA.
        If the threshold is never reached, all fitted components are used.
    """
    N, T, D = X.shape
    X_flat = X.reshape(-1, D)

    # Fit all available components; PCA cannot produce more components than
    # min(n_samples, n_features).
    n_components = min(X_flat.shape)
    pca = PCA(n_components=n_components)
    pca.fit(X_flat)

    # Cumulative explained-variance ratio.
    cum_var_ratio = np.cumsum(pca.explained_variance_ratio_)

    # First k whose cumulative ratio reaches the threshold. argmax on an
    # all-False mask would return 0, so that case is handled explicitly.
    reached = cum_var_ratio >= threshold
    if reached.any():
        optimal_k = int(np.argmax(reached)) + 1
    else:
        optimal_k = len(cum_var_ratio)

    # Visualisation: x axis is the 1-based component count so the vertical
    # marker lines up with the curve.
    component_counts = np.arange(1, len(cum_var_ratio) + 1)
    plt.figure(figsize=(8, 5))
    plt.plot(component_counts, cum_var_ratio, marker='o')
    plt.axhline(y=threshold, color='r', linestyle='--')
    plt.axvline(x=optimal_k, color='g', linestyle='--')
    plt.title("Cumulative Explained Variance Ratio")
    plt.xlabel("Number of Components (k)")
    plt.ylabel("Cumulative Variance")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    print(f" {threshold*100:.0f}% 분산을 설명하기 위한 최적의 k: {optimal_k}")
    return optimal_k, pca


In [None]:
# Stratified 80/20 split of the training list, then load/augment each part.
train, val = train_test_split(train_data, test_size=0.2, random_state=42,
                              stratify=[label for _, label in train_data])

# Only the training part is augmented; validation keeps the original sequences.
X_train_raw, y_train = make_augmented_dataset(train, augment_multiplier=10)
X_val_raw, y_val = make_augmented_dataset(val, augment_multiplier=0)

# One-hot encode with an explicit class count. Without it the width is inferred
# from the largest index present, so train and val could end up with different
# widths when a high-index class is missing from one split.
y_train = to_categorical(y_train, num_classes=len(label_map))
y_val = to_categorical(y_val, num_classes=len(label_map))
# Label smoothing is applied to the training targets only.
y_train = smooth_labels(y_train, smoothing=0.1)

In [None]:
# Estimate how many principal components are needed for 95% of the variance.
# NOTE(review): the next section hard-codes best_k = 450 and does not read optimal_k.
optimal_k, pca_model = find_optimal_k_with_pca(X_train_raw, threshold=0.95)

## 찾은 K로 PCA 적용해 학습 -> 하이퍼파라미터 튜너 적용

In [None]:
import os

import joblib
import numpy as np
import pandas as pd
from keras_tuner import HyperModel  # 수정됨
from keras_tuner.tuners import BayesianOptimization
from sklearn.decomposition import PCA
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import LSTM, GRU, Dense, Dropout, Input, LayerNormalization, Bidirectional, Attention, GlobalAveragePooling1D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# 1. Data preparation
best_k = 450  # feature dimension after PCA

# 2. Apply PCA: fit on the flattened training frames, reuse the same projection
#    for validation, then restore the (samples, frames, features) layout.
N_train, T, D = X_train_raw.shape
X_train_flat = X_train_raw.reshape(-1, D)
pca = PCA(n_components=best_k)
X_train_pca_flat = pca.fit_transform(X_train_flat)
X_train_k = X_train_pca_flat.reshape(N_train, T, best_k)

N_val = X_val_raw.shape[0]
X_val_flat = X_val_raw.reshape(-1, D)
X_val_pca_flat = pca.transform(X_val_flat)
X_val_k = X_val_pca_flat.reshape(N_val, T, best_k)

# 3. Label handling
# y_train / y_val are already one-hot (and y_train smoothed) when the split cell
# has run. Encoding them a second time would truncate the smoothed floats to
# integers and add an extra axis, so only encode while they are still 1-D indices.
if np.ndim(y_train) == 1:
    y_train = smooth_labels(to_categorical(y_train), smoothing=0.1)
if np.ndim(y_val) == 1:
    y_val = to_categorical(y_val, num_classes=y_train.shape[1])
num_classes = y_train.shape[1]
input_shape = (T, best_k)

# 4. Tunable model definition
class LSTMGRUHyperModel(HyperModel):
    """Keras Tuner hypermodel: BiLSTM -> GRU -> self-attention -> dense classifier.

    Tuned hyperparameters: ``lstm_units``, ``lstm_dropout``, ``gru_units``,
    ``gru_dropout``, ``dense_units``, ``dense_dropout`` and ``lr``.
    """

    def __init__(self, input_shape, num_classes):
        """Store the per-sample input shape (frames, features) and the class count."""
        # Let the base class initialise its own state.
        super().__init__()
        self.input_shape = input_shape
        self.num_classes = num_classes

    def build(self, hp):
        """Build and compile a model for the hyperparameter set ``hp``."""
        inputs = Input(shape=self.input_shape)

        # Recurrent block 1: bidirectional LSTM over the whole sequence.
        x = Bidirectional(LSTM(units=hp.Int('lstm_units', 32, 128, step=32), return_sequences=True))(inputs)
        x = LayerNormalization()(x)
        x = Dropout(hp.Float('lstm_dropout', 0.2, 0.5, step=0.1))(x)

        # Recurrent block 2: GRU, still returning the full sequence.
        x = GRU(units=hp.Int('gru_units', 32, 128, step=32), return_sequences=True)(x)
        x = LayerNormalization()(x)
        x = Dropout(hp.Float('gru_dropout', 0.2, 0.5, step=0.1))(x)

        # Self-attention (query and value are the same tensor), then average over time.
        x = Attention()([x, x])
        x = GlobalAveragePooling1D()(x)

        # Classifier head. The dropout here has its own hyperparameter name;
        # reusing 'gru_dropout' would tie it to the GRU dropout value.
        x = Dense(units=hp.Int('dense_units', 32, 128, step=32), activation='relu')(x)
        x = Dropout(hp.Float('dense_dropout', 0.2, 0.5, step=0.1))(x)
        outputs = Dense(self.num_classes, activation='softmax')(x)

        model = Model(inputs, outputs)
        model.compile(
            optimizer=Adam(hp.Float('lr', 1e-4, 1e-2, sampling='log')),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        return model

# 5. Configure the tuner and start the search
hypermodel = LSTMGRUHyperModel(input_shape=input_shape, num_classes=num_classes)

# Bayesian optimisation over the hypermodel's search space, maximising validation
# accuracy. overwrite=True discards any earlier results stored under the same
# directory / project name.
tuner = BayesianOptimization(
    hypermodel,
    objective='val_accuracy',
    max_trials=15,
    executions_per_trial=1,
    directory='keras_tuner_dir',
    project_name=f'sign_tune_pca{best_k}',
    overwrite=True
)

# Halve the learning rate after 5 epochs without val_loss improvement, down to 1e-5.
lr_schedule = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-5,
    verbose=1
)

# Each trial trains for up to 150 epochs. EarlyStopping is given no explicit
# `monitor`, so it uses the Keras default; the best weights are restored on stop.
tuner.search(X_train_k, y_train,
             validation_data=(X_val_k, y_val),
             epochs=150,
             batch_size=64,
             callbacks=[EarlyStopping(patience=10, restore_best_weights=True), lr_schedule],
             verbose=1)

# 6. Save the tuning results
def extract_trials_to_dataframe(tuner, sort_by="score", top_n=None):
    """Tabulate every trial of ``tuner`` as one DataFrame row.

    Each row holds the trial's hyperparameter values plus a ``score`` column.
    Rows are sorted by ``sort_by`` in descending order and re-indexed from 0.

    Args:
        tuner: Object exposing ``oracle.trials`` (a mapping of trials, each with
            ``hyperparameters.values`` and ``score``).
        sort_by: Column used for the descending sort.
        top_n: If truthy, keep only the first ``top_n`` rows.

    Returns:
        The sorted (and optionally truncated) DataFrame.
    """
    records = [
        {**trial.hyperparameters.values, "score": trial.score}
        for trial in tuner.oracle.trials.values()
    ]
    ranked = (
        pd.DataFrame(records)
        .sort_values(by=sort_by, ascending=False)
        .reset_index(drop=True)
    )
    return ranked.head(top_n) if top_n else ranked

# Export the trial table (up to 15 rows) as CSV in the working directory.
df = extract_trials_to_dataframe(tuner, top_n=15)
df.to_csv("tuner_results_pca.csv", index=False)

# 7. Save the best model / hyperparameters
best_model = tuner.get_best_models(1)[0]
best_hp = tuner.get_best_hyperparameters(1)[0]
# Persist the fitted PCA so inference can apply the identical projection.
joblib.dump(pca, f"{DATASET_PATH}/pca_{best_k}.joblib")

val_acc = best_model.evaluate(X_val_k, y_val, verbose=0)[1]
print(f" [PCA {best_k}] 튜너 기반 최적 모델의 검증 정확도: {val_acc:.4f}")
print(" Best Hyperparameters:", best_hp.values)

# exist_ok avoids the separate existence check. The BayesianOptimization import
# that was fused onto the end of the save() line (a syntax error) now lives in
# the import block at the top of this cell.
os.makedirs(f"{DATASET_PATH}/models", exist_ok=True)
best_model.save(f"{DATASET_PATH}/models/best_pca{best_k}_hypertuning.keras")



## 찾은 best hyperparameter로 다시 학습

In [None]:
from sklearn.decomposition import PCA
import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LayerNormalization

# 1. Build the augmented data (same seeded, stratified split as before)
train, val = train_test_split(train_data, test_size=0.2, random_state=42,
                              stratify=[label for _, label in train_data])
X_train_aug_raw, y_train_aug = make_augmented_dataset(train, augment_multiplier=10)
X_val_raw, y_val = make_augmented_dataset(val, augment_multiplier=0)

# 2. Apply PCA with best_k components. best_k is used everywhere in this cell
#    (instead of a repeated literal) so the shapes, the saved PCA file name and
#    the saved model name always agree.
N_train, T, D = X_train_aug_raw.shape
X_train_flat = X_train_aug_raw.reshape(-1, D)

pca = PCA(n_components=best_k)
X_train_pca_flat = pca.fit_transform(X_train_flat)
X_train_aug = X_train_pca_flat.reshape(N_train, T, best_k)

# Transform the validation data with the same fitted PCA
N_val = X_val_raw.shape[0]
X_val_flat = X_val_raw.reshape(-1, D)
X_val_pca_flat = pca.transform(X_val_flat)
X_val = X_val_pca_flat.reshape(N_val, T, best_k)

# 3. Label handling: the class count is taken from the already one-hot y_train
#    of the earlier cells; smoothing is applied to the training targets only.
num_classes = y_train.shape[1]
y_train_aug = to_categorical(y_train_aug, num_classes=num_classes)
y_train_aug = smooth_labels(y_train_aug, smoothing=0.1)
y_val = to_categorical(y_val, num_classes=num_classes)

# 4. Model built from best_hp
# NOTE(review): the tuned hypermodel uses LayerNormalization and draws its dense
# dropout from a hyperparameter, while this model uses BatchNormalization and a
# fixed 0.3 dense dropout - confirm the difference is intentional.
input_shape = (T, best_k)
model_input = Input(shape=input_shape)

x = Bidirectional(LSTM(units=best_hp.get('lstm_units'), return_sequences=True))(model_input)
x = BatchNormalization()(x)
x = Dropout(best_hp.get('lstm_dropout'))(x)

x = GRU(units=best_hp.get('gru_units'), return_sequences=True)(x)
x = BatchNormalization()(x)
x = Dropout(best_hp.get('gru_dropout'))(x)

# Self-attention over the GRU outputs, then average over time.
x = Attention()([x, x])
x = GlobalAveragePooling1D()(x)

x = Dense(units=best_hp.get('dense_units'), activation='relu')(x)
x = Dropout(0.3)(x)
output = Dense(num_classes, activation='softmax')(x)

model = Model(model_input, output)

model.compile(
    optimizer=Adam(learning_rate=best_hp.get('lr')),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# 5. Training
early_stop = EarlyStopping(patience=10, restore_best_weights=True)
lr_schedule = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5, verbose=1)

history = model.fit(
    X_train_aug, y_train_aug,
    validation_data=(X_val, y_val),
    epochs=150,
    batch_size=64,
    callbacks=[early_stop, lr_schedule],
    verbose=1
)
# Persist the PCA fitted in this cell and the retrained model. The model is saved
# relative to the current working directory.
joblib.dump(pca, f"{DATASET_PATH}/pca_{best_k}_retrain.joblib")
model.save(f'best_model_pca{best_k}.keras')


# 테스트

In [None]:
import joblib
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import to_categorical

# 1. Load the PCA and the model
# NOTE(review): this loads the artefacts saved by the tuning step (pca_450.joblib /
# best_pca450_hypertuning.keras), not the *_retrain files written by the retraining
# cell, and it rebinds the notebook-level `pca` and `model` - confirm this is intended.
pca = joblib.load(f"{DATASET_PATH}/pca_450.joblib")
model = load_model(f"{DATASET_PATH}/models/best_pca450_hypertuning.keras")

# 2. Prepare the test data (no augmentation) and apply the loaded PCA
X_test_raw, y_test = make_augmented_dataset(test_data, augment_multiplier=0)
N_test, T, D = X_test_raw.shape
X_test_flat = X_test_raw.reshape(-1, D)
X_test_pca_flat = pca.transform(X_test_flat)
# The literal 450 must equal the number of components of the loaded PCA.
X_test = X_test_pca_flat.reshape(N_test, T, 450)

# 3. Label handling: `num_classes` comes from the training cells above.
y_test = to_categorical(y_test, num_classes=num_classes)

# 4. Evaluation
loss, acc = model.evaluate(X_test, y_test, verbose=0)
print(f"✅ PCA 기반 테스트 정확도: {acc:.4f}")


## 시각화

In [None]:
# PCA 테스트
import matplotlib.pyplot as plt

# Training vs. validation accuracy per epoch, taken from the `history` object
# returned by model.fit in the retraining cell. Axis labels are set so the
# figure can be read on its own.
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], label='Train')
ax.plot(history.history['val_accuracy'], label='Val')
ax.set(title='Accuracy', xlabel='Epoch', ylabel='Accuracy')
ax.legend()
plt.show()