In [None]:
# Install required packages (tokenizers and the Hugging Face dataset loader)
!pip install spacy
!pip install bnlp-toolkit
!pip install datasets

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
from datasets import load_dataset
from collections import Counter
import re
import math
import time
import matplotlib.pyplot as plt
import spacy
import bnlp
from bnlp import BasicTokenizer
import unicodedata
import os

In [None]:
# Set seeds for reproducibility
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# DATA PREPROCESSING

OPUS-100 (bn–en) is a large bilingual translation dataset taken from the OPUS multilingual corpus. It contains parallel sentence pairs in Bengali (bn) and English (en), where each example includes the same meaning expressed in both languages. This dataset is designed for machine translation and multilingual natural language processing tasks. When loaded with `load_dataset("opus100", "bn-en")`, it provides training, validation, and test splits that can be used to build, tune, and evaluate translation models. The bn–en subset includes roughly one million sentence pairs collected from a wide range of open sources such as websites, subtitles, books, and public documents. This diversity helps models learn natural language patterns from real-world text, making OPUS-100 a valuable resource for Bengali–English translation research and applications.


In [None]:
# Load dataset
dataset = load_dataset("opus100", "bn-en")

In [None]:
# Extract data
def extract_data(data, sample=None):
    """Split translation records into parallel Bengali and English lists.

    Args:
        data: Iterable of records shaped like
            ``{"translation": {"en": ..., "bn": ...}}``. When ``sample`` is
            given it must also provide ``select`` and ``len``.
        sample: Optional cap on the number of leading records to keep.

    Returns:
        ``(bengali_sentences, english_sentences)`` in record order.
    """
    subset = data if sample is None else data.select(range(min(sample, len(data))))

    bengali, english = [], []
    for record in subset:
        pair = record["translation"]
        english.append(pair["en"])
        bengali.append(pair["bn"])
    return bengali, english

