# Hands-on : PINN for Lotka-Volterra equations

#### First part : imports

In [1]:
#Imports
import torch
import random
import os
from numpy import genfromtxt
from tools import random_ranges
import numpy as np
from lib.pinn_auxloss_f import Pinn
import matplotlib.pyplot as plt
from scipy.integrate import odeint
import jax.numpy as jnp
import pandas as pd
import random as rd
from torch import nn

#Seed specification
random.seed(42)
torch.manual_seed(42)

ModuleNotFoundError: No module named 'torch'

In [4]:
#Generating a dataset
def acetate_overflow_model(
    t,
    alp=0.8,
    bet=0.4,
    delt=0.2,
    gam=0.6,
):
    def func(y, t):

        X,Y = [y[i] for i in range(len(y))]

        dXdt = X*(alp - bet*Y)

        dYdt = Y*(delt*X - gam)

        return np.array([dXdt, dYdt])

    y0 = [5., 3.] #Conditions initiales
    return odeint(func, y0, t)

t = np.linspace(0,10,30)
y = acetate_overflow_model(np.ravel(t))

##Adding noise in the dataset
def add_random_noise(array: np.ndarray, seed: int = None, noise_scale: float = 0.5) -> np.ndarray:
    if seed is not None:
        np.random.seed(seed)
    noise = np.random.rand(*array.shape)  # random floats in [0, 1)
    signs = np.random.choice([-1, 1], size=array.shape)  # random +/- 1
    return array + signs * noise * noise_scale
data=add_random_noise(y)

#Creating the auxiliary data – for T_init and T_f
data_aux=[torch.tensor([5.,2.]),
              torch.tensor(y[-1])]

In [15]:
# Defining the dictionaries

## Dictionary with the true value of parameters
ode_parameters_dict = {"alp":0.8,
                       "bet":0.4,
                       "delt":0.2,
                       "gam":0.6
                       }
## Dictionary with the parameter ranges
ode_parameter_ranges_dict = {"alp":(0.,1.),
                             "bet":(0.,1.),
                             "delt":(0.,1.),
                             "gam":(0.,1.)
                            }
## Dictionary with the std of variables
std_per_variable = np.std(data, axis=0)
variable_standard_deviations_dict = {"X":std_per_variable[0],
                                     "Y":std_per_variable[1]
                                    }

##Dictionary of residuals
ODE_residuals = {"ode_1" : 
                 lambda var_dict,d_dt_var_dict,value,min_var_dict,max_var_dict : 
                     d_dt_var_dict["X"] - var_dict["X"]*(value["alp"] - value["bet"]*var_dict["Y"]),
                 "ode_2" : 
                 lambda var_dict,d_dt_var_dict,value,min_var_dict,max_var_dict : 
                     d_dt_var_dict["Y"] - var_dict["Y"]*(value["delt"]*var_dict["X"] - value["gam"])
                }

#### Defining the inputs of the PINN

In [None]:
##Names of the variables associated with data
observables = ["X","Y"] 

##Dictionaries for the data and no-data variables (and associated data)
variable_data = {"X": data[:,0], "Y": data[:,1]} 
variable_no_data  = {}

##Time points specification
data_t = t

In [16]:
#List of the names of unknown parameters
parameter_names = ["alp",
                   "bet",
                   "delt",
                   "gam"]

#Specifying the ranges from the dictionary
ranges = random_ranges([ode_parameters_dict[key] for key in parameter_names],scale=20)
for i,name in enumerate(parameter_names):
    if name in ode_parameter_ranges_dict:
        ranges[i]= ode_parameter_ranges_dict[name]
        
#Specifying the constants, i.e. the true values of parameters
constants_dict = ode_parameters_dict

NameError: name 'random_ranges' is not defined

In [17]:
# Training parameters
epoch_number = 150000

# Optimizer parameters
optimizer_type = "Adam"
optimizer_hyperparameters = {"lr":1e-4, "betas":(0.9, 0.8)}

# Scheduler parameters
scheduler_hyperparameters = {"base_lr":1e-4,
                             "max_lr":1e-4,
                             "step_size_up":100,
                             "scale_mode":"exp_range",
                             "gamma":0.999,
                             "cycle_momentum":False}

#Specifying the guiding weighting of the PINN (i.e. the weights that will multiply the balancing method output)
residual_weights=[1,1]

# Loss balancing method
multiple_loss_method = "prior_losses"

#### Creating and training the PINN model

