<a href="https://colab.research.google.com/github/Zerldas/Translate-EN-to-FR-Project/blob/main/NLP_Project_Translate_EN_to_FR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset paths used
# later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
# Print the Python version available in this runtime.
!python3 --version

**Kiểm tra và tải về các thư viện cần thiết trước khi thực hiện project**

In [None]:
# List the packages already installed in the runtime before installing anything extra
!pip list

In [None]:
# Install spaCy into the runtime.
!pip install spacy

# Download the small English and French pipelines that are loaded later
# with spacy.load("en_core_web_sm") / spacy.load("fr_core_news_sm").
!python -m spacy download en_core_web_sm
!python -m spacy download fr_core_news_sm

***Danh sách các thư viện sẽ được sử dụng***

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
import spacy
import random
import torch.nn.functional as F
from wordcloud import WordCloud
from collections import Counter
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from torch.nn.utils.rnn import pad_sequence
from collections import Counter

Thiết lập GPU

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Device:", DEVICE)

Thiết lập tokenizer

In [None]:
# Load the spaCy pipelines. These are the raw pipeline objects (calling one
# returns a spaCy Doc); tokenizer_en / tokenizer_fr wrap them to return
# lower-cased token strings.
tokenize_en = spacy.load("en_core_web_sm")
tokenize_fr = spacy.load("fr_core_news_sm")

In [None]:
def _lowercased_tokens(pipeline, sentence):
  """Run `pipeline` on `sentence` and return its token texts in lower case.

  Tokens whose text is empty after `strip()` (whitespace-only tokens) are skipped.
  """
  kept = []
  for token in pipeline(sentence):
    text = token.text
    if text.strip():
      kept.append(text.lower())
  return kept


def tokenizer_en(sentence):
  """Tokenize an English sentence with the `tokenize_en` spaCy pipeline."""
  return _lowercased_tokens(tokenize_en, sentence)


def tokenizer_fr(sentence):
  """Tokenize a French sentence with the `tokenize_fr` spaCy pipeline."""
  return _lowercased_tokens(tokenize_fr, sentence)

***Tiến hành phân tích dữ liệu***

In [None]:
# Paths to the training data
train_en_path = "/content/drive/MyDrive/EN-FR/data/train/train.en"
train_fr_path = "/content/drive/MyDrive/EN-FR/data/train/train.fr"
# Validation data
val_en_path = "/content/drive/MyDrive/EN-FR/data/val/val.en"
val_fr_path = "/content/drive/MyDrive/EN-FR/data/val/val.fr"
# Test data: the 2016 Flickr test set
test_en_path = "/content/drive/MyDrive/EN-FR/data/test/test_2016_flickr.en"
test_fr_path = "/content/drive/MyDrive/EN-FR/data/test/test_2016_flickr.fr"

In [None]:
def load_data(en_path, fr_path):
    """Read a pair of line-aligned UTF-8 text files.

    Args:
        en_path: path of the English file (one sentence per line).
        fr_path: path of the French file (one sentence per line).

    Returns:
        (en_lines, fr_lines): two lists of whitespace-stripped lines.

    Raises:
        AssertionError: if the two files do not have the same number of lines.
    """
    def _read_stripped(path):
        with open(path, encoding="utf-8") as handle:
            return [raw.strip() for raw in handle]

    en_lines = _read_stripped(en_path)
    fr_lines = _read_stripped(fr_path)

    assert len(en_lines) == len(fr_lines), "EN-FR line count mismatch!"
    return en_lines, fr_lines

In [None]:
train_en_lines, train_fr_lines = load_data(train_en_path, train_fr_path)
val_en_lines, val_fr_lines = load_data(val_en_path, val_fr_path)
test_en_lines, test_fr_lines = load_data(test_en_path, test_fr_path)


def _print_split_summary(title, en_lines, fr_lines, trailer=""):
  """Print the sentence counts of one split; `trailer` is appended to the last line."""
  n_en, n_fr = len(en_lines), len(fr_lines)
  print(f"=== {title} ===")
  print(f"Tiếng Anh: {n_en}")
  print(f"Tiếng Pháp: {n_fr}")
  print(f"Tổng: {n_en + n_fr}{trailer}")


# Train and validation summaries are followed by a blank line; the last one is not.
_print_split_summary("TẬP TRAIN", train_en_lines, train_fr_lines, trailer="\n")
_print_split_summary("TẬP VALIDATION", val_en_lines, val_fr_lines, trailer="\n")
_print_split_summary("TẬP TEST", test_en_lines, test_fr_lines)

