In [None]:
Results_path = "Your_Results_path"

In [None]:
!nvidia-smi

In [None]:
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

In [None]:
# import the necessary packages
from imutils import paths
import imutils
import json
import time
import cv2
import os
import math
import numpy as np
import pandas as pd
import PIL
from PIL import Image
import random
import h5py
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Flatten, GlobalAveragePooling2D
from tensorflow.keras.layers import AveragePooling2D
from tensorflow.keras.callbacks import LearningRateScheduler, EarlyStopping
from tensorflow.keras.layers import UpSampling2D
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Dropout, LeakyReLU, BatchNormalization, Input, Concatenate, Activation, concatenate, Input
from keras.initializers import RandomNormal
from tensorflow.keras.models import Model, load_model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.utils import plot_model

### Loading SpeakingFaces dataset

In [None]:
# function to correct landmarks' order after mirroring

def correct_landmarks_order(pts_mirr):
    # pts_mirr - list of landmarks for one face

	pts_mirr_c = []

	# chin
	pts_mirr_c.append(pts_mirr[16])
	pts_mirr_c.append(pts_mirr[15])
	pts_mirr_c.append(pts_mirr[14])
	pts_mirr_c.append(pts_mirr[13])
	pts_mirr_c.append(pts_mirr[12])
	pts_mirr_c.append(pts_mirr[11])
	pts_mirr_c.append(pts_mirr[10])
	pts_mirr_c.append(pts_mirr[9])
	pts_mirr_c.append(pts_mirr[8])
	pts_mirr_c.append(pts_mirr[7])
	pts_mirr_c.append(pts_mirr[6])
	pts_mirr_c.append(pts_mirr[5])
	pts_mirr_c.append(pts_mirr[4])
	pts_mirr_c.append(pts_mirr[3])
	pts_mirr_c.append(pts_mirr[2])
	pts_mirr_c.append(pts_mirr[1])
	pts_mirr_c.append(pts_mirr[0])

	# left eyebrow
	pts_mirr_c.append(pts_mirr[26])
	pts_mirr_c.append(pts_mirr[25])
	pts_mirr_c.append(pts_mirr[24])
	pts_mirr_c.append(pts_mirr[23])
	pts_mirr_c.append(pts_mirr[22])

	# right eyebrow
	pts_mirr_c.append(pts_mirr[21])
	pts_mirr_c.append(pts_mirr[20])
	pts_mirr_c.append(pts_mirr[19])
	pts_mirr_c.append(pts_mirr[18])
	pts_mirr_c.append(pts_mirr[17])

	# nose bridge
	pts_mirr_c.append(pts_mirr[27])
	pts_mirr_c.append(pts_mirr[28])
	pts_mirr_c.append(pts_mirr[29])
	pts_mirr_c.append(pts_mirr[30])

	# nose tip
	pts_mirr_c.append(pts_mirr[35])
	pts_mirr_c.append(pts_mirr[34])
	pts_mirr_c.append(pts_mirr[33])
	pts_mirr_c.append(pts_mirr[32])
	pts_mirr_c.append(pts_mirr[31])

	# left eye
	pts_mirr_c.append(pts_mirr[45])
	pts_mirr_c.append(pts_mirr[44])
	pts_mirr_c.append(pts_mirr[43])
	pts_mirr_c.append(pts_mirr[42])
	pts_mirr_c.append(pts_mirr[47])
	pts_mirr_c.append(pts_mirr[46])

	# right eye
	pts_mirr_c.append(pts_mirr[39])
	pts_mirr_c.append(pts_mirr[38])
	pts_mirr_c.append(pts_mirr[37])
	pts_mirr_c.append(pts_mirr[36])
	pts_mirr_c.append(pts_mirr[41])
	pts_mirr_c.append(pts_mirr[40])

	# lips
	pts_mirr_c.append(pts_mirr[50])
	pts_mirr_c.append(pts_mirr[49])
	pts_mirr_c.append(pts_mirr[48])
	pts_mirr_c.append(pts_mirr[51])
	pts_mirr_c.append(pts_mirr[52])
	pts_mirr_c.append(pts_mirr[53])

	return pts_mirr_c

In [None]:
# path to the dataset
Im_type = 'gray'
datasetPath = './D5050/Images_'+ Im_type +'/'



