In [1]:
import os
import gc
import cv2
import json
import time
import shutil
import numpy as np
from tqdm import tqdm
import mediapipe as mp
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from concurrent.futures import ThreadPoolExecutor
from IPython.display import clear_output, FileLink

In [2]:
# Load WLASL_v0.3.json into memory. Later cells read each entry's 'gloss' and 'instances' fields.
# The encoding is passed explicitly so the JSON text is decoded as UTF-8 rather than
# with the platform's locale-dependent default.
with open(r"D:\NullClass_Internship\Sign_Language_Prediction\dataset\WLASL_v0.3.json", 'r', encoding='utf-8') as json_file:
    wlasl_data = json.load(json_file)

In [3]:
# Peek at the 'bbox' field of the first instance of the first entry.
wlasl_data[0]['instances'][0]['bbox']

[385, 37, 885, 720]

In [4]:
# Dataset locations, resolved relative to the current working directory.
dataset_dir = os.path.join(os.getcwd(), 'dataset')
# Directory searched first for '<video_id>.mp4'.
video_dir = os.path.join(dataset_dir, 'wlasl-processed')
# Directory searched when the video is not present in video_dir.
backup_dir = os.path.join(dataset_dir, 'wlasl2000-resized')

In [5]:
# Glosses to retain; entries whose 'gloss' is not in this list are skipped when
# the records are built.
# NOTE(review): the membership test is an exact, case-sensitive string match, so the
# capitalised "I" only matches if the dataset spells that gloss the same way — confirm.
classes_to_keep = [
    "what", "who", "where", "when", "why", "how",
    "you", "I", "we", "they", "he", "she", "it",
    "your", "my", "our", "their", "his", "her",
    "name", "do", "go", "come", "see", "eat", "drink",
    "yes", "no", "please", "thank", "sorry",
    "is", "are", "am", "be", "have", "like",
    "this", "that", "here", "there"
]

In [6]:
# Build one record per instance whose gloss is in classes_to_keep and whose
# '<video_id>.mp4' file exists in video_dir (preferred) or backup_dir.
data = []

for entry in tqdm(wlasl_data, ncols=100):
    gloss = entry['gloss']
    if gloss in classes_to_keep:
        for instance in entry['instances']:
            clip_name = f"{instance['video_id']}.mp4"
            primary_path = os.path.join(video_dir, clip_name)
            fallback_path = os.path.join(backup_dir, clip_name)

            if os.path.exists(primary_path):
                video_path = primary_path
            elif os.path.exists(fallback_path):
                video_path = fallback_path
            else:
                # Neither directory has the clip: nothing to record for this instance.
                continue

            data.append({
                'gloss': gloss,
                'video_path': video_path,
                'frame_start': instance['frame_start'],
                'frame_end': instance['frame_end'],
                'split': instance['split']
            })

100%|█████████████████████████████████████████████████████████| 2000/2000 [00:00<00:00, 2046.93it/s]


In [7]:
# Number of records kept after filtering by gloss and file availability.
len(data)

491

In [8]:
# Persist the filtered records as indented JSON so later cells can reload them
# without re-scanning the video directories.
with open(r'D:\NullClass_Internship\Sign_Language_Prediction\dataset\WLASL_parsed_data.json', 'w') as json_file:
    json.dump(data, json_file, indent=4)
    

In [9]:
# Display a link to the parsed-records JSON file in the notebook output.
FileLink(r"D:\NullClass_Internship\Sign_Language_Prediction\dataset\WLASL_parsed_data.json")

In [10]:
# Landmark index tables used to size and fill the per-frame feature array.

# Indices 0-20, i.e. 21 hand landmarks. Only the length of this list (HAND_NUM) is
# used by the code visible in this file; hand landmarks are copied wholesale.
filtered_hand = list(range(21))

# Pose landmark indices kept from the pose model's output.
# NOTE(review): presumably the shoulders, elbows and wrists in MediaPipe Pose
# numbering — confirm against the MediaPipe landmark chart.
filtered_pose = [11, 12, 13, 14, 15, 16]

