In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F


# Problem 1

a) Instantiate a sequence to sequence transformer (self-attention) using a standard API (tensorflow, pytorch, etc). Verify with some examples that it is permutation equivariant.

b) Prove analytically using the defining equations of self-attention that the transformer architecture is permutation equivariant.

c) Repeat (b) for multi-headed attention.

# Problem 2

a) Train the MNIST VAE from class. Illustrate with some example scans across the 2d latent space how the digits morph from one into another.


Here we've built a VAE from scratch (I chose to do this so I could learn what was going on under the hood). First, let's instantiate the data. 

In [None]:

# Import MNIST dataset

from torchvision import datasets
import torchvision.transforms as T
from torch.utils.data import DataLoader


mnist_data = datasets.MNIST(root='data', train=True, 
                            transform = T.Compose(
                                        [T.ToTensor(), 
                                        T.Lambda(lambda x: x.view(-1))]
                                        ),
                            download=False)

# Parameters for training
batch_size = 100
batch_size_test = 100
num_epochs = 10


# Try using dataloader module to randomly provide datapts from dataset

size_mnist = 28*28

Now we'll architect the VAE.

In [None]:
# Architecting the variational autoencoder

class VAE_MNIST(nn.Module):
    def __init__(self, latent_space_dim): 
        super(VAE_MNIST, self).__init__()
        
        # define the encoder 
        layer_one_size = 500
        layer_two_size = 200
        layer_three_size = 20
        
        self.fc1 = nn.Linear(size_mnist, layer_one_size)
        self.fc2 = nn.Linear(layer_one_size, layer_two_size)
        self.fc3 = nn.Linear(layer_two_size, layer_three_size)
        self.mean = nn.Linear(layer_three_size, latent_space_dim)
        self.std_dev = nn.Linear(layer_three_size, latent_space_dim)
        
    
        # define decoder layers 
        
        self.dfc1 = nn.Linear(latent_space_dim, layer_three_size)
        self.dfc2 = nn.Linear(layer_three_size, layer_two_size)
        self.dfc3 = nn.Linear(layer_two_size, layer_one_size)
        self.img = nn.Linear(layer_one_size, size_mnist)
        
        # Bring the image down multiple layers into the latent dimensional space
        
    def encoder(self, x): 
        
        
        h = F.leaky_relu(self.fc1(x))
        h = F.leaky_relu(self.fc2(h))
        h = self.fc3(h)
        return self.mean(h), self.std_dev(h)

    
    # Performs the reparameterization trick
    
    def sampling(self, mu, log_variance):
        std = torch.exp(0.5*log_variance)
        epsilon = torch.randn_like(std)
        return mu + std*epsilon # double check if this is allowed 
    
    # Bring point in latent dimensional space to image space 
    
    def decoder(self, x): 
        h = F.leaky_relu(self.dfc1(x))
        h = F.leaky_relu(self.dfc2(h))
        h = F.leaky_relu(self.dfc3(h))
        
        return torch.sigmoid(self.img(h))

    # Forward step 
    
    def forward(self, x): 
        # encode in latent space
        mu, log_variance = self.encoder(x)
        # sample from the distribution
        z = self.sampling(mu, log_variance)
        # decode 
        return self.decoder(z), mu, log_variance
    
    def loss(self, output, input, mu, log_variance):
        # Since the prior is just the normal dist. the KL divergence takes on the form 
        KL = -1/2*torch.sum(1+log_variance-mu.pow(2)-log_variance.exp())
        # Binary cross-entropy is same as usual
        BCE = F.binary_cross_entropy(output, input, reduction='sum')
        return BCE+KL

In [None]:
from torch.utils.data import Subset

# Define autoencoder model

mnist_vae = VAE_MNIST(latent_space_dim=2)
mnist_vae_optim = torch.optim.Adam(mnist_vae.parameters())

# train the model


def vae_train(model, epoch_size, dataset, optimizer): 
    model.train()
    dataloader = DataLoader(dataset=Subset(mnist_data, np.random.choice(len(mnist_data), 5000)), batch_size=100, shuffle=True)
    for epoch in range(epoch_size):
        for input, target in dataloader:
            optimizer.zero_grad()
            output, mu, log_variance = model(input)
            
            loss = model.loss(output, input, mu, log_variance)
            loss.backward()
            optimizer.step()

In [None]:
vae_train(mnist_vae, 20, mnist_data, mnist_vae_optim)

In [None]:
# Map out all of the different numbers in our latent space


# plot_dataloader = DataLoader(dataset=Subset(mnist_data, np.random.choice(len(mnist_data), 1000)), batch_size=1, shuffle=True)
# colors = plt.cm.tab10(np.linspace(0,1,10))


# xlist  = []
# ylist = []
# for input, target in plot_dataloader:
#     result, mu, log_variance = mnist_vae(input)
#     with torch.no_grad():
#         plt.scatter(mu.numpy()[0][0], mu.numpy()[0][1], color = colors[target])


