In [3]:
!pip install torch numpy nltk datasets matplotlib pydantic transformers outlines typing scikit-learn outlines



In [2]:
from google.colab import files

# Upload the file
uploaded = files.upload()
# This will open a file browser - select your eng-fra.txt file

# Then modify the path in your code to just use the filename
lines = open('eng-fra.txt', encoding='utf-8').read().strip().split('\n')

Saving eng-fra.txt to eng-fra (1).txt


In [4]:
import os
import time
import json
import csv
from typing import Literal
from pydantic import BaseModel, Field
from transformers import AutoModelForCausalLM, AutoTokenizer
import outlines
from datasets import load_dataset
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import numpy as np
import torch
# Check GPU info if available
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    print(f"CUDA Version: {torch.version.cuda}")

GPU: NVIDIA L4
GPU Memory: 23.8 GB
CUDA Version: 12.6


In [8]:
# please fix byself to make the corresponding packages work properly.
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import re
import random
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import time
import math
import numpy as np
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch.nn import (
    GRU,
    LSTM,
    TransformerEncoder,
    TransformerEncoderLayer,
    MultiheadAttention,
    TransformerDecoder,
    TransformerDecoderLayer,
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

SOS_token = 0
EOS_token = 1


class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS"}
        self.n_words = 2  # Count SOS and EOS

    def addSentence(self, sentence):
        for word in sentence.split(" "):
            self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1


# Turn a Unicode string to plain ASCII, thanks to
# https://stackoverflow.com/a/518232/2809427
def unicodeToAscii(s):
    return "".join(
        c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn"
    )


# Lowercase, trim, and remove non-letter characters
def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z!?]+", r" ", s)
    return s.strip()


def readLangs(lang1, lang2, reverse=False):
    print("Reading lines...")

    # Read the file and split into lines
    lines = (
        open("./%s-%s.txt" % (lang1, lang2), encoding="utf-8")
        .read()
        .strip()
        .split("\n")
    )

    # Split every line into pairs and normalize
    pairs = [[normalizeString(s) for s in l.split("\t")] for l in lines]

    # Reverse pairs, make Lang instances
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(lang2)
        output_lang = Lang(lang1)
    else:
        input_lang = Lang(lang1)
        output_lang = Lang(lang2)

    return input_lang, output_lang, pairs


MAX_LENGTH = 10

eng_prefixes = (
    "i am ",
    "i m ",
    "he is",
    "he s ",
    "she is",
    "she s ",
    "you are",
    "you re ",
    "we are",
    "we re ",
    "they are",
    "they re ",
)


def filterPair(p):
    return (
        len(p[0].split(" ")) < MAX_LENGTH
        and len(p[1].split(" ")) < MAX_LENGTH
        and p[1].startswith(eng_prefixes)
    )


def filterPairs(pairs):
    return [pair for pair in pairs if filterPair(pair)]


def prepareData(lang1, lang2, reverse=False):
    input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
    print("Read %s sentence pairs" % len(pairs))
    pairs = filterPairs(pairs)
    print("Trimmed to %s sentence pairs" % len(pairs))
    print("Counting words...")
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    print("Counted words:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)
    return input_lang, output_lang, pairs


input_lang, output_lang, pairs = prepareData("eng", "fra", True)
print(random.choice(pairs))


def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return "%dm %ds" % (m, s)


def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return "%s (- %s)" % (asMinutes(s), asMinutes(rs))


class PositionalEncoding(nn.Module):
    """Positional encoding to add position information to embeddings"""

    def __init__(self, hidden_size, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Create positional encoding matrix
        pe = torch.zeros(max_len, hidden_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, hidden_size, 2).float() * (-math.log(10000.0) / hidden_size)
        )

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # Shape: (1, max_len, hidden_size)

        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)


