In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class ConvTemporalGraphical(nn.Module):
    """The basic module for applying a graph convolution.
    Args:
        in_channels (int): Number of channels in the input sequence data.
        out_channels (int): Number of channels produced by the convolution.
        kernel_size (int): Size of the graph convolving kernel.
        t_kernel_size (int): Size of the temporal convolving kernel.
        t_stride (int, optional): Stride of the temporal convolution. Default: 1.
        t_padding (int, optional): Temporal zero-padding added to both sides
            of the input. Default: 0.
        t_dilation (int, optional): Spacing between temporal kernel elements.
            Default: 1.
        bias (bool, optional): If ``True``, adds a learnable bias to the
            output. Default: ``True``.
    Shape:
        - Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)`
            format
        - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
        - Output[0]: Output graph sequence in :math:`(N, out_channels, T_{out}
            , V)` format
        - Output[1]: Graph adjacency matrix for output data in :math:`(K, V, V)
            ` format
        where
            :math:`N` is a batch size,
            :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]
                `,
            :math:`T_{in}/T_{out}` is a length of input/output sequence,
            :math:`V` is the number of graph nodes.
    """
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        t_kernel_size=1,
        t_stride=1,
        t_padding=0,
        t_dilation=1,
        bias=True,
    ):
        super().__init__()

        self.kernel_size = kernel_size
        self.conv = nn.Conv2d(
            in_channels,
            out_channels * kernel_size,
            kernel_size=(t_kernel_size, 1),
            padding=(t_padding, 0),
            stride=(t_stride, 1),
            dilation=(t_dilation, 1),
            bias=bias,
        )

    def forward(self, x, A):
        assert A.size(0) == self.kernel_size

        x = self.conv(x)
        n, kc, t, v = x.size()
        x = x.view(n, self.kernel_size, kc // self.kernel_size, t, v)
        x = torch.einsum("nkctv,kvw->nctw", (x, A))

        return x.contiguous(), A                                                                                


class STGCN_BLOCK(nn.Module):
    """
    Applies a spatial temporal graph convolution over an input graph
    sequence.

    Args:
        in_channels (int): Number of channels in the input sequence data.
        out_channels (int): Number of channels produced by the convolution.
        kernel_size (tuple): Size of the temporal convolving kernel and
            graph convolving kernel.
        stride (int, optional): Stride of the temporal convolution. Default: 1.
        dropout (int, optional): Dropout rate of the final output. Default: 0.
        residual (bool, optional): If ``True``, applies a residual mechanism. Default: ``True``.
    Shape:
        - Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)`
            format.
        - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
        - Output[0]: Output graph sequence in :math:`(N, out_channels, T_{out},
            V)` format.
        - Output[1]: Graph adjacency matrix for output data in :math:`(K, V,
            V)` format.
        where
            :math:`N` is a batch size,
            :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`,
            :math:`T_{in}/T_{out}` is a length of input/output sequence,
            :math:`V` is the number of graph nodes.
    """
    def __init__(
        self, in_channels, out_channels, kernel_size, stride=1, dropout=0, residual=True
    ):
        super().__init__()

        assert len(kernel_size) == 2
        assert kernel_size[0] % 2 == 1
        padding = ((kernel_size[0] - 1) // 2, 0)

        self.gcn = ConvTemporalGraphical(in_channels, out_channels, kernel_size[1])

        self.tcn = nn.Sequential(
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(
                out_channels,
                out_channels,
                (kernel_size[0], 1),
                (stride, 1),
                padding,
            ),
            nn.BatchNorm2d(out_channels),
            nn.Dropout(dropout, inplace=True),
        )

        if not residual:
            self.residual = lambda x: 0

        elif (in_channels == out_channels) and (stride == 1):
            self.residual = lambda x: x

        else:
            self.residual = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=(stride, 1)),
                nn.BatchNorm2d(out_channels),
            )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x, A):
        res = self.residual(x)
        x, A = self.gcn(x, A)
        x = self.tcn(x) + res

        return self.relu(x), A

