In [1]:
!git clone https://github.com/broccubali/NoisyICML.git

fatal: destination path 'NoisyICML' already exists and is not an empty directory.


In [2]:
!pip install pyDOE



In [3]:
import torch
import torch.nn as nn
import numpy as np
from scipy.io import loadmat
from tqdm import tqdm
from pyDOE import lhs
import torch.optim as optim

In [4]:
# class PINN(nn.Module):
#     def __init__(self, input_size, hidden_size, output_size):
#         super(PINN, self).__init__()
#         self.layers = nn.ModuleList(
#             [
#                 nn.Linear(input_size if i == 0 else hidden_size, hidden_size)
#                 if i % 2 == 0
#                 else nn.Tanh()
#                 for i in range(10)
#             ]
#         )
#         self.layers.append(nn.Linear(hidden_size, output_size))

#         # Trainable parameter for the wave number squared (k^2)
#         self.k2 = nn.Parameter(torch.tensor([30.0], dtype=torch.float32, device="cuda"))

#         # Optimizer
#         self.optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
#         self.optimizer.param_groups[0]["params"].append(self.k2)

#         self.loss = nn.MSELoss()

#     def forward(self, x):
#         for layer in self.layers:
#             x = layer(x)
#         return x

#     def loss_fn(self, x, u):
#         u_pred = self.forward(x)
#         return self.loss(u_pred, u)

#     # def residual_loss(self, xtrain):
#     #     g = xtrain.clone()
#     #     g.requires_grad = True
#     #     u_pred = self.forward(g)

#     #     # Compute gradients
#     #     u_grad = torch.autograd.grad(
#     #         u_pred, g, torch.ones_like(u_pred), retain_graph=True, create_graph=True
#     #     )[0]
#     #     u_lap = torch.autograd.grad(
#     #         u_grad, g, torch.ones_like(u_grad), create_graph=True
#     #     )[0].sum(dim=1, keepdim=True)

#     #     # Residual form of the Helmholtz equation
#     #     residual = u_lap + self.k2 * u_pred
#     #     return self.loss(residual, torch.zeros_like(residual))

#     def residual_loss(self, xtrain, fhat):
#         g = xtrain.clone()
#         g.requires_grad = True
#         u_pred = self.forward(g)

#         # Compute gradients
#         u_grad = torch.autograd.grad(
#             u_pred, g, torch.ones_like(u_pred), create_graph=True, retain_graph=True
#         )[0]
#         u_xx = torch.autograd.grad(
#             u_grad[:, [0]], g, torch.ones_like(u_grad[:, [0]]), create_graph=True
#         )[0][:, [0]]
#         u_tt = torch.autograd.grad(
#             u_grad[:, [1]], g, torch.ones_like(u_grad[:, [1]]), create_graph=True
#         )[0][:, [1]]

#         # Residual calculation
#         residual = u_xx + u_tt + self.k2 * u_pred - fhat
#         return torch.mean(residual**2)  # Mean squared error for the residual


#     def total_loss(self, xtrain, utrain):
#         alpha_female = 10.0
#         data_loss = self.loss_fn(xtrain, utrain)  # Match observed data
#         physics_loss = self.residual_loss(xtrain)  # Enforce governing equations
#         return data_loss + alpha_female* physics_loss

#     def train_model(self, xtrain, utrain, epochs=10000):
#         for epoch in tqdm(range(epochs)):
#             self.optimizer.zero_grad()
#             loss = self.total_loss(xtrain, utrain)
#             loss.backward()
#             self.optimizer.step()

#             # Logging
#             if epoch % 1000 == 0:
#                 print(
#                     f"Epoch {epoch}, Loss {loss.item()}, "
#                     f"k^2 (Wave Number Squared) {self.k2.item()}"
#                 )

In [38]:
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from sklearn.preprocessing import StandardScaler
import torch.nn.init as init

