#

## bindOI: Operating Interface Concept For the Visually Impaired
_(Computer Vision, Action Recognition, Image Recognition)_

#####

##### Haixiao (Harry) Feng

#

References:
- [openCV]()
- [MediaPipe](https://google.github.io/mediapipe/solutions/solutions.html)
- [Neural Networks For Action Recognition]()
- [Marching Squares ?]()

#

#### Util

##### Installations & Dependencies

In [5]:
# !pip install opencv-python
# !pip install matplotlib
# !pip install mediapipe
# !pip install tensorflow
# !pip install scipy
# !pip install scikit-learn

In [2]:
import os
import time
import numpy as np
from matplotlib import pyplot as plt

import cv2
import mediapipe as mp

from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

#####

##### Global Variables

In [13]:
mp_holistic = mp.solutions.holistic        # MediaPipe holistic solution module (source of the Holistic model and the connection sets used for drawing)
mp_drawing = mp.solutions.drawing_utils    # MediaPipe drawing utilities

In [14]:
commands = np.array(['Select', 'Confirm', 'Home'])    # The three commands the current version of the project is trained to recognize; a command's position in this array is its class index

In [15]:
# Color tuples passed to the cv2 / MediaPipe drawing calls in this notebook
# NOTE(review): OpenCV reads color tuples as (B, G, R), so `blue` would be B=255, G=0, R=120 - confirm this is the intended shade
black = (0, 0, 0)
white = (255, 255, 255)
blue = (255, 0, 120)

In [16]:
num_seq = 30      # 30 sequences (video captures) for each category of training data
seq_len = 30    # 30 frames for each sequence
folder_start = 1    # Number of the first sequence folder written during data collection

#####

##### Helper Functions

In [17]:
def f_mp_detect(image, actionRecModel):

    '''
    Runs `actionRecModel.process` on one frame.

    image: BGR frame (as read by OpenCV)
    actionRecModel: object exposing `process(rgb_image)` (a MediaPipe Holistic instance in this notebook)

    Returns (image, results): the frame converted back to BGR, and whatever `process` returned.
    '''

    rgbFrame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)    # The model is given the RGB version of the frame

    # Mark the buffer read-only only for the duration of the call (performance util)
    rgbFrame.flags.writeable = False
    results = actionRecModel.process(rgbFrame)
    rgbFrame.flags.writeable = True

    # Hand back a BGR frame so the caller can keep drawing/displaying with OpenCV
    return cv2.cvtColor(rgbFrame, cv2.COLOR_RGB2BGR), results

In [18]:
def f_extract_keypoints(results):

    '''
    Flattens the detected landmarks into a single 1-D feature vector.

    Layout of the returned array: pose (x, y, z, visibility per landmark),
    then face, left hand and right hand (x, y, z per landmark).
    A component that was not detected is replaced by zeros of its fixed size
    (pose 33*4, face 468*3, each hand 21*3).
    '''

    def _flatten(landmarkList, fallbackSize, withVisibility=False):
        # Missing component -> zero block, so the overall layout stays the same
        if not landmarkList:
            return np.zeros(fallbackSize)
        if withVisibility:
            rows = [[pt.x, pt.y, pt.z, pt.visibility] for pt in landmarkList.landmark]
        else:
            rows = [[pt.x, pt.y, pt.z] for pt in landmarkList.landmark]
        return np.array(rows).flatten()

    faceVec = _flatten(results.face_landmarks, 468*3)
    leftHandVec = _flatten(results.left_hand_landmarks, 21*3)
    rightHandVec = _flatten(results.right_hand_landmarks, 21*3)
    poseVec = _flatten(results.pose_landmarks, 33*4, withVisibility=True)

    return np.concatenate([poseVec, faceVec, leftHandVec, rightHandVec])

