<a href="https://colab.research.google.com/github/Jackson2706/Machine-Translation/blob/main/src/Machine_Translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# prompt: mount drive

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
%cd drive/MyDrive/Pet-Project/Machine-Translation

/content/drive/MyDrive/Pet-Project/Machine-Translation


# 1. Config

In [3]:
config = {
    "Train_dataset_folder_path": "data/train-en-vi"
}

#2. Dataset

In [4]:
# import libraries
!pip install langid
!pip install torchtext
import os
import re
import string
from torchtext.vocab import build_vocab_from_iterator
from  torchtext.data.utils import get_tokenizer
import torch
from torch.utils.data import Dataset
from langid.langid import LanguageIdentifier, model


Collecting langid
  Downloading langid-1.1.6.tar.gz (1.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: langid
  Building wheel for langid (setup.py) ... [?25l[?25hdone
  Created wheel for langid: filename=langid-1.1.6-py3-none-any.whl size=1941172 sha256=69427fe8cb48f47ddd117fa90e9f8a567b1e5b0627d0dfb21246923c511a3e92
  Stored in directory: /root/.cache/pip/wheels/23/c8/c6/eed80894918490a175677414d40bd7c851413bbe03d4856c3c
Successfully built langid
Installing collected packages: langid
Successfully installed langid-1.1.6
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch==2.2.1->torchtext)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch==2.2.1->torchtext)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-non

## 2.1 - Preprocess data

In [5]:
# Loại bỏ các mẫu có chứa kí tư không phải là tiếng anh
def identify_lang(en_data, vi_data, threshold: float = 0.9):
  identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)
  en_list = []
  vi_list = []
  for en_sentence, vi_sentence in zip(en_data, vi_data):
    en_score = identifier.classify(en_sentence)
    vi_score = identifier.classify(vi_sentence)
    if (en_score[0] == 'en' and en_score[1] >= threshold) and (vi_score[0] == 'vi' and vi_score[1] >= threshold):
      en_list.append(en_sentence)
      vi_list.append(vi_sentence)
  return en_list, vi_list

In [6]:
# Tiền xử lý data
def preprocessing_text(text: str):
    """
    Preprocesses text by removing special patterns, punctuation, digits, and emojis.

    Args:
        text (str): The input text to be preprocessed.

    Returns:
        str: Clean text containing only Vietnamese characters.
    """
    # Define the URL pattern
    url_pattern = re.compile(r"https?://\s+\www\.\s+")
    # Replace URLs with whitespace
    text = url_pattern.sub(r" ", text)

    # Define the HTML pattern
    html_pattern = re.compile(r"<[^<>]+>")
    # Replace HTML patterns with whitespace
    text = html_pattern.sub(" ", text)

    # Define punctuation and digits pattern
    replace_chars = list(string.punctuation + string.digits)
    # Replace punctuation and digits with whitespace
    for char in replace_chars:
        text = text.replace(char, " ")

    # Define the emoji pattern
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U0001F1F2-\U0001F1F4"  # Macau flag
        "\U0001F1E6-\U0001F1FF"  # flags
        "\U0001F600-\U0001F64F"
        "\U00002702-\U000027B0"
        "\U000024C2-\U0001F251"
        "\U0001f926-\U0001f937"
        "\U0001F1F2"
        "\U0001F1F4"
        "\U0001F620"
        "\u200d"
        "\u2640-\u2642"
        "]+",
        flags=re.UNICODE,
    )
    # Replace emojis with whitespace
    text = emoji_pattern.sub(r" ", text)

    # Remove duplicate whitespace
    text = re.sub(r'\s+', ' ', text)

    # Return lowercase text
    return text.lower()

def preprocess(data_list):
  process_data_list = []
  for sentence in data_list:
    process_data_list.append(preprocessing_text(sentence))
  return process_data_list

## 2.2 - Tokenizer and create vocab

In [7]:
def yield_tokens(sentences, tokenizer):
  for sentence in sentences:
    yield tokenizer(sentence)

In [8]:

