# A simple VAE


## Library imports
The following code contains the required libraries for the notebook.

In [1]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

import torchvision
from torchvision import transforms
from torchvision.utils import make_grid


#import torchbearer
#import torchbearer.callbacks as callbacks
#from torchbearer import Trial, state_key
#MU = state_key('mu')
#LOGVAR = state_key('logvar')

#import ...


## MNIST

In [160]:
#MNIST Settings
#classes = [0, 1, 4, 9]
batch_size = 64
eval_batch_size = 64

In [161]:
#MNIST Import
#source 7.1 autoencoder lab
from torchvision.datasets import MNIST

# transforms into tensor, can be extend
transformations = transforms.Compose([transforms.ToTensor()]) 


# Define the train and test sets
train_data = MNIST("./", train=True,  transform=transformations, download=True)
test_data  = MNIST("./", train=False, transform=transformations)


#def stratified_sampler(labels):
#    """Sampler that only picks datapoints corresponding to the specified classes"""
#    (indices,) = np.where(reduce(lambda x, y: x | y, [labels.numpy() == i for i in classes]))
#    indices = torch.from_numpy(indices)
#    return SubsetRandomSampler(indices)

#load the datasets into DataLoader classes, sampler removed!
train_loader = DataLoader(train_data, batch_size=batch_size)
test_loader  = DataLoader(test_data, batch_size=eval_batch_size)


## Fashion MNIST

In [162]:
from torchvision.datasets import FashionMNIST





## The Model 

inspired by: https://www.kaggle.com/ethanwharris/fashion-mnist-vae-with-pytorch-and-torchbearer

In [163]:
#https://debuggercafe.com/getting-started-with-variational-autoencoder-using-pytorch/
def final_loss(bce_loss, mu, logvar):
    BCE = bce_loss
    KLD = -.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    
    return BCE + KLD

In [164]:
#Settings
latent_size = 5
learning_rate = 1e-3
num_epochs = 20

In [171]:
conv1 = (1, 32, 4, 1, 2)
conv2 = (conv1[1], 32, 4, 2, 1)
conv3 = (conv2[1], 64, 4, 2, 1)


##Look at 7.2 to see how to make encoder and decoder take non-fixed size image. 

class VariationalAutoEncoder(nn.Module):
    def __init__(self, latent_size):
        super(VariationalAutoEncoder,self).__init__()
        self.latent_size = latent_size
        
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels=conv1[0],out_channels=conv1[1],kernel_size=conv1[2],stride=conv1[3],padding=conv1[4]),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=conv2[0],out_channels=conv2[1],kernel_size=conv2[2],stride=conv2[3],padding=conv2[4]),
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=conv3[0],out_channels=conv3[1],kernel_size=conv3[2],stride=conv3[3],padding=conv3[4]),
        )
        
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(in_channels=conv3[1],out_channels=conv3[0],kernel_size=conv3[2],stride=conv3[3],padding=conv3[4]),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(in_channels=conv2[1],out_channels=conv2[0],kernel_size=conv2[2],stride=conv2[3],padding=conv2[4],output_padding=1),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(in_channels=conv1[1],out_channels=conv1[0],kernel_size=conv1[2],stride=conv1[3],padding=conv1[4]),
            )
        
        self.mu = nn.Linear(64 * 7 * 7, latent_size)
        self.logvar = nn.Linear(64 * 7 * 7, latent_size)
        self.upsample = nn.Linear(latent_size, 64 * 7 * 7)
        
        
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu)

        
    # https://debuggercafe.com/getting-started-with-variational-autoencoder-using-pytorch/    
    def forward(self, x):
        # encoding
        
        x = self.encoder(x)
        x = x.view(-1,64*7*7)
        mu = self.mu(x)
        log_var = self.logvar(x)
        
        # get the latent vector through reparameterization
        z = self.reparameterize(mu, log_var)
        # decoding
        z = self.upsample(z)
        #print(z.shape)
        z = z.view(-1,64,7,7)
        
        x = self.decoder(z)
        reconstruction = torch.sigmoid(x)
        
        return reconstruction, mu, log_var
        
     
print(VariationalAutoEncoder(latent_size=latent_size))

