In [35]:
import sys
sys.path.append('/Users/jiangxiaoyu/Desktop/All Projects/GPLVM_project_code/')
import yaml
import torch
# from models_.lvmogp_svi import LVMOGP_SVI
from modules.prepare_and_train_model import LVMOGP_SVI 
from models_.gaussian_likelihood import GaussianLikelihood
from modules.prepare_data import *
from util_functions import *

In [36]:
# Specify model and likelihood

with open('/Users/jiangxiaoyu/Desktop/All Projects/GPLVM_project_code/configs/train_lvmogp_config.yaml', 'r') as file:
    config = yaml.safe_load(file)

my_model = LVMOGP_SVI(
    n_outputs = config['n_outputs'],
    n_input = config['n_input_train'],
    input_dim = config['input_dim'],
    latent_dim = config['latent_dim'],
    n_inducing_input = config['n_inducing_input'],
    n_inducing_latent = config['n_inducing_latent'],
    data_Y = None,
    pca = config['pca'],
    learn_inducing_locations_latent = config['learn_inducing_locations_latent'],
    learn_inducing_locations_input = config['learn_inducing_locations_input'],
    latent_kernel_type = config['latent_kernel_type'],
    input_kernel_type = config['input_kernel_type']
)

my_likelihood = GaussianLikelihood()

# Load trained model and likelihood

model_state_dict = torch.load(config['model_path'])
my_model.load_state_dict(model_state_dict)

likelihood_state_dict = torch.load(config['likelihood_path'])
my_likelihood.load_state_dict(likelihood_state_dict)

my_model.eval()
my_likelihood.eval()

GaussianLikelihood(
  (noise_covar): HomoskedasticNoise(
    (raw_noise_constraint): GreaterThan(1.000E-04)
  )
)

In [37]:
for name, param in my_model.named_parameters():
    print(name)

variational_strategy.inducing_points_latent
variational_strategy.inducing_points_input
variational_strategy._variational_distribution.variational_mean
variational_strategy._variational_distribution.chol_variational_covar_latent
variational_strategy._variational_distribution.chol_variational_covar_input
X.q_mu
X.q_log_sigma
covar_module_latent.raw_outputscale
covar_module_latent.base_kernel.raw_lengthscale
covar_module_input.raw_outputscale
covar_module_input.base_kernel.raw_lengthscale


In [38]:
# print(my_model.covar_module_latent.outputscale.data)
# print(my_model.covar_module_latent.base_kernel.lengthscale.data.reshape(-1))

# kernel(h_1, h_2) = outputscale * exp(-0.5 * [ (h_11 - h_21 / lengthscale_1)^2 + (h_12 - h_22 / lengthscale_2)^2 ])

In [39]:
from linear_operator.operators import CholLinearOperator, KroneckerProductLinearOperator
from linear_operator.operators import (
    LinearOperator,
    TriangularLinearOperator,
)
from gpytorch.settings import _linalg_dtype_cholesky

def _cholesky_factor(induc_induc_covar: LinearOperator) -> TriangularLinearOperator:
    L = psd_safe_cholesky(to_dense(induc_induc_covar).type(_linalg_dtype_cholesky.value()), max_tries=4)
    return TriangularLinearOperator(L)

K_uu_latent = my_model.covar_module_latent(my_model.variational_strategy.inducing_points_latent.data).to_dense().to(torch.float64)
K_uu_latent_inv = torch.linalg.solve(K_uu_latent, torch.eye(K_uu_latent.size(-1)).to(torch.float64))
K_uu_input = my_model.covar_module_input(my_model.variational_strategy.inducing_points_input.data).to_dense().to(torch.float64)
K_uu_input_inv = torch.linalg.solve(K_uu_input, torch.eye(K_uu_input.size(-1)).to(torch.float64))

K_uu = KroneckerProductLinearOperator(K_uu_latent, K_uu_input).to_dense().data
# chol_K_uu_inv_t = _cholesky_factor_latent(KroneckerProductLinearOperator(K_uu_latent_inv, K_uu_input_inv)).to_dense().data.t()
chol_K_uu_inv_t = KroneckerProductLinearOperator(
        torch.linalg.solve( _cholesky_factor(K_uu_latent).to_dense().data, torch.eye(K_uu_latent.size(-1)).to(torch.float64)),
        torch.linalg.solve( _cholesky_factor(K_uu_input).to_dense().data, torch.eye(K_uu_input.size(-1)).to(torch.float64)),
    ).to_dense().data.t()
# ------------------------------------------------------------------------------------------------------------------------------------------------------------------