In [None]:
# Inspect sentence lengths (in tokens)
def get_lengths(sentences, tokenizer):
    """Return the number of tokens in every sentence.

    Args:
        sentences: iterable of raw sentence strings.
        tokenizer: callable mapping a sentence to a sized sequence of tokens.

    Returns:
        A list with one token count per sentence, in input order.
    """
    return [len(tokenizer(sentence)) for sentence in sentences]

# Use the tokenizer_* functions (not the raw spaCy pipelines) so the lengths
# match what the model sees: whitespace-only tokens are dropped there.
train_len_en = get_lengths(train_en_lines, tokenizer_en)
train_len_fr = get_lengths(train_fr_lines, tokenizer_fr)

# Show a short preview instead of dumping one integer per training sentence.
print(train_len_en[:20])
print(train_len_fr[:20])

In [None]:
# Scatter plot of English vs. French sentence lengths
fig, ax = plt.subplots(figsize=(8, 8))
ax.scatter(train_len_en, train_len_fr, s=8, alpha=0.4)
ax.set_xlabel("EN Length")
ax.set_ylabel("FR Length")
ax.set_title("Sentence Length Correlation (EN vs FR)")
plt.show()

In [None]:
# Concatenate every training sentence into one text per language.
en_text = " ".join(train_en_lines)
fr_text = " ".join(train_fr_lines)


def _make_cloud(text):
  """Build a 1000x600 white-background word cloud from `text`."""
  return WordCloud(width=1000, height=600, background_color='white').generate(text)


wordcloud_en = _make_cloud(en_text)
wordcloud_fr = _make_cloud(fr_text)

# One figure per language, rendered in the same order as the clouds were built.
for cloud, cloud_title in (
  (wordcloud_en, "English Word Cloud"),
  (wordcloud_fr, "French Word Cloud"),
):
  plt.figure(figsize=(12, 6))
  plt.imshow(cloud, interpolation='bilinear')
  plt.axis("off")
  plt.title(cloud_title)
  plt.show()

Xây dựng vocabulary

In [None]:
# Special tokens; their position in this list is their vocabulary index.
index_token = [
  "<pad>", # Padding so that sentences in a batch share the same length
  "<unk>", # Unknown: stands in for tokens that are not in the vocabulary
  "<sos>", # Start-of-sentence token
  "<eos>"  # End-of-sentence token
]

PAD_IDX = index_token.index("<pad>")   # 0
UNK_IDX = index_token.index("<unk>")   # 1
SOS_IDX = index_token.index("<sos>")   # 2
EOS_IDX = index_token.index("<eos>")   # 3

In [None]:
def build_vocab(sentences, tokenizer, max_size=10000000, specials=None):
  """Build a frequency-ranked vocabulary from `sentences`.

  Args:
    sentences: iterable of raw sentence strings.
    tokenizer: callable mapping a sentence to an iterable of hashable tokens
      (string tokens are expected by the lookups done elsewhere in the notebook).
    max_size: maximum vocabulary size, special tokens included.
    specials: special tokens placed at the first indices; defaults to the
      module-level `index_token` list.

  Returns:
    (token_to_index, idx_token, counter):
      token_to_index: dict mapping token -> integer id.
      idx_token: list mapping integer id -> token (the inverse of token_to_index).
      counter: Counter with the raw token frequencies.
  """
  if specials is None:
    specials = index_token
  specials = list(specials)

  counter = Counter()
  for sentence in sentences:
    counter.update(tokenizer(sentence))

  # Reserve room for the special tokens; the budget can never be negative.
  budget = max(max_size - len(specials), 0)

  # Skip corpus tokens that collide with a special token so that the specials
  # keep their reserved indices and ids stay aligned with idx_token.
  reserved = set(specials)
  frequent = [word for word, _ in counter.most_common() if word not in reserved]

  # id -> token list
  idx_token = specials + frequent[:budget]
  # token -> id map
  token_to_index = {token: idx for idx, token in enumerate(idx_token)}

  # Return the full id -> token list (not just the special tokens).
  return token_to_index, idx_token, counter

In [None]:
# Build vocab
# Pass the tokenizer_* functions, which return lower-cased strings. The raw
# spaCy pipelines (tokenize_en / tokenize_fr) yield Token objects, so the
# vocabulary would be keyed by Token instances and every string lookup in
# numericalize() would fall back to <unk>.
vocab_en, index_en, counter_en = build_vocab(train_en_lines, tokenizer_en, max_size=10000)
vocab_fr, index_fr, counter_fr = build_vocab(train_fr_lines, tokenizer_fr, max_size=10000)

