# PINNS applied on the steady state 1D equation

In this notebook we will try to solve the steady state 1D momentum equation for blood flow in rigid domains. 

\begin{equation}
\frac{\partial}{\partial x } \left(\frac{Q^2}{A}\right) = -\frac{A}{\rho}\frac{\partial P}{\partial x} - \frac{8 \, \mu \, \pi \, Q}{\rho \, A}\,,
\end{equation}
which may be reformulated to

\begin{equation}
\frac{\partial P}{\partial x}  = -\frac{\rho}{A} \frac{\partial}{\partial x } \left(\frac{Q^2}{A}\right)- \frac{8 \, \mu \, \pi \, Q}{\, A^2}\,,
\end{equation}

In this case we treat $Q$ as given, so that $P(x)$ may be expressed as:

\begin{equation}
P\left(x\right)  = \int^x -\frac{\rho}{A} \frac{\partial}{\partial x } \left(\frac{Q^2}{A}\right) dx  + \int^x- \frac{8 \, \mu \, \pi \, Q}{\, A^2} dx\,,
\end{equation}

Our f function is defined as 
\begin{equation}
f  = \frac{\partial P}{\partial x} - I_c - I_f \,,
\end{equation}
where $I_c=-\frac{\rho}{A} \frac{\partial}{\partial x } \left(\frac{Q^2}{A}\right)$ and $I_f=- \frac{8 \, \mu \, \pi \, Q}{\, A^2}$, in which $I_c$ and $I_f$ are source terms.

## Code and functions to calculate solution and source terms for different geometries

In [None]:
import sympy as sp
import numpy as np
from scipy.integrate import quad
import matplotlib.pyplot as plt
%matplotlib inline
def sinusGeom_np(x, R0, Rmin, l):
    """Return the radius of a cosine-shaped narrowing, evaluated with NumPy.

    The radius equals ``R0`` at ``x = 0`` and ``x = l`` and drops to
    ``Rmin`` at ``x = l/2``.

    x    : position(s) along the vessel (scalar or NumPy array)
    R0   : radius at the ends of the segment
    Rmin : smallest radius, reached half way along the segment
    l    : length of one full cosine period
    """
    half_depth = (R0 - Rmin)/2
    phase = 2*np.pi*x/l
    return R0 + half_depth*(np.cos(phase) - 1)

def sinusGeom_sp(x, R0, Rmin, l):
    """Return the radius of a cosine-shaped narrowing as a SymPy expression.

    Symbolic counterpart of ``sinusGeom_np``: the same formula, built with
    ``sp.cos`` and ``sp.pi`` so the result can be differentiated.

    x    : position along the vessel (typically a ``sp.Symbol``)
    R0   : radius at the ends of the segment
    Rmin : smallest radius, reached half way along the segment
    l    : length of one full cosine period
    """
    half_depth = (R0 - Rmin)/2
    phase = 2*sp.pi*x/l
    return R0 + half_depth*(sp.cos(phase) - 1)

def linearTapering(x, R0, Rmin, l):
    """Return the radius of a vessel tapering linearly from ``R0`` to ``Rmin``.

    The radius is ``R0`` at ``x = 0`` and ``Rmin`` at ``x = l``. Only
    arithmetic operators are used, so ``x`` may be a number, a NumPy array
    or a SymPy symbol.

    x    : position(s) along the vessel
    R0   : radius at the inlet (``x = 0``)
    Rmin : radius at the outlet (``x = l``)
    l    : length of the tapered segment (must be non-zero)
    """
    # Normalise by the segment length so the taper ends at Rmin for any l;
    # for l == 1 this is identical to the un-normalised expression.
    return R0 + (Rmin - R0)*(x/l)

def integrand_friction_sp(r, mu, Q):
    """Return the friction source term I_f = -8*mu*pi*Q/A**2 symbolically.

    r  : radius (number or SymPy expression); the area is A = pi*r**2
    mu : dynamic viscosity
    Q  : volumetric flow rate
    """
    area = sp.pi*r**2
    numerator = -8*mu*sp.pi*Q
    return numerator/(area**2)