# Face-mesh landmark indices kept from the face model's output.
# NOTE(review): indices 468 and 473 presumably exist only because FaceMesh is created
# with refine_landmarks=True — confirm before changing that option.
filtered_face = [0, 4, 7, 8, 10, 13, 14, 17, 21, 33, 37, 39, 40, 46, 52, 53, 54, 55, 58,
                 61, 63, 65, 66, 67, 70, 78, 80, 81, 82, 84, 87, 88, 91, 93, 95, 103, 105,
                 107, 109, 127, 132, 133, 136, 144, 145, 146, 148, 149, 150, 152, 153, 154,
                 155, 157, 158, 159, 160, 161, 162, 163, 172, 173, 176, 178, 181, 185, 191,
                 234, 246, 249, 251, 263, 267, 269, 270, 276, 282, 283, 284, 285, 288, 291,
                 293, 295, 296, 297, 300, 308, 310, 311, 312, 314, 317, 318, 321, 323, 324,
                 332, 334, 336, 338, 356, 361, 362, 365, 373, 374, 375, 377, 378, 379, 380,
                 381, 382, 384, 385, 386, 387, 388, 389, 390, 397, 398, 400, 402, 405, 409,
                 415, 454, 466, 468, 473]

# Number of landmarks kept per hand, for the pose, and for the face.
HAND_NUM = len(filtered_hand)
POSE_NUM = len(filtered_pose)
FACE_NUM = len(filtered_face)

In [11]:
# Zero-filled array with one (x, y, z) row per kept landmark: two hands, pose, face.
# NOTE(review): fetch_frame_landmarks allocates its own local array with the same
# name, so this module-level array does not appear to be read by any code visible here.
all_landmarks = np.zeros((HAND_NUM * 2 + POSE_NUM + FACE_NUM, 3)) # performing preallocation

In [12]:
# MediaPipe solution instances, created once at module level and used by
# fetch_frame_landmarks for every frame.
hands = mp.solutions.hands.Hands()
pose = mp.solutions.pose.Pose()
face_mesh = mp.solutions.face_mesh.FaceMesh(refine_landmarks=True)

def fetch_frame_landmarks(frame):
    """Extract the kept hand, pose and face landmarks from one frame.

    The module-level ``hands``, ``pose`` and ``face_mesh`` models are run on the
    same frame in three worker threads; each worker fills its own slice of the
    result array.

    Args:
        frame: Image array handed unchanged to each model's ``process`` method.
            The caller in this file passes a frame already converted to RGB.

    Returns:
        ``np.ndarray`` of shape ``(HAND_NUM * 2 + POSE_NUM + FACE_NUM, 3)`` with one
        ``(x, y, z)`` row per landmark, laid out as
        ``[first hand slot | second hand slot | pose | face]``. Rows belonging to
        anything that was not detected remain zero.

    Raises:
        Exception: Whatever a worker raised (for example a shape mismatch while
            filling a slice) is re-raised here instead of being silently dropped.
    """
    all_landmarks = np.zeros((HAND_NUM * 2 + POSE_NUM + FACE_NUM, 3))

    def get_hands(frame):
        results_hands = hands.process(frame)
        if results_hands.multi_hand_landmarks:
            for i, hand_landmarks in enumerate(results_hands.multi_hand_landmarks):
                # The handedness classification index selects which hand slot is filled.
                if results_hands.multi_handedness[i].classification[0].index == 0:
                    all_landmarks[:HAND_NUM, :] = np.array(
                        [(lm.x, lm.y, lm.z) for lm in hand_landmarks.landmark]) # right
                else:
                    all_landmarks[HAND_NUM:HAND_NUM * 2, :] = np.array(
                        [(lm.x, lm.y, lm.z) for lm in hand_landmarks.landmark]) # left

    def get_pose(frame):
        results_pose = pose.process(frame)
        if results_pose.pose_landmarks:
            # Keep only the rows listed in filtered_pose.
            all_landmarks[HAND_NUM * 2:HAND_NUM * 2 + POSE_NUM, :] = np.array(
                [(lm.x, lm.y, lm.z) for lm in results_pose.pose_landmarks.landmark])[filtered_pose]

    def get_face(frame):
        results_face = face_mesh.process(frame)
        if results_face.multi_face_landmarks:
            # Only the first detected face is used; keep the rows listed in filtered_face.
            all_landmarks[HAND_NUM * 2 + POSE_NUM:, :] = np.array(
                [(lm.x, lm.y, lm.z) for lm in results_face.multi_face_landmarks[0].landmark])[filtered_face]

    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, frame) for task in (get_hands, get_pose, get_face)]

    # Leaving the ``with`` block waits for all workers, so every future is done here.
    # result() re-raises any exception from a worker; without this, a failed worker
    # would be indistinguishable from "nothing detected" (all-zero rows).
    for future in futures:
        future.result()

    return all_landmarks

