<a href="https://colab.research.google.com/github/arnauldnzegha/DEEP2PDEs/blob/main/ADVERT_REACTION_DIFUSION_Rassi_et_al.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DEEP2PDEs Projet

#Rassi et al.  (Physics Informed Deep Learning (Part II): An application on reaction diffusion

##Test on reaction diffusion equation

In [None]:
import tensorflow as tf
from keras import backend as K 
from keras.models import Model
import math as M
import matplotlib.pyplot as plt
from keras.optimizers import SGD,Adadelta
from keras.layers import Dense, Input
import numpy as np
from sklearn.datasets import make_regression
from sklearn.preprocessing import StandardScaler
import scipy.io
from keras.regularizers import l1
from sklearn.neural_network import MLPRegressor
from random import shuffle
import time
from itertools import product
import zipfile, os

from mpl_toolkits.mplot3d import axes3d, Axes3D

import sys
import matplotlib.pyplot as plt
import scipy.io
from scipy.interpolate import griddata
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.gridspec as gridspec
import time
np.random.seed(1234)
tf.set_random_seed(1234)

  '{0}.{1}.{2}'.format(*version.hdf5_built_version_tuple)
Using TensorFlow backend.


The raissi PINNs class.

In [None]:
class PhysicsInformedNN:
    # Initialize the class
    def __init__(self, X, u, layers, lb, ub, r0=21,D0=3,trainable_r=True):
        
        self.lb = lb
        self.ub = ub
        
        self.t = X[:,0:1]
        self.x = X[:,1:2]
        self.y = X[:,2:3]
        self.u = u
        
        self.layers = layers
        
        # Initialize NNs
        self.weights, self.biases = self.initialize_NN(layers)
        
        # tf placeholders and graph
        self.sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
                                                     log_device_placement=True))
        
        # Initialize parameters
        self.r = tf.Variable([r0], dtype=tf.float32, trainable=trainable_r)
        self.d = tf.Variable([D0], dtype=tf.float32)
        
        self.x_tf = tf.placeholder(tf.float32, shape=[None, self.x.shape[1]])
        self.t_tf = tf.placeholder(tf.float32, shape=[None, self.t.shape[1]])
        self.y_tf = tf.placeholder(tf.float32, shape=[None, self.y.shape[1]])
        self.u_tf = tf.placeholder(tf.float32, shape=[None, self.u.shape[1]])
                
        self.u_pred = self.net_u(self.x_tf, self.y_tf, self.t_tf)
        self.f_pred = self.net_f(self.x_tf, self.y_tf, self.t_tf)
        
        self.loss_u = tf.reduce_mean(tf.square(self.u_tf - self.u_pred))
        self.loss = tf.reduce_mean(tf.square(self.u_tf - self.u_pred)) + \
                    tf.reduce_mean(tf.square(self.f_pred))
        
        self.optimizer_u = tf.contrib.opt.ScipyOptimizerInterface(self.loss_u, 
                                                                method = 'L-BFGS-B', 
                                                                options = {'maxiter': 2000,
                                                                           'maxfun': 1000,
                                                                           'maxcor': 50,
                                                                           'maxls': 50,
                                                                           'ftol' : 1.0 * np.finfo(float).eps})
        self.optimizer = tf.contrib.opt.ScipyOptimizerInterface(self.loss, 
                                                                method = 'L-BFGS-B', 
                                                                options = {'maxiter': 50000,
                                                                           'maxfun': 5000,
                                                                           'maxcor': 50,
                                                                           'maxls': 50,
                                                                           'ftol' : 1.0 * np.finfo(float).eps})
    
        self.optimizer_Adam = tf.train.AdamOptimizer()
        self.train_op_Adam = self.optimizer_Adam.minimize(self.loss)
        
        init = tf.global_variables_initializer()
        self.sess.run(init)

    def initialize_NN(self, layers):        
        weights = []
        biases = []
        num_layers = len(layers) 
        for l in range(0,num_layers-1):
            W = self.xavier_init(size=[layers[l], layers[l+1]])
            b = tf.Variable(tf.zeros([1,layers[l+1]], dtype=tf.float32), dtype=tf.float32)
            weights.append(W)
            biases.append(b)        
        return weights, biases
        
    def xavier_init(self, size):
        in_dim = size[0]
        out_dim = size[1]        
        xavier_stddev = np.sqrt(2/(in_dim + out_dim))
        return tf.Variable(tf.truncated_normal([in_dim, out_dim], stddev=xavier_stddev), dtype=tf.float32)
    
    def neural_net(self, X, weights, biases):
        num_layers = len(weights) + 1
        
        H = 2.0*(X - self.lb)/(self.ub - self.lb) - 1.0
        for l in range(0,num_layers-2):
            W = weights[l]
            b = biases[l]
            H = tf.tanh(tf.add(tf.matmul(H, W), b))
        W = weights[-1]
        b = biases[-1]
        Y = tf.add(tf.matmul(H, W), b)
        return Y
            
    def net_u(self, x, y, t):
        u = self.neural_net(tf.concat([x,y,t],1), self.weights, self.biases)
        return u
    
    def net_f(self, x, y, t):
        r=self.r
        d=self.d
        u = self.net_u(x,y, t)
        u_t = tf.gradients(u, t)[0]
        u_x = tf.gradients(u, x)[0]
        u_y = tf.gradients(u, y)[0]
        u_xx = tf.gradients(u_x, x)[0]
        u_yy = tf.gradients(u_y, y)[0]
        f = u_t-r*u - d*(u_xx + u_yy)
        
        return f
    
    def callback(self, loss, r, d):
        print('Loss: %e, r: %.5f, d: %.5f' % (loss, r, d))
        
        
    def train(self, nIter):
        tf_dict = {self.x_tf: self.x, self.y_tf: self.y, self.t_tf: self.t, self.u_tf: self.u}
        
        start_time = time.time()
        for it in range(nIter):
            self.sess.run(self.train_op_Adam, tf_dict)
            
            # Print
            if it % 100 == 0:
                elapsed = time.time() - start_time
                
                loss_value = self.sess.run(self.loss, tf_dict)
                r = self.sess.run(self.r)
                d = self.sess.run(self.d)
                print('It: %d, Loss: %e, r: %.5f, d: %.5f, Time: %.2f' % 
                      (it, loss_value, r, d, elapsed))
                start_time = time.time()
        
        self.optimizer_u.minimize(self.sess,
                                feed_dict = tf_dict,
                                fetches = [self.loss, self.r,self.d])
        self.optimizer.minimize(self.sess,
                                feed_dict = tf_dict,
                                fetches = [self.loss, self.r,self.d],
                                loss_callback = self.callback)
        
        
    def predict(self, X_star):
        
        tf_dict = {self.t_tf: X_star[:,0:1], self.x_tf: X_star[:,1:2], self.y_tf: X_star[:,2:3]}
        
        u_star = self.sess.run(self.u_pred, tf_dict)
        f_star = self.sess.run(self.f_pred, tf_dict)
        
        return u_star, f_star

    

    

