In [1]:
import numpy as np
import matplotlib.pyplot as plt
import os
import pickle
import sys
import time
import torch

from scipy.integrate import solve_ivp

PROJECT_ROOT = os.path.abspath(
    os.path.join(os.getcwd(), os.pardir)
)
sys.path.append(PROJECT_ROOT)

import sample_points
from tcpinn import MLP, TcPINN
from plot import plot_solution

np.random.seed(1)

In [2]:
alpha = 1.5
beta = 1.75
gamma = 0.5
delta = 0.75

In [3]:
if torch.cuda.is_available():
    device = torch.device("cuda")

else:
    device = torch.device("cpu")

In [21]:
class LotkaVolterraInverse(TcPINN):
    """
    A tcPINN implementation for a Lotka-Volterra inverse problem.
    """
    def __init__(
        self, layers, T, X_pinn=None, X_semigroup=None, X_smooth=None, X_data=None, data=None,
        w_pinn=1., w_semigroup=1., w_smooth=1., w_data=1.
    ):
        """
        For inverse problems, one has to attach the trainable
        ODE parameters to the multilayer perceptron (MLP). The least confusing
        way to do this with the current implementation is to re-initialize 
        the MLP and the optimizer.
        """
        super().__init__(
            layers, T, X_pinn, X_semigroup, X_smooth, X_data, data,
            w_pinn, w_semigroup, w_smooth, w_data
        )
        self.is_inverse = True
        mlp = MLP(layers)
        self.mlp = self._init_ode_parameters(mlp).to(device)
        self.optimizer = torch.optim.LBFGS(
            self.mlp.parameters(), lr=1., max_iter=50000, max_eval=50000, 
            history_size=10, tolerance_grad=1e-5, tolerance_change=np.finfo(float).eps,
            line_search_fn="strong_wolfe"
        )
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode="min", min_lr=1e-4, verbose=True
        )
    
    
    def _init_ode_parameters(self, mlp):

        mlp.alpha = torch.nn.Parameter(3 * torch.rand(1, requires_grad=True)[0])
        mlp.beta = torch.nn.Parameter(3 * torch.rand(1, requires_grad=True)[0])
        mlp.gamma = torch.nn.Parameter(3 * torch.rand(1, requires_grad=True)[0])
        mlp.delta = torch.nn.Parameter(3 * torch.rand(1, requires_grad=True)[0])
        self.history["alpha"] = []
        self.history["beta"] = []
        self.history["gamma"] = []
        self.history["delta"] = []
        
        return mlp
    
    
    def _loss_pinn(self):
        """
        The ODE-specific standard PINN loss.
        """
        y = self.net_y(self.t_pinn, self.y_pinn)
        deriv = self.net_derivative(self.t_pinn, self.y_pinn)
        
        loss1 = torch.mean(
            (deriv[0] - self.mlp.alpha * y[:, 0:1] + self.mlp.beta * (y[:, 0:1] * y[:, 1:2])) ** 2
        )
        loss2 = torch.mean(
            (deriv[1] - self.mlp.delta * (y[:, 0:1] * y[:, 1:2]) + self.mlp.gamma * y[:, 1:2]) ** 2
        )
        loss = self.w_pinn * (loss1 + loss2)

        return loss

### Setup Training Data

In [22]:
def rhs_lotka_volterra(t, r, alpha, beta, gamma, delta):

    x, y = r
    dx_t = alpha * x - beta * x * y
    dy_t = delta * x * y - gamma * y

    return dx_t, dy_t


def get_solution(max_t, delta_t, init_val, alpha, beta, gamma, delta):
    
    times = np.linspace(0, max_t, int(max_t / delta_t) + 1)
    sol = solve_ivp(
        rhs_lotka_volterra, [0, float(max_t)], init_val, t_eval=times,
        args=(alpha, beta, gamma, delta), rtol=1e-10, atol=1e-10
    )

    return sol.y.T


