# 1. Import and Install Dependencies

In [None]:
#!pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python mediapipe sklearn matplotlib attention

In [None]:
import cv2
import numpy as np
import os, glob
from matplotlib import pyplot as plt
import time
import mediapipe as mp

# 2. Keypoints using MP Holistic

In [None]:
# Short aliases for the two MediaPipe modules used throughout the notebook.
# `mp_holistic` supplies the Holistic class, the *_CONNECTIONS sets and the
# PoseLandmark enum read by later cells.
mp_holistic = mp.solutions.holistic # Holistic model
# `mp_drawing` supplies draw_landmarks() and DrawingSpec used by the drawing helpers.
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [None]:
def mediapipe_detection(image, model):
    """Run ``model`` on a single BGR frame.

    The frame is converted from BGR to RGB, flagged read-only while
    ``model.process`` runs, flagged writeable again, and converted back to BGR.

    Args:
        image: BGR image array (as produced by OpenCV).
        model: object exposing ``process(rgb_image)``.

    Returns:
        tuple: ``(bgr_image, results)`` where ``results`` is whatever
        ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    rgb_frame.flags.writeable = False    # lock the buffer for the duration of inference
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True     # unlock it again before converting back
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

In [None]:
def draw_landmarks(image, results):
    """Draw the face, pose, left-hand and right-hand landmarks with default styling.

    Args:
        image: image passed to ``mp_drawing.draw_landmarks`` as the drawing target.
        results: object exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # (attribute on `results`, attribute on `mp_holistic`), in drawing order.
    layers = (
        ('face_landmarks', 'FACE_CONNECTIONS'),
        ('pose_landmarks', 'POSE_CONNECTIONS'),
        ('left_hand_landmarks', 'HAND_CONNECTIONS'),
        ('right_hand_landmarks', 'HAND_CONNECTIONS'),
    )
    for landmarks_attr, connections_attr in layers:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmarks_attr),
            getattr(mp_holistic, connections_attr),
        )

