# Import Dependencies

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

# Keypoints using mediapipe Holistic

In [None]:
# Short aliases for the two mediapipe solution modules used by the cells below.
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [None]:
def mediapipe_detection(image, model):
    """Run a detection model on a single BGR frame.

    The frame is converted to RGB for the model, flagged read-only while the
    model processes it, and converted back to BGR before being returned.

    Parameters
    ----------
    image : numpy.ndarray
        BGR frame, e.g. a frame read from cv2.VideoCapture
    model :
        Object exposing ``process(rgb_frame)``; the notebook passes a
        ``mp_holistic.Holistic`` instance

    Returns
    -------
    image : numpy.ndarray
        The frame, converted back to BGR
    results : mediapipe.python.solution_base.SolutionOutputs
        Whatever ``model.process`` returned for the frame
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Keep the buffer read-only for the duration of the model call.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True
    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [None]:
def draw_landmarks(image, results):
    """Overlay the detected hand landmarks and their connections on ``image``.

    Parameters
    ----------
    image :
        Image that the landmarks are drawn onto
    results : mediapipe.python.solution_base.SolutionOutputs
        Results from the model; only the left and right hand landmarks are read
    """
    # Pose connections are deliberately not drawn; only the two hands are visualized.
    detected_hands = (results.left_hand_landmarks, results.right_hand_landmarks)
    for hand_landmarks in detected_hands:
        mp_drawing.draw_landmarks(image, hand_landmarks, mp_holistic.HAND_CONNECTIONS)


In [None]:
# Live preview: show the webcam feed with hand landmarks until 's' is pressed.
cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed; stop when the camera does not deliver a frame, because
            # passing a missing frame on to cv2.cvtColor would raise.
            ret, grabbed = cap.read()
            if not ret:
                break
            # Only overwrite `frame` on success so the following cells still
            # see the last good frame.
            frame = grabbed

            # Make detections
            image, results = mediapipe_detection(frame, holistic)

            # Draw landmarks
            draw_landmarks(image, results)

            # Show
            cv2.imshow('OpenCV Feed', image)

            # Break
            if cv2.waitKey(10) & 0xFF == ord('s'):
                break
finally:
    # Always free the camera and close the window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()


In [None]:
# Draw the last detection results onto the last captured frame.
draw_landmarks(frame, results)

In [None]:
# The frame is converted from BGR to RGB before it is handed to matplotlib.
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

# Extract Keypoints Values

In [None]:
# Each hand has 21 landmarks (key points).
# NOTE: this fails when no right hand was detected in the last frame, because
# right_hand_landmarks is then empty (extract_keypoints guards against that case).
len(results.right_hand_landmarks.landmark)

In [None]:
# Pose has 33 landmarks.
# NOTE: this fails when no pose was detected in the last frame, because
# pose_landmarks is then empty (extract_keypoints guards against that case).
len(results.pose_landmarks.landmark)

In [None]:
# Face has 468 landmarks, but they are not used in this notebook.
# NOTE(review): presumably this fails when no face was detected in the last frame — confirm.
len(results.face_landmarks.landmark)

In [None]:
# Every landmark has three values (x, y and z); pose landmarks additionally have a visibility value.

In [None]:
def _flatten_landmarks(landmark_list, attrs, count):
    """Return the given attributes of every landmark as one flat array.

    When ``landmark_list`` is empty/None (body part not detected) an all-zero
    array of ``count * len(attrs)`` values is returned instead.
    """
    if not landmark_list:
        return np.zeros(count * len(attrs))
    return np.array(
        [[getattr(point, attr) for attr in attrs] for point in landmark_list.landmark]
    ).flatten()


def extract_keypoints(results, include_pose=False):
    """Concatenate the landmark values from ``results`` into one flat array.

    If a body part is not detected by the camera, ``results`` holds no
    landmarks for it and accessing them would raise an error. Such parts are
    therefore represented by zeros so the output always has the same length.

    Parameters
    ----------
    results : mediapipe.python.solution_base.SolutionOutputs
        Results of the mediapipe model
    include_pose : bool, optional
        When True, the pose landmarks (x, y, z, visibility for 33 landmarks)
        are prepended to the hand landmarks. Defaults to False: only the hands
        are used, for better accuracy, and the pose landmarks are not read.

    Returns
    -------
    np.array
        Left-hand then right-hand values (x, y, z for 21 landmarks each,
        126 values); with ``include_pose=True`` the 132 pose values come
        first (258 values in total)
    """
    hand_attrs = ("x", "y", "z")
    parts = [
        _flatten_landmarks(results.left_hand_landmarks, hand_attrs, 21),
        _flatten_landmarks(results.right_hand_landmarks, hand_attrs, 21),
    ]
    if include_pose:
        pose = _flatten_landmarks(results.pose_landmarks, ("x", "y", "z", "visibility"), 33)
        parts.insert(0, pose)
    return np.concatenate(parts)

In [None]:
result_test = extract_keypoints(results)

In [None]:
len(result_test)

# Setup Folders for Collection

In [None]:
"""Setting folders for collection for every word, action.
For each word, 30 videos will be collected.
Each video will be 75 frames long.
"""

In [None]:
# Root folder for the exported keypoint arrays (.npy files)
DATA_PATH = os.path.join("MP_Data")

# Words (actions) that are detected, in label order
actions = np.array([
    "ja", "ti", "moj", "tvoj", "mi", "vi", "oni", "svi", "niko",
    "uzmi", "daj", "ostavi", "nemoj", "mogu", "nemogu", "imam", "nemam",
    "nista", "kako", "zasto", "treba", "netreba", "zdravo", "dobardan",
    "staradite", "kakoste", "staimanovo", "hvala", "izvinite", "dovidjenja",
    "zaomije", "hocu", "necu",
])

# Number of videos recorded per action
no_sequences = 30

# Number of frames per video (75 frames is 2.5 seconds at 30 fps)
sequence_length = 75

In [None]:
# Create one folder per action and per video: DATA_PATH/<action>/<video number>.
# exist_ok=True keeps the cell re-runnable when the folders already exist, while
# real failures (e.g. missing permissions) are still raised instead of being hidden.
for action in actions:
    for sequence in range(no_sequences):
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

# Collect Keypoint Values for Training and Testing

In [None]:
"""Collecting keypoint values for every action in every video.
For every video, there will npy files containing values for keypoints.
"""

In [None]:
# Record `no_sequences` videos of `sequence_length` frames for every action and
# save the keypoints of each frame to DATA_PATH/<action>/<video>/<frame>.npy.
cap = cv2.VideoCapture(0)
# Set when the user presses 's' or the camera stops delivering frames; it ends
# the whole collection, not just the video currently being recorded.
stop_requested = False
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        # Loop through actions
        for action in actions:
            # Loop through sequences (videos)
            for sequence in range(no_sequences):
                status_text = 'Collecting frames for {} Video Number {}'.format(action, sequence)

                # Loop through every frame, sequence length
                for frame_num in range(sequence_length):

                    # Read feed; a failed read yields no frame, which would
                    # make the detection step raise.
                    ret, frame = cap.read()
                    if not ret:
                        stop_requested = True
                        break

                    # Make detections
                    image, results = mediapipe_detection(frame, holistic)

                    # Draw landmarks
                    draw_landmarks(image, results)

                    # Announce the start of every video on its first frame
                    if frame_num == 0:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200), cv2.FONT_HERSHEY_SIMPLEX, 1,
                                   (0,255,0), 4, cv2.LINE_AA)
                    cv2.putText(image, status_text, (15,12),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 1, cv2.LINE_AA)

                    # Show
                    cv2.imshow('OpenCV Feed', image)

                    # Wait logic for every video: pause so the signer can get ready
                    if frame_num == 0:
                        cv2.waitKey(2000)

                    # Export keypoints
                    keypoints = extract_keypoints(results)
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                    np.save(npy_path, keypoints)

                    # Break gracefully
                    if cv2.waitKey(10) & 0xFF == ord('s'):
                        stop_requested = True
                        break

                if stop_requested:
                    break
            if stop_requested:
                break
finally:
    # Always free the camera and close the window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Release the camera and close all OpenCV windows.
# Presumably a manual fallback for when the collection cell above was interrupted.
cap.release()
cv2.destroyAllWindows()

# Preprocess Data and Create Labels and Features

In [None]:
"""Preprocessing data from the previous step.
Creating feature data and labels.
"""

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Map every action name to its integer class index (its position in `actions`).
label_map = dict(zip(actions, range(len(actions))))

In [None]:
sequences = []  # Feature data (X): one list of per-frame keypoint arrays per video
labels = []     # Target data (y): integer class index of every video

for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    for sequence in range(no_sequences):
        # One window per video: sequence_length keypoint arrays, loaded in frame order.
        window = [
            np.load(os.path.join(action_dir, str(sequence), "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)  # len(actions) * no_sequences videos in total
        labels.append(label_map[action])

In [None]:
X = np.array(sequences)

In [None]:
X.shape

In [None]:
labels_array = np.array(labels)

In [None]:
# Turn the labels into a column vector with one row per video; -1 lets numpy
# derive the row count instead of hard-coding 990 (33 actions * 30 videos).
labels_array = np.reshape(labels_array, (-1, 1))

In [None]:
labels_array.shape

In [None]:
# Built from the `labels` list (not from `labels_array`) and cast to int.
y = to_categorical(labels).astype(int) # Converts a class vector (integers) to binary class matrix.

In [None]:
y.shape

In [None]:
y[56].shape

In [None]:
# Saving X and y for following use.

In [None]:
np.save("X", X)

In [None]:
np.save("y", y)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

# Build and Train Neural Network

In [None]:
# Model is built and trained in Google Colab.

In [None]:
# RandomForestClassifier from sklearn.ensemble is used.

In [23]:
# Model is trained with data that was saved in previous step and saved with joblib library.

# Load Model and Weights

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
import sklearn

In [None]:
import joblib

In [None]:
joblib.__version__

In [None]:
# NOTE(review): joblib.load unpickles the file and can execute arbitrary code
# while doing so — only load model files that come from a trusted source.
model = joblib.load("RFC_2.joblib")

# Make Predictions

In [None]:
# NOTE(review): the features were saved earlier as "X.npy"; presumably "X1.npy"
# is a renamed copy of that file — confirm it holds the same data.
X = np.load("X1.npy")

In [None]:
X.shape

In [None]:
y = np.load("y1.npy")

In [None]:
# Flatten every video into one row, keeping one row per video; the sizes are
# derived from X instead of hard-coding (990, 9450), where 9450 = 75 * 126.
X = X.reshape(X.shape[0], -1)

In [None]:
X.shape

In [None]:
# NOTE(review): this draws a new random split (no random_state). The loaded model
# was trained elsewhere, so X_test here may contain samples the model was trained
# on — confirm before treating the scores below as held-out accuracy.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

In [None]:
val_predictions = model.predict(X_test)

In [None]:
actions[np.argmax(val_predictions[4])]

In [None]:
actions[np.argmax(y_test[4])]


# Evaluation using Confusion Matrix and Accuracy

In [None]:
"""Compute a confusion matrix for each class or sample.
Compute class-wise multilabel confusion matrix to evaluate
the accuracy of a classification, and output confusion matrices 
for each class or sample.