chol_covar_latent_u = my_model.variational_strategy._variational_distribution.chol_variational_covar_latent.data.to(torch.float64)
covar_latent_u = CholLinearOperator(chol_covar_latent_u).to_dense()
chol_covar_input_u = my_model.variational_strategy._variational_distribution.chol_variational_covar_input.data.to(torch.float64)
covar_input_u = CholLinearOperator(chol_covar_input_u).to_dense()

covar_u = KroneckerProductLinearOperator(covar_latent_u, covar_input_u).to_dense().data

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------

common_background_information = {
                    'K_uu': K_uu.data,
                    'chol_K_uu_inv_t': chol_K_uu_inv_t.data, 
                    'm_u': my_model.variational_strategy._variational_distribution.variational_mean.data,
                    'Sigma_u': covar_u.data,
                    'A': chol_K_uu_inv_t @ (covar_u - torch.eye(covar_u.shape[0])) @ chol_K_uu_inv_t.t(),
                    'var_H': my_model.covar_module_latent.outputscale.data,
                    'var_X': my_model.covar_module_input.outputscale.data,
                    'W': my_model.covar_module_latent.base_kernel.lengthscale.data.reshape(-1)**2
                    }
'''
chol_K_uu_inv_t: cholesky of inverse of K_uu matrix, of shape (M_H * M_X, M_H * M_X)
m_u: mean of the variational distribution
Sigma_u: covariance matrix of the variational distribution
A: chol_K_uu_inv_t (Sigma_u - I) chol_K_uu_inv_t.T
var_H: 
var_X: 
W: vector; containing all lengthscales in the RAD kernel
c: constant
'''
c = (2 * torch.pi)**(config['latent_dim'] / 2) * common_background_information['var_H'] * common_background_information['W'].sqrt().prod()
common_background_information['constant_c'] = c

