In [6]:
import os
import numpy as np
import torch
from torch import tensor
import torch.nn as nn
import torch.nn.functional as F

from torch.autograd import Variable, grad

import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d
from mpl_toolkits.axes_grid1 import make_axes_locatable
import scipy
import scipy.io as io
from pyDOE import lhs

from sklearn.metrics import mean_squared_error, mean_absolute_error

from pysr import pysr, best, best_callable

In [2]:
data = io.loadmat('data/burgers_shock.mat')

# Grid coordinates as column vectors, and the real part of the stored solution (transposed).
t = data['t'].flatten()[:,None]
x = data['x'].flatten()[:,None]
Exact = np.real(data['usol']).T

X, T = np.meshgrid(x,t)

# One row per grid point: column 0 is x, column 1 is t.
# u_star is the flattened solution (assumes `usol` is laid out as (len(x), len(t)) so that
# its transpose lines up with the meshgrid -- TODO confirm against the data file).
X_star = np.hstack((X.flatten()[:,None], T.flatten()[:,None]))
u_star = Exact.flatten()[:,None]

# Domain bounds
lb = X_star.min(0)
ub = X_star.max(0)

# Draw the training subset from a dedicated, seeded generator so the same N points
# are selected on every run (the global NumPy RNG state is left untouched).
N = 2000
SEED = 1234
rng = np.random.RandomState(SEED)
idx = rng.choice(X_star.shape[0], N, replace=False)
X_u_train = X_star[idx, :]
u_train = u_star[idx,:]

In [3]:
class Network(nn.Module):
    """Neural surrogate u(x, t) together with two trainable PDE coefficients.

    ``model`` receives the column-wise concatenation ``[x, t]`` and returns ``u``.
    ``lambda_1`` and ``exp(lambda_2)`` are the coefficients of the residual
    ``u_t + lambda_1*u*u_x - exp(lambda_2)*u_xx`` that :meth:`loss` can penalise.
    """

    def __init__(self, model, lambda_1_init, lambda_2_init):
        """Wrap ``model`` and create the two coefficient parameters.

        Note: the weights of the passed-in ``model`` are re-initialised in place.
        """
        super(Network, self).__init__()
        self.model = model
        self.model.apply(self.xavier_init)
        self.lambda_1 = torch.nn.Parameter(self._as_coefficient(lambda_1_init))
        self.lambda_2 = torch.nn.Parameter(self._as_coefficient(lambda_2_init))

    @staticmethod
    def _as_coefficient(value):
        """Return ``value`` as a 1-element tensor in torch's default floating dtype.

        Going through ``float`` matters: a NumPy float64 scalar would otherwise yield
        a float64 tensor, and a Python int an integer tensor that cannot require grad.
        """
        return torch.tensor([float(value)])

    def xavier_init(self, m):
        """Xavier-uniform weights and a constant 0.01 bias for plain ``nn.Linear`` layers."""
        # Exact type match on purpose: subclasses of nn.Linear are left untouched.
        if type(m) is nn.Linear:
            torch.nn.init.xavier_uniform_(m.weight)
            # Layers built with bias=False have no bias tensor to fill.
            if m.bias is not None:
                m.bias.data.fill_(0.01)

    def forward(self, x, t):
        """Evaluate the model on ``x`` and ``t`` concatenated along dim 1."""
        return self.model(torch.cat([x, t], dim=1))

    def loss(self, x, t, y_input, is_pde_parameters_update=False, callable_fn=None):
        """Mean-squared data loss, optionally plus the mean-squared PDE residual.

        Args:
            x, t: inputs passed to :meth:`forward`; they must be part of the autograd
                graph (require grad) when ``is_pde_parameters_update`` is True.
            y_input: target values compared against the prediction.
            is_pde_parameters_update: when True, add the residual term that involves
                ``lambda_1`` and ``exp(lambda_2)``.
            callable_fn: accepted for interface compatibility; not used here.

        Returns:
            Scalar loss tensor.
        """
        uf = self.forward(x, t)
        total_loss = F.mse_loss(uf, y_input, reduction='mean')

        if is_pde_parameters_update:
            lambda_1 = self.lambda_1
            lambda_2 = torch.exp(self.lambda_2)

            # PDE loss calculation
            u_t, u_x, u_xx = self._derivatives(uf, x, t)
            l_eq = (u_t + lambda_1*uf*u_x - lambda_2*u_xx)
            l_eq = (l_eq**2).mean()

            total_loss = total_loss + l_eq

        return total_loss

    def get_theta(self, x, t):
        """Least-squares estimate of the two PDE coefficients from the current network.

        Solves ``u_t ~= theta_1*(-u*u_x) + theta_2*u_xx`` through the normal equations.
        Switches the module to eval mode as a side effect.

        Returns:
            ``(theta_1, log(theta_2))`` where both coefficients are first clamped from
            below at float32 machine epsilon, i.e. values suitable as
            ``lambda_1`` / ``lambda_2`` initialisers.
        """
        self.eval()

        uf = self.forward(x, t)
        u_t, u_x, u_xx = self._derivatives(uf, x, t)

        X = torch.cat([-uf*u_x, u_xx], dim=1)
        y = u_t

        theta = (torch.inverse(X.T@X))@(X.T@y)

        eps = torch.finfo(torch.float32).eps
        theta_1 = np.maximum(theta[0, 0].detach().item(), eps)
        theta_2 = np.log(np.maximum(theta[1, 0].detach().item(), eps))

        return theta_1, theta_2

    def get_gradients_dict(self, x, t):
        """Return candidate features and the regression target for symbolic regression.

        Switches the module to eval mode as a side effect.

        Returns:
            ``({'uf': u, 'u_x': u_x, 'u_xx': u_xx}, u_t)``.
        """
        self.eval()

        uf = self.forward(x, t)
        # Only the derivatives that are actually returned are computed.
        u_t, u_x, u_xx = self._derivatives(uf, x, t)

        return {'uf':uf, 'u_x':u_x, 'u_xx':u_xx}, u_t

    def _derivatives(self, uf, x, t):
        """Return ``(u_t, u_x, u_xx)`` of ``uf`` with respect to ``t`` and ``x``."""
        u_t = self.gradients(uf, t)[0]
        u_x = self.gradients(uf, x)[0]
        u_xx = self.gradients(u_x, x)[0]
        return u_t, u_x, u_xx

    def gradients(self, func, x):
        """Differentiate ``func`` w.r.t. ``x``, keeping the graph for higher-order derivatives.

        Returns the tuple produced by ``torch.autograd.grad``.
        """
        # ones_like follows the device and dtype of ``func``.
        return grad(func, x, create_graph=True, retain_graph=True, grad_outputs=torch.ones_like(func))

    def set_lambdas(self, lambda_1_init, lambda_2_init):
        """Overwrite the two coefficients in place.

        The existing ``Parameter`` objects are kept, so an optimizer that was built
        from ``parameters()`` before this call keeps tracking them.
        """
        with torch.no_grad():
            self.lambda_1.fill_(float(lambda_1_init))
            self.lambda_2.fill_(float(lambda_2_init))
        # Drop gradients that belong to the previous values.
        self.lambda_1.grad = None
        self.lambda_2.grad = None

