In [1]:
import numpy as np
import matplotlib
from matplotlib import pyplot as plt
import seaborn as sns

import torch
from torch import nn, optim
from torch.autograd import Variable
from torch.optim import Optimizer

import torch.nn.functional as F

import collections
import h5py, sys, gzip, os, math

print(torch.__version__)

1.3.1


### Utility Functions

In [2]:
def mkdir(paths):
    if not isinstance(paths, (list, tuple)):
        paths = [paths]
    for path in paths:
        if not os.path.isdir(path):
            os.makedirs(path)

def to_variable(var, cuda=True, volatile=False):
    out = []
    for v in var:
        if isinstance(v, np.ndarray):
            v = torch.from_numpy(v).type(torch.FloatTensor)
        
        if not v.is_cuda and cuda:
            v = v.cuda()
        
        if not isinstance(v, Variable):
            v = Variable(v, volatile=volatile)
        
        out.append(v)
    
    return out

In [3]:
class BaseNet():
    def __init__(self):
        cprint('c', '\nBaseNet:')
    
    def get_nb_parameters(self):
        return np.sum(p.numel() for p in self.model.parameters())
    
    def set_mode_train(self, train=True):
        if train:
            self.model.train()
        else:
            self.model.eval()
            
    def update_lr(self, epoch, gamma=0.99):
        self.epoch += 1
        if self.schedule is not None:
            if len(self.schedule) == 0 or epoch in self.schedule:
                self.lr *= gamma
                print('learning rate: %f (%d)\n' % self.lr, epoch)
                for param_group in self.optimizer.param_groups:
                    param_group['lr'] = self.lr
                    
    def save(self, filename):
        cprint('c', 'Writting %s\n' % filename)
        torch.save({'epoch':self.epoch, 'lr':self.lr,
                    'model':self.model, 'optimizer':self.optimizer}, filename)
        
    def load(self, filename):
        cprint('c', 'Reading %s\n' % filename)
        state_dict = torch.load(filename)
        self.epoch = state_dict['epoch']
        self.lr = state_dict['lr']
        self.model = state_dict['model']
        self.optimizer = state_dict['optimizer']
        print('  restoring epoch: %d, lr: %f' % (self.epoch, self.lr))
        return self.epoch

### Prior classes

In [4]:
class isotropic_gauss_prior(object):
    def __init__(self, mu, sigma):
        self.mu = mu
        self.sigma = sigma
        
        self.cte_term = - 0.5 * np.log(2*np.pi)
        self.det_sig_term = -np.log(self.sigma)
    
    def loglike(self, x, do_sum=True):
        dist_term = -0.5 * ((x - self.mu)/self.sigma)**2
        if do_sum:
            return (self.cte_term + self.det_sig_term + dist_term).sum()
        else:
            return (self.cte_term + self.det_sig_term + dist_term)

### Bayesian Neural Network: Linear Layer

In [5]:

def isotropic_gauss_loglike(x, mu, sigma, do_sum=True):
    cte_term = - 0.5 * np.log(2*np.pi)
    det_sig_term = - torch.log(sigma)
    inner = (x - mu)/sigma
    dist_term = -0.5 * (inner**2)
    
    if do_sum:
        out = (cte_term + det_sig_term + dist_term).sum()
    else:
        out = (cte_term + det_sig_term + dist_term)
    
    return out

class BayesLinear_Normalq(nn.Module):
    def __init__(self, n_in, n_out, prior_class):
        super(BayesLinear_Normalq, self).__init__()
        self.n_in = n_in
        self.n_out = n_out
        self.prior = prior_class
        
        # Learnable parameters
#         self.W_mu = nn.Parameter(torch.Tensor(self.n_in, self.n_out).uniform_(-0.2, 0.2))
#         self.W_p = nn.Parameter(torch.Tensor(self.n_in, self.n_out).uniform_(-3, -2))
        