In [40]:
def integration_prediction_func(test_input, output_index, my_model, common_background_information):

    input_K_f_u = my_model.covar_module_input(test_input, my_model.variational_strategy.inducing_points_input.data).to_dense().data
    input_K_u_f_K_f_u = input_K_f_u.t() @ input_K_f_u

    data_specific_background_information = {
            'm_plus': my_model.X.q_mu.data[output_index],
            'Sigma_plus': my_model.X.q_log_sigma.exp().square().data[output_index],
            'input_K_f_u': input_K_f_u, 
            'input_K_u_f_K_f_u': input_K_u_f_K_f_u,
            'expectation_K_uu': None
            }
    
    # helper functions -----------------------------------------------------------------------------------------------------------------------

    def multivariate_gaussian_pdf(x, mu, cov):
        '''cov is a vector, representing all elements in the diagonal matrix'''
        k = mu.size(0)
        cov_det = cov.prod()
        cov_inv = torch.diag(1.0 / cov)
        norm_factor = torch.sqrt((2 * torch.pi) ** k * cov_det)

        x_mu = x - mu
        result = torch.exp(-0.5 * x_mu @ cov_inv @ x_mu.t()) / norm_factor
        return result.item()

    def G(h:Tensor, common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):

        mu = data_specific_background_information['m_plus']
        cov_diag = data_specific_background_information['Sigma_plus'] + common_background_information['W']
        result = multivariate_gaussian_pdf(h, mu, cov_diag)
        return common_background_information['constant_c'] * result

    def R(h_1:Tensor, h_2:Tensor, common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
        mu_1 = h_2
        cov_diag_1 = 2 * common_background_information['W']
        mu_2 = (h_1 + h_2) / 2
        cov_diag_2 = 0.5 * common_background_information['W'] + data_specific_background_information['Sigma_plus']
        result1 = multivariate_gaussian_pdf(h_1, mu_1, cov_diag_1)
        result2 = multivariate_gaussian_pdf(data_specific_background_information['m_plus'], mu_2, cov_diag_2)
        return (common_background_information['constant_c'] ** 2 ) * result1 * result2
    
    def expectation_lambda(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
        result_ = KroneckerProductLinearOperator(data_specific_background_information['expectation_latent_K_f_u'].reshape(1, -1), data_specific_background_information['input_K_f_u'].reshape(1, -1)).to_dense().data 
        result_ = result_ @ common_background_information['chol_K_uu_inv_t'].to(result_.dtype) @ common_background_information['m_u'].to(result_.dtype)
        return result_
        
    def expectation_lambda_square(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
        result_ = common_background_information['m_u']
        _result = result_ @ common_background_information['chol_K_uu_inv_t'].to(result_.dtype)
        interm_term = KroneckerProductLinearOperator(data_specific_background_information['expectation_latent_K_u_f_K_f_u'], data_specific_background_information['input_K_u_f_K_f_u']).to_dense().data
        result_ = _result @ interm_term.to(result_.dtype) @ _result.t()
        # result_ = result_ @ common_background_information['chol_K_uu_inv_t'].to(result_.dtype) @ common_background_information['m_u']

        if data_specific_background_information['expectation_K_uu'] == None:
            data_specific_background_information['expectation_K_uu'] = interm_term
        return result_
        
    def expectation_gamma(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
        result_ = common_background_information['var_H'] * common_background_information['var_X']

        if data_specific_background_information['expectation_K_uu'] == None:
            data_specific_background_information['expectation_K_uu'] = KroneckerProductLinearOperator(data_specific_background_information['expectation_latent_K_u_f_K_f_u'], \
                                                                                                    data_specific_background_information['input_K_u_f_K_f_u']).to_dense().data

        return result_ + (common_background_information['A'] * data_specific_background_information['expectation_K_uu']).sum()
    
    def integration_predictive_mean(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
        return expectation_lambda(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information)


    def integration_predictive_var(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
        return expectation_lambda_square(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information) \
            + expectation_gamma(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information) \
            - expectation_lambda(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information)**2
    
    # ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    
    expectation_latent_K_f_u = Tensor([G(my_model.variational_strategy.inducing_points_latent.data[i]).item() for i in range(config['n_inducing_latent'])])
    expectation_latent_K_u_f_K_f_u = Tensor([R(my_model.variational_strategy.inducing_points_latent.data[i], my_model.variational_strategy.inducing_points_latent.data[j]).item() \
                                            for j in range(config['n_inducing_latent']) for i in range(config['n_inducing_latent'])]).reshape(config['n_inducing_latent'], config['n_inducing_latent'])

    data_specific_background_information['expectation_latent_K_f_u'] = expectation_latent_K_f_u
    data_specific_background_information['expectation_latent_K_u_f_K_f_u'] = expectation_latent_K_u_f_K_f_u

    return integration_predictive_mean(data_specific_background_information=data_specific_background_information), \
           integration_predictive_var(data_specific_background_information=data_specific_background_information)



In [43]:
output_index = 14
for i in range(20):
    test_input = Tensor([i-10])
    print(integration_prediction_func(test_input, output_index, my_model, common_background_information))

(tensor([0.1815]), tensor([18.3273]))
(tensor([0.7284]), tensor([53.7530]))
(tensor([1.1499]), tensor([15.6998]))
(tensor([0.4806]), tensor([21.6984]))
(tensor([-0.2372]), tensor([48.8876]))
(tensor([-0.7895]), tensor([51.2051]))
(tensor([-1.0141]), tensor([11.4194]))
(tensor([0.0224]), tensor([0.2853]))
(tensor([0.2942]), tensor([4.4255]))
(tensor([-1.5231]), tensor([7.8596]))
(tensor([-0.3446]), tensor([3488.4443]))
(tensor([0.2836]), tensor([34025.7695]))
(tensor([-0.5187]), tensor([22782.6660]))
(tensor([1.0543]), tensor([5264.1099]))
(tensor([0.3456]), tensor([5630.4941]))
(tensor([-1.0292]), tensor([91.9700]))
(tensor([-1.2787]), tensor([49.1571]))
(tensor([-0.8351]), tensor([25.8974]))
(tensor([-0.8117]), tensor([59.5811]))
(tensor([-1.7658]), tensor([32.5957]))


In [42]:

"""
input_K_f_u = my_model.covar_module_input(test_input, my_model.variational_strategy.inducing_points_input).to_dense().data
print(input_K_f_u.shape)

input_K_u_f_K_f_u = input_K_f_u.t() @ input_K_f_u
print(input_K_u_f_K_f_u.shape)

i = 0 # test data index

data_specific_background_information = {
    'm_plus': my_model.X.q_mu.data[i],
    'Sigma_plus': my_model.X.q_log_sigma.exp().data[i],
    'input_K_f_u': input_K_f_u,
    'input_K_u_f_K_f_u': input_K_u_f_K_f_u,
    'expectation_K_uu': None
}

'''
input_K_f_u: 1 * #input_inducing_points 
input_K_u_f_K_f_u: #input_inducing_points  *  #input_inducing_points 
'''
def multivariate_gaussian_pdf(x, mu, cov):
    '''cov is a vector, representing all elements in the diagonal matrix'''
    k = mu.size(0)
    cov_det = cov.prod()
    cov_inv = torch.diag(1.0 / cov)
    norm_factor = torch.sqrt((2 * torch.pi) ** k * cov_det)

    x_mu = x - mu
    result = torch.exp(-0.5 * x_mu @ cov_inv @ x_mu.t()) / norm_factor
    return result.item()


def G(h:Tensor, common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):

    mu = data_specific_background_information['m_plus']
    cov_diag = data_specific_background_information['Sigma_plus'] + common_background_information['W']
    result = multivariate_gaussian_pdf(h, mu, cov_diag)
    return common_background_information['constant_c'] * result


def R(h_1:Tensor, h_2:Tensor, common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
    mu_1 = h_2
    cov_diag_1 = 2 * common_background_information['W']
    mu_2 = (h_1 + h_2) / 2
    cov_diag_2 = 0.5 * common_background_information['W'] + data_specific_background_information['Sigma_plus']
    result1 = multivariate_gaussian_pdf(h_1, mu_1, cov_diag_1)
    result2 = multivariate_gaussian_pdf(data_specific_background_information['m_plus'], mu_2, cov_diag_2)

    return (common_background_information['constant_c'] ** 2 ) * result1 * result2

expectation_latent_K_f_u = Tensor([G(my_model.variational_strategy.inducing_points_latent.data[i]).item() for i in range(config['n_inducing_latent'])])
expectation_latent_K_u_f_K_f_u = Tensor([R(my_model.variational_strategy.inducing_points_latent.data[i], my_model.variational_strategy.inducing_points_latent.data[j]).item() \
                                         for j in range(config['n_inducing_latent']) for i in range(config['n_inducing_latent'])]).reshape(config['n_inducing_latent'], config['n_inducing_latent'])

data_specific_background_information['expectation_latent_K_f_u'] = expectation_latent_K_f_u
data_specific_background_information['expectation_latent_K_u_f_K_f_u'] = expectation_latent_K_u_f_K_f_u

def expectation_lambda(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
    result_ = KroneckerProductLinearOperator(data_specific_background_information['expectation_latent_K_f_u'].reshape(1, -1), data_specific_background_information['input_K_f_u'].reshape(1, -1)).to_dense().data 
    result_ = result_ @ common_background_information['chol_K_uu_inv_t'].to(result_.dtype) @ common_background_information['m_u'].to(result_.dtype)
    return result_
    

def expectation_lambda_square(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
    result_ = common_background_information['m_u']
    result_ = result_ @ common_background_information['chol_K_uu_inv_t'].to(result_.dtype)
    interm_term = KroneckerProductLinearOperator(data_specific_background_information['expectation_latent_K_u_f_K_f_u'], data_specific_background_information['input_K_u_f_K_f_u']).to_dense().data
    result_ = result_ @ interm_term.to(result_.dtype) 
    result_ = result_ @ common_background_information['chol_K_uu_inv_t'].to(result_.dtype) @ common_background_information['m_u']

    if data_specific_background_information['expectation_K_uu'] == None:
        data_specific_background_information['expectation_K_uu'] = interm_term
    return result_
    

def expectation_gamma(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
    result_ = common_background_information['var_H'] * common_background_information['var_X']

    if data_specific_background_information['expectation_K_uu'] == None:
        data_specific_background_information['expectation_K_uu'] = KroneckerProductLinearOperator(data_specific_background_information['expectation_latent_K_u_f_K_f_u'], \
                                                                                                  data_specific_background_information['input_K_u_f_K_f_u']).to_dense().data
    
    return result_ + (common_background_information['A'] * data_specific_background_information['expectation_K_uu']).sum()

def integration_predictive_mean(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
    return expectation_lambda(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information)

def integration_predictive_var(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information):
    return expectation_lambda_square(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information) \
         + expectation_gamma(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information) \
         - expectation_lambda(common_background_information=common_background_information, data_specific_background_information=data_specific_background_information)**2

"""

"\ninput_K_f_u = my_model.covar_module_input(test_input, my_model.variational_strategy.inducing_points_input).to_dense().data\nprint(input_K_f_u.shape)\n\ninput_K_u_f_K_f_u = input_K_f_u.t() @ input_K_f_u\nprint(input_K_u_f_K_f_u.shape)\n\ni = 0 # test data index\n\ndata_specific_background_information = {\n    'm_plus': my_model.X.q_mu.data[i],\n    'Sigma_plus': my_model.X.q_log_sigma.exp().data[i],\n    'input_K_f_u': input_K_f_u,\n    'input_K_u_f_K_f_u': input_K_u_f_K_f_u,\n    'expectation_K_uu': None\n}\n\n'''\ninput_K_f_u: 1 * #input_inducing_points \ninput_K_u_f_K_f_u: #input_inducing_points  *  #input_inducing_points \n'''\ndef multivariate_gaussian_pdf(x, mu, cov):\n    '''cov is a vector, representing all elements in the diagonal matrix'''\n    k = mu.size(0)\n    cov_det = cov.prod()\n    cov_inv = torch.diag(1.0 / cov)\n    norm_factor = torch.sqrt((2 * torch.pi) ** k * cov_det)\n\n    x_mu = x - mu\n    result = torch.exp(-0.5 * x_mu @ cov_inv @ x_mu.t()) / norm_factor\n