VariationalAutoEncoder(
  (encoder): Sequential(
    (0): Conv2d(1, 32, kernel_size=(4, 4), stride=(1, 1), padding=(2, 2))
    (1): LeakyReLU(negative_slope=0.01)
    (2): Conv2d(32, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01)
    (4): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(64, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.01)
    (2): ConvTranspose2d(32, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (3): LeakyReLU(negative_slope=0.01)
    (4): ConvTranspose2d(32, 1, kernel_size=(4, 4), stride=(1, 1), padding=(2, 2))
  )
  (mu): Linear(in_features=3136, out_features=5, bias=True)
  (logvar): Linear(in_features=3136, out_features=5, bias=True)
  (upsample): Linear(in_features=5, out_features=3136, bias=True)
)


### KL

In [172]:
#def beta_kl(mu_key, logvar_key, beta=5):
#    #@callbacks.add_to_loss
#    def callback(state):
#        mu = state[mu_key]
#        logvar = state[logvar_key]
#        return -0.5*torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) * beta
#    
#    return callback

### Visualisation

In [173]:
#def plot_progress(key=torchbearer.Y_PRED, num_images=100, nrow=10):
#    #@callbacks.on_step_validation
#    #@callbacks.once_per_epoch
#    def callback(state):
#        images = state[key]
#        image = make_grid(images[:num_images], nrow=nrow, normalize=True)[0, :, :]
#        plt.imshow(image.detach().cpu().numpy(), cmap="gray")
#        plt.show()
#    
#    return callback

### Training

In [174]:
vae = VariationalAutoEncoder(latent_size=latent_size)
optimizer = optim.Adam(vae.parameters(),lr=learning_rate)
criterion = nn.BCELoss(reduction='sum')




In [184]:
def fit_vae(vae, train_loader):
    vae.train()
    running_loss = 0.0
    # Run each batch in training dataset
    for x, y in train_loader:
        # x.view?
        x = x.to(device)
        optimizer.zero_grad()
        reconstruction, mu, logvar = vae(x)
        bce_loss = criterion(reconstruction, x)
        loss = final_loss(bce_loss, mu, logvar)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()
        
    train_loss = running_loss/len(train_loader.dataset)
    return train_loss

def test_vae(vae, test_loader):
    vae.eval()
    running_loss = 0.0
    with torch.no_grad():
        for x, y in test_loader:
            # x.view?
            x = x.to(device)
            reconstruction, mu, logvar = vae(x)
            bce_loss = criterion(reconstruction, x)
            loss = final_loss(bce_loss, mu, logvar)
            running_loss += loss.item()
            
    val_loss = running_loss/len(test_loader.dataset)
    return val_loss
            

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f">> Using device: {device}")
vae = vae.to(device)

##
train_loss = []
test_loss = []

for current_epoch in range(num_epochs):
    print(f"Epoch {current_epoch+1} of {num_epochs}")
    train_epoch_loss = fit_vae(vae, train_loader)
    test_epoch_loss = test_vae(vae, test_loader)
    train_loss.append(train_epoch_loss)
    test_loss.append(test_epoch_loss)
    print(f"Train Loss: {train_epoch_loss:.4f}")
    print(f"Val Loss: {test_epoch_loss:.4f}")
    


>> Using device: cuda:0
Epoch 1 of 20
Train Loss: 124.2903
Val Loss: 126.9559
Epoch 2 of 20
Train Loss: 123.5877
Val Loss: 126.7237
Epoch 3 of 20
Train Loss: 123.0580
Val Loss: 125.1069
Epoch 4 of 20
Train Loss: 122.6120
Val Loss: 125.2017
Epoch 5 of 20
Train Loss: 122.2345
Val Loss: 124.3963
Epoch 6 of 20


### Training (with torchbearer)

In [98]:
vae = VariationalAutoEncoder(latent_size=latent_size)
optimizer = optim.Adam(vae.parameters(),lr=learning_rate)
trial = Trial(
    vae, 
    optimizer, 
    nn.MSELoss(reduction='mean'), metrics=['acc', 'loss'], 
    callbacks=[
        beta_kl(MU, LOGVAR),
        callbacks.ConsolePrinter(),
        plot_progress()],
    verbose=1).with_generators(train_generator=train_loader,test_generator=test_loader)
trial.to('cuda')
trial.run(5)
trial.evaluate(verbose=0, data_key=torchbearer.TEST_DATA)

NameError: name 'Trial' is not defined

In [55]:
vae = VariationalAutoEncoder(latent_size=latent_size)
optimizer = optim.Adam(vae.parameters(),lr=learning_rate)
trial = Trial(
    vae, 
    optimizer, 
    #mean-squared error or cross-entropy
    nn.MSELoss(reduction='sum'), metrics=['acc', 'loss'], 
    callbacks=[
        beta_kl(MU, LOGVAR),
        callbacks.ConsolePrinter(),
        plot_progress()],
    verbose=1).with_generators(train_generator=train_loader,test_generator=test_loader)
trial.to('cuda')
trial.run(5)
trial.evaluate(verbose=0, data_key=torchbearer.TEST_DATA)

NameError: name 'Trial' is not defined