In [1]:
%pip install tensorflow-gpu opencv-python mediapipe scikit-learn sklearn gTTS googletrans==4.0.0-rc1 pygame matplotlib

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pygame

print("Pygame version:", pygame.__version__)

pygame 2.3.0 (SDL 2.24.2, Python 3.9.13)
Hello from the pygame community. https://www.pygame.org/contribute.html
Pygame version: 2.3.0


In [3]:
import numpy as np
import cv2
import os
import mediapipe as mp

In [4]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [5]:
def mediapipe_detection(input_image, model):
    # Convert image from BGR to RGB
    input_image = cv2.cvtColor(input_image, cv2.COLOR_BGR2RGB)
    # Make prediction
    detection_results = model.process(input_image)
    # Convert image back to BGR
    input_image = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR)

    return input_image, detection_results

In [6]:
def draw_landmarks(input_image, detection_results):
    # Draw face connections
    mp_drawing.draw_landmarks(input_image, detection_results.face_landmarks, mp_holistic.FACEMESH_TESSELATION)
    
    # Draw pose connections
    mp_drawing.draw_landmarks(input_image, detection_results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
    
    # Draw left hand connections
    mp_drawing.draw_landmarks(input_image, detection_results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    
    # Draw right hand connections
    mp_drawing.draw_landmarks(input_image, detection_results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

In [7]:
def draw_styled_landmarks(input_image, detection_results):
    # Set drawing specifications for face connections
    face_points = mp_drawing.DrawingSpec(color=(0,0,0), thickness=1, circle_radius=1) 
    face_lines = mp_drawing.DrawingSpec(color=(255,255,255), thickness=1, circle_radius=1)

    # Set drawing specifications for pose connections
    pose_points = mp_drawing.DrawingSpec(color=(0,0,0), thickness=1, circle_radius=2)
    pose_lines = mp_drawing.DrawingSpec(color=(255,255,255), thickness=1, circle_radius=2)

    # Set drawing specifications for left hand connections
    left_hand_points = mp_drawing.DrawingSpec(color=(0,0,0), thickness=1, circle_radius=2)
    left_hand_lines = mp_drawing.DrawingSpec(color=(255,255,255), thickness=1, circle_radius=2)

    # Set drawing specifications for right hand connections
    right_hand_points = mp_drawing.DrawingSpec(color=(0,0,0), thickness=1, circle_radius=2)
    right_hand_lines = mp_drawing.DrawingSpec(color=(255,255,255), thickness=1, circle_radius=2)

    # Draw face connections
    mp_drawing.draw_landmarks(input_image, detection_results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, face_points, face_lines)

    # Draw pose connections
    mp_drawing.draw_landmarks(input_image, detection_results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, pose_points, pose_lines)

    # Draw left hand connections
    mp_drawing.draw_landmarks(input_image, detection_results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, left_hand_points, left_hand_lines)

    # Draw right hand connections
    mp_drawing.draw_landmarks(input_image, detection_results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, right_hand_points, right_hand_lines)


In [8]:
# Extract keypoints into flattened arrays
def extract_keypoints (detection_results):
    pose = np.array([[results.x, results.y, results.z, results.visibility]
    for results in detection_results.pose_landmarks.landmark]).flatten() if detection_results.pose_landmarks else np.zeros(33*4)

    face = np.array([[results.x, results.y, results.z]
    for results in detection_results.face_landmarks.landmark]).flatten() if detection_results.face_landmarks else np.zeros(468*3)

    left_hand = np.array([[results.x, results.y, results.z]
    for results in detection_results.left_hand_landmarks.landmark]).flatten() if detection_results.left_hand_landmarks else np.zeros(21*3)

    right_hand = np.array([[results.x, results.y, results.z]
    for results in detection_results.right_hand_landmarks.landmark]).flatten() if detection_results.right_hand_landmarks else np.zeros(21*3)

    return np.concatenate([pose, face, left_hand, right_hand])

In [9]:
DATA_PATH = os.path.join('GestureData')

# Gesture that will be detected 
gestures = np.array(['Hello', 'Good', 'Morning', 'Afternoon', 'Night', 'Thanks', 'Sorry'])

# Number of sequences of data
n_frames = 50
# Length of each video will be 30 frames
length_frames = 20

In [10]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [11]:
label_map = {label:num for num, label in enumerate(gestures)}

In [12]:
sequences, labels = [], []
for gesture in gestures:
    for frame in np.array(os.listdir(os.path.join(DATA_PATH, gesture))).astype(int):
        window = []
        for frame_num in range(length_frames):
            res = np.load(os.path.join(DATA_PATH, gesture, str(frame), f"{frame_num}.npy"))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[gesture])

In [13]:
x = np.array(sequences)

In [14]:
y = to_categorical(labels).astype(int)

In [None]:
import random

In [None]:
def random_flip_keypoints(keypoints, flip_prob=0.5):
    if random.random() < flip_prob:
        keypoints[..., 0] = 1 - keypoints[..., 0]
        
    return keypoints

In [None]:
def random_scale_keypoints(keypoints, scale_range=(0.8, 1.2), axis_range=(0.8, 1.2)):
    scale_x = random.uniform(axis_range[0], axis_range[1])
    scale_y = random.uniform(axis_range[0], axis_range[1])
    keypoints[..., 0] = keypoints[..., 0] * scale_x
    keypoints[..., 1] = keypoints[..., 1] * scale_y
    return keypoints

In [None]:
def random_translate_keypoints(keypoints, translate_range=(-0.1, 0.1)):
    tx = random.uniform(translate_range[0], translate_range[1])
    ty = random.uniform(translate_range[0], translate_range[1])
    
    if keypoints.ndim < 3:
        print("Error: keypoints must have at least 3 dimensions")
        return keypoints
    
    keypoints[..., 0] = np.roll(keypoints[..., 0], int(tx * keypoints.shape[-2]), axis=-2)
    keypoints[..., 1] = np.roll(keypoints[..., 1], int(ty * keypoints.shape[-2]), axis=-2)
    
    return keypoints

In [None]:
def augment_keypoints_sequences(sequences, labels, num_augmentations=3):
    augmented_sequences = []
    augmented_labels = []

    for sequence, label in zip(sequences, labels):
        # Apply augmentation functions to each frame in the sequence
        for _ in range(num_augmentations):
            augmented_sequence = [random_flip_keypoints(frame) for frame in sequence]
            augmented_sequence = [random_scale_keypoints(frame) for frame in augmented_sequence]
            augmented_sequence = [random_translate_keypoints(frame) for frame in augmented_sequence]

            # Add augmented sequence to the list
            augmented_sequences.append(augmented_sequence)
            augmented_labels.append(label)

    return np.array(augmented_sequences), np.array(augmented_labels)

In [None]:
import matplotlib.pyplot as plt

num_augmentations = 3

# Apply data augmentation
augmented_sequences, augmented_labels = augment_keypoints_sequences(sequences, labels, num_augmentations=3)

# Visualize results
sample_idx = 14  # Change this to visualize a different sample
frame_idx = 4   # Change this to visualize a different frame within the sample

for i in range(num_augmentations + 1):
    plt.subplot(1, num_augmentations + 1, i + 1)
    
    if i == 0:
        plt.title("Original")
        keypoints = sequences[sample_idx][frame_idx]
    else:
        plt.title(f"Augmented {i}")
        keypoints = augmented_sequences[sample_idx * num_augmentations + (i - 1)][frame_idx]

    plt.scatter(keypoints[0::3], keypoints[1::3], s=5)
    plt.gca().invert_yaxis()
    plt.axis('equal')

plt.show()

In [None]:
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.05)
x_train_augmented, y_train_augmented = augment_keypoints_sequences(x_train, y_train)

In [None]:
x_train_combined = np.concatenate((x_train, x_train_augmented), axis=0)
y_train_combined = np.concatenate((y_train, y_train_augmented), axis=0)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
from tensorflow.keras.optimizers import Adam

In [None]:
import tensorflow

tensorflow.test.is_built_with_cuda()

In [None]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)
early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)

