In [None]:
# Mount Google Drive at /content/drive so files stored there are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.21-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting numpy<2 (from mediapipe)
  Downloading numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.3-py3-none-any.whl.metadata (1.6 kB)
INFO: pip is looking at multiple versions of jax to determine which version is compatible with other requirements. This could take a while.
Collecting jax (from mediapipe)
  Downloading jax-0.8.1-py3-none-any.whl.metadata (13 kB)
Collecting jaxlib (from mediapipe)
  Downloading jaxlib-0.8.1-cp312-cp312-manylinux_2_27_x86_64.whl.metadata (1.3 kB)
Collecting jax (from mediapipe)
  Do

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp
from tqdm import tqdm  # For progress bar
from google.colab import drive

# ==========================================
# 1. Configuration & Drive Setup
# ==========================================

# Mount Drive (if not already mounted)
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

# --- CONFIGURATION ---
CSV_PATH = '/content/part_2.csv'   # Path to your metadata CSV
# NOTE(review): the path literal below ends with a space, so the folder created on
# Drive has a trailing space in its name — confirm this is intended before reading
# the output from another notebook.
OUTPUT_DIR = '/content/drive/MyDrive/weasel/try_20Word ' # Where to save output
SEQUENCE_LENGTH = 50             # Fixed frame count
# Copy this list and use it in place of the old list in the V3 code
TARGET_CLASSES = [
    'erase', 'shave', 'catch', 'drown', 'envelope',
    'cool', 'cry', 'pineapple', 'follow', 'pop',
    'banana', 'sandwich', 'jacket', 'strawberry', 'cloud',
    'fork', 'dog', 'necklace', 'handsome', 'bury'
] # The 20 selected classes

# NOTE(review): with CSV_PATH directly under /content this evaluates to '/content'.
# os.path.join ignores this base whenever the CSV's 'full_path' value is absolute.
VIDEO_BASE_PATH = os.path.dirname(CSV_PATH) # This sets the base to the folder containing the CSV

# Create Output Directory (exist_ok=True makes this a no-op when it is already there,
# without a separate check-then-create step)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# ==========================================
# 2. Advanced Preprocessing Logic (The Core)
# ==========================================

# Short alias for MediaPipe's holistic solution module; process_video_pipeline
# opens mp_holistic.Holistic(...) from it for every video.
mp_holistic = mp.solutions.holistic

def normalize_hand(pts):
    """
    Expresses hand landmarks in a wrist-centred, size-normalized frame.

    The wrist (landmark 0) becomes the origin and every coordinate is divided
    by the distance from the wrist to the middle-finger MCP (landmark 9).
    When that distance is below 1e-6 the divisor falls back to 1.0, so a
    degenerate hand is only translated, not scaled.

    Args:
        pts: array of landmark coordinates, one row per landmark; rows 0 and 9
            must exist. The input is not modified.

    Returns:
        Array of the same shape as ``pts`` holding the normalized coordinates.
    """
    wrist = pts[0].copy()
    palm_len = np.linalg.norm(pts[9] - wrist)
    divisor = 1.0 if palm_len < 1e-6 else palm_len
    return (pts - wrist) / divisor

def compute_torso_stats(pose_landmarks):
    """
    Computes the torso center and scale (shoulder width or hip width)
    to make the data invariant to camera distance and user position.

    Uses pose landmarks 11/12 (shoulders) and 23/24 (hips). The center is the
    midpoint between the shoulder midpoint and the hip midpoint; the scale is
    the larger of the shoulder distance and the hip distance, floored at 1e-6.

    Args:
        pose_landmarks: object exposing an indexable ``landmark`` sequence
            whose items have ``x`` and ``y`` attributes.

    Returns:
        (torso_center, torso_scale): a float32 array of shape (2,) and a
        Python float. If the four landmarks cannot be read, the defaults
        ``([0.5, 0.5], 1.0)`` are returned.
    """
    default_center = np.array([0.5, 0.5], dtype=np.float32)
    default_scale = 1.0

    # Only landmark extraction is guarded. Catching specific exceptions keeps the
    # best-effort fallback for missing/short landmark lists while letting
    # KeyboardInterrupt, SystemExit and unrelated bugs propagate.
    try:
        landmarks = pose_landmarks.landmark
        left_sh, right_sh, left_hip, right_hip = (
            np.array([landmarks[i].x, landmarks[i].y], dtype=np.float32)
            for i in (11, 12, 23, 24)
        )
    except (AttributeError, LookupError, TypeError, ValueError):
        return default_center, default_scale

    # Calculate Center
    shoulder_center = (left_sh + right_sh) / 2.0
    hip_center = (left_hip + right_hip) / 2.0
    torso_center = (shoulder_center + hip_center) / 2.0

    # Calculate Scale (floored so later divisions never hit zero)
    shoulder_dist = np.linalg.norm(left_sh - right_sh)
    hip_dist = np.linalg.norm(left_hip - right_hip)
    torso_scale = max(shoulder_dist, hip_dist, 1e-6)

    return torso_center, float(torso_scale)

