# Set up lib and envs

In [1]:
%%capture
!pip install super-gradients

In [2]:
import torch
import os
import pathlib
import re
from imutils import paths
import numpy as np
import csv
import json
import matplotlib.pyplot as plt
import cv2
import time
import PIL

In [3]:
import logging
logging.getLogger().setLevel(logging.CRITICAL)

In [4]:
import warnings
warnings.filterwarnings("ignore")

In [5]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [6]:
from super_gradients.training import models
from super_gradients.common.object_names import Models

yolo_nas_pose = models.get("yolo_nas_pose_l", pretrained_weights="coco_pose").cuda()

[2024-05-10 01:40:31] INFO - crash_tips_setup.py - Crash tips is enabled. You can set your environment variable to CRASH_HANDLER=FALSE to disable it


The console stream is logged into /root/sg_logs/console.log


[2024-05-10 01:40:37] INFO - utils.py - NumExpr defaulting to 8 threads.
 It is your responsibility to determine whether you have permission to use the models for your use case.
 The model you have requested was pre-trained on the coco_pose dataset, published under the following terms: https://cocodataset.org/#termsofuse
[2024-05-10 01:40:42] INFO - checkpoint_utils.py - License Notification: YOLO-NAS-POSE pre-trained weights are subjected to the specific license terms and conditions detailed in 
https://github.com/Deci-AI/super-gradients/blob/master/LICENSE.YOLONAS-POSE.md
By downloading the pre-trained weight files you agree to comply with these terms.
Downloading: "https://sghub.deci.ai/models/yolo_nas_pose_l_coco_pose.pth" to /root/.cache/torch/hub/checkpoints/yolo_nas_pose_l_coco_pose.pth
100%|██████████| 304M/304M [00:04<00:00, 65.8MB/s]
[2024-05-10 01:40:47] INFO - checkpoint_utils.py - Successfully loaded pretrained weights for architecture yolo_nas_pose_l


# 🩻 Display only the skeleton



In [7]:
from super_gradients.training.utils.visualization.detection import draw_bbox
from super_gradients.training.utils.visualization.pose_estimation import PoseVisualization

def process_single_image(image_prediction):
    """
    Process a single image prediction to visualize the pose estimation results on a blank background.

    Parameters:
    - image_prediction : object
        An instance containing the image and its associated pose prediction data.

    Returns:
    - np.ndarray
        An image with the pose skeleton drawn.
    """

    image = image_prediction.image
    pose_data = image_prediction.prediction

    blank_image = np.zeros_like(image)  # for a black background

    skeleton_image = PoseVisualization.draw_poses(
        image=blank_image,
        poses=pose_data.poses,
        boxes=pose_data.bboxes_xyxy,
        scores=pose_data.scores,
        is_crowd=None,
        edge_links=pose_data.edge_links,
        edge_colors=pose_data.edge_colors,
        keypoint_colors=pose_data.keypoint_colors,
        joint_thickness=2,
        box_thickness=2,
        keypoint_radius=5
    )
    return skeleton_image, pose_data, image

In [8]:
def create_video_from_frames(frames, output_filename='output_video.mp4', fps=30.0):
    """
    Create an mp4 video from a list of image frames.

    Parameters:
    - frames : list of np.ndarray
        List of image frames represented as numpy arrays.
    - output_filename : str, optional
        Name of the output video file.
    - fps : float, optional
        Frames per second for the output video.

    Returns:
    - None
    """

    # Determine the width and height from the first frame
    frame_height, frame_width, layers = frames[0].shape

    # Define the codec for .mp4 format
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter(output_filename, fourcc, fps, (frame_width, frame_height))

    # Write each frame to the video
    for frame in frames:
        out.write(frame)

    # Close and release everything
    out.release()
    cv2.destroyAllWindows()

# 📽️ Data Processing

