In [1]:
# Mount Google Drive (Colab only); the data and checkpoint paths used later in this
# notebook live under /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [3]:
!pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.2-py3-none-any.whl (84 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting datasets>=2.0.0 (from evaluate)
  Downloading datasets-2.20.0-py3-none-any.whl (547 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m547.8/547.8 kB[0m [31m22.6 MB/s[0m eta [36m0:00:00[0m
Collecting dill (from evaluate)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash (from evaluate)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (194 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 kB[0m [31m21.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.16-py310-none-any.whl (134 kB)
[2K     [

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

import evaluate
from transformers import AutoTokenizer

from sklearn.model_selection import train_test_split
from tqdm import tqdm
import math

# Single PhoBERT tokenizer shared by the whole notebook: the dataset below encodes both
# the source and the target sentences with it, and the model masks on its pad_token_id.
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")

# Fallbacks for tokenizers that ship without BOS / EOS / PAD tokens.
# NOTE(review): '<bos>' and '<eos>' are only looked up with convert_tokens_to_ids, they are
# not added to the vocabulary here — presumably they resolve to the unknown-token id when
# absent; confirm before relying on these branches.
if tokenizer.bos_token is None:
    tokenizer.bos_token = '<bos>'
    tokenizer.bos_token_id = tokenizer.convert_tokens_to_ids('<bos>')

if tokenizer.eos_token is None:
    tokenizer.eos_token = '<eos>'
    tokenizer.eos_token_id = tokenizer.convert_tokens_to_ids('<eos>')

# With no dedicated PAD token, padding reuses the EOS token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are linearly projected, split into ``num_heads``
    heads of width ``d_model // num_heads``, attended independently, merged
    back to width ``d_model`` and passed through an output projection.

    Args:
        d_model: Width of the input and output features; must be divisible
            by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head feature width

        # Attribute names are part of the checkpoint's state_dict keys; do not rename.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return ``softmax(Q K^T / sqrt(d_k)) V`` for already-split heads.

        Args:
            Q, K, V: Per-head tensors as produced by ``split_heads``.
            mask: Optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.
        """
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # Fill with the dtype's own most-negative finite value: a literal -1e9 is
            # not representable in float16, while this yields the same zero weights
            # after softmax in float32.
            fill_value = torch.finfo(attn_scores.dtype).min
            attn_scores = attn_scores.masked_fill(mask == 0, fill_value)

        attn_probs = torch.softmax(attn_scores, dim=-1)
        output = torch.matmul(attn_probs, V)
        return output

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_k)."""
        batch_size, seq_length, _ = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of ``split_heads``: (batch, num_heads, seq, d_k) -> (batch, seq, d_model)."""
        batch_size, _, seq_length, _ = x.size()
        # transpose() returns a non-contiguous view, so contiguous() is needed before view().
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, and return a (batch, seq_q, d_model) tensor."""
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)

        output = self.W_o(self.combine_heads(attn_output))
        return output

class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension of its input.

    Computes ``fc2(relu(fc1(x)))``, expanding from ``d_model`` to ``d_ff`` and
    projecting back to ``d_model``.

    Args:
        d_model: Width of the input and output features.
        d_ff: Width of the hidden layer.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        # Attribute names double as state_dict keys, so they are kept as-is.
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return a tensor with the same shape as ``x``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to a sequence of embeddings.

    Even feature columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = exp(-2i * ln(10000) / d_model)``.

    Args:
        d_model: Width of the embeddings (odd values are supported).
        max_seq_length: Number of positions to precompute.
    """

    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term  # (max_seq_length, ceil(d_model / 2))

        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even column, so trim the
        # angles to the number of cosine slots instead of failing on a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Registered as a buffer so it follows .to(device) and is stored in the
        # state_dict under the key 'pe' without being a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the first ``x.size(1)`` position encodings to ``x`` (batch, seq, d_model)."""
        return x + self.pe[:, :x.size(1)]

class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a feed-forward network.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input, and the sum is layer-normalised.

    Args:
        d_model: Feature width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        # Attribute names are state_dict keys of the saved checkpoint; keep them stable.
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Return the transformed sequence; ``mask`` is forwarded to self-attention."""
        # Sub-layer 1: self-attention with residual connection.
        attended = self.self_attn(x, x, x, mask)
        after_attention = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward with residual connection.
        transformed = self.feed_forward(after_attention)
        return self.norm2(after_attention + self.dropout(transformed))

class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention, then feed-forward.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input, and the sum is layer-normalised.

    Args:
        d_model: Feature width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        # Attribute names are state_dict keys of the saved checkpoint; keep them stable.
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Return the transformed target sequence.

        Args:
            x: Target-side features.
            enc_output: Encoder output used as keys and values in cross-attention.
            src_mask: Mask passed to cross-attention.
            tgt_mask: Mask passed to self-attention.
        """
        # Sub-layer 1: self-attention over the target sequence.
        self_attended = self.self_attn(x, x, x, tgt_mask)
        state = self.norm1(x + self.dropout(self_attended))

        # Sub-layer 2: queries come from the decoder, keys/values from the encoder.
        cross_attended = self.cross_attn(state, enc_output, enc_output, src_mask)
        state = self.norm2(state + self.dropout(cross_attended))

        # Sub-layer 3: position-wise feed-forward.
        transformed = self.feed_forward(state)
        return self.norm3(state + self.dropout(transformed))

class Transformer(nn.Module):
    """Encoder-decoder Transformer mapping source token ids to target-vocabulary logits.

    Args:
        src_vocab_size: Size of the source embedding table.
        tgt_vocab_size: Size of the target embedding table and output projection.
        d_model: Feature width used throughout the model.
        num_heads: Number of attention heads per attention module.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden width of the feed-forward networks.
        max_seq_length: Number of positions precomputed by the positional encoding.
        dropout: Dropout probability.
        pad_token_id: Id of the padding token used to build attention masks.
            When ``None`` (default) the module-level ``tokenizer.pad_token_id``
            is looked up at call time, which is the original behaviour.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout, pad_token_id=None):
        super(Transformer, self).__init__()
        self.d_model = d_model
        # Plain attribute (not a parameter/buffer), so state_dict keys are unaffected.
        self.pad_token_id = pad_token_id
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build boolean attention masks from (batch, seq) token-id tensors.

        Returns:
            ``(src_mask, tgt_mask)`` where ``src_mask`` has shape
            (batch, 1, 1, src_len) and is False at padding positions, and
            ``tgt_mask`` has shape (batch, 1, tgt_len, tgt_len) and is False at
            padding key positions and at positions after the query position.
        """
        # Prefer the explicitly configured pad id; otherwise fall back to the notebook's
        # global tokenizer. getattr keeps instances created without the attribute working.
        pad_id = getattr(self, 'pad_token_id', None)
        if pad_id is None:
            pad_id = tokenizer.pad_token_id

        src_mask = (src != pad_id).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != pad_id).unsqueeze(1).unsqueeze(2)

        # Lower-triangular mask: position i may only attend to positions <= i.
        tgt_sub_mask = torch.tril(torch.ones((tgt.size(1), tgt.size(1)), device=tgt.device)).bool()
        tgt_mask = tgt_mask & tgt_sub_mask.unsqueeze(0).unsqueeze(0)

        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape (batch, tgt_len, tgt_vocab_size).

        Args:
            src: Source token ids, (batch, src_len).
            tgt: Target token ids fed to the decoder, (batch, tgt_len).
        """
        src_mask, tgt_mask = self.generate_mask(src, tgt)

        # Embeddings are scaled by sqrt(d_model) before the positional encoding is added.
        src = self.encoder_embedding(src) * math.sqrt(self.d_model)
        src = self.dropout(self.positional_encoding(src))

        for layer in self.encoder_layers:
            src = layer(src, src_mask)

        tgt = self.decoder_embedding(tgt) * math.sqrt(self.d_model)
        tgt = self.dropout(self.positional_encoding(tgt))

        for layer in self.decoder_layers:
            tgt = layer(tgt, src, src_mask, tgt_mask)

        output = self.fc(tgt)
        return output


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/557 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/895k [00:00<?, ?B/s]

bpe.codes:   0%|          | 0.00/1.14M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/3.13M [00:00<?, ?B/s]

In [5]:
class TranslationDataset(Dataset):
    """Parallel-sentence dataset that yields fixed-length token-id tensors.

    Args:
        src_texts: Sequence of source sentences.
        tgt_texts: Sequence of target sentences, aligned with ``src_texts`` by index.
        tokenizer: Object exposing ``encode_plus`` (a Hugging Face tokenizer in this notebook).
        max_length: Every sentence is padded or truncated to this many tokens.
    """

    def __init__(self, src_texts, tgt_texts, tokenizer, max_length=64):
        self.src_texts = src_texts
        self.tgt_texts = tgt_texts
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of sentence pairs (taken from the source side)."""
        return len(self.src_texts)

    def _encode(self, text):
        """Tokenise one sentence into a 1-D tensor of ``max_length`` token ids."""
        encoding = self.tokenizer.encode_plus(text, return_tensors="pt", padding="max_length", max_length=self.max_length, truncation=True)
        # Drop only the leading batch dimension. A bare squeeze() would also drop the
        # sequence dimension when max_length == 1 and return a 0-dim tensor.
        return encoding['input_ids'].squeeze(0)

    def __getitem__(self, idx):
        """Return ``(src_input_ids, tgt_input_ids)`` for the pair at ``idx``."""
        src_input_ids = self._encode(self.src_texts[idx])
        tgt_input_ids = self._encode(self.tgt_texts[idx])

        return src_input_ids, tgt_input_ids

def collate_fn(batch):
    """Collate ``(src_ids, tgt_ids)`` pairs into two padded batch tensors.

    Sequences are right-padded to the longest one in the batch using the
    module-level tokenizer's ``pad_token_id``; the result is batch-first.
    """
    def _pad_column(position):
        # Gather one side (0 = source, 1 = target) of every pair, then pad it.
        sequences = [example[position] for example in batch]
        return pad_sequence(sequences, batch_first=True, padding_value=tokenizer.pad_token_id)

    return _pad_column(0), _pad_column(1)

def loadData(file_path):
    """Read a UTF-8 text file and return its lines with surrounding whitespace stripped.

    Blank lines are kept as empty strings, so line positions are preserved.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        return [raw_line.strip() for raw_line in handle]

# Directory holding the parallel tst2013 test files (one sentence per line).
DATA_DIR = "/content/drive/MyDrive/BTL Deep Learning/Transformer/data"

vi_sentences = loadData(f"{DATA_DIR}/tst2013.vi")
en_sentences = loadData(f"{DATA_DIR}/tst2013.en")

# Pairs are matched purely by line index, so a length mismatch means the files are
# misaligned and any score computed from them would be meaningless.
assert len(vi_sentences) == len(en_sentences), (
    f"Parallel files differ in length: {len(vi_sentences)} vi vs {len(en_sentences)} en"
)

# English is the source side, Vietnamese the target side.
test_dataset = TranslationDataset(en_sentences, vi_sentences, tokenizer)
test_dataloader = DataLoader(test_dataset, batch_size=64, collate_fn=collate_fn)


In [6]:
# Sanity check: show the (source ids, target ids) pair for one example.
test_dataset[4]

(tensor([    0, 13085,  2052,  1555, 26431,  3657,  1946, 10081, 18728, 20935,
             4,  1555, 59190, 10601, 45428, 56213,  1358, 54356,  6107,     4,
         30045,  1555,  5642,  1881,  9842, 10601, 21022,   858, 30895, 34359,
         26431, 17501, 27817,     5,     2,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
             1,     1,     1,     1]),
 tensor([   0,  251,   70,   72,  317,    4,   70, 3011, 5603,  805,   18,  237,
         3224,  943,  675, 1284,  101,  127, 6937,   12,  733,    4,   51,   70,
           74,  487,  110,  235,    7,   68,   25,   97,    8, 4456,  668, 2404,
          311,    5,    2,    1,    1,    1,    1,    1,    1,    1,    1,    1,
            1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,    1,
            1,    1,    1,    1]))

In [7]:
# Show one raw source sentence from the test set.
test_dataset.src_texts[20]

'As you can see , the river can be very narrow at certain points , allowing North Koreans to secretly cross .'

In [8]:
# Model hyperparameters. load_state_dict below requires every parameter name and shape
# to match the checkpoint, so these have to agree with the values used for training.
src_vocab_size = tokenizer.vocab_size
tgt_vocab_size = tokenizer.vocab_size
d_model = 512
num_heads = 8
num_layers = 4
d_ff = 2048
max_seq_length = 64
dropout = 0.1

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout).to(device)
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a GPU
# can still be restored when the notebook falls back to CPU.
# NOTE(security): torch.load unpickles the file; only load checkpoints from a trusted
# source (consider weights_only=True on PyTorch versions that support it).
model.load_state_dict(torch.load('/content/drive/MyDrive/BTL Deep Learning/Transformer/model/trans.pth', map_location=device))

cuda


<All keys matched successfully>

In [9]:
def translate_sentence(model, sentence, tokenizer, max_length=64):
    """Translate one sentence with greedy decoding.

    The source sentence is encoded to ``max_length`` tokens, then target tokens
    are generated one at a time by taking the arg-max of the logits at the last
    position, until the EOS token is produced or ``max_length`` tokens have
    been generated.

    Args:
        model: Callable as ``model(src_ids, tgt_ids)`` returning logits of shape
            (batch, tgt_len, vocab). It is switched to eval mode.
        sentence: Source text.
        tokenizer: Tokenizer exposing ``encode_plus``, ``decode``,
            ``bos_token_id`` and ``eos_token_id``.
        max_length: Source length after padding/truncation and the maximum
            number of decoding steps.

    Returns:
        The decoded translation with special tokens removed.
    """
    # Run on the device the model's weights live on rather than depending on a
    # module-level `device` variable defined in another cell.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    with torch.no_grad():
        encoded_input = tokenizer.encode_plus(sentence, return_tensors="pt", padding="max_length", max_length=max_length, truncation=True)
        src_tensor = encoded_input['input_ids'].to(run_device)

        tgt_tokens = [tokenizer.bos_token_id]  # decoding is seeded with the BOS token
        for _ in range(max_length):
            tgt_tensor = torch.tensor([tgt_tokens], dtype=torch.long, device=run_device)
            output = model(src_tensor, tgt_tensor)
            # Greedy choice: most likely token at the last target position.
            output_token = output[0, -1, :].argmax(dim=-1).item()
            if output_token == tokenizer.eos_token_id:
                break
            tgt_tokens.append(output_token)

    return tokenizer.decode(tgt_tokens, skip_special_tokens=True)

In [10]:
# Spot-check the model on a single sentence (the same text as test_dataset.src_texts[20]).
sen = "As you can see , the river can be very narrow at certain points , allowing North Koreans to secretly cross ."
translate_sentence(model, sen, tokenizer)

'Như các bạn có thể thấy, những người di cư có quyền ưu tiên, có thể vượt quá mức nào đó, có thể vượt qua được.'

In [11]:
# Translate every source sentence of the test set. Each sentence is decoded token by
# token, so this cell is slow; the progress bar shows how far it has got.
translations = []

for sen in tqdm(test_dataset.src_texts, desc="Translating"):
    translations.append(translate_sentence(model, sen, tokenizer))

In [19]:
# Reference translations for BLEU, one single-element list per prediction.
# Each reference is the dataset's tokenised target decoded back to text, so it passes
# through the same tokenizer.decode step as the predictions.
# NOTE(review): because the dataset truncates to max_length tokens, references longer
# than that are cut short — confirm this is intended for the reported score.
tgt = [
    [tokenizer.decode(test_dataset[i][1], skip_special_tokens=True)]
    for i in range(len(translations))
]

In [20]:
# Model output for example 4, to compare by eye with the reference in the next cell.
translations[4]

'Khi tôi 7 tuổi, tôi đã thấy nhà quản lý công cộng đầu tiên của mình, nhưng tôi nghĩ rằng cuộc sống bình thường của tôi là Bắc Triều Tiên'

In [21]:
# Reference translation for example 4 (a one-element list, as built for BLEU).
tgt[4]

['Khi tôi lên 7, tôi chứng kiến cảnh người ta xử bắn công khai lần đầu tiên trong đời, nhưng tôi vẫn nghĩ cuộc sống của mình ở đây là hoàn toàn bình thường.']

In [22]:
# Corpus-level BLEU of the model translations against the single references in `tgt`.
bleu = evaluate.load("bleu")
bleu.compute(predictions=translations, references=tgt)

{'bleu': 0.12205254676226655,
 'precisions': [0.45458765279282065,
  0.18568621766998422,
  0.078007992209275,
  0.03489881098523377],
 'brevity_penalty': 0.9913113686781851,
 'length_ratio': 0.9913488971377734,
 'translation_length': 32315,
 'reference_length': 32597}