<a href="https://colab.research.google.com/github/bshi04/deep-learning-fa-2025/blob/main/Github_PINNs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In this project, we solve the steady-state 2D convection-diffusion equation on a unit square domain, discretized using a $400 \times 400$ grid. The solution is governed by the following equation,
$$
-\varepsilon \nabla^2 u(x,y) + \mathbf{U}\cdot\nabla u(x,y) = f(x,y),
$$
where $u(x,y)$ is the scalar field of interest, $\varepsilon$ denotes thed iffusion coefficient, $\mathbf{U}$ is the constant advection velocity vector, and $f(x,y)$ is a spatially varying source term.

Reference solutions are generated using a custom finite-difference solver implemented in Python. The diffusion operator is approximated using a second-order central-difference scheme to ensure high spatial accuracy. A Dirichlet boundary condition,
$$
u = 0,
$$
is imposed on the left and right walls of the domain. A Neumann (zero-flux) boundary condition,
$$
\frac{\partial u}{\partial n} = 0,
$$
is applied on all remaining boundaries. For the PINN experiments, collocation points are sampled throughout the same square domain. The governing PDE is enforced at each collocation point, and the resulting predictions are compared against the numerically computed fine-mesh solution to evaluate model accuracy and convergence behavior.


### 1a. Generate Reference Data

Defines relevant constants and paramters.

In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from scipy.sparse import lil_matrix, csc_matrix
from scipy.sparse.linalg import spsolve

# Initialize coefficients
U = 0.1
eps = 0.01

#  grid
Nx, Ny = 400, 400
Lx, Ly = 1.0, 1.0

# variable source
s0 = 0.1
beta = 1.0

# f(x,y) decreasing in y
def source_fn(y):
    if isinstance(y, torch.Tensor):
        return s0 * torch.exp(-beta * y) # supports GPU
    return s0 * np.exp(-beta * y) # supports CPU

# PINN params
num_epochs = 10000

### 1b. Generate Reference Data using Finite Difference Method

Finite-difference solver to generate the reference solution, `U_ref`, and the grid coordinates `X`, `Y`.

In [None]:
# Finite Difference
def fd_solver(U: float, eps: float, Nx: int, Ny: int, Lx: float, Ly: float):
    """Compute finite difference solution on specified grid.

    Args:
        U (float): Convection coefficient.
        eps (float): Difffusion coefficient.
        Nx (int): number of grid points in x-direction.
        Ny (int): number of grid points in y-direction.
        Lx (float): length of domain in x-direction.
        Ly (float): length of domain in y-direction.

    Returns:
        dict: X-coords, Y-coords, and solution u on the grid.
    """
    hx, hy = Lx / Nx, Ly / Ny
    x = np.linspace(0.0, Lx, Nx + 1)
    y = np.linspace(0.0, Ly, Ny + 1)

    # map (i,j) to global index
    def idx(i, j):
        return j * (Nx + 1) + i

    Ntot = (Nx + 1) * (Ny + 1)
    A = lil_matrix((Ntot, Ntot))
    f_hat = np.zeros(Ntot)

    # interior points
    for j in range(1, Ny):
        for i in range(1, Nx):
            p = idx(i, j)

            # convection term: U * u_x (central difference)
            A[p, idx(i + 1, j)] += U / (2.0 * hx)
            A[p, idx(i - 1, j)] -= U / (2.0 * hx)

            # diffusion term: -eps * (u_xx + u_yy)
            A[p, idx(i + 1, j)] += -eps / hx**2
            A[p, idx(i - 1, j)] += -eps / hx**2
            A[p, p] += 2.0 * eps / hx**2

            # u_yy part
            A[p, idx(i, j + 1)] += -eps / hy**2
            A[p, idx(i, j - 1)] += -eps / hy**2
            A[p, p] += 2.0 * eps / hy**2

            # source term f(y_j)
            f_hat[p] = source_fn(y[j])

    # left wall: Dirichlet u=0
    for j in range(Ny + 1):
        p = idx(0, j)
        A[p, :] = 0.0
        A[p, p] = 1.0
        f_hat[p] = 0.0

    # right wall: Dirichlet u=0
    for j in range(Ny + 1):
        p = idx(Nx, j)
        A[p, :] = 0.0
        A[p, p] = 1.0
        f_hat[p] = 0.0

    # bottom: Neumann du/dy=0
    for i in range(1, Nx):
        p = idx(i, 0)
        A[p, :] = 0.0
        A[p, idx(i, 0)] = -1.0 / hy
        A[p, idx(i, 1)] =  1.0 / hy
        f_hat[p] = 0.0

    # top: Neumann du/dy=0
    for i in range(1, Nx):
        p = idx(i, Ny)
        A[p, :] = 0.0
        A[p, idx(i, Ny - 1)] = -1.0 / hy
        A[p, idx(i, Ny)]     =  1.0 / hy
        f_hat[p] = 0.0

    u = spsolve(csc_matrix(A), f_hat).reshape((Ny + 1, Nx + 1))
    X, Y = np.meshgrid(x, y)

    return {"X": X, "Y": Y, "u": u}

