In [None]:
import numpy as np
import torch
print(torch.__version__)
import pyro
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from copy import copy
from torch.distributions import constraints
import arviz as az

import pyro.contrib.gp as gp
import pyro.distributions as dist
from pyro.infer import NUTS
from pyro.infer import MCMC

from matplotlib.animation import FuncAnimation
from mpl_toolkits.axes_grid1 import make_axes_locatable

import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


pyro.set_rng_seed(0)

# B1

#### Plot helper functions

In [None]:
def plot(
    plot_observed_data=False,
    plot_predictions=False,
    n_prior_samples=0,
    model=None,
    kernel=None,
    n_test=500,
    ax=None,
    big_plot=False
):

    if ax is None:
        fig, ax = plt.subplots(figsize=(12, 6))
    if plot_observed_data:
        ax.plot(X.numpy(), f(X).numpy(), "kx")
    if plot_predictions:
        Xtest = torch.linspace(-1, 1, n_test)  # test inputs
        # compute predictive mean and variance
        with torch.no_grad():
            if type(model) == gp.models.VariationalSparseGP:
                mean, cov = model(Xtest, full_cov=True)
            else:
                mean, cov = model(Xtest, full_cov=True, noiseless=False)
        sd = cov.diag().sqrt()  # standard deviation at each input point x
        ax.plot(Xtest.numpy(), mean.numpy(), "r", lw=2)  # plot the mean
        ax.fill_between(
            Xtest.numpy(),  # plot the two-sigma uncertainty about the mean
            (mean - 2.0 * sd).numpy(),
            (mean + 2.0 * sd).numpy(),
            color="C0",
            alpha=0.3,
        )
    if n_prior_samples > 0:  # plot samples from the GP prior
        Xtest = torch.linspace(-1, 1, n_test)  # test inputs
        noise = (
            model.noise
            if type(model) != gp.models.VariationalSparseGP
            else model.likelihood.variance
        )
        cov = kernel.forward(Xtest) + noise.expand(n_test).diag()
        samples = dist.MultivariateNormal(
            torch.zeros(n_test), covariance_matrix=cov
        ).sample(sample_shape=(n_prior_samples,))
        ax.plot(Xtest.numpy(), samples.numpy().T, lw=2, alpha=0.4)
    if big_plot == True:
        x_seq = np.arange(start=-1,stop=1,step=0.005)
        legend_elements = [Line2D([0], [0], color='r', lw=4, label='Predicted f(x)'),
                   Line2D([0], [0], marker='s', color='w', label='Confidence area',
                            markerfacecolor='cornflowerblue', markersize=15),
                   
                   Line2D([0], [0], marker='x', color='w', label='Observed points',
                            markeredgecolor='0')]
        ax.legend(handles=legend_elements, loc='lower right')

    ax.set_xlim(-1, 1)

#### Define data

In [None]:
def f(x):
    return np.sin(20*x)+2*np.cos(14*x)-2*np.sin(6*x)

In [None]:
N = 5
X = torch.tensor([-1,-0.5,0,0.5,1])
y = f(X)
# Let's plot the observed data
plot(plot_observed_data=True)

#### Diagnostics-induced choosing of hyperparameters

In [None]:
params = []
n_sample = np.array([50,100,200])
n_chain = np.array([2,4,6])
n_warm = np.array([50,100,200])
for i in range(len(n_sample)):
    for j in range(len(n_chain)):
        for k in range(len(n_warm)):
            pyro.clear_param_store()
            kernel = gp.kernels.RBF(input_dim=3)
            #kernel.lengthscale = pyro.nn.PyroSample(dist.Beta(torch.tensor(1.5), torch.tensor(3.0)))
            #kernel.variance = pyro.nn.PyroSample(dist.Beta(torch.tensor(1.5), torch.tensor(3.0)))
            kernel.lengthscale = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(-1.0), torch.tensor(1.0)))
            kernel.variance = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(0.), torch.tensor(2.0)))
            gpr = gp.models.GPRegression(X, y, kernel, noise=torch.tensor(0.0001)) 
            hmc_kernel = NUTS(gpr.model)
            mcmc = MCMC(hmc_kernel, num_samples=n_sample[i], num_chains=n_chain[j], warmup_steps=n_warm[k])
            mcmc.run()
            posterior_samples = mcmc.get_samples(num_samples=500)
            lengthscales = posterior_samples['kernel.lengthscale']
            variances = posterior_samples['kernel.variance']
            kernel = gp.kernels.RBF(input_dim=3, lengthscale=lengthscales.mean(), variance=variances.mean())
            gpr= gp.models.GPRegression(X, y, kernel)
            data = az.from_pyro(mcmc)
            summary = az.summary(data)
            params.append((f"num_samples={n_sample[i]}, num_chains={n_chain[j]}, warmup_steps={n_warm[k]}",summary[['ess_bulk','ess_tail','r_hat']]))

