In [None]:
import tensorflow as tf
import numpy as np

import scipy.io

import os
import matplotlib.pyplot as plt
from PIL import Image
import re

In [None]:
!wget 'https://sam.johnson.io/research/lsp_dataset.zip'

In [None]:
!unzip lsp_dataset.zip

In [None]:

def get_data(start=0,noi=10):

  master_dir = '/content/images/'

  imgs_files = os.listdir(master_dir)[start:noi]
  all_imgs = []
  coords = []

  mat = scipy.io.loadmat('/content/joints.mat')
  joint_data = np.array(mat['joints'])

  x=[]
  y=[]

  for each_file in imgs_files:

    res = re.findall( r'([a-z]*)([0-9]*).jpg',each_file )
    idx = int(res[0][-1])
    index = idx-1

    img = Image.open( master_dir+each_file )
    w,h = img.size

    img_arr = np.array( img.resize((220,220)) )/255.0
    all_imgs.append( img_arr )

    
    coords.append( np.concatenate( (joint_data[0,:,index]/w , joint_data[1,:,index]/h),axis=0 ) )

  return all_imgs,coords



In [None]:
imgs,coords = get_data()

In [None]:
print( len(coords) )
print( coords[0] )

In [None]:
# View some sample images
for j in range(10):

  show = plt.imshow(imgs[j])
  for i in range( 14 ):
    
    plt.plot( int(coords[j][i]*220),int(coords[j][14+i]*220) ,marker='o' )
  
  plt.show()

In [None]:
def shuffle_data(x,y):

  shuffle_x = []
  shuffle_y = []

  perm_index = np.random.permutation( len(y) )

  for i in perm_index:
    shuffle_x.append( x[i] )
    shuffle_y.append( y[i] )

  return shuffle_x,shuffle_y

In [None]:
init = tf.keras.initializers.he_uniform(seed=5)
inputs = tf.keras.layers.Input((220,220,3))

out = tf.keras.layers.Conv2D(32,(5,5),strides=(1,1),activation="relu",kernel_regularizer=tf.keras.regularizers.l2(0.001),kernel_initializer=init)(inputs)
out = tf.keras.layers.BatchNormalization()(out)

out = tf.keras.layers.Conv2D(64,(3,3),strides=(1,1),activation="relu",kernel_regularizer=tf.keras.regularizers.l2(0.001),kernel_initializer=init)(out)
out = tf.keras.layers.BatchNormalization()(out)
out = tf.keras.layers.MaxPool2D((2,2))(out)

out = tf.keras.layers.Conv2D(128,(3,3),strides=(1,1),activation="relu",kernel_regularizer=tf.keras.regularizers.l2(0.001),kernel_initializer=init)(out)
out = tf.keras.layers.BatchNormalization()(out)
out = tf.keras.layers.MaxPool2D((2,2))(out)

out = tf.keras.layers.Conv2D(128,(3,3),strides=(1,1),activation="relu",kernel_regularizer=tf.keras.regularizers.l2(0.001),kernel_initializer=init)(out)
out = tf.keras.layers.BatchNormalization()(out)
out = tf.keras.layers.MaxPool2D((2,2))(out)

out = tf.keras.layers.Conv2D( 256,(3,3),activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.001) ,kernel_initializer=init )(out)
out = tf.keras.layers.BatchNormalization()(out)

out = tf.keras.layers.Conv2D(256,(3,3),activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.001) ,kernel_initializer=init )(out)
out = tf.keras.layers.MaxPool2D((2,2))(out)

out = tf.keras.layers.Conv2D(512,(3,3),activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.001) ,kernel_initializer=init )(out)

out = tf.keras.layers.Conv2D(512,(3,3),activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.001) ,kernel_initializer=init)(out)
out = tf.keras.layers.MaxPool2D((2,2))(out)

out = tf.keras.layers.Flatten()(out)

out = tf.keras.layers.Dense(512,activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.001) ,kernel_initializer=init )(out)
out = tf.keras.layers.Dense(256,activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.001) ,kernel_initializer=init )(out)

outputs = tf.keras.layers.Dense(28,kernel_initializer=init)(out)

