In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn


# Generate Time Series Data as in Feldkamp et al.

In [None]:
def nl_sys_gen_traj(type, i, N, x1_0):
    # only generate x_1 trajectory for each of the 13 time series
    x1_traj = np.zeros((N+1, ))
    x2_traj = np.zeros((N+1, ))
    m_traj = np.zeros((N+1, ))

    # x2_0 = 10*np.random.randn()
    x2_0 = 0
    x1_traj[0] = x1_0
    x2_traj[0] = x2_0
    
    alpha = np.array([1.5, 3.1, 3.6, 3.9, 3.95, 4.0])
    beta = np.array([0.8, 1.0])
    mu = 0.7
    T = np.array([20, 80])

    if type == 'quad':
        for k in range(N):
            # Note that the index of each type is starting from 1 (0 works but wrong)
            if i == 7:
                # alpha_k = (3.6+3.9)/2 + 0.15*np.sin(2*np.pi/5000*k)
                alpha_k = (3.6+3.9)/2 + 0.15*np.sin(2*np.pi/N*k)
                x1_traj[k+1] = alpha_k*x1_traj[k]*(1 - x1_traj[k])
            else:
                x1_traj[k+1] = alpha[i-1]*x1_traj[k]*(1 - x1_traj[k])
    elif type == 'henon':
        for k in range(N):
            if i == 3:
                # beta_k = 0.9 + 0.1*np.sin(2*np.pi/5000*k)
                beta_k = 0.9 + 0.1*np.sin(2*np.pi/N*k)
                x1_traj[k+1] = beta_k - 1.4*x1_traj[k]**2 + x2_traj[k]
                x2_traj[k+1] = 0.3*x1_traj[k]
            else:
                x1_traj[k+1] = beta[i-1] - 1.4*x1_traj[k]**2 + x2_traj[k]
                x2_traj[k+1] = 0.3*x1_traj[k]
    elif type == 'ikeda':
        for k in range(N):
            m_traj[k] = 0.4 - 6.0/(1 + x1_traj[k]**2 + x2_traj[k]**2)
            x1_traj[k+1] = 1. + mu*(x1_traj[k]*np.cos(m_traj[k]) - x2_traj[k]*np.sin(m_traj[k]))
            x2_traj[k+1] = mu*(x1_traj[k]*np.sin(m_traj[k]) + x2_traj[k]*np.cos(m_traj[k]))
        m_traj[N] = 0.4 - 6.0/(1 + x1_traj[N]**2 + x2_traj[N]**2)
    elif type == 'sine':
        for k in range(N+1):
            x1_traj[k] = np.sin(2*np.pi*k/T[i-1])
    else:
        print('Wrong type input')

    return x1_traj, x2_traj, m_traj

In [None]:
x1_traj, _, _ = nl_sys_gen_traj('ikeda', 1, 10, 0.5)

fig, ax = plt.subplots()
ax.plot(range(11), x1_traj)
plt.xlim([0, 10])
plt.ylim([-1., 1.])

x1_traj.shape

In [None]:
a = np.array([1., 2., 3., 4.]).reshape((-1, 1))
a_sub = a[:-1]
print(a_sub)

In [None]:
b = np.array([2., 3., 4., 5.]).reshape((-1, 1))
ab = np.concatenate([a, b], axis=1)
print(ab)

In [None]:
train_traj_list = list()
pred_traj_list = list()
N = 500

for i in range(1, 8):
    x1_traj_i, _, _ = nl_sys_gen_traj('quad', i, N, 0.4)
    x1_traj_i = x1_traj_i.reshape((-1, 1))
    train_traj_list.append(x1_traj_i[: -1, :])
    pred_traj_list.append(x1_traj_i[1:, :])

for j in range(1, 4):
    x1_traj_j, _, _ = nl_sys_gen_traj('henon', j, N, 0.5)
    x1_traj_j = x1_traj_j.reshape((-1, 1))
    train_traj_list.append(x1_traj_j[: -1, :])
    pred_traj_list.append(x1_traj_j[1:, :])

