# Physics-Informed Integral Network for Laplace Equation
(Please reference to our paper **Physics-Informed Boundary Integral Networks (PIBI-Nets): a Data-Driven Approach for Solving Partial Differential Equations**.)

---
## Problem setup
 
### Laplace equation 
$$ \Delta u(x) := \frac{\partial^2 u}{\partial x_1^2} + \frac{\partial^2 u}{\partial x_2^2} = 0 $$
 
$$x:=(x_1, x_2)\, \in \Omega \subseteq \mathbb{R}^2$$
 
For sake of simplicity, we choose $\Omega$ to be a circle.
 
### Single and double layer potential
For a point $x\in\Omega$ inside the domain we have 
$$u(x) = \left( \int_{\partial \Omega} G(x,y)\cdot \frac{\partial g}{\partial n}(y) dS_y \right) - \left(\int_{\partial \Omega} \frac{\partial G}{\partial n}(x,y)\cdot g(y) dS_y \right).$$
But for a boundary point $x\in\Gamma:=\partial\Omega$ we have get a jump
$$
u(x) = \left( \int_{\partial \Omega} G(x,y)\cdot \frac{\partial g}{\partial n}(y) dS_y \right) - \left(\int_{\partial \Omega} \frac{\partial G}{\partial n}(x,y)\cdot g(y) dS_y \right) +
\frac{1}{2}g(x).
$$
 
 
 
### Uniformal sampled datapoints based on FD solution given by Dirichlet boundary conditions
Let ${\large\{}u_i(x){\large\}}_{i=0}^{i=N}$ be the measurements constructed by a finite difference solution of the Laplace equation based on Dirichlet boundary conditions

## Load packages

In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import cm  # colormaps
from matplotlib.font_manager import FontProperties
import pandas as pd
import os

In [None]:
matplotlib.rcParams['text.usetex'] = True
# font properties for plot titles
font_axis = FontProperties()
font_axis.set_family('serif')
font_axis.set_name('Times New Roman')
font_axis.set_size(115)  # Set the font size to match LaTeX, e.g., 12pt
# font properties for 3d plot titles
font_axis_3d = FontProperties()
font_axis_3d.set_family('serif')
font_axis_3d.set_name('Times New Roman')
font_axis_3d.set_size(90)  # Set the font size to match LaTeX, e.g., 12pt

## Dataset setup

In [None]:
# read data from CSV file into a pandas DataFrame and save as Torch tensor
def read_data(csv_file):
    """
    read csv file and store data in x_data, u_data
    
    Args:
    - csv_file
    
    Returns:
    - x_data: tensor of shape (N,2) containing N positions x=(x1,x2)
    - u_data: tensor of shape (N,1) containing u values to corresponding x
    
    """

    # read data from CSV file into a pandas DataFrame and save as Torch tensor
    data_pd = pd.read_csv(csv_file)
    # convert pandas DataFrame to PyTorch tensor
    data_tensor = torch.tensor(data_pd.values, dtype=torch.float32)  # x1, x2, u
    x_data = data_tensor[:, 0:2]  # x1, x2, (N,2)
    u_data = data_tensor[:, 2].view(-1, 1)  # u, (N,1)
    return x_data, u_data


x_data, u_data = read_data('dataset_laplace.csv')

## Physics initialisation

In [None]:
# outer normal directional derivative on boundary of Omega (as a circle)
def outer_normal(x, radius, middle):
    """
    computes outer normal vector normals that is orthogonal to surface of Omega
    
    Args:
    - x: tensor of shape (N,2) containing N positions (x1, x2) on boundary of Omega
    
    Returns:
    - normals: tensor of shape (N,2) containing the outer normal vector on boundary of Omega at each position in x
    
    """

    N = x.shape[0]  # number of points
    xc = x - middle  # cetered
    normals = xc / radius

    return normals


