<div align="center">

# Physics-Informed Neural Networks (PINNs)

## Projet SOIA 6.1
</div>

Oscar Agnus, SOIA A3


<div align="center">

# 1D Heat Equation

</div>

Now, let's try to simulate the 1D heat equation using a physics-informed neural network (PINN), and add time.
The heat equation is given by:
$$
\frac{\partial u}{\partial t} + \nu \frac{\partial^2 u}{\partial x^2} = f(x,t)
$$

where $u$ is the temperature,
$x$ is the space coordinate,
$t$ is the time coordinate,
and $\nu$ is the thermal diffusivity.


In [1]:
# Change to inline or notebook if the plots aren't displayed correctly
# %matplotlib inline
%matplotlib notebook

# Imports
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import time
from datetime import timedelta

import os

# Weights and Biases - this is for logging the results
import wandb
# from colors import color
# from scipy.special import linestyle


def display_time(seconds):
    """Convert seconds to a time string 00h 00min 00s, for better readability."""
    delta = timedelta(seconds=seconds)
    hours, remainder = divmod(delta.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02}h {minutes:02}min {seconds:02}s"

## Problem definition
We define the problem in physical terms, and format it so that it can be used by the neural network.

For now, the constants in the following block will be used as global variables. We may try to estimate them later.

The points in the space and time domain will be used to make a "grid" of points, which will be used only to display the results.

In [2]:
# Space domain
x_min = 0
x_max = 1
N_x = 1000 # Number of points in the space domain

# Time domain
t_min = 0
t_max = 1
N_t = 1000 # Number of points in the time domain

# Define other useful parameters
nu = 1 # Thermal diffusivity
u_bc_0 = 0 # Boundary condition at x = 0
u_bc_1 = 0 # Boundary condition at x = 1

def f(x):
    """Source of the heat equation."""
    return torch.zeros_like(x)

def initial_conditions(n_initial):
    """Initial conditions for the heat equation."""
    x = torch.rand(n_initial) * (x_max - x_min) + x_min
    t = torch.zeros_like(x)
    u_initial = - torch.sin(np.pi*x)
    return u_initial

