# Import Dependencies

In [1]:
import cv2
import os
import logging
import time
import numpy as np
import mediapipe as mp
import json
from tqdm import tqdm # for progress bars in Jupyter Notebook
from matplotlib import pyplot as plt

## Detecting Keypoints using MP Holistic

In [50]:
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

In [None]:
def setup_holistic_detection():
    # Initialize Camera Input
    cap = cv2.VideoCapture(0)

    # Set Up MediaPipe Holistic Model
    holistic = mp_holistic.Holistic(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5
    )

    return cap, holistic

In [52]:
def process_mp_frames(frame, holistic):
    # Opencv records in BGR while mediapipe supports RGB
    # We need to recolor frame to RGB to support MediaPipe processing
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False                  # Make Image non-writeable for performance
    results = holistic.process(image)              # process image and return object contain landmarks
    image.flags.writeable = True                   # Back to writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # Color back to BGR for OpenCV 

    return image, results

In [55]:
def draw_landmarks(image, results):
    # Draw face landmarks
    mp_drawing.draw_landmarks(image, 
                              results.face_landmarks, 
                              mp_holistic.FACEMESH_TESSELATION, # FACEMESH_CONTOURS could also be valid here
                              landmark_drawing_spec=mp_drawing.DrawingSpec(color=(170,86,0), thickness=1, circle_radius=2),
                              connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_tesselation_style())
    # Draw pose connections
    mp_drawing.draw_landmarks(image, 
                              results.pose_landmarks, 
                              mp_holistic.POSE_CONNECTIONS,
                              landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, 
                              results.left_hand_landmarks, 
                              mp_holistic.HAND_CONNECTIONS,
                              landmark_drawing_spec=mp_drawing_styles.get_default_hand_landmarks_style(), 
                              connection_drawing_spec=mp_drawing_styles.get_default_hand_connections_style())
    # Draw right hand connections
    mp_drawing.draw_landmarks(image, 
                              results.right_hand_landmarks, 
                              mp_holistic.HAND_CONNECTIONS,
                              landmark_drawing_spec=mp_drawing_styles.get_default_hand_landmarks_style(), 
                              connection_drawing_spec=mp_drawing_styles.get_default_hand_connections_style())
    

In [56]:
def extract_keypoints_comprehensive(results):
    # Pose Landmarks have 33 Keypoints
    # if results.pose_landmarks:
    #     pose_landmarks = []
    #     for res in results.pose_landmakrs.landmark:
    #         pose_landmarks.extend([res.x, res.y, res.z, res.visibility])
    #     pose = np.array(pose_landmarks)
    # else:
    #     pose = np.zeros(33*4)
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    keypoints_vector = np.concatenate([pose, face, lh, rh])
    # print(keypoints_vector)
    return keypoints_vector

In [None]:
def run_holistic_detection():
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Suppress TensorFlow loggin
    logging.getLogger('mediapipe').setLevel(logging.ERROR)  # Only show errors from mediapipe
    cap, holistic = setup_holistic_detection()

    while cap.isOpened():
        # Read frame
        ret, frame = cap.read()
        if not ret:
            print('camera stopped: no frames grabbed')
            break

        # Process Frame/Make dectections
        image, results = process_mp_frames(frame, holistic)
        # print(results)

        # Draw Landmarks
        draw_landmarks(image, results)

        # Extract Keypoints
        keypoints_vector = extract_keypoints_comprehensive(results)

        # Show to screen
        cv2.imshow('ASL Detection', image)

        # Exit on 'q' press
        if cv2.waitKey(10) & 0xFF == ord('q'):
            draw_landmarks(frame, results)
            plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            break

    cap.release()
    cv2.destroyAllWindows()

In [None]:
run_holistic_detection()

## Dataset Collection Pipeline for Custom Camera Recordings

In [None]:
# Create Directories to store data
def create_directories(actions):
    DATA_PATH = os.path.join('MP_Data')

    # Detecting Actions
    for action in actions:
        # 30 is often considered minimum threshold to detect meaningful patterns
        # n >= 30 commonly used as rule of thumb
        for sequence in range(30): # 30 videos per action 
            try:
                os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
            except:
                pass