def build_vocabulary(sentences, tokenizer):
  vocabulary = build_vocab_from_iterator(yield_tokens(sentences, tokenizer),  specials=["<unk>", "<pad>", "<sos>", "<eos>"])
  vocabulary.set_default_index(vocabulary["<unk>"])
  return vocabulary

## 2.3 - Test

In [9]:
en_list = []
with open("data/train-en-vi/train.en") as file:
  for sentence in file:
    en_list.append(sentence)

vi_list = []
with open("data/train-en-vi/train.vi") as file:
  for sentence in file:
    vi_list.append(sentence)



In [10]:
clean_en_list, clean_vi_list = identify_lang(en_list, vi_list, 0.9)
print(f"Dữ liệu data ban đầu: {len(en_list)} - {len(vi_list)}")
print(f"Dũ liệu data sau khi xủ lý: {len(clean_en_list)} - {len(clean_vi_list)}")

Dữ liệu data ban đầu: 133317 - 133317
Dũ liệu data sau khi xủ lý: 128999 - 128999


In [11]:
print(f"{clean_en_list[56]} - {clean_vi_list[56]}")

clean_en_data = preprocess(clean_en_list)
clean_vi_data = preprocess(clean_vi_list)
print(f"{clean_en_data[56]} - {clean_vi_data[56]}")


You can program the hundreds of muscles in your arm .
 - Bạn có thể lập trình cho hàng trăm cơ bắp trong cánh tay .

you can program the hundreds of muscles in your arm  - bạn có thể lập trình cho hàng trăm cơ bắp trong cánh tay 


In [12]:
en_tokenizer = get_tokenizer('basic_english')
en_vocab = build_vocabulary(clean_en_data, en_tokenizer)
print(f"English vocab: {en_vocab.get_stoi()}")

vi_tokenizer = get_tokenizer('basic_english')
vi_vocab = build_vocabulary(clean_vi_data, vi_tokenizer)
print(f"Vietnamese vocab: {vi_vocab.get_stoi()}")


