In [14]:
import sys
from typing import Union
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import copy

NUM_EPOCHS = 50000
LEARNING_RATE = 1e-4
NUM_COLLOCATION = 10000
PATIENCE = 100
THRESHOLD = 1e-3
EARLY_STOPPING_EPOCH = 1
NUM_SAMPLES = 1000

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def numpy_to_tensor(array):
    return torch.tensor(array, requires_grad=True, dtype=torch.float32).to(DEVICE).reshape(-1, 1)

def grad(outputs, inputs):
    return torch.autograd.grad(outputs, inputs, grad_outputs=torch.ones_like(outputs), create_graph=True)

def generate_dataset(num_samples: int = NUM_SAMPLES):
    """Generate dataset of random multiple initial conditions and control actions"""
    df = pd.DataFrame(columns=['t', 'X', 'S', 'V'])
    df['X'] = np.random.uniform(0.1, 5)
    df['S'] = np.random.uniform(5, 30)
    df['V'] = np.random.uniform(0.5, 2)
    df['F'] =  np.random.uniform(0.5, 2)
    df['t'] = 0.0 # initial time
    
    t_train = numpy_to_tensor(df['t'].values)
    X_train = numpy_to_tensor(df['X'].values)
    S_train = numpy_to_tensor(df['S'].values)
    V_train = numpy_to_tensor(df['V'].values)
    F_train = numpy_to_tensor(df['F'].values)
    
    in_train = torch.cat([t_train, X_train, S_train, V_train, F_train], dim=1)
    out_train = torch.cat([X_train, S_train, V_train], dim=1)
    return in_train, out_train

class PINN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(PINN, self).__init__()
        self.input = nn.Linear(input_dim, 128)
        self.fc1 = nn.Linear(128, 1024)
        self.fc2 = nn.Linear(1024, 1024)
        self.fc3 = nn.Linear(1024, 128)
        self.output = nn.Linear(128, output_dim)

    def forward(self, x):
        x = torch.relu(self.input(x))
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = self.output(x)
        return x

In [15]:
# Time parameters
T_START = 0
T_END = 80
TIME_RANGE = int(T_END - T_START) # Absolute time 

# Kinetic parameters
MU_MAX = 0.20 # 1/h
K_S = 1       # g/l
Y_XS = 0.5    # g/g
Y_PX = 0.2    # g/g
S_F = 10      # g/l

def loss_fn(net: nn.Module) -> torch.Tensor:
    t_col = numpy_to_tensor(np.random.uniform(T_START, T_END, NUM_COLLOCATION))
    X0_col = numpy_to_tensor(np.random.uniform(0.1, 5, NUM_COLLOCATION))
    S0_col = numpy_to_tensor(np.random.uniform(5, 30, NUM_COLLOCATION))
    V0_col = numpy_to_tensor(np.random.uniform(0.5, 2, NUM_COLLOCATION))
    F_col = numpy_to_tensor(np.random.uniform(0.5, 2, NUM_COLLOCATION))
    
    u_col = torch.cat([t_col, X0_col, S0_col, V0_col, F_col], dim=1)

    preds = net.forward(u_col)

    X_pred = preds[:, 0].view(-1, 1)
    S_pred = preds[:, 1].view(-1, 1)
    V_pred = preds[:, 2].view(-1, 1)

    dXdt_pred = grad(X_pred, t_col)[0]
    dSdt_pred = grad(S_pred, t_col)[0]
    dVdt_pred = grad(V_pred, t_col)[0]

    mu = MU_MAX * S_pred / (K_S + S_pred)

    error_dXdt = dXdt_pred - mu * X_pred + X_pred * F_col / V0_col
    error_dSdt = dSdt_pred + mu * X_pred / Y_XS - F_col / V0_col * (S_F - S_pred)
    error_dVdt = dVdt_pred - F_col
    
    error_ode = 1/3 * torch.mean(error_dXdt**2) + 1/3 * torch.mean(error_dSdt**2) + 1/3 * torch.mean(error_dVdt**2)

    return error_ode

def main(in_train: torch.Tensor, out_train: torch.Tensor, verbose: int = 100):
    
    net = PINN(input_dim=5, output_dim=3).to(DEVICE)
    optimizer = torch.optim.Adam(net.parameters(), lr=LEARNING_RATE)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2500, gamma=0.7)

    # Loss weights
    w_data, w_ode, w_ic = 1.0, 1.0, 1.0

    # Initialize early stopping variables
    best_loss = float("inf")
    best_model_weights = None
    patience = PATIENCE
    threshold = THRESHOLD

    for epoch in range(NUM_EPOCHS):
        optimizer.zero_grad()
        preds = net.forward(in_train)
        X_pred = preds[:, 0].view(-1, 1)
        S_pred = preds[:, 1].view(-1, 1)
        V_pred = preds[:, 2].view(-1, 1)
        loss_X = nn.MSELoss()(X_pred, out_train[:, 0].view(-1, 1))
        loss_S = nn.MSELoss()(S_pred, out_train[:, 1].view(-1, 1))
        loss_V = nn.MSELoss()(V_pred, out_train[:, 2].view(-1, 1))
        loss_data = 0.33 * (loss_X + loss_S + loss_V)

        loss_ode = loss_fn(net)

        loss = w_data * loss_data + w_ode * loss_ode
        loss.backward()
        optimizer.step()
        scheduler.step()

        if epoch % verbose == 0:
            print(f"Epoch {epoch}, Loss_data: {loss_data.item():.4f}, Loss_ode: {loss_ode.item():.4f}")
            # Print the current learning rate of the optimizer
            for param_group in optimizer.param_groups:
                print("Current learning rate: ", param_group["lr"])

        if epoch >= EARLY_STOPPING_EPOCH:
            if loss < best_loss - threshold:
                best_loss = loss
                best_model_weights = copy.deepcopy(net.state_dict())
                patience = 1000
            else:
                patience -= 1
                if patience == 0:
                    print(f"Early stopping at epoch {epoch}")
                    net.load_state_dict(best_model_weights)
                    break

    return net

In [18]:
in_train, out_train = generate_dataset(num_samples=10)

In [19]:
net = main(in_train, out_train)

Epoch 0, Loss_data: nan, Loss_ode: 56.1005
Current learning rate:  0.0001


KeyboardInterrupt: 