# 1) Install and Import dependencies

In [None]:
!pip install mediapipe matplotlib numpy tensorflow --user

In [None]:
import os
import cv2
import numpy as np
import mediapipe as mp
import tensorflow as tf
from tensorflow import keras as tfk
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, ConfusionMatrixDisplay

# 2) Create folders

In [None]:
# Root folder under which all collected landmark data is stored
path = 'Data'

# Names of the gestures to recognise (one class per name)
signs = ['Armor', 'Curse', 'Nothing', 'Psyonic', 'Shadow']

# Names of the image variants stored for every frame:
# the untouched camera frame plus four colour augmentations
augmentations = ['Original', 'Saturation', 'Hue', 'Contrast', 'Brightness']

# How many examples ("sentences") are recorded for each sign
len_sentence = 100

# How many frames make up one example
len_sequence = 30

# Number of classes the model has to predict
num_classes = len(signs)

In [None]:
# Create one folder per (sign, augmentation, sentence number) combination
for sign in signs:
    for augmentation in augmentations:
        for sentence in range(1, len_sentence + 1):

            # Folder that will hold the frames of one sentence (example)
            sentence_path = os.path.join(path, sign, augmentation, str(sentence))

            try:

                # exist_ok=True makes re-running this cell a silent no-op
                # for folders that are already there
                os.makedirs(sentence_path, exist_ok=True)

            except OSError as error:

                # Only genuine failures (e.g. a file blocking the path) end up here
                print(error)

# 3) Collect image data

In [None]:
def process_image(image, holistic):
    """
    Detects facial, pose, and hand landmarks on a single image.
    The image passed in is left untouched (content and writeable flag),
    so the caller can keep drawing on it afterwards.
    
    Arguments:
    image -- np.array in BGR color order, from which landmarks get detected 
    holistic -- MediaPipe Holistic model that detects landmarks from an image
    
    Returns:
    landmarks -- output from holistic that contains face, pose, and hand position
    """
    
    # Convert image from BGR to RGB color order (cvtColor returns a new array)
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Mark only the converted copy as read-only while the model processes it
    # (performance hint used in MediaPipe's own examples); the caller's array
    # must stay writeable because landmarks and text are drawn on it later
    rgb_image.flags.writeable = False
    
    # Process image to get landmarks
    landmarks = holistic.process(rgb_image)
    
    return landmarks

In [None]:
def _hand_to_vector(hand_landmarks):
    """
    Flattens the landmarks of one hand into [x0, y0, z0, x1, y1, z1, ...].
    Returns 21*3 zeros when the hand was not detected (hand_landmarks is None).
    """
    
    if hand_landmarks is None:
        return np.zeros(21*3)
    
    return np.array([[point.x, point.y, point.z] for point in hand_landmarks.landmark]).flatten()


def extract_hand_landmarks(landmarks):
    """
    Pulls out the hand landmarks only.
    Normally, if a hand is in a frame, its landmarks should not be None.
    If hand's landmarks were not detected, we set them all equal to zero.
    
    Arguments:
    landmarks -- output from a holistic model with positions of face, pose, and hands
    
    Returns:
    hands -- np.array with right hand values followed by left hand values
             [shape = (126, ) for 21 points with 3 coordinates per hand]
    """
    
    # The order (right hand first, then left hand) defines the feature layout
    right_hand = _hand_to_vector(landmarks.right_hand_landmarks)
    left_hand = _hand_to_vector(landmarks.left_hand_landmarks)
    
    hands = np.concatenate([right_hand, left_hand])
    return hands

In [None]:
def augment_image(image):
    """
    Builds randomly color-jittered variants of a frame.
    
    Arguments:
    image -- feed from webcam represented as np.array
    
    Returns:
    augmented_images -- list with the original image followed by its
                        saturation-, hue-, contrast-, and brightness-augmented versions
    """
    
    # One random color transformation per entry, applied in this exact order
    transforms = (
        lambda img: tf.image.random_saturation(img, 0.75, 1.75),
        lambda img: tf.image.random_hue(img, 0.1),
        lambda img: tf.image.random_contrast(img, 0.75, 1.5),
        lambda img: tf.image.random_brightness(img, 0.2),
    )
    
    # The untouched image always comes first, the augmented copies follow as np.arrays
    augmented_images = [image]
    for transform in transforms:
        augmented_images.append(np.array(transform(image)))
    
    return augmented_images