### 1c. PINN Architectures

In [None]:
class MLP(nn.Module):
    """Generic MLP used as a building block for advection and diffusion subnetworks."""
    def __init__(
        self,
        in_dim=2,
        out_dim=1,
        hidden_dim=64,
        num_hidden_layers=6,
        act_fn=nn.Tanh(), # additional experiments modify silu
    ):
        super().__init__()
        layers = []
        layers.append(nn.Linear(in_dim, hidden_dim))
        layers.append(act_fn)
        for _hidden in range(num_hidden_layers - 1):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(act_fn)

        layers.append(nn.Linear(hidden_dim, out_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)

# vanilla pinn
class PINN(nn.Module):
    """Standard fully connected PINN network to approximate u(x, y)."""
    def __init__(self,
                 in_dim=2,
                 out_dim=1,
                 hidden_dim=64,
                 num_hidden_layers=6):
        super().__init__()
        layers = []
        layers.append(nn.Linear(in_dim, hidden_dim))
        layers.append(nn.Tanh())

        for _hidden in range(num_hidden_layers - 1):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.Tanh())
        layers.append(nn.Linear(hidden_dim, out_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x) # x: [N, 2] = (x, y)

# gated approach
class SplitPINN(nn.Module):
    """Two-branch PINN to seperately capture advection and diffusion terms.
       Contributions are re-coupled through a parallel gating branch.
    """
    def __init__(
        self,
        in_dim=2,
        out_dim=1,
        hidden_dim=45,
        num_hidden_layers=6,
        silu_bool=True,
    ):
        super().__init__()

        # defaults to silu split on adv branch due to improved convergence
        if silu_bool:
            self.adv_net  = MLP(in_dim, out_dim, hidden_dim, num_hidden_layers, act_fn=nn.SiLU())
        else:
            self.adv_net  = MLP(in_dim, out_dim, hidden_dim, num_hidden_layers)

        self.diff_net = MLP(in_dim, out_dim, hidden_dim, num_hidden_layers)

        # small subnet to learn fusion gate
        self.gate_net = nn.Sequential(
            nn.Linear(2, 20),
            nn.Tanh(),
            nn.Linear(20, 1),
            nn.Sigmoid()
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        u_adv  = self.adv_net(x)
        u_diff = self.diff_net(x)
        gate = self.gate_net(x)

        u = (1 - gate) * u_diff + gate * u_adv
        return u

# addition re-coupling approach
class AddSplitPINN(nn.Module):
    """Two-branch PINN to seperately capture advection and diffusion terms.
       Contributions are re-coupled via addition.
    """
    def __init__(
        self,
        in_dim=2,
        out_dim= 1,
        hidden_dim=45,
        num_hidden_layers=6,
    ):
        super().__init__()
        self.adv_net  = MLP(in_dim, out_dim, hidden_dim, num_hidden_layers)
        self.diff_net = MLP(in_dim, out_dim, hidden_dim, num_hidden_layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        u_adv  = self.adv_net(x)
        u_diff = self.diff_net(x)
        u = u_adv + u_diff
        return u


# feature mixing approach
class MixedPINN(nn.Module):
  """Advection and diffusion branches each produce feature vectors that are concatenated
     and mapped to the scalar output u through a final linear layer.
  """
  def __init__(
        self,
        in_dim=2,
        out_dim=1,
        hidden_dim=44,
        num_hidden_layers=6,
    ):
        super().__init__()

        self.adv_features  = MLP(
            in_dim=in_dim,
            out_dim=10,
            hidden_dim=hidden_dim,
            num_hidden_layers=num_hidden_layers,
            act_fn=nn.SiLU()
        )

        self.diff_features = MLP(
            in_dim=in_dim,
            out_dim=10,
            hidden_dim=hidden_dim,
            num_hidden_layers=num_hidden_layers,
        )

        self.final_layer = nn.Linear(20, 1)

  def forward(self, x: torch.Tensor) -> torch.Tensor:
        f_adv  = self.adv_features(x)
        f_diff = self.diff_features(x)
        combined = torch.cat([f_adv, f_diff], dim=1) # feature vectors concatenated
        return self.final_layer(combined)
