# Preâmbulo

Imports, funções, downloads e instalação do PyTorch.

In [0]:
 # Basic imports.
import os
import time
import numpy as np
import torch

from torch import nn
from torch import optim
from torch.nn import functional as F

from torch.utils.data import DataLoader
from torch.utils import data
from torch.backends import cudnn

from torchvision import models
from torchvision import datasets
from torchvision import transforms

from skimage import io

from sklearn import metrics

from matplotlib import pyplot as plt

%matplotlib inline

cudnn.benchmark = True

In [0]:
# Hyperparameters and settings shared by every cell below.
args = dict(
    epoch_num=25,        # Number of epochs.
    lr=5e-4,             # Learning rate.
    weight_decay=5e-5,   # L2 penalty.
    num_workers=8,       # Number of workers on data loader.
    batch_size=500,      # Mini-batch size.
    print_freq=1,        # Printing frequency.
    z_dim=100,           # Dimension of z input vector.
    num_samples=10,      # Number of samples to be generated in evaluation.
    num_classes=10,      # Number of classes in dataset.
    img_size=28,         # Image size (H and W).
)

# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
args['device'] = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(args['device'])

# Carregando o MNIST

In [0]:
# Folder where the MNIST files live (they are downloaded there on first use).
root = './'

# The only preprocessing step is the conversion of each image to a tensor.
data_transforms = transforms.Compose([transforms.ToTensor()])

# Both splits share the same folder and transform; only the training split
# triggers the download, the test split reuses the files already on disk.
mnist_kwargs = dict(root=root, transform=data_transforms)
train_set = datasets.MNIST(train=True, download=True, **mnist_kwargs)
test_set = datasets.MNIST(train=False, download=False, **mnist_kwargs)

# Both loaders use the configured batch size and worker count; only the
# training loader shuffles its samples.
loader_kwargs = dict(batch_size=args['batch_size'],
                     num_workers=args['num_workers'])
train_loader = DataLoader(train_set, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_set, shuffle=False, **loader_kwargs)

# Reporting how many samples each split holds.
print('Size of training set: ' + str(len(train_set)) + ' samples')
print('Size of test set: ' + str(len(test_set)) + ' samples')

# Treinamento Adversarial Condicional

Podemos inserir o conceito de classe na nossa GAN ao passar o rótulo *c* de cada amostra tanto para $G$ quanto para $D$. Dessa forma, $G$ vai otimizar a distribuição $p(x | z, c)$ usando o rótulo para gerar amostras da classe correta.

