The goal of the project is to simultaneously predict keypoints and age of the person in the image. This can be later combined with bounding box detection of a person or having multiple instances in the heatmaps.

## Helpers

### Libraries

In [1]:
from collections import Counter
import os
import json
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
import torchvision.models as models
import torchvision.io as io
import torch.nn.functional as F 
from PIL import Image
import gc

### Const

In [None]:
NUM_KEYPOINTS = 17
NUM_AGE_CLASSES = 2 #adult vs child, adult(1), child(0), infant in the future (1)
IMAGE_SIZE = (256, 192)
HEATMAP_SIZE = (8,6)#(64, 48)
B = 1
C = 2 #how important is age to the loss
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

### Functions

In [None]:
def gaussian_2d(shape:tuple, center:tuple, sigma=2)->torch.Tensor:
    """Gaussian formula for smoothing/blurring
    if sigma_x==sigma_y the shape is circular (keypoint), otherwise ellipsis

    Args:
        shape (tuple): heatmap size (h,w)
        center (tuple): coordinates (x,y)
        sigma (int, optional): parameter Defaults to 2.

    Returns:
        torch.Tensor: heatmap
    """
    h, w = shape
    y = torch.arange(0,h).float()
    x = torch.arange(0,w).float()
    yy, xx = torch.meshgrid(y,x,indexing="ij")
    x0, y0 = center
    return torch.exp(-((xx-x0)**2+(yy-y0)**2)/(2*sigma**2))

def kps_to_heatmaps(kps:torch.Tensor, nr_kps:int = NUM_KEYPOINTS, ht_size:int = HEATMAP_SIZE,th=0)-> dict:
    """Converts keypoints of one person to a dictionary of heatmaps (one per keypoint)

    Args:
        kps (torch.Tensor): person's keypoints NUM_KEYPOINTS*3
        nr_kps (int, optional): number of keypoints. Defaults to NUM_KEYPOINTS.
        ht_size (int, optional): size of the heatmap. Defaults to HEATMAP_SIZE.
        th (int, optional): threshold when keypoints are considered. Defaults to 0.

    Returns:
        dict: dictionary of heatmaps, with one per keypoint (x,y)
    """
    ht = torch.zeros(nr_kps, *ht_size)

    for k in range(nr_kps):
        x, y, _ = kps[k*3:k*3+3]
        x = x * ht_size[1]
        y = y * ht_size[0]

        ht[k] = gaussian_2d(
            ht_size, center=(x,y),sigma=2
        )
    return ht

