In [1]:
import os
import torch
import torch.nn as nn
import math
from google.colab import drive

if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

PROJECT_PATH = '/content/drive/MyDrive/NLP_Assignment_2025(phase3)'
DATA_PATH = os.path.join(PROJECT_PATH, 'data')
CHECKPOINT_PATH = os.path.join(PROJECT_PATH, 'checkpoints')
MODEL_FILE = os.path.join(CHECKPOINT_PATH, 'transformer_phase3_best.pt')

!pip install -q tokenizers sacrebleu tqdm

Mounted at /content/drive
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.1/104.1 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from tokenizers import ByteLevelBPETokenizer
from tokenizers.processors import TemplateProcessing

print("Loading BPE Tokenizers...")
try:
    src_tokenizer = ByteLevelBPETokenizer(
        os.path.join(DATA_PATH, "src_bpe-vocab.json"),
        os.path.join(DATA_PATH, "src_bpe-merges.txt")
    )
    trg_tokenizer = ByteLevelBPETokenizer(
        os.path.join(DATA_PATH, "trg_bpe-vocab.json"),
        os.path.join(DATA_PATH, "trg_bpe-merges.txt")
    )
    src_tokenizer.post_processor = TemplateProcessing(
        single="<sos> $A <eos>",
        special_tokens=[("<sos>", 1), ("<eos>", 2)],
    )
    print("Tokenizers loaded successfully!")
except Exception as e:
    print(f"Lỗi load tokenizer: {e}")

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
    def forward(self, x): return x + self.pe[:, :x.size(1)]

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_head):
        super(MultiHeadAttention, self).__init__()
        self.d_head = d_model // n_head
        self.n_head = n_head
        self.d_model = d_model
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)
    def forward(self, query, key, value, mask=None):
        batch_size = query.shape[0]
        Q = self.w_q(query).view(batch_size, -1, self.n_head, self.d_head).permute(0, 2, 1, 3)
        K = self.w_k(key).view(batch_size, -1, self.n_head, self.d_head).permute(0, 2, 1, 3)
        V = self.w_v(value).view(batch_size, -1, self.n_head, self.d_head).permute(0, 2, 1, 3)
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / math.sqrt(self.d_head)
        if mask is not None: energy = energy.masked_fill(mask == 0, -1e9)
        attention = torch.softmax(energy, dim=-1)
        x = torch.matmul(attention, V)
        x = x.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.d_model)
        return self.fc_out(x)

class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()
    def forward(self, x): return self.fc2(self.relu(self.fc1(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_head, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_head)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, src, src_mask):
        _src = self.self_attn(src, src, src, src_mask)
        src = self.norm1(src + self.dropout(_src))
        _src = self.ffn(src)
        src = self.norm2(src + self.dropout(_src))
        return src

class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_head, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_head)
        self.cross_attn = MultiHeadAttention(d_model, n_head)
        self.ffn = PositionwiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, trg, enc_src, trg_mask, src_mask):
        _trg = self.self_attn(trg, trg, trg, trg_mask)
        trg = self.norm1(trg + self.dropout(_trg))
        _trg = self.cross_attn(trg, enc_src, enc_src, src_mask)
        trg = self.norm2(trg + self.dropout(_trg))
        _trg = self.ffn(trg)
        trg = self.norm3(trg + self.dropout(_trg))
        return trg

class Encoder(nn.Module):
    def __init__(self, input_dim, d_model, n_layer, n_head, d_ff, dropout, max_len):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([EncoderLayer(d_model, n_head, d_ff, dropout) for _ in range(n_layer)])
        self.dropout = nn.Dropout(dropout)
    def forward(self, src, src_mask):
        src = self.dropout(self.pos_encoding(self.embedding(src)))
        for layer in self.layers: src = layer(src, src_mask)
        return src

class Decoder(nn.Module):
    def __init__(self, output_dim, d_model, n_layer, n_head, d_ff, dropout, max_len):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_head, d_ff, dropout) for _ in range(n_layer)])
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, trg, enc_src, trg_mask, src_mask):
        trg = self.dropout(self.pos_encoding(self.embedding(trg)))
        for layer in self.layers: trg = layer(trg, enc_src, trg_mask, src_mask)
        return self.fc_out(trg)

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, trg_vocab_size, d_model=512, n_head=8, n_layer=6, d_ff=2048, dropout=0.1, max_len=100, src_pad_idx=0, trg_pad_idx=0):
        super(Transformer, self).__init__()
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.encoder = Encoder(src_vocab_size, d_model, n_layer, n_head, d_ff, dropout, max_len)
        self.decoder = Decoder(trg_vocab_size, d_model, n_layer, n_head, d_ff, dropout, max_len)
    def make_src_mask(self, src): return (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
    def make_trg_mask(self, trg):
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        trg_len = trg.shape[1]
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device=trg.device)).bool()
        return trg_pad_mask & trg_sub_mask
    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src, src_mask)
        output = self.decoder(trg, enc_src, trg_mask, src_mask)
        return output

Loading BPE Tokenizers...
Tokenizers loaded successfully!


In [7]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
PAD_IDX = 0
SOS_IDX = 1
EOS_IDX = 2

print("Loading Model Phase 3...")
input_dim = src_tokenizer.get_vocab_size()
output_dim = trg_tokenizer.get_vocab_size()