In [13]:
import cv2
import numpy as np

def fetch_video_landmarks(video_path, start_frame=1, end_frame=-1, hands=None, pose=None, face_mesh=None):
    """Extract per-frame landmarks for a (1-based, inclusive) frame range of a video.

    Args:
        video_path: Path given to ``cv2.VideoCapture``.
        start_frame: First frame to process, 1-based. Values ``<= 1`` mean the
            first frame. A value beyond the reported frame count resets the
            range to the whole video.
        end_frame: Last frame to process, inclusive. A negative value, or one
            beyond the reported frame count, means the last frame.
        hands, pose, face_mesh: Optional objects whose ``reset()`` method is
            called once processing finishes (successfully or not). They are
            not used for the extraction itself, which goes through
            ``fetch_frame_landmarks``.

    Returns:
        ``np.ndarray`` of shape ``(num_frames, HAND_NUM * 2 + POSE_NUM + FACE_NUM, 3)``,
        or ``None`` if an ``IOError`` occurred (for example the video could not be
        opened). If reading stops early, the rows of the unread frames stay zero.

    Raises:
        ValueError: If the clamped range is inverted (``end_frame`` more than one
            frame before ``start_frame``).
    """
    # Bound before the try block so the finally clause is safe even when
    # cv2.VideoCapture itself raises.
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise IOError("Could not open video file")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # handling edge cases
        if start_frame <= 1:
            start_frame = 1
        elif start_frame > total_frames:
            start_frame = 1
            end_frame = total_frames

        if end_frame < 0 or end_frame > total_frames:
            end_frame = total_frames

        # end_frame is already clamped to total_frames above. An empty range
        # (num_frames == 0) is allowed and yields an empty array.
        num_frames = end_frame - start_frame + 1
        if num_frames < 0:
            raise ValueError("start_frame must be less than or equal to end_frame.")

        # CAP_PROP_POS_FRAMES is 0-based while start_frame is 1-based.
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame - 1)

        num_landmarks = HAND_NUM * 2 + POSE_NUM + FACE_NUM
        total_frame_landmarks = np.zeros((num_frames, num_landmarks, 3))

        for frame_offset in range(num_frames):
            ret, frame = cap.read()
            if not ret:
                break

            frame.flags.writeable = False
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            total_frame_landmarks[frame_offset] = fetch_frame_landmarks(frame)

    except IOError as e:
        print(f"Error opening video file: {e}")
        return None
    finally:
        if cap is not None:
            cap.release()
        if hands:
            hands.reset()
        if pose:
            pose.reset()
        if face_mesh:
            face_mesh.reset()

    return total_frame_landmarks

In [14]:
def draw_landmarks(input_path, output_path, video_landmarks, start_frame=1, end_frame=-1):
    """Write a copy of a video's frame range with landmarks drawn as green dots.

    Args:
        input_path: Path of the source video.
        output_path: Path of the video to write (encoded with the 'mp4v' fourcc).
        video_landmarks: Sequence indexed by frame offset from ``start_frame``;
            each item is an iterable of ``(x, y, z)`` triples. ``x`` and ``y`` are
            multiplied by the frame width and height to get pixel positions.
        start_frame: First frame to write, 1-based. Out-of-range values mean the
            first frame.
        end_frame: Last frame to write, inclusive. A negative value, or one
            beyond the reported frame count, means the last frame.

    Raises:
        ValueError: If the input cannot be opened, the output writer cannot be
            opened, or ``start_frame`` ends up greater than ``end_frame``.
    """
    cap = cv2.VideoCapture(input_path)
    out = None
    # try/finally guarantees the capture and writer are released on every exit
    # path, including the ValueError branches below.
    try:
        if not cap.isOpened():
            raise ValueError("Error opening video file.")

        # Keep the fractional frame rate (e.g. 29.97); truncating it to an int
        # would change the playback speed of the output.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # handling edge cases
        if start_frame < 1 or start_frame > total_frames:
            start_frame = 1
        if end_frame < 0 or end_frame > total_frames:
            end_frame = total_frames
        if start_frame > end_frame:
            raise ValueError("start_frame must be less than or equal to end_frame.")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise ValueError("Error opening video for output.")

        # CAP_PROP_POS_FRAMES is 0-based while start_frame is 1-based.
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame - 1)
        frame_index = start_frame
        while frame_index <= end_frame and cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Frames without a matching landmark entry are written unmodified.
            landmark_index = frame_index - start_frame
            if landmark_index < len(video_landmarks):
                frame_landmarks = video_landmarks[landmark_index]
                landmarks = [(int(x * width), int(y * height)) for x, y, _ in frame_landmarks]
                for x, y in landmarks:
                    cv2.circle(frame, (x, y), 1, (0, 255, 0), -1)

            out.write(frame)
            frame_index += 1
    finally:
        cap.release()
        if out is not None:
            out.release()

