In [58]:
import torch
import torch.nn as nn
import torch.optim as optim
import gpytorch
import numpy as np
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.pyplot as plt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
import torch
import torch.nn as nn
import torch.optim as optim
from scipy.stats import qmc

In [59]:
# Set device (use GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the Berger Viscous Equation parameters
c = 1.0    # Wave speed
mu = 0.1   # Viscosity coefficient
lam = 1.0  # Nonlinearity coefficient

# Define the neural network
class PINN(nn.Module):
    def __init__(self, layers):
        super(PINN, self).__init__()
        self.net = nn.Sequential(*[
            nn.Sequential(nn.Linear(layers[i], layers[i+1]), nn.Tanh())
            for i in range(len(layers)-2)
        ] + [nn.Linear(layers[-2], layers[-1])])

    def forward(self, x, t):
        inputs = torch.cat((x, t), dim=1)
        return self.net(inputs)

# Generate collocation points
def generate_collocation_points(N_f, L=1.0, T=1.0):
    # Use Latin Hypercube Sampling for better distribution
    sampler = qmc.LatinHypercube(d=2)  # 2D (x, t) space
    sample = sampler.random(N_f)  # Generate N_f samples in [0, 1]^2

    # Scale samples to [0, L] and [0, T]
    x_f = torch.tensor(sample[:, 0] * L, dtype=torch.float32).reshape(-1, 1)
    t_f = torch.tensor(sample[:, 1] * T, dtype=torch.float32).reshape(-1, 1)

    return x_f.to(device), t_f.to(device)

# Compute PDE residual using automatic differentiation
def compute_pde_residual(model, x, t):
    x.requires_grad = True
    t.requires_grad = True

    u = model(x, t)  # Predict u(x, t)
    
    u_t = torch.autograd.grad(u, t, torch.ones_like(u), create_graph=True)[0]
    u_tt = torch.autograd.grad(u_t, t, torch.ones_like(u), create_graph=True)[0]
    
    u_x = torch.autograd.grad(u, x, torch.ones_like(u), create_graph=True)[0]
    u_xx = torch.autograd.grad(u_x, x, torch.ones_like(u), create_graph=True)[0]

    f = u_tt - c**2 * u_xx - mu * u_xx + lam * u * u_xx  # PDE residual
    return f

# Define loss function
def loss_function(model, x_f, t_f, x_bc, t_bc, u_bc):
    f_residual = compute_pde_residual(model, x_f, t_f)
    loss_pde = torch.mean(f_residual**2)

    u_pred_bc = model(x_bc, t_bc)
    loss_bc = torch.mean((u_pred_bc - u_bc)**2)

    return loss_pde + loss_bc

# Training loop
def train(model, optimizer, x_f, t_f, x_bc, t_bc, u_bc, epochs=5000):
    for epoch in range(epochs):
        optimizer.zero_grad()
        loss = loss_function(model, x_f, t_f, x_bc, t_bc, u_bc)
        loss.backward()
        optimizer.step()

        if epoch % 500 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item()}")

In [60]:
# Importance sampling: Generate collocation points based on residuals
def gen_points_import(model, N_f, L=1.0, T=1.0):
    x_f = torch.rand(N_f, 1, device=device, requires_grad=True) * L  # x in [0, L]
    t_f = torch.rand(N_f, 1, device=device, requires_grad=True) * T  # t in [0, T]
    
    if model is not None:  # Perform importance sampling
        residuals = compute_pde_residual(model, x_f, t_f).detach()
        probabilities = residuals.abs() / torch.sum(residuals.abs())  # Normalize to form a probability distribution
        sampled_indices = torch.multinomial(probabilities.view(-1), N_f, replacement=True)
        x_f, t_f = x_f[sampled_indices], t_f[sampled_indices]
    
    return x_f, t_f

def train_import(model, optimizer, N_f, x_f,t_f,x_bc, t_bc, u_bc, epochs=10000, resample_every=500):
    for epoch in range(epochs):
        if epoch % resample_every == 0 and epoch >=500:
            x_f, t_f = gen_points_import(model, N_f)
        
        optimizer.zero_grad()
        loss = loss_function(model, x_f, t_f, x_bc, t_bc, u_bc)
        loss.backward()
        optimizer.step()

        if epoch % 500 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item()}")

In [77]:

# Gaussian Process Model for Importance Sampling
class ResidualGP(gpytorch.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super(ResidualGP, self).__init__(train_x, train_y, likelihood)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)

