In [2]:
!pip install tensorflow mediapipe opencv-python numpy




In [3]:
import cv2
import numpy as np
import os
import mediapipe as mp
from IPython.display import clear_output
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
import time
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
import tensorflow as tf



In [6]:
import cv2
import numpy as np
import mediapipe as mp
import os
import time
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# ===============================
# SETTINGS
# ===============================
# Root folder; each sign gets its own sub-folder of saved .npy sequences.
DATA_PATH = "data"

# Signs to record (change or add more signs); list order defines the label index.
SIGNS = ["Thank You", "Hello"]

# Recording plan: sequences recorded per sign, and hand-detected frames per sequence.
NUM_SEQUENCES, SEQUENCE_LENGTH = 50, 30

# Resolution requested from the webcam.
FRAME_WIDTH, FRAME_HEIGHT = 1280, 720

# The on-screen countdown before each recording starts at this number.
COUNTDOWN_START = 3

# Sequences with fewer usable frames than this are discarded.
MIN_VALID_FRAMES = 20

# Feature vector per frame: 2 hands x 21 landmarks x (x, y, z).
FEATURES = 2 * 21 * 3

# ===============================
# MEDIA PIPE SETUP
# ===============================
# Holistic solution module: supplies the Holistic(...) context manager used for
# landmark detection and the HAND_CONNECTIONS topology passed to draw_landmarks.
mp_holistic = mp.solutions.holistic
# Drawing helpers used to overlay the detected hand landmarks on the camera frame.
mp_drawing = mp.solutions.drawing_utils

# ===============================
# UTILITY FUNCTIONS
# ===============================
def extract_hand_keypoints(results):
    """Flatten the left- and right-hand landmarks of `results` into one vector.

    Each hand contributes 21 landmarks x (x, y, z) = 63 values, left hand
    first. A hand that was not detected contributes 63 zeros.

    Args:
        results: object exposing `left_hand_landmarks` and
            `right_hand_landmarks`; each is either falsy or has a `.landmark`
            iterable whose items carry `x`, `y` and `z` attributes.

    Returns:
        1-D numpy array: left-hand values followed by right-hand values.
    """
    def _flatten_hand(hand):
        # Missing hand -> zero block, so the vector layout stays fixed.
        if not hand:
            return np.zeros(21*3)
        return np.array([[point.x, point.y, point.z] for point in hand.landmark]).flatten()

    left = _flatten_hand(results.left_hand_landmarks)
    right = _flatten_hand(results.right_hand_landmarks)
    return np.concatenate([left, right])

def create_folders(signs):
    """Make sure a sub-directory of DATA_PATH exists for every sign name.

    Args:
        signs: iterable of sign names; each becomes a folder name.
    """
    sign_dirs = (os.path.join(DATA_PATH, name) for name in signs)
    for sign_dir in sign_dirs:
        # exist_ok=True keeps repeated runs from failing on existing folders.
        os.makedirs(sign_dir, exist_ok=True)

def display_countdown(frame, countdown, sign_name, sequence_num):
    """Return a copy of `frame` with the countdown and progress text blended in.

    Args:
        frame: BGR image as returned by `cap.read()`.
        countdown: value drawn in large digits near the centre of the frame.
        sign_name: name of the sign about to be recorded.
        sequence_num: zero-based index of the upcoming sequence (shown 1-based).

    Returns:
        New image: 70% annotated overlay blended with 30% of the original frame.
    """
    # Position the digits from the real frame size: a camera may not honour the
    # requested FRAME_WIDTH/FRAME_HEIGHT, which would push the text off-centre
    # or outside the image.
    frame_height, frame_width = frame.shape[:2]

    overlay = frame.copy()
    cv2.putText(overlay, f"{countdown}", (frame_width // 2 - 50, frame_height // 2),
                cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 0, 255), 10)
    cv2.putText(overlay, f"{sign_name} | Sequence {sequence_num+1}/{NUM_SEQUENCES}",
                (30, 60), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 255, 0), 3)
    return cv2.addWeighted(overlay, 0.7, frame, 0.3, 0)

