<a href="https://colab.research.google.com/github/KatHaruto/PINNs_example/blob/main/SFHCPINN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

difference between $SFHCPINN_{NN}$ and $SFHCPINN$ is whether distance function adn extension functino are numerically determined , or computed using NN that trained before PINNs training.

In [2]:
import sys
sys.version

'3.11.11 (main, Dec  4 2024, 08:55:07) [GCC 11.4.0]'

In [3]:
from functools import partial
from typing import Literal,TypeAlias, Protocol # type alias statement requires Python 3.12 or newer
import warnings

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from scipy.stats import qmc


In [4]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [5]:


SAMPLING_STRATEGY: TypeAlias = Literal["uniform", "LHS", "Sobol"] # todo: "adaptive"


In [6]:
class Sin(nn.Module):
  def __init__(self):
    super(Sin, self).__init__()
  def forward(self, x):
    return torch.sin(x)

class FourierFeatureLayer(nn.Module):
    def __init__(self,input_dim, hidden_size, k):
      super(FourierFeatureLayer, self).__init__()
      self.k = k
      self.w1 = nn.Linear(input_dim, hidden_size)
      self.w2 = nn.Linear(input_dim, hidden_size)

    def forward(self, x):
      x1 = self.w1(self.k * x)
      x2 = self.w2(self.k * x)
      return torch.cat([torch.cos(x1), torch.sin(x2)] , dim=1)

class PINN(nn.Module):
  def __init__(self, *, input_dim, output_dim, hidden_size_list, frequency=None):
    super(PINN, self).__init__()
    layers = []

    for i, hidden_size in enumerate(hidden_size_list):
      if i == 0 and frequency is not None:
        # first frequency fourier feature mapping
        layers.append(FourierFeatureLayer(input_dim, hidden_size, frequency))
        input_dim = hidden_size * 2
      else:
        layers.append(nn.Linear(input_dim, hidden_size))
        layers.append(Sin())
        input_dim = hidden_size
    layers.append(nn.Linear(input_dim, output_dim))

    self.layers = nn.Sequential(*layers)



  def forward(self, x):
    return self.layers(x)


class SFPINN(nn.Module):
  def __init__(self, input_dim, output_dim,  subn_hidden_size_list: list[list[int]], frequencies: list[int]):
    super(SFPINN, self).__init__()
    if len(subn_hidden_size_list) != len(frequencies):
      raise ValueError("subn_hidden_size_list and frequencies must have the same length")

    self.subnetworks = nn.ModuleList(
        [
          PINN(
            input_dim=input_dim,
            output_dim=output_dim,
            hidden_size_list=hidden_size_list,
            frequency=freq
          ) for hidden_size_list,freq in zip(subn_hidden_size_list, frequencies)
        ]
    )

    self.w = nn.Linear(output_dim * len(frequencies), output_dim)



  def forward(self, x):
    out = torch.cat([subn(x) for subn in self.subnetworks], dim=1)
    return self.w(out)

# one dimensional ADE

## Governing Equation
$$
\frac{\partial u(x,t)}{\partial t}  = \tilde{p} \frac{\partial^2u(x,t)}{\partial t} - \tilde{q} \frac{\partial u(x,t)}{\partial x} + f(x,t), for\,x \in [a,b]\, and\, t \in [t_0,T]
$$

### I/C and B/C
$$
u(x,t_0) = A_0(x), \\
u(a,t) = A_1(t), \\
u(b,t) = A_2(t), \\
\frac{\partial u(a,t)}{\partial x} = N_1(t), \\
\frac{\partial u(b,t)}{\partial x} = N_2(t)
$$

### Solution is folloing equation
$$
u(x,t) =  e^{-\alpha t}[sin(\pi x) + 0.1 sin(\beta \pi x)]
$$

In [7]:
p_hat = 0.02
q_hat = 0.01

Omega = (0., 2.)
T = (0.,5.)

alpha = 0.1
beta = 30

In [16]:
def U(xt):
  x,t = xt[:,0], xt[:,1]
  return torch.exp(-alpha * t) * (torch.sin(torch.pi * x) + 0.1 * torch.sin(beta * torch.pi * x))

def du_dt(xt):
  return -alpha * U(xt)

def du_dx(xt):
  x,t = xt[:,0], xt[:,1]
  return torch.exp(-alpha * t) * (torch.pi * torch.cos(torch.pi * x) + 0.1 * beta * torch.pi * torch.cos(beta * torch.pi * x))

def d2u_dx2(xt):
  x,t = xt[:,0], xt[:,1]
  return torch.exp(-alpha * t) * (- torch.pi**2 * torch.sin(torch.pi * x) - 0.1 * beta**2 * torch.pi**2 * torch.sin(beta * torch.pi * x))

def D(xt):
  x,t = xt[:,0], xt[:,1]
  return x * (2-x) * t / ( 5 * 2**2)

def G(xt):
  x,t = xt[:,0], xt[:,1]
  return torch.sin(torch.pi * x) + 0.1 * torch.sin(beta * torch.pi * x)

In [9]:
epoch = 50_000
N_r = 8_000

## SFHCPINN does not need initial/boundary points.
# N_b = 4_000
# N_i = 3_000

initial_lr = 0.01
decay_rate = 0.975
decay_step_size = 100

gamma_0 = 20




In [10]:
class Sampler(Protocol):
  def __init__(self,dim: int): ...
  def sample(self, boundary, num): ...

class UniformSampler:
  def __init__(self,dim):
    self.dim = dim
  def sample(self, boundary:list[list[int]], num:int):
    return np.random.uniform(size=(num, self.dim)) * (boundary[:,1] - boundary[:,0]) + boundary[:,0]

