In [1]:
import numpy as np
import torch
from torch import nn, optim, autograd
from torch.nn import functional as F
from pyDOE import lhs
import scipy.io
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.gridspec as gridspec
%matplotlib inline
from mpl_toolkits.axes_grid1 import make_axes_locatable

from utils_training import *

#Paper reproduction
torch.manual_seed(1234)
torch.cuda.manual_seed(1234)
np.random.seed(1234)

In [None]:
N_train = 1000
N_bound = 400
N_init = 200

# Input  [x,t]
# Output [theta, w]

la = np.array([np.pi,1])
lb = np.array([0,0])

traindata = lb+(la-lb)*lhs(2,N_train)
x_inside = traindata[:,0:1]
t_inside = traindata[:,1:2]

x_inside = numpy_to_tensor(x_inside, var_name="x_inside", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)
t_inside = numpy_to_tensor(t_inside, var_name="t_inside", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)

x_bound = lb[0]+(la[0]-lb[0])*lhs(1,N_bound)
x_bound_t_zero = np.zeros_like(x_bound)
t_bound = lb[1]+(la[1]-lb[1])*lhs(1,N_init)


x_bound = numpy_to_tensor(x_bound, var_name="x_bound", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)
x_bound_t_zero = numpy_to_tensor(x_bound_t_zero, var_name="x_bound_t_zero", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)
t_bound = numpy_to_tensor(t_bound, var_name="t_bound", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)

In [3]:
# External force
def g(data):
    x = data[:,0:1]
    t = data[:,1:2]
    return np.cos(t) - (np.pi / 2) * np.sin(x) * np.cos(t)

# Exact solution
def exact_theta(data):
    x = data[:,0:1]
    t = data[:,1:2]
    return (np.pi / 2 * np.cos(x) + (x - np.pi / 2)) * np.cos(t)

def exact_w(data):
    x = data[:,0:1]
    t = data[:,1:2]
    return (np.pi / 2) * np.sin(x) * np.cos(t)

In [None]:
random_seed = 1234
np.random.seed(random_seed)

x_values = [0.2, 0.8, 1.8, 2.6, 3.0]
observe_number_per_x = 10

observe_data = []
for x in x_values:
    t_for_x = lhs(1, observe_number_per_x)
    x_column = np.full((observe_number_per_x, 1), x)
    observe_data_x_t = np.hstack((x_column, t_for_x))
    observe_data.append(observe_data_x_t)

observe_data = np.vstack(observe_data)

observe_clear_theta = exact_theta(observe_data)
observe_clear_w = exact_w(observe_data)
observe_clear_u = np.concatenate((observe_clear_theta,observe_clear_w),axis=1)

############# N(0,0.1^2) #############
noise_nu = 0
noise_std = 0.1
noise_u = np.random.normal(loc=noise_nu, scale=noise_std, size=observe_clear_u.shape)
observe_u = observe_clear_u + noise_u
############# N(0,0.1^2) #############

observe_data = numpy_to_tensor(observe_data, var_name="observe_data", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)
observe_clear_u = numpy_to_tensor(observe_clear_u, var_name="observe_u", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)
observe_u = numpy_to_tensor(observe_u, var_name="observe_u", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)
print('J:',len(observe_u))

observe_data_x_inside = observe_data[:,0:1]
observe_data_t_inside = observe_data[:,1:2]

noise_u = numpy_to_tensor(noise_u, var_name="noise_u", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = False)

In [None]:
np.random.seed(5678)
N_test_number = 10000

test_data = lb+(la-lb)*lhs(2,N_test_number)
test_theta = exact_theta(test_data)
test_w = exact_w(test_data)
test_u = np.concatenate((test_theta,test_w),axis=1)

test_data = numpy_to_tensor(test_data, var_name="test_data", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)
test_u = numpy_to_tensor(test_u, var_name="test_u", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)

test_data_x_inside = test_data[:,0:1]
test_data_t_inside = test_data[:,1:2]

In [6]:
C1 = torch.tensor(0.0, requires_grad=True)

In [None]:
print('J:',len(observe_u))

