# 1. Dependencies

In [4]:
import cv2
import re
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp  
from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.python.client import device_lib

# CUDA TESTING - https://www.youtube.com/watch?v=hHWkvEcDBO0&t=335s
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU'))) # Checks whether there is a GPU available
print(device_lib.list_local_devices()) # Lists the available devices
#tf.debugging.set_log_device_placement(True) # Check what is being used, either GPU or CPU

# Make sure in a python 3.8 env
# pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python mediapipe sklearn matplotlib


Num GPUs Available:  1
[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 14850910289084346204
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 7011009824
locality {
  bus_id: 1
  links {
  }
}
incarnation: 11147271884191509029
physical_device_desc: "device: 0, name: NVIDIA GeForce GTX 1080, pci bus id: 0000:01:00.0, compute capability: 6.1"
]


# 2. Functions

In [5]:
mp_holistic = mp.solutions.holistic # bringing the holstic model
mp_drawing = mp.solutions.drawing_utils # drawing utilities

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction - detecting using mediapipe
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION) # Draw face connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS) # Draw pose connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw right hand connections
    
# CAN CHANGE THE COLOURS OF THESE TO MAKE IT DIFFERENT 
def draw_styled_landmarks(image, results):
    # Draw face connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, 
                             mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1), 
                             mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                             ) 
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             ) 
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 

# Extracting data points
def extract_keypoints(results):
    #pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    #face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    #return np.concatenate([pose, face, lh, rh])
    return np.concatenate([lh, rh])

# 468*3+33*4+21*3+21*3 = 1662

def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

# 3A. Dataset generating 

In [7]:
INPUT_VIDEO_PATH = r'C:\Users\Tommaso\Google Drive\Current Courses\COMP9444\CodingTasks\Data'

SEQUENCE_LENGTH = 30
no_sequences = 30
cap = cv2.VideoCapture(0)
from string import ascii_uppercase
# Set mediapipe model 
writer_check = False
width= int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height= int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    # NEW LOOP
    # Loop through actions
    for action in ascii_uppercase:
        # Loop through sequences aka videos
        for sequence in range(no_sequences):
            # Loop through video length aka sequence length
            for frame_num in range(SEQUENCE_LENGTH + 1):

                # Read feed
                ret, frame = cap.read()

                # Make detections
                image, results = mediapipe_detection(frame, holistic)
