# Initial setup


In [None]:
# Notebook-wide switches. Edit them here, before running any other cell.
# NOTE(review): global_debug_flag is not read in the visible part of the notebook;
# presumably it is forwarded as `is_debug` to the data loaders - confirm.
global_debug_flag = False
# True when running on Google Colab: enables the Drive mount, the dataset unzip,
# the wandb install, and the Colab-specific dataset/weights directories.
is_colab = False
# True to log in to Weights & Biases and start a run.
should_wandb = False

In [None]:
# On Google Colab only: mount Google Drive and unpack the dataset archives stored there
if is_colab:
    from google.colab import drive
    drive.mount("/content/drive")

    # Create the local target directories (relative to the current working directory)
    %mkdir datasets
    %mkdir datasets/nyu_data
    # Extract the NYU train/validation archive and the three test archives from Drive
    !unzip "/content/drive/MyDrive/datasets/nyu_data.zip" -d "/content/datasets/nyu_data"
    !unzip "/content/drive/MyDrive/datasets/nyu_test_data.zip" -d "/content/datasets/nyu_test_data"
    !unzip "/content/drive/MyDrive/datasets/endoslam_test_data.zip" -d "/content/datasets/endoslam_test_data"
    !unzip "/content/drive/MyDrive/datasets/kitti_test_data.zip" -d "/content/datasets/kitti_test_data"

In [None]:
# Install (or upgrade, quietly) Weights & Biases - only needed on Google Colab
# NOTE(review): the version is not pinned, so reruns may pick up a newer wandb release
if is_colab:
    !pip install wandb -qU

In [None]:
# Import the necessary libraries
import numpy as np
import random
import os
import time
import math

import pathlib

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

from PIL import Image
from itertools import permutations

In [None]:
# Set up Weights & Biases logging (skipped entirely unless should_wandb is True)
if should_wandb:
    # Imported lazily so the package is only required when logging is enabled
    import wandb

    # Authenticate this session with Weights & Biases
    wandb.login()

    # Start a run under the "leve" project
    wandb.init(
        project="leve",
    )

# The Data Loaders


## Utilities

A set of helper classes used to transform and augment the data samples (image and depth map pairs).


In [None]:
class RandomHorizontalFlip(object):
    """Mirror a sample left-to-right with a given probability.

    The image and its depth map are always flipped together so that they
    stay pixel-aligned.
    """

    def __init__(self, probability=0.5):
        # Chance, in [0, 1], that a given sample gets mirrored
        self.probability = probability

    def __call__(self, sample):
        """Return a new sample dict, mirrored or untouched.

        Raises:
            TypeError: if the image or the depth map is not a PIL image.
        """
        image = sample["image"]
        depth = sample["depth"]

        image_is_pil = isinstance(image, Image.Image)
        if not image_is_pil:
            raise TypeError("image should be of type PIL Image. Got {}".format(type(image)))
        depth_is_pil = isinstance(depth, Image.Image)
        if not depth_is_pil:
            raise TypeError("depth should be of type PIL Image. Got {}".format(type(depth)))

        # One random draw decides the fate of both the image and the depth map
        mirror = random.random() < self.probability
        if not mirror:
            return {"image": image, "depth": depth}

        return {
            "image": image.transpose(Image.FLIP_LEFT_RIGHT),
            "depth": depth.transpose(Image.FLIP_LEFT_RIGHT),
        }

In [None]:
class RandomChannelSwap(object):
    """Randomly reorder the three colour channels of the image.

    The depth map is passed through unchanged.
    """

    def __init__(self, probability):
        # Chance, in [0, 1], that the channels of a given image get permuted
        self.probability = probability
        # All 6 orderings of the channel indices (the identity ordering included)
        self.channels_permutations = list(permutations(range(3)))

    def __call__(self, sample):
        """Return a new sample dict whose image may have permuted channels.

        Raises:
            TypeError: if the image or the depth map is not a PIL image.
        """
        image = sample["image"]
        depth = sample["depth"]

        image_is_pil = isinstance(image, Image.Image)
        if not image_is_pil:
            raise TypeError("image should be of type PIL Image. Got {}".format(type(image)))
        depth_is_pil = isinstance(depth, Image.Image)
        if not depth_is_pil:
            raise TypeError("depth should be of type PIL Image. Got {}".format(type(depth)))

        swap = random.random() < self.probability
        if not swap:
            return {"image": image, "depth": depth}

        # Permute along the last (channel) axis, then go back to a PIL image
        pixels = np.asarray(image)
        channel_order = random.choice(self.channels_permutations)
        swapped = Image.fromarray(pixels[..., channel_order])
        return {"image": swapped, "depth": depth}

In [None]:
class Resize(object):
    """Resize both the image and the depth map of a sample to one resolution."""

    def __init__(self, resolution):
        # A single torchvision resize op is shared by the image and the depth map
        self.resize = transforms.Resize(resolution)

    def __call__(self, sample):
        """Return a new sample dict holding the resized image and depth map."""
        image = sample["image"]
        depth = sample["depth"]
        return {"image": self.resize(image), "depth": self.resize(depth)}

In [None]:
class ToTensor(object):
    """Convert a sample (image + depth map) to tensors.

    The image is scaled to [0.0, 1.0]. For training, the depth map is replaced by
    max_depth / depth (the reciprocal-depth target described by Alhashim et al. in
    [https://arxiv.org/abs/1812.11941]); otherwise the depth map is converted as is.
    """

    def __init__(self, is_train=True, max_depth=1000.0):
        self.is_train = is_train
        self.max_depth = max_depth

    def __call__(self, sample):
        """Return a new sample dict whose "image" and "depth" entries are tensors."""
        image, depth = sample["image"], sample["depth"]
        to_tensor = transforms.ToTensor()

        # Image: scale the pixel values by 1/255, convert, then clamp to [0.0, 1.0]
        scaled_pixels = np.array(image).astype(np.float32) / 255.0
        image_tensor = torch.clamp(to_tensor(scaled_pixels), 0.0, 1.0)

        depth_array = np.array(depth).astype(np.float32)

        if not self.is_train:
            # Validation and testing: the depth map is converted without normalization
            return {"image": image_tensor, "depth": to_tensor(depth_array)}

        # Remember which pixels were exactly zero so they can be restored at the end
        zero_mask = depth_array == 0.0

        # Clamping to [max_depth / 100.0, max_depth] guarantees a non-zero divisor
        depth_tensor = to_tensor(depth_array)
        depth_tensor = torch.clamp(depth_tensor, self.max_depth / 100.0, self.max_depth)

        # Reciprocal-depth normalization
        depth_tensor = self.max_depth / depth_tensor

        # Bring back the initial zero values
        depth_tensor[:, zero_mask] = 0.0

        return {"image": image_tensor, "depth": depth_tensor}

## NYU Dataset Dataloader


In [None]:
def load_nyu_csv(
    is_debug=False,
    train_csv_path="datasets/nyu_data/data/nyu2_train.csv",
    validation_csv_path="datasets/nyu_data/data/nyu2_test.csv",
):
    """Read the NYU train and validation sample lists.

    Each CSV row has the form `path_to_the_image,path_to_the_depth_map`.

    Args:
        is_debug: when true, keep only the first 16 rows of each list.
        train_csv_path: location of the training CSV.
        validation_csv_path: location of the validation CSV.

    Returns:
        A tuple (train_rows, validation_rows); every row is a list of the
        comma-separated fields of one line, stripped of surrounding whitespace.
    """

    def read_rows(csv_path):
        # Blank and whitespace-only lines describe no sample, so they are skipped
        # instead of producing one-field rows that would fail later on row[1]
        with open(csv_path, "r") as csv_file:
            return [[field.strip() for field in line.split(",")] for line in csv_file if line.strip()]

    nyu2_train = read_rows(train_csv_path)
    nyu2_validation = read_rows(validation_csv_path)

    if is_debug:
        # If in debug mode, use a very small subset of the data
        nyu2_train = nyu2_train[:16]
        nyu2_validation = nyu2_validation[:16]

    print("Loaded {} train samples and {} validation samples".format(len(nyu2_train), len(nyu2_validation)))
    return nyu2_train, nyu2_validation

In [None]:
class NYUTrainDataset(Dataset):
    """NYU dataset used by the training pipeline (training and validation splits)."""

    def __init__(self, split, transform=None, is_debug=False):
        self.split = split
        self.transform = transform
        self.is_debug = is_debug

        # "train" selects the training rows; any other split value selects the validation rows
        train_rows, validation_rows = load_nyu_csv(is_debug)
        self.nyu2_data = train_rows if self.split == "train" else validation_rows

    def __getitem__(self, idx):
        """Load sample `idx` and return its image, depth map and both file paths."""
        # Retrieve the paths to the image and to the depth map
        row = self.nyu2_data[idx]
        image_path = "datasets/nyu_data/" + row[0]
        depth_path = "datasets/nyu_data/" + row[1]

        image = Image.open(image_path)
        depth_values = np.array(Image.open(depth_path)).astype(np.float32)

        # Rescale the raw depth values so that, per the original notebook, each pixel is a depth in meters
        if self.split == "train":
            # Training maps: normalize by 255, then multiply by 10
            depth_values = depth_values / 255.0 * 10.0
        elif self.split == "validation":
            # Validation maps: divide by 1000. Any other split value leaves the depth unscaled
            depth_values = depth_values * 0.001

        # The transforms expect a PIL image, so wrap the float array back into one
        depth = Image.fromarray(depth_values)

        sample = {"image": image, "depth": depth}
        if self.transform:
            sample = self.transform(sample)

        return {
            "image": sample["image"],
            "depth": sample["depth"],
            "image_path": image_path,
            "depth_path": depth_path,
        }

    def __len__(self):
        """Number of samples in the selected split."""
        return len(self.nyu2_data)