class FC(nn.Module):
    """
    Fully connected layer head
    Args:
        n_features (int): Number of features in the input.
        num_class (int): Number of class for classification.
        dropout_ratio (float): Dropout ratio to use. Default: 0.2.
        batch_norm (bool): Whether to use batch norm or not. Default: ``False``.
    """
    def __init__(self, n_features, num_class, dropout_ratio=0.2, batch_norm=False):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout_ratio)
        self.bn = batch_norm
        self.n_features = n_features
        if batch_norm:
            self.bn = nn.BatchNorm1d(self.n_features)
            self.bn.weight.data.fill_(1)
            self.bn.bias.data.zero_()
        self.classifier = nn.Linear(n_features, num_class)
        nn.init.normal_(self.classifier.weight, 0, math.sqrt(2.0 / num_class))

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input tensor of shape: (batch_size, n_features)
        
        returns:
            torch.Tensor: logits for classification.
        """

        x = self.dropout(x)
        if self.bn:
            x = self.bn(x)
        x = self.classifier(x)
        return x

class Model(nn.Module):
    """Spatial temporal graph convolutional network backbone
    
    This module is proposed in
    `Spatial Temporal Graph Convolutional Networks for Skeleton-Based Action Recognition
    <https://arxiv.org/pdf/1801.07455.pdf>`_

    Args:
        in_channels (int): Number of channels in the input data.
        graph_args (dict): The arguments for building the graph.
        edge_importance_weighting (bool): If ``True``, adds a learnable importance weighting to the edges of the graph. Default: True.
        n_out_features (int): Output Embedding dimension. Default: 256. 
        kwargs (dict): Other parameters for graph convolution units.
    """
    def __init__(self,
                 in_channels = 2,
                 num_nodes = 29,
                 center = 0,
                 inward_edges = None,
                 edge_importance_weighting = True,
                 n_out_features = 256,
                 n_classes = 1000,
                 dropout_ratio = 0.05,
                 batch_norm=False,) -> None:
        super().__init__()

        self.graph = GraphWithPartition(num_nodes, center, inward_edges)
        A = torch.tensor(self.graph.A, dtype=torch.float32, requires_grad=False)
        self.register_buffer("A", A)

        spatial_kernel_size = A.size(0)
        temporal_kernel_size = 9
        self.n_out_features = n_out_features
        kernel_size = (temporal_kernel_size, spatial_kernel_size)
        self.data_bn = nn.BatchNorm1d(in_channels * A.size(1))
        self.st_gcn_networks = nn.ModuleList(
            (
                STGCN_BLOCK(in_channels, 64, kernel_size, 1, residual=False,),
                STGCN_BLOCK(64, 64, kernel_size, 1,),
                STGCN_BLOCK(64, 64, kernel_size, 1,),
                STGCN_BLOCK(64, 64, kernel_size, 1,),
                STGCN_BLOCK(64, 128, kernel_size, 2,),
                STGCN_BLOCK(128, 128, kernel_size, 1,),
                STGCN_BLOCK(128, 128, kernel_size, 1,),
                STGCN_BLOCK(128, 256, kernel_size, 2,),
                STGCN_BLOCK(256, 256, kernel_size, 1,),
                STGCN_BLOCK(256, self.n_out_features, kernel_size, 1,),
            )
        )

        if edge_importance_weighting:
            self.edge_importance = nn.ParameterList(
                [nn.Parameter(torch.ones(self.A.size())) for i in self.st_gcn_networks]
            )
        else:
            self.edge_importance = [1] * len(self.st_gcn_networks)
        
        self.head = FC(self.n_out_features, n_classes, dropout_ratio, batch_norm)

    def forward(self, x):
        """
        Args: 
            x (torch.Tensor): Input tensor of shape :math:`(N, in\_channels, T_{in}, V_{in})`
        
        Returns:
            torch.Tensor: Output embedding of shape :math:`(N, n\_out\_features)`

        where
            - :math:`N` is a batch size,
            - :math:`T_{in}` is a length of input sequence,
            - :math:`V_{in}` is the number of graph nodes,
            - :math:`n\_out\_features` is the output embedding dimension.

            our input is in shape ntvc
        """
        N, C, T, V = x.size()
        x = x.permute(0, 3, 1, 2).contiguous() # NCTV -> NVCT
        x = x.view(N, V * C, T)
        x = self.data_bn(x)
        x = x.view(N, V, C, T)
        x = x.permute(0, 2, 3, 1).contiguous() # NVCT -> NCTV

        for gcn, importance in zip(self.st_gcn_networks, self.edge_importance):
            x, _ = gcn(x, self.A * importance)

        x = F.avg_pool2d(x, x.size()[2:])
        x = x.view(N, -1)

        return self.head(x)



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class ConvTemporalGraphical(nn.Module):
    """The basic module for applying a graph convolution.
    Args:
        in_channels (int): Number of channels in the input sequence data.
        out_channels (int): Number of channels produced by the convolution.
        kernel_size (int): Size of the graph convolving kernel.
        t_kernel_size (int): Size of the temporal convolving kernel.
        t_stride (int, optional): Stride of the temporal convolution. Default: 1.
        t_padding (int, optional): Temporal zero-padding added to both sides
            of the input. Default: 0.
        t_dilation (int, optional): Spacing between temporal kernel elements.
            Default: 1.
        bias (bool, optional): If ``True``, adds a learnable bias to the
            output. Default: ``True``.
    Shape:
        - Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)`
            format
        - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
        - Output[0]: Output graph sequence in :math:`(N, out_channels, T_{out}
            , V)` format
        - Output[1]: Graph adjacency matrix for output data in :math:`(K, V, V)
            ` format
        where
            :math:`N` is a batch size,
            :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]
                `,
            :math:`T_{in}/T_{out}` is a length of input/output sequence,
            :math:`V` is the number of graph nodes.
    """
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        t_kernel_size=1,
        t_stride=1,
        t_padding=0,
        t_dilation=1,
        bias=True,
    ):
        super().__init__()

        self.kernel_size = kernel_size
        self.conv = nn.Conv2d(
            in_channels,
            out_channels * kernel_size,
            kernel_size=(t_kernel_size, 1),
            padding=(t_padding, 0),
            stride=(t_stride, 1),
            dilation=(t_dilation, 1),
            bias=bias,
        )

    def forward(self, x, A):
        assert A.size(0) == self.kernel_size

        x = self.conv(x)
        n, kc, t, v = x.size()
        x = x.view(n, self.kernel_size, kc // self.kernel_size, t, v)
        x = torch.einsum("nkctv,kvw->nctw", (x, A))

        return x.contiguous(), A                                                                                


class STGCN_BLOCK(nn.Module):
    """
    Applies a spatial temporal graph convolution over an input graph
    sequence.

    Args:
        in_channels (int): Number of channels in the input sequence data.
        out_channels (int): Number of channels produced by the convolution.
        kernel_size (tuple): Size of the temporal convolving kernel and
            graph convolving kernel.
        stride (int, optional): Stride of the temporal convolution. Default: 1.
        dropout (int, optional): Dropout rate of the final output. Default: 0.
        residual (bool, optional): If ``True``, applies a residual mechanism. Default: ``True``.
    Shape:
        - Input[0]: Input graph sequence in :math:`(N, in_channels, T_{in}, V)`
            format.
        - Input[1]: Input graph adjacency matrix in :math:`(K, V, V)` format
        - Output[0]: Output graph sequence in :math:`(N, out_channels, T_{out},
            V)` format.
        - Output[1]: Graph adjacency matrix for output data in :math:`(K, V,
            V)` format.
        where
            :math:`N` is a batch size,
            :math:`K` is the spatial kernel size, as :math:`K == kernel_size[1]`,
            :math:`T_{in}/T_{out}` is a length of input/output sequence,
            :math:`V` is the number of graph nodes.
    """
    def __init__(
        self, in_channels, out_channels, kernel_size, stride=1, dropout=0, residual=True
    ):
        super().__init__()

        assert len(kernel_size) == 2
        assert kernel_size[0] % 2 == 1
        padding = ((kernel_size[0] - 1) // 2, 0)

        self.gcn = ConvTemporalGraphical(in_channels, out_channels, kernel_size[1])

        self.tcn = nn.Sequential(
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(
                out_channels,
                out_channels,
                (kernel_size[0], 1),
                (stride, 1),
                padding,
            ),
            nn.BatchNorm2d(out_channels),
            nn.Dropout(dropout, inplace=True),
        )

        if not residual:
            self.residual = lambda x: 0

        elif (in_channels == out_channels) and (stride == 1):
            self.residual = lambda x: x

        else:
            self.residual = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=(stride, 1)),
                nn.BatchNorm2d(out_channels),
            )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x, A):
        res = self.residual(x)
        x, A = self.gcn(x, A)
        x = self.tcn(x) + res

        return self.relu(x), A

class FC(nn.Module):
    """
    Fully connected layer head
    Args:
        n_features (int): Number of features in the input.
        num_class (int): Number of class for classification.
        dropout_ratio (float): Dropout ratio to use. Default: 0.2.
        batch_norm (bool): Whether to use batch norm or not. Default: ``False``.
    """
    def __init__(self, n_features, num_class, dropout_ratio=0.2, batch_norm=False):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout_ratio)
        self.bn = batch_norm
        self.n_features = n_features
        if batch_norm:
            self.bn = nn.BatchNorm1d(self.n_features)
            self.bn.weight.data.fill_(1)
            self.bn.bias.data.zero_()
        self.classifier = nn.Linear(n_features, num_class)
        nn.init.normal_(self.classifier.weight, 0, math.sqrt(2.0 / num_class))

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): Input tensor of shape: (batch_size, n_features)
        
        returns:
            torch.Tensor: logits for classification.
        """

        x = self.dropout(x)
        if self.bn:
            x = self.bn(x)
        x = self.classifier(x)
        return x

