<a href="https://colab.research.google.com/github/ayushmanlohani/Neural-translator-eng-fr-/blob/main/Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.nn import functional as F
from tqdm import tqdm
import csv
import re
import numpy as np
import math
from collections import defaultdict
import sys

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# --- Tokenizer Functions ---
def basic_tokenize(text):
    text = text.lower()
    text = re.sub(r'([.,!?;])', r' \1 ', text)
    text = re.sub(r'(["\'])', r' \1 ', text)
    text = re.sub(r'[^a-z0-9.,!?;\'\" ]', ' ', text)
    text = re.sub(r'\s+', ' ', text)
    text = text.strip()
    return text.split()

def create_vocabulary(tokens_list, min_frequency=2):
    token_counts = defaultdict(int)
    for tokens in tokens_list:
        for token in tokens:
            token_counts[token] += 1
    vocab = {
        '<pad>': 0,
        '<unk>': 1,
        '<sos>': 2,
        '<eos>': 3
    }

    token_idx = len(vocab)
    for token, count in token_counts.items():
        if count >= min_frequency:
            vocab[token] = token_idx
            token_idx += 1
    return vocab

# --- Data Loading and Vocab Building ---
def load_and_process_data(path):
    eng_sentences = []
    fr_sentences = []
    eng_tokens_list = []
    fr_tokens_list = []

    print("Loading CSV and tokenizing...")
    with open(path, 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)  # Skip header
        for row in csv_reader:
            if len(row) >= 2:
                # Store raw text
                eng_sentences.append(row[0])
                fr_sentences.append(row[1])

                # Store tokens for vocab building
                eng_tokens_list.append(basic_tokenize(row[0]))
                fr_tokens_list.append(basic_tokenize(row[1]))

    print(f"Loaded {len(eng_sentences)} pairs.")

    # Build Vocabularies in-memory
    print("Building English Vocabulary...")
    eng_vocab = create_vocabulary(eng_tokens_list, min_frequency=2)
    print("Building French Vocabulary...")
    fr_vocab = create_vocabulary(fr_tokens_list, min_frequency=2)

    print(f"English Vocab Size: {len(eng_vocab)}")
    print(f"French Vocab Size: {len(fr_vocab)}")

    return eng_sentences, fr_sentences, eng_vocab, fr_vocab

# Execute Loading
try:
    eng_sentences, fr_sentences, eng_vocab, fr_vocab = load_and_process_data('eng_french.csv')
except FileNotFoundError:
    print("Error: 'eng_french.csv' not found. Please upload the file to Colab.")

Using device: cuda
Loading CSV and tokenizing...
Loaded 175621 pairs.
Building English Vocabulary...
Building French Vocabulary...
English Vocab Size: 9782
French Vocab Size: 13478


In [None]:
from torch.utils.data import Dataset, DataLoader
import numpy as np

# --- Function for Tokenization ---
def tokenize_and_pad(text, vocab, max_length=128):
    """
    Converts text to list of indices, adds SOS/EOS, and pads to max_length.

    """
    # Tokenize using the same basic_tokenize function
    tokens = basic_tokenize(text)

    encoded = [vocab['<sos>']] + \
              [vocab.get(token, vocab['<unk>']) for token in tokens] + \
              [vocab['<eos>']]

    # Truncate if too long (account for SOS/EOS)
    if len(encoded) > max_length:
        encoded = encoded[:max_length]

    # Pad with zeros (vocab['<pad>'] is 0)
    padding = [vocab['<pad>']] * (max_length - len(encoded))
    encoded = encoded + padding

    return torch.tensor(encoded, dtype=torch.long)

# --- Dataset Class ---
class TranslationDataset(Dataset):
    def __init__(self, src_sentences, tgt_sentences, src_vocab, tgt_vocab, max_length=128):
        self.src_sentences = src_sentences
        self.tgt_sentences = tgt_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_length = max_length

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self, idx):
        src_text = self.src_sentences[idx]
        tgt_text = self.tgt_sentences[idx]

        src_ids = tokenize_and_pad(src_text, self.src_vocab, self.max_length)
        tgt_ids = tokenize_and_pad(tgt_text, self.tgt_vocab, self.max_length)

        return {
            'src_ids': src_ids,
            'tgt_ids': tgt_ids,
            'src_text': src_text,
            'tgt_text': tgt_text
        }

# --- Split Data and Create Loaders ---

# 1. Shuffle indices
indices = list(range(len(eng_sentences)))
np.random.seed(42) # Fixed seed for reproducibility
np.random.shuffle(indices)

