<a href="https://colab.research.google.com/github/Sabelz/Master_Thesis_Alexander/blob/main/utils/functions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Functions for the project

# Imports

In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/Master_Thesis_Alexander
!git config --global user.email "alexander.sabelstrom.1040@student.uu.se"
!git config --global user.name "Sabelz"

import numpy as np
import matplotlib.pyplot as plt
import torch
!pip install gpytorch
import gpytorch
!pip install jaxopt
import jaxopt
import optax

from scipy.optimize import minimize
from torch.autograd import Variable
from matplotlib import pyplot as plt
import math
import jax
import jax.numpy as jnp
import time
from sklearn.metrics import mean_squared_error
from scipy.stats import norm

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/Master_Thesis_Alexander


# Training Function

In [None]:
def train(model, likelihood, x_train, y_train, training_iter=10):
    """
    Trains a Gaussian Process (GP) model using the Adam optimizer and plots the training loss.

    Parameters:
    model (gpytorch.models.GP): The GP model.
    likelihood (gpytorch.likelihoods.GaussianLikelihood): The likelihood function used in the GP model.
    x_train (torch.Tensor): The training input data.
    y_train (torch.Tensor): The training output data.
    training_iter (int, optional): The number of training iterations. Default is 10.

    Returns:
    time (float): The time taken to train the model.

    Note:
    The function uses the torch library to perform the computations. If a GPU is available,
    it moves the model and likelihood to the GPU before training. The training loss is plotted
    against the iteration number.
    """

    if torch.cuda.is_available():
      model = model.cuda()
      likelihood = likelihood.cuda()
    model.train()
    likelihood.train()
    # Use the adam optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=0.2)  # Includes GaussianLikelihood parameters

    # "Loss" for GPs - the marginal log likelihood
    mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)
    loss_list = [] # Keep track of all losses
    # Time the training
    start = time.time()
    for i in range(training_iter):
        # Zero gradients from previous iteration
        optimizer.zero_grad()
        # Output from model
        output = model(x_train)
        # Calc loss and backprop gradients
        loss = -mll(output, y_train)
        loss_list.append(loss.item())
        loss.backward()
        optimizer.step()


    end = time.time()
    # Plot the training loss
    plt.plot(list(range(1, training_iter+1)), loss_list)
    plt.xlabel("Training Iteration")
    plt.ylabel("Loss")
    return end - start


# Training Function for Variational GPs

In [None]:
def train_ELBO(model, likelihood, x_train, y_train, training_iter=25, train_loader=None):
    """
    Trains a Gaussian Process (GP) model using the Adam optimizer and plots the training loss.
    The model is trained using the Variational Evidence Lower BOund (ELBO) as the loss function.

    Parameters:
    model (gpytorch.models.ApproximateGP): The GP model.
    likelihood (gpytorch.likelihoods.Likelihood): The likelihood function used in the GP model.
    x_train (torch.Tensor): The training input data.
    y_train (torch.Tensor): The training output data.
    training_iter (int, optional): The number of training iterations. Default is 25.
    train_loader (torch.utils.data.DataLoader, optional): The DataLoader for batch training.
     If None, the model is trained on the entire dataset at once. Default is None.

    Returns:
    time (float): The time taken to train the model.

    Note:
    The function uses the torch library to perform the computations. If a GPU is available,
    it moves the model and likelihood to the GPU before training. The training loss is plotted
    against the iteration number.
    """
    if torch.cuda.is_available():
      model = model.cuda()
      likelihood = likelihood.cuda()

    model.train()
    likelihood.train()
    # Initialize MLL
    n_points = y_train.numel() # Amount of training points
    # When training a variational Gaussian Process (GP) model like ApproximateGP,
    # you should use a variational marginal log likelihood (MLL) instead of the exact MLL.
    mll = gpytorch.mlls.VariationalELBO(likelihood, model, n_points) # Loss
    # Use the adam optimizer
    optimizer = torch.optim.Adam(list(model.parameters()) + list(likelihood.parameters()), lr=0.2)
    loss_list = []
    if(train_loader == None):
      # Time the training
      start = time.time()
      for i in range(training_iter):
          # Zero gradients from previous iteration
          optimizer.zero_grad()
          # Output from model
          output = model(x_train)
          # Calc loss and backprop gradients
          loss = -mll(output, y_train)
          print
          loss_list.append(loss.item())
          loss.backward()
          optimizer.step()

    else: # If train_loader defined, use it
      # Time the training
      start = time.time()
      for i in range(training_iter):
        for x_batch, y_batch in train_loader:
          # Zero gradients from previous iteration
          optimizer.zero_grad()
          # Output from model
          output = model(x_batch)
          # Calc loss and backprop gradients
          loss = -mll(output, y_batch)
          loss_list.append(loss.item())
          loss.backward()
          optimizer.step()



    end = time.time()
    # Plot the training loss
    plt.plot(list(range(1, len(loss_list)+1)), loss_list)
    plt.xlabel("Training Iteration")
    plt.ylabel("Loss")
    return end - start

