In [None]:
import numpy as np
import matplotlib.pyplot as plt
import sys
sys.path.append('../../PI_GP_regressor')

from main_class import PhysicsInformedGP_regressor
from kernels.kernel_wave_3d import gram_Matrix,k_ff, k_fu, k_uf, k_uu

In [14]:
kernel_list = [gram_Matrix, k_uu, k_uf, k_fu, k_ff]
hyperparameters = ["l_space", "sigma", "l_t", "c"]
model = PhysicsInformedGP_regressor(kernel_list, params = hyperparameters,timedependence=True, Dimensions = 3)

n_training_points = 700

model.set_training_data("",n_training_points, noise = [1e-8,1e-8])



In [15]:
model.jitter = 1e-6
def get_initial_values():
    """returns the initial values for the hyperparameters
    for the length scales we initialize them randomly as log(l) ~ U(-1.3,1)
    """
    rng = np.random.default_rng()
    theta_initial = np.zeros((4))
    theta_initial[0] = (rng.uniform(0.1, 1, 1))  #lx
    theta_initial[1] = rng.uniform(0, 1, 1)               #sigma_f
    theta_initial[2] = (rng.uniform(0, 1, 1))  #lt
    theta_initial[3] = rng.uniform(0.5, 1.5, 1)               #c
    return theta_initial
n_restarts = 50
n_threads = 1
opt_params_dict = {'theta_initial': get_initial_values,   #needed for all optimization methods
                    'bounds': ((1e-2, None), (1e-5, None), (1e-3, 2),(1e-2, None)), #needed for TNC and L-BFGS-B
                    'gtol': 1e-6}
model.train("L-BFGS-B",n_restarts, n_threads,opt_params_dict)

100%|██████████| 50/50 [04:28<00:00,  5.38s/it]

      fun: 2220.620361328125
 hess_inv: <4x4 LbfgsInvHessProduct with dtype=float64>
      jac: array([     0.        , 854492.18794977, 903320.31234866,      0.        ])
  message: 'CONVERGENCE: REL_REDUCTION_OF_F_<=_FACTR*EPSMCH'
     nfev: 165
      nit: 2
     njev: 33
   status: 0
  success: True
        x: array([0.99487881, 0.39290015, 0.02980772, 1.45822157])





In [18]:
model.log_marginal_likelohood([0.99487881, 0.39290015, 0.02980772, 1.])

Array(2219.8647, dtype=float32)

In [None]:
model.set_params([0.76,0.05,0.08,1])

In [None]:
import matplotlib.pyplot as plt
def l_t():
     return np.exp(np.random.uniform(-4, 0, 1))
def l_space():
    return np.exp(np.random.uniform(-1.5, 0.3, 1))
list1 = [l_t() for i in range(100)]
list2 = [l_space() for i in range(100)]
plt.figure()
plt.plot(np.sort(list1),"x")
plt.show()
plt.figure()

plt.plot(list2,"x")


In [None]:
sorted_results = sorted(model.results_list, key=lambda x: x.fun)
#extract the .x parts of the results
sorted_result_theta = np.array([x.x for x in sorted_results])
#extract the .fun parts of the results
sorted_result_fun = [x.fun for x in sorted_results]
#plt
print(sorted_result_theta.shape)
plt.scatter(sorted_result_theta[:70,3],sorted_result_fun[:70])




In [None]:
#[0.56290215, 0.0145229 , 0.10911952, 1.15137777]     351.3336181640625

In [19]:
grid_number = 101
x = np.linspace(0,2,grid_number).reshape(-1,1)
y = np.linspace(0,2,grid_number).reshape(-1,1)
t = np.linspace(0,4,grid_number).reshape(-1,1)
gx, gy = np.meshgrid(x,y)

X_2d = np.c_[gx.ravel(), gy.ravel()]

time_points = np.linspace(0,4,81)


In [20]:
mean = []
var = []
for i in range(len(time_points)):
    X_3d = np.c_[gx.ravel(), gy.ravel(), time_points[i]*np.ones(len(gx.ravel()))]
    mean_i, var_i = model.predict_u(X_3d)
    mean.append(mean_i)
    var.append(var_i)

In [None]:
#calculate the mean in parallel
from joblib import Parallel, delayed
def parallel_mean(i):
    X_3d = np.c_[gx.ravel(), gy.ravel(), time_points[i]*np.ones(len(gx.ravel()))]
    mean_i, var_i = model.predict_u(X_3d)
    return mean_i
