In [None]:
pip install mediapipe



In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp
from tqdm import tqdm
from google.colab import drive

# Mount Google Drive only when /content/drive is not already present.
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

# Metadata split to process; the CSV path below is derived from this value.
# NOTE(review): this constant used to be 1 while the path was hard-coded to
# part_4.csv. It is set to 4 here so the file that is actually read does not
# change -- confirm which part is intended before running.
PART_NUMBER = 4
CSV_FILE = f"/content/drive/MyDrive/ASL-Project/Data/metadata/1st_Prototype(100_word)/part_{PART_NUMBER}.csv"
OUTPUT_DIR = "/content/drive/MyDrive/ASL-Project/Data/processed_prototype/1st_prototype_processed_2"
# Number of frames (rows) in every saved feature sequence.
SEQUENCE_LENGTH = 50

os.makedirs(OUTPUT_DIR, exist_ok=True)




In [None]:
# Load the metadata CSV and show its first rows as a quick sanity check.
df = pd.read_csv(CSV_FILE)
df.head(n=5)

# 2. Advanced Preprocessing Logic (V3)


In [None]:
# Module-level alias for MediaPipe's holistic solution; process_video_pipeline
# creates its `Holistic` context manager from this alias.
mp_holistic = mp.solutions.holistic

def normalize_hand(pts):
    """
    Express hand landmarks relative to the wrist and rescale them.

    Args:
        pts: array of hand landmarks, one row per landmark. Row 0 is used as
            the origin (wrist) and row 9 (middle finger MCP) as the scale
            reference.

    Returns:
        A new array of the same shape: every row minus row 0, divided by the
        distance between row 9 and row 0. When that distance is below 1e-6
        the rows are only translated (divisor 1.0). The input is not modified.
    """
    wrist = pts[0].copy()
    centered = pts - wrist

    # Distance wrist -> middle finger MCP; fall back to 1.0 for a collapsed hand.
    span = np.linalg.norm(centered[9])
    divisor = 1.0 if span < 1e-6 else span

    return centered / divisor


def compute_torso_stats(pose_landmarks):
    """
    Compute the torso center and scale from the shoulder and hip landmarks.

    Landmarks 11/12 are read as the shoulders and 23/24 as the hips. The
    center is the midpoint between the shoulder midpoint and the hip midpoint;
    the scale is the larger of the shoulder width and the hip width, floored
    at 1e-6 so callers can divide by it safely.

    Args:
        pose_landmarks: object exposing an indexable ``landmark`` sequence
            whose items have ``x`` and ``y`` attributes.

    Returns:
        ``(torso_center, torso_scale)`` where ``torso_center`` is a float32
        array of shape (2,) and ``torso_scale`` is a Python float. If the
        landmarks cannot be read (missing object, too few points, bad values)
        or the result is not finite, the defaults ``([0.5, 0.5], 1.0)`` are
        returned instead.
    """
    default_center = np.array([0.5, 0.5], dtype=np.float32)
    default_scale = 1.0

    def get_xy(idx):
        lm = pose_landmarks.landmark[idx]
        return np.array([lm.x, lm.y], dtype=np.float32)

    # Only landmark access can legitimately fail here; catching these specific
    # errors keeps the best-effort fallback without hiding KeyboardInterrupt
    # or unrelated bugs the way a bare `except` would.
    try:
        left_sh, right_sh = get_xy(11), get_xy(12)
        left_hip, right_hip = get_xy(23), get_xy(24)
    except (AttributeError, LookupError, TypeError, ValueError):
        return default_center, default_scale

    shoulder_center = (left_sh + right_sh) / 2.0
    hip_center = (left_hip + right_hip) / 2.0
    torso_center = (shoulder_center + hip_center) / 2.0

    shoulder_dist = np.linalg.norm(left_sh - right_sh)
    hip_dist = np.linalg.norm(left_hip - right_hip)
    torso_scale = max(shoulder_dist, hip_dist, 1e-6)

    # A NaN/inf center or scale would poison every feature normalized with it.
    if not (np.all(np.isfinite(torso_center)) and np.isfinite(torso_scale)):
        return default_center, default_scale

    return torso_center, float(torso_scale)