# Predict Function

In [None]:
def predict(model, likelihood, test_x):
    """
    This function makes predictions using a given model and likelihood.

    The function sets the model and likelihood to evaluation mode,
    then computes the likelihood of the model's predictions on the test data.
    It uses PyTorch's `no_grad` context manager to avoid tracking gradients during the prediction,
    and GPyTorch's `fast_pred_var` setting for efficient computation.

    Parameters:
    model (gpytorch.models.GP): The Gaussian Process model to make predictions with.
    likelihood (gpytorch.likelihoods.Likelihood): The likelihood associated with the model.
    test_x (torch.Tensor): The test inputs to make predictions on.

    Returns:
    gpytorch.distributions.MultivariateNormal: The distribution of the model's predictions.
    """
    model.eval()
    likelihood.eval()
    # Make predictions by feeding model through likelihood
    with torch.no_grad(), gpytorch.settings.fast_pred_var():
        return likelihood(model(test_x))

# Plot Function GPyTorch

In [None]:
def plotGP(x_train, y_train, model, likelihood, title="GP Model"):
    """
    Plots the Gaussian Process (GP) model predictions along with the observed data.

    Parameters:
    x_train (torch.Tensor): The training input data.
    y_train (torch.Tensor): The training output data.
    model (gpytorch.models.GP): The GP model.
    likelihood (gpytorch.likelihoods.GaussianLikelihood): The likelihood function used in the GP model.
    title (str, optional): The title of the plot. Default is "GP Model".

    Note:
    The function uses matplotlib to create the plot. The plot includes the mean predictions,
    a confidence region based on the variances, and the observed data. The x-axis limits are set
    based on the minimum and maximum values of the training data.
    """
    # Find min and max value of training set
    min_value, max_value = min(x_train), max(x_train)
    # Create points between min and max values
    x_plot = torch.linspace(min_value, max_value, 1000)
    model.eval(), likelihood.eval()
    # Evaluate on plot values
    prediction = likelihood(model(x_plot))
    model.train(), likelihood.train()
    mean = prediction.mean
    variance = prediction.variance
    with torch.no_grad(), gpytorch.settings.fast_pred_var():
      # Initalize plot
      plt.style.use('default')
      _, ax = plt.subplots(1, 1)

      # Confidence region
      lower_bound = mean-(1.96*(np.sqrt(variance)))
      upper_bound = mean+(1.96*(np.sqrt(variance)))

      ax.plot(x_train.detach().numpy(), y_train.detach().numpy(), 'ko', label='Observed Data', alpha = 0.7)
      # Plot predictive means
      ax.plot(x_plot.detach().numpy(), mean.detach().numpy(), 'purple', label='Mean')
      # Plot confidence bounds as lightly shaded region
      ax.fill_between(x_plot.detach().numpy(), lower_bound.detach().numpy(),
                      upper_bound.detach().numpy(), alpha=0.5, color="violet", zorder=-1, label ='95% Confidence')
      ax.set_title(title)
      ax.legend(loc = "best")
      plt.grid(False)
      ax.plot

# Plot Function for State Space Model