def one_person_heatmaps_to_kps(hts:dict,img_size:tuple=IMAGE_SIZE, ht_size:tuple=HEATMAP_SIZE, th:int=0)->list:
    """Converts heatmaps of one person to keypoints
       Each keypoint has (x,y,conf) where conf=2 means it is visible, conf=0 it was not detected

    Args:
        hts (dict): heatmaps
        img_size (tuple, optional): size of the image. Defaults to IMAGE_SIZE.
        ht_size (tuple, optional): _size/dimensions of the image. Defaults to HEATMAP_SIZE.
        th (int, optional):threshold. Defaults to 0.

    Returns:
        list: list of keypoints
    """
    k_nr, ht_h, ht_w = hts.shape
    img_w, img_h = img_size
    kps = []
    for k in range(k_nr):
        temp = hts[k]
        if temp.max() <= th:
            kps.extend([0,0,0])
            continue
        #resize
        idx = temp.argmax()
        x_img = (idx.item() % ht_w) * img_w/ht_w
        y_img = (idx.item() // ht_w) * img_h/ht_h
        kps.extend([x_img,y_img,2])
    return kps

def people_heatmaps_to_kps(ht)->list:
    """Converts heatmaps of multiple people to keypoints

    Args:
        ht (_type_): _description_

    Returns:
        list: list of keypoints
    """
    N, _, _, _ = ht.shape
    all_keypoints = []
    for n in range(N):
        all_keypoints.extend(one_person_heatmaps_to_kps(ht[n]))
    return all_keypoints

def ht_to_coord(ht, topk:int=17)->torch.Tensor:
    """Converts heatmap to coordinates. Softmax is used to normalize the heatmaps

    Args:
        ht (_type_): heatmap
        topk (int, optional): _description_. Defaults to 17.

    Returns:
        torch.Tensor: coordinates
    """
    N, C, H, W = ht.shape
    score, index = ht.view(N,C,1,-1).topk(topk, dim=-1)
    coord = torch.cat([index%W, index//H], dim=2)
    return (coord*F.softmax(score, dim=-1)).sum(-1)

def load_data(ann, label_keys, th:int)->list:
    """Loads data as a list of dictionaries
    Removes duplicates - only images where one person is detected are used

    Args:
        ann (dict): annotations
        label_keys (dict_keys): _description_
        th (int):min number of keypoints

    Returns:
        list: data with annotations
    """
    data = [a for a in ann if a['num_keypoints'] > th and a['image_id'] in label_keys]
    counts = Counter(d['image_id'] for d in data)
    data = [d for d in data if counts[d['image_id']]==1]
    print(f"Annotations:{len(data)}")
    return data

### Classes

In [4]:
class InputDataset(Dataset):
    def __init__(self, ann_path:str, data_dir:str, label_path:str,th:int=0,is_gag:bool=False):
        self.data_dir = data_dir
        with open(ann_path, "r" ) as f:
            ann = json.load(f)
        self.labels = pd.read_csv(label_path).set_index('image_id').T.to_dict()
        self.data = load_data(ann['annotations'], self.labels.keys(), th)
        self.id_to_name_map = {
            img['id'] : img['file_name']
            for img in ann['images']
        }
        self.is_gag = is_gag
        self.transform = T.Compose([
            T.Resize(IMAGE_SIZE),
            T.ToTensor(),
            T.Normalize(mean=[0.485,0.456,0.406],
                        std=[0.229,0.224,0.225])
        ])
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        kp = self.data[index]
        img_id = kp['image_id']
        img_path = os.path.join(
            self.data_dir,
            self.id_to_name_map[img_id]
        )
        image = Image.open(img_path).convert("RGB")
        image = self.transform(image)
        kps = torch.tensor(kp['keypoints']).float()
        ht = kps_to_heatmaps(kps)
        age = torch.tensor(int(self.labels[img_id]['age']=='adult'), dtype=torch.int64)        
        return image, ht, age
    

class MixedModel(nn.Module):
    def __init__(self):
        super().__init__()
        resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        self.backbone = nn.Sequential(*list(resnet.children())[:-2])#removes classification
        self.pose_head = nn.Sequential(
            nn.Conv2d(2048,256,3,padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256,NUM_KEYPOINTS,1)
        )
        self.age_head = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(2048,256),
            nn.ReLU(inplace=True),
            nn.Linear(256, NUM_AGE_CLASSES)
        )

    def forward(self, x):
        feat = self.backbone(x)
        ht = self.pose_head(feat)
        age = self.age_head(feat)
        return ht, age

## Training

### Helper Functions

In [None]:
def train_epochs(model:MixedModel, epochs:int, loader:DataLoader, optimizer, scheduler, 
                 pose_criterion, age_criterion, device:str=DEVICE,
                 path:str='./model_heatmap.pth', step_print:bool=True, 
                 th:int=0.25, B:int=1,C:int=2):
    """Train the model for `epochs`. Save model in the given `path`

    Args:
        model (MixedModel): _description_
        epochs (int): _description_
        loader (DataLoader): _description_
        optimizer (_type_): _description_
        scheduler (_type_): _description_
        pose_criterion (_type_): _description_
        age_criterion (_type_): _description_
        device (str, optional): _description_. Defaults to DEVICE.
        path (str, optional): _description_. Defaults to './model_heatmap.pth'.
    """
    for e in range(epochs):
        model.train()
        total_loss = 0
        num_batches = 0
        for images, gt_heatmaps, age_labels in loader:
            optimizer.zero_grad()
            images = images.to(device)
            gt_heatmaps = gt_heatmaps.to(device)
            pred_heatmaps, pred_age = model(images)
            loss_pose = pose_criterion(pred_heatmaps, gt_heatmaps)
            loss = loss_pose
            if age_labels[0] is not None:
                age_labels = age_labels.to(device)
                loss_age = age_criterion(pred_age, age_labels)
                loss = B * loss + C * loss_age            
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1
        scheduler.step()
        epoch_loss = round(total_loss / num_batches,4)
        lr = round(scheduler.get_last_lr()[0],4)
        print(f"Epoch {e}: loss = {epoch_loss:.4f} lr = {lr}")
        if step_print and epoch_loss < th:
            torch.save(model.state_dict(),f'./model_train_heatmap_{epoch_loss}loss_{e}_{epochs}ep_{lr}lr_Adam_OneCycleLR.pth')
    torch.save(model.state_dict(), path)  
    return epoch_loss

* MSELoss ranges from $0$ to $1$
* CrossEntropyLoss (for two classes, `adult` and `child`) ranges from $0$ to $0.693$ 
  
The final loss formula is `B * pose_loss + C * age_loss` where $C$ is a constant

I decided to make $B$ equal to $1$ and $C$ equal to $2$ since age is really imporant in this model

The final loss ranges from $0$ to $2.386$ with good values of loss being less than $0.24$ (scaled to 0-1)

In [6]:
pose_criterion = nn.MSELoss(reduction='mean') #normalize heatmap, good especially for large heatmaps
age_criterion = nn.CrossEntropyLoss()

* `keypoints_path` - json from coco website with information. It has `annotations` where each annotation has the `keypoints`, `num_keypoints`, `image_id`, `id` and `images` with `filename` and other information about the images.
* `img_dir` - directory with jpg images from `keypoints_path` json
* `label_path` - path to a csv. The ages were predicted using both the body ratios (MMU gag dataset) and captions from coco dataset, then manually verified by looking through images. Csv has three columns `age` (child|adult), `image_id` (filename), `id`
* `lr` - learning rate chosen using optuna after adding the scheduler
* `epochs` - number of epochs chosen using optuna + verified by saving each model after certain number of epochs
* `opt` - Adam optimizer used for training
* `sch` - OneCycleLR used as a scheduler, the learning rate is annealed until we reach maximum and then we decrease the learning rate lower than the initial learning rate `lr`
* `heatmap` - this version of the model uses heatmaps while predicting

In [None]:
is_train = "train"
keypoints_path = f"./data/person_keypoints_{is_train}2017.json"
img_dir = f'./data/train2017_man_{is_train}'
label_path = f'./data/label_coco_man_{is_train}.csv'

lr = 0.005
epochs = 10
opt = "Adam"
sch = "OneCycleLR"
ht = "heatmap"

path = f'./models/model_{is_train}_{ht}_lr{lr}_ep{epochs}_opt{opt}_sch{sch}.pth'

train_data = InputDataset(keypoints_path,img_dir, label_path)
train_loader = DataLoader(train_data, batch_size=64, num_workers=0, shuffle=True)

Annotations:3096


### Run Training

In [None]:
if __name__ == '__main__':
    torch.cuda.empty_cache()
    gc.collect() #clears cache bcs sometimes it breaks
    
    model = MixedModel().to(DEVICE)

    optimizer = optim.Adam(model.parameters(), lr)
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer,
                                                max_lr = lr,
                                                steps_per_epoch=len(train_loader),
                                                epochs=epochs,
                                                pct_start=0.1,
                                                anneal_strategy='cos'
                                                )

    for p in model.age_head.parameters():
        p.requires_grad = False

    epoch_loss = train_epochs(model, epochs, train_loader, optimizer,scheduler, pose_criterion, age_criterion, path=path)

1.4530417919158936
2.078248620033264
1.8649099667867024
1.7049901187419891
1.5985094070434571
1.5312675038973491
1.4686125005994524
1.4135803878307343
1.3561420573128595
1.3199161469936371
1.2806583480401472
1.274994785586993
1.2370428718053377
1.2060591280460358
1.1879186550776164
1.176233023405075
1.1572419860783745
1.155966450770696
1.1332629103409617
1.1172718584537507
1.1051660123325528
1.085524702614004
1.0685441053431968
1.067219781378905
1.0663857913017274
1.061854185966345
1.0475118822521634
1.0410564371517725
1.037568410922741
1.0275382816791534
1.0220191267228895
1.0078760907053947
1.0002230261311387
0.9916905497803408
0.9799893702779497
0.977400971783532
0.9774602619377343
0.9679893270919198
0.9608410719113473
0.9586742758750916
0.955235761840169
0.9466909993262518
0.9433327885561211
0.940576508641243
0.9370939254760742
0.936847518319669
0.9347328340753596
0.9320910212894281
0.9258451157686661
Epoch 0: loss = 0.9258 lr = 0.0002
0.3453093469142914
0.41063089668750763
0.37630

### Optuna study

#### helper functions

In [None]:
from sklearn.base import accuracy_score
import optuna
from optuna.samplers import TPESampler

def objective(trial)->float:
    """Runs each trial

    Args:
        trial (_type_): _description_

    Returns:
        float: Returns loss when training
    """
    model = MixedModel().to(DEVICE)
    is_train = "train"
    keypoints_path = f"./data/person_keypoints_{is_train}2017.json"
    img_dir = f'./data/train2017_man_{is_train}'
    label_path = f'./data/label_coco_man_{is_train}.csv'
    train_data = InputDataset(keypoints_path,img_dir, label_path)
    train_loader = DataLoader(train_data, batch_size=64, num_workers=0, shuffle=True)
    pose_criterion = nn.MSELoss(reduction='mean') #normalize heatmap, good especially for large heatmaps
    age_criterion = nn.CrossEntropyLoss()

    for p in model.age_head.parameters():
        p.requires_grad = False
    epochs = trial.suggest_int("epochs",0,100)
    lr = trial.suggest_int("lr",1e-3, 1e-1)
    pct_start = trial.suggest_float("pct_start",0.05, 0.3)
    optimizer = optim.Adam(model.parameters(), lr)
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer,
                                                max_lr = lr,
                                                steps_per_epoch=len(train_loader),
                                                epochs=epochs,
                                                pct_start=pct_start,
                                                anneal_strategy='cos'
                                                )
    return train_epochs(model, epochs, train_loader, optimizer, scheduler, pose_criterion, age_criterion, step_print=False)
    # model.fit(X_train, y_train)
    # y_pred = model.predict(X_test)
    # return accuracy_score(y_test, y_pred)
    #TO DO
    # add accuracy of age and separately for heatmaps/keypoints