model = Transformer(input_dim, output_dim, d_model=512, n_head=8, n_layer=6, d_ff=2048, dropout=0.1, max_len=150, src_pad_idx=PAD_IDX, trg_pad_idx=PAD_IDX)

if os.path.exists(MODEL_FILE):
    model.load_state_dict(torch.load(MODEL_FILE, map_location=DEVICE))
    model.to(DEVICE)
    print("Model loaded successfully!")
else:
    print(f"Error: Model file not found at {MODEL_FILE}")

def beam_search(sentence, model, device, beam_size=3, max_len=100):
    model.eval()

    src_encoded = src_tokenizer.encode(sentence)
    src_tensor = torch.LongTensor(src_encoded.ids).unsqueeze(0).to(device)
    src_mask = model.make_src_mask(src_tensor)

    with torch.no_grad():
        enc_src = model.encoder(src_tensor, src_mask)

        beam = [([SOS_IDX], 0.0)]

        for i in range(max_len):
            candidates = []
            all_ended = True

            for seq, score in beam:
                if seq[-1] == EOS_IDX:
                    candidates.append((seq, score))
                    continue

                all_ended = False
                trg_tensor = torch.LongTensor(seq).unsqueeze(0).to(device)
                trg_mask = model.make_trg_mask(trg_tensor)

                output = model.decoder(trg_tensor, enc_src, trg_mask, src_mask)
                prob = output[:, -1, :]
                log_prob = torch.log_softmax(prob, dim=1).squeeze(0)

                topk_prob, topk_idx = torch.topk(log_prob, beam_size)

                for j in range(beam_size):
                    token = topk_idx[j].item()
                    token_prob = topk_prob[j].item()
                    candidates.append((seq + [token], score + token_prob))

            if all_ended: break

            beam = sorted(candidates, key=lambda x: x[1], reverse=True)[:beam_size]

    best_seq = beam[0][0]
    decoded_text = trg_tokenizer.decode(best_seq, skip_special_tokens=True)
    return decoded_text

Loading Model Phase 3...
Model loaded successfully!


In [8]:
import sacrebleu
import urllib.request
import tarfile
from tqdm import tqdm
import html

TEST_DIR = os.path.join(PROJECT_PATH, 'data', 'test_2013')
if not os.path.exists(TEST_DIR): os.makedirs(TEST_DIR)
tgz_path = os.path.join(TEST_DIR, "test.tgz")

if not os.path.exists(os.path.join(TEST_DIR, 'tst2013.vi')):
    print("Downloading test data...")
    urllib.request.urlretrieve("https://github.com/stefan-it/nmt-en-vi/raw/master/data/test-2013-en-vi.tgz", tgz_path)
    with tarfile.open(tgz_path, "r:gz") as tar: tar.extractall(path=TEST_DIR)

def clean_text(text): return html.unescape(text).replace('\xa0', ' ').strip()

with open(os.path.join(TEST_DIR, 'tst2013.vi'), 'r', encoding='utf-8') as f:
    src_sents = [clean_text(line) for line in f.readlines()]
with open(os.path.join(TEST_DIR, 'tst2013.en'), 'r', encoding='utf-8') as f:
    ref_sents = [clean_text(line) for line in f.readlines()]

print(f"Evaluating Phase 3 (BPE + Beam Search) on {len(src_sents)} sentences...")

hypotheses = []
for sent in tqdm(src_sents):
    pred = beam_search(sent, model, DEVICE, beam_size=3)
    hypotheses.append(pred)

bleu = sacrebleu.corpus_bleu(hypotheses, [ref_sents], tokenize='13a')
print(f"\n{'='*40}")
print(f"FINAL BLEU SCORE (PHASE 3): {bleu.score:.2f}")
print(f"{'='*40}")

# In ví dụ
for i in range(3):
    print(f"SRC : {src_sents[i]}")
    print(f"REF : {ref_sents[i]}")
    print(f"PRED: {hypotheses[i]}")
    print("-" * 20)

Evaluating Phase 3 (BPE + Beam Search) on 1268 sentences...


100%|██████████| 1268/1268 [08:16<00:00,  2.55it/s]



FINAL BLEU SCORE (PHASE 3): 0.26
SRC : Khi tôi còn nhỏ , Tôi nghĩ rằng BắcTriều Tiên là đất nước tốt nhất trên thế giới và tôi thường hát bài " Chúng ta chẳng có gì phải ghen tị . "
REF : When I was little , I thought my country was the best on the planet , and I grew up singing a song called " Nothing To Envy . "
PRED: <sos>And I 'm going to show you a little bit of this .<eos>
--------------------
SRC : Tôi đã rất tự hào về đất nước tôi .
REF : And I was very proud .
PRED: <sos>And I 'm going to show you a little bit of this .<eos>
--------------------
SRC : Ở trường , chúng tôi dành rất nhiều thời gian để học về cuộc đời của chủ tịch Kim II- Sung , nhưng lại không học nhiều về thế giới bên ngoài , ngoại trừ việc Hoa Kỳ , Hàn Quốc và Nhật Bản là kẻ thù của chúng tôi .
REF : In school , we spent a lot of time studying the history of Kim Il-Sung , but we never learned much about the outside world , except that America , South Korea , Japan are the enemies .
PRED: <sos>And I 'm going t