### 1. Import Necessary Libraries

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import json
import os
from tqdm import tqdm
from math import sqrt

### 2. Configuration (Centralized Settings)

In [None]:
config = {
    # Directory holding the WLASL source videos ("<video_id>.mp4" files)
    "VIDEO_SOURCE_DIR": r"../data/wlasl-complete/videos",

    # JSON split definition; its top-level keys are used as the video IDs
    "SPLIT_FILE_PATH": r"../data/nslt_300.json",

    # Destination of the landmark .npz archive (checkpoints are written to
    # this same path and overwritten by the final save)
    "OUTPUT_NPZ_PATH": r"../data/Landmarks_GCN_augmented.npz",

    # Checkpoint interval, counted in videos
    "SAVE_CHECKPOINT_EVERY_N_VIDEOS": 250,

    # --- MediaPipe Node Selection ---
    # Only a subset of the pose landmarks is kept, to reduce data size and
    # focus on the joints that matter for signing. The order of this list is
    # the order in which the pose nodes are extracted and labelled.
    "POSE_NODES": [
        mp.solutions.holistic.PoseLandmark[landmark_name]
        for landmark_name in (
            "NOSE",
            "LEFT_SHOULDER",
            "RIGHT_SHOULDER",
            "LEFT_ELBOW",
            "RIGHT_ELBOW",
            "LEFT_WRIST",
            "RIGHT_WRIST",
            "LEFT_HIP",
            "RIGHT_HIP",
            "LEFT_KNEE",
            "RIGHT_KNEE",
            "LEFT_ANKLE",
            "RIGHT_ANKLE",
            "LEFT_INDEX",
            "RIGHT_INDEX",
        )
    ],

    # Face-mesh connection groups (lips, eyes, irises) from which the face
    # nodes are sampled. The iris groups fall back to an empty tuple when the
    # installed MediaPipe build does not define them.
    "FACE_GROUPS": [
        mp.solutions.face_mesh.FACEMESH_LIPS,
        mp.solutions.face_mesh.FACEMESH_LEFT_EYE,
        mp.solutions.face_mesh.FACEMESH_RIGHT_EYE,
        getattr(mp.solutions.face_mesh, "FACEMESH_LEFT_IRIS", ()),
        getattr(mp.solutions.face_mesh, "FACEMESH_RIGHT_IRIS", ()),
    ],
}

### 3. Initialize MediaPipe Models

In [None]:
# Short aliases for the MediaPipe solution modules used by the functions below.
# NOTE(review): mp_face_mesh is not referenced anywhere else in the visible
# code (the config accesses mp.solutions.face_mesh directly).
mp_holistic = mp.solutions.holistic
mp_face_mesh = mp.solutions.face_mesh
mp_hands = mp.solutions.hands

### 4. Helper Functions

In [None]:
def lm_to_xyzc(lm, default_conf=1.0):
    """
    Converts a MediaPipe landmark object into a standard [x, y, z, confidence] list.

    The confidence slot is the landmark's 'visibility' when that value is
    actually present, otherwise `default_conf`. Protobuf messages expose every
    declared field as an attribute and return the field default (0.0 for a
    float) when it was never set, so a plain getattr() cannot tell "unset"
    apart from "0.0". Presence is therefore checked with HasField() whenever
    the object provides it; without that check, landmarks that carry no
    visibility score would get confidence 0.0, the same value used for
    zero-padded (undetected) nodes.

    Args:
        lm: The landmark object. Missing 'x', 'y' or 'z' attributes are read as 0.0.
        default_conf (float): The confidence value to use if 'visibility' is not available.

    Returns:
        list: A list containing [x, y, z, confidence].
    """
    x = getattr(lm, 'x', 0.0)
    y = getattr(lm, 'y', 0.0)
    z = getattr(lm, 'z', 0.0)

    has_field = getattr(lm, 'HasField', None)
    if callable(has_field):
        # Protobuf-style object: ask whether the optional field was populated.
        try:
            visibility_present = has_field('visibility')
        except ValueError:
            # Raised by protobuf when the message type has no such field.
            visibility_present = False
        conf = lm.visibility if visibility_present else default_conf
    else:
        # Plain object: the attribute either exists or it does not.
        conf = getattr(lm, 'visibility', None)
        if conf is None:
            conf = default_conf

    return [x, y, z, conf]