# plt.show()


# Map out all of the different numbers in our latent space


colors = plt.cm.tab10(np.linspace(0,1,10))


empty = []
list_coords = [[[],[]] for _ in range(10)]
    
with torch.no_grad():
    for input, target in Subset(mnist_data, np.random.choice(len(mnist_data),5000)):
        result, mu, log_variance = mnist_vae(input)
        list_coords[target][0].append(mu[0])
        list_coords[target][1].append(mu[1])

    for target, element in enumerate(list_coords):
        plt.scatter(element[0], element[1], color = colors[target], marker = '.')
plt.show()


In [None]:
# Now attempt to generate a number from the graph

# new_image = new_image.reshape((28,28)) 
# with torch.no_grad():
#     plt.imshow(new_image.numpy(), cmap='Greys')
    
# grid


rows = 10
cols = 10

xlist = np.linspace(-4, 4, rows)
ylist = np.linspace(-4, 4, cols)

fig, axes = plt.subplots(rows, cols)

xpts, ypts = np.meshgrid(xlist, ylist)

plt.plot(xpts, ypts)
    
    
with torch.no_grad():
    for i, row in enumerate(xpts): 
        for j, item in enumerate(xpts):
            new_img = mnist_vae.decoder(torch.tensor([xpts[i][j], ypts[i][j]], dtype=torch.float))
            new_img = new_img.reshape((28, 28))
            im = axes[i][j].imshow(new_img.numpy(), cmap = 'Greys')
            axes[i][j].set_xticks([])
            axes[i][j].set_yticks([])



b) Train a vanilla autoencoder on MNIST and compare the latent space to the VAE.

In [None]:
# Compare to a standard autoencoder and see the results


class standard_autoencoder(nn.Module):
    def __init__(self, latent_space_dim): 
        super(standard_autoencoder, self).__init__()
        
        # define the encoder 
        layer_one_size = 100
        layer_two_size = 50
        layer_three_size = 20
        
        self.fc1 = nn.Linear(size_mnist, layer_one_size)
        self.fc2 = nn.Linear(layer_one_size, layer_two_size)
        self.fc3 = nn.Linear(layer_two_size, layer_three_size)
        self.mean = nn.Linear(layer_three_size, latent_space_dim)
        self.std_dev = nn.Linear(layer_three_size, latent_space_dim)
        
        
        # define decoder layers 
        
        self.dfc1 = nn.Linear(latent_space_dim, layer_three_size)
        self.dfc2 = nn.Linear(layer_three_size, layer_two_size)
        self.dfc3 = nn.Linear(layer_two_size, layer_one_size)
        self.img = nn.Linear(layer_one_size, size_mnist)
        
        
        # Bring the image down multiple layers into the latent dimensional space
        
    def encoder(self, x): 
        h = F.relu(self.fc1(x))
        h = F.relu(self.fc2(h))
        h = F.relu(self.fc3(h))
        return self.mean(h), self.std_dev(h)
    
    # Bring point in latent dimensional space to image space 
    
    def decoder(self, x): 
        h = F.relu(self.dfc1(x))
        h = F.relu(self.dfc2(h))
        h = F.relu(self.dfc3(h))
        
        return torch.sigmoid(self.img(h))

    # Forward step 
    
    def forward(self, x): 
        # encode in latent space
        mu, log_variance = self.encoder(x)
        # decode 
        return self.decoder(mu), mu, log_variance
    
    def loss(self, output, input, mu, log_variance):
        # Just the binary cross-entropy loss
        BCE = F.binary_cross_entropy(output, input, reduction='sum')
        return BCE


In [None]:
# train this normal autoencoder 

mnist_autoencoder = standard_autoencoder(latent_space_dim=2)
mnist_autoencoder_optimizer = torch.optim.Adam(mnist_autoencoder.parameters())  

vae_train(model=mnist_autoencoder, epoch_size=20, dataset=mnist_data, optimizer=mnist_autoencoder_optimizer)

In [None]:


colors = plt.cm.tab10(np.linspace(0,1,10))

empty = []
list_coords = [[[],[]] for _ in range(10)]
    
with torch.no_grad():
    for input, target in Subset(mnist_data, np.random.choice(len(mnist_data),5000)):
        result, mu, log_variance = mnist_autoencoder(input)
        list_coords[target][0].append(mu[0])
        list_coords[target][1].append(mu[1])

    for target, element in enumerate(list_coords):
        plt.scatter(element[0], element[1], color = colors[target], marker = '.')
plt.show()


In [None]:
# Plot one of those cool diagrams


rows = 15
cols = 15

xlist = np.linspace(-10, 10, rows)
ylist = np.linspace(-10, 20, cols)

fig, axes = plt.subplots(rows, cols)

xpts, ypts = np.meshgrid(xlist, ylist)

