Import the required libraries

In [26]:
import os
import json
import cv2
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tqdm import tqdm
from tensorflow import keras
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout,
                                     LSTM, TimeDistributed, GlobalAveragePooling2D)
from tensorflow.keras.models import Model, Sequential
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
import kagglehub

Download the latest version of the WLASL dataset

In [27]:
# Fetch the dataset through kagglehub and remember where it was stored locally
WLASL_DATASET_HANDLE = "risangbaskoro/wlasl-processed"
data_dir = kagglehub.dataset_download(WLASL_DATASET_HANDLE)

print("Path to dataset files:", data_dir)

Path to dataset files: /root/.cache/kagglehub/datasets/risangbaskoro/wlasl-processed/versions/5


In [28]:
# Locations inside the downloaded dataset folder
videos_dir = os.path.join(data_dir, 'videos')  # raw video files
processed_data_dir = os.path.join(data_dir, 'processed_data')  # extracted frames are written here

# Make sure the output folder is present before any frame is written
os.makedirs(processed_data_dir, exist_ok=True)

In [29]:
# Helper functions
def missing_videos():
    """Return the set of non-empty, whitespace-stripped lines of ``missing.txt`` in ``data_dir``.

    A set is returned so that callers get fast membership checks.
    """
    listing_path = os.path.join(data_dir, 'missing.txt')
    with open(listing_path, 'r') as listing:
        stripped_lines = (raw_line.strip() for raw_line in listing)
        return {entry for entry in stripped_lines if entry}

In [30]:
def data_loader():
    """Parse ``WLASL_v0.3.json`` located in ``data_dir`` and return the decoded JSON content."""
    annotations_path = os.path.join(data_dir, 'WLASL_v0.3.json')
    with open(annotations_path, 'r') as annotations:
        return json.load(annotations)

In [31]:
# Build the two inputs of the filtering step
data = data_loader()        # parsed content of WLASL_v0.3.json
missing = missing_videos()  # set of entries listed in missing.txt

In [32]:
# Load nslt_100 (list of video_ids to include)
nslt_100_dir = os.path.join(data_dir, 'nslt_100.json')  # note: this is the path of a file, not a directory
with open(nslt_100_dir, 'r') as f:
    # list() keeps the top-level entries of the JSON (the keys, if it is an object);
    # only the first 50 of them are used to keep the experiment small
    nslt_100 = list(json.load(f))[:50]

In [33]:
def filter_dataset(data, nslt_100, missing_ids=None): # filter the dataset
    """Keep only the instances whose video is wanted and not reported missing.

    Args:
        data: Iterable of gloss entries, each a dict with a ``'gloss'`` value and
            an ``'instances'`` list of dicts that carry a ``'video_id'``.
        nslt_100: Collection of video ids to keep (a list, set, or a dict whose
            keys are the ids).
        missing_ids: Collection of video ids to exclude. When omitted, the
            notebook-level ``missing`` set is used, which matches the previous
            behaviour of this function.

    Returns:
        A ``(filtered_data, filtered_glosses)`` tuple: the flat list of kept
        instances, and a list of ``{'gloss': ..., 'instances': [...]}`` dicts for
        the glosses that still have at least one instance.
    """
    if missing_ids is None:
        missing_ids = missing  # notebook global built by missing_videos()

    # Sets make each membership test O(1); a list lookup would scan all ids per instance.
    wanted_ids = set(nslt_100)
    excluded_ids = set(missing_ids)

    filtered_glosses = []
    filtered_data = []
    for gloss in data:
        valid_instances = [
            instance for instance in gloss['instances']
            if instance['video_id'] in wanted_ids and instance['video_id'] not in excluded_ids
        ]
        if valid_instances:  # glosses without any usable video are dropped entirely
            filtered_glosses.append({'gloss': gloss['gloss'], 'instances': valid_instances})
            filtered_data.extend(valid_instances)
    return filtered_data, filtered_glosses

In [35]:
# Restrict the annotations to the selected, available videos
filtered_data, glosses = filter_dataset(data=data, nslt_100=nslt_100)

In [36]:
# Quick sanity check of the filtering result
num_words = len(glosses)
num_videos = len(filtered_data)
print("Number of words in the filtered dataset:", num_words)
print("Number of videos in the filtered dataset:", num_videos)
print("Example gloss:", glosses[0])

