# Import Libraries

In [None]:
import torch
from torch import nn
import torch.utils.data as data_utils
from torch.utils.data import DataLoader, ConcatDataset
from IPython import display
import math
import torchvision
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.multivariate_normal import MultivariateNormal
from torch.distributions.bernoulli import Bernoulli


# Autoencoders

Below are the classes for the autoencoders with 2 and 3 hidden layers. The input size, the dimensions of the hidden layers, and the latent size are the tunable parameters of the models.

Each class contains 3 functions:

The encode function - which reduces input data to the dimensions of the specified latent size

The decode function -  which decodes data from latent variable representation back into data with original dimensions.

The forward function - the standard entry point of a neural network module. When the model is called, it runs the input through the whole process of encoding and then decoding, and returns both the reconstructed data and the latent representation.

In [None]:



class AE2(nn.Module):
    """Fully connected autoencoder with two hidden layers per side.

    The encoder maps ``input_size -> hidden_size1 -> hidden_size2 ->
    latent_size`` and the decoder mirrors that path back to ``input_size``.
    Every linear layer is followed by ``LeakyReLU(0.1)`` except the last
    decoder layer, whose output is passed through a sigmoid.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, latent_size):
        super().__init__()
        # Encoder layers.
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.fc3 = nn.Linear(hidden_size2, latent_size)
        # Decoder layers (mirror image of the encoder).
        self.fc4 = nn.Linear(latent_size, hidden_size2)
        self.fc5 = nn.Linear(hidden_size2, hidden_size1)
        self.fc6 = nn.Linear(hidden_size1, input_size)
        # Activation shared by all hidden layers and the latent layer.
        self.lrelu = nn.LeakyReLU(0.1)

    def encode(self, x):
        """Compress ``x`` into its latent representation."""
        code = x
        for layer in (self.fc1, self.fc2, self.fc3):
            code = self.lrelu(layer(code))
        return code

    def decode(self, z_x):
        """Map a latent representation back to the input dimensionality."""
        hidden = z_x
        for layer in (self.fc4, self.fc5):
            hidden = self.lrelu(layer(hidden))
        # Sigmoid keeps every reconstructed value inside (0, 1).
        return torch.sigmoid(self.fc6(hidden))

    def forward(self, x):
        """Encode then decode ``x``.

        Returns:
            A ``(reconstruction, latent)`` tuple.
        """
        latent = self.encode(x)
        return self.decode(latent), latent

In [None]:



class AE3(nn.Module):
    """Fully connected autoencoder with three hidden layers per side.

    The encoder maps ``input_size -> hidden_size1 -> hidden_size2 ->
    hidden_size3 -> latent_size`` and the decoder mirrors that path back to
    ``input_size``. Every linear layer is followed by ``LeakyReLU(0.1)``
    except the last decoder layer, whose output is passed through a sigmoid.

    Args:
        input_size: Number of features in each input sample.
        hidden_size1: Width of the first (outermost) hidden layer.
        hidden_size2: Width of the second hidden layer.
        latent_size: Width of the latent representation.
        hidden_size3: Width of the third (innermost) hidden layer. It is a
            trailing, optional argument so that existing four-argument calls
            keep working; when omitted it defaults to ``hidden_size2``.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, latent_size,
                 hidden_size3=None):
        super().__init__()
        if hidden_size3 is None:
            # Fall back to the width of the previous hidden layer.
            hidden_size3 = hidden_size2

        # Encoder layers.
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.fc3 = nn.Linear(hidden_size2, hidden_size3)
        self.fc4 = nn.Linear(hidden_size3, latent_size)
        # Decoder layers (mirror image of the encoder).
        self.fc5 = nn.Linear(latent_size, hidden_size3)
        self.fc6 = nn.Linear(hidden_size3, hidden_size2)
        self.fc7 = nn.Linear(hidden_size2, hidden_size1)
        self.fc8 = nn.Linear(hidden_size1, input_size)
        # Activation shared by all hidden layers and the latent layer.
        self.lrelu = nn.LeakyReLU(0.1)

    def encode(self, x):
        """Compress ``x`` into its ``latent_size``-dimensional representation."""
        p_x = x
        # All four encoder layers are applied, so the result really has
        # ``latent_size`` features (stopping at fc3 would leave it at
        # ``hidden_size3``).
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            p_x = self.lrelu(layer(p_x))
        return p_x

    def decode(self, z_x):
        """Map a latent representation back to ``input_size`` features."""
        q_x = z_x
        for layer in (self.fc5, self.fc6, self.fc7):
            q_x = self.lrelu(layer(q_x))
        # Sigmoid keeps every reconstructed value inside (0, 1).
        return torch.sigmoid(self.fc8(q_x))

    def forward(self, x):
        """Encode then decode ``x``.

        Returns:
            A ``(reconstruction, latent)`` tuple.
        """
        p_x = self.encode(x)
        q_z = self.decode(p_x)
        return q_z, p_x