class LHSSampler:
  def __init__(self,dim):
    self.dim = dim
    self.sampler = qmc.LatinHypercube(d=dim)
  def sample(self, boundary:list[list[int]], num:int):
    return self.sampler.random(num) * (boundary[:,1] - boundary[:,0]) + boundary[:,0]

class SobolSampler:
  def __init__(self,dim):
    self.dim = dim
    self.sampler = qmc.Sobol(d=dim, scramble=True)

  def sample(self, boundary:list[list[int]], num:int):
    if not np.log2(num).is_integer():
      warnings.warn("number of samples should be power of 2")
    return self.sampler.random(num) * (boundary[:,1] - boundary[:,0]) + boundary[:,0]


In [11]:
def gen_sample(sampling_num:int, boundary: list, strategy: SAMPLING_STRATEGY):
  def _get_sampler(strategy: SAMPLING_STRATEGY, dim) -> Sampler:
    if strategy == "uniform":
      return UniformSampler(dim)
    if strategy == "LHS":
      return LHSSampler(dim)
    if strategy == "Sobol":
      return SobolSampler(dim)
    raise ValueError(f"Unknown sampling strategy: {strategy}")

  dim = len(boundary)
  return torch.from_numpy(_get_sampler(strategy, dim).sample(boundary, sampling_num)).to(torch.float)

In [12]:
def generate_collocation_data(N,boundary,gen_sample_func):
  return gen_sample_func(N,boundary)

In [13]:
def compute_penalty(gamma_0, current_epoch, total_epoch):
  if current_epoch < 0.1 * total_epoch:
    return gamma_0
  if current_epoch < 0.2 * total_epoch:
    return 10 * gamma_0
  if current_epoch < 0.25 * total_epoch:
    return 50 * gamma_0
  if current_epoch < 0.5 * total_epoch:
    return 100 * gamma_0
  if current_epoch < 0.75 * total_epoch:
    return 200 * gamma_0
  return 500 * gamma_0

In [17]:


def train(model, boundary, pde_residual, optimizer, scheduler, epoch,sampling_stragtegy='uniform'):
  gen_sample_func = partial(gen_sample, strategy=sampling_stragtegy)
  losses = []
  for e in tqdm(range(epoch)):
    model.train()
    optimizer.zero_grad()

    xt = generate_collocation_data(N_r, boundary,gen_sample_func).to(device)

    assert xt.size() == torch.Size([N_r, 2])
    xt_ = xt.clone().detach().requires_grad_(True)
    out = model(xt_)
    g = G(xt_)
    d = D(xt_)
    u = g + d * out
    #print(f"u: {u.mean().item()} g: {g.mean().item()} d: {d.mean().item()}")
    res = pde_residual(u,xt_,e)
    #print(f" res mean: {torch.mean(res).item()}, res sqaured mean: {torch.mean(torch.square(res)).item()}")
    loss = torch.mean(torch.square(res))
    loss.backward()

    optimizer.step()
    scheduler.step()
    losses.append(loss.item())

    if e % 1000 == 0:
      print(f"epoch: {e}, loss: {loss.item():.5f}")
      with torch.no_grad():
        model.eval()
        test_xt = generate_collocation_data(10_000, boundary,gen_sample_func).to(device)
        test_out = model(xt)
        g = G(xt_)
        d = D(xt_)
        u_pred = g + d * out

        u_true = U(xt)
        mse = torch.mean(torch.square(u_pred - u_true))
        rel = torch.square(u_pred - u_true).sum() / torch.square(u_true).sum()
        print(f"MSE: {mse.item():.5f}")
        print(f"rel: {rel.item():.5f}")



  return losses[-1]




In [18]:
boundary = np.array([[0.,2.],[0.,5.]])


def source_fn(xt):
    return du_dt(xt) - p_hat * d2u_dx2(xt) + q_hat * du_dx(xt)

def pde_residual(u,X, current_epoch):
    #X_pde = X
    # X_pde = X.clone().detach().requires_grad_(True)
    grad_u = torch.autograd.grad(u, X, torch.ones_like(u), create_graph=True)[0]
    du_dx = grad_u[:,0:1]
    du_dt = grad_u[:,1:2]

    # 2次微分
    d2u_dx2 = torch.autograd.grad(du_dx, X, torch.ones_like(du_dx), create_graph=True)[0][:,0:1]

    S_val = source_fn(X)

    # cのPDE残差
    res_c = du_dt - p_hat * d2u_dx2 + q_hat * du_dx - S_val

    return res_c

number_of_subnetworks = 20
# model = PINN(input_dim=2,output_dim=1, hidden_size_list=[100,150,80,80,50])
model = SFPINN(input_dim=2,output_dim=1,subn_hidden_size_list=[[10,25,20,20,10] for _ in range(number_of_subnetworks)],frequencies=list(range(1,21)))
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=initial_lr)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=decay_step_size, gamma=decay_rate)


train(model, boundary, pde_residual, optimizer, scheduler, epoch)

  0%|          | 0/50000 [00:00<?, ?it/s]

epoch: 0, loss: 10136254464.00000


  0%|          | 1/50000 [00:05<80:36:11,  5.80s/it]

MSE: 0.02897
rel: 724.41663


  0%|          | 98/50000 [08:03<68:22:04,  4.93s/it]


KeyboardInterrupt: 

In [None]:
# # plot
# import matplotlib.pyplot as plt

# xt = generate_collocation_data(N_r, boundary,partial(gen_sample, strategy='Sobol'))
# xx,tt = torch.meshgrid(xt[:,0],xt[:,1])
# d = D(xt).unsqueeze(0).detach().cpu().numpy()
# plt.scatter(xt[:,0],xt[:,1], c=d)
# plt.colorbar()
# plt.xlabel('x')
# plt.ylabel('t')
# plt.title('D(x,t)')
# plt.show()