In [None]:
# For plotting state space with different hyperparameters
def plotGP_SS(x_train, y_train, ell=1, sigma=1, m0=0, v0=1, title="GP Model", n_test_points=1000):
    """
    Plots the State Space Gaussian Process (SSGP) model predictions along with the observed data.

    Parameters:
    x_train (jax.numpy.ndarray): The training input data.
    y_train (jax.numpy.ndarray): The training output data.
    ell (float, optional): The length-scale parameter for the SSGP model. Default is 1.
    sigma (float, optional): The signal variance parameter for the SSGP model. Default is 1.
    m0 (float, optional): The initial mean for the Kalman filter. Default is 0.
    v0 (float, optional): The initial variance for the Kalman filter. Default is 1.
    title (str, optional): The title of the plot. Default is "GP Model".
    n_test_points (int, optional): The number of test points to generate between the minimum and maximum values of the training data. Default is 1000.

    Note:
    The function uses matplotlib to create the plot. The plot includes the mean predictions,
    a confidence region based on the variances, and the observed data. The x-axis limits are set
    based on the minimum and maximum values of the training data.
    """
    # Find min and max value of training set
    min_value, max_value = min(x_train).numpy(), max(x_train).numpy()
    # Create points between min and max values
    x_test = np.linspace(min_value, max_value, 1000)

    all_points = jnp.concatenate([x_train.numpy(), x_test])
    temporal_order = jnp.argsort(all_points)
    # State Space X's and Y's
    ss_xs = all_points[temporal_order]
    ss_ys = jnp.concatenate([y_train.numpy(), jnp.nan * jnp.ones((n_test_points, ))])[temporal_order]
    # Compute the equivalent SS model
    dts = jnp.diff(ss_xs, prepend=min_value.item())
    mfs, vfs, mps, vps, _ = kalmanFilter(ss_ys, dts, ell, sigma, m0 = m0, v0=v0)
    # Smoothed means and variances
    mss, vss = kalmanSmoothing(ell, dts, mfs, vfs, mps, vps)

    # Posterior distribution
    ss_posterior_mean = mss[jnp.isnan(ss_ys)]
    ss_posterior_var = vss[jnp.isnan(ss_ys)]

    plt.style.use('default')
    _, ax = plt.subplots(1, 1)

    ax.scatter(x_train.detach().numpy(), y_train.detach().numpy(), color='k', marker='o', label='Observed data', alpha=0.7)
    ax.plot(x_test, ss_posterior_mean, label="Mean", color = 'purple', alpha = 1)
    ax.fill_between(x_test,
                        ss_posterior_mean - 1.96 * jnp.sqrt(ss_posterior_var),
                        ss_posterior_mean + 1.96 * jnp.sqrt(ss_posterior_var),
                        alpha=0.5,
                      label="95% Confidence", color = "violet", zorder=-1)
    ax.set_title(title)
    ax.set_xlim([min_value, max_value])
    ax.legend(loc="best")
    plt.grid(False)
    ax.plot

#Plot State Space GP

In [None]:
def plot_SSGP(x_train, y_train, x_test, means, variances, title= "GP Model"):
    """
    Plots the State Space Gaussian Process (SSGP) model predictions along with the observed data.

    Parameters:
    x_train (jax.numpy.ndarray): The training input data.
    y_train (jax.numpy.ndarray): The training output data.
    x_test (jax.numpy.ndarray): The test input data.
    means (jax.numpy.ndarray): The mean predictions of the SSGP model.
    variances (jax.numpy.ndarray): The variance predictions of the SSGP model.
    title (str, optional): The title of the plot. Default is "GP Model".

    Note:
    The function uses matplotlib to create the plot. The plot includes the mean predictions,
    a confidence region based on the variances, and the observed data. The x-axis limits are set
    based on the minimum and maximum values of the training data.
    """
    # Find min and max value of training set
    min_value, max_value = min(x_train).numpy(), max(x_train).numpy()
    temporal_order = jnp.argsort(x_test.numpy())

    # State Space X's and Y's
    x_test_order = x_test.numpy()[temporal_order]
    # Assuming means and variances already ordered
    plt.style.use('default')
    _, ax = plt.subplots(1, 1)
    ax.plot(x_test_order, means, label = "Mean", color = "purple")
    ax.scatter(x_train, y_train, label='Observed data', color = "k", marker="o", alpha = 0.7)
    ax.fill_between(x_test_order,
                        means - 1.96 * jnp.sqrt(variances),
                        means + 1.96 * jnp.sqrt(variances),
                        alpha=0.3,
                        label = "Confidence Region", color="violet", zorder=-1)
    ax.set_title(title)
    #ax.set_xlim([min_value, max_value])
    ax.legend(loc="best")
    plt.grid(False)

