In [1]:
import numpy as np
import cv2
import mediapipe as mp
import time
import psutil

from tensorflow.keras.models import Model
from tensorflow.keras.layers import (LSTM, Dense, Dropout, Input, Flatten, 
                                     Bidirectional, Permute, multiply)

2024-04-02 15:01:35.129311: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-02 15:01:35.808819: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-02 15:01:35.808943: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-02 15:01:35.900626: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-04-02 15:01:36.258979: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-02 15:01:36.278269: I tensorflow/core/platform/cpu_feature_guard.cc:1

In [2]:
# Shared MediaPipe pose estimator used by the inference loop further down.
# Configured for non-static (video) input with 0.5 detection / tracking
# confidence thresholds.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    min_tracking_confidence=0.5,
    min_detection_confidence=0.5,
    static_image_mode=False,
)

I0000 00:00:1712050306.827484 1026507 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1712050306.880132 1027641 gl_context.cc:344] GL version: 3.2 (OpenGL ES 3.2 Mesa 23.0.4-0ubuntu1~22.04.1), renderer: Mesa Intel(R) UHD Graphics 620 (KBL GT2)


In [5]:
def extract_keypoints(results):
    """
    Flatten the pose landmarks of one frame into a 1-D feature vector.

    Args:
        results: Object exposing a `pose_landmarks` attribute (as returned by
            the pose estimator). When set, `pose_landmarks.landmark` is
            iterated and each item must provide `x`, `y`, `z` and `visibility`.

    Returns:
        numpy.ndarray: 1-D array laid out as [x, y, z, visibility] per
        landmark, in landmark order. If no pose was detected
        (`pose_landmarks` is None/empty) a zero vector of length 33*4 is
        returned instead of raising, matching the model's default input size.
    """
    landmarks = getattr(results, "pose_landmarks", None)
    if not landmarks:
        # No detection for this frame: return a neutral vector of the
        # expected size (33 keypoints x 4 metrics) rather than crashing.
        return np.zeros(33 * 4)

    # Named `keypoints` (not `pose`) to avoid shadowing the module-level
    # MediaPipe `pose` object.
    keypoints = np.array(
        [[lm.x, lm.y, lm.z, lm.visibility] for lm in landmarks.landmark]
    ).flatten()

    return keypoints

In [6]:
def attention_block(inputs, time_steps):
    """
    Re-weight a sequence tensor with learned attention weights.

    The last two axes of `inputs` are swapped, a softmax `Dense` layer with
    `time_steps` units is applied, and the result is swapped back (layer
    'attention_vec'). The returned tensor is the element-wise product of
    `inputs` and those weights (layer 'attention_mul').

    Assumes `inputs` is shaped (batch, time_steps, features), as produced by
    the recurrent layer in `build_model`.
    """
    # Move the time axis last so the softmax Dense layer acts along it.
    swapped = Permute((2, 1))(inputs)
    weights = Dense(time_steps, activation='softmax')(swapped)

    # Restore the original axis order so the weights line up with `inputs`.
    weights = Permute((2, 1), name='attention_vec')(weights)

    # Scale every input element by its attention weight.
    return multiply([inputs, weights], name='attention_mul')


In [7]:
def build_model(HIDDEN_UNITS=256, sequence_length=30, num_input_values=33*4, num_classes=3,
                weights_path="../models/LSTM.h5"):
    """
    Build the Bi-LSTM + attention classifier and load its pre-trained weights.

    Architecture: Input -> Bidirectional LSTM (returning the full sequence)
    -> attention block -> Flatten -> Dense(relu) -> Dropout(0.5)
    -> Dense(softmax).

    Args:
        HIDDEN_UNITS (int, optional): Number of LSTM units per direction; the
            fully connected layer uses 2*HIDDEN_UNITS. Defaults to 256.
        sequence_length (int, optional): Input sequence length (i.e., number
            of frames). Defaults to 30.
        num_input_values (int, optional): Feature size per frame. Defaults to
            33*4 (i.e., number of keypoints x number of metrics).
        num_classes (int, optional): Number of classification categories
            (i.e., model output size). Defaults to 3.
        weights_path (str or None, optional): Weights file passed to
            `model.load_weights`. Defaults to "../models/LSTM.h5". Pass None
            to get the untrained architecture without touching the disk.

    Returns:
        keras model: the network, with weights loaded from `weights_path`
        unless it is None.
    """
    # Input
    inputs = Input(shape=(sequence_length, num_input_values))
    # Bi-LSTM
    lstm_out = Bidirectional(LSTM(HIDDEN_UNITS, return_sequences=True))(inputs)
    # Attention
    attention_mul = attention_block(lstm_out, sequence_length)
    attention_mul = Flatten()(attention_mul)
    # Fully Connected Layer
    x = Dense(2*HIDDEN_UNITS, activation='relu')(attention_mul)
    x = Dropout(0.5)(x)
    # Output
    x = Dense(num_classes, activation='softmax')(x)
    # Bring it all together
    model = Model(inputs=[inputs], outputs=x)

    # Load the pre-trained weights. The architecture arguments above must
    # match the ones the weights file was saved with, otherwise loading fails.
    if weights_path is not None:
        model.load_weights(weights_path)

    return model