In [None]:
def _save_sentence(path, sign, augmentations, sentence_cnt, images_sentence, holistic):
    """
    Extracts the hand landmarks of every collected image of one sentence and saves them.
    Frame number k (1-based) of an augmentation is written to
    <path>/<sign>/<augmentation>/<sentence_cnt>/<k>.npy
    
    Arguments:
    path -- path, which indicates where to save the data
    sign -- action name, for which data is collecting
    augmentations -- list of augmentation names, in the same order as the images of a frame
    sentence_cnt -- number of the sentence (example) being saved
    images_sentence -- list with one entry per frame, each entry is the list returned by augment_image
    holistic -- MediaPipe Holistic model that detects landmarks from an image
    
    Returns:
    None
    """
    
    for frame_cnt, images in enumerate(images_sentence, start=1):
        for image, augmentation in zip(images, augmentations):
            
            # Get landmarks from an image and keep the hand landmarks only
            landmarks = process_image(image, holistic)
            image_landmarks = extract_hand_landmarks(landmarks)
            
            # Make sure the target folder exists even if the folder-creation cell was skipped
            sentence_dir = os.path.join(path, sign, augmentation, str(sentence_cnt))
            os.makedirs(sentence_dir, exist_ok=True)
            
            # np.save appends the '.npy' extension to the file name
            np.save(os.path.join(sentence_dir, str(frame_cnt)), image_landmarks)


def collect_sign_data(path, sign, augmentations, len_sentence, len_sequence):
    """
    Collects, processes, and saves an image dataset.
    When 'p' is pressed, it starts to collect every frame of one sentence (example).
    To every collected frame applies data augmentation techniques to expand the dataset.
    When sentence is formed (collected len_sequence number of frames), it extracts the hand
    landmarks of every frame and saves them to the corresponding folders.
    Pressing 'q' stops the collection for this sign.
    
    Arguments:
    path -- path, which indicates where to save the data
    sign -- action name, for which data is collecting 
    augmentations -- list of augmentations to apply to an image
    len_sentence -- number of examples for each sign
    len_sequence -- number of frames for each sentence (example)
    
    Returns:
    None
    """
    
    # Indicates when to start collecting the data
    process = False
    
    # Checks when a number of examples is reached
    sentence_cnt = 1
    
    # One entry per collected frame; every entry holds the original and augmented images
    images_sentence = []
    
    # Show the actual sign in the window's name
    window_name = "Sign - '{}'".format(sign)
    
    # Specify a webcam device
    cap = cv2.VideoCapture(0)
    
    try:
        
        # Set up the MediaPipe Holistic model
        with mp.solutions.holistic.Holistic(min_detection_confidence=0.75, min_tracking_confidence=0.75) as holistic:

            # Until the capturing device is not shut down and we did not reach necessary amount of sentences (examples)
            while cap.isOpened() and sentence_cnt <= len_sentence:

                # Get feed from the webcam and stop when no frame could be read
                ret, frame = cap.read()
                if not ret:
                    print("Could not read a frame from the webcam, stopping the collection")
                    break
                
                # Status text is drawn on a copy, because 'frame' itself is stored as the
                # 'Original' image and must not contain the overlay
                preview = frame.copy()
                
                if process:
                    
                    # Augment the original image and save it with other augmented versions to the sentence list
                    images_sentence.append(augment_image(frame))
                    
                    if len(images_sentence) < len_sequence:
                        status = "Collecting samples [{} out of {}]".format(sentence_cnt, len_sentence)
                    else:
                        status = "Saving..."
                    
                    cv2.putText(preview, status, (0, 20), cv2.FONT_HERSHEY_PLAIN, 1.5, (0, 171, 255), 2, cv2.LINE_AA)
                
                # Show the frame before a possible save, so the status is visible while saving
                cv2.imshow(window_name, preview)
                
                # Read the keyboard once per frame, so that no key press gets lost
                # between several waitKey calls
                key = cv2.waitKey(10) & 0xFF
                
                # Break from the while loop if necessary
                if key == ord('q'):
                    break
                
                # Start gathering the data when 'p' is pressed
                if key == ord('p'):
                    process = True
                
                # When gathered images are enough to form a sentence
                if len(images_sentence) == len_sequence:
                    _save_sentence(path, sign, augmentations, sentence_cnt, images_sentence, holistic)
                    
                    # Reset variables' values and wait for the next 'p'
                    sentence_cnt, process, images_sentence = sentence_cnt+1, False, []
    
    finally:
        
        # Release the capturing device and destroy the output window, even after an error
        cap.release()
        cv2.destroyAllWindows()

