# Training Notebook for the Duffing Equation

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# imports
import torch
import matplotlib.pyplot as plt
import numpy as np
import sys
import json
import os
from scipy.integrate import solve_ivp
from tqdm.auto import trange

# Add parent directory to sys.path
from pathlib import Path
current_path = Path.cwd()
parent_dir = current_path.parent.parent
sys.path.append(str(parent_dir))

# Import necessary modules
from src.train import run_model, run_model_non_linear
from src.utils_plot import plot_loss_and_all_solution, plot_head_loss, plot_loss_and_single_solution
from src.load_save import save_model

torch.autograd.set_detect_anomaly(False)
torch.autograd.profiler.profile(False)
torch.autograd.profiler.emit_nvtx(False)

In [None]:
def check_versions_and_device():
  # set the device to the GPU if it is available, otherwise use the CPU
  current_dev = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  !nvidia-smi
  return current_dev

# set a global device variable to use in code
dev = check_versions_and_device()
print(dev)

##  1) Multi Head Training (linear form)

### Select equation parameter

-   number of head
-   stiffness parameter $\alpha$
-   force function $f$
-   initiales condition $IC$

In [None]:
num_heads = 12
equation_name = "Duffing"

np.random.seed(42)
# generate the training stiffness parameter alpha
alpha_bounds = (5, 20)
def generate_alpha(k, alpha_bounds = alpha_bounds):
  alpha_list = np.zeros(k)
  for i in range(k):
    alpha_list[i] = np.random.uniform(alpha_bounds[0], alpha_bounds[1])
  return alpha_list
alpha_list = generate_alpha(num_heads)
def get_A(alpha):
    return torch.tensor([[0., -1.], [0.1, alpha]], device=dev).double() 
A_list = [get_A(i)for i in alpha_list]

IC_list = [torch.tensor([[1.], [0.5]], device=dev).double() for _ in range(num_heads)]
# uncomment the above line to use random IC on all head
def random_IC(x_bound=[1, 3], y_bound=[0, 2]):
    ICx = np.random.uniform(x_bound[0], x_bound[1], 1)
    ICy = np.random.uniform(y_bound[0], y_bound[1], 1)
    return torch.tensor([ICx, ICy], device=dev)
#IC_list = [random_IC() for i in range(num_heads)]

force_list = [
    lambda t: torch.cat([torch.zeros(len(t), device=dev).unsqueeze(1), torch.cos(t).unsqueeze(1)], dim=1).double()
    if not isinstance(t, (float, int))
    else np.array([0, np.cos(t)]).T
    for _ in range(num_heads)
]

### Select training parameter

-   range of training $x_{range}$
-   activation function
-   number of hidden layer
-   number of equation
-   number of iterations
-   learning rate $lr$
-   sample size during epoch
-   gradient decay

In [None]:
x_range = [0, 10]
activation = "silu"
hid_lay = list(np.array([128, 128, 256, 512]))
hid_lay = list(np.array([128, 128, 132]))
num_equations = 2
iterations = 100
sample_size = 200
lr = 1e-4
decay = True

### Train the multi head model

In [None]:
verbose = True
# run model which has two non-coupled equations
loss_hist, trained_model, model_time = run_model(iterations=iterations, x_range=x_range, lr=lr,
                                                     A_list=A_list, IC_list=IC_list, force=force_list,
                                                     hid_lay=hid_lay, activation=activation,
                                                     num_equations=num_equations, num_heads=num_heads,
                                                     sample_size = sample_size, decay=decay, dev=dev,
                                                     verbose=verbose, true_functs=None, save=False)

# date tag to save
from datetime import datetime
now = datetime.now()
# Format the date and time as a string in the format 'mmddhhmm'
formatted_datetime = now.strftime('%m%d%H%M')
# Convert the formatted string to an integer
formatted_datetime_int = int(formatted_datetime)

### Plot training outcome

In [None]:
# function to numerically compute the solution to any set of two coupled, linear first-order ODES
def double_coupled_equation(t, y, A, force):
    return np.array([force(t)[0] - A[0][1] * y[1] - A[0][0] * y[0],
                     force(t)[1] - A[1][0] * y[0] - A[1][1] * y[1]])

numerical_sol_fct = lambda x, v, A, force: (solve_ivp(double_coupled_equation, [x_range[0], x_range[1]],
                                                    v.squeeze(), args=(A, force), t_eval=x.squeeze(), method="Radau").y)

plot_loss_and_all_solution(x_range=x_range, true_functs=numerical_sol_fct,
                           trained_model=trained_model, IC_list=IC_list, A_list=A_list,
                           force=force_list, train_losses=loss_hist, device=dev)
plot_head_loss(loss_hist["head"], alpha_list)

### Save model and training history

In [None]:
model_name = "base_model_2"
save_model(trained_model, formatted_datetime_int, equation_name, model_name,
           x_range, iterations, hid_lay, num_equations, num_heads, A_list,
           IC_list, force_list, alpha_list, loss_hist)