In [None]:
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(20,1662)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(gestures.shape[0], activation='softmax')) 

In [None]:
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
model.fit(x_train_combined, y_train_combined, validation_split=0.1, epochs=400, callbacks=[tb_callback])

In [None]:
#model.save('my_model.h5')

In [15]:
from tensorflow.keras.models import load_model

model = load_model('my_model.h5')

In [None]:
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [16]:
import time
import os
import uuid
from gtts import gTTS
from pygame import mixer
import threading
import atexit
import tempfile
import shutil
import collections

In [17]:
mixer.init()
pygame.init()

def play_text(text):
    try:
        tts = gTTS(text=text, lang='en', slow=False)
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        unique_filename = temp_file.name
        tts.save(unique_filename)
        temp_file.close()
        playback_thread = threading.Thread(target=play_and_delete_audio, args=(unique_filename,))
        playback_thread.start()
    except Exception as e:
        print(f"Error playing text: {e}")

def play_and_delete_audio(unique_filename):
    try:
        mixer.music.load(unique_filename)
        mixer.music.play()
        while mixer.music.get_busy():
            time.sleep(0.1)
        os.remove(unique_filename)
    except Exception as e:
        print(f"Error in play_and_delete_audio: {e}")

def moving_average(predictions, window_size=5):
    if len(predictions) < window_size:
        return predictions
    cumsum = np.cumsum(predictions, dtype=float)
    cumsum[window_size:] = cumsum[window_size:] - cumsum[:-window_size]
    cumsum[:window_size] /= np.arange(1, window_size + 1)
    return cumsum