# original image size and intended image size
#H = 348
#W = 464
#H = 768
#W = 1024
H = 512
W = 640

h = 256
w = 256

# number of facial landmarks
KEYPOINTS = 54


In [None]:
# function to import dataset
def import_data(t,s):
    # s - train / val / test
    # t- gray/iron/arrays
    # list to store imported data
    images = []

    masks =[]
    annotations =[]
    #a,b,c,d=[],[],[],[]
    extn_map={"arrays":'.npy',"gray":'.jpg',"iron":'.png'}
    # extract paths to json files
    # we use grayscaled images for landmark prediction
    #jsonFolder = os.path.join(datasetPath, "gray", s, 'json')
    #jsonPaths = list(paths.list_files(jsonFolder, validExts="json"))
    #jsonPaths = sorted(jsonPaths)
    #datesetPath='/thermal/MyDrive/dataset'
    labelsFolder=os.path.join(datasetPath,s)
    #labelsFolder='/thermal/MyDrive/dataset'
    txtPaths = list(paths.list_files(labelsFolder, validExts="txt"))
    txtPaths = sorted(txtPaths)

    # loop over the json files
    for ind, txtPath in enumerate(txtPaths, 1):

        print("[INFO] Processing {} file ({}/{})".format(txtPath.split("/")[-1], ind, len(txtPaths)))

        # opening the text file
        f = open(txtPath,)
        f=f.readlines()

        # loading the image and converting it to grayscale
        if t=="gray":
          imagePath = txtPath.replace('labels','images')
          imagePath = imagePath.replace('.txt', '.jpg')
          image = cv2.imread(imagePath)
          image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        if t=="iron":
          imagePath = txtPath.replace('labels','images')
          imagePath = imagePath.replace('.txt', '.png')
          image = cv2.imread(imagePath)
          image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        if t=="arrays":
          imagePath = txtPath.replace('labels','images')
          imagePath = imagePath.replace('.txt', '.npy')
          image=np.load(imagePath)
          #image = cv2.imread(imagePath)
          #image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # loading and processing the mask
        maskPath =imagePath.replace('images','masks_bb2')
        maskPath =maskPath.replace(extn_map[t],'.png')
        mask = cv2.imread(maskPath)
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY)
        mask = mask.astype('float32')
        mask = mask / 255.0
        # mirrored mask
        mask_mirr = cv2.flip(mask, flipCode=1)

        # iterating through the shapes
        landmarks = []


        land=[]
        for point in f:
            point=point.split(' ')
            x=point[1]
            y=point[2]
            (x,y)=(W*float(x),H*float(y))

            land.append([y, x])
        land = np.array(land)

        #land = np.array(f['landmarks']['points'])

        # extracting bounding box by padding landmarks
        ys = int(np.min(land[:,0]))-40
        ye = int(np.max(land[:,0]))+5
        xs = int(np.min(land[:,1]))-5
        xe = int(np.max(land[:,1]))+5

        (xs, ys, xe, ye) = (int(xs), int(ys), int(xe), int(ye))

        # crop image to bounding box, resize, normalize
        crop_image = image[ys:ye+1, xs:xe+1]
        crop_image = cv2.resize(crop_image, (w, h) )
        crop_image = crop_image.astype('float32')
        crop_image = crop_image / 255.0

        # mirror the processed image
        crop_image_flip = cv2.flip(crop_image, flipCode=1)

        for point in f:
            point=point.split(' ')
            x=point[1]
            y=point[2]
            (x,y)=(float(x)*W,float(y)*H)



            #adjust landmarks to bounding box and normalize
            (crop_x, crop_y) = (x-xs, y-ys)
            (crop_x, crop_y) = (crop_x / (xe-xs), crop_y / (ye-ys))

            landmarks.append([crop_y, crop_x])




        landmarks = np.array(landmarks)

        # mirror the landmarks
        landmarks_mirr = landmarks.copy()
        landmarks_mirr[:,1] = 1 - landmarks_mirr[:,1]
        landmarks_mirr = correct_landmarks_order(landmarks_mirr.tolist())

        # flatten the landmarks
        landmarks = landmarks.flatten()
        landmarks_mirr = np.array(landmarks_mirr).flatten()

        # store imported and processed data
        images.append(crop_image)
        annotations.append(landmarks)
        masks.append(mask)

        images.append(crop_image_flip)
        annotations.append(landmarks_mirr)
        masks.append(mask_mirr)


    # converting to numpy arrays
    # expanding image dimensions from (N, h, w) to (N, h, w, 1)
    images = np.expand_dims(np.array(images), axis = 3)
    masks = np.expand_dims(np.array(masks), axis = 3)
    annotations = np.array(annotations)

    return images, masks, annotations