class Seq2SeqTransformer(nn.Module):
    """Sequence-to-Sequence Transformer model for machine translation"""

    def __init__(
        self,
        input_vocab_size,
        output_vocab_size,
        hidden_size=256,
        nhead=8,
        num_encoder_layers=3,
        num_decoder_layers=3,
        dim_feedforward=512,
        dropout=0.1,
    ):
        super(Seq2SeqTransformer, self).__init__()

        self.hidden_size = hidden_size
        self.nhead = nhead

        # Embedding layers
        self.src_embedding = nn.Embedding(input_vocab_size, hidden_size)
        self.tgt_embedding = nn.Embedding(output_vocab_size, hidden_size)

        # Positional encoding
        self.pos_encoder = PositionalEncoding(hidden_size, dropout)

        # Transformer encoder
        encoder_layer = TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_encoder_layers)

        # Transformer decoder
        decoder_layer = TransformerDecoderLayer(
            d_model=hidden_size,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_decoder = TransformerDecoder(decoder_layer, num_decoder_layers)

        # Output layer
        self.fc_out = nn.Linear(hidden_size, output_vocab_size)

        self.dropout = nn.Dropout(dropout)

        # Initialize weights
        self._init_weights()

    def _init_weights(self):
        """Initialize weights for better training"""
        initrange = 0.1
        self.src_embedding.weight.data.uniform_(-initrange, initrange)
        self.tgt_embedding.weight.data.uniform_(-initrange, initrange)
        self.fc_out.bias.data.zero_()
        self.fc_out.weight.data.uniform_(-initrange, initrange)

    def generate_square_subsequent_mask(self, sz):
        """Generate mask for decoder to prevent attending to future tokens"""
        mask = torch.triu(torch.ones(sz, sz), diagonal=1)
        mask = mask.masked_fill(mask == 1, float("-inf"))
        return mask

    def create_padding_mask(self, seq, pad_idx=0):
        """Create padding mask to ignore padding tokens"""
        return seq == pad_idx

    def forward(self, src, tgt, src_padding_mask=None, tgt_padding_mask=None):
        """
        Args:
            src: Source sequences (batch_size, src_seq_len)
            tgt: Target sequences (batch_size, tgt_seq_len)
            src_padding_mask: Mask for source padding (batch_size, src_seq_len)
            tgt_padding_mask: Mask for target padding (batch_size, tgt_seq_len)

        Returns:
            output: Predictions (batch_size, tgt_seq_len, output_vocab_size)
        """
        # Embed source and add positional encoding
        src_emb = self.src_embedding(src) * math.sqrt(self.hidden_size)
        src_emb = self.pos_encoder(src_emb)

        # Embed target and add positional encoding
        tgt_emb = self.tgt_embedding(tgt) * math.sqrt(self.hidden_size)
        tgt_emb = self.pos_encoder(tgt_emb)

        # Create target mask (causal mask)
        tgt_seq_len = tgt.size(1)
        tgt_mask = self.generate_square_subsequent_mask(tgt_seq_len).to(tgt.device)

        # Encode source
        memory = self.transformer_encoder(
            src_emb, src_key_padding_mask=src_padding_mask
        )

        # Decode
        output = self.transformer_decoder(
            tgt_emb,
            memory,
            tgt_mask=tgt_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=src_padding_mask,
        )

        # Project to vocabulary
        output = self.fc_out(output)

        return F.log_softmax(output, dim=-1)

    def encode(self, src, src_padding_mask=None):
        """Encode source sequence"""
        src_emb = self.src_embedding(src) * math.sqrt(self.hidden_size)
        src_emb = self.pos_encoder(src_emb)
        memory = self.transformer_encoder(
            src_emb, src_key_padding_mask=src_padding_mask
        )
        return memory

    def decode_step(self, tgt, memory, memory_padding_mask=None):
        # Embed target and add positional encoding
        tgt_emb = self.tgt_embedding(tgt) * math.sqrt(self.hidden_size)
        tgt_emb = self.pos_encoder(tgt_emb)

        # Create target mask (causal mask)
        tgt_seq_len = tgt.size(1)
        tgt_mask = self.generate_square_subsequent_mask(tgt_seq_len).to(tgt.device)

        # Decode
        output = self.transformer_decoder(
            tgt_emb,
            memory,
            tgt_mask=tgt_mask,
            memory_key_padding_mask=memory_padding_mask,
        )

        # Project to vocabulary
        output = self.fc_out(output)

        return F.log_softmax(output, dim=-1)


def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(" ")]


