# Globals

In [None]:
# Mount Google Drive at /content/drive (Colab-only); the dataset and model
# paths configured below live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Notebook-wide settings shared by the dataset and the data loaders.
global_var = dict(
    # Target image resolution. SSID_Dataset hands (index 2, index 1) to
    # torchvision's Resize, so the layout is (channels, width, height).
    RGB_img_res=(3, 634, 488),
    # Samples per batch for both data loaders.
    batch_size=64,
    # Worker processes used by each DataLoader.
    n_workers=2,
)

# Data-augmentation settings; intentionally empty for now (TODO).
augmentation_parameters = dict()

In [None]:
# Project folder on the mounted Google Drive; also used as the model save root.
save_model_root = '/content/drive/MyDrive/NN_project/'
# The SSID dataset lives in a sub-folder of the project folder.
dataset_root = save_model_root + 'SSID_dataset/'

# Imports

In [None]:
#!pip install einops torchsummaryX



In [None]:
import math
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as TT

from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split

# Data

## Data augmentation

In [None]:
# TODO

## Dataset

In [None]:
class SSID_Dataset(Dataset):
    """Paired image dataset read from ``<data_root>/Data/<scene>/``.

    ``<data_root>/Scene_Instances.txt`` lists one scene directory per line.
    Inside each scene directory the files are sorted by name; the first one
    is used as the target and the second one as the input image.
    (Presumably the ground-truth file sorts before the noisy one -- confirm
    against the dataset's naming scheme.)

    Args:
        data_root: Root folder of the dataset (with or without a trailing
            path separator).
    """

    def __init__(self, data_root):
        self.dataset_path = data_root
        # os.path.join keeps the same strings as plain concatenation when
        # data_root ends with "/", and also works when it does not.
        self.dir_list = os.path.join(data_root, "Scene_Instances.txt")
        self.data_dir = os.path.join(data_root, "Data", "")
        self.img_paths = []
        self.target_paths = []
        # Resize expects (height, width): index 2 and index 1 of RGB_img_res.
        self.post_processing = TT.Compose([
            TT.ToTensor(),
            TT.Resize((global_var['RGB_img_res'][2], global_var['RGB_img_res'][1]), antialias=None),
        ])

        # Read the listing from the root passed to the constructor (not from
        # a module-level global) and skip blank lines, which would otherwise
        # be treated as a scene directory.
        with open(self.dir_list, 'r') as listing:
            self.data_directories = [line.strip() for line in listing if line.strip()]

        for scene in self.data_directories:
            content = sorted(os.listdir(os.path.join(self.data_dir, scene)))
            # An IndexError here means the scene folder holds fewer than two files.
            self.target_paths.append(content[0])
            self.img_paths.append(content[1])

    def _load(self, scene, file_name):
        """Open one image file and return it as a resized tensor."""
        path = os.path.join(self.data_dir, scene, file_name)
        # ToTensor reads the pixel data eagerly, so the file can be closed
        # as soon as the transform returns.
        with Image.open(path) as image:
            return self.post_processing(image)

    def __getitem__(self, index):
        """Return the ``(input, target)`` tensor pair of scene ``index``."""
        scene = self.data_directories[index]
        img = self._load(scene, self.img_paths[index])
        target = self._load(scene, self.target_paths[index])

        return img, target

    def __len__(self):
        """Return the number of scenes (image pairs)."""
        return len(self.img_paths)

## Dataloader

In [None]:
dataset = SSID_Dataset(dataset_root)

# 70/30 train/test split derived from the actual dataset size; this gives the
# former hard-coded 112/48 split when the dataset holds 160 items and keeps
# working for any other size.
n_train = round(0.7 * len(dataset))
n_test = len(dataset) - n_train
train_dataset, test_dataset = random_split(dataset, [n_train, n_test])

train_loader = DataLoader(dataset=train_dataset,
                          batch_size=global_var['batch_size'],
                          num_workers=global_var['n_workers'],
                          shuffle=True)

# Evaluation does not benefit from shuffling; a fixed order keeps the
# per-batch metrics reproducible.
test_loader = DataLoader(dataset=test_dataset,
                         batch_size=global_var['batch_size'],
                         num_workers=global_var['n_workers'],
                         shuffle=False)

n_total = len(train_dataset) + len(test_dataset)
print("Train data percentage: ", len(train_dataset) / n_total)
print("Test data percentage: ", len(test_dataset) / n_total)

Train data percentage:  0.7
Test data percentage:  0.3


