## Physics Informed Neural Networks to Approximate Solution of PDEs

In [13]:
import numpy as np
import matplotlib.pyplot as plt
import torch.optim as optim
import torch
from torch.utils.data import DataLoader
from Common import NeuralNet, MultiVariatePoly
import time
torch.manual_seed(128)

ModuleNotFoundError: No module named 'Common'

Consider the porous 2D incompressible steady-state Stokes equation:
$\newcommand{\Reynolds}{\text{Re}}\renewcommand{\div}{\text{div}}$
$$
 -\nabla \hat{p} + \div(\frac{2}{\Reynolds} \epsilon(\hat{u}))
 -\frac{1}{\Reynolds} \left(\frac{5L^2}{2H_t^2} + \hat{\alpha}(\rho)\right) \hat{u} = 0
$$
$$
\div(\hat{u})= 0
$$

with constant inflow and boundary conditions
$$
u_b(0,y) = \text{const} u_{in}
$$

$$
u_b(x,0)=u_b(x,1)=0, 
$$

We want to obtain an approximate solution of $u : [0,1]\times[0,1] \mapsto \mathbb{R}^2$ with physics informed neural networks (PINNs).

To do so, we approximate the underlying solution with a feedforward dense neural network with tunable parameters $\theta$:

$$
u_\theta(x,y) \approx u(x,y)
$$
Define the following residuals:

   - Interior residual given by,

   $$r_{int,0,\theta}(x, y):= -\nabla \hat{p} + \div(\frac{2}{\Reynolds} \epsilon(\hat{u}))
 -\frac{1}{\Reynolds} \left(\frac{5L^2}{2H_t^2} + \hat{\alpha}(\rho)\right) \hat{u}$$
   $$r_{int,1,\theta}(x, y):= \div(\hat{u})$$
   
   - inflow boundary residual given by,
   
        $$r_{ib,\theta}(x,y):= u_{\theta}(x,y)- u_in$$
        
   - outside boundary residual given by,
   
        $$r_{ob,\theta}(x,0):= u_{\theta}(x,0), r_{ob, \theta}(x,1) := u_{\theta}(x,1) \quad \forall x \in [-1,1].$$

and compute the corresponding loss functions:

$$
L_{int,0}(\theta) = \int_{[0,1]\times[0,1]}r_{int,0,\theta}^2(x, y) dxdy, \quad
L_{int,1}(\theta) = \int_{[0,1]\times[0,1]}r_{int,1,\theta}^2(x, y) dxdy, \quad
L_{ib}(\theta) = \int_{[0,1]}r_{ib,\theta}^2(0,y) dy, \quad
L_{ob}(\theta) = \int_{[0,1]}r_{ob,\theta}^2(x,0) dx + \int_{[0,1]}r_{ob,\theta}^2(x,1) dx
$$

The loss functions include integrals that can be approximated by suitable quadrature rule. We use quasi Monte-Carlo and accordingly define the following training sets

$$
S_{int} =\{z_n\}, \quad 1 \leq n \leq N_{int},\quad z_n = (x,y)_n \in D_T,
$$

$$
S_{ib} =\{y_n, u_b(0,y_n) \}, \quad1 \leq n \leq N_{ib}, y_n \in [0,1],
$$

$$
S_{ob, 0}=\{x_n, u_0(x_n,0)\}\quad  1 \leq n \leq N_{ob}, x_n \in [0,1].
$$

$$
S_{ob, 1}=\{x_n, u_0(x_n,1)\}\quad  1 \leq n \leq N_{ob}, x_n \in [0,1].
$$


with the training inputs points corresponding to low-discrepancy Sobol sequences.

$$
L_{int,0}(\theta) = \frac{1}{N_{int}}\sum_{i=1}^{N_{int}}r_{int,1,\theta}^2(z_n) , \quad
L_{int,1}(\theta) = \frac{1}{N_{int}}\sum_{i=1}^{N_{int}}r_{int,1,\theta}^2(z_n) , \quad
L_{ib}(\theta) = \frac{1}{N_{ib}}\sum_{i=1}^{N_{ib}}r_{sb,\theta}^2(y_n,0)\quad
L_{ob}(\theta) = \frac{1}{N_{ob}}\sum_{i=1}^{N_{ob}}r_{ob,\theta}^2(x_n,0) +\frac{1}{N_{ob}}\sum_{i=1}^{N_{ob}}r_{ob,\theta}^2(x_n,1)
$$

