># Training Burgers equations with PINN
-----
*Import Libraries*

In [1]:
import torch
import torch.autograd as autograd
from torch import Tensor
import torch.nn as nn
import torch.optim as optim 
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from mpl_toolkits.axes_grid1 import make_axes_locatable
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.ticker
import numpy as np
import time
from pyDOE import lhs 
import scipy.io

In [2]:
# set default dtype to float32
torch.set_default_dtype(torch.float)

# Pytorch random number generator
torch.manual_seed(1234)

# Random number generators in other libraries
np.random.seed(1234)


In [3]:
# CUDA support 
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

### Load datasets  
*****  
x, t, usol  
*x.shape:(256, 1)  
t.shape:(100, 1)  
usol.shape:(256, 100)  
usol[i][j] = u(x[i],t[j])
X.shape:(100, 256)  
T.shape:(100, 256)  
X: [[-1, -0.99215686,- 0.98431373 ...  0.98431373  0.99215686]; [-1, -0.99215686,- 0.98431373 ...  0.98431373  0.99215686];...]  
T: [[0,0,...,0]; [0.01,0.01,...,0.01];...]  
x[i] = X[0][i]  
t[j] = T[j][0]  
usol[i][j] = u(X[0][i],t[j][0])*

In [4]:
#load data from file
data = scipy.io.loadmat('Data/burgers_datasets.mat')
x = data['x']
t = data['t']
usol = data['usol']
# makes 2 arrays X and T such that u(X[i],T[j])=usol[i][j] are a tuple
X, T = np.meshgrid(x, t)

*X_u_test:Grid X and T expand  
lb, ub: boundary  
u_true: expand usol  
X_u_test.shape:(25600, 2)  
X_u_test:[[-1, -0.99215686,- 0.98431373 ... 0.98431373 0.99215686;-1, -0.99215686,- 0.98431373 ... 0.98431373 0.99215686...];  
      [0,0,...,0,0.01,0.01,...,0.01,...]]'  
lb:[-1, 0]  
ub:[1, 0.99]  
u_true.shape:(25600, 1)*

In [5]:
X_u_test = np.hstack((X.flatten()[:,None], T.flatten()[:,None]))
#domain bounds
lb = X_u_test[0]
ub = X_u_test[-1]

u_true = usol.flatten('F')[:,None]
print(X_u_test.shape, u_true.shape)

(25600, 2) (25600, 1)


### Training Data
*****  
*all_X_u_train: Splicing initial conditions and boundary(x, t)  
all_u_train: Splicing initial conditions and boundary(u)  
idx: choose random N_u points for training  
X_u_train: choose indices from set 'idx' (x,t)  
u_train: choose indices from set 'idx' (u) (idx point)  
X_f_train: joint initial condition boundary condition's (x,t) and points in the grid (initial point)*  
*X.shape:(100, 256)  
T.shape:(100, 256)  
usol.shape:(256, 100)  
u(X[0][i],T[j][0])=usol[i][j]  
**initial condition:**  
u(X[0][i],T[0][0])=usol[i][0]  
leftedge_x.shape:(256, 2)  
leftedge_u.shape:(256, 1)  
**Boundary Condition x=-1:**  
u(X[0][0],T[j][0])=usol[0][j] 
bottomedge_x.shape:(100, 2)  
bottomedge_u.shape:(100, 1)  
**Boundary Condition x=1:**  
u(X[0][-1],T[j][0])=usol[-1][j]  
topedge_x.shape:(100, 2)  
topedge_u.shape:(100, 1)*
  
*all_X_u_train.shape:(456, 2)  
all_u_train.shape:(456, 1)  
idx:N_u*  
**X_u_train.shape:(N_u,2)**  
**u_train.shape:(N_u,2)**  
**X_f_train.shape:(N_f+N_u,2)**

In [6]:
def trainingdata(N_u,N_f):
    #Initial Condition -1 =< x =< 1 and t=0
    leftedge_x = np.hstack((X[0,:][:,None], T[0,:][:,None]))
    leftedge_u = usol[:,0][:,None]
    
    #Boundary Condition x = -1 and 0 =< t =<1
    bottomedge_x = np.hstack((X[:,0][:,None], T[:,0][:,None]))
    bottomedge_u = usol[0,:][:,None]
    
    #Boundary Condition x = 1 and 0 =< t =<1
    topedge_x = np.hstack((X[:,-1][:,None], T[:,0][:,None]))
    topedge_u = usol[-1,:][:,None]
    
    all_X_u_train = np.vstack([leftedge_x, bottomedge_x, topedge_x])
    all_u_train = np.vstack([leftedge_u, bottomedge_u, topedge_u])
    
    #choose random N_u points for training
    idx = np.random.choice(all_X_u_train.shape[0], N_u, replace=False)
    
    #choose indices from set 'idx' (x,t)
    X_u_train = all_X_u_train[idx,:]
    u_train = all_u_train[idx,:]
    
    #Latin Hypercube sampling for collocation points
    X_f_train = lb + (ub-lb)*lhs(2,N_f) 
    X_f_train = np.vstack((X_f_train, X_u_train))
    
    return X_f_train, X_u_train, u_train
    


### Physics Informed Neural Network