In [8]:
# Minimum predicted-class probability intended for displaying an exercise label.
CONFIDENCE_THRESHOLD = 0.8
# Number of most recent keypoint frames kept in the rolling buffer of the
# inference loop; predictions start once the buffer holds this many frames.
# NOTE(review): build_model() defaults to a 30-frame input, so only the last
# 30 buffered frames are actually fed to the network - confirm 60 is intended.
SEQUENCE_LENGTH = 60

In [11]:
# Classifier with pre-trained weights (see build_model).
model = build_model()

# Frame source. Alternatives that were used during development:
#   cv2.VideoCapture('../dataset/hammer curl/hammer curl_11.mp4')
#   cv2.VideoCapture('../dataset/barbell biceps curl/barbell biceps curl_11.mp4')
#   cv2.VideoCapture(0)   # webcam
cap = cv2.VideoCapture('../dataset/squat/squat_11.mp4')

# Class labels, indexed by the position of the model's output units.
actions = np.array(['curl', 'press', 'squat'])

# Number of frames the network consumes per prediction, read from the model
# itself instead of a hard-coded literal so it cannot drift from build_model().
model_window = model.input_shape[1]
# Rolling-buffer size: honour SEQUENCE_LENGTH, but never keep fewer frames
# than the model needs for one prediction.
buffer_length = max(SEQUENCE_LENGTH, model_window)

sequence = []          # rolling buffer of per-frame keypoint vectors
current_action = ''    # label currently shown in the banner ('' = none)
prev_frame_time = 0    # timestamp of the previously displayed frame

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # End of stream or read failure.
            break

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(rgb_frame)

        # Only frames with a detected pose feed the buffer and the classifier.
        # Frames without a pose are still displayed (and the key handler still
        # runs) so the window does not freeze and 'q' keeps working.
        if results.pose_landmarks:
            sequence.append(extract_keypoints(results))
            sequence = sequence[-buffer_length:]
            mp.solutions.drawing_utils.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

            if len(sequence) == buffer_length:
                # Classify the most recent `model_window` frames.
                res = model.predict(np.expand_dims(sequence[-model_window:], axis=0), verbose=0)[0]
                confidence = np.max(res)

                # Hide the label when the model is not confident enough.
                if confidence < CONFIDENCE_THRESHOLD:
                    current_action = ''
                else:
                    current_action = actions[np.argmax(res)]

        # Performance overlay: FPS between displayed frames, RAM and CPU usage.
        new_frame_time = time.time()
        elapsed = new_frame_time - prev_frame_time
        prev_frame_time = new_frame_time
        # Guard against two identical timestamps (coarse clocks).
        fps = str(int(1 / elapsed)) if elapsed > 0 else '0'

        ram_usage = psutil.virtual_memory().percent
        cpu_usage = psutil.cpu_percent()

        # putting the FPS count on the frame
        cv2.putText(frame, 'FPS: {}'.format(fps), (1000, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (100, 255, 0), 2)

        # Display the RAM usage
        cv2.putText(frame, f"RAM Usage: {ram_usage}%", (1000, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (100, 255, 0), 2)

        # Display CPU usage
        cv2.putText(frame, f"CPU Usage: {cpu_usage}%", (1000, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (100, 255, 0), 2)

        # Filled black banner in the top-left corner with the current label.
        cv2.rectangle(frame, (0, 0), (360, 40), (0, 0, 0), -1)
        cv2.putText(frame, 'Exercise ' + current_action, (3, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow('Output Video', frame)

        # Press 'q' to stop early.
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Always free the capture device and close the window, even if a frame
    # fails to process.
    cap.release()
    cv2.destroyAllWindows()
