In [1]:
#@title Import necessary libraries

import os
import numpy as np
import random
import pandas as pd
import torch
import copy
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data import DataLoader, TensorDataset, Dataset
from torch.nn import Linear, ReLU, LeakyReLU, Sequential, Conv2d, MaxPool2d, Module, Softmax, BatchNorm2d, Dropout
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from torchvision import datasets, transforms
from math import sqrt
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler
from matplotlib import pyplot as plt
from torch.autograd import Variable
import torch.optim as optim
import torch.autograd as autograd
import copy
# from tensorflow import keras
import thread6 as thread
device = torch.device("cuda")

In [2]:
#@title deep unrolling training

# Function to create a downsampling operator for an image
def downsampling_operator(s):
    # Initialize an empty matrix for the downsampled image
    S = np.zeros((np.int_(64/s), 64))
    vector = np.zeros((1, 64))
    vector[0,0] = 1

    # Loop over the downsampled image indices and create the downsampled image by shifting the elements in the vector
    for i in range(np.int_(64/s)):
        S[i,:] = vector
        vector = np.roll(vector, s, axis=1)
    # Return as a PyTorch tensor
    return torch.from_numpy(S).float()

# Function to count the number of trainable parameters in a PyTorch model
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Function to add Gaussian noise to the training data, used for data augmentation
def add_noise_to_training_data(training_data_pred_ground_truth, noise_sensor=0.03):
    # Initialize an empty array to hold the noisy data
    training_data_pred_noisy = np.zeros(training_data_pred_ground_truth.shape)
    # Loop over the training data
    for i in range(training_data_pred_ground_truth.shape[0]):
        gt = training_data_pred_ground_truth[i,:,:,:]
        # Generate Gaussian noise with mean 0 and standard deviation equal to sensor_noise
        noise = np.random.normal(0, sensor_noise, gt.shape) # mu, sigma, size
        # If the ground truth is zero, make the noise zero as well
        noise[gt == 0] = 0
        # Add the noise to the ground truth
        training_data_pred_noisy[i,:,:,:] = gt + noise
    return training_data_pred_noisy

# Function to pretrain the DnCNN model
def pretraining(DnCnn_model, num_epochs, learning_rate, dataloader_pretrain, device, l2_w, batch_size):
    # Define the loss function (L1 loss)
    criterion = nn.L1Loss()
    # Define the optimizer (Adam optimizer with weight decay)
    optimizer = torch.optim.Adam(DnCnn_model.parameters(), lr=learning_rate, weight_decay=l2_w)

    # Loop over the epochs
    for epoch in range(num_epochs):
        acc_loss = 0.
        # Loop over the batches in the pretraining dataloader
        for data_noisy, clean in dataloader_pretrain:
            # If batch size is greater than 1, adjust it to the size of the current batch
            if batch_size>1:
                batch_size = len(data_noisy)           
            # Move data and labels to the device (GPU or CPU)
            data = (data_noisy.float()).to(device)
            clean = (clean.float()).to(device)
            # Rearrange the dimensions of the data and labels to match the DnCNN input
            data = torch.moveaxis(data, 3, 2)
            data = torch.moveaxis(data, 2, 1)
            clean = torch.moveaxis(clean, 3, 2)
            clean = torch.moveaxis(clean, 2, 1)
            # Forward pass: Compute predicted y by passing x to the model
            output = DnCnn_model(data)
            # Compute loss
            loss = torch.sqrt(criterion(output, clean))
            # Zero gradients, perform a backward pass, and update the weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Accumulate the loss
            acc_loss += loss.item()
        # Print the average loss for this epoch
        print(acc_loss / len(dataloader_pretrain))
        