![CGANS](https://www.dropbox.com/s/gqtc5710dsrd4rh/GANs_Architecture_CGAN.png?dl=1)

Fazemos isso criando, para cada amostra, um one-hot encoding de $c$, que é concatenado ao batch na dimensão 1 (tanto em $G$ quanto em $D$). Esse tipo de rede é conhecido como uma Conditional GAN (CGAN).

# Atividade Prática: Transformar nossa GAN em uma CGAN

1.   Implementar função *one_hot_encodings()*. Ela deve retornar 2 one-hot encodings: um para ser passado para $G$ e um para ser passado para $D$;
2.   Reaproveitar as arquiteturas de $G$ e $D$ anteriores e adaptar a *self.fc* de $G$ para receber o one-hot encoding das classes das amostras junto de $z$, ou seja, *args\['num_classes'\]* dimensões a mais de input para $G$. O módulo *self.conv* de $D$ também deve ser adaptado para receber *args\['num_classes'\]* canais a mais;
3.   Complementar função *train()* para passar os encodings corretos para o *forward()* de cada rede;
4.   Lembrar de realizar a concatenação de ($z$, *c*) no *forward()* de $G$ e ($x$, *c*) no *forward()* de $D$.

# Computando one-hot encodings para labels

In [0]:
# One-hot encodings of the class labels for the G and D inputs.
def one_hot_encodings(labs, batch_size, num_classes, img_size):
    """Build the one-hot label encodings consumed by G and by D.

    Args:
        labs: 1-D integer tensor with one class index per sample. It may hold
            fewer than `batch_size` labels; the remaining rows stay all-zero.
        batch_size: number of rows (samples) of the returned encodings.
        num_classes: number of classes, i.e. length of each one-hot vector.
        img_size: height and width of the label maps built for D.

    Returns:
        A tuple `(batch_one_hot_g, batch_one_hot_d)` of float tensors placed
        on the same device as `labs`:
          * batch_one_hot_g has shape (batch_size, num_classes);
          * batch_one_hot_d has shape (batch_size, num_classes, img_size,
            img_size), where the channel of the sample's class is filled
            with ones and every other channel with zeros.
    """

    labs = labs.long().view(-1)

    # Encoding for G: one vector of length num_classes per sample.
    batch_one_hot_g = torch.zeros(batch_size, num_classes,
                                  device=labs.device)

    # Encoding for D: one (img_size x img_size) map per class and sample.
    batch_one_hot_d = torch.zeros(batch_size, num_classes, img_size, img_size,
                                  device=labs.device)

    # Row i selects sample i and labs[i] selects its class; on the 4-D tensor
    # the assigned scalar is broadcast over the whole spatial map.
    rows = torch.arange(labs.size(0), device=labs.device)
    batch_one_hot_g[rows, labs] = 1.0
    batch_one_hot_d[rows, labs] = 1.0

    return batch_one_hot_g, batch_one_hot_d

# Sanity test for function one_hot_encodings(): 4 random labels, 10 classes
# and 2x2 label maps.
labs = torch.randint_like(torch.zeros((4)), high=10, dtype=torch.long).to(args['device'])
print(labs)
one_hot_g, one_hot_d = one_hot_encodings(labs, 4, 10, 2)
print(one_hot_g)

# Checking the results instead of relying on visual inspection only.
assert tuple(one_hot_g.shape) == (4, 10)
assert tuple(one_hot_d.shape) == (4, 10, 2, 2)
# Each row must contain exactly one 1, located at the label index.
assert bool(one_hot_g.sum(dim=1).eq(1).all())
assert torch.equal(one_hot_g.argmax(dim=1).cpu(), labs.cpu())
# Each pixel of the maps for D must also carry exactly one active class.
assert bool(one_hot_d.sum(dim=1).eq(1).all())

# Definindo o Gerador $G$

In [0]:
# Adversarial Generator.
class Generator(nn.Module):
    """Conditional generator G(z, c).

    The noise vector z is concatenated with the one-hot class encoding c,
    projected by fully connected layers to a 128-channel feature map of side
    input_size // 4 and upsampled twice (x2 each) by transposed convolutions
    up to (output_channels, input_size, input_size). The final Sigmoid keeps
    the output in [0, 1].

    Args:
        input_dim: dimension of the noise vector z.
        output_channels: number of channels of the generated image.
        input_size: height and width of the generated image; must be a
            multiple of 4 because of the two x2 upsampling stages.
        num_classes: length of the one-hot class encoding c.

    Raises:
        ValueError: if input_size is not a multiple of 4.
    """

    def __init__(self,
                 input_dim=100,
                 output_channels=1,
                 input_size=28,
                 num_classes=10):

        super(Generator, self).__init__()

        if input_size % 4 != 0:
            raise ValueError(
                'input_size must be a multiple of 4, got %d' % input_size)

        self.input_dim = input_dim
        self.output_channels = output_channels
        self.input_size = input_size
        self.num_classes = num_classes

        # Side of the feature map that feeds the transposed convolutions.
        self.feat_size = input_size // 4
        feat_dim = 128 * self.feat_size * self.feat_size

        # FC layers: receive z concatenated with c, hence the num_classes
        # extra input dimensions.
        self.fc = nn.Sequential(
            nn.Linear(self.input_dim + self.num_classes, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, feat_dim),
            nn.BatchNorm1d(feat_dim),
            nn.ReLU(inplace=True),
        )

        # Deconv layers: kernel 4, stride 2, padding 1 doubles H and W.
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, self.output_channels,
                               kernel_size=4, stride=2, padding=1),
            nn.Sigmoid(),
        )

        self.initialize_weights()

    # Function for randomly initializing weights.
    def initialize_weights(self):
        """Initialize Conv2d, BatchNorm2d and Linear layers.

        Layers of any other type (e.g. ConvTranspose2d, BatchNorm1d) match
        none of the branches below and keep PyTorch's default initialization.
        """

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, z, c):
        """Generate images from noise and class encodings.

        Args:
            z: noise tensor of shape (B, input_dim).
            c: one-hot class encodings of shape (B, num_classes).

        Returns:
            Tensor of shape (B, output_channels, input_size, input_size)
            with values in [0, 1].
        """

        # Concat z and c along the feature dimension.
        zc = torch.cat([z, c], dim=1)

        # Forward: FC projection, reshape to a feature map, upsampling.
        feats = self.fc(zc)
        feats = feats.view(feats.size(0), 128, self.feat_size, self.feat_size)

        return self.deconv(feats)
        
