# Physics-Informed Neural Network for 1D Diffusion Equation


This notebook will be using a Physics-Informed Neural Network to solve the 1-D Diffusion/Heat Equation. Suppose we have the Diffusion Equation:

$$ \frac{\partial{U}}{\partial{t}} = D\frac{\partial^{2}{U}}{\partial{x^{2}}},$$

with the Dirichlet boundary conditions:

$$ U\big|_{x = -L,t } = 0,$$
$$ U\big|_{x = L,t } = 0,$$

and the following initial condition:

$$ U\big|_{x,t =0} = \sin(\pi x).$$

To simplify the system, we shall set $D=1$ and $L=1$. Additionally, we shall choose the domains of our system such that we know the analytical solution to the PDE. As such, our domains are:

$$x \in (-1,1)$$

$$t \in (0,1)$$

Because, of these parameters, the exact solution to the PDE is $U = e^{-t} \sin(\pi x)$.

First off, let's import our needed Python packages:

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import time, logging, sys, os
import torch.nn as nn
import torch.autograd as autograd
from PIL import Image
import imageio


# Set default dtype to float32
torch.set_default_dtype(torch.float)

# Set the device, if CUDA is installed we use the GPU, otherwise we use the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
if device == "cuda":
    print(torch.cuda.get_device_name())

We can ignore the following lines of code for now; these are functions to plot the results.

In [None]:
def plot_mesh(x, t, y, folder="", name="", show=None, error=0, loss=0, iter=0):
    """
    This function plots U(x,t) at every iteration to show the solution to the PDE. It will also plot the NN Loss and relative error determined by the L^2 norm.
    """
    X, T = x.cpu(), t.cpu()
    F_xt = y.cpu()

    fig, ax = plt.subplots(1, 1, dpi=200)
    cp = ax.contourf(X, T, F_xt, 20, cmap="rainbow")
    fig.colorbar(cp)  # Add a colorbar to a plot
    ax.set_xlabel("x")
    # ax.set_ylabel("x2")
    ax.set_ylabel("t")
    if error > 0 and loss != 0:
        ax.set_title(f"Iteration: {iter}, Loss: {loss:.5f} \n  Relative error: {error:.5f}")
    elif error > 0:
        ax.set_title(f"Iteration: {iter} \n Relative error: {error:.5f}")
    elif loss != 0:
        ax.set_title(f"Iteration: {iter}, Loss: {loss:.5f}")
    else:
        ax.set_title(f"Iteration: {iter}")
    plt.savefig(f"{folder}/contour_{name}.png")  # Save the first image as an image file
    plt.close()

    plt.figure(dpi=200)
    ax = plt.axes(projection="3d")
    ax.plot_surface(X.numpy(), T.numpy(), F_xt.numpy(), cmap="rainbow")
    ax.set_xlabel("x")
    ax.set_ylabel("t")
    
    if error > 0 and loss != 0:
        ax.set_title(f"Iteration: {iter}, Loss: {loss:.5f} \n  Relative error: {error:.5f}")
    elif error > 0:
        ax.set_title(f"Iteration: {iter} \n Relative error: {error:.5f}")
    elif loss != 0:
        ax.set_title(f"Iteration: {iter}, Loss: {loss:.5f}")
    else:
        ax.set_title(f"Iteration: {iter}")
        
    ax.set_zlim3d(-1, 1) 

        
    plt.savefig(f"{folder}/{name}.png")  # Save the first image as an image file
    if show:
        plt.show()
    plt.close()

In [None]:
def generate_gif(test_folder, count, remove_imgs=True):
    """
    This function generates the gif that plots U(x,t) over the course of the training/testing stage.
    """
    n = count  # The number of pictures to be stitched into a video
    fps = n // 5  # The frame rate of the generated video
    imgs = []
    for i in range(1, n, 1):
        img_file = f"{test_folder}/approx_{i}.png"
        img = imageio.imread(img_file)
        img = Image.fromarray(img)  # .resize((size_width, size_height))
        imgs.append(img)
        os.remove(img_file) if remove_imgs else None
    filename1 = os.path.join(test_folder, "0_video_u.mp4")
    filename2 = os.path.join(test_folder, "0_video_u.gif")
    imageio.mimwrite(filename1, imgs, fps=fps)  #
    imageio.mimsave(filename2, imgs, format='GIF')

    imgs = []
    for i in range(1, n, 1):
        img_file = f"{test_folder}/contour_approx_{i}.png"
        img = imageio.imread(img_file)
        img = Image.fromarray(img)  # .resize((size_width, size_height))
        imgs.append(img)
        os.remove(img_file) if remove_imgs else None
    filename = os.path.join(test_folder, "0_video_u_contour.mp4")
    imageio.mimwrite(filename, imgs, fps=fps)