In [None]:
# Use smaller dataset for faster training
sample_size = 50000
train_ban, train_eng = extract_data(dataset["train"], sample_size)
val_ban, val_eng = extract_data(dataset["validation"], sample_size // 10)
test_ban, test_eng = extract_data(dataset["test"], sample_size // 10)

In [None]:
print(f"Train: {len(train_ban)} samples")
print(f"Validation: {len(val_ban)} samples")
print(f"Test: {len(test_ban)} samples")

In [None]:
# Preprocessing classes
class ProcessBengaliCorpus:
    """Cleaning and tokenization helpers for the Bengali side of the corpus."""

    # Latin letters, ASCII digits and a few ASCII symbols that are stripped out.
    _NON_BENGALI = re.compile(r"[a-zA-Z0-9\()\_\-]")
    # A hasanta (U+09CD) together with any whitespace on either side of it.
    _HASANTA_SPACING = re.compile(r"\s*\u09cd\s*")
    _WHITESPACE = re.compile(r"\s+")

    def __init__(self) -> None:
        self.data = None  # result of the most recent clean_data() call
        self.tokenizer = BasicTokenizer()

    def _clean_sentence(self, text):
        """Apply the full cleaning pipeline to one sentence."""
        text = self._NON_BENGALI.sub("", text)

        # Glue the hasanta to its neighbours. After this substitution no
        # hasanta has whitespace next to it, so separate passes for a
        # whitespace-adjacent ("standalone") hasanta would never match.
        text = self._HASANTA_SPACING.sub("\u09cd", text)

        text = self._WHITESPACE.sub(" ", text)

        # Standardize punctuation: visarga -> ":" and danda -> "."
        text = text.replace("ঃ", ":").replace("।", ".")

        return unicodedata.normalize("NFC", text.strip())

    def clean_data(self, data):
        """Clean every sentence in ``data``.

        Args:
            data: Iterable of raw Bengali strings.

        Returns:
            A new list of cleaned strings, also stored on ``self.data``.
        """
        self.data = [self._clean_sentence(sentence) for sentence in data]
        return self.data

    def tokenize_bengla(self, sen):
        """Tokenize one sentence with ``self.tokenizer``, dropping blank tokens."""
        tokenized = self.tokenizer.tokenize(sen.strip())
        return [str(t).strip() for t in tokenized if str(t).strip()]

In [None]:
class ProcessEnglishCorpus:
    """Cleaning and tokenization helpers for the English side of the corpus."""

    def __init__(self) -> None:
        self.data = None  # result of the most recent clean_data() call
        # Only the tokenizer is needed, so the heavier pipeline stages are disabled.
        self.eng_tokenizer = spacy.load(
            "en_core_web_sm", disable=["parser", "ner", "textcat"]
        )

    def clean_data(self, data):
        """Lowercase, strip unwanted symbols and normalize spacing.

        Word characters, whitespace and the punctuation ``. ? ! , '`` are
        kept; everything else is removed.

        Returns:
            A new list of cleaned strings, also stored on ``self.data``.
        """
        cleaned = []
        for sentence in data:
            text = sentence.lower()
            text = re.sub(r"[^\w\s\.\?\!,']", "", text)
            text = re.sub(r"\s+", " ", text)
            cleaned.append(text.strip())
        self.data = cleaned
        return self.data

    def tokenize_english(self, text):
        """Return the lowercased token texts produced by ``self.eng_tokenizer``."""
        return [tok.text.lower() for tok in self.eng_tokenizer(text.strip())]

In [None]:
!python -m spacy download en_core_web_sm

In [None]:
# Instantiate preprocessors
bn_proc = ProcessBengaliCorpus()
en_proc = ProcessEnglishCorpus()

In [None]:
# Clean all datasets
train_bn_clean = bn_proc.clean_data(train_ban)
train_en_clean = en_proc.clean_data(train_eng)
val_bn_clean = bn_proc.clean_data(val_ban)
val_en_clean = en_proc.clean_data(val_eng)
test_bn_clean = bn_proc.clean_data(test_ban)
test_en_clean = en_proc.clean_data(test_eng)

In [None]:
def tokenize_lists(str_list, lang="bn"):
    """Tokenize each sentence in ``str_list``.

    ``lang="bn"`` uses the Bengali preprocessor; any other value uses the
    English one. Returns one token list per input sentence.
    """
    if lang != "bn":
        return list(map(en_proc.tokenize_english, str_list))
    return list(map(bn_proc.tokenize_bengla, str_list))

In [None]:
# Tokenize datasets
train_bn_toks = tokenize_lists(train_bn_clean, lang="bn")
train_en_toks = tokenize_lists(train_en_clean, lang="en")
val_bn_toks = tokenize_lists(val_bn_clean, lang="bn")
val_en_toks = tokenize_lists(val_en_clean, lang="en")
test_bn_toks = tokenize_lists(test_bn_clean, lang="bn")
test_en_toks = tokenize_lists(test_en_clean, lang="en")

In [None]:
# Vocabulary class
class Vocab:
    """Token <-> index mapping with four reserved special tokens.

    The specials always occupy the first indices, in the order
    ``<pad>, <sos>, <eos>, <unk>``.
    """

    def __init__(self):
        self.specials = ["<pad>", "<sos>", "<eos>", "<unk>"]
        # Start with a specials-only vocabulary so encode()/decode() are
        # usable (everything maps to <unk>) before vocab_builder() is called.
        self._index_tokens([])

    def _index_tokens(self, words):
        """(Re)build ``itos``/``stoi`` and the special-token indices."""
        self.itos = list(self.specials) + list(words)  # index -> token
        self.stoi = {w: i for i, w in enumerate(self.itos)}  # token -> index

        self.pad_idx = self.stoi["<pad>"]
        self.sos_idx = self.stoi["<sos>"]
        self.eos_idx = self.stoi["<eos>"]
        self.unk_idx = self.stoi["<unk>"]

    def vocab_builder(self, data, max_size=30000, min_freq=2):
        """Build the vocabulary from tokenized sentences.

        Args:
            data: Iterable of token lists.
            max_size: Upper bound on the vocabulary size including the
                specials; a falsy value disables the cap.
            min_freq: Minimum number of occurrences a token needs to be kept
                (inclusive).
        """
        counter = Counter()
        for sent in data:
            counter.update(sent)

        # Keep tokens seen at least min_freq times, and never let corpus text
        # that looks like a special token claim a second index.
        specials = set(self.specials)
        words = [
            w for w, f in counter.items() if f >= min_freq and w not in specials
        ]
        # Most frequent first; the sort is stable, so ties keep first-seen order.
        words.sort(key=lambda w: counter[w], reverse=True)

        # Reserve room for the specials; clamp at 0 so a tiny max_size cannot
        # turn into a negative slice bound.
        if max_size:
            words = words[: max(max_size - len(self.specials), 0)]

        self._index_tokens(words)

    def get_itos(self):
        """Return the index -> token list."""
        return self.itos

    def get_stoi(self):
        """Return the token -> index dict."""
        return self.stoi

    def encode(self, tokens, add_eos=True):
        """Map tokens to indices (unknown tokens become ``<unk>``).

        ``<eos>`` is appended when ``add_eos`` is true.
        """
        ids = [self.stoi.get(t, self.unk_idx) for t in tokens]
        if add_eos:
            ids.append(self.eos_idx)
        return ids

    def decode(self, ids):
        """Map indices back to tokens.

        Stops at the first ``<eos>`` or ``<pad>``, skips ``<sos>``, and maps
        out-of-range indices to ``"<unk>"``.
        """
        out = []
        for i in ids:
            tok = self.itos[i] if 0 <= i < len(self.itos) else "<unk>"
            if tok in ("<eos>", "<pad>"):
                break
            if tok == "<sos>":
                continue
            out.append(tok)
        return out

In [None]:
# Build vocabularies from training data
src_vocab = Vocab()
src_vocab.vocab_builder(train_en_toks, max_size=50000, min_freq=2)
tgt_vocab = Vocab()
tgt_vocab.vocab_builder(train_bn_toks, max_size=50000, min_freq=2)

In [None]:
t = train_en_toks[0]
en = src_vocab.encode(t)
de = src_vocab.decode(en)
print(f"text= {t}")
print(f"encoded= {en}")
print(f"decoded= {de}")

In [None]:
t = train_bn_toks[0]
en = tgt_vocab.encode(t)
de = tgt_vocab.decode(en)
print(f"text= {t}")
print(f"encoded= {en}")
print(f"decoded= {de}")

In [None]:
# Dataset class
class BengaliEnglishDataset(Dataset):
    """Parallel dataset yielding ``(src_ids, tgt_ids)`` LongTensor pairs.

    Each sequence is framed as ``<sos> tokens... <eos>`` and is at most
    ``max_len`` ids long, including both special tokens.

    Args:
        src_token_lists: Tokenized source sentences.
        tgt_token_lists: Tokenized target sentences, aligned with the source.
        src_vocab: Vocabulary providing ``sos_idx``, ``eos_idx`` and
            ``encode(tokens, add_eos=False)`` for the source side.
        tgt_vocab: Same, for the target side.
        max_len: Maximum sequence length including ``<sos>`` and ``<eos>``.
    """

    def __init__(
        self, src_token_lists, tgt_token_lists, src_vocab, tgt_vocab, max_len=50
    ):
        self.src_token_lists = src_token_lists
        self.tgt_token_lists = tgt_token_lists
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.src_token_lists)

    def _encode(self, vocab, tokens):
        """Encode one sentence as ``<sos> ... <eos>``, truncated to ``max_len``."""
        ids = [vocab.sos_idx] + vocab.encode(tokens, add_eos=False)
        # Always leave one slot for <eos>. Truncating only when the length is
        # strictly greater than max_len would let a sequence of exactly
        # max_len ids grow to max_len + 1 once <eos> is appended.
        ids = ids[: self.max_len - 1]
        ids.append(vocab.eos_idx)
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        src = self._encode(self.src_vocab, self.src_token_lists[idx])
        tgt = self._encode(self.tgt_vocab, self.tgt_token_lists[idx])
        return src, tgt


# Collate function
def collate_fn(batch):
    """Pad a list of ``(src, tgt)`` tensor pairs into two batch-first tensors.

    Source and target are each padded to the longest sequence in the batch,
    using the pad index of the module-level ``src_vocab`` / ``tgt_vocab``.
    """
    sources = [src for src, _ in batch]
    targets = [tgt for _, tgt in batch]

    return (
        pad_sequence(sources, batch_first=True, padding_value=src_vocab.pad_idx),
        pad_sequence(targets, batch_first=True, padding_value=tgt_vocab.pad_idx),
    )

In [None]:
# Create datasets and dataloaders
max_len = 50
batch_size = 32

train_dataset = BengaliEnglishDataset(
    train_en_toks, train_bn_toks, src_vocab, tgt_vocab, max_len=max_len
)
val_dataset = BengaliEnglishDataset(
    val_en_toks, val_bn_toks, src_vocab, tgt_vocab, max_len=max_len
)
test_dataset = BengaliEnglishDataset(
    test_en_toks, test_bn_toks, src_vocab, tgt_vocab, max_len=max_len
)

train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
)
val_loader = DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
)

