In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

import tensorflow as tf
from tensorflow import keras,lite

from keras.models import load_model
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.callbacks import TensorBoard

from sklearn.model_selection import train_test_split
from keras.utils import to_categorical

from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [2]:
tf.version.VERSION

'2.10.0'

In [3]:
mp_hands = mp.solutions.hands # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results


def draw_styled_landmarks(image, results):
    # Draw left hand connections
    if not results.multi_hand_landmarks:
        return
    for hand_landmarks in results.multi_hand_landmarks:
        mp_drawing.draw_landmarks(image, hand_landmarks, mp_hands.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             )
    
def draw_landmarks(image, results):
    if not results.multi_hand_landmarks:
        return
    for hand_landmarks in results.multi_hand_landmarks:
        mp_drawing.draw_landmarks(image, hand_landmarks, mp_hands.HAND_CONNECTIONS) # Draw left hand connections
    
def extract_keypoints(results):
    lh = np.array([[res.landmark[point].x, res.landmark[point].y, res.landmark[point].z] for point in mp_hands.HandLandmark for res in results.multi_hand_landmarks], dtype=float).flatten() if results.multi_hand_landmarks else np.zeros(21*3*2)
    if lh.shape[0] == 63:
        rh = np.array([[res.landmark[point].x, res.landmark[point].y, res.landmark[point].z] for point in mp_hands.HandLandmark for res in results.multi_hand_landmarks], dtype=float).flatten() if results.multi_hand_landmarks else np.zeros(21*3)
        return np.concatenate([lh, rh])
    # rh = np.array([[res.landmark[point].x, res.landmark[point].y, res.landmark[point].z] for point in mp_hands.HandLandmark for res in results.multi_hand_landmarks], dtype=float).flatten() if results.multi_hand_landmarks else np.zeros(21*3*2)
    # return lh
    return np.concatenate([lh])


In [21]:
# Path for exported data, numpy arrays
DATA_PATH = os.path.join('MP_Data') 

# Actions that we try to detect
actions = np.array(['hello', 'hii', 'by'])

# Thirty videos worth of data
no_sequences = 30
# no_sequences = 10

# Videos are going to be 30 frames in length
sequence_length = 30

# Folder start
start_folder = 0

In [22]:
for action in actions: 
    for sequence in range(0,no_sequences):
        try: 
            os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
        except:
            pass

In [23]:
try:
    cap = cv2.VideoCapture(0)
    # Set mediapipe model 
    with mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as hands:
        
        # NEW LOOP

        # Loop through actions
        for action in actions:
            # Loop through sequences aka videos
            for sequence in range(no_sequences):
                # Loop through video length aka sequence length
                for frame_num in range(sequence_length):

                    # Read feed
                    ret, frame = cap.read()

                    # Make detections
                    image, results = mediapipe_detection(frame, hands)

                    # Draw landmarks
                    draw_styled_landmarks(image, results)
                    
                    # NEW Apply wait logic
                    if frame_num == 0: 
                        cv2.putText(image, 'STARTING COLLECTION', (120,200), 
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                        cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                        # Show to screen
                        cv2.imshow('OpenCV Feed', image)
                        cv2.waitKey(500)
                    else: 
                        cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                        # Show to screen
                        cv2.imshow('OpenCV Feed', image)
                    
                    # NEW Export keypoints
                    keypoints = extract_keypoints(results)
                    # print(keypoints.shape)
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                    np.save(npy_path, keypoints)

                    # Break gracefully
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        break
                        
        cap.release()
        cv2.destroyAllWindows()

finally:
    # print("error")
    cap.release()
    cv2.destroyAllWindows()

In [87]:
cap.release()
cv2.destroyAllWindows()

In [24]:
label_map = {label: num for num, label in enumerate(actions)}
print(label_map)

{'hello': 0, 'hii': 1, 'by': 2}


In [25]:
sequences, labels = [], []
for action in actions:
    for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])


In [26]:
# print(sequences)
print(np.array(sequences).shape)
print(np.array(labels).shape)
X = np.array(sequences)
print(X.shape)
y = to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)
print(y_test.shape)

(30, 30, 126)
(30,)
(30, 30, 126)
(2, 3)


In [27]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(sequence_length,X.shape[2])))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(actions.shape[0], activation='softmax'))

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [245]:
# del model

In [28]:
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_3 (LSTM)               (None, 30, 64)            48896     
                                                                 
 lstm_4 (LSTM)               (None, 30, 128)           98816     
                                                                 
 lstm_5 (LSTM)               (None, 64)                49408     
                                                                 
 dense_3 (Dense)             (None, 64)                4160      
                                                                 
 dense_4 (Dense)             (None, 32)                2080      
                                                                 
 dense_5 (Dense)             (None, 3)                 99        
                                                                 
Total params: 203,459
Trainable params: 203,459
Non-tr

In [29]:
model.fit(X_train, y_train, epochs=60, callbacks=[tb_callback])


Epoch 1/60
Epoch 2/60
Epoch 3/60
Epoch 4/60
Epoch 5/60
Epoch 6/60
Epoch 7/60
Epoch 8/60
Epoch 9/60
Epoch 10/60
Epoch 11/60
Epoch 12/60
Epoch 13/60
Epoch 14/60
Epoch 15/60
Epoch 16/60
Epoch 17/60
Epoch 18/60
Epoch 19/60
Epoch 20/60
Epoch 21/60
Epoch 22/60
Epoch 23/60
Epoch 24/60
Epoch 25/60
Epoch 26/60
Epoch 27/60
Epoch 28/60
Epoch 29/60
Epoch 30/60
Epoch 31/60
Epoch 32/60
Epoch 33/60
Epoch 34/60
Epoch 35/60
Epoch 36/60
Epoch 37/60
Epoch 38/60
Epoch 39/60
Epoch 40/60
Epoch 41/60
Epoch 42/60
Epoch 43/60
Epoch 44/60
Epoch 45/60
Epoch 46/60
Epoch 47/60
Epoch 48/60
Epoch 49/60
Epoch 50/60
Epoch 51/60
Epoch 52/60
Epoch 53/60
Epoch 54/60
Epoch 55/60
Epoch 56/60
Epoch 57/60
Epoch 58/60
Epoch 59/60
Epoch 60/60


<keras.callbacks.History at 0x2c201aebfa0>

In [31]:
res = model.predict(X_test)
res



array([[5.8107585e-10, 9.9995816e-01, 4.1829247e-05],
       [1.0000000e+00, 0.0000000e+00, 0.0000000e+00]], dtype=float32)

In [37]:
position = 1
print(actions[np.argmax(res[position])])
print(actions[np.argmax(y_test[position])])

hello
hello


In [38]:
model.save("action.h5")
# del model
model.load_weights("action.h5")

In [43]:
model = load_model('action.h5')

converter = lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.experimental_new_converter=True
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]

tflite_model = converter.convert()
open("action.tflite", "wb").write(tflite_model)

INFO:tensorflow:Assets written to: C:\Users\meetb\AppData\Local\Temp\tmptdfr1r91\assets


INFO:tensorflow:Assets written to: C:\Users\meetb\AppData\Local\Temp\tmptdfr1r91\assets


233952

In [39]:
yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

multilabel_confusion_matrix(ytrue, yhat)



array([[[1, 0],
        [0, 1]],

       [[1, 0],
        [0, 1]]], dtype=int64)

In [40]:
accuracy_score(ytrue, yhat)

1.0