and solve the following minimization problem

$$
\theta^\ast = argmin_{\theta} \Big(L_{int}(\theta) + \lambda_u L_u(\theta)\Big)
$$

with

$$
L_{int} = L_{int,0} + L_{int,1}, \quad
L_u = L_{ib}(\theta) + L_{ob}(\theta)
$$



In [10]:
class Pinns:
    def __init__(self, n_int_, n_ib_, n_ob_):
        self.n_int = n_int_
        self.n_ib = n_ob_
        self.n_ob = n_ib_
        

        # Extrema of the solution domain (x,y) in [0,1]x[0,1]
        self.domain_extrema = torch.tensor([[0, 1],[0, 1]])

        # Number of space dimensions
        self.space_dimensions = 2

        # Parameter to balance role of data and PDE
        self.lambda_u = 10
        
        self.T_inlet = 293.15  # K

        self.channel_thickness = 380e-6  # m
        self.Ht = 0.5 * self.channel_thickness

        self.substrate_thickness = 150e-6  # m
        self.Hs = 0.5 * self.substrate_thickness

        self.conductivity_fluid = 0.598
        self.conductivity_substrate = 149
        self.viscosity_fluid = 1.004e-3
        self.capacity_fluid = 4180
        self.density_fluid = 998

        self.applied_pressure = 100000 * 5.0 / 4.0  # kg m^-1 s^-2
        self.L = 0.001  # m
        self.U = np.sqrt(self.applied_pressure / self.density_fluid)  # m/s


        self.nu = self.viscosity_fluid / self.density_fluid
        self.rho = self.load_rho()

        self.Re = self.L * self.U / self.nu
        #Prandtl_number = fd.Constant(capacity_fluid * viscosity_fluid / conductivity_fluid)
        self.nondim_Ht = self.Ht / self.L
        self.nondim_Hs = self.Hs / self.L
        
        self.qk = 1 #this change in each optimization steps
        self.alpha_f = 0
        self.alpha_s = 5*self.L**2/(2*self.Ht**2)

        # F Dense NN to approximate the solution of the underlying equation
        self.approximate_solution = NeuralNet(input_dimension=self.domain_extrema.shape[0], output_dimension=4,
                                              n_hidden_layers=4,
                                              neurons=20,
                                              regularization_param=0.,
                                              regularization_exp=2.,
                                              retrain_seed=42)
        '''self.approximate_solution = MultiVariatePoly(self.domain_extrema.shape[0], 3)'''

        # Generator of Sobol sequences
        self.sobol = torch.quasirandom.SobolEngine(dimension=self.domain_extrema.shape[0])
                

        # Training sets S_ib, S_ob, S_int as torch dataloader
        self.training_set_ib, self.training_set_ob, self.training_set_int = self.assemble_datasets()
    
    def load_rho(self):
        #file_path = "/mnt/c/Users/bonvi/Documents/simulation_hack/simulation_hackaton_eth-rafael/simulation_hackaton_eth-rafael/formatted_data0/"
        file_path = "C:\\Users\\bonvi\\Documents\\simulation_hack\\simulation_hackaton_eth-rafael\\simulation_hackaton_eth-rafael\\formatted_data0\\"
        file = file_path + 'results-0.h5_rho.out'
        return np.loadtxt(file)

    ################################################################################################
    # Function to linearly transform a tensor whose value are between 0 and 1
    # to a tensor whose values are between the domain extrema
    def convert(self, tens):
        assert (tens.shape[1] == self.domain_extrema.shape[0])
        return tens * (self.domain_extrema[:, 1] - self.domain_extrema[:, 0]) + self.domain_extrema[:, 0]

    # Initial condition to solve the inflow boundary condition
    def initial_condition(self, x):
        return (1,0)

    # Exact solution NOT APPLICABLE
#     def exact_solution(self, inputs):
#         t = inputs[:, 0]
#         x = inputs[:, 1]

