In [1]:
# importing dependencies and stuff
import queue
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import tensorflow as tf
import pyvirtualcam as vcam
import pyttsx3
import pyaudio
import wave
from threading import Thread
import sounddevice as sd
import soundfile as sf
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from PIL import ImageFont, ImageDraw, Image
from multiprocessing import Process



In [2]:
# Module aliases for the MediaPipe solutions used by the rest of the notebook.

mp_holistic = mp.solutions.holistic  # source of the Holistic model and the *_CONNECTIONS constants used below
mp_drawing = mp.solutions.drawing_utils  # source of draw_landmarks() and DrawingSpec used below

In [3]:
# Detecting mediapipe values per frame

def mediapipe_detection(image, model):
    """Run ``model`` on a single frame and hand back the frame plus the detections.

    Args:
        image: Frame that is converted with ``cv2.COLOR_BGR2RGB`` before processing,
            i.e. it is treated as a BGR image.
        model: Object exposing ``process(image)``; the notebook passes a
            ``mp_holistic.Holistic`` instance.

    Returns:
        A ``(image, results)`` tuple: the frame converted back with
        ``cv2.COLOR_RGB2BGR`` and whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The frame is flagged read-only for the duration of the model call and
    # made writeable again afterwards.
    rgb_frame.flags.writeable = False
    detections = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), detections

In [4]:
# Drawing mediapipe values per frame

def draw_landmarks(image, results):
    """Draw the pose and both hand landmark sets from ``results`` onto ``image``.

    Args:
        image: Frame handed to ``mp_drawing.draw_landmarks``; callers display this
            same object afterwards, so the drawing is expected to happen in place.
        results: Detection result exposing ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.

    Returns:
        None.
    """
    # Colour channels are 8-bit, so every component must stay within 0-255.
    landmark_spec = mp_drawing.DrawingSpec(color=(80, 110, 10), thickness=1, circle_radius=1)
    connection_spec = mp_drawing.DrawingSpec(color=(80, 255, 121), thickness=1, circle_radius=1)

    # The face mesh (results.face_landmarks with mp_holistic.FACEMESH_CONTOURS) is
    # currently not drawn; add it to this tuple to overlay it as well.
    overlays = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )
    for landmarks, connections in overlays:
        mp_drawing.draw_landmarks(image, landmarks, connections, landmark_spec, connection_spec)
    

In [5]:
# # Program to capture video, detect and render landmarks and output video //MAINLY USED FOR DEBUGGING STUFF
# cap = cv2.VideoCapture(0)
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
#     while cap.isOpened():
#         ret, frame = cap.read()

#         image, results = mediapipe_detection(frame, holistic)
#         print(results)

#         draw_landmarks(image, results)

#         image = cv2.flip(image, 1)
#         cv2.imshow('Feed', image)
        
#         if cv2.waitKey(10) & 0xFF == ord('q'):
#             break
#     cap.release()
#     cv2.destroyAllWindows()

In [6]:
# Extract every landmark value and concatenating it into one array
def extract_keypoints(results):
    """Flatten all landmark groups of ``results`` into one 1-D array.

    The output is the concatenation, in this order, of:
      * pose landmarks       -> x, y, z, visibility per landmark
      * face landmarks       -> x, y, z per landmark
      * left hand landmarks  -> x, y, z per landmark
      * right hand landmarks -> x, y, z per landmark

    A group that is missing (falsy) is replaced by zeros of length
    33*4, 468*3, 21*3 and 21*3 respectively.

    Args:
        results: Object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``; each present
            group must expose a ``landmark`` iterable.

    Returns:
        1-D ``numpy.ndarray`` holding the concatenated values.
    """
    def _flatten(group, fields, missing_size):
        # Zero-fill when the detector produced nothing for this group.
        if not group:
            return np.zeros(missing_size)
        rows = [[getattr(point, field) for field in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    parts = [
        _flatten(results.pose_landmarks, xyz + ('visibility',), 33 * 4),
        _flatten(results.face_landmarks, xyz, 468 * 3),
        _flatten(results.left_hand_landmarks, xyz, 21 * 3),
        _flatten(results.right_hand_landmarks, xyz, 21 * 3),
    ]
    return np.concatenate(parts)

In [7]:
# Dataset layout and recording parameters shared by the collection and training cells.

# Root folder holding the recorded keypoint arrays (one sub-folder per action).
DATA_PATH = os.path.join('MP_Data')

# Labels the model is trained on.
actions = np.array(('my', 'name', 'hello', 'yes', 'IDLE'))

# Number of videos recorded per action, and number of frames in each video.
no_sequences, sequence_length = 120, 30

In [8]:
# Creating the directory structure: DATA_PATH/<action>/<sequence index>/

for action in actions:
    for sequence in range(no_sequences):
        # exist_ok=True makes re-running this cell harmless while still surfacing
        # real failures (permissions, invalid path) instead of silently ignoring them.
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

In [9]:
# Creating an audio file for each action (Audio/<action>.wav)

# The output folder has to exist before the speech engine can write into it.
os.makedirs('Audio', exist_ok=True)

engine = pyttsx3.init()
engine.setProperty('rate', 125)

# Pick the voice BEFORE rendering anything so every clip is spoken by the same
# voice; keep the engine default when the system offers no second voice.
voices = engine.getProperty('voices')
if len(voices) > 1:
    engine.setProperty('voice', voices[1].id)

for action in actions:
    engine.save_to_file(action, f"Audio/{action}.wav")
    # Process the queued save command before moving on to the next action.
    engine.runAndWait()

In [18]:
# Data collection: record `no_sequences` videos of `sequence_length` frames for every
# action that has not been completely recorded yet.
#   q while recording            -> stop collecting immediately
#   q between two actions        -> stop collecting
#   any other key between actions (e.g. n) -> continue with the next action

cap = cv2.VideoCapture(0)
stop_requested = False
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        for action in actions:
            # An action counts as collected only when its very last frame exists, so a
            # recording that was aborted half-way is redone instead of being skipped.
            last_frame_path = os.path.join(DATA_PATH, action, str(no_sequences - 1),
                                           "{}.npy".format(sequence_length - 1))
            if os.path.isfile(last_frame_path):
                continue

            for sequence in range(no_sequences):
                for frame_num in range(sequence_length):

                    ret, frame = cap.read()
                    if not ret:
                        # No frame was delivered (camera missing, busy or unplugged).
                        print('Camera returned no frame; stopping collection.')
                        stop_requested = True
                        break

                    image, results = mediapipe_detection(frame, holistic)

                    draw_landmarks(image, results)

                    if frame_num == 0:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    cv2.imshow('OpenCV Feed', image)
                    if frame_num == 0:
                        # Pause before each video so the signer can get into position.
                        cv2.waitKey(2000)

                    keypoints = extract_keypoints(results)
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                    np.save(npy_path, keypoints)

                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        stop_requested = True
                        break
                if stop_requested:
                    break
            if stop_requested:
                break

            # Read the keyboard exactly once between actions; reading it twice would
            # swallow the first key press and block a second time.
            if cv2.waitKey(0) & 0xFF == ord('q'):
                break
finally:
    # Always free the webcam and close the preview, even after an exception.
    cap.release()
    cv2.destroyAllWindows()

In [7]:
# Cell to create training data for the model

label_map = {label: num for num, label in enumerate(actions)}
sequences, labels = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    # Only numerically named sub-folders are recordings; skip stray entries (e.g.
    # .DS_Store) and sort so the sample order does not depend on os.listdir().
    sequence_ids = sorted(int(name) for name in os.listdir(action_dir) if name.isdigit())
    for sequence in sequence_ids:
        window = [
            np.load(os.path.join(action_dir, str(sequence), "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(label_map[action])

x = np.array(sequences)
# Fix the one-hot width to the number of actions so it always matches the model's
# output layer, even if the last action has no recordings.
y = to_categorical(labels, num_classes=len(actions)).astype(int)
# Seeded split so the held-out samples are the same on every run.
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.05, random_state=42)
len(X_train)

570

In [10]:
# Creating the model's structure: three stacked LSTM layers followed by a dense classifier

log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

# Values per frame produced by extract_keypoints():
# 33 pose landmarks x 4 + 468 face landmarks x 3 + 2 hands x 21 landmarks x 3 = 1662
num_features = 33 * 4 + 468 * 3 + 2 * 21 * 3

model = Sequential([
    # An explicit Input object replaces `input_shape=` on the first layer, which
    # newer Keras versions warn about.
    tf.keras.Input(shape=(sequence_length, num_features)),
    LSTM(64, return_sequences=True, activation='relu'),
    LSTM(128, return_sequences=True, activation='relu'),
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    # One softmax output per action.
    Dense(actions.shape[0], activation='softmax'),
])

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

  super().__init__(**kwargs)


In [11]:
# Optional: load previously saved weights into the model built above.
# 'joyV2.keras' is the same file name the model.save(...) cell of this notebook writes.

model.load_weights('joyV2.keras')

  saveable.load_own_variables(weights_store.get(inner_path))


In [33]:
# Show the summary of the current model (requires `model` to exist in the kernel)

model.summary()

In [None]:
# IMPORTANT: THIS ACTUALLY TRAINS THE MODEL
# Fits on X_train / y_train for 400 epochs with a batch size of 10; tb_callback is
# the TensorBoard callback created in the model-building cell (log_dir 'Logs').

model.fit(X_train, y_train, epochs=400, callbacks=[tb_callback], batch_size=10)

Epoch 1/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 20ms/step - categorical_accuracy: 0.2011 - loss: 1.9957
Epoch 2/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 20ms/step - categorical_accuracy: 0.2023 - loss: 1.5701
Epoch 3/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 19ms/step - categorical_accuracy: 0.4049 - loss: 1.4877
Epoch 4/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 20ms/step - categorical_accuracy: 0.3297 - loss: 1.5059
Epoch 5/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 20ms/step - categorical_accuracy: 0.2041 - loss: 1.5826
Epoch 6/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 20ms/step - categorical_accuracy: 0.3727 - loss: 1.3031
Epoch 7/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 19ms/step - categorical_accuracy: 0.2034 - loss: 1.5733
Epoch 8/400
[1m57/57[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 

In [27]:
# Don't run this casually: it removes the name `model` from the kernel namespace.
# Every later cell that uses `model` raises NameError until the model-building
# cell has been run again.
del model

In [None]:
# Sanity check: compare the predicted and the true action for one held-out sample
res = model.predict(X_test)

sample_index = 1
# The prediction must come from `res` (model output for X_test); y_train belongs to
# a different sample set and says nothing about this prediction.
predicted_action = actions[np.argmax(res[sample_index])]
true_action = actions[np.argmax(y_test[sample_index])]

# Only the last expression of a cell is displayed, so show both values together.
predicted_action, true_action

In [37]:
# Optional: save the current model to 'joyV2.keras' (the file the weight-loading cell reads)

model.save("joyV2.keras")

In [12]:
# Checks accuracy of the model on the held-out split (needs X_test / y_test from the
# training-data cell)

test_probabilities = model.predict(X_test)
# Collapse one-hot labels and per-class probabilities to class indices.
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(test_probabilities, axis=1).tolist()

# Only a cell's last expression is displayed, so the confusion matrices are printed
# explicitly instead of being computed and silently dropped.
print(multilabel_confusion_matrix(ytrue, yhat))

accuracy_score(ytrue, yhat)

NameError: name 'X_test' is not defined

In [12]:
# Function to play audio of subtitles

def AudioPlay(text):
    """Play an audio file and return once ``sd.wait()`` returns.

    Args:
        text: Value handed to ``sf.read``; despite the name, callers in this
            notebook pass the path of a ``.wav`` file, not the words to speak.
    """
    # sf.read yields (samples, sample rate), which is exactly what sd.play takes.
    sd.play(*sf.read(text))
    sd.wait()


In [15]:
# The final product: live recognition with subtitles, spoken audio and a virtual
# camera output. Press q in the preview window to stop.

sequence = []      # rolling window of the most recent keypoint vectors
sentence = []      # recognised words currently shown as subtitles
sound = []         # FIFO queue of audio files waiting to be played
threshold = 0.90   # minimum softmax score for a prediction to be accepted

font = ImageFont.truetype("Minecraftia-Regular.ttf", 32)

sd.default.samplerate = 22050
# NOTE(review): device index 10 is specific to one machine's audio setup -- confirm
# against the local device list before running elsewhere.
sd.default.device = 10
i = 0  # number of frames processed so far
cap = cv2.VideoCapture(0)

# Create the preview window once instead of on every frame.
cv2.namedWindow('OpenCV Feed', cv2.WINDOW_NORMAL)

try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        # NOTE(review): the virtual camera is opened at 640x480 -- presumably the
        # webcam delivers frames of that size; verify on the target machine.
        with vcam.Camera(width=640, height=480, fps=30) as cam:
            while cap.isOpened():

                # Read feed; stop cleanly when the camera stops delivering frames.
                ret, frame = cap.read()
                if not ret:
                    break
                i += 1

                # Make detections
                image, results = mediapipe_detection(frame, holistic)

                # Draw landmarks, then mirror the preview
                draw_landmarks(image, results)
                image = cv2.flip(image, 1)

                # Keep only the newest `sequence_length` keypoint vectors.
                keypoints = extract_keypoints(results)
                sequence.append(keypoints)
                sequence = sequence[-sequence_length:]

                # Predict on every second frame once the window is full.
                if len(sequence) == sequence_length and i % 2 == 0:
                    res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                    best = np.argmax(res)
                    word = actions[best]

                    # Accept confident, non-idle predictions that differ from the
                    # word added last, so a held sign is not repeated.
                    if res[best] > threshold and word != 'IDLE':
                        if not sentence or word != sentence[-1]:
                            sentence.append(word)
                            sound.append(f'Audio/{word}.wav')

                    if len(sentence) > 5:
                        sentence = sentence[-5:]

                # Every 10th frame play the OLDEST queued clip; the clip that is
                # played is the one removed from the queue.
                if i % 10 == 0 and len(sound) > 0:
                    Thread(target=AudioPlay, args=(sound.pop(0),)).start()

                # Every 60th frame drop the oldest subtitle word.
                if i % 60 == 0 and len(sentence) > 0:
                    sentence.pop(0)

                # Render the subtitles with PIL so the custom font can be used.
                img_pil = Image.fromarray(image)
                draw = ImageDraw.Draw(img_pil)
                draw.text((0, 400), '\t\t'.join(sentence), font=font, fill=(255, 255, 255, 0))

                image = np.array(img_pil)

                # Show to screen
                cv2.imshow('OpenCV Feed', image)

                # Output into Vcam: flip back and swap the channel order of the
                # OpenCV (BGR) frame before sending it.
                cam.send(cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB))
                cam.sleep_until_next_frame()

                # Break
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
finally:
    # Always free the webcam and close the preview, even after an exception.
    cap.release()
    cv2.destroyAllWindows()

In [28]:
# Manual cleanup: run this if a capture cell stopped before reaching its own
# cap.release() / cv2.destroyAllWindows() calls.
cap.release()
cv2.destroyAllWindows()

In [23]:
# List the GPU devices TensorFlow reports for this kernel
tf.config.list_physical_devices('GPU')

[]

In [21]:
# Debug: call the model directly on the current `sequence` (a leading axis is added
# with expand_dims) and take the first row of the output.
# NOTE(review): assumes `sequence` still holds the keypoint window left behind by the
# live cell -- the training-data cell reuses the same name for a loop index.
model(np.expand_dims(sequence, axis=0))[0]  

<tf.Tensor: shape=(5,), dtype=float32, numpy=
array([3.5177823e-04, 1.8362563e-08, 9.9874401e-01, 8.9288887e-04,
       1.1181803e-05], dtype=float32)>

In [26]:
!pip install tensorflow==2.10.0


Defaulting to user installation because normal site-packages is not writeable


ERROR: Could not find a version that satisfies the requirement tensorflow==2.10.0 (from versions: 2.16.0rc0, 2.16.1, 2.16.2, 2.17.0rc0, 2.17.0rc1, 2.17.0)
ERROR: No matching distribution found for tensorflow==2.10.0

[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: C:\Users\ryanl\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.12_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip
