In [None]:
import os
import time
import numpy as np
import cv2
import mediapipe as mp
import tensorflow as tf
from keras.utils import to_categorical

In [None]:
# Root folder that stores the extracted MediaPipe keypoints (one sub-folder per sign).
DATA_PATH = os.path.join('MP_Data')

# exist_ok=True makes re-running this cell a no-op when the folder already exists,
# while genuine failures (e.g. permission errors) are no longer silently swallowed.
os.makedirs(DATA_PATH, exist_ok=True)

In [None]:
no_sequences = 30  # number of clips (sequences) recorded per sign
sequence_length = 30  # number of frames captured per clip
signs = []  # sign names; replaced by a NumPy array once the user has entered them

In [None]:
# Short aliases for the MediaPipe solution modules used throughout the notebook.
mp_holistic = mp.solutions.holistic  # Holistic solution: provides Holistic() and the connection sets used for drawing
mp_drawing = mp.solutions.drawing_utils  # Drawing utilities: draw_landmarks() and DrawingSpec

In [None]:
class BreakIt(Exception):
    """Signal raised to unwind out of several nested loops at once."""

In [None]:
def create_folders(signs, no_sequences, data_path=None):
    """Create one keypoint folder per (sign, sequence) pair.

    The layout is ``<data_path>/<sign>/<sequence index>``, with sequence
    indices running from 0 to ``no_sequences - 1``. Folders that already exist
    are left untouched, so the function can be called again when new signs are
    added.

    Args:
        signs: Iterable of sign names (used as folder names).
        no_sequences: Number of sequence folders to create for each sign.
        data_path: Root folder for the data. Defaults to the notebook-level
            ``DATA_PATH`` when omitted.

    Raises:
        OSError: If a folder cannot be created for a reason other than it
            already existing (a bare ``except`` used to hide such failures).
    """
    root = DATA_PATH if data_path is None else data_path
    for sign in signs:
        for sequence in range(no_sequences):
            os.makedirs(os.path.join(root, sign, str(sequence)), exist_ok=True)

In [None]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on a single BGR frame.

    The frame is converted to RGB for the model, marked read-only while
    ``model.process`` runs, made writeable again, and converted back to BGR.

    Returns:
        A ``(bgr_image, results)`` tuple: the frame converted back to BGR and
        whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The writeable flag is switched off only for the duration of the model call.
    rgb_frame.flags.writeable = False
    detection = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), detection

