In [25]:
import torch
import torch.nn as nn
import torch.optim as optim

import os
import json
import time
import random
import numpy as np
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix, f1_score, classification_report
from sklearn.metrics import accuracy_score, precision_score, recall_score

from IPython.display import Image

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
seed = 42

## Double-copy task dataset

The entire dataset comprises of the binary representation of all numbers uptil a range defined. The binary sequence from left to right (most significant to least significant) is the input. The target is just the reverse sequence.

In [26]:
import copy

# Generating data
state_size = 12
data_x = []
for i in range(pow(2, state_size)):
    data_x.append([int(x) for x in list(np.binary_repr(i, width=state_size))])
data_x = np.array(data_x)
data_x.shape

(4096, 12)

In [27]:
# Reshaping for tensors
data_x = np.transpose(data_x).reshape(state_size, pow(2, state_size), 1)
data_x = torch.from_numpy(data_x).float()
data_x = torch.zeros(data_x.shape[0], data_x.shape[1], 2).scatter_(2, data_x.long(), 1)
data_y = data_x.clone()
data_x.shape, data_y.shape

(torch.Size([12, 4096, 2]), torch.Size([12, 4096, 2]))

In [28]:
# Doubling the target sequence
data_y = torch.cat((data_y, data_y), dim=0)

In [29]:
# Creating training and test sets
train_size = 0.75
ordering = torch.randperm(pow(2, state_size))
data_x = data_x[:, ordering, :]
data_y = data_y[:, ordering, :]
train_x = data_x[:,:int(train_size * len(ordering)),:]
train_y = data_y[:,:int(train_size * len(ordering)),:]
test_x = data_x[:,int(train_size * len(ordering)):,:]
test_y = data_y[:,int(train_size * len(ordering)):,:]

# Creating training and validation sets
## TODO

print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)

torch.Size([12, 3072, 2]) torch.Size([24, 3072, 2]) torch.Size([12, 1024, 2]) torch.Size([24, 1024, 2])


## Modelling

In [30]:
# Input dim
input_dim = 2
# Number of hidden nodes
hidden_dim = 16
# Number of output nodes
output_dim = 2
# Number of LSTMs cells to be stacked
layers = 1
# Boolean value for bidirectioanl or not
bidirectional = True
# Boolean value to use LayerNorm or not
layernorm = False

batch_size = 8
# Percentage of training data
learning_rate = 0.001
epochs = 100

device = "cuda"

In [31]:
def train(model, train_x, train_y, test_x, test_y, epochs, loss_fn, optimizer, teacher_forcing=0.5):
    train_size = train_x.shape[1]
    device = torch.device("cpu")
    if train_x.is_cuda:
        device = torch.device("cuda")
    layers = model.layers
    hidden_dim = model.hidden_dim
    for i in range(1, epochs + 1):
        model.train()
        loss_tracker = []
        ordering = torch.randperm(train_size)
        train_x = train_x[:,ordering,:]
        train_y = train_y[:,ordering,:]
        for j in range(int(float(train_size)/batch_size) + 1):
            optimizer.zero_grad()
            start = j*batch_size
            end = min((j+1)*batch_size, train_size)
            batch = end - start
            if batch is 0:
                continue
            if model.bidirectional:
                hidden_state = torch.zeros(2 * layers, batch, hidden_dim).to(device)
                cell_state = torch.zeros(2 * layers, batch, hidden_dim).to(device)
            else:
                hidden_state = torch.zeros(layers, batch, hidden_dim).to(device)
                cell_state = torch.zeros(layers, batch, hidden_dim).to(device)
            o = model(train_x[:,start:end,:], train_y[:,start:end,:], hidden_state, 
                      cell_state, teacher_forcing)
            gt = torch.argmax(train_y[:,start:end,:], 2, keepdim=True).view(-1)
            loss = loss_fn(o.view(-1, 2), gt)
            loss_tracker.append(loss.item())
            loss.backward()
            optimizer.step()
            print("Epoch #{:<3d}: Batch {:>3d}/{:<3d} -- "
                  "Loss: {:2.5}".format(i, j+1, int(train_size/batch_size), 
                                        loss_tracker[-1]), end='\r')
        print()
        f1_train = evaluate(model, train_x, train_y)
        f1_test = evaluate(model, test_x, test_y)
        print("Average Loss: {:2.6}".format(np.mean(loss_tracker)))
        print("Training F1: {:3.4}".format(f1_train))
        print("Test F1: {:3.4}".format(f1_test))
        print("=" * 50)
    
    return model