##main

In [None]:
def flaten_data(t,x,y,u):
  xi,yi=np.meshgrid(x, y)
  xi,yi=xi.flatten(),yi.flatten()
  x_train=[]
  for ti in t:
    x_train.append(list(zip([ti]*len(xi),xi,yi)))
  x_train=np.array(x_train)
  return x_train.reshape(-1,3), u.flatten()
def get_data(nb_t,nb_xy,p,v1,v2,d):
    min_t, max_t =0 , 0.99
    min_xy, max_xy =-1, 1
    t = np.linspace(min_t, max_t, nb_t )
    x = np.linspace(min_xy,max_xy, nb_xy )
    y = np.linspace(min_xy,max_xy, nb_xy )
    x_,y_=np.meshgrid(x, y)
    u=[]
    for i in range(0,len(t)):
      u_i=[u_anal(t[i],x,y,p,v1,v2,d) for (x,y) in list(zip(x_,y_))]
      u.append(np.array(u_i))
    return t,x,y, np.array(u)

In [None]:
def get_data_from_fenics_solution(rootDire="d"):
    data=[]
    root =os.listdir(rootDire)
    i=0
    for elte in root:
      filename =rootDire+"/udata_"+str(i)+".txt"
      f = open(filename, "r")
      t_x_y_u=[]
      i+=1
      for line in f:
        elts=line.split()
        t_x_y_u.extend([np.float32(elts)])
      data.extend([t_x_y_u])
    return np.array(data)

In [None]:
nb_t =100
nb_xy=20
r=22
D=1
data=get_data_from_fenics_solution(rootDire="data")
data=data.reshape(-1,4)
T=data[:,0]
X=data[:,1]
Y=data[:,2]
U=data[:,3]

