## **0. Setup environment**

In [None]:
!nvidia-smi -L

In [None]:
!tree -d ../../images

In [None]:
import numpy as np
import time
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torchvision.utils as vutils
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
from torch.utils.tensorboard import SummaryWriter

In [None]:
"""
import random
# Set random seed for reproducibility
manualSeed = 42
#manualSeed = random.randint(1, 10000) # use if you want new results
random.seed(manualSeed)
torch.manual_seed(manualSeed)
print("Random Seed: ", manualSeed)
"""
pass

### **1. DCGAN**

In [None]:
class DCGAN():
    """DCGAN bundled with its data pipeline, networks, optimizers and training loop.

    Constructing an instance builds the image transform, an ``ImageFolder``
    dataset and shuffling dataloader rooted at ``dataroot``, the generator and
    discriminator (moved to the selected device and initialised through
    :meth:`weights_init`), a BCE criterion and one Adam optimizer per network.
    :meth:`train` then runs the adversarial training loop.

    The nested ``Generator`` upsamples a ``nz x 1 x 1`` latent to a 64x64 image
    and the nested ``Discriminator`` reduces a 64x64 image to a single score,
    so ``image_size`` is expected to stay at 64 unless the layer stacks are
    changed accordingly.
    """

    def __init__(
        self,
        dataroot,               # Root directory for dataset
        workers = 2,            # Number of workers for dataloader
        batch_size = 128,       # Batch size during training
        image_size = 64,        # Spatial size of training images. All images will be resized to this size using a transformer.
        nc = 3,                 # Number of channels in the training images. For color images this is 3
        nz = 100,               # Size of z latent vector (i.e. size of generator input)
        ngf = 64,               # Size of feature maps in generator
        ndf = 64,               # Size of feature maps in discriminator
        num_epochs = 5,         # Number of training epochs
        lr = 0.0002,            # Learning rate for optimizers
        beta1 = 0.5,            # Beta1 hyperparam for Adam optimizers
        ngpu = 1                # Number of GPUs available. Use 0 for CPU mode.
        ):
        """Build the dataset, both networks, the loss and the optimizers.

        Args:
            dataroot: Root directory handed to ``datasets.ImageFolder``.
            workers: Number of dataloader worker processes.
            batch_size: Batch size used by the dataloader.
            image_size: Every image is resized to ``image_size x image_size``.
            nc: Number of image channels.
            nz: Length of the latent vector fed to the generator.
            ngf: Base feature-map width of the generator.
            ndf: Base feature-map width of the discriminator.
            num_epochs: Number of passes over the dataloader in :meth:`train`.
            lr: Learning rate shared by both Adam optimizers.
            beta1: First Adam beta; the second is fixed at 0.999.
            ngpu: CUDA is used only when it is available and ``ngpu > 0``;
                values above 1 wrap both networks in ``nn.DataParallel``.
        """
        # Hyperparameters etc.
        self.device = torch.device("cuda" if (torch.cuda.is_available() and ngpu > 0) else "cpu")
        self.LEARNING_RATE = lr  # could also use two lrs, one for gen and one for disc
        self.BATCH_SIZE = batch_size
        self.IMAGE_SIZE = image_size
        self.CHANNELS_IMG = nc
        self.NOISE_DIM = nz
        self.NUM_EPOCHS = num_epochs
        self.FEATURES_DISC = ndf
        self.FEATURES_GEN = ngf
        self.BETA = beta1

        # Resize, convert to tensor, then map every channel from [0, 1] to
        # [-1, 1] so real images share the range of the generator's Tanh output.
        channel_stats = [0.5] * self.CHANNELS_IMG
        self.transform = transforms.Compose(
            [
                transforms.Resize((self.IMAGE_SIZE, self.IMAGE_SIZE)),
                transforms.ToTensor(),
                transforms.Normalize(channel_stats, channel_stats),
            ]
        )

        # ImageFolder needs the images to sit in sub-directories of `dataroot`
        # (one per class); the class labels it yields are ignored by `train`.
        self.dataset = datasets.ImageFolder(root=dataroot, transform=self.transform)
        self.dataloader = DataLoader(self.dataset, batch_size=self.BATCH_SIZE, shuffle=True, num_workers=workers)
        self.gen = self.Generator(self.NOISE_DIM, self.CHANNELS_IMG, self.FEATURES_GEN, ngpu).to(self.device)
        self.disc = self.Discriminator(self.CHANNELS_IMG, self.FEATURES_DISC, ngpu).to(self.device)
        if (self.device.type == 'cuda') and (ngpu > 1):
            self.gen = nn.DataParallel(self.gen, list(range(ngpu)))
            self.disc = nn.DataParallel(self.disc, list(range(ngpu)))
        # `apply` visits every sub-module, so this also reaches the layers
        # inside a DataParallel wrapper.
        self.gen.apply(self.weights_init)
        self.disc.apply(self.weights_init)

        # Initialize BCELoss function
        self.criterion = nn.BCELoss()
        # Create batch of latent vectors that we will use to visualize the progression of the generator
        self.fixed_noise = torch.randn(64, self.NOISE_DIM, 1, 1, device=self.device)
        # Establish convention for real and fake labels during training
        self.real_label = 1.
        self.fake_label = 0.
        # Setup Adam optimizers for both G and D
        self.opt_gen = optim.Adam(self.gen.parameters(), lr=self.LEARNING_RATE, betas=(self.BETA, 0.999))
        self.opt_disc = optim.Adam(self.disc.parameters(), lr=self.LEARNING_RATE, betas=(self.BETA, 0.999))

    def weights_init(self, m):
        """Re-initialise module ``m`` in place; meant to be passed to ``Module.apply``.

        Modules whose class name contains ``Conv`` get weights drawn from
        N(0, 0.02). Modules whose class name contains ``BatchNorm`` get weights
        drawn from N(1, 0.02) and a zero bias. Every other module is untouched.
        """
        classname = m.__class__.__name__
        if 'Conv' in classname:
            nn.init.normal_(m.weight.data, 0.0, 0.02)
        elif 'BatchNorm' in classname:
            nn.init.normal_(m.weight.data, 1.0, 0.02)
            nn.init.constant_(m.bias.data, 0)

    def train(self):
        """Run the adversarial training loop for ``NUM_EPOCHS`` epochs.

        Each iteration updates the discriminator on a real batch and on a
        detached fake batch, then updates the generator against the freshly
        updated discriminator. Every 100 iterations, and on the very last
        iteration, the generator output for ``fixed_noise`` is stored and
        real/fake image grids are written to TensorBoard under ``logs/real``
        and ``logs/fake``.

        Returns:
            tuple: ``(img_list, G_losses, D_losses)`` where ``img_list`` holds
            the stored image grids of generator samples and the two loss lists
            hold one float per training iteration.
        """
        writer_real = SummaryWriter("logs/real")
        writer_fake = SummaryWriter("logs/fake")

        # Lists to keep track of progress
        img_list = []
        G_losses = []
        D_losses = []
        iters = 0
        num_batches = len(self.dataloader)

        try:
            # For each epoch
            for epoch in range(self.NUM_EPOCHS):
                if epoch % 5 == 0:
                    # NOTE(review): the sleeps presumably keep this message from
                    # interleaving with the tqdm progress bar - confirm.
                    time.sleep(1)
                    print("Epochs done: {}".format(epoch))
                    time.sleep(1)
                # For each batch in the dataloader
                for i, data in enumerate(tqdm(self.dataloader)):

                    ############################
                    # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
                    ###########################
                    ## Train with all-real batch
                    self.disc.zero_grad()
                    # Format batch
                    real_cpu = data[0].to(self.device)
                    b_size = real_cpu.size(0)
                    label = torch.full((b_size,), self.real_label, dtype=torch.float, device=self.device)
                    # Forward pass real batch through D
                    output = self.disc(real_cpu).view(-1)
                    # Calculate loss on all-real batch
                    errD_real = self.criterion(output, label)
                    # Calculate gradients for D in backward pass
                    errD_real.backward()
                    D_x = output.mean().item()

                    ## Train with all-fake batch
                    # Generate batch of latent vectors
                    noise = torch.randn(b_size, self.NOISE_DIM, 1, 1, device=self.device)
                    # Generate fake image batch with G
                    fake = self.gen(noise)
                    label.fill_(self.fake_label)
                    # Classify all fake batch with D; detach so that no
                    # gradient flows back into G during the D update
                    output = self.disc(fake.detach()).view(-1)
                    # Calculate D's loss on the all-fake batch
                    errD_fake = self.criterion(output, label)
                    # Calculate the gradients for this batch, accumulated (summed) with previous gradients
                    errD_fake.backward()
                    D_G_z1 = output.mean().item()
                    # Compute error of D as sum over the fake and the real batches
                    errD = errD_real + errD_fake
                    # Update D
                    self.opt_disc.step()

                    ############################
                    # (2) Update G network: maximize log(D(G(z)))
                    ###########################
                    self.gen.zero_grad()
                    label.fill_(self.real_label)  # fake labels are real for generator cost
                    # Since we just updated D, perform another forward pass of all-fake batch through D
                    output = self.disc(fake).view(-1)
                    # Calculate G's loss based on this output
                    errG = self.criterion(output, label)
                    # Calculate gradients for G
                    errG.backward()
                    D_G_z2 = output.mean().item()
                    # Update G
                    self.opt_gen.step()

                    # Save Losses for plotting later
                    G_losses.append(errG.item())
                    D_losses.append(errD.item())

                    # Check how the generator is doing by saving G's output on fixed_noise
                    is_last_step = (epoch == self.NUM_EPOCHS - 1) and (i == num_batches - 1)
                    if (iters % 100 == 0) or is_last_step:
                        with torch.no_grad():
                            no_grad_fake = self.gen(self.fixed_noise).detach().cpu()
                        img_list.append(vutils.make_grid(no_grad_fake, padding=2, normalize=True))

                        img_grid_real = torchvision.utils.make_grid(data[0][:32], normalize=True)
                        img_grid_fake = torchvision.utils.make_grid(no_grad_fake[:32], normalize=True)

                        writer_real.add_image("Real", img_grid_real, global_step=iters)
                        writer_fake.add_image("Fake", img_grid_fake, global_step=iters)

                    iters += 1
                # End-of-epoch summary, reporting the statistics of the last batch
                time.sleep(1)
                print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                      % (epoch, self.NUM_EPOCHS, i, num_batches,
                         errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
                time.sleep(1)
        finally:
            # Flush and release the event files even if training is
            # interrupted or raises, so the images logged so far are kept.
            writer_real.close()
            writer_fake.close()
        return img_list, G_losses, D_losses

    class Generator(nn.Module):
        """Transposed-convolution generator: latent ``z`` -> 64x64 image in [-1, 1]."""

        def __init__(self, channels_noise, channels_img, features_g, ngpu):
            """Build the upsampling stack.

            Args:
                channels_noise: Number of channels of the latent input.
                channels_img: Number of channels of the generated image.
                features_g: Base feature-map width (scaled by 8, 4, 2, 1).
                ngpu: Stored on the module; not used by the layers themselves.
            """
            # Zero-argument super() does not look up the outer class by name,
            # so it keeps working when the global name `DCGAN` is rebound.
            super().__init__()
            self.ngpu = ngpu
            self.gen = nn.Sequential(
                # input is Z, going into a convolution
                nn.ConvTranspose2d(channels_noise, features_g * 8, 4, 1, 0, bias=False),
                nn.BatchNorm2d(features_g * 8),
                nn.ReLU(True),
                # state size. (features_g*8) x 4 x 4
                nn.ConvTranspose2d(features_g * 8, features_g * 4, 4, 2, 1, bias=False),
                nn.BatchNorm2d(features_g * 4),
                nn.ReLU(True),
                # state size. (features_g*4) x 8 x 8
                nn.ConvTranspose2d(features_g * 4, features_g * 2, 4, 2, 1, bias=False),
                nn.BatchNorm2d(features_g * 2),
                nn.ReLU(True),
                # state size. (features_g*2) x 16 x 16
                nn.ConvTranspose2d(features_g * 2, features_g, 4, 2, 1, bias=False),
                nn.BatchNorm2d(features_g),
                nn.ReLU(True),
                # state size. (features_g) x 32 x 32
                nn.ConvTranspose2d(features_g, channels_img, 4, 2, 1, bias=False),
                nn.Tanh()
                # state size. (channels_img) x 64 x 64
            )

        def forward(self, x):
            """Map a batch of latents ``(N, channels_noise, 1, 1)`` to images."""
            return self.gen(x)

    class Discriminator(nn.Module):
        """Strided-convolution discriminator: 64x64 image -> probability of being real."""

        def __init__(self, channels_img, features_d, ngpu):
            """Build the downsampling stack.

            Args:
                channels_img: Number of channels of the input image.
                features_d: Base feature-map width (scaled by 1, 2, 4, 8).
                ngpu: Stored on the module; not used by the layers themselves.
            """
            # Zero-argument super() for the same reason as in Generator.
            super().__init__()
            self.ngpu = ngpu
            self.disc = nn.Sequential(
                # input is (nc) x 64 x 64
                nn.Conv2d(channels_img, features_d, 4, 2, 1, bias=False),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf) x 32 x 32
                nn.Conv2d(features_d, features_d * 2, 4, 2, 1, bias=False),
                nn.BatchNorm2d(features_d * 2),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*2) x 16 x 16
                nn.Conv2d(features_d * 2, features_d * 4, 4, 2, 1, bias=False),
                nn.BatchNorm2d(features_d * 4),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*4) x 8 x 8
                nn.Conv2d(features_d * 4, features_d * 8, 4, 2, 1, bias=False),
                nn.BatchNorm2d(features_d * 8),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*8) x 4 x 4
                nn.Conv2d(features_d * 8, 1, 4, 1, 0, bias=False),
                nn.Sigmoid()
            )

        def forward(self, x):
            """Score a batch of 64x64 images; the output has shape ``(N, 1, 1, 1)``."""
            return self.disc(x)