def integrand_convective_sp(r, rho, Q):
    """Return the convective source term I_c = -(rho/A) * d(Q**2/A) symbolically.

    r   : radius (number or SymPy expression); the area is A = pi*r**2
    rho : fluid density
    Q   : volumetric flow rate

    The derivative is taken with ``sp.diff`` without naming a variable, so
    SymPy picks the differentiation variable itself.
    """
    area = sp.pi*r**2
    momentum_flux = Q**2/area
    flux_gradient = sp.diff(momentum_flux)
    return -rho*flux_gradient/area

def calcDeltaP(x, I_f, I_c, diffusive=True, convective=True, P_in=0):
    """Integrate the pressure-gradient source terms along ``x``.

    The pressure at ``x[n + 1]`` is the pressure at ``x[n]`` plus the
    integral (``scipy.integrate.quad``) of the enabled source terms over
    ``[x[n], x[n + 1]]``.

    Parameters
    ----------
    x : sequence of float
        Grid points; integration runs between consecutive entries.
    I_f, I_c : callable
        Friction and convective integrands, each taking a single position.
        An integrand is only called when its term is enabled.
    diffusive, convective : bool
        Enable the friction / convective contribution.
    P_in : float
        Pressure assigned to the first grid point.

    Returns
    -------
    tuple of numpy.ndarray
        ``(P, P_f, P_c, I_f_array, I_c_array)``: total pressure, pressure
        from friction only, pressure from convection only, and the two
        integrands sampled on ``x``. For a disabled term the pressure array
        keeps ``P_in`` in its first entry and zeros elsewhere, and the
        sampled integrand is all zeros. All arrays are empty if ``x`` is.
    """
    n_points = len(x)
    P = np.zeros(n_points)
    P_f = np.zeros(n_points)
    P_c = np.zeros(n_points)
    I_f_array = np.zeros(n_points)
    I_c_array = np.zeros(n_points)
    if n_points == 0:
        # Nothing to integrate; avoid indexing into empty arrays.
        return P, P_f, P_c, I_f_array, I_c_array

    P[0] = P_in
    P_f[0] = P_in
    P_c[0] = P_in
    if diffusive:
        I_f_array[0] = I_f(x[0])
    if convective:
        I_c_array[0] = I_c(x[0])

    for n in range(n_points - 1):
        dp = 0
        # Only integrate the terms that are switched on: a disabled
        # integrand is never evaluated and costs no quadrature time.
        if diffusive:
            dp_f = quad(I_f, x[n], x[n + 1])[0]
            dp += dp_f
            P_f[n + 1] = P_f[n] + dp_f
            I_f_array[n + 1] = I_f(x[n + 1])
        if convective:
            dp_c = quad(I_c, x[n], x[n + 1])[0]
            dp += dp_c
            P_c[n + 1] = P_c[n] + dp_c
            I_c_array[n + 1] = I_c(x[n + 1])
        P[n + 1] = P[n] + dp

    return P, P_f, P_c, I_f_array, I_c_array

