In [1]:
import random
import torch as t
import torch.nn as nn

import numpy as np

In [2]:
# Example 
PRIOR_MEAN = 0.9
PRIOR_VAR = 1.0
GROUND_SIGMA = 5.5

In [3]:
# Analytical solution for lognormal
def log_norm(x, mu, std):    
    var = std**2
    norm_constant = -0.5 * t.log(2*np.pi*var)
    sqerror = (x - mu)**2
    prec = 1/var
    
    return norm_constant - (0.5 * prec * sqerror)


class Elbo(nn.Module):
    def __init__(self, n=100):
        super(Elbo, self).__init__()
        
        self.batch_size = n # latent samples per step
        self.softplus = nn.Softplus()
        
        # adaptive variational params
        self.q_mean = nn.Parameter(t.randn(1,1), requires_grad=True)
        self.q_sigma = nn.Parameter(t.randn(1,1), requires_grad=True)
        self.prior_m = nn.Parameter(t.randn(1,1), requires_grad=False)
        self.prior_s = nn.Parameter(t.randn(1,1), requires_grad=False)
        self.likelihood_s = nn.Parameter(t.FloatTensor((1)), requires_grad=False)
        
        #Set the prior and likelihood moments.
        self.prior_s.data.fill_(PRIOR_VAR)
        self.prior_m.data.fill_(PRIOR_MEAN)
        self.likelihood_s.data.fill_(GROUND_SIGMA)
     
        
    def generate_rand(self):
        return np.random.normal(size=(self.batch_size,1))
    
    
    def get_mean(self) :
        return self.q_mean.data.numpy()
    
    
    def get_var(self) :
        torch_var = self.softplus(self.q_sigma).data**2
        return torch_var.numpy()
    
    
    def reparam(self, eps):
        eps = nn.Parameter(t.FloatTensor(eps))
        
        return eps.mul(self.softplus(self.q_sigma)) \
                .add(self.q_mean)
    
    
    def log_prob(self, y, x) :
        return log_norm(y, x, self.likelihood_s)
    
    
    def compute_elbo(self, x, y):
        eps = self.generate_rand()
        z = self.reparam(eps)
        
        q_likelihood = t.mean(log_norm(z, self.q_mean, self.softplus(self.q_sigma)))
        prior = t.mean(log_norm(z, self.prior_m, self.prior_s))
        
        xzt = x * z.transpose(0,1)
        sum_log_prob = t.sum(self.log_prob(y, xzt), 0)
        likelihood = t.mean(sum_log_prob)
        
        kl_div_mc = q_likelihood - prior
        loss = likelihood - kl_div_mc
        
        return loss

# Gen example data

In [4]:
N = 200
w = 3.2

X = np.random.uniform(low=-50, high=50, size=(N, 1))
Y = w *X + np.random.normal(size=(N, 1), scale=GROUND_SIGMA)

# Optimise it

In [5]:
EPOCHS = 3000

def run(X, Y, ep=5000, eta=0.2) :
    q = Elbo()
    optimiser = t.optim.Adam(q.parameters(), lr=eta)
    x = Variable(t.Tensor(X), requires_grad=False) 
    y = Variable(t.Tensor(Y), requires_grad=False)

    optimise(q, x, y, optimiser, ep)
    
    return q


def optimise(q, x, y, optimiser, ep, verbose=False) :
    for i in range(ep):
        loss = - q.compute_elbo(x, y)
        optimiser.zero_grad()
        loss.backward(retain_graph=True)
        optimiser.step()

        if verbose :
            if i % 500 == 0:
                print(q.get_mean(), q.get_var())



q = run(X, Y, ep=EPOCHS)

"""
means = []

for i in range(10) :
    q = run(X, Y, ep=EPOCHS)
    means.append(q.get_mean())
"""

NameError: name 'Variable' is not defined

# Eval

In [None]:
def analytical_posterior_var(var, X) :
    scaled_prec = (1/var**2) * X.T @ X +1
    
    return scaled_prec**-1


def analytical_posterior_mean(prior_mean, var, X, Y) :
    scaled_cov = (1/var**2) * X.T @ Y
    post_var = analytical_posterior_var(var, X)
    
    return post_var * (prior_mean + scaled_cov)



TRUE_POST_MEAN = analytical_posterior_mean(GROUND_PRIOR_MEAN, GROUND_PRIOR_VAR, X, Y)
TRUE_POST_VAR = analytical_posterior_var(GROUND_PRIOR_VAR, X)

q.get_mean() - TRUE_POST_MEAN, \
q.get_var() - TRUE_POST_VAR


#np.mean(means) - TRUE_POST_MEAN 