In [None]:
# For inference purposes only

In [None]:
# Reload the parsed records from disk (replaces any in-memory `data`).
with open(r"D:\NullClass_Internship\Sign_Language_Prediction\dataset\WLASL_parsed_data.json", 'r') as json_file:
    data = json.load(json_file)

# Pick a single record by a hard-coded index to visualise.
test = data[205]
output_path = r"D:\NullClass_Internship\Sign_Language_Prediction\output.mp4"
# Extract the landmarks for that clip's frame range, then write an annotated copy
# of the same range to output_path.
video_landmarks = fetch_video_landmarks(test['video_path'],test['frame_start'],test['frame_end'])
draw_landmarks(test['video_path'], output_path, video_landmarks, test['frame_start'],test['frame_end'])

In [None]:
# Print the position of the first record whose gloss is 'how'
# (nothing is printed when no such record exists).
for idx, record in enumerate(data):
    if record['gloss'] == 'how':
        print(idx)
        break

In [15]:
# Directory (under the current working directory) that receives one .npy feature
# file per video; created if it does not exist yet.
saved_features_dir = os.path.join(os.getcwd(), 'saved_features')
os.makedirs(saved_features_dir, exist_ok=True)

In [17]:
# Saving the features for all the videos.
# A video whose .npy file already exists is skipped, so the loop can be resumed
# after an interruption.
# Failures are collected and reported after the loop, because clear_output()
# would otherwise wipe an error message as soon as the next video succeeds.
failed_videos = []

for i in tqdm(range(len(data)), ncols=100):
    npy_path = os.path.join(saved_features_dir, f'{i}.npy')
    if os.path.exists(npy_path): continue
    video_path = data[i]['video_path']
    start = data[i]['frame_start']
    end = data[i]['frame_end']

    try:
        # Passing the shared models makes fetch_video_landmarks reset them once the
        # video is done, so state from one video is not carried into the next.
        video_landmarks = fetch_video_landmarks(
            video_path, start, end, hands=hands, pose=pose, face_mesh=face_mesh)

        # None means the video could not be read. Saving it would write a
        # placeholder file that the exists-check above then treats as finished.
        if video_landmarks is None:
            failed_videos.append((video_path, "no landmarks returned (video could not be opened)"))
            continue

        np.save(npy_path, video_landmarks)

    except Exception as e:
        failed_videos.append((video_path, e))
        continue
    clear_output(wait=True)

for failed_path, reason in failed_videos:
    print(f"\nError encoding {failed_path}\n{reason}")

100%|█████████████████████████████████████████████████████████████| 491/491 [26:23<00:00,  3.23s/it]


In [19]:
# Gather every saved per-video feature array into one dict, keyed by the part of
# the file name before its first '.', and write them all to a single compressed archive.
# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can run
# arbitrary code — only load files produced by this notebook.
feature_files = [name for name in os.listdir(saved_features_dir) if name.endswith('.npy')]

landmarks_dict = {
    name.split('.')[0]: np.load(os.path.join(saved_features_dir, name), allow_pickle=True)
    for name in feature_files
}

np.savez_compressed('features_dict.npz', **landmarks_dict)

In [22]:
# Display a link to the compressed feature archive in the notebook output.
FileLink(r'features_dict.npz')