In [1]:
import pints
import pints.toy as toy

import emupints
import emupints.plot as emuplt
import emupints.utils as emutils

import matplotlib.pyplot as plt
import numpy as np

import tensorflow as tf
import edward as ed
from edward.models import Normal

ed.set_seed(42)



Instructions for updating:
Use the retry module or similar alternatives.


## Formulating Problem

In [2]:
# Create a model
model = pints.toy.LogisticModel()

n_parameters = model.n_parameters()
n_outputs = model.n_outputs()
real_parameters = model.suggested_parameters()

values, times, noise = emutils.simulate(model,
                                        parameters = real_parameters)

# Create an object with links to the model and time series
problem = pints.SingleOutputProblem(model, times, values)

# Create a log-likelihood function (adds an extra parameter!)
real_log_likelihood = pints.KnownNoiseLogLikelihood(problem, noise)

print("Number of parameters: ", n_parameters)
print("Parameter values: ", real_parameters)
print("Example problem values:\n", problem.values()[:5])

Number of parameters:  2
Parameter values:  [ 0.1 50. ]
Example problem values:
 [3.19076239 2.09932542 4.50134443 7.21970643 3.74557539]


## Creating and normalizing training data

In [3]:
# creating distribution on bounds
param_range = 0.5
real_params_lower = (1 - param_range) * real_parameters
real_params_upper = (1 + param_range) * real_parameters

bounds = pints.Boundaries(lower = real_params_lower, upper = real_params_upper)
log_prior = pints.UniformLogPrior(bounds)

In [4]:
# make training sample
train_size = 500
train_input = log_prior.sample(train_size)
train_target = np.apply_along_axis(real_log_likelihood, 1, train_input)

In [5]:
train_input.min(axis=0)

array([ 0.05050616, 25.23160115])

In [6]:
# normalize data between -0.5 and 0.5
def normalize(x):
    x_min = x.min(axis=0)
    x_max = x.max(axis=0)
    return (x - x_min) / (x_max - x_min) - 0.5

def denormalize(x_norm, x_min, x_max):
    return (x_norm + 0.5) * (x_max - x_min) + x_min
    
train_input_normalized = normalize(train_input)
train_target_normalized = normalize(train_target)

# define functions for future use
train_input_min = train_input.min(axis=0)
train_input_max = train_input.max(axis=0)

train_target_min = train_target.min(axis=0)
train_target_max = train_target.max(axis=0)

def normalize_input(x):
    return (x - train_input_min) / (train_input_max - train_input_min) - 0.5

def denormalize_prediction(x_norm):
    return denormalize(x_norm, train_target_min, train_target_max)

# Bayesian NN

In [7]:
# network parameters
bnn_shape = [n_parameters, 50, 10, 1]
activations = tf.nn.relu

# variational inference parameters
n_iter=5000
n_samples=5