# 2. Reorder lists
eng_sentences_shuffled = [eng_sentences[i] for i in indices]
fr_sentences_shuffled = [fr_sentences[i] for i in indices]

# 3. 90/10 Split
split_idx = int(len(eng_sentences) * 0.9)

train_eng = eng_sentences_shuffled[:split_idx]
train_fr = fr_sentences_shuffled[:split_idx]
val_eng = eng_sentences_shuffled[split_idx:]
val_fr = fr_sentences_shuffled[split_idx:]

print(f"Training samples: {len(train_eng)}")
print(f"Validation samples: {len(val_eng)}")

# 4. Create Datasets
train_dataset = TranslationDataset(train_eng, train_fr, eng_vocab, fr_vocab)
val_dataset = TranslationDataset(val_eng, val_fr, eng_vocab, fr_vocab)

# 5. Create DataLoaders
batch_size = 64 # Reduced slightly to ensure stability
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

# Verify a single batch
sample_batch = next(iter(train_dataloader))
print(f"Batch shape src: {sample_batch['src_ids'].shape}")
print(f"Batch shape tgt: {sample_batch['tgt_ids'].shape}")

Training samples: 158058
Validation samples: 17563
Batch shape src: torch.Size([64, 128])
Batch shape tgt: torch.Size([64, 128])


In [None]:
import torch.nn as nn
import math

# --- Configuration Class ---
class TransformerConfig:
    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        block_size=128,      # Matches dataset max_length
        n_layer=6,           # Encoder layers
        n_pre_cross_layer=3, # Decoder layers before cross-attention
        n_cross_layer=3,     # Decoder layers with cross-attention
        n_embd=256,
        num_heads=8,
        dropout=0.1
    ):
        self.src_vocab_size = src_vocab_size
        self.tgt_vocab_size = tgt_vocab_size
        self.block_size = block_size
        self.n_layer = n_layer
        self.n_pre_cross_layer = n_pre_cross_layer
        self.n_cross_layer = n_cross_layer
        self.n_embd = n_embd
        self.num_heads = num_heads
        self.dropout = dropout

# --- Sub-components ---

class FeedForward(nn.Module):

    """ Standard Feed Forward Layer """

    def __init__(self, config):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.ReLU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
            nn.Dropout(config.dropout),
        )

    def forward(self, x):
        return self.net(x)

class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.n_embd % config.num_heads == 0
        self.num_heads = config.num_heads
        self.head_size = config.n_embd // config.num_heads
        self.n_embd = config.n_embd

        self.q_proj = nn.Linear(config.n_embd, config.n_embd)
        self.k_proj = nn.Linear(config.n_embd, config.n_embd)
        self.v_proj = nn.Linear(config.n_embd, config.n_embd)
        self.out_proj = nn.Linear(config.n_embd, config.n_embd)

        self.dropout = nn.Dropout(config.dropout)

    def forward(self, q, k=None, v=None, mask=None, is_causal=False):
        batch_size = q.size(0)

        # If k, v are None, this is self-attention (use q)
        if k is None: k = q
        if v is None: v = q

        q_out = self.q_proj(q)
        k_out = self.k_proj(k)
        v_out = self.v_proj(v)

        # Reshape for multi-head
        # (B, T, num_heads, head_size) -> (B, num_heads, T, head_size)
        q_out = q_out.view(batch_size, -1, self.num_heads, self.head_size).transpose(1, 2)
        k_out = k_out.view(batch_size, -1, self.num_heads, self.head_size).transpose(1, 2)
        v_out = v_out.view(batch_size, -1, self.num_heads, self.head_size).transpose(1, 2)

        # Attention scores
        scores = (q_out @ k_out.transpose(-2, -1)) / math.sqrt(self.head_size)

        if is_causal:
            seq_len = q_out.size(-2)
            causal_mask = torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool, device=q.device), diagonal=1)
            scores.masked_fill_(causal_mask, float('-inf'))

        if mask is not None:
            # Mask shape handling
            if mask.dim() == 3: mask = mask.unsqueeze(1) # Add head dimension
            scores.masked_fill_(~mask, float('-inf'))

        attn = F.softmax(scores, dim=-1)
        attn = self.dropout(attn)

        out = attn @ v_out

        # Restore shape
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.n_embd)
        out = self.out_proj(out)
        return out

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.attn = MultiHeadAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.ffwd = FeedForward(config)

    def forward(self, x, mask=None, is_causal=False):
        x = x + self.attn(self.ln1(x), mask=mask, is_causal=is_causal)
        x = x + self.ffwd(self.ln2(x))
        return x