In [19]:
def f_draw_landmarks(image, results):

    '''
    Draws face, left-hand, right-hand and pose landmarks with their connections onto `image`.

    image: frame handed to `mp_drawing.draw_landmarks` for every component
    results: object exposing face_landmarks, left_hand_landmarks, right_hand_landmarks and pose_landmarks
    '''

    makeSpec = mp_drawing.DrawingSpec

    # One row per component, drawn in this order:
    # (attribute on `results`, connection set, landmark style, connection style)
    # where each style is (color, thickness, circle_radius)
    layers = (
        ('face_landmarks', mp_holistic.FACEMESH_TESSELATION, (blue, 1, 1), (blue, 1, 1)),
        ('left_hand_landmarks', mp_holistic.HAND_CONNECTIONS, (white, 2, 1), (blue, 2, 2)),
        ('right_hand_landmarks', mp_holistic.HAND_CONNECTIONS, (white, 2, 1), (blue, 2, 2)),
        ('pose_landmarks', mp_holistic.POSE_CONNECTIONS, (white, 2, 1), (blue, 3, 3)),
    )

    for attrName, connections, landmarkStyle, connectionStyle in layers:
        lmColor, lmThickness, lmRadius = landmarkStyle
        cnColor, cnThickness, cnRadius = connectionStyle
        mp_drawing.draw_landmarks(image,
                                  getattr(results, attrName),
                                  connections,
                                  makeSpec(color=lmColor, thickness=lmThickness, circle_radius=lmRadius),    # Landmark drawing spec
                                  makeSpec(color=cnColor, thickness=cnThickness, circle_radius=cnRadius)     # Connection drawing spec
                                  )

In [20]:
def f_viz_prob_region(res, commands, inputFrame):

    '''
    Draws one filled horizontal bar per entry of `res` with the matching command name on top.

    res: iterable of per-command probabilities; bar width is int(prob*100) pixels
    commands: labels, indexed by position in `res`
    inputFrame: frame to annotate; it is copied, not modified

    Returns the annotated copy.
    '''

    rowHeight = 40    # Vertical distance between consecutive bars
    canvas = inputFrame.copy()

    for row, probability in enumerate(res):
        offset = row * rowHeight
        barWidth = int(probability * 100)

        # Filled bar (thickness=-1) whose length tracks the probability
        cv2.rectangle(img=canvas, pt1=(0, 20 + offset), pt2=(barWidth, 50 + offset), color=blue, thickness=-1)

        # Command name drawn over the bar
        cv2.putText(img=canvas, text=commands[row], org=(0, 45 + offset),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=1,
                    color=black, thickness=2, lineType=cv2.LINE_AA)

    return canvas

#

#### MediaPipe Landmark Detection Demo

In [32]:
# Access webcam for demo: show the live feed with MediaPipe landmarks drawn on it until 'q' is pressed

cap = cv2.VideoCapture(0)                             # Default webcam

try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:    # Instantiate mediapipe holistic model
        while cap.isOpened():

            ret, frame = cap.read()                       # Read capture
            if not ret:                                   # No frame delivered (camera busy/unplugged): frame is unusable, stop instead of crashing in cvtColor
                break

            image, results = f_mp_detect(frame, holistic)    # Detect landmarks
            f_draw_landmarks(image, results)                 # Draw landmarks
            cv2.imshow('OpenCV Feed', image)              # Display

            if cv2.waitKey(10) & 0xFF == ord('q'):        # Break if needed
                break
finally:
    # Always free the camera and close the window, even if a frame failed to process
    cap.release()                                     # Release capture
    cv2.destroyAllWindows()                           # Close windows

#

#### Data Collection
Collects training data for each of the commands

In [33]:
# Make local directory and sub-directories: data_path/<command>/<sequence number>

data_path = os.path.join('data_train_folder')    # Local path to store data

