<a href="https://colab.research.google.com/github/TheodorSergeev/optml_gan/blob/main/dcgan.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Adapted from https://pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html

# Initialisation

In [8]:
try:
    import google.colab
    IN_COLAB = True
except:
    IN_COLAB = False

if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')

    # packages to generate requirement.txt
    %pip install nbconvert
    %pip install pipreqs
    # for Frechet inception distance
    %pip install pytorch-fid

    %cd drive/My Drive/optml_gan2
    PATH = './'
else:
    PATH = './'

In [9]:
from __future__ import print_function

import time

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.utils.data

import torchvision.utils as vutils

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
from scipy import linalg
from torch.nn.functional import adaptive_avg_pool2d
%matplotlib inline

# Generate Requirements

In [10]:
# # converts notebook to .py file for pipreqs
# !jupyter nbconvert --output-dir="./" --to script dcgan.ipynb

# # creates the requirement.txt file
# !pipreqs --force
# os.remove('./dcgan.py')  # deletes the .py file

# Source code

In [None]:
from src.data_handling import *
from src.utils import *
from src.model import *
from src.losses import *


loss_dict = {
    "kl": (loss_dis_kl, loss_gen_kl),
    "wass": (loss_dis_wasser, loss_gen_wasser),
    "hinge": (loss_dis_hinge, loss_gen_hinge)
}

# FID

from src.training import *
from src.visualisation import *
from src.serialisation import *

# https://keras.io/examples/generative/conditional_gan/
from src.architectures import *

from src.gridsearch import *

## FID

https://www.kaggle.com/code/ibtesama/gan-in-pytorch-with-fid/notebook  
https://github.com/mseitzer/pytorch-fid   
Currently uses the Kaggle stuff since calculating FID on the repo is run from the command line

In [20]:
import torchvision.models as models


class InceptionV3(nn.Module):
    """Pretrained InceptionV3 network returning feature maps"""

    # Index of default block of inception to return,
    # corresponds to output of final average pooling
    DEFAULT_BLOCK_INDEX = 3

    # Maps feature dimensionality to their output blocks indices
    BLOCK_INDEX_BY_DIM = {
        64: 0,   # First max pooling features
        192: 1,  # Second max pooling featurs
        768: 2,  # Pre-aux classifier features
        2048: 3  # Final average pooling features
    }

    def __init__(self,
                 output_blocks=[DEFAULT_BLOCK_INDEX],
                 resize_input=True,
                 normalize_input=True,
                 requires_grad=False):

        super(InceptionV3, self).__init__()

        self.resize_input = resize_input
        self.normalize_input = normalize_input
        self.output_blocks = sorted(output_blocks)
        self.last_needed_block = max(output_blocks)

        assert self.last_needed_block <= 3, \
            'Last possible output block index is 3'

        self.blocks = nn.ModuleList()

        inception = models.inception_v3(pretrained=True)

        # Block 0: input to maxpool1
        block0 = [
            inception.Conv2d_1a_3x3,
            inception.Conv2d_2a_3x3,
            inception.Conv2d_2b_3x3,
            nn.MaxPool2d(kernel_size=3, stride=2)
        ]
        self.blocks.append(nn.Sequential(*block0))

        # Block 1: maxpool1 to maxpool2
        if self.last_needed_block >= 1:
            block1 = [
                inception.Conv2d_3b_1x1,
                inception.Conv2d_4a_3x3,
                nn.MaxPool2d(kernel_size=3, stride=2)
            ]
            self.blocks.append(nn.Sequential(*block1))

        # Block 2: maxpool2 to aux classifier
        if self.last_needed_block >= 2:
            block2 = [
                inception.Mixed_5b,
                inception.Mixed_5c,
                inception.Mixed_5d,
                inception.Mixed_6a,
                inception.Mixed_6b,
                inception.Mixed_6c,
                inception.Mixed_6d,
                inception.Mixed_6e,
            ]
            self.blocks.append(nn.Sequential(*block2))

        # Block 3: aux classifier to final avgpool
        if self.last_needed_block >= 3:
            block3 = [
                inception.Mixed_7a,
                inception.Mixed_7b,
                inception.Mixed_7c,
                nn.AdaptiveAvgPool2d(output_size=(1, 1))
            ]
            self.blocks.append(nn.Sequential(*block3))

        for param in self.parameters():
            param.requires_grad = requires_grad

    def forward(self, inp):
        """Get Inception feature maps
        Parameters
        ----------
        inp : torch.autograd.Variable
            Input tensor of shape Bx3xHxW. Values are expected to be in
            range (0, 1)
        Returns
        -------
        List of torch.autograd.Variable, corresponding to the selected output
        block, sorted ascending by index
        """
        outp = []
        x = inp

        if self.resize_input:
            x = F.interpolate(x,
                              size=(299, 299),
                              mode='bilinear',
                              align_corners=False)

        if self.normalize_input:
            x = 2 * x - 1  # Scale from range (0, 1) to range (-1, 1)

        for idx, block in enumerate(self.blocks):
            x = block(x)
            if idx in self.output_blocks:
                outp.append(x)

            if idx == self.last_needed_block:
                break

        return outp

