In [None]:
!pip install tqdm joblib



In [None]:
!pip install --upgrade mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m27.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.21 sounddevice-0.5.1


In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm

# ---------- Augmentation Functions ----------

def center_crop(frame, crop_fraction=0.4):
    """
    Crop the center of the frame to the given fraction of original size.
    For instance, crop_fraction=0.4 returns a region that is 40% of the width and height.
    """
    h, w, _ = frame.shape
    new_w = int(w * crop_fraction)
    new_h = int(h * crop_fraction)
    start_x = (w - new_w) // 2
    start_y = (h - new_h) // 2
    cropped = frame[start_y:start_y+new_h, start_x:start_x+new_w]
    return cropped

def horizontal_flip(frame):
    """Flip the frame horizontally."""
    return cv2.flip(frame, 1)

def up_sample_video(frames, replicate_fraction=0.5):
    """
    Duplicate a subset of frames to increase the total frame count.
    replicate_fraction: fraction of original frames to duplicate.
    Final count will be roughly original_frames * (1 + replicate_fraction).
    """
    n_frames = len(frames)
    replicate_count = int(n_frames * replicate_fraction)
    # Uniformly select frames to duplicate
    indices = np.linspace(0, n_frames - 1, replicate_count, dtype=int)
    augmented = frames.copy()
    # Insert duplicates immediately after the selected frames (in reverse order)
    for idx in sorted(indices, reverse=True):
        augmented.insert(idx + 1, frames[idx])
    return augmented

def down_sample_video(frames, drop_fraction=0.35):
    """
    Uniformly drop a fraction of frames so that the final count is (1 - drop_fraction)
    times the original.
    """
    n_frames = len(frames)
    keep_count = int(n_frames * (1 - drop_fraction))
    indices = np.linspace(0, n_frames - 1, keep_count, dtype=int)
    return [frames[i] for i in indices]

# ---------- Video Processing Pipeline ----------

