# Imports

In [None]:
# ¡¡¡ Conectar con Google Drive !!!

# Se instala e importa import_ipynb para poder acceder a clases/funciones de otros notebooks
!pip install -q import-ipynb

# Se cambia al directorio donde esté el notebook con las clases a importar
%cd "/content/drive/MyDrive/Colab_Notebooks"

#!pip install --upgrade --force-reinstall --no-deps albumentations
!pip install -q -U albumentations

!pip install -q torchmetrics

  Building wheel for import-ipynb (setup.py) ... [?25l[?25hdone
/content/drive/MyDrive/Colab_Notebooks
[K     |████████████████████████████████| 102 kB 37.9 MB/s 
[K     |████████████████████████████████| 37.1 MB 51 kB/s 
[K     |████████████████████████████████| 282 kB 37.3 MB/s 
[?25h

In [None]:
# Manejo y representación de datos
import numpy as np
import pandas as pd
import os 
import matplotlib.pyplot as plt
import cv2
from tqdm import tqdm

# Pytorch 
import torch
from torch import Tensor
from torch.autograd import Function
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.tensorboard import SummaryWriter

# Dataset y Dataloader
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset , DataLoader

# Transformaciones
import albumentations as A
from albumentations.pytorch import ToTensorV2
from skimage.transform import resize

# Modelos
import import_ipynb
from models import Generator, Discriminator, FeatureExtractor

# Métricas
from torchmetrics.functional import ssim
from torchmetrics.functional import psnr
from torchmetrics.functional import mean_squared_error as mse



# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

importing Jupyter notebook from models.ipynb


#Rutas y dataframe

In [None]:
img_fol= '/content/drive/MyDrive/Colab_Notebooks/carvana/train/' # Carpeta que contiene las imágenes

# División en entrenamiento y validación
df = pd.DataFrame(os.listdir(img_fol), columns = ['name'])       # Se estructura de forma tabular el contenido de la carpeta 'img_fol'
df = df.iloc[:20]    # Se reduce el dataframe a 2000 imagenes
df_train, df_val = train_test_split(df, test_size=.2, random_state=42, shuffle=True, stratify=None) 

# Tras realizar la división, es necesario reiniciar los índices
df_train = df_train.reset_index()
df_val = df_val.reset_index()

print(f"Total: {len(df)} -- Entrenamiento: {len(df_train)} -- Validación: {len(df_val)}")


Total: 20 -- Entrenamiento: 16 -- Validación: 4


#Dataset y dataloader

In [None]:
class CarDataset(Dataset):
    def __init__(self, df, transforms = None):
        # self.df=pd.read_csv('/content/drive/MyDrive/Colab Notebooks/carvana/train_masks.csv')
        self.df = df 
        self.transforms = transforms
        self.img_fol= '/content/drive/MyDrive/Colab_Notebooks/carvana/train/'
        self.mask_fol = '/content/drive/MyDrive/Colab_Notebooks/carvana/train_masks/' 
       

    def __getitem__(self, idx):
        img_name=self.df['name'][idx]
        img_path=os.path.join(self.img_fol,img_name)    # Ruta completa imagen

        img_HR = cv2.imread(img_path)      # Se carga la imagen
        img_HR= cv2.cvtColor(img_HR, cv2.COLOR_BGR2RGB)
        
        if self.transforms:
            img_HR = self.transforms(image=img_HR)['image']

        # Factor de escala x4 en cada dimensión. Se mantiene el nº de canales
        img_LR = resize(img_HR.numpy(), (3,img_HR.shape[1]//4,img_HR.shape[2]//4), anti_aliasing=True)
        img_LR = torch.from_numpy(img_LR)
        
        # Se realiza una interpolación bicúbica para tener las mismas dimensiones que en la imagen original
        # img_LR = cv2.resize(img_LR.transpose(1,2,0), (img_HR.shape[1],img_HR.shape[2]), interpolation = cv2.INTER_CUBIC).transpose(2,0,1)

        # Se normaliza entre 0 y 1 para plotear  
        img_HR = ((img_HR + img_HR.min().abs())/(img_HR + img_HR.min().abs()).max())
        img_LR = ((img_LR + img_LR.min().abs())/(img_LR + img_LR.min().abs()).max())
        
        return img_HR, img_LR

    def __len__(self):
        return len(self.df)

In [None]:
# Transformaciones
transforms = A.Compose([
            A.Resize(256, 256),
            A.Normalize(mean=[0.485, 0.456, 0.406], 
                        std=[0.229, 0.224, 0.225]),
            ToTensorV2()
        ])

# Datasets de entrenamiento y evaluación
carDatasetTrain = CarDataset(df=df_train, transforms=transforms)
carDatasetval = CarDataset(df=df_val, transforms=transforms)

# Dataloaders de entrenamiento y evaluación
train_dataloader = DataLoader(carDatasetTrain, batch_size=4,
                        shuffle=True, num_workers=0)
val_dataloader = DataLoader(carDatasetval, batch_size=4,
                        shuffle=False, num_workers=0) # El dataloader de validación es fijo, no debe tener shuffle

In [None]:
img_HR, img_LR = next(iter(train_dataloader))
print(img_HR.shape, img_LR.shape)
bs = img_HR.shape[0]
plt.figure(figsize=(15,15))
print(img_LR.shape)
for i in range(bs):
    plt.subplot(bs,2,(2*i+1))
    plt.title(f"Imagen con transformaciones")
    plt.imshow(img_HR[i].numpy().transpose(1,2,0))
    plt.subplot(bs,2,(2*i+2))
    plt.title(f"Imagen submuestreada")
    plt.imshow(img_LR[i].numpy().transpose(1,2,0))

In [None]:
# Se instancian los modelos y se envían a la gpu si está disponible
generator = Generator().to(device)                # Generador
discriminator = Discriminator().to(device)        # Discriminador
feature_extractor = FeatureExtractor().to(device) # 

# Si hay modelos guardados, se cargan
if os.path.exists('/content/drive/MyDrive/Colab_Notebooks/saved_models/generator.pth'):
    generator.load_state_dict(torch.load('/content/drive/MyDrive/Colab_Notebooks/saved_models/generator.pth')) # Se cargan los pesos del modelo

if os.path.exists('/content/drive/MyDrive/Colab_Notebooks/saved_models/discriminator.pth'):
    discriminator.load_state_dict(torch.load('/content/drive/MyDrive/Colab_Notebooks/saved_models/discriminator.pth')) # Se cargan los pesos del modelo


# Funciones de pérdida
pixel_criterion = torch.nn.MSELoss().to(device)
content_criterion = torch.nn.MSELoss().to(device)
adversarial_criterion = torch.nn.MSELoss().to(device)     # Adversarial loss.

# Optimizers
optimizer_G = torch.optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))
optimizer_P = torch.optim.Adam(generator.parameters(), lr=0.0002, betas=(0.9, 0.999))  

cuda = torch.cuda.is_available()
Tensor = torch.cuda.FloatTensor if cuda else torch.Tensor


Downloading: "https://download.pytorch.org/models/vgg19-dcbb9e9d.pth" to /root/.cache/torch/hub/checkpoints/vgg19-dcbb9e9d.pth


  0%|          | 0.00/548M [00:00<?, ?B/s]

In [None]:
def train_generator(epoch, epochs, tloader):

    # Se calcula el número de batches de entrenamiento
    batches = len(train_dataloader)

    # Se establece el generador en modo entrenamiento
    generator.train()

    for hr, lr in tloader:
        # Los batches de imágenes se envían a la GPU si está disponible
        lr, hr= lr.to(device), hr.to(device)

        # Se ponen a cero los gradientes del generador
        optimizer_G.zero_grad()

        # Se obtiene la imagen SR
        sr = generator(lr)

        # Calculate the difference between the super-resolution image and the high-resolution image at the pixel level.
        pixel_loss = pixel_criterion(sr, hr)

        # Back propagation
        pixel_loss.backward()

        # Actualización de los pesos del generador
        optimizer_P.step()


def train_adversarial(epoch, epochs, tloader):
    # Se calcula el número de batches de entrenamiento
    batches = len(train_dataloader)

    # Se establece la red adversarial en modo entrenamiento
    discriminator.train()
    generator.train()

    for hr, lr in tloader:

        # Los batches de imágenes se envían a la GPU si está disponible
        lr, hr= lr.to(device), hr.to(device)

        # Cantidad de imágenes que forman un batch
        label_size = lr.size(0)

        # Se crean las etiquetas. Se establecen en 1 para el caso real, y en 0 para el caso falso.
        real_label = Tensor(np.ones((lr.size(0), *discriminator.output_shape)))
        fake_label = Tensor(np.zeros((lr.size(0), *discriminator.output_shape)))

        # Se ponen a cero los gradientes del discriminador
        optimizer_D.zero_grad()

        # Se calcula la pérdida del discriminador en la imagen HR.
        output = discriminator(hr)
        d_loss_hr = adversarial_criterion(output, real_label)
        d_loss_hr.backward()
        d_hr = output.mean().item()

        # Se obtienen las imágenes SR
        sr = generator(lr)

        # Se calcula la pérdida del discriminador en la imagen SR.
        output = discriminator(sr.detach())
        d_loss_sr = adversarial_criterion(output, fake_label)
        d_loss_sr.backward()
        d_sr1 = output.mean().item()

        # Actualización de los pesos del discriminador
        d_loss = d_loss_hr + d_loss_sr
        optimizer_D.step()

        # Se ponen a cero los gradientes del generador
        optimizer_G.zero_grad()

        # Se calcula la pérdida del discriminador en la imagen SR
        output = discriminator(sr)
        
        # Perceptual loss = 0.01 * pixel loss + 1.0 * content loss + 0.001 * adversarial loss.
        pixel_loss = 0.01 * pixel_criterion(sr, hr.detach())

        gen_features = feature_extractor(sr)
        real_features = feature_extractor(hr)
        content_loss = 1.0 * content_criterion(sr, hr.detach())

        adversarial_loss = 0.001 * adversarial_criterion(output, real_label)

        perceptual_loss = pixel_loss + content_loss + adversarial_loss

        # Actualización de los pesos del generador
        perceptual_loss.backward()
        optimizer_G.step()
        d_sr2 = output.mean().item()

        #------------
        # Métricas
        #------------
        
        # Se calcula la PSNR
        psnr_score = psnr(sr,hr)
        epoch_psnr += psnr_score.item()      

        # Se calcula el SSIM
        ssim_score = ssim(sr, hr)
        epoch_ssim += ssim_score.item()   

        # Se calcula el RSME
        rmse_score = mse(sr,hr,squared = True)
        epoch_rmse += rmse_score.item() 

        tloader.set_postfix(loss_G=loss_G.item(),loss_D=loss_D.item(), psnr=psnr_score.item(), ssim=ssim_score.item(), rmse=rmse_score.item())


    print(f'Train PSNR Epoch : {(epoch_psnr/n_train)}')
    print(f'Train SSIM Epoch : {(epoch_ssim/n_train)}')
    print(f'Train RMSE Epoch : {(epoch_rmse/n_train)}')

    # Se devuelve el valor medio de cada métrica
    return [(epoch_psnr/n_train),(epoch_ssim/n_train),(epoch_rmse/n_train)]


def validate(vloader):
    
    vloader.set_description(f'Validation')

    # Se calcula el número de batches de validación
    batches = len(val_dataloader)

    # Se establece el generador en modo evaluación
    generator.eval()

    # Se inicializan a 0 las variables que almacenarán las métricas   
    # obtenidas en cada batch
    epoch_psnr, epoch_ssim, epoch_rmse = 0,0,0

    with torch.no_grad():
        for index, (hr, lr) in enumerate(val_dataloader):

            # Los batches de imágenes se envían a la GPU si está disponible
            hr, lr = hr.to(device), lr.to(device)

            # Se obtienen las imágenes SR
            sr = generator(lr)

            #------------
            # Métricas
            #------------

            # Se calcula la PSNR
            psnr_score = psnr(sr,hr)
            epoch_psnr += psnr_score.item()      

            # Se calcula el SSIM
            ssim_score = ssim(sr, hr)
            epoch_ssim += ssim_score.item()   

            # Se calcula el RSME
            rmse_score = mse(sr,hr,squared = True)
            epoch_rmse += rmse_score.item()  

            if index == 0:
                  plt.figure(figsize=(15,15))
                  plt.suptitle(f"Epoch: {epoch+1}\n"
                              +f"SR:    RMSE: {rmse_score:.2f} -- PSNR: {psnr_score:.2f} -- SSIM: {ssim_score:.2f}\n",
                              fontsize=16, y=0.96)
                  for i in range(bs):
                      plt.subplot(bs,3,(3*i+1))
                      plt.title(f"HR")
                      plt.imshow(hr[i].numpy().transpose(1,2,0))
                      plt.subplot(bs,3,(3*i+2))
                      plt.title(f"LR")
                      plt.imshow(lr[i].numpy().transpose(1,2,0))
                      plt.subplot(bs,3,(3*i+3))
                      plt.title(f"SR")
                      plt.imshow(sr[i].numpy().transpose(1,2,0))

    print(f'Train PSNR Epoch : {(epoch_psnr/n_val)}')
    print(f'Train SSIM Epoch : {(epoch_ssim/n_val)}')
    print(f'Train RMSE Epoch : {(epoch_rmse/n_val)}')   
    
    # Se devuelve el valor medio de cada métrica
    return [(epoch_psnr/batches),(epoch_ssim/batches),(epoch_rmse/batches)]




In [None]:
epochs = 5
bs = 4
# Se inicializan listas vacias en las que se almacenarán lás metricas de
# entrenamiento para cada epoch 
train_psnr, train_ssim, train_rmse = [], [], []

# Se inicializan listas vacias en las que se almacenarán lás metricas de
# validación para cada epoch
val_psnr, val_ssim, val_rmse = [], [], []

# Entrenamiento generador


# Entrenamiento adversarial

for epoch in range(epochs) :

    print(epoch+1,'/',epochs)

    with tqdm(train_dataloader,unit='batch') as tloader : 

        train_generator(epoch,epochs,tloader)
        train_scores = train_adversarial(epoch,epochs,tloader)

        # Se almacenan las métricas en las listas correspondientes
        train_psnr.append(train_scores[0])
        train_ssim.append(train_scores[1])
        train_rmse.append(train_scores[2])

    with tqdm(val_dataloader,unit='batch') as vloader : 
        val_scores = validation(vloader)

        # Se almacenan las métricas en las listas correspondientes
        val_psnr.append(val_scores[0])
        val_ssim.append(val_scores[1])
        val_rmse.append(val_scores[2])

        if epoch == 0:
            best_psnr_value = 0  
        is_best = val_ssim[epoch] > best_psnr_value
        best_psnr_value = max(val_ssim[epoch], best_psnr_value)
        if is_best:
            torch.save(generator.state_dict(), "/content/drive/MyDrive/Colab_Notebooks/saved_models/generator.pth")
            torch.save(discriminator.state_dict(), "/content/drive/MyDrive/Colab_Notebooks/saved_models/discriminator.pth")


# Plots
plt.subplot(1,3,1)
plt.title(f"RMSE")
plt.xlabel('Epoch')
plt.ylabel('RMSE')
plt.plot(train_rmse, label='train')
plt.plot(val_rmse, label='validation')
plt.legend()

plt.subplot(1,3,2)
plt.title(f"PSNR")
plt.xlabel('Epoch')
plt.ylabel('PSNR')
plt.plot(train_psnr, label='train')
plt.plot(val_psnr, label='validation')
plt.legend()


plt.subplot(1,3,3)
plt.title(f"SSIM")
plt.xlabel('Epoch')
plt.ylabel('SSIM')
plt.plot(train_ssim, label='train')
plt.plot(val_ssim, label='validation')
plt.legend()

1 / 5


  0%|          | 0/4 [00:00<?, ?batch/s]


NameError: ignored

In [None]:
plt.figure(figsize=(25,5))
plt.subplot(1,5,1)
plt.title(f"Loss_G")
plt.xlabel('Epoch')
plt.ylabel('Loss_G')
plt.plot(train_loss_G, label='train')
plt.plot(val_loss_G, label='validation')
plt.legend()

plt.subplot(1,5,2)
plt.title(f"Loss_D")
plt.xlabel('Epoch')
plt.ylabel('Loss_D')
plt.plot(train_loss_D, label='train')
plt.plot(val_loss_G, label='validation')
plt.legend()

plt.subplot(1,5,3)
plt.title(f"RMSE")
plt.xlabel('Epoch')
plt.ylabel('RMSE')
plt.plot(train_rmse, label='train')
plt.plot(val_rmse, label='validation')
plt.legend()

plt.subplot(1,5,4)
plt.title(f"PSNR")
plt.xlabel('Epoch')
plt.ylabel('PSNR')
plt.plot(train_psnr, label='train')
plt.plot(val_psnr, label='validation')
plt.legend()


plt.subplot(1,5,5)
plt.title(f"SSIM")
plt.xlabel('Epoch')
plt.ylabel('SSIM')
plt.plot(train_ssim, label='train')
plt.plot(val_ssim, label='validation')
plt.legend()