print(f"Source vocabulary size: {len(src_vocab.itos)}")
print(f"Target vocabulary size: {len(tgt_vocab.itos)}")
print(
    f"Special tokens - PAD: {src_vocab.pad_idx}, SOS: {src_vocab.sos_idx}, EOS: {src_vocab.eos_idx}"
)

# TRANSFORMER MODEL

In [None]:
# Multi-Head Attention Layer
class MultiHeadAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        hid_dim: Model (embedding) dimension; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
        device: Device on which the scaling constant is initially created.
    """

    def __init__(self, hid_dim, n_heads, dropout, device):
        super().__init__()
        assert hid_dim % n_heads == 0, "hid_dim must be divisible by n_heads"
        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.head_dim = hid_dim // n_heads  # size per head

        # Linear layers producing Query, Key and Value
        self.fc_q = nn.Linear(hid_dim, hid_dim)
        self.fc_k = nn.Linear(hid_dim, hid_dim)
        self.fc_v = nn.Linear(hid_dim, hid_dim)
        # Final output projection
        self.fc_o = nn.Linear(hid_dim, hid_dim)

        self.dropout = nn.Dropout(dropout)

        # sqrt(head_dim) scaling factor. Registered as a buffer so it follows
        # model.to(...); non-persistent so it stays out of state_dict() and
        # checkpoints saved before this change still load.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor([float(self.head_dim)], device=device)),
            persistent=False,
        )

    def _split_heads(self, x, batch_size):
        """(batch, seq_len, hid_dim) -> (batch, heads, seq_len, head_dim)."""
        return x.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions over ``key``/``value`` positions.

        Args:
            query, key, value: Tensors of shape (batch, len, hid_dim).
            mask: Optional tensor broadcastable to
                (batch, heads, query_len, key_len); positions equal to 0 are
                excluded from attention.

        Returns:
            ``(x, attention)`` where ``x`` is (batch, query_len, hid_dim) and
            ``attention`` is (batch, heads, query_len, key_len), taken before
            dropout.
        """
        batch_size = query.shape[0]

        Q = self._split_heads(self.fc_q(query), batch_size)
        K = self._split_heads(self.fc_k(key), batch_size)
        V = self._split_heads(self.fc_v(value), batch_size)

        # energy: (batch, heads, query_len, key_len)
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        if mask is not None:
            # A large negative score becomes ~0 probability after softmax.
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim=-1)

        x = torch.matmul(self.dropout(attention), V)  # (batch, heads, query_len, head_dim)
        # Merge the heads back: (batch, query_len, hid_dim)
        x = x.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.hid_dim)
        x = self.fc_o(x)

        return x, attention


