# EpiPINN

Physics-informed neural network learning epidemiological parameters

### Antonio Jimenez AOJ268
### Ashton Cole AVC687

## Contents

- Definitions
- Experiments
    - Experiment 1: Parameter estimation with exact data
    - Experiment 2: Parameter estimation with noisy data
    - Experiment 3: Parameter estimation and forecasting with noisy limited data

## Definitions

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import random
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

In [None]:
class PINN(nn.Module):
    def __init__(self, hidden_size, depth):
        super().__init__()
        # input t
        layers = [nn.Linear(1, hidden_size), nn.Tanh()]
        for _ in range(depth - 1):
            layers.append(nn.Linear(hidden_size, hidden_size))
            layers.append(nn.Tanh())
        # Output layer with 5 units for (s, e, i, r, d)
        layers.append(nn.Linear(hidden_size, 5))
        # Add softmax to enforce all components are positive and sum to 1
        layers.append(nn.Softmax(dim=1))
        
        self.net = nn.Sequential(*layers)

    def forward(self, t):
        return self.net(t)

In [None]:
def caputo_l1_diff(psi, alpha, dt):
    n = len(psi)
    # The derivative at t=0 is undefined for the L1 scheme
    derivatives = [torch.zeros(1, device=psi.device)] 
    
    # Pre-compute the log of the gamma function part for stability
    log_gamma_term = torch.lgamma(2.0 - alpha)

    for i in range(1, n):
        # Make vector of k values from 0 to i-1
        k = torch.arange(i, dtype=torch.float32, device=psi.device)
        
        # Calculate weights c_k^(i) 
        weights = ((k + 1)**(1 - alpha) - k**(1 - alpha))
        
        # Get the differences psi(t_{i-k}) - psi(t_{i-k-1})
        psi_diffs = psi[i - k.long()] - psi[i - k.long() - 1]
        
        summation = torch.sum(weights * psi_diffs.squeeze())
        
        # Combine everything to get the derivative at time t_i
        deriv_at_i = (1.0 / (dt**alpha * torch.exp(log_gamma_term))) * summation
        derivatives.append(deriv_at_i.unsqueeze(0))
        
    return torch.cat(derivatives).unsqueeze(1)

