# Applied Programming Coding Challenge #2

# General Information

This GAN aims to generate images of Pokemon based on 801 original images of Pokemon. 

In [1]:
import random
import urllib.request
from datetime import datetime

import matplotlib
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
from IPython.display import HTML
from torch.autograd import Variable
from torchvision.utils import save_image

```TIP FOR DEVS``` Detect whether notebook is executed in Colab or not. Use this information to load data from local directory or Google Drive.

In [2]:
try:
  from google.colab import drive
  IN_COLAB = True
except:
  IN_COLAB = False

In [3]:
# Set seed to get reproducible results
manualSeed = 42
# manualSeed = random.randint(1, 10000)
random.seed(manualSeed)
torch.manual_seed(manualSeed)

# 0 Configuration

## 0.1 Configure device

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## 0.2 Configure parameters

In [5]:
if IN_COLAB:
    drive.mount('/content/gdrive')
    dataroot_parent = "/content/gdrive/My Drive/Colab Files"
else:
    dataroot_parent = ".."

dataroot = dataroot_parent + "/data/pokemon-images-jpg"
workers = 2
batch_size = 64
image_size = 64
color_channels = 3
latent_vector = 100
ngf = 64 # Size of feature maps in generator
ndf = 64 # Size of feature maps in discriminator
num_epochs = 100
learning_rate = 0.0002
beta1 = 0.5
ngpu = 1

# 1 Load data

## 1.1 Convert data to jpg

In [6]:
def convertToJpg():
    from PIL import Image
    import os
    import glob
    src = "../data/pokemon-images/raw"
    dst = "../data/pokemon-images-jpg/raw"
    
    for each in glob.glob(src+'/*.png'):
        png = Image.open(each)
        png.load()
        background = Image.new("RGB", png.size, (255,255,255))
        background.paste(png, mask=png.split()[3]) # 3 is the alpha channel
        print(each.replace("\\", "/"), os.path.join(dst, each.replace("\\", "/").split('/')[4].split('.')[0] + '.jpg'))
        background.save(os.path.join(dst, each.replace("\\", "/").split('/')[4].split('.')[0] + '.jpg'), 'JPEG')

## 1.2 Load sprites into notebook

In [9]:
# Define transformation pipeline
train_dataset = dset.ImageFolder(root=dataroot,
                           transform=transforms.Compose([
                               transforms.Resize(image_size),
                               transforms.CenterCrop(image_size),
                               transforms.ToTensor(),
                               transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
                           ]))

In [10]:
# Load data
dataloader = torch.utils.data.DataLoader(dataset=train_dataset, 
                                           batch_size=batch_size, 
                                           shuffle=False, 
                                           num_workers=workers)

# 2 Understand data

In [11]:
# Display sample from first batch
real_batch = next(iter(dataloader))
plt.figure(figsize=(16,8))
plt.axis("off")
plt.title("Training Images")
plt.imshow(np.transpose(vutils.make_grid(real_batch[0].to(device)[:64], padding=2, normalize=True).cpu(),(1,2,0)))

# 3 Prepare net

## 3.1 Initialize weights

In [12]:
def weights_init(m):
    """Initializes weights based on layer type"""
    
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, val=0)
        

## 3.2 Initialize generator