In [9]:
def normalize_data(poses, bboxes, frame_width, frame_height):
    normalized_poses = np.copy(poses)
    normalized_bboxes = np.copy(bboxes)

    normalized_poses[:, :, 0] /= frame_width
    normalized_poses[:, :, 1] /= frame_height

    normalized_bboxes[:, [0, 2]] /= frame_width
    normalized_bboxes[:, [1, 3]] /= frame_height

    return normalized_poses, normalized_bboxes

In [10]:
def select_primary_pose(poses, bboxes, frame_width, frame_height):
    center_of_frame = np.array([frame_width / 2, frame_height / 2])
    highest_score = -1
    primary_pose = None
    primary_bbox = None

    for pose, bbox in zip(poses, bboxes):
        bbox_center = np.array([(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2])
        distance_to_center = np.linalg.norm(center_of_frame - bbox_center)
        bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

        score = bbox_area / (distance_to_center + 1)

        if score > highest_score:
            highest_score = score
            primary_pose = pose
            primary_bbox = bbox

    return primary_pose, primary_bbox

In [11]:
def extract_pose_information_per_frame(video_path, conf_threshold=0.4):
    # Load model
    # yolo_nas_pose = models.get("yolo_nas_pose_l", pretrained_weights="coco_pose").cuda()
    result = yolo_nas_pose.to('cuda').predict(video_path, conf=conf_threshold)

    frame_data_list = []

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Could not open video file")
    ret, frame = cap.read()
    frame_height, frame_width = frame.shape[:2]
    cap.release()

    for image_prediction in result._images_prediction_gen:
        poses = image_prediction.prediction.poses
        bboxes_xyxy = image_prediction.prediction.bboxes_xyxy
        edge_links = image_prediction.prediction.edge_links

        normalized_poses, normalized_bboxes = normalize_data(poses, bboxes_xyxy, frame_width, frame_height)

        primary_pose, primary_bbox = select_primary_pose(normalized_poses, normalized_bboxes, 1, 1)  #

        if primary_pose is not None:
            frame_data = {
                'poses': np.array([primary_pose]),
                'bboxes_xyxy': np.array([primary_bbox]),
                'edge_links': edge_links
            }
            frame_data_list.append(frame_data)
    print(f"Finish processing the video {video_path}")
    return frame_data_list


In [12]:
def remove_outlier_frames(frame_data_list, area_threshold=0.1, distance_threshold=0.1):
    filtered_frames = []
    prev_area = None
    prev_center = np.array([0.5, 0.5])

    for frame_data in frame_data_list:
        if 'bboxes_xyxy' not in frame_data or len(frame_data['bboxes_xyxy']) == 0:
            continue

        bbox = frame_data['bboxes_xyxy'][0]
        area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        center = np.array([(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2])

        if prev_area is not None:
            area_change = abs(area - prev_area)
            center_distance = np.linalg.norm(center - prev_center)

            # Check
            if area_change > area_threshold or center_distance > distance_threshold:
                continue

        prev_area = area
        prev_center = center
        filtered_frames.append(frame_data)

    return filtered_frames


In [13]:
def processing_for_pose(pose_data_list, save_format="npy", poselist_filename='/content/drive/MyDrive/sample', video_filename=None, frame_size=(640,480), fps=30.0):
    if video_filename is not None:
        fourcc = cv2.VideoWriter_fourcc(*'MP4V')
        out = cv2.VideoWriter(video_filename, fourcc, fps, frame_size)

    frame_width, frame_height = frame_size
    pose_data_list = remove_outlier_frames(pose_data_list) # Remove outlier step
    only_pose_list = []
    for frame_data in pose_data_list:
        if video_filename is not None:
            frame = np.zeros((frame_height, frame_width, 3), np.uint8)
        for pose in frame_data['poses']:
            if video_filename is not None:
                for link in frame_data['edge_links']:
                    start_point = (int(pose[link[0], 0] * frame_width), int(pose[link[0], 1] * frame_height))
                    end_point = (int(pose[link[1], 0] * frame_width), int(pose[link[1], 1] * frame_height))
                    color = (0, 255, 0)  # Skeleton Color
                    cv2.line(frame, start_point, end_point, color, 2)
        only_pose_list.append(pose)
        if video_filename is not None:
            out.write(frame)

    if video_filename is not None:
        out.release()

    if save_format == 'npy':
        np.save(poselist_filename + '.npy', np.array(only_pose_list))
    elif save_format == 'csv':
        with open(poselist_filename + '.csv', mode='w', newline='') as file:
            writer = csv.writer(file)
            for pose in only_pose_list:
                # Flatten pose array
                flattened_pose = pose.flatten()
                writer.writerow(flattened_pose)
    elif save_format == "json":
        pose_list_as_lists = [pose.tolist() for pose in only_pose_list]
        with open(poselist_filename + '.json', 'w') as file:
            json.dump(pose_list_as_lists, file)
    else:
        print("Save format is invalid!")

    return only_pose_list

In [14]:
def data_processing(input_dir, list_data_dir, video_data_dir=None, frame_size=(640,480), fps=30.0):
    os.makedirs(list_data_dir, exist_ok=True)
    if video_data_dir is not None:
        os.makedirs(video_data_dir, exist_ok=True)

    for category in os.listdir(input_dir):
        print(f"***Processing: {category} ***")
        start_time = time.time()

        category_path = os.path.join(input_dir, category)
        list_category_path = os.path.join(list_data_dir, category)

        os.makedirs(list_category_path, exist_ok=True)
        if video_data_dir is not None:
            video_category_path = os.path.join(video_data_dir, category)
            os.makedirs(video_category_path, exist_ok=True)

        video_count = 0

        if os.path.isdir(category_path):
            for video_file in os.listdir(category_path):
                video_path = os.path.join(category_path, video_file)
                base_filename = os.path.splitext(video_file)[0]
                poselist_path = os.path.join(list_category_path, base_filename)
                video_output_path = None if video_data_dir is None else os.path.join(video_category_path, base_filename + '.mp4')

                if video_path.lower().endswith(('.mp4', '.mov', '.MOV')):
                    pose_data = extract_pose_information_per_frame(video_path)
                    only_pose_list = processing_for_pose(pose_data,
                                                         save_format="npy",
                                                         poselist_filename=poselist_path,
                                                         video_filename=video_output_path,
                                                         frame_size=frame_size,
                                                         fps=fps)
                    video_count += 1

        elapsed_time = time.time() - start_time
        print(f"Processed {video_count} videos in '{category}' in {elapsed_time:.2f} seconds.")

        if video_data_dir is not None:
            print(f"Total videos processed in '{video_data_dir}/{category}': {video_count}")

# Apply for dataset

In [15]:
# For data vòng loại
input_dir = '/content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại'
list_data_dir = '/content/drive/MyDrive/DrAIgon/list_processing_data'

In [None]:
data_processing(input_dir, list_data_dir, video_data_dir=None)

***Processing: tricep Pushdown ***


[2024-03-31 10:58:08] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_10.mp4


[2024-03-31 10:58:12] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_13.mp4


[2024-03-31 10:58:16] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_14.mp4


[2024-03-31 10:58:19] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_16.mp4


[2024-03-31 10:58:23] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_11.mp4


[2024-03-31 10:58:29] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_15.mp4


[2024-03-31 10:58:33] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_1.mp4


[2024-03-31 10:58:38] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_12.mp4


[2024-03-31 10:58:44] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_2.mp4


[2024-03-31 10:58:49] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_17.mp4


[2024-03-31 10:58:53] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_20.mp4


[2024-03-31 10:58:57] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_21.mp4


[2024-03-31 10:59:01] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_19.mp4


[2024-03-31 10:59:07] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_23.mp4


[2024-03-31 10:59:11] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_24.mp4


[2024-03-31 10:59:16] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_22.mp4


[2024-03-31 10:59:20] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_18.mp4


[2024-03-31 10:59:26] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_26.mp4


[2024-03-31 10:59:31] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_33.mp4


[2024-03-31 10:59:35] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_3.mp4


[2024-03-31 10:59:40] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_31.mp4


[2024-03-31 10:59:45] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_32.mp4


[2024-03-31 10:59:50] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_25.mp4


[2024-03-31 10:59:54] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_28.mp4


[2024-03-31 11:00:00] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_30.mp4


[2024-03-31 11:00:06] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_29.mp4


[2024-03-31 11:00:11] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_27.mp4


[2024-03-31 11:00:15] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_4.mp4


[2024-03-31 11:00:19] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_34.mp4


[2024-03-31 11:00:24] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Data - Vòng loại/tricep Pushdown/tricep pushdown_38.mp4


[2024-03-31 11:00:29] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


KeyboardInterrupt: 

In [16]:
# For test vòng loại
input_dir_test = '/content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Test - Vòng loại - 2'
list_data_dir_test = '/content/drive/MyDrive/DrAIgon/list_processing_data_testvongloai'

In [None]:
data_processing(input_dir_test, list_data_dir_test, video_data_dir_test=None)

# Util

In [17]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from torch.utils.data import Dataset
import numpy as np
from sklearn.model_selection import train_test_split

class GymPoseDataset(Dataset):
    def __init__(self, list_data_dir, transform=None, augment=False, fixed_length=100):
        self.list_data_dir = list_data_dir
        self.transform = transform
        self.augment = augment
        self.fixed_length = fixed_length
        self.data_info, self.class_to_index = self.get_data_info_and_class_mapping()

    def get_data_info_and_class_mapping(self):
        data_info = []
        class_names = set()
        for class_dir in os.listdir(self.list_data_dir):
            class_dir_path = os.path.join(self.list_data_dir, class_dir)
            if os.path.isdir(class_dir_path):
                for file_name in os.listdir(class_dir_path):
                    file_path = os.path.join(class_dir_path, file_name)
                    data_info.append((file_path, class_dir))
                    class_names.add(class_dir)
        class_to_index = {class_name: i for i, class_name in enumerate(sorted(class_names))}
        return data_info, class_to_index


    def __len__(self):
        return len(self.data_info)

    def time_warp(self, data, warp_factor=0.2):
        sequence_length = data.shape[0]
        new_length = int(sequence_length * warp_factor)

        warped_data = np.zeros((new_length, data.shape[1], data.shape[2]))
        for i in range(data.shape[1]):
            for j in range(data.shape[2]):
                warped_data[:, i, j] = np.interp(np.linspace(0, sequence_length, new_length), np.arange(sequence_length), data[:, i, j])

        return warped_data

    def __getitem__(self, idx):
        file_path, class_name = self.data_info[idx]
        data = np.load(file_path)

        if self.augment:
            data = self.time_warp(data)

        if data.shape[0] < self.fixed_length:
            padding = np.zeros((self.fixed_length - data.shape[0], data.shape[1], data.shape[2]))
            data = np.concatenate((data, padding), axis=0)
        elif data.shape[0] > self.fixed_length:
            data = data[:self.fixed_length, :, :]

        data = data[:, :, :2].reshape(data.shape[0], -1)
        data = torch.as_tensor(data, dtype=torch.float)
        label = torch.tensor(self.class_to_index[class_name], dtype=torch.long)

        return data, label

In [18]:
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence

def pad_collate(batch):
    (xx, yy) = zip(*batch)
    x_lens = torch.tensor([x.size(0) for x in xx], dtype=torch.long)

    xx_pad = pad_sequence(xx, batch_first=True, padding_value=0)
    yy_stack = torch.stack(yy, dim=0)
    return xx_pad, yy_stack, x_lens

In [19]:
def create_dataloaders(dataset, batch_size=32, val_split=0.1, test_split=0.1):
    dataset_size = len(dataset)
    test_size = int(test_split * dataset_size)
    val_size = int(val_split * dataset_size)
    train_size = dataset_size - val_size - test_size
    train_dataset, val_test_dataset = random_split(dataset, [train_size, val_size + test_size])
    val_dataset, test_dataset = random_split(val_test_dataset, [val_size, test_size])
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=pad_collate)
    val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=pad_collate)
    test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=pad_collate)
    return train_loader, val_loader, test_loader