class CrossAttentionBlock(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.self_attn = MultiHeadAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.cross_attn = MultiHeadAttention(config)
        self.ln3 = nn.LayerNorm(config.n_embd)
        self.ffwd = FeedForward(config)

    def forward(self, x, enc_out, self_mask=None, cross_mask=None):
        # Self attention (Causal)
        x = x + self.self_attn(self.ln1(x), mask=self_mask, is_causal=True)
        # Cross attention
        x = x + self.cross_attn(q=self.ln2(x), k=enc_out, v=enc_out, mask=cross_mask)
        # Feed Forward
        x = x + self.ffwd(self.ln3(x))
        return x

class Encoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.blocks = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)

    def forward(self, x, mask=None):
        for block in self.blocks:
            x = block(x, mask=mask)
        return self.ln_f(x)

class Decoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.pre_blocks = nn.ModuleList([Block(config) for _ in range(config.n_pre_cross_layer)])
        self.cross_blocks = nn.ModuleList([CrossAttentionBlock(config) for _ in range(config.n_cross_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)

    def forward(self, x, enc_out, padding_mask=None, cross_mask=None):
        # Pre-cross blocks (causal only)
        for block in self.pre_blocks:
            x = block(x, mask=padding_mask, is_causal=True)

        # Cross-attention blocks
        for block in self.cross_blocks:
            x = block(x, enc_out, self_mask=padding_mask, cross_mask=cross_mask)

        return self.ln_f(x)

class Transformer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config

        self.src_tok_emb = nn.Embedding(config.src_vocab_size, config.n_embd)
        self.tgt_tok_emb = nn.Embedding(config.tgt_vocab_size, config.n_embd)
        self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd))
        self.drop = nn.Dropout(config.dropout)

        self.encoder = Encoder(config)
        self.decoder = Decoder(config)
        self.head = nn.Linear(config.n_embd, config.tgt_vocab_size, bias=False)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, src_ids, tgt_ids, src_mask=None, tgt_mask=None):
        B, T_src = src_ids.size()
        _, T_tgt = tgt_ids.size()

        # Source Embeddings
        src_emb = self.src_tok_emb(src_ids)
        src_pos = self.pos_emb[:, :T_src, :]
        x = self.drop(src_emb + src_pos)

        # Encoder
        encoder_out = self.encoder(x, src_mask)

        # Target Embeddings
        tgt_emb = self.tgt_tok_emb(tgt_ids)
        tgt_pos = self.pos_emb[:, :T_tgt, :]
        y = self.drop(tgt_emb + tgt_pos)

        # Decoder
        y = self.decoder(y, encoder_out, padding_mask=tgt_mask, cross_mask=src_mask)

        # Head
        logits = self.head(y)
        return logits

print("Transformer Architecture defined successfully.")

Transformer Architecture defined successfully.


In [None]:
from torch.optim import AdamW
import time

# --- Initialize Model ---
config = TransformerConfig(
    src_vocab_size=len(eng_vocab),
    tgt_vocab_size=len(fr_vocab),
    block_size=128,
    n_layer=6,
    n_pre_cross_layer=3,
    n_cross_layer=3,
    n_embd=256,
    num_heads=8,
    dropout=0.1
)

model = Transformer(config).to(device)