def collect_sequence(cap, holistic, sign_name, seq_num, max_read_failures=30):
    """Record one sequence of hand keypoints from the camera.

    Frames are read until SEQUENCE_LENGTH frames with at least one detected
    hand have been gathered; frames without any hand are shown but not stored.

    Args:
        cap: opened `cv2.VideoCapture`.
        holistic: MediaPipe Holistic instance used to process each frame.
        sign_name: sign being recorded (used for the on-screen text and warnings).
        seq_num: zero-based sequence index (shown 1-based).
        max_read_failures: number of consecutive failed `cap.read()` calls
            after which recording stops instead of retrying forever.

    Returns:
        numpy array of shape (SEQUENCE_LENGTH, FEATURES), zero-padded at the
        end if recording stopped early, or None when fewer than
        MIN_VALID_FRAMES frames were captured.

    Raises:
        KeyboardInterrupt: when the user presses 'q' in the preview window.
    """
    sequence = []
    failed_reads = 0
    while len(sequence) < SEQUENCE_LENGTH:
        ret, frame = cap.read()
        if not ret:
            # A disconnected or busy camera would otherwise keep this loop
            # spinning forever; give up after too many failures in a row.
            failed_reads += 1
            if failed_reads >= max_read_failures:
                print(f"Camera read failed {failed_reads} times in a row; stopping sequence {seq_num+1}.")
                break
            continue
        failed_reads = 0

        # MediaPipe expects RGB input, OpenCV delivers BGR.
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = holistic.process(image)

        if results.left_hand_landmarks:
            mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        if results.right_hand_landmarks:
            mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

        keypoints = extract_hand_keypoints(results)

        # Only frames in which at least one hand was detected count.
        if np.any(keypoints != 0):
            sequence.append(keypoints)

        cv2.putText(frame, f"Collecting: {sign_name}", (30, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 3)
        cv2.putText(frame, f"Sequence {seq_num+1}/{NUM_SEQUENCES} | Frame {len(sequence)}/{SEQUENCE_LENGTH}",
                    (30, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        cv2.imshow("Sign Data Collection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            raise KeyboardInterrupt

    # Reached only when recording stopped early because of camera failures.
    if len(sequence) < MIN_VALID_FRAMES:
        print(f"Warning: Sequence {seq_num+1} for '{sign_name}' too short ({len(sequence)} frames). Skipping.")
        return None

    if len(sequence) < SEQUENCE_LENGTH:
        pad_len = SEQUENCE_LENGTH - len(sequence)
        padding = np.zeros((pad_len, FEATURES))
        sequence = np.vstack([sequence, padding])

    # Always hand back an ndarray, whether or not padding was applied.
    return np.array(sequence)

# ===============================
# MAIN COLLECTION + PREPARE DATA
# ===============================
def collect_and_prepare_data(cap, signs):
    """Load stored sequences, record new ones, and return a train/test split.

    Args:
        cap: opened `cv2.VideoCapture` used for recording.
        signs: list of sign names; a sign's position in the list is its label.

    Returns:
        (X_train, X_test, y_train, y_test) with one-hot encoded labels.

    Raises:
        KeyboardInterrupt: propagated from `collect_sequence` when the user quits.
        ValueError: when no usable sequence exists after collection.
    """
    create_folders(signs)
    sequences = []
    labels = []
    expected_shape = (SEQUENCE_LENGTH, FEATURES)

    # Load existing data first
    for sign in signs:
        label = signs.index(sign)
        action_path = os.path.join(DATA_PATH, sign)
        if not os.path.exists(action_path):
            continue
        for file in sorted(os.listdir(action_path)):
            # Ignore stray entries (.DS_Store, checkpoint folders, ...) that
            # np.load cannot read.
            if not file.endswith(".npy"):
                continue
            file_path = os.path.join(action_path, file)
            seq = np.load(file_path)
            # A single sequence of another shape would make np.array() below
            # build a ragged array, so leave it out and say so.
            if seq.shape != expected_shape:
                print(f"Skipping {file_path}: shape {seq.shape}, expected {expected_shape}")
                continue
            sequences.append(seq)
            labels.append(label)

    # Start new collection
    with mp_holistic.Holistic(min_detection_confidence=0.6,
                              min_tracking_confidence=0.6) as holistic:
        for sign in signs:
            label = signs.index(sign)
            print(f"\nStarting new collection for sign: {sign}")
            time.sleep(1)
            for seq in range(NUM_SEQUENCES):
                # Countdown
                for countdown in range(COUNTDOWN_START, 0, -1):
                    ret, frame = cap.read()
                    if not ret:
                        continue
                    overlay_frame = display_countdown(frame, countdown, sign, seq)
                    cv2.imshow("Sign Data Collection", overlay_frame)
                    cv2.waitKey(800)

                # Collect sequence
                sequence = collect_sequence(cap, holistic, sign, seq)
                if sequence is not None:
                    # Timestamp plus index keeps file names from earlier runs distinct.
                    file_path = os.path.join(DATA_PATH, sign, f"new_{int(time.time())}_{seq}.npy")
                    np.save(file_path, sequence)
                    print(f"Saved new {sign} sequence {seq+1}/{NUM_SEQUENCES}")

                    sequences.append(sequence)
                    labels.append(label)

    if not sequences:
        raise ValueError("No sequences available: nothing was loaded from disk or recorded.")

    # Convert to arrays
    sequences = np.array(sequences)
    labels = np.array(labels)

    # Split dataset
    X_train, X_test, y_train, y_test = train_test_split(
        sequences, labels, test_size=0.2, random_state=42, shuffle=True
    )

    # One-hot encode labels
    y_train = to_categorical(y_train, num_classes=len(signs))
    y_test = to_categorical(y_test, num_classes=len(signs))

    return X_train, X_test, y_train, y_test

# ===============================
# RUN EVERYTHING
# ===============================
cap = cv2.VideoCapture(0)
# Named property ids instead of the magic numbers 3 and 4.
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)

try:
    X_train, X_test, y_train, y_test = collect_and_prepare_data(cap, SIGNS)
except KeyboardInterrupt:
    print("\nCollection interrupted by user.")
else:
    # Report only on success: after an interruption the split variables were
    # never assigned, so printing their sizes would raise NameError.
    print("Data collection and preparation complete!")
    print(f"Training samples: {len(X_train)}, Testing samples: {len(X_test)}")
finally:
    # Free the camera and close the preview window on every exit path.
    cap.release()
    cv2.destroyAllWindows()



Starting new collection for sign: Thank You
Saved new Thank You sequence 1/1

Starting new collection for sign: Hello
Saved new Hello sequence 1/1
Data collection and preparation complete!
Training samples: 76, Testing samples: 20


In [7]:
import os
import numpy as np
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# ===============================
# SETTINGS
# ===============================
DATA_PATH = "data"
SEQUENCE_LENGTH = 30
MIN_VALID_FRAMES = 20
FEATURES = 126

# ===============================
# LOAD & CLEAN DATA
# ===============================
# Every non-hidden sub-directory of DATA_PATH is one class. Stray files and
# hidden folders (e.g. .ipynb_checkpoints) must not become labels.
actions = sorted(
    entry for entry in os.listdir(DATA_PATH)
    if not entry.startswith(".") and os.path.isdir(os.path.join(DATA_PATH, entry))
)
label_map = {action: idx for idx, action in enumerate(actions)}

sequences = []
labels = []

for action in actions:
    action_path = os.path.join(DATA_PATH, action)
    for file in sorted(os.listdir(action_path)):
        # Anything that is not a saved numpy array cannot be a sequence.
        if not file.endswith(".npy"):
            continue
        file_path = os.path.join(action_path, file)
        seq = np.load(file_path)

        # Arrays that cannot be brought to (SEQUENCE_LENGTH, FEATURES) would
        # make np.array() below ragged; skip them without touching the file.
        if seq.ndim != 2 or seq.shape[1] != FEATURES or seq.shape[0] > SEQUENCE_LENGTH:
            print(f"Skipping incompatible sequence {file_path} with shape {seq.shape}")
            continue

        # Remove invalid sequences
        # NOTE: destructive - the file is permanently deleted from disk.
        if seq.shape[0] < MIN_VALID_FRAMES or np.all(seq == 0):
            print(f"Deleting invalid sequence: {file_path}")
            os.remove(file_path)
            continue

        # Pad if too short
        if seq.shape[0] < SEQUENCE_LENGTH:
            pad_len = SEQUENCE_LENGTH - seq.shape[0]
            seq = np.vstack([seq, np.zeros((pad_len, FEATURES))])

        sequences.append(seq)
        labels.append(label_map[action])

# Convert to arrays
sequences = np.array(sequences)
labels = np.array(labels)

# Split dataset
X_train, X_test, y_train, y_test = train_test_split(
    sequences, labels, test_size=0.2, random_state=42, shuffle=True
)

# One-hot encode labels
y_train = to_categorical(y_train, num_classes=len(actions))
y_test = to_categorical(y_test, num_classes=len(actions))

print(f"Actions: {actions}")
print(f"Training samples: {len(X_train)}, Testing samples: {len(X_test)}")


Actions: ['Hello', 'No', 'Thank You', 'Yes']
Training samples: 171, Testing samples: 43


In [8]:

# ===============================
# LSTM MODEL
# ===============================
# Input and output sizes are taken from the prepared training arrays rather
# than a hard-coded 30 and the `sequences`/`actions` globals, so this cell
# works whichever data-preparation cell produced X_train / y_train.
input_shape = X_train.shape[1:]   # (frames per sequence, features per frame)
num_classes = y_train.shape[1]    # width of the one-hot labels

# NOTE(review): the Keras LSTM documentation lists the default tanh activation
# as a requirement for the fast cuDNN kernel; activation='relu' is kept here to
# leave the architecture unchanged - confirm it is intentional.
model = Sequential([
    # First LSTM layer
    LSTM(128, return_sequences=True, activation='relu', input_shape=input_shape),
    Dropout(0.2),
    BatchNormalization(),

    # Second LSTM layer
    LSTM(64, return_sequences=False, activation='relu'),
    Dropout(0.2),
    BatchNormalization(),

    # Dense layer
    Dense(64, activation='relu'),
    Dropout(0.2),

    # Output layer: one probability per class
    Dense(num_classes, activation='softmax'),
])

# Compile
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Summary
model.summary()



Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 30, 128)           130560    
                                                                 
 dropout (Dropout)           (None, 30, 128)           0         
                                                                 
 batch_normalization (Batch  (None, 30, 128)           512       
 Normalization)                                                  
                                                                 
 lstm_1 (LSTM)               (None, 64)                49408     
                                                                 
 dropout_1 (Dropout)         (None, 64)                0         
                                                                 
 batch_normalization_1 (Bat  (None, 64)                256       
 chNormalization)                                      

In [9]:
# Train for 200 epochs; the held-out test split doubles as validation data,
# so the reported validation metrics are measured on the test samples.
history = model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_test, y_test),
    epochs=200,
)