In [None]:
def draw_styled_landmarks(image, results):
    """Draw face, pose and hand landmarks using a fixed colour scheme per body part.

    Args:
        image: image passed to ``mp_drawing.draw_landmarks`` as the drawing target.
        results: object exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # Each row: landmarks attribute, connections attribute,
    #           landmark style (colour, thickness, circle_radius),
    #           connection style (colour, thickness, circle_radius).
    # The face connection colour used to contain a channel value of 256, which
    # is outside the 0-255 range of an 8-bit channel; it is now 255.
    styles = (
        ('face_landmarks', 'FACE_CONNECTIONS',
         ((80, 110, 10), 1, 1), ((80, 255, 121), 1, 1)),
        ('pose_landmarks', 'POSE_CONNECTIONS',
         ((80, 22, 10), 2, 4), ((80, 44, 121), 2, 2)),
        ('left_hand_landmarks', 'HAND_CONNECTIONS',
         ((121, 22, 76), 2, 4), ((121, 44, 250), 2, 2)),
        ('right_hand_landmarks', 'HAND_CONNECTIONS',
         ((245, 117, 66), 2, 4), ((245, 66, 230), 2, 2)),
    )
    for landmarks_attr, connections_attr, point_style, line_style in styles:
        point_color, point_thickness, point_radius = point_style
        line_color, line_thickness, line_radius = line_style
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmarks_attr),
            getattr(mp_holistic, connections_attr),
            mp_drawing.DrawingSpec(color=point_color, thickness=point_thickness,
                                   circle_radius=point_radius),
            mp_drawing.DrawingSpec(color=line_color, thickness=line_thickness,
                                   circle_radius=line_radius),
        )

# 3. Extract Keypoint Values

In [None]:
def _xyz_or_zeros(landmark_list, count):
    """Flatten ``landmark_list.landmark`` to ``[x0, y0, z0, x1, ...]``.

    Returns a zero vector of length ``count * 3`` when ``landmark_list`` is falsy
    (i.e. nothing was detected for that body part).
    """
    if not landmark_list:
        return np.zeros(count * 3)
    return np.array([[lm.x, lm.y, lm.z] for lm in landmark_list.landmark]).flatten()


def extract_keypoints(results, image_hight, image_width):
    """Turn one frame's detection results into a single flat feature vector.

    Layout of the returned vector (1556 values in total):
      * 13 upper-body pose landmarks, ``(x, y)`` each, scaled by the image size
        and expressed relative to the nose position (26 values);
      * 468 face landmarks, ``(x, y, z)`` each as stored on the landmark (1404 values);
      * 21 left-hand and 21 right-hand landmarks, ``(x, y, z)`` each (63 + 63 values).
    Any part that is missing from ``results`` contributes zeros of the same length.

    Args:
        results: object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
        image_hight: frame height used to scale pose ``y`` values.
        image_width: frame width used to scale pose ``x`` values.

    Returns:
        numpy.ndarray: 1-D array ``[pose, face, left hand, right hand]``.
    """
    # Pose landmarks that are kept, in output order (nose first: it is the origin).
    pose_names = (
        'NOSE',
        'LEFT_SHOULDER', 'RIGHT_SHOULDER',
        'LEFT_ELBOW', 'RIGHT_ELBOW',
        'LEFT_WRIST', 'RIGHT_WRIST',
        'LEFT_PINKY', 'RIGHT_PINKY',
        'LEFT_INDEX', 'RIGHT_INDEX',
        'LEFT_THUMB', 'RIGHT_THUMB',
    )

    if results.pose_landmarks:
        all_pose = results.pose_landmarks.landmark
        joints = [all_pose[getattr(mp_holistic.PoseLandmark, name)] for name in pose_names]
        nose = joints[0]
        x_1 = nose.x * image_width
        y_1 = nose.y * image_hight
        pose = np.array(
            [[(joint.x * image_width) - x_1, (joint.y * image_hight) - y_1] for joint in joints]
        ).flatten()
    else:
        pose = np.zeros(len(pose_names) * 2)

    face = _xyz_or_zeros(results.face_landmarks, 468)
    lh = _xyz_or_zeros(results.left_hand_landmarks, 21)
    rh = _xyz_or_zeros(results.right_hand_landmarks, 21)
    return np.concatenate([pose, face, lh, rh])

In [None]:
# Sign vocabulary. A name's position in this list is its integer class id.
lebels = [
    'Bird', 'Bitter', 'Black', 'Book', 'Bread',
    'Break', 'Caram', 'Chair', 'Clean', 'Come',
    'Degrade', 'Door', 'Egg', 'Exercise', 'Exercise Book',
    'Fate', 'February', 'Fish', 'Food', 'Good',
    'Goodboy', 'Growth', 'Hearing_Impaired', 'January', 'Khoda Hafez',
    'Large', 'Listen', 'March', 'Me', 'Meat',
    'Mobile', 'More', 'Pencil', 'Picture', 'Procession',
    'Quick', 'Remember', 'Rose Color', 'Salam', 'Short',
    'Sit', 'Small', 'Snake', 'Table', 'Telephone',
    'Thanks', 'Tiger', 'Together', 'Up', 'Wet',
]

print(len(lebels))

# Same names as a NumPy array; later cells use classes.shape[0] as the class count.
classes = np.array(lebels)
print(classes)

# Class name -> integer id.
label_map = dict(zip(lebels, range(len(lebels))))

print(label_map)

In [None]:
####################### pre-processing #############################################
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical


import glob
import os
import os.path
import numpy as np


# Dataset root folder (relative to the notebook's working directory).
DATA_PATH = os.path.join('DataSet-50-W')

# Number of consecutive frames that form one sequence.
sequence_length = 30

# Class name -> integer id, following the order of `lebels`.
label_map = dict(zip(lebels, range(len(lebels))))

print(label_map)


def get_video_parts(video_path):
    """Given a full path to a video, return its parts.

    The last three path components are read as
    ``<train_or_test>/<classname>/<filename>``. Indexing from the end means the
    result no longer depends on how many directories precede them (absolute or
    more deeply nested paths work too).

    Args:
        video_path: path string using ``os.path.sep`` as separator.

    Returns:
        tuple: ``(train_or_test, classname, filename_no_ext, filename)``.
        ``filename_no_ext`` is the file name up to its first ``.``.

    Raises:
        IndexError: if the path has fewer than three components.
    """
    parts = video_path.split(os.path.sep)
    filename = parts[-1]
    classname = parts[-2]
    train_or_test = parts[-3]
    # Deliberately "text before the first dot": folder names derived from this
    # value must stay the same as those of already-extracted data.
    filename_no_ext = filename.split('.')[0]

    return train_or_test, classname, filename_no_ext, filename

In [None]:
def get_video_parts(video_path):
    """Given a full path to a video, return its parts.

    NOTE(review): a function with this same name is also defined in the
    preceding cell; this later definition is the one in effect afterwards.
    Consider keeping only one of them.

    The last three path components are read as
    ``<train_or_test>/<classname>/<filename>``. Indexing from the end means the
    result no longer depends on how many directories precede them (absolute or
    more deeply nested paths work too).

    Args:
        video_path: path string using ``os.path.sep`` as separator.

    Returns:
        tuple: ``(train_or_test, classname, filename_no_ext, filename)``.
        ``filename_no_ext`` is the file name up to its first ``.``.

    Raises:
        IndexError: if the path has fewer than three components.
    """
    parts = video_path.split(os.path.sep)
    filename = parts[-1]
    classname = parts[-2]
    train_or_test = parts[-3]
    # Deliberately "text before the first dot": folder names derived from this
    # value must stay the same as those of already-extracted data.
    filename_no_ext = filename.split('.')[0]

    return train_or_test, classname, filename_no_ext, filename


#np.load('0.npy')

# Dataset root folder (relative to the notebook's working directory).
DATA_PATH = os.path.join('DataSet-50-W')
data_file = []

# Number of consecutive frames that form one sequence.
sequence_length = 30

# Running counters, all starting at zero. The extraction cell increments
# `sequence` and `j` once per video, `i` once per class and `s` once per frame.
sequence = s = i = j = 0


In [None]:
# Extract keypoints from every frame of every video of every class and save
# them as <cwd>/<DATA_PATH>/<class>/<video name without extension>/<frame index>.npy
for vid_class in classes:
    # Set mediapipe model (one Holistic instance per class)
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        class_files = glob.glob(os.path.join(DATA_PATH, vid_class, '*.mp4'))
        i += 1

        for video_path in class_files:
            print(video_path)
            j += 1
            sequence += 1

            # Get the parts of the file.
            train_or_test, classname, filename_no_ext, filename = get_video_parts(video_path)

            # One output folder per video, created once instead of being
            # checked on every frame.
            out_dir = os.path.join(os.path.abspath("."), DATA_PATH, classname, filename_no_ext)
            os.makedirs(out_dir, exist_ok=True)

            cap = cv2.VideoCapture(video_path)
            try:
                length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                print("The total frame number is {0}".format(str(length)))

                for frame_num in range(length):
                    # Read feed
                    ret, frame = cap.read()
                    if not ret or frame is None:
                        # The reported frame count is only an estimate; stop
                        # cleanly instead of crashing on `frame.shape`.
                        print("Stopped at frame {0} of {1}: frame could not be read".format(frame_num, length))
                        break
                    s += 1

                    image_hight, image_width, _ = frame.shape

                    image, results = mediapipe_detection(frame, holistic)

                    # Overlay: banner on the first frame, caption on every frame.
                    if frame_num == 0:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(filename, sequence), (15,12),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen. imshow only repaints when followed by
                    # waitKey, so every frame gets one (500 ms pause on the first).
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(500 if frame_num == 0 else 1)

                    # Export keypoints (np.save appends the .npy extension).
                    keypoints = extract_keypoints(results, image_hight, image_width)
                    np.save(os.path.join(out_dir, str(frame_num)), keypoints)
            finally:
                # Release each capture as soon as its video is done, even on error.
                cap.release()

            print("Processed class #{0}, video #{1}; {2} frames read so far".format(i, j, s))

cv2.destroyAllWindows()

In [None]:

# Build overlapping fixed-length windows from the saved per-frame keypoints,
# then split them into training and held-out sets.

# Stride, in frames, between the starts of two consecutive windows.
step_size = 5
sequences, labels = [], []
for lebel in lebels:
    class_files = glob.glob(os.path.join(DATA_PATH, lebel, '*.mp4'))

    for class_file in class_files:
        train_or_test, classname, filename_no_ext, filename = get_video_parts(class_file)

        frames_dir = os.path.join(DATA_PATH, lebel, filename_no_ext)
        total_npy = len(glob.glob(os.path.join(frames_dir, '*.npy')))
        print("The Folder name is " + filename_no_ext)
        print("The total_npy number is {0}".format(str(total_npy)))

        # Window starts 0, step_size, 2*step_size, ... for as long as a full
        # window of `sequence_length` frames still fits. Videos shorter than
        # `sequence_length` contribute no window.
        window_starts = range(0, total_npy - sequence_length + 1, step_size)
        print("Number of windows: {0}".format(len(window_starts)))

        for start in window_starts:
            # Frames are stored as 0.npy, 1.npy, ... inside the video's folder.
            window = [
                np.load(os.path.join(frames_dir, "{}.npy".format(frame_num)))
                for frame_num in range(start, start + sequence_length)
            ]
            sequences.append(window)
            labels.append(label_map[lebel])

if not sequences:
    raise RuntimeError("No sequences were built: no extracted .npy frames found under " + DATA_PATH)

# Build the (possibly large) array once and reuse it.
X = np.array(sequences)
print(X.shape)
print(np.array(labels).shape)

# Fix the one-hot width to the full vocabulary so `y` always matches the
# model's output layer, even if the last classes have no samples.
y = to_categorical(labels, num_classes=len(lebels)).astype(int)

# Seeded so that the split is the same on every run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)

print(y_test.shape)

############# Build and Train LSTM Neural Network ###################################


In [None]:
#------------Lstm with attention ---------------------------

import numpy as np
from tensorflow.keras import Input, metrics
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.models import load_model, Model

from attention import Attention


def main():
    """Build, train, save and reload-check the LSTM + attention classifier.

    Reads the module-level ``X_train``, ``y_train``, ``X_test``, ``y_test`` and
    ``classes``. Writes the trained model to ``lstm+attention_bsl_model.h5``.

    Raises:
        AssertionError: if the reloaded model's predictions on ``X_train``
            differ from those of the in-memory model.
    """
    # This cell's own import list does not bring these two in, so import them
    # here; otherwise the cell fails on a fresh kernel.
    from tensorflow.keras import callbacks, optimizers

    # Input shape comes from the data rather than from hard-coded numbers.
    time_steps, input_dim = X_train.shape[1:]

    # Define/compile the model.
    model_input = Input(shape=(time_steps, input_dim))
    x = LSTM(128, return_sequences=True)(model_input)
    x = Attention(units=64)(x)
    x = Dense(64, activation='relu')(x)
    x = Dense(classes.shape[0], activation='softmax')(x)
    model = Model(model_input, x)

    c_backs = [callbacks.EarlyStopping(patience=40, restore_best_weights=True)]

    rms = optimizers.RMSprop(learning_rate=0.0001, decay=1e-6)

    model.compile(loss="categorical_crossentropy",
                  optimizer=rms,
                  metrics=['accuracy'])
    model.summary()

    # train. The early-stopping callback was built but never handed to fit();
    # it is now passed, as in the BiGRU cell.
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, callbacks=c_backs)

    # test save/reload model.
    model_path = 'lstm+attention_bsl_model.h5'
    pred1 = model.predict(X_train)
    model.save(model_path)
    model_h5 = load_model(model_path, custom_objects={'Attention': Attention})
    pred2 = model_h5.predict(X_train)
    np.testing.assert_almost_equal(pred1, pred2)
    print('Success.')


if __name__ == '__main__':
    main()

In [None]:
#------------BiLstm with attention ---------------------------


import numpy as np
from tensorflow.keras import Input, metrics, callbacks, optimizers
from tensorflow.keras.layers import Bidirectional, Dense, LSTM, GRU
from tensorflow.keras.models import load_model, Model

from attention import Attention


def main():
    """Build, train, save and reload-check the bidirectional-LSTM + attention classifier.

    Reads the module-level ``X_train``, ``y_train``, ``X_test``, ``y_test`` and
    ``classes``. Writes the trained model to ``BiLSTM+attention_bsl_model_w.h5``.

    Raises:
        AssertionError: if the reloaded model's predictions on ``X_train``
            differ from those of the in-memory model.
    """
    # Input shape comes from the data rather than from hard-coded numbers.
    time_steps, input_dim = X_train.shape[1:]

    # Define/compile the model.
    model_input = Input(shape=(time_steps, input_dim))
    x = Bidirectional(LSTM(64, return_sequences=True))(model_input)
    x = Bidirectional(LSTM(128, return_sequences=True, activation='relu'))(x)
    x = Attention(units=64)(x)
    x = Dense(64, activation='relu')(x)
    x = Dense(classes.shape[0], activation='softmax')(x)
    model = Model(model_input, x)

    c_backs = [callbacks.EarlyStopping(patience=40, restore_best_weights=True)]

    rms = optimizers.RMSprop(learning_rate=0.0001, decay=1e-6)

    model.compile(loss="categorical_crossentropy",
                  optimizer=rms,
                  metrics=['accuracy'])
    model.summary()

    # train. The early-stopping callback was built but never handed to fit();
    # it is now passed, as in the BiGRU cell.
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=150, callbacks=c_backs)

    # test save/reload model. A single path variable guarantees that the file
    # reloaded is the file just written (the load path used to be
    # '...model.h5_w', which is never created).
    model_path = 'BiLSTM+attention_bsl_model_w.h5'
    pred1 = model.predict(X_train)
    model.save(model_path)
    model_h5 = load_model(model_path, custom_objects={'Attention': Attention})
    pred2 = model_h5.predict(X_train)
    np.testing.assert_almost_equal(pred1, pred2)
    print('Success.')


if __name__ == '__main__':
    main()

In [None]:
#------------GRU with attention ---------------------------

import numpy as np
from tensorflow.keras import Input, metrics
from tensorflow.keras.layers import Dense, LSTM, GRU
from tensorflow.keras.models import load_model, Model

from attention import Attention


def main():
    """Build, train, save and reload-check the GRU + attention classifier.

    Reads the module-level ``X_train``, ``y_train``, ``X_test``, ``y_test`` and
    ``classes``. Writes the trained model to ``GRU+attention_bsl_model.h5``.

    Raises:
        AssertionError: if the reloaded model's predictions on ``X_train``
            differ from those of the in-memory model.
    """
    # This cell's own import list does not bring these two in, so import them
    # here instead of relying on another cell having been run first.
    from tensorflow.keras import callbacks, optimizers

    # Input shape comes from the data rather than from hard-coded numbers.
    time_steps, input_dim = X_train.shape[1:]

    # Define/compile the model.
    model_input = Input(shape=(time_steps, input_dim))
    x = GRU(128, return_sequences=True)(model_input)
    x = Attention(units=64)(x)
    x = Dense(64, activation='relu')(x)
    x = Dense(classes.shape[0], activation='softmax')(x)
    model = Model(model_input, x)

    c_backs = [callbacks.EarlyStopping(patience=40, restore_best_weights=True)]

    rms = optimizers.RMSprop(learning_rate=0.0001, decay=1e-6)

    model.compile(loss="categorical_crossentropy",
                  optimizer=rms,
                  metrics=['accuracy'])
    model.summary()

    # train. The early-stopping callback was built but never handed to fit();
    # it is now passed, as in the BiGRU cell.
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=100, callbacks=c_backs)

    # test save/reload model.
    model_path = 'GRU+attention_bsl_model.h5'
    pred1 = model.predict(X_train)
    model.save(model_path)
    model_h5 = load_model(model_path, custom_objects={'Attention': Attention})
    pred2 = model_h5.predict(X_train)
    np.testing.assert_almost_equal(pred1, pred2)
    print('Success.')


if __name__ == '__main__':
    main()

In [None]:
#------------BiGRU with attention ---------------------------

import numpy as np
from tensorflow.keras import Input, metrics, callbacks, optimizers
from tensorflow.keras.layers import Bidirectional, Dense, LSTM, GRU
from tensorflow.keras.models import load_model, Model

from attention import Attention


def main():
    """Build, train, save and reload-check the bidirectional-GRU + attention classifier.

    Reads the module-level ``X_train``, ``y_train``, ``X_test``, ``y_test`` and
    ``classes``. Writes the trained model to ``BiGRU+attention_bsl_model_w.h5``.

    Raises:
        AssertionError: if the reloaded model's predictions on ``X_train``
            differ from those of the in-memory model.
    """
    # Input shape comes from the data rather than from hard-coded numbers.
    time_steps, input_dim = X_train.shape[1:]

    # Define/compile the model.
    model_input = Input(shape=(time_steps, input_dim))
    x = Bidirectional(GRU(64, return_sequences=True))(model_input)
    x = Bidirectional(GRU(128, return_sequences=True, activation='relu'))(x)
    x = Attention(units=64)(x)
    x = Dense(64, activation='relu')(x)
    x = Dense(classes.shape[0], activation='softmax')(x)
    model = Model(model_input, x)

    c_backs = [callbacks.EarlyStopping(patience=40, restore_best_weights=True)]

    rms = optimizers.RMSprop(learning_rate=0.0001, decay=1e-6)

    model.compile(loss="categorical_crossentropy",
                  optimizer=rms,
                  metrics=['accuracy'])
    model.summary()

    # train.
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=200, callbacks=c_backs)

    # Save immediately after training (the model used to be written twice in a
    # row to the same file; once is enough), then check the reload round trip.
    model_path = 'BiGRU+attention_bsl_model_w.h5'
    model.save(model_path)

    pred1 = model.predict(X_train)
    model_h5 = load_model(model_path, custom_objects={'Attention': Attention})
    pred2 = model_h5.predict(X_train)
    np.testing.assert_almost_equal(pred1, pred2)
    print('Success.')


if __name__ == '__main__':
    main()

################### Evaluation using Confusion Matrix and Accuracy ################

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

# Evaluate the BiGRU + attention model on the held-out split.
# The BiGRU training cell writes 'BiGRU+attention_bsl_model_w.h5'; the name
# without '_w' is never produced by this notebook, so load the '_w' file.
model_h5 = load_model('BiGRU+attention_bsl_model_w.h5', custom_objects={'Attention': Attention})

# Class probabilities -> predicted class ids; one-hot targets -> true class ids.
class_probabilities = model_h5.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(class_probabilities, axis=1).tolist()

print(multilabel_confusion_matrix(ytrue, yhat))
print(accuracy_score(ytrue, yhat))