Number of words in the filtered dataset: 15
Number of videos in the filtered dataset: 35
Example gloss: {'gloss': 'who', 'instances': [{'bbox': [165, 4, 472, 370], 'fps': 25, 'frame_end': -1, 'frame_start': 1, 'instance_id': 14, 'signer_id': 88, 'source': 'aslsignbank', 'split': 'train', 'url': 'https://aslsignbank.haskins.yale.edu/dictionary/protected_media/glossvideo/ASL/WH/WHO-1430.mp4', 'variation_id': 0, 'video_id': '66778'}, {'bbox': [167, 3, 471, 370], 'fps': 25, 'frame_end': -1, 'frame_start': 1, 'instance_id': 18, 'signer_id': 88, 'source': 'aslsignbank', 'split': 'train', 'url': 'https://aslsignbank.haskins.yale.edu/dictionary/protected_media/glossvideo/ASL/WH/WHO-2236.mp4', 'variation_id': 0, 'video_id': '66779'}]}


In [37]:
# Video processing function
def process_video_fixed(video_file, frame_start, frame_end, bbox, output_dir, num_frames=16, frame_size=(224, 224)):
    """Sample frames from a video segment, crop/grayscale/resize them and save them as JPEGs.

    Frames are taken every ``step`` frames between ``frame_start`` and
    ``frame_end`` (``step = segment_length // num_frames``, or 1 when the segment
    is shorter than ``num_frames``), stopping once ``num_frames`` frames have been
    collected. If fewer frames were collected, the last one is repeated until
    ``num_frames`` is reached. Nothing is written when no frame could be read.

    Args:
        video_file: Path of the video to read.
        frame_start: First frame of the segment, assumed to be 1-indexed
            (values below 1 are treated as the first frame).
        frame_end: Last frame of the segment, assumed to be 1-indexed, or -1 to
            run until the last frame reported by OpenCV.
        bbox: ``(x1, y1, x2, y2)`` crop rectangle in pixels; clamped to the frame.
        output_dir: Existing directory that receives ``frame_0000.jpg``,
            ``frame_0001.jpg``, ...
        num_frames: Number of frames to save per video.
        frame_size: ``(width, height)`` of the saved frames.

    Returns:
        None. Problems (video cannot be opened, empty crop, failed write) are
        reported with ``print`` instead of raising.
    """
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print(f'Failed to open video file {video_file}')
        cap.release()
        return

    frames_selected = []
    try:
        # Convert the (assumed) 1-indexed annotation to OpenCV's 0-indexed frame positions.
        frame_start = max(frame_start - 1, 0)
        if frame_end == -1:
            frame_end = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - 1  # index of the last frame
        else:
            frame_end = frame_end - 1

        x1, y1, x2, y2 = map(int, bbox)

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Clamp bbox coords
        x1 = max(0, min(x1, width - 1))
        x2 = max(0, min(x2, width - 1))
        y1 = max(0, min(y1, height - 1))
        y2 = max(0, min(y2, height - 1))

        # An empty crop would make cv2.cvtColor raise, so skip such videos explicitly.
        if x2 <= x1 or y2 <= y1:
            print(f'Empty bounding box {list(bbox)} for video file {video_file}')
            return

        total_frames = frame_end - frame_start + 1
        step = total_frames // num_frames if total_frames >= num_frames else 1

        for idx in range(frame_start, frame_end + 1, step):
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)  # seek explicitly because frames are skipped
            ret, frame = cap.read()
            if not ret:  # stop at the first unreadable frame
                break
            crop_frame = frame[y1:y2, x1:x2]
            gray_frame = cv2.cvtColor(crop_frame, cv2.COLOR_BGR2GRAY)
            frames_selected.append(cv2.resize(gray_frame, frame_size))
            if len(frames_selected) == num_frames:
                break
    finally:
        # Release the capture on every path, including early returns and exceptions.
        cap.release()

    # If fewer than num_frames, pad with the last frame
    if 0 < len(frames_selected) < num_frames:
        last_frame = frames_selected[-1]
        frames_selected.extend([last_frame] * (num_frames - len(frames_selected)))

    # Save frames
    for i, f in enumerate(frames_selected):
        frame_path = os.path.join(output_dir, f'frame_{i:04d}.jpg')
        if not cv2.imwrite(frame_path, f):  # imwrite signals failure through its return value
            print(f'Failed to write frame {frame_path}')


In [38]:
def process_gloss(gloss_word, instances):
    """Extract frames for every available video instance of one gloss.

    Instances listed in ``missing`` or whose ``<video_id>.mp4`` is absent from
    ``videos_dir`` are skipped. Frames of the others are written by
    ``process_video_fixed`` to ``processed_data_dir/<split>/<gloss_word>/<video_id>``.
    """
    for entry in instances:
        clip_id = entry['video_id']
        source_path = os.path.join(videos_dir, clip_id + '.mp4')
        if clip_id in missing or not os.path.exists(source_path):
            continue

        target_dir = os.path.join(processed_data_dir, entry['split'], gloss_word, clip_id)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        process_video_fixed(
            source_path,
            entry['frame_start'],
            entry['frame_end'],
            entry['bbox'],
            target_dir,
        )

