# **DCGAN implementation in PyTorch using the CelebA Dataset**
We will follow the [DCGAN Tutorial](https://docs.pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html) from the PyTorch documentation, which implements a DCGAN following the architecture and recommendations of the DCGAN paper ([Unsupervised Representation Learning with Deep Convolutional Generative Adversarial Networks](https://arxiv.org/abs/1511.06434)).

# Libraries required:
First, let's start by importing the necessary libraries:

In [None]:
import argparse
import os
import random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
from PIL import Image, ImageDraw, ImageFont

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision.utils as vutils

# Set random seed for reproducibility (uncomment the lines below to enable it):

#manualSeed = 999
#manualSeed = random.randint(1, 10000) # for new results
#print("Random Seed: ", manualSeed)
#random.seed(manualSeed)
#torch.manual_seed(manualSeed)
#torch.use_deterministic_algorithms(True) # Needed for reproducible results

# Defining DCGAN model's input parameters:
We now define the hyperparameters of the DCGAN model, such as the learning rate, number of epochs, batch size or the dimension of the images generated and the latent space:

In [None]:
batch_size = 128      # Number of images per training mini-batch
image_size = 64       # Side length (in pixels) every training image is resized and center-cropped to
image_channels = 3    # Number of channels in the training images (3 for RGB color images)
latent_dim = 100      # Length of the latent vector z fed to the generator
ngf = 64              # Base number of feature maps in the generator (its layers use ngf*8 down to ngf)
ndf = 64              # Base number of feature maps in the discriminator (its layers use ndf up to ndf*8)
num_epochs = 20       # Number of passes over the training set
lr = 0.0002           # Learning rate used by both Adam optimizers
beta1 = 0.5           # First value of the `betas` pair passed to both Adam optimizers

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Use the first CUDA GPU when one is available, otherwise fall back to the CPU

# Loading CelebA Dataset:
There are different ways of loading datasets into Google Colab. We will assume that the dataset was downloaded from [CelebA Dataset](https://mmlab.ie.cuhk.edu.hk/projects/CelebA.html) and that the file img_align_celeba.zip was uploaded to Google Drive in the folder "datasets/celeba". We then mount Google Drive in Google Colab to access the Drive files, extract the zip file into the "/content/celeba" directory, and list its contents to confirm the extraction:

In [None]:
# CelebA dataset: mount Google Drive and unpack the image archive locally.
from google.colab import drive
# Expose the Drive contents under /content/drive
drive.mount('/content/drive')
# Extract the archive into /content/celeba; -n never overwrites existing files,
# so re-running this cell does not extract everything again
!unzip -n /content/drive/MyDrive/datasets/celeba/img_align_celeba.zip -d /content/celeba
# Sanity check: list the extraction folder and the first few image files
!ls /content/celeba
!ls /content/celeba/img_align_celeba | head

Now, we are able to create the dataset, prepare it for training and visualize some of the training data:

In [None]:
dataroot = "/content/celeba"  # Folder whose sub-folder(s) contain the images
workers = 2  # Worker processes used by the dataloader

# Preprocessing applied to every image: resize, crop to a square of
# image_size pixels, convert to a tensor and normalize each RGB channel
# with mean 0.5 and std 0.5.
preprocess = transforms.Compose([
    transforms.Resize(image_size),
    transforms.CenterCrop(image_size),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Dataset over the extracted images and a shuffling mini-batch loader
dataset = datasets.ImageFolder(root=dataroot, transform=preprocess)
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=workers,
)

# Preview: tile the first 64 images of one batch into a single grid
real_batch = next(iter(dataloader))
preview_images = real_batch[0].to(device)[:64]
preview_grid = vutils.make_grid(preview_images, padding=2, normalize=True).cpu()

plt.figure(figsize=(8,8))
plt.axis("off")
plt.title("Training Images")
# make_grid returns (channels, height, width); imshow wants channels last
plt.imshow(np.transpose(preview_grid, (1,2,0)))
plt.show()

# Weights initialization
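We define the function that initializes the weights of both the discriminator and the generator: convolutional weights are drawn from a normal distribution with mean 0 and standard deviation 0.02, batch-normalization scales from a normal distribution with mean 1 and standard deviation 0.02, and batch-normalization biases are set to 0.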

In [None]:
def weights_init(m):
    """Re-initialize one module in place; intended for ``net.apply(weights_init)``.

    The module is selected by its class name:

    * names containing ``Conv``: weights drawn from N(mean=0.0, std=0.02);
    * names containing ``BatchNorm``: weights drawn from N(mean=1.0, std=0.02)
      and biases set to 0.

    Any other module is left untouched. Modules that match by name but have
    no ``weight``/``bias`` tensor (for example ``BatchNorm2d(affine=False)``
    or a container class whose name happens to contain ``Conv``) are skipped
    instead of raising.

    Args:
        m: The ``nn.Module`` to initialize.
    """
    classname = m.__class__.__name__
    weight = getattr(m, 'weight', None)
    bias = getattr(m, 'bias', None)

    if 'Conv' in classname:
        if weight is not None:
            nn.init.normal_(weight.data, 0.0, 0.02)
    elif 'BatchNorm' in classname:
        if weight is not None:
            nn.init.normal_(weight.data, 1.0, 0.02)
        if bias is not None:
            nn.init.constant_(bias.data, 0)

# Defining the DCGAN architecture
## Creating the Generator network:
The generator is designed to transform an input latent space vector of dimensions 100x1x1 into an output image of size 3x64x64.

The generator network is implemented as a series of transposed convolutional layers. Every layer except the output is followed by batch normalization and a ReLU activation; the output layer uses Tanh instead.

In [None]:
class Generator(nn.Module):
    """DCGAN generator: latent vector -> image.

    Maps a batch of latent vectors shaped ``(N, latent_dim, 1, 1)`` to images
    shaped ``(N, image_channels, 64, 64)`` with values in ``[-1, 1]`` (Tanh).
    Reads the module-level ``latent_dim``, ``ngf`` and ``image_channels``.

    The layers live in ``self.main`` in a fixed order, so the parameter names
    in ``state_dict()`` (``main.0.weight``, ``main.1.weight``, ...) are stable.
    """

    def __init__(self):
        super().__init__()

        # Channel width after each hidden stage: 4x4 -> 8x8 -> 16x16 -> 32x32
        widths = [ngf * 8, ngf * 4, ngf * 2, ngf]

        # Stage 1: project z (1x1) to a ``(ngf*8) x 4 x 4`` feature map
        layers = [
            nn.ConvTranspose2d(latent_dim, widths[0], kernel_size=4, stride=1,
                               padding=0, bias=False),
            nn.BatchNorm2d(widths[0]),
            nn.ReLU(inplace=True),
        ]

        # Stages 2-4: each one doubles the spatial size and halves the channels
        for in_channels, out_channels in zip(widths, widths[1:]):
            layers.append(nn.ConvTranspose2d(in_channels, out_channels, kernel_size=4,
                                             stride=2, padding=1, bias=False))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))

        # Output stage: ``(ngf) x 32 x 32`` -> ``(image_channels) x 64 x 64``
        layers.append(nn.ConvTranspose2d(widths[-1], image_channels, kernel_size=4,
                                         stride=2, padding=1, bias=False))
        layers.append(nn.Tanh())

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` (a batch of latent vectors) through the layer stack."""
        return self.main(input)

# Build the generator on the selected device, then let ``weights_init``
# re-initialize every sub-module: convolution weights ~ N(0, 0.02),
# batch-norm weights ~ N(1, 0.02), batch-norm biases = 0.
# (``to`` and ``apply`` both return the module itself, so they can be chained.)
netG = Generator().to(device).apply(weights_init)

# Show the resulting layer stack
print(netG)


## Creating the Discriminator network:
The discriminator is a binary classification network that takes an image of size 3x64x64 as input and outputs a scalar probability that the input image is real (as opposed to fake), i.e., a 1x1x1 output.

The discriminator network is implemented as a series of strided convolutional layers with LeakyReLU activations. Batch normalization is applied after every convolution except the first and the last, and the output layer uses Sigmoid.

In [None]:
class Discriminator(nn.Module):
    """DCGAN discriminator: image -> probability of being real.

    Maps a batch of images shaped ``(N, image_channels, 64, 64)`` to scores
    shaped ``(N, 1, 1, 1)`` in ``[0, 1]`` (Sigmoid). Reads the module-level
    ``ndf`` and ``image_channels``.

    The layers live in ``self.main`` in a fixed order, so the parameter names
    in ``state_dict()`` (``main.0.weight``, ``main.2.weight``, ...) are stable.
    """

    def __init__(self):
        super().__init__()

        # Input stage (no batch norm): ``(image_channels) x 64 x 64`` -> ``(ndf) x 32 x 32``
        layers = [
            nn.Conv2d(image_channels, ndf, kernel_size=4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
        ]

        # Hidden stages: each one halves the spatial size and doubles the
        # channels: 32x32 -> 16x16 -> 8x8 -> 4x4
        widths = [ndf, ndf * 2, ndf * 4, ndf * 8]
        for in_channels, out_channels in zip(widths, widths[1:]):
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=4,
                                    stride=2, padding=1, bias=False))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.LeakyReLU(0.2, inplace=True))

        # Output stage: collapse ``(ndf*8) x 4 x 4`` to one score per image
        layers.append(nn.Conv2d(widths[-1], 1, kernel_size=4, stride=1, padding=0, bias=False))
        layers.append(nn.Sigmoid())

        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` (a batch of images) through the layer stack."""
        return self.main(input)

# Build the discriminator on the selected device, then let ``weights_init``
# re-initialize every sub-module: convolution weights ~ N(0, 0.02),
# batch-norm weights ~ N(1, 0.02), batch-norm biases = 0.
# (``to`` and ``apply`` both return the module itself, so they can be chained.)
netD = Discriminator().to(device).apply(weights_init)

# Show the resulting layer stack
print(netD)

## Defining loss functions and optimizers:

In [None]:
# Binary cross-entropy between the discriminator's scores and the targets
criterion = nn.BCELoss()

# Target values used with the loss: 1 means "real", 0 means "fake"
real_label = 1.
fake_label = 0.

# One fixed batch of 64 latent vectors, reused after every epoch so the
# generated samples can be compared across epochs
fixed_noise = torch.randn(64, latent_dim, 1, 1, device=device)

# Render what the still-untrained generator produces for that batch
with torch.no_grad():
    fake_init = netG(fixed_noise).detach().cpu()
initial_grid = vutils.make_grid(fake_init, padding=2, normalize=True)

plt.figure(figsize=(8,8))
plt.axis("off")
plt.title("Latent space before training")
# make_grid returns (channels, height, width); imshow wants channels last
plt.imshow(np.transpose(initial_grid, (1,2,0)))
plt.show()

# One Adam optimizer per network, both with the same learning rate and betas
adam_betas = (beta1, 0.999)
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=adam_betas)
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=adam_betas)

# Training the DCGAN model:
We will train the model on mini-batches, updating both networks with the Adam optimizer.

To track the progress, we save the losses of the discriminator and the generator at every iteration and print them every 50 iterations.
Moreover, we visualize and save the generated images after every epoch.

In [None]:
# Progress tracking: one loss value per iteration for each network, and one
# grid of generated samples per epoch (used later for the plots and animation)
G_losses = []
D_losses = []
img_list = []

# The output folder never changes, so create it once instead of every epoch
progress_dir = "/content/drive/MyDrive/dcgan_progress"
os.makedirs(progress_dir, exist_ok=True)

print("Starting Training Loop...")
# For each epoch
for epoch in range(num_epochs):

    # For each batch in the dataloader
    for i, data in enumerate(dataloader, 0):

        ### (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))

        netD.zero_grad()

        ## Train with all-real batch:

        # Format batch (despite its name, ``real_cpu`` lives on ``device``)
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)

        # Forward pass real batch through D and flatten to one score per image
        output_real = netD(real_cpu).view(-1)

        # Calculate loss on all-real batch
        errD_real = criterion(output_real, label)

        # Calculate gradients for D in backward pass
        errD_real.backward()
        D_x = output_real.mean().item()

        ## Train with all-fake batch:

        # Generate batch of latent vectors
        noise = torch.randn(b_size, latent_dim, 1, 1, device=device)

        # Generate fake image batch with G
        fake = netG(noise)
        label.fill_(fake_label)

        # Classify all fake batch with D; ``detach`` keeps this backward pass
        # from propagating gradients into G
        output_fake = netD(fake.detach()).view(-1)

        # Calculate D's loss on the all-fake batch
        errD_fake = criterion(output_fake, label)

        # Calculate the gradients for this batch, accumulated (summed) with previous gradients
        errD_fake.backward()
        D_G_z1 = output_fake.mean().item()

        # Compute error of D as sum over the fake and the real batches
        errD = errD_real + errD_fake

        # Update D
        optimizerD.step()

        ### (2) Update G network: maximize log(D(G(z)))

        netG.zero_grad()

        label.fill_(real_label)  # fake labels are real for generator cost

        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = netD(fake).view(-1)

        # Calculate G's loss based on this output
        errG = criterion(output, label)

        # Calculate gradients for G
        errG.backward()
        D_G_z2 = output.mean().item()

        # Update G
        optimizerG.step()

        ### Output training stats:
        if i % 50 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, num_epochs, i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        ### Save Losses for plotting later:
        G_losses.append(errG.item())
        D_losses.append(errD.item())

    ### Visualize and save results after each epoch:
    with torch.no_grad():
        fake = netG(fixed_noise).detach().cpu()

    # Build the sample grid once and reuse it for both the history and the plot
    grid = vutils.make_grid(fake, padding=2, normalize=True)
    img_list.append(grid)

    plt.figure(figsize=(8,8))
    plt.axis("off")
    plt.title(f"Epoch {epoch}")
    plt.imshow(np.transpose(grid, (1,2,0)))
    plt.show()

    vutils.save_image(fake, os.path.join(progress_dir, f"epoch_{epoch:03d}.png"), normalize=True)



# Saving the trained model:

In [None]:
# Persist the state of both networks and both optimizers to Google Drive so
# training can be resumed later.
os.makedirs("/content/drive/MyDrive/models", exist_ok=True)

checkpoint_targets = (
    (netG, "/content/drive/MyDrive/models/G.pth"),
    (netD, "/content/drive/MyDrive/models/D.pth"),
    (optimizerG, "/content/drive/MyDrive/models/optimizerG.pth"),
    (optimizerD, "/content/drive/MyDrive/models/optimizerD.pth"),
)
for component, destination in checkpoint_targets:
    torch.save(component.state_dict(), destination)

# Results:
## 1. Animated gif of the DCGAN training progress

In [None]:
# Turn the per-epoch sample grids in ``img_list`` into captioned frames, write
# them to an animated GIF on Google Drive and show the same frames inline.
text_height_space = 30  # Height (pixels) of the white caption strip under each grid
font = ImageFont.load_default()

captioned = []  # One PIL image per epoch: the sample grid plus its caption
for i, img in enumerate(img_list):

    # Grid tensor is (channels, height, width) with values scaled to [0, 1]
    # by make_grid(normalize=True); convert to a height x width x 3 uint8 image
    frame = np.transpose(img.numpy(), (1,2,0))
    frame = (frame * 255).astype(np.uint8)
    pil_img = Image.fromarray(frame)

    # White canvas with room for the caption below the grid
    canvas = Image.new('RGB', (pil_img.width, pil_img.height + text_height_space), color='white')
    canvas.paste(pil_img, (0,0))

    draw = ImageDraw.Draw(canvas)
    text = f"Epoch {i}"

    # Center the caption inside the strip
    bbox = draw.textbbox((0,0), text, font=font)
    text_width = bbox[2] - bbox[0]
    text_height = bbox[3] - bbox[1]
    x = (canvas.width - text_width) // 2
    y = pil_img.height + (text_height_space - text_height) // 2
    draw.text((x, y), text, fill='black', font=font)

    captioned.append(canvas)

frames = [np.array(canvas) for canvas in captioned]

# Save and visualize gif:
os.makedirs("/content/drive/MyDrive/dcgan_progress", exist_ok=True)
gif_file = os.path.join("/content/drive/MyDrive/dcgan_progress", "dcgan_training.gif")

# Write the GIF: one second per frame, looping forever. Skipped when there
# are no frames, since there would be nothing to save.
if captioned:
    captioned[0].save(gif_file, save_all=True, append_images=captioned[1:],
                      duration=1000, loop=0)

fig = plt.figure(figsize=(8,8))
plt.axis("off")
ims = [[plt.imshow(frame, animated=True)] for frame in frames]
ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

HTML(ani.to_jshtml())

## 2. Real vs fake images visualization

In [None]:
# Side-by-side comparison of real training images and generated images.

# Left panel: the first 64 images of one batch from the dataloader
real_batch = next(iter(dataloader))
real_grid = vutils.make_grid(real_batch[0].to(device)[:64], padding=5, normalize=True).cpu()

# Right panel: whatever is currently stored in ``fake`` (after running the
# training cell, the samples generated at the end of the last epoch)
fake_grid = vutils.make_grid(fake, padding=5, normalize=True)

plt.figure(figsize=(15,15))
panels = (("Real Images", real_grid), ("Fake Images", fake_grid))
for panel_index, (panel_title, panel_grid) in enumerate(panels, start=1):
    plt.subplot(1,2,panel_index)
    plt.axis("off")
    plt.title(panel_title)
    # make_grid returns (channels, height, width); imshow wants channels last
    plt.imshow(np.transpose(panel_grid, (1,2,0)))

plt.savefig("/content/drive/MyDrive/dcgan_progress/comparison.png")
plt.show()


## 3. Plotting generator and discriminator losses during training.

In [None]:
# Plot the per-iteration generator and discriminator losses on one figure
# and store the figure next to the other training outputs.
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
for curve_label, curve_values in (("G", G_losses), ("D", D_losses)):
    plt.plot(curve_values, label=curve_label)
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.savefig("/content/drive/MyDrive/dcgan_progress/losses.png")
plt.show()

## 4. Generating fake images from new random noise

In [None]:

# Generate ``new_images`` fresh grids of 64 samples each from new random noise.
#
# The noise batches are kept in ``new_noise`` rather than ``fixed_noise``:
# ``fixed_noise`` must stay the single tensor that the training cells pass
# straight to ``netG``. ``latent_dim`` is not redefined here, so it always
# matches the value the generator was built with.
new_images = 4
new_noise = []

# The output folder never changes, so create it once before the loop
os.makedirs("/content/drive/MyDrive/dcgan_progress", exist_ok=True)

for j in range(new_images):
    new_noise.append(torch.randn(64, latent_dim, 1, 1, device=device))

    # Generate one batch of images from this noise; no gradients are needed
    with torch.no_grad():
        fake = netG(new_noise[j]).detach().cpu()

    # Show the batch as a grid
    plt.figure(figsize=(8,8))
    plt.axis("off")
    plt.title(f"Generated image {j+1}")
    plt.imshow(np.transpose(vutils.make_grid(fake, padding=2, normalize=True), (1,2,0)))
    plt.show()

    # Save the same batch as a PNG grid
    vutils.save_image(fake, f"/content/drive/MyDrive/dcgan_progress/generated_{j+1}.png", normalize=True)


# If you want to keep training...
We can continue training the DCGAN by loading the trained model

In [None]:
# Rebuild both networks and their optimizers, then restore the state saved
# under ``checkpoint_dir`` so training can continue from there.

# Device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Networks: the classes must be instantiated before being moved to the
# device (calling ``.to`` on the class itself raises a TypeError)
netG = Generator().to(device)
netD = Discriminator().to(device)

# Optimizers (same settings as the initial training run)
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999))

checkpoint_dir = "/content/drive/MyDrive/models"

# ``map_location`` remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can also be loaded on a CPU-only runtime
netG.load_state_dict(torch.load(f"{checkpoint_dir}/G.pth", map_location=device))
netD.load_state_dict(torch.load(f"{checkpoint_dir}/D.pth", map_location=device))

optimizerG.load_state_dict(torch.load(f"{checkpoint_dir}/optimizerG.pth", map_location=device))
optimizerD.load_state_dict(torch.load(f"{checkpoint_dir}/optimizerD.pth", map_location=device))

print("Checkpoint loaded. Resuming training...")

In [None]:
# Progress tracking for the resumed run: one loss value per iteration for
# each network (the lists start empty again)
G_losses = []
D_losses = []

start_epoch = 20  # Epoch number the resumed run starts counting from
num_epochs = 10  # additional number of epochs (rebinds the earlier ``num_epochs``)

# The output folder never changes, so create it once instead of every epoch
progress_dir = "/content/drive/MyDrive/dcgan_progress"
os.makedirs(progress_dir, exist_ok=True)

for epoch in range(start_epoch, start_epoch + num_epochs):

    # For each batch in the dataloader
    for i, data in enumerate(dataloader, 0):

        ### (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))

        netD.zero_grad()

        ## Train with all-real batch:

        # Format batch (despite its name, ``real_cpu`` lives on ``device``)
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)

        # Forward pass real batch through D and flatten to one score per image
        output_real = netD(real_cpu).view(-1)

        # Calculate loss on all-real batch
        errD_real = criterion(output_real, label)

        # Calculate gradients for D in backward pass
        errD_real.backward()
        D_x = output_real.mean().item()

        ## Train with all-fake batch:

        # Generate batch of latent vectors
        noise = torch.randn(b_size, latent_dim, 1, 1, device=device)

        # Generate fake image batch with G
        fake = netG(noise)
        label.fill_(fake_label)

        # Classify all fake batch with D; ``detach`` keeps this backward pass
        # from propagating gradients into G
        output_fake = netD(fake.detach()).view(-1)

        # Calculate D's loss on the all-fake batch
        errD_fake = criterion(output_fake, label)

        # Calculate the gradients for this batch, accumulated (summed) with previous gradients
        errD_fake.backward()
        D_G_z1 = output_fake.mean().item()

        # Compute error of D as sum over the fake and the real batches
        errD = errD_real + errD_fake

        # Update D
        optimizerD.step()

        ### (2) Update G network: maximize log(D(G(z)))

        netG.zero_grad()

        label.fill_(real_label)  # fake labels are real for generator cost

        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = netD(fake).view(-1)

        # Calculate G's loss based on this output
        errG = criterion(output, label)

        # Calculate gradients for G
        errG.backward()
        D_G_z2 = output.mean().item()

        # Update G
        optimizerG.step()

        ### Output training stats:
        if i % 50 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, start_epoch + num_epochs, i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        ### Save Losses for plotting later:
        G_losses.append(errG.item())
        D_losses.append(errD.item())

    ### Visualize and save results after each epoch:
    # ``fixed_noise`` is passed straight to the generator, so it has to be a
    # single batch tensor of latent vectors
    with torch.no_grad():
        fake = netG(fixed_noise).detach().cpu()

    plt.figure(figsize=(8,8))
    plt.axis("off")
    plt.title(f"Epoch {epoch}")
    plt.imshow(np.transpose(vutils.make_grid(fake, padding=2, normalize=True), (1,2,0)))
    plt.show()

    vutils.save_image(fake, os.path.join(progress_dir, f"epoch_{epoch:03d}.png"), normalize=True)

