In [1]:
!pip install rawpy
!pip install opencv-python

Collecting rawpy
  Obtaining dependency information for rawpy from https://files.pythonhosted.org/packages/35/58/62caf571fba343c93924e3d301210630df49b4751f77491a9da54ebdf0c9/rawpy-0.19.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading rawpy-0.19.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.0 kB)
Downloading rawpy-0.19.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rawpy
Successfully installed rawpy-0.19.0


In [2]:
from torch.utils.data import Dataset, DataLoader
from torch.nn.parallel import DataParallel
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.init
from torchvision.transforms import functional as TF
import torchvision.transforms as transforms
from torch.distributions import uniform


from rawpy import imread
import numpy as np
import time
import os

from torchvision.transforms import Compose
import cv2

from tqdm import tqdm

import torch.nn.functional as F
from math import exp

In [3]:
class CustomDataset(Dataset):
    """Paired RAW dataset driven by a text list file.

    Every non-blank line of the list file must hold four whitespace-separated
    fields: the input path, the target path and two further fields that are
    ignored. The first ``.`` of each path is replaced with
    ``/kaggle/input/sony-train-numpy``.

    Args:
        file_path: Path of the list file.
        transform: Optional callable invoked as ``transform(raw_input, raw_target)``
            that must return an ``(input, target)`` pair.
    """

    def __init__(self, file_path, transform=None):
        self.file_path = file_path
        self.transform = transform
        self.data = self.read_file()

    def read_file(self):
        """Parse the list file into a list of ``(input_path, target_path)`` tuples.

        Blank lines are skipped so that a trailing empty line does not break the
        four-field unpacking.

        Raises:
            ValueError: If a non-blank line does not contain exactly four fields.
        """
        root = '/kaggle/input/sony-train-numpy'
        data = []
        with open(self.file_path, 'r') as file:
            for line in file:
                fields = line.split()
                if not fields:
                    continue
                input_path, target_path, _, _ = fields
                # Only the first '.' (the relative-path marker) is replaced.
                data.append((input_path.replace('.', root, 1),
                             target_path.replace('.', root, 1)))
        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Load one pair, apply the optional transform and amplify the input by 250.

        The path and the total elapsed time are printed on every call.

        Returns:
            Tuple ``(input_image, target_image)``.
        """
        time_getItem = time.time()

        input_path, target_path = self.data[index]

        # Load both images with rawpy's imread.
        input_image_raw = imread(input_path)
        target_image_raw = imread(target_path)

        print(f'path: {input_path}')

        if self.transform:
            input_image, target_image = self.transform(input_image_raw, target_image_raw)
        else:
            # NOTE(review): without a transform the objects returned by imread are
            # multiplied directly below - confirm that they support multiplication.
            input_image, target_image = input_image_raw, target_image_raw

        # Fixed amplification factor, applied whether or not a transform was used.
        input_image = input_image * 250

        print(f"Total time: {time.time() - time_getItem}")

        return input_image, target_image

In [4]:
class NumpyDataset(Dataset):
    """Paired dataset whose images are loaded with ``np.load``.

    Every non-blank line of the list file must hold four whitespace-separated
    fields: the input path, the target path and two further fields that are
    ignored. The first ``.`` of each path is replaced with a root folder
    selected by ``ops``.

    Args:
        file_path: Path of the list file.
        transform: Stored on the instance; not used by ``__getitem__``.
        ops: Split selector - ``0`` uses the ``sony-train-numpy`` root, ``1`` the
            ``sony-valid-numpy`` root, any other value the ``sony-test-numpy`` root.
    """

    def __init__(self, file_path, transform=None, ops=0):
        self.file_path = file_path
        # NOTE(review): this instance attribute shadows the `transform` method
        # defined below, so the method cannot be reached through an instance
        # built by this constructor.
        self.transform = transform
        self.ops = ops
        self.data = self.read_file()

    def read_file(self):
        """Parse the list file into a list of ``(input_path, target_path)`` tuples.

        Blank lines are skipped.

        Raises:
            ValueError: If a non-blank line does not contain exactly four fields.
        """
        # The root depends only on `ops`, so it is resolved once.
        if self.ops == 0:
            prefix = '/kaggle/input/sony-train-numpy'
        elif self.ops == 1:
            prefix = '/kaggle/input/sony-valid-numpy'
        else:
            prefix = '/kaggle/input/sony-test-numpy'

        data = []
        with open(self.file_path, 'r') as file:
            for line in file:
                fields = line.split()
                if not fields:
                    continue
                input_path, target_path, _, _ = fields
                # Only the first '.' (the relative-path marker) is replaced.
                data.append((input_path.replace('.', prefix, 1),
                             target_path.replace('.', prefix, 1)))
        return data

    def __len__(self):
        return len(self.data)

    def transform(self, image, gt):
        """Convert both images to tensors with ``TF.to_tensor``."""
        image = TF.to_tensor(image)
        gt = TF.to_tensor(gt)
        return image, gt

    def ratio_eval(self, image_path, tg_path):
        """Return the amplification ratio ``gt_exposure / in_exposure``, capped at 300.

        The exposure time is read from each file name: the text starting at
        character 9 and ending 7 characters before the end when ``ops == 0``,
        5 characters before the end otherwise.

        Args:
            image_path: Base name of the input file.
            tg_path: Base name of the target file.

        Raises:
            ValueError: If an exposure time cannot be parsed or the input
                exposure is not positive. (Previously a parse failure was only
                printed and the method then failed with UnboundLocalError.)
        """
        suffix_len = 7 if self.ops == 0 else 5

        try:
            in_exposure = float(image_path[9:-suffix_len])
            gt_exposure = float(tg_path[9:-suffix_len])
        except ValueError as e:
            raise ValueError(
                f"Cannot parse exposure times from {image_path!r} / {tg_path!r}: {e}"
            ) from e

        if in_exposure <= 0:
            raise ValueError(f"Non-positive input exposure parsed from {image_path!r}")

        return min(gt_exposure / in_exposure, 300)

    def __getitem__(self, index):
        """Load one pair and scale the input by its exposure ratio.

        Returns:
            Tuple ``(input_image, target_image)`` of tensors.
        """
        input_path, target_path = self.data[index]

        # Load the arrays from disk.
        input_image = np.load(input_path)
        target_image = np.load(target_path)

        # to_tensor moves the last axis to the front; the permute moves it back.
        # NOTE(review): presumably the input arrays are stored channel-first - confirm.
        input_image = TF.to_tensor(input_image).permute(1, 2, 0)
        target_image = TF.to_tensor(target_image)

        in_path = os.path.basename(input_path)
        gt_path = os.path.basename(target_path)

        input_image = input_image * self.ratio_eval(in_path, gt_path)

        return input_image, target_image

# Transform

In [5]:
class RandomCrop:
    """
    Paired random crop for an (input, target) couple.

    The input is cropped to [C, ps, ps]; the target is cropped at the matching
    position on a grid twice as fine, giving [C, ps*2, ps*2]. The input crop is
    capped at 1.0 with ``np.minimum`` and the target crop is clamped to [0, 1].
    """
    def __init__(self, patch_size):
        self.patch_size = patch_size

    def __call__(self, input_image, target_image):
        started_at = time.time()
        ps = self.patch_size

        height, width = input_image.shape[1], input_image.shape[2]

        # Top-left corner of the crop; `top` is drawn before `left`.
        top = torch.randint(0, height - ps + 1, (1,)).item()
        left = torch.randint(0, width - ps + 1, (1,)).item()

        in_rows, in_cols = slice(top, top + ps), slice(left, left + ps)
        # The target window is the same region at twice the resolution.
        tg_rows = slice(top * 2, top * 2 + ps * 2)
        tg_cols = slice(left * 2, left * 2 + ps * 2)

        cropped_input = np.minimum(input_image[:, in_rows, in_cols], 1.0)
        cropped_target = target_image[:, tg_rows, tg_cols].clamp(0, 1)

        print(f"Crop time: {time.time() - started_at}")

        return cropped_input, cropped_target
        


def pack_raw(raw, black_level=512, white_level=16383):
    """
    Pack a single-plane Bayer mosaic into a 4-channel, half-resolution array.

    The visible sensor data is converted to float32, the black level is
    subtracted (negative results are set to 0) and the result is divided by
    ``white_level - black_level``. Values above the white level are not clipped.

    Args:
        raw: Object exposing ``raw_image_visible`` as a 2-D array with even
            height and width (odd sizes make the four planes differ in shape).
        black_level: Sensor black level subtracted from every sample.
        white_level: Sensor saturation level used for normalisation.

    Returns:
        float32 array of shape ``(H/2, W/2, 4)`` whose channels are, in order,
        the samples at (even row, even col), (even row, odd col),
        (odd row, odd col) and (odd row, even col).
        NOTE(review): the original docstring calls this order "RGBG"; that
        depends on the sensor's Bayer layout - confirm for other cameras.
    """

    time_packRawStart = time.time()

    im = raw.raw_image_visible.astype(np.float32)
    # Subtract the black level and normalise.
    im = np.maximum(im - black_level, 0) / (white_level - black_level)

    im = np.expand_dims(im, axis=2)
    H, W = im.shape[0], im.shape[1]

    # Stack the four positions of each 2x2 cell along the channel axis.
    out = np.concatenate((im[0:H:2, 0:W:2, :],
                          im[0:H:2, 1:W:2, :],
                          im[1:H:2, 1:W:2, :],
                          im[1:H:2, 0:W:2, :]), axis=2)

    print(f"Time pack raw: {time.time() - time_packRawStart}")

    return out



class RawTargetToImage(transforms.ToTensor):
    """
    ToTensor variant that also accepts an opened rawpy image.

    A ``rawpy.RawPy`` instance is demosaiced, scaled to [0, 1], converted from
    RGB to HSV with OpenCV and then turned into a tensor by the base class.
    Any other input is handled by ``transforms.ToTensor`` unchanged.
    """
    def __call__(self, pic):
        # Imported here because the notebook only binds `imread` from rawpy;
        # the bare module name `rawpy` would otherwise raise NameError.
        import rawpy

        if isinstance(pic, rawpy.RawPy):
            return self.raw_to_tensor(pic)
        return super().__call__(pic)

    def raw_to_tensor(self, raw):
        """Demosaic ``raw`` and return it as an HSV tensor.

        Timing information and the resulting tensor shape are printed.
        """

        time_postprocessStart = time.time()

        # 16-bit output divided by 65535 gives values in [0, 1].
        raw_array = raw.postprocess(use_camera_wb=True, half_size=False, no_auto_bright=True, output_bps=16)
        raw_array = np.float32(raw_array / 65535.0)

        print(f"Time postprocess: {time.time() - time_postprocessStart}")

        time_cc = time.time()

        # NOTE(review): for float32 input OpenCV is expected to return hue in
        # degrees rather than in [0, 1] - confirm the range downstream code needs.
        hsv_image = cv2.cvtColor(raw_array, cv2.COLOR_RGB2HSV)

        print(f"Time cc: {time.time() - time_cc}")

        # Convert the numpy array into a PyTorch tensor via the base class.
        tensor = super().__call__(hsv_image)

        print(tensor.shape)
        return tensor
    

    
class RawToModel:
    """
    Turns an (input, target) pair of opened RAW images into model-ready arrays.

    The input goes through ``pack_raw`` and is transposed so that the last axis
    comes first; the target goes through ``RawTargetToImage``.
    """
    def __init__(self):
        self.raw_to_tensor = RawTargetToImage()

    def __call__(self, input_image, target_image):
        # The input is packed first, then the target is converted.
        packed_input = pack_raw(input_image)
        return np.transpose(packed_input, (2, 0, 1)), self.raw_to_tensor(target_image)
    
    

class MultiImageCompose(Compose):
    """
    Compose variant whose transforms receive and return several images.
    """
    def __call__(self, *args):
        """
        Apply the transforms one after another, unpacking the result of each
        transform as the positional arguments of the next, and return the
        output of the last one.
        """
        images = args
        for step in self.transforms:
            images = step(*images)
        return images

## Dataset instantiation

In [6]:
### RAWPY ###
# Datasets that read the image pairs through the RAW pipeline (CustomDataset).
# NOTE(review): the recorded output of this cell is a FileNotFoundError for
# train_path, so the list files were not mounted in that session.

# Side length of the random crop applied to training inputs.
patch_size = 512
# Training pairs are converted and then randomly cropped; validation pairs are only converted.
train_transform = MultiImageCompose([RawToModel(), RandomCrop(patch_size)])
val_transform = RawToModel()

train_path = '/kaggle/input/sid-sony/Sony_train_list.txt'
val_path = '/kaggle/input/sid-sony/Sony_val_list.txt'

train_dataset = CustomDataset(file_path=train_path, transform=train_transform)
val_dataset = CustomDataset(file_path=val_path, transform=val_transform)

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/sid-sony/Sony_train_list.txt'

In [None]:
### Timing check for the RAWPY pipeline
# Fetching one sample triggers the timing prints emitted by the dataset and its transforms.

image, gt = train_dataset[0]

In [None]:
### NUMPY ###
# Datasets that load the image pairs with np.load (NumpyDataset).
# This cell rebinds train_dataset / val_dataset, replacing the RAW-based ones.

train_np_path = '/kaggle/input/sony-train-list/Sony_train_newlist.txt'
val_np_path = '/kaggle/input/sony-valid-list/Sony_val_newlist.txt'

# ops=0 selects the training root folder, ops=1 the validation one.
train_dataset = NumpyDataset(file_path=train_np_path, transform=None, ops=0)
val_dataset = NumpyDataset(file_path=val_np_path, transform=None, ops=1)

In [None]:
### Timing check for the NUMPY pipeline

image, gt = train_dataset[10]

# NOTE(review): display_image is defined in a later cell of this notebook, so on a
# fresh kernel that cell has to be executed before this call.
display_image(gt, hsi=False)

# Utility

In [None]:
import matplotlib.pyplot as plt

def display_image(tensor_image, hsi=False):
    """Show a tensor image with matplotlib.

    The tensor is squeezed, permuted from channel-first to channel-last and
    converted to numpy; the original comment assumes a
    [batch_index, Channel, h, w] tensor. Shape, minimum and maximum are printed.

    Args:
        tensor_image: PyTorch tensor to display.
        hsi: Only when exactly ``True``, the array is multiplied by 360 and
            converted from HSV to RGB before being shown.
    """
    print(f"image shape: {tensor_image.shape}")
    pixels = tensor_image.squeeze().permute(1, 2, 0).cpu().detach().numpy()

    print(f"UTILITY - image shape: {pixels.shape}, image min: {np.min(pixels)}, image max: {np.max(pixels)}")

    # NOTE(review): the factor 360 is applied to every channel, not only to hue - confirm.
    rendered = cv2.cvtColor(pixels*360, cv2.COLOR_HSV2RGB) if hsi is True else pixels

    plt.imshow(rendered)
    plt.axis('off')
    plt.show()

# Metric: SSIM Index

In [None]:
# REFERENCE: https://github.com/aserdega/ssim-pytorch/blob/master/ssim.py

def gaussian(window_size, sigma):
    """Return a 1-D Gaussian kernel of length ``window_size`` that sums to 1.

    The kernel is centred on index ``window_size // 2``.
    """
    center = window_size // 2
    two_sigma_sq = float(2 * sigma ** 2)
    weights = torch.Tensor([exp(-(pos - center) ** 2 / two_sigma_sq) for pos in range(window_size)])
    return weights / weights.sum()

def create_window(window_size, channel):
    """Build a ``(channel, 1, window_size, window_size)`` Gaussian window (sigma 1.5).

    The 2-D kernel is the outer product of the 1-D kernel with itself, repeated
    once per channel and returned as a contiguous tensor.
    """
    column = gaussian(window_size, 1.5).unsqueeze(1)
    kernel_2d = column.mm(column.t()).float()
    stacked = kernel_2d.unsqueeze(0).unsqueeze(0).expand(channel, 1, window_size, window_size)
    return stacked.contiguous()

def _ssim(img1, img2, window, window_size, channel, size_average = True):
    mu1 = F.conv2d(img1, window, padding = window_size//2, groups = channel)
    mu2 = F.conv2d(img2, window, padding = window_size//2, groups = channel)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1*mu2

    sigma1_sq = F.conv2d(img1*img1, window, padding = window_size//2, groups = channel) - mu1_sq
    sigma2_sq = F.conv2d(img2*img2, window, padding = window_size//2, groups = channel) - mu2_sq
    sigma12 = F.conv2d(img1*img2, window, padding = window_size//2, groups = channel) - mu1_mu2

    C1 = 0.01**2
    C2 = 0.03**2

    ssim_map = ((2*mu1_mu2 + C1)*(2*sigma12 + C2))/((mu1_sq + mu2_sq + C1)*(sigma1_sq + sigma2_sq + C2))

    if size_average:
        return ssim_map.mean()
    else:
        return ssim_map.mean(1).mean(1).mean(1)

class SSIM(torch.nn.Module):
    """SSIM-based loss module: ``forward`` returns ``1 - SSIM(img1, img2)``.

    The Gaussian window is cached on the instance and rebuilt whenever the
    channel count or the tensor type of the input changes.
    """
    def __init__(self, window_size = 11, size_average = True):
        """window_size default is 11, size_average is True"""
        super().__init__()
        self.window_size = window_size
        self.size_average = size_average
        self.channel = 1
        self.window = create_window(window_size, self.channel)

    def forward(self, img1, img2):
        _, channel, _, _ = img1.size()

        stale = channel != self.channel or self.window.data.type() != img1.data.type()
        if stale:
            # Rebuild the window for the new channel count / device / dtype and cache it.
            rebuilt = create_window(self.window_size, channel)
            if img1.is_cuda:
                rebuilt = rebuilt.cuda(img1.get_device())
            self.window = rebuilt.type_as(img1)
            self.channel = channel
        window = self.window

        # The loss is the complement of the similarity.
        return 1 - _ssim(img1, img2, window, self.window_size, channel, self.size_average)

def ssim(img1, img2, window_size = 11, size_average = True):
    """Functional SSIM: builds a fresh window matching ``img1`` and returns the similarity.

    Unlike the ``SSIM`` module this returns the SSIM value itself, not ``1 - SSIM``.
    """
    _, channel, _, _ = img1.size()

    window = create_window(window_size, channel)
    if img1.is_cuda:
        # Place the window on the same GPU as the images.
        window = window.cuda(img1.get_device())

    return _ssim(img1, img2, window.type_as(img1), window_size, channel, size_average)

# Model

In [None]:
class Network(nn.Module):
    """U-Net style encoder/decoder ending in a pixel shuffle.

    The encoder has five stages of two 3x3 convolutions (32, 64, 128, 256, 512
    channels) separated by 2x2 max pooling. The decoder has four stages, each a
    stride-2 transposed convolution, a concatenation with the matching encoder
    output and two 3x3 convolutions. A final 1x1 convolution produces 12
    channels that ``PixelShuffle(2)`` rearranges into 3 channels at twice the
    input resolution. The first convolution expects 4 input channels.
    """
    def __init__(self):
        super().__init__()

        # Layers are registered in this exact order (conv1..conv10, upconv6..upconv9,
        # conv11..conv19) so that state_dict keys and seeded initialisation stay unchanged.
        encoder_channels = [(4, 32), (32, 32), (32, 64), (64, 64), (64, 128),
                            (128, 128), (128, 256), (256, 256), (256, 512), (512, 512)]
        for idx, (c_in, c_out) in enumerate(encoder_channels, start=1):
            setattr(self, f'conv{idx}', nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))

        upsample_channels = [(512, 256), (256, 128), (128, 64), (64, 32)]
        for idx, (c_in, c_out) in enumerate(upsample_channels, start=6):
            setattr(self, f'upconv{idx}', nn.ConvTranspose2d(c_in, c_out, kernel_size=2, stride=2))

        decoder_channels = [(512, 256), (256, 256), (256, 128), (128, 128),
                            (128, 64), (64, 64), (64, 32), (32, 32)]
        for idx, (c_in, c_out) in enumerate(decoder_channels, start=11):
            setattr(self, f'conv{idx}', nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))

        self.conv19 = nn.Conv2d(32, 12, kernel_size=1, stride=1)
        self.pixel_shuffle = nn.PixelShuffle(2)
        self.lrelu = nn.LeakyReLU(negative_slope=0.2)
        self.apply(self.initialize_weights)

    def initialize_weights(self, m):
        """Per-module initialiser used with ``self.apply``."""
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
            # Xavier-normal weights with the default 'leaky_relu' gain, zero biases.
            nn.init.xavier_normal_(m.weight, gain=nn.init.calculate_gain('leaky_relu'))
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
            return

        if isinstance(m, nn.BatchNorm2d):
            # Identity scale and zero shift for batch-normalisation layers.
            nn.init.constant_(m.weight, 1)
            nn.init.constant_(m.bias, 0)

    def forward(self, x):
        act = self.lrelu

        def down(t):
            return torch.nn.functional.max_pool2d(t, kernel_size=2, stride=2, padding=0)

        # Encoder: the output of every stage is kept for the skip connections.
        enc1 = act(self.conv2(act(self.conv1(x))))
        enc2 = act(self.conv4(act(self.conv3(down(enc1)))))
        enc3 = act(self.conv6(act(self.conv5(down(enc2)))))
        enc4 = act(self.conv8(act(self.conv7(down(enc3)))))
        bottom = act(self.conv10(act(self.conv9(down(enc4)))))

        # Decoder: only the first up-convolution is followed by the activation.
        dec = torch.cat((act(self.upconv6(bottom)), enc4), dim=1)
        dec = act(self.conv12(act(self.conv11(dec))))

        dec = torch.cat((self.upconv7(dec), enc3), dim=1)
        dec = act(self.conv14(act(self.conv13(dec))))

        dec = torch.cat((self.upconv8(dec), enc2), dim=1)
        dec = act(self.conv16(act(self.conv15(dec))))

        dec = torch.cat((self.upconv9(dec), enc1), dim=1)
        dec = act(self.conv18(act(self.conv17(dec))))

        return self.pixel_shuffle(self.conv19(dec))

### Model: batch normalization

In [None]:
class BatchNetwork(nn.Module):
    """Batch-normalised variant of the U-Net style encoder/decoder.

    Encoder and decoder convolutions are each followed by batch normalisation
    and LeakyReLU. For ``upconv6`` the normalisation comes after the transposed
    convolution; for ``upconv7``-``upconv9`` and for ``conv19`` the
    normalisation (``bnup7``-``bnup9``, ``bn19``) is applied to the layer's
    input instead, which is why those layers are sized on the input channels.
    The final 12 channels are rearranged by ``PixelShuffle(2)`` into 3 channels
    at twice the input resolution.
    """
    def __init__(self):
        super().__init__()

        # Modules are registered in the same order as before (conv/bn pairs, then
        # upconv/bnup pairs, then the decoder pairs) to keep state_dict keys and
        # seeded initialisation unchanged.
        encoder_channels = [(4, 32), (32, 32), (32, 64), (64, 64), (64, 128),
                            (128, 128), (128, 256), (256, 256), (256, 512), (512, 512)]
        for idx, (c_in, c_out) in enumerate(encoder_channels, start=1):
            setattr(self, f'conv{idx}', nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))
            setattr(self, f'bn{idx}', nn.BatchNorm2d(c_out))

        # (in channels, out channels, batch-norm features) of every up-sampling stage.
        upsample_spec = [(512, 256, 256), (256, 128, 256), (128, 64, 128), (64, 32, 64)]
        for idx, (c_in, c_out, bn_features) in enumerate(upsample_spec, start=6):
            setattr(self, f'upconv{idx}', nn.ConvTranspose2d(c_in, c_out, kernel_size=2, stride=2))
            setattr(self, f'bnup{idx}', nn.BatchNorm2d(bn_features))

        decoder_channels = [(512, 256), (256, 256), (256, 128), (128, 128),
                            (128, 64), (64, 64), (64, 32), (32, 32)]
        for idx, (c_in, c_out) in enumerate(decoder_channels, start=11):
            setattr(self, f'conv{idx}', nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))
            setattr(self, f'bn{idx}', nn.BatchNorm2d(c_out))

        self.conv19 = nn.Conv2d(32, 12, kernel_size=1, stride=1)
        self.bn19 = nn.BatchNorm2d(32)

        self.pixel_shuffle = nn.PixelShuffle(2)
        self.lrelu = nn.LeakyReLU(negative_slope=0.2)
        self.apply(self.initialize_weights)

    def initialize_weights(self, m):
        """Per-module initialiser used with ``self.apply``."""
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
            # Xavier-normal weights with the default 'leaky_relu' gain, zero biases.
            nn.init.xavier_normal_(m.weight, gain=nn.init.calculate_gain('leaky_relu'))
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
            return

        if isinstance(m, nn.BatchNorm2d):
            # Identity scale and zero shift for batch-normalisation layers.
            nn.init.constant_(m.weight, 1)
            nn.init.constant_(m.bias, 0)

    def forward(self, x):
        act = self.lrelu

        def unit(conv, bn, t):
            # convolution -> batch norm -> LeakyReLU
            return act(bn(conv(t)))

        def down(t):
            return torch.nn.functional.max_pool2d(t, kernel_size=2, stride=2, padding=0)

        # Encoder: the output of every stage is kept for the skip connections.
        enc1 = unit(self.conv2, self.bn2, unit(self.conv1, self.bn1, x))
        enc2 = unit(self.conv4, self.bn4, unit(self.conv3, self.bn3, down(enc1)))
        enc3 = unit(self.conv6, self.bn6, unit(self.conv5, self.bn5, down(enc2)))
        enc4 = unit(self.conv8, self.bn8, unit(self.conv7, self.bn7, down(enc3)))
        bottom = unit(self.conv10, self.bn10, unit(self.conv9, self.bn9, down(enc4)))

        # First up-sampling stage: transposed conv -> batch norm -> activation.
        dec = torch.cat((act(self.bnup6(self.upconv6(bottom))), enc4), dim=1)
        dec = unit(self.conv12, self.bn12, unit(self.conv11, self.bn11, dec))

        # Remaining stages: batch norm on the input of the transposed conv, no activation.
        dec = torch.cat((self.upconv7(self.bnup7(dec)), enc3), dim=1)
        dec = unit(self.conv14, self.bn14, unit(self.conv13, self.bn13, dec))

        dec = torch.cat((self.upconv8(self.bnup8(dec)), enc2), dim=1)
        dec = unit(self.conv16, self.bn16, unit(self.conv15, self.bn15, dec))

        dec = torch.cat((self.upconv9(self.bnup9(dec)), enc1), dim=1)
        dec = unit(self.conv18, self.bn18, unit(self.conv17, self.bn17, dec))

        return self.pixel_shuffle(self.conv19(self.bn19(dec)))

## Funzione di fit

In [None]:
def fit(epochs, model, train_loader, val_loader, criterion1, criterion2, alpha, optimizer, index, train_dim, val_dim, scheduler=None):
    """Train and validate ``model``, saving the weights whenever validation improves.

    The loss is ``alpha * criterion1 + (1 - alpha) * criterion2 / 2`` computed on
    the model output clamped to [0, 1]. Relies on the notebook-level names
    ``device``, ``tqdm`` and ``plt``.

    Args:
        epochs: Number of epochs.
        model: Network to train; wrapped in ``DataParallel`` when more than one
            GPU is visible.
        train_loader: Iterable of ``(input_batch, target_batch)`` for training.
        val_loader: Iterable of ``(input_batch, target_batch)`` for validation.
        criterion1: First loss term, weighted by ``alpha``.
        criterion2: Second loss term, halved and weighted by ``1 - alpha``.
        alpha: Mixing weight between the two loss terms.
        optimizer: Optimizer stepped after every training batch.
        index: Unused.
        train_dim: Maximum number of training batches per epoch; ``0`` means the
            whole loader.
        val_dim: Maximum number of validation batches per epoch; ``0`` means the
            whole loader.
        scheduler: Optional learning-rate scheduler, stepped on every 15th
            training batch. Defaults to ``None`` (no scheduling).

    Returns:
        Dict mapping the epoch index to a dict with keys ``train_loss``,
        ``val_loss``, ``train_iter_loss`` and ``val_iter_loss``.
    """

    model.to(device, non_blocking=True)

    if train_dim == 0:
        train_dim = len(train_loader)

    if val_dim == 0:
        val_dim = len(val_loader)

    if torch.cuda.device_count() > 1:
        print("Using", torch.cuda.device_count(), "GPUs!")
        # With DataParallel the saved state_dict keys carry a 'module.' prefix.
        model = DataParallel(model)

    torch.cuda.empty_cache()
    history = {}
    min_loss = np.inf

    fit_time = time.time()

    for epoch in range(epochs):

        val_iter_loss = []
        t_iter_loss = []

        since = time.time()
        running_loss = 0

        # ---- training ----
        model.train()
        for batch_idx, (input_batch, target_batch) in enumerate(tqdm(train_loader)):

            input_images = input_batch.to(device, non_blocking=True)
            target_images = target_batch.to(device, non_blocking=True)

            output = model(input_images).clamp(0,1)

            loss = alpha*criterion1(output, target_images) + (1-alpha)*(criterion2(output, target_images)/2)

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            batch_loss = loss.item()
            running_loss += batch_loss
            t_iter_loss.append(batch_loss)

            del output, input_images, target_images

            if batch_idx % 15 == 0 and scheduler is not None:
                scheduler.step()
                current_lr = optimizer.param_groups[0]['lr']
                print(f'Batch index:{batch_idx}, Learning Rate: {current_lr}')

            # Stop after exactly train_dim batches (the previous check let one extra batch through).
            if batch_idx + 1 >= train_dim:
                break

        # One-off learning-rate drop at the middle epoch.
        if epoch == epochs // 2:
            for param_group in optimizer.param_groups:
                param_group['lr'] /= 10.0
            print(f'Learning rate reduced to {optimizer.param_groups[0]["lr"]} at epoch {epoch+1}')

        # ---- validation ----
        model.eval()
        test_loss = 0

        with torch.no_grad():
            for batch_idx, (input_batch, target_batch) in enumerate(tqdm(val_loader)):

                input_images = input_batch.to(device, non_blocking=True)
                target_images = target_batch.to(device, non_blocking=True)

                output = model(input_images).clamp(0,1)

                loss = alpha*criterion1(output, target_images) + (1-alpha)*(criterion2(output, target_images)/2)

                batch_loss = loss.item()
                test_loss += batch_loss
                val_iter_loss.append(batch_loss)

                # Show the prediction of the last validation batch.
                # NOTE(review): squeeze + 3-axis transpose assumes a validation batch size of 1.
                if batch_idx == val_dim-1:
                    output_np = output.detach().cpu().numpy()
                    output_np = np.transpose(np.squeeze(output_np), (1, 2, 0))
                    plt.imshow(output_np)
                    plt.axis('off')
                    plt.show()
                    del output_np

                del output, input_images, target_images

                # Stop after exactly val_dim batches.
                if batch_idx + 1 >= val_dim:
                    break

        # Average over the batches that were actually processed.
        train_loss = running_loss / max(len(t_iter_loss), 1)
        val_loss = test_loss / max(len(val_iter_loss), 1)

        history[epoch] = {'train_loss': train_loss, 'val_loss': val_loss,
                          'train_iter_loss': t_iter_loss, 'val_iter_loss': val_iter_loss}

        # Track the best validation loss with the same quantity that is compared
        # (the previous code compared loss/val_dim but stored loss/5).
        if min_loss > val_loss:
            print('Loss Decreasing {:.5f} >> {:.5f} '.format(min_loss, val_loss))
            min_loss = val_loss

            print('saving model...')
            torch.save(model.state_dict(), 'Unet-SID.pth')

        print("Epoch:{}/{}..".format(epoch+1, epochs),
              "Train Loss: {:.5f}..".format(train_loss),
              "Val Loss: {:.5f}..".format(val_loss),
              "Time: {:.5f}s".format(time.time()-since))

    print('Total time: {:.5f} m' .format((time.time()- fit_time)/60))

    return history


In [None]:
from collections import OrderedDict

# Load every .pth checkpoint found in weights_folder_path into its own Network
# instance and collect the instances in `models`.
model = Network()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
models = []
weights_folder_path = "/kaggle/input/4-epoch"

# Sorted so that the order of `models` does not depend on the directory listing order.
weight_files = sorted(file for file in os.listdir(weights_folder_path) if file.endswith('.pth'))

# Prefix added to every key when the weights were saved from a DataParallel wrapper.
DATA_PARALLEL_PREFIX = 'module.'

for weight_file in weight_files:
    model = Network()
    try:
        weight_path = os.path.join(weights_folder_path, weight_file)
        # map_location lets checkpoints saved on a GPU load on a CPU-only machine.
        # NOTE(review): torch.load unpickles the file - only load checkpoints from trusted sources.
        state_dict = torch.load(weight_path, map_location=device)

        # Strip the prefix only from keys that actually carry it, so checkpoints
        # saved without DataParallel keep their key names intact.
        new_state_dict = OrderedDict()
        for k, v in state_dict.items():
            name = k[len(DATA_PARALLEL_PREFIX):] if k.startswith(DATA_PARALLEL_PREFIX) else k
            new_state_dict[name] = v

        # Load parameters
        model.load_state_dict(new_state_dict)
        print(f"Model loaded successfully from {weight_path}")

    except FileNotFoundError:
        print(f"INFO: The file at '{weight_file}' does not exist. Loading an empty default model.")

    models.append(model)

## Babysitting: definizione del learning rate

In [None]:
# Random search over learning rate and loss-mixing weight: 50 short runs of 5 epochs,
# each limited to train_dim training batches and val_dim validation batches.
epochs = 5
train_batch_size = 9
val_batch_size = 1

train_shuffle = True
val_shuffle = False

train_dim = 13
val_dim = 3

# Defined before first use so the cell does not depend on an earlier cell for `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

criterion1 = nn.L1Loss().to(device, non_blocking=True)
criterion2 = SSIM(window_size=11, size_average=True).to(device, non_blocking=True)

history_tot = []

# The loaders do not depend on the sampled hyper-parameters, so they are built once
# and their persistent workers are reused by every run.
train_dataloader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=train_shuffle, num_workers=3, prefetch_factor=3, persistent_workers=True)
val_dataloader = DataLoader(val_dataset, batch_size=val_batch_size, shuffle=val_shuffle, num_workers=3, prefetch_factor=3, persistent_workers=True)

max_count = 50
for count in range(max_count):
    model = Network()
    # Learning rate sampled log-uniformly in [1e-5, 1e-3], alpha uniformly in [0, 1].
    lr = 10**np.random.uniform(-5, -3)
    alpha = np.random.uniform(0, 1)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    print(f"count: {count}\tlr: {lr}, alpha: {alpha}")

    # No scheduler is used during the search; the argument is passed explicitly
    # because it was missing from the call, which raised a TypeError.
    history = fit(epochs, model, train_dataloader, val_dataloader, criterion1, criterion2, alpha, optimizer, count, train_dim, val_dim, scheduler=None)
    history_master = {'lr': lr, 'history': history, 'alpha': alpha}

    history_tot.append(history_master)

import pickle

# Persist every run so the plots can be regenerated without re-training.
with open('history_tot.pkl', 'wb') as file:
    pickle.dump(history_tot, file)

### Plot dei risultati

In [None]:
import matplotlib.pyplot as plt

# Number of trials drawn on each figure.
valori_per_grafico = 10
sottoliste = [history_tot[i:i+valori_per_grafico] for i in range(0, len(history_tot), valori_per_grafico)]

for indice, sottolista in enumerate(sottoliste, start=1):
    fig, ax = plt.subplots(figsize=(10, 6))
    for data_dict in sottolista:
        # `alpha` and `lr` are stored at the top level of each trial record,
        # next to (not inside) the history returned by fit().
        alpha = data_dict['alpha']
        lr = data_dict['lr']
        val_loss_values = data_dict['history']['val_loss']

        ax.plot(range(1, len(val_loss_values) + 1), val_loss_values, label = f'Lr: {lr:.5f}, a: {alpha:.2f}')

    # Chart settings
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Validation Loss')
    ax.set_title(f'Andamento Validation Loss (Grafico {indice})')
    ax.legend()

    # Figures are written to disk and closed rather than displayed inline,
    # which keeps memory bounded when many trials are plotted.
    file_name = f'grafico_{indice}.jpg'
    fig.savefig(file_name)
    plt.close(fig)

## Stampa dei parametri del modello (nome, shape e valori di ogni layer)

In [None]:
# Dump every entry of the model's state_dict: its key, its shape, then its values.
for layer_name, tensor in model.state_dict().items():
    shape = tensor.shape
    print(f"Layer: {layer_name}, Shape: {shape}")
    print(tensor)

# Learning Rate Scheduler: Cosine Annealing Warm Restart

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts

class CustomCosineAnnealingWarmRestarts(CosineAnnealingWarmRestarts):
    """Cosine annealing with warm restarts that can checkpoint at every restart.

    Behaves like ``CosineAnnealingWarmRestarts``; additionally, whenever a
    ``step()`` lands on the start of a new cycle (``T_cur == 0``), the optional
    ``save_model_callback(last_epoch, counter)`` is invoked, where ``counter``
    is the number of callbacks issued so far (0 for the first restart).

    Args:
        optimizer: Wrapped optimizer.
        T_0: Length of the first cycle, forwarded to the base class.
        T_mult: Cycle-length multiplier, forwarded to the base class.
        eta_min: Minimum learning rate, forwarded to the base class.
        save_model_callback: Optional callable ``(epoch, counter) -> None``.
    """

    def __init__(self, optimizer, T_0, T_mult=1, eta_min=0, save_model_callback=None):
        # These attributes must exist before the base constructor runs,
        # because it performs an initial step() that goes through our override.
        self.save_model_callback = save_model_callback
        self.counter = 0
        self._construction_done = False
        super().__init__(optimizer, T_0, T_mult, eta_min)
        self._construction_done = True

    def step(self, epoch=None):
        """Advance the schedule; fire the callback when a cycle restarts."""
        super().step(epoch)

        # The constructor's initial step also leaves T_cur == 0. Skip it so no
        # checkpoint of the still-untrained model is written at construction.
        if not self._construction_done:
            return

        if self.T_cur == 0 and self.save_model_callback is not None:
            self.save_model_callback(self.last_epoch, self.counter)
            self.counter += 1


## Train

In [None]:
train_batch_size = 9
val_batch_size = 1
train_shuffle = True
val_shuffle = True

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# A single model object is optimised, checkpointed and exported. Training
# resumes from the first loaded checkpoint when one is available; otherwise it
# starts from a freshly initialised network. Previously the optimizer and the
# save callback pointed at a new Network() while fit() received models[0], so
# the optimizer never updated the network that was actually being run.
model = models[0] if models else Network()

criterion1 = nn.L1Loss().to(device, non_blocking=True)
criterion2 = SSIM(window_size=11, size_average=True).to(device, non_blocking=True)
lr = 0.0002
alpha = 0.60
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

def save_model(epoch, counter):
    """Checkpoint callback: write the current weights to '<counter>_SID.pth'."""
    print(f"Saving model state at the end of epoch {epoch}")
    torch.save(model.state_dict(), f'{counter}_SID.pth')

scheduler = CustomCosineAnnealingWarmRestarts(optimizer, T_0=222, T_mult=2, eta_min=1e-6, save_model_callback=save_model)

train_dataloader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=train_shuffle, num_workers=3, prefetch_factor=3, persistent_workers=True)
val_dataloader = DataLoader(val_dataset, batch_size=val_batch_size, shuffle=val_shuffle, num_workers=3, prefetch_factor=3, persistent_workers=True)

epochs = 11
count = 0
# NOTE(review): `scheduler` is built above, but None is passed as fit()'s last
# positional argument -- presumably the scheduler slot. Confirm whether this
# run is meant to train without the schedule.
history = fit(epochs, model, train_dataloader, val_dataloader, criterion1, criterion2, alpha, optimizer, count, 0, 0, None)

In [None]:
import json

# Write the training history as indented JSON, then export the final weights.
with open('dizionario.txt', 'w') as history_file:
    json.dump(history, history_file, indent=4)

final_weights = model.state_dict()
torch.save(final_weights, 'Unet-SID.pth')

# Prediction on Test Data

### Model loading

In [None]:
test_np_path = '/kaggle/input/sony-test-list/Sony_test_newlist.txt'
test_dataset = NumpyDataset(file_path=test_np_path, transform=None, ops=2)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

test_batch_size = 1
test_shuffle = True
score_criterion = SSIM(window_size=11, size_average=True).to(device, non_blocking=True)

models = []
weights_folder_path = "/kaggle/input/sid-modelensemble/Models"

# Sorted so each checkpoint's position in `models` (and therefore in the saved
# scores) is reproducible; os.listdir returns entries in arbitrary order.
weight_files = sorted(file for file in os.listdir(weights_folder_path) if file.endswith('.pth'))

for weight_file in weight_files:
    model = Network()
    try:
        weight_path = os.path.join(weights_folder_path, weight_file)
        # map_location="cpu" lets checkpoints written on a GPU load on
        # CPU-only machines; models are moved to `device` later, at test time.
        state_dict = torch.load(weight_path, map_location="cpu")

        # Keys saved from an nn.DataParallel wrapper start with `module.`.
        # Strip the prefix where present so both wrapped and unwrapped
        # checkpoints load into a plain Network.
        prefix = 'module.'
        state_dict = {
            (key[len(prefix):] if key.startswith(prefix) else key): value
            for key, value in state_dict.items()
        }

        model.load_state_dict(state_dict)
        print(f"Model loaded successfully from {weight_path}")

    except FileNotFoundError:
        print(f"INFO: The file at '{weight_file}' does not exist. Loading an empty default model.")

    models.append(model)

test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=test_shuffle, num_workers=3, prefetch_factor=3, persistent_workers=True)

## Model ensembling test

#### SSIM modify

In [None]:
# REFERENCE: https://github.com/aserdega/ssim-pytorch/blob/master/ssim.py

def gaussian(window_size, sigma):
    """Return a 1-D Gaussian of length `window_size`, normalised to sum to 1.

    The curve is centred on index `window_size // 2` and has standard
    deviation `sigma`.
    """
    center = window_size // 2
    two_sigma_sq = float(2*sigma**2)
    weights = torch.Tensor([exp(-(idx - center)**2/two_sigma_sq) for idx in range(window_size)])
    total = weights.sum()
    return weights/total

def create_window(window_size, channel):
    """Build a (channel, 1, window_size, window_size) Gaussian kernel.

    The 2-D kernel is the outer product of the 1-D Gaussian (sigma = 1.5) with
    itself, repeated once per channel and returned as a contiguous tensor.
    """
    column = gaussian(window_size, 1.5).unsqueeze(1)
    kernel_2d = column.mm(column.t()).float()
    kernel_4d = kernel_2d.unsqueeze(0).unsqueeze(0)
    return kernel_4d.expand(channel, 1, window_size, window_size).contiguous()

def _ssim(img1, img2, window, window_size, channel, size_average = True):
    mu1 = F.conv2d(img1, window, padding = window_size//2, groups = channel)
    mu2 = F.conv2d(img2, window, padding = window_size//2, groups = channel)

    mu1_sq = mu1.pow(2)
    mu2_sq = mu2.pow(2)
    mu1_mu2 = mu1*mu2

    sigma1_sq = F.conv2d(img1*img1, window, padding = window_size//2, groups = channel) - mu1_sq
    sigma2_sq = F.conv2d(img2*img2, window, padding = window_size//2, groups = channel) - mu2_sq
    sigma12 = F.conv2d(img1*img2, window, padding = window_size//2, groups = channel) - mu1_mu2

    C1 = 0.01**2
    C2 = 0.03**2

    ssim_map = ((2*mu1_mu2 + C1)*(2*sigma12 + C2))/((mu1_sq + mu2_sq + C1)*(sigma1_sq + sigma2_sq + C2))
    return ssim_map
    

class SSIM_mod(torch.nn.Module):
    """SSIM criterion that returns both the scalar loss and the SSIM map.

    `forward(img1, img2)` returns `(1 - ssim_map.mean(), ssim_map)`. The
    Gaussian window is cached on the instance and rebuilt whenever the input's
    channel count, device or dtype differs from the cached one.
    """

    def __init__(self, window_size = 11, size_average = True):
        """window_size default is 11, size_average is True"""
        # Zero-argument super() always binds to this class; naming a different
        # class here (e.g. SSIM) raises TypeError at construction.
        super().__init__()
        self.window_size = window_size
        self.size_average = size_average
        self.channel = 1
        self.window = create_window(window_size, self.channel)

    def forward(self, img1, img2):
        """Return `(ssim_loss, ssim_map)` for two image batches.

        The channel count is read from dimension 1 of `img1`.
        """
        (_, channel, _, _) = img1.size()

        cache_is_valid = (
            channel == self.channel
            and self.window.device == img1.device
            and self.window.dtype == img1.dtype
        )

        if cache_is_valid:
            window = self.window
        else:
            # Rebuild the window to match the input's channels, device and dtype.
            window = create_window(self.window_size, channel)
            window = window.to(device=img1.device, dtype=img1.dtype)

            self.window = window
            self.channel = channel

        ssim_map = _ssim(img1, img2, window, self.window_size, channel, self.size_average)
        ssim_loss = 1 - ssim_map.mean()

        return ssim_loss, ssim_map

### Prediction strategies

In [None]:
from collections.abc import Iterable

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Identifiers of the ensembling strategies selectable in test().
AVERAGE, TOP_GSCORE, LOC_SCORE = range(3)

def calculate_psnr(prediction, target):
    """Return the PSNR (in dB) between `prediction` and `target`.

    The peak value is fixed at 1.0, i.e. the images are assumed to be
    normalised to the range [0, 1].
    """
    peak = 1.0
    squared_error = (prediction - target) ** 2
    rmse = torch.sqrt(torch.mean(squared_error))
    return 20 * torch.log10(peak / rmse)

def average(predictions, score_criterion, target_images):
    """Ensemble by element-wise mean of all predictions.

    Returns the averaged output together with
    `score_criterion(output, target_images) / 2`.
    """
    n_predictions = len(predictions)
    ensemble = sum(predictions) / n_predictions
    halved_loss = score_criterion(ensemble, target_images) / 2
    return ensemble, halved_loss

def top_gscore(predictions, score_criterion, losses):
    """Pick the prediction with the smallest loss.

    Returns `(prediction, loss)` for the first index at which `losses` is
    minimal. `score_criterion` is accepted but not used.
    """
    best_index, best_loss = min(enumerate(losses), key=lambda pair: pair[1])
    return predictions[best_index], best_loss

def _show_prediction(ensemble_output, target_images):
    """Display the ensemble output next to its target image.

    Both tensors are squeezed and transposed from (C, H, W) to (H, W, C), so
    this presumably expects one multi-channel image per batch -- TODO confirm
    against the test loader's batch size.
    """
    # Imported here so the helper does not rely on another cell having
    # imported matplotlib first.
    import matplotlib.pyplot as plt

    ensemble_output_np = ensemble_output.detach().cpu().numpy()
    ensemble_output_np = np.transpose(np.squeeze(ensemble_output_np), (1, 2, 0))

    target_np = target_images.detach().cpu().numpy()
    target_np = np.transpose(np.squeeze(target_np), (1, 2, 0))

    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    plt.imshow(ensemble_output_np)
    plt.axis('off')
    plt.title('Ensemble Model Output')

    plt.subplot(1, 2, 2)
    plt.imshow(target_np)
    plt.axis('off')
    plt.title('Target Image')

    plt.show()


def test(models, test_loader, score_criterion, test_dim, show_img, strategy):
    """Evaluate one model or an ensemble on `test_loader`.

    Args:
        models: A single model or an iterable of models.
        test_loader: Iterable yielding `(input_batch, target_batch)` pairs.
        score_criterion: Callable `(output, target) -> loss tensor`. The loss
            is halved and the score is `1 - loss / 2`; presumably this maps an
            SSIM loss into [0, 1] -- TODO confirm.
        test_dim: Maximum number of batches to evaluate.
        show_img: When exactly `True`, plot each output beside its target.
        strategy: `AVERAGE` or `TOP_GSCORE`.

    Returns:
        `(average_score, average_psnr)` over the batches actually processed.

    Raises:
        ValueError: If `strategy` is not supported or no model is given.
    """
    # Fail fast: an unsupported strategy would otherwise surface as an
    # UnboundLocalError inside the loop, after the first forward passes.
    if strategy not in (AVERAGE, TOP_GSCORE):
        raise ValueError(f"Unsupported ensembling strategy: {strategy!r}")

    # Materialise as a list so one-shot iterables survive being walked once
    # here and again for every batch.
    models = [models] if not isinstance(models, Iterable) else list(models)
    if not models:
        raise ValueError("test() needs at least one model")

    for model in models:
        model.to(device, non_blocking=True)
        model.eval()

    torch.cuda.empty_cache()
    since = time.time()

    running_score = 0
    running_psnr = 0
    processed = 0

    with torch.no_grad():
        for batch_idx, (input_batch, target_batch) in enumerate(tqdm(test_loader)):

            input_images = input_batch.to(device, non_blocking=True)
            target_images = target_batch.to(device, non_blocking=True)

            predictions = [model(input_images) for model in models]

            if strategy == AVERAGE:
                ensemble_output, loss = average(predictions, score_criterion, target_images)
            else:
                # Per-model losses are only needed to pick the best prediction.
                losses = [score_criterion(output, target_images) / 2 for output in predictions]
                ensemble_output, loss = top_gscore(predictions, score_criterion, losses)

            running_score += 1 - loss.item()

            psnr_value = calculate_psnr(ensemble_output, target_images)
            running_psnr += psnr_value.item()
            processed += 1

            if show_img is True:
                _show_prediction(ensemble_output, target_images)

            del ensemble_output, input_images, target_images

            if batch_idx >= test_dim - 1:
                break

        # Average over the batches really seen: the loader may hold fewer than
        # `test_dim` batches. max(..., 1) keeps an empty loader at 0.0.
        denominator = max(processed, 1)
        average_score = running_score / denominator
        average_psnr = running_psnr / denominator
        print("Test Score: {:.5f}..".format(average_score),
              "Test PSNR: {:.5f}..".format(average_psnr),
              "Time: {:.5f}s".format(time.time() - since))

    print('Total time: {:.5f} m'.format((time.time() - since) / 60))
    return average_score, average_psnr

In [None]:
import json

# Evaluate every loaded model on its own over the full test loader and store
# the (score, PSNR) pairs as JSON.
test_dim = len(test_dataloader)
scores = []

for model in models:
    model_result = test(model, test_dataloader, score_criterion, test_dim, False, TOP_GSCORE)
    scores.append(model_result)

with open('scores.txt', 'w') as scores_file:
    json.dump(scores, scores_file, indent=4)

# Miscellaneous

### Print andamento loss

In [None]:
import json
import matplotlib.pyplot as plt

with open('/kaggle/input/nosched-trial/no_scheduler.txt', 'r') as history_file:
    data = json.load(history_file)

# Concatenate the per-iteration losses of every record, in file order.
train_iter_loss = []
val_iter_loss = []
for epoch_record in data.values():
    train_iter_loss.extend(epoch_record['train_iter_loss'])
for epoch_record in data.values():
    val_iter_loss.extend(epoch_record['val_iter_loss'])

# One figure per curve: (values, line colour, legend and y-axis label).
curves = (
    (train_iter_loss, 'blue', 'Train Loss'),
    (val_iter_loss, 'red', 'Val Loss'),
)
for curve_values, curve_color, curve_label in curves:
    plt.figure(figsize=(10, 6))
    iterations = list(range(len(curve_values)))
    plt.plot(iterations, curve_values, color=curve_color, label=curve_label)
    plt.xlabel('Iterazione')
    plt.ylabel(curve_label)
    plt.legend()
    plt.show()