In [None]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

import torch
from torch import nn
from tqdm.auto import tqdm
from torchvision import transforms
from torchvision.utils import make_grid, save_image
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
import torchvision.datasets as dtst
from torch import optim

from PIL import Image

import random
import os
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings('ignore')

random.seed(99)
torch.manual_seed(99)

%matplotlib inline

In [None]:
# Path de la carpeta donde se ubican las imágenes de entrenamiento
data_folder = './images'

# Tamaño que deseamos que tengan las imágenes
image_size = 128
# Tamaño del lote de imágenes
batch_size = 128


dsimgs = dtst.ImageFolder(
    root=data_folder,
    transform=transforms.Compose([
        # Se usa el resize en caso no todas las imágenes de entrada tengan el tamaño de 128px
        transforms.Resize(image_size),
        # CenterCrop busca recortar la imagen en caso sea muy grande al tamaño dado
        transforms.CenterCrop(image_size),
        # ToTensor convierte finalmente la imagen a tensor
        transforms.ToTensor(),
        # Normalize permite la normalización de la información
        # El problema encontrado es que necesitamos hallar la desviación estandar
        # media de toda la información para realizar una correcta normalización
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ]))

dt_loader = DataLoader(dsimgs,
                       batch_size=batch_size,
                       shuffle=True,
                       num_workers=1,
                       droplast=True)

# Forzamos el uso de CUDA
device = torch.device('cuda:0')

#### Visualización de lote de imágenes de entrenamiento

In [None]:
batch_imagenes = next(iter(dt_loader))
plt.figure(figsize=(8, 8))
plt.axis("off")
plt.title("Imágenes de entrenamiento")
plt.imshow(
    np.transpose(
        make_grid(batch_imagenes[0].to(device)[:64], padding=2,
                  normalize=True).cpu(), (1, 2, 0)))

#### Función generadora de vector de ruido

In [None]:
def noise_generator(n, dimension, device):
    return torch.randn(n, dimension, device=device)

#### Función de visualización durante el entrenamiento

In [None]:
def show_tensor_images(image_tensor, num_images=25, size=(1, 64, 64)):
    '''
    '''
    image_tensor = (image_tensor + 1) / 2
    image_unflat = image_tensor.detach().cpu()
    image_grid = make_grid(image_unflat[:num_images], nrow=5)
    plt.imshow(image_grid.permute(1, 2, 0).squeeze())
    plt.show()

#### Parámetros

In [None]:
# dim_z es la cantidad de canales del vector de ruido de entrada
dim_z = 200
# ch_rgb son los canales 3 canales de color
ch_rgb = 3
# gd_hidd_ch es el valor base que tendrá la dimensión de los tensores 
# durante las convoluciones ocultas en el generador y en el discriminador
gd_hidd_ch = 64 
# el ratio de aprendizaje es la distancia que recorre el punto de 
# optimización al tomarse la gradiente del modelo para encontrar 
# mínimos locales o globales y así optimizar el modelo
lr = 0.0005

# Función de inicialización de pesos
def weights_init(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    if isinstance(m, nn.BatchNorm2d):
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias, 0)

### RED GENERADORA

In [None]:
class redGeneradora(nn.Module):
    def __init__(self, dim_z=200, ch_rgb=3, hidd_ch=64):
        
        super(redGeneradora, self).__init__()
        
        self.z_dim=dim_z
        self.main = nn.Sequential(
            self.gen_blocks(dim_z, hidd_ch*8, stride=1, padding=0),
            self.gen_blocks(hidd_ch*8, hidd_ch*4),
            self.gen_blocks(hidd_ch*4, hidd_ch*2),
            self.gen_blocks(hidd_ch*2, hidd_ch),
            self.gen_blocks(hidd_ch, hidd_ch), ## ¿Afecta las ConvT a un mismo nivel?
            self.gen_blocks(hidd_ch, ch_rgb,f_layer=True)  # Obtenemos un "Tensor" 3*128*128
        )
    
    def gen_blocks(self, inp_ch, out_ch, kernel=4, stride=2, padding=1, f_layer=False):
        if not f_layer:
            return nn.Sequential(
                nn.ConvTranspose2d(inp_ch, out_ch, kernel, stride, padding, bias = False), # Probando bias = False
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True)
            )
        else:
            return nn.Sequential(
                nn.ConvTranspose2d(inp_ch, out_ch, kernel, stride, padding, bias = False),
                nn.Sigmoid()
            )
        
        
    def forward(self, input_t):
        # El tensor de ruido se redimensiona a 1*1*dim_Z donde 
        # dim_Z son los canales de entrada
        x = input_t.view(len(input_t), self.z_dim, 1, 1)
        return self.main(x)

