In [None]:
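# KITTI dataset: download the archive from Google Drive (by file ID) and save it as KITTI_small_data.zip (about 5 GB according to the output below)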
!gdown --id '14P9-IjC63F-CNqbypmD1d2m4tJV7xVQw' --output KITTI_small_data.zip

Downloading...
From: https://drive.google.com/uc?id=14P9-IjC63F-CNqbypmD1d2m4tJV7xVQw
To: /content/KITTI_small_data.zip
4.98GB [00:46, 108MB/s] 


In [None]:
import zipfile
from zipfile import ZipFile
# Unpack the downloaded archive into ./targetdir; the context manager closes the
# archive once extraction has finished.
with ZipFile("KITTI_small_data.zip", mode="r") as archive:
    archive.extractall(path="targetdir")

Data

In [None]:
import os
import random

import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset, DataLoader


class KITTI_TrainAugmentDataset(Dataset):
    """Training samples (RGB frame + depth map) with random horizontal flips.

    Each entry of ``files`` is a ``(directory, image_ID)`` pair. The RGB frame is
    read from ``<inpainted_directory>/<directory>/rgb_image03_<image_ID>`` and the
    depth map from ``<inpainted_directory>/<directory>/d_image03_<image_ID>``.

    Args:
        files: sequence of ``(directory, image_ID)`` pairs.
        inpainted_directory: root directory that contains the per-drive folders.
        transform: accepted for interface compatibility; it is currently NOT
            applied to the samples.
    """

    # (width, height) targets handed to PIL's ``resize``.
    IMAGE_SIZE = (640, 192)
    DEPTH_SIZE = (320, 96)

    def __init__(self, files, inpainted_directory, transform=None):
        self.files = files
        self.inpainted_directory = inpainted_directory

    @staticmethod
    def _rgb_to_tensor(pixels):
        """Turn an (H, W, 3) array-like of 0-255 values into a (3, H, W) float tensor in [0, 1]."""
        # Converting to float32 in NumPy first keeps torch.from_numpy away from
        # integer dtypes it may not support.
        array = np.array(pixels, dtype=np.float32)
        return torch.clamp(torch.from_numpy(array) / 255, 0, 1).permute(2, 0, 1)

    @staticmethod
    def _depth_to_tensor(pixels):
        """Turn an (H, W) array-like of raw depth values into a (1, H, W) float tensor.

        The raw value is mapped to ``raw / 256 - 5`` and clamped to [0, 80].
        NOTE(review): ``raw / 256`` looks like the usual 16-bit depth-PNG scaling;
        the reason for the ``- 5`` offset is not visible here -- confirm.
        """
        # float32 conversion happens in NumPy so that uint16 arrays (16-bit PNGs),
        # which torch.from_numpy rejects on older PyTorch releases, still work.
        array = np.array(pixels, dtype=np.float32)
        array = array.reshape(array.shape[0], array.shape[1], 1)
        return torch.clamp(torch.from_numpy(array) / 256 - 5, 0, 80).permute(2, 0, 1)

    def __getitem__(self, idx):
        """Load, augment and convert the sample at ``idx``.

        Returns:
            dict with ``'image'`` (float tensor, 3 x 192 x 640, values in [0, 1])
            and ``'depth'`` (float tensor, 1 x 96 x 320, values in [0, 80]).
        """
        directory, image_ID = self.files[idx]
        sample_directory = os.path.join(self.inpainted_directory, directory)
        # convert("RGB") guarantees exactly three channels even for palette,
        # grayscale or RGBA files.
        image = Image.open(os.path.join(sample_directory, "rgb_image03_" + image_ID)).convert("RGB")
        depth = Image.open(os.path.join(sample_directory, "d_image03_" + image_ID))
        # Augmentation: random horizontal flip, applied to both images so they stay aligned.
        if random.random() < 0.5:
            image = image.transpose(Image.FLIP_LEFT_RIGHT)
            depth = depth.transpose(Image.FLIP_LEFT_RIGHT)
        # The RGB input is resized to 640x192; the depth ground truth to 320x96.
        image = image.resize(self.IMAGE_SIZE)
        depth = depth.resize(self.DEPTH_SIZE)

        return {'image': self._rgb_to_tensor(image), 'depth': self._depth_to_tensor(depth)}

    def __len__(self):
        """Number of ``(directory, image_ID)`` pairs in the dataset."""
        return len(self.files)


class KITTI_TestDataset(Dataset):
    """Validation/test samples (RGB frame + depth map) without augmentation.

    Each entry of ``files`` is a ``(directory, image_ID)`` pair. The RGB frame is
    read from ``<val_inpainted_directory>/<directory>/rgb_image03_<image_ID>`` and
    the depth map from ``<val_inpainted_directory>/<directory>/d_image03_<image_ID>``.

    Args:
        files: sequence of ``(directory, image_ID)`` pairs.
        val_inpainted_directory: root directory that contains the per-drive folders.
        transform: accepted for interface compatibility; it is currently NOT
            applied to the samples.
    """

    # (width, height) targets handed to PIL's ``resize``.
    IMAGE_SIZE = (640, 192)
    DEPTH_SIZE = (320, 96)

    def __init__(self, files, val_inpainted_directory, transform=None):
        self.files = files
        self.inpainted_directory = val_inpainted_directory

    @staticmethod
    def _rgb_to_tensor(pixels):
        """Turn an (H, W, 3) array-like of 0-255 values into a (3, H, W) float tensor in [0, 1]."""
        # Converting to float32 in NumPy first keeps torch.from_numpy away from
        # integer dtypes it may not support.
        array = np.array(pixels, dtype=np.float32)
        return torch.clamp(torch.from_numpy(array) / 255, 0, 1).permute(2, 0, 1)

    @staticmethod
    def _depth_to_tensor(pixels):
        """Turn an (H, W) array-like of raw depth values into a (1, H, W) float tensor.

        The raw value is mapped to ``raw / 256 - 5`` and clamped to [0, 80].
        NOTE(review): ``raw / 256`` looks like the usual 16-bit depth-PNG scaling;
        the reason for the ``- 5`` offset is not visible here -- confirm.
        """
        # float32 conversion happens in NumPy so that uint16 arrays (16-bit PNGs),
        # which torch.from_numpy rejects on older PyTorch releases, still work.
        array = np.array(pixels, dtype=np.float32)
        array = array.reshape(array.shape[0], array.shape[1], 1)
        return torch.clamp(torch.from_numpy(array) / 256 - 5, 0, 80).permute(2, 0, 1)

    def __getitem__(self, idx):
        """Load and convert the sample at ``idx``.

        Returns:
            dict with ``'image'`` (float tensor, 3 x 192 x 640, values in [0, 1])
            and ``'depth'`` (float tensor, 1 x 96 x 320, values in [0, 80]).
        """
        directory, image_ID = self.files[idx]
        sample_directory = os.path.join(self.inpainted_directory, directory)
        # convert("RGB") guarantees exactly three channels even for palette,
        # grayscale or RGBA files.
        image = Image.open(os.path.join(sample_directory, "rgb_image03_" + image_ID)).convert("RGB")
        depth = Image.open(os.path.join(sample_directory, "d_image03_" + image_ID))
        # The RGB input is resized to 640x192; the depth ground truth to 320x96.
        image = image.resize(self.IMAGE_SIZE)
        depth = depth.resize(self.DEPTH_SIZE)

        return {'image': self._rgb_to_tensor(image), 'depth': self._depth_to_tensor(depth)}

    def __len__(self):
        """Number of ``(directory, image_ID)`` pairs in the dataset."""
        return len(self.files)

loss

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from math import exp

def L1loss(y_pred, y_true):
    """Point-wise L1 term: mean absolute difference between prediction and target.

    Uses ``nn.L1Loss`` with its default settings and returns a scalar tensor.
    """
    return nn.L1Loss()(y_pred, y_true)

def lossgradient(y_pred, y_true):
    """Edge term: mean absolute difference between the image gradients of both inputs.

    Both tensors are indexed as 4-D (the last two axes are treated as rows and
    columns). Gradients are forward differences with step 1; the last column of
    dx and the last row of dy have no neighbour and are zero.

    Returns:
        Scalar tensor ``mean(|dy_pred - dy_true| + |dx_pred - dx_true|)``.
    """
    def forward_differences(tensor):
        # neighbour minus current element, then a zero column/row appended at the
        # far edge so the result keeps the input's shape
        dx = F.pad(tensor[:, :, :, 1:] - tensor[:, :, :, :-1], [0, 1, 0, 0])
        dy = F.pad(tensor[:, :, 1:, :] - tensor[:, :, :-1, :], [0, 0, 0, 1])
        return dx, dy

    dx_pred, dy_pred = forward_differences(y_pred)
    dx_true, dy_true = forward_differences(y_true)

    return torch.mean(torch.abs(dy_pred - dy_true) + torch.abs(dx_pred - dx_true))

def gaussian(window_size, sigma):
    """Return a 1-D Gaussian kernel of length ``window_size`` normalised to sum to 1.

    The kernel is centred on index ``window_size // 2`` and has standard
    deviation ``sigma``.
    """
    center = window_size // 2
    two_sigma_sq = float(2 * sigma ** 2)
    weights = []
    for position in range(window_size):
        weights.append(exp(-(position - center) ** 2 / two_sigma_sq))
    kernel = torch.Tensor(weights)
    return kernel / kernel.sum()

def create_window(window_size, channel=1):
    """Build a 2-D Gaussian window (sigma 1.5) shaped ``(channel, 1, window_size, window_size)``.

    The 2-D kernel is the outer product of the 1-D kernel from ``gaussian`` with
    itself, repeated once per channel.
    """
    column = gaussian(window_size, 1.5).unsqueeze(1)  # (window_size, 1)
    kernel_2d = torch.mm(column, column.t()).float()
    stacked = kernel_2d[None, None].expand(channel, 1, window_size, window_size)
    return stacked.contiguous()

def ssim(img1, img2, maxDepthVal, window_size=11, window=None, size_average=True, full=False):
    """Structural similarity (SSIM) between two 4-D tensors.

    Args:
        img1, img2: tensors whose size unpacks as (batch, channel, height, width).
        maxDepthVal: dynamic range used for the stabilising constants C1 and C2.
        window_size: side of the Gaussian window built when ``window`` is None;
            it is reduced to the image height/width if those are smaller.
        window: optional pre-built convolution kernel, used as given.
        size_average: if True return the mean over the whole SSIM map, otherwise
            one value per batch item.
        full: if True also return the mean contrast-sensitivity term.

    Returns:
        ``ret`` or, when ``full`` is True, the tuple ``(ret, cs)``.
    """
    _, channel, height, width = img1.size()
    if window is None:
        kernel_extent = min(window_size, height, width)
        window = create_window(kernel_extent, channel=channel).to(img1.device)

    def windowed(tensor):
        # per-channel (grouped) convolution without padding
        return F.conv2d(tensor, window, padding=0, groups=channel)

    # local means
    mu1 = windowed(img1)
    mu2 = windowed(img2)
    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1 * mu2

    # local variances / covariance via E[xy] - E[x]E[y]
    sigma1_sq = windowed(img1 * img1) - mu1_sq
    sigma2_sq = windowed(img2 * img2) - mu2_sq
    sigma12 = windowed(img1 * img2) - mu1_mu2

    dynamic_range = maxDepthVal
    C1 = (0.01 * dynamic_range) ** 2
    C2 = (0.03 * dynamic_range) ** 2

    v1 = 2.0 * sigma12 + C2
    v2 = sigma1_sq + sigma2_sq + C2

    ssim_map = ((2 * mu1_mu2 + C1) * v1) / ((mu1_sq + mu2_sq + C1) * v2)

    ret = ssim_map.mean() if size_average else ssim_map.mean(1).mean(1).mean(1)

    if full:
        # contrast sensitivity
        return ret, torch.mean(v1 / v2)
    return ret

def depth_loss(y_pred, y_true):
    """Total training loss: L1 term + gradient (edge) term + SSIM term, each weighted 1.0.

    The SSIM term is ``clamp((1 - ssim) * 0.5, 0, 1)`` with ``ssim`` evaluated for
    a dynamic range of ``1000.0/10.0``.
    """
    depth_term = L1loss(y_pred, y_true)
    edge_term = lossgradient(y_pred, y_true)
    similarity = ssim(y_pred, y_true, maxDepthVal=1000.0/10.0)
    ssim_term = torch.clamp((1 - similarity) * 0.5, 0, 1)

    # per-term weights
    depth_weight, edge_weight, ssim_weight = 1.0, 1.0, 1.0
    return (depth_weight * depth_term) + (edge_weight * edge_term) + (ssim_weight * ssim_term)

loss_modify (alternative loss implementation; the whole cell below is commented out and not used)

In [None]:
# import torch
# import torch.nn.functional as func
# from math import exp

# #Ldepth(y, ŷ)
# def l1_criterion(y_pred,y_true):
#   #compute point-wise L1 loss
#   l_depth = torch.mean(torch.abs(y_pred - y_true))
#   return l_depth

# #Lgrad(y, ŷ)
# def image_gradients(image):
#   #compute image gradient loss
#   left = image
#   right = func.pad(image,[0, 1, 0, 0])[:, :, :, 1:]
#   top = image
#   bottom = func.pad(image, [0, 0, 0, 1])[:, :, 1:, :]

#   dx = right - left
#   dy = bottom - top
#   dx[:, :, :, -1] = 0
#   dy[:, :, -1, :] = 0

#   return dx,dy

# #LSSIM (y, ŷ)
# #Structural similarity index is a method for predicting similarity of two images
# #An image quality metric that assesses the visual impact of three characteristics of an image: luminance, contrast and structure.
# def gaussian(window_size, sigma):
#   # Calculate the one-dimensional Gaussian distribution vector
#   def gauss(point):
#     return -(point - (window_size // 2))**2 / float(2 * sigma**2)
#   gauss = torch.Tensor([exp(gauss(point)) for point in range(window_size)])
#   return gauss / gauss.sum()

# def create_window(window_size, channel=1):
#   # Create a Gaussian kernel, obtained by matrix multiplication of two one-dimensional Gaussian distribution vectors
#   gaussian_kernel1d = gaussian(window_size, 1.5).unsqueeze(1)
#   gaussian_kernel2d = gaussian_kernel1d.mm(gaussian_kernel1d.t()).float().unsqueeze(0).unsqueeze(0)
#   window = gaussian_kernel2d.expand(channel, 1, window_size, window_size).contiguous()
#   return window

# def ssim(y_pred, y_true, data_range=None, window=None, size_average=True):
#   #If the data is not provided use max and min value from image to calculate data range
#   if data_range is None:
#     if torch.max(y_pred) > 128:
#       max_point = 255
#     else:
#       max_point = 1

#     if torch.min(y_pred) < -0.5:
#       min_point = -1
#     else:
#       min_point = 0
#     data_range = max_point - min_point


#     #get parameter from image
#     (_, channel, height, width) = y_pred.size()
#     #window_size is 11 by default
#     default_window_size=11

#     if window is None:
#         realWindowsize = min(default_window_size, height, width)
#         window = create_window(realWindowsize, channel=channel).to(y_pred.device)

#     #The formula Var(X)=E[X^2]-E[X]^2, cov(X,Y)=E[XY]-E[X]E[Y] is used when calculating variance and covariance .    
#     #mu_x the average of x
#     mu_x = func.conv2d(y_pred, window, padding=0, groups=channel)
#     #mu_y the average of y
#     mu_y = func.conv2d(y_true, window, padding=0, groups=channel)
#     #sigma_xy the covariance of x and y
#     Sigma_xy = func.conv2d(y_pred * y_true, window, padding=0, groups=channel) - (mu_x * mu_x) * (mu_y * mu_y)

#     #K1 = 0.01 and k2 = 0.03 by default
#     K1 = 0.01
#     K2 = 0.03
#     #C1 and C2 two variables to stabilize the division with weak denominator
#     ##L is the dynamic range of the pixel-values which either provided by user or calculate from before
#     C1 = (K1 * data_range) ** 2
#     C2 = (K2 * data_range) ** 2

#     Denominator = (2 * mu_x * mu_y + C1) * (2 * Sigma_xy + C2)
#     #sigma_x^2 is the variance of x
#     Sigma_x_sq = func.conv2d(y_pred * y_pred, window, padding=0, groups=channel) - mu_x * mu_x
#     #sigma_y^2 is the variance of y
#     Sigma_y_sq = func.conv2d(y_true * y_true, window, padding=0, groups=channel) - mu_y * mu_y

#     Numerator = (mu_x * mu_x + C1) *(Sigma_x_sq + Sigma_y_sq + C2)

#     ssim_map = Denominator / Numerator

#     if size_average:
#         return ssim_map.mean()
#     else:
#         return ssim_map.mean(1).mean(1).mean(1)

# def depth_loss(y_pred, y_true):
#   #Ldepth(y, ŷ)
#   l_depth = l1_criterion(y_pred, y_true)
#   #Lgrad(y, ŷ)
#   dx_true, dy_true = image_gradients(y_true)
#   dx_pred, dy_pred = image_gradients(y_pred)
#   l_edges = torch.mean(torch.abs(dy_pred - dy_true) + torch.abs(dx_pred - dx_true))
#   #LSSIM (y, ŷ)
#   l_ssim = torch.clamp((1 - ssim(y_pred, y_true)) * 0.5, 0, 1)

#   #loss 
#   w1 = 0.1
#   w2 = 1.0
#   w3 = 1.0
#   loss= (w1 * l_depth) + (w2 * l_edges) + (w3 * l_ssim)
#   return loss

model

In [None]:
import torch
import torch.nn as nn
from torchvision import models
import torch.nn.functional as F

class UpSample(nn.Sequential):
    """Decoder stage: upsample, concatenate a skip tensor, then two 3x3 conv + LeakyReLU(0.2) layers.

    Args:
        skip_input: channel count of the concatenated tensor (upsampled input + skip).
        output_features: channel count produced by both convolutions.
    """

    def __init__(self, skip_input, output_features):
        super().__init__()
        self.convA = nn.Conv2d(skip_input, output_features, kernel_size=3, stride=1, padding=1)
        self.leakyreluA = nn.LeakyReLU(0.2)
        self.convB = nn.Conv2d(output_features, output_features, kernel_size=3, stride=1, padding=1)
        self.leakyreluB = nn.LeakyReLU(0.2)

    def forward(self, x, concat_with):
        """Bilinearly resize ``x`` to the spatial size of ``concat_with``, concatenate on dim 1, convolve."""
        target_size = [concat_with.size(2), concat_with.size(3)]
        upsampled = F.interpolate(x, size=target_size, mode='bilinear', align_corners=True)
        merged = torch.cat([upsampled, concat_with], dim=1)
        hidden = self.leakyreluA(self.convA(merged))
        return self.leakyreluB(self.convB(hidden))

class Decoder(nn.Module):
    """Decoder: a 1x1 convolution, four ``UpSample`` stages with skip connections, and a 3x3 output conv.

    Args:
        num_features: channel count of the deepest encoder feature map.
        decoder_width: multiplier applied to ``num_features`` to get the decoder width.
    """

    def __init__(self, num_features=1664, decoder_width = 1.0):
        super().__init__()
        features = int(num_features * decoder_width)

        self.conv2 = nn.Conv2d(num_features, features, kernel_size=1, stride=1, padding=0)

        # Each stage halves the channel count; the added constant is the channel
        # count of the skip tensor concatenated at that stage.
        self.up1 = UpSample(skip_input=features + 256, output_features=features // 2)
        self.up2 = UpSample(skip_input=features // 2 + 128, output_features=features // 4)
        self.up3 = UpSample(skip_input=features // 4 + 64, output_features=features // 8)
        self.up4 = UpSample(skip_input=features // 8 + 64, output_features=features // 16)

        self.conv3 = nn.Conv2d(features // 16, 1, kernel_size=3, stride=1, padding=1)

    def forward(self, features):
        """Decode the encoder's feature list into a single-channel map.

        Entries 3, 4, 6 and 8 of ``features`` are used as skip tensors (shallowest
        to deepest) and entry 12 as the decoder input.
        """
        skip0, skip1, skip2, skip3, deepest = (features[index] for index in (3, 4, 6, 8, 12))
        x = self.conv2(F.relu(deepest))

        for stage, skip in ((self.up1, skip3), (self.up2, skip2), (self.up3, skip1), (self.up4, skip0)):
            x = stage(x, skip)
        return self.conv3(x)

class Encoder(nn.Module):
    """Encoder that wraps torchvision's DenseNet-169 and exposes every intermediate activation."""

    def __init__(self):
        super().__init__()
        # pretrained (bool): If True, returns a model pre-trained on ImageNet
        self.original_model = models.densenet169( pretrained=True )

    def forward(self, x):
        """Return ``[x, f1(x), f2(f1(x)), ...]`` for the sub-modules of ``original_model.features`` in order."""
        features = [x]
        for layer in self.original_model.features._modules.values():
            features.append(layer(features[-1]))
        return features

class DepthModel(nn.Module):
    """Full network: ``Encoder`` followed by ``Decoder``."""

    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        """Run the encoder on ``x`` and feed its feature list to the decoder."""
        encoder_features = self.encoder(x)
        return self.decoder(encoder_features)

train

In [None]:
import time
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import os
import cv2

def _list_kitti_samples(annotated_directory):
    """Return the ``(directory, image_name)`` pairs found under ``annotated_directory``.

    For every sub-directory, the file names inside
    ``<sub-directory>/proj_depth/groundtruth/image_03`` are collected. Entries
    whose name starts with '.' are skipped at both levels, so hidden files never
    end up as samples.
    """
    samples = []
    for directory in os.listdir(annotated_directory):
        if directory.startswith('.'):
            continue
        image_directory = os.path.join(annotated_directory, directory, "proj_depth/groundtruth/image_03")
        for image in os.listdir(image_directory):
            if not image.startswith('.'):
                samples.append((directory, image))
    return samples


def main():
    """Train ``DepthModel`` on the extracted KITTI subset and save its weights.

    Steps: build the train/val loaders, run ``num_epoch`` epochs of Adam updates
    with ``depth_loss``, evaluate on the validation split after every epoch,
    show sample predictions with matplotlib, plot the per-epoch average losses to
    ``losses.png`` and store the model's ``state_dict`` in ``nyusmall_para.pt``.

    Raises:
        RuntimeError: if the training or validation split contains no samples.
    """
    # use the GPU when one is available, otherwise fall back to the CPU
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Create model
    model = DepthModel().to(device)

    # batch size and number of epochs
    batch_size = 8
    num_epoch = 50

    # Adam optimizer with learning rate of 0.0001
    optimizer = torch.optim.Adam( model.parameters(), lr=0.0001 )

    # train
    annotated_directory = "./targetdir/KITTI_small_data/kitti/annotated/train/"
    inpainted_directory = "./targetdir/KITTI_small_data/kitti/inpainted/train/"
    train_files = _list_kitti_samples(annotated_directory)
    if not train_files:
        raise RuntimeError("No training samples found under {}".format(annotated_directory))
    train_set = KITTI_TrainAugmentDataset(train_files, inpainted_directory)
    train_loader = DataLoader(train_set, batch_size, shuffle=True)

    # val
    val_annotated_directory = "./targetdir/KITTI_small_data/kitti/annotated/val/"
    val_inpainted_directory = "./targetdir/KITTI_small_data/kitti/inpainted/val/"
    val_files = _list_kitti_samples(val_annotated_directory)
    if not val_files:
        raise RuntimeError("No validation samples found under {}".format(val_annotated_directory))
    test_set = KITTI_TestDataset(val_files, val_inpainted_directory)
    test_loader = DataLoader(test_set, batch_size, shuffle=False)

    num_train_batches = len(train_loader)
    num_test_batches = len(test_loader)

    train_avg_losses = []
    test_avg_losses = []

    # Start training...
    for epoch in range(num_epoch):
        epoch_start_time = time.time()
        train_loss = 0.0
        test_loss = 0.0

        # Switch to train mode
        model.train()
        for i, sample_batched in enumerate(train_loader):
            # zero the gradients
            optimizer.zero_grad()
            # move the current batch (RGB input and depth ground truth) to the device
            train_image = sample_batched['image'].to(device)
            train_depth = sample_batched['depth'].to(device)
            # Predict depth
            train_output = model(train_image)
            # Compute the loss between the prediction and the ground truth
            batch_loss = depth_loss(train_output, train_depth)
            # back-propagate and update the parameters
            batch_loss.backward()
            optimizer.step()

            # accumulate loss
            batch_loss_value = batch_loss.item()
            train_loss += batch_loss_value

            # every 10 batches: report the average time per 10 batches and the losses
            if i % 10 == 0:
                print('Epoch [{}/{}][{}/{}], {:.2f} sec(s), Batch loss:{:.5f} (Avg:{:.5f})'
                  .format(epoch+1, num_epoch, i+1, num_train_batches, (time.time()-epoch_start_time)*10/(i+1), batch_loss_value, train_loss/(i+1)))
            # every 400 batches: show the last input of the batch and its (negated) prediction
            if i % 400 == 0:
                input_rgb = train_image.permute(0, 2, 3, 1)
                input_rgb = torch.squeeze(input_rgb[-1]).cpu().detach().numpy()
                plt.figure(1)
                plt.imshow( input_rgb )
                plt.show()
                output_depth = train_output.permute(0, 2, 3, 1)
                output_depth = torch.squeeze(output_depth[-1]).cpu().detach().numpy()
                plt.figure(2)
                plt.imshow( -output_depth, cmap='plasma' )
                plt.show()

        # switch to test mode
        model.eval()
        # disable any gradient calculation
        with torch.no_grad():
            for i, sample_batched in enumerate(test_loader):
                test_image = sample_batched['image'].to(device)
                test_depth = sample_batched['depth'].to(device)
                # predict
                test_output = model(test_image)
                # loss
                batch_loss = depth_loss(test_output, test_depth)

                # accumulate loss
                test_loss += batch_loss.item()

                # display the depth prediction of the last image of the last batch
                if i == num_test_batches-1:
                    # (N x C x H x W) -> (N x H x W x C), then drop size-1 dimensions
                    output_depth = test_output.permute(0, 2, 3, 1)
                    output_depth = torch.squeeze(output_depth[-1]).cpu().detach().numpy()
                    plt.figure(1)
                    plt.imshow( output_depth, cmap='plasma' )
                    plt.show()

        # record average batch losses for training and test sets at one epoch
        train_avg_losses.append(train_loss/num_train_batches)
        test_avg_losses.append(test_loss/num_test_batches)
        # display the duration of the epoch and the average losses
        print('Epoch [{}/{}], {:.2f} sec(s), Avg Train loss:{:.5f}, Avg Test loss:{:.5f}'
              .format(epoch+1, num_epoch, time.time()-epoch_start_time, train_loss/num_train_batches, test_loss/num_test_batches))

    # plot average batch losses for training and test sets
    plt.figure(3)
    plt.plot(train_avg_losses, 'o-', label='average train loss')
    plt.plot(test_avg_losses, 'o-', label='average test loss')
    plt.legend()
    plt.title('train/test losses')
    plt.savefig('losses.png')
    plt.show()

    # save model's parameters
    # NOTE(review): the file name says "nyusmall" although this run trains on KITTI;
    # kept unchanged because other code may load this path.
    path = 'nyusmall_para.pt'
    torch.save(model.state_dict(), path)

# Start training only when this code runs as the main module (not when it is imported).
if __name__ == '__main__':
    main()

Output hidden; open in https://colab.research.google.com to view.