# Lotka Volterra UPINN

In [2]:
import numpy as np
import torch
import torch.nn as nn
from scipy.integrate import odeint
from tqdm import tqdm

# Plotly
import plotly.graph_objects as go
import plotly.io as pio
pio.templates.default = "plotly_dark"

Consider the Lotka-Volterra equations, which describe the dynamics of a predator-prey system:

\begin{align}
\frac{dx}{dt} &= \alpha x - \beta x y, \\
\frac{dy}{dt} &= - \delta y + \gamma x y,
\end{align}

where $x$ is the number of prey, $y$ is the number of predators, and $\alpha$, $\beta$, $\gamma$, and $\delta$ are positive constants.

### Problem definition
Using the example from Podina et al. (2023).

In [3]:
class LotkaVolterra:
    def __init__(self, alpha, beta, gamma, delta, X0):
        self.alpha, self.beta, self.gamma, self.delta = alpha, beta, gamma, delta
        self.X0 = X0

    def f(self, X, t, alpha, beta, gamma, delta):
        x, y = X
        dxdt = alpha*x - beta*x*y
        dydt = -delta*y + gamma*x*y
        return [dxdt, dydt]
    
    def solve(self, t):
        out = odeint(self.f, self.X0, t, (self.alpha, self.beta, self.gamma, self.delta))
        return out

In [4]:
# alpha, beta, gamma, delta = 1.3, 0.9, 0.8, 1.8
# x0, y0 = 0.44249296, 4.6280594

alpha, beta, gamma, delta = 2/3, 4/3, 1.0, 1.0
x0, y0 = 1.0, 1.0

LV = LotkaVolterra(alpha, beta, gamma, delta, [x0, y0])

### Generate data

In [5]:
# time_int = [0, 3]
time_int = [0, 20]
N_f = 1000
t_f = torch.linspace(time_int[0], time_int[1], N_f)

out = LV.solve(t_f)
x_f, y_f = out[:, 0], out[:, 1]
t_f.requires_grad = True

In [6]:
def sample_with_noise(N, t, x, y, epsilon=5e-3):

    # Check if the shapes are correct and feasible amount of points are requested
    assert x.shape == y.shape or N <= t.shape[0]

    # Calculate the mean of the data
    x_bar, y_bar = np.mean(x), np.mean(y)

    # Sample N evenly spaced points from the data
    idx = np.arange(0, t.shape[0], t.shape[0]//N)
    t, x, y = t[idx], x[idx], y[idx]

    # Add noise to the data
    x_noise = x + epsilon * x_bar * np.random.randn(*x.shape)
    y_noise = y + epsilon * y_bar * np.random.randn(*y.shape)

    return t, torch.tensor(x_noise), torch.tensor(y_noise)

t_sample, x_sample, y_sample = sample_with_noise(30, t_f, x_f, y_f)


In [7]:
# Visualize
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_f.detach().numpy(), y=x_f, mode='lines', name='x: true', line=dict(color='blue')))
fig.add_trace(go.Scatter(x=t_f.detach().numpy(), y=y_f, mode='lines', name='y: true', line=dict(color='red')))
fig.add_trace(go.Scatter(x=t_sample.detach().numpy(), y=x_sample, mode='markers', name='x: noisy sample', line=dict(color='blue')))
fig.add_trace(go.Scatter(x=t_sample.detach().numpy(), y=y_sample, mode='markers', name='y: noisy sample', line=dict(color='red')))
fig.update_layout(title='Lotka-Volterra Model', xaxis_title='Time', yaxis_title='Population')
fig.show()

### Define Neural Networks

In [8]:
class ScalingLayer(nn.Module):
    def __init__(self, scale_init_value=1, bias_init_value=0):
        super().__init__()
        self.scale = nn.Parameter(torch.FloatTensor([scale_init_value]))
        self.bias = nn.Parameter(torch.FloatTensor([bias_init_value]))

    def forward(self, input):
        return input * self.scale + self.bias
    

