In [1]:
import os
import random
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import loadmat

In [None]:
# Neural Network Class

# Fully connected neural network with number of layers, neurons, input size, output size and activation function as parameters

class NeuralNetwork(nn.Module):
    def __init__(self, num_layers, num_neurons, input_size, output_size, activation_function):
        super(NeuralNetwork, self).__init__()
        self.layers = nn.ModuleList()
        self.activation_function = activation_function

        # Input layer
        self.layers.append(nn.Linear(input_size, num_neurons))

        # Hidden layers
        for _ in range(num_layers - 1):
            self.layers.append(nn.Linear(num_neurons, num_neurons))

        # Output layer
        self.layers.append(nn.Linear(num_neurons, output_size))

    def forward(self, x):
        for layer in self.layers[:-1]:
            x = self.activation_function(layer(x))
        x = self.layers[-1](x)
        return x

## define the activation functions: tanh() and sin()
def tanh(x):
    return torch.tanh(x)

def sin(x):
    return torch.sin(x)

# input size = 2, (x, t)


In [None]:
# initial condition 
def ic_func(x):
    return np.cos(x[:, 0:1]) + 0.1 * np.sin(2 * x[:, 0:1])


In [None]:
# boundary condition

## periodic boundary condition


In [None]:
# Loss Function construction

## Residual loss function

def Residual_NN(x, u, t, u_t, u_xx):
    # Compute the residual of the PDE
    f = u_t + u_xx - np.sin(t) * np.cos(x)

    dy_dx = torch.autograd.grad(y, x, torch.ones_like(y), create_graph=True)[0]
    return f

## Boundary loss function
def Boundary_NN(x, t, u):
    # Compute the boundary condition loss
    u_bc = torch.zeros_like(u)
    return u - u_bc

## Initial loss function
def Initial_NN(x, u):
    # Compute the initial condition loss
    u_ic = ic_func(x)
    return u - u_ic

## training loss function
def train_loss(data, model):
    x = x.requires_grad_()
    t = t.requires_grad_()
    u = u.requires_grad_()
    u_t = u_t.requires_grad_()
    u_xx = u_xx.requires_grad_()


    # Compute the total loss
    res_loss = Residual_NN(x, u, t, u_t, u_xx)
    bc_loss = Boundary_NN(x, t, u)
    ic_loss = Initial_NN(x, u)
    total_loss = res_loss + bc_loss + ic_loss
    return total_loss, res_loss,  ic_loss

import torch

# === Helper: Compute Derivatives for KS Equation ===
def compute_ks_derivatives(model, x, t):
    """
    Given a model that predicts u(x,t), compute:
      u, u_t, u_x, u_xx, u_xxx, and u_xxxx 
    using automatic differentiation.

    Inputs:
      model : the neural network model, taking (x, t)
      x, t  : tensors with requires_grad=True
  
    Returns:
      u, u_t, u_x, u_xx, u_xxx, u_xxxx
    """
    # Make sure x and t require gradients
    x = x.requires_grad_()
    t = t.requires_grad_()

    # Forward pass: predict u
    u = model(x, t)

    # First derivative with respect to time: u_t
    u_t = torch.autograd.grad(
        u, t,
        grad_outputs=torch.ones_like(u),
        retain_graph=True,
        create_graph=True
    )[0]

    # First derivative with respect to space: u_x
    u_x = torch.autograd.grad(
        u, x,
        grad_outputs=torch.ones_like(u),
        retain_graph=True,
        create_graph=True
    )[0]

    # Second derivative with respect to space: u_xx
    u_xx = torch.autograd.grad(
        u_x, x,
        grad_outputs=torch.ones_like(u_x),
        retain_graph=True,
        create_graph=True
    )[0]

    # Third derivative with respect to space: u_xxx
    u_xxx = torch.autograd.grad(
        u_xx, x,
        grad_outputs=torch.ones_like(u_xx),
        retain_graph=True,
        create_graph=True
    )[0]

    # Fourth derivative with respect to space: u_xxxx
    u_xxxx = torch.autograd.grad(
        u_xxx, x,
        grad_outputs=torch.ones_like(u_xxx),
        retain_graph=True,
        create_graph=True
    )[0]

    return u, u_t, u_x, u_xx, u_xxx, u_xxxx