# Physics initialisation for 2D Laplace equation Δ𝑢(𝑥):= ∂^2(𝑢)/∂(𝑥_1)^2 + ∂^2(𝑢)/∂(𝑥_2)^2 = 0
# fundamental solution of Laplace equation in 2D
def fundamental_solution(x, y):
    """
    computes the fundamental solution of the Laplace equation in 2D at N positions in  
    two points x and y in R^2, given by
    Phi_0(x, y) = -1/(2π) * log(||x - y||)
    
    Args:
    - x: torch.tensor of position x on Domain Omega, shape (N,2)
    - y: torch.tensor of position y on Domain Omega, shape (N,2) 
    
    Returns:
    - Phi_0: torch.tensor of shape (N,1), value of Laplace fundamental solution over x and y
    
    """

    distance = torch.norm(x - y, p=2, dim=1).view(-1, 1) + 1e-8
    # check if distance is >0, otherwise raise an error because ln(0) = -infinity
    # if (distance == 0).any():
    #     raise ValueError("Points must be distinct")  # produce error

    Phi_0 = -1 / (2 * np.pi) * torch.log(distance)
    return Phi_0


# gradient of fundamental solution with respect to y
# grad Phi_0(x,y) = 1/(2π) * (x - y)/||x - y||²
def gradient_fundamental(x, y):
    """
    computes the gradient *with respect to x* of the Laplace fundamental solution in 2D at N positions x and y, 
    given by ∇y Phi_0(x,y) = - 1/(2π) * (x - y)/||x - y||²
    
    Args:
    - x: torch.tensor of position x on Domain Omega, shape (N,2)
    - y: torch.tensor of position y on Domain Omega, shape (N,2) 
    
    Returns:
    - grad_Phi_0: torch.tensor of shape (N,2), gradient of Laplace fundamental solution over x and y
    
    """
    distance = torch.norm(x - y + 1e-8, p=2, dim=1).view(-1, 1)

    # check if distance is >0, otherwise raise an error
    # if (distance == 0).any():
    #     raise ValueError("Points must be distinct")  # produce error
    grad_Phi_0 = (1 / (2 * np.pi)) * ((x - y) / distance ** 2)
    return grad_Phi_0

## Collocation and integration points

In [None]:
def collocation_integration_points(n, radius, middle):
    # add random shift
    angle = (2 * np.pi * torch.rand(1)).item()

    # define angles
    n = n + 1
    theta = np.linspace(0, 2 * np.pi, n) + angle
    theta = theta[1::]  # to remove one double angle at 0 and 2*pi
    # split theta into two sets
    theta_coll = theta[::2]  # select every second point
    theta_int = theta[1::2]  # select every other points

    # define collocation points
    x1_coll = middle[:, 0] + radius * np.cos(theta_coll).reshape(int(n / 2), 1)
    x2_coll = middle[:, 1] + radius * np.sin(theta_coll).reshape(int(n / 2), 1)
    x_collocation = np.concatenate((x1_coll, x2_coll), axis=1)
    x_collocation = torch.tensor(x_collocation, dtype=torch.float32).requires_grad_(True)

    # define integration points
    x1_int = middle[:, 0] + radius * np.cos(theta_int).reshape(int(n / 2), 1)
    x2_int = middle[:, 1] + radius * np.sin(theta_int).reshape(int(n / 2), 1)
    x_integration = np.concatenate((x1_int, x2_int), axis=1)
    x_integration = torch.tensor(x_integration, dtype=torch.float32).requires_grad_(True)

    return x_collocation, x_integration  # each of shape (n/2, 2)

In [None]:
n = 20
radius = 1.5
middle = torch.tensor((0, 0)).view(1, 2)
x_collocation, x_integration = collocation_integration_points(n, radius, middle)
x_coll_np = x_collocation.detach().numpy()
x_int_np = x_integration.detach().numpy()

# plot collocation and integration points
plt.scatter(x_coll_np[:, 0], x_coll_np[:, 1], color='red', label='Collocation Points')
plt.scatter(x_int_np[:, 0], x_int_np[:, 1], color='blue', label='Integration Points')
plt.axis('equal')
plt.show()

## FD mesh

In [None]:
# read data from CSV file into a pandas DataFrame and save as Torch tensor
x_num = pd.read_csv('X_mesh.csv')
# convert pandas DataFrame to PyTorch tensor
x_num = torch.tensor(x_num.values, dtype=torch.float32)  # x1, x2

# read data from CSV file into a pandas DataFrame and save as Torch tensor
u_num = pd.read_csv('u_num.csv')
# convert pandas DataFrame to PyTorch tensor
u_num = torch.tensor(u_num.values, dtype=torch.float32)  # u

N = u_num.shape
N

## Network setup