In [None]:
#Creating PINN
pinn_cell = Pinn(ode_residual_dict=ODE_residuals,
                 ranges=ranges,
                 data_t=data_t,
                 variables_data=variable_data,
                 variables_no_data=variable_no_data,
                 data_aux=data_aux_1mM,
                 parameter_names=parameter_names,
                 optimizer_type=optimizer_type,
                 optimizer_hyperparameters=optimizer_hyperparameters,
                 scheduler_hyperparameters=scheduler_hyperparameters,
                 constants_dict=constants_dict,
                 
                 #Loss balancing specification
                 multi_loss_method=multiple_loss_method,
                 residual_weights=residual_weights,
                 variable_fit_weights=None,
                 auxiliary_fit_weights=None,
                 
                 #SoftAdapt parameters
                 soft_adapt_beta=0.1,
                 soft_adapt_t=1,
                 soft_adapt_normalize=True,
                 soft_adapt_by_type=True,
                 soft_adapt_eps=10E-8,
                 soft_adapt_warming=-1,
                 
                 #Increments parametrisation
                 incr_residual_weight=20000,
                 increment=1E2,
                 
                 #Prior_losses parameters
                 prior_losses_t=100,
                 
                 #Wang parameters
                 wang_residual = True,
                 wang_t=1,
                 wang_alpha=0.9,
                 wang_epsilon=1E-8,
                 wang_warming=-1,
                 
                 #NN parameters
                 net_hidden=7,
                 activation_function=nn.Softplus(),
                 optuna=False)

In [None]:
# Training
r2_score, pred_variables, losses, variable_fit_losses, residual_losses, aux_losses, all_learned_parameters, learning_rates = pinn_cell.train(epoch_number) #epoch_number
X,Y = pred_variables

#### First outputs : loss, hyperparameters, ...

In [None]:
# Print and Plot learning rate
plt.figure(figsize=(5,3))
plt.plot(learning_rates[0:], color = 'teal',linewidth=4)
plt.grid(True)
plt.xlabel('Epochs',fontsize=15)
plt.ylabel('Learning rate',fontsize=15)

plt.tight_layout()

#To save the figure
fig_name = 'learning_rate'
plt.savefig(fig_name+'.png', format='png')

plt.show()

In [18]:
## Print and Plot Losses
print("Loss: ","%.5g" % losses[-1])
fig, axs = plt.subplots(1, 4, figsize=(15, 6))

axs[0].plot(losses[0:], color = 'teal',linewidth=4)
axs[0].grid(True)
axs[0].set_xlabel('Epochs',fontsize=15)
axs[0].set_ylabel('Loss',fontsize=15)
axs[0].set_xscale('log')
axs[0].set_yscale('log')

axs[1].plot(variable_fit_losses[0:], color = 'teal',linewidth=4)
axs[1].grid(True)
axs[1].set_xlabel('Epochs',fontsize=15)
axs[1].set_ylabel('Variable fit loss',fontsize=15)
axs[1].set_xscale('log')
axs[1].set_yscale('log')

axs[2].plot(residual_losses[0:], color = 'teal',linewidth=4)
axs[2].grid(True)
axs[2].set_xlabel('Epochs',fontsize=15)
axs[2].set_ylabel('Residual loss',fontsize=15)
axs[2].set_xscale('log')
axs[2].set_yscale('log')


axs[3].plot(aux_losses[0:], color = 'teal',linewidth=4)
axs[3].grid(True)
axs[3].set_xlabel('Epochs',fontsize=15)
axs[3].set_ylabel('Auxiliary loss',fontsize=15)
axs[3].set_xscale('log')
axs[3].set_yscale('log')

plt.tight_layout()

#To save the figure
fig_name = 'loss_short'
plt.savefig(fig_name+'.png', format='png')

plt.show()

NameError: name 'losses' is not defined

#### Outputs : predictions and performances

In [None]:
## Print and Plot R2 during optimisation 
print("r2 :","%.5g" % r2_score[-1])

plt.figure(figsize=(5,3))
plt.plot(r2_score, color = 'black',linewidth=4)
plt.grid(True)
plt.xlabel('Epochs',fontsize=15)
plt.ylabel('Params error',fontsize=15)
plt.xscale('log')
plt.yscale('log')


plt.tight_layout()

#To save the figure
fig_name = 'error_short'
plt.savefig(fig_name+'.png', format='png')

plt.show()

In [None]:
## Comparing parameters
learned_parameters=[pinn_cell.output_param_range(v,i).item() for (i,(k,v)) in enumerate(pinn_cell.ode_parameters.items())]
true_parameters=[ode_parameters_dict[key] for key in parameter_names]

plt.grid('true')
plt.plot([-1, 3], [-1, 3],color='black')
plt.scatter(true_parameters,learned_parameters)
#plt.xscale('log')
#plt.yscale('log')

cmap = plt.cm.get_cmap('viridis', len(ranges))  # Get a colormap with as many colors as there are ranges

# Map each range index to a color from the colormap
colors = [cmap(i) for i in range(len(ranges))]


for i, (true_val, learned_val) in enumerate(zip(true_parameters, learned_parameters)):
    plt.scatter(true_val, learned_val, s=70, color=colors[i], label=f'Range {i}' if i == 0 else "",zorder=3)

    # Also color the corresponding vertical line
    plt.vlines(x=true_val, ymin=ranges[i][0], ymax=ranges[i][1], colors=colors[i],zorder=2,linewidth=3)


min_value = min(r[0] for r in ranges)
max_value = max(r[1] for r in ranges)
plt.ylim([-1,3])
plt.xlim([-1,3])

