In [1]:
import cv2
import numpy as np
import tensorflow as tf
import pandas as pd
import mediapipe as mp
import os

# Short aliases for the MediaPipe Holistic solution and its drawing helpers;
# every later cell refers to these two names.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

In [2]:
class MediapipeUtil:
    """Helpers for running MediaPipe Holistic on camera frames and turning the
    detected hand landmarks into flat feature vectors.

    Every method is a classmethod; the class keeps no state of its own.
    """

    # Length of one hand's feature vector (x, y, z per landmark) and the filler
    # value written when that hand was not detected in the frame.
    _HAND_VECTOR_SIZE = 63
    _MISSING_VALUE = -2.0

    @classmethod
    def MediapipeDetection(cls, image, model):
        """Run ``model.process`` on a BGR frame.

        Args:
            image: frame in OpenCV's BGR channel order.
            model: object exposing ``process(rgb_image)`` (a Holistic instance
                in this notebook).

        Returns:
            ``(image, results)`` where ``image`` is a BGR copy of the frame and
            ``results`` is whatever ``model.process`` returned.
        """
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Presumably lets MediaPipe read the buffer without copying it.
        image.flags.writeable = False
        results = model.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        return image, results

    @classmethod
    def draw_landmarks(cls, image, results):
        """Draw both hands onto ``image`` in place with the default style."""
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

    @classmethod
    def draw_styled_landmarks(cls, image, results):
        """Draw both hands onto ``image`` in place, one colour scheme per hand."""
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                                mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                )
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                )

    @classmethod
    def _hand_vector(cls, hand_landmarks):
        """Flatten one hand's landmarks to a float vector, or return the filler vector."""
        if not hand_landmarks:
            return np.array([cls._MISSING_VALUE] * cls._HAND_VECTOR_SIZE, dtype=float)
        coords = [[point.x, point.y, point.z] for point in hand_landmarks.landmark]
        return np.array(coords, dtype=float).flatten()

    @classmethod
    def extract_keypoints(cls, results):
        """Return the left-hand vector followed by the right-hand vector.

        A hand that was not detected is represented by 63 copies of -2. The
        result is always a float array, so frames with and without detected
        hands are saved with the same dtype.
        """
        left = cls._hand_vector(results.left_hand_landmarks)
        right = cls._hand_vector(results.right_hand_landmarks)
        return np.concatenate([left, right])

    @classmethod
    def videoUtil(cls, camera_index=0):
        """Show a live, mirrored preview with hand landmarks until ESC is pressed.

        The keypoint vector of every frame is printed. The loop also stops when
        the camera fails to deliver a frame.

        Args:
            camera_index: index handed to ``cv2.VideoCapture`` (default 0).
        """
        cap = cv2.VideoCapture(camera_index)
        try:
            with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        # No frame was delivered; cvtColor would fail on None.
                        break

                    image, results = cls.MediapipeDetection(frame, holistic)

                    cls.draw_styled_landmarks(image, results)
                    keypoints = cls.extract_keypoints(results)
                    print(keypoints)

                    cv2.imshow('OpenCV Feed', cv2.flip(image, 1))

                    # 27 is the ESC key code.
                    if cv2.waitKey(5) & 0xFF == 27:
                        break
        finally:
            # Free the camera and close the window even if a frame raised.
            cap.release()
            cv2.destroyAllWindows()


Directory layout used for the collected keypoint data:

DATA_MP
- Hello
    - Video 1
        - Frame 1
        - Frame 2
        - ...
        - Frame N
    - Video 2
    - ...
- ...

