# Encoder-Decoder Architecture for seq2seq

Task: translate Roman numerals into Arabic numerals (e.g. `CXXXIII` → `133`).




In [39]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch import nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
import torch
import utils.neural_nets as net
from roman_arabic_numerals import conv

## Build the Dataset and the DataLoader

In [2]:
# Load the roman-numerals dataset. The preview below shows a `sequence` column
# (roman numerals left-padded with '_') and a `target` label column.
data = pd.read_csv('./data/roman-numbers/classification.csv')
data.head(2)

Unnamed: 0,sequence,target
0,___CXXXIII,odd
1,____CXCVII,odd


In [3]:
# Remove every '_' padding character to recover the bare roman numerals.
numbers = [x.replace('_', '') for x in data.sequence.values]
# Convert each roman numeral with conv.rom_arab and keep the result as a string,
# so that it can later be tokenised character by character.
arabic = [str(conv.rom_arab(n)) for n in numbers]
# Preview the first three <roman, arabic> pairs.
numbers[:3], arabic[:3]

(['CXXXIII', 'CXCVII', 'XCIX'], ['133', '197', '99'])

In [8]:
SOS_token = 0  # index reserved for the Start-of-String marker
EOS_token = 1  # index reserved for the End-of-String marker


class Lang:
    """Character-level vocabulary (for roman numbers, the letters).

    Maps every distinct character to an integer index and back, and counts
    how often each character has been seen. Indexes 0 and 1 are reserved for
    the SOS and EOS markers, so the first real character gets index 2.
    """

    def __init__(self, name):
        self.name = name
        # character -> index
        self.letter2index = {}
        # character -> number of occurrences seen so far
        self.letter2count = {}
        # index -> character (pre-populated with the two special markers)
        self.index2letter = {SOS_token: "SOS", EOS_token: "EOS"}
        # size of the vocabulary, special markers included
        self.n_letters = 2

    def add_word(self, word):
        """Register every character of `word`, in order."""
        for character in word:
            self.add_letter(character)

    def add_letter(self, letter):
        """Register one character: count it, indexing it first if it is new."""
        if letter in self.letter2index:
            # Already indexed: only the occurrence counter changes.
            self.letter2count[letter] += 1
            return

        new_index = self.n_letters
        self.letter2index[letter] = new_index
        self.letter2count[letter] = 1
        self.index2letter[new_index] = letter
        self.n_letters = new_index + 1

# Build one vocabulary per side: roman letters and arabic digits
roman_vocabulary = Lang(name='roman')
arabic_vocabulary = Lang(name='arabic')
for roman, arabic_number in zip(numbers, arabic):
    roman_vocabulary.add_word(roman)
    arabic_vocabulary.add_word(arabic_number)

In [31]:
# Define input and target: roman numerals are translated into arabic numbers
input_vocabulary, target_vocabulary = roman_vocabulary, arabic_vocabulary

def indexes_from_word(lang, word):
    """Encode `word` as the list of integer indexes `lang` assigns to its characters.

    Raises KeyError if a character is not part of `lang.letter2index`.
    """
    index_of = lang.letter2index.__getitem__
    return list(map(index_of, word))

def word_tensor(lang, word):
    """Encode `word` with `lang` as a (1, len(word) + 1) LongTensor ending in EOS_token."""
    token_ids = indexes_from_word(lang, word) + [EOS_token]
    # Add a leading batch dimension of size 1.
    return torch.tensor(token_ids, dtype=torch.long).unsqueeze(0)

def pair_tensors(pair):
    """Encode an <input, target> pair of strings as a tuple of two tensors.

    `pair[0]` is encoded with the input vocabulary and `pair[1]` with the
    target vocabulary.
    """
    return (
        word_tensor(input_vocabulary, pair[0]),
        word_tensor(target_vocabulary, pair[1]),
    )

