## 1. Install & Import Dependencies

In [4]:
!pip install mediapipe



In [3]:
import cv2
import numpy as np
import os
import matplotlib.pyplot as plt
import time
import mediapipe as mp # library with models for face, pose, and hand capture

## 2. Initialise Model and Detection Function

In [4]:
# Short aliases for the MediaPipe solution modules used by the cells below.
mp_holistic = mp.solutions.holistic # holistic model (provides the Holistic class and the connection sets)
mp_drawing = mp.solutions.drawing_utils # drawing utilities (draw_landmarks)

In [5]:
def mediapipe_detection(image, model):
    """Run ``model`` on one OpenCV frame.

    Args:
        image: frame in BGR channel order (as produced by ``cv2.VideoCapture.read``).
        model: object exposing ``process(rgb_image)``, e.g. a MediaPipe Holistic instance.

    Returns:
        A ``(image, results)`` tuple: a BGR copy of the frame that can be drawn on,
        and whatever ``model.process`` returned.
    """
    # the model is fed RGB, while OpenCV delivers BGR
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # read-only while the model looks at it (MediaPipe's own examples do this
    # so the frame can be passed by reference), writable again afterwards
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # back to BGR so OpenCV can draw on / display the frame
    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [6]:
def draw_landmarks(image, results): # landmark is a point on the body, face, or hand
    """Draw the face, pose and hand landmarks from ``results`` onto ``image`` in place.

    Args:
        image: BGR frame to draw on (modified in place by ``mp_drawing``).
        results: holistic result object with ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes.
    """
    # Older MediaPipe releases expose FACE_CONNECTIONS; newer ones dropped that
    # name in favour of FACEMESH_CONTOURS. Use whichever this install provides
    # so the notebook does not fail with AttributeError after a fresh install.
    face_connections = getattr(mp_holistic, "FACE_CONNECTIONS", None)
    if face_connections is None:
        face_connections = mp_holistic.FACEMESH_CONTOURS

    landmark_groups = (
        (results.face_landmarks, face_connections),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )
    for landmark_list, connections in landmark_groups:
        mp_drawing.draw_landmarks(image, landmark_list, connections)

## 3. Video Capture

In [7]:
# Live preview: show the webcam feed with the detected landmarks drawn on top.
# Press "q" in the window to stop.
cap = cv2.VideoCapture(0)

try:
    with mp_holistic.Holistic(min_detection_confidence=0.7, min_tracking_confidence=0.7) as holistic: # set mediapipe model
        while cap.isOpened():
            # capture video frame
            ret, frame = cap.read()
            if not ret:
                # no frame delivered: stop instead of crashing inside cv2.flip
                print("Could not read a frame from the webcam; stopping.")
                break
            frame = cv2.flip(frame, 1) # mirror the frame horizontally

            # make face, hand, and body detections
            image, results = mediapipe_detection(frame, holistic)

            # draw detections
            draw_landmarks(image, results)

            # display frame
            cv2.imshow("Webcam Feed", image)

            if cv2.waitKey(10) & 0xFF == ord("q"):
                break
finally:
    # always free the camera and close the window, even if a frame failed mid-loop
    cap.release()
    cv2.destroyAllWindows()

## 4. Extract Landmarks

In [8]:
def _flatten_landmarks(landmark_list, fields, count):
    """Flatten ``landmark_list.landmark`` into one 1-D vector, or zeros if nothing was detected."""
    if landmark_list is None:
        # keep the feature vector a fixed length when this body part is missing
        return np.zeros(count * len(fields))
    return np.array(
        [[getattr(point, field) for field in fields] for point in landmark_list.landmark]
    ).flatten()


def extract_landmarks(results):
    """Turn one holistic result into a single flat feature vector.

    The vector is the concatenation of pose (33 landmarks * x,y,z,visibility),
    face (468 * x,y,z), left hand (21 * x,y,z) and right hand (21 * x,y,z),
    i.e. 1662 values. A body part whose landmarks attribute is ``None`` (or
    absent) is filled with zeros.

    Args:
        results: object with ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes,
            each either ``None`` or exposing a ``.landmark`` iterable.

    Returns:
        1-D ``numpy.ndarray`` of landmark values.

    Raises:
        Any error raised while reading a present landmark list is propagated
        rather than being silently replaced by zeros.
    """
    xyz = ("x", "y", "z")

    p_landmarks = _flatten_landmarks(getattr(results, "pose_landmarks", None), xyz + ("visibility",), 33)
    f_landmarks = _flatten_landmarks(getattr(results, "face_landmarks", None), xyz, 468)
    lh_landmarks = _flatten_landmarks(getattr(results, "left_hand_landmarks", None), xyz, 21)
    rh_landmarks = _flatten_landmarks(getattr(results, "right_hand_landmarks", None), xyz, 21)

    return np.concatenate([p_landmarks, f_landmarks, lh_landmarks, rh_landmarks])

