In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.distributions as dist
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.autograd import Variable
import numpy as np
import sys

In [2]:
class VAEarch(nn.Module):
    def __init__(self, feature_size, latent_size):
        super(VAEarch, self).__init__()
        self.feature_size = feature_size

        # Encoder
        self.fc1  = nn.Linear(feature_size, 128)
        self.fc21 = nn.Linear(128, latent_size)
        self.fc22 = nn.Linear(128, latent_size)

        # Decoder
        self.fc3 = nn.Linear(latent_size, 128)
        self.fc4 = nn.Linear(128, feature_size)

        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def encode(self, x):
        h1 = self.relu(self.fc1(x))
        z_mu = self.fc21(h1)
        z_var = self.fc22(h1)
        return z_mu, z_var

    def reparametrize(self, mu, logvar):
        if self.training:
            std = logvar.mul(0.5).exp_()
            eps = Variable(std.data.new(std.size()).normal_())
            return eps.mul(std) + mu
        else:
            return mu

    def decode(self, z): 
        h3 = self.relu(self.fc3(z))
        return self.sigmoid(self.fc4(h3))

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        return self.decode(z), mu, logvar
    
    

In [11]:
class VAE():
    def __init__(self, A, B, model_path=None):
        
        # Set up model
        self.model = VAEarch(A, B)
        if model_path:
            
            self.load_model(model_path)
        self.model = self.model
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-6)
        
    def test_sample(self, ds2):
        self.model.eval()
        train_loss = 0
        l = []
        for i,j in enumerate(ds2):
            data = Variable(torch.from_numpy(j))
            recon_batch, mu, logvar = self.model(data.float())
            loss = self.loss_function(recon_batch, data.float(), mu, logvar)
            train_loss += loss.data
            l.append(loss.data)
        mean = torch.mean(torch.stack(l))
        return loss.data, recon_batch, data.float(), mu, logvar
        
        
    def train(self):
        self.model.train()
        train_loss = 0
        l = []
        for i,j in enumerate(ds1):
            data = Variable(torch.from_numpy(ds1))
            recon_batch, mu, logvar = self.model(data.float())
            self.optimizer.zero_grad()
            loss = self.loss_function(recon_batch, data.float(), mu, logvar)
            likelihood = self.log_likelihood(recon_batch, data.float(), mu, logvar)
            loss.backward()
            l.append(loss.data)
            train_loss += loss.data
            self.optimizer.step()
        mean = torch.mean(torch.stack(l))
        return loss.data, likelihood.data

    def test(self, ds2):
        self.model.eval()
        train_loss = 0
        l = []
        for i,j in enumerate(ds2):
            data = Variable(torch.from_numpy(j))
            recon_batch, mu, logvar = self.model(data.float())
            loss = self.loss_function(recon_batch, data.float(), mu, logvar)
            train_loss += loss.data
            l.append(loss.data)
        mean = torch.mean(torch.stack(l))
        return loss.data, recon_batch, data.float(), mu, logvar

    def loss_function(self, recon_x, x, mu, logvar):    
        BCE = F.binary_cross_entropy(recon_x, x)
        KLD = 0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        return -1*(BCE + KLD)

    def log_likelihood(self, recon_x, x, mu, logvar):
        BCE = F.binary_cross_entropy(recon_x, x)
        return -1*BCE

    def load_model(self, model_path):
        self.model.load_state_dict(torch.load(model_path))

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)


In [12]:
ds2 = np.loadtxt("book.ts.data", delimiter=",", dtype=int)
ds1 = np.loadtxt("book.valid.data", delimiter=",", dtype=int)

latent_size = 2
vae = VAE(ds1.shape[1], latent_size)


In [13]:
for epoch in range(1, 6):
    vae.train()
    vae.save_model('tae.pt')
    

In [14]:
vae = VAE(ds1.shape[1], latent_size, model_path='tae.pt')

In [15]:
elbo_valid, recon_xv, xv, muv, logvarv = vae.test(ds1)
print('elbo_valid is' ,elbo_valid)

elbo_test, recon_xt, xt, mut, logvart = vae.test(ds2)
print('elbo_test is', elbo_test)

loglike_valid = vae.log_likelihood(recon_xv, xv, muv, logvarv)
print('loglikelihood value is ',loglike_valid.detach())

loglike_test = vae.log_likelihood(recon_xt, xt, mut, logvart)
print('loglikelihood value is ',loglike_test.detach())

elbo_valid is tensor(-0.7775)
elbo_test is tensor(-0.7784)
loglikelihood value is  tensor(-0.7782)
loglikelihood value is  tensor(-0.7784)


In [119]:
if len(sys.argv) != 5:
        print("Usage:python vae_python.py <k> <modelfile-path> <data-set> <samples>")
else:
    modelfile, dataset, samples = sys.argv[2], sys.argv[3], sys.argv[4]

    ds1 = np.loadtxt(dataset, delimiter=",", dtype=int)
    latent_size = 2
    vae = VAE(ds1.shape[1], latent_size, model_path=modelfile)

    iter_ = 0
    while iter_<=samples:
        elbo_valid, recon_xv, xv, muv, logvarv = vae.test_sample(ds1[iter_])
        print('elbo_valid is' ,elbo_valid)

        loglike_valid = vae.log_likelihood(recon_xv, xv, muv, logvarv)
        print('loglikelihood value is ',loglike_valid.detach())
        iter_ = iter_+1

Usage:python vae_python.py <k> <modelfile-path> <data-set> <samples>