#         u = -torch.exp(-np.pi ** 2 * t) * torch.sin(np.pi * x)
#         return u

    ################################################################################################
    # Function returning the input-output tensor required to assemble the training set S_ib corresponding to the vertical boundary
    def add_inflow_boundary_points(self):
        y0 = self.domain_extrema[0, 0]
        input_ib = self.convert(self.soboleng.draw(self.n_ib))
        input_ib[:, 0] = torch.full(input_ib[:, 0].shape, t0)
        output_ib = self.initial_condition(input_ib[:, 1]).reshape(0, 1)

        return input_ib, output_ib

    # Function returning the input-output tensor required to assemble the training set S_ob corresponding to the spatial boundary
    def add_horizontal_boundary_points(self):
        x0 = self.domain_extrema[1, 0]
        xL = self.domain_extrema[1, 1]

        input_ob = self.convert(self.soboleng.draw(self.n_ob))

        input_ob_0 = torch.clone(input_ob)
        input_ob_0[:, 1] = torch.full(input_sb_0[:, 1].shape, x0)

        input_ob_L = torch.clone(input_ob)
        input_ob_L[:, 1] = torch.full(input_sb_L[:, 1].shape, xL)

        output_ob_0 = torch.zeros((input_ob.shape[0], 1))
        output_ob_L = torch.zeros((input_ob.shape[0], 1))

        return torch.cat([input_ob_0, input_ob_L], 0), torch.cat([output_ob_0, output_ob_L], 0)

    #  Function returning the input-output tensor required to assemble the training set S_int corresponding to the interior domain where the PDE is enforced
    def add_interior_points(self):
        input_int = self.convert(self.soboleng.draw(self.n_int))
        output_int = torch.zeros((input_int.shape[0], 1))
        return input_int, output_int

    # Function returning the training sets S_ib, S_ob, S_int as dataloader
    def assemble_datasets(self):
        input_ib, output_ib = self.add_inflow_boundary_points()  # S_ib
        input_ob, output_ob = self.add_horizontal_boundary_points()   # S_ob
        input_int, output_int = self.add_interior_points()         # S_int

        training_set_ib = DataLoader(torch.utils.data.TensorDataset(input_ib, output_ib), batch_size=self.space_dimensions*self.n_ib, shuffle=False)
        training_set_ob = DataLoader(torch.utils.data.TensorDataset(input_ob, output_ob), batch_size=2*self.n_ob, shuffle=False)
        training_set_int = DataLoader(torch.utils.data.TensorDataset(input_int, output_int), batch_size=self.n_int, shuffle=False)

        return training_set_ib, training_set_ob, training_set_int

    ################################################################################################
    # Function to compute the terms required in the definition of the INFLOW boundary residual
    def apply_inflow_condition(self, input_ib):
        u_pred_ib = self.approximate_solution(input_ib)
        return u_pred_ib

    # Function to compute the terms required in the definition of the HORIZONTAL boundary residual
    def apply_boundary_conditions(self, input_ob):
        u_pred_ob = self.approximate_solution(input_ob)

        return u_pred_ob

    def alpha(self,rho):
        return (1-rho)/(1+self.qk*rho)*(self.alpha_s - self.alpha_f)+self.alpha_f
    
    def interpolate_rho(self,input_int):
        input_int = int(input_int.detach() * 100)
        rho = np.zeros((input_int.shape[0],1))
        for i in range(input_int.shape[0]):
            x = input_int[i,0]
            y = input_int[i,1]
            rho[i] = self.rho[x,y]
        return rho
        
    def compute_pde_residual(self, input_int):
        rho = interpolate_rho(input_int)
        input_int.requires_grad = True # save gradients
        
        u = self.approximate_solution(input_int)[:,0:2]
        p = self.approximate_solution(input_int)[:,2:4] 

        # grad compute the gradient of a "SCALAR" function L with respect to some input nxm TENSOR Z=[[x1, y1],[x2,y2],[x3,y3],...,[xn,yn]], m=2
        # it returns grad_L = [[dL/dx1, dL/dy1],[dL/dx2, dL/dy2],[dL/dx3, dL/dy3],...,[dL/dxn, dL/dyn]]
        # Note: pytorch considers a tensor [u1, u2,u3, ... ,un] a vectorial function
        # whereas sum_u = u1 + u2 + u3 + u4 + ... + un as a "scalar" one

        # In our case ui = u(xi), therefore the line below returns:
        # grad_u = [[dsum_u/dx1, dsum_u/dy1],[dsum_u/dx2, dsum_u/dy2],[dsum_u/dx3, dL/dy3],...,[dsum_u/dxm, dsum_u/dyn]]
        # and dsum_u/dxi = d(u1 + u2 + u3 + u4 + ... + un)/dxi = d(u(x1) + u(x2) u3(x3) + u4(x4) + ... + u(xn))/dxi = dui/dxi
        grad_u = torch.autograd.grad(u.sum(), input_int, create_graph=True)[0]
        #print(grad_u[0].shape)
        #grad_u = grad_u[0]
        grad_u_x = grad_u[:, 0]
        grad_u_y = grad_u[:, 1]
        grad_u_xx = torch.autograd.grad(grad_u_x.sum(), input_int, create_graph=True)[0][:, 0]
        grad_u_yy = torch.autograd.grad(grad_u_y.sum(), input_int, create_graph=True)[0][:, 1]
        


        grad_p = torch.autograd.grad(p.sum(), input_int, create_graph=True)[0]
        grad_p_x = grad_p[:,0]
        grad_p_y = grad_p[:,1]
        
        nabla_p = grad_p_x + grad_p_y
        div_term = 2*self.Re*(grad_u_xx + grad_u_yy)
        drag_term = 1/self.Re * (5 * self.L**2 / (2 * self.Ht**2) + self.alpha(rho)) * u
        
        residual = -nabla_p + div_term - drag_term
        return residual.reshape(-1, )
    
    
    # Function to compute the incompressibility term residuals on the interior
    def compute_incompress_residual(self, input_int):
        input_int.requires_grad = True
        u = self.approximate_solution(input_int)

        # grad compute the gradient of a "SCALAR" function L with respect to some input nxm TENSOR Z=[[x1, y1],[x2,y2],[x3,y3],...,[xn,yn]], m=2
        # it returns grad_L = [[dL/dx1, dL/dy1],[dL/dx2, dL/dy2],[dL/dx3, dL/dy3],...,[dL/dxn, dL/dyn]]
        # Note: pytorch considers a tensor [u1, u2,u3, ... ,un] a vectorial function
        # whereas sum_u = u1 + u2 + u3 + u4 + ... + un as a "scalar" one

        # In our case ui = u(xi), therefore the line below returns:
        # grad_u = [[dsum_u/dx1, dsum_u/dy1],[dsum_u/dx2, dsum_u/dy2],[dsum_u/dx3, dL/dy3],...,[dsum_u/dxm, dsum_u/dyn]]
        # and dsum_u/dxi = d(u1 + u2 + u3 + u4 + ... + un)/dxi = d(u(x1) + u(x2) u3(x3) + u4(x4) + ... + u(xn))/dxi = dui/dxi
        
        grad_u = torch.autograd.grad(u.sum(), input_int, create_graph=True)[0]
        grad_u_x = grad_u[:, 0]
        grad_u_y = grad_u[:, 1]
        div_u = grad_u_x + grad_u_y
    
        return div_u.reshape(-1, )

    # Function to compute the total loss (weighted sum of inflow boundary loss, horizontal boundary loss and interior loss)
    def compute_loss(self, inp_train_ib, u_train_ib, inp_train_ob, u_train_ob, inp_train_int, verbose=True):
        u_pred_ib = self.apply_inflow_condition(inp_train_ib)
        u_pred_ob = self.apply_boundary_conditions(inp_train_ob)
        

        assert (u_pred_ib.shape[1] == u_train_ib.shape[1])
        assert (u_pred_ob.shape[1] == u_train_ob.shape[1])


        r_int = self.compute_pde_residual(inp_train_int)
        r_inc = self.compute_incompress_residual(inp_train_int)
        r_ib = u_train_ib - u_pred_ib
        r_ob = u_train_ob - u_pred_ob

        loss_ib = torch.mean(abs(r_ib) ** 2)
        loss_ob = torch.mean(abs(r_ob) ** 2)
        loss_int = torch.mean(abs(r_int) ** 2)
        loss_inc = torch.mean(abs(r_inc) ** 2)

        loss_u = loss_ib + loss_ob

        loss = torch.log10(self.lambda_u * (loss_ib + loss_ob) + loss_int + loss_inc)
        if verbose: print("Total loss: ", round(loss.item(), 4), "| PDE Loss: ", round(torch.log10(loss_u).item(), 4), "| Function Loss: ", round(torch.log10(loss_int + loss_inc).item(), 4))

        return loss

    ################################################################################################
    def fit(self, num_epochs, optimizer, verbose=True):
        history = list()

        # Loop over epochs
        for epoch in range(num_epochs):
            if verbose: print("################################ ", epoch, " ################################")

            for j, ((inp_train_ib, u_train_ib), (inp_train_ob, u_train_ob), (inp_train_int, u_train_int)) in enumerate(zip(self.training_set_sb, self.training_set_tb, self.training_set_int)):
                def closure():
                    optimizer.zero_grad()
                    loss = self.compute_loss(inp_train_ib, u_train_ib, inp_train_ob, u_train_ob, inp_train_int, verbose=verbose)
                    loss.backward()

                    history.append(loss.item())
                    return loss

                optimizer.step(closure=closure)

        print('Final Loss: ', history[-1])

        return history

    ################################################################################################
    def plotting(self):
        inputs = self.soboleng.draw(100000)
        inputs = self.convert(inputs)

        output = self.approximate_solution(inputs).reshape(-1, )
