In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/AUTSL/AAGCN/
import os
import csv
import re

# Build videos_list.csv (columns: file, label, gloss) for every video found in
# folder_path, then shift the numeric labels so that the smallest one becomes 0.
#folder_path = r'dataset'  # Use raw string to avoid issues with backslashes
#folder_path = r'30 class 28 actor (center)'
folder_path = r'/home/ibmelab/Documents/GG/VSLRecognition/AUTSL/videos'
csv_file_path = 'videos_list.csv'
labels_file_path = '1_1000_label.csv'
final_file_path = 'temp_videos_list.csv'  # scratch file, moved over csv_file_path at the end

# Read the label -> gloss mapping (columns 'id_label_in_documents' and 'name')
label_to_gloss = {}
with open(labels_file_path, mode='r', encoding='utf-8') as labels_file:
    csv_reader = csv.DictReader(labels_file)
    for row in csv_reader:
        label = int(row['id_label_in_documents'])
        gloss = row['name']
        label_to_gloss[label] = gloss

# Write video paths, labels, and glosses to a CSV file
with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(['file', 'label', 'gloss'])

    for filename in os.listdir(folder_path):
        if filename.lower().endswith(('.mp4', '.mkv', '.avi', '.mov', '.flv', '.wmv')):
            # The label is the first run of digits that sits between '_' and '.'
            match = re.search(r'_(\d+)\.', filename)
            if match:
                label = int(match.group(1))
                gloss = label_to_gloss.get(label, 'Unknown')
            else:
                # No label in the file name: keep the row but mark it as unlabeled
                label = 'N/A'
                gloss = 'Unknown'

            full_filename = os.path.join(folder_path, filename)
            csv_writer.writerow([full_filename, label, gloss])

print(f'Video names have been written to {csv_file_path}')

# Find the smallest numeric label (None when no row has a numeric label)
with open(csv_file_path, mode='r', newline='', encoding='utf-8') as csv_file:
    csv_reader = csv.DictReader(csv_file)
    labels = [int(row["label"]) for row in csv_reader if row["label"].isdigit()]  # Convert to int and filter out 'N/A'
    min_label = min(labels) if labels else None

print("Minimum label:", min_label)

# Normalize labels: subtract min_label from every numeric label; 'N/A' rows are copied unchanged
with open(csv_file_path, mode='r', newline='', encoding='utf-8') as csv_file, \
     open(final_file_path, mode='w', newline='', encoding='utf-8') as final_file:
    
    csv_reader = csv.DictReader(csv_file)
    fieldnames = csv_reader.fieldnames
    
    csv_writer = csv.DictWriter(final_file, fieldnames=fieldnames)
    csv_writer.writeheader()
    
    for row in csv_reader:
        # min_label is only None when no label is numeric, in which case this branch is never taken
        if row['label'].isdigit():  # Check if label is a digit before converting
            row['label'] = str(int(row['label']) - min_label)  # Normalize and convert back to string
        csv_writer.writerow(row)

# Replace the original file with the updated file
os.replace(final_file_path, csv_file_path)

print("Labels have been updated and saved.")


In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/vsl/AAGCN/

In [None]:
import pandas as pd
import random
import re

# Load the full file list. The class label is the integer found between the
# last '_' of file_name and the first '.' that follows it.
videos_df = pd.read_csv('/home/ibmelab/Documents/GG/VSLRecognition/vsl/label1-200/full_data_1_200.csv')
videos_df['label'] = videos_df['file_name'].apply(lambda x: int(x.split('_')[-1].split('.')[0]))

def extract_actor_name(file_name):
    """Return the actor key of a file name.

    The key is the first two '_'-separated tokens joined by '_'. A name
    without any underscore is returned stripped of surrounding whitespace.
    """
    first_token, underscore, remainder = file_name.partition('_')
    if not underscore:
        return file_name.strip()
    second_token = remainder.split('_', 1)[0]
    return first_token + '_' + second_token

# Actor key = first two '_'-separated tokens of the base file name
# (anything up to the last backslash is dropped first).
videos_df['actor'] = videos_df['file_name'].apply(lambda x: extract_actor_name(x.split('\\')[-1]))

#print(videos_df['actor'])

# Split configuration. Actors (not individual videos) are assigned to a split so
# that no actor appears in more than one of train / valid / test.
NUM_TRAIN_ACTORS = 24
NUM_VALID_ACTORS = 2
SPLIT_SEED = 42  # fixed seed so that re-running the cell reproduces the same split

unique_actors = videos_df['actor'].unique()
# Sort before sampling so the split does not depend on the row order of the CSV.
unique_actors.sort()
#print(unique_actors)

if len(unique_actors) < NUM_TRAIN_ACTORS + NUM_VALID_ACTORS:
    raise ValueError(
        f'Need at least {NUM_TRAIN_ACTORS + NUM_VALID_ACTORS} actors for the split, '
        f'found {len(unique_actors)}'
    )

# A private generator keeps the global `random` state untouched.
split_rng = random.Random(SPLIT_SEED)
train_actors = split_rng.sample(list(unique_actors), NUM_TRAIN_ACTORS)
train_actor_set = set(train_actors)
remaining_actors = [actor for actor in unique_actors if actor not in train_actor_set]
valid_actors = split_rng.sample(remaining_actors, NUM_VALID_ACTORS)
# Every actor that is neither train nor valid goes to the test split.
test_actors = [actor for actor in remaining_actors if actor not in valid_actors]

train_videos = videos_df[videos_df['actor'].isin(train_actors)][['file_name', 'label']]
valid_videos = videos_df[videos_df['actor'].isin(valid_actors)][['file_name', 'label']]
test_videos = videos_df[videos_df['actor'].isin(test_actors)][['file_name', 'label']]

train_videos.to_csv('train.csv', index=False)
valid_videos.to_csv('valid.csv', index=False)
test_videos.to_csv('test.csv', index=False)

print(f'Train videos: {len(train_videos)}')
print(f'Validation videos: {len(valid_videos)}')
print(f'Test videos: {len(test_videos)}')


In [None]:
import pandas as pd
import mediapipe as mp
import cv2
import os
from collections import defaultdict
from joblib import Parallel, delayed
from tqdm import tqdm

