# Dataset

In [28]:
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import WhitespaceSplit
from tokenizers.processors import TemplateProcessing
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from transformers import PreTrainedTokenizerFast

import torch
import torch.nn as nn

from tqdm import tqdm

In [29]:
from torch.nn.utils.rnn import pack_padded_sequence as pack

In [30]:
import copy
import csv
import math
import os
import sys
import wget

import torch
import torch.nn as nn

from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import WhitespaceSplit
from tokenizers.processors import TemplateProcessing
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from transformers import PreTrainedTokenizerFast

from tqdm import tqdm

from torch.nn.utils.rnn import pack_padded_sequence as pack

# Download the dataset
files = ['train.src', 'train.tgt', 'dev.src', 'dev.tgt', 'test.src', 'test.tgt']
source = "https://github.com/nlp-course/data/raw/refs/heads/master/Words2Num/"
data_dir = './data/'
os.makedirs(data_dir, exist_ok=True)

for file in files:
    # Skip files that are already on disk so this cell is idempotent:
    # wget.download does not overwrite an existing file but stores the new copy
    # under a suffixed name such as "train (1).src", so re-running the cell
    # would only clutter the data directory.
    if os.path.exists(os.path.join(data_dir, file)):
        print(f'{file} already present in {data_dir}, skipping download')
        continue
    print(f'Downloading {file} from {source}')
    wget.download(source + file, out=data_dir)
    print("", flush=True)

Downloading train.src from https://github.com/nlp-course/data/raw/refs/heads/master/Words2Num/

Downloading train.tgt from https://github.com/nlp-course/data/raw/refs/heads/master/Words2Num/

Downloading dev.src from https://github.com/nlp-course/data/raw/refs/heads/master/Words2Num/

Downloading dev.tgt from https://github.com/nlp-course/data/raw/refs/heads/master/Words2Num/

Downloading test.src from https://github.com/nlp-course/data/raw/refs/heads/master/Words2Num/

Downloading test.tgt from https://github.com/nlp-course/data/raw/refs/heads/master/Words2Num/



In [31]:
# Set device: use the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [32]:
# Process data: merge each split's parallel .src/.tgt files into one two-column CSV
for split in ['train', 'dev', 'test']:
    src_in_file = f'./data/{split}.src'
    tgt_in_file = f'./data/{split}.tgt'
    out_file = f'./data/{split}.csv'

    # The csv module asks for newline='' on files it writes so that it controls
    # the line terminator itself (otherwise Windows gets blank rows).
    with open(src_in_file, 'r', encoding='utf-8') as f_src_in, \
         open(tgt_in_file, 'r', encoding='utf-8') as f_tgt_in, \
         open(out_file, 'w', encoding='utf-8', newline='') as f_out:
        writer = csv.writer(f_out)
        writer.writerow(('src', 'tgt'))  # header row: the column names later cells index by
        # Line i of the .src file is paired with line i of the .tgt file.
        for src_line, tgt_line in zip(f_src_in, f_tgt_in):
            writer.writerow((src_line.strip(), tgt_line.strip()))

In [33]:
# Load the CSV files written above; the dev file is exposed under the split name "val".
csv_by_split = {"train": "train.csv", "val": "dev.csv", "test": "test.csv"}
dataset = load_dataset(
    "csv",
    data_files={name: "./data/" + fname for name, fname in csv_by_split.items()},
)

# Convenience handles for the individual splits
train_data, val_data, test_data = dataset['train'], dataset['val'], dataset['test']

Downloading data files:   0%|          | 0/3 [00:00<?, ?it/s]

Extracting data files:   0%|          | 0/3 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating val split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

In [34]:
# Initialize tokenizers and add special tokens
unk_token, pad_token = '[UNK]', '[PAD]'
bos_token, eos_token = '<bos>', '<eos>'

# Source side: whitespace-split, word-level vocabulary trained on the training sources
src_tokenizer = Tokenizer(WordLevel(unk_token=unk_token))
src_tokenizer.pre_tokenizer = WhitespaceSplit()
src_trainer = WordLevelTrainer(special_tokens=[pad_token, unk_token])
src_tokenizer.train_from_iterator(train_data['src'], trainer=src_trainer)

