# Human Body Dimension Estimation (HBDE) from occluded images

### Authors: M. Beiwinkler, M. Krimpelstätter, I. Viertola and T. Wulz

In [None]:
IN_HOSTED_COLAB_RUNTIME = 1
if IN_HOSTED_COLAB_RUNTIME == 1:
    from google.colab import drive
    drive.mount('/content/drive')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import json
import os
import glob
import random
from skimage.io import imread
from PIL import Image, ImageDraw

import torch
from torch.utils.data import Dataset, DataLoader, ConcatDataset, SubsetRandomSampler
from torch.nn import Linear, ReLU, MSELoss, Sequential, Conv2d, MaxPool2d, Module, BatchNorm2d, LeakyReLU
from torch.optim import SGD

import tqdm
from sklearn.model_selection import KFold

%matplotlib inline

In [None]:
# Paths
if IN_HOSTED_COLAB_RUNTIME == 1:
    base = "/content/drive/MyDrive/data"  # Configure this variable accordingly where your dataset is in Google Drive
    TRAIN_TEST_PATH = os.path.join(base, "train_test_split.json")
else:
    base = os.getcwd()
    TRAIN_TEST_PATH = os.path.join(base, os.path.join("dataset", os.path.join("train_test", "train_test_split.json")))

DATASET_PATH = os.path.join(base, "dataset")
IMS_PATH = os.path.join(DATASET_PATH, os.path.join("synthetic_images", "200x200"))
ANNOS_PATH = os.path.join(DATASET_PATH, "annotations")

FULL_DATA_NP_ARRAY_PATH = os.path.join(base, "im_data.npy")
FULL_DATA_WITH_OCCLUSIONS_NP_ARRAY_PATH = os.path.join(base, "im_data_with_occlusions.npy")
CHOSEN_ANNOS_NP_ARRAY_PATH = os.path.join(base, "im_annos.npy")

MODEL_SAVE_PATH = os.path.join(base, "model.pth")

if not os.path.exists(IMS_PATH):
    Exception("Dataset does not exist {}".format(DATASET_PATH))

## Data preparation

In [None]:
# Train/test split by img indices
f = open(TRAIN_TEST_PATH, "r")
train_test_split = json.load(f)
f.close()

# Load data from file or from the dataset
if os.path.exists(FULL_DATA_NP_ARRAY_PATH):
    im_data = np.load(FULL_DATA_NP_ARRAY_PATH)

else:
    im_data = []
    for fn in sorted(glob.glob(os.path.join(IMS_PATH, os.path.join("**", "*.png")), recursive=True)):
        img = imread(fn)
        img = img.astype('float32')
        img /= 255.0
        im_data.append(img)

    im_data = np.array(im_data)
    with open(FULL_DATA_NP_ARRAY_PATH, "wb+") as f:
        np.save(f, im_data)
    f.close()

if not im_data.shape == (12000, 200, 200):
    Exception("Problems with image data.")

# Load annotations. One subject has 2 images (12000 images in total) -> 6000 annotations (1 annotation per subject).
if os.path.exists(CHOSEN_ANNOS_NP_ARRAY_PATH):
    im_annos = np.load(CHOSEN_ANNOS_NP_ARRAY_PATH)

else:
    im_annos = []
    for anno_fn in sorted(glob.glob(os.path.join(ANNOS_PATH, os.path.join("*", "*.json")))):
        f = open(anno_fn, "r")
        annotations = json.load(f)
        im_annos.append([annotations['human_dimensions']['height'], annotations['human_dimensions']['shoulder_width'],
                            annotations['human_dimensions']['left_arm_length'], annotations['human_dimensions']['right_arm_length'],
                            annotations['human_dimensions']['pelvis_circumference'], annotations['human_dimensions']['chest_circumference']])
    im_annos = np.array(im_annos)
    with open(CHOSEN_ANNOS_NP_ARRAY_PATH, "wb+") as f:
        np.save(f, im_data)
    f.close()

if not im_annos.shape == (6000, 6):
    Exception("Problems with image annotations.")

# Prepare occluded images

In [None]:
# Load occluded images from file or generate them

# Number of occluding rectangles per image. If you change this value, don't forget
# to delete any occluded images from disk before running this cell. This way you
# force new images with the appropriate number of occluding rectangles to be created.
# A value of zero leads to a value of one, two or three being chosen randomly for each image.
NUM_RECTANGLES = 1

