# G-ST-GP Model for HSP Prediction

In [1]:
# Options
dataset = 'ven-dis' # Dataset
N_h = 200 # Tr-set size: 50 / 100
noise = 1e-2 # Data noise: 1e-2 / 5e-2 / 1e-1

method = 'lap' # Spatial kernel construction: 'lap' / 'eu' / 'eu-dkl' / 'TIGP'
if method == 'eu-dkl':
  # D_in = X.shape[1] = 3
  D_hidden = 4 # Define hidden layer in eu-dkl
  D_out = 2 # Define output layer in eu-dkl
elif method == 'lap':
  eigen = 256 # Eigen-pairs for constructing laplacian matrix: 32 / 64 / 128 / 256 / 512 / 1024 / 1093 

seed_g = 1 # Random seed group

# fname = '_vis_healthy_ran3_tr50_noi0.02_physics_Col200.mat' # Save res as a '.mat' file named by "fname"

<font color='green'><b><font size="6"> Part 1: Importing Libraries

In [2]:
import autograd.numpy as np
# import pandas as pd
# import matplotlib.pyplot as plt # for plotting
# from sklearn import manifold 
import scipy.io as sio # for loading .mat files
# from mpl_toolkits.mplot3d import Axes3D
import time # for timing
import sys # for printing progress+
# from scipy.optimize import minimize, fmin_l_bfgs_b # for optimizing
# from autograd.numpy.linalg import inv # for matrix inversion
import warnings
warnings.filterwarnings("ignore")

import time # for recording running time

  <font color='green'><b><font size="6"> Part 2: Loading Dataset

- `X` : heart's spatial coordinates (x, y, z) | shape: (#,3)\
- `t`: time steps at each spatial point | shape: (#, 1)
- `hsp`: heart's HSP data at each spatial point and each time step | shape: (#-s, #-t)
- `Tri`: each triangle's note index (Node1-index, Node2-index, Node3-index) | shape: (#, 3)

In [None]:
# --- Load Data ---
print ("Load ven-dis dataset...........")
inc = sio.loadmat(r'./Ven-dis input/heart_coordinates.mat')
X = inc['heart_coordinates'] # Shape: (1094,3)
X_shape = X.shape[0] 

t = sio.loadmat(r'./Ven-dis input/time_diseased.mat')
t = t['time_diseased'].T
t_shape = t.shape[0] # Shape: 1981

hsp = sio.loadmat(r'./Ven-dis input/hsp_diseased.mat')
hsp = hsp['hsp_diseased']
hsp_m = hsp.T # Shape: (t, sp) = (1981, 1094)

mat_data = sio.loadmat('./Ven-dis input/Ven.mat')
Tri = mat_data['Ven'][0, 0][1]
Tri = Tri - 1
Tri = Tri.astype(np.int32) # Shape: (2184, 3)
print("Finishing................")

In [None]:
# ----- Prepare for training and test datasets -----
# Add noise to all datasets
if seed_g == 1:
    np.random.seed(123) # 123; 456; 789; 101; 1234
elif seed_g == 2:
    np.random.seed(456)
elif seed_g == 3:
    np.random.seed(789)
elif seed_g == 4:
    np.random.seed(101)
elif seed_g == 5:
    np.random.seed(1234)

Y_noise = hsp + noise * np.random.randn(hsp.shape[0], hsp.shape[1]) # Y with Gaussian noise

# Training data
np.random.seed(1234)

idx = np.random.choice(X_shape, N_h, replace=False)
idx = np.sort(idx)

# Training dataset
Y_train_noise = Y_noise[idx,:]
print("Y_train_noise_shape:", Y_train_noise.shape)

# Test dataset
mask = np.ones(hsp.shape[0], dtype=bool)
mask[idx] = False
Y_test_noise = Y_noise[mask,:]
print("Y_test_noise_shape:", Y_test_noise.shape)

<font color='green'><b><font size="6"> Part 3: Kernel Calculation

### Notations

| Kernel | Space | Time |
|----------|----------|----------|
| Train-Train(K) | K_sp | K_t |
| Train-Test | K_sp_s.T | K_t_s.T |
| Test-Train(K*) | K_sp_s | K_t_s |
| Test-Test(K**) | K_sp_ss | K_t_ss |