In [4]:
hidden_nodes = 50

# Tanh MLP: 2 inputs -> four tanh-activated layers of `hidden_nodes` units -> 1 output.
model = nn.Sequential(
    nn.Linear(2, hidden_nodes),
    nn.Tanh(),
    *(
        layer
        for _ in range(3)
        for layer in (nn.Linear(hidden_nodes, hidden_nodes), nn.Tanh())
    ),
    nn.Linear(hidden_nodes, 1),
)

# Placeholder coefficients: the values are arbitrary at this point.
lambda_1_init = lambda_2_init = 0.0

network = Network(model, lambda_1_init, lambda_2_init)

In [5]:
# Convert the training and full-grid arrays to float32 tensors that track gradients.
X_u_train, u_train, X_star, u_star = (
    tensor(array).float().requires_grad_(True)
    for array in (X_u_train, u_train, X_star, u_star)
)

In [6]:
# First-order alternative, kept for reference:
# optimizer = torch.optim.Adam(network.parameters(), lr=3e-4)  # metaopt also has .parameters()
optimizer = torch.optim.LBFGS(
    network.parameters(),
    lr=5e-2,
    max_iter=50,
    max_eval=50,
    line_search_fn='strong_wolfe',
)

# Number of outer optimizer steps and the checkpoint location used by the training cell.
epochs = 600
# weights_path = './saved_path_inverse_burger/frozen_pinn.pth'
weights_path = './saved_path_inverse_burger/nn_with_physical_reg_from_symreg.pth'

In [7]:
# network.train(); best_train_loss = 1e6
# for i in range(epochs):
#     ### Add the closure function to calculate the gradient. For LBFGS.
#     def closure():
#         if torch.is_grad_enabled():
#             optimizer.zero_grad()
#         l = network.loss(X_u_train[:, 0:1], X_u_train[:, 1:2], u_train, is_pde_parameters_update=False)
#         if l.requires_grad:
#             l.backward()
#         return l

#     optimizer.step(closure)

#     # calculate the loss again for monitoring
#     l = closure()
    
#     if i > 400 and float(l.item()) < best_train_loss:
#         torch.save(network.state_dict(), './saved_path_inverse_burger/nn_without_physical_reg.pth')
#         best_train_loss = float(l.item())

#     if (i % 100) == 0:
#         print("Epoch {}: ".format(i), l.item())

In [8]:
# lambda_1_init, lambda_2_init = network.get_theta(X_u_train[:, 0:1], X_u_train[:, 1:2])
# network.set_lambdas(lambda_1_init, lambda_2_init)

