## Import Libraries

In [None]:
import os
from matplotlib import pyplot as plt
import random
import numpy as np
from zipfile import ZipFile
from io import BytesIO
from PIL import Image
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, UpSampling2D, LeakyReLU, Concatenate, Layer, InputSpec
from tensorflow.keras import Model
from tensorflow.keras.applications import DenseNet169, DenseNet121
from keras.models import load_model
import keras.utils.conv_utils as conv_utils
import keras.backend as K
from skimage.transform import resize
import skimage

## Model Selection

In [None]:
# Choose whether to use our own checkpointed model or the pretrained model.
# Surrounding whitespace and letter case are normalised so that e.g. "Pretrained "
# still selects a branch. Anything else is rejected here, because the loading
# cells only define `model` when `method` is exactly "checkpoints" or "pretrained".
method = input("Choose which model to use - checkpoints | pretrained:").strip().lower()
if method not in ("checkpoints", "pretrained"):
    raise ValueError(f"Unknown model choice {method!r}; expected 'checkpoints' or 'pretrained'.")
print(f"Using {method} model")

In [None]:
# Choose the pretrained network used for the encoder.
# The answer is compared verbatim against 'DenseNet121' / 'DenseNet169' when the
# encoder is built, so stray whitespace typed at the prompt is stripped here.
pretrained_enc_model = input("Choose model used for encoder - DenseNet121 | DenseNet169:").strip()
print(f"Using {pretrained_enc_model} for the encoder.")

## Load Model from Checkpoints

In [None]:
class UpscaleBlock(Model):
    """Decoder stage: 2x bilinear upsampling, skip concatenation, two convolutions.

    `name` is used only as a prefix for the names of the upsampling,
    concatenation and convolution sublayers; it is not forwarded to the
    `Model` constructor.
    """
    # Define an upscaling block with skip connections
    def __init__(self, filters, name):
        """Create the sublayers.

        Args:
            filters: Number of filters of both convolutions.
            name: Prefix for the sublayer names (e.g. ``name + '_convA'``).
        """
        super(UpscaleBlock, self).__init__()
        self.up = UpSampling2D(size=(2, 2), interpolation='bilinear', name=name + '_upsampling2d')
        self.concat = Concatenate(name=name + '_concat')
        # Conv2D positional arguments: kernel size 3, stride 1, 'same' padding
        self.convA = Conv2D(filters, 3, 1, 'same', name=name + '_convA')
        self.reluA = LeakyReLU(alpha=0.2)
        self.convB = Conv2D(filters, 3, 1, 'same', name=name + '_convB')
        self.reluB = LeakyReLU(alpha=0.2)

    # Define the forward pass through the block
   
    def call(self, x):
        """Run the block.

        Args:
            x: Two-element sequence; ``x[0]`` is the tensor to upsample and
                ``x[1]`` is the skip-connection tensor concatenated with it.

        Returns:
            Output of the second convolution after its LeakyReLU activation.
        """
        upsampled = self.up(x[0]) # Upsample the input tensor
        concatenated = self.concat([upsampled, x[1]]) # Concatenate the upsampled tensor with the skip connection
        convA_output = self.reluA(self.convA(concatenated)) # Perform the convolution and apply the LeakyReLU activation function
        convB_output = self.reluB(self.convB(convA_output)) # Perform another convolution and apply the LeakyReLU activation function
        return convB_output

