In [1]:
import sys
import os
sys.path.append(os.path.abspath(".."))

In [2]:
import torch.optim as optim
import torch.nn as nn
from core.dataset import YogaDataset
from torch.utils.data import DataLoader

In [8]:
json_folder = "../data/keypoints/public_data"
dataset = YogaDataset(json_folder, max_frames=100)
batch_size = 4
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Kiểm tra 1 batch dữ liệu
for batch in dataloader:
    X_batch, y_batch = batch
    print("X_batch shape:", X_batch.shape)  # (batch_size, max_frames, 33, 3)
    print("y_batch shape:", y_batch.shape)  # (batch_size,)
    print("y_batch values:", y_batch)  # Nhãn của từng sample
    break  # Dừng sau batch đầu tiên

Label map: {'Garland_Pose': 0, 'Happy_Baby_Pose': 1, 'Head_To_Knee_Pose': 2, 'Lunge_Pose': 3}
X_batch shape: torch.Size([4, 100, 33, 3])
y_batch shape: torch.Size([4])
y_batch values: tensor([1, 0, 1, 0])


In [63]:

import copy
import torch

import torch.nn as nn
from typing import Optional
from dataclasses import dataclass
from torch import Tensor
def _get_clones(mod, n):
    return nn.ModuleList([copy.deepcopy(mod) for _ in range(n)])

# @dataclass
class SelfAttention(nn.Module):
    batch_first: bool = True