In [8]:
def get_loss_f(x_grad,t_grad,PINNs,C,return_sequence='not'):
    
    ########### loss f  ###########
    E_inside = PINNs(torch.cat((x_grad,t_grad),axis=1))
    
    E_theta = E_inside[:,0:1]
    E_w = E_inside[:,1:2]
    
    theta_tt = compute_higher_order_derivatives(E_theta, [t_grad,t_grad])
    theta_x = compute_higher_order_derivatives(E_theta, [x_grad])
    theta_xx = compute_higher_order_derivatives(theta_x, [x_grad])
    
    w_tt = compute_higher_order_derivatives(E_w, [t_grad,t_grad])
    w_x = compute_higher_order_derivatives(E_w, [x_grad])
    w_xx = compute_higher_order_derivatives(w_x, [x_grad])
    
    g = torch.cos(t_grad) - torch.pi/2 * torch.sin(x_grad) * torch.cos(t_grad)
    
    loss_f_1_sequence = C*theta_tt-theta_xx+(E_theta-w_x)
    loss_f_1_squared_sequence = torch.square(loss_f_1_sequence)

    loss_f_2_sequence = w_tt+theta_x-w_xx-g
    loss_f_2_squared_sequence = torch.square(loss_f_2_sequence)
    
    if return_sequence=='yes':
        return loss_f_1_squared_sequence,loss_f_2_squared_sequence
    else:
        return torch.mean(loss_f_1_squared_sequence)+torch.mean(loss_f_2_squared_sequence)

In [9]:
def get_loss_bound(bound_x, bound_x_t_zero, bound_t, PINNs, C, return_sequence='not'):
    
    E_bound_x_zero = PINNs(torch.cat((bound_x,bound_x_t_zero),axis=1)) #u(x,0) u(bound_x,t = bound_x_t_zero)
    
    E_theta_x_zero = E_bound_x_zero[:,0:1]  #theta(x,0)
    E_w_x_zero = E_bound_x_zero[:,1:2] #w(x,0)
    
    E_theta_x_zero_dt = compute_higher_order_derivatives(E_theta_x_zero, [bound_x_t_zero]) #theta_t(x,0)
    E_w_x_zero_dt = compute_higher_order_derivatives(E_w_x_zero, [bound_x_t_zero]) #w_t(x,0)
    
    loss_bound_x_a = E_theta_x_zero-(torch.pi/2)*torch.cos(bound_x)-(bound_x-torch.pi/2)
    loss_bound_x_b = E_theta_x_zero_dt-torch.zeros_like(E_theta_x_zero_dt)
    loss_bound_x_c = E_w_x_zero-(torch.pi/2)*torch.sin(bound_x)
    loss_bound_x_d = E_w_x_zero_dt-torch.zeros_like(E_w_x_zero_dt)
    
    loss_bound_x = torch.mean(torch.square(loss_bound_x_a))+\
                   torch.mean(torch.square(loss_bound_x_b))+\
                   torch.mean(torch.square(loss_bound_x_c))+\
                   torch.mean(torch.square(loss_bound_x_d))

    E_bound_zero_t = PINNs(torch.cat((torch.zeros_like(bound_t),bound_t),axis=1)) #u(0,t)
    E_theta_zero_t = E_bound_zero_t[:,0:1]  #theta(0,t)
    E_w_zero_t = E_bound_zero_t[:,1:2] #w(0,t)
    
    E_bound_pi_t = PINNs(torch.cat((torch.ones_like(bound_t)*torch.pi,bound_t),axis=1)) #u(pi,t)
    E_theta_pi_t = E_bound_pi_t[:,0:1]  #theta(pi,t)
    E_w_pi_t = E_bound_pi_t[:,1:2] #w(pi,t)
    
    loss_bound_t = torch.mean(torch.square(E_theta_zero_t))+\
                   torch.mean(torch.square(E_w_zero_t))+\
                   torch.mean(torch.square(E_theta_pi_t))+\
                   torch.mean(torch.square(E_w_pi_t))
    
    loss_bound_value = loss_bound_x+loss_bound_t
    
    return loss_bound_value

In [10]:
#Paper reproduction
torch.manual_seed(1234)
torch.cuda.manual_seed(1234)
np.random.seed(1234)

In [11]:
torch.manual_seed(1234)
torch.cuda.manual_seed(1234)
np.random.seed(1234)

net_settings_for_PINNs1 = NetSetting(input_dims=2, hidden_neurons_list=[50]*4, 
                                     output_dims=2, hidden_activation='tanh', 
                                     output_activation=None, initializer_method='xavier')
PINNs1 = get_mlp_pinn(net_settings_for_PINNs1)
PINNs1.cuda()

initialize_weights(PINNs1, net_settings_for_PINNs1.initializer_method)

optimizer1 = optim.Adam(PINNs1.parameters(), lr=0.001)    
optimizer1.add_param_group({'params': [C1], 'lr': 0.001})

In [12]:
x_inside_all = torch.cat((x_inside,observe_data[:,0:1]),axis=0)
t_inside_all = torch.cat((t_inside,observe_data[:,1:2]),axis=0)

In [None]:
Theta_list = np.zeros_like(observe_u.cpu().detach().numpy())
Theta_list = numpy_to_tensor(Theta_list, var_name="Theta_list", value_range_dim = True, to_torch = True, to_cuda = True, requires_grad = True)

In [14]:
C1_id = id(C1)
Theta_list_id = id(Theta_list)

