<a href="https://colab.research.google.com/github/DLNinja/HandKeypointsDetection/blob/main/UNET.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import torch
import numpy as np
from tqdm import tqdm
from PIL import Image
from torchvision import transforms
import cv2
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow
from tqdm import tqdm
import torch.nn.functional as F

# Display Methods for Coordinates and Heatmaps

In [None]:
def show_sample(image, coords, original_size=(224, 224), color='bw'):
    """Draw hand keypoints and the connecting skeleton on an image and show it.

    Args:
        image: ``torch.Tensor``, ``np.ndarray`` or anything ``np.asarray``
            accepts (e.g. a PIL image). Data that is not ``uint8`` is
            multiplied by 255, clipped to [0, 255] and cast to ``uint8``.
            The array is passed to OpenCV drawing calls unchanged, so it is
            presumably expected in height x width (x channels) layout --
            TODO confirm against callers.
        coords: Sequence of ``(x, y)`` pairs (tensor, array or list) that are
            multiplied by the width / height from ``original_size``. Indices
            0..20 are referenced by the skeleton, so at least 21 are needed.
        original_size: ``(height, width)`` used to scale ``coords`` to pixels.
        color: Unused; kept so existing call sites keep working.
    """
    # Bring the image to a private, C-contiguous uint8 array we can draw on.
    if isinstance(image, torch.Tensor):
        image = image.detach().cpu().numpy()
    canvas = np.asarray(image)
    if canvas.dtype != np.uint8:
        # Clip before casting: without it, values slightly outside [0, 1]
        # wrap around instead of saturating.
        canvas = np.clip(canvas * 255, 0, 255).astype(np.uint8)
    canvas = np.array(canvas, order='C')  # always a copy; caller's image is untouched

    # Scale coordinates to image size
    img_h, img_w = original_size
    if isinstance(coords, torch.Tensor):
        coords = coords.detach().cpu().numpy()
    keypoints_px = [(int(x * img_w), int(y * img_h)) for x, y in coords]

    # Draw the lines - each tuple is a chain of keypoint indices joined in order
    skeleton = (
        (0, 1, 2, 3, 4),
        (0, 5, 6, 7, 8),
        (9, 10, 11, 12),
        (13, 14, 15, 16),
        (0, 17, 18, 19, 20),
        (5, 9, 13, 17),
    )
    for chain in skeleton:
        for start, end in zip(chain, chain[1:]):
            cv2.line(canvas, keypoints_px[start], keypoints_px[end], color=(0, 0, 0), thickness=1)

    # Draw keypoints on top of the lines
    for point in keypoints_px:
        cv2.circle(canvas, point, radius=2, color=(255, 255, 255), thickness=-1)

    # Display the image
    plt.figure(figsize=(6, 6))
    plt.imshow(canvas)
    plt.axis('off')
    plt.show()


### Display the original image, the combined heatmap, and the heatmap overlaid on the image

In [None]:
def show_heatmap(image, heatmaps, original_size=(224, 224)):
    """Show an image, its combined heatmap and the heatmap blended over the image.

    Args:
        image: ``torch.Tensor`` (squeezed on dim 0 and permuted from
            channels-first to channels-last), ``np.ndarray`` or anything
            ``np.asarray`` accepts. Non-``uint8`` data is multiplied by 255,
            clipped and cast to ``uint8``. matplotlib displays the array as
            RGB.
        heatmaps: Tensor or array whose first axis is summed to build the
            combined map; the sum is clipped to [0, 1].
        original_size: Unused; kept so existing call sites keep working.
    """
    # Bring the image to a private uint8, channels-last array.
    if isinstance(image, torch.Tensor):
        image = image.detach().squeeze(0).permute(1, 2, 0).cpu().numpy()
    base = np.asarray(image)
    if base.dtype != np.uint8:
        base = np.clip(base * 255, 0, 255).astype(np.uint8)
    base = np.array(base, order='C')

    if isinstance(heatmaps, torch.Tensor):
        heatmaps = heatmaps.detach().cpu().numpy()

    combined_heatmap = np.clip(np.sum(heatmaps, axis=0), 0, 1)

    # Convert to color map for visualization. OpenCV returns BGR, so convert
    # to RGB *before* blending; otherwise the photo's channels get swapped
    # when the blended result is flipped for display.
    colored_heatmap = cv2.applyColorMap((combined_heatmap * 255).astype(np.uint8), cv2.COLORMAP_JET)
    colored_heatmap = cv2.cvtColor(colored_heatmap, cv2.COLOR_BGR2RGB)
    if colored_heatmap.shape[:2] != base.shape[:2]:
        # addWeighted needs equally sized inputs; cv2.resize takes (width, height).
        colored_heatmap = cv2.resize(colored_heatmap, (base.shape[1], base.shape[0]))
    # Blend the uint8 copy (not the raw input) so both operands share a dtype.
    overlayed_image = cv2.addWeighted(base, 0.6, colored_heatmap, 0.4, 0)

    # Display
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 3, 1)
    plt.title("Original Image")
    plt.imshow(base)
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.title("Combined Heatmap")
    plt.imshow(combined_heatmap, cmap='hot')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.title("Overlayed Image")
    plt.imshow(overlayed_image)  # already RGB
    plt.axis('off')

    plt.tight_layout()
    plt.show()