# Function to train the Deep unrolling model
def unrolling_model_training(my_sc_model, num_epochs, learning_rate, dataloader_train, device, l2_w, batch_size, S, blocks):
    # Set the model to training mode
    my_sc_model.train()
    # Define the loss function (L1 loss)
    criterion = nn.L1Loss()
    # Define the optimizer (Adam optimizer)
    optimizer = torch.optim.Adam(my_sc_model.parameters(), lr=learning_rate)

    # Loop over the epochs
    for epoch in range(num_epochs):
       acc_loss = 0.
       # Loop over the batches in the training dataloader
       for data, clean in dataloader_train:
           # Adjust batch size to the size of the current batch
           batch_size = len(data)
           # Move data and labels to the device (GPU or CPU)
           data = (data.float()).to(device)
           clean = (clean.float()).to(device)
           # Rearrange the dimensions of the data and labels to match the model's input
           data = torch.moveaxis(data, 3, 2)
           data = torch.moveaxis(data, 2, 1)
           clean = torch.moveaxis(clean, 3, 2)
           clean = torch.moveaxis(clean, 2, 1)
           # Forward pass: Compute predicted y by passing x to the model
           output = my_sc_model(data, S, batch_size, device, blocks)  
           # Compute loss
           loss = criterion(output, clean)
           # Zero gradients, perform a backward pass, and update the weights
           optimizer.zero_grad()
           loss.backward()
           optimizer.step()
           # Accumulate the loss
           acc_loss += loss.item()
       # Print the average loss for this epoch
       print(acc_loss / len(dataloader_train))

In [5]:
#@title Deep unrolling architecture

# Function to enable dropout layers during testing phase
def enable_dropout(model):
    """ 
    This function is used to enable dropout layers during testing phase. Dropout is typically
    only used during training, and this can help to create a more robust model.
    """
    for m in model.modules():
        if m.__class__.__name__.startswith('Dropout'):
            m.train()

# Define a DnCNN network (a deep learning model for image denoising)
class DnCnn_net(nn.Module):
       def __init__(self, num_of_layers, features, input_channels):
         """
         Initialize the DnCnn_net model with given number of layers, features, and input channels.
         """
         super().__init__()
         kernel_size = 3  # Kernel size for the convolutional layers
         padding = 1  # Padding for the convolutional layers
         features = features  # Number of feature maps
         channels = input_channels  # Number of input channels
         num_of_layers = num_of_layers  # Number of layers in the network
         layers = []  # List to hold the layers

         # First layer of the network (Convolution + ReLU + Dropout)
         layers.append(nn.utils.spectral_norm(nn.Conv2d(in_channels=channels, out_channels=features, kernel_size=kernel_size, padding=padding, bias=False)))
         layers.append(nn.ReLU())  
         layers.append(nn.Dropout(0.05))

         # Intermediate layers (Convolution + ReLU + Dropout)
         for _ in range(num_of_layers-2):
             layers.append(nn.utils.spectral_norm(nn.Conv2d(in_channels=features, out_channels=features, kernel_size=kernel_size, padding=padding, bias=False)))
             layers.append(nn.ReLU())   
             layers.append(nn.Dropout(0.05))

         # Final layer of the network (Convolution)
         layers.append(nn.utils.spectral_norm(nn.Conv2d(in_channels=features, out_channels=1, kernel_size=kernel_size, padding=padding, bias=False)))
         
         # Create the model by putting all the layers in a sequential order
         self.dncnn = (nn.Sequential(*layers))
        
       def forward(self, input):
           """
           Implement the forward pass of the network. This is called when we pass data through the model.
           """
           return self.dncnn(input) 
       

# Define a deep unrolled sparse coding model
class deep_unrolled_sc(Module):   
    def __init__(self, DnCnn):
        super(deep_unrolled_sc, self).__init__()
        self.DnCnn = DnCnn
        self.alpha=nn.Parameter(torch.tensor(10.99), requires_grad=True)
        # self.DtD_approx = nn.Parameter(torch.rand(64, 64), requires_grad=True)

    def forward(self, data, S, batch_size, device, blocks):
        low_dim = S.shape[0]
        high_dim = S.shape[1]        
        
        # Use broadcasting instead of explicit repetition
        S = S.unsqueeze(0).expand(batch_size, -1, -1).to(device)
        II = torch.eye(S.shape[2]).unsqueeze(0).expand(batch_size, -1, -1, -1).to(device)

        # Use batched matrix multiplication
        STS = torch.bmm(S.transpose(1, 2), S)
        STS = STS.unsqueeze(dim=1).to(device)
        

        A = torch.matmul(S.transpose(1, 2).unsqueeze(dim=1), data)
        A = self.alpha * A
        
        DtD = torch.linalg.inv(II + STS * self.alpha)
        # DtD = self.DtD_approx.unsqueeze(0).unsqueeze(0).expand(batch_size, -1, -1, -1)

        # Initialize Z using zeros instead of ones
        Z = torch.zeros(batch_size, 1, high_dim, data.shape[3]).to(device)

        for kk in range(blocks):
            x = torch.matmul(DtD, (A + Z))
            Z = self.DnCnn(x)

        return Z