## 2) Multi Head Training (nonlinear form)

### Select equation parameter

-   number of head
-   stiffness parameter $\alpha$
-   initiales condition $IC$
-   $\beta$ non linear parameter
-   nonlinear equation

In [None]:
num_heads = 4
equation_name = "Duffing"

np.random.seed(42)
# generate the training stiffness parameter alpha
alpha_bounds = (1, 10)
def generate_alpha(k, alpha_bounds = alpha_bounds):
  alpha_list = np.zeros(k)
  for i in range(k):
    alpha_list[i] = np.random.uniform(alpha_bounds[0], alpha_bounds[1])
  return alpha_list
alpha_list = generate_alpha(num_heads)

beta = 0.5

def equation(t, y, alpha, beta=beta):
    if isinstance(y, torch.Tensor):
      yp = torch.zeros_like(y)
      force = torch.cos(t)
    elif isinstance(y, np.ndarray):
      yp = np.zeros_like(y)
      force = np.cos(t)
    yp[..., 0] = y[..., 1]
    yp[..., 1] = -0.1*y[..., 0] - alpha*y[..., 1] - beta*y[..., 0]**3 + force
    return yp
equation_list = [lambda t, y, Alpha=alpha: equation(t, y, Alpha) for alpha in alpha_list]


IC_list = [torch.tensor([[1.], [0.5]], device=dev).double() for _ in range(num_heads)]
# uncomment the above line to use random IC on all head
def random_IC(x_bound=[1.5, 2.5], y_bound=[0, 0]):
    ICx = np.random.uniform(x_bound[0], x_bound[1], 1)
    ICy = np.random.uniform(y_bound[0], y_bound[1], 1)
    return torch.tensor([ICx, ICy], device=dev)
#IC_list = [random_IC() for i in range(num_heads)]

### Select training parameter

-   range of training $x_{range}$
-   activation function
-   number of hidden layer
-   number of equation
-   number of iterations
-   learning rate $lr$
-   sample size during epoch
-   gradient decay

In [None]:
x_range = [0, 10]
activation = "silu"
hid_lay = list(np.array([128, 128, 256, 512]))
reparametrization = True
num_equations = 2
iterations = 100
sample_size = 200
lr = 1e-4
decay = True

### Train the multi head model

In [None]:
verbose = True
# run model which has two non-coupled equations
loss_hist, trained_model, model_time = run_model_non_linear(iterations=iterations, x_range=x_range, lr=lr,
                                                            equation_list=equation_list, IC_list=IC_list,
                                                            hid_lay=hid_lay, activation=activation,
                                                            num_equations=num_equations, num_heads=num_heads,
                                                            sample_size = sample_size, decay=decay, dev=dev, verbose=verbose,
                                                            true_functs=None, reparametrization=reparametrization)

# date tag to save
from datetime import datetime
now = datetime.now()
# Format the date and time as a string in the format 'mmddhhmm'
formatted_datetime = now.strftime('%m%d%H%M')
# Convert the formatted string to an integer
formatted_datetime_int = int(formatted_datetime)

### Plot training outcome

In [None]:
numerical_sol_fct = lambda x, v, alpha, beta: (solve_ivp(equation, [x_range[0], x_range[1]],
                                                    v.squeeze(), args=(alpha, beta), t_eval=x.squeeze(), method="Radau").y.T)
numerical_sol_list = [lambda x, IC=ic.detach().cpu().numpy(), Alpha=alpha, beta=beta: numerical_sol_fct(x, IC, Alpha, beta) for ic, alpha in zip(IC_list, alpha_list)]

plot_loss_and_all_solution(x_range=x_range, true_functs=numerical_sol_list,
                           trained_model=trained_model, IC_list=IC_list,
                           A_list=None, force=None, train_losses=loss_hist,
                           device=dev, equation_list=equation_list, reparametrization=reparametrization)
plot_head_loss(loss_hist["head"], alpha_list)

### Save model and training history

In [None]:
model_name = "nonlinear_repara"
A_list=[torch.tensor([np.nan])]
force_list = [torch.tensor([np.nan])]
save_model(trained_model, formatted_datetime_int, equation_name, model_name,
           x_range, iterations, hid_lay, num_equations, num_heads, A_list,
           IC_list, force_list, alpha_list, loss_hist)

## 3) Single Head Training (nonlinear form)

### Select equation parameter

-   stiffness parameter $\alpha$
-   initiales condition $IC$
-   $\beta$ non linear parameter
-   nonlinear equation

In [None]:
num_heads = 1
alpha_list = [2, 20, 30, 40]
beta = 0.5
equation_list = [lambda t, y, Alpha=alpha: equation(t, y, Alpha) for alpha in alpha_list]
IC_list = [torch.tensor([[1.], [0.5]], device=dev).double() for _ in alpha_list]

### Select training parameter

-   number of iterations
-   learning rate $lr$

