# Recurrent Neural Networks (RNNs)
A recurrent neural network contains neurons with feedback loops. They are well suited to sequential data.

Let's see an example. Suppose we have sequences of numbers and we want to estimate the next one.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
# Build 1000 sequences of the form r * [0, 1, 2, 3, 4, 5], with one random
# factor r per sequence. The name `all` is deliberately avoided because it
# would shadow the Python builtin for the rest of the session.
sequences = torch.arange(6).repeat(1000, 1) * torch.randn((1000, 1))
# The first five values of each sequence are the inputs, the sixth is the target.
Xs = sequences[:, :-1]
Ys = sequences[:, -1]
print(Xs.shape, Ys.shape)
Xs[:5], Ys[:5]

In [None]:
# Two single-neuron layers: one weighs the current input (with a bias), the
# other weighs the neuron's previous output. Inputs are fed one step at a time.
input_weights = nn.Linear(1, 1)
recurrent_weights = nn.Linear(1, 1, bias=False)

# Show the initial parameters of both layers.
for param in (input_weights.weight, input_weights.bias, recurrent_weights.weight):
    print(param)

# A small batch of five sequences to walk through by hand.
batch = Xs[:5]
batch

In [None]:
# First time step: there is no previous output yet, so only the input weights apply.
first_input = batch[:, 0:1]  # column 0, kept two-dimensional
output = torch.tanh(input_weights(first_input))
output.shape, output

In [None]:
# Second time step: combine the previous output with the next input column.
recurrent_term = recurrent_weights(output)
input_term = input_weights(batch[:, 1:2])
output = torch.tanh(recurrent_term + input_term)
output.shape, output

The process is repeated for all the inputs, each one stored as a column of the input matrix.

We can add more RNN neurons to the system and add a Linear module to combine their results.

Since all the operations are differentiable, this model can learn from data.

In [None]:
# Define the model for training
class Model(nn.Module):
    """Minimal hand-written recurrent network for sequences of scalars.

    At every time step one value per sequence is fed to ``num_neurons``
    recurrent units. The state reached after the last step is mapped to a
    single prediction by a linear layer.
    """

    def __init__(self, num_neurons=5):
        super().__init__()
        self.num_neurons = num_neurons
        self.input_weights = nn.Linear(1, num_neurons)
        self.recurrent_weights = nn.Linear(num_neurons, num_neurons, bias=False)
        self.fc = nn.Linear(num_neurons, 1)

    def forward(self, xs):
        """Predict one value per sequence.

        Args:
            xs: Tensor of shape (batch, steps) holding one scalar per step.

        Returns:
            Tensor of shape (batch, 1).
        """
        batch, cols = xs.shape
        # Allocate the initial state from xs so it shares its dtype and device;
        # a plain torch.zeros() would fail for float64 or GPU inputs.
        hidden = xs.new_zeros((batch, self.num_neurons))

        for i in range(cols):
            in_value = xs[:, i].unsqueeze(1)
            hidden = torch.tanh(self.input_weights(in_value) + self.recurrent_weights(hidden))

        return self.fc(hidden)
    
model = Model()
# Total number of scalar parameters in the model.
sum(p.numel() for p in model.parameters())

In [None]:
# Sanity check: run the model on the first 10 input sequences.
model(Xs[:10])

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.MSELoss()