# --- Training Function ---
def train_model(model, train_loader, val_loader, num_epochs=10, learning_rate=3e-4):
    optimizer = AdamW(model.parameters(), lr=learning_rate)

    # Scheduler: reduce LR if validation loss stops improving
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=2
    )

    criterion = nn.CrossEntropyLoss(ignore_index=0) # Ignore <pad> tokens
    best_val_loss = float('inf')

    print("Starting training...")

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        start_time = time.time()

        # --- Training Phase ---
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Train]', leave=False)
        for batch in progress_bar:
            src_ids = batch['src_ids'].to(device)
            tgt_ids = batch['tgt_ids'].to(device)

            # Create padding masks (1 for tokens, 0 for pad)
            # Shape: [batch, 1, 1, seq_len]
            src_mask = (src_ids != 0).unsqueeze(1).unsqueeze(2)
            tgt_mask = (tgt_ids != 0).unsqueeze(1).unsqueeze(2)

            # Forward pass
            # Input to decoder is tgt_ids without the last token
            decoder_input = tgt_ids[:, :-1]
            decoder_mask = tgt_mask[:, :, :, :-1]

            logits = model(src_ids, decoder_input, src_mask, decoder_mask)

            # Calculate loss
            # Target is tgt_ids without the first token (<sos>)
            targets = tgt_ids[:, 1:].contiguous().view(-1)
            predictions = logits.contiguous().view(-1, logits.size(-1))

            loss = criterion(predictions, targets)

            # Backprop
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            total_train_loss += loss.item()
            progress_bar.set_postfix({'loss': f'{loss.item():.4f}'})

        avg_train_loss = total_train_loss / len(train_loader)

        # --- Validation Phase ---
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for batch in tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Val]', leave=False):
                src_ids = batch['src_ids'].to(device)
                tgt_ids = batch['tgt_ids'].to(device)

                src_mask = (src_ids != 0).unsqueeze(1).unsqueeze(2)
                tgt_mask = (tgt_ids != 0).unsqueeze(1).unsqueeze(2)

                decoder_input = tgt_ids[:, :-1]
                decoder_mask = tgt_mask[:, :, :, :-1]

                logits = model(src_ids, decoder_input, src_mask, decoder_mask)

                targets = tgt_ids[:, 1:].contiguous().view(-1)
                predictions = logits.contiguous().view(-1, logits.size(-1))

                loss = criterion(predictions, targets)
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)

        # Step scheduler
        scheduler.step(avg_val_loss)

        elapsed = time.time() - start_time
        print(f'Epoch {epoch+1} | Time: {elapsed:.0f}s | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}')

        # Save best model
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_model.pt')
            print(f"--> Best model saved (Loss: {best_val_loss:.4f})")

# --- Run Training ---
train_model(model, train_dataloader, val_dataloader, num_epochs=10)

Starting training...




Epoch 1 | Time: 774s | Train Loss: 2.6536 | Val Loss: 1.6682
--> Best model saved (Loss: 1.6682)




Epoch 2 | Time: 771s | Train Loss: 1.4024 | Val Loss: 1.2152
--> Best model saved (Loss: 1.2152)




Epoch 3 | Time: 772s | Train Loss: 1.0573 | Val Loss: 1.0435
--> Best model saved (Loss: 1.0435)




Epoch 4 | Time: 772s | Train Loss: 0.8795 | Val Loss: 0.9510
--> Best model saved (Loss: 0.9510)




Epoch 5 | Time: 771s | Train Loss: 0.7638 | Val Loss: 0.9047
--> Best model saved (Loss: 0.9047)




Epoch 6 | Time: 772s | Train Loss: 0.6801 | Val Loss: 0.8736
--> Best model saved (Loss: 0.8736)




Epoch 7 | Time: 772s | Train Loss: 0.6168 | Val Loss: 0.8553
--> Best model saved (Loss: 0.8553)




Epoch 8 | Time: 772s | Train Loss: 0.5648 | Val Loss: 0.8410
--> Best model saved (Loss: 0.8410)




Epoch 9 | Time: 772s | Train Loss: 0.5220 | Val Loss: 0.8412




Epoch 10 | Time: 772s | Train Loss: 0.4860 | Val Loss: 0.8402
--> Best model saved (Loss: 0.8402)


In [None]:
from google.colab import files
import shutil
from google.colab import drive

print("Checking for saved model...")
if os.path.exists('best_model.pt'):
    print(f"File found! Size: {os.path.getsize('best_model.pt') / 1024 / 1024:.2f} MB")

    # --- Download to Local Computer ---

    files.download('best_model.pt')

    # --- Save to Google Drive ---
    print("Mounting Google Drive...")
    drive.mount('/content/drive')

    # This saves it to the main folder of my Drive
    destination_path = '/content/drive/MyDrive/french_translator_model.pt'

    print(f"Copying model to {destination_path}...")
    shutil.copy('best_model.pt', destination_path)

    # Saving vocabularies as well
    # Saved them as text files for simplicity
    def save_vocab(vocab, filename):
        with open(filename, 'w') as f:
            for token, idx in vocab.items():
                f.write(f"{token}\t{idx}\n")

    save_vocab(eng_vocab, 'eng_vocab.txt')
    save_vocab(fr_vocab, 'fr_vocab.txt')

    shutil.copy('eng_vocab.txt', '/content/drive/MyDrive/eng_vocab.txt')
    shutil.copy('fr_vocab.txt', '/content/drive/MyDrive/fr_vocab.txt')

    print("SUCCESS: Model and vocabularies saved to Google Drive.")

