In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [3]:
# definition of the PINN neural network
class PINN(nn.Module):
    def __init__(self, layers: list, activation: str = 'tanh'):
        super(PINN, self).__init__()
        self.net = nn.Sequential()
        for i in range(len(layers) - 1):
            self.net.add_module(f"layer_{i}", nn.Linear(layers[i], layers[i+1]))
            if i < len(layers) - 2:
                if activation.lower() in ['tanh', 'sigmoid', 'relu', 'softplus', 'sin']:
                    if activation.lower() == 'tanh':
                        self.net.add_module(f"activation_{i}", nn.Tanh())
                    elif activation.lower() == 'sigmoid':
                        self.net.add_module(f"activation_{i}", nn.Sigmoid())
                    elif activation.lower() == 'relu':
                        self.net.add_module(f"activation_{i}", nn.ReLU())
                    elif activation.lower() == 'softplus':
                        self.net.add_module(f"activation_{i}", nn.Softplus())
                    elif activation.lower() == 'sin':
                        class Sin(nn.Module):
                            def forward(self, x):
                                return torch.sin(x)
                        self.net.add_module(f"activation_{i}", Sin())
                else:
                    try:
                        self.net.add_module(f"activation_{i}", nn.__dict__[activation]())
                    except KeyError:
                        raise ValueError(f"Activation function '{activation}' is not recognized.")
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)

In [4]:
class TrainParams:
    def __init__(self, optimizer: torch.optim.Optimizer, activation: str, ic: list, layers: list):
        self.optimizer = optimizer
        self.activation = activation
        self.x_ic = ic[0]
        self.y_ic = ic[1]
        self.layers = layers
    def write_results(self, epochs: np.ndarray, loss: np.ndarray, time: float):
        self.epochs = epochs
        self.loss = loss
        self.time = time
    def write_results_long(self, epochs_arr: np.ndarray, time_arr: np.ndarray):
        self.epochs_long = epochs_arr
        self.time_long = time_arr

In [5]:
def ode_residual(x: torch.Tensor, y_pred: torch.Tensor) -> torch.Tensor:
    # Example: dy/dx + y = 0
    y_pred_x = torch.autograd.grad(y_pred, x, grad_outputs=torch.ones_like(y_pred), create_graph=True)[0]
    return y_pred_x + y_pred

In [8]:
# training loop function
def train(params: TrainParams, ode_residual: callable, length: str = 'flash', save_results: bool = False):
    # hyperparameters
    model = PINN(params.layers, params.activation)
    optimizer = params.optimizer(model.parameters(), lr=1e-3)
    
    # define number of epochs based on length
    if length == 'flash':
        epochs = 2500
    elif length == 'standard':
        epochs = 10000
    elif length == 'long':
        epochs = 30000
    else:
        raise ValueError("Length must be 'flash', 'standard', or 'long'.")

    # initial condition
    x_ic = torch.tensor([[params.x_ic]])
    y_ic = torch.tensor([[params.y_ic]]) 

    # training data (collocation points)
    x_colloc = torch.linspace(0, 1, 100).view(-1, 1)
    x_colloc.requires_grad = True

    if save_results:
        epochs_record = np.zeros(epochs)
        loss_record = np.zeros(epochs)
        if length == 'long':
            epochs_arr = np.zeros(epochs // 100)
            time_arr = np.zeros(epochs // 100)

    time_start = time.time()
    for epoch in range(epochs):
        optimizer.zero_grad()
        y_pred = model(x_colloc)
        res = ode_residual(x_colloc, y_pred)
        loss_ode = torch.mean(res**2)

        # Initial condition loss
        y_ic_pred = model(x_ic)
        loss_ic = torch.mean((y_ic_pred - y_ic)**2)

        loss = loss_ode + loss_ic
        loss.backward()
        optimizer.step()

        if save_results:
            epochs_record[epoch] = epoch
            loss_record[epoch] = loss.item()
        
        if length == 'long':
            if epoch % 100 == 0:
                epochs_arr[epoch // 100] = epoch
                time_arr[epoch // 100] = time.time() - time_start
                
        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item()}, Time elapsed: {time.time() - time_start:.2f}s")
    time_end = time.time()
    time_elapsed = time_end - time_start
    print(f"Training completed in {time_elapsed:.2f}s")


In [9]:
params_test = TrainParams(
    optimizer=optim.Adam,
    activation='Softmax',
    ic=[0.0, 1.0],  # Example: y(0) = 1
    layers=[1, 20, 20, 20, 1]
)
train(params_test, ode_residual, length='standard', save_results=True)

Epoch 0, Loss: 0.855414092540741, Time elapsed: 0.00s
Epoch 100, Loss: 0.6000888347625732, Time elapsed: 0.21s
Epoch 200, Loss: 0.5174604654312134, Time elapsed: 0.38s
Epoch 300, Loss: 0.4989248514175415, Time elapsed: 0.56s
Epoch 400, Loss: 0.48416608572006226, Time elapsed: 0.73s
Epoch 500, Loss: 0.4253869950771332, Time elapsed: 0.91s
Epoch 600, Loss: 0.2541835308074951, Time elapsed: 1.15s
Epoch 700, Loss: 0.11364398896694183, Time elapsed: 1.32s
Epoch 800, Loss: 0.07555553317070007, Time elapsed: 1.51s
Epoch 900, Loss: 0.054641786962747574, Time elapsed: 1.68s
Epoch 1000, Loss: 0.03945804014801979, Time elapsed: 1.85s
Epoch 1100, Loss: 0.028378108516335487, Time elapsed: 2.02s
Epoch 1200, Loss: 0.02045154944062233, Time elapsed: 2.19s
Epoch 1300, Loss: 0.014885226264595985, Time elapsed: 2.35s
Epoch 1400, Loss: 0.010996716096997261, Time elapsed: 2.52s
Epoch 1500, Loss: 0.008251757360994816, Time elapsed: 2.68s
Epoch 1600, Loss: 0.006270250771194696, Time elapsed: 2.85s
Epoch 1700