Accuracy classification score.
In multilabel classification, this function computes subset accuracy.
"""

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [None]:
yhat = model.predict(X_test)

In [None]:
# Convert the per-class rows back to class indices.
# NOTE(review): np.argmax returns 0 for a row without any positive entry, so if the
# model can output an all-zero row it is counted as class 0 — verify.
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
multilabel_confusion_matrix(ytrue, yhat)

In [None]:
accuracy_score(ytrue, yhat)

# Test in Real Time

In [None]:
# Detection variables
sequence = []
sentence = [] # Sentence that will be shown, that contains predicted words(last five words).
threshold = 0.7

cap = cv2.VideoCapture(0)
# Set mediapipe mode
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detection
        image, results = mediapipe_detection(frame, holistic)

        # Draw landmarks
        draw_landmarks(image, results)

        # Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-75:]   # Last 75 frames

        
        # Predict
        if len(sequence) == 75:
            arr = np.expand_dims(sequence, axis=0) # Shape (1, 75, 258)  (1, 75, 126)
            arr = np.reshape(arr, (1, arr.size))   # Shape (1, 19350) (1, 9450), input shape for the model must be 2D
            res = model.predict(arr)[0]
            #print(actions[np.argmax(res)])
            

        # Vizualize
            if res[np.argmax(res)] > threshold:
                if len(sentence) > 0:
                    if actions[np.argmax(res)] != sentence[-1]:
                        sentence.append(actions[np.argmax(res)])
                else:
                    sentence.append(actions[np.argmax(res)])
            if len(sentence) > 5:
                sentence = sentence[-5:]


        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)



        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('s'):
            break

    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Release the camera and close all OpenCV windows.
# Presumably a manual fallback for when the real-time cell above was interrupted.
cap.release()
cv2.destroyAllWindows()