In [25]:
import tensorflow as tf
import datetime, os
#hide tf logs
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # or any {'0', '1', '2'},
#0 (default) shows all, 1 to filter out INFO logs, 2 to additionally filter out WARNING logs, and 3 to additionally filter out ERROR logs
import scipy.optimize
import scipy.io
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker
import time
from pyDOE import lhs
import pandas as pd
import seaborn as sns 
import codecs, json

# generates same random numbers each time
np.random.seed(1234)
tf.random.set_seed(1234)

print("TensorFlow version: {}".format(tf.__version__))

TensorFlow version: 2.8.0


# Data Prep

Training and Testing data is prepared from the solution file

In [26]:
x_1 = np.linspace(-1,1,256)
x_2 = np.linspace(1,-1,256)
x_3 = np.linspace(-1,1,256)

X, Y, Z = np.meshgrid(x_1,x_2,x_3) 

# Test Data

We prepare the test data to compare against the solution produced by the SAPINN.

In [27]:
X_v_test = np.hstack((X.flatten(order='F')[:,None], Y.flatten(order='F')[:,None],Z.flatten(order='F')[:,None]))

lb = np.array([-1, -1, -1]) #lower bound
ub = np.array([1, 1, 1])  #upper bound

vsol = 1/(4 * np.pi*np.sqrt(X**2 + Y**2+Z**2))+np.sin(X*Y+Z)

v = vsol.flatten('F')[:,None] 

# Training Data

In [28]:
def trainingdata(N_v,N_f):
    
    leftedge_x = np.hstack((X[0,:].flatten('a')[:,None], Y[0,:].flatten('a')[:,None], Z[0,:].flatten('a')[:,None]))
    leftedge_v = vsol[0,:].flatten('a')[:,None]
    
    rightedge_x = np.hstack((X[-1,:].flatten('a')[:,None], Y[-1,:].flatten('a')[:,None], Z[-1,:].flatten('a')[:,None]))
    rightedge_v = vsol[-1,:].flatten('a')[:,None]
    
    backedge_x = np.hstack((X[:,0].flatten('a')[:,None], Y[:,0].flatten('a')[:,None], Z[:,0].flatten('a')[:,None]))
    backedge_v = vsol[:,0].flatten('a')[:,None]
    
    frontedge_x = np.hstack((X[:,-1].flatten('a')[:,None], Y[:,-1].flatten('a')[:,None], Z[:,-1].flatten('a')[:,None]))
    frontedge_v = vsol[:,-1].flatten('a')[:,None]
    
    bottomedge_x = np.hstack((X[:,:,0].flatten('a')[:,None], Y[:,:,0].flatten('a')[:,None], Z[:,:,0].flatten('a')[:,None]))
    bottomedge_v = vsol[:,:,0].flatten('a')[:,None]
    
    topedge_x = np.hstack((X[:,:,-1].flatten('a')[:,None], Y[:,:,-1].flatten('a')[:,None], Z[:,:,-1].flatten('a')[:,None]))
    topedge_v = vsol[:,:,-1].flatten('a')[:,None]
    
    all_X_v_train = np.vstack([leftedge_x, rightedge_x, backedge_x, frontedge_x, bottomedge_x, topedge_x])
    all_v_train = np.vstack([leftedge_v, rightedge_v, backedge_v, frontedge_v, bottomedge_v, topedge_v])  
     
    #choose random N_v points for training
    idx = np.random.choice(all_X_v_train.shape[0], N_v, replace=False) 
    
    X_v_train = all_X_v_train[idx[0:N_v], :] #choose indices from  set 'idx' (x,t)
    v_train = all_v_train[idx[0:N_v],:]      #choose corresponding v
    
    #Collocation Points

    X_f = lb + (ub-lb)*lhs(3,N_f)
    X_f_train = np.vstack((X_f, X_v_train)) # append training points to collocation points 
    
    return X_f_train, X_v_train, v_train

# PINN

Creating sequential layers using the class tf.Module