optuna.logging.set_verbosity(optuna.logging.WARNING)

sampler = TPESampler(seed=1)

study = optuna.create_study(study_name="coco", direction="minimize", sampler=sampler)
study.optimize(objective, n_trials=100)

## Validation

Helper function

In [None]:
def run_test(model:MixedModel, loader:DataLoader,
                 pose_criterion, age_criterion, device:str=DEVICE,
                 B:int=1, C:int=2)->tuple[list,list]:
    """ Predict for all data in the test loader

    Args:
        model (MixedModel): _description_
        epochs (int): _description_
        loader (DataLoader): _description_
        pose_criterion (_type_): _description_
        age_criterion (_type_): _description_
        device (str, optional): _description_. Defaults to DEVICE.
        path (str, optional): _description_. Defaults to './model_heatmap.pth'.   
    Returns:
        tuple[list,list]: returns two lists of keypoints, one extracted normally and one softmaxed
    """
    kps = []
    kps2 = []
    total_loss = 0.0
    num_batches = 0
    model.eval()
    with torch.no_grad():
        for images, gt_heatmaps, age_labels in loader:
                images = images.to(device)
                gt_heatmaps = gt_heatmaps.to(device)
                pred_heatmaps, pred_age = model(images)
                kps.extend(people_heatmaps_to_kps(pred_heatmaps))
                kps2.extend(ht_to_coord(pred_heatmaps))
                loss_pose = pose_criterion(pred_heatmaps, gt_heatmaps)
                loss = loss_pose
                if age_labels[0] is not None:
                    age_labels = age_labels.to(DEVICE)
                    loss_age = age_criterion(pred_age, age_labels)
                    loss = B * loss + C * loss_age
                total_loss += loss.item()
                num_batches += 1
        print(f"Testing loss = {total_loss / num_batches:.4f}")
    return kps, kps2