The following set of functions will generate the PyTorch tensors needed for the neural-network training

In [None]:
def make_pde_data(x_min,x_max,t_min,t_max,N_train_x,N_train_t):
    """
    This function creates the PyTorch arrays needed for the PINN. The arrays contain all possible combinations of x,t arranged into a 2D array.
    """
    # Determine why omit first and last values #
    x = torch.linspace(x_min, x_max, N_train_x)
    t = torch.linspace(t_min, t_max, N_train_t)
    X, T = torch.meshgrid(x, t)

    # x values are up/down column 1, t values are up/down column 2
    # Note: in order to stack torch tensors, tensors must be 2D unlike numpy arrays
    X_pde = torch.hstack((X.transpose(1, 0).flatten().view(-1, 1), T.transpose(1, 0).flatten().view(-1, 1)))
    return X_pde

Note that we are arranging our meshgrids of X and T as such:


<div>
<img src="./images/grid_schematic.png" width="700"/>
</div>

In [None]:
def make_ic_bc_data(x_min,x_max,t_min,t_max,N_bc):
    """
    This function creates the PyTorch tensor that are modified for the initial condition
    and the boundary conditions of the PDE.
    """
    x_bc = torch.linspace(x_min, x_max, N_bc)
    t_bc = torch.linspace(t_min, t_max, N_bc)
    X, T = torch.meshgrid(x_bc, t_bc)

    # Make 2D array for initial condition- All x values, t = 0
    # Note: X tensor is array to show all values of x and t, Y tensor is U in Heat Equation
    left_X = torch.hstack((X[:, 0][:, None], T[:, 0][:, None]))
    left_Y = torch.sin(np.pi * left_X[:, 0]).unsqueeze(1)

    #Set boundary conditions
    #Set left boundary condition for x=x_min at all T
    bottom_X = torch.zeros(size = (x_bc.shape[0],2))
    bottom_X[:,0] = x_bc.min()
    bottom_X[:,1] = t_bc
    # bottom_X = torch.hstack((X[0, :][:, None], T[0, :][:, None]))
    bottom_Y = torch.zeros(bottom_X.shape[0], 1)
    
    # Set right boundary condition for x=x_max at all T
    top_X = torch.zeros(size = (x_bc.shape[0],2))
    top_X[:,0] = x_bc.max()
    top_X[:,1] = t_bc
    # top_X = torch.hstack((X[-1, :][:, None], T[-1, :][:, None]))

    top_Y = torch.zeros(top_X.shape[0], 1)

    # Stack all tensors to include boundary conditions and initial conditions
    X_train = torch.vstack([left_X, bottom_X, top_X])
    Y_train = torch.vstack([left_Y, bottom_Y, top_Y])

    # Select random values as part of training process
    idx = np.random.choice(X_train.shape[0], N_bc, replace=False)
    X_train_BC = X_train[idx, :]
    Y_train_BC = Y_train[idx, :]
    return X_train_BC, Y_train_BC

In [None]:
def gen_test_dataset(x_min,x_max,t_min,t_max,N_test_x,N_test_t):
    """
    This function generates the PyTorch tensors that contain the actual analytical solution to the PDE.
    """
    x = torch.linspace(x_min, x_max, N_test_x).view(-1, 1)
    # x = torch.reshape(x,(x.shape[0],1))
    t = torch.linspace(t_min, t_max, N_test_t).view(-1, 1)
    # t = torch.reshape(t,(t.shape[0],1))

    # Create the mesh
    X, T = torch.meshgrid(x.squeeze(1),t.squeeze(1))
    
    # Evaluate solution to PDE
    y_real = torch.exp(-T) * (torch.sin(np.pi * X))  # Example 1
    # Prepare Data
    # Transform the mesh into a 2-column vector to be consistent with other arrays
    x_test = torch.hstack((X.transpose(1, 0).flatten().view(-1, 1), T.transpose(1, 0).flatten().view(-1, 1)))
    y_test = y_real.view(-1,1).transpose(1, 0).flatten().view(-1, 1)  # Colum major Flatten (so we transpose it)
    return y_real, x_test, y_test, X, T

The following class contains all of the methods needed for the neural network. There are several features of this neural network:

1) Xavier Normal Initialization: This creates a uniform variance across each layer to prevent the gradient descent method from blowing up.
2) Loss functions: This calculates the: 1) difference between predicted values of $U(x,t)$ and the solution to the PDE; 2) and the difference actual predicted values to the boundary conditions and their solution and 3) difference between the predicted initial condition and the actual initial condition. The penalty associated with these differences is the 'loss' due to the entire PDE solution. The neural network aims to minimize this loss and ensure the PDE, boundary condition, and initial condition are satisfied.
3) The $L^{2}$ norm is calculated to predict the error of the system.
4) ADAM optimization; other optimization methods can be used in lieu of this

