<a href="https://colab.research.google.com/github/Dagobert42/NMT-Attention/blob/main/Neural_Machine_Translation_with_Attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Neural Machine Translation with Attention

The goal of this notebook is to implement the **RNNsearch-50** model, i.e., the encoder-decoder system with attention, for a language pair other than English-German, German-English, English-French, and French-English. We choose **German-Italian** as an example language pair.

Furthermore, we loosely follow **test-driven development** ("TDD") practices to replicate the original system described in the paper:

    Bahdanau, Cho & Bengio. Neural Machine Translation by Jointly Learning to Align and Translate. ICLR 2015.

An in-depth walk-through of the project is given in the accompanying report which can be found here ADD LINK

# 1. Setup

Please choose which dependencies to install in your environment. On re-runs you can uncheck the boxes to save some time.

In [None]:
torch = True #@param {type:"boolean"}
if torch:
    !pip install --upgrade torch

torchtext = True #@param {type:"boolean"}
if torchtext:
    !pip install --upgrade torchtext

spacy_packages = True #@param {type:"boolean"}
if spacy_packages:
    !pip install --upgrade spacy
    !python -m spacy download en_core_web_sm
    !python -m spacy download de_core_news_sm
    !python -m spacy download it_core_news_sm

Next let us import all the modules we are going to be using.

In [41]:
# access to translation datasets
import torchtext

# model implementation
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

# word tokenization
import spacy

# utilities
import random
import time

# 2. Data

## 2.1 Dataset

For data we use the Web Inventory of Transcribed and Translated Talks (WIT3), which is available as the torchtext dataset `IWSLT2017`. On re-runs you can uncheck the box to save some time.

In [49]:
from torchtext.datasets import IWSLT2017

load_dataset = False #@param {type:"boolean"}
if load_dataset:
    train_iter, test_iter, val_iter = IWSLT2017(split=('train', 'test', 'valid'), language_pair=('de', 'it'))
src_sentence, trg_sentence = next(train_iter)
print('Example:\n')
print('source ->', src_sentence)
print('target ->', trg_sentence)

print('train examples:', len(train_iter), end='\t')
print('test:', len(test_iter), end='\t')
print('validation:', len(val_iter))

Example:

source -> Das meine ich ernst, teilweise deshalb -- weil ich es wirklich brauchen kann!

target -> per i tanti, lusinghieri commenti, anche perché... Ne ho bisogno!!!

train examples: 205465	test: 1567	validation: 923
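
The name RNNsearch-50 in the paper refers to the model trained on sentences of up to 50 words. A minimal sketch of such a length filter is given below. It assumes whitespace tokenization, and `filter_by_length` is a hypothetical helper that is not part of this notebook.

```python
def filter_by_length(pairs, max_len=50):
    """Keeps only the pairs where both sides have at most max_len tokens."""
    return [(src, trg) for src, trg in pairs
            if len(src.split()) <= max_len and len(trg.split()) <= max_len]

pairs = [
    ("Das ist kurz .", "Questo è breve ."),
    ("wort " * 60, "parola " * 60),  # artificial pair with 60 tokens per side
]
kept = filter_by_length(pairs, max_len=50)
print(len(kept), "of", len(pairs), "pairs kept")
```

The same check could be applied with the spaCy tokenizers instead of `str.split` if the token counts should match the vocabulary.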


## 2.2 Vocabulary

The vocabulary converts words to indices and vice versa. Unknown words are marked with the `<U>` token. The vocabulary can be filtered by word counts and provides methods for converting between index sequences and sentences. It also takes an optional spaCy NLP object which, when given, is used for tokenization.

In [51]:
class Vocab:
    """
    A vocabulary holding dictionaries for converting words to indices and back.
    """
    class Entry:
        def __init__(self, id):
            """ An entry to the vocabulary. With index and count. """
            self.id = id
            self.count = 1

        def __repr__(self):
            """ String representation for printing. """
            return str((self.id, self.count))

    def __init__(self, text=None, spacy_nlp=None):
        """
        Creates a vocabulary over an input text in the form of
        dictionaries for indices and word counts. Hand in a
        spacy_nlp object to make use of spaCy for tokenization.
        """
        self.SPECIALS = '<S> <E> <U>'
        self.spacy_nlp = spacy_nlp
        self.words = {}
        self.ids = {}

        # add special tokens to vocabulary
        for word in self.SPECIALS.split():
            id = len(self.words.keys())
            self.words[word] = self.Entry(id)
            self.ids[id] = word

        if text:
            self.append(text)

    def append(self, txt):
        """ Adds a string token by token to the vocabulary. """
        # use spaCy for tokenization if requested
        if self.spacy_nlp:
            tokens = [tok.text for tok in self.spacy_nlp.tokenizer(txt)]
        else:
            tokens = txt.split()
        for word in tokens:
            if word not in self.words:
                next_id = len(self.words)
                self.words[word] = self.Entry(next_id)
                self.ids[next_id] = word
            else:
                self.words[word].count += 1

    def filter(self, n_samples, descending=True):
        """
        Reduces this vocab's dictionary to n_samples
        after sorting by word count. Note that self.ids and the
        ids of the remaining entries are left unchanged.
        """
        sorted_list = list(sorted(
                self.words.items(),
                key=lambda item: item[1].count,
                reverse=descending))
        self.words = {k: v for k, v in sorted_list[:n_samples]}

    def get_indeces(self, sentence):
        """ Converts a sentence into a list of indices from this vocabulary. """
        STA = 0  # index of '<S>' (not used here)
        END = 1  # index of '<E>'
        UNK = 2  # index of '<U>'
        if self.spacy_nlp:
            # spaCy yields Token objects, so look words up by their text
            tokens = [tok.text for tok in self.spacy_nlp.tokenizer(sentence)]
        else:
            tokens = sentence.split()
        seq = [self.words[word].id if word in self.words else UNK
               for word in tokens]
        seq.append(END)
        return seq
    
    def get_sentence(self, indeces):
        """
        Converts a list of indices into a readable sentence
        using words from this vocabulary.
        """
        return ' '.join([self.ids[id] for id in indeces])

We make sure the vocabulary works as intended by running tests with a tiny corpus. Note that during development these tests were written **first** and subsequently we implemented the related functionality in the class.

In [56]:
TEST_CORPUS_EN = """The final project should implement a system 
        related to deep learning for NLP using the Py- Torch library 
        and test it. The project is documented in an ACL-style paper 
        that adheres to the standards of practice in computational 
        linguistics."""

test_vocab = Vocab(TEST_CORPUS_EN)

# Basic Functionality
assert(test_vocab.ids[0] == '<S>') # <S> should always be first
assert(test_vocab.ids[3] == 'The')
assert(test_vocab.words['The'].id == 3)
assert(test_vocab.words['the'].count == 2)

# Spacy Option
english_nlp = spacy.load('en_core_web_sm')
spacy_vocab = Vocab(TEST_CORPUS_EN, spacy_nlp=english_nlp)

assert(spacy_vocab.ids[0] == '<S>')
assert(spacy_vocab.ids[5] == 'project')
assert(spacy_vocab.words['project'].id == 5)
assert(spacy_vocab.words['project'].count == 2)

# Vocabulary Filter
top_30 = test_vocab
top_30.filter(n_samples=30)

assert(len(top_30.words) == 30)
assert(top_30 == test_vocab)

# Sentence Vector Conversion
vec = test_vocab.get_indeces("This is a test")
sent = test_vocab.get_sentence(vec)

assert(vec == [2, 24, 8, 22, 1])
assert(sent == '<U> is a test <E>')

print('OK. No errors or asserts were triggered during testing :)')

OK. No errors or asserts were triggered during testing :)
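
Note that `Vocab.filter` above only shrinks `self.words`: the special tokens compete with ordinary words on count, `self.ids` keeps every entry, and the surviving ids are not renumbered. An embedding layer of size `vocab_size` needs ids in the range `0 .. vocab_size - 1`, so a variant that always keeps the specials and re-indexes may be useful. The following is a sketch under these assumptions, not the notebook's implementation: `build_filtered_index` is a hypothetical standalone helper and uses whitespace tokenization only.

```python
from collections import Counter

SPECIALS = ['<S>', '<E>', '<U>']  # same order as Vocab.SPECIALS

def build_filtered_index(text, n_samples):
    """Keeps the specials plus the most frequent words, with contiguous ids."""
    counts = Counter(word for word in text.split() if word not in SPECIALS)
    n_words = max(n_samples - len(SPECIALS), 0)
    # most_common sorts by count, ties keep their first-seen order
    kept = [word for word, _ in counts.most_common(n_words)]
    word_to_id = {word: i for i, word in enumerate(SPECIALS + kept)}
    id_to_word = {i: word for word, i in word_to_id.items()}
    return word_to_id, id_to_word

word_to_id, id_to_word = build_filtered_index("a b a c a b d", n_samples=5)
print(word_to_id)
```

Here `n_samples` counts the special tokens as well, so the result never has more than `n_samples` entries and both mappings stay in sync.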


# 3. Model

The network consists of three main parts. These are the Encoder, Decoder and Alignment models which are ultimately combined to form the RNNsearch model.

## 3.1 Encoder

**Inputs:** sequence of word indices representing a sentence (the embedding lookup is equivalent to multiplying one-hot vectors with the embedding matrix)

**Outputs:** series of annotations, most recent GRU hidden states

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.embed = nn.Embedding(input_dim, embedding_dim) # m x Kx
        self.birnn = nn.GRU(embedding_dim, hidden_dim, bidirectional = True) # 2*n x m
        self.fc = nn.Linear(hidden_dim * 2, hidden_dim) # n x 2*n

    def forward(self, x):
        # x shape: x_len x batch_size
        embeds = self.embed(x)
        # annotations shape: x_len x batch_size x 2*n
        annotations, hidden = self.birnn(embeds)
        # merge the final forward and backward states into one state of size n
        hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))
        return annotations, hidden

## 3.2 Attention

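**Inputs:** encoder annotations, previous decoder hidden state

**Outputs:** attention weights over the source positions (one distribution per batch element)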

In [None]:
class Attention(nn.Module):
    def __init__(self, attention_dim):
        super().__init__()
        self.attention = nn.Linear((attention_dim * 2) + attention_dim, attention_dim)
        self.v = nn.Linear(attention_dim, 1, bias = False)
        
    def forward(self, encoder_outs, hidden):
        # encoder_outs: src_len x batch_size x 2*n, hidden: batch_size x n
        src_len = encoder_outs.shape[0]
        # repeat the decoder state once per source position
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        
        encoder_outs = encoder_outs.permute(1, 0, 2)
        # energy shape: batch_size x src_len x n
        energy = torch.tanh(self.attention(torch.cat((hidden, encoder_outs), dim = 2)))
        attention = self.v(energy).squeeze(2)
        # normalize the scores over the source positions
        return F.softmax(attention, dim=1)
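
To make the last two steps concrete, here is a minimal pure-Python sketch of what happens to the alignment scores: a softmax turns them into weights, and the weights form a context vector as a weighted sum of the annotations. The numbers are made up for illustration, and `softmax` and `context_vector` are hypothetical helpers, not part of the model code.

```python
import math

def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def context_vector(weights, annotations):
    """Weighted sum of the annotation vectors: c = sum_j alpha_j * h_j."""
    dim = len(annotations[0])
    return [sum(w * h[d] for w, h in zip(weights, annotations))
            for d in range(dim)]

# toy example: three source positions with 2-dimensional annotations
energies = [2.0, 0.0, 0.0]  # made-up alignment scores, one per position
annotations = [[1.0, 0.0], [0.0, 1.0], [0.0, 1.0]]

alphas = softmax(energies)
context = context_vector(alphas, annotations)
print(alphas, context)
```

The position with the highest score receives the largest weight, so its annotation dominates the context vector.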

## 3.3 Decoder

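**Inputs:** previous target word indices, encoder annotations, previous decoder hidden state

**Outputs:** unnormalized scores over the target vocabulary, updated decoder hidden state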

In [None]:
class Decoder(nn.Module):
    def __init__(self, output_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.output_dim = output_dim
        self.attention = Attention(hidden_dim)
        self.embeddings = nn.Embedding(output_dim, embedding_dim)
        self.rnn = nn.GRU((hidden_dim * 2) + embedding_dim, hidden_dim)
        self.fc_out = nn.Linear((hidden_dim * 2) + hidden_dim + embedding_dim, output_dim)
        
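    def forward(self, input, encoder_outs, hidden):
        # input: batch_size, encoder_outs: src_len x batch_size x 2*n, hidden: batch_size x n
        input = input.unsqueeze(0)
        embeds = self.embeddings(input)
        # Attention.forward expects (encoder_outs, hidden)
        a = self.attention(encoder_outs, hidden)
        a = a.unsqueeze(1)
        encoder_outs = encoder_outs.permute(1, 0, 2)
        # context vector: attention-weighted sum of the annotations
        weighted = torch.bmm(a, encoder_outs)
        weighted = weighted.permute(1, 0, 2)
        rnn_input = torch.cat((embeds, weighted), dim = 2)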
        
        output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
        
        embeds = embeds.squeeze(0)
        output = output.squeeze(0)
        weighted = weighted.squeeze(0)
        
        prediction = self.fc_out(torch.cat((output, weighted, embeds), dim = 1))
        return prediction, hidden.squeeze(0)

## 3.4 Final RNNsearch Model

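**Inputs:** source batch `x` (src_len x batch_size) and target batch `y` (trg_len x batch_size) of word indices

**Outputs:** unnormalized target vocabulary scores for every target position (trg_len x batch_size x output_dim)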

In [None]:
class RNNsearch(nn.Module):
    def __init__(self, input_dim, output_dim, embedding_dim, hidden_dim, attention_dim):
        super().__init__()
        # note: attention_dim is currently unused, Decoder sizes its Attention by hidden_dim
        self.encoder = Encoder(input_dim, embedding_dim, hidden_dim)
        self.decoder = Decoder(output_dim, embedding_dim, hidden_dim)

    def init_weights(self):
        # a bidirectional GRU keeps separate parameters for each direction
        for suffix in ('l0', 'l0_reverse'):
            nn.init.orthogonal_(getattr(self.encoder.birnn, 'weight_ih_' + suffix))
            nn.init.orthogonal_(getattr(self.encoder.birnn, 'weight_hh_' + suffix))
            nn.init.zeros_(getattr(self.encoder.birnn, 'bias_ih_' + suffix))
            nn.init.zeros_(getattr(self.encoder.birnn, 'bias_hh_' + suffix))

    def beam_search(self, decoderout, k):
        pass

    def forward(self, x, y, teacher_forcing_ratio=0.5):
        batch_size = x.shape[1]
        max_len = y.shape[0]
        trg_vocab_size = self.decoder.output_dim

        # allocate the outputs on the same device as the input batch
        outputs = torch.zeros(max_len, batch_size, trg_vocab_size, device=x.device)

        encoder_outputs, hidden = self.encoder(x)

        # first input to the decoder is the <S> token
        output = y[0,:]

        for t in range(1, max_len):
            # Decoder.forward expects (input, encoder_outs, hidden)
            output, hidden = self.decoder(output, encoder_outputs, hidden)
            outputs[t] = output
            # feed either the reference word or the model's own best guess
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.max(1)[1]
            output = (y[t] if teacher_force else top1)

        return outputs
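
The `beam_search` method is left as a stub above. The following is a generic, pure-Python sketch of the technique and not the notebook's implementation: `step_fn`, `TOY_MODEL` and `toy_step` are hypothetical stand-ins for the decoder, and hypotheses are scored by summed log-probabilities without length normalization.

```python
import math

def beam_search(step_fn, start_token, end_token, beam_width, max_len):
    """
    step_fn(prefix) returns a dict mapping each next token to its
    log-probability. Returns the best (sequence, score) pair found.
    """
    beams = [([start_token], 0.0)]
    finished = []
    for _ in range(max_len):
        candidates = []
        for seq, score in beams:
            for token, logp in step_fn(seq).items():
                candidates.append((seq + [token], score + logp))
        if not candidates:
            break
        # keep only the beam_width highest scoring extensions
        candidates.sort(key=lambda c: c[1], reverse=True)
        beams = []
        for seq, score in candidates[:beam_width]:
            if seq[-1] == end_token:
                finished.append((seq, score))
            else:
                beams.append((seq, score))
        if not beams:
            break
    finished.extend(beams)  # hypotheses that were cut off at max_len
    return max(finished, key=lambda c: c[1])

# made-up next-token probabilities, keyed by the prefix generated so far
TOY_MODEL = {
    ('<S>',): {'a': 0.6, 'b': 0.4},
    ('<S>', 'a'): {'<E>': 0.55, 'a': 0.45},
    ('<S>', 'b'): {'<E>': 0.9, 'b': 0.1},
}

def toy_step(prefix):
    probs = TOY_MODEL.get(tuple(prefix), {'<E>': 1.0})
    return {tok: math.log(p) for tok, p in probs.items()}

greedy_seq, greedy_score = beam_search(toy_step, '<S>', '<E>', beam_width=1, max_len=5)
beam_seq, beam_score = beam_search(toy_step, '<S>', '<E>', beam_width=2, max_len=5)
print(greedy_seq, beam_seq)
```

With a beam width of 1 the search is greedy and commits to `a` after the first step. With a width of 2 it also keeps `b` alive and ends up with a more probable complete sentence.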

# 4. Training

## 4.1 Hyperparameters

For our purposes we can adjust the hyperparameters to values that keep the execution time reasonable.

In [None]:
embedding_dim = 256 #@param {type:"integer"}
hidden_dim = 512 #@param {type:"integer"}
alignment_dim = 64 #@param {type:"integer"}
maxout_dim = 500 #@param {type:"integer"}
max_sentence_length = 50 #@param {type:"integer"}
vocab_size = 30000 #@param {type:"integer"}
p = 0.95 #@param {type:"number"}
lr = 0.000001 #@param {type:"number"}
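
The cell above does not say which optimizer `p` and `lr` belong to. The paper trains with Adadelta using ρ = 0.95 and ε = 10^-6, so the sketch below assumes that `p` plays the role of ρ. That `0.000001` corresponds to ε rather than to a learning rate is only a guess. `adadelta_step` is a hypothetical single-parameter helper that illustrates the update rule on f(x) = x².

```python
import math

def adadelta_step(x, grad, state, rho=0.95, eps=1e-6):
    """One Adadelta update for a single scalar parameter."""
    avg_sq_grad, avg_sq_delta = state
    # running average of the squared gradients
    avg_sq_grad = rho * avg_sq_grad + (1 - rho) * grad ** 2
    # step size is the ratio of the two running RMS values
    delta = -math.sqrt(avg_sq_delta + eps) / math.sqrt(avg_sq_grad + eps) * grad
    # running average of the squared updates
    avg_sq_delta = rho * avg_sq_delta + (1 - rho) * delta ** 2
    return x + delta, (avg_sq_grad, avg_sq_delta)

# minimize f(x) = x**2, whose gradient is 2 * x
x, state = 1.0, (0.0, 0.0)
for _ in range(20):
    x, state = adadelta_step(x, 2 * x, state)
print(x)
```

In a PyTorch training loop the corresponding optimizer would be `torch.optim.Adadelta`, which takes `rho` and `eps` as arguments.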


# 5. Evaluation