# 1. Install and Import Dependencies

In [None]:
pip install mediapipe opencv-python

In [2]:
pip install keras

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.1.2 -> 23.2.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
pip install tensorflow tensorflow-gpu

In [None]:
pip install tensorflow --user

In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import mediapipe as mp
from keras.models import load_model

# 2. Keypoints using MP Holistic

In [2]:
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

In [3]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [4]:
def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw right hand connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, 
                                 mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                                 mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                                 )
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                 mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                 )
    
    

In [5]:
def extract_keypoints(results):
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(21*3)
    pose = np.array([[res.x, res.y, res.z] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(21*3)
    
    return np.concatenate([lh, rh, face, pose])

# 3.Create Folder

In [6]:
# Path for exported data, numpy arrays
DATA_PATH = os.path.join('MP_Data') 

# Actions that we try to detect
actions = np.array(['Xin chào', 'Cảm ơn', 'Xin lỗi', 'Tạm biệt'])

# Five videos worth of data
no_sequences = 5

# Videos are going to be 20 frames in length
sequence_length = 20

# Folder start
start_folder = 1

In [8]:
for action in actions: 
    # Create the action directory if it doesn't exist
    try: 
        os.makedirs(os.path.join(DATA_PATH, action))
    except FileExistsError:
        pass
    
    # Get the maximum existing sequence number in the action directory
    action_dir = os.path.join(DATA_PATH, action)
    if os.path.exists(action_dir):
        existing_sequences = [int(folder) for folder in os.listdir(action_dir) if folder.isdigit()]
        if existing_sequences:
            dirmax = max(existing_sequences)
        else:
            dirmax = 0
    else:
        dirmax = 0
    
    # Create new sequence directories
    for sequence in range(1, no_sequences+1):
        try: 
            os.makedirs(os.path.join(action_dir, str(dirmax+sequence)))
        except FileExistsError:
            pass

# 4. Capture data

In [14]:
cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    # NEW LOOP
    # Loop through actions
    for action in actions:
        # Loop through sequences aka videos
        for sequence in range(start_folder, start_folder+no_sequences):
            # Loop through video length aka sequence length
            for frame_num in range(sequence_length):

                # Read feed
                ret, frame = cap.read()

                # Make detections
                image, results = mediapipe_detection(frame, holistic)

                # Draw landmarks
                draw_landmarks(image, results)
                
                # NEW Apply wait logic
                if frame_num == 0: 
                    cv2.putText(image, 'STARTING COLLECTION', (120,200), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(500)
                else: 
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                
                # NEW Export keypoints
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)

                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
                    
    cap.release()
    cv2.destroyAllWindows()

# 5. Preprocess Data and Create Labels and Features


In [7]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [8]:
label_map = {label:num for num, label in enumerate(actions)}

In [9]:
label_map

{'Xin chào': 0, 'Cảm ơn': 1, 'Xin lỗi': 2, 'Tạm biệt': 3}

In [10]:
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences

sequences, labels = [], []
for action in actions:
    for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

x = np.array(sequences)


In [11]:
x.shape

(20, 20, 1629)

In [12]:
y = to_categorical(labels).astype(int)

In [13]:
y.shape

(20, 4)

In [14]:
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

In [15]:
X_test.shape

(4, 20, 1629)

# 6. Build And Train Model

In [16]:
import tensorflow as tf

In [55]:
# Initializing the CNN
cnn = tf.keras.models.Sequential()

# Note the input shape is the desired size of the image 64*64 with 3 bytes color
# Create the first Convolutional Layer
cnn.add(tf.keras.layers.Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=list(X_train.shape[1:3])))

# Create a Pooling Layer
cnn.add(tf.keras.layers.MaxPool1D(pool_size=2, strides=2))

# Create the second Convolutional Layer
cnn.add(tf.keras.layers.Conv1D(filters=32, kernel_size=3, activation='relu'))

# Add another Pooling Layer
cnn.add(tf.keras.layers.MaxPool1D(pool_size=2, strides=2))

# Flatten the results to feed into the CNN
cnn.add(tf.keras.layers.Flatten())

# Fully Connected Convolutional Neural Network with 128 neuron hidden layer
cnn.add(tf.keras.layers.Dense(units=64, activation= 'relu'  )) ### Choose Activation Function

# Creating the Output Layer
cnn.add(tf.keras.layers.Dense(units=actions.shape[0], activation= 'sigmoid'  )) ### Choose Activation Function

In [56]:
cnn.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_2 (Conv1D)           (None, 18, 32)            156416    
                                                                 
 max_pooling1d_2 (MaxPoolin  (None, 9, 32)             0         
 g1D)                                                            
                                                                 
 conv1d_3 (Conv1D)           (None, 7, 32)             3104      
                                                                 
 max_pooling1d_3 (MaxPoolin  (None, 3, 32)             0         
 g1D)                                                            
                                                                 
 flatten_1 (Flatten)         (None, 96)                0         
                                                                 
 dense_2 (Dense)             (None, 64)               

In [57]:
from keras.callbacks import EarlyStopping
cnn.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
early_stopping = EarlyStopping(monitor='categorical_accuracy', patience=30, mode='max', verbose=1)

In [58]:
cnn.fit(X_train, y_train, epochs=1000,  callbacks=[early_stopping])

Epoch 1/1000
Epoch 2/1000
Epoch 3/1000
Epoch 4/1000
Epoch 5/1000
Epoch 6/1000
Epoch 7/1000
Epoch 8/1000
Epoch 9/1000
Epoch 10/1000
Epoch 11/1000
Epoch 12/1000
Epoch 13/1000
Epoch 14/1000
Epoch 15/1000
Epoch 16/1000
Epoch 17/1000
Epoch 18/1000
Epoch 19/1000
Epoch 20/1000
Epoch 21/1000
Epoch 22/1000
Epoch 23/1000
Epoch 24/1000
Epoch 25/1000
Epoch 26/1000
Epoch 27/1000
Epoch 28/1000
Epoch 29/1000
Epoch 30/1000
Epoch 31/1000
Epoch 32/1000
Epoch 33/1000
Epoch 34/1000
Epoch 35/1000
Epoch 36/1000
Epoch 37/1000
Epoch 38/1000
Epoch 39/1000
Epoch 40/1000
Epoch 41/1000
Epoch 42/1000
Epoch 43/1000
Epoch 44/1000
Epoch 45/1000
Epoch 46/1000
Epoch 47/1000
Epoch 48/1000
Epoch 49/1000
Epoch 50/1000
Epoch 51/1000
Epoch 52/1000
Epoch 53/1000
Epoch 54/1000
Epoch 55/1000
Epoch 56/1000
Epoch 57/1000
Epoch 58/1000
Epoch 59/1000
Epoch 60/1000
Epoch 61/1000
Epoch 62/1000
Epoch 63/1000
Epoch 64/1000
Epoch 65/1000
Epoch 66/1000
Epoch 67/1000
Epoch 68/1000
Epoch 69/1000
Epoch 70/1000
Epoch 71/1000
Epoch 72/1000
E

<keras.src.callbacks.History at 0x2a181a22b90>

# 7.Test the Model

In [59]:
res = cnn.predict(X_test)



In [60]:
res

array([[2.5166386e-01, 9.4853270e-01, 1.7975153e-02, 7.4175811e-01],
       [1.9067880e-03, 9.6399450e-01, 5.0463128e-01, 4.2467171e-01],
       [9.6126978e-04, 9.8714983e-01, 1.1368436e-01, 7.0067739e-01],
       [9.9974316e-01, 1.8777589e-01, 2.9618504e-06, 9.8719841e-01]],
      dtype=float32)

In [61]:
print(actions[np.argmax(res[0])], actions[np.argmax(res[1])])

Cảm ơn Cảm ơn


In [62]:
print(actions[np.argmax(y_test[0])], actions[np.argmax(y_test[1])])

Cảm ơn Xin lỗi


In [63]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [64]:
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(res, axis=1).tolist()

In [65]:
multilabel_confusion_matrix(ytrue, yhat)

array([[[3, 1],
        [0, 0]],

       [[1, 1],
        [0, 2]],

       [[3, 0],
        [1, 0]],

       [[3, 0],
        [1, 0]]], dtype=int64)

In [66]:
accuracy_score(ytrue, yhat)

0.5

# 8. Test on Video

In [67]:
from scipy import stats

In [71]:
colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        if num < len(colors):
            # Draw the rectangle using the corresponding color from the 'colors' list
            cv2.rectangle(output_frame, (0, 60 + num * 40), (int(prob * 100), 90 + num * 40), colors[num], -1)
        else:
            # If 'num' exceeds the valid range, use a default color (e.g., white)
            cv2.rectangle(output_frame, (0, 60 + num * 40), (int(prob * 100), 90 + num * 40), (255, 255, 255), -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

In [72]:
kp = np.load(os.path.join(DATA_PATH, action, str(1), "{}.npy".format(1)))

In [73]:
# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.7

cap = cv2.VideoCapture('test.mp4')
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        
        # Draw landmarks
        draw_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        if keypoints.shape[0]==252:
            keypoints=np.zeros([kp.shape[0]])

        sequence.append(keypoints)
        sequence = sequence[-20:]
        
        if len(sequence) == 20:
            res = cnn.predict(np.expand_dims(sequence, axis=0))[0]       
            print(actions[np.argmax(res)])
            predictions.append(np.argmax(res))
            
            
        #3. Viz logic
            if np.unique(predictions[-10:])[0]==np.argmax(res): 
                if res[np.argmax(res)] > threshold: 
                    
                    if len(sentence) > 0: 
                        if actions[np.argmax(res)] != sentence[-1]:
                            sentence.append(actions[np.argmax(res)])
                    else:
                        sentence.append(actions[np.argmax(res)])

            if len(sentence) > 5: 
                sentence = sentence[-5:]

            # Viz probabilities
            image = prob_viz(res, actions, image, colors)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
Xin chào
X