#         self.b_mu = nn.Parameter(torch.Tensor(self.n_out).uniform_(-0.2, 0.2))
#         self.b_p = nn.Parameter(torch.Tensor(self.n_out).uniform_(-3, -2))
        
        
        self.W_mu = nn.Parameter(torch.Tensor(self.n_in, self.n_out).uniform_(-0.5, 0.5))
        self.W_p = nn.Parameter(torch.Tensor(self.n_in, self.n_out).uniform_(-5, -3))
        
        self.b_mu = nn.Parameter(torch.Tensor(self.n_out).uniform_(-0.5, 0.5))
        self.b_p = nn.Parameter(torch.Tensor(self.n_out).uniform_(-5, -3))

        
        self.lpw = 0
        self.lqw = 0
    
    def forward(self, X, sample=False):
        # lqw: Log Variational Posterior
        # lpw: Log Prior
        if not sample: # a placeholder function
            output = torch.mm(X, self.W_mu) + self.b_mu.expand(X.size()[0], self.n_out)
            return output, torch.tensor([0.0]), torch.tensor([0.0])
        else:
            # local reparameterization trick
            eps_W = Variable(self.W_mu.data.new(self.W_mu.size()).normal_())
            eps_b = Variable(self.b_mu.data.new(self.b_mu.size()).normal_())
            
            std_w = 1e-6 + F.softplus(self.W_p, beta=1, threshold=20)
            std_b = 1e-6 + F.softplus(self.b_p, beta=1, threshold=20)
            
            W = self.W_mu + 1 * std_w * eps_W
            b = self.b_mu + 1 * std_b * eps_b
            
            output = torch.mm(X, W) + b.unsqueeze(0).expand(X.shape[0], -1)
            
            lqw = isotropic_gauss_loglike(W, self.W_mu, std_w) + isotropic_gauss_loglike(b, self.b_mu, std_b)
            lpw = self.prior.loglike(W) + self.prior.loglike(b)
            return output, lqw, lpw
    
    

In [6]:
# Quick weight sampling function for plotting
def sample_weights(W_mu, b_mu, W_p, b_p):
    eps_W = W_mu.data.new(W_mu.size()).normal_()
    # sample parameters
    std_w = 1e-6 + F.softplus(W_p, beta=1, threshold=20)
    W = W_mu + 1 * std_w * eps_W
    if b_mu is not None:
        std_b = 1e-6 + F.softplus(b_p, beta=1, threshold=20)
        eps_b = b_mu.data.new(b_mu.size()).normal_()
        b = b_mu + 1 * std_b * eps_b
    else:
        b = None
    
    return W, b

### Simple Bayesian RNN Cell