In [20]:
def create_test_dataloader(dataset, batch_size=32):
    test_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=pad_collate)
    return test_loader

In [21]:
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import matplotlib.pyplot as plt

def evaluate_model(model, loader, criterion, device, mode = 2):
    # mode = 1 for LSTM
    # mode = 2 for ST-GCN
    model.eval()
    running_loss, total_samples = 0.0, 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for data, labels, lengths in loader:
            data, labels, lengths = data.to(device), labels.to(device), lengths.to(device)
            if mode == 1:
                outputs = model(data, lengths)
            elif mode == 2:
                outputs = model(data)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            running_loss += loss.item() * data.size(0)
            total_samples += labels.size(0)

            all_preds.extend(preds.view(-1).cpu().numpy())
            all_labels.extend(labels.view(-1).cpu().numpy())

    epoch_loss = running_loss / total_samples
    accuracy = 100 * (np.array(all_preds) == np.array(all_labels)).mean()
    f1 = f1_score(all_labels, all_preds, average='weighted')
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    conf_matrix = confusion_matrix(all_labels, all_preds)

    return epoch_loss, accuracy, f1, precision, recall, conf_matrix

In [22]:
def plot_training_history(train_losses, train_accuracies, val_losses, val_accuracies):
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Loss History')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.title('Accuracy History')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import matplotlib.pyplot as plt