In [8]:
class BNNEmulator(emupints.Emulator):
    
    def __init__(self, log_likelihood, X, y, **kwargs):
        super(BNNEmulator, self).__init__(log_likelihood, X, y, **kwargs)
        
        self._x = tf.placeholder(tf.float32, (None, self.n_parameters()))
        self._sess = ed.get_session()
                
    def __call__(self, x):
        # convert to np array
        if type(x) != np.ndarray:
            x = np.asarray(x)
        n_params = self.n_parameters()
        x = x.reshape((x.size // n_params, n_params))

        feed_dict = {self._x : x}        
        # samples weights and biases from approximated posterior
        qweights_sample = [
            qW.sample() for qW in self._q_weights
        ]
        qbiases_sample = [
            qb.sample() for qb in self._q_biases
        ]
        
        nn = self._make_nn(qweights_sample,
                            qbiases_sample, self.activations
                           )
        
        return nn.eval(feed_dict=feed_dict)
        

    def set_parameters(self, shape, activations=tf.tanh):
        # if only one activation function provided make list
        if not hasattr(activations, '__len__'):
            activations = [activations] * len(shape)
            activations[-1] = tf.identity
        self.activations = activations
            
        self.nn_shape = shape

        self._set_priors(shape)
        self._set_q_priors(shape)
        
        self._z = Normal(
            loc=self._make_nn(self._weights, self._biases, activations),
            scale=0.1 * tf.ones(tf.shape(self._x)[0])
        )
        
        
        
    # performs variational inference
    def fit(self, n_iter=1000, n_samples=5):
        # feed training data to the session 
        #self._sess.run()
        
        # connect corresponding variables with variational variables
        qp_dict = dict(
                zip([*self._weights, *self._biases],
                    [*self._q_weights, *self._q_biases])
        )
        # target data
        data = {
            self._z : self._y.reshape(-1),
            self._x : self._X
        }
        
        inference = ed.KLqp(qp_dict, data=data)
        inference.run(n_iter=n_iter, n_samples=n_samples, logdir='log')
        
        
    # Create variable with proper normal prior
    def _set_priors(self, shape):
        self._weights = []
        self._biases = []
        # currently distrubitued as N(0, I)
        for i in range(len(shape)-1):
            W_i = Normal(loc=tf.zeros([shape[i], shape[i+1]]), 
                         scale=tf.ones([shape[i], shape[i+1]]))
            b_i = Normal(loc=tf.zeros(shape[i+1]), 
                         scale=tf.ones(shape[i+1]))

            self._weights.append(W_i)
            self._biases.append(b_i)
    
    # sets variational priors
    def _set_q_priors(self, shape):
        self._q_weights = []
        self._q_biases = []
        # use get_var to allow parameters to vary
        # initialized randomly
        # softplus ensures positive variance
        for i in range(len(shape)-1):
            # creates/get variable for
            qW_i_loc = tf.get_variable(f"qW_{i}/loc", [shape[i], shape[i+1]])
            qW_i_scale = tf.get_variable(f"qW_{i}/scale", [shape[i], shape[i+1]])
            qW_i = Normal(loc=qW_i_loc,
                          scale=tf.nn.softplus(qW_i_scale))

            qb_i_loc = tf.get_variable(f"qb_{i}/loc", [shape[i+1]])
            qb_i_scale = tf.get_variable(f"qb_{i}/scale", [shape[i+1]])
            qb_i = Normal(loc=qb_i_loc, 
                         scale=tf.nn.softplus(qb_i_scale))

            self._q_weights.append(qW_i)
            self._q_biases.append(qb_i)

    def _make_nn(self, weights, biases, activations):
        W_0, b_0, a_0 = weights[0], biases[0], activations[0]
        W_last, b_last, a_last = weights[-1], biases[-1], activations[-1]

        h = a_0(tf.matmul(self._x, W_0) + b_0)
        for W_i, b_i, a_i in zip(weights[1:-1], biases[1:-1], activations[1:-1]):
            h = a_i(tf.matmul(h, W_i) + b_i)
        # usually a_last is identity
        h = a_last(tf.matmul(h, W_last) + b_last)

        return tf.reshape(h, [-1])
    
    def __str__():
        pass


In [9]:
# create emulator for this instance
emu_nn = BNNEmulator(real_log_likelihood, train_input_normalized, train_target_normalized)

In [10]:
# specify shape of NN
emu_nn.set_parameters(bnn_shape, activations=activations)

In [11]:
emu_nn.fit(n_iter=n_iter, n_samples=n_samples)



5000/5000 [100%] ██████████████████████████████ Elapsed: 16s | Loss: 522.465


In [12]:
emu_nn_normalized = lambda x: denormalize_prediction(emu_nn(x))

# Performance test

In [13]:
def mean_error(y_pred, y_true):
    return np.mean(np.abs(y_true - y_pred))

In [14]:
# look at mean error
test_size = 200
test_input = log_prior.sample(test_size)
test_input_norm = normalize(test_input)

test_true = np.apply_along_axis(real_log_likelihood, 1, test_input)
test_pred = emu_nn_normalized(test_input_norm)

print(mean_error(test_pred, test_true))

419.80252524211204