In [21]:
def calculate_activation_statistics(images, inception_model, batch_size=128, dims=2048, cuda=False):
    inception_model.eval()
    act = np.empty((len(images), dims))

    if cuda:
        batch = images.cuda()
    else:
        batch = images

    pred = inception_model(batch)[0]

    # If model output is not scalar, apply global spatial average pooling.
    # This happens if you choose a dimensionality not equal 2048.
    if pred.size(2) != 1 or pred.size(3) != 1:
        pred = adaptive_avg_pool2d(pred, output_size=(1, 1))

    act = pred.cpu().data.numpy().reshape(pred.size(0), -1)

    mu = np.mean(act, axis=0)
    sigma = np.cov(act, rowvar=False)
    return mu, sigma

In [22]:
def calculate_frechet_distance(mu1, sigma1, mu2, sigma2, eps=1e-6):
    """Numpy implementation of the Frechet Distance.
    The Frechet distance between two multivariate Gaussians X_1 ~ N(mu_1, C_1)
    and X_2 ~ N(mu_2, C_2) is
            d^2 = ||mu_1 - mu_2||^2 + Tr(C_1 + C_2 - 2*sqrt(C_1*C_2)).
    """

    mu1 = np.atleast_1d(mu1)
    mu2 = np.atleast_1d(mu2)

    sigma1 = np.atleast_2d(sigma1)
    sigma2 = np.atleast_2d(sigma2)

    assert mu1.shape == mu2.shape, \
        'Training and test mean vectors have different lengths'
    assert sigma1.shape == sigma2.shape, \
        'Training and test covariances have different dimensions'

    diff = mu1 - mu2

    covmean, _ = linalg.sqrtm(sigma1.dot(sigma2), disp=False)
    if not np.isfinite(covmean).all():
        msg = ('fid calculation produces singular product; '
               'adding %s to diagonal of cov estimates') % eps
        print(msg)
        offset = np.eye(sigma1.shape[0]) * eps
        covmean = linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset))

    if np.iscomplexobj(covmean):
        if not np.allclose(np.diagonal(covmean).imag, 0, atol=1e-3):
            m = np.max(np.abs(covmean.imag))
            raise ValueError('Imaginary component {}'.format(m))
        covmean = covmean.real

    tr_covmean = np.trace(covmean)

    return (diff.dot(diff) + np.trace(sigma1) +
            np.trace(sigma2) - 2 * tr_covmean)

In [23]:
def calculate_frechet(images_real, images_fake, model):
    mu_1, std_1 = calculate_activation_statistics(images_real, model, cuda=True)
    mu_2, std_2 = calculate_activation_statistics(images_fake, model, cuda=True)

    # Get Frechet distance
    fid_value = calculate_frechet_distance(mu_1, std_1, mu_2, std_2)
    return fid_value

# Hyperparameter optimisation (gridsearch)

In [29]:
# Root directory for dataset
dataroot = PATH + "data/"

# Dataset name
dataset_name = 'mnist'  # 'cifar10' or 'mnist'

# Number of workers for dataloader
workers = 2

# Spatial size of training images. All images will be resized to this size using a transformer
image_size = 28  # 28 for mnist, 64 for others

# Size of z latent vector (i.e. size of generator input)
nz = 128

# Number of GPUs available. Use 0 for CPU mode.
ngpu = 1

In [37]:
create_repo_paths(PATH)

In [30]:
dataset, nc = get_dataset(dataset_name, image_size, dataroot)

# Decide which device we want to run on
device = torch.device("cuda:0" if (torch.cuda.is_available() and ngpu > 0) else "cpu")

In [None]:
grid_search(ngpu, device, dataset, workers,
            experiment_prefix='',           # add an extra word at the begining to the save path of the models and stats
            batch_size_list=[128],
            shuffle_list=[True],
            num_epochs_list=[300],
            loss_name_list=['wass'],        # wass, hinge
            optimizer_name_list=['adam'],   # 'adam' 'sgd' 'rmsprop'
            beta1_list=[0.9],               # 0.9 == default # Beta1 hyperparam for Adam optimizers
            lr_list=[1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7],
            momentums_list=[(0, 0)],        # [(momentumD, momentumG)]
            plot=False,
            save_stats=True,                # save the stats to disk
            create_dir=True,                # create the directories to save files
            save_epochs=10,                 # save the model every save_epochs epochs
            save_models=True,               # save the models to disk
            manualSeed=123,                 # keep at 123
            nc=nc, nz=nz
            )

In [None]:
grid_search(ngpu, device, dataset, workers,
            experiment_prefix='',           # add an extra word at the begining to the save path of the models and stats
            batch_size_list=[128],
            shuffle_list=[True],
            num_epochs_list=[300],
            loss_name_list=['wass'],        # wass, hinge
            optimizer_name_list=['sgd'],    # 'adam' 'sgd' 'rmsprop'
            beta1_list=[0.9],               # 0.9 == default # Beta1 hyperparam for Adam optimizers
            lr_list=[1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7],
            momentums_list=[(0, 0)],        # [(momentumD, momentumG)]
            plot=False,
            save_stats=True,                # save the stats to disk
            create_dir=True,                # create the directories to save files
            save_epochs=10,                 # save the model every save_epochs epochs
            save_models=True,               # save the models to disk
            manualSeed=123,                 # keep at 123
            nc=nc, nz=nz
            )

