# Encoder-Decoder Transformer Architecture

## Create Data

The following function creates a dataset of number pairs: each number as a digit string together with its textual (English) representation. It returns the data split into a training set and a validation set.

In [11]:
import pandas as pd
from num2words import num2words

def create_dataset(start=0, end=1000, train_size=0.8):
    """Build (digit string, English words) pairs and split them into two frames.

    Args:
        start: First integer to include (inclusive).
        end: Upper bound of the integer range (exclusive).
        train_size: Fraction of the rows sampled into the training frame.

    Returns:
        A ``(train_df, test_df)`` tuple of DataFrames with the columns ``num``
        and ``word``; ``test_df`` holds every row not sampled for training.
    """
    rows = []
    for value in range(start, end):
        rows.append((str(value), num2words(value)))
    pairs = pd.DataFrame(rows, columns=['num', 'word'])

    # A fixed seed keeps the split identical between runs.
    train_part = pairs.sample(frac=train_size, random_state=42)
    held_out = pairs.drop(train_part.index)

    return train_part, held_out

In [12]:
# Build the 0-999 dataset; 80% of the rows go to training, the rest to validation.
train_df, val_df = create_dataset(0, 1000, 0.8)
print('Items in val_df: ', len(val_df))
print('Items in train_df: ', len(train_df))

# Preview a few of the sampled training pairs.
train_df.head()

Items in val_df:  200
Items in train_df:  800


Unnamed: 0,num,word
521,521,five hundred and twenty-one
737,737,seven hundred and thirty-seven
740,740,seven hundred and forty
660,660,six hundred and sixty
411,411,four hundred and eleven


## Tokenizer

The inputs are tokenized by just using the digits and having the following special tokens:

- BOS_TOKEN_ID = 10
- EOS_TOKEN_ID = 11
- PAD_TOKEN_ID = 12

For the outputs we train a sentencepiece tokenizer.

In [13]:
# Special token ids for the numeric (input) side.
# Ids 0-9 are taken by the digits themselves (see tokenize_numeric).
INPUT_BOS_TOKEN_ID = 10  # marks the start of an input sequence
INPUT_EOS_TOKEN_ID = 11  # marks the end of an input sequence
INPUT_PAD_TOKEN_ID = 12  # right-padding up to the batch length

In [14]:
import os
import sentencepiece as spm
import io

# The trainer writes `tokenizer/num_to_words.model` and `.vocab`.
# Create the output directory up front so the cell also works on a clean
# checkout where `tokenizer/` does not exist yet.
os.makedirs('tokenizer', exist_ok=True)

# Feed the training words to the trainer as an in-memory, line-per-sentence stream.
text_data = "\n".join(train_df['word'])
text_stream = io.StringIO(text_data)

spm.SentencePieceTrainer.train(
    sentence_iterator=text_stream,
    model_prefix='tokenizer/num_to_words',
    vocab_size=200,
    model_type='bpe',
    character_coverage=1.0,
    # Reserve explicit padding / begin / end pieces for the decoder side.
    user_defined_symbols=['<PAD>', '<BOS>', '<EOS>']
)