##### Instanciado del modelo generador

In [None]:
# Instanciamos el modelo
anime_gen = redGeneradora(dim_z, ch_rgb, gd_hidd_ch).to(device)

# Asignamos pesos a la red generadora
anime_gen = anime_gen.apply(weights_init)

# Instanciamos su optimizador
gen_optim = torch.optim.Adam(anime_gen.parameters(), lr=lr, betas=(0.5, 0.999))

### RED DISCRIMINADORA

In [None]:
class redDiscriminadora(nn.Module):
    def __init__(self, dim_z=200, ch_rgb=3, hidd_ch=64):
        
        super(redDiscriminadora, self).__init__()
        
        self.z_dim=dim_z
        # Necesitamos realizar el proceso inverso
        self.main = nn.Sequential(
            # la entrada sería 3*128*128
            self.gen_blocks(ch_rgb, hidd_ch),
            self.gen_blocks(hidd_ch, hidd_ch),
            self.gen_blocks(hidd_ch, hidd_ch*2),
            self.gen_blocks(hidd_ch*2, hidd_ch*4),
            self.gen_blocks(hidd_ch*4, hidd_ch*8),
            self.gen_blocks(hidd_ch*8, 1,f_layer=True)  # Obtenemos un "Tensor" 1*128*128
        )
    
    def gen_blocks(self, inp_ch, out_ch, kernel=4, stride=2, padding=1, f_layer=False):
        if not f_layer:
            return nn.Sequential(
                nn.Conv2d(inp_ch, out_ch, kernel, stride, padding, bias = False), # Probando bias = False
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU( inplace=True)
            )
        else:
            return nn.Sequential(
                nn.Conv2d(inp_ch, out_ch, kernel, 1, 0, bias = False),
                nn.Sigmoid()
            )
        
        
    def forward(self, input_arr):
        y = self.main(input_arr)
        return y.view(len(y), -1)

##### Instanciando el modelo Discriminador

In [None]:
# Instanciamos el modelo
anime_dis = redDiscriminadora(dim_z, ch_rgb, gd_hidd_ch).to(device)

# Asignamos pesos a la red generadora
anime_dis = anime_dis.apply(weights_init)

# Instanciamos su optimizador
dis_optim = torch.optim.Adam(anime_dis.parameters(), lr=lr, betas=(0.5, 0.999))

##### Instanciamos la función de pérdida

In [None]:
criterion = nn.BCEWithLogitsLoss()