for command in commands:
    # Folder names keep the command's original case because the capture and loading cells
    # build their paths from the un-lowered command name (matters on case-sensitive file systems)
    for sequence in range(folder_start, folder_start+num_seq):    # Same numbering as the capture loop
        # exist_ok=True makes re-running the cell harmless, while real failures (e.g. permissions) still raise
        os.makedirs(os.path.join(data_path, command, str(sequence)), exist_ok=True)

In [35]:
def _f_draw_capture_overlay(image, command, sequence, repositioning):

    '''
    Draws the data-collection overlay onto `image` in place.

    image: frame to annotate
    command, sequence: shown in the status line
    repositioning: when True, also draws the 'Repositioning...' banner
    '''

    if repositioning:
        cv2.putText(img=image,
                    text='Repositioning...',
                    org=(10, 50),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=1,
                    color=blue,
                    thickness=2,
                    lineType=cv2.LINE_AA
                   )

    cv2.putText(img=image,
                text='Collecting frames for command: [{}], sequence #{}'.format(command, sequence),
                org=(10, 20),
                fontFace=cv2.FONT_HERSHEY_PLAIN,
                fontScale=1,
                color=blue,
                thickness=1,
                lineType=cv2.LINE_AA
               )

    cv2.putText(img=image,
                text='----------------------------',
                org=(0, 300),
                fontFace=cv2.FONT_HERSHEY_PLAIN,
                fontScale=2,
                color=blue,
                thickness=2,
                lineType=cv2.LINE_AA
               )


cap = cv2.VideoCapture(0)   # Default webcam
stopRequested = False       # Set when 'q' is pressed or the camera stops delivering frames

try:
    # Instantiate mediapipe holistic model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        # Iterate thru the commands
        for command in commands:
            # Iterate thru sequences (video captures)
            for sequence in range(folder_start, folder_start+num_seq):
                if cv2.waitKey(10) & 0xFF == ord('q'):   # Option to exit before starting the next sequence
                    stopRequested = True
                    break

                # Make sure the target folder exists so np.save cannot fail on a missing directory
                sequenceDir = os.path.join(data_path, command, str(sequence))
                os.makedirs(sequenceDir, exist_ok=True)

                # Loop thru sequence (video) length
                for frameNum in range(seq_len):
                    ret, frame = cap.read()                         # Read capture
                    if not ret:                                     # No frame delivered: stop instead of crashing in cvtColor
                        stopRequested = True
                        break

                    image, results = f_mp_detect(frame, holistic)   # Detect landmarks
                    f_draw_landmarks(image, results)                # Draw landmarks

                    # Display info during capture
                    isFirstFrame = frameNum == 0
                    _f_draw_capture_overlay(image, command, sequence, repositioning=isFirstFrame)
                    cv2.imshow('OpenCV Feed', image)
                    if isFirstFrame:
                        cv2.waitKey(1000)                  # *Add wait time between each capture

                    # Extract key points & save to path (np.save appends the .npy extension)
                    keypoints = f_extract_keypoints(results)
                    np.save(os.path.join(sequenceDir, str(frameNum+1)), keypoints)

                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        # NOTE: the sequence being recorded is left incomplete and should be re-captured
                        stopRequested = True
                        break

                # A stop request must leave every loop, not just the innermost one
                if stopRequested:
                    break
            if stopRequested:
                break
finally:
    # Always free the camera and close the window, even after an error
    cap.release()
    cv2.destroyAllWindows()

#

#### Data Preprocessing
- Load training data
- Create labels & features

In [None]:
# Map each command name to its class index (its position in `commands`)
commandMap = dict(zip(commands, range(len(commands))))

In [None]:
# commandMap