# Shortcuts to the MediaPipe Holistic solution and its drawing helpers.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Landmark names used to build the output column names (21 hand names,
# 33 pose names). Both lists are written in alphabetical order.
# NOTE(review): extract_keypoint indexes these lists by a landmark's position in
# the MediaPipe result, so a name only describes the right joint if this order
# matches MediaPipe's own enum order - it does not look alphabetical there
# (e.g. WRIST / NOSE come first); confirm before relying on column names.
hand_landmarks = ['INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP', 
                  'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP', 
                  'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP', 'RING_FINGER_DIP', 'RING_FINGER_MCP', 
                  'RING_FINGER_PIP', 'RING_FINGER_TIP', 'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST']
pose_landmarks = ['LEFT_ANKLE', 'LEFT_EAR', 'LEFT_ELBOW', 'LEFT_EYE', 'LEFT_EYE_INNER', 'LEFT_EYE_OUTER', 
                  'LEFT_FOOT_INDEX', 'LEFT_HEEL', 'LEFT_HIP', 'LEFT_INDEX', 'LEFT_KNEE', 'LEFT_PINKY', 
                  'LEFT_SHOULDER', 'LEFT_THUMB', 'LEFT_WRIST', 'MOUTH_LEFT', 'MOUTH_RIGHT', 'NOSE', 
                  'RIGHT_ANKLE', 'RIGHT_EAR', 'RIGHT_ELBOW', 'RIGHT_EYE', 'RIGHT_EYE_INNER', 'RIGHT_EYE_OUTER', 
                  'RIGHT_FOOT_INDEX', 'RIGHT_HEEL', 'RIGHT_HIP', 'RIGHT_INDEX', 'RIGHT_KNEE', 'RIGHT_PINKY', 
                  'RIGHT_SHOULDER', 'RIGHT_THUMB', 'RIGHT_WRIST']

def extract_keypoint(video_path, label,
                     video_root="/home/ibmelab/Documents/GG/VSLRecognition/vsl/videos"):
    """Run MediaPipe Holistic on one video and collect per-frame keypoints.

    Args:
        video_path: File name of the video, relative to ``video_root``.
        label: Class label, stored unchanged in the result.
        video_root: Folder that contains the videos.

    Returns:
        A ``defaultdict`` with one list per ``<NAME>_<side>_{x,y,z}`` hand
        column and ``<NAME>_{x,y,z}`` pose column (one entry per decoded frame,
        0 when that part was not detected), plus the scalar entries
        ``"frame"`` (number of decoded frames), ``"video_path"`` and ``"label"``.
    """
    video = f"{video_root}/{video_path}"
    cap = cv2.VideoCapture(video)

    keypoint_dict = defaultdict(list)
    count = 0

    def _append_xyz(column_prefix, x, y, z):
        # One frame's coordinates for a single landmark.
        keypoint_dict[f"{column_prefix}_x"].append(x)
        keypoint_dict[f"{column_prefix}_y"].append(y)
        keypoint_dict[f"{column_prefix}_z"].append(z)

    def _append_hand(detected, side):
        # Hand columns are named positionally from `hand_landmarks`. Later cells
        # read the columns back in that same list order, so the joint order of
        # the data is preserved; it is deliberately left unchanged here.
        # NOTE(review): the names themselves may not match the MediaPipe joint
        # at that position - confirm before using a hand column by name.
        if detected:
            for idx, landmark in enumerate(detected.landmark):
                _append_xyz(f"{hand_landmarks[idx]}_{side}", landmark.x, landmark.y, landmark.z)
        else:
            for name in hand_landmarks:
                _append_xyz(f"{name}_{side}", 0, 0, 0)

    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                count += 1
                # MediaPipe expects RGB, OpenCV decodes to BGR.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(image)

                _append_hand(results.right_hand_landmarks, "right")
                _append_hand(results.left_hand_landmarks, "left")

                if results.pose_landmarks:
                    for idx, landmark in enumerate(results.pose_landmarks.landmark):
                        # Pose columns are selected BY NAME downstream (shoulders and
                        # elbows), so the name must be the real MediaPipe landmark name.
                        # The module-level `pose_landmarks` list is alphabetical and
                        # cannot be indexed by MediaPipe position; use the enum instead.
                        pose_name = mp_holistic.PoseLandmark(idx).name
                        _append_xyz(pose_name, landmark.x, landmark.y, landmark.z)
                else:
                    # Same set of column names as the enum, order is irrelevant here.
                    for name in pose_landmarks:
                        _append_xyz(name, 0, 0, 0)
    finally:
        # Always free the decoder, also when MediaPipe or OpenCV raises.
        cap.release()

    keypoint_dict["frame"] = count
    keypoint_dict["video_path"] = video_path
    keypoint_dict["label"] = label

    return keypoint_dict

def process_videos(mode):
    """Extract keypoints for every video of one split and save them as CSV.

    Reads ``<mode>_labels.csv`` (columns ``file_name`` and ``label_id``), runs
    ``extract_keypoint`` on each row in parallel on all available cores, and
    writes one row per video to ``<mode>_set.csv`` in the working directory.
    """
    label_csv = f"/home/ibmelab/Documents/GG/VSLRecognition/vsl/label1-200/label/labelRight/{mode}_labels.csv"
    label_table = pd.read_csv(label_csv)

    # One delayed call per video; n_jobs=-1 uses every available core.
    jobs = [
        delayed(extract_keypoint)(row['file_name'], row['label_id'])
        for _, row in label_table.iterrows()
    ]
    per_video_keypoints = Parallel(n_jobs=-1)(jobs)

    # Build the DataFrame and save it as CSV.
    pd.DataFrame(per_video_keypoints).to_csv(f"{mode}_set.csv", index=False)

if __name__ == '__main__':
    # Splits to process; each one yields a "<mode>_set.csv" file.
    modes = ["train", "val", "test"]

    for mode in modes:
        process_videos(mode)


Try something new

In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/AUTSL
import pandas as pd
import ast
import numpy as np
import os
from tqdm import tqdm

def find_index(array):
    """Return the position of the first element that differs from 0, or -1 if there is none."""
    return next((position for position, value in enumerate(array) if value != 0), -1)