def get1D_data(N, geometryType="sine", showPlots=True, diffusive=True, convective=True):
    """Build a 1D vessel geometry and integrate the pressure along it.

    The radius is evaluated numerically on the grid and symbolically with
    SymPy; the symbolic radius is used to build the friction and convective
    integrands, which are then integrated by ``calcDeltaP``.

    Parameters
    ----------
    N : int
        Number of equally spaced grid points on ``[0, l]``.
    geometryType : str
        One of ``"sine"``, ``"linearTapering"`` or ``"constant"``.
    showPlots : bool
        If True, plot the radius, the pressures and the source terms.
    diffusive, convective : bool
        Forwarded to ``calcDeltaP`` to enable the friction / convective term.

    Returns
    -------
    tuple of numpy.ndarray
        ``(x_np, r_np, P_np, P_f_np, P_c_np, integrand_f_array,
        integrand_c_array)``.

    Raises
    ------
    ValueError
        If ``geometryType`` is not one of the supported names.
    """
    R0 = 0.2    # [cm]
    Rmin = 0.05 # [cm]
    l = 1       # [cm]
    Q = 2       # [ml/s]
    rho = 1.05  # [g/cm^3]
    mu = 0.035  # [P] (g/(cm s))
    # Same conversion factor as used by the main program (dyn/cm^2 per mmHg).
    dyne_per_mmHg = 1333.22368

    # Map each geometry name to its (numeric, symbolic) radius functions.
    geometries = {
        "sine": (sinusGeom_np, sinusGeom_sp),
        "linearTapering": (linearTapering, linearTapering),
        "constant": (linearTapering, linearTapering),
    }
    if geometryType not in geometries:
        raise ValueError("Unknown geometryType {!r}; expected one of {}".format(
            geometryType, sorted(geometries)))
    geomFunc_np, geomFunc_sp = geometries[geometryType]
    if geometryType == "constant":
        # A taper with equal inlet and outlet radius is a straight tube.
        R0 = Rmin

    x_np = np.linspace(0, l, N)
    r_np = geomFunc_np(x_np, R0, Rmin, l)

    x = sp.Symbol('x')
    r_sp = geomFunc_sp(x, R0, Rmin, l)

    # Build the symbolic source terms and turn them into numeric callables.
    integrand_f = sp.lambdify([x], integrand_friction_sp(r_sp, mu, Q))
    integrand_c = sp.lambdify([x], integrand_convective_sp(r_sp, rho, Q))

    P_np, P_f_np, P_c_np, integrand_f_array, integrand_c_array = calcDeltaP(
        x_np, integrand_f, integrand_c, diffusive=diffusive, convective=convective)

    if showPlots:
        plt.figure()
        plt.plot(x_np, r_np)
        plt.xlabel("x [cm]")
        plt.ylabel("r [cm]")

        plt.figure()
        plt.plot(x_np, P_np/dyne_per_mmHg)
        plt.plot(x_np, P_f_np/dyne_per_mmHg)
        plt.plot(x_np, P_c_np/dyne_per_mmHg)
        plt.legend(["P", "P_f", "P_c"])
        plt.xlabel("x [cm]")
        plt.ylabel("P [mmHg]")

        plt.figure()
        plt.plot(x_np, integrand_f_array/dyne_per_mmHg)
        plt.plot(x_np, integrand_c_array/dyne_per_mmHg)
        plt.legend(["I_f (dP_dx_f)", "I_c (dP_dx_c)"])
        plt.xlabel("x [cm]")
        plt.ylabel("dP_dx [mmHg/cm]")

    return x_np, r_np, P_np, P_f_np, P_c_np, integrand_f_array, integrand_c_array

#x_np, r_np, P_np, P_f_np, P_c_np, integrand_f_array, integrand_c_array = get1D_data(101)

## Code and functions to define neural net, f-function and initialization of variables

In [None]:
def net_u(x):
    """Evaluate the network at ``x`` using the module-level ``weights`` and ``biases``."""
    return neural_net(x, weights, biases)
#===================================================

def net_f(x, I_f, I_c):
    """Return the residual f = du/dx - I_f - I_c of the momentum equation.

    x   : input tensor at which the network and its derivative are evaluated
    I_f : friction source term at the same points
    I_c : convective source term at the same points
    """
    u = net_u(x)

    # tf.gradients returns a list with one tensor per entry of xs; take the
    # single gradient tensor so the residual has the same shape as u instead
    # of relying on implicit list-to-tensor packing (extra leading axis).
    u_x = tf.gradients(u, x)[0]

    f = u_x - I_f - I_c

    return f
#===================================================

def initialize_NN(layers):
    """Create weight and bias variables for a fully connected network.

    layers : sequence of layer widths, e.g. ``[1, 5, 1]``

    Returns ``(weights, biases)``: two lists with one entry per pair of
    consecutive layers. Weights come from ``xavier_init``; biases are
    zero-initialised float32 variables of shape ``[1, width_out]``.
    """
    weights, biases = [], []
    # Walk over consecutive (input width, output width) pairs.
    for width_in, width_out in zip(layers[:-1], layers[1:]):
        # Alternative tried earlier: tf.random_normal with stddev=10.
        weights.append(xavier_init(size=[width_in, width_out]))
        zero_row = tf.zeros([1, width_out], dtype=tf.float32)
        biases.append(tf.Variable(zero_row, dtype=tf.float32))
    return weights, biases
#===================================================
  
