In [None]:
# Environment sanity check: report the installed PyTorch build and, when CUDA
# is usable, the CUDA version plus the name and capability of device 0.
import torch

print(f"PyTorch version: {torch.__version__}")
has_cuda = torch.cuda.is_available()
print(f"CUDA available: {has_cuda}")
if has_cuda:
    first_gpu = 0
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU name: {torch.cuda.get_device_name(first_gpu)}")
    print(f"GPU capability: {torch.cuda.get_device_capability(first_gpu)}")

In [None]:
%pip install av opencv-python numpy tqdm

In [3]:
import os
import shutil
import random
import cv2
import numpy as np
from moviepy.editor import VideoFileClip, vfx
from tqdm import tqdm
import mediapipe as mp

In [4]:
# --- 0. Configuration & Setup ---
# Root of the raw dataset and root under which every derived artefact is written.
RAW_VIDEO_DATASET_PATH = "SignLanguageDataset"  # <--- UPDATE THIS PATH
BASE_OUTPUT_PATH = "SignLanguage_Processed_Data"  # <--- UPDATE THIS PATH (if desired)

# Output sub-folders, all rooted at BASE_OUTPUT_PATH.
SPLIT_DATA_PATH, AUGMENTED_TRAIN_VIDEOS_PATH, KEYPOINTS_OUTPUT_PATH = (
    os.path.join(BASE_OUTPUT_PATH, leaf)
    for leaf in ("video_splits", "augmented_train_videos", "keypoints_for_model")
)

# Codecs used when augmented videos are written.
PREFERRED_GPU_ENCODER = 'h264_nvenc'  # TRY NVIDIA ENCODER or set to "" for CPU only
CPU_FALLBACK_ENCODER = 'libx264'
AUDIO_CODEC = 'aac'

# Make sure the whole output tree exists before any stage runs.
for _output_dir in (
    BASE_OUTPUT_PATH,
    SPLIT_DATA_PATH,
    AUGMENTED_TRAIN_VIDEOS_PATH,
    KEYPOINTS_OUTPUT_PATH,
):
    os.makedirs(_output_dir, exist_ok=True)

# Defaults for keypoint extraction: sequence length and the hand-displacement
# threshold that marks the start of movement.
N_FRAMES_KEYPOINTS = 30
MOVEMENT_THRESHOLD_KEYPOINTS = 0.02