else:
    print("ERROR: 'best_model.pt' not found. Did the training loop finish at least one validation phase?")

Checking for saved model...
File found! Size: 75.26 MB


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Mounting Google Drive...
Mounted at /content/drive
Copying model to /content/drive/MyDrive/french_translator_model.pt...
SUCCESS: Model and vocabularies saved to Google Drive.


In [None]:
# --- Load Best Model Weights ---
print("Loading best model weights...")
try:
    checkpoint = torch.load('best_model.pt', map_location=device)
    model.load_state_dict(checkpoint)
    print("Best model loaded successfully.")
except FileNotFoundError:
    print("Warning: 'best_model.pt' not found. Using current model weights.")

model.eval()

# --- Translation Function ---
def translate_sentence(model, sentence, src_vocab, tgt_vocab, device, max_length=50):
    # 1. Tokenize and Prepare Source
    tokens = basic_tokenize(sentence)
    src_indices = [src_vocab['<sos>']] + \
                  [src_vocab.get(t, src_vocab['<unk>']) for t in tokens] + \
                  [src_vocab['<eos>']]

    # Padding is strictly not necessary for batch_size=1, but good for consistency
    src_tensor = torch.LongTensor(src_indices).unsqueeze(0).to(device) # [1, seq_len]
    src_mask = (src_tensor != 0).unsqueeze(1).unsqueeze(2) # [1, 1, 1, seq_len]

    # 2. Encode
    with torch.no_grad():
        src_emb = model.src_tok_emb(src_tensor)
        src_pos = model.pos_emb[:, :src_tensor.size(1), :]
        encoder_out = model.encoder(model.drop(src_emb + src_pos), src_mask)

    # 3. Decode (Autoregressive)
    tgt_indices = [tgt_vocab['<sos>']]

    for _ in range(max_length):
        tgt_tensor = torch.LongTensor(tgt_indices).unsqueeze(0).to(device)
        tgt_mask = (tgt_tensor != 0).unsqueeze(1).unsqueeze(2)
        # Causal mask for decoder is handled inside the model's forward/blocks,
        # but need to ensure the mask passed covers the current sequence length.

        with torch.no_grad():
            # Get embeddings for current target sequence
            tgt_emb = model.tgt_tok_emb(tgt_tensor)
            tgt_pos = model.pos_emb[:, :tgt_tensor.size(1), :]
            y = model.drop(tgt_emb + tgt_pos)

            # Pass through Decoder
            # Note: We create a causal mask implicit in the Decoder logic,
            # but we pass the padding mask (tgt_mask) and cross_mask (src_mask)
            output = model.decoder(y, encoder_out, padding_mask=tgt_mask, cross_mask=src_mask)

            # Project to vocab
            logits = model.head(output)

            # Get last token logits
            next_token_logits = logits[0, -1, :]
            next_token_id = next_token_logits.argmax().item()

            tgt_indices.append(next_token_id)

            # Stop if End of Sequence
            if next_token_id == tgt_vocab['<eos>']:
                break

    # 4. Convert Indices to Text
    # Create reverse vocabulary mapping
    idx_to_word = {v: k for k, v in tgt_vocab.items()}

    translated_tokens = []
    for idx in tgt_indices:
        token = idx_to_word.get(idx, '')
        if token not in ['<sos>', '<eos>', '<pad>']:
            translated_tokens.append(token)

    return " ".join(translated_tokens)

# --- Test on Manual Inputs ---
print("\n--- Testing Translation ---")
test_sentences = [
    "hello how are you",
    "i love programming",
    "the cat is on the table",
    "my keyboard is black and it is of gaming type"
]

for s in test_sentences:
    trans = translate_sentence(model, s, eng_vocab, fr_vocab, device)
    print(f"En: {s}")
    print(f"Fr: {trans}")
    print("-" * 20)

Loading best model weights...
Best model loaded successfully.

--- Testing Translation ---
En: hello how are you
Fr: salut tes vous .
--------------------
En: i love programming
Fr: j ' adore les barbecues .
--------------------
En: the cat is on the table
Fr: le chat est sur la table .
--------------------
En: my keyboard is black and it is of gaming type
Fr: mon clavier est noir et c ' est du genre de type de <unk> .
--------------------


In [None]:
from nltk.translate.bleu_score import corpus_bleu
import random