# Position-wise Feedforward Layer
class PositionwiseFeedforwardLayer(nn.Module):
    """Two-layer feed-forward block applied independently at every position.

    Expands ``hid_dim`` to ``pf_dim``, applies ReLU and dropout, then
    projects back to ``hid_dim``.
    """

    def __init__(self, hid_dim, pf_dim, dropout):
        super().__init__()
        self.fc1 = nn.Linear(hid_dim, pf_dim)  # expansion
        self.fc2 = nn.Linear(pf_dim, hid_dim)  # projection back to hid_dim
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map (batch, seq_len, hid_dim) to a tensor of the same shape."""
        hidden = self.fc1(x).relu()  # (batch, seq_len, pf_dim)
        return self.fc2(self.dropout(hidden))


# Encoder Layer
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``.
    """

    def __init__(self, hid_dim, n_heads, pf_dim, dropout, device):
        super().__init__()
        self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.ff_layer_norm = nn.LayerNorm(hid_dim)
        # Multi-head self-attention sub-layer
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        # Position-wise feed-forward sub-layer
        self.feedforward = PositionwiseFeedforwardLayer(hid_dim, pf_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """Run one encoder block over ``src`` using ``src_mask`` for attention."""
        # Self-attention sub-layer with residual connection and layer norm
        attended, _ = self.self_attention(src, src, src, src_mask)
        src = self.self_attn_layer_norm(src + self.dropout(attended))

        # Feed-forward sub-layer with residual connection and layer norm
        return self.ff_layer_norm(src + self.dropout(self.feedforward(src)))


# Encoder
class Encoder(nn.Module):
    """Transformer encoder: token + learned positional embeddings, then layers.

    Args:
        input_dim: Source vocabulary size.
        hid_dim: Model dimension.
        n_layers: Number of stacked ``EncoderLayer`` blocks.
        n_heads: Attention heads per layer.
        pf_dim: Hidden size of the position-wise feed-forward sub-layer.
        dropout: Dropout probability.
        device: Device on which the scaling constant is initially created.
        max_length: Size of the positional embedding table, i.e. the longest
            sequence the encoder accepts.
    """

    def __init__(
        self,
        input_dim,
        hid_dim,
        n_layers,
        n_heads,
        pf_dim,
        dropout,
        device,
        max_length=100,
    ):
        super().__init__()
        self.device = device
        # Token embedding: word index -> vector
        self.tok_embedding = nn.Embedding(input_dim, hid_dim)
        # Learned positional embedding: position index -> vector
        self.pos_embedding = nn.Embedding(max_length, hid_dim)
        self.layers = nn.ModuleList(
            [
                EncoderLayer(hid_dim, n_heads, pf_dim, dropout, device)
                for _ in range(n_layers)
            ]
        )
        self.dropout = nn.Dropout(dropout)
        # sqrt(hid_dim) embedding scale. A non-persistent buffer follows
        # model.to(...) without changing the state_dict() layout.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor([float(hid_dim)], device=device)),
            persistent=False,
        )

    def forward(self, src, src_mask):
        """Encode ``src`` token ids of shape (batch, src_len).

        Returns:
            Tensor of shape (batch, src_len, hid_dim).

        Raises:
            ValueError: If ``src_len`` exceeds the positional table size.
        """
        src_len = src.shape[1]
        max_length = self.pos_embedding.num_embeddings
        if src_len > max_length:
            # Fail with a readable message instead of an out-of-range
            # embedding lookup.
            raise ValueError(
                f"source length {src_len} exceeds max_length {max_length}"
            )

        # Position ids are created on the input's device; the (src_len, hid_dim)
        # positional vectors broadcast over the batch dimension.
        pos = torch.arange(src_len, device=src.device)
        src = self.dropout(
            (self.tok_embedding(src) * self.scale) + self.pos_embedding(pos)
        )

        for layer in self.layers:
            src = layer(src, src_mask)

        return src