# === Residual Loss Function for KS Equation ===
def residual_loss(model, x_f, t_f, nu):
    """
    Computes the residual loss for the KS equation:
       u_t + u*u_x + u_xx + nu*u_xxxx = 0
  
    Inputs:
      model: neural network that predicts u(x,t)
      x_f, t_f: collocation (interior) points where the PDE is enforced
      nu: parameter nu in the equation

    Returns:
      loss_res: a scalar tensor representing the Mean Squared Error (MSE)
                of the PDE residual.
    """
    # Compute u and necessary derivatives at collocation points:
    u, u_t, u_x, u_xx, _, u_xxxx = compute_ks_derivatives(model, x_f, t_f)
    
    # Compute the PDE residual f:
    # f = u_t + u*u_x + u_xx + nu*u_xxxx
    f = u_t + u * u_x + u_xx + nu * u_xxxx

    # Mean Squared Error of the residual (forcing f=0):
    loss_res = torch.mean(f**2)
    return loss_res


# === Boundary Loss Function for Periodic BC ===
def boundary_loss(model, x_left, t_left, x_right, t_right):
    """
    Computes the boundary loss for enforcing periodic boundary conditions.
    For a periodic domain [a, b] we enforce u(a,t) = u(b,t).

    Inputs:
      model: neural network that predicts u(x,t)
      x_left, t_left: points on the left boundary (e.g., x=a)
      x_right, t_right: corresponding points on the right boundary (e.g., x=b)

    Returns:
      loss_bc: the Mean Squared Error (MSE) between u(x_left,t_left) and u(x_right,t_right).
    """
    u_left = model(x_left, t_left)
    u_right = model(x_right, t_right)

    loss_bc = torch.mean((u_left - u_right)**2)
    return loss_bc


# === Initial Loss Function ===
def initial_loss(model, x_ic, t_ic, u_ic_target):
    """
    Computes the loss for the initial condition:
        u(x, t=0) = u_ic_target(x)

    Inputs:
      model: neural network that predicts u(x,t)
      x_ic, t_ic: the initial condition points (typically t_ic is zero)
      u_ic_target: true initial values obtained from your ic function

    Returns:
      loss_ic: Mean Squared Error (MSE) between network prediction and initial condition target.
    """
    u_ic_pred = model(x_ic, t_ic)
    loss_ic = torch.mean((u_ic_pred - u_ic_target)**2)
    return loss_ic


# === Combined Training Loss Function ===
def train_loss(model, data, nu):
    """
    Computes the total loss by combining:
      - the residual loss (enforcing the KS PDE)
      - the boundary loss (enforcing periodic BC)
      - the initial loss (enforcing the IC)
      
    The `data` dictionary is assumed to have the following keys:
      - 'collocation': tuple (x_f, t_f)
      - 'boundary': tuple (x_left, t_left, x_right, t_right)
      - 'initial': tuple (x_ic, t_ic, u_ic_target)
      
    nu is the KS equation parameter.

    Returns:
      total_loss: the sum of the three loss terms.
      loss_r, loss_b, loss_i: individual loss components.
    """
    # Unpack the data dictionary
    x_f, t_f = data['collocation']
    x_left, t_left, x_right, t_right = data['boundary']
    x_ic, t_ic, u_ic_target = data['initial']

    # Compute each loss term:
    loss_r = residual_loss(model, x_f, t_f, nu)
    loss_b = boundary_loss(model, x_left, t_left, x_right, t_right)
    loss_i = initial_loss(model, x_ic, t_ic, u_ic_target)

    # Combine the losses (you can also weight these if needed)
    total_loss = loss_r + loss_b + loss_i
    return total_loss, loss_r, loss_b, loss_i


In [None]:
# training process
epochs = 5000

loss_history = []
ode_loss_history = []
initial_loss_history = []

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
model = NeuralNetwork(num_layers=4, num_neurons=50, input_size=1, output_size=1, activation_function=tanh)
print(model)

x_train = torch.linspace(-2, 2, 100).view(-1, 1)  # Training points

for epoch in range(epochs):
    optimizer.zero_grad()
    total_loss, ode_loss, initial_loss = train_loss(model, x_train)
    total_loss.backward()
    optimizer.step()

    
    loss_history.append(total_loss.item())
    ode_loss_history.append(ode_loss.item())
    initial_loss_history.append(initial_loss.item())

    if epoch % 1000 == 0:
        print(f"Epoch {epoch}, Loss: {total_loss.item():.6f}")




In [None]:
import numpy as np
import torch