In [29]:
class Sequentialmodel(tf.Module): 
    def __init__(self, layers, name=None):

        self.W = []  #Weights and biases
        self.parameters = 0 #total number of parameters

        for i in range(len(layers)-1):

            input_dim = layers[i]
            output_dim = layers[i+1]

            #Xavier standard deviation 
            std_dv = np.sqrt((2.0/(input_dim + output_dim)))

            #weights = normal distribution * Xavier standard deviation + 0
            w = tf.random.normal([input_dim, output_dim], dtype = 'float64') * std_dv

            w = tf.Variable(w, trainable=True, name = 'w' + str(i+1))

            b = tf.Variable(tf.cast(tf.zeros([output_dim]), dtype = 'float64'), trainable = True, name = 'b' + str(i+1))

            self.W.append(w)
            self.W.append(b)

            self.parameters +=  input_dim * output_dim + output_dim
            
        # Lagrange multipliers
        
        # Boundary terms      
        self.lagrange_1 = tf.Variable(tf.cast(tf.ones([N_v,1]), dtype = 'float64'), trainable = True) 
        
        # Residual terms
        self.lagrange_2 = tf.Variable(tf.cast(tf.ones([N_f+N_v,1]), dtype = 'float64'), trainable = True)
        
        # Circle terms
        self.lagrange_3 = tf.Variable(tf.cast(tf.ones([N_f+N_v,1]), dtype = 'float64'), trainable = True) 

    
    def evaluate(self,x):
        
        #preprocessing input 
        x = (x - lb)/(ub - lb) #feature scaling
        
        a = x
        
        for i in range(len(layers)-2):
            
            z = tf.add(tf.matmul(a, self.W[2*i]), self.W[2*i+1])
            a = tf.nn.tanh(z)
            
        a = tf.add(tf.matmul(a, self.W[-2]), self.W[-1]) # For regression, no activation to last layer
        
        return a
    
    def get_weights(self):

        parameters_1d = []  # [.... W_i,b_i.....  ] 1d array
        
        for i in range (len(layers)-1):
            
            w_1d = tf.reshape(self.W[2*i],[-1])   #flatten weights 
            b_1d = tf.reshape(self.W[2*i+1],[-1]) #flatten biases
            
            parameters_1d = tf.concat([parameters_1d, w_1d], 0) #concat weights 
            parameters_1d = tf.concat([parameters_1d, b_1d], 0) #concat biases
        
        return parameters_1d
        
    def set_weights(self,parameters):
                
        for i in range (len(layers)-1):

            shape_w = tf.shape(self.W[2*i]).numpy() # shape of the weight tensor
            size_w = tf.size(self.W[2*i]).numpy() #size of the weight tensor 
            
            shape_b = tf.shape(self.W[2*i+1]).numpy() # shape of the bias tensor
            size_b = tf.size(self.W[2*i+1]).numpy() #size of the bias tensor 
                        
            pick_w = parameters[0:size_w] #pick the weights 
            self.W[2*i].assign(tf.reshape(pick_w,shape_w)) # assign  
            parameters = np.delete(parameters,np.arange(size_w),0) #delete 
            
            pick_b = parameters[0:size_b] #pick the biases 
            self.W[2*i+1].assign(tf.reshape(pick_b,shape_b)) # assign 
            parameters = np.delete(parameters,np.arange(size_b),0) #delete 

            
    def loss_BC(self,x,y):

        g = tf.Variable(x, dtype = 'float64', trainable = False)
        
        x_1_f = g[:,0:1]
        x_2_f = g[:,1:2]
        x_3_f = g[:,2:3]
        
        s = y-self.evaluate(x)
        s = self.lagrange_1*s
        loss_v = tf.reduce_mean(tf.square(s))
        
        return loss_v

    def loss_PDE(self, x_to_train_f):
    
        g = tf.Variable(x_to_train_f, dtype = 'float64', trainable = False)
        
        x_1_f = g[:,0:1]
        x_2_f = g[:,1:2]
        x_3_f = g[:,2:3]

        with tf.GradientTape(persistent=True) as tape:

            tape.watch(x_1_f)
            tape.watch(x_2_f)
            tape.watch(x_3_f)

            g = tf.stack([x_1_f[:,0], x_2_f[:,0], x_3_f[:,0]], axis=1)

            v = self.evaluate(g)
            v_x_1 = tape.gradient(v,x_1_f)
            v_x_2 = tape.gradient(v,x_2_f)
            v_x_3 = tape.gradient(v,x_3_f)

        v_xx_1 = tape.gradient(v_x_1,x_1_f)
        v_xx_2 = tape.gradient(v_x_2,x_2_f)
        v_xx_3 = tape.gradient(v_x_3,x_3_f)

        del tape
        
        H = 0.075
        
        k = x_1_f**2 + x_2_f**2 + x_3_f**2 + 1
        q = 12/(np.pi * H**2) * (5/H**2 * (x_1_f**2 + x_2_f**2+ x_3_f**2) - 8/H * np.sqrt(x_1_f**2 + x_2_f**2+ x_3_f**2) + 3)
        q = tf.where(x_1_f**2+x_2_f**2+ x_3_f**2 >= H**2, 0.0, q)
        
        gg = (x_1_f**2 + x_2_f**2 + x_3_f**2 + 1)*(x_1_f**2 + x_2_f**2 +1)*np.sin(x_1_f*x_2_f+x_3_f)-(4*x_1_f*x_2_f+2*x_3_f)*np.cos(x_1_f*x_2_f+x_3_f)+1/(2*np.pi*np.sqrt(x_1_f**2 + x_2_f**2 + x_3_f**2))
        
        f = -k*(v_xx_1 + v_xx_2 + v_xx_3) - 2*x_1_f*v_x_1 - 2*x_2_f*v_x_2 - 2*x_3_f*v_x_3 - k*q-gg

        f = tf.where(x_1_f**2+x_2_f**2+x_3_f**2 >= H**2, self.lagrange_2 * f, f)
        f = tf.where(x_1_f**2+x_2_f**2+x_3_f**2 < H**2, self.lagrange_3 * f, f)
        
        loss_f = tf.reduce_mean(tf.square(f))

        return loss_f, f
    
    def loss(self,x,y,g,sigma):

        loss_v = self.loss_BC(x,y)
        loss_f, f = self.loss_PDE(g)

        loss = sigma/2 * loss_v + loss_f

        return loss, loss_v, loss_f 
    
    def optimizerfunc(self,parameters):
        
        self.set_weights(parameters)
       
        with tf.GradientTape() as tape:
            
            tape.watch(self.trainable_variables)
            loss_val, loss_v, loss_f = self.loss(X_v_train, v_train, X_f_train, sigma)
            
        grads = tape.gradient(loss_val,self.trainable_variables)
        
        del tape
        
        grads_1d = [ ] #store 1d grads 
        
        for i in range (len(layers)-1):

            grads_w_1d = tf.reshape(grads[2*i],[-1]) #flatten weights 
            grads_b_1d = tf.reshape(grads[2*i+1],[-1]) #flatten biases

            grads_1d = tf.concat([grads_1d, grads_w_1d], 0) #concat grad_weights 
            grads_1d = tf.concat([grads_1d, grads_b_1d], 0) #concat grad_biases
        
        return loss_val.numpy(), grads_1d.numpy()
    
    def optimizer_callback(self,parameters):
                
        loss_value, loss_v, loss_f = self.loss(X_v_train, v_train, X_f_train, sigma)
        
        v_pred = self.evaluate(X_v_test)
        error_vec = np.linalg.norm((v-v_pred),2)/np.linalg.norm(v,2)
        
        tf.print(loss_value, loss_v, loss_f, error_vec)
        
    def adaptive_gradients(self):

        with tf.GradientTape() as tape:
            tape.watch(self.W)
            loss_val, loss_v, loss_f = self.loss(X_v_train, v_train, X_f_train, sigma)

        grads = tape.gradient(loss_val,self.W)

        del tape

        with tf.GradientTape(persistent = True) as tape:
            tape.watch(self.lagrange_1)
            tape.watch(self.lagrange_2)
            tape.watch(self.lagrange_3)
            loss_val, loss_v, loss_f = self.loss(X_v_train, v_train, X_f_train,sigma)

        grads_L1 = tape.gradient(loss_val,self.lagrange_1) # boundary terms
        grads_L2 = tape.gradient(loss_val,self.lagrange_2) # residual terms
        grads_L3 = tape.gradient(loss_val,self.lagrange_3)
        
        del tape

        return loss_val, grads, grads_L1, grads_L2, grads_L3