Load Test/Validation Data

In [None]:
is_train = "val"
keypoints_path = f"./data/person_keypoints_{is_train}2017.json"
img_dir = f'./data/train2017_man_{is_train}'
label_path = f'./data/label_coco_man_{is_train}.csv'
val_data = InputDataset(keypoints_path,img_dir, label_path)
val_loader = DataLoader(val_data, batch_size=64, num_workers=6, shuffle=True)

In [None]:
if __name__ == '__main__':
    torch.cuda.empty_cache()
    gc.collect() #clears cache bcs sometimes it breaks
    model_path = path #can be changed from path in model to other saved models
    model.load_state_dict(torch.load(path, weights_only=True))
    model = MixedModel().to(DEVICE)
    kps, soft_kps = run_test(model, val_loader, pose_criterion, age_criterion)

## Visualisation

## Run on real videos

The plan is to track people, crop the boxes and detect keypoints+age on the cropped images. The issue is occlusion and people leaving and reappearing in the view.

TO DO: permanent id tracking, saving the ids for some time and reid (current version has iding based on ratios which is not memory efficient)

In [None]:
import csv
import cv2
from ultralytics import YOLO

keypoint_names = [
"nose","left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder","left_elbow", "right_elbow",
"left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee", "right_knee","left_ankle", "right_ankle"
]