print("EN vocab size:", len(vocab_en))
print("FR vocab size:", len(vocab_fr))

In [None]:
# Convert a sentence into a list of vocabulary ids
def numericalize(sentence, tokenizer, vocab):
  """Map `sentence` to token ids, wrapped in <sos> ... <eos>.

  Args:
    sentence: raw sentence string.
    tokenizer: callable returning a list of tokens for the sentence.
    vocab: dict mapping token -> id; must contain "<unk>".

  Returns:
    List of ids; tokens missing from `vocab` get the id of "<unk>".
  """
  tokens = ["<sos>"] + tokenizer(sentence) + ["<eos>"]
  unk_id = vocab["<unk>"]

  ids = []
  for tok in tokens:
    ids.append(vocab.get(tok, unk_id))
  return ids

In [None]:
class Translate_Dataset(torch.utils.data.Dataset):
  """Parallel EN-FR dataset that numericalizes sentence pairs on access.

  Items are `(source_ids, target_ids, source_length)`, where both id
  sequences are tensors built by `numericalize` (so they include <sos>/<eos>).
  """

  def __init__(self, en_sentences, fr_sentences):
    self.en_sentences, self.fr_sentences = en_sentences, fr_sentences

  def __len__(self):
    # The dataset size is driven by the English side.
    return len(self.en_sentences)

  def __getitem__(self, index):
    en_sentence, fr_sentence = self.en_sentences[index], self.fr_sentences[index]

    # Text -> ids using the module-level tokenizers and vocabularies.
    source_ids = numericalize(en_sentence, tokenizer_en, vocab_en)
    target_ids = numericalize(fr_sentence, tokenizer_fr, vocab_fr)

    # The source length is returned alongside so the batch can be packed
    # with pack_padded_sequence.
    return torch.tensor(source_ids), torch.tensor(target_ids), len(source_ids)

In [None]:
def collate_fn(batch):
  """Collate `(source, target, source_length)` samples into padded batch tensors.

  Samples are ordered by decreasing source length (ties keep their original
  order) and both sides are right-padded with PAD_IDX.

  Returns:
    (source_batch, target_batch, source_lengths): two batch-first padded
    tensors and a LongTensor of the sorted source lengths.
  """
  # Longest source first; negating the key keeps the sort stable for ties.
  ordered = sorted(batch, key=lambda sample: -sample[2])
  sources, targets, lengths = zip(*ordered)

  def _pad(sequences):
    return pad_sequence(list(sequences), batch_first=True, padding_value=PAD_IDX)

  padded_sources = _pad(sources)
  padded_targets = _pad(targets)
  length_tensor = torch.tensor(list(lengths), dtype=torch.long)

  return padded_sources, padded_targets, length_tensor

In [None]:
# Create the datasets and their DataLoaders.
# Only the training loader shuffles; all three use collate_fn for padding.
train = Translate_Dataset(train_en_lines, train_fr_lines)
val = Translate_Dataset(val_en_lines, val_fr_lines)
test = Translate_Dataset(test_en_lines, test_fr_lines)

train_loader = torch.utils.data.DataLoader(train, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = torch.utils.data.DataLoader(val, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test, batch_size=32 ,shuffle=False, collate_fn=collate_fn)

Encoder

In [None]:
class Encoder(nn.Module):
  """Embedding + multi-layer LSTM encoder that returns only its final states."""

  def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout):
    super().__init__()
    self.embedding = nn.Embedding(vocab_size, embed_size)

    lstm_options = dict(num_layers=num_layers, dropout=dropout, batch_first=True)
    self.lstm = nn.LSTM(embed_size, hidden_size, **lstm_options)

  def forward(self, src, lengths):
    """Encode a padded batch.

    Args:
      src: batch-first tensor of token ids.
      lengths: tensor of true sequence lengths, sorted in decreasing order
        (required because enforce_sorted=True).

    Returns:
      (hidden, cell): the LSTM's final hidden and cell states.
    """
    # Packing makes the LSTM skip padded positions.
    packed_input = nn.utils.rnn.pack_padded_sequence(
      self.embedding(src),
      lengths.cpu(),
      batch_first=True,
      enforce_sorted=True,
    )

    _, (hidden, cell) = self.lstm(packed_input)
    return hidden, cell

Decoder

