# Variational Autoencoder for Regression 

- Paper: https://arxiv.org/abs/1904.05948

- Repository: https://github.com/QingyuZhao/VAE-for-Regression


In [11]:
import itertools
import numpy as np 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm
from torch.utils.data import DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import mean_squared_error, r2_score

### Load Example Data

In [12]:
# Load Toy Example Data
# Features are read from a whitespace-delimited text file into a NumPy array.
# presumably rows are samples and columns are features — confirm against data/X.txt
training_feature = np.loadtxt('data/X.txt')
# Bare expression: shows the array shape as notebook cell output; it has no
# effect when this code is run as a plain script.
training_feature.shape

# Regression targets; `ground_truth_r` is the name used by the later cells.
Y = np.loadtxt('data/Y.txt')
ground_truth_r = Y

# Seed NumPy's global random number generator.
np.random.seed(seed=0)

# Second axis length is used as the model input size, first axis length as
# the number of training samples.
original_dim = training_feature.shape[1]
num_train = training_feature.shape[0]

In [13]:
def sampling(args):
    """Draw a sample from a diagonal Gaussian via the reparameterization trick.

    Arguments:
        args (tuple of tensors): ``(mean, log_var)`` -- mean and log of
            variance of Q(z|X).
    Returns:
        z (tensor): ``mean + exp(0.5 * log_var) * noise`` where ``noise`` is
            standard-normal and has the same shape as ``mean``.
    """
    mu, log_sigma_sq = args
    # Noise is drawn independently of the parameters so gradients can flow
    # through ``mu`` and ``log_sigma_sq``.
    noise = torch.randn_like(mu)
    std = torch.exp(0.5 * log_sigma_sq)
    return mu + std * noise

### Build VAE Regression Model 

In [14]:
class Encoder(nn.Module):
    """Probabilistic encoder with a regression head and a latent generator.

    Input dropout followed by a two-layer tanh MLP feeds four linear heads:
    the mean / log-variance of a scalar regression variable ``r`` and the
    mean / log-variance of the latent code ``z``.  A weight-normalised linear
    layer (``gen_z``) maps a sampled ``r`` to a ``latent_dim``-sized vector.

    Args:
        input_shape_x: Number of input features (``in_features`` of ``fc1``).
        intermediate_dim: Width of the second hidden layer; the first hidden
            layer is fixed at 128 units.
        latent_dim: Size of the latent code ``z``.
    """

    def __init__(self, input_shape_x, intermediate_dim, latent_dim):
        super().__init__()
        self.dropout = nn.Dropout(p=0.25)
        self.fc1 = nn.Linear(input_shape_x, 128)
        self.act1 = nn.Tanh()
        self.fc2 = nn.Linear(128, intermediate_dim)
        self.act2 = nn.Tanh()

        # posterior on Y; probabilistic regressor
        self.r_mean = nn.Linear(intermediate_dim, 1)
        self.r_logvar = nn.Linear(intermediate_dim, 1)

        # q(z|x)
        self.z_mean = nn.Linear(intermediate_dim, latent_dim)
        self.z_logvar = nn.Linear(intermediate_dim, latent_dim)

        # latent generator
        self.gen_z = weight_norm(nn.Linear(1, latent_dim))

    def forward(self, x):
        """Encode ``x`` and sample both ``r`` and ``z``.

        Args:
            x: Input tensor whose last dimension is ``input_shape_x``.

        Returns:
            Tuple ``(r_mean, r_logvar, r, z_mean, z_logvar, z, pz_mean)``.
            The ``r_*`` tensors and ``r`` have last dimension 1; the ``z_*``
            tensors, ``z`` and ``pz_mean`` have last dimension ``latent_dim``.
        """
        x = self.dropout(x)
        x = self.act1(self.fc1(x))
        x = self.act2(self.fc2(x))

        r_mean = self.r_mean(x)
        r_logvar = self.r_logvar(x)

        z_mean = self.z_mean(x)
        z_logvar = self.z_logvar(x)

        # reparameterization trick
        # `sampling` takes ONE (mean, log_var) tuple of tensors, so pass the
        # head outputs computed above rather than the nn.Linear modules.
        r = sampling((r_mean, r_logvar))
        z = sampling((z_mean, z_logvar))

        # Output of the latent generator applied to the sampled r.
        pz_mean = self.gen_z(r)

        return r_mean, r_logvar, r, z_mean, z_logvar, z, pz_mean