def sample_data(t_data, y_data, alpha, beta, gamma, delta):
    
    data = np.zeros((len(y_data), 2))

    for i, (t, init_val) in enumerate(zip(t_data, y_data)):

        data[i] = get_solution(
            max_t=t[0], delta_t=t[0], init_val=init_val,
            alpha=alpha, beta=beta, gamma=gamma, delta=delta
        )[-1]
    
    return data

In [23]:
layers = [3] + 3 * [128] + [2]
T = 1
max_y0 = 5

n_pinn = 10
t_pinn = np.random.uniform(0, T, (n_pinn, 1))
y_pinn = np.random.uniform(0, max_y0 , (n_pinn, 2))
X_pinn = np.hstack([t_pinn, y_pinn])

n_semigroup = 10
st_semigroup = sample_points.uniform_triangle_2d(n_semigroup, T)
y_semigroup = np.random.uniform(0, max_y0, (n_semigroup, 2))
X_semigroup = np.hstack([st_semigroup, y_semigroup])

n_smooth = 10
t_smooth = np.random.uniform(0, T, (n_smooth, 1))
y_smooth = np.random.uniform(0, max_y0, (n_smooth, 2))
X_smooth = np.hstack([t_smooth, y_smooth])

n_data = 10
t_data = np.random.uniform(0, T, (n_data, 1))
y_data = np.random.uniform(0, max_y0, (n_data, 2))
X_data = np.hstack([t_data, y_data])
data = sample_data(t_data, y_data, alpha, beta, gamma, delta)

In [24]:
model = LotkaVolterraInverse(layers, T, X_pinn=X_pinn, X_semigroup=X_semigroup, X_smooth=X_smooth, X_data=X_data, data=data)

In [25]:
%%time
model.train()

CPU times: user 14.2 s, sys: 31.1 ms, total: 14.2 s
Wall time: 2.08 s


In [26]:
path = os.getcwd()

with open(f"{path}/model_lotka_volterra_inverse.pkl", "wb") as handle:
    pickle.dump(model, handle, protocol=pickle.HIGHEST_PROTOCOL)

with open(f"{path}/model_lotka_volterra_inverse.pkl", "rb") as f:
    model = pickle.load(f)

In [None]:
# plot inverse problem convergence
n_iterations = model.iter
alpha_true = np.full(n_iterations, alpha)
beta_true = np.full(n_iterations, beta)
gamma_true = np.full(n_iterations, gamma)
delta_true = np.full(n_iterations, delta)

_, ax = plt.subplots(figsize=(10, 4))
ax.plot(np.arange(n_iterations), model.history["alpha"], color="green", label="alpha")
ax.plot(np.arange(n_iterations), model.history["beta"], color="blue", label="beta")
ax.plot(np.arange(n_iterations), model.history["gamma"], color="red", label='gamma')
ax.plot(np.arange(n_iterations), model.history["delta"], color="orange", label="delta")
ax.plot(np.arange(n_iterations), alpha_true, color="black", linestyle="dashed")
ax.plot(np.arange(n_iterations), beta_true, color="black", linestyle="dashed")
ax.plot(np.arange(n_iterations), gamma_true, color="black", linestyle="dashed")
ax.plot(np.arange(n_iterations), delta_true, color="black", linestyle="dashed")
ax.spines[["top", "right"]].set_visible(False)
ax.set_xlabel("Iterations")

plt.legend()
plt.show()

## Predict and Plot the Solution

In [29]:
y0 = np.array([0.1, 1.0]) #np.random.uniform(0, max_y0 , 2)
max_t = 100
delta_t = 0.05
times = np.linspace(0, max_t, int(max_t / delta_t) + 1)

tc_solution = model.predict_tc(max_t, delta_t, y0)
true_solution = get_solution(max_t, delta_t, y0, alpha, beta, gamma, delta)

In [None]:
ax = plot_solution(times, true_solution, ax=None)
ax = plot_solution(
    times, tc_solution, ax=ax,
    component_kwargs=[{'color': "red", 'linestyle': "dashed"}, {'color': "orange", 'linestyle': "dashed"}]
)