# Full-batch training: every epoch uses the whole training set at once.
num_epochs = 500
# Report roughly ten times; max() keeps the interval non-zero when
# num_epochs is lowered below 10 (otherwise the modulo divides by zero).
log_every = max(1, num_epochs // 10)

model.train()  # the mode never changes inside the loop, so set it once
for epoch in range(num_epochs):
    outputs = model(Xs)
    # The model returns (batch, 1); drop only that last axis to match Ys.
    loss = loss_fn(outputs.squeeze(1), Ys)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % log_every == 0:
        print(epoch, loss.item())

print("Training complete")

In [None]:
# 200 fresh sequences built the same way as the training data:
# a random factor times [0, 1, 2, 3, 4, 5].
ramp = torch.arange(6).repeat(200, 1)
factor = torch.randn((200, 1))
all_test = ramp * factor
Xs_test, Ys_test = all_test[:, :-1], all_test[:, -1]

In [None]:
# Evaluate on the test sequences; no gradients are needed here.
with torch.no_grad():
    output = model(Xs_test)
    # Drop only the trailing singleton axis: a dimension-less squeeze()
    # would also remove the batch axis for a single-sample batch.
    loss = loss_fn(output.squeeze(1), Ys_test)
print('Testing loss', loss)

In [None]:
# Show the first five predictions next to their targets.
output[:5], Ys_test[:5]

In this example, each value in the training dataset contains a single feature, but in general it can contain as many as necessary. For example, consider trajectories in the 2D plane, where we want to calculate the next point based on the previous points. In this case, every point is described by 2 features: its X and Y positions.

Let's first generate the dataset.

In [None]:
# Each trajectory is the line y = a*x + b, with a random slope a and a random
# intercept b, sampled at x = 0..5.
a = torch.randn(1200, 1)
b = 5 * torch.randn(1200, 1)
x = torch.arange(6).repeat(1200, 1)
y = b + a * x

In [None]:
# Inspect the first five rows of x and the corresponding rows of y.
x[:5], y[:5]

In [None]:
# Pair every x with its y along a new trailing axis, giving one (x, y)
# point per trajectory step.
dataset = torch.stack((x, y), dim=-1)
dataset.shape

In [None]:
# Show the first two entries of the dataset.
dataset[:2]

In [None]:
# The last point of every trajectory is the target; the preceding points are the inputs.
Xs, Ys = dataset[:, :-1], dataset[:, -1]
Xs.shape, Ys.shape

In [None]:
# The first 1000 trajectories are used for training, the remaining ones for testing.
n_train = 1000
Xs_train, Xs_test = Xs[:n_train], Xs[n_train:]
Ys_train, Ys_test = Ys[:n_train], Ys[n_train:]
Xs_train.shape, Ys_train.shape, Xs_test.shape, Ys_test.shape

Now we need to feed the RNN pairs of features.

In [None]:
# Define the model for training
class Model2D(nn.Module):
    """Hand-written recurrent network for sequences of 2-feature points.

    At every time step one point per sequence is fed to ``num_neurons``
    recurrent units. The state reached after the last step is mapped to a
    2-feature prediction by a linear layer.
    """

    def __init__(self, num_neurons=10):
        super().__init__()
        self.num_neurons = num_neurons
        self.input_weights = nn.Linear(2, num_neurons)
        self.recurrent_weights = nn.Linear(num_neurons, num_neurons, bias=False)
        self.fc = nn.Linear(num_neurons, 2)

    def forward(self, xs):
        """Predict the next point of each sequence.

        Args:
            xs: Tensor of shape (batch, steps, 2).

        Returns:
            Tensor of shape (batch, 2). The batch axis is always kept, also
            for a single sequence (a dimension-less ``squeeze()`` would
            collapse it to shape (2,)).
        """
        batch, cols, _ = xs.shape
        # Allocate the initial state from xs so it shares its dtype and device.
        hidden = xs.new_zeros((batch, self.num_neurons))

        for i in range(cols):
            in_value = xs[:, i, :]
            hidden = torch.tanh(self.input_weights(in_value) + self.recurrent_weights(hidden))

        return self.fc(hidden)
    
model2d = Model2D()
# Total number of scalar parameters in the model.
sum(p.numel() for p in model2d.parameters())

In [None]:
# Forward a small batch and compare the prediction shape with the target shape.
sample_targets = Ys_train[:20]
estimated = model2d(Xs_train[:20])
estimated.shape, sample_targets.shape

In [None]:
# Mean squared error between the sample predictions and their targets.
loss_fn = nn.MSELoss()
loss_fn(input=estimated, target=Ys_train[:20])

In [None]:
optimizer = optim.Adam(model2d.parameters(), lr=0.01)
loss_fn = nn.MSELoss()

# Full-batch training: every epoch uses the whole training set at once.
num_epochs = 1000
# Report roughly ten times; max() keeps the interval non-zero when
# num_epochs is lowered below 10 (otherwise the modulo divides by zero).
log_every = max(1, num_epochs // 10)

model2d.train()  # the mode never changes inside the loop, so set it once
for epoch in range(num_epochs):
    outputs = model2d(Xs_train)
    loss = loss_fn(outputs, Ys_train)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % log_every == 0:
        print(epoch, loss.item())

print("Training complete")

In [None]:
# Evaluate on the held-out trajectories; no gradients are needed here.
with torch.no_grad():
    output = model2d(Xs_test)
    loss = loss_fn(input=output, target=Ys_test)
print('Testing loss', loss)

In [None]:
# Show the first five predictions next to their targets.
output[:5], Ys_test[:5]

Torch contains a module that directly implements the RNN neuron.

In [None]:
# Define the model for training
class Model2DStd(nn.Module):
    """Recurrent model built on ``nn.RNN`` followed by a linear read-out.

    The final hidden state of a single-layer RNN is mapped to the output
    by a linear layer.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    @property
    def device(self):
        """Device currently holding the model parameters.

        Kept for callers that read ``model.device``. It is derived from the
        parameters instead of being recorded in ``to()``, so it stays correct
        after ``.cuda()``, ``.cpu()`` or ``.to(dtype)``.
        """
        return next(self.parameters()).device

    def forward(self, xs):
        """Predict one output vector per sequence.

        Args:
            xs: Tensor of shape (batch, steps, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # Without an explicit h_0, nn.RNN starts from a zero hidden state on
        # the input's dtype and device, so no manual device tracking is needed.
        _, hn = self.rnn(xs)
        # hn has shape (1, batch, hidden_size); drop the layer axis.
        return self.fc(hn.squeeze(0))
    
model2d_std = Model2DStd(input_size=2, hidden_size=10, output_size=2)
# Total number of scalar parameters in the model.
sum(p.numel() for p in model2d_std.parameters())

In [None]:
# Forward a small batch and compare the prediction shape with the target shape.
sample_targets = Ys_train[:20]
estimated = model2d_std(Xs_train[:20])
estimated.shape, sample_targets.shape

In [None]:
optimizer = optim.Adam(model2d_std.parameters(), lr=0.01)
loss_fn = nn.MSELoss()

# Full-batch training: every epoch uses the whole training set at once.
num_epochs = 1000
# Report roughly ten times; max() keeps the interval non-zero when
# num_epochs is lowered below 10 (otherwise the modulo divides by zero).
log_every = max(1, num_epochs // 10)

model2d_std.train()  # the mode never changes inside the loop, so set it once
for epoch in range(num_epochs):
    outputs = model2d_std(Xs_train)
    loss = loss_fn(outputs, Ys_train)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % log_every == 0:
        print(epoch, loss.item())

print("Training complete")

In [None]:
# Evaluate on the held-out trajectories; no gradients are needed here.
with torch.no_grad():
    output = model2d_std(Xs_test)
    loss = loss_fn(input=output, target=Ys_test)
print('Testing loss', loss)

## Limits of RNN networks

Since an RNN may involve long sequences of chained operations, the information contained in the first inputs can be forgotten.

Let's see an example.

In [None]:
SEQ_LENGTH = 100
Xs = torch.randn(1200, SEQ_LENGTH, 1)
# Target: the sum of the fifth-from-last and the last element of each
# sequence, so only recent inputs matter.
Ys = Xs[:, -1] + Xs[:, -5]

n_train = 1000
Xs_train, Xs_test = Xs[:n_train], Xs[n_train:]
Ys_train, Ys_test = Ys[:n_train], Ys[n_train:]
Xs_train.shape, Ys_train.shape, Xs_test.shape, Ys_test.shape

In [None]:
# NOTE(review): the hidden size here equals SEQ_LENGTH (100 units), while the
# later "early value" experiment uses 10 — confirm this is intentional.
model_add = Model2DStd(input_size=1, hidden_size=SEQ_LENGTH, output_size=1)
# Total number of scalar parameters in the model.
sum(p.numel() for p in model_add.parameters())

In [None]:
# Check the output shape for a batch of 10 sequences.
model_add(Xs_train[:10]).shape

In [None]:
# Use the first GPU when CUDA is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Move the model and every tensor it will see to the selected device.
model_device = model_add.to(device)
Xs_train_device = Xs_train.to(device)
Ys_train_device = Ys_train.to(device)
Xs_test_device = Xs_test.to(device)
Ys_test_device = Ys_test.to(device)

optimizer = optim.Adam(model_device.parameters(), lr=0.01)
loss_fn = nn.MSELoss()

num_epochs = 100
# Report roughly ten times; max() keeps the interval non-zero when
# num_epochs is lowered below 10 (otherwise the modulo divides by zero).
log_every = max(1, num_epochs // 10)

model_device.train()  # the mode never changes inside the loop, so set it once
for epoch in range(num_epochs):
    outputs = model_device(Xs_train_device)
    loss = loss_fn(outputs, Ys_train_device)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % log_every == 0:
        # Track the test loss alongside the training loss, without gradients.
        with torch.no_grad():
            output_tst = model_device(Xs_test_device)
            loss_test = loss_fn(output_tst, Ys_test_device)

        print(epoch, "Loss train:", loss.item(), "Loss test:", loss_test.item())

print("Training complete")

Let's see what happens now when the model needs to remember a very early value.

In [None]:
SEQ_LENGTH = 100
Xs = torch.randn(1200, SEQ_LENGTH, 1)
# Target: the sum of the very first and the last element of each sequence,
# so the first input has to be remembered across the whole sequence.
Ys = Xs[:, -1] + Xs[:, 0]

n_train = 1000
Xs_train, Xs_test = Xs[:n_train], Xs[n_train:]
Ys_train, Ys_test = Ys[:n_train], Ys[n_train:]

# One scalar feature per step, 10 hidden units, one scalar output.
model_add = Model2DStd(1, 10, 1)

# Move the model and every tensor it will see to the selected device.
model_device = model_add.to(device)
Xs_train_device = Xs_train.to(device)
Ys_train_device = Ys_train.to(device)
Xs_test_device = Xs_test.to(device)
Ys_test_device = Ys_test.to(device)

optimizer = optim.Adam(model_device.parameters(), lr=0.01)
loss_fn = nn.MSELoss()

num_epochs = 1000
# Report roughly ten times; max() keeps the interval non-zero when
# num_epochs is lowered below 10 (otherwise the modulo divides by zero).
log_every = max(1, num_epochs // 10)

model_device.train()  # the mode never changes inside the loop, so set it once
for epoch in range(num_epochs):
    outputs = model_device(Xs_train_device)
    loss = loss_fn(outputs, Ys_train_device)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % log_every == 0:
        # Track the test loss alongside the training loss, without gradients.
        with torch.no_grad():
            output_tst = model_device(Xs_test_device)
            loss_test = loss_fn(output_tst, Ys_test_device)

        print(epoch, "Loss train:", loss.item(), "Loss test:", loss_test.item())

print("Training complete")

Other types of recurrent networks were created to alleviate this problem:
- Gated Recurrent Unit (GRU)
- Long Short-Term Memory (LSTM)