In [None]:
class Encoder(Model):
    """Multi-scale feature extractor built on an ImageNet-pretrained DenseNet.

    The backbone is chosen by the module-level `pretrained_enc_model`
    ('DenseNet121' or 'DenseNet169'). The wrapped model returns five tensors:
    the backbone's last output followed by the outputs of the layers named
    'pool1', 'pool2_pool', 'pool3_pool' and 'conv1/relu', in that order.

    Raises:
        ValueError: If `pretrained_enc_model` names an unsupported backbone.
    """

    def __init__(self):
        super(Encoder, self).__init__()

        # Map the user's choice to the backbone constructor. An unknown choice
        # is rejected explicitly instead of leaving `self.base_model` unset.
        backbones = {'DenseNet121': DenseNet121, 'DenseNet169': DenseNet169}
        backbone = backbones.get(pretrained_enc_model)
        if backbone is None:
            raise ValueError(
                "Unsupported encoder {!r}; expected one of {}.".format(
                    pretrained_enc_model, sorted(backbones)))

        # Height and width are left as None so the backbone accepts any image size
        self.base_model = backbone(input_shape=(None, None, 3), include_top=False, weights='imagenet')
        print('Base model loaded {}'.format(backbone.__name__))

        # Intermediate activations exposed as skip connections
        layer_names = ['pool1', 'pool2_pool', 'pool3_pool', 'conv1/relu']
        outputs = [self.base_model.get_layer(name).output for name in layer_names]
        # The backbone's final output goes first, ahead of the skip features
        outputs.insert(0, self.base_model.outputs[-1])
        self.encoder = Model(inputs=self.base_model.inputs, outputs=outputs)

    # Define the forward pass through the encoder
    def call(self, x):
        """Return the list of five feature tensors for input batch `x`."""
        return self.encoder(x)