def calculate_bleu(data_loader, model, src_vocab, tgt_vocab, device, num_samples=100):
    model.eval()
    sources = []
    targets = [] # References need to be a list of lists
    hypotheses = []

    # Get all validation data
    print(f"Preparing to calculate BLEU on {num_samples} samples...")

    # Extract random indices
    indices = random.sample(range(len(data_loader.dataset)), min(num_samples, len(data_loader.dataset)))

    # Reverse vocab for decoding
    idx_to_word = {v: k for k, v in tgt_vocab.items()}

    with torch.no_grad():
        for i in tqdm(indices, desc="Translating"):
            item = data_loader.dataset[i]
            src_text = item['src_text']
            tgt_text = item['tgt_text']

            # Translate
            prediction = translate_sentence(model, src_text, src_vocab, tgt_vocab, device)

            # Tokenize for BLEU (simple split)
            ref_tokens = tgt_text.split()
            pred_tokens = prediction.split()

            targets.append([ref_tokens]) # List of references (we have 1 per sentence)
            hypotheses.append(pred_tokens)

            # Print first 3 examples to inspect visually
            if len(targets) <= 3:
                print(f"\nRef: {tgt_text}")
                print(f"Pred: {prediction}")

    # Calculate BLEU-4
    score = corpus_bleu(targets, hypotheses) * 100
    return score

# Run Evaluation
bleu_score = calculate_bleu(val_dataloader, model, eng_vocab, fr_vocab, device)
print(f"\nBLEU Score: {bleu_score:.2f}")

Preparing to calculate BLEU on 100 samples...


Translating:   3%|▎         | 3/100 [00:00<00:04, 21.20it/s]


Ref: Elle l'a battu à mort.
Pred: elle l ' a battu en d tail .

Ref: Elle vit à Kyoto.
Pred: elle habite kyoto .

Ref: Tom voulait que Mary dise oui.
Pred: tom voulait que mary dise oui .


Translating: 100%|██████████| 100/100 [00:05<00:00, 16.96it/s]


BLEU Score: 9.60





In [None]:
# --- Recalculate BLEU with Normalization ---
def calculate_bleu_normalized(data_loader, model, src_vocab, tgt_vocab, device, num_samples=100):
    model.eval()
    targets = []
    hypotheses = []

    # Use same seed for consistency with previous run if desired,
    # but here we just sample again.
    indices = random.sample(range(len(data_loader.dataset)), min(num_samples, len(data_loader.dataset)))

    idx_to_word = {v: k for k, v in tgt_vocab.items()}

    print(f"Recalculating BLEU (Normalized) on {len(indices)} samples...")

    with torch.no_grad():
        for i in tqdm(indices, desc="Translating"):
            item = data_loader.dataset[i]
            src_text = item['src_text']
            tgt_text = item['tgt_text'] # Contains "Tom"

            # Translate
            prediction = translate_sentence(model, src_text, src_vocab, tgt_vocab, device)

            # NORMALIZE: Convert both to lowercase before splitting
            ref_tokens = tgt_text.lower().split()
            pred_tokens = prediction.lower().split()

            targets.append([ref_tokens])
            hypotheses.append(pred_tokens)

    score = corpus_bleu(targets, hypotheses) * 100
    return score

# Run Corrected Evaluation
true_bleu_score = calculate_bleu_normalized(val_dataloader, model, eng_vocab, fr_vocab, device)
print(f"\nCorrected BLEU Score: {true_bleu_score:.2f}")

Recalculating BLEU (Normalized) on 100 samples...


Translating: 100%|██████████| 100/100 [00:06<00:00, 14.62it/s]


Corrected BLEU Score: 15.59





In [None]:
import ipywidgets as widgets
from IPython.display import display

print("--- English to French Neural Translator ---")
print("Enter an English sentence to translate:")

# Create widgets
text_input = widgets.Text(
    value='',
    placeholder='Type something (e.g., "the cat is black")',
    description='Input:',
    disabled=False
)

output_label = widgets.Label(value="Translation will appear here...")

def on_submit(change):
    if change.new:
        sentence = change.new
        # Translate
        try:
            translation = translate_sentence(model, sentence, eng_vocab, fr_vocab, device)
            output_label.value = f"French: {translation}"
        except Exception as e:
            output_label.value = f"Error: {str(e)}"

text_input.observe(on_submit, names='value')

display(text_input, output_label)

--- English to French Neural Translator ---
Enter an English sentence to translate:


Text(value='', description='Input:', placeholder='Type something (e.g., "the cat is black")')

Label(value='Translation will appear here...')