def read_video_frames(video_path):
    """Read all frames from the video and return them as a list."""
    cap = cv2.VideoCapture(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    return frames

def write_video(frames, output_path, fps=30):
    """Write a list of frames to a video file using OpenCV."""
    if not frames:
        return
    h, w, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
    for frame in frames:
        out.write(frame)
    out.release()

def process_video_augmentations(video_path, output_dir):
    """
    Given an input video, apply a set of augmentations and save each result.
    The output files are saved with suffixes indicating the augmentation type.
    """
    # Read original frames
    frames = read_video_frames(video_path)
    if not frames:
        print(f"Warning: No frames found in {video_path}")
        return

    # Get the base name and create an output base filename
    base_name = os.path.splitext(os.path.basename(video_path))[0]

    # 1. Save Original for reference
    write_video(frames, os.path.join(output_dir, f"{base_name}_orig.mp4"))

    # 2. Center Crop: apply center crop to each frame
    cropped_frames = [center_crop(frame) for frame in frames]
    write_video(cropped_frames, os.path.join(output_dir, f"{base_name}_crop.mp4"))

    # 3. Horizontal Flip: apply flip to each frame
    flipped_frames = [horizontal_flip(frame) for frame in frames]
    write_video(flipped_frames, os.path.join(output_dir, f"{base_name}_flip.mp4"))

    # 4. Up-sample: duplicate 50% of frames uniformly
    # upsampled_frames = up_sample_video(frames, replicate_fraction=0.5)
    # write_video(upsampled_frames, os.path.join(output_dir, f"{base_name}_up.mp4"))

    # 5. Down-sample: drop 35% of frames uniformly
    downsampled_frames = down_sample_video(frames, drop_fraction=0.35)
    write_video(downsampled_frames, os.path.join(output_dir, f"{base_name}_down.mp4"))

def process_folder(input_root, output_root):
    """
    Traverse the folder structure (e.g., Greeting/ALright, Greeting/Hello, etc.),
    apply augmentations to each .mov video, and save the augmented videos.
    """
    for root, dirs, files in os.walk(input_root):
        for file in files:
            # Process only .mov files (case-insensitive)
            if file.lower().endswith(".mov"):
                video_path = os.path.join(root, file)
                # Create an output subfolder mirroring the input structure
                rel_path = os.path.relpath(root, input_root)
                output_dir = os.path.join(output_root, rel_path)
                os.makedirs(output_dir, exist_ok=True)
                print(f"Processing {video_path}...")
                process_video_augmentations(video_path, output_dir)

if __name__ == "__main__":
    # Define the input root folder (e.g., "Greeting") and output root folder.
    input_root = "/content/drive/MyDrive/AISC/Greetings"
    output_root = "/content/drive/MyDrive/AISC/Greeting_Augmented"
    process_folder(input_root, output_root)


Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0029.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0030.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0031.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0032.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0037.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0038.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0039.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0089.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0090.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_0091.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_9914.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_9915.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_9916.MOV...
Processing /content/drive/MyDrive/AISC/Greetings/Hello/MVI_9917.MOV...
Proces

# First Implementation


In [None]:
import os
import json
import cv2
import mediapipe as mp
from tqdm.auto import tqdm
import numpy as np
import gc
import warnings

# -- Functions for keypoint extraction --

def process_landmarks(landmarks):
    x_list, y_list = [], []
    for landmark in landmarks.landmark:
        x_list.append(landmark.x)
        y_list.append(landmark.y)
    return x_list, y_list

def process_hand_keypoints(results):
    hand1_x, hand1_y, hand2_x, hand2_y = [], [], [], []
    if results.multi_hand_landmarks is not None:
        if len(results.multi_hand_landmarks) > 0:
            hand1 = results.multi_hand_landmarks[0]
            hand1_x, hand1_y = process_landmarks(hand1)
        if len(results.multi_hand_landmarks) > 1:
            hand2 = results.multi_hand_landmarks[1]
            hand2_x, hand2_y = process_landmarks(hand2)
    return hand1_x, hand1_y, hand2_x, hand2_y

def process_pose_keypoints(results):
    pose = results.pose_landmarks
    if pose: # Only process if pose is not None
        pose_x, pose_y = process_landmarks(pose)
        return pose_x, pose_y
    else:
        # Return empty lists or NaN values if no pose is detected
        return [np.nan] * 25, [np.nan] * 25

def swap_hands(left_wrist, right_wrist, hand, input_hand):
    left_wrist_x, left_wrist_y = left_wrist
    right_wrist_x, right_wrist_y = right_wrist
    hand_x, hand_y = hand
    left_dist = (left_wrist_x - hand_x) ** 2 + (left_wrist_y - hand_y) ** 2
    right_dist = (right_wrist_x - hand_x) ** 2 + (right_wrist_y - hand_y) ** 2
    if left_dist < right_dist and input_hand == "h2":
        return True
    if right_dist < left_dist and input_hand == "h1":
        return True
    return False

def process_video(path, save_dir):
    """
    Process a video to extract keypoints and save them in a JSON file.
    The label is inferred from the parent folder name.
    """
    hands = mp.solutions.hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    # Increase model_complexity if needed (here set to 2)
    pose = mp.solutions.pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5, model_complexity=2)

    pose_points_x, pose_points_y = [], []
    hand1_points_x, hand1_points_y = [], []
    hand2_points_x, hand2_points_y = [], []

    # The label is assumed to be the parent folder's name.
    label = os.path.basename(os.path.dirname(path))
    label = "".join([i for i in label if i.isalpha()]).lower()
    uid = os.path.splitext(os.path.basename(path))[0]
    uid = "_".join([label, uid])
    n_frames = 0

    if not os.path.isfile(path):
        warnings.warn(path + " file not found")
    cap = cv2.VideoCapture(path)
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            break
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        hand_results = hands.process(image)
        pose_results = pose.process(image)

        hand1_x, hand1_y, hand2_x, hand2_y = process_hand_keypoints(hand_results)
        pose_x, pose_y = process_pose_keypoints(pose_results)

        # Swap hands if needed based on wrist distances
        if len(hand1_x) > 0 and len(hand2_x) == 0:
            if swap_hands(
                left_wrist=(pose_x[15], pose_y[15]),
                right_wrist=(pose_x[16], pose_y[16]),
                hand=(hand1_x[0], hand1_y[0]),
                input_hand="h1",
            ):
                hand1_x, hand1_y, hand2_x, hand2_y = hand2_x, hand2_y, hand1_x, hand1_y
        elif len(hand1_x) == 0 and len(hand2_x) > 0:
            if swap_hands(
                left_wrist=(pose_x[15], pose_y[15]),
                right_wrist=(pose_x[16], pose_y[16]),
                hand=(hand2_x[0], hand2_y[0]),
                input_hand="h2",
            ):
                hand1_x, hand1_y, hand2_x, hand2_y = hand2_x, hand2_y, hand1_x, hand1_y

        # Set missing keypoints to NaN for later interpolation.
        pose_x = pose_x if pose_x else [np.nan] * 25
        pose_y = pose_y if pose_y else [np.nan] * 25
        hand1_x = hand1_x if hand1_x else [np.nan] * 21
        hand1_y = hand1_y if hand1_y else [np.nan] * 21
        hand2_x = hand2_x if hand2_x else [np.nan] * 21
        hand2_y = hand2_y if hand2_y else [np.nan] * 21

        pose_points_x.append(pose_x)
        pose_points_y.append(pose_y)
        hand1_points_x.append(hand1_x)
        hand1_points_y.append(hand1_y)
        hand2_points_x.append(hand2_x)
        hand2_points_y.append(hand2_y)

        n_frames += 1

    cap.release()

    # Ensure that at least one frame is processed.
    pose_points_x = pose_points_x if pose_points_x else [[np.nan] * 25]
    pose_points_y = pose_points_y if pose_points_y else [[np.nan] * 25]
    hand1_points_x = hand1_points_x if hand1_points_x else [[np.nan] * 21]
    hand1_points_y = hand1_points_y if hand1_points_y else [[np.nan] * 21]
    hand2_points_x = hand2_points_x if hand2_points_x else [[np.nan] * 21]
    hand2_points_y = hand2_points_y if hand2_points_y else [[np.nan] * 21]

    # Prepare the JSON data.
    save_data = {
        "uid": uid,
        "label": label,
        "pose_x": pose_points_x,
        "pose_y": pose_points_y,
        "hand1_x": hand1_points_x,
        "hand1_y": hand1_points_y,
        "hand2_x": hand2_points_x,
        "hand2_y": hand2_points_y,
        "n_frames": n_frames,
    }
    # Save the JSON file. Each video produces one JSON file.
    json_filename = f"{uid}.json"
    json_path = os.path.join(save_dir, json_filename)
    with open(json_path, "w") as f:
        json.dump(save_data, f)

    hands.close()
    pose.close()
    del hands, pose, save_data
    gc.collect()