def evaluate(model, x, y):
    model.eval()
    test_size = x.shape[1]
    device = torch.device("cpu")
    if x.is_cuda:
        device = torch.device("cuda")
    layers = model.layers
    hidden_dim = model.hidden_dim
    labels = []
    preds = []
    for j in range(int(test_size/batch_size) + 1):
        optimizer.zero_grad()
        start = j*batch_size
        end = min((j+1)*batch_size, test_size)
        batch = end - start
        if batch == 0:
            continue
        if model.bidirectional:
            hidden_state = torch.zeros(2 * layers, batch, hidden_dim).to(device)
            cell_state = torch.zeros(2 * layers, batch, hidden_dim).to(device)
        else:
            hidden_state = torch.zeros(layers, batch, hidden_dim).to(device)
            cell_state = torch.zeros(layers, batch, hidden_dim).to(device)
        with torch.no_grad():
            o = model(x[:,start:end,:], y[:,start:end,:], hidden_state, cell_state, teacher_forcing=0)
        pred = torch.argmax(o, 2, keepdim=True).view(-1).cpu().detach().numpy()
        preds.extend(pred)
        label = torch.argmax(y[:,start:end,:], 2, 
                             keepdim=True).view(-1).cpu().detach().numpy()
        labels.extend(label)
    return f1_score(labels, preds)

## Our implementation

In [77]:
from lstm import LSTM

class LSTMSeq2SeqDifferent(nn.Module):
    """ LSTM Class for Sequence Labelling (many-to-many-different)

    The class creates the LSTM architecture as specified by the parameters.
    A fully connected layer is added to reduce the last hidden state to output_dim.

    Parameters
    ==========
    vocab_len: int from imdb dataset
    embed_dim: dimensions of the embeddings
    hidden_dim: number of hidden nodes required
    output_dim: numer of output nodes required (1 for sentiment analysis)
    pretrained_vec: weights from imdb object
    layers: number of LSTM cells to be stacked for depth
    bidirectional: boolean
    layernorm: boolean

    """
    def __init__(self, input_dim, hidden_dim, output_dim, layers=1,
                 bidirectional=False, layernorm=False):
        super().__init__()

        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.layers = layers
        self.bidirectional = bidirectional
        self.layernorm = layernorm

        self.encoder = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, layers=layers,
                         bidirectional=bidirectional, layernorm=layernorm)
        if self.bidirectional:
            self.decoder = LSTM(input_dim=output_dim, hidden_dim=2 * hidden_dim, layers=layers,
                                bidirectional=False, layernorm=layernorm)
            self.fc = nn.Linear(2 * hidden_dim, output_dim)
        else:
            self.decoder = LSTM(input_dim=output_dim, hidden_dim=hidden_dim, layers=layers,
                                bidirectional=False, layernorm=layernorm)
            self.fc = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x, target, hidden_state, cell_state, teacher_forcing=0.5):
        device = 'cpu'
        if x.is_cuda:
            device = 'cuda'
        # encoding
        _, (hidden_state, cell_state) = self.encoder(x, hidden_state, cell_state)
        batch_size = x.shape[1]
        timesteps = target.shape[0]
        x = torch.zeros(1, batch_size, self.output_dim).to(device)
        output = torch.tensor([]).to(device)
        if self.bidirectional:
            # concatenating hidden states from two directions
            hidden_state = torch.cat((hidden_state[:self.layers,:,:], 
                                      hidden_state[self.layers:,:,:]), dim=2)
            cell_state = torch.cat((cell_state[:self.layers,:,:], 
                                    cell_state[self.layers:,:,:]), dim=2)
        # decoding
        for t in range(timesteps):           
            x, (hidden_state, cell_state) = self.decoder(x, hidden_state, cell_state)            
            x = self.softmax(self.fc(x))
            output = torch.cat((output, x), dim=0)
            choice = random.random() 
            if choice < teacher_forcing:
                x = target[t].float().to(device)
                x = x.unsqueeze(0)
            else:
                # converting x to a one-hot encoding
                x = torch.zeros(x.shape).to(device).scatter_(2, torch.argmax(x, -1, keepdim=True), 1)
        return output

    def save(self, file_path='./model.pkl'):
        torch.save(self.state_dict(), file_path)

    def load(self, file_path):
        self.load_state_dict(torch.load(file_path))

    def count_parameters(self):
        tot_sum = sum(p.numel() for p in self.encoder.parameters() if p.requires_grad)
        tot_sum += sum(p.numel() for p in self.decoder.parameters() if p.requires_grad)
        tot_sum += sum(p.numel() for p in self.fc.parameters() if p.requires_grad)
        return tot_sum


In [78]:
our = LSTMSeq2SeqDifferent(input_dim, hidden_dim, output_dim, bidirectional=bidirectional, layers=layers).to(device)
print(our.count_parameters())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(our.parameters(), lr=learning_rate)

6978


In [79]:
train_x = train_x.to(device)
train_y = train_y.to(device)
test_x = test_x.to(device)
test_y = test_y.to(device)

