<a href="https://colab.research.google.com/github/JValdez777/asl-translator-notebook/blob/main/aiASLProject_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install keras mediapipe-model-maker
!pip install tensorflow-addons

Collecting mediapipe-model-maker
  Downloading mediapipe_model_maker-0.2.1.4-py3-none-any.whl.metadata (1.7 kB)
Collecting mediapipe>=0.10.0 (from mediapipe-model-maker)
  Downloading mediapipe-0.10.21-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting tensorflow<2.16,>=2.10 (from mediapipe-model-maker)
  Downloading tensorflow-2.15.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Collecting tensorflow-addons (from mediapipe-model-maker)
  Downloading tensorflow_addons-0.23.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.8 kB)
Collecting tensorflow-model-optimization<0.8.0 (from mediapipe-model-maker)
  Downloading tensorflow_model_optimization-0.7.5-py2.py3-none-any.whl.metadata (914 bytes)
Collecting tf-models-official<2.16.0,>=2.13.2 (from mediapipe-model-maker)
  Downloading tf_models_official-2.15.0-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting numpy (from keras)
  Downloading numpy-1.26.4-cp311-cp311-manyl

In [None]:
import sys
sys.setrecursionlimit(100000) # Increase to a suitable value
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras import layers, models, optimizers, callbacks, regularizers
import os
import numpy as np
import json
import shutil
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from collections import Counter
from mediapipe.python.solutions.pose import Pose
from mediapipe.python.solutions.pose import POSE_CONNECTIONS
from mediapipe.python.solutions.drawing_utils import draw_landmarks
import cv2

from google.colab import drive
drive.mount('/content/drive')

ModuleNotFoundError: No module named 'tensorflow_addons'

In [None]:
# ============================
# Customizable Settings
# ============================
# Dataset root; the notebook walks it as <sign>/<video>/ sub-directories.
ASL_DIR = '/content/drive/MyDrive/ASL_Project/ASL_Alpha'
# Directory that receives checkpoints and saved models.
MODEL_DIR = '/content/drive/MyDrive/ASL_Project/ASL_Model2'
config_path = os.path.join(MODEL_DIR, 'model_config.json')
delete_previous_model = False  # When True, MODEL_DIR is wiped and recreated during setup

# Model parameters
set_num_epochs = 200  # Upper bound on epochs; EarlyStopping may end training sooner
set_batch_size = 16   # Samples per batch served by the data generators
set_patience = 20  # EarlyStopping patience; ReduceLROnPlateau uses half of it
set_patience_sensitivity = 0.0005  # EarlyStopping min_delta on val_loss
set_min_lr = 1e-6  # Lower bound for ReduceLROnPlateau
set_factor = 0.5  # Learning-rate multiplier applied by ReduceLROnPlateau

# Architecture parameters
set_lstm_units = 128            # Units per LSTM layer
set_dense_units = 64            # Units per hidden Dense layer
set_dropout_rate = 0.3          # Dropout applied after each hidden Dense layer
set_bidirectional = True  # Wrap LSTM layers in Bidirectional
set_learning_rate = 1e-4        # Adam learning rate; keep this low for stability
set_num_lstm_layers = 2         # Number of stacked LSTM layers requested
set_num_dense_layers = 1  # Number of hidden Dense + Dropout pairs
set_activation = 'relu'  # Activation of the hidden Dense layers
set_regularizer_strength = 0.0001  # L2 regularization strength
set_pose_noise_std = 0.01  # Std-dev of the Gaussian jitter used by augment_pose_sequence

# Preprocessing parameters
target_size = (64, 64)
channels = 3  # Channel count passed to tf.image.decode_jpeg in augment_video

In [None]:
# ============================
# Initialization & Setup
# ============================
# Handle model directory
# Wipe a previous model directory only when a reset was requested, then make
# sure the (possibly fresh) directory exists either way.
if delete_previous_model and os.path.isdir(MODEL_DIR):
    shutil.rmtree(MODEL_DIR)
os.makedirs(MODEL_DIR, exist_ok=True)

# Function to extract pose keypoints from an image
def extract_pose_keypoints(image, pose_model):
    """Run ``pose_model`` on a BGR image and flatten the detected landmarks.

    Returns a flat list ``[x, y, z, visibility, ...]`` in landmark order, or a
    list of ``33 * 4`` zeros when the model reports no pose.
    """
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    detection = pose_model.process(rgb_image)

    # No detection: return a zero vector so the caller always gets a full-width row.
    if not detection.pose_landmarks:
        return [0.0] * (33 * 4)

    flattened = []
    for point in detection.pose_landmarks.landmark:
        flattened += [point.x, point.y, point.z, point.visibility]
    return flattened