def collect_asl_data():
    # Define Actions - starting with a small set for testing
    actions = np.array(['hello', 'thanks', ' I love you', 'yes', 'no', 'please', 'sorry'])

    # create directories for data collection - 30 directories per action
    create_directories(actions)

    # Define data path
    DATA_PATH = os.path.join('MP_Data')

    # set up holistic model
    cap, holistic = setup_holistic_detection()

    # Loop through different actions
    for action in actions:
        # loop through sequences
        for sequence in range(30):
            # loop through video length of 30 frames
            for frame_num in range(30):
                # Read Feed
                ret, frame = cap.read()
                if not ret:
                    print('Failed to grab frame')
                    break

                # Process the frame for landmark detection
                image, results = process_mp_frames(frame, holistic)

                # Draw Landmarks on the image
                draw_landmarks(image, results)

                # Apply status text to the frame
                if frame_num == 0:
                    # Visual Countdown before starting each sequence
                    cv2.putText(image, 'STARTING COLLECTION', (120, 200), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, f'Collecting for {action} - Sequence {sequence}', (15, 15), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                    cv2.imshow('ASL Data Collection', image)
                    cv2.waitKey(2000)  # Wait 2 seconds before starting
                else:
                    # Display collection status
                    cv2.putText(image, f'Collecting for {action} - Sequence {sequence}', (15, 15), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                    cv2.putText(image, f'Frame {frame_num}', (15, 30), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

                # show to screen
                cv2.imshow('ASK Data Collection', image)

                # Export keypoints
                keypoints_vector = extract_keypoints_comprehensive(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints_vector)
                
                # Break gracefully on 'q' press
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    cap.release()
                    cv2.destroyAllWindows()
                    return
            
            # Short break between sequences
            if sequence < 29:  # Don't show after the last sequence
                # Display break message
                ret, frame = cap.read()
                if ret:
                    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                    cv2.putText(image, 'SEQUENCE COMPLETE', (120, 200), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Preparing for next sequence...', (120, 230), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
                    cv2.imshow('ASL Data Collection', image)
                cv2.waitKey(2000)  # 2 second break
    
    # Release resources
    cap.release()
    cv2.destroyAllWindows()
    print("Data collection complete!")

In [None]:
collect_asl_data()

## Preprocess Data and Create Labels and Features

In [2]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [3]:
DATA_PATH = os.path.join('MP_Data_01')
sequence_length = 30

# Automatically detect action classes from directory structure
actions = np.array([folder for folder in os.listdir(DATA_PATH) if not folder.startswith('.DS_Store')]) # .DS_Store is issue
print(f"Detected actions: {actions}")

Detected actions: ['please' 'no' 'thanks' 'hello' 'yes' 'iloveyou' 'sorry']


In [4]:
# Create label mapping
label_map = {label: num for num, label in enumerate(actions)}
print(f"Label mapping: {label_map}")

Label mapping: {'please': 0, 'no': 1, 'thanks': 2, 'hello': 3, 'yes': 4, 'iloveyou': 5, 'sorry': 6}


In [5]:
# Initialize empty lists for sequences and labels
sequences, labels = [], []

# Loop through each action
for action in actions:
    # Loop through all sequences
    sequence_folders = [folder for folder in os.listdir(os.path.join(DATA_PATH, action)) if not folder.startswith('.DS_Store')]
    for sequence in np.array(sequence_folders).astype(int):
        # Build window of frames for this sequence
        window = []
        for frame_num in range(sequence_length):
            # Load the frame data (keypoints)
            try:
                res = np.load(os.path.join(DATA_PATH, action, str(sequence), f"{frame_num}.npy"))
                window.append(res)
            except Exception as e:
                print(f"Error loading file: {os.path.join(DATA_PATH, action, str(sequence), f'{frame_num}.npy')}")
                print(f"Error details: {e}")
                # If a frame is missing, you can choose to skip or use zeros
                # Here we'll use zeros to maintain sequence length
                window.append(np.zeros(res.shape if 'res' in locals() else 1662))  # 1662 is the expected dimension based on your extraction function

        # Add this sequence and its label to our datasets - FOR WLASL -> may not have complete sequence
        if len(window) == sequence_length:  # Ensure we have a complete sequence
            sequences.append(window)
            labels.append(label_map[action])

# Convert sequences and labels to numpy arrays
X = np.array(sequences)
y = to_categorical(labels).astype(int)

print(f"Data shape: {X.shape}")  # Should be (num_sequences, sequence_length, num_features)
print(f"Labels shape: {y.shape}")  # Should be (num_sequences, num_classes)

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)

print(f"Training data shape: {X_train.shape}")
print(f"Testing data shape: {X_test.shape}")

Data shape: (210, 30, 1662)
Labels shape: (210, 7)
Training data shape: (199, 30, 1662)
Testing data shape: (11, 30, 1662)


## LSTM Model for Custom ASL Data

In [6]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Dense, Input
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.optimizers import Adam

In [7]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [8]:
# Define input shape (sequence_length, features)
inputs = Input(shape=(30, 1662))

# First LSTM layer
x = LSTM(64, return_sequences=True, activation='relu')(inputs)

# Second LSTM layer
x = LSTM(128, return_sequences=True, activation='relu')(x)

# Third LSTM layer
x = LSTM(64, return_sequences=False, activation='relu')(x)

# Dense layers
x = Dense(64, activation='relu')(x)
x = Dense(32, activation='relu')(x)

# Output layer
outputs = Dense(actions.shape[0], activation='softmax')(x)

# Create model
model = Model(inputs=inputs, outputs=outputs)

In [26]:
# SIMPLE LSTM MODEL
inputs = Input(shape=(30, 1662))
x = LSTM(32, return_sequences=False, activation='relu')(inputs)
x = Dense(32, activation='relu')(x)
outputs = Dense(actions.shape[0], activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)

In [27]:
model.compile(optimizer=Adam(learning_rate=0.01), loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [28]:
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])

Epoch 1/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 13ms/step - categorical_accuracy: 0.0800 - loss: 49.1760  
Epoch 2/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - categorical_accuracy: 0.1591 - loss: 33.4323
Epoch 3/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step - categorical_accuracy: 0.1888 - loss: 9.3914
Epoch 4/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - categorical_accuracy: 0.0973 - loss: 7.4281
Epoch 5/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - categorical_accuracy: 0.1214 - loss: 5.0369   
Epoch 6/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - categorical_accuracy: 0.1654 - loss: 4.4449
Epoch 7/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step - categorical_accuracy: 0.1096 - loss: 2.7509
Epoch 8/2000
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1

<keras.src.callbacks.history.History at 0x3762f2010>

In [29]:
model.summary()

## Save Weights

In [34]:
model.save('Models/01_lstm_model.h5')
model.save_weights('Models/01_model.weights.h5');



In [35]:
import tensorflow as tf
model = tf.keras.models.load_model('Models/01_lstm_model.h5')
model.load_weights('Models/01_model.weights.h5')

  saveable.load_own_variables(weights_store.get(inner_path))


## Evaluate Using Confusion Matrix

In [36]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [37]:
yhat = model.predict(X_test)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 144ms/step


In [38]:
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [44]:
multilabel_confusion_matrix(ytrue, yhat)

array([[[10,  0],
        [ 0,  1]],

       [[ 8,  1],
        [ 0,  2]],

       [[ 9,  0],
        [ 1,  1]],

       [[ 9,  1],
        [ 0,  1]],

       [[ 9,  0],
        [ 1,  1]],

       [[ 8,  0],
        [ 0,  3]]])

In [45]:
accuracy_score(ytrue, yhat)

0.8181818181818182

# Realtime Test

In [47]:
from scipy import stats

In [58]:
colors = [
    (245, 117, 16),  # Orange
    (117, 245, 16),  # Green
    (16, 117, 245),  # Blue
    (255, 0, 0),     # Red
    (0, 255, 255),   # Cyan
    (255, 0, 255),   # Magenta
    (0, 0, 0)        # Black
]
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

In [59]:
# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.5

cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = process_mp_frames(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints_comprehensive(results)
        sequence.append(keypoints)
        sequence = sequence[-30:]
        
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            predictions.append(np.argmax(res))
            
            
        #3. Viz logic
            if np.unique(predictions[-10:])[0]==np.argmax(res): 
                if res[np.argmax(res)] > threshold: 
                    
                    if len(sentence) > 0: 
                        if actions[np.argmax(res)] != sentence[-1]:
                            sentence.append(actions[np.argmax(res)])
                    else:
                        sentence.append(actions[np.argmax(res)])

            if len(sentence) > 5: 
                sentence = sentence[-5:]

            # Viz probabilities
            image = prob_viz(res, actions, image, colors)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

I0000 00:00:1744310901.506077 16339090 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M1 Pro
W0000 00:00:1744310901.593049 16609793 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744310901.608987 16609793 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744310901.611712 16609791 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744310901.611842 16609793 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1744310901.615634 16609798 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disablin

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

## MVP

In [None]:
from collections import deque

def prob_viz(res, actions, input_frame, colors):
    """
    Visualize probabilities for each action
    """
    output_frame = input_frame.copy()
    
    # Draw rectangles and text for each action
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0, 60 + num * 40), (int(prob * 100), 90 + num * 40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85 + num * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
    
    return output_frame

def real_time_asl_detection():
    # Define actions - must match the ones used for training
    actions = np.array(['hello', 'thanks', 'iloveyou', 'yes', 'no', 'please', 'sorry'])
    
    # Define colors for visualization
    colors = [(245, 117, 16), (117, 245, 16), (16, 117, 245), (255, 0, 0), (0, 255, 0), (0, 0, 255), (200, 200, 200)]
    
    # Load the model
    model_path = 'Models/final_model.h5'
    if not os.path.exists(model_path):
        print(f"Error: Model not found at {model_path}")
        return
    
    model = tf.keras.models.load_model(model_path)
    print(f"Model loaded from {model_path}")
    
    # Initialize sequence buffer
    sequence = deque(maxlen=30)
    
    # Initialize prediction variables
    sentence = []
    threshold = 0.8  # Confidence threshold
    last_prediction_time = 0
    cooldown = 2.0  # Seconds between predictions
    
    # Initialize webcam
    cap = cv2.VideoCapture(0)
    
    # Initialize MediaPipe Holistic
    with mp_holistic.Holistic(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5
    ) as holistic:
        while cap.isOpened():
            # Read frame
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame")
                break
                
            # Make frame smaller for faster processing
            frame = cv2.resize(frame, (640, 480))
            
            # Process frame
            image, results = process_mp_frames(frame, holistic)
            
            # Draw landmarks
            draw_landmarks(image, results)
            
            # Extract keypoints
            keypoints = extract_keypoints_comprehensive(results)
            
            # Add to sequence
            sequence.append(keypoints)
            
            # Make prediction when buffer is full
            if len(sequence) == 30:
                # Prepare for model
                input_data = np.expand_dims(np.array(sequence), axis=0)
                
                # Get prediction
                res = model.predict(input_data, verbose=0)[0]
                
                # Get class with highest probability
                predicted_class_idx = np.argmax(res)
                predicted_class = actions[predicted_class_idx]
                confidence = res[predicted_class_idx]
                
                # Only add prediction if confidence is high enough and cooldown has passed
                current_time = time.time()
                if confidence > threshold and (current_time - last_prediction_time) > cooldown:
                    # Add prediction to sentence if it's different from the last one
                    if len(sentence) == 0 or predicted_class != sentence[-1]:
                        sentence.append(predicted_class)
                        # Limit sentence length
                        if len(sentence) > 5:
                            sentence = sentence[-5:]
                    
                    last_prediction_time = current_time
                
                # Visualize probabilities
                image = prob_viz(res, actions, image, colors)
            
            # Show prediction on frame
            cv2.rectangle(image, (0, 0), (640, 40), (245, 117, 16), -1)
            
            # Show current detected sign
            if len(sentence) > 0:
                cv2.putText(image, f"Detected: {sentence[-1]}", (3, 30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
            # Show full sentence
            if len(sentence) > 1:
                # Position at bottom of frame
                sentence_text = ' '.join(sentence)
                text_size = cv2.getTextSize(sentence_text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)[0]
                
                # Create transparent overlay for text
                overlay = image.copy()
                cv2.rectangle(overlay, (0, image.shape[0] - 50), (image.shape[1], image.shape[0]), (0, 0, 0), -1)
                alpha = 0.7
                image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
                
                # Draw text
                cv2.putText(image, sentence_text, (10, image.shape[0] - 20), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA)
            
            # Show frame
            cv2.imshow('ASL Detection', image)
            
            # Break on 'q' key
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
    
    # Release resources
    cap.release()
    cv2.destroyAllWindows()


In [None]:
real_time_asl_detection()

## Dataset Collection and Processing Pipeline for Pre-built Dataset

In [None]:
WLASL_PATH = "WLASL_DATA"
VIDEOS_PATH = os.path.join(WLASL_PATH, "videos")
JSON_PATH = os.path.join(WLASL_PATH, "WLASL_v0.3.json")
OUTPUT_PATH = "WLASL_Processed"

# Create Output directory
os.makedirs(OUTPUT_PATH, exist_ok=True)

**Verify Dataset Structure**

In [None]:
print("WLASL folder contents:")
print(os.listdir(WLASL_PATH))

# If there's a JSON file, check its structure
json_path = os.path.join(WLASL_PATH, "WLASL_v0.3.json")
if os.path.exists(json_path):
    with open(json_path, 'r') as f:
        data = json.load(f)
    print(f"\nJSON file contains data for {len(data)} signs")
    print(f"Example of first sign: {data[0]['gloss']}")

**Process Videos**

In [None]:
def process_video(video_path, output_dir, max_frames=30):
    """Process a single video file and extract landmarks"""
    # Create Output Directory
    os.makedirs(output_dir, exist_ok=True)

    # open video
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate frame sampling rate
    if frame_count <= max_frames:
        # Use all frames is video is short
        frame_indices = list(range(frame_count))
    else:
        # Sample frames evenly if video is long
        frame_indices = np.linspace(0, frame_count-1, max_frames, dtype=int)

    # Initialize MediaPipe
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        # Process Frames
        for frame_idx, i in enumerate(frame_indices):
            # Set frame position
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            success, frame = cap.read()

            if not success:
                print(f"Could not read frame {i} from {video_path}")
                continue
                
            # Process image
            image, results = process_mp_frames(frame, holistic)
            
            # Extract and savve keypoints
            keypoints_vector = extract_keypoints_comprehensive(results)
            npy_path = os.path.join(output_dir, str(frame_idx))
            np.save(npy_path, keypoints_vector)

    cap.release()
    return len(frame_indices)

In [None]:
def process_wlasl_dataset(max_signs=None, max_videos_per_sign=30):
    """Process the WLASL dataset"""
    # Load WLASL metadata
    with open(JSON_PATH, 'r') as f:
        wlasl_data = json.load(f)

    # Limit number of signs if specified
    if max_signs:
        wlasl_data = wlasl_data[:max_signs]

    # list to store sign info
    processed_signs = []

    # Create sign to ID mapping
    sign_map = {sign_data['gloss']: idx for idx, sign_data in enumerate(wlasl_data)}

    # Save sign map for later use
    with open(os.path.join(OUTPUT_PATH, 'sign_map.json'), 'w') as f:
        json.dump(sign_map, f, indent=2)

    # Process each sign
    for sign_idx, sign_data in enumerate(tqdm(wlasl_data, desc="Processing signs")):
        sign_name = sign_data['gloss']
        sign_id = sign_idx  # Use index as class ID
        
        # Create directory for this sign
        sign_dir = os.path.join(OUTPUT_PATH, sign_name)
        os.makedirs(sign_dir, exist_ok=True)

        # Process videos for this sign
        videos_processed = 0
        
        for i, instance in enumerate(sign_data['instances']):
            if videos_processed >= max_videos_per_sign:
                break
                
            video_id = instance['video_id']
            video_path = os.path.join(VIDEOS_PATH, f"{video_id}.mp4")
            
            # Check if video exists
            if not os.path.exists(video_path):
                print(f"Video not found: {video_path}")
                continue
            
            # Process video
            output_dir = os.path.join(sign_dir, str(videos_processed))
            frames_processed = process_video(video_path, output_dir)
            
            if frames_processed > 0:
                videos_processed += 1
                print(f"Processed {sign_name} video {videos_processed}/{max_videos_per_sign}")
        
        # Add to processed signs list
        processed_signs.append({
            'name': sign_name,
            'id': sign_id,
            'videos_processed': videos_processed
        })
        
        print(f"Completed sign: {sign_name} - {videos_processed} videos processed")
    
    # Save processed sign information
    with open(os.path.join(OUTPUT_PATH, 'processed_signs.json'), 'w') as f:
        json.dump(processed_signs, f, indent=2)
    
    return processed_signs

**Processing Dataset in parallel**

*The parallel data processing is put here just to view, but this code was copied into a seperate python script bceause Jupyter Notebook doesn't really support multiprocessing*

In [None]:
import multiprocessing
from functools import partial

In [None]:
def process_video_parallel(video_info, output_base_dir, max_frames=30):
    """Process a single video file and extract landmarks"""
    sign_name, video_idx, video_id = video_info
    
    # Create output directory for this video
    output_dir = os.path.join(output_base_dir, sign_name, str(video_idx))
    os.makedirs(output_dir, exist_ok=True)
    
    # Form video path
    video_path = os.path.join("WLASL_DATA", "videos", f"{video_id}.mp4")
    
    # Check if video exists
    if not os.path.exists(video_path):
        print(f"Video not found: {video_path}")
        return False
    
    try:
        # Open video
        cap = cv2.VideoCapture(video_path)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Calculate frame sampling rate
        if frame_count <= max_frames:
            frame_indices = list(range(frame_count))
        else:
            frame_indices = np.linspace(0, frame_count-1, max_frames, dtype=int)
        
        # Initialize MediaPipe - create a new instance for each process
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            # Process Frames
            for frame_idx, i in enumerate(frame_indices):
                # Set frame position
                cap.set(cv2.CAP_PROP_POS_FRAMES, i)
                success, frame = cap.read()
                if not success:
                    continue
                    
                # Process image
                results = process_mp_frames(frame, holistic)
                
                # Extract and save keypoints
                keypoints_vector = extract_keypoints_comprehensive(results)
                npy_path = os.path.join(output_dir, f"{frame_idx}.npy")
                np.save(npy_path, keypoints_vector)
        
        cap.release()
        return True
    except Exception as e:
        print(f"Error processing {video_path}: {e}")
        return False

In [None]:
def process_wlasl_dataset_parallel(max_signs=None, max_videos_per_sign=30, num_processes=None):
    """Process WLASL Dataset using parallel processing"""
    # If num processes isn't specified using all the available cores - 1
    if num_processes is None:
        num_processes = max(1, multiprocessing.cpu_count() - 1)

    # Load WLASL metadata
    with open(os.path.join("WLASL_DATA", "WLASL_v0.3.json"), 'r') as f:
        wlasl_data = json.load(f)
    
    # Limit number of signs if specified
    if max_signs:
        wlasl_data = wlasl_data[:max_signs]

    # Create sign to ID mapping
    sign_map = {sign_data['gloss']: idx for idx, sign_data in enumerate(wlasl_data)}
    
    # Output directory
    output_path = "WLASL_Processed"
    os.makedirs(output_path, exist_ok=True)
    
    # Save sign map for later use
    with open(os.path.join(output_path, 'sign_map.json'), 'w') as f:
        json.dump(sign_map, f, indent=2)
    
    # Prepare list of videos to process
    videos_to_process = []
    for sign_idx, sign_data in enumerate(wlasl_data):
        sign_name = sign_data['gloss']
        
        # Create directory for this sign
        sign_dir = os.path.join(output_path, sign_name)
        os.makedirs(sign_dir, exist_ok=True)
        
        # Add videos for this sign to process list (limited by max_videos_per_sign)
        for video_idx, instance in enumerate(sign_data['instances'][:max_videos_per_sign]):
            video_id = instance['video_id']
            videos_to_process.append((sign_name, video_idx, video_id))
            
    # Create a partial function with fixed arguments
    process_func = partial(process_video_parallel, output_base_dir=output_path)
    
    # Process videos in parallel
    print(f"Processing {len(videos_to_process)} videos using {num_processes} processes...")
    
    # Create a multiprocessing pool
    with multiprocessing.Pool(processes=num_processes) as pool:
        # Process videos and track progress with tqdm
        results = list(tqdm(
            pool.imap(process_func, videos_to_process),
            total=len(videos_to_process),
            desc="Processing videos"
        ))
    
    # Count processed videos per sign
    processed_counts = {}
    for sign_name, _, _ in videos_to_process:
        if sign_name not in processed_counts:
            processed_counts[sign_name] = 0
        processed_counts[sign_name] += 1
    
    # Create processed sign information
    processed_signs = [
        {
            'name': sign_data['gloss'],
            'id': idx,
            'videos_processed': processed_counts.get(sign_data['gloss'], 0)
        }
        for idx, sign_data in enumerate(wlasl_data)
    ]
    
    # Save processed sign information
    with open(os.path.join(output_path, 'processed_signs.json'), 'w') as f:
        json.dump(processed_signs, f, indent=2)
    
    print(f"Dataset processing complete. Processed {sum(results)} videos successfully.")
    return processed_signs

In [None]:
# Testing with small subset
process_wlasl_dataset(max_signs=10, max_videos_per_sign=10)

In [None]:
# Process Entire Dataset
# process_wlasl_dataset()

In [None]:
# Process Entire WLASL Dataset in Parallel
# process_wlasl_dataset_parallel(num_processes=3)

## Basic LSTM Model using WLASL Dataset

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

np.random.seed(42)
tf.random.set_seed(42)

def load_wlasl_data(processed_data_path, sequence_length=30):
    """Load Processed WLASL data into train/test sets"""
    # Load sign mapping
    with open(os.path.join(processed_data_path, 'sign_map.json'), 'r') as f:
        sign_map = json.load(f)
    
    # Get list of sign directories
    sign_dirs = [d for d in os.listdir(processed_data_path) 
                 if os.path.isdir(os.path.join(processed_data_path, d)) and d in sign_map]
    
    # Prepare data containers
    X = []  # Features
    y = []  # Labels

    # Process each sign
    for sign_name in sign_dirs:
        sign_dir = os.path.join(processed_data_path, sign_name)
        
        # Get video instances for this sign
        video_dirs = [d for d in os.listdir(sign_dir) 
                     if os.path.isdir(os.path.join(sign_dir, d))]
        
        # Process each video instance
        for video_idx in video_dirs:
            video_dir = os.path.join(sign_dir, video_idx)
            
            # Get frame files
            frame_files = [f for f in os.listdir(video_dir) if f.endswith('.npy')]
            frame_files.sort(key=lambda x: int(x.split('.')[0]))  # Sort by frame index
            
            # Check if we have enough frames
            if len(frame_files) < 1:
                continue
            
            # Load sequence
            sequence = []
            for frame_file in frame_files:
                frame_path = os.path.join(video_dir, frame_file)
                frame_data = np.load(frame_path)
                sequence.append(frame_data)
            
            # Pad or truncate to sequence_length
            if len(sequence) > sequence_length:
                sequence = sequence[:sequence_length]
            elif len(sequence) < sequence_length:
                # Pad with zeros
                padding = [np.zeros_like(sequence[0]) for _ in range(sequence_length - len(sequence))]
                sequence.extend(padding)
            
            # Add to dataset
            X.append(np.array(sequence))
            y.append(sign_map[sign_name])
    
    # Convert to numpy arrays
    X = np.array(X)
    y = np.array(y)
    
    # One-hot encode labels
    num_classes = len(sign_map)
    y_encoded = to_categorical(y, num_classes=num_classes)
    
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, test_size=0.2, random_state=42, stratify=y
    )
    
    print(f"Loaded {len(X)} sequences across {len(sign_map)} signs")
    print(f"Features shape: {X.shape}")
    print(f"Labels shape: {y_encoded.shape}")
    print(f"Training set: {X_train.shape[0]} sequences")
    print(f"Test set: {X_test.shape[0]} sequences")
    
    return X_train, X_test, y_train, y_test, sign_map

In [None]:
def create_model(input_shape, num_classes):
    """Basic LSTM model for ASL Translation"""
    model = Sequential()
    
    # LSTM layers
    model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=input_shape))
    model.add(LSTM(128, return_sequences=True, activation='relu'))
    model.add(LSTM(64, return_sequences=False, activation='relu'))
    
    # Dense layers
    model.add(Dense(64, activation='relu'))
    model.add(Dense(32, activation='relu'))
    
    # Output layer
    model.add(Dense(num_classes, activation='softmax'))

    # # Create Adam optimizer with a lower learning rate
    # optimizer = Adam(learning_rate=0.0001)  # Default is 0.001, so this is 10x smaller
    
    # Compile model
    model.compile(
        optimizer='Adam',
        loss='categorical_crossentropy',
        metrics=['categorical_accuracy']
    )
    
    return model

In [None]:
def train_model(model, X_train, y_train, X_test, y_test, epochs=100, batch_size=32):
    # Setup TensorBoard logging
    log_dir = os.path.join('Logs')
    os.makedirs(log_dir, exist_ok=True)
    tb_callback = TensorBoard(log_dir=log_dir)
    
    # Setup model checkpoint to save best model
    checkpoint_path = os.path.join('Models', 'checkpoint.h5')
    os.makedirs('Models', exist_ok=True)
    cp_callback = ModelCheckpoint(
        checkpoint_path, 
        monitor='val_categorical_accuracy',
        save_best_only=True,
        verbose=1
    )
    
    # # Early stopping to prevent overfitting
    # es_callback = EarlyStopping(
    #     monitor='val_categorical_accuracy',
    #     patience=10,
    #     verbose=1,
    #     restore_best_weights=True
    # )
    
    # Train model
    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_test, y_test),
        callbacks=[tb_callback, cp_callback]
    )
    
    return history