# Kalman Filter and Smoother

In [None]:
def kalmanFilter(ss_ys, dts, ell, sigma, m0=0, v0=1, observation_cov=1):
    """
    Implements the Kalman Filter algorithm for a given set of observations.

    The function consists of two nested functions: `update` and `scan_body`.
    The `update` function is responsible for updating the mean and variance
    based on the observation and the observation covariance. The `scan_body`
    function is used to scan through the observations and update the mean and
    variance accordingly.

    The function returns four arrays: `mfs`, `vfs`, `mps`, and `vps` which represent
    the filtered means, filtered variances, predicted means, and predicted variances
    respectively.

    Note: This function uses the `jax.lax.scan` function for efficient looping over
    the observations, and `jax.lax.cond` for conditionally updating the mean and
    variance based on whether the observation is NaN.

    Args:
        m0 (float, optional): Initial mean for the Kalman filter. Defaults to 0.
        v0 (float, optional): Initial variance for the Kalman filter. Defaults to 1.
        ss_ys (array): Observations in the state space model.
        Fs (array): Array of transition matrices.
        Ws (array): Process noise covariance in the state space model.
        observation_cov (float, optional): Observation covariance. Defaults to 1.

    Returns:
        mfs (array): Filtered means
        vfs (array): Filtered variances
        mps (array): Predicted means
        vps (array): Predicted variances
    """
    Fs = jnp.exp(-1 / ell * dts)
    Ws = sigma ** 2 * (1 - jnp.exp(-2 / ell * dts))
    def update(y, mp, vp):
        S = vp + observation_cov
        K = vp / S
        v = y - mp
        mf = mp + K * v
        vf = vp - K * K * S
        return mf, vf, -jax.scipy.stats.norm.logpdf(y, mp, jnp.sqrt(S))

    def scan_body(carry, elem):
        mf, vf, nll = carry
        y, F, W = elem

        mp = F * mf
        vp = F * vf * F + W

        mf, vf, nll_inc = jax.lax.cond(jnp.isnan(y),
                                        lambda _: (mp, vp, 0.),
                                        lambda _: update(y, mp, vp),
                                        None)
        nll = nll + nll_inc

        return (mf, vf, nll), (mf, vf, mp, vp)
    (_, _, nll), (mfs, vfs, mps, vps) = jax.lax.scan(scan_body, (m0, v0, 0.), (ss_ys, Fs, Ws))
    return mfs, vfs, mps, vps, nll


def kalmanSmoothing(ell, dts, mfs, vfs, mps, vps):
    """
    Implements the Kalman Smoothing algorithm for a given set of filtered means and variances.

    The function consists of a nested function: `scan_body`. The `scan_body` function is used to
    scan through the filtered means and variances and update the smoothed means and variances accordingly.

    The function returns two arrays: `mss` and `vss` which represent the smoothed means and smoothed variances respectively.

    Note: This function uses the `jax.lax.scan` function for efficient looping over the filtered means and variances.

    Args:
        Fs (array): Array of transition matrices
        mfs (array): Filtered means
        vfs (array): Filtered variances
        mps (array): Predicted means
        vps (array): Predicted variances

    Returns:
        mss (array): Smoothed means
        vss (array): Smoothed variances
    """
    Fs = jnp.exp(-1 / ell * dts)
    def scan_body(carry, elem):
        ms, vs = carry
        mf, vf, mp, vp, F = elem

        G = vf * F / vp
        ms = mf + G * (ms - mp)
        vs = vf + G * (vs - vp) * G
        return (ms, vs), (ms, vs)

    _, smoothing_results = jax.lax.scan(scan_body,
                                        (mfs[-1], vfs[-1]),
                                        (mfs[:-1], vfs[:-1], mps[1:], vps[1:], Fs[1:]),
                                        reverse=True)
    mss = jnp.concatenate([smoothing_results[0], mfs[-1, None]], axis=0)
    vss = jnp.concatenate([smoothing_results[1], vfs[-1, None]], axis=0)
    return (mss, vss)

