# Exercises: physics-informed neural network

Exercise on the implementation of a physics-informed neural network (PINN).

Date: 2024

Course: 056936 - SCIENTIFIC COMPUTING TOOLS FOR ADVANCED MATHEMATICAL MODELLING (PAGANI STEFANO) [2023-24].

Example adapted from this [notebook](https://colab.research.google.com/drive/1qBrbgevkSBqqYc8bOPiaoJG1MBrBrluN?usp=share_link). 


Let us consider the problem

$$
\begin{aligned}
  & -\nu \Delta u = -2(x^2+y^2)  \,, \quad (x,y) \in [-1,1] \times [-1,1]\,,\\
  & u(x,-1) = u(x,1) = x^2 \,, \quad -1 < x < 1\,, \\
  & u(-1,y) = u(1,y) = y^2 \,, \quad -1 < y < 1\,,
\end{aligned}
$$

where $\nu=1$. The exact solution is $u(x,y) = x^2 y^2$. We consider the PINN framework for reconstructing the solution from sparse measurements.

In [None]:
# import required libraries

import tensorflow as tf
import numpy as np
import scipy.io
from tensorflow import keras
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
#!pip -q install pyDOE
from pyDOE import lhs  # for latin hypercube sampling


In [None]:
# fix the NumPy and TensorFlow global seeds so that runs are repeatable
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
""" 
    Optimize a keras model using scipy.optimize
    This block of code is taken and adapted from https://github.com/pedro-r-marques/keras-opt/tree/master
    See the repository for all the information.
"""
import numpy as np
from scipy.optimize import minimize
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras import backend as K  # pylint: disable=import-error

from tensorflow.python.keras.engine import data_adapter


class ScipyOptimizer():
    """ Train a Keras model by handing its weights to scipy.optimize.minimize.

        All trainable variables are flattened into a single 1-D vector. For
        every candidate vector proposed by scipy, `_fun_generator` writes it
        back into the model, iterates once over the dataset and returns the
        mean batch loss together with its gradient.

        Args:
            model: Keras model; read for `run_eagerly`, `trainable_variables`,
                `compiled_loss` and `losses`.
            method: solver name forwarded to `minimize`.
            verbose: forwarded to the Keras progress bar; its truth value is
                also used as scipy's `disp` option.
            maxiter: forwarded to scipy's `maxiter` option.
    """

    def __init__(self, model, method='L-BFGS-B', verbose=1, maxiter=1):
        self.model = model
        self.method = method
        self.verbose = verbose
        self.maxiter = maxiter
        # Call the model directly in eager mode, otherwise through a
        # tf.function wrapper.
        if model.run_eagerly:
            self.func = model.__call__
        else:
            self.func = tf.function(
                model.__call__, experimental_relax_shapes=True)

    def _update_weights(self, x):
        """ Copy the flat vector `x` into the model's trainable variables.

            Consecutive slices of `x` are reshaped to each variable's shape,
            following the order of `model.trainable_variables`.
        """
        x_offset = 0
        for var in self.model.trainable_variables:
            shape = var.get_shape()
            # int(): np.prod of an empty (scalar) shape is the float 1.0,
            # which cannot be used as a slice bound.
            w_size = int(np.prod(shape))
            value = np.array(x[x_offset:x_offset+w_size]).reshape(shape)
            K.set_value(var, value)
            x_offset += w_size
        # every entry of x must have been consumed
        assert x_offset == len(x)

    def _fun_generator(self, x, iterator):
        """ Function optimized by scipy minimize.

            Args:
                x: flat weight vector proposed by scipy.
                iterator: iterator handed over by Keras; only its underlying
                    dataset is used, so each call starts a fresh pass.

            Returns:
                (cost, gradient): mean of the per-batch losses and the
                gradient w.r.t. all trainable variables, flattened and
                concatenated in the same order as the weight vector.

            Raises:
                NotImplementedError: if a gradient is neither a dense tensor
                    nor tf.IndexedSlices (e.g. None for an unused variable).
        """
        model = self.model
        self._update_weights(x)
        losses = []

        dataset = iterator._dataset  # pylint:disable=protected-access
        assert dataset is not None
        iterator = iter(dataset)

        # Progress-bar target only.
        # NOTE(review): cardinality() of an already batched dataset may count
        # batches rather than samples, in which case dividing by batch_size
        # again under-estimates the number of steps - confirm.
        size = dataset.cardinality().numpy()
        if size > 0:
            n_steps = (size + dataset.batch_size - 1) // dataset.batch_size
        else:
            n_steps = None

        progbar = keras.utils.Progbar(n_steps, verbose=self.verbose)

        with tf.GradientTape() as tape:
            for step, data in enumerate(iterator):
                data = data_adapter.expand_1d(data)
                inputs, targets, sample_weight = (
                    data_adapter.unpack_x_y_sample_weight(data))
                y_pred = self.func(inputs, training=True)
                loss = model.compiled_loss(targets, y_pred, sample_weight,
                                           regularization_losses=model.losses)
                progbar.update(step, [('loss', loss.numpy())])
                losses.append(loss)
            xloss = tf.reduce_mean(tf.stack(losses))
        # The forward pass is fully recorded; differentiate outside the
        # recording context.
        grads = tape.gradient(xloss, model.trainable_variables)

        cost = xloss.numpy()

        # Flatten gradient by gradient so that dense and sparse
        # (IndexedSlices) gradients may be mixed within one model.
        flat_grads = []
        for var, var_grad in zip(model.trainable_variables, grads):
            if isinstance(var_grad, tf.IndexedSlices):
                var_grad = tf.convert_to_tensor(var_grad)  # densify
            elif not isinstance(var_grad, tf.Tensor):
                raise NotImplementedError(
                    'unsupported gradient type {} for variable {}'.format(
                        type(var_grad).__name__, var.name))
            flat_grads.append(var_grad.numpy().reshape(-1))
        return cost, np.concatenate(flat_grads)

    def train_function(self, iterator):
        """ Called by model fit.

            Runs one `minimize` call starting from the current weights,
            stores the resulting weights in the model and returns
            {'loss': final objective value}.
        """
        min_options = {
            'maxiter': self.maxiter,
            'disp': bool(self.verbose),
        }

        var_list = self.model.trainable_variables
        x0 = np.concatenate([var.numpy().reshape(-1) for var in var_list])

        # jac=True: _fun_generator returns (cost, gradient) in one call
        result = minimize(
            self._fun_generator, x0, method=self.method, jac=True,
            options=min_options, args=(iterator,))

        self._update_weights(result['x'])
        return {'loss': result['fun']}


def make_train_function(model, **kwargs):
    """ Build the scipy-backed replacement for the model's train function.

        The model must already be compiled. Its steps-per-execution is set to
        the largest int64 value so that the returned function is called once
        per epoch. Extra keyword arguments are forwarded to ScipyOptimizer.
    """
    # pylint:disable=protected-access
    model._assert_compile_was_called()
    model._configure_steps_per_execution(tf.int64.max)
    return ScipyOptimizer(model, **kwargs).train_function

# Solution

In [None]:
# exact solution u(x, y) = x^2 * y^2 sampled on a uniform grid (plotted below)
nu = 1.0 # diffusion parameter

# define grid for sampling the exact solution
N_h = 128
ux = np.linspace(-1.0,1.0,N_h)
uy = np.linspace(-1.0,1.0,N_h)

# vectorised sampling (replaces the column-by-column loop):
# rows follow y and columns follow x, i.e. u_true[j, i] = ux[i]**2 * uy[j]**2
u_true = (ux**2)[np.newaxis, :] * (uy**2)[:, np.newaxis]

# Colour map of the exact solution. np.meshgrid is called with its default
# indexing, so X_plot[j, i] == ux[i] and Y_plot[j, i] == uy[j]; this matches
# the u_true[j, i] layout (rows follow y, columns follow x).
X_plot,Y_plot = np.meshgrid(ux,uy)
plt.pcolor(X_plot, Y_plot, u_true)
# same scale on both axes
plt.axis('equal')
plt.colorbar()



In [None]:
# flatten grid and solution
# np.meshgrid (default indexing) gives X[j, i] = ux[i] and Y[j, i] = uy[j];
# C-order flattening therefore pairs point k = j*N_h + i with u_true[j, i].
X,Y = np.meshgrid(ux,uy)
X_flat = tf.convert_to_tensor(np.hstack((X.flatten()[:,None],Y.flatten()[:,None])),dtype=tf.float64)
# No transpose here: u_true is stored as [y-index, x-index], exactly like X
# and Y. A transposed flattening only matches for solutions that are
# symmetric in x and y on identical grids.
u_flat = u_true.flatten()

In [None]:
# sanity check: the sampled solution is an (N_h, N_h) array
u_true.shape

In [None]:

# PINN loss function
def PINNloss(xcl,ycl,Xmeas,umeas):
    """Return the PINN training loss: PDE residual term plus data misfit.

    xcl, ycl : coordinates of the collocation points, forwarded to r_PINN.
    Xmeas    : measurement locations, passed to PINN as one input tensor.
    umeas    : measured values that PINN(Xmeas) is compared against.
    """
    # mean squared PDE residual at the collocation points
    residual = r_PINN(xcl, ycl)
    mse_r = tf.reduce_mean(tf.pow(residual, 2))

    # mean squared misfit at the measurement points
    misfit = umeas - PINN(Xmeas)
    mse_meas = tf.reduce_mean(tf.pow(misfit, 2))

    return mse_r + mse_meas

# neural network weight gradients
@tf.function
def grad(model,xcl,ycl,xmeas,ymeas,umeas):
    """Return (loss, gradients) of PINNloss w.r.t. model.trainable_variables.

    xcl, ycl     : collocation point coordinates.
    xmeas, ymeas : measurement point coordinates, concatenated along axis 1
                   before being passed to PINNloss.
    umeas        : measured values.

    NOTE: PINNloss evaluates the module-level PINN network, so `model` is
    expected to be that same network; otherwise the loss does not depend on
    `model`'s variables.
    """
    # Only one gradient is taken, so a non-persistent tape is sufficient.
    with tf.GradientTape() as tape:
        loss_value = PINNloss(xcl,ycl,tf.concat([xmeas,ymeas],1),umeas)
    # Differentiate after leaving the context so that the backward pass
    # itself is not recorded.
    grads = tape.gradient(loss_value,model.trainable_variables)
    return loss_value, grads

In [None]:
# collocation points (where the PDE residual is evaluated)
Ncl = 2000
X = lhs(2,Ncl)
# rescale both sample columns through -1 + 2*X and keep them as column vectors
xcl = tf.expand_dims(tf.cast(-1.0+(2.0)*X[:,0],dtype=tf.float64),axis=-1)
ycl = tf.expand_dims(tf.cast(-1.0+(2.0)*X[:,1],dtype=tf.float64),axis=-1)
X_coll = tf.concat([xcl,ycl],1)

# measurement points
# Own counter for the measurements, so that Ncl keeps the number of
# collocation points instead of being overwritten.
Nmeas = 20
X = lhs(2,Nmeas)
xmeas = tf.expand_dims(tf.cast(-1.0+(2.0)*X[:,0],dtype=tf.float64),axis=-1)
ymeas = tf.expand_dims(tf.cast(-1.0+(2.0)*X[:,1],dtype=tf.float64),axis=-1)
X_meas = tf.concat([xmeas,ymeas],1)
# synthetic measurements from u(x, y) = x^2 * y^2
umeas = (xmeas**2)*(ymeas**2)




Task 1: Test the accuracy of the reconstruction with respect to the optimization configurations and the number of layers.

In [None]:
# network definition

# fresh instance of the NN: 2 inputs -> 8 (tanh) -> 32 (tanh) -> 1 linear output
PINN = tf.keras.Sequential()
PINN.add(tf.keras.layers.Dense(
    8,
    activation='tanh',
    input_shape=(2,),
    kernel_initializer="glorot_normal",
    dtype=tf.float64,
))
PINN.add(tf.keras.layers.Dense(
    32,
    activation='tanh',
    kernel_initializer="glorot_normal",
    dtype=tf.float64,
))
PINN.add(tf.keras.layers.Dense(
    1,
    activation=None,
    kernel_initializer="glorot_normal",
    dtype=tf.float64,
))



In [None]:

# residual computation based on AD
@tf.function
def r_PINN(x,y,nu=1.0):
    """Return the PDE residual -nu*(u_xx + u_yy) + 2*(x^2 + y^2) of PINN.

    x, y : coordinates of the evaluation points; they are concatenated along
           axis 1 to form the network input.
    nu   : diffusion coefficient of the problem statement. The default 1.0
           reproduces the residual -u_xx - u_yy + 2*(x^2 + y^2).
    """
    u    = PINN(tf.concat([x,y], 1))
    # first and second derivatives via automatic differentiation
    u_x  = tf.gradients(u,x)[0]
    u_xx = tf.gradients(u_x, x)[0]
    u_y  = tf.gradients(u,y)[0]
    u_yy = tf.gradients(u_y, y)[0]
    return - nu*(u_xx + u_yy) + 2.0*( tf.pow(x,2) + tf.pow(y,2) )



In [None]:
# # # Adam optimizer
# tf_optimizer = tf.keras.optimizers.Adam(learning_rate=0.003,beta_1=0.99)

# for iter in range(500):

#   # compute gradients using AD
#   loss_value,grads = grad(PINN,xcl,ycl,xmeas,ymeas,umeas)

#   # update neural network weights
#   tf_optimizer.apply_gradients(zip(grads,PINN.trainable_variables))

#   # display intermediate results
#   if ((iter+1) % 200 == 0):
#     print('iter =  '+str(iter+1))
#     print('loss = {:.4f}'.format(loss_value))
#     PINN_flat = PINN(X_flat)
#     err = np.linalg.norm(u_flat-PINN_flat[:,-1],2)/np.linalg.norm(u_flat,2)
#     print('L2 error: %.4e' % (err))

In [None]:
def my_loss_fn(y_true, y_pred, xcl=xcl, ycl=ycl):
    """Loss with the (y_true, y_pred) signature: data misfit + PDE residual.

    y_true, y_pred : target values and network predictions.
    xcl, ycl       : collocation points for the residual term; the defaults
                     are bound to the module-level xcl / ycl when this
                     function is defined.
    """
    # mean squared mismatch between targets and predictions
    data_term = tf.reduce_mean(tf.pow(y_true - y_pred,2))
    # mean squared PDE residual at the collocation points
    pde_term = tf.reduce_mean(tf.pow(r_PINN(xcl,ycl),2))
    return data_term + pde_term

# Loss before training. Arguments follow the (y_true, y_pred) signature of
# my_loss_fn; X_meas is the already assembled [xmeas, ymeas] input.
print(  my_loss_fn(umeas, PINN(X_meas)) )

# Stage 1: Adam
PINN.compile( loss=my_loss_fn, optimizer=tf.keras.optimizers.Adam(learning_rate=0.003,beta_1=0.99) )

history_adam = PINN.fit( X_meas, umeas , epochs=2000 )

# Stage 2: scipy optimizer (ScipyOptimizer defaults to L-BFGS-B).
# The train function is replaced after compile - presumably compile would
# otherwise discard the override, so keep this order.
PINN.compile( loss=my_loss_fn )

PINN.train_function = make_train_function(PINN, maxiter=2000)

#print(PINN(X_meas))
#print(umeas)
# loss after the Adam stage
print(PINNloss(xcl,ycl,X_meas,umeas))

# a single fit call (default number of epochs) triggers one minimize run of
# at most maxiter iterations
history = PINN.fit( X_meas, umeas )

In [None]:

# # Adam optimizer
# tf_optimizer = tf.keras.optimizers.Adam(learning_rate=0.003,beta_1=0.99)

# for iter in range(10000):

#   # compute gradients using AD
#   loss_value,grads = grad(PINN,xcl,ycl,xmeas,ymeas,umeas)

#   # update neural network weights
#   tf_optimizer.apply_gradients(zip(grads,PINN.trainable_variables))

#   # display intermediate results
#   if ((iter+1) % 200 == 0):
#     print('iter =  '+str(iter+1))
#     print('loss = {:.4f}'.format(loss_value))
#     PINN_flat = PINN(X_flat)
#     err = np.linalg.norm(u_flat-PINN_flat[:,-1],2)/np.linalg.norm(u_flat,2)
#     print('L2 error: %.4e' % (err))
    

In [None]:
#Display results
def _add_colorbar(fig, ax, im):
    """Attach a colorbar for `im` in a thin axes on the right of `ax`."""
    divider = make_axes_locatable(ax)
    cax = divider.append_axes("right", size="5%", pad=0.05)
    fig.colorbar(im,cax=cax)


PINN_flat = PINN(X_flat)
# relative L2 error of the prediction over the whole evaluation grid
err = np.linalg.norm(u_flat-PINN_flat[:,-1],2)/np.linalg.norm(u_flat,2)
# prediction on the plotting grid, reshaped once and reused below
u_pred = np.reshape(PINN_flat,(N_h,N_h))

fig = plt.figure(figsize=(16,9),dpi=150)
plt.style.use('default')

# panel 1: exact solution with the measurement locations
ax = fig.add_subplot(1,3,1)
ax.set_aspect(1)
im = ax.pcolor(X_plot, Y_plot, u_true)
ax.scatter(xmeas,ymeas,marker='x',s=10)
_add_colorbar(fig, ax, im)
ax.set_title('Exact Solution',fontsize=16)

# panel 2: network prediction with the measurement locations
ax = fig.add_subplot(1,3,2)
ax.set_aspect(1)
im = ax.pcolor(X_plot, Y_plot, u_pred)
ax.scatter(xmeas,ymeas,marker='x',s=10)
_add_colorbar(fig, ax, im)
ax.set_title('PINN Prediction',fontsize=16)

# panel 3: pointwise absolute error; the title reports the relative L2 error
ax = fig.add_subplot(1,3,3)
ax.set_aspect(1)
im = ax.pcolor(X_plot, Y_plot, np.abs( u_pred - u_true ) )
_add_colorbar(fig, ax, im)
ax.set_title('L2 error = {:.4f}'.format(err),fontsize=16)

Task 2: Re-implement the notebook, considering the case where there are no boundary conditions and measurements are taken only on internal points (chosen randomly). Analyze the accuracy with respect to the number of measurements.