def train_model(model, train_loader, val_loader, device, num_epochs=80, weight_path='best_model_weights.pth', mode = 2):
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10, factor=0.1)

    best_val_loss = 99

    train_losses, train_accuracies = [], []
    val_losses, val_accuracies = [], []

    for epoch in range(num_epochs):
        model.train()
        running_loss, running_corrects, total_samples = 0.0, 0, 0
        for data, labels, lengths in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            data, labels, lengths = data.to(device), labels.to(device), lengths.to(device)
            optimizer.zero_grad()
            if mode == 1:
                outputs = model(data, lengths)
            elif mode == 2:
                outputs = model(data)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()
            # scheduler.step()

            running_loss += loss.item() * data.size(0)
            running_corrects += torch.sum(preds == labels.data)
            total_samples += labels.size(0)

        epoch_loss = running_loss / total_samples
        epoch_acc = running_corrects.double()*100 / total_samples

        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_acc.item())

        val_loss, val_accuracy, _, _, _, _ = evaluate_model(model, val_loader, criterion, device, mode)
        scheduler.step(val_loss)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), weight_path)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

    plot_training_history(train_losses, train_accuracies, val_losses, val_accuracies)

In [24]:
def test_model(model, test_loader, device, weight_path='best_model_weights.pth', mode = 2):
    model.load_state_dict(torch.load(weight_path))
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    test_loss, test_accuracy, test_f1, test_precision, test_recall, test_conf_matrix = evaluate_model(model, test_loader, criterion, device, mode=mode)

    print(f'------------Testing on the best weight------------')
    print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_accuracy:.2f}%')
    print(f'F1 Score: {test_f1:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}')
    print('Confusion Matrix:\n', test_conf_matrix)