def process_all_videos(input_root, output_dir):
    """
    Walk through the folder structure (e.g., Greeting/...) and process all videos sequentially.
    Each JSON is saved in the output_dir.
    """
    os.makedirs(output_dir, exist_ok=True)
    video_paths = []
    # Walk through the root folder recursively.
    for root, dirs, files in os.walk(input_root):
        for file in files:
            # Process only video files with extensions .mp4, .avi, .mov
            if file.lower().endswith((".mp4",".mov")):
                video_paths.append(os.path.join(root, file))
    # Process videos one by one
    for path in tqdm(video_paths, desc="Processing Videos"):
        process_video(path, output_dir)

if __name__ == "__main__":
    # Define the main folder (e.g., "Greeting") and output directory.
    input_root = "/content/drive/MyDrive/AISC/Greeting_Augmented"
    # Set output_dir as a folder, not a file
    output_dir = "/content/drive/MyDrive/AISC/json_files"
    process_all_videos(input_root, output_dir)


Processing Videos:   0%|          | 0/447 [00:00<?, ?it/s]

Downloading model to /usr/local/lib/python3.11/dist-packages/mediapipe/modules/pose_landmark/pose_landmark_heavy.tflite


In [None]:
import os
import json
import numpy as np
import xgboost as xgb
import joblib
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def fix_length(lst, expected):
    return lst[:expected] + [np.nan] * (expected - len(lst)) if len(lst) < expected else lst[:expected]

def load_feature_vector(json_file, target_frames=200):
    with open(json_file, 'r') as f:
        data = json.load(f)

    per_frame_dim = 134  # Total keypoints per frame
    current_frames = len(data.get("pose_x", []))
    frames = []

    for i in range(current_frames):
        pose_x = fix_length(data.get("pose_x", [])[i], 25) if i < current_frames else [np.nan] * 25
        pose_y = fix_length(data.get("pose_y", [])[i], 25) if i < current_frames else [np.nan] * 25
        hand1_x = fix_length(data.get("hand1_x", [])[i], 21) if i < current_frames else [np.nan] * 21
        hand1_y = fix_length(data.get("hand1_y", [])[i], 21) if i < current_frames else [np.nan] * 21
        hand2_x = fix_length(data.get("hand2_x", [])[i], 21) if i < current_frames else [np.nan] * 21
        hand2_y = fix_length(data.get("hand2_y", [])[i], 21) if i < current_frames else [np.nan] * 21
        frames.append(np.array(pose_x + pose_y + hand1_x + hand1_y + hand2_x + hand2_y, dtype=np.float32))

    if not frames:
        frames = [np.zeros(per_frame_dim, dtype=np.float32) for _ in range(target_frames)]
    elif len(frames) < target_frames:
        frames.extend([np.zeros(per_frame_dim, dtype=np.float32)] * (target_frames - len(frames)))
    else:
        frames = [frames[i] for i in np.linspace(0, len(frames) - 1, target_frames, dtype=int)]

    feature_vector = np.concatenate(frames)
    if feature_vector.shape[0] != target_frames * per_frame_dim:
        raise ValueError(f"Feature vector length mismatch in {json_file}")

    return feature_vector, data["label"]

def load_dataset(json_dir, target_frames=200):
    X, y = [], []
    for file in os.listdir(json_dir):
        filepath = os.path.join(json_dir, file)
        if file.endswith(".json") and os.path.isfile(filepath):
            feature_vector, label = load_feature_vector(filepath, target_frames)
            X.append(feature_vector)
            y.append(label)
    return np.array(X), np.array(y)

