In [156]:
import mediapipe as mp
import cv2
import numpy as np
import os
import uuid

In [157]:
mp_holistic = mp.solutions.holistic # MediaPipe Holistic module: provides the Holistic class and the connection sets used below
mp_drawing = mp.solutions.drawing_utils # MediaPipe drawing helpers: draw_landmarks and DrawingSpec

def mediapipe_detection(image, model):
    """Run a MediaPipe model on a single BGR frame.

    Args:
        image: Frame in BGR channel order (it is converted with
            cv2.COLOR_BGR2RGB before being handed to the model).
        model: Object exposing ``process(rgb_image)``; the cells in this
            notebook pass a ``mp_holistic.Holistic`` instance.

    Returns:
        Tuple ``(bgr_image, results)``: the frame converted back to BGR and
        whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Mark the buffer read-only while the model consumes it, then unlock it.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

def draw_landmarks(image, results):
    """Draw face, pose and both hand landmark sets on ``image`` with default styling.

    Args:
        image: Image passed straight to ``mp_drawing.draw_landmarks``.
        results: Object with ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes.
    """
    # (results attribute, connection set) in drawing order.
    layers = (
        ("face_landmarks", mp_holistic.FACEMESH_CONTOURS),
        ("pose_landmarks", mp_holistic.POSE_CONNECTIONS),
        ("left_hand_landmarks", mp_holistic.HAND_CONNECTIONS),
        ("right_hand_landmarks", mp_holistic.HAND_CONNECTIONS),
    )
    for attr_name, connections in layers:
        mp_drawing.draw_landmarks(image, getattr(results, attr_name), connections)
    
def draw_styled_landmarks(image, results):
    """Draw face, pose and hand landmarks on ``image`` with per-part colours.

    Args:
        image: Image passed straight to ``mp_drawing.draw_landmarks``.
        results: Object with ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes.
    """
    spec = mp_drawing.DrawingSpec
    # (results attribute, connection set, landmark spec, connection spec) in drawing order.
    styled_layers = (
        ("face_landmarks", mp_holistic.FACEMESH_CONTOURS,
         spec(color=(80, 110, 10), thickness=1, circle_radius=1),
         # The middle channel used to be 256, which is outside the 0-255
         # range of an 8-bit colour channel; 255 is the intended maximum.
         spec(color=(80, 255, 121), thickness=1, circle_radius=1)),
        ("pose_landmarks", mp_holistic.POSE_CONNECTIONS,
         spec(color=(80, 22, 10), thickness=2, circle_radius=4),
         spec(color=(80, 44, 121), thickness=2, circle_radius=2)),
        ("left_hand_landmarks", mp_holistic.HAND_CONNECTIONS,
         spec(color=(121, 22, 76), thickness=2, circle_radius=4),
         spec(color=(121, 44, 250), thickness=2, circle_radius=2)),
        ("right_hand_landmarks", mp_holistic.HAND_CONNECTIONS,
         spec(color=(245, 117, 66), thickness=2, circle_radius=4),
         spec(color=(245, 66, 230), thickness=2, circle_radius=2)),
    )
    for attr_name, connections, landmark_spec, connection_spec in styled_layers:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, attr_name),
            connections,
            landmark_spec,
            connection_spec,
        )
    
    
def extract_keypoints(results):
    """Flatten the landmarks of one detection result into a single 1-D vector.

    The vector is the concatenation, in this order, of:
      * pose:       x, y, z, visibility per landmark (zeros of length 33*4 if absent)
      * face:       x, y, z per landmark             (zeros of length 468*3 if absent)
      * left hand:  x, y, z per landmark             (zeros of length 21*3 if absent)
      * right hand: x, y, z per landmark             (zeros of length 21*3 if absent)

    Args:
        results: Object with ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes;
            each is either falsy or exposes a ``landmark`` iterable.

    Returns:
        1-D numpy array holding the concatenated values.
    """
    def _flatten(landmark_set, fields, fallback_size):
        # A missing body part is replaced by a zero vector of fixed size.
        if not landmark_set:
            return np.zeros(fallback_size)
        rows = [[getattr(point, field) for field in fields] for point in landmark_set.landmark]
        return np.array(rows).flatten()

    xyz = ("x", "y", "z")
    parts = [
        _flatten(results.pose_landmarks, xyz + ("visibility",), 33*4),
        _flatten(results.face_landmarks, xyz, 468*3),
        _flatten(results.left_hand_landmarks, xyz, 21*3),
        _flatten(results.right_hand_landmarks, xyz, 21*3),
    ]
    return np.concatenate(parts)

In [161]:
# Root folder that receives the exported keypoint arrays (one .npy file per frame).
DATA_PATH = "Landmarks-Dataset"

# Class labels the model is trained to recognise.
actions = np.array(['a', 'aa', 'au'])

# Number of videos processed per action.
no_sequences = 30

# Clip length is not fixed here: the export cell reads each video's own frame count.

# Index of the first sequence folder.
start_folder = 1

In [162]:
# Create one output folder per (action, sequence) pair: DATA_PATH/<action>/<sequence>.
for action in actions:
    for sequence in range(1, no_sequences+1):
        # exist_ok=True keeps re-runs harmless; any other OSError (for example
        # a permission problem) now surfaces instead of being silently swallowed.
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

In [163]:
def _export_clip_keypoints(video_path, output_dir, holistic):
    """Write one keypoint .npy file per frame of a single video.

    Args:
        video_path: Path of the video file to read.
        output_dir: Existing folder that receives ``<frame_num>.npy`` files.
        holistic: Open ``mp_holistic.Holistic`` instance used for detection.

    Returns:
        False if the user pressed 'q' (caller should stop), True otherwise.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print('Skipping unreadable video: {}'.format(video_path))
            return True

        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        for frame_num in range(frame_count):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what can actually be
                # decoded; stop instead of passing a None frame to OpenCV.
                break

            image, results = mediapipe_detection(frame, holistic)
            draw_styled_landmarks(image, results)
            cv2.imshow('OpenCV Feed', image)

            keypoints = extract_keypoints(results)
            np.save(os.path.join(output_dir, str(frame_num)), keypoints)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                return False
        return True
    finally:
        # Release every capture, not only the last one opened.
        cap.release()