In [13]:
class Generator(nn.Module):
    """Generator net"""
    
    def __init__(self, ngpu):
        super(Generator, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            # input is Z, going into a convolution
            nn.ConvTranspose2d(in_channels=latent_vector, out_channels=ngf * 8, kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(num_features=ngf * 8),
            nn.ReLU(inplace=True),

            nn.ConvTranspose2d(in_channels=ngf * 8, out_channels=ngf * 4, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(num_features=ngf * 4),
            nn.ReLU(inplace=True),

            nn.ConvTranspose2d(in_channels= ngf * 4, out_channels=ngf * 2, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(num_features=ngf * 2),
            nn.ReLU(inplace=True),

            nn.ConvTranspose2d(in_channels=ngf * 2, out_channels=ngf, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(num_features=ngf),
            nn.ReLU(inplace=True),

            nn.ConvTranspose2d(in_channels=ngf, out_channels=color_channels, kernel_size=4, stride=2, padding=1, bias=False),
            nn.Tanh()
        )

    def forward(self, input):
        return self.main(input)

In [14]:
# Create generator
netG = Generator(ngpu).to(device)

# Handle multi-gpu if desired
if (device.type == 'cuda') and (ngpu > 1):
    netG = nn.DataParallel(netG, list(range(ngpu)))

# Apply weights
netG.apply(weights_init)

# Show generator
print(netG)

## 3.3 Initialize discriminator

In [15]:
class Discriminator(nn.Module):
    """Discriminator net"""
    
    def __init__(self, ngpu):
        super(Discriminator, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            nn.Conv2d(in_channels=color_channels, out_channels=ndf, kernel_size=4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            
            nn.Conv2d(in_channels=ndf, out_channels=ndf * 2, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(num_features=ndf * 2),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            
            nn.Conv2d(in_channels=ndf * 2, out_channels=ndf * 4, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(num_features=ndf * 4),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            
            nn.Conv2d(in_channels=ndf * 4, out_channels=ndf * 8, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(num_features=ndf * 8),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            
            nn.Conv2d(in_channels=ndf * 8, out_channels=1, kernel_size=4, stride=1, padding=0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.main(input)
    

In [16]:
# Create discriminator
netD = Discriminator(ngpu).to(device)

# Handle multi-gpu if desired
if (device.type == 'cuda') and (ngpu > 1):
    netD = nn.DataParallel(netD, list(range(ngpu)))

# Apply weights
netD.apply(weights_init)

# Show discriminator
print(netD)

## 3.4 Init loss fucolor_channelstion

In [17]:
# Initialize binary cross entropy loss
criterion = nn.BCELoss()

# Create batch of latent vectors that we will use to visualize
#  the progression of the generator
fixed_noise = torch.randn(64, latent_vector, 1, 1, device=device)

# Establish convention for real and fake labels during training
real_label = 1
fake_label = 0

# Setup Adam optimizers
optimizerD = optim.Adam(netD.parameters(), lr=learning_rate, betas=(beta1, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=learning_rate, betas=(beta1, 0.999))

# 4 Training

In [18]:
def saveFileToGoogleDrive(image, fileName, dirName = "Colab Files"):
    """Saves a file to Google Drive"""
    
    save_image(image, '/content/gdrive/My Drive/' + dirName + '/' + fileName)

def saveFileToLocal(image, fileName, dirName = "pokemon-images-simple"):
    """Saves a file to local directory"""
    
    save_image(image, '../build/' + dirName + '/' + fileName)
    
def generateImage(epoch, image):
    """Generates an image based on a given input vector"""
        
    # print("generateImage(epoch=" + str(epoch) + ")")
    now = datetime.now()
    fileName = str(now.strftime('%Y-%m-%dT%H-%M-%S')) + '_epoch_' + str(epoch) + '.png'
    
    if IN_COLAB:
        saveFileToGoogleDrive(image, fileName)
    else:
        saveFileToLocal(image, fileName)

In [19]:
# Lists to keep track of progress
img_list = []
G_losses = []
D_losses = []

# Input vector to generate new images
input_vector = Variable(torch.randn(batch_size, latent_vector).to(device))

In [20]:
def processEpoch(_epoch):
    """Processes one epoch"""
    
    _last_errG = 1000
    
    for i, data in enumerate(dataloader, 0):
        
        _errG = processBatch(_epoch, data, i)
                
        if (_errG > _last_errG * 2.5):
            return False
        
        _last_errG = _errG
        
        with torch.no_grad():
            fake = netG(fixed_noise).detach().cpu()
          
    # Save image
    if (_epoch % 10 == 0):
        generateImage(_epoch, fake)
    
    img_list.append(vutils.make_grid(fake, padding=2, normalize=True))

In [21]:
def processBatch(_epoch, _data, _i):
    """Processes a batch"""
    
    #---
    # Train discriminator
    #---
    
    ## Train with batch of real images
    
    netD.zero_grad()
    # Format batch
    batch = _data[0].to(device)
    batch_size = batch.size(0)
    label = torch.full((batch_size,), real_label, device=device)
    
    # Forward pass real batch through discriminator
    output = netD(batch).view(-1)
    
    # Calculate loss on all-real batch
    errD_real = criterion(output, label)
    
    # Calculate gradients for discriminator in backward pass
    errD_real.backward()
    D_x = output.mean().item()

    ## Train with batch of fake images
    
    # Generate batch of latent vectors
    noise = torch.randn(batch_size, latent_vector, 1, 1, device=device)
    
    # Generate fake image batch with generator
    fake = netG(noise)
    label.fill_(fake_label)
    
    # Classify all fake batch with discriminator
    output = netD(fake.detach()).view(-1)
    
    # Calculate discriminator's loss on the all-fake batch
    errD_fake = criterion(output, label)
    
    # Calculate the gradients for this batch
    errD_fake.backward()
    D_G_z1 = output.mean().item()
    
    # Add the gradients from the all-real and all-fake batches
    errD = errD_real + errD_fake
    
    # Update discriminator
    optimizerD.step()
    
    #---
    # Train generator
    #---
    
    netG.zero_grad()
    label.fill_(real_label)
    
    # Sicolor_channelse we just updated D, perform another forward pass of all-fake batch through D
    output = netD(fake).view(-1)
    
    # Calculate generator's loss based on this output
    errG = criterion(output, label)
    
    # Calculate gradients for generator
    errG.backward()
    D_G_z2 = output.mean().item()
    
    # Update generator
    optimizerG.step()
    
    # ---
    # Store results
    # ---

    # Output training stats
    if _i % 10 == 0:
        print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
              % (_epoch+1, num_epochs, _i+1, len(dataloader),
                 errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

    # Save Losses for plotting later
    G_losses.append(errG.item())
    D_losses.append(errD.item())
    
    return errG.item()

## 4.2 Training loop

In [None]:
# Iterate over epochs
def processGan():
    for epoch in range(num_epochs):
        proceed = %time processEpoch(epoch)
        
        if (proceed == False):
            break

In [None]:
%time processGan()

In [None]:
# Plot loss
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Plot examples
matplotlib.rcParams['animation.embed_limit'] = 2**128

fig = plt.figure(figsize=(8,8))
plt.axis("off")
ims = [[plt.imshow(np.transpose(i,(1,2,0)), animated=True)] for i in img_list]
ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

HTML(ani.to_jshtml())