if os.path.exists(FULL_DATA_WITH_OCCLUSIONS_NP_ARRAY_PATH):
    im_data_occl = np.load(FULL_DATA_WITH_OCCLUSIONS_NP_ARRAY_PATH)
else:
    # Seed pseudo-random number generator for reproducibility
    random.seed(a="HBDE from occluded images", version=2)

    # Add rectangles to images
    num_rectangles = NUM_RECTANGLES
    im_data_occl = []
    for img_index in range(im_data.shape[0]):
        # Convert from NumPy array to instance of Pillow Image class
        img = Image.fromarray(im_data[img_index])
        # Create an object that can be used to draw in the given image
        draw = ImageDraw.Draw(img)

        if NUM_RECTANGLES == 0:
            num_rectangles = random.randrange(1, 4)
        for rect in range(num_rectangles):
            # Determine position, length and width of occluding rectangle
            if img_index < 3000: # female in pose 0
                upper_left_x = random.randrange(30, 150)
                upper_left_y = random.randrange(30, 150)
            elif img_index < 6000: # male in pose 0
                upper_left_x = random.randrange(15, 170)
                upper_left_y = random.randrange(20, 175)
            elif img_index < 9000: # female in pose 1
                upper_left_x = random.randrange(65, 115)
                upper_left_y = random.randrange(30, 150)
            else: # male in pose 1
                upper_left_x = random.randrange(60, 120)
                upper_left_y = random.randrange(20, 175)

            lower_right_x = upper_left_x + random.randrange(10, 50)
            lower_right_y = upper_left_y + random.randrange(10, 50)

            # Draw rectangle
            draw.rectangle((upper_left_x, upper_left_y, lower_right_x, lower_right_y), fill="black")

        # Add manipulated image to list of occluded images
        im_data_occl.append(np.asarray(img))
        
    im_data_occl = np.array(im_data_occl)
    with open(FULL_DATA_WITH_OCCLUSIONS_NP_ARRAY_PATH, "wb+") as f:
        np.save(f, im_data_occl)
    f.close()

In [None]:
# Divide training and testing data
train_data = []
test_data = []
train_annos = []
test_annos = []

# FILL TRAIN/TEST DATA WITH OCCLUDED
for gender in train_test_split['train']:
    for pose in train_test_split['train'][gender]:
        for index in train_test_split['train'][gender][pose]:
            train_data.append(np.take(im_data_occl, index, axis=0))
            index = index - 6000 if index > 5999 else index
            train_annos.append(np.take(im_annos, index, axis=0))

for gender in train_test_split['test']:
    for pose in train_test_split['test'][gender]:
        for index in train_test_split['test'][gender][pose]:
            test_data.append(np.take(im_data_occl, index, axis=0))
            index = index - 6000 if index > 5999 else index
            test_annos.append(np.take(im_annos, index, axis=0))

train_data = np.array(train_data, dtype=np.float32)
test_data = np.array(test_data, dtype=np.float32)
train_annos = np.array(train_annos, dtype=np.float32)
test_annos = np.array(test_annos, dtype=np.float32)

In [None]:
# Review data
im_data_to_plot = im_data_occl # I hope this won't result in the array to be copied
plt.figure(figsize=(15, 15))
plt.subplot(221), plt.imshow(im_data_to_plot[0], cmap='gray'), plt.title("Female, pose 0")
plt.subplot(222), plt.imshow(im_data_to_plot[3000], cmap='gray'), plt.title("Male, pose 0")
plt.subplot(223), plt.imshow(im_data_to_plot[6000], cmap='gray'), plt.title("Female, pose 1")
plt.subplot(224), plt.imshow(im_data_to_plot[9000], cmap='gray'), plt.title("Male, pose 1")

In [None]:
# Dataset class
class BodyImages(Dataset):
    def __init__(self, X, y, is_training) -> None:
        assert X.shape[0] == y.shape[0], "Sample amounts must match in X and y"
        self.X = X
        self.y = y
        self.is_training = is_training

        if torch.cuda.is_available(): 
            self.device = torch.device('cuda')
        else:
            self.device = torch.device('cpu')

        super().__init__()
    
    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return torch.unsqueeze(torch.tensor(self.X[idx,:,:], requires_grad=True, device=self.device), 0), torch.tensor(self.y[idx, :], device=self.device)

## Create the CNN and define test function
Selected features (6): 
- height
- shoulder width
- right arm length
- left arm length
- pelvis circumifrence
- chest circumifrence


In [None]:
feature_amnt = 6
lin_ftr_amnt = 512