try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        clips = (
            (action, sequence)
            for action in actions
            for sequence in range(1, no_sequences + 1)
        )
        for action, sequence in clips:
            keep_going = _export_clip_keypoints(
                './Dataset/{}/{} - {}.mp4'.format(action, action, sequence),
                os.path.join(DATA_PATH, action, str(sequence)),
                holistic,
            )
            if not keep_going:
                # 'q' aborts the whole export rather than just the current clip.
                break
finally:
    cv2.destroyAllWindows()

In [164]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [165]:
# Map each action label to its integer class index (its position in `actions`).
label_map = dict((label, index) for index, label in enumerate(actions))
print(label_map)

{'a': 0, 'aa': 1, 'au': 2}


In [166]:
# Alias of DATA_PATH; no other cell visible in this notebook reads it.
dir_path=DATA_PATH

In [167]:
# sequences, labels = [], []
# for action in actions:
#     for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
#         window = []
#         for frame_num in range(1,sequence_length+1):
#             res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
#             window.append(res)
#         sequences.append(window)
#         labels.append(label_map[action])

# sequences, labels = [], []
# for action in actions:
#     for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
#         window = []
#         num_frames = len(os.listdir(os.path.join(DATA_PATH, action, str(sequence))))
#         for frame_num in range(num_frames):
#             res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
#             window.append(res)
#         sequences.append(window)
#         labels.append(label_map[action])

# Load every exported sequence, then zero-pad them all to the longest one.
sequences, labels = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    # Only numeric folder names are sequences; this ignores stray entries such
    # as hidden files, and sorting makes the sample order deterministic.
    sequence_ids = sorted(int(name) for name in os.listdir(action_dir) if name.isdigit())
    for sequence in sequence_ids:
        sequence_dir = os.path.join(action_dir, str(sequence))
        num_frames = sum(1 for name in os.listdir(sequence_dir) if name.endswith('.npy'))
        if num_frames == 0:
            # An empty folder (e.g. the source video could not be read) has no
            # frame to pad from and would only add an all-zero training sample.
            print('Skipping empty sequence: {}'.format(sequence_dir))
            continue
        window = [
            np.load(os.path.join(sequence_dir, "{}.npy".format(frame_num)))
            for frame_num in range(num_frames)
        ]
        sequences.append(window)
        labels.append(label_map[action])

# Pad sequences to maximum length
max_length = max((len(seq) for seq in sequences), default=0)
for i, seq in enumerate(sequences):
    num_padding_frames = max_length - len(seq)
    if num_padding_frames > 0:
        padding_frames = [np.zeros_like(seq[0]) for _ in range(num_padding_frames)]
        sequences[i] = seq + padding_frames

sequences = np.array(sequences)




In [168]:
len(sequences)

90

In [169]:
np.array(sequences).shape

(90, 82, 1662)

In [170]:
np.array(labels).shape

(90,)

In [171]:
# Feature array for training, built from the padded `sequences`.
x = np.asarray(sequences)
# One-hot encode the integer class labels and cast the result to int.
y = to_categorical(labels).astype(int)

In [172]:
x.shape

(90, 82, 1662)

In [173]:
y

array([[1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0,

In [174]:
# Hold out 5% of the samples for testing. The fixed seed makes the split
# reproducible across kernel restarts, and stratifying on the integer labels
# keeps the class mix of the very small test set as balanced as possible.
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.05, random_state=42, stratify=labels
)

In [175]:
x_train.shape

(85, 82, 1662)

In [176]:
y_test.shape

(5, 3)

In [177]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [178]:
# Folder intended for TensorBoard logs; unused while the callback below stays commented out.
log_dir = os.path.join('Logs')
# tb_callback = TensorBoard(log_dir=log_dir)

In [179]:
actions

array(['a', 'aa', 'au'], dtype='<U2')

In [186]:
# Stacked-LSTM classifier: three LSTM layers followed by a small dense head
# with one softmax output per action.
# The input shape is taken from the training data instead of being hard-coded
# as (82, 1662), so the model follows the padded sequence length and feature
# size when the dataset changes.
input_shape = x.shape[1:]
num_classes = actions.shape[0]

model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=input_shape))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

