# VAE using Lars

This code is provided only to show our implementation of the algorithm described in the article "Resampled Priors for Variational Autoencoders". To evaluate our work, please look at the other .ipynb script that loads and infere our most performant model

In [12]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Load the CSV file into a DataFrame
file_path = './data_train_log_return.csv'
data_train_log_return = pd.read_csv(file_path)

Unnamed: 0,0,0.01249535315117,0.0111256706670408,0.0032520459252687,0.0066249108779032
0,1,0.011439,0.002691,0.001206,0.006947
1,2,0.000632,0.007277,0.004049,7.4e-05
2,3,0.017828,0.02821,0.007758,0.007382
3,4,0.021115,0.019642,0.009238,0.011499
4,5,0.001177,0.002096,0.001348,0.004966


In [13]:
file_path = 'data_train_log_return.csv'
data = pd.read_csv(file_path, header=None)
data = data.drop(data.columns[0], axis=1)

ori_data = pd.DataFrame.to_numpy(data)

num_samples = 746
data_dim = 4
sigma = 0.1

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributions as tdist

original_dim = ori_data.shape[1]   # minus 1 to exclude the index column
latent_dim = 2  # Dimension of the latent space

class VAE(nn.Module):
    def __init__(self, original_dim, latent_dim):
        super(VAE, self).__init__()
        self.original_dim = original_dim
        self.latent_dim = latent_dim

        # Encoder layers
        self.encoder = nn.Sequential(
            nn.Linear(original_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, latent_dim * 2)  # Outputting both mean and log-variance
        )

        # Decoder layers
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, original_dim)
        )

        # Lars layers
        self.lars = nn.Sequential(
            nn.Linear(latent_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()  # Outputting the probability of acceptance
        )

    def reparameterize(self, mu, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + std * eps

    def forward(self, x):
        # Encoder forward pass
        enc_output = self.encoder(x)
        z_mean, z_log_var = torch.split(enc_output, self.latent_dim, dim=-1)
        z = self.reparameterize(z_mean, z_log_var)

        # Decoder forward pass
        decoded = self.decoder(z)

        return decoded, z, z_mean, torch.abs(z_log_var) + 0.01

    def lars_forward(self, z):
        return self.lars(z)

# Instantiate the VAE model
vae = VAE(original_dim, latent_dim)

def simulate_norm_Z(model, encoder_input, z_r, z_s, Z_moving_average, eps=0.1):
    S = z_s.shape[0]
    R = z_r.shape[0]
    Z_s = torch.sum(model.lars_forward(z_s)) / S
    Z_r_curr = torch.zeros((R))
    Z_r_smooth = torch.zeros((R))
    Z_r = torch.zeros((R))
    

    for i in range(R):
        x_r = encoder_input[i]
        pi = torch.distributions.MultivariateNormal(torch.tensor([0., 0.]), torch.diag(torch.tensor([1., 1.])))
        with torch.no_grad():
            q = torch.distributions.MultivariateNormal(model.forward(x_r)[2], torch.diag(model.forward(x_r)[3]))
        Z_r_curr[i] = (S * Z_s + torch.autograd.Variable(pi.log_prob(z_r[i]) / q.log_prob(z_r[i]) * model.lars_forward(z_r[i]))) / (S + 1)
        Z_r_smooth[i] = eps * Z_r_curr[i] + (1 - eps) * Z_moving_average
        Z_r[i] = Z_r_curr[i] + torch.autograd.Variable(Z_r_smooth[i] - Z_r_curr[i])

    Z_new_moving_average = torch.sum(Z_r_smooth.detach()) / R
    log_Z = torch.sum(torch.log(Z_r)) / R

    return Z_new_moving_average, log_Z

# Loss function
def loss_function(recon_x, x, z_mean, z_log_var, a_r, log_Z):
    recon_loss = F.mse_loss(recon_x, x, reduction='sum') / (original_dim)
    kl_loss = -0.5 * torch.sum(1 + z_log_var - z_mean.pow(2) - z_log_var.exp() - a_r) - log_Z
    return recon_loss + kl_loss 

optimizer = torch.optim.Adam(vae.parameters())

# Training loop example
def train_model(model, data, epochs=50, batch_size=32):
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.MSELoss()

    data = torch.Tensor(data)
    dataset = torch.utils.data.TensorDataset(data)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    Z_moving_average = 1

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for batch in dataloader:
            x_r = batch[0]
            z_r = model.forward(x_r)[1]
            z_sample = np.random.normal(size=(x_r.shape[0], latent_dim))
            z_s = torch.Tensor(z_sample)
            Z_new_moving_average, log_Z = simulate_norm_Z(model, x_r, z_r, z_s, Z_moving_average, eps=0.1)
            optimizer.zero_grad()
            recon_batch, z, z_mean, z_log_var = model(batch[0])
            a_r = model.lars_forward(z_r)
            loss = loss_function(recon_batch, batch[0], z_mean, z_log_var, a_r, log_Z)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(data)}")

    return log_Z


In [16]:
log_Z = train_model(vae, ori_data, epochs=5, batch_size=32)
Z = torch.exp(log_Z)

Epoch 1/5, Loss: nan
Epoch 2/5, Loss: 0.2542198743002025
Epoch 3/5, Loss: 0.0643997028190393
Epoch 4/5, Loss: 0.015025316670177449
Epoch 5/5, Loss: 0.007481997458129402


In [17]:
samples = []
pi = torch.distributions.MultivariateNormal(torch.tensor([0., 0.]), torch.diag(torch.tensor([1., 1.])))

while len(samples)<2:
    z_sample = np.random.normal(size=(1, latent_dim))
    acceptance_prob = vae.lars_forward(torch.tensor(z_sample, dtype=torch.float32)) / (torch.exp(pi.log_prob(torch.tensor(z_sample, dtype=torch.float32))) + 1e-10)
    uniform_sample = torch.rand(1)  # Sample from a uniform distribution
    if uniform_sample < acceptance_prob:
        samples.append(z_sample)

new_samples = np.array(samples)
samples_bis = new_samples
samples_bis_tensor = torch.tensor(samples_bis)
z_sample_tensor = torch.tensor(samples_bis_tensor, dtype=torch.float)

  z_sample_tensor = torch.tensor(samples_bis_tensor, dtype=torch.float)