## Generate heatmaps based on coordinates

In [None]:
def generate_heatmap(heatmap_size, keypoints, sigma=2):
    """Build one blurred-disc heatmap per keypoint.

    Args:
        heatmap_size: ``(width, height)`` of each heatmap.
        keypoints: Array of ``(x, y)`` rows; each is multiplied by the width /
            height to get the pixel centre. Rows with a negative x or y are
            skipped and keep an all-zero heatmap.
        sigma: Gaussian blur sigma; the disc radius drawn before blurring is
            ``sigma * 3``.

    Returns:
        ``float32`` array of shape ``(num_keypoints, height, width)``.
    """
    count = keypoints.shape[0]
    width, height = heatmap_size[0], heatmap_size[1]
    stack = np.zeros((count, height, width), dtype=np.float32)

    for idx, (kx, ky) in enumerate(keypoints):
        if kx < 0 or ky < 0:
            continue
        center = (int(kx * width), int(ky * height))
        # Filled disc of ones, smoothed, then rescaled so the peak is ~1.
        blob = np.zeros((height, width), dtype=np.float32)
        blob = cv2.circle(blob, center, sigma * 3, 1, -1)
        blob = cv2.GaussianBlur(blob, (0, 0), sigma)
        blob /= blob.max() + 1e-5
        stack[idx] = blob
    return stack

## Extract Coordinates from heatmaps

In [None]:
def soft_argmax(heatmaps, beta=100):
    """Differentiable arg-max over heatmaps, returned as normalised coordinates.

    Each heatmap is flattened, scaled by ``beta`` and passed through a
    softmax; the expected column / row index under that distribution is then
    divided by the width / height.

    Args:
        heatmaps: Tensor of shape ``(B, K, H, W)``. May be non-contiguous.
        beta: Softmax temperature multiplier; larger values push the result
            towards the hard arg-max.

    Returns:
        Tensor of shape ``(B, K, 2)`` holding ``(x / W, y / H)`` per heatmap.
    """
    B, K, H, W = heatmaps.shape
    # reshape (unlike view) also accepts non-contiguous inputs, e.g. permuted
    # or channels-last tensors.
    probs = F.softmax(heatmaps.reshape(B, K, -1) * beta, dim=2)

    # Column / row index of every flattened position.
    flat_idx = torch.arange(H * W, device=probs.device)
    x_coords = (flat_idx % W).float().unsqueeze(0)
    y_coords = torch.div(flat_idx, W, rounding_mode='floor').float().unsqueeze(0)

    x = torch.sum(probs * x_coords, dim=2) / W
    y = torch.sum(probs * y_coords, dim=2) / H

    return torch.stack([x, y], dim=2)


# Dataset Loader

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset
from torchvision import transforms