x1_traj_ikeda, _, _ = nl_sys_gen_traj('ikeda', 1, N, 0.5)
x1_traj_ikeda = x1_traj_ikeda.reshape((-1, 1))
train_traj_list.append(x1_traj_ikeda[:-1, :])
pred_traj_list.append(x1_traj_ikeda[1:, :])

for k in range(1, 3):
    x1_traj_k, _, _ = nl_sys_gen_traj('sine', k, N, 0.5)
    x1_traj_k = x1_traj_k.reshape((-1, 1))
    train_traj_list.append(x1_traj_k[:-1, :])
    pred_traj_list.append(x1_traj_k[1:, :])

train_traj = np.concatenate(train_traj_list)
pred_traj = np.concatenate(pred_traj_list)
batch_size = N
num_batch  = len(train_traj_list)

In [None]:
train_set = torch.FloatTensor(train_traj).view(-1, 1)
true_set = torch.FloatTensor(pred_traj).view(-1, 1)

In [None]:
train_set[0].view(-1, 1, 1)
train_set.shape

# Set up Neural Network

In [None]:
class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_dim_1, hidden_dim_2, hidden_dim_3, output_size):
        super(RNNModel, self).__init__()

        # Defining some parameters
        self.hidden_dim_1 = hidden_dim_1
        self.hidden_dim_2 = hidden_dim_2
        self.hidden_dim_3 = hidden_dim_3
        self.input_size = input_size
        self.n_layers = 1

        # Defining the layers
        # RNN Layer
        self.rnn_1 = nn.RNN(input_size, hidden_dim_1)
        self.rnn_2 = nn.RNN(hidden_dim_1, hidden_dim_2)
        self.rnn_3 = nn.RNN(hidden_dim_2, hidden_dim_3)
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim_3, output_size)

        # Initializing hidden state for first input using method defined below (Why should this be in the forward loop???)
        # self.hidden_1, self.hidden_2 = self.init_hidden(batch_size)
    
    def forward(self, x):
        batch_size = x.size(0)

        # Initializing hidden state for first input using method defined below
        self.hidden_1, self.hidden_2, self.hidden_3 = self.init_hidden(batch_size)

        # Passing in the input and hidden state into the model and obtaining outputs
        out1, self.hidden_1 = self.rnn_1(x.view(-1,1,self.input_size),self.hidden_1)
        out2, self.hidden_2 = self.rnn_2(out1, self.hidden_2)
        out3, self.hidden_3 = self.rnn_3(out2, self.hidden_3)
        
        # Reshaping the outputs such that it can be fit into the fully connected layer
        # out2 = out2.contiguous().view(-1, self.hidden_dim_2)
        # pred = self.linear(lstm_out_2.view(len(seq),-1))
        out = self.fc(out3.view(len(x), -1))
        # out = self.fc(out2[:, -1, :])
        
        return out
    
    def init_hidden(self, batch_size):
        # This method generates the first hidden state of zeros which we'll use in the forward pass
        # We'll send the tensor holding the hidden state to the device we specified earlier as well
        hidden_1 = torch.zeros(self.n_layers, batch_size, self.hidden_dim_1)
        hidden_2 = torch.zeros(self.n_layers, batch_size, self.hidden_dim_2)
        hidden_3 = torch.zeros(self.n_layers, batch_size, self.hidden_dim_3)
        return hidden_1, hidden_2, hidden_3

In [None]:
# Creating a model instance, loss function and the optimizer
model = RNNModel(input_size=1,hidden_dim_1=8, hidden_dim_2=7, hidden_dim_3 = 6,output_size=1)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)

In [None]:
display(model)
display(optimizer)
def count_parameters(model):
    params = [p.numel() for p in model.parameters() if p.requires_grad]
    for item in params:
        print(f'{item:>6}')
    print(f'______\n{sum(params):>6}')
    
count_parameters(model)

# Training

In [None]:
num_batch
batch_size

In [None]:
# Set the number of epochs

epochs = 100

