In [1]:
import numpy as np
import pandas as pd

A = 0.1*np.array([[np.sqrt(99), -1], [1, np.sqrt(99)]])
B = 0.5*np.array([[np.sqrt(2), -2], [np.sqrt(2), np.sqrt(2)]])

sigma1 = .01
sigma2 = .2

X_0 = np.array([[1,0]]).T
Y_0 = np.dot(B, X_0) + np.random.normal(loc=0, scale=sigma2, size=2).reshape(2,1)

In [2]:
process_length = 2000

X = [X_0]
Y = [Y_0]

for _ in range(process_length):
    X_k = X[-1]
    X_k_next = np.dot(A, X_k) + np.random.normal(loc=0, scale=sigma1, size=2).reshape(2,1)
    Y_k_next = np.dot(B, X_k_next) + np.random.normal(loc=0, scale=sigma2, size=2).reshape(2,1)
    X.append(X_k_next)    
    Y.append(Y_k_next)

def sliding_windows(data, seq_length):
    x = []
    y = []

    for i in range(len(data)-seq_length-1):
        _x = data[i:(i+seq_length)]
        _y = data[i+seq_length]
        x.append(_x)
        y.append(_y)

    return np.array(x),np.array(y)

In [28]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data as data_utils

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters
sequence_length = 4
input_size = 2
hidden_size = 2
num_layers = 1
num_classes = 2
batch_size = 1
num_epochs = 5
learning_rate = 0.001

Y_seq, Y_target = sliding_windows(Y, sequence_length)
Y_seq = Variable(torch.Tensor(Y_seq.reshape(-1, sequence_length, 1, 2)))
Y_target = Variable(torch.Tensor(Y_target.reshape(-1, 1, 2)))
train_samples = 1000

train_tensor = data_utils.TensorDataset(Y_seq[:train_samples], Y_target[:train_samples]) 
train_loader = data_utils.DataLoader(dataset = train_tensor, batch_size = batch_size)

test_tensor = data_utils.TensorDataset(Y_seq[(train_samples+1):], Y_target[(train_samples+1):]) 
test_loader = data_utils.DataLoader(dataset = test_tensor, batch_size = batch_size)

# Recurrent neural network (many-to-one)
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.last_hidden = None
    
    def forward(self, x):
        # Set initial hidden and cell states 
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device) 
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # Forward propagate LSTM
        out, hidden = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)
        self.last_hidden = hidden
        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
                
        return out

model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)

In [29]:
# Loss and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
total_step = len(train_loader)
best_model = {
    'epoch': -1,
    'model_state_dict': None,
    'accuracy': 0,
    'loss': 1,
}
# Train the model
for epoch in range(num_epochs):
    for i, (Y_seq_i, Y_target_i) in enumerate(train_loader):
        model.train()
        
        Y_seq_i = Y_seq_i.reshape(-1, sequence_length, input_size).to(device)
        Y_target_i = Y_target_i.reshape(-1, num_classes).to(device)
        # Forward pass
        outputs = model(Y_seq_i)
        loss = criterion(outputs, Y_target_i)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i+1) % 1000 == 0:
            model.eval()
            test_criterion = torch.nn.MSELoss()
            test_loss = 0
            for i, (Y_seq_i, Y_target_i) in enumerate(test_loader):
                Y_seq_i = Y_seq_i.reshape(-1, sequence_length, input_size).to(device)
                Y_target_i = Y_target_i.reshape(-1, num_classes).to(device)
                test_outputs = model(Y_seq_i)
                test_loss = test_criterion(test_outputs, Y_target_i)
                
            test_accuracy = 1 - test_loss.item()
            if best_model['accuracy'] < test_accuracy:
                best_model = {
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'accuracy': test_accuracy,
                    'loss': loss.item(),
                }
            print("Epoch: %d, loss: %1.5f, test accuracy: %1.5f" % (epoch, loss.item(), test_accuracy))
print("="*20)
print(f"Best model using the validation set:\n"
      f" epoch:         [{best_model['epoch']}]\n"
      f" loss:          [{best_model['loss']}]\n"
      f" test accuracy: [{best_model['accuracy']}]\n")
print("="*20)

model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
model.load_state_dict(best_model['model_state_dict'])

Epoch: 0, loss: 0.09449, test accuracy: 0.90459
Epoch: 1, loss: 0.10764, test accuracy: 0.98154
Epoch: 2, loss: 0.08181, test accuracy: 0.97872
Epoch: 3, loss: 0.07128, test accuracy: 0.97682
Epoch: 4, loss: 0.06604, test accuracy: 0.97559
Best model using the validation set:
 epoch:         [1]
 loss:          [0.10763672739267349]
 test accuracy: [0.9815373905003071]



<All keys matched successfully>

In [30]:
model.eval()
Y_seq_1001, Y_target_1001 = iter(test_loader).next()

Y_seq_1001 = Y_seq_1001.reshape(-1, sequence_length, input_size).to(device)
Y_target_1001 = Y_target_1001.reshape(-1, num_classes).to(device)
    
Y1001_hat = model(Y_seq_1001)

In [31]:
Y1001_hat.data

tensor([[0.6765, 0.3503]])

In [32]:
Y_target_1001

tensor([[0.5093, 0.2328]])

In [33]:
model.last_hidden

(tensor([[[-0.7424,  0.1528]]], grad_fn=<StackBackward0>),
 tensor([[[-1.0916,  0.1751]]], grad_fn=<StackBackward0>))

In [34]:
Y[train_samples+sequence_length+1]

array([[0.50929014],
       [0.23276622]])

In [35]:
np.dot(B, X[train_samples+sequence_length+1])

array([[0.80354182],
       [0.14426469]])

In [38]:
(hn, cn) = model.last_hidden
Xhat = cn.data.reshape(2, 1).numpy()
Xhat

array([[-1.0916133 ],
       [ 0.17512082]], dtype=float32)

In [39]:
np.dot(B, Xhat)

array([[-0.94700798],
       [-0.64805805]])

In [15]:
Xhat

array([[-0.02047363],
       [-0.7645826 ]], dtype=float32)