# Count 1s in a sequence using LSTM and GRU cells

In this example, we'll use PyTorch to implement LSTM and GRU cells (but we won't use the default implementations). We'll use them to count the number of 1s in a binary sequence of 0s and 1s.

Let's start with the imports:

In [2]:
import math
import numpy as np
import torch
import typing

Next, we'll define some parameters of the networks and the training process:

In [12]:
# --- data ---
SEQUENCE_LENGTH = 20  # number of digits in every binary sequence
TRAINING_SAMPLES = 10000  # how many sequences the training set holds
TEST_SAMPLES = 1000  # how many sequences the test set holds

# --- optimisation ---
EPOCHS = 10  # full passes over the training set
BATCH_SIZE = 16  # sequences per mini-batch

# --- network ---
HIDDEN_UNITS = 20  # size of the recurrent state (shared by the LSTM and GRU runs)

Next, let's implement a basic `LSTMCell` as a subclass of `torch.nn.Module`. The cell implementation processes only a single element of the sequence. Later, we'll include it in a larger recurrent module for processing whole sequences.

In [3]:
class LSTMCell(torch.nn.Module):
    """Hand-written LSTM cell that advances the recurrence by one time step.

    The four gate pre-activations (input, forget, candidate, output) are
    produced together by two linear layers, one applied to the current input
    and one applied to the previous hidden state.
    """

    def __init__(self, input_size: int, hidden_size: int):
        """
        :param input_size: input vector size
        :param hidden_size: cell state vector size
        """

        super(LSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # combine all gates in a single matrix multiplication
        self.x_fc = torch.nn.Linear(input_size, 4 * hidden_size)
        self.h_fc = torch.nn.Linear(hidden_size, 4 * hidden_size)

        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw every weight and bias from U(-b, b), b = sqrt(3 / hidden_size).

        This is a Xavier-style uniform bound that uses ``hidden_size`` as both
        fan-in and fan-out; it is not the per-layer Xavier formula.
        """
        bound = math.sqrt(3.0 / self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-bound, bound)

    def forward(self,
                x_t: torch.Tensor,
                hidden: typing.Tuple[typing.Optional[torch.Tensor],
                                     typing.Optional[torch.Tensor]] = (None, None)) \
            -> typing.Tuple[torch.Tensor, torch.Tensor]:
        """Run a single LSTM step.

        :param x_t: input at the current step; reshaped to
            ``(-1, x_t.size(1))`` before use
        :param hidden: ``(h_{t-1}, c_{t-1})``; either entry (or the whole
            argument) may be ``None``, in which case that state starts at zero
        :return: ``(h_t, c_t)``, the new hidden state and the new cell state
        """
        # t_1 is equivalent to t-1
        h_t_1, c_t_1 = (None, None) if hidden is None else hidden

        # in case of more than 2-dimensional input
        # flatten the tensor (similar to numpy.reshape)
        x_t = x_t.view(-1, x_t.size(1))

        # A missing state means "start of sequence": use zeros that match the
        # input's batch size, dtype and device.
        if h_t_1 is None:
            h_t_1 = x_t.new_zeros(x_t.size(0), self.hidden_size)
        if c_t_1 is None:
            c_t_1 = x_t.new_zeros(x_t.size(0), self.hidden_size)

        h_t_1 = h_t_1.view(-1, h_t_1.size(1))
        c_t_1 = c_t_1.view(-1, c_t_1.size(1))

        # compute the pre-activations of all gates simultaneously
        gates = self.x_fc(x_t) + self.h_fc(h_t_1)

        # split the result into the 4 separate gates
        i_t, f_t, candidate_c_t, o_t = gates.chunk(4, 1)

        # compute the activations for all gates
        i_t, f_t, candidate_c_t, o_t = \
            i_t.sigmoid(), f_t.sigmoid(), candidate_c_t.tanh(), o_t.sigmoid()

        # choose new state based on the input and forget gates
        c_t = torch.mul(f_t, c_t_1) + torch.mul(i_t, candidate_c_t)

        # compute the cell output
        h_t = torch.mul(o_t, c_t.tanh())

        return h_t, c_t

Next, let's implement `LSTMModel`, which contains one `LSTMCell`, but handles a whole input sequence. The model feeds the sequence to the cell one element at a time, carrying the hidden and cell states from step to step, and returns a single prediction computed from the hidden state after the last step:

In [5]:
class LSTMModel(torch.nn.Module):
    """Sequence model: one `LSTMCell` unrolled over time plus a linear read-out."""

    def __init__(self, input_size, hidden_size, output_size):
        """
        :param input_size: size of the input vector at each step
        :param hidden_size: size of the recurrent state
        :param output_size: size of the prediction produced by the read-out layer
        """
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size

        # Recurrent part: the hand-written LSTM cell
        self.lstm = LSTMCell(input_size, hidden_size)

        # Read-out: maps the last hidden state to the prediction
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Consume `x` step by step along dimension 1 and predict from the last hidden state."""
        batch_size = x.size(0)

        # (hidden state, cell state), both all-zero before the first step
        state = tuple(
            torch.zeros((batch_size, self.hidden_size)).to(x.device)
            for _ in range(2))

        # Unroll the cell over the sequence dimension for the whole mini-batch
        for step in range(x.size(1)):
            state = self.lstm(x[:, step, :], state)

        last_hidden, _ = state

        # Final output layer
        return self.fc(last_hidden)

We'll follow the same blueprint to implement `GRUCell` and `GRUModel` respectively. Let's start with the `GRUCell` implementation:

In [6]:
class GRUCell(torch.nn.Module):
    """Hand-written GRU cell that advances the recurrence by one time step.

    Each gate has its own pair of linear layers: one applied to the current
    input and one applied to the previous state.
    """

    def __init__(self, input_size: int, hidden_size: int):
        """
        :param input_size: input vector size
        :param hidden_size: cell state vector size
        """

        super(GRUCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # x to reset gate r
        self.x_r_fc = torch.nn.Linear(input_size, hidden_size)

        # x to update gate z
        self.x_z_fc = torch.nn.Linear(input_size, hidden_size)

        # x to candidate state h'(t)
        self.x_h_fc = torch.nn.Linear(input_size, hidden_size)

        # network output/state h(t-1) to reset gate r
        self.h_r_fc = torch.nn.Linear(hidden_size, hidden_size)

        # network output/state h(t-1) to update gate z
        self.h_z_fc = torch.nn.Linear(hidden_size, hidden_size)

        # network state h(t-1) passed through the reset gate r towards candidate state h'(t)
        self.hr_h_fc = torch.nn.Linear(hidden_size, hidden_size)

    def forward(self,
                x_t: torch.Tensor,
                h_t_1: typing.Optional[torch.Tensor] = None) \
            -> torch.Tensor:
        """Run a single GRU step.

        :param x_t: input at the current step; its last dimension is fed to
            the input-side linear layers
        :param h_t_1: previous state ``h(t-1)``; ``None`` means the sequence is
            starting and an all-zero state is used
        :return: the new state ``h(t)``
        """
        # A missing state means "start of sequence": use zeros that match the
        # input's leading dimensions, dtype and device.
        if h_t_1 is None:
            h_t_1 = x_t.new_zeros((*x_t.shape[:-1], self.hidden_size))

        # compute update gate vector
        z_t = torch.sigmoid(self.x_z_fc(x_t) + self.h_z_fc(h_t_1))

        # compute reset gate vector
        r_t = torch.sigmoid(self.x_r_fc(x_t) + self.h_r_fc(h_t_1))

        # compute candidate state (the reset gate scales how much of h(t-1) it sees)
        candidate_h_t = torch.tanh(self.x_h_fc(x_t) + self.hr_h_fc(torch.mul(r_t, h_t_1)))

        # compute cell output: z keeps the old state, (1 - z) lets the candidate in
        h_t = torch.mul(z_t, h_t_1) + torch.mul(1 - z_t, candidate_h_t)

        return h_t

We'll continue with the `GRUModel` class for processing whole sequences:

In [7]:
class GRUModel(torch.nn.Module):
    """GRU model with a single output layer connected to the GRU cell output"""

    def __init__(self, input_size, hidden_size, output_size):
        """
        :param input_size: size of the input vector at each step
        :param hidden_size: size of the recurrent state
        :param output_size: size of the prediction produced by the output layer
        """
        super(GRUModel, self).__init__()
        self.hidden_size = hidden_size

        # Our own GRU implementation
        self.gru = GRUCell(input_size, hidden_size)

        # Fully-connected output layer
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Consume `x` step by step along dimension 1 and predict from the last state."""
        # A GRU carries a single state vector; start it at zero, allocated
        # directly on the input's device
        h_t = torch.zeros((x.size(0), self.hidden_size), device=x.device)

        # Iterate over all sequence elements across all sequences of the mini-batch
        for seq in range(x.size(1)):
            h_t = self.gru(x[:, seq, :], h_t)

        # Final output layer
        return self.fc(h_t)

Next, we'll implement the `generate_dataset` function, which generates a total of `samples` binary sequences, each with a length of `sequence_length`. The function returns a `TensorDataset` that pairs each sequence with its numeric label, which indicates the number of 1s in that sequence:

In [8]:
def generate_dataset(sequence_length: int, samples: int):
    """
    Generate training/testing datasets

    Every sample is a random binary sequence; its label is the number of 1s
    it contains. For each sample a probability for the digit 0 is first drawn
    from {0/n, 1/n, ..., (n-1)/n} (n = sequence_length), so the labels are
    spread over the possible counts.

    :param sequence_length: length of the binary sequence
    :param samples: number of samples
    :return: `TensorDataset` of float sequences with shape
        (samples, sequence_length, 1) and float labels with shape (samples,)
    """

    sequences = []
    labels = []
    for _ in range(samples):
        # probability of drawing a 0 for this particular sequence
        a = np.random.randint(sequence_length) / sequence_length
        sequence = np.random.choice(2, sequence_length, p=[a, 1 - a])
        sequences.append(sequence)
        labels.append(int(sequence.sum()))

    # The explicit reshape keeps the (samples, sequence_length) layout even
    # when `samples` is 0.
    features = np.array(sequences, dtype=np.int64).reshape(samples, sequence_length)

    # int64 rather than int8: a count can be as large as sequence_length,
    # which does not fit in int8 once sequence_length exceeds 127.
    targets = np.array(labels, dtype=np.int64)

    return torch.utils.data.TensorDataset(
        torch.from_numpy(features).float().unsqueeze(-1),
        torch.from_numpy(targets).float())

We'll continue with the implementation of the training procedure for either `LSTMModel` or `GRUModel`. This procedure is generic and doesn't differ from similar procedures for feed-forward networks. The recurrence part is handled by PyTorch's _autodiff_ functionality within `LSTMModel` and `GRUModel`:

In [9]:
def train_model(model, loss_function, optimizer, data_loader):
    """Train `model` for one pass over `data_loader` and print the epoch statistics.

    A prediction counts as correct when, rounded to the nearest integer, it
    equals the label.

    :param model: module to train; batches are moved to the device of its
        first parameter (they are left where they are if it has none)
    :param loss_function: callable ``loss_function(outputs, labels)`` returning a scalar loss
    :param optimizer: optimizer that updates the model parameters
    :param data_loader: iterable of ``(inputs, labels)`` mini-batches exposing a ``dataset``
        attribute; assumes one label per sample, i.e. labels of shape (batch,)
    :return: ``(total_loss, total_acc)`` averaged over ``len(data_loader.dataset)``
    """
    # set model to training mode
    model.train()

    # Follow the model's own device instead of relying on a module-level global
    first_param = next(model.parameters(), None)
    model_device = None if first_param is None else first_param.device

    current_loss = 0.0
    # A tensor accumulator, so `.double()` below works even when the loader
    # yields no batches
    current_acc = torch.zeros((), dtype=torch.int64, device=model_device)

    # iterate over the training data
    for inputs, labels in data_loader:
        # send the input/labels to the device the model lives on
        if model_device is not None:
            inputs = inputs.to(model_device)
            labels = labels.to(model_device)

        # zero the parameter gradients (the model call also clears gradients
        # of parameters the optimizer does not manage)
        model.zero_grad()
        optimizer.zero_grad()

        with torch.set_grad_enabled(True):
            # forward
            outputs = model(inputs)
            # Drop only the trailing output dimension: a bare squeeze() would
            # also remove the batch dimension of a single-sample batch
            if outputs.dim() > 1:
                outputs = outputs.squeeze(-1)
            loss = loss_function(outputs, labels)

            # backward
            loss.backward()
            optimizer.step()

        # statistics (the batch loss is weighted by the batch size)
        current_loss += loss.item() * inputs.size(0)
        current_acc = current_acc + torch.sum(outputs.round() == labels.data)

    total_loss = current_loss / len(data_loader.dataset)
    total_acc = current_acc.double() / len(data_loader.dataset)

    print('Train Loss: {:.4f}; Accuracy: {:.4f}'.format(total_loss, total_acc))

    return total_loss, total_acc

Then, we'll implement the testing procedure. As with the training, this is a generic procedure which is similar to the one for feed-forward networks, since the sequence processing is handled internally by `LSTMModel` and `GRUModel`:

In [10]:
def test_model(model, loss_function, data_loader):
    # set model in evaluation mode
    model.eval()

    current_loss = 0.0
    current_acc = 0

    # iterate over  the validation data
    for i, (inputs, labels) in enumerate(data_loader):
        # send the input/labels to the GPU
        inputs = inputs.to(device)
        labels = labels.to(device)

        # forward
        with torch.set_grad_enabled(False):
            outputs = model(inputs).squeeze()
            loss = loss_function(outputs, labels)

        # statistics
        current_loss += loss.item() * inputs.size(0)
        current_acc += torch.sum(outputs.round() == labels.data)

    total_loss = current_loss / len(data_loader.dataset)
    total_acc = current_acc.double() / len(data_loader.dataset)

    print('Test Loss: {:.4f}; Accuracy: {:.4f}'.format(total_loss, total_acc))

    return total_loss, total_acc

We can now put it all together. We'll start by instantiating the `device`, the `train_loader`, and the `test_loader`:

In [13]:
# Run on the first CUDA device when one is present, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Training split, served in shuffled mini-batches
train = generate_dataset(sequence_length=SEQUENCE_LENGTH, samples=TRAINING_SAMPLES)
train_loader = torch.utils.data.DataLoader(dataset=train,
                                           batch_size=BATCH_SIZE,
                                           shuffle=True)

# Test split, generated independently of the training split
test = generate_dataset(sequence_length=SEQUENCE_LENGTH, samples=TEST_SAMPLES)
test_loader = torch.utils.data.DataLoader(dataset=test,
                                          batch_size=BATCH_SIZE,
                                          shuffle=True)

Next, let's instantiate an `LSTMModel`:

In [14]:
# Build the LSTM regressor and move it to the selected device in one go:
#   input_size=1   -> every step feeds a single binary digit
#   hidden_size    -> HIDDEN_UNITS recurrent units
#   output_size=1  -> one regression output (the predicted number of ones)
model = LSTMModel(input_size=1, hidden_size=HIDDEN_UNITS, output_size=1).to(device)

Next, we'll instantiate the training framework components...

In [15]:
# Adam optimizer with its default hyperparameters, over all model parameters
optimizer = torch.optim.Adam(params=model.parameters())

# The task is a regression on the count, hence mean squared error
loss_function = torch.nn.MSELoss()

... and we'll run the training:

In [16]:
# One training pass followed by one evaluation pass per epoch;
# both helpers print their own loss/accuracy line.
for epoch in range(EPOCHS):
    # epochs are reported 1-based
    print('Epoch {}/{}'.format(epoch + 1, EPOCHS))

    train_model(model, loss_function, optimizer, train_loader)
    test_model(model, loss_function, test_loader)

Epoch 1/10
Train Loss: 59.7466; Accuracy: 0.0510
Test Loss: 19.6232; Accuracy: 0.0700
Epoch 2/10
Train Loss: 6.7320; Accuracy: 0.4826
Test Loss: 1.6951; Accuracy: 0.6920
Epoch 3/10
Train Loss: 0.8296; Accuracy: 0.8031
Test Loss: 0.2831; Accuracy: 0.8570
Epoch 4/10
Train Loss: 0.1729; Accuracy: 0.8917
Test Loss: 0.0736; Accuracy: 0.9290
Epoch 5/10
Train Loss: 0.0543; Accuracy: 0.9333
Test Loss: 0.0257; Accuracy: 1.0000
Epoch 6/10
Train Loss: 0.0272; Accuracy: 0.9986
Test Loss: 0.0124; Accuracy: 1.0000
Epoch 7/10
Train Loss: 0.0155; Accuracy: 0.9998
Test Loss: 0.0125; Accuracy: 0.9980
Epoch 8/10
Train Loss: 0.0117; Accuracy: 0.9999
Test Loss: 0.0102; Accuracy: 1.0000
Epoch 9/10
Train Loss: 0.0101; Accuracy: 0.9998
Test Loss: 0.0052; Accuracy: 1.0000
Epoch 10/10
Train Loss: 0.0077; Accuracy: 1.0000
Test Loss: 0.0041; Accuracy: 1.0000


Unlike a regular RNN, the LSTM network doesn't appear to suffer from exploding gradients, even with a sequence length of 20. <br />
Let's do the same experiment with `GRUModel`:

In [17]:
# Same experiment as before, this time with the GRU regressor
# (created and moved to the selected device in one go)
model = GRUModel(input_size=1, hidden_size=HIDDEN_UNITS, output_size=1).to(device)

# Fresh Adam optimizer bound to the new model's parameters
optimizer = torch.optim.Adam(params=model.parameters())

# Regression on the count, hence mean squared error
loss_function = torch.nn.MSELoss()

# One training pass and one evaluation pass per epoch
for epoch in range(EPOCHS):
    print('Epoch {}/{}'.format(epoch + 1, EPOCHS))

    train_model(model, loss_function, optimizer, train_loader)
    test_model(model, loss_function, test_loader)


Epoch 1/10
Train Loss: 55.0390; Accuracy: 0.0703
Test Loss: 16.3110; Accuracy: 0.2040
Epoch 2/10
Train Loss: 7.0288; Accuracy: 0.5343
Test Loss: 2.1039; Accuracy: 0.7260
Epoch 3/10
Train Loss: 1.0462; Accuracy: 0.8011
Test Loss: 0.3727; Accuracy: 0.8660
Epoch 4/10
Train Loss: 0.2059; Accuracy: 0.8938
Test Loss: 0.0754; Accuracy: 0.9280
Epoch 5/10
Train Loss: 0.0504; Accuracy: 0.9327
Test Loss: 0.0237; Accuracy: 1.0000
Epoch 6/10
Train Loss: 0.0162; Accuracy: 0.9999
Test Loss: 0.0098; Accuracy: 1.0000
Epoch 7/10
Train Loss: 0.0062; Accuracy: 1.0000
Test Loss: 0.0045; Accuracy: 1.0000
Epoch 8/10
Train Loss: 0.0032; Accuracy: 1.0000
Test Loss: 0.0039; Accuracy: 1.0000
Epoch 9/10
Train Loss: 0.0022; Accuracy: 1.0000
Test Loss: 0.0018; Accuracy: 1.0000
Epoch 10/10
Train Loss: 0.0018; Accuracy: 1.0000
Test Loss: 0.0017; Accuracy: 1.0000


The `GRUModel` achieved 100% accuracy even sooner than the `LSTMModel`. However, this is a toy dataset and we cannot use it as a comparison for the real-world performance of the 2 models.