In [19]:
import os
import cv2
import numpy as np
import mediapipe as mp
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

In [3]:
# Initialize MediaPipe Holistic
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [33]:
def load_and_preprocess_image(image_path, target_size=(224, 224)):
    """Load and preprocess image for the model"""
    try:
        img = cv2.imread(image_path)
        if img is None:
            return None
        img = cv2.resize(img, target_size)
        # Convert to RGB and ensure type is uint8
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.uint8)
        return img
    except Exception as e:
        print(f"Error processing {image_path}: {str(e)}")
        return None

In [31]:
def extract_landmarks(image):
    """Extract hand landmarks using MediaPipe Holistic"""
    with mp_holistic.Holistic(static_image_mode=True, min_detection_confidence=0.5) as holistic:
        # Make sure image is uint8
        if image.dtype != np.uint8:
            image = (image * 255).astype(np.uint8)
            
        results = holistic.process(image)
        
        landmarks = []
        
        # Right hand landmarks
        if results.right_hand_landmarks:
            for landmark in results.right_hand_landmarks.landmark:
                landmarks.extend([landmark.x, landmark.y, landmark.z])
        else:
            landmarks.extend([0.0] * (21 * 3))
        
        # Left hand landmarks
        if results.left_hand_landmarks:
            for landmark in results.left_hand_landmarks.landmark:
                landmarks.extend([landmark.x, landmark.y, landmark.z])
        else:
            landmarks.extend([0.0] * (21 * 3))
        
        return np.array(landmarks, dtype=np.float32)

In [27]:
def process_single_image(args):
    """Process a single image and return its landmarks and label"""
    image_path, label = args
    image = load_and_preprocess_image(image_path)
    if image is None:
        return None
    
    landmarks = extract_landmarks(image)
    return landmarks, label

In [23]:
def prepare_dataset(data_dir, max_samples_per_class=1000, num_workers=16):
    """Prepare dataset by extracting landmarks and creating labels using parallel processing"""
    image_paths = []
    labels = []
    label_map = {}
    
    # First, collect all image paths and labels
    for idx, class_name in enumerate(sorted(os.listdir(data_dir))):
        if class_name.startswith('.'):  # Skip hidden files
            continue
        
        label_map[idx] = class_name
        class_path = os.path.join(data_dir, class_name)
        
        # Get list of all images in the class directory
        class_images = [f for f in os.listdir(class_path) 
                        if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        # Limit the number of samples per class
        if max_samples_per_class:
            class_images = class_images[:max_samples_per_class]
        
        for image_name in class_images:
            image_path = os.path.join(class_path, image_name)
            image_paths.append(image_path)
            labels.append(idx)
    
    print(f"Processing {len(image_paths)} images across {len(label_map)} classes...")
    
    # Prepare arguments for parallel processing
    process_args = list(zip(image_paths, labels))
    
    X = []
    y = []
    
    # Process images in parallel
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(process_single_image, arg) for arg in process_args]
        
        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing images"):
            result = future.result()
            if result is not None:
                landmarks, label = result
                X.append(landmarks)
                y.append(label)
    
    X = np.array(X, dtype=np.float32)
    y = np.array(y, dtype=np.int32)
    
    return X, y, label_map

In [11]:
def create_model(input_shape, num_classes):
    """Create a simple neural network model"""
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(input_shape,)),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(num_classes, activation='softmax')
    ])
    return model

In [50]:
def create_enhanced_model(input_shape, num_classes):
    inputs = tf.keras.Input(shape=(input_shape,))
    
    # Batch Normalization at the input
    x = tf.keras.layers.BatchNormalization()(inputs)
    
    # First dense block
    x = tf.keras.layers.Dense(512, kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)
    x = tf.keras.layers.Dropout(0.4)(x)
    
    # Second dense block
    x = tf.keras.layers.Dense(256, kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)
    x = tf.keras.layers.Dropout(0.4)(x)
    
    # Third dense block
    x = tf.keras.layers.Dense(128, kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)
    x = tf.keras.layers.Dropout(0.3)(x)
    
    # Output layer
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
    
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model