In [None]:
class NYUTestDataset(Dataset):
    """NYU test set: one numpy archive per sample, read in sorted file-name order."""

    def __init__(self):
        # On Colab the archive content ends up one directory level deeper
        if is_colab:
            self.directory = "datasets/nyu_test_data/nyu_test_data/"
        else:
            self.directory = "datasets/nyu_test_data/"
        self.files = sorted(os.listdir(self.directory))

    def __getitem__(self, idx):
        """Return the (image, depth) numpy arrays stored in the idx-th file."""
        archive = np.load(self.directory + self.files[idx])
        image = archive["image"]
        depth = archive["depth"]
        return np.array(image), np.array(depth)

    def __len__(self):
        """Number of sample files found in the directory."""
        return len(self.files)

In [None]:
def train_nyu_transform(resolution):
    """Build the NYU training pipeline: resize, random flip, random channel swap, tensor conversion."""
    steps = [
        Resize(resolution),
        RandomHorizontalFlip(0.5),
        RandomChannelSwap(0.5),
        ToTensor(is_train=True, max_depth=10.0),
    ]
    return transforms.Compose(steps)


def validation_nyu_transform(resolution):
    """Build the NYU validation pipeline: resize and tensor conversion, no augmentation."""
    steps = [
        Resize(resolution),
        ToTensor(is_train=False, max_depth=10.0),
    ]
    return transforms.Compose(steps)

In [None]:
def get_nyu_dataset(split, resolution=(480, 640), is_debug=False):
    """Build the NYU dataset for the requested split.

    Args:
        split: "train", "validation" or "test".
        resolution: resolution handed to the train/validation transforms
            (not used by the test split).
        is_debug: when true, the train/validation splits load only a small subset.

    Returns:
        The dataset object for the split.

    Raises:
        ValueError: if `split` is not one of the supported names.
    """
    if split == "test":
        return NYUTestDataset()
    if split == "train":
        return NYUTrainDataset(split=split, transform=train_nyu_transform(resolution), is_debug=is_debug)
    if split == "validation":
        return NYUTrainDataset(split=split, transform=validation_nyu_transform(resolution), is_debug=is_debug)

    # Fail with a clear message instead of an UnboundLocalError on an unknown split
    raise ValueError("Unknown NYU split: {}".format(split))

## Kitti Dataset Loader


In [None]:
class KITTIDataset(Dataset):
    """KITTI test set: one numpy archive per sample, read in sorted file-name order."""

    def __init__(self):
        # On Colab the archive content ends up one directory level deeper
        if is_colab:
            self.directory = "datasets/kitti_test_data/kitti_test_data/"
        else:
            self.directory = "datasets/kitti_test_data/"
        self.files = sorted(os.listdir(self.directory))

    def __getitem__(self, idx):
        """Return the (image, depth) numpy arrays stored in the idx-th file."""
        archive = np.load(self.directory + self.files[idx])
        image = archive["image"]
        depth = archive["depth"]
        return np.array(image), np.array(depth)

    def __len__(self):
        """Number of sample files found in the directory."""
        return len(self.files)

In [None]:
def get_kitti_dataset():
    """Build the KITTI test dataset."""
    kitti_dataset = KITTIDataset()
    return kitti_dataset

## EndoSLAM Dataset Loader


In [None]:
class EndoSLAMDataset(Dataset):
    """EndoSLAM test set: frames and depth maps stored as image files in two sub-directories.

    The idx-th frame is paired with the idx-th depth map after sorting both file lists.
    """

    def __init__(self):
        # On Colab the archive content ends up one directory level deeper
        if is_colab:
            self.directory = "datasets/endoslam_test_data/endoslam_test_data/"
        else:
            self.directory = "datasets/endoslam_test_data/"
        self.frame_files = sorted(os.listdir(self.directory + "frames/"))
        self.depth_files = sorted(os.listdir(self.directory + "depths/"))

    def __getitem__(self, idx):
        """Return the idx-th (frame, depth) pair as numpy arrays."""
        frame_image = Image.open(self.directory + "frames/" + self.frame_files[idx])
        depth_image = Image.open(self.directory + "depths/" + self.depth_files[idx])

        frame = np.array(frame_image)
        # Scale the depth pixels by 1/255 and add a small offset so no value is exactly zero
        depth = np.array(depth_image) / 255.0 + 1e-6
        return frame, depth

    def __len__(self):
        """Number of frames found in the frames directory."""
        return len(self.frame_files)

In [None]:
def get_endoslam_dataset():
    """Build the EndoSLAM test dataset."""
    endoslam_dataset = EndoSLAMDataset()
    return endoslam_dataset

## Generic High-Level Data Loader


In [None]:
def get_dataloader(dataset_name, split="train", resolution=(480, 640), batch_size=8, is_debug=False):
    """Create a DataLoader for one of the supported datasets.

    Args:
        dataset_name: "nyu", "kitti" or "endoslam".
        split: split name; forwarded to the NYU dataset and used to decide on shuffling.
        resolution: resolution forwarded to the NYU dataset.
        batch_size: number of samples per batch.
        is_debug: forwarded to the NYU dataset; also disables shuffling.

    Raises:
        ValueError: if `dataset_name` is not a known dataset.
    """
    # Builders are wrapped in lambdas so that nothing is constructed until a name matches
    builders = (
        ("nyu", lambda: get_nyu_dataset(split=split, resolution=resolution, is_debug=is_debug)),
        ("kitti", lambda: get_kitti_dataset()),
        ("endoslam", lambda: get_endoslam_dataset()),
    )
    for known_name, build in builders:
        if dataset_name == known_name:
            dataset = build()
            break
    else:
        raise ValueError("Unknown dataset name: {}".format(dataset_name))

    # Shuffle the data only if the dataset is used for training and it is not in debug mode
    shuffle_flag = bool(split == "train" and is_debug == False)

    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle_flag)

# The Model


## Encoder - DDRNet_23_slim

Taken from: https://github.com/mic-rud/GuidedDecoding/blob/main/model/DDRNet_23_slim.py


In [None]:
# Module-level alias for the normalization layer class used by the encoder blocks below
BatchNorm2d = nn.BatchNorm2d
# Momentum value passed to the batch-norm layers created by the encoder blocks below
bn_mom = 0.1

In [None]:
def depthwise(in_channels, kernel_size):
    """Depthwise convolution (one group per channel) followed by batch norm and ReLU.

    The padding keeps the spatial size unchanged, which requires an odd kernel size.
    """
    same_padding = (kernel_size - 1) // 2
    assert kernel_size - 1 == 2 * same_padding, "parameters incorrect. kernel={}, padding={}".format(kernel_size, same_padding)

    per_channel_conv = nn.Conv2d(
        in_channels,
        in_channels,
        kernel_size,
        stride=1,
        padding=same_padding,
        bias=False,
        groups=in_channels,
    )
    stages = [per_channel_conv, nn.BatchNorm2d(in_channels), nn.ReLU(inplace=True)]
    return nn.Sequential(*stages)

In [None]:
def pointwise(in_channels, out_channels):
    """1x1 convolution (no bias) followed by batch norm and ReLU."""
    channel_mixer = nn.Conv2d(in_channels, out_channels, 1, 1, 0, bias=False)
    stages = [channel_mixer, nn.BatchNorm2d(out_channels), nn.ReLU(inplace=True)]
    return nn.Sequential(*stages)

In [None]:
def conv3x3(in_planes, out_planes, stride=1):
    """Bias-free 3x3 convolution with a padding of 1."""
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_options)

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by batch norm.

    Args:
        inplanes: number of input channels.
        planes: number of output channels (expansion is 1).
        stride: stride of the first convolution.
        downsample: optional module applied to the input to build the residual.
        no_relu: when true, the sum is returned without the final ReLU.
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, no_relu=False):
        super().__init__()
        # Sub-modules are created in a fixed order; it determines parameter order and seeded initialization
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = BatchNorm2d(planes, momentum=bn_mom)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = BatchNorm2d(planes, momentum=bn_mom)
        self.downsample = downsample
        self.stride = stride
        self.no_relu = no_relu

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # The residual is the raw input unless a downsample module was supplied
        residual = x if self.downsample is None else self.downsample(x)
        out += residual

        return out if self.no_relu else self.relu(out)

