# **Implementación de una DCGAN en PyTorch usando el conjunto de datos CelebA**
Seguiremos el tutorial de la documentación de PyTorch, que puede encontrarse en [Tutorial DCGAN](https://docs.pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html), donde se implementa una DCGAN siguiendo la arquitectura y recomendaciones del artículo original de las DCGAN  ([Unsupervised Representation Learning with Deep Convolutional Generative Adversarial Networks](https://arxiv.org/abs/1511.06434)).

# Librerías necesarias:
Primero, empezamos importando las librerías requeridas:

In [None]:
import argparse
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
from PIL import Image, ImageDraw, ImageFont

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision.utils as vutils

# Fijamos la semilla de los números aleatorios para poder reproducirlo:

#semilla_manual = 999
semilla_manual = random.randint(1, 10000) # para nuevos resultados
print("Semilla aleatoria: ", semilla_manual)
random.seed(semilla_manual)
torch.manual_seed(semilla_manual)
torch.use_deterministic_algorithms(True) # Necesario para resultados reproducibles

# Definir los parámetros de entrada de la DCGAN:
Ahora, definimos los hiperparámetros del modelo DCGAN, como la tasa de aprendizaje, el número de épocas, el tamaño del minilote o la dimensión de las imágenes generadas y del espacio latente:

In [None]:
batch_size = 128    # Tamaño del minilote (batch) durante el entrenamiento
image_size = 64     # Dimensión espacial de las imágenes de entrenamiento. Todas las imágenes se redimensionarán a este tamaño mediante una transformación
image_channels = 3  # Número de canales de las imágenes de entrenamiento. Para imágenes a color (RGB) es 3
latent_dim = 100    # Dimensión de un vector z del espacio latente (es decir, de la entrada del generador)
ngf = 64            # Tamaño del mapa de características del generador
ndf = 64            # Tamaño del mapa de características del discriminador
num_epochs = 20     # Número de épocas de entrenamiento
lr = 0.0002         # Tasa de aprendizaje (learning rate) para la optimización
beta1 = 0.5         # Hiperparámetro Beta1 del optimizador Adam

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Decidir en qué unidad queremos ejecutarlo

# Cargar el conjunto de datos CelebA:
Hay varias formas de cargar conjuntos de datos en Google Colab. Asumiremos que hemos descargado el conjunto de datos de [CelebA Dataset](https://mmlab.ie.cuhk.edu.hk/projects/CelebA.html) y que hemos guardado el archivo img_align_celeb.zip en la carpeta "/datasets/celeba" de Google Drive. Después, montaremos Google Drive en Google Colab para poder acceder a los archivos en Drive y extraer el contenido del archivo zip en una carpeta "/content/celeba". Después, confirmaremos que tenemos el contenido deseado:


In [None]:
# CelebA dataset:
from google.colab import drive
drive.mount('/content/drive')
!unzip -n /content/drive/MyDrive/datasets/celeba/img_align_celeba.zip -d /content/celeba
!ls /content/celeba
!ls /content/celeba/img_align_celeba | head

Ahora, podemos crear el dataset en PyTorch, prepararlo para el entrenamiento y visualizar algunas de las imágenes de entrenamiento:

In [None]:
dataroot = "/content/celeba" # Dirección de la carpeta del dataset
workers = 2 # Número de trabajadores para dataloader

# Crear el dataset:
dataset = datasets.ImageFolder(root=dataroot,
                               transform=transforms.Compose([
                               transforms.Resize(image_size),
                               transforms.CenterCrop(image_size),
                               transforms.ToTensor(),
                               transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
                           ]))
# Cargar los datos en minilotes con el dataloader:
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                         shuffle=True, num_workers=workers)
# Visualizar imágenes de entrenamiento:
real_batch = next(iter(dataloader))
plt.figure(figsize=(8,8))
plt.axis("off")
plt.title("Imágenes de entrenamiento")
plt.imshow(np.transpose(vutils.make_grid(real_batch[0].to(device)[:64], padding=2, normalize=True).cpu(),(1,2,0)))
plt.show()

# Inicializar pesos:
Inicializaremos los pesos que usaremos para entrenar al discriminador y al generador siguiendo una distribución N(0,0.02):

In [None]:
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

# Definir la arquitectura del modelo DCGAN
## Crear la red del Generador:
El generador está diseñado para transformar un vector del espacio latente de dimensiones 100x1x1 en una imagen de dimensiones 3x64x64.

La red Generador estará implementada como una serie de capas convolucionales traspuestas, aplicando normalización del minilote tras cada una y usando ReLU como función de activación en todas excepto para la capa de salida, que usa tanh.

In [None]:
class Generator(nn.Module):

    def __init__(self):

        super(Generator, self).__init__()

        self.main = nn.Sequential(
            # Entrada: vector z. Dimensión: 100 x 1 x 1
            nn.ConvTranspose2d(latent_dim, ngf * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),

            # Dimensión: (ngf*8) x 4 x 4
            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),

            # Dimensión: (ngf*4) x 8 x 8
            nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),

            # Dimensión: (ngf*2) x 16 x 16
            nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),

            # Dimensión: (ngf) x 32 x 32
            nn.ConvTranspose2d(ngf, image_channels, 4, 2, 1, bias=False),
            nn.Tanh()

            # Dimensión de la salida: (image_channels) x 64 x 64
        )

    def forward(self, input):
        return self.main(input)