# Decoder Layer
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder attention, feed-forward.

    Each sub-layer is wrapped as ``LayerNorm(x + Dropout(sublayer(x)))``.
    """

    def __init__(self, hid_dim, n_heads, pf_dim, dropout, device):
        super().__init__()
        # One layer norm per sub-layer
        self.self_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.enc_attn_layer_norm = nn.LayerNorm(hid_dim)
        self.ff_layer_norm = nn.LayerNorm(hid_dim)
        # Attention over the target prefix
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout, device)
        # Attention over the encoder output
        self.encoder_attention = MultiHeadAttentionLayer(
            hid_dim, n_heads, dropout, device
        )

        self.feedforward = PositionwiseFeedforwardLayer(hid_dim, pf_dim, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """Run one decoder block.

        Returns:
            ``(trg, attention)`` where ``attention`` holds the weights of the
            encoder-attention sub-layer.
        """
        self_out, _ = self.self_attention(trg, trg, trg, trg_mask)
        trg = self.self_attn_layer_norm(trg + self.dropout(self_out))

        cross_out, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)
        trg = self.enc_attn_layer_norm(trg + self.dropout(cross_out))

        trg = self.ff_layer_norm(trg + self.dropout(self.feedforward(trg)))

        return trg, attention


# Decoder
class Decoder(nn.Module):
    """Transformer decoder: embeddings, decoder layers, vocabulary projection.

    Args:
        output_dim: Target vocabulary size.
        hid_dim: Model dimension.
        n_layers: Number of stacked ``DecoderLayer`` blocks.
        n_heads: Attention heads per layer.
        pf_dim: Hidden size of the position-wise feed-forward sub-layer.
        dropout: Dropout probability.
        device: Device on which the scaling constant is initially created.
        max_length: Size of the positional embedding table, i.e. the longest
            target prefix the decoder accepts.
    """

    def __init__(
        self,
        output_dim,
        hid_dim,
        n_layers,
        n_heads,
        pf_dim,
        dropout,
        device,
        max_length=100,
    ):
        super().__init__()
        self.device = device
        self.tok_embedding = nn.Embedding(output_dim, hid_dim)
        self.pos_embedding = nn.Embedding(max_length, hid_dim)
        self.layers = nn.ModuleList(
            [
                DecoderLayer(hid_dim, n_heads, pf_dim, dropout, device)
                for _ in range(n_layers)
            ]
        )
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        # sqrt(hid_dim) embedding scale. A non-persistent buffer follows
        # model.to(...) without changing the state_dict() layout.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor([float(hid_dim)], device=device)),
            persistent=False,
        )

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """Decode ``trg`` token ids of shape (batch, trg_len).

        Returns:
            ``(output, attention)``: ``output`` is (batch, trg_len, output_dim)
            logits; ``attention`` is the encoder-attention weights of the last
            layer, or ``None`` when the decoder has no layers.

        Raises:
            ValueError: If ``trg_len`` exceeds the positional table size.
        """
        trg_len = trg.shape[1]
        max_length = self.pos_embedding.num_embeddings
        if trg_len > max_length:
            # Fail with a readable message instead of an out-of-range
            # embedding lookup.
            raise ValueError(
                f"target length {trg_len} exceeds max_length {max_length}"
            )

        # Position ids are created on the input's device; the (trg_len, hid_dim)
        # positional vectors broadcast over the batch dimension.
        pos = torch.arange(trg_len, device=trg.device)
        trg = self.dropout(
            (self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos)
        )

        attention = None  # stays None only if there are no layers
        for layer in self.layers:
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)

        output = self.fc_out(trg)
        return output, attention


# Seq2Seq Transformer
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder wrapper that builds the attention masks.

    Args:
        encoder: Module called as ``encoder(src, src_mask)``.
        decoder: Module called as ``decoder(trg, enc_src, trg_mask, src_mask)``.
        src_pad_idx: Padding index in the source vocabulary.
        trg_pad_idx: Padding index in the target vocabulary.
        device: Kept for backward compatibility; masks are built on the
            device of the tensors they are derived from.
    """

    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_src_mask(self, src):
        """Return a (batch, 1, 1, src_len) bool mask, True at non-pad positions."""
        return (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

    def make_trg_mask(self, trg):
        """Return a (batch, 1, trg_len, trg_len) bool mask.

        Combines the padding mask with a lower-triangular mask so each
        position can only attend to itself and earlier positions.
        """
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        trg_len = trg.shape[1]
        # Built on trg's device so the `&` below never mixes devices, even if
        # the model was moved after construction.
        trg_sub_mask = torch.tril(
            torch.ones((trg_len, trg_len), device=trg.device)
        ).bool()
        return trg_pad_mask & trg_sub_mask

    def forward(self, src, trg):
        """Return ``(output, attention)`` from the decoder for a batch."""
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)

        enc_src = self.encoder(src, src_mask)
        output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)

        return output, attention


# Initialize weights
def initialize_weights(m):
    """Xavier-uniform initialise the ``weight`` of a module, if it has a matrix one.

    Intended for ``model.apply(initialize_weights)``. Modules without a
    ``weight``, with ``weight`` set to ``None``, or with a weight of fewer
    than two dimensions are left untouched.
    """
    weight = getattr(m, "weight", None)
    # Some modules expose `weight = None` (e.g. normalisation layers without
    # affine parameters); skip those instead of failing on `.dim()`.
    if isinstance(weight, torch.Tensor) and weight.dim() > 1:
        nn.init.xavier_uniform_(weight.data)

## TRAINING FUNCTIONS

In [None]:
def train(model, loader, optimizer, criterion, clip):
    """Run one training epoch with teacher forcing.

    Args:
        model: Callable as ``model(src, trg_in)`` returning ``(logits, _)``
            with logits of shape (batch, trg_len, vocab).
        loader: Sized iterable of ``(src, trg)`` LongTensor batches.
        optimizer: Optimizer over ``model``'s parameters.
        criterion: Loss taking ``(N, vocab)`` logits and ``(N,)`` targets.
        clip: Maximum gradient norm.

    Returns:
        Mean of the per-batch losses (a Python float).
    """
    model.train()

    # Use the device the model lives on rather than a notebook-level global
    # that may be rebound between cells.
    first_param = next(model.parameters(), None)
    batch_device = first_param.device if first_param is not None else None

    epoch_loss = 0
    for src, trg in loader:
        if batch_device is not None:
            src, trg = src.to(batch_device), trg.to(batch_device)

        optimizer.zero_grad()

        # Decoder input is the target without its last token ...
        output, _ = model(src, trg[:, :-1])
        # (batch, trg_len - 1, vocab) -> (batch * (trg_len - 1), vocab)
        output = output.reshape(-1, output.shape[-1])

        # ... and the labels are the target shifted left by one (no <sos>).
        labels = trg[:, 1:].reshape(-1)

        loss = criterion(output, labels)
        loss.backward()
        # Clip gradients to guard against exploding gradients.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(loader)