In [None]:
# This cell rebinds the name `DCGAN` from the class to a model instance, and
# the later cells rely on that name. Recover the class from the instance when
# the cell is executed again, so a re-run builds a fresh model instead of
# failing with "'DCGAN' object is not callable".
# TODO: giving the instance its own name would be the cleaner long-term fix.
_dcgan_cls = DCGAN if isinstance(DCGAN, type) else type(DCGAN)
DCGAN = _dcgan_cls('../../images')

In [None]:
# Run the adversarial training loop. It returns the generator sample grids
# saved during training plus the per-iteration generator and discriminator
# losses, which the cells below plot.
img_list, G_losses, D_losses = DCGAN.train()

In [None]:
# Generator and discriminator loss curves, one point per training iteration.
fig, ax = plt.subplots(figsize=(10, 5))
ax.set_title("Generator and Discriminator Loss During Training")
for series, tag in ((G_losses, "G"), (D_losses, "D")):
    ax.plot(series, label=tag)
ax.set_xlabel("iterations")
ax.set_ylabel("Loss")
ax.legend()
plt.show()

In [None]:
# Animate the saved generator sample grids, one frame per stored grid.
fig = plt.figure(figsize=(8, 8))
plt.axis("off")
# Each frame is a single-artist list; grids are (C, H, W) and imshow wants (H, W, C).
ims = [
    [plt.imshow(np.transpose(grid, (1, 2, 0)), animated=True)]
    for grid in img_list
]
ani = animation.ArtistAnimation(
    fig,
    ims,
    interval=1000,
    repeat_delay=1000,
    blit=True,
)