def load_training_data(params):
    """
    Load and process training data for the KS equation.
    
    The training data is expected to be stored in a .npy file with shape:
      (num_time_points, num_space_points)
    which represents the solution u(x,t) on a grid. This function creates
    observation points (x, t) corresponding to the values in the data.
    
    Parameters:
      params (dict): A dictionary containing:
          'T'  : Total time domain length (we use first T/2 for training).
          'L'  : Spatial domain length.
          'N'  : Expected number of spatial points.
    
    Returns:
      data_dict (dict): A dictionary with keys:
            'x_data'      : Tensor of x coordinates (shape [N_obs, 1]).
            't_data'      : Tensor of t coordinates (shape [N_obs, 1]).
            'u_data_target': Tensor of u values (observations, shape [N_obs, 1]).
    """
    # Load the training data
    training_data = np.load('../../../data/ks_training.npy')
    
    # Get dimensions from training data
    n_time, n_space = training_data.shape
    
    # Check for consistency with parameters (if provided)
    if 'N' in params and params['N'] != n_space:
        print("Warning: params['N'] (%d) does not match the number of spatial points in the training data (%d). Using data shape." 
              % (params['N'], n_space))
    
    # Create the time axis for the first half of the time domain
    t = np.linspace(0, params['T'] / 2, n_time)
    # Create the spatial axis from 0 to L
    x = np.linspace(0, params['L'], n_space)
    
    # Create an observation grid via meshgrid.
    # X holds spatial coordinates and T_grid holds time coordinates.
    X, T_grid = np.meshgrid(x, t)
    
    # Flatten the grid to generate observation points.
    # Each observation point has the structure [x, t].
    X_train = np.hstack((X.flatten()[:, None], T_grid.flatten()[:, None]))
    
    # Flatten the training data to get corresponding u values.
    y_train = training_data.flatten()[:, None]
    
    # Convert the data to PyTorch tensors.
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32)
    
    # Split the input into x_data and t_data.
    x_data = X_train[:, 0:1]
    t_data = X_train[:, 1:2]
    
    # Pack the tensors in a structured dictionary.
    data_dict = {
        'x_data': x_data,
        't_data': t_data,
        'u_data_target': y_train
    }
    
    return data_dict


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# ===================================================
# 1. Activation Functions
# ===================================================

def tanh(x):
    return torch.tanh(x)

def sin(x):
    return torch.sin(x)

# ===================================================
# 2. Neural Network Definition for (x, t) Inputs
# ===================================================

class NeuralNetwork(nn.Module):
    def __init__(self, num_layers, num_neurons, input_size, output_size, activation_function):
        super(NeuralNetwork, self).__init__()
        self.layers = nn.ModuleList()
        self.activation_function = activation_function

        # Input layer (input_size should be 2 for x and t)
        self.layers.append(nn.Linear(input_size, num_neurons))
        # Hidden layers
        for _ in range(num_layers - 1):
            self.layers.append(nn.Linear(num_neurons, num_neurons))
        # Output layer
        self.layers.append(nn.Linear(num_neurons, output_size))

    def forward(self, x, t):
        # Concatenate spatial and temporal inputs along the feature dimension.
        inp = torch.cat((x, t), dim=1)
        for layer in self.layers[:-1]:
            inp = self.activation_function(layer(inp))
        output = self.layers[-1](inp)
        return output

# ===================================================
# 3. Helper: Compute Derivatives for KS Equation
# ===================================================

def compute_ks_derivatives(model, x, t):
    """
    Computes u, u_t, u_x, u_xx, u_xxx, and u_xxxx using automatic differentiation.
    """
    x = x.requires_grad_()
    t = t.requires_grad_()

    u = model(x, t)

    # Time derivative u_t = du/dt
    u_t = torch.autograd.grad(
        u, t,
        grad_outputs=torch.ones_like(u),
        retain_graph=True,
        create_graph=True
    )[0]

    # First spatial derivative u_x = du/dx
    u_x = torch.autograd.grad(
        u, x,
        grad_outputs=torch.ones_like(u),
        retain_graph=True,
        create_graph=True
    )[0]

    # Second derivative u_xx = d²u/dx²
    u_xx = torch.autograd.grad(
        u_x, x,
        grad_outputs=torch.ones_like(u_x),
        retain_graph=True,
        create_graph=True
    )[0]

    # Third derivative u_xxx = d³u/dx³
    u_xxx = torch.autograd.grad(
        u_xx, x,
        grad_outputs=torch.ones_like(u_xx),
        retain_graph=True,
        create_graph=True
    )[0]

    # Fourth derivative u_xxxx = d⁴u/dx⁴
    u_xxxx = torch.autograd.grad(
        u_xxx, x,
        grad_outputs=torch.ones_like(u_xxx),
        retain_graph=True,
        create_graph=True
    )[0]

    return u, u_t, u_x, u_xx, u_xxx, u_xxxx