def curl_skeleton(array):
    """Fill interior zeros of a coordinate track by linear interpolation.

    A 0 is treated as "missing". Every zero that is neither the first nor the
    last element, and that has a non-zero value somewhere after it, is replaced
    by the value on the straight line between its (already filled) left
    neighbour and the next non-zero sample. The first element, the last element
    and trailing zeros are left as they are. A leading 0 is used as a regular
    left neighbour, so zeros right after it are interpolated starting from 0.

    Args:
        array: Sequence of numbers (one coordinate over time).

    Returns:
        A new list with the gaps filled; the input is not modified.
    """
    # Work on a copy so the caller's data is never changed behind its back.
    filled = list(array)
    size = len(filled)
    next_known = 0  # index of the next non-zero sample, refreshed lazily

    for i in range(1, size - 1):
        if filled[i] != 0:
            continue
        if next_known <= i:
            # Look for actual non-zero entries instead of testing sum(...) == 0,
            # which would also fire when positive and negative values cancel out.
            next_known = next((k for k in range(i + 1, size) if filled[k] != 0), -1)
            if next_known == -1:
                break  # only zeros remain: nothing left to interpolate towards
        steps = next_known - i  # distance from i to the next known sample
        filled[i] = float((steps * filled[i - 1] + filled[next_known]) / (steps + 1))
    return filled

if __name__ == "__main__":
    # Hand landmark names; the order of this list fixes the order of the hand
    # keypoints in the saved arrays.
    hand_landmarks = [
        'INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP',
        'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP',
        'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP',
        'RING_FINGER_DIP', 'RING_FINGER_MCP', 'RING_FINGER_PIP', 'RING_FINGER_TIP',
        'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST'
    ]
    
    # Column prefixes read for every video: right hand, then left hand, then four pose points.
    HAND_IDENTIFIERS = [id + "_right" for id in hand_landmarks] + [id + "_left" for id in hand_landmarks]
    POSE_IDENTIFIERS = ["RIGHT_SHOULDER", "LEFT_SHOULDER", "LEFT_ELBOW", "RIGHT_ELBOW"]
    body_identifiers = HAND_IDENTIFIERS + POSE_IDENTIFIERS

    modes = ["train", "valid", "test"]
    output_folder = "hand_keypoints"
    os.makedirs(output_folder, exist_ok=True)
    
    for mode in modes:
        print(f"Processing {mode}.csv")
        dataset = pd.read_csv(f"AAGCN/{mode}_set.csv")
        print(f"Number of videos in {mode} set: {len(dataset)}")
        
        for video_index, video in tqdm(dataset.iterrows(), total=dataset.shape[0]):
            video_name = video["video_path"]  # Assuming there's a 'video_path' column
            video_base = os.path.splitext(os.path.basename(video_name))[0]  # Get the base name of the video

            # Each keypoint cell holds a Python-literal list with one value per frame;
            # the length of the first column is taken as the frame count T.
            T = len(ast.literal_eval(video["INDEX_FINGER_DIP_right_x"]))
            num_keypoints = len(body_identifiers)
            # (frame, keypoint, [x, y]); every slot is filled by the loop below.
            keypoints_all_frames = np.empty((T, num_keypoints, 2))
            
            for index, identifier in enumerate(body_identifiers):
                # Fill missing (zero) samples of each coordinate track before storing it.
                data_keypoint_preprocess_x = curl_skeleton(ast.literal_eval(video[identifier + "_x"]))
                data_keypoint_preprocess_y = curl_skeleton(ast.literal_eval(video[identifier + "_y"]))
                keypoints_all_frames[:, index, 0] = np.asarray(data_keypoint_preprocess_x)
                keypoints_all_frames[:, index, 1] = np.asarray(data_keypoint_preprocess_y)
            
            # Create the output folder for this video
            video_output_folder = os.path.join(output_folder, video_base)
            os.makedirs(video_output_folder, exist_ok=True)
            
            # Save the keypoint data of each frame to its own .npy file
            for idx in range(T):
                frame_data = keypoints_all_frames[idx]
                output_file = os.path.join(video_output_folder, f"hand_kp_{idx:05d}.npy")
                np.save(output_file, frame_data)
                
        print(f"Processing of {mode} set completed.")


In [None]:
import cv2
import numpy as np
import mediapipe as mp
import os
from collections import defaultdict

# Hand landmark names (alphabetical). process_video indexes this list with the
# position of each landmark in the MediaPipe hand result to build column names.
hand_landmarks = [
    'INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP',
    'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP',
    'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP',
    'RING_FINGER_DIP', 'RING_FINGER_MCP', 'RING_FINGER_PIP', 'RING_FINGER_TIP',
    'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST'
]

# Keypoint order of the output arrays: 21 right-hand points, 21 left-hand
# points, then the four pose points listed in POSE_IDENTIFIERS.
HAND_IDENTIFIERS = [id + "_right" for id in hand_landmarks] + [id + "_left" for id in hand_landmarks]
POSE_IDENTIFIERS = ["RIGHT_SHOULDER", "LEFT_SHOULDER", "LEFT_ELBOW", "RIGHT_ELBOW"]
body_identifiers = HAND_IDENTIFIERS + POSE_IDENTIFIERS  # Total of 46 keypoints

# Shortcuts to the MediaPipe Holistic solution and its drawing helpers.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Function to find the index of the first non-zero element
def find_index(array):
    """Locate the first entry that is not equal to 0.

    Returns its position, or -1 when every entry equals 0 (or the input is empty).
    """
    nonzero_positions = (pos for pos, value in enumerate(array) if value != 0)
    return next(nonzero_positions, -1)

# Function to fill in missing keypoints
def curl_skeleton(array):
    """Fill interior zeros of a coordinate track by linear interpolation.

    A 0 is treated as "missing". Every zero that is neither the first nor the
    last element, and that has a non-zero value somewhere after it, is replaced
    by the value on the straight line between its (already filled) left
    neighbour and the next non-zero sample. The first element, the last element
    and trailing zeros are left as they are. A leading 0 is used as a regular
    left neighbour, so zeros right after it are interpolated starting from 0.

    Args:
        array: Sequence of numbers (one coordinate over time).

    Returns:
        A new list with the gaps filled; the input is not modified.
    """
    filled = list(array)
    size = len(filled)
    next_known = 0  # index of the next non-zero sample, refreshed lazily

    for i in range(1, size - 1):
        if filled[i] != 0:
            continue
        if next_known <= i:
            # Test for real non-zero entries rather than sum(...) == 0: a sum can
            # be 0 while the track still holds values (positive and negative
            # coordinates cancelling out), which used to skip the whole track.
            next_known = next((k for k in range(i + 1, size) if filled[k] != 0), -1)
            if next_known == -1:
                break  # only zeros remain: nothing left to interpolate towards
        steps = next_known - i  # distance from i to the next known sample
        filled[i] = float((steps * filled[i - 1] + filled[next_known]) / (steps + 1))
    return filled