optimizer_error = optim.Adam([Theta_list], lr=0.0001,betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)    

In [15]:
def train_optimizer1_error_correction(PINNs,C,Theta_list_p):
    
    for param in PINNs.parameters():
        param.requires_grad = True
    
    C.requires_grad = True
    
    Theta_list_p.requires_grad = False

In [16]:
def train_optimizer2_error_correction(PINNs,C,Theta_list_p):
    
    for param in PINNs.parameters():
        param.requires_grad = False
    
    C.requires_grad = False
    
    Theta_list_p.requires_grad = True

In [17]:
def train_for_error_correction(PINNs,C,Theta_list_input,observe_data_input,observe_u_input):
    
    train_optimizer2_error_correction(PINNs,C,Theta_list_input)
        
    E_observation = PINNs(observe_data_input)
    
    loss_observation_refine = torch.mean(torch.square(E_observation+Theta_list_input-observe_u_input)) 
    
    optimizer_error.zero_grad()
    loss_observation_refine.backward()
    optimizer_error.step()
        
    train_optimizer1_error_correction(PINNs,C,Theta_list_input)
    
    return Theta_list_input

In [18]:
def flatten_line(data):
    for i in range(1, len(data)):
        if data[i] > data[i - 1]:
            data[i] = data[i - 1]
    return data

In [19]:
import copy
def training_recall(PINNs, C, it_iter, loss_f_for_T_1_best, best_PINNs_state, 
                    loss_all_1, loss_f_1, loss_f_for_collocation_1, 
                    loss_f_for_T_1, loss_f_excapt_T_1, loss_T_1, 
                    loss_T_clear_1, loss_T_1_test_data,loss_T_EC_1, test_loss_1, 
                    C1_list, optimizer1):

    loss_f_for_T_1_smooth = flatten_line(loss_f_for_T_1.copy())
    current_min_index = loss_f_for_T_1_smooth.index(min(loss_f_for_T_1_smooth))
    current_min = min(loss_f_for_T_1_smooth)

    if best_PINNs_state is None or current_min < best_PINNs_state['min_loss']:
        best_PINNs_state = {
            'model_state': copy.deepcopy(PINNs.state_dict()),
            'C_state': C.clone().detach(),
            'optimizer_state': copy.deepcopy(optimizer1.state_dict()),
            'it': it_iter,
            'min_loss': current_min
        }

    if len(loss_f_for_T_1_smooth) - current_min_index > 300:
        print('Restoring best model from iteration:', best_PINNs_state['it'])
        PINNs.load_state_dict(best_PINNs_state['model_state'])
        
        C_value = best_PINNs_state['C_state']
        with torch.no_grad(): 
            C.copy_(C_value)  

        optimizer1.load_state_dict(best_PINNs_state['optimizer_state'])
        
        recall_it = it_iter  
        it_iter = best_PINNs_state['it']  

        loss_all_1 = loss_all_1[:it_iter+1]
        loss_f_1 = loss_f_1[:it_iter+1]
        loss_f_for_collocation_1 = loss_f_for_collocation_1[:it_iter+1]
        loss_f_for_T_1 = loss_f_for_T_1[:it_iter+1]
        loss_f_excapt_T_1 = loss_f_excapt_T_1[:it_iter+1]
        loss_T_1 = loss_T_1[:it_iter+1]
        loss_T_clear_1 = loss_T_clear_1[:it_iter+1]
        loss_T_1_test_data = loss_T_1_test_data[:it_iter+1]
        test_loss_1 = test_loss_1[:it_iter+1]
        C1_list = C1_list[:it_iter+1]
        loss_T_EC_1 = loss_T_EC_1[:it_iter+1]
        loss_f_for_T_1_best = True
    
    return (PINNs, C, it_iter, loss_f_for_T_1_best, best_PINNs_state, 
            loss_all_1, loss_f_1, loss_f_for_collocation_1, loss_f_for_T_1, 
            loss_f_excapt_T_1, loss_T_1, loss_T_clear_1, loss_T_1_test_data,loss_T_EC_1, 
            test_loss_1, C1_list,optimizer1)

In [None]:
############## Record list ###############
loss_all_1 = []
loss_f_1 = []
loss_f_for_collocation_1 = []
loss_f_for_T_1 = []
loss_f_excapt_T_1 = []
loss_T_1 = []
loss_T_clear_1 = []
loss_T_1_test_data = []
loss_T_EC_1 = []
test_loss_1 = []
C1_list = []
############## Record list ###############


nIter1 = 20000
it = 0
best_total_loss = 100
best_test_loss = 100 

# Initial placeholders for best model and iteration
loss_f_for_T_1_best = False
best_PINNs_state = None
best_it = None
recall_it = None