In [None]:
#Preparing Training set
img_train,bmask_train,l_train=import_data(Im_type,'train')

In [None]:
#Preparing val set
img_val,bmask_val,l_val=import_data(Im_type,'val')

In [None]:
#Preparing Test set
img_test,bmask_test,l_test=import_data(Im_type,'test')

In [None]:
# sanity check
print(img_train.shape)
print(img_val.shape)
print(img_test.shape)
print(l_train.shape)
print(l_val.shape)
print(l_test.shape)
print(bmask_train.shape)
print(bmask_val.shape)
print(bmask_test.shape)

In [None]:
# visualize to check if images were correctly uploaded

def visualize(image, landmarks, mask):

    fig, axs = plt.subplots(1, 2, figsize=(10,5))

    axs[0].imshow(image[:,:,0], cmap='gray')
    axs[1].imshow(mask[:,:,0], cmap='gray')

    keys = landmarks.copy().reshape(KEYPOINTS,2)

    axs[0].plot(keys[:,1]*w, keys[:,0]*h, 'gD', markersize=3)

In [None]:
for i in range(50):
    visualize(img_train[i], l_train[i], bmask_train[i])

### Loading RWTH-Aachen dataset (additional test set)

In [None]:
# paths to the dataset
jsonFolder = "./rwth_aachen/FaceDB_PNG_2935/"
jsonPaths = list(paths.list_files(jsonFolder, validExts="ljson"))
jsonPaths = sorted(jsonPaths)

# original image size
H = 768
W = 1024

# list to store read information
l_aachen = []
img_aachen = []

In [None]:
# loop over the json files
for ind, jsonPath in enumerate(jsonPaths, 1):

    print("[INFO] Processing {} file ({}/{})".format(jsonPath.split("/")[-1], ind, len(jsonPaths)))

    # opening the json file
    f = open(jsonPath,)

    # returns the json object as a dictionary
    data = json.load(f)

    # extracting filename
    filename = jsonPath.split("/")[-1]
    filename = filename.split(".ljson")[0]
    filename_img = os.path.join(jsonFolder, "{}.png".format(filename))

    # subject id
    subj = int(filename.split('sub0')[1].split('_')[0])

    # reading images
    image = cv2.imread(filename_img)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # reading landmarks
    land = np.array(data['landmarks']['points'])

    # extracting bounding box by padding landmarks
    ys = int(np.min(land[:,0]))-40
    ye = int(np.max(land[:,0]))+5
    xs = int(np.min(land[:,1]))-5
    xe = int(np.max(land[:,1]))+5

    # normalize landmarks
    land[:,0] = (land[:,0] - ys) / (ye - ys)
    land[:,1] = (land[:,1] - xs) / (xe - xs)

    # downsample landmarks
    landmarks = np.zeros((54, 2))
    landmarks[:49,:] = land[:49,:]
    landmarks[49,:] = land[51,:]
    landmarks[50,:] = land[54,:]
    landmarks[51,:] = land[57,:]
    landmarks[52,:] = land[62,:]
    landmarks[53,:] = land[66,:]

    landmarks = landmarks.flatten()

    # crop image to bounding box, resize, normalize
    crop_image = image[ys:ye+1, xs:xe+1]
    crop_image = cv2.resize(crop_image, (w, h) )
    crop_image = crop_image.astype('float32')
    crop_image = crop_image / 255.0

    img_aachen.append(crop_image)
    l_aachen.append(landmarks)

    # closing file
    f.close()

In [None]:
# expand dimensions so that images have one channel
img_aachen = np.expand_dims(np.array(img_aachen), axis = 3)
l_aachen = np.array(l_aachen)

# sanity check
print(img_aachen.shape)
print(l_aachen.shape)

### Model