def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(1, -1)


def get_dataloader(batch_size):
    """Create dataloader for training"""
    input_lang, output_lang, pairs = prepareData("eng", "fra", True)

    n = len(pairs)
    input_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32)
    target_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32)

    for idx, (inp, tgt) in enumerate(pairs):
        inp_ids = indexesFromSentence(input_lang, inp)
        tgt_ids = indexesFromSentence(output_lang, tgt)
        inp_ids.append(EOS_token)
        tgt_ids.append(EOS_token)
        input_ids[idx, : len(inp_ids)] = inp_ids
        target_ids[idx, : len(tgt_ids)] = tgt_ids

    train_data = TensorDataset(
        torch.LongTensor(input_ids).to(device), torch.LongTensor(target_ids).to(device)
    )

    train_sampler = RandomSampler(train_data)
    train_dataloader = DataLoader(
        train_data, sampler=train_sampler, batch_size=batch_size
    )
    return input_lang, output_lang, train_dataloader


def train_epoch_transformer(
    dataloader, model, optimizer, criterion, clip=1.0, teacher_forcing_ratio=0.9
):
    """Train for one epoch with probabilistic teacher forcing (like AttnDecoderRNN)"""
    model.train()
    total_loss = 0

    for src, tgt in dataloader:
        optimizer.zero_grad()

        # Create padding masks
        src_padding_mask = src == 0

        # Encode source
        memory = model.encode(src, src_padding_mask)

        # Start with SOS token
        batch_size = src.size(0)
        decoder_input = torch.full(
            (batch_size, 1), SOS_token, dtype=torch.long, device=device
        )

        # Store outputs for loss calculation
        decoder_outputs = []
        max_target_len = tgt.size(1) - 1  # Exclude SOS token

        # Decode step by step (like AttnDecoderRNN)
        for i in range(max_target_len):
            # Decode one step
            decoder_output = model.decode_step(decoder_input, memory, src_padding_mask)

            # Get the last token prediction (batch_size, vocab_size)
            step_output = decoder_output[:, -1, :]
            decoder_outputs.append(step_output)

            # Decide whether to use teacher forcing (like in AttnDecoderRNN)
            if random.random() < teacher_forcing_ratio:
                # Teacher forcing: use ground truth as next input
                decoder_input = torch.cat(
                    [decoder_input, tgt[:, i].unsqueeze(1)], dim=1
                )
            else:
                # Use model's own prediction as next input
                _, topi = step_output.topk(1)
                next_token = topi.detach()
                decoder_input = torch.cat([decoder_input, next_token], dim=1)

        # Stack all outputs (batch_size, seq_len, vocab_size)
        output = torch.stack(decoder_outputs, dim=1)

        # Target output (exclude SOS token)
        tgt_output = tgt[:, 1:]

        # Calculate loss
        loss = criterion(
            output.contiguous().view(-1, output.size(-1)),
            tgt_output.contiguous().view(-1),
        )

        # Backward pass
        loss.backward()

        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)


def train_transformer(
    train_dataloader,
    model,
    n_epochs,
    learning_rate=0.0005,
    print_every=5,
    initial_teacher_forcing=0.95,
    final_teacher_forcing=0.5,
):
    """Train the transformer model with scheduled sampling"""
    start = time.time()
    plot_losses = []
    print_loss_total = 0

    optimizer = optim.Adam(
        model.parameters(), lr=learning_rate, betas=(0.9, 0.98), eps=1e-9
    )
    criterion = nn.NLLLoss(ignore_index=0)  # Ignore padding

    print("Starting training with scheduled sampling")
    print(f"Teacher forcing: {initial_teacher_forcing} -> {final_teacher_forcing}")

    for epoch in range(1, n_epochs + 1):
        # Gradually decrease teacher forcing ratio (scheduled sampling)
        teacher_forcing_ratio = initial_teacher_forcing - (
            (initial_teacher_forcing - final_teacher_forcing) * (epoch / n_epochs)
        )

        loss = train_epoch_transformer(
            train_dataloader,
            model,
            optimizer,
            criterion,
            teacher_forcing_ratio=teacher_forcing_ratio,
        )

        print_loss_total += loss

        if epoch % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print(
                f"{timeSince(start, epoch / n_epochs)} (Epoch {epoch} {epoch / n_epochs * 100:.0f}%) "
                f"Loss: {print_loss_avg:.4f} TF: {teacher_forcing_ratio:.2f}"
            )
            plot_losses.append(print_loss_avg)

    return plot_losses

    return plot_losses


