Imports

In [None]:
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import tensorflow.compat.v1.keras
from tensorflow.compat.v1.keras.layers import *
from tensorflow.compat.v1.keras.models import *


import numpy as np
import cv2
import glob
import tensorflow.compat.v1.keras.backend as K

import sys

Import a single image so that we can test the network (e.g. to find its output sizes).

We show two different hourglasses (one from the KNEEL paper and one from our paper; the latter is the one used for the final results) and use the entry and exit blocks from the cited KNEEL paper (A. Tiulpin et al.).

In [None]:
def HMP_Block(input_size,output_size):
  """Build a multi-branch residual block as a standalone Keras model.

  Three zero-padded 3x3 convolutions are chained, each followed by batch
  normalisation, producing output_size/2, output_size/4 and output_size/4
  channels. The three results are concatenated (output_size channels in
  total) and added to a skip connection taken from the block input. The
  padding keeps the spatial size unchanged.

  Args:
    input_size: shape of the incoming feature map without the batch axis.
      Only its last entry (the channel count) is used; the spatial
      dimensions of the internal Input are left as None.
    output_size: number of channels the block outputs. Must be a positive
      multiple of 4, otherwise the concatenated branches cannot add up to
      output_size channels and the residual addition is ill-defined.

  Returns:
    A Model mapping the input feature map to the block output.

  Raises:
    ValueError: if output_size is not a positive multiple of 4.
  """
  if output_size <= 0 or output_size % 4 != 0:
    raise ValueError(
        "HMP_Block output_size must be a positive multiple of 4, got %r"
        % (output_size,))

  m = output_size
  n = input_size[-1]
  half = int(m) // 2
  quarter = int(m) // 4
  inputs = Input((None,None,n))

  #branch 1: m/2 channels
  x = ZeroPadding2D(padding=(1, 1))(inputs)
  x = Conv2D(half,kernel_size = (3,3), activation = 'relu', name = "HMP_1")(x)
  x = BatchNormalization()(x)

  #branch 2: m/4 channels, fed by branch 1
  y = ZeroPadding2D(padding=(1, 1))(x)
  y = Conv2D(quarter,kernel_size = (3,3), activation = 'relu',name = "HMP_2")(y)
  y = BatchNormalization()(y)

  #branch 3: m/4 channels, fed by branch 2
  z = ZeroPadding2D(padding=(1, 1))(y)
  z = Conv2D(quarter,kernel_size = (3,3), activation = 'relu',name = "HMP_3")(z)
  z = BatchNormalization()(z)

  concat = Concatenate()([x,y,z])

  #the skip path must carry m channels too, so project with a 1x1 convolution
  #whenever the input channel count differs (no padding needed for 1x1)
  if (n != m):
    skip = Conv2D(m,kernel_size = (1,1), activation = 'relu', name = "HMP_skip")(inputs)
  else:
    skip = Lambda(lambda tensor: tensor)(inputs)

  addition = Add()([skip,concat])

  return Model(inputs,addition)


def Hourglass_Block(input_size,depth,width,upsampling):
  """Build the (asymmetric) hourglass as a Keras model.

  Downsampling path: each of the `depth` levels max-pools by 2, applies three
  HMP blocks of width*4 channels, and then runs a "middle" branch (two HMP
  blocks of width*4 channels, one at the deepest level, followed by one of
  width*8 channels) whose output is kept for the upsampling path.

  Upsampling path: starting from the deepest middle output, the tensor is
  upsampled to the size of the level above, padded by one row/column if the
  integer upsampling came out one short, and added to that level's middle
  output. Every level except the top one then applies a width*8 HMP block.
  The top-level addition is the model output, so the output has the spatial
  size of the first pooled level.

  Args:
    input_size: shape of the input feature map without the batch axis. The
      spatial dimensions must be static because the upsampling factors are
      computed from them with int().
    depth: number of pooling levels; must be at least 2 so that there is at
      least one level to upsample into.
    width: base channel multiplier (blocks use width*4 and width*8 channels).
    upsampling: interpolation mode forwarded to UpSampling2D.

  Returns:
    A Model mapping the input feature map to the hourglass output.

  Raises:
    ValueError: if depth is smaller than 2.
  """
  if depth < 2:
    raise ValueError(
        "Hourglass_Block needs depth >= 2 (one level to upsample from and "
        "one to upsample into), got %r" % (depth,))

  inputs = Input(input_size)

  #downsampling loop
  mid_output_list = []
  current = inputs
  for level in range(depth):
    pooled = MaxPooling2D(pool_size = 2)(current)

    #three blocks; the last one feeds the next level's pooling
    x = HMP_Block(pooled.shape[1:],width*4)(pooled)
    x = HMP_Block(x.shape[1:],width*4)(x)
    current = HMP_Block(x.shape[1:],width*4)(x)

    #each level has its own middle section
    x = HMP_Block(current.shape[1:],width*4)(current)
    if level != depth-1: #the bottom level has one less HMP
      x = HMP_Block(x.shape[1:],width*4)(x)
    #kept for the addition in the upsampling section
    mid_output_list.append(HMP_Block(x.shape[1:],width*8)(x))

  mid_output_list.reverse() #deepest level first

  #upsampling loop: `merged` is the tensor carried upwards, each `skip` is the
  #middle output of the level directly above it
  merged = mid_output_list[0]
  last_step = len(mid_output_list) - 2
  for step, skip in enumerate(mid_output_list[1:]):
    factor_rows = int(skip.shape[1]) // int(merged.shape[1])
    factor_cols = int(skip.shape[2]) // int(merged.shape[2])
    upsampled = UpSampling2D(size = (factor_rows,factor_cols),interpolation = upsampling)(merged)

    #UpSampling2D multiplies by an integer factor, so when the level above has
    #an odd size the result is one short; pad asymmetrically to match
    if upsampled.shape[1] != skip.shape[1]:
      upsampled = ZeroPadding2D(padding=((1, 0),(0,0)))(upsampled) #pad the top
    if upsampled.shape[2] != skip.shape[2]:
      upsampled = ZeroPadding2D(padding=((0, 0),(0,1)))(upsampled) #pad the right

    addition = Add()([upsampled,skip])

    if step != last_step:
      merged = HMP_Block(addition.shape[1:],width*8)(addition) #8N residual block after the addition
    else:
      outputs = addition #the top level has no 8N block; this is the hourglass output

  return Model(inputs,outputs)