def get_dataloader(batch_size):
    """
    Builds the Dataloader for the roman to arabic task.

    Every <roman, arabic> pair is encoded as two rows of token indexes. Each
    row is terminated by EOS_token and right-filled with zeros up to
    net.MAX_LENGTH columns.

    Args:
        batch_size: maximum number of pairs per mini-batch.

    Returns:
        A tuple (input vocabulary, target vocabulary, DataLoader). The
        DataLoader yields <input, target> pairs of LongTensors with
        net.MAX_LENGTH columns, sampled in random order.

    Raises:
        KeyError: if a character is missing from the matching vocabulary.
        ValueError: if an encoded sequence (EOS included) does not fit in
            net.MAX_LENGTH columns (raised by the NumPy row assignment).
    """
    pairs = list(zip(numbers, arabic))

    n = len(pairs)
    # NOTE: the fill value 0 is also SOS_token, so padding and SOS share an index.
    input_ids = np.zeros((n, net.MAX_LENGTH), dtype=np.int32)
    target_ids = np.zeros((n, net.MAX_LENGTH), dtype=np.int32)

    for idx, (inp, tgt) in enumerate(pairs):
        # Encode with the module-level vocabularies. The previous code read
        # `input_lang` / `output_lang`, which are only bound from the *result*
        # of this function and therefore do not exist on the first call.
        inp_ids = indexes_from_word(input_vocabulary, inp) + [EOS_token]
        tgt_ids = indexes_from_word(target_vocabulary, tgt) + [EOS_token]
        input_ids[idx, :len(inp_ids)] = inp_ids
        target_ids[idx, :len(tgt_ids)] = tgt_ids

    train_data = TensorDataset(torch.LongTensor(input_ids), torch.LongTensor(target_ids))
    train_sampler = RandomSampler(train_data)
    train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
    # Return the same vocabularies that were used for the encoding above.
    return input_vocabulary, target_vocabulary, train_dataloader

# Build the DataLoader with mini-batches of 4 pairs; the two returned vocabularies
# are bound to input_lang / output_lang for the cells below.
input_lang, output_lang, train_dataloader = get_dataloader(batch_size=4)

In [30]:
# Each mini-batch is a list of 2 tensors with one row per pair (4 rows here):
# - the first tensor holds the inputs, each row being the token indexes of a roman numeral
# - the second tensor holds the targets, each row being the token indexes of an arabic number
# Every row ends with EOS (1) and is then filled with 0 up to the padded length.
for x in train_dataloader:
    print(x)
    break

[tensor([[2, 6, 3, 3, 1, 0, 0, 0, 0, 0],
        [2, 2, 3, 3, 5, 1, 0, 0, 0, 0],
        [2, 3, 2, 5, 4, 4, 1, 0, 0, 0],
        [2, 2, 3, 4, 3, 1, 0, 0, 0, 0]]), tensor([[ 2,  5, 10,  1,  0,  0,  0,  0,  0,  0],
        [ 7,  7,  6,  1,  0,  0,  0,  0,  0,  0],
        [ 2,  4,  5,  1,  0,  0,  0,  0,  0,  0],
        [ 7,  2,  4,  1,  0,  0,  0,  0,  0,  0]])]


## Train the Encoder-Decoder Network

In [38]:
def train_epoch(
        dataloader,
        encoder,
        decoder,
        encoder_optimizer,
        decoder_optimizer,
        criterion
):
    """Run one optimisation pass over every mini-batch of `dataloader`.

    For each <input, target> batch the encoder embeds the input sequence, the
    decoder (fed with the encoder outputs, the encoder hidden state and the
    target) produces its outputs, and both optimizers take one step on the
    resulting loss.

    Returns:
        The mean of the per-batch loss values (sum divided by the number of batches).
    """
    optimizers = (encoder_optimizer, decoder_optimizer)
    batch_losses = []

    for input_tensor, target_tensor in dataloader:
        # Start every batch from clean gradients
        for optimizer in optimizers:
            optimizer.zero_grad()

        # Encode the roman number, then decode it into the arabic number
        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)

        # Flatten to (tokens, classes) vs (tokens,) so the criterion scores every position
        n_classes = decoder_outputs.size(-1)
        flat_outputs = decoder_outputs.view(-1, n_classes)
        flat_targets = target_tensor.view(-1)
        loss = criterion(flat_outputs, flat_targets)

        # Back propagation followed by one gradient step per network
        loss.backward()
        for optimizer in optimizers:
            optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(dataloader)