# Loss

In [None]:
class loss_function(nn.Module):
    """Charbonnier loss: ``mean(sqrt((pred - truth)^2 + epsilon^2))``.

    Args:
        truth: Unused. Kept (now optional) only so existing calls of the
            form ``loss_function(truth, pred)`` keep working.
        pred: Unused. Kept for the same reason as ``truth``.
        epsilon: Constant added (squared) under the square root.
    """

    def __init__(self, truth=None, pred=None, epsilon=1e-3):
        super().__init__()
        self.epsilon = epsilon

    def forward(self, pred, truth):
        """Return the scalar loss between ``pred`` and ``truth``."""
        return torch.mean(torch.sqrt((pred - truth) ** 2 + self.epsilon ** 2))

# Evaluation metrics

In [None]:
# NOTE: PyTorch image tensors have pixel values in [0.0, 1.0], not [0, 255].
# Results were compared against torchmetrics.image.PeakSignalNoiseRatio.
def psnr(original_img, compressed_img, max_pix_val=1.0):
    """Return the peak signal-to-noise ratio (in dB) between two tensors.

    The mean squared error is taken over every element of the inputs, so a
    whole batch yields a single value.

    Args:
        original_img: Reference tensor.
        compressed_img: Tensor compared against the reference.
        max_pix_val: Largest possible pixel value (1.0 for float images).

    Returns:
        A 0-dim tensor: ``20 * log10(max_pix_val / sqrt(mse))``.
    """
    squared_error = (original_img - compressed_img) ** 2
    rmse = torch.sqrt(torch.mean(squared_error))
    return 20 * torch.log10(max_pix_val / rmse)

import cv2
from torch.autograd import Variable

def gaussian(window_size, sigma):
    """Return a 1-D Gaussian kernel of length ``window_size`` that sums to 1.

    The kernel is centred on index ``window_size // 2``.
    """
    center = window_size // 2
    two_sigma_sq = float(2 * sigma ** 2)
    weights = torch.Tensor(
        [math.exp(-(pos - center) ** 2 / two_sigma_sq) for pos in range(window_size)]
    )
    return weights / weights.sum()

def create_window(window_size, channel=1):
    """Build a 2-D Gaussian window (sigma 1.5) for grouped convolution.

    Returns:
        A contiguous tensor of shape ``(channel, 1, window_size, window_size)``
        holding the same normalised kernel for every channel.
    """
    column = gaussian(window_size, 1.5).unsqueeze(1)
    # Outer product of the 1-D kernel with itself gives the separable 2-D kernel.
    kernel_2d = column.mm(column.t()).float()
    kernel_4d = kernel_2d.unsqueeze(0).unsqueeze(0)
    return kernel_4d.expand(channel, 1, window_size, window_size).contiguous()

# NOTE: PyTorch image tensors have pixel values in [0.0, 1.0], not [0, 255].
# NOTE: inputs must be 4-D tensors.
# Results were compared against torchmetrics.image.StructuralSimilarityIndexMeasure.
def ssim(original_img, restored_img, max_pix_val=1.0, window_size=11, window=None, size_average=True, full=False):
    """Return the structural similarity index between two image batches.

    Local statistics are computed with an unpadded, per-channel (grouped)
    convolution against a Gaussian window.

    Args:
        original_img: Reference images, a 4-D tensor.
        restored_img: Images to compare; same shape as ``original_img``.
        max_pix_val: Dynamic range of the pixel values (1.0 for float images).
        window_size: Side of the Gaussian window; clipped to the image size.
            Ignored when ``window`` is given.
        window: Optional pre-built window of shape
            ``(channels, 1, k, k)``; built with ``create_window`` when None.
        size_average: If True (default) average over the whole batch and
            return a 0-dim tensor; otherwise return one value per image.
        full: If True, return ``(ssim, cs)`` where ``cs`` is the
            contrast-structure term reduced the same way as ``ssim``.

    Returns:
        The SSIM value(s), or a ``(ssim, cs)`` tuple when ``full`` is True.
    """
    (_, channel, height, width) = original_img.size()
    if window is None:
        real_size = min(window_size, height, width)
        window = create_window(real_size, channel=channel)
    # Match the input's device and dtype so conv2d accepts the window.
    window = window.to(device=original_img.device, dtype=original_img.dtype)

    def local_mean(tensor):
        return F.conv2d(tensor, window, padding=0, groups=channel)

    mu1 = local_mean(original_img)
    mu2 = local_mean(restored_img)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1 * mu2

    # Var[x] = E[x^2] - E[x]^2 and Cov[x, y] = E[xy] - E[x]E[y].
    sigma1_sq = local_mean(original_img ** 2) - mu1_sq
    sigma2_sq = local_mean(restored_img ** 2) - mu2_sq
    sigma12 = local_mean(original_img * restored_img) - mu1_mu2

    # Stabilising constants that avoid division by values close to zero.
    C1 = (0.01 * max_pix_val) ** 2
    C2 = (0.03 * max_pix_val) ** 2

    v1 = 2.0 * sigma12 + C2
    v2 = sigma1_sq + sigma2_sq + C2
    cs_map = v1 / v2
    ssim_map = ((2 * mu1_mu2 + C1) * v1) / ((mu1_sq + mu2_sq + C1) * v2)

    if size_average:
        score = ssim_map.mean()
        cs = cs_map.mean()
    else:
        score = ssim_map.flatten(1).mean(dim=1)
        cs = cs_map.flatten(1).mean(dim=1)

    if full:
        return score, cs
    return score


