<a href="https://colab.research.google.com/github/daliavaleriani/gan/blob/master/VanillaGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip install torch  



In [0]:
!pip3 install torchvision



In [0]:
!pip install tensorboardX



In [0]:
# Imports

import torch
from torch import nn, optim
from torch.autograd.variable import Variable
from torchvision import transforms, datasets


In [0]:
# Logger class
# La classe Logger viene utilizzata per rendere l'output più gradevole e leggibile.
# Ai fini del progetto non è stato ritenuto opportuno approfondire le sue funzionalità.

import os
import numpy as np
import errno
import torchvision.utils as vutils
from tensorboardX import SummaryWriter
from IPython import display
from matplotlib import pyplot as plt
import torch

'''
    TensorBoard Data will be stored in './runs' path
'''


class Logger:

    def __init__(self, model_name, data_name):
        self.model_name = model_name
        self.data_name = data_name

        self.comment = '{}_{}'.format(model_name, data_name)
        self.data_subdir = '{}/{}'.format(model_name, data_name)

        # TensorBoard
        self.writer = SummaryWriter(comment=self.comment)

    def log(self, d_error, g_error, epoch, n_batch, num_batches):

        # var_class = torch.autograd.variable.Variable
        if isinstance(d_error, torch.autograd.Variable):
            d_error = d_error.data.cpu().numpy()
        if isinstance(g_error, torch.autograd.Variable):
            g_error = g_error.data.cpu().numpy()

        step = Logger._step(epoch, n_batch, num_batches)
        self.writer.add_scalar(
            '{}/D_error'.format(self.comment), d_error, step)
        self.writer.add_scalar(
            '{}/G_error'.format(self.comment), g_error, step)

    def log_images(self, images, num_images, epoch, n_batch, num_batches, format='NCHW', normalize=True):
        '''
        input images are expected in format (NCHW)
        '''
        if type(images) == np.ndarray:
            images = torch.from_numpy(images)
        
        if format=='NHWC':
            images = images.transpose(1,3)
        

        step = Logger._step(epoch, n_batch, num_batches)
        img_name = '{}/images{}'.format(self.comment, '')

        # Make horizontal grid from image tensor
        horizontal_grid = vutils.make_grid(
            images, normalize=normalize, scale_each=True)
        # Make vertical grid from image tensor
        nrows = int(np.sqrt(num_images))
        grid = vutils.make_grid(
            images, nrow=nrows, normalize=True, scale_each=True)

        # Add horizontal images to tensorboard
        self.writer.add_image(img_name, horizontal_grid, step)

        # Save plots
        self.save_torch_images(horizontal_grid, grid, epoch, n_batch)

    def save_torch_images(self, horizontal_grid, grid, epoch, n_batch, plot_horizontal=True):
        out_dir = './data/images/{}'.format(self.data_subdir)
        Logger._make_dir(out_dir)

        # Plot and save horizontal
        fig = plt.figure(figsize=(16, 16))
        plt.imshow(np.moveaxis(horizontal_grid.numpy(), 0, -1))
        plt.axis('off')
        if plot_horizontal:
            display.display(plt.gcf())
        self._save_images(fig, epoch, n_batch, 'hori')
        plt.close()

        # Save squared
        fig = plt.figure()
        plt.imshow(np.moveaxis(grid.numpy(), 0, -1))
        plt.axis('off')
        self._save_images(fig, epoch, n_batch)
        plt.close()

    def _save_images(self, fig, epoch, n_batch, comment=''):
        out_dir = './data/images/{}'.format(self.data_subdir)
        Logger._make_dir(out_dir)
        fig.savefig('{}/{}_epoch_{}_batch_{}.png'.format(out_dir,
                                                         comment, epoch, n_batch))

    def display_status(self, epoch, num_epochs, n_batch, num_batches, d_error, g_error, d_pred_real, d_pred_fake):
        
        # var_class = torch.autograd.variable.Variable
        if isinstance(d_error, torch.autograd.Variable):
            d_error = d_error.data.cpu().numpy()
        if isinstance(g_error, torch.autograd.Variable):
            g_error = g_error.data.cpu().numpy()
        if isinstance(d_pred_real, torch.autograd.Variable):
            d_pred_real = d_pred_real.data
        if isinstance(d_pred_fake, torch.autograd.Variable):
            d_pred_fake = d_pred_fake.data
        
        
        print('Epoch: [{}/{}], Batch Num: [{}/{}]'.format(
            epoch,num_epochs, n_batch, num_batches)
             )
        print('Discriminator Loss: {:.4f}, Generator Loss: {:.4f}'.format(d_error, g_error))
        print('D(x): {:.4f}, D(G(z)): {:.4f}'.format(d_pred_real.mean(), d_pred_fake.mean()))

    def save_models(self, generator, discriminator, epoch):
        out_dir = './data/models/{}'.format(self.data_subdir)
        Logger._make_dir(out_dir)
        torch.save(generator.state_dict(),
                   '{}/G_epoch_{}'.format(out_dir, epoch))
        torch.save(discriminator.state_dict(),
                   '{}/D_epoch_{}'.format(out_dir, epoch))

    def close(self):
        self.writer.close()

    # Private Functionality

    @staticmethod
    def _step(epoch, n_batch, num_batches):
        return epoch * num_batches + n_batch

    @staticmethod
    def _make_dir(directory):
        try:
            os.makedirs(directory)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise




In [0]:
# Classe del Dataset

def mnist_data():
    # Si compongono diverse trasformazioni; ToTensor() converte immagini PIL in tensori;
    # Normalize() normalizza un immagine del tensore con media e deviazione standard:
    # la prima tupla rappreesenta la media e la seconda la deviazione standard.
    compose = transforms.Compose( 
        [transforms.ToTensor(), 
         transforms.Normalize([0.5],[0.5])
        ])
    out_dir = './dataset'
    return datasets.MNIST(root=out_dir, train=True, transform=compose, download=True)

# Load data
data = mnist_data()

# Si crea un loader con i dati, in modo da poterlo iterare.
# batch_size = numero di campioni da caricare per un batch.
# shuffle = True per avere i dati rimescolati in ogni epoca.
data_loader = torch.utils.data.DataLoader(data, batch_size=100, shuffle=True)
# Numero dei minibatch
num_batches = len(data_loader)

# N.B.: La lunghezza del loader si adatterà a batch_size.
# Quindi, se il dataset ha 1000 campioni e si usa un batch_size di 10, il loader avrà lunghezza 100.
# Infatti si utilizza il dataset MNIST che ha 60K esempi e un batch_size di 100: 60K/100=600 batches.

In [0]:
# Discriminatore

# torch.nn.Module = classe base per tutti i moduli della rete neurale.

class DiscriminatorNet(torch.nn.Module):
    """
    Rete neurale discriminativa a tre strati nascosti
    """
    # Questa rete acquisisce un'immagine appiattita come input e restituisce la probabilità che appartenga
    # al set di dati reale o al dataset finto.
    # La dimensione di input per ciascuna immagine sarà 28x28 = 784. 
    # Per quanto riguarda la struttura di questa rete, avrà tre livelli nascosti,
    # ciascuno seguito dalla funzione di attivazione non lineare Leaky-ReLU e da un livello Dropout
    # per evitare l'overfitting. Una funzione Sigmoid viene applicata all'output con valore reale
    # per ottenere un valore nell'intervallo aperto (0, 1).
    
    def __init__(self):
        super(DiscriminatorNet, self).__init__()
        n_features = 784
        n_out = 1
        
        self.hidden0 = nn.Sequential( 
            # nn.Linear(input, output)
            nn.Linear(n_features, 1024),
            # nn.LeakyReLU(negative_slope)
            # La leakyReLU risolve il "dying ReLU" problem.
            nn.LeakyReLU(0.2),
            # nn.Dropout(p) viene usato per ridurre l'overfitting. Ogni elemento di input ha una probabilità p
            # di essere eliminato. Questa è una tecnica efficace per la regolarizzazione. Le uscite vengono
            # ridimensionate di un fattore di 1/(1-p) durante l'allenamento. Una p di 0.5 (default) è
            # accettabile per i livelli nascosti.
            # A volte un p intorno a 0.2 funziona bene, ma dipende molto dal dataset di dati, quindi è
            # compito dell'utente provare diverse combinazioni.
            nn.Dropout(0.3)
        )
        self.hidden1 = nn.Sequential(
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3)
        )
        self.hidden2 = nn.Sequential(
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3)
        )
        self.out = nn.Sequential(
            torch.nn.Linear(256, n_out),
            #funzione di attivazione sigmoide
            torch.nn.Sigmoid()
        )

