In [None]:
%load_ext autoreload
%autoreload 2


import os
import sys
import logging

# Make the directory two levels above the working directory importable,
# adding it to the import path only if it is not already listed.
module_path = os.path.abspath(os.path.join(os.pardir, os.pardir))
if not any(entry == module_path for entry in sys.path):
    sys.path.append(module_path)

In [None]:
from pvi.models.logistic_regression import LogisticRegressionModel
from pvi.utils.gaussian import mvstandard2natural, mvnatural2standard

import torch
import numpy as np
import matplotlib.pyplot as plt
import tqdm.auto as tqdm

from torch import nn

%matplotlib inline
# Create floating-point tensors in double precision by default from here on.
torch.set_default_dtype(torch.float64)

# Set up data and helper functions

In [None]:
# Toy data set: ten 2-D points, the first five labelled 1 and the last five 0.
x = torch.tensor([[2, 2], [1, 1], [0, 1], [1, 0], [-0.5, 0.1],
                  [-1, -1], [-2, -2], [0, -1], [-1, 0], [0.5, 0.1]])
# Build the labels in the current default floating dtype so they match ``x``;
# hard-coding ``torch.float`` (float32) would clash with the float64 default
# configured for this notebook.
y = torch.tensor([1, 1, 1, 1, 1, 0, 0, 0, 0, 0], dtype=torch.get_default_dtype())

In [None]:
def plot_data(x, y):
    """Scatter-plot column 0 of ``x`` against column 1, coloured by ``y``.

    Args:
        x: Array-like supporting ``x[:, 0]`` and ``x[:, 1]``; these become the
            horizontal and vertical coordinates.
        y: Per-point values handed to ``plt.scatter`` as the colour argument.
    """
    x_vals = x[:, 0]
    y_vals = x[:, 1]
    labels = y

    plt.figure()
    # Pass the flag positionally: the ``b`` keyword was renamed to ``visible``
    # in Matplotlib 3.5 and later removed, whereas the positional form works
    # on old and new releases alike.
    plt.grid(True)
    plt.scatter(x_vals, y_vals, c=labels)
    plt.show()

def plot_results(x, y, model, q):
    """Plot the data together with the mean of ``q`` and its separating line.

    The mean is recovered from the natural parameters as
    ``(-2 * np2)^{-1} @ np1``. Only its first two components are drawn: as an
    arrow from the origin, and as the line through the origin perpendicular
    to that arrow, evaluated at horizontal positions -2 and 2.

    Args:
        x: Array-like supporting ``x[:, 0]`` and ``x[:, 1]`` (plot coordinates).
        y: Per-point values used as scatter colours.
        model: Not used by this function; kept so existing calls keep working.
        q: Dict whose ``["nat_params"]`` entry holds ``"np1"`` and ``"np2"``.
    """
    x_vals = x[:, 0]
    y_vals = x[:, 1]
    labels = y

    q_np1 = q["nat_params"]["np1"]
    q_np2 = q["nat_params"]["np2"]
    w_map = (-2 * q_np2).inverse().matmul(q_np1).detach()

    # Ratio of the first two components; the plotted line is
    # vertical = -slope * horizontal.
    slope = w_map[0] / w_map[1]

    plt.figure()
    # Positional flag instead of ``b=True``: the ``b`` keyword was renamed to
    # ``visible`` in Matplotlib 3.5 and later removed.
    plt.grid(True)
    plt.scatter(x_vals, y_vals, c=labels)
    plt.arrow(0, 0, w_map[0], w_map[1], head_width=0.1)
    plt.plot([-2, 2], [slope * 2, slope * -2])
    plt.show()
    
def plot_training(training_array, ylabel='ELBO Loss'):
    """Plot a sequence of per-step values against the step number (from 1).

    Args:
        training_array: Sequence of values, one per optimisation step.
        ylabel: Label for the vertical axis. Defaults to ``'ELBO Loss'`` so
            existing calls render exactly as before; pass another label when
            plotting a different quantity.
    """
    x_vals = np.arange(1, len(training_array)+1)
    plt.figure()
    # Positional flag instead of ``b=True``: the ``b`` keyword was renamed to
    # ``visible`` in Matplotlib 3.5 and later removed.
    plt.grid(True)
    plt.plot(x_vals, training_array)
    plt.ylabel(ylabel)
    plt.xlabel('Step')
    plt.show()
     
# Bundle the inputs and labels under the keys "x" and "y".
data = dict(x=x, y=y)

In [None]:
# Show the raw points, coloured by label, before any fitting.
plot_data(x, y)

# Construct logistic regression model

In [None]:
# Settings handed to the model; the fitting routine later reads
# "optimiser_params", "epochs" and "num_elbo_samples" back from the model.
hyperparameters = dict(
    D=2,
    optimiser_params=dict(lr=1e-2),
    epochs=500,
    batch_size=10,
    num_elbo_samples=100,
    num_predictive_samples=1,
)

model = LogisticRegressionModel(hyperparameters=hyperparameters)

# Initial approximation in natural-parameter form over three dimensions:
# np1 is all zeros and np2 is -0.5 on the diagonal.
q = {
    "nat_params": dict(
        np1=torch.zeros(3),
        np2=torch.full((3,), -0.5).diag_embed(),
    )
}

# Attach the matching torch distribution, built from mean and covariance.
q_mu, q_cov = mvnatural2standard(q["nat_params"]["np1"], q["nat_params"]["np2"])
q_dist = torch.distributions.MultivariateNormal(q_mu, covariance_matrix=q_cov)
q["distribution"] = q_dist

# Fit data

In [None]:
# Local factor with all-zero natural parameters (3-vector and 3x3 matrix).
t = {"nat_params": dict(np1=torch.zeros(3), np2=torch.zeros(3, 3))}