def process_video(video_path, save_dir, max_frames=30):
    """Render the hand / arm skeleton of a video's first frames on black images.

    Runs MediaPipe Holistic on at most ``max_frames`` frames, keeps the x/y
    coordinates of both hands and of the shoulders and elbows, fills missing
    (zero) samples with ``curl_skeleton`` and writes one ``frame_XXXXX.png``
    (480x640, black background) per processed frame into ``save_dir``.

    Args:
        video_path: Path of the video to read.
        save_dir: Output folder; created if it does not exist.
        max_frames: Upper bound on the number of frames that are processed.

    Returns:
        None. The rendered images are the only output.
    """
    landmark_pb2 = mp.framework.formats.landmark_pb2

    # Pose landmark name -> index in the MediaPipe result. Built once, it does
    # not change from frame to frame.
    landmark_dict = {mp_holistic.PoseLandmark(idx).name: idx for idx in range(len(mp_holistic.PoseLandmark))}

    # Prepare a dictionary to store keypoints (one list per coordinate column)
    keypoint_data = defaultdict(list)
    frame_count = 0

    cap = cv2.VideoCapture(video_path)
    try:
        with mp_holistic.Holistic(
                min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while frame_count < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1

                # MediaPipe expects RGB, OpenCV decodes to BGR.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(image)

                # Both hands: real coordinates when detected, zeros otherwise.
                hands = (("right", results.right_hand_landmarks),
                         ("left", results.left_hand_landmarks))
                for side, detected in hands:
                    if detected:
                        for idx, landmark in enumerate(detected.landmark):
                            keypoint_data[f"{hand_landmarks[idx]}_{side}_x"].append(landmark.x)
                            keypoint_data[f"{hand_landmarks[idx]}_{side}_y"].append(landmark.y)
                    else:
                        for name in hand_landmarks:
                            keypoint_data[f"{name}_{side}_x"].append(0)
                            keypoint_data[f"{name}_{side}_y"].append(0)

                # Pose landmarks (shoulders and elbows only)
                for pose_identifier in POSE_IDENTIFIERS:
                    idx = landmark_dict.get(pose_identifier, None)
                    if results.pose_landmarks and idx is not None:
                        landmark = results.pose_landmarks.landmark[idx]
                        keypoint_data[f"{pose_identifier}_x"].append(landmark.x)
                        keypoint_data[f"{pose_identifier}_y"].append(landmark.y)
                    else:
                        keypoint_data[f"{pose_identifier}_x"].append(0)
                        keypoint_data[f"{pose_identifier}_y"].append(0)
    finally:
        # Free the decoder even when reading or MediaPipe raises.
        cap.release()

    # Assemble a (frame, keypoint, [x, y]) array with gaps filled in.
    T = frame_count  # Number of frames processed
    num_keypoints = len(body_identifiers)
    keypoints_all_frames = np.empty((T, num_keypoints, 2))

    for index, identifier in enumerate(body_identifiers):
        x_array = keypoint_data.get(identifier + "_x", [0] * T)
        y_array = keypoint_data.get(identifier + "_y", [0] * T)
        keypoints_all_frames[:, index, 0] = np.asarray(curl_skeleton(x_array))
        keypoints_all_frames[:, index, 1] = np.asarray(curl_skeleton(y_array))

    def _as_landmark_list(points):
        # Wrap (x, y) rows in the protobuf type that draw_landmarks expects.
        return landmark_pb2.NormalizedLandmarkList(
            landmark=[landmark_pb2.NormalizedLandmark(x=x, y=y) for x, y in points])

    # Draw the keypoints on black background and save images
    os.makedirs(save_dir, exist_ok=True)
    image_size = (480, 640, 3)  # Height x Width x Channels
    num_hand = len(hand_landmarks)
    num_pose = len(POSE_IDENTIFIERS)

    # Since only shoulders and elbows are kept, the arm connections are custom.
    # They are looked up by name: the indices follow POSE_IDENTIFIERS, where the
    # elbows are NOT listed in the same left/right order as the shoulders.
    pose_connections = [
        (POSE_IDENTIFIERS.index("RIGHT_SHOULDER"), POSE_IDENTIFIERS.index("RIGHT_ELBOW")),
        (POSE_IDENTIFIERS.index("LEFT_SHOULDER"), POSE_IDENTIFIERS.index("LEFT_ELBOW")),
    ]

    connection_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
    left_hand_spec = mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=2)
    right_hand_spec = mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2)
    pose_point_spec = mp_drawing.DrawingSpec(color=(0, 255, 255), thickness=2, circle_radius=2)
    pose_line_spec = mp_drawing.DrawingSpec(color=(255, 255, 0), thickness=2, circle_radius=2)

    for idx in range(T):
        black_image = np.zeros(image_size, dtype=np.uint8)
        keypoints = keypoints_all_frames[idx]

        # body_identifiers order: right hand, left hand, pose points.
        right_hand_list = _as_landmark_list(keypoints[:num_hand])
        left_hand_list = _as_landmark_list(keypoints[num_hand:2 * num_hand])
        pose_list = _as_landmark_list(keypoints[2 * num_hand:2 * num_hand + num_pose])

        mp_drawing.draw_landmarks(
            black_image, left_hand_list, mp_holistic.HAND_CONNECTIONS,
            left_hand_spec, connection_spec)
        mp_drawing.draw_landmarks(
            black_image, right_hand_list, mp_holistic.HAND_CONNECTIONS,
            right_hand_spec, connection_spec)
        mp_drawing.draw_landmarks(
            black_image, pose_list, pose_connections,
            pose_point_spec, pose_line_spec)

        # Save image
        output_file = os.path.join(save_dir, f"frame_{idx:05d}.png")
        cv2.imwrite(output_file, black_image)

if __name__ == "__main__":
    # Placeholder paths: both values must be edited before running this cell.
    video_path = "path_to_your_video.mp4"  # Replace with your video path
    save_directory = "path_to_save_directory"  # Replace with your desired save directory
    process_video(video_path, save_directory)
    print("Processing completed.")


End of experiment

