<a href="https://colab.research.google.com/github/aimlrl/Human-Pose-Estimation-MS-COCO/blob/master/Complete_Pose_Estimation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# The problem of pose estimation is to detect the pose of the person in an image. Consider the following images:

<img src = 'https://drive.google.com/uc?id=1I4s10X1xXNEQCelZnJqurfACmvvI8FO_'>

# You can clearly tell the pose of the person in each image. The question that arises is how we are able to tell the pose so easily.

# If you look closely at the images above, the pose of the person is determined by the positions of the body joints visible in the images. The next question is how many such points we need to track. The standard answer in Human Pose Estimation is 17 points on the body, as shown in the image below on the left:

<img src = 'https://drive.google.com/uc?id=1sEvIgYdzovWcQ5gvYJdriaxkztDHpRGh'>

# As can be observed, there are 17 points on the image of the human body, and the majority of them are the locations of joints. All of these 17 points are called keypoints (the image shows 18, but 17 keypoints is the standard in Human Pose Estimation, so we use 17). Most of the keypoints are joints in the body (keypoint indices 2, 1, 5, 3, 6, 4, 8, 11, 7, 9, 12, 10, 13), and the remaining keypoints (keypoint indices 16, 14, 15, 17) indicate the positions of the right ear, right eye, left eye and left ear.
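
# For reference, the MS COCO annotation files used later in this notebook store the 17 keypoints in a fixed order, which is not the same numbering as in the figure above (COCO has no neck point, for instance). The short sketch below lists the names as they appear in the person category of the COCO keypoint annotations; the two helper lists are only illustrative groupings.

```python
# Keypoint order used by the COCO person keypoints annotations.
COCO_KEYPOINT_NAMES = [
    "nose", "left_eye", "right_eye", "left_ear", "right_ear",
    "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
    "left_wrist", "right_wrist", "left_hip", "right_hip",
    "left_knee", "right_knee", "left_ankle", "right_ankle",
]

# Illustrative split: the first five entries are facial points, the rest are body joints.
FACE_KEYPOINTS = COCO_KEYPOINT_NAMES[:5]
JOINT_KEYPOINTS = COCO_KEYPOINT_NAMES[5:]

print(len(COCO_KEYPOINT_NAMES), len(FACE_KEYPOINTS), len(JOINT_KEYPOINTS))
```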

# A change in the location of the keypoints in the image marks a change in the posture or pose of the human body. Therefore, the problem of Human Pose Estimation can be solved by detecting the location of all these 17 keypoints, if possible (we will explain why we say "if possible"), to roughly (we will explain why we say "roughly") pixel-level accuracy, that is, the pixel at which each keypoint is present.

# Why did we say "if possible"? Because in some photographs people are captured in such a way that some keypoints are not present in the photograph at all, as shown below:

<img src = 'https://drive.google.com/uc?id=1WFTITXOtnW3amVh1patDRJQf-GXFEXkz'>

# In the picture above, not all the keypoints are present for the person in the red T-shirt (please note that this image is used only to explain the concept; here we are performing single person pose estimation only). The same can happen in some of our images, and in those cases it will not be possible to detect all 17 keypoints.

# In other cases, the person may be partly occluded by an object, or the photograph may have been taken in a dark or poorly lit region. In such cases we can still try to detect the keypoints, as shown in the image below:

<img src = 'https://drive.google.com/uc?id=1vmS-bVNZ97U2VDqxT0CWYoOBg9C_UhDX'>



# So, what will the training data for this task be? The input will be an image of a single person, which we feed into our CNN, and the labels will be the locations of all the keypoints present in the image. The location of a keypoint can be defined by its (x,y) coordinates. However, some keypoints may not be present in the image at all, or may be present but occluded by an object or hidden in a poorly lit region. Therefore, each of the 17 keypoints also carries a scalar validity value, which follows the MS COCO convention: validity = 0 if the keypoint is not labelled (for example because it is not present in the image), validity = 1 if it is labelled but not visible (for example occluded), and validity = 2 if it is labelled and visible. Every keypoint is thus described by a 3D vector (x,y,validity), so for all the 17 keypoints there will be a (17,3) matrix.

# Therefore, our training data will consist of the input image of a single person, with a matrix of shape (17,3) as the label. We will train our network to output a prediction that "roughly" gives the location of the keypoints at pixel-level accuracy.

# Getting the predictions right to exact pixel-level accuracy is very difficult, and it may also result in a lot of fluctuation in the loss function (exploding or vanishing gradients of the loss function). Therefore, we will add a little bit of noise to the labels.
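
# As a minimal sketch of the label layout, the cell below reshapes a flat COCO-style keypoint list of 51 numbers into the (17,3) matrix described above. The coordinate values are made up for illustration.

```python
import numpy as np

# Hypothetical flat keypoint list: 17 triples of (x, y, validity), all unlabelled at first.
flat_keypoints = [0] * 51
flat_keypoints[0:3] = [120, 80, 2]     # keypoint 0: labelled and visible
flat_keypoints[15:18] = [100, 130, 1]  # keypoint 5: labelled but not visible

labels = np.array(flat_keypoints).reshape(17, 3)
xy = labels[:, :2]          # (17, 2) pixel coordinates
validity = labels[:, 2]     # (17,) validity flags
labelled_mask = validity > 0

print(labels.shape, int(labelled_mask.sum()))
```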

# To perform pose estimation on the image of a single person, the person first needs to be cropped out of the whole image. We should not train our network on the whole image, but only on the cropped image of the person (think about why we want to do that). To crop a person out of the whole image, we need the bounding box coordinates of the person. In short, we have to detect the person in the image, and that can be done by an Object Detection algorithm acting as a person detector. Therefore, before performing Human Pose Estimation, we first have to perform Object Detection on the image, crop the person out of the whole image, warp the crop, and then input it into the Human Pose Estimation neural network to output a prediction of shape (17,3). The flow of Human Pose Estimation will therefore be:

# Whole Image --> Object detection Neural Network --> Cropped Image of the person from the whole image --> image warping --> Human Pose Estimation Neural Network --> (17,3) prediction. 

# So, usually we have to train or use a pretrained object detector to detect the person in the image through a bounding box, crop out the person image, warp it to some fixed size, and then train the Human Pose Estimation network on these warped images, with the ground truth keypoint coordinates in the form of a (17,3) matrix as labels.

# The dataset we are using to train our Human Pose Estimation Neural Network is MS COCO (Common Objects in Context), a dataset of about 20 GB. It is already annotated with the ground truth bounding boxes of the different objects in each image (useful for training object detection) and with 17 ground truth keypoints for each annotated person.

# Fortunately, our object detector is already trained on the MS COCO dataset, and the dataset itself contains ground truth bounding boxes for persons. Therefore, we don't need to train an object detector first in this case. We can directly crop the person images out of the whole images using the ground truth bounding boxes, warp them, and pass them to our Human Pose Estimation Neural Network to output keypoint predictions. For object detection, you can use the TensorFlow 2 Object Detection API.
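
# The crop and warp step can be sketched with PIL as follows. This is only an illustration: `crop_and_warp` is a hypothetical helper name, the blank image stands in for a real photograph, and the bounding box values are made up. The box is given in the COCO format (xmin, ymin, width, height).

```python
from PIL import Image

def crop_and_warp(image, bbox, out_height=256, out_width=192):
    """Crop a COCO-style bbox (xmin, ymin, w, h) and resize it to a fixed size."""
    xmin, ymin, w, h = bbox
    box = (xmin, ymin, xmin + w, ymin + h)
    # PIL expects the target size as (width, height).
    return image.resize(size=(out_width, out_height), box=box)

whole_image = Image.new("RGB", (640, 480))   # placeholder for a real image
person_bbox = (200.0, 100.0, 120.0, 300.0)   # made-up bounding box
warped = crop_and_warp(whole_image, person_bbox)

print(warped.size)
```

# Note that the aspect ratio of the person is not preserved by this warp; the crop is simply stretched to the fixed network input size.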

In [None]:
import os
import pandas as pd
import tensorflow.compat.v1 as tf
from PIL import Image
from collections import namedtuple
import glob
import io

In [None]:
cd /content/drive/MyDrive

# Now, let's first download the dataset annotations

In [None]:
! wget http://images.cocodataset.org/annotations/annotations_trainval2017.zip

In [None]:
! unzip /content/drive/MyDrive/annotations_trainval2017.zip

In [None]:
import numpy as np
import json

In [None]:
def compile_examples(json_annotations):

  # Load the COCO person keypoints annotation file
  with open(json_annotations) as file_handle:
    annotations = json.load(file_handle)

  # Map every image filename (e.g. 000000000139.jpg) to its download URL
  urls_dict = dict()

  for image_detail in annotations["images"]:
    img_url = image_detail["coco_url"]
    urls_dict[img_url.split("/")[-1]] = img_url

  image_details = dict()

  for single_annotation in annotations["annotations"]:

    # COCO filenames are the image id zero-padded to 12 digits
    img_filename = str(single_annotation["image_id"]).zfill(12)+".jpg"

    keypoints = np.array(single_annotation["keypoints"]).reshape(17,3)
    gt_validity = keypoints[:,2] > 0

    # Keep only non-crowd persons with at least one labelled keypoint
    # and a bounding box larger than 48 x 64 pixels
    if img_filename in urls_dict and sum(gt_validity) > 0 \
    and single_annotation["iscrowd"] == 0 and single_annotation["bbox"][2] > 48 and single_annotation["bbox"][3] > 64:

      # If an image contains several qualifying persons, the last one overwrites the earlier ones
      image_details[img_filename] = dict()
      image_details[img_filename]["url"] = urls_dict[img_filename]
      image_details[img_filename]["num_keypoints"] = single_annotation["num_keypoints"]
      image_details[img_filename]["bbox"] = single_annotation["bbox"]
      image_details[img_filename]["keypoints"] = single_annotation["keypoints"]

  return image_details

# Now, try to understand what is going on in the function above by dry running it and, if possible, try to make it run in parallel using the concurrent.futures or multiprocessing library.
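
# One possible starting point for the dry run is to isolate the per-annotation check, since every annotation is tested independently of the others. The sketch below uses a hypothetical helper, `keep_annotation`, that applies the keypoint, crowd and size filters to two made-up annotations and maps it over them with `concurrent.futures`. A thread pool is used only to show the API; for CPU-bound filtering a process pool would be needed to see a real speed-up.

```python
from concurrent.futures import ThreadPoolExecutor

def keep_annotation(ann, min_width=48, min_height=64):
    """Return True if the annotation passes the keypoint, crowd and size filters."""
    visibility = ann["keypoints"][2::3]   # every third value is the validity flag
    has_labelled_keypoint = any(v > 0 for v in visibility)
    _, _, w, h = ann["bbox"]
    return (has_labelled_keypoint and ann["iscrowd"] == 0
            and w > min_width and h > min_height)

# Two made-up annotations: the second has no labelled keypoints and a small box.
annotations = [
    {"image_id": 139, "iscrowd": 0, "bbox": [10, 20, 100, 200],
     "keypoints": [50, 60, 2] + [0, 0, 0] * 16},
    {"image_id": 285, "iscrowd": 0, "bbox": [10, 20, 30, 40],
     "keypoints": [0, 0, 0] * 17},
]

with ThreadPoolExecutor(max_workers=2) as executor:
    keep_flags = list(executor.map(keep_annotation, annotations))

# COCO filenames are the image id zero-padded to 12 digits.
kept_filenames = [str(a["image_id"]).zfill(12) + ".jpg"
                  for a, keep in zip(annotations, keep_flags) if keep]
print(kept_filenames)
```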

In [None]:
training_annotations = compile_examples("/content/drive/MyDrive/annotations/person_keypoints_train2017.json")

json_string = json.dumps(training_annotations)
json_file_handle = open("training_annotations.json","w")
json_file_handle.write(json_string)
json_file_handle.close()

In [None]:
cv_annotations = compile_examples("/content/drive/MyDrive/annotations/person_keypoints_val2017.json")

json_string = json.dumps(cv_annotations)
json_file_handle = open("cv_annotations.json","w")
json_file_handle.write(json_string)
json_file_handle.close()

In [None]:
import requests
import time
from requests.exceptions import ConnectionError

In [None]:
! sudo python3 -m pip install "requests[security]"

In [None]:
! pip install pyopenssl ndg-httpsclient pyasn1

In [None]:
def download_image(image_filename):

  # Retry the download up to 11 times, sleeping one second after every connection error
  tries = 0
  while True:
    tries = tries + 1
    try:
      r = requests.get(training_annotations[image_filename]["url"])
      # The with-block closes the file even if the write fails
      with open(os.path.join("/content/drive/MyDrive/person_train_images",image_filename),"wb") as file_handle:
        file_handle.write(r.content)
      print("Written {}".format(image_filename))
      break
    except ConnectionError as err:
      if tries == 11:
        raise err
      else:
        time.sleep(1)

In [None]:
def download_cv_image(image_filename):

  # Same retry logic as download_image, but for the cross validation images
  tries = 0
  while True:
    tries = tries + 1
    try:
      r = requests.get(cv_annotations[image_filename]["url"])
      # The with-block closes the file even if the write fails
      with open(os.path.join("/content/drive/MyDrive/person_cv_train_images",image_filename),"wb") as file_handle:
        file_handle.write(r.content)
      print("Written {}".format(image_filename))
      break
    except ConnectionError as err:
      if tries == 11:
        raise err
      else:
        time.sleep(1)

In [None]:
os.cpu_count()

In [None]:
from concurrent.futures import ThreadPoolExecutor

In [None]:
with ThreadPoolExecutor(max_workers=4) as executor:
  executor.map(download_image,training_annotations.keys())

In [None]:
with ThreadPoolExecutor(max_workers=4) as executor:
  executor.map(download_cv_image,cv_annotations.keys())

In [None]:
len(os.listdir("/content/drive/MyDrive/person_train_images"))

In [None]:
len(os.listdir("/content/drive/MyDrive/person_cv_train_images"))

In [None]:
import matplotlib.pyplot as plt
import cv2
from PIL import UnidentifiedImageError
from scipy.ndimage import gaussian_filter

# Try to complete the code below. Here is a hint: we are trying to generate the ground truth labels (keypoints), not in the form of a matrix of shape (17,3), but as a 3D matrix of shape:

# (1/4*Height of the warped cropped person image, 1/4*Width of the warped cropped person image, 17)

# In other words, the ground truth matrix of shape (17,3) is converted into an image with 17 channels whose height and width are one quarter of those of the input warped image. This is called the ground truth heatmap. Each channel of the ground truth heatmap has zero intensity at every pixel except at the location of its one ground truth keypoint. Furthermore, to add a little bit of noise to the pixel intensity around the keypoint location, every channel is convolved with a Gaussian filter.

# Why is the ground truth label called a heatmap? Because each of its 17 channels corresponds to one of the ground truth keypoints, and the region of that channel around the location of its keypoint is brightened, while the rest stays dark.

# To learn more about the Gaussian filter, see:
# https://docs.opencv.org/4.x/d4/d13/tutorial_py_filtering.html

# The size of the input warped image will be (256,192,3). This size is taken from the following research paper:
# https://arxiv.org/abs/1804.06208

# Therefore, the ground truth label matrix of keypoints of shape (17,3) will be converted into a ground truth heatmap of shape (1/4*256, 1/4*192, 17) = (64,48,17). We make the output smaller than the input warped image because we don't want to increase the computational cost of training and of the loss function computation, and because this is also recommended in the above paper.
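
# To make the conversion concrete, here is a small worked sketch for a single keypoint. The bounding box and keypoint coordinates are made up. The keypoint is first shifted into the bounding box frame, then scaled to the (64,48) heatmap grid, and finally blurred with a Gaussian filter and rescaled so that the peak equals 1.

```python
import numpy as np
from scipy.ndimage import gaussian_filter

heatmap_h, heatmap_w = 64, 48
xmin, ymin, w, h = 200.0, 100.0, 120.0, 300.0   # made-up bounding box (COCO format)
kp_x, kp_y = 260.0, 250.0                       # made-up keypoint in whole-image pixels

# Shift into the bounding box frame and scale to the heatmap grid.
col = int(round((kp_x - xmin) * heatmap_w / w))  # 60 * 48 / 120 = 24
row = int(round((kp_y - ymin) * heatmap_h / h))  # 150 * 64 / 300 = 32

# One-hot channel, blurred and rescaled so that its maximum is 1.
channel = np.zeros((heatmap_h, heatmap_w))
channel[row, col] = 1.0
channel = gaussian_filter(channel, sigma=2, mode="constant", cval=0.0)
channel = channel / channel.max()

peak_row, peak_col = np.unravel_index(np.argmax(channel), channel.shape)
print(row, col, peak_row, peak_col)
```

# The peak of the blurred channel stays at the original keypoint location, so the keypoint can later be recovered with an argmax.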

In [None]:
def create_gt_heatmap_labels(img,annotations,resize_shape=(64,48)):

  # One heatmap channel per keypoint, stored channel-first: (17,height,width)
  gt_heatmap = np.zeros(shape=(17,resize_shape[0],resize_shape[1]))
  keypoints = np.array(annotations[img]["keypoints"]).reshape(17,3)
  xmin,ymin,w,h = annotations[img]["bbox"]
  bbox_offset = np.array([xmin,ymin,0])
  bbox_dims = np.array([w,h,1])
  gt_heatmap_dims = np.array([resize_shape[1],resize_shape[0],1])
  # Shift (x,y) into the bounding box frame and scale to the heatmap grid; validity is left unchanged
  keypoints = np.round((keypoints - bbox_offset)*gt_heatmap_dims/bbox_dims).astype(int)

  for i in range(17):

    if keypoints[i,2] > 0:
      col = keypoints[i,0]   # x coordinate
      row = keypoints[i,1]   # y coordinate

      # Keypoints that fall outside the heatmap are marked as invalid
      if row < 0 or col < 0 or row >= resize_shape[0] or col >= resize_shape[1]:
        keypoints[i,2] = 0
        continue

      # Place a single bright pixel, blur it and rescale so that the peak is 1
      gt_heatmap[i,row,col] = 1.0
      gt_heatmap[i,:,:] = gaussian_filter(input=gt_heatmap[i,:,:],sigma=2,mode="constant",cval=0.0)
      gt_heatmap[i,:,:] = gt_heatmap[i,:,:]/np.max(gt_heatmap[i,:,:])

  # Validity mask of shape (17,1,1) so that it broadcasts over the heatmap
  gt_validity = keypoints[:,2] > 0
  gt_validity = gt_validity.reshape(gt_validity.shape[0],1,1)

  return gt_heatmap.astype(float), gt_validity.astype(float)

# The function below is the custom data generator that produces the batches of training examples. With the default batch size, every batch contains 32 warped cropped images of persons, so the batch of input images has shape (32,256,192,3). It also contains the heatmap of each of these images, that is, 32 heatmaps of size 64 x 48 with 17 channels. In the code the heatmaps are stored channel-first, so the heatmap batch has shape (32,17,64,48). The generator obtains the ground truth heatmaps by calling the function above.
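
# The shapes involved in one batch can be checked with a short NumPy sketch. Random pixels stand in for real images, a batch size of 4 keeps the example small, and the mean and standard deviation are the commonly used ImageNet channel statistics, which is an assumption about what the generator intends.

```python
import numpy as np

# Commonly used ImageNet channel statistics (R, G, B), assumed here.
imagenet_mean = np.array([0.485, 0.456, 0.406])
imagenet_std = np.array([0.229, 0.224, 0.225])

batch_size = 4  # small value for illustration only
raw_batch = np.random.randint(0, 256, size=(batch_size, 256, 192, 3), dtype=np.uint8)

# Scale to [0, 1], then normalise every channel; the statistics broadcast over the last axis.
images_batch = (raw_batch.astype(float) / 255.0 - imagenet_mean) / imagenet_std

# Placeholder labels with the channel-first layout used by the heatmap function.
heatmaps_batch = np.zeros((batch_size, 17, 64, 48))
validity_batch = np.ones((batch_size, 17, 1, 1))

print(images_batch.shape, heatmaps_batch.shape, validity_batch.shape)
```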

In [None]:
def train_generator(train_images,annotations,batch_size=32,resize_shape=(256,192)):

  num_train_images = len(train_images)
  train_images = np.array(train_images)
  img_norm_mean = np.array([0.485,0.456,0.406])  # ImageNet channel means (R,G,B)
  img_norm_std = np.array([0.229,0.224,0.225])

  while True:

    for offset in range(0,num_train_images,batch_size):
      
      batch_images = list(np.random.choice(train_images,size=batch_size,replace=False))
      images_batch = []
      gt_heatmaps_batch = []
      gt_validities_batch = []

      for img in batch_images:

        try:
          image = Image.open(os.path.join("/content/drive/MyDrive/person_train_images",img))
          xmin,ymin,w,h = annotations[img]["bbox"]
          cropped_image = image.resize(size=(resize_shape[1],resize_shape[0]),box=(xmin,ymin,xmin+w,ymin+h))
          cropped_image = np.array(cropped_image)

          if len(cropped_image.shape) != 3:
            cropped_image = np.stack((cropped_image,)*3,axis=-1)

          cropped_image = cropped_image.astype(float)/255.0
          cropped_image = (cropped_image - img_norm_mean)/img_norm_std

          cropped_image = cropped_image.reshape(resize_shape[0],resize_shape[1],3)
          images_batch.append(cropped_image)

          gt_heatmap,gt_validity = create_gt_heatmap_labels(img,annotations)
          gt_heatmaps_batch.append(gt_heatmap)
          gt_validities_batch.append(gt_validity)

        except UnidentifiedImageError as err:
          continue

      images_batch = np.array(images_batch)
      gt_heatmaps_batch = np.array(gt_heatmaps_batch)
      gt_validities_batch = np.array(gt_validities_batch)

      yield images_batch,gt_heatmaps_batch,gt_validities_batch

# Now, it's up to you to implement the cross validation data generator

In [None]:
def cv_generator(cv_images,annotations,batch_size=32,resize_shape=(256,192)):

  num_cv_train_images = len(cv_images)
  img_norm_mean = np.array([0.485,0.456,0.406])  # ImageNet channel means (R,G,B)
  img_norm_std = np.array([0.229,0.224,0.225])

  while True:

    for offset in range(0,num_cv_train_images,batch_size):
      
      batch_images = cv_images[offset:offset+batch_size]
      images_batch = []
      gt_heatmaps_batch = []
      gt_validities_batch = []

      for img in batch_images:

        try:
          image = Image.open(os.path.join("/content/drive/MyDrive/person_cv_train_images",img))
          xmin,ymin,w,h = annotations[img]["bbox"]
          cropped_image = image.resize(size=(resize_shape[1],resize_shape[0]),box=(xmin,ymin,xmin+w,ymin+h))
          cropped_image = np.array(cropped_image)

          if len(cropped_image.shape) != 3:
            cropped_image = np.stack((cropped_image,)*3,axis=-1)

          cropped_image = cropped_image.astype(float)/255.0
          cropped_image = (cropped_image - img_norm_mean)/img_norm_std

          cropped_image = cropped_image.reshape(resize_shape[0],resize_shape[1],3)
          images_batch.append(cropped_image)

          gt_heatmap,gt_validity = create_gt_heatmap_labels(img,annotations)
          gt_heatmaps_batch.append(gt_heatmap)
          gt_validities_batch.append(gt_validity)

        except UnidentifiedImageError as err:
          continue

      images_batch = np.array(images_batch)
      gt_heatmaps_batch = np.array(gt_heatmaps_batch)
      gt_validities_batch = np.array(gt_validities_batch)

      yield images_batch,gt_heatmaps_batch,gt_validities_batch

In [None]:
from keras.layers import Conv2D,Conv2DTranspose,BatchNormalization
from keras.applications import resnet_v2
from keras.models import Model
from keras.layers import Input
from keras.layers import ReLU
from keras.layers import Reshape
from keras.initializers import random_normal
import tensorflow as tf
import keras

# Read the research paper mentioned above and implement the Human Pose Estimation Neural Network using Keras:
# https://arxiv.org/abs/1804.06208

# Then fill in the following function.
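
# Before writing the network, it can help to check the spatial sizes by hand. The sketch below assumes a ResNet-50 style backbone that reduces the input by a factor of 32, followed by three stride-2 transposed convolutions with "same" padding (each doubling the height and width) and a final 1x1 convolution with 17 filters. `pose_head_output_shape` is a hypothetical helper used only for this arithmetic.

```python
def pose_head_output_shape(input_hw=(256, 192), backbone_stride=32,
                           num_deconv=3, num_keypoints=17):
    """Compute the (height, width, channels) of the predicted heatmap."""
    h, w = input_hw
    # The backbone downsamples the input by its total stride.
    h, w = h // backbone_stride, w // backbone_stride
    # Each stride-2 transposed convolution with "same" padding doubles both sides.
    for _ in range(num_deconv):
        h, w = h * 2, w * 2
    # The final 1x1 convolution keeps the size and outputs one channel per keypoint.
    return (h, w, num_keypoints)

print(pose_head_output_shape())  # (64, 48, 17)
```

# The result matches the (64,48,17) ground truth heatmap size, which is one quarter of the (256,192) input in each dimension.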

In [None]:
def create_pretrained_pose_resnet(resize_shape=(64,48)):

  images_batch = Input(shape=(256,192,3))
  pretrained_resnet = resnet_v2.ResNet50V2(include_top=False,input_shape=(256,192,3))

  pretrained_resnet.trainable = False

  pretrained_resnet_output = pretrained_resnet(images_batch,training=False)

  pretrained_resnet_out = Conv2DTranspose(filters=256,kernel_size=4,strides=2,padding="same",
                                          kernel_initializer=random_normal(stddev=0.001))(pretrained_resnet_output)
  pretrained_resnet_out = BatchNormalization()(pretrained_resnet_out)
  pretrained_resnet_out = ReLU()(pretrained_resnet_out)

  pretrained_resnet_out = Conv2DTranspose(filters=256,kernel_size=4,strides=2,padding="same",
                                          kernel_initializer=random_normal(stddev=0.001))(pretrained_resnet_out)
  pretrained_resnet_out = BatchNormalization()(pretrained_resnet_out)
  pretrained_resnet_out = ReLU()(pretrained_resnet_out)

  pretrained_resnet_out = Conv2DTranspose(filters=256,kernel_size=4,strides=2,padding="same",
                                          kernel_initializer=random_normal(stddev=0.001))(pretrained_resnet_out)
  pretrained_resnet_out = BatchNormalization()(pretrained_resnet_out)
  pretrained_resnet_out = ReLU()(pretrained_resnet_out)

  pretrained_resnet_out = Conv2D(filters=17,kernel_size=1,kernel_initializer=random_normal(stddev=0.001))(pretrained_resnet_out)

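  # Move the keypoint channel first: (64,48,17) -> (17,64,48), to match the ground truth heatmaps.
  # A Reshape would reinterpret the memory layout and scramble the pixels instead of transposing them.
  pretrained_resnet_out = keras.layers.Permute((3,1,2))(pretrained_resnet_out)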

  pretrained_pose_resnet = Model(images_batch,pretrained_resnet_out)

  return pretrained_pose_resnet

In [None]:
mse_loss_fn = tf.keras.losses.MeanSquaredError()

In [None]:
def mse_loss(heatmap_pred,heatmap_train,heatmap_val_train):

  heatmap_pred = heatmap_val_train * tf.cast(heatmap_pred,tf.float64)
  heatmap_train = heatmap_val_train * heatmap_train

  mse = mse_loss_fn(y_true=heatmap_train,y_pred=tf.cast(heatmap_pred,tf.float64))
  return mse
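
# The effect of the validity mask in the loss above can be illustrated with plain NumPy. In this sketch only keypoint 0 is marked as labelled, so the error on the other 16 channels is multiplied by zero and does not contribute. All values are made up.

```python
import numpy as np

pred = np.full((1, 17, 64, 48), 0.5)   # made-up prediction, 0.5 everywhere
target = np.zeros((1, 17, 64, 48))     # made-up ground truth, all zeros
validity = np.zeros((1, 17, 1, 1))
validity[0, 0] = 1.0                   # only keypoint 0 is labelled

# The (1, 17, 1, 1) mask broadcasts over the height and width of every channel.
masked_sq_err = (validity * pred - validity * target) ** 2
masked_mse = masked_sq_err.mean()

print(masked_mse)
```

# Because the mean is still taken over all 17 channels, the masked channels pull the average down: here the result is 0.25/17 rather than 0.25.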

In [None]:
pose_estimate_optimizer = keras.optimizers.adam_v2.Adam(learning_rate=0.0001)

In [None]:
pose_estimate_finetune_optimizer = keras.optimizers.adam_v2.Adam(learning_rate=0.0000001)

In [None]:
pretrained_pose_resnet = create_pretrained_pose_resnet()

In [None]:
pretrained_pose_resnet.summary()

In [None]:
pose_estimate_checkpoint_dir = "/content/drive/MyDrive/pose_estimate_checkpoints"
checkpoint_prefix = os.path.join(pose_estimate_checkpoint_dir,"ckpt")
checkpoint = tf.train.Checkpoint(optimizer=pose_estimate_optimizer,model=pretrained_pose_resnet)

In [None]:
pose_estimate_finetune_checkpoint_dir = "/content/drive/MyDrive/pose_estimate_finetune_checkpoints"
finetune_checkpoint_prefix = os.path.join(pose_estimate_finetune_checkpoint_dir,"ckpt")
finetune_checkpoint = tf.train.Checkpoint(optimizer=pose_estimate_finetune_optimizer,model=pretrained_pose_resnet)

In [None]:
@tf.function
def train_step(images_batch,heatmap_train_batch,heatmap_val_batch):

    with tf.GradientTape() as pose_estimate_tape:

      pretrained_pose_resnet.trainable = True
      pretrained_pose_resnet.layers[1].trainable = False
      
      heatmap_pred_batch = pretrained_pose_resnet(images_batch,training=True)

      pose_estimate_loss = mse_loss(heatmap_pred=heatmap_pred_batch,heatmap_train=heatmap_train_batch,heatmap_val_train=heatmap_val_batch)

      pose_estimate_gradients = pose_estimate_tape.gradient(pose_estimate_loss, pretrained_pose_resnet.trainable_variables)
      pose_estimate_optimizer.apply_gradients(zip(pose_estimate_gradients, pretrained_pose_resnet.trainable_variables))

    return heatmap_pred_batch,pose_estimate_loss

In [None]:
@tf.function
def finetune_train_step(images_batch,heatmap_train_batch,heatmap_val_batch):

  pretrained_pose_resnet.trainable = True

  # Freeze every backbone layer except the last 13
  for layer in pretrained_pose_resnet.layers[1].layers[:-13]:
    layer.trainable = False

  # Keep the batch normalization layers of the unfrozen part frozen as well,
  # and do so before the forward pass so that it takes effect
  for layer in pretrained_pose_resnet.layers[1].layers[177:]:
    if "bn" in layer.name:
      layer.trainable = False

  with tf.GradientTape() as pose_estimate_tape:

    heatmap_pred_batch = pretrained_pose_resnet(images_batch,training=True)

    pose_estimate_loss = mse_loss(heatmap_pred=heatmap_pred_batch,heatmap_train=heatmap_train_batch,heatmap_val_train=heatmap_val_batch)

  # Compute and apply the gradients once, after the forward pass has been recorded
  pose_estimate_gradients = pose_estimate_tape.gradient(pose_estimate_loss, pretrained_pose_resnet.trainable_variables)
  pose_estimate_finetune_optimizer.apply_gradients(zip(pose_estimate_gradients, pretrained_pose_resnet.trainable_variables))

  return heatmap_pred_batch,pose_estimate_loss

In [None]:
@tf.function
def cv_step(cv_images_batch,cv_heatmap_train_batch,cv_heatmap_val_batch):

  pretrained_pose_resnet.trainable = False
      
  cv_heatmap_pred_batch = pretrained_pose_resnet(cv_images_batch,training=False)

  pose_estimate_cv_loss = mse_loss(heatmap_pred=cv_heatmap_pred_batch,heatmap_train=cv_heatmap_train_batch,
                                  heatmap_val_train=cv_heatmap_val_batch)

  return cv_heatmap_pred_batch,pose_estimate_cv_loss

In [None]:
epochs = 90
batch_size = 116
cv_batch_size = 149

In [None]:
finetune_epochs = 20

In [None]:
# ORIGINAL FROM HERE:
# https://github.com/microsoft/human-pose-estimation.pytorch/blob/715d29e55f59ae555116542e85ed7175d57120e6/lib/core/evaluate.py
# ------------------------------------------------------------------------------
# Copyright (c) Microsoft
# Licensed under the MIT License.
# Written by Bin Xiao (Bin.Xiao@microsoft.com)
# ------------------------------------------------------------------------------
# Calculates Percentage of Correct Key-points (PCK) accuracy
# A detected joint is considered correct if the distance between the predicted 
# and the true joint is within a certain threshold. 


def get_max_preds(batch_heatmaps):
    '''
    get predictions from score maps
    heatmaps: numpy.ndarray([batch_size, num_joints, height, width])
    '''

    assert isinstance(batch_heatmaps, np.ndarray), 'batch_heatmaps should be numpy.ndarray'
    assert batch_heatmaps.ndim == 4, 'batch_images should be 4-ndim'

    batch_size = batch_heatmaps.shape[0]
    num_joints = batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    heatmaps_reshaped = batch_heatmaps.reshape((batch_size, num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 2)
    maxvals = np.amax(heatmaps_reshaped, 2)

    maxvals = maxvals.reshape((batch_size, num_joints, 1))
    idx = idx.reshape((batch_size, num_joints, 1))

    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)

    preds[:, :, 0] = (preds[:, :, 0]) % width
    preds[:, :, 1] = np.floor((preds[:, :, 1]) / width)

    pred_mask = np.tile(np.greater(maxvals, 0.0), (1, 1, 2))
    pred_mask = pred_mask.astype(np.float32)

    preds *= pred_mask
    return preds, maxvals

In [None]:
def calc_dists(preds, target, normalize):
    preds = preds.astype(np.float32)
    target = target.astype(np.float32)
    dists = np.zeros((preds.shape[1], preds.shape[0]))
    for n in range(preds.shape[0]):
        for c in range(preds.shape[1]):
            if target[n, c, 0] > 1 and target[n, c, 1] > 1:
                normed_preds = preds[n, c, :] / normalize[n]
                normed_targets = target[n, c, :] / normalize[n]
                dists[c, n] = np.linalg.norm(normed_preds - normed_targets)
            else:
                dists[c, n] = -1
    return dists

In [None]:
def dist_acc(dists, thr=0.5):
    ''' Return percentage below threshold while ignoring values with a -1 '''
    dist_cal = np.not_equal(dists, -1)
    num_dist_cal = dist_cal.sum()
    if num_dist_cal > 0:
        return np.less(dists[dist_cal], thr).sum() * 1.0 / num_dist_cal
    else:
        return -1

In [None]:
def accuracy(output, target, hm_type='gaussian', thr=0.5):
    '''
    Calculate accuracy according to PCK,
    but uses ground truth heatmap rather than x,y locations
    First value to be returned is average accuracy across 'idxs',
    followed by individual accuracies
    '''
    idx = list(range(output.shape[1]))
    norm = 1.0
    if hm_type == 'gaussian':
        pred, _ = get_max_preds(output.numpy())
        target, _ = get_max_preds(target)
        h = output.shape[2]
        w = output.shape[3]
        norm = np.ones((pred.shape[0], 2)) * np.array([h, w]) / 10
    dists = calc_dists(pred, target, norm)

    acc = np.zeros((len(idx) + 1))
    avg_acc = 0
    cnt = 0

    for i in range(len(idx)):
        acc[i + 1] = dist_acc(dists[idx[i]],thr=thr)
        if acc[i + 1] >= 0:
            avg_acc = avg_acc + acc[i + 1]
            cnt += 1

    avg_acc = avg_acc / cnt if cnt != 0 else 0
    if cnt != 0:
        acc[0] = avg_acc
    return avg_acc

# The four functions above are taken from a GitHub repository and implement a special accuracy metric used for Human Pose Estimation networks, called PCK accuracy (Percentage of Correct Keypoints). How is it evaluated? In short, for every predicted heatmap we take the location of the maximum of each of the 17 channels as the predicted keypoint location, and we do the same for the ground truth heatmap. We then compute the distance between the predicted and the ground truth location, after dividing the coordinates by one tenth of the heatmap height and width. If this normalised distance is less than the threshold of 0.5, we count the keypoint as correctly detected. Keypoints without a usable ground truth location are ignored.

# For each keypoint, the fraction of correct detections is computed over all the images in a batch, and these per-keypoint fractions are then averaged to give the accuracy of the batch. That is what the four functions above do.
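
# The check for a single keypoint can be sketched in a few lines of NumPy. `decode_peak` and `is_correct` are hypothetical helper names, the three heatmaps are made up, and the normalisation mirrors the one in `accuracy` above (one tenth of the heatmap height and width).

```python
import numpy as np

def decode_peak(heatmap):
    """Return the (x, y) location of the brightest pixel of one (H, W) heatmap."""
    row, col = np.unravel_index(np.argmax(heatmap), heatmap.shape)
    return np.array([col, row], dtype=float)

height, width = 64, 48
norm = np.array([height, width]) / 10.0   # same normalisation as in accuracy()

# Made-up single-channel heatmaps: ground truth, a close prediction, a distant prediction.
gt = np.zeros((height, width)); gt[32, 24] = 1.0
near = np.zeros((height, width)); near[33, 25] = 1.0
far = np.zeros((height, width)); far[10, 5] = 1.0

def is_correct(pred_hm, gt_hm, thr=0.5):
    """A keypoint counts as correct if its normalised distance is below thr."""
    dist = np.linalg.norm(decode_peak(pred_hm) / norm - decode_peak(gt_hm) / norm)
    return bool(dist < thr)

correct = [is_correct(near, gt), is_correct(far, gt)]
pck = sum(correct) / len(correct)
print(correct, pck)
```

# The close prediction is one pixel off in each direction and counts as correct, while the distant one does not, so the PCK over these two predictions is 0.5.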

In [None]:
class pck_accuracy_metric(keras.metrics.Metric):

  def __init__(self,name="pck_accuracy",**kwargs):
    super().__init__(name=name,**kwargs)
    self.pck_avg_accuracy_sum = self.add_weight(name="pck_avg_accuracy_sum",initializer="zeros",dtype="float32")
    self.total_batches = self.add_weight(name="total_batches",initializer="zeros",dtype="int32")

  def update_state(self,y_true,y_pred,sample_weight=None):
    pck_avg_accuracy = accuracy(output=y_pred,target=y_true,thr=0.5)
    self.pck_avg_accuracy_sum.assign_add(pck_avg_accuracy)
    self.total_batches.assign_add(1)

  def result(self):
    return self.pck_avg_accuracy_sum/tf.cast(self.total_batches,tf.float32)

  def reset_state(self):
    self.pck_avg_accuracy_sum.assign(0.0)
    self.total_batches.assign(0)

In [None]:
def lr_scheduler(epoch,lr):
    
    lr = 1e-3

    if epoch > 120:
        lr *= 1e-2
    elif epoch > 90:
        lr *= 1e-1

    return lr

In [None]:
pose_train_acc_metric = pck_accuracy_metric()
pose_val_acc_metric = pck_accuracy_metric()

In [None]:
with open("/content/drive/MyDrive/training_annotations.json") as json_file_handle:
  training_annotations = json.load(json_file_handle)

In [None]:
with open("/content/drive/MyDrive/cv_annotations.json") as json_file_handle:
  cv_annotations = json.load(json_file_handle)

# Please also write the training loop to train the network below. 

In [None]:
def train(epochs):

  train_images = os.listdir("/content/drive/MyDrive/person_train_images")
  cv_images = os.listdir("/content/drive/MyDrive/person_cv_train_images")

  cv_datagen = cv_generator(cv_images,cv_annotations,batch_size=cv_batch_size)

  #callback.on_train_begin()

  for epoch in range(epochs):

    time_step = 1

    #callback.on_epoch_begin(epoch=epoch)

    for images_batch,gt_heatmaps_batch,gt_validities_batch in train_generator(train_images,
                                                                              training_annotations,batch_size):

      heatmaps_pred_batch,pose_estimate_loss = train_step(images_batch,gt_heatmaps_batch,gt_validities_batch)

      if time_step > len(os.listdir("/content/drive/MyDrive/person_train_images"))//batch_size:
        break

      if time_step % 100 == 0:

        pose_train_acc_metric.update_state(gt_heatmaps_batch,heatmaps_pred_batch)
        training_pck_accuracy = pose_train_acc_metric.result()

        """
        cv_time_step = 1
        avg_cv_loss = 0

        for cv_images_batch,cv_gt_heatmaps_batch,cv_gt_validities_batch in cv_generator(cv_images,cv_annotations,
                                                                                        batch_size=cv_batch_size):
          
          cv_heatmaps_pred_batch,pose_estimate_cv_loss = cv_step(cv_images_batch,cv_gt_heatmaps_batch,
                                                                 cv_gt_validities_batch)
          
          if cv_time_step > len(os.listdir("/content/drive/MyDrive/person_cv_train_images"))//cv_batch_size:
            break

          pose_val_acc_metric.update_state(cv_gt_heatmaps_batch,cv_heatmaps_pred_batch)
          avg_cv_loss = avg_cv_loss + float(pose_estimate_cv_loss)
          cv_time_step = cv_time_step + 1

        avg_cv_loss = avg_cv_loss/float(cv_time_step)
        cv_pck_accuracy = pose_val_acc_metric.result()
        
        """
        cv_images_batch,cv_gt_heatmaps_batch,cv_gt_validities_batch = next(cv_datagen)
        cv_heatmaps_pred_batch,pose_estimate_cv_loss = cv_step(cv_images_batch,cv_gt_heatmaps_batch,cv_gt_validities_batch)
        pose_val_acc_metric.update_state(cv_gt_heatmaps_batch,cv_heatmaps_pred_batch)
        cv_pck_accuracy = pose_val_acc_metric.result()

        print("Epoch: {} Time Step: {} Training Loss: {} Training Accuracy: {} Val Loss: {} Val Accuracy: {}".format(epoch,time_step,float(pose_estimate_loss),
                                                                                                                     float(training_pck_accuracy),
                                                                                                                     float(pose_estimate_cv_loss),
                                                                                                                     float(cv_pck_accuracy)))
        
        pose_train_acc_metric.reset_state()
        pose_val_acc_metric.reset_state()

      #print("Epoch: {},Time Step: {}".format(epoch,time_step))

      time_step = time_step + 1

    checkpoint.save(file_prefix = checkpoint_prefix)

In [None]:
def finetune_train(epochs):

  train_images = os.listdir("/content/drive/MyDrive/person_train_images")
  cv_images = os.listdir("/content/drive/MyDrive/person_cv_train_images")

  cv_datagen = cv_generator(cv_images,cv_annotations,batch_size=cv_batch_size)

  #callback.on_train_begin()

  for epoch in range(epochs):

    time_step = 1

    #callback.on_epoch_begin(epoch=epoch)

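    for images_batch,gt_heatmaps_batch,gt_validities_batch in train_generator(train_images,
                                                                              training_annotations,batch_size):

      # Stop after one full pass over the training images. Checking before the step
      # avoids running one extra step per epoch, and len(train_images) reuses the
      # listing made above instead of re-reading the directory on every step.
      if time_step > len(train_images)//batch_size:
        break

      heatmaps_pred_batch,pose_estimate_loss = finetune_train_step(images_batch,gt_heatmaps_batch,gt_validities_batch)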

      if time_step % 100 == 0:

        pose_train_acc_metric.update_state(gt_heatmaps_batch,heatmaps_pred_batch)
        training_pck_accuracy = pose_train_acc_metric.result()

        """
        cv_time_step = 1
        avg_cv_loss = 0

        for cv_images_batch,cv_gt_heatmaps_batch,cv_gt_validities_batch in cv_generator(cv_images,cv_annotations,
                                                                                        batch_size=cv_batch_size):
          
          cv_heatmaps_pred_batch,pose_estimate_cv_loss = cv_step(cv_images_batch,cv_gt_heatmaps_batch,
                                                                 cv_gt_validities_batch)
          
          if cv_time_step > len(os.listdir("/content/drive/MyDrive/person_cv_train_images"))//cv_batch_size:
            break

          pose_val_acc_metric.update_state(cv_gt_heatmaps_batch,cv_heatmaps_pred_batch)
          avg_cv_loss = avg_cv_loss + float(pose_estimate_cv_loss)
          cv_time_step = cv_time_step + 1

        avg_cv_loss = avg_cv_loss/float(cv_time_step)
        cv_pck_accuracy = pose_val_acc_metric.result()
        
        """
        cv_images_batch,cv_gt_heatmaps_batch,cv_gt_validities_batch = next(cv_datagen)
        cv_heatmaps_pred_batch,pose_estimate_cv_loss = cv_step(cv_images_batch,cv_gt_heatmaps_batch,cv_gt_validities_batch)
        pose_val_acc_metric.update_state(cv_gt_heatmaps_batch,cv_heatmaps_pred_batch)
        cv_pck_accuracy = pose_val_acc_metric.result()

        print("Epoch: {} Time Step: {} Training Loss: {} Training Accuracy: {} Val Loss: {} Val Accuracy: {}".format(epoch,time_step,float(pose_estimate_loss),
                                                                                                                     float(training_pck_accuracy),
                                                                                                                     float(pose_estimate_cv_loss),
                                                                                                                     float(cv_pck_accuracy)))
        
        pose_train_acc_metric.reset_state()
        pose_val_acc_metric.reset_state()

      #print("Epoch: {},Time Step: {}".format(epoch,time_step))

      time_step = time_step + 1

    finetune_checkpoint.save(file_prefix = finetune_checkpoint_prefix)
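
Both training loops above break out manually once `time_step` passes `len(train_images)//batch_size`. That suggests `train_generator` yields batches indefinitely, which is an assumption here because the generator is defined elsewhere in the notebook. A minimal sketch of the same steps-per-epoch cap using `itertools.islice` follows; `endless_batches` and `run_epoch` are hypothetical stand-ins and not functions from this notebook.

```python
import itertools


def endless_batches(items, batch_size):
    """Hypothetical stand-in for an infinite batch generator: cycles forever."""
    start = 0
    while True:
        # Wrap around the list so the generator never runs out of items.
        yield [items[(start + i) % len(items)] for i in range(batch_size)]
        start = (start + batch_size) % len(items)


def run_epoch(items, batch_size):
    """Consume exactly len(items) // batch_size batches from the endless generator."""
    steps_per_epoch = len(items) // batch_size
    batches = itertools.islice(endless_batches(items, batch_size), steps_per_epoch)
    # enumerate(..., start=1) mirrors the 1-based time_step counter used above.
    return [(time_step, batch) for time_step, batch in enumerate(batches, start=1)]


steps = run_epoch(list(range(10)), batch_size=3)
print(len(steps))
```

With `islice` the cap sits in one place, so the loop body needs no `break` and cannot run a step past the end of the epoch.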

In [None]:
train(epochs)

In [None]:
latest_checkpoint = tf.train.latest_checkpoint("/content/drive/MyDrive/pose_estimate_checkpoints")

In [None]:
latest_checkpoint

In [None]:
finetune_train(finetune_epochs)

In [None]:
finetune_latest_checkpoint = tf.train.latest_checkpoint("/content/drive/MyDrive/pose_estimate_finetune_checkpoints")

In [None]:
finetune_latest_checkpoint

In [None]:
finetune_checkpoint.restore(finetune_latest_checkpoint)

In [None]:
cv_images = os.listdir("/content/drive/MyDrive/person_cv_train_images")
cv_datagen = cv_generator(cv_images,cv_annotations,batch_size=cv_batch_size)

In [None]:
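def plotting_data(test_img,heatmap,test_validity):
  # test_validity is accepted to match the call sites but is not used for plotting.

  # Take the first (and only) image of the batch
  test_img = test_img[0]

  # ImageNet channel mean and standard deviation
  mean=np.array([0.485, 0.456, 0.406])
  std=np.array([0.229, 0.224, 0.225])

  # Undo the normalization and clip to [0,1], the valid range for float images in imshow
  test_img = np.clip(test_img*std + mean,0,1)
  # Collapse the first sample's heatmaps into a single map by summing along its first axis
  heatmap = np.sum(heatmap[0],axis=0)

  fig = plt.figure(2,figsize=(20,20))
  plt.gray()

  # Left panel: the image, right panel: the summed heatmap
  ax1 = fig.add_subplot(121)
  ax1.imshow(test_img)

  ax2 = fig.add_subplot(122)
  ax2.imshow(heatmap)
  
  plt.show()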
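
`plotting_data` multiplies by `std` and adds `mean` before displaying the image. This only recovers the original pixels if the input pipeline applied `(img - mean) / std` to images scaled to `[0, 1]`, which is an assumption here because the preprocessing is defined elsewhere in the notebook. A small round-trip check under that assumption could look like the sketch below; `normalize` and `denormalize` are hypothetical helper names.

```python
import numpy as np

# ImageNet channel statistics, the same values used in plotting_data.
MEAN = np.array([0.485, 0.456, 0.406])
STD = np.array([0.229, 0.224, 0.225])


def normalize(img):
    """Hypothetical preprocessing: (img - mean) / std on an HxWx3 image in [0, 1]."""
    return (img - MEAN) / STD


def denormalize(img):
    """Invert normalize() and clip to the [0, 1] range imshow expects for floats."""
    return np.clip(img * STD + MEAN, 0.0, 1.0)


rng = np.random.default_rng(0)
original = rng.random((4, 4, 3))          # random HxWx3 image with values in [0, 1)
restored = denormalize(normalize(original))
print(np.allclose(restored, original))
```

The clip matters mostly for real data, where rounding or augmentation can push a few values slightly outside `[0, 1]`.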

In [None]:
cv_imgs_batch,cv_gt_heatmaps_batch,cv_gt_validities_batch = next(cv_datagen)
# Slicing with 0:1 keeps the leading batch dimension, so no reshape is needed
single_cv_img = cv_imgs_batch[0:1]
single_cv_gt_heatmap = cv_gt_heatmaps_batch[0:1]
single_cv_gt_validity = cv_gt_validities_batch[0:1]

In [None]:
single_cv_img.shape

In [None]:
single_cv_gt_heatmap.shape

In [None]:
single_cv_gt_validity.shape

In [None]:
pretrained_pose_resnet.trainable = False
cv_img_heatmap_pred = pretrained_pose_resnet(single_cv_img,training=False)

In [None]:
cv_img_heatmap_pred

In [None]:
print("GROUNDTRUTH HEATMAP")
plotting_data(single_cv_img,single_cv_gt_heatmap,single_cv_gt_validity)

In [None]:
print("PREDICTED HEATMAP")
plotting_data(single_cv_img,cv_img_heatmap_pred.numpy(),single_cv_gt_validity)

In [None]:
train_images = os.listdir("/content/drive/MyDrive/person_train_images")
train_datagen = train_generator(train_images,training_annotations,batch_size=batch_size)

In [None]:
train_imgs_batch,train_gt_heatmaps_batch,train_gt_validities_batch = next(train_datagen)
# Slicing with 0:1 keeps the leading batch dimension, so no reshape is needed
single_train_img = train_imgs_batch[0:1]
single_train_gt_heatmap = train_gt_heatmaps_batch[0:1]
single_train_gt_validity = train_gt_validities_batch[0:1]

In [None]:
print("GROUNDTRUTH HEATMAP")
plotting_data(single_train_img,single_train_gt_heatmap,single_train_gt_validity)

In [None]:
pretrained_pose_resnet.trainable = False
train_img_heatmap_pred = pretrained_pose_resnet(single_train_img,training=False)

In [None]:
print("PREDICTED HEATMAP")
plotting_data(single_train_img,train_img_heatmap_pred.numpy(),single_train_gt_validity)