x_train=np.array(list(zip(T,X,Y)))
y_train = U.flatten()
print(len(x_train))
print(len(y_train))

84941
84941


with initial r=22, D=5

In [None]:
if __name__ == "__main__":
    nb_neurone=50
    layers = [3, nb_neurone, nb_neurone, nb_neurone, nb_neurone, nb_neurone,nb_neurone, 1]
    y_train = y_train.reshape(-1, len(y_train))
    lb = x_train.min(0)
    ub = x_train.max(0)
    print(lb, ub)
    
    idx = np.random.choice(len(y_train),1000, replace=False)
    y_train_ = y_train[idx]
    x_train_ = x_train[idx,:]
    r0,D0=22,0
    model = PhysicsInformedNN(x_train_, y_train_, layers, lb, ub,r0,D0)
    start_time = time.time()
    model.train(1)
    end_time = time.time()
    print('Training time: %f s' % (end_time-start_time))

[0. 0. 0.] [1. 1. 1.]
Device mapping:
/job:localhost/replica:0/task:0/device:XLA_CPU:0 -> device: XLA_CPU device
/job:localhost/replica:0/task:0/device:XLA_GPU:0 -> device: XLA_GPU device
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7

It: 0, Loss: 6.206939e+00, r: 22.00000, d: 0.00100, Time: 1.80
Loss: 6.206939e+00, r: 22.00000, d: 0.00100
Loss: 2.809088e+03, r: 22.00000, d: 0.00091
Loss: 5.068559e+00, r: 22.00000, d: 0.00100
Loss: 4.949876e+00, r: 22.00000, d: 0.00099
Loss: 4.755243e+00, r: 22.00000, d: 0.00095
Loss: 4.754643e+00, r: 22.00000, d: 0.00096
Loss: 4.754281e+00, r: 22.00000, d: 0.00096
Loss: 4.754274e+00, r: 22.00000, d: 0.00096
Loss: 4.754249e+00, r: 22.00000, d: 0.00097
Loss: 4.754067e+00, r: 22.00000, d: 0.00109
Loss: 4.753696e+00, r: 22.00000, d: 0.00138
Loss: 4.752770e+00, r: 22.00000, d: 0.00219
Loss: 4.751248e+00, r: 22.00000, d: 0.00366
Loss: 4.749295e+00, r: 22.00000, d: 0.00561
Loss: 

with init r=0,D=0

In [None]:
if __name__ == "__main__":
    nb_neurone=50
    layers = [3, nb_neurone, nb_neurone, nb_neurone, nb_neurone, nb_neurone,nb_neurone, 1]
    y_train = y_train.reshape(-1, len(y_train))
    lb = x_train.min(0)
    ub = x_train.max(0)
    print(lb, ub)
    
    idx = np.random.choice(nb_t*nb_xy*nb_xy,1000, replace=False)
    y_train_ = y_train[idx]
    x_train_ = x_train[idx,:]
    r0,D0=0,0
    model = PhysicsInformedNN(x_train_, y_train_, layers, lb, ub,r0,D0)
    start_time = time.time()
    model.train(1)
    end_time = time.time()
    print('Training time: %f s' % (end_time-start_time))

[0. 0. 0.] [1. 1. 1.]
Device mapping:
/job:localhost/replica:0/task:0/device:XLA_CPU:0 -> device: XLA_CPU device
/job:localhost/replica:0/task:0/device:XLA_GPU:0 -> device: XLA_GPU device
/job:localhost/replica:0/task:0/device:GPU:0 -> device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7

It: 0, Loss: 7.783874e-01, r: -0.00100, d: 0.00100, Time: 1.95
Loss: 7.783874e-01, r: -0.00100, d: 0.00100
Loss: 4.512735e+00, r: -0.00019, d: 0.00009
Loss: 4.658248e-01, r: -0.00086, d: 0.00085
Loss: 3.573233e-01, r: -0.00880, d: 0.00451
Loss: 3.419369e-01, r: -0.01352, d: 0.00593
Loss: 3.408885e-01, r: -0.01243, d: 0.00543
Loss: 3.368815e-01, r: -0.00109, d: 0.00071
Loss: 3.355094e-01, r: 0.01138, d: -0.00527
Loss: 3.383879e-01, r: 0.14160, d: -0.06872
Loss: 3.306471e-01, r: 0.07371, d: -0.03564
Loss: 3.272018e-01, r: 0.13467, d: -0.06521
Loss: 3.165883e-01, r: 0.19471, d: -0.09271
Loss: 3.652448e-01, r: 0.39692, d: -0.16360
Loss: 3.017372e-01, r: 0.26829, d: -0.11850
Loss: 

