Preprocessing

In [1]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (36.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m36.1/36.1 MB[0m [31m52.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.18 sounddevice-0.5.1


In [None]:
import cv2
import mediapipe as mp
import os
import numpy as np
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

In [3]:
# Mount Google Drive into the Colab filesystem; the paths used by the
# later cells of this notebook all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Mediapipe model and utilities
# Short aliases for the Mediapipe solution modules:
#   mp_holistic       - the holistic solution (provides the Holistic model class used for processing)
#   mp_drawing        - landmark drawing helpers
#   mp_drawing_styles - drawing style presets for those helpers
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

In [5]:
# Function to detect and extract landmarks using Mediapipe
def mediapipe_detection(image, model):
    """Run a Mediapipe model on one BGR frame.

    The frame is converted from BGR to RGB before it is handed to
    ``model.process``. While the model runs, the converted array is
    flagged read-only; afterwards it is made writeable again and
    converted back to BGR.

    Args:
        image: Frame in BGR channel order (as returned by ``cv2`` readers).
        model: Object exposing ``process(rgb_image)``.

    Returns:
        Tuple ``(bgr_image, results)`` where ``bgr_image`` is the frame
        after the RGB round trip and ``results`` is whatever
        ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Lock the buffer for the duration of the inference call only.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

  # Function to extract keypoints from Mediapipe results
def extract_keypoints(results):
    """Flatten the landmark groups of a Mediapipe result into one 1-D array.

    The groups are concatenated in this order: pose, face, left hand,
    right hand. Pose landmarks contribute ``x, y, z, visibility``; the
    other groups contribute ``x, y, z``. When a group attribute on
    ``results`` is falsy (e.g. ``None``), zeros are used instead, sized
    for 33 pose, 468 face and 21 hand landmarks respectively.

    Args:
        results: Object with ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``
            attributes; each non-falsy attribute exposes ``.landmark``.

    Returns:
        1-D ``numpy`` array (1662 values when every group has the
        landmark counts listed above or is missing).
    """
    def _flatten(group, fields, fallback_count):
        # Missing group -> zero block of the expected length.
        if not group:
            return np.zeros(fallback_count * len(fields))
        rows = [[getattr(point, field) for field in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    segments = [
        _flatten(results.pose_landmarks, xyz + ('visibility',), 33),
        _flatten(results.face_landmarks, xyz, 468),
        _flatten(results.left_hand_landmarks, xyz, 21),
        _flatten(results.right_hand_landmarks, xyz, 21),
    ]
    return np.concatenate(segments)

In [6]:
# Parse the WLASL annotation file; it is iterated below as a sequence of
# gloss entries, each carrying a list of video instances.
with open('/content/drive/MyDrive/Colab Notebooks/AAI-521/Final Project/Models/WLASL_v0.3.json', 'r') as file:
    metadata = json.load(file)

# Index every instance by its numeric video id:
#   video_id -> [gloss, frame_start, frame_end, fps]
# If the same id occurs more than once, the last occurrence wins.
labelMap = {
    int(instance['video_id']): [
        entry['gloss'],
        instance['frame_start'],
        instance['frame_end'],
        instance['fps'],
    ]
    for entry in metadata
    for instance in entry['instances']
}

In [12]:
# Output folder for the extracted keypoint arrays (one sub-folder per label
# is created underneath it when videos are processed).
DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/AAI-521/Final Project/Models/MediaPipe_processed'
# exist_ok=True keeps the cell re-runnable without a separate existence
# check, and still fails loudly if the path exists but is not a directory.
os.makedirs(DATA_PATH, exist_ok=True)

# Folder that holds the raw dataset videos
video_path = '/content/drive/MyDrive/Colab Notebooks/AAI-521/Final Project/DataSet/videos'

In [None]:
# Function to process a single video
def process_video(video):
    """Extract holistic keypoints for one video and save them as a ``.npy`` file.

    The numeric part of the file name is looked up in the module-level
    ``labelMap`` to obtain the label and the frame range to keep. Every
    frame inside that range is run through ``mediapipe_detection`` /
    ``extract_keypoints`` and the stacked result is written to
    ``DATA_PATH/<label>/<video_id>_keypoints.npy``.

    Videos are skipped (nothing is written) when the file name is not
    numeric, the id is not in ``labelMap``, the file cannot be opened, or
    no frame falls inside the requested range.

    Args:
        video: File name (not full path) of a video inside ``video_path``.

    Returns:
        None. The function is called for its side effect of writing a file.
    """
    try:
        video_id = int(os.path.splitext(video)[0])
    except ValueError:
        # A stray non-numeric file name must not abort the whole batch.
        return
    if video_id not in labelMap:
        return

    # fps is stored in the metadata but is not needed here: frames are read
    # sequentially and selected purely by their index.
    label, start_frame, end_frame, _fps = labelMap[video_id]

    keypoints_data = []
    cap = cv2.VideoCapture(os.path.join(video_path, video))
    try:
        if not cap.isOpened():
            return

        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            frame_count = 0
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    break
                # Incremented before the range test, so frame numbers are 1-based.
                frame_count += 1

                if frame_count < start_frame:
                    continue
                # end_frame == -1 means "until the end of the video"; otherwise
                # stop decoding as soon as the range has been passed.
                if end_frame != -1 and frame_count > end_frame:
                    break

                _, results = mediapipe_detection(image, holistic)
                keypoints_data.append(extract_keypoints(results))
    finally:
        # Always free the capture, even if detection raised.
        cap.release()

    if not keypoints_data:
        # Avoid writing an empty array whose shape would differ from real samples.
        return

    # exist_ok=True: several worker threads may create the same label folder
    # at the same time.
    action_path = os.path.join(DATA_PATH, label)
    os.makedirs(action_path, exist_ok=True)

    video_path_save = os.path.join(action_path, f'{video_id}_keypoints.npy')
    np.save(video_path_save, np.array(keypoints_data))

# Gather the name of every .mp4 file found in the dataset folder
video_files = list(filter(lambda name: name.endswith('.mp4'), os.listdir(video_path)))

# Fan the videos out over a thread pool. The map iterator is lazy, so it is
# drained through tqdm to drive the progress bar and to surface any
# exception raised by a worker.
with ThreadPoolExecutor() as pool:
    progress = tqdm(pool.map(process_video, video_files), total=len(video_files), desc="Processing Videos")
    list(progress)


Processing Videos: 100%|██████████| 11980/11980 [2:59:51<00:00,  1.11it/s]