# Loss Function

The loss function consists of two parts:

loss_BC: MSE error of boundary losses
loss_PDE: MSE error of collocation points satisfying the PDE

loss = loss_BC + loss_PDE


In [30]:
N_v = 600 #Total number of data points for 'u'
N_f = 10000 #Total number of collocation points 

# Training data
X_f_train, X_v_train, v_train = trainingdata(N_v,N_f)

iter_vec = []
error_vec = []

layers = np.array([3,5,5,1]) #2 hidden layers

maxcor = 200 
max_iter = 1
iteration = 0

start_time = time.time()

sigma = 1153.3

PINN = Sequentialmodel(layers,sigma)

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

optimizer_L1 = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

optimizer_L2 = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

optimizer_L3 = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

num_epochs = 1

for epoch in range(num_epochs):
    
        loss_value, grads, grads_L1, grads_L2, grads_L3 = PINN.adaptive_gradients()

        if epoch % 100 == 0:
            tf.print(loss_value)
        
        optimizer.apply_gradients(zip(grads, PINN.W)) #gradient descent weights 
        optimizer_L1.apply_gradients(zip([-grads_L1], [PINN.lagrange_1])) # gradient ascent adaptive coefficients of boundary residual
        optimizer_L2.apply_gradients(zip([-grads_L2], [PINN.lagrange_2])) # gradient ascent adaptive coefficients of PDE residual
        optimizer_L3.apply_gradients(zip([-grads_L3], [PINN.lagrange_3]))  