# Must stay the last expression so the notebook renders the player.
HTML(ani.to_jshtml())

In [None]:
# Grab a batch of real images from the dataloader.
# The dataloader shuffles, so each execution of this cell yields a different
# batch; the plotting cells below read the image tensor from `real_batch[0]`.
real_batch = next(iter(DCGAN.dataloader))

In [None]:
# Real vs. generated images, side by side (1 row x 2 columns).
plt.figure(figsize=(15, 15))

# Left panel: up to 64 real images from the sampled batch.
plt.subplot(1, 2, 1)
plt.axis("off")
plt.title("Real Images")
real_images = real_batch[0].to(DCGAN.device)[:64]
real_grid = vutils.make_grid(real_images, padding=5, normalize=True).cpu()
plt.imshow(np.transpose(real_grid, (1, 2, 0)))

# Right panel: the most recently saved grid of generator samples.
plt.subplot(1, 2, 2)
plt.axis("off")
plt.title("Fake Images")
latest_fake_grid = img_list[-1]
plt.imshow(np.transpose(latest_fake_grid, (1, 2, 0)))
plt.show()

In [None]:
# Real vs. generated images, stacked vertically (2 rows x 1 column).
plt.figure(figsize=(15, 15))

# Top panel: up to 64 real images from the sampled batch.
plt.subplot(2, 1, 1)
plt.axis("off")
plt.title("Real Images")
real_images = real_batch[0].to(DCGAN.device)[:64]
real_grid = vutils.make_grid(real_images, padding=5, normalize=True).cpu()
plt.imshow(np.transpose(real_grid, (1, 2, 0)))

# Bottom panel: the most recently saved grid of generator samples.
plt.subplot(2, 1, 2)
plt.axis("off")
plt.title("Fake Images")
latest_fake_grid = img_list[-1]
plt.imshow(np.transpose(latest_fake_grid, (1, 2, 0)))
plt.show()