our = train(our, train_x, train_y, test_x, test_y, epochs=30, loss_fn=loss_fn, optimizer=optimizer, teacher_forcing=0.5)

Epoch #1  : Batch 384/384 -- Loss: 0.63436
Average Loss: 0.662022
Training F1: 0.6174
Test F1: 0.613
Epoch #2  : Batch 384/384 -- Loss: 0.59385
Average Loss: 0.608296
Training F1: 0.6777
Test F1: 0.673
Epoch #3  : Batch 384/384 -- Loss: 0.58093
Average Loss: 0.572714
Training F1: 0.7093
Test F1: 0.7006
Epoch #4  : Batch 384/384 -- Loss: 0.56853
Average Loss: 0.560965
Training F1: 0.711
Test F1: 0.7035
Epoch #5  : Batch 384/384 -- Loss: 0.52903
Average Loss: 0.550854
Training F1: 0.7194
Test F1: 0.7151
Epoch #6  : Batch 384/384 -- Loss: 0.51697
Average Loss: 0.538282
Training F1: 0.7135
Test F1: 0.7054
Epoch #7  : Batch 384/384 -- Loss: 0.53981
Average Loss: 0.529668
Training F1: 0.7132
Test F1: 0.7079
Epoch #8  : Batch 384/384 -- Loss: 0.48811
Average Loss: 0.520341
Training F1: 0.7331
Test F1: 0.7306
Epoch #9  : Batch 384/384 -- Loss: 0.49353
Average Loss: 0.504197
Training F1: 0.7712
Test F1: 0.7704
Epoch #10 : Batch 384/384 -- Loss: 0.46452
Average Loss: 0.482569
Training F1: 0.7874

## PyTorch implementation

In [80]:
class PyTorchBaseline(nn.Module):
    """ LSTM Class for Sequence Labelling (many-to-many-different)

    The class creates the LSTM architecture as specified by the parameters.
    A fully connected layer is added to reduce the last hidden state to output_dim.

    Parameters
    ==========
    vocab_len: int from imdb dataset
    embed_dim: dimensions of the embeddings
    hidden_dim: number of hidden nodes required
    output_dim: numer of output nodes required (1 for sentiment analysis)
    pretrained_vec: weights from imdb object
    layers: number of LSTM cells to be stacked for depth
    bidirectional: boolean
    layernorm: boolean

    """
    def __init__(self, input_dim, hidden_dim, output_dim, layers=1,
                 bidirectional=False, layernorm=False):
        super().__init__()

        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.layers = layers
        self.bidirectional = bidirectional
        self.layernorm = layernorm

        self.encoder = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, num_layers=layers,
                         bidirectional=bidirectional) #, layernorm=layernorm)
        if self.bidirectional:
            self.decoder = nn.LSTM(input_size=output_dim, hidden_size=2 * hidden_dim, num_layers=layers,
                                bidirectional=False) #, layernorm=layernorm)
            self.fc = nn.Linear(2 * hidden_dim, output_dim)
        else:
            self.decoder = nn.LSTM(input_size=output_dim, hidden_size=hidden_dim, num_layers=layers,
                                bidirectional=False) #, layernorm=layernorm)
            self.fc = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x, target, hidden_state, cell_state, teacher_forcing=0.5):
        device = 'cpu'
        if x.is_cuda:
            device = 'cuda'
        # encoding
        _, (hidden_state, cell_state) = self.encoder(x, (hidden_state, cell_state))
        batch_size = x.shape[1]
        timesteps = target.shape[0]
        x = torch.zeros(1, batch_size, self.output_dim).to(device)
        output = torch.tensor([]).to(device)
        if self.bidirectional:
            # concatenating hidden states from two directions
            hidden_state = torch.cat((hidden_state[:self.layers,:,:], 
                                      hidden_state[self.layers:,:,:]), dim=2)
            cell_state = torch.cat((cell_state[:self.layers,:,:], 
                                    cell_state[self.layers:,:,:]), dim=2)
        # decoding
        for t in range(timesteps):           
            x, (hidden_state, cell_state) = self.decoder(x, (hidden_state, cell_state))
            x = self.softmax(self.fc(x))
            output = torch.cat((output, x), dim=0)
            choice = random.random() 
            if choice < teacher_forcing:
                x = target[t].float().to(device)
                x = x.unsqueeze(0)
            else:
                # converting x to a one-hot encoding
                x = torch.zeros(x.shape).to(device).scatter_(2, torch.argmax(x, -1, keepdim=True), 1)
        return output

    def save(self, file_path='./model.pkl'):
        torch.save(self.state_dict(), file_path)

    def load(self, file_path):
        self.load_state_dict(torch.load(file_path))

    def count_parameters(self):
        tot_sum = sum(p.numel() for p in self.encoder.parameters() if p.requires_grad)
        tot_sum += sum(p.numel() for p in self.decoder.parameters() if p.requires_grad)
        tot_sum += sum(p.numel() for p in self.fc.parameters() if p.requires_grad)
        return tot_sum


