# 1. Import Libraries:

In [2]:
from matplotlib import pyplot as plt
import mediapipe as mp
import numpy as np
import time
import cv2
import os

In [3]:
# Colour constants. Of these, only corner_color is referenced by the visible
# cells (it is passed to draw_landmarks).
# NOTE(review): the colour names match the values only when the tuples are read
# in (B, G, R) order, as OpenCV does - confirm before using them elsewhere.
text_background = (198, 63, 88)  # PURPLE
corner_color = (53, 53, 249)     # RED
text_color = (239, 239, 239)     # WHITE
border_color = (61, 147, 8)      # GREEN

# 5. Setup Folders for Collection:

In [4]:
# Root folder that holds the recorded keypoint files.
DATA_PATH = os.path.join("Data")

# Gesture labels. The position of each label in this array is the integer
# class id used when labels are built, so do not reorder existing entries.
actions = np.array(
    [
        "You",
        "Yes",
        "WhatAreYouDoing",
        "TryBeing",
        "ToMeet",
        "ThankYou",
        "TakeCare",
        "SameAsYou",
        "Question",
        "Point",
        "Nothing",
        "IHear",
        "HowAreYou",
        "Hello",
        "Bye",
        "Good",
        "Busy",
        "_BLANK",
    ]
)

# Number of recorded sequences per action, and frames per sequence.
no_sequences = 20
sequence_length = 30

In [5]:
# Create DATA_PATH/<action>/<sequence>/ for every action and sequence index.
# exist_ok=True makes re-running the cell harmless when the folders already
# exist, while genuine failures (e.g. permission errors) are still raised
# instead of being silently swallowed.
for action in actions:
    for sequence in range(no_sequences):
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

# 6. Collect MP Keypoints:

In [6]:
import mediapipe as mp
import numpy as np
import cv2

# Aliases for the MediaPipe solution modules. mp_drawing and mp_hands are used
# by draw_landmarks; mp_holistic is not referenced by any cell visible here.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands


def mediapipe_detections(frame, model):
    """Run a MediaPipe model on one frame.

    The frame is converted from BGR to RGB, handed to ``model.process`` while
    flagged read-only, then made writeable again and converted back to BGR.

    Args:
        frame: Image array accepted by ``cv2.cvtColor`` (BGR channel order).
        model: Object exposing a ``process(image)`` method.

    Returns:
        A ``(frame, results)`` tuple: the BGR image after the round trip and
        whatever ``model.process`` returned.
    """
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Read-only while the model looks at it (presumably so the model can
    # avoid copying the buffer), writeable again afterwards.
    rgb.flags.writeable = False
    detections = model.process(rgb)
    rgb.flags.writeable = True

    return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR), detections


def draw_landmarks(frame, results, color):
    """Draw every detected hand skeleton onto ``frame`` in place.

    Args:
        frame: Image to draw on.
        results: Detection result exposing ``multi_hand_landmarks``.
        color: Accepted for call compatibility; the body does not use it, so
            the drawing utility's own default colours apply.

    Returns:
        None. Nothing is drawn when no hands were detected.
    """
    detected_hands = results.multi_hand_landmarks
    if not detected_hands:
        return

    for hand in detected_hands:
        mp_drawing.draw_landmarks(frame, hand, mp_hands.HAND_CONNECTIONS)


def extract_keypoints(results):
    """Flatten the first detected hand into a 1-D keypoint vector.

    Args:
        results: Detection result exposing ``multi_hand_landmarks``; each
            landmark must provide ``x``, ``y`` and ``z``.

    Returns:
        A 1-D array laid out as ``x0, y0, z0, x1, y1, z1, ...`` for the first
        hand, or 63 zeros when no hand was detected.
    """
    detected_hands = results.multi_hand_landmarks
    if not detected_hands:
        return np.zeros(63)

    coordinates = []
    for point in detected_hands[0].landmark:
        coordinates.extend((point.x, point.y, point.z))

    return np.array(coordinates)

In [8]:
# Interactive keypoint collection for a single action.
#
# Keys (read once per displayed frame):
#   s - save the current keypoint vector as the next frame of the sequence
#   n - save a snapshot of the annotated image to the working directory
#   q - quit
# Collection also stops on its own once no_sequences sequences of
# sequence_length frames have been saved, or when the stream stops
# delivering frames.
mp_drawing_styles = mp.solutions.drawing_styles
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(max_num_hands=1)

stream_url = "http://192.168.169.196:81/stream"
cap = cv2.VideoCapture(stream_url)

no_frames_counter = 0
no_sequences_counter = 0
index = 12                        # Curr Action  : HowAreYou
current_action = actions[index]   # Done Actions : You, Yes, WhatAreYouDoing, TryBeing, ToMeet, ThankYou, TakeCare, SameAsYou, Question, Point, Nothing, IHear,
                                  # Correction   : WhatAreYouDoing, ToMeet, ThankYou, SameAsYou, IHear