In [None]:
# Record the examples for every sign, one sign after another
# (every call opens the webcam; 'p' starts recording an example, 'q' stops the current sign)
for sign in signs:
    collect_sign_data(path, sign, augmentations, len_sentence, len_sequence)

# 4) Load image dataset

In [None]:
# Create lists to store image data and labels
features, labels = [], []

# Iterate over 'signs' instead of os.listdir(path): the label of an example must be the
# index of its sign in 'signs', because predictions are decoded with signs[index] later,
# and a directory listing has no guaranteed order and may contain unrelated entries
for label, sign in enumerate(signs):
    for augmentation in augmentations:
        for sentence in range(1, len_sentence+1):

            # Folder that holds all frames of this sentence (example)
            sentence_dir = os.path.join(path, sign, augmentation, str(sentence))

            # Load every frame within the sentence in the order it was recorded
            example = [np.load(os.path.join(sentence_dir, str(sequence) + '.npy')).flatten()
                       for sequence in range(1, len_sequence+1)]

            # Append every sentence and its label to the appropriate lists 
            features.append(example)
            labels.append(label)

In [None]:
# Convert the features list to the np.array type (one row per sentence)
X = np.array(features)

# One-hot encode the labels (in each row a single 1 marks the true class).
# num_classes is passed explicitly so the width always equals the size of the
# model's output layer, even if the highest label is missing from the data
Y = tfk.utils.to_categorical(labels, num_classes=num_classes).astype(int)

In [None]:
# Split the dataset into train (90%) and test (10%) data
# NOTE(review): the split is random over all examples, and every recorded sentence is stored
# once per augmentation, so variants of the same recording can end up in both sets —
# TODO confirm whether a split grouped by sentence is wanted
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size = 0.1)

# 5) Create Sign Recognition model

In [None]:
# Build the classifier as one Keras Sequential stack
model = tfk.Sequential([

    # Input: one sentence of len_sequence frames with 126 values per frame
    # (21 points * 3 coordinates * 2 hands)
    tfk.layers.InputLayer(input_shape=(len_sequence, 126)),

    # Stacked Long-Short Term Memory layers, some with dropout to prevent overfitting;
    # only the last one returns a single vector instead of a sequence
    tfk.layers.LSTM(units=32, dropout=0.1, return_sequences=True),
    tfk.layers.LSTM(units=64, return_sequences=True),
    tfk.layers.LSTM(units=64, dropout=0.15, return_sequences=True),
    tfk.layers.LSTM(units=32, dropout=0.1, return_sequences=False),

    # Dense (Fully-connected) layers with 'relu' activation function
    tfk.layers.Dense(units=32, activation='relu'),
    tfk.layers.Dense(units=16, activation='relu'),

    # Output layer: one unit per sign, 'softmax' turns the outputs into class probabilities
    tfk.layers.Dense(units=num_classes, activation='softmax'),
])

In [None]:
# Check the model's architecture information
model.summary()