plt.xlabel('True parameters',fontsize=20)
plt.ylabel('Learned parameters',fontsize=20)

plt.tight_layout()

#To save the figure
fig_name = 'params_short'
plt.savefig(fig_name+'.png', format='png')

plt.show()

In [None]:
## Percentage error on parameters
err=np.array([(abs(true_parameters[i]-learned_parameters[i])/true_parameters[i])*100 for i in range(len(true_parameters))])
print("percentage error", np.mean(err),"\n")

## AIC
AIC = 2*(20+7*20+20)+2*(losses[-1]) #Formula to adjust on the number of parameters
print("AIC",AIC,"\n")

## R2_scores
from sklearn.metrics import r2_score
R2_scores_train_data=[r2_score(data[:,0],X.detach().numpy()),r2_score(data[:,1],Y.detach().numpy())]
print("R2_scores_train_data",R2_scores_train_data,"\n")

R2_scores_true_data=[r2_score(y[:,0],X.detach().numpy()),r2_score(y[:,1],Y.detach().numpy())]
print("R2_scores_gene_data",R2_scores_gene_data,"\n")

In [19]:
## Plot the predicted variables
fig, axs = plt.subplots(1, 2, figsize=(15, 6))

axs[0].plot(t, y[:,0], linestyle='dashed', label='X_true', color='b')
axs[0].plot(data_t, X.detach().numpy(), label='X_pred', color='b')
axs[0].plot(data_t, data[:,0], 'o', label='X_data', color='b')
axs[0].set_title('X')
axs[0].set_xlabel('t(h)')
axs[0].set_ylabel('X')
axs[0].legend()
axs[0].grid(True)

axs[1].plot(t, y[:,1], linestyle='dashed', label='Y_true', color='g')
axs[1].plot(data_t,Y.detach().numpy(), label='Y_pred', color='g')
axs[1].plot(data_t, data[:,1], 'o', label='Y_data', color='g')
axs[1].set_title('Y')
axs[1].set_xlabel('t (h)')
axs[1].set_ylabel('Y')
axs[1].legend()
axs[1].grid(True)

plt.tight_layout()

fig_name = 'prediction_plot'
plt.savefig(fig_name+'.png', format='png')

plt.show()

NameError: name 'plt' is not defined

In [None]:
## Integrate with guessed parameters

learned = ode_parameters_dict.copy()
learned = learned | dict(zip(parameter_names, learned_parameters))
alp, bet, delt, gam = learned.values()
gene_data=acetate_overflow_model(np.ravel(t), alp, bet, delt, gam)
X_learned,Y_learned= gene_data[:,0],gene_data[:,1]

from tools import ssr_error

# Plot the solved variables
fig, axs = plt.subplots(1, 2, figsize=(15, 6))

axs[0].plot(t, y[:,0], linestyle='dashed', label='X_true', color='b')
axs[0].plot(t, X_learned, label='X_learned', color='b')
axs[0].plot(data_t, data[:,0], 'o', label='X_data', color='b')
axs[0].set_title('X')
axs[0].set_xlabel('t (h)')
axs[0].set_ylabel('X')
axs[0].legend()
axs[0].grid(True)

axs[1].plot(t, y[:,1], linestyle='dashed', label='Y_true', color='g')
axs[1].plot(t, Y_learned, label='Y_learned', color='g')
axs[1].plot(data_t, data[:,1], 'o', label='Y_data', color='g')
axs[1].set_title('Y')
axs[1].set_xlabel('t (h)')
axs[1].set_ylabel('Y')
axs[1].legend()
axs[1].grid(True)

plt.tight_layout()

#To save the figure
fig_name = 'predicted_variables'
plt.savefig(fig_name+'.png', format='png')

plt.show()

In [None]:
## Values of parameters
variable_res = {"X":X_learned,
                "Y":Y_learned}

error = ssr_error(standard_deviations_dict=variable_standard_deviations_dict, observables=observables, variable_data=variable_data, variable_res=variable_res)
print("Sum of squared residuals error: " + str(error)+" \n")

## R2 scores
R2_scores_vs_data=[r2_score(y[:,0],X_learned),r2_score(y[:,1],Y_learned)]
print("R2_scores_vs_data",R2_scores_vs_data,"\n")

R2_scores_gene_vs_learned=[r2_score(data[:,0],X_learned),r2_score(data[:,1],Y_learned)]
print("R2_scores_gene_vs_learned",R2_scores_gene_vs_learned,"\n")

In [None]:
# Print and compare parameters
from tools import param_error_percentages #lib.

print("Learned parameters :")
for i in range(len(learned_parameters)):
    print(parameter_names[i], ":", learned_parameters[i])

print("\nTrue parameters :")
for i in range(len(learned_parameters)):
    print(parameter_names[i], ":", true_parameters[i])

print("\nParameters errors :")
for i in range(len(learned_parameters)):
    print(parameter_names[i], ":", param_error_percentages(true_parameters,learned_parameters)[i])