Vietnamese vocab: {'€': 18408, 'ỳ': 18407, 'ợng': 18400, 'ởi': 18399, 'ộc': 18394, 'ổi': 18391, 'ồng': 18390, 'ồi': 18388, 'ếy': 18382, 'ắcqui': 18377, 'ậm': 18375, 'ần': 18373, 'ầ': 18372, '́': 18368, 'ưự': 18367, 'ơởi': 18362, 'đụ': 18355, 'độngmà': 18352, 'đẹo': 18345, 'đầ': 18340, 'đượng': 18338, 'đượcnén': 18335, 'đượ': 18333, 'đưỡ': 18332, 'đưpực': 18329, 'đĩ': 18323, 'đôii': 18320, 'đôc': 18319, 'đóc': 18317, 'đêly': 18311, 'đèo': 18310, 'đãkhông': 18308, 'đâytrước': 18307, 'đán': 18306, 'đuờng': 18305, 'đps': 18303, 'đoối': 18302, 'điốt': 18298, 'điệnoíhoại': 18297, 'điểmvà': 18296, 'điôxít': 18293, 'điêng': 18292, 'đóii': 18318, 'đia': 18287, 'ănglo': 18280, 'ùm': 18278, 'õng': 18277, 'ôột': 18276, 'óố': 18272, 'ãu': 18265, 'â': 18264, 'ành': 18261, '®': 18260, 'zơ': 18259, 'zé': 18258, 'zzzz': 18256, 'zywiec': 18253, 'zyprexa': 18252, 'zuritch': 18248, 'zone': 18241, 'zolli': 18240, 'zoilo': 18239, 'zine': 18236, 'zimmer': 18235, 'zhuzhou': 18233, 'zhou': 18232, 'zhong': 1823

## 2.4 - Create dataset

In [13]:
class EnglishVietNamDataset(Dataset):
  def __init__(self, data_folder, phase, en_tokenizer, vi_tokenizer ,max_sequence_length, threshold, en2vi = True):
    en_data_file_path = os.path.join(data_folder, f"{phase}.en")
    vi_data_file_path = os.path.join(data_folder, f"{phase}.vi")

    raw_en_data = []
    raw_vi_data = []
    with open(en_data_file_path, "r") as file:
      for line in file:
        raw_en_data.append(line)

    with open(vi_data_file_path, "r") as file:
      for line in file:
        raw_vi_data.append(line)
    clean_en_data, clean_vi_data = identify_lang(raw_en_data, raw_vi_data, threshold)
    self.input_en_data = preprocess(clean_en_data)
    self.input_vi_data = preprocess(clean_vi_data)
    self.en_vocab = build_vocabulary(self.input_en_data, en_tokenizer)
    self.vi_vocab = build_vocabulary(self.input_vi_data, vi_tokenizer)
    self.en_tokenizer = en_tokenizer
    self.vi_tokenizer = vi_tokenizer
    self.max_sequence_length = max_sequence_length
    self.en2vi = en2vi

  def __len__(self):
    return len(self.input_en_data)

  def __getitem__(self, index):
    en_data = self.input_en_data[index]
    vi_data = self.input_vi_data[index]
    if self.en2vi:
      en_vectorize_data = self._vectorize(en_data, self.en_tokenizer, self.en_vocab, self.max_sequence_length)
      vi_vectorize_data = self._vectorize(vi_data, self.vi_tokenizer, self.vi_vocab, self.max_sequence_length, True)
      en_tensor = torch.tensor(en_vectorize_data, dtype=torch.long)
      vi_tensor = torch.tensor(vi_vectorize_data, dtype=torch.long)
      return en_tensor, vi_tensor
    else:
      en_vectorize_data = self._vectorize(en_data, self.en_tokenizer, self.en_vocab, self.max_sequence_length, True)
      vi_vectorize_data = self._vectorize(vi_data, self.vi_tokenizer, self.vi_vocab, self.max_sequence_length)
      en_tensor = torch.tensor(en_vectorize_data, dtype=torch.long)
      vi_tensor = torch.tensor(vi_vectorize_data, dtype=torch.long)
      return en_tensor, vi_tensor

  def _vectorize(self, text, tokenizer, vocab, max_sequence_length, add_sos = False):
    tokens = tokenizer(text)
    tokens = [vocab[token] for token in tokens] + [vocab["<eos>"]]
    if add_sos:
      tokens = [vocab["<sos>"]] + tokens
    token_ids = tokens[:max_sequence_length] + [vocab["<pad>"]] * max((max_sequence_length - len(tokens)), 0)
    return token_ids



In [None]:
# Test

en_tokenizer = get_tokenizer('basic_english')
vi_tokenizer = get_tokenizer('basic_english')

train_dataset = EnglishVietNamDataset(data_folder="data/train-en-vi",
                                      phase="train",
                                      en_tokenizer=en_tokenizer,
                                      vi_tokenizer=vi_tokenizer,
                                      max_sequence_length=50,
                                      threshold=0.95,
                                      en2vi=True)

print(train_dataset[56])

# Model


## 3.1 - RNN based model

In [None]:
from torch import nn

In [None]:
class RNNEncoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(RNNEncoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)

    def forward(self, src):
        embeddings = self.embedding(src)
        outputs, hidden = self.rnn(embeddings)
        return outputs, hidden

In [None]:
class RNNDecoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(RNNDecoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input, context, hidden):
        # Ensure hidden state has the correct shape
        hidden = hidden.unsqueeze(0)  # Add an extra layer dimension
        embeddings = self.embedding(input)
        output, hidden = self.rnn(embeddings, hidden.squeeze(0))
        prediction = self.fc_out(output)
        return prediction, hidden.squeeze(0)  # Remove the extra layer dimension

In [None]:
class RNN_Seq2Seq_Model(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, sequence_en, sequence_vn):
        outputs, hidden = self.encoder(sequence_en)
        decoder_outputs, _ = self.decoder(sequence_vn, outputs, hidden)
        return decoder_outputs

In [None]:
en_input, vi_input = train_dataset[6]
rnn_encoder = RNNEncoder(len(en_vocab), 512, 256)
rnn_decoder = RNNDecoder(len(vi_vocab), 512, 256)
model = RNN_Seq2Seq_Model(rnn_encoder, rnn_decoder)

output = model(en_input, vi_input)
print(en_input.size())
print(output.size())

## 3.2 - Transformer base

In [None]:
class Transformer_Encoder(nn.Module):
    def __init__(self, vocab_size_en, embedding_dim, model_dim, nhead):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size_en, embedding_dim)
        self.transformer_encoder = nn.TransformerEncoderLayer(d_model=model_dim,
                                                              nhead=nhead,
                                                              dim_feedforward=6,
                                                              dropout=0.0,
                                                              batch_first=True)

    # src = [batch_size, seq_length]
    def forward(self, src):
        embedded = self.embedding(src)                # [batch_size, seq_length, d]
        context = self.transformer_encoder(embedded)  # [batch_size, seq_length, d]
        return context

