<a href="https://colab.research.google.com/github/doduyquy/NLP_Machine-Translation-LSTM/blob/main/Load_file_pth_and_translate.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load a checkpoint file (.pth) and translate (en --> fr)


# 1. Mount Google Drive


In [1]:
# Mount Google Drive so the files stored there are reachable under /content/drive/
from google.colab import drive
drive.mount('/content/drive/')

# Check that the model checkpoint and the dataset folder exist
import os
model_path = "/content/drive/My Drive/021.SGU/01.Subjects/NLP/[best_model]0-316_en-fr_Luong-Attention+Beam.pth"
dataset_path = "/content/drive/My Drive/021.SGU/01.Subjects/NLP/Dataset/"

# Report availability up front so a wrong path is noticed before the later cells run
print(f"Model exists: {os.path.exists(model_path)}")
print(f"Dataset folder exists: {os.path.exists(dataset_path)}")

Mounted at /content/drive/
Model exists: True
Dataset folder exists: True


# 2. Install Dependencies


In [2]:
# Install spaCy plus the English and French pipelines that the tokenizer cell loads.
# NOTE(review): no versions are pinned here, so tokenization could drift from what
# was used when the checkpoint was trained — consider pinning.
!pip install spacy --quiet
!python -m spacy download en_core_web_sm --quiet
!python -m spacy download fr_core_news_sm --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m95.9 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.3/16.3 MB[0m [31m95.8 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('fr_core_news_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


# 3. Import Libraries

In [3]:
import torch
import torch.nn as nn
import spacy
from collections import Counter

# Pick the GPU when one is visible to PyTorch, otherwise run on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")
# `device` is "cuda" exactly when CUDA was reported available above
if device.type == "cuda":
    print(f"GPU: {torch.cuda.get_device_name(0)}")

Device: cpu


# 4. Define Constants and Special Tokens


In [4]:
# Special tokens (Vocab prepends these, in this order, to every vocabulary)
SPECIAL_TOKENS_CONST = ["<pad>", "<unk>", "<sos>", "<eos>"]
# Default cap on how many of the most frequent words Vocab keeps (special tokens not counted)
VOCAB_MAX_SIZE_CONST = 10000

# Model hyperparameters (same as at training time)
LSTM_NUM_LAYERS = 2
LSTM_DROPOUT = 0.3
LSTM_EMBED_DIM = 256
LSTM_HIDDEN_DIM = 512

# Paths (upper-case aliases of the variables set in the Drive-mount cell)
MODEL_PATH = model_path
DATASET_PATH = dataset_path

# 5. Load Tokenizers


In [None]:
# Load spaCy's pre-trained English and French pipelines.
# Only their `.tokenizer` component is used by the tokenizer functions in this cell.
nlp_en = spacy.load("en_core_web_sm")
nlp_fr = spacy.load("fr_core_news_sm")

def en_tokenizer(text):
    """Split English text with the spaCy tokenizer and return lower-cased token strings."""
    lowered = []
    for token in nlp_en.tokenizer(text):
        lowered.append(token.text.lower())
    return lowered

def fr_tokenizer(text):
    """Split French text with the spaCy tokenizer and return lower-cased token strings."""
    lowered = []
    for token in nlp_fr.tokenizer(text):
        lowered.append(token.text.lower())
    return lowered

# Smoke-test both tokenizers on one short sentence each and print the resulting token lists
print("EN tokenizer test:", en_tokenizer("Hello, how are you?"))
print("FR tokenizer test:", fr_tokenizer("Bonjour, comment allez-vous?"))

# 6. Build Vocabulary Class


In [None]:
class Vocab:
    """Frequency-capped vocabulary with string<->index lookup tables.

    Attributes:
        freq: Counter of every token seen in the input stream.
        itos: index -> token list; the special tokens come first, followed by
            the most frequent tokens in `Counter.most_common` order.
        stoi: token -> index dict built from `itos`.
    """

    def __init__(self, tokens, max_size=VOCAB_MAX_SIZE_CONST):
        """Count `tokens` (any iterable of strings) and keep the `max_size` most common."""
        self.freq = Counter(tokens)

        # Ranking (including tie order) is whatever Counter.most_common returns
        kept_words = []
        for word, _count in self.freq.most_common(max_size):
            kept_words.append(word)

        # Special tokens occupy the first indices
        self.itos = SPECIAL_TOKENS_CONST + kept_words
        self.stoi = dict(zip(self.itos, range(len(self.itos))))

    def numericalize(self, tokens):
        """Map each token to its index; tokens not in the vocabulary map to the '<unk>' index."""
        unk_index = self.stoi.get('<unk>')
        indices = []
        for tok in tokens:
            indices.append(self.stoi.get(tok, unk_index))
        return indices

    def __len__(self):
        """Number of entries in the vocabulary, special tokens included."""
        return len(self.itos)

# 7. Load Training Data and Build Vocabularies


In [None]:
def load_file(path):
    """Read a UTF-8 text file and return its lines with surrounding whitespace stripped.

    Blank lines are kept (as empty strings) so line numbers stay aligned
    between files read with this function.
    """
    stripped_lines = []
    with open(path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            stripped_lines.append(raw_line.strip())
    return stripped_lines

# Load training data (one sentence per line in each file)
train_en = load_file(f"{DATASET_PATH}/train.en")
train_fr = load_file(f"{DATASET_PATH}/train.fr")

print(f"Loaded {len(train_en)} English sentences")
print(f"Loaded {len(train_fr)} French sentences")

# Tokenize training data: each sentence becomes a list of lower-cased tokens
print("\nTokenizing training data...")
train_en_tok = [en_tokenizer(sentence) for sentence in train_en]
train_fr_tok = [fr_tokenizer(sentence) for sentence in train_fr]
print("Tokenization complete!")

# Build vocabularies from the flattened token streams.
# NOTE(review): the vocabularies are rebuilt from the corpus rather than loaded, so the
# resulting indices presumably have to match those used when the checkpoint was trained
# (same files, same tokenizer versions, same order) — confirm.
print("\nBuilding vocabularies...")
vocab_en = Vocab(tok for sentence in train_en_tok for tok in sentence)
vocab_fr = Vocab(tok for sentence in train_fr_tok for tok in sentence)

print(f"English vocabulary size: {len(vocab_en)}")
print(f"French vocabulary size: {len(vocab_fr)}")

# Get special token indices (looked up in the French, i.e. target-side, vocabulary)
PAD_IDX = vocab_fr.stoi.get('<pad>')
SOS_IDX = vocab_fr.stoi.get('<sos>')
EOS_IDX = vocab_fr.stoi.get('<eos>')

print(f"\nSpecial token indices:")
print(f"  PAD_IDX: {PAD_IDX}")
print(f"  SOS_IDX: {SOS_IDX}")
print(f"  EOS_IDX: {EOS_IDX}")

# 8. Define Model Architecture


In [None]:
class LuongAttention(nn.Module):
    """Dot-product attention between the decoder's top-layer hidden state and the encoder outputs.

    The module holds no learnable parameters; `hidden_dim` is only stored.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim

    def forward(self, decoder_hidden, encoder_outputs):
        """Return attention weights over source positions.

        Args:
            decoder_hidden: [num_layers, batch, hidden]; only the last layer is used.
            encoder_outputs: [src_len, batch, hidden]

        Returns:
            Tensor [batch, src_len]; each row is a softmax distribution.
        """
        # Last layer's state as a column vector per batch item: [batch, hidden, 1]
        query = decoder_hidden[-1].unsqueeze(-1)
        # Batch-major view of the encoder states: [batch, src_len, hidden]
        keys = encoder_outputs.transpose(0, 1)

        # score[b, s] = <encoder_outputs[s, b], decoder_hidden[-1, b]>
        scores = torch.bmm(keys, query).squeeze(-1)  # [batch, src_len]

        return scores.softmax(dim=1)


class Encoder(nn.Module):
    """Embedding layer followed by a multi-layer LSTM over a padded source batch."""

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=LSTM_NUM_LAYERS, dropout=LSTM_DROPOUT):
        super().__init__()
        # Attribute names double as state_dict key prefixes, so they must not change
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
        )

    def forward(self, src, src_lens):
        """Encode a batch of token indices.

        Args:
            src: token indices laid out [src_len, batch] (the LSTM is not batch_first).
            src_lens: per-sequence lengths; moved to CPU for packing. Because
                `enforce_sorted=True`, sequences must be ordered by decreasing length.

        Returns:
            (outputs, hidden, cell): the padded per-step LSTM outputs and the
            final hidden and cell states.
        """
        rnn_utils = nn.utils.rnn

        # Pack so the LSTM skips padded positions
        packed_input = rnn_utils.pack_padded_sequence(
            self.embedding(src),
            src_lens.cpu(),
            enforce_sorted=True,
        )
        packed_output, final_state = self.lstm(packed_input)
        hidden, cell = final_state

        # Back to a padded tensor; the returned lengths are not needed
        outputs = rnn_utils.pad_packed_sequence(packed_output)[0]

        return outputs, hidden, cell


class Decoder(nn.Module):
    """Single-step LSTM decoder with dot-product attention over the encoder outputs."""

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=LSTM_NUM_LAYERS, dropout=LSTM_DROPOUT):
        super().__init__()
        # Attribute names double as state_dict key prefixes, so they must not change
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
        )
        self.attention = LuongAttention(hidden_dim)
        # The projection sees [LSTM output ; context], hence 2 * hidden_dim inputs
        self.fc = nn.Linear(hidden_dim * 2, vocab_size)

    def forward(self, input, hidden, cell, encoder_outputs):
        """Run one decoding step.

        Args:
            input: previous target token indices, shape [batch].
            hidden, cell: LSTM state carried over from the previous step (or the encoder).
            encoder_outputs: [src_len, batch, hidden]

        Returns:
            (prediction, hidden, cell, attn_weights) where prediction holds the
            unnormalised vocabulary scores [batch, vocab_size] and attn_weights
            is [batch, src_len].
        """
        # Add a length-1 time axis so the LSTM sees a one-step sequence
        step_embedded = self.embedding(input.unsqueeze(0))
        lstm_out, (hidden, cell) = self.lstm(step_embedded, (hidden, cell))

        # Attention is computed from the updated hidden state
        attn_weights = self.attention(hidden, encoder_outputs)

        # Weighted sum of encoder states: [batch, 1, src_len] x [batch, src_len, hidden]
        memory = encoder_outputs.permute(1, 0, 2)
        context = torch.bmm(attn_weights.unsqueeze(1), memory).squeeze(1)

        features = torch.cat((lstm_out.squeeze(0), context), dim=1)

        return self.fc(features), hidden, cell, attn_weights


class Seq2Seq(nn.Module):
    """Wrapper that runs the encoder once and then the decoder step by step."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, src_lens, teacher_forcing=0.5):
        """Decode `trg.size(0) - 1` steps, starting from the first row of `trg`.

        Args:
            src: source token indices, passed straight to the encoder.
            trg: target token indices [trg_len, batch]; only row 0 (the start
                tokens) and the shape are read.
            src_lens: source lengths, passed straight to the encoder.
            teacher_forcing: accepted for interface compatibility but not used
                here — every step feeds back the decoder's own argmax.

        Returns:
            Tensor [trg_len, batch, vocab_size] of decoder scores; row 0 stays zero.
        """
        max_len, batch_size = trg.size(0), trg.size(1)
        vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(max_len, batch_size, vocab_size, device=src.device)

        encoder_outputs, hidden, cell = self.encoder(src, src_lens)

        prev_token = trg[0]
        for timestep in range(1, max_len):
            step_scores, hidden, cell, _ = self.decoder(prev_token, hidden, cell, encoder_outputs)
            outputs[timestep] = step_scores
            # Greedy feedback: next input is the highest-scoring token
            prev_token = step_scores.argmax(dim=1)

        return outputs

# 9. Initialize and Load Model Weights

Create the model instances and load the weights from the saved .pth file.

In [None]:
# Vocabulary sizes fix the embedding/output dimensions, so they must equal the
# sizes the checkpoint was trained with or load_state_dict will fail.
input_dim = len(vocab_en)
output_dim = len(vocab_fr)

print(f"Input dimension (EN vocab): {input_dim}")
print(f"Output dimension (FR vocab): {output_dim}")

# Create model instances (dropout is left at the class default)
encoder = Encoder(
    vocab_size=input_dim,
    embed_dim=LSTM_EMBED_DIM,
    hidden_dim=LSTM_HIDDEN_DIM,
    num_layers=LSTM_NUM_LAYERS
).to(device)

decoder = Decoder(
    vocab_size=output_dim,
    embed_dim=LSTM_EMBED_DIM,
    hidden_dim=LSTM_HIDDEN_DIM,
    num_layers=LSTM_NUM_LAYERS
).to(device)

model = Seq2Seq(encoder, decoder).to(device)

# Load saved weights
print(f"\nLoading model from: {MODEL_PATH}")
# torch.load unpickles the file; weights_only=True restricts that to tensors and
# plain containers so a tampered checkpoint cannot execute arbitrary code.
# map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
model.load_state_dict(torch.load(MODEL_PATH, map_location=device, weights_only=True))
# Inference mode: disables dropout
model.eval()

print("[OK] Model loaded successfully!")

# 10. Define Translation Functions


In [None]:
def translate(sentence, max_len=50):
    """Translate one English sentence into French with greedy decoding.

    Args:
        sentence: raw English text; it is tokenized with `en_tokenizer`.
        max_len: maximum number of target tokens to generate.

    Returns:
        The generated French tokens joined by single spaces. Generation stops
        at the first '<eos>' (which is not included) or after `max_len` steps.
    """
    model.eval()

    # Source indices: <sos> + tokens (unknown words -> <unk>) + <eos>
    ids = [vocab_en.stoi["<sos>"]]
    for word in en_tokenizer(sentence):
        ids.append(vocab_en.stoi.get(word, vocab_en.stoi["<unk>"]))
    ids.append(vocab_en.stoi["<eos>"])

    # Column vector [src_len, 1]: a batch containing just this sentence
    src = torch.tensor(ids).unsqueeze(1).to(device)
    src_lens = torch.tensor([len(ids)]).to(device)

    eos_id = vocab_fr.stoi["<eos>"]
    generated = []

    with torch.no_grad():
        encoder_outputs, hidden, cell = model.encoder(src, src_lens)

        prev_tok = torch.tensor([vocab_fr.stoi["<sos>"]]).to(device)
        for _step in range(max_len):
            scores, hidden, cell, _attn = model.decoder(prev_tok, hidden, cell, encoder_outputs)

            best_id = scores.argmax(1).item()
            if best_id == eos_id:
                break

            generated.append(best_id)
            # Feed the chosen token back in as the next decoder input
            prev_tok = torch.tensor([best_id]).to(device)

    return " ".join(map(vocab_fr.itos.__getitem__, generated))


def translate_beam(sentence, beam_width=5, max_len=50):
    """Translate one English sentence into French with beam search.

    Hypotheses are scored by the sum of their token log-probabilities (no
    length normalisation). A hypothesis that ends in '<eos>' is moved to the
    completed pool on the following step; whatever is still in the beam when
    the search stops is added to the pool as well, and the best-scoring entry
    is returned.

    Args:
        sentence: raw English text; it is tokenized with `en_tokenizer`.
        beam_width: number of hypotheses kept per step, and number of
            candidate tokens expanded per hypothesis. Values larger than the
            target vocabulary are clamped to the vocabulary size.
        max_len: maximum number of decoding steps.

    Returns:
        The best hypothesis as space-joined French tokens with every '<sos>'
        and '<eos>' removed, or "" if no hypothesis was produced.
    """
    model.eval()

    # Tokenize
    tokens = en_tokenizer(sentence)

    # Source indices: <sos> + tokens (unknown words -> <unk>) + <eos>
    ids = (
        [vocab_en.stoi["<sos>"]] +
        [vocab_en.stoi.get(t, vocab_en.stoi["<unk>"]) for t in tokens] +
        [vocab_en.stoi["<eos>"]]
    )

    # Column vector [src_len, 1]: a batch containing just this sentence
    src = torch.tensor(ids).unsqueeze(1).to(device)
    src_lens = torch.tensor([len(ids)]).to(device)

    # Look the target-side special tokens up once instead of on every comparison
    sos_id = vocab_fr.stoi["<sos>"]
    eos_id = vocab_fr.stoi["<eos>"]

    with torch.no_grad():
        encoder_outputs, hidden, cell = model.encoder(src, src_lens)

    # Each beam entry: (cumulative log-prob, token ids so far, hidden, cell)
    beams = [(0.0, [sos_id], hidden, cell)]
    completed_hypotheses = []

    for _step in range(max_len):
        next_beams = []

        for log_prob, tokens_seq, h, c in beams:
            # Finished hypotheses leave the beam and are not expanded further
            if tokens_seq[-1] == eos_id:
                completed_hypotheses.append((log_prob, tokens_seq))
                continue

            input_tok = torch.tensor([tokens_seq[-1]]).to(device)

            with torch.no_grad():
                pred, h_new, c_new, _attn = model.decoder(input_tok, h, c, encoder_outputs)
                log_probs = torch.log_softmax(pred, dim=1)

            # torch.topk raises if k exceeds the dimension size, so never ask
            # for more candidates than there are vocabulary entries.
            k = min(beam_width, log_probs.size(1))
            top_log_probs, top_indices = torch.topk(log_probs[0], k)

            for token_log_prob, token_id in zip(top_log_probs.tolist(), top_indices.tolist()):
                # All children of one hypothesis share the same new LSTM state
                next_beams.append((log_prob + token_log_prob, tokens_seq + [token_id], h_new, c_new))

        # Keep the beam_width best candidates (stable sort: ties keep insertion order)
        next_beams.sort(key=lambda x: x[0], reverse=True)
        beams = next_beams[:beam_width]

        if not beams or all(seq[-1] == eos_id for _, seq, _, _ in beams):
            break

    # Whatever is left in the beam (finished or cut off by max_len) also competes
    for log_prob, tokens_seq, _, _ in beams:
        completed_hypotheses.append((log_prob, tokens_seq))

    if not completed_hypotheses:
        return ""

    completed_hypotheses.sort(key=lambda x: x[0], reverse=True)
    best_log_prob, best_tokens = completed_hypotheses[0]

    output_tokens = [
        vocab_fr.itos[idx]
        for idx in best_tokens
        if idx != sos_id and idx != eos_id
    ]

    return " ".join(output_tokens)

print("[OK] Translation functions defined!")

# 11. Test Translation with a Sample Sentence

In [None]:
# Compare greedy decoding with beam search at different beam widths on one sample sentence
test_sentence = "child sits on street on a busy street . "

print(f"English: {test_sentence}")
print("-" * 60)
print(f"Greedy:    {translate(test_sentence)}")

# One output line per beam width so the translations can be compared side by side
for bw in [3, 5]:
    result = translate_beam(test_sentence, beam_width=bw)
    print(f"Beam-{bw:2d}:   {result}")

# The end.