In [None]:
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt   
from scipy.io import loadmat

torch.manual_seed(123)


data = loadmat("data/burgers.mat")
x = data['x']
t = data['t']
usol = data['usol']

<torch._C.Generator at 0x1f3aa0825b0>

In [2]:
class FCN(nn.Module):
    def __init__(self, N_INPUT: int, N_OUTPUT: int, N_HIDDEN: int, N_LAYERS: int):
        super(FCN).__init__()
        self.activation = nn.Tanh
        
        # Input layer
        self.fcs = nn.Sequential(
            nn.Linear(N_INPUT, N_HIDDEN),  
            self.activation()            
        )
        # Hidden layers
        self.fch = self._create_hidden_layers(N_HIDDEN, N_LAYERS)
        
        # Output layer
        self.fce = nn.Linear(N_HIDDEN, N_OUTPUT)  # Output layer

        # Xavier initialization
        self._initialize_weights()

    def _create_hidden_layers(self, N_HIDDEN: int, N_LAYERS: int) -> nn.Sequential:
        # Creates hidden layers for the network.
        layers = []
        for _ in range(N_LAYERS - 1):
            layers.append(nn.Linear(N_HIDDEN, N_HIDDEN)) 
            layers.append(self.activation())
            
        return nn.Sequential(*layers)

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    def forward(self, x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
        input_tensor = torch.cat([x, t], dim=1)
        x = self.fcs(input_tensor)
        x = self.fch(x)
        x = self.fce(x)
        return x
    
    def net_u(self, x: torch.Tensor, t: torch.Tensor,):
        return self.forward(x,t)

In [None]:
model = FCN(2,1,20,8) # inputs, output, hidden_neurons, number_of_layers

# Residual Computation [Physics Informed]
def net_f(x, t, nu):
    x.requires_grad_(True)
    t.requires_grad_(True)

    u = model.net_u(x, t)

    if u is None:
        raise ValueError("u is None. Check the net_u function.")

    u_t = torch.autograd.grad(u, t, 
                              grad_outputs=torch.ones_like(u), 
                              retain_graph=True,
                              create_graph=True)[0]
    
    u_x = torch.autograd.grad(u, x, 
                              grad_outputs=torch.ones_like(u), 
                              retain_graph=True,
                              create_graph=True)[0]
    
    if u_t is None or u_x is None:
        raise ValueError("u_t or u_x is None. Check the gradient computation.")
    
    u_xx = torch.autograd.grad(u_x, x, 
                               grad_outputs=torch.ones_like(u_x), 
                               retain_graph=True,
                               create_graph=True)[0]
    

    if u_xx is None:
        raise ValueError("u_xx is None. Check the second-order gradient computation.")

    f = u_t + u*u_x - nu * u_xx
    return f

# Boundary computation for boundary loss
def net_b():
    # Left boundary condition: initial condition at t=0
    x_left = torch.linspace(-1, 1, 100).reshape(-1, 1)
    t_left = torch.zeros_like(x_left)
    u_left = -torch.sin(torch.pi * x_left)
    
    # Spatial boundary conditions at x=-1 and x=1 at different time points
    t_bound = torch.linspace(0, 1, 100).reshape(-1, 1)
    x_left_bound = torch.ones_like(t_bound) * (-1)
    x_right_bound = torch.ones_like(t_bound) * (1)
    u_left_bound = torch.zeros_like(t_bound)
    u_right_bound = torch.zeros_like(t_bound)
    
    # Combine all boundary points
    x_b = torch.cat([x_left, x_left_bound, x_right_bound], dim=0)
    t_b = torch.cat([t_left, t_bound, t_bound], dim=0)
    u_b = torch.cat([u_left, u_left_bound, u_right_bound], dim=0)
    
    return x_b, t_b, u_b

In [None]:
##### ALL CODE ######
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from scipy.io import loadmat
import os
import csv

# Ensure reproducibility
torch.manual_seed(123)
np.random.seed(123)

# Detect GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

data = loadmat("burgers_shock.mat")
x = torch.tensor(data['x'], dtype=torch.float32, device=device)
t = torch.tensor(data['t'], dtype=torch.float32, device=device)
usol = torch.tensor(data['usol'], dtype=torch.float32, device=device)

# Kinematic viscosity
nu = 0.01/np.pi

# Plotting function for points
def plot_points(x_b, t_b, x_f, t_f):
    plt.figure(figsize=(10, 6))
    plt.scatter(x_b.cpu().numpy(), t_b.cpu().numpy(), color='red', label='Boundary Points')
    plt.scatter(x_f.cpu().numpy(), t_f.cpu().numpy(), color='blue', label='Collocation Points')
    plt.title('Sampling Points')
    plt.xlabel('x')
    plt.ylabel('t')
    plt.legend()
    plt.tight_layout()
    plt.savefig('sampling_points.png')
    plt.close()

# Collocation points generation
def sample_collocation_points(num_points=10000):
    # Sample uniformly from the domain
    x_f = torch.rand(num_points, 1, device=device) * 2 - 1  # x in [-1, 1]
    t_f = torch.rand(num_points, 1, device=device)  # t in [0, 1]
    return x_f, t_f

# Boundary and initial condition points
def net_b():
    # Left boundary condition: initial condition at t=0
    x_left = torch.linspace(-1, 1, 100).reshape(-1, 1).to(device)
    t_left = torch.zeros_like(x_left)
    u_left = -torch.sin(torch.pi * x_left)
    
    # Spatial boundary conditions at x=-1 and x=1 at different time points
    t_bound = torch.linspace(0, 1, 100).reshape(-1, 1).to(device)
    x_left_bound = torch.ones_like(t_bound) * (-1)
    x_right_bound = torch.ones_like(t_bound) * (1)
    u_left_bound = torch.zeros_like(t_bound)
    u_right_bound = torch.zeros_like(t_bound)
    
    # Combine all boundary points
    x_b = torch.cat([x_left, x_left_bound, x_right_bound], dim=0)
    t_b = torch.cat([t_left, t_bound, t_bound], dim=0)
    u_b = torch.cat([u_left, u_left_bound, u_right_bound], dim=0)
    
    return x_b, t_b, u_b

# FCN class (with small modifications for device)
class FCN(nn.Module):
    def __init__(self, N_INPUT: int, N_OUTPUT: int, N_HIDDEN: int, N_LAYERS: int, device=device):
        super().__init__()
        self.device = device
        self.activation = nn.Tanh
        # self.mu = mu
        
        # Input layer
        self.fcs = nn.Sequential(
            nn.Linear(N_INPUT, N_HIDDEN),  
            self.activation()            
        ).to(device)
        # Hidden layers
        self.fch = self._create_hidden_layers(N_HIDDEN, N_LAYERS).to(device)
        
        # Output layer
        self.fce = nn.Linear(N_HIDDEN, N_OUTPUT).to(device)

        # Xavier initialization
        self._initialize_weights()

    def _create_hidden_layers(self, N_HIDDEN: int, N_LAYERS: int) -> nn.Sequential:
        # Creates hidden layers for the network.
        layers = []
        for _ in range(N_LAYERS - 1):
            layers.append(nn.Linear(N_HIDDEN, N_HIDDEN)) 
            layers.append(self.activation())
            
        return nn.Sequential(*layers)

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    def forward(self, x: torch.Tensor, t: torch.Tensor) -> torch.Tensor:
        input_tensor = torch.cat([x, t], dim=1)
        x = self.fcs(input_tensor)
        x = self.fch(x)
        x = self.fce(x)
        return x
    
    def net_u(self, x: torch.Tensor, t: torch.Tensor):
        return self.forward(x,t)

# Residual Computation [Physics Informed]
def net_f(x, t, nu, model):
    x.requires_grad_(True)
    t.requires_grad_(True)

    u = model.net_u(x, t)

    u_t = torch.autograd.grad(u, t, 
                              grad_outputs=torch.ones_like(u), 
                              retain_graph=True,
                              create_graph=True)[0]
    
    u_x = torch.autograd.grad(u, x, 
                              grad_outputs=torch.ones_like(u), 
                              retain_graph=True,
                              create_graph=True)[0]
    
    u_xx = torch.autograd.grad(u_x, x, 
                               grad_outputs=torch.ones_like(u_x), 
                               retain_graph=True,
                               create_graph=True)[0]

    f = u_t + u*u_x - nu * u_xx
    return f

# Training loop
def train_pinn(model, epochs=20000, learning_rate=1e-3):
    # Prepare data
    x_b, t_b, u_b = net_b()
    x_f, t_f = sample_collocation_points()
    
    # Plot sampling points
    plot_points(x_b, t_b, x_f, t_f)
    
    # Optimizer
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Lists to store loss values
    loss_history = []
    boundary_loss_history = []
    residual_loss_history = []
    
    # Training loop
    for epoch in range(epochs):
        optimizer.zero_grad()
        
        # Boundary loss
        u_pred_b = model.net_u(x_b, t_b)
        loss_b = torch.mean((u_pred_b - u_b)**2)
        
        # Residual loss (Physics Informed)
        f_pred = net_f(x_f, t_f, nu, model)
        loss_f = torch.mean(f_pred**2)
        
        # Total loss
        loss = loss_b + loss_f
        loss.backward()
        optimizer.step()
        
        # Store loss
        loss_history.append(loss.item())
        boundary_loss_history.append(loss_b.item())
        residual_loss_history.append(loss_f.item())
        
        # Print progress
        if epoch % 1000 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item():.4e}, Boundary Loss: {loss_b.item():.4e}, Residual Loss: {loss_f.item():.4e}')
    
    return loss_history, boundary_loss_history, residual_loss_history