In [None]:
def draw_styled_landmarks(image, results):
    """Draw face, pose and hand landmarks with their connections onto ``image``.

    Each landmark group in ``results`` is passed to ``mp_drawing.draw_landmarks``
    together with its connection set and two ``DrawingSpec`` styles (one for the
    landmark points, one for the connection lines). ``image`` is drawn on in
    place; nothing is returned.

    Args:
        image: Frame to draw on.
        results: Object exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # (landmarks, connection set, landmark-point style, connection-line style)
    layers = (
        (results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
         dict(color=(80, 110, 10), thickness=1, circle_radius=1),
         # The second channel used to be 256, which is outside the 0-255 range.
         dict(color=(80, 255, 121), thickness=1, circle_radius=1)),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         dict(color=(80, 22, 10), thickness=2, circle_radius=4),
         dict(color=(80, 44, 121), thickness=2, circle_radius=2)),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         dict(color=(121, 22, 76), thickness=2, circle_radius=4),
         dict(color=(255, 255, 0), thickness=2, circle_radius=2)),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         dict(color=(245, 117, 66), thickness=2, circle_radius=4),
         dict(color=(245, 66, 230), thickness=2, circle_radius=2)),
    )

    for landmarks, connections, point_style, line_style in layers:
        mp_drawing.draw_landmarks(image, landmarks, connections,
                                  mp_drawing.DrawingSpec(**point_style),
                                  mp_drawing.DrawingSpec(**line_style))

In [None]:
def extract_keypoints(results):
    """Flatten the MediaPipe detection results into one 1-D feature vector.

    Pose landmarks contribute (x, y, z, visibility); face and hand landmarks
    contribute (x, y, z). A landmark group that was not detected is replaced
    by zeros of the matching length (33*4, 468*3, 21*3 and 21*3 values). The
    groups are concatenated in the order pose, face, left hand, right hand.
    """
    def flatten_group(group, fields, n_landmarks):
        # Missing detection -> zero block of the size the group would have had.
        if not group:
            return np.zeros(n_landmarks * len(fields))
        rows = [[getattr(point, field) for field in fields]
                for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    parts = [
        flatten_group(results.pose_landmarks, xyz + ('visibility',), 33),
        flatten_group(results.face_landmarks, xyz, 468),
        flatten_group(results.left_hand_landmarks, xyz, 21),
        flatten_group(results.right_hand_landmarks, xyz, 21),
    ]
    return np.concatenate(parts)

In [None]:
def capture_signs(sign_names=None):
    """Record keypoint clips for each sign from the default webcam.

    For every sign, ``no_sequences`` clips of ``sequence_length`` frames are
    captured. Each frame is flipped horizontally, run through MediaPipe
    Holistic, shown in an OpenCV window with the landmarks drawn, and its
    keypoint vector is saved to ``DATA_PATH/<sign>/<sequence>/<frame_num>.npy``.
    Pressing ``q`` stops the collection early.

    Args:
        sign_names: Iterable of sign names to record. Defaults to the
            notebook-level ``signs`` when omitted, so both ``capture_signs()``
            and ``capture_signs(signs)`` work.

    Raises:
        RuntimeError: If the webcam fails to deliver a frame.
    """
    if sign_names is None:
        sign_names = signs

    cap = cv2.VideoCapture(0)
    try:
        # Set mediapipe model
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

            for sign in sign_names:
                for sequence in range(no_sequences):
                    for frame_num in range(sequence_length):

                        # Read feed; a failed read yields frame=None, which
                        # cv2.flip cannot handle, so fail with a clear message.
                        ret, frame = cap.read()
                        if not ret:
                            raise RuntimeError(
                                'Could not read a frame from the webcam')
                        frame = cv2.flip(frame, 1)

                        # Make detections
                        image, results = mediapipe_detection(frame, holistic)

                        # Draw landmarks
                        draw_styled_landmarks(image, results)

                        # The first frame of each clip gets an extra banner and a
                        # one-second pause so the signer can get into position.
                        is_first_frame = frame_num == 0
                        if is_first_frame:
                            cv2.putText(image, 'STARTING COLLECTION', (120, 200),
                                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 4, cv2.LINE_AA)
                        cv2.putText(image, 'Collecting frames for {} sequence {}'.format(sign, (sequence+1)), (15, 12),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                        # Show to screen
                        cv2.imshow('OpenCV Feed', image)
                        if is_first_frame:
                            cv2.waitKey(1000)

                        # Export keypoints (np.save appends the .npy suffix)
                        keypoints = extract_keypoints(results)
                        npy_path = os.path.join(
                            DATA_PATH, sign, str(sequence), str(frame_num))
                        np.save(npy_path, keypoints)

                        # Break gracefully
                        if cv2.waitKey(10) & 0xFF == ord('q'):
                            raise BreakIt
    except BreakIt:
        # User asked to stop; cleanup happens in the finally clause.
        pass
    finally:
        # Always free the camera and close the preview window, including when
        # an unexpected error interrupts the collection.
        cap.release()
        cv2.destroyAllWindows()

In [None]:
def preprocess():
    """Build the feature and label arrays from the saved keypoint files.

    Every sub-folder of ``DATA_PATH`` is treated as one sign. For each sign,
    ``no_sequences`` clips of ``sequence_length`` frames are loaded from
    ``DATA_PATH/<sign>/<sequence>/<frame>.npy``. The stacked clips are written
    to ``sequences.npy`` and their integer labels to ``labels.npy`` in the
    current working directory.

    Sign folders are processed in sorted order so that label ids are
    reproducible between runs; non-directory entries are ignored.

    Returns:
        NumPy array of the sign names in label order (index ``i`` is the sign
        encoded as label ``i``). Use this order when mapping model outputs
        back to sign names.
    """
    # os.listdir returns entries in arbitrary order and may include stray
    # files (e.g. .DS_Store), so filter to directories and sort.
    sign_names = np.array(sorted(
        name for name in os.listdir(DATA_PATH)
        if os.path.isdir(os.path.join(DATA_PATH, name))
    ))
    print(sign_names)
    label_map = {label: num for num, label in enumerate(sign_names)}
    sequences, labels = [], []

    for sign in sign_names:
        for sequence in range(no_sequences):
            window = [
                np.load(os.path.join(DATA_PATH, sign, str(sequence),
                                     "{}.npy".format(frame_num)))
                for frame_num in range(sequence_length)
            ]
            sequences.append(window)
            labels.append(label_map[sign])

    np.save('sequences', np.array(sequences))
    np.save('labels', np.array(labels))
    return sign_names

In [None]:
# Ask the user which new signs should be recorded in this session.
no_of_signs = int(input('Enter the no. of signs to be added--> '))
temp = [input(f'Enter the name of sign {(i+1)}--> ')
        for i in range(no_of_signs)]
signs = np.array(temp)

# Nothing to record when no sign was entered, so skip opening the camera.
if len(signs) > 0:
    create_folders(signs, no_sequences)
    print(
        '\n*********** Starting SIGN INPUT COLLECTION for in 5 secs ***********\n')
    time.sleep(5)
    # capture_signs() records the notebook-level `signs` assigned above.
    capture_signs()

In [None]:
# Rebuild the cached feature/label arrays only when at least one sign is present.
if len(signs):
    preprocess()

In [None]:
# Reload the feature windows and integer labels from the working directory.
sequences, labels = np.load('sequences.npy'), np.load('labels.npy')

In [None]:
X = np.array(sequences)  # feature windows loaded from sequences.npy
Y = to_categorical(labels).astype(int)  # one-hot encode the integer labels, then cast to int

In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Dense, Bidirectional, Dropout, Conv1D, MaxPooling1D

In [None]:
# Conv1D + max-pooling front end, three stacked bidirectional LSTMs with
# dropout, and a dense head ending in one softmax unit per entry in `signs`.
model = Sequential([
    Conv1D(64, kernel_size=3, activation='relu', input_shape=(30, 1662)),
    MaxPooling1D(pool_size=2),
    Bidirectional(LSTM(64, return_sequences=True, activation='relu')),
    Bidirectional(LSTM(128, return_sequences=True, activation='relu')),
    Dropout(0.2),
    Bidirectional(LSTM(64, return_sequences=False, activation='relu')),
    Dropout(0.2),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(signs.shape[0], activation='softmax'),
])

In [None]:
from keras.optimizers import Adam

# Adam with learning rate 1e-4; categorical cross-entropy matches the one-hot
# labels in Y, and categorical accuracy is reported during training.
optimizer = Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer, loss='categorical_crossentropy',
              metrics=['categorical_accuracy'])

In [None]:
# Train on the whole dataset for 2000 epochs; no validation data or callbacks are passed.
model.fit(X, Y, epochs=2000)

In [None]:
# Palette for the per-sign probability bars (one colour tuple per bar).
colors = [
    (47, 75, 124),
    (160, 81, 149),
    (249, 93, 106),
    (255, 166, 0),
    (0, 63, 92),
    (102, 81, 145),
    (212, 80, 135),
    (255, 124, 67),
]

In [None]:
def prob_viz(res, input_frame):
    """Overlay one probability bar per class on a copy of the frame.

    For class index ``num`` a filled rectangle is drawn whose width is
    ``int(prob * 100)`` pixels, stacked vertically every 40 pixels, with the
    matching entry of ``signs`` written on top of it.

    Args:
        res: Sequence of per-class probabilities.
        input_frame: Frame to annotate; it is copied, not modified.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        top = 60 + num * 40
        # Cycle through the palette so that having more classes than colours
        # no longer raises IndexError.
        bar_color = colors[num % len(colors)]
        cv2.rectangle(output_frame, (0, top),
                      (int(prob*100), top + 30), bar_color, -1)
        cv2.putText(output_frame, signs[num], (0, top + 25),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

    return output_frame

In [None]:
sequence = []  # rolling window of the most recent keypoint vectors fed to the model
sentence = []  # recently recognised signs shown in the banner (trimmed to the last 5)
threshold = 0.95  # minimum predicted probability for a sign to be appended to `sentence`

In [None]:
# Live demo: read webcam frames, keep a rolling window of the last 30 keypoint
# vectors, classify the window and show the recognised signs on screen.
cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed; stop cleanly when no frame is delivered, because
            # cv2.flip would raise on a None frame.
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.flip(frame, 1)

            # Make detections
            image, results = mediapipe_detection(frame, holistic)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # Prediction logic: keep only the 30 most recent keypoint vectors
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-30:]

            if len(sequence) == 30:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = np.argmax(res)
                predicted_sign = signs[best]
                print(predicted_sign)

                # Append a confident prediction unless it repeats the last sign
                if res[best] > threshold:
                    if not sentence or predicted_sign != sentence[-1]:
                        sentence.append(predicted_sign)

                # Only the five most recent signs are displayed
                if len(sentence) > 5:
                    sentence = sentence[-5:]

                # Viz probabilities
                image = prob_viz(res, image)

            # Banner with the recognised signs
            cv2.rectangle(image, (0, 0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('SLR Demo', image)

            # Break gracefully
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Release the camera and close the window even if an error interrupts the loop.
    cap.release()
    cv2.destroyAllWindows()