# 1. Import and Install Dependencies

In [1]:
!pip install tensorflow opencv-python mediapipe scikit-learn 

In [2]:
import os
import cv2
import numpy as np
import mediapipe as mp

# 2. Keypoints using MP Holistic

Initialize MediaPipe Holistic model and drawing utilities

In [3]:
# 'mp_holistic' provides the holistic model which includes face, pose, and hand landmarks detection.
mp_holistic = mp.solutions.holistic

# 'mp_drawing' provides utility functions for drawing the detected landmarks on images.
mp_drawing = mp.solutions.drawing_utils

In [4]:
# Function to perform MediaPipe detection on an image
def mediapipe_detection(image, model):
    # Convert the image from BGR to RGB color space.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Mark the image as not writable to improve performance by preventing unnecessary data copying.
    image.flags.writeable = False 
    
    # Process the image using the provided model to detect landmarks.
    results = model.process(image)
    
    # Mark the image as writable again for further operations.
    image.flags.writeable = True
    
    # Convert the image back from RGB to BGR color space.
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    
    # Return the processed image and the detection results.
    return image, results

In [5]:
# Function to draw landmarks on an image based on the detection results
def draw_landmarks(image, results):
    # If face landmarks are detected, draw them on the image.
    if results.face_landmarks:
        mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS)
    
    # If pose landmarks are detected, draw them on the image.
    if results.pose_landmarks:
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
    
    # If left hand landmarks are detected, draw them on the image.
    if results.left_hand_landmarks:
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    
    # If right hand landmarks are detected, draw them on the image.
    if results.right_hand_landmarks:
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)


# 3. Extract Keypoint Values

In [6]:
# Function to extract keypoint values from the MediaPipe detection results
def extract_keypoints(results):
    # Extract pose landmarks (if available) and flatten the list.
    # Each landmark includes x, y, z coordinates and visibility.
    # If pose landmarks are not available, return an array of zeros with the same length.
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    
    # Extract face landmarks (if available) and flatten the list.
    # Each landmark includes x, y, z coordinates.
    # If face landmarks are not available, return an array of zeros with the same length.
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    
    # Extract left hand landmarks (if available) and flatten the list.
    # Each landmark includes x, y, z coordinates.
    # If left hand landmarks are not available, return an array of zeros with the same length.
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    
    # Extract right hand landmarks (if available) and flatten the list.
    # Each landmark includes x, y, z coordinates.
    # If right hand landmarks are not available, return an array of zeros with the same length.
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    
    # Concatenate all the extracted keypoints (pose, face, left hand, right hand) into a single array.
    return np.concatenate([pose, face, lh, rh])

# 4. Setup Folders for Collection

In [7]:
# Define the base directory where data will be stored
COLLECTIONS_PATH = os.path.join('collections') 

# Define the list of actions to be collected (e.g., 'one', 'two', 'three')
actions = np.array(['one', 'two', 'three'])

# Define the number of sequences to be collected for each action
no_sequences = 30

# Define the length of each sequence (number of frames per sequence)
sequence_length = 30

# Define the starting folder index
start_folder = 1

In [8]:
# Loop through each action to create necessary directories
for action in actions:
    # Create the path for the current action
    action_path = os.path.join(COLLECTIONS_PATH, action)
    
    # If the directory for the current action does not exist, create it
    if not os.path.exists(action_path):
        os.makedirs(action_path)
    
    # List all existing directories in the action path
    existing_dirs = np.array(os.listdir(action_path))
    
    # Find the highest existing directory number to continue numbering sequences sequentially
    dirmax = np.max(existing_dirs.astype(int)) if len(existing_dirs) > 0 else 0
    
    # Loop through the number of sequences to be created
    for sequence in range(1, no_sequences + 1):
        try:
            # Create a new directory for each sequence
            os.makedirs(os.path.join(COLLECTIONS_PATH, action, str(dirmax + sequence)))
        except:
            # Pass if there's any exception (e.g., directory already exists)
            pass


# 5. Collect Keypoint Values for Training and Testing