# Nella funzione seguente si definisce come verrà eseguito il modello, a partire dall'input fino all'output.
    def forward(self, x):
        x = self.hidden0(x)
        x = self.hidden1(x)
        x = self.hidden2(x)
        x = self.out(x)
        return x

discriminator = DiscriminatorNet()

if(torch.cuda.is_available):
    discriminator = discriminator.cuda()

In [0]:
# Dimensionality changing 
# Si necessita anche di alcune funzionalità aggiuntive che ci permettano di convertire un'immagine
# appiattita nella sua rappresentazione bidimensionale, e un'altra che faccia il contrario.

def images_to_vectors(images):
    return images.view(images.size(0), 784)

def vectors_to_images(vectors):
    return vectors.view(vectors.size(0), 1, 28, 28)

In [0]:
# Generatore

class GeneratorNet(torch.nn.Module):
    """
    Rete neurale generativa a tre strati nascosti
    """
    # D'altra parte, la rete generativa prende un vettore variabile latente come input e restituisce
    # un vettore di 784 valori, che corrisponde a un'immagine 28x28 appiattita.
    # lo scopo di questa rete è imparare come creare immagini indistinguibili di cifre scritte a mano,
    # motivo per cui il suo output è di per sé una nuova immagine. 
    # Questa rete avrà tre livelli nascosti, ciascuno seguito da una Leaky-ReLU.
    # Lo strato di output avrà una funzione di attivazione di TanH, che mappa i valori risultanti nell'intervallo (-1, 1),
    # che è lo stesso intervallo in cui le immagini MNIST pre-elaborate sono limitate.
    
    def __init__(self):
        super(GeneratorNet, self).__init__()
        n_features = 100
        n_out = 784
        
        self.hidden0 = nn.Sequential(
            nn.Linear(n_features, 256),
            nn.LeakyReLU(0.2)
        )
        self.hidden1 = nn.Sequential(            
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2)
        )
        self.hidden2 = nn.Sequential(
            nn.Linear(512, 1024),
            nn.LeakyReLU(0.2)
        )
        
        self.out = nn.Sequential(
            nn.Linear(1024, n_out),
            # Funzione di attivazione tangente iperbolica:
            nn.Tanh()
        )

    def forward(self, x):
        x = self.hidden0(x)
        x = self.hidden1(x)
        x = self.hidden2(x)
        x = self.out(x)
        return x

generator = GeneratorNet()

if(torch.cuda.is_available):
    generator = generator.cuda()

In [0]:
# Random noise sampler

# Servono alcune funzionalità aggiuntive che consentano di creare il rumore casuale.
# Il rumore casuale verrà campionato da una distribuzione normale con media 0 e varianza 1.

def noise(size):
    '''
    Generates a 1-d vector of gaussian sampled random values
    '''
    n = Variable(torch.randn(size, 100))
    return n

In [0]:
# Optimizers

# Qui si utilizzerà Adam come algoritmo di ottimizzazione per entrambe le reti neurali,
# con un learning rate di 0.0002. 

# optim.Adam(params, learning_rate)
d_optimizer = optim.Adam(discriminator.parameters(),lr=0.0002)
g_optimizer = optim.Adam(generator.parameters(), lr=0.0002)


In [0]:
# Loss function