# ===================================================
# 4. Loss Functions for the KS Equation
# ===================================================

def residual_loss(model, x_f, t_f, nu):
    """
    Enforce the KS PDE:
      u_t + u*u_x + u_xx + nu*u_xxxx = 0.
    """
    u, u_t, u_x, u_xx, _, u_xxxx = compute_ks_derivatives(model, x_f, t_f)
    f = u_t + u * u_x + nu * u_xx + nu * u_xxxx
    loss_r = torch.mean(f**2)
    return loss_r

def boundary_loss(model, x_left, t_left, x_right, t_right):
    """
    Enforce periodic boundary conditions:
      u(-2, t) = u(2, t).
    """
    u_left = model(x_left, t_left)
    u_right = model(x_right, t_right)
    loss_b = torch.mean((u_left - u_right)**2)
    return loss_b

def initial_loss(model, x_ic, t_ic, u_ic_target):
    """
    Enforce the initial condition:
      u(x, 0) = u_ic_target(x).
    """
    u_ic_pred = model(x_ic, t_ic)
    loss_ic = torch.mean((u_ic_pred - u_ic_target)**2)
    return loss_ic

def data_loss(model, x_data, t_data, u_data_target):
    """
    Data loss: mean squared error between the network prediction and observed data.
    """
    u_pred = model(x_data, t_data)
    loss_d = torch.mean((u_pred - u_data_target)**2)
    return loss_d

def train_loss(model, data, nu):
    """
    Total loss is the sum of the residual, boundary, initial, and data losses.
    
    The provided data dictionary should have the following keys:
      - 'collocation': tuple (x_f, t_f)
      - 'boundary': tuple (x_left, t_left, x_right, t_right)
      - 'initial': tuple (x_ic, t_ic, u_ic_target)
      - 'data': tuple (x_data, t_data, u_data_target)
    """
    x_f, t_f = data['collocation']
    x_left, t_left, x_right, t_right = data['boundary']
    x_ic, t_ic, u_ic_target = data['initial']
    x_data, t_data, u_data_target = data['data']

    loss_r = residual_loss(model, x_f, t_f, nu)
    loss_b = boundary_loss(model, x_left, t_left, x_right, t_right)
    loss_ic = initial_loss(model, x_ic, t_ic, u_ic_target)
    loss_d = data_loss(model, x_data, t_data, u_data_target)

    total_loss = loss_r + loss_b + loss_ic + loss_d
    return total_loss, loss_r, loss_b, loss_ic, loss_d

# ===================================================
# 5. Data Loading Functions
# ===================================================

def load_training_data(params):
    """
    Load and process training data for the KS equation.
    
    The training data is assumed to be stored in a .npy file with shape:
      (num_time_points, num_space_points)
    which represents u(x, t) on a grid. This function uses the first half
    of the time domain and returns the data in a structured dictionary.
    
    Parameters:
      params (dict): Must include:
          'T'  : Total time domain length.
          'L'  : Spatial domain length.
          'N'  : Expected number of spatial points.
          'data_path': Path to the .npy file.
    
    Returns:
      data_dict (dict): Dictionary with keys:
            'x_data'       : Tensor of x coordinates (shape [N_obs, 1]).
            't_data'       : Tensor of t coordinates (shape [N_obs, 1]).
            'u_data_target': Tensor of u observations (shape [N_obs, 1]).
    """
    training_data = np.load(params['data_path'])
    n_time, n_space = training_data.shape

    if 'N' in params and params['N'] != n_space:
        print("Warning: params['N'] (%d) does not match the number of spatial points in the training data (%d). Using data shape."
              % (params['N'], n_space))
    
    # Use the first half of the time domain
    t = np.linspace(0, params['T'] / 2, n_time)
    x = np.linspace(0, params['L'], n_space)
    
    # Create observation grid via meshgrid.
    X, T_grid = np.meshgrid(x, t)
    X_train = np.hstack((X.flatten()[:, None], T_grid.flatten()[:, None]))
    y_train = training_data.flatten()[:, None]
    
    # Convert to PyTorch tensors.
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32)
    
    # Split input into spatial and temporal components.
    x_data = X_train[:, 0:1]
    t_data = X_train[:, 1:2]

    data_dict = {
        'x_data': x_data,
        't_data': t_data,
        'u_data_target': y_train
    }
    
    return data_dict