# Kalman Filter with PyTorch instead

In [None]:
def kalmanFilter_torch(ss_ys, dts, ell, sigma, m0=0, v0=1, observation_cov=1):
    """
    Implements the Kalman filter algorithm for a State Space Gaussian Process (SSGP) model using PyTorch.

    Parameters:
    ss_ys (torch.Tensor): The state space output data.
    dts (torch.Tensor): The time differences between consecutive state space points.
    ell (float): The length-scale parameter for the SSGP model.
    sigma (float): The signal variance parameter for the SSGP model.
    m0 (float, optional): The initial mean for the Kalman filter. Default is 0.
    v0 (float, optional): The initial variance for the Kalman filter. Default is 1.
    observation_cov (float, optional): The observation covariance for the Kalman filter. Default is 1.

    Returns:
    mfs (torch.Tensor): The filtered means.
    vfs (torch.Tensor): The filtered variances.
    mps (torch.Tensor): The predicted means.
    vps (torch.Tensor): The predicted variances.
    nll (float): The negative log likelihood.

    Note:
    The function uses the torch library to perform the computations.
    """
    Fs = torch.exp(-1 / ell * dts)
    Ws = sigma ** 2 * (1 - torch.exp(-2 / ell * dts))

    def update(y, mp, vp):
        S = vp + observation_cov
        K = vp / S
        v = y - mp
        mf = mp + K * v
        vf = vp - K * K * S
        nll = -torch.distributions.Normal(mp, torch.sqrt(S)).log_prob(y)
        return mf, vf, nll

    mfs, vfs, nlls = [], [], []
    mf, vf, nll = m0, v0, 0.
    for y, F, W in zip(ss_ys, Fs, Ws):
        mp = F * mf
        vp = F * vf * F + W
        if torch.isnan(y):
            mf, vf, nll_inc = mp, vp, 0.
        else:
            mf, vf, nll_inc = update(y, mp, vp)
        nll += nll_inc
        mfs.append(mf)
        vfs.append(vf)
        nlls.append(torch.tensor(nll, requires_grad=True))

    mfs = torch.stack(mfs)
    vfs = torch.stack(vfs)
    mps = Fs * mfs
    vps = Fs * vfs * Fs + Ws
    nll = torch.sum(torch.stack(nlls))

    return mfs, vfs, mps, vps, nll

# Train function for State Space GPs