### Temporal Kernel
Matern 3/2 kernel:
$$
\mathcal{K}(\mathbf{t}_i, \mathbf{t}_j) = \sigma_a \left( 1 + \frac{\sqrt{3}\|\mathbf{t}_i - \mathbf{t}_j\|}{l_{t}} \right) \exp\left( - \frac{\sqrt{3}\|\mathbf{t}_i - \mathbf{t}_j\|}{l_{t}} \right)
$$

where $l_{t}$(par[3]) for length scale, $\sigma_a$(par[4]) for kernel scale, $\sigma_b$(par[5]) for noise (just in $\mathcal{K}_\text{tr-tr}$).

In [5]:
# # reload Kernel_Time.py
# import importlib
# import Kernel_Time
# importlib.reload(Kernel_Time)

In [None]:
print("Import temporal kernel functions..................")

from Kernel_Time import cal_t_kers

ker = 1.5 # 1 for Rational Quadratic Kernel ; 0.5 for Matern(1/2); 1.5 for Matern(3/2); 2.5 for Matern(5/2)
func_K_t, func_K_t_s, func_K_t_ss = cal_t_kers(t,ker) # K_t:train-train; K_t_s: test-train; K_t_ss: test-test

### Spatial Kernel

- Method: Laplacian-based

Laplacian matrix $L$ is calculated as:
$$
M(\nabla^2)_i = \frac{1}{2A_i} \sum_{j \in N(i)} \left( \cot a_{ij} + \cot b_{ij} \right)
$$
Where:

- $ \nabla^2 $ is the Laplacian operator's matrix representation at vertex $ i $.
- $ A_i $ represents the area associated with vertex $ i $.
- $ N(i) $ refers to the set of neighboring vertices of vertex $ i $.
- $ a_{ij} $ and $ b_{ij} $ are angles opposite to the edge between vertices $ i $ and $ j $ in a triangular mesh.

[Ref]Sorkine O. 2005 Laplacian Mesh Processing, Eurographics - State of the Art Reports, no. Section 4, pp. 53–70.


Laplacian eigenpairs kernel $\mathcal{K}(\mathbf{x}_i, \mathbf{x}_j)$ is given by:

$$
\mathcal{K} (\mathbf{x}_i, \mathbf{x}_j) = \sum_{k=1}^{M} \sigma_{m}S(\sqrt{\lambda_k}) \phi_k(\mathbf{x}_i) \phi_k(\mathbf{x}_j)
$$

$$
S(w) := \frac{2^D \pi^{D/2} \Gamma(\nu + D/2)(2\nu)^\nu}{\Gamma(\nu) l_{s}^{2\nu}} \left( \frac{2\nu}{l_{s}^2} + 4\pi^2 \omega^2 \right)^{-(\nu + D/2)}
$$

where $l_{s}$(par[0]) for length scale, $\sigma_m$(par[1]) for kernel scale, $ν$ for smoothness, $D$ as dimensionality (D = 2 for a 2D manifold in 3D space) and $Γ$ is the Gamma function.

- Method: Euclidean-based

Matern 3/2 kernel

- Method: Deep kernel learning based on Euclidean kernel

Transform the input spatial coordinates into another space, then apply the spatial kernel function.

In [None]:
if method == 'lap':
  print("Import lap spatial kernel functions..................")

  print("Solve eigenproblems for all data..................")

  from Eigen_Lap import eigen_sol

  Q, V = eigen_sol(X, Tri, num = 256) 
  # Q: eigenvalues, shape (num,)
  # V: eigenvectors, shape (N_verts, num)

  from Kernel_Space_Lap import cal_sp_kers

  func_K_sp, func_K_sp_s, func_K_sp_ss = cal_sp_kers(vertex=idx, Q=Q, V=V, kernel_type='Matern', smoothness=3./2.) # K_sp: train-train; K_sp_s: test-train; K_sp_ss: test-test
elif method == 'eu':
  print("Import euclidean spatial kernel functions..................")
  from Kernel_Space_Eu import calculate_spatial_kernels_euclidean
  func_K_sp, func_K_sp_s, func_K_sp_ss = calculate_spatial_kernels_euclidean(idx, X, Tri)

elif method == 'eu-dkl': # deep kernel learning
  print("Import eu-dkl spatial kernel functions..................")
  from Kernel_Space_Eu_DKL import calculate_spatial_kernels_euclidean
  func_K_sp, func_K_sp_s, func_K_sp_ss = calculate_spatial_kernels_euclidean(idx, X, Tri, D_hidden, D_out,
                                        W1=None, b1=None, W2=None, b2=None, 
                                        seed=42)