# ===================================================
# 6. Define Initial Condition Function
# ===================================================

def ic_func(x):
    # For example, initial condition u(x,0) = cos(x)
    return torch.cos(x) + 0.1 * torch.sin(2 * x)

# ===================================================
# 7. Generate Other Training Data Components
# ===================================================

# Domain settings
x_min, x_max = 0.0 , 1.0
t_min, t_max = 0.0, 100

# Collocation points for the PDE residual
N_f = 1000
x_f = torch.rand(N_f, 1)*(x_max-x_min)+x_min
t_f = torch.rand(N_f, 1) * (t_max - t_min) + t_min

# Boundary points for enforcing periodic conditions: u(x_min, t)=u(x_max, t)
N_b = 100
x_left = x_min * torch.ones(N_b, 1)
x_right = x_max * torch.ones(N_b, 1)
t_b = torch.rand(N_b, 1) * (t_max - t_min) + t_min
t_left = t_b
t_right = t_b

# Initial condition points (t = 0)
N_ic = 100
# x_ic = torch.linspace(x_min, x_max, N_ic).view(-1, 1)
x_ic = torch.rand(N_ic, 1)*(x_max-x_min)+x_min
t_ic = torch.zeros(N_ic, 1)
u_ic_target = ic_func(x_ic)

# Load observed training data from file
params = {'T': 100.0, 'L': 1.0, 'N': 2048, 'data_path': '../../../data/ks_training.npy'}
data_obs = load_training_data(params)
# Use keys 'x_data', 't_data', 'u_data_target' from the loaded data dictionary.

# Bundle all components into a single data dictionary for loss evaluation.
data = {
    'collocation': (x_f, t_f),
    'boundary': (x_left, t_left, x_right, t_right),
    'initial': (x_ic, t_ic, u_ic_target),
    'data': (data_obs['x_data'], data_obs['t_data'], data_obs['u_data_target'])
}

# ===================================================
# 8. Construct the PINN Model and Trainable Parameter nu
# ===================================================

# Model: input dimension is 2 (x and t), output dimension is 1.
model = NeuralNetwork(num_layers=4, num_neurons=50, input_size=2, output_size=1, activation_function=tanh)
print(model)

# Unknown parameter nu defined as a trainable parameter.
nu = torch.nn.Parameter(torch.tensor([0.5], dtype=torch.float32))

# ===================================================
# 9. Training Process
# ===================================================

optimizer = optim.Adam(list(model.parameters()) + [nu], lr=0.001)
epochs = 5000

loss_history = []
residual_loss_history = []
boundary_loss_history = []
initial_loss_history = []
data_loss_history = []

for epoch in range(epochs):
    optimizer.zero_grad()
    total_loss, loss_r, loss_b, loss_ic, loss_d = train_loss(model, data, nu)
    total_loss.backward()
    optimizer.step()
    
    loss_history.append(total_loss.item())
    residual_loss_history.append(loss_r.item())
    boundary_loss_history.append(loss_b.item())
    initial_loss_history.append(loss_ic.item())
    data_loss_history.append(loss_d.item())
    
    if epoch % 1000 == 0:
        print(f"Epoch {epoch}: Total Loss={total_loss.item():.6f}, Residual Loss={loss_r.item():.6f}, "
              f"Boundary Loss={loss_b.item():.6f}, Initial Loss={loss_ic.item():.6f}, "
              f"Data Loss={loss_d.item():.6f}, nu={nu.item():.6f}")

print("Training complete. Final nu =", nu.item())


NeuralNetwork(
  (layers): ModuleList(
    (0): Linear(in_features=2, out_features=50, bias=True)
    (1-3): 3 x Linear(in_features=50, out_features=50, bias=True)
    (4): Linear(in_features=50, out_features=1, bias=True)
  )
)
Epoch 0: Total Loss=4.897291, Residual Loss=0.000009, Boundary Loss=0.000017, Initial Loss=0.551996, Data Loss=4.345269, nu=0.499001
