# **Install and import**

In [None]:
# Install MediaPipe, which provides the pose and hand landmarker tasks used later in this notebook.
# NOTE(review): the version is not pinned; the recorded cell output shows this resolved to mediapipe 0.10.9.
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (34.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.5/34.5 MB[0m [31m16.4 MB/s[0m eta [36m0:00:00[0m
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.4.6-py3-none-any.whl (31 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.9 sounddevice-0.4.6


In [None]:
import os
import cv2
import re
import subprocess
import numpy as np
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2
from mediapipe import solutions
from google.colab.patches import cv2_imshow
import torch
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM,Dense,GlobalAveragePooling1D,Dropout,Masking
from tensorflow.keras import Input
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import gc
import matplotlib.pyplot as plt
from tensorflow.keras.regularizers import L2
from tensorflow.keras.utils import pad_sequences
from PIL import Image
import random
import shutil

In [None]:
# Mount Google Drive at /content/drive; the default model paths used by the functions
# below (YOLOv5 weights and MediaPipe .task files) live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

# **main code**

## **Thao tác xoá background**

Khi chạy YOLOv5 trên một video, mỗi frame ta sẽ thu được một bounding box. Do đó một video sẽ thu được nhiều bounding box. Ta chọn phần bounding box có diện tích lớn nhất, khi đó crop video dựa trên bounding box đó sẽ không bỏ lỡ những thông tin quan trọng

    Parameters:
    - bounding_boxes (numpy.ndarray): Array containing bounding boxes.

    Returns:
    - largest_box (numpy.ndarray): The largest bounding box.

In [None]:
def get_largest_bounding_box(bounding_boxes):
    """Return the bounding box that covers the largest area.

    Parameters:
    - bounding_boxes (numpy.ndarray): 2-D array with one box per row. The first
      four columns are read as [x_min, y_min, x_max, y_max]; any further
      columns are returned unchanged as part of the selected row.

    Returns:
    - largest_box (numpy.ndarray): The row of `bounding_boxes` with the largest area.
      When several rows share the largest area, the first one is returned.

    Raises:
    - ValueError: If `bounding_boxes` contains no boxes.
    """
    bounding_boxes = np.asarray(bounding_boxes)

    # np.argmax on an empty array fails with an opaque message; report the
    # actual problem instead (same exception type, so callers are unaffected).
    if bounding_boxes.size == 0:
        raise ValueError("No bounding boxes were given; cannot select the largest one.")

    # Area = width * height for boxes in [x_min, y_min, x_max, y_max] format
    widths = bounding_boxes[:, 2] - bounding_boxes[:, 0]
    heights = bounding_boxes[:, 3] - bounding_boxes[:, 1]
    areas = widths * heights

    return bounding_boxes[np.argmax(areas)]

Từ thông tin về bounding box có diện tích lớn nhất mà ta thu được, thực hiện crop video theo bounding box đó

    crop_video(input_video, output_video, x_min, y_min, x_max, y_max)
    Parameters:
    - input_video (str): Path to the input video file.
    - output_video (str): Path to save the output video after cropping.
    - x_min (int): Minimum x-coordinate of the cropping area.
    - y_min (int): Minimum y-coordinate of the cropping area.
    - x_max (int): Maximum x-coordinate of the cropping area.
    - y_max (int): Maximum y-coordinate of the cropping area.

In [None]:
def crop_video(input_video, output_video, x_min, y_min, x_max, y_max):
    """Crop every frame of a video to one fixed rectangle and save the result.

    Parameters:
    - input_video (str): Path to the input video file.
    - output_video (str): Path to save the output video after cropping.
    - x_min (int): Minimum x-coordinate of the cropping area.
    - y_min (int): Minimum y-coordinate of the cropping area.
    - x_max (int): Maximum x-coordinate of the cropping area (exclusive).
    - y_max (int): Maximum y-coordinate of the cropping area (exclusive).

    Raises:
    - ValueError: If the rectangle, once clipped to the frame, has no area
      (this also happens when the input video cannot be opened, because its
      reported frame size is then 0 x 0).
    """
    cap = cv2.VideoCapture(input_video)
    out = None
    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # The end coordinates are exclusive slice bounds, so they may be equal to
        # the frame size; clipping them to size - 1 would drop the last row/column.
        # Convert back to plain Python ints because the writer expects an int tuple.
        start_row, end_row = (int(v) for v in np.clip([int(y_min), int(y_max)], 0, height))
        start_col, end_col = (int(v) for v in np.clip([int(x_min), int(x_max)], 0, width))

        if end_row <= start_row or end_col <= start_col:
            raise ValueError(
                f"Empty crop region for {input_video!r}: "
                f"rows {start_row}:{end_row}, cols {start_col}:{end_col} "
                f"(frame size {width}x{height})."
            )

        # Create VideoWriter object; frame size is given as (width, height)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or use 'XVID'
        out = cv2.VideoWriter(output_video, fourcc, fps, (end_col - start_col, end_row - start_row))

        # Read, crop and write frames until the video is exhausted
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame[start_row:end_row, start_col:end_col])
    finally:
        # Always release the native handles, even when an error interrupts the loop
        cap.release()
        if out is not None:
            out.release()

Chạy YOLOv5. Model YOLOv5 sử dụng một model pretrained. Chạy trên từng frame ảnh của video input.

    Parameters:
    - video_path (str): Path to the input video file.
    - model_name (str): Name of the YOLOv5 model architecture (e.g., 'yolov5s', 'yolov5m', 'yolov5l', 'yolov5x').
    - model_path (str): Path to the YOLOv5 model file.
    - class_index (int): Index of the class to extract bounding boxes for.
    - confidence_threshold (float): Confidence threshold for object detection.

    Returns:
    - bounding_boxes (numpy.ndarray): NumPy array containing bounding boxes for the specified class and confidence threshold.

In [None]:
def run_yolov5_detector(video_path,model_name, model_path, class_index, confidence_threshold, batch_size=32):
    """Run a locally stored YOLOv5 model on every frame of a video.

    Frames are sent to the model in batches of `batch_size` instead of all at
    once, so memory use no longer grows with the length of the video.

    Parameters:
    - video_path (str): Path to the input video file.
    - model_name (str): Name of the YOLOv5 weights file without extension
      (e.g., 'yolov5s', 'yolov5m', 'yolov5l', 'yolov5x').
    - model_path (str): Folder holding the local YOLOv5 repository; the weights
      are loaded from `<model_path>/<model_name>.pt`.
    - class_index (int): Index of the class to extract bounding boxes for.
    - confidence_threshold (float): Only detections with a confidence strictly
      above this value are kept.
    - batch_size (int): Number of frames passed to the model per call.

    Returns:
    - bounding_boxes (numpy.ndarray): One row per kept detection over all frames.
      The last column is compared with `class_index` and the second-to-last with
      `confidence_threshold`; the leading columns are the box corners
      (x_min, y_min, x_max, y_max). A video without readable frames yields an
      empty array of shape (0, 6).
    """
    # Load YOLOv5 model
    model = torch.hub.load(model_path, 'custom',
                           path=model_path+'/'+model_name+".pt",
                           source='local')

    # Open video file
    cap = cv2.VideoCapture(video_path)

    detections = []  # per-frame detection tensors, moved to the CPU
    batch = []       # RGB frames waiting to be passed to the model

    try:
        while True:
            ret, frame = cap.read()

            if ret:
                # OpenCV decodes to BGR; the frames are handed to the model as RGB
                batch.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            # Run the model when the batch is full, or on the leftover frames at the end
            if batch and (not ret or len(batch) >= batch_size):
                results = model(batch)
                # Keep results on the CPU so device memory is only needed for one batch
                detections.extend(frame_boxes.cpu() for frame_boxes in results.xyxy)
                batch = []

            # Stop once there are no more frames
            if not ret:
                break
    finally:
        # Close the video capture even if inference fails
        cap.release()

    if detections:
        bounding_boxes = torch.cat(detections, dim=0)

        # Keep only the requested class with sufficient confidence
        keep = (bounding_boxes[:, -1] == class_index) & (bounding_boxes[:, -2] > confidence_threshold)
        bounding_boxes_np = bounding_boxes[keep].cpu().numpy()
    else:
        # No readable frame: return an empty result instead of crashing in the model
        bounding_boxes_np = np.empty((0, 6), dtype=np.float32)

    # Clear the CUDA memory cache
    torch.cuda.empty_cache()

    return bounding_boxes_np

In [None]:
def remove_background(input_video, output_video,model_name='yolov5m', model_path='/content/drive/MyDrive/yolov5',
                      class_index=0, confidence_threshold=0.8):
    """Crop a video to the largest bounding box that YOLOv5 detects in it.

    Parameters:
    - input_video (str): Path to the input video file.
    - output_video (str): Path where the cropped video is written.
    - model_name (str): Name of the YOLOv5 weights file without extension.
    - model_path (str): Folder holding the local YOLOv5 repository and weights.
    - class_index (int): Index of the class to detect
      (presumably "person" for COCO-trained weights -- verify for custom weights).
    - confidence_threshold (float): Minimum detection confidence.

    Raises:
    - ValueError: If no detection of the requested class passes the threshold,
      in which case there is nothing to crop to.
    """
    # Run YOLOv5 detector to get bounding boxes for objects in the input video
    bounding_boxes = run_yolov5_detector(input_video, model_name, model_path,
                                         class_index, confidence_threshold)

    # Without any detection the crop area is undefined; say so explicitly
    if len(bounding_boxes) == 0:
        raise ValueError(
            f"No object of class {class_index} detected in {input_video!r} "
            f"with confidence above {confidence_threshold}."
        )

    # Each detection row carries six values; only the four box corners are needed here
    x_min, y_min, x_max, y_max, _, _ = get_largest_bounding_box(bounding_boxes)

    # Crop the input video based on the coordinates of the largest bounding box
    crop_video(input_video, output_video, x_min, y_min, x_max, y_max)

## **Trích xuất các keypoint**

Sử dụng MediaPipe của Google để thực hiện trích xuất các keypoint từ một video. Sử dụng 2 model riêng biệt để trích xuất:

   -  MediaPipe BlazePose GHUM 3D: Dùng để trích xuất các keypoint trên cơ thể

   -  MediaPipe Hands: Dùng để trích xuất các keypoint trên 2 bàn tay

2 model này đều đã được train sẵn nên chỉ cần download về và áp dụng. Từ một video có x frame, chúng ta thu được một file numpy có kích thước (x; 291) với mỗi frame có 291 parameters (33×5 + 21×3×2 = 291)

**Với run_pose_landmarker**



Model pose extraction sẽ trả về một array có kích thước 33x5 với 33 điểm keypoint, mỗi điểm gồm (x, y, z, visibility, presence) trong mỗi frame ảnh:
-	x, y: Toạ độ của điểm chính trên mặt phẳng toạ độ 2D của hình ảnh, đã được chuẩn hoá theo chiều rộng (x) và chiều cao (y) của ảnh nên giá trị thường nằm trong khoảng [0.0; 1.0]
-	z: Biểu thị cho độ sâu của điểm chính hoặc là khoảng cách của điểm chính so với mặt phẳng của hông người, có thể xem như hông người chính là gốc của trục z. Giá trị âm biểu thị rằng điểm đó đang nằm giữa hông người và camera, ngược lại giá trị dương biểu thị điểm đó đang nằm sau hông người.
-	visibility: Cung cấp thông tin về khả năng nhìn thấy hoặc cụ thể hơn là xác suất mà điểm chính đang nằm trong khung hình và không bị che khuất bởi các bộ phận của cơ thể hoặc các đối tượng khác. Giá trị nằm trong khoảng [min_float; max_float] và khi được đưa qua hàm sigmoid thì sẽ thể hiện tỉ lệ nhìn thấy được điểm đó
-	presence: tương tự như visibility, presence cũng có giá trị nằm trong khoảng [min_float; max_float] và khi được đưa qua hàm sigmoid thì sẽ thể hiện tỉ lệ điểm đó có nằm trong khung hình hay không


    Parameters:
        - input_file (str): Path to the input video file.
        - visualize (bool): Whether to visualize the pose landmarks.
        - model_path (str): Path to the pose landmark model.
                            If None, a default path is used.
        - min_pose_detection_confidence (float): Minimum confidence for pose detection.
        - min_tracking_confidence (float): Minimum confidence for pose landmarks tracking.
        - num_poses (int): Number of poses to detect.

    Returns:
        - np.array: Array of pose landmarks for each frame.


In [None]:
def run_pose_landmarker(input_file, visualize=False, model_path="/content/drive/MyDrive/pose_landmarker_full.task",
                        min_pose_detection_confidence=0.5, min_tracking_confidence=0.5, num_poses=1):
    """Extract MediaPipe pose landmarks from every frame of a video.

    Every frame is processed independently (IMAGE running mode). Slots of poses
    that are not detected in a frame stay filled with zeros.

    Parameters:
        - input_file (str): Path to the input video file.
        - visualize (bool): Whether to display each frame with the landmarks drawn on it.
        - model_path (str): Path to the pose landmark model (.task file).
        - min_pose_detection_confidence (float): Minimum confidence for pose detection.
        - min_tracking_confidence (float): Minimum confidence for pose landmarks tracking.
        - num_poses (int): Number of poses to detect.

    Returns:
        - np.array: Array of shape (frames, 33 * num_poses, 5); each row holds
          (x, y, z, visibility, presence) of one landmark. A video without
          readable frames yields an array with zero frames and the same
          trailing dimensions.
    """
    # Load necessary components from mediapipe
    BaseOptions = mp.tasks.BaseOptions
    PoseLandmarker = mp.tasks.vision.PoseLandmarker
    PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    # Set up PoseLandmarker options
    options = PoseLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=VisionRunningMode.IMAGE,
        min_pose_detection_confidence=min_pose_detection_confidence,
        min_tracking_confidence=min_tracking_confidence,
        num_poses=num_poses
    )

    # Open video capture
    cap = cv2.VideoCapture(input_file)
    list_pose_landmarks = []

    try:
        # Create PoseLandmarker from options
        with PoseLandmarker.create_from_options(options) as pose_landmarker:
            while True:
                # Read a frame from the video capture
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR, the frame is declared to MediaPipe as SRGB
                mp_frame = mp.Image(image_format=mp.ImageFormat.SRGB,
                                    data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

                # Detect pose landmarks using PoseLandmarker
                pose_landmarker_result = pose_landmarker.detect(mp_frame)

                # (Optional) Draw pose landmarks on frame
                if visualize:
                    annotated_image = draw_pose_landmarks_on_image(mp_frame.numpy_view(), pose_landmarker_result)
                    cv2_imshow(cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))

                # One fixed-size block of 33 rows per pose; undetected poses stay zero
                frame_landmarks = np.zeros(shape=(33 * num_poses, 5))
                for i, pose_landmarks in enumerate(pose_landmarker_result.pose_landmarks):
                    for j, landmark in enumerate(pose_landmarks):
                        frame_landmarks[33 * i + j] = [landmark.x, landmark.y, landmark.z,
                                                       landmark.visibility, landmark.presence]

                list_pose_landmarks.append(frame_landmarks)
    finally:
        # Release video capture resources even if detection fails
        cap.release()

    # Keep the trailing dimensions stable when no frame could be read
    if not list_pose_landmarks:
        return np.zeros(shape=(0, 33 * num_poses, 5))

    return np.array(list_pose_landmarks)

**Với run_hand_landmarker:**

Model hand detection trả về một array kích thước 21x3 tượng trưng cho (x, y, z):
-	x, y: Toạ độ của điểm chính trên mặt phẳng toạ độ 2D của hình ảnh, đã được chuẩn hoá theo chiều rộng (x) và chiều cao (y) của ảnh nên giá trị thường nằm trong khoảng [0.0; 1.0]
-	z: Biểu thị cho độ sâu của điểm chính so với cổ tay (wrist), có thể xem cổ tay chính là gốc của trục z. Giá trị càng nhỏ thì điểm đó càng gần camera.


    Parameters:
        - input_file (str): Path to the input video file.
        - visualize (bool): Whether to visualize the hand landmarks.
        - model_path (str): Path to the hand landmark model.
                           If None, a default path is used.
        - min_hand_detection_confidence (float): Minimum confidence for hand detection.
        - min_tracking_confidence (float): Minimum confidence for hand landmarks tracking.
        - num_hands (int): Number of hands to detect.

    Returns:
        - np.array: Array of hand landmarks for each frame.


In [None]:
def run_hand_landmarker(input_file, visualize=False, model_path="/content/drive/MyDrive/hand_landmarker.task",
                        min_hand_detection_confidence=0.5, min_tracking_confidence=0.5, num_hands=2):
    """Extract MediaPipe hand landmarks from every frame of a video.

    Every frame is processed independently (IMAGE running mode). Hands are
    stored in the order in which the landmarker reports them; slots of hands
    that are not detected in a frame stay filled with zeros.

    Parameters:
        - input_file (str): Path to the input video file.
        - visualize (bool): Whether to display each frame with the landmarks drawn on it.
        - model_path (str): Path to the hand landmark model (.task file).
        - min_hand_detection_confidence (float): Minimum confidence for hand detection.
        - min_tracking_confidence (float): Minimum confidence for hand landmarks tracking.
        - num_hands (int): Number of hands to detect.

    Returns:
        - np.array: Array of shape (frames, 21 * num_hands, 3); each row holds
          (x, y, z) of one landmark. A video without readable frames yields an
          array with zero frames and the same trailing dimensions.
    """
    # Load necessary components from mediapipe
    BaseOptions = mp.tasks.BaseOptions
    HandLandmarker = mp.tasks.vision.HandLandmarker
    HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions
    VisionRunningMode = mp.tasks.vision.RunningMode

    # Set up HandLandmarker options
    options = HandLandmarkerOptions(
        base_options=BaseOptions(model_asset_path=model_path),
        running_mode=VisionRunningMode.IMAGE,
        min_hand_detection_confidence=min_hand_detection_confidence,
        min_tracking_confidence=min_tracking_confidence,
        num_hands=num_hands
    )

    # Open video capture
    cap = cv2.VideoCapture(input_file)
    list_hand_landmarks = []

    try:
        # Create HandLandmarker from options
        with HandLandmarker.create_from_options(options) as hand_landmarker:
            while True:
                # Read a frame from the video capture
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR, the frame is declared to MediaPipe as SRGB
                mp_frame = mp.Image(image_format=mp.ImageFormat.SRGB,
                                    data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

                # Detect hand landmarks using HandLandmarker
                hand_landmarker_result = hand_landmarker.detect(mp_frame)

                # (Optional) Draw landmarks on frame
                if visualize:
                    annotated_image = draw_hand_landmarks_on_image(mp_frame.numpy_view(), hand_landmarker_result)
                    cv2_imshow(cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))

                # One fixed-size block of 21 rows per hand; undetected hands stay zero
                frame_landmarks = np.zeros(shape=(21 * num_hands, 3))
                for i, hand_landmarks in enumerate(hand_landmarker_result.hand_landmarks):
                    for j, landmark in enumerate(hand_landmarks):
                        frame_landmarks[i * 21 + j] = [landmark.x, landmark.y, landmark.z]

                list_hand_landmarks.append(frame_landmarks)
    finally:
        # Release video capture resources even if detection fails
        cap.release()

    # Keep the trailing dimensions stable when no frame could be read
    if not list_hand_landmarks:
        return np.zeros(shape=(0, 21 * num_hands, 3))

    return np.array(list_hand_landmarks)

**Biểu diễn các keypoint trên video (optional)**

In [None]:
def draw_hand_landmarks_on_image(rgb_image, detection_result):
  """Return a copy of `rgb_image` with all detected hand landmarks drawn on it.

  Parameters:
  - rgb_image (numpy.ndarray): Image to annotate; it is not modified.
  - detection_result: Hand landmarker result; its `hand_landmarks` attribute is
    iterated, one landmark list per detected hand.

  Returns:
  - annotated_image (numpy.ndarray): Annotated copy of the input image.
  """
  annotated_image = np.copy(rgb_image)

  # Loop through the detected hands to visualize.
  for hand_landmarks in detection_result.hand_landmarks:
    # The drawing utility works on the protobuf representation of the landmarks.
    hand_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    hand_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in hand_landmarks
    ])
    solutions.drawing_utils.draw_landmarks(
      annotated_image,
      hand_landmarks_proto,
      solutions.hands.HAND_CONNECTIONS,
      solutions.drawing_styles.get_default_hand_landmarks_style(),
      solutions.drawing_styles.get_default_hand_connections_style())

  return annotated_image

In [None]:
def draw_pose_landmarks_on_image(rgb_image, detection_result):
  """Return a copy of `rgb_image` with all detected pose landmarks drawn on it.

  Parameters:
  - rgb_image (numpy.ndarray): Image to annotate; it is not modified.
  - detection_result: Pose landmarker result; its `pose_landmarks` attribute is
    iterated, one landmark list per detected pose.

  Returns:
  - numpy.ndarray: Annotated copy of the input image.
  """
  canvas = np.copy(rgb_image)

  for pose_landmarks in detection_result.pose_landmarks:
    # Convert the landmarks to the protobuf form the drawing utility works with.
    points = [
      landmark_pb2.NormalizedLandmark(x=point.x, y=point.y, z=point.z)
      for point in pose_landmarks
    ]
    skeleton = landmark_pb2.NormalizedLandmarkList()
    skeleton.landmark.extend(points)

    # Draw the landmarks and their connections onto the copy.
    solutions.drawing_utils.draw_landmarks(
      canvas,
      skeleton,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())

  return canvas

## **Data augmentation**

Để tăng cường lượng dữ liệu thiếu hụt trong mỗi label, nhóm thực hiện 3 cách augment khác nhau cho mỗi video trong tập 'training' gồm:

- Shear (shear_x, shear_y)
- Translate (translate_x, translate_y)
- Rotate

Như vậy, từ một video training gốc sẽ được thêm 5 video đã augmented

In [None]:
def generate_output_filenames(base_filename, num_files):
    """Derive numbered file names from one base name.

    The index is inserted between the stem and the extension, starting at 1,
    e.g. 'clip.mp4' -> 'clip_1.mp4', 'clip_2.mp4', ...

    Parameters:
    - base_filename (str): File name (or path) to derive the names from.
    - num_files (int): How many names to generate.

    Returns:
    - list: The `num_files` numbered file names, in ascending order.
    """
    stem, suffix = os.path.splitext(base_filename)
    numbered = []
    for index in range(1, num_files + 1):
        numbered.append(stem + "_" + str(index) + suffix)
    return numbered

def temporal_interpolate(v_list, t, n):
    """Return the augmentation magnitude to use at step `t` out of `n`.

    Parameters:
    - v_list (list): Either one value (constant magnitude) or two values
      (start and end magnitude, interpolated linearly).
    - t (int): Current step; 0 yields the start value and `n` the end value.
    - n (int): Index of the last step.

    Returns:
    - The constant value for a one-element `v_list`, otherwise the linear
      interpolation between the two values. When `n` is 0 (a single step, e.g.
      a one-frame clip) the start value is returned.

    Raises:
    - NotImplementedError: If `v_list` has neither one nor two elements.
    """
    if len(v_list) == 1:
        return v_list[0]
    if len(v_list) == 2:
        start, end = v_list
        # A single step leaves nothing to interpolate over; avoid dividing by zero
        if n == 0:
            return start
        return start + (end - start) * t / n
    raise NotImplementedError('Invalid degree')

def shear_x(imgs, v_list=[-0.3, 0.3]):
    """Shear a clip horizontally with a strength that changes from frame to frame.

    Parameters:
    - imgs (list): Frames of the clip as numpy arrays.
    - v_list (list): Shear factor(s), each within [-0.3, 0.3]; two values are
      interpolated from the first frame to the last. With probability 0.5 the
      sign of every factor is flipped.

    Returns:
    - list: The sheared frames as numpy arrays, same order and size as the input.
    """
    assert all(-0.3 <= v <= 0.3 for v in v_list)

    # Randomly mirror the direction of the effect
    mirrored = random.random() > 0.5
    factors = [-v for v in v_list] if mirrored else v_list

    last_index = len(imgs) - 1
    sheared = []
    for t, frame in enumerate(imgs):
        picture = Image.fromarray(frame)
        # Affine coefficients (a, b, c, d, e, f); only b (x depends on y) is varied
        coefficients = (1, temporal_interpolate(factors, t, last_index), 0, 0, 1, 0)
        sheared.append(np.array(picture.transform(picture.size, Image.AFFINE, coefficients)))
    return sheared

def shear_y(imgs, v_list=[-0.3, 0.3]):
    """Shear a clip vertically with a strength that changes from frame to frame.

    Parameters:
    - imgs (list): Frames of the clip as numpy arrays.
    - v_list (list): Shear factor(s), each within [-0.3, 0.3]; two values are
      interpolated from the first frame to the last. With probability 0.5 the
      sign of every factor is flipped.

    Returns:
    - list: The sheared frames as numpy arrays, same order and size as the input.
    """
    assert all(-0.3 <= v <= 0.3 for v in v_list)

    # Randomly mirror the direction of the effect
    mirrored = random.random() > 0.5
    factors = [-v for v in v_list] if mirrored else v_list

    last_index = len(imgs) - 1
    sheared = []
    for t, frame in enumerate(imgs):
        picture = Image.fromarray(frame)
        # Affine coefficients (a, b, c, d, e, f); only d (y depends on x) is varied
        coefficients = (1, 0, 0, temporal_interpolate(factors, t, last_index), 1, 0)
        sheared.append(np.array(picture.transform(picture.size, Image.AFFINE, coefficients)))
    return sheared

def translate_x(imgs, v_list=[-80, 80]):  # [-150, 150] => percentage: [-0.45, 0.45]
    """Shift a clip horizontally by an offset that changes from frame to frame.

    Parameters:
    - imgs (list): Frames of the clip as numpy arrays.
    - v_list (list): Offset(s) in pixels, each within [-150, 150]; two values
      are interpolated from the first frame to the last. With probability 0.5
      the sign of every offset is flipped.

    Returns:
    - list: The shifted frames as numpy arrays, same order and size as the input.
    """
    assert all(-150 <= v <= 150 for v in v_list)

    # Randomly mirror the direction of the effect
    mirrored = random.random() > 0.5
    offsets = [-v for v in v_list] if mirrored else v_list

    last_index = len(imgs) - 1
    shifted = []
    for t, frame in enumerate(imgs):
        picture = Image.fromarray(frame)
        # Affine coefficients (a, b, c, d, e, f); only c (constant x offset) is varied
        coefficients = (1, 0, temporal_interpolate(offsets, t, last_index), 0, 1, 0)
        shifted.append(np.array(picture.transform(picture.size, Image.AFFINE, coefficients)))
    return shifted

def translate_y(imgs, v_list=[-50, 50]):  # [-150, 150] => percentage: [-0.45, 0.45]
    """Shift a clip vertically by an offset that changes from frame to frame.

    Parameters:
    - imgs (list): Frames of the clip as numpy arrays.
    - v_list (list): Offset(s) in pixels, each within [-150, 150]; two values
      are interpolated from the first frame to the last. With probability 0.5
      the sign of every offset is flipped.

    Returns:
    - list: The shifted frames as numpy arrays, same order and size as the input.
    """
    assert all(-150 <= v <= 150 for v in v_list)

    # Randomly mirror the direction of the effect
    mirrored = random.random() > 0.5
    offsets = [-v for v in v_list] if mirrored else v_list

    last_index = len(imgs) - 1
    shifted = []
    for t, frame in enumerate(imgs):
        picture = Image.fromarray(frame)
        # Affine coefficients (a, b, c, d, e, f); only f (constant y offset) is varied
        coefficients = (1, 0, 0, 0, 1, temporal_interpolate(offsets, t, last_index))
        shifted.append(np.array(picture.transform(picture.size, Image.AFFINE, coefficients)))
    return shifted

def rotate(imgs, v_list=[-30, 30]):  # [-30, 30]
    """Rotate a clip by an angle that changes from frame to frame.

    Parameters:
    - imgs (list): Frames of the clip as numpy arrays.
    - v_list (list): Angle(s) in degrees, each within [-30, 30]; two values are
      interpolated from the first frame to the last. With probability 0.5 the
      sign of every angle is flipped.

    Returns:
    - list: The rotated frames as numpy arrays, same order as the input.
    """
    assert all(-30 <= v <= 30 for v in v_list)

    # Randomly mirror the direction of the effect
    mirrored = random.random() > 0.5
    angles = [-v for v in v_list] if mirrored else v_list

    last_index = len(imgs) - 1
    rotated = []
    for t, frame in enumerate(imgs):
        picture = Image.fromarray(frame)
        angle = temporal_interpolate(angles, t, last_index)
        rotated.append(np.array(picture.rotate(angle)))
    return rotated

In [None]:
def video_augmentation(input_video, output_video):
    """Write five augmented copies of a video next to `output_video`.

    The copies are named `<output stem>_1<ext>` ... `<output stem>_5<ext>` and
    hold, in this order, the shear_x, shear_y, translate_x, translate_y and
    rotate versions of the input. `output_video` itself is not written here; if
    it already exists the video is treated as processed and nothing is done.

    Parameters:
    - input_video (str): Path to the video to augment.
    - output_video (str): Path from which the output file names are derived.
    """
    # Check if the output video already exists
    if os.path.exists(output_video):
        return

    cap = cv2.VideoCapture(input_video)
    # Check if the video is opened successfully
    if not cap.isOpened():
        print("Error: Unable to open input video.")
        cap.release()
        return

    try:
        # Keep the exact frame rate; truncating to int would turn e.g. 29.97 fps into 29
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Collect all frames of the video
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        cap.release()

    # The codec is chosen from the extension of the input file
    _, extension = os.path.splitext(input_video)
    codec = 'mp4v' if extension.lower() == '.mp4' else 'vp80'
    fourcc = cv2.VideoWriter_fourcc(*codec)
    frame_size = (width, height)

    # Generate filenames for the output videos
    output_filenames = generate_output_filenames(output_video, 5)
    transformations = (shear_x, shear_y, translate_x, translate_y, rotate)

    # Apply and write one transformation at a time so that only a single
    # augmented copy of the clip is held in memory
    for filename, transform in zip(output_filenames, transformations):
        writer = cv2.VideoWriter(filename, fourcc, fps, frame_size)
        try:
            for frame in transform(frames):
                writer.write(frame)
        finally:
            writer.release()


## **Process data**

**preprocess_data_1**

Từ folder raw data ban đầu, đưa vào preprocess_data_1 ta thu được một folder data mới, bên trong folder đó tập 'train' và 'dev', 'test' đã được thực hiện **Thao tác xoá background** từ folder raw data

In [None]:
def preprocess_data_1(input_root, output_root):
    """Run background removal on every video of a `<class>/<dataset>/<file>` tree.

    The folder structure of `input_root` is mirrored under `output_root`. Files
    whose output already exists are skipped, so the function can be re-run to
    resume an interrupted pass.

    WARNING: a video for which background removal fails is deleted from
    `input_root`. Interrupting the run (Ctrl-C / "stop cell") does not delete
    anything; it only discards the partially written output of the current file.

    Parameters:
    - input_root (str): Path to the root folder containing the raw data.
    - output_root (str): Path to the root folder where the cropped videos are saved.
    """
    count=0
    # Loop over class folders in the input directory
    for class_name in os.listdir(input_root):
        class_path = os.path.join(input_root, class_name)
        count+=1
        print(class_name,count)
        # Loop over dataset folders (e.g., 'train', 'dev') in the class folder
        for dataset_name in os.listdir(class_path):
            dataset_path = os.path.join(class_path, dataset_name)

            # Create the output folder structure if it doesn't exist
            output_dataset_path = os.path.join(output_root, class_name, dataset_name)
            os.makedirs(output_dataset_path, exist_ok=True)

            # Loop over files in the dataset folder
            for file_name in os.listdir(dataset_path):
                source_path = os.path.join(dataset_path, file_name)
                destination_path = os.path.join(output_dataset_path, file_name)

                # Skip files that were already processed
                if os.path.exists(destination_path):
                    continue

                try:
                    remove_background(source_path, destination_path)
                except Exception as exc:
                    # The video could not be processed: report why, drop any
                    # partial output and remove the video from the raw data
                    print("Remove file name: ",source_path)
                    print("Reason: ", repr(exc))
                    if os.path.exists(destination_path):
                        os.remove(destination_path)
                    os.remove(source_path)
                except BaseException:
                    # Interrupted (e.g. KeyboardInterrupt): keep the source, but
                    # discard a partial output so the next run processes it again
                    if os.path.exists(destination_path):
                        os.remove(destination_path)
                    raise

**preprocess_data_2**

Từ output của preprocess_data_1, ta tiếp tục thực hiện **Data augmentation**. Kết quả là một folder mới có cấu trúc tương tự input, nhưng mỗi video trong tập 'train' đều đã được augment

In [None]:
def preprocess_data_2(input_root, output_root):
    """
    Builds the augmented dataset from a `<class>/<dataset>/<file>` tree.

    Every file is copied to the mirrored location under `output_root`. Files of
    the 'train' dataset are additionally passed to video_augmentation before
    they are copied.

    Parameters:
    - input_root (str): Path to the root folder containing input data.
    - output_root (str): Path to the root folder where preprocessed data will be saved.
    """
    for class_number, class_name in enumerate(os.listdir(input_root), start=1):
        class_dir = os.path.join(input_root, class_name)
        print(class_name,class_number)

        for dataset_name in os.listdir(class_dir):
            source_dir = os.path.join(class_dir, dataset_name)
            target_dir = os.path.join(output_root, class_name, dataset_name)

            # Mirror the folder structure on the output side
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

            # Only the training split gets augmented copies
            augment = dataset_name == 'train'

            for file_name in os.listdir(source_dir):
                source_file = os.path.join(source_dir, file_name)
                target_file = os.path.join(target_dir, file_name)

                if augment:
                    video_augmentation(source_file, target_file)

                # The original file is copied for every dataset, 'train' included
                shutil.copy(source_file, target_file)

**preprocess_data_3**

Từ output của preprocess_data_2, ta đưa vào preprocess_data_3 sẽ thu được một folder mới, tất cả các video trong output này đều đã được thực hiện **Trích xuất các keypoint**, sẵn sàng để được train

In [None]:
def preprocess_data_3(input_root, output_root):
    """
    Preprocesses data by extracting hand and pose landmarks and saving the results as .npy files.

    For every video one array of shape (frames, features) is saved as
    `<file name>.npy` in the mirrored folder under `output_root`; each row holds
    the flattened hand landmarks followed by the flattened pose landmarks of one
    frame. Files whose .npy output already exists are skipped, and videos from
    which no frame could be read are reported and skipped.

    Parameters:
    - input_root (str): Path to the root folder containing input data.
    - output_root (str): Path to the root folder where preprocessed data will be saved.
    """
    # Loop through each class in the input root folder
    count=0
    for class_name in os.listdir(input_root):
        class_path = os.path.join(input_root, class_name)
        count+=1
        print(class_name,count)
        # Loop through each dataset (e.g., 'train', 'dev') in the class folder
        for dataset_name in os.listdir(class_path):
            dataset_path = os.path.join(class_path, dataset_name)

            # Create the output folder structure if it doesn't exist
            output_dataset_path = os.path.join(output_root, class_name, dataset_name)
            os.makedirs(output_dataset_path, exist_ok=True)

            # Loop through each file in the dataset
            for file_name in os.listdir(dataset_path):
                source_file_path = os.path.join(dataset_path, file_name)
                destination_file_path = os.path.join(output_dataset_path, file_name)

                # np.save appends '.npy', so that is the name to look for
                if os.path.exists(destination_file_path+'.npy'):
                    continue

                # Extract hand and pose landmarks (each call reads the video once)
                hand_landmarks = run_hand_landmarker(source_file_path)
                pose_landmarks = run_pose_landmarker(source_file_path)

                # A video without frames cannot be reshaped to (0, -1); skip it
                # instead of aborting the whole preprocessing run
                if len(hand_landmarks) == 0 or len(pose_landmarks) == 0:
                    print("Skip file without frames: ", source_file_path)
                    continue

                # Flatten the landmarks of every frame into one feature vector
                hand_landmarks_reshaped = hand_landmarks.reshape((hand_landmarks.shape[0], -1))
                pose_landmarks_reshaped = pose_landmarks.reshape((pose_landmarks.shape[0], -1))

                # Concatenate per frame: hand features first, then pose features
                concatenated_result = np.concatenate((hand_landmarks_reshaped, pose_landmarks_reshaped), axis=1)

                # Save the concatenated result to a .npy file
                np.save(destination_file_path, concatenated_result)

## **Thao tác với dữ liệu**

**get_data**
Load data với những mode khác nhau ('train', 'dev', etc.) từ folder

    Parameters:
    - input_root (str): The root folder containing class folders and their respective datasets.
    - mode (str): The mode or dataset type to load (e.g., 'train', 'dev', etc.).
    - shuffle (bool): Whether to shuffle the loaded samples. Default is True.

    Returns:
    - X (list): List containing loaded data from files.
    - y (list): List containing corresponding class labels.



In [None]:
def get_data(input_root, mode, shuffle=True):
    """Load all .npy samples of one dataset split together with their class labels.

    Parameters:
    - input_root (str): The root folder containing class folders and their respective datasets.
    - mode (str): The mode or dataset type to load (e.g., 'train', 'dev', etc.).
    - shuffle (bool): Whether to shuffle the loaded samples. Default is True.

    Returns:
    - X (numpy.ndarray): The loaded samples. When all samples share one shape
      they are stacked into a single numeric array; otherwise (e.g. clips with
      different numbers of frames) a 1-D object array with one array per sample
      is returned.
    - y (numpy.ndarray): The class label (folder name) of every sample.
    """
    X, y = [], []
    count=0
    # Loop through each class in the root folder
    for class_name in os.listdir(input_root):
        class_path = os.path.join(input_root, class_name)
        count+=1
        print(class_name,count)

        # Only the requested split is of interest; classes without it
        # (and stray files in the root folder) are skipped
        dataset_path = os.path.join(class_path, mode)
        if not os.path.isdir(dataset_path):
            continue

        # Loop through each file in the dataset
        for file_name in os.listdir(dataset_path):
            X.append(np.load(os.path.join(dataset_path, file_name)))
            y.append(class_name)

    y = np.array(y)

    if len({np.shape(sample) for sample in X}) <= 1:
        # Uniform samples (or none at all): a regular array can hold them
        X = np.array(X)
    else:
        # Samples of different shapes cannot be stacked; np.array raises for
        # such input on current NumPy versions, so fill an object array instead
        ragged = np.empty(len(X), dtype=object)
        for index, sample in enumerate(X):
            ragged[index] = sample
        X = ragged

    # Shuffle the samples if shuffle is True
    if shuffle:
        indices = np.arange(len(X))
        np.random.shuffle(indices)
        X, y = X[indices], y[indices]

    return X, y

**one_hot_encode**

One-hot encode các label


    Parameters:
    - y (list): List of labels to be one-hot encoded.
    - label_dict (dict): Dictionary mapping labels to index values.

    Returns:
    - y_one_hot (numpy.ndarray): One-hot encoded representation of the input labels.


In [None]:
def one_hot_encode(y, label_dict):
    """One-hot encode class labels.

    The width of the encoding is taken from `label_dict`, not from the labels
    that happen to occur in `y`, so every split encoded with the same
    dictionary gets the same number of columns.

    Parameters:
    - y (list): List of labels to be one-hot encoded.
    - label_dict (dict): Dictionary mapping labels to index values.

    Returns:
    - y_one_hot (numpy.ndarray): One-hot encoded representation of the input
      labels, with `max(label_dict.values()) + 1` columns.

    Raises:
    - KeyError: If a label in `y` is missing from `label_dict`.
    """
    # Map every label to its index value
    y_processed = [label_dict[key] for key in y]

    # One column per index the dictionary can produce
    num_classes = max(label_dict.values(), default=-1) + 1

    # Use to_categorical to one-hot encode the list of index values
    y_one_hot = to_categorical(y_processed, num_classes=num_classes)

    return y_one_hot

## **Tạo model và đánh giá model**

In [None]:
def create_model(learning_rate,num_classes):
  """Build and compile the sequence classifier.

  The network is a Masking layer, two LSTM layers that return the full
  sequence, an average over the time axis and a softmax output layer.

  Parameters:
  - learning_rate (float): Learning rate of the Adam optimizer.
  - num_classes (int): Number of output classes.

  Returns:
  - The compiled Sequential model.
  """
  recurrent_settings = dict(units=128, dropout=0.6, return_sequences=True)

  layer_stack = [Masking()]
  layer_stack += [LSTM(**recurrent_settings) for _ in range(2)]
  layer_stack.append(GlobalAveragePooling1D())
  layer_stack.append(
    Dense(units=num_classes, kernel_regularizer=L2(0.0003), activation='softmax')
  )

  model = Sequential(layer_stack)

  optimizer = Adam(learning_rate=learning_rate)
  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['categorical_accuracy'])
  return model

In [None]:
def data_generator(data, batch_size):
    """Yield random batches forever.

    Parameters:
    - data: Pair whose first item holds the samples and whose second item
      holds the matching targets; both are indexed with an index array.
    - batch_size (int): Number of samples per batch. Every batch is drawn
      without replacement, independently of the previous batches.

    Yields:
    - tuple: (samples, targets) of one batch.
    """
    samples, targets = data[0], data[1]
    while True:
        picked = np.random.choice(len(samples), batch_size, replace=False)
        yield samples[picked], targets[picked]

def training(model, batch_size, epochs, training_data, validation_data, initial_checkpoint_path, final_checkpoint_path, early_stopping_patience=5):
    """Train a model on random batches with early stopping and save its weights.

    Parameters:
    - model: Model to train; it is fitted in place.
    - batch_size (int): Number of samples per training batch.
    - epochs (int): Maximum number of epochs.
    - training_data: Pair (samples, targets) the random batches are drawn from.
    - validation_data: Data passed to `fit` for validation.
    - initial_checkpoint_path (str or None): Weights to load before training;
      None starts from the model's current weights.
    - final_checkpoint_path (str): Where the weights are saved after training.
    - early_stopping_patience (int): Epochs without improvement of the
      validation accuracy before training stops.

    Returns:
    - tuple: (model, history) where history is the return value of `fit`.
    """
    resume_from_checkpoint = initial_checkpoint_path is not None
    if resume_from_checkpoint:
        model.load_weights(initial_checkpoint_path)

    # Stop once the validation accuracy stalls and roll back to the best weights
    stop_when_stalled = tf.keras.callbacks.EarlyStopping(
        monitor='val_categorical_accuracy', restore_best_weights=True, patience=early_stopping_patience
    )

    fit_arguments = {
        'x': data_generator(training_data, batch_size),
        # The generator never ends, so the epoch length is fixed explicitly
        'steps_per_epoch': len(training_data[0]) // batch_size,
        'validation_data': validation_data,
        'epochs': epochs,
        'callbacks': [stop_when_stalled],
    }
    history = model.fit(**fit_arguments)

    model.save_weights(final_checkpoint_path)

    return model, history


In [None]:
def plot_smoothed_metrics(history, window_size=20):
    """
    Plot training and validation loss/accuracy averaged over consecutive epoch windows.

    The epochs are cut into non-overlapping windows of `window_size` epochs (the last
    window may be shorter) and each metric is replaced by its mean within every window.
    The four resulting curves are drawn in a 2x2 grid of subplots.

    Parameters:
    - history (keras.callbacks.History): Keras History object containing training history.
      Its `history` dict must provide 'loss', 'val_loss', 'categorical_accuracy' and
      'val_categorical_accuracy'.
    - window_size (int): Number of epochs averaged into one plotted point. Default is 20.
    """
    plt.figure(figsize=(12, 6))

    window_starts = range(0, len(history.epoch), window_size)

    def window_means(metric_name):
        # Mean of the metric inside each consecutive window of epochs
        return [
            np.mean(history.history[metric_name][start:start + window_size])
            for start in window_starts
        ]

    # Every series is computed before anything is drawn
    epoch_loss = window_means('loss')
    epoch_val_loss = window_means('val_loss')
    epoch_accuracy = window_means('categorical_accuracy')
    epoch_val_accuracy = window_means('val_categorical_accuracy')

    # (values, legend label, y-axis label, title) for each subplot, in grid order
    panels = (
        (epoch_loss, 'Mean Training Loss', 'Mean Loss',
         f'Mean Training Loss over Epochs (Every {window_size} Epochs)'),
        (epoch_val_loss, 'Mean Validation Loss', 'Mean Loss',
         f'Mean Validation Loss over Epochs (Every {window_size} Epochs)'),
        (epoch_accuracy, 'Average Training Accuracy', 'Average Accuracy',
         f'Average Training Accuracy over Epochs (Every {window_size} Epochs)'),
        (epoch_val_accuracy, 'Average Validation Accuracy', 'Average Accuracy',
         f'Average Validation Accuracy over Epochs (Every {window_size} Epochs)'),
    )

    for position, (values, legend_label, y_label, title) in enumerate(panels, start=1):
        plt.subplot(2, 2, position)
        plt.plot(values, label=legend_label)
        plt.xlabel(f'Epoch (every {window_size} epochs)')
        plt.ylabel(y_label)
        plt.legend()
        plt.title(title)

    plt.tight_layout()
    plt.show()

In [None]:
def top_k_accuracy(predictions, true_labels_one_hot, k=5):
    """
    Fraction of samples whose true class is among the k highest-scoring predictions.

    Parameters:
    - predictions: Per-class scores produced by the model, one row per sample.
    - true_labels_one_hot: One-hot encoded true labels, one row per sample.
    - k (int): How many top-scoring classes count as a hit. Default is 5.

    Returns:
    - accuracy (float): Mean hit rate over all samples, as a plain Python number.
    """
    # Recover integer class ids from the one-hot rows
    target_ids = tf.argmax(true_labels_one_hot, axis=1)

    # Boolean per sample: is the true class within the top-k scores?
    is_hit = tf.nn.in_top_k(predictions=predictions, targets=target_ids, k=k)

    hit_rate = tf.reduce_mean(tf.cast(is_hit, tf.float32))
    return hit_rate.numpy().item()

# **execute**

In [None]:
# Load the "train" and "dev" splits through get_data
X_train, y_train = get_data("/content/drive/MyDrive/processed data 3", mode="train")
X_dev, y_dev = get_data("/content/drive/MyDrive/processed data 3", mode="dev")

chào 1
tạm biệt 2
học sinh 3
con trai 4
con gái 5
thầy giáo 6
cô giáo 7
bố 8
mẹ 9
anh 10
chị 11
gia đình 12
em 13
con 14
ông 15
bà 16
bác 17
chú 18
cậu 19
dì 20
cô 21
đầu 22
chân 23
tay 24
mình 25
mắt 26
mũi 27
miệng 28
má 29
tóc 30
tai 31
nằm 32
đi 33
đứng 34
ngồi 35
chạy 36
bò 37
ngủ 38
thức 39
nhìn 40
nghe 41
ngửi 42
ăn 43
uống 44
bàn chải 45
lược 46
chậu 47
khăn mặt 48
sạch 49
bẩn 50
áo 51
quần 52
mũ 53
mặc 54
cởi 55
sách 56
thước kẻ 57
bút 58
học 59
nói 60
đọc 61
viết 62
vẽ 63
đúng 64
sai 65
trường học 66
nhà vệ sinh 67
lớp học 68
bảng 69
bàn 70
ghế 71
ôn tập 72
kiểm tra 73
thực hành 74
hỏi 75
trả lời 76
tốt 77
trung bình 78
kém 79
chăm chỉ 80
lười 81
thông minh 82
hiểu 83
không hiểu 84
thưởng 85
phạt 86
hát 87
múa 88
thể dục 89
chơi 90
nhảy dây 91
đá bóng 92
đá cầu 93
bắn bi 94
kéo co 95
trò chơi 96
hàng dọc 97
hàng ngang 98
thẳ

  X, y = np.array(X), np.array(y)


cậu 19
dì 20
cô 21
đầu 22
chân 23
tay 24
mình 25
mắt 26
mũi 27
miệng 28
má 29
tóc 30
tai 31
nằm 32
đi 33
đứng 34
ngồi 35
chạy 36
bò 37
ngủ 38
thức 39
nhìn 40
nghe 41
ngửi 42
ăn 43
uống 44
bàn chải 45
lược 46
chậu 47
khăn mặt 48
sạch 49
bẩn 50
áo 51
quần 52
mũ 53
mặc 54
cởi 55
sách 56
thước kẻ 57
bút 58
học 59
nói 60
đọc 61
viết 62
vẽ 63
đúng 64
sai 65
trường học 66
nhà vệ sinh 67
lớp học 68
bảng 69
bàn 70
ghế 71
ôn tập 72
kiểm tra 73
thực hành 74
hỏi 75
trả lời 76
tốt 77
trung bình 78
kém 79
chăm chỉ 80
lười 81
thông minh 82
hiểu 83
không hiểu 84
thưởng 85
phạt 86
hát 87
múa 88
thể dục 89
chơi 90
nhảy dây 91
đá bóng 92
đá cầu 93
bắn bi 94
kéo co 95
trò chơi 96
hàng dọc 97
hàng ngang 98
thẳng 99
vòng tròn 100
trước 101
sau 102
trên 103
dưới 104
trong 105
ngoài 106
lịch 107
ngày 108
tháng 109
năm 110
thứ 111
chủ nhật 112
tuần 113
nghỉ

In [None]:
# Load the split used for the final evaluation (get_data mode "crop")
X_test, y_test = get_data("/content/drive/MyDrive/processed data 3", mode="crop")

chào 1
tạm biệt 2
học sinh 3
con trai 4
con gái 5
thầy giáo 6
cô giáo 7
bố 8
mẹ 9
anh 10
chị 11
gia đình 12
em 13
con 14
ông 15
bà 16
bác 17
chú 18
cậu 19
dì 20
cô 21
đầu 22
chân 23
tay 24
mình 25
mắt 26
mũi 27
miệng 28
má 29
tóc 30
tai 31
nằm 32
đi 33
đứng 34
ngồi 35
chạy 36
bò 37
ngủ 38
thức 39
nhìn 40
nghe 41
ngửi 42
ăn 43
uống 44
bàn chải 45
lược 46
chậu 47
khăn mặt 48
sạch 49
bẩn 50
áo 51
quần 52
mũ 53
mặc 54
cởi 55
sách 56
thước kẻ 57
bút 58
học 59
nói 60
đọc 61
viết 62
vẽ 63
đúng 64
sai 65
trường học 66
nhà vệ sinh 67
lớp học 68
bảng 69
bàn 70
ghế 71
ôn tập 72
kiểm tra 73
thực hành 74
hỏi 75
trả lời 76
tốt 77
trung bình 78
kém 79
chăm chỉ 80
lười 81
thông minh 82
hiểu 83
không hiểu 84
thưởng 85
phạt 86
hát 87
múa 88
thể dục 89
chơi 90
nhảy dây 91
đá bóng 92
đá cầu 93
bắn bi 94
kéo co 95
trò chơi 96
hàng dọc 97
hàng ngang 98
thẳ

  X, y = np.array(X), np.array(y)


In [None]:
# Pad at the end of each sequence and store the result as float32.
# Each split is padded on its own, so the padded length may differ between splits.
X_processed_train = pad_sequences(X_train, padding='post', dtype='float32')
X_processed_dev = pad_sequences(X_dev, padding='post', dtype='float32')

In [None]:
# Pad the evaluation split the same way as the train/dev splits (post-padding, float32)
X_processed_test = pad_sequences(X_test, padding='post', dtype='float32')

In [None]:
import json

file_path = "/content/drive/MyDrive/label_dict.json"

# Decode explicitly as UTF-8: the labels are Vietnamese words, and without an explicit
# encoding open() falls back to the platform's locale encoding.
with open(file_path, 'r', encoding='utf-8') as file:
    label_dict = json.load(file)

In [None]:
# Sanity check: show the shape of the padded training array
print(X_processed_train.shape)

(9984, 231, 291)


In [None]:
# The output layer gets one unit per entry of the label dictionary
num_classes = len(label_dict)
print(num_classes)

500


In [None]:
# Turn the label names of the train and dev splits into one-hot targets
y_one_hot_train = one_hot_encode(y_train, label_dict)
y_one_hot_dev = one_hot_encode(y_dev, label_dict)

In [None]:
# Turn the label names of the evaluation split into one-hot targets
y_one_hot_test = one_hot_encode(y_test, label_dict)

In [None]:
# Train a fresh model (no warm-start checkpoint) and write the final weights to Drive
model = create_model(learning_rate=1e-4, num_classes=num_classes)
model, history = training(
    model,
    batch_size=256,
    epochs=100,
    training_data=(X_processed_train, y_one_hot_train),
    validation_data=(X_processed_dev, y_one_hot_dev),
    initial_checkpoint_path=None,
    final_checkpoint_path="/content/drive/MyDrive/weight/cp.ckpt",
    early_stopping_patience=10,
)



Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100


In [None]:
# Reload the saved weights and score the evaluation split in one forward pass
model.load_weights("/content/drive/MyDrive/weight/cp.ckpt")
predictions = model(X_processed_test)
true_labels_one_hot = y_one_hot_test

# Report how often the true class is within the 1, 5 and 10 highest-scoring predictions
print("Top-1 Accuracy", top_k_accuracy(predictions, true_labels_one_hot, k=1))
print("Top-5 Accuracy", top_k_accuracy(predictions, true_labels_one_hot, k=5))
print("Top-10 Accuracy", top_k_accuracy(predictions, true_labels_one_hot, k=10))



Top-1 Accuracy 0.0820000022649765
Top-5 Accuracy 0.19599999487400055
Top-10 Accuracy 0.24799999594688416