def _hand_features(hand_landmarks, torso_center, torso_scale):
    """
    Builds the 66-value feature slice for one hand.

    Layout: 63 values of wrist-normalized landmark shape (21 points x 3 coords,
    via normalize_hand) followed by 3 values giving the wrist position relative
    to the torso center, divided by the torso scale.
    """
    pts = np.array([[lm.x, lm.y, lm.z] for lm in hand_landmarks.landmark], dtype=np.float32)
    wrist = pts[0]
    wrist_rel = np.array([
        (wrist[0] - torso_center[0]) / torso_scale,
        (wrist[1] - torso_center[1]) / torso_scale,
        wrist[2] / max(torso_scale, 1e-6),
    ], dtype=np.float32)
    return np.concatenate([normalize_hand(pts).flatten(), wrist_rel])


def extract_features_from_frame(results):
    """
    Extracts 198 features:
    - Pose: 33 points (x, y) normalized by torso.
    - Hands: 21 points (x, y, z) normalized by wrist + wrist relative pos.

    Feature layout of the returned float32 vector:
        [0:66]    pose (x, y) pairs, centered on the torso and divided by its scale
        [66:132]  left hand  (63 shape values + 3 wrist-relative values)
        [132:198] right hand (63 shape values + 3 wrist-relative values)

    Any part that was not detected is left as zeros. When no pose is detected,
    hand wrist positions are taken relative to the default center (0.5, 0.5)
    with scale 1.0.

    Args:
        results: object with ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks`` attributes (falsy when not detected).

    Returns:
        np.ndarray of shape (198,), dtype float32.
    """
    feat = np.zeros(198, dtype=np.float32)

    # --- 1. POSE ---
    torso_center = np.array([0.5, 0.5], dtype=np.float32)
    torso_scale = 1.0

    if results.pose_landmarks:
        torso_center, torso_scale = compute_torso_stats(results.pose_landmarks)
        pose_xy = np.array([[lm.x, lm.y] for lm in results.pose_landmarks.landmark], dtype=np.float32)
        # Normalize: (Point - Center) / Scale
        pose_norm = (pose_xy - torso_center[None, :]) / torso_scale
        feat[0:66] = pose_norm.flatten()

    # --- 2. LEFT HAND / 3. RIGHT HAND ---
    # Both hands share one code path; only the destination offset differs.
    hand_slots = (
        (66, results.left_hand_landmarks),
        (132, results.right_hand_landmarks),
    )
    for offset, hand_landmarks in hand_slots:
        if hand_landmarks:
            feat[offset : offset + 66] = _hand_features(hand_landmarks, torso_center, torso_scale)

    return feat

def _fill_missing_frames(frames, feature_dim=198):
    """
    Replaces dropout frames (None entries) using the frames that were detected.

    Gaps between two detected frames are filled by per-feature linear
    interpolation; gaps at the start or end repeat the nearest detected frame.
    Only genuinely detected frames act as anchors, so values inside a long gap
    are not skewed by frames synthesized earlier in the same gap.

    Args:
        frames: list whose items are 1-D float feature vectors or None.
        feature_dim: width of the zero rows returned when nothing was detected.

    Returns:
        float32 array with one row per input frame.
    """
    valid_idx = [i for i, frame in enumerate(frames) if frame is not None]
    if not valid_idx:
        return np.zeros((len(frames), feature_dim), dtype=np.float32)

    valid = np.stack([frames[i] for i in valid_idx]).astype(np.float32)
    if len(valid_idx) == len(frames):
        return valid

    all_idx = np.arange(len(frames))
    filled = np.empty((len(frames), valid.shape[1]), dtype=np.float32)
    # np.interp is 1-D, so interpolate feature by feature; outside the detected
    # range it holds the first/last detected value.
    for col in range(valid.shape[1]):
        filled[:, col] = np.interp(all_idx, valid_idx, valid[:, col])
    return filled


def _fit_to_length(data, target_len):
    """
    Returns `data` with exactly `target_len` rows: zero-padded at the end when
    shorter, uniformly sub-sampled (first and last rows kept) when longer.
    """
    current_len = len(data)
    if current_len == target_len:
        return data
    if current_len < target_len:
        padding = np.zeros((target_len - current_len, data.shape[1]), dtype=np.float32)
        return np.vstack([data, padding])
    indices = np.linspace(0, current_len - 1, target_len, dtype=int)
    return data[indices]


