# VAE and iVAE code is at the bottom #

In [1]:
# Imports

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

# packages needed for creating x
import torch
from torch import nn, Tensor
import torch.nn.functional as F

# import torch.optim as optim
import torch.nn.init as init
from torch.nn.parameter import Parameter

# from torch.nn.functional import softplus
from torch.distributions import Distribution, Normal

#import torch.nn as nn
import torch.distributions as dist

from typing import *

# data imports
from torch.utils.data import Dataset, DataLoader

# from torchvision.transforms import ToTensor
# from functools import reduce

  from .autonotebook import tqdm as notebook_tqdm


In [8]:
def p(x):
    """Wrap array-like data in a pandas DataFrame (handy for quick inspection)."""
    frame = pd.DataFrame(data=x)
    return frame


##################################
# Generate Zs and Ys


def generate_z_and_y(E, samples, three_z=True):
    """
    Draw synthetic latent variables (Z) and a target (Y) for random environments.

    Each sample is assigned one environment uniformly at random; the mean of
    that environment then drives the sampling chain

        Z1 ~ Normal(Env, 1)
        Z2 ~ Normal(2 * Env, 2)
        Y  ~ Normal(Z1 + Z2, 1)
        Z3 = Y + Normal(0, 1)

    where the second argument of each Normal is the standard deviation.

    Args:
        E (list or 1-D array): One mean per environment.
        samples (int): Number of samples to draw.
        three_z (bool): If true, Z contains the columns [Z1, Z2, Z3];
            otherwise only [Z1, Z2].

    Returns:
        tuple: ``(E_choice, Env, Y, Z)`` where
            E_choice (ndarray, shape (samples,)): index into ``E`` per sample,
            Env (ndarray, shape (samples,)): environment mean per sample,
            Y (ndarray, shape (samples,)): target values,
            Z (ndarray, shape (samples, 2 or 3)): latent variables, one per column.

    Raises:
        ValueError: If ``E`` is empty or not one-dimensional.
    """
    # A plain Python list cannot be indexed with an integer array, so convert
    # once up front; ndarray inputs pass through unchanged.
    env_means = np.asarray(E)
    if env_means.ndim != 1 or env_means.size == 0:
        raise ValueError("E must be a non-empty one-dimensional sequence of means")

    E_choice = np.random.choice(np.arange(0, len(env_means), 1), samples)
    Env = env_means[E_choice]

    Z1 = np.random.normal(Env, 1, size=samples)
    Z2 = np.random.normal(2 * Env, 2, size=samples)
    Y = np.random.normal(Z1 + Z2, 1, size=samples)
    # Z3 is generated from Y (not the other way round).
    Z3 = Y + np.random.normal(0, 1, size=samples)

    columns = [Z1, Z2, Z3] if three_z else [Z1, Z2]
    Z = np.stack(columns, axis=1)

    return E_choice, Env, Y, Z


In [7]:
# importing required libraries
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt

def plot_latent_3d(df, m):
    """Show a 3D scatter plot of the latent columns Z1, Z2 and Z3.

    Args:
        df: Mapping/DataFrame indexable by the keys 'Z1', 'Z2' and 'Z3'.
        m: Passed to ``scatter`` as ``c`` (the per-point colour values).
    """
    fig = plt.figure()
    # Create the 3D axes through the figure so they are actually attached to
    # it; constructing Axes3D(fig) directly leaves the figure empty on recent
    # matplotlib releases.
    ax = fig.add_subplot(111, projection="3d")

    ax.scatter(df['Z1'], df['Z2'], df['Z3'], c=m)

    ax.set_title("3D plot")
    ax.set_xlabel('Z1-axis')
    ax.set_ylabel('Z2-axis')
    ax.set_zlabel('Z3-axis')

    plt.show()