In [9]:
# Initialize video capture from the webcam (device index 1)
cap = cv2.VideoCapture(1)

# Use MediaPipe's holistic model for detection and tracking with specified confidence levels
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

    # Loop through each action to be collected
    for action in actions:
        # Loop through each sequence to be collected for the current action
        cv2.waitKey(1500)  # Wait for 1500 milliseconds (1.5 seconds)
        for sequence in range(start_folder, start_folder + no_sequences):
            # Loop through each frame in the sequence
            for frame_num in range(sequence_length):
                
                # Read a frame from the webcam
                ret, frame = cap.read()
                
                # Perform MediaPipe detection on the frame
                image, results = mediapipe_detection(frame, holistic)
                
                # Draw landmarks on the detected frame
                draw_landmarks(image, results)
                
                # If it's the first frame of the sequence, display a starting message
                if frame_num == 0:
                    cv2.putText(image, 'STARTING COLLECTION', (120, 200), 
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15, 12), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    
                    # Show the frame with the starting message
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(500)  # Wait for 500 milliseconds (0.5 seconds)
                else:
                    # Display collection progress on subsequent frames
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15, 12), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    
                    # Show the frame with the collection progress
                    cv2.imshow('OpenCV Feed', image)
                
                # Extract keypoints from the detection results
                keypoints = extract_keypoints(results)
                
                # Create a path for saving the keypoints as a numpy file
                npy_path = os.path.join(COLLECTIONS_PATH, action, str(sequence), str(frame_num))
                
                # Save the keypoints to the specified path
                np.save(npy_path, keypoints)
                
                # Break the loop if 'q' key is pressed
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break

    # Release the video capture object and close all OpenCV windows
    cap.release()
    cv2.destroyAllWindows()


I0000 00:00:1717009830.276748 1679283 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M1
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1717009830.355686 1679629 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1717009830.361288 1679629 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1717009830.362503 1679629 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1717009830.362503 1679631 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1717009830.363124 1679626 inference_feedback_manager.cc:114] Feedback manager requires a mod

In [10]:
cv2.waitKey(1)
cv2.waitKey(1)
cap.release()
cv2.destroyAllWindows()

# 6. Preprocess Data and Create Labels and Features

In [11]:
# Import necessary libraries for data preprocessing and model training
from sklearn.model_selection import train_test_split
import keras
from keras.utils import to_categorical

# Create a dictionary to map each action label to a unique numeric identifier
label_map = {label: num for num, label in enumerate(actions)}

# Initialize lists to hold sequences and their corresponding labels
sequences, labels = [], []

In [12]:
# Loop through each action to process its sequences
for action in actions:
    # Loop through each sequence for the current action
    for sequence in np.array(os.listdir(os.path.join(COLLECTIONS_PATH, action))).astype(int):
        window = []  # Initialize a list to hold frames for the current sequence
        # Loop through each frame in the sequence
        for frame_num in range(sequence_length):
            file_path = os.path.join(COLLECTIONS_PATH, action, str(sequence), "{}.npy".format(frame_num))  # Construct the file path
            if os.path.isfile(file_path):  # Check if the file exists
                res = np.load(file_path)  # Load the keypoint data from the file
                window.append(res)  # Append the keypoint data to the current sequence
            else:
                print('File not found: ', file_path)  # Print a message if the file is not found
        sequences.append(window)  # Append the completed sequence to the sequences list
        labels.append(label_map[action])  # Append the corresponding label to the labels list

In [13]:
# Convert the list of sequences to a NumPy array
X = np.array(sequences)

In [14]:
# Convert labels to categorical format (one-hot encoding)
y = to_categorical(labels).astype(int)

In [15]:
# Split the data into training and testing sets (95% training, 5% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

# 7. Build and Train LSTM Neural Network

In [16]:
# Import necessary libraries for building and training the neural network
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.callbacks import TensorBoard

# Define the directory where TensorBoard logs will be saved
log_dir = os.path.join('Logs')
# Initialize the TensorBoard callback
tb_callback = TensorBoard(log_dir=log_dir)