class PINN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(PINN, self).__init__()

        # Define network layers with alternating Linear and Tanh
        self.layers = nn.ModuleList(
            [
                nn.Linear(input_size if i == 0 else hidden_size, hidden_size)
                if i % 2 == 0
                else nn.Tanh()
                for i in range(20)
            ]
        )
        self.layers.append(nn.Linear(hidden_size, output_size))  # Final output layer

        for layer in self.layers:
            if isinstance(layer, nn.Linear):
                init.xavier_normal_(layer.weight)
                if layer.bias is not None:
                    init.zeros_(layer.bias)

        # Trainable parameter for wave number squared (k²)
        self.k2 = nn.Parameter(torch.tensor([25.0], dtype=torch.float32, device="cuda"))

        # Loss function
        self.loss = nn.MSELoss()

        # Optimizer and scheduler
        self.physics_optimizer = Adam(self.parameters(), lr=1e-2)
        self.loss_optimizer = Adam(self.parameters(), lr=1e-2)
        self.physics_optimizer.param_groups[0]["params"].append(self.k2)
        self.loss_optimizer.param_groups[0]["params"].append(self.k2)
        self.scheduler_physics = StepLR(self.physics_optimizer, step_size=1000, gamma=0.5)  # Adjusted for frequent updates
        self.scheduler_loss = StepLR(self.loss_optimizer, step_size=1000, gamma=0.5)  # Adjusted for frequent updates

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

    def loss_fn(self, x, u):
        u_pred = self.forward(x)
        return self.loss(u_pred, u)

    def residual_loss(self, xtrain):
        # Enable gradient computation for second-order derivatives
        g = xtrain.clone().requires_grad_(True)
        u_pred = self.forward(g)

        # first-order derivatives
        u_grad = torch.autograd.grad(
            u_pred, g, torch.ones_like(u_pred), create_graph=True, retain_graph=True
        )[0]

        # second-order derivatives
        u_xx = torch.autograd.grad(
            u_grad[:, 0], g, torch.ones_like(u_grad[:, 0]), create_graph=True, retain_graph=True
        )[0][:, [0]]
        u_tt = torch.autograd.grad(
            u_grad[:, 1], g, torch.ones_like(u_grad[:, 1]), create_graph=True, retain_graph=True
        )[0][:, [1]]

        # helmholtz residual
        fhat = -self.k2 * torch.sin(torch.sqrt(self.k2) * g[:,[0]]) * torch.sin(torch.sqrt(self.k2) * g[:, [1]])
        # print(fhat)
        residual = u_xx + u_tt + self.k2 * u_pred - fhat
        return torch.mean(residual**2)

    def total_loss(self, xtrain, utrain):
        data_loss = self.loss_fn(xtrain, utrain)
        physics_loss = self.residual_loss(xtrain)
        # print("data loss", data_loss, "physics loss", physics_loss)
        # Reduced physics loss weight for better stability
        return data_loss + 100 * physics_loss

    def train_model(self, xtrain, utrain, epochs=10000):
        for epoch in tqdm(range(epochs)):
            self.loss_optimizer.zero_grad()
            self.physics_optimizer.zero_grad()
            loss = self.loss_fn(xtrain, utrain)
            # print("data loss", loss.item())
            loss.backward()
            self.loss_optimizer.step()
            loss = self.residual_loss(xtrain)
            # print("physics loss", loss.item())
            loss.backward()
            self.physics_optimizer.step()
            # Gradient clipping to stabilize training
            # torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0)
            self.physics_optimizer.step()
            self.loss_optimizer.step()
            self.scheduler_physics.step()
            self.scheduler_loss.step()
            # self.scheduler.step()

            if epoch % 1000 == 0:
                print(f"Epoch {epoch}, Loss: {loss.item():.6f}, k²: {self.k2.item():.6f}")



In [37]:
u = np.load("/content/NoisyICML/pinns-inverse/Helmholtz/helmholtz_solution.npy") # BOB THE BUILDER
x = np.load("/content/NoisyICML/pinns-inverse/Helmholtz/x_coordinate.npy")  # BOB THE BUILDER
t = np.load("/content/NoisyICML/pinns-inverse/Helmholtz/t_coordinate.npy")

In [39]:
x = torch.tensor(x, dtype=torch.float32)
t = torch.tensor(t, dtype=torch.float32)
u = torch.tensor(u, dtype=torch.float32).T

In [40]:
X, T = np.meshgrid(x, t)
xtrue = np.hstack((X.flatten()[:, None], T.flatten()[:, None]))
utrue = u.flatten()[:, None]

In [41]:
X.shape, T.shape, u.shape

((100, 256), (100, 256), torch.Size([100, 256]))

In [42]:
idx = np.random.choice(xtrue.shape[0], 10000, replace=False)
xtrain = xtrue[idx, :]
utrain = u.flatten()[idx][:, None]

In [43]:
utrain.shape, xtrain.shape

(torch.Size([10000, 1]), (10000, 2))

In [44]:
device = torch.device("cuda")
Xtrain = torch.tensor(xtrain, dtype=torch.float32, device=device)
Xtrue = torch.tensor(xtrue, dtype=torch.float32, device=device)
Utrain = utrain.to(device)

In [45]:
model = PINN(input_size=2, hidden_size=30, output_size=1).to(device)
model.train_model(Xtrain, Utrain, epochs=10000)

  0%|          | 11/10000 [00:00<03:18, 50.21it/s]

Epoch 0, Loss: 474.864899, k²: 24.977310


 10%|█         | 1008/10000 [00:22<03:21, 44.64it/s]

Epoch 1000, Loss: 21.519346, k²: 8.801414


 20%|██        | 2009/10000 [00:43<02:41, 49.52it/s]

Epoch 2000, Loss: 0.272653, k²: 1.451031


 30%|███       | 3009/10000 [01:05<02:19, 49.96it/s]

Epoch 3000, Loss: 0.014951, k²: 0.672559


 40%|████      | 4007/10000 [01:27<02:11, 45.72it/s]

Epoch 4000, Loss: 0.003180, k²: 0.480523


 50%|█████     | 5006/10000 [01:48<02:01, 41.11it/s]

Epoch 5000, Loss: 0.001198, k²: 0.392909


 60%|██████    | 6006/10000 [02:10<01:19, 50.37it/s]

Epoch 6000, Loss: 0.000606, k²: 0.335920


 70%|███████   | 7005/10000 [02:32<01:01, 48.50it/s]

Epoch 7000, Loss: 0.000348, k²: 0.293149


 79%|███████▉  | 7897/10000 [02:51<00:45, 46.09it/s]


KeyboardInterrupt: 