In [2]:
# define network
class Net1(nn.Module):
    """Two-layer perceptron (linear -> ReLU -> linear) with hand-managed weights.

    The constructor re-seeds torch's global RNG with 2, so every instance of a
    given size starts from the same weights.
    """

    def __init__(self, num_features, num_hidden, num_output):
        super().__init__()
        torch.manual_seed(2)

        # Hidden layer: Xavier-normal weights, zero bias.
        hidden_w = torch.empty(num_hidden, num_features)
        init.xavier_normal_(hidden_w)
        self.W_1 = Parameter(hidden_w)
        self.b_1 = Parameter(torch.zeros(num_hidden))

        # Output layer: Xavier-normal weights, zero bias.
        output_w = torch.empty(num_output, num_hidden)
        init.xavier_normal_(output_w)
        self.W_2 = Parameter(output_w)
        self.b_2 = Parameter(torch.zeros(num_output))

        self.activation = torch.nn.ReLU()

    def forward(self, x):
        """Return the network output for ``x`` (last dimension = num_features)."""
        hidden = self.activation(F.linear(x, self.W_1, self.b_1))
        return F.linear(hidden, self.W_2, self.b_2)
    
class Net(nn.Module):
    """Two-layer perceptron (linear -> ReLU -> linear) built from ``nn.Linear``."""

    def __init__(self, num_features, num_hidden, num_output):
        super().__init__()
        layers = [
            nn.Linear(num_features, num_hidden),
            nn.ReLU(),
            nn.Linear(num_hidden, num_output),
        ]
        self.function = nn.Sequential(*layers)

    def forward(self, Z):
        """Apply the network to ``Z`` and return its output."""
        out = self.function(Z)
        return out


def generate_x_from_z(env, net):
    """Feed the latent array stored under ``env["Zs"]`` through ``net``.

    The array is cast to float32 and converted to a torch tensor before the
    call; whatever ``net`` returns is returned unchanged.
    """
    latents = env["Zs"].astype("float32")
    latent_tensor = torch.from_numpy(latents)
    return net(latent_tensor)

NameError: name 'nn' is not defined

In [6]:
class Normal_dist(Distribution):
    """Diagonal Gaussian helper parameterised by mean ``mu`` and variance ``v``.

    Note that ``sample`` and ``log_pdf`` take the parameters per call (they do
    not follow the ``torch.distributions.Distribution`` method signatures).

    Args:
        device: Device on which the internal constants are created.
    """

    def __init__(self, device='cpu'):
        super().__init__()
        self.device = device
        # 2*pi, used in the normalising constant of the log-density.
        self.c = 2 * np.pi * torch.ones(1).to(self.device)
        # Standard normal used as the noise source for sampling.
        self._dist = dist.normal.Normal(torch.zeros(1).to(self.device), torch.ones(1).to(self.device))
        self.name = 'gauss'

    def sample(self, mu, v):
        """Draw ``mu + eps * sqrt(v)`` with ``eps`` standard normal.

        The noise is drawn without gradient tracking; gradients still flow
        through ``mu`` and ``v``.

        Args:
            mu: Mean tensor.
            v: Variance tensor (same shape as, or broadcastable with, ``mu``).

        Returns:
            Tensor with the shape of ``mu`` broadcast against ``v``.
        """
        with torch.no_grad():
            # _dist has batch shape (1,), so the draw has shape mu.size() + (1,).
            # Drop only that trailing axis: a bare squeeze() would also remove
            # genuine size-1 dimensions of mu (e.g. a (N, 1) latent) and the
            # result would then broadcast to the wrong shape.
            eps = self._dist.sample(mu.size()).squeeze(-1)
        return mu + eps * v.sqrt()

    def log_pdf(self, x, mu, v, reduce=True, param_shape=None):
        """Compute the log-pdf of a normal distribution with diagonal covariance.

        Args:
            x: Points at which to evaluate the density.
            mu: Mean.
            v: Variance.
            reduce: If true, sum the per-dimension log-densities over the last
                dimension; otherwise return them element-wise.
            param_shape: Optional shape that ``mu`` and ``v`` are viewed as
                before use.

        Returns:
            Tensor of log-densities.
        """
        if param_shape is not None:
            mu, v = mu.view(param_shape), v.view(param_shape)
        lpdf = -0.5 * (torch.log(self.c) + v.log() + (x - mu).pow(2).div(v))
        if reduce:
            return lpdf.sum(dim=-1)
        return lpdf