class Model(nn.Module):
    """Spatial temporal graph convolutional network backbone
    
    This module is proposed in
    `Spatial Temporal Graph Convolutional Networks for Skeleton-Based Action Recognition
    <https://arxiv.org/pdf/1801.07455.pdf>`_

    Args:
        in_channels (int): Number of channels in the input data.
        graph_args (dict): The arguments for building the graph.
        edge_importance_weighting (bool): If ``True``, adds a learnable importance weighting to the edges of the graph. Default: True.
        n_out_features (int): Output Embedding dimension. Default: 256. 
        kwargs (dict): Other parameters for graph convolution units.
    """
    def __init__(self,
                 in_channels = 2,
                 num_nodes = 29,
                 center = 0,
                 inward_edges = None,
                 edge_importance_weighting = True,
                 n_out_features = 256,
                 n_classes = 1000,
                 dropout_ratio = 0.05,
                 batch_norm=False,) -> None:
        super().__init__()

        self.graph = GraphWithPartition(num_nodes, center, inward_edges)
        A = torch.tensor(self.graph.A, dtype=torch.float32, requires_grad=False)
        self.register_buffer("A", A)

        spatial_kernel_size = A.size(0)
        temporal_kernel_size = 9
        self.n_out_features = n_out_features
        kernel_size = (temporal_kernel_size, spatial_kernel_size)
        self.data_bn = nn.BatchNorm1d(in_channels * A.size(1))
        self.st_gcn_networks = nn.ModuleList(
            (
                STGCN_BLOCK(in_channels, 64, kernel_size, 1, residual=False,),
                STGCN_BLOCK(64, 64, kernel_size, 1,),
                STGCN_BLOCK(64, 64, kernel_size, 1,),
                STGCN_BLOCK(64, 64, kernel_size, 1,),
                STGCN_BLOCK(64, 128, kernel_size, 2,),
                STGCN_BLOCK(128, 128, kernel_size, 1,),
                STGCN_BLOCK(128, 128, kernel_size, 1,),
                STGCN_BLOCK(128, 256, kernel_size, 2,),
                STGCN_BLOCK(256, 256, kernel_size, 1,),
                STGCN_BLOCK(256, self.n_out_features, kernel_size, 1,),
            )
        )

        if edge_importance_weighting:
            self.edge_importance = nn.ParameterList(
                [nn.Parameter(torch.ones(self.A.size())) for i in self.st_gcn_networks]
            )
        else:
            self.edge_importance = [1] * len(self.st_gcn_networks)
        
        self.head = FC(self.n_out_features, n_classes, dropout_ratio, batch_norm)

    def forward(self, x):
        """
        Args: 
            x (torch.Tensor): Input tensor of shape :math:`(N, in\_channels, T_{in}, V_{in})`
        
        Returns:
            torch.Tensor: Output embedding of shape :math:`(N, n\_out\_features)`

        where
            - :math:`N` is a batch size,
            - :math:`T_{in}` is a length of input sequence,
            - :math:`V_{in}` is the number of graph nodes,
            - :math:`n\_out\_features` is the output embedding dimension.

            our input is in shape ntvc
        """
        N, C, T, V = x.size()
        x = x.permute(0, 3, 1, 2).contiguous() # NCTV -> NVCT
        x = x.view(N, V * C, T)
        x = self.data_bn(x)
        x = x.view(N, V, C, T)
        x = x.permute(0, 2, 3, 1).contiguous() # NVCT -> NCTV

        for gcn, importance in zip(self.st_gcn_networks, self.edge_importance):
            x, _ = gcn(x, self.A * importance)

        x = F.avg_pool2d(x, x.size()[2:])
        x = x.view(N, -1)

        return self.head(x)