In [None]:
class Decoder(nn.Module):
  """Single-step LSTM decoder: one input token in, one vocabulary distribution out."""

  def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout, pad_idx):
    super().__init__()

    self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=pad_idx)

    lstm_options = dict(num_layers=num_layers, dropout=dropout, batch_first=True)
    self.lstm = nn.LSTM(embed_size, hidden_size, **lstm_options)

    self.fc_out = nn.Linear(hidden_size, vocab_size)

  def forward(self, token, hidden, cell):
    """Run one decoding step.

    Args:
      token: 1-D tensor of current input token ids, one per batch element.
      hidden, cell: LSTM states from the previous step (or from the encoder).

    Returns:
      (prediction, hidden, cell): vocabulary logits for this step and the
      updated LSTM states.
    """
    # Add a length-1 time axis so the batch-first LSTM sees one step.
    step_input = self.embedding(token[:, None])
    step_output, (hidden, cell) = self.lstm(step_input, (hidden, cell))

    # Drop the time axis again before projecting to the vocabulary.
    prediction = self.fc_out(step_output[:, 0, :])
    return prediction, hidden, cell

Seq2Seq

In [None]:
class Seq2Seq(nn.Module):
  """Encoder-decoder wrapper that decodes step by step with teacher forcing."""

  def __init__(self, encoder, decoder, pad_index):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.pad_index = pad_index

  def forward(self, source, target, source_lengths, teacher_forcing_ratio=0.5):
    """Produce decoder logits for every target position.

    Args:
      source: batch-first source token ids.
      target: batch-first target token ids; column 0 is fed as the first input.
      source_lengths: true source lengths, forwarded to the encoder.
      teacher_forcing_ratio: per-step probability of feeding the gold token
        instead of the model's own prediction.

    Returns:
      Tensor of shape (batch, target_len, vocab); position 0 stays all zeros
      because decoding starts at step 1.
    """
    batch_size, num_steps = target.size()
    num_classes = self.decoder.fc_out.out_features

    logits_per_step = torch.zeros(
      batch_size, num_steps, num_classes, device=source.device
    )

    # The encoder's final states seed the decoder.
    hidden_state, cell_state = self.encoder(source, source_lengths)

    # First decoder input is the first target column (<sos>).
    next_input = target[:, 0]

    for step in range(1, num_steps):
      step_logits, hidden_state, cell_state = self.decoder(
        next_input, hidden_state, cell_state
      )
      logits_per_step[:, step] = step_logits

      # One random draw per step decides gold token vs. greedy prediction.
      if random.random() < teacher_forcing_ratio:
        next_input = target[:, step]
      else:
        next_input = step_logits.argmax(1)

    return logits_per_step

Build Model

In [None]:
# Model and training hyper-parameters
INPUT_SIZE = len(vocab_en)    # source (English) vocabulary size
OUTPUT_SIZE = len(vocab_fr)   # target (French) vocabulary size
ENC_EMB_SIZE = 256
DEC_EMB_SIZE = 256
HID_SIZE = 512
NUM_LAYERS = 2
DROPOUT = 0.5
BATCH_SIZE = 32
N_EPOCHS = 10
LEARNING_RATE = 0.001

In [None]:
#  Early stopping setup
class EarlyStopping:
  """Signal a stop once the validation loss has not improved for `patience` calls.

  A call counts as an improvement only when the loss is lower than the best
  seen so far by more than `min_delta`.
  """

  def __init__(self, patience=3, min_delta=0):
    self.patience = patience
    self.min_delta = min_delta
    self.best_loss = float("inf")
    self.counter = 0
    self.should_stop = False

  def __call__(self, val_loss):
    improved = val_loss < self.best_loss - self.min_delta

    if improved:
      # New best: remember it and restart the patience counter.
      self.best_loss = val_loss
      self.counter = 0
      return

    self.counter += 1
    print(f"No improvement ({self.counter}/{self.patience})")

    if self.counter >= self.patience:
      self.should_stop = True
      print("\n Early Stopping Triggered!")

In [None]:
# Build the baseline (no attention) model and its training objects.
encoder = Encoder(INPUT_SIZE, ENC_EMB_SIZE, HID_SIZE, NUM_LAYERS, DROPOUT).to(DEVICE)
decoder = Decoder(OUTPUT_SIZE, DEC_EMB_SIZE, HID_SIZE, NUM_LAYERS, DROPOUT, PAD_IDX).to(DEVICE)
# The third argument of Seq2Seq is the padding index, not the device.
model = Seq2Seq(encoder, decoder, PAD_IDX).to(DEVICE)
# Padded target positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX).to(DEVICE)
# Use the configured learning rate instead of repeating the literal.
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Halve the learning rate when the monitored loss stops decreasing; this only
# takes effect if scheduler.step(<loss>) is called during training.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
  optimizer,
  mode='min',
  factor=0.5,
  patience=2,
  threshold=1e-4,
  cooldown=0,
  min_lr=1e-6,
)

