# Ergonomic pose fine-tuning: 

## MMPose Hyperparameter Optimization

In [1]:
from mmpose.apis import MMPoseInferencer
import numpy as np
from PIL import Image
import torch
from functools import partial
from torch.utils.data import random_split, DataLoader, Subset, TensorDataset
from torchvision import datasets, transforms
import cv2
import time

num_classes = 3
num_keypoints = 133
data_path = 'E:/Users/Vipin/Documents/BHT/3. Semester/Learning from images/Pose Dataset Complete'

In [2]:
def get_device():
    if torch.cuda.is_available():
        return torch.device("cuda")
    elif torch.backends.mps.is_available():
        return torch.device("mps")
    else:
        return torch.device("cpu")
    
def set_seed(seed):
    torch.manual_seed(seed) 
    np.random.seed(seed)
    torch.use_deterministic_algorithms(True)
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.enabled = True
    
get_device()

device(type='cuda')

In [3]:
class PILToNumpyTransform:
    def __call__(self, pil_img):
        """
        Convert a PIL Image to an OpenCV Image / Numpy Array.

        Parameters:
            pil_img (PIL.Image): The PIL Image to be converted.

        Returns:
            np.ndarray: The converted OpenCV Image in RGB format.
        """
        # Convert PIL Image to NumPy array (RGB)
        img_array = np.array(pil_img)
        img_array = cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB)

        return img_array

data_transforms = transforms.Compose([
    transforms.Resize((640, 480)), # Resize images to 640x640
    transforms.ToTensor(), # Convert images to PyTorch tensors
])

own_dataset = datasets.ImageFolder(root=data_path, transform=data_transforms)
display(own_dataset)

Dataset ImageFolder
    Number of datapoints: 118
    Root location: E:/Users/Vipin/Documents/BHT/3. Semester/Learning from images/Pose Dataset Complete
    StandardTransform