def boundary_conditions(n_bound, x_0, x_1, u_0, u_1):
    """Boundary conditions for the heat equation.
    The boundary conditions are repeated n_bound times, so that not only 2 points are used for training."""
    x = torch.cat([torch.ones(n_bound // 2) * x_0, torch.ones(n_bound // 2) * x_1], dim=0)
    t = torch.rand(n_bound) * (t_max - t_min) + t_min
    u = torch.cat([torch.ones(n_bound // 2) * u_0, torch.ones(n_bound // 2) * u_1], dim=0)
    return x, t, u

def u_exact(x, t, x_min, x_max, t_min, t_max):
    """Exact solution of the heat equation."""
    # u = torch.sin(np.pi * x) * torch.exp(-np.pi**2 * nu * t)
    # return u
    return torch.zeros_like(x)

## Neural network design
We define the neural network that will compute the solution of the heat equation.

For now, we use only fully connected layers with a hyperbolic tangent (Tanh) activation function.
We will investigate different architectures later.

In [3]:
# Define the model
class Network(nn.Module):
    def __init__(self, neurons=20, layers=8):
        super(Network, self).__init__()
        self.hidden_layers = nn.ModuleList([nn.Linear(2, neurons) if i == 0 else nn.Linear(neurons, neurons) for i in range(layers)])
        self.output_layer = nn.Linear(neurons, 1)

    def forward(self, x,t):
        y = torch.stack([x,t],dim=-1)
        # print("y shape : ", y.shape)
        for layer in self.hidden_layers:
            y = torch.tanh(layer(y))
        out = self.output_layer(y)
        # print("out shape : ", out.shape)
        return out

In [4]:
# Create the model
n_layers  = 8 # Number of hidden layers
n_neurons = 20 # Number of neurons in the hidden layers

# layers = [torch.nn.Linear(N_t, n_neurons), torch.nn.Tanh()] # Is the Tanh activation mandatory here ?
# for i in range(n_layers):
#     layers.append(torch.nn.Linear(n_neurons, n_neurons))
#     layers.append(torch.nn.Tanh())
# layers.append(torch.nn.Linear(n_neurons, N_t))
# my_model = torch.nn.Sequential(*layers)

my_model = Network(n_neurons, n_layers)

# Checkpoint
my_model_checkpoint = my_model

# Summary of the model
print(my_model)

Network(
  (hidden_layers): ModuleList(
    (0): Linear(in_features=2, out_features=20, bias=True)
    (1-7): 7 x Linear(in_features=20, out_features=20, bias=True)
  )
  (output_layer): Linear(in_features=20, out_features=1, bias=True)
)


## Loss functions
Here are defined the loss functions. There are 3 different loss functions:
- $L_{data}$ : loss function for the data points
- $L_{bc}$ &nbsp; &nbsp;: loss function for the boundary conditions
- $L_{pde}$ : loss function for the PDE

All of them use the Mean Squared Error (MSE) with the given data points.
Each one of them will be used with their relative weight, which represent their importance in the composite loss function $L_{composite}$.

__Notation :__ We will use the following notation to designate the derivatives of $u$ with respect to the variable $s$ : $u_s = \frac{\partial u}{\partial s}$.

In [5]:
# Gradient function
def grad(outputs, inputs):
    """Compute the gradient of 'outputs' with respect to 'inputs'."""
    return torch.autograd.grad(outputs = outputs, inputs = inputs, grad_outputs = torch.ones_like(outputs), create_graph = True)[0]

# Loss functions
def loss_data(u_pred, u_gt):
    """Loss function for the data points."""
    return torch.nn.MSELoss()(u_pred, u_gt)
    
def loss_bc(u_pred, u_bc):
    """Loss function for the boundary conditions."""
    return torch.nn.MSELoss()(u_pred, u_bc)

def loss_pde(u_pred, x_pde, t_pde):
    """Loss function for the PDE.
    Needs to be changed in case of a different PDE."""
    u_x  = grad(u_pred, x_pde) # First order derivative
    u_xx = grad(u_x, x_pde)    # Second order derivative
    u_t  = grad(u_pred, t_pde) # First order derivative
    return torch.nn.MSELoss()(u_t + nu*u_xx, torch.zeros_like(u_pred))
    # TODO : Add the source term f(x,t) in the loss function

def loss_composite(u_gt, u_pred, u_bc, u_bc_pred, x_pde, t_pde):
    """Composite loss function, combining the three loss functions."""
    # Weights for the composite loss function
    w_data = 0.0
    w_bc   = 1.0
    w_pde  = 1.0
    return w_data*loss_data(u_pred, u_gt) + w_bc*loss_bc(u_bc_pred, u_bc) + w_pde*loss_pde(u_pred, x_pde, t_pde)

## Training and testing functions
Here we define the training and testing functions. We want to make a checkpoint of the model when the testing loss is minimal.

In [6]:
def train(model, optimizer, n_epoch, n_colloc_x, n_colloc_t, n_bound_x, current_device = "cpu"):
    """Training function for the model."""

    model.train() # Set the model to training mode
    model_checkpoint = model # Checkpoint of the model

    # Collocation points
    X = torch.rand(n_colloc_x) * (x_max - x_min) + x_min # Collocation points, randomly distributed in the space domain
    X.requires_grad_(True)
    X = X.to(current_device) # Send the inputs to the device (GPU or CPU)

    T = torch.rand(n_colloc_t) * (t_max - t_min) + t_min # Collocation points, randomly distributed in the time domain
    T.requires_grad_(True)
    T = T.to(current_device) # Send the inputs to the device (GPU or CPU)

    # Boundary conditions
    X_bc, T_bc, U_bc = boundary_conditions(n_bound_x, x_min, x_max, u_bc_0, u_bc_1) # Boundary conditions
    X_bc = X_bc.to(current_device)    # Send the inputs to the device (GPU or CPU)
    T_bc = T_bc.to(current_device)    # Send the inputs to the device (GPU or CPU)
    U_bc = U_bc.to(current_device)              # Send the values to the device (GPU or CPU)

    # Array to store the losses during the training
    train_loss = np.array([])

    # Training loop
    for n in range(n_epoch):
        t_0 = time.time()
        optimizer.zero_grad()           # Zero the parameter gradients
        U_pred = model(X, T)      # Forward pass for the collocation points
        U_pred_bc = model(X_bc, T_bc)   # Forward pass for the boundary conditions

        loss = loss_composite(u_exact(X, T, x_min, x_max, t_min, t_max), U_pred, U_bc, U_pred_bc, X, T) # Compute the loss
        loss.backward()                # Backward pass
        optimizer.step()
        train_loss = np.append(train_loss, loss.item())

        # Checkpoint
        if loss.item() <= np.min(train_loss):
            model_checkpoint = model
            torch.save(model.state_dict(), f"PINN_1D_heat_eq_model_{n_layers}l_{n_neurons}n.pth")

        # Display time and other stats
        deltat = []
        t_1 = time.time()
        deltat.append(t_1-t_0)
        eta_s = int(np.mean(deltat) * (n_epoch - n))
        if n % 100 == 0:
            print(f"#__Epoch [{n}/{n_epoch}] : ",
                  f"Duration: {display_time(time.time()-t0)} | ETA: {display_time(eta_s)}",
                  f"___ Current Loss : {train_loss[-1]} | Best Loss : {np.min(train_loss)}")

    return train_loss, model_checkpoint

## Model training

Check in the PC configuration if the GPU is available. Cuda needs to be installed to use the GPU with PyTorch.

This will speed up the training process.

In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Is cuda available: ", torch.cuda.is_available())
my_model.to(device)
print(f"Using device: {device}")

Is cuda available:  True
Using device: cuda


We then load the data and start the training process.

There isn't any testing data, because we will not use the accuracy metrics to assess the model.
The model will be evaluated by the `composite_loss` function.

The training data `u_data` should be either the exact solution, or some measured data.

In [8]:
# Training parameters
lr = 0.01
optimiser = optim.Adam(my_model.parameters(), lr=lr)
# optimiser = optim.SGD(my_model.parameters(), lr=lr)
n_epoch = 5000
n_col_x = 1000
n_col_t = 1000
n_bc = 1000

# Training
t0 = time.time()
training_loss, my_model_checkpoint = train(my_model, optimiser, n_epoch, n_col_x, n_col_t, n_bc, device)
t1 = time.time()

  return F.mse_loss(input, target, reduction=self.reduction)
  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass
  return F.mse_loss(input, target, reduction=self.reduction)


#__Epoch [0/5000] :  Duration: 00h 00min 00s | ETA: 00h 16min 20s ___ Current Loss : 0.0200686976313591 | Best Loss : 0.0200686976313591
#__Epoch [100/5000] :  Duration: 00h 00min 01s | ETA: 00h 01min 33s ___ Current Loss : 1.1277835199052788e-07 | Best Loss : 2.6422899424005664e-08
#__Epoch [200/5000] :  Duration: 00h 00min 04s | ETA: 00h 02min 09s ___ Current Loss : 2.8521256467683997e-09 | Best Loss : 2.8521256467683997e-09
#__Epoch [300/5000] :  Duration: 00h 00min 06s | ETA: 00h 01min 53s ___ Current Loss : 1.911014901878616e-09 | Best Loss : 1.911014901878616e-09
#__Epoch [400/5000] :  Duration: 00h 00min 08s | ETA: 00h 01min 15s ___ Current Loss : 1.242301816972713e-09 | Best Loss : 1.242301816972713e-09
#__Epoch [500/5000] :  Duration: 00h 00min 11s | ETA: 00h 01min 32s ___ Current Loss : 8.019474528886406e-10 | Best Loss : 8.019474528886406e-10
#__Epoch [600/5000] :  Duration: 00h 00min 13s | ETA: 00h 01min 19s ___ Current Loss : 5.327540630872818e-10 | Best Loss : 5.327540630

In [9]:
# Display the results
print(f"Training loss: {np.min(training_loss)}")
print(f"Training time: {display_time(t1-t0)}")

Training loss: 2.4480726473763426e-11
Training time: 00h 01min 14s


In [10]:
# Parameters for enhanced figure plots
plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['figure.dpi'] = 100  # Increasing dpi will enhance quality

## Testing the model

The model is now tested on a grid of points in the space and time domain.

In [28]:
# Test points, randomly distributed in the space and time domain

grid_XT = np.meshgrid(np.linspace(x_min, x_max, N_x), np.linspace(t_min, t_max, N_t), indexing='ij')
grid_X, grid_T = [torch.tensor(t,dtype=torch.float32) for t in grid_XT]

# grid_X = torch.rand(N_x, requires_grad=True)*(x_max - x_min) + x_min
# grid_X = grid_X.reshape(N_x, 1)
grid_X = grid_X.to(device) # Send the inputs to the device (GPU or CPU)

# grid_T = torch.rand(N_t, requires_grad=True)*(t_max - t_min) + t_min
# grid_T = grid_T.reshape(N_t, 1)
grid_T = grid_T.to(device) # Send the X_bc to the device (GPU or CPU)

# Predict the solution
U_check = my_model_checkpoint(grid_X, grid_T)
U_ex = u_exact(grid_X, grid_T, x_min, x_max, t_min, t_max)

# Move tensors to CPU before converting to NumPy arrays
grid_X = grid_X.cpu().detach().numpy()
grid_T = grid_T.cpu().detach().numpy()
U_check = U_check.cpu().detach().numpy()
U_ex = U_ex.cpu().detach().numpy()

# U_check = U_check.squeeze(1)
print("U_check shape : ", U_check.shape)

U_check shape :  (1000, 1000, 1)


For better readablity, we can either plot the results in 2D or 3D.

__Note :__ For 2D simulations, 3D rendering will be mandatory in order to display the time evolution.

In [29]:
# Plot the solutions in 2D
plt.figure()
plt.imshow(U_check, aspect='auto', extent=(x_min, x_max, t_min, t_max), label="Predicted solution")
plt.xlabel("x")
plt.ylabel("t")
plt.title(f"Solution of the 1D heat equation, stationary \n Architecture : {n_layers} layers, {n_neurons} neurons/layer")
# plt.grid()
plt.show()

<IPython.core.display.Javascript object>

In [34]:
# Plot the solutions in 3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
U_check = U_check.reshape(N_x, N_t)
ax.plot_surface(grid_X, grid_T, U_check, cmap='viridis')
ax.set_xlabel("x")
ax.set_ylabel("t")
ax.set_zlabel("u")
ax.set_title(f"Solution of the 1D heat equation, stationary \n Architecture : {n_layers} layers, {n_neurons} neurons/layer")
plt.show()

<IPython.core.display.Javascript object>

In [14]:
# # Compute the mean squared error to assess the prediction's accuracy
# mse_checkpoint = np.linalg.norm(U_check - U_ex, 2)
# print(f"MSE for the checkpoint : {mse_checkpoint}")

In [15]:
# Save the figure to directory
parent_directory = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
save_fig_directory = f"{parent_directory}/PINN_Plots/1D_heat_eq"
plt.savefig(f"{save_fig_directory}/PINN_1D_heat_eq_solution_{n_layers}l_{n_neurons}n.png")

In [16]:
# Plot the training loss
plt.figure()
plt.semilogy(training_loss, label=f"Optimizer : {optimiser.__class__.__name__}, lr={lr}") # Log scale
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.title(f"Training loss of the 1D heat equation \n Architecture : {n_layers} layers, {n_neurons} neurons/layer \n Best loss : {np.min(training_loss)}")
plt.show()

<IPython.core.display.Javascript object>

In [17]:
# Save the figure to directory
plt.savefig(f"{save_fig_directory}/PINN_1D_heat_eq_loss_{n_layers}l_{n_neurons}n.png")