# Training example

## Parameters

In [None]:
# Root directory for dataset
dataroot = PATH + "data/"

# Dataset name
dataset_name = 'mnist'  # 'cifar10' or 'mnist'

# Number of workers for dataloader
workers = 2

# Batch size during training
batch_size = 128

# Spatial size of training images. All images will be resized to this size using a transformer.
image_size = 28  # 28 for mnist, 64 for others

# Size of z latent vector (i.e. size of generator input)
nz = 128

# Number of GPUs available. Use 0 for CPU mode.
ngpu = 1

In [None]:
# Number of training epochs
num_epochs = 3

# Learning rate for optimizers
lrD = 2e-4
lrG = 2e-4

# Beta1 hyperparam for Adam optimizers
beta1 = 0.9  # 0.9 == default

In [None]:
dataset, nc = get_dataset(dataset_name, image_size, dataroot)

# Create the dataloader
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                         shuffle=True, num_workers=workers)

# Decide which device we want to run on
device = torch.device("cuda:0" if (torch.cuda.is_available() and ngpu > 0) else "cpu")

In [None]:
loss_name = "wass"  # wass, hinge
iter_dis, iter_gen, grad_penalty_coef = 1, 1, 0.0

if loss_name == "wass":
    iter_dis, grad_penalty_coef = 5, 10.0

netG = init_net(Generator(ngpu, nc, nz), device, ngpu)
print('Generator parameters', count_parameters(netG))

netD = init_net(Discriminator(ngpu, nc, loss_name), device, ngpu)
print('Discriminator parameters', count_parameters(netD))

## Run

In [None]:
fixed_noise, real_label, fake_label, optimizerD, optimizerG = init_optimizers(netD, netG, lrD, lrG, beta1, nz, device)
experiment_prefix = ''  # add extra word to add the automatically generate one if you really need it, ideally keep empty
gan_training = Training(loss_name, netD, netG, device, real_label, fake_label,
                        dataloader, num_epochs, fixed_noise,
                        grad_penalty_coef, lrD, lrG, beta1, experiment_prefix, save_models, PATH, save_stats=True, create_dir=True,
                        iter_per_epoch_dis=1, iter_per_epoch_gen=1, grad_penalty_coef=0.0)

stats = gan_training.train()

In [None]:
img_list = stats['img_list']
G_losses = stats['G_losses']
D_losses = stats['D_losses']

# Visualisation

In [None]:
plot_loss(G_losses, D_losses, PATH, save=False)

In [None]:
plot_realvsfake(dataloader, device, img_list, PATH, save=False)

## G’s progression



In [None]:
fig = plt.figure(figsize=(8, 8))
plt.axis("off")
ims = [[plt.imshow(np.transpose(i, (1, 2, 0)), animated=True)] for i in img_list]
ani = animation.ArtistAnimation(fig, ims, interval=1000, repeat_delay=1000, blit=True)

HTML(ani.to_jshtml())

# Serialisation

In [None]:
epoch = 999999999
experiment_prefix = ''
experiment_path, stats_path, models_path = generate_paths(PATH, experiment_prefix, loss_name, lrD, lrG, beta1, iter_dis, iter_gen, grad_penalty_coef, create_dir=True)
save_path_G, save_path_D = model_paths(experiment_path, epoch, models_path)

print(experiment_path)
print(stats_path)
print(save_path_G)
print(save_path_D)

In [None]:
save_models(netG, netD, save_path_G, save_path_D)

In [None]:
pickle_save(stats, stats_path)

In [None]:
# Load model
netD, netG = load_models(ngpu, Discriminator, Generator, save_path_G, save_path_D, nc, nz, loss_name, device)

stats = pickle_load(stats_path)

# Metrics

In [None]:
def sample_gen(netG, nz, data):
    with torch.no_grad():
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)

        noise = torch.randn(b_size, nz, 1, 1, device=device)
        fake = netG(noise)
    return real_cpu, fake

In [None]:
# Create the dataloader
batch_size_eval = 100

dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size_eval,
                                         shuffle=True, num_workers=workers)

In [None]:
# Load inception model
block_idx = InceptionV3.BLOCK_INDEX_BY_DIM[2048]
inception_model = InceptionV3([block_idx])
inception_model = inception_model.cuda()

In [None]:
# Take first batch from the dataloader to get 500 samples :
with torch.no_grad():
    sample_batch = next(iter(dataloader))

    real_cpu, fake = sample_gen(netG, nz, sample_batch)

    t_frechet = time.time()
    frechet_dist = calculate_frechet(real_cpu, fake, inception_model)
    print('frechet dist:', frechet_dist, '| time to calculate :', time.time()-t_frechet, 's')