<font color='green'><b><font size="6"> Part 4: Parameters Estimation

`par[0]`: `ls`, length-scale for spatial kernel;\
`par[1]`: `sigma_m`, kernel scale;\
`par[2]`: a minor constant to avoid ill-conditions for spatial kernel;\
`par[3]`: `lt`, length-scale for temporal kernel;\
`par[4]`: `sigma_a`, kernel scale (fixed as 1);\
`par[5]`: a minor constant to avoid ill-conditions for temporal kernel.\
`par[6]`: `D`, noise of task.\

### Optimization Function:
* Negative Log-marginal-likelihood:

$$
\mathcal{F}_{\text{NLL}}(\theta) = \frac{1}{2} \mathbf{y_{tr}}^T \mathbf{\Sigma}^{-1} \mathbf{y_{tr}} + \frac{1}{2} \log |\mathbf{\Sigma}| + \frac{n}{2} \log(2\pi) = 0.5 * ( f1 + f2 + f3)
$$

$$
\mathbf{\Sigma} = \mathbf{K_s} \otimes \mathbf{K_t} + D \otimes I_s \otimes I_t
$$

In [8]:
# # reload files
# import importlib
# import Pars_MulStarts
# import Pars_NLL
# import Eigen_Lap
# import Block_Diag_Inv

# importlib.reload(Pars_MulStarts)
# importlib.reload(Pars_NLL)
# importlib.reload(Eigen_Lap)
# importlib.reload(Block_Diag_Inv)

In [None]:
# --- Parameter Estimation: Lap / Eu ---
print("Begin estimating kernels' parameters.................")

if method == 'lap' or method == 'eu':
  from Pars_MulStarts import Pars_Optimizer
  from Err_Cross_Val import cal_val_err

  if method == 'lap':
    bounds = ((0.5, 1.5), (800, 900), (0.0005, 0.0005), (50, 60), (1, 1), (0.01, 0.01), (0.0005, 2e-2))
  elif method == 'eu':
    bounds = ((10, 20), (80, 90), (0.0005, 0.0005), (10, 20), (1, 1), (0.01, 0.01), (0.0005, 2e-2))

  starts = 1
  np.random.seed(1234)
  guesses = np.vstack([np.random.uniform(bound[0], bound[1], size=starts) for bound in bounds]).T

  Optimizer = Pars_Optimizer(
      Y_train_noise, Y_test_noise, 
      func_K_t, func_K_t_s, func_K_t_ss,
      func_K_sp, func_K_sp_s, func_K_sp_ss,
      X, Tri, mask, t
  ) 

  best_pars, best_err = None, float('inf')
  best_opt_elapsed_time = None
  best_iter_count = None
  for i, guess in enumerate(guesses):
      print(f"\nOptimization run {i+1}:")
      print(f"Initial guess {i+1}: {np.array2string(guess, separator=', ')}")
      opt_start_time = time.time()
      pars, iter_count = Optimizer.opt_pars(guess, bounds)
      opt_end_time = time.time()
      opt_elapsed_time = opt_end_time - opt_start_time
      print(f"Optimized parameters: {np.array2string(pars, separator=', ')}")
      print(f"Elapsed time for optimization run {i+1}: {opt_elapsed_time:.2f} seconds")
      # val_err = cal_val_err(pars, Y_train_noise, func_K_t, func_K_t_s, func_K_t_ss, func_K_sp, func_K_sp_s, func_K_sp_ss)
      val_err = 0 # NOTE: for quick testing
      if val_err < best_err:
          best_err, best_pars = val_err, pars
          best_opt_elapsed_time = opt_elapsed_time
          best_iter_count = iter_count

  print("\nBest optimization result:")
  print(f"Best parameters: {np.array2string(best_pars, separator=', ')}")
  print(f"Total optimization time: {best_opt_elapsed_time}")

  pars = best_pars