In [None]:
class Decoder(Model):
    """Decoder that turns encoder features into a single-channel map.

    A 1x1 convolution is followed by four `UpscaleBlock`s whose filter counts
    are `decode_filters` divided by 2, 4, 8 and 16, and a final 3x3
    convolution with one output filter.
    """
    # Define a decoder with skip connections and upscaling blocks
    def __init__(self, decode_filters):
        """Create the layers.

        Args:
            decode_filters: Number of filters of the first (1x1) convolution;
                each upscaling block uses an integer fraction of it.
        """
        super(Decoder, self).__init__()
        self.conv2 = Conv2D(decode_filters, 1, padding='same', name='conv2')
        self.up1 = UpscaleBlock(decode_filters // 2, name='up1')
        self.up2 = UpscaleBlock(decode_filters // 4, name='up2')
        self.up3 = UpscaleBlock(decode_filters // 8, name='up3')
        self.up4 = UpscaleBlock(decode_filters // 16, name='up4')
        self.conv3 = Conv2D(1, 3, 1, padding='same', name='conv3')

    # Define the forward pass through the decoder
    def call(self, features):
        """Decode a five-element feature sequence.

        Args:
            features: Sequence unpacked as ``(x, pool1, pool2, pool3, conv1)``;
                `x` is fed to the first convolution and the remaining four
                tensors are used as skip connections, `pool3` first and
                `conv1` last.

        Returns:
            Output of the final one-filter convolution.
        """
        x, pool1, pool2, pool3, conv1 = features
        up0 = self.conv2(x) # Perform a convolution on the input tensor
        up1 = self.up1([up0, pool3]) # Apply an upscaling block with skip connections
        up2 = self.up2([up1, pool2]) # Apply another upscaling block with skip connections
        up3 = self.up3([up2, pool1]) # Apply another upscaling block with skip connections
        up4 = self.up4([up3, conv1]) # Apply another upscaling block with skip connections
        return self.conv3(up4)

In [None]:
class DepthEstimate(Model):
    """Full model: an `Encoder` followed by a `Decoder`."""
    # Define the full depth estimation model
    def __init__(self):
        """Build the encoder, then size the decoder from the encoder's output."""
        super(DepthEstimate, self).__init__()
        self.encoder = Encoder()
        # decode_filters is half the channel count of the first output of the
        # encoder's last layer (presumably the deepest feature map - confirm)
        self.decoder = Decoder(decode_filters=int(self.encoder.layers[-1].output[0].shape[-1] // 2))
        print('\nModel created.')

    # Define the forward pass through the depth estimation model
    def call(self, x):
        """Return the decoder output for the encoder features of `x`."""
        return self.decoder(self.encoder(x))

In [None]:
# Only the "checkpoints" option builds the model from code and restores its weights
if method == "checkpoints":
    # The checkpoint location is derived from the lower-cased encoder choice
    checkpoint_path = f"/kaggle/input/{pretrained_enc_model.lower()}-checkpoints/training_1/cp.ckpt"

    # Instantiate the network, then restore its trained weights from the checkpoint
    model = DepthEstimate()
    model.load_weights(checkpoint_path)

    # Report success
    print('Model weights loaded.')


## Load Pretrained Model

In [None]:
def normalize_data_format(value):
    """Return a validated, lower-cased Keras image data format.

    Args:
        value: 'channels_first' or 'channels_last' in any letter case, or
            None to use the backend's current image data format.

    Returns:
        Either 'channels_first' or 'channels_last'.

    Raises:
        ValueError: If the resolved format is neither of the two.
    """
    if value is None:
        # Fall back to the format configured in the Keras backend
        resolved = K.image_data_format()
    else:
        resolved = value.lower()

    if resolved in ('channels_first', 'channels_last'):
        return resolved

    raise ValueError('The `data_format` argument must be one of '
                     '"channels_first", "channels_last". Received: ' +
                     str(value))

In [None]:
class BilinearUpSampling2D(Layer):
    """Upsample a 4-D image tensor by integer factors with bilinear interpolation.

    Args:
        size: Int or pair of ints, the (height, width) upsampling factors.
        data_format: 'channels_first', 'channels_last' or None (use the
            backend default).
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, size=(2, 2), data_format=None, **kwargs):
        super(BilinearUpSampling2D, self).__init__(**kwargs)

        # Validated layout string and a normalised 2-tuple of scale factors
        self.data_format = normalize_data_format(data_format)
        self.size = conv_utils.normalize_tuple(size, 2, 'size')

        # The layer only accepts rank-4 inputs
        self.input_spec = InputSpec(ndim=4)

    def _spatial_axes(self):
        """Return the (height, width) axis indices for the configured layout."""
        return (2, 3) if self.data_format == 'channels_first' else (1, 2)

    def compute_output_shape(self, input_shape):
        """Return the input shape with its two spatial dimensions scaled.

        The batch and channel entries stay in place for both layouts, and
        unknown (None) spatial dimensions stay None.
        """
        output_shape = list(input_shape)
        for axis, factor in zip(self._spatial_axes(), self.size):
            if output_shape[axis] is not None:
                output_shape[axis] = factor * output_shape[axis]
        return tuple(output_shape)

    def call(self, inputs):
        """Resize `inputs` to `size` times its spatial dimensions."""
        channels_first = self.data_format == 'channels_first'

        # tf.image.resize works on [batch, height, width, channels] tensors,
        # so channels_first inputs are transposed for the resize and back again.
        if channels_first:
            inputs = tf.transpose(inputs, [0, 2, 3, 1])

        # Dynamic shape, so the layer works with unknown spatial dimensions
        input_shape = K.shape(inputs)
        height = self.size[0] * input_shape[1]
        width = self.size[1] * input_shape[2]

        resized = tf.image.resize(inputs, [height, width], method=tf.image.ResizeMethod.BILINEAR)

        if channels_first:
            resized = tf.transpose(resized, [0, 3, 1, 2])
        return resized

    def get_config(self):
        """Return the base layer config extended with `size` and `data_format`."""
        config = {'size': self.size, 'data_format': self.data_format}
        base_config = super(BilinearUpSampling2D, self).get_config()
        return {**base_config, **config}

In [None]:
# Name -> object lookup for the custom pieces of the saved model:
# the upsampling layer class is supplied, the loss function is mapped to None.
custom_objects = dict(
    BilinearUpSampling2D=BilinearUpSampling2D,
    depth_loss_function=None,
)

In [None]:
# Only the "pretrained" option loads the saved model file
if method == "pretrained":
    # Load without compiling; the custom layer is resolved through `custom_objects`
    model = load_model(
        '/kaggle/input/depth-pretrained-models/nyu.h5',
        custom_objects=custom_objects,
        compile=False,
    )

    # Report success
    print("Loaded pretrained model.")

## Evaluate

In [None]:
def load_test_data():
    """Load the three test arrays from the Kaggle input directory.

    Returns:
        Tuple ``(rgb, depth, crop)`` holding the contents of
        ``eigen_test_rgb.npy``, ``eigen_test_depth.npy`` and
        ``eigen_test_crop.npy``, in that order.
    """
    # Progress message; the newline is suppressed so the next message follows on the same line
    print('Loading test data...', end='')

    # Load the files in the order rgb, depth, crop
    arrays = tuple(
        np.load(path)
        for path in (
            '/kaggle/input/nyu-test-data/eigen_test_rgb.npy',
            '/kaggle/input/nyu-test-data/eigen_test_depth.npy',
            '/kaggle/input/nyu-test-data/eigen_test_crop.npy',
        )
    )

    print('Test data loaded.\n')
    return arrays

In [None]:
def DepthNorm(x, maxDepth):
    """Return ``maxDepth`` divided by ``x``.

    Works on scalars and element-wise on arrays.
    """
    inverted = maxDepth / x
    return inverted

In [None]:
def predict(model, images, minDepth=10, maxDepth=1000, batch_size=2):
    """Run the model on one image or a batch and return normalised predictions.

    Args:
        model: Object with a Keras-style ``predict(images, batch_size=...)`` method.
        images: A single grayscale image ``(H, W)``, a single image
            ``(H, W, C)`` or a batch ``(N, H, W, C)``. Single-channel input
            is replicated to three channels.
        minDepth: Lower clipping bound applied after `DepthNorm`.
        maxDepth: Upper clipping bound; also the numerator used by
            `DepthNorm` and the final divisor.
        batch_size: Batch size forwarded to ``model.predict``.

    Returns:
        ``clip(maxDepth / prediction, minDepth, maxDepth) / maxDepth``, so
        values lie in ``[minDepth / maxDepth, 1]``.

    Raises:
        ValueError: If `images` is not 2-D, 3-D or 4-D.
    """
    images = np.asarray(images)
    if images.ndim not in (2, 3, 4):
        raise ValueError(
            'images must have 2, 3 or 4 dimensions, got shape {}'.format(images.shape))

    # (H, W) grayscale -> (H, W, 1)
    if images.ndim == 2:
        images = images[..., np.newaxis]
    # Replicate a single channel so the model receives three channels
    if images.shape[-1] == 1:
        images = np.repeat(images, 3, axis=-1)
    # A single image becomes a batch of one
    if images.ndim == 3:
        images = images[np.newaxis, ...]

    # Compute predictions and normalise them with the caller's maxDepth
    predictions = model.predict(images, batch_size=batch_size)
    normalized_predictions = DepthNorm(predictions, maxDepth=maxDepth)

    return np.clip(normalized_predictions, minDepth, maxDepth) / maxDepth

In [None]:
def scale_up(scale, images):
    """Enlarge every image by `scale` along its first two axes.

    Args:
        scale: Factor applied to the height and width of each image.
        images: Iterable of arrays; each is resized independently.

    Returns:
        The resized images stacked along a new leading axis.
    """
    # First-order (bilinear) resize that keeps the original value range,
    # with reflection at the borders and anti-aliasing enabled
    return np.stack([
        resize(
            img,
            (scale * img.shape[0], scale * img.shape[1]),
            order=1,
            preserve_range=True,
            mode='reflect',
            anti_aliasing=True,
        )
        for img in images
    ])

In [None]:
def evaluate(model, rgb, depth, crop, batch_size=6):
    """Evaluate the model on a test set and print the mean error metrics.

    Every sample is predicted twice, once as-is and once mirrored along its
    width axis; the mirrored prediction is flipped back and the two are
    averaged before the metrics are computed. All samples are evaluated,
    including a final batch smaller than `batch_size`.

    Args:
        model: Model passed to `predict`.
        rgb: Array of input images, indexed by sample on the first axis;
            values are divided by 255 before prediction.
        depth: Ground-truth depth maps aligned with `rgb`.
        crop: Four indices ``(row_start, row_end, col_start, col_end)``;
            both end indices are inclusive.
        batch_size: Number of samples predicted per step.

    Prints:
        A header line and the mean of a1, a2, a3, rel, rms and log_10.
    """
    def compute_errors(gt, pred):
        # Threshold accuracies: share of pixels whose gt/pred ratio is within 1.25**k
        ratio = np.maximum(gt / pred, pred / gt)
        a1 = (ratio < 1.25).mean()
        a2 = (ratio < 1.25 ** 2).mean()
        a3 = (ratio < 1.25 ** 3).mean()
        # Mean absolute relative error, root mean squared error, mean log10 error
        abs_rel = np.mean(np.abs(gt - pred) / gt)
        rmse = np.sqrt(((gt - pred) ** 2).mean())
        log_10 = np.abs(np.log10(gt) - np.log10(pred)).mean()

        return a1, a2, a3, abs_rel, rmse, log_10

    num_samples = len(rgb)

    # One column of six metrics per evaluation sample
    depth_scores = np.zeros((6, num_samples))

    # Crop window; the end indices in `crop` are inclusive
    rows = slice(crop[0], crop[1] + 1)
    cols = slice(crop[2], crop[3] + 1)

    # Step through the data batch by batch. Stepping by start index (rather than
    # counting whole batches) also covers a trailing partial batch, so no
    # sample is left with all-zero scores that would distort the means.
    for start in range(0, num_samples, batch_size):
        end = start + batch_size

        x = rgb[start:end]
        true_y = depth[start:end]

        # Predict on the images and on their mirrored copies; predictions are
        # upscaled by 2 and multiplied by 10.0 (presumably to match the
        # ground-truth units and resolution - confirm against the dataset)
        pred_y = scale_up(2, predict(model, x / 255, minDepth=10, maxDepth=1000, batch_size=batch_size)[:, :, :, 0]) * 10.0
        pred_y_flip = scale_up(2, predict(model, x[..., ::-1, :] / 255, minDepth=10, maxDepth=1000, batch_size=batch_size)[:, :, :, 0]) * 10.0

        # Restrict ground truth and predictions to the crop window
        true_y = true_y[:, rows, cols]
        pred_y = pred_y[:, rows, cols]
        pred_y_flip = pred_y_flip[:, rows, cols]

        # Average the prediction with the un-mirrored flipped prediction
        for j, y in enumerate(true_y):
            errors = compute_errors(y, 0.5 * (pred_y[j] + np.fliplr(pred_y_flip[j])))
            depth_scores[:, start + j] = errors

    # Mean of every metric over all samples
    e = depth_scores.mean(axis=1)

    # Print the error metrics
    print(f"{'a1':>10}, {'a2':>10}, {'a3':>10}, {'rel':>10}, {'rms':>10}, {'log_10':>10}")
    print(f"{e[0]:10.4f}, {e[1]:10.4f}, {e[2]:10.4f}, {e[3]:10.4f}, {e[4]:10.4f}, {e[5]:10.4f}")

In [None]:
# Load the test arrays, then print the evaluation metrics of the selected model
rgb, depth, crop = load_test_data()
evaluate(model=model, rgb=rgb, depth=depth, crop=crop)

In [None]:
def load_images(image_files):
    """Load image files into one float array scaled to [0, 1].

    Args:
        image_files: Iterable of image paths (or file objects) accepted by
            ``PIL.Image.open``. All images must have the same shape so they
            can be stacked.

    Returns:
        Array with one image per entry along the first axis; pixel values
        are divided by 255 and clipped to [0, 1].
    """
    loaded_images = []
    for file in image_files:
        # The context manager closes the underlying file once the pixels have
        # been copied into a float array, instead of leaving the handle open.
        with Image.open(file) as img:
            pixels = np.asarray(img, dtype=float)
        loaded_images.append(np.clip(pixels / 255, 0, 1))

    # Stack the loaded images along a new leading axis
    return np.stack(loaded_images, axis=0)

In [None]:
def to_multichannel(i):
    """Return `i` as a three-channel image.

    Args:
        i: Image array of shape ``(H, W)`` or ``(H, W, C)``.

    Returns:
        `i` itself when it already has three channels; otherwise a new array
        in which the first (or only) channel is repeated three times along
        axis 2.
    """
    # A plain 2-D image has no channel axis yet; add one so it can be repeated
    if i.ndim == 2:
        i = i[:, :, np.newaxis]

    # Already three channels: return the input unchanged
    if i.shape[2] == 3:
        return i

    # Repeat the first channel to obtain three identical channels
    return np.repeat(i[:, :, :1], 3, axis=2)

In [None]:
def display_images(outputs, inputs=None, gt=None, is_colormap=True, is_rescale=True):
    """Build a montage of predictions, optionally next to inputs and ground truth.

    Args:
        outputs: Array of predictions indexed by sample on the first axis;
            channel 0 of each prediction is visualised when `is_colormap`
            is set.
        inputs: Optional list/tuple/array of input images, one per output.
        gt: Optional list/tuple/array of ground-truth images, one per output.
        is_colormap: Render predictions with the 'plasma' colormap when True,
            otherwise convert them to three channels unchanged.
        is_rescale: When colormapping, first min-max rescale each prediction
            to [0, 1].

    Returns:
        Montage image in which every tile is the horizontal concatenation of
        (input, ground truth, prediction) for one sample.
    """
    # Convert an image to three channels and resize it to the prediction size
    def process_img(img, shape):
        img = to_multichannel(img)
        return resize(img, shape, preserve_range=True, mode='reflect', anti_aliasing=True)

    plasma = plt.get_cmap('plasma')

    # Every tile component is brought to the size of the first prediction
    shape = (outputs[0].shape[0], outputs[0].shape[1], 3)

    # Decide once whether inputs / ground truth were supplied
    has_inputs = isinstance(inputs, (list, tuple, np.ndarray))
    has_gt = isinstance(gt, (list, tuple, np.ndarray))

    all_images = []
    for i in range(outputs.shape[0]):
        imgs = []

        if has_inputs:
            imgs.append(process_img(inputs[i], shape))

        if has_gt:
            imgs.append(process_img(gt[i], shape))

        if is_colormap:
            rescaled = outputs[i][:, :, 0]
            if is_rescale:
                # Min-max normalisation: divide by the value range, not by the
                # raw maximum, so the result actually spans [0, 1]. A constant
                # image has zero range and maps to all zeros.
                lowest = np.min(rescaled)
                value_range = np.max(rescaled) - lowest
                if value_range > 0:
                    rescaled = (rescaled - lowest) / value_range
                else:
                    rescaled = np.zeros_like(rescaled)
            # Drop the alpha channel returned by the colormap
            imgs.append(plasma(rescaled)[:, :, :3])
        else:
            imgs.append(to_multichannel(outputs[i]))

        # One tile per sample: its images side by side
        all_images.append(np.hstack(imgs))

    # Arrange all tiles in a grid, padding empty cells with black
    return skimage.util.montage(np.stack(all_images), channel_axis=-1, fill=(0,0,0))

In [None]:
directory = '/kaggle/input/nyu-depth-v2/nyu_data/data/nyu2_test'

# Get the paths of files ending with "_colors.png" in the specified directory
color_file_paths = [
    os.path.join(directory, filename)
    for filename in os.listdir(directory)
    if filename.endswith('_colors.png')
]

# Get the total number of color file paths
total_size = len(color_file_paths)
if total_size == 0:
    raise FileNotFoundError(f"No '*_colors.png' files found in {directory}")

# Select up to 9 distinct files at random. `random.sample` draws without
# replacement, so no image appears twice, and the count is capped at the
# number of files actually available.
select_size = min(9, total_size)
color_file_paths = random.sample(color_file_paths, k=select_size)

# Print the information about the selected random images
print("Selected ", select_size, " random images from a total of ", total_size)

In [None]:
# Load the selected images as one array
inputs = load_images(color_file_paths)
print('\nLoaded ({0}) images of size {1}.'.format(inputs.shape[0], inputs.shape[1:]))

# Run the model on the loaded images
outputs = predict(model, inputs)

# Build the montage from copies, leaving `outputs` and `inputs` untouched
viz = display_images(outputs.copy(), inputs.copy())

# Show the montage without axes
plt.figure(figsize=(10, 5))
plt.imshow(viz)
plt.axis('off')
plt.title('Random Test Images and Predicted Depth Maps')
# plt.savefig('test.png')
plt.show()