# FENICS DATA

Script use to generate the data

$u_t$ = D $Δu$ + $f(u)$

$f(u)= u(r-\gamma u)$  with  $r \in R$ and $γ >0$

$u(t,0) = 0 $   

  $u = u_0  at t = 0$

In [None]:
import os
if not os.path.exists("data"):
    os.makedirs("data")
if not os.path.exists("img"):
    os.makedirs("img")


from fenics import *
import numpy as np
import math
from matplotlib import pyplot
from mpl_toolkits.mplot3d import axes3d, Axes3D #<-- Note the capitalization!


#--------------------------------------------------------------------
def plot_img(u,png_name,img_title):
    # Affichages
    pyplot.clf()
    p = plot(u)

    # set  colormap
    p.set_cmap("jet")
    p.set_clim()
    # p.set_clim(0.0, 1.0) # avec borne min et max pour la colorbar

    # add a title to the  plot
    # pyplot.title("u_h at t=%.4f"+str(t))
    #pyplot.title(img_title+' at t = %.4f' % (tplot))
    pyplot.title(img_title)

    # add a colorbar
    pyplot.colorbar(p);
    #plt.show()

    # save  image to disk
    # plt.savefig("../img/uh_" + str(n + 1) + ".png")
    pyplot.savefig(png_name)
    return p
#---------------------------------------------------------------
# Variables du problème
# les paramètres
tmin = 0 # T initial pris à 0
tmax = 1 # on va changer cette valeur en fonciton du tmax de mesure qu'on souhaite générer
r = 22         # parameter r (taux de croissance intrinsèque)
gamma = 0          # parameter gamma (taux de mortalité)
D = 1          # coefficient de diffusion


# Seuil pour les équilibres
if D != 0:
    print("R0=", r / (D * pi * pi))
# capacité limite
if gamma != 0:
    print("K=", r / gamma)

# La solution initiale et solution exacte quand elle existe :

# Initial solution
print("pi=", math.pi)


##solution exacte prise pour gamma=0
# Cas où gamma = 0
m=1
k=1
gamma=0.0
u_ex = Expression('exp((lambd-DD*(kk*kk+mm*mm)*pi*pi)*t)*sin(kk*pi*x[0])*sin(mm*pi*x[1])',degree=2,DD=D,lambd=r,mm=m,kk=k,t=0,pi=math.pi)

# Cas où gamma est non nul
C=10.0
a=1.0
#u_ex = Expression('1/pow(1+CC*exp(-5.0*aa*t/6.0+sqrt(6.0*aa)*x[1]/6.0),2)', degree=2, CC=C, aa=a, t=0.0)

#------------------------------------------------------------------
# Information for the generation of the data
# mesh for the generated data: (pour l'instant on prend le mesh de calcul)
xmin = ymin = 0
xmax = 1
ymax = 1
nx_data = 20
ny_data = 20
mesh_data = RectangleMesh(Point(xmin, ymin), Point(xmax, ymax), nx_data, ny_data,"crossed") # "right/left")
mesh_points_data=mesh_data.coordinates()
print(list(mesh_points_data))
Vdata = FunctionSpace(mesh_data, 'P', 1)
dimVdata = Vdata.dim()
dim_mesh_data = mesh_data.geometry().dim()
dof_coordinates = Vdata.tabulate_dof_coordinates().reshape(dimVdata,dim_mesh_data)
dof_coordinates.resize((dimVdata, dim_mesh_data))
dof_x = dof_coordinates[:, 0]
dof_y = dof_coordinates[:, 1]
udata = Function(Vdata)
nddl_data=udata.vector().get_local().size
plot(mesh_data)
pyplot.show()

# Time when we ganerate the data:
num_steps_data = 100
tmin = 0
tmax = 1
Tmeas = np.linspace (tmin, tmax, num_steps_data+1)
Tmeas = Tmeas[1:Tmeas.size]


#-----------------------------------------------------------------
# paramètres du calcul
num_steps = 1000  # number of time steps
dt = (tmax-tmin) / num_steps  # time step size
print("dt=", dt)

# Create mesh and define function space
#nx = ny = 40

# Rectangular domain
nx = 40
ny = 40
mesh = RectangleMesh(Point(xmin, ymin), Point(xmax, ymax), nx, ny, "right/left")