class HandKeypointsDataset(Dataset):
    """Dataset over one ``.npz`` chunk holding ``images`` and ``keypoints`` arrays.

    Each item is ``(image, keypoints)`` where the image has gone through
    torchvision's ``ToTensor`` and the keypoints are a ``float32`` tensor of
    shape ``(21, 2)``.
    """

    def __init__(self, npz_file, img_size=(224, 224)):
        """Load both arrays of the chunk into memory.

        Args:
            npz_file: Path to the ``.npz`` archive.
            img_size: Stored on the instance; not used by this class.
        """
        self.npz_file = npz_file
        self.img_size = img_size

        # Read both arrays once and close the archive. Indexing an open
        # NpzFile re-reads the member from disk on every access.
        with np.load(self.npz_file) as archive:
            self.images = archive['images']
            self.keypoints = archive['keypoints']
        # Kept so code that reads ``dataset.data['images']`` / ``['keypoints']``
        # keeps working; it now refers to the in-memory arrays.
        self.data = {'images': self.images, 'keypoints': self.keypoints}

        self.transform = transforms.Compose([
            transforms.ToTensor(),
        ])

    def __len__(self):
        """Return the number of samples in the chunk."""
        # Use the cached array instead of re-reading it from the archive.
        return len(self.images)

    def __getitem__(self, idx):
        """Return the transformed image and its ``(21, 2)`` keypoint tensor."""
        image = self.transform(self.images[idx])
        # Stored as 21 x 3 values per sample; only the first two columns are
        # used (presumably x, y -- meaning of the third column not visible here).
        keypoints = self.keypoints[idx].reshape(21, 3)[:, :2]

        return image, torch.tensor(keypoints, dtype=torch.float32)