Training Loop

In [None]:
early_stopping = EarlyStopping(patience=3, min_delta=0.001)

def train_epoch():
  """Run one optimisation pass over `train_loader`.

  Uses the module-level `model`, `optimizer`, `criterion` and `DEVICE`.

  Returns:
    The mean batch loss over the epoch.
  """
  model.train()
  running_loss = 0

  for source, target, source_lengths in train_loader:
    source, target, source_lengths = (
      tensor.to(DEVICE) for tensor in (source, target, source_lengths)
    )

    optimizer.zero_grad()

    logits = model(source, target, source_lengths)

    # Drop position 0 (<sos>) from both logits and targets, then flatten
    # batch and time into one axis for the loss.
    flat_logits = logits[:, 1:].reshape(-1, logits.size(-1))
    flat_target = target[:, 1:].reshape(-1)

    batch_loss = criterion(flat_logits, flat_target)
    batch_loss.backward()

    # Gradient clipping (max norm 1)
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1)

    optimizer.step()

    running_loss += batch_loss.item()

  return running_loss / len(train_loader)

In [None]:
# Compute the validation loss
def eval_epoch():
  """Evaluate the module-level `model` on `val_loader` without gradients.

  Decoding runs with teacher_forcing_ratio=0.0, i.e. the model is always fed
  its own previous prediction.

  Returns:
    The mean batch loss over the validation set.
  """
  model.eval()
  running_loss = 0

  with torch.no_grad():
    for source, target, source_lengths in val_loader:
      source, target, source_lengths = (
        tensor.to(DEVICE) for tensor in (source, target, source_lengths)
      )

      logits = model(source, target, source_lengths, teacher_forcing_ratio=0.0)

      # Skip position 0 (<sos>) and flatten batch and time for the loss.
      flat_logits = logits[:, 1:].reshape(-1, logits.size(-1))
      flat_target = target[:, 1:].reshape(-1)

      running_loss += criterion(flat_logits, flat_target).item()

  return running_loss / len(val_loader)

In [None]:
# Train the model
early_stopping = EarlyStopping(patience=3, min_delta=0.001)
train_losses = []
val_losses = []
best_val_loss = float("inf")
CHECKPOINT_PATH = "best_model.pth"
checkpoint_saved = False

for epoch in range(N_EPOCHS):
  train_loss = train_epoch()
  val_loss = eval_epoch()

  train_losses.append(train_loss)
  val_losses.append(val_loss)

  # Epochs are reported 1-based; the configured constant is N_EPOCHS.
  print(f"\nEpoch {epoch + 1}/{N_EPOCHS}")
  print(f"Train Loss: {train_loss:.4f}")
  print(f"Val Loss: {val_loss:.4f}")

  # ReduceLROnPlateau only acts when it is stepped with the monitored metric.
  scheduler.step(val_loss)

  # Checkpoint whenever the validation loss improves, so the file really
  # holds the best weights rather than those of the last epoch.
  if val_loss < best_val_loss:
    best_val_loss = val_loss
    torch.save(model.state_dict(), CHECKPOINT_PATH)
    checkpoint_saved = True

  # Early stopping checks val_loss
  early_stopping(val_loss)
  if early_stopping.should_stop:
    print(f"\n EARLY STOPPING IN {epoch + 1}")
    break

if checkpoint_saved:
  # Continue with the best weights, not the ones from the final epoch.
  model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE))
else:
  # No epoch improved on +inf (e.g. N_EPOCHS == 0 or NaN losses): still write
  # the current weights so the checkpoint file exists, as it did before.
  torch.save(model.state_dict(), CHECKPOINT_PATH)

In [None]:
def plot_losses(train_losses, val_losses):
  """Plot the per-epoch training and validation losses on one figure."""
  fig, ax = plt.subplots(figsize=(8, 5))

  for series, series_label in ((train_losses, "Train Loss"), (val_losses, "Validation Loss")):
    ax.plot(series, label=series_label)

  ax.set_xlabel("Epoch")
  ax.set_ylabel("Loss")
  ax.set_title("Training & Validation Loss")
  ax.legend()
  ax.grid(True)
  plt.show()