In [None]:
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions, output has planes * 2 channels.

    Args:
        inplanes: number of input channels.
        planes: number of channels of the two inner convolutions.
        stride: stride of the 3x3 convolution.
        downsample: optional module applied to the input to build the residual.
        no_relu: when true (the default), the sum is returned without the final ReLU.
    """

    expansion = 2

    def __init__(self, inplanes, planes, stride=1, downsample=None, no_relu=True):
        super().__init__()
        expanded_planes = planes * self.expansion
        # Sub-modules are created in a fixed order; it determines parameter order and seeded initialization
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = BatchNorm2d(planes, momentum=bn_mom)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = BatchNorm2d(planes, momentum=bn_mom)
        self.conv3 = nn.Conv2d(planes, expanded_planes, kernel_size=1, bias=False)
        self.bn3 = BatchNorm2d(expanded_planes, momentum=bn_mom)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride
        self.no_relu = no_relu

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # The residual is the raw input unless a downsample module was supplied
        residual = x if self.downsample is None else self.downsample(x)
        out += residual

        return out if self.no_relu else self.relu(out)

In [None]:
class DAPPM(nn.Module):
    """Multi-scale pooling module.

    The input is pooled at four scales (three strided average pools and one global
    pool); every pooled map is resized to the input size, added to the previous
    branch output and refined by a 3x3 convolution. The five branch outputs are
    concatenated, compressed by a 1x1 convolution and added to a 1x1 shortcut
    of the input.

    Args:
        inplanes: number of input channels.
        branch_planes: number of channels of every branch.
        outplanes: number of output channels.
    """

    def __init__(self, inplanes, branch_planes, outplanes):
        super().__init__()

        def bn_relu_conv(in_channels, out_channels, kernel_size, padding=0):
            # Every stage of this module uses the same BN -> ReLU -> bias-free conv pattern
            return [
                BatchNorm2d(in_channels, momentum=bn_mom),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=padding, bias=False),
            ]

        # The creation order below is kept fixed: it determines parameter order and seeded initialization
        self.scale1 = nn.Sequential(
            nn.AvgPool2d(kernel_size=5, stride=2, padding=2),
            *bn_relu_conv(inplanes, branch_planes, 1),
        )
        self.scale2 = nn.Sequential(
            nn.AvgPool2d(kernel_size=9, stride=4, padding=4),
            *bn_relu_conv(inplanes, branch_planes, 1),
        )
        self.scale3 = nn.Sequential(
            nn.AvgPool2d(kernel_size=17, stride=8, padding=8),
            *bn_relu_conv(inplanes, branch_planes, 1),
        )
        self.scale4 = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            *bn_relu_conv(inplanes, branch_planes, 1),
        )
        self.scale0 = nn.Sequential(*bn_relu_conv(inplanes, branch_planes, 1))
        self.process1 = nn.Sequential(*bn_relu_conv(branch_planes, branch_planes, 3, padding=1))
        self.process2 = nn.Sequential(*bn_relu_conv(branch_planes, branch_planes, 3, padding=1))
        self.process3 = nn.Sequential(*bn_relu_conv(branch_planes, branch_planes, 3, padding=1))
        self.process4 = nn.Sequential(*bn_relu_conv(branch_planes, branch_planes, 3, padding=1))
        self.compression = nn.Sequential(*bn_relu_conv(branch_planes * 5, outplanes, 1))
        self.shortcut = nn.Sequential(*bn_relu_conv(inplanes, outplanes, 1))

    def forward(self, x):
        height, width = x.shape[-2], x.shape[-1]

        # Branch 0 works at full resolution; each later branch builds on the previous one
        branches = [self.scale0(x)]
        stages = (
            (self.scale1, self.process1),
            (self.scale2, self.process2),
            (self.scale3, self.process3),
            (self.scale4, self.process4),
        )
        for pool, refine in stages:
            resized = F.interpolate(pool(x), size=[height, width], mode="bilinear")
            branches.append(refine(resized + branches[-1]))

        return self.compression(torch.cat(branches, 1)) + self.shortcut(x)

In [None]:
class Segmenthead(nn.Module):
    """Output head: BN -> ReLU -> 3x3 conv, then BN -> ReLU -> 1x1 conv, optionally upsampled.

    Args:
        inplanes: number of input channels.
        interplanes: number of channels between the two convolutions.
        outplanes: number of output channels.
        scale_factor: when not None, the output is bilinearly resized by this factor.
    """

    def __init__(self, inplanes, interplanes, outplanes, scale_factor=None):
        super().__init__()
        self.bn1 = BatchNorm2d(inplanes, momentum=bn_mom)
        self.conv1 = nn.Conv2d(inplanes, interplanes, kernel_size=3, padding=1, bias=False)
        self.bn2 = BatchNorm2d(interplanes, momentum=bn_mom)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(interplanes, outplanes, kernel_size=1, padding=0, bias=True)
        self.scale_factor = scale_factor

    def forward(self, x):
        hidden = self.conv1(self.relu(self.bn1(x)))
        out = self.conv2(self.relu(self.bn2(hidden)))

        if self.scale_factor is None:
            return out

        # The target size is derived from the spatial size of the intermediate feature map
        target_size = [hidden.shape[-2] * self.scale_factor, hidden.shape[-1] * self.scale_factor]
        return F.interpolate(out, size=target_size, mode="bilinear")

In [None]:
class DualResNet(nn.Module):
    """Two-branch residual encoder (DDRNet-style, see the link in the section header).

    A low-resolution branch (layer1 ... layer5, followed by the DAPPM module) and a
    high-resolution branch (layer3_, layer4_, layer5_) run in parallel and exchange
    features twice (compression3/down3 and compression4/down4). Both branches are
    summed at 1/8 of the input resolution and passed through a Segmenthead.

    Args:
        block: residual block class used for layer1-layer4 and layer3_/layer4_.
        layers: number of blocks in layer1, layer2, layer3 and layer4.
        out_features: number of channels produced by the final head.
        planes: base channel count.
        spp_planes: branch channel count of the DAPPM module.
        head_planes: intermediate channel count of the final head.
        augment: stored on the instance; not used anywhere in this class.
        skip_out: stored on the instance; in forward it only keeps an otherwise unused reference.
    """

    def __init__(self, block, layers, out_features=19, planes=64, spp_planes=128, head_planes=128, augment=False, skip_out=False):
        super(DualResNet, self).__init__()

        highres_planes = planes * 2
        self.augment = augment
        self.skip_out = skip_out

        # Stem: two stride-2 3x3 convolutions
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, planes, kernel_size=3, stride=2, padding=1),
            BatchNorm2d(planes, momentum=bn_mom),
            nn.ReLU(inplace=True),
            nn.Conv2d(planes, planes, kernel_size=3, stride=2, padding=1),
            BatchNorm2d(planes, momentum=bn_mom),
            nn.ReLU(inplace=True),
        )

        # Low-resolution branch: layer2, layer3 and layer4 each use a stride of 2
        self.relu = nn.ReLU(inplace=False)
        self.layer1 = self._make_layer(block, planes, planes, layers[0])
        self.layer2 = self._make_layer(block, planes, planes * 2, layers[1], stride=2)
        self.layer3 = self._make_layer(block, planes * 2, planes * 4, layers[2], stride=2)
        self.layer4 = self._make_layer(block, planes * 4, planes * 8, layers[3], stride=2)

        # Low -> high resolution fusion: 1x1 convolutions down to the high-resolution channel count
        self.compression3 = nn.Sequential(
            nn.Conv2d(planes * 4, highres_planes, kernel_size=1, bias=False),
            BatchNorm2d(highres_planes, momentum=bn_mom),
        )
        self.compression4 = nn.Sequential(
            nn.Conv2d(planes * 8, highres_planes, kernel_size=1, bias=False),
            BatchNorm2d(highres_planes, momentum=bn_mom),
        )
        # High -> low resolution fusion: one (down3) or two (down4) stride-2 convolutions
        self.down3 = nn.Sequential(
            nn.Conv2d(highres_planes, planes * 4, kernel_size=3, stride=2, padding=1, bias=False),
            BatchNorm2d(planes * 4, momentum=bn_mom),
        )
        self.down4 = nn.Sequential(
            nn.Conv2d(highres_planes, planes * 4, kernel_size=3, stride=2, padding=1, bias=False),
            BatchNorm2d(planes * 4, momentum=bn_mom),
            nn.ReLU(inplace=True),
            nn.Conv2d(planes * 4, planes * 8, kernel_size=3, stride=2, padding=1, bias=False),
            BatchNorm2d(planes * 8, momentum=bn_mom),
        )

        # High-resolution branch (trailing underscore) and the last stage of both branches
        self.layer3_ = self._make_layer(block, planes * 2, highres_planes, 2)
        self.layer4_ = self._make_layer(block, highres_planes, highres_planes, 2)
        self.layer5_ = self._make_layer(Bottleneck, highres_planes, highres_planes, 1)
        self.layer5 = self._make_layer(Bottleneck, planes * 8, planes * 8, 1, stride=2)
        # Bottleneck has an expansion of 2, so layer5 outputs planes * 16 channels
        self.spp = DAPPM(planes * 16, spp_planes, planes * 4)
        self.final_layer = Segmenthead(planes * 4, head_planes, out_features)

        # Re-initialize every convolution weight (Kaiming normal) and batch-norm affine pair (1, 0).
        # Convolution biases are not touched here.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, inplanes, planes, blocks, stride=1):
        """Stack `blocks` residual blocks; only the first one may change stride or channel count.

        The last block of a stack with more than one block is built with no_relu=True,
        so callers apply the activation themselves.
        """
        # A projection shortcut is needed whenever the first block changes resolution or channel count
        downsample = None
        if stride != 1 or inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion, momentum=bn_mom),
            )

        layers = []
        layers.append(block(inplanes, planes, stride, downsample))
        inplanes = planes * block.expansion
        for i in range(1, blocks):
            if i == (blocks - 1):
                layers.append(block(inplanes, planes, stride=1, no_relu=True))
            else:
                layers.append(block(inplanes, planes, stride=1, no_relu=False))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Encode a batch of images.

        Returns:
            A tuple (connections, features) where connections is the list
            [raw input, layer1 output, fused high-resolution features] and
            features is the output of the final head.
        """
        # Target size of every F.interpolate call below: 1/8 of the input size
        width_output = x.shape[-1] // 8
        height_output = x.shape[-2] // 8
        layers = []
        connections = [x]

        x = self.conv1(x)
        if self.skip_out:
            # NOTE(review): x1 is never read afterwards
            x1 = x

        x = self.layer1(x)
        layers.append(x)
        connections.append(x)

        x = self.layer2(self.relu(x))
        layers.append(x)

        # The high-resolution branch (x_) starts from the layer2 output (layers[1])
        x = self.layer3(self.relu(x))
        layers.append(x)
        x_ = self.layer3_(self.relu(layers[1]))

        # First bilateral fusion: each branch receives the resampled features of the other one
        x = x + self.down3(self.relu(x_))
        x_ = x_ + F.interpolate(self.compression3(self.relu(layers[2])), size=[height_output, width_output], mode="bilinear")

        x = self.layer4(self.relu(x))
        layers.append(x)
        x_ = self.layer4_(self.relu(x_))

        # Second bilateral fusion
        x = x + self.down4(self.relu(x_))
        x_ = x_ + F.interpolate(self.compression4(self.relu(layers[3])), size=[height_output, width_output], mode="bilinear")
        connections.append(x_)

        # Last stage: the low-resolution branch goes through layer5 and the DAPPM module, then is resized
        x_ = self.layer5_(self.relu(x_))
        x = F.interpolate(self.spp(self.layer5(self.relu(x))), size=[height_output, width_output], mode="bilinear")

        # Both branches are summed before the final head
        x_ = self.final_layer(x + x_)
        return connections, x_