## VAE ##

In [None]:
import pdb
class UglyVAE(nn.Module):
    """Variational autoencoder whose encoder sees the concatenation of x, y and e.

    The encoder outputs a mean and a log-variance for ``z``; the decoder maps a
    sampled ``z`` to the mean of ``x``.  The prior on ``z`` is fixed: mean 0 and
    variance ``exp(0) = 1``.

    Args:
        x, y, e: Not used by the constructor; kept so existing call sites work.
        input_size: Width of the concatenated ``(x, y, e)`` encoder input.
        output_size: Number of latent dimensions (the encoder emits twice this).
        input_size_de: Decoder input width.
        output_size_de: Decoder output width.
        input_size_pr: Input width of the network stored in ``self.func``.
        output_size_pr: Half the output width of ``self.func``.
        beta: Not used by the constructor; pass ``beta`` to :meth:`elbo`.
    """

    def __init__(self, x, y, e, input_size, output_size, input_size_de, output_size_de, input_size_pr, output_size_pr, beta=1):
        super().__init__()
        # Fixed prior parameters: mean 0 and log-variance 0.
        self.prior_mean = torch.zeros(1).to('cpu')
        self.logl = torch.zeros(1).to('cpu')

        # Encoder: (x, y, e) -> [mean, log-variance] of z
        self.func_en = nn.Sequential(
            nn.Linear(in_features=input_size, out_features=6),
            nn.ReLU(),
            nn.Linear(in_features=6, out_features=2 * output_size),
        )

        # Decoder: z -> mean of x
        self.func_de = nn.Sequential(
            nn.Linear(in_features=input_size_de, out_features=6),
            nn.ReLU(),
            nn.Linear(in_features=6, out_features=output_size_de),
        )

        # Prior network: constructed but not called by forward()/elbo() in this
        # class (the fixed prior above is used instead).  Kept so the module's
        # parameters and state_dict layout stay unchanged.
        self.func = nn.Sequential(
            nn.Linear(in_features=input_size_pr, out_features=6),
            nn.ReLU(),
            nn.Linear(in_features=6, out_features=output_size_pr * 2),
        )

        self.normal_dist = Normal_dist()

    def reparametrize(self, mu, logvar):
        """Sample ``z = mu + eps * exp(0.5 * logvar)`` with ``eps ~ N(0, 1)``.

        The noise is drawn directly here.  Routing it through
        ``normal_dist.sample`` (which already returns a shifted and scaled
        draw) and then shifting/scaling again would apply the mean and the
        scale twice.

        Args:
            mu: Mean of the approximate posterior.
            logvar: Log-variance of the approximate posterior.

        Returns:
            A differentiable sample with the broadcast shape of the inputs.
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def prior_params(self):
        """Return the fixed prior ``(mean, variance)`` pair."""
        return self.prior_mean, self.logl.exp()

    def forward(self, x, y, e, beta=1):
        """Encode ``(x, y, e)``, sample ``z`` and decode it.

        Args:
            x, y, e: Tensors concatenated along dimension 1 to form the
                encoder input.
            beta: Unused here; accepted for signature compatibility.

        Returns:
            tuple: ``(zu, zsigma, de_u, z, prior_params)`` where ``zu`` is the
            encoder mean, ``zsigma`` the encoder log-variance, ``de_u`` the
            decoder output, ``z`` the sampled latent and ``prior_params`` the
            ``(mean, variance)`` of the prior.
        """
        xye = torch.cat((x, y, e), 1)

        prior_params = self.prior_params()

        # The encoder emits mean and log-variance side by side.
        z_usigma = self.func_en(xye)
        zu, zsigma = z_usigma.chunk(2, dim=-1)

        z = self.reparametrize(zu, zsigma)

        de_u = self.func_de(z)

        return zu, zsigma, de_u, z, prior_params

    def elbo(self, x, y, e, beta=1):
        """Return the batch-mean single-sample ELBO estimate.

        The value is ``log p(x|z) + beta * (log p(z) - log q(z|x,y,e))``
        averaged over the batch, i.e. a quantity to maximise.

        Args:
            x, y, e: Inputs forwarded to :meth:`forward`.
            beta: Weight on the ``log p(z) - log q(z|...)`` term.

        Returns:
            Scalar tensor.
        """
        zu, zsigma, de_u, z, prior_params = self.forward(x, y, e)
        # Fixed decoder variance.
        lvar = 0.01 * torch.ones(1).to('cpu')
        log_px_z = self.normal_dist.log_pdf(x, de_u, lvar)
        # zsigma is a log-variance; log_pdf expects a variance.
        log_qz_xy = self.normal_dist.log_pdf(z, zu, zsigma.exp())
        log_pz_ye = self.normal_dist.log_pdf(z, *prior_params)

        # Single-sample estimate of the *negative* KL divergence.
        kl = -log_qz_xy + log_pz_ye

        elbo = log_px_z + beta * kl

        return elbo.mean()

    
    

## iVAE ##

In [None]:
import pdb
class UglyiVAE(nn.Module):
    """iVAE-style model: encoder on (x, y, e), decoder on z, learned prior on (y, e).

    The constructor arguments ``x``, ``y``, ``e`` and ``beta`` are accepted but
    not used when building the networks.
    """

    def __init__(self, x, y, e, input_size, output_size, input_size_de, output_size_de, input_size_pr, output_size_pr, beta=1):
        super().__init__()
        # Networks are created in the order encoder, decoder, prior.
        self.func_en = self._two_layer_mlp(input_size, 2 * output_size)
        self.func_de = self._two_layer_mlp(input_size_de, output_size_de)
        self.func = self._two_layer_mlp(input_size_pr, output_size_pr * 2)

        self.normal_dist = Normal_dist()

    @staticmethod
    def _two_layer_mlp(n_in, n_out):
        """Build linear(n_in, 6) -> ReLU -> linear(6, n_out)."""
        return nn.Sequential(
            nn.Linear(in_features=n_in, out_features=6),
            nn.ReLU(),
            nn.Linear(in_features=6, out_features=n_out),
        )

    def forward(self, x, y, e, beta=1):
        """Encode, sample ``z``, decode, and evaluate the conditional prior.

        Returns:
            tuple: ``(zu, zsigma, de_u, prior_u, prior_sigma, z)`` - encoder
            mean, exponentiated second half of the encoder output, decoder
            output, prior mean, exponentiated second half of the prior output,
            and the sampled latent.
        """
        # Encoder: first half of the output is the mean, second half is
        # exponentiated before being used as the spread parameter.
        enc_out = self.func_en(torch.cat([x, y, e], dim=1))
        zu, enc_raw = torch.chunk(enc_out, 2, dim=-1)
        zsigma = torch.exp(enc_raw)

        # Draw the latent and decode it.
        z = self.normal_dist.sample(zu, zsigma)
        de_u = self.func_de(z)

        # Prior conditioned on (y, e), parameterised like the encoder.
        prior_out = self.func(torch.cat([y, e], dim=1))
        prior_u, prior_raw = torch.chunk(prior_out, 2, dim=-1)
        prior_sigma = torch.exp(prior_raw)

        return zu, zsigma, de_u, prior_u, prior_sigma, z

    def elbo(self, x, y, e, beta=1):
        """Return the batch mean of ``log p(x|z) + beta * (log p(z|y,e) - log q(z|x,y,e))``."""
        zu, zsigma, de_u, prior_u, prior_sigma, z = self.forward(x, y, e)

        # Fixed value 0.01 passed as the decoder's spread parameter.
        obs_var = 0.01 * torch.ones(1).to('cpu')
        density = self.normal_dist.log_pdf

        log_px_z = density(x, de_u, obs_var)
        log_qz_xye = density(z, zu, zsigma)
        log_pz_ye = density(z, prior_u, prior_sigma)

        # log-prior minus log-posterior at the sampled z.
        neg_kl = log_pz_ye - log_qz_xye

        return (log_px_z + beta * neg_kl).mean()

    
    

In [None]:
from torch.utils.data import Dataset, DataLoader

class EnvDataset(Dataset):
    """Dataset of (X, Y, one-hot environment) triples.

    Args:
        X: Array-like of features, one row per sample.
        Y: Array-like of targets; a trailing dimension of size 1 is appended.
        E: Array-like of integer environment indices, one per sample.
        num_classes: Width of the one-hot encoding.  ``None`` (the default)
            infers it from the largest index present in ``E``; pass it
            explicitly when a subset of the data may not contain every
            environment, so the encoding width stays fixed.

    Raises:
        ValueError: If X, Y and E do not contain the same number of samples.
    """

    def __init__(self, X, Y, E, num_classes=None):
        super().__init__()
        self.X = torch.tensor(X)
        self.Y = torch.tensor(Y).unsqueeze(1)

        env_index = torch.tensor(E).long()
        # one_hot uses -1 to mean "infer the number of classes from the data".
        width = -1 if num_classes is None else num_classes
        self.E = torch.nn.functional.one_hot(env_index, num_classes=width)

        if not (len(self.X) == len(self.Y) == len(self.E)):
            raise ValueError(
                "X, Y and E must have the same number of samples, got "
                f"{len(self.X)}, {len(self.Y)} and {len(self.E)}"
            )

    def __getitem__(self, index):
        return self.X[index], self.Y[index], self.E[index]

    def __len__(self):
        return len(self.X)

In [None]:
import numpy as np
from scipy.optimize import linear_sum_assignment
import random
import matplotlib.pyplot as plt


def MCC(true_z, predicted_z):
    """Mean correlation coefficient between true and recovered latent variables.

    Computes the Pearson correlation between every (true, recovered) pair of
    latent variables, matches each true variable to a distinct recovered
    variable so that the total absolute correlation is maximal (linear sum
    assignment), and returns the mean absolute correlation of the matched
    pairs.

    from paper:
    We also compute the mean correlation coefficient (MCC) used in Khemakhem et al. (2020a), which
    can be obtained by calculating the correlation coefficient between all pairs of true and recovered
    latent factors and then solving a linear sum assignment problem by assigning each recovered latent
    factor to the true latent factor with which it best correlates

    Args:
        true_z (numpy array): 2D array of shape (samples, n_true); columns are variables.
        predicted_z (numpy array): 2D array of shape (samples, n_predicted); columns are variables.

    Returns:
        float: Value in [0, 1]; 1 means every matched pair is perfectly
        (anti-)correlated.
    """
    true_z = np.asarray(true_z)
    predicted_z = np.asarray(predicted_z)
    num_true = true_z.shape[1]

    corr_matrix = np.corrcoef(true_z, predicted_z, rowvar=False)
    # Keep only the true-vs-predicted block (rows: true, columns: predicted).
    # Absolute values are used because a latent recovered with flipped sign is
    # still a perfect recovery.
    cross_corr = np.abs(corr_matrix[:num_true, num_true:])

    # linear_sum_assignment minimises by default; we want the best matching.
    row_ind, col_ind = linear_sum_assignment(cross_corr, maximize=True)

    # Average over the matched pairs only.
    return cross_corr[row_ind, col_ind].mean()


def plot_MCC(mcc_model, mcc_mean, mcc_var):
    """Draw a bar chart of MCC scores with error bars via ``plt.bar``.

    Args:
        mcc_model (list): names of the models, used as bar positions/labels
        mcc_mean (list): MCC value per model, used as bar heights
        mcc_var (list): spread per model, passed to ``plt.bar`` as ``yerr``
    """
    labels, heights, errors = mcc_model, mcc_mean, mcc_var
    plt.bar(labels, heights, yerr=errors)


# test MCC
#np.random.seed(10)
#true_z = np.random.rand(4, 3)
#test_z = np.random.rand(4, 3)
## print(f"true z: {true_z}")
#print(MCC(true_z, test_z))