#### Define model

In [None]:
pyro.clear_param_store()
kernel = gp.kernels.RBF(input_dim=3)
kernel.lengthscale = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(-1.0), torch.tensor(1.0)))
kernel.variance = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(0.), torch.tensor(2.0)))
gpr = gp.models.GPRegression(X, y, kernel, noise=torch.tensor(0.0001)) 

#### NUTS Sampling

In [None]:
# MCMC
hmc_kernel = NUTS(gpr.model)
mcmc = MCMC(hmc_kernel, num_samples=200, num_chains=6, warmup_steps=200)
mcmc.run()

In [None]:
posterior_samples = mcmc.get_samples(num_samples=500)
lengthscales = posterior_samples['kernel.lengthscale']
variances = posterior_samples['kernel.variance']

kernel = gp.kernels.RBF(input_dim=3, lengthscale=lengthscales.mean(), variance=variances.mean())
gpr= gp.models.GPRegression(X, y, kernel)

##### Plots

In [None]:
plt.scatter(torch.log(posterior_samples['kernel.lengthscale']),torch.log(posterior_samples['kernel.variance']))
plt.title('Sample from posterior, N = 500')
plt.xlabel('log Lengthscale')
plt.ylabel('log Variance')
plt.savefig("results_B/Posterior_plot.png")

#### Diagnostics

In [None]:
data = az.from_pyro(mcmc)
summary = az.summary(data)
print(summary) 

In [None]:
az.plot_posterior(data)
plt.savefig("results_B/Diagnostics_posterior.png")

In [None]:
az.plot_trace(data)
plt.savefig("results_B/Diagnostics_datatrace.png")

#### Big plot

Add confidence area and $f(x)$

In [None]:
fig, ax = plt.subplots()
plot(model=gpr,ax=ax, plot_observed_data=True, plot_predictions=True, big_plot=True)
ax.set_xlabel("x"); ax.set_ylabel("y"); ax.set_title("Results of funciton fitting")
fig.savefig("results_B/funcfit.png")

# B2

In [None]:

def plotb(x,y,
    plot_observed_data=False,
    plot_predictions=False,
    n_prior_samples=0,
    model=None,
    kernel=None,
    n_test=500,
    ax=None,
    big_plot=False
):

    if ax is None:
        fig, ax = plt.subplots(figsize=(12, 6))
    if plot_observed_data:
        ax.plot(x.numpy(), y.numpy(), "kx")
    if plot_predictions:
        Xtest = torch.linspace(-1, 1, n_test)  # test inputs
        # compute predictive mean and variance
        with torch.no_grad():
            if type(model) == gp.models.VariationalSparseGP:
                mean, cov = model(Xtest, full_cov=True)
            else:
                mean, cov = model(Xtest, full_cov=True, noiseless=False)
        sd = cov.diag().sqrt()  # standard deviation at each input point x
        ax.plot(Xtest.numpy(), mean.numpy(), "r", lw=2)  # plot the mean
        ax.fill_between(
            Xtest.numpy(),  # plot the two-sigma uncertainty about the mean
            (mean - 2.0 * sd).numpy(),
            (mean + 2.0 * sd).numpy(),
            color="C0",
            alpha=0.3,
        )
    if n_prior_samples > 0:  # plot samples from the GP prior
        Xtest = torch.linspace(-1, 1, n_test)  # test inputs
        noise = (
            model.noise
            if type(model) != gp.models.VariationalSparseGP
            else model.likelihood.variance
        )
        cov = kernel.forward(Xtest) + noise.expand(n_test).diag()
        samples = dist.MultivariateNormal(
            torch.zeros(n_test), covariance_matrix=cov
        ).sample(sample_shape=(n_prior_samples,))
        ax.plot(Xtest.numpy(), samples.numpy().T, lw=2, alpha=0.4)
    if big_plot == True:
        x_seq = np.arange(start=-1,stop=1,step=0.005)
        ax.plot(Xtest, f(Xtest), color = 'green', label= "True f(x)")
        if len(x) > 5:
            ax.plot(x[5:], y[5:], "*", color = "black")
        legend_elements = [Line2D([0], [0], color='r', lw=4, label='Predicted f(x)'),
                   Line2D([0], [0], marker='s', color='w', label='Confidence area',
                            markerfacecolor='cornflowerblue', markersize=15),
                   Line2D([0], [0], marker='x', color='w', label='Observed points',
                            markeredgecolor='0'),
                    Line2D([0], [0], marker='*', color='w', label='Sampled points',
                            markeredgecolor='0'),
                    Line2D([0],[0], color='green', label="True f(x)")]
        ax.legend(handles=legend_elements, loc='lower right')

    ax.set_xlim(-1, 1)

In [None]:

T = 10
x = torch.clone(X).detach()
x_star = torch.linspace(-1,1,200)