def evaluate(model, loader, criterion):
    """Compute the mean per-batch loss over ``loader`` without updating weights.

    Like ``train``, the decoder is fed the gold target prefix (teacher
    forcing); only gradient tracking and parameter updates are disabled.

    Args:
        model: Callable as ``model(src, trg_in)`` returning ``(logits, _)``.
        loader: Sized iterable of ``(src, trg)`` LongTensor batches.
        criterion: Loss taking ``(N, vocab)`` logits and ``(N,)`` targets.

    Returns:
        Mean of the per-batch losses (a Python float).
    """
    model.eval()

    # Use the device the model lives on rather than a notebook-level global
    # that may be rebound between cells.
    first_param = next(model.parameters(), None)
    batch_device = first_param.device if first_param is not None else None

    epoch_loss = 0
    with torch.no_grad():
        for src, trg in loader:
            if batch_device is not None:
                src, trg = src.to(batch_device), trg.to(batch_device)

            # Gold prefix in, next-token logits out.
            output, _ = model(src, trg[:, :-1])
            output = output.reshape(-1, output.shape[-1])
            labels = trg[:, 1:].reshape(-1)

            epoch_loss += criterion(output, labels).item()

    return epoch_loss / len(loader)

In [None]:
INPUT_DIM = len(src_vocab.itos)
OUTPUT_DIM = len(tgt_vocab.itos)
HID_DIM = 128
ENC_LAYERS = 3
DEC_LAYERS = 3
ENC_HEADS = 8
DEC_HEADS = 8
ENC_PF_DIM = 512
DEC_PF_DIM = 512
ENC_DROPOUT = 0.1
DEC_DROPOUT = 0.1
SRC_PAD_IDX = src_vocab.pad_idx
TRG_PAD_IDX = tgt_vocab.pad_idx

# Create encoder and decoder
enc = Encoder(
    INPUT_DIM, HID_DIM, ENC_LAYERS, ENC_HEADS, ENC_PF_DIM, ENC_DROPOUT, device
)
dec = Decoder(
    OUTPUT_DIM, HID_DIM, DEC_LAYERS, DEC_HEADS, DEC_PF_DIM, DEC_DROPOUT, device
)

# Create model
model = Seq2SeqTransformer(enc, dec, SRC_PAD_IDX, TRG_PAD_IDX, device).to(device)
model.apply(initialize_weights)