In [6]:
#@title Read the training data


# Import helper functions to load data from the 'data_paper' module
from data_paper import *

# Load training data
# training_data_input: The input data for training
# training_data_pred_ground_truth: The corresponding ground truth data
training_data_input, training_data_pred_ground_truth = load_train_data()

# Create a TensorDataset from the training data.
train_ds = TensorDataset(torch.from_numpy(training_data_input), torch.from_numpy(training_data_pred_ground_truth))

# Define the batch size for training
batch_size = 6
dataloader_train = DataLoader(train_ds, batch_size, shuffle=True)

# Initialize the downsampling operator 'S' by calling the defined function with the downsampling factor 4.
S = downsampling_operator(4)


ouster_range_image
##################################################
Input image resolution:   16 by 1024
Output image resolution:  64 by 1024
Upscaling ratio:          4
##################################################
Sensor minimum range:     2.0
Sensor maximum range:     80.0
Sensor view angle:        -16.6 to 16.6
Range normalize ratio:    100.0
##################################################
Training data-set:        /home/alexgi/Workspace/lidar_super_resolution/Lidar Super-Resolution/carla_ouster_range_image.npy 
Testing data-set:         /home/alexgi/Workspace/lidar_super_resolution/Lidar Super-Resolution/ouster_range_image.npy
add noise level: 0.03


In [8]:
#@title Train the deep unrolling model

# Initialize the DnCnn model with 5 layers, 64 features, and 1 input channel
# This model will be used as part of the deep unrolled super-resolution model to act as denoiser-regularizer
DnCnn_model = DnCnn_net(5, 64, 1).to(device)

# Initialize the deep unrolled super-resolution (sc) model with the DnCnn model
# This model will be used for training on the super-resolution task
my_sc_model = deep_unrolled_sc(DnCnn_model).cuda()

# Calculate the total number of trainable parameters in the super-resolution model
# This information is useful for understanding the capacity of the model
num_of_param = count_parameters(my_sc_model)
print('Number of parameters:', num_of_param)

# Set the learning rate for the Adam optimizer
learning_rate = 1e-3
# Set the number of unrolling blocks for the deep unrolled model
blocks  = 6
# Set the number of epochs for training the model
num_epochs = 50
# Set the L2 regularization weight for preventing overfitting
l2_w = 1e-6

# Train the deep unrolled model with the specified parameters
unrolling_model_training(my_sc_model, num_epochs, learning_rate, dataloader_train, 
                         device, l2_w, batch_size, 
                         S, blocks)

# After the initial round of training, decrease the learning rate to refine the parameters
learning_rate = 1e-4

# Continue training the deep unrolled model with the lowered learning rate
unrolling_model_training(my_sc_model, num_epochs, learning_rate, dataloader_train, 
                         device, l2_w, batch_size, 
                         S, blocks)


Number of parameters: 115841
2.0038869713768364
0.09332608881592751
0.09204575098305941
0.09076253075152635
0.08726141490414739
0.0776420999802649
0.06597186245769263
0.05393738604336977
0.04388619832880795
0.03680832852423191
0.03273154521547258
0.02946196016855538
0.026508570382371546
0.025015232061967253
0.023426709088496863
0.02185383191332221
0.02083214063383639
0.01946202701330185
0.018196363140828908
0.0171221939381212
0.016140389575622974
0.015639372297562657
0.015174192999489605
0.014782508102245628
0.014476693000644445
0.01415575338806957
0.013878646411001682
0.013884387313388287
0.013665334686636926
0.013269385248422623
0.013186699378304183
0.013156322318129242
0.012910316251218319
0.012807581613305957
0.012720054478384554
0.012634680661838501
0.012645330363884568
0.012529811615124346
0.012499766464345157
0.012486009208485483
0.012359317543450743
0.012309590389020741
0.012245889580342919
0.012222120181657374
0.012212040639016777
0.012146015568170696
0.012142755012027918
0.01

In [10]:
#@title Test

# Import the necessary functions from your data handling module
from data_paper import *

# Load the test data inputs and ground truth
test_data_input, test_data = load_test_data()

# Initialize a list to hold the L1 loss for each test image
l1_error = []

# Initialize an array to hold the predicted range images
predicted_range_images = np.zeros_like(test_data)

