# Main part of the code

#### Importing libraries

In [1]:
#Importing necessary libraries
!pip install optuna
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
from torch.autograd.functional import jacobian as jac
from torch.func import jacfwd, vmap
from csv import writer
import optuna #for hyperparameter search



In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import sys
sys.path.append('/content/drive/MyDrive/elastica_davide')

In [4]:
from scripts.createDataset import getData, getDataLoaders
from scripts.utils import getBCs
from scripts.network import approximate_curve
from scripts.training import trainModel
from scripts.evaluateModel import plotTestResults

In [5]:
#We do this so Pytorch works in double precision
torch.set_default_dtype(torch.float32)

In [6]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


#### Importing and organising data

We can create the dataset as follows:

Input : (q_1,q_2,v_1,v_2,s)
Output : (q(s),v(s))

With the trajectories that we have available, we can hence generate a dataset of size
N_elements+1 x N_samples

In [7]:
#Importing and shuffling the trajectories

trajectories = np.loadtxt("/content/drive/MyDrive/elastica_davide/data/generated_data.txt")
number_samples,number_components = trajectories.shape
#Randomize the order of the trajectories
indices = np.random.permutation(len(trajectories))
trajectories = trajectories[indices]

number_elements = int(number_components/4)-1
data_train, data_test = getData(number_elements,number_samples,trajectories)

#### Training the neural network iterating over the training batches

In [8]:
def define_model(trial):
    normalize = trial.suggest_categorical("normalize",[True,False])
    netarch = trial.suggest_categorical("networkarch",[0, 1, 2])
    if netarch == 0:
      is_deeponet = True
      is_res = False
    elif netarch == 1:
      is_deeponet = False
      is_res = True
    else:
      is_deeponet = False
      is_res = False
    act = trial.suggest_categorical("act",['tanh','sigmoid','sin','swish'])
    nlayers = trial.suggest_int("n_layers", 1, 4)
    hidden_nodes = trial.suggest_int("hidden_nodes", 10, 100)
    correct_functional = trial.suggest_categorical("correct_functional",[True,False])

    model = approximate_curve(normalize, act, nlayers, hidden_nodes, correct_functional, is_res, is_deeponet)
    return model

In [9]:
def objective(trial):

    torch.manual_seed(1)
    np.random.seed(1)

    # Generate the model.
    model = define_model(trial)
    model.to(device);

    lr = trial.suggest_float("lr", 1e-4 , 1e-1, log=True)
    weight_decay = trial.suggest_float("weight_decay",0,5e-4)
    #optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
    #optimizer = getattr(torch.optim, optimizer_name)(model.parameters(), lr=lr, weight_decay=weight_decay)

    #optimizer = torch.optim.LBFGS(model.parameters(),lr=lr, max_iter=100,tolerance_grad=1.e-10,
    #                                  tolerance_change=1.e-10, history_size=100, line_search_fn='strong_wolfe')
    optimizer = torch.optim.Adam(model.parameters(),lr=lr,weight_decay=weight_decay)

    criterion = nn.MSELoss()


    batch_size = 32 # trial.suggest_int("batch_size", 32, 64)

    trainloader, testloader = getDataLoaders(batch_size,data_train,data_test)

    print("Current test with :\n\n")
    for key, value in trial.params.items():
      print("    {}: {}".format(key, value))
    print("\n\n")

    epochs = 100
    loss = trainModel(number_elements,device,model,criterion,optimizer,epochs,trainloader)
    test_error = 100
    if not torch.isnan(loss):
      model.eval();

      def eval_model(s,q1,q2,v1,v2):
        s_ = torch.tensor([[s]],dtype=torch.float32).to(device)
        q1 = torch.from_numpy(q1.astype(np.float32)).reshape(1,-1).to(device)
        q2 = torch.from_numpy(q2.astype(np.float32)).reshape(1,-1).to(device)
        v1 = torch.from_numpy(v1.astype(np.float32)).reshape(1,-1).to(device)
        v2 = torch.from_numpy(v2.astype(np.float32)).reshape(1,-1).to(device)
        return model(s_,q1,q2,v1,v2).detach().cpu().numpy()[0]

      bcs = getBCs(trajectories)
      q1 = bcs["q1"]
      q2 = bcs["q2"]
      v1 = bcs["v1"]
      v2 = bcs["v2"]

      xx = np.linspace(0, 1, number_elements+1)
      res = np.zeros((50,2,len(xx)))

      for j in range(50):
          for i in range(len(xx)):
              res[j,:,i] = eval_model(xx[i],q1[j],q2[j],v1[j],v2[j])

      q_x_pred_torch = torch.from_numpy(res[:,0].astype(np.float32))
      q_x_true_torch = torch.from_numpy(trajectories[:50,np.arange(0,number_components,4)].astype(np.float32))

      q_y_pred_torch = torch.from_numpy(res[:,1].astype(np.float32))
      q_y_true_torch = torch.from_numpy(trajectories[:50,np.arange(1,number_components,4)].astype(np.float32))

      test_error = criterion(q_x_pred_torch,q_x_true_torch).item() + criterion(q_y_pred_torch,q_y_true_torch).item()

    #Saving the obtained results
    if trial.number == 0:
        labels = []
        for lab, _ in trial.params.items():
            labels.append(str(lab))
        labels.append("Test error")
        with open("/content/drive/MyDrive/elastica_davide/savedResults/results.csv", "a") as f_object:
            writer_object = writer(f_object)
            writer_object.writerow(labels)
            f_object.close()

    results = []
    for _, value in trial.params.items():
        results.append(str(value))

    results.append(test_error)

    with open("/content/drive/MyDrive/elastica_davide/savedResults/results.csv", "a") as f_object:
        writer_object = writer(f_object)
        writer_object.writerow(results)
        f_object.close()

    return test_error