try:
    while True:
        grabbed, image = cap.read()
        if not grabbed or image is None:
            # A dropped or unreachable stream yields no image; stop cleanly
            # instead of crashing inside cv2.cvtColor.
            print(f"Could not read a frame from {stream_url}; stopping collection.")
            break

        image, results = mediapipe_detections(image, hands)
        draw_landmarks(image, results, corner_color)
        right_hand = extract_keypoints(results)

        key = cv2.waitKey(1) & 0xFF

        if key == ord("s"):
            npy_path = os.path.join(DATA_PATH, current_action, str(no_sequences_counter), f"{no_frames_counter}.npy")
            np.save(npy_path, right_hand)
            no_frames_counter += 1
            if no_frames_counter == sequence_length:
                # Sequence complete: move on to the next one.
                no_frames_counter = 0
                no_sequences_counter += 1
                if no_sequences_counter == no_sequences:
                    break

        cv2.putText(image, f"Collecting Frames for '{current_action}'", (15, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 0, 0), 2, cv2.LINE_AA)
        cv2.putText(image, f"Video Num: {no_sequences_counter}", (15, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 0), 2, cv2.LINE_AA)
        cv2.putText(image, f"Frame Num: {no_frames_counter}", (15, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2, cv2.LINE_AA)
        cv2.imshow("Image", image)

        if key == ord("n"):
            cv2.imwrite(f"{current_action}-image-{no_frames_counter}-{no_sequences_counter}.jpg", image)

        if key == ord("q"):
            break
finally:
    # Always free the stream and close the preview window, even when the
    # loop exits through an exception or a notebook interrupt.
    cap.release()
    cv2.destroyAllWindows()


In [None]:
# Stand-alone cleanup cell: releases the current capture object and closes all
# OpenCV windows (presumably kept for runs where a capture cell was interrupted).
cap.release()
cv2.destroyAllWindows()

# 7. Preprocess Data and Create Labels and Features:

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Map each action name to its position in `actions`; that position is the
# integer class id used for the labels.
label_map = dict(zip(actions, range(len(actions))))

In [None]:
# Load every saved frame from DATA_PATH/<action>/<sequence>/<frame>.npy.
# `sequences` collects one list of per-frame arrays per recording and
# `labels` the matching integer class id, in the same order.
sequences, labels = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    for sequence in range(no_sequences):
        sequence_dir = os.path.join(action_dir, str(sequence))
        window = [
            np.load(os.path.join(sequence_dir, f"{frame}.npy"))
            for frame in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(label_map[action])

In [None]:
# Stack the loaded sequences into one array and print its shape.
# Expected to be (len(actions) * no_sequences, sequence_length, 63) if every
# saved frame holds a 63-value vector - TODO confirm against the printed shape.
X = np.array(sequences)
print(X.shape)

In [None]:
# One-hot encode the integer class ids and cast the result to int.
y = to_categorical(labels).astype(int)

In [None]:
# Display the one-hot label matrix as the cell output.
y

In [None]:
# Hold out 5% of the samples for testing. No random_state is passed, so the
# split is not reproducible from one run to the next.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

# 8. Build and Train LSTM Neural Network:

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [None]:
# TensorBoard event files are written to ./logs.
log_dir = os.path.join("logs")
tb_callback = TensorBoard(log_dir=log_dir) # for monitoring the NN training

In [None]:
# Stacked-LSTM classifier over sequences of hand keypoints.
#
# The input window and the output width are taken from the dataset settings
# instead of being hard-coded: the recordings hold `sequence_length` frames of
# 63 values each (the vector length produced by extract_keypoints) and there
# is one output class per entry in `actions`. The previous literals (5 frames,
# 2 classes) did not match the loaded data, so training on it could not work.
# NOTE(review): weights saved for the old 5-frame / 2-class layout will no
# longer load into this model.
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation="relu", input_shape=(sequence_length, 63)))
model.add(LSTM(128, return_sequences=True, activation="relu"))
# Last recurrent layer returns only its final state for the dense head.
model.add(LSTM(64, return_sequences=False, activation="relu"))
model.add(Dense(64, activation="relu"))
model.add(Dense(32, activation="relu"))
# Softmax over all actions, matching the one-hot labels.
model.add(Dense(actions.shape[0], activation="softmax"))

In [None]:
# Scratch check: pick the label for a made-up two-value probability vector via
# argmax, then look up how many actions are defined.
res = [.7, .3]
actions[np.argmax(res)]
actions.shape[0]

In [None]:
# Inspect the shapes of the training features and labels.
X_train.shape
y_train.shape

In [None]:
# Configure training: Adam optimizer, categorical cross-entropy loss for the
# one-hot labels, categorical accuracy as the reported metric.
model.compile(optimizer="Adam", loss="categorical_crossentropy", metrics=["categorical_accuracy"])

