## 1. Download dei dati ##

In [None]:
# Download dataset
!wget https://www.zemris.fer.hr/projects/LicensePlates/english/baza_slika.zip
# Unzip file zip
!unzip -o -j baza_slika.zip "*.jpg" -d dataset

## 2. Preparazione dei dati ##

In [6]:
import os

import matplotlib.pyplot as plt
import numpy as np
import random

from PIL import Image
from skimage.transform import rescale
from skimage.util import random_noise
from torch.utils.data import DataLoader, random_split

In [2]:
# Percorso alla cartella che contiene le immagini
datasetPath = "dataset"

# Lista per salvare le immagini caricate
imgList = []

# Scorrimento di tutti i file nella cartella
i = 0
for fileName in os.listdir(datasetPath):
    if fileName.lower().endswith(".jpg"):
        filePath = os.path.join(datasetPath, fileName)
        img = np.array(Image.open(filePath)).astype(np.float32) / 255.0
        img = rescale(img, (1/1.66, 1/1.66, 1)) # Passaggio da 640x480 a 386x289
        imgList.append(img)

In [3]:
# Creazione rumore
def addNoise(img, intensity=0.2):
    # Scelgo casualmente se applicare un rumore sale e pepe o uno gaussiano
    randInt = random.randint(1,4) 
    h,w,c = img.shape
    y = np.zeros_like(img)
    if randInt == 1:                    # Rumore gaussiano
        d = random.randint(10, 25)/255
        n = d*np.random.randn(h,w,c)
        y = img+n
    elif randInt == 2:                  # Rumore sale e pepe
        sp = random_noise(img, mode="s&p")
        y = img*sp
    elif randInt == 3:                  # Rumore poisson
        poi = random_noise(img, mode="poisson")
        y = img*poi
    else:                               # Rumore speckle
        n = np.random.randn(h,w,c) 
        y = img + img*n*intensity            
    
    if y.max() > 1.0:
        y[y > 1.0] = 1.0
    if y.min() < 0.0:
        y[y < 0.0] = 0.0
    
    return y


In [None]:
from torch.utils.data import Dataset
from torchvision import transforms
import numpy as np

# Modifica del dataset
class ImgListDataset(Dataset):
    def __init__(self, imgList):
        self.imgList = imgList
        self.target_size = (289, 386)           # Altezza e larghezza desiderata
        self.transform = transforms.Compose([
            transforms.ToPILImage(),            # Conversione a immagine
            transforms.Resize(self.target_size),
            transforms.ToTensor()               # Conversione a tensore
        ])

    def __len__(self):
        return len(self.imgList)

    def __getitem__(self, idx):
        img = self.imgList[idx]
        if isinstance(img, np.ndarray):
            img = self.transform(img)
        return img



In [None]:
# Divisione dell'intero dataset in training set, validation set e test set (80%, 10% e 10%)
trainSet, valSet, testSet = random_split(imgList, [0.8, 0.1, 0.1])

# Creazione dei dataset di immagini rumorose
trainSetNoise = []
valSetNoise = []
testSetNoise = []

for el in trainSet:
    imgNoise = addNoise(el)
    trainSetNoise.append(imgNoise)

for el in valSet:
    imgNoise = addNoise(el)
    valSetNoise.append(imgNoise)

for el in testSet:
    imgNoise = addNoise(el)
    testSetNoise.append(imgNoise)

In [None]:
# Definizione variabili
batchSize = 6
numWorkers = 4

# Modifica del dataset per le immagini originali
trainSetDataset = ImgListDataset(trainSet)
valSetDataset = ImgListDataset(valSet)
testSetDataset = ImgListDataset(testSet)

# Dataloader per le immagini originali
trainDataload = DataLoader(trainSetDataset, batch_size=batchSize, num_workers=numWorkers)
valDataload = DataLoader(valSetDataset, batch_size=batchSize, num_workers=numWorkers)
testDataload = DataLoader(testSetDataset, batch_size=batchSize, num_workers=numWorkers)

# Modifica del dataset per le immagini rumorose
trainSetRicDataset = ImgListDataset(trainSetNoise)
valSetRicDataset = ImgListDataset(valSetNoise)
testSetRicDataset = ImgListDataset(testSetNoise)

# Dataloader per le immagini rumorose
trainDataloadNoise = DataLoader(trainSetRicDataset, batch_size=batchSize, num_workers=numWorkers)
valDataloadNoise = DataLoader(valSetRicDataset, batch_size=batchSize, num_workers=numWorkers)
testDataloadNoise = DataLoader(testSetRicDataset, batch_size=batchSize, num_workers=numWorkers)

print("Effettuata suddivisione:")
print(f"- Training-set: {len(trainSet)} campioni.")
print(f"- Validation-set: {len(valSet)} campioni.")
print(f"- Test-set: {len(testSet)} campioni.")
print(f"- Training-set Noise: {len(trainSetNoise)} campioni.")
print(f"- Validation-set Noise: {len(valSetNoise)} campioni.")
print(f"- Test-set Noise: {len(testSetNoise)} campioni.")

Effettuata suddivisione:
- Training-set: 403 campioni.
- Validation-set: 50 campioni.
- Test-set: 50 campioni.
- Training-set Noise: 403 campioni.
- Validation-set Noise: 50 campioni.
- Test-set Noise: 50 campioni.


## 3. Architettura ##

In [None]:
import torch
import torch.nn as nn

class DnCNN(nn.Module):
    def __init__(self, in_channels=3, out_channels=64, kernel_size=3, padding=1):
        super(DnCNN, self).__init__()
        # in_channel = 3 poichè 3 canali
        # out_channel = 64 come scritto nella traccia (num feat)
        # kernel_size = 3 come scritto nella traccia (dim spaziale 3x3)
        layers = []

        # Iter 1: Convolution + ReLU
        layers.append(nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, padding=padding))
        layers.append(nn.ReLU())

        # Iters 2-19: Convolution + BatchNorm + ReLU
        for i in range(18):
            layers.append(nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=kernel_size, padding=padding))
            layers.append(nn.BatchNorm2d(out_channels))                    # Corrisponde al numero di canali di output del livello precedente
            layers.append(nn.ReLU())

        # Iters 20: Convolution
        layers.append(nn.Conv2d(in_channels=out_channels, out_channels=in_channels, kernel_size=kernel_size, padding=padding))

        # *layers corrisponde a layers[0],layers[1],..., quindi già spalmati come singoli elementi
        self.features = nn.Sequential(*layers)

        self.apply(self.kernel_initializer)

    def forward(self, x):
        return self.features(x)

    # Equivalente a kernel_inizializer="Orthogonal" in pytorch
    def kernel_initializer(self, module):
        if isinstance(module, nn.Conv2d):
            nn.init.orthogonal_(module.weight)


## 4. Addestramento ##

In [10]:
# Importiamo le librerie necessarie
import time

In [None]:
# Abilitazione del dispositivo GPU per il training
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

In [None]:
# Legenda dataloader:
#   - dlTrain = immagini originali training
#   - dlTrainNoise = immagini rumorose training
#   - dlVal = immagini originali validation
#   - dlValNoise = immagini rumorose validation
def training (dlTrain, dlTrainNoise, dlVal, dlValNoise, numEpoch, model, criterion, optimizer, bestMse, bestPsnr, outputPath):

    # Liste dei risultati definite per la visualizzazione
    MSETrainList = []
    MSEValList = []
    PSNRTrainList = []
    PSNRValList = []

    # Iterazione per ogni epoch
    for epoch in range(numEpoch):

        model.cuda()
        # Conteggio del tempo per misurare la durata di un'epoca
        since = time.time()

        # Inizializzazione delle variabili
        modelMseTrain = 0.0
        totalSize = 0

        # Modello impostato in traning mode
        model.train()

        # Iterazione per ogni batch
        # Devo iterare su i 2 dataloader contemporaneamente, per fare ciò utilizzo zip
        for (inputsTrain, inputsNoise) in zip(dlTrain, dlTrainNoise):

            # Converto gli input in tensori float e li carico nella GPU
            inputsTrain = inputsTrain.type(torch.FloatTensor).cuda()
            inputsNoise = inputsNoise.type(torch.FloatTensor).cuda()

            # Reset dei gradienti, altrimenti i vecchi gradienti sono sommati ai nuovi, piuttosto
            # che essere sovrascritti
            optimizer.zero_grad()
            model.zero_grad()

            # Calcolo dettagli
            yTrain = model(inputsNoise)
            y = inputsNoise - yTrain

            # Calcolo della MSE
            loss = criterion(y, inputsTrain)
            # size(0) restituisce il numero di campioni nel batch, quindi si sta moltiplicando la loss
            # media per il numero di elementi per ottenere la somma totale della loss
            modelMseTrain += loss.item() * inputsTrain.size(0)
            totalSize += inputsTrain.size(0)

            # Calcolando il gradiente del tensore attuale
            loss.backward()

            optimizer.step()        # Aggiornamento dei parametri
            optimizer.zero_grad()   # Azzeramento dei gradienti per il prossimo ciclo di accumulo
            

        # Calcolo della MSE medio e del PSNR medio  dell'epoch
        modelMseEpochTrain = modelMseTrain/totalSize
        modelPsnrEpochTrain = 10 * torch.log10(torch.tensor(1/modelMseEpochTrain))

        # Salvataggio dei pesi per ogni iterazione (disabilitato)
        # torch.save(model.state_dict(), outputPath + "train_weights.pth")

        # Modello impostato in validation mode
        model.eval()

        # Inizializzazione delle variabili
        modelMseVal = 0.0
        totalSizeVal = 0

        with torch.no_grad():               # Disattiva il calcolo dei gradienti
        # Iterazione per ogni bach
        # Devo iterare su i 2 dataloader contemporaneamente, per fare ciò utilizzo zip
            for (inputsVal, inputsValNoise) in zip(dlVal, dlValNoise):

                # Converto gli input in tensori float e li carico nella GPU
                inputsVal = inputsVal.type(torch.FloatTensor).cuda()
                inputsValNoise = inputsValNoise.type(torch.FloatTensor).cuda()

                # Calcolo dettagli
                yVal = model(inputsValNoise)
                y = inputsValNoise - yVal 

                # Calcolo della MSE
                loss = criterion(y, inputsVal)
                # size(0) restituisce il numero di campioni nel batch, quindi si sta moltiplicando la loss
                # media per il numero di elementi per ottenere la somma totale della loss
                modelMseVal += loss.item() * inputsVal.size(0)
                totalSizeVal += inputsVal.size(0)

            # Calcolo della MSE medio e del PSNR medio  dell'epoch
            modelMseEpochVal = modelMseVal/totalSizeVal
            modelPsnrEpochVal = 10 * torch.log10(torch.tensor(1/modelMseEpochVal))
            timeElapsed = time.time()-since

        print('[Epoch %d][Train on %d [MSE: %.4f  PSNR: %.4f]][Val on %d [MSE: %.4f  PSNR: %.4f]][Time: %.0f m %.0f s]'
                %(epoch, totalSize, modelMseEpochTrain, modelPsnrEpochTrain, totalSizeVal, modelMseEpochVal,
                modelPsnrEpochVal, timeElapsed // 60, timeElapsed % 60))

        # Salvaggio dei risultati migliori
        if (modelMseEpochVal < bestMse):
            print("-------Saving best weights-------")
            bestMse = modelMseEpochVal
            bestPsnr = modelPsnrEpochVal
            # Salvataggio dei migliori risultati
            try:
                torch.save(model.cpu().state_dict(), outputPath)
                print("-------Best weights saved-------")
            except Exception as e:
                print("Error:", e)

        # Salvataggio dei risultati per la visualizzazione
        MSETrainList.append(modelMseEpochTrain)
        MSEValList.append(modelMseEpochVal)
        PSNRTrainList.append(modelPsnrEpochTrain)
        PSNRValList.append(modelPsnrEpochVal)

    return bestMse, bestPsnr, MSETrainList, MSEValList, PSNRTrainList, PSNRValList




In [13]:
# Funzione di testing

def testing (dlTest, dlTestNoise, model, criterion, weightPath):

    # Caricamento dei pesi
    model.load_state_dict(torch.load(weightPath, map_location=torch.device("cpu")))
    model.cuda()


    # Conteggio del tempo per misurare la durata di un'epoca
    since = time.time()

    # Inizializzazione delle variabili
    modelMseTest = 0.0
    totalSize = 0
    outputsTest = []


    #model.load_state_dict(torch.load(weightPath , map_location=device))
    model.eval()

    # Iterazione su batch
    with torch.no_grad():               # Disattiva il calcolo dei gradienti
        for (inputsTest, inputsTestNoise) in zip(dlTest, dlTestNoise):


            # Converto gli input in tensori float e li carico nella GPU
            inputsTest = inputsTest.type(torch.FloatTensor).cuda()
            inputsTestNoise = inputsTestNoise.type(torch.FloatTensor).cuda()

            # Calcolo dettagli e ricostruzione immagine
            yTest = model(inputsTestNoise)
            y = inputsTestNoise - yTest
            outputsTest.append(y)

            # Calcolo della MSE
            loss = criterion(y, inputsTest)
            # size(0) restituisce il numero di campioni nel batch, quindi si sta moltiplicando la loss
            # media per il numero di elementi per ottenere la somma totale della loss
            modelMseTest += loss.item() * inputsTest.size(0)
            totalSize += inputsTest.size(0)


        # Calcolo della MSE medio e del PSNR medio  dell'epoch
        modelMseEpochTest = modelMseTest/totalSize
        modelPsnrEpochTest = 10 * torch.log10(torch.tensor(1/modelMseEpochTest))
        timeElapsed = time.time()-since

        print("[Test] [MSE: %.4f  PSNR: %.4f] [Time: %.0f m %.0f s]"
          %(modelMseEpochTest, modelPsnrEpochTest, timeElapsed // 60, timeElapsed % 60))

        return outputsTest

## 5. Valutazione delle prestazioni ##

In [None]:
# Iper-parametri
epochList = [60]
learningRateList = [0.01, 0.001, 0.0001]

In [None]:
# Main
import torch.optim as optim

model = DnCNN().cuda()
criterion = torch.nn.MSELoss(size_average=None, reduce=None, reduction='mean')

bestMSEComb = 9000000000000000000.0              # Miglior MSE tra tutte le combinazioni
bestPSNRComb = -900000000000000000.0             # Miglior PSNR tra tutte le combinazioni

# Definizione dei percorsi per il salvataggio dei pesi
weightPath = "best_weights_generic_denoise.pth"
weightPathComb = "best_weights_generic_denoise_lr.pth"



In [None]:
# Training
for numEpoch in epochList:
    for lr in learningRateList:
        optimizer = optim.Adam(model.parameters(), lr=lr)

        bestMse = 9000000000000000000.0             # Miglior MSE della singola combinazione
        bestPsnr = -900000000000000000.0            # Miglior PSNR della singola combinazione

        bestMSES, bestPSNRS, MSETrainList, MSEValList, PSNRTrainList, PSNRValList = training (trainDataload, trainDataloadNoise, valDataload, valDataloadNoise, numEpoch, model, criterion, optimizer, bestMse, bestPsnr, weightPath)

        # Aggiorno i valori di miglior MSE
        if bestMSES < bestMSEComb:
            bestMSEComb = bestMSES
            bestPSNRComb = bestPSNRS
            # Caricamento dei pesi
            torch.save(model.cpu().state_dict(), weightPathComb)
            print(f"------Best Combination saved [Epoch: {numEpoch} - Learning Rate: {lr}]-------")
            model.cuda()

print(f"Best MSE: {bestMSEComb}")
print(f"Best PSNR: {bestPSNRComb}")

[Epoch 0][Train on 403 [MSE: 0.3734  PSNR: 4.2778]][Val on 50 [MSE: 0.0154  PSNR: 18.1290]][Time: 0 m 52 s]
-------Saving best weights-------
-------Best weights saved-------
[Epoch 1][Train on 403 [MSE: 0.0139  PSNR: 18.5660]][Val on 50 [MSE: 0.0142  PSNR: 18.4654]][Time: 0 m 52 s]
-------Saving best weights-------
-------Best weights saved-------
[Epoch 2][Train on 403 [MSE: 0.0150  PSNR: 18.2367]][Val on 50 [MSE: 0.0131  PSNR: 18.8296]][Time: 0 m 52 s]
-------Saving best weights-------
-------Best weights saved-------
[Epoch 3][Train on 403 [MSE: 0.0144  PSNR: 18.4305]][Val on 50 [MSE: 0.0128  PSNR: 18.9377]][Time: 0 m 52 s]
-------Saving best weights-------
-------Best weights saved-------
[Epoch 4][Train on 403 [MSE: 0.0137  PSNR: 18.6314]][Val on 50 [MSE: 0.0128  PSNR: 18.9361]][Time: 0 m 52 s]
[Epoch 5][Train on 403 [MSE: 0.0134  PSNR: 18.7253]][Val on 50 [MSE: 0.0144  PSNR: 18.4167]][Time: 0 m 52 s]
[Epoch 6][Train on 403 [MSE: 0.0133  PSNR: 18.7711]][Val on 50 [MSE: 0.0134  PS

In [None]:
# Percorsi output
outputInPath = "outputs/GenericDenoiser/testSet/"
outputTruthPath = "outputs/GenericDenoiser/groundTruth/"
outputOutPath = "outputs/GenericDenoiser/results/"
os.makedirs(outputInPath, exist_ok=True)
os.makedirs(outputTruthPath, exist_ok=True)
os.makedirs(outputOutPath, exist_ok=True)

# Testing
outputsTest = testing(testDataload, testDataloadNoise, model, criterion, weightPathComb)

# Iterazione sia su outputsTest che su testDataloadRic
index = 1
testIter = iter(testDataloadNoise)
truthIter = iter(testDataload)

# Salvataggio locale delle immagini 
for batchOut in outputsTest:
    batchIn = next(testIter)                # Ottenimento del batch originale (e label se presente)
    batchTruth = next(truthIter)

    for j in range(batchOut.size(0)):
        # --- Output ---
        imgOut = batchOut[j].detach().cpu().clamp(0, 1)
        imgOut = imgOut.permute(1, 2, 0).numpy()
        pathOut = os.path.join(outputOutPath, f"image_{index}.jpg")
        plt.imsave(pathOut, imgOut)
        # --- Input ---
        imgIn = batchIn[j].detach().cpu().clamp(0, 1)
        imgIn = imgIn.permute(1, 2, 0).numpy()
        pathIn = os.path.join(outputInPath, f"image_{index}.jpg")
        plt.imsave(pathIn, imgIn)
        # --- Ground Truth ---
        imgTruth = batchTruth[j].detach().cpu().clamp(0, 1)
        imgTruth = imgTruth.permute(1, 2, 0).numpy()
        pathTruth = os.path.join(outputTruthPath, f"image_{index}.jpg")
        plt.imsave(pathTruth, imgTruth)
        index += 1

[Test] [MSE: 0.0068  PSNR: 21.6957] [Time: 0 m 3 s]
