In [33]:
import sys
sys.path.append('E:/Report 12 Some Try')

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from tqdm import *
import matplotlib.pyplot as plt
from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import gridspec
import random
import scipy.io

from Networks.DNN import PINN
from Networks.IA_DNN import Attention_PINN
from Networks.Fourier_DNN import Fourier_PINN
from Networks.AF_PINN import AF_PINN
from Networks.Efficient_KAN import KAN
from Networks.Cheby_KAN import ChebyKAN
from DataGenerator import DataGenerator

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

def seed_torch(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
seed_torch(42)

Using device: cuda:0


In [34]:
def gradients(u, x, order = 1):
    if order == 1:
        return torch.autograd.grad(u, x, grad_outputs=torch.ones_like(u),
                                        create_graph=True,
                                        only_inputs=True)[0].to(device)
    else:
        return gradients(gradients(u, x), x, order= order - 1)

In [35]:
geom = [-1.0, 1.0]
time = [0.0, 1.0]

In [36]:
def compute_pde(model,coords):
    coords = coords.clone().detach().requires_grad_(True)

    u = model(coords)[:,0]

    u_t = gradients(u, coords)[:,1]
    u_x = gradients(u, coords)[:,0]

    u_xx = gradients(u,coords,order=2)[:,0]

    equation = u_t + u * u_x - 0.01 / np.pi * u_xx

    return torch.mean(equation ** 2).to(device)

def compute_bc(model,coords):
    coords = coords.clone().detach().requires_grad_(True)

    u = model(coords)[:,0]

    return torch.mean(u ** 2).to(device)

def compute_ic(model,coords):
    coords = coords.clone().detach().requires_grad_(True)
    x = coords[:,0]
    u = model(coords)[:,0]

    equqtion = u + torch.sin(np.pi * x) 

    return torch.mean(equqtion ** 2).to(device)


In [37]:
loss_record = {}
error_record = {}
activation = torch.nn.functional.tanh

def generate_data(N: list):
    coords_pde = DataGenerator(geom, time,'pde', seed= 42).LHS_generator(N[0],points_type='domain')
    left_bc = DataGenerator(geom, time,'bc1', seed= 42).LHS_generator(N[1],points_type='left_bc')
    right_bc = DataGenerator(geom, time,'bc2', seed= 42).LHS_generator(N[1],points_type='right_bc')
    coords_bc = torch.cat((left_bc, right_bc),dim= 0)
    coords_ic = DataGenerator(geom, time,'ic', seed= 42).LHS_generator(N[2],points_type='initial')

    return coords_pde,coords_bc,coords_ic



# coords_pde = DataGenerator(geom, time,'pde', seed= 42).grid_generator(14400,points_type='domain')
# left_bc = DataGenerator(geom, time,'bc1', seed= 42).grid_generator(1500,points_type='left_bc')
# right_bc = DataGenerator(geom, time,'bc2', seed= 42).grid_generator(1500,points_type='right_bc')
# coords_bc = torch.cat((left_bc, right_bc),dim= 0)
# coords_ic = DataGenerator(geom, time,'ic', seed= 42).grid_generator(2500,points_type='initial')

coords_pde,coords_bc,coords_ic = generate_data([14400, 3000, 3000])

for mode in ['KAN']:  ##'KAN', 'ChebyKAN'

    print("Training {}:".format(mode))
    layer = [2] + [128] * 3 + [1]

    torch.cuda.empty_cache()
    if mode == 'PINN':
        model = PINN(layer,activation, non_negative=False).to(device)
        num_epochs = 10000

    elif mode == 'IA_PINN':
        model = Attention_PINN(layer,activation, non_negative=False).to(device)
        num_epochs = 10000

    elif mode == 'Fourier_PINN':
        model = Fourier_PINN(layer,activation, non_negative=False, use_rff=True, rff_num_features=128, rff_sigma=1.0).to(device)
        num_epochs = 10000

    elif mode == 'AF_PINN':
        model = AF_PINN(layer,activation, non_negative=False, use_rff=True, rff_num_features=128, rff_sigma=1.0).to(device)
        num_epochs = 10000

    elif mode == 'KAN':
        layer = [2] + [32] * 3 + [1]
        model = KAN(layer, modified_output=False,
                    grid_size=5,
                    base_activation=nn.Tanh).to(device)
        num_epochs = 2000
    elif mode == 'ChebyKAN':
        layer = [2] + [64] * 3 + [1]
        model = ChebyKAN(layer, degree=3, non_negative=False, base_activation=nn.Tanh,
                         use_layer_norm=True).to(device)
        num_epochs = 10000
    else:
        print("Man! What can I say! Wrong Mode!!!")
        break

    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs,
                                                     eta_min=1e-6)
    
    
    loss_record[mode] = []
    error_record[mode] = []
    pbar = tqdm(range(num_epochs))
    for epoch in pbar:
        
        model.train()
        optimizer.zero_grad()
        
        loss_pde = compute_pde(model,coords_pde)
        loss_bc = compute_bc(model,coords_bc)
        loss_ic = compute_ic(model,coords_ic)
        
        loss = loss_pde + loss_bc + loss_ic

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0, norm_type= 2)
        optimizer.step()
        scheduler.step()

        shuffled_tensors = [tensor[torch.randperm(tensor.size(0))] for tensor in [coords_pde, coords_bc, coords_ic]]

        # 解包到四个变量中
        coords_pde, coords_bc, coords_ic = shuffled_tensors

        if torch.isnan(loss):
            print(f'Man What Can I say? {mode} out!!!')
            break

        # Save the loss for plotting
        loss_record[mode].append(loss.item())
        current_lr = optimizer.param_groups[0]['lr']

        if epoch % 20 == 0:
            pbar.set_postfix({
                'Loss': '{0:.3e}'.format(loss.item()),
                'PDE': '{0:.3e}'.format(loss_pde.item()),
                'BC1': '{0:.3e}'.format(loss_bc.item()),
                'IC': '{0:.3e}'.format(loss_ic.item()),
                'lr':'{0:.3e}'.format(current_lr)
            }) 
        
        model.eval()        
        # 假设 model 已经定义并且可以接受输入进行预测
        t = np.linspace(0, 1, 100)
        x = np.linspace(-1, 1, 256)
        ms_x, ms_t = np.meshgrid(x, t)  # 创建网格
        # 将网格扁平化以适应模型输入
        X_star = np.hstack((ms_x.flatten()[:, None], ms_t.flatten()[:, None]))
        pt_x = torch.from_numpy(X_star[:, 0:1]).float().requires_grad_(True).to(device)
        pt_t = torch.from_numpy(X_star[:, 1:2]).float().requires_grad_(True).to(device)
        pt_u0 = model(torch.cat([pt_x, pt_t], 1))
        u = pt_u0.data.cpu().numpy()
        # 重塑 u 以匹配 ms_x 和 ms_t 的形状
        net = u.reshape(ms_t.shape)
        data = scipy.io.loadmat('burgers_shock.mat')
        exact_sol = np.real(data['usol']).T
        relative_R2 = np.linalg.norm(net - exact_sol , 2)/np.linalg.norm(exact_sol, 2)
        error_record[mode].append(relative_R2)



    model.eval()        
    # 假设 model 已经定义并且可以接受输入进行预测

    t = np.linspace(0, 1, 100)
    x = np.linspace(-1, 1, 256)

    ms_x, ms_t = np.meshgrid(x, t)  # 创建网格

    # 将网格扁平化以适应模型输入
    X_star = np.hstack((ms_x.flatten()[:, None], ms_t.flatten()[:, None]))

    pt_x = torch.from_numpy(X_star[:, 0:1]).float().requires_grad_(True).to(device)
    pt_t = torch.from_numpy(X_star[:, 1:2]).float().requires_grad_(True).to(device)
    pt_u0 = model(torch.cat([pt_x, pt_t], 1))
    u = pt_u0.data.cpu().numpy()

    # 重塑 u 以匹配 ms_x 和 ms_t 的形状
    net = u.reshape(ms_t.shape)

    fig = plt.figure(figsize=(24, 6))
    gs = gridspec.GridSpec(1,3,width_ratios=[1, 1, 1])  # 最右边的列较窄
    
    # PIC 1
    ax_net = fig.add_subplot(gs[0], projection='3d')
    surf_net = ax_net.plot_surface(ms_x, ms_t, net, cmap=cm.RdYlBu_r, edgecolor='none', linewidth=0.0003, antialiased=True)
    ax_net.set_title(f'Prediction: {mode}x{layer}')
    ax_net.set_xlabel('Space')
    ax_net.set_ylabel('Time')
    ax_net.set_zlabel('Value')
    ax_net.set_zlim([-1, 1])

    ## True solution
    data = scipy.io.loadmat('burgers_shock.mat')
    exact_sol = np.real(data['usol']).T
    # PIC 2
    ax_fdm = fig.add_subplot(gs[1], projection='3d')
    surf_fdm = ax_fdm.plot_surface(ms_x, ms_t, exact_sol, cmap=cm.RdYlBu_r, edgecolor='none', linewidth=0.0003, antialiased=True)
    ax_fdm.set_title(f'Exact Solution')
    ax_fdm.set_xlabel('Space')
    ax_fdm.set_ylabel('Time')
    ax_fdm.set_zlabel('Value')
    ax_fdm.set_zlim([-1, 1])

    # PIC 3
    abs_error = np.abs(net - exact_sol)
    relative_R2 = np.linalg.norm(net - exact_sol , 2)/np.linalg.norm(exact_sol, 2)

    ax_abs = fig.add_subplot(gs[2])
    cax_abs = ax_abs.imshow(abs_error, extent=(x.min(), x.max(), t.min(), t.max()), cmap=cm.RdYlBu_r, aspect='auto')
    ax_abs.set_title(f'Absulute Error')
    ax_abs.set_xlabel('Space')
    ax_abs.set_ylabel('Time')

    fig.colorbar(surf_net, ax=ax_net, shrink=0.6, aspect=10)
    fig.colorbar(surf_fdm, ax=ax_fdm, shrink=0.6, aspect=10)
    fig.colorbar(cax_abs, ax=ax_abs, shrink=0.6, aspect=10)

    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f'{mode}_results.pdf', format="pdf", dpi=600, bbox_inches='tight')
    plt.close()
    
    print(f'R2 Error: {relative_R2}')