In [25]:
def single_inference(model, video_filename, device, class_to_index, fixed_length=100, weight_path='best_model_weights.pth'):
    pose_data = extract_pose_information_per_frame(video_filename)
    pose_list = processing_for_pose(pose_data)
    input_data = np.array(pose_list)

    model.load_state_dict(torch.load(weight_path))
    model.to(device)
    model.eval()

    if input_data.shape[0] < fixed_length:
        padding = np.zeros((fixed_length - input_data.shape[0], input_data.shape[1], input_data.shape[2]))
        input_data = np.concatenate((input_data, padding), axis=0)
    elif input_data.shape[0] > fixed_length:
        input_data = input_data[:fixed_length, :, :]

    input_data = input_data[:, :, :2].reshape(input_data.shape[0], -1)
    input_data = torch.as_tensor(input_data, dtype=torch.float).unsqueeze(0)
    input_data = input_data.to(device)

    with torch.no_grad():
        outputs = model(input_data)
        _, preds = torch.max(outputs, 1)

    index_to_class = {v: k for k, v in class_to_index.items()}
    predicted_class_name = index_to_class[preds.item()]

    return predicted_class_name

# Training: ST-GCN

In [26]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [27]:
class Graph():
  def __init__(self, hop_size):
      self.num_node = 17
      self.get_edge()
      self.hop_size = hop_size
      self.hop_dis = self.get_hop_distance(self.num_node, self.edge, hop_size=hop_size)
      self.get_adjacency()

  def __str__(self):
      return self.A

  def get_edge(self):
      self_link = [(i, i) for i in range(self.num_node)]
      neighbor_link = [(0, 1), (0, 2), (1, 2), (1, 3), (2, 4), (3, 5), (4, 6),
                        (5, 6), (5, 7), (5, 11), (6, 8), (6, 12), (7, 9), (8, 10),
                        (11, 12), (11, 13), (12, 14), (13, 15), (14, 16)]
      self.edge = self_link + neighbor_link

  def get_adjacency(self):
      valid_hop = range(0, self.hop_size + 1, 1)
      adjacency = np.zeros((self.num_node, self.num_node))
      for hop in valid_hop:
          adjacency[self.hop_dis == hop] = 1
      normalize_adjacency = self.normalize_digraph(adjacency)
      A = np.zeros((len(valid_hop), self.num_node, self.num_node))
      for i, hop in enumerate(valid_hop):
          A[i][self.hop_dis == hop] = normalize_adjacency[self.hop_dis == hop]
      self.A = A

  def get_hop_distance(self, num_node, edge, hop_size):
      A = np.zeros((num_node, num_node))
      for i, j in edge:
          A[j, i] = 1
          A[i, j] = 1
      hop_dis = np.zeros((num_node, num_node)) + np.inf
      transfer_mat = [np.linalg.matrix_power(A, d) for d in range(hop_size + 1)]
      arrive_mat = (np.stack(transfer_mat) > 0)
      for d in range(hop_size, -1, -1):
          hop_dis[arrive_mat[d]] = d
      return hop_dis

  def normalize_digraph(self, A):
      Dl = np.sum(A, 0)
      num_node = A.shape[0]
      Dn = np.zeros((num_node, num_node))
      for i in range(num_node):
          if Dl[i] > 0:
              Dn[i, i] = Dl[i]**(-1)
      DAD = np.dot(A, Dn)
      return DAD