# Crear el generador:
netG = Generator().to(device)

# Aplicar la función weights_init para inicializar los pesos:
netG.apply(weights_init)

# Imprimir la red G:
print(netG)


## Crear la red del Discriminador:
El discriminador es un clasificador binario que toma como entrada una imagen de dimensiones 3x64x64 y devuelve un valor que representa la probabilidad de que la imagen de entrada sea real, es decir, una salida de dimensión 1x1x1.

La red Discriminador estará implementada como una serie de capas convolucionales, usando normalización del minilote tras cada una y LeakyReLU como función de activación en todas excepto para la salida, que usa sigmoide.

In [None]:
class Discriminator(nn.Module):

    def __init__(self):

        super(Discriminator, self).__init__()

        self.main = nn.Sequential(
            # Dimensión de la entrada: (image_channels) x 64 x 64
            nn.Conv2d(image_channels, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),

            # Dimensión: (ndf) x 32 x 32
            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),

            # Dimensión: (ndf*2) x 16 x 16
            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),

            # Dimensión: (ndf*4) x 8 x 8
            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),

            # Dimensión: (ndf*8) x 4 x 4
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.main(input)

# Crear el discriminador:
netD = Discriminator().to(device)

# Aplicar la función weights_init para inicializar los pesos:
netD.apply(weights_init)

# Imprimir la red D:
print(netD)

## Definir la función de pérdida y los optimizadores:
Usaremos la función de pérdida de la entropía cruzada binaria (BCE, Binarry Cross-Entropy) y el optimizador Adam.

In [None]:
# Inicializar la función de pérdida entropía cruzada binaria (BCE):
criterion = nn.BCELoss()

# Crear un minilote de 64 vectores latentes que usaremos para
# visualizar la evolución del generador:
fixed_noise = torch.randn(64, latent_dim, 1, 1, device=device)

# Visualizamos el espacio latente inicial:
with torch.no_grad():
    fake_init = netG(fixed_noise).detach().cpu()

plt.figure(figsize=(8,8))
plt.axis("off")
plt.title("Espacio latente antes del entrenamiento")
plt.imshow(np.transpose(vutils.make_grid(fake_init, padding=2, normalize=True), (1,2,0)))
plt.show()

# Establecemos las etiquetas para las muestras reales y falsas durante el
# entrenamiento:
real_label = 1.
fake_label = 0.