# Importance sampling using Gaussian Process
def generate_collocation_points_with_gp(model, N_f, L=1.0, T=1.0):
    x_f = torch.rand(N_f, 1, device=device, requires_grad=True) * L  # x in [0, L]
    t_f = torch.rand(N_f, 1, device=device, requires_grad=True) * T  # t in [0, T]
    
    if model is not None:
        residuals = compute_pde_residual(model, x_f, t_f).detach()  # Detach from computational graph
        residuals = residuals.view(-1)  # Ensure residuals are 1D for GP

        # Train Gaussian Process to model residuals
        likelihood = gpytorch.likelihoods.GaussianLikelihood()
        gp_model = ResidualGP(torch.cat([x_f, t_f], dim=1), residuals, likelihood).to(device)

        gp_model.train()
        likelihood.train()
        optimizer = torch.optim.Adam(gp_model.parameters(), lr=0.001)
        mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, gp_model)

        for _ in range(50):  # GP training (keep small for efficiency)
            optimizer.zero_grad()
            output = gp_model(torch.cat([x_f, t_f], dim=1))
            loss = -mll(output, residuals)
            loss.mean().backward(retain_graph=True)  # Ensure backward works properly
            optimizer.step()

        # Predict residual mean and variance
        gp_model.eval()
        likelihood.eval()
        with torch.no_grad():
            test_x = torch.cat([x_f, t_f], dim=1)
            gp_prediction = gp_model(test_x)  # Directly use GP output
            residual_mean = gp_prediction.mean  # Mean prediction
            residual_std = gp_prediction.variance.sqrt()  # Correct way to compute uncertainty

        # Define importance score = |mean residual| + α * uncertainty
        alpha = 0.9  # Trade-off between exploitation and exploration
        importance_score = residual_mean.abs() + alpha * residual_std
        probabilities = importance_score / importance_score.sum()  # Normalize
        
        # Resample points based on importance scores
        sampled_indices = torch.multinomial(probabilities.view(-1), N_f, replacement=True)
        x_f, t_f = x_f[sampled_indices], t_f[sampled_indices]

    return x_f, t_f

In [78]:
def train_GP(model, optimizer, N_f, x_f,t_f,x_bc, t_bc, u_bc, epochs=5000, resample_every=500):
    for epoch in range(epochs):
        if epoch % resample_every == 0 and epoch >=500:
            x_f, t_f = gen_points_import(model, N_f)
        
        optimizer.zero_grad()
        loss = loss_function(model, x_f, t_f, x_bc, t_bc, u_bc)
        loss.backward()
        optimizer.step()

        if epoch % 500 == 0:
            # x_f,t_f = fit_GP(x_f,t_f)
            print(f"Epoch {epoch}, Loss: {loss.item()}")
            x_uncertain, t_uncertain = generate_collocation_points_with_gp(model,N_f,x_f, t_f) 
            x_f = x_uncertain
            t_f = t_uncertain

In [79]:
def compute_pde_residual(model, x, t):
    x = x.clone().detach().requires_grad_(True)
    t = t.clone().detach().requires_grad_(True)

    u = model(x, t)  # Predict u(x, t)
    
    u_t = torch.autograd.grad(u, t, torch.ones_like(u), create_graph=True, retain_graph=True)[0]
    u_tt = torch.autograd.grad(u_t, t, torch.ones_like(u), create_graph=True, retain_graph=True)[0]
    
    u_x = torch.autograd.grad(u, x, torch.ones_like(u), create_graph=True, retain_graph=True)[0]
    u_xx = torch.autograd.grad(u_x, x, torch.ones_like(u), create_graph=True, retain_graph=True)[0]

    f = u_tt - c**2 * u_xx - mu * u_xx + lam * u * u_xx  # PDE residual
    return f

In [80]:
#different initial/boundary conditions
def initial_condition(x, condition_type="sin"):
    if condition_type == "sin":
        return torch.sin(np.pi * x.cpu()).to(device)
    elif condition_type == "gaussian":
        return torch.exp(-10 * (x - 0.5) ** 2).to(device)
    elif condition_type == "step":
        return torch.where(x < 0.5, torch.tensor(1.0, device=device), torch.tensor(0.0, device=device))
    else:
        raise ValueError("Unknown initial condition type")

def boundary_condition(x, t, boundary_type="dirichlet"):
    if boundary_type == "dirichlet":
        return torch.zeros_like(x).to(device)  # u(0,t) = 0, u(L,t) = 0
    elif boundary_type == "neumann":
        return torch.autograd.grad(model(x, t), x, torch.ones_like(x), create_graph=True)[0]
    elif boundary_type == "periodic":
        return model(x, t) - model(x + L, t)  # u(0,t) = u(L,t)
    else:
        raise ValueError("Unknown boundary condition type")


