In [22]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import math
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
from pyvi import ViTokenizer
import spacy
import itertools
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
import time

#### Tạo tokenizer cho English và Vietnamese

In [23]:
spacy_en = spacy.load("en_core_web_sm")

def tokenize_en(text):
    return [tok.text.lower() for tok in spacy_en.tokenizer(text.strip())]

def tokenize_vi(text):
    return ViTokenizer.tokenize(text.strip().lower()).split()

#### Build vocabulary

In [24]:
def build_vocab(sentences, min_freq=2, specials=["<pad>", "<unk>"]):
    counter = Counter(itertools.chain(*sentences))
    vocab = {tok: idx for idx, tok in enumerate(specials)}
    idx = len(vocab)
    for token, freq in counter.items():
        if freq >= min_freq and token not in vocab:
            vocab[token] = idx
            idx += 1
    vocab['itos'] = {i: s for s, i in vocab.items()}
    return vocab

def encode(sentence, vocab):
    return [vocab.get(tok, vocab["<unk>"]) for tok in sentence]

def decode(indices, vocab):
    return [vocab['itos'][idx] for idx in indices if idx in vocab['itos']]

#### Dataset

In [25]:
class TranslationDataset(Dataset):
    def __init__(self, src_file, tgt_file, src_tokenizer, tgt_tokenizer):
        with open(src_file, 'r', encoding='utf-8') as f:
            self.src_sentences = f.readlines()
        with open(tgt_file, 'r', encoding='utf-8') as f:
            self.tgt_sentences = f.readlines()

        assert len(self.src_sentences) == len(self.tgt_sentences)

        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer

        self.src_tokens = [self.src_tokenizer(s) for s in self.src_sentences]
        self.tgt_tokens = [["<bos>"] + self.tgt_tokenizer(s) + ["<eos>"] for s in self.tgt_sentences]

    def __len__(self):
        return len(self.src_tokens)

    def __getitem__(self, idx):
        return self.src_tokens[idx], self.tgt_tokens[idx]



#### Collate function

In [26]:
def collate_fn(batch, src_vocab, tgt_vocab):
    src_batch, tgt_batch = zip(*batch)

    src_indices = [torch.tensor(encode(sent, src_vocab), dtype=torch.long) for sent in src_batch]
    tgt_indices = [torch.tensor(encode(sent, tgt_vocab), dtype=torch.long) for sent in tgt_batch]

    src_batch_padded = pad_sequence(src_indices, padding_value=src_vocab["<pad>"], batch_first=True)
    tgt_batch_padded = pad_sequence(tgt_indices, padding_value=tgt_vocab["<pad>"], batch_first=True)

    return src_batch_padded, tgt_batch_padded

In [None]:
src_file = "../Released Corpus/train.en.txt" 
tgt_file = "../Released Corpus/train.vi.txt"

train_dataset = TranslationDataset(src_file, tgt_file, tokenize_en, tokenize_vi)

src_token_lists = train_dataset.src_tokens
tgt_token_lists = train_dataset.tgt_tokens

src_vocab = build_vocab(src_token_lists)
tgt_vocab = build_vocab(tgt_token_lists, specials=["<pad>", "<unk>", "<bos>", "<eos>"])

train_loader = DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda batch: collate_fn(batch, src_vocab, tgt_vocab)
)

# Cài đặt Transformer

#### Positional Encoding

In [28]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=1000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.pe = pe.unsqueeze(0)  # shape (1, max_len, d_model)

    def forward(self, x):
        # x: (batch_size, seq_len, d_model)
        x = x + self.pe[:, :x.size(1)].to(x.device)
        return x

#### Scaled-dot Attention