# Variational Autoencoder

Below is the class for the variational autoencoder with 2 hidden layers. The input size, the dimensions of the hidden layers, and the latent size are the tunable parameters of the model.

The class contains the same 3 functions as the standard AE although they work slightly differently and produce different outputs:

The encode function - this reduces the data to a mean and a log variance for each latent variable. The latent variables are then sampled with the reparameterization trick: the standard deviation (computed as exp(0.5 * log variance)) is multiplied by noise drawn from a standard normal distribution and added to the mean. The encode function takes data as input and produces the latent variables, mean, log variance, and standard deviation as the output.

The decode function -  takes latent variables as an input and decodes the data from latent variable representation back into data with original dimensions.

The forward function - in this case the forward function takes the data and the beta weighting value as input. It runs the data through the encode and decode functions, and returns the optimization criterion (the sum of the binary cross-entropy reconstruction loss and the KL divergence term multiplied by beta), the output of the decode function, and the latent variables.

In [None]:



class BernoulliVAE2(nn.Module):
    """Variational autoencoder with two hidden layers and sigmoid outputs.

    The encoder produces a mean and a log variance for a diagonal Gaussian
    over the latent space; the decoder maps a latent sample back to
    ``input_size`` values in (0, 1). Reconstruction quality is scored with a
    summed binary cross-entropy, so inputs are expected to lie in [0, 1].
    """

    def __init__(self, input_size, hidden_size1, hidden_size2,  latent_size):
        super().__init__()

        # Encoder trunk, followed by separate heads for mean and log variance.
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.fc3 = nn.Linear(hidden_size2, latent_size)
        self.lrelu = nn.LeakyReLU(0.1)
        self.enc_mu = nn.Linear(latent_size, latent_size)
        self.enc_logvar = nn.Linear(latent_size, latent_size)

        # Standard normal over the latent space. Kept as an attribute;
        # encode() draws its noise with torch.randn_like instead.
        self.normal_dist = MultivariateNormal(
            torch.zeros(latent_size), torch.eye(latent_size)
        )

        # Decoder layers (mirror image of the encoder trunk).
        self.fc4 = nn.Linear(latent_size, hidden_size2)
        self.fc5 = nn.Linear(hidden_size2, hidden_size1)
        self.fc6 = nn.Linear(hidden_size1, input_size)

        # Reconstruction term: binary cross-entropy summed over all elements.
        self.criterion = nn.BCELoss(reduction='sum')

    @staticmethod
    def _kl_divergence(mu, logvar):
        """Summed KL divergence between N(mu, exp(logvar)) and N(0, I)."""
        return -0.5 * (1 + logvar - mu**2 - torch.exp(logvar)).sum()

    def encode(self, x):
        """Sample latent variables for ``x``.

        Returns:
            A ``(z, mu, logvar, std)`` tuple, where ``z`` is the sample and
            the remaining entries are the parameters it was drawn from.
        """
        features = x
        for layer in (self.fc1, self.fc2, self.fc3):
            features = self.lrelu(layer(features))

        mean = self.enc_mu(features)
        log_variance = self.enc_logvar(features)
        sigma = torch.exp(0.5 * log_variance)

        # Reparameterization trick: the randomness lives in `eps`, so
        # gradients can flow through `mean` and `sigma`.
        eps = torch.randn_like(sigma)
        sample = mean + (sigma * eps)

        return sample, mean, log_variance, sigma

    def decode(self, z):
        """Map latent variables ``z`` to per-feature values in (0, 1)."""
        features = z
        for layer in (self.fc4, self.fc5):
            features = self.lrelu(layer(features))
        return torch.sigmoid(self.fc6(features))

    def forward(self, x, beta):
        """Compute the beta-weighted negative lower bound for ``x``.

        Args:
            x: Input batch; also used as the reconstruction target.
            beta: Weight applied to the KL divergence term.

        Returns:
            A ``(loss, output, z)`` tuple: the scalar objective to minimize,
            the decoder output, and the sampled latent variables.
        """
        z, mu_e, logvar_e, _ = self.encode(x)
        output = self.decode(z)
        reconstruction_loss = self.criterion(output, x)
        kl_div = self._kl_divergence(mu_e, logvar_e)

        # The optimizer minimizes, so this is the negative of the lower
        # bound that should be maximized.
        return reconstruction_loss + kl_div*beta, output, z

