# Compare keypoints metrics on the original, SR and upscaled prediction iamges

In [1]:
import torch
import torchvision
from torchvision.models.detection import keypointrcnn_resnet50_fpn, KeypointRCNN_ResNet50_FPN_Weights

In [2]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
import numpy as np
np.random.seed(42)

In [3]:
print(torch.cuda.is_available())

True


In [4]:
model = torchvision.models.detection.keypointrcnn_resnet50_fpn(weights=KeypointRCNN_ResNet50_FPN_Weights.DEFAULT)
model.eval()

KeypointRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(640, 672, 704, 736, 768, 800), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.

In [5]:
from torchvision.io import read_image
from torch.utils.data import DataLoader
import sys
sys.path.insert(0, '/datadrive/facediffusion')
from fdh256_dataset import FDF256Dataset

dataset_path = '/datadrive/FDF/dataset/val'
dataset = FDF256Dataset(dirpath=dataset_path, load_keypoints=True, load_impath=True)
dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=22,
                            prefetch_factor=2, persistent_workers=True, pin_memory=True)

Could not load pyspng. Defaulting to pillow image backend.
Dataset loaded from: /datadrive/FDF/dataset/val. Number of samples:6531


In [6]:
EMA = False

In [7]:
pred_path = '/datadrive/facediffusion/Coginfocom/images-ema' if EMA else '/datadrive/facediffusion/Coginfocom/images'
sr_path = '/datadrive/facediffusion/Coginfocom/DFDNet-SR-Coginfocom-EMA' if EMA else '/datadrive/facediffusion/Coginfocom/DFDNet-SR-Coginfocom'

# Annotations

The annotation tensor is a 7x2 size tensor where the keypoints are:
- nose
- left eye
- right eye
- left ear
- right ear
- left shoulder
- right shoulder

The model output tensor is a 17x3 tensor, where the coordinates are the x, y, and visibility (0 is invisible, 1 is visible). The keypoints are:
- nose
- left eye
- right eye
- left ear
- right ear
- left shoulder
- right shoulder
- left elbow
- right elbow
- left wrist
- right wrist
- left hip
- right hip
- left knee
- right knee
- left ankle
- right ankle


In [8]:
# from https://learnopencv.com/human-pose-estimation-using-keypoint-rcnn-in-pytorch/, rewritten a bit

from torch import Tensor


def filter_keypoints_per_person(all_keypoints, all_scores, confs, keypoint_threshold=2, conf_threshold=0.9):
    # iterate for every person detected
    for person_id in range(len(all_keypoints)):
      # check the confidence score of the detected person
      if confs[person_id]>conf_threshold:
        # grab the keypoint-locations for the detected person
        keypoints:Tensor = all_keypoints[person_id, ...]
        # grab the keypoint-scores for the keypoints
        scores: Tensor = all_scores[person_id, ...]
        # iterate for every keypoint-score
        for kp in range(len(scores)):
            # check the confidence score of detected keypoint
            if scores[kp] < keypoint_threshold:
                # convert the keypoint float-array to a python-list of integers
                keypoints[kp, 2] = 0
        return keypoints # return the first person with enough confidence
    
    return None

In [9]:
import latexify
import math

@latexify.with_latex
def DE(x, y, xhat, yhat, w) -> float:
    return math.sqrt((x - xhat)**2 + (y - yhat)**2) / w

DE

<latexify.frontend.LatexifiedFunction at 0x7f0dc0420b50>

In [10]:
def RMSE(truth, pred):
    sum = 0.0
    N = 0
    for i in range(0, 7):
        x, y, v = truth[i]
        x_hat, y_hat, v_hat = pred[i]
        if v == 0 or v_hat == 0:
            continue
        sum += ((x - x_hat)**2 + (y - y_hat)**2)
        N += 1
    
    if N == 0:
        return None
    
    return math.sqrt(sum / float(N))

In [11]:
import numpy as np
kpt_oks_sigmas = np.array([.26, .25, .25, .35, .35, .79, .79])/10.0 * 256# from https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocotools/cocoeval.py#L523