In [18]:
def createDirs(actions: list, no_videos: list):
    """Create the ``DATA_MP/<action>/<video number>`` folder tree.

    Args:
        actions: action names, one folder each.
        no_videos: number of video folders (numbered from 1) per action,
            aligned with ``actions``.

    Returns:
        False when the two lists differ in length (nothing is created),
        True otherwise.

    Prints 'Data Exist' for every action whose folder is already present.
    Missing video folders are still added in that case, so raising the video
    count of an existing action works. Filesystem errors are raised instead of
    being silently ignored.
    """
    if len(actions) != len(no_videos):
        return False

    _DATA_PATH = 'DATA_MP'
    os.makedirs(_DATA_PATH, exist_ok=True)

    for action, videos in zip(actions, no_videos):
        action_dir = os.path.join(_DATA_PATH, str(action))
        if os.path.isdir(action_dir):
            print('Data Exist')
        for vid in range(1, videos + 1):
            os.makedirs(os.path.join(action_dir, str(vid)), exist_ok=True)
    return True


In [21]:
# Create folders for action '5' (3 videos) and 'Hello' (30 videos);
# createDirs returns True unless the two lists differ in length.
print(createDirs(['5', 'Hello'], [3, 30]))

True


In [21]:
def generateData():
    """Record keypoint frames from the default camera for every configured action.

    Reads the module-level lists ``actions``, ``no_videos`` and ``no_frames``
    (one entry per action) and saves one ``.npy`` keypoint vector per frame to
    ``DATA_MP/<action>/<video>/<frame>.npy``. The folders must already exist
    (see ``createDirs``).

    The first frame of each video is shown with a 'START' banner for two
    seconds. Pressing ESC, or a failed camera read, stops the whole
    collection. The camera and preview window are always released.
    """
    _DATA_PATH = 'DATA_MP'
    cap = cv2.VideoCapture(0)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5,
                                  min_tracking_confidence=0.5) as holistic:
            for ind, action in enumerate(actions):
                # for each video in an action
                for vid in range(1, no_videos[ind] + 1):
                    # for each frame in a video
                    for frame_num in range(1, no_frames[ind] + 1):
                        ret, image = cap.read()
                        if not ret:
                            # No frame was delivered; stop instead of crashing on None.
                            return

                        image, results = MediapipeUtil.MediapipeDetection(image, holistic)
                        keypoints = MediapipeUtil.extract_keypoints(results)
                        MediapipeUtil.draw_styled_landmarks(image, results)

                        # Frames are numbered from 1, so 1 marks the start of a video.
                        if frame_num == 1:
                            cv2.putText(image, f'START Collecting frames for {action} Video Number {vid}', (15,12),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                            cv2.imshow('Feed', image)
                            cv2.waitKey(2000)  # give the signer two seconds to get ready
                        else:
                            cv2.putText(image, f'Collecting frames for {action} Video Number {vid}', (15,12),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                            cv2.imshow('Feed', image)

                        npy_path = os.path.join(_DATA_PATH, action, str(vid), str(frame_num))
                        np.save(npy_path, keypoints)

                        # waitKey also lets the preview window repaint; 27 is ESC.
                        if cv2.waitKey(5) & 0xFF == 27:
                            return
    finally:
        cap.release()
        cv2.destroyAllWindows()



Data Exist
Data Exist
True


In [24]:
# Collection settings: one entry per action in each list.
_DATA_PATH = 'DATA_MP'
cap = cv2.VideoCapture(0)
actions = ['5', 'Hello']
no_videos = [3, 30]
no_frames = [30, 30]
isDynamic = [False, True]   # dynamic signs get a 2-second 'START' pause before each video

# Set when ESC is pressed or the camera stops delivering frames, so that all
# three nested loops end instead of only the innermost one.
stop_requested = False

try:
    with mp_holistic.Holistic(min_detection_confidence=0.5,
                              min_tracking_confidence=0.5) as holistic:
        for ind, action in enumerate(actions):
            videos = no_videos[ind]

            # for each video in an action
            for vid in range(1, videos+1):
                # for each frame in a video
                for frame_num in range(1, no_frames[ind] + 1):
                    ret, image = cap.read()
                    if not ret:
                        stop_requested = True
                        break

                    image, results = MediapipeUtil.MediapipeDetection(image, holistic)
                    keypoints = MediapipeUtil.extract_keypoints(results)
                    MediapipeUtil.draw_styled_landmarks(image, results)

                    # Mirror the preview before the caption is drawn so the text stays readable.
                    image = cv2.flip(image, 1)

                    announce_start = isDynamic[ind] and frame_num == 1
                    if announce_start:
                        caption = f'START Collecting frames for {action} Video Number {vid}'
                    else:
                        caption = f'Collecting frames for {action} Video Number {vid}'
                    cv2.putText(image, caption, (15,12),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    cv2.imshow('Feed', image)
                    if announce_start:
                        cv2.waitKey(2000) # wait for 2 seconds

                    npy_path = os.path.join(_DATA_PATH, action, str(vid), str(frame_num))
                    np.save(npy_path, keypoints)

                    # 27 is the ESC key code.
                    if cv2.waitKey(5) & 0xFF == 27:
                        stop_requested = True
                        break

                if stop_requested:
                    break
            if stop_requested:
                break
finally:
    # Always free the camera, even when a frame raised or the cell was interrupted.
    cap.release()
    cv2.destroyAllWindows()

In [23]:
# Manual cleanup: release the camera handle and close all OpenCV windows
# (e.g. after a capture cell was interrupted).
cap.release()
cv2.destroyAllWindows()

In [68]:
class DataGenerator:
    """Creates the folder tree, records keypoint sequences from the camera and
    loads them back for training.

    Layout on disk: ``DATA_MP/<action>/<video number>/<frame number>.npy``,
    with videos and frames numbered from 1.
    """

    # Root folder of the dataset, relative to the current working directory.
    _DATA_PATH = 'DATA_MP'

    def __init__(self, actions = None, videos = None, isDynamic = None, no_of_frames = 30):
        """
        Args:
            actions: action names (defaults to an empty list).
            videos: number of videos to record per action, aligned with ``actions``.
            isDynamic: per action, True for a moving sign (adds a 2-second
                'START' pause before each video), False for a static one.
            no_of_frames: frames recorded per video.
        """
        # None instead of [] as default so instances never share one list object.
        self.actions = [] if actions is None else actions
        self.no_videos = [] if videos is None else videos
        self.no_frames = no_of_frames
        self.isDynamic = [] if isDynamic is None else isDynamic

    def createDirs(self):
        """Create one folder per action and video.

        Returns:
            False when ``actions`` and ``videos`` differ in length (nothing is
            created), True otherwise.

        Prints 'Data Exist' for every action folder that is already present;
        missing video folders are still added. Filesystem errors are raised.
        """
        if len(self.actions) != len(self.no_videos):
            return False

        self.actions = np.array(self.actions)
        root = self._DATA_PATH
        os.makedirs(root, exist_ok=True)

        for action, videos in zip(self.actions, self.no_videos):
            action_dir = os.path.join(root, str(action))
            if os.path.isdir(action_dir):
                print('Data Exist')
            for vid in range(1, videos + 1):
                os.makedirs(os.path.join(action_dir, str(vid)), exist_ok=True)
        return True

    def createData(self):
        """Record every configured video from the default camera.

        Pressing ESC, or a failed camera read, stops the whole collection.
        The camera and preview window are released in every case.

        Raises:
            ValueError: if ``videos`` or ``isDynamic`` has fewer entries than
                ``actions`` (checked before the camera is opened).
        """
        n_actions = len(self.actions)
        if len(self.no_videos) < n_actions or len(self.isDynamic) < n_actions:
            raise ValueError('videos and isDynamic need one entry per action')

        cap = cv2.VideoCapture(0)
        try:
            with mp_holistic.Holistic(min_detection_confidence=0.5,
                                      min_tracking_confidence=0.5) as holistic:
                for ind, action in enumerate(self.actions):
                    for vid in range(1, self.no_videos[ind] + 1):
                        keep_going = self._recordVideo(cap, holistic, action, vid, self.isDynamic[ind])
                        if not keep_going:
                            return
        finally:
            cap.release()
            cv2.destroyAllWindows()

    def _recordVideo(self, cap, holistic, action, vid, dynamic):
        """Record the frames of one video; return False if collection should stop."""
        for frame_num in range(1, self.no_frames + 1):
            ret, image = cap.read()
            if not ret:
                # No frame was delivered; stop instead of crashing on None.
                return False

            image, results = MediapipeUtil.MediapipeDetection(image, holistic)
            keypoints = MediapipeUtil.extract_keypoints(results)
            MediapipeUtil.draw_styled_landmarks(image, results)

            # Mirror the preview before drawing the caption so the text stays readable.
            image = cv2.flip(image, 1)

            announce_start = dynamic and frame_num == 1
            if announce_start:
                caption = f'START Collecting frames for {action} Video Number {vid}'
            else:
                caption = f'Collecting frames for {action} Video Number {vid}'
            cv2.putText(image, caption, (15,12),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
            cv2.imshow('Feed', image)
            if announce_start:
                cv2.waitKey(2000) # wait for 2 seconds

            npy_path = os.path.join(self._DATA_PATH, str(action), str(vid), str(frame_num))
            np.save(npy_path, keypoints)

            # 27 is the ESC key code.
            if cv2.waitKey(5) & 0xFF == 27:
                return False
        return True

    @classmethod
    def loadData(cls):
        """Load every recorded sequence from disk.

        Returns:
            ``(sequences, labels, actions)``: ``sequences`` is a list with one
            list of per-frame arrays per video, ``labels`` holds the integer
            class id of each video, and ``actions`` is the list of action
            names where ``actions[i]`` belongs to class id ``i``.

        Action folders are sorted by name so the class ids do not depend on
        the order in which the filesystem lists them.
        """
        root = cls._DATA_PATH
        actions = sorted(name for name in os.listdir(root)
                         if os.path.isdir(os.path.join(root, name)))
        label_map = {label: num for num, label in enumerate(actions)}

        sequences, labels = [], []
        for action in actions:
            action_dir = os.path.join(root, action)
            no_videos = len(os.listdir(action_dir))
            for vid in range(1, no_videos + 1):
                video_dir = os.path.join(action_dir, str(vid))
                no_frames = len(os.listdir(video_dir))
                window = [np.load(os.path.join(video_dir, "{}.npy".format(frame_num)))
                          for frame_num in range(1, no_frames + 1)]
                sequences.append(window)
                labels.append(label_map[action])

        return sequences, labels, actions


3

In [43]:
# Inspect the video folders stored for action '5'.
os.listdir(os.path.join(_DATA_PATH, '5'))

['1', '2', '3']

In [33]:
# Manual cleanup: release the camera handle and close all OpenCV windows.
cap.release()
cv2.destroyAllWindows()

In [31]:
# Configure collection for one dynamic action, 'Thanks', with 30 videos
# (no_of_frames keeps its default of 30), then create its folders.
create_data = DataGenerator(['Thanks'], [30], [True])
create_data.createDirs()

Data Exist


True

In [32]:
# Record the configured videos from the camera; one keypoint array is saved per frame.
create_data.createData()

In [34]:
# class LSTMModel:
#     def __init__(self):
#         from tensorflow.keras.callbacks import TensorBoard
#         log_dir = os.path.join('Logs')
#         self.tb_callback = TensorBoard(log_dir=log_dir)

#     def getModel(self, load_from_file = None):
#         if not load_from_file:
#             from tensorflow.keras.models import Sequential
#             from tensorflow.keras.layers import LSTM, Dense

#             self.model = Sequential()
#             self.model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,126)))
#             self.model.add(LSTM(128, return_sequences=True, activation='relu'))
#             self.model.add(LSTM(64, return_sequences=False, activation='relu'))
#             self.model.add(Dense(64, activation='relu'))
#             self.model.add(Dense(32, activation='relu'))
#             self.model.add(Dense(np.array(actions).shape[0], activation='softmax'))

#             self.model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
        
#         else:
#             self.model = tf.keras.models.load_model(load_from_file)
    
#     def trainModel(self, X_train, y_train, epochs = 1000):
#         history = self.model.fit(X_train, y_train, epochs=epochs, callbacks=[self.tb_callback])
        
#     def predict(self, y):
#         return self.model.predict(y)

Loading Data and testing the model

In [69]:
# Read every stored sequence back from disk; `actions` lists the action
# folders found, in the order that defines the integer labels.
sequences, labels, actions = DataGenerator.loadData()
actions

['5', 'Hello', 'Thanks']

In [70]:
# Convert to an ndarray; shape[0] is the number of classes and is used below
# to size the softmax layer.
actions = np.array(actions)
actions.shape[0]

3

In [71]:
# Shape is (videos, frames per video, keypoint values per frame).
np.array(sequences).shape

(63, 30, 126)

In [72]:
# One integer label per video.
np.array(labels).shape

(63,)

In [73]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [74]:
# Feature tensor: all sequences stacked into a single array.
X = np.array(sequences)
X.shape

(63, 30, 126)

In [75]:
# One-hot encode the integer labels to match the softmax output and the
# categorical cross-entropy loss used when the model is compiled.
y = to_categorical(labels).astype(int)

In [76]:
# Hold out 10% of the videos for testing. A fixed random_state makes the split,
# and therefore every metric reported below, reproducible across kernel restarts.
# NOTE(review): the classes are imbalanced; consider `stratify=labels` so the
# test set reflects the class proportions — confirm each class has enough videos.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

In [77]:
# Sanity check: within each split, X_* and y_* must have the same first dimension.
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

(56, 30, 126)
(7, 30, 126)
(56, 3)
(7, 3)


In [78]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard
# Folder that receives the TensorBoard event files.
log_dir = os.path.join('Logs')

# Three stacked LSTM layers followed by a dense classifier head. The input is
# one sequence of 30 frames with 126 keypoint values each; the output has one
# softmax unit per action.
model = Sequential([
    LSTM(64, return_sequences=True, activation='relu', input_shape=(30,126)),
    LSTM(128, return_sequences=True, activation='relu'),
    # The last recurrent layer returns only its final state for the dense head.
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(actions.shape[0], activation='softmax'),
])

In [79]:
# Multi-class setup: categorical cross-entropy on the one-hot labels, tracking categorical accuracy.
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [80]:
# Train with TensorBoard logging written to `log_dir`.
# NOTE(review): 2000 epochs with no validation data or early stopping; the
# captured output suggests the run was stopped by hand — consider an
# EarlyStopping callback and a lower epoch count.
tb_callback = TensorBoard(log_dir=log_dir)
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])

Epoch 1/2000
Epoch 2/2000
Epoch 3/2000
Epoch 4/2000
Epoch 5/2000
Epoch 6/2000
Epoch 7/2000
Epoch 8/2000
Epoch 9/2000
Epoch 10/2000
Epoch 11/2000
Epoch 12/2000
Epoch 13/2000
Epoch 14/2000
Epoch 15/2000
Epoch 16/2000
Epoch 17/2000
Epoch 18/2000
Epoch 19/2000
Epoch 20/2000
Epoch 21/2000
Epoch 22/2000
Epoch 23/2000
Epoch 24/2000
Epoch 25/2000
Epoch 26/2000
Epoch 27/2000
Epoch 28/2000
Epoch 29/2000
Epoch 30/2000
Epoch 31/2000
Epoch 32/2000
Epoch 33/2000
Epoch 34/2000
Epoch 35/2000
Epoch 36/2000
Epoch 37/2000
Epoch 38/2000
Epoch 39/2000
Epoch 40/2000
Epoch 41/2000
Epoch 42/2000
Epoch 43/2000
Epoch 44/2000
Epoch 45/2000
Epoch 46/2000
Epoch 47/2000
Epoch 48/2000
Epoch 49/2000
Epoch 50/2000
Epoch 51/2000
Epoch 52/2000
Epoch 53/2000
Epoch 54/2000
Epoch 55/2000
Epoch 56/2000
Epoch 57/2000
Epoch 58/2000
Epoch 59/2000
Epoch 60/2000
Epoch 61/2000
Epoch 62/2000
Epoch 63/2000
Epoch 64/2000
Epoch 65/2000
Epoch 66/2000
Epoch 67/2000
Epoch 68/2000
Epoch 69/2000
Epoch 70/2000
Epoch 71/2000
Epoch 72/2000
E

<keras.callbacks.History at 0x196a5d5dbb0>

In [82]:
# Predict on the held-out split so that sample 4 here is the same sample whose
# ground truth (y_test[4]) is displayed in the next cell.
res = model.predict(X_test)
print(actions[np.argmax(res[4])])

Hello


In [85]:
# Ground-truth action of test sample 4, for comparison with a model prediction.
actions[np.argmax(y_test[4])]

'Hello'

In [86]:
# Persist the trained model; the file name lists the actions it was trained on.
model.save('action_5,Hello,Thanks.h5')

In [87]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
# Class probabilities for the held-out samples ...
yhat = model.predict(X_test)
# ... then collapse both the one-hot truth and the probabilities to class ids.
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [88]:
# One 2x2 confusion matrix per class that occurs in ytrue or yhat.
multilabel_confusion_matrix(ytrue, yhat)

array([[[3, 0],
        [0, 4]],

       [[4, 0],
        [0, 3]]], dtype=int64)

In [89]:
# Fraction of held-out samples whose predicted class id matches the truth.
accuracy_score(ytrue, yhat)

1.0

In [91]:
# Bar colours for the probability overlay, reused cyclically when there are
# more classes than colours.
colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    """Draw one horizontal probability bar per class onto a copy of the frame.

    Args:
        res: class probabilities, one per action.
        actions: class names, indexed like ``res``.
        input_frame: image to annotate; it is not modified.
        colors: non-empty sequence of colour tuples; entry
            ``num % len(colors)`` is used for class ``num``, so any number of
            classes can be drawn.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        top = 60 + num * 40                      # rows are 40 px apart, starting at y=60
        bar_color = colors[num % len(colors)]
        # Bar length is the probability scaled to 0..100 px.
        cv2.rectangle(output_frame, (0, top), (int(prob*100), top + 30), bar_color, -1)
        cv2.putText(output_frame, str(actions[num]), (0, top + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return output_frame


In [92]:
# 1. New detection variables
# 1. New detection variables
sequence = []      # sliding window of the most recent keypoint vectors
sentence = []      # recognised actions shown in the banner (last 5 kept)
predictions = []   # most recent predicted class ids (last 10 kept)
threshold = 0.5    # minimum probability for a prediction to be accepted

cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed
            ret, frame = cap.read()
            if not ret:
                # No frame was delivered; stop instead of crashing on None.
                break

            # Make detections
            image, results = MediapipeUtil.MediapipeDetection(frame, holistic)

            # Draw landmarks
            MediapipeUtil.draw_styled_landmarks(image, results)

            # 2. Prediction logic: the model expects a window of 30 frames.
            keypoints = MediapipeUtil.extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-30:]

            if len(sequence) == 30:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = int(np.argmax(res))
                print(actions[best])
                predictions.append(best)
                predictions = predictions[-10:]   # only the recent history is needed

                # 3. Viz logic: accept a word only when all recent predictions
                # agree and the model is confident enough.
                if len(set(predictions)) == 1 and res[best] > threshold:
                    # Skip immediate repeats of the last accepted word.
                    if not sentence or actions[best] != sentence[-1]:
                        sentence.append(actions[best])

                sentence = sentence[-5:]

                # Viz probabilities
                image = prob_viz(res, actions, image, colors)

            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3,30),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully (27 is the ESC key code)
            if cv2.waitKey(10) & 0xFF == 27:
                break
finally:
    # Runs on ESC, on errors and on KeyboardInterrupt, so the camera is always freed.
    cap.release()
    cv2.destroyAllWindows()

Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
Hello
5
Hello
Hello
Hello
Hello
Hello
Thanks
Thanks
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
5
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Hello
Thanks
Thanks
Hello
Hello
Hello
Hello
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thanks
Thank

KeyboardInterrupt: 

In [93]:
# Manual cleanup after the interrupted detection loop: release the camera and close all OpenCV windows.
cap.release()
cv2.destroyAllWindows()