def process_video_pipeline(video_path):
    """
    Full pipeline: Read Video -> MediaPipe -> Normalize -> Interpolate -> Pad

    Every frame is converted to RGB, run through MediaPipe Holistic and turned
    into a feature vector by extract_features_from_frame. Frames whose vector
    is all zeros (nothing detected) are treated as dropouts and filled from
    neighbouring detected frames. The sequence is then padded or sub-sampled
    to SEQUENCE_LENGTH rows.

    Args:
        video_path: path of the video file to read.

    Returns:
        float32 array of shape (SEQUENCE_LENGTH, n_features), or None when the
        video cannot be opened or yields no frames.
    """
    frames_buffer = []
    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture handle is released even when decoding
    # or MediaPipe raises part-way through the video.
    try:
        if not cap.isOpened():
            return None

        with mp_holistic.Holistic(static_image_mode=False, min_detection_confidence=0.5) as holistic:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR; convert to RGB before handing to MediaPipe
                img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(img_rgb)

                features = extract_features_from_frame(results)

                # An all-zero vector means neither pose nor hands were found:
                # mark the frame as a dropout to be filled afterwards.
                if np.sum(np.abs(features)) < 1e-6:
                    frames_buffer.append(None)
                else:
                    frames_buffer.append(features)
    finally:
        cap.release()

    if not frames_buffer:
        return None

    # --- Interpolation (Filling missing frames) ---
    data_array = _fill_missing_frames(frames_buffer)

    # --- Resampling / Padding to SEQUENCE_LENGTH frames ---
    return _fit_to_length(data_array, SEQUENCE_LENGTH)

# ==========================================
# 3. Main Execution Loop (With Checkpointing)
# ==========================================

def run_processing():
    """
    Converts every video of the target classes into a saved feature array.

    Reads the metadata CSV at CSV_PATH, keeps rows whose 'word' is in
    TARGET_CLASSES, and for each row runs process_video_pipeline on the video
    referenced by 'full_path' (joined onto VIDEO_BASE_PATH). Results are saved
    as OUTPUT_DIR/<word>/<video file stem>.npy. Videos whose output file
    already exists are skipped, so an interrupted run can be resumed.
    Per-video failures are reported and do not stop the loop.

    Raises:
        KeyError: if the CSV lacks the 'word' or 'full_path' column.
    """
    # 1. Load Metadata
    df = pd.read_csv(CSV_PATH)

    # Fail before any processing starts if the expected columns are absent.
    missing_cols = {'word', 'full_path'} - set(df.columns)
    if missing_cols:
        raise KeyError(f"CSV is missing required column(s): {sorted(missing_cols)}")

    # 2. Filter for Target Classes
    subset_df = df[df['word'].isin(TARGET_CLASSES)].copy()

    print(f"Total videos to process: {len(subset_df)}")
    print(f"Classes: {TARGET_CLASSES}")

    # 3. Iterate and Process (tqdm renders the progress bar)
    rows = tqdm(subset_df.iterrows(), total=len(subset_df))
    for position, (_, row) in enumerate(rows):

        word = row['word']
        # Construct the full video path using the VIDEO_BASE_PATH
        video_path = os.path.join(VIDEO_BASE_PATH, row['full_path'])

        # Print the first video path to help debug if needed. The loop position is
        # used because the filtered frame keeps the CSV's original index labels,
        # so its first row is not necessarily labelled 0.
        if position == 0:
            print(f"\nSample video path: {video_path}")

        # Prepare Output Path
        class_dir = os.path.join(OUTPUT_DIR, word)
        os.makedirs(class_dir, exist_ok=True)

        # File stem of the video; splitext strips only the final extension, so
        # names that contain extra dots are not truncated.
        vid_name = os.path.splitext(os.path.basename(video_path))[0]
        save_path = os.path.join(class_dir, f"{vid_name}.npy")

        # --- CHECKPOINTING ---
        # If file exists, skip it (Resume capability)
        if os.path.exists(save_path):
            continue

        # Process
        try:
            processed_data = process_video_pipeline(video_path)

            if processed_data is not None:
                # Write to a temp file and rename, so an interrupted write never
                # leaves a partial .npy that the checkpoint test would skip.
                tmp_path = save_path + '.tmp'
                with open(tmp_path, 'wb') as fh:
                    np.save(fh, processed_data)
                os.replace(tmp_path, save_path)
            else:
                print(f"Warning: Could not process or found no frames for {video_path}")
        except Exception as e:
            # Deliberate best-effort: report and continue with the next video.
            print(f"Error processing {video_path}: {e}")

    print("\n✅ Processing Complete!")
    print(f"Data saved to: {OUTPUT_DIR}")

# Run the script
# When this cell is executed in the notebook, __name__ is "__main__", so the
# full processing run starts here.
if __name__ == "__main__":
    run_processing()

Total videos to process: 744
Classes: ['erase', 'shave', 'catch', 'drown', 'envelope', 'cool', 'cry', 'pineapple', 'follow', 'pop', 'banana', 'sandwich', 'jacket', 'strawberry', 'cloud', 'fork', 'dog', 'necklace', 'handsome', 'bury']


  0%|          | 0/744 [00:00<?, ?it/s]


Sample video path: /content/drive/MyDrive/weasel/ASL-Project/Data/dataset/row_data(videos)/part_10/pineapple_20241119_172633.mp4


  0%|          | 1/744 [00:56<11:42:50, 56.76s/it]



 20%|█▉        | 147/744 [22:40<3:22:38, 20.37s/it]



 60%|██████    | 448/744 [1:05:42<1:45:06, 21.31s/it]



100%|██████████| 744/744 [1:49:25<00:00,  8.82s/it]


✅ Processing Complete!
Data saved to: /content/drive/MyDrive/weasel/try_20Word 



