In [None]:
!pip install numpy==1.26.4 protobuf==4.25.3 mediapipe==0.10.20 opencv-contrib-python==4.8.1.78 pandas==2.1.4 joblib==1.4.2 tqdm==4.66.4
!pip install torch==2.3.1 torchvision==0.18.1 torchaudio==2.3.1 pytorch-lightning==2.3.3

In [None]:
import pandas as pd
import mediapipe as mp
import cv2
from collections import defaultdict
from joblib import Parallel, delayed
import tqdm
import os
import csv
import re
from sklearn.model_selection import KFold
import numpy as np

Write the list of videos to videos_list.csv (columns: file (path), label, gloss, video_name, actor)

In [None]:
# Input folder with the raw videos and the CSV files used by this step.
folder_path = r'10 class 28 actor (different size)'
csv_file_path = 'videos_list.csv'
labels_file_path = '1_1000_label.csv'
final_file_path = 'temp_videos_list.csv'

# File extensions (compared case-insensitively) that count as videos.
VIDEO_EXTENSIONS = ('.mp4', '.mkv', '.avi', '.mov', '.flv', '.wmv')
# Videos carrying this label id are left out of the list.
EXCLUDED_LABEL = 200

# Map each numeric label id to its gloss (the `name` column of the label file).
label_to_gloss = {}
with open(labels_file_path, mode='r', encoding='utf-8') as labels_file:
    for row in csv.DictReader(labels_file):
        label_to_gloss[int(row['id_label_in_documents'])] = row['name']

# Video files whose name carries no "_<digits>." label; they are skipped.
unlabeled_files = []

with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(['file', 'label', 'gloss', 'video_name', 'actor'])

    # Sorted so the row order does not depend on the file system's listing order.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.lower().endswith(VIDEO_EXTENSIONS):
            continue

        # The label is the number between the last-but-one "_" and the extension dot.
        match = re.search(r'_(\d+)\.', filename)
        if not match:
            # A row without a numeric label cannot be used further down:
            # the numpy export converts every label with int(video["label"]).
            unlabeled_files.append(filename)
            continue

        label = int(match.group(1))
        if label == EXCLUDED_LABEL:
            continue

        gloss = label_to_gloss.get(label, 'Unknown')
        actor = filename.split('_')[0]  # actor code is the prefix before the first "_"
        full_filename = os.path.join(folder_path, filename)
        csv_writer.writerow([full_filename, label, gloss, filename, actor])

print(f'Video names have been written to {csv_file_path}')
if unlabeled_files:
    print(f'Skipped {len(unlabeled_files)} video(s) without a label in the file name: {unlabeled_files}')

# Find min label
with open(csv_file_path, mode='r', newline='', encoding='utf-8') as csv_file:
    csv_reader = csv.DictReader(csv_file)
    labels = [int(row["label"]) for row in csv_reader if row["label"].isdigit()]
    min_label = min(labels) if labels else None

print("Minimum label:", min_label)

# Normalize labels so the smallest label becomes 0.
# NOTE(review): this only yields class ids 0..N-1 when the label ids are
# contiguous - confirm for the dataset in use.
with open(csv_file_path, mode='r', newline='', encoding='utf-8') as csv_file, \
     open(final_file_path, mode='w', newline='', encoding='utf-8') as final_file:

    csv_reader = csv.DictReader(csv_file)
    fieldnames = csv_reader.fieldnames

    csv_writer = csv.DictWriter(final_file, fieldnames=fieldnames)
    csv_writer.writeheader()

    for row in csv_reader:
        if row['label'].isdigit():
            row['label'] = str(int(row['label']) - min_label)
        csv_writer.writerow(row)

# Swap the normalized copy into place of the original list.
os.replace(final_file_path, csv_file_path)

print("Labels have been updated and saved.")


Number of labels in the dataset

In [None]:
# Distinct label ids collected while building the video list.
unique_labels = set(labels)
num_labels = len(unique_labels)
print(num_labels)
print(unique_labels)

Balance the number of videos

In [None]:
import csv, re, random, collections, pathlib, copy

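'''
Some videos are missing in the dataset, so the labels do not all have the same number of videos.
For example, if the fullest label has 10 videos (one per actor), the other labels (with fewer than 10)
get duplicated videos until each label also has 10.
Concretely: for every label, each actor without a video of that label receives a copy of a randomly
chosen existing row of the same label, so that every label ends up with one row per actor.
'''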

def balance_videos_list(input_csv="videos_list.csv", output_csv="videos_list_balanced.csv", seed=None):
    """Write a copy of the video list in which every label has one row per actor.

    For each label, every actor that has no row for that label gets a synthetic
    row copied from a randomly chosen existing row of the same label. The copy
    keeps the donor's ``file`` path (so it still points at a real video) and
    gets the missing actor's code in ``actor`` and ``video_name``. If an actor
    has several rows for one label, only the first is kept.

    Args:
        input_csv: CSV with at least the columns ``label``, ``actor``,
            ``video_name`` and ``file``.
        output_csv: Destination CSV; overwritten if it exists.
        seed: Optional seed for the donor selection. ``None`` (default) keeps
            the selection non-deterministic.
    """
    rng = random.Random(seed)

    def canonicalize_actor(s):
        s = str(s)
        m = re.match(r'^(\d{1,2})', s)
        return f"{int(m.group(1)):02d}" if m else s  # keep as-is if no leading digits

    with open(input_csv, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        # Taken from the header so that a header-only file is handled too.
        fieldnames = list(reader.fieldnames or [])
        rows = list(reader)

    for r in rows:
        r['actor'] = canonicalize_actor(r['actor'])

    actors = sorted({r['actor'] for r in rows})
    total_actors = len(actors)
    print(f"Detected {total_actors} actors: {actors}")

    by_label = collections.defaultdict(list)
    for r in rows:
        by_label[r['label']].append(r)

    def patch_row(base_row, missing_actor):
        new_row = dict(base_row)
        new_row['actor'] = missing_actor
        # Only the display name gets the new actor code. `file` must keep
        # pointing at the donor's video, which is the one that exists on disk.
        new_row['video_name'] = re.sub(r'^\d{1,2}(?=[_-])', lambda _m: missing_actor, new_row['video_name'])
        return new_row

    for label_rows in by_label.values():
        present_actors = {r['actor'] for r in label_rows}
        donors = list(label_rows)  # synthetic rows are never used as donors
        for ma in actors:
            if ma not in present_actors:
                label_rows.append(patch_row(rng.choice(donors), ma))

    # After the fill-in every label contains every actor, so keeping the first
    # row per actor yields exactly `total_actors` rows per label.
    final_rows = []
    for label_rows in by_label.values():
        seen = set()
        for r in label_rows:
            if r['actor'] not in seen:
                seen.add(r['actor'])
                final_rows.append(r)

    with open(output_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(final_rows)

    print(f"\nBalanced file written to: {output_csv}")
    counter = collections.Counter(r['label'] for r in final_rows)

    print("\nCount per label (ascending):")
    for lab, cnt in sorted(counter.items(), key=lambda x: x[1]):
        print(f"{lab:>4} : {cnt}")

# Build videos_list_balanced.csv from videos_list.csv using the default paths.
balance_videos_list()


Extract keypoints

In [None]:
import pandas as pd
import mediapipe as mp
import cv2
import os
from collections import defaultdict
from joblib import Parallel, delayed
from tqdm import tqdm

# Shortcuts to MediaPipe's holistic solution module and its drawing utilities.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Names of the 21 hand landmarks, listed alphabetically.
# NOTE(review): MediaPipe reports landmarks in its own enum order (WRIST is
# index 0 for hands), not alphabetically - confirm before pairing these names
# with landmark positions by list index.
hand_landmarks = ['INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP',
                  'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP',
                  'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP', 'RING_FINGER_DIP', 'RING_FINGER_MCP',
                  'RING_FINGER_PIP', 'RING_FINGER_TIP', 'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST']
# Names of the 33 pose landmarks, listed alphabetically.
# NOTE(review): same ordering caveat as for the hand names (NOSE is index 0
# in MediaPipe's pose enum) - confirm before indexing by landmark position.
pose_landmarks = ['LEFT_ANKLE', 'LEFT_EAR', 'LEFT_ELBOW', 'LEFT_EYE', 'LEFT_EYE_INNER', 'LEFT_EYE_OUTER',
                  'LEFT_FOOT_INDEX', 'LEFT_HEEL', 'LEFT_HIP', 'LEFT_INDEX', 'LEFT_KNEE', 'LEFT_PINKY',
                  'LEFT_SHOULDER', 'LEFT_THUMB', 'LEFT_WRIST', 'MOUTH_LEFT', 'MOUTH_RIGHT', 'NOSE',
                  'RIGHT_ANKLE', 'RIGHT_EAR', 'RIGHT_ELBOW', 'RIGHT_EYE', 'RIGHT_EYE_INNER', 'RIGHT_EYE_OUTER',
                  'RIGHT_FOOT_INDEX', 'RIGHT_HEEL', 'RIGHT_HIP', 'RIGHT_INDEX', 'RIGHT_KNEE', 'RIGHT_PINKY',
                  'RIGHT_SHOULDER', 'RIGHT_THUMB', 'RIGHT_WRIST']

def extract_keypoint(video_path, label, actor):
    """Run MediaPipe Holistic over a video and collect per-frame landmark coordinates.

    For every frame the x/y/z coordinates of the right-hand, left-hand and pose
    landmarks are appended to per-landmark lists. When a hand or the pose is
    not detected in a frame, zeros are appended instead, so all lists have one
    entry per frame.

    Column names are taken from MediaPipe's own ``HandLandmark`` and
    ``PoseLandmark`` enums, which are defined in landmark-index order, so each
    column holds the landmark its name says. Keypoint files produced with a
    different name order must be regenerated before being mixed with these.

    Args:
        video_path: Path of the video to process.
        label: Label stored unchanged in the result.
        actor: Actor code stored unchanged in the result.

    Returns:
        A ``defaultdict`` with keys ``<NAME>_right_{x,y,z}``,
        ``<NAME>_left_{x,y,z}`` and ``<NAME>_{x,y,z}`` (pose) mapping to lists
        of per-frame values, plus ``frame`` (number of frames read),
        ``video_path``, ``label`` and ``actor``.
    """
    # IntEnum members iterate in definition order, i.e. by landmark index.
    hand_names = [lm.name for lm in mp.solutions.hands.HandLandmark]
    pose_names = [lm.name for lm in mp.solutions.pose.PoseLandmark]

    keypoint_dict = defaultdict(list)
    count = 0

    def record(detected, names, suffix):
        """Append one frame of coordinates (or zeros if nothing was detected)."""
        if detected:
            coords = [(point.x, point.y, point.z) for point in detected.landmark]
        else:
            coords = [(0, 0, 0)] * len(names)
        for name, (x, y, z) in zip(names, coords):
            keypoint_dict[f"{name}{suffix}_x"].append(x)
            keypoint_dict[f"{name}{suffix}_y"].append(y)
            keypoint_dict[f"{name}{suffix}_z"].append(z)

    cap = cv2.VideoCapture(video_path)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                count += 1
                # OpenCV decodes frames as BGR; they are converted to RGB before processing.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(image)

                record(results.right_hand_landmarks, hand_names, "_right")
                record(results.left_hand_landmarks, hand_names, "_left")
                record(results.pose_landmarks, pose_names, "")
    finally:
        # Release the decoder even if processing fails part-way through.
        cap.release()

    keypoint_dict["frame"] = count
    keypoint_dict["video_path"] = video_path
    keypoint_dict["label"] = label
    keypoint_dict["actor"] = actor

    return keypoint_dict

def process_videos():
    """Extract keypoints for every video in the balanced list and save them as CSV.

    Reads ``videos_list_balanced.csv``, runs ``extract_keypoint`` on each row in
    parallel on all available cores, and writes one row per video to
    ``vsl{num_labels}_keypoints.csv`` (``num_labels`` is read from the notebook's
    global namespace).
    """
    videos = pd.read_csv("videos_list_balanced.csv")

    progress = tqdm(videos.iterrows(), total=len(videos), desc="Processing videos", leave=False)
    jobs = (
        delayed(extract_keypoint)(row['file'], row['label'], row['actor'])
        for _, row in progress
    )
    keypoints_list = Parallel(n_jobs=-1)(jobs)

    pd.DataFrame(keypoints_list).to_csv(f"vsl{num_labels}_keypoints.csv", index=False)

# Run the keypoint extraction when this cell/script is executed directly.
if __name__ == '__main__':
    process_videos()


Interpolation

In [None]:
import pandas as pd
import numpy as np
import ast
from tqdm import tqdm

def find_index(array):
    """Return the position of the first non-zero element of ``array``.

    Returns ``None`` when ``array`` is empty or contains only zeros.
    """
    return next((position for position, value in enumerate(array) if value != 0), None)

def curl_skeleton(array):
    """Fill interior zeros of a coordinate sequence by linear interpolation.

    A value of 0 is treated as "missing". Every zero that is neither the first
    nor the last element and has a non-zero value somewhere after it is
    replaced by the linear interpolation between the element just before it
    and the next non-zero element. The first element, the last element and
    trailing zeros are left untouched.

    Note that the element before a gap is used as-is, even if it is itself 0
    (a gap at the very start is therefore ramped up from 0).

    Args:
        array: Mutable sequence of numbers; modified in place.

    Returns:
        The same ``array`` object.
    """
    length = len(array)

    # Explicit non-zero test instead of `sum(array) == 0`: a sum can be zero
    # although entries are not (positive and negative coordinates cancel).
    if not any(value != 0 for value in array):
        return array

    for i in range(1, length - 1):
        if array[i] != 0:
            continue

        next_known = next((k for k in range(i + 1, length) if array[k] != 0), None)
        if next_known is None:
            # Only zeros remain up to the end: nothing to interpolate towards.
            break

        # array[i - 1] is one step before i and array[next_known] is `gap`
        # steps after it, so the weights are gap/(gap+1) and 1/(gap+1).
        gap = next_known - i
        array[i] = float((gap * array[i - 1] + array[next_known]) / (gap + 1))

    return array

def interpolate_keypoints(input_file, output_file, body_identifiers):
    """Interpolate missing x/y keypoints of every video and write the result as CSV.

    Each ``<identifier>_x`` and ``<identifier>_y`` cell of ``input_file`` is
    parsed as a Python list literal, passed through ``curl_skeleton`` and stored
    back as its string form. All other columns are copied unchanged.

    Args:
        input_file: CSV produced by the keypoint extraction step.
        output_file: Destination CSV for the interpolated keypoints.
        body_identifiers: Landmark identifiers whose ``_x``/``_y`` columns are processed.
    """
    source_df = pd.read_csv(input_file)
    output_df = source_df.copy()

    for row_label, video in tqdm(source_df.iterrows(), total=len(source_df)):
        for identifier in body_identifiers:
            # Only the x and y coordinates are interpolated.
            for axis_suffix in ("_x", "_y"):
                column = identifier + axis_suffix
                filled = curl_skeleton(ast.literal_eval(video[column]))
                output_df.at[row_label, column] = str(filled)

    output_df.to_csv(output_file, index=False)
    print(f"Interpolated keypoints saved to {output_file}")

if __name__ == "__main__":
    input_file_path = f"vsl{num_labels}_keypoints.csv"
    output_file_path = f"vsl{num_labels}_interpolated_keypoints.csv"

    # Hand landmark names; the "_right"/"_left" variants below select CSV columns by name.
    hand_landmarks = [
        'INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP',
        'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP',
        'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP',
        'RING_FINGER_DIP', 'RING_FINGER_MCP', 'RING_FINGER_PIP', 'RING_FINGER_TIP',
        'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST'
    ]
    HAND_IDENTIFIERS = (
        [name + "_right" for name in hand_landmarks]
        + [name + "_left" for name in hand_landmarks]
    )
    POSE_IDENTIFIERS = ["RIGHT_SHOULDER", "LEFT_SHOULDER", "LEFT_ELBOW", "RIGHT_ELBOW"]
    body_identifiers = HAND_IDENTIFIERS + POSE_IDENTIFIERS

    interpolate_keypoints(input_file_path, output_file_path, body_identifiers)

    # Load interpolated data and store them in numpy files
    train_data = pd.read_csv(output_file_path)
    frames = 80  # every sample is padded with zeros or cut to this many frames

    data = []
    labels = []
    num_joints = len(body_identifiers)

    for _, video in tqdm(train_data.iterrows(), total=len(train_data)):
        # Number of frames of this video, taken from one of its coordinate lists.
        T = len(ast.literal_eval(video["INDEX_FINGER_DIP_right_x"]))
        current_row = np.empty(shape=(2, T, num_joints, 1))

        # Axis 0 of the sample: 0 -> x coordinates, 1 -> y coordinates.
        for joint, identifier in enumerate(body_identifiers):
            for channel, axis_suffix in enumerate(("_x", "_y")):
                values = ast.literal_eval(video[identifier + axis_suffix])
                current_row[channel, :, joint, :] = np.asarray(values).reshape(T, 1)

        if T >= frames:
            target = current_row[:, :frames, :, :]
        else:
            target = np.zeros(shape=(2, frames, num_joints, 1))
            target[:, :T, :, :] = current_row

        data.append(target)
        labels.append(int(video["label"]))

    keypoint_data = np.stack(data, axis=0)
    label_data = np.stack(labels, axis=0)
    np.save(f'vsl{num_labels}_data_preprocess.npy', keypoint_data)
    np.save(f'vsl{num_labels}_label_preprocess.npy', label_data)
    print("Data processing and saving completed.")


In [None]:
import numpy as np
# Sanity check: shapes of the saved keypoint and label arrays.
a = np.load(f'vsl{num_labels}_data_preprocess.npy')
b = np.load(f'vsl{num_labels}_label_preprocess.npy')

for loaded in (a, b):
    print(loaded.shape)

Split the data into K folds by actor and store each fold's keypoints and labels in numpy files

In [None]:
from sklearn.model_selection import KFold
import os
import numpy as np
import pandas as pd
from tqdm import tqdm

def k_fold_cross_validation(train_data, keypoint_data, label_data, num_labels, k_folds, destination_folder="numpy_files"):
    """Split the samples into actor-disjoint folds and save each train/test split.

    The distinct actors are divided into ``k_folds`` groups with a shuffled
    ``KFold`` (``random_state=42``). For fold ``i`` the test set holds all
    samples of the actors in group ``i`` and the training set holds the samples
    of all other actors. Four ``.npy`` files are written per fold into
    ``destination_folder``:
    ``vsl{num_labels}_{data|label}_fold{i}_{train|test}.npy``.

    Args:
        train_data: DataFrame with an ``actor`` column; row ``n`` must describe
            the same sample as ``keypoint_data[n]`` and ``label_data[n]``.
        keypoint_data: Array of samples, indexed along axis 0.
        label_data: Array of labels, indexed along axis 0.
        num_labels: Number used in the output file names.
        k_folds: Number of folds.
        destination_folder: Directory for the output files; created if missing.

    Raises:
        ValueError: If the three inputs do not have the same number of samples.
    """
    if not (len(train_data) == len(keypoint_data) == len(label_data)):
        raise ValueError(
            f"Sample count mismatch: train_data={len(train_data)}, "
            f"keypoint_data={len(keypoint_data)}, label_data={len(label_data)}"
        )

    os.makedirs(destination_folder, exist_ok=True)

    actor_column = train_data['actor'].to_numpy()
    actors = train_data['actor'].unique()
    print(f"Number of actors: {len(actors)}")
    print('-----------------------------------------------------')

    kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    # Row positions (not DataFrame index labels) are stored, because they are
    # used to index the numpy arrays, which only know positions.
    actor_to_indices = {actor: np.flatnonzero(actor_column == actor).tolist() for actor in actors}
    folds = [[] for _ in range(k_folds)]

    # Only the test side of each split is needed: a fold's training set is
    # assembled from the other folds' test sets below.
    for fold, (_, test_actor_positions) in enumerate(kf.split(actors)):
        for actor in actors[test_actor_positions]:
            folds[fold].extend(actor_to_indices[actor])

        tqdm.write(f"Fold {fold+1}: {len(folds[fold])} test samples")

    # Iterate over each fold to create train-test splits
    for fold in range(k_folds):
        test_indices = folds[fold]
        train_indices = [idx for f in range(k_folds) if f != fold for idx in folds[f]]

        X_train, X_test = keypoint_data[train_indices], keypoint_data[test_indices]
        y_train = np.array(label_data[train_indices], dtype=np.int64)
        y_test = np.array(label_data[test_indices], dtype=np.int64)

        np.save(os.path.join(destination_folder, f'vsl{num_labels}_data_fold{fold+1}_train.npy'), X_train)
        np.save(os.path.join(destination_folder, f'vsl{num_labels}_label_fold{fold+1}_train.npy'), y_train)
        np.save(os.path.join(destination_folder, f'vsl{num_labels}_data_fold{fold+1}_test.npy'), X_test)
        np.save(os.path.join(destination_folder, f'vsl{num_labels}_label_fold{fold+1}_test.npy'), y_test)

        tqdm.write(f"Processed and saved vsl{num_labels} fold {fold+1} successfully.")

if __name__ == "__main__":
    # Inputs written by the interpolation step; file names use the current num_labels.
    keypoint_data = np.load(f'vsl{num_labels}_data_preprocess.npy')
    label_data = np.load(f'vsl{num_labels}_label_preprocess.npy')

    input_file_path = f"vsl{num_labels}_interpolated_keypoints.csv"
    train_data = pd.read_csv(input_file_path)

    # From here on num_labels is the number of distinct labels actually present in the data.
    num_labels = len(np.unique(label_data))
    k_folds = 10

    k_fold_cross_validation(train_data, keypoint_data, label_data, num_labels, k_folds)


In [None]:
import numpy as np
# Sanity check: shapes of the test and train keypoint arrays of fold 2.
a = np.load(f'numpy_files/vsl{num_labels}_data_fold2_test.npy')
b = np.load(f'numpy_files/vsl{num_labels}_data_fold2_train.npy')

for fold_array in (a, b):
    print(fold_array.shape)

Train on each fold, starting from the checkpoint pre-trained on AUTSL

In [None]:

import os
import numpy as np
import torch
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from feeder import FeederINCLUDE
from aagcn import Model
from augumentation import Rotate, Compose
from pytorch_lightning.utilities.migration import pl_legacy_patch

# Restrict CUDA to the first GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

if __name__ == '__main__':
    k_folds = 10
    num_labels = 10
    config = {'batch_size': 128, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}

    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Path pre-trained checkpoint file on AUTSL
    checkpoint_path = "autsl_vsl199-aagcn-fold=7-v1.ckpt"

    # The checkpoint is the same for every fold, so it is read and filtered
    # once here instead of once per fold.
    with pl_legacy_patch():
        checkpoint = torch.load(checkpoint_path, map_location=device)

    state_dict = checkpoint['state_dict']
    # Drop the `fc.` weights of the checkpoint; those parameters of the new
    # model keep their fresh initialisation.
    filtered_state_dict = {k: v for k, v in state_dict.items() if not k.startswith('fc.')}

    best_accuracy = 0.0
    best_fold = -1

    for fold in range(k_folds):
        print(f"Starting fold {fold + 1}/{k_folds}")
        train_data_path = os.path.join("numpy_files", f'vsl{num_labels}_data_fold{fold+1}_train.npy')
        train_label_path = os.path.join("numpy_files", f'vsl{num_labels}_label_fold{fold+1}_train.npy')
        val_data_path = os.path.join("numpy_files", f'vsl{num_labels}_data_fold{fold+1}_test.npy')
        val_label_path = os.path.join("numpy_files", f'vsl{num_labels}_label_fold{fold+1}_test.npy')

        # Augmentation is applied to the training set only.
        transforms = Compose([
            Rotate(15, 80, 25, (0.5, 0.5))
        ])

        train_dataset = FeederINCLUDE(
            data_path=train_data_path,
            label_path=train_label_path,
            transform=transforms
        )
        val_dataset = FeederINCLUDE(
            data_path=val_data_path,
            label_path=val_label_path
        )

        train_dataloader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
        val_dataloader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)

        # A fresh model per fold, initialised from the pre-trained weights.
        model = Model(num_class=num_labels, num_point=46, num_person=1, in_channels=2,
                      graph_args={"layout": "mediapipe_two_hand", "strategy": "spatial"},
                      learning_rate=config['learning_rate'], weight_decay=config['weight_decay'])

        # strict=False because the `fc.` entries were removed above.
        model.load_state_dict(filtered_state_dict, strict=False)

        callbacks = [
            ModelCheckpoint(
                dirpath="checkpoints",
                monitor="valid_accuracy",
                mode="max",
                every_n_epochs=1,
                filename=f'autsl_vsl{num_labels}-aagcn-fold={fold+1}'
            ),
        ]

        trainer = pl.Trainer(max_epochs=100, accelerator="auto", check_val_every_n_epoch=1,
                             devices=1, callbacks=callbacks)

        trainer.fit(model, train_dataloader, val_dataloader)
        # NOTE(review): this reads the most recently logged `valid_accuracy`,
        # which may differ from the best value tracked by ModelCheckpoint -
        # confirm which of the two should be reported.
        val_accuracy = trainer.callback_metrics['valid_accuracy'].item()
        print(f"Fold {fold + 1} finished with validation accuracy: {val_accuracy:.4f}")

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            best_fold = fold + 1

    print(f"The highest validation accuracy achieved is {best_accuracy:.4f} from fold {best_fold}.")


In [None]:
# Summary of the cross-validation run; uses num_labels, best_accuracy and best_fold from the notebook's global namespace.
print(f"The highest validation accuracy achieved of autsl vsl{num_labels} is {best_accuracy:.4f} from fold {best_fold}.")