sentencepiece_trainer.cc(78) LOG(INFO) Starts training with : 
trainer_spec {
  input_format: 
  model_prefix: tokenizer/num_to_words
  model_type: BPE
  vocab_size: 200
  self_test_sample_size: 0
  character_coverage: 1
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 4192
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  split_digits: 0
  pretokenization_delimiter: 
  treat_whitespace_as_suffix: 0
  allow_whitespace_only_pieces: 0
  user_defined_symbols: <PAD>
  user_defined_symbols: <BOS>
  user_defined_symbols: <EOS>
  required_chars: 
  byte_fallback: 0
  vocabulary_output_piece_score: 1
  train_extremely_large_corpus: 0
  seed_sentencepieces_file: 
  hard_vocab_limit: 1
  use_all_vocab: 0
  unk_id: 0
  bos_id: 1
  eos_id: 2
  pad_id: -1
  unk_piece: <unk>
  bos_piece: <s>
  eos_piece: </s>
  pad_

In [15]:
# Load the tokenizer that was just trained.
sp = spm.SentencePieceProcessor()
sp.load('tokenizer/num_to_words.model')

# Round-trip a sample phrase: encode to ids and to pieces, then decode the ids back.
text = "one hundred and twenty-five"
ids = sp.encode(text, out_type=int)
tokens = sp.encode(text, out_type=str)
decoded = sp.decode(ids)

print("Tokens:", tokens)
print("Decoded:", decoded)

Tokens: ['▁one', '▁hundred', '▁and', '▁twenty', '-', 'five']
Decoded: one hundred and twenty-five


In [16]:
# get ids of special tokens
OUTPUT_BOS_TOKEN_ID = sp.piece_to_id('<BOS>')
OUTPUT_EOS_TOKEN_ID = sp.piece_to_id('<EOS>')
OUTPUT_PAD_TOKEN_ID = sp.piece_to_id('<PAD>')
print('OUTPUT_BOS_TOKEN_ID:', OUTPUT_BOS_TOKEN_ID)
print('OUTPUT_EOS_TOKEN_ID:', OUTPUT_EOS_TOKEN_ID)
print('OUTPUT_PAD_TOKEN_ID:', OUTPUT_PAD_TOKEN_ID)

OUTPUT_BOS_TOKEN_ID: 4
OUTPUT_EOS_TOKEN_ID: 5
OUTPUT_PAD_TOKEN_ID: 3


In [20]:
import sentencepiece as spm
import torch
import pandas as pd
import torch


def tokenize_textual(df_column):
    """Encode a pandas Series of number words into a padded id tensor.

    Every entry is encoded with the SentencePiece model stored at
    ``tokenizer/num_to_words.model``, wrapped in ``OUTPUT_BOS_TOKEN_ID`` /
    ``OUTPUT_EOS_TOKEN_ID`` and right-padded with ``OUTPUT_PAD_TOKEN_ID`` to
    the longest sequence in ``df_column``.

    Args:
        df_column: pandas Series of strings.

    Returns:
        A ``torch.long`` tensor with one row per entry of ``df_column``.
    """
    processor = spm.SentencePieceProcessor(model_file='tokenizer/num_to_words.model')

    def wrap_with_specials(words):
        return [OUTPUT_BOS_TOKEN_ID] + processor.encode(words, out_type=int) + [OUTPUT_EOS_TOKEN_ID]

    encoded = df_column.apply(wrap_with_specials)
    longest = encoded.apply(len).max()

    def pad_right(token_ids):
        missing = longest - len(token_ids)
        return token_ids + [OUTPUT_PAD_TOKEN_ID] * missing

    padded = encoded.apply(pad_right)
    return torch.tensor(padded.tolist(), dtype=torch.long)


def tokenize_numeric(df_column):
    """Convert numbers into padded digit-id sequences.

    Each value is rendered with ``str`` and split into single characters that
    are converted with ``int``; the result is wrapped in
    ``INPUT_BOS_TOKEN_ID`` / ``INPUT_EOS_TOKEN_ID`` and right-padded with
    ``INPUT_PAD_TOKEN_ID``.

    Args:
        df_column: Iterable of values whose ``str`` form consists of digits
            (e.g. a pandas Series of ints or digit strings).

    Returns:
        A ``torch.long`` tensor with one row per value, padded to the longest
        sequence in ``df_column``. An empty input yields a ``(0, 0)`` tensor.

    Raises:
        ValueError: If a character of a value cannot be parsed by ``int``
            (for example the sign of a negative number).
    """
    tokenized_sequences = [
        [INPUT_BOS_TOKEN_ID] + [int(d) for d in str(num)] + [INPUT_EOS_TOKEN_ID]
        for num in df_column
    ]

    # Nothing to pad: return an empty 2-D tensor instead of letting max() fail.
    if not tokenized_sequences:
        return torch.empty((0, 0), dtype=torch.long)

    # Pad to the longest sequence of this call only; the length is independent
    # of the target-side padding produced elsewhere.
    max_length = max(len(seq) for seq in tokenized_sequences)

    padded_sequences = [
        seq + [INPUT_PAD_TOKEN_ID] * (max_length - len(seq)) for seq in tokenized_sequences
    ]

    return torch.tensor(padded_sequences, dtype=torch.long)

def decode_numeric(sequence):
    """Render a tensor of input token ids as a readable string.

    The special ids are printed as ``<BOS>``, ``<EOS>`` and ``<PAD>``; every
    other id is printed with ``str``. Pieces are concatenated without a
    separator.

    Args:
        sequence: Object with a ``tolist()`` method returning the ids
            (a 1-D tensor in this notebook).

    Returns:
        The concatenated string.
    """
    pieces = []
    for token_id in sequence.tolist():
        if token_id == INPUT_BOS_TOKEN_ID:
            pieces.append('<BOS>')
        elif token_id == INPUT_EOS_TOKEN_ID:
            pieces.append('<EOS>')
        elif token_id == INPUT_PAD_TOKEN_ID:
            pieces.append('<PAD>')
        else:
            pieces.append(str(token_id))
    return "".join(pieces)

In [25]:
from torch.utils.data import Dataset, DataLoader

class NumberWordDataset(Dataset):
    """Pairs tokenized numeric inputs with their tokenized textual targets.

    Each item is a dict holding both sequences and one boolean mask per
    sequence. The masks are ``True`` at padding positions and ``False`` at real
    tokens, which is the polarity expected when they are passed on as
    key-padding masks to the models in this notebook.

    Args:
        input_tensor: Indexable collection of input id sequences
            (presumably the tensor returned by ``tokenize_numeric``).
        target_tensor: Indexable collection of target id sequences
            (presumably the tensor returned by ``tokenize_textual``).
        pad_token_id: Padding id used in ``target_tensor``.
        input_pad_token_id: Padding id used in ``input_tensor``. When ``None``
            the module-level ``INPUT_PAD_TOKEN_ID`` is used.
    """

    def __init__(self, input_tensor, target_tensor, pad_token_id=3, input_pad_token_id=None):
        self.input_tensor = input_tensor
        self.target_tensor = target_tensor
        self.pad_token_id = pad_token_id
        # Resolve the global lazily so an explicit id never needs it defined.
        self.input_pad_token_id = (
            INPUT_PAD_TOKEN_ID if input_pad_token_id is None else input_pad_token_id
        )

    def __len__(self):
        """Number of (input, target) pairs."""
        return len(self.input_tensor)

    def __getitem__(self, idx):
        """Return the sequences at ``idx`` together with their padding masks."""
        input_seq = self.input_tensor[idx]
        target_seq = self.target_tensor[idx]

        # Padding masks: True = padding (to be ignored), False = actual token.
        input_mask = input_seq == self.input_pad_token_id
        target_mask = target_seq == self.pad_token_id

        return {
            'input_seq': input_seq,
            'target_seq': target_seq,
            'input_mask': input_mask,
            'target_mask': target_mask
        }


In [26]:
# Tokenize both columns of the training frame: words -> SentencePiece ids,
# digit strings -> digit ids.
tokenized_textual = tokenize_textual(train_df['word'])
tokenized_numeric = tokenize_numeric(train_df['num'])
# Create the dataset from tokenized data
# (NumberWordDataset's default pad_token_id=3 is used for the targets).
dataset = NumberWordDataset(tokenized_numeric, tokenized_textual)


In [28]:
# Shuffled mini-batches of 16 pairs; this loader is also the one used for training below.
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
# Tokenizer instance used to turn target ids back into text for the preview.
sp = spm.SentencePieceProcessor(model_file='tokenizer/num_to_words.model')

# Sanity check: print every item of one batch in raw and decoded form.
for batch in dataloader:
    input_seq = batch['input_seq']
    target_seq = batch['target_seq']
    input_mask = batch['input_mask']
    target_mask = batch['target_mask']

    for i in range(input_seq.size(0)):
        decoded_input = decode_numeric(input_seq[i])
        print('Item:', i + 1)
        # NOTE(review): this prints the same tensor as the "Input:" line below.
        print(input_seq[i])
        print("Decoded Input:", decoded_input)
        print("Input:", input_seq[i])
        print("Input Mask:", input_mask[i])
        decoded_target = sp.decode_ids(target_seq[i].tolist())
        print("Decoded Target:", decoded_target)
        print("Target:", target_seq[i])
        print("Target Mask:", target_mask[i])
        print()

    break  # Just show the first batch for testing


Item: 1
tensor([10,  9,  4,  9, 11])
Decoded Input: <BOS>949<EOS>
Input: tensor([10,  9,  4,  9, 11])
Input Mask: tensor([False, False, False, False, False])
Decoded Target: <BOS> nine hundred and forty-nine<EOS>
Target: tensor([  4,  29,  12,  14,  55, 190,  22,   5])
Target Mask: tensor([False, False, False, False, False, False, False, False])

Item: 2
tensor([10,  1,  3,  6, 11])
Decoded Input: <BOS>136<EOS>
Input: tensor([10,  1,  3,  6, 11])
Input Mask: tensor([False, False, False, False, False])
Decoded Target: <BOS> one hundred and thirty-six<EOS>
Target: tensor([  4,  47,  12,  14,  52, 190,  66,   5])
Target Mask: tensor([False, False, False, False, False, False, False, False])

Item: 3
tensor([10,  3,  7,  7, 11])
Decoded Input: <BOS>377<EOS>
Input: tensor([10,  3,  7,  7, 11])
Input Mask: tensor([False, False, False, False, False])
Decoded Target: <BOS> three hundred and seventy-seven<EOS>
Target: tensor([  4,  44,  12,  14,  57, 190,  67,   5])
Target Mask: tensor([False, F

In [29]:
import torch.nn as nn

class TransformerModel(nn.Module):
    """Encoder-decoder model built around ``nn.Transformer`` (batch-first).

    Source and target ids are embedded separately, receive positional
    encodings, run through the transformer, and are projected to logits over
    the target vocabulary.
    """

    def __init__(self, input_vocab_size, target_vocab_size, d_model=512, nhead=8, num_layers=6,
                 d_ff=2048, dropout=0.1):
        """Create the embeddings, positional encoders, transformer and output head.

        Args:
            input_vocab_size: Number of rows of the source embedding table.
            target_vocab_size: Number of rows of the target embedding table and
                width of the output logits.
            d_model: Embedding / model width.
            nhead: Number of attention heads.
            num_layers: Layer count used for both encoder and decoder.
            d_ff: Width of the feed-forward sublayers.
            dropout: Dropout probability.
        """
        super().__init__()
        self.encoder_embedding = nn.Embedding(input_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(target_vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.pos_decoder = PositionalEncoding(d_model, dropout)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=d_ff,
            dropout=dropout,
            batch_first=True,
        )
        self.fc_out = nn.Linear(d_model, target_vocab_size)

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Compute target-vocabulary logits for every target position.

        Args:
            src: Source token ids, batch first.
            tgt: Target token ids fed to the decoder, batch first.
            src_mask: Key-padding mask for ``src``; also applied to the memory.
            tgt_mask: Key-padding mask for ``tgt``.

        Returns:
            The output of ``fc_out`` applied to the transformer output.
        """
        encoder_in = self.pos_encoder(self.encoder_embedding(src))
        decoder_in = self.pos_decoder(self.decoder_embedding(tgt))

        # Square mask that hides later target positions from earlier ones.
        look_ahead = self.generate_square_subsequent_mask(tgt.size(1))

        hidden = self.transformer(
            src=encoder_in,
            tgt=decoder_in,
            tgt_mask=look_ahead,
            src_key_padding_mask=src_mask,
            tgt_key_padding_mask=tgt_mask,
            memory_key_padding_mask=src_mask,
        )
        return self.fc_out(hidden)

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` boolean mask that is ``True`` strictly above the diagonal.

        The mask is placed on the device of the model's first parameter.
        """
        model_device = next(self.parameters()).device
        strictly_upper = torch.ones(sz, sz).triu(diagonal=1)
        return strictly_upper.bool().to(model_device)
        
class PositionalEncoding(nn.Module):
   """Adds fixed sinusoidal position encodings to batch-first embeddings, then applies dropout.

   Even feature columns hold ``sin(position * div_term)`` and odd columns hold
   ``cos(position * div_term)``. The table is registered as the buffer ``pe``
   with shape ``(max_len, d_model)``.

   Args:
       d_model: Width of the embeddings (even or odd).
       dropout: Dropout probability applied after adding the encodings.
       max_len: Longest sequence length the table supports.
   """

   def __init__(self, d_model, dropout=0.1, max_len=100):
       super().__init__()
       self.dropout = nn.Dropout(p=dropout)
       position = torch.arange(max_len).unsqueeze(1)
       div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
       pe = torch.zeros(max_len, d_model)
       pe[:, 0::2] = torch.sin(position * div_term)
       # With an odd d_model there is one cosine column fewer than sine
       # columns, so trim div_term to match; for even d_model this is a no-op.
       pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
       self.register_buffer('pe', pe)

   def forward(self, x):
       """Add the encodings for the first ``x.size(1)`` positions and apply dropout.

       Args:
           x: Tensor whose dimension 1 is the sequence axis and whose last
               dimension is ``d_model``.

       Returns:
           Tensor of the same shape as ``x``.

       Raises:
           ValueError: If the sequence is longer than ``max_len``.
       """
       seq_len = x.size(1)
       if seq_len > self.pe.size(0):
           raise ValueError(
               f"Sequence length {seq_len} exceeds the positional table size {self.pe.size(0)}"
           )
       x = x + self.pe[:seq_len]
       return self.dropout(x)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoderLayer
import math

class CustomMultiHeadAttention(nn.Module):
    """Hand-written multi-head scaled dot-product attention (batch-first).

    Args:
        d_model: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # width of a single head

        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)
        self.scale = math.sqrt(self.d_k)

    def forward(self, query, key, value, attn_mask=None, key_padding_mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``(batch, query_len, d_model)``.
            key: Tensor of shape ``(batch, key_len, d_model)``.
            value: Tensor of shape ``(batch, key_len, d_model)``.
            attn_mask: Optional 2-D, 3-D or 4-D mask over (query, key) pairs.
                A floating-point mask is added to the scores (use ``-inf`` to
                block a pair); a boolean mask blocks the pairs marked ``True``.
            key_padding_mask: Optional boolean ``(batch, key_len)`` mask,
                ``True`` for keys that must be ignored.

        Returns:
            Tensor of shape ``(batch, query_len, d_model)``.
        """
        masked_score = -1e9  # large negative score; softmax weight becomes ~0

        batch_size, query_len, _ = query.shape
        _, key_len, _ = key.shape

        # Project and split into heads: (batch, heads, len, d_k).
        Q = self.q_linear(query).view(batch_size, query_len, self.num_heads, self.d_k).transpose(1, 2)
        K = self.k_linear(key).view(batch_size, key_len, self.num_heads, self.d_k).transpose(1, 2)
        V = self.v_linear(value).view(batch_size, key_len, self.num_heads, self.d_k).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale
        # NOTE(review): clamping the scores is not part of standard attention;
        # it is kept as-is because it looks like a deliberate stability guard.
        scores = scores.clamp(min=-10, max=10)

        if attn_mask is not None:
            attn_mask = attn_mask.to(scores.device)
            if attn_mask.dim() == 2:
                attn_mask = attn_mask.unsqueeze(0).unsqueeze(0)
            elif attn_mask.dim() == 3:
                attn_mask = attn_mask.unsqueeze(1)

            if attn_mask.dtype == torch.bool:
                # Adding a boolean mask would shift scores by 0/1 instead of
                # masking, so boolean masks are applied by filling.
                scores = scores.masked_fill(attn_mask, masked_score)
            else:
                scores = scores + attn_mask

        if key_padding_mask is not None:
            # (batch, key_len) -> (batch, 1, 1, key_len), broadcast over heads and queries.
            key_padding_mask = key_padding_mask.unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(key_padding_mask, masked_score)

        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)
        attn_output = torch.matmul(attn_weights, V)

        # Merge the heads back: (batch, query_len, d_model).
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, query_len, self.d_model)

        return self.out_linear(attn_output)

class CustomTransformerEncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a feed-forward network.

    LayerNorm is applied to the input of each sublayer and the sublayer output
    is added back to the un-normalized stream.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        """Create the attention, feed-forward, normalization and dropout modules."""
        super().__init__()
        self.self_attn = CustomMultiHeadAttention(d_model, nhead, dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        """Run the block.

        Args:
            src: Input tensor for the block.
            src_mask: Optional mask handed to self-attention as
                ``key_padding_mask``.

        Returns:
            The updated tensor.
        """
        # Self-attention sublayer.
        normed = self.norm1(src)
        attended = self.self_attn(normed, normed, normed, key_padding_mask=src_mask)
        src = src + self.dropout1(attended)

        # Position-wise feed-forward sublayer.
        hidden = F.relu(self.linear1(self.norm2(src)))
        projected = self.linear2(self.dropout(hidden))
        return src + self.dropout2(projected)

class CustomTransformerEncoder(nn.Module):
    """Stack of ``CustomTransformerEncoderLayer`` blocks followed by a LayerNorm."""

    def __init__(self, d_model, nhead, num_layers, dim_feedforward=2048, dropout=0.1):
        """Build ``num_layers`` identical encoder blocks and the final norm."""
        super().__init__()
        blocks = []
        for _ in range(num_layers):
            blocks.append(CustomTransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout))
        self.layers = nn.ModuleList(blocks)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, src, mask=None):
        """Pass ``src`` through every block in order, then normalize.

        Args:
            src: Input tensor for the first block.
            mask: Optional mask given to every block as ``src_mask``.

        Returns:
            The normalized output of the last block.
        """
        hidden = src
        for block in self.layers:
            hidden = block(hidden, src_mask=mask)
        return self.norm(hidden)

class CustomTransformerDecoderLayer(nn.Module):
    """One decoder block: masked self-attention, cross-attention, feed-forward.

    LayerNorm is applied to the input of each sublayer and the sublayer output
    is added back to the un-normalized stream.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        """Create the attention, feed-forward, normalization and dropout modules."""
        super().__init__()
        self.self_attn = CustomMultiHeadAttention(d_model, nhead, dropout)
        self.multihead_attn = CustomMultiHeadAttention(d_model, nhead, dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Run the block.

        Args:
            tgt: Decoder input tensor.
            memory: Encoder output attended to by cross-attention.
            tgt_mask: Optional attention mask for self-attention
                (the causal mask in this notebook).
            tgt_key_padding_mask: Optional boolean mask, ``True`` for target
                positions that self-attention must ignore as keys.
            memory_key_padding_mask: Optional boolean mask, ``True`` for memory
                positions that cross-attention must ignore.

        Returns:
            The updated decoder tensor.
        """
        # Self-attention over the target. The padding mask is passed through
        # so padded target positions are ignored as keys; previously this
        # argument was accepted but silently dropped.
        tgt2 = self.norm1(tgt)
        tgt2 = self.self_attn(
            tgt2, tgt2, tgt2,
            attn_mask=tgt_mask,
            key_padding_mask=tgt_key_padding_mask,
        )
        tgt = tgt + self.dropout1(tgt2)

        # Cross-attention: target queries against the encoder memory.
        tgt2 = self.norm2(tgt)
        tgt2 = self.multihead_attn(tgt2, memory, memory, key_padding_mask=memory_key_padding_mask)
        tgt = tgt + self.dropout2(tgt2)

        # Position-wise feed-forward sublayer.
        tgt2 = self.norm3(tgt)
        tgt2 = self.linear2(self.dropout(F.relu(self.linear1(tgt2))))
        tgt = tgt + self.dropout3(tgt2)

        return tgt

class CustomTransformerDecoder(nn.Module):
    """Stack of ``CustomTransformerDecoderLayer`` blocks followed by a LayerNorm."""

    def __init__(self, d_model, nhead, num_layers, dim_feedforward=2048, dropout=0.1):
        """Build ``num_layers`` identical decoder blocks and the final norm."""
        super().__init__()
        self.layers = nn.ModuleList([
            CustomTransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout)
            for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, tgt, memory, tgt_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Pass ``tgt`` through every block in order, then normalize.

        Args:
            tgt: Decoder input tensor.
            memory: Encoder output.
            tgt_mask: Optional attention mask for the target self-attention.
            tgt_key_padding_mask: Optional padding mask for the target.
            memory_key_padding_mask: Optional padding mask for the memory.

        Returns:
            The normalized output of the last block.
        """
        for layer in self.layers:
            tgt = layer(
                tgt,
                memory,
                tgt_mask=tgt_mask,
                # Forward the target padding mask too; it used to be dropped here.
                tgt_key_padding_mask=tgt_key_padding_mask,
                memory_key_padding_mask=memory_key_padding_mask,
            )
        return self.norm(tgt)
        

class CustomTransformerModel(nn.Module):
    """Encoder-decoder model assembled from the hand-written Custom* blocks.

    It takes the same constructor and ``forward`` arguments as
    ``TransformerModel`` but uses ``CustomTransformerEncoder`` and
    ``CustomTransformerDecoder`` in place of ``nn.Transformer``.
    """

    def __init__(self, input_vocab_size, target_vocab_size, d_model=512, nhead=8, num_layers=6,
                 dim_feedforward=2048, dropout=0.1):
        """Create embeddings, positional encoders, encoder, decoder and output head."""
        super().__init__()
        self.encoder_embedding = nn.Embedding(input_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(target_vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.pos_decoder = PositionalEncoding(d_model, dropout)

        self.encoder = CustomTransformerEncoder(d_model, nhead, num_layers, dim_feedforward, dropout)
        self.decoder = CustomTransformerDecoder(d_model, nhead, num_layers, dim_feedforward, dropout)

        self.fc_out = nn.Linear(d_model, target_vocab_size)

        self._init_weights()

    def _init_weights(self):
        """Xavier-initialize every ``nn.Linear`` weight and zero its bias."""
        linear_layers = (module for module in self.modules() if isinstance(module, nn.Linear))
        for layer in linear_layers:
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.constant_(layer.bias, 0)

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Compute target-vocabulary logits for every target position.

        Args:
            src: Source token ids, batch first.
            tgt: Target token ids fed to the decoder, batch first.
            src_mask: Padding mask for ``src``; also used for the memory.
            tgt_mask: Padding mask for ``tgt``.

        Returns:
            The output of ``fc_out`` applied to the decoder output.
        """
        encoder_in = self.pos_encoder(self.encoder_embedding(src))
        decoder_in = self.pos_decoder(self.decoder_embedding(tgt))

        memory = self.encoder(encoder_in, src_mask)

        # Additive mask that hides later target positions.
        look_ahead = self.generate_square_subsequent_mask(tgt.size(1))

        decoded = self.decoder(
            decoder_in,
            memory,
            tgt_mask=look_ahead,
            tgt_key_padding_mask=tgt_mask,
            memory_key_padding_mask=src_mask,
        )
        return self.fc_out(decoded)

    def generate_square_subsequent_mask(self, sz):
        """Return a ``(sz, sz)`` float mask: ``-inf`` strictly above the diagonal, ``0`` elsewhere."""
        blocked = torch.full((sz, sz), float('-inf'))
        return blocked.triu(diagonal=1)



In [49]:
def train_model(model, dataloader, num_epochs=20, learning_rate=1e-5, device=None):
    """Train ``model`` with teacher forcing and print the mean loss per epoch.

    Args:
        model: Module called as ``model(src, tgt_input, src_mask, tgt_mask)``
            and returning logits whose last dimension is the target vocabulary.
        dataloader: Iterable of batches with the keys ``input_seq``,
            ``target_seq``, ``input_mask`` and ``target_mask``.
        num_epochs: Number of passes over ``dataloader``.
        learning_rate: Learning rate for the Adam optimizer.
        device: Device to train on. When ``None``, ``mps`` is used if
            available and ``cpu`` otherwise.

    Returns:
        None. The model is updated in place and left on ``device``.
    """
    if device is None:
        device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
    model.to(device)

    max_grad_norm = 1.0

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    # Padding positions in the target do not contribute to the loss.
    criterion = nn.CrossEntropyLoss(ignore_index=OUTPUT_PAD_TOKEN_ID)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        counted_batches = 0

        for batch in dataloader:
            src = batch['input_seq'].to(device)
            tgt = batch['target_seq'].to(device)
            src_mask = batch['input_mask'].to(device)
            tgt_mask = batch['target_mask'].to(device)

            # Teacher forcing: the decoder sees tokens [0, n-1) and has to
            # predict tokens [1, n); the padding mask is trimmed to match.
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]
            tgt_mask = tgt_mask[:, :-1]

            optimizer.zero_grad()

            output = model(src, tgt_input, src_mask, tgt_mask)
            loss = criterion(
                output.reshape(-1, output.shape[-1]),
                tgt_output.reshape(-1),
            )

            # Check before backward so a bad batch costs no gradient work;
            # infinite losses are skipped as well as NaN ones.
            if not torch.isfinite(loss):
                print("NaN loss detected in batch! Skipping batch.")
                continue

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()

            epoch_loss += loss.item()
            counted_batches += 1

        # Average over the batches that actually contributed; an empty or
        # fully skipped epoch reports NaN instead of dividing by zero.
        mean_loss = epoch_loss / counted_batches if counted_batches else float('nan')
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {mean_loss}")


In [47]:
# Size of the input embedding table. The input ids used in this notebook are
# 0-9 for digits plus 10/11/12 for BOS/EOS/PAD, so 15 leaves a few unused rows.
INPUT_VOCAB_SIZE = 15
# Target vocabulary size comes straight from the trained SentencePiece model.
TARGET_VOCAB_SIZE = sp.get_piece_size()

# Model hyper-parameters shared by both transformer variants below.
D_MODEL = 512
NHEAD = 8
NUM_LAYERS = 6

In [50]:
# Baseline: the nn.Transformer-based model, trained for 5 epochs.
model = TransformerModel(INPUT_VOCAB_SIZE, TARGET_VOCAB_SIZE, D_MODEL, NHEAD, NUM_LAYERS)
train_model(model, dataloader, num_epochs=5, learning_rate=1e-4)

Epoch 1/5, Loss: 2.1866929924488065
Epoch 2/5, Loss: 0.8046772241592407
Epoch 3/5, Loss: 0.6707995140552521
Epoch 4/5, Loss: 0.5844918149709701
Epoch 5/5, Loss: 0.47032070994377134


In [None]:
# Same hyper-parameters and schedule, using the hand-written transformer blocks.
custom_model = CustomTransformerModel(INPUT_VOCAB_SIZE, TARGET_VOCAB_SIZE, D_MODEL, NHEAD, NUM_LAYERS)
train_model(custom_model, dataloader, num_epochs=5, learning_rate=1e-4)

In [None]:
def greedy_decoding(model, number, tokenizer_text, max_length=50):
    """Decode the words for ``number`` by always taking the most likely next token.

    Args:
        model: Trained model called as ``model(src, tgt, src_mask, tgt_mask)``.
        number: The number to translate; tokenized with ``tokenize_numeric``.
        tokenizer_text: Object with a ``decode(list_of_ids)`` method
            (the SentencePiece processor in this notebook).
        max_length: Maximum number of tokens to generate.

    Returns:
        The decoded text, without the BOS/EOS tokens.

    Note:
        The model is moved to the CPU and switched to eval mode as a side effect.
    """
    device = torch.device("cpu")
    model.to(device)
    model.eval()

    with torch.no_grad():
        # A single unpadded input, so nothing is masked (all-False padding mask).
        src = tokenize_numeric(pd.Series([number])).to(device)
        src_mask = torch.zeros(src.shape, dtype=torch.bool, device=device)

        # Start the target sequence with the BOS token.
        tgt = torch.tensor([[OUTPUT_BOS_TOKEN_ID]], dtype=torch.long, device=device)

        for _ in range(max_length):
            tgt_mask = torch.zeros(tgt.shape, dtype=torch.bool, device=device)
            output = model(src, tgt, src_mask, tgt_mask)
            next_token = output[:, -1].argmax(dim=-1, keepdim=True)

            # Stop before appending EOS, so the result never contains it.
            if next_token.item() == OUTPUT_EOS_TOKEN_ID:
                break

            tgt = torch.cat([tgt, next_token], dim=1)

        # Drop the leading BOS token before decoding.
        return tokenizer_text.decode(tgt[0, 1:].tolist())

# Quick check of greedy decoding on a single number.
greedy_decoding(model, 120, sp, max_length=50)

'one hundred and twenty'

In [None]:
def beam_search(model, number, tokenizer_text, beam_width=3, max_length=50):
    """Decode the words for ``number`` with beam search.

    Hypotheses are ranked by the sum of the log-probabilities of their tokens.

    Args:
        model: Trained model called as ``model(src, tgt, src_mask, tgt_mask)``.
        number: The number to translate; tokenized with ``tokenize_numeric``.
        tokenizer_text: Object with a ``decode(list_of_ids)`` method.
        beam_width: Number of hypotheses kept after every step (>= 1).
        max_length: Maximum number of decoding steps.

    Returns:
        The decoded text of the best hypothesis, without BOS/EOS tokens.

    Raises:
        ValueError: If ``beam_width`` is smaller than 1.

    Note:
        The model is moved to the CPU and switched to eval mode as a side effect.
    """
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    device = torch.device("cpu")
    model.to(device)
    model.eval()

    with torch.no_grad():
        src = tokenize_numeric(pd.Series([number])).to(device)
        src_mask = torch.zeros(src.shape, dtype=torch.bool, device=device)

        start = torch.tensor([[OUTPUT_BOS_TOKEN_ID]], dtype=torch.long, device=device)
        beams = [(start, 0.0)]  # (token sequence, cumulative log-probability)

        for _ in range(max_length):
            candidates = []
            for seq, score in beams:
                # Finished hypotheses are carried over unchanged.
                if seq[0, -1].item() == OUTPUT_EOS_TOKEN_ID:
                    candidates.append((seq, score))
                    continue

                tgt_mask = torch.zeros(seq.shape, dtype=torch.bool, device=device)
                output = model(src, seq, src_mask, tgt_mask)

                # Scores must be log-probabilities: taking the log of raw
                # logits is undefined for negative values and ranks wrongly.
                log_probs = torch.log_softmax(output[0, -1, :], dim=-1)
                width = min(beam_width, log_probs.size(-1))
                top_log_probs, top_tokens = torch.topk(log_probs, width)

                for log_prob, token in zip(top_log_probs.tolist(), top_tokens.tolist()):
                    next_token = torch.tensor([[token]], dtype=torch.long, device=device)
                    candidates.append((torch.cat([seq, next_token], dim=1), score + log_prob))

            # Keep the best `beam_width` hypotheses.
            beams = sorted(candidates, key=lambda item: item[1], reverse=True)[:beam_width]

            # Stop early once every surviving hypothesis has emitted EOS.
            if all(seq[0, -1].item() == OUTPUT_EOS_TOKEN_ID for seq, _ in beams):
                break

        best_seq = beams[0][0][0].tolist()[1:]  # drop BOS
        if OUTPUT_EOS_TOKEN_ID in best_seq:
            best_seq = best_seq[:best_seq.index(OUTPUT_EOS_TOKEN_ID)]

        return tokenizer_text.decode(best_seq)

# Quick check of beam search on a single number.
beam_search(model, 234, sp, beam_width=3, max_length=50)

'two hundred and thirty-four'

In [104]:
def _top_k_candidates(scaled_logits, k):
    """Return (probabilities, token ids) restricted to the ``k`` highest logits."""
    k = min(k, scaled_logits.size(-1))  # never ask for more than the vocabulary holds
    top_logits, top_tokens = torch.topk(scaled_logits, k, dim=-1)
    return torch.softmax(top_logits, dim=-1), top_tokens


def _top_p_candidates(scaled_logits, p):
    """Return (weights, token ids) sorted by probability, zeroing tokens beyond cumulative mass ``p``."""
    probs = torch.softmax(scaled_logits, dim=-1)
    sorted_probs, sorted_indices = torch.sort(probs, descending=True)

    cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
    cutoff = cumulative_probs > p
    # Shift right by one so the token that crosses the threshold is kept,
    # and always keep the most likely token.
    cutoff[..., 1:] = cutoff[..., :-1].clone()
    cutoff[..., 0] = False

    return sorted_probs.masked_fill(cutoff, 0.0), sorted_indices


def top_k_sampling(model, number, tokenizer_text, k=10, temperature=0.2, max_length=50, p=None):
    """Decode the words for ``number`` by sampling with top-k or top-p filtering.

    Args:
        model: Trained model called as ``model(src, tgt, src_mask, tgt_mask)``.
        number: The number to translate; tokenized with ``tokenize_numeric``.
        tokenizer_text: Object with a ``decode(list_of_ids)`` method.
        k: Number of highest-scoring tokens to sample from (used when ``p`` is not set).
        temperature: Positive divisor applied to the logits before filtering.
        max_length: Maximum number of tokens to generate.
        p: Cumulative probability mass for top-p sampling; takes precedence over ``k``.

    Returns:
        The decoded text, without the BOS/EOS tokens.

    Raises:
        ValueError: If ``temperature`` is not positive, or neither ``k`` nor ``p`` is given.

    Note:
        The model is moved to the CPU and switched to eval mode as a side effect.
    """
    # Validate up front, before the model is touched.
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    if not p and not k:
        raise ValueError("Either k or p must be provided")

    device = torch.device("cpu")
    model.to(device)
    model.eval()

    with torch.no_grad():
        src = tokenize_numeric(pd.Series([number])).to(device)
        tgt = torch.tensor([[OUTPUT_BOS_TOKEN_ID]], dtype=torch.long).to(device)
        src_mask = ~torch.ones(src.shape, dtype=torch.bool).to(device)

        for _ in range(max_length):
            tgt_mask = ~torch.ones(tgt.shape, dtype=torch.bool).to(device)
            output = model(src, tgt, src_mask, tgt_mask)
            logits = output[:, -1, :]

            # apply temperature scaling
            scaled_logits = logits / temperature

            if p:
                probabilities, items = _top_p_candidates(scaled_logits, p)
            else:
                probabilities, items = _top_k_candidates(scaled_logits, k)

            # torch.multinomial treats the weights as relative, so the
            # zeroed-out top-p weights need no renormalization.
            next_token = items[0, torch.multinomial(probabilities[0], 1)].unsqueeze(0)

            # Stop before appending EOS, so the result never contains it.
            if next_token.item() == OUTPUT_EOS_TOKEN_ID:
                break

            tgt = torch.cat([tgt, next_token], dim=1)

        output_tokens = tgt[0][1:].tolist()  # drop BOS
        return tokenizer_text.decode(output_tokens)


# Sample once with top-k filtering and once with top-p (nucleus) filtering;
# only the value of the last expression is displayed by the notebook.
top_k_sampling(model, 120, sp, k=5, max_length=50)
top_k_sampling(model, 120, sp, p=0.9, max_length=50)

'one hundred and twenty'

In [62]:
def test_accuracy(model, test_df, tokenizer_num, tokenizer_text):
    correct = 0
    total = 0

    for _, row in test_df.iterrows():
        number = row['num']
        word = row['word']
        # predicted_word = predict(model, number, tokenizer_num, tokenizer_text)
        # predicted_word = top_k_sampling_predict(model, number, sp, k=5, max_length=50)
        predicted_word = beam_search_predict(model, number, sp, beam_width=3)
        print(f"Number: {number}, Predicted: {predicted_word}, Actual: {word}")
        if predicted_word == word:
            correct += 1
        total += 1

    return correct / total

In [63]:
# Evaluate on the first 10 validation rows only, to keep the run short.
test_accuracy(model, val_df[:10], tokenize_numeric, sp)

Number: 1, Predicted: one hundred and ninety-one, Actual: one
Number: 4, Predicted: four, Actual: four
Number: 13, Predicted: one hundred and thirty-three, Actual: thirteen
Number: 14, Predicted: one hundred and forty-four, Actual: fourteen
Number: 20, Predicted: twenty, Actual: twenty
Number: 21, Predicted: two hundred and twenty-one, Actual: twenty-one
Number: 27, Predicted: two hundred and seventy-two, Actual: twenty-seven
Number: 32, Predicted: three hundred and twenty-two, Actual: thirty-two
Number: 34, Predicted: thirty-four, Actual: thirty-four
Number: 35, Predicted: three hundred and fifty-three, Actual: thirty-five


0.3