plot_losses(train_losses, val_losses)

In [None]:
# Translation function
def translate(sentence, max_len=50):
  """Greedily translate one English sentence with the module-level `model`.

  Args:
    sentence: raw English sentence.
    max_len: maximum number of tokens to generate before giving up on <eos>.

  Returns:
    The generated French tokens joined by single spaces (without <eos>).
  """
  model.eval()
  source_ids = numericalize(sentence, tokenizer_en, vocab_en)
  source_tensor = torch.tensor(source_ids, dtype=torch.long, device=DEVICE).unsqueeze(0)

  # Encoder.forward calls lengths.cpu(), so the lengths must be a tensor,
  # not a plain Python list.
  source_lengths = torch.tensor([source_tensor.size(1)], dtype=torch.long)

  # Explicit id -> token table; does not depend on dict key order.
  idx_to_fr = {idx: tok for tok, idx in vocab_fr.items()}

  result_tokens = []

  with torch.no_grad():
    hidden, cell = model.encoder(source_tensor, source_lengths)
    token = torch.tensor([SOS_IDX], dtype=torch.long, device=DEVICE)

    for _ in range(max_len):
      output, hidden, cell = model.decoder(token, hidden, cell)

      # Greedy decoding: always take the most likely next token.
      next_token = output.argmax(1)
      idx = next_token.item()

      if idx == EOS_IDX:
        break

      result_tokens.append(idx_to_fr.get(idx, "<unk>"))
      token = next_token

  return " ".join(result_tokens)

In [None]:
# BLEU computation
def compute_bleu(n=100):
  """Average sentence-level BLEU over the first `n` test sentence pairs.

  Args:
    n: number of test pairs to score; clamped to the size of the test set.

  Returns:
    Mean smoothed (method4) sentence BLEU, or 0.0 when nothing can be scored.
  """
  smoothie = SmoothingFunction().method4

  # Never index past the end of the test set.
  n = min(n, len(test_en_lines), len(test_fr_lines))
  if n <= 0:
    return 0.0

  scores = []
  for i in range(n):
    pred_tokens = translate(test_en_lines[i]).lower().split()

    # tokenizer_fr already lower-cases its tokens.
    ref_tokens = tokenizer_fr(test_fr_lines[i])

    scores.append(
      sentence_bleu([ref_tokens], pred_tokens, smoothing_function=smoothie)
    )

  return sum(scores) / len(scores)

print("BLEU:", compute_bleu(100))

In [None]:
# Translate the first five test sentences and show them next to the references
for i in range(5):
  print("EN :", test_en_lines[i])
  print("PRED:", translate(test_en_lines[i]))
  print("REF :", test_fr_lines[i])
  print()

In [None]:
# Move the local ./result directory into the Drive project folder.
# NOTE(review): no cell in this notebook creates ./result (the checkpoint is
# written as best_model.pth in the working directory) — confirm the source path.
!mv ./result ./drive/MyDrive/EN-FR/

Phần Nâng Cao

Thêm Cơ Chế Attention

In [None]:
class BahdanauAttention(nn.Module):
  """Additive (Bahdanau-style) attention: score = v^T tanh(W_e h_enc + W_d h_dec)."""

  def __init__(self, enc_hid_dim, dec_hid_dim):
    super().__init__()
    # enc_hid_dim: feature size of the encoder outputs
    #              (for a bidirectional encoder this is 2 * encoder hidden size)
    # dec_hid_dim: hidden size of the decoder
    self.W_encoder = nn.Linear(enc_hid_dim, dec_hid_dim)
    self.W_decoder = nn.Linear(dec_hid_dim, dec_hid_dim)
    self.v = nn.Linear(dec_hid_dim, 1, bias=False)

  def forward(self, decoder_hidden, encoder_outputs, mask=None):
    """Return attention weights over the source positions.

    Args:
      decoder_hidden: decoder state; a middle axis is inserted so it
        broadcasts over the source positions of `encoder_outputs`.
      encoder_outputs: batch-first encoder outputs.
      mask: optional tensor; positions where it equals 0 get a score of -1e9
        and therefore (near-)zero weight.

    Returns:
      Softmax-normalised weights over dim 1.
    """
    keys = self.W_encoder(encoder_outputs)
    query = self.W_decoder(decoder_hidden.unsqueeze(1))

    score = self.v(torch.tanh(keys + query)).squeeze(2)

    if mask is not None:
      score = score.masked_fill(mask == 0, -1e9)

    return F.softmax(score, dim=1)