class CNN(Module):
    def __init__(self):
        super(CNN, self).__init__()

        self.layers = Sequential(
            Conv2d(in_channels=1, out_channels=feature_amnt, kernel_size=5),
            LeakyReLU(inplace=True),
            BatchNorm2d(num_features=feature_amnt),
            MaxPool2d(kernel_size=5, stride=2),
            # Second convolutional layer
            Conv2d(in_channels=6, out_channels=(feature_amnt*2), kernel_size=5),
            LeakyReLU(inplace=True),
            MaxPool2d(kernel_size=5, stride=2),
        )
        self.linear_layers = Sequential(
            Linear(in_features=23232, out_features=lin_ftr_amnt),
            LeakyReLU(inplace=True),
            Linear(in_features=lin_ftr_amnt, out_features=feature_amnt)
        )
    
    def forward(self, x):
        x = self.layers(x)
        x = torch.flatten(x, start_dim=1, end_dim=-1)
        x = self.linear_layers(x)
        return x

def test(dataloader, model, loss_fn, results, fold):
    # Enable evaluation mode for model to disable any training-mode specific model behavior,
    # just in case our model behaves differently in training and eval mode
    # (not to be mistaken for a mechanism to disable gradient computation).
    model.eval()

    num_batches = len(dataloader)
    test_loss = 0

    i = 0
    with torch.no_grad():
        for batch, (X, y) in enumerate(dataloader):
            pred = model(X)
            results[fold, i:i+len(y), 0] = y.cpu().numpy()
            results[fold, i:i+len(pred), 1] = pred.cpu().numpy()
            test_loss += loss_fn(pred, y).item()
            i += len(y)

    test_loss /= num_batches
    print(f"Avg loss: {test_loss:>8f} \n")

    return results

## Train the model

In [None]:
def reset_weights(m):
    for layer in m.children():
        if hasattr(layer, 'reset_parameters'):
            print(f'Reset trainable parameters of layer = {layer}')
            layer.reset_parameters()

# TODO: Now we only calculate MSE over all the 6 selected features.
#       Implement a model validation function where we calculate the MSE over specific feature.
#       This way we get a better idea how accurate our model really is for different HBDs.
def train(model, optim, loss_fn, dataloader):
    # Enable training mode for model to enable possible training-mode specific model behavior,
    # just in case our model behaves differently in training and eval mode.
    model.train()
    current_loss = 0.
    with tqdm.tqdm(dataloader, unit='batch') as b:
        for batch, (X, y) in enumerate(b):
            # Zero-out gradients
            optim.zero_grad()
            # Forward pass
            pred = model(X)
            # Compute loss
            loss = loss_fn(pred, y)
            # Backward pass and gradient computation
            loss.backward()
            # Parameter update
            optim.step()

if torch.cuda.is_available():
    torch.cuda.empty_cache()

lr = 0.001 # lr acc. to. paper: 0.01
momentum = 0.9
batch_size = 100
epochs = 20
k_folds = 5

train_data_ds = BodyImages(train_data, train_annos, is_training=True)
test_data_ds = BodyImages(test_data, test_annos, is_training=False)
k_fold = KFold(n_splits=k_folds, shuffle=True)
dataset = ConcatDataset([train_data_ds, test_data_ds])
loss_fn = MSELoss()

results = np.empty((5, 2400, 2, 6))

if torch.cuda.is_available(): 
    device = torch.device('cuda')
    # network.cuda()
    loss_fn.cuda()
else:
    device = torch.device('cpu')
  
print("Using:", device, "\n")

# K-Fold cross validation model evaluation
for fold, (train_ids, test_ids) in enumerate(k_fold.split(dataset)):
    print(15 * "-" + "FOLD #{}".format(fold+1) + 15 * "-")

    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)
    
    # Define data loaders for training and testing data in this fold
    trainloader = DataLoader(dataset, batch_size=batch_size, sampler=train_subsampler)
    testloader = DataLoader(dataset, batch_size=batch_size, sampler=test_subsampler)
    
    network = CNN()
    network.apply(reset_weights)
    optimizer = SGD(network.parameters(),lr=lr, momentum=momentum)

    if torch.cuda.is_available(): 
      device = torch.device('cuda')
      network.cuda()

    for e in range(epochs):
        print("-"*15, "epoch:", e + 1, "-"*15)
        train(network, optimizer, loss_fn, trainloader)

    torch.save(network.state_dict(), os.path.join(base, "model-{}-lr{}-mmnt{}.pth".format(fold+1, lr, momentum)))
    results = test(testloader, network, loss_fn, results, fold)