def extract_features_from_frame(results):
    """
    Build the 198-value feature vector for one frame.

    Layout of the returned float32 vector:
    - [0, 66):    pose, 33 points x (x, y), centered on the torso and divided
                  by the torso scale
    - [66, 132):  left hand, 21 points x 3 from normalize_hand (63 values)
                  followed by the wrist position relative to the torso (3)
    - [132, 198): right hand, same layout as the left hand

    Any part that is missing from `results` is left as zeros. When no pose is
    present, the hand wrists are normalized with the default torso center
    (0.5, 0.5) and scale 1.0.
    """
    features = np.zeros(198, dtype=np.float32)

    center = np.array([0.5, 0.5], dtype=np.float32)
    scale = 1.0

    # Pose block (indices 0-65); also provides the torso reference for the hands.
    pose = results.pose_landmarks
    if pose:
        center, scale = compute_torso_stats(pose)
        xy = np.array([[p.x, p.y] for p in pose.landmark], dtype=np.float32)
        features[0:66] = ((xy - center[None, :]) / scale).flatten()

    # Hand blocks: left starts at 66, right at 132; each spans 66 values.
    for attr_name, start in (("left_hand_landmarks", 66), ("right_hand_landmarks", 132)):
        hand = getattr(results, attr_name)
        if not hand:
            continue

        xyz = np.array([[p.x, p.y, p.z] for p in hand.landmark], dtype=np.float32)
        features[start:start + 63] = normalize_hand(xyz)[:, :3].flatten()

        # Wrist position relative to the torso: x/y are centered and scaled,
        # z is only scaled.
        wrist = xyz[0].copy()
        wrist[:2] = (wrist[:2] - center) / scale
        wrist[2] /= scale
        features[start + 63:start + 66] = wrist

    return features


# Length of one per-frame feature vector (pose 66 + left hand 66 + right hand 66).
_FEATURE_DIM = 198


def _fill_missing_frames(frames):
    """Return a copy of `frames` in which every None entry has been replaced.

    A missing frame becomes the midpoint between the entry just before it
    (which may itself have been filled) and the next detected frame. If only
    one side exists that side is reused, and if there is no detection at all
    a zero vector is used.

    NOTE: because already-filled entries serve as the "previous" frame, a run
    of several missing frames approaches the next detection by repeated
    halving rather than by true linear interpolation.
    """
    count = len(frames)

    # next_detected[i] = nearest detected frame strictly after position i.
    next_detected = [None] * count
    upcoming = None
    for i in range(count - 1, -1, -1):
        next_detected[i] = upcoming
        if frames[i] is not None:
            upcoming = frames[i]

    filled = []
    for i, frame in enumerate(frames):
        if frame is None:
            # Every earlier entry is already non-None, so the nearest previous
            # valid frame is simply the last one appended.
            prev_valid = filled[-1] if filled else None
            next_valid = next_detected[i]

            if prev_valid is not None and next_valid is not None:
                frame = (prev_valid + next_valid) / 2.0
            elif prev_valid is not None:
                frame = prev_valid
            elif next_valid is not None:
                frame = next_valid
            else:
                frame = np.zeros(_FEATURE_DIM, dtype=np.float32)
        filled.append(frame)

    return filled


def _fit_to_sequence_length(data_array):
    """Zero-pad at the end or uniformly subsample rows to SEQUENCE_LENGTH rows."""
    length = len(data_array)

    if length == SEQUENCE_LENGTH:
        return data_array

    if length < SEQUENCE_LENGTH:
        padding = np.zeros((SEQUENCE_LENGTH - length, _FEATURE_DIM), dtype=np.float32)
        return np.vstack([data_array, padding])

    # Evenly spaced frame indices that always include the first and last frame.
    indices = np.linspace(0, length - 1, SEQUENCE_LENGTH, dtype=int)
    return data_array[indices]