# Target side: same recipe, with <bos>/<eos> registered as additional special tokens
tgt_tokenizer = Tokenizer(WordLevel(unk_token=unk_token))
tgt_tokenizer.pre_tokenizer = WhitespaceSplit()
tgt_trainer = WordLevelTrainer(
    special_tokens=[pad_token, unk_token, bos_token, eos_token]
)
tgt_tokenizer.train_from_iterator(train_data['tgt'], trainer=tgt_trainer)

# Post-process every encoded target into "<bos> ... <eos>"
tgt_bos_id = tgt_tokenizer.token_to_id(bos_token)
tgt_eos_id = tgt_tokenizer.token_to_id(eos_token)
tgt_tokenizer.post_processor = TemplateProcessing(
    single=f"{bos_token} $A {eos_token}",
    special_tokens=[(bos_token, tgt_bos_id), (eos_token, tgt_eos_id)],
)

# Wrap with PreTrainedTokenizerFast for compatibility with datasets
hf_src_tokenizer = PreTrainedTokenizerFast(
    tokenizer_object=src_tokenizer,
    pad_token=pad_token,
    unk_token=unk_token,
)
hf_tgt_tokenizer = PreTrainedTokenizerFast(
    tokenizer_object=tgt_tokenizer,
    pad_token=pad_token,
    unk_token=unk_token,
    bos_token=bos_token,
    eos_token=eos_token,
)



In [50]:
# Encode data
def encode(example):
    """Add token-id columns to one example.

    Tokenizes example['src'] with the source tokenizer and example['tgt'] with
    the target tokenizer, stores the resulting id lists under 'src_ids' and
    'tgt_ids', and returns the same (mutated) example.
    """
    src_encoding = hf_src_tokenizer(example['src'])
    example['src_ids'] = src_encoding.input_ids
    tgt_encoding = hf_tgt_tokenizer(example['tgt'])
    example['tgt_ids'] = tgt_encoding.input_ids
    return example

# Apply the encoder to every split (train, then val, then test)
train_data, val_data, test_data = (
    split.map(encode) for split in (train_data, val_data, test_data)
)

Map:   0%|          | 0/65022 [00:00<?, ? examples/s]

Map:   0%|          | 0/700 [00:00<?, ? examples/s]

Map:   0%|          | 0/700 [00:00<?, ? examples/s]

In [52]:
# Create dataloaders
BATCH_SIZE = 32
TEST_BATCH_SIZE = 1

src_vocab = hf_src_tokenizer.get_vocab()
tgt_vocab = hf_tgt_tokenizer.get_vocab()

# Defines how to batch a list of examples together
def collate_fn(examples):
    """Pad a list of encoded examples into batch tensors placed on `device`.

    Args:
        examples: list of examples, each providing 'src_ids' and 'tgt_ids'
            sequences of token ids.

    Returns:
        dict with
            'src_lengths': LongTensor (bsz,) holding the unpadded source lengths,
            'src_ids': LongTensor (bsz, max_src_len) padded with the source pad id,
            'tgt_ids': LongTensor (bsz, max_tgt_len) padded with the target pad id.
    """
    bsz = len(examples)
    src_ids = [example['src_ids'] for example in examples]
    tgt_ids = [example['tgt_ids'] for example in examples]

    # Keep the lengths as plain Python ints while sizing the batch, so tensor
    # dimensions are never taken from a (possibly GPU-resident) 0-dim tensor.
    src_lengths = [len(word_ids) for word_ids in src_ids]
    src_max_length = max(src_lengths)
    tgt_max_length = max(len(word_ids) for word_ids in tgt_ids)

    # Assemble the padded batches on the CPU and move each to `device` once,
    # instead of issuing a host-to-device copy per example.
    src_batch = torch.full((bsz, src_max_length), src_vocab[pad_token], dtype=torch.long)
    tgt_batch = torch.full((bsz, tgt_max_length), tgt_vocab[pad_token], dtype=torch.long)
    for b in range(bsz):
        src_batch[b, :len(src_ids[b])] = torch.tensor(src_ids[b], dtype=torch.long)
        tgt_batch[b, :len(tgt_ids[b])] = torch.tensor(tgt_ids[b], dtype=torch.long)

    return {
        'src_lengths': torch.tensor(src_lengths, dtype=torch.long).to(device),
        'src_ids': src_batch.to(device),
        'tgt_ids': tgt_batch.to(device),
    }