# ============================
# Calculate max_frames
# ============================
def calculate_max_frames(directory):
    """Return the largest ``.jpg`` count found in any ``<sign>/<video>`` folder.

    Walks ``directory`` two levels deep. Entries that are not directories are
    ignored, and a folder that cannot be listed is reported and skipped.
    Returns 0 when no frames are found.
    """
    longest = 0
    for sign_name in tqdm(os.listdir(directory), desc="Signs"):
        sign_path = os.path.join(directory, sign_name)
        if not os.path.isdir(sign_path):
            continue

        for video_name in os.listdir(sign_path):
            clip_path = os.path.join(sign_path, video_name)
            if not os.path.isdir(clip_path):
                continue

            try:
                n_frames = sum(1 for entry in os.listdir(clip_path) if entry.endswith('.jpg'))
            except Exception as e:
                print(f"Error accessing {clip_path}: {e}")
                continue

            longest = max(longest, n_frames)
    return longest

# Frame count of the longest clip in the dataset. Read as a module-level global
# by PoseDataGenerator (pad/truncate length) and build_model (input shape).
max_frames = calculate_max_frames(ASL_DIR)
print(f"Calculated maximum frames across all videos: {max_frames}")

# ============================
# Data Augmentation for Pose Data
# ============================
def augment_pose_sequence(pose_data, noise_std=set_pose_noise_std):
    """Return a jittered copy of ``pose_data``; the input array is not modified.

    Zero-mean Gaussian noise with standard deviation ``noise_std`` is drawn for
    every element and added to the copy. Further augmentations (scaling,
    flipping, rotation) could be added here if needed.
    """
    jittered = pose_data.copy()
    jitter = np.random.normal(loc=0, scale=noise_std, size=jittered.shape)
    # In-place add keeps the dtype of the copied array.
    jittered += jitter
    return jittered

def augment_video_pose(video_dir, augmentations_needed=1):
    """Write ``augmentations_needed`` noisy copies of a video's pose file.

    Reads ``processed_poses.npy`` from ``video_dir`` and, for each copy, saves an
    augmented array as ``processed_poses.npy`` inside a sibling folder named
    ``augmentedposes_<n>`` (created next to ``video_dir``).

    Returns the list of those folder paths, or an empty list when the source
    pose file does not exist.
    """
    source_file = os.path.join(video_dir, 'processed_poses.npy')
    if not os.path.exists(source_file):
        print(f"Pose data not found for {video_dir}")
        return []

    original_poses = np.load(source_file)
    sibling_root = os.path.dirname(video_dir)

    created = []
    for copy_number in range(augmentations_needed):
        target_dir = os.path.join(sibling_root, f'augmentedposes_{copy_number}')
        os.makedirs(target_dir, exist_ok=True)
        np.save(os.path.join(target_dir, 'processed_poses.npy'),
                augment_pose_sequence(original_poses))
        created.append(target_dir)

    return created