# Model Definition
UNET architecture

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):
    """U-Net with four down-sampling stages, a bottleneck and four up-sampling stages.

    Every encoder stage is followed by a 2x2 max-pool; every decoder stage
    up-samples with a stride-2 transposed convolution and concatenates the
    matching encoder output on the channel axis. A final 1x1 convolution maps
    to ``out_channels``. Because of the four 2x poolings, input height and
    width need to be multiples of 16 for the skip concatenations to line up.
    """

    def __init__(self, in_channels=3, out_channels=21, init_features=64):
        super().__init__()
        f1 = init_features
        f2, f4, f8, f16 = f1 * 2, f1 * 4, f1 * 8, f1 * 16

        # Contracting path.
        self.encoder1 = self._block(in_channels, f1)
        self.pool1 = nn.MaxPool2d(2)
        self.encoder2 = self._block(f1, f2)
        self.pool2 = nn.MaxPool2d(2)
        self.encoder3 = self._block(f2, f4)
        self.pool3 = nn.MaxPool2d(2)
        self.encoder4 = self._block(f4, f8)
        self.pool4 = nn.MaxPool2d(2)

        self.bottleneck = self._block(f8, f16)

        # Expanding path. Each decoder takes twice the up-sampled width
        # because the skip connection is concatenated in.
        self.upconv4 = nn.ConvTranspose2d(f16, f8, kernel_size=2, stride=2)
        self.decoder4 = self._block(f16, f8)
        self.upconv3 = nn.ConvTranspose2d(f8, f4, kernel_size=2, stride=2)
        self.decoder3 = self._block(f8, f4)
        self.upconv2 = nn.ConvTranspose2d(f4, f2, kernel_size=2, stride=2)
        self.decoder2 = self._block(f4, f2)
        self.upconv1 = nn.ConvTranspose2d(f2, f1, kernel_size=2, stride=2)
        self.decoder1 = self._block(f2, f1)

        self.output = nn.Conv2d(f1, out_channels, kernel_size=1)

    def _block(self, in_channels, out_channels):
        """Two (3x3 conv -> batch norm -> ReLU) units; spatial size is preserved."""
        layers = []
        for width_in in (in_channels, out_channels):
            layers += [
                nn.Conv2d(width_in, out_channels, 3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return ``out_channels`` maps at the input's spatial resolution."""
        down_path = (
            (self.encoder1, self.pool1),
            (self.encoder2, self.pool2),
            (self.encoder3, self.pool3),
            (self.encoder4, self.pool4),
        )
        up_path = (
            (self.upconv4, self.decoder4),
            (self.upconv3, self.decoder3),
            (self.upconv2, self.decoder2),
            (self.upconv1, self.decoder1),
        )

        skips = []
        for encode, pool in down_path:
            x = encode(x)
            skips.append(x)  # kept for the matching decoder stage
            x = pool(x)

        x = self.bottleneck(x)

        for upsample, decode in up_path:
            # Deepest skip first: pop() walks the encoder outputs in reverse.
            x = decode(torch.cat((upsample(x), skips.pop()), dim=1))

        return self.output(x)

In [None]:
# Run on the GPU when PyTorch can see one, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Build the network with its default arguments and place it on the selected device.
model = UNet()
model = model.to(device)
# Adam over every parameter of the network.
optimizer = torch.optim.Adam(params=model.parameters(), lr=5e-5)

## Choosing a model
Both models use the same architecture and optimizer settings.

The first model extracts coordinates from the predicted heatmaps and computes the loss against the ground-truth coordinates using L1 loss.

The second model transforms the ground-truth coordinates into ground-truth heatmaps and computes the loss against the predicted heatmaps using MSE loss.

In [None]:
# Selection for the first variant: `model_name` labels the exported stats file,
# `checkpoint` is the file the weights are loaded from / saved to.
# NOTE(review): the following cell assigns the same two variables, so whichever
# of the two cells ran last decides which variant is active.
model_name = 'unet_v1'
checkpoint = '/content/drive/MyDrive/HandKeypoints/models/handpose_checkpoint_unet.pth'

In [None]:
# Selection for the second variant: `model_name` labels the exported stats file,
# `checkpoint` is the file the weights are loaded from / saved to.
# NOTE(review): this overrides the values set by the previous cell when both are run.
model_name = 'unet_v2'
checkpoint = '/content/drive/MyDrive/HandKeypoints/models/handpose_checkpoint_unet_V2.pth'

In [None]:
# Load a pre-trained model from the checkpoint file, if it exists.
# `checkpoint` keeps holding the file PATH, because the training cells pass it
# to torch.save as the destination. The loaded dictionary is stored separately
# in `checkpoint_state` (empty when starting from scratch).
if os.path.exists(checkpoint):
    checkpoint_state = torch.load(checkpoint, map_location=device)
    model.load_state_dict(checkpoint_state['model_state_dict'])
    optimizer.load_state_dict(checkpoint_state['optimizer_state_dict'])
    start_epoch = checkpoint_state['epoch'] + 1
    previous_total_loss = checkpoint_state['total_loss']
    previous_avg_loss = checkpoint_state['avg_loss']
    history = checkpoint_state['history']
    # Best accuracy seen so far; tolerate a checkpoint with an empty history.
    best_acc = max(history['test_accuracy'], default=0.0)
    print(f"Resuming training from epoch {start_epoch}")
else:
    checkpoint_state = {}
    start_epoch = 1
    previous_acc = 0
    # Defined in both branches so later cells can rely on the names existing.
    previous_total_loss = None
    previous_avg_loss = None
    history = {
        'train_loss': [],
        'test_loss': [],
        'test_accuracy': [],
        'test_avg_time_per_sample': []
    }
    best_acc = 0.0

In [None]:
def keypoints_to_heatmaps(keypoints, heatmap_size=(224, 224), sigma=2):
    """Render normalised keypoints as Gaussian heatmaps.

    Args:
        keypoints: Tensor of shape ``(B, K, 2)`` with ``(x, y)`` that are
            multiplied by the heatmap width / height. A keypoint with a
            negative x or y yields an all-zero heatmap.
        heatmap_size: ``(width, height)`` of the output maps.
        sigma: Standard deviation of the Gaussian, in pixels.

    Returns:
        Tensor of shape ``(B, K, height, width)`` on the keypoints' device,
        each valid map rescaled so that its maximum is approximately 1.
    """
    _batch, _num_kp, _ = keypoints.shape  # enforces a 3-D input
    width, height = heatmap_size
    target_device = keypoints.device

    # Pixel-index axes shaped to broadcast against (B, K, 1, 1) centres.
    col_idx = torch.arange(width, dtype=torch.float32, device=target_device).view(1, 1, 1, width)
    row_idx = torch.arange(height, dtype=torch.float32, device=target_device).view(1, 1, height, 1)

    # Keypoint centres in pixel units.
    norm_x, norm_y = keypoints[:, :, 0], keypoints[:, :, 1]
    center_x = (norm_x * width)[:, :, None, None]
    center_y = (norm_y * height)[:, :, None, None]

    # Unnormalised Gaussian of the squared distance to each centre.
    sq_dist = (col_idx - center_x) ** 2 + (row_idx - center_y) ** 2
    heatmaps = torch.exp(-sq_dist / (2 * sigma ** 2))

    # Zero out maps of invalid keypoints (x < 0 or y < 0).
    valid = ((norm_x >= 0) & (norm_y >= 0))[:, :, None, None].float()
    heatmaps = heatmaps * valid

    # Rescale every map by its own maximum; the epsilon guards the all-zero maps.
    peak = heatmaps.amax(dim=(2, 3), keepdim=True) + 1e-5
    return heatmaps / peak


In [None]:
import time
# Test Model method - on each epoch, after training process is finished
def test_model(model, test_loader, device, threshold=0.05):
    """Evaluate the model on a loader and report loss, accuracy and timing.

    The model is switched to eval mode and left there. Coordinates are
    extracted from the predicted heatmaps with ``soft_argmax``.

    Args:
        model: Network mapping an image batch to heatmaps.
        test_loader: Iterable of ``(images, targets)`` batches.
        device: Device the batches are moved to.
        threshold: Maximum Euclidean distance between predicted and target
            coordinates for a keypoint to count as correct.

    Returns:
        ``(avg_loss, accuracy, avg_time)``, all per keypoint: summed L1 loss
        divided by the keypoint count, the fraction of keypoints within
        ``threshold``, and the elapsed wall time divided by the keypoint
        count. All three are ``0.0`` when the loader yields no data.
    """
    model.eval()
    total_loss = 0.0
    total_samples = 0  # counts keypoints, not images
    correct_preds = 0
    start_time = time.perf_counter()

    with torch.no_grad():
        for images, targets in test_loader:
            images = images.to(device)
            targets = targets.to(device)

            preds = soft_argmax(model(images))

            total_loss += F.l1_loss(preds, targets, reduction='sum').item()

            # One distance per keypoint.
            dists = torch.norm(preds - targets, dim=2)
            correct_preds += (dists < threshold).sum().item()
            total_samples += dists.numel()

    elapsed_time = time.perf_counter() - start_time

    if total_samples == 0:
        # An empty loader would otherwise raise ZeroDivisionError below.
        return 0.0, 0.0, 0.0

    avg_time_per_sample = elapsed_time / total_samples
    avg_loss = total_loss / total_samples
    accuracy = correct_preds / total_samples

    return avg_loss, accuracy, avg_time_per_sample

# Training Section
One training method for each model version, one for L1 Loss, one for MSE Loss.

### Coordinates Comparison using L1 Loss

In [None]:
from torch.utils.data import DataLoader
import random
import time

chunk_dir = "/content/drive/MyDrive/HandKeypoints/dataset/"
test_chunk = "/content/drive/MyDrive/HandKeypoints/dataset/val_chunk0.npz"
chunk_files = sorted([f for f in os.listdir(chunk_dir) if f.endswith(".npz") and f.startswith('train')])
batch_size = 8

# torch.save below writes to `checkpoint`, so it has to be a file path. Check
# now rather than failing after a full epoch of training.
if not isinstance(checkpoint, (str, os.PathLike)):
    raise TypeError(
        "`checkpoint` must be the checkpoint file path; re-run the model selection cell."
    )

# The validation chunk is the same for every epoch, so load it once.
test_dataset = HandKeypointsDataset(test_chunk)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

final_epoch = 31
for epoch in range(start_epoch, final_epoch):
    model.train()
    total_loss = 0.0   # sum of the per-batch mean losses
    num_batches = 0
    print(f"\nEpoch {epoch}/{final_epoch-1}")

    # randomize chunk order, then start
    random.shuffle(chunk_files)
    for step, chunk_file in enumerate(chunk_files, start=1):
        chunk_path = os.path.join(chunk_dir, chunk_file)

        # load one dataset chunk at a time
        dataset = HandKeypointsDataset(npz_file=chunk_path)
        # DataLoader - wraps iterable around Dataset to enable easy access to the samples
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
        # ProgressBar - show real-time statistics about training
        progress_bar = tqdm(dataloader, desc=f"[{chunk_file}] Step {step}/{len(chunk_files)}", leave=False, ncols=100, dynamic_ncols=True)

        for images, targets in progress_bar:
            images = images.to(device)
            targets = targets.to(device)
            # predict -> apply loss -> update optimizer
            pred_heatmaps = model(images)
            preds = soft_argmax(pred_heatmaps)

            loss = F.l1_loss(preds, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Count every batch exactly once.
            batch_loss = loss.item()
            total_loss += batch_loss
            num_batches += 1
            progress_bar.set_postfix(loss=batch_loss)

    test_loss, test_acc, test_avg_time = test_model(model, test_loader, device)

    if num_batches:
        # show the first image of the last training batch, for visual proof of training
        img = images[0].clone().detach().cpu().numpy()
        img = np.transpose(img, (1, 2, 0))
        img = np.ascontiguousarray(img)
        img = (img * 255).clip(0, 255).astype(np.uint8)

        predicted = preds[0].detach().cpu()
        show_sample(img, predicted, original_size=(224, 224))

    # Print epoch statistics. The loss is a per-batch mean, so the epoch
    # average is the mean over batches.
    avg_loss = total_loss / max(num_batches, 1)
    print(f"Epoch {epoch}, Total Loss: {total_loss:.4f}, Avg Loss: {avg_loss:.4f}")
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Avg Time per Keypoint: {test_avg_time:.6f}s")

    history['train_loss'].append(avg_loss)
    history['test_loss'].append(test_loss)
    history['test_accuracy'].append(test_acc)
    history['test_avg_time_per_sample'].append(test_avg_time)

    # if better model than before, save it
    if test_acc > best_acc:
        print("Saving model!")
        best_acc = test_acc
        torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'total_loss': total_loss,
                'avg_loss': avg_loss,
                'history': history
            }, checkpoint)
    torch.cuda.empty_cache()

### Heatmaps Comparison using MSE Loss

In [None]:
from torch.utils.data import DataLoader
import random
import time

chunk_dir = "/content/drive/MyDrive/HandKeypoints/dataset/"
test_chunk = "/content/drive/MyDrive/HandKeypoints/dataset/val_chunk0.npz"
chunk_files = sorted([f for f in os.listdir(chunk_dir) if f.endswith(".npz") and f.startswith('train')])
batch_size = 8

# torch.save below writes to `checkpoint`, so it has to be a file path. Check
# now rather than failing after a full epoch of training.
if not isinstance(checkpoint, (str, os.PathLike)):
    raise TypeError(
        "`checkpoint` must be the checkpoint file path; re-run the model selection cell."
    )

# The validation chunk is the same for every epoch, so load it once.
test_dataset = HandKeypointsDataset(test_chunk)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

final_epoch = 31
for epoch in range(start_epoch, final_epoch):
    model.train()
    total_loss = 0.0   # sum of the per-batch mean losses
    num_batches = 0
    print(f"\nEpoch {epoch}/{final_epoch-1}")

    # randomize chunk order, then start
    random.shuffle(chunk_files)
    for step, chunk_file in enumerate(chunk_files, start=1):
        chunk_path = os.path.join(chunk_dir, chunk_file)

        # load one dataset chunk at a time
        dataset = HandKeypointsDataset(npz_file=chunk_path)
        # DataLoader - wraps iterable around Dataset to enable easy access to the samples
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
        # ProgressBar - show real-time statistics about training
        progress_bar = tqdm(dataloader, desc=f"[{chunk_file}] Step {step}/{len(chunk_files)}", leave=False, ncols=100, dynamic_ncols=True)

        for images, targets in progress_bar:
            images = images.to(device)
            targets = targets.to(device)
            # turn targets to heatmaps (no gradient needed for the targets),
            # then predict -> apply loss -> update optimizer
            with torch.no_grad():
                gt_heatmaps = keypoints_to_heatmaps(targets)

            pred_heatmaps = model(images)

            loss = F.mse_loss(pred_heatmaps, gt_heatmaps)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Count every batch exactly once.
            batch_loss = loss.item()
            total_loss += batch_loss
            num_batches += 1
            progress_bar.set_postfix(loss=batch_loss)

    test_loss, test_acc, test_avg_time = test_model(model, test_loader, device)

    if num_batches:
        # show the first image of the last training batch, for visual proof of training
        img = images[0].clone().detach().cpu().numpy()
        img = np.transpose(img, (1, 2, 0))
        img = np.ascontiguousarray(img)
        img = (img * 255).clip(0, 255).astype(np.uint8)

        # Get coordinates from the last predicted heatmaps and plot results
        with torch.no_grad():
            preds = soft_argmax(pred_heatmaps.detach())
        predicted = preds[0].cpu()
        show_sample(img, predicted, original_size=(224, 224))

    # Print epoch statistics. The loss is a per-batch mean, so the epoch
    # average is the mean over batches.
    avg_loss = total_loss / max(num_batches, 1)
    print(f"Epoch {epoch}, Total Loss: {total_loss:.4f}, Avg Loss: {avg_loss:.4f}")
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Avg Time per Keypoint: {test_avg_time:.6f}s")

    history['train_loss'].append(avg_loss)
    history['test_loss'].append(test_loss)
    history['test_accuracy'].append(test_acc)
    history['test_avg_time_per_sample'].append(test_avg_time)

    # if better model than before, save it
    if test_acc > best_acc:
        print("Saving model!")
        best_acc = test_acc
        torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'total_loss': total_loss,
                'avg_loss': avg_loss,
                'history': history
            }, checkpoint)
    torch.cuda.empty_cache()