In [None]:
class BahdanauDecoder(nn.Module):
  """Single-step GRU decoder with additive attention over the encoder outputs."""

  def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout=0.1):
    super().__init__()

    # Exposed as an attribute: Seq2SeqWithAttention reads decoder.output_dim
    # to size its output buffer.
    self.output_dim = output_dim

    self.embedding = nn.Embedding(output_dim, emb_dim)
    self.attention = BahdanauAttention(enc_hid_dim, dec_hid_dim)

    # The GRU consumes the token embedding concatenated with the context vector.
    self.rnn = nn.GRU(emb_dim + enc_hid_dim, dec_hid_dim, batch_first=True)

    # The output layer sees the GRU output, the context and the embedding.
    self.fc_out = nn.Linear(emb_dim + enc_hid_dim + dec_hid_dim, output_dim)
    self.dropout = nn.Dropout(dropout)

  def forward(self, input_token, last_hidden, encoder_outputs, mask=None):
    """Run one decoding step.

    Args:
      input_token: current input token ids, one per batch element.
      last_hidden: previous GRU hidden state (layer axis first).
      encoder_outputs: batch-first encoder outputs attended over.
      mask: optional source mask forwarded to the attention module.

    Returns:
      (logits, hidden, attn_weights): vocabulary logits for this step, the
      new GRU hidden state and the attention weights.
    """
    # [batch, emb] — assuming input_token is a 1-D batch of ids
    embedded = self.dropout(self.embedding(input_token))

    # Query the attention with the last layer's state: [batch, dec_hid_dim]
    dec_hidden = last_hidden[-1]

    # attention weights: [batch, src_len]
    attn_weights = self.attention(dec_hidden, encoder_outputs, mask)

    # context vector: [batch, enc_hid_dim]
    context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs).squeeze(1)

    # GRU input with a length-1 time axis
    rnn_input = torch.cat([embedded, context], dim=1).unsqueeze(1)

    output, hidden = self.rnn(rnn_input, last_hidden)
    output = output.squeeze(1)  # [batch, dec_hid]

    # final projection
    logits = self.fc_out(torch.cat([output, context, embedded], dim=1))

    return logits, hidden, attn_weights