In [None]:
def train_SSGP(x_train, y_train, x_test, ell=1, sigma=1, training_iterations=25):
    """
    Trains a State Space Gaussian Process (SSGP) model using JAX and the Adam optimizer.

    This function takes training and test data as input, along with initial guesses for the
    length scale (ell) and signal variance (sigma) hyperparameters of the Gaussian Process.
    It performs optimization to minimize the negative log-likelihood (NLL) of the model
    using the Adam optimizer from the optax library. The function also handles the conversion
    of PyTorch tensors to NumPy arrays for compatibility with JAX.

    Parameters:
    x_train (torch.Tensor): The input features for training, as a PyTorch tensor.
    y_train (torch.Tensor): The target values for training, as a PyTorch tensor.
    x_test (torch.Tensor): The input features for testing, as a PyTorch tensor.
    ell (float, optional): The initial guess for the length scale hyperparameter. Default is 1.
    sigma (float, optional): The initial guess for the signal variance hyperparameter. Default is 1.
    training_iterations (int, optional): The number of iterations for the optimizer to run. Default is 25.

    Returns:
    tuple: A tuple containing the optimized length scale (ell), signal variance (sigma),
           and the time taken for training (in seconds).

    Note:
    The function internally uses a bijection to ensure the positivity of the parameters during optimization.
    It also uses a Kalman filter implementation (`kalmanFilter`) for the NLL calculation,
    which should be defined elsewhere in the code. The inputs are expected to be PyTorch tensors,
    which are converted to NumPy arrays for processing with JAX.
    """
    n_test_points = len(x_test)
    all_points = jnp.concatenate([x_train.numpy(), x_test.numpy()])
    temporal_order = jnp.argsort(all_points)

    # State Space X's and Y's
    ss_xs = all_points[temporal_order]
    ss_ys = jnp.concatenate([y_train.numpy(), jnp.nan * jnp.ones((n_test_points, ))])[temporal_order]
    t0 = min(x_train).numpy().item()
    dts = jnp.diff(ss_xs, prepend=t0)

    # Bijection for the positivity of the parameters in the optimisation
    def bijection(x):
        return jnp.exp(x)


    def loss_fn(params):
        # Unpack parameters
        ell, sigma = bijection(params)
        *_, nll = kalmanFilter(ss_ys, dts, ell, sigma)
        return nll

    init_params = jnp.array(jnp.array([1., 1.]))  # Initial parameters
    # Create an instance of the Adam optimizer from optax
    solver = optax.adam(learning_rate=0.2)
    opt_state = solver.init(init_params)
    # Run optimization
    start_time = time.time() # Time it
    for _ in range(training_iterations):
      grad = jax.grad(loss_fn)(init_params)
      updates, opt_state = solver.update(grad, opt_state, init_params)
      init_params = optax.apply_updates(init_params, updates)
    end_time = time.time()
    opt_ell, opt_sigma = bijection(init_params)


    return opt_ell, opt_sigma, end_time-start_time

Train function for State Space GPs in pytorch

In [None]:
def train_SSGP_torch(x_train, y_train, x_test, ell=1, sigma=1, training_iterations=25):
    """
    Trains a State Space Gaussian Process (SSGP) model using PyTorch and the Adam optimizer.

    This function takes training and test data as input, along with initial guesses for the
    length scale (ell) and signal variance (sigma) hyperparameters of the Gaussian Process.
    It performs optimization to minimize the negative log-likelihood (NLL) of the model
    using the Adam optimizer. The function also handles CUDA availability for GPU acceleration.

    Parameters:
    x_train (torch.Tensor): The input features for training.
    y_train (torch.Tensor): The target values for training.
    x_test (torch.Tensor): The input features for testing.
    ell (float, optional): The initial guess for the length scale hyperparameter. Default is 1.
    sigma (float, optional): The initial guess for the signal variance hyperparameter. Default is 1.
    training_iterations (int, optional): The number of iterations for the optimizer to run. Default is 25.

    Returns:
    tuple: A tuple containing the optimized length scale (ell), signal variance (sigma),
           and the time taken for training (in seconds).

    Note:
    The function internally uses a bijection to ensure the positivity of the parameters during optimization.
    It also uses a Kalman filter implementation (`kalmanFilter_torch`) for the NLL calculation,
    which should be defined elsewhere in the code.
    """
    if torch.cuda.is_available():
      x_train = x_train.cuda()
      y_train = y_train.cuda()
      x_test = x_test.cuda()
    n_test_points = len(x_test)
    all_points = torch.cat([x_train, x_test])
    temporal_order = torch.argsort(all_points)

    # State Space X's and Y's
    ss_xs = all_points[temporal_order]
    ss_ys = torch.cat([y_train, torch.tensor([float('nan')] * n_test_points)])[temporal_order]
    t0 = torch.tensor([min(x_train).item()])
    if torch.cuda.is_available():
      t0 = t0.cuda()
    dts = torch.diff(ss_xs, prepend=t0)

    # Bijection for the positivity of the parameters in the optimisation
    def bijection(x):
        return torch.exp(x)

    def loss_fn(params):
        # Unpack parameters
        ell, sigma = bijection(params)
        *_, nll = kalmanFilter_torch(ss_ys, dts, ell, sigma)
        return nll

    # Run optimisation
    init_params = torch.log(torch.tensor([1., 1.]))  # Initial parameters
    if torch.cuda.is_available():
      init_params = init_params.cuda()
    init_params = Variable(init_params, requires_grad=True)

    def loss_step():
        opt.zero_grad()
        loss = loss_fn(init_params)
        loss.backward()
        return loss

    opt = torch.optim.Adam([init_params], lr=0.1)

    start_time = time.time() # Time it

    # Run the optimizer for a specific number of iterations
    for _ in range(training_iterations):
        opt.step(loss_step)

    end_time = time.time()

    opt_ell, opt_sigma = bijection(init_params.data)

    return opt_ell, opt_sigma, end_time-start_time