In [5]:
# --- 1. Dataset Splitting ---
def split_video_dataset(
    raw_dataset_path, output_split_path, train_ratio=0.7, val_ratio=0.15,
    test_ratio=0.15, random_seed=42, move_files=False
):
    """Split a class-per-folder video dataset into train/val/test folders.

    Every sub-directory of `raw_dataset_path` is treated as one class. Its
    `.mp4`/`.avi`/`.mov` files are shuffled and written to
    `<output_split_path>/<split>/<class_name>/`.

    Args:
        raw_dataset_path: Folder containing one sub-folder per class.
        output_split_path: Folder that receives the `train`, `val` and `test` trees.
        train_ratio, val_ratio, test_ratio: Fractions that must sum to 1.
            Train and val sizes are truncated to integers; the test split
            receives every remaining file.
        random_seed: Seed passed to `random.seed` (this reseeds the global
            `random` generator, as the function always has).
        move_files: Move the files instead of copying them.

    Returns:
        False when `raw_dataset_path` does not exist, True otherwise.

    Raises:
        AssertionError: If the three ratios do not sum to 1.
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, "Ratios must sum to 1"
    random.seed(random_seed)
    if not os.path.exists(raw_dataset_path):
        print(f"Error: Raw dataset path '{raw_dataset_path}' does not exist.")
        return False

    video_extensions = ('.mp4', '.avi', '.mov')
    for split in ('train', 'val', 'test'):
        os.makedirs(os.path.join(output_split_path, split), exist_ok=True)

    print(f"Starting dataset split from '{raw_dataset_path}' into '{output_split_path}'...")
    # os.listdir order is arbitrary; sorting both levels makes the seeded
    # shuffle produce the same split on every machine and every run.
    for class_name in sorted(os.listdir(raw_dataset_path)):
        class_dir = os.path.join(raw_dataset_path, class_name)
        if not os.path.isdir(class_dir):
            continue

        video_files = sorted(
            f for f in os.listdir(class_dir)
            if os.path.isfile(os.path.join(class_dir, f)) and f.lower().endswith(video_extensions)
        )
        if not video_files:
            print(f"Warning: No video files found in class '{class_name}'.")
            continue

        random.shuffle(video_files)
        n_total = len(video_files)
        n_train = int(n_total * train_ratio)
        n_val = int(n_total * val_ratio)

        # The open-ended test slice takes whatever truncation left over, so
        # every file is assigned to exactly one split.
        splits_data = {
            'train': video_files[:n_train],
            'val': video_files[n_train:n_train + n_val],
            'test': video_files[n_train + n_val:],
        }

        transfer = shutil.move if move_files else shutil.copy2
        for split_name, files_in_split in splits_data.items():
            dest_class_dir = os.path.join(output_split_path, split_name, class_name)
            os.makedirs(dest_class_dir, exist_ok=True)
            for file_name in files_in_split:
                transfer(os.path.join(class_dir, file_name), os.path.join(dest_class_dir, file_name))

        print(f"Class '{class_name}' split: Train={len(splits_data['train'])}, Val={len(splits_data['val'])}, Test={len(splits_data['test'])}")
    print("✅ Dataset splitting complete.")
    return True


In [6]:
# --- 2. Video Augmentation ---
def rotate_video(clip, angle):
    """Return `clip` with moviepy's `vfx.rotate` effect applied using `angle`."""
    rotated = clip.fx(vfx.rotate, angle)
    return rotated
def zoom_video(clip, zoom_factor):
    """Crop the centre of `clip` to 1/`zoom_factor` of its size and resize it back.

    The crop window is centred (integer division for the offsets) and the
    result is resized to the original `clip.size`.
    """
    full_w, full_h = clip.size
    crop_w = int(full_w / zoom_factor)
    crop_h = int(full_h / zoom_factor)
    left = (full_w - crop_w) // 2
    top = (full_h - crop_h) // 2
    cropped = clip.fx(vfx.crop, x1=left, y1=top, width=crop_w, height=crop_h)
    return cropped.resize(clip.size)
def flip_horizontal_video(clip):
    """Return `clip` with moviepy's `vfx.mirror_x` effect applied."""
    mirrored = clip.fx(vfx.mirror_x)
    return mirrored
def add_gaussian_noise_vfx(clip, sigma=25):
    """Add zero-mean Gaussian noise with standard deviation `sigma` to every frame.

    The noise is added in floating point and the result is rounded and clamped
    to 0..255 before converting back to uint8. Casting the signed noise itself
    to uint8 (as was done previously) wraps negative samples to values near
    255, which brightens and saturates the frame instead of adding symmetric
    noise.

    Args:
        clip: Object exposing `fl(func)`, where `func(get_frame, t)` returns a frame.
        sigma: Standard deviation of the noise, in pixel-value units.

    Returns:
        Whatever `clip.fl` returns for the noise filter.
    """
    def apply_noise(gf, t):
        frame = gf(t)  # assumes 8-bit frames (values 0..255)
        noise = np.random.normal(0, sigma, frame.shape)
        noisy = frame.astype(np.float32) + noise
        return np.clip(np.rint(noisy), 0, 255).astype(np.uint8)
    return clip.fl(apply_noise)