def sample_face_indices():
    """
    Picks the face-mesh landmark indices that are kept as graph nodes.

    Every connection pair found in the configured FACE_GROUPS contributes its
    two endpoint indices; duplicates are dropped and first-seen order is kept.
    Eight indices are then taken at a regular stride from that list. If fewer
    than eight distinct indices were collected, indices 0..7 are used instead.

    Returns:
        list: The 8 landmark indices to keep.
    """
    # A dict is used as an insertion-ordered set of indices.
    seen = {}
    for group in config["FACE_GROUPS"]:
        if not group:
            continue
        for pair in group:
            if not isinstance(pair, (tuple, list)) or len(pair) < 2:
                continue
            for idx in pair[:2]:
                seen.setdefault(idx, None)

    candidates = list(seen)

    # Fallback when the groups did not provide enough distinct points
    if len(candidates) < 8:
        candidates = list(range(8))

    # Evenly spaced pick of 8 entries
    stride = max(1, len(candidates) // 8)
    return candidates[:8 * stride:stride][:8]


# Computed once, when this cell/module is executed, and reused for every frame.
FACE_INDICES_TO_KEEP = sample_face_indices()


def build_adjacency_map(node_labels):
    """
    Creates the symmetric (N, N) adjacency matrix of the landmark graph.

    Edges are added in six passes: MediaPipe's own pose connections, a chain
    through the configured POSE_NODES in list order, midpoint-to-endpoint
    links, MediaPipe's hand connections for both hands, pose-wrist to
    hand-wrist links, and nose-to-face links. An edge is only added when both
    of its endpoints occur in `node_labels`.

    Args:
        node_labels (list): (group, name) tuples, e.g. ('pose', 'LEFT_WRIST'),
            one per graph node; a node's position in the list is its index.

    Returns:
        np.ndarray: An (N, N) uint8 matrix with 1 for connected node pairs.
    """
    node_count = len(node_labels)
    adj = np.zeros((node_count, node_count), dtype=np.uint8)
    # Label -> row/column index lookup
    index_of = {label: position for position, label in enumerate(node_labels)}

    def link(i, j):
        # Undirected edge between two node indices.
        adj[i, j] = 1
        adj[j, i] = 1

    def link_if_present(label_a, label_b):
        # Undirected edge between two labels, skipped if either is unknown.
        if label_a in index_of and label_b in index_of:
            link(index_of[label_a], index_of[label_b])

    # --- 1. MediaPipe's standard pose connections ---
    for a, b in getattr(mp.solutions.pose, "POSE_CONNECTIONS", ()):
        name_a = mp_holistic.PoseLandmark(a).name if isinstance(a, int) else None
        name_b = mp_holistic.PoseLandmark(b).name if isinstance(b, int) else None
        link_if_present(('pose', name_a), ('pose', name_b))

    # --- 2. Chain through the configured POSE_NODES in list order ---
    chain = [
        index_of[('pose', node.name)]
        for node in config["POSE_NODES"]
        if ('pose', node.name) in index_of
    ]
    for current, following in zip(chain, chain[1:]):
        link(current, following)

    # --- 3. Midpoints to the two joints they were averaged from ---
    midpoint_endpoints = {
        'mid_shoulder': ('LEFT_SHOULDER', 'RIGHT_SHOULDER'),
        'mid_hip': ('LEFT_HIP', 'RIGHT_HIP'),
    }
    for mid_name, endpoints in midpoint_endpoints.items():
        for endpoint in endpoints:
            link_if_present(('pose_mid', mid_name), ('pose', endpoint))

    # --- 4. MediaPipe's hand connections, once per hand ---
    hand_edges = getattr(mp_hands, "HAND_CONNECTIONS", ())
    for side in ('L', 'R'):
        for a, b in hand_edges:
            link_if_present(('hand', f"{side}_{a}"), ('hand', f"{side}_{b}"))

    # --- 5. Pose wrist to hand landmark 0 (the hand's wrist) ---
    for side, wrist_name in (('L', 'LEFT_WRIST'), ('R', 'RIGHT_WRIST')):
        link_if_present(('pose', wrist_name), ('hand', f"{side}_0"))

    # --- 6. Every face node to the nose ---
    nose_index = index_of.get(('pose', 'NOSE'))
    if nose_index is not None:
        for label in node_labels:
            if label[0] == 'face':
                link(nose_index, index_of[label])

    return adj

In [None]:
def extract_compact_landmarks(video_path, holistic_model):
    """
    Runs the holistic model on every frame of a video and collects per-frame features.

    For each frame the node rows are assembled in a fixed order: the
    configured pose nodes, the shoulder and hip midpoints, the left-hand
    nodes, the right-hand nodes, then the sampled face nodes. Any part that
    was not detected is filled with all-zero rows, so every frame has the
    same number of rows.

    Args:
        video_path (str): The file path to the video.
        holistic_model: Object whose process(rgb_image) returns results with
            pose_landmarks, left_hand_landmarks, right_hand_landmarks and
            face_landmarks (a MediaPipe Holistic instance).

    Returns:
        tuple: (landmark_seq, torso_seq, bone_seq)
            - landmark_seq: float32 array (T, N, 4) of [x, y, z, conf] rows
            - torso_seq: float32 array (T,), distance between the shoulder
              midpoint and the hip midpoint (0.0 when no pose was detected)
            - bone_seq: float32 array (T, 12), four arm bone vectors per
              frame in the order right upper arm, right forearm, left upper
              arm, left forearm (all zeros when no pose was detected)
            Returns (None, None, None) if the video cannot be opened or
            yields no frames.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        print(f"Could not open video file: {video_path}")
        return None, None, None

    def zero_rows(count):
        # Placeholder rows for landmarks that were not detected.
        return [[0.0, 0.0, 0.0, 0.0] for _ in range(count)]

    def midpoint_row(first, second):
        # Average of two landmarks; the confidence slot is fixed at 1.0.
        return [(first.x + second.x)/2, (first.y + second.y)/2, (first.z + second.z)/2, 1.0]

    def hand_rows(hand):
        # One row per hand landmark, or 21 zero rows when the hand is missing.
        if hand:
            return [lm_to_xyzc(point, default_conf=1.0) for point in hand.landmark]
        return zero_rows(21)

    def bone_vector(joints, start, end):
        # Vector pointing from joint `start` to joint `end`.
        return [joints[end].x - joints[start].x,
                joints[end].y - joints[start].y,
                joints[end].z - joints[start].z]

    per_frame_nodes = []
    per_frame_torso = []
    per_frame_bones = []

    try:
        while capture.isOpened():
            grabbed, bgr_frame = capture.read()
            if not grabbed:
                break  # no more frames

            # OpenCV delivers BGR; the model is fed RGB. The array is marked
            # read-only while the model processes it.
            rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
            rgb_frame.flags.writeable = False
            results = holistic_model.process(rgb_frame)
            rgb_frame.flags.writeable = True

            # --- Pose nodes, midpoints, torso length and arm bones ---
            pose = results.pose_landmarks
            if pose:
                joints = pose.landmark
                joint_id = mp_holistic.PoseLandmark

                rows = [
                    lm_to_xyzc(joint, default_conf=getattr(joint, 'visibility', 1.0))
                    for joint in (joints[node] for node in config["POSE_NODES"])
                ]

                shoulder_mid = midpoint_row(joints[joint_id.LEFT_SHOULDER],
                                            joints[joint_id.RIGHT_SHOULDER])
                hip_mid = midpoint_row(joints[joint_id.LEFT_HIP],
                                       joints[joint_id.RIGHT_HIP])
                rows.append(shoulder_mid)
                rows.append(hip_mid)

                # Torso length = distance between the two midpoints
                dx = shoulder_mid[0] - hip_mid[0]
                dy = shoulder_mid[1] - hip_mid[1]
                dz = shoulder_mid[2] - hip_mid[2]
                torso = sqrt(dx**2 + dy**2 + dz**2)

                # Right arm first, then left arm; shoulder->elbow, elbow->wrist
                bones = []
                for start, end in (
                    (joint_id.RIGHT_SHOULDER, joint_id.RIGHT_ELBOW),
                    (joint_id.RIGHT_ELBOW, joint_id.RIGHT_WRIST),
                    (joint_id.LEFT_SHOULDER, joint_id.LEFT_ELBOW),
                    (joint_id.LEFT_ELBOW, joint_id.LEFT_WRIST),
                ):
                    bones.extend(bone_vector(joints, start, end))
            else:
                # No pose: zero rows for the pose nodes plus the 2 midpoints
                rows = zero_rows(len(config["POSE_NODES"]) + 2)
                torso = 0.0
                bones = [0.0] * 12  # 4 bones * 3 axes (x,y,z)

            # --- Hands: left first, then right ---
            rows.extend(hand_rows(results.left_hand_landmarks))
            rows.extend(hand_rows(results.right_hand_landmarks))

            # --- Sampled face nodes ---
            face = results.face_landmarks
            if face:
                face_points = face.landmark
                rows.extend(
                    lm_to_xyzc(face_points[idx], default_conf=1.0)
                    if idx < len(face_points)
                    else [0.0, 0.0, 0.0, 0.0]
                    for idx in FACE_INDICES_TO_KEEP
                )
            else:
                rows.extend(zero_rows(len(FACE_INDICES_TO_KEEP)))

            per_frame_nodes.append(np.array(rows, dtype=np.float32))
            per_frame_torso.append(torso)
            per_frame_bones.append(bones)

    finally:
        # The capture is released even if processing raised
        capture.release()

    if not per_frame_nodes:
        return None, None, None

    # --- Stack the per-frame data into arrays ---
    landmark_seq = np.stack(per_frame_nodes, axis=0)
    torso_seq = np.array(per_frame_torso, dtype=np.float32)
    bone_seq = np.array(per_frame_bones, dtype=np.float32)

    return landmark_seq, torso_seq, bone_seq

In [None]:
def main():
    """
    Main function to run the landmark extraction pipeline.

    1. Loads the video list from the JSON split file (its keys are the video IDs).
    2. Defines the complete list of node labels.
    3. Builds the adjacency matrix.
    4. Initializes the MediaPipe Holistic model.
    5. Loops through all videos and extracts landmarks; missing files are
       skipped with a warning.
    6. Writes a checkpoint every SAVE_CHECKPOINT_EVERY_N_VIDEOS list entries
       (skipped entries count too, so a missing file cannot suppress a
       checkpoint) and a final .npz file. Checkpoints and the final save go to
       the same OUTPUT_NPZ_PATH. A falsy interval (0 or None) disables
       checkpointing.
    """
    output_path = config["OUTPUT_NPZ_PATH"]
    checkpoint_every = config["SAVE_CHECKPOINT_EVERY_N_VIDEOS"]

    # --- 1. Load video list ---
    print(f"Loading video list from: {config['SPLIT_FILE_PATH']}")
    with open(config["SPLIT_FILE_PATH"], 'r', encoding="utf-8") as f:
        data = json.load(f)
    # (video_id, full_video_path) pairs, in the order of the JSON keys
    video_paths = [
        (vid, os.path.join(config["VIDEO_SOURCE_DIR"], f"{vid}.mp4"))
        for vid in data.keys()
    ]
    print(f"Found {len(video_paths)} videos to process.")

    # --- 2. Define node labels ---
    # This list MUST match the order of extraction in extract_compact_landmarks
    node_labels = [('pose', p.name) for p in config["POSE_NODES"]]
    node_labels += [('pose_mid', 'mid_shoulder'), ('pose_mid', 'mid_hip')]
    node_labels += [('hand', f"L_{i}") for i in range(21)]
    node_labels += [('hand', f"R_{i}") for i in range(21)]
    node_labels += [('face', f"F_{idx}") for idx in FACE_INDICES_TO_KEEP]

    print(f"Total nodes defined: {len(node_labels)}")

    # --- 3. Build the adjacency map ---
    print("Building adjacency map...")
    adjacency = build_adjacency_map(node_labels)

    # Results keyed by video id: {'video_id': np_array, ...}
    extracted_landmarks = {}
    extracted_torsos = {}
    extracted_bones = {}

    def save_archive():
        # Single place that defines the archive layout, shared by checkpoints
        # and the final save. The two dicts are stored by NumPy as pickled
        # object arrays, so readers need np.load(..., allow_pickle=True).
        np.savez_compressed(output_path,
                            _node_labels=np.array(node_labels, dtype=object),
                            _adjacency=adjacency,
                            _torso_lengths=extracted_torsos,
                            _bone_vectors=extracted_bones,
                            **extracted_landmarks)

    # --- 4. Initialize MediaPipe Holistic ---
    # The context manager closes the model even if extraction raises.
    with mp_holistic.Holistic(static_image_mode=False,
                              model_complexity=1,
                              min_detection_confidence=0.5,
                              min_tracking_confidence=0.5) as holistic:
        # --- 5. Run processing loop ---
        print("Starting landmark extraction...")
        for i, (vid, path) in enumerate(tqdm(video_paths, desc="Processing Videos")):
            if os.path.exists(path):
                # This is where the core extraction happens
                seq, torso_len, bone_vec = extract_compact_landmarks(path, holistic)

                if seq is not None:
                    extracted_landmarks[vid] = seq
                    extracted_torsos[vid] = torso_len
                    extracted_bones[vid] = bone_vec
            else:
                print(f"Warning: Video file not found, skipping: {path}")

            # --- 6. Save checkpoint ---
            # Evaluated for every list entry, including skipped ones.
            if checkpoint_every and (i + 1) % checkpoint_every == 0:
                print(f"\n--- Saving checkpoint with {len(extracted_landmarks)} videos ---")
                save_archive()

    # --- 7. Save final file ---
    print(f"\nExtraction complete. Saving final file to: {config['OUTPUT_NPZ_PATH']}")
    save_archive()
    print("Done.")

In [None]:
# Entry-point guard: main() is called only when __name__ is "__main__",
# i.e. when this code is executed directly rather than imported as a module.
if __name__ == "__main__":
    main()

Videos:   5%|▍         | 249/5118 [18:26<5:16:31,  3.90s/it]

Saving checkpoint with 250 videos


Videos:  10%|▉         | 499/5118 [1:17:22<2:45:26,  2.15s/it]   

Saving checkpoint with 500 videos


Videos:  15%|█▍        | 749/5118 [1:29:24<4:05:33,  3.37s/it]

Saving checkpoint with 750 videos


Videos:  20%|█▉        | 999/5118 [1:48:53<7:36:00,  6.64s/it] 

Saving checkpoint with 1000 videos


Videos:  24%|██▍       | 1249/5118 [2:17:13<7:55:21,  7.37s/it] 

Saving checkpoint with 1250 videos


Videos:  29%|██▉       | 1499/5118 [2:46:21<4:27:23,  4.43s/it] 

Saving checkpoint with 1500 videos


Videos:  34%|███▍      | 1749/5118 [3:03:33<3:43:09,  3.97s/it]

Saving checkpoint with 1750 videos


Videos:  39%|███▉      | 1999/5118 [3:20:25<4:53:33,  5.65s/it]

Saving checkpoint with 2000 videos


Videos:  44%|████▍     | 2249/5118 [3:34:27<1:59:30,  2.50s/it]

Saving checkpoint with 2250 videos


Videos:  49%|████▉     | 2499/5118 [3:48:55<2:24:02,  3.30s/it]

Saving checkpoint with 2500 videos


Videos:  54%|█████▎    | 2749/5118 [4:04:45<2:15:23,  3.43s/it]

Saving checkpoint with 2750 videos


Videos:  59%|█████▊    | 2999/5118 [4:20:05<2:03:45,  3.50s/it]

Saving checkpoint with 3000 videos


Videos:  63%|██████▎   | 3249/5118 [4:35:23<2:10:17,  4.18s/it]

Saving checkpoint with 3250 videos


Videos:  68%|██████▊   | 3499/5118 [4:51:51<1:54:02,  4.23s/it]

Saving checkpoint with 3500 videos


Videos:  73%|███████▎  | 3749/5118 [5:07:52<1:07:52,  2.97s/it]

Saving checkpoint with 3750 videos


Videos:  78%|███████▊  | 3999/5118 [5:23:31<1:01:22,  3.29s/it]

Saving checkpoint with 4000 videos


Videos:  83%|████████▎ | 4249/5118 [5:39:31<52:15,  3.61s/it]  

Saving checkpoint with 4250 videos


Videos:  88%|████████▊ | 4499/5118 [5:53:43<39:25,  3.82s/it]  

Saving checkpoint with 4500 videos


Videos:  93%|█████████▎| 4749/5118 [6:10:34<20:44,  3.37s/it]  

Saving checkpoint with 4750 videos


Videos:  98%|█████████▊| 4999/5118 [6:23:52<07:09,  3.61s/it]

Saving checkpoint with 5000 videos


Videos: 100%|██████████| 5118/5118 [6:30:59<00:00,  4.58s/it]


Saved: D:\Landmarks\Bones+Joints\Landmarks_GCN_augmented.npz