# Instantiating G with the sizes taken from the shared configuration, so the
# network always agrees with the encodings built from args['num_classes'] and
# args['img_size'].
net_G = Generator(input_dim=args['z_dim'],
                  input_size=args['img_size'],
                  num_classes=args['num_classes']).to(args['device'])

# Printing architecture.
print(net_G)

# Definindo o Discriminador $D$

In [0]:
# Adversarial Discriminator.
class Discriminator(nn.Module):
    """Conditional discriminator D(x, c).

    The image x is concatenated channel-wise with the spatial one-hot class
    maps c, downsampled twice (/2 each) by strided convolutions and mapped by
    fully connected layers to output_channels values. The final Sigmoid turns
    them into probabilities in [0, 1].

    Args:
        input_channels: number of channels of the input image.
        output_channels: number of outputs per sample (1 = real/fake score).
        input_size: height and width of the input image; must be a multiple
            of 4 because of the two /2 downsampling stages.
        num_classes: number of channels of the class maps c.

    Raises:
        ValueError: if input_size is not a multiple of 4.
    """

    def __init__(self,
                 input_channels=1,
                 output_channels=1,
                 input_size=28,
                 num_classes=10):

        super(Discriminator, self).__init__()

        if input_size % 4 != 0:
            raise ValueError(
                'input_size must be a multiple of 4, got %d' % input_size)

        self.input_channels = input_channels
        self.output_channels = output_channels
        self.input_size = input_size
        self.num_classes = num_classes

        # Side of the feature map produced by the convolutions.
        self.feat_size = input_size // 4

        # Conv layers: receive x concatenated with c, hence the num_classes
        # extra input channels. Kernel 4, stride 2, padding 1 halves H and W.
        self.conv = nn.Sequential(
            nn.Conv2d(self.input_channels + self.num_classes, 64,
                      kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
        )

        # FC layers: flattened feature map -> real/fake probability.
        self.fc = nn.Sequential(
            nn.Linear(128 * self.feat_size * self.feat_size, 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(1024, self.output_channels),
            nn.Sigmoid(),
        )

        self.initialize_weights()

    # Function for randomly initializing weights.
    def initialize_weights(self):
        """Initialize Conv2d, BatchNorm2d and Linear layers.

        Layers of any other type (e.g. BatchNorm1d) match none of the
        branches below and keep PyTorch's default initialization.
        """

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x, c):
        """Score images as real or fake given their class maps.

        Args:
            x: images of shape (B, input_channels, input_size, input_size).
            c: one-hot class maps of shape
                (B, num_classes, input_size, input_size).

        Returns:
            Tensor of shape (B, output_channels) with values in [0, 1].
        """

        # Concat x and c along the channel dimension.
        xc = torch.cat([x, c], dim=1)

        # Forward: convolutions, flattening, FC classifier.
        feats = self.conv(xc)
        feats = feats.view(feats.size(0), -1)

        return self.fc(feats)

# Instantiating D with the sizes taken from the shared configuration, so the
# network always agrees with the encodings built from args['num_classes'] and
# args['img_size'].
net_D = Discriminator(input_size=args['img_size'],
                      num_classes=args['num_classes']).to(args['device'])

# Printing architecture.
print(net_D)

# Definindo os otimizadores

In [0]:
# Each network gets its own Adam optimizer so G and D can be updated
# independently. Learning rate and L2 penalty come from the configuration;
# beta1 = 0.5 (instead of the default 0.9) is a common choice for GAN
# training.

# Optimizer for G.
opt_G = optim.Adam(net_G.parameters(),
                   lr=args['lr'],
                   betas=(0.5, 0.999),
                   weight_decay=args['weight_decay'])

# Optimizer for D.
opt_D = optim.Adam(net_D.parameters(),
                   lr=args['lr'],
                   betas=(0.5, 0.999),
                   weight_decay=args['weight_decay'])

# Definindo um Scheduler para os Learning Rates

In [0]:
# Step-wise learning rate decay, advanced once per epoch by scheduler.step():
# the learning rate is multiplied by 0.9 five times along the training
# (every epoch_num // 5 epochs, at least every epoch).
lr_step_size = max(1, args['epoch_num'] // 5)

scheduler_G = optim.lr_scheduler.StepLR(opt_G, step_size=lr_step_size, gamma=0.9)
scheduler_D = optim.lr_scheduler.StepLR(opt_D, step_size=lr_step_size, gamma=0.9)

# Definindo a loss composta

In [0]:
# Adversarial loss: binary cross-entropy between the score given by D and the
# real (1) / fake (0) target.
# NOTE: BCELoss expects probabilities in [0, 1], i.e. it assumes D ends with a
# sigmoid; if D returns raw logits, use nn.BCEWithLogitsLoss instead.
criterion = nn.BCELoss()

# Criando funções para Treino e Teste

In [0]:
# Training procedure.
def train(train_loader,
          net_G, net_D,
          criterion,
          opt_G, opt_D,
          epoch,
          train_loss_G, train_loss_D):
    """Run one epoch of conditional adversarial training.

    For every batch, D is updated first (real samples with their true labels
    against samples generated for random labels) and then G is updated to
    make D classify its samples as real.

    Reads args['batch_size'], args['z_dim'], args['num_classes'],
    args['img_size'], args['print_freq'] and args['device'].

    Args:
        train_loader: iterable yielding (images, labels) batches.
        net_G: generator, called as net_G(z, one_hot_g).
        net_D: discriminator, called as net_D(x, one_hot_d).
        criterion: loss comparing the output of D with 0/1 targets of shape
            (B, 1).
        opt_G: optimizer of net_G.
        opt_D: optimizer of net_D.
        epoch: current epoch number (used for logging only).
        train_loss_G: list to which the G loss of each batch is appended.
        train_loss_D: list to which the D loss of each batch is appended.

    Returns:
        The (train_loss_G, train_loss_D) lists, updated in place.
    """

    tic = time.time()

    # Predefining ones and zeros for batches. They are sliced per batch so a
    # last batch smaller than args['batch_size'] is handled as well.
    y_real = torch.ones(args['batch_size'], 1).to(args['device'])
    y_fake = torch.zeros(args['batch_size'], 1).to(args['device'])

    # Setting networks for training mode.
    net_D.train()
    net_G.train()

    # Iterating over batches.
    for i, batch_data in enumerate(train_loader):

        # Obtaining images and labels for batch.
        x, labs = batch_data

        # Casting to correct device.
        x = x.to(args['device'])
        labs = labs.to(args['device'])

        # Actual number of samples in this batch and matching targets.
        cur_batch = x.size(0)
        y_real_b = y_real[:cur_batch]
        y_fake_b = y_fake[:cur_batch]

        # Creating random vector z with size (batch_size, z_dim), sampled
        # with torch.rand exactly as in evaluate().
        z = torch.rand((cur_batch, args['z_dim']), device=args['device'])

        # Generating random class labels for the fake samples (created on
        # the same device as labs).
        fake_labs = torch.randint_like(labs,
                                       high=args['num_classes'],
                                       dtype=torch.long)

        # Computing one-hot encodings for batch labs and fake_labs.
        one_hot_g_fake, one_hot_d_fake = one_hot_encodings(fake_labs,
                                                           cur_batch,
                                                           args['num_classes'],
                                                           args['img_size'])
        one_hot_g_real, one_hot_d_real = one_hot_encodings(labs,
                                                           cur_batch,
                                                           args['num_classes'],
                                                           args['img_size'])

        ###############
        # Updating D. #
        ###############

        # Clearing the gradients of D optimizer.
        opt_D.zero_grad()

        # Forwarding real data.
        D_real = net_D(x, one_hot_d_real) # Through D.

        # Computing loss for real data.
        D_real_loss = criterion(D_real, y_real_b)

        # Forwarding fake data.
        G_out = net_G(z, one_hot_g_fake) # Through G.

        # G is not updated in this step, so its output is detached to avoid
        # backpropagating through the generator.
        D_fake = net_D(G_out.detach(), one_hot_d_fake) # Through D.

        # Computing loss for fake data.
        D_fake_loss = criterion(D_fake, y_fake_b)

        # Computing total loss for D.
        D_loss = D_real_loss + D_fake_loss

        # Computing backpropagation for D.
        D_loss.backward()

        # Taking step in D optimizer.
        opt_D.step()

        ###############
        # Updating G. #
        ###############

        # Clearing the gradients of G optimizer.
        opt_G.zero_grad()

        # Forwarding fake data again, now through the freshly updated D.
        G_out = net_G(z, one_hot_g_fake) # Through G.

        D_fake = net_D(G_out, one_hot_d_fake) # Through D.

        # Computing loss for G: G wants D to label its samples as real.
        G_loss = criterion(D_fake, y_real_b)

        # Computing backpropagation for G.
        G_loss.backward()

        # Taking step in G optimizer.
        opt_G.step()

        # Updating lists.
        train_loss_G.append(G_loss.item())
        train_loss_D.append(D_loss.item())

    toc = time.time()

    # Printing training epoch time.
    print('-------------------------------------------------------------------')
    print('[epoch %d], [training time %.2f]' % (
        epoch, (toc - tic)))
    print('-------------------------------------------------------------------')

    if epoch % args['print_freq'] == 0:

        # Plotting losses accumulated over all batches seen so far.
        fig, ax = plt.subplots(1, 2, figsize=(16, 4))

        ax[0].plot(np.asarray(train_loss_G), 'r-', label='G loss')
        ax[0].legend()

        ax[1].plot(np.asarray(train_loss_D), 'b--', label='D loss')
        ax[1].legend()

        plt.show()

    return train_loss_G, train_loss_D

In [0]:
# Evaluating procedure.
def evaluate(net_G, criterion, epoch):
    """Plot a num_samples x num_samples grid of samples generated by G.

    One random class label is drawn per grid cell, G generates one image
    conditioned on it and the label is shown as the title of the cell.

    Reads args['num_samples'], args['num_classes'], args['img_size'],
    args['z_dim'] and args['device'].

    Args:
        net_G: generator, called as net_G(z, one_hot_g). It is left in
            evaluation mode when the function returns.
        criterion: unused; kept so existing calls keep working.
        epoch: unused; kept so existing calls keep working.
    """

    # Setting the generator for evaluation mode. Only G is needed to generate
    # samples, so the function does not touch the discriminator.
    net_G.eval()

    grid_side = args['num_samples']
    total = grid_side * grid_side

    # One random class label per generated sample.
    fake_labs = torch.randint(0, args['num_classes'], (total,),
                              dtype=torch.long,
                              device=args['device'])

    one_hot_g, one_hot_d = one_hot_encodings(fake_labs,
                                             total,
                                             args['num_classes'],
                                             args['img_size'])

    # Creating random vector z on the correct device.
    z = torch.rand((total, args['z_dim'])).to(args['device'])

    # Generating new samples. No gradients are needed for plotting.
    with torch.no_grad():
        G_out = net_G(z, one_hot_g)

    # Plotting. squeeze=False keeps ax 2-D even for a 1x1 grid.
    fig, ax = plt.subplots(grid_side,
                           grid_side,
                           figsize=(20, 20),
                           squeeze=False)

    for i in range(grid_side):

        for j in range(grid_side):

            # Row-major position of cell (row j, column i) in the batch.
            idx = j * grid_side + i

            sample = G_out[idx]

            ax[j, i].imshow(sample.cpu().numpy().squeeze(),
                            cmap=plt.get_cmap('gray'))
            ax[j, i].set_yticks([])
            ax[j, i].set_xticks([])
            ax[j, i].set_title('Label: ' + str(fake_labs[idx].item()))

    plt.show()

# Iterando sobre epochs

In [0]:
# Per-batch loss histories of G and D, accumulated over all epochs.
train_loss_G, train_loss_D = [], []

# Epochs are counted from 1 up to (and including) args['epoch_num'].
for epoch in range(1, args['epoch_num'] + 1):

    # One pass over the training set; train() appends the batch losses to the
    # lists and hands them back.
    train_loss_G, train_loss_D = train(
        train_loader,
        net_G, net_D,
        criterion,
        opt_G, opt_D,
        epoch,
        train_loss_G, train_loss_D,
    )

    # Advancing both learning rate schedules by one epoch.
    scheduler_G.step()
    scheduler_D.step()

    # Samples are only generated on the epochs selected by print_freq.
    if epoch % args['print_freq'] != 0:
        continue

    # Generating and plotting samples with the current G.
    evaluate(net_G, criterion, epoch)