In [103]:
import os
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import torch
import torch.nn as nn
import torch.optim as optim

# 데이터 경로 설정
base_dir = "C:/Users/Admin/Desktop/data"
categories = ["back", "squat", "side"]

def load_data(base_dir, categories, seq_len=90):
    """
    데이터 로드 및 크기 조정 (패딩 또는 잘라내기).
    Args:
        base_dir (str): 데이터가 저장된 기본 경로.
        categories (list): 행동 카테고리 리스트.
        seq_len (int): 고정된 시퀀스 길이.
    Returns:
        np.array: 데이터 배열.
        np.array: 라벨 배열.
    """
    data = []
    labels = []
    label_map = {category: idx for idx, category in enumerate(categories)}  # 라벨 매핑

    for category in categories:
        category_path = os.path.join(base_dir, category, "좌표")
        print(f"Checking files in: {category_path}")
        for i in range(1, 51):  # keypoints_1 ~ keypoints_50
            file_path = os.path.join(category_path, f"keypoints_{i}.npy")
            if os.path.exists(file_path):
                keypoints = np.load(file_path)  # 데이터 로드
                if keypoints.shape[0] > seq_len:  # 길이가 seq_len보다 길면 잘라내기
                    keypoints = keypoints[:seq_len]
                elif keypoints.shape[0] < seq_len:  # 길이가 seq_len보다 짧으면 패딩
                    pad_width = seq_len - keypoints.shape[0]
                    keypoints = np.pad(keypoints, ((0, pad_width), (0, 0), (0, 0)), mode='constant')
                data.append(keypoints)
                labels.append(label_map[category])
            else:
                print(f"File not found: {file_path}")

    return np.array(data), np.array(labels)

# 수정된 데이터 로드 실행
data, labels = load_data(base_dir, categories)
print(f"Data shape: {data.shape}, Labels shape: {labels.shape}")

Checking files in: C:/Users/Admin/Desktop/data\back\좌표
Checking files in: C:/Users/Admin/Desktop/data\squat\좌표
Checking files in: C:/Users/Admin/Desktop/data\side\좌표
Data shape: (150, 90, 33, 3), Labels shape: (150,)


In [104]:
import os
import numpy as np

# 데이터 경로 및 라벨 매핑
data_paths = {
    "back": "C:/Users/Admin/Desktop/data/back/좌표",
    "squat": "C:/Users/Admin/Desktop/data/squat/좌표",
    "side": "C:/Users/Admin/Desktop/data/side/좌표"
}
labels_map = {"back": 0, "squat": 1, "side": 2}  # 라벨 매핑

def load_data(data_paths, labels_map, seq_len=90):
    """
    데이터 로드 및 크기 조정 (패딩 또는 잘라내기).
    Args:
        data_paths (dict): 카테고리별 데이터 경로.
        labels_map (dict): 카테고리와 라벨 매핑.
        seq_len (int): 고정된 시퀀스 길이.
    Returns:
        np.array: 데이터 배열.
        np.array: 라벨 배열.
    """
    data = []
    labels = []
    for category, path in data_paths.items():
        print(f"Checking files in: {path}")
        for i in range(1, 51):  # keypoints_1 ~ keypoints_50
            file_path = os.path.join(path, f"keypoints_{i}.npy")
            if os.path.exists(file_path):
                keypoints = np.load(file_path)  # 데이터 로드
                if keypoints.shape[0] > seq_len:  # 길이가 seq_len보다 길면 잘라내기
                    keypoints = keypoints[:seq_len]
                elif keypoints.shape[0] < seq_len:  # 길이가 seq_len보다 짧으면 패딩
                    pad_width = seq_len - keypoints.shape[0]
                    keypoints = np.pad(keypoints, ((0, pad_width), (0, 0), (0, 0)), mode='constant')
                data.append(keypoints)
                labels.append(labels_map[category])
            else:
                print(f"File not found: {file_path}")
    return np.array(data), np.array(labels)

In [105]:
import os

for label_name, path in data_paths.items():
    print(f"Checking files in: {path}")
    print(os.listdir(path))