In [81]:
pytorch = PyTorchBaseline(input_dim, hidden_dim, output_dim, layers, bidirectional).to(device)
print(pytorch.count_parameters())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(pytorch.parameters(), lr=learning_rate)

7234


In [82]:
train_x = train_x.to(device)
train_y = train_y.to(device)
test_x = test_x.to(device)
test_y = test_y.to(device)

pytorch = train(pytorch, train_x, train_y, test_x, test_y, epochs=30, loss_fn=loss_fn, optimizer=optimizer, teacher_forcing=0.5)

Epoch #1  : Batch 384/384 -- Loss: 0.58635
Average Loss: 0.662501
Training F1: 0.6114
Test F1: 0.5975
Epoch #2  : Batch 384/384 -- Loss: 0.55478
Average Loss: 0.615947
Training F1: 0.6668
Test F1: 0.6574
Epoch #3  : Batch 384/384 -- Loss: 0.56453
Average Loss: 0.579507
Training F1: 0.7062
Test F1: 0.7039
Epoch #4  : Batch 384/384 -- Loss: 0.53218
Average Loss: 0.567879
Training F1: 0.7181
Test F1: 0.7135
Epoch #5  : Batch 384/384 -- Loss: 0.56569
Average Loss: 0.560036
Training F1: 0.7255
Test F1: 0.7157
Epoch #6  : Batch 384/384 -- Loss: 0.55177
Average Loss: 0.55057
Training F1: 0.7182
Test F1: 0.7124
Epoch #7  : Batch 384/384 -- Loss: 0.55446
Average Loss: 0.541844
Training F1: 0.7208
Test F1: 0.7135
Epoch #8  : Batch 384/384 -- Loss: 0.54068
Average Loss: 0.535399
Training F1: 0.7325
Test F1: 0.7259
Epoch #9  : Batch 384/384 -- Loss: 0.53794
Average Loss: 0.527226
Training F1: 0.7458
Test F1: 0.7413
Epoch #10 : Batch 384/384 -- Loss: 0.55906
Average Loss: 0.519745
Training F1: 0.74

In [63]:
print("Our implementation\n{}".format("=" * len("Our implementation")))
print("# of parameters: {}".format(our.count_parameters()))
for name, param in our.named_parameters():
    print("{:<25}: {}".format(name, param.shape))

Our implementation
# of parameters: 6978
encoder.model.0.weights  : torch.Size([18, 64])
encoder.model.0.bias     : torch.Size([64])
encoder.model_rev.0.weights: torch.Size([18, 64])
encoder.model_rev.0.bias : torch.Size([64])
decoder.model.0.weights  : torch.Size([34, 128])
decoder.model.0.bias     : torch.Size([128])
fc.weight                : torch.Size([2, 32])
fc.bias                  : torch.Size([2])


In [64]:
print("PyTorch implementation\n{}".format("=" * len("PyTorch implementation")))
print("# of parameters: {}".format(pytorch.count_parameters()))
for name, param in pytorch.named_parameters():
    print("{:<30}: {}".format(name, param.shape))

PyTorch implementation
# of parameters: 7234
encoder.weight_ih_l0          : torch.Size([64, 2])
encoder.weight_hh_l0          : torch.Size([64, 16])
encoder.bias_ih_l0            : torch.Size([64])
encoder.bias_hh_l0            : torch.Size([64])
encoder.weight_ih_l0_reverse  : torch.Size([64, 2])
encoder.weight_hh_l0_reverse  : torch.Size([64, 16])
encoder.bias_ih_l0_reverse    : torch.Size([64])
encoder.bias_hh_l0_reverse    : torch.Size([64])
decoder.weight_ih_l0          : torch.Size([128, 2])
decoder.weight_hh_l0          : torch.Size([128, 32])
decoder.bias_ih_l0            : torch.Size([128])
decoder.bias_hh_l0            : torch.Size([128])
fc.weight                     : torch.Size([2, 32])
fc.bias                       : torch.Size([2])


PyTorch uses $Wh + b_h + Wx + b_x$ whereas we are using $Wx' + b$, where $x'$ is $h, x$ concatenated. Therefore PyTorch has an extra set of biases for each direction for the encoder and also for the decoder.

For one direction - 64 <br>
For reverse direction - 64 <br>
For the decoder - 128 <br>

Our model has $6978$ parameters while the PyTorch model has $6978 + 64 + 64 + 128 = 7234$ parameters.