# One loader per split, all sharing collate_fn. Only the training loader
# shuffles; the test loader uses TEST_BATCH_SIZE instead of BATCH_SIZE.
train_iter, val_iter, test_iter = (
    torch.utils.data.DataLoader(
        split_data,
        batch_size=split_batch_size,
        shuffle=split_shuffle,
        collate_fn=collate_fn,
    )
    for split_data, split_batch_size, split_shuffle in (
        (train_data, BATCH_SIZE, True),
        (val_data, BATCH_SIZE, False),
        (test_data, TEST_BATCH_SIZE, False),
    )
)

# Model

Our model will consist of a bi-directional LSTM encoder feeding into a uni-directional LSTM decoder. This of course creates a shape mismatch between the final state produced by the encoder and the initial state expected by the decoder. We address this with a function, `reshape`, built on the assumption that the encoder's final state has shape `(num_layers * directions, bsz, hidden_size)`, as stated in the PyTorch documentation of the LSTM.

In [54]:
def reshape(t: torch.Tensor, layer_size, bsz, hidden_size) -> torch.Tensor:
    """Merge the two directions of a bidirectional LSTM state, layer by layer.

    Args:
        t: final hidden or cell state of a bidirectional LSTM, of shape
            (layer_size * 2, bsz, hidden_size // 2). PyTorch orders the first
            axis layer-major: [layer0 fwd, layer0 bwd, layer1 fwd, layer1 bwd, ...].
        layer_size: number of stacked LSTM layers.
        bsz: batch size.
        hidden_size: size of the merged state (twice the per-direction size).

    Returns:
        Tensor of shape (layer_size, bsz, hidden_size) whose i-th entry is the
        forward state of layer i concatenated with the backward state of layer i.
    """
    # The leading axis factors as (layers, directions), NOT (directions, layers);
    # splitting it the other way round would pair states of different layers.
    t = t.reshape(layer_size, 2, bsz, hidden_size // 2)
    return torch.cat([t[:, 0], t[:, 1]], dim=-1)

In [55]:
class EncoderDecoder(nn.Module):
    """Sequence-to-sequence model: bidirectional LSTM encoder, unidirectional LSTM decoder.

    The encoder runs with hidden size `hidden_size // 2` per direction so that its
    two directions, once concatenated, match the decoder's `hidden_size`.

    Args:
        src_tokenizer: source tokenizer; must support `len()` and expose `pad_token_id`.
        tgt_tokenizer: target tokenizer; must support `len()` and expose
            `pad_token_id`, `bos_token_id` and `eos_token_id`.
        embedding_size: dimension of the source and target embeddings.
        hidden_size: decoder hidden size (should be even, it is halved for the encoder).
        layers: number of stacked LSTM layers in both encoder and decoder.
    """

    def __init__(
            self,
            src_tokenizer,
            tgt_tokenizer,
            embedding_size=64,
            hidden_size=64,
            layers=2,
    ):
        super().__init__()
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer

        self.vocab_size_src = len(src_tokenizer)
        self.vocab_size_tgt = len(tgt_tokenizer)

        self.padding_id_src = self.src_tokenizer.pad_token_id
        self.padding_id_tgt = self.tgt_tokenizer.pad_token_id
        self.bos_id = self.tgt_tokenizer.bos_token_id
        # Must be the end-of-sequence id (not the bos id), otherwise anything that
        # compares against eos_id would look for the wrong token.
        self.eos_id = self.tgt_tokenizer.eos_token_id

        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.layers = layers

        self.src_embeddings = nn.Embedding(self.vocab_size_src, embedding_size)
        self.tgt_embeddings = nn.Embedding(self.vocab_size_tgt, embedding_size)

        # RNN cells
        self.encoder_rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size // 2,  # to match decoder hidden size
            batch_first=True,
            num_layers=layers,
            bidirectional=True,  # bidirectional encoder
        )
        self.decoder_rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            batch_first=True,
            num_layers=layers,
            bidirectional=False,  # unidirectional decoder
        )

        # Final projection layer
        self.hidden2output = nn.Linear(hidden_size, self.vocab_size_tgt)

        # Create loss function: summed over tokens, padding positions ignored
        self.loss_function = nn.CrossEntropyLoss(
            reduction="sum", ignore_index=self.padding_id_tgt
        )

    def forward_encoder(self, src, src_lengths):
        """Encodes 'src'. Returns hidden state and context state

        Args:
            src: input batch of size (bsz, max_src_len)
            src_lengths: lengths of sources of size (bsz)

        Returns:
            Tuple (h, c) of the encoder's final hidden and cell states after
            both are passed through `reshape` for use as the decoder's
            initial state.
        """
        bsz = src.shape[0]
        src_embeddings = self.src_embeddings(src)
        # pack_padded_sequence requires the lengths on the CPU, even when the
        # batch itself lives on the GPU.
        if torch.is_tensor(src_lengths):
            src_lengths = src_lengths.cpu()
        packed = pack(src_embeddings, src_lengths, batch_first=True, enforce_sorted=False)
        _, (h, c) = self.encoder_rnn(packed)
        h = reshape(h, self.layers, bsz, self.hidden_size)
        c = reshape(c, self.layers, bsz, self.hidden_size)

        return h, c

    def forward_decoder(self, encoder_final_state, tgt_in):
        """Runs the decoder over 'tgt_in' starting from the encoder's final state.

        Args:
            encoder_final_state: tuple (h, c) as returned by `forward_encoder`
            tgt_in: decoder input ids of size (bsz, tgt_len)

        Returns:
            Logits of size (bsz, tgt_len, vocab_size_tgt)
        """
        tgt_embeddings = self.tgt_embeddings(tgt_in)
        out, _ = self.decoder_rnn(tgt_embeddings, encoder_final_state)
        return self.hidden2output(out)

    def forward(self, src, src_lengths, tgt_in):
        """Encodes 'src' and returns the decoder's logits for 'tgt_in'."""
        encoder_final_state = self.forward_encoder(src, src_lengths)
        logits = self.forward_decoder(encoder_final_state, tgt_in)
        return logits

