In [None]:
import os, pathlib
!pip uninstall gdown -y && pip install gdown
if not os.path.exists('/content/weights'): os.mkdir('/content/weights')
if not os.path.exists('/content/data'): os.mkdir('/content/data')
if not os.path.exists('/content/results'): os.mkdir('/content/results')
if not os.path.exists('/content/weights/EfficientNetB0-CurriculumLearning.h5'):
  !gdown 1FLI2r7MtfczKdt9C4Og0mvDGtoD7Jq92 -O /content/weights/EfficientNetB0-CurriculumLearning.h5
if not os.path.exists('/content/SyntheticDatabase_testingset.zip'):
  !gdown 1vZgc6ofqKcRXVJUHCZ9Lb1r6SFCuuV51 -O /content/SyntheticDatabase_testingset.zip
  !unzip '/content/SyntheticDatabase_testingset.zip' -d '/content/data'

import time, pathlib
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '5'
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer, InputSpec
from tensorflow.python.keras.utils import conv_utils
from skimage.transform import resize
import numpy as np
import cv2

%reload_ext autoreload
%autoreload 2

# Utils

In [2]:
class BilinearUpSampling2D(Layer):
    def __init__(self, size=(2, 2), data_format=None, **kwargs):
        super(BilinearUpSampling2D, self).__init__(**kwargs)
        self.data_format = self.normalize_data_format(data_format)
        self.size = conv_utils.normalize_tuple(size, 2, 'size')
        self.input_spec = InputSpec(ndim=4)

    def compute_output_shape(self, input_shape):
        if self.data_format == 'channels_first':
            height = self.size[0] * input_shape[2] if input_shape[2] is not None else None
            width = self.size[1] * input_shape[3] if input_shape[3] is not None else None
            return (input_shape[0],
                    input_shape[1],
                    height,
                    width)
        elif self.data_format == 'channels_last':
            height = self.size[0] * input_shape[1] if input_shape[1] is not None else None
            width = self.size[1] * input_shape[2] if input_shape[2] is not None else None
            return (input_shape[0],
                    height,
                    width,
                    input_shape[3])

    def call(self, inputs):
        input_shape = tf.shape(inputs)
        if self.data_format == 'channels_first':
            height = self.size[0] * input_shape[2] if input_shape[2] is not None else None
            width = self.size[1] * input_shape[3] if input_shape[3] is not None else None
        elif self.data_format == 'channels_last':
            height = self.size[0] * input_shape[1] if input_shape[1] is not None else None
            width = self.size[1] * input_shape[2] if input_shape[2] is not None else None
        return tf.compat.v1.image.resize(inputs, [height, width], method=tf.image.ResizeMethod.BILINEAR, align_corners=True)

    def get_config(self):
        config = {'size': self.size, 'data_format': self.data_format}
        base_config = super(BilinearUpSampling2D, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def normalize_data_format(self, value):
        if value is None:
            value = tf.keras.backend.image_data_format()
        data_format = value.lower()
        if data_format not in {'channels_first', 'channels_last'}:
            raise ValueError('The `data_format` argument must be one of '
                         '"channels_first", "channels_last". Received: ' +
                         str(value))
        return data_format

def DepthNorm(x, maxDepth):
    return maxDepth / x

def resize_img(img, resolution=480):
    return resize(img, (resolution, int(resolution*4/3)), preserve_range=True, mode='reflect', anti_aliasing=True )

def predict(model, images, minDepth=0.1, maxDepth=25, batch_size=2):
    # Support multiple RGBs, one RGB image, even grayscale
    if len(images.shape) < 3: images = np.stack((images,images,images), axis=2)
    if len(images.shape) < 4: images = images.reshape((1, images.shape[0], images.shape[1], images.shape[2]))
    # Compute predictions
    predictions = model.predict(images, batch_size=batch_size, verbose=0)

    # Put in expected range
    return np.clip(DepthNorm(predictions, maxDepth=25), minDepth, maxDepth) / maxDepth

def scale_up(scale, images):
    from skimage.transform import resize
    scaled = []

    for i in range(len(images)):
        img = images[i]
        output_shape = (scale * img.shape[0], scale * img.shape[1])
        scaled.append( resize(img, output_shape, order=1, preserve_range=True, mode='reflect', anti_aliasing=True ) )

    return np.stack(scaled)

def gamma_table(gamma=1.0):
    invGamma = 1.0 / gamma
    table_encoded = np.array([((i / 255.0) ** invGamma) * 255 for i in np.arange(0, 256)]).astype('uint8')
    table_decoded = np.array([((i / 255.0) ** gamma) * 255 for i in np.arange(0, 256)]).astype('uint8')
    return table_encoded, table_decoded

def compute_errors(gt, pred):
  thresh = np.maximum((gt / pred), (pred / gt))
  a1 = (thresh < 1.25   ).mean()
  a2 = (thresh < 1.25 ** 2).mean()
  a3 = (thresh < 1.25 ** 3).mean()
  abs_rel = np.mean(np.abs(gt - pred) / gt)
  rmse = (gt - pred) ** 2
  rmse = np.sqrt(rmse.mean())
  log_10 = (np.abs(np.log10(gt)-np.log10(pred))).mean()
  return a1, a2, a3, abs_rel, rmse, log_10


def sortingFiles(arr, n):
  arr.sort(key = lambda x: (len(x), x))
  return arr

def findFirstVector(value,vector):
  res = next(x for x, val in enumerate(vector) if val > value)
  return res

def evaluate_colon(model, batch_size=1, maxDepth=25, path_data = 'data_test/', params={}, verbose=True):

    bs = batch_size
    model_path = params['model_path']
    path_save = params['path_save']
    isRealVid = params['isRealVid']
    isSingleFolder = params['isSingleFolder']
    imgSave = params['imgSave']
    data_set=params['data_set']

    shape_rgb = (bs, 480, 640, 3)
    shape_depth = (bs, 480, 640)
    table_gamma, decode = gamma_table(gamma=1.5)

    for vid_name in os.listdir(path_data):

        if isSingleFolder:
          vid_path = path_data
          vid_path_save = path_save
        else:
          vid_path = path_data + vid_name + '/'
          vid_path_save = path_save + vid_name + '/'
        if not os.path.isdir(vid_path): continue

        print('\n------------------------------------------------------------------------------')
        print(vid_path)


        if not isSingleFolder and not(os.path.exists(path_save + vid_name + '/')):
            os.mkdir(path_save + vid_name + '/')


        file_list = []
        for file in os.listdir(vid_path+'img/'):
            file_list.append(file)


        files = sortingFiles(file_list,len(file_list))
        nfr = len(files)

        depth_scores = np.zeros((6, nfr))


        for i in range(nfr//bs):

            x = np.ndarray(shape_rgb)
            true_y = np.ndarray(shape_depth)
            idx = 0
            for ii in range(i*bs,(i+1)*bs,1):

                img= cv2.imread(vid_path + 'img/' + files[ii])

                x[idx,:,:,:] = resize_img(img,480)

                if not isRealVid:
                    true_y[idx,:,:] = resize_img(cv2.imread(vid_path + 'z/' + files[ii])[:,:,0],480)


                true_y[idx,:,:] = np.clip(np.asarray(true_y[idx,:,:], dtype=float) / 255, 0, 1)
                idx += 1

            # Compute results
            pred_y = scale_up(2, predict(model, x/255, minDepth=0.1,
                                         maxDepth=25, batch_size=bs)[:,:,:,0])

            idx = 0
            y_gamma = np.zeros((bs,480,640),dtype=np.uint8)

            # Invert gamma correction
            for ii in range(i*bs,(i+1)*bs,1):

                y_gamma[idx,:,:] = cv2.LUT((pred_y[idx,:,:]*255).astype('uint8'), decode)
                pred_color = np.uint8(y_gamma[idx,:,:])


                if imgSave:

                  vis = pred_color
                  print('save: ' + vid_path_save + files[ii])
                  cv2.imwrite(vid_path_save + files[ii], vis)

                idx += 1

            # Compute errors per image in batch
            if not isRealVid:
                for j in range(len(true_y)):
                    errors = compute_errors(true_y[j]*maxDepth, (y_gamma[j]/255)*maxDepth)


                for k in range(len(errors)):
                    depth_scores[k][(i*bs)+j] = errors[k]

                if imgSave:
                  print(vid_name + ' batch '+ str(i)+'/'+str(nfr//bs) + \
                        ' rmse: ' + str(round(depth_scores[4][(i*bs)+j],3)) + \
                        ' acc: ' + str(round(depth_scores[0][(i*bs)+j],3)))
                else:
                  print('\r\t'+vid_name + ' batch '+ str(i)+'/'+str(nfr//bs) + \
                        ' rmse: ' + str(round(depth_scores[4][(i*bs)+j],3)) + \
                        ' acc: ' + str(round(depth_scores[0][(i*bs)+j],3)), end='')
            else:
                print(vid_name + ' batch '+ str(i)+'/'+str(nfr//bs))



        e = depth_scores.mean(axis=1)
        if not isRealVid:
            np.savetxt(vid_path + 'error.csv', e, delimiter='')
            if verbose:
                print('\n-----------------------')
                print('Metrics of', vid_name)
                print("{:>10}, {:>10}, {:>10}, {:>10}, {:>10}, {:>10}".format( \
                        'a1', 'a2', 'a3', 'rel', 'rms', 'log_10'))
                print("{:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}".format(e[0],e[1],e[2],e[3],e[4],e[5]))

        if isSingleFolder == True: break
    return e

# Main

In [None]:
params = {}

params['weights'] = 'EfficientNetB0-CurriculumLearning.h5'
params['model_path'] = '/content/weights/' + params['weights']

params['data_set'] ='SyntheticDatabase_testingset'
params['path_data'] = '/content/data/' + params['data_set'] + '/'
params['path_save'] = '/content/results/'+ params['data_set'] +'/'

params['isRealVid'] = False
params['isSingleFolder'] = False
params['imgSave'] = False

if not os.path.exists(params['path_save']): os.mkdir(params['path_save'])

custom_objects = {'BilinearUpSampling2D': BilinearUpSampling2D}
print('Loading model ',params['model_path'] )
model = tf.keras.models.load_model(params['model_path'],custom_objects=custom_objects, compile=False)
start = time.time()
print('Testing...')

e = evaluate_colon(model, maxDepth=25, path_data=params['path_data'], params=params)
print("{:>10}, {:>10}, {:>10}, {:>10}, {:>10}, {:>10}".format('a1', 'a2', 'a3', 'rel', 'rms', 'log_10'))
print("{:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}, {:10.4f}".format(e[0],e[1],e[2],e[3],e[4],e[5]))

end = time.time()
print('\nTest time', end-start, 's')

print('\n\n'+ '-'*100 + '\nGLOBAL METRICS\n'+'-'*100)

path_vids='./results/' + params['data_set'] + '/'
accuracy_list =[]
rmse_list = []

for vid_name in os.listdir(path_vids):
  if vid_name[0] == '.': continue
  path_csv = path_vids + vid_name + '/error.csv'
  with open(path_csv) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    line_count = 0
    for row in csv_reader:
      if line_count == 0: accuracy_list.append(float(row[0]))
      if line_count == 4: rmse_list.append(float(row[0]))
      line_count +=1


print('Reported results in Synthetic database (Level 5) with the best configuration of SfSNet:\n' +
      'EfficientNetB0 using Curriculum learning strategy\n\n')
print('Threshold Accuracy:',np.mean(accuracy_list), '%')
print('Root Mean Square Error:',np.mean(rmse_list),'cm' )