In [1]:
import torch
import os
import sys
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, TensorDataset
import random
from torch.optim import LBFGS, Adam
from tqdm import tqdm
import wandb

# make sure that util is correctly accessed from parent directory
ppp_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
if ppp_dir not in sys.path:
    sys.path.insert(0, ppp_dir)

from util import *
from model.pinn import PINNs
from model.pinnsformer import PINNsformer, PINNsformer_params

In [2]:
# Check if CUDA is available
cuda_available = torch.cuda.is_available()
print(f"CUDA available: {cuda_available}")

# If CUDA is available, print the CUDA version
if cuda_available:
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Number of CUDA devices: {torch.cuda.device_count()}")
    print(f"Current CUDA device: {torch.cuda.current_device()}")
    print(f"CUDA device name: {torch.cuda.get_device_name(torch.cuda.current_device())}")

CUDA available: True
CUDA version: 11.8
Number of CUDA devices: 1
Current CUDA device: 0
CUDA device name: NVIDIA GeForce GTX 1650 Ti


In [3]:
# set seeds and configure cuda device
seed = 0
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

device = 'cuda:0'

In [4]:
class PDEData(Dataset):
    def __init__(self, x_range, t_range, rho_values, x_points, t_points):
        """
        Initialize the dataset for PDE data with multiple rho values.

        Args:
            x_range (list): Spatial domain [x_min, x_max].
            t_range (list): Temporal domain [t_min, t_max].
            rho_values (list): List of rho values for different scenarios.
            x_points (int): Number of points in the spatial domain.
            t_points (int): Number of points in the temporal domain.
            device (str): Device to store the tensors ('cpu' or 'cuda').
        """
        self.device = "cuda:0"
        self.x_range = x_range
        self.t_range = t_range
        self.rho_values = rho_values  # Store multiple rho values
        self.x_points = x_points
        self.t_points = t_points
        
        # Generate the data for all rho values
        self.data = {}
        for rho in rho_values:
            res, b_left, b_right, b_upper, b_lower = self._generate_data()
            
            # Convert boundary points to PyTorch tensors
            b_left = torch.tensor(b_left, dtype=torch.float32, requires_grad=True).to(self.device)
            b_right = torch.tensor(b_right, dtype=torch.float32, requires_grad=True).to(self.device)
            b_upper = torch.tensor(b_upper, dtype=torch.float32, requires_grad=True).to(self.device)
            b_lower = torch.tensor(b_lower, dtype=torch.float32, requires_grad=True).to(self.device)
            
            self.data[rho] = {
                'res': torch.tensor(res, dtype=torch.float32, requires_grad=True).to(self.device),
                'b_left': b_left,
                'b_right': b_right,
                'b_upper': b_upper,
                'b_lower': b_lower,
                # Precompute analytical solutions for boundary points
                'u_left': self.analytical_solution(b_left[:, 0:1], b_left[:, 1:2], rho),
                'u_right': self.analytical_solution(b_right[:, 0:1], b_right[:, 1:2], rho),
                'u_upper': self.analytical_solution(b_upper[:, 0:1], b_upper[:, 1:2], rho),
                'u_lower': self.analytical_solution(b_lower[:, 0:1], b_lower[:, 1:2], rho),
            }
    
    def _generate_data(self):
        """
        Generate the interior and boundary points for the PDE.

        Returns:
            res (np.ndarray): Interior points.
            b_left, b_right, b_upper, b_lower (np.ndarray): Boundary points.
        """
        x = np.linspace(self.x_range[0], self.x_range[1], self.x_points)
        t = np.linspace(self.t_range[0], self.t_range[1], self.t_points)
        
        x_mesh, t_mesh = np.meshgrid(x, t)
        data = np.concatenate((np.expand_dims(x_mesh, -1), np.expand_dims(t_mesh, -1)), axis=-1)
        
        b_left = data[0, :, :] 
        b_right = data[-1, :, :]
        b_upper = data[:, -1, :]
        b_lower = data[:, 0, :]
        res = data.reshape(-1, 2)

        return res, b_left, b_right, b_upper, b_lower
    
    def analytical_solution(self, x, t, rho):
        """
        Compute the analytical solution u_ana(x, t, rho).

        Args:
            x (torch.Tensor): Spatial points.
            t (torch.Tensor): Temporal points.
            rho (float): Reaction coefficient.

        Returns:
            torch.Tensor: Analytical solution u(x, t, rho).
        """
        h = torch.exp(- (x - torch.pi)**2 / (2 * (torch.pi / 4)**2))
        return h * torch.exp(rho * t) / (h * torch.exp(rho * t) + 1 - h)
    
    def get_interior_points(self, rho):
        """
        Get the interior points (x_res, t_res, rho_res) for a specific rho.

        Args:
            rho (float): The rho value for the current scenario.

        Returns:
            x_res, t_res, rho_res (torch.Tensor): Interior points with rho values.
        """
        res = self.data[rho]['res']
        x_res, t_res = res[:, 0:1], res[:, 1:2]
        rho_res = torch.full_like(x_res, rho)  # Same shape, constant rho
        return x_res, t_res, rho_res
    
    def get_boundary_points(self, rho):
        """
        Get the boundary points (x_left, t_left, etc.) for a specific rho.

        Args:
            rho (float): The rho value for the current scenario.

        Returns:
            Boundary points (torch.Tensor): x, t, and rho values for all boundaries.
        """
        b_left = self.data[rho]['b_left']
        b_right = self.data[rho]['b_right']
        b_upper = self.data[rho]['b_upper']
        b_lower = self.data[rho]['b_lower']
        
        x_left, t_left = b_left[:, 0:1], b_left[:, 1:2]
        x_right, t_right = b_right[:, 0:1], b_right[:, 1:2]
        x_upper, t_upper = b_upper[:, 0:1], b_upper[:, 1:2]
        x_lower, t_lower = b_lower[:, 0:1], b_lower[:, 1:2]
        
        rho_left = torch.full_like(x_left, rho)
        rho_right = torch.full_like(x_right, rho)
        rho_upper = torch.full_like(x_upper, rho)
        rho_lower = torch.full_like(x_lower, rho)
        
        return x_left, t_left, rho_left, x_right, t_right, rho_right, x_upper, t_upper, rho_upper, x_lower, t_lower, rho_lower
    
    def get_boundary_values(self, rho):
        """
        Get the precomputed analytical solutions for the boundary points.

        Args:
            rho (float): The rho value for the current scenario.

        Returns:
            u_left, u_right, u_upper, u_lower (torch.Tensor): Analytical solutions at the boundaries.
        """
        return (self.data[rho]['u_left'], self.data[rho]['u_right'], 
                self.data[rho]['u_upper'], self.data[rho]['u_lower'])
    
    def get_test_points(self, rho):
        """
        Get the test points (res_test) and their spatial and temporal components for a specific rho.

        Args:
            rho (float): The rho value for the current scenario.

        Returns:
            res_test (torch.Tensor): Test points as a tensor.
            x_test, t_test, rho_test (torch.Tensor): Spatial, temporal, and rho components of the test points.
        """
        res_test = self.data[rho]['res']
        x_test, t_test = res_test[:, 0:1], res_test[:, 1:2]
        rho_test = torch.full_like(x_test, rho)  # Same shape, constant rho
        return res_test, x_test, t_test, rho_test