In [None]:
# Load training data: one window of `seq_len` keypoint vectors per recorded sequence, plus its class index
sequences, labels = [], []
for command in commands:
    commandDir = os.path.join(data_path, command)

    # Keep only numerically named sub-folders; stray entries such as .DS_Store or
    # .ipynb_checkpoints would otherwise break the integer conversion
    sequenceNames = [name for name in os.listdir(commandDir)
                     if name.isdecimal() and os.path.isdir(os.path.join(commandDir, name))]

    # Numeric sort makes the load order independent of the file system's listing order
    for sequence in sorted(sequenceNames, key=int):
        window = []
        for frame in range(seq_len):    # Frames are stored as 1.npy .. <seq_len>.npy
            res = np.load(os.path.join(commandDir, sequence, "{}.npy".format(frame+1)))
            window.append(res)
        sequences.append(window)
        labels.append(commandMap[command])

In [None]:
# len(sequences)

In [None]:
# len(labels)

In [None]:
# Create X & y variables, and create test batch
X = np.array(sequences)
y = to_categorical(labels).astype(int)

In [None]:
# Hold out 10% of the sequences as the test batch; no random_state is passed, so the split is not pinned between runs
Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, test_size=0.1)

In [None]:
# X[61][0].shape

In [None]:
# X

In [None]:
# y

In [None]:
# X.shape    # Confirm dimension

In [None]:
# ytrain.shape    # Confirm dimension

In [None]:
# ytest.shape    # Confirm dimension

#

#### Modeling & Training (LSTM)

In [None]:
# del actionRecModel

In [27]:
# Define actionRecModel
# *This is the best actionRecModel that I managed to come up with so far. Decent balance between performance and computational costs

# Values per frame produced by f_extract_keypoints:
# pose 33*(x, y, z, visibility) + face 468*(x, y, z) + left hand 21*(x, y, z) + right hand 21*(x, y, z) = 1662
num_features = 33*4 + 468*3 + 21*3 + 21*3

actionRecModel = Sequential()
# Input is one sequence: seq_len frames x num_features values (derived from the capture settings instead of hard-coded)
actionRecModel.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(seq_len, num_features)))
actionRecModel.add(LSTM(128, return_sequences=True, activation='relu'))
actionRecModel.add(LSTM(64, return_sequences=False, activation='relu'))    # Last LSTM returns only its final output, feeding the dense head
actionRecModel.add(Dense(64, activation='relu'))
actionRecModel.add(Dense(32, activation='relu'))
actionRecModel.add(Dense(commands.shape[0], activation='softmax'))    # Softmax outputs one probability per command, each between 0 and 1
                                                                      # and summing to 1 across all commands (three, in this case);
                                                                      # this is what we want because we will be selecting the prediction
                                                                      # with the greatest probability as the action recognition result

In [28]:
# Compile actionRecModel: Adam optimizer, categorical cross-entropy loss, categorical accuracy as the reported metric
actionRecModel.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [29]:
# Load weights from a local .h5 file into the model (relative path)
actionRecModel.load_weights('model_action_rec_curr.h5')

In [None]:
# class trainingHaltClass(tf.keras.callbacks.Callback):
#     def on_epoch_end(self, epoch, logs={}):
#         if(logs.get('loss') <= 0.05):
#             print('\n Training halted at J = 0.1. \n')
#             self.model.stop_training = True
            
# trainingHalt = trainingHaltClass()

# actionRecModel.fit(Xtrain, ytrain, epochs=1000, callbacks=[trainingHalt])

In [None]:
# %%capture

# # Callback object for stop training after 50 successive epochs without improvement
# callback = tf.keras.callbacks.EarlyStopping(monitor='loss',
#                                             patience=50,
#                                             verbose=1,
#                                             restore_best_weights=True)

# # Train the model
# history = actionRecModel.fit(Xtrain, ytrain,
#                              epochs=300,
#                              callbacks=[callback])

In [None]:
# actionRecModel.save('actionRecModel_05.h5')    # Save model

In [None]:
# history_dict = history.history
# print(history_dict.keys())