# ============================
# Preprocessing Generator for Pose Data
# ============================
class PoseDataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of pose sequences loaded from disk.

    Each entry of ``video_paths`` is a folder expected to contain a saved pose
    array; the file names in ``POSE_FILENAMES`` are tried in order. Every array
    is zero-padded or truncated along axis 0 to the module-level ``max_frames``
    so that a batch stacks into a single array.

    Args:
        video_paths: Folders holding one pose file each.
        labels: Label of each folder, aligned with ``video_paths``.
        label_to_index: Mapping from label to integer class index.
        batch_size: Maximum number of samples per batch.
    """

    # The extraction step saves 'pose.npy' while augment_video_pose saves
    # 'processed_poses.npy'; accept both so augmented samples are not dropped.
    POSE_FILENAMES = ('pose.npy', 'processed_poses.npy')
    # Row width of a pose frame: 33 landmarks x 4 values, as produced by
    # extract_pose_keypoints and expected by the model input.
    NUM_FEATURES = 33 * 4

    def __init__(self, video_paths, labels, label_to_index, batch_size):
        super().__init__()
        self.video_paths = video_paths
        self.labels = labels
        self.label_to_index = label_to_index
        self.batch_size = batch_size
        self.indices = np.arange(len(self.video_paths))

    def __len__(self):
        """Number of batches per epoch (the last one may be smaller)."""
        return int(np.ceil(len(self.video_paths) / self.batch_size))

    def _find_pose_file(self, video_dir):
        """Return the first existing pose file in ``video_dir``, or ``None``."""
        for filename in self.POSE_FILENAMES:
            candidate = os.path.join(video_dir, filename)
            if os.path.exists(candidate):
                return candidate
        return None

    def __getitem__(self, idx):
        """Return ``(videos, labels)`` for batch ``idx``.

        Samples whose pose file is missing or malformed are reported and
        skipped, so a batch can be smaller than ``batch_size`` or even empty.
        """
        batch_indices = self.indices[idx*self.batch_size:(idx+1)*self.batch_size]
        batch_videos = []
        batch_labels = []

        for i in batch_indices:
            video_dir = self.video_paths[i]
            processed_path = self._find_pose_file(video_dir)

            if processed_path is None:
                print(f"Skipping: no pose file ({' / '.join(self.POSE_FILENAMES)}) in {video_dir}")
                continue

            video_data = np.load(processed_path)

            # Anything that is not (frames, NUM_FEATURES) cannot be padded or
            # stacked with the rest of the batch (e.g. an empty saved array).
            if video_data.ndim != 2 or video_data.shape[1] != self.NUM_FEATURES:
                print(f"Skipping: {processed_path} has unexpected shape {video_data.shape}")
                continue

            # Pad/truncate to max_frames
            if video_data.shape[0] < max_frames:
                pad_width = max_frames - video_data.shape[0]
                video_data = np.pad(video_data, ((0, pad_width), (0, 0)), mode='constant')
            else:
                video_data = video_data[:max_frames]

            batch_videos.append(video_data)
            batch_labels.append(self.label_to_index[self.labels[i]])

        if not batch_videos:
            # Every sample was skipped. Build the placeholder from the known
            # feature width rather than from a loop variable that may be unset.
            return np.zeros((0, max_frames, self.NUM_FEATURES)), np.zeros((0,))

        return np.array(batch_videos), np.array(batch_labels)

    def on_epoch_end(self):
        """Reshuffle the sample order for the next epoch."""
        np.random.shuffle(self.indices)

# ============================
# Collect all video paths and labels
# ============================
video_paths = []
labels = []
for sign in tqdm(os.listdir(ASL_DIR), desc="Signs"):
    sign_dir = os.path.join(ASL_DIR, sign)
    if not os.path.isdir(sign_dir):
        continue

    videos = [entry for entry in os.listdir(sign_dir)
              if os.path.isdir(os.path.join(sign_dir, entry))]

    # A sign with exactly one recording gets 10 augmented pose copies.
    if len(videos) == 1:
        video_path = os.path.join(sign_dir, videos[0])
        print(f"Augmenting single video for sign: {sign}")
        augmented_dirs = augment_video_pose(video_path, augmentations_needed=10)
        videos.extend(os.path.basename(d) for d in augmented_dirs)

    # Register every folder (original + augmented) under this sign's label.
    video_paths.extend(os.path.join(sign_dir, video) for video in videos)
    labels.extend([sign] * len(videos))

# Create label mapping
# Labels are sorted so that class indices are stable from run to run.
label_to_index = {label: idx for idx, label in enumerate(sorted(set(labels)))}
num_classes = len(label_to_index)

print(f"Total signs: {num_classes}")
print(f"Total video samples: {len(video_paths)}")

# Split data with stratification.
# The seed is fixed because the training cell resumes from a saved model when
# one exists: with a fresh random split on every run, clips used for training
# in one session would end up in the validation/test sets of the next.
split_seed = 42
train_paths, test_paths, train_labels, test_labels = train_test_split(
    video_paths, labels, test_size=0.2, stratify=labels, random_state=split_seed)
train_paths, val_paths, train_labels, val_labels = train_test_split(
    train_paths, train_labels, test_size=0.2, stratify=train_labels, random_state=split_seed)

# Create generators
train_gen = PoseDataGenerator(train_paths, train_labels, label_to_index, set_batch_size)
val_gen = PoseDataGenerator(val_paths, val_labels, label_to_index, set_batch_size)

# ============================
# Model Architecture
# ============================
def build_model(num_classes):
    """Build and compile the LSTM sequence classifier.

    The network takes input of shape ``(max_frames, 132)`` and stacks exactly
    ``set_num_lstm_layers`` recurrent layers (each wrapped in ``Bidirectional``
    when ``set_bidirectional`` is true), followed by ``set_num_dense_layers``
    Dense + Dropout pairs and a softmax layer over ``num_classes``.

    Args:
        num_classes: Number of output classes.

    Returns:
        A compiled ``Sequential`` model (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    def make_l2():
        # L2 penalty from the settings cell; a strength of 0 disables it.
        if not set_regularizer_strength:
            return None
        return regularizers.l2(set_regularizer_strength)

    model = models.Sequential()
    model.add(layers.Input(shape=(max_frames, 132)))

    # Build exactly the requested number of recurrent layers. All but the last
    # return the full sequence for the next layer; the last returns only its
    # final output so the Dense head receives one vector per sample.
    lstm_count = max(1, set_num_lstm_layers)
    for layer_index in range(lstm_count):
        is_last = layer_index == lstm_count - 1
        lstm = layers.LSTM(set_lstm_units,
                           return_sequences=not is_last,
                           kernel_regularizer=make_l2())
        model.add(layers.Bidirectional(lstm) if set_bidirectional else lstm)

    for _ in range(set_num_dense_layers):
        model.add(layers.Dense(set_dense_units, activation=set_activation,
                               kernel_regularizer=make_l2()))
        model.add(layers.Dropout(set_dropout_rate))

    model.add(layers.Dense(num_classes, activation='softmax'))

    optimizer = optimizers.Adam(learning_rate=set_learning_rate)
    model.compile(optimizer=optimizer,
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model


Signs: 100%|██████████| 26/26 [00:00<00:00, 67.53it/s]


Calculated maximum frames across all videos: 54


Signs: 100%|██████████| 26/26 [00:00<00:00, 407.55it/s]

Total signs: 26
Total video samples: 286





In [None]:
# Loop through the dataset structure
# Walk <word>/<sequence>/ folders and cache one pose array per sequence as
# 'pose.npy'. Sequences that already have the file are left untouched, so the
# cell can be re-run cheaply.
with Pose(static_image_mode=True, min_detection_confidence=0.5) as pose:
    for word in sorted(os.listdir(ASL_DIR)):
        word_path = os.path.join(ASL_DIR, word)
        if not os.path.isdir(word_path):
            continue

        print(f"Processing: {word}")
        for sequence_folder in tqdm(sorted(os.listdir(word_path))):
            seq_path = os.path.join(word_path, sequence_folder)
            if not os.path.isdir(seq_path):
                continue

            # Skip if pose.npy already exists
            out_path = os.path.join(seq_path, 'pose.npy')
            if os.path.exists(out_path):
                continue

            frames = sorted([f for f in os.listdir(seq_path) if f.endswith('.jpg')])
            if not frames:
                print(f"Warning: No frames in {seq_path}")
                continue

            pose_sequence = []
            for frame_name in frames:
                frame_path = os.path.join(seq_path, frame_name)
                image = cv2.imread(frame_path)
                if image is None:
                    # Unreadable/corrupt frame: leave it out of the sequence.
                    continue
                keypoints = extract_pose_keypoints(image, pose)
                pose_sequence.append(keypoints)

            # If no frame could be decoded, do not cache anything: an empty
            # array has no feature axis and would break padding at load time,
            # and the existing file would stop this folder from being retried.
            if not pose_sequence:
                print(f"Warning: No readable frames in {seq_path}")
                continue

            pose_sequence = np.array(pose_sequence)
            np.save(out_path, pose_sequence)

# ============================
# Callbacks & Training
# ============================
# NOTE: the list must not be named `callbacks`. That would shadow the imported
# `tensorflow.keras.callbacks` module and make this cell raise AttributeError
# the second time it is run in the same kernel.
training_callbacks = [
    callbacks.EarlyStopping(monitor='val_loss', patience=set_patience,
                            min_delta=set_patience_sensitivity, restore_best_weights=True),
    callbacks.ReduceLROnPlateau(monitor='val_loss', factor=set_factor,
                                patience=set_patience // 2, min_lr=set_min_lr),
    # Keep only the epoch with the lowest val_loss so the file matches its name
    # (saving every epoch would leave the *last* model in 'best_model.keras').
    callbacks.ModelCheckpoint(os.path.join(MODEL_DIR, 'best_model.keras'),
                              monitor='val_loss', save_best_only=True),
]

# Resume from the previously saved final model when there is one.
model_path = os.path.join(MODEL_DIR, 'final_model.keras')
if os.path.exists(model_path):
    print("Loading existing model...")
    model = tf.keras.models.load_model(model_path)
else:
    print("Building new model...")
    model = build_model(num_classes)

history = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=set_num_epochs,
    callbacks=training_callbacks
)

# ============================
# Save Final Model
# ============================
# Write the file the resume check above looks for, so the next run continues
# from this training session instead of from a stale model.
model.save(model_path)
# Also keep writing the HDF5 copy under its existing name.
model.save(os.path.join(MODEL_DIR, 'final_model.h5'))

Processing: A


100%|██████████| 11/11 [00:00<00:00, 1537.50it/s]


Processing: B


100%|██████████| 11/11 [00:00<00:00, 1636.48it/s]


Processing: C


100%|██████████| 11/11 [00:00<00:00, 2060.53it/s]


Processing: D


100%|██████████| 11/11 [00:00<00:00, 2107.98it/s]


Processing: E


100%|██████████| 11/11 [00:00<00:00, 1779.37it/s]


Processing: F


100%|██████████| 11/11 [00:00<00:00, 3304.97it/s]


Processing: G


100%|██████████| 11/11 [00:00<00:00, 1943.12it/s]


Processing: H


100%|██████████| 11/11 [00:00<00:00, 3210.45it/s]


Processing: I


100%|██████████| 11/11 [00:00<00:00, 2982.57it/s]


Processing: J


100%|██████████| 11/11 [00:00<00:00, 2894.98it/s]


Processing: K


100%|██████████| 11/11 [00:00<00:00, 880.16it/s]


Processing: L


100%|██████████| 11/11 [00:00<00:00, 2666.90it/s]


Processing: M


100%|██████████| 11/11 [00:00<00:00, 1239.48it/s]


Processing: N


100%|██████████| 11/11 [00:00<00:00, 1586.84it/s]


Processing: O


100%|██████████| 11/11 [00:00<00:00, 1018.62it/s]


Processing: P


100%|██████████| 11/11 [00:00<00:00, 1407.48it/s]


Processing: Q


100%|██████████| 11/11 [00:00<00:00, 1830.56it/s]


Processing: R


100%|██████████| 11/11 [00:00<00:00, 1900.22it/s]


Processing: S


100%|██████████| 11/11 [00:00<00:00, 2044.46it/s]


Processing: T


100%|██████████| 11/11 [00:00<00:00, 1360.66it/s]


Processing: U


100%|██████████| 11/11 [00:00<00:00, 1577.40it/s]


Processing: V


100%|██████████| 11/11 [00:00<00:00, 1427.52it/s]


Processing: W


100%|██████████| 11/11 [00:00<00:00, 1455.39it/s]


Processing: X


100%|██████████| 11/11 [00:00<00:00, 1915.37it/s]


Processing: Y


100%|██████████| 11/11 [00:00<00:00, 1310.05it/s]


Processing: Z


100%|██████████| 11/11 [00:00<00:00, 1101.37it/s]


Loading existing model...
Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200


**The cells below are miscellaneous one-off utilities and should not be run as part of the normal workflow.**

In [None]:
# ============================
# Save Model in HDF5 Format (without time_major issue)
# ============================
model_save_path = os.path.join(MODEL_DIR, 'best_model_fixed.h5')

# Explicitly set time_major to False on every LSTM before saving. LSTMs wrapped
# in Bidirectional are not direct entries of model.layers, so reach through the
# wrapper to its forward and backward layers as well.
for layer in model.layers:
    if isinstance(layer, tf.keras.layers.Bidirectional):
        candidates = [layer.forward_layer, layer.backward_layer]
    else:
        candidates = [layer]
    for candidate in candidates:
        if isinstance(candidate, tf.keras.layers.LSTM):
            candidate.time_major = False

# Save to the path that is reported below (a bare file name would be written
# to the current working directory instead of MODEL_DIR).
model.save(model_save_path, include_optimizer=False)

print(f"Model successfully saved in HDF5 format: {model_save_path}")

Model successfully saved in HDF5 format: /content/drive/MyDrive/ASL_Project/ASL_Model2/best_model_fixed.h5


In [None]:
# ================
# Data Augmentation
# ================
def augment_video(video_dir, augmentations_needed=1):
    """Create augmented copies of a video by applying random image transformations.

    Every ``.jpg`` frame in ``video_dir`` is re-written with random brightness,
    contrast, hue, saturation and a small rotation into sibling folders named
    ``augmented_<n>`` (created next to ``video_dir``).

    Args:
        video_dir: Folder containing the source ``.jpg`` frames.
        augmentations_needed: Number of augmented copies to create.

    Returns:
        List of the created folder paths; empty when ``video_dir`` has no frames.
    """
    frame_files = sorted(f for f in os.listdir(video_dir) if f.endswith('.jpg'))
    if not frame_files:
        return []

    parent_dir = os.path.dirname(video_dir)

    augmented_dirs = []
    for augmented_index in range(augmentations_needed):
        new_video_path = os.path.join(parent_dir, f'augmented_{augmented_index}')
        os.makedirs(new_video_path, exist_ok=True)

        for frame_file in frame_files:
            raw = tf.io.read_file(os.path.join(video_dir, frame_file))
            img = tf.image.decode_jpeg(raw, channels=channels)

            # decode_jpeg yields uint8 in [0, 255]. Move to float32 in [0, 1]
            # so the augmentations operate on a well-defined range and the
            # conversion back to uint8 below cannot overflow.
            img = tf.image.convert_image_dtype(img, tf.float32)

            # Apply random augmentations
            img = tf.image.random_brightness(img, max_delta=0.2)
            img = tf.image.random_contrast(img, lower=0.8, upper=1.2)
            img = tf.image.random_hue(img, max_delta=0.1)
            img = tf.image.random_saturation(img, lower=0.8, upper=1.2)

            # Add slight rotation
            angle = tf.random.uniform([], -0.1, 0.1)
            img = tfa.image.rotate(img, angle)

            # Brightness/contrast can leave [0, 1]; clip before converting back.
            img = tf.clip_by_value(img, 0.0, 1.0)
            img = tf.image.convert_image_dtype(img, tf.uint8, saturate=True)

            # Frames are normally named 'frame_<num>.jpg'; fall back to the
            # bare file stem when the name has no underscore.
            if '_' in frame_file:
                frame_num = frame_file.split('_')[1].split('.')[0]
            else:
                frame_num = os.path.splitext(frame_file)[0]

            tf.io.write_file(
                os.path.join(new_video_path, f'frame_{frame_num}.jpg'),
                tf.image.encode_jpeg(img))

        augmented_dirs.append(new_video_path)

    return augmented_dirs

# Create frame-level augmentations for signs that have a single recording.
# NOTE: this loop only creates the augmented folders. It deliberately does not
# re-initialise `video_paths` / `labels`: nothing here refills them, so resetting
# them would wipe the lists built by the data-collection cell.
for sign in tqdm(os.listdir(ASL_DIR), desc="Signs"):
    sign_dir = os.path.join(ASL_DIR, sign)
    if os.path.isdir(sign_dir):
        videos = [v for v in os.listdir(sign_dir) if os.path.isdir(os.path.join(sign_dir, v))]

        # Handle signs with only 1 video
        if len(videos) == 1:
            video_path = os.path.join(sign_dir, videos[0])
            print(f"Augmenting single video for sign: {sign}")
            augmented_dirs = augment_video(video_path, augmentations_needed=10)
            videos += [os.path.basename(d) for d in augmented_dirs]

Signs: 100%|██████████| 26/26 [00:00<00:00, 393.00it/s]


In [None]:
import os

root = '/content/drive/MyDrive/ASL_Project/ASL_Alpha'
# One-off migration: inside every <word>/<video>/ folder, rename 'processed.npy'
# to 'processed_poses.npy' unless a file with the new name already exists.
for word in os.listdir(root):
    word_path = os.path.join(root, word)
    if os.path.isdir(word_path):
        for video in os.listdir(word_path):
            video_path = os.path.join(word_path, video)
            if os.path.isdir(video_path):
                src = os.path.join(video_path, 'processed.npy')
                dst = os.path.join(video_path, 'processed_poses.npy')
                if not os.path.exists(src) or os.path.exists(dst):
                    continue
                os.rename(src, dst)

In [None]:
# Report the TensorFlow version in use (useful when debugging save/load format issues).
print(tf.__version__)

2.15.1
