In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
import torch.nn.init as init

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import numpy as np
import torchvision.transforms.functional as TF
from PIL import Image
import glob
import random


In [33]:
class DenoisingDatasetSingular(Dataset):
    def __init__(self, image_paths, crop_size = 512, gaussian_sigma = 50, add_noise = True):
        self.image_paths = image_paths
        self.crop_size = crop_size
        self.gaussian_sigma = gaussian_sigma
        self.add_noise = add_noise
        
        if crop_size is None:
            transforms_list = []
        else:
            # Random crop followed by either a random horizontal flip or a random vertical flip
            transforms_list = [transforms.RandomCrop(crop_size), 
                               transforms.RandomChoice([transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip()])]
            
        transforms_list += [transforms.ToTensor()]
        self.transforms = transforms.Compose(transforms_list)
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        real_image = Image.open(self.image_paths[idx])
        transformed_real = self.transforms(real_image)
        noisy_image = transformed_real
        
        if self.add_noise:
            noisy_image += torch.randn_like(transformed_real) * self.gaussian_sigma/255.0
            noisy_image = torch.clamp(noisy_image, 0, 1)
        
        return transformed_real, noisy_image


In [34]:
class ConvolutionalResidualBlock(nn.Module):
    def __init__(self, kernel_size):
        super(ConvolutionalResidualBlock, self).__init__()
        self.kernel_size = kernel_size
        self.num_channels = 64
        self.padding = int((self.kernel_size-1)/2)
        self.conv_1 = nn.Conv2d(self.num_channels, self.num_channels, kernel_size = self.kernel_size, padding = self.padding, bias = False)
        self.conv_2 = nn.Conv2d(self.num_channels, self.num_channels, kernel_size = self.kernel_size, padding = self.padding, bias = False)
        
        
    def forward(self, x):
        identity = x
        out = self.conv_1(x)
        out = F.relu(out)
        out = self.conv_2(out)
        out = out + identity # Residual Connection
        out = F.relu(out)
        return out

In [35]:
class DeepConvolutionalModel(nn.Module):
    def __init__(self, depth = 17, kernel_size = 3):
        super(DeepConvolutionalModel, self).__init__()
        self.layers = []
        self.kernel_size = kernel_size
        self.depth = depth
        self.padding = int((self.kernel_size - 1)/2)
        self.num_channels = 64
        self.layers.append(nn.Conv2d(3, self.num_channels, kernel_size = self.kernel_size, padding = self.padding, bias = False))
        self.layers.append(nn.ReLU(inplace = True))
        

        # Stack convolutional blocks with residual connections
        for _ in range(depth-2):
            self.layers.append(ConvolutionalResidualBlock(self.kernel_size))
            
        self.layers.append(nn.Conv2d(self.num_channels, 3, kernel_size = self.kernel_size, padding = self.padding, bias = False))
        self.model = nn.Sequential(*self.layers)
        self._initialize_weights() 
        
    
    def forward(self, noisy_image):
        residual = self.model(noisy_image)
        return noisy_image - residual
    
    def _initialize_weights(self):
        
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.orthogonal_(m.weight)
                
                if m.bias is not None:
                    init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                init.constant_(m.weight, 1)
                init.constant_(m.bias, 0)

In [36]:
from skimage.measure import compare_psnr, compare_ssim

def validation_step(model, dataloader, refining_steps = 1):
    real_psnr_aggregate = []
    denoised_psnr_aggregate = []
    real_ssim_aggregate = []
    denoised_ssim_aggregate = []
    loss_vals = []

    with torch.no_grad():
        model = model.eval()   # Switches model to evaluation mode (no gradient computation)
        for data_bunch in dataloader:
            real, noisy = data_bunch
#             real, noisy = real.cuda(), noisy.cuda()
            current_noisy = noisy
            for _ in range(refining_steps):
                denoised_image = model(current_noisy)
                loss = criterion(denoised_image, real)
                loss = loss.div_(2)
                current_noisy = denoised_image

            loss_vals.append(loss.item())
            
            # Convert the PyTorch tensors representing real, noisy, and denoised 
            # images to NumPy arrays and reshape them to the expected format (channels-last) for PSNR and SSIM computations.
            
            real_numpy, noisy_numpy, denoised_numpy = [img.cpu().numpy().transpose(0, 2, 3, 1) for img in [real, noisy, denoised_image]]

            for idx in range(real_numpy.shape[0]):
                real, noisy, denoised = real_numpy[idx], noisy_numpy[idx], denoised_numpy[idx]
                real_psnr, denoised_psnr = compare_psnr(real, noisy), compare_psnr(real, denoised)
                real_ssim, denoised_ssim = compare_ssim(real, noisy, multichannel=True), compare_ssim(real, denoised, multichannel=True)
                real_ssim_aggregate.append(real_ssim)
                denoised_ssim_aggregate.append(denoised_ssim)
                real_psnr_aggregate.append(real_psnr)
                denoised_psnr_aggregate.append(denoised_psnr)
        model = model.train()

    avg_real_psnr, avg_denoised_psnr, avg_ssim_real, avg_ssim_denoised, avg_loss = np.mean(real_psnr_aggregate), np.mean(denoised_psnr_aggregate), np.mean(real_ssim_aggregate), np.mean(denoised_ssim_aggregate), np.mean(loss_vals)
    return avg_real_psnr, avg_denoised_psnr, avg_ssim_real, avg_ssim_denoised, avg_loss
     