#         exact_output = self.exact_solution(inputs).reshape(-1, )

        fig, axs = plt.subplots(1, 2, figsize=(16, 8), dpi=150)
#         im1 = axs[0].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=exact_output.detach(), cmap="jet")
#         axs[0].set_xlabel("x")
#         axs[0].set_ylabel("t")
#         plt.colorbar(im1, ax=axs[0])
#         axs[0].grid(True, which="both", ls=":")
        im2 = axs[0].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=output.detach(), cmap="jet")
        axs[0].set_xlabel("x")
        axs[0].set_ylabel("t")
        plt.colorbar(im2, ax=axs[0])
        axs[0].grid(True, which="both", ls=":")
#         axs[0].set_title("Exact Solution")
        axs[0].set_title("Approximate Solution")

        plt.show()

#         err = (torch.mean((output - exact_output) ** 2) / torch.mean(exact_output ** 2)) ** 0.5 * 100
#         print("L2 Relative Error Norm: ", err.item(), "%")


In [11]:
# Solve the heat equation:
# u_t = u_xx, (t,x) in [0, 0.1]x[-1,1]
# with zero dirichlet BC and
# u(x,0)= -sin(pi x)

n_int = 256
n_sb = 64
n_tb = 64

pinn = Pinns(n_int, n_sb, n_tb)