In [28]:
class SpatialGraphConvolution(nn.Module):
  def __init__(self, in_channels, out_channels, s_kernel_size):
    super().__init__()
    self.s_kernel_size = s_kernel_size
    self.conv = nn.Conv2d(in_channels=in_channels,
                          out_channels=out_channels * s_kernel_size,
                          kernel_size=1)

  def forward(self, x, A):
    x = self.conv(x)
    n, kc, t, v = x.size()
    x = x.view(n, self.s_kernel_size, kc//self.s_kernel_size, t, v)
    x = torch.einsum('nkctv,kvw->nctw', (x, A))
    return x.contiguous()

In [29]:
class STGC_block(nn.Module):
  def __init__(self, in_channels, out_channels, stride, t_kernel_size, A_size, dropout=0.5):
    super().__init__()
    self.sgc = SpatialGraphConvolution(in_channels=in_channels,
                                       out_channels=out_channels,
                                       s_kernel_size=A_size[0])

    self.M = nn.Parameter(torch.ones(A_size))

    self.tgc = nn.Sequential(nn.BatchNorm2d(out_channels),
                            nn.ReLU(),
                            nn.Dropout(dropout),
                            nn.Conv2d(out_channels,
                                      out_channels,
                                      (t_kernel_size, 1),
                                      (stride, 1),
                                      ((t_kernel_size - 1) // 2, 0)),
                            nn.BatchNorm2d(out_channels),
                            nn.ReLU())

  def forward(self, x, A):
    x = self.tgc(self.sgc(x, A * self.M))
    return x

In [30]:
class ST_GCN(nn.Module):
    def __init__(self, num_classes=22, num_joints=17, sequence_length=20, t_kernel_size=9, hop_size=2):
        super().__init__()
        in_channels = 2  # (x,y) not confidences

        graph = Graph(hop_size)
        A = torch.tensor(graph.A, dtype=torch.float32, requires_grad=False)
        self.register_buffer('A', A)

        self.stgc_blocks = nn.ModuleList([
            STGC_block(in_channels, 64, 1, t_kernel_size, A.shape),
            STGC_block(64, 128, 2, t_kernel_size, A.shape),
            STGC_block(128, 256, 2, t_kernel_size, A.shape),
            STGC_block(256, 256, 1, t_kernel_size, A.shape),
            STGC_block(256, 256, 1, t_kernel_size, A.shape)
        ])

        self.fc = nn.Conv2d(256, num_classes, kernel_size=1)

    def forward(self, x):
        N, T, VC = x.size()
        C = 2
        V = VC // C
        x = x.view(N, T, V, C).permute(0, 3, 1, 2)  # [N, C, T, V]

        for stgc in self.stgc_blocks:
            x = stgc(x, self.A)

        x = F.avg_pool2d(x, x.size()[2:])
        x = x.view(N, -1, 1, 1)
        x = self.fc(x)
        x = x.view(x.size(0), -1)
        return x

In [31]:
# Config
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
list_data_dir = '/content/drive/MyDrive/DrAIgon/list_data_vl'
mode = 2 #mode 2 for ST_GCN

In [32]:
ST_GCN = ST_GCN().to(device)

In [33]:
dataset = GymPoseDataset(list_data_dir, transform=transforms.ToTensor(), augment=False)
train_loader, val_loader, test_loader = create_dataloaders(dataset)

In [None]:
train_model(ST_GCN, train_loader, val_loader, device, num_epochs=100, mode=mode)

In [None]:
test_model(ST_GCN, test_loader, device, mode=mode)

# Testing_VL: ST-GCN

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_dir_test = '/content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Test - Vòng loại - 2'
list_data_dir_test = '/content/drive/MyDrive/DrAIgon/list_data_tvl'
mode = 2

In [None]:
dataset_test = GymPoseDataset(list_data_dir_test, transform=transforms.ToTensor(), augment=False)
test_loader_t = create_test_dataloader(dataset_test)

In [None]:
test_model(ST_GCN, test_loader_t, device, mode=mode)

In [None]:
#Using the best weight
test_model(ST_GCN, test_loader_t, device, weight_path='/content/drive/MyDrive/DrAIgon/final_best_weight.pth', mode=mode)

------------Testing on the best weight------------
Test Loss: 2.7090, Test Acc: 51.28%
F1 Score: 0.4844, Precision: 0.5192, Recall: 0.5128
Confusion Matrix:
 [[0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 1 1 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0]
 [0 0 1 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 1 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 1 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 1]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0]
 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 0 0 0 0 0]
 [0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0

# Test for single video

In [None]:
video_filename = '/content/drive/MyDrive/DrAIgon/AI4LIFE2024-DATA/Test - Vòng loại/barbell biceps curl_1.mp4'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class_to_index = dataset.class_to_index
weight_path = '/content/drive/MyDrive/DrAIgon/final_best_weight.pth'
single_inference(ST_GCN,video_filename=video_filename, device=device, class_to_index=class_to_index, weight_path=weight_path )

# Test for batch (Thể thức mới của ban tổ chức)

In [34]:
def batch_inference(folder_path, model, device, class_to_index, weight_path):
    video_files = [f for f in os.listdir(folder_path) if f.endswith('.mp4')]
    results_file = os.path.join("inference_results.csv")

    with open(results_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['video', 'dự đoán'])

        for video_file in video_files:
            # ground_truth = video_file.rsplit('_', 1)[0]

            predicted_class_name = single_inference(model, os.path.join(folder_path, video_file), device, class_to_index, weight_path=weight_path)

            writer.writerow([video_file, predicted_class_name])


In [35]:
#Config
folder_path = "/content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết"
model = ST_GCN
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class_to_index = dataset.class_to_index
print(class_to_index)
weight_path = '/content/drive/MyDrive/DrAIgon/final_best_weight.pth'
# weight_path = 'best_model_weights.pth'

In [36]:
batch_inference(folder_path=folder_path, model=model, device=device, class_to_index=class_to_index, weight_path=weight_path)

[2024-05-10 01:41:04] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video1.mp4


[2024-05-10 01:41:31] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video2.mp4


[2024-05-10 01:41:52] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video3.mp4


[2024-05-10 01:42:13] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video4.mp4


[2024-05-10 01:42:37] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video5.mp4


[2024-05-10 01:43:25] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video6.mp4


[2024-05-10 01:43:48] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video7.mp4


[2024-05-10 01:44:31] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video8.mp4


[2024-05-10 01:44:47] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video9.mp4


[2024-05-10 01:45:01] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video10.mp4


[2024-05-10 01:45:15] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video11.mp4


[2024-05-10 01:45:30] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video12.mp4


[2024-05-10 01:45:53] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video13.mp4


[2024-05-10 01:46:21] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video14.mp4


[2024-05-10 01:46:49] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video15.mp4


[2024-05-10 01:47:25] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video16.mp4


[2024-05-10 01:47:41] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video17t.mp4


[2024-05-10 01:47:57] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video18.mp4


[2024-05-10 01:48:25] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video19.mp4


[2024-05-10 01:48:38] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video20.mp4


[2024-05-10 01:49:30] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


Finish processing the video /content/drive/MyDrive/DrAIgon/Dữ liệu kiểm thử vòng chung kết/video21.mp4


[2024-05-10 01:49:49] INFO - pipelines.py - Fusing some of the model's layers. If this takes too much memory, you can deactivate it by setting `fuse_model=False`


In [38]:
import pandas as pd
results_file = "inference_results.csv"
results_df = pd.read_csv(results_file)
print(results_df)

           video              dự đoán
0     video1.mp4  barbell biceps curl
1     video2.mp4          bench press
2     video3.mp4    chest fly machine
3     video4.mp4          tricep dips
4     video5.mp4  incline bench press
5     video6.mp4        lateral raise
6     video7.mp4        leg extension
7     video8.mp4           leg raises
8     video9.mp4         lat pulldown
9    video10.mp4        lateral raise
10   video11.mp4        leg extension
11   video12.mp4      tricep Pushdown
12   video13.mp4                plank
13   video14.mp4                squat
14   video15.mp4              push-up
15   video16.mp4    romanian deadlift
16  video17t.mp4        russian twist
17   video18.mp4         lat pulldown
18   video19.mp4                squat
19   video20.mp4  incline bench press
20   video21.mp4          tricep dips
21   video22.mp4      tricep Pushdown