Transform: Compose(
               Resize(size=(640, 480), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
           )

In [4]:
generator1 = torch.Generator().manual_seed(13)  # set seed for reproducibility of the split
train_and_val_dataset, test_dataset = random_split(own_dataset, [0.8, 0.2], generator=generator1)  # 80% training and evaluation, 20% testing

### Define the models

In [5]:
import torch
import torch.nn as nn

class KeypointClassifier(nn.Module):
    def __init__(self, num_keypoints, num_classes, h1, h2):
        super(KeypointClassifier, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Linear(num_keypoints*2, h1),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.layer2 = nn.Sequential(
            nn.Linear(h1, h2),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.layer3 = nn.Sequential(
            nn.Linear(h2, h2 // 2),
            nn.ReLU()
        )
        self.dense = nn.Sequential(
            nn.Linear(h2 // 2, num_classes)
        )
    
    def forward(self, keypoints_flattened):
        x = self.layer1(keypoints_flattened)
        x = self.layer2(x)
        x = self.layer3(x)
        output = self.dense(x)
        
        return output

class KeypointScorer(nn.Module):
    def __init__(self, num_keypoints, h1, h2):
        super(KeypointScorer, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Linear(num_keypoints*2, h1),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.layer2 = nn.Sequential(
            nn.Linear(h1, h2),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.layer3 = nn.Sequential(
            nn.Linear(h2, h2 // 2),
            nn.ReLU()
        )
        self.dense = nn.Sequential(
            nn.Linear(h2 // 2, 1)
        )
    
    def forward(self, keypoints_flattened):
        x = self.layer1(keypoints_flattened)
        x = self.layer2(x)
        x = self.layer3(x)
        output = self.dense(x)
        
        return output

In [6]:
def tensor_to_cv2_image(tensor):
    """
    Converts a PyTorch tensor to an OpenCV image.
    
    Parameters:
    - tensor: A PyTorch tensor, in the format CxHxW with values normalized to [0, 1].
    
    Returns:
    - An OpenCV image, in BGR format.
    """
    
    # Denormalize the tensor and convert it to a numpy array
    image = tensor.mul(255).byte().permute(1, 2, 0).cpu().numpy()
    
    # Convert the color space from RGB to BGR
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    
    return image


def get_keypoints_from_mmpose(model, inputs):
    with torch.no_grad(): # gradients are not computed for the frozen model
        results = []
        for input in inputs:
            start_time = time.perf_counter()
            
            cv2_img = tensor_to_cv2_image(input)
            result = list(next(model(cv2_img, show=False)).values())
            keypoints = result[1][0][0]['keypoints']
            
            ellapsed_time_ms = (time.perf_counter() - start_time) * 1000
            print(f"Image processed in {ellapsed_time_ms:.2f} ms")
            results.append(torch.tensor(keypoints))
    return torch.stack(results)

processing keypoints:

In [7]:
# z-Value
def process_keypoints_for_classifier(keypoints):
    # Calculate mean and standard deviation with keepdim=True to preserve broadcasting compatibility
    mean_vals = keypoints.mean(dim=1, keepdim=True)
    std_vals = keypoints.std(dim=1, keepdim=True) + 1e-6  # Adding a small epsilon to prevent division by zero

    # Perform standardization
    kp_tensor_standardized = (keypoints - mean_vals) / std_vals
    
    # Flatten the last two dimensions while keeping the batch dimension
    batch_flattened = kp_tensor_standardized.view(keypoints.size(0), -1)
    
    return batch_flattened

#### training the model:

- with cross-entropy loss function (fits our classification task)
- Adam optimizer

HPO:

In [8]:
import optuna
from sklearn.metrics import f1_score, confusion_matrix
import numpy as np

In [9]:
def preprocess_dataset(kp_detection_model, dataset, device):
    processed_keypoints_list = []
    labels_list = []
    
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
    
    # Assuming `dataset` is an iterable of (input, label) pairs
    for inputs, labels in dataloader:
        inputs = inputs.to(device)
        keypoints = get_keypoints_from_mmpose(kp_detection_model, inputs)  # Extract keypoints        
        processed_kps = process_keypoints_for_classifier(keypoints)  # Process keypoints        
        processed_keypoints_list.append(processed_kps)
        labels_list.append(labels)

    # Convert lists to tensors
    processed_keypoints_tensor = torch.cat(processed_keypoints_list, dim=0)
    labels_tensor = torch.cat(labels_list, dim=0)
    
    # Create a new TensorDataset and DataLoader
    preprocessed_dataset = TensorDataset(processed_keypoints_tensor, labels_tensor)
    
    return preprocessed_dataset
    
def train_and_eval_model(kp_model, optimizer, loss_fn, num_epochs, train_loader, val_loader, device):
    
    kp_model = kp_model.to(device)  # move model to device
    kp_model.train()  # set model to training mode
    
    print("type of train_loader", type(train_loader))

    for epoch in range(num_epochs):
        for batch in train_loader:
            # access images and labels
            inputs = batch[0].to(device)
            labels = batch[1].to(device)
            
            optimizer.zero_grad()  # Zero the parameter gradients
            classification_output = kp_model(inputs) # get results for the classification
            
            # Use this for classification
            # loss = loss_fn(classification_output, labels)
            
            # Use this for scoring
            loss = loss_fn(classification_output.float(), labels.float())
            
            loss.backward()  # Backpropagate the loss
            optimizer.step()  # Update weights

            # todo: further processing, such as calculating accuracy or loss, goes here

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
            
        
        kp_model.eval()  # model to evaluation mode

        total_loss = 0
        all_predictions = []
        all_labels = []

        with torch.no_grad():  # no need to compute gradients, because we are in evaluation mode
            for inputs, labels in val_loader:  # iterate over validation dataset
                inputs, labels = inputs.to(device), labels.to(device)  # move data to device
                
                # Not necessary because already preprocessed
                # keypoints = get_keypoints_from_yolo(yolo_model, inputs) # get keypoints from the YOLO model
                # processed_kps = process_keypoints_for_classifier(keypoints) # prepare the keypoints for the classifier
                
                classification_output = kp_model(inputs) # get results for the classification 
                
                # Use this for classification
                # loss = loss_fn(classification_output, labels)
                
                # Use this for scoring
                loss = loss_fn(classification_output.float(), labels.float())
                
                total_loss += loss.item()  # accumulate the loss
                # get predictions for output
                _, predicted = torch.max(classification_output.data, 1)
                # collect the predictions and labels
                all_predictions.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
            # calculate the validation metrics:
            avg_loss = total_loss / len(val_loader)  # get the average loss
            # accuracy = (np.array(all_predictions) == np.array(all_labels)).mean()
            # f1 = f1_score(all_labels, all_predictions, average='weighted')  
            # conf_matrix = confusion_matrix(all_labels, all_predictions)
            
            print(f"Validation Loss: {avg_loss:.4f}")
            # print(f"Validation Accuracy: {accuracy:.4f}")
            # print(f"Validation F1 Score: {f1:.4f}")
            # print("Confusion Matrix:")
            # print(conf_matrix)
            
            # here: F1 score chosen as the metric to optimize
            # other options: - combining metrics like accuracy and F1 score to maximize on both
            #                        - or multi-objective optimization on F1 score and accuracy
    return avg_loss 

In [10]:
# this function was generated by gpt-4

def get_k_fold_indices(n, k=5, random_seed=None):
    """
    Generate indices for k-fold cross-validation.

    Parameters:
    - n: Total number of samples in the dataset.
    - k: Number of folds.
    - random_seed: Optional seed for reproducibility.

    Returns:
    - A list of tuples, each containing (train_indices, val_indices) for a fold.
    """
    # Initialize the random generator
    g = torch.Generator()
    if random_seed is not None:
        g.manual_seed(random_seed)
    
    # Generate a random permutation of indices
    indices = torch.randperm(n, generator=g).tolist()
    
    # Calculate fold sizes
    fold_sizes = [n // k for _ in range(k)]
    for i in range(n % k):
        fold_sizes[i] += 1
    
    # Generate train and validation indices for each fold
    current = 0
    folds_indices = []
    for fold_size in fold_sizes:
        start, end = current, current + fold_size
        val_indices = indices[start:end]
        train_indices = indices[:start] + indices[end:]
        folds_indices.append((train_indices, val_indices))
        current = end
    
    return folds_indices


In [11]:
def objective(trial, dataset):
    # Define hyperparameters to optimize
    lr = trial.suggest_categorical("lr", [1e-3, 1e-4, 5e-5])
    h1 = trial.suggest_categorical("h1", [256, 512, 1024])
    h2 = trial.suggest_categorical("h2", [256, 512, 1024])
    batch_size = trial.suggest_categorical("batch_size", [4, 8, 16])
    num_epochs = trial.suggest_categorical("num_epochs", [100, 200, 300])

    validation_scores = []

    n = len(dataset)
    k = 5
    folds_indices = get_k_fold_indices(n, k, random_seed=13)
    device = get_device()
    
    for fold, (train_idx, val_idx) in enumerate(folds_indices, start=1):
        print(f"Trial {trial.number}, Fold {fold}/{k}")
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)
        
        # Create data loaders for training and validation
        train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)
        
        # Initializing the model and optimizer with the chosen hyperparameters
        kp_model = KeypointScorer(num_keypoints, h1, h2).to(device)
        # kp_model = KeypointClassifier(num_keypoints=num_keypoints, num_classes=num_classes, h1=h1, h2=h2).to(device)
        optimizer = torch.optim.Adam(kp_model.parameters(), lr=lr, weight_decay=1e-5)
    
        # training and evaluating the model - Watch out for the Loss - CrossEntropyLoss for classification and MSELoss for scoring
        validation_score = train_and_eval_model(kp_model, optimizer, nn.MSELoss(), num_epochs, train_loader, val_loader, device)
        validation_scores.append(validation_score)

    return np.mean(validation_scores)


In [None]:
mmpose_model = MMPoseInferencer('rtmw-m_8xb1024-270e_cocktail14-256x192')
preprocessed_dataset = preprocess_dataset(mmpose_model, train_and_val_dataset, get_device())
optimize = partial(objective, dataset = preprocessed_dataset)
search_space = {
    "lr": [1e-3, 1e-4, 5e-5],
    "h1": [256, 512, 1024],
    "h2": [256, 512, 1024],
    "batch_size": [4, 8, 16],
    "num_epochs": [100, 200, 300]
}

study = optuna.create_study(sampler=optuna.samplers.GridSampler(search_space), direction='minimize')
study.optimize(optimize, n_trials=243)

# Best hyperparameters
print("Best hyperparameters:", study.best_params)

In [13]:
best_params = study.best_params
best_params

{'lr': 5e-05, 'h1': 512, 'h2': 1024, 'batch_size': 4, 'num_epochs': 200}

In [14]:
best_params = study.best_params
# extract best_parameters
batch_size = best_params['batch_size']
lr = best_params['lr']
h1 = best_params['h1']
h2 = best_params['h2']
num_epochs = best_params['num_epochs']

#### train and evaluate final model on test set:

Unfortunately Optuna cannot output the best model, so we train again on the combined train and validation set with the best parameters found.

dataloader for final evaluation:

In [15]:
train_and_eval_loader = preprocess_dataset(mmpose_model, train_and_val_dataset, get_device())
test_loader = preprocess_dataset(mmpose_model, test_dataset, get_device())

Image processed in 112.85 ms
Image processed in 62.68 ms
Image processed in 61.85 ms
Image processed in 58.51 ms
Image processed in 62.82 ms
Image processed in 63.22 ms
Image processed in 60.42 ms
Image processed in 60.82 ms
Image processed in 54.75 ms
Image processed in 57.09 ms
Image processed in 58.36 ms
Image processed in 58.68 ms
Image processed in 63.09 ms
Image processed in 62.29 ms
Image processed in 63.19 ms
Image processed in 60.65 ms
Image processed in 58.50 ms
Image processed in 64.25 ms
Image processed in 66.18 ms
Image processed in 63.61 ms
Image processed in 60.31 ms
Image processed in 61.46 ms
Image processed in 61.57 ms
Image processed in 61.95 ms
Image processed in 61.38 ms
Image processed in 62.43 ms
Image processed in 58.98 ms
Image processed in 57.29 ms
Image processed in 58.04 ms
Image processed in 53.93 ms
Image processed in 61.89 ms
Image processed in 60.12 ms
Image processed in 60.72 ms
Image processed in 52.80 ms
Image processed in 62.11 ms
Image processed in 

final evaluation on test set:

In [16]:
# new instance of the model:
kp_model = KeypointScorer(num_keypoints, h1, h2)
# kp_model = KeypointClassifier(num_keypoints, num_classes, h1, h2)
kp_model = kp_model.to(get_device())  # move model to device
kp_model.train()  # set model to training mode

# loss function and optimizer
criterion = nn.MSELoss() # nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(kp_model.parameters(), lr=lr, weight_decay=1e-5)

for epoch in range(num_epochs):
    for inputs, labels in train_and_eval_loader:  # Assuming data_loader is your DataLoader instance for the dataset
        inputs, labels = inputs.to(get_device()), labels.to(get_device())  # move data to device
        optimizer.zero_grad()  # Zero the parameter gradients
        classification_output = kp_model(inputs) # get results for the classification 
        
        # Use this for classification
        # loss = criterion(classification_output, labels)
        
        # Use this for scoring
        loss = criterion(classification_output.float(), labels.float())
        
        loss.backward()  # Backpropagate the loss
        optimizer.step()  # Update weights

        # todo: further processing, such as calculating accuracy or loss, goes here

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        
    
kp_model.eval()  # model to evaluation mode

total_loss = 0
all_predictions = []
all_labels = []
with torch.no_grad():  # no need to compute gradients, because we are in evaluation mode
    for inputs, labels in test_loader:  # iterate over validation dataset
        inputs, labels = inputs.to(get_device()), labels.to(get_device())  # move data to device
        classification_output = kp_model(inputs) # get results for the classification 
        
        # Use this for classification
        loss = criterion(classification_output, labels)
        
        # Use this for scoring
        # loss = criterion(classification_output.float(), labels.float())
        
        total_loss += loss.item()  # accumulate the loss
        
        if classification_output.data.dim() == 1:
            classification_output.data = classification_output.data.unsqueeze(0)
        
        _, predicted = torch.max(classification_output.data, 1)
        # collect the predictions and labels
        all_predictions.extend(predicted.cpu().numpy())
        
        if labels.dim() == 0:
            labels = labels.unsqueeze(0)  # Add a dimension to make it iterable
        all_labels.extend(labels.cpu().numpy())
    # calculate the validation metrics:
    avg_loss = total_loss / len(test_loader)  # get the average loss
    # accuracy = (np.array(all_predictions) == np.array(all_labels)).mean()
    # f1 = f1_score(all_labels, all_predictions, average='weighted')  
    # conf_matrix = confusion_matrix(all_labels, all_predictions)
    
    print(f"Test Loss: {avg_loss:.4f}")
    # print(f"Test Accuracy: {accuracy:.4f}")
    # print(f"Test F1 Score: {f1:.4f}")
    # print("Confusion Matrix:")
    # print(conf_matrix)

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/200], Loss: 11.8278
Epoch [2/200], Loss: 3.8881
Epoch [3/200], Loss: 3.2793
Epoch [4/200], Loss: 3.0183
Epoch [5/200], Loss: 0.8485
Epoch [6/200], Loss: 2.2129
Epoch [7/200], Loss: 3.3966
Epoch [8/200], Loss: 0.5521
Epoch [9/200], Loss: 3.3730
Epoch [10/200], Loss: 1.7887
Epoch [11/200], Loss: 0.3325
Epoch [12/200], Loss: 0.4270
Epoch [13/200], Loss: 0.4965
Epoch [14/200], Loss: 0.6571
Epoch [15/200], Loss: 0.5159
Epoch [16/200], Loss: 0.5595
Epoch [17/200], Loss: 0.4413
Epoch [18/200], Loss: 0.6999
Epoch [19/200], Loss: 1.2832
Epoch [20/200], Loss: 0.2218
Epoch [21/200], Loss: 0.2286
Epoch [22/200], Loss: 0.0025
Epoch [23/200], Loss: 0.1119
Epoch [24/200], Loss: 0.0659
Epoch [25/200], Loss: 0.4963
Epoch [26/200], Loss: 0.0802
Epoch [27/200], Loss: 0.0000
Epoch [28/200], Loss: 1.3841
Epoch [29/200], Loss: 0.6992
Epoch [30/200], Loss: 0.3252
Epoch [31/200], Loss: 0.0203
Epoch [32/200], Loss: 1.6401
Epoch [33/200], Loss: 1.0623
Epoch [34/200], Loss: 1.3181
Epoch [35/200], Loss: 