In [None]:
class BernoulliVAE2_1(nn.Module):
    """Variational autoencoder with two hidden layers and sigmoid outputs.

    An encoder network yields the mean and log variance of a diagonal
    Gaussian over the latent variables, a sample is drawn with the
    reparameterization trick, and a decoder network turns that sample into
    ``input_size`` values in (0, 1). The training objective combines a summed
    binary cross-entropy with a beta-weighted KL divergence, so inputs are
    expected to lie in [0, 1].
    """

    def __init__(self, input_size, hidden_size1, hidden_size2,  latent_size):
        super().__init__()

        # --- Encoder ---
        self.fc1 = nn.Linear(input_size, hidden_size1)
        self.fc2 = nn.Linear(hidden_size1, hidden_size2)
        self.fc3 = nn.Linear(hidden_size2, latent_size)
        self.lrelu = nn.LeakyReLU(0.1)
        # Heads producing the Gaussian parameters from the encoder features.
        self.enc_mu = nn.Linear(latent_size, latent_size)
        self.enc_logvar = nn.Linear(latent_size, latent_size)

        # Zero-mean, identity-covariance Gaussian over the latent space.
        # Stored on the module; encode() samples via torch.randn_like.
        prior_mean = torch.zeros(latent_size)
        prior_cov = torch.eye(latent_size)
        self.normal_dist = MultivariateNormal(prior_mean, prior_cov)

        # --- Decoder ---
        self.fc4 = nn.Linear(latent_size, hidden_size2)
        self.fc5 = nn.Linear(hidden_size2, hidden_size1)
        self.fc6 = nn.Linear(hidden_size1, input_size)

        # Binary cross-entropy, summed (not averaged) over every element.
        self.criterion = nn.BCELoss(reduction='sum')

    def encode(self, x):
        """Return ``(z, mu, logvar, std)`` for the input batch ``x``."""
        hidden = self.lrelu(self.fc3(self.lrelu(self.fc2(self.lrelu(self.fc1(x))))))

        mu = self.enc_mu(hidden)
        logvar = self.enc_logvar(hidden)
        std = torch.exp(0.5 * logvar)

        # Reparameterization trick: z = mu + std * eps with eps ~ N(0, I).
        eps = torch.randn_like(std)
        z = mu + (std * eps)

        return z, mu, logvar, std

    def decode(self, z):
        """Return the decoder output (values in (0, 1)) for latent ``z``."""
        hidden = self.lrelu(self.fc5(self.lrelu(self.fc4(z))))
        return torch.sigmoid(self.fc6(hidden))

    def forward(self, x, beta):
        """Return ``(loss, output, z)`` for the input batch ``x``.

        ``loss`` is the summed binary cross-entropy between the decoder
        output and ``x`` plus ``beta`` times the KL divergence between the
        encoder distribution and a standard normal. It is the negative of
        the lower bound, which is what an optimizer should minimize.
        """
        z, mu, logvar, _unused_std = self.encode(x)
        output = self.decode(z)

        bce = self.criterion(output, x)
        kl_div = -0.5 * (1 + logvar - mu**2 - torch.exp(logvar)).sum()
        objective = bce + kl_div*beta

        return objective, output, z