In [1]:
# train.py
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import random

In [2]:
# -----------------------------
# 1. Fix random seed
# -----------------------------
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

In [3]:
# -----------------------------
# 2. Config
# -----------------------------
DATA_DIR = r"D:\Download\data\PoseEstimation\NTU\nturgb+d_skeletons"
EPOCHS = 1
BATCH_SIZE = 32
LR = 1e-3
SEQ_LEN = 30      # số frame lấy cho mỗi sample
INPUT_SIZE = 75   # 25 joints * 3 tọa độ
HIDDEN_SIZE = 128
NUM_LAYERS = 6
NUM_CLASSES = 6
MODEL_PATH = r"D:\code_etc\Python\_File_code\Pose_estimation\1_epochs_36.pth"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# -----------------------------
# 3. Dataset class
# -----------------------------
class NTUSkeletonDataset(Dataset):
    def __init__(self, data_dir, seq_len=30):
        self.samples = []
        self.labels = []
        self.seq_len = seq_len

        # map từ tên class sang nhãn
        self.class_map = {
            "lying": 0,
            "walking": 1,
            "standing": 2,
            "jumping": 3,
            "running": 4,
            "carrying": 5,
        }

        # duyệt toàn bộ file .skeleton
        for fname in os.listdir(data_dir):
            if not fname.endswith(".skeleton"):
                continue

            # lấy action ID từ tên file
            # Format: S001C001P001R001A001.skeleton → A001 = action ID
            action_id = int(fname.split("A")[1][:3])

            # chọn 6 class theo action ID
            label = self.map_action_to_label(action_id)
            if label is None:
                continue

            fpath = os.path.join(data_dir, fname)
            data = self.parse_skeleton_file(fpath)

            if len(data) >= self.seq_len:
                # lấy ngẫu nhiên 1 đoạn seq_len frame
                start = random.randint(0, len(data) - self.seq_len)
                clip = data[start:start+self.seq_len]
                self.samples.append(clip)
                self.labels.append(label)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        X = np.array(self.samples[idx], dtype=np.float32)   # (seq_len, 75)
        y = self.labels[idx]
        return torch.tensor(X), torch.tensor(y)

    def parse_skeleton_file(self, path):
        frames = []
        with open(path, 'r') as f:
            num_frames = int(f.readline())
            for _ in range(num_frames):
                num_bodies = int(f.readline())
                joints = []
                for _ in range(num_bodies):
                    _ = f.readline()  # skip body info
                    num_joints = int(f.readline())
                    for _ in range(num_joints):
                        x, y, z, *_ = map(float, f.readline().split())
                        joints.extend([x, y, z])
                if len(joints) >= 75:
                    frames.append(joints[:75])  # chỉ lấy 25 khớp đầu
        return frames

    def map_action_to_label(self, action_id):
        # mapping ID → 6 class (tham khảo NTU action list)
        lying_ids = [15, 79]       # ví dụ: lying down, lying
        walking_ids = [2, 55]      # walking
        standing_ids = [1, 48]     # standing up / still
        jumping_ids = [26, 46]     # jumping
        running_ids = [3, 19]      # running
        carrying_ids = [50, 51]    # carrying

        if action_id in lying_ids:
            return self.class_map["lying"]
        elif action_id in walking_ids:
            return self.class_map["walking"]
        elif action_id in standing_ids:
            return self.class_map["standing"]
        elif action_id in jumping_ids:
            return self.class_map["jumping"]
        elif action_id in running_ids:
            return self.class_map["running"]
        elif action_id in carrying_ids:
            return self.class_map["carrying"]
        else:
            return None

In [5]:
# -----------------------------
# 4. Model
# -----------------------------
class ActionLSTM_FC(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(ActionLSTM_FC, self).__init__()
        
        # 1. LSTM block (2 layers)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=0.5)
        
        # 2. Fully connected layers (4 tầng)
        self.fc1 = nn.Linear(hidden_size, 256)
        self.bn1 = nn.BatchNorm1d(256)
        
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        
        self.fc3 = nn.Linear(128, 64)
        self.bn3 = nn.BatchNorm1d(64)
        
        self.fc4 = nn.Linear(64, 32)
        self.bn4 = nn.BatchNorm1d(32)
        
        # 3. Output layer
        self.fc_out = nn.Linear(32, num_classes)
        
        # Dropout
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # LSTM
        out, _ = self.lstm(x)  # (batch, seq_len, hidden_size)
        out = out[:, -1, :]    # lấy hidden state cuối cùng
        
        # FC stack
        out = self.dropout(torch.relu(self.bn1(self.fc1(out))))
        out = self.dropout(torch.relu(self.bn2(self.fc2(out))))
        out = self.dropout(torch.relu(self.bn3(self.fc3(out))))
        out = self.dropout(torch.relu(self.bn4(self.fc4(out))))
        
        out = self.fc_out(out)
        return out


In [6]:
# -----------------------------
# 5. Training
# -----------------------------
def train():
    dataset = NTUSkeletonDataset(DATA_DIR, seq_len=SEQ_LEN)
    print(f"Loaded {len(dataset)} samples")

    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_ds, val_ds = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE)

    model = ActionLSTM_FC(INPUT_SIZE, HIDDEN_SIZE, NUM_LAYERS, NUM_CLASSES).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)

    for epoch in range(EPOCHS):
        # ---- train ----
        model.train()
        total_loss, correct, total = 0, 0, 0
        for X, y in train_loader:
            X, y = X.to(DEVICE), y.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(X)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, preds = outputs.max(1)
            correct += (preds == y).sum().item()
            total += y.size(0)

        train_acc = correct / total
        avg_loss = total_loss / len(train_loader)

        # ---- val ----
        model.eval()
        val_correct, val_total = 0, 0
        with torch.no_grad():
            for X, y in val_loader:
                X, y = X.to(DEVICE), y.to(DEVICE)
                outputs = model(X)
                _, preds = outputs.max(1)
                val_correct += (preds == y).sum().item()
                val_total += y.size(0)

        val_acc = val_correct / val_total
        print(f"Epoch [{epoch+1}/{EPOCHS}] Loss: {avg_loss:.4f} Train Acc: {train_acc:.4f} Val Acc: {val_acc:.4f}")

    # save model
    torch.save(model.state_dict(), MODEL_PATH)
    print(f"Model saved at {MODEL_PATH}")

    # test thử 5 sample
    test_samples = [dataset[i] for i in random.sample(range(len(dataset)), 5)]
    model.eval()
    for i, (X, y) in enumerate(test_samples):
        X = X.unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            outputs = model(X)
            _, pred = outputs.max(1)
        print(f"Sample {i+1}: True={y.item()} Pred={pred.item()}")

if __name__ == "__main__":
    train()

Loaded 10371 samples
Epoch [1/1] Loss: 1.8104 Train Acc: 0.1820 Val Acc: 0.1658
Model saved at D:\code_etc\Python\_File_code\Pose_estimation\1_epochs_36.pth
Sample 1: True=4 Pred=5
Sample 2: True=1 Pred=5
Sample 3: True=3 Pred=5
Sample 4: True=4 Pred=5
Sample 5: True=1 Pred=5
