## Import

In [1]:
#!pip install tensorflow opencv-python mediapipe==0.10.11 scikit-learn matplotlib
#python 3.8.0

In [2]:
import numpy as np
import os
import cv2
from matplotlib import pyplot as plt
import time
import mediapipe as mp

## Using mediapipe (mp)

In [3]:
mp_holistic = mp.solutions.holistic  # holistic solution: detects face, hands and body pose together
mp_drawing = mp.solutions.drawing_utils  # helpers for drawing landmarks and their connections on an image

In [4]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on a single OpenCV frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)``, e.g. the Holistic model.

    Returns:
        Tuple ``(bgr_image, results)``: the frame converted back to BGR so
        OpenCV can display it, and whatever ``model.process`` returned.
    """
    # OpenCV delivers BGR frames, MediaPipe expects RGB.
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Keep the frame read-only while the model is processing it.
    rgb_frame.flags.writeable = False
    detection = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # Back to BGR for display with OpenCV.
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, detection


def draw_landmarks(image, results):
    """Draw all detected landmark groups on ``image`` with the default style.

    Args:
        image: Frame to draw on (modified in place by the drawing helper).
        results: Detection results exposing ``pose_landmarks``,
            ``face_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``; a falsy group is skipped.
    """
    # Each landmark group is paired with the connection set used to join its points.
    landmark_groups = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.face_landmarks, mp_holistic.FACEMESH_CONTOURS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )

    for landmarks, connections in landmark_groups:
        if landmarks:  # only draw what was actually detected in this frame
            mp_drawing.draw_landmarks(image, landmarks, connections)

def draw_landmarks_restyled(image, results):
    """Draw all detected landmark groups on ``image`` with a custom thin style.

    Landmarks are drawn with colour (128, 0, 255) and connections with colour
    (255, 128, 0), both with thickness 1 and circle radius 1.

    Args:
        image: Frame to draw on (modified in place by the drawing helper).
        results: Detection results exposing ``pose_landmarks``,
            ``face_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``; a falsy group is skipped.
    """
    landmark_groups = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.face_landmarks, mp_holistic.FACEMESH_CONTOURS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )

    for landmarks, connections in landmark_groups:
        if not landmarks:
            continue  # nothing detected for this group in the current frame

        point_style = mp_drawing.DrawingSpec(color=(128, 0, 255), thickness=1, circle_radius=1)  # landmarks
        line_style = mp_drawing.DrawingSpec(color=(255, 128, 0), thickness=1, circle_radius=1)  # connections
        mp_drawing.draw_landmarks(image, landmarks, connections, point_style, line_style)


def show_preprocessed_video(model):
    """Show the live webcam feed with the detected landmarks drawn on it.

    Frames are read from camera 0, processed with ``mediapipe_detection`` and
    annotated with ``draw_landmarks_restyled``. The preview stops when "q" is
    pressed, when the camera is no longer open, or when a frame cannot be read.

    Args:
        model: MediaPipe model passed on to ``mediapipe_detection``.
    """
    cap = cv2.VideoCapture(0)

    try:
        while cap.isOpened():
            ret, frame = cap.read()  # ret is False when no frame could be grabbed
            if not ret:
                # A failed read yields frame=None, which would crash the colour
                # conversion inside mediapipe_detection; stop cleanly instead.
                break

            image, results = mediapipe_detection(frame, model)  # detect hands, face, body
            draw_landmarks_restyled(image, results)

            cv2.imshow("MediaPipe processed video", image)

            if cv2.waitKey(1) & 0xFF == ord("q"):  # wait 1 ms; "q" quits
                break
    finally:
        # Always free the camera and close the window, even if processing raised.
        cap.release()
        cv2.destroyAllWindows()



# Create the holistic model (face, hands and body) with detection and tracking
# confidence thresholds of 0.5, then open the live preview. Press "q" in the
# preview window to close it.
holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) #create the model that recognize face, hands, body
show_preprocessed_video(holistic)


## Extracting landmarks

In [5]:
def extract_landmarks(results):
    """Flatten the landmarks of one frame into a single 1-D feature vector.

    The vector is the concatenation, in this order, of:
      * pose landmarks as (x, y, z, visibility),
      * face landmarks as (x, y, z),
      * left-hand landmarks as (x, y, z),
      * right-hand landmarks as (x, y, z).

    A group that was not detected (falsy) is replaced by zeros of length
    132, 1404, 63 and 63 respectively.

    Args:
        results: Detection results exposing ``pose_landmarks``,
            ``face_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``, each with a ``landmark`` sequence.

    Returns:
        1-D numpy array with all groups concatenated.
    """
    def _flatten(group, fields, missing_size):
        # Zero placeholder when the group is absent from this frame.
        if not group:
            return np.zeros(missing_size)
        rows = [[getattr(point, name) for name in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ("x", "y", "z")
    parts = [
        _flatten(results.pose_landmarks, xyz + ("visibility",), 132),
        _flatten(results.face_landmarks, xyz, 1404),
        _flatten(results.left_hand_landmarks, xyz, 63),
        _flatten(results.right_hand_landmarks, xyz, 63),
    ]
    return np.concatenate(parts)

## Set up folders for the data

In [6]:
# Root folder under which the collected landmark arrays are stored.
data_path = os.path.join("mp_data")

# Actions (sign classes) the model will learn to recognise.
actions = np.array([
    "hello",
    "thanks",
    "I love you",
])

# Number of videos recorded per action.
sequences = 30

# Number of frames in each video.
sequence_length = 30

In [18]:
# Create one folder per (action, video): <data_path>/<action>/<1..sequences>.
for action in actions:
    for sequence in range(1, sequences + 1):
        # exist_ok=True keeps the cell safe to re-run when the folders already
        # exist, while real failures (e.g. missing permissions) still surface
        # instead of being silently swallowed.
        os.makedirs(os.path.join(data_path, action, str(sequence)), exist_ok=True)

## Collecting Landmarks for Training and Testing

In [19]:
def show_preprocessed_video_collect(model, actions, sequences, sequence_length):
    """Record landmark data from the webcam for every action.

    For each action, ``sequences`` videos of ``sequence_length`` frames are
    captured from camera 0. Every frame is processed with
    ``mediapipe_detection``, its landmarks are extracted with
    ``extract_landmarks`` and saved to
    ``<data_path>/<action>/<video number>/<frame number>.npy`` (both numbers
    start at 1; ``data_path`` is the module-level data folder).

    The preview window shows a "Starting Collection" banner for 5 seconds
    before each action and for 2 seconds before each video. Pressing "q" at any
    point stops the whole collection.

    Args:
        model: MediaPipe model passed on to ``mediapipe_detection``.
        actions: Iterable of action names (used as folder names).
        sequences: Number of videos to record per action.
        sequence_length: Number of frames to record per video.

    Raises:
        RuntimeError: If the webcam cannot be opened or stops delivering frames.
    """
    window_title = "MediaPipe processed video"
    quit_key = ord("q")

    def next_annotated_frame(capture):
        # Grab one frame, run detection and draw the landmarks on it.
        ret, frame = capture.read()
        if not ret:
            # frame is None here; fail loudly rather than crash inside OpenCV.
            raise RuntimeError("Could not read a frame from the webcam.")
        image, results = mediapipe_detection(frame, model)
        draw_landmarks_restyled(image, results)
        return image, results

    def put_label(image, text, origin, color):
        # Small anti-aliased caption on the preview frame.
        cv2.putText(image, text, origin, cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)

    def show_and_wait(image, delay_ms):
        # Display the frame, wait up to delay_ms, and report whether "q" was pressed.
        cv2.imshow(window_title, image)
        return cv2.waitKey(delay_ms) & 0xFF == quit_key

    cap = cv2.VideoCapture(0)
    try:
        if not cap.isOpened():
            raise RuntimeError("Could not open the webcam (device 0).")

        for action in actions:
            # 5 second pause before recording a new action.
            image, _ = next_annotated_frame(cap)
            put_label(image, "Starting Collection", (120, 200), (0, 255, 0))
            put_label(image, "Collecting frames for {}".format(action), (15, 12), (0, 0, 255))
            if show_and_wait(image, 5000):
                return

            for sequence in range(1, sequences + 1):
                caption = "Collecting frames for {}, video number {}".format(action, sequence)

                # 2 second pause before starting a new video.
                image, _ = next_annotated_frame(cap)
                put_label(image, "Starting Collection", (120, 200), (0, 255, 0))
                put_label(image, caption, (15, 12), (0, 0, 255))
                if show_and_wait(image, 2000):
                    return

                for frame_num in range(1, sequence_length + 1):
                    image, results = next_annotated_frame(cap)
                    put_label(image, caption, (15, 12), (0, 0, 255))

                    # Save the landmarks of this frame as its own .npy file.
                    landmarks = extract_landmarks(results)
                    landmarks_path = os.path.join(data_path, action, str(sequence), str(frame_num) + ".npy")
                    np.save(landmarks_path, landmarks)

                    # "q" must end the whole session, not only the current video.
                    if show_and_wait(image, 1):
                        return
    finally:
        # Always free the camera and close the window, even on errors or early exit.
        cap.release()
        cv2.destroyAllWindows()

In [20]:
# Start the recording session: for every action, `sequences` videos of
# `sequence_length` frames each are captured from the webcam and saved under data_path.
show_preprocessed_video_collect(holistic, actions, sequences, sequence_length)

## Preprocess data

In [12]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [13]:
# Map every action name to an integer class index, in the order of `actions`,
# e.g. {'hello': 0, 'thanks': 1, ...}.
label_map = dict(zip(actions, range(len(actions))))

In [None]:
videos = []  # one entry per video: a list of per-frame landmark vectors
labels = []  # integer class index of each video, aligned with `videos`

for action in actions:
    for sequence in range(1, sequences + 1):
        sequence_dir = os.path.join(data_path, action, str(sequence))
        # Load the saved frames of this video in recording order (1..sequence_length).
        frames = [
            np.load(os.path.join(sequence_dir, str(frame_num) + ".npy"))
            for frame_num in range(1, sequence_length + 1)
        ]
        videos.append(frames)
        labels.append(label_map[action])  # e.g. "hello" -> 0


In [18]:
np.array(videos).shape  # (videos, frames per video, values per frame); here 90 videos, 30 frames each, 1662 landmark values per frame

(90, 30, 1662)

In [20]:
x = np.array(videos)  # model input: all videos stacked into one array
x.shape

(90, 30, 1662)

In [24]:
# One-hot encode the integer labels, e.g. 0 -> (1, 0, 0) and 1 -> (0, 1, 0).
# This is the label format used with the 'categorical_crossentropy' loss during training.
y = to_categorical(labels).astype(int)
y.shape

(90, 3)

In [25]:
# Split the data (this does not train anything yet):
#   90 % of the videos are used for training, 10 % are held out for testing.
# random_state makes the split reproducible across kernel restarts, and
# stratify=labels keeps the actions evenly represented in the small test set.
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.1, random_state=42, stratify=labels
)

In [27]:
x_train.shape  # the training split keeps 81 of the 90 videos

(81, 30, 1662)

## Building and Training an LSTM (Long Short-Term Memory) Neural Network

In [78]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [79]:
log_dir = os.path.join("logs")  # folder that receives the TensorBoard logs
tensorboard_callback = TensorBoard(log_dir=log_dir)  # Keras callback that writes training logs to log_dir

In [80]:
# Stacked-LSTM classifier: three LSTM layers followed by three Dense layers.
# Input is one video: 30 frames of 1662 landmark values each.
# NOTE(review): the LSTM layers use 'relu' instead of Keras' default 'tanh';
# this is the author's choice and has not been validated here.
model = Sequential([
    # 64 units; returns the full sequence (one output per frame) for the next LSTM.
    LSTM(64, return_sequences=True, activation='relu', input_shape=(30,1662)),
    # 128 units; still returns the full sequence.
    LSTM(128, return_sequences=True, activation='relu'),
    # 64 units; returns a single vector for the whole video instead of a sequence.
    LSTM(64, return_sequences=False, activation='relu'),
    # Fully connected layers on top of the sequence summary.
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    # One output per action; softmax turns the outputs into probabilities,
    # e.g. hello: 0.8, thanks: 0.05, I love you: 0.15 -> [0.8, 0.05, 0.15].
    Dense(actions.shape[0], activation='softmax'),
])

In [81]:
# IMPORTANT: the optimizer and the metrics can be changed, but keep the loss function.
# This is a multi-class classification model with one-hot labels, so the loss
# has to be 'categorical_crossentropy'.
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [82]:
# Train for 170 epochs on the training split; the TensorBoard callback logs the run to log_dir.
model.fit(x_train, y_train, epochs=170, callbacks=[tensorboard_callback])

Epoch 1/170
Epoch 2/170
Epoch 3/170
Epoch 4/170
Epoch 5/170
Epoch 6/170
Epoch 7/170
Epoch 8/170
Epoch 9/170
Epoch 10/170
Epoch 11/170
Epoch 12/170
Epoch 13/170
Epoch 14/170
Epoch 15/170
Epoch 16/170
Epoch 17/170
Epoch 18/170
Epoch 19/170
Epoch 20/170
Epoch 21/170
Epoch 22/170
Epoch 23/170
Epoch 24/170
Epoch 25/170
Epoch 26/170
Epoch 27/170
Epoch 28/170
Epoch 29/170
Epoch 30/170
Epoch 31/170
Epoch 32/170
Epoch 33/170
Epoch 34/170
Epoch 35/170
Epoch 36/170
Epoch 37/170
Epoch 38/170
Epoch 39/170
Epoch 40/170
Epoch 41/170
Epoch 42/170
Epoch 43/170
Epoch 44/170
Epoch 45/170
Epoch 46/170
Epoch 47/170
Epoch 48/170
Epoch 49/170
Epoch 50/170
Epoch 51/170
Epoch 52/170
Epoch 53/170
Epoch 54/170
Epoch 55/170
Epoch 56/170
Epoch 57/170
Epoch 58/170
Epoch 59/170
Epoch 60/170
Epoch 61/170
Epoch 62/170
Epoch 63/170
Epoch 64/170
Epoch 65/170
Epoch 66/170
Epoch 67/170
Epoch 68/170
Epoch 69/170
Epoch 70/170
Epoch 71/170
Epoch 72/170
Epoch 73/170
Epoch 74/170
Epoch 75/170
Epoch 76/170
Epoch 77/170
Epoch 78

<keras.src.callbacks.History at 0x1dc0cbb0b80>

In [83]:
# Print the layers with their output shapes and parameter counts.
model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_18 (LSTM)              (None, 30, 64)            442112    
                                                                 
 lstm_19 (LSTM)              (None, 30, 128)           98816     
                                                                 
 lstm_20 (LSTM)              (None, 64)                49408     
                                                                 
 dense_18 (Dense)            (None, 64)                4160      
                                                                 
 dense_19 (Dense)            (None, 32)                2080      
                                                                 
 dense_20 (Dense)            (None, 3)                 99        
                                                                 
Total params: 596675 (2.28 MB)
Trainable params: 59667

## Predictions

In [85]:
prediction = model.predict(x_test)  # one row of class probabilities (softmax output) per test video



In [86]:
# Predicted action for test sample 3: the class with the highest probability.
# NOTE: this spot-checks a single sample only; compare it with the true label of
# the same sample before drawing conclusions about accuracy.
actions[np.argmax(prediction[3])]

'I love you'

In [87]:
actions[np.argmax(y_test[3])]  # true action of test sample 3 (position of the 1 in its one-hot label)

'I love you'

##### Save the model (weights of neural network)

In [88]:
# Save the trained model to sign_AI_model.h5.
# NOTE(review): the .h5 file appears to trigger the saving_api warning shown in the
# cell output; presumably Keras prefers its native .keras format - confirm before changing.
model.save("sign_AI_model.h5")

  saving_api.save_model(


In [89]:
# Load the weights stored in sign_AI_model.h5 back into `model`.
model.load_weights('sign_AI_model.h5')

## Real Time Test

In [106]:
# Real-time test: classify the most recent `sequence_length` webcam frames on
# every new frame and show the recognised actions in a banner at the top of
# the preview. Press "q" to stop.
seq = []         # landmark vectors of the most recent frames
sentence = []    # recognised actions, newest last (at most 5 are kept)
threshold = 0.7  # minimum probability required to accept a prediction

cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        ret, frame = cap.read()  # ret is False when no frame could be grabbed
        if not ret:
            # frame is None here and would crash the colour conversion; stop cleanly.
            break

        image, results = mediapipe_detection(frame, holistic)  # detect hands, face, body

        landmarks = extract_landmarks(results)
        seq.append(landmarks)
        seq = seq[-sequence_length:]  # sliding window with the same length as the training videos

        if len(seq) == sequence_length:
            # seq has shape (frames, features) but model.predict expects a batch,
            # i.e. (num_sequences, frames, features), hence expand_dims.
            prediction = model.predict(np.expand_dims(seq, axis=0))[0]
            best = np.argmax(prediction)

            if prediction[best] > threshold:
                word = actions[best]
                # Add the action only when it differs from the last one, so a
                # sign held over many frames is not repeated. The first
                # recognised action is printed as well.
                if not sentence or word != sentence[-1]:
                    sentence.append(word)
                    print(word)

            sentence = sentence[-5:]  # keep only the five most recent actions

            cv2.rectangle(image, (0, 0), (640, 40), (245, 117, 16), -1)
            # putText's positional order is (..., color, thickness, lineType):
            # pass thickness=1 explicitly so LINE_AA is used as the line type
            # and not as a 16-pixel thickness. Words are separated by spaces.
            cv2.putText(image, " ".join(sentence), (300, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)

        cv2.imshow("MediaPipe processed video", image)

        if cv2.waitKey(1) & 0xFF == ord("q"):  # wait 1 ms; "q" quits
            break
finally:
    # Always free the camera and close the window, even if processing raised.
    cap.release()
    cv2.destroyAllWindows()


hello
I love you
hello
I love you
hello
I love you