for epoch in range(epochs):
    
    # Running each batch separately 
    
    for bat_id in range(num_batch):
    
        for k in range(batch_size):
            
            # set the optimization gradient to zero

            optimizer.zero_grad()
            
            # initialize the hidden states

            model.hidden_1 = (torch.zeros(1,1,model.hidden_dim_1),
                            torch.zeros(1,1,model.hidden_dim_1))

            model.hidden_2 = (torch.zeros(1,1,model.hidden_dim_2),
                            torch.zeros(1,1,model.hidden_dim_2))
            
            model.hidden_3 = (torch.zeros(1,1,model.hidden_dim_3),
                            torch.zeros(1,1,model.hidden_dim_3))
            
            # Make predictions on the current sequence

            y_pred = model(train_set[bat_id*batch_size + k])
            
            # Compute the loss

            loss = criterion(y_pred, true_set[bat_id*batch_size + k])
            
            # Perform back propogation and gradient descent

            loss.backward()

            optimizer.step()

    if epoch%10 == 0:

        print(f'Epoch: {epoch} Loss: {loss.item():10.8f}')

Epoch: 10 Loss: 0.00527634


KeyboardInterrupt: 

In [None]:

model_save_name = 'RNN_500_2.pt'
path = F"./RNN/{model_save_name}"
torch.save(model.state_dict(), path)

In [None]:
model_save_name = 'RNN_500_2.pt'
path = F"./RNN/{model_save_name}" 
model.load_state_dict(torch.load(path))

In [None]:
horizon = batch_size

model.eval()

pred_model = list()
true_data = list()

for id in range(num_batch):
    true_data.append(pred_traj_list[id])
    pred_traj = np.zeros((batch_size, ))
    for k in range(batch_size):
        # Initialize hidden states
        model.hidden_1 = (torch.zeros(1,1,model.hidden_dim_1),
                        torch.zeros(1,1,model.hidden_dim_1))
        model.hidden_2 = (torch.zeros(1,1,model.hidden_dim_2),
                        torch.zeros(1,1,model.hidden_dim_2))
        model.hidden_3 = (torch.zeros(1,1,model.hidden_dim_3),
                        torch.zeros(1,1,model.hidden_dim_3))
        
        pred_traj[k] = model(train_set[id*batch_size + k])
    
    pred_model.append(pred_traj)