def change_brightness_contrast_vfx(clip, bf=1.2, cf=1.2):
    """Scale every frame by `cf` (contrast) and shift it by `(bf - 1) * 127` (brightness).

    The result is rounded and clamped to 0..255. `cv2.convertScaleAbs` was
    used before, but it takes the absolute value of the scaled result, so a
    negative offset (`bf < 1`) turned dark pixels brighter instead of clamping
    them to black.

    Args:
        clip: Object exposing `fl(func)`, where `func(get_frame, t)` returns a frame.
        bf: Brightness factor; 1.0 leaves brightness unchanged.
        cf: Contrast factor; 1.0 leaves contrast unchanged.

    Returns:
        Whatever `clip.fl` returns for the brightness/contrast filter.
    """
    def apply_bc(gf, t):
        frame = gf(t)  # assumes 8-bit frames (values 0..255)
        offset = (bf - 1) * 127
        adjusted = frame.astype(np.float32) * cf + offset
        return np.clip(np.rint(adjusted), 0, 255).astype(np.uint8)
    return clip.fl(apply_bc)
def translate_video_vfx(clip, dx, dy):
    """Shift every frame by (`dx`, `dy`) using an affine warp of unchanged size."""
    def apply_trans(gf, t):
        frame = gf(t)
        height, width = frame.shape[0], frame.shape[1]
        # 2x3 affine matrix: identity plus a translation column.
        shift_matrix = np.float32([[1, 0, dx], [0, 1, dy]])
        return cv2.warpAffine(frame, shift_matrix, (width, height))
    return clip.fl(apply_trans)
def add_blur_vfx(clip, ks=(5,5)):
    """Apply `cv2.GaussianBlur` with kernel size `ks` (sigma argument 0) to every frame."""
    def apply_blur(gf, t):
        frame = gf(t)
        blurred = cv2.GaussianBlur(frame, ks, 0)
        return blurred
    return clip.fl(apply_blur)
def color_jitter_video(clip, br=0.2, co=0.2, sa=0.2, hu=0.1):
    """Randomly jitter saturation, hue, contrast and brightness of a clip.

    One set of random factors is drawn per call and applied to every frame,
    so the colour shift is constant over time. Drawing new factors for each
    frame, as before, makes the output flicker.

    Args:
        clip: Object exposing `fl(func)`, where `func(get_frame, t)` returns an RGB frame.
        br: Maximum brightness offset, as a fraction of 255.
        co: Maximum relative contrast change.
        sa: Maximum relative saturation change.
        hu: Maximum hue shift, as a fraction of 180 hue units.

    Returns:
        Whatever `clip.fl` returns for the jitter filter.
    """
    # Same draw order as the per-frame version: saturation, hue, contrast, brightness.
    sat_scale = 1 + random.uniform(-sa, sa)
    hue_shift = int(round(random.uniform(-hu * 180, hu * 180)))
    contrast = 1.0 + random.uniform(-co, co)
    brightness = random.uniform(-br, br) * 255

    def apply_cj(gf, t):
        frame = gf(t)  # assumes 8-bit RGB frames
        h, s, v_ = cv2.split(cv2.cvtColor(frame, cv2.COLOR_RGB2HSV))
        s = np.clip(s * sat_scale, 0, 255).astype(np.uint8)
        # Hue is circular (0..179 for 8-bit HSV): wrap rather than clip, so
        # hues near 0 shift to the other end of the range instead of sticking at 0.
        h = ((h.astype(np.int16) + hue_shift) % 180).astype(np.uint8)
        jittered = cv2.cvtColor(cv2.merge([h, s, v_]), cv2.COLOR_HSV2RGB)
        # Clamp instead of cv2.convertScaleAbs, whose absolute value would
        # mirror negative results back into the positive range.
        adjusted = jittered.astype(np.float32) * contrast + brightness
        return np.clip(np.rint(adjusted), 0, 255).astype(np.uint8)
    return clip.fl(apply_cj)

# Each entry maps an augmentation name (also used as the output-file suffix)
# to a callable that takes a clip and returns a randomly parameterised variant.
def _random_rotation(clip):
    return rotate_video(clip, random.uniform(-10, 10))


def _random_zoom(clip):
    return zoom_video(clip, random.uniform(1.05, 1.2))


def _random_gaussian_noise(clip):
    # Reduced sigma
    return add_gaussian_noise_vfx(clip, sigma=random.uniform(10, 20))