In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/vsl/AAGCN
import pandas as pd
import ast
import numpy as np
from tqdm import tqdm

def find_index(array):
    """Return the position of the first element that differs from 0.

    Returns -1 when there is no such element, matching the other
    ``find_index`` definitions in this notebook. The previous version fell off
    the end of the loop and returned ``None``, which cannot be used in index
    arithmetic.
    """
    for i, num in enumerate(array):
        if num != 0:
            return i
    return -1  # no non-zero element found

def curl_skeleton(array):
    """Fill interior zeros of a coordinate track by linear interpolation.

    A 0 is treated as "missing". Every zero that is neither the first nor the
    last element, and that has a non-zero value somewhere after it, is replaced
    by the value on the straight line between its (already filled) left
    neighbour and the next non-zero sample. The first element, the last element
    and trailing zeros are left as they are. A leading 0 is used as a regular
    left neighbour, so zeros right after it are interpolated starting from 0.

    Args:
        array: Sequence of numbers (one coordinate over time).

    Returns:
        A new list with the gaps filled; the input is not modified.
    """
    # Work on a copy so the caller's data is never changed behind its back.
    filled = list(array)
    size = len(filled)
    next_known = 0  # index of the next non-zero sample, refreshed lazily

    for i in range(1, size - 1):
        if filled[i] != 0:
            continue
        if next_known <= i:
            # Look for actual non-zero entries instead of testing sum(...) == 0,
            # which would also fire when positive and negative values cancel out.
            next_known = next((k for k in range(i + 1, size) if filled[k] != 0), -1)
            if next_known == -1:
                break  # only zeros remain: nothing left to interpolate towards
        steps = next_known - i  # distance from i to the next known sample
        filled[i] = float((steps * filled[i - 1] + filled[next_known]) / (steps + 1))
    return filled

if __name__ == "__main__":
    # Hand landmark names; the order of this list fixes the order of the hand
    # keypoints along the keypoint axis of the saved tensors.
    hand_landmarks = [
        'INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP', 
        'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP', 
        'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP', 
        'RING_FINGER_DIP', 'RING_FINGER_MCP', 'RING_FINGER_PIP', 'RING_FINGER_TIP', 
        'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST'
    ]
    
    # Column prefixes read for every video: right hand, then left hand, then four pose points.
    HAND_IDENTIFIERS = [id + "_right" for id in hand_landmarks] + [id + "_left" for id in hand_landmarks]
    POSE_IDENTIFIERS = ["RIGHT_SHOULDER", "LEFT_SHOULDER", "LEFT_ELBOW", "RIGHT_ELBOW"]
    body_identifiers = HAND_IDENTIFIERS + POSE_IDENTIFIERS 
    
    frames = 80  # fixed temporal length: shorter videos are zero-padded, longer ones truncated
    modes = ["train", "val", "test"]
    
    for mode in modes:
        print(f"Processing {mode}_set.csv")
        # NOTE: despite its name, train_data holds whichever split `mode` selects.
        train_data = pd.read_csv(f"{mode}_set.csv")
        print(len(train_data))
        
        data = []
        labels = []
        
        for video_index, video in tqdm(train_data.iterrows(), total=train_data.shape[0]):  # Ensure tqdm knows total count
            # Each keypoint cell holds a Python-literal list with one value per frame;
            # the length of the first column is taken as the frame count T.
            T = len(ast.literal_eval(video["INDEX_FINGER_DIP_right_x"]))
            # Layout: (channel [x, y], frame, keypoint, person); filled completely below.
            current_row = np.empty(shape=(2, T, len(body_identifiers), 1))
            for index, identifier in enumerate(body_identifiers):
                # Fill missing (zero) samples of each coordinate track before storing it.
                data_keypoint_preprocess_x = curl_skeleton(ast.literal_eval(video[identifier + "_x"]))
                current_row[0, :, index, :] = np.asarray(data_keypoint_preprocess_x).reshape(T, 1)
                data_keypoint_preprocess_y = curl_skeleton(ast.literal_eval(video[identifier + "_y"]))
                current_row[1, :, index, :] = np.asarray(data_keypoint_preprocess_y).reshape(T, 1)

            if T < frames:
                # Pad at the end with zeros up to `frames` frames.
                target = np.zeros(shape=(2, frames, len(body_identifiers), 1))
                target[:, :T, :, :] = current_row
            else:
                # Keep only the first `frames` frames.
                target = current_row[:, :frames, :, :]
                
            data.append(target)
            labels.append(int(video["label"]))

        # Stack all videos along a new leading axis and save data and labels separately.
        keypoint_data = np.stack(data, axis=0)
        label_data = np.stack(labels, axis=0)
        np.save(f'vsl199_{mode}_right_data_preprocess.npy', keypoint_data)
        np.save(f'vsl199_{mode}_right_label_preprocess.npy', label_data)
        print("Processed and saved successfully.")


In [None]:
import numpy as np

# Re-save the label array of every split as int64 under "<split>_label_preprocess.npy".
# The order (test, train, val) is kept so `data` / `data_long` end up holding
# the validation labels, exactly as before.
for split in ('test', 'train', 'val'):
    data = np.load(f'vsl199_{split}_right_label_preprocess.npy')
    data_long = data.astype('int64')
    np.save(f'{split}_label_preprocess.npy', data_long)

In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/AUTSL/AAGCN
import pandas as pd
# Count the distinct class labels in full_labels.csv.
# `num_labels` is read again by the training cell below as the model's num_class,
# so this cell has to run before that one.
df = pd.read_csv('full_labels.csv')
unique_labels = df['label'].unique()
num_labels = len(unique_labels)
print(num_labels)

# Sorted in place only to make the printed list easier to read.
unique_labels.sort()
print(unique_labels)

In [None]:
import os
import cv2
import matplotlib.pyplot as plt

# Replace this with the path of the folder that contains your videos
video_folder = '/home/ibmelab/Documents/GG/VSLRecognition/vsl/videos'

video_names = []
frame_counts = []

# Walk over every file in the folder
for filename in os.listdir(video_folder):
    # Keep video files only; the extension is compared case-insensitively so
    # that e.g. ".MP4" is not silently skipped
    if filename.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
        video_path = os.path.join(video_folder, filename)
        cap = cv2.VideoCapture(video_path)
        
        if not cap.isOpened():
            print(f"Không thể mở video: {filename}")
            cap.release()
            continue
        
        # Frame count as reported by the container metadata
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_counts.append(frame_count)
        video_names.append(filename)
        cap.release()
        print(f"Video: {filename}, Số frame: {frame_count}")

