The goal of this lab is to implement a simple generative adversarial network for 2D data

In [None]:
# %%
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import DataLoader
import torch.autograd as autograd

In [None]:
# Create a synthetic dataset
nb_samples = 10000
radius = 1
nz = .1
# generate the data
X_train = torch.zeros((nb_samples, 2))
r = radius + nz*torch.randn(nb_samples)
theta = torch.rand(nb_samples)*2*torch.pi
X_train[:, 0] = r*torch.cos(theta)
X_train[:, 1] = r*torch.sin(theta)

In [None]:
# Visualize the data
plt.figure(figsize=(6, 6))
plt.scatter(X_train[:, 0], X_train[:, 1], s=20, alpha=0.8, edgecolor='k', marker='o', label='original samples')
plt.grid(alpha=0.5)
plt.legend(loc='best')
plt.tight_layout()
plt.show()
# %%

In [None]:
# Implement a generator
# Generator is of class neural network
class Generator(nn.Module):
    def __init__(self, noise_dim=10):
        super(Generator, self).__init__()
        self.noise_dim = noise_dim
        # Code a neural network with Relu activation functions and 3 layers of size: 
        # noise_dim * 128
        # 128 * 64
        # 64 * 2
        self.model = nn.Sequential(
            #TODO
        )

    def forward(self, z):
        return self.model(z)


In [None]:
# Implement a discriminator
# Sometimes called a critic, because there are no [0, 1] constraintes in the WGANs setting 
class Discriminator(nn.Module):
    def __init__(self, noise_dim=10):
        super(Discriminator, self).__init__()
        # Code a neural network with Relu activation functions and 3 layers of size: 
        # noise_dim * 128
        # 128 * 64
        # 64 * 1
        self.noise_dim = noise_dim
        self.model = nn.Sequential(
            #TODO
        )

    def forward(self, x):
        return self.model(x)

In [None]:
# A function to monitor the generated samples while training
# It can be ignored 
def generate_images(generator_model, noise_dim, num_samples=1000):
    with torch.no_grad():
        z = torch.Tensor(np.random.normal(0, 1, (num_samples, noise_dim))).type(torch.float32)
        predicted_samples = generator_model(z)
    plt.figure(figsize=(6, 6))
    plt.scatter(X_train[:, 0], X_train[:, 1], s=40, alpha=0.2, edgecolor='k', marker='+', label='original samples')
    plt.scatter(predicted_samples[:, 0], predicted_samples[:, 1], s=10,
                alpha=0.9, c='r', edgecolor='k', marker='o', label='predicted')
    plt.grid(alpha=0.5)
    plt.legend(loc='best')
    plt.tight_layout()
    plt.show()


In [None]:
# Initialize the generator and the discriminator
noise_dim = 2
generator = Generator(noise_dim=noise_dim)
discriminator = Discriminator(noise_dim=noise_dim)

In [None]:
# Optimizers
lr_G = 0.001  # learning rate for the generator
lr_D = 0.001  # learning rate for the discriminator
n_epochs = 500  # number of "epochs"
clip_value = 0.3  # number of "epochs"
update_gen_every = 5  # update the generator every update_gen_every
batch_size = 128  # size of the batch
optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr_G, betas=(0.5, 0.9))  # Optimizer for the generator 
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr_D, betas=(0.5, 0.9))  # Optimizer for the discriminator
dataloader = DataLoader(X_train, batch_size, shuffle=True)

In [None]:
# TODO: implement the alternated gradient descent ascent for a GAN 
for epoch in range(n_epochs):
    for i, x in enumerate(dataloader):
        # Configure input
        x = x.type(torch.float32)  # real data
        # ---------------------
        #  Train Discriminator
        # ---------------------
        optimizer_D.zero_grad()

        # Sample noise as generator input
        z = #TODO

        # Generate a batch of images
        # We do not want to update the generator in this loop
        fake_x =  # TODO
        # Adversarial loss
        loss_D = #TODO

        loss_D.backward()
        optimizer_D.step()

        # Clip weights of discriminator
        #TODO

        # Train the generator every n_critic iterations
        if i % update_gen_every == 0:
            # -----------------
            #  Train Generator
            # -----------------

            optimizer_G.zero_grad()

            # Generate a batch of images
            fake_x = # TODO
            # Adversarial loss
            loss_G = #TODO

            loss_G.backward()
            optimizer_G.step()

    # Visualization of intermediate results
    if epoch % 50 == 0:
        print("Epoch: ", epoch)
        generate_images(generator, noise_dim)

Wasserstein generative adversarial networks with gradent penalty

In [None]:
# Implementation of the gradient penalty
# The code below is bit hacky because of pytorch philosophy
def compute_gradient_penalty(D, real_samples, fake_samples):
    """Calculates the gradient penalty loss for WGAN GP"""
    # Random weight term for interpolation between real and fake samples
    alpha = torch.Tensor(np.random.random((real_samples.size(0), 1)))
    # Get random interpolation between real and fake samples
    interpolates = (alpha * real_samples + ((1 - alpha) * fake_samples)).requires_grad_(True)
    d_interpolates = D(interpolates)
    # Get gradient w.r.t. interpolates
    gradients = # TODO
    return gradient_penalty

In [None]:
# Initialize generator and discriminator
noise_dim = 2
generator = Generator(noise_dim=noise_dim)
discriminator = Discriminator()

In [None]:
# Optimizers
lr_G = 0.0001
lr_D = 0.001
n_epochs = 500  # 500
lambda_gp = 1.0
n_critic = 5
batch_size = 128
optimizer_G = torch.optim.Adam(generator.parameters(), lr=lr_G, betas=(0.5, 0.9))
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=lr_D, betas=(0.5, 0.9))
dataloader = DataLoader(X_train, batch_size, shuffle=True)

In [None]:
for epoch in range(n_epochs):
    for i, x in enumerate(dataloader):
        # Configure input
        x = x.type(torch.float32)
        # ---------------------
        #  Train Discriminator
        # ---------------------
        optimizer_D.zero_grad()

        # Sample noise as generator input
        z = #TODO

        # Generate a batch of images
        fake_x =  # TODO
        # Adversarial loss
        gradient_penalty =  # TODO
        loss_D =  # TODO

        loss_D.backward()
        optimizer_D.step()


        # Train the generator every n_critic iterations
        if i % update_gen_every == 0:
            # -----------------
            #  Train Generator
            # -----------------

            optimizer_G.zero_grad()

            # Generate a batch of images
            fake_x = # TODO
            # Adversarial loss
            loss_G = # TODO

            loss_G.backward()
            optimizer_G.step()


    # Visualization of intermediate results
    if epoch % 10 == 0:
        print("Epoch: ", epoch)
        generate_images(generator, noise_dim)