#                 print(results)

                # Draw landmarks
                draw_styled_landmarks(image, results)
                
                # NEW Apply wait logic
                if frame_num == 0:
                    name = action + '.' + str(sequence).zfill(3) + '.mp4'
                    file_path = os.path.join(INPUT_VIDEO_PATH, name)
                    if not os.path.exists(file_path):
                        writer_check = True
                        cv2.putText(image, 'STARTING COLLECTION', (120,200), 
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                        cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                        # Show to screen
                        cv2.imshow('OpenCV Feed', image)
                        cv2.waitKey(2000)
                        writer= cv2.VideoWriter(file_path, cv2.VideoWriter_fourcc(*'DIVX'), 30, (width,height))
                    else:
                        writer_check = False
                        cv2.putText(image, 'DATA ALREADY COLLECTED', (120,200), 
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                        cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                        cv2.imshow('OpenCV Feed', image)
                        cv2.waitKey(200)
                        break
                else: 
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    writer.write(frame)
                    
                
                # NEW Export keypoints
                #keypoints = extract_keypoints(results)
                #npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                #np.save(npy_path, keypoints)

                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    writer.release()
                    cap.release()
                    cv2.destroyAllWindows() 
                    break
            if writer_check == True:
                writer.release() 
                writer_check = False
            
    writer.release()          
    cap.release()
    cv2.destroyAllWindows()

KeyboardInterrupt: 

# 3B. Data Gathering

In [7]:
INPUT_VIDEO_PATH = r'C:\Users\Tommaso\Google Drive\Current Courses\COMP9444\CodingTasks\Data'

# Path for exported data, numpy arrays
DATA_PATH = os.path.join('MP_Data') 

# Videos are going to be 30 frames in length
SEQUENCE_LENGTH = 30

label_map = {}
no_sequences = {}
actions = []
# VIDEO NAMING CONVENTION = "ACTION.SEQUENCE.mp4" 
# e.g. "V.001.mp4"
for i,vid in enumerate(os.listdir(INPUT_VIDEO_PATH)):
    print(vid,i)
    string = vid.split('.')
    action = string[0]
    sequence = int(string[1])
    cap = cv2.VideoCapture(vid)
    label_map[action] = i
    
    if action in no_sequences:
        no_sequences[action] += 1
    else:
       no_sequences[action] = 1
      
    if action not in actions:
        actions.append(action)
    try: 
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
    except:
        print('Directory Already Exists, passing file')
        continue
        
    vid_loc = os.path.join(INPUT_VIDEO_PATH,vid)
    cap = cv2.VideoCapture(vid_loc)
    # Set mediapipe model 
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        # this is code for reading from a video file
        frame_num = 0
        while(cap.isOpened() and frame_num < SEQUENCE_LENGTH):
            # Read feed
            ret, frame = cap.read()
            
            if ret:
                image, results = mediapipe_detection(frame, holistic)
                # Draw landmarks
                draw_styled_landmarks(image, results)
                cv2.waitKey(1)
                
                # UNCOMMENT THIS TO SEE THE VIDEO DISPLAYED
                # cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                #                 cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                # # # Show to screen
                # cv2.imshow('OpenCV Feed', image)
                
                # NEW Export keypoints
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)
                frame_num += 1
            else:
                break
            
    cap.release()
    cv2.destroyAllWindows()



A.000.mp4 0
Directory Already Exists, passing file
A.001.mp4 1
Directory Already Exists, passing file
A.002.mp4 2
Directory Already Exists, passing file
A.003.mp4 3
Directory Already Exists, passing file
A.004.mp4 4
Directory Already Exists, passing file
A.005.mp4 5
Directory Already Exists, passing file
A.006.mp4 6
Directory Already Exists, passing file
A.007.mp4 7
Directory Already Exists, passing file
A.008.mp4 8
Directory Already Exists, passing file
A.009.mp4 9
Directory Already Exists, passing file
A.010.mp4 10
Directory Already Exists, passing file
A.011.mp4 11
Directory Already Exists, passing file
A.012.mp4 12
Directory Already Exists, passing file
A.013.mp4 13
Directory Already Exists, passing file
A.014.mp4 14
Directory Already Exists, passing file
A.015.mp4 15
Directory Already Exists, passing file
A.016.mp4 16
Directory Already Exists, passing file
A.017.mp4 17
Directory Already Exists, passing file
A.018.mp4 18
Directory Already Exists, passing file
A.019.mp4 19
Directory

In [11]:
np.load(r'C:\Users\Tommaso\Google Drive\Current Courses\COMP9444\CodingTasks\Perceptron-Heros\ASL_LSTM\MP_Data\B\0\12.npy').shape

(126,)

# 3. Data Preprocessing

In [8]:
print(no_sequences)
print(actions)
label_map = {label:num for num, label in enumerate(actions)}
sequences, labels = [], []
for action in actions:
    for sequence in range(20):
        window = []
        for frame_num in range(SEQUENCE_LENGTH):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence+1), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

print(label_map)
X = np.array(sequences)
y = to_categorical(labels).astype(int)
print(np.array(sequences).shape)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

{'A': 30, 'B': 30, 'C': 30, 'D': 30, 'E': 30, 'F': 30, 'G': 30, 'H': 30, 'I': 30, 'J': 30, 'K': 30, 'L': 30, 'M': 30, 'N': 30, 'O': 30, 'P': 30, 'Q': 30, 'R': 30, 'S': 30, 'T': 30, 'U': 30, 'V': 30, 'W': 30, 'X': 30, 'Y': 30, 'Z': 30}
['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']
{'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7, 'I': 8, 'J': 9, 'K': 10, 'L': 11, 'M': 12, 'N': 13, 'O': 14, 'P': 15, 'Q': 16, 'R': 17, 'S': 18, 'T': 19, 'U': 20, 'V': 21, 'W': 22, 'X': 23, 'Y': 24, 'Z': 25}
(520, 30, 126)


# 4. Build and Train LSTM Neural Network

In [9]:

# Reasons for doing this
# - less data to produce a hyper accurate model
# - much denser neural network (rather than 30 40 million paramters have BLANK)
# - It was a whole heap faster in detecting in real time
print(X.shape)
actions = np.asarray(actions)
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,126)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(actions.shape[0], activation='softmax'))
model.summary()
#model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
#model.fit(X_train, y_train, epochs=40000, callbacks=[tb_callback])