# Optimizadores Adam para D y G:
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999))

# Entrenamiento del modelo DCGAN:
Entrenaremos el modelo aplicando el algoritmo de descenso de gradiente estocástico (SGD) por minilotes, entrenando primero D y después G en cada iteración.

Para seguir la evolución, guardaremos las pérdidas de D y G y se imprimirán cada 50 iteraciones. Además, visualizaremos y guardaremos imágenes generadas por G tras cada época.
We will train the model applying mini-batch stochastic gradient descent (SGD).

In [None]:
# Listas para seguir el proceso:
G_losses = []
D_losses = []
img_list = []

print("Comenzando bucle de entrenamiento...")
# Por cada época:
for epoch in range(num_epochs):

    # Por cada minilote en el dataloader:
    for i, data in enumerate(dataloader, 0):

        ### (1) Actualizar la red D: maximizar log(D(x)) + log(1 - D(G(z)))

        netD.zero_grad()

        ## Entrenar con minilote de muestras reales:

        # Formato del minilote:
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)

        # Propagar el minilote real a través de D:
        output_real = netD(real_cpu).view(-1)

        # Calcular el error para las muestras reales:
        errD_real = criterion(output_real, label)

        # Calcular los gradientes de D para la retropropagación:
        errD_real.backward()
        D_x = output_real.mean().item()

        ## Entrenar con minilote de muestras falsas:

        # Generar minilote de vectores latentes:
        noise = torch.randn(b_size, latent_dim, 1, 1, device=device)

        # Generar minilote de muestras falsas con G:
        fake = netG(noise)
        label.fill_(fake_label)

        # Clasificar las muestras falsas con D:
        output_fake = netD(fake.detach()).view(-1)

        # Calcular el error de D para las muestras falsas:
        errD_fake = criterion(output_fake, label)

        # Caclular los gradientes de D del minilote de muestras falsas:
        errD_fake.backward()
        D_G_z1 = output_fake.mean().item()

        # Sumar el error de D sobre las muestras reales y las falsas:
        errD = errD_real + errD_fake

        # Actualizar D:
        optimizerD.step()

        ### (2) Actualizar la red G: maximizar log(D(G(z)))

        netG.zero_grad()

        label.fill_(real_label)  # las etiquetas de las muestras falsas se ponen como reales para calcular el coste de G

        # Como acabamos de actualizar D, propagamos de nuevo el minilote de muestras falsas a través de D:
        output = netD(fake).view(-1)

        # Calcular el error de G para sus muestras (falsas):
        errG = criterion(output, label)

        # Calcular los gradientes de G:
        errG.backward()
        D_G_z2 = output.mean().item()

        # Actualizar G:
        optimizerG.step()

        ### Evolución del entrenamiento:
        if i % 50 == 0:
            print('[%d/%d][%d/%d]\tPérdida_D: %.4f\tPérdida_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, num_epochs, i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        ### Guardamos las pérdidas para graficarlas después:
        G_losses.append(errG.item())
        D_losses.append(errD.item())

    ### Visualizamos y guardamos los resultados tras cada época:
    with torch.no_grad():
      fake = netG(fixed_noise).detach().cpu()

    grid = vutils.make_grid(fake, padding=2, normalize=True)
    img_list.append(grid)

    plt.figure(figsize=(8,8))
    plt.axis("off")
    plt.title(f"Época {epoch}")
    plt.imshow(np.transpose(vutils.make_grid(fake, padding=2, normalize=True), (1,2,0)))
    plt.show()

    os.makedirs("/content/drive/MyDrive/dcgan_progress", exist_ok=True)
    vutils.save_image(fake, f"/content/drive/MyDrive/dcgan_progress/epoch_{epoch:03d}.png", normalize=True)



# Guardar el modelo entrenado:

In [None]:
os.makedirs("/content/drive/MyDrive/models", exist_ok=True)

torch.save(netG.state_dict(), "/content/drive/MyDrive/models/G.pth")
torch.save(netD.state_dict(), "/content/drive/MyDrive/models/D.pth")

torch.save(optimizerG.state_dict(), "/content/drive/MyDrive/models/optimizerG.pth")
torch.save(optimizerD.state_dict(), "/content/drive/MyDrive/models/optimizerD.pth")

# Resultados:
## 1. Gif animado de la evolución del entrenamiento del modelo DCGAN

In [None]:
frames = []

font = ImageFont.load_default()

for i, img in enumerate(img_list):

    frame = np.transpose(img.numpy(), (1,2,0))
    frame = (frame * 255).astype(np.uint8)

    pil_img = Image.fromarray(frame)

    text_height_space = 30
    canvas = Image.new('RGB', (pil_img.width, pil_img.height + text_height_space), color='white')
    canvas.paste(pil_img, (0,0))

    draw = ImageDraw.Draw(canvas)
    text = f"Época {i}"

    bbox = draw.textbbox((0,0), text, font=font)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]

    x = (canvas.width - text_width) // 2
    y = pil_img.height + (text_height_space - text_height) // 2
    draw.text((x, y), text, fill='black', font=font)

    frames.append(np.array(canvas))