Training KAN:


100%|██████████| 2000/2000 [08:00<00:00,  4.16it/s, Loss=7.172e-03, PDE=3.657e-03, BC1=1.253e-06, IC=3.513e-03, lr=1.222e-06]


R2 Error: 0.10000016697384916


In [38]:
# Loss Curve
def plot_loss_curves(loss_record):
    """
    绘制多个损失曲线

    Args:
        loss_record: 字典，键为损失名称，值为损失值列表
    """
    # 创建图表
    plt.figure(figsize=(10, 8))

    # 颜色映射，可以根据需要添加更多颜色
    colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b']
    linestyles = ['-', '-.', '--']

    # 为每个损失创建平滑的曲线
    for idx, (key, values) in enumerate(loss_record.items()):
        # 获取x轴值（迭代次数）
        x = np.arange(len(values))

        # 选择颜色，如果颜色用完就循环使用
        color = colors[idx % len(colors)]
        linestyle = linestyles[idx % len(linestyles)]

        # 绘制原始数据点（小点）
        plt.plot(x, values, label=key, color=color, linestyle=linestyle)

    # 设置图表样式
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.xlabel('Iterations', fontsize=12)
    plt.ylabel('Loss', fontsize=12)
    plt.yscale("log")
    plt.title('Training Loss Curves', fontsize=14)

    # 添加图例
    plt.legend(loc='upper right')

    # 调整布局以确保图例完全可见
    plt.tight_layout()

    return plt