if frame_counts:
    # Smallest frame count and every video that has it
    min_frame_count = min(frame_counts)
    min_indices = [i for i, count in enumerate(frame_counts) if count == min_frame_count]
    print("\nSố frame ít nhất là:", min_frame_count)
    print("Video(s) có số frame ít nhất:")
    for idx in min_indices:
        print(f"- {video_names[idx]}")

    # The histogram is only drawn when at least one video was read:
    # max() on an empty list raises ValueError.
    max_frame_count = max(frame_counts)

    # Histogram of the frame counts
    plt.figure(figsize=(10, 6))
    plt.hist(frame_counts, bins=30, edgecolor='black', range=(0, max_frame_count))
    plt.title('Phân bố số frame của các video')
    plt.xlabel('Số frame')
    plt.ylabel('Số lượng video')
    plt.grid(True)

    # Limit the x axis to [0, largest frame count]
    plt.xlim(0, max_frame_count)

    plt.show()
else:
    print("Không có video nào được tìm thấy trong thư mục.")



In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN
from torch.utils.data import DataLoader
import torch
from torchinfo import summary
from feeder import FeederINCLUDE
from aagcn import AAGCN
import pytorch_lightning as pl
from pytorch_lightning.loggers.wandb import WandbLogger
from pytorch_lightning.callbacks import ModelCheckpoint
import wandb
from augumentation import Rotate, Left, Right, GaussianNoise, Compose
from torch.utils.data import random_split
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "1"

if __name__ == '__main__':

    # Hyper parameter tuning : batch_size, learning_rate, weight_decay
    #batch_size: 2->1
    config = {'batch_size': 170, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}
    # Load device
    device = "cuda:1" if torch.cuda.is_available() else "cpu"
    # Load model

    # num_class: 101 -> 3 (= number of labels)
    model = AAGCN(num_class=num_labels, num_point=46, num_person=1, in_channels=2,
                graph_args = {"layout" :"mediapipe_two_hand", "strategy": "spatial"},
                learning_rate=config["learning_rate"], weight_decay=config["weight_decay"])

    # Callback PL
    callbacks = [
        ModelCheckpoint(
            dirpath="checkpoints",
            monitor="valid_loss",
            mode="min",
            every_n_epochs = 2,
            filename='{epoch}-{valid_accuracy:.2f}-autsl-aagcn-smaller-model'
        ),
    ]
    # Augument 
    batch_size = config["batch_size"]
    transforms = Compose([
        Rotate(15, 80, 25, (0.5, 0.5))
    ])

    # Dataset class
    ''' Đổi tên path
    train_dataset = FeederINCLUDE(data_path=f"wsl100_train_data_preprocess.npy", label_path=f"wsl100_train_label_preprocess.npy",
                            transform=transforms)
    test_dataset = FeederINCLUDE(data_path=f"wsl100_test_data_preprocess.npy", label_path=f"wsl100_test_label_preprocess.npy")
    valid_dataset = FeederINCLUDE(data_path=f"wsl100_valid_data_preprocess.npy", label_path=f"wsl100_valid_label_preprocess.npy")
    '''
    %cd /home/ibmelab/Documents/GG/VSLRecognition/AUTSL/AAGCN/
    train_dataset = FeederINCLUDE(data_path=f"autsl_train_data_preprocess.npy", label_path=f"train_label_preprocess.npy",
                            transform=transforms)
    test_dataset = FeederINCLUDE(data_path=f"autsl_test_data_preprocess.npy", label_path=f"test_label_preprocess.npy")
    valid_dataset = FeederINCLUDE(data_path=f"autsl_valid_data_preprocess.npy", label_path=f"valid_label_preprocess.npy")

    # DataLoader
    train_dataloader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False)
    val_dataloader = DataLoader(valid_dataset, batch_size=config["batch_size"], shuffle=False)

    specific_batch = next(iter(train_dataloader))
    print("Input shape ", specific_batch[0].shape)
    print("Data loader success")
    # Trainer PL
    %cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN
    trainer = pl.Trainer(max_epochs = 120, accelerator="auto", check_val_every_n_epoch = 1, 
                       devices = 1, callbacks=callbacks)
                    #  , logger=wandb_logger) # wandb
    trainer.fit(model, train_dataloader, val_dataloader)
    # Test PL (When test find the right ckpt_path and comment code line 58)
    # trainer.test(model, test_dataloader, ckpt_path="checkpoints/epoch=61-valid_accuracy=0.91-vsl_100-aagcn-2hand+preprocessing_keypoint+augment(v1).ckpt", 
                # verbose=True)

In [None]:
import numpy as np

# Load one saved NumPy file and print its shape as a quick sanity check.
# Alternative inputs are kept commented out; enable exactly one np.load line.
# array = np.load('/home/ibmelab/Documents/GG/VSLRecognition/vsl/AAGCN/vsl199_train_data_preprocess.npy')
array = np.load('/home/ibmelab/Documents/GG/VSLRecognition/vsl/gcn_keypoints_v2/01_Co-Hien_1-100_1-2-3_0108___center_device02_signer01_center_ord1_2/hand_flow_00000.npy')
# array = np.load('/home/ibmelab/Documents/GG/VSLRecognition/vsl/hand_keypoints/01_Co-Hien_1-100_1-2-3_0108___center_device02_signer01_center_ord1_3/hand_kp_00000.npy')
# Print the shape of the array
print(array.shape)


In [None]:
import torch
from torch.utils.data import DataLoader
from feeder import FeederINCLUDE, FeederCustomV2
from aagcn import AAGCN
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from augumentation import Rotate, Compose, GaussianNoise
from pytorch_lightning.utilities.migration import pl_legacy_patch
import os
# Augmentation passed to the FeederINCLUDE dataset below.
transforms = Compose([
        Rotate(15, 80, 25, (0.5, 0.5))
    ])
# Take one sample from FeederINCLUDE (the .npy paths are relative to this folder)
%cd /home/ibmelab/Documents/GG/VSLRecognition/vsl/AAGCN/
train_dataset_include = FeederINCLUDE(
        data_path="vsl199_train_right_data_preprocess.npy",
        label_path="train_label_preprocess.npy",
        transform=transforms)
