In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import time
import scipy.io
import matplotlib.gridspec as gridspec
from mpl_toolkits.axes_grid1 import make_axes_locatable
from scipy.interpolate import griddata

import torch
from torch import nn
import torch.optim as optim
from torch.optim import lr_scheduler

import argparse
import random
import os
import math

#os.environ["CUDA_VISIBLE_DEVICES"] = "1"

In [3]:
# CUDA support
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [4]:
def seed_torch(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed) # 为了禁止hash随机化，使得实验可复现
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    random.seed(seed)

def grad(outputs, inputs):
    """ compute the derivative of outputs associated with inputs

    Params
    ======
    outputs: (N, 1) tensor
    inputs: (N, D) tensor
    """
    return torch.autograd.grad(outputs, inputs,
                               grad_outputs=torch.ones_like(outputs),
                               create_graph=True)

def activation(name):
    if name in ['tanh', 'Tanh']:
        return nn.Tanh()
    elif name in ['relu', 'ReLU']:
        return nn.ReLU(inplace=True)
    elif name in ['leaky_relu', 'LeakyReLU']:
        return nn.LeakyReLU(inplace=True)
    elif name in ['sigmoid', 'Sigmoid']:
        return nn.Sigmoid()
    elif name in ['softplus', 'Softplus']:
        return nn.Softplus()
    else:
        raise ValueError(f'unknown activation function: {name}')

In [5]:
seed_torch(24)

In [6]:
class MLP(nn.Module):
    """Deep Neural Network"""

    def __init__(self, L, M, dim_hidden, hidden_layers, dim_out,
                 act_name='tanh', init_name='xavier_normal'):
        super().__init__()
        
        dim_in = M * 2 + 2
        
        model = nn.Sequential()
        
        model.add_module('fc0', nn.Linear(dim_in, dim_hidden, bias=True))
        model.add_module('act0', activation(act_name))
        for i in range(1, hidden_layers):
            model.add_module(f'fc{i}', nn.Linear(dim_hidden, dim_hidden, bias=True))
            model.add_module(f'act{i}', activation(act_name))
        model.add_module(f'fc{hidden_layers}', nn.Linear(dim_hidden, dim_out, bias=True))
            
        self.model = model
        
        self.L = L
        self.M = M
        
        if init_name is not None:
            self.init_weight(init_name)

            
        self.k = nn.Parameter(torch.arange(1, self.M+1), requires_grad=False)
                    
    def init_weight(self, name):
        if name == 'xavier_normal':
            nn_init = nn.init.xavier_normal_
        elif name == 'xavier_uniform':
            nn_init = nn.init.xavier_uniform_
        elif name == 'kaiming_normal':
            nn_init = nn.init.kaiming_normal_
        elif name == 'kaiming_uniform':
            nn_init = nn.init.kaiming_uniform_
        else:
            raise ValueError(f'unknown initialization function: {name}')

        for param in self.parameters():
            if len(param.shape) > 1:
                nn_init(param)
                
#         for layer, param in enumerate(self.parameters()):
#             if layer % 2 == 1:
#                 nn.init.constant_(param, 0.0)
#             else:
#                 nn_init(param)
                
    def input_encoding(self, t, x):
        w = 2.0 * math.pi / self.L
        out = torch.cat([t, torch.ones_like(t), 
                            torch.cos(self.k * w * x), torch.sin(self.k * w * x)], dim = 1) 
        
        return out    
            
    def forward(self, H):
        t = H[:, 0:1]
        x = H[:, 1:2]
        
        H = self.input_encoding(t, x)
        H = self.model(H)
        
        return H
    
    def forward_test(self, x):
        print(f"{'input':<20}{str(x.shape):<40}")
        for name, module in self.model._modules.items():
            x = module(x)
            print(f"{name:<20}{str(x.shape):<40}")
        return x

    def model_size(self):
        n_params = 0
        for param in self.parameters():
            n_params += param.numel()
        return n_params
    
    def print(self):
        print(self.bias)

In [7]:
model = MLP(L=2.0, M=10, dim_hidden=128, hidden_layers=4, dim_out=1)