def xavier_init(size):
    """Return a float32 weight variable with Xavier-scaled random values.

    size : ``[in_dim, out_dim]`` of the weight matrix

    Values are drawn from a truncated normal distribution with standard
    deviation ``sqrt(2/(in_dim + out_dim))``.
    """
    fan_in, fan_out = size[0], size[1]
    stddev = np.sqrt(2/(fan_in + fan_out))
    initial_values = tf.truncated_normal([fan_in, fan_out], stddev=stddev)
    return tf.Variable(initial_values, dtype=tf.float32)
#===================================================

def neural_net(X, weights, biases):
    """Forward pass of a fully connected tanh network with a linear output.

    X       : input tensor
    weights : list of weight matrices, one per layer
    biases  : list of bias rows, one per layer

    The input is first mapped from ``[lb, ub]`` (module-level bounds) to
    ``[-1, 1]``. Every layer but the last applies ``tanh``.
    """
    H = 2.0*(X - lb)/(ub - lb) - 1.0
    # Hidden layers: all weight matrices except the final one.
    for layer_idx, W in enumerate(weights[:-1]):
        pre_activation = tf.add(tf.matmul(H, W), biases[layer_idx])
        H = tf.tanh(pre_activation)
    # Output layer: affine map without activation.
    return tf.add(tf.matmul(H, weights[-1]), biases[-1])
#===================================================

def callback(loss):
    """Print the loss value handed over by the optimizer."""
    label = 'Loss:'
    print(label, loss)

## main program

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
#===================================================
# load data
#===================================================
N = 1001        # total number of x, P values on the grid
N_train = 10    # number of training points with known pressure
N_train_f = 20  # number of points where the residual f is evaluated
geometryType = "constant" # ["sine", "linearTapering", "constant"]
layers = [1, 5, 1]
diffusive = True
convective = True
showPlots = True
x, r, P, P_f, P_c, I_f, I_c = get1D_data(
    N, geometryType=geometryType, showPlots=showPlots,
    diffusive=diffusive, convective=convective)
dyneTommHg = 1./1333.22368

scaleInputs = True
if scaleInputs:
    # Scale pressures and source terms in place by the dyne -> mmHg factor.
    for _field in (P, P_f, P_c, I_f, I_c):
        _field *= dyneTommHg

#===================================================
# turn 1D array into 2D array of shape (N, 1)
#===================================================
x = x[:, None]
P = P[:, None]
I_f = I_f[:, None]
I_c = I_c[:, None]

In [None]:
#===================================================
# set parameters for neural network and training
#===================================================

print(np.shape(I_f), np.shape(I_c))
#===================================================
# sample random points for training
#===================================================
idx = np.random.choice(x.shape[0], N_train, replace=False)
idx_f = np.random.choice(x.shape[0], N_train_f, replace=False)
x_train = x[idx]
P_train = P[idx]
#x_train = x[0:1, 0:1]
#P_train = P[0:1, 0:1]

# Points where the residual f is evaluated; the source terms are
# interpolated from the grid onto these points.
x_train_f = x[idx_f]
I_f_train_f = np.interp(x_train_f.flatten(), x.flatten(), I_f.flatten())
I_f_train_f = I_f_train_f[:, np.newaxis]
I_c_train_f = np.interp(x_train_f.flatten(), x.flatten(), I_c.flatten())
I_c_train_f = I_c_train_f[:, np.newaxis]
print(np.shape((I_c_train_f)))
#===================================================
# set up placeholder for inputs and outputs
#===================================================
x_tf = tf.placeholder(tf.float32, shape=[None, x.shape[1]], name='x')
P_tf = tf.placeholder(tf.float32, shape=[None, P.shape[1]], name='P')

x_f_tf = tf.placeholder(tf.float32, shape=[None, x.shape[1]], name='x')
I_f_tf = tf.placeholder(tf.float32, shape=[None, I_f.shape[1]], name='integrand_f')
I_c_tf = tf.placeholder(tf.float32, shape=[None, I_c.shape[1]], name='integrand_c')
#===================================================
# initialize neural net and f functions
#===================================================
lb = x.min(0)
ub = x.max(0)
weights, biases = initialize_NN(layers)

P_pred = net_u(x_tf)
f_pred = net_f(x_f_tf, I_f_tf, I_c_tf)

