# Recurrent Neural Networks

In this series of exercises, we are going to see some basic applications of Recurrent Neural Networks (RNNs).

In [None]:
# classical imports
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import matplotlib.pyplot as plt

## First RNN: sinusoidal wave

This toy problem is an extremely simple illustration of a univariate (one-feature) time series. Our objective is to build a model that correctly predicts the next value in the series, given a variable number of consecutive values from previous timesteps. First, let's generate some data and visualize the result.

In [None]:
# since we are creating artificial data, we can operate directly on tensors
t = torch.linspace(0, 799, 800) # 800 uniformly spaced samples in [0, 799], i.e. t = 0, 1, ..., 799
y = torch.sin(t*2*3.1416/40)

# let's just visualize the first 100 samples
fig, ax = plt.subplots(figsize=(12,4))
ax.plot(t[:100], y[:100], color='green', label="Sinusoidal wave")
ax.set_xlabel("t")
ax.set_ylabel("y = sin(t*2*pi/40)")
ax.set_title("First 100 points of the time series")
ax.legend(loc='best')
ax.grid(True)

Now, we split the data into training and test sets (let's forget about the validation set for the moment).

In [None]:
test_size = 40

# negative indices start indexing from the last part of the tensor/array (e.g. last element is y[-1])
train_set = y[:-(test_size)]
test_set = y[-test_size:]

print("Total data: %d samples; Training set: %d samples; Test set: %d samples" %
      (y.shape[0], train_set.shape[0], test_set.shape[0]))

With a classical feed-forward network, we would be done. Here, however, we need to build training sequences: each one feeds a few consecutive input values to the RNN and uses the next value in the series as the ground truth to be predicted. We can create this new training data by sliding a window of a given size over the samples of our training set.

In [None]:
# we can use a simple function to automatically create the training data;
# it returns a list of tuples (x_tensor, y_tensor), where x_tensor holds
# the values in the previous time steps, and y_tensor is the next value
# to be predicted
def input_data(sequence, window_size) :
    out = []
    L = len(sequence)

    for i in range(0, L - window_size) :
        window = sequence[i:i+window_size]
        label = sequence[i+window_size:i+window_size+1]
        out.append((window, label))

    return out

window_size = 40
print("Creating input sequences of %d samples" % window_size)
train_data = input_data(train_set, window_size)
print("Created a total of %d input sequences" % len(train_data))

# let's take a look at the first training sequence
x_tensor = train_data[0][0] # training samples for this sequence
y_tensor = train_data[0][1] # next value to be predicted
t = [i for i in range(0, x_tensor.shape[0])]
fig, ax = plt.subplots(figsize=(12,4))
ax.plot(t, x_tensor, color='green', label="Sinusoidal wave, training sequence")
ax.scatter(t[-1] + 1, y_tensor, color='red', label="Next value to be predicted")
ax.set_xlabel("t")
ax.set_ylabel("y = sin(t*2*pi/40)")
ax.set_title("First training sequence")
ax.legend(loc='best')
ax.grid(True)
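
To make the sliding window concrete, here is a minimal sketch on a toy sequence of six values with a window of three. It uses `Tensor.unfold` as a vectorized alternative to the loop in `input_data`; the names `all_windows`, `windows` and `labels` are illustrative and not part of the notebook. The number of pairs is the sequence length minus the window size, so the 760 training samples with a window of 40 give 720 sequences.

```python
import torch

# toy sequence 0, 1, ..., 5, so that every window is easy to read
sequence = torch.arange(6, dtype=torch.float32)
window_size = 3

# unfold(dimension, size, step) returns every contiguous window of length `size`
all_windows = sequence.unfold(0, window_size, 1)  # shape (4, 3)

# the last window has no "next value" to predict, so it is dropped
windows = all_windows[:-1]                        # shape (3, 3)
labels = sequence[window_size:]                   # shape (3,)

# each line shows one training pair: input window -> value to predict
for window, label in zip(windows, labels):
    print(window.tolist(), "->", label.item())
```

The loop-based `input_data` is easier to read and returns a list of tuples; the `unfold` variant returns two stacked tensors instead, which may be more convenient if the sequences are later processed in batches.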

We finally get to our favorite part, where we need to create a new model, inheriting from torch.nn.Module.

In [None]:
class MyFirstLSTM(nn.Module):

    # we need to specify the size of the hidden state, or in other words, the tensor
    # storing the history
    def __init__(self,input_size = 1, hidden_size = 1, out_size = 1) :
        super().__init__()
        # the hidden state of all cells is initialized to zeros; the hidden state in
        # this case is a set of h_t and c_t for each LSTM unit in the module
        # the number of LSTM units is the parameter 'hidden_size'
        self.hidden_size = hidden_size
        self.hidden = (torch.zeros(1, 1, hidden_size), torch.zeros(1, 1, hidden_size)) # (h_t0, c_t0)
        # and the neural network simply has two modules:
        # - an LSTM, that will be unrolled to adjust for sequences of different length
        self.lstm = nn.LSTM(input_size, hidden_size)
        # - and a simple linear module, that will output the value for the next element in the sequence
        self.linear = nn.Linear(hidden_size, out_size)

    # forward pass
    def forward(self, sequence) :
        # the LSTM module is called with the input sequence and the hidden state
        # stored inside the attribute self.hidden of this class
        lstm_out, self.hidden = self.lstm(sequence.view(len(sequence), 1, -1), self.hidden) # this call performs unrolling inside
        # after getting the outputs of all LSTM cells as a sequence, the linear layer
        # outputs one prediction per element of the sequence
        pred = self.linear(lstm_out.view(len(sequence),-1))
        # we return the last element in the sequence as the prediction for t+1,
        # but in fact we got one prediction for each unrolled unit of the LSTM module
        return pred[-1]

    # forward pass with printouts (TODO, see below)

# let's instantiate a network here, to check the parameter count
my_first_lstm = MyFirstLSTM(hidden_size=40)
print("My first LSTM network has %d parameters!" % sum(p.numel() for p in my_first_lstm.parameters() if p.requires_grad))
print("Of which %d inside the LSTM cells" % sum(p.numel() for p in my_first_lstm.lstm.parameters() if p.requires_grad))

Before trying to run the code, try setting the size of the hidden state to 1. This makes the LSTM module use a single unit, so we can check that the number of parameters is what we expect. If the count is a bit higher than you anticipated, it's because PyTorch stores two separate bias vectors (input-to-hidden and hidden-to-hidden) for each gate instead of one; see the [LSTM module documentation](https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html) for more details.
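
As a sanity check on the expected count, the sketch below computes the number of parameters by hand and compares it with what PyTorch reports. It assumes a single-layer, unidirectional `nn.LSTM` (the default configuration used above); the helper names `lstm_param_count` and `linear_param_count` are hypothetical and only serve this illustration.

```python
import torch.nn as nn

def lstm_param_count(input_size, hidden_size):
    # 4 gates, each with input weights, recurrent weights and two bias vectors
    per_gate = hidden_size * input_size + hidden_size * hidden_size + 2 * hidden_size
    return 4 * per_gate

def linear_param_count(in_features, out_features):
    # one weight per connection, plus one bias per output
    return in_features * out_features + out_features

for hidden_size in (1, 40):
    lstm = nn.LSTM(1, hidden_size)
    actual = sum(p.numel() for p in lstm.parameters())
    print("hidden_size=%d: formula=%d, PyTorch=%d"
          % (hidden_size, lstm_param_count(1, hidden_size), actual))
```

With one input feature, the formula gives 16 parameters for a single unit and 6880 for 40 units; the linear layer adds 2 and 41 parameters respectively.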

Now, add a new method `forward_with_printouts` to the `MyFirstLSTM` class. It should be a copy of `forward` that also prints the shapes of the tensors and of the hidden state. If you feel particularly inspired, you could try to create [forward hooks for the layers](https://pytorch.org/docs/stable/generated/torch.nn.modules.module.register_module_forward_hook.html) instead.
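
For the hook-based variant, here is a minimal sketch on a standalone LSTM and linear layer rather than on `MyFirstLSTM` itself. It uses the per-module `register_forward_hook` method instead of the global hook from the link above; the `captured` dictionary and the `make_hook` helper are hypothetical names introduced for this example.

```python
import torch
import torch.nn as nn

captured = {}  # hypothetical container: layer name -> output shape(s)

def make_hook(name):
    def hook(module, inputs, output):
        # nn.LSTM returns (output, (h_n, c_n)); nn.Linear returns a single tensor
        if isinstance(output, tuple):
            out, (h_n, c_n) = output
            captured[name] = (tuple(out.shape), tuple(h_n.shape), tuple(c_n.shape))
        else:
            captured[name] = tuple(output.shape)
        print(name, captured[name])
    return hook

lstm = nn.LSTM(input_size=1, hidden_size=4)
linear = nn.Linear(4, 1)
handles = [lstm.register_forward_hook(make_hook("lstm")),
           linear.register_forward_hook(make_hook("linear"))]

# same reshaping as in forward(): (sequence length, batch size 1, 1 feature)
sequence = torch.zeros(5)
with torch.no_grad():
    lstm_out, _ = lstm(sequence.view(len(sequence), 1, -1))
    pred = linear(lstm_out.view(len(sequence), -1))

# remove the hooks, so later forward passes are silent again
for handle in handles:
    handle.remove()
```

The same two `register_forward_hook` calls should work on `my_first_lstm.lstm` and `my_first_lstm.linear`, leaving the `forward` method untouched.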

Finally, set the hidden state size back to 40 (or another integer of your choice), and we can now proceed with the training.

In [None]:
# fix random seed for reproducibility
torch.manual_seed(42)

# re-instantiate the network here (after fixing seed) so we always get same
# initial (pseudo-random) values of its internal parameters
my_first_lstm = MyFirstLSTM(hidden_size=40)

# as usual, we prepare the optimization loop
loss = nn.MSELoss()
optimizer = torch.optim.Adam(my_first_lstm.parameters(), lr=1e-5)

max_epochs = 20
future = 40 # number of future points we are going to obtain before evaluation

for epoch in range(0, max_epochs):

    for seq, y_train in train_data:
        optimizer.zero_grad()
        my_first_lstm.hidden = (torch.zeros(1, 1, my_first_lstm.hidden_size),
                       torch.zeros(1, 1, my_first_lstm.hidden_size))

        y_pred = my_first_lstm(seq)
        train_loss = loss(y_pred, y_train)
        train_loss.backward()
        optimizer.step()

    print("Epoch %d, loss: %.4e" % (epoch, train_loss.item()))

    # this part here is just to offer a visualization of the training process,
    # normally we should NEVER use the test set during training
    if epoch % 5 == 0 or epoch == max_epochs-1 :
      preds = train_set[-window_size:].tolist()
      for f in range(0, future) :
          seq = torch.FloatTensor(preds[-window_size:])
          with torch.no_grad() :
              my_first_lstm.hidden = (torch.zeros(1, 1, my_first_lstm.hidden_size),
                            torch.zeros(1, 1, my_first_lstm.hidden_size))
              preds.append(my_first_lstm(seq).item())

      # compare only the newly predicted points against the held-out test set
      loss_test = loss(torch.tensor(preds[window_size:]), test_set)
      print("Performance on test set: %.4e" % loss_test.item())

      fig, ax = plt.subplots(figsize=(12,4))
      ax.set_xlim(700, 801) # we visualize only the last part of the data set
      ax.grid(True)
      ax.plot(y.numpy()[:760], color='green', label="Training set")
      ax.scatter(range(760, 800), y.numpy()[760:], color='green', label="Test set, ground truth")
      ax.plot(range(760, 800), preds[window_size:], color='orange', label="Prediction")
      ax.legend(loc='best')
      ax.set_title("Epoch %d" % epoch)
      plt.show()

Now that the network is trained, we can check what happens if we give it an input sequence of different size, at the same time asking it to predict more values than what it typically saw in the training set.

In [None]:
# since we used what was previously the test set during training,
# let's generate completely new data, a proper test set
t_new = torch.linspace(1000, 1199, 200) # 200 samples with unit spacing, like the training data
y_new = torch.sin(t_new*2*3.1416/40)

input_sequence_length = 80
input_sequence = y_new[0:input_sequence_length] # first samples
local_input_sequence = input_sequence.clone() # create a separate tensor, to avoid messing up the original
pred_sequence_length = 60
pred_sequence = torch.zeros(pred_sequence_length)

# we do not need to perform other backward passes, so we can just perform a forward pass
# with the torch.no_grad() context, to avoid recreating the computational graph
with torch.no_grad() :

  for i in range(0, pred_sequence_length) :
    # prepare the LSTM network, setting its initial hidden state to zero
    my_first_lstm.hidden = (torch.zeros(1, 1, my_first_lstm.hidden_size),
                              torch.zeros(1, 1, my_first_lstm.hidden_size))

    # obtain the prediction for the sequence so far
    y_pred = my_first_lstm(local_input_sequence)

    # store the predicted element
    pred_sequence[i] = y_pred.item()

    # add the *predicted element* to the next input sequence, using concatenation
    local_input_sequence = torch.cat((local_input_sequence, y_pred), dim=0)

# check the results
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(t_new[:input_sequence_length], input_sequence, color='green', label="Initial input sequence")
ax.scatter(t_new[input_sequence_length:input_sequence_length+pred_sequence_length], y_new[input_sequence_length:input_sequence_length+pred_sequence_length],
           color='green', label="Ground truth")
ax.plot(t_new[input_sequence_length:input_sequence_length+pred_sequence_length], pred_sequence, color='orange', label="Predicted values")
ax.grid(True)
ax.legend(loc='best')

Try to use the `forward_with_printouts()` method that you previously designed, instead of just the forward pass. What are the shapes of the tensors and the hidden state? Is this what you were expecting?

As a point of comparison, we shall now try an Auto-Regressive Integrated Moving Average (ARIMA) model from the statsmodels library.

In [None]:
# Colaboratory has statsmodels natively accessible; if you are running this on
# a local notebook, you might have to install it
#!pip install statsmodels
from statsmodels.tsa.arima.model import ARIMA

# create a 'history' variable as a list of floats, because we will have to append elements to it
history = train_set.tolist()

# the model is autoregressive, so for every sample we want to create, we need
# to feed it the whole history up to that point (including its own previous predictions)
y_pred = []
y_true = []
for i in range(0, test_set.shape[0]) :

  # create and fit the model to the current history
  arima_order = (1,1,0) # this tuple contains the (p, d, q) hyperparameters for ARIMA, don't worry too much about it
  model = ARIMA(history, order=arima_order)
  model_fitted = model.fit()

  # perform a prediction
  y_hat = model_fitted.forecast()[0] # get the first element of the forecast

  # store information
  y_pred.append(y_hat)
  y_true.append(test_set[i].item())

  # update history
  history.append(y_true[-1]) # we are updating the history with an *observed value*

fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(range(len(train_set)-100, len(train_set)), train_set[-100:], color='green', label="Training data")
ax.scatter(range(len(train_set), len(train_set) + len(test_set)), test_set, color='green', label="Test data, ground truth")
ax.plot(range(len(train_set), len(train_set) + len(test_set)), y_pred, color='orange', label="ARIMA predictions")
ax.grid(True)
ax.legend(loc='best')

Despite all the warning messages, this one was not that bad! However, we always updated the history that ARIMA uses to predict the next value with an _observed_ value. Verify what happens if the history is updated with the value _predicted_ by ARIMA instead. Does it still work well? Why do you think that is?

As a final check on our beloved LSTM network, we can run a forward pass on a sequence and visualize the resulting computational graph. We will need to install the pytorchviz library, which might take some time, but it could be cool. Let's check!

In [None]:
!pip install -U git+https://github.com/szagoruyko/pytorchviz.git@master --quiet

In [None]:
from torchviz import make_dot

# define input sequence
input_sequence = y_new[0:input_sequence_length]
print("The input sequence has %d elements" % input_sequence_length)
# set up hidden state of the LSTM
my_first_lstm.hidden = (torch.zeros(1, 1, my_first_lstm.hidden_size),
                              torch.zeros(1, 1, my_first_lstm.hidden_size))
# get prediction for the next element in the sequence
y_hat = my_first_lstm(input_sequence) # use the sequence defined above, not the extended one from the previous cell
# get the names of the parameters in the network
params = dict(my_first_lstm.named_parameters())
# add names for the other tensors
params["y_hat"] = y_hat
# plot computational graph!
make_dot(y_hat, params=params)

The plot looks a bit mysterious at first, but it becomes clearer after reading the documentation: the 'ih' and 'hh' in the names of weights and biases stand for 'input-to-hidden' and 'hidden-to-hidden' respectively. In other words, the parameters are grouped into tensors based on whether they connect the input $x_t$ to the hidden state $h_t$, or the previous hidden state $h_{t-1}$ to $h_t$. They are (probably) stored in this way to improve computational efficiency.
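
To see this grouping without the graph, the short sketch below lists the named parameters of a standalone `nn.LSTM` with the same sizes as our network. The `shapes` dictionary is just an illustrative name. The first dimension of every tensor is four times the hidden size, because the parameters of the four gates are stacked into a single tensor.

```python
import torch.nn as nn

hidden_size = 40
lstm = nn.LSTM(input_size=1, hidden_size=hidden_size)

# map each parameter name to its shape
shapes = {name: tuple(p.shape) for name, p in lstm.named_parameters()}
for name, shape in shapes.items():
    print(name, shape)
```

The `_l0` suffix in the names refers to the first (and here only) LSTM layer.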