In [1]:
#Necessary imports
import numpy as np
from torch import nn
import torch
from scipy.integrate import odeint
#from google.colab import output as colab_output

# Setup device agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

Using device: cuda


We will try to learn the flow map for the 2D ode with 1 parameter
$$\begin{cases} x' &= \mu - x^2 \\
y' &= -y
\end{cases}.$$

We want our training data to be of the form
$$(x_j^{(1)}, y_j^{(1)}, x_j^{(2)}, y_j^{(2)}, \mu_j)$$
so first lets collect the data using scipy

In [2]:
#ODE Definition
def ode(f, t, mu):
    x, y = f
    dfdt = [mu-x**2, -y]

    return dfdt

#Data generation
DATA_POINTS = 50000 #Number of datapoints (pairs) to be collected for each selection of mu
LOW_MU = -1.5 #Lower bound of the parameter to generate data
HIGH_MU = 1.5 #Upper bound....
MU_STEP_SIZE = 0.01 #How many steps in mu to take
LOW_STATE_SPACE = -2 #Lower bound of the statespace sampling
HIGH_STATE_SPACE = 2 #Upper bound...
rng = np.random.default_rng(seed=42)

#Linspace for time. Considers only one step of size DELTA
DELTA = 0.03
t=np.linspace(0,DELTA,2)
#Numerically solve the ODE for a single step with random
#initial conditions and parameter mu.
mus= np.arange(LOW_MU, HIGH_MU, MU_STEP_SIZE)
predictors = torch.zeros(size=(mus.size*DATA_POINTS, 3))
responses = torch.zeros(size=(mus.size*DATA_POINTS, 2))
for i in range(mus.size):
    percentage = 100*i/mus.size
    print(f'{percentage:.1f}% complete')
    for j in range(DATA_POINTS):
        #Sample the statespace uniformly (Is this effecient?)
        z1 = rng.uniform(low=LOW_STATE_SPACE, high=HIGH_STATE_SPACE, size=(2))
        z2 = odeint(ode, z1, t, args=(mus[i],))[1] #ode input gives answer at t0 and t1, so just select solution @ t1
        predictors[i*DATA_POINTS+j] = torch.tensor((z1[0],z1[1],mus[i]))
        responses[i*DATA_POINTS+j] = torch.tensor((z2[0],z2[1]))



0.0% complete
0.3% complete
0.7% complete
1.0% complete
1.3% complete
1.7% complete
2.0% complete
2.3% complete
2.7% complete
3.0% complete
3.3% complete
3.7% complete
4.0% complete
4.3% complete
4.7% complete
5.0% complete
5.3% complete
5.7% complete
6.0% complete
6.3% complete
6.7% complete
7.0% complete
7.3% complete
7.7% complete
8.0% complete
8.3% complete
8.7% complete
9.0% complete
9.3% complete
9.7% complete
10.0% complete
10.3% complete
10.7% complete
11.0% complete
11.3% complete
11.7% complete
12.0% complete
12.3% complete
12.7% complete
13.0% complete
13.3% complete
13.7% complete
14.0% complete
14.3% complete
14.7% complete
15.0% complete
15.3% complete
15.7% complete
16.0% complete
16.3% complete
16.7% complete
17.0% complete
17.3% complete
17.7% complete
18.0% complete
18.3% complete
18.7% complete
19.0% complete
19.3% complete
19.7% complete
20.0% complete
20.3% complete
20.7% complete
21.0% complete
21.3% complete
21.7% complete
22.0% complete
22.3% complete
22.7% comp

In [3]:
# Create train/test split
train_split = int(0.8 * len(predictors)) # 80% of data used for training set, 20% for testing
pred_train, resp_train = predictors[:train_split], responses[:train_split]
pred_test, resp_test = predictors[train_split:], responses[train_split:]

pred_train = pred_train.to(device)
resp_train = resp_train.to(device)
pred_test = pred_test.to(device)
resp_test = resp_test.to(device)


Now we want to build our model. Based on the Su et al. 2021, we want to use a single modified resnet layer with 3 linear layers of 45 neurons each. (hidden layors, fully connected layors, whatever you want to call them). We have 2 state variables and 1 parameter, so the input of the whole neural network will have 3 neurons and the output will have 2.

In [4]:
NEURONS = 45  # Number of neurons in fully connected layers
STATES = 2    # Number of state variables
PARAMETERS = 1 # Number of parameters

class suNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer_1 = nn.Linear(STATES + PARAMETERS, NEURONS)  # 3 inputs (state variables and parameter) and 45 outputs
        self.layer_2 = nn.Linear(NEURONS, NEURONS)  # 45 inputs and 45 outputs
        self.layer_3 = nn.Linear(NEURONS, STATES)  # 45 inputs and 2 outputs (the state variables are the two outputs)
        self.layer_4 = nn.Linear(STATES,NEURONS)  # 45 inputs and 2 outputs (the state variables are the two outputs)
        self.relu = nn.ReLU()
    def forward(self, x):
        input = x
        x = self.layer_1(x) # Apply ReLU after the first layer
        x = self.relu(x)
        x = self.layer_2(x) # Apply/ ReLU after the second layer
        x = self.relu(x)
        x = self.layer_3(x)  # Third layer (no activation after this)
        input2 = x + input[:, :STATES]  # Add the original state variables to the output
        x = self.layer_4(input2) # Apply ReLU after the first layer
        x = self.relu(x)
        x = self.layer_2(x) # Apply/ ReLU after the second layer
        x = self.relu(x)
        x = self.layer_3(x)  # Third layer (no activation after this)
        return x + input2[:, :STATES]  # Add the original state variables to the output