# 데이터 로드
X, y = load_data(data_paths, labels_map)

# 데이터 크기 확인
print("데이터 크기:", X.shape, y.shape)  # Expected output: (30, frames, joints, coords), (30,)

Checking files in: C:/Users/Admin/Desktop/data/back/좌표
['keypoints_1.npy', 'keypoints_10.npy', 'keypoints_11.npy', 'keypoints_12.npy', 'keypoints_13.npy', 'keypoints_14.npy', 'keypoints_15.npy', 'keypoints_16.npy', 'keypoints_17.npy', 'keypoints_18.npy', 'keypoints_19.npy', 'keypoints_2.npy', 'keypoints_20.npy', 'keypoints_21.npy', 'keypoints_22.npy', 'keypoints_23.npy', 'keypoints_24.npy', 'keypoints_25.npy', 'keypoints_26.npy', 'keypoints_27.npy', 'keypoints_28.npy', 'keypoints_29.npy', 'keypoints_3.npy', 'keypoints_30.npy', 'keypoints_31.npy', 'keypoints_32.npy', 'keypoints_33.npy', 'keypoints_34.npy', 'keypoints_35.npy', 'keypoints_36.npy', 'keypoints_37.npy', 'keypoints_38.npy', 'keypoints_39.npy', 'keypoints_4.npy', 'keypoints_40.npy', 'keypoints_41.npy', 'keypoints_42.npy', 'keypoints_43.npy', 'keypoints_44.npy', 'keypoints_45.npy', 'keypoints_46.npy', 'keypoints_47.npy', 'keypoints_48.npy', 'keypoints_49.npy', 'keypoints_5.npy', 'keypoints_50.npy', 'keypoints_6.npy', 'keypoints

In [106]:
def standardize_data(data):
    """
    데이터 표준화 (평균 0, 표준편차 1).
    Args:
        data (np.array): 원본 데이터, Shape: (samples, frames, joints, coords).
    Returns:
        np.array: 표준화된 데이터.
    """
    mean = data.mean(axis=(1, 2, 3), keepdims=True)  # 샘플별 평균 계산
    std = data.std(axis=(1, 2, 3), keepdims=True)  # 샘플별 표준편차 계산
    standardized_data = (data - mean) / (std + 1e-8)  # 표준화
    return standardized_data

# 데이터 표준화 적용
X_standardized = standardize_data(X)

In [107]:
# 데이터 분할 (훈련/검증/테스트 세트)
X_train, X_temp, y_train, y_temp = train_test_split(X_standardized, y, test_size=0.3, random_state=42, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

print(f"Train shape: {X_train.shape}, Validation shape: {X_val.shape}, Test shape: {X_test.shape}")

Train shape: (105, 90, 33, 3), Validation shape: (22, 90, 33, 3), Test shape: (23, 90, 33, 3)


In [108]:
# LSTM 모델 정의
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        out, _ = self.lstm(x)  # LSTM 출력
        out = self.fc(out[:, -1, :])  # 마지막 타임스텝의 출력만 사용
        return out
    
# 모델 초기화
input_dim = X_train.shape[2] * X_train.shape[3]  # 관절 수 * 좌표 수
hidden_dim = 256
output_dim = len(labels_map)  # 클래스 수 (등, 스쿼트, 옆구리)
model = LSTMModel(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)

# 데이터 로더 생성
train_dataset = TensorDataset(torch.tensor(X_train).float(), torch.tensor(y_train).long())
val_dataset = TensorDataset(torch.tensor(X_val).float(), torch.tensor(y_val).long())
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4)

In [109]:
from sklearn.utils.class_weight import compute_class_weight

# 손실 함수 및 옵티마이저
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 손실 함수 가중치 조정
class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
criterion = nn.CrossEntropyLoss(weight=torch.tensor(class_weights, dtype=torch.float))

In [110]:
from torch.optim.lr_scheduler import StepLR

# 학습률 스케줄러 추가
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)

# Early Stopping 기준 변경
early_stopping = EarlyStopping(patience=7, min_delta=0.0001)