# Plot loss vs epochs
def plot_loss(loss_history, boundary_loss_history, residual_loss_history):
    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 3, 1)
    plt.plot(loss_history)
    plt.title('Total Loss vs Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Total Loss')
    plt.yscale('log')
    
    plt.subplot(1, 3, 2)
    plt.plot(boundary_loss_history)
    plt.title('Boundary Loss vs Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Boundary Loss')
    plt.yscale('log')
    
    plt.subplot(1, 3, 3)
    plt.plot(residual_loss_history)
    plt.title('Residual Loss vs Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Residual Loss')
    plt.yscale('log')
    
    plt.tight_layout()
    plt.savefig('loss_vs_epochs.png')
    plt.close()

# Write loss data to CSV
def write_loss_to_csv(loss_history, boundary_loss_history, residual_loss_history):
    with open('loss_data.csv', 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(['Epoch', 'Total Loss', 'Boundary Loss', 'Residual Loss'])
        for epoch, (total, boundary, residual) in enumerate(zip(loss_history, boundary_loss_history, residual_loss_history)):
            csvwriter.writerow([epoch, total, boundary, residual])

# Contour plot
def plot_and_save_contour(model, x, t, usol):
    # Create mesh grid
    X, T = np.meshgrid(x.flatten(), t.flatten())
    
    # Prepare input for prediction
    X_tensor = torch.tensor(X.reshape(-1, 1), dtype=torch.float32, device=device)
    T_tensor = torch.tensor(T.reshape(-1, 1), dtype=torch.float32, device=device)
    
    # Predict
    with torch.no_grad():
        U_pred = model.net_u(X_tensor, T_tensor).cpu().numpy().reshape(X.shape)
    
    # Plotting
    plt.figure(figsize=(12, 5))
    
    # Actual solution
    plt.subplot(1, 2, 1)
    plt.contourf(X, T, usol.T, levels=20, cmap='jet')
    plt.colorbar()
    plt.title('Actual Solution')
    plt.xlabel('x')
    plt.ylabel('t')
    
    # Predicted solution
    plt.subplot(1, 2, 2)
    plt.contourf(X, T, U_pred, levels=20, cmap='jet')
    plt.colorbar()
    plt.title('Predicted Solution')
    plt.xlabel('x')
    plt.ylabel('t')
    
    plt.tight_layout()
    plt.savefig('burgers_contour.png')
    plt.close()

    # Save contours to .dat file
    np.savetxt('burgers_contours.dat', np.column_stack((X.flatten(), T.flatten(), U_pred.flatten())), 
               header='x\tt\tU', comments='')

# Main execution
if __name__ == "__main__":
    # Initialize model on the device
    model = FCN(2, 1, 20, 8, device=device).to(device)
    
    # Train PINN
    loss_history, boundary_loss_history, residual_loss_history = train_pinn(model)
    
    # Plot and save loss data
    plot_loss(loss_history, boundary_loss_history, residual_loss_history)
    write_loss_to_csv(loss_history, boundary_loss_history, residual_loss_history)
    
    # Plot and save contour
    plot_and_save_contour(model, x.cpu(), t.cpu(), usol.cpu())
    
    # Save model
    torch.save(model.state_dict(), 'burgers_pinn_model.pth')
    
    print("Training complete. Check the generated plots, CSV, and .dat files.")

In [None]:
# Another way of computing the boundary conditions
import torch

# Define the neural network (NN)
class NeuralNetwork(torch.nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(2, 50),
            torch.nn.Tanh(),
            torch.nn.Linear(50, 50),
            torch.nn.Tanh(),
            torch.nn.Linear(50, 1)
        )

    def forward(self, x):
        return self.layers(x)

# Initialize NN
nn = NeuralNetwork()

# Define the inputs for each term
# Boundary condition inputs
xj = torch.linspace(0, 1, 100).view(-1, 1).requires_grad_(True)  # x values for boundary condition
tk = torch.zeros(50, 1).requires_grad_(True)  # t values for boundary at x=-1
tl = torch.zeros(50, 1).requires_grad_(True)  # t values for boundary at x=+1

# Interior points for physics-informed loss
Np = 1000  # Number of interior points
xi = torch.rand(Np, 1) * 2 - 1  # Random x values in [-1, 1]
ti = torch.rand(Np, 1)  # Random t values in [0, 1]
input_interior = torch.cat([xi, ti], dim=1)

# Define lambda weights
lambda_1, lambda_2, lambda_3 = 1.0, 1.0, 1.0  # Example weights

# Compute boundary loss L_b
NN_xj_0 = nn(torch.cat([xj, torch.zeros_like(xj)], dim=1))
boundary1 = torch.mean((NN_xj_0 + torch.sin(torch.pi * xj))**2)

NN_minus1_tk = nn(torch.cat([-torch.ones_like(tk), tk], dim=1))
boundary2 = torch.mean((NN_minus1_tk - 0)**2)

NN_plus1_tl = nn(torch.cat([torch.ones_like(tl), tl], dim=1))
boundary3 = torch.mean((NN_plus1_tl - 0)**2)

L_b = lambda_1 * boundary1 + lambda_2 * boundary2 + lambda_3 * boundary3

# Compute physics-informed loss L_p
NN_interior = nn(input_interior)

# Partial derivatives
dNN_dt = torch.autograd.grad(NN_interior, ti, grad_outputs=torch.ones_like(NN_interior), retain_graph=True, create_graph=True)[0]
dNN_dx = torch.autograd.grad(NN_interior, xi, grad_outputs=torch.ones_like(NN_interior), retain_graph=True, create_graph=True)[0]
d2NN_dx2 = torch.autograd.grad(dNN_dx, xi, grad_outputs=torch.ones_like(dNN_dx), retain_graph=True, create_graph=True)[0]

# Physics loss
nu = 0.01  # Viscosity
physics_residual = dNN_dt + NN_interior * dNN_dx - nu * d2NN_dx2
L_p = torch.mean(physics_residual**2)

# Total loss
total_loss = L_b + L_p


In [None]:
import torch
import torch.nn as nn

# Define the neural network architecture
class PINN(nn.Module):
    def __init__(self):
        super(PINN, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(2, 50),  # Input: (x, t)
            nn.Tanh(),
            nn.Linear(50, 50),
            nn.Tanh(),
            nn.Linear(50, 1)   # Output: u(x, t)
        )

    def forward(self, x, t):
        # Concatenate x and t along the last dimension
        input_tensor = torch.cat([x, t], dim=1)
        return self.layers(input_tensor)

# Initialize the PINN
model = PINN()

# Define the boundary and physics-informed losses
def loss_fn(model, xj, tk, tl, xi, ti, nu, lambda_1=1.0, lambda_2=1.0, lambda_3=1.0):
    # Boundary Loss (Lb)
    # u(x, 0) = -sin(pi*x)
    NN_xj_0 = model(xj, torch.zeros_like(xj))
    boundary1 = torch.mean((NN_xj_0 + torch.sin(torch.pi * xj))**2)
    
    # u(-1, t) = 0
    NN_minus1_tk = model(-torch.ones_like(tk), tk)
    boundary2 = torch.mean((NN_minus1_tk - 0)**2)
    
    # u(1, t) = 0
    NN_plus1_tl = model(torch.ones_like(tl), tl)
    boundary3 = torch.mean((NN_plus1_tl - 0)**2)

    L_b = lambda_1 * boundary1 + lambda_2 * boundary2 + lambda_3 * boundary3

    # Physics-Informed Loss (Lp)
    NN_interior = model(xi, ti)
    
    # Compute derivatives using autograd
    dNN_dt = torch.autograd.grad(NN_interior, ti, grad_outputs=torch.ones_like(NN_interior), retain_graph=True, create_graph=True)[0]
    dNN_dx = torch.autograd.grad(NN_interior, xi, grad_outputs=torch.ones_like(NN_interior), retain_graph=True, create_graph=True)[0]
    d2NN_dx2 = torch.autograd.grad(dNN_dx, xi, grad_outputs=torch.ones_like(dNN_dx), retain_graph=True, create_graph=True)[0]

    # Physics residual
    physics_residual = dNN_dt + NN_interior * dNN_dx - nu * d2NN_dx2
    L_p = torch.mean(physics_residual**2)

    # Total loss
    return L_b + L_p

# Training parameters
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
nu = 0.01  # Viscosity
epochs = 5000

# Generate training data
xj = torch.linspace(-1, 1, 100).view(-1, 1).requires_grad_(True)  # Boundary x points
tk = torch.rand(50, 1).requires_grad_(True)  # Boundary t points
tl = torch.rand(50, 1).requires_grad_(True)  # Boundary t points

xi = torch.rand(1000, 1) * 2 - 1  # Interior x points in [-1, 1]
ti = torch.rand(1000, 1)  # Interior t points in [0, 1]

# Training loop
for epoch in range(epochs):
    optimizer.zero_grad()
    loss = loss_fn(model, xj, tk, tl, xi, ti, nu)
    loss.backward()
    optimizer.step()

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# After training, you can evaluate the model at desired points (x, t)