In [None]:
# Train for 1000 epochs on the training split, logging through tb_callback.
model.fit(X_train, y_train, epochs=1000, callbacks=[tb_callback])

In [None]:
# Load weights from action.keras into the model.
# NOTE(review): this cell comes after model.fit and before the cell that calls
# model.save("action.keras"); run in order, it replaces freshly trained weights
# with whatever the file already contains - confirm this is intended.
model.load_weights("action.keras")

# 9. Make Predictions:

In [None]:
# Print the layer-by-layer model summary, 100 characters wide.
model.summary(line_length=100)

In [None]:
# Predict on the held-out test split.
# Note: this rebinds the name `results`, which the capture cells also use.
results = model.predict(X_test)

In [None]:
# Predicted action name for the first test sample.
actions[np.argmax(results[0])]

In [None]:
# True action name for the first test sample, for comparison with the prediction.
actions[np.argmax(y_test[0])]

# 10. Save Weights:

In [None]:
# Save the model to action.keras in the working directory.
model.save("action.keras")

# 11. Evaluation using Confusion Matrix and Accuracy:

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [None]:
# Model outputs for the test split; converted to class ids in the next cell.
yhat = model.predict(X_test)

In [None]:
# Collapse one-hot labels and model outputs to plain lists of class ids.
# yhat is rebound here, so this cell must not be re-run without re-running the
# prediction cell first.
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
# Per-class confusion matrices for the test split.
multilabel_confusion_matrix(ytrue, yhat)

In [None]:
# Fraction of test samples whose predicted class id equals the true one.
accuracy_score(ytrue, yhat)

In [None]:
# Bar colours for prob_viz; reused cyclically when there are more classes than colours.
colors = [(245, 117, 16), (117, 245, 16), (16, 117, 245)]


def prob_viz(res, actions, input_frame, colors):
    """Overlay one horizontal probability bar per class on a copy of the frame.

    Args:
        res: Iterable of class probabilities, one per action.
        actions: Sequence of label strings, indexed like ``res``.
        input_frame: Image to annotate; it is copied, not modified.
        colors: Non-empty sequence of colour tuples. Colours are cycled, so
            it may be shorter than ``res``.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        # Wrap around the palette so extra classes do not raise IndexError.
        bar_color = colors[num % len(colors)]
        # Each bar occupies a 40 px row; its length is the probability in percent.
        cv2.rectangle(output_frame, (0, 60 + num * 40), (int(prob * 100), 90 + num * 40), bar_color, -1)
        cv2.putText(output_frame, actions[num], (0, 85 + num * 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

    return output_frame

# 12. Test Real Time:

In [None]:
# Real-time recognition from the default webcam. Press "q" to quit.
sequence = []       # sliding window of the most recent keypoint vectors
sentence = []       # recognised actions, without immediate repeats
predictions = []    # class id predicted for every full window
threshold = 0.7     # minimum probability for a prediction to be accepted
res = None          # latest probability vector; None until the first prediction

# Number of frames the model expects per window, read from the model itself so
# this cell stays in step with however the network was defined. Falls back to
# sequence_length if the model accepts variable-length input.
window_size = model.input_shape[1] or sequence_length

cap = cv2.VideoCapture(0)

try:
    # draw_landmarks and extract_keypoints read `multi_hand_landmarks`, so the
    # detector has to be a Hands model. The context manager closes it on exit.
    with mp_hands.Hands(max_num_hands=1) as detector:
        while True:
            grabbed, frame = cap.read()
            if not grabbed or frame is None:
                # No camera frame available; stop instead of crashing in cvtColor.
                print("Could not read a frame from the camera; stopping.")
                break

            frame, results = mediapipe_detections(frame, detector)
            draw_landmarks(frame, results, corner_color)

            right_hand = extract_keypoints(results)
            sequence.append(right_hand)
            sequence = sequence[-window_size:]

            if len(sequence) == window_size:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = np.argmax(res)
                predictions.append(best)

                # NOTE(review): np.unique(...)[0] is the smallest class id among
                # the last 10 predictions, not a majority vote - confirm this
                # stability check is what is intended.
                if np.unique(predictions[-10:])[0] == best:
                    if res[best] > threshold:
                        if len(sentence) > 0:
                            # Only record a change of action, then echo the sentence.
                            if actions[best] != sentence[-1]:
                                sentence.append(actions[best])
                                print(" ".join(sentence))
                        else:
                            sentence.append(actions[best])

            # Draw the probability bars only once a prediction exists.
            if res is not None:
                frame = prob_viz(res, actions, frame, colors)
            cv2.imshow("Frame", frame)

            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break
finally:
    # Always free the camera and close the preview window.
    cap.release()
    cv2.destroyAllWindows()