"""
To be able to stack the hourglasses, they must be symmetric, as they are in the
stacked paper. In the KNEEL paper, however, they are not symmetric and hence must be
adjusted here. This is the difference in our hourglass.
"""

def Symmetric_Hourglass_Block(input_size,depth,width,upsampling,name="symmetricHourglass"):
  """Build the symmetric hourglass as a Keras model.

  Unlike Hourglass_Block, the un-pooled input also gets its own stage (two
  HMP blocks of width*4 channels and one of width*8 channels) whose output is
  both kept for the upsampling path and fed to the first pooling. The
  upsampling path therefore ends at the input resolution.

  Each of the `depth` pooled levels max-pools by 2, applies three HMP blocks
  of width*4 channels, and then runs a "middle" branch (two HMP blocks of
  width*4 channels, one at the deepest level, followed by one of width*8
  channels) whose output is kept for the upsampling path.

  Upsampling path: starting from the deepest middle output, the tensor is
  upsampled to the size of the level above, padded by one row/column if the
  integer upsampling came out one short, and added to that level's kept
  output. Every level except the top one then applies a width*8 HMP block.

  Args:
    input_size: shape of the input feature map without the batch axis. The
      spatial dimensions must be static because the upsampling factors are
      computed from them with int().
    depth: number of pooling levels; must be at least 1.
    width: base channel multiplier (blocks use width*4 and width*8 channels).
    upsampling: interpolation mode forwarded to UpSampling2D.
    name: name given to the returned model.

  Returns:
    A Model mapping the input feature map to the hourglass output.

  Raises:
    ValueError: if depth is smaller than 1.
  """
  if depth < 1:
    raise ValueError(
        "Symmetric_Hourglass_Block needs depth >= 1, got %r" % (depth,))

  inputs = Input(input_size)

  #stage on the un-pooled input: its output is the top-level skip and also
  #the tensor that the first pooling consumes
  x = HMP_Block(input_size,width*4)(inputs)
  x = HMP_Block(x.shape[1:],width*4)(x)
  current = HMP_Block(x.shape[1:],width*8)(x)
  mid_output_list = [current]

  #downsampling loop
  for level in range(depth):
    pooled = MaxPooling2D(pool_size = 2)(current)

    #three blocks; the last one feeds the next level's pooling
    x = HMP_Block(pooled.shape[1:],width*4)(pooled)
    x = HMP_Block(x.shape[1:],width*4)(x)
    current = HMP_Block(x.shape[1:],width*4)(x)

    #each level has its own middle section
    x = HMP_Block(current.shape[1:],width*4)(current)
    if level != depth-1: #the bottom level has one less HMP
      x = HMP_Block(x.shape[1:],width*4)(x)
    #kept for the addition in the upsampling section
    mid_output_list.append(HMP_Block(x.shape[1:],width*8)(x))

  mid_output_list.reverse() #deepest level first

  #upsampling loop: `merged` is the tensor carried upwards, each `skip` is the
  #kept output of the level directly above it
  merged = mid_output_list[0]
  last_step = len(mid_output_list) - 2
  for step, skip in enumerate(mid_output_list[1:]):
    factor_rows = int(skip.shape[1]) // int(merged.shape[1])
    factor_cols = int(skip.shape[2]) // int(merged.shape[2])
    upsampled = UpSampling2D(size = (factor_rows,factor_cols),interpolation = upsampling)(merged)

    #UpSampling2D multiplies by an integer factor, so when the level above has
    #an odd size the result is one short; pad asymmetrically to match
    if upsampled.shape[1] != skip.shape[1]:
      upsampled = ZeroPadding2D(padding=((1, 0),(0,0)))(upsampled) #pad the top
    if upsampled.shape[2] != skip.shape[2]:
      upsampled = ZeroPadding2D(padding=((0, 0),(0,1)))(upsampled) #pad the right

    addition = Add()([upsampled,skip])

    if step != last_step:
      merged = HMP_Block(addition.shape[1:],width*8)(addition) #8N residual block after the addition
    else:
      outputs = addition #the top level has no 8N block; this is the hourglass output

  return Model(inputs,outputs,name=name)