mean = Parallel(n_jobs=4)(delayed(parallel_mean)(i) for i in range(len(time_points)))

In [None]:
from matplotlib.animation import FuncAnimation
mean = np.array(mean)
mean = mean.reshape(len(time_points),grid_number,grid_number)
u_matrix = model.raw_data[2]
print(mean.shape)
fig, ax  = plt.subplots(1,2)
fig.tight_layout()
def frame(i):
    fig.suptitle('t = ' + str(time_points[i]), fontsize=16)
    ax[0].clear()
    im = ax[0].imshow(mean[i,:,:].T, cmap='viridis', interpolation='nearest',vmin = -0.1, vmax = 0.1, extent=[0,2,0,2])
    ax[0].set_title("mean")
    ax[1].clear()
    im = ax[1].imshow(u_matrix[:,:,i], cmap='viridis', interpolation='nearest',vmin = -0.1, vmax = 0.1, extent=[0,2,0,2])
    ax[1].set_title("ground truth")
ani = FuncAnimation(fig, frame, frames=len(time_points), repeat=False)

ani.save('test3.mp4', writer='ffmpeg', fps=10)
plt.close()
from IPython.display import HTML

# Replace 'video.mp4' with the name of your MP4 file
video_path = 'video.mp4'

# Embed the video in the notebook
HTML(f'<video width="640" height="480" controls><source src="{"test3.mp4"}" type="video/mp4"></video>')

In [13]:
fig, ax  = plt.subplots(1,2,subplot_kw={"projection": "3d"})
fig.tight_layout()
def frame(i):
    fig.suptitle('t = ' + str(time_points[i]), fontsize=16)
    ax[0].clear()
    im = ax[0].plot_surface(gx,gy,mean[i,:,:].T, cmap='viridis')
    ax[0].set_title("mean")
    ax[0].set_zlim(-0.5,0.5)
    ax[1].clear()
    im = ax[1].plot_surface(gx,gy,u_matrix[:,:,i], cmap='viridis')
    ax[1].set_title("ground truth")
    ax[1].set_zlim(-0.5,0.5)
ani = FuncAnimation(fig, frame, frames=len(time_points), repeat=False)

ani.save('3dplot.gif', writer='ffmepg', fps=10)
plt.close()

MovieWriter ffmepg unavailable; using Pillow instead.


In [None]:
fig, ax = plt.subplots(2,10,figsize=(40,10))
ax = ax.flatten()
#split the ax
ax_1 = ax[:10]
ax_2 = ax[10:]
u_matrix = model.raw_data[2]
print(u_matrix.shape)
for i,t in zip(range(10),np.linspace(0, 4, len(u_matrix[0,0,:]))):
    
    im = ax_2[i].imshow(u_matrix[:,:,i*5].T, cmap='viridis',extent=[0,2,0,2],vmin = -0.1, vmax = 0.1)
    #plot the train points. we need to chekck for every t value and then plot the x,y coordinates
    #ax[i].scatter(x_train_u[t_train_u==i*5], y_train_u[t_train_u==i*5], color='red', marker='x', label='training points')
    ax_1[i].imshow(mean[i*5,:,:], cmap='viridis',extent=[0,2,0,2],vmin = -0.1, vmax = 0.1)

    #ax_1[i].set_xlabel('x')
    #ax_1[i].set_ylabel('y')
    ax_1[i].set_title('t = ' + str(np.round(time_points[i*4],4)), fontsize=20)
    #tight layout
    fig.tight_layout()



In [None]:
fig, ax = plt.subplots(2,10,figsize=(40,10))
ax = ax.flatten()
#split the ax
ax_1 = ax[:10]
ax_2 = ax[10:]
u_matrix = model.raw_data[2]
print(u_matrix.shape)
for i,t in zip(range(10),np.linspace(0, 4, len(u_matrix[0,0,:]))):
    
    im = ax_2[i].imshow(u_matrix[:,:,i*5].T, cmap='viridis',extent=[0,2,0,2],vmin = -1, vmax = 1.5)
    #plot the train points. we need to chekck for every t value and then plot the x,y coordinates
    #ax[i].scatter(x_train_u[t_train_u==i*5], y_train_u[t_train_u==i*5], color='red', marker='x', label='training points')
    ax_1[i].imshow(np.abs(mean[i*5,:,:]-u_matrix.T[i*5,:,:]), cmap='viridis',extent=[0,2,0,2],vmin = 0, vmax = 1)

    #ax_1[i].set_xlabel('x')
    #ax_1[i].set_ylabel('y')
    ax_1[i].set_title('t = ' + str(np.round(time_points[i*4],4)), fontsize=20)
    #tight layout
    fig.tight_layout()