def _random_brightness_contrast(clip):
    # Reduced factors
    return change_brightness_contrast_vfx(clip, bf=random.uniform(0.9, 1.1), cf=random.uniform(0.9, 1.1))


def _random_translation(clip):
    # Reduced translation
    return translate_video_vfx(clip, dx=random.randint(-10, 10), dy=random.randint(-10, 10))


def _random_blur(clip):
    return add_blur_vfx(clip, ks=(random.choice([3, 5]), random.choice([3, 5])))


def _reduced_color_jitter(clip):
    # Reduced jitter
    return color_jitter_video(clip, br=0.1, co=0.1, sa=0.1, hu=0.05)


AUGMENTATIONS = {
    "rotation": _random_rotation,
    "zoom": _random_zoom,
    # "flip_horizontal": lambda clip: flip_horizontal_video(clip), # Often not useful for SL
    "gaussian_noise": _random_gaussian_noise,
    "brightness_contrast": _random_brightness_contrast,
    "translation": _random_translation,
    "blur": _random_blur,
    "color_jitter": _reduced_color_jitter,
}

def _write_augmented_clip(augmented_clip, output_path):
    """Encode `augmented_clip` to `output_path`, trying the GPU encoder first when one is configured."""
    cpu_params = {'codec': CPU_FALLBACK_ENCODER, 'audio_codec': AUDIO_CODEC, 'logger': None, 'threads': 4, 'preset': 'ultrafast'}
    if PREFERRED_GPU_ENCODER:
        # Same settings with the GPU codec and without the 'preset' option.
        gpu_params = {key: value for key, value in cpu_params.items() if key != 'preset'}
        gpu_params['codec'] = PREFERRED_GPU_ENCODER
        try:
            augmented_clip.write_videofile(output_path, **gpu_params)
            return
        except Exception:
            # Deliberate best-effort: any GPU encoding failure falls through
            # to the CPU encoder below.
            pass
    augmented_clip.write_videofile(output_path, **cpu_params)


def augment_video_set(train_video_input_path, augmented_output_path):
    """Copy every training video and write one augmented variant per entry in AUGMENTATIONS.

    For each class sub-folder of `train_video_input_path`, every
    `.mp4`/`.avi`/`.mov` file is copied to
    `<augmented_output_path>/<class_name>/` and each augmentation is written
    next to it as `<base_name>_<aug_name><ext>`. Existing augmented files are
    skipped. A failed augmentation is reported, its partial output is removed,
    and processing continues.

    Args:
        train_video_input_path: Folder containing one sub-folder per class.
        augmented_output_path: Destination root; created if missing.

    Returns:
        None. A summary with the number of copied originals and written
        augmentations is printed.
    """
    os.makedirs(augmented_output_path, exist_ok=True)
    print(f"Starting video augmentation for videos in: {train_video_input_path}")
    total_processed, total_augmented = 0, 0
    for class_name in os.listdir(train_video_input_path):
        class_path = os.path.join(train_video_input_path, class_name)
        if not os.path.isdir(class_path):
            continue
        output_class_path = os.path.join(augmented_output_path, class_name)
        os.makedirs(output_class_path, exist_ok=True)
        print(f"\nAugmenting class: {class_name}")
        video_files = [f for f in os.listdir(class_path) if f.lower().endswith(('.mp4', '.avi', '.mov'))]
        for video_file in tqdm(video_files, desc=f"Videos in {class_name}"):
            video_path = os.path.join(class_path, video_file)
            base_name, ext = os.path.splitext(video_file)
            output_original_path = os.path.join(output_class_path, video_file)
            try:
                shutil.copy2(video_path, output_original_path)
                total_processed += 1
            except Exception as e:
                print(f"Error copying {video_file}: {e}")
                continue
            for aug_name, aug_func in AUGMENTATIONS.items():
                output_filename = f"{base_name}_{aug_name}{ext}"
                output_aug_path = os.path.join(output_class_path, output_filename)
                if os.path.exists(output_aug_path):
                    continue  # Skip if already augmented
                clip = None
                augmented_clip = None
                try:
                    clip = VideoFileClip(video_path)
                    augmented_clip = aug_func(clip.copy())
                    _write_augmented_clip(augmented_clip, output_aug_path)
                    total_augmented += 1
                except Exception as e:
                    print(f"\nError: {aug_name} for {video_file}: {e}")
                    if os.path.exists(output_aug_path):
                        os.remove(output_aug_path)
                finally:
                    # Always release the clips, including when the augmentation
                    # or the encoder raised, so readers are not left open.
                    for opened_clip in (clip, augmented_clip):
                        if opened_clip is None:
                            continue
                        try:
                            opened_clip.close()
                        except Exception as close_error:
                            print(f"\nWarning: could not close clip for {video_file}: {close_error}")
    print(f"\n--- Video Augmentation Complete ---\nOriginals: {total_processed}, Augmentations: {total_augmented}")