In [None]:
lr_list = [0.0001, 0.00008, 0.00004, 0.00001]
iterations_list = [20000, 30000, 40000, 50000]
iterations_list = [5000, 20, 30, 40]

### Train single head model

In [None]:
solution_PINNS = []
rng = np.random.default_rng()
t_eval = torch.arange(x_range[0], x_range[1], 0.001, requires_grad=True, device=dev).double()
t_eval = t_eval[np.concatenate(([0], rng.choice(range(1, len(t_eval)), size=512 - 1, replace=False)))]
t_eval = t_eval.reshape(-1, 1)
t_eval, _ = t_eval.sort(dim=0)

for i in trange(len(alpha_list)):
    if i==0:
        loss_history, trained_model, _ = run_model_non_linear(iterations=iterations_list[i], x_range=x_range, lr=lr_list[i],
                                                               equation_list=[equation_list[i]], IC_list=[IC_list[i]],
                                                               hid_lay=hid_lay, activation=activation,
                                                               num_equations=num_equations, num_heads=num_heads,
                                                               sample_size = sample_size, decay=decay, dev=dev, verbose=True,
                                                               true_functs=None, reparametrization=reparametrization)
    else:
        _, _, _ = run_model_non_linear(iterations=iterations_list[i], x_range=x_range, lr=lr_list[i],
                                                               equation_list=[equation_list[i]], IC_list=[IC_list[i]],
                                                               hid_lay=hid_lay, activation=activation,
                                                               num_equations=num_equations, num_heads=num_heads,
                                                               sample_size = sample_size, decay=decay, dev=dev, verbose=False,
                                                               true_functs=None, reparametrization=reparametrization)
    solution_PINNS.append(trained_model(t_eval, reparametrization=reparametrization)[0])

### Plot training of the first model

In [None]:
numerical_sol_list = [lambda x, IC=ic.detach().cpu().numpy(), Alpha=alpha, beta=beta: numerical_sol_fct(x, IC, Alpha, beta) for ic, alpha in zip(IC_list, alpha_list)]

plot_loss_and_single_solution(x_range=x_range, true_functs=numerical_sol_list,
                              trained_model=trained_model, IC_list=IC_list, A_list=None,
                              force=None, train_losses=loss_history, equation_list=equation_list, 
                              reparametrization=reparametrization, device=dev)

### Plot MAE and MaxAE results over several alpha value

In [None]:
mae_y1 = []
mae_y2 = []
maxae_y1 = []
maxae_y2 = []

for i in range(len(alpha_list)):
    true_funct = lambda x: numerical_sol_fct(x, v=IC_list[0].detach().cpu().numpy(), alpha=alpha_list[i], beta=beta)
    pinns = solution_PINNS[i].detach().cpu().numpy()
    numerical = true_funct(t_eval.detach().cpu().numpy())
    absolute_error = np.abs(pinns[:, 0, :] - numerical)
    mae_y1.append(absolute_error.mean(0)[0])
    mae_y2.append(absolute_error.mean(0)[1])
    maxae_y1.append(absolute_error.max(0)[0])
    maxae_y2.append(absolute_error.max(0)[1])

fig, ax = plt.subplots(1, figsize=(13, 4))

ax.plot(alpha_list, mae_y1, "-o", label="$MAE$ ${y_1}$", linewidth=2, markersize=6)
ax.plot(alpha_list, mae_y2,"-o", label="$MAE$ ${y_2}$", linewidth=2, markersize=6)
ax.plot(alpha_list, maxae_y1, "-x", color="#1f77b4", label="$MaxAE$  ${y_1}$", linewidth=2, markersize=8)
ax.plot(alpha_list, maxae_y2, "-x", color="#ff7f0e", label="$MaxAE$ ${y_2}$", linewidth=2, markersize=8)

ax.set_yscale("log")
ax.set_title(r"Mean and Max Absolute Error with increasing Stiffness", fontsize=20)
ax.set_xlabel(r'Stiffness parameter $\alpha$', fontsize=16)
ax.set_ylabel('Absolute Error', fontsize=16)
ax.set_xticks(alpha_list, [rf"$\alpha$={i}" for i in alpha_list])
ax.set_yticks([0.1, 0.01, 0.001, 0.0001],
              [r"$10^{-1}$", r"$10^{-2}$", r"$10^{-3}$", r"$10^{-4}$"])
ax.grid()
ax.tick_params(axis='x', labelsize=14)
ax.tick_params(axis='y', labelsize=16)
ax.legend(loc='best', fontsize=14)

### Save MAE and MaxAE results over several alpha value

In [None]:
history = {}
history["alpha_list"] = alpha_list
history["mae_y1"] = mae_y1
history["mae_y2"] = mae_y2
history["maxae_y1"] = maxae_y1
history["maxae_y2"] = maxae_y2

current_path = Path.cwd().parent.parent
path = os.path.join(current_path, "result_history")
with open(os.path.join(path, "Duffing_Error_Trained.json"),  "w") as fp:
    json.dump(history, fp)