sequence = []
current_gesture = []
predictions = []

threshold = 0.98
window_size = 5

cap = cv2.VideoCapture(0)

cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    prev_frame_time = 0
    while cap.isOpened():
        ret, input_frame = cap.read()

        input_image, detection_results = mediapipe_detection(input_frame, holistic)
        draw_styled_landmarks(input_image, detection_results)

        keypoints = extract_keypoints(detection_results)
        sequence.append(keypoints)
        sequence = sequence[-20:]

        if len(sequence) == 20:
            result = model.predict(np.expand_dims(sequence, axis=0))[0]
            predictions.append(np.argmax(result))

            if len(predictions) >= window_size:
                smoothed_predictions = moving_average(predictions[-window_size:], window_size)

                if len(current_gesture) > 0:
                    if (
                        np.unique(smoothed_predictions)[-1] == np.argmax(result)
                        and result[np.argmax(result)] > threshold
                    ) and gestures[np.argmax(result)] != current_gesture[-1]:
                        current_gesture.append(gestures[np.argmax(result)])
                        play_text(gestures[np.argmax(result)])
                elif (
                    np.unique(smoothed_predictions)[-1] == np.argmax(result)
                    and result[np.argmax(result)] > threshold
                ):
                    current_gesture.append(gestures[np.argmax(result)])
                    play_text(gestures[np.argmax(result)])

                if len(current_gesture) > 1: 
                    current_gesture = current_gesture[-1:]


        new_frame_time = time.time()
        fps = 1 / (new_frame_time - prev_frame_time)
        prev_frame_time = new_frame_time
        fps_text = f"FPS: {int(fps)}"
        cv2.putText(input_image, fps_text, (input_frame.shape[1] - 80, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
        # Display the recognized gesture with a background
        gesture_text = ' '.join(current_gesture)
        (text_width, text_height), _ = cv2.getTextSize(gesture_text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
        cv2.rectangle(input_image, (5, 35 - text_height - 10), (5 + text_width + 10, 35 + 10), (0, 20, 0), -1)
        cv2.putText(input_image, gesture_text, (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        # Show
        cv2.imshow('Sign Language Recognition', input_image)
        
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    mixer.quit()
    pygame.quit()

    cap.release()
    cv2.destroyAllWindows()

Error in play_and_delete_audio: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\dangl\\AppData\\Local\\Temp\\tmp005dtie9.mp3'
Error in play_and_delete_audio: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\dangl\\AppData\\Local\\Temp\\tmp662x7mpf.mp3'
Error in play_and_delete_audio: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\dangl\\AppData\\Local\\Temp\\tmp621alokp.mp3'
Error in play_and_delete_audio: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\dangl\\AppData\\Local\\Temp\\tmpgaps6w5f.mp3'
Error in play_and_delete_audio: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\dangl\\AppData\\Local\\Temp\\tmp1fcfdod4.mp3'
Error in play_and_delete_audio: [WinError 32] The process cannot access the file