Epoch 1/200


Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 

In [10]:
# Persist the trained network to "sign_language_lstm.h5" so the recognition
# cell can reload it with tf.keras.models.load_model.
model.save("sign_language_lstm.h5")
# Store the class names next to the model: a predicted index is mapped back to
# a name through this array, so its order must match the training label order.
np.save("actions.npy", actions)


  saving_api.save_model(


In [11]:
# ===============================
# MEDIA PIPE SETUP
# ===============================
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# ===============================
# LOAD MODEL & ACTIONS
# ===============================
model = tf.keras.models.load_model("sign_language_lstm.h5")
# actions.npy is written from a list of plain strings, i.e. a unicode array, so
# it loads without pickle support. Keeping the default allow_pickle=False means
# a tampered file cannot execute code while being loaded.
actions = np.load("actions.npy")

# ===============================
# KEYPOINT EXTRACTION (HANDS ONLY)
# ===============================
def extract_hand_keypoints(results):
    """Build the hands-only feature vector: left hand first, then right hand.

    Every hand is described by 21 landmarks with x, y and z each (63 values);
    an undetected hand is represented by 63 zeros.
    """
    hand_vectors = []
    for hand in (results.left_hand_landmarks, results.right_hand_landmarks):
        if hand:
            coords = np.array([[point.x, point.y, point.z] for point in hand.landmark]).flatten()
        else:
            coords = np.zeros(21*3)
        hand_vectors.append(coords)

    return np.concatenate(hand_vectors)  # total 126 features

# ===============================
# CAMERA SETUP
# ===============================
cap = cv2.VideoCapture(0)
# Named property ids instead of the magic numbers 3 and 4.
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

sequence = []
last_action = ""
threshold = 0.9  # confidence threshold
window_size = 30  # frames per prediction; must match the length used in training

# ===============================
# REAL-TIME PREDICTION LOOP
# ===============================
# try/finally guarantees the camera and the window are released even if the
# loop raises or the kernel is interrupted.
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5,
                              min_tracking_confidence=0.5) as holistic:

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert BGR to RGB for MediaPipe
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = holistic.process(image)

            # Draw only hands on frame
            if results.left_hand_landmarks:
                mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
            if results.right_hand_landmarks:
                mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

            # Check if any hand is detected
            hands_present = results.left_hand_landmarks or results.right_hand_landmarks

            if hands_present:
                # Extract keypoints and keep a sliding window of the latest frames
                keypoints = extract_hand_keypoints(results)
                sequence.append(keypoints)
                sequence = sequence[-window_size:]

                # Predict only if sequence is full
                if len(sequence) == window_size:
                    pred = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                    max_confidence = np.max(pred)
                    predicted_action = actions[np.argmax(pred)]

                    if max_confidence > threshold:
                        last_action = predicted_action
                    else:
                        last_action = ""  # clear action if confidence too low
            else:
                # No hands detected: clear sequence and last action
                sequence = []
                last_action = ""

            # Display action if available
            if last_action != "":
                cv2.putText(frame, f"{last_action}", (50, 100),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3)

            # Show frame
            cv2.imshow("Sign Recognition", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Release resources
    cap.release()
    cv2.destroyAllWindows()