# Training

In [56]:
model = EncoderDecoder(hf_src_tokenizer,
                       hf_tgt_tokenizer,).to(device)

epochs = 2
lr = 2e-3
optim = torch.optim.Adam(model.parameters(), lr=lr)

model.train()  # make the training mode explicit
for epoch in range(epochs):
    # Per-epoch running totals: number of non-padding target tokens and summed loss
    total_words = 0
    total_loss = 0.0
    for batch in tqdm(train_iter):
        optim.zero_grad()
        tgt = batch["tgt_ids"]
        src = batch["src_ids"]
        src_lengths = batch['src_lengths']

        # Remove eos
        tgt_in = tgt[:, :-1]
        # Remove bos
        tgt_out = tgt[:, 1:]
        bsz = src.size(0)

        # Call the module itself rather than .forward() so that any registered
        # hooks run as well.
        logits = model(src, src_lengths, tgt_in)
        # The loss is summed over all non-padding target tokens of the batch
        loss = model.loss_function(
            logits.reshape(-1, model.vocab_size_tgt), tgt_out.reshape(-1)
        )

        total_words += tgt_out.ne(model.padding_id_tgt).float().sum().item()
        total_loss += loss.item()

        # Back-propagate the per-example average of the summed loss
        loss.div(bsz).backward()
        optim.step()

    # Report the statistics accumulated above (guard against an empty epoch)
    avg_loss = total_loss / max(total_words, 1)
    print(f"epoch {epoch + 1}/{epochs}: "
          f"loss/token = {avg_loss:.4f}, perplexity = {math.exp(avg_loss):.2f}")



100%|██████████| 2032/2032 [00:38<00:00, 52.41it/s]
100%|██████████| 2032/2032 [00:38<00:00, 52.97it/s]


In [64]:
def test_model(model, test_iter):
    """Return the accuracy of the 'model' on 'test_iter'"""
    correct = 0
    total = 0