def calc_skeleton(kps, conf, frame_number, person_id)->list:
    """Calculate skeleton"""
    kp = np.array(kps)
    c = np.array(conf)
    row = [frame_number,person_id]
    for i, (x, y) in enumerate(kp):
        row.extend([float(x), float(y),float(c[i])])
    return row 

def gen_boxes(v:str, v_name:str, device:str=DEVICE):
    """Generate boxes"""
    box_model = YOLO("models/yolo11n.pt")
    box_model.to(device)
    try:
        cap = cv2.VideoCapture(v)
    except:
        print("No video")
        return
    
    header = ["frame", "person_id"]
    for name in keypoint_names:
        header.extend([f"{name}_x", f"{name}_y",f"{name}_conf"])
        
    rows = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        results = box_model.track(
            frame,
            classes=[0],
            persist=True,
            verbose=False,
            show=False, 
            tracker="botsort2.yaml"
        )
        
        result = results[0]
        
        if (result.boxes is not None and 
            result.boxes.id is not None):
            
            tracker_ids = result.boxes.id.int().cpu().tolist()
            boxes_xyxy = result.boxes.xyxy.cpu().numpy()
            boxes_xywh = result.boxes.xywh.cpu().numpy()
            
            #if frame_nr % 10:
            #    print(f"frame:{frame_nr}")

    with open(f"{v_name}_keypoints.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)

    cv2.destroyAllWindows()

### Run prediction

### Display keypoints

## References

1. [Tutorial on adding another head when predicting](https://y-t-g.github.io/tutorials/yolov8n-add-classes/)
2. [Ultralytics model training](https://docs.ultralytics.com/modes/train/#idle-gpu-training)
3. [Roboflow tutorial](https://colab.research.google.com/github/roboflow-ai/notebooks/blob/main/notebooks/train-yolov8-keypoint.ipynb)
4. [Keypoints with heatmaps](https://www.slingacademy.com/article/creating-a-keypoint-detection-model-with-pytorch-and-heatmap-regression/)
5. [Tutorial on keypoints regression, used some heatmap functions](https://elte.me/2021-03-10-keypoint-regression-fastai)
6. [Heatmap transform](https://github.com/baoshengyu/H3R/blob/master/torchalign/heatmap_head/transforms/functional.py)
7. [Heatmap regression via randomized rounding](https://github.com/baoshengyu/H3R)