## 5. Dataset

In [11]:
# Dataset configuration shared by the collection, loading and training cells.
DATA_PATH = os.path.join("MP_Data") # path for exported data - numpy arrays

actions = np.array(["hello", "thanks", "are_you_okay"]) # gesture classes to record and recognise

no_sequences = 40 # 40 videos (sequences) recorded for each action
sequence_length = 30 # 30 frames per video

In [13]:
# Create one folder per (action, sequence) pair: MP_Data/<action>/<sequence>/.
# exist_ok=True keeps the cell re-runnable without hiding real failures
# (e.g. permission errors), which the previous bare `except: pass` swallowed.
for action in actions:
    for sequence in range(no_sequences):
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

In [14]:
# Record the training data: for every action, capture `no_sequences` videos of
# `sequence_length` frames each and save the landmark vector of every frame to
# MP_Data/<action>/<sequence>/<frame_num>.npy. Press "q" to abort the collection.
cap = cv2.VideoCapture(0)
stop_requested = False # set when the user presses "q" or the camera stops delivering frames

try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic: # set mediapipe model
        for action in actions: # for each action
            for sequence in range(no_sequences): # for each video of that action
                for frame_num in range(sequence_length): # for each frame of that video
                    ret, frame = cap.read()
                    if not ret:
                        # no frame delivered: stop instead of crashing inside cv2.flip
                        print("Could not read a frame from the webcam; stopping collection.")
                        stop_requested = True
                        break
                    frame = cv2.flip(frame, 1)

                    # make face, hand, and body detections
                    image, results = mediapipe_detection(frame, holistic)

                    # draw detections
                    draw_landmarks(image, results)

                    # easier to distinguish between recordings: banner + pause on the first frame
                    if frame_num == 0:
                        cv2.putText(image, "STARTING COLLECTION", (120, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, "Collecting frames for {} video number {}".format(action, sequence), (15, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
                    cv2.imshow("Webcam Feed", image)
                    if frame_num == 0:
                        cv2.waitKey(2000) # give the signer time to get into position

                    keypoints = extract_landmarks(results)
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num)) # saving data to correct file
                    np.save(npy_path, keypoints)

                    if cv2.waitKey(10) & 0xFF == ord("q"):
                        stop_requested = True
                        break

                # a plain `break` only leaves the frame loop; propagate it outwards
                if stop_requested:
                    break
            if stop_requested:
                break
finally:
    # always free the camera and close the window, even if something failed mid-recording
    cap.release()
    cv2.destroyAllWindows()

## 6. Process Data and Create Labels

In [15]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [16]:
# Map every action name to its integer class id (its position in `actions`).
label_map = dict(zip(actions, range(len(actions))))
label_map

{'hello': 0, 'thanks': 1, 'are_you_okay': 2}

In [18]:
# Read the saved landmark files back into memory:
#   sequences[i] -> list of per-frame landmark vectors for one video
#   labels[i]    -> integer class id of that video
sequences, labels = [], []
for action in actions:
    for sequence in range(no_sequences):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        # all frames of this video, in recording order
        window = [
            np.load(os.path.join(sequence_dir, "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(label_map[action])

In [19]:
np.array(sequences).shape # 120 videos (3 actions * 40 sequences) of 30 frames, 1662 landmark values per frame

(120, 30, 1662)

In [20]:
np.array(labels).shape # one integer label per video (0, 0, 0, ..., 1, 1, 1, ..., 2, 2, 2, ...)

(120,)

In [21]:
X = np.array(sequences) # features: one window of per-frame landmark vectors per video
y = to_categorical(labels).astype(int) # one-hot encode the integer labels: [1, 0, 0] [0, 1, 0] [0, 0, 1]

In [22]:
# Hold out 5% of the videos for testing. The seed is fixed so that the split,
# and therefore every metric reported below, is the same on each run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)

## 7. Build and Train LSTM Neural Network

In [23]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [24]:
log_dir = os.path.join("Logs") # directory the TensorBoard callback is pointed at
tb_callback = TensorBoard(log_dir=log_dir) # passed to model.fit via `callbacks`

In [25]:
# Stacked LSTM classifier: three LSTM layers followed by a small dense head.
model = Sequential()

# input shape is taken from the data (frames per video, values per frame)
# instead of being hard-coded, so it follows sequence_length / the feature layout
model.add(LSTM(64, return_sequences=True, activation="relu", input_shape=X.shape[1:]))
model.add(LSTM(128, return_sequences=True, activation="relu"))
model.add(LSTM(64, return_sequences=False, activation="relu")) # return_sequences False because the next layer isn't an LSTM

model.add(Dense(64, activation="relu"))
model.add(Dense(32, activation="relu"))
# one probability per action, summing to 1 (e.g. [0.7, 0.2, 0.1]) <- why the labels are one-hot ("to_categorical")
model.add(Dense(actions.shape[0], activation="softmax"))

In [26]:
# categorical cross-entropy matches the one-hot labels and the softmax output layer
model.compile(optimizer="Adam", loss="categorical_crossentropy", metrics=["categorical_accuracy"])

In [None]:
# Train on the training split, logging through the TensorBoard callback.
# NOTE(review): no validation data or early stopping is configured - confirm 2000 epochs is intended.
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])

In [28]:
model.summary() # print layer output shapes and parameter counts

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm (LSTM)                  (None, 30, 64)            442112    
_________________________________________________________________
lstm_1 (LSTM)                (None, 30, 128)           98816     
_________________________________________________________________
lstm_2 (LSTM)                (None, 64)                49408     
_________________________________________________________________
dense (Dense)                (None, 64)                4160      
_________________________________________________________________
dense_1 (Dense)              (None, 32)                2080      
_________________________________________________________________
dense_2 (Dense)              (None, 3)                 99        
Total params: 596,675
Trainable params: 596,675
Non-trainable params: 0
__________________________________________________

In [29]:
res = model.predict(X_test) # per-class probabilities for every held-out video

In [35]:
# Spot check a single test sample: predicted action vs. true action.
n = 4 # index of the test sample to inspect
print(actions[np.argmax(res[n])]) # predicted action (highest probability)
print(actions[np.argmax(y_test[n])]) # true action (position of the 1 in the one-hot label)

hello
hello


## 8. Evaluation

In [36]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [37]:
yhat = model.predict(X_test) # per-class probabilities for the test split

In [38]:
# Collapse one-hot labels / probability rows to integer class ids.
# yhat is derived from a fresh prediction rather than from the previous value of
# `yhat`, so re-running this cell does not apply argmax to an already-collapsed list.
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(model.predict(X_test), axis=1).tolist()

In [39]:
multilabel_confusion_matrix(ytrue, yhat) # one 2x2 matrix per class

array([[[5, 0],
        [0, 1]],

       [[3, 0],
        [0, 3]],

       [[4, 0],
        [0, 2]]], dtype=int64)

In [40]:
accuracy_score(ytrue, yhat) # fraction of test videos whose predicted class matches the true class

1.0

## 9. Real time testing

In [None]:
# detection variables
sequence = []
sentence = []
threshold = 0.5

cap = cv2.VideoCapture(0)

with mp_holistic.Holistic(min_detection_confidence=0.7, min_tracking_confidence=0.7) as holistic: # set mediapipe model
    while cap.isOpened():
        # capture video frame
        ret, frame = cap.read()
        frame = cv2.flip(frame, 1)
        
        # make face, hand, and body detections
        image, results = mediapipe_detection(frame, holistic)
        
        # draw detections
        draw_landmarks(image, results)
        
        # create predictions
        keypoints = extract_landmarks(results)
        sequence.insert(0, keypoints)  # inputs frame data at start of sequence
        sequence = sequence[:30] # retrieving the most recent 30 frames
        
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0] # expected in shape (1, 30, 1662)
            
        if res[np.argmax(res)] > threshold:
            if len(sentence) > 0:
                if actions[np.argmax(res)] != sentence[-1]:
                    sentence.append(actions[np.argmax(res)])
                    print(sentence)
            else:
                sentence.append(actions[np.argmax(res)])
            
            if len(sentence) > 5:
                sentence = sentence[-5:]
        
        # display frame
        cv2.imshow("Webcam Feed", image)

        if cv2.waitKey(10) & 0xFF == ord("q"):
            break

    cap.release()
    cv2.destroyAllWindows()