Demo code for PINN <br>
Author: yuanye_zhou@buaa.edu.cn

## 1. 基础定义

In [None]:
# 导入库

import paddle
import numpy as np
import matplotlib.pyplot as plt

# 定义网络

class MyNet(paddle.nn.Layer):
    def __init__(self,numIN,numOUT,numHidden,numLayer):
        super().__init__()
        self.IN = paddle.nn.Linear(numIN,numHidden) # 输入层
        self.FC_layers = [] # 隐藏层
        for i in range(numLayer - 1):
            fc_layer = paddle.nn.Linear(numHidden,numHidden)
            self.FC_layers.append(fc_layer)
        self.OUT = paddle.nn.Linear(numHidden,numOUT)  # 输出层
        self.ACT = paddle.nn.functional.tanh
    

    def forward(self, inputs):
        H = inputs
        H = self.IN(H)
        H = self.ACT(H)
        for fc_layer in self.FC_layers:
            H = fc_layer(H)
            H = self.ACT(H)
        outputs = self.OUT(H)      
        return outputs

## 2. 具体案例 
（代码执行顺序：0. 重启内核; 1.基础定义; 2.任选一个小节案例代码运行）

### 2.1 1D poisson 正问题

In [None]:
# 定义 DataLoss 和 Equation Loss

class DataLoss(paddle.nn.Layer):

    def __init__(self,nn_fun):
        super().__init__()
        self.fun = nn_fun 

    def forward(self, X, Y_true):
        Y_pred = self.fun(X)
        loss = paddle.sum(paddle.square(Y_pred - Y_true))
        return loss

class EqLoss(paddle.nn.Layer):

    def __init__(self,nn_fun):
        super().__init__()
        self.fun = nn_fun

    def forward(self, X):
        x = X[:,0].reshape([X.shape[0],1])
        X = x
        u = self.fun(X)
        u_x = paddle.grad(u,x,create_graph=True,retain_graph=True)[0]
        u_xx = paddle.grad(u_x,x,create_graph=True,retain_graph=True)[0]
        eq = np.pi**2*paddle.sin(np.pi*x) + u_xx # poisson equation, u'' = pi^2 * sin(pi*x)
        loss = paddle.sum(paddle.square(eq))
        return loss

In [None]:
# 生成训练数据点
X = np.linspace(-1,1,100)
X = paddle.to_tensor(X,stop_gradient=False,dtype='float32')
X = X.reshape([100,1])
X_bc = np.linspace(-1,1,2)
X_bc = paddle.to_tensor(X_bc,stop_gradient=True,dtype='float32')
X_bc = X_bc.reshape([2,1])
Y_bc = paddle.zeros([2,1],dtype='float32')

# 调用网络
numIN = 1
numOUT = 1
numHidden = 8
numLayer = 2
nn_fun = MyNet(numIN,numOUT,numHidden,numLayer)

# 调用Loss
Loss1 = DataLoss(nn_fun)
Loss2 = EqLoss(nn_fun)

# 调用优化器
lr = 0.0001
train_parameter = nn_fun.parameters()
optimizer = paddle.optimizer.Adam(learning_rate=lr,parameters=train_parameter)

In [None]:
# 训练网络
epoch = 1


while (epoch <= 20000):

    lossData = Loss1(X_bc,Y_bc)
    lossData.backward(retain_graph=True)

    lossEQ = Loss2(X)
    lossEQ.backward(retain_graph=True)

    optimizer.step()
    optimizer.clear_grad()

    if epoch % 100 == 0:
        print('epoch:',epoch,'loss_data:',float(lossData),'loss_eq:',float(lossEQ))
    epoch += 1

In [None]:
# 输出结果

Y_pred = nn_fun(X)
Y_true = paddle.sin(np.pi*X)

In [None]:
# 预测值与真实值对比

plt.plot(Y_pred)
plt.plot(Y_true,marker='.')

### 2.2 unsteady Burgers equation 正问题

In [None]:
# 定义 DataLoss 和 Equation Loss

class DataLoss(paddle.nn.Layer):

    def __init__(self,nn_fun):
        super().__init__()
        self.fun = nn_fun 

    def forward(self, X, Y_true):
        Y_pred = self.fun(X)
        loss = paddle.sum(paddle.square(Y_pred - Y_true))
        return loss

class EqLoss(paddle.nn.Layer):

    def __init__(self,nn_fun,miu):
        super().__init__()
        self.fun = nn_fun
        self.miu = miu

    def forward(self, X):
        t = X[:,0].reshape([X.shape[0],1])
        x = X[:,1].reshape([X.shape[0],1])
        X = paddle.concat([t,x],axis=-1)
        u = self.fun(X)
        u_t = paddle.grad(u,t,create_graph=True,retain_graph=True)[0]
        u_x = paddle.grad(u,x,create_graph=True,retain_graph=True)[0]
        u_xx = paddle.grad(u_x,x,create_graph=True,retain_graph=True)[0]
        eq = u_t + u*u_x - self.miu*u_xx # burgers equation, dudt + u*dudx = miu*ddudx
        loss = paddle.sum(paddle.square(eq))
        return loss