while it<nIter1:
    #########loss f#########
    loss_f = get_loss_f(x_inside_all,t_inside_all,PINNs1,C1,return_sequence='not')
    
    #########loss f  for collocation data#########
    loss_f_for_collocation = get_loss_f(x_inside,t_inside,PINNs1,C1,return_sequence='not')
    #########loss f  for observation data#########
    loss_f_for_T = get_loss_f(observe_data_x_inside,observe_data_t_inside,PINNs1,C1,return_sequence='not')
    #########loss f  excapt observation data#########
    loss_f_excapt_T = get_loss_f(test_data_x_inside,test_data_t_inside,PINNs1,C1,return_sequence='not')
    
    #########loss b#########        
    loss_b = get_loss_bound(x_bound, x_bound_t_zero, t_bound, PINNs1, C1, return_sequence='not')
    
    #########loss T observation#########        
    E_observation = PINNs1(observe_data) 
    
    if loss_f_for_T_1_best:
        loss_observation = torch.mean(torch.square(E_observation+Theta_list-observe_u)) 
    else:
        loss_observation = torch.mean(torch.square(E_observation-observe_u))            

    #########loss EC######### 
    loss_EC = torch.mean(torch.square(Theta_list-noise_u))   
    
    #########loss T noisy observation#########  
    loss_observation_noisy = torch.mean(torch.square(E_observation-observe_u)) 
    #########loss T clear observation#########        
    E_observation_clear = PINNs1(observe_data) 
    loss_observation_clear = torch.mean(torch.square(E_observation_clear-observe_clear_u))    
    #########loss T excapt observation#########        
    E_observation_excapt = PINNs1(test_data) 
    loss_observation_excapt = torch.mean(torch.square(E_observation_excapt-test_u))   
    
    #########loss PI#########
    loss = loss_f+loss_b+10*loss_observation
    
    #########test_loss NRMSE#########
    pre_u = PINNs1(test_data)
    test_loss = relative_l2_torch(pre_u,test_u)
    #########test_loss NRMSE#########
    
    #########Record#########
    loss_f_1.append(loss_f.item())
    loss_f_for_collocation_1.append(loss_f_for_collocation.item())
    loss_f_for_T_1.append(loss_f_for_T.item())
    loss_f_excapt_T_1.append(loss_f_excapt_T.item())
    C1_list.append(C1.item())   
    loss_T_1.append(loss_observation_noisy.item()) 
    loss_T_EC_1.append(loss_EC.item())  
    loss_T_clear_1.append(loss_observation_clear.item()) 
    loss_T_1_test_data.append(loss_observation_excapt.item()) 
    test_loss_1.append(test_loss)
    #########Record#########
    
    optimizer1.zero_grad()
    loss.backward()
    optimizer1.step()
       
    if it % 1000 == 0:
        
        print('It:', it, 'train_loss:', loss.item(), 'test_loss:', test_loss)      
        print('C1',C1)     

    if  not loss_f_for_T_1_best:
        
        if it>0:
            
            (PINNs1, C1, it, loss_f_for_T_1_best, best_PINNs_state, 
             loss_all_1, loss_f_1, loss_f_for_collocation_1, loss_f_for_T_1, 
             loss_f_excapt_T_1, loss_T_1, loss_T_clear_1, loss_T_1_test_data,loss_T_EC_1, 
             test_loss_1, C1_list, optimizer1) = training_recall(
                 PINNs1, C1, it, loss_f_for_T_1_best, best_PINNs_state, 
                 loss_all_1, loss_f_1, loss_f_for_collocation_1, 
                 loss_f_for_T_1, loss_f_excapt_T_1, loss_T_1, loss_T_clear_1, 
                 loss_T_1_test_data,loss_T_EC_1, test_loss_1, C1_list, optimizer1)
            
            if loss_f_for_T_1_best:
                Theta_list.requires_grad = True
                for param_group in optimizer1.param_groups:
                    if any(id(param) == C1_id for param in param_group['params']):
                        param_group['lr'] = 0.0001  
    else:
        ############################ Refine L_T ############################   
        Theta_list = train_for_error_correction(PINNs1,C1,Theta_list,observe_data,observe_u)
        ############################ Refine L_T ############################  
    it = it + 1   
    
    ############################ Save the Best Model ############################
    if it>19500 and loss.item() < best_total_loss:
        best_total_loss = loss.item()
        torch.save(PINNs1.state_dict(), '../saved_model/Total_EC_model.pth')
        print(f"New best model saved at iteration {it}")
        
    if it>19500 and test_loss < best_test_loss:
        best_test_loss = test_loss
        torch.save(PINNs1.state_dict(), '../saved_model/Test_EC_model.pth')
        print(f"New best model saved at iteration {it}")
    ########################################################################### 
    
print('Final:', 'train_loss:', loss.item(), 'test_loss:', test_loss)