model = 0
model = tf.keras.Model(inputs,outputs)
model.summary()

In [None]:
model = tf.keras.models.load_model( '/content/deepPose (1).h5')

In [None]:
cost_fn = tf.keras.losses.MeanSquaredError()

# Custom defined cost 
# def cost_fn(y_true,y_pred):
#   locost = (1/y_true.shape[0])*tf.reduce_sum( tf.square(y_true- tf.cast(y_pred,tf.float64) ) )
#   return locost

In [None]:
def val_test( model,test_x,test_y ):

  mean = tf.keras.metrics.Mean()
  mean.reset_states()
  # Batch size
  b_size = 8
  test_nob = int( len(test_x)/b_size )
  test_nob

  for n in range(test_nob):

    x_single = tf.convert_to_tensor( test_x[n*b_size:(n+1)*b_size] )
    y_single = tf.convert_to_tensor( test_y[n*b_size:(n+1)*b_size] )

    test_pred = model(x_single)
    pred_hot = cost_fn(y_single*220,test_pred*220)

    mean.update_state( pred_hot )    

  return mean.result()

In [None]:
test_img,test_coods = get_data(1800,1999)

In [None]:
len(test_img)

In [None]:
test_loss = val_test(model,test_img,test_coods)

In [None]:
# Loss on untrained model
print(test_loss)

In [None]:
epochs = 30
l_rate = 0.0001
# MAX 2000
num_of_ex = 1800 
batch_size = 8
nob = int(num_of_ex/batch_size) 

optimizer = tf.keras.optimizers.RMSprop(l_rate)
mean = tf.keras.metrics.Mean()

with tf.device('/device:GPU:0'):

  data_img,coords = get_data(0,num_of_ex)

  print( len(data_img) )
  print( data_img[0].shape )
  
  print( len(coords) )
  print( len(coords[0]) )


  for e in range(epochs):

    ctr = 0
    mean.reset_states()

    shuffle_x,shuffle_y = shuffle_data( data_img,coords )

    for b in range( nob ):

      x_batch = tf.convert_to_tensor( shuffle_x[ b*batch_size:(b+1)*batch_size ] )
      y_batch = tf.convert_to_tensor( shuffle_y[ b*batch_size:(b+1)*batch_size ] )

      with tf.GradientTape() as tape:

        pred = model(x_batch)
        
        # estimation of size to be given
        total_loss = cost_fn( y_batch*220 ,pred*220 )
    
      mean.update_state( total_loss )
      grad = tape.gradient(total_loss,model.trainable_variables)
      optimizer.apply_gradients( zip(grad,model.trainable_variables) )

      if ctr%200==0:
        
        show = plt.imshow( x_batch[0].numpy() )
        for i in range( 14 ):
          
          plt.plot( int( (pred[0][i])*220 ) , int( (coords[0][14+i])*220) ,marker='o' )

        plt.show()
        
        show = plt.imshow( x_batch[0].numpy() )
        for i in range( 14 ):
          
          plt.plot( int( (y_batch[0][i])*220 ) , int( (y_batch[0][14+i])*220) ,marker='o' )
        plt.show()

        print( "loss is ==>  {}".format(mean.result()) )

      ctr = ctr+1

    print("Train Loss is === >  {}".format(mean.result()))



In [None]:
model.save('deepPose.h5')

In [None]:
model.trainable_variables

In [None]:
test_loss = val_test(model,test_img,test_coods)
print("Test  Loss is === >  {}".format(test_loss))

In [None]:
def predict_one(img_arr):

  tf_img = tf.convert_to_tensor(img_arr)
  pred_one = model(img_arr)
  print(pred_one.shape)

  show = plt.imshow( img_arr[0] )
  for i in range( 14 ):
    
    plt.plot( int( (pred_one[0][i])*220 ) , int( (pred_one[0][14+i])*220) ,marker='o' )

  plt.show()
  
  show = plt.imshow( img_arr[0] )
  plt.show()

  return


In [None]:
# test_img,test_coods = get_data(1800,1999)
np_img = np.expand_dims( data_img[10] , 0 )
print(np_img.shape)
predict_one(np_img)