(520, 30, 126)
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm (LSTM)                  (None, 30, 64)            48896     
_________________________________________________________________
lstm_1 (LSTM)                (None, 30, 128)           98816     
_________________________________________________________________
lstm_2 (LSTM)                (None, 30, 128)           131584    
_________________________________________________________________
lstm_3 (LSTM)                (None, 64)                49408     
_________________________________________________________________
dense (Dense)                (None, 64)                4160      
_________________________________________________________________
dense_1 (Dense)              (None, 32)                2080      
_________________________________________________________________
dense_2 (Dense)              (None, 26)  

# 5. Validating and Predicting the Model

In [19]:
res = model.predict(X_test)
print(actions[np.argmax(res[12])])
print(actions[np.argmax(y_test[12])])


K
K


# 5A. Save Model

In [20]:
model.save('action_test_full.h5')

# 5B. Load Model

In [25]:
del model

In [10]:

model.load_weights('action_test_full.h5')

# 6. Model Evaluation

In [31]:
yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()
print(multilabel_confusion_matrix(ytrue, yhat))
accuracy_score(ytrue, yhat)

[[[24  0]
  [ 1  1]]

 [[25  0]
  [ 0  1]]

 [[25  0]
  [ 0  1]]

 [[25  0]
  [ 0  1]]

 [[25  0]
  [ 0  1]]

 [[23  0]
  [ 0  3]]

 [[25  0]
  [ 0  1]]

 [[25  0]
  [ 0  1]]

 [[25  0]
  [ 0  1]]

 [[25  1]
  [ 0  0]]

 [[25  0]
  [ 0  1]]

 [[24  0]
  [ 0  2]]

 [[24  0]
  [ 0  2]]

 [[23  1]
  [ 0  2]]

 [[25  0]
  [ 1  0]]

 [[24  0]
  [ 0  2]]

 [[25  0]
  [ 0  1]]

 [[23  0]
  [ 0  3]]]


0.9230769230769231

# 7. Real-time Testing

In [17]:
# 1. New detection variables
sequence = []
sentence = []
no_hand_count = 0
threshold = 0.8
colors = [(245,117,16), (117,245,16), (16,117,245)]

cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        #print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        if not keypoints.any():
            no_hand_count += 1 
            if no_hand_count > 5:
                sequence = []
                sentence = []
#         sequence.insert(0,keypoints)
#         sequence = sequence[:30]
        else:
            no_hand_count = 0
            sequence.append(keypoints)
            sequence = sequence[-30:]
        
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            
            
        #3. Viz logic
            if res[np.argmax(res)] > threshold: 
                if len(sentence) > 0:
                    sentence = [] 
                    sentence.append(actions[np.argmax(res)])
                else:
                    sentence.append(actions[np.argmax(res)])

            if len(sentence) > 5: 
                sentence = sentence[-5:]

            # Viz probabilities
            #image = prob_viz(res, actions, image, colors)
            
        #cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (350,50), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2, cv2.LINE_AA)
        
        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

R
R
R
R
R
R
R
R
R
R
R
R
R
R
V
V
V
V
V
V
V
V
V
V
V
V
V
V
R
R
R
R
R
R
R
R
R
R
R
R
R
R
R
R
U
U
U
U
U
U
U
U
U
U
U
U
R
R
R
R
R
R
R
U
U
U
U
U
U
U
U
U
U
U
U
R
R
R
R
R
R
U
U
U
U
U
U
U
U
U
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
K
L
L
L
L
L
L
L
L
L
L
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
E
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
Y
I
I
I
I
I
I
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
A
I
I
I
I
I
I
J
J
J
F
F
F
F
F
F
F
F
C
C
C
C
O
O
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
F
O
O
O
O
O
O
O
O
O
O
O
O
O
O
F
F
F
F
F
F
F
F
F
F
F
O
O
O
O
O
O
O
O
O
O
O
O
O
O
O
O
O
O
P
P
P
P
P
P
P
P
P
P
P
P
P
P
Q
Q
Q
Q
Q
Q
Q
Q
Q
Q
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
W
V
V
V
V
V
V
V
V
V
V
V
V
V
V
V


# 8. Txt to speech

In [None]:
import pyttsx3

text_speech = pyttsx3.init()
text_speech.say(sentence)
text_speech.runAndWait() # this line says it in real time

# myobj.save("txt1.mp3") 
# os.system("mpg321 txt1.mp3")