def train(
        train_dataloader,
        encoder,
        decoder,
        n_epochs,
        learning_rate=0.001,
        plot_every=100
):
    """Trains the Encoder-Decoder Network on the Training DataLoader.

    Both networks get their own Adam optimizer and are trained with a
    negative log-likelihood loss.

    Args:
        train_dataloader: iterable of <input, target> tensor batches.
        encoder: encoder network.
        decoder: decoder network.
        n_epochs: number of passes over the training data.
        learning_rate: learning rate shared by both Adam optimizers.
        plot_every: number of epochs averaged into one history entry.

    Returns:
        A list with the mean epoch loss of every window of `plot_every`
        epochs. If `n_epochs` is not a multiple of `plot_every`, the last
        entry averages the remaining epochs.

    Raises:
        ValueError: if `plot_every` is smaller than 1.
    """
    if plot_every < 1:
        raise ValueError("plot_every must be >= 1")

    history = []
    encoder_optimizer = torch.optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=learning_rate)
    criterion = nn.NLLLoss()

    window_loss = 0.0
    window_epochs = 0
    for _ in tqdm(range(n_epochs)):
        window_loss += train_epoch(
            train_dataloader, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion
        )
        window_epochs += 1

        # Record only once a full window has been accumulated. The previous
        # `epoch % plot_every == 0` check fired at epoch 0 and divided a
        # single epoch's loss by `plot_every`.
        if window_epochs == plot_every:
            history.append(window_loss / window_epochs)
            window_loss = 0.0
            window_epochs = 0

    # Keep the trailing partial window instead of silently dropping it.
    if window_epochs:
        history.append(window_loss / window_epochs)
    return history

In [None]:
# Run the Training

# Hyper-parameters of this run
hidden_size = 16
batch_size = 4
epochs = 50

# Get the two vocabularies and a randomly sampled training DataLoader
input_lang, output_lang, train_dataloader = get_dataloader(batch_size)

# The encoder is sized by the input vocabulary and the decoder by the output
# vocabulary (n_letters includes the SOS and EOS entries).
encoder = net.EncoderRNN(input_lang.n_letters, hidden_size)
decoder = net.DecoderRNN(hidden_size, output_lang.n_letters)

# Train for `epochs` epochs; train() returns the recorded loss history.
history = train(train_dataloader, encoder, decoder, epochs, plot_every=5)

## Evaluation

In [41]:
def evaluate(encoder, decoder, word, input_lang, output_lang):
    """Translate a single `word` with the trained encoder/decoder pair.

    The word is encoded with `input_lang`, run through both networks without
    gradient tracking, and the highest-scoring index at each decoder position
    is mapped back to a character with `output_lang`. Decoding stops at the
    first EOS, which is reported as '<EOS>'.

    Returns:
        A tuple (list of decoded characters, attention returned by the decoder).
    """
    with torch.no_grad():
        encoded_word = word_tensor(input_lang, word=word)

        encoder_outputs, encoder_hidden = encoder(encoded_word)
        decoder_outputs, _, decoder_attn = decoder(encoder_outputs, encoder_hidden)

        # Index of the best-scoring token at every decoding position
        _, best_indexes = decoder_outputs.topk(1)

        decoded_letters = []
        for token in best_indexes.squeeze():
            token_id = token.item()
            if token_id == EOS_token:
                decoded_letters.append('<EOS>')
                break
            decoded_letters.append(output_lang.index2letter[token_id])
    return decoded_letters, decoder_attn

In [48]:
# Switch both networks to evaluation mode before inference
# (presumably torch nn.Modules, where .eval() turns off training-only behaviour — confirm in utils.neural_nets).
encoder.eval()
decoder.eval()
test = 'XI'
# Decode one roman numeral; `out` is the list of predicted characters, `attn` the decoder attention.
out, attn = evaluate(encoder, decoder, test, roman_vocabulary, arabic_vocabulary)
print(out)

['<EOS>']


In [49]:
# Take the first entry of the attention returned by evaluate() and convert it to a NumPy array.
attention = attn[0].detach().numpy()
# Display its shape and values. The output below is (10, 3) for the 2-letter input 'XI':
# presumably (decoder positions, input tokens incl. EOS) — TODO confirm against net.DecoderRNN.
attention.shape, attention

((10, 3),
 array([[0.01353303, 0.16599102, 0.820476  ],
        [0.09213254, 0.17835926, 0.72950816],
        [0.01152237, 0.11995567, 0.8685219 ],
        [0.0175138 , 0.13351867, 0.8489675 ],
        [0.02106348, 0.13179219, 0.84714437],
        [0.02053656, 0.12986624, 0.84959716],
        [0.01949414, 0.12828569, 0.8522202 ],
        [0.01855664, 0.12699355, 0.8544498 ],
        [0.01780397, 0.12594481, 0.8562512 ],
        [0.01721337, 0.12509772, 0.85768884]], dtype=float32))