In [None]:
class fcn_nn(nn.Module):
    def __init__(self,layers, domain):
        nn.Module.__init__(self)
        self.layers = layers
        self.domain = domain

        # Activation function to normalize outputs of NN
        self.activation = nn.Tanh()

        # Make loss function a mean squared error
        # Note: first argument of loss function is actual data, second argument is empty array?
        self.loss_function = nn.MSELoss(reduction="mean")

        """Initialise neural network as a list using nn.Modulelist"""
        #Apply linear transformation to data #
        self.transf_data = [nn.Linear(self.layers[i], self.layers[i + 1]) for i in range(len(self.layers) - 1)]
        self.linear_mod = nn.ModuleList(self.transf_data)
        self.optmz = 0 # For optimization

        "Xavier Normal Initialization: Uniform variance across layers to prevent gradient descent from blowing up"
        for i in range(len(self.layers) - 1):
            nn.init.xavier_normal_(self.linear_mod[i].weight.data, gain=1.0)
            # set biases to zero
            nn.init.zeros_(self.linear_mod[i].bias.data)

    def forward(self,x):
        """
        Foward pass function 
        """
        if torch.is_tensor(x) != True:
            x = torch.from_numpy(x)
        a = x.float()
        for i in range(len(self.layers) - 2):
            z = self.linear_mod[i](a)
            a = self.activation(z)
        a = self.linear_mod[-1](a)
        return a
    
    "Loss Functions"


    def lossBC(self, x_BC, y_BC):
        """
        Loss function for Boundary condition
        """
        loss_BC = self.loss_function(self.forward(x_BC), y_BC)
        return loss_BC
    

    def lossPDE(self,x_PDE):
        """
        Loss function for actual PDE solving
        """
        # x_min, x_max, t_min, t_max = self.domain
        x = x_PDE.clone()
        x.requires_grad = True
        
        # Calculate U in Diffusion Equation
        u = self.forward(x)
        u_x_t = autograd.grad(u, x, torch.ones([x.shape[0], 1]).to(device), retain_graph=True, create_graph=True)[0]  # first derivative of U function with respect to x
        u_xx_tt = autograd.grad(u_x_t, x, torch.ones(x.shape).to(device), create_graph=True)[0]  # first derivative of U function with respect to x
        u_x = u_x_t[:, [0]]
        u_t = u_x_t[:, [1]]  # we select the 2nd element for t (the first one is x) (Remember the input X=[x,t])
        u_xx = u_xx_tt[:, [0]]  # we select the 1st element for x (the second one is t) (Remember the input X=[x,t])
        u_tt = u_xx_tt[:, [1]]


        # f is actual solution to PDE
        f = torch.exp(-x[:, 1:]) * (-torch.sin(np.pi * x[:, 0:1]) - np.pi**2 * torch.sin(np.pi * x[:, 0:1]))

        #u is loss function- sum of gradients and actual solution
        u = u_t - u_xx + f

        u_hat = torch.zeros(x.shape[0], 1).to(device)
        loss_pde = self.loss_function(u, u_hat)
        return loss_pde

    def loss(self, x_BC, y_BC, x_PDE, w_bc, w_pde):
        """
        Calculate loss associated with boundary conditions and actual PDE and sums it up with the actual weights
        """
        loss_bc = self.lossBC(x_BC, y_BC)
        loss_pde = self.lossPDE(x_PDE)
        return w_bc * loss_bc + w_pde * loss_pde
    

    def error_l2_norm(self, x, y):
        """
        Calculate error between predicted data and test data.
        """
        error_u_predict = torch.norm(self.forward(x) - y, dim=1, p=2)
        error_u_predict_mean = error_u_predict.mean()
        return error_u_predict_mean, error_u_predict
    

    def relative_error_l2_norm(self, x, y):
        """
        Calculate relative error using L2 norm.
        """
        error_u_predict = torch.norm(self.forward(x) - y, dim=1, p=2)
        norm_u_real = torch.norm(y, dim=1, p=2)
        rela_error = error_u_predict / (norm_u_real)
        rela_error_mean = rela_error.mean()
        return rela_error_mean, rela_error
    

Let's define the parameters for our PINN:

In [None]:
test_folder = './pinn_results/'
logging.basicConfig(handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler(test_folder + "/0_log.txt")], level=logging.INFO, format="%(message)s")
logger = logging.getLogger()