def OKS(y_true, y_pred, visibility, bbox_area: float):
    SCALE = bbox_area / float(256**2)
    # Compute the L2/Euclidean Distance
    distances = np.linalg.norm(y_pred - y_true, axis=-1)
    # Compute the exponential part of the equation
    exp_vector = np.exp(-(distances**2) / (2 * (SCALE**2) * (kpt_oks_sigmas**2)))
    # The numerator expression
    numerator = np.dot(exp_vector, visibility.astype(bool).astype(int))
    # The denominator expression
    denominator = np.sum(visibility.astype(bool).astype(int))
    return numerator / denominator

In [12]:
experiment_name = "DFDNet-EMA-kpts" if EMA else "DFDNet-kpts"

In [13]:
from tqdm import tqdm
import os
from PIL import Image
from einops import rearrange
import matplotlib.pyplot as plt
import logging

logging.basicConfig(filename=experiment_name,
                    filemode='a',
                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                    datefmt='%H:%M:%S',
                    level=logging.DEBUG)

logging.info("Running kpts comparison")

logger = logging.getLogger('DFDNet')

pred_sum_rmse = 0.0
pred_sum_de = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
pred_sum_oks = 0.0
sr_sum_rmse = 0.0
sr_sum_de = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
sr_sum_oks = 0.0

pred_num_rmse = 0
pred_num_de = 0
pred_num_oks = 0
sr_num_rmse = 0
sr_num_de = 0
sr_num_oks = 0

model = model.to('cuda')

for i in tqdm(range(len(dataset))):
        
    original_image = dataset[i]['img'] # tensor of shape (256, 256, 3)
    original_kpts = dataset[i]['keypoints'] # tensor of shape (7, 2)
    original_kpts = original_kpts * 256 # rescale with image size
    # Append the tensor with a visibility flag
    visibility = np.where(np.all((original_kpts >= 0) & (original_kpts <= 256), axis=1), 1.0, 0.0)
    original_kpts = np.concatenate((original_kpts, visibility.reshape(-1, 1)), axis=1)
    
    face_bbox = dataset[i]["face_bbox"]
    x0, y0, x1, y1 = face_bbox
    face_width: float = (x1 - x0).item()
    face_height: float = (y1 - y0).item()
    
    imname = str(dataset[i]['impath']).split('/')[-1]
    img_idx = imname.split('.')[0]
        
    logger.info("Evaluating image " + imname)
    #logger.info("Original kpts = " + str(original_kpts))
    
    scaled_image_path = os.path.join(pred_path, f'{img_idx}.jpeg')
    with Image.open(scaled_image_path) as fp:
        fp = fp.resize((256, 256), Image.BICUBIC) # bicubic interpolation
        pred_image = np.array(fp, dtype=np.float32)
        pred_image = pred_image / 256.0
        pred_image = rearrange(pred_image, "h w c -> c h w")
        pred_image = pred_image[0:3, :, :] # remove the alpha channel, if exists
        pred_image = torch.from_numpy(pred_image).to('cuda')
    if pred_image is None:
        continue
    
    sr_image_path = os.path.join(sr_path, f'{img_idx}.png')
    with Image.open(sr_image_path) as fp:
        sr_image = np.array(fp, dtype=np.float32)
        sr_image = sr_image / 256.0
        sr_image = rearrange(sr_image, "h w c -> c h w")
        sr_image = sr_image[0:3, :, :] # remove the alpha channel, if exists
        sr_image = torch.from_numpy(sr_image).to('cuda')
    if sr_image is None:
        continue
    
    predictions = model([pred_image, sr_image])
    pred_kpts: Tensor = predictions[0]['keypoints'] # tensor of shape (N, 17, 3)
    pred_kpts_scores: Tensor = predictions[0]['keypoints_scores']
    pred_scores: Tensor = predictions[0]['scores']
    filtered_pred_kpts = filter_keypoints_per_person(pred_kpts, pred_kpts_scores, pred_scores)
    if filtered_pred_kpts is not None:
        filtered_pred_kpts = filtered_pred_kpts[0:7] # shape (7, 3)
    else:
        continue
    #logger.info("Predicted kpts = " + str(filtered_pred_kpts))
    
    sr_kpts: Tensor = predictions[1]['keypoints'] # tensor of shape (N, 17, 3)
    sr_kpts_scores: Tensor = predictions[1]['keypoints_scores']
    sr_scores: Tensor = predictions[1]['scores']
    filtered_sr_kpts = filter_keypoints_per_person(sr_kpts, sr_kpts_scores, sr_scores)
    if filtered_sr_kpts is not None:
        filtered_sr_kpts = filtered_sr_kpts[0:7] # shape (7, 3)
    else:
        continue
    
    
    pred = filtered_pred_kpts.cpu().detach().numpy()
    sr = filtered_sr_kpts.cpu().detach().numpy()
    
    
    
    
    
    # Detection error PRED
    for i in range(0, 7):
        x, y, v = original_kpts[i]
        x_hat, y_hat, v_hat = pred[i]
        if v == 1 and v_hat == 1:
            de_i: float = DE(x, y, x_hat, y_hat, face_width)
            logger.info(f"Pred DE for kpt {i} = {de_i}")
            pred_sum_de[i] += de_i
            pred_num_de += 1
        else:
            logger.warning("No DE for kpt " + str(i))    
    
    # Detection error SR
    for i in range(0, 7):
        x, y, v = original_kpts[i]
        x_hat, y_hat, v_hat = sr[i]
        if v == 1 and v_hat == 1:
            de_i: float = DE(x, y, x_hat, y_hat, face_width)
            logger.info(f"SR DE for kpt {i} = {de_i}")
            sr_sum_de[i] += de_i
            sr_num_de += 1
        else:
            logger.warning("No DE for kpt " + str(i))    
    
    
    
    
    # RMSE
    rmse = RMSE(original_kpts, pred)
    logger.info("PRED RMSE = " + str(rmse))
    if rmse is not None:
        pred_sum_rmse += rmse
        pred_num_rmse += 1
    
    # RMSE
    rmse = RMSE(original_kpts, sr)
    logger.info("SR RMSE = " + str(rmse))
    if rmse is not None:
        sr_sum_rmse += rmse
        sr_num_rmse += 1
    
    
    
    # OKS
    oks: float = OKS(original_kpts[:, 0:1], pred[:, 0:1], original_kpts[:, 2].astype(int), (face_width * face_height))
    logger.info(f'PRED OKS = {oks}')
    pred_sum_oks += oks
    pred_num_oks += 1
    
    # OKS
    oks: float = OKS(original_kpts[:, 0:1], sr[:, 0:1], original_kpts[:, 2].astype(int), (face_width * face_height))
    logger.info(f'SR OKS = {oks}')
    sr_sum_oks += oks
    sr_num_oks += 1




    