In [1]:
import torch

import os
parent_dir = 'D:\\Semester_7\\GraduationProject\\SLR\\data_set\\mediapipe_sequences'
a =[]
lable_decode = []
for lable in os.listdir(parent_dir):
    video_dir = os.path.join(parent_dir, lable)
    for file_name in os.listdir(video_dir):
        lable_decode.append(lable)
        input_file = os.path.join(video_dir, file_name)
        tensor = torch.load(input_file)
        tensor = torch.einsum('xyz->zxy', tensor)
        a.append(tensor)

batch = torch.stack(a)

In [2]:
batch_x = batch[:, 0]
batch_y = batch[:, 1]
batch_vi = batch[:, -1:]

In [3]:
batch_x.shape
batch_y.shape

torch.Size([501, 25, 67])

In [4]:
print(batch_x)
print(batch_y)

tensor([[[0.6382, 0.6610, 0.6770,  ..., 0.0000, 0.0000, 0.0000],
         [0.6367, 0.6600, 0.6758,  ..., 0.0000, 0.0000, 0.0000],
         [0.6332, 0.6577, 0.6724,  ..., 0.0000, 0.0000, 0.0000],
         ...,
         [0.6277, 0.6525, 0.6668,  ..., 0.0000, 0.0000, 0.0000],
         [0.6279, 0.6528, 0.6671,  ..., 0.0000, 0.0000, 0.0000],
         [0.6281, 0.6529, 0.6672,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.5045, 0.5155, 0.5245,  ..., 0.0000, 0.0000, 0.0000],
         [0.5020, 0.5125, 0.5221,  ..., 0.0000, 0.0000, 0.0000],
         [0.4952, 0.5067, 0.5148,  ..., 0.0000, 0.0000, 0.0000],
         ...,
         [0.5092, 0.5234, 0.5321,  ..., 0.6279, 0.6244, 0.6206],
         [0.5081, 0.5226, 0.5312,  ..., 0.6284, 0.6255, 0.6229],
         [0.5076, 0.5216, 0.5301,  ..., 0.6294, 0.6267, 0.6225]],

        [[0.4818, 0.4946, 0.5027,  ..., 0.6083, 0.6032, 0.5977],
         [0.4838, 0.4958, 0.5043,  ..., 0.6077, 0.6053, 0.6025],
         [0.4853, 0.4973, 0.5068,  ..., 0.6132, 0.6088, 0.

In [5]:
batch_x_reshape = batch_x.reshape(501*25, 67)
batch_y_reshape = batch_y.reshape(501*25, 67)

In [6]:
batch_x_reshape = batch_x_reshape.t()
batch_y_reshape = batch_y_reshape.t()

In [7]:
print(batch_x_reshape)
print(batch_y_reshape)

tensor([[0.6382, 0.6367, 0.6332,  ..., 0.5639, 0.5643, 0.5644],
        [0.6610, 0.6600, 0.6577,  ..., 0.5786, 0.5787, 0.5787],
        [0.6770, 0.6758, 0.6724,  ..., 0.5953, 0.5954, 0.5953],
        ...,
        [0.0000, 0.0000, 0.0000,  ..., 0.6514, 0.6571, 0.6514],
        [0.0000, 0.0000, 0.0000,  ..., 0.6373, 0.6463, 0.6375],
        [0.0000, 0.0000, 0.0000,  ..., 0.6240, 0.6355, 0.6236]])
tensor([[0.4790, 0.4791, 0.4792,  ..., 0.2653, 0.2653, 0.2652],
        [0.4295, 0.4295, 0.4293,  ..., 0.2244, 0.2244, 0.2243],
        [0.4295, 0.4296, 0.4294,  ..., 0.2235, 0.2234, 0.2233],
        ...,
        [0.0000, 0.0000, 0.0000,  ..., 0.5561, 0.5589, 0.5601],
        [0.0000, 0.0000, 0.0000,  ..., 0.5644, 0.5644, 0.5678],
        [0.0000, 0.0000, 0.0000,  ..., 0.5689, 0.5661, 0.5704]])


In [8]:
batch_x_reshape.shape
batch_y_reshape.shape

torch.Size([67, 12525])

In [9]:
batch_x_reshape.shape

torch.Size([67, 12525])

In [10]:
import numpy as np
from sklearn.preprocessing import StandardScaler

In [11]:
# Tạo đối tượng StandardScaler
scaler = StandardScaler()

x_scaled_data = scaler.fit_transform(batch_x_reshape.numpy())
y_scaled_data = scaler.fit_transform(batch_y_reshape.numpy())


In [12]:
batch_x = torch.from_numpy(x_scaled_data).t()
batch_y = torch.from_numpy(y_scaled_data).t()

In [13]:
batch_x = batch_x.reshape(-1, 25, 67)
batch_y = batch_y.reshape(-1, 25, 67)

In [16]:
batch_xy = torch.stack([batch_x, batch_y], 1)

In [17]:
batch_xy.shape

torch.Size([501, 2, 25, 67])

In [18]:
for batch, i in enumerate(batch_xy):
    for coord, j in enumerate(i):
        for frame, k in enumerate(j):
            k = k - k[0]
            batch_xy[batch][coord][frame] = k

batch_xy

tensor([[[[ 0.0000,  0.0764,  0.1298,  ..., -2.1361, -2.1361, -2.1361],
          [ 0.0000,  0.0784,  0.1315,  ..., -2.1408, -2.1408, -2.1408],
          [ 0.0000,  0.0824,  0.1324,  ..., -2.1377, -2.1377, -2.1377],
          ...,
          [ 0.0000,  0.0821,  0.1295,  ..., -2.0772, -2.0772, -2.0772],
          [ 0.0000,  0.0829,  0.1306,  ..., -2.0912, -2.0912, -2.0912],
          [ 0.0000,  0.0826,  0.1301,  ..., -2.0887, -2.0887, -2.0887]],

         [[ 0.0000, -0.1042, -0.1042,  ..., -1.0096, -1.0096, -1.0096],
          [ 0.0000, -0.1055, -0.1054,  ..., -1.0186, -1.0186, -1.0186],
          [ 0.0000, -0.1071, -0.1069,  ..., -1.0301, -1.0301, -1.0301],
          ...,
          [ 0.0000, -0.1398, -0.1368,  ..., -1.1023, -1.1023, -1.1023],
          [ 0.0000, -0.1401, -0.1372,  ..., -1.1034, -1.1034, -1.1034],
          [ 0.0000, -0.1410, -0.1380,  ..., -1.1140, -1.1140, -1.1140]]],


        [[[ 0.0000,  0.0493,  0.0897,  ..., -2.2736, -2.2736, -2.2736],
          [ 0.0000,  0.0471,

In [19]:
batch_xy.shape

torch.Size([501, 2, 25, 67])

In [20]:
batch_xy[0]

tensor([[[ 0.0000,  0.0764,  0.1298,  ..., -2.1361, -2.1361, -2.1361],
         [ 0.0000,  0.0784,  0.1315,  ..., -2.1408, -2.1408, -2.1408],
         [ 0.0000,  0.0824,  0.1324,  ..., -2.1377, -2.1377, -2.1377],
         ...,
         [ 0.0000,  0.0821,  0.1295,  ..., -2.0772, -2.0772, -2.0772],
         [ 0.0000,  0.0829,  0.1306,  ..., -2.0912, -2.0912, -2.0912],
         [ 0.0000,  0.0826,  0.1301,  ..., -2.0887, -2.0887, -2.0887]],

        [[ 0.0000, -0.1042, -0.1042,  ..., -1.0096, -1.0096, -1.0096],
         [ 0.0000, -0.1055, -0.1054,  ..., -1.0186, -1.0186, -1.0186],
         [ 0.0000, -0.1071, -0.1069,  ..., -1.0301, -1.0301, -1.0301],
         ...,
         [ 0.0000, -0.1398, -0.1368,  ..., -1.1023, -1.1023, -1.1023],
         [ 0.0000, -0.1401, -0.1372,  ..., -1.1034, -1.1034, -1.1034],
         [ 0.0000, -0.1410, -0.1380,  ..., -1.1140, -1.1140, -1.1140]]])

In [22]:
batch_vi.shape

torch.Size([501, 1, 25, 67])

In [24]:
batch_xy.shape

torch.Size([501, 2, 25, 67])

In [25]:
batch = torch.cat((batch_xy, batch_vi), dim=1)

In [26]:
batch.shape

torch.Size([501, 3, 25, 67])

In [27]:
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [28]:
import torch.nn.functional as F

# Danh sách cần one-hot encode
lst = lable_decode

# Tạo từ điển ánh xạ chuỗi sang số nguyên
unique_labels = list(set(lst))  # Lấy các giá trị duy nhất
label_to_index = {label: index for index, label in enumerate(unique_labels)}

# Chuyển đổi danh sách chuỗi thành danh sách chỉ số
indices = [label_to_index[label] for label in lst]

# Chuyển danh sách chỉ số thành tensor
tensor_indices = torch.tensor(indices)


In [29]:
num_class = len(unique_labels)
num_class

26

In [30]:
tensor_indices

tensor([ 5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,
         5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5,  5, 16, 16, 16, 16, 16, 16,
        16, 16, 16, 16, 16, 16, 16, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13, 13,
        13, 13, 13, 13, 19, 19, 19, 19, 19, 19, 19, 19, 19, 19, 19, 19, 19, 19,
        19, 19, 19, 19, 19, 19, 19, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18,
        18, 18, 18, 18, 18, 18, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25, 25,
        25, 25, 25, 25, 25, 25, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23,
        23, 23, 23, 23, 23, 23, 23, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22,
        22, 22, 22, 22, 22, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17,
        17, 17,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,
         4,  4,  4,  4,  4, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20,
        20, 20, 20, 20, 20, 20, 20, 20, 

In [31]:
# Kết nối khớp cơ thể (Pose Estimation)
body_connections = [
    (1,4),  
    (0,2),  
    (1,3), 
    (2,7),
    (0,5), 
    (6,4),
    (8,5), 
    (3), 
    (6),
    (10),
    (9),
    (12,13),
    (11,14),
    (11,15),
    (12,16),
    (13,17,21),
    (14,18,22),
    (15,19),
    (16,20),
    (15,17),
    (16,18),
    (15,19),
    (16),
    (24,11),
    (12,23),
    (26),  
    (25, 27),  
    (26, 28),  
    (27, 29),  
    (28),  
    (25, 31, 34),  
    (30, 32),  
    (31, 33),  
    (32),  
    (30, 38), 
    (34, 36),
    (35, 37),
    (36), 
    (34, 42, 39),
    (38, 40),
    (39, 41),
    (40),
    (38, 43),
    (42, 44),
    (43, 45),
    (44),
    (47),  
    (46, 48),  
    (47, 49),  
    (48, 50),  
    (49),  
    (46, 52, 55),  
    (51, 53),  
    (52, 54),  
    (53),  
    (51, 59), 
    (55, 57),
    (56, 58),
    (57), 
    (55, 63, 60),
    (59, 61),
    (60, 62),
    (61),
    (59, 64),
    (63, 65),
    (64, 66),
    (65)
]





In [32]:
len(body_connections)

67

In [33]:
# Tạo danh sách chi tiết hơn theo dạng (index, value)
detailed_body_connections = [(i, val) for i, conn in enumerate(body_connections) for val in (conn if isinstance(conn, tuple) else (conn,))]

print(detailed_body_connections)

[(0, 1), (0, 4), (1, 0), (1, 2), (2, 1), (2, 3), (3, 2), (3, 7), (4, 0), (4, 5), (5, 6), (5, 4), (6, 8), (6, 5), (7, 3), (8, 6), (9, 10), (10, 9), (11, 12), (11, 13), (12, 11), (12, 14), (13, 11), (13, 15), (14, 12), (14, 16), (15, 13), (15, 17), (15, 21), (16, 14), (16, 18), (16, 22), (17, 15), (17, 19), (18, 16), (18, 20), (19, 15), (19, 17), (20, 16), (20, 18), (21, 15), (21, 19), (22, 16), (23, 24), (23, 11), (24, 12), (24, 23), (25, 26), (26, 25), (26, 27), (27, 26), (27, 28), (28, 27), (28, 29), (29, 28), (30, 25), (30, 31), (30, 34), (31, 30), (31, 32), (32, 31), (32, 33), (33, 32), (34, 30), (34, 38), (35, 34), (35, 36), (36, 35), (36, 37), (37, 36), (38, 34), (38, 42), (38, 39), (39, 38), (39, 40), (40, 39), (40, 41), (41, 40), (42, 38), (42, 43), (43, 42), (43, 44), (44, 43), (44, 45), (45, 44), (46, 47), (47, 46), (47, 48), (48, 47), (48, 49), (49, 48), (49, 50), (50, 49), (51, 46), (51, 52), (51, 55), (52, 51), (52, 53), (53, 52), (53, 54), (54, 53), (55, 51), (55, 59), (56

In [34]:
detailed_body_connections = [(0, 1), (0, 4), (1, 0), (1, 2), (2, 1), (2, 3), (3, 2), (3, 7), (4, 0), (4, 5), (5, 6), (5, 4), (6, 8), (6, 5), (7, 3), (8, 6), (9, 10), (10, 9), (11, 12), (11, 13), (12, 11), (12, 14), (13, 11), (13, 15), (14, 12), (14, 16), (15, 13), (15, 17), (15, 21), (16, 14), (16, 18), (16, 22), (17, 15), (17, 19), (18, 16), (18, 20), (19, 15), (19, 17), (20, 16), (20, 18), (21, 15), (21, 19), (22, 16), (23, 24), (23, 11), (24, 12), (24, 23), (25, 26), (26, 25), (26, 27), (27, 26), (27, 28), (28, 27), (28, 29), (29, 28), (30, 25), (30, 31), (30, 34), (31, 30), (31, 32), (32, 31), (32, 33), (33, 32), (34, 30), (34, 38), (35, 34), (35, 36), (36, 35), (36, 37), (37, 36), (38, 34), (38, 42), (38, 39), (39, 38), (39, 40), (40, 39), (40, 41), (41, 40), (42, 38), (42, 43), (43, 42), (43, 44), (44, 43), (44, 45), (45, 44), (46, 47), (47, 46), (47, 48), (48, 47), (48, 49), (49, 48), (49, 50), (50, 49), (51, 46), (51, 52), (51, 55), (52, 51), (52, 53), (53, 52), (53, 54), (54, 53), (55, 51), (55, 59), (56, 55), (56, 57), (57, 56), (57, 58), (58, 57), (59, 55), (59, 63), (59, 60), (60, 59), (60, 61), (61, 60), (61, 62), (62, 61), (63, 59), (63, 64), (64, 63), (64, 65), (65, 64), (65, 66), (66, 65)]

In [35]:
from STGCN import Model
model = Model(in_channels=3, num_nodes=67, inward_edges=detailed_body_connections, n_classes=26, dropout_ratio = 0.1,batch_norm=True)

In [36]:
import torch
import torch.nn as nn

In [37]:
from torch.utils.data import random_split
batch_size=5
num = len(lable_decode)
train_ratio = 0.8
train_size = int(train_ratio * num)
val_size = num - train_size
dataset = TensorDataset(batch, tensor_indices)
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [38]:
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 25
criterion = nn.CrossEntropyLoss()
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs, labels

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass và cập nhật trọng số
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # In ra thông tin sau mỗi 100 batch
        running_loss += loss.item()
        if i  % 4 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{i+1}], Loss: {running_loss/100:.4f}')
            running_loss = 0.0

print('Hoàn thành huấn luyện!')

Epoch [1/25], Batch [1], Loss: 0.0746
Epoch [1/25], Batch [5], Loss: 0.4684
Epoch [1/25], Batch [9], Loss: 0.3210
Epoch [1/25], Batch [13], Loss: 0.2625
Epoch [1/25], Batch [17], Loss: 0.1933
Epoch [1/25], Batch [21], Loss: 0.2651
Epoch [1/25], Batch [25], Loss: 0.2759
Epoch [1/25], Batch [29], Loss: 0.1881
Epoch [1/25], Batch [33], Loss: 0.1803
Epoch [1/25], Batch [37], Loss: 0.2037
Epoch [1/25], Batch [41], Loss: 0.2536
Epoch [1/25], Batch [45], Loss: 0.1989
Epoch [1/25], Batch [49], Loss: 0.2044
Epoch [1/25], Batch [53], Loss: 0.2260
Epoch [1/25], Batch [57], Loss: 0.2206
Epoch [1/25], Batch [61], Loss: 0.1816
Epoch [1/25], Batch [65], Loss: 0.2072
Epoch [1/25], Batch [69], Loss: 0.1814
Epoch [1/25], Batch [73], Loss: 0.2297
Epoch [1/25], Batch [77], Loss: 0.1746
Epoch [2/25], Batch [1], Loss: 0.0342
Epoch [2/25], Batch [5], Loss: 0.1579
Epoch [2/25], Batch [9], Loss: 0.1553
Epoch [2/25], Batch [13], Loss: 0.2045
Epoch [2/25], Batch [17], Loss: 0.1778
Epoch [2/25], Batch [21], Loss:

In [39]:
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Giả sử bạn đã có mô hình đã huấn luyện và test_loader là DataLoader cho tập test
# Chúng ta sẽ tạo danh sách để lưu trữ các nhãn dự đoán và nhãn thực tế
all_preds = []
all_labels = []

# Đánh giá mô hình trên tập test
with torch.no_grad():
    model.eval()  # Đặt mô hình ở chế độ đánh giá
    for data, labels in train_dataset:
        data = data.unsqueeze(0)
        outputs = model(data)  # Dự đoán từ mô hình
        predicted_classes = torch.argmax(outputs, dim=1)  # Lấy nhãn có xác suất cao nhất
        all_preds.extend(predicted_classes.numpy())  # Thêm nhãn dự đoán vào danh sách
        all_labels.extend(labels.unsqueeze(0).numpy())  # Thêm nhãn thực tế vào danh sách

# Tính toán các chỉ số đánh giá
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, average='weighted')  # Tính precision cho đa lớp
recall = recall_score(all_labels, all_preds, average='weighted')  # Tính recall cho đa lớp
f1 = f1_score(all_labels, all_preds, average='weighted')  # Tính F1-score cho đa lớp
confusion = confusion_matrix(all_labels, all_preds)  # Tính confusion matrix

# In ra các kết quả đánh giá
print(f'Accuracy: {accuracy * 100:.2f}%')
print(f'Precision: {precision:.2f}')
print(f'Recall: {recall:.2f}')
print(f'F1 Score: {f1:.2f}')
print('Confusion Matrix:')
print(confusion)


Accuracy: 76.25%
Precision: 0.82
Recall: 0.76
F1 Score: 0.76
Confusion Matrix:
[[26  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
   0  0]
 [ 0 17  0  1  0  4  0  0  2  0  0  0  0  1  2  0  0  0  1  0  0  0  0  0
   0  0]
 [ 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  1  0  0  0
   0  0]
 [ 0  0  0 15  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
   0  0]
 [ 0  0  0  0 17  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
   0  0]
 [ 0  0  0  0  0 13  0  0  3  0  1  0  1  0  4  0  0  0  0  0  0  0  0  0
   0  0]
 [ 1  0  0  0 11  0 11  0  0  0  0  0  1  0  0  0  0  0  0  0  0  0  0  0
   2  0]
 [ 0  0  0  0  0  0  0 13  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
   0  0]
 [ 0  0  0  0  0  0  0  0 11  0  0  0  0  0  1  0  0  0  0  0  0  0  0  2
   0  0]
 [ 0  0  0  0  0  0  0  0  0  5  0  0  0  0  0  0  0  0  0  1  0  0  1  6
   0  0]
 [ 0  0  0  0  0  0  0  1  0  0 11  0  0  0  0  0  0  0  0  0  0  6  0  0
   0  0]
 [ 0  0 

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [40]:
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Giả sử bạn đã có mô hình đã huấn luyện và test_loader là DataLoader cho tập test
# Chúng ta sẽ tạo danh sách để lưu trữ các nhãn dự đoán và nhãn thực tế
all_preds = []
all_labels = []

# Đánh giá mô hình trên tập test
with torch.no_grad():
    model.eval()  # Đặt mô hình ở chế độ đánh giá
    for data, labels in val_dataset:
        data = data.unsqueeze(0)
        outputs = model(data)  # Dự đoán từ mô hình
        predicted_classes = torch.argmax(outputs, dim=1)  # Lấy nhãn có xác suất cao nhất
        all_preds.extend(predicted_classes.numpy())  # Thêm nhãn dự đoán vào danh sách
        all_labels.extend(labels.unsqueeze(0).numpy())  # Thêm nhãn thực tế vào danh sách

# Tính toán các chỉ số đánh giá
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, average='weighted')  # Tính precision cho đa lớp
recall = recall_score(all_labels, all_preds, average='weighted')  # Tính recall cho đa lớp
f1 = f1_score(all_labels, all_preds, average='weighted')  # Tính F1-score cho đa lớp
confusion = confusion_matrix(all_labels, all_preds)  # Tính confusion matrix

# In ra các kết quả đánh giá
print(f'Accuracy: {accuracy * 100:.2f}%')
print(f'Precision: {precision:.2f}')
print(f'Recall: {recall:.2f}')
print(f'F1 Score: {f1:.2f}')
print('Confusion Matrix:')
print(confusion)


Accuracy: 70.30%
Precision: 0.69
Recall: 0.70
F1 Score: 0.67
Confusion Matrix:
[[7 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0]
 [0 1 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 3 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0]
 [0 0 0 0 4 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [1 1 0 0 0 5 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0]
 [0 0 0 0 1 0 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 4 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 4 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 0 0]
 [0 0 0 0 0 0 0 0 0 0 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 4 0 1 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 2 1 0 0 0 0 1 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 1 0 0]
 [1 0 0 0 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 5 0 0 0

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [47]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader

# 1. Thiết lập các siêu tham số
batch_size = 64
learning_rate = 0.001
num_epochs = 5

# 2. Chuẩn bị dữ liệu MNIST
transform = transforms.Compose([
    transforms.ToTensor(),  # Chuyển đổi hình ảnh thành tensor
    transforms.Normalize((0.5,), (0.5,))  # Chuẩn hóa
])

train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)



In [48]:
i = 1
for data, labels in train_loader:
    print("data", data.shape)
    print("labels", labels)
    if i ==2:
        break
    
    i+=1

data torch.Size([64, 1, 28, 28])
labels tensor([9, 1, 4, 1, 0, 2, 0, 2, 8, 4, 9, 7, 2, 3, 6, 9, 7, 6, 9, 7, 2, 1, 7, 2,
        8, 5, 1, 8, 3, 9, 8, 6, 2, 3, 7, 5, 3, 9, 9, 4, 2, 2, 9, 0, 8, 8, 2, 9,
        5, 7, 2, 0, 7, 7, 7, 8, 9, 9, 8, 8, 5, 4, 8, 8])
data torch.Size([64, 1, 28, 28])
labels tensor([6, 4, 5, 7, 0, 7, 8, 8, 3, 1, 0, 7, 0, 7, 1, 6, 9, 3, 0, 0, 7, 9, 9, 0,
        1, 1, 7, 7, 8, 2, 8, 1, 0, 6, 8, 9, 8, 0, 9, 6, 4, 8, 9, 8, 8, 3, 5, 7,
        6, 5, 2, 2, 9, 6, 4, 6, 9, 7, 1, 5, 8, 9, 9, 1])


In [41]:
import torch

# Assuming `model` is your neural network
torch.save(model.state_dict(), 'model.pth')