In [15]:
class Decoder(nn.Module):
    """MLP decoder: latent vector -> tanh hidden layers -> feature vector.

    Args:
        input_shape_x: Size of the output produced by the final linear layer.
        intermediate_dim: Width of the first hidden layer; the second hidden
            layer is fixed at 128 units.
        latent_dim: Size of the latent input.
    """

    def __init__(self, input_shape_x, intermediate_dim, latent_dim):
        super().__init__()
        self.fc1 = nn.Linear(in_features=latent_dim, out_features=intermediate_dim)
        self.act1 = nn.Tanh()
        self.fc2 = nn.Linear(in_features=intermediate_dim, out_features=128)
        self.act2 = nn.Tanh()
        self.fc3 = nn.Linear(in_features=128, out_features=input_shape_x)

    def forward(self, x):
        """Run ``x`` through the layers in order; the last layer is linear."""
        stages = (self.fc1, self.act1, self.fc2, self.act2, self.fc3)
        for stage in stages:
            x = stage(x)
        return x

In [16]:
# Instantiate the encoder/decoder pair: input size taken from the data,
# intermediate width 32 and latent size 8.
# NOTE(review): 32 and 8 duplicate the values assigned in the hyperparameter
# cell further down, which runs after this one — keep them in sync.
encoder = Encoder(original_dim, 32, 8)
decoder = Decoder(original_dim, 32, 8) 

### Hyperparameters

In [None]:
# Model / training hyperparameters.
intermediate_dim = 32
# Original (misspelled) name kept as an alias so existing references still work.
intermidiate_dim = intermediate_dim
batch_size = 64
latent_dim = 8
epochs = 100
lr = 0.001
# Mean-squared-error criterion with PyTorch's default reduction.
mse = nn.MSELoss()

### Optimizer

In [None]:
## Train the network
# Cross-validation scaffold: builds a 10-fold splitter and slices the
# features/targets for each fold.
# NOTE(review): the loop body below only slices the fold data; no model is
# trained and `pred` is never written in this cell — the training step still
# needs to be implemented.
np.random.seed(0)
# NOTE(review): the splitter is created without `shuffle`, so the seed above
# presumably does not influence the folds — confirm.
skf = StratifiedKFold(n_splits=10)
# Buffer with the same shape as the targets (all zeros at this point).
pred = np.zeros((ground_truth_r.shape))
# Dummy class labels handed to the stratified splitter: the first 300
# samples are labelled 1, the rest 0.
# NOTE(review): 300 is hard-coded; presumably tied to the layout of the toy
# data set — confirm before using other data.
fake = np.zeros((ground_truth_r.shape[0]))
fake[:300] = 1


# Run 10-fold CV
for train_idx, test_idx in skf.split(training_feature, fake):
    # Per-fold train/test views of the features and regression targets.
    training_feature_sk = training_feature[train_idx,:]
    training_score = ground_truth_r[train_idx]
    testing_feature_sk = training_feature[test_idx,:]
    testing_score = ground_truth_r[test_idx]    
    

### Validation

In [None]:
# # Mean squared error
# print("Mean squared error: %.3f" % mean_squared_error(ground_truth_r, pred))
# # Explained variance score: 1 is perfect prediction
# print('R2 Variance score: %.3f' % r2_score(ground_truth_r, pred))

# # Plot Prediction vs. Ground-truth Y
# fig = plt.figure()
# ax = fig.add_subplot(111)

# ax.scatter(ground_truth_r, pred,  color='black')
# plt.xlabel('ground truth')
# plt.ylabel('prediction truth')
# ax.axis('equal');

### Visualize Latent Space


In [None]:
# Fit the model on the full data set, embed the latent means in 2-D and
# scatter-plot them coloured by the regression target.
# NOTE(review): this cell still uses a Keras-style API (`load_weights`, `fit`,
# `predict`). `vae`, `MDS` and `plt` are not defined or imported anywhere in
# the visible file, and the `encoder` built above is a torch `nn.Module` —
# this cell needs porting before it can run; confirm.
vae.load_weights('random_weights.h5')
vae.fit([training_feature,ground_truth_r],
         epochs=epochs,
         batch_size=batch_size,
         verbose = 0)
 
# NOTE(review): the unpacking order here (z first, then r) differs from the
# tuple returned by `Encoder.forward` (r first, then z) — confirm which is
# intended.
[z_mean, z_log_var, z, r_mean, r_log_var, r_vae, pz_mean] = encoder.predict([training_feature,ground_truth_r],batch_size=batch_size)

# NOTE(review): the embedding is computed with `MDS`, although the variable
# name and the plot title say TSNE.
tsne = MDS(n_components=2, random_state=0)
X_2d = tsne.fit_transform(z_mean)

#%matplotlib notebook
fig = plt.figure()
ax = fig.add_subplot(111)
# Points are coloured by the ground-truth regression value.
ax.scatter(X_2d[:, 0], X_2d[:, 1], c=ground_truth_r)
plt.title('TSNE visualization of latent space')
ax.axis('equal')