def compute_evaluation(test_dataloader, model, device='cpu'):
    """Evaluate ``model`` on a data loader and return mean PSNR and SSIM.

    Each metric is computed once per batch and the per-batch values are
    then averaged (batches are weighted equally, whatever their size).

    Args:
        test_dataloader: Iterable yielding ``(inputs, targets)`` batches.
        model: Network to evaluate; it is switched to eval mode.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(mean_psnr, mean_ssim)``.
    """
    model.eval()
    psnr_values = []
    ssim_values = []

    with torch.no_grad():
        for inputs, targets in test_dataloader:
            inputs, targets = inputs.to(device=device), targets.to(device=device)
            predictions = model(inputs)

            # .item() moves each scalar to the host, so averaging with numpy
            # also works when the tensors live on a GPU.
            psnr_values.append(psnr(targets, predictions).item())
            # ssim() has no `val_range` argument; its default dynamic range
            # of 1.0 matches the [0, 1] tensors used here.
            ssim_values.append(ssim(targets, predictions).item())

    return np.mean(psnr_values), np.mean(ssim_values)

# Architecture

In [None]:
# attention components

class W_MSA(nn.Module):
    """Multi-head attention with an additive bias on the attention scores.

    Computes ``softmax(Q K^T / sqrt(head_dim) + B) V`` independently for
    each head, concatenates the heads and applies a final linear layer.

    Args:
        C: Channel (embedding) size of the tokens; must be divisible by
            ``heads``.
        heads: Number of attention heads.
        B: Bias added to the attention scores. Must broadcast against
            ``(batch, heads, n_queries, n_keys)`` (a scalar also works).
            NOTE(review): a plain tensor stored here is not moved by
            ``.to(device)`` -- confirm how the bias is meant to be managed.

    Raises:
        ValueError: If ``C`` is not divisible by ``heads``.
    """

    def __init__(self, C, heads, B):
        super().__init__()
        if C % heads != 0:
            raise ValueError("C must be divisible by heads")
        self.C = C
        self.B = B
        self.heads = heads
        self.head_dim = C // heads

        # Per-head projections, shared across heads.
        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, C)

    def forward(self, values, keys, queries):
        """Attend ``queries`` over ``keys``/``values``.

        All three inputs are expected as ``(batch, tokens, C)``; ``keys``
        and ``values`` must have the same number of tokens.

        Returns:
            Tensor of shape ``(batch, n_queries, C)``.
        """
        n_batch, n_queries = queries.shape[0], queries.shape[1]
        n_keys = keys.shape[1]

        # Split the channel axis into (heads, head_dim) and project.
        q = self.queries(queries.reshape(n_batch, n_queries, self.heads, self.head_dim))
        k = self.keys(keys.reshape(n_batch, n_keys, self.heads, self.head_dim))
        v = self.values(values.reshape(n_batch, values.shape[1], self.heads, self.head_dim))

        # Scaled dot product per head: (batch, heads, n_queries, n_keys).
        scores = torch.einsum("bqhd,bkhd->bhqk", q, k) / (self.head_dim ** 0.5)
        # Normalise over the key axis so each query's weights sum to 1.
        attention = torch.softmax(scores + self.B, dim=-1)

        # Weighted sum of the values, then merge the heads back into C.
        out = torch.einsum("bhqk,bkhd->bqhd", attention, v)
        out = out.reshape(n_batch, n_queries, self.heads * self.head_dim)

        return self.fc_out(out)


