In [2]:
import numpy as np
import os
import time
import mediapipe as mp
import cv2
import matplotlib.pyplot as plt

2023-05-07 16:53:53.493715: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-07 16:53:53.529839: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-07 16:53:53.530837: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
mpipe_holistic = mp.solutions.holistic
mpipe_drawing = mp.solutions.drawing_utils

In [4]:
def mpipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    result = model.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    
    return result, image

In [5]:
def draw_landmarks(image, result):
    # For face landmarks
    mpipe_drawing.draw_landmarks(image, result.face_landmarks, mpipe_holistic.FACEMESH_CONTOURS,
                              mpipe_drawing.DrawingSpec(color=(0, 0, 255), thickness=1, circle_radius=1),
                              mpipe_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1),
                             )
    
    # For pose landmarks
    mpipe_drawing.draw_landmarks(image, result.pose_landmarks, mpipe_holistic.POSE_CONNECTIONS,
                              mpipe_drawing.DrawingSpec(color=(0, 0, 255), thickness=1, circle_radius=1),
                              mpipe_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1),
                             )
    
    # For left hand landmarks
    mpipe_drawing.draw_landmarks(image, result.left_hand_landmarks, mpipe_holistic.HAND_CONNECTIONS,
                              mpipe_drawing.DrawingSpec(color=(0, 0, 255), thickness=1, circle_radius=4),
                              mpipe_drawing.DrawingSpec(color=(255, 255, 0), thickness=1, circle_radius=2),
                             )
    
    # For right hand landmarks
    mpipe_drawing.draw_landmarks(image, result.right_hand_landmarks, mpipe_holistic.HAND_CONNECTIONS,
                              mpipe_drawing.DrawingSpec(color=(0, 0, 255), thickness=1, circle_radius=4),
                              mpipe_drawing.DrawingSpec(color=(0, 255, 255), thickness=1, circle_radius=2),
                             )    

In [6]:
def to_array_keypoints(res_landmarks):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in res_landmarks.pose_landmarks.landmark]).flatten() if res_landmarks.pose_landmarks else np.zeros(33 * 4)
    left_hand = np.array([[res.x, res.y, res.z] for res in res_landmarks.left_hand_landmarks.landmark]).flatten() if res_landmarks.left_hand_landmarks else np.zeros(21 * 3)
    right_hand = np.array([[res.x, res.y, res.z] for res in res_landmarks.right_hand_landmarks.landmark]).flatten() if res_landmarks.right_hand_landmarks else np.zeros(21 * 3)
    face = np.array([[res.x, res.y, res.z] for res in res_landmarks.face_landmarks.landmark]).flatten() if res_landmarks.face_landmarks else np.zeros(468 * 3)
    
    return np.concatenate([face, pose, left_hand, right_hand])

In [7]:
data_path = os.path.join(os.getcwd(), 'data')

actions = np.array(['hello', 'thanks', 'namaste'])

num_sequence = 30
len_sequence = 30

In [8]:
for action in actions:
    for sequence in range(num_sequence):
        try:
            os.makedirs(os.path.join(data_path, str(action), str(sequence)))
        except:
            pass