# Count parameters
def count_parameters(model):
    """Return the total number of elements in ``model``'s trainable parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


print(f"The model has {count_parameters(model):,} trainable parameters")

# Initialize optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.0005)
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

## TRAINING TRANSFORMER

In [None]:
# Training schedule: number of passes over the training set and the
# gradient-norm clipping threshold handed to train().
N_EPOCHS = 10
CLIP = 1

# Per-epoch history; the summary and plotting cells read these lists.
best_valid_loss = float('inf')
train_losses = []
valid_losses = []
train_ppl = []
val_ppl = []
start_time = time.perf_counter()

for epoch in range(N_EPOCHS):

    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, val_loader, criterion)

    # Perplexity is exp(loss) of the epoch's mean loss.
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    train_ppl.append(math.exp(train_loss))
    val_ppl.append(math.exp(valid_loss))

    # Keep the weights of the epoch with the lowest validation loss so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), "best-transformer-model.pt")

    # Progress is printed every second epoch only.
    if (epoch+1) % 2 == 0:
        print(f"Epoch: {epoch+1}")
        print(f"\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}")
        print(f"\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}")

# NOTE: despite its name, end_time holds the elapsed wall-clock duration in
# seconds, not a timestamp.
end_time =  time.perf_counter() - start_time 

In [None]:
# Averages over the N_EPOCHS epochs recorded by the training loop.
print(f"Avg. training loss : {sum(train_losses)/N_EPOCHS}")
print(f"Avg. training ppl loss : {sum(train_ppl)/N_EPOCHS}")
print(f"Avg. validation loss : {sum(valid_losses)/N_EPOCHS}")
# val_ppl is already a flat list of floats; wrapping it in another list made
# sum() try `0 + [...]` and raise TypeError.
print(f"Avg. validation ppl loss : {sum(val_ppl)/N_EPOCHS}")
# end_time is the elapsed duration in seconds (see the training loop).
print(f"Training time(ms) : {end_time * 1000}")

<h4>Training results</h4>
<img src="trans.png" alt="Training results">


<h4>Training and Validation Loss results</h4>
<img src="transTVLOSS.png" alt="Training and Validation Loss results" width=500 height=300>


<h4>Test results</h4>
<img src="transTest.png" alt="Test results">


<h4>Attention heatmap (Head 0)</h4>
<img src="transATVH.png" alt="Attention heatmap (Head 0)" width=500 height=300>


# SAVE MODEL

In [None]:

# Everything needed to rebuild the model and vocabularies in a fresh session.
# NOTE(review): model.state_dict() here is the state after the last epoch; the
# best-validation weights are the ones in "best-transformer-model.pt".
checkpoint = {
    "model_state_dict": model.state_dict(),
    "hparams": {
        "input_dim": INPUT_DIM,
        "output_dim": OUTPUT_DIM,
        "hid_dim": HID_DIM,
        "enc_layers": ENC_LAYERS,
        "dec_layers": DEC_LAYERS,
        "enc_heads": ENC_HEADS,
        "dec_heads": DEC_HEADS,
        "enc_pf_dim": ENC_PF_DIM,
        "dec_pf_dim": DEC_PF_DIM,
        "enc_dropout": ENC_DROPOUT,
        "dec_dropout": DEC_DROPOUT,
        "src_pad_idx": SRC_PAD_IDX,
        "trg_pad_idx": TRG_PAD_IDX,
        # Dataset truncation length; reused as the decoding step limit at
        # inference. The positional tables use the Encoder/Decoder default
        # max_length, not this value.
        "max_length": max_len,
    },
    "src_itos": src_vocab.get_itos(),
    "tgt_itos": tgt_vocab.get_itos(),
    "src_stoi": src_vocab.get_stoi(),   # optional (can be rebuilt from itos)
    "tgt_stoi": tgt_vocab.get_stoi(),
}

# Write the checkpoint where the reload cell below reads it from
# (<cwd>/app/saved_models/); saving to the bare working directory left that
# cell without a file to load on a fresh Run All.
checkpoint_dir = os.path.join(os.getcwd(), "app", "saved_models")
os.makedirs(checkpoint_dir, exist_ok=True)
torch.save(checkpoint, os.path.join(checkpoint_dir, "transformer_checkpoint.pt"))


## TESTING TRANSFORMER

In [None]:
model.load_state_dict(torch.load("best-transformer-model.pt"))

In [None]:
test_loss = evaluate(model, test_loader, criterion)
print(f"\nTest Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f}")

In [None]:
import torch

# Checkpoint location and target device
path = os.path.join(os.getcwd(), "app", "saved_models")
CKPT_PATH = os.path.join(path, "transformer_checkpoint.pt")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1) Read the checkpoint and its hyper-parameters
ckpt = torch.load(CKPT_PATH, map_location=device)
h = ckpt["hparams"]

# 2) Rebuild encoder and decoder with the hyper-parameters used at training
enc = Encoder(
    *(
        h[key]
        for key in (
            "input_dim",
            "hid_dim",
            "enc_layers",
            "enc_heads",
            "enc_pf_dim",
            "enc_dropout",
        )
    ),
    device,
)
dec = Decoder(
    *(
        h[key]
        for key in (
            "output_dim",
            "hid_dim",
            "dec_layers",
            "dec_heads",
            "dec_pf_dim",
            "dec_dropout",
        )
    ),
    device,
)

# 3) Wrap them, 4) restore the weights, move to the device, switch to eval mode
model = Seq2SeqTransformer(enc, dec, h["src_pad_idx"], h["trg_pad_idx"], device)
model.load_state_dict(ckpt["model_state_dict"])
model.to(device)
model.eval()


# 5) Rebuild both vocabularies from their saved index -> token lists
def _restore_vocab(itos):
    """Return a Vocab whose mappings and special indices come from ``itos``."""
    vocab = Vocab()
    vocab.itos = itos
    vocab.stoi = {w: i for i, w in enumerate(vocab.itos)}
    vocab.pad_idx = vocab.stoi["<pad>"]
    vocab.sos_idx = vocab.stoi["<sos>"]
    vocab.eos_idx = vocab.stoi["<eos>"]
    vocab.unk_idx = vocab.stoi["<unk>"]
    return vocab


src_vocab = _restore_vocab(ckpt["src_itos"])
tgt_vocab = _restore_vocab(ckpt["tgt_itos"])

def translate_sentence(
    sentence, model, src_vocab, tgt_vocab, device, max_len=50, tokenizer=None
):
    """Greedy-decode a translation for one source sentence.

    Args:
        sentence: Source text, already cleaned by the caller.
        model: Object exposing ``make_src_mask``, ``make_trg_mask``,
            ``encoder`` and ``decoder`` (see ``Seq2SeqTransformer``).
        src_vocab: Source vocabulary (``sos_idx``, ``eos_idx``, ``encode``).
        tgt_vocab: Target vocabulary (``sos_idx``, ``eos_idx``, ``decode``).
        device: Device on which the input tensors are created.
        max_len: Maximum number of decoding steps.
        tokenizer: Optional callable ``str -> list[str]``. Defaults to the
            notebook's English tokenizer ``en_proc.tokenize_english``.

    Returns:
        ``(translation, attention)``: the decoded tokens joined by spaces,
        and the attention returned by the last decoder call (``None`` if no
        decoding step ran, i.e. ``max_len <= 0``).
    """
    model.eval()

    if tokenizer is None:
        tokenizer = en_proc.tokenize_english

    # Source ids framed as <sos> ... <eos>, shape (1, src_len)
    src_ids = (
        [src_vocab.sos_idx]
        + src_vocab.encode(tokenizer(sentence), add_eos=False)
        + [src_vocab.eos_idx]
    )
    src_tensor = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0)
    src_mask = model.make_src_mask(src_tensor)

    trg_indexes = [tgt_vocab.sos_idx]
    attention = None  # defined even when the loop body never runs

    with torch.no_grad():
        enc_src = model.encoder(src_tensor, src_mask)

        for _ in range(max_len):
            trg_tensor = torch.tensor(
                trg_indexes, dtype=torch.long, device=device
            ).unsqueeze(0)
            trg_mask = model.make_trg_mask(trg_tensor)

            output, attention = model.decoder(trg_tensor, enc_src, trg_mask, src_mask)

            # Greedy choice at the last position of the current prefix.
            pred_token = output.argmax(2)[:, -1].item()
            trg_indexes.append(pred_token)

            if pred_token == tgt_vocab.eos_idx:
                break

    trg_tokens = tgt_vocab.decode(trg_indexes)

    return " ".join(trg_tokens), attention


# 6) call your translate function (unchanged)
english_sentence = "the shop is closed at night"
translation, attention = translate_sentence(
    english_sentence, model, src_vocab, tgt_vocab, device, max_len=h.get("max_length", 50)
)

print("EN:", english_sentence)
print("BN:", translation)


In [None]:
# NOTE(review): this cell redefines translate_sentence, which is already
# defined in the previous cell; the notebook should keep a single definition.
def translate_sentence(
    sentence, model, src_vocab, tgt_vocab, device, max_len=50, tokenizer=None
):
    """Greedy-decode a translation for one source sentence.

    Args:
        sentence: Source text, already cleaned by the caller.
        model: Object exposing ``make_src_mask``, ``make_trg_mask``,
            ``encoder`` and ``decoder`` (see ``Seq2SeqTransformer``).
        src_vocab: Source vocabulary (``sos_idx``, ``eos_idx``, ``encode``).
        tgt_vocab: Target vocabulary (``sos_idx``, ``eos_idx``, ``decode``).
        device: Device on which the input tensors are created.
        max_len: Maximum number of decoding steps.
        tokenizer: Optional callable ``str -> list[str]``. Defaults to the
            notebook's English tokenizer ``en_proc.tokenize_english``.

    Returns:
        ``(translation, attention)``: the decoded tokens joined by spaces,
        and the attention returned by the last decoder call (``None`` if no
        decoding step ran, i.e. ``max_len <= 0``).
    """
    model.eval()

    if tokenizer is None:
        tokenizer = en_proc.tokenize_english

    # Source ids framed as <sos> ... <eos>, shape (1, src_len)
    src_ids = (
        [src_vocab.sos_idx]
        + src_vocab.encode(tokenizer(sentence), add_eos=False)
        + [src_vocab.eos_idx]
    )
    src_tensor = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0)
    src_mask = model.make_src_mask(src_tensor)

    trg_indexes = [tgt_vocab.sos_idx]
    attention = None  # defined even when the loop body never runs

    with torch.no_grad():
        enc_src = model.encoder(src_tensor, src_mask)

        for _ in range(max_len):
            trg_tensor = torch.tensor(
                trg_indexes, dtype=torch.long, device=device
            ).unsqueeze(0)
            trg_mask = model.make_trg_mask(trg_tensor)

            output, attention = model.decoder(trg_tensor, enc_src, trg_mask, src_mask)

            # Greedy choice at the last position of the current prefix.
            pred_token = output.argmax(2)[:, -1].item()
            trg_indexes.append(pred_token)

            if pred_token == tgt_vocab.eos_idx:
                break

    trg_tokens = tgt_vocab.decode(trg_indexes)

    return " ".join(trg_tokens), attention


example_idx = 7
example_src = test_eng[example_idx]
example_tgt = test_ban[example_idx]

print(f"\nExample {example_idx + 1}:")
print(f"Source (EN): {example_src}")
print(f"Target (BN): {example_tgt}")

# Clean and tokenize for translation
clean_src = en_proc.clean_data([example_src])[0]
translation, attention = translate_sentence(
    clean_src, model, src_vocab, tgt_vocab, device
)
print(f"translation (BN): {translation}")

In [None]:
# PLOT TRAINING LOSS
plt.figure(figsize=(10, 6))
plt.plot(train_losses, label="Train Loss")
plt.plot(valid_losses, label="Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training and Validation Loss")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# ATTENTION VISUALIZATION 
def display_attention(sentence, translation, attention, n_heads=8, head=0):
    """Plot one head's encoder-decoder attention as a heatmap.

    Args:
        sentence: Cleaned source sentence that was passed to
            ``translate_sentence``.
        translation: Space-joined translation returned by
            ``translate_sentence``.
        attention: Attention returned by ``translate_sentence``; either
            4-D (batch, heads, trg_len, src_len), from which the first batch
            item and the requested head are taken, or an already-selected
            2-D (trg_len, src_len) matrix.
        n_heads: Unused; kept so existing calls keep working.
        head: Index of the head to display when ``attention`` is 4-D.
    """
    if attention.dim() == 4:
        # Index batch and head explicitly; squeeze(0) only drops the batch
        # axis when the batch size is 1.
        attention = attention[0, head]

    weights = attention.cpu().detach().numpy()
    n_trg, n_src = weights.shape

    def fit(labels, size):
        """Pad with blanks or truncate so there is exactly one label per cell."""
        return (labels + [""] * size)[:size]

    # Columns are the source positions fed to the encoder: <sos> tokens <eos>.
    src_tokens = fit(
        ["<sos>"] + en_proc.tokenize_english(sentence) + ["<eos>"], n_src
    )
    # Row i is the decoder position that predicted output token i, so rows are
    # labelled with the generated tokens followed by <eos>; the leading <sos>
    # is an input, not a prediction, and has no row of its own.
    trg_tokens = fit(translation.split() + ["<eos>"], n_trg)

    fig, ax = plt.subplots(figsize=(12, 8))

    cax = ax.matshow(weights, cmap="viridis")

    ax.set_xticks(range(n_src))
    ax.set_yticks(range(n_trg))
    ax.set_xticklabels(src_tokens, rotation=45)
    ax.set_yticklabels(trg_tokens)

    ax.set_xlabel("Source Tokens")
    ax.set_ylabel("Target Tokens")
    ax.set_title(f"Attention Visualization (Head {head})")

    plt.colorbar(cax)
    plt.tight_layout()
    plt.show()


# Visualize attention for the example
if attention is not None:
    display_attention(clean_src, translation, attention)