def evaluate_transformer(
    model, sentence, input_lang, output_lang, max_length=MAX_LENGTH
):
    """Evaluate a single sentence"""
    model.eval()

    with torch.no_grad():
        # Prepare input
        input_tensor = tensorFromSentence(input_lang, sentence)

        # Encode
        src_padding_mask = input_tensor == 0
        memory = model.encode(input_tensor, src_padding_mask)

        # Start with SOS token
        decoder_input = torch.tensor([[SOS_token]], device=device)
        decoded_words = []

        for _ in range(max_length):
            # Decode using decode_step
            output = model.decode_step(
                decoder_input,
                memory,
                src_padding_mask,
            )

            # Get the last token prediction
            output_token = output[:, -1, :].argmax(dim=-1)

            if output_token.item() == EOS_token:
                decoded_words.append("<EOS>")
                break

            decoded_words.append(output_lang.index2word[output_token.item()])

            # Append to decoder input for next iteration
            decoder_input = torch.cat([decoder_input, output_token.unsqueeze(0)], dim=1)

    return decoded_words, None


def evaluateRandomly_transformer(model, pairs, input_lang, output_lang, n=10):
    """Evaluate random sentences"""
    for i in range(n):
        pair = random.choice(pairs)
        print(">", pair[0])
        print("=", pair[1])
        output_words, _ = evaluate_transformer(model, pair[0], input_lang, output_lang)
        output_sentence = " ".join(output_words)
        print("<", output_sentence)
        print("")


def evaluateDataset_transformer(model, input_lang, output_lang, pairs):
    """Evaluate the whole dataset and compute BLEU score with smoothing"""
    from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

    smooth = SmoothingFunction()
    total_bleu_score = 0
    model.eval()

    for pair in pairs:
        reference = [pair[1].split(" ")]
        output_words, _ = evaluate_transformer(model, pair[0], input_lang, output_lang)

        # Remove <EOS> token
        if output_words and output_words[-1] == "<EOS>":
            output_words = output_words[:-1]

        candidate = output_words

        # Use smoothing to handle zero n-gram counts
        bleu_score = sentence_bleu(
            reference, candidate, weights=(0.5, 0.5), smoothing_function=smooth.method4
        )
        total_bleu_score += bleu_score

    average_bleu_score = total_bleu_score / len(pairs)
    return average_bleu_score