# Initialize a sequential model
model = Sequential()

# Add the first LSTM layer with 64 units, return sequences, and ReLU activation
# The input shape is (30, 1662), where 30 is the sequence length and 1662 is the number of features (keypoints)
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30, 1662)))

# Add a second LSTM layer with 128 units, return sequences, and ReLU activation
model.add(LSTM(128, return_sequences=True, activation='relu'))

# Add a third LSTM layer with 64 units, without returning sequences, and ReLU activation
model.add(LSTM(64, return_sequences=False, activation='relu'))

# Add a dense (fully connected) layer with 64 units and ReLU activation
model.add(Dense(64, activation='relu'))

# Add another dense layer with 32 units and ReLU activation
model.add(Dense(32, activation='relu'))

# Add the output layer with a number of units equal to the number of actions, and softmax activation
# Softmax activation is used for multi-class classification
model.add(Dense(actions.shape[0], activation='softmax'))

# Compile the model with the Adam optimizer, categorical crossentropy loss, and categorical accuracy metric
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

  super().__init__(**kwargs)


In [17]:
# Train the model with the training data
# Set the number of epochs to 2000 and use the TensorBoard callback for logging
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])

Epoch 1/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 35ms/step - categorical_accuracy: 0.2420 - loss: 1.4489
Epoch 2/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step - categorical_accuracy: 0.2229 - loss: 3.6525
Epoch 3/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 71ms/step - categorical_accuracy: 0.1884 - loss: 1.3507
Epoch 4/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step - categorical_accuracy: 0.3413 - loss: 1.3198
Epoch 5/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step - categorical_accuracy: 0.4101 - loss: 1.2246
Epoch 6/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step - categorical_accuracy: 0.4784 - loss: 1.0669
Epoch 7/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step - categorical_accuracy: 0.4166 - loss: 1.1091
Epoch 8/2000
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/ste

<keras.src.callbacks.history.History at 0x30ffacc20>

In [18]:
# Print the summary of the model architecture
model.summary()

# 8. Make Predictions

In [19]:
# Use the trained model to make predictions on the test data
res = model.predict(X_test)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 163ms/step


In [20]:
# Get the predicted action for the fifth sample in the test data
# np.argmax(res[4]) returns the index of the highest probability in the prediction array for the fifth sample
# actions[np.argmax(res[4])] maps this index to the corresponding action
predicted_action = actions[np.argmax(res[4])]

# Get the true action for the fifth sample in the test data
# np.argmax(y_test[4]) returns the index of the highest value in the one-hot encoded true label for the fifth sample
# actions[np.argmax(y_test[4])] maps this index to the corresponding action
true_action = actions[np.argmax(y_test[4])]

# Print the predicted and true actions for comparison
print(f"Predicted action: {predicted_action}")
print(f"True action: {true_action}")

Predicted action: peace
True action: peace


# 9. Save Weights
Save the trained model's weights and architecture to a file named 'collection.h5'

In [21]:
model.save('model.h5')



# 10. Evaluation using Confusion Matrix and Accuracy

In [22]:

# Import necessary functions for evaluation
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

# Use the trained model to make predictions on the test data
yhat = model.predict(X_test)

# Convert the true labels from one-hot encoding to integer labels
ytrue = np.argmax(y_test, axis=1).tolist()

# Convert the predicted labels from one-hot encoding to integer labels
yhat = np.argmax(yhat, axis=1).tolist()

# Compute the multilabel confusion matrix
confusion_matrix = multilabel_confusion_matrix(ytrue, yhat)

# Compute the accuracy score
accuracy = accuracy_score(ytrue, yhat)

# Print the results
print("Confusion Matrix:\n", confusion_matrix)
print("Accuracy Score:", accuracy)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
Confusion Matrix:
 [[[3 0]
  [3 0]]

 [[1 5]
  [0 0]]

 [[5 0]
  [1 0]]

 [[4 0]
  [1 1]]]
Accuracy Score: 0.16666666666666666