data_include, label_include = train_dataset_include[3]
print(f"FeederINCLUDE sample data shape: {data_include.shape}")
print(f"FeederINCLUDE sample label: {label_include}")

# Take one sample from FeederCustomV2 to compare its shape and label with the one above
train_dataset_custom = FeederCustomV2('/home/ibmelab/Documents/GG/VSLRecognition/vsl','train')
data_custom, label_custom = train_dataset_custom[5]
print(f"FeederCustomV2 sample data shape: {data_custom.shape}")
print(f"FeederCustomV2 sample label: {label_custom}")


In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN/
import torch
from torch.utils.data import DataLoader
from feeder import FeederINCLUDE, FeederCustomV2
from aagcn import AAGCN
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from augumentation import Rotate, Compose, GaussianNoise
from pytorch_lightning.utilities.migration import pl_legacy_patch
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "1"
if __name__ == '__main__':
    # Hyperparameters
    config = {'batch_size': 160, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}
    num_labels = 199  # Set your new number of classes here

    # Load device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Initialize the model with the new number of classes
    model = AAGCN(
        num_class=num_labels,
        num_point=46,
        num_person=1,
        in_channels=2,
        graph_args={"layout": "mediapipe_two_hand", "strategy": "spatial"},
        learning_rate=config["learning_rate"],
        weight_decay=config["weight_decay"]
    )

    # Path to your checkpoint
    checkpoint_path = "checkpoints/epoch=65-valid_accuracy=0.86-autsl-aagcn-fold=0.ckpt"

    # Load the checkpoint
    with pl_legacy_patch():
        checkpoint = torch.load(checkpoint_path, map_location=device)

    # Get the state dict
    state_dict = checkpoint['state_dict']

    # Remove the keys for the final layer (adjust 'fc' to match your model's final layer name)
    filtered_state_dict = {k: v for k, v in state_dict.items() if not k.startswith('fc.')}

    # Load the filtered state dict into the model
    model.load_state_dict(filtered_state_dict, strict=False)

    # Callbacks
    callbacks = [
        ModelCheckpoint(
            dirpath="checkpoints",
            monitor="valid_loss",
            mode="min",
            every_n_epochs=2,
            filename='{epoch}-{valid_accuracy:.2f}-vsl199-model'
        ),
    ]

    batch_size = config["batch_size"]
    transforms = Compose([
        Rotate(15, 80, 25, (0.5, 0.5))
    ])
    

    %cd /home/ibmelab/Documents/GG/VSLRecognition/vsl/AAGCN/
    # Datasets
    # train_dataset = FeederINCLUDE(
    #     data_path="vsl199_train_right_data_preprocess.npy",
    #     label_path="train_label_preprocess.npy",
    #     transform=transforms
    # )
    # test_dataset = FeederINCLUDE(
    #     data_path="vsl199_test_right_data_preprocess.npy",
    #     label_path="test_label_preprocess.npy"
    # )
    # valid_dataset = FeederINCLUDE(
    #     data_path="vsl199_val_right_data_preprocess.npy",
    #     label_path="val_label_preprocess.npy"
    # )
    def gcn_bert_collate_fn_(batch):
        """Collate per-sample ``(keypoints, label)`` pairs into one batch.

        Args:
            batch: sequence of samples; element 0 of each sample is the
                keypoint tensor and element 1 is the label tensor.

        Returns:
            A tuple ``({'keypoints': stacked_keypoints}, stacked_labels)`` where
            both tensors gain a new leading batch dimension.
        """
        per_sample_labels = []
        per_sample_keypoints = []
        for sample in batch:
            per_sample_labels.append(sample[1])
            per_sample_keypoints.append(sample[0])

        labels = torch.stack(per_sample_labels, dim=0)
        # Original author's layout note: "bs t n c" — presumably
        # (batch, time, nodes, channels); TODO confirm against the feeder.
        keypoints = torch.stack(per_sample_keypoints, dim=0)

        return {'keypoints': keypoints}, labels

    collate_func = gcn_bert_collate_fn_
    train_dataset = FeederCustomV2('/home/ibmelab/Documents/GG/VSLRecognition/vsl','train')
    test_dataset = FeederCustomV2('/home/ibmelab/Documents/GG/VSLRecognition/vsl','test')
    valid_dataset = FeederCustomV2('/home/ibmelab/Documents/GG/VSLRecognition/vsl','val')

    # DataLoaders
    train_dataloader = DataLoader(train_dataset, collate_fn=collate_func,batch_size=config["batch_size"], shuffle=True,
                                  num_workers = 12, prefetch_factor = 4, persistent_workers =  True)
    test_dataloader = DataLoader(test_dataset, collate_fn=collate_func,batch_size=config["batch_size"], shuffle=False,
                                 num_workers = 12, prefetch_factor = 4, persistent_workers =  True)
    val_dataloader = DataLoader(valid_dataset, collate_fn=collate_func,batch_size=config["batch_size"], shuffle=False,
                                num_workers = 12, prefetch_factor = 4, persistent_workers =  True)

    %cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN/
    # Trainer
    trainer = pl.Trainer(
        max_epochs=120,
        accelerator="auto",
        check_val_every_n_epoch=1,
        devices=1,
        callbacks=callbacks
    )

    # Start training
    trainer.fit(model, train_dataloader, val_dataloader)


In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN/
import torch
from torch.utils.data import DataLoader
from feeder import FeederINCLUDE, FeederCustomV2
from aagcn import AAGCN
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from augumentation import Rotate, Compose
from pytorch_lightning.utilities.migration import pl_legacy_patch