In [None]:
fig1, ax1 = plt.subplots(2, 3, figsize=(18, 12))
alpha_index = ['3.1', '3.6', '3.9', '3.95', '4.0', 'periodic']
for id in range(1, 7):
    ax1[(id-1)//3, (id-1)%3].plot(range(batch_size), pred_model[id])
    ax1[(id-1)//3, (id-1)%3].plot(range(batch_size), true_data[id])
    ax1[(id-1)//3, (id-1)%3].set_xlim([260, 300])
    ax1[(id-1)//3, (id-1)%3].set_title('alpha='+alpha_index[id-1])

In [None]:
fig2, ax2 = plt.subplots(1, 3, figsize=(18, 6))
beta_index = ['0.8', '1.0', 'periodic']
for id in range(7, 10):
    ax2[(id-7)%3].plot(range(batch_size), pred_model[id])
    ax2[(id-7)%3].plot(range(batch_size), true_data[id])
    ax2[(id-7)%3].set_xlim([260, 300])
    ax2[(id-7)%3].set_title('beta='+beta_index[id-7])

In [None]:
fig3, ax3 = plt.subplots(1, 1, figsize=(6, 6))
ax3.plot(range(batch_size), pred_model[10])
ax3.plot(range(batch_size), true_data[10])
ax3.set_xlim([260, 300])
ax3.set_title('ikeda')

In [None]:
fig4, ax4 = plt.subplots(1, 2, figsize=(12, 6))
T_index = ['20', '80']
for id in range(11, 13):
    ax4[(id-11)%2].plot(range(batch_size), pred_model[id])
    ax4[(id-11)%2].plot(range(batch_size), true_data[id])
    ax4[(id-11)%2].set_xlim([200, 400])
    ax4[(id-11)%2].set_title('T='+T_index[id-11])

# Extrapolation Testing

In [None]:
def var_test_gen_traj(type, i, N, x1_0):
    # only generate x_1 trajectory for each of the 13 time series
    x1_traj = np.zeros((N+1, ))
    x2_traj = np.zeros((N+1, ))
    m_traj = np.zeros((N+1, ))

    # x2_0 = 10*np.random.randn()
    x2_0 = 0
    x1_traj[0] = x1_0
    x2_traj[0] = x2_0
    
    alpha = np.array([3.0, 3.75])
    beta = np.array([0.75, 0.90])
    mu = 0.75
    T = np.array([15, 50, 85])

    if type == 'quad':
        for k in range(N):
            # Note that the index of each type is starting from 1 (0 works but wrong)
            x1_traj[k+1] = alpha[i-1]*x1_traj[k]*(1 - x1_traj[k])
    elif type == 'henon':
        for k in range(N):
            x1_traj[k+1] = beta[i-1] - 1.4*x1_traj[k]**2 + x2_traj[k]
            x2_traj[k+1] = 0.3*x1_traj[k]
    elif type == 'ikeda':
        for k in range(N):
            m_traj[k] = 0.4 - 6.0/(1 + x1_traj[k]**2 + x2_traj[k]**2)
            x1_traj[k+1] = 1. + mu*(x1_traj[k]*np.cos(m_traj[k]) - x2_traj[k]*np.sin(m_traj[k]))
            x2_traj[k+1] = mu*(x1_traj[k]*np.sin(m_traj[k]) + x2_traj[k]*np.cos(m_traj[k]))
        m_traj[N] = 0.4 - 6.0/(1 + x1_traj[N]**2 + x2_traj[N]**2)
    elif type == 'sine':
        for k in range(N+1):
            x1_traj[k] = np.sin(2*np.pi*k/T[i-1])
    else:
        print('Wrong type input')

    return x1_traj, x2_traj, m_traj

In [None]:
test_T = 200

# Set the model to evaluation mode

model.eval()

x_init = 0.8

test_var_input = list()
test_var_pred = list()
test_var_out = list()

for i in range(2):
    x1_traj_pred = np.zeros((test_T, ))
    x1_traj, _, _ = var_test_gen_traj('quad', i, test_T, x_init)
    test_var_input.append(x1_traj[:-1])
    test_var_out.append(x1_traj[1: ])
    
    x1_traj = torch.FloatTensor(x1_traj).view(-1, 1)
    for k in range(test_T):
        model.hidden_1 = (torch.zeros(1,1,model.hidden_dim_1),
                        torch.zeros(1,1,model.hidden_dim_1))
        model.hidden_2 = (torch.zeros(1,1,model.hidden_dim_2),
                        torch.zeros(1,1,model.hidden_dim_2))
        model.hidden_3 = (torch.zeros(1,1,model.hidden_dim_3),
                        torch.zeros(1,1,model.hidden_dim_3))
        x1_traj_pred[k] = model(x1_traj[k])
    test_var_pred.append(x1_traj_pred)

fig5, ax5 = plt.subplots(1, 2, figsize=(12, 6))
alpha_index = ['3.0', '3.75']
for id in range(2):
    ax5[id].plot(range(test_T), test_var_pred[id])
    ax5[id].plot(range(test_T), test_var_out[id])
    ax5[id].set_xlim([80, 150])
    ax5[id].set_title('alpha='+alpha_index[id])

for i in range(2):
    x1_traj_pred = np.zeros((test_T, ))
    x1_traj, _, _ = var_test_gen_traj('henon', i, test_T, x_init)
    test_var_input.append(x1_traj[:-1])
    test_var_out.append(x1_traj[1: ])
    
    x1_traj = torch.FloatTensor(x1_traj).view(-1, 1)
    for k in range(test_T):
        model.hidden_1 = (torch.zeros(1,1,model.hidden_dim_1),
                        torch.zeros(1,1,model.hidden_dim_1))
        model.hidden_2 = (torch.zeros(1,1,model.hidden_dim_2),
                        torch.zeros(1,1,model.hidden_dim_2))
        model.hidden_3 = (torch.zeros(1,1,model.hidden_dim_3),
                        torch.zeros(1,1,model.hidden_dim_3))
        x1_traj_pred[k] = model(x1_traj[k])
    test_var_pred.append(x1_traj_pred)

fig6, ax6 = plt.subplots(1, 2, figsize=(12, 6))
beta_index = ['0.75', '0.90']
for id in range(2):
    ax6[id].plot(range(test_T), test_var_pred[id+2])
    ax6[id].plot(range(test_T), test_var_out[id+2])
    ax6[id].set_xlim([80, 150])
    ax6[id].set_title('beta='+beta_index[id])

In [None]:
x1_traj_pred = np.zeros((test_T, ))
x1_traj, _, _ = var_test_gen_traj('ikeda', 0, test_T, x_init)
test_var_input.append(x1_traj[:-1])
test_var_out.append(x1_traj[1: ])
    
x1_traj = torch.FloatTensor(x1_traj).view(-1, 1)
for k in range(test_T):
    model.hidden_1 = (torch.zeros(1,1,model.hidden_dim_1),
                        torch.zeros(1,1,model.hidden_dim_1))
    model.hidden_2 = (torch.zeros(1,1,model.hidden_dim_2),
                        torch.zeros(1,1,model.hidden_dim_2))
    model.hidden_3 = (torch.zeros(1,1,model.hidden_dim_3),
                        torch.zeros(1,1,model.hidden_dim_3))
    x1_traj_pred[k] = model(x1_traj[k])
test_var_pred.append(x1_traj_pred)

fig7, ax7 = plt.subplots(1, 1, figsize=(6, 6))
ikeda_index = '0.75'
ax7.plot(range(test_T), test_var_pred[4])
ax7.plot(range(test_T), test_var_out[4])
ax7.set_xlim([80, 150])
ax7.set_title('mu='+ikeda_index)

In [None]:
for i in range(3):
    x1_traj_pred = np.zeros((test_T, ))
    x1_traj, _, _ = var_test_gen_traj('sine', i, test_T, x_init)
    test_var_input.append(x1_traj[:-1])
    test_var_out.append(x1_traj[1: ])
    
    x1_traj = torch.FloatTensor(x1_traj).view(-1, 1)
    for k in range(test_T):
        model.hidden_1 = (torch.zeros(1,1,model.hidden_dim_1),
                        torch.zeros(1,1,model.hidden_dim_1))
        model.hidden_2 = (torch.zeros(1,1,model.hidden_dim_2),
                        torch.zeros(1,1,model.hidden_dim_2))
        model.hidden_3 = (torch.zeros(1,1,model.hidden_dim_3),
                        torch.zeros(1,1,model.hidden_dim_3))
        x1_traj_pred[k] = model(x1_traj[k])
    test_var_pred.append(x1_traj_pred)

fig8, ax8 = plt.subplots(1, 3, figsize=(18, 6))
T_index = ['15', '50', '80']
for id in range(3):
    ax8[id].plot(range(test_T), test_var_pred[id+5])
    ax8[id].plot(range(test_T), test_var_out[id+5])
    ax8[id].set_xlim([0, 150])
    ax8[id].set_title('T='+T_index[id])

# Switching Test

In [None]:
def switch_test_gen_traj(i, N1, N2, N3, x1_0):
    # only generate x_1 trajectory for each of the 13 time series
    N = N1 + N2 + N3
    x1_traj = np.zeros((N+1, ))
    x2_traj = np.zeros((N+1, ))
    m_traj = np.zeros((N+1, ))

    # x2_0 = 10*np.random.randn()
    x2_0 = 0
    x1_traj[0] = x1_0
    x2_traj[0] = x2_0

    mu = 0.7

    if i == 0:
        for k in range(N1):
            x1_traj[k+1] = 3.1*x1_traj[k]*(1 - x1_traj[k])
        for k in range(N1, N1+N2):
            x1_traj[k+1] = 0.8 - 1.4*x1_traj[k]**2 + x2_traj[k]
            x2_traj[k+1] = 0.3*x1_traj[k]
        for k in range(N1+N2, N1+N2+N3):
            x1_traj[k+1] = np.sin(2*np.pi*k/80)
    elif i == 1:
        for k in range(N1):
            x1_traj[k+1] = np.sin(2*np.pi*k/20)
        for k in range(N1, N1+N2+N3):
            x1_traj[k+1] = 3.1*x1_traj[k]*(1 - x1_traj[k])
    elif i == 2:
        for k in range(N1):
            x1_traj[k+1] = 0.8 - 1.4*x1_traj[k]**2 + x2_traj[k]
            x2_traj[k+1] = 0.3*x1_traj[k]
        for k in range(N1, N1+N2):
            x1_traj[k+1] = 1.0 - 1.4*x1_traj[k]**2 + x2_traj[k]
            x2_traj[k+1] = 0.3*x1_traj[k]
        for k in range(N1+N2, N1+N2+N3):
            x1_traj[k+1] = 3.1*x1_traj[k]*(1 - x1_traj[k])
    elif i == 3:
        for k in range(N1):
            x1_traj[k+1] = 1.5*x1_traj[k]*(1 - x1_traj[k])
        for k in range(N1, N1+N2):
            x1_traj[k+1] = 1.0 - 1.4*x1_traj[k]**2 + x2_traj[k]
            x2_traj[k+1] = 0.3*x1_traj[k]
        for k in range(N1+N2, N1+N2+N3):
            m_traj[k] = 0.4 - 6.0/(1 + x1_traj[k]**2 + x2_traj[k]**2)
            x1_traj[k+1] = 1. + mu*(x1_traj[k]*np.cos(m_traj[k]) - x2_traj[k]*np.sin(m_traj[k]))
            x2_traj[k+1] = mu*(x1_traj[k]*np.sin(m_traj[k]) + x2_traj[k]*np.cos(m_traj[k]))
        m_traj[N] = 0.4 - 6.0/(1 + x1_traj[N]**2 + x2_traj[N]**2)

    else:
        print('Wrong type input')

    return x1_traj, x2_traj, m_traj

In [None]:
test_T1 = 100
test_T2 = 200
test_T3 = 300
test_T = test_T1 + test_T2 + test_T3

# Set the model to evaluation mode

model.eval()

x_init = 0.6

test_sw_input = list()
test_sw_pred = list()
test_sw_out = list()

for i in range(4):
    x1_traj_pred = np.zeros((test_T, ))
    x1_traj, _, _ = switch_test_gen_traj(i, test_T1, test_T2, test_T3, x_init)
    test_sw_input.append(x1_traj[:-1])
    test_sw_out.append(x1_traj[1: ])
    
    x1_traj = torch.FloatTensor(x1_traj).view(-1, 1)
    for k in range(test_T):
        model.hidden_1 = (torch.zeros(1,1,model.hidden_dim_1),
                        torch.zeros(1,1,model.hidden_dim_1))
        model.hidden_2 = (torch.zeros(1,1,model.hidden_dim_2),
                        torch.zeros(1,1,model.hidden_dim_2))
        model.hidden_3 = (torch.zeros(1,1,model.hidden_dim_3),
                        torch.zeros(1,1,model.hidden_dim_3))
        x1_traj_pred[k] = model(x1_traj[k])
    test_sw_pred.append(x1_traj_pred)

fig9, ax9 = plt.subplots(1, 4, figsize=(24, 6))
for id in range(4):
    ax9[id].plot(range(test_T), test_sw_pred[id])
    ax9[id].plot(range(test_T), test_sw_out[id])
    if id != 1:
      ax9[id].set_xlim([75, 125])
    # ax5[id].set_title('alpha='+alpha_index[id])

fig10, ax10 = plt.subplots(1, 4, figsize=(24, 6))
for id in range(4):
    ax10[id].plot(range(test_T), test_sw_pred[id])
    ax10[id].plot(range(test_T), test_sw_out[id])
    if id != 1:
      ax10[id].set_xlim([175, 225])
    # ax5[id].set_title('alpha='+alpha_index[id])