In [None]:
def learning_rate_decay(epoch, learning_rate, decay_every=9, decay_rate=0.75):
    """
    Decreases learning_rate every time a certain number of epochs is reached.
    Epoch 0 is skipped, so training starts with the learning rate set in the optimizer.
    
    Arguments:
    epoch -- index of the current epoch (starts from 0)
    learning_rate -- value, which controls the length of 'steps' that model takes  
    decay_every -- number of epochs between two decreases of learning_rate
    decay_rate -- factor, by which learning_rate is multiplied on every decrease
    
    Returns:
    learning_rate -- appropriate epoch-dependent value of learning_rate
    """
    
    # 'epoch > 0' keeps the very first epoch at the initial learning rate
    # (0 % decay_every == 0 would otherwise decay it before any training happened)
    if epoch > 0 and epoch % decay_every == 0:
        return learning_rate * decay_rate
    
    return learning_rate

In [None]:
# Adam optimizer with the initial learning rate used for training
optimizer = tfk.optimizers.Adam(learning_rate=5e-4)

# Callback that lets 'learning_rate_decay' set the learning rate while the model is training
scheduler = tfk.callbacks.LearningRateScheduler(schedule=learning_rate_decay)

In [None]:
# Configure the model for training: loss for one-hot labels, the optimizer, and the tracked metric
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['categorical_accuracy'],
)

In [None]:
# Fit the compiled model on the training split with the learning-rate scheduler attached
model.fit(
    x=X_train,
    y=Y_train,
    batch_size=16,
    epochs=100,
    callbacks=[scheduler],
)

# 6) Save and Load model

In [None]:
# File the trained model is saved to and loaded from
model_path = "model.h5"

In [None]:
# Write the trained model to 'model_path'
model.save(model_path)

In [None]:
# Load the saved model back from 'model_path' (replaces the in-memory 'model')
model = tfk.models.load_model(model_path)

# 7) Evaluate model performance

In [None]:
def get_ytrue_yhat(X, Y, model):
    """
    Turns one-hot true labels and model predictions into lists of class indexes.
    The model predicts class probabilities for data X; for every example the index
    of the highest value is taken, both for the prediction and for the true label in Y.
    
    Arguments:
    X -- Data, from which predictions are made
    Y -- True labels assigned to each data example 
    model -- trained object that makes predictions
    
    Returns:
    ytrue -- extracted indexes of true labels
    yhat -- extracted indexes of predicted by model labels
    """
    
    # Class probabilities predicted by the model, one row per example
    probabilities = model.predict(X)
    
    def to_indexes(scores):
        # Index of the largest value in every row, as a plain Python list
        return np.argmax(scores, axis = 1).tolist()
    
    return to_indexes(Y), to_indexes(probabilities)

In [None]:
def display_confusion_matrix(ytrue, yhat, num_classes):
    """
    Creates and displays a confusion matrix to evaluate results.
    
    Arguments:
    ytrue -- indexes of true labels
    yhat -- indexes of predicted by model labels
    num_classes -- number of classes that the model is trained to predict
    
    Returns:
    None
    """
    
    # Imported here because the notebook's import cell only brings in
    # multilabel_confusion_matrix, which left 'confusion_matrix' undefined
    from sklearn.metrics import confusion_matrix
    
    # Create a confusion matrix with one row and one column per class;
    # passing all labels keeps its size fixed even if a class is missing in the data
    matrix = confusion_matrix(ytrue, yhat, labels=np.arange(num_classes))
    
    # Create a visualization for the confusion matrix
    display = ConfusionMatrixDisplay(matrix)
    
    # Display the results
    display.plot()
    plt.show()

In [None]:
# Calculate ytrue and yhat (lists of class indexes) based on the training data
ytrue_train, yhat_train = get_ytrue_yhat(X_train, Y_train, model)

# Visualize the confusion matrix to evaluate the results on the training data
display_confusion_matrix(ytrue_train, yhat_train, num_classes)

In [None]:
# Calculate the model's accuracy based on training data and print it
acc_train = accuracy_score(ytrue_train, yhat_train)
print(acc_train)

In [None]:
# Calculate ytrue and yhat (lists of class indexes) based on testing data
ytrue_test, yhat_test = get_ytrue_yhat(X_test, Y_test, model)

# Visualize the confusion matrix to evaluate the results on the testing data
display_confusion_matrix(ytrue_test, yhat_test, num_classes)

In [None]:
# Calculate the model's accuracy based on testing data
# (not assigned or printed: the value is shown as the output of this notebook cell)
accuracy_score(ytrue_test, yhat_test)