# Binary Cross Entropy Loss: si prende la media della loss calcolata per ogni minibatch.
# Questa è usata per misurare l'errore di ricostruzione.

loss = nn.BCELoss()

In [0]:
# Vettori target
# Funzioni che aiuteranno in seguito nell'etichettatura (targeting) di immagini reali o false.
# Infatti si può osservare che i target di immagini reali sono sempre 1, mentre quelli di immagini finte sono 0.

def ones_target(size):
    '''
    Tensor containing ones, with shape = size
    '''
    data = Variable(torch.ones(size, 1))
    return data

def zeros_target(size):
    '''
    Tensor containing zeros, with shape = size
    '''
    data = Variable(torch.zeros(size, 1))
    return data

In [0]:
# Addestramento del discriminatore

# Sommando le due loss del discriminatore, si ottengono la loss totale del mini-batch per il discriminatore.
# In pratica, si calcoleranno i gradienti separatamente, per poi aggiornarli insieme.

def train_discriminator(optimizer, real_data, fake_data):
    N = real_data.size(0)
    # Reset gradients
    optimizer.zero_grad()
    
    # 1.1 Train on Real Data
    prediction_real = discriminator(real_data.cuda())
    prediction_real = prediction_real.cuda()
    # Calculate error and backpropagate
    error_real = loss(prediction_real.cuda(), ones_target(N).cuda())
    error_real = error_real.cuda()
    error_real.backward()

    # 1.2 Train on Fake Data
    prediction_fake = discriminator(fake_data.cuda())
    prediction_fake = prediction_fake.cuda()
    # Calculate error and backpropagate
    error_fake = loss(prediction_fake.cuda(), zeros_target(N).cuda())
    error_fake = error_fake.cuda()
    error_fake.backward()
    
    # 1.3 Update weights with gradients
    optimizer.step()
    
    # Return error and predictions for real and fake inputs
    return error_real + error_fake, prediction_real, prediction_fake

In [0]:
# Addestramento del generatore

def train_generator(optimizer, fake_data):
    N = fake_data.size(0)

    # Reset gradients
    optimizer.zero_grad()

    # Sample noise and generate fake data
    prediction = discriminator(fake_data.cuda())
    prediction = prediction.cuda()

    # Calculate error and backpropagate
    error = loss(prediction.cuda(), ones_target(N).cuda())
    error = error.cuda()
    error.backward()

    # Update weights with gradients
    optimizer.step()

    # Return error
    return error

In [0]:
# Testing

# Numero di immagini che compaiono nell'output finale:

num_test_samples = 16
test_noise = noise(num_test_samples)

In [0]:
# Training

# Si screa l'istanza di logger:
logger = Logger(model_name='VGAN', data_name='MNIST')

# Numero totale di epoche da addestrare:
num_epochs = 150

for epoch in range(num_epochs):
    for n_batch, (real_batch,_) in enumerate(data_loader):
        N = real_batch.size(0)

        # 1. Train Discriminator
        real_data = Variable(images_to_vectors(real_batch).cuda())

        # Generate fake data and detach 
        # (so gradients are not calculated for generator)
        fake_data = generator((noise(N).cuda().detach()))

        # Train D
        d_error, d_pred_real, d_pred_fake = \
              train_discriminator(d_optimizer, real_data.cuda(), fake_data.cuda())

        # 2. Train Generator

        # Generate fake data
        fake_data = generator(noise(N).cuda())

        # Train G
        g_error = train_generator(g_optimizer, fake_data)

        # Log batch error
        logger.log(d_error, g_error, epoch, n_batch, num_batches)

        # Display Progress every few batches
        if (n_batch) % 200 == 0: 
            test_images = vectors_to_images(generator(test_noise.cuda()))
            test_images = test_images.data

            logger.log_images(
                test_images.cpu(), num_test_samples, 
                epoch, n_batch, num_batches
            );
            # Display status Logs
            logger.display_status(
                epoch, num_epochs, n_batch, num_batches,
                d_error, g_error, d_pred_real, d_pred_fake
            )