def KNEEL_Entry_Block(input_size,width,name="entryBlock"):
  """Build the entry block that precedes the hourglass.

  The block is an unpadded 7x7 convolution with `width` filters and batch
  normalisation, one HMP block of 2*width channels, a 2x2 max-pooling, and
  then HMP blocks of 2*width, 2*width and 4*width channels.

  Args:
    input_size: shape of the input image without the batch axis.
    width: base channel count of the block.
    name: name given to the returned model.

  Returns:
    A Model mapping the input image to a feature map with 4*width channels.
  """
  inputs = Input(input_size)

  stem = Conv2D(width,kernel_size = (7,7), activation = 'relu', name = "initial_conv")
  features = stem(inputs)
  features = BatchNormalization()(features)

  features = HMP_Block(features.shape[1:],2*width)(features)
  features = MaxPooling2D(pool_size = 2)(features)

  #residual blocks after the pooling, widening to 4*width at the end
  for channels in (2*width, 2*width, 4*width):
    features = HMP_Block(features.shape[1:],channels)(features)

  return Model(inputs,features,name=name)

def Soft_Argmax(input_size, beta = 1,name="softArgmax"):
    """Build a model that converts heatmaps into soft-argmax coordinates.

    Each channel of the input is scaled by `beta`, softmax-normalised over
    all of its spatial positions, and used to weight per-position coordinate
    values (column index / width for x, row index / height for y). Summing
    gives one (x, y) pair per channel, each in the range [0, 1).

    Args:
        input_size: shape of the heatmap tensor without the batch axis, in
            (height, width, channels) order. Height and width are used in
            reshapes and tf.range, so they presumably need to be static.
        beta: factor the heatmaps are multiplied by before the softmax.
        name: name given to the returned model.

    Returns:
        A Model whose output has shape (batch, channels * 2), laid out as
        x1, y1, x2, y2, ...
    """
    #inputs in form: batch size, height, width, channels
    inputs = Input(input_size)
    new_inputs = inputs * beta
    batch, height, width, channels = new_inputs.shape #batch is unpacked but not used below

    #transpose tensor such that the channels are first:
    transpose = tf.transpose(new_inputs,[0,3,1,2]) #batch, channels, height, width

    #now reshape the system
    #first source has different shape to second one - going for the deep pipeline as it makes more sense
    reshape = K.reshape(transpose,(-1,channels, height*width)) #essentially flattening image - batch needs -1 as it is unknown
    softmax = tf.cast(K.reshape(K.softmax(reshape, axis = 2),(-1,channels,height,width)),dtype=tf.float64) #softmax over the flattened positions, restored to batch,channels,height,width and cast to float64

    original_size = K.reshape(softmax,(-1,channels,height,width)) #same shape as softmax already has
    original = tf.transpose(original_size, [0,2,3,1]) #reshape to original size - batch,height,width,channels (not used below)

    #create weight tensor of ones the same shape as image
    #using channels first so that the H*W is at the end!
    weights = K.ones_like(original_size,dtype=tf.float64)

    #x weights: column index / width, broadcast along the last (width) axis
    weights_x = tf.math.divide(tf.range(width),width)
    weights_x = tf.cast(tf.math.multiply(weights_x,weights),dtype=tf.float64) #element-wise multiplication

    #y weights: row index / height; height is moved to the last axis so the
    #1-D range broadcasts along it, then the axes are swapped back
    weights_y = tf.math.divide(tf.range(height),height)
    weights_y = tf.transpose(tf.math.multiply(weights_y,tf.transpose(weights,[0,1,3,2])),[0,1,3,2])

    #softmax-weighted sum of the x weights per channel
    approx_x = K.reshape(tf.math.multiply(softmax,weights_x),(-1,channels, height*width))
    approx_x = tf.reduce_sum(approx_x,2) #sum and reduce the second axis, so we have batch x channels*1

    #softmax-weighted sum of the y weights per channel
    approx_y = K.reshape(tf.math.multiply(softmax,weights_y),(-1,channels, height*width))
    approx_y = tf.reduce_sum(approx_y,2)

    #want to make it of size batch x channels * 2 (for coordinates), therefore:
    amount_of_points = int(approx_x.shape[-1])
    approx_x = tf.expand_dims(approx_x,-1) #makes it so we have a column for x and a column for y, and then we can flatten for x1,y1,x2,y2,...
    approx_y = tf.expand_dims(approx_y,-1)
    conc_x_y = tf.concat([approx_x,approx_y],2)
    flattened_coords = tf.reshape(conc_x_y,(-1,amount_of_points * 2)) #multiply by 2 for x and y
    outputs = flattened_coords
    model = Model(inputs,outputs,name=name)
    return model