In [7]:
class Sequentialmodel(nn.Module):
    
    def __init__(self,layers):
        super().__init__()
        
        #activation function
        self.activation = nn.Tanh()
        
        #loss function
        self.loss_function = nn.MSELoss(reduction = 'mean')
        
        #Initialise neural network as a list using nn.Modulelist
        self.linears = nn.ModuleList([nn.Linear(layers[i], layers[i+1]) for i in range(len(layers)-1)])
        self.iter = 0
        
        '''
        Simple Linear layers
        self.fc1 = nn.linear(2,50)
        self.fc2 = nn.linear(50,50)
        self.fc3 = nn.linear(50,50)
        self.fc4 = nn.linear(50,1)
        '''
        
        for i in range(len(layers)-1):
            nn.init.xavier_normal_(self.linears[i].weight.data, gain=1.0)
            
            # set biases to zero
            nn.init.zeros_(self.linears[i].bias.data)
    
    #forward pass
    def forward(self,x):
        
        if torch.is_tensor(x) != True:
            x = torch.from_numpy(x)
        
        u_b = torch.from_numpy(ub).float().to(device)
        l_b = torch.from_numpy(lb).float().to(device)
        
        #preprocessing input
        x = (x - l_b)/(u_b - l_b)
        
        #convert to float
        a = x.float()
        
        '''
        a = self.activation(self.fc1(a))
        a = self.activation(self.fc2(a))
        a = self.activation(self.fc3(a))
        a = self.fc4(a)
        '''
        
        for i in range(len(layers) - 2):
            z = self.linears[i](a)
            a = self.activation(z)
        a = self.linears[-1](a)
        return a
    
    def loss_BC(self,x,y):
        
        loss_u = self.loss_function(self.forward(x), y)
        return loss_u
    
   
    def loss_PDE(self, x_to_train_f):
        
        nu = 0.01/np.pi
                
        x_1_f = x_to_train_f[:,[0]]
        x_2_f = x_to_train_f[:,[1]]
                        
        g = x_to_train_f.clone()
                        
        g.requires_grad = True
        
        u = self.forward(g)
                
        u_x_t = autograd.grad(u,g,torch.ones_like(u).to(device), retain_graph=True, create_graph=True)[0]
                                
        u_xx_tt = autograd.grad(u_x_t,g,torch.ones_like(u_x_t).to(device), create_graph=True)[0]
                                                            
        u_x = u_x_t[:,[0]]
        
        u_t = u_x_t[:,[1]]
        
        u_xx = u_xx_tt[:,[0]]
                                        
        f = u_t + (self.forward(g))*(u_x) - (nu)*u_xx 
        
        loss_f = self.loss_function(f,f_hat)
        
        return loss_f
    
    def loss(self, x, y, x_to_train_f):
        
        loss_u = self.loss_BC(x, y)
        loss_f = self.loss_PDE(x_to_train_f)
        
        loss_val = loss_u + loss_f
        
        return loss_val
    
    def closure(self):
        
        optimizer.zero_grad()
        
        loss = self.loss(X_u_train, u_train, X_f_train)
        
        loss.backward()
        
        self.iter += 1
        
        if self.iter % 100 == 0:
            
            print(loss)
            
        return loss
    
    def test(self):
        
        u_pred = self.forward(X_u_test_tensor)
        
        error_vec = torch.linalg.norm((u-u_pred),2)/torch.linalg.norm(u,2) 
        
        u_pred = u_pred.cpu().detach().numpy()
        
        u_pred = np.reshape(u_pred, (256, 100), order='F')
        
        return error_vec, u_pred

### Train

In [8]:
#Generate Training data

#Total number of data points for 'u'
N_u = 100
N_f = 1000
X_f_train_np_array, X_u_train_np_array, u_train_np_array = trainingdata(N_u,N_f)

#Convert to tensor and send to GPU
X_f_train = torch.from_numpy(X_f_train_np_array).float().to(device)
X_u_train = torch.from_numpy(X_u_train_np_array).float().to(device)
u_train = torch.from_numpy(u_train_np_array).float().to(device)
X_u_test_tensor = torch.from_numpy(X_u_test).float().to(device)
u = torch.from_numpy(u_true).float().to(device)
f_hat = torch.zeros(X_f_train.shape[0],1).to(device)

layers = np.array([2,20,20,20,20,20,20,20,20,1]) #8 hidden layers

PINN = Sequentialmodel(layers)
       
PINN.to(device)

params = list(PINN.parameters())

'''Optimization'''

'L-BFGS Optimizer'
optimizer = torch.optim.LBFGS(PINN.parameters(), lr=0.1, 
                              max_iter = 10000, 
                              max_eval = None, 
                              tolerance_grad = 1e-05, 
                              tolerance_change = 1e-09, 
                              history_size = 100, 
                              line_search_fn = 'strong_wolfe')

start_time = time.time()

optimizer.step(PINN.closure)

elapsed = time.time() - start_time                
print('Training time: %.2f' % (elapsed))


''' Model Accuracy ''' 
error_vec, u_pred = PINN.test()

print(u_pred)


tensor(0.1028, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0526, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0277, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0168, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0099, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0067, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0055, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0045, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0039, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0033, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0029, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0028, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0027, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0025, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0024, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0023, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0022, device='cuda:0', grad_fn=<AddBackward0>)
tensor(0.0020, device='cuda:0', grad_fn=<AddBack