if __name__ == '__main__':
    # Hyperparameters
    config = {'batch_size': 90, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}
    num_labels = 199  # Set your new number of classes here

    # Load device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Initialize the model with the new number of classes
    model = AAGCN(
        num_class=num_labels,
        num_point=46,
        num_person=1,
        in_channels=2,
        graph_args={"layout": "mediapipe_two_hand", "strategy": "spatial"},
        learning_rate=config["learning_rate"],
        weight_decay=config["weight_decay"]
    )

    # Path to your checkpoint
    checkpoint_path = "checkpoints/epoch=95-valid_accuracy=0.73-vsl199.ckpt"

    # Load the checkpoint
    with pl_legacy_patch():
        checkpoint = torch.load(checkpoint_path, map_location=device)

    # Get the state dict
    state_dict = checkpoint['state_dict']

    # Remove the keys for the final layer (adjust 'fc' to match your model's final layer name)
    # filtered_state_dict = {k: v for k, v in state_dict.items() if not k.startswith('fc.')}

    # Load the filtered state dict into the model
    model.load_state_dict(state_dict, strict=False)

    # Callbacks
    callbacks = [
        ModelCheckpoint(
            dirpath="checkpoints",
            monitor="valid_loss",
            mode="min",
            every_n_epochs=2,
            filename='{epoch}-{valid_accuracy:.2f}-vsl199-FeederCustom'
        ),
    ]

    batch_size = config["batch_size"]
    transforms = Compose([
        Rotate(15, 80, 25, (0.5, 0.5))
    ])
    

    %cd /home/ibmelab/Documents/GG/VSLRecognition/vsl/AAGCN/
    # Datasets
    train_dataset = FeederINCLUDE(
        data_path="vsl199_train_data_preprocess.npy",
        label_path="train_label_preprocess.npy",
        transform=transforms
    )
    test_dataset = FeederINCLUDE(
        data_path="vsl199_test_data_preprocess.npy",
        label_path="test_label_preprocess.npy"
    )
    valid_dataset = FeederINCLUDE(
        data_path="vsl199_val_data_preprocess.npy",
        label_path="val_label_preprocess.npy"
    )
    # def gcn_bert_collate_fn_(batch):
    #     labels = torch.stack([s[1] for s in batch],dim = 0)
    #     keypoints = torch.stack([s[0] for s in batch],dim = 0) # bs t n c
                                                                                                             
    #     return {'keypoints':keypoints},labels

    # collate_func = gcn_bert_collate_fn_
    # train_dataset = FeederCustomV2('/home/ibmelab/Documents/GG/VSLRecognition/vsl','train')
    # test_dataset = FeederCustomV2('/home/ibmelab/Documents/GG/VSLRecognition/vsl','test')
    # valid_dataset = FeederCustomV2('/home/ibmelab/Documents/GG/VSLRecognition/vsl','val')

    # DataLoaders
    train_dataloader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True,
                                  num_workers = 12, prefetch_factor = 4, persistent_workers =  True)
    test_dataloader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False,
                                 num_workers = 12, prefetch_factor = 4, persistent_workers =  True)
    val_dataloader = DataLoader(valid_dataset, batch_size=config["batch_size"], shuffle=False,
                                num_workers = 12, prefetch_factor = 4, persistent_workers =  True)

    %cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN/
    # Trainer
    trainer = pl.Trainer(
        max_epochs=120,
        accelerator="auto",
        check_val_every_n_epoch=1,
        devices=1,
        callbacks=callbacks
    )

    # Start training
    trainer.fit(model, train_dataloader, val_dataloader)


In [None]:
%cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN
from torch.utils.data import DataLoader
import torch
from torchinfo import summary
from feeder import FeederINCLUDE
from aagcn import AAGCN
import pytorch_lightning as pl
from pytorch_lightning.loggers.wandb import WandbLogger
from pytorch_lightning.callbacks import ModelCheckpoint
import wandb
from augumentation import Rotate, Left, Right, GaussianNoise, Compose
from torch.utils.data import random_split

if __name__ == '__main__':

    # Hyper parameter tuning : batch_size, learning_rate, weight_decay
    #batch_size: 2->1
    config = {'batch_size': 170, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}
    # Load device
    device = "cuda:1" if torch.cuda.is_available() else "cpu"
    # Load model

    # num_class: 101 -> 3 (= number of labels)
    model = AAGCN(num_class=199, num_point=46, num_person=1, in_channels=2,
                graph_args = {"layout" :"mediapipe_two_hand", "strategy": "spatial"},
                learning_rate=config["learning_rate"], weight_decay=config["weight_decay"])

    # Callback PL
    callbacks = [
        ModelCheckpoint(
            dirpath="checkpoints",
            monitor="valid_accuracy",
            mode="max",
            every_n_epochs = 2,
            filename='{epoch}-{valid_accuracy:.2f}-wsl_100-aagcn-{fold}'
        ),
    ]
    # Augument 
    batch_size = config["batch_size"]
    transforms = Compose([
        Rotate(15, 80, 25, (0.5, 0.5))
    ])

    # Dataset class
    ''' Đổi tên path
    train_dataset = FeederINCLUDE(data_path=f"wsl100_train_data_preprocess.npy", label_path=f"wsl100_train_label_preprocess.npy",
                            transform=transforms)
    test_dataset = FeederINCLUDE(data_path=f"wsl100_test_data_preprocess.npy", label_path=f"wsl100_test_label_preprocess.npy")
    valid_dataset = FeederINCLUDE(data_path=f"wsl100_valid_data_preprocess.npy", label_path=f"wsl100_valid_label_preprocess.npy")
    '''
    %cd /home/ibmelab/Documents/GG/VSLRecognition/vsl/AAGCN/
    # Datasets
    train_dataset = FeederINCLUDE(
        data_path="vsl199_train_right_data_preprocess.npy",
        label_path="train_label_preprocess.npy",
        transform=transforms
    )
    test_dataset = FeederINCLUDE(
        data_path="vsl199_test_right_data_preprocess.npy",
        label_path="test_label_preprocess.npy"
    )
    valid_dataset = FeederINCLUDE(
        data_path="vsl199_val_right_data_preprocess.npy",
        label_path="val_label_preprocess.npy"
    )

    # DataLoader
    train_dataloader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False)
    val_dataloader = DataLoader(valid_dataset, batch_size=config["batch_size"], shuffle=False)

    specific_batch = next(iter(train_dataloader))
    print("Input shape ", specific_batch[0].shape)
    print("Data loader success")
    # Trainer PL
    %cd /home/ibmelab/Documents/GG/VSLRecognition/HandSignRecogDev/AAGCN
    trainer = pl.Trainer(max_epochs = 120, accelerator="auto", check_val_every_n_epoch = 1, 
                       devices = 1, callbacks=callbacks)
                    #  , logger=wandb_logger) # wandb
    #trainer.fit(model, train_dataloader, val_dataloader)
    # Test PL (When test find the right ckpt_path and comment code line 58)
    trainer.test(model, test_dataloader, ckpt_path="checkpoints/epoch=53-valid_accuracy=0.62-vsl199-small-model.ckpt", 
                verbose=True)