In [None]:
# Install a pinned torchtext release (the notebook imports torchtext.vocab and torchtext.data.metrics).
!pip install torchtext==0.15.1

In [None]:
# Download the small spaCy pipelines for Ukrainian and English; they are loaded
# later with spacy.load and only their tokenizers are used.
!python -m spacy download uk_core_news_sm
!python -m spacy download en_core_web_sm

In [None]:
# Install the Hugging Face `datasets` package used to fetch Multi30k-uk.
# NOTE(review): the version is not pinned — consider pinning for reproducible runs.
!pip install datasets

In [None]:
from datasets import load_dataset

# Load every configuration of the Multi30k-uk dataset, in a fixed order.
ds2016, ds2017, ds2018, multi = (
    load_dataset("turuta/Multi30k-uk", config_name)
    for config_name in ("flickr_2016", "flickr_2017", "flickr_2018", "multi30k")
)

In [None]:
# Show the splits and sizes of each loaded configuration, one per line.
print(
    f'ds2016 {ds2016}',
    f"ds2017 {ds2017}",
    f"ds2018 {ds2018}",
    f"multi {multi}",
    sep="\n",
)

In [None]:
from datasets import concatenate_datasets, DatasetDict
from sklearn.model_selection import train_test_split as sklearn_train_test_split

# Pool the "train" split of every configuration into one dataset.
# NOTE(review): if the configurations share examples, the random split below can
# place duplicates in different partitions — verify against the dataset card.
combined_train = concatenate_datasets(
    [config["train"] for config in (ds2016, ds2017, ds2018, multi)]
)

# Hold out 20% of all examples as the test set ...
train_val_indices, test_indices = sklearn_train_test_split(
    list(range(len(combined_train))), test_size=0.2, random_state=42
)
# ... then take validation from the remainder: 10% of 80% = 8% of everything.
train_indices, val_indices = sklearn_train_test_split(
    train_val_indices, test_size=0.1, random_state=42
)

train_dataset, val_dataset, test_dataset = (
    combined_train.select(indices)
    for indices in (train_indices, val_indices, test_indices)
)

combined_dataset = DatasetDict(
    {"train": train_dataset, "validation": val_dataset, "test": test_dataset}
)

print(combined_dataset)

In [None]:
import spacy

# Load the Ukrainian and English spaCy pipelines (only their tokenizers are used).
uk_nlp, en_nlp = (
    spacy.load(pipeline_name)
    for pipeline_name in ("uk_core_news_sm", "en_core_web_sm")
)

# Quick sanity check of the English tokenizer.
string = "Hi, what are you doing"
print([piece.text for piece in en_nlp.tokenizer(string)])

In [None]:
import re

def clean_tokens(tokens):
    """Drop whitespace-only tokens and a fixed set of punctuation marks.

    A token is removed when, after stripping surrounding whitespace, it is empty
    or equals one of ``, . ! ? : ;``. Kept tokens are returned unchanged (they are
    not stripped), in their original order.
    """
    dropped = {",", ".", "!", "?", ":", ";", "\xa0", " "}
    kept = []
    for token in tokens:
        stripped = token.strip()
        if not stripped or stripped in dropped:
            continue
        kept.append(token)
    return kept

def tokenize_example(example, uk_nlp, en_nlp, max_length, lower, sos_token, eos_token):
    """Tokenize the ``"uk"`` and ``"en"`` fields of one dataset example.

    Each text is tokenized with the matching spaCy tokenizer, truncated to
    ``max_length`` tokens, optionally lowercased, filtered with ``clean_tokens``
    and finally wrapped in ``sos_token`` / ``eos_token``.

    Returns:
        dict with the keys ``"uk_tokens"`` and ``"en_tokens"`` (lists of strings).
    """

    def _prepare(text, nlp):
        # Truncation happens before cleaning, exactly as in the pipeline order above.
        pieces = [piece.text for piece in nlp.tokenizer(text)][:max_length]
        if lower:
            pieces = [piece.lower() for piece in pieces]
        return [sos_token] + clean_tokens(pieces) + [eos_token]

    return {
        "uk_tokens": _prepare(example["uk"], uk_nlp),
        "en_tokens": _prepare(example["en"], en_nlp),
    }

In [None]:
# Tokenisation settings shared by all three splits.
max_length = 1_000
lower = True
sos_token = "<sos>"
eos_token = "<eos>"