In [37]:
from sklearn.model_selection import train_test_split

In [38]:
train_image_paths = glob.glob("../Simple-Datasets/DIV2K-Dataset/DIV2K_train_HR/DIV2K_train_HR/*.png")
val_image_paths = glob.glob("../Simple-Datasets/DIV2K-Dataset/DIV2K_valid_HR/DIV2K_valid_HR/*.png")

train_image_paths, val_image_paths = train_test_split(train_image_paths + val_image_paths, test_size = 0.2, random_state = 42)
cbs_image_paths = glob.glob("../Simple-Datasets/CBSD68-Dataset/CBSD68/original/*.jpg")

train_dataset = DenoisingDatasetSingular(train_image_paths, crop_size = 128, gaussian_sigma = 50)
train_dataloader = DataLoader(train_dataset, batch_size = 32, num_workers = 0)

test_dataset = DenoisingDatasetSingular(val_image_paths, crop_size = 128, gaussian_sigma = 50)
test_dataloader = DataLoader(test_dataset, batch_size = 32, num_workers = 0)

cbs_dataset = DenoisingDatasetSingular(cbs_image_paths, crop_size = 256, gaussian_sigma = 50)
cbs_dataloader = DataLoader(cbs_dataset, batch_size = 16, num_workers = 0)

In [39]:
model = DeepConvolutionalModel(depth = 3, kernel_size = 3)
# model = model.cuda()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience = 3)
criterion = torch.nn.MSELoss()

In [40]:
print(model)

DeepConvolutionalModel(
  (model): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): ReLU(inplace=True)
    (2): ConvolutionalResidualBlock(
      (conv_1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (conv_2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    )
    (3): Conv2d(64, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  )
)


In [41]:
from tqdm import tqdm, trange, tqdm_notebook
num_epochs = 10
validation_iter = 50
total_iterations = 0

training_loss_vals = []

validation_real_psnr = []
validation_denoised_psnr = []
validation_ssim_real = []
validation_ssim_denoised = []
validation_loss = []
iteration_list = []

for epoch in tqdm_notebook(range(num_epochs)):

    scheduler.step(epoch)

    for data_bunch in train_dataloader:

        if total_iterations%validation_iter == 0:
            # total_iterations = 0
            iteration_list.append(total_iterations)
            avg_real_psnr, avg_denoised_psnr, avg_ssim_real, avg_ssim_denoised, avg_loss = validation_step(model, test_dataloader)  
            validation_real_psnr.append(avg_real_psnr)
            validation_denoised_psnr.append(avg_denoised_psnr)
            validation_ssim_real.append(avg_ssim_real)
            validation_ssim_denoised.append(avg_ssim_denoised)
            validation_loss.append(avg_loss)

        real, noisy = data_bunch
#         real, noisy = real.cuda(), noisy.cuda()
        denoised_image = model(noisy)

        loss = criterion(denoised_image, real)
        loss = loss.div_(2)

        training_loss_vals.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_iterations += 1

avg_real_psnr, avg_denoised_psnr, avg_ssim_real, avg_ssim_denoised, avg_loss = validation_step(model, test_dataloader)
validation_real_psnr.append(avg_real_psnr)
validation_denoised_psnr.append(avg_denoised_psnr)
validation_ssim_real.append(avg_ssim_real)
validation_ssim_denoised.append(avg_ssim_denoised)
validation_loss.append(avg_loss)
iteration_list.append(total_iterations)
     

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  from ipykernel import kernelapp as app


  0%|          | 0/10 [00:00<?, ?it/s]



ValueError: im_true has intensity values outside the range expected for its data type.  Please manually specify the data_range

In [26]:
import matplotlib.pyplot as plt
plt.plot(np.log(training_loss_vals), label='Training loss')
plt.gca().set_ylabel('Loss (Log scale)')
plt.gca().set_xlabel('Iterations')
plot_data = validation_loss
plt.plot(iteration_list, np.log(plot_data), label='Validation loss')
plt.title("Loss vs Iterations (Sigma = 50)")
plt.legend()
plt.savefig("loss_vs_iterations_sigma_50.jpg")

NameError: name 'training_loss_vals' is not defined

In [28]:
train_dataloader

<torch.utils.data.dataloader.DataLoader at 0x1b0a990d7f0>