In [None]:
# 生成训练数据点

def genData(t_start,t_end,nt,x_start,x_end,nx):
    t = np.linspace(t_start,t_end,nt)
    x = np.linspace(x_start,x_end,nx)
    T,X = np.meshgrid(t,x)
    T = T.reshape([nx*nt,1])
    X = X.reshape([nx*nt,1])
    T = paddle.to_tensor(T,dtype='float32')
    X = paddle.to_tensor(X,dtype='float32')
    TX = paddle.concat([T,X],axis=-1)
    return TX

TX_domain = genData(0,1,100,-1,1,100)
TX_bc = genData(0,1,100,-1,1,2)
Y_bc = paddle.zeros([2*100,1],dtype='float32')
TX_ic = genData(0,0,1,-1,1,100)
Y_ic = -paddle.sin(np.pi*TX_ic[:,1:2])

TX_domain.stop_gradient = False

# 调用网络
numIN = 2
numOUT = 1
numHidden = 48
numLayer = 4
nn_fun = MyNet(numIN,numOUT,numHidden,numLayer)
try:
    load_net_params = paddle.load('/home/aistudio/' + 'burgers_params')
    nn_fun.set_state_dict(load_net_params)
except:
    print('no saved params')

# 调用Loss
miu = 0.01/np.pi
Loss1 = DataLoss(nn_fun)
Loss2 = EqLoss(nn_fun,miu)

# 调用优化器
lr = 0.001
train_parameter = nn_fun.parameters()
optimizer = paddle.optimizer.Adam(learning_rate=lr,parameters=train_parameter)

In [None]:
# 训练网络
epoch = 1

while (epoch <= 100000):

    lossData_bc = 100*Loss1(TX_bc,Y_bc)
    lossData_bc.backward()

    lossData_ic = 100*Loss1(TX_ic,Y_ic)
    lossData_ic.backward()

    lossEQ = Loss2(TX_domain)
    lossEQ.backward()

    optimizer.step()
    optimizer.clear_grad()

    if epoch % 100 == 0:
        print('epoch:',epoch,'loss_data_bc:',float(lossData_bc),'loss_data_ic:',float(lossData_ic),'loss_eq:',float(lossEQ))
        paddle.save(nn_fun.state_dict(),'/home/aistudio/' + 'burgers_params') 
    epoch += 1

In [None]:
# 输出结果可视化

Y_pred = nn_fun(TX_domain)
plt.contourf(Y_pred.reshape([100,100]), 100, cmap='RdBu_r', zorder=1)
plt.colorbar()

In [None]:
TX_test = genData(0,0,1,-1,1,100)
plt.plot(TX_test[:,1],nn_fun(TX_test))
TX_test = genData(0.1,0.1,1,-1,1,100)
plt.plot(TX_test[:,1],nn_fun(TX_test))
TX_test = genData(1,1,1,-1,1,100)
plt.plot(TX_test[:,1],nn_fun(TX_test))

### 2.2.1 自适应残差采样

In [None]:
# Residual Based Sampling

class Eq(paddle.nn.Layer):

    def __init__(self,nn_fun,miu):
        super().__init__()
        self.fun = nn_fun
        self.miu = miu

    def forward(self, X):
        t = X[:,0].reshape([X.shape[0],1])
        x = X[:,1].reshape([X.shape[0],1])
        X = paddle.concat([t,x],axis=-1)
        u = self.fun(X)
        u_t = paddle.grad(u,t,create_graph=True,retain_graph=True)[0]
        u_x = paddle.grad(u,x,create_graph=True,retain_graph=True)[0]
        u_xx = paddle.grad(u_x,x,create_graph=True,retain_graph=True)[0]
        eq = u_t + u*u_x - self.miu*u_xx # burgers equation, dudt + u*dudx = miu*ddudx
        return eq


calEQ = Eq(nn_fun,miu) 

# 初始化 sample points

TX_sample = genData(0,1,10,-1,1,10)
TX_sample.stop_gradient = False

epoch = 1
while (epoch <= 1000):

    lossData_bc = 100*Loss1(TX_bc,Y_bc)
    lossData_bc.backward()

    lossData_ic = 100*Loss1(TX_ic,Y_ic)
    lossData_ic.backward()

    lossEQ = Loss2(TX_sample)
    lossEQ.backward()

    optimizer.step()
    optimizer.clear_grad()

    if epoch % 100 == 0:
        print('epoch:',epoch,'loss_data_bc:',float(lossData_bc),'loss_data_ic:',float(lossData_ic),'loss_eq:',float(lossEQ))
    epoch += 1