# Build the initializer only after the variables exist; an initializer
# created before initialize_NN() would not cover the weights and biases.
init = tf.global_variables_initializer()

In [None]:
#===================================================
# plot y_pred before training
#===================================================
init = tf.global_variables_initializer()
with tf.Session() as sess:
    sess.run(init) # initialize variables
    P_pred_init = sess.run(P_pred, feed_dict = {x_tf:x_train})

# x_train is a random, unsorted sample of the grid; sort by x so the
# prediction is drawn as a single curve instead of a zigzag.
order = np.argsort(x_train.flatten())
plt.figure()
plt.plot(x.flatten(), P.flatten())
plt.plot(x_train.flatten(), P_train.flatten(), 'o')
plt.plot(x_train.flatten()[order], P_pred_init.flatten()[order])
plt.xlabel("x [cm]")
plt.ylabel("P [mmHg]")
plt.legend(["P(x)", "P_train", "P_pred (untrained)"])

In [None]:
#===================================================
# train the model using PINNS and GradientDescentOptimizer
#===================================================

# Loss = data misfit + residual of the momentum equation (scaled by 1/100).
MSE_u = tf.reduce_mean(tf.square((P_tf - P_pred)))
MSE_f = tf.reduce_mean(tf.square(f_pred))/100
loss = MSE_u + MSE_f
learning_rate_value = 0.01
learning_rate = tf.placeholder(tf.float32, shape=[])
learning_rate_value_late = 0.01#25
epochs = 10000
optimiser = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(loss)

print_every_N_batch = 1000
# Everything fed to the graph except the learning rate is constant.
train_feed = {x_tf: x_train, P_tf: P_train,
              x_f_tf: x_train_f, I_f_tf: I_f_train_f,
              I_c_tf: I_c_train_f}
train_fetches = [optimiser, loss, MSE_u, MSE_f]
with tf.Session() as sess:
    sess.run(init) # initialize variables
    for epoch in range(epochs):
        # Switch to the "late" learning rate after the first 2000 epochs.
        learning_rate_value_epoch = (learning_rate_value if epoch < 2000
                                     else learning_rate_value_late)
        epoch_feed = dict(train_feed)
        epoch_feed[learning_rate] = learning_rate_value_epoch
        _, c, MSE_u_value, MSE_f_value = sess.run(train_fetches, feed_dict=epoch_feed)

        avg_cost = 0 + c
        if epoch % print_every_N_batch == 0:
            print("Epoch:", (epoch + 1), "cost =", "{:.6f}".format(avg_cost), "MSE_u =", "{:.6f}".format(MSE_u_value), "MSE_f =", "{:.6f}".format(MSE_f_value))

    # Evaluate the trained network on the full grid and on the residual points.
    P_result = sess.run(P_pred, feed_dict = {x_tf:x})
    P_result_f = sess.run(P_pred, feed_dict = {x_tf: x_train_f})
    plt.figure()
    plt.plot(x.flatten(), P.flatten())
    plt.plot(x.flatten(), P_result.flatten(), '--')
    plt.plot(x_train.flatten(), P_train.flatten(), 'o')
    plt.plot(x_train_f.flatten(), P_result_f.flatten(), 'o')
    plt.xlabel("x [cm]")
    plt.ylabel("P [mmHg]")
    plt.legend(["P(x)", "P_PINN", "P_train", "P_train_f"])
    

In [None]:
#===================================================
# train the model using regular NN and GradientDescentOptimizer
#===================================================

# A regular NN is trained on the data misfit only (no residual term).
loss_NN = MSE_u
learning_rate_value = 0.01
learning_rate = tf.placeholder(tf.float32, shape=[])
learning_rate_value_late = 0.01#25
epochs = 10000
# Minimize loss_NN here; minimizing `loss` would train a PINN again.
optimiser = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(loss_NN)