if __name__ == "__main__":
    print("=" * 80)
    print("Transformer-based Machine Translation: French to English")
    print("=" * 80)

    # IMPROVED Hyperparameters
    hidden_size = 128
    nhead = 4
    num_encoder_layers = 3
    num_decoder_layers = 3
    dim_feedforward = 256
    dropout = 0.3
    batch_size = 64
    n_epochs = 50
    learning_rate = 0.0003
    initial_teacher_forcing = 0.8
    final_teacher_forcing = 0.5

    print(f"\nHyperparameters:")
    print(f"  Hidden size: {hidden_size}")
    print(f"  Number of heads: {nhead}")
    print(f"  Encoder layers: {num_encoder_layers}")
    print(f"  Decoder layers: {num_decoder_layers}")
    print(f"  Feedforward dim: {dim_feedforward}")
    print(f"  Dropout: {dropout}")
    print(f"  Batch size: {batch_size}")
    print(f"  Epochs: {n_epochs}")
    print(f"  Learning rate: {learning_rate}")
    print(f"  Teacher forcing: {initial_teacher_forcing} -> {final_teacher_forcing}")
    print(f"  Device: {device}")

    # Load data
    print("\nLoading data...")
    input_lang, output_lang, train_dataloader = get_dataloader(batch_size)

    # Initialize model
    print("\nInitializing Transformer model...")
    model = Seq2SeqTransformer(
        input_vocab_size=input_lang.n_words,
        output_vocab_size=output_lang.n_words,
        hidden_size=hidden_size,
        nhead=nhead,
        num_encoder_layers=num_encoder_layers,
        num_decoder_layers=num_decoder_layers,
        dim_feedforward=dim_feedforward,
        dropout=dropout,
    ).to(device)

    # Count parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")

    # Train
    print("\n" + "=" * 80)
    plot_losses = train_transformer(
        train_dataloader,
        model,
        n_epochs,
        learning_rate,
        print_every=5,
        initial_teacher_forcing=initial_teacher_forcing,
        final_teacher_forcing=final_teacher_forcing,
    )

    # Evaluate
    print("\n" + "=" * 80)
    print("Evaluation on random samples:")
    print("=" * 80)
    evaluateRandomly_transformer(model, pairs, input_lang, output_lang, n=10)

    # Calculate BLEU score
    print("\n" + "=" * 80)
    print("Computing BLEU score on full dataset...")
    bleu_score = evaluateDataset_transformer(model, input_lang, output_lang, pairs)
    print(f"BLEU score: {bleu_score:.4f}")
    print("=" * 80)

    # Save model
    print("\nSaving model...")
    torch.save(
        {
            "model_state_dict": model.state_dict(),
            "input_lang": input_lang,
            "output_lang": output_lang,
            "hyperparameters": {
                "hidden_size": hidden_size,
                "nhead": nhead,
                "num_encoder_layers": num_encoder_layers,
                "num_decoder_layers": num_decoder_layers,
                "dim_feedforward": dim_feedforward,
                "dropout": dropout,
            },
        },
        "transformer_translation_model.pt",
    )
    print("Model saved to 'transformer_translation_model.pt'")
    print("\nTraining complete!")


Reading lines...
Read 135842 sentence pairs
Trimmed to 11445 sentence pairs
Counting words...
Counted words:
fra 4601
eng 2991
['elles sont presque la', 'they re almost here']
Transformer-based Machine Translation: French to English

Hyperparameters:
  Hidden size: 128
  Number of heads: 4
  Encoder layers: 3
  Decoder layers: 3
  Feedforward dim: 256
  Dropout: 0.3
  Batch size: 64
  Epochs: 50
  Learning rate: 0.0003
  Teacher forcing: 0.8 -> 0.5
  Device: cuda

Loading data...
Reading lines...
Read 135842 sentence pairs
Trimmed to 11445 sentence pairs
Counting words...
Counted words:
fra 4601
eng 2991

Initializing Transformer model...
Total parameters: 2,351,407
Trainable parameters: 2,351,407

Starting training with scheduled sampling
Teacher forcing: 0.8 -> 0.5
1m 56s (- 17m 25s) (Epoch 5 10%) Loss: 3.8402 TF: 0.77
3m 52s (- 15m 30s) (Epoch 10 20%) Loss: 2.7667 TF: 0.74
5m 48s (- 13m 33s) (Epoch 15 30%) Loss: 2.2220 TF: 0.71
7m 44s (- 11m 37s) (Epoch 20 40%) Loss: 1.8343 TF: 0.68

  output = torch._nested_tensor_from_mask(


< m not an early morning <EOS>

> je suis mecontent
= i m unhappy
< m unhappy <EOS>

> vous prechez des convaincus
= you re preaching to the choir
< re preaching to the choir <EOS>

> nous regrettons de ne pas pouvoir vous aider
= we re sorry we can t help you
< re sorry we can t help you <EOS>

> je vais etudier le francais l annee prochaine
= i m going to study french next year
< m going to french french next year year <EOS>

> je suis depourvu de talent
= i m untalented
< m untalented <EOS>

> vous regardez tout le temps la television
= you are always watching tv
< are always watching tv <EOS>

> vous etes tres effrontes
= you re very forward
< re very forward <EOS>

> il est assez age pour conduire une voiture
= he is old enough to drive a car
< is old enough to drive car <EOS>

> je me sens plutot fatigue
= i m feeling sort of tired
< m feeling sort tired tired <EOS>


Computing BLEU score on full dataset...
BLEU score: 0.6074

Saving model...
Model saved to 'transformer_translati