def KNEEL_Output_Block(input_size,width,number_of_outputs, name="outputBlock",drop_rate=0.25):
  """Build the output block that follows the hourglass.

  Two identical stages are applied in sequence, the first with 8*width and
  the second with 4*width filters: spatial dropout, a cast to float32, a 1x1
  convolution with ReLU, and batch normalisation. The soft-argmax and the
  final convolution are not part of this block.

  Args:
    input_size: shape of the incoming feature map without the batch axis.
    width: base channel multiplier.
    number_of_outputs: accepted for interface compatibility; not used here.
    name: name given to the returned model.
    drop_rate: rate passed to both SpatialDropout2D layers.

  Returns:
    A Model mapping the input feature map to one with 4*width channels.
  """
  inputs = Input(input_size)

  features = inputs
  for filters in (8*width, 4*width):
    features = SpatialDropout2D(drop_rate)(features)
    features = tf.cast(features,dtype = tf.float32)
    features = Conv2D(filters,(1,1),activation = 'relu')(features)
    features = BatchNormalization()(features)

  return Model(inputs,features,name=name)


def KNEEL(input_size,number_of_outputs,depth,width,upsampling):
    """Assemble the KNEEL-style network: entry block, hourglass, output block.

    Args:
        input_size: shape of the input image without the batch axis.
        number_of_outputs: forwarded to KNEEL_Output_Block.
        depth: number of pooling levels of the hourglass.
        width: base channel multiplier shared by all sub-blocks.
        upsampling: interpolation mode used inside the hourglass.

    Returns:
        A Model mapping the input image to the output block's feature map.
    """
    inputs = Input(input_size)
    cast_inputs = tf.cast(inputs,tf.float32)
    x = KNEEL_Entry_Block(input_size,width = width)(cast_inputs)
    # The result must not be bound to the name Hourglass_Block: assigning to
    # it inside this function would make the name local and the call itself
    # would then fail with UnboundLocalError.
    x = Hourglass_Block(x.shape[1:],
                        depth = depth,
                        width = width,
                        upsampling = upsampling)(x)
    x = KNEEL_Output_Block(input_size = x.shape[1:],width = width ,number_of_outputs=number_of_outputs)(x)
    return Model(inputs,x)


def Symmetric_PBL_Network(input_size,number_of_outputs,depth,width,upsampling,name="symmetricPBL"):
    """Assemble the full network built around the symmetric hourglass.

    The pipeline is: entry block, symmetric hourglass, output block, a 1x1
    convolution producing `number_of_outputs` channels, and a soft-argmax
    that turns each of those channels into an (x, y) coordinate pair.

    Args:
        input_size: shape of the input image without the batch axis.
        number_of_outputs: number of channels of the final convolution, i.e.
            the number of coordinate pairs produced.
        depth: number of pooling levels of the hourglass.
        width: base channel multiplier shared by all sub-blocks.
        upsampling: interpolation mode used inside the hourglass.
        name: name given to the returned model.

    Returns:
        A Model mapping the input image to the soft-argmax output.
    """
    inputs = Input(input_size)
    cast_inputs = tf.cast(inputs,tf.float32)
    x = KNEEL_Entry_Block(input_size,width = width)(cast_inputs)
    x = Symmetric_Hourglass_Block(x.shape[1:],
                                  depth = depth,
                                  width = width,
                                  upsampling = upsampling)(x)
    x = KNEEL_Output_Block(input_size = x.shape[1:],width = width ,number_of_outputs=number_of_outputs)(x)
    # No shape keyword is passed: the layer infers its input shape from the
    # tensor it is called on, and `input_size` is not a keyword Keras accepts.
    x = Conv2D(number_of_outputs,(1,1))(x)
    x = Soft_Argmax(x.shape[1:])(x)
    return Model(inputs,x,name=name)