In [39]:
# Extract frames for every gloss (this loop can be commented out once the frames exist on disk)
for gloss_data in tqdm(glosses, desc='Processing glosses'):
    process_gloss(gloss_data['gloss'], gloss_data['instances'])

Processing glosses: 100%|██████████| 15/15 [00:33<00:00,  2.21s/it]


In [40]:
# Load processed data
def load_data():
    data_map = {} # store the data
    splits = ['train', 'test', 'val'] # define the splits
    for split in splits: # iterate through the splits
        split_dir = os.path.join(processed_data_dir, split) # set the split directory path
        if not os.path.exists(split_dir): # if the split directory does not exist
            continue # continue to the next iteration
        for gloss_word in os.listdir(split_dir): # iterate through the gloss words
            gloss_dir = os.path.join(split_dir, gloss_word) # set the gloss directory path
            if not os.path.isdir(gloss_dir):    # if the gloss directory does not exist
                continue
            for video_id in os.listdir(gloss_dir): # iterate through the video_ids
                video_dir = os.path.join(gloss_dir, video_id) # set the video directory path by joining the gloss_dir and video_id
                if os.path.isdir(video_dir): # if the video directory exists
                    frame_files = sorted(os.listdir(video_dir)) # sort the frame files to maintain the order
                    frame_paths = [os.path.join(video_dir, ff) for ff in frame_files] # get the frame paths by joining the video_dir and frame_files
                    data_map[(gloss_word, video_id, split)] = frame_paths
    return data_map

In [41]:
# Map (gloss_word, video_id, split) -> sorted frame paths for everything found on disk
data_map = load_data()

In [42]:
# Extract train/val/test sets
X_train, Y_train = [], []
X_val, Y_val = [], []
X_test, Y_test = [], []

# Each split name maps to the (clips, labels) lists that collect its samples.
split_buckets = {
    'train': (X_train, Y_train),
    'val': (X_val, Y_val),
    'test': (X_test, Y_test),
}

for (gloss_word, video_id, split), frames in data_map.items():
    if len(frames) < 16:
        continue  # Skip if not enough frames after processing

    # Only the first 16 frames are kept, so only those are decoded.
    clip = []
    for f in frames[:16]:
        img = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if img is None:  # cv2.imread returns None instead of raising for unreadable files
            print(f'Skipping {gloss_word}/{video_id}: could not read {f}')
            break
        clip.append(cv2.resize(img, (224, 224)))
    if len(clip) < 16:
        continue
    clip = np.stack(clip, axis=0)  # shape: (16, 224, 224)

    bucket = split_buckets.get(split)
    if bucket is not None:  # samples of any other split name are ignored
        clips, labels = bucket
        clips.append(clip)
        labels.append(gloss_word)

X_train = np.array(X_train)  # (num_samples, 16, 224, 224)
X_val = np.array(X_val)
X_test = np.array(X_test)

In [43]:
# Expand dims for channel
def _with_channel_axis(clips):
    """Append a trailing channel axis unless ``clips`` already has five dimensions.

    The guard makes this cell safe to re-run: without it every execution would
    add one more axis to the arrays it overwrites.
    """
    if clips.ndim >= 5:
        return clips
    return np.expand_dims(clips, -1)

X_train = _with_channel_axis(X_train)  # (num_samples, 16, 224, 224, 1)
X_val = _with_channel_axis(X_val)  # (num_samples, 16, 224, 224, 1)
X_test = _with_channel_axis(X_test)  # (num_samples, 16, 224, 224, 1)

In [44]:
# Normalize
def _scale_to_unit(clips):
    """Return integer pixel data as float32 scaled by 1/255; other arrays are returned unchanged.

    Only integer arrays are scaled, so re-running this cell does not divide the
    already-normalised data by 255 a second time. float32 is used because a plain
    ``/ 255.0`` on an integer array would produce float64 and double the memory.
    """
    if np.issubdtype(clips.dtype, np.integer):
        return clips.astype(np.float32) / 255.0
    return clips

X_train = _scale_to_unit(X_train)
X_val = _scale_to_unit(X_val)
X_test = _scale_to_unit(X_test)