class SurrogateNet(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim):
        super().__init__()
        
        self.layers = nn.ModuleList()
        self.layers.append(ScalingLayer())
        for hidden_dim in hidden_dims:
            self.layers.append(nn.Linear(input_dim, hidden_dim))
            self.layers.append(nn.Sigmoid())
            input_dim = hidden_dim
        self.layers.append(nn.Linear(input_dim, output_dim))

    def forward(self, t):
        for layer in self.layers:
            t = layer(t)
        return t
    

class ResidualNet(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim):
        super().__init__()
        
        self.layers = nn.ModuleList()
        self.layers.append(ScalingLayer())
        for hidden_dim in hidden_dims:
            self.layers.append(nn.Linear(input_dim, hidden_dim))
            self.layers.append(nn.Sigmoid())
            input_dim = hidden_dim
        self.layers.append(nn.Linear(input_dim, output_dim))

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

### Instantiate the model

In [9]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

f_known = SurrogateNet(1, [64, 64], 2)
f_known.to(device)
f_unknown = ResidualNet(2, [16, 16], 2)
f_unknown.to(device)

ResidualNet(
  (layers): ModuleList(
    (0): ScalingLayer()
    (1): Linear(in_features=2, out_features=16, bias=True)
    (2): Sigmoid()
    (3): Linear(in_features=16, out_features=16, bias=True)
    (4): Sigmoid()
    (5): Linear(in_features=16, out_features=2, bias=True)
  )
)

### Training loop

In [72]:
epochs = 30000
lr = 1e-3
optimizer = torch.optim.Adam([*f_known.parameters(), *f_unknown.parameters()], lr=lr)

# Weight scaling for the loss function
lambda1, lambda2, lambda3 = 1, 1, 1

progress_bar = tqdm(range(epochs), desc="Training", unit="epoch")

# Move the data to the device and convert to float
t_sample = t_sample.to(device).reshape(-1, 1).float()
x_sample = x_sample.to(device).reshape(-1, 1).float()
y_sample = y_sample.to(device).reshape(-1, 1).float()
t_f = t_f.to(device).reshape(-1, 1).float()

for epoch in progress_bar:

    # Stop training of residual net after 30000 epochs
    if epoch == 30000:
        for param in f_unknown.parameters():
            param.requires_grad = False
        optimizer = torch.optim.Adam(f_known.parameters(), lr=lr)

        # Extend time beyond the training data
        t_f = torch.linspace(time_int[0], time_int[1]+5, N_f).to(device).reshape(-1, 1).float()
        t_f.requires_grad = True

    optimizer.zero_grad()
    
    # Initial condition loss
    x0, y0 = LV.X0
    t0 = torch.tensor([[0.0]], device=device).float()
    X0_pred = f_known(t0)
    ic_loss = nn.MSELoss()(X0_pred, torch.tensor([[x0, y0]], device=device).float())

    # Known dynamics loss
    X_f = f_known(t_f)
    x_f, y_f = X_f[:, 0:1], X_f[:, 1:2]
    dxdt = torch.autograd.grad(x_f, t_f, torch.ones_like(x_f), create_graph=True)[0]
    dydt = torch.autograd.grad(y_f, t_f, torch.ones_like(y_f), create_graph=True)[0]

    res_pred = f_unknown(X_f)
    res_x, res_y = res_pred[:, 0:1], res_pred[:, 1:2]
    # dudt = torch.hstack([dxdt - LV.alpha*x_f + LV.beta*x_f*y_f - res_x,
    #                      dydt + LV.delta*y_f                   - res_y ])
    dudt = torch.hstack([dxdt - LV.alpha*x_f + LV.beta*x_f*y_f - res_x,
                         dydt                                  - res_y ])

    pde_loss = torch.mean(dudt[:, 0]**2) + torch.mean(dudt[:, 1]**2)

    # Data loss
    X_pred = f_known(t_sample)
    data_loss = nn.MSELoss()(X_pred, torch.hstack([x_sample, y_sample]))

    # Total loss
    loss = lambda1*ic_loss + lambda2*pde_loss + lambda3*data_loss
    progress_bar.set_postfix({'Loss': loss.item(), 'IC Loss': ic_loss.item(), 'PDE Loss': pde_loss.item(), 'Data Loss': data_loss.item()})
    loss.backward(retain_graph=True)
    optimizer.step()