In [None]:
optuna_study = input("Do you want to do hyperparameter test? Type Yes or No ")
params = {}
if optuna_study=="Yes":
    optuna_study = True
else:
    optuna_study = False
if optuna_study:
    study = optuna.create_study(direction="minimize",study_name="Euler Elastica")
    study.optimize(objective, n_trials=100)
    print("Study statistics: ")
    print("  Number of finished trials: ", len(study.trials))

    params = study.best_params

Do you want to do hyperparameter test? Type Yes or No Yes


[I 2023-10-14 19:27:13,693] A new study created in memory with name: Euler Elastica


Current test with :


    normalize: True
    networkarch: 0
    act: sigmoid
    n_layers: 4
    hidden_nodes: 96
    correct_functional: False
    lr: 0.0004001351626389534
    weight_decay: 4.107793070450294e-05



Loss [2](epoch):  0.3932371437549591
Loss [3](epoch):  0.45897868275642395
Loss [4](epoch):  0.4571979343891144
Loss [5](epoch):  0.4476149082183838
Loss [6](epoch):  0.36708560585975647
Loss [7](epoch):  0.24484089016914368
Loss [8](epoch):  0.21626968681812286
Loss [9](epoch):  0.15656474232673645
Loss [10](epoch):  0.12319495528936386
Loss [11](epoch):  0.06442251801490784
Loss [12](epoch):  0.04837831109762192
Loss [13](epoch):  0.033051274716854095
Loss [14](epoch):  0.025655072182416916
Loss [15](epoch):  0.03498153015971184
Loss [16](epoch):  0.02635006234049797
Loss [17](epoch):  0.038770489394664764
Loss [18](epoch):  0.027440451085567474
Loss [19](epoch):  0.013566236943006516
Loss [20](epoch):  0.022772496566176414
Loss [21](epoch):  0.029205340892076492
Loss [

[I 2023-10-14 19:28:09,237] Trial 0 finished with value: 0.038577035011257976 and parameters: {'normalize': True, 'networkarch': 0, 'act': 'sigmoid', 'n_layers': 4, 'hidden_nodes': 96, 'correct_functional': False, 'lr': 0.0004001351626389534, 'weight_decay': 4.107793070450294e-05}. Best is trial 0 with value: 0.038577035011257976.


#### Plot the results for the best combination of hyperparameters

In [None]:
if params=={}:
    print("No parameters have been specified. Let's input them:\n\n")
    normalize = input("Normalize is True or False? ")=="True"
    is_deeponet = input("Is DeepONet True or False? ")=="True"
    if is_deeponet:
      is_res = False
    else:
      is_res = input("Is_res True or False? ")=="True"
    act = input("What activation function to use? Choose among 'sin', 'sigmoid', 'swish', 'tanh'")
    nlayers = int(input("How many layers do you want the network to have? "))
    hidden_nodes = int(input("How many hidden nodes do you want the network to have? "))
    correct_functional = input("Type True if you want to impose by design the BCs, False otherwise: ")
    lr = float(input("What learning rate do you want to use? "))
    weight_decay = float(input("What weight decay do you want to use? "))

    params = {"normalize": normalize,
              "act": act,
              "n_layers":nlayers,
              "hidden_nodes":hidden_nodes,
              "correct_functional":correct_functional,
              "lr":lr,
              "weight_decay":weight_decay,
              "is_res":is_res,
              "is_deeponet":is_deeponet}

In [None]:
def define_best_model():
    normalize = params["normalize"]
    act = params["act"]
    nlayers = params["n_layers"]
    hidden_nodes = params["hidden_nodes"]
    correct_functional = params["correct_functional"]
    is_res = params["is_res"]
    is_deeponet = params["is_deeponet"]

    model = approximate_curve(normalize, act, nlayers, hidden_nodes, correct_functional, is_res, is_deeponet)

    return model

In [None]:
model = define_best_model()
model.to(device);

In [None]:
weight_decay = params["weight_decay"]
lr = params["lr"]
#optimizer = torch.optim.LBFGS(model.parameters(),lr=lr, max_iter=100,tolerance_grad=1.e-10,
#                                  tolerance_change=1.e-10, history_size=100, line_search_fn='strong_wolfe')
optimizer = torch.optim.Adam(model.parameters(),lr=lr,weight_decay=weight_decay)

criterion = nn.MSELoss()


batch_size = 32

trainloader, testloader = getDataLoaders(batch_size,data_train,data_test)
epochs = 100
loss = trainModel(number_elements,device,model,criterion,optimizer,epochs,trainloader)

In [None]:
model.eval();
plotTestResults(model,device,number_elements,number_components,trajectories)

In [None]:
def eval_model(model,device,s,q1,q2,v1,v2):
    s_ = torch.tensor([[s]],dtype=torch.float32).to(device)
    q1 = torch.from_numpy(q1.astype(np.float32)).reshape(1,-1).to(device)
    q2 = torch.from_numpy(q2.astype(np.float32)).reshape(1,-1).to(device)
    v1 = torch.from_numpy(v1.astype(np.float32)).reshape(1,-1).to(device)
    v2 = torch.from_numpy(v2.astype(np.float32)).reshape(1,-1).to(device)
    return model(s_,q1,q2,v1,v2).detach().cpu().numpy()[0]

#This returns the approximation vector q'(s) in R^2 associated to the boundary conditions (q1,q2,v1,v2)
#and correspondent to position s in the interval [0,1]
# TODO : Understand why this is not providing a good fit of the derivative
def eval_derivative_model(model,device,s,q1,q2,v1,v2):
    s_ = torch.tensor([[s]],dtype=torch.float32).to(device)
    q1 = torch.from_numpy(q1.astype(np.float32)).reshape(1,-1).to(device)
    q2 = torch.from_numpy(q2.astype(np.float32)).reshape(1,-1).to(device)
    v1 = torch.from_numpy(v1.astype(np.float32)).reshape(1,-1).to(device)
    v2 = torch.from_numpy(v2.astype(np.float32)).reshape(1,-1).to(device)

    q = lambda s : model(s,q1,q2,v1,v2)[0]
    v = lambda s : (jacfwd(q))(s)[:,:,0].T
    return v(s_).detach().cpu().numpy().reshape(-1)

def eval_derivative_model(model,device,s,q1,q2,v1,v2):
    s_ = torch.tensor([[s]],dtype=torch.float32).to(device)
    q1 = torch.from_numpy(q1.astype(np.float32)).reshape(1,-1).to(device)
    q2 = torch.from_numpy(q2.astype(np.float32)).reshape(1,-1).to(device)
    v1 = torch.from_numpy(v1.astype(np.float32)).reshape(1,-1).to(device)
    v2 = torch.from_numpy(v2.astype(np.float32)).reshape(1,-1).to(device)

    q = lambda s : model(s,q1,q2,v1,v2)[0]
    v = lambda s : (jacfwd(q))(s)[:,:,0].T
    return v(s_).detach().cpu().numpy().reshape(-1)

def plotTestResults(model,device,number_elements,number_components,trajectories):

    criterion = nn.MSELoss()

    bcs = getBCs(trajectories)
    q1 = bcs["q1"]
    q2 = bcs["q2"]
    v1 = bcs["v1"]
    v2 = bcs["v2"]

    xx = np.linspace(0, 1, number_elements+1)
    res = np.zeros((len(trajectories),2,len(xx)))
    res_derivative = np.zeros_like(res)

    for j in range(20):
        for i in range(len(xx)):
            res[j,:,i] = eval_model(model,device,xx[i],q1[j],q2[j],v1[j],v2[j])
            res_derivative[j,:,i] = eval_derivative_model(model,device,xx[i],q1[j],q2[j],v1[j],v2[j])

    print(res[0])
    print(res_derivative[0])


In [None]:
plotTestResults(model,device,number_elements,number_components,trajectories)

In [None]:
print(trajectories[0])