In [None]:
# Validation chunk, loaded as `dataset` for the cells that follow.
test_path = "/content/drive/MyDrive/HandKeypoints/dataset/val_chunk0.npz"
dataset = HandKeypointsDataset(npz_file=test_path)

In [None]:
# Qualitative check: predict a fixed set of samples and plot heatmaps + keypoints.
img_list = [33, 71, 76, 94, 100, 239, 249, 290, 311, 331]
on_gpu = device.type == "cuda"

model.eval()  # inference mode is the same for every sample, so set it once
for i in img_list:
    # The dataset index is i * 5; `i` is only the label printed below.
    image, _ = dataset[i * 5]

    test_img = image.unsqueeze(0).to(device)

    # CUDA kernels run asynchronously: synchronise around the forward pass so
    # the measured time covers the actual computation.
    if on_gpu:
        torch.cuda.synchronize()
    start_time = time.perf_counter()
    with torch.no_grad():
        predicted = model(test_img)
    if on_gpu:
        torch.cuda.synchronize()
    elapsed_time = time.perf_counter() - start_time
    print(f"Prediction time: {elapsed_time}s")

    predicted_coords = soft_argmax(predicted).squeeze(0).cpu()
    predicted = predicted.squeeze(0)  # drop the batch axis for plotting

    # channels-first float tensor -> channels-last uint8 array for plotting
    img = image.squeeze(0).clone().detach().cpu().numpy()
    img = np.transpose(img, (1, 2, 0))
    img = np.ascontiguousarray(img)
    img = (img * 255).clip(0, 255).astype(np.uint8)

    print(f"Image {i}:")
    show_heatmap(img, predicted)
    show_sample(img, predicted_coords, original_size=(224, 224))

In [None]:
import json

# `checkpoint` can be either an already loaded checkpoint dictionary or the
# path of the checkpoint file. Resolve it to a dictionary before reading the
# statistics; calling .get on a path string would raise AttributeError.
if isinstance(checkpoint, dict):
    checkpoint_stats = checkpoint
elif os.path.exists(checkpoint):
    checkpoint_stats = torch.load(checkpoint, map_location='cpu')
else:
    # No checkpoint on disk yet: export empty statistics.
    checkpoint_stats = {}

export_data = {
    'model_name': model_name,
    'history': checkpoint_stats.get('history', {}),
    'final_epoch': checkpoint_stats.get('epoch'),
    'total_loss': checkpoint_stats.get('total_loss'),
    'avg_loss': checkpoint_stats.get('avg_loss')
}

save_dir = '/content/drive/MyDrive/HandKeypoints'
save_path = os.path.join(save_dir, f'{model_name}_stats.json')

with open(save_path, 'w') as f:
    json.dump(export_data, f)