fn_kwargs = dict(
    uk_nlp=uk_nlp,
    en_nlp=en_nlp,
    max_length=max_length,
    lower=lower,
    sos_token=sos_token,
    eos_token=eos_token,
)

# Add "uk_tokens" / "en_tokens" columns to every split.
train_data, valid_data, test_data = (
    split.map(tokenize_example, fn_kwargs=fn_kwargs)
    for split in (train_dataset, val_dataset, test_dataset)
)

In [None]:
# Inspect the first training example after tokenisation.
train_data[0]

In [None]:
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# Tokens seen fewer than `min_freq` times in the training split are left out of
# the vocabulary.
min_freq = 2
unk_token = "<unk>"
pad_token = "<pad>"

# The specials are passed first and in the same order for both vocabularies.
special_tokens = [unk_token, pad_token, sos_token, eos_token]

# Vocabularies are built from the training split only.
uk_vocab, en_vocab = (
    build_vocab_from_iterator(
        train_data[column],
        min_freq=min_freq,
        specials=special_tokens,
    )
    for column in ("uk_tokens", "en_tokens")
)

In [None]:
# First 20 entries of the Ukrainian vocabulary (index -> token).
uk_vocab.get_itos()[:20]

In [None]:
# First 20 entries of the English vocabulary (index -> token).
en_vocab.get_itos()[:20]

In [None]:
# Vocabulary sizes (source, target); these become the model's embedding sizes.
len(uk_vocab), len(en_vocab)

In [None]:
# Both vocabularies must agree on the <unk> and <pad> indices because a single
# index of each is shared by the source and target side below.
unk_index = uk_vocab[unk_token]
pad_index = uk_vocab[pad_token]

assert unk_index == en_vocab[unk_token]
assert pad_index == en_vocab[pad_token]

In [None]:
# Use the <unk> index as the default index of both vocabularies, i.e. the value
# torchtext returns when a token that is not in the vocabulary is looked up.
uk_vocab.set_default_index(unk_index)
en_vocab.set_default_index(unk_index)

In [None]:
# A few English words to check the token -> index mapping.
tokens = ["a", "in", "the", "on", "man", "is", "and", "of"]
en_vocab.lookup_indices(tokens)

In [None]:
# Round trip: tokens -> indices -> tokens.
en_vocab.lookup_tokens(en_vocab.lookup_indices(tokens))

In [None]:
def numericalize_example(example, uk_vocab, en_vocab):
    """Convert the token lists of one example into vocabulary indices.

    Args:
        example: mapping with the keys ``"uk_tokens"`` and ``"en_tokens"``.
        uk_vocab: object providing ``lookup_indices`` for the Ukrainian tokens.
        en_vocab: object providing ``lookup_indices`` for the English tokens.

    Returns:
        dict with the keys ``"uk_ids"`` and ``"en_ids"``.
    """
    encoded = {}
    encoded["uk_ids"] = uk_vocab.lookup_indices(example["uk_tokens"])
    encoded["en_ids"] = en_vocab.lookup_indices(example["en_tokens"])
    return encoded

In [None]:
fn_kwargs = dict(uk_vocab=uk_vocab, en_vocab=en_vocab)

# Add "uk_ids" / "en_ids" columns to every split.
train_data, valid_data, test_data = (
    split.map(numericalize_example, fn_kwargs=fn_kwargs)
    for split in (train_data, valid_data, test_data)
)

In [None]:
# Inspect the first training example after numericalisation.
train_data[0]

In [None]:
# Map the ids back to tokens; tokens missing from the vocabulary show up as <unk>.
uk_vocab.lookup_tokens(train_data[0]["uk_ids"])

In [None]:
# Return the id columns as torch tensors; the other columns are still returned
# (output_all_columns=True).
data_type = "torch"
format_columns = ["uk_ids", "en_ids"]

train_data, test_data, valid_data = (
    split.with_format(
        type=data_type,
        columns=format_columns,
        output_all_columns=True,
    )
    for split in (train_data, test_data, valid_data)
)

In [None]:
# Inspect the first training example after switching to the torch format.
train_data[0]