In [52]:
def train_enhanced_model(X_train, y_train, X_test, y_test, num_classes, epochs=150):
    # Data preprocessing
    # Normalize the input data
    X_train_mean = X_train.mean(axis=0)
    X_train_std = X_train.std(axis=0)
    X_train_normalized = (X_train - X_train_mean) / (X_train_std + 1e-7)
    X_test_normalized = (X_test - X_train_mean) / (X_train_std + 1e-7)
    
    # Create model
    model = create_enhanced_model(X_train.shape[1], num_classes)
    
    # Learning rate schedule
    initial_learning_rate = 0.001
    decay_steps = 1000
    decay_rate = 0.9
    learning_rate_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate, decay_steps, decay_rate
    )
    
    # Compile model with learning rate schedule
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate_schedule)
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # Callbacks
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=20,
        restore_best_weights=True,
        verbose=1
    )
    
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=7,
        min_lr=1e-6,
        verbose=1
    )
    
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        'best_model.h5',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    )
    
    # Train the model
    history = model.fit(
        X_train_normalized, y_train,
        validation_data=(X_test_normalized, y_test),
        epochs=epochs,
        batch_size=32,
        callbacks=[early_stopping, reduce_lr, model_checkpoint]
    )
    
    return model, history

In [54]:
DATA_DIR = './dataset/asl_alphabet_train/train/'
RANDOM_SEED = 42
MAX_SAMPLES_PER_CLASS = 1000  # Limit samples per class for faster processing

# Prepare dataset with parallel processing
X, y, label_map = prepare_dataset(DATA_DIR, max_samples_per_class=MAX_SAMPLES_PER_CLASS)
num_classes = len(label_map)

print(f"Dataset prepared: {X.shape[0]} samples, {num_classes} classes")

Processing 9000 images across 9 classes...


Processing images: 100%|███████████████████████████████████████████████████████████| 9000/9000 [33:25<00:00,  4.49it/s]


Dataset prepared: 9000 samples, 9 classes


In [56]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_SEED)

y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)

In [58]:
# Train the enhanced model
model, history = train_enhanced_model(X_train, y_train, X_test, y_test, num_classes)

Epoch 1/150
Epoch 1: val_accuracy improved from -inf to 0.10611, saving model to best_model.h5
Epoch 2/150


  saving_api.save_model(


Epoch 2: val_accuracy did not improve from 0.10611
Epoch 3/150
Epoch 3: val_accuracy did not improve from 0.10611
Epoch 4/150
Epoch 4: val_accuracy did not improve from 0.10611
Epoch 5/150
Epoch 5: val_accuracy improved from 0.10611 to 0.11111, saving model to best_model.h5
Epoch 6/150
Epoch 6: val_accuracy did not improve from 0.11111
Epoch 7/150
Epoch 7: val_accuracy improved from 0.11111 to 0.11500, saving model to best_model.h5
Epoch 8/150
Epoch 8: val_accuracy did not improve from 0.11500
Epoch 9/150
Epoch 9: val_accuracy did not improve from 0.11500
Epoch 10/150
Epoch 10: val_accuracy did not improve from 0.11500
Epoch 11/150
Epoch 11: val_accuracy improved from 0.11500 to 0.11667, saving model to best_model.h5
Epoch 12/150
Epoch 12: val_accuracy improved from 0.11667 to 0.11722, saving model to best_model.h5
Epoch 13/150
Epoch 13: val_accuracy did not improve from 0.11722
Epoch 14/150
Epoch 14: val_accuracy did not improve from 0.11722
Epoch 15/150
Epoch 15: val_accuracy did not

In [60]:
# Evaluate the model
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test accuracy: {test_accuracy:.4f}")

Test accuracy: 0.1006


In [39]:
model = create_model(X_train.shape[1], num_classes)
model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])

In [48]:
history = model.fit(X_train, y_train,validation_data=(X_test, y_test),epochs=150,batch_size=32,
                    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_loss',patience=5,restore_best_weights=True)])

Epoch 1/150
Epoch 2/150
Epoch 3/150
Epoch 4/150
Epoch 5/150
Epoch 6/150
Epoch 7/150
Epoch 8/150
Epoch 9/150
Epoch 10/150
Epoch 11/150
Epoch 12/150
Epoch 13/150
Epoch 14/150
Epoch 15/150
Epoch 16/150
Epoch 17/150
Epoch 18/150
Epoch 19/150
Epoch 20/150
Epoch 21/150
Epoch 22/150


In [45]:
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test accuracy: {test_accuracy:.4f}")

Test accuracy: 0.1517