In [7]:
# --- 3. Keypoint Extraction ---
# One MediaPipe Holistic instance is created here and shared by every call to
# process_video_for_keypoints.
# NOTE(review): with static_image_mode=False the instance presumably keeps
# tracking state between frames, which may carry over from one video to the
# next - confirm whether a per-video instance is wanted.
mp_holistic = mp.solutions.holistic
_holistic_options = dict(
    static_image_mode=False,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)
holistic_model = mp_holistic.Holistic(**_holistic_options)

def extract_holistic_landmarks(results):
    """Stack pose, face, left-hand and right-hand landmarks into one float32 array.

    Each part contributes its (x, y, z) rows in that order. A part that is
    missing or has no landmarks is replaced by zeros of the expected size
    (33, 468, 21 and 21 rows respectively), so a fully detected or fully
    missing frame yields a (543, 3) array.
    """
    expected_parts = (
        (results.pose_landmarks, 33),
        (results.face_landmarks, 468),
        (results.left_hand_landmarks, 21),
        (results.right_hand_landmarks, 21),
    )
    stacked = []
    for landmark_list, expected_rows in expected_parts:
        if landmark_list and landmark_list.landmark:
            rows = np.array(
                [[point.x, point.y, point.z] for point in landmark_list.landmark],
                dtype=np.float32,
            )
        else:
            rows = np.zeros((expected_rows, 3), dtype=np.float32)
        stacked.append(rows)
    return np.concatenate(stacked, axis=0)
def get_hand_center(hl):
    """Return the first landmark of `hl` as a float32 (x, y, z) array, or None when `hl` is missing or empty."""
    if not (hl and hl.landmark):
        return None
    first = hl.landmark[0]
    return np.array([first.x, first.y, first.z], dtype=np.float32)
def hands_moved(pl, pr, cl, cr, th):
    """Report whether either hand moved more than `th` between two frames.

    Each hand is judged on its own: a hand is compared only when both its
    previous and current positions are available. Requiring all four
    positions, as before, meant a clip in which only one hand is ever detected
    could never register movement.

    Args:
        pl, pr: Previous left/right hand positions (arrays) or None.
        cl, cr: Current left/right hand positions (arrays) or None.
        th: Euclidean-distance threshold.

    Returns:
        True if at least one comparable hand moved farther than `th`, else False.
    """
    for previous, current in ((pl, cl), (pr, cr)):
        if previous is None or current is None:
            continue
        if np.linalg.norm(current - previous) > th:
            return True
    return False