In [None]:
def get_collate_fn(pad_index):
    """Build a ``collate_fn`` that pads a batch of examples to equal length.

    Args:
        pad_index: value used to fill the positions added by padding.

    Returns:
        A function taking a list of examples (each with 1-D tensors under
        ``"uk_ids"`` and ``"en_ids"``) and returning a dict with the same two
        keys, each holding a ``(batch, longest_sequence)`` tensor.
    """

    def collate_fn(batch):
        # Referenced through `torch` (imported before this cell) rather than the
        # `nn` alias, which is only imported in a later cell of the notebook.
        pad_sequence = torch.nn.utils.rnn.pad_sequence
        return {
            "uk_ids": pad_sequence(
                [example["uk_ids"] for example in batch],
                padding_value=pad_index,
                batch_first=True,
            ),
            "en_ids": pad_sequence(
                [example["en_ids"] for example in batch],
                padding_value=pad_index,
                batch_first=True,
            ),
        }

    return collate_fn

In [None]:
# Plain-text dump of the first training example.
print(train_data[0])

In [None]:
def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
    """Create a ``DataLoader`` whose batches are padded with ``pad_index``.

    Args:
        dataset: dataset to iterate over.
        batch_size: number of examples per batch.
        pad_index: padding value handed to ``get_collate_fn``.
        shuffle: whether the loader reshuffles the data (default ``False``).
    """
    return torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        collate_fn=get_collate_fn(pad_index),
        shuffle=shuffle,
    )

In [None]:
batch_size = 256

# Only the training loader is shuffled; the test and validation loaders keep
# the dataset order.
train_data_loader = get_data_loader(
    dataset=train_data, batch_size=batch_size, pad_index=pad_index, shuffle=True
)
test_data_loader = get_data_loader(
    dataset=test_data, batch_size=batch_size, pad_index=pad_index
)
valid_data_loader = get_data_loader(
    dataset=valid_data, batch_size=batch_size, pad_index=pad_index
)

In [None]:
import torch
import torch.nn as nn
import numpy as np
import math
from torch.autograd import Variable

# Embedding the input sequence
class Embedding(nn.Module):
    """Thin wrapper around ``nn.Embedding`` mapping token ids to vectors."""

    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, x):
        """Return the embedding vectors for the ids in ``x``."""
        embedded = self.embedding(x)
        return embedded

