# 0. Import Libraries/Dependencies

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import tensorflow as tf
import math
import glob

from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, classification_report

from tensorflow.keras.utils import to_categorical
from tensorflow.keras import backend as K
K.clear_session()
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

from tensorflow.keras.models import Sequential, Model

from tensorflow.keras.layers import (LSTM, Dense, Concatenate, Attention, Dropout, Softmax,
                                     Input, Flatten, Activation, Bidirectional, Permute, multiply, 
                                     ConvLSTM2D, MaxPooling3D, TimeDistributed, Conv2D, MaxPooling2D)

from tensorflow.keras.optimizers.legacy import Adam

from scipy import stats

# Quieten TensorFlow / Keras console output so training logs stay readable.
# NOTE(review): tensorflow is already imported above by the time this variable is set;
# presumably the C++ log level is read at import time -- confirm this line has any effect here.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = "3"
tf.get_logger().setLevel("ERROR")     # Python-side TF logger: errors only
tf.autograph.set_verbosity(1)

# Suppress the "untraced functions" warning (emitted through absl logging)
import absl.logging
absl.logging.set_verbosity(absl.logging.ERROR)

# 1. Keypoints using MP Pose
- Load MediaPipe's (MP) Pose model for full-body keypoint detection
- Initialize MP's drawing utilities to visualize the landmarks

In [None]:
# Alias for MediaPipe's pose solution module; used below to build Pose() estimators
# and to look up POSE_CONNECTIONS when drawing.
mp_pose = mp.solutions.pose

# Alias for MediaPipe's drawing helpers (draw_landmarks, DrawingSpec).
mp_drawing = mp.solutions.drawing_utils

In [None]:
def mediapipe_detection(image, model):
    """
    Run a MediaPipe model on a single BGR frame (e.g. one webcam frame).

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Any object exposing ``process(rgb_image)``, such as a MediaPipe Pose instance.

    Returns:
        tuple: ``(bgr_image, results)`` where ``bgr_image`` is the frame after the
        BGR -> RGB -> BGR round trip and ``results`` is whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)    # the model is fed RGB, OpenCV delivers BGR

    # Mark the buffer read-only only for the duration of the inference call
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # Hand back a BGR image so the caller can keep using OpenCV on it
    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [None]:
def draw_landmarks(image, results):
    """
    Overlay the detected pose landmarks and their connections on ``image``.

    Args:
        image: Image passed straight to ``mp_drawing.draw_landmarks`` as the drawing target.
        results: Pose results object; only its ``pose_landmarks`` attribute is read.
    """
    # Two drawing specs, passed positionally in the same order as before
    first_spec = mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2)
    second_spec = mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)

    mp_drawing.draw_landmarks(
        image,
        results.pose_landmarks,
        mp_pose.POSE_CONNECTIONS,
        first_spec,
        second_spec,
    )

# 2. Extract Frames from Videos
- Load each video file using OpenCV
- Sample frames at a consistent frame rate (e.g. 15-30 fps) for temporal consistency
- Resize frames to a fixed size (e.g. 224x224) to reduce computation (optional)

In [None]:
def extract_frames(video_path, frame_rate=15, frame_size=(224, 224)):
    """
    This function extracts and resizes frames from a video at a specified sampling rate

    Args:
        video_path: Path of the video file to open with OpenCV.
        frame_rate: Target sampling rate in frames per second. Must be positive.
        frame_size: ``(width, height)`` every kept frame is resized to.

    Returns:
        list: The sampled, resized frames in playback order. Empty if the video
        could not be opened or contains no readable frames.

    Raises:
        ValueError: If ``frame_rate`` is not positive.
    """
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be positive, got {frame_rate}")

    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)     # Native fps of the video (0 when unknown)

        # Keep every N-th frame.
        #  - round() instead of int(): 29.97 fps / 15 must give 2, not 1 (truncation kept every frame)
        #  - clamp to >= 1: a video slower than the target rate, or an unknown fps of 0,
        #    used to produce an interval of 0 and crash on the modulo below
        if fps > 0:
            frame_interval = max(1, round(fps / frame_rate))
        else:
            frame_interval = 1

        count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break     # End of video (or read failure)

            if count % frame_interval == 0:
                frames.append(cv2.resize(frame, frame_size))

            count += 1
    finally:
        # Always free the capture handle, even if reading/resizing raised
        cap.release()

    return frames

# 3. Extract Keypoints
- Extract landmark coordinates from each frame using MediaPipe
- Convert landmarks into feature vectors (i.e. angles, distances, or normalized coordinates)

In [None]:
def extract_pose_features(frames):
    """
    This function extracts the landmarks from each frame and converts them into feature vectors

    Args:
        frames: Iterable of BGR frames (as produced by ``extract_frames``).

    Returns:
        list: One entry per input frame, in order. Each entry is a list of
        ``(x, y, z)`` tuples (one per detected landmark), or ``None`` when no
        pose was detected in that frame.
    """
    pose = mp_pose.Pose(static_image_mode=False)
    features = []

    try:
        for frame in frames:
            results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            if results.pose_landmarks:
                # Raw landmark coordinates; angles/distances could be derived here instead
                features.append([(lm.x, lm.y, lm.z) for lm in results.pose_landmarks.landmark])
            else:
                # Keep a placeholder so the output stays aligned with the input frames
                features.append(None)
    finally:
        # Release the estimator even if a frame fails to convert/process
        pose.close()

    return features

# 4. Load Data from Directory
- Define the path to the video dataset (video_dir)
- Load videos from a directory (folder), and create labels for the videos based on the sub directory (the sub folders labelled "bicep curl", "squat" etc.)
- For each sequence (including pose features per frame) apply a label, creating LSTM-ready sequences

In [None]:
def extract_label_names(video_dir):
    """
    Collect one label per ``.mp4`` file found anywhere under ``video_dir``.

    The label of a video is the name of the folder that directly contains it.

    Args:
        video_dir: Root folder of the video dataset.

    Returns:
        list: One label string per video file (so labels repeat when a folder
        holds several videos), in the order ``glob`` yields the files.
    """
    mp4_pattern = os.path.join(video_dir, '**','*.mp4')

    return [
        os.path.basename(os.path.dirname(mp4_file))
        for mp4_file in glob.glob(mp4_pattern, recursive=True)
    ]

In [None]:
def process_videos_from_directory(video_dir, frame_rate=15, sequence_length=30):
    """
    This function converts labeled videos into pose-based training sequences for an ML model

    Every ``.mp4`` under ``video_dir`` is labelled with the name of the folder that
    directly contains it, sampled into frames, converted to pose landmarks, and
    cut into overlapping windows of ``sequence_length`` frames (stride 1).

    Args:
        video_dir: Root folder of the dataset (the parent of the per-class folders).
        frame_rate: Sampling rate forwarded to ``extract_frames``.
        sequence_length: Number of consecutive pose frames per training sequence.

    Returns:
        tuple: ``(X, y, label_map)`` where ``X`` is the array of sequences, ``y`` the
        one-hot labels with one column per entry of ``label_map``, and ``label_map``
        maps class name -> column index.

    Raises:
        ValueError: If no sequence could be built (no videos found, or every
            video was too short / had no detectable pose).
    """
    # Sorted: glob order depends on the filesystem, and a stable sample order is
    # what makes a seeded train/test split reproducible across machines.
    video_paths = sorted(glob.glob(os.path.join(video_dir, '**','*.mp4'), recursive=True))

    # Infer each video's label from its parent folder name (done once, reused below)
    video_labels = [os.path.basename(os.path.dirname(path)) for path in video_paths]

    # Create a label map (class name -> numerical label)
    label_map = {label: idx for idx, label in enumerate(sorted(set(video_labels)))}

    all_sequences = []
    all_labels = []

    for video_path, label in zip(video_paths, video_labels):
        label_idx = label_map[label]

        print(f"Processing: {video_path}")
        frames = extract_frames(video_path, frame_rate=frame_rate)
        if not frames:     # Print a warning if no frames were extracted
            print("No frames extracted!")
            continue

        # Drop frames without a detected pose.
        # NOTE: the remaining frames are treated as consecutive, so a window may span a gap.
        pose_features = [f for f in extract_pose_features(frames) if f is not None]

        print(f" --> {len(pose_features)} pose features.")

        if len(pose_features) < sequence_length:
            print("Warning: Not enough valid frames for sequence")
            continue

        # Convert to np.array for easier slicing
        pose_features = np.array(pose_features)

        # Create sliding window sequences (stride 1)
        for start in range(len(pose_features) - sequence_length + 1):
            all_sequences.append(pose_features[start : start + sequence_length])
            all_labels.append(label_idx)

    # Testing/Debugging - check that number of sequences and labels is the same
    print("Number of sequences: ", len(all_sequences))
    print("Number of labels: ", len(all_labels))
    print("Label map: ", label_map)

    if not all_sequences:
        raise ValueError(
            f"No training sequences could be built from the videos under {video_dir!r}; "
            "check the path and that the videos contain at least "
            f"{sequence_length} frames with a detectable pose."
        )

    X = np.array(all_sequences)
    # Pin the number of columns to the label map: without num_classes the width is
    # max(label)+1, which silently shrinks if the last class produced no sequences.
    y = to_categorical(all_labels, num_classes=len(label_map)).astype(int)

    # Returns: X (sequences), y (labels), label_map (dict)
    return X, y, label_map

In [None]:
# Root folder of the video dataset.
# Copy the path to the main data folder, NOT the sub-folders (bicep curl, squat, etc.):
# the sub-folder names are what process_videos_from_directory turns into class labels.
video_dir = 'Copy_file_path_here'
sequence_length = 30     # number of pose frames per training sequence

X, y, label_map = process_videos_from_directory(video_dir, frame_rate=15, sequence_length=sequence_length)

# Make sure the first dimensions of arrays match
#   X.shape: (# of sequences, time_steps (sequence_length), landmarks per frame, coordinates per landmark (x,y,z))
#            -- the reshape further down assumes 33 landmarks per frame
#   y.shape: (# of sequences, # of classes), one-hot encoded labels
print(X.shape, y.shape)

# 5. Split into Training, Validation, and Testing datasets
- Split: 75% training, 15% validation, 10% test


In [None]:
# Step 1: hold out 10% of all sequences as the test set.
#   A fixed random_state makes the split identical on every run.
# NOTE(review): the sequences are stride-1 sliding windows, so heavily overlapping
# windows of the same video can land on both sides of a random split -- consider
# splitting per video if test metrics look too optimistic.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.10, random_state=1)

# Size of what is left for training + validation
print(X_train.shape, y_train.shape)

# Step 2: carve the validation set out of the remaining 90%.
#   15/90 of the remainder is 15% of the full dataset.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=15/90, random_state=2)

# Step 3: flatten each timestep for the LSTM input:
#   [num_sequences, sequence_length, 33 landmarks * 3 coordinates]
X_train, X_val, X_test = (
    subset.reshape(subset.shape[0], subset.shape[1], 33 * 3)
    for subset in (X_train, X_val, X_test)
)



# 6. Build and Train Neural Networks
- Configuring model training: 
    - EarlyStopping - stops training if loss stops improving
    - ReduceLROnPlateau - reduces the learning rate if the loss stops improving
    - ModelCheckpoint - saves the best model w/ the least loss
    - Implementing optimizers, and hyperparameters
- Sequential LSTM Model Construction
- Attention-based bidirectional LSTM (basically an enhanced version of the sequential LSTM)

In [None]:
# Callbacks to be used during neural network training
# EarlyStopping: stop once val_loss has not improved by at least 5e-4 for 10 epochs
es_callback = EarlyStopping(monitor='val_loss', min_delta=5e-4, patience=10, verbose=0, mode='min')
# ReduceLROnPlateau: multiply the learning rate by 0.2 after 5 stagnant epochs (floor: 1e-5)
lr_callback = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.00001, verbose=0, mode='min')

# ModelCheckpoint: keep the model with the lowest val_loss.
#  - Written to its own 'checkpoints' folder; it used to be pointed at video_dir,
#    i.e. the model was saved into the dataset folder.
#  - save_freq='epoch': an integer save_freq counts *batches*, so save_freq=1 tried to
#    save after every batch, when the monitored val_loss is not available yet.
checkpoint_dir = os.path.join(os.getcwd(), 'checkpoints')
os.makedirs(checkpoint_dir, exist_ok=True)
chkpt_callback = ModelCheckpoint(filepath=os.path.join(checkpoint_dir, 'best_model.h5'), monitor='val_loss',
                                 verbose=0, save_best_only=True, save_weights_only=False, mode='min',
                                 save_freq='epoch')

# Optimizer (one instance per model, optimizer state must not be shared)
opt_lstm = Adam(learning_rate=0.001) # for the LSTM model
opt_attn = Adam(learning_rate=0.001) # for the LSTM + Attention model

# some hyperparameters
batch_size = 32     # number samples per training batch
max_epochs = 500    # epoch - one full training iteration over the full dataset

## 6a. LSTM
- Baseline sequential LSTM model
- TensorBoard & Callbacks setup
    - For monitoring, debugging, and optimizing model performance
- Model Architecture:
    - Input layer -> 3 LSTM layers -> 2 Dense layers -> Output layer
- Compiles and trains LSTM

In [None]:
# Set up Tensorboard logging and callbacks
# The timestamp in NAME gives every training run its own log folder under ./logs
NAME = f"ExerciseRecognition-LSTM-{int(time.time())}"
log_dir = os.path.join(os.getcwd(), 'logs', NAME,'')
tb_callback = TensorBoard(log_dir=log_dir)

# Dedicated checkpoint for this model. A ModelCheckpoint instance remembers its best
# val_loss across fit() calls and writes to a single path, so sharing one instance
# between the two models lets one run block or overwrite the other's checkpoint.
lstm_checkpoint_path = os.path.join(os.getcwd(), 'checkpoints', 'LSTM_best.h5')
os.makedirs(os.path.dirname(lstm_checkpoint_path), exist_ok=True)
lstm_chkpt_callback = ModelCheckpoint(filepath=lstm_checkpoint_path, monitor='val_loss', verbose=0,
                                      save_best_only=True, save_weights_only=False, mode='min',
                                      save_freq='epoch')

callbacks = [tb_callback, es_callback, lr_callback, lstm_chkpt_callback]

In [None]:
# num_input_values: features per timestep = 33 landmarks (from MP) * 3 (x, y, z)
num_input_values = 33 * 3
X = X.reshape(X.shape[0], sequence_length, num_input_values)
print(X.shape)

# One folder-derived label per video file (labels repeat), not a list of unique classes
label_names = extract_label_names(video_dir)

# Baseline model: Input -> 3 stacked LSTM layers -> 2 Dense layers -> softmax output
# NOTE(review): the LSTM layers use relu instead of the default tanh -- presumably
# intentional; confirm, since this is a common source of unstable training.
lstm = Sequential()
lstm.add(Input(shape=(sequence_length, num_input_values)))      # Explicit input
lstm.add(LSTM(128, return_sequences=True, activation='relu'))   # Looks at short-term patterns in the motion
lstm.add(LSTM(256, return_sequences=True, activation='relu'))   # Looks deeper at more abstract movement sequences
lstm.add(LSTM(128, return_sequences=False, activation='relu'))  # Condenses all info to make final decision
# Fully connected layers to interpret LSTM output
lstm.add(Dense(128, activation='relu'))
lstm.add(Dense(64, activation='relu'))
# Output a probability for each possible action using 'softmax' (one unit per class column of y)
lstm.add(Dense(y.shape[1], activation='softmax'))

# summary() prints the table itself and returns None; wrapping it in print() only
# appended a stray "None" to the output.
lstm.summary()

In [None]:
# Compile with categorical cross-entropy (y is one-hot encoded) and track categorical accuracy
lstm.compile(optimizer=opt_lstm, loss='categorical_crossentropy', metrics=['categorical_accuracy'])
# Train for at most max_epochs; the callbacks (early stopping, LR reduction, checkpoint,
# TensorBoard) are evaluated against the validation split passed here
lstm.fit(X_train, y_train, batch_size=batch_size, epochs=max_epochs, validation_data=(X_val, y_val), callbacks=callbacks)

## 6b. LSTM + Attention
- Attention mechanism (attention_block)
- Model Architecture:
    - Input layer -> Bi-LSTM layer -> Attention layer -> Dense layer -> Output layer
- Compiles & Trains AttnLSTM

In [None]:
# Set up Tensorboard logging and callbacks
# The timestamp in NAME gives every training run its own log folder under ./logs
NAME = f"ExerciseRecognition-AttnLSTM-{int(time.time())}"
log_dir = os.path.join(os.getcwd(), 'logs', NAME,'')
tb_callback = TensorBoard(log_dir=log_dir)

# Dedicated checkpoint for this model. Reusing the checkpoint callback from the LSTM
# run would carry over that run's best val_loss (so this model is only saved if it
# beats the LSTM) and would overwrite the LSTM's checkpoint file.
attn_checkpoint_path = os.path.join(os.getcwd(), 'checkpoints', 'AttnLSTM_best.h5')
os.makedirs(os.path.dirname(attn_checkpoint_path), exist_ok=True)
attn_chkpt_callback = ModelCheckpoint(filepath=attn_checkpoint_path, monitor='val_loss', verbose=0,
                                      save_best_only=True, save_weights_only=False, mode='min',
                                      save_freq='epoch')

callbacks = [tb_callback, es_callback, lr_callback, attn_chkpt_callback]

In [None]:
def attention_block(inputs, time_steps):
    """
    Attention layer for deep neural network

    Learns a softmax weighting over the time axis and applies it element-wise
    to ``inputs``.

    Args:
        inputs: Keras tensor with a time axis followed by a feature axis
            (the caller passes the per-timestep output of a Bi-LSTM).
        time_steps: Length of the time axis; sets the width of the scoring layer.

    Returns:
        Tensor with the same shape as ``inputs``: ``inputs`` multiplied by the
        attention weights.
    """
    # Swap the time and feature axes so the Dense layer acts along time
    transposed = Permute((2, 1))(inputs)
    time_weights = Dense(time_steps, activation='softmax')(transposed)

    # Swap back so the weights line up with ``inputs`` again
    attention_weights = Permute((2, 1), name='attention_vec')(time_weights)

    # Element-wise re-weighting of the inputs
    return multiply([inputs, attention_weights], name='attention_mul')

In [None]:
# Units per LSTM direction
HIDDEN_UNITS = 256

# Input: one sequence = sequence_length timesteps of num_input_values features
inputs = Input(shape=(sequence_length, num_input_values))

# Bi-LSTM: looks at a sequence forward and backward (keeps the output of every timestep)
lstm_out = Bidirectional(LSTM(HIDDEN_UNITS, return_sequences=True))(inputs)

# Attention: re-weight the Bi-LSTM outputs, then flatten them into one vector per sequence
attention_mul = attention_block(lstm_out, sequence_length)
attention_mul = Flatten()(attention_mul)

# Fully Connected Layer (dropout for regularisation)
x = Dense(2*HIDDEN_UNITS, activation='relu')(attention_mul)
x = Dropout(0.5)(x)

# Output: one softmax probability per class column of y
x = Dense(y.shape[1], activation='softmax')(x)

# Bring it all together
AttnLSTM = Model(inputs=[inputs], outputs=x)
# summary() prints the table itself and returns None; wrapping it in print() only
# appended a stray "None" to the output.
AttnLSTM.summary()

In [None]:
# Categorical cross-entropy measures the difference between the predicted probability
# distribution and the actual (true, one-hot) distribution of classes.
# Also known as softmax loss or log loss.

AttnLSTM.compile(optimizer=opt_attn, loss='categorical_crossentropy', metrics=['categorical_accuracy'])
# Same data, batch size and epoch budget as the baseline LSTM so the two models are comparable
AttnLSTM.fit(X_train, y_train, batch_size=batch_size, epochs=max_epochs, validation_data=(X_val, y_val), callbacks=callbacks)

In [None]:
# Model map: name -> trained Keras model.
# The keys double as the .h5 file names in the save/load cells and as the
# per-model keys of the evaluation dictionaries.
# NOTE(review): the key says "128HUs" while HIDDEN_UNITS is 256 -- confirm the intended name.
models = {
    'LSTM': lstm, 
    'LSTM_Attention_128HUs': AttnLSTM, 
}

# 7. Save & Load Weights
- Save the models (architecture + weights + optimizer state) 
- Reload the models' trained weights into rebuilt model objects
    - Useful for reusing models after training them once, and to avoid retraining when restarting your script/notebook

In [None]:
# Save Weights
# Writes one "<model_name>.h5" file per model into the current working directory.
for model_name, model in models.items():
    save_dir = os.path.join(os.getcwd(), f"{model_name}.h5")   # despite the name, this is a file path
    model.save(save_dir)        # saves the full model, not just weights

In [None]:
# Load Weights
# Run model rebuild before doing this: load_weights fills the already-built model
# objects in ``models``, it does not create them.
# Assumes the "<model_name>.h5" files written by the save cell exist in the current working directory.
for model_name, model in models.items():
    load_dir = os.path.join(os.getcwd(), f"{model_name}.h5")
    model.load_weights(load_dir)

# 8. Make Predictions

In [None]:
# Class probabilities on the test set, kept per model.
# Previously ``res`` was overwritten on every iteration, so only the last
# model's predictions survived the loop.
predictions = {}
for model_name, model in models.items():
    res = model.predict(X_test, verbose=0)   # still holds the last model's output after the loop
    predictions[model_name] = res

# 9. Evaluations using Confusion Matrix and Accuracy
- This helps to understand how well the model is performing


In [None]:
# Overall evaluation results: one slot per metric, filled in by the cells below
# (each slot ends up holding the matching per-model dictionary).
eval_results = {
    'confusion matrix': None,
    'accuracy': None,
    'precision': None,
    'recall': None,
    'f1 score': None,
}

# Per-model metric stores, keyed by model name
confusion_matrices = dict()             # confusion matrix of each model
classification_accuracies = dict()      # accuracy score of each model
precisions = dict()                     # precision score of each model
recalls = dict()                        # recall score of each model
f1_scores = dict()                      # F1 score of each model

## 9a. Confusion Matrices
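- Predicted vs Actual Labels
- `multilabel_confusion_matrix` returns one 2x2 (one-vs-rest) matrix per class, laid out as `[[TN, FP], [FN, TP]]`. For each class it shows how many times the model:
    - handled that class correctly (diagonal values: true negatives and true positives)
    - confused that class with another (off-diagonal values: false positives and false negatives)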

In [None]:
# Every class index in label order. Passing it explicitly keeps the matrix stack at
# one 2x2 matrix per class, aligned with the columns of y, even when a class is
# missing from the test split and from the predictions.
class_indices = list(range(y_test.shape[1]))

# Loop through each model (lstm and AttnLSTM)
for model_name, model in models.items():
    # Predict class probabilities for the test data
    yhat = model.predict(X_test, verbose=0)

    # Collapse one-hot truth / probabilities into class indices
    ytrue = np.argmax(y_test, axis=1).tolist()
    yhat = np.argmax(yhat, axis=1).tolist()

    # Per-class (one-vs-rest) confusion matrices, each laid out as [[TN, FP], [FN, TP]]
    confusion_matrices[model_name] = multilabel_confusion_matrix(ytrue, yhat, labels=class_indices)
    print(f"{model_name} confusion matrix: {os.linesep}{confusion_matrices[model_name]}")

# Collect/store results 
eval_results['confusion matrix'] = confusion_matrices

## 9b. Accuracy
- The overall percentage of correct predictions
- Accuracy = (correct predictions) / (total predictions)

In [None]:
for model_name, model in models.items():
    # Ground-truth class index of every test sequence (y_test is one-hot)
    ytrue = np.argmax(y_test, axis=1).tolist()

    # Most probable class according to this model
    yhat = np.argmax(model.predict(X_test, verbose=0), axis=1).tolist()

    # Fraction of test sequences classified correctly
    classification_accuracies[model_name] = accuracy_score(ytrue, yhat)
    print(f"{model_name} classification accuracy = {round(classification_accuracies[model_name]*100,3)}%")

# Store the per-model accuracies in the overall results
eval_results['accuracy'] = classification_accuracies

## 9c. Precision, Recall, and F1 Score
- Precision - out of all times the model predicted a class, how often was it correct?
    - Precision = (True positives) / (True positives + False positives)
- Recall - out of all times a class actually occurred, how often did the model catch it?
    - Recall = (True positives) / (True positives + False negatives)
- F1 Score - a score that balances both Precision and Recall (the harmonic mean of the 2)
    - F1 Score = 2 * (Precision * Recall) / (Precision + Recall)

In [None]:
# Class names ordered by their numerical label, plus the matching label indices.
# Passing ``labels`` explicitly keeps the report aligned with ``target_names``;
# without it, classification_report raises as soon as a class is absent from both
# the test labels and the predictions.
label_names = [label for label, idx in sorted(label_map.items(), key=lambda x:x[1])]
class_indices = list(range(len(label_names)))

for model_name, model in models.items():
    # Predict class probabilities for the test data
    yhat = model.predict(X_test, verbose=0)

    # Collapse one-hot truth / probabilities into class indices
    ytrue = np.argmax(y_test, axis=1).tolist()
    yhat = np.argmax(yhat, axis=1).tolist()

    print(label_names)

    # Precision, recall, and f1 score
    # zero_division=0 scores a class that was never predicted as 0 without emitting a warning
    report = classification_report(ytrue, yhat, labels=class_indices, target_names=label_names,
                                   output_dict=True, zero_division=0)

    # Weighted average: per-class scores weighted by the number of true samples of each class
    precisions[model_name] = report['weighted avg']['precision']
    recalls[model_name] = report['weighted avg']['recall']
    f1_scores[model_name] = report['weighted avg']['f1-score'] 

    print(f"{model_name} weighted average precision = {round(precisions[model_name],3)}")
    print(f"{model_name} weighted average recall = {round(recalls[model_name],3)}")
    print(f"{model_name} weighted average f1-score = {round(f1_scores[model_name],3)}\n")

# Collect results 
eval_results['precision'] = precisions
eval_results['recall'] = recalls
eval_results['f1 score'] = f1_scores

# 10. Choose Model to Test in Real Time

In [None]:
# Model used for real-time testing; you can also choose to test the baseline model ("lstm")
model = AttnLSTM
# NOTE(review): this name differs from the 'LSTM_Attention_128HUs' key used for the same
# model in ``models`` -- confirm which one downstream code expects.
model_name = 'AttnLSTM'

# Now the ML model is ready to be used in an application!