def process_video_for_keypoints(video_path, n_frames=N_FRAMES_KEYPOINTS, movement_threshold=MOVEMENT_THRESHOLD_KEYPOINTS):
    """Extract a fixed-length landmark sequence from one video.

    Frames are read until `hands_moved` reports movement between consecutive
    frames; the frame that triggers it is not stored. From then on every
    second frame is converted with `extract_holistic_landmarks` until
    `n_frames` entries are collected or the video ends. A short sequence is
    padded by repeating its last entry.

    Args:
        video_path: Path of the video to read.
        n_frames: Number of landmark frames in the returned sequence.
        movement_threshold: Threshold forwarded to `hands_moved`.

    Returns:
        Array of shape (n_frames, 543, 3). It is all zeros when the video
        cannot be opened or no frame was collected.
    """
    n_landmarks = 543  # 33 pose + 468 face + 21 + 21 hand landmarks
    cap = cv2.VideoCapture(video_path)
    seq_kps = []
    movement_started = False
    prev_left, prev_right = None, None
    frames_since_movement = 0
    frames_read = 0
    try:
        if not cap.isOpened():
            print(f"Error: Vid {video_path}")
            return np.zeros((n_frames, n_landmarks, 3), dtype=np.float32)
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames_read += 1
            img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = holistic_model.process(img_rgb)
            cur_left = get_hand_center(results.left_hand_landmarks)
            cur_right = get_hand_center(results.right_hand_landmarks)
            if not movement_started:
                if hands_moved(prev_left, prev_right, cur_left, cur_right, movement_threshold):
                    movement_started = True
                prev_left, prev_right = cur_left, cur_right
                continue
            frames_since_movement += 1
            if frames_since_movement % 2 == 0:  # keep every second frame
                seq_kps.append(extract_holistic_landmarks(results))
            if len(seq_kps) == n_frames:
                break
    finally:
        # Release the capture even if decoding or landmark extraction raised.
        cap.release()
    if not seq_kps:
        print(f"Warn: No kps for '{video_path}'. Frames: {frames_read}.")
        return np.zeros((n_frames, n_landmarks, 3), dtype=np.float32)
    if len(seq_kps) < n_frames:
        seq_kps.extend([seq_kps[-1]] * (n_frames - len(seq_kps)))
    return np.array(seq_kps[:n_frames])

def extract_keypoints_from_video_set(video_set_input_path, keypoints_root_output_dir, split_name):
    """Save one `.npy` keypoint file per video of a class-per-folder video set.

    For every class sub-folder of `video_set_input_path`, each
    `.mp4`/`.avi`/`.mov` file is passed to `process_video_for_keypoints` and
    the result is saved as
    `<keypoints_root_output_dir>/<class_name>/<split_name>/<video stem>.npy`.
    Videos whose `.npy` file already exists are skipped. If the input path
    does not exist, an error is printed and nothing is written.
    """
    print(f"\nKeypoint extraction for '{split_name}' set from: {video_set_input_path}")
    if not os.path.exists(video_set_input_path):
        print(f"Error: Path '{video_set_input_path}' DNE.")
        return

    video_suffixes = ('.mp4', '.avi', '.mov')
    for class_name in os.listdir(video_set_input_path):
        class_video_dir = os.path.join(video_set_input_path, class_name)
        if not os.path.isdir(class_video_dir):
            continue

        target_dir = os.path.join(keypoints_root_output_dir, class_name, split_name)
        os.makedirs(target_dir, exist_ok=True)
        print(f"Processing class: {class_name} ({split_name})")

        video_names = [
            name for name in os.listdir(class_video_dir)
            if name.lower().endswith(video_suffixes)
        ]
        for video_name in tqdm(video_names, desc=f"Keypoints {class_name} ({split_name})"):
            source_video = os.path.join(class_video_dir, video_name)
            stem = os.path.splitext(video_name)[0]
            npy_path = os.path.join(target_dir, f"{stem}.npy")
            if not os.path.exists(npy_path):
                np.save(npy_path, process_video_for_keypoints(source_video))
    print(f"✅ Keypoint extraction for '{split_name}' set complete.")


In [None]:
# --- Main Execution ---
def _dir_has_entries(path):
    """Return True when `path` is an existing directory with at least one entry."""
    if not os.path.isdir(path):
        return False
    # Use scandir as a context manager so the directory handle is closed.
    with os.scandir(path) as entries:
        return any(entries)


