In [1]:
import torch
import torch.nn as nn
from torch import optim

try:
    from tqdm import tqdm
except ImportError:
    tqdm = lambda x: x

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#### Load data

In [3]:
device = torch.device ('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
noisy_imgs_1, noisy_imgs_2 = torch.load('drive/MyDrive/data/train_data.pkl')
noisy_imgs, clean_imgs = torch.load('drive/MyDrive/data/val_data.pkl')

In [5]:
train_input, train_target = noisy_imgs_1.float()/255.0, noisy_imgs_2.float()/255.0
test_input, test_target = noisy_imgs.float()/255.0, clean_imgs.float()/255.0

# put tensors on cuda
train_input, train_target = train_input.to(device), train_target.to(device)
test_input, test_target = test_input.to(device), test_target.to(device)

img_channels = noisy_imgs_1.shape[1]
img_width = noisy_imgs_1.shape[2]
img_height = noisy_imgs_1.shape[3]

In [6]:
def psnr(denoised,ground_truth):
    mse=torch.mean((denoised-ground_truth)**2)
    return  - 10 * torch.log10(((denoised-ground_truth) ** 2).mean((1,2,3))).mean()

In [21]:
from torch.nn.modules import padding
### For mini - project 1
class Model():
    def __init__(self) -> None :
        # instantaiate model + optimizer + loss function + any other stuff you need

        # model: UNet - first idea: use alternate convd2 with relu activation and leakyrelu at the end
        #self.model = nn.Sequential(
         #   nn.Linear(32, 128),
          #  nn.ReLU(),
           # nn.Linear(128, 32)
        #)
        """
        self.model = nn.Sequential(
            nn.Conv2d(img_channels, 64, 3, stride=1, padding=1),
            nn.ReLU(inplace=True), 
            nn.Conv2d(64, 48, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(48, 48, 3, padding=1),
            nn.ReLU(inplace=1),
            nn.Conv2d(48, 64, 3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, img_channels, 3, stride=1, padding=1),
            nn.LeakyReLU(0.1)
        )"""

        self.model = nn.Sequential(
            # encoder
            nn.Conv2d(img_channels, 32, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(32, 32, 3, padding='same'),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3, padding='same'),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(128, 128, 3, padding='same'),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, 3, padding='same'),
            nn.ReLU(),
            # decoder
            nn.Conv2d(256, 256, 3, padding='same'),
            nn.ReLU(),
            nn.UpsamplingNearest2d(scale_factor=2),
            nn.ConvTranspose2d(256, 128, 3, padding=1),
            nn.Conv2d(128, 128, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(128, 128, 3, padding='same'),
            nn.ReLU(),
            nn.UpsamplingNearest2d(scale_factor=2),
            nn.ConvTranspose2d(128, 64, 3, padding=1),
            nn.Conv2d(64, 64, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3, padding='same'),
            nn.ReLU(),
            nn.UpsamplingNearest2d(scale_factor=2),
            nn.ConvTranspose2d(64, 32, 3, padding=1),
            nn.Conv2d(32, 32, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(32, 32, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(32, img_channels, 3, padding='same'),
            nn.ReLU(),
            nn.Conv2d(img_channels, img_channels, 1, padding='same'),
            nn.Sigmoid()
        )
        
        # can also use upsampling2d layer or7and maxpooling2d for decoding
        self.model.to(device)

        # optimizer: Adam; try different learning rates
        self.optimizer = optim.Adam(self.model.parameters(), lr = 1e-3)
  
        # loss function: MSE or HDRLoss for MonteCarlo images ? 
        self.loss = nn.MSELoss()
        self.loss.to(device)

    def load_pretrained_model(self) -> None :
        ## This loads the parameters saved in bestmodel .pth into the model
        pass

    def train(self, train_input, train_target, num_epochs) -> None :
        #: train˙input : tensor of size (N, C, H, W) containing a noisy version of the images
        #: train˙target : tensor of size (N, C, H, W) containing another noisy version of the same images , which only differs from the input by their noise .
        
        mini_batch_size = 100

        for e in range(num_epochs):
          print('Epoch:', e)
          losses = []
          for b in range(0, train_input.size(0), mini_batch_size):
              output = self.model(train_input.narrow(0, b, mini_batch_size))
              loss = self.loss(output, train_target.narrow(0, b, mini_batch_size))
              losses.append(loss)
      
              self.optimizer.zero_grad()
              loss.backward()
              self.optimizer.step()
          print('Loss = ', sum(losses)/(train_input.size(0)/mini_batch_size))

    def predict(self, test_input ) -> torch.Tensor:
        #:test_input : tensor of size (N1 , C, H, W) that has to be denoised by the trained or the loaded network .
        #: returns a tensor of the size (N1 , C, H, W)
        return self.model(test_input)


In [22]:
model = Model()

In [None]:
# train the model
model.train(train_input, train_target, 30)

Epoch: 0
Loss =  tensor(0.0764, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 1
Loss =  tensor(0.0742, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 2
Loss =  tensor(0.0741, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 3
Loss =  tensor(0.0741, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 4
Loss =  tensor(0.0741, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 5
Loss =  tensor(0.0741, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 6
Loss =  tensor(0.0741, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 7
Loss =  tensor(0.0741, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 8
Loss =  tensor(0.0741, device='cuda:0', grad_fn=<DivBackward0>)
Epoch: 9


In [16]:
output = model.predict(test_input)
print(output)

tensor([[[[0.5210, 0.5403, 0.6315,  ..., 0.7755, 0.7619, 0.7665],
          [0.5393, 0.5783, 0.6661,  ..., 0.8335, 0.8235, 0.8030],
          [0.5639, 0.5947, 0.6345,  ..., 0.8486, 0.8394, 0.8108],
          ...,
          [0.5502, 0.5550, 0.5452,  ..., 0.4651, 0.4448, 0.4516],
          [0.5551, 0.5518, 0.5456,  ..., 0.4830, 0.4745, 0.4689],
          [0.5360, 0.5253, 0.5215,  ..., 0.4700, 0.4815, 0.4710]],

         [[0.4872, 0.4903, 0.5883,  ..., 0.7583, 0.7395, 0.7438],
          [0.5063, 0.5336, 0.6123,  ..., 0.8007, 0.7792, 0.7702],
          [0.5398, 0.5601, 0.5959,  ..., 0.8204, 0.8079, 0.7848],
          ...,
          [0.5374, 0.5437, 0.5353,  ..., 0.2931, 0.2954, 0.3450],
          [0.5397, 0.5346, 0.5323,  ..., 0.3513, 0.3563, 0.3835],
          [0.5257, 0.5066, 0.5025,  ..., 0.3719, 0.4037, 0.4236]],

         [[0.4781, 0.4734, 0.5852,  ..., 0.7752, 0.7658, 0.7710],
          [0.5088, 0.5175, 0.5880,  ..., 0.8081, 0.7979, 0.7946],
          [0.5512, 0.5587, 0.5818,  ..., 0

In [17]:
model_outputs = []
for b in range(0, test_input.size(0), 100):
    output = model.predict(test_input.narrow(0, b, 100))
    model_outputs.append(output)
model_outputs = torch.cat(model_outputs, dim=0)

output_psnr = psnr(model_outputs, test_target.to(device))
output_psnr

tensor(25.2335, device='cuda:0', grad_fn=<MulBackward0>)