steps = 3000  # Number of steps in the optimization
batch_size = 1000  # Batch size in the optimization
w_bc = 100  # Extra weights for the loss_bc functional
w_pde = 1  # Extra weights for the loss_pde functional
lr = 1e-3  # Learning rate for the stochastic gradient descent
layers = np.array([2, 50, 50, 50, 50, 1])  # Number of neurons on each layer of the fully connected neural network
log_freq = 50  # log frequency

## Set Boundaries for position and time for PDE ##
x_min = -1
x_max = 1 #L
t_min = 0
t_max = 1


# Define the number of training/test data
N_test_x = 200  #  Number of testing points in space @@@ actually
N_test_t = 200  #  Number of testing points in time
N_train_x = 150  #  Number of training points in space
N_train_t = 150  #  Number of training points in time
N_bc = 1000  #  Number of training points on the boundary

### Generate Training and Test Data ### 
X_pde = make_pde_data(x_min,x_max,t_min,t_max,N_train_x,N_train_t)
X_train_BC, Y_train_BC = make_ic_bc_data(x_min,x_max,t_min,t_max,N_bc)
y_real, x_test, y_test, X, T = gen_test_dataset(x_min,x_max,t_min,t_max,N_test_x,N_test_t)

# Send data to Device for NN #
X_train_BC = X_train_BC.float().to(device)  # Training Points (BC)
Y_train_BC = Y_train_BC.float().to(device)  # Training Points (BC)
X_pde = X_pde.float().to(device)  # Collocation Points
X_test = x_test.float().to(device)  # the input dataset (complete)
Y_test = y_test.float().to(device)  # the real solution


Now let's create the neural network and run it to the specified conditions:

In [None]:
# Create Neural Network Model and Optimizer

PINN = fcn_nn(layers = layers, domain = [x_min,x_max,t_min,t_max])
PINN.to(device)
print(PINN)
optimizer = torch.optim.Adam(PINN.parameters(), lr=lr, amsgrad=False)

# Creating a root directory to store information
res_dict = {"loss": [], "loss_bc": [], "loss_pde": [], "rela_err_l2": []}
start_time_a = time.time()
count = 1
plot_mesh(X, T, y_real, name=f"0_real", error=0, folder=test_folder)  # plot real solution

for i in range(1, steps + 1):
    idx = np.random.choice(X_pde.shape[0], batch_size, replace=False)  # Choose randomly a batch in x_train_pde
    loss = PINN.loss(X_train_BC, Y_train_BC, X_pde[idx, :], w_bc=w_bc, w_pde=w_pde)  # use mean squared error
    optimizer.zero_grad()  # We rest the optimizer by making all gradients 0 again
    loss.backward()  # Backpropagation (compute the gradients)
    optimizer.step()  # Actualization of the parameters
    # optimizer.step(PINN.closure) # L-BFGS Optimizer

    # Create the plots with the solution with the current parameters, and the relative error.
    res_dict["loss"].append(loss.item())
    with torch.no_grad():
        err_l2_mean, error = PINN.error_l2_norm(X_test, Y_test)
        res_dict["rela_err_l2"].append(err_l2_mean.item())
    if i % log_freq == 0 or i == 1:
        u_predict = PINN(X_test)
        arr_y1 = u_predict.reshape(shape=[N_test_t, N_test_x]).transpose(1, 0).detach().cpu()
        plot_mesh(X, T, arr_y1, name=f"approx_{count}", loss=loss.item(), folder=test_folder, iter=i)
        error = error.reshape(shape=[N_test_t, N_test_x]).transpose(1, 0).detach()
        plot_mesh(X, T, error, name=f"error_{count}", error=err_l2_mean, folder=test_folder, iter=i)
        if i % (log_freq*2) == 0 or i == 1:
            logger.info(f"| Iter: {i} | Loss: {loss.item():.4f} | Total_time: {(time.time() - start_time_a)/60:.1f}min")
        count += 1
    path = test_folder + "/A_results_dict.npy"
    np.save(path, np.asarray(res_dict, dtype=object))
print('Finished training')

In [None]:
generate_gif(test_folder, count, remove_imgs=False)

Now let's compare our results with the intended results. 

The actual results:

<div>
<img src="./images/real_solution.png" width="700"/>
</div>

And now our results:

<div>
<img src="./images/final.png" width="700"/>
</div>


We can see that towards the red-shaded region on the sinusoidal wave ($x > 0$), we lose the definition of $U$. Our calculated loss is decently low to suggest that we have high accuracy for our PINN. Additionally, on my personal machine, it took approximately 135 seconds for the PINN to learn of the solution. Although this is a simple PDE to solve, more complicated PDEs and systems will require more computational resources for which PINNs can be less resource-intensive at its most basic implementation.