init_params = PINN.get_weights().numpy()

results = scipy.optimize.minimize(fun = PINN.optimizerfunc,
                                      x0 = init_params,
                                      args=(),
                                      method='BFGS',
                                      jac= True, # If jac is True, fun is assumed to return the gradient along with the objective function
                                      callback = None, 
                                      options = {'disp': None,
                                                'maxcor': maxcor, 
                                                'ftol': 5e-10, # 1 * np.finfo(float).eps,  #The iteration stops when (f^k - f^{k+1})/max{|f^k|,|f^{k+1}|,1} <= ftol
                                                'gtol': 5e-10, 
                                                'maxfun':  50000, 
                                                'maxiter': 100,
                                                'iprint': -1,   #print update every 50 iterations
                                                'maxls': 1})
PINN.set_weights(results.x)
v_pred = PINN.evaluate(X_v_test)
error = np.linalg.norm((v-v_pred),2)/np.linalg.norm(v,2)
error_vec.append(error)
iter_vec.append(iteration)


while iteration<2000:
    iteration += 100
    
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

    optimizer_L1 = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

    optimizer_L2 = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

    optimizer_L3 = tf.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)

    num_epochs = 1

    for epoch in range(num_epochs):
    
        loss_value, grads, grads_L1, grads_L2, grads_L3 = PINN.adaptive_gradients()

        if epoch % 100 == 0:
            tf.print(loss_value)
        
        optimizer.apply_gradients(zip(grads, PINN.W)) #gradient descent weights 
        optimizer_L1.apply_gradients(zip([-grads_L1], [PINN.lagrange_1])) # gradient ascent adaptive coefficients of boundary residual
        optimizer_L2.apply_gradients(zip([-grads_L2], [PINN.lagrange_2])) # gradient ascent adaptive coefficients of PDE residual
        optimizer_L3.apply_gradients(zip([-grads_L3], [PINN.lagrange_3]))  
    
    results = scipy.optimize.minimize(fun = PINN.optimizerfunc,
                                      x0 = results.x,
                                      args=(),
                                      method='BFGS',
                                      jac= True, # If jac is True, fun is assumed to return the gradient along with the objective function
                                      callback = None, 
                                      options = {'disp': None,
                                                'maxcor': maxcor, 
                                                'ftol': 5e-10, # 1 * np.finfo(float).eps,  #The iteration stops when (f^k - f^{k+1})/max{|f^k|,|f^{k+1}|,1} <= ftol
                                                'gtol': 5e-10, 
                                                'maxfun':  50000, 
                                                'maxiter': 100,
                                                'iprint': -1,   #print update every 50 iterations
                                                'maxls': 1})
    PINN.set_weights(results.x)
    v_pred = PINN.evaluate(X_v_test)
    error = np.linalg.norm((v-v_pred),2)/np.linalg.norm(v,2)
    error_vec.append(error)
    iter_vec.append(iteration)
    
elapsed = time.time() - start_time              
print('Training time: %.2f' % (elapsed))

print(iter_vec)
print(error_vec)

200.357011818168


  results = scipy.optimize.minimize(fun = PINN.optimizerfunc,


0.52383826453731053


  results = scipy.optimize.minimize(fun = PINN.optimizerfunc,


0.12416519166930695
0.04843077684901731
0.029653338770872402
0.02242965656926859
0.01785200243341974
0.015815514657602878
0.015019332376539442
0.014594031848754848
0.014275726145896867
0.01396651899406743
0.01376088876626501
0.013538001276226716
0.013323449823427714
0.013094968774045746
0.01282745768824604
0.012603863894164779
0.012332049856907642
0.012143460076640057
0.011997219490094857
Training time: 381.91
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000]
[0.10824996866732187, 0.10035482579050334, 0.09553931699953519, 0.09520325791635274, 0.09376741791025317, 0.09379267983027323, 0.09352553368986054, 0.0935240242033914, 0.09358932430787678, 0.09401957041257544, 0.0935762863376287, 0.09353384618674976, 0.09356552505002386, 0.09364326440397155, 0.09343868622675133, 0.09324676069575197, 0.09297196074197739, 0.09280255374541165, 0.09286439888029127, 0.09267334068270981, 0.09273274541014127]
