In [1]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.21-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.3-py3-none-any.whl.metadata (1.6 kB)
Downloading mediapipe-0.10.21-cp310-cp310-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m41.4 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hDownloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl (294 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.9/294.9 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.3-py3-none-any.whl (32 kB)
Installing collected packages: protobuf, sounddevice, mediapipe
  Attempting uninstall: protobuf
    Found existing installation: protobuf 3.20.3
    Uninstalling prot

In [2]:
import os
import pandas as pd
import numpy as np
import cv2
from matplotlib import pyplot as plt
import mediapipe as mp
from sklearn.model_selection import train_test_split

In [3]:
RAW_DATA_DIR = "/kaggle/input/wlasl-processed"
PROCESSED_DATA_DIR = "/kaggle/working"

In [4]:
def euclidean_distance(v1, v2):
    return np.sqrt((float(v1[0]) - float(v2[0])) ** 2 + (float(v1[1]) - float(v2[1])) ** 2)

In [5]:
def ReadVideo(video_path):    
    # Read the input video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video file {video_path}")
    return cap

In [6]:
def getGrayFramesAndFrames(cap):
    frames = []
    gray_frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        frames.append(frame)        
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray_frames.append(gray_frame)
    
    cap.release()
    return gray_frames, frames

In [7]:
def calculate_histogram_differences(gray_frames):
    HDiffs = []
    for i in range(0, len(gray_frames)-1):
        if (i == 0):
            hist_curr = cv2.calcHist([gray_frames[i]], [0], None, [256], [0, 256])
            continue
        hist_prev = hist_curr
        hist_curr = cv2.calcHist([gray_frames[i]], [0], None, [256], [0, 256])

        Hdiff = np.sum(np.abs(hist_prev - hist_curr))
        HDiffs.append(Hdiff)
    return HDiffs

In [8]:
def Extract_key_frames(video_path):
    cap = ReadVideo(video_path)
    gray_frames, frames = getGrayFramesAndFrames(cap)
    HDiffs = calculate_histogram_differences(gray_frames)
    mean = np.mean(HDiffs)
    std = np.std(HDiffs)
    threshold = mean + 0.5 * std
    keyframes = []
    for i in range(len(HDiffs)):
        if HDiffs[i] > threshold:
            keyframes.append(frames[i+1])
    return keyframes

In [9]:
def extract_frames(video_path):
    cap = ReadVideo(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)

    cap.release()
    if len(frames) > 2:
        return frames[1:-1]
    else:
        return []

In [10]:
def extract_pose_landmarks(rgb_frame, mp_pose):
    pose_results = mp_pose.process(rgb_frame)
    pose_landmarks = []

    if pose_results.pose_landmarks:
        for i, lm in enumerate(pose_results.pose_landmarks.landmark):
            if i < 17 and i not in [7, 8]:
                pose_landmarks.append((lm.x, lm.y, lm.z))

    return pose_landmarks

In [11]:
def classify_hands (pose_landmarks, hand_landmarks):
    left_wrist_pose  = pose_landmarks[13]
    right_wrist_pose = pose_landmarks[14]  
    wrist = hand_landmarks[0] 
    

    dleft = euclidean_distance(left_wrist_pose, wrist)
    dright = euclidean_distance(right_wrist_pose, wrist)
   
    if(dleft < dright):
        return "Left"
    else:
        return "Right"

In [12]:
def extract_hand_landmarks(rgb_frame, mp_hands, pose_landmarks):
    hands_results = mp_hands.process(rgb_frame)
    left_hand_landmarks = []
    right_hand_landmarks = []

    if hands_results.multi_hand_landmarks and hands_results.multi_handedness:
        for hand_landmarks, handedness in zip(hands_results.multi_hand_landmarks, hands_results.multi_handedness):
            # Không sử dụng hướng tay của mediapipe vì độ chính xác thấp và thường ngược hướng
            landmarks = [(lm.x, lm.y, lm.z) for lm in hand_landmarks.landmark]
            if classify_hands(pose_landmarks, landmarks) == "Right":
                right_hand_landmarks = landmarks
            else:
                left_hand_landmarks = landmarks   
    
    return left_hand_landmarks, right_hand_landmarks

In [13]:
def extract_landmarks(frames):
    mp_pose = mp.solutions.pose.Pose(static_image_mode=True)
    mp_hands = mp.solutions.hands.Hands(static_image_mode=True, max_num_hands=2, min_detection_confidence=0.1)
    
    landmarks_dict = {}
    
    for idx, frame in enumerate(frames):
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        pose_landmarks = extract_pose_landmarks(rgb_frame, mp_pose)
        left_hand_landmarks, right_hand_landmarks = extract_hand_landmarks(rgb_frame, mp_hands ,pose_landmarks)
        
        landmarks_dict[idx] = {
            "pose": pose_landmarks,
            "left": left_hand_landmarks,
            "right": right_hand_landmarks
        }
    mp_pose.close()
    mp_hands.close()
    return landmarks_dict

In [None]:
# def filter_invalid_landmarks(landmarks_dict):
#     validated = {}
#     for landmarks_idx, landmarks_data in landmarks_dict.items():
#         validated_landmarks = {}
#         # Nếu không có pose thì bỏ qua frame
#         if  not landmarks_data["pose"]:
#             continue
#         for part in ["pose", "right", "left"]:
#             # Nếu không có dữ liệu cho phần này, gán mặc định
#             if part not in landmarks_data or not landmarks_data[part]:
#                 if part in ["right", "left"]:
#                     validated_landmarks[part] = [(0.0, 0.0)] * 21
#             else:
#                 processed_points = []
#                 for point in landmarks_data[part]:
#                     x = float(point[0])
#                     y = float(point[1])
#                     if not (0.0 <= x <= 1.0 and 0.0 <= y <= 1.0):
#                         x, y = 0.0, 0.0
#                     processed_points.append((x, y))
#                 validated_landmarks[part] = processed_points
#         validated[landmarks_idx] = validated_landmarks
#     return validated


**Cập nhật mới** Lọc và nội suy

In [14]:
def filter_and_interpolate_landmarks(landmarks_dict):
    validated = {}

    for i, (frame_idx, landmarks_data) in enumerate(landmarks_dict.items()):
        validated_landmarks = {}

        if not landmarks_data["pose"]:
            continue

        for part in ["pose", "right", "left"]:
            current = landmarks_data.get(part, [])

            if part in ["right", "left"] and not current:
                if 0 < i < len(landmarks_dict) - 1:
                    prev_data = landmarks_dict.get(list(landmarks_dict.keys())[i - 1], {})
                    next_data = landmarks_dict.get(list(landmarks_dict.keys())[i + 1], {})
                    prev_points = prev_data.get(part, [])
                    next_points = next_data.get(part, [])

                    if prev_points and next_points:
                        if not (all(p[0] == 0.0 and p[1] == 0.0 and p[2] == 0.0 for p in prev_points) or
                                all(p[0] == 0.0 and p[1] == 0.0 and p[2] == 0.0 for p in next_points)):
                            # Thực hiện nội suy nếu có dữ liệu hợp lệ
                            interpolated_points = []
                            for j in range(21):  # 21 points cho hand
                                if j < len(prev_points) and j < len(next_points):
                                    x = (prev_points[j][0] + next_points[j][0]) / 2
                                    y = (prev_points[j][1] + next_points[j][1]) / 2
                                    z = (prev_points[j][2] + next_points[j][2]) / 2
                                    interpolated_points.append((x, y, z))
                                else:
                                    interpolated_points.append((0.0, 0.0, 0.0))
                            validated_landmarks[part] = interpolated_points
                            continue

                validated_landmarks[part] = [(0.0, 0.0, 0.0)] * 21
            else:
                processed_points = []
                for point in current:
                    x = float(point[0])
                    y = float(point[1])
                    z = float(point[2])
                    if not (0.0 <= x <= 1.0 and 0.0 <= y <= 1.0):
                        x, y, z = 0.0, 0.0, 0.0
                    processed_points.append((x, y, z))
                validated_landmarks[part] = processed_points

        validated[frame_idx] = validated_landmarks

    return validated


<h3> Hàm xuất ảnh đã gán landmarks cho kiểm thử

In [15]:
CUSTOM_POSE_CONNECTIONS = [
    (0, 1), (0, 4), 
    (4, 5), (5, 6),         
    (1, 2), (2, 3),         
    (7, 8),                
    (9, 10),               
    (10, 12), (12, 14),    
    (9, 11), (11, 13),     
]

def draw_landmarks(frame, landmarks_dict):
    h, w, _ = frame.shape  
    annotated_frame = frame.copy()

    pose_landmarks = landmarks_dict.get("pose", [])
    left_hand_landmarks = landmarks_dict.get("left", [])
    right_hand_landmarks = landmarks_dict.get("right", [])

    # Vẽ đường nối Pose (sử dụng kết nối tùy chỉnh)
    if pose_landmarks and len(pose_landmarks) > 0:
        for idx1, idx2 in CUSTOM_POSE_CONNECTIONS:
            if idx1 < len(pose_landmarks) and idx2 < len(pose_landmarks):
                x1, y1 , z1= pose_landmarks[idx1]
                x2, y2 , z2= pose_landmarks[idx2]
                if x1 and y1 and x2 and y2:
                    x1, y1 = int(x1 * w), int(y1 * h)
                    x2, y2 = int(x2 * w), int(y2 * h)
                    cv2.line(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 5)

    mp_hands = mp.solutions.hands
    hand_connections = list(mp_hands.HAND_CONNECTIONS)
    for hand, landmarks, color in [("left", left_hand_landmarks, (255, 0, 0)), ("right", right_hand_landmarks, (0, 0, 255))]:
        if landmarks and len(landmarks) > 0:
            for idx1, idx2 in hand_connections:
                if idx1 < len(landmarks) and idx2 < len(landmarks):
                    x1, y1, z1 = landmarks[idx1]
                    x2, y2, z2 = landmarks[idx2]
                    if x1 and y1 and x2 and y2:
                        x1, y1 = int(x1 * w), int(y1 * h)
                        x2, y2 = int(x2 * w), int(y2 * h)
                        cv2.line(annotated_frame, (x1, y1), (x2, y2), color, 2)

    for part, landmarks, color in zip(["pose", "left", "right"], 
                                      [pose_landmarks, left_hand_landmarks, right_hand_landmarks], 
                                      [(0, 255, 0), (255, 0, 0), (0, 0, 255)]):
        if landmarks and len(landmarks) > 0:
            for x, y, z in landmarks:
                if x and y:
                    x, y = int(x * w), int(y * h)
                    cv2.circle(annotated_frame, (x, y), 5, color, -1)

    return annotated_frame


In [16]:
def rotate_landmarks(landmarks, angle_deg, center):
    transformed = {}
    theta = np.radians(angle_deg)
    cos_theta = np.cos(theta)
    sin_theta = np.sin(theta)
    cx, cy = center
    
    for key in landmarks:
        if landmarks[key]:
            points = np.array(landmarks[key], dtype=np.float32)
            transformed_points = []
            
            if len(points) == 0:
                continue
            z_values = points[:, 2]
            max_z = np.max(z_values)
            z_noise = np.random.choice([-1, 1]) * np.random.uniform(0.02, 0.04) * max_z

            for x, y, z in points:
                x_new = cx + (x - cx) * cos_theta - (y - cy) * sin_theta
                y_new = cy + (x - cx) * sin_theta + (y - cy) * cos_theta
                z_new = z + z_noise
                transformed_points.append([x_new, y_new, z_new])
            transformed[key] = transformed_points
        else:
            transformed[key] = []
    return transformed

In [17]:
def translate_landmarks(landmarks, tx, ty):
    transformed = {}
    for key in landmarks:
        if landmarks[key]:
            points = np.array(landmarks[key], dtype=np.float32)
            transformed_points = []

            if len(points) == 0:
                continue
            z_values = points[:, 2]
            max_z = np.max(z_values)
            z_noise = np.random.choice([-1, 1]) * np.random.uniform(0.02, 0.04) * max_z
            for x, y, z in points:
                x_new = x + tx
                y_new = y + ty
                z_new = z + z_noise
                transformed_points.append([x_new, y_new, z_new])
            transformed[key] = transformed_points
        else:
            transformed[key] = []
    return transformed

In [18]:
def squeeze_landmarks(landmarks, squeeze_x, squeeze_y):
    transformed = {}
    for key in landmarks:
        if landmarks[key]:
            points = np.array(landmarks[key], dtype=np.float32)
            squeezed_points = []
            
            if len(points) == 0:
                continue
            z_values = points[:, 2]
            max_z = np.max(z_values)
            z_noise = np.random.choice([-1, 1]) * np.random.uniform(0.02, 0.04) * max_z
            
            for x, y, z in points:
                x_new = x * squeeze_x
                y_new = y * squeeze_y
                z_new = z + z_noise
                squeezed_points.append([x_new, y_new, z_new])
            transformed[key] = squeezed_points
        else:
            transformed[key] = []
    return transformed


In [19]:
def generate_pts2(max_shift=0.15):
    top_left = [np.random.uniform(0, max_shift), np.random.uniform(0, max_shift)]
    top_right = [1 - np.random.uniform(0, max_shift), np.random.uniform(0, max_shift)]
    bottom_right = [1 - np.random.uniform(0, max_shift), 1 - np.random.uniform(0, max_shift)]
    bottom_left = [np.random.uniform(0, max_shift), 1 - np.random.uniform(0, max_shift)]

    return np.float32([top_left, top_right, bottom_right, bottom_left])

In [20]:
def perspective_transform(landmarks, pts2):
    pts1 = np.float32([[0, 0], [1, 0], [1, 1], [0, 1]])
    
    M = cv2.getPerspectiveTransform(pts1, pts2)
    
    transformed = {}
    for key in landmarks:
        if landmarks[key]:
            points = np.array(landmarks[key], dtype=np.float32)

            if len(points) == 0:
                continue
            
            xy_points = points[:, :2]    

            transformed_points = cv2.perspectiveTransform(xy_points.reshape(-1, 1, 2), M).reshape(-1, 2)
            
            z_values = points[:, 2]
            max_z = np.max(z_values)
            z_noise = np.random.choice([-1, 1]) * np.random.uniform(0.02, 0.04) * max_z
            
            new_transformed_points = []
            for i in range(len(transformed_points)):
                transformed_point_with_z = np.append(transformed_points[i], z_values[i] + z_noise)
                new_transformed_points.append(transformed_point_with_z)

            transformed[key] = np.array(new_transformed_points).tolist()
        else:
            transformed[key] = []
    return transformed

In [21]:
def augment_data(landmarks_dict):
    import numpy as np

    rotated_landmarks_dict = {}
    squeezed_landmarks_dict = {} 
    perspectived_landmarks_dict = {}
    rotated_squeezed_landmarks_dict = {}
    rotated_translated_landmarks_dict = {}
    rotated_perspective_landmarks_dict = {}
    translated_squeezed_landmarks_dict = {}
    translated_perspective_landmarks_dict = {}
    perspective_squeezed_landmarks_dict = {}

    center = (0.5, 0.5)

    angle_r = np.random.choice([-1, 1]) * np.random.uniform(10, 20)

    squeeze_x = np.random.uniform(1 - 0.3, 0.9)
    squeeze_y = np.random.uniform(1 - 0.3, 0.9)

    pts2_p = generate_pts2(max_shift=0.2)

    angle_rs = np.random.choice([-1, 1]) * np.random.uniform(10, 20)
    squeeze_rx = np.random.uniform(1 - 0.3, 0.9)
    squeeze_ry = np.random.uniform(1 - 0.3, 0.9)

    angle_rt = np.random.choice([-1, 1]) * np.random.uniform(10, 20)
    dx_rt = np.random.choice([-1, 1]) * np.random.uniform(0.05, 0.1)
    dy_rt = np.random.choice([-1, 1]) * np.random.uniform(0.05, 0.1)

    angle_rp = np.random.choice([-1, 1]) * np.random.uniform(10, 20)
    pts2_rp = generate_pts2(max_shift=0.2)

    dx_ts = np.random.choice([-1, 1]) * np.random.uniform(0.075, 0.15)
    dy_ts = np.random.choice([-1, 1]) * np.random.uniform(0.075, 0.15)
    squeeze_tsx = np.random.uniform(1 - 0.3, 0.9)
    squeeze_tsy = np.random.uniform(1 - 0.3, 0.9)

    dx_tp = np.random.choice([-1, 1]) * np.random.uniform(0.1, 0.25)
    dy_tp = np.random.choice([-1, 1]) * np.random.uniform(0.1, 0.25)
    pts2_tp = generate_pts2(max_shift=0.2)

    pts2_ps = generate_pts2(max_shift=0.2)
    squeeze_psx = np.random.uniform(1 - 0.3, 0.9)
    squeeze_psy = np.random.uniform(1 - 0.3, 0.9)

    for idx, landmarks in landmarks_dict.items():
        rotated_landmarks_dict[idx] = rotate_landmarks(landmarks, angle_r, center)
        squeezed_landmarks_dict[idx] = squeeze_landmarks(landmarks, squeeze_x, squeeze_y)  # <-- thay translated
        perspectived_landmarks_dict[idx] = perspective_transform(landmarks, pts2_p)

        rotated = rotate_landmarks(landmarks, angle_rs, center)
        rotated_squeezed_landmarks_dict[idx] = squeeze_landmarks(rotated, squeeze_rx, squeeze_ry)

        rotated2 = rotate_landmarks(landmarks, angle_rt, center)
        rotated_translated_landmarks_dict[idx] = translate_landmarks(rotated2, dx_rt, dy_rt)

        rotated3 = rotate_landmarks(landmarks, angle_rp, center)
        rotated_perspective_landmarks_dict[idx] = perspective_transform(rotated3, pts2_rp)

        translated = translate_landmarks(landmarks, dx_ts, dy_ts)
        translated_squeezed_landmarks_dict[idx] = squeeze_landmarks(translated, squeeze_tsx, squeeze_tsy)

        translated2 = translate_landmarks(landmarks, dx_tp, dy_tp)
        translated_perspective_landmarks_dict[idx] = perspective_transform(translated2, pts2_tp)

        perspectived = perspective_transform(landmarks, pts2_ps)
        perspective_squeezed_landmarks_dict[idx] = squeeze_landmarks(perspectived, squeeze_psx, squeeze_psy)

    return (
        rotated_landmarks_dict,
        squeezed_landmarks_dict,  
        perspectived_landmarks_dict,
        rotated_squeezed_landmarks_dict,
        rotated_translated_landmarks_dict,
        rotated_perspective_landmarks_dict,
        translated_squeezed_landmarks_dict,
        translated_perspective_landmarks_dict,
        perspective_squeezed_landmarks_dict
    )


In [22]:
def calculate_head_unit(pose_landmarks):

    left_eye, right_eye = pose_landmarks[3], pose_landmarks[6]
    head_unit = euclidean_distance(left_eye,right_eye)
    return head_unit

In [23]:
def calculate_sign_space(pose_landmarks):
    head_unit = calculate_head_unit(pose_landmarks)

    nose = pose_landmarks[0]
   
   
    width = 7 * head_unit
    
    center_x, center_y , center_z = nose

    x1 = center_x - width / 2

    y1 = center_y - 1.5 * head_unit 
    x2 = center_x + width / 2
    y2 = center_y + 8 * head_unit 
    return [x1, y1, x2, y2]

In [24]:
def draw_normalized_sign_space(frame, sign_space):
    x1, y1, x2, y2 = sign_space
    h, w, _ = frame.shape

    # Convert normalized coordinates to pixel coordinates
    x1_pixel, y1_pixel = int(x1 * w), int(y1 * h)
    x2_pixel, y2_pixel = int(x2 * w), int(y2 * h)

    annotated_frame = frame.copy()
    cv2.rectangle(annotated_frame, (x1_pixel, y1_pixel), (x2_pixel, y2_pixel), (0, 255, 0), 2)

    return annotated_frame

In [25]:
def calculate_all_sign_space(landmarks_dict):
    sign_spaces = {}
    for idx, landmarks_data in landmarks_dict.items():
        sign_spaces[idx] = calculate_sign_space(landmarks_data["pose"])
    return sign_spaces

In [26]:
def normalize_landmarks_to_sign_space(landmarks_dict, sign_spaces):
    
    normalized = {}
    for landmarks_idx, landmarks_data in landmarks_dict.items():
        Xmin, Ymin, Xmax, Ymax = sign_spaces[landmarks_idx]
        w = Xmax - Xmin
        h = Ymax - Ymin
        normalized_landmarks = {}
        for part in ["pose", "right", "left"]:
            processed_points = []

            z_nose = None
            if part == "pose" and len(landmarks_data[part]) > 0:
                z_nose = landmarks_data[part][0][2]
                if abs(z_nose)< 0.01:
                    z_nose = None

            for point in landmarks_data[part]:
                x = float(point[0])
                y = float(point[1])
                z = float(point[2])
                if x != 0.0 and y != 0.0:
                   x = (x - Xmin) / w
                   y = (y - Ymin) / h

                if z_nose is not None and abs(z) > 0.0001:
                    z = z / z_nose
                processed_points.append((x, y, z))

            
            normalized_landmarks[part] = processed_points
        normalized[landmarks_idx] = normalized_landmarks
    return normalized

In [27]:
import json

In [29]:
with open("/kaggle/input/wlasl-processed/nslt_2000.json", "r") as f:
    dataset = json.load(f)


MISSING_VIDEOS_PATH = f"{PROCESSED_DATA_DIR}/missing_videos.txt" 

allowed_gloss_ids = {
    2, 6, 8, 15, 17, 24, 26, 27, 28, 29, 46, 60, 171, 1, 4, 5, 9, 10, 11, 12,
    16, 19, 21, 22, 23, 32, 33, 35, 36, 38, 39, 41, 42, 48, 50, 51, 52, 53, 54,
    58, 59, 62, 65, 66, 67, 75, 77, 82, 84, 85, 86, 88, 89, 100, 102, 108, 110,
    112, 113, 116, 121, 123, 126, 128, 130, 132, 133, 136, 140, 141, 144, 146,
    151, 164, 167, 168, 169, 174, 177, 182, 184, 199, 202, 206, 208, 210, 223,
    225, 226, 232, 233, 236, 244, 246, 247, 249, 256, 261, 267, 269
}
allowed_gloss_ids = set(map(str, allowed_gloss_ids)) 


split_data = {"train": {}, "val": {}, "test": {}}
count =0

with open(MISSING_VIDEOS_PATH, "w") as missing_file:
    for video_id, info in dataset.items():
        subset = info["subset"]  
        gloss_id = str(info["action"][0])  
        
        if gloss_id not in allowed_gloss_ids:
            continue
    
        if gloss_id not in split_data[subset]:
            split_data[subset][gloss_id] = []
            
        # kiểm tra video có tồn tại
        primary_video_path = rf'/kaggle/input/wlasl-processed/videos/{video_id}.mp4'
        backup_video_path = rf'/kaggle/input/wlasl2000-resized/wlasl-complete/videos/{video_id}.mp4'
        if os.path.exists(primary_video_path):
            split_data[subset][gloss_id].append(primary_video_path)
        elif os.path.exists(backup_video_path):
            split_data[subset][gloss_id].append(backup_video_path)
        else:
            missing_file.write(f"{video_id}\n")
            count += 1
    


for subset in ["train", "val", "test"]:
    for gloss_id in split_data[subset]:
        split_data[subset][gloss_id].sort()

    split_data[subset] = dict(sorted(split_data[subset].items(), key=lambda x: int(x[0])))







    
    filename = f"{PROCESSED_DATA_DIR}/{subset}_100.json"
    with open(filename, "w") as f:
        json.dump(split_data[subset], f, indent=4)

total_train = sum(len(videos) for videos in split_data["train"].values())
total_val = sum(len(videos) for videos in split_data["val"].values())
total_test = sum(len(videos) for videos in split_data["test"].values())
total_samples = total_train + total_val + total_test
train_ratio = (total_train / total_samples) * 100
val_ratio = (total_val / total_samples) * 100
test_ratio = (total_test / total_samples) * 100

print(f"Train ratio: {train_ratio:.2f}%")
print(f"Validation ratio: {val_ratio:.2f}%")
print(f"Test ratio: {test_ratio:.2f}%")


Train ratio: 69.76%
Validation ratio: 17.24%
Test ratio: 13.01%


In [30]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

In [31]:
OUTPUT_DIR = f"{PROCESSED_DATA_DIR}/train_by_gloss"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def process_gloss(gloss_id, video_list):
    gloss_data = []
    for video_path in video_list:
        try:
            keyframes = Extract_key_frames(video_path)
            if len(keyframes) < 4:
                continue
            landmarks_dict = extract_landmarks(keyframes)

            (rotated_landmarks_dict, 
             translated_landmarks_dict, 
             perspectived_landmarks_dict, 
             rotated_squeezed_landmarks_dict, 
             rotated_translated_landmarks_dict, 
             rotated_perspective_landmarks_dict, 
             translated_squeezed_landmarks_dict, 
             translated_perspective_landmarks_dict, 
             perspective_squeezed_landmarks_dict) = augment_data(landmarks_dict)
                

            for augmented_landmarks in [
                landmarks_dict,
                rotated_landmarks_dict,
                translated_landmarks_dict,
                perspectived_landmarks_dict,
                rotated_squeezed_landmarks_dict,
                rotated_translated_landmarks_dict,
                rotated_perspective_landmarks_dict,
                translated_squeezed_landmarks_dict,
                translated_perspective_landmarks_dict,
                perspective_squeezed_landmarks_dict
            ]:
                filtered = filter_and_interpolate_landmarks(augmented_landmarks)
                sign_spaces = calculate_all_sign_space(filtered)
                normalized = normalize_landmarks_to_sign_space(filtered, sign_spaces)

                gloss_data.append({
                    "keyframes": len(normalized),
                    "landmarks": normalized
                })
        except Exception as e:
            print(f" Error processing video: {video_path} (Gloss: {gloss_id}) - {e}")
            continue

    output_path = os.path.join(OUTPUT_DIR, f"{gloss_id}.json")
    with open(output_path, "w") as f:
        json.dump(gloss_data, f)
    
    return gloss_id


def process_train():
    print(" Start Processing train data")

    with open(f"{PROCESSED_DATA_DIR}/train_100.json", "r") as f:
        train_data = json.load(f)

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for gloss_id, video_list in train_data.items():
            futures.append(executor.submit(process_gloss, gloss_id, video_list))

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing glosses"):
            future.result() 

    print("All glosses have been processed and saved to separate files successfully!")

    OUTPUT_JSON_PATH = f"{PROCESSED_DATA_DIR}/wasl100_landmarks_train.json"
    all_data = {}
    for filename in os.listdir(OUTPUT_DIR):
        gloss_id = filename.replace(".json", "")
        with open(os.path.join(OUTPUT_DIR, filename), "r") as f:
            all_data[gloss_id] = json.load(f)

    with open(OUTPUT_JSON_PATH, "w") as f:
        json.dump(all_data, f)

    print(f"The data has been merged: {OUTPUT_JSON_PATH}")

In [32]:
!lscpu

Architecture:             x86_64
  CPU op-mode(s):         32-bit, 64-bit
  Address sizes:          46 bits physical, 48 bits virtual
  Byte Order:             Little Endian
CPU(s):                   4
  On-line CPU(s) list:    0-3
Vendor ID:                GenuineIntel
  Model name:             Intel(R) Xeon(R) CPU @ 2.20GHz
    CPU family:           6
    Model:                79
    Thread(s) per core:   2
    Core(s) per socket:   2
    Socket(s):            1
    Stepping:             0
    BogoMIPS:             4399.99
    Flags:                fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge m
                          ca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht sysc
                          all nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xt
                          opology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq
                           ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt
                           aes xsave avx f16c rdrand hypervisor 

In [33]:
process_train()

 Start Processing traindata


  x = (x - Xmin) / w
  y = (y - Ymin) / h
  x = (x - Xmin) / w
  y = (y - Ymin) / h
Processing glosses: 100%|██████████| 100/100 [23:50<00:00, 14.31s/it]


All glosses have been processed and saved to separate files successfully!
The data has been merged: /kaggle/working/wasl100_landmarks_train.json


In [34]:
!rm -rf /kaggle/working/train_by_gloss

In [35]:
def process_val():
    print("Start Processing valdata")
    
    OUTPUT_JSON_PATH = f"{PROCESSED_DATA_DIR}/wasl100_landmarks_val.json"
    
    with open(f"{PROCESSED_DATA_DIR}/val_100.json", "r") as f:
        val_data = json.load(f)
    
    processed_data = {}
    for gloss_id, video_list in tqdm(val_data.items(), desc="Val: Processing gloss IDs"):
        processed_data[gloss_id] = [] 
    
        for video_path in video_list:
            try:
                keyframes = Extract_key_frames(video_path)
                if len(keyframes) <4: #< 5:
                    continue
                landmarks_dict = extract_landmarks(keyframes)
                filtered = filter_and_interpolate_landmarks(landmarks_dict)
                sign_spaces = calculate_all_sign_space(filtered)
                normalized = normalize_landmarks_to_sign_space(filtered, sign_spaces)
                processed_data[gloss_id].append({
                    "keyframes": len(normalized),
                    "landmarks": normalized
                    })
            except Exception as e:
                print(f" Error processing video: {video_path} (Gloss: {gloss_id}) - {e}")
                continue
            
        with open(OUTPUT_JSON_PATH, "w") as f:
            json.dump(processed_data, f, indent=None)
    print("All videos have been processed and saved successfully! "
)

> process_test

In [36]:
def process_test():
    print("Start Processing testdata")
    
    OUTPUT_JSON_PATH = f"{PROCESSED_DATA_DIR}/wasl100_landmarks_test.json"
    
    with open(f"{PROCESSED_DATA_DIR}/test_100.json", "r") as f:
        test_data = json.load(f)
    
    processed_data = {}
    for gloss_id, video_list in tqdm(test_data.items(), desc="test: Processing gloss IDs"):
        processed_data[gloss_id] = []
    
        for video_path in video_list:
            try:
                keyframes = Extract_key_frames(video_path)
                if len(keyframes) < 4:
                    continue
                landmarks_dict = extract_landmarks(keyframes)
                filtered = filter_and_interpolate_landmarks(landmarks_dict)
                sign_spaces = calculate_all_sign_space(filtered)
                normalized = normalize_landmarks_to_sign_space(filtered, sign_spaces)
                processed_data[gloss_id].append({
                    "keyframes": len(normalized),
                    "landmarks": normalized
                    })
            except Exception as e:
                print(f" Error processing video: {video_path} (Gloss: {gloss_id}) - {e}")
                continue
            
        with open(OUTPUT_JSON_PATH, "w") as f:
            json.dump(processed_data, f, indent=None)
    print("All videos have been processed and saved successfully!")

In [37]:
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    futures.append(executor.submit(process_test))
    futures.append(executor.submit(process_val))

    for future in concurrent.futures.as_completed(futures):
        pass

Start Processing testdata
Start Processing valdata


test: Processing gloss IDs:   0%|          | 0/100 [00:00<?, ?it/s]
test: Processing gloss IDs:   1%|          | 1/100 [00:10<17:05, 10.36s/it]
test: Processing gloss IDs:   3%|▎         | 3/100 [00:28<14:38,  9.06s/it][A
test: Processing gloss IDs:   4%|▍         | 4/100 [00:33<11:55,  7.45s/it][A
test: Processing gloss IDs:   5%|▌         | 5/100 [00:42<13:06,  8.28s/it][A
test: Processing gloss IDs:   6%|▌         | 6/100 [00:50<12:21,  7.89s/it][A
test: Processing gloss IDs:   8%|▊         | 8/100 [01:02<10:36,  6.92s/it][A
test: Processing gloss IDs:   9%|▉         | 9/100 [01:10<10:48,  7.12s/it][A
test: Processing gloss IDs:  10%|█         | 10/100 [01:16<10:18,  6.87s/it]A
test: Processing gloss IDs:  12%|█▏        | 12/100 [01:30<10:13,  6.97s/it]A
test: Processing gloss IDs:  13%|█▎        | 13/100 [01:36<09:52,  6.81s/it]A
test: Processing gloss IDs:  15%|█▌        | 15/100 [01:49<09:23,  6.63s/it][A
test: Processing gloss IDs:  16%|█▌        | 16/100 [01:54<08:56,  6.39s/it

 Error processing video: /kaggle/input/wlasl2000-resized/wlasl-complete/videos/28206.mp4 (Gloss: 86) - list index out of range



test: Processing gloss IDs:  67%|██████▋   | 67/100 [07:37<04:22,  7.96s/it][A
test: Processing gloss IDs:  68%|██████▊   | 68/100 [07:44<03:59,  7.48s/it][A
test: Processing gloss IDs:  70%|███████   | 70/100 [08:00<03:54,  7.81s/it][A
test: Processing gloss IDs:  72%|███████▏  | 72/100 [08:11<03:05,  6.62s/it][A
test: Processing gloss IDs:  73%|███████▎  | 73/100 [08:18<03:03,  6.80s/it][A
test: Processing gloss IDs:  74%|███████▍  | 74/100 [08:25<02:53,  6.66s/it][A
test: Processing gloss IDs:  76%|███████▌  | 76/100 [08:39<02:44,  6.85s/it][A
test: Processing gloss IDs:  77%|███████▋  | 77/100 [08:48<02:57,  7.72s/it][A
test: Processing gloss IDs:  78%|███████▊  | 78/100 [08:55<02:45,  7.52s/it][A
Val: Processing gloss IDs:  59%|█████▉    | 59/100 [08:59<06:18,  9.23s/it][A
test: Processing gloss IDs:  79%|███████▉  | 79/100 [09:04<02:42,  7.76s/it][A
test: Processing gloss IDs:  81%|████████  | 81/100 [09:20<02:26,  7.71s/it][A
test: Processing gloss IDs:  82%|████████▏ | 82/100

All videos have been processed and saved successfully!



Val: Processing gloss IDs:  77%|███████▋  | 77/100 [11:54<03:43,  9.71s/it][A
Val: Processing gloss IDs:  78%|███████▊  | 78/100 [12:01<03:17,  8.98s/it][A
Val: Processing gloss IDs:  79%|███████▉  | 79/100 [12:09<03:05,  8.84s/it][A
Val: Processing gloss IDs:  80%|████████  | 80/100 [12:17<02:50,  8.51s/it][A
Val: Processing gloss IDs:  81%|████████  | 81/100 [12:26<02:44,  8.64s/it][A
Val: Processing gloss IDs:  82%|████████▏ | 82/100 [12:33<02:27,  8.19s/it][A
Val: Processing gloss IDs:  83%|████████▎ | 83/100 [12:41<02:16,  8.06s/it][A
Val: Processing gloss IDs:  84%|████████▍ | 84/100 [12:48<02:05,  7.82s/it][A
Val: Processing gloss IDs:  85%|████████▌ | 85/100 [12:55<01:53,  7.56s/it][A
Val: Processing gloss IDs:  86%|████████▌ | 86/100 [13:02<01:42,  7.30s/it][A
Val: Processing gloss IDs:  87%|████████▋ | 87/100 [13:10<01:37,  7.48s/it][A
Val: Processing gloss IDs:  88%|████████▊ | 88/100 [13:15<01:21,  6.78s/it][A
Val: Processing gloss IDs:  89%|████████▉ | 89/100 

All videos have been processed and saved successfully! 





In [38]:
input_file_path = "/kaggle/input/wlasl-processed/wlasl_class_list.txt"
output_file_path = "/kaggle/working/top_100_classes.txt"



try:
    with open(input_file_path, "r") as input_file:
        with open(output_file_path, "w") as output_file:
            for i, line in enumerate(input_file):
                if str(i) in allowed_gloss_ids:
                    output_file.write(line)
    print(f"Saved {output_file_path}")
except Exception as e:
    print(f"Error: {e}")

Saved /kaggle/working/top_100_classes.txt