In [187]:
# Import from tensorflow.keras like every other Keras import in this notebook,
# so the model and the plotting helper come from the same Keras implementation.
# Rendering still needs pydot and graphviz to be installed.
from tensorflow.keras.utils import plot_model
plot_model(model, to_file='model.png')

You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model/model_to_dot to work.


In [188]:
actions

array(['a', 'aa', 'au'], dtype='<U2')

In [189]:
x.shape

(90, 82, 1662)

In [190]:
# Configure training: Adam optimizer, categorical cross-entropy loss (the labels
# are one-hot encoded) and categorical accuracy as the reported metric.
model.compile(optimizer='Adam',loss='categorical_crossentropy',metrics=['categorical_accuracy'])

In [191]:
# Train on the training split for 200 epochs; no validation data is passed.
model.fit(x_train,y_train,epochs=200)

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

<keras.callbacks.History at 0x22329f3ebb0>

In [192]:
model.summary()

Model: "sequential_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_21 (LSTM)              (None, 82, 64)            442112    
                                                                 
 lstm_22 (LSTM)              (None, 82, 128)           98816     
                                                                 
 lstm_23 (LSTM)              (None, 64)                49408     
                                                                 
 dense_21 (Dense)            (None, 64)                4160      
                                                                 
 dense_22 (Dense)            (None, 32)                2080      
                                                                 
 dense_23 (Dense)            (None, 3)                 99        
                                                                 
Total params: 596,675
Trainable params: 596,675
Non-tr

In [193]:
# Softmax outputs of the model for each held-out test sample.
res = model.predict(x_test)



In [194]:
actions[np.argmax(res[0])]

'aa'

In [195]:
actions[np.argmax(y_test[0])]

'au'

In [202]:
# Persist the trained model to 'IndHandSigns.h5'.
model.save('IndHandSigns.h5')

In [203]:
# Load weights back from the file written by model.save in the previous cell.
model.load_weights('IndHandSigns.h5')

In [204]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [205]:
# Convert one-hot targets and predicted probabilities to lists of class indices.
ytrue = np.argmax(y_test, axis=1).tolist()
ypred = np.argmax(model.predict(x_test), axis=1).tolist()



In [206]:
multilabel_confusion_matrix(ytrue, ypred)

array([[[3, 1],
        [1, 0]],

       [[1, 2],
        [2, 0]],

       [[2, 1],
        [1, 1]]], dtype=int64)

In [207]:
accuracy_score(ytrue, ypred)

0.2

In [208]:
# Real-time recognition from the default webcam (press 'q' to stop).

# 1. Detection state
sequence = []      # sliding window of the most recent keypoint vectors
sentence = []      # last recognised label (at most one entry is kept)
predictions = []   # history of predicted class indices
threshold = 0.7    # minimum probability for a prediction to be shown

# The window must match the number of timesteps the model was built with;
# a fixed 30-frame window does not fit a model created for 82 frames.
# Fall back to 30 only when the model accepts variable-length input.
window_size = model.input_shape[1] or 30

# Open the camera before the try block so `cap` always exists in `finally`.
cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(
        min_detection_confidence=0.6, min_tracking_confidence=0.6
    ) as holistic:
        while cap.isOpened():

            # Read camera
            ret, frame = cap.read()
            if not ret:
                # No frame delivered (camera unplugged or busy): stop cleanly
                # instead of passing None to OpenCV.
                break

            # Make detections
            image, results = mediapipe_detection(frame, holistic)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # 2. Prediction logic
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-window_size:]

            if len(sequence) == window_size:
                # verbose=0 avoids one progress line per frame.
                res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                best_index = np.argmax(res)
                predictions.append(best_index)

                # 3. Visualization logic
                # The earlier check against predictions[-1:] compared the newest
                # prediction with itself and was always true, so only the
                # confidence threshold decides here.
                if res[best_index] > threshold:
                    label = actions[best_index]
                    if not sentence or label != sentence[-1]:
                        sentence.append(label)

                if len(sentence) > 1:
                    sentence = sentence[-1:]

            # Banner across the full frame width with the current label.
            cv2.rectangle(image, (0, 0), (image.shape[1], 40), (245, 117, 16), -1)
            cv2.putText(
                image,
                " ".join(sentence),
                (3, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (255, 255, 255),
                2,
                cv2.LINE_AA,
            )

            # Show to screen
            cv2.imshow("OpenCV Feed", image)

            # Break gracefully
            if cv2.waitKey(10) & 0xFF == ord("q"):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()