In [None]:
# 自适应采样

epoch = 1
while (epoch <= 20000):

    lossData_bc = 100*Loss1(TX_bc,Y_bc)
    lossData_bc.backward()

    lossData_ic = 100*Loss1(TX_ic,Y_ic)
    lossData_ic.backward()

    lossEQ = Loss2(TX_sample)
    lossEQ.backward(retain_graph=True)

    optimizer.step()
    optimizer.clear_grad()

    if epoch % 100 == 0:
        print('epoch:',epoch,'loss_data_bc:',float(lossData_bc),'loss_data_ic:',float(lossData_ic),'loss_eq:',float(lossEQ))
    epoch += 1

    # update sample points every 1000 epoch
    if epoch % 1000 == 0:
        R = calEQ(TX_domain)
        # find large R points
        TX_sample_add = paddle.where(paddle.abs(R)>1,TX_domain,paddle.zeros([TX_domain.shape[0],TX_domain.shape[1]]))
        # remove zero points
        TX_sample_add = paddle.unique(TX_sample_add,axis=0)
        TX_sample = paddle.concat([TX_sample,TX_sample_add],axis=0)      
        print('number of sample points:',TX_sample.shape[0])  
        (R-R).backward()
        optimizer.step()
        optimizer.clear_grad()

In [None]:
# 输出结果可视化

Y_pred = nn_fun(TX_domain)
plt.contourf(Y_pred.reshape([100,100]), 100, cmap='RdBu_r', zorder=1)
plt.colorbar()

In [None]:
TX_test = genData(0,0,1,-1,1,100)
plt.plot(TX_test[:,1],nn_fun(TX_test))
TX_test = genData(0.1,0.1,1,-1,1,100)
plt.plot(TX_test[:,1],nn_fun(TX_test))
TX_test = genData(1,1,1,-1,1,100)
plt.plot(TX_test[:,1],nn_fun(TX_test))

### 2.3 1D 反问题

In [None]:
# # 定义 DataLoss 和 Equation Loss

class DataLoss(paddle.nn.Layer):

    def __init__(self,nn_fun):
        super().__init__()
        self.fun = nn_fun 

    def forward(self, X, Y_true):
        Y_pred = self.fun(X)
        loss = paddle.sum(paddle.square(Y_pred - Y_true))
        return loss

class EqLoss(paddle.nn.Layer):

    def __init__(self,nn_fun,gamma):
        super().__init__()
        self.fun = nn_fun
        self.gamma = gamma

    def forward(self, X):
        x = X[:,0].reshape([X.shape[0],1])
        X = x
        u = self.fun(X)
        u_x = paddle.grad(u,x,create_graph=True,retain_graph=True)[0]
        eq = u_x - self.gamma*(1+np.pi/2*paddle.cos(np.pi/2*x)) # 1D equation, u' = gamma * (1 + pi/2*cos(pi/2*x))
        loss = paddle.sum(paddle.square(eq))
        return loss

In [None]:
# 生成训练数据点
X = np.linspace(0,10,100)
X = paddle.to_tensor(X,stop_gradient=False,dtype='float32')
X = X.reshape([100,1])
X_sup = np.linspace(0,5,10)
X_sup = paddle.to_tensor(X_sup,stop_gradient=True,dtype='float32')
X_sup = X_sup.reshape([10,1])
Y_sup = X_sup + paddle.sin(np.pi/2*X_sup)

# 调用网络
numIN = 1
numOUT = 1
numHidden = 48
numLayer = 3
nn_fun = MyNet(numIN,numOUT,numHidden,numLayer)

# 反问题未知参数
gamma = paddle.rand([1],dtype='float32')
gamma.stop_gradient = False

# 调用Loss
Loss1 = DataLoss(nn_fun)
Loss2 = EqLoss(nn_fun,gamma)

# 调用优化器
lr = 0.001
train_parameter = nn_fun.parameters() + [gamma]
optimizer = paddle.optimizer.Adam(learning_rate=lr,parameters=train_parameter)

In [None]:
# 训练网络
epoch = 1

while (epoch <= 50000):

    lossData = 100*Loss1(X_sup,Y_sup)
    lossData.backward(retain_graph=True)

    lossEQ = Loss2(X)
    lossEQ.backward(retain_graph=True)

    optimizer.step()
    optimizer.clear_grad()

    if epoch % 100 == 0:
        print('epoch:',epoch,'loss_data:',float(lossData),'loss_eq:',float(lossEQ),'gamma:',float(gamma))
    epoch += 1

In [None]:
# 输出结果

Y_pred = nn_fun(X)
Y_true = X + paddle.sin(np.pi/2*X)

In [None]:
# 预测值与真实值对比

plt.plot(Y_pred)
plt.plot(Y_true,marker='.')