store = []

for k in range(1,T+1):
    pyro.clear_param_store()
    y = f(x)
    kernel = gp.kernels.RBF(input_dim=3)
    #kernel.lengthscale = pyro.nn.PyroSample(dist.Beta(torch.tensor(1.5), torch.tensor(3.0)))
    #kernel.variance = pyro.nn.PyroSample(dist.Beta(torch.tensor(1.5), torch.tensor(3.0)))
    kernel.lengthscale = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(-1.0), torch.tensor(1.0)))
    kernel.variance = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(0.), torch.tensor(2.0)))
    gpr = gp.models.GPRegression(x, y, kernel, noise=torch.tensor(0.0001)) 
    # MCMC
    hmc_kernel = NUTS(gpr.model)
    mcmc = MCMC(hmc_kernel, num_samples=200, num_chains=6, warmup_steps=200)
    mcmc.run()
    posterior_samples = mcmc.get_samples(num_samples=500)
    lengthscales = posterior_samples['kernel.lengthscale']
    variances = posterior_samples['kernel.variance']

    kernel = gp.kernels.RBF(input_dim=3, lengthscale=lengthscales.mean(), variance=variances.mean())
    gpr = gp.models.GPRegression(x, y, kernel, noise=torch.tensor(0.0001))
    with torch.no_grad():
        m, cov = gpr(x_star, full_cov=True, noiseless=False)
        v = cov.diag().sqrt()
        d = torch.distributions.Normal(x_star@m, v*v + x_star@cov@x_star.T).sample() #???
        p = torch.argmin(d).unsqueeze(0) #???
        x = torch.cat((x,x_star[p]))
        y = f(x)
    
    if k == 1 or k == 5 or k == 10:
        store.append(copy(gpr))


In [None]:
fig, ax = plt.subplots(3,1)
fig.set_size_inches(13, 13)
y = f(x)
for i in range(3):
    plotb(x[:5+i*5], y[:5+i*5],model=store[i], plot_predictions=True, big_plot=True, plot_observed_data=True,ax=ax[i])
    ax[i].set_xlabel('x'); ax[i].set_ylabel('f(x)/predicted f(x)'); ax[i].set_title(f"Algorithm with k={i*5}")

#fig.savefig("results_B/optim.png")

#### Investigation of alternative method

##### 'Throw-away if closer than existing points, x' - method

In [None]:
def closeness_criterium(new_point, existing_points, boundary):
    return torch.min(torch.abs(new_point-existing_points)) >= boundary

In [None]:

T = 10
x = torch.clone(X).detach()
x_star = torch.linspace(-1,1,200)


store = []

for k in range(1,T+2):
    pyro.clear_param_store()
    y = f(x)
    kernel = gp.kernels.RBF(input_dim=3)
    kernel.lengthscale = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(-1.0), torch.tensor(1.0)))
    kernel.variance = pyro.nn.PyroSample(dist.LogNormal(torch.tensor(0.), torch.tensor(2.0)))
    gpr = gp.models.GPRegression(x, y, kernel, noise=torch.tensor(0.0001)) 
    # MCMC
    hmc_kernel = NUTS(gpr.model)
    mcmc = MCMC(hmc_kernel, num_samples=200, num_chains=6, warmup_steps=200)
    mcmc.run()
    posterior_samples = mcmc.get_samples(num_samples=500)
    lengthscales = posterior_samples['kernel.lengthscale']
    variances = posterior_samples['kernel.variance']

    kernel = gp.kernels.RBF(input_dim=3, lengthscale=lengthscales.mean(), variance=variances.mean())
    gpr = gp.models.GPRegression(x, y, kernel, noise=torch.tensor(0.0001))
    with torch.no_grad():
        m, cov = gpr(x_star, full_cov=True, noiseless=False)
        v = cov.diag().sqrt()
        d = torch.distributions.Normal(x_star@m, v*v + x_star@cov@x_star.T)
        sampl = d.sample()
        p = torch.argmin(sampl).unsqueeze(0)
        
        while not closeness_criterium(new_point = x_star[p], existing_points = x, boundary =0.05):
            sampl = d.sample()
            p = torch.argmin(sampl).unsqueeze(0)
        x = torch.cat((x,x_star[p]))
        
        y = f(x)
    
    if k == 1 or k == 6 or k == 11:
        store.append(copy(gpr))

In [None]:
fig, ax = plt.subplots(3,1)
fig.set_size_inches(13, 13)
y = f(x)
for i in range(3):
    plotb(x[:5+i*5], y[:5+i*5],model=store[i], plot_predictions=True, big_plot=True, plot_observed_data=True,ax=ax[i])
    ax[i].set_xlabel('x'); ax[i].set_ylabel('f(x)/predicted f(x)'); ax[i].set_title(f"Algorithm with k={i*5}")

fig.savefig("results_B/optim_close.png")