# 학습 루프
num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        X_batch = X_batch.view(X_batch.size(0), -1, input_dim)
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # 학습률 스케줄러 업데이트
    scheduler.step()

    # 검증 단계
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.view(X_batch.size(0), -1, input_dim)
            y_pred = model(X_batch)
            val_loss += criterion(y_pred, y_batch).item()

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss / len(train_loader):.4f}, Validation Loss: {val_loss / len(val_loader):.4f}")

    # Early Stopping 확인
    early_stopping(val_loss / len(val_loader))
    if early_stopping.early_stop:
        print("Early stopping triggered. Training stopped.")
        break


Epoch 1/20, Train Loss: 0.8217, Validation Loss: 0.5522
Epoch 2/20, Train Loss: 0.3649, Validation Loss: 0.1171
Epoch 3/20, Train Loss: 0.1812, Validation Loss: 0.0327
Epoch 4/20, Train Loss: 0.2008, Validation Loss: 0.3607
Epoch 5/20, Train Loss: 0.1132, Validation Loss: 0.1462
Epoch 6/20, Train Loss: 0.0134, Validation Loss: 0.1215
Epoch 7/20, Train Loss: 0.0074, Validation Loss: 0.0904
Epoch 8/20, Train Loss: 0.0049, Validation Loss: 0.0658
Epoch 9/20, Train Loss: 0.0037, Validation Loss: 0.0518
Epoch 10/20, Train Loss: 0.0030, Validation Loss: 0.0395
Early stopping triggered. Training stopped.


In [111]:
print(f"Data min: {X.min()}, Data max: {X.max()}")
print(f"Class distribution: {np.bincount(y)}")

Data min: -0.8864067792892456, Data max: 1.1873269081115723
Class distribution: [50 50 50]


In [112]:
# 테스트 데이터 평가
from sklearn.metrics import classification_report
model.eval()
y_true, y_pred = [], []
test_loader = DataLoader(TensorDataset(torch.tensor(X_test).float(), torch.tensor(y_test).long()), batch_size=4)
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.view(X_batch.size(0), -1, input_dim)  # 차원 변환
        outputs = model(X_batch)
        _, preds = torch.max(outputs, 1)
        y_true.extend(y_batch.numpy())
        y_pred.extend(preds.numpy())

print(classification_report(y_true, y_pred, target_names=list(labels_map.keys())))

              precision    recall  f1-score   support

        back       1.00      1.00      1.00         7
       squat       0.89      1.00      0.94         8
        side       1.00      0.88      0.93         8

    accuracy                           0.96        23
   macro avg       0.96      0.96      0.96        23
weighted avg       0.96      0.96      0.96        23



In [113]:
from sklearn.metrics import classification_report, confusion_matrix
print(confusion_matrix(y_true, y_pred))

[[7 0 0]
 [0 8 0]
 [0 1 7]]


In [114]:
import torch
import torch.nn as nn
import os
'''
# LSTM 모델 정의
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        out, _ = self.lstm(x)  # LSTM 출력
        out = self.fc(out[:, -1, :])  # 마지막 타임스텝의 출력만 사용
        return out

# 모델 초기화
input_dim = 34  # 입력 크기: 34
hidden_dim = 64  # LSTM의 은닉 상태 크기
output_dim = 3  # 출력 클래스 수 (e.g., Back, Squat, Side)
num_layers = 3  # LSTM 레이어 수
model = LSTMModel(input_dim, hidden_dim, output_dim, num_layers)
'''

# TorchScript로 변환
scripted_model = torch.jit.script(model)

# 경로 설정
output_dir = r"C:\Users\Admin\Desktop\data"
os.makedirs(output_dir, exist_ok=True)  # 디렉토리가 없으면 생성
model_path = os.path.join(output_dir, "lstm_model_scripted.pt")

# TorchScript 모델 저장
scripted_model.save(model_path)
print(f"Scripted model saved to {model_path}")


Scripted model saved to C:\Users\Admin\Desktop\data\lstm_model_scripted.pt