if __name__ == "__main__":
    # Step 1: Split (skipped when the train split already has content)
    print("--- STEP 1: Splitting Dataset ---")
    train_dir = os.path.join(SPLIT_DATA_PATH, "train")
    if _dir_has_entries(train_dir):
        print(f"Video splits found at '{SPLIT_DATA_PATH}'. Skipping.")
    elif not split_video_dataset(RAW_VIDEO_DATASET_PATH, SPLIT_DATA_PATH):
        print("Halting due to error in video dataset splitting.")
        # Raise SystemExit rather than calling exit(): inside a notebook,
        # exit() asks the kernel itself to shut down.
        raise SystemExit(1)

    # Step 2: Augment (skipped when the augmented folder already has content)
    print("\n--- STEP 2: Augmenting Training Videos ---")
    train_split_path_for_aug = os.path.join(SPLIT_DATA_PATH, "train")
    aug_train_dir_has_content = _dir_has_entries(AUGMENTED_TRAIN_VIDEOS_PATH)
    if aug_train_dir_has_content:
        print(f"Augmented videos found at '{AUGMENTED_TRAIN_VIDEOS_PATH}'. Skipping.")
    elif _dir_has_entries(train_split_path_for_aug):
        augment_video_set(train_split_path_for_aug, AUGMENTED_TRAIN_VIDEOS_PATH)
    else:
        print(f"Skipping augmentation: Training split path '{train_split_path_for_aug}' empty/DNE.")

    # Step 3: Extract Keypoints
    print("\n--- STEP 3: Extracting Keypoints ---")
    # Probe one class's train keypoint folder to decide whether extraction
    # already ran. Only sub-directories count as classes, and they are sorted,
    # so the probe neither lands on a stray file nor depends on listing order.
    raw_class_names = []
    if os.path.isdir(RAW_VIDEO_DATASET_PATH):
        raw_class_names = sorted(
            entry for entry in os.listdir(RAW_VIDEO_DATASET_PATH)
            if os.path.isdir(os.path.join(RAW_VIDEO_DATASET_PATH, entry))
        )
    example_class_name = raw_class_names[0] if raw_class_names else "dummy"
    kp_train_dir_example = os.path.join(KEYPOINTS_OUTPUT_PATH, example_class_name, "train")

    keypoints_exist = _dir_has_entries(kp_train_dir_example)

    if not keypoints_exist:
        # Prefer the augmented training videos; fall back to the plain train split.
        source_for_train_kp = AUGMENTED_TRAIN_VIDEOS_PATH if _dir_has_entries(AUGMENTED_TRAIN_VIDEOS_PATH) else train_split_path_for_aug
        if _dir_has_entries(source_for_train_kp):
            print(f"Extracting train keypoints from: {source_for_train_kp}")
            extract_keypoints_from_video_set(source_for_train_kp, KEYPOINTS_OUTPUT_PATH, "train")
        else:
            print(f"Source for train keypoints ('{source_for_train_kp}') empty/DNE.")

        val_split_path_for_kp = os.path.join(SPLIT_DATA_PATH, "val")
        if _dir_has_entries(val_split_path_for_kp):
            extract_keypoints_from_video_set(val_split_path_for_kp, KEYPOINTS_OUTPUT_PATH, "val")
        else:
            print(f"Val split path '{val_split_path_for_kp}' empty/DNE for keypoints.")

        test_split_path_for_kp = os.path.join(SPLIT_DATA_PATH, "test")
        if _dir_has_entries(test_split_path_for_kp):
            extract_keypoints_from_video_set(test_split_path_for_kp, KEYPOINTS_OUTPUT_PATH, "test")
        else:
            print(f"Test split path '{test_split_path_for_kp}' empty/DNE for keypoints.")
    else:
        print(f"Keypoints seem extracted in '{KEYPOINTS_OUTPUT_PATH}'. Skipping.")

    # NOTE(review): after close() the shared model presumably cannot be used
    # again; re-run the cell that creates `holistic_model` before re-running
    # this cell - confirm.
    if 'holistic_model' in globals():
        holistic_model.close()
    print("\n--- ALL PREPROCESSING STEPS ATTEMPTED ---")