In [None]:
class Transformer_Decoder(nn.Module):
    def __init__(self, vocab_size_vn, embedding_dim, model_dim, nhead, sequence_length_vn):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size_vn, embedding_dim)
        self.mask = torch.triu(torch.ones(sequence_length_vn, sequence_length_vn), diagonal=1).bool()
        self.transformer_decoder = nn.TransformerDecoderLayer(d_model=model_dim,
                                                              nhead=nhead,
                                                              dim_feedforward=6,
                                                              dropout=0.0,
                                                              batch_first=True)
        self.fc_out = nn.Linear(model_dim, vocab_size_vn)

    # input: [batch_size, seq_length_vn]
    # context: [batch_size, seq_length_en, d]
    def forward(self, input, context):
        embedded = self.embedding(input)                                           # [batch_size, seq_length_vn, d]
        output = self.transformer_decoder(embedded, context, tgt_mask=self.mask)   # [batch_size, seq_length_vn, d]
        prediction = self.fc_out(output)                                           # [batch_size, seq_length_vn, vocab_size_vn]

        return prediction.unsqueeze(1)                                 # [batch_size, vocab_size_vn, seq_length_vn]

In [None]:
class Seq2Seq_Model(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, sequence_en, sequence_vn):
        context = self.encoder(sequence_en)
        outputs = self.decoder(sequence_vn, context)
        return outputs

# Training

## 4.1 - Dataloader

In [None]:
from torch.utils.data import DataLoader

In [None]:
train_loader = DataLoader(train_dataset, 2, shuffle=True, drop_last=True)

## 4.2 - Optimize & Loss function

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.05)

## 4.3 - Utils funtions

In [None]:
import nltk
from nltk.translate.bleu_score import corpus_bleu

In [None]:
def convert_to_sentences(outputs, vocab):
    """
    Convert model outputs (tensor) to list of sentences (strings).

    Args:
        outputs (torch.Tensor): Model outputs tensor.

    Returns:
        list of str: List of sentences.
    """
    # Giả định outputs là tensor kích thước [batch_size, seq_length, vocab_size]
    # Chọn từ có giá trị max trong từng batch và chuyển đổi thành từ trong từ điển
    _, predicted_indices = torch.max(outputs, dim=2)

    # Chuyển đổi indices sang list of sentences
    predicted_sentences = []
    for indices in predicted_indices:
        sentence = ' '.join([vocab.itos[idx.item()] for idx in indices])
        predicted_sentences.append(sentence)

    return predicted_sentences


In [None]:
def calculate_bleu_score(reference_corpus, translation_corpus):
    tokenizer = nltk.tokenize.WordPunctTokenizer()
    references = [[tokenizer.tokenize(ref)] for ref in reference_corpus]
    translations = [tokenizer.tokenize(translation) for translation in translation_corpus]

    return corpus_bleu(references, translations)

## 4.4 - Training


In [None]:
for epoch in range(50):
    model.train()
    total_loss = 0

    for batch_idx, (en_input_batch, vi_input_batch) in enumerate(train_loader):
        optimizer.zero_grad()

        # Forward pass
        output = model(en_input_batch, vi_input_batch)

        # Compute loss
        loss = criterion(output.view(-1, output.size(-1)), vi_input_batch.view(-1))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if (batch_idx + 1) % 5 == 0:
            avg_loss = total_loss / 50
            print(f"Epoch [{epoch + 1}/{50}], Step [{batch_idx + 1}/{len(train_loader)}], Loss: {avg_loss:.4f}")
            total_loss = 0