In [10]:
# --- Parameter Estimation: Eu-dkl ---
if method == 'eu-dkl':
  from DKL.Pars_MulStarts import Pars_Optimizer

  D_in = 3
  num_W1 = D_in * D_hidden
  num_b1 = D_hidden
  num_W2 = D_hidden * D_out
  num_b2 = D_out
  nn_param_dim = num_W1 + num_b1 + num_W2 + num_b2

  bounds = [
          (10, 20),    # ls
          (80, 90),    # sigma_m
          (0.0005, 0.0005), # sigma_n
          (10, 20),    # lt
          (1, 1),      # sigma_a
          (0.01, 0.01),# sigma_b
          (0.0005, 2e-2), # D
      ] + [(-1, 1)]*nn_param_dim

  starts = 1
  np.random.seed(1234)
  guesses = []
  for bound in bounds:
      if bound[0] == bound[1]:
          guesses.append([bound[0]]*starts)
      else:
          guesses.append(np.random.uniform(bound[0], bound[1], size=starts))
  guesses = np.array(guesses).T

  def Opt_K_sp(ls, sigma_m, sigma_n, W1, b1, W2, b2):
      return calculate_spatial_kernels_euclidean(idx, X, Tri, D_hidden, D_out, W1=W1, b1=b1, W2=W2, b2=b2)[0](ls, sigma_m, sigma_n)

  Optimizer = Pars_Optimizer(
      Y_train_noise, Y_test_noise, 
      func_K_t, func_K_t_s, func_K_t_ss,
      Opt_K_sp, func_K_sp_s, func_K_sp_ss,
      X, Tri, mask, t,
      D_in, D_hidden, D_out
  )

  best_pars, best_err = None, float('inf')
  best_opt_elapsed_time = None
  best_iter_count = None
  for i, guess in enumerate(guesses):
      print(f"\nOptimization run {i+1}:")
      print(f"Initial guess {i+1}: {np.array2string(guess, separator=', ')}")
      opt_start_time = time.time()
      pars, iter_count = Optimizer.opt_pars(guess, bounds)
      opt_end_time = time.time()
      opt_elapsed_time = opt_end_time - opt_start_time
      print(f"Optimized parameters: {np.array2string(pars, separator=', ')}")
      print(f"Elapsed time for optimization run {i+1}: {opt_elapsed_time:.2f} seconds")
      val_err = 0 # NOTE: for quick testing
      if val_err < best_err:
          best_err, best_pars = val_err, pars
          best_opt_elapsed_time = opt_elapsed_time
          best_iter_count = iter_count

  print("\nBest optimization result:")
  print(f"Best parameters: {np.array2string(best_pars, separator=', ')}")
  print(f"Total optimization time: {best_opt_elapsed_time}")

  pars = best_pars

<font color='green'><b><font size="6"> Part 5: Posterior Prediction

- GP Prediction:
$$
\mathbf{Y}_* \mid X_*, X, \mathbf{Y} \sim \mathcal{N} \left( K(X_*, X) K(X, X)^{-1} \mathbf{Y}, \, K(X_*, X_*) - K(X_*, X) K(X, X)^{-1} K(X, X_*) \right)
$$
 
where $K = K_f \otimes K_s \otimes K_t + D \otimes I_s \otimes I_t$

- Relative Error:

$RE = \frac{\|y - \hat{y}\|_F}{\|y\|_F}$
   
Where $\|\cdot\|_F$ is the Frobenius norm. $y$ is true values (hsp_true), $\hat{y}$ is predicted values (hsp_pred), $n$ is number of samples.

In [None]:
print("Calculate posterior distribution..................")

if method == "eu" or method == 'lap':
  # import Post_Pred
  # importlib.reload(Post_Pred)
  from Post_Pred import Post_Predictor

  Predictor = Post_Predictor(
      pars=pars,
      func_K_t=func_K_t,
      func_K_t_s=func_K_t_s,
      func_K_t_ss=func_K_t_ss,
      func_K_sp=func_K_sp,
      func_K_sp_s=func_K_sp_s,
      func_K_sp_ss=func_K_sp_ss
  )
  
  # Record prediction time
  post_mean_start_time = time.time()

  hsp_pred = Predictor.post_mean(Y_train_noise)
  print("The shape of hsp_pred:", hsp_pred.shape) # shape: (N_time, N_space)

  post_mean_end_time = time.time()
  post_mean_elapsed_time = post_mean_end_time - post_mean_start_time
  print(f"Elapsed time for Predictor.post_mean: {post_mean_elapsed_time:.2f} seconds")
  
  cov_diag = Predictor.post_var()
  print("The shape of cov_diag", cov_diag.shape)

  # --- Visualization Data ---
  hsp_vis = Y_noise.copy()
  hsp_vis[mask,:] = hsp_pred.T

  # --- Error Analysis ---
  hsp_true = np.array(Y_noise[:, :])
  hsp_pred_full = np.array(hsp_vis[:, :])

  diff = hsp_true - hsp_pred_full
  re_total = np.linalg.norm(diff.reshape(-1)) / np.linalg.norm(hsp_true.reshape(-1))

  print(f"Total RE: {re_total:.6f}")