In [None]:
class Seq2SeqWithAttention(nn.Module):
  """Encoder-decoder wrapper for an attention decoder.

  The encoder must return `(encoder_outputs, hidden)` where `hidden` is a
  tensor (GRU-style) or an `(h, c)` tuple (LSTM-style). The decoder is called
  as `decoder(token, hidden, encoder_outputs, mask=...)` and must return
  `(logits, hidden, attention_weights)`.
  """

  def __init__(self, encoder, decoder, device):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.device = device

  def create_mask(self, source_tokens, padding_index):
    """Boolean mask that is True on real tokens and False on padding."""
    return (source_tokens != padding_index).to(self.device)

  def _encoder_is_bidirectional(self, hidden):
    """Tell whether the encoder's final states hold forward/backward pairs."""
    # Prefer the flag of the encoder's recurrent layer: an even number of
    # state slices alone cannot distinguish "bidirectional" from
    # "two unidirectional layers".
    for module in self.encoder.modules():
      if isinstance(module, nn.RNNBase):
        return module.bidirectional
    # No recurrent layer found: fall back to the shape heuristic.
    return hidden.size(0) >= 2 and hidden.size(0) % 2 == 0

  def _decoder_vocab_size(self):
    """Output vocabulary size, from `decoder.output_dim` or its `fc_out` layer."""
    size = getattr(self.decoder, "output_dim", None)
    if size is None:
      size = self.decoder.fc_out.out_features
    return size

  def forward(
    self,
    source_tokens,
    source_lengths,
    target_tokens=None,
    teacher_forcing_ratio=0.5,
    padding_index=None,
    sos_index=2,
    max_length=50,
  ):
    """Decode step by step and return the logits of every position.

    Args:
      source_tokens: batch-first source token ids.
      source_lengths: true source lengths, forwarded to the encoder.
      target_tokens: optional batch-first gold targets; when given, column 0
        is the first decoder input and teacher forcing may be applied.
      teacher_forcing_ratio: per-step probability of feeding the gold token.
      padding_index: id of the padding token (defaults to 0).
      sos_index: id fed as the first decoder input when no targets are given
        (2 is the position of "<sos>" in this notebook's special-token list).
      max_length: number of positions produced when no targets are given.

    Returns:
      Tensor of shape (batch, target_len, vocab); position 0 stays all zeros.
    """
    batch_size = source_tokens.size(0)

    # Default padding index is 0 when none is passed.
    if padding_index is None:
      padding_index = 0

    # --- ENCODER ---
    encoder_outputs, encoder_hidden_states = self.encoder(
      source_tokens, source_lengths
    )

    # Mask used by the attention module
    attention_mask = self.create_mask(source_tokens, padding_index)

    # --- NORMALISE THE ENCODER STATE FOR THE DECODER ---
    if isinstance(encoder_hidden_states, tuple):
      # LSTM encoder: keep the hidden state, drop the cell state
      encoder_hidden_tensor = encoder_hidden_states[0]
    else:
      encoder_hidden_tensor = encoder_hidden_states

    if self._encoder_is_bidirectional(encoder_hidden_tensor):
      # Last layer's forward and backward states, concatenated on features
      last_forward_state = encoder_hidden_tensor[-2, :, :]
      last_backward_state = encoder_hidden_tensor[-1, :, :]

      decoder_initial_hidden = torch.tanh(
        torch.cat((last_forward_state, last_backward_state), dim=1)
      ).unsqueeze(0)
    else:
      # Unidirectional (any depth): use the top layer's final state
      decoder_initial_hidden = encoder_hidden_tensor[-1, :, :].unsqueeze(0)

    if target_tokens is not None:
      target_length = target_tokens.size(1)
      # First decoder input is the first target column (<sos>)
      decoder_input_token = target_tokens[:, 0]
    else:
      target_length = max_length  # maximum length at inference time
      decoder_input_token = torch.full(
        (batch_size,), sos_index, dtype=torch.long, device=self.device
      )

    outputs = torch.zeros(
      batch_size, target_length, self._decoder_vocab_size(), device=self.device
    )

    decoder_hidden = decoder_initial_hidden

    for t in range(1, target_length):
      output_logits, decoder_hidden, attention_weights = self.decoder(
        decoder_input_token,
        decoder_hidden,
        encoder_outputs,
        mask=attention_mask
      )

      outputs[:, t, :] = output_logits

      # Teacher forcing is only possible when gold targets were provided
      use_teacher_forcing = (
        target_tokens is not None
        and torch.rand(1).item() < teacher_forcing_ratio
      )

      predicted_token = output_logits.argmax(1)

      decoder_input_token = (
        target_tokens[:, t] if use_teacher_forcing else predicted_token
      )

    return outputs


In [None]:
class AttentionEncoder(nn.Module):
  """Single-layer GRU encoder that also returns its per-position outputs.

  The baseline `Encoder` only returns its final LSTM states, but
  Seq2SeqWithAttention unpacks `(encoder_outputs, hidden)` from its encoder.
  """

  def __init__(self, vocab_size, embed_size, hidden_size, pad_idx):
    super().__init__()
    self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=pad_idx)
    self.rnn = nn.GRU(embed_size, hidden_size, batch_first=True)

  def forward(self, src, lengths):
    packed = nn.utils.rnn.pack_padded_sequence(
      self.embedding(src), lengths.cpu(), batch_first=True, enforce_sorted=True
    )
    packed_outputs, hidden = self.rnn(packed)
    # Pad back to the input width so the outputs line up with the source mask.
    outputs, _ = nn.utils.rnn.pad_packed_sequence(
      packed_outputs, batch_first=True, total_length=src.size(1)
    )
    return outputs, hidden


# Build the attention model from the hyper-parameters defined earlier in the
# notebook (INPUT_SIZE, OUTPUT_SIZE, ENC_EMB_SIZE, DEC_EMB_SIZE, HID_SIZE, DROPOUT).
encoder = AttentionEncoder(INPUT_SIZE, ENC_EMB_SIZE, HID_SIZE, PAD_IDX).to(DEVICE)
decoder = BahdanauDecoder(OUTPUT_SIZE, DEC_EMB_SIZE, HID_SIZE, HID_SIZE, DROPOUT).to(DEVICE)
# BahdanauDecoder creates its own attention module; expose it under the old name.
attention = decoder.attention
model = Seq2SeqWithAttention(encoder, decoder, DEVICE).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# NOTE(review): Seq2SeqWithAttention.forward takes (source_tokens, source_lengths,
# target_tokens), whereas train_epoch()/eval_epoch() call
# model(source, target, source_lengths). Those loops must pass the arguments in
# the new order (or by keyword) before they can train this model.