In [None]:
# fully-connected neural network setup with PyTorch
class FCNN(nn.Module):
    n_integration_points = 1_000

    def __init__(self, N_input, N_output, N_hidden, N_layers):
        """
        class implementation of a fully-connected neural network with PyTorch given by Args
        
        Args:
        - N_input: integer, number of input dimension, here N_input = 2 given by x=(x1, x2)
        - N_output: integer, number of output dimession, here N_output = 1 given by u(x)
        - N_hidden: integer, depth of one hidden layer
        - N_layers: integer, number of hidden layers in the network
        
        Methods:
        - forward: forward pass of neural network
        
        """
        super().__init__()
        activation = nn.ELU

        # input / start layer
        self.fc_start = nn.Sequential(*[
            nn.Linear(N_input, N_hidden),
            activation()
        ])
        # hidden layers
        self.fc_hidden = nn.Sequential(*[
            nn.Sequential(*[
                nn.Linear(N_hidden, N_hidden),
                activation()
            ])
            for _ in range(N_layers - 1)  # -1 since first layer already defined before for-loop
        ])
        # output / end layer
        self.fc_end = nn.Linear(N_hidden, N_output)

        self.x_coll, self.x_int = collocation_integration_points(self.n_integration_points, radius, middle)
        self.vmapped_potential = torch.vmap(self.calc_potentials, randomness='same')
        self.vmapped_potential_xcol = torch.vmap(self.calc_potentials_xcoll, randomness='same')

    def resample(self):
        self.x_coll, self.x_int = collocation_integration_points(self.n_integration_points, radius, middle)
        self.vmapped_potential = torch.vmap(self.calc_potentials, randomness='same')
        self.vmapped_potential_xcol = torch.vmap(self.calc_potentials_xcoll, randomness='same')

    def predict_u_inside(self, x):
        double_layer, single_layer = self.vmapped_potential(x)
        u_int_data = (single_layer.squeeze() - double_layer.squeeze()).view(-1, 1)
        return u_int_data

    def predict_u_boundary(self, x):
        double_layer, single_layer = self.vmapped_potential_xcol(x)
        u_int_boundary = (single_layer.squeeze() - double_layer.squeeze()).view(-1, 1)
        return u_int_boundary

    def forward(self, x):
        """
        forward pass through network building blocks given in this class above
        
        Args:
        - x: tensor of shape (N, 2) containing N positions (x1, x2)

        Returns:
        - u: tensor of shape (N,1) containing the network solution u at position x
        
        """
        # forward network
        x = self.fc_start(x)  # input layer
        x = self.fc_hidden(x)  # hidden layer(s)
        u = self.fc_end(x)  # output layer

        return u

    def calc_potentials(self, x):
        """
        Inner domain points
        :param x:
        :return:
        """
        # single_layer
        # define collocation and integration points
        y = self.x_int
        # normal vectors
        normal_y = outer_normal(y, radius, middle)
        # boundary density
        g_y = self(y)
        G = fundamental_solution(x, y)
        grad_ones = torch.ones_like(g_y)
        dg_dy = torch.autograd.grad(g_y, y, grad_outputs=grad_ones, create_graph=True)[0]
        dg_dn = torch.sum(dg_dy * normal_y, dim=1).view(-1, 1)
        single_layer = torch.mean(G * dg_dn)
        ################
        # double_layer
        dG_dy = gradient_fundamental(x, y)
        dG_dn = torch.sum(dG_dy * normal_y, dim=1).view(-1, 1)
        double_layer = torch.mean(dG_dn * g_y)
        return double_layer.squeeze(), single_layer.squeeze()

    def calc_potentials_xcoll(self, x):
        """
        for boundary
        :param x:
        :return:
        """
        y = self.x_int
        g_y = self(y)

        G = fundamental_solution(x, y)
        grad_ones = torch.ones_like(g_y)
        dg_dy = torch.autograd.grad(g_y, y, grad_outputs=grad_ones, create_graph=True)[0]
        normal_y = outer_normal(y, radius, middle)
        dg_dn = torch.sum(dg_dy * normal_y, dim=1).view(-1, 1)
        single_layer = torch.mean(G * dg_dn)
        ################
        # double_layer
        dG_dy = gradient_fundamental(x, y)
        dG_dn = torch.sum(dG_dy * normal_y, dim=1).view(-1, 1)
        double_layer = torch.mean(dG_dn * g_y) - 0.5 * pibi(x)
        return double_layer.squeeze(), single_layer.squeeze()