KDV方程：
$$
\left\{\begin{matrix}
&u_{t}+ \lambda_1 u u_{x} + \lambda_2 u_{xxx}=0  ,(t,x)\in (0,1)\times (-1,1),  \\
&u(0,x)= cos(\pi x),  \\
&u(t,-1)=u(t,1),t\in [0,1],  \\
&u_x(t,-1)=u_x(t,1),x\in [0,1].
\end{matrix}\right.
$$
$\lambda_1 = 1.000$, $\lambda_2 = 0.0025$.

## Options

In [17]:
import argparse
class Options_KDV(object):
    def __init__(self):
        parser = argparse.ArgumentParser()
        parser.add_argument('--no_cuda', action='store_true', default=False, help='disable CUDA or not')
        parser.add_argument('--dim_hidden', type=int, default=12, help='neurons in hidden layers')     # 10 9
        parser.add_argument('--hidden_layers', type=int, default=9, help='number of hidden layers')    # 4  20
        parser.add_argument('--lam', type=float, default=1, help='weight in loss function')
        parser.add_argument('--lr', type=float, default=0.001, help='initial learning rate')
        parser.add_argument('--epochs_Adam', type=int, default=300000, help='epochs for Adam optimizer')
        parser.add_argument('--epochs_LBFGS', type=int, default=2500, help='epochs for LBFGS optimizer')
        parser.add_argument('--newton_iter', type=int, default=100, help='newton_iter for LBFGS optimizer')
        parser.add_argument('--step_size', type=int, default=50000, help='step size in lr_scheduler for Adam optimizer')
        parser.add_argument('--gamma', type=float, default=0.9, help='gamma in lr_scheduler for Adam optimizer')
        parser.add_argument('--tol', type=float, default=100, help='the annealing scheme')
        parser.add_argument('--resume', type=bool, default=False, help='resume or not')
        parser.add_argument('--sample_method', type=str, default='uniform', help='sample method')

        self.parser = parser

    def parse(self):
        arg = self.parser.parse_args(args=[])
        arg.load_model = False
        arg.cuda = not arg.no_cuda and torch.cuda.is_available()
        # arg.cuda = False
        arg.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(arg.cuda)
        print(arg.device)
        return arg

args = Options_KDV().parse()
print(args.hidden_layers)

def save_model(state, is_best=None, save_dir=None):
    last_model = os.path.join(save_dir, 'last_model.pth.tar')
    torch.save(state, last_model)
    if is_best:
        best_model = os.path.join(save_dir, 'best_model.pth.tar')
        shutil.copyfile(last_model, best_model)

True
cuda
9


In [18]:
model.to(device)
args.model=model

## 数据集生成

In [19]:
class Trainset_KDV():
    '''
    训练点:时间剖分 100 段， 空间上分成25段，每段上取4个高斯点
    '''
    def __init__(self, *args):
        self.args = args
        self.shape = (self.args[0], self.args[1])
        
    def __call__(self):
        return self.data()
    
    def data(self):
        # 为了生成高斯积分点，n_x一定要是4的倍数
        gp = np.array([0.8611363116,-0.8611363116,0.3399810436,-0.3399810436])
        gc = np.array([0.3478548451,0.3478548451,0.6521451549,0.6521451549])
        n_t = self.args[0]
        n_x = self.args[1]
        n_ics = self.args[2]
        
        # 生成初值训练点
        t = np.linspace(0, 1, n_t)
        x = np.linspace(-1, 1, n_x)
        x, t = np.meshgrid(x, t)
        tx = np.hstack((t.reshape(-1,1), x.reshape(-1,1)))

        t_ics = np.zeros(n_ics)
        x_ics = np.linspace(-1, 1, n_ics)
        tx_ics = np.hstack([t_ics.reshape(-1,1),x_ics.reshape(-1,1)])
        
        u_ics = np.cos(math.pi*x_ics)
        u_ics = u_ics.reshape(-1,1)
        M = np.triu(np.ones([n_t, n_t]),k=1).T
        
        # 生成tgx(高斯训练点)
        num_cell = int(n_x/4 + 1)
        l = np.linspace(-1, 1, num_cell)[:, None]
        l = np.hstack([l[:-1], l[1:]])
        c = (l[:, 1] - l[:, 0])/2
        c = c[:, None]
        gp = gp[None, :]
        d = ((l[:, 1] + l[:, 0])/2)
        d = d[:, None]
        n_p = c * gp + d  # n_p是100个高斯积分点
        n_p = n_p.reshape(n_x, 1)
        gcl = c * gc     # gcl是这100个高斯积分点对应的权重  
        gcl = gcl.reshape(n_x, 1)
        t = np.linspace(0, 1, n_t)[:, None]
        x, t = np.meshgrid(n_p, t) #   mesh size (128, 100)
        print(x.shape)
        txg = np.hstack([t.reshape(-1, 1), x.reshape(-1, 1)])
        # 计算一下初始能量
        E2 =  np.cos(math.pi*n_p) ** 2 * gcl
        E2 = np.sum(E2)
        print(E2)
        
        tx = torch.from_numpy(tx).float().to(device)
        txg = torch.from_numpy(txg).float().to(device)
        
        tx_ics = torch.from_numpy(tx_ics).float().to(device)
        u_ics = torch.from_numpy(u_ics).float().to(device)
        M = torch.from_numpy(M).float().to(device)
        
        return tx, tx_ics, u_ics, M, gcl, txg, E2

In [20]:
# 构造一个预测网格
# load the data
data = scipy.io.loadmat('../data/KdV.mat')
usol = data['uu']

# Grid
t_star = data['tt'][0]
x_star = data['x'][0]

In [21]:
print(t_star.shape)
print(x_star.shape)
X_pred = np.hstack([np.ones((len(x_star),1)) * t_star[0 + 1], x_star.reshape(-1,1)])
pred = np.zeros((512, 201))

(201,)
(512,)


In [22]:
# 把初值填在第一列
pred[:, 0] = np.cos(math.pi * x_star)

In [23]:
nt = t_star.shape[0]
nx = 100
n_ics = 100

In [24]:
# tgx是高斯点的数据集， shape :(12800, 2) 第一列代表时间，第二列代表空间 ，前100个点表示t = 0 时刻上的采样点，第101个到200表示t = 1/nt 时刻的采样点， 依次类推
trainset = Trainset_KDV(nt, nx, n_ics)
args.trainset = trainset
tx, tx_ics, u_ics, M, gcl, txg, E2 = trainset()  # E2 是初始时刻的平方积分

(201, 100)
0.9999999999999998


## 训练

In [25]:
class Trainer_Wave(object):
    def __init__(self, args):
        self.model = args.model
        self.lr = args.lr
        self.gamma = args.gamma
        self.trainset = args.trainset
        self.step_size = args.step_size
        self.model_name = self.model.__class__.__name__     
        self.optimizer_Adam = optim.Adam(self.model.parameters(), lr=self.lr, betas=(0.9, 0.999))
        self.scheduler = lr_scheduler.ExponentialLR(self.optimizer_Adam, gamma=self.gamma)
        self.epochs_Adam = args.epochs_Adam
        self.tol = args.tol
        
        # data
        self.tx, self.tx_ics, self.u_ics, self.M,  self.gcl, self.txg, self.E2 = self.trainset()
        self.gcl = torch.from_numpy(self.gcl).float().to(device)
        self.E2 = torch.tensor(self.E2).float().to(device)
        self.nx = nx
        self.nt = nt
        
        # Logger
        self.loss_log = []
        self.loss_ics_log = []
        self.loss_res_log = []
        self.loss_energy_log = []
        self.epoch_log = []
        
        self.x_pred = x_star
        self.t_pred = t_star
        self.pred = pred  # (512, 201)

    def net_r(self, tx):
        tx.requires_grad_(True).to(device)
        u = self.model(tx)
        grad_u = grad(u, tx)[0]
        u_t = grad_u[:,[0]]
        u_x = grad_u[:,[1]]
        u_xx = grad(u_x, tx)[0][:, [1]]
        u_xxx = grad(u_xx, tx)[0][:, [1]]

        residual = u_t  + u * u_x + 0.0025 * u_xxx

        return residual

    def net_u(self, tx):
        u = self.model(tx)  
        return u
    
    def loss_ics(self, X, u_ics):
        u_pred = self.net_u(X)
        loss_ics = torch.mean((u_pred - u_ics)**2)
        return loss_ics
    
    def loss_res(self, X):
        r_pred = self.net_r(X)
        loss_r = torch.mean(r_pred**2)
        return loss_r
    
    def energy_loss(self, X):
        q_pred = self.net_u(X)
        num_t = int(len(X)/self.nx)
        q_pred = q_pred.reshape(num_t, 100)
        if self.gcl.shape != (1, 100):
            self.gcl = self.gcl.reshape(1, 100)
        q_pred = q_pred ** 2 * self.gcl
        q_pred = torch.sum(q_pred, axis=1)
        q_pred = (q_pred - self.E2)
        loss_q = torch.mean(q_pred ** 2)
        return loss_q
        
    def loss(self, X0, u_ics, X1):
        if X0[0, 0] == 0:
            L0 = 100 * self.loss_ics(X0, u_ics)
        else:
            L0 = self.loss_ics(X0, u_ics)
        loss = L0 + self.loss_res(X1) + 10 * self.energy_loss(X1)    
        return loss
    
    def predict(self, X):
        X = torch.tensor(X).float().to(device)
        pred_u = self.net_u(X)
        pred_u = pred_u.cpu().detach().numpy()
        return pred_u
    
    def train(self):
        start = time.time()
        for i in range(0, self.nt - 1):
            X_train = self.txg[i*100 : (i + 2)*100]
            X_init = X_train[:100]
            X_next = X_train[-100:]
            for epoch in range(self.epochs_Adam):
                self.optimizer_Adam.zero_grad()
                if i == 0:
                    loss_value = self.loss(self.tx_ics, self.u_ics, X_train)
                else:
                    loss_value = self.loss(X_init, u_ics_pred, X_train)
                if loss_value < 1e-6:
                    print(f'i :{i}' + f' Epoch # {epoch}/{self.epochs_Adam}' + f'loss:{loss_value:.2e}')
                    X_pred = np.hstack([np.ones((len(self.x_pred),1)) * self.t_pred[i + 1], self.x_pred.reshape(-1,1)])
                    U_pred = self.predict(X_pred)
                    self.pred[:, [i+1]] = U_pred
                    break
                else:
                    loss_value.backward()
                    self.optimizer_Adam.step()
                    if (epoch+1) % self.step_size == 0:
                        self.scheduler.step()            
                    if epoch % 1000 == 0:
                        if i == 0:
                            loss_ics_value = self.loss_ics(self.tx_ics, self.u_ics)
                        else:
                            loss_ics_value = self.loss_ics(X_init, u_ics_pred)
                        loss_res_value = self.loss_res(X_train)
                        loss_energy = self.energy_loss(X_train)

                        self.loss_log.append(loss_value.detach().cpu())
                        self.loss_ics_log.append(loss_ics_value.detach().cpu())
                        self.loss_res_log.append(loss_res_value.detach().cpu())
                        self.loss_energy_log.append(loss_energy.detach().cpu())
                        self.epoch_log.append(epoch)

                        end = time.time()
                        running_time = end - start
                        start = time.time()

                        print(f'Epoch #  {epoch}/{self.epochs_Adam}' + f'    time:{running_time:.2f}' + '\n' + \
                              f'loss:{loss_value:.2e}, loss_ics:{loss_ics_value:.2e}, loss_res:{loss_res_value:.2e}, loss_energy:{loss_energy:.2e}')
            u_ics_pred = self.predict(X_next)
            u_ics_pred = torch.tensor(u_ics_pred).float().to(device)
            #scheduler = lr_scheduler.ExponentialLR(self.optimizer_Adam, gamma=0.9)

In [26]:
trainer = Trainer_Wave(args)

(201, 100)
0.9999999999999998


In [None]:
trainer.train()

Epoch #  0/300000    time:0.58
loss:2.58e+02, loss_ics:5.80e-01, loss_res:3.92e+01, loss_energy:7.36e-01
Epoch #  1000/300000    time:27.46
loss:8.77e-03, loss_ics:7.54e-05, loss_res:1.22e-03, loss_energy:2.56e-07
Epoch #  2000/300000    time:30.74
loss:1.24e-02, loss_ics:4.08e-05, loss_res:8.73e-03, loss_energy:3.34e-06
Epoch #  3000/300000    time:33.37
loss:3.54e-03, loss_ics:2.86e-05, loss_res:8.42e-04, loss_energy:2.11e-07


In [None]:
file = f'{args.epochs_Adam}epoch,{args.step_size}step_size,{args.tol}tol,E2'
if os.path.exists(file)==False:
    os.mkdir(file)

In [None]:
plt.plot(trainer.epoch_log, trainer.loss_ics_log, label='loss_ic')
plt.plot(trainer.epoch_log, trainer.loss_res_log, label='loss_r')
plt.yscale('log')
plt.legend()
plt.savefig(f'{file}/loss.jpg')

In [None]:
# load the data
data = scipy.io.loadmat('data/KdV.mat')
usol = data['uu']

# Grid
t_star = data['tt'][0]
x_star = data['x'][0]

In [None]:
print(t_star.shape)
print(x_star.shape)
usol.shape

In [None]:
# load the data
data = scipy.io.loadmat('data/KdV.mat')
usol = data['uu']

# Grid
t_star = data['tt'][0]
x_star = data['x'][0]
TT, XX = np.meshgrid(t_star, x_star)

# Reference solution
plt.pcolor(TT, XX, usol, cmap='jet')
plt.colorbar()
plt.xlabel('$t$')
plt.ylabel('$x$')

In [None]:
# Get trained network parameters
TX = np.hstack((TT.reshape(-1,1), XX.reshape(-1,1)))
TX = torch.from_numpy(TX).double()

model = trainer.model.cpu().double()
u_pred = model(TX).detach().numpy()
u_pred = u_pred.reshape(TT.shape)

error = np.linalg.norm(u_pred - usol) / np.linalg.norm(usol) 
print('Relative l2 error: {:.3e}'.format(error))

In [None]:
fig = plt.figure(figsize=(18, 5))
plt.subplot(1, 3, 1)
plt.pcolor(TT, XX, usol, cmap='jet')
plt.colorbar()
plt.xlabel('$t$')
plt.ylabel('$x$')
plt.title(r'Exact $u(x)$')
plt.tight_layout()

plt.subplot(1, 3, 2)
plt.pcolor(TT, XX, u_pred, cmap='jet')
plt.colorbar()
plt.xlabel('$t$')
plt.ylabel('$x$')
plt.title(r'Predicted $u(x)$')
plt.tight_layout()

plt.subplot(1, 3, 3)
plt.pcolor(TT, XX, np.abs(usol - u_pred), cmap='jet')
plt.colorbar()
plt.xlabel('$t$')
plt.ylabel('$x$')
plt.title('Absolute error')
plt.tight_layout()
plt.savefig(f'{file}/predict.jpg')
plt.show()

In [None]:
fig = plt.figure(figsize=(13, 4))
plt.subplot(1, 3, 1)
plt.plot(x_star, usol[:,0], color='blue')
plt.plot(x_star, u_pred[:,0], '--', color='red')
plt.xlabel('$x$')
plt.ylabel('$u(t, x)$')
plt.title('$t = 0$')
plt.tight_layout()

plt.subplot(1, 3, 2)
plt.plot(x_star, usol[:,25], color='blue')
plt.plot(x_star, u_pred[:,25], '--', color='red')
plt.xlabel('$x$')
plt.ylabel('$u(t, x)$')
plt.title('$t = 0.5$')
plt.tight_layout()

plt.subplot(1, 3, 3)
plt.plot(x_star, usol[:,-1], color='blue')
plt.plot(x_star, u_pred[:,-1], '--', color='red')
plt.xlabel('$x$')
plt.ylabel('$u(t, x)$')
plt.title('$t = 1.0$')
plt.tight_layout()
plt.savefig(f'{file}/check_t.jpg')
plt.show()