In [None]:
def display_losses(cur_step, display_step, generator_losses,
                   discriminator_losses):
    if cur_step % display_step == 0 and cur_step > 0:
        gen_mean = sum(generator_losses[-display_step:]) / display_step
        disc_mean = sum(discriminator_losses[-display_step:]) / display_step
        step_bins = 20
        x_axis = sorted(
            [i * step_bins
             for i in range(len(generator_losses) // step_bins)] * step_bins)
        num_examples = (len(generator_losses) // step_bins) * step_bins
        plt.plot(range(num_examples // step_bins),
                 torch.Tensor(generator_losses[:num_examples]).view(
                     -1, step_bins).mean(1),
                 label="Generator Loss")
        plt.plot(range(num_examples // step_bins),
                 torch.Tensor(discriminator_losses[:num_examples]).view(
                     -1, step_bins).mean(1),
                 label="Discriminator Loss")
        plt.legend()
        plt.show()
    elif cur_step == 0:
        print("Tu modelo se está entrenando, espero funcione :'D ")

### Entrenamiento

In [None]:


G_losses = []
D_losses = []



num_epochs = 100
display_step = 250

cur_step = 0
mean_generator_loss = 0
mean_discriminator_loss = 0

for epoch in range(num_epochs):
    
    for real, _ in tqdm(dt_loader):
        
        # cur_batch_size: Cantidad de datos del batch del dataloader?
        cur_batch_size = len(real) 
        # real: iterador
        real = real.to(device)
        
        #######################################
        ##   Actualizamos el discriminador   ##
        #######################################
        
        # devolvemos a cero las gradientes
        dis_optim.zero_grad()   #####
        
        # fake_noise: nuevo vector de ruido para el batch de datos
        fake_noise = noise_generator(cur_batch_size, dim_z, device=device) ####
        
        # Introducimos el tensor de ruido al generador
        fake = anime_gen(fake_noise)
        
        # Obtenemos la predicción del discriminador de los datos falsos
        ###   D(G(z))
        disc_fake_pred = anime_dis(fake.detach())
        #Obs: Detach <> ¿puntero al tensor?
        
        # Obtenemos la predicción del discriminador de los datos reales
        ###   D(G(z))
        disc_real_pred = anime_dis(real)
        
        # Obtenemos los valores de pérdida al evaluar los datos falsos
        # Básicamente etiqueta a la distribución de estos datos como falsa
        # al darles como valor 0
        disc_fake_loss = criterion(disc_fake_pred, torch.zeros_like(disc_fake_pred))
        
        # Obtenemos los valores de pérdida al evaluar los datos reales
        # Básicamente etiqueta a la distribución de estos datos como verdadero
        # al darles como valor 1
        disc_real_loss = criterion(disc_real_pred, torch.ones_like(disc_real_pred))
        
        # Obtenemos la media general de las pérdidas en esta iteración
        disc_loss = (disc_fake_loss.float() + disc_real_loss.float()) / 2

        # Obtenemos el promedio ponderado de la pérdida del 
        # discriminador 
        mean_discriminator_loss += disc_loss.item()/display_step
        
        
        # Actualizamos gradientes
        # retain_graph=True permite mantener la pérdida durante el proceso
        disc_loss.backward(retain_graph=True)
        
        # Actualizamos el optimizador
        dis_optim.step()

        
        ###################################
        ##   Actualizamos el generador   ##
        ###################################
        
        # devolvemos a cero las gradientes
        gen_optim.zero_grad()
        
        # Creamos un nuevo tensor de ruido para el generador
        fake_noise_2 = noise_generator(cur_batch_size, dim_z, device=device)
        
        # Obtenemos el resultado de evaluar el tensor en el modelo
        fake_2 = anime_gen(fake_noise_2)
        
        # Obtenemos nuevamente la predicción del Discriminador
        disc_fake_pred = anime_dis(fake_2)
        
        # Obtenemos la evaluación de la función de pérdida 
        # Buscamos etiquetar nuestros datos falsos como verdaderos
        gen_loss = criterion(disc_fake_pred, torch.ones_like(disc_fake_pred))
        
        # Actualizamos gradientes de la función de pérdida del generador
        gen_loss.backward()
        
        # Actualizamos el optimizador
        gen_optim.step()
        
        
        # Obtenemos el promedio ponderado de la pérdida del 
        # generador 
        mean_generator_loss += gen_loss.item()/ display_step

        # Guardamos los valores de pérdida para su ploteo
        G_losses.append(gen_loss.item())
        D_losses.append(disc_loss.item())
        
        ## 
        if cur_step % display_step == 0 and cur_step > 0:
            print(f"Epoch {epoch}, step {cur_step}: Generator loss: {mean_generator_loss}, discriminator loss: {mean_discriminator_loss}")
            show_tensor_images(fake)
            show_tensor_images(real)
            display_losses(cur_step, display_step, G_losses, D_losses)
            mean_generator_loss = 0
            mean_discriminator_loss = 0
        cur_step += 1
        
    if epoch % 25 == 0:
        torch.save({
                'epoch': epoch,
                'model_state_dict': anime_gen.state_dict(),
                'optimizer_state_dict': gen_optim.state_dict(),
                'loss': G_losses,
                }, 'generator_anime_G4.pt')
        torch.save({
                'epoch': epoch,
                'model_state_dict': anime_dis.state_dict(),
                'optimizer_state_dict': dis_optim.state_dict(),
                'loss': D_losses,
                }, 'discriminator_anime_G4.pt')