In [7]:
# Simple Bayesian RNN Cell
class Bayes_RNN_Cell(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super(Bayes_RNN_Cell, self).__init__()
        
        print('Bayes_RNN')
        
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        
        self.prior_instance = isotropic_gauss_prior(mu=0, sigma=0.1)
        
        # To update
        # Add weights directly, get rid of the Bayesian Linear Layers
        self.WxhLayer = BayesLinear_Normalq(input_dim, hidden_dim, self.prior_instance)
        self.WhhLayer = BayesLinear_Normalq(hidden_dim, hidden_dim, self.prior_instance)
        
    def forward(self, x, seq_len, sample=False):
        # tlqw: Total Log Variational Posterior
        # tlpw: Total Log Prior
        # x : input tensor
        tlqw = 0
        tlpw = 0
        # print('forward Bayes_RNN_Cell')
        
        x = x.view(-1, seq_len, self.input_dim)
        
        # Initialize the hidden state
        Hidden = Variable(torch.zeros(x.size(0), self.hidden_dim))
        
        hiddens = []
        
        for i in range(seq_len):
            # print('forward i = ', i)
            # XWxh, lqw, lpw = self.WxhLayer(x.narrow(1, i, 1).view(-1, self.input_dim), sample)
            XWxh, lqw, lpw = self.WxhLayer(x[:,i,:], sample)
            tlqw += lqw
            tlpw += lpw
            HWhh, lqw, lpw = self.WhhLayer(Hidden, sample)
            tlqw += lqw
            tlpw += lpw
            Hidden = torch.tanh(XWxh + HWhh)
            hiddens.append(Hidden)
        
        # The final output layer will be added in the Bayesian RNN model

        hiddens = torch.stack(hiddens, dim=1)
        
        return hiddens, tlqw, tlpw

### Bayesian LSTM Cell

In [8]:
# Bayesian RNN Cell with LSTM Units
class Bayes_LSTM_Cell(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super(Bayes_LSTM_Cell, self).__init__()
        
        print('Bayes_LSTM')
        
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        
        self.prior_instance = isotropic_gauss_prior(mu=0, sigma=0.1)
        
        # To update
        # Add weights directly, get rid of the Bayesian Linear Layers
        
        self.Wf = BayesLinear_Normalq(input_dim + hidden_dim, hidden_dim, self.prior_instance)
        self.Wu = BayesLinear_Normalq(input_dim + hidden_dim, hidden_dim, self.prior_instance)
        self.Wc = BayesLinear_Normalq(input_dim + hidden_dim, hidden_dim, self.prior_instance)
        self.Wo = BayesLinear_Normalq(input_dim + hidden_dim, hidden_dim, self.prior_instance)

        
    def forward(self, x, seq_len, sample=False):
        # tlqw: Total Log Variational Posterior
        # tlpw: Total Log Prior
        # x : input tensor
        tlqw = 0
        tlpw = 0
        
        x = x.view(-1, seq_len, self.input_dim)
        
        # Initialize the hidden state, cell state
        Hidden = Variable(torch.zeros(x.size(0), self.hidden_dim))
        c = Variable(torch.zeros(x.size(0), self.hidden_dim))
        
        hiddens = []
        
        for i in range(seq_len):
            xh = torch.cat((x[:, i, :], Hidden), dim=1)
    
            Tf, lqw, lpw = self.Wf(xh, sample)
            tlqw += lqw
            tlpw += lpw

            Tu, lqw, lpw = self.Wu(xh, sample)
            tlqw += lqw
            tlpw += lpw
            
            c_tilde, lqw, lpw = self.Wc(xh, sample)
            tlqw += lqw
            tlpw += lpw
            
            To, lqw, lpw = self.Wo(xh, sample)
            tlqw += lqw
            tlpw += lpw
            
            c = torch.sigmoid(Tf) * c + torch.sigmoid(Tu) * torch.tanh(c_tilde)
            
            Hidden = torch.sigmoid(To) * torch.tanh(c)

            hiddens.append(Hidden)

        
        hiddens = torch.stack(hiddens, dim=1)
        
        return hiddens, tlqw, tlpw


### Multilayer Bayesian RNNs

In [9]:
class Multilayer_Bayes_RNN(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim=1):
        super(Multilayer_Bayes_RNN, self).__init__()
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.output_dim = output_dim
        
        self.layers = nn.ModuleList()
        
        dims = [input_dim] + hidden_dims + [output_dim]
        for i in range(1, len(dims) - 1):
            self.layers.append(Bayes_RNN_Cell(dims[i-1], dims[i], dims[i+1]))
        
        self.prior_instance = isotropic_gauss_prior(mu=0, sigma=0.1)
        self.WhyLayer = BayesLinear_Normalq(hidden_dims[-1], output_dim, self.prior_instance)
        
    
    def forward(self, x, seq_len, sample=False):
        tlqw, tlpw = 0, 0
        
        # x dimensions: [batch_size, seq_len, input_dim]
        x = x.view(-1, seq_len, self.input_dim)
        
        for i in range(len(self.layers)):
            x, lqw, lpw = self.layers[i](x, seq_len, sample)
            tlqw += lqw
            tlpw += lpw
        
        out, lqw, lpw = self.WhyLayer(x[:,-1,:], sample)
        tlqw += lqw
        tlpw += lpw
        
        return out, tlqw, tlpw
        

### Multilayer Bayesian LSTMs

In [10]:
class Multilayer_Bayes_LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim=1):
        super(Multilayer_Bayes_LSTM, self).__init__()
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.output_dim = output_dim
        
        self.layers = nn.ModuleList()
        
        dims = [input_dim] + hidden_dims + [output_dim]
        for i in range(1, len(dims) - 1):
            self.layers.append(Bayes_LSTM_Cell(dims[i-1], dims[i], dims[i+1]))
        
        print('Number of Layers in the Bayesian LSTM Model:', len(hidden_dims))
        
        self.prior_instance = isotropic_gauss_prior(mu=0, sigma=0.1)
        self.Wout = BayesLinear_Normalq(hidden_dims[-1], output_dim, self.prior_instance)

    
    def forward(self, x, seq_len, sample=False):
        tlqw, tlpw = 0, 0
        
        # x dimensions: [batch_size, seq_len, input_dim]
        x = x.view(-1, seq_len, self.input_dim)
        
        for i in range(len(self.layers)):
            x, lqw, lpw = self.layers[i](x, seq_len, sample)
            tlqw += lqw
            tlpw += lpw
        
        out, lqw, lpw = self.Wout(x[:,-1,:], sample)
        tlqw += lqw
        tlpw += lpw
        
        return out, tlqw, tlpw
        

In [11]:
# def log_gaussian_loss(output, target, sigma, no_dim):
#     exponent = -0.5*(target - output)**2/sigma**2
#     log_coeff = -no_dim*torch.log(sigma)
    
#     return - (log_coeff + exponent).sum()

### Bayesian RNN (LSTM) Model

In [28]:
# Multilayer Layer Model
class TS_Model():
    def __init__(self, input_dim, hidden_dim, output_dim, lr, batch_size, num_batches, sample=True):

        # Bayesian RNN Model
        # self.net = Multilayer_Bayes_RNN(input_dim, [hidden_dim, hidden_dim], output_dim)
        
        # Bayesian LSTM Model
        self.net = Multilayer_Bayes_LSTM(input_dim, [hidden_dim, hidden_dim], output_dim)

        self.lr = lr
        self.batch_size = batch_size
        self.num_batches = num_batches
        
#         self.optimizer = torch.optim.SGD(self.net.parameters(), lr = self.lr)
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr = self.lr)

        self.sample = sample
        
        # self.loss_func = log_gaussian_loss
    
    def fit(self, x, y, seq_len, num_samples):
#         x, y = to_variable(var=(x,y), cuda=True)
        x, y = to_variable(var=(x,y), cuda=False)
    
        # x dimensions: [batch_size, seq_len, input_dim]
        
        self.optimizer.zero_grad()
        
        loss_mse = 0
        Edkl = 0
        
        if num_samples == 1 or not self.sample:
            num_samples = 1
            out, tlqw, tlpw = self.net(x, seq_len, sample=False)
            Edkl = torch.tensor(0)
            loss_mse = F.mse_loss(out, y, reduction='sum')
        else:
            for i in range(num_samples):
                out, tlqw, tlpw = self.net(x, seq_len, sample=self.sample)

                Edkl += (tlqw - tlpw)/(self.num_batches * seq_len * 4)

                loss_mse += F.mse_loss(out, y, reduction='sum')

        loss_mse /= num_samples
        Edkl /= num_samples
        
        loss = loss_mse + Edkl
        loss.backward()
        self.optimizer.step()
        
        return Edkl.data, loss_mse.data, out.data
    

### More to come

### Reference
https://github.com/JavierAntoran/Bayesian-Neural-Networks