class LeFF(nn.Module):
    """Feed-forward block with a 3x3 convolution between two linear layers.

    Tokens are expanded to ``hidden_dim``, reshaped into a square 2-D map,
    convolved, flattened back into tokens and projected back to ``dim``.

    Args:
        dim: Channel size of the input and output tokens.
        hidden_dim: Channel size used inside the block.

    NOTE(review): the convolution is a full (non-grouped) 3x3 convolution,
    as originally written; confirm whether a depth-wise convolution
    (``groups=hidden_dim``) was intended.
    """

    def __init__(self, dim=32, hidden_dim=128):
        super().__init__()
        self.dim = dim
        self.hidden_dim = hidden_dim

        # nn.Sequential needs module *instances*, hence nn.GELU().
        self.layer1 = nn.Sequential(nn.Linear(dim, hidden_dim), nn.GELU())
        self.layer2 = nn.Sequential(
            nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
            nn.GELU(),
        )
        self.layer3 = nn.Sequential(nn.Linear(hidden_dim, dim))

    def forward(self, x):
        """Apply the block to tokens of shape ``(batch, H*W, dim)``.

        Raises:
            ValueError: If the number of tokens is not a perfect square.
        """
        batch, n_tokens, _ = x.shape
        side = math.isqrt(n_tokens)
        if side * side != n_tokens:
            raise ValueError("LeFF expects a square number of tokens")

        x = self.layer1(x)
        # Tokens -> 2-D feature map so the convolution sees spatial neighbours.
        x = x.transpose(1, 2).reshape(batch, self.hidden_dim, side, side)
        x = self.layer2(x)
        # Feature map -> tokens again: (batch, H*W, hidden_dim).
        x = x.flatten(2).transpose(1, 2)
        x = self.layer3(x)

        return x





# NN BLOCKS

# LeWin Transformer Block
class TransformerBlock(nn.Module):
    """Attention + feed-forward block with pre-normalisation and residuals.

    With ``values``, ``keys`` and ``queries`` all equal to the same tensor
    ``X`` the block computes::

        X'  = X  + dropout(W_MSA(LN(X)))
        X'' = X' + dropout(LeFF(LN(X')))

    Args:
        dim: Size of the normalised (last) axis of the tokens; also used as
            the channel size of the feed-forward block.
        C: Channel size handed to ``W_MSA``.
        B: Attention bias handed to ``W_MSA``.
        heads: Number of attention heads.
        dropout: Dropout probability applied to both branch outputs.
    """

    def __init__(self, dim, C, B, heads, dropout):
        # Must run before any sub-module is assigned to self.
        super().__init__()
        self.norm1 = nn.LayerNorm(dim)
        self.w_msa = W_MSA(C, heads, B)
        self.norm2 = nn.LayerNorm(dim)
        # Use the block's own channel size instead of LeFF's default.
        self.leff = LeFF(dim=dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, values, keys, queries):
        """Return the transformed tokens, same shape as ``queries``."""
        # Normalise the attention inputs, then add the result back onto the
        # query stream (the residual path).
        attended = self.w_msa(self.norm1(values), self.norm1(keys), self.norm1(queries))
        x = queries + self.dropout(attended)

        # The feed-forward branch works on the normalised attention output.
        x = x + self.dropout(self.leff(self.norm2(x)))

        return x



# Down-sampling Block (reduces the size of the feature map)
# reshape the flattened features into 2D spatial feature maps, and then down-sample the maps, double the channels using 4 × 4 convolution with stride 2
class DownsampleBlock(nn.Module):
    """Halve the spatial size of a token sequence with a strided convolution.

    Args:
        in_channel: Channel size of the incoming tokens.
        out_channel: Channel size of the returned tokens.
    """

    def __init__(self, in_channel, out_channel):
        super().__init__()
        self.in_channel = in_channel
        self.out_channel = out_channel
        self.conv = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size=4, stride=2, padding=1),
        )

    def forward(self, x):
        """Down-sample tokens of shape ``(B, L, C)``.

        ``L`` must be a perfect square because the tokens are reshaped into
        a square ``sqrt(L) x sqrt(L)`` feature map.

        Returns:
            Tensor of shape ``(B, L', out_channel)`` where ``L'`` is the
            number of positions left after the stride-2 convolution.
        """
        B, L, C = x.shape
        H = int(math.sqrt(L))
        W = int(math.sqrt(L))
        # (B, L, C) -> (B, C, L) -> (B, C, H, W): tokens back to a 2-D map.
        x = x.transpose(1, 2).contiguous().view(B, C, H, W)

        # Convolve, merge the two spatial axes -> (B, C', H'*W'), then put
        # the channels last again -> (B, H'*W', C').
        out = self.conv(x).flatten(2).transpose(1, 2).contiguous()
        return out