In [5]:
# %%
# Configuration parameters and wandb logging
total_i = 50
model_name = "pinnsformer_params"
config_dict = {
    "total_i": total_i,
    "model": model_name,
    "d_out": 1,
    "d_model": 32,
    "d_hidden": 512,
    "N": 1,
    "heads": 2,
    "bias_fill": 0.01,
    "optimizer": "adam",
    "batch_size": 128,
    "learning_rate": 1e-3
}

run = wandb.init(
    project="pinnsformer",
    config=config_dict,
    settings=wandb.Settings(silent=True)
)

# %%
# Weight initialization function
def init_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            m.bias.data.fill_(config_dict["bias_fill"])

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


In [7]:
model = PINNsformer_params(
    d_out=config_dict["d_out"],
    d_model=config_dict["d_model"],
    d_hidden=config_dict["d_hidden"],
    N=config_dict["N"],
    heads=config_dict["heads"]
).to(device)

model.apply(init_weights)
if config_dict["optimizer"] == "LBFGS":
    optim = LBFGS(model.parameters(), line_search_fn='strong_wolfe')
elif config_dict["optimizer"] == "adam":
    optim = Adam(model.parameters(), lr=config_dict["learning_rate"])

print(model)
print(get_n_params(model))

PINNsformer_params(
  (linear_emb): Linear(in_features=3, out_features=32, bias=True)
  (encoder): Encoder(
    (layers): ModuleList(
      (0): EncoderLayer(
        (attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=32, out_features=32, bias=True)
        )
        (ff): FeedForward(
          (linear): Sequential(
            (0): Linear(in_features=32, out_features=256, bias=True)
            (1): WaveAct()
            (2): Linear(in_features=256, out_features=256, bias=True)
            (3): WaveAct()
            (4): Linear(in_features=256, out_features=32, bias=True)
          )
        )
        (act1): WaveAct()
        (act2): WaveAct()
      )
    )
    (act): WaveAct()
  )
  (decoder): Decoder(
    (layers): ModuleList(
      (0): DecoderLayer(
        (attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=32, out_features=32, bias=True)
        )
        (ff): FeedForward(
          (linear):

In [None]:
""" rho_values = [5.0]
dataset = PDEData(x_range=[0, 2 * np.pi], t_range=[0, 1], rho_values=rho_values, x_points=101, t_points=101)

# %%
# Training loop
loss_track = {i: {} for i in range(total_i)}

for i in tqdm(range(total_i)):
    for rho in rho_values:
        def closure():
            # Get interior points
            x_res, t_res, rho_res = dataset.get_interior_points(rho)
            # Get boundary points and their analytical values
            x_left, t_left, rho_left, x_right, t_right, rho_right, x_upper, t_upper, rho_upper, x_lower, t_lower, rho_lower = dataset.get_boundary_points(rho)
            u_left, u_right, u_upper, u_lower = dataset.get_boundary_values(rho)
            
            # Model predictions for interior and boundary points
            pred_res = model(x_res, t_res, rho_res)
            pred_left = model(x_left, t_left, rho_left)
            pred_right = model(x_right, t_right, rho_right)
            pred_upper = model(x_upper, t_upper, rho_upper)
            pred_lower = model(x_lower, t_lower, rho_lower)

            # Compute derivatives with respect to space and time
            u_x = torch.autograd.grad(pred_res, x_res, grad_outputs=torch.ones_like(pred_res),
                                        retain_graph=True, create_graph=True)[0]
            u_t = torch.autograd.grad(pred_res, t_res, grad_outputs=torch.ones_like(pred_res),
                                        retain_graph=True, create_graph=True)[0]

            # Define the loss terms:
            # PDE residual loss (here we assume u_t - rho * u * (1-u) = 0, adjust as needed)
            loss_res = torch.mean((u_t - rho * pred_res * (1 - pred_res)) ** 2)
            # Boundary condition loss (e.g., matching the solution at t=t_min vs t=t_max)
            loss_bc = torch.mean((pred_upper - pred_lower) ** 2)
            # Initial condition loss (here comparing the left boundary with the analytical solution)
            loss_ic = torch.mean((pred_left[:,0] - u_left[:,0]) ** 2)

            loss = loss_res + loss_bc + loss_ic

            wandb.log({
                "loss": loss.item(),
                "loss_res": loss_res.item(),
                "loss_bc": loss_bc.item(),
                "loss_ic": loss_ic.item(),
                "iteration": i,
                "rho": rho
            })

            # Track losses in a dictionary
            if rho not in loss_track[i]:
                loss_track[i][rho] = {}
            loss_track[i][rho]["loss_res"] = loss_res.item()
            loss_track[i][rho]["loss_bc"] = loss_bc.item()
            loss_track[i][rho]["loss_ic"] = loss_ic.item()

            optim.zero_grad()
            loss.backward(retain_graph=True)
            return loss

        optim.step(closure)

wandb.finish() """

In [None]:
rho_values = [5.0]
dataset = PDEData(x_range=[0, 2 * np.pi], t_range=[0, 1], rho_values=rho_values, x_points=101, t_points=101)

# %%
# Training loop
loss_track = {i: {} for i in range(total_i)}

# Set up Adam optimizer

batch_size = config_dict["batch_size"]
num_epochs = total_i  # total_i from your config

for epoch in range(num_epochs):
    for rho in rho_values:
        # Get interior points and create a mini-batch DataLoader
        x_res, t_res, rho_res = dataset.get_interior_points(rho)
        interior_dataset = TensorDataset(x_res, t_res, rho_res)
        interior_loader = DataLoader(interior_dataset, batch_size=batch_size, shuffle=True)

        # Get full boundary data for current rho
        (x_left, t_left, rho_left,
         x_right, t_right, rho_right,
         x_upper, t_upper, rho_upper,
         x_lower, t_lower, rho_lower) = dataset.get_boundary_points(rho)
        u_left, u_right, u_upper, u_lower = dataset.get_boundary_values(rho)

        x_left, t_left, rho_left = x_left.detach(), t_left.detach(), rho_left.detach()
        u_left, u_right, u_upper, u_lower = u_left.detach(), u_right.detach(), u_upper.detach(), u_lower.detach()
        
        for bx, bt, brho in interior_loader:
            # Ensure mini-batch inputs require gradients for differentiation
            bx.requires_grad_()
            bt.requires_grad_()

            def closure():
                optim.zero_grad()

                # Forward pass on interior mini-batch
                pred_res = model(bx, bt, brho)
                # Compute derivative with respect to time (no retain_graph here)
                u_t = torch.autograd.grad(pred_res, bt, 
                                           grad_outputs=torch.ones_like(pred_res),
                                           retain_graph=True,
                                           create_graph=True)[0]
                loss_res = torch.mean((u_t - rho * pred_res * (1 - pred_res)) ** 2)

                # Forward pass on full-boundary data
                pred_left  = model(x_left, t_left, rho_left)
                pred_right = model(x_right, t_right, rho_right)
                pred_upper = model(x_upper, t_upper, rho_upper)
                pred_lower = model(x_lower, t_lower, rho_lower)
                loss_bc = torch.mean((pred_upper - pred_lower) ** 2)
                loss_ic = torch.mean((pred_left[:, 0] - u_left[:, 0]) ** 2)

                loss = loss_res + loss_bc + loss_ic
                loss.backward(retain_graph=True)
                return loss

            loss_val = optim.step(closure)
            bx, bt, brho = bx.detach(), bt.detach(), brho.detach()
            wandb.log({
                "epoch": epoch,
                "rho": rho,
                "loss": loss_val.item()
            })

    print(f"Epoch {epoch+1}/{num_epochs} complete, Loss: {loss_val.item():.4f}")

wandb.finish()


RuntimeError: Trying to backward through the graph a second time (or directly access saved tensors after they have already been freed). Saved intermediate values of the graph are freed when you call .backward() or autograd.grad(). Specify retain_graph=True if you need to backward through the graph a second time or if you need to access saved tensors after calling backward.

In [None]:
# %%
# Print final losses for each rho value
last_iteration = total_i - 1
for rho in rho_values:
    print('Rho: {:4f}, Loss Res: {:4f}, Loss BC: {:4f}, Loss IC: {:4f}'.format(
        rho,
        loss_track[last_iteration][rho]["loss_res"],
        loss_track[last_iteration][rho]["loss_bc"],
        loss_track[last_iteration][rho]["loss_ic"]
    ))
    total_loss = (loss_track[last_iteration][rho]["loss_res"] +
                  loss_track[last_iteration][rho]["loss_bc"] +
                  loss_track[last_iteration][rho]["loss_ic"])
    print('Train Loss: {:4f}'.format(total_loss))
torch.save(model.state_dict(), './1dreaction_pinnsformer_params.pt')

In [None]:
# %%
# Evaluation
errors = []
for rho in rho_values:
    res_test, x_test, t_test, rho_test = dataset.get_test_points(rho)
    u_analytical = dataset.analytical_solution(x_test, t_test, rho).cpu().detach().numpy().reshape(101, 101)

    with torch.no_grad():
        pred = model(x_test.to(device), t_test.to(device), rho_test.to(device))[:, 0:1]
        pred = pred.cpu().detach().numpy().reshape(101, 101)

    rl1 = np.sum(np.abs(u_analytical - pred)) / np.sum(np.abs(u_analytical))
    rl2 = np.sqrt(np.sum((u_analytical - pred)**2) / np.sum(u_analytical**2))
    print(f"Rho: {rho}, Relative L1 error: {rl1:.4f}, Relative L2 error: {rl2:.4f}")
    errors.append((rho, rl1, rl2))


In [None]:
# %%
# Visualization
for rho in rho_values:
    res_test, x_test, t_test, rho_test = dataset.get_test_points(rho)
    u_analytical = dataset.analytical_solution(x_test, t_test, rho).cpu().detach().numpy().reshape(101, 101)
    with torch.no_grad():
        pred = model(x_test.to(device), t_test.to(device), rho_test.to(device))[:, 0:1]
        pred = pred.cpu().detach().numpy().reshape(101, 101)
    abs_error = np.abs(u_analytical - pred)

    plt.figure(figsize=(4, 3))
    plt.imshow(pred, extent=[0, 2*np.pi, 1, 0], aspect='auto')
    plt.xlabel('x')
    plt.ylabel('t')
    plt.title(f'Predicted u(x,t) for rho={rho}')
    plt.colorbar()
    plt.tight_layout()
    plt.savefig(f'./1dreaction_pinnsformer_params_pred_rho_{rho}.png')
    plt.show()

    plt.figure(figsize=(4, 3))
    plt.imshow(u_analytical, extent=[0, 2*np.pi, 1, 0], aspect='auto')
    plt.xlabel('x')
    plt.ylabel('t')
    plt.title(f'Exact u(x,t) for rho={rho}')
    plt.colorbar()
    plt.tight_layout()
    plt.savefig(f'./1dreaction_exact_rho_{rho}.png')
    plt.show()

    plt.figure(figsize=(4, 3))
    plt.imshow(abs_error, extent=[0, 2*np.pi, 1, 0], aspect='auto')
    plt.xlabel('x')
    plt.ylabel('t')
    plt.title(f'Absolute Error for rho={rho}')
    plt.colorbar()
    plt.tight_layout()
    plt.savefig(f'./1dreaction_pinnsformer_params_error_rho_{rho}.png')
    plt.show()