In [81]:
# Testing multiple conditions
layers = [2, 50, 50, 50, 1]  # Input (x,t) -> Hidden layers -> Output (u)
model = PINN(layers).to(device)
N_f = 1000  # Collocation points
L, T = 1.0, 1.0  # Spatial and time limits
N_bc = 100
epochs = 5000
x_f, t_f = generate_collocation_points(N_f, L, T)
initial_conditions = ["sin", "gaussian", "step"]
boundary_conditions = ["dirichlet", "neumann", "periodic"]


#validation
x_val, t_val = generate_collocation_points(10000,L,T)
x_bc = torch.linspace(0, L, N_bc).view(-1, 1).to(device)

results_base = []
results_import = []
results_gauss = []
# loss_function(model, x_f, t_f, x_bc, t_bc, u_bc).item()
for ic in initial_conditions:
    for bc in boundary_conditions:
        print(f"Training with IC: {ic}, BC: {bc}")

        # Generate boundary and initial condition data
        x_bc = torch.linspace(0, L, N_bc).view(-1, 1).to(device)
        t_bc = torch.zeros_like(x_bc).to(device)
        u_bc = initial_condition(x_bc, ic)

        # Train the model
        model_base = PINN(layers).to(device)
        optimizer = optim.Adam(model_base.parameters(), lr=1e-3)
        train(model_base, optimizer, x_f, t_f, x_bc, t_bc, u_bc, epochs=epochs)

        
        t_val_bc = torch.zeros_like(x_bc).to(device)
        u_val_bc = initial_condition(x_bc, ic)
        loss_val = loss_function(model_base, x_val, t_val, x_bc, t_val_bc, u_val_bc).item()

        print("Validation loss base:",loss_val)
        results_base.append([ic,bc,loss_val])
        
        model_import = PINN(layers).to(device)
        optimizer = optim.Adam(model_import.parameters(), lr=1e-3)
        train_import(model_import, optimizer, N_f, x_f,t_f,x_bc, t_bc, u_bc, epochs=epochs, resample_every=100)

        t_val_bc = torch.zeros_like(x_bc).to(device)
        u_val_bc = initial_condition(x_bc, ic)
        loss_val = loss_function(model_import, x_val, t_val, x_bc, t_val_bc, u_val_bc).item()

        print("Validation loss import:",loss_val)
        results_import.append([ic,bc,loss_val])

        model_GP = PINN(layers).to(device)
        optimizer = optim.Adam(model_GP.parameters(), lr=1e-3)
        train_GP(model_import, optimizer, N_f, x_f,t_f,x_bc, t_bc, u_bc, epochs=epochs, resample_every=100)

        t_val_bc = torch.zeros_like(x_bc).to(device)
        u_val_bc = initial_condition(x_bc, ic)
        loss_val = loss_function(model_GP, x_val, t_val, x_bc, t_val_bc, u_val_bc).item()

        print("Validation loss gauss:",loss_val)
        results_gauss.append([ic,bc,loss_val])
        # (Optional) Save results for later comparison
        # np.save(f"results_{ic}_{bc}.npy", u_pred)


Training with IC: sin, BC: dirichlet
Epoch 0, Loss: 0.6114212274551392
Epoch 500, Loss: 0.008377736434340477
Epoch 1000, Loss: 0.0015274215256795287
Epoch 1500, Loss: 0.0005380272632464767
Epoch 2000, Loss: 0.004294271115213633
Epoch 2500, Loss: 0.00013752855011262
Epoch 3000, Loss: 9.729313023854047e-05
Epoch 3500, Loss: 7.361101597780362e-05
Epoch 4000, Loss: 5.8993955462938175e-05
Epoch 4500, Loss: 4.8845584387890995e-05
Validation loss base: 0.000504104420542717
Epoch 0, Loss: 0.5037978291511536
Epoch 500, Loss: 0.031285036355257034
Epoch 1000, Loss: 0.005235261283814907
Epoch 1500, Loss: 0.005714646074920893
Epoch 2000, Loss: 0.0015815077349543571
Epoch 2500, Loss: 0.0008865875424817204
Epoch 3000, Loss: 0.0012352521298453212
Epoch 3500, Loss: 0.0004845723742619157
Epoch 4000, Loss: 0.0004020536143798381
Epoch 4500, Loss: 0.0014945417642593384
Validation loss import: 9.415707609150559e-05
Epoch 0, Loss: 0.00010406879300717264
Epoch 500, Loss: 0.000168681115610525
Epoch 1000, Loss:

KeyboardInterrupt: 

In [None]:
for result in results_base:
    print(result[0],result[1],"loss",result[2])

In [None]:
for result in results_import:
    print(result[0],result[1],"loss",result[2])

In [None]:
for result in results_gauss:
    print(result[0],result[1],"loss",result[2])