# Guardar y visualizar gif:
os.makedirs("/content/drive/MyDrive/dcgan_progress", exist_ok=True)
gif_file = os.path.join("/content/drive/MyDrive/dcgan_progress", "dcgan_training.gif")

fig = plt.figure(figsize=(8,8))
plt.axis("off")
ims = [[plt.imshow(np.transpose(i,(1,2,0)), animated=True)] for i in img_list]
ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

HTML(ani.to_jshtml())

## 2. Comparación de imágenes reales vs imágenes falsas

In [None]:
# Tomar un minilote de imágenes reales
real_batch = next(iter(dataloader))

# Visualizar las imágenes reales:
plt.figure(figsize=(15,15))
plt.subplot(1,2,1)
plt.axis("off")
plt.title("Imágenes reales")
plt.imshow(np.transpose(vutils.make_grid(real_batch[0].to(device)[:64], padding=5, normalize=True).cpu(),(1,2,0)))

# Visualizar las imágenes falsas:
plt.subplot(1,2,2)
plt.axis("off")
plt.title("Imágenes falsas")
plt.imshow(np.transpose(vutils.make_grid(fake, padding=5, normalize=True), (1,2,0)))
plt.savefig("/content/drive/MyDrive/dcgan_progress/comparison.png")
plt.show()


## 3. Gráfica de la pérdida del generador y el discriminador durante el entrenamiento

In [None]:
plt.figure(figsize=(10,5))
plt.title("Pérdida del generador y el discriminador durante el entrenamiento")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iteraciones")
plt.ylabel("pérdida")
plt.legend()
plt.savefig("/content/drive/MyDrive/dcgan_progress/losses.png")
plt.show()

## 4. Generar imágenes artificiales a partir de nuevos vectores de ruido aleatorio

In [None]:

latent_dim = 100
new_images = 4
fixed_noise = []
for j in range(new_images):
  fixed_noise.append(torch.randn(64, latent_dim, 1, 1, device=device)) # generamos new_images conjuntos de 64 vectores aleatorios

  # Generar nuevas imágenes dando a G como entrada los vectores de ruido:
  with torch.no_grad():
    fake = netG(fixed_noise[j]).detach().cpu()

  plt.figure(figsize=(8,8))
  plt.axis("off")
  plt.title("Imagen generada " f"{j+1}")
  plt.imshow(np.transpose(vutils.make_grid(fake, padding=2, normalize=True), (1,2,0)))
  plt.show()

  os.makedirs("/content/drive/MyDrive/dcgan_progress", exist_ok=True)
  vutils.save_image(fake, f"/content/drive/MyDrive/dcgan_progress/generated_{j+1}.png", normalize=True)