In [None]:
def initialise_pibi(lr):
    # define number of neurons in each layer type
    N_input = 2
    N_output = 1
    N_hidden = 512
    N_layers = 3  # number of hidden layers

    # define a neural network to train
    pibi = FCNN(N_input, N_output, N_hidden, N_layers)

    # optimizer
    optimiser = torch.optim.Adam(pibi.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimiser, factor=0.5, eps=1e-12, verbose=True)

    # loss function
    mse_loss = torch.nn.MSELoss()  # Mean squared error

    return pibi, optimiser, scheduler, mse_loss

## Train PIBI and visualise trained solution

In [None]:
def train_pibi(iterations, lambda_physics_data, lambda_physics_coll, n, pibi, optimiser, scheduler, mse_loss):
    # n: number of collocation and integration points
    # store loss values
    loss_values = []
    # x_coll, x_int = collocation_integration_points(n, radius, middle)
    # vmapped_potential = torch.vmap(pibi.calc_potentials, randomness='same')
    # vmapped_potential_xcol = torch.vmap(pibi.calc_potentials_xcoll, randomness='same')

    for epoch in range(iterations):
        # Clear gradients before forward and backward pass for each batch to avoid accumulation of gradients
        optimiser.zero_grad()
        pibi.resample()
        x_coll = pibi.x_coll

        u_int_data = pibi.predict_u_inside(x_data)
        loss_int_data = mse_loss(u_int_data, u_data)

        u_int_coll = pibi.predict_u_boundary(x_coll)
        loss_int_coll = mse_loss(u_int_coll, torch.zeros_like(u_int_coll))

        # backpropagate weighted joint loss, take optimiser step
        loss = lambda_physics_data * loss_int_data + lambda_physics_coll * loss_int_coll
        loss_values.append(loss.item())  # store loss value for convergence plot
        loss.backward()
        optimiser.step()
        scheduler.step(loss)

        # plot result as training progresses on test set
        if epoch % 2 == 0:
            print('epoch:', epoch, ', data:', loss_int_data.data, ', coll:', loss_int_coll.data)

            if epoch == iterations - 1:
                # save model
                torch.save(pibi.state_dict(), "pibi_model.pt")

                # solve integral for plot points
                _, x_int = collocation_integration_points(n, radius, middle)
                u_pibi = pibi.predict_u_inside(x_num)
                x1 = (x_num[:, 0]).reshape((101, 101))  # (test_size,test_size)
                x2 = (x_num[:, 1]).reshape((101, 101))  # (test_size,test_size)
                u_nn = u_pibi.reshape(x1.shape)  # nn output, (test_size,test_size)
                u_nn = u_nn.detach()

                # evaluation plot in 3D
                fig = plt.figure(figsize=(15, 15))
                ax = fig.add_subplot(111, projection='3d', computed_zorder=False)
                surf = ax.plot_surface(x1, x2, u_nn, cmap=cm.jet, zorder=1, alpha=1)
                ax.scatter(x_data[:, 0], x_data[:, 1], u_data, c='black', marker='o', s=10 ** 2, alpha=1, zorder=2)
                surf.set_clim(vmin=-1, vmax=1)  # limits to colorbar
                # fig.colorbar(surf, shrink=0.45, aspect=10, pad=0.2)
                fig.axes[0].tick_params(axis="both", labelsize=20)
                # fig.axes[1].tick_params(axis="both", labelsize=45)
                # axis ticks
                ax.tick_params(axis="both", labelsize=60, pad=20)
                major_ticks = [-1, -0.5, 0, 0.5, 1]
                ax.set_xticks(major_ticks)
                ax.set_yticks(major_ticks)
                ax.set_zticks(major_ticks)
                # tick colour
                ax.tick_params(axis='x', colors='grey')
                ax.tick_params(axis='y', colors='grey')
                ax.tick_params(axis='z', colors='grey')
                # labels
                ax.set_xlabel('$x_1$', fontproperties=font_axis_3d, labelpad=55)
                ax.set_ylabel('$x_2$', fontproperties=font_axis_3d, labelpad=55)
                ax.set_zlabel('$u$', fontproperties=font_axis_3d, labelpad=55)
                # plt.title(f"PIBI-Net at epoch {epoch}", size=20)
                # Save the figure as a JPEG image
                plt.savefig('piibi_3d.jpg', dpi=500, format='jpeg', bbox_inches='tight')
                plt.show()

                # evaluation plot in 2d
                fig, ax = plt.subplots(figsize=(15, 15))
                img = ax.imshow(u_nn, cmap='jet', origin='lower',
                                extent=[torch.min(x1).item(), torch.max(x1).item(),
                                        torch.min(x2).item(), torch.max(x2).item()],
                                vmin=-1, vmax=1)
                ax.scatter(x_data[:, 0], x_data[:, 1], c='black', marker='o', s=10 ** 2, alpha=1, zorder=4)
                # fig.colorbar(img, ax=ax, shrink=0.55, aspect=10, pad=0.2)
                # tick sizes
                fig.axes[0].tick_params(axis="both", labelsize=20)
                # fig.axes[1].tick_params(axis="both", labelsize=45)
                # axis ticks
                ax.tick_params(axis="both", labelsize=70, pad=5)
                major_ticks = [-1, -0.5, 0, 0.5, 1]
                ax.set_xticks(major_ticks)
                ax.set_yticks(major_ticks)
                # tick colour
                ax.tick_params(axis='x', colors='grey')
                ax.tick_params(axis='y', colors='grey')
                # labels
                ax.set_xlabel('$x_1$', fontproperties=font_axis, labelpad=0)
                ax.set_ylabel('$x_2$', fontproperties=font_axis, labelpad=-20)
                # plt.title(f"PIIBI-Net at epoch {epoch} (Top view)", size=20)
                # Save the figure as a JPEG image 
                plt.savefig('pibi_2d.jpg', dpi=500, format='jpeg', bbox_inches='tight')
                plt.show()

                # absolute error plot
                u_num_matrix = u_num.reshape(u_nn.shape)
                u_error = torch.abs(u_nn - u_num_matrix)
                fig, ax = plt.subplots(figsize=(15, 15))
                img = ax.imshow(u_error, cmap='jet_r', origin='lower',
                                extent=[torch.min(x1).item(), torch.max(x1).item(),
                                        torch.min(x2).item(), torch.max(x2).item()],
                                vmin=0, vmax=0.7)
                ax.scatter(x_data[:, 0], x_data[:, 1], c='black', marker='o', s=10 ** 2, alpha=1, zorder=4)
                # fig.colorbar(img, ax=ax, shrink=0.55, aspect=10, pad=0.2)
                # tick sizes
                fig.axes[0].tick_params(axis="both", labelsize=20)
                # fig.axes[1].tick_params(axis="both", labelsize=45)
                # axis ticks
                ax.tick_params(axis="both", labelsize=70, pad=5)
                major_ticks = [-1, -0.5, 0, 0.5, 1]
                ax.set_xticks(major_ticks)
                ax.set_yticks(major_ticks)
                # tick colour
                ax.tick_params(axis='x', colors='grey')
                ax.tick_params(axis='y', colors='grey')
                # labels
                ax.set_xlabel('$x_1$', fontproperties=font_axis, labelpad=0)
                ax.set_ylabel('$x_2$', fontproperties=font_axis, labelpad=-20)
                # plt.title('Absolute error (Top View)', size=20)
                plt.savefig('pibi_error.jpg', dpi=500, format='jpeg', bbox_inches='tight')
                plt.show()
                u_vec = u_error.reshape(N, 1)
                print('min', torch.min(u_vec), 'max', torch.max(u_vec), 'mean', torch.mean(u_vec), 'std',
                      torch.std(u_vec))

    return loss_values, pibi


In [None]:
# pibi initialisation
lr = 1e-3  # learning rate
pibi, optimiser, scheduler, mse_loss = initialise_pibi(lr)

iterations = 31  # 5_000
lambda_physics_data = 1
lambda_physics_coll = 1e-6  # weighting of collocation loss
n = 1000  # number of collocation points
radius = 1.5
middle = torch.tensor((0, 0)).view(1, 2)
loss_values, pibi = train_pibi(iterations, lambda_physics_data, lambda_physics_coll, n, pibi, optimiser, scheduler,
                               mse_loss)

## Visualize convergence of loss function

In [None]:
epochs = list(range(1, iterations + 1))
fig, ax = plt.subplots()
ax.plot(epochs, loss_values)
ax.set(xlabel='epochs / iterations',
       ylabel='loss',
       title='Convergence')
# plt.savefig('pibi_loss.jpg', dpi=1000, format='jpeg')
plt.show()