print_every_N_batch = 1000
with tf.Session() as sess:
    sess.run(init) # initialize variables
    for epoch in range(epochs):
        avg_cost = 0
        if epoch < 2000:
            learning_rate_value_epoch = learning_rate_value
        else:
            learning_rate_value_epoch = learning_rate_value_late
        # MSE_f is fetched for monitoring only; it is not part of loss_NN.
        _, c, MSE_u_value, MSE_f_value = sess.run([optimiser, loss_NN, MSE_u, MSE_f], 
                                 feed_dict={x_tf: x_train, P_tf: P_train, 
                                            x_f_tf: x_train_f, I_f_tf: I_f_train_f,
                                            I_c_tf: I_c_train_f, learning_rate: learning_rate_value_epoch})
        
        
        avg_cost += c
        if epoch % print_every_N_batch == 0:
            print("Epoch:", (epoch + 1), "cost =", "{:.6f}".format(avg_cost), "MSE_u =", "{:.6f}".format(MSE_u_value), "MSE_f =", "{:.6f}".format(MSE_f_value))

    P_result = sess.run(P_pred, feed_dict = {x_tf:x})
    P_result_f = sess.run(P_pred, feed_dict = {x_tf: x_train_f})
    plt.figure()
    plt.plot(x.flatten(), P.flatten())
    plt.plot(x.flatten(), P_result.flatten(), '--')
    plt.plot(x_train.flatten(), P_train.flatten(), 'o')
    plt.plot(x_train_f.flatten(), P_result_f.flatten(), 'o')
    plt.xlabel("x [cm]")
    plt.ylabel("P [mmHg]")
    plt.legend(["P(x)", "P_NN", "P_train", "P_train_f"])

## Alternative optimizer PINN

In [None]:
# L-BFGS-B settings passed through to SciPy.
lbfgs_options = {'maxiter': 500,
                 'maxfun': 50000,
                 'maxcor': 50,
                 'maxls': 50,
                 'ftol' : 1.0 * np.finfo(float).eps}
optimizer = tf.contrib.opt.ScipyOptimizerInterface(loss,
                                                   method = 'L-BFGS-B',
                                                   options = lbfgs_options)
# Data points plus residual points and their source terms (PINN loss).
tf_dict = {x_tf: x_train, P_tf: P_train,
           x_f_tf: x_train_f, I_f_tf: I_f_train_f,
           I_c_tf: I_c_train_f}
with tf.Session() as sess:
    sess.run(init)
    optimizer.minimize(sess,
                       feed_dict = tf_dict,
                       fetches = [loss],
                       loss_callback = callback)

    # Predictions at the residual points and on the full grid.
    P_result = sess.run(P_pred, feed_dict = {x_tf: x_train_f})
    P_result_all = sess.run(P_pred, feed_dict = {x_tf: x})
    plt.figure()
    plt.plot(x.flatten(), P.flatten())
    plt.plot(x.flatten(), P_result_all.flatten(), '--')
    plt.plot(x_train.flatten(), P_train.flatten(), 'o')
    plt.plot(x_train_f.flatten(), P_result.flatten(), 'o')
    

## Alternative optimizer regular NN

In [None]:
# Data-only loss: mean squared pressure misfit.
loss_NN = tf.reduce_mean(tf.square((P_tf - P_pred)))
# L-BFGS-B settings passed through to SciPy.
lbfgs_options_NN = {'maxiter': 500,
                    'maxfun': 50000,
                    'maxcor': 50,
                    'maxls': 50,
                    'ftol' : 1.0 * np.finfo(float).eps}
optimizer = tf.contrib.opt.ScipyOptimizerInterface(loss_NN,
                                                   method = 'L-BFGS-B',
                                                   options = lbfgs_options_NN)

# The regular NN is trained on the grid points selected by idx_f.
x_train_NN = x[idx_f]
P_train_NN = P[idx_f]
tf_dict = {x_tf: x_train_NN, P_tf: P_train_NN}
with tf.Session() as sess:
    sess.run(init)
    optimizer.minimize(sess,
                       feed_dict = tf_dict,
                       fetches = [loss_NN],
                       loss_callback = callback)

    # Predictions at the training points and on the full grid.
    P_result = sess.run(P_pred, feed_dict = {x_tf: x_train_NN})
    P_result_all = sess.run(P_pred, feed_dict = {x_tf: x})
    plt.figure()
    plt.plot(x.flatten(), P.flatten())
    plt.plot(x.flatten(), P_result_all.flatten(), '--')
    plt.plot(x_train_NN.flatten(), P_train_NN.flatten(), 'o')
    plt.plot(x_train_NN.flatten(), P_result.flatten(), 'o')