class SPOTERTransformerDecoderLayer(nn.TransformerDecoderLayer):
    """
    Edited TransformerDecoderLayer implementation omitting the redundant self-attention operation as opposed to the
    standard implementation.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout, activation):
        super(SPOTERTransformerDecoderLayer, self).__init__(d_model, nhead, dim_feedforward, dropout, activation)

        # del self.self_attn
        self.self_attn =  SelfAttention()

    def forward(
        self,
        tgt: Tensor,
        memory: Tensor,
        tgt_mask: Optional[Tensor] = None,
        memory_mask: Optional[Tensor] = None,
        tgt_key_padding_mask: Optional[Tensor] = None,
        memory_key_padding_mask: Optional[Tensor] = None,
        tgt_is_causal: bool = False,
        memory_is_causal: bool = False,
    ) -> torch.Tensor:

        tgt = tgt + self.dropout1(tgt)
        tgt = self.norm1(tgt)
        tgt2 = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)

        return tgt


class SPOTER(nn.Module):
    def __init__(self, num_classes, hidden_dim):
        """
        Implementation of the SPOTER (Sign POse-based TransformER) architecture for sign language recognition from sequence
        of skeletal data.
        """
        super().__init__()

        self.input_projection = nn.Linear(100 * 33 * 3, hidden_dim)  # 9900 → 54

        self.row_embed = nn.Parameter(torch.rand(50, hidden_dim))
        self.pos = nn.Parameter(torch.cat([self.row_embed[0].unsqueeze(0).repeat(1, 1, 1)], dim=-1).flatten(0, 1).unsqueeze(0))
        self.class_query = nn.Parameter(torch.rand(1, hidden_dim))
        self.transformer = nn.Transformer(hidden_dim, 9, 6, 6) #hidden_dim, num_heads, layer_encoder, layer_decoder
        self.linear_class = nn.Linear(hidden_dim, num_classes)

        custom_decoder_layer = SPOTERTransformerDecoderLayer(self.transformer.d_model, self.transformer.nhead, 2048, 0.1, "relu")
        self.transformer.decoder.layers = _get_clones(custom_decoder_layer, self.transformer.decoder.num_layers)

    def forward(self, inputs):
        h = inputs.flatten(start_dim=1)  # (batch_size, 9900)
        h = self.input_projection(h)     # (batch_size, hidden_dim=72)
        h = h.unsqueeze(1).float()       # (batch_size, 1, hidden_dim)  [Thêm 1 chiều sequence length]
        
        # Transformer yêu cầu input có shape (seq_len, batch_size, hidden_dim), nên ta cần hoán vị
        h = h.permute(1, 0, 2)  # (1, batch_size, hidden_dim)
        # Đảm bảo class_query có batch_size giống h
        class_query = self.class_query.unsqueeze(1).repeat(1, h.shape[1], 1)  # (1, batch_size, hidden_dim)
        # Đưa vào Transformer
        h = self.transformer(h, class_query)  # (1, batch_size, hidden_dim)
        h = h.squeeze(0)  # Chuyển về (batch_size, hidden_dim) để phù hợp với linear_class
        res = self.linear_class(h)  # (batch_size, num_classes)
        return res  # Trả về đúng shape: (batch_size, num_classes)




if __name__ == "__main__":
    pass

In [69]:
def train_spoter(model, dataloader, num_epochs=10, batch_size=4, learning_rate=0.001, device="cuda"):
    """
    Huấn luyện mô hình SPOTER với tập dữ liệu được truyền vào.

    Parameters:
    - model: Mô hình SPOTER đã khởi tạo.
    - dataset: Dataset đã được tạo (dựa trên class YogaDataset).
    - num_epochs: Số epoch huấn luyện (mặc định 10).
    - batch_size: Số lượng mẫu mỗi batch (mặc định 4).
    - learning_rate: Hệ số học (mặc định 0.001).
    - device: 'cuda' nếu có GPU, ngược lại dùng 'cpu'.

    Output:
    - Trả về mô hình đã được train.
    """

    # Đưa mô hình lên GPU (nếu có)
    device = torch.device(device if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # DataLoader để lấy batch dữ liệu
#     dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Khởi tạo Loss Function và Optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Vòng lặp huấn luyện
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for X_batch, y_batch in dataloader:
            # Đưa dữ liệu lên GPU (nếu có)
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            # Forward pass
            outputs = model(X_batch).squeeze(1)  # Bỏ chiều 1 để có shape (batch_size, num_classes)
            
            # Tính loss
            loss = criterion(outputs, y_batch)
            total_loss += loss.item()

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # In loss sau mỗi epoch
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

    print("Training completed!")
    return model  # Trả về mô hình đã train


In [70]:
# Khởi tạo dataset (cần thay đổi đường dẫn dataset phù hợp)
json_folder = "../data/keypoints/public_data"
batch_size = 4
dataset = YogaDataset(json_folder, max_frames=100)
dataloader = DataLoader(dataset, batch_size, shuffle=True)
# Khởi tạo mô hình
model = SPOTER(num_classes=4, hidden_dim=72)

# Train mô hình
trained_model = train_spoter(model, dataloader, num_epochs=20, batch_size=8, learning_rate=0.0005)

Label map: {'Garland_Pose': 0, 'Happy_Baby_Pose': 1, 'Head_To_Knee_Pose': 2, 'Lunge_Pose': 3}
Epoch [1/20], Loss: 1.6335
Epoch [2/20], Loss: 1.4375
Epoch [3/20], Loss: 1.4143
Epoch [4/20], Loss: 1.4650
Epoch [5/20], Loss: 1.4277
Epoch [6/20], Loss: 1.4293
Epoch [7/20], Loss: 1.4461
Epoch [8/20], Loss: 1.4269
Epoch [9/20], Loss: 1.4218
Epoch [10/20], Loss: 1.3877
Epoch [11/20], Loss: 1.4361
Epoch [12/20], Loss: 1.4101
Epoch [13/20], Loss: 1.4141
Epoch [14/20], Loss: 1.4202
Epoch [15/20], Loss: 1.4128
Epoch [16/20], Loss: 1.3956
Epoch [17/20], Loss: 1.3910
Epoch [18/20], Loss: 1.4157
Epoch [19/20], Loss: 1.4044
Epoch [20/20], Loss: 1.3999
Training completed!