# Error metrics

In [None]:
def error_metrics(x_test, y_test, predictions, variances, confidence_level=0.95, scaler=None):
    """
    Compute various error metrics for the given test data and predictions.

    Parameters:
    x_test (torch.Tensor): The test input data.
    y_test (torch.Tensor): The true output data for the test set.
    predictions (torch.Tensor): The model's predictions for the test data.
    variances (torch.Tensor): The variances of the model's predictions.
    confidence_level (float, optional): The confidence level for the prediction interval. Default is 0.95.
    scaler (StandardScaler, optional): A fitted StandardScaler object if the 'y_test' was scaled. Default is None.

    Returns:
    RMSE (float): The Root Mean Square Error between the true output and the predictions.
    NLPD (float): The Negative Log Predictive Density.
    PICP (float): The Prediction Interval Coverage Probability.
    MPIW (float): The Mean Prediction Interval Width.

    Note:
    The function assumes that the input tensors are on a GPU and moves them to CPU.
    """
    x_test_cpu = x_test.cpu()
    y_test_cpu = y_test.cpu()
    predictions_cpu = predictions.cpu()
    variances_cpu = variances.cpu()
    # Root Mean Square Error (RMSE) ---
    if scaler is not None:
      # Assuming 'y_test' was scaled with 'scaler'
      y = scaler.inverse_transform(y_test_cpu)
      p = scaler.inverse_transform(predictions_cpu)
    else:
      y = y_test_cpu
      p = predictions_cpu
    RMSE = mean_squared_error(y, p, squared=False)

    # Negative Log Predictive Density (NLPD) ---

    NLPD = torch.mean(0.5 * (np.log(2 * np.pi * variances_cpu))
     + ((predictions_cpu - y_test_cpu) ** 2 / (variances_cpu*2))).item()
    # Prediction Interval Coverage Probability (PICP) ---
    z = norm.ppf(1-(1-confidence_level)/2)
    # Confidence interval
    lower_bound = (predictions_cpu - z*np.sqrt(variances_cpu))
    upper_bound = (predictions_cpu + z*np.sqrt(variances_cpu))
    PICP = np.mean((y_test_cpu.numpy() >= lower_bound.numpy()) & (y_test_cpu.numpy() <= upper_bound.numpy()))

    # Mean Prediction Interval Width (MPIW) ---
    MPIW = np.mean(2 * z * np.sqrt(variances_cpu.numpy()))

    return RMSE, NLPD, PICP, MPIW

# Bar Plot Function

In [None]:
def plot_bar(names, values, title, xlabel, ylabel):
    """
    Creates a bar plot with the given data and labels.

    Parameters:
    names (list): A list of names for the bars. Must be the same length as 'values'.
    values (list): A list of values for the bars. Must be the same length as 'names'.
    title (str): The title of the plot.
    xlabel (str): The label for the x-axis.
    ylabel (str): The label for the y-axis.

    Raises:
    ValueError: If 'names' and 'values' are not the same length.

    Note:
    The function uses matplotlib to create the plot. The plot is displayed immediately.
    """
    # Check if the lengths of times and models are equal
    if len(names) != len(values):
        raise ValueError("The lengths of names and values must be equal.")


    # Create a bar plot
    plt.figure(figsize=(10, 6))
    plt.bar(names, values, color='purple')

    # Add labels and title
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)

    # Display the plot
    plt.show()