In [None]:
class EpiPINN(nn.Module):
    def __init__(self, hidden_size, depth, initial_params):
        super().__init__()
        
        self.pinn = PINN(hidden_size, depth) 
        # trainable params
        self.raw_beta = nn.Parameter(torch.tensor([initial_params['beta']]))
        self.raw_sigma = nn.Parameter(torch.tensor([initial_params['sigma']]))
        self.raw_gamma = nn.Parameter(torch.tensor([initial_params['gamma']]))
        self.raw_mu = nn.Parameter(torch.tensor([initial_params['mu']]))
        # Init z_alpha such that the init alpha is close to 1.0
        self.z_alpha = nn.Parameter(torch.tensor([initial_params['z_alpha']])) # sigmoid(2.94) is approx 0.95
        
        self.min_alpha = initial_params['min_alpha'] # Example minimum value for alpha
        self.dt = initial_params['dt']

    def beta(self):
        return nn.softplus(self.raw_beta)

    def sigma(self):
        return nn.softplus(self.raw_sigma)

    def gamma(self):
        return nn.softplus(self.raw_gamma)
        
    def mu(self):
        return nn.softplus(self.raw_mu)

    def alpha(self):
        # Restrict alpha to a specific range, (min_alpha, 1.0] 
        return self.min_alpha + (1.0 - self.min_alpha) * torch.sigmoid(self.z_alpha)
    
    def forward(self, t):
        self.pinn(t)

    def get_loss_ic(self, ts, ic, y_pred=None):
        """Get initial condition loss of model.

        Arguments:
            ts (torch.tensor): Time points for the model, assuming t[0] is the initial time. Only t[0] is needed, but the tensor dimensions must be consistent.
            ic (torch.tensor): The initial state to enforce.
            y_pred=None (torch.tensor): Predictions at time points, if already computed. Only y[0] is needed, but the tensor dimensions must be consistent.

        Returns:
            squared l2 norm of initial condition error
        """
        # IC loss
        t_initial = ts[0].unsqueeze(0) # get t_0
        y_initial_pred = self.forward(t_initial) if y_pred == None else y_pred[0, :].unsqueeze(0)
        loss_ic = nn.MSELoss(y_initial_pred - ic)

    def get_loss_data(self, t_data, y_data, y_data_pred=None):
        """Get data loss of model.

        Arguments:
            t_data (torch.tensor): Training data times.
            y_data (torch.tensor): Corresponding state data.
            y_data_pred=None (torch.tensor): Predictions at data points, if already computed.

        Returns:
            MSE loss (squared l2 norm) of data error
        """
        # Data Loss
        y_data_pred = self.forward(t_data) if y_data_pred == None else y_data_pred
        loss_data = nn.MSELoss(y_data_pred - y_data)

    def get_loss_phys(self, t_colloc, y_colloc_pred=None):
        """Get physics loss of model.

        Arguments:
            t_colloc (torch.tensor): Times at which to compute the loss.
            y_colloc_pred=None (torch.tensor): Predictions at collocation points, if already computed.

        Returns:
            squared l2 norm of residual
        """
        # Phys Loss
        y_colloc_pred = self.forward(t_colloc)
        s,e,i,r,d = y_colloc_pred.unbind(1)
        ds_dt = caputo_l1_diff(s, self.alpha, self.dt)
        de_dt = caputo_l1_diff(e, self.alpha, self.dt)
        di_dt = caputo_l1_diff(i, self.alpha, self.dt)
        dr_dt = caputo_l1_diff(r, self.alpha, self.dt)
        dd_dt = caputo_l1_diff(d, self.alpha, self.dt)

        # calculate RHS of equation 4
        num_living =  1 - d
        f_s = -self.beta() * s * i / num_living
        f_e = (self.beta() * s * i / num_living) - self.sigma() * e
        f_i = (self.sigma() * e) - (self.gamma()+ self.mu()) * i
        f_r = self.gamma() * i
        f_d = self.mu() * i

        # calc residuals (LHS - RHS = 0)
        residual_s = ds_dt - f_s
        residual_e = de_dt - f_e
        residual_i = di_dt - f_i
        residual_r = dr_dt - f_r
        residual_d = dd_dt - f_d

        all_residuals = torch.cat([residual_s, residual_e, residual_i, residual_r, residual_d], dim=1)
        loss_phys = torch.mean(all_residuals**2)
        return loss_phys

In [None]:
def train_stage1(model, ts, ys, t_colloc, ic, optimizer, epochs=1000, pr=0):
    """Stage one of EpiPINN training process.

    In this stage, only the weights of the neural network are trained to minimize the data and initial condition loss.

    Arguments:
        model (EpiPINN): An instantiated fractional SEIRD model to train.
        ts (torch.tensor): Time values for time-series data.
        ys (torch.tensor): States (s, e, i, r, d) for time-series data.
        optimizer: Pytorch training optimizer.
        epochs=1000 (Int): How many epochs to perform gradient descent.
        pr=0 (Int): Print progress every pr epochs. If 0, nothing is printed.

    Returns:
        losses, losses_data, losses_ic, losses_phys
    """
    losses = []
    losses_data = []
    losses_ic = []
    losses_phys = []

    # Ensure epidemiological parameters are not trained
    model.raw_beta.requires_grad = False
    model.raw_sigma.requires_grad = False
    model.raw_gamma.requires_grad = False
    model.raw_mu.requires_grad = False
    model.z_alpha.requires_grad = False

    for epoch in range(epochs):
        predictions = model(ts)
        
        # Compute losses separately, then combine
        loss_data = get_loss_data(ts, predictions)
        loss_ic = get_loss_ic(t_colloc, ic)
        loss_phys = get_loss_phys(t_colloc)
        loss = loss_data + loss_ic # Physics is not used for gradient descent

        # Record losses
        losses.append(loss + loss_phys) # Physics is recored, not mimimized
        losses_data.append(loss_data)
        losses_ic.append(loss_ic)
        losses_phys.append(loss_phys)

        # Adjust weights to minimize loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print progress if desired
        if pr != 0 and (epoch + 1) % pr == 0:
            print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.6f}')
        
    return losses, losses_data, losses_ic, losses_phys