NameError: name 'NeuralNet' is not defined

In [None]:
# Plot the input training points
input_sb_, output_sb_ = pinn.add_spatial_boundary_points()
input_tb_, output_tb_ = pinn.add_temporal_boundary_points()
input_int_, output_int_ = pinn.add_interior_points()

plt.figure(figsize=(16, 8), dpi=150)
plt.scatter(input_sb_[:, 1].detach().numpy(), input_sb_[:, 0].detach().numpy(), label="Boundary Points")
plt.scatter(input_int_[:, 1].detach().numpy(), input_int_[:, 0].detach().numpy(), label="Interior Points")
plt.scatter(input_tb_[:, 1].detach().numpy(), input_tb_[:, 0].detach().numpy(), label="Initial Points")
plt.xlabel("x")
plt.ylabel("t")
plt.legend()
plt.show()

In [None]:
n_epochs = 1
optimizer_LBFGS = optim.LBFGS(pinn.approximate_solution.parameters(),
                              lr=float(0.5),
                              max_iter=50000,
                              max_eval=50000,
                              history_size=150,
                              line_search_fn="strong_wolfe",
                              tolerance_change=1.0 * np.finfo(float).eps)
optimizer_ADAM = optim.Adam(pinn.approximate_solution.parameters(),
                            lr=float(0.001))

In [None]:
hist = pinn.fit(num_epochs=n_epochs,
                optimizer=optimizer_LBFGS,
                verbose=True)

plt.figure(dpi=150)
plt.grid(True, which="both", ls=":")
plt.plot(np.arange(1, len(hist) + 1), hist, label="Train Loss")
plt.xscale("log")
plt.legend()

In [None]:
pinn.plotting()