class LearnablePositionalEncoding(nn.Module):
    """Adds a learned embedding of each position to the input, then dropout.

    Args:
        max_len: number of positions in the embedding table.
        embedding_dim: size of each position vector (must match the input).
        dropout: dropout probability applied to the sum.
    """

    def __init__(self, max_len, embedding_dim, dropout=0.1):
        super().__init__()
        self.pos_embedding = nn.Embedding(max_len, embedding_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Add position embeddings to ``x`` of shape (batch, seq_len, dim)."""
        _, seq_len, _ = x.size()
        # Shape (1, seq_len): broadcast over the batch dimension when added.
        positions = torch.arange(0, seq_len, device=x.device).unsqueeze(0)
        return self.dropout(x + self.pos_embedding(positions))

# Self-attention layer
class SelfAttention(nn.Module):
    """Scaled dot-product attention over 4-D (batch, heads, length, dim) inputs."""

    def __init__(self, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        Positions where ``mask`` equals 0 are filled with ``-1e9`` before the
        softmax. ``mask`` gets an extra dimension inserted at index 1 so that it
        broadcasts over the heads.
        """
        scale = np.sqrt(key.size(-1))
        scores = torch.matmul(query / scale, key.transpose(2, 3))
        if mask is not None:
            scores = scores.masked_fill(mask.unsqueeze(1) == 0, -1e9)
        weights = self.dropout(torch.softmax(scores, dim=-1))
        return torch.matmul(weights, value)

# Multi-head attention layer
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on top of ``SelfAttention``.

    Args:
        embedding_dim: model width; must be divisible by ``num_heads``.
        num_heads: number of attention heads.
        dropout: dropout probability passed to the attention module.
    """

    def __init__(self, embedding_dim, num_heads, dropout=0.1):
        super().__init__()
        assert embedding_dim % num_heads == 0, "embedding_dim must be divisible by num_heads"
        self.embedding_dim = embedding_dim
        self.self_attention = SelfAttention(dropout)
        self.num_heads = num_heads
        # Width of a single head.
        self.dim_per_head = embedding_dim // num_heads
        # Input projections for queries, keys and values.
        self.query_projection = nn.Linear(embedding_dim, embedding_dim)
        self.key_projection = nn.Linear(embedding_dim, embedding_dim)
        self.value_projection = nn.Linear(embedding_dim, embedding_dim)
        # Registered but not applied in forward().
        self.dropout = nn.Dropout(dropout)
        # Output projection applied to the concatenated heads.
        self.out = nn.Linear(embedding_dim, embedding_dim)

    def _split_heads(self, projected, batch_size):
        """(batch, length, dim) -> (batch, heads, length, dim_per_head)."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.dim_per_head)
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, merge the heads and project again."""
        batch_size = query.size(0)

        heads_q = self._split_heads(self.query_projection(query), batch_size)
        heads_k = self._split_heads(self.key_projection(key), batch_size)
        heads_v = self._split_heads(self.value_projection(value), batch_size)

        attended = self.self_attention(heads_q, heads_k, heads_v, mask)

        # (batch, heads, length, dim_per_head) -> (batch, length, dim)
        merged = attended.transpose(1, 2).contiguous().view(batch_size, -1, self.embedding_dim)
        return self.out(merged)

# Norm layer
class Norm(nn.Module):
    """Layer normalisation over the last (embedding) dimension."""

    def __init__(self, embedding_dim):
        super().__init__()
        self.norm = nn.LayerNorm(embedding_dim)

    def forward(self, x):
        """Return ``x`` normalised with ``nn.LayerNorm``."""
        normalised = self.norm(x)
        return normalised


# Transformer encoder layer
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a feed-forward network.

    Both sub-layers normalise their input first and add the (dropped-out) result
    back onto the residual stream.
    """

    def __init__(self, embedding_dim, num_heads, ff_dim=2048, dropout=0.1):
        super().__init__()
        self.self_attention = MultiHeadAttention(embedding_dim, num_heads, dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(embedding_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embedding_dim),
        )
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.norm1 = Norm(embedding_dim)
        self.norm2 = Norm(embedding_dim)

    def forward(self, x, mask=None):
        """Apply the block to ``x``; ``mask`` is passed to the self-attention."""
        # Residual connection around multi-head self-attention.
        normed = self.norm1(x)
        attended = self.self_attention(normed, normed, normed, mask)
        x = x + self.dropout1(attended)

        # Residual connection around the position-wise feed-forward network.
        transformed = self.feed_forward(self.norm2(x))
        return x + self.dropout2(transformed)

# Transformer decoder layer
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder attention, feed-forward.

    Every sub-layer normalises its input first and adds the (dropped-out) result
    back onto the residual stream.
    """

    def __init__(self, embedding_dim, num_heads, ff_dim=2048, dropout=0.1):
        super().__init__()
        self.self_attention = MultiHeadAttention(embedding_dim, num_heads, dropout)
        self.encoder_attention = MultiHeadAttention(embedding_dim, num_heads, dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(embedding_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embedding_dim),
        )
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        self.norm1 = Norm(embedding_dim)
        self.norm2 = Norm(embedding_dim)
        self.norm3 = Norm(embedding_dim)

    def forward(self, x, memory, source_mask, target_mask):
        """Apply the block to ``x`` given the encoder output ``memory``."""
        # Self-attention over the target, restricted by target_mask.
        normed = self.norm1(x)
        x = x + self.dropout1(self.self_attention(normed, normed, normed, target_mask))

        # Attention from the target to the encoder output, restricted by source_mask.
        normed = self.norm2(x)
        x = x + self.dropout2(self.encoder_attention(normed, memory, memory, source_mask))

        # Position-wise feed-forward network.
        return x + self.dropout3(self.feed_forward(self.norm3(x)))

# Encoder transformer
class Encoder(nn.Module):
    """Stack of ``EncoderLayer`` blocks on top of token and position embeddings.

    Args:
        vocab_size: size of the source vocabulary.
        embedding_dim: model width.
        max_seq_len: number of positions in the learned position table.
        num_heads: attention heads per layer.
        num_layers: number of encoder layers.
        dropout: dropout probability passed to the encoder layers.
    """

    def __init__(self, vocab_size, embedding_dim, max_seq_len, num_heads, num_layers, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.embedding_dim = embedding_dim
        self.layers = nn.ModuleList(
            [EncoderLayer(embedding_dim, num_heads, 2048, dropout) for _ in range(num_layers)]
        )
        self.norm = Norm(embedding_dim)
        # Use the learnable positional encoding (created with its own default dropout).
        self.position_embedding = LearnablePositionalEncoding(max_seq_len, embedding_dim)

    def forward(self, source, source_mask):
        """Encode the source ids and return the normalised hidden states."""
        hidden = self.position_embedding(self.embedding(source))
        for layer in self.layers:
            hidden = layer(hidden, source_mask)
        return self.norm(hidden)

# Decoder transformer
class Decoder(nn.Module):
    """Stack of ``DecoderLayer`` blocks on top of token and position embeddings.

    Args:
        vocab_size: size of the target vocabulary.
        embedding_dim: model width.
        max_seq_len: number of positions in the learned position table.
        num_heads: attention heads per layer.
        num_layers: number of decoder layers.
        dropout: dropout probability passed to the decoder layers.
    """

    def __init__(self, vocab_size, embedding_dim, max_seq_len,num_heads, num_layers, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.embedding_dim = embedding_dim
        self.layers = nn.ModuleList(
            [DecoderLayer(embedding_dim, num_heads, 2048, dropout) for _ in range(num_layers)]
        )
        self.norm = Norm(embedding_dim)
        # Learnable positional encoding (created with its own default dropout).
        self.position_embedding = LearnablePositionalEncoding(max_seq_len, embedding_dim)

    def forward(self, target, memory, source_mask, target_mask):
        """Decode the target ids against ``memory`` and return normalised states."""
        hidden = self.position_embedding(self.embedding(target))
        for layer in self.layers:
            hidden = layer(hidden, memory, source_mask, target_mask)
        return self.norm(hidden)

# Transformers
class Transformer(nn.Module):
    """Encoder-decoder Transformer with a linear projection onto the target vocabulary.

    Args:
        source_vocab_size: size of the source vocabulary.
        target_vocab_size: size of the target vocabulary (width of the output).
        source_max_seq_len: number of positions available to the encoder.
        target_max_seq_len: number of positions available to the decoder.
        embedding_dim: model width.
        num_heads: attention heads per layer.
        num_layers: number of layers in both encoder and decoder.
        dropout: dropout probability.
    """

    def __init__(self, source_vocab_size, target_vocab_size, source_max_seq_len, target_max_seq_len, embedding_dim, num_heads, num_layers, dropout=0.1):
        super().__init__()
        self.source_vocab_size = source_vocab_size
        self.target_vocab_size = target_vocab_size
        self.source_max_seq_len = source_max_seq_len
        self.target_max_seq_len = target_max_seq_len
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.num_layers = num_layers

        self.encoder = Encoder(source_vocab_size, embedding_dim, source_max_seq_len, num_heads, num_layers, dropout)
        self.decoder = Decoder(target_vocab_size, embedding_dim, target_max_seq_len, num_heads, num_layers, dropout)
        self.final_linear = nn.Linear(embedding_dim, target_vocab_size)
        # `self.dropout` is the module applied before the output projection.
        self.dropout = nn.Dropout(dropout)

    def forward(self, source, target, source_mask, target_mask):
        """Return logits over the target vocabulary for every target position."""
        memory = self.encoder(source, source_mask)
        decoded = self.decoder(target, memory, source_mask, target_mask)
        return self.final_linear(self.dropout(decoded))

    def make_source_mask(self, source_ids, source_pad_id):
        """Boolean mask, ``True`` where ``source_ids`` differs from the pad id.

        A dimension is inserted before the last one, so a (batch, length) input
        gives a (batch, 1, length) mask.
        """
        is_token = source_ids != source_pad_id
        return is_token.unsqueeze(-2)

    def make_target_mask(self, target_ids):
        """Lower-triangular (1, length, length) boolean mask.

        Position ``i`` may attend to positions ``0..i`` only. Padding is not
        taken into account by this mask.
        """
        _, len_target = target_ids.size()
        ones = torch.ones((1, len_target, len_target), device=target_ids.device)
        return torch.tril(ones).bool()


In [None]:
# Train on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Source is Ukrainian, target is English.
model = Transformer(
    source_vocab_size=len(uk_vocab),
    target_vocab_size=len(en_vocab),
    source_max_seq_len=256,
    target_max_seq_len=256,
    embedding_dim=512,
    num_heads=8,
    num_layers=6,
    dropout=0.2,
)
model = model.to(device)

In [None]:
# Token-level cross entropy; targets equal to pad_index do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

In [None]:
import torch.optim as optim

# Adam with its default hyper-parameters.
# NOTE(review): this rebinds `optim`, which the import above bound to the
# torch.optim module; from here on `optim` is the optimizer instance.
optim = torch.optim.Adam(model.parameters())

In [None]:
from tqdm import tqdm

def train_epoch(model, train_loader, optim, epoch, n_epochs, source_pad_id, target_pad_id, device):
    """Run one optimisation pass over ``train_loader``.

    The decoder is fed ``target[:, :-1]`` and the loss is computed against
    ``target[:, 1:]``. The loss function is the notebook-level ``criterion``;
    ``target_pad_id`` is accepted but not used inside this function.

    Returns:
        (mean of the per-batch losses, list of the per-batch losses)
    """
    model.train()
    batch_losses = []
    progress = tqdm(
        enumerate(train_loader),
        total=len(train_loader),
        desc=f"Training epoch {epoch+1}/{n_epochs}",
    )
    for _, batch in progress:
        source = batch["uk_ids"].to(device)
        target = batch["en_ids"].to(device)

        # Decoder input drops the last token; the labels drop the first one.
        decoder_input = target[:, :-1]
        labels = target[:, 1:].contiguous().view(-1)

        source_mask = model.make_source_mask(source, source_pad_id)
        target_mask = model.make_target_mask(decoder_input)

        logits = model(source, decoder_input, source_mask, target_mask)
        optim.zero_grad()
        loss = criterion(logits.view(-1, logits.size(-1)), labels)
        loss.backward()
        optim.step()

        batch_losses.append(loss.item())
        progress.set_postfix(loss=batch_losses[-1])

    return sum(batch_losses) / len(batch_losses), batch_losses

In [None]:
from tqdm import tqdm
import torch

@torch.no_grad()
def validate_epoch(model, val_loader, epoch, n_epochs, source_pad_id, target_pad_id, device):
    """Compute the loss over ``val_loader`` without updating the model.

    Runs in eval mode with gradients disabled. The loss function is the
    notebook-level ``criterion``; ``target_pad_id`` is accepted but not used
    inside this function.

    Returns:
        (mean of the per-batch losses, list of the per-batch losses)
    """
    model.eval()
    batch_losses = []
    progress = tqdm(
        enumerate(val_loader),
        total=len(val_loader),
        desc=f"Validating epoch {epoch+1}/{n_epochs}",
    )
    for _, batch in progress:
        source = batch["uk_ids"].to(device)
        target = batch["en_ids"].to(device)

        # Decoder input drops the last token; the labels drop the first one.
        decoder_input = target[:, :-1]
        labels = target[:, 1:].contiguous().view(-1)

        logits = model(
            source,
            decoder_input,
            model.make_source_mask(source, source_pad_id),
            model.make_target_mask(decoder_input),
        )
        loss = criterion(logits.view(-1, logits.size(-1)), labels)

        batch_losses.append(loss.item())
        progress.set_postfix(loss=batch_losses[-1])

    return sum(batch_losses) / len(batch_losses), batch_losses

In [None]:
import numpy as np

# Training with early stopping on the validation loss.
n_epochs = 45
patience = 3  # stop after this many consecutive epochs without improvement
best_valid_loss = float("inf")
best_state_dict = None  # CPU snapshot of the weights with the lowest validation loss
epochs_no_improve = 0

for epoch in range(n_epochs):
    train_loss, train_losses = train_epoch(
        model=model,
        train_loader=train_data_loader,
        optim=optim,
        epoch=epoch,
        n_epochs=n_epochs,
        source_pad_id=pad_index,
        target_pad_id=pad_index,
        device=device
    )

    val_loss, val_losses = validate_epoch(
        model=model,
        val_loader=valid_data_loader,
        epoch=epoch,
        n_epochs=n_epochs,
        source_pad_id=pad_index,
        target_pad_id=pad_index,
        device=device
    )

    print(f"\tTrain Loss: {train_loss:7.3f} | Train PPL: {np.exp(train_loss):7.3f}")
    print(f"\tValid Loss: {val_loss:7.3f} | Valid PPL: {np.exp(val_loss):7.3f}")

    if val_loss < best_valid_loss:
        best_valid_loss = val_loss
        epochs_no_improve = 0
        # Keep a copy of the best weights; without it the model that is later
        # saved and evaluated is the one from the last epoch, i.e. `patience`
        # epochs past the best validation loss when early stopping triggers.
        best_state_dict = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }
    else:
        epochs_no_improve += 1
        print(f"No improvement for {epochs_no_improve} epoch(s).")

    if epochs_no_improve >= patience:
        print("Early stopping triggered.")
        break

# Roll the model back to the best checkpoint seen during training.
if best_state_dict is not None:
    model.load_state_dict(best_state_dict)
    print(f"Restored weights with the best valid loss: {best_valid_loss:7.3f}")

In [None]:
from google.colab import drive
# Mount Google Drive (Colab only) and store the model weights there.
drive.mount('/content/drive')

# NOTE(review): torch.save does not create missing directories — make sure the
# "transformer" folder already exists in Drive.
model_save_path = '/content/drive/MyDrive/transformer/my_model_uk-en.pth'
# Only the state dict (parameters and buffers) is saved, not the model object.
torch.save(model.state_dict(), model_save_path)

In [None]:
import torch
import torch.nn.functional as F
from math import log

def translate_sentence(
    sentence,
    model,
    uk_nlp,
    en_nlp,
    uk_vocab,
    en_vocab,
    device,
    max_output_length=50,
    beam_size=3,
    lower=False,
    sos_token="<sos>",
    eos_token="<eos>",
    print_process=False,
    pad_token="<pad>",
):
    """
    Translate a sentence using the Transformer model with beam search.

    Args:
        sentence: Input sentence to translate (string or list of tokens).
        model: Transformer model; its ``encoder``, ``decoder``, ``final_linear``,
            ``make_source_mask`` and ``make_target_mask`` members are used.
        uk_nlp: Ukrainian spaCy pipeline; only used when ``sentence`` is a string.
        en_nlp: English spaCy pipeline (accepted for symmetry, not used here).
        uk_vocab: Source vocabulary.
        en_vocab: Target vocabulary.
        device: torch device the input tensors are moved to.
        max_output_length: Maximum number of decoding steps.
        beam_size: Number of hypotheses kept per step (must be >= 1).
        lower: Whether to lowercase the source tokens.
        sos_token: Start of sentence token.
        eos_token: End of sentence token.
        print_process: Whether to print the hypotheses kept after each step.
        pad_token: Padding token; its source-vocabulary index is what the
            source mask hides.

    Returns:
        translated_tokens: List of translated tokens with ``sos_token`` and
        ``eos_token`` removed.

    Raises:
        ValueError: if ``beam_size`` is smaller than 1.

    Note:
        No punctuation filtering is applied here; pass tokens that were
        preprocessed the same way as the training data if the model expects it.
    """
    if beam_size < 1:
        raise ValueError("beam_size must be at least 1")

    model.eval()

    # Tokenize the input sentence (a list of tokens is used as given).
    if isinstance(sentence, str):
        tokens = [token.text for token in uk_nlp.tokenizer(sentence)]
    else:
        tokens = list(sentence)

    if lower:
        tokens = [token.lower() for token in tokens]

    # Add SOS/EOS and convert to a (1, source_length) tensor of indices.
    tokens = [sos_token] + tokens + [eos_token]
    src_ids = torch.LongTensor(uk_vocab.lookup_indices(tokens)).unsqueeze(0).to(device)

    # Hide padding positions only. Passing the EOS index here (as was done
    # before) masks the real end-of-sentence token from the encoder and from
    # the decoder's cross-attention.
    src_mask = model.make_source_mask(src_ids, uk_vocab[pad_token])

    sos_index = en_vocab[sos_token]
    eos_index = en_vocab[eos_token]

    # Active hypotheses and finished ones, each stored as (token ids, log-prob).
    beams = [([sos_index], 0.0)]
    completed = []

    with torch.no_grad():
        # The source is encoded once and reused at every decoding step.
        memory = model.encoder(src_ids, src_mask)

        for step in range(max_output_length):
            # All active hypotheses have the same length, so they form one batch.
            beam_inputs = torch.LongTensor([prefix for prefix, _ in beams]).to(device)
            trg_mask = model.make_target_mask(beam_inputs)
            memory_expanded = memory.expand(beam_inputs.size(0), -1, -1)

            output = model.decoder(beam_inputs, memory_expanded, src_mask, trg_mask)
            logits = model.final_linear(output[:, -1, :])
            log_probs = F.log_softmax(logits, dim=-1)

            # topk cannot return more entries than the vocabulary holds.
            k = min(beam_size, log_probs.size(-1))
            top_k_scores, top_k_indices = log_probs.topk(k, dim=1)
            top_k_scores = top_k_scores.tolist()
            top_k_indices = top_k_indices.tolist()

            # Extend every active hypothesis with each of its k best next tokens.
            candidates = [
                (prefix + [top_k_indices[i][j]], score + top_k_scores[i][j])
                for i, (prefix, score) in enumerate(beams)
                for j in range(k)
            ]
            candidates.sort(key=lambda candidate: candidate[1], reverse=True)

            # Keep the best ones; those ending in EOS (or reaching the length
            # limit) are finished and leave the active set.
            is_last_step = step == max_output_length - 1
            beams = []
            for candidate in candidates[:beam_size]:
                if candidate[0][-1] == eos_index or is_last_step:
                    completed.append(candidate)
                else:
                    beams.append(candidate)

            if print_process:
                print(f"step {step + 1}: {len(beams)} active, {len(completed)} completed")
                for prefix, score in beams:
                    print(f"  {score:8.3f}  {' '.join(en_vocab.lookup_tokens(prefix))}")

            if not beams:
                break

    # Only reachable when no decoding step ran (max_output_length <= 0).
    if not completed:
        completed = beams[:1]

    # Length normalisation: divide the log-probability by len**0.7 so that
    # longer hypotheses are not penalised merely for having more tokens.
    best_tokens, _ = max(
        completed,
        key=lambda candidate: candidate[1] / (len(candidate[0]) ** 0.7),
    )

    # Convert indices to tokens and remove SOS/EOS.
    translated_tokens = en_vocab.lookup_tokens(best_tokens)
    return [token for token in translated_tokens if token not in (sos_token, eos_token)]

In [None]:
# Take the first test example: raw Ukrainian source and its English reference.
sentence = test_data[0]["uk"]
expected_translation = test_data[0]["en"]

sentence, expected_translation

In [None]:
# Preprocess the source the same way as the training data: tokenize, drop the
# punctuation/whitespace tokens removed by clean_tokens, and lowercase (the model
# was trained with lower=True, while translate_sentence defaults to lower=False).
translation = translate_sentence(
    clean_tokens([token.text for token in uk_nlp.tokenizer(sentence)]),
    model,
    uk_nlp,
    en_nlp,
    uk_vocab,
    en_vocab,
    device,
    lower=lower,
)

In [None]:
# Translated tokens for the example above.
translation

In [None]:
# Translate the whole test split. The source is preprocessed the same way as the
# training data: tokenized, filtered with clean_tokens and lowercased (the model
# was trained with lower=True, while translate_sentence defaults to lower=False).
translations = [
    translate_sentence(
        clean_tokens([token.text for token in uk_nlp.tokenizer(example["uk"])]),
        model,
        uk_nlp,
        en_nlp,
        uk_vocab,
        en_vocab,
        device,
        lower=lower,
        print_process=False,
    )
    for example in tqdm(test_data)
]

In [None]:
# translate_sentence already removes <sos>/<eos> from its output, so the tokens
# are joined as returned; slicing with [1:-1] would cut off the first and last
# real words of every translation.
predictions = [" ".join(translation) for translation in translations]

# One reference translation per example.
references = [[example["en"]] for example in test_data]

In [None]:
# Compare the first prediction with its reference.
predictions[0], references[0]

In [None]:
def get_tokenizer_fn(nlp, lower):
    """Return a function that splits a string into token texts with ``nlp.tokenizer``.

    Args:
        nlp: object exposing a ``tokenizer`` callable that yields items with a
            ``text`` attribute.
        lower: when true, every token is lowercased.
    """

    def tokenizer_fn(s):
        pieces = (token.text for token in nlp.tokenizer(s))
        if lower:
            return [piece.lower() for piece in pieces]
        return list(pieces)

    return tokenizer_fn

In [None]:
# English tokenizer using the same lowercasing setting as the training data.
tokenizer_fn = get_tokenizer_fn(en_nlp, lower)

In [None]:
# translate_sentence already removes <sos>/<eos>, so every returned token is
# used; slicing with [1:-1] would cut off the first and last real words and
# lower the BLEU score.
predictions = [tokenizer_fn(" ".join(translation)) for translation in translations]
# NOTE(review): the references keep punctuation, whereas the training targets had
# it removed by clean_tokens — confirm this is the intended evaluation setup.
references = [[tokenizer_fn(example["en"].strip())] for example in test_data]

In [None]:
# Both lists must have one entry per test example.
len(predictions), len(references)

In [None]:
# Show the first ten test examples next to the model output.
print("Sample translations:")
for i in range(10):
    example = test_data[i]
    source, ref = example["uk"], example["en"]
    hyp = " ".join(predictions[i])

    print(
        f"\nExample {i+1}:",
        f"Source (UK): {source}",
        f"Reference (EN): {ref}",
        f"Translation (EN): {hyp}",
        sep="\n",
    )



In [None]:
from torchtext.data.metrics import bleu_score

# Corpus-level BLEU: `predictions` holds one token list per example and
# `references` one list of reference token lists per example.
score = bleu_score(
    predictions, references
)

print(f"BLEU score: {score}")