model = suNet().to(device)


With our model defined we next need to define a loss function. The loss function is given in Su's paper as
$$L(\Theta) =\frac{1}{J} \sum_{j=1}^J \| z_j^{(2)} - N(z_j^{(1)},\alpha,\Theta)\|^2_2 $$

In [5]:
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),0.001)

In [None]:
print(pred_train.is_cuda)
# Calculate accuracy (a classification metric)
def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item() # torch.eq() calculates where two tensors are equal
    acc = (correct / len(y_pred)) * 100
    return acc

epochs = 10000
for epoch in range(epochs):
    ### Training
    model.train()

    # 1. Forward pass (model outputs raw logits)
    resp_pred= model(pred_train).squeeze() # squeeze to remove extra `1` dimensions, this won't work unless model and data are on same device

    # 2. Calculate loss/accuracy
    loss = loss_fn(resp_pred, resp_train)
    #acc = accuracy_fn(y_true=resp_train, y_pred=resp_pred)

    # 3. Optimizer zero grad
    optimizer.zero_grad()

    # 4. Loss backwards
    loss.backward()

    # 5. Optimizer step
    optimizer.step()

    ### Testing
    model.eval()
    with torch.inference_mode():
        # 1. Forward pass
        test_logits = model(pred_test).squeeze()
        # 2. Caculate loss/accuracy
        test_loss = loss_fn(test_logits,
                            resp_test)

    # Print out what's happening every 10 epochs
    if epoch % 100 == 0:
        percentage = 100*epoch/epochs
        print(f"Precentage: {percentage:.1f} | Epoch: {epoch} | Loss: {loss:.8f}| Test loss: {test_loss:.8f}")

True
Precentage: 0.0 | Epoch: 0 | Loss: 0.11712608| Test loss: 0.10198291
Precentage: 1.0 | Epoch: 100 | Loss: 0.00022734| Test loss: 0.00083276
Precentage: 2.0 | Epoch: 200 | Loss: 0.00010397| Test loss: 0.00034681
Precentage: 3.0 | Epoch: 300 | Loss: 0.00006547| Test loss: 0.00020319


In [None]:
import matplotlib.pyplot as plt
import numpy as np
INIT_X=0.2
INIT_Y=0.2
MU = 1
STEPS = 20000
TIME_LENGTH = STEPS * DELTA
RESOLUTION = 2000 #Number of time steps in the ODE simulation
#initial conditions to test our model from
init = np.array([INIT_X,INIT_Y,MU])
#Initialized the predictions
model_prediction_x = np.zeros(STEPS)
model_prediction_y = np.zeros(STEPS)
#Convert the init to a torch to input into the model (DO NOT TOUCH)
input = torch.transpose(torch.from_numpy(init).unsqueeze(1).float(),0,1)
input = input.to(device)
with torch.inference_mode():
    for i in range(STEPS):
        output = model(input)
        #print(output)
        model_prediction_x[i]= output[0][0].item()
        model_prediction_y[i]= output[0][1].item()
        #Add the parameter back onto the output
        c=torch.tensor([[MU]]).to(device)

        #The new input is the old output
        input = torch.cat((output,c),dim=1)

        '''if (i% 10 == 0):
            print(f'State position: {output}| time t={time[i]}')'''





In [None]:
#Plot the model predicitons
s = np.linspace(0, TIME_LENGTH, STEPS)
#Plot the true solutions
time = np.linspace(0,TIME_LENGTH,RESOLUTION) #The step size should be aligned with time step in the model
true = odeint(ode, [INIT_X,INIT_Y], time, args=(MU,))

fig, (ax1, ax2) = plt.subplots(1, 2)
#Time Series
ax1.plot(time, true[:,0])
ax1.plot(time, true[:,1])
ax1.scatter(s, model_prediction_x, s=4, c='blue', edgecolor='w', alpha=0.15, linewidth=0.5, marker='o')
ax1.scatter(s, model_prediction_y, s=4, c='#FF5733', edgecolor='w', alpha=0.15, linewidth=0.5, marker='o')

# Add titles and labels
ax1.set_title('Time Series')
ax1.legend(["Model x(t_i) prediction", "Model y(t_i) prediction", "True x(t)", "True y(t)"])
ax1.set_xlabel('t')
ax1.set_ylabel('x or y')
#Phaseplane
ax2.plot(true[:,0],true[:,1])
ax2.scatter(model_prediction_x,model_prediction_y, s=4, c='red', alpha=0.15, linewidth=0.5, marker='o')
ax2.set_title('Phase Plane')
ax2.set_xlabel('X Axis')
ax2.set_ylabel('Y Axis')
print(model_prediction_x)
# Show the plot
plt.show()