# Initial coefficients; lambda_2 is stored in log space because the loss uses exp(lambda_2).
lambda_1_init = 0.6860763
# float(...) keeps this a plain Python float: an np.float64 scalar would make
# torch.tensor([...]) build a float64 parameter next to the float32 network weights.
lambda_2_init = float(np.log(0.0020577204))

### Choosing btw reset model weights or pretraining ###
network = Network(model=model, lambda_1_init=lambda_1_init, lambda_2_init=lambda_2_init)
optimizer = torch.optim.LBFGS(network.parameters(), lr=5e-2, max_iter=50, max_eval=50, line_search_fn='strong_wolfe')

# torch.save does not create missing directories, so make sure the target folder exists.
weights_dir = os.path.dirname(weights_path)
if weights_dir:
    os.makedirs(weights_dir, exist_ok=True)

# Checkpoints are only written once this many epochs have passed.
checkpoint_after = 400


def closure():
    """Closure for LBFGS: zero the grads, evaluate the data + PDE loss, backpropagate."""
    if torch.is_grad_enabled():
        optimizer.zero_grad()
    l = network.loss(X_u_train[:, 0:1], X_u_train[:, 1:2], u_train, is_pde_parameters_update=True)
    if l.requires_grad:
        l.backward()
    return l


network.train(); best_train_loss = 1e6
for i in range(epochs):
    optimizer.step(closure)

    # Re-evaluate the loss for monitoring only; no backward pass is needed for that.
    l = network.loss(X_u_train[:, 0:1], X_u_train[:, 1:2], u_train, is_pde_parameters_update=True)
    current_loss = float(l.item())

    if i > checkpoint_after and current_loss < best_train_loss:
        torch.save(network.state_dict(), weights_path)
        best_train_loss = current_loss

    if (i % 10) == 0:
        print("Epoch {}: ".format(i), l.item())

Epoch 0:  0.09962643159045007


KeyboardInterrupt: 

In [9]:
### Restore the checkpoint written during training ###
best_state = torch.load(weights_path)
network.load_state_dict(best_state)

<All keys matched successfully>

In [10]:
# Switch the network to evaluation mode; the returned module repr is shown as the cell output.
network.eval()

Network(
  (model): Sequential(
    (0): Linear(in_features=2, out_features=50, bias=True)
    (1): Tanh()
    (2): Linear(in_features=50, out_features=50, bias=True)
    (3): Tanh()
    (4): Linear(in_features=50, out_features=50, bias=True)
    (5): Tanh()
    (6): Linear(in_features=50, out_features=50, bias=True)
    (7): Tanh()
    (8): Linear(in_features=50, out_features=1, bias=True)
  )
)

In [11]:
# Reference diffusion coefficient that exp(lambda_2) is compared against.
nu = 0.01 / np.pi

# Percentage errors: lambda_1 against 1.0 (absolute), exp(lambda_2) against nu (relative).
error_lambda_1 = 100 * np.abs(network.lambda_1.item() - 1.0)
error_lambda_2 = 100 * (np.abs(torch.exp(network.lambda_2).item() - nu) / nu)

(error_lambda_1, error_lambda_2)

(0.325244665145874, 0.0905594275334805)

In [12]:
# Reference value next to the learned lambda_1.
(1.0, network.lambda_1.item())

(1.0, 0.9967475533485413)

In [13]:
# Reference nu next to the learned exp(lambda_2).
(nu, torch.exp(network.lambda_2).item())

(0.003183098861837907, 0.003180216265730802)

### Symbolic regression

In [None]:
# Candidate features and regression target evaluated on the training points.
grads_dict, target = network.get_gradients_dict(X_u_train[:, 0:1], X_u_train[:, 1:2])
index2features = grads_dict.keys()
print(index2features)

# Feature matrix with one column per dictionary entry (in key order) and a squeezed target, both as NumPy arrays.
G = torch.cat([grads_dict[name] for name in index2features], dim=1).detach().numpy()
target = target.detach().squeeze().numpy()

In [None]:
# Symbolic regression of the target on G, restricted to +, - and * (no unary operators).
equations = pysr(
    G,
    target,
    niterations=100,
    binary_operators=["plus", "sub", "mult"],
    unary_operators=[],
    batching=True,
)

In [None]:
# Print what pysr's `best` reports for the search result and keep the object returned by
# `best_callable` (presumably a Python callable for that expression -- confirm against the
# pysr version in use).
print(best(equations))
fn = best_callable(equations)

In [None]:
# Pull the individual feature tensors out of the dictionary for direct use.
uf, u_x, u_xx = (grads_dict[key] for key in ('uf', 'u_x', 'u_xx'))

In [None]:
# Extracted equation (for further fine-tuning)
# u_t + 0.6860763*uf*u_x - 0.0020577204*u_xx