In [None]:
X = model.X
import jax.numpy as jnp
def marg_log_likelihood(X, targets, params):
    K = k_uu(X, X, params) + 1e-6*np.eye(len(X))
    L = jnp.linalg.cholesky(K)
    alpha = jnp.linalg.solve(L.T, np.linalg.solve(L, targets))
    
    marg_log_likelihood = -1/2 * jnp.dot(targets.T,alpha) - np.sum(np.log(np.diagonal(L))) - len(X)/2 * np.log(2*np.pi)
    return marg_log_likelihood

from scipy.optimize import minimize
def minimize_log(X,targets):
    targets = targets.ravel()
    def marg_log_likelihood(params):

        K = k_uu(X, X, params)
        L = np.linalg.cholesky(K + 1e-6*np.eye(len(X)))
        alpha = np.linalg.solve(L.T, np.linalg.solve(L, targets))
    
        marg_log_likelihood = 1/2 * np.dot(targets.T,alpha) + np.trace(np.log(L)) + len(X)/2 * np.log(2*np.pi)
        return marg_log_likelihood
    return marg_log_likelihood
theta = minimize(minimize_log(X,model.u_train), x0 = [0.1, 0.1, 0.1, 0.1], 
               bounds=((1e-5, None), (1e-5, None),(1e-5, None),(1e-5, None)),
               method='L-BFGS-B')
print(theta.x)
print(theta.fun)

def posterior_distribution(X, targets,x_test, params ):
    K = k_uu(X, X, params) 
    K_s = k_uu(X, x_test,params)
    K_ss = k_uu(x_test, x_test,params)

    L = jnp.linalg.cholesky(K + 1e-6*np.eye(len(X)))
    alpha = jnp.linalg.solve(L.T, jnp.linalg.solve(L, targets))

    f_star = jnp.dot(K_s.T, alpha)
    v = jnp.linalg.solve(L, K_s)
    var_f_star = K_ss - jnp.dot(v.T, v)
    
    return f_star, var_f_star

x = np.linspace(0,2,20).reshape(-1,1)
y = np.linspace(0,2,20).reshape(-1,1)
t = np.linspace(0,4,20).reshape(-1,1)

gx, gy, gt = np.meshgrid(x,y,t)

X_3d = np.c_[gx.ravel(), gy.ravel(), gt.ravel()]
x_test = np.c_[x.ravel(), y.ravel(), t.ravel()]
print(X_3d.shape)

f_star, var_f_star = posterior_distribution(X, model.u_train, X_3d, theta.x)
f_star.shape

f_star = f_star.reshape(20,20,20)
#var_f_star = var_f_star.reshape(20,20,20)



In [None]:
from matplotlib.animation import FuncAnimation
fig, ax  = plt.subplots()
def frame(t):
    ax.clear()
    im = ax.imshow(f_star[:,:,t], cmap='viridis', interpolation='nearest',vmin = -5, vmax = 5)

ani = FuncAnimation(fig, frame, frames=30, repeat=False)

ani.save('test.mp4', writer='ffmpeg', fps=10)

In [None]:
from skopt import gp_minimize, forest_minimize
from skopt.space import Real
model.jitter = 1e-5
def gaussian_optimization(dictionary: dict):
    def objective_function(params):
        l_x, sigma_f_sq, l_t, alpha = params
        return model.log_marginal_likelohood(params).item()
    
    result = gp_minimize(objective_function,  
                        **dictionary)
    print(result.x, result.fun)
    return result

ranges = [Real(0, 0.5, name='l_x', prior="uniform"),
             Real(0, 1, name='sigma_f_sq', prior="uniform"),
             Real(0, 0.5, name='l_t', prior="uniform"),
             Real(0, 4, name='alpha', prior="uniform")]
bays_opt_dictionary = {'dimensions': ranges,
                        'n_calls': 30,
                        'n_initial_points': 20,
                        'random_state':2,
                        'verbose': True,
                        'noise': 1e-7                        
                        }
result = gaussian_optimization(bays_opt_dictionary)


In [None]:

from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

x,y,t = model.X[:,0], model.X[:,1], model.X[:,2]

ax.scatter(x, y, t)

ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('t')
plt.savefig("training_points.png",bbox_inches='tight')
plt.show()