In [None]:
def DualResNetBackbone(pretrained=False, features=64):
    """Build the slim DualResNet encoder, optionally loading the ImageNet checkpoint.

    Args:
        pretrained: when true, load "DDRNet23s_imagenet.pth" from the weights directory.
        features: number of channels produced by the final head of the encoder.
    """
    model = DualResNet(BasicBlock, [2, 2, 2, 2], out_features=features, planes=32, spp_planes=128, head_planes=64, augment=False)
    if not pretrained:
        return model

    # On Colab the weights live on the mounted Google Drive
    weights_directory = "/content/drive/MyDrive/weights/" if is_colab else "weights/"
    checkpoint = torch.load(weights_directory + "DDRNet23s_imagenet.pth", map_location="cpu")

    # strict=False: missing or unexpected checkpoint keys do not raise an error
    model.load_state_dict(checkpoint, strict=False)
    return model

In [None]:
class Interpolate(nn.Module):
    """Module wrapper around F.interpolate that rescales its input by a fixed factor.

    Args:
        scale_factor: multiplier applied to the spatial size of the input.
        mode: interpolation algorithm handed to F.interpolate.
    """

    def __init__(self, scale_factor, mode="bilinear"):
        super(Interpolate, self).__init__()
        self.scale_factor = scale_factor
        self.mode = mode

    def forward(self, x):
        # The second positional parameter of F.interpolate is `size`, so the factor
        # has to be passed by keyword to be treated as a scale factor
        return F.interpolate(x, scale_factor=self.scale_factor, mode=self.mode)

## Leve Encoder


In [None]:
class EncoderLayer(nn.Module):
    """Leve encoder: a DualResNet backbone created with pretrained=True and 64 output features."""

    def __init__(self):
        super().__init__()
        self.backbone = DualResNetBackbone(pretrained=True, features=64)

    def forward(self, x):
        """Run the backbone and return its output unchanged."""
        encoded = self.backbone(x)
        return encoded

## Leve Decoder


In [None]:
class LayeredConvolutionBlock(nn.Module):
    """Two consecutive 3x3 convolution -> batch norm -> ReLU stages.

    The spatial size is preserved; the first stage changes the channel count.
    """

    def __init__(self, num_input_features, num_output_features):
        super().__init__()
        conv_options = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(num_input_features, num_output_features, **conv_options)
        self.conv2 = nn.Conv2d(num_output_features, num_output_features, **conv_options)
        self.bn1 = nn.BatchNorm2d(num_output_features)
        self.bn2 = nn.BatchNorm2d(num_output_features)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2))
        for convolution, normalization in stages:
            x = self.relu(normalization(convolution(x)))
        return x

In [None]:
class UpsamplingBlock(nn.Module):
    """Decoder stage: 2x bilinear upsampling, a residual double convolution and a 1x1 projection.

    Args:
        num_input_features: channel count of the incoming feature map.
        num_output_features: channel count produced by the final 1x1 convolution.
        connection_num_features: channel count of the optional skip connection.
        connection_upscale_factor: factor by which the skip connection is upsampled
            before it is merged (applied only when greater than 1).
    """

    def __init__(self, num_input_features, num_output_features, connection_num_features, connection_upscale_factor):
        super().__init__()
        self.connection_conv = nn.Conv2d(connection_num_features, num_input_features, kernel_size=1, padding=0)
        self.dual_conv = LayeredConvolutionBlock(num_input_features, num_input_features)
        self.result_conv = nn.Conv2d(num_input_features, num_output_features, kernel_size=1, padding=0)
        self.connection_upscale_factor = connection_upscale_factor

    def forward(self, x, connection):
        """Upsample `x`, optionally merge `connection` (may be None) and project the result."""
        upsampled_x = F.interpolate(x, scale_factor=2, mode="bilinear")
        refined = self.dual_conv(upsampled_x)

        if connection is not None:
            skip = connection
            if self.connection_upscale_factor > 1:
                skip = F.interpolate(skip, scale_factor=self.connection_upscale_factor, mode="bilinear")
            # The 1x1 convolution brings the skip connection to the channel count of the features
            refined = refined + self.connection_conv(skip)

        # Residual link around the double convolution, then the channel projection
        return self.result_conv(refined + upsampled_x)

In [None]:
class DecoderLayer(nn.Module):
    """Leve decoder: three chained upsampling blocks ending in a single output channel.

    Args:
        num_features: input channel count of each of the three blocks.
        connection_num_features: channel count of the skip connection of each block.
        connection_upscale_factor: skip-connection upscale factor of each block.
    """

    def __init__(self, num_features, connection_num_features, connection_upscale_factor):
        super().__init__()
        # Each block outputs what the next one takes in; the last block outputs 1 channel
        output_features = [num_features[1], num_features[2], 1]
        for index in range(3):
            block = UpsamplingBlock(
                num_input_features=num_features[index],
                num_output_features=output_features[index],
                connection_num_features=connection_num_features[index],
                connection_upscale_factor=connection_upscale_factor[index],
            )
            setattr(self, "upsampling_block{}".format(index + 1), block)

    def forward(self, x, connections):
        """Upsample `x` three times; `connections` is accepted but every block receives None."""
        for block in (self.upsampling_block1, self.upsampling_block2, self.upsampling_block3):
            x = block(x, None)
        return x

## Leve


In [None]:
class Leve(nn.Module):
    """The full Leve model: the encoder followed by the three-stage decoder."""

    def __init__(self):
        super().__init__()
        self.encoder = EncoderLayer()
        self.decoder = DecoderLayer(
            num_features=[64, 32, 16],
            connection_num_features=[64, 32, 3],
            connection_upscale_factor=[1, 1, 1],
        )

    def forward(self, x):
        """Encode `x`, then decode the encoder features together with its skip connections."""
        skip_connections, encoded = self.encoder(x)
        return self.decoder(encoded, skip_connections)

## Utilities


In [None]:
def get_model(state_path):
    """Create a Leve model and, unless `state_path` is the empty string, load its weights from that file."""
    model = Leve()
    if state_path == "":
        return model

    # The checkpoint is mapped to the CPU when it is read
    model.load_state_dict(torch.load(state_path, map_location="cpu"))
    return model

# The Losses