if __name__ == "__main__":
    json_dir = "/content/drive/MyDrive/AISC/json_files"
    target_frames = 200
    X, y = load_dataset(json_dir, target_frames)

    unique_labels = sorted(set(y))
    label_to_int = {label: i for i, label in enumerate(unique_labels)}
    y_int = np.array([label_to_int[label] for label in y])

    X_train, X_test, y_train, y_test = train_test_split(X, y_int, test_size=0.2, random_state=42)

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    dtrain = xgb.DMatrix(X_train_scaled, label=y_train)
    dtest = xgb.DMatrix(X_test_scaled, label=y_test)

    params = {
        'max_depth': 5,
        'eta': 0.1,
        'objective': 'multi:softmax',
        'num_class': len(unique_labels),
        'verbosity': 1
    }

    model = xgb.train(params, dtrain, num_boost_round=200)
    preds = model.predict(dtest)
    print("Test accuracy:", accuracy_score(y_test, preds))

    model.save_model("xgboost_slr.model")
    joblib.dump(scaler, "scaler.pkl")


Test accuracy: 0.8888888888888888




# Second implementation

In [None]:
!git clone https://github.com/CMU-Perceptual-Computing-Lab/openpose
!cd openpose/
!git submodule update --init --recursive --remote

Cloning into 'openpose'...
remote: Enumerating objects: 16156, done.[K
remote: Total 16156 (delta 0), reused 0 (delta 0), pack-reused 16156 (from 1)[K
Receiving objects: 100% (16156/16156), 84.46 MiB | 16.69 MiB/s, done.
Resolving deltas: 100% (11324/11324), done.
fatal: not a git repository (or any of the parent directories): .git


In [None]:
import numpy as np
import cv2
import xgboost as xgb
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# Assuming you have helper functions for:
# - extract_keypoints: using a pre-trained OpenPose to extract key-points for each frame.
# - optical_flow_impute: to perform Lucas-Kanade optical flow imputation if needed.
# For demonstration, these functions are represented as placeholders.

def extract_keypoints(frame):
    """
    Placeholder function to extract keypoints using a pre-trained OpenPose model.
    Should return a numpy array of shape (num_points,) where num_points=96.
    """
    # TODO: Implement the actual extraction using OpenPose.
    return np.random.rand(96)  # Dummy data for illustration

def optical_flow_impute(prev_frame, curr_frame, prev_keypoints):
    """
    Placeholder function for Lucas-Kanade optical flow to impute missing keypoints.
    """
    # TODO: Implement optical flow imputation if required.
    # For now, simply return previous keypoints.
    return prev_keypoints

def extract_features_from_video(video_path, num_frames=200):
    """
    Process a video to extract keypoints from each frame.
    Pads with zeros if the video has fewer than num_frames frames.
    Returns a flattened feature vector of size 96 * num_frames.
    """
    cap = cv2.VideoCapture(video_path)
    frames_features = []
    prev_frame = None
    prev_keypoints = None
    frame_count = 0

    while cap.isOpened() and frame_count < num_frames:
        ret, frame = cap.read()
        if not ret:
            break
        # Preprocess the frame if needed (resize, grayscale, etc.)
        keypoints = extract_keypoints(frame)

        # Check if keypoints are valid (you can define a threshold or condition)
        if np.sum(keypoints) == 0 and prev_keypoints is not None:
            keypoints = optical_flow_impute(prev_frame, frame, prev_keypoints)

        frames_features.append(keypoints)
        prev_frame = frame.copy()
        prev_keypoints = keypoints
        frame_count += 1

    cap.release()

    # If video has fewer than num_frames frames, pad with zeros
    while len(frames_features) < num_frames:
        frames_features.append(np.zeros(96))

    # Convert list to numpy array and flatten
    feature_matrix = np.array(frames_features)  # shape: (num_frames, 96)
    flattened_features = feature_matrix.flatten()  # shape: (num_frames * 96,)
    return flattened_features

# Custom transformer to integrate our feature extraction into a pipeline
from sklearn.base import BaseEstimator, TransformerMixin

class VideoFeatureExtractor(BaseEstimator, TransformerMixin):
    def __init__(self, num_frames=200):
        self.num_frames = num_frames

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        """
        X is expected to be an array-like of video file paths.
        Returns a 2D numpy array with each row being the flattened feature vector.
        """
        features = [extract_features_from_video(video_path, self.num_frames) for video_path in X]
        return np.array(features)

# Build the pipeline:
# Step 1: Feature extraction (including flattening)
# Step 2: Standard Scaling (Normalization)
# Step 3: XGBoost classifier

pipeline = Pipeline([
    ('feature_extractor', VideoFeatureExtractor(num_frames=200)),
    ('scaler', StandardScaler()),
    ('classifier', xgb.XGBClassifier(
        n_estimators=200,    # number of gradient boosted trees
        max_depth=5,         # maximum tree depth
        learning_rate=0.1,   # learning rate
        base_score=0.5,      # initial prediction score
        use_label_encoder=False,  # to suppress a warning in recent versions
        eval_metric='logloss'
    ))
])

# Example usage:
# Let's assume X_train is a list of video file paths and y_train are the corresponding labels.
# X_train = ['video1.mp4', 'video2.mp4', ...]
# y_train = [0, 1, ...]  # numerical labels for the signs

# Fit the pipeline
# pipeline.fit(X_train, y_train)

# Predict on new videos
# predictions = pipeline.predict(X_test)

# --------------------------
# Feature Importance Analysis:
# --------------------------

def compute_feature_importance(pipeline):
    """
    Given a fitted pipeline, extract the XGBoost model and compute feature importances.
    We then average the gain for specific keypoint groups (hands and arms).
    """
    # Get the XGBoost model
    xgb_model = pipeline.named_steps['classifier']

    # Get the feature importances (gain)
    importance_dict = xgb_model.get_booster().get_score(importance_type='gain')

    # Convert importance_dict keys to indices (assuming feature naming convention 'f0', 'f1', ...)
    importance_array = np.zeros(200 * 96)
    for k, gain in importance_dict.items():
        idx = int(k[1:])  # remove 'f' and convert to integer
        importance_array[idx] = gain

    # Reshape to (num_frames, 96) to analyze per frame
    importance_matrix = importance_array.reshape((200, 96))

    # Assume keypoints indices:
    #  - Hand keypoints: indices 48 to 95 (48 keypoints per frame for both hands)
    #  - Arm keypoints: indices 0 to 47 (shoulder, elbow, wrist, etc.)
    hand_importance = importance_matrix[:, 48:96].mean()
    arm_importance = importance_matrix[:, 0:48].mean()

    print(f"Average Gain for Hand Keypoints: {hand_importance}")
    print(f"Average Gain for Arm Keypoints: {arm_importance}")

    return importance_matrix

# After training the model, you can call:
# feature_importance_matrix = compute_feature_importance(pipeline)


In [None]:
import os
import json
import multiprocessing
import argparse
import cv2
import mediapipe as mp
from tqdm.auto import tqdm
from joblib import Parallel, delayed
import numpy as np
import gc
import warnings

def process_landmarks(landmarks):
    x_list, y_list = [], []
    for landmark in landmarks.landmark:
        x_list.append(landmark.x)
        y_list.append(landmark.y)
    return x_list, y_list

def process_hand_keypoints(results):
    hand1_x, hand1_y, hand2_x, hand2_y = [], [], [], []

    if results.multi_hand_landmarks is not None:
        if len(results.multi_hand_landmarks) > 0:
            hand1 = results.multi_hand_landmarks[0]
            hand1_x, hand1_y = process_landmarks(hand1)

        if len(results.multi_hand_landmarks) > 1:
            hand2 = results.multi_hand_landmarks[1]
            hand2_x, hand2_y = process_landmarks(hand2)

    return hand1_x, hand1_y, hand2_x, hand2_y

def process_pose_keypoints(results):
    if results.pose_landmarks:  # Check if pose landmarks are detected
        pose = results.pose_landmarks
        pose_x, pose_y = process_landmarks(pose)
        return pose_x, pose_y
    else:
        return [], []  # Return empty lists if no pose detected

def swap_hands(left_wrist, right_wrist, hand, input_hand):
    left_wrist_x, left_wrist_y = left_wrist
    right_wrist_x, right_wrist_y = right_wrist
    hand_x, hand_y = hand

    left_dist = (left_wrist_x - hand_x) ** 2 + (left_wrist_y - hand_y) ** 2
    right_dist = (right_wrist_x - hand_x) ** 2 + (right_wrist_y - hand_y) ** 2

    if left_dist < right_dist and input_hand == "h2":
        return True

    if right_dist < left_dist and input_hand == "h1":
        return True

    return False

def process_video(path, save_dir):
    hands = mp.solutions.hands.Hands(
        min_detection_confidence=0.5, min_tracking_confidence=0.5
    )
    # Removed the 'upper_body_only' parameter as it's no longer supported.
    pose = mp.solutions.pose.Pose(
        min_detection_confidence=0.5, min_tracking_confidence=0.5
    )

    pose_points_x, pose_points_y = [], []
    hand1_points_x, hand1_points_y = [], []
    hand2_points_x, hand2_points_y = [], []

    # Get label from parent folder name
    label = os.path.basename(os.path.dirname(path))
    label = "".join([i for i in label if i.isalpha()]).lower()
    uid = os.path.splitext(os.path.basename(path))[0]
    uid = "_".join([label, uid])
    n_frames = 0
    if not os.path.isfile(path):
        warnings.warn(path + " file not found: " + path)
        return  # Skip processing if file not found
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        warnings.warn(f"Error opening video file: {path}")
        return

    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            break
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        hand_results = hands.process(image)
        pose_results = pose.process(image)

        hand1_x, hand1_y, hand2_x, hand2_y = process_hand_keypoints(hand_results)
        pose_x, pose_y = process_pose_keypoints(pose_results)

        ## Assign hands to correct positions
        if len(hand1_x) > 0 and len(hand2_x) == 0 and len(pose_x) > 16 and len(pose_y) > 16:
            if swap_hands(
                left_wrist=(pose_x[15], pose_y[15]),
                right_wrist=(pose_x[16], pose_y[16]),
                hand=(hand1_x[0], hand1_y[0]),
                input_hand="h1",
            ):
                hand1_x, hand1_y, hand2_x, hand2_y = hand2_x, hand2_y, hand1_x, hand1_y

        elif len(hand1_x) == 0 and len(hand2_x) > 0 and len(pose_x) > 16 and len(pose_y) > 16:
            if swap_hands(
                left_wrist=(pose_x[15], pose_y[15]),
                right_wrist=(pose_x[16], pose_y[16]),
                hand=(hand2_x[0], hand2_y[0]),
                input_hand="h2",
            ):
                hand1_x, hand1_y, hand2_x, hand2_y = hand2_x, hand2_y, hand1_x, hand1_y

        ## Set to nan so that values can be interpolated in dataloader
        pose_x = pose_x if pose_x else [np.nan] * 25
        pose_y = pose_y if pose_y else [np.nan] * 25

        hand1_x = hand1_x if hand1_x else [np.nan] * 21
        hand1_y = hand1_y if hand1_y else [np.nan] * 21
        hand2_x = hand2_x if hand2_x else [np.nan] * 21
        hand2_y = hand2_y if hand2_y else [np.nan] * 21

        pose_points_x.append(pose_x)
        pose_points_y.append(pose_y)
        hand1_points_x.append(hand1_x)
        hand1_points_y.append(hand1_y)
        hand2_points_x.append(hand2_x)
        hand2_points_y.append(hand2_y)

        n_frames += 1

    cap.release()

    ## Set to nan so that values can be interpolated in dataloader
    pose_points_x = pose_points_x if pose_points_x else [[np.nan] * 25]
    pose_points_y = pose_points_y if pose_points_y else [[np.nan] * 25]

    hand1_points_x = hand1_points_x if hand1_points_x else [[np.nan] * 21]
    hand1_points_y = hand1_points_y if hand1_points_y else [[np.nan] * 21]
    hand2_points_x = hand2_points_x if hand2_points_x else [[np.nan] * 21]
    hand2_points_y = hand2_points_y if hand2_points_y else [[np.nan] * 21]

    save_data = {
        "uid": uid,
        "label": label,
        "pose_x": pose_points_x,
        "pose_y": pose_points_y,
        "hand1_x": hand1_points_x,
        "hand1_y": hand1_points_y,
        "hand2_x": hand2_points_x,
        "hand2_y": hand2_points_y,
        "n_frames": n_frames,
    }
    with open(os.path.join(save_dir, f"{uid}.json"), "w") as f:
        json.dump(save_data, f)

    hands.close()
    pose.close()
    del hands, pose, save_data
    gc.collect()

def get_all_video_paths(root_dir, extensions=(".mov", ".mp4")):
    """
    Recursively collect all video file paths in root_dir that match the given extensions.
    """
    video_paths = []
    for dirpath, dirnames, filenames in os.walk(root_dir):
        for filename in filenames:
            if filename.lower().endswith(extensions):
                video_paths.append(os.path.join(dirpath, filename))
    return video_paths

def save_keypoints(dataset, file_paths, mode, save_dir_base):
    save_dir = os.path.join(save_dir_base, f"{dataset}_{mode}_keypoints")
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    Parallel(n_jobs=n_cores, backend="multiprocessing")(
        delayed(process_video)(path, save_dir)
        for path in tqdm(file_paths, desc=f"processing {mode} videos")
    )

# if __name__ == "__main__":
#     parser = argparse.ArgumentParser(description="Generate keypoints from Mediapipe for Hands and Upper Body")
#     parser.add_argument(
#         "--include_dir",
#         default="/content/drive/MyDrive/AISC/Greeting_Augmented",
#         type=str,
#         help="Path to the location of videos",
#     )
#     parser.add_argument(
#         "--save_dir",
#         default="/content/drive/MyDrive/AISC/keypoints_output",
#         type=str,
#         help="Location to output json keypoint files",
#     )
#     parser.add_argument(
#         "--dataset", default="greeting_augmented", type=str, help="Dataset name"
#     )
#     parser.add_argument(
#         "--mode", default="all", type=str, help="Mode to process (set to 'all' to process all videos)"
#     )

#     # Use parse_known_args to ignore extra arguments passed by Colab/Jupyter.
#     args, unknown = parser.parse_known_args()

#     n_cores = multiprocessing.cpu_count()

#     # Recursively collect all video files from the include_dir
#     video_paths = get_all_video_paths(args.include_dir)
#     if not video_paths:
#         print("No video files found in", args.include_dir)
#     else:
#         print(f"Found {len(video_paths)} video files. Processing...")
#         save_keypoints(args.dataset, video_paths, args.mode, args.save_dir)

#     print("Keypoint extraction complete! Keypoints saved in:", args.save_dir)


In [None]:
import glob
import os

keypoints_dir = "/content/drive/MyDrive/AISC/keypoints_output/greeting_augmented_all_keypoints"
all_json_files = sorted(glob.glob(os.path.join(keypoints_dir, "*.json")))
print("Total JSON files:", len(all_json_files))

def extract_label_from_filename(filepath):
    filename = os.path.basename(filepath)
    label_str = filename.split('_')[0]
    label_str = "".join([c for c in label_str if c.isalpha()]).lower()
    return label_str

# Example:
labels_str = [extract_label_from_filename(f) for f in all_json_files]
unique_labels = sorted(set(labels_str))
print("Unique labels found:", unique_labels)



Total JSON files: 447
Unique labels found: ['alright', 'goodafternoon', 'goodmorning', 'hello', 'howareyou']


In [None]:
import numpy as np

# Shuffle data
rng = np.random.default_rng(seed=42)
indices = np.arange(len(all_json_files))
rng.shuffle(indices)

# Define split ratios
train_ratio = 0.7
val_ratio   = 0.15
test_ratio  = 0.15

train_end = int(train_ratio * len(indices))
val_end   = int((train_ratio + val_ratio) * len(indices))

train_indices = indices[:train_end]
val_indices   = indices[train_end:val_end]
test_indices  = indices[val_end:]

train_files  = [all_json_files[i] for i in train_indices]
val_files    = [all_json_files[i] for i in val_indices]
test_files   = [all_json_files[i] for i in test_indices]

train_labels_str = [labels_str[i] for i in train_indices]
val_labels_str   = [labels_str[i] for i in val_indices]
test_labels_str  = [labels_str[i] for i in test_indices]

print("Train set size:", len(train_files))
print("Val set size:  ", len(val_files))
print("Test set size: ", len(test_files))


Train set size: 312
Val set size:   67
Test set size:  68


In [None]:
label_map = {
    'alright': 0,
    'goodafternoon': 1,
    'goodmorning': 2,
    'hello': 3,
    'howareyou': 4
}

train_labels = np.array([label_map[lbl] for lbl in train_labels_str])
val_labels   = np.array([label_map[lbl] for lbl in val_labels_str])
test_labels  = np.array([label_map[lbl] for lbl in test_labels_str])


In [None]:
import json
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import xgboost as xgb


NUM_FRAMES = 120
FEATURES_PER_FRAME = 134

def load_keypoints(json_path):
    with open(json_path, 'r') as f:
        data = json.load(f)
    return data

def create_feature_vector(data, num_frames=NUM_FRAMES):
    pose_x   = data.get('pose_x', [])
    pose_y   = data.get('pose_y', [])
    hand1_x  = data.get('hand1_x', [])
    hand1_y  = data.get('hand1_y', [])
    hand2_x  = data.get('hand2_x', [])
    hand2_y  = data.get('hand2_y', [])

    total_frames = len(pose_x)
    features = []

    for i in range(num_frames):
        if i < total_frames:
            frame_features = (
                pose_x[i] + pose_y[i] +
                hand1_x[i] + hand1_y[i] +
                hand2_x[i] + hand2_y[i]
            )
            # Pad if needed
            frame_features = frame_features[:FEATURES_PER_FRAME] # added
            if len(frame_features) < FEATURES_PER_FRAME:
                frame_features += [np.nan] * (FEATURES_PER_FRAME - len(frame_features))
        else:
            frame_features = [np.nan] * FEATURES_PER_FRAME
        features.append(frame_features)

    return np.array(features, dtype=object).flatten() # modified

class KeypointFeatureExtractor(BaseEstimator, TransformerMixin):
    def __init__(self, num_frames=NUM_FRAMES):
        self.num_frames = num_frames

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        all_features = []
        for path in X:
            data = load_keypoints(path)
            feat_vector = create_feature_vector(data, self.num_frames)
            all_features.append(feat_vector)
        return np.array(all_features)


pipeline = Pipeline([
    ('feature_extractor', KeypointFeatureExtractor(num_frames=NUM_FRAMES)),
    ('scaler', StandardScaler()),
    ('classifier', xgb.XGBClassifier(
        n_estimators=200,
        max_depth=5,
        learning_rate=0.1,
        base_score=0.5,
        use_label_encoder=False,
        eval_metric='logloss'
    ))
])

In [None]:
print("Fitting model on training data...")
pipeline.fit(train_files, train_labels)


Fitting model on training data...


Parameters: { "use_label_encoder" } are not used.



In [None]:
from sklearn.metrics import accuracy_score, classification_report

print("Evaluating on validation set...")
val_preds = pipeline.predict(val_files)
val_acc = accuracy_score(val_labels, val_preds)
print("Validation Accuracy:", val_acc)
print("Classification Report:")
print(classification_report(val_labels, val_preds, target_names=list(label_map.keys())))


Evaluating on validation set...
Validation Accuracy: 0.8507462686567164
Classification Report:
               precision    recall  f1-score   support

      alright       0.77      0.83      0.80        12
goodafternoon       0.86      0.80      0.83        15
  goodmorning       1.00      0.91      0.95        11
        hello       0.80      0.86      0.83        14
    howareyou       0.87      0.87      0.87        15

     accuracy                           0.85        67
    macro avg       0.86      0.85      0.85        67
 weighted avg       0.86      0.85      0.85        67



In [None]:
print("Evaluating on test set...")
test_preds = pipeline.predict(test_files)
test_acc = accuracy_score(test_labels, test_preds)
print("Test Accuracy:", test_acc)
print("Classification Report:")
print(classification_report(test_labels, test_preds, target_names=list(label_map.keys())))


Evaluating on test set...
Test Accuracy: 0.8676470588235294
Classification Report:
               precision    recall  f1-score   support

      alright       0.93      0.72      0.81        18
goodafternoon       1.00      0.87      0.93        15
  goodmorning       0.90      0.90      0.90        10
        hello       0.90      0.95      0.92        19
    howareyou       0.55      1.00      0.71         6

     accuracy                           0.87        68
    macro avg       0.85      0.89      0.85        68
 weighted avg       0.90      0.87      0.87        68



In [None]:
from joblib import dump

# Save pipeline to disk
dump(pipeline, "/content/drive/MyDrive/AISC/asl_pipeline.joblib")

xgb_clf = pipeline.named_steps['classifier']

# Save the booster
xgb_clf.get_booster().save_model("/content/drive/MyDrive/AISC/xgb_booster.json")


In [None]:
new_video_path  = "/content/drive/MyDrive/AISC/Greeting_Augmented/How are you/MVI_9987_orig.mp4"
output_json_dir = "/content/drive/MyDrive/AISC/keypoints_output/inference_keypoints"

os.makedirs(output_json_dir, exist_ok=True)
process_video(new_video_path, output_json_dir)

print("Keypoint extraction for new video complete!")


Keypoint extraction for new video complete!


In [None]:
from joblib import load
import glob
import os

# Load your saved pipeline (adjust the path if necessary)
pipeline = load("/content/drive/MyDrive/AISC/asl_pipeline.joblib")
print("Pipeline loaded successfully!")

# Define your output folder for inference keypoints
output_json_dir = "/content/drive/MyDrive/AISC/keypoints_output/inference_keypoints"

# Get the JSON file path from the output folder.
json_files = sorted(glob.glob(os.path.join(output_json_dir, "*.json")))
if not json_files:
    print("No JSON keypoint files found!")
else:
    # Labelmap to convert numeric predictions to text
    label_map = {
        0: 'alright',
        1: 'goodafternoon',
        2: 'goodmorning',
        3: 'hello',
        4: 'howareyou'
    }

    for json_file in json_files:
        prediction = pipeline.predict([json_file])[0]
        predicted_label = label_map.get(prediction, "Unknown")
        base_name = os.path.basename(json_file)
        real_label = base_name.split('_')[0].lower()
        print(f"File: {base_name}")
        print(f"Predicted Sign for the video: {predicted_label}")
        print(f"Real Sign for the video: {real_label}")
        print("-" * 40)


Pipeline loaded successfully!
File: alright_MVI_0044_down.json
Predicted Sign for the video: alright
Real Sign for the video: alright
----------------------------------------
File: alright_MVI_0096_crop.json
Predicted Sign for the video: alright
Real Sign for the video: alright
----------------------------------------
File: alright_MVI_9924.json
Predicted Sign for the video: alright
Real Sign for the video: alright
----------------------------------------
File: alright_MVI_9963_orig.json
Predicted Sign for the video: alright
Real Sign for the video: alright
----------------------------------------
File: alright_MVI_9989_crop.json
Predicted Sign for the video: alright
Real Sign for the video: alright
----------------------------------------
File: goodafternoon_MVI_0048_down.json
Predicted Sign for the video: goodafternoon
Real Sign for the video: goodafternoon
----------------------------------------
File: goodafternoon_MVI_0049.json
Predicted Sign for the video: goodafternoon
Real Sign