# Up-sampling Block (halves the number of channels and doubles the size of the feature map)
# 2 × 2 transposed convolution with stride 2
class UpsampleBlock(nn.Module):
    """Double the spatial size of a token sequence with a transposed convolution.

    Args:
        in_channel: Channel size of the incoming tokens.
        out_channel: Channel size of the returned tokens.
    """

    def __init__(self, in_channel, out_channel):
        # Initialise the Module machinery first, then set attributes.
        super().__init__()
        self.in_channel = in_channel
        self.out_channel = out_channel
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(in_channel, out_channel, kernel_size=2, stride=2),
        )

    def forward(self, x):
        """Up-sample tokens of shape ``(B, L, C)``.

        ``L`` must be a perfect square because the tokens are reshaped into
        a square ``sqrt(L) x sqrt(L)`` feature map.

        Returns:
            Tensor of shape ``(B, 4*L, out_channel)``.
        """
        B, L, C = x.shape
        H = int(math.sqrt(L))
        W = int(math.sqrt(L))
        # (B, L, C) -> (B, C, H, W): tokens back to a 2-D map.
        x = x.transpose(1, 2).contiguous().view(B, C, H, W)
        # Deconvolve, then flatten the spatial axes and put channels last.
        out = self.deconv(x).flatten(2).transpose(1, 2).contiguous()  # B H*W C

        return out


# Input Projection Block (extracts the low-level features)
# 3 x 3 convolutional layer with LeakyReLu
class InputProjBlock(nn.Module):
    """Project an image batch to feature tokens (convolution + LeakyReLU).

    Args:
        in_channel: Channels of the input images.
        out_channel: Channel size of the produced tokens.
        kernel_size: Side of the convolution kernel; the padding is
            ``kernel_size // 2`` so odd kernels keep the spatial size.
    """

    def __init__(self, in_channel=3, out_channel=64, kernel_size=3):
        super().__init__()
        self.proj = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size=kernel_size, stride=1,
                      padding=kernel_size // 2),
            nn.LeakyReLU(inplace=True)
        )

    def forward(self, x):
        """Map images ``(B, C, H, W)`` to tokens ``(B, H*W, out_channel)``.

        Raises:
            ValueError: If ``x`` is not a 4-D tensor.
        """
        if x.dim() != 4:
            raise ValueError("InputProjBlock expects a 4-D (B, C, H, W) tensor")
        # Convolve, flatten the spatial axes, then move the channels last.
        x = self.proj(x).flatten(2).transpose(1, 2).contiguous()  # B H*W C

        return x



# Output Projection Block (returns the residual R)
# 3 x 3 convolutional layer
class OutputProjBlock(nn.Module):
    """Project feature tokens back to an image-shaped tensor.

    Args:
        in_channel: Channel size of the incoming tokens.
        out_channel: Channels of the produced image tensor.
        kernel_size: Side of the convolution kernel; the padding is
            ``kernel_size // 2`` so odd kernels keep the spatial size.
    """

    def __init__(self, in_channel=64, out_channel=3, kernel_size=3):
        super().__init__()
        self.in_channel = in_channel
        self.out_channel = out_channel
        self.proj = nn.Sequential(
            nn.Conv2d(in_channel, out_channel, kernel_size=kernel_size, stride=1,
                      padding=kernel_size // 2),
        )

    def forward(self, x):
        """Map tokens ``(B, L, C)`` to ``(B, out_channel, H, W)``.

        ``L`` must be a perfect square because the tokens are reshaped into
        a square ``sqrt(L) x sqrt(L)`` feature map.
        """
        B, L, C = x.shape
        H = int(math.sqrt(L))
        W = int(math.sqrt(L))
        # reshape (rather than view) copies when the transposed layout
        # cannot be expressed as a view.
        x = x.transpose(1, 2).reshape(B, C, H, W)
        x = self.proj(x)

        return x



# AFTER THE OUTPUT PROJECTION, WE HAVE TO SUM UP THE RESIDUAL R AND THE ORIGINAL INPUT (DEGRADED IMAGE I) TO OBTAIN THE RESTORED IMAGE I'