with open(os.path.join(base, "results-lr{}-mmnt{}.npy".format(lr, momentum)), "wb+") as f:
    np.save(f, results)
f.close()

# Evaluation
k-fold cross validation with 5 folds. Each fold contains 2400 instances

In [None]:
#results = np.load('/content/drive/MyDrive/data/results.npy')
print(results.shape)

In [None]:
accuracies_mad = []
for f in range(results.shape[0]):
    accuracies_mad.append(np.mean(np.absolute(results[f, :, 1, :] - results[f, :, 0, :]), axis=0))
print(accuracies_mad)
accuracies_mad_meanoverk = np.mean(accuracies_mad, axis=0)
print(accuracies_mad_meanoverk)

In [None]:
accuracies_rpe = []
for f in range(results.shape[0]):
    accuracies_rpe.append(np.mean(np.absolute(np.divide(results[f, :, 1, :] - results[f, :, 0, :], results[f, :, 0, :])), axis=0))
print(accuracies_rpe)
accuracies_rpe_meanoverk = np.mean(accuracies_rpe, axis=0)
print(accuracies_rpe_meanoverk)

In [None]:
print("Estimation errors for occluded images:")
print("----- MAD")
print("  height: {:.3f} mm".format(accuracies_mad_meanoverk[0] * 1000))
print("  shoulder_width: {:.3f} mm".format(accuracies_mad_meanoverk[1] * 1000))
print("  left_arm_length: {:.3f} mm".format(accuracies_mad_meanoverk[2] * 1000))
print("  right_arm_length: {:.3f} mm".format(accuracies_mad_meanoverk[3] * 1000))
print("  pelvis_circumference: {:.3f} mm".format(accuracies_mad_meanoverk[4] * 1000))
print("  chest_circumference: {:.3f} mm".format(accuracies_mad_meanoverk[5] * 1000))
print()
print("----- RPE")
print("  height: {:.3f}%".format(accuracies_rpe_meanoverk[0]*100))
print("  shoulder_width: {:.3f}%".format(accuracies_rpe_meanoverk[1]*100))
print("  left_arm_length: {:.3f}%".format(accuracies_rpe_meanoverk[2]*100))
print("  right_arm_length: {:.3f}%".format(accuracies_rpe_meanoverk[3]*100))
print("  pelvis_circumference: {:.3f}%".format(accuracies_rpe_meanoverk[4]*100))
print("  chest_circumference: {:.3f}%".format(accuracies_rpe_meanoverk[5]*100))

# Visualise results

In [None]:
model = CNN()
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
model.load_state_dict(torch.load('/content/drive/MyDrive/data/model.pth', map_location=device))

In [None]:
# Divide training and testing data
#train_data_occluded = []
test_data_occluded = []

#for gender in train_test_split['train']:
#    for pose in train_test_split['train'][gender]:
#        for indices in train_test_split['train'][gender][pose]:
#            train_data_occluded.append(np.take(im_data_occl, indices, axis=0))

for gender in train_test_split['test']:
    for pose in train_test_split['test'][gender]:
        for indices in train_test_split['test'][gender][pose]:
            test_data_occluded.append(np.take(im_data_occl, indices, axis=0))

#train_data_occluded = np.array(train_data_occluded, dtype=np.float32)
test_data_occluded = np.array(test_data_occluded, dtype=np.float32)

In [None]:
batch_size = 100
loss_fn = MSELoss()
if torch.cuda.is_available():
    loss_fn.cuda()
    model.cuda()

occluded_test_data_ds = BodyImages(test_data_occluded, test_annos, is_training=False)
occluded_test_dataloader = DataLoader(occluded_test_data_ds, batch_size=batch_size)
occluded_results = test(occluded_test_dataloader, model, loss_fn, np.empty((1, test_annos.shape[0], 2, test_annos.shape[1])), 0)

In [None]:
occ_accuracies_mad = []
for f in range(occluded_results.shape[0]):
    occ_accuracies_mad.append(np.mean(np.absolute(occluded_results[f, :, 1, :] - occluded_results[f, :, 0, :]), axis=0))
print(occ_accuracies_mad)
occ_accuracies_mad = list(occ_accuracies_mad[0])

In [None]:
occ_accuracies_rpe = []
for f in range(occluded_results.shape[0]):
    occ_accuracies_rpe.append(np.mean(np.absolute(np.divide(occluded_results[f, :, 1, :] - occluded_results[f, :, 0, :], occluded_results[f, :, 0, :]+.000001)), axis=0))