def process_video_pipeline(video_path):
    """
    Read a video and turn it into a fixed-length sequence of frame features.

    Steps:
    1. Run MediaPipe Holistic on every frame and extract its feature vector.
    2. Fill frames whose feature vector is (numerically) all zeros from
       neighbouring frames.
    3. Zero-pad or uniformly subsample to SEQUENCE_LENGTH frames.

    Args:
        video_path: path of the video file to read.

    Returns:
        float32 array of shape (SEQUENCE_LENGTH, 198), or None when the video
        cannot be opened or yields no frames.

    Exceptions raised while decoding or extracting features propagate to the
    caller; the capture handle is released in every case.
    """
    cap = cv2.VideoCapture(video_path)
    frames_buffer = []

    try:
        if not cap.isOpened():
            return None

        with mp_holistic.Holistic(static_image_mode=False,
                                  min_detection_confidence=0.5,
                                  model_complexity=1) as holistic:

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR; the frame is converted to RGB before
                # being handed to MediaPipe.
                img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(img_rgb)
                feat = extract_features_from_frame(results)

                # A vector whose absolute sum is ~0 is treated as "nothing
                # detected" and marked for filling.
                if np.sum(np.abs(feat)) < 1e-6:
                    frames_buffer.append(None)
                else:
                    frames_buffer.append(feat)
    finally:
        # Release the capture even if MediaPipe or feature extraction raised.
        cap.release()

    if not frames_buffer:
        return None

    data_array = np.array(_fill_missing_frames(frames_buffer), dtype=np.float32)
    return _fit_to_sequence_length(data_array)


In [None]:
# Re-read the metadata so this cell can be run on its own.
df = pd.read_csv(CSV_FILE)

print(f"Total videos: {len(df)}")
print(f"Words: {df['word'].unique()}")

saved_count = 0
skipped_count = 0
failed_count = 0

for _, row in tqdm(df.iterrows(), total=len(df)):

    word = row["word"]
    video_path = str(row["full_path"]).strip()

    # A missing label cannot be mapped to a class directory; skip the row
    # instead of letting os.path.join raise and abort the whole run.
    if pd.isna(word):
        print("Missing label for:", video_path)
        failed_count += 1
        continue

    # Labels that pandas parsed as numbers still need to be path components.
    class_dir = os.path.join(OUTPUT_DIR, str(word))
    os.makedirs(class_dir, exist_ok=True)

    vid_name = os.path.basename(video_path).rsplit(".", 1)[0]
    save_path = os.path.join(class_dir, f"{vid_name}.npy")

    # Resume support: anything already saved is not recomputed.
    if os.path.exists(save_path):
        skipped_count += 1
        continue

    if not os.path.exists(video_path):
        print("Missing:", video_path)
        failed_count += 1
        continue

    # Write to a temporary name first and rename afterwards, so an interrupted
    # run never leaves a truncated .npy that the resume check above would
    # mistake for a finished result.
    tmp_path = save_path + ".tmp"

    try:
        processed_data = process_video_pipeline(video_path)

        if processed_data is None:
            print("Unreadable or empty:", video_path)
            failed_count += 1
        else:
            # A file object is passed so numpy does not append ".npy" to the
            # temporary name.
            with open(tmp_path, "wb") as tmp_file:
                np.save(tmp_file, processed_data)
            os.replace(tmp_path, save_path)
            saved_count += 1

    except Exception as e:
        print(f"Error: {video_path} -> {e}")
        failed_count += 1

    finally:
        # Only present if the save did not complete.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

print("Finished")
print(f"Saved: {saved_count} | Skipped (already processed): {skipped_count} | Failed: {failed_count}")
