References: 
1. https://github.com/kinivi/tello-gesture-control
2. https://www.youtube.com/watch?v=doDUihpj6ro

In [1]:
import cv2
import numpy as np
import os
import mediapipe as mp

# 1. Setting Up Mediapipe Hands Model

In [2]:
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

In [3]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on a frame read by OpenCV.

    Args:
        image: frame in BGR channel order (the order OpenCV returns frames in).
        model: object exposing ``process(rgb_image)``.

    Returns:
        ``(image, results)`` where ``image`` is the frame converted back to BGR
        and ``results`` is whatever ``model.process`` returned.
    """
    # OpenCV hands us BGR, the model is given RGB
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # the frame is flagged read-only only for the duration of the detection call
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # back to BGR so the frame can be drawn on / displayed with OpenCV
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

In [4]:
# overlay the landmarks and connections of every detected hand onto `image` in-place (nothing is returned)
def draw_landmarks(image, results):
    """Draw all detected hands on ``image``.

    Args:
        image: frame that is drawn on in-place.
        results: detection output; only ``multi_hand_landmarks`` is read.
    """
    detected_hands = results.multi_hand_landmarks

    # nothing detected (None or empty) -> leave the frame untouched
    if not detected_hands:
        return

    for hand in detected_hands:
        mp_drawing.draw_landmarks(
            image,
            hand,
            mp_hands.HAND_CONNECTIONS,
            mp_drawing_styles.get_default_hand_landmarks_style(),
            mp_drawing_styles.get_default_hand_connections_style(),
        )

# 2. Read Camera Feed with OpenCV & Perform Detection

In [16]:
# might need to change this if you have virtual devices set up for video capture
cap = cv2.VideoCapture(0)

# set camera resolution to 1280x720
# see https://stackoverflow.com/questions/11420748/setting-camera-parameters-in-opencv-python
# (the named constants below are property ids 3 and 4)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

# try/finally guarantees the camera and the preview window are released even if a frame fails mid-loop
try:
    # NOTE(review): according to the MediaPipe Hands documentation, static_image_mode=True runs
    # detection on every frame and min_tracking_confidence is then ignored -- confirm this is intended
    with mp_hands.Hands(
                static_image_mode=True,
                max_num_hands=2,
                min_detection_confidence=0.7,
                min_tracking_confidence=0.7,
            ) as hands:
        while cap.isOpened():

            # read the camera feed
            ret, frame = cap.read()

            # a failed read returns no usable frame; stop instead of passing it on to cv2.flip
            if not ret:
                print('Failed to read a frame from the camera, stopping the preview')
                break

            # mirror the frame
            frame = cv2.flip(frame, 1)

            image, results = mediapipe_detection(frame, hands)

            draw_landmarks(image, results)

            # show frame to user
            cv2.imshow('OpenCV Feed', image)

            # if the q key is pressed, break
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()

# 3. Extract Keypoint Values

In [5]:
# a single hand is described by 21 landmarks (see https://google.github.io/mediapipe/solutions/hands.html)
# only the x and y position of each landmark is kept, so every hand occupies TOTAL_LANDMARKS * 2 slots in the array
TOTAL_LANDMARKS = 21
TOTAL_POINTS = 2 * TOTAL_LANDMARKS

In [6]:
# build the flat feature vector that is fed to the neural network
def extract_keypoints(results):
    """Flatten the x/y position of every landmark of both hands into one array.

    Args:
        results: detection output; ``multi_hand_landmarks`` and
            ``multi_handedness`` are read.

    Returns:
        1-D array of ``2 * TOTAL_POINTS`` values: the left-hand values followed
        by the right-hand values. A hand that was not detected stays all zeros.
    """
    # slot 0 holds the left hand, slot 1 the right hand
    per_hand = [np.zeros(TOTAL_POINTS), np.zeros(TOTAL_POINTS)]

    detected_hands = results.multi_hand_landmarks
    if detected_hands:

        # NOTE: multi_handedness carries one entry per detected hand, e.g. with two hands
        #   [classification { index: 0  score: 0.9656149   label: "Left"  },
        #    classification { index: 1  score: 0.91598934  label: "Right" }]
        # and with a right hand only
        #   [classification { index: 1  score: 0.9744842   label: "Right" }]
        for hand, side in zip(detected_hands, results.multi_handedness):
            side_index = side.classification[0].index

            # index 0 -> left hand, index 1 -> right hand; anything else is ignored
            if side_index in (0, 1):
                per_hand[side_index] = np.array(
                    [[point.x, point.y] for point in hand.landmark]
                ).flatten()

    return np.concatenate(per_hand)

## Extract keypoints only for middle finger MCP position

In [6]:
def extract_keypoints(results):
    """Return the middle finger MCP position (landmark 9) of both hands.

    Args:
        results: detection output; ``multi_hand_landmarks`` and
            ``multi_handedness`` are read.

    Returns:
        1-D array ``[left_x, left_y, right_x, right_y]``; the two values of a
        hand that was not detected stay 0.
    """
    # only one landmark with x & y per hand is kept, so 2 values per hand
    left_hand, right_hand = np.zeros(2), np.zeros(2)

    if not results.multi_hand_landmarks:
        return np.concatenate([left_hand, right_hand])

    for hand, side in zip(results.multi_hand_landmarks, results.multi_handedness):
        side_index = side.classification[0].index

        # handedness index 0 is the left hand, 1 the right hand
        if side_index not in (0, 1):
            continue

        mcp = hand.landmark[9]
        position = np.array([mcp.x, mcp.y])
        if side_index == 0:
            left_hand = position
        else:
            right_hand = position

    return np.concatenate([left_hand, right_hand])

# 4. Initial Setup for Data Collection

In [7]:
# root folder for data collection
DATA_PATH = 'MP_Data'

# gestures that are recorded
# full candidate list: 'right swipe', 'left swipe', 'right swipe up', 'left swipe up', 'right swipe down', 'left swipe down', 'nogesture'
actions = np.array([
    'right swipe',
    'left swipe',
    'right swipe up',
    'left swipe up',
    'nogesture',
])

# number of videos recorded per action in one collection run
no_sequences = 10

# every video is 30 frames long
sequence_length = 30

# index into `actions` of the first action to record
start_action_index = 0

# number of the first video folder to write
# e.g. if 30 videos already exist, continuing from 30 keeps them; starting from 0 overwrites the data collected so far
start_folder = 0

In [22]:
# create one folder per (action, video number) pair that is about to be recorded
for action in actions[start_action_index:]:

    for sequence in range(start_folder, start_folder + no_sequences):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        try:
            os.makedirs(sequence_dir)
        # only an existing folder is expected here; any other failure (e.g. missing
        # permissions) must surface instead of being reported as "already created"
        except FileExistsError:
            print(f'{sequence_dir} is already created')

# 5. Data Collection Process

In [23]:
# set as soon as the collection has to stop (user pressed q, or the camera stopped delivering frames)
break_flag = False

# might need to change this if you have virtual devices set up for video capture
cap = cv2.VideoCapture(0)

# set camera resolution to 1280x720
# see https://stackoverflow.com/questions/11420748/setting-camera-parameters-in-opencv-python
# (the named constants below are property ids 3 and 4)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

# try/finally guarantees the camera and the window are released even if something fails mid-recording
try:
    # NOTE(review): according to the MediaPipe Hands documentation, static_image_mode=True runs
    # detection on every frame and min_tracking_confidence is then ignored -- confirm this is intended
    with mp_hands.Hands(
                static_image_mode=True,
                max_num_hands=2,
                min_detection_confidence=0.7,
                min_tracking_confidence=0.7,
            ) as hands:
        for action in actions[start_action_index:]:
            # Loop through sequences aka videos
            for sequence in range(start_folder, start_folder + no_sequences):
                # Loop through video length aka sequence length
                # the first frame, i.e. frame 0, is only used to warn the user that the collection is starting
                for frame_num in range(sequence_length + 1):

                    # Read feed
                    ret, frame = cap.read()

                    # a failed read returns no usable frame; abort instead of passing it on to cv2.flip
                    if not ret:
                        print('Failed to read a frame from the camera, aborting data collection')
                        break_flag = True
                        break

                    # mirror the frame
                    frame = cv2.flip(frame, 1)

                    # Make detections
                    image, results = mediapipe_detection(frame, hands)

                    # Draw landmarks
                    draw_landmarks(image, results)

                    # status line shown on every frame
                    cv2.putText(image, f'Collecting frames for {action} Video Number {sequence}', (15,12),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                    if frame_num == 0:
                        # announce the next video and give the user up to a second to get ready
                        cv2.putText(image, f'STARTING COLLECTION FOR {action} Video Number {sequence}', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                        cv2.imshow('OpenCV Feed', image)
                        # keep the key so that a 'q' pressed during the pause is not lost
                        key = cv2.waitKey(1000)
                    else:
                        cv2.imshow('OpenCV Feed', image)

                        # Export keypoints (np.save appends the .npy extension)
                        keypoints = extract_keypoints(results)
                        npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                        np.save(npy_path, keypoints)
                        key = cv2.waitKey(1)

                    # Break gracefully
                    # NOTE: quitting mid-video leaves that video folder with fewer than sequence_length frames
                    if key & 0xFF == ord('q'):
                        break_flag = True
                        break

                if break_flag:
                    break

            if break_flag:
                break
finally:
    cap.release()
    cv2.destroyAllWindows()

# 6. Create Labels & Features

In [8]:
# map every action name to its position in `actions`
label_map = dict(zip(actions, range(len(actions))))

In [9]:
label_map

{'right swipe': 0,
 'left swipe': 1,
 'right swipe up': 2,
 'left swipe up': 3,
 'nogesture': 4}

In [10]:
# sequences[i] is one video (a list of per-frame keypoint arrays), labels[i] its numeric class
sequences, labels = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)

    # keep only the numerically named video folders: stray entries (hidden files, checkpoints, ...)
    # would crash the int conversion, and sorting makes the load order reproducible
    sequence_ids = sorted(
        int(name)
        for name in os.listdir(action_dir)
        if name.isdecimal() and os.path.isdir(os.path.join(action_dir, name))
    )

    for sequence in sequence_ids:
        # create the video of length sequence_length
        # starts from 1 since frame 0 was only used to warn the user that the data collection for the next video is starting
        window = [
            np.load(os.path.join(action_dir, str(sequence), f"{frame_num}.npy"))
            for frame_num in range(1, sequence_length + 1)
        ]
        sequences.append(window)
        labels.append(label_map[action])

In [11]:
# if we have 5 gestures with 30 videos each,
# each video has 30 frames,
# each frame contains 2 hands
# each hand contains 21 landmarks
# each landmark contains 2 positions (x, y),
# we have 150 videos of 30 frames each, and each frame contains 84 values (2 hands * 21 landmarks * 2 coordinates)
# (150, 30, 84)
np.array(sequences).shape

(41, 30, 4)

In [12]:
np.array(labels).shape

(41,)

# 7. Preprocess Data

In [13]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [14]:
X = np.array(sequences)

In [15]:
X.shape

(41, 30, 4)

In [16]:
# one-hot encode the labels; num_classes is pinned to the number of actions because to_categorical
# otherwise sizes the encoding from the largest label present, which yields fewer columns than
# actions whenever the last action has no recorded video
y = to_categorical(labels, num_classes=len(actions)).astype(int)

In [17]:
y.shape

(41, 5)

## Preprocessing Method

Reference: [https://github.com/kinivi/tello-gesture-control](https://github.com/kinivi/tello-gesture-control)

Instead of feeding the raw output of the Mediapipe hands model immediately to our model, we preprocess the landmark by generating the position of each landmark relative to the first landmark (the palm of the hand).

For instance, if the position of the palm is (1, 1), then we subtract (1, 1) from the position (x, y) of each landmark.

Finally, we normalise each value by dividing each preprocessed landmark position by the maximum absolute value in a single frame.

In [21]:
def preprocess_landmark(frame):
    """Make one frame of keypoints relative to its first non-zero point, then scale it.

    ``frame`` is read as interleaved values: even positions are x, odd positions are y.
    Per axis, the first non-zero value becomes the base and is kept unchanged; every
    later non-zero value has that base subtracted. Zero values are never modified.
    Often the base is the left palm; when the left hand is all zeros, the first
    right-hand point takes that role. Finally every value is divided by the largest
    absolute value of the frame, unless that value is 0.

    Args:
        frame: 1-D array (``frame.shape[0]`` and integer indexing are used).

    Returns:
        list with one value per element of ``frame``.
    """
    base_x = base_y = None
    shifted = []

    for start in range(0, frame.shape[0], 2):
        # x coordinate of this point
        x_value = frame[start]
        if x_value != 0:
            if base_x is None:
                base_x = x_value
            else:
                x_value = x_value - base_x
        shifted.append(x_value)

        # y coordinate of this point
        y_value = frame[start + 1]
        if y_value != 0:
            if base_y is None:
                base_y = y_value
            else:
                y_value = y_value - base_y
        shifted.append(y_value)

    # normalisation
    largest = max(abs(value) for value in shifted)
    if largest == 0:
        # nothing to scale (every value is zero)
        return shifted

    return [value / largest for value in shifted]

## With Preprocessing

In [22]:
# run every frame of every video through preprocess_landmark
new_X = [
    [preprocess_landmark(frame_values) for frame_values in video]
    for video in X
]

## Without Preprocessing

In [18]:
# keep the raw keypoints, only converting every frame to a plain list
new_X = [
    [frame_values.tolist() for frame_values in video]
    for video in X
]

## Output X and Y as JSON for Dynamic Time Warping Algorithm

In [19]:
import json

FILE_NAME = "../src/utilities/dataset.json"

# one record per video: its numeric label and its frame-by-frame values
dataset = [
    {'label': video_label, 'sequence': video_frames}
    for video_frames, video_label in zip(new_X, labels)
]

with open(FILE_NAME, 'w') as json_file:
    json.dump(dataset, json_file)

In [27]:
new_X = np.array(new_X)
new_X[0][-1]

array([ 0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.29173481,  1.        ,  0.00892439,
       -0.11847133, -0.01740389, -0.1967694 , -0.0567551 , -0.24572852,
       -0.0927835 , -0.26880913, -0.08825843, -0.19027201, -0.16152468,
       -0.24151182, -0.20763462, -0.26910344, -0.24454299, -0.28615804,
       -0.11309027, -0.12690495, -0.19417577, -0.18098953, -0.24831738,
       -0.20995101, -0.28958496, -0.2313285 , -0.12730157, -0.05

In [23]:
X[0][-1]

array([ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  2.19080925e-01,  8.27425778e-01,
        2.19543189e-01,  7.06519008e-01,  1.80476487e-01,  6.30904138e-01,
        1.32962376e-01,  6.02675378e-01,  9.93896276e-02,  5.99552870e-01,
        1.70456409e-01,  

In [24]:
X = new_X

In [25]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [26]:
X_train.shape

(1026, 30, 84)

In [27]:
y_train.shape

(1026, 4)

In [28]:
X_test.shape

(54, 30, 84)

In [29]:
y_test.shape

(54, 4)

# 8. Build and Train LSTM Neural Network

In [30]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
import tensorflow as tf

tf.random.set_seed(42)

In [31]:
from datetime import datetime

log_dir = os.path.join('logs', 'train', datetime.now().strftime("%Y%m%d-%H%M%S"))
tb_callback = TensorBoard(log_dir=log_dir)

In [32]:
# stop once val_loss has not improved for 50 epochs and roll the model back to its best epoch;
# without restore_best_weights the model keeps the weights of the last epoch, i.e. 50 epochs past the best one
es_callback = EarlyStopping(patience=50, verbose=1, monitor='val_loss', restore_best_weights=True)

In [44]:
# a single LSTM layer reads the (frames, values-per-frame) sequence, followed by a small dense
# layer and a softmax output with one unit per action
model = Sequential([
    LSTM(
        16,
        return_sequences=False,
        activation='relu',
        input_shape=(X_train.shape[1], X_train.shape[2]),
    ),
    Dense(8, activation='relu'),
    Dense(actions.shape[0], activation='softmax'),
])



In [34]:
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [35]:
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm (LSTM)                  (None, 16)                6464      
_________________________________________________________________
dense (Dense)                (None, 8)                 136       
_________________________________________________________________
dense_1 (Dense)              (None, 4)                 36        
Total params: 6,636
Trainable params: 6,636
Non-trainable params: 0
_________________________________________________________________


In [36]:
model.fit(X_train, y_train, batch_size=256, epochs=1000, validation_data=(X_test, y_test), callbacks=[tb_callback, es_callback])

Epoch 1/1000
Epoch 2/1000
Epoch 3/1000
Epoch 4/1000
Epoch 5/1000
Epoch 6/1000
Epoch 7/1000
Epoch 8/1000
Epoch 9/1000
Epoch 10/1000
Epoch 11/1000
Epoch 12/1000
Epoch 13/1000
Epoch 14/1000
Epoch 15/1000
Epoch 16/1000
Epoch 17/1000
Epoch 18/1000
Epoch 19/1000
Epoch 20/1000
Epoch 21/1000
Epoch 22/1000
Epoch 23/1000
Epoch 24/1000
Epoch 25/1000
Epoch 26/1000
Epoch 27/1000
Epoch 28/1000
Epoch 29/1000
Epoch 30/1000
Epoch 31/1000
Epoch 32/1000
Epoch 33/1000
Epoch 34/1000
Epoch 35/1000
Epoch 36/1000
Epoch 37/1000
Epoch 38/1000
Epoch 39/1000
Epoch 40/1000
Epoch 41/1000
Epoch 42/1000
Epoch 43/1000
Epoch 44/1000
Epoch 45/1000
Epoch 46/1000
Epoch 47/1000
Epoch 48/1000
Epoch 49/1000
Epoch 50/1000
Epoch 51/1000
Epoch 52/1000
Epoch 53/1000
Epoch 54/1000
Epoch 55/1000
Epoch 56/1000
Epoch 57/1000
Epoch 58/1000
Epoch 59/1000
Epoch 60/1000
Epoch 61/1000
Epoch 62/1000
Epoch 63/1000
Epoch 64/1000
Epoch 00064: early stopping


<tensorflow.python.keras.callbacks.History at 0x249991d1f40>

# 9. Make Predictions

In [37]:
res = model.predict(X_test)

In [38]:
actions[np.argmax(res[0])]

'right finger snap'

In [39]:
actions[np.argmax(y_test[0])]

'right finger snap'

In [40]:
y_test[0]

array([0, 0, 1, 0])

# 10. Save Weights

In [41]:
# folder the trained models are written to
MODEL_PATH = os.path.join('models')

try:
    os.makedirs(MODEL_PATH)
# only an existing folder is expected here; any other failure (e.g. missing permissions)
# must surface instead of being reported as "already created"
except FileExistsError:
    print(f'{MODEL_PATH} is already created')

models is already created


In [42]:
model_name = f'{datetime.now().strftime("%Y%m%d-%H%M%S")}.h5'
model.save(os.path.join(MODEL_PATH, model_name))

In [43]:
# after running this cell, run the previous model definition cell
del model

In [45]:
model.load_weights(os.path.join(MODEL_PATH, '20220131-155214.h5'))

# 11. Evaluation using Confusion Matrix and Accuracy

In [46]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [47]:
yhat = model.predict(X_test)

In [48]:
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [49]:
multilabel_confusion_matrix(ytrue, yhat)

array([[[40,  0],
        [ 1, 13]],

       [[43,  1],
        [ 0, 10]],

       [[37,  0],
        [ 0, 17]],

       [[41,  0],
        [ 0, 13]]], dtype=int64)

In [50]:
accuracy_score(ytrue, yhat)

0.9814814814814815

# 12. Real-time Test

In [51]:
# bar colours for the probability overlay; they are reused cyclically when there are more labels than colours
colors = [(245,117,16), (117,245,16), (16,117,245), (117,245,16)]
def prob_viz(res, actions, input_frame, colors):
    """Draw one horizontal probability bar with its label per entry of ``res``.

    Args:
        res: iterable of probabilities, one per action.
        actions: sequence of label texts, indexed like ``res``.
        input_frame: frame to draw on; it is copied, the original is not modified.
        colors: non-empty sequence of colour tuples for the bars.

    Returns:
        Copy of ``input_frame`` with the bars and labels drawn on it.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        # wrap around instead of raising IndexError when there are fewer colours than labels
        bar_color = colors[num % len(colors)]
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), bar_color, -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return output_frame