In [29]:
def scaled_dot_product(q, k, v, mask=None):
    d_k = q.size(-1)
    scores = q @ k.transpose(-2, -1) / math.sqrt(d_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    attn = F.softmax(scores, dim=-1)
    return attn @ v, attn

#### Multi-Head Attention Layer

In [30]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.linear_q = nn.Linear(d_model, d_model)
        self.linear_k = nn.Linear(d_model, d_model)
        self.linear_v = nn.Linear(d_model, d_model)
        self.out_proj = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(0.1)

    def forward(self, q, k, v, mask=None):
        B, L_q, _ = q.shape
        B, L_k, _ = k.shape
        B, L_v, _ = v.shape


        q = self.linear_q(q).view(B, L_q, self.num_heads, self.d_k).transpose(1, 2)
        k = self.linear_k(k).view(B, L_k, self.num_heads, self.d_k).transpose(1, 2)
        v = self.linear_v(v).view(B, L_v, self.num_heads, self.d_k).transpose(1, 2)

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, float("-inf"))

        attn = torch.softmax(scores, dim=-1)
        attn = self.dropout(attn)

        out = torch.matmul(attn, v)  # (B, heads, L_q, d_k)
        out = out.transpose(1, 2).contiguous().view(B, L_q, self.d_model)
        return self.out_proj(out)

#### Feed-Forward Layer

In [31]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
        )

    def forward(self, x):
        return self.net(x)

#### Encoder Layer

In [32]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_out = self.attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_out))
        ff_out = self.ff(x)
        x = self.norm2(x + self.dropout(ff_out))
        return x

#### Decoder Layer


In [33]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.enc_attn = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask=None, tgt_mask=None):
        x2 = self.norm1(x + self.dropout(self.self_attn(x, x, x, tgt_mask)))
        x2 = self.norm2(x2 + self.dropout(self.enc_attn(x2, enc_out, enc_out, src_mask)))
        x2 = self.norm3(x2 + self.dropout(self.ff(x2)))
        return x2

#### Encoder & Decoder

In [34]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pos_enc = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(N)])

    def forward(self, src, mask=None):
        x = self.embed(src)
        x = self.pos_enc(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x

class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pos_enc = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(N)])
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, tgt, enc_out, src_mask=None, tgt_mask=None):
        x = self.embed(tgt)
        x = self.pos_enc(x)
        for layer in self.layers:
            x = layer(x, enc_out, src_mask, tgt_mask)
        return self.fc_out(x)

#### Full Model

In [35]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, N=6, num_heads=8, d_ff=2048, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(src_vocab_size, d_model, N, num_heads, d_ff, dropout)
        self.decoder = Decoder(tgt_vocab_size, d_model, N, num_heads, d_ff, dropout)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        enc_out = self.encoder(src, src_mask)
        out = self.decoder(tgt, enc_out, src_mask, tgt_mask)
        return out


# Huấn luyện mô hình

In [36]:
def generate_square_subsequent_mask(sz):
    mask = torch.triu(torch.ones(sz, sz), diagonal=1)
    return mask == 0  # True là vị trí hợp lệ

In [37]:
def create_optimizer_and_loss(model, tgt_pad_idx, lr=1e-4):
    criterion = nn.CrossEntropyLoss(ignore_index=tgt_pad_idx)
    optimizer = optim.Adam(model.parameters(), lr=lr, betas=(0.9, 0.98), eps=1e-9)
    return optimizer, criterion

In [38]:
def train_model(model, dataloader, optimizer, criterion, tgt_pad_idx, device, num_epochs=10, print_every=100):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for i, (src, tgt) in enumerate(dataloader):
            src, tgt = src.to(device), tgt.to(device)
            tgt_input = tgt[:, :-1]
            tgt_output = tgt[:, 1:]

            src_mask = None
            tgt_mask = generate_square_subsequent_mask(tgt_input.size(1)).to(device)

            logits = model(src, tgt_input, src_mask, tgt_mask)
            logits = logits.reshape(-1, logits.size(-1))
            tgt_output = tgt_output.reshape(-1)

            loss = criterion(logits, tgt_output)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            if (i + 1) % print_every == 0:
                print(f"Epoch {epoch+1}, Step {i+1}, Loss: {loss.item():.4f}")

        print(f"[Epoch {epoch+1}] Average Loss: {total_loss / len(dataloader):.4f}")