# Save the model
torch.save(f_known.state_dict(), 'models/lotka-volterra/f_known1.pt')
torch.save(f_unknown.state_dict(), 'models/lotka-volterra/f_unknown1.pt')

Training: 100%|██████████| 30000/30000 [04:34<00:00, 109.11epoch/s, Loss=9.07e-5, IC Loss=1.23e-6, PDE Loss=4.69e-5, Data Loss=4.26e-5]    


In [10]:
# Load the model
f_known.load_state_dict(torch.load('models/lotka-volterra/f_known.pt'))
f_unknown.load_state_dict(torch.load('models/lotka-volterra/f_unknown.pt'))


You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.



RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [82]:
def to_numpy(tensor):
    if type(tensor) != torch.Tensor:
        return tensor
    return tensor.squeeze().detach().cpu().numpy()

In [83]:
with torch.no_grad():
    t_f = torch.linspace(0, 20, 1000).reshape(-1, 1).to(device).float()
    X_pred = f_known(t_f)
    x_pred, y_pred = to_numpy(X_pred[:, 0:1]), to_numpy(X_pred[:, 1:2])
    res_pred = f_unknown(X_pred)
    res_x, res_y = to_numpy(res_pred[:, 0:1]), to_numpy(res_pred[:, 1:2])
    t_f = to_numpy(t_f)
    t_sample = to_numpy(t_sample)
    x_sample = to_numpy(x_sample)
    y_sample = to_numpy(y_sample)

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=t_f, y=x_pred, mode='lines', name='x: prediction', line=dict(color='blue')))
    fig.add_trace(go.Scatter(x=t_f, y=y_pred, mode='lines', name='y: prediction', line=dict(color='red')))
    fig.add_trace(go.Scatter(x=t_sample, y=x_sample, mode='markers', name='x: noisy sample', line=dict(color='blue')))
    fig.add_trace(go.Scatter(x=t_sample, y=y_sample, mode='markers', name='y: noisy sample', line=dict(color='red')))
    fig.update_layout(title='Lotka-Volterra Model', xaxis_title='Time', yaxis_title='Population')
    fig.show()

In [84]:
y_missing = -LV.delta * y_pred + LV.gamma * x_pred * y_pred

# Plot the residuals
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_f, y=res_x, mode='lines', name='G1', line=dict(color='blue')))
fig.add_trace(go.Scatter(x=t_f, y=res_y, mode='lines', name='G2', line=dict(color='red')))
fig.add_trace(go.Scatter(x=t_f, y=y_missing, mode='lines', name='γ*x*y', line=dict(color='green')))
fig.update_layout(title='Residuals', xaxis_title='Time', yaxis_title='Residual')
fig.show()

# Recover missing term of the Lotka-Volterra equations with PySINDy

In [77]:
# PySINDy
import pysindy as ps

In [90]:
# Get relevant data
X = to_numpy(X_pred)
dX = to_numpy(res_pred)
t = t_f

# Define and fit PySINDy models for each residual
sindy_model = ps.SINDy(feature_library=ps.PolynomialLibrary(degree=2), optimizer=ps.STLSQ(threshold=0.2), feature_names=['x', 'y'])

# Fit models for the unknown dynamics
sindy_model.fit(x=X, x_dot=dX, t=t)

# Print the model
sindy_model.print()


(x)' = 0.000
(y)' = -0.549 1 + 0.727 x + 0.029 y + -0.339 x^2 + 1.040 x y + -0.942 y^2



Sparsity parameter is too big (0.2) and eliminated all coefficients