In [52]:
# since the input needs to be 30 frames long, we need to store the frames into sequence array and only pass it to our model once it reaches 30 frames
sequence = []
sentence = []
predictions = []
threshold = 0.9

# might need to change this if you have virtual devices set up for video capture
cap = cv2.VideoCapture(0)

# set camera resolution to 1280x720
# see https://stackoverflow.com/questions/11420748/setting-camera-parameters-in-opencv-python
# (the named constants below are property ids 3 and 4)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

# try/finally guarantees the camera and the window are released even if a prediction fails mid-loop
try:
    # NOTE(review): according to the MediaPipe Hands documentation, static_image_mode=True runs
    # detection on every frame and min_tracking_confidence is then ignored -- confirm this is intended
    with mp_hands.Hands(
                static_image_mode=True,
                max_num_hands=2,
                min_detection_confidence=0.7,
                min_tracking_confidence=0.7,
            ) as hands:
        while cap.isOpened():

            # read the camera feed
            ret, frame = cap.read()

            # a failed read returns no usable frame; stop instead of passing it on to cv2.flip
            if not ret:
                print('Failed to read a frame from the camera, stopping the real-time test')
                break

            # mirror the frame
            frame = cv2.flip(frame, 1)

            image, results = mediapipe_detection(frame, hands)

            draw_landmarks(image, results)

            # append the new frame & grab the last 30 frames
            # NOTE(review): preprocess_landmark is always applied here -- make sure the model
            # was trained on preprocessed data as well
            keypoints = extract_keypoints(results)
            keypoints = preprocess_landmark(keypoints)
            sequence.append(keypoints)
            sequence = sequence[-30:]

            # if already 30 frames, start predicting
            if len(sequence) == 30:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = np.argmax(res)
                predictions.append(best)
                # only the last 15 predictions are ever looked at, so older ones are dropped
                predictions = predictions[-15:]

                # Viz logic: accept the current prediction only if it is also the most frequent
                # one among the recent predictions and its probability exceeds the threshold
                values, counts = np.unique(predictions, return_counts=True)
                if values[np.argmax(counts)] == best and res[best] > threshold:
                    # do not repeat the action that was reported last
                    if not sentence or actions[best] != sentence[-1]:
                        sentence.append(actions[best])

                # keep at most the 5 most recent actions on screen
                if len(sentence) > 5:
                    sentence = sentence[-5:]

                # Viz probabilities
                image = prob_viz(res, actions, image, colors)

            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3,30),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()

# 13. Convert Keras model to TF.js Layers format

In [53]:
!tensorflowjs_converter --input_format keras "models/20220131-155214.h5" "../public/model/"

2022-01-31 15:53:26.640282: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library cudart64_110.dll