hmax = mesh.hmax()
dmax = 2 * mesh.rmax()
print("hmax=", hmax)
print("dmax=", dmax)
print("CFL : dt/h^2=", dt / (hmax * hmax))
print("dt/rmax^2=", dt / (dmax * dmax))
plot(mesh)
pyplot.show()
V = FunctionSpace(mesh, 'P', 2)

# Define boundary condition
u_ex.t = tmin

def boundary(x, on_boundary):
    return on_boundary

bc = DirichletBC(V, u_ex, boundary)

# Define initial value
u_n = interpolate(u_ex, V)
print(u_n)
p = plot_img(u_n,"img/uh_0.png","Initial solution (at t="+str(tmin)+")")

## test
u_proj = project(u_n,Vdata)
vect_u_proj = u_proj.vector().get_local()
#print(vect_u_proj)
filename = "data/udata_0.txt"
output = open(filename, 'w')
for lg in range(nddl_data):
  output.write("%.4g\t %.4g\t %.4g\t %.4g" % (tmin, dof_x[lg], dof_y[lg], vect_u_proj[lg]))
  output.write('\n')
output.close()


p = plot_img(u_proj,"img/uh_0_project.png","Initial solution PROJ (at t="+str(tmin)+")")


# Define variational problem
u = TrialFunction(V)
v = TestFunction(V)


# Variational formulation (backward-Euler scheme for the time derivative, Cranck-Nicolson scheme for the logistic term)
if D != 0:
    F = (1 - r * dt / 2. + gamma * dt * u_n) * u * v * dx + dt * D * dot(grad(u), grad(v)) * dx - (
                1 + r * dt / 2.) * u_n * v * dx
else:
    F = (1 - r * dt / 2. + gamma * dt * u_n) * u * v * dx - (1 + r * dt / 2.) * u_n * v * dx

a, L = lhs(F), rhs(F)

# Time-stepping
u = Function(V)
t = tmin

uex = Function(V)


pyplot.show()


s = 0
ind = 0
for n in range(num_steps):
    # Update current time
    t += dt
    u_ex.t = t

    bc = DirichletBC(V, u_ex, boundary)

    solve(a == L, u, bc)

    uex = interpolate(u_ex, V)

    # Save png image of the computes solution
    if (n + 1) % 10 == 0:
        p = plot_img(u, "img/uh_" + str(n + 1) + ".png",'u_h at t = %.4f' % (t))
        p = plot_img(uex, "img/uex_" + str(n + 1) + ".png", 'u_ex at t = %.4f' % (t))

    # write the solution in a file
    print(t,Tmeas[ind] )
    if abs(t-Tmeas[ind]) <dt*0.1:
        u_proj = project(u, Vdata)
        vect_u_proj = u_proj.vector().get_local()
        filename = "data/udata_"+str(ind+1)+".txt"
        output = open(filename, 'w')
        for lg in range(nddl_data):
            output.write("%.4g\t %.4g\t %.4g\t %.4g" % (t, dof_x[lg], dof_y[lg], vect_u_proj[lg]))
            output.write('\n')
        output.close()


        fig = pyplot.figure()
        #ax = fig.add_subplot(111, projection="3d")
        ax = Axes3D(fig)
        ax.scatter(dof_x, dof_y, vect_u_proj, c='b', marker='.')
        pyplot.title("toto")
        ##ax.scatter(x0, x1, vect_u_proj, c='b', marker='.')
        ## l'ordre des valeurs de x0 et x1 ne correspond pas avec l'ordre
        ## des coefficients de vect_u_proj
        ## Par contre, le même ordre est utiliser entre dof_x, dof_y et vect_uproj
        pyplot.show()
        ind +=1


    # Compute error at vertices
    error = np.abs(uex.vector().get_local() - u.vector().get_local()).max()
    err_rel = error / (np.abs(uex.vector().get_local()).max())
    print('t = %.4f: error = %.4g' % (t, error))
    print('t = %.4f: err_rel = %.4g' % (t, err_rel))
    s = s + (np.abs(uex.vector().get_local() - u.vector().get_local()).max()) * (
        np.abs(uex.vector().get_local() - u.vector().get_local()).max())

    # Update previous solution
    u_n.assign(u)

eh_dt = sqrt(dt * s)
print('dt = %.4f, h = %.4f : error = %.4g' % (dt, hmax, eh_dt))