In [12]:
if method == 'eu-dkl':

  from Post_Pred import Post_Predictor

  # Unpack optimized parameters
  # Neural net params
  offset = 7
  W1_size = D_in * D_hidden
  b1_size = D_hidden
  W2_size = D_hidden * D_out
  b2_size = D_out

  W1 = pars[offset : offset + W1_size].reshape(D_in, D_hidden)
  b1 = pars[offset + W1_size : offset + W1_size + b1_size]
  W2 = pars[offset + W1_size + b1_size : offset + W1_size + b1_size + W2_size].reshape(D_hidden, D_out)
  b2 = pars[offset + W1_size + b1_size + W2_size : offset + W1_size + b1_size + W2_size + b2_size]

  func_K_sp, func_K_sp_s, func_K_sp_ss = calculate_spatial_kernels_euclidean(idx, X, Tri, D_hidden, D_out, 
                                          W1=W1, b1=b1, W2=W2, b2=b2, 
                                          seed=42)


  Predictor = Post_Predictor(
      pars=pars,
      func_K_t=func_K_t,
      func_K_t_s=func_K_t_s,
      func_K_t_ss=func_K_t_ss,
      func_K_sp=func_K_sp,
      func_K_sp_s=func_K_sp_s,
      func_K_sp_ss=func_K_sp_ss
  )

  # Record prediction time
  post_mean_start_time = time.time()

  hsp_pred = Predictor.post_mean(Y_train_noise)
  print("The shape of hsp_pred:", hsp_pred.shape) # shape: (N_time, N_space)

  post_mean_end_time = time.time()
  post_mean_elapsed_time = post_mean_end_time - post_mean_start_time
  print(f"Elapsed time for Predictor.post_mean: {post_mean_elapsed_time:.2f} seconds")

  cov_diag = Predictor.post_var()
  print("The shape of cov_diag", cov_diag.shape)

  # --- Visualization Data ---
  hsp_vis = Y_noise.copy()
  hsp_vis[mask,:] = hsp_pred.T
  # # save hsp_vis as a .mat file
  # scipy.io.savemat(fname, {'hsp': hsp_vis})

  # --- Error Analysis ---
  hsp_true = np.array(Y_noise[:, :])
  hsp_pred_full = np.array(hsp_vis[:, :])

  diff = hsp_true - hsp_pred_full
  re_total = np.linalg.norm(diff.reshape(-1)) / np.linalg.norm(hsp_true.reshape(-1))

  print(f"Total RE: {re_total:.6f}")

In [None]:
# save cov_diag as cov_diag_atr-n200-noi0.01.txt

# np.savetxt("cov_diag_atr-n200-noi0.01.txt", cov_diag)

<font color='green'><b><font size="6"> Part 6: Active Learning

In [22]:
# Options
reps = 30 # Total iterations
k = 3 # Added spatial points in each iteration
re0 = re_total 
alpha = 1e4 # Weights for balancing geo-dist and LOOCV NLPD
type_al = 'com' # Method: 'ran' / 'var' / 'geo-dist' / 'com-fix' / 'com'
alpha1_fix = 1/3 # Options for 'com-fix';
# alpha1_fix = 1/3, gamma = 0.5
# alpha1_fix = 1/2, gamma = 1
# alpha1_fix = 3/5, gamma = 1.5

In [14]:
# # NOTE: quick start for test
# # cov_diag = np.loadtxt("cov_diag_vendis-n50-noi0.01.txt"); re_total = 0.4290
# cov_diag = np.loadtxt("cov_diag_atr-n50-noi0.01.txt"); re_total = 0.507434

In [None]:
import ActiveLearning
# importlib.reload(ActiveLearning)
from ActiveLearning import active_learning


idx_new, final_hsp_pred, final_cov_diag = active_learning(reps, idx, X, Y_noise, t, cov_diag, Q, V, pars, func_K_sp, func_K_sp_s, func_K_sp_ss, func_K_t, func_K_t_s, func_K_t_ss, re0, Y_train_noise, k, Tri, type_al, alpha, alpha1_fix)