In [45]:
# Report the array shape of every split
for shape_label, split_array in (
    ("Train shape:", X_train),
    ("Val shape:", X_val),
    ("Test shape:", X_test),
):
    print(shape_label, split_array.shape)

Train shape: (24, 16, 224, 224, 1)
Val shape: (7, 16, 224, 224, 1)
Test shape: (4, 16, 224, 224, 1)


In [47]:
# Fit a single encoder on the labels of all splits so that every split shares
# the same gloss -> integer mapping
all_labels = Y_train + Y_val + Y_test
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

# Integer-encode each split with the shared mapping
Y_train_encoded, Y_val_encoded, Y_test_encoded = (
    label_encoder.transform(split_labels)
    for split_labels in (Y_train, Y_val, Y_test)
)

In [49]:
# One output unit per class known to the encoder
num_classes = len(label_encoder.classes_)

# One-hot encode the integer labels of each split
Y_train_one_hot, Y_val_one_hot, Y_test_one_hot = (
    to_categorical(encoded_labels, num_classes)
    for encoded_labels in (Y_train_encoded, Y_val_encoded, Y_test_encoded)
)

In [50]:
# Show how many distinct glosses the classifier has to separate
print("Number of classes:", num_classes)

Number of classes: 15


In [51]:
# CNN feature extractor: three Conv2D(3x3, relu) + MaxPooling2D(2x2) stages with
# 32, 64 and 128 filters, followed by global average pooling
cnn_input = Input(shape=(224, 224, 1))  # one grayscale 224x224 frame
x = cnn_input
for n_filters in (32, 64, 128):
    x = Conv2D(n_filters, (3, 3), activation='relu')(x)
    x = MaxPooling2D((2, 2))(x)
x = GlobalAveragePooling2D()(x)  # one feature vector per frame
cnn_model = Model(inputs=cnn_input, outputs=x)

In [52]:
# Apply the per-frame CNN to each of the 16 frames of a clip
clip_shape = (16, 224, 224, 1)
sequence_input = Input(shape=clip_shape)
td = TimeDistributed(cnn_model)(sequence_input)  # shape: (batch, 16, feature_dim)

In [53]:
# Temporal model: an LSTM consumes the sequence of frame features, and a softmax
# layer turns its output into class probabilities
lstm_out = LSTM(units=128)(td)
output = Dense(units=num_classes, activation='softmax')(lstm_out)

In [54]:
# End-to-end network: clip of frames in, class probabilities out
model = Model(inputs=sequence_input, outputs=output)

In [55]:
# categorical_crossentropy matches the one-hot encoded targets
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)
model.summary()

In [56]:
# Fit on the training split and monitor the validation split after every epoch
history = model.fit(
    x=X_train,
    y=Y_train_one_hot,
    batch_size=8,
    epochs=20,
    validation_data=(X_val, Y_val_one_hot),
)

Epoch 1/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 14s/step - accuracy: 0.0208 - loss: 2.6970 - val_accuracy: 0.1429 - val_loss: 2.6666
Epoch 2/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 13s/step - accuracy: 0.2396 - loss: 2.5643 - val_accuracy: 0.1429 - val_loss: 2.6623
Epoch 3/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 14s/step - accuracy: 0.1458 - loss: 2.3630 - val_accuracy: 0.1429 - val_loss: 3.1240
Epoch 4/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 12s/step - accuracy: 0.2604 - loss: 2.2657 - val_accuracy: 0.1429 - val_loss: 3.4849
Epoch 5/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 13s/step - accuracy: 0.0677 - loss: 2.2048 - val_accuracy: 0.1429 - val_loss: 3.6319
Epoch 6/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 13s/step - accuracy: 0.1927 - loss: 2.2596 - val_accuracy: 0.1429 - val_loss: 3.6713
Epoch 7/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━

In [57]:
# Score the held-out test split; evaluate() yields the loss followed by the accuracy metric
test_metrics = model.evaluate(X_test, Y_test_one_hot, verbose=0)
test_loss, test_acc = test_metrics
print("Test Accuracy:", test_acc)

Test Accuracy: 0.25


In [59]:
# Score the training split with the final weights
train_metrics = model.evaluate(X_train, Y_train_one_hot, verbose=0)
train_loss, train_accuracy = train_metrics
print("Final training accuracy:", train_accuracy)

Final training accuracy: 0.375


In [60]:
# Score the validation split with the final weights
val_metrics = model.evaluate(X_val, Y_val_one_hot, verbose=0)
val_loss, val_accuracy = val_metrics
print("Final val accuracy:", val_accuracy)

Final val accuracy: 0.1428571492433548