plt.plot(xpts, ypts)
    
    
with torch.no_grad():
    for i, row in enumerate(xpts): 
        for j, item in enumerate(xpts):
            new_img = mnist_autoencoder.decoder(torch.tensor([xpts[i][j], ypts[i][j]], dtype=torch.float))
            new_img = new_img.reshape((28, 28))
            im = axes[i][j].imshow(new_img.numpy(), cmap = 'Greys')
            axes[i][j].set_xticks([])
            axes[i][j].set_yticks([])


# Problem 3

a) Train a vanilla GAN, WGAN and variational autoencoder on MNIST data. You can use the examples provided in class. Generate samples from each and train a binary classifier on each vs the reference data. What AUC scores do you get?

b) Train a log posterior metric on all three models. Which performs best?

c) Train a supervised classifier on MNIST and use this to diagnose mode collapse in the three generative models. How do they fare?

In [71]:
class Discriminator(nn.Module): 
    def __init__(self, input_dim): 
        super(Discriminator, self).__init__()
        
        layer_one_size = 512
        layer_two_size = 256
        
        self.model = nn.Sequential(
            nn.Linear(input_dim, layer_one_size), 
            nn.ReLU(), 
            nn.Linear(layer_one_size, layer_two_size), 
            nn.ReLU(), 
            nn.Linear(layer_two_size, 1), #bring it to just one output
        )
    
    def forward(self, input): 
        return self.model(input)

    def loss(self, input, output): 
        return F.binary_cross_entropy(input, output)

class Generator(nn.Module): 
    def __init__(self, latent_dim_size, input_dim): 
        super(Generator, self).__init__()
        
        
        layer_one_size = 64
        layer_two_size = 128
        layer_three_size = 256
        
        self.model = nn.Sequential(
            nn.Linear(latent_dim_size, layer_one_size), 
            nn.LeakyReLU(0.2), 
            nn.Dropout(0.3),
            nn.Linear(layer_one_size, layer_two_size), 
            nn.LeakyReLU(0.2), 
            nn.Dropout(0.3),
            nn.Linear(layer_two_size, layer_three_size), 
            nn.LeakyReLU(0.2), 
            nn.Dropout(0.3),nn.Linear(layer_three_size, input_dim), 
            nn.LeakyReLU(0.2), 
            nn.Sigmoid()  # a good idea since the MNIST data set is bound [0,1]
        )
        
        self.latent_dim_size = latent_dim_size
        
    def forward(self, input): 
        return self.model(input)

    def loss(self, input, output):
        return F.binary_cross_entropy(input, output)


In [None]:
# we need to define the losses 

def train_gan(discriminator: Discriminator, generator: Generator, discriminator_optimizer, generator_optimizer, dataset, epochs): 
    # first generate set of data
    
    batch_size = 100
    
    dataloader = DataLoader(dataset=Subset(dataset, np.random.choice(len(dataset), 10000)), batch_size=batch_size, shuffle=True)
    for epoch in range(epochs):
        for i, real_data in enumerate(dataloader):
            # generate elements
            latent_space_sample = torch.randn((batch_size, generator.latent_dim_size))
            generated_data = generator(latent_space_sample)
            
            data_in = real_data[0]
            
            real_labels = torch.ones((batch_size))
            fake_labels = torch.zeros((batch_size))

            
            # feed to discriminator
            discriminator.train()
            discriminator_optimizer.zero_grad()
            real_loss = F.binary_cross_entropy_with_logits(discriminator(data_in), real_labels.unsqueeze(1))
            fake_loss = F.binary_cross_entropy_with_logits(discriminator(generated_data), fake_labels.unsqueeze(1))
            discriminator_loss = real_loss+fake_loss
            discriminator_loss.backward()
            discriminator_optimizer.step()
            
            # Now, train the generator
            
            latent_space_sample = torch.randn((batch_size, generator.latent_dim_size))
            
            generator.train()
            
            generator_optimizer.zero_grad()
            
            output_discriminator_generated = discriminator(generator(latent_space_sample))
            # test against all ones, see how it does
            generated_loss  = F.binary_cross_entropy_with_logits(output_discriminator_generated, real_labels.unsqueeze(1))
            
            generated_loss.backward()
            generator_optimizer.step()
            
        if epoch%5==0:
            with torch.no_grad():
                new_img = (generator(torch.randn((generator.latent_dim_size))).reshape((28,28)).numpy())
                plt.imshow(new_img, cmap='Grays')
            print(f'Current Generator Loss: {generated_loss}')
            print(f'Current Discriminator Loss: {discriminator_loss}')
            plt.show()


In [79]:
mnist_generator = Generator(32, 28*28)
mnist_discriminator = Discriminator(28*28)

mnist_generator_optim = torch.optim.Adam(mnist_generator.parameters(), lr=0.001)
mnist_discriminator_optim = torch.optim.Adam(mnist_discriminator.parameters(), lr=0.001)

train_gan(mnist_discriminator, mnist_generator, mnist_discriminator_optim, mnist_generator_optim, mnist_data, 200)

ValueError: Target size (torch.Size([100])) must be the same as input size (torch.Size([100, 1]))