In [None]:
# # Graphical summary of training accuracy and loss
# plt.plot(history.history['categorical_accuracy'])
# plt.title('model accuracy')
# plt.ylabel('accuracy')
# plt.xlabel('epoch')
# plt.legend(['train', 'test'], loc='upper left')
# plt.show()
# plt.plot(history.history['loss'])
# # plt.plot(history.history['val_loss'])
# plt.title('model loss')
# plt.ylabel('loss')
# plt.xlabel('epoch')
# plt.legend(['train', 'test'], loc='upper left')
# plt.show()

In [None]:
# actionRecModel.summary()

#

#### Predictions & Evaluations

In [26]:
# res = actionRecModel.predict(Xtest)

In [None]:
# Run the model on the held-out test sequences
yhat = actionRecModel.predict(Xtest)



In [None]:
# Reduce the targets and the predictions to class indices (plain Python lists)
ytrue = np.asarray(ytest).argmax(axis=1).tolist()
yhat = np.asarray(yhat).argmax(axis=1).tolist()

In [None]:
'''
+---------------+----------------+
| True Negative | False Positive |
+---------------+----------------+
| False Negative | True Positive |
+---------------+----------------+
'''
# One 2x2 matrix per class, laid out as in the legend above
multilabel_confusion_matrix(ytrue, yhat)

array([[[5, 0],
        [0, 4]],

       [[8, 0],
        [0, 1]],

       [[5, 0],
        [0, 4]]], dtype=int64)

In [36]:
# accuracy_score(ytrue, yhat)

##### * The model is decently accurate

#

#### Real-Time Demo

In [31]:
%%capture

# util var
sequenceList = []      # Rolling window of the most recent seq_len keypoint vectors
predList = []          # Most recent predicted class indices (bounded to stableWindow entries)
commandList = []       # Recognized commands, most recent last (bounded to 5 entries)
threshold = 0.75       # *Recognizes (determines) command when the current output probability is greater than this threshold
stableWindow = 10      # Number of most recent predictions that must agree before a command is accepted

cap = cv2.VideoCapture(0)    # Default webcam

try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:    # Instantiate mediapipe holistic model
        while cap.isOpened():

            ret, frame = cap.read()                           # Read capture
            if not ret:                                       # No frame delivered: stop instead of crashing in cvtColor
                break

            image, results = f_mp_detect(frame, holistic)        # Detect landmarks
            f_draw_landmarks(image, results)                     # Draw landmarks

            # Extract and keep only the LAST seq_len key point vectors for prediction
            sequenceList.append(f_extract_keypoints(results))
            sequenceList = sequenceList[-seq_len:]

            # Predict current command once a full window is available
            if len(sequenceList) == seq_len:
                res = actionRecModel.predict(np.expand_dims(sequenceList, axis=0), verbose=0)[0]
                best = int(np.argmax(res))

                # Bounded history so a long-running session does not grow the list forever
                predList.append(best)
                predList = predList[-stableWindow:]

                # Accept only when every recent prediction agrees with the current one.
                # (Comparing against np.unique(...)[0] would only look at the smallest class index.)
                isStable = all(pred == best for pred in predList)

                if isStable and res[best] > threshold:
                    # Append only when the command differs from the previous one
                    if not commandList or commands[best] != commandList[-1]:
                        commandList.append(commands[best])
                        commandList = commandList[-5:]        # Restrict amount of output kept

                image = f_viz_prob_region(res, commands, image)   # Viz probabilities

            cv2.putText(img=image,
                        text='----------------------------',
                        org=(0, 300),
                        fontFace=cv2.FONT_HERSHEY_PLAIN,
                        fontScale=2,
                        color=blue,
                        thickness=2,
                        lineType=cv2.LINE_AA
                       )

            cv2.imshow('OpenCV Feed', image)                  # Display

            if cv2.waitKey(10) & 0xFF == ord('q'):            # Break if needed
                break
finally:
    # Always free the camera and close the window, even after an error
    cap.release()
    cv2.destroyAllWindows()