fig = plot_loss_curves(loss_record)
plt.savefig(f'loss_curve.pdf', format="pdf", dpi=300, bbox_inches='tight')
plt.close()

In [39]:
# Loss Curve
def plot_error_curves(error_record):
    """
    绘制多个损失曲线

    Args:
        error_record: 字典，键为损失名称，值为损失值列表
    """
    # 创建图表
    plt.figure(figsize=(10, 8))

    # 颜色映射，可以根据需要添加更多颜色
    colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b']
    linestyles = ['-', '-.', '--']

    # 为每个损失创建平滑的曲线
    for idx, (key, values) in enumerate(error_record.items()):
        # 获取x轴值（迭代次数）
        x = np.arange(len(values))

        # 选择颜色，如果颜色用完就循环使用
        color = colors[idx % len(colors)]
        linestyle = linestyles[idx % len(linestyles)]

        # 绘制原始数据点（小点）
        plt.plot(x, values, label=key, color=color, linestyle=linestyle)

    # 设置图表样式
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.xlabel('Iterations', fontsize=12)
    plt.ylabel('Loss', fontsize=12)
    plt.yscale("log")
    plt.title('Relative Error $R^2$', fontsize=14)

    # 添加图例
    plt.legend(loc='upper right')

    # 调整布局以确保图例完全可见
    plt.tight_layout()

    return plt


fig = plot_error_curves(error_record)
plt.savefig(f'relative_error_curve.pdf', format="pdf", dpi=300, bbox_inches='tight')
plt.close()