def save_model(model, path="transformer_nmt.pth"):
    torch.save(model.state_dict(), path)


In [41]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [None]:
# Khởi tạo mô hình
model = Transformer(
    src_vocab_size=len(src_vocab),
    tgt_vocab_size=len(tgt_vocab),
    d_model=256,         
    N=3,                 
    num_heads=4,
    d_ff=1024,
    dropout=0.1
).to(device)

optimizer, criterion = create_optimizer_and_loss(model, tgt_pad_idx=tgt_vocab["<pad>"])

train_model(model, train_loader, optimizer, criterion, tgt_pad_idx=tgt_vocab["<pad>"], device=device, num_epochs=8)

save_model(model)

Using device: cuda


In [39]:
def beam_search_decode(model, src, src_vocab, tgt_vocab, beam_size=5, max_len=50, device="cuda"):
    model.eval()
    with torch.no_grad():
        src = src.unsqueeze(0).to(device)  # (1, src_len)
        src_mask = None
        enc_out = model.encoder(src, src_mask)

        # Mỗi phần tử beam: (tgt_indices, log_prob)
        beams = [([tgt_vocab["<bos>"]], 0.0)]

        for _ in range(max_len):
            new_beams = []
            for seq, score in beams:
                if seq[-1] == tgt_vocab["<eos>"]:
                    new_beams.append((seq, score))
                    continue

                tgt_tensor = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device)
                tgt_mask = generate_square_subsequent_mask(tgt_tensor.size(1)).to(device)

                output = model.decoder(tgt_tensor, enc_out, src_mask, tgt_mask)
                output = output[:, -1, :]  # Lấy logit của token cuối
                probs = torch.log_softmax(output, dim=-1)

                topk_probs, topk_ids = probs.topk(beam_size)

                for i in range(beam_size):
                    next_seq = seq + [topk_ids[0, i].item()]
                    next_score = score + topk_probs[0, i].item()
                    new_beams.append((next_seq, next_score))

            # Giữ lại beam_size chuỗi tốt nhất
            beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_size]

        # Chọn chuỗi có score cao nhất
        best_seq = beams[0][0]

        # Bỏ <bos> và <eos>
        return decode([idx for idx in best_seq if idx not in {tgt_vocab["<bos>"], tgt_vocab["<eos>"]}], tgt_vocab)


# Kết quả BLEU score cuối cùng

In [None]:
model = Transformer(
    src_vocab_size=len(src_vocab),
    tgt_vocab_size=len(tgt_vocab),
    d_model=256,
    N=3,
    num_heads=4,
    d_ff=1024,
    dropout=0.1
).to(device)

model.load_state_dict(torch.load("transformer_nmt.pth", map_location=device))
model.eval()

test_dataset = TranslationDataset(
    src_file="../Released Corpus/test.en.txt",
    tgt_file="../Released Corpus/test.vi.txt",
    src_tokenizer=tokenize_en,
    tgt_tokenizer=tokenize_vi
)

test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=lambda batch: collate_fn(batch, src_vocab, tgt_vocab)
)

In [None]:

refs = []
hyps = []

for src_batch, tgt_batch in test_loader:
    src_tokens = src_batch[0]
    tgt_tokens = tgt_batch[0]

    tgt_text = decode([idx.item() for idx in tgt_tokens if idx.item() not in {
        tgt_vocab["<pad>"], tgt_vocab["<bos>"], tgt_vocab["<eos>"]
    }], tgt_vocab)
    refs.append([tgt_text])

    pred_tokens = beam_search_decode(model, src_tokens, src_vocab, tgt_vocab, max_len=50, device=device)
    hyps.append(pred_tokens)

smoothie = SmoothingFunction().method4
bleu_score = corpus_bleu(refs, hyps, smoothing_function=smoothie)

print(f"Test BLEU score: {bleu_score*100:.2f}")


Test BLEU score: 26.74