p5 = Conv2D(256, (1, 1), activation='relu', padding='same')(conv5_)
    p4 = concatenate([UpSampling2D(size=(4, 4))(p5), conv4_])
    p3 = concatenate([UpSampling2D(size=(2, 2))(p4), conv3_])

    # Lateral connections
    p3 = Conv2D(256, (1, 1), activation='relu', padding='same')(p3)
    p4 = Conv2D(256, (1, 1), activation='relu', padding='same')(p4)
    p5 = Conv2D(256, (1, 1), activation='relu', padding='same')(p5)

In [None]:

def create_model(image_shape):
    # Prepare the kernel initializer values
    weight_init = RandomNormal(stddev=0.02)

    # Prepare the Input layer
    net_input = Input((image_shape))

    # Download mobile net, and use it as the base.
    mobile_net_base = MobileNetV2(
        include_top=False,
        input_shape=(224, 224, 3),
        weights='imagenet'
    )
    resized_input = tf.image.resize(net_input, (224, 224))
    expand_channels = Conv2D(3, (1, 1), padding='same', use_bias=False,
                             kernel_initializer=RandomNormal(stddev=0.02))(resized_input)
    mobilenet = mobile_net_base(expand_channels)


    # Encoder block #
    # 224x224
    conv1 = Conv2D(64, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(net_input)
    conv1 = LeakyReLU(alpha=0.2)(conv1)

    # 112x112
    conv2 = Conv2D(128, (3, 3), strides=(1, 1), padding='same', kernel_initializer=weight_init)(conv1)
    conv2 = LeakyReLU(alpha=0.2)(conv2)

    # 112x112
    conv3 = Conv2D(128, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(conv2)
    conv3 =  Activation('relu')(conv3)

    # 56x56
    conv4 = Conv2D(256, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(conv3)
    conv4 = Activation('relu')(conv4)

    # 28x28
    conv4_ = Conv2D(256, (3, 3), strides=(1, 1), padding='same', kernel_initializer=weight_init)(conv4)
    conv4_ = Activation('relu')(conv4_)

    # 28x28
    conv5 = Conv2D(512, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(conv4_)
    conv5 = Activation('relu')(conv5)


    # 14x14
    conv5_ = Conv2D(256, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(conv5)
    conv5_ = Activation('relu')(conv5_)

    #7x7

    #Pyramid
    p5 = Conv2D(256, (1, 1), activation='relu', padding='same')(conv5_)
    p4 = concatenate([UpSampling2D(size=(2, 2))(p5), conv5 ])
    p3 = concatenate([UpSampling2D(size=(2, 2))(p4), conv4_ ])

    # Lateral connections
    p3 = Conv2D(256, (1, 1), activation='relu', padding='same')(p3)
    p4 = Conv2D(256, (1, 1), activation='relu', padding='same')(p4)
    p5 = Conv2D(256, (1, 1), activation='relu', padding='same')(p5)



    mobilenet = tf.image.resize(mobilenet, (8, 8))

    # Fusion layer - Connects MobileNet with our encoder
    fusion = concatenate([mobilenet, p5])
    #fusion = Conv2D(512, (1, 1), padding='same', kernel_initializer=weight_init)(conc)
    #fusion = Activation('relu')(fusion)




    # Decoder block #
    # 7x7
    decoder = Conv2DTranspose(512, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(fusion)
    decoder = Activation('relu')(decoder)
    decoder = Dropout(0.25)(decoder)


    fusion2 = concatenate([decoder, p4])

    # 14x14
    decoder = Conv2DTranspose(256, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(fusion2)
    decoder = Activation('relu')(decoder)
    decoder = Dropout(0.25)(decoder)


    fusion3 = concatenate([decoder, p3])

    # 28x28
    decoder = Conv2DTranspose(128, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(fusion3)
    decoder = Activation('relu')(decoder)
    decoder = Dropout(0.25)(decoder)

    # 56x56
    decoder = Conv2DTranspose(64, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(decoder)
    decoder = Activation('relu')(decoder)
    decoder = Dropout(0.25)(decoder)

    # 112x112
    decoder = Conv2DTranspose(64, (3, 3), strides=(1, 1), padding='same', kernel_initializer=weight_init)(decoder)
    decoder = Activation('relu')(decoder)

    # 112x112
    decoder = Conv2DTranspose(32, (3, 3), strides=(2, 2), padding='same', kernel_initializer=weight_init)(decoder)
    decoder = Activation('relu')(decoder)

    # 224x224
    # Output layer, with 1 channel (assuming grayscale output)
    output_layer = Conv2D(1, (1, 1), activation='tanh')(decoder)

    model = Model(net_input, output_layer)

    return model


In [None]:

# Assuming MK is created using the create_model function
MK = create_model((256,256,1))
MK.summary()

 conv2d_9 (Conv2D)              (None, 32, 32, 256)  262400      ['concatenate_1[0][0]']          
                                                                                                  
 concatenate_4 (Concatenate)    (None, 32, 32, 512)  0           ['dropout_1[0][0]',              
                                                                  'conv2d_9[0][0]']               
                                                                                                  
 conv2d_transpose_2 (Conv2DTran  (None, 64, 64, 128)  589952     ['concatenate_4[0][0]']          
 spose)                                                                                           
                                                                                                  
 activation_7 (Activation)      (None, 64, 64, 128)  0           ['conv2d_transpose_2[0][0]']     
                                                                                                  
 dropout_2

### First Stage: Pre-Training

In [None]:
# create checkpoint
checkpoint_dir = "checkpoints-1"
callback_checkpoint = keras.callbacks.ModelCheckpoint(filepath=checkpoint_dir+"/landmark",
                                                      save_weights_only=True,
                                                      save_best_only=True,monitor='val_loss',
                                                      verbose=1)

# initializing callback function
callbacks0 = [callback_checkpoint]

if not tf.io.gfile.exists(checkpoint_dir):
    tf.io.gfile.mkdir(checkpoint_dir)
    print("Checkpoint directory created: {}".format(checkpoint_dir))

# if checkpoint exists, load the weights
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)

if latest_checkpoint:
    print("Checkpoint found: {}, restoring...".format(latest_checkpoint))
    MK.load_weights(latest_checkpoint)
    print("Checkpoint restored: {}".format(latest_checkpoint))
else:
    print("Checkpoint not found. Model weights will be initialized randomly.")

In [None]:
# learning rate scheduler

def lr_schedule( epoch ):

    lr = 5e-4

    if epoch >= 10:
        lr = 1e-4

    print("Learning rate: ", lr)
    return lr

In [None]:
# setting learning rate scheduler
lr_scheduler = LearningRateScheduler(lr_schedule)

# setting early stopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

callbacks0.append(lr_scheduler)
callbacks0.append(es)

In [None]:
# compile
MK.compile(loss='binary_crossentropy', optimizer = keras.optimizers.Adam(lr_schedule(0)))

In [None]:
# train

history = MK.fit(img_train, bmask_train, validation_data=(img_val, bmask_val), shuffle=True, batch_size=16, epochs=100, callbacks=callbacks0)

In [None]:
# plot loss vs epochs curve

plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.ylabel('Loss')
plt.xlabel('# epochs')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# visualizing predicted mask

def visualize_mask_prediction(ind, images, masks_gt):

    fig, axs = plt.subplots(1, 3, figsize=(15,5))

    # predict mask on one image
    img = images[ind].copy()
    img = np.expand_dims(img, axis = 0)
    mask_pr = MK.predict(img)

    # plot
    axs[0].imshow(img[0, :,:,0], cmap = 'gray')           # image
    axs[1].imshow(masks_gt[ind, :,:,0], cmap = 'gray')    # ground truth mask
    axs[2].imshow(mask_pr[0, :,:,0], cmap = 'gray')       # predicted mask

    axs[0].axis('off')
    axs[1].axis('off')
    axs[2].axis('off')

In [None]:
visualize_mask_prediction(42, img_train, bmask_train)

In [None]:
visualize_mask_prediction(1028, img_test, bmask_test)

### Second Stage: Pre-Training

In [None]:
# fully connected layers
avg_pooling = AveragePooling2D(pool_size=(7, 7))(MK.output)
flat = Flatten() (avg_pooling)

fc0 = Dense(2048, activation = 'relu') (flat)
dropout0 = Dropout(0.3)(fc0)
fc1 = Dense(512, activation = 'relu') (dropout0)
dropout1 = Dropout(0.3)(fc1)
fc2  = Dense(108, activation = 'sigmoid') (dropout1)

# define full model
model_full = keras.Model(inputs=MK.input, outputs=fc2)

# freeze layers
MK.trainable = False

In [None]:
# to check whether layers are trainable or not

for i,layer in enumerate(model_full.layers):
    print(i,layer.name,layer.trainable)

In [None]:
# Wing loss

def wing_loss(land_gt, land_pred, w=10.0, epsilon=2.0):

    with tf.name_scope('wing_loss'):

        # compute constant C
        C = w * (1.0 - math.log(1.0 + w / epsilon))

        x = land_gt - land_pred
        abs_x = tf.abs(x)

        # if absolute x is smaller than w, then first equation
        # otherwise, second
        losses = tf.where(tf.greater(w, abs_x), w * tf.math.log(1.0 + abs_x / epsilon), abs_x - C)
        loss = tf.reduce_mean(losses)

        return loss

In [None]:
# create checkpoint
checkpoint_dir = "checkpoints-2"

callback_checkpoint = keras.callbacks.ModelCheckpoint(filepath=checkpoint_dir+"/landmark",
                                                      save_weights_only=True,
                                                      save_best_only=True,monitor='val_loss',
                                                      verbose=1)

# initializing callback function
callbacks1 = [callback_checkpoint]

if not tf.io.gfile.exists(checkpoint_dir):
    tf.io.gfile.mkdir(checkpoint_dir)
    print("Checkpoint directory created: {}".format(checkpoint_dir))

# if checkpoint exists, load the weights
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
    print("Checkpoint found: {}, restoring...".format(latest_checkpoint))
    MK.load_weights(latest_checkpoint)
    print("Checkpoint restored: {}".format(latest_checkpoint))
else:
    print("Checkpoint not found. Model weights will be initialized randomly.")


In [None]:
# learning rate scheduler
def lr_schedule( epoch ):

    lr = 1e-3

    if epoch >= 20:
        lr = 1e-4

    print("Learning rate: ", lr)
    return lr

In [None]:
# setting learning rate scheduler
lr_scheduler = LearningRateScheduler(lr_schedule)

# setting early stopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

callbacks1.append(lr_scheduler)
callbacks1.append(es)

In [None]:

def anisotropic_loss(land_gt, land_pred, a=4.0, b=2.0):


    land_gt = tf.reshape(land_gt, (-1, land_gt.shape[1] // 2, 2))
    land_pred = tf.reshape(land_pred, (-1, land_pred.shape[1] // 2, 2))

    # Compute tangent vectors ti at each point
    ti = (land_pred[:, 2:] - land_pred[:, :-2]) / tf.norm(land_pred[:, 2:] - land_pred[:, :-2], axis=-1, keepdims=True)
    ti = tf.concat([ti[:, :1], ti, ti[:, -1:]], axis=1)  # Extend ti for the endpoints

    # Compute weight matrices Wi based on the tangent vectors
    Wi = 1 / a**2 * tf.einsum('bij,bik->bijk', ti, ti) + 1 / b**2 * (tf.eye(2) - tf.einsum('bij,bik->bijk', ti, ti))

    # Compute the loss
    loss = tf.reduce_sum(tf.einsum('bij,bij->bi', land_gt - land_pred, tf.einsum('bijk,bij->bik', Wi, land_gt - land_pred)))

    # Average the loss over the batch
    loss = tf.reduce_mean(loss)

    return loss


In [None]:
# compile
model_full.compile(optimizer=keras.optimizers.Adam(lr_schedule(0)), loss=anisotropic_loss)# 'mean_squared_error')#wing_loss)#anisotropic_loss


In [None]:
# train
history2 = model_full.fit(img_train, l_train, validation_data=(img_val, l_val), batch_size=16, shuffle=True, epochs=100, callbacks=callbacks1)

In [None]:
# plot loss vs epochs curve

plt.plot(history2.history['loss'])
plt.plot(history2.history['val_loss'])
plt.ylabel('Loss')
plt.xlabel('# epochs')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# visualize prediction on single image

def visualize_pred(ind, images, landmarks):

    plt.figure(figsize=(5,5))

    # load an image
    img = images[ind].copy()
    img = np.expand_dims(img, axis = 0)
    plt.imshow(img[0][:,:,0], cmap='gray')

    land_pred = model_full.predict(img)

    # ground truth landmarks
    keypoints = landmarks[ind].copy().reshape(KEYPOINTS,2)
    keypoints[:,0] = keypoints[:,0] * h
    keypoints[:,1] = keypoints[:,1] * w
    #plt.plot(keypoints[:,1], keypoints[:,0], 'go', markersize=3)

    # predicted landmarks
    keypointsT = land_pred[0].copy().reshape(KEYPOINTS,2)
    keypointsT[:,0] = keypointsT[:,0] * h
    keypointsT[:,1] = keypointsT[:,1] * w
    plt.plot(keypointsT[:,1], keypointsT[:,0], 'ro', markersize=3)

    plt.axis('off')

In [None]:
visualize_pred(42, img_train, l_train)

In [None]:
visualize_pred(1028, img_test, l_test)

In [None]:
# visualize prediction on random ten images

def visualize_pred_rand(images, landmarks, gt=True):

    fig, axs = plt.subplots(2, 3, figsize=(10,6))
    plt.subplots_adjust(wspace=0, hspace=0.1)

    # selecting 10 random images from set
    indexes = np.random.randint(0,images.shape[0],10)
    img = images[indexes]
    land = landmarks[indexes]
    land_pred = model_full.predict(img)

    # iterate over images
    ind = 0

    for i in range(2):
        for j in range(3):

            # display image
            axs[i, j].imshow(img[ind][:,:,0], cmap='gray')

            # display predicted landmarks
            keypointsT = land_pred[ind].copy().reshape(KEYPOINTS,2)
            keypointsT[:,0] = keypointsT[:,0] * h
            keypointsT[:,1] = keypointsT[:,1] * w

            axs[i, j].plot(keypointsT[:,1], keypointsT[:,0], 'ro', markersize=2)

            # if gt is true, display ground truth landmarks
            if gt:
                keypoints = land[ind].copy().reshape(KEYPOINTS,2)
                keypoints[:,0] = keypoints[:,0] * h
                keypoints[:,1] = keypoints[:,1] * w
                axs[i, j].plot(keypoints[:,1], keypoints[:,0], 'go', markersize=2)

            # turn off axis display
            axs[i, j].axis('off')

            ind += 1

    # save figure
    plt.savefig("results.png")

In [None]:
visualize_pred_rand(img_test, l_test)

In [None]:
def NME(land, land_pred, split, folder_path):
    N = land.shape[0]
    l = land.reshape(land.shape[:-1] + (KEYPOINTS, 2))
    lt = land_pred.reshape(land_pred.shape[:-1] + (KEYPOINTS, 2))

    # Error calculation
    nme = (np.sum(np.sqrt(np.sum((l - lt) ** 2, axis=2)), axis=1)
           / np.sqrt(np.sum((l[:, 45] - l[:, 36]) ** 2, axis=1))) / KEYPOINTS

    # Average over the whole sample
    nme_avg = np.sum(nme) / N

    sorted_nme = np.sort(nme)
    cumulative_errors = np.arange(1, N + 1) / N * 100

    threshold = 0.1
    failure_count = np.sum(nme >= threshold)
    failure_count2 = np.sum(nme >= 0)
    failure_rate = (failure_count / N) * 100

    # Create a folder if it doesn't exist
    result_folder = os.path.join(folder_path, split)
    os.makedirs(result_folder, exist_ok=True)

    # Save results to a file
    results_file_path = os.path.join(result_folder, f'{split}_results.txt')
    with open(results_file_path, 'w') as f:
        f.write(f'Average NME: {nme_avg}\n')
        f.write(f'Failure Rate ({threshold * 100}% threshold): {failure_rate}%\n')
        f.write(f'sorted_nme : {", ".join(map(str, sorted_nme))}\n')
        f.write(f'cumulative_errors : {", ".join(map(str, cumulative_errors))}\n')

    # Plot and save the CED
    ced_plot_path = os.path.join(result_folder, f'{split}_ced_plot.png')
    plt.plot(sorted_nme, cumulative_errors)
    plt.xlabel('NME')
    plt.ylabel('Percentage of Samples (%)')
    plt.title(f'Cumulative Error Distribution (CED) - Threshold {threshold * 100}%')
    plt.grid(True)
    plt.savefig(ced_plot_path)
    plt.close()

    return nme_avg, failure_rate, cumulative_errors, sorted_nme


In [None]:
# predicting landmarks
l_test_pred = model_full.predict(img_test)
l_train_pred = model_full.predict(img_train)
l_val_pred = model_full.predict(img_val)

# evaluating NME of predictions
print("Normalized Mean Error on training set: ", NME(l_train, l_train_pred,"train1", Results_path))
print("Normalized Mean Error on validation set: ", NME(l_val, l_val_pred,"val1", Results_path))
print("Normalized Mean Error on testing set: ", NME(l_test, l_test_pred,"test1", Results_path))

### Third Stage: Full Training

In [None]:
# unfreezing the layers
MK.trainable = True

for i,layer in enumerate(model_full.layers):
    print(i,layer.name,layer.trainable)

In [None]:
# create checkpoint
checkpoint_dir = "checkpoints-3"

callback_checkpoint = keras.callbacks.ModelCheckpoint(filepath=checkpoint_dir+"/landmark",
                                                      save_weights_only=True,
                                                      save_best_only=True,monitor='val_loss',
                                                      verbose=1)

# initializing callback function
callbacks2 = [callback_checkpoint]

if not tf.io.gfile.exists(checkpoint_dir):
    tf.io.gfile.mkdir(checkpoint_dir)
    print("Checkpoint directory created: {}".format(checkpoint_dir))

# if checkpoint exists, load the weights
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)

if latest_checkpoint:
    print("Checkpoint found: {}, restoring...".format(latest_checkpoint))
    model_full.load_weights(latest_checkpoint)
    print("Checkpoint restored: {}".format(latest_checkpoint))
else:
    print("Checkpoint not found. Model weights will be initialized randomly.")

In [None]:
# learning rate scheduler

def lr_schedule( epoch ):

    lr = 1e-4

    print("Learning rate: ", lr)
    return lr

In [None]:
# setting learning rate scheduler
lr_scheduler = LearningRateScheduler(lr_schedule)

# setting early stopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

callbacks2.append(lr_scheduler)
callbacks2.append(es)

In [None]:
# compile
model_full.compile(optimizer = keras.optimizers.Adam(lr_schedule(0)), loss=anisotropic_loss)#'mean_squared_error')#wing_loss)#'mean_squared_error')#

In [None]:
# train
history3 = model_full.fit(img_train, l_train, validation_data=(img_val, l_val), batch_size=16, shuffle=True, epochs=100, callbacks=callbacks2)

In [None]:
# plot loss vs epochs curve

plt.plot(history3.history['loss'])
plt.plot(history3.history['val_loss'])
plt.ylabel('Loss')
plt.xlabel('# epochs')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

# load best weights from checkpoint

In [None]:
# visualize ten random predictions from test set

visualize_pred_rand(img_test, l_test, False)

In [None]:
# visualize ten random predictions from RWTH-Aachen set

visualize_pred_rand(img_aachen, l_aachen, False)

In [None]:
visualize_pred(1020, img_test, l_test)

# calculate Mean Absolute Error

def MAE(l, l_pred):
    
    mae = keras.losses.mean_absolute_error(l, l_pred)
    
    return np.mean(tf.Session().run(mae))

In [None]:
# calculate average time needed to predict landmarks on an image

def inference_speed(images):

    timings = []

    # iterate over images
    for img in images:

        img = np.expand_dims(img, axis = 0)

        # record time
        start = time.time()
        preds = model_full.predict(img)
        end = time.time()

        timings.append(end - start)

    return np.average(timings)

In [None]:
# predicting landmarks
l_train_pred = model_full.predict(img_train)
l_val_pred = model_full.predict(img_val)
l_test_pred = model_full.predict(img_test)
l_test_aa_pred = model_full.predict(img_aachen)

In [None]:
# evaluating NME of predictions
# evaluating NME of predictions
print("Normalized Mean Error on training set: ", NME(l_train, l_train_pred,"train2-2-1", Results_path))
print("Normalized Mean Error on validation set: ", NME(l_val, l_val_pred,"val2-2-1", Results_path))
print("Normalized Mean Error on testing set: ", NME(l_test, l_test_pred,"test2-2-1", Results_path))
print("Normalized Mean Error on RWTH-Aachen: ", NME(l_aachen, l_test_aa_pred,"test-aachen", Results_path))


In [None]:
# inference speed

print("Inference speed (average per image): ", inference_speed(img_test))

In [None]:
# save weights

model_full.save('model.h5')
print("Model size (KB): ", os.path.getsize('model.h5')/1000)