# 8) Test model in real-time

In [None]:
def draw_landmarks(image, landmarks, mp_draw_utils, mp_holistic):
    """
    Draws the detected face, pose, and hand landmarks onto an image.
    
    Arguments:
    image -- feed from webcam represented as np.array
    landmarks -- output from a holistic model, with positions of face, pose, and hands
    mp_draw_utils -- MediaPipe Drawing Utils help to visualize landmarks
    mp_holistic -- MediaPipe Holistic module that provides the connection constants
    
    Returns:
    image -- image annotated with landmarks 
    """
    
    # One row per landmark group, drawn in this order:
    # (attribute of 'landmarks', connections constant of 'mp_holistic',
    #  style of the landmark points, style of the connection lines)
    layers = (
        ('face_landmarks', 'FACEMESH_CONTOURS',
         dict(color=(55, 129, 5), thickness=2, circle_radius=1),
         dict(color=(187, 233, 157), thickness=2, circle_radius=1)),
        ('pose_landmarks', 'POSE_CONNECTIONS',
         dict(color=(203, 83, 46), thickness=2, circle_radius=2),
         dict(color=(228, 162, 142), thickness=4, circle_radius=2)),
        ('right_hand_landmarks', 'HAND_CONNECTIONS',
         dict(color=(0, 0, 0), thickness=2, circle_radius=2),
         dict(color=(105, 105, 105), thickness=3, circle_radius=2)),
        ('left_hand_landmarks', 'HAND_CONNECTIONS',
         dict(color=(133, 72, 148), thickness=2, circle_radius=2),
         dict(color=(219, 196, 225), thickness=3, circle_radius=2)),
    )
    
    for landmarks_name, connections_name, point_style, line_style in layers:
        mp_draw_utils.draw_landmarks(
            image=image,
            landmark_list=getattr(landmarks, landmarks_name),
            connections=getattr(mp_holistic, connections_name),
            landmark_drawing_spec=mp_draw_utils.DrawingSpec(**point_style),
            connection_drawing_spec=mp_draw_utils.DrawingSpec(**line_style),
        )
    
    return image 