# Loop over each image in the test set
for i in range(0, test_data.shape[0]):
    # Extract the low resolution and high resolution images for this test case
    low_res = test_data_input[i,:,:,:]
    high_res = test_data[i,:,:,:]

    # Convert these numpy arrays to PyTorch tensors and move them to the GPU
    low_res = torch.from_numpy(low_res).to(device)
    high_res = torch.from_numpy(high_res).to(device)

    # Add a new dimension to these tensors to indicate the batch size
    low_res = torch.unsqueeze(low_res, dim=0)
    high_res = torch.unsqueeze(high_res, dim=0)

    # Change the order of the dimensions to be compatible with PyTorch's expectations
    low_res = torch.moveaxis(low_res, 3, 2)
    low_res = torch.moveaxis(low_res, 2, 1)
    high_res = torch.moveaxis(high_res, 3, 2)
    high_res = torch.moveaxis(high_res, 2, 1)

    # Feed the low resolution image into the model to get the high resolution prediction
    output = my_sc_model(low_res.float(), S, 1, device, blocks)  

    # Clamp the output values to be between 0 and 1
    output[output < 0] = 0
    output[output > 1] = 1
    
    # Detach the output tensor from the computation graph and move it to the CPU
    high = output[0,0:,:,:].cpu().detach().numpy()
    high = np.squeeze(high, axis=0)
    
    # Store this high resolution prediction in the appropriate place in the predicted_range_images array
    predicted_range_images[i,:,:,0] = high

    # Calculate the L1 loss between the high resolution prediction and ground truth
    criterion = nn.L1Loss()
    l1_error.append(criterion(output, high_res).item())

# Print the average L1 loss across all the test images
print('L1 error: ', np.mean(l1_error))

L1 error:  0.022618724518526798


In [11]:
#@title Define the Monte Carlo Dropout function
def MC_drop(test_data_input, iterate_count=10):

    # Initialize arrays to hold temporary and final results
    this_test = np.empty([iterate_count, image_rows_low, image_cols, channel_num], dtype=np.float32)
    test_data_prediction = np.empty([test_data_input.shape[0], image_rows_high, image_cols, 2], dtype=np.float32)
    
    # Set the model to evaluation mode and enable dropout
    my_sc_model.eval()
    enable_dropout(my_sc_model)

    # Loop over each image in the input data
    for i in range(test_data_input.shape[0]):
        print('Processing {} th of {} images ... '.format(i, test_data_prediction.shape[0]))
        
        # Initialize an array to hold the predictions for this image
        this_prediction = torch.zeros(iterate_count, 1, 64, 1024)

        # Perform `iterate_count` forward passes with dropout
        for j in range(iterate_count):
            this_test = test_data_input[i]
            low_res = torch.from_numpy(this_test).to(device)
            low_res = torch.unsqueeze(low_res, dim=0)
            low_res = torch.moveaxis(low_res, 3, 2)
            low_res = torch.moveaxis(low_res, 2, 1)
            ooo = my_sc_model(low_res.float(), S, 1, device, blocks)
            this_prediction[j] = ooo.cpu().detach()

        # Rearrange dimensions to match expectation
        this_prediction = torch.moveaxis(this_prediction, 2, 1)
        this_prediction = torch.moveaxis(this_prediction, 3, 2)

        # Convert to numpy array
        this_prediction = this_prediction.cpu().detach().numpy()

        # Compute the mean and standard deviation of the predictions
        this_prediction_mean = np.mean(this_prediction, axis=0)
        this_prediction_var = np.std(this_prediction, axis=0)

        # Store the mean and variance in the final results array
        test_data_prediction[i,:,:,0:1] = this_prediction_mean
        test_data_prediction[i,:,:,1:2] = this_prediction_var

    # Return the final results array
    return test_data_prediction

In [13]:
#@title Testing Range Images 

# Load the test data input (ignoring ground truth data with `_`)
test_data_input, _ = load_test_data()

# Call the MC_drop function on your test data input, requesting 50 iterations for each sample
# This will provide an array of mean and variance for each test sample's prediction
predImages = MC_drop(test_data_input, iterate_count=50)

# Rescale the predicted images by multiplying by 100 (this may be specific to your particular application or data)
predImages = predImages *100

# Save the prediction results as a numpy array in a .npy file for future use or analysis
np.save('predImages_6blocks_test_June.npy', predImages)