In [None]:
def train_stage2(model, ts, ys, t_colloc, ic, optimizer, epochs=1000, pr=0):
    """Stage two of EpiPINN training process.

    In this stage, both the weights of the neural network and epidemiological parameters are trained to minimize the data, initial condition, and physics losses.

    Arguments:
        model (EpiPINN): An instantiated fractional SEIRD model to train.
        ts (torch.tensor): Time values for time-series data.
        ys (torch.tensor): States (s, e, i, r, d) for time-series data.
        optimizer: Pytorch training optimizer.
        epochs=1000 (Int): How many epochs to perform gradient descent.
        pr=0 (Int): Print progress every pr epochs. If 0, nothing is printed.

    Returns:
        losses, losses_data, losses_ic, losses_phys
    """
    losses = []
    losses_data = []
    losses_ic = []
    losses_phys = []

    # Ensure epidemiological parameters are trained
    model.raw_beta.requires_grad = True
    model.raw_sigma.requires_grad = True
    model.raw_gamma.requires_grad = True
    model.raw_mu.requires_grad = True
    model.z_alpha.requires_grad = True

    for epoch in range(epochs):
        predictions = model(ts)
        
        # Compute losses separately, then combine
        loss_data = get_loss_data(ts, predictions)
        loss_ic = get_loss_ic(t_colloc, ic)
        loss_phys = get_loss_phys(t_colloc)
        loss = loss_data + loss_ic + loss_phys

        # Record losses
        losses.append(loss)
        losses_data.append(loss_data)
        losses_ic.append(loss_ic)
        losses_phys.append(loss_phys)

        # Adjust weights to minimize loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print progress if desired
        if pr != 0 and (epoch + 1) % pr == 0:
            print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.6f}')
        
    return losses, losses_data, losses_ic, losses_phys

## Experiments

### Experiment 1: Parameter estimation with exact data

This experiment is a recreation of the Mpox synthetic case found in the paper. We were unable to find the synthetic time series data $(s, e, i, r, d)(t)$ from the cited source, so we instead implemented a fractional ODE solver to generate data consistent with the provided epidemiological parameters.

In [None]:
# Data generation
alpha_true = 0.95 # Derivative fraction used for data
beta_true = 0.25 # Infection rate used for model
sigma_true = 0.13 # Incubation rate used for model
gamma_true = 0.052 # Recovery rate used for model
mu_true = 0.005 # Death rate used for model
y0 = np.array([0.99, 0.01, 0, 0, 0]) # Initial state: 1% exposed
t0 = 0
tf = 500 # 500 days
num_step = 500 # Good ground truth from tests

f = lambda t, y: np.array([
    - beta_true * (y[0] * y[3]) / (1 - y[4]),
    beta_true * (y[0] * y[3]) / (1 - y[4]) - sigma_true * y[1],
    sigma_true * y[1] - (gamma_true + mu_true) * y[2],
    gamma_true * y[2],
    mu_true * y[2]
])

ts, ys = caputo_euler(f, alpha, (t0, tf), num_step)

## TODO: turn data into tensors

In [None]:
# Define model
min_alpha_guess = 0.9 # Mimimum searched derivative fraction
alpha_guess = 1 # Derivative fraction used for model
z_alpha_guess = torch.logit(alpha_guess)
beta_guess = 0.25 # Infection rate used for model
sigma_guess = 0.13 # Incubation rate used for model
gamma_guess = 0.052 # Recovery rate used for model
mu_guess = 0.005 # Death rate used for model
hidden_size = 64 # Number of neurons per layer
depth = 3 # Number of layers
initial params = {
    "z_alpha": z_alpha_guess,
    "min_alpha": min_alpha_guess,
    "beta": beta_guess,
    "sigma": sigma_guess,
    "gamma": gamma_guess,
    "mu": mu_guess
}

model1 = EpiPINN(hidden_size, depth, initial_params)

In [None]:
# Load model if already generated

In [None]:
# Train model

# Stage 1: weights only, without considering physics in updates

# Stage 2: weights and epidemiological parameters, considering full loss

In [None]:
# Save model

In [None]:
# Compound plot of model results, and comparison against data

In [None]:
# Loss plot

### Experiment 2: Parameter estimation with noisy data

This experiment is also a recreation of the Mpox synthetic case found in the paper. Now random noise is added to the data to test the model's abitilty to recover the ideal model.