In [None]:
def show_probs(prediction, signs, frame):
    """
    Visualizes the probabilities of each predictable class.
    All signs are stacked vertically in the left side of the frame.
    Each sign has its own colored rectangle. The longer the rectangle, the higher the probability
    (a probability of 1.0 gives a rectangle of 100 pixels).
    
    Arguments:
    prediction -- np.array() of class probabilities predicted by model
    signs -- list of signs that the model is trying to classify
    frame -- np.array() feed from webcam
    
    Returns:
    frame -- annotated frame with visualized class probabilities
    """
    
    # List of colors for the rectangles (OpenCV reads the tuples in BGR order)
    colors = [(245,117,16), (24,24,240), (89,216,106), (225, 34, 200), (55, 55, 55)]
    
    # Iterate through every class and its probability
    for index, prob in enumerate(prediction):
        
        # Reuse the colors from the start when there are more classes than colors
        color = colors[index % len(colors)]
        
        # Every sign gets its own row, 40 pixels below the previous one
        top = 60 + index * 40
        
        # Create a colored rectangle based on sign's index and probability
        cv2.rectangle(frame, (0, top), (int(prob * 100), top + 30), color, -1)
        
        # Add the sign's name on top of its rectangle
        cv2.putText(frame, signs[index], (0, top + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return frame

In [None]:
def calculate_org(rect_length, rect_width, sign_prediction):
    """
    Finds where to put the sign's name so that it's right in the middle of the rectangle.
    The names of signs differ by their length, so it's required to consider that.
    
    Arguments:
    rect_length -- length of the rectangle
    rect_width -- width of the rectangle 
    sign_prediction -- name of the sign that needed to be shown in the center of a rectangle
    
    Returns:
    sign_x -- value of the X-axis where the sign's name should be placed 
    sign_y -- value of the Y-axis where the sign's name should be placed
    """
    
    # Measure the text given by the caller (font, scale, and thickness must match
    # the values used when the text is drawn)
    sign_size = cv2.getTextSize(sign_prediction, cv2.FONT_HERSHEY_PLAIN, 2.5, 3)[0]
    
    # Calculate the appropriate coordinates to place the given text.
    # The measured width is scaled by 1.5 — presumably because the caller draws
    # the name with a space between its letters, which makes it wider
    sign_x = (rect_length - int(1.5* sign_size[0])) // 2
    sign_y = (rect_width + sign_size[1]) // 2

    return sign_x, sign_y

In [None]:
# Shortcut to the MediaPipe Holistic solution module
# (its connection constants are used when the landmarks are drawn)
mp_holistic = mp.solutions.holistic

# Shortcut to the MediaPipe drawing utils, which draw landmarks and their connections on the image
mp_draw_utils = mp.solutions.drawing_utils

In [None]:
# sequence    -- hand landmarks of the most recent frames, which form a sentence
# predictions -- class indexes predicted by the trained model for the recent sentences
sequence, predictions = [], []

# Lower limit of the prediction confidence
threshold = 0.85

# Name of the most recently recognised sign (empty while nothing was recognised)
sign_prediction = ""

In [None]:
# Set the capturing device 
cap = cv2.VideoCapture(0)

# Number of consecutive identical predictions required before a sign is accepted
required_streak = 15

try:
    
    # Set up the MediaPipe Holistic model
    with mp.solutions.holistic.Holistic(min_detection_confidence=0.75, min_tracking_confidence=0.75) as holistic:
        
        # Until the capturing device is not shut down
        while cap.isOpened():

            # Read feed and stop when no frame could be read
            ret, frame = cap.read()
            if not ret:
                print("Could not read a frame from the webcam, stopping")
                break

            # Get landmarks from the frame
            landmarks = process_image(frame, holistic)
            
            # Make sure the frame can be drawn on
            frame.flags.writeable = True
            
            # Draw landmarks
            frame = draw_landmarks(frame, landmarks, mp_draw_utils, mp_holistic)
            
            # Extract hand landmarks and keep only the most recent len_sequence frames,
            # which is the sentence length the model was trained on
            keypoints = extract_hand_landmarks(landmarks)
            sequence.append(keypoints)
            sequence = sequence[-len_sequence:]
            
            # If enough frames were gathered to form a sentence
            if len(sequence) == len_sequence:
                
                # Make a prediction and get the index with the highest probability
                prediction = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                prediction_index = np.argmax(prediction)
                
                # Keep only the predictions that are needed for the streak check
                predictions.append(prediction_index)
                predictions = predictions[-required_streak:]
                
                # If the last 'required_streak' predictions exist and all have the same label 
                if len(predictions) == required_streak and np.all(np.array(predictions) == prediction_index):
                    
                    # And their probability passes the confidence level 
                    if prediction[prediction_index] > threshold:
                        
                        # And it is not the most recent predicted sign
                        if signs[prediction_index] != sign_prediction:
                            
                            # Then assign a new final prediction ("Nothing" clears the text)
                            sign_prediction = signs[prediction_index] if signs[prediction_index] != "Nothing" else ""
        
                # Visualize the probabilities of each gesture
                frame = show_probs(prediction, signs, frame)
                  
            # The header bar spans the whole width of the frame, whatever the camera resolution is
            frame_width = frame.shape[1]
            
            # Calculate the location of the last predicted action and correspondingly annotate the image        
            sign_x, sign_y = calculate_org(frame_width, 40, sign_prediction)
            cv2.rectangle(frame, (0,0), (frame_width, 40), (0, 171, 255), -1)
            cv2.putText(frame, ' '.join(sign_prediction), (sign_x, sign_y), 
                           cv2.FONT_HERSHEY_PLAIN, 2.5, (255, 255, 255), 3, cv2.LINE_AA)
            
            # Create a window to output the webcam feed
            cv2.imshow('OpenCV Feed', frame)

            # Break from a loop by pressing 'q' if necessary
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break

finally:
    
    # Release the capturing device and destroy the webcam output window, even after an error
    cap.release()
    cv2.destroyAllWindows()