In [None]:
def fit(model, data, q, t_i):
    """Run one local variational update of ``q`` against the cavity ``q - t_i``.

    The approximation is re-parameterised as a mean and a Cholesky factor of
    the covariance. These are optimised, together with ``model.parameters()``,
    to maximise a Monte Carlo estimate of
    ``E_q[log p(data | theta)] - KL(q || q_cavity)``.

    Args:
        model: Object exposing ``hyperparameters`` (indexed here with the keys
            ``"optimiser_class"``, ``"optimiser_params"``, ``"epochs"`` and
            ``"num_elbo_samples"``), ``parameters()`` and
            ``likelihood_log_prob(data, thetas)``.
        data: Passed through unchanged to ``model.likelihood_log_prob``.
        q: Dict whose ``["nat_params"]`` entry holds ``"np1"`` and ``"np2"``.
            The precision is taken as ``-2 * np2`` and the mean as
            ``cov @ np1``.
        t_i: Dict with the same ``"nat_params"`` layout; its natural
            parameters are subtracted from those of ``q`` to form the cavity.

    Returns:
        A tuple ``(q_new, t_i_new, training_curves)``. ``q_new`` and
        ``t_i_new`` are dicts with a ``"nat_params"`` entry (``"np1"``,
        ``"np2"``); ``t_i_new`` is ``q_new`` minus the cavity.
        ``training_curves`` maps ``"elbo"``, ``"ll"`` and ``"kl"`` to lists
        with one float per epoch.
    """
    # Parameters to be optimised: work with Cholesky factor and mean.
    prec = -2. * q["nat_params"]["np2"]
    prec_chol = torch.cholesky(prec)
    cov = torch.cholesky_inverse(prec_chol)
    chol = torch.cholesky(cov)
    mean = cov.matmul(q["nat_params"]["np1"])

    q_params = {
        "scale_tril": nn.Parameter(chol, requires_grad=True),
        "loc": nn.Parameter(mean, requires_grad=True)
    }

    # Set up optimiser. Fall back to Adam when no class is configured; it is
    # referenced through ``torch.optim`` because no bare ``optim`` name is
    # imported in this notebook.
    optimiser_class = model.hyperparameters["optimiser_class"]
    if optimiser_class is None:
        optimiser_class = torch.optim.Adam
    optimiser = optimiser_class(
        list(q_params.values()) + list(model.parameters()),
        **model.hyperparameters["optimiser_params"]
    )

    # Local optimisation to find new parameters.
    training_curves = {
        "elbo": [],
        "ll": [],
        "kl": []
    }

    # Compute current cavity distribution.
    qcav = {
        "nat_params": {
            "np1": q["nat_params"]["np1"] - t_i["nat_params"]["np1"],
            "np2": q["nat_params"]["np2"] - t_i["nat_params"]["np2"],
        }
    }

    qcav_mu, qcav_cov = mvnatural2standard(qcav["nat_params"]["np1"], qcav["nat_params"]["np2"])
    qcav_dist = torch.distributions.MultivariateNormal(qcav_mu, qcav_cov)
    qcav["distribution"] = qcav_dist

    epoch_iter = tqdm.tqdm(range(model.hyperparameters["epochs"]), desc="Epochs")
    for _ in epoch_iter:
        # Expected log-likelihood.
        # The sampling path sends gradients to every entry of the factor, so
        # an unconstrained step would fill in its upper triangle. Masking with
        # tril() keeps the argument lower-triangular, as ``scale_tril``
        # requires, and leaves the masked entries with zero gradient.
        q_dist = torch.distributions.MultivariateNormal(
            loc=q_params["loc"],
            scale_tril=q_params["scale_tril"].tril(),
        )
        thetas = q_dist.rsample((model.hyperparameters["num_elbo_samples"],))
        ll = model.likelihood_log_prob(data, thetas).mean(0).sum()

        # KL divergence between the current approximate posterior and the
        # cavity distribution.
        kl = torch.distributions.kl_divergence(q_dist, qcav["distribution"])

        elbo = ll - kl
        loss = -elbo

        # Backwards step.
        loss.backward()
        optimiser.step()
        optimiser.zero_grad()

        training_curves["elbo"].append(elbo.item())
        training_curves["ll"].append(ll.item())
        training_curves["kl"].append(kl.item())

    # Convert back to natural parameter form, and get local factor.
    # Use the same lower-triangular view of the factor that was optimised.
    q_chol = q_params["scale_tril"].detach().tril()
    q_mu = q_params["loc"].detach()
    q_cov = q_chol.matmul(q_chol.T)
    q_np1, q_np2 = mvstandard2natural(q_mu, q_cov)

    q_new = {
        "nat_params": {
            "np1": q_np1,
            "np2": q_np2,
        }
    }

    t_i_new = {
        "nat_params": {
            "np1": q_new["nat_params"]["np1"] - qcav["nat_params"]["np1"],
            "np2": q_new["nat_params"]["np2"] - qcav["nat_params"]["np2"],
        }
    }

    return q_new, t_i_new, training_curves

In [None]:
# Run the local optimisation from q with local factor t, keeping the updated
# approximation, the updated factor and the per-epoch training curves.
q_new, t_new, training_curves = fit(model, data, q, t)

In [None]:
# ELBO value recorded at each epoch.
plot_training(training_curves["elbo"])

In [None]:
# Expected log-likelihood term recorded at each epoch (the axis label drawn by
# plot_training still reads 'ELBO Loss').
plot_training(training_curves["ll"])

In [None]:
# KL term recorded at each epoch (the axis label drawn by plot_training still
# reads 'ELBO Loss').
plot_training(training_curves["kl"])

In [None]:
# Overlay the fitted mean vector and its separating line on the data.
plot_results(x, y, model, q_new)