In [None]:
capture = cv2.VideoCapture(0)
with mpipe_holistic.Holistic(min_detection_confidence = 0.5, min_tracking_confidence = 0.5) as holistic:
    
    for action in actions:
        for sequence in range(num_sequence):
            for frame_idx in range(len_sequence):
                
                # frame capture
                ret, frame = capture.read()

                result, img = mpipe_detection(frame, holistic)
                draw_landmarks(img, result)
                
                if frame_idx == 0:
                    cv2.putText(img, str(action), (120, 200),
                                cv2.FONT_HERSHEY_PLAIN, 4, (0, 255, 0), 4, cv2.LINE_AA)
                
                cv2.putText(img, 'Action: {} Sequence {}'.format(action, sequence), (15, 12),
                            cv2.FONT_HERSHEY_PLAIN, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                # exporting keypoints
                np_data = to_array_keypoints(result)
                np_path = os.path.join(data_path, str(action), str(sequence), str(frame_idx))
                np.save(np_path, np_data)

                cv2.imshow("Video", img)
                
                if frame_idx == 0:
                    cv2.waitKey(2000)

                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break

    capture.release()
    cv2.destroyAllWindows()

In [9]:
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras

In [10]:
class_map = {key:value for value, key in enumerate(actions)}
print(class_map)

{'hello': 0, 'thanks': 1, 'namaste': 2}


In [11]:
features = []
classes = []
for action in actions:
    for sequence in range(num_sequence):
        window=[]
        
        for frame_idx in range (len_sequence):
            res = np.load(os.path.join(data_path, str(action), str(sequence), "{}.npy".format(frame_idx)))
            window.append(res)
            
        features.append(window)
        classes.append(class_map[action])

In [12]:
X = np.array(features)
y = keras.utils.to_categorical(classes).astype(int)

In [13]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [14]:
X_train.shape

(85, 30, 1662)

In [15]:
y_train.shape[1]

3

In [16]:
log_path = os.path.join(os.getcwd(), 'logs')

try:
    os.mkdir(log_path)
except:
    pass

tb_callbacks = keras.callbacks.TensorBoard(log_dir=log_path)

In [17]:
from keras.layers import LSTM, Dense

model = keras.models.Sequential()
model.add(LSTM(32, return_sequences=True, activation='relu', input_shape=(30, 1662)))
# model.add(LSTM(32, return_sequences=True, activation='relu'))
model.add(LSTM(32, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(actions.shape[0], activation='softmax'))

2023-05-07 16:54:10.953387: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected


In [18]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [86]:
model.fit(X_train, y_train, epochs=100, callbacks=[tb_callbacks])

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100


Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78/100
Epoch 79/100
Epoch 80/100
Epoch 81/100
Epoch 82/100
Epoch 83/100
Epoch 84/100
Epoch 85/100
Epoch 86/100
Epoch 87/100
Epoch 88/100
Epoch 89/100
Epoch 90/100
Epoch 91/100
Epoch 92/100
Epoch 93/100
Epoch 94/100
Epoch 95/100
Epoch 96/100
Epoch 97/100
Epoch 98/100
Epoch 99/100
Epoch 100/100


<keras.callbacks.History at 0x7f18900574c0>

In [19]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 30, 32)            216960    
                                                                 
 lstm_1 (LSTM)               (None, 32)                8320      
                                                                 
 dense (Dense)               (None, 64)                2112      
                                                                 
 dense_1 (Dense)             (None, 32)                2080      
                                                                 
 dense_2 (Dense)             (None, 3)                 99        
                                                                 
Total params: 229,571
Trainable params: 229,571
Non-trainable params: 0
_________________________________________________________________


In [20]:
save_path = os.path.join(os.getcwd(), 'action.h5')

In [None]:
model.save(save_path)

In [21]:
model.load_weights(save_path)

In [22]:
test_res = model.predict(X_test)



In [23]:
for i in range(len(X_test)):
    print(actions[np.argmax(test_res[i])], actions[np.argmax(y_test[i])])

hello hello
hello hello
namaste namaste
hello hello
namaste namaste


In [24]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [25]:
ytrue = np.argmax(y_test, axis=1).tolist()
ycap = np.argmax(test_res, axis=1).tolist()

multilabel_confusion_matrix(ytrue, ycap)

array([[[2, 0],
        [0, 3]],

       [[3, 0],
        [0, 2]]])

In [26]:
accuracy_score(ytrue, ycap)

1.0

In [27]:
actions[np.argmax(test_res[0])]

'hello'

In [67]:
def visualize_prediction(pred, image):
    img = image.copy()
    colors = [(202, 180, 255), (255, 255, 0), (168, 168, 168)]
    for num, prob in enumerate(pred):
        cv2.rectangle(img, (0, 100+num*100), (int(prob * 200), 150+num*100), colors[num], -1)
        cv2.putText(img, actions[num], (8, 140+num*100),
                    cv2.FONT_HERSHEY_PLAIN, 2.5, (255, 0, 255), 2, cv2.LINE_AA)
        
    return img

In [68]:
sequence = []

capture = cv2.VideoCapture(0)

with mpipe_holistic.Holistic(min_detection_confidence = 0.5, min_tracking_confidence = 0.5) as holistic:
    while capture.isOpened():
        
        ret, frame = capture.read()
        
        result, img = mpipe_detection(frame, holistic)
        draw_landmarks(img, result)
        
        np_data = to_array_keypoints(result)
        sequence.append(np_data)
        
        if len(sequence) > 30:
            sequence = sequence[1:]
        
        if len(sequence) == 30:
            pred = model.predict(np.expand_dims(sequence, axis=0))[0]
            img = visualize_prediction(pred, img)

        cv2.imshow("Video", img)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    capture.release()
    cv2.destroyAllWindows()

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread (0x974a040).
Cannot move to target thread (0x90e4370)

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread (0x974a040).
Cannot move to target thread (0x90e4370)

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread (0x974a040).
Cannot move to target thread (0x90e4370)

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread (0x974a040).
Cannot move to target thread (0x90e4370)

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread (0x974a040).
Cannot move to target thread (0x90e4370)

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread (0x974a040).
Cannot move to target thread (0x90e4370)

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread (0x974a040).
Cannot move to target thread (0x90e4370)

QObject::moveToThread: Current thread (0x90e4370) is not the object's thread