# Compute averages
pred_avg_rmse = pred_sum_rmse / pred_num_rmse
pred_avg_de = [de_kpt / pred_num_de for de_kpt in pred_sum_de]
pred_avg_oks = pred_sum_oks / pred_num_oks

sr_avg_rmse = sr_sum_rmse / sr_num_rmse
sr_avg_de = [de_kpt / sr_num_de for de_kpt in sr_sum_de]
sr_avg_oks = sr_sum_oks / sr_num_oks

logger.info("--------EVALUATION COMPLETE--------")
logger.info(f'Average RMSE: {pred_avg_rmse}')
logger.info(f'Average DEs: {pred_avg_de}')
logger.info(f'Average OKS: {pred_avg_oks}')
logger.info(f'Average RMSE: {sr_avg_rmse}')
logger.info(f'Average DEs: {sr_avg_de}')
logger.info(f'Average OKS: {sr_avg_oks}')

100%|██████████| 6531/6531 [12:25<00:00,  8.76it/s]


In [14]:
print("--------EVALUATION COMPLETE--------")
print(f'Average RMSE: {pred_avg_rmse}')
print(f'Average DEs: {pred_avg_de}')
print(f'Average OKS: {pred_avg_oks}')
print(f'Average RMSE: {sr_avg_rmse}')
print(f'Average DEs: {sr_avg_de}')
print(f'Average OKS: {sr_avg_oks}')

--------EVALUATION COMPLETE--------
Average RMSE: 30.958071430318856
Average DEs: [0.027892838160655353, 0.025184058072169502, 0.026695445286606032, 0.04493417460006078, 0.04759476434332469, 0.002892176121812578, 0.004220795199165836]
Average OKS: 0.3896908707629083
Average RMSE: 16.649107928472986
Average DEs: [0.010988184569279608, 0.010622688118413085, 0.010650759713635001, 0.025262061460247304, 0.024458929986516408, 0.003657863030239219, 0.005398846665224996]
Average OKS: 0.567310674406386