In [None]:
processed_data_path = "WLASL_Processed"
X_train, X_test, y_train, y_test, sign_map = load_wlasl_data(processed_data_path)
# Get input shape and number of classes
input_shape = (X_train.shape[1], X_train.shape[2])  # (sequence_length, num_features)
num_classes = y_train.shape[1]

# Create model
model = create_model(input_shape, num_classes)

# Print model summary
model.summary()

# Train model
history = train_model(model, X_train, y_train, X_test, y_test, epochs=100)

In [None]:
def plot_training_history(history):
    """
    Plot the training history
    
    Args:
        history: Training history returned by model.fit()
    """
    # Create figure with 2 subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # Plot accuracy
    ax1.plot(history.history['categorical_accuracy'])
    ax1.plot(history.history['val_categorical_accuracy'])
    ax1.set_title('Model Accuracy')
    ax1.set_ylabel('Accuracy')
    ax1.set_xlabel('Epoch')
    ax1.legend(['Train', 'Validation'], loc='lower right')
    ax1.grid(True, alpha=0.3)
    
    # Plot loss
    ax2.plot(history.history['loss'])
    ax2.plot(history.history['val_loss'])
    ax2.set_title('Model Loss')
    ax2.set_ylabel('Loss')
    ax2.set_xlabel('Epoch')
    ax2.legend(['Train', 'Validation'], loc='upper right')
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.show()

def evaluate_model(model, X_test, y_test, sign_map):
    """
    Evaluate the model performance
    
    Args:
        model: Trained Keras model
        X_test, y_test: Test data
        sign_map: Mapping from sign names to indices
    """
    # Get predictions
    y_pred = model.predict(X_test)
    
    # Convert from one-hot encoded to class indices
    y_true = np.argmax(y_test, axis=1)
    y_pred = np.argmax(y_pred, axis=1)
    
    # Calculate accuracy
    accuracy = np.sum(y_true == y_pred) / len(y_true)
    print(f"Test accuracy: {accuracy:.4f}")
    
    # Get inverse mapping (index to sign name)
    idx_to_sign = {v: k for k, v in sign_map.items()}
    
    # Show some example predictions
    print("\nExample predictions:")
    for i in range(min(5, len(y_true))):
        true_sign = idx_to_sign[y_true[i]]
        pred_sign = idx_to_sign[y_pred[i]]
        print(f"True: {true_sign}, Predicted: {pred_sign}")

In [None]:
# Plot training history
plot_training_history(history)

# Evaluate model
evaluate_model(model, X_test, y_test, sign_map)