print(occ_accuracies_rpe)
occ_accuracies_rpe = list(occ_accuracies_rpe[0])

In [None]:
print("Estimation errors for occluded images:")
print("----- MAD")
print("  height: {:.3f}".format(occ_accuracies_mad[0]))
print("  shoulder_width: {:.3f}".format(occ_accuracies_mad[1]))
print("  left_arm_length: {:.3f}".format(occ_accuracies_mad[2]))
print("  right_arm_length: {:.3f}".format(occ_accuracies_mad[3]))
print("  pelvis_circumference: {:.3f}".format(occ_accuracies_mad[4]))
print("  chest_circumference: {:.3f}".format(occ_accuracies_mad[5]))
print()
print("----- RPE")
print("  height: {:.3f}%".format(occ_accuracies_rpe[0]*100))
print("  shoulder_width: {:.3f}%".format(occ_accuracies_rpe[1]*100))
print("  left_arm_length: {:.3f}%".format(occ_accuracies_rpe[2]*100))
print("  right_arm_length: {:.3f}%".format(occ_accuracies_rpe[3]*100))
print("  pelvis_circumference: {:.3f}%".format(occ_accuracies_rpe[4]*100))
print("  chest_circumference: {:.3f}%".format(occ_accuracies_rpe[5]*100))

In [None]:
# Plot first NUM_ELEMENTS elements of set of occluded images together with abs. differences and rel. perc. errors for each hbd

NUM_ELEMENTS = 10

abs_diffs = np.absolute(occluded_results[0, 0:NUM_ELEMENTS, 1, :] - occluded_results[0, 0:NUM_ELEMENTS, 0, :])
rel_per_errs = np.absolute(np.divide(occluded_results[0, 0:NUM_ELEMENTS, 1, :] - occluded_results[0, 0:NUM_ELEMENTS, 0, :], occluded_results[0, 0:NUM_ELEMENTS, 0, :]+.000001))

plt.figure(figsize=(18, 9*(NUM_ELEMENTS+1)/2))
for index in range(NUM_ELEMENTS):
    # During test items in test dataset should have been queried by used simple
    # DataLoader in an ordered manner starting with index zero
    #assert test_annos[index, :].all == occluded_results[0, index, 0, :].all
    
    img = test_data_occluded[index,:,:]
    #annos = occluded_results[0, index, 0, :]
    #annos = test_annos[index, :]
    #predicts = occluded_results[0, index, 1, :]

    plt.subplot((NUM_ELEMENTS+1)/2, 2, index+1)
    plt.title(f"{abs_diffs[index]}\n{rel_per_errs[index]}")
    plt.imshow(img, cmap='gray')


In [None]:
example_im = np.array([np.take(im_data_occl, 9000, axis=0)])
example_ann = np.array([np.take(im_annos, 3000, axis=0)])

In [None]:
example_res = np.empty((1, 1, 2, 6))
example_dl = DataLoader(BodyImages(example_im, example_ann, is_training=False))
example_res = test(example_dl, network, loss_fn, example_res, 0)

plt.figure(figsize=(5, 5))
plt.imshow(example_im[0], cmap='gray'), plt.title("Male pose 1")

mad = np.mean(np.absolute(results[0, :, 1, :] - results[0, :, 0, :]), axis=0)
rpe = np.mean(np.absolute(np.divide(results[0, :, 1, :] - results[0, :, 0, :], results[0, :, 0, :])), axis=0)

print("Estimation errors for example image:")
print("----- MAD")
print("  height: {:.3f} mm".format(mad[0] * 1000))
print("  shoulder_width: {:.3f} mm".format(mad[1] * 1000))
print("  left_arm_length: {:.3f} mm".format(mad[2] * 1000))
print("  right_arm_length: {:.3f} mm".format(mad[3] * 1000))
print("  pelvis_circumference: {:.3f} mm".format(mad[4] * 1000))
print("  chest_circumference: {:.3f} mm".format(mad[5] * 1000))
print()
print("----- RPE")
print("  height: {:.3f}%".format(rpe[0]*100))
print("  shoulder_width: {:.3f}%".format(rpe[1]*100))
print("  left_arm_length: {:.3f}%".format(rpe[2]*100))
print("  right_arm_length: {:.3f}%".format(rpe[3]*100))
print("  pelvis_circumference: {:.3f}%".format(rpe[4]*100))
print("  chest_circumference: {:.3f}%".format(rpe[5]*100))