Depth Loss by Alhashim et al. [https://arxiv.org/abs/1812.11941]
<br>
Some PyTorch code parts taken from [https://arxiv.org/abs/2203.04206] - [https://github.com/mic-rud/GuidedDecoding/blob/main/losses.py]


## SSIM Custom Loss


In [None]:
def gaussian(window_size, sigma):
    """Return a 1D Gaussian kernel of `window_size` samples, normalized to sum to 1."""
    center = window_size // 2
    denominator = float(2 * sigma**2)
    weights = []
    for position in range(window_size):
        weights.append(math.exp(-((position - center) ** 2) / denominator))
    kernel = torch.Tensor(weights)
    return kernel / kernel.sum()

In [None]:
def create_window(window_size, channel=1):
    """Build a 2D Gaussian window (sigma 1.5) of shape (channel, 1, window_size, window_size)."""
    column = gaussian(window_size, 1.5).unsqueeze(1)
    # The outer product of the 1D kernel with itself gives the 2D kernel
    kernel_2d = column.mm(column.t()).float()
    kernel_4d = kernel_2d.unsqueeze(0).unsqueeze(0)
    return kernel_4d.expand(channel, 1, window_size, window_size).contiguous()

In [None]:
def ssim(img1, img2, val_range, window_size=11, window=None, size_average=True, full=False):
    """Structural similarity between two image batches.

    Args:
        img1, img2: 4D tensors; the channel count is read from dimension 1 of img1.
        val_range: dynamic range of the values, used for the stabilizing constants.
        window_size: size of the Gaussian window built when `window` is None.
        window: optional precomputed window; when given, no padding is applied.
        size_average: when true, return the mean over everything; otherwise one value per sample.
        full: when true, also return the mean contrast-structure term.

    Returns:
        The SSIM value(s), or the tuple (ssim, cs) when `full` is true.
    """
    dynamic_range = val_range

    _, channel, height, width = img1.size()
    padding = 0
    if window is None:
        # The window never exceeds the image; the padding still follows window_size
        effective_size = min(window_size, height, width)
        window = create_window(effective_size, channel=channel).to(img1.device)
        padding = window_size // 2

    def local_mean(tensor):
        # Window-weighted mean, computed separately for every channel
        return F.conv2d(tensor, window, padding=padding, groups=channel)

    mu1 = local_mean(img1)
    mu2 = local_mean(img2)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1 * mu2

    # Local variances and covariance
    sigma1_sq = local_mean(img1 * img1) - mu1_sq
    sigma2_sq = local_mean(img2 * img2) - mu2_sq
    sigma12 = local_mean(img1 * img2) - mu1_mu2

    C1 = (0.01 * dynamic_range) ** 2
    C2 = (0.03 * dynamic_range) ** 2

    contrast_numerator = 2.0 * sigma12 + C2
    contrast_denominator = sigma1_sq + sigma2_sq + C2
    cs = torch.mean(contrast_numerator / contrast_denominator)

    ssim_map = ((2 * mu1_mu2 + C1) * contrast_numerator) / ((mu1_sq + mu2_sq + C1) * contrast_denominator)

    ret = ssim_map.mean() if size_average else ssim_map.mean(1).mean(1).mean(1)

    return (ret, cs) if full else ret

## Gradient Custom Loss


In [None]:
def gradient(x):
    """
    Forward differences of a 4D tensor along its last two dimensions.

    idea from tf.image.image_gradients(image)
    https://github.com/tensorflow/tensorflow/blob/r2.1/tensorflow/python/ops/image_ops_impl.py#L3441-L3512

    Returns the tuple (dx, dy), both with the shape of x; the last column of dx
    and the last row of dy are zero.
    """
    # Shift the content by one pixel (zero padded) so that each position sees its right / lower neighbour
    right_neighbour = F.pad(x, [0, 1, 0, 0])[:, :, :, 1:]
    lower_neighbour = F.pad(x, [0, 0, 0, 1])[:, :, 1:, :]

    dx = right_neighbour - x
    dy = lower_neighbour - x

    # The border positions have no real neighbour, so their difference is forced to zero
    dx[:, :, :, -1] = 0
    dy[:, :, -1, :] = 0

    return dx, dy

In [None]:
def gradient_loss(gen_frames, gt_frames, alpha=1):
    """Mean absolute difference between the image gradients of the prediction and the ground truth.

    The horizontal and vertical differences are each raised to the power `alpha` before being summed.
    """
    (gen_dx, gen_dy), (gt_dx, gt_dy) = gradient(gen_frames), gradient(gt_frames)

    horizontal_error = torch.abs(gt_dx - gen_dx) ** alpha
    vertical_error = torch.abs(gt_dy - gen_dy) ** alpha

    return torch.mean(horizontal_error + vertical_error)

## The Actual Loss


In [None]:
class Loss:
    """
    Weighted sum of an L1 term, an SSIM term and an image-gradient term.

    loss = alpha * L1(output, target)
         + beta  * clamp((1 - ssim(output, target, max_depth)) / 2, 0, 1)
         + gamma * gradient_loss(output, target)
    """

    def __init__(self, alpha, beta, gamma, max_depth=10.0):
        """
        Args:
            alpha: weight of the L1 term.
            beta: weight of the SSIM term.
            gamma: weight of the gradient term.
            max_depth: forwarded as the third positional argument of ssim()
                (presumably its value range - confirm against ssim's signature).
        """
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.max_depth = max_depth
        self.L1_loss = nn.L1Loss()

    def __call__(self, output, target, knowledge=None):
        """
        Return the combined scalar loss for a prediction / target pair.

        `knowledge` is accepted for interface compatibility and is not used.
        """
        l_depth1 = self.L1_loss(output, target)
        # SSIM dissimilarity. The parentheses matter: (1 - ssim) / 2 is 0 for
        # identical inputs and 1 for ssim == -1. The previous form
        # `1 - ssim * 0.5` lay in [0.5, 1.5], so it never reached 0 and hit the
        # upper clamp (zero gradient) whenever ssim was negative.
        l_ssim1 = torch.clamp((1 - ssim(output, target, self.max_depth)) * 0.5, 0, 1)
        l_grad1 = gradient_loss(output, target)
        loss1 = self.alpha * l_depth1 + self.beta * l_ssim1 + self.gamma * l_grad1
        return loss1

# Metrics

Code from FastDepth
Diana Wofk et al, FastDepth: Fast Monocular Depth
Estimation on Embedded Devices, International Conference on Robotics and
Automation (ICRA), 2019
https://github.com/dwofk/fast-depth


In [None]:
def log10(x):
    """Return a new tensor with the base-10 logarithm of the elements of x."""
    # Use the built-in op instead of log(x) / log(10): one kernel, no extra rounding step.
    return torch.log10(x)

In [None]:
class Result(object):
    """
    Holds one set of depth-estimation metrics.

    Attributes: mse, rmse, mae, rmse_log, absrel, lg10 (error metrics, lower is
    better), delta1/delta2/delta3 (fraction of pixels whose ratio to the target
    is below 1.25, 1.25**2, 1.25**3; higher is better), irmse/imae (errors on
    the reciprocal depth) and data_time/gpu_time (timings).
    """

    def __init__(self):
        self.irmse, self.imae = 0, 0
        self.mse, self.rmse, self.mae = 0, 0, 0
        self.absrel, self.lg10 = 0, 0
        self.delta1, self.delta2, self.delta3 = 0, 0, 0
        self.data_time, self.gpu_time = 0, 0
        self.rmse_log = 0

    def set_to_worst(self):
        """Set every error metric to +inf and every accuracy metric to 0."""
        self.irmse, self.imae = np.inf, np.inf
        self.mse, self.rmse, self.mae = np.inf, np.inf, np.inf
        self.rmse_log = np.inf
        self.absrel, self.lg10 = np.inf, np.inf
        self.delta1, self.delta2, self.delta3 = 0, 0, 0
        self.data_time, self.gpu_time = 0, 0

    def update(self, irmse, imae, mse, rmse, mae, rmse_log, absrel, lg10, delta1, delta2, delta3, gpu_time, data_time):
        """Overwrite all fields. Note that gpu_time comes before data_time."""
        self.irmse, self.imae = irmse, imae
        self.mse, self.rmse, self.mae = mse, rmse, mae
        self.rmse_log = rmse_log
        self.absrel, self.lg10 = absrel, lg10
        self.delta1, self.delta2, self.delta3 = delta1, delta2, delta3
        self.data_time, self.gpu_time = data_time, gpu_time

    def evaluate(self, output, target):
        """
        Compute all metrics of `output` against `target` and store them.

        Only pixels with target > 0 are scored: 0 marks a missing ground-truth
        value elsewhere in this notebook, and dividing by it (absrel, deltas,
        reciprocal metrics) or taking its logarithm (lg10, rmse_log) would
        turn the metrics into inf / NaN. If no pixel is valid the metrics are
        NaN.

        Args:
            output: predicted depth tensor.
            target: ground-truth depth tensor, broadcastable with `output`.
        """
        output, target = torch.broadcast_tensors(output, target)
        valid_mask = target > 0
        output = output[valid_mask]
        target = target[valid_mask]

        abs_diff = (output - target).abs()

        self.mse = float((torch.pow(abs_diff, 2)).mean())
        self.rmse = math.sqrt(self.mse)
        self.mae = float(abs_diff.mean())

        # Base-10 log difference, shared by lg10 and rmse_log.
        log_diff = log10(output) - log10(target)
        self.lg10 = float(log_diff.abs().mean())
        self.rmse_log = math.sqrt(float(torch.pow(log_diff, 2).mean()))
        self.absrel = float((abs_diff / target).mean())

        # Threshold accuracy: max(output / target, target / output) < 1.25 ** k.
        maxRatio = torch.max(output / target, target / output)
        self.delta1 = float((maxRatio < 1.25).float().mean())
        self.delta2 = float((maxRatio < 1.25**2).float().mean())
        self.delta3 = float((maxRatio < 1.25**3).float().mean())
        self.data_time = 0
        self.gpu_time = 0

        # Errors measured on the reciprocal of the depth.
        inv_output = 1 / output
        inv_target = 1 / target
        abs_inv_diff = (inv_output - inv_target).abs()
        self.irmse = math.sqrt(float((torch.pow(abs_inv_diff, 2)).mean()))
        self.imae = float(abs_inv_diff.mean())

In [None]:
class AverageMeter(object):
    """Accumulates sample-weighted sums of Result metrics and reports their means."""

    # Metric attributes read from every Result; each is summed into ``sum_<name>``.
    _METRIC_NAMES = (
        "irmse",
        "imae",
        "mse",
        "rmse",
        "mae",
        "rmse_log",
        "absrel",
        "lg10",
        "delta1",
        "delta2",
        "delta3",
    )

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero the sample count and every running sum."""
        self.count = 0.0

        for metric in self._METRIC_NAMES:
            setattr(self, "sum_" + metric, 0)
        self.sum_data_time = 0
        self.sum_gpu_time = 0

    def update(self, result, gpu_time, data_time, n=1):
        """Add `result` (and the two timings) to the sums with weight `n`."""
        self.count += n

        for metric in self._METRIC_NAMES:
            key = "sum_" + metric
            setattr(self, key, getattr(self, key) + n * getattr(result, metric))
        self.sum_data_time = self.sum_data_time + n * data_time
        self.sum_gpu_time = self.sum_gpu_time + n * gpu_time

    def average(self):
        """Return a Result holding every sum divided by the accumulated count."""
        avg = Result()
        total = self.count
        means = {metric: getattr(self, "sum_" + metric) / total for metric in self._METRIC_NAMES}
        avg.update(
            gpu_time=self.sum_gpu_time / total,
            data_time=self.sum_data_time / total,
            **means,
        )
        return avg

# Training


## Initialization


In [None]:
# Training session model details
# Checkpoint file to resume training from; the empty string starts a fresh run.
pretrained_checkpoint_path = ""
# Forwarded to get_model(state_path=...); presumably a saved state dict to start from ("" = none) - confirm against get_model.
state_path = ""
# Run name; a timestamp is appended to it to build the output directory of a fresh run.
name = "leve_no_skip_connections"

In [None]:
# Training sessions dataset details
# Dataset identifier forwarded to get_dataloader.
dataset_name = "nyu"
# Resolution forwarded to get_dataloader; presumably (height, width) - confirm against get_dataloader.
resolution = (240, 320)
# Largest depth value: given to the loss and used by depth_norm / inverse_depth_norm,
# which clamp depths to [max_depth / 100, max_depth].
max_depth = 10.0

In [None]:
# Create directories to save the necessary information for the training session
if pretrained_checkpoint_path != "":
    # Resuming: reuse the run directory that contains the checkpoint.
    model_dir = pretrained_checkpoint_path.split("/checkpoints")[0]
else:
    # Fresh run: <root>/<name>__<timestamp>
    id = name + "__" + time.strftime("%Y_%m_%d-%H_%M", time.localtime())
    if is_colab:
        root = "/content/drive/MyDrive/models/trained_models/"
    else:
        root = "models/trained_models/"
    model_dir = root + id

checkpoints_dir = model_dir + "/checkpoints"
results_dir = model_dir + "/results"

for directory in (model_dir, checkpoints_dir, results_dir):
    pathlib.Path(directory).mkdir(parents=True, exist_ok=True)

print("Output: " + model_dir)

In [None]:
# Load the model
# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# state_path comes from the "Training session model details" cell.
model = get_model(state_path=state_path).to(device)
print("Model loaded")

In [None]:
# Training sessions parameters
# First epoch to run; replaced by the stored value when a checkpoint is loaded.
epoch = 0
num_epochs = 20
# Larger batches on Colab, smaller ones locally.
batch_size = 8 if is_colab else 2
# History of the average validation loss; validation_loop() appends one entry per call.
val_losses = []
# Weights of the L1 (alpha), SSIM (beta) and gradient (gamma) terms of the loss.
loss = Loss(alpha=0.1, beta=1, gamma=1, max_depth=max_depth)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
# StepLR multiplies the learning rate by gamma=0.1 every step_size=5 calls to scheduler.step().
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

In [None]:
# Load the dataloaders
# Both loaders share the dataset, resolution and batch size; only the split differs.
# global_debug_flag is forwarded as is_debug (its effect is defined by get_dataloader).
train_loader = get_dataloader(dataset_name, split="train", resolution=resolution, batch_size=batch_size, is_debug=global_debug_flag)
validation_loader = get_dataloader(dataset_name, split="validation", resolution=resolution, batch_size=batch_size, is_debug=global_debug_flag)

In [None]:
# Load the checkpoint if necessary
if pretrained_checkpoint_path != "":
    # map_location places the stored tensors on the device selected for this session.
    checkpoint = torch.load(pretrained_checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint["model"])
    optimizer.load_state_dict(checkpoint["optimizer"])
    scheduler.load_state_dict(checkpoint["scheduler"])
    val_losses = checkpoint["val_losses"]
    # save_checkpoint() stores epoch + 1, i.e. the next epoch to run.
    epoch = checkpoint["epoch"]
    # NOTE(review): epoch_val_loss and rmse are restored here but not read by any later cell in this section.
    epoch_val_loss = checkpoint["epoch_val_loss"]
    rmse = checkpoint["rmse"]

## Utilities


In [None]:
# Functions to normalize and denormalize the depth maps using the previously mentioned reciprocal of the depth
def depth_norm(depth):
    """
    Map depth to the reciprocal representation max_depth / depth.

    The depth is first clamped to [max_depth / 100, max_depth], so the result
    lies in [1, 100]. Entries that are exactly 0 in the input stay 0. The input
    tensor is not modified. Reads the module-level `max_depth`.
    """
    missing = depth == 0
    clipped = torch.clamp(depth, min=max_depth / 100, max=max_depth)
    return (max_depth / clipped).masked_fill(missing, 0.0)


def inverse_depth_norm(depth):
    """
    Map the reciprocal representation back to depth: max_depth / depth.

    The result is clamped to [max_depth / 100, max_depth]. Entries that are
    exactly 0 in the input stay 0. The input tensor is not modified. Reads the
    module-level `max_depth`.
    """
    missing = depth == 0
    metric = torch.clamp(max_depth / depth, min=max_depth / 100, max=max_depth)
    return metric.masked_fill(missing, 0.0)

In [None]:
# Function used to visualize results (taken from [https://github.com/mic-rud/GuidedDecoding/blob/main/training.py])
def show_images(image, target, output):
    """
    Show the first sample of a batch: input image, target and output side by side.

    Args:
        image: batch of images; image[0] is displayed after moving channels last.
        target: batch of target maps; target[0, 0] is displayed.
        output: batch of output maps; output[0, 0] is displayed.

    None of the arguments is modified.
    """
    import matplotlib.pyplot as plt

    image_np = image[0].cpu().permute(1, 2, 0).numpy()
    # Work on a copy: the caller still needs the untouched target afterwards
    # (validation_loop computes its metrics on it). .cpu() alone returns the
    # same storage for CPU tensors, hence the explicit clone().
    target_view = target[0, 0].detach().cpu().clone()
    # For display only, entries equal to 100.0 are replaced by 0.1.
    target_view[target_view == 100.0] = 0.1

    _, axs = plt.subplots(1, 3, figsize=(15, 5))
    axs[0].imshow(image_np)
    axs[0].set_title("Image")
    axs[1].imshow(target_view)
    axs[1].set_title("Target")
    axs[2].imshow(output[0, 0].detach().cpu())
    axs[2].set_title("Output")

    plt.show()

In [None]:
def save_checkpoint(epoch_val_loss, rmse):
    """
    Write the training state of the current epoch to
    <checkpoints_dir>/checkpoint_<epoch>.pth.

    Reads the module-level epoch, val_losses, model, optimizer and scheduler.
    The stored "epoch" is epoch + 1, i.e. the epoch to resume from.

    Args:
        epoch_val_loss: average validation loss of the epoch.
        rmse: validation RMSE of the epoch (read back by save_model()).
    """
    state = {
        "epoch": epoch + 1,
        "val_losses": val_losses,
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict(),
        "scheduler": scheduler.state_dict(),
        "epoch_val_loss": epoch_val_loss,
        "rmse": rmse,
    }
    torch.save(state, os.path.join(checkpoints_dir, "checkpoint_{}.pth".format(epoch)))

    print("{} - Model saved".format(time.strftime("%H:%M", time.localtime())))

In [None]:
def save_model():
    """
    Copy the weights of the checkpoint with the lowest RMSE to
    <results_dir>/best_model.pth.

    Only files named "checkpoint_<epoch>.pth" in checkpoints_dir are
    considered; anything else (e.g. a ".ipynb_checkpoints" folder) is ignored.
    Ties go to the earliest epoch. If no checkpoint has an RMSE below infinity
    (e.g. all NaN), the earliest checkpoint is used.

    Raises:
        FileNotFoundError: if checkpoints_dir contains no checkpoint file.
    """
    prefix, suffix = "checkpoint_", ".pth"

    # Collect (epoch, path) for every well-formed checkpoint file name.
    candidates = []
    for file_name in os.listdir(checkpoints_dir):
        if not (file_name.startswith(prefix) and file_name.endswith(suffix)):
            continue
        epoch_text = file_name[len(prefix) : -len(suffix)]
        if not epoch_text.isdigit():
            continue
        candidates.append((int(epoch_text), os.path.join(checkpoints_dir, file_name)))

    if not candidates:
        raise FileNotFoundError("No checkpoint_<epoch>.pth file found in {}".format(checkpoints_dir))
    candidates.sort()

    # Choose the one with the best RMSE
    best_rmse = np.inf
    best_checkpoint_pth = candidates[0][1]
    for _, checkpoint_pth in candidates:
        # map_location="cpu" lets checkpoints written on a GPU be read on any machine.
        checkpoint_rmse = torch.load(checkpoint_pth, map_location="cpu")["rmse"]
        if checkpoint_rmse < best_rmse:
            best_rmse = checkpoint_rmse
            best_checkpoint_pth = checkpoint_pth

    best_model_pth = os.path.join(results_dir, "best_model.pth")
    best_checkpoint = torch.load(best_checkpoint_pth, map_location="cpu")

    torch.save(best_checkpoint["model"], best_model_pth)
    print("Model saved.")

## Validation


In [None]:
def validation_loop(is_end_of_epoch=False):
    """
    Run the model over validation_loader and report loss and metrics.

    Puts the model in eval mode (and leaves it there), appends the average
    per-batch loss to val_losses, prints the averaged metrics, logs them to
    wandb when should_wandb is set and shows the first sample of the first
    batch.

    Args:
        is_end_of_epoch: when True, a checkpoint is written afterwards with
            the average loss and RMSE of this run.
    """
    torch.cuda.empty_cache()
    model.eval()
    accumulated_loss = 0.0
    num_batches = 0
    average_meter = AverageMeter()

    with torch.no_grad():
        for i, data in enumerate(validation_loader):
            t0 = time.time()
            image, target = data["image"].to(device), data["depth"].to(device)
            data_time = time.time() - t0

            t0 = time.time()
            output = model(image)
            # The model is trained to predict inverse depth due to the reciprocal of the depth
            # So to get the actual depth map, the inverse of the output is taken
            real_output = inverse_depth_norm(output)
            gpu_time = time.time() - t0

            # In the case of the validation data, the target is not normalized, so it is necessary to normalize it before calculating the loss
            loss_value = loss(output, depth_norm(target))
            accumulated_loss += loss_value.item()
            num_batches += 1

            if i == 0:
                show_images(image, target, real_output)

            result = Result()
            result.evaluate(real_output.data, target.data)
            average_meter.update(result, gpu_time, data_time, image.size(0))

    avg = average_meter.average()
    current_time = time.strftime("%H:%M", time.localtime())
    # Mean over the batches actually processed (previously divided by len + 1,
    # which under-reported the loss).
    average_loss = accumulated_loss / max(num_batches, 1)
    val_losses.append(average_loss)
    print("{} - Average Validation Loss: {:3.4f}".format(current_time, average_loss))

    print(
        "RMSE={average.rmse:.3f}\n"
        "MAE={average.mae:.3f}\n"
        "Delta1={average.delta1:.3f}\n"
        "Delta2={average.delta2:.3f}\n"
        "Delta3={average.delta3:.3f}\n"
        "REL={average.absrel:.3f}\n"
        "Lg10={average.lg10:.3f}\n"
        "t_GPU={time:.3f}\n"
        "*********************\n".format(average=avg, time=avg.gpu_time)
    )

    if should_wandb:
        wandb.log(
            {
                "validation_loss": average_loss,
                "rmse": avg.rmse,
                "mae": avg.mae,
                "delta1": avg.delta1,
                "delta2": avg.delta2,
                "delta3": avg.delta3,
                "rel": avg.absrel,
                "lg10": avg.lg10,
                # Averaged time, consistent with the printed t_GPU (was the last batch only).
                "t_GPU": avg.gpu_time,
            }
        )

    if is_end_of_epoch:
        save_checkpoint(average_loss, avg.rmse)

## Actual Training


In [None]:
def training_loop(validation_interval=1000):
    """
    Train the model for one pass over train_loader.

    Uses the module-level model, optimizer, loss, device and train_loader and
    prints the loss of every batch and the average loss of the pass.

    Args:
        validation_interval: run validation_loop() after every batch whose
            index is a non-zero multiple of this value; 0 or None disables the
            intermediate validation.
    """
    model.train()
    accumulated_loss = 0.0
    num_batches = 0

    for i, data in enumerate(train_loader):
        image, target = data["image"].to(device), data["depth"].to(device)

        optimizer.zero_grad()
        output = model(image)
        loss_value = loss(output, target)
        loss_value.backward()
        optimizer.step()
        accumulated_loss += loss_value.item()
        num_batches += 1

        print("Batch: {} - Loss: {:3.4f}".format(i, loss_value.item()))
        if validation_interval and i != 0 and i % validation_interval == 0:
            validation_loop(is_end_of_epoch=False)
            # validation_loop() leaves the model in eval mode; switch back so
            # the remaining batches are trained in training mode.
            model.train()

    current_time = time.strftime("%H:%M", time.localtime())
    # Mean over the batches actually processed (previously divided by len + 1).
    average_loss = accumulated_loss / max(num_batches, 1)
    print("{} - Average Training Loss: {:3.4f}".format(current_time, average_loss))

In [None]:
# Train the model
torch.cuda.empty_cache()

# `epoch` is deliberately the module-level loop variable: save_checkpoint() reads it.
for epoch in range(epoch, num_epochs):
    current_time = time.strftime("%H:%M", time.localtime())
    print("{} - Epoch {}".format(current_time, epoch))
    training_loop()
    # Advance the learning-rate schedule once per epoch. Without this call the
    # StepLR scheduler never changes the learning rate. It is stepped before
    # the end-of-epoch validation so that the checkpoint written there stores
    # a scheduler state that matches the stored "epoch" (= epoch + 1).
    scheduler.step()
    validation_loop(is_end_of_epoch=True)

save_model()
if should_wandb:
    wandb.finish()

# Evaluation


## Generic evaluation class


In [None]:
class Evaluation:
    """
    Evaluates a trained model on the test split of one dataset.

    The model is loaded from <root>/<model_dir>/results/best_model.pth and the
    metrics and sample images are written to
    <root>/<model_dir>/<dataset_name>_results.
    """

    def __init__(self, dataset_name, resolution_required_by_model, dataset_resolution, max_depth, crop, model_dir):
        """
        Args:
            dataset_name: dataset identifier forwarded to get_dataloader.
            resolution_required_by_model: size the input image is resized to
                before it is given to the model.
            dataset_resolution: resolution forwarded to get_dataloader; also
                the input size used by test_inference_speed().
            max_depth: largest depth value of the dataset.
            crop: [row_start, row_end, col_start, col_end] region on which the
                metrics are computed.
            model_dir: run directory name below the trained-models root.
        """
        # Initialization parameters
        self.dataset_name = dataset_name
        self.resolution_required_by_model = resolution_required_by_model
        self.max_depth = max_depth
        self.crop = crop
        self.dataset_resolution = dataset_resolution

        # Get / Create the corresponding directories
        root = "/content/drive/MyDrive/models/trained_models/" if is_colab else "models/trained_models/"
        self.model_dir = root + model_dir
        self.results_dir = self.model_dir + "/" + dataset_name + "_results"
        pathlib.Path(self.results_dir).mkdir(parents=True, exist_ok=True)

        # Load the model
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.load_model()

        # Load the dataloader
        self.test_loader = get_dataloader(dataset_name, split="test", resolution=dataset_resolution, batch_size=1, is_debug=False)

        # Utilities
        self.downscale_image = transforms.Resize(resolution_required_by_model)
        self.to_tensor = ToTensor(is_train=False, max_depth=max_depth)
        # Indices of the test samples whose images are written to disk.
        self.images_to_visualize = [0, 100, 200, 300, 400, 500, 600]

    def load_model(self):
        """Load <model_dir>/results/best_model.pth onto self.device."""
        state_path = self.model_dir + "/results/best_model.pth"
        self.model = get_model(state_path=state_path).to(self.device)
        print("Model loaded")

    def save_results(self, average):
        """Write the averaged metrics as a two-line CSV to <results_dir>/results.txt."""
        results_file = os.path.join(self.results_dir, "results.txt")
        with open(results_file, "w") as f:
            # No spaces after the commas, so every column name parses cleanly.
            f.write("RMSE,MAE,REL,RMSE_log,Lg10,Delta1,Delta2,Delta3\n")
            f.write(
                "{average.rmse:.3f}"
                ",{average.mae:.3f}"
                ",{average.absrel:.3f}"
                ",{average.rmse_log:.3f}"
                ",{average.lg10:.3f}"
                ",{average.delta1:.3f}"
                ",{average.delta2:.3f}"
                ",{average.delta3:.3f}".format(average=average)
            )

    def depth_norm(self, depth):
        """Map depth to max_depth / depth after clamping to [max_depth / 100, max_depth]; zeros stay zero."""
        zero_mask = depth == 0
        depth = torch.clamp(depth, self.max_depth / 100, self.max_depth)
        depth = self.max_depth / depth
        depth[zero_mask] = 0.0
        return depth

    def inverse_depth_norm(self, depth):
        """Map the reciprocal representation back to depth, clamped to [max_depth / 100, max_depth]; zeros stay zero."""
        zero_mask = depth == 0
        depth = self.max_depth / depth
        depth = torch.clamp(depth, self.max_depth / 100, self.max_depth)
        depth[zero_mask] = 0.0
        return depth

    def _save_figure(self, file_name, data, colorbar=False, **imshow_kwargs):
        """Draw `data` on a borderless full-figure axis and save it to results_dir/file_name."""
        import matplotlib.pyplot as plt

        fig = plt.figure(frameon=False)
        ax = plt.Axes(fig, [0.0, 0.0, 1.0, 1.0])
        ax.set_axis_off()
        fig.add_axes(ax)
        drawn = ax.imshow(data, **imshow_kwargs)
        if colorbar:
            fig.colorbar(drawn, ax=ax, shrink=0.8)
        fig.savefig(os.path.join(self.results_dir, file_name))
        # Close the figure right away so repeated calls do not accumulate open figures.
        plt.close(fig)

    def save_image_results(self, image, target, output, image_id):
        """
        Save the input image, the error map, the target and the output of the
        first sample of a batch as image_/errors_/target_/output_<image_id>.png.
        """
        # Function taken from [https://github.com/mic-rud/GuidedDecoding/blob/main/inference.py]
        image = image[0].permute(1, 2, 0).cpu()
        target = target[0, 0].cpu()
        output = output[0, 0].detach().cpu()

        # Error magnitude: the map is drawn on a sequential colormap starting
        # at 0, so a signed difference would hide every over-estimated pixel.
        error_map = torch.abs(target - output)
        vmax_error = self.max_depth / 10.0
        vmin_error = 0.0
        cmap = "viridis"

        # Colour range of the depth maps, taken from the non-zero target values.
        # Falls back to automatic scaling when the target has none.
        valid_target = target[target != 0.0]
        if valid_target.numel() > 0:
            vmin, vmax = float(valid_target.min()), float(valid_target.max())
        else:
            vmin = vmax = None

        # Save the image
        self._save_figure("image_{}.png".format(image_id), image)

        # Save the errors
        self._save_figure(
            "errors_{}.png".format(image_id), error_map, colorbar=True, vmin=vmin_error, vmax=vmax_error, cmap="Reds"
        )

        # Save the ground truth
        self._save_figure("target_{}.png".format(image_id), target, vmin=vmin, vmax=vmax, cmap=cmap)

        # Save the output
        self._save_figure("output_{}.png".format(image_id), output, vmin=vmin, vmax=vmax, cmap=cmap)

    def test_inference_speed(self):
        """
        Time forward passes on random inputs and print runtime, FPS and the
        number of trainable parameters.

        Each run feeds a batch of 2 random images of size dataset_resolution;
        the first 10 runs are warm-up and are not counted. The reported runtime
        is per forward pass (i.e. per batch). Leaves the model in eval mode.
        """
        torch.cuda.empty_cache()
        # Synchronising is only possible (and only needed) on a CUDA device.
        use_cuda = self.device.type == "cuda"
        # Eval mode: random inputs must not update normalisation statistics.
        self.model.eval()
        times = 0.0

        warm_up_runs = 10
        num_test_runs = 200
        with torch.no_grad():
            for i in range(num_test_runs + warm_up_runs):
                if i == warm_up_runs:
                    times = 0.0

                x = torch.randn([2, 3, *self.dataset_resolution]).to(self.device)
                if use_cuda:
                    torch.cuda.synchronize()
                t0 = time.perf_counter()
                _ = self.model(x)
                if use_cuda:
                    torch.cuda.synchronize()
                times += time.perf_counter() - t0

        times = times / num_test_runs
        fps = 1 / times if times > 0 else float("inf")
        print("[PyTorch] Runtime: {}s".format(times))
        print("[PyTorch] FPS: {}\n".format(fps))
        print("Number of parameters: {}".format(sum(p.numel() for p in self.model.parameters() if p.requires_grad)))

    def evaluate(self):
        """
        Compute the metrics over the whole test split, save them and print them.

        Every sample is scored twice, once as is and once flipped along its
        last axis, and both results enter the average. The model sees the
        image at resolution_required_by_model; its output is resized back to
        the target size and both are cropped to `crop` before scoring.
        """
        self.model.eval()
        average_meter = AverageMeter()

        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            for i, data in enumerate(self.test_loader):
                # Load the data
                t0 = time.time()
                sample = self.to_tensor({"image": data[0][0], "depth": data[1][0]})
                image, target = sample["image"].to(self.device), sample["depth"].to(self.device)

                image = self.downscale_image(image.unsqueeze(0))
                image_flip = self.downscale_image(torch.flip(image, [3]))

                target = target.unsqueeze(0)
                target_flip = torch.flip(target, [3])

                data_time = time.time() - t0

                # Forward pass
                t0 = time.time()

                output = self.model(image)
                real_output = self.inverse_depth_norm(output)

                output_flip = self.model(image_flip)
                real_output_flip = self.inverse_depth_norm(output_flip)

                gpu_time = time.time() - t0

                # Prepare the results
                # Only the input image that goes through the model is downscaled
                # This implies that a function to upscale the output of the model to the original size is needed
                upscale_image = transforms.Resize(target.shape[-2:])
                real_output = upscale_image(real_output)
                real_output_flip = upscale_image(real_output_flip)

                if i in self.images_to_visualize:
                    self.save_image_results(image, target, real_output, i)

                # Apply the crop
                target = target[:, :, self.crop[0] : self.crop[1], self.crop[2] : self.crop[3]]
                target_flip = target_flip[:, :, self.crop[0] : self.crop[1], self.crop[2] : self.crop[3]]
                real_output = real_output[:, :, self.crop[0] : self.crop[1], self.crop[2] : self.crop[3]]
                real_output_flip = real_output_flip[:, :, self.crop[0] : self.crop[1], self.crop[2] : self.crop[3]]

                # Update the results
                result = Result()
                result.evaluate(real_output.data, target.data)
                average_meter.update(result, gpu_time, data_time, image.size(0))

                result_flip = Result()
                result_flip.evaluate(real_output_flip.data, target_flip.data)
                average_meter.update(result_flip, gpu_time, data_time, image.size(0))

        # Report the results
        avg = average_meter.average()
        self.save_results(avg)
        print(
            "RMSE={average.rmse:.3f}\n"
            "MAE={average.mae:.3f}\n"
            "Delta1={average.delta1:.3f}\n"
            "Delta2={average.delta2:.3f}\n"
            "Delta3={average.delta3:.3f}\n"
            "REL={average.absrel:.3f}\n"
            "Lg10={average.lg10:.3f}\n"
            "t_GPU={time:.3f}\n"
            "*********************\n".format(average=avg, time=avg.gpu_time)
        )

## Parameters


In [None]:
# Size the test images are resized to before they are given to the model.
my_resolution_required_by_model = (240, 320)
# Run directory (below the trained-models root) that holds results/best_model.pth.
# NOTE(review): fresh training runs are saved as "<name>__<timestamp>", so this
# probably has to be set to the full directory name of the run - confirm.
my_model_dir = "leve_no_skip_connections"

## NYU Depth V2


In [None]:
# Evaluator for the NYU Depth V2 test split.
# crop is [row_start, row_end, col_start, col_end]: metrics are computed on that region only.
nyu_evaluator = Evaluation(
    dataset_name="nyu",
    resolution_required_by_model=my_resolution_required_by_model,
    dataset_resolution=(480, 640),
    max_depth=10.0,
    crop=[20, 460, 24, 616],
    model_dir=my_model_dir,
)

In [None]:
# Compute and print the NYU Depth V2 test metrics; results.txt and the sample images go to the results directory.
nyu_evaluator.evaluate()

In [None]:
# Time forward passes on random inputs of the NYU dataset resolution and print runtime, FPS and parameter count.
nyu_evaluator.test_inference_speed()

## Kitti


In [None]:
# Evaluator for the KITTI test split.
# crop is [row_start, row_end, col_start, col_end]: metrics are computed on that region only.
kitti_evaluator = Evaluation(
    dataset_name="kitti",
    resolution_required_by_model=my_resolution_required_by_model,
    dataset_resolution=(384, 1280),
    max_depth=80.0,
    crop=[128, 381, 45, 1196],
    model_dir=my_model_dir,
)

In [None]:
# Compute and print the KITTI test metrics; results.txt and the sample images go to the results directory.
kitti_evaluator.evaluate()

## EndoSLAM


In [None]:
# Evaluator for the EndoSLAM test split.
# The crop spans rows 0-320 and columns 0-320, i.e. the whole 320 x 320 dataset resolution.
endoslam_evaluator = Evaluation(
    dataset_name="endoslam",
    resolution_required_by_model=my_resolution_required_by_model,
    dataset_resolution=(320, 320),
    max_depth=1,
    crop=[0, 320, 0, 320],
    model_dir=my_model_dir,
)

In [None]:
# Compute and print the EndoSLAM test metrics; results.txt and the sample images go to the results directory.
endoslam_evaluator.evaluate()