In [7]:
import PIL
import torch
import wandb
from torch.utils.data import DataLoader
from torch.utils.data import random_split

from Dataset.AerialDataset import AerialDataset
from tasks.SRDiffTrainer import SRDiffTrainer

- Generar imagenes bicubicas
- Construir Dataset
- Construir Dataloader
- SR3
- SRdiff
- SR3+

## 64 -> 256


#### Generación de imagenes bicubicas

import os
import numpy as np
from scipy.interpolate import interpn
from PIL import Image
from tqdm import tqdm

##TODO Mover esto a su propio modulo donde poder usarlo para preparar los datos a posteriori.
## TODO cambiar metodo para q vaya más rapido.


def bicubic_interpolation(image, objective_dim): # De momento lo implementare para 1 sola foto a la vez
    #Calculo nuevas dimensiones
    height, width = image.shape[0] , image.shape[1]
    new_width, new_height = objective_dim[0], objective_dim[1]
    new_image = np.zeros((new_height, new_width, image.shape[2]))
    
     # Generar cuadrículas para las coordenadas X e Y de la imagen original y la interpolada
    x = np.linspace(0, width - 1, width)
    y = np.linspace(0, height - 1, height)
    new_x = np.linspace(0, width - 1, new_width)
    new_y = np.linspace(0, height - 1, new_height)
    new_image = interpn((y, x), image, (new_y[:,None], new_x), method='cubic', bounds_error=False, fill_value=0)
    return new_image

dataset_dir = 'E:\\TFG\\air_dataset\\64'
for image_file in tqdm(os.listdir(dataset_dir)):
    image = Image.open(os.path.join(dataset_dir, image_file))
    image = np.array(image)
    interpolated = bicubic_interpolation(image, (256, 256))
    interpolated_image = Image.fromarray(interpolated.astype(np.uint8))
    interpolated_image.save(f'E:\\TFG\\air_dataset\\sr\\64_256\\{image_file}')
    

## Entrenamiento

In [None]:
lr_size = 64
hr_size = 256
batch_size = 16
dataset_dir = 'E:\\TFG\\air_dataset'

In [None]:
dataset = AerialDataset(dataset_dir, lr_size, hr_size)
train_dataset, val_dataset, test_dataset = random_split(dataset, [0.6, 0.2, 0.2])

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
    
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### SRDiff

#### Modelo

In [None]:
from models.SRDiff.diffusion import GaussianDiffusion
from models.SRDiff.diffsr_modules import Unet, RRDBNet

hidden_size = 64
dim_mults = [1,2,2,4]
rrdb_num_features = 32
rrdb_num_blocks = 8
timesteps = 100
losstype = 'l1'

denoise_fn = Unet(
    hidden_size, out_dim=3, cond_dim=rrdb_num_features, dim_mults=dim_mults, rrdb_num_block=rrdb_num_blocks, sr_scale=4)

rrdb = RRDBNet(3, 3, rrdb_num_features, rrdb_num_blocks, rrdb_num_features// 2)

model = GaussianDiffusion(
    denoise_fn=denoise_fn,
    rrdb_net=rrdb,
    timesteps= timesteps,
    loss_type=losstype
)

model.to(device)

#### Optimizador y scheduler

In [None]:
lr= 0.0002
decay_steps= 100000
gamma = 0.5

optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=decay_steps, gamma=gamma)

In [None]:
max_steps = 5
hyperparams = {
    "max_steps": 100,
    "model": "SRDiff",
    "learning_rate": lr,
    "decay_steps": decay_steps,
    "gamma": gamma,
    "batch_size": batch_size,
    "hidden_size": hidden_size,
    "dim_mults": dim_mults,
    "rrdb_num_features": rrdb_num_features,
    "rrdb_num_blocks": rrdb_num_blocks,
    "loss_type": losstype
}
project_name = "SR model benchmarking"
run_name = "SRDiff Standart Params"
wandb.login()
wandb.init(project=project_name, config=hyperparams, name=run_name)

trainer = SRDiffTrainer(metrics_used=["ssim", "psnr"])
trainer.set_model(model)
trainer.set_optimizer(optimizer)
trainer.set_scheduler(scheduler)
for step in range(max_steps):
    train_loss = trainer.train(train_dataloader)
    torch.cuda.empty_cache()
    
    trainer.save_model("checkpoints\\SRDiff", step)
    with torch.no_grad():
        val_loss = trainer.validate(val_dataloader)
    torch.cuda.empty_cache()
    wandb.log({"train_loss": train_loss, "validation_loss": val_loss})
    
test_metrics = trainer.test(test_dataloader)
wandb.log(test_metrics)
wandb.finish() 

### SR3

In [None]:
from models.SR3.diffusion import GaussianDiffusion
from models.SR3.model import UNet
hyperparams = {
    "steps" : 2000,
    "sample_steps" : 100,
    "lr":1e-5,
    "epochs":5,
    "eta_min":1e-7
}
model = UNet(3, hyperparams["steps"]) #Valores por defecto ya que la tarea base es la misma upsample por 4
SR3_model = GaussianDiffusion(model, hyperparams["steps"], hyperparams["sample_steps"])
SR3_model.to(device)

In [None]:
optimizer = torch.optim.Adam(SR3_model.parameters(), lr=hyperparams["lr"])
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=hyperparams["epochs"]*len(train_dataloader), eta_min=hyperparams["eta_min"])

In [None]:
project_name = "SR model benchmarking"
run_name = "SR3 Standart Params"
wandb.login()
wandb.init(project=project_name, config=hyperparams, name=run_name)

trainer = Trainer(metrics_used=["ssim", "psnr"])
trainer.set_model(SR3_model)
trainer.set_optimizer(optimizer)
trainer.set_scheduler(scheduler)
for step in range(hyperparams["epochs"]):
    train_loss = trainer.train(train_dataloader)
    torch.cuda.empty_cache()
    
    trainer.save_model("checkpoints\\SR3", step)
    with torch.no_grad():
        val_loss = trainer.validate(val_dataloader)
    torch.cuda.empty_cache()
    wandb.log({"train_loss": train_loss, "validation_loss": val_loss})
    
test_metrics = trainer.test(test_dataloader)
wandb.log(test_metrics)
wandb.finish() 