<a href="https://colab.research.google.com/github/IwohubMedia/Machine-Translation-AI-project/blob/main/Machine_Translation_with_Seq2Seq_%2B_Attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
# ==========================
# Machine Translation (Seq2Seq with Attention) - Full Project
# ==========================

# Install dependencies
!pip install torch==2.2.0 sacrebleu datasets spacy -q
!python -m spacy download en_core_web_sm -q
!python -m spacy download de_core_news_sm -q

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
import sacrebleu
from datasets import load_dataset
import spacy
import json

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --------------------------
# Dataset & Tokenization
# --------------------------
# Request the three splits as a dict keyed "train" / "valid" / "test".
# NOTE(review): the output recorded for this cell ends in
# "DatasetNotFoundError: Dataset 'multi30k' doesn't exist on the Hub or cannot
# be accessed", so this identifier must be replaced before the cell can run.
dataset = load_dataset("multi30k", split={"train":"train", "valid":"validation", "test":"test"})

# spaCy pipelines whose tokenizers back tokenize_en / tokenize_de.
spacy_en = spacy.load("en_core_web_sm")
spacy_de = spacy.load("de_core_news_sm")

def tokenize_en(text):
    """Tokenize English text with spaCy and return the lowercased token strings."""
    doc = spacy_en.tokenizer(text)
    return [token.text.lower() for token in doc]

def tokenize_de(text):
    """Tokenize German text with spaCy and return the lowercased token strings."""
    doc = spacy_de.tokenizer(text)
    return [token.text.lower() for token in doc]

# Build vocab
from collections import Counter

def build_vocab(texts, tokenizer, min_freq=2):
    """Build token<->index mappings from raw sentences.

    Args:
        texts: iterable of raw sentences.
        tokenizer: callable turning one sentence into a sequence of tokens.
        min_freq: a token is kept only if it occurs at least this many times.

    Returns:
        ``(vocab, itos)``: ``vocab`` maps token -> index and ``itos`` is its
        inverse. Indices 0-3 are reserved for ``<unk>``, ``<pad>``, ``<bos>``
        and ``<eos>``; the remaining tokens follow in order of first
        appearance.
    """
    token_counts = Counter(tok for text in texts for tok in tokenizer(text))

    stoi = {special: i for i, special in enumerate(("<unk>", "<pad>", "<bos>", "<eos>"))}
    for tok, count in token_counts.items():
        if count >= min_freq:
            # setdefault leaves the reserved specials untouched if the same
            # string also shows up in the data.
            stoi.setdefault(tok, len(stoi))

    # Indices were handed out sequentially, so enumerating the keys inverts the map.
    return stoi, dict(enumerate(stoi))

# Raw English (source) and German (target) sentences of the training split.
src_texts = [ex["translation"]["en"] for ex in dataset["train"]]
tgt_texts = [ex["translation"]["de"] for ex in dataset["train"]]

# Vocabularies come from the training split only; each call returns the
# token->index dict together with its index->token inverse.
vocab_src, itos_src = build_vocab(src_texts, tokenize_en)
vocab_tgt, itos_tgt = build_vocab(tgt_texts, tokenize_de)

def process_sentence(sentence, vocab, tokenizer):
    """Convert a raw sentence into a 1-D ``torch.long`` tensor of token ids.

    The sentence is lowercased, tokenized, wrapped in ``<bos>`` / ``<eos>``
    and every token missing from ``vocab`` is mapped to the ``<unk>`` id.
    """
    unk_id = vocab["<unk>"]
    pieces = ["<bos>", *tokenizer(sentence.lower()), "<eos>"]
    ids = [vocab.get(piece, unk_id) for piece in pieces]
    return torch.tensor(ids, dtype=torch.long)

def collate_fn(batch):
    """Turn a list of dataset examples into padded source/target id tensors.

    Each example supplies its English text under ``["translation"]["en"]`` and
    its German text under ``["translation"]["de"]``. Both sides are numericalised
    with ``process_sentence`` and padded with the respective ``<pad>`` id.
    ``pad_sequence`` is called with its default layout, so the returned tensors
    are sequence-first.

    Returns:
        ``(src_batch, tgt_batch)``.
    """
    src_seqs, tgt_seqs = [], []
    for example in batch:
        pair = example["translation"]
        src_seqs.append(process_sentence(pair["en"], vocab_src, tokenize_en))
        tgt_seqs.append(process_sentence(pair["de"], vocab_tgt, tokenize_de))

    padded_src = pad_sequence(src_seqs, padding_value=vocab_src["<pad>"])
    padded_tgt = pad_sequence(tgt_seqs, padding_value=vocab_tgt["<pad>"])
    return padded_src, padded_tgt

# Number of sentence pairs per batch, shared by all three loaders.
BATCH_SIZE = 32
# Only the training loader shuffles; validation and test keep dataset order.
# collate_fn tokenizes, numericalises and pads each batch on the fly.
train_loader = DataLoader(dataset["train"], batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(dataset["valid"], batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
test_loader  = DataLoader(dataset["test"],  batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

# --------------------------
# Seq2Seq with Attention
# --------------------------
class Attention(nn.Module):
    """Additive attention between the decoder's top-layer state and the encoder outputs."""

    def __init__(self, hid_dim):
        super().__init__()
        # Projects each concatenated [decoder state ; encoder output] pair to hid_dim.
        self.attn = nn.Linear(hid_dim * 2, hid_dim)
        # Reduces each projected vector to one unnormalised score.
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights of shape (batch, src_len).

        Args:
            hidden: stacked decoder hidden states; only the last layer
                (``hidden[-1]``, shape (batch, hid_dim)) is used as the query.
            encoder_outputs: encoder outputs, sequence-first
                (src_len, batch, hid_dim).

        Returns:
            Softmax-normalised weights over the source positions (dim 1).
        """
        enc_batch_first = encoder_outputs.permute(1, 0, 2)
        steps = enc_batch_first.shape[1]
        # Copy the query once per source position so it can be concatenated.
        query = hidden[-1].unsqueeze(1).repeat(1, steps, 1)
        energy = torch.tanh(self.attn(torch.cat((query, enc_batch_first), dim=2)))
        scores = self.v(energy).squeeze(2)
        return torch.softmax(scores, dim=1)

class Encoder(nn.Module):
    """Embedding + multi-layer LSTM encoder (sequence-first; no ``batch_first``)."""

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.rnn = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode a batch of source token ids.

        Returns:
            ``(outputs, hidden, cell)``: the per-step LSTM outputs followed by
            the final hidden and cell states of every layer.
        """
        outputs, final_state = self.rnn(self.dropout(self.embedding(src)))
        hidden, cell = final_state
        return outputs, hidden, cell

class Decoder(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs."""

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=emb_dim)
        # The LSTM consumes the token embedding concatenated with the context vector.
        self.rnn = nn.LSTM(
            input_size=hid_dim + emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        # The projection sees LSTM output, context vector and embedding together.
        self.fc_out = nn.Linear(hid_dim * 2 + emb_dim, output_dim)
        self.dropout = nn.Dropout(p=dropout)
        self.attention = attention

    def forward(self, input, hidden, cell, encoder_outputs):
        """Run one decoding step.

        Args:
            input: 1-D tensor with the previous target token id per batch item.
            hidden, cell: LSTM state carried over from the previous step.
            encoder_outputs: sequence-first encoder outputs.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` holds the
            vocabulary logits, shape (batch, output_dim).
        """
        step_emb = self.dropout(self.embedding(input.unsqueeze(0)))

        # Attention-weighted sum of the encoder outputs, returned sequence-first.
        weights = self.attention(hidden, encoder_outputs).unsqueeze(1)
        context = torch.bmm(weights, encoder_outputs.permute(1, 0, 2)).permute(1, 0, 2)

        output, (hidden, cell) = self.rnn(torch.cat((step_emb, context), dim=2), (hidden, cell))

        # All three tensors have a leading length-1 step axis; drop it after joining.
        features = torch.cat((output, context, step_emb), dim=2).squeeze(0)
        return self.fc_out(features), hidden, cell

class Seq2Seq(nn.Module):
    """Glue module that encodes the source and decodes the target step by step."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg`` one step at a time and collect the logits.

        Args:
            src: source token ids passed straight to the encoder.
            trg: target token ids, indexed as (trg_len, batch); row 0 seeds
                the decoder.
            teacher_forcing_ratio: per-step probability of feeding the gold
                token instead of the model's own argmax prediction.

        Returns:
            Tensor of shape (trg_len, batch, vocab) on ``self.device``.
            Index 0 is never written and stays zero.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]
        outputs = torch.zeros(trg_len, batch_size, self.decoder.output_dim).to(self.device)

        encoder_outputs, hidden, cell = self.encoder(src)
        step_input = trg[0, :]
        for t in range(1, trg_len):
            logits, hidden, cell = self.decoder(step_input, hidden, cell, encoder_outputs)
            outputs[t] = logits
            # One random draw per step decides gold token vs. own prediction.
            if torch.rand(1).item() < teacher_forcing_ratio:
                step_input = trg[t]
            else:
                step_input = logits.argmax(1)
        return outputs

# --------------------------
# Training
# --------------------------
# Vocabulary sizes set the embedding input sizes and the decoder's output size.
INPUT_DIM, OUTPUT_DIM = len(vocab_src), len(vocab_tgt)
ENC_EMB_DIM, DEC_EMB_DIM, HID_DIM = 256, 256, 512
N_LAYERS, ENC_DROPOUT, DEC_DROPOUT = 2, 0.5, 0.5

# Encoder and decoder share HID_DIM and N_LAYERS because the encoder's final
# (hidden, cell) state is passed directly to the decoder LSTM.
attn = Attention(HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT, attn)
model = Seq2Seq(enc, dec, DEVICE).to(DEVICE)

# Adam with its default hyperparameters.
optimizer = optim.Adam(model.parameters())
# Target positions holding the padding id do not contribute to the loss.
PAD_IDX = vocab_tgt["<pad>"]
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

def train_epoch(model, iterator, optimizer, criterion, clip=1):
    """Train ``model`` for one pass over ``iterator`` and return the mean batch loss.

    Args:
        model: module called as ``model(src, tgt)``.
        iterator: yields ``(src, tgt)`` tensor pairs; must support ``len()``.
        optimizer: optimizer stepping the model parameters.
        criterion: loss taking ``(logits, targets)``.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.
    """
    model.train()
    running_loss = 0
    for src, tgt in iterator:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        optimizer.zero_grad()
        logits = model(src, tgt)

        # Step 0 is skipped on both sides, then the remaining steps are
        # flattened to (steps * batch, vocab) and (steps * batch,).
        flat_logits = logits[1:].view(-1, logits.shape[-1])
        flat_targets = tgt[1:].view(-1)

        batch_loss = criterion(flat_logits, flat_targets)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += batch_loss.item()
    return running_loss / len(iterator)

def evaluate(model, iterator, criterion):
    """Return the mean batch loss of ``model`` over ``iterator`` without updating weights.

    The model is put in eval mode, gradients are disabled and the teacher
    forcing ratio is set to 0, so the decoder always consumes its own
    predictions.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for src, tgt in iterator:
            src = src.to(DEVICE)
            tgt = tgt.to(DEVICE)
            logits = model(src, tgt, 0)

            # Skip step 0 and flatten the rest for the criterion.
            flat_logits = logits[1:].view(-1, logits.shape[-1])
            flat_targets = tgt[1:].view(-1)
            running_loss += criterion(flat_logits, flat_targets).item()
    return running_loss / len(iterator)

# Number of full passes over the training loader.
N_EPOCHS = 3
for epoch in range(N_EPOCHS):
    # One optimisation pass, then a gradient-free pass over the validation split.
    train_loss = train_epoch(model, train_loader, optimizer, criterion)
    valid_loss = evaluate(model, valid_loader, criterion)
    print(f"Epoch {epoch+1} | Train Loss: {train_loss:.3f} | Valid Loss: {valid_loss:.3f}")

# --------------------------
# Translation & BLEU
# --------------------------
def translate_sentence(sentence, model, vocab_src, vocab_tgt, tokenizer, max_len=50):
    """Greedily translate one source sentence into a list of target tokens.

    Args:
        sentence: raw source sentence; it is lowercased before tokenization.
        model: object exposing ``eval()``, ``parameters()``, ``encoder(src)``
            returning ``(encoder_outputs, hidden, cell)`` and
            ``decoder(input, hidden, cell, encoder_outputs)`` returning
            ``(logits, hidden, cell)``.
        vocab_src: source token -> id mapping containing ``<unk>``.
        vocab_tgt: target token -> id mapping containing ``<bos>`` and
            ``<eos>``; it is also inverted here to map ids back to tokens.
        tokenizer: callable splitting the source sentence into tokens.
        max_len: maximum number of decoding steps.

    Returns:
        The generated target tokens without ``<bos>`` / ``<eos>``. If no
        ``<eos>`` is produced within ``max_len`` steps, all ``max_len``
        generated tokens are returned.
    """
    model.eval()

    # Decode on the device that holds the model weights (CPU if it has none).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    unk_id = vocab_src["<unk>"]
    tokens = ["<bos>", *tokenizer(sentence.lower()), "<eos>"]
    src_ids = [vocab_src.get(tok, unk_id) for tok in tokens]
    # Shape (src_len, 1): a batch containing this single sentence.
    src_tensor = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(1)

    eos_id = vocab_tgt["<eos>"]
    # Invert the vocabulary that was passed in rather than a module-level table.
    id_to_token = {idx: tok for tok, idx in vocab_tgt.items()}

    prev_id = vocab_tgt["<bos>"]
    generated = []
    with torch.no_grad():
        encoder_outputs, hidden, cell = model.encoder(src_tensor)
        for _ in range(max_len):
            step_input = torch.tensor([prev_id], dtype=torch.long, device=device)
            output, hidden, cell = model.decoder(step_input, hidden, cell, encoder_outputs)
            prev_id = output.argmax(1).item()
            # Stop on <eos> without recording it. Only a real <eos> is dropped,
            # so hitting max_len never discards the last generated word.
            if prev_id == eos_id:
                break
            generated.append(prev_id)
    return [id_to_token[i] for i in generated]

# Smoke test: translate one English sentence with the freshly trained model.
print("Example:", translate_sentence("I love machine translation.", model, vocab_src, vocab_tgt, tokenize_en))

def calculate_bleu(data, model, vocab_src, vocab_tgt, tokenizer, max_len=50):
    """Translate every example in ``data`` and return the corpus-level BLEU score.

    Hypotheses are the space-joined tokens returned by ``translate_sentence``;
    references are the space-joined ``tokenize_de`` tokens of the German side.
    Both lists are handed to ``sacrebleu.corpus_bleu`` with a single reference
    stream.
    """
    hypotheses = []
    references = []
    for example in data:
        pair = example["translation"]
        hyp_tokens = translate_sentence(pair["en"], model, vocab_src, vocab_tgt, tokenizer, max_len)
        hypotheses.append(" ".join(hyp_tokens))
        references.append(" ".join(tokenize_de(pair["de"])))
    return sacrebleu.corpus_bleu(hypotheses, [references]).score

# Corpus BLEU over the held-out test split.
bleu = calculate_bleu(dataset["test"], model, vocab_src, vocab_tgt, tokenize_en)
print(f"BLEU Score = {bleu:.2f}")

# --------------------------
# Save Model + Vocabs
# --------------------------
# Only the weights (state_dict) are stored; the model classes are needed to reload them.
torch.save(model.state_dict(), "mt_model.pt")
# NOTE: JSON object keys are always strings, so the integer ids used as keys
# in itos_src / itos_tgt are written as strings and must be converted back
# with int() after loading.
with open("src_itos.json", "w", encoding="utf-8") as f:
    json.dump(itos_src, f, ensure_ascii=False)
with open("tgt_itos.json", "w", encoding="utf-8") as f:
    json.dump(itos_tgt, f, ensure_ascii=False)

# List the three artefacts with their sizes.
!ls -lh mt_model.pt src_itos.json tgt_itos.json



A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.3.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):
  File "<frozen runpy>", line 189, in _run_module_as_main
  File "<frozen runpy>", line 148, in _get_module_details
  File "<frozen runpy>", line 112, in _get_module_details
  File "/usr/local/lib/python3.12/dist-packages/spacy/__init__.py", line 6, in <module>
  File "/usr/local/lib/python3.12/dist-packages/spacy/errors.py", line 3, in <module>
    from .compat import Literal
  File "/usr/local/lib/python3.12/dist-packages/spacy/compat.py", line 4, in <module>
    from thinc.util import copy_array
  File "/usr/local/lib/p

DatasetNotFoundError: Dataset 'multi30k' doesn't exist on the Hub or cannot be accessed.

In [None]:
# ==========================
# Machine Translation with Seq2Seq + Attention
# ==========================

# 1. Install dependencies
!pip install torch==2.0.1 torchtext==0.16.2 spacy datasets --quiet
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm

# 2. Imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
from datasets import load_dataset
import spacy
import random
import math
import time

# 3. Load spacy tokenizers
# spaCy pipelines for English and German; only their tokenizers are used in this cell.
spacy_en = spacy.load("en_core_web_sm")
spacy_de = spacy.load("de_core_news_sm")

def tokenize_en(text):
    """Return the lowercased spaCy tokens of an English string."""
    doc = spacy_en.tokenizer(text)
    return [token.text.lower() for token in doc]

def tokenize_de(text):
    """Return the lowercased spaCy tokens of a German string."""
    doc = spacy_de.tokenizer(text)
    return [token.text.lower() for token in doc]

# 4. Load dataset (English ↔ German Multi30k)
dataset = load_dataset("stas/multi30k", split="train")

# Tokenized source (English) and target (German) sentences, index-aligned.
SRC, TRG = [], []
# Slicing a Hugging Face Dataset (dataset[:2000]) returns a dict of columns,
# so looping over the slice yields column names instead of examples and
# item["en"] fails. select() returns a Dataset that still iterates row by row.
# The bound is capped so a split shorter than 2000 rows does not raise.
for item in dataset.select(range(min(2000, len(dataset)))):  # small sample for speed
    SRC.append(tokenize_en(item["en"]))
    TRG.append(tokenize_de(item["de"]))

# 5. Build vocabulary
from collections import Counter

def build_vocab(tokenized_texts, min_freq=2):
    """Map tokens to integer ids.

    Args:
        tokenized_texts: iterable of token sequences.
        min_freq: minimum number of occurrences for a token to get an id.

    Returns:
        Dict token -> id. Ids 0-3 are ``<pad>``, ``<sos>``, ``<eos>`` and
        ``<unk>``; the remaining tokens follow in order of first appearance.
    """
    frequencies = Counter()
    for sentence in tokenized_texts:
        frequencies.update(sentence)

    stoi = dict(zip(("<pad>", "<sos>", "<eos>", "<unk>"), range(4)))
    for token, count in frequencies.items():
        # Skip rare tokens and anything that collides with a reserved special.
        if count < min_freq or token in stoi:
            continue
        stoi[token] = len(stoi)
    return stoi

# token -> id mappings built from the sampled sentences.
src_vocab = build_vocab(SRC)
trg_vocab = build_vocab(TRG)

# id -> token inverses, used to turn predicted ids back into text.
itos_src = {i:s for s,i in src_vocab.items()}
itos_trg = {i:s for s,i in trg_vocab.items()}

# 6. Seq2Seq Model with Attention
class Encoder(nn.Module):
    """Embedding + multi-layer LSTM encoder (sequence-first; no ``batch_first``)."""

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.rnn = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Encode source token ids.

        Returns:
            ``(hidden, cell, outputs)``: the final LSTM state first, then the
            per-step outputs.
        """
        outputs, final_state = self.rnn(self.dropout(self.embedding(src)))
        hidden, cell = final_state
        return hidden, cell, outputs

class Attention(nn.Module):
    """Additive attention between the decoder's top-layer state and the encoder outputs."""

    def __init__(self, hid_dim):
        super().__init__()
        # Projects each concatenated [decoder state ; encoder output] pair to hid_dim.
        self.attn = nn.Linear(hid_dim * 2, hid_dim)
        # Reduces each projected vector to one unnormalised score.
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return softmax attention weights of shape (batch, src_len).

        Only the last layer of ``hidden`` is used as the query;
        ``encoder_outputs`` is sequence-first (src_len, batch, hid_dim).
        """
        enc_batch_first = encoder_outputs.permute(1, 0, 2)
        steps = enc_batch_first.shape[1]
        # Copy the query once per source position so it can be concatenated.
        query = hidden[-1].unsqueeze(1).repeat(1, steps, 1)
        energy = torch.tanh(self.attn(torch.cat((query, enc_batch_first), dim=2)))
        scores = self.v(energy).squeeze(2)
        return F.softmax(scores, dim=1)

class Decoder(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs."""

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=emb_dim)
        # The LSTM consumes the token embedding concatenated with the context vector.
        self.rnn = nn.LSTM(
            input_size=hid_dim + emb_dim,
            hidden_size=hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        # The projection sees LSTM output, context vector and embedding together.
        self.fc_out = nn.Linear(hid_dim * 2 + emb_dim, output_dim)
        self.dropout = nn.Dropout(p=dropout)
        self.attention = attention

    def forward(self, input, hidden, cell, encoder_outputs):
        """Run one decoding step.

        Args:
            input: 1-D tensor with the previous target token id per batch item.
            hidden, cell: LSTM state from the previous step.
            encoder_outputs: sequence-first encoder outputs.

        Returns:
            ``(prediction, hidden, cell)`` with ``prediction`` holding the
            vocabulary logits, shape (batch, output_dim).
        """
        step_emb = self.dropout(self.embedding(input.unsqueeze(0)))

        # Attention-weighted sum of the encoder outputs, returned sequence-first.
        weights = self.attention(hidden, encoder_outputs).unsqueeze(1)
        context = torch.bmm(weights, encoder_outputs.permute(1, 0, 2)).permute(1, 0, 2)

        output, (hidden, cell) = self.rnn(torch.cat((step_emb, context), dim=2), (hidden, cell))

        # Drop the length-1 step axis from each part before joining on the feature axis.
        features = torch.cat((output.squeeze(0), context.squeeze(0), step_emb.squeeze(0)), dim=1)
        return self.fc_out(features), hidden, cell

class Seq2Seq(nn.Module):
    """Glue module that encodes the source and decodes the target step by step."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        """Decode ``trg`` one step at a time and collect the logits.

        Args:
            src: source token ids passed straight to the encoder.
            trg: target token ids, indexed as (trg_len, batch); row 0 seeds
                the decoder.
            teacher_forcing_ratio: per-step probability of feeding the gold
                token instead of the model's own argmax prediction.

        Returns:
            Tensor of shape (trg_len, batch, vocab) on ``self.device``.
            Index 0 is never written and stays zero.
        """
        steps, batch_size = trg.shape[0], trg.shape[1]
        outputs = torch.zeros(steps, batch_size, self.decoder.output_dim).to(self.device)

        # This encoder returns the final state first and the per-step outputs last.
        hidden, cell, encoder_outputs = self.encoder(src)
        step_input = trg[0, :]
        for t in range(1, steps):
            logits, hidden, cell = self.decoder(step_input, hidden, cell, encoder_outputs)
            outputs[t] = logits
            # One random draw per step decides gold token vs. own prediction.
            if random.random() < teacher_forcing_ratio:
                step_input = trg[t]
            else:
                step_input = logits.argmax(1)
        return outputs

# 7. Initialize model
# Vocabulary sizes set the embedding input sizes and the decoder's output size.
INPUT_DIM = len(src_vocab)
OUTPUT_DIM = len(trg_vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
# Encoder and decoder share HID_DIM and N_LAYERS because the encoder's final
# (hidden, cell) state is passed directly to the decoder LSTM.
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

attn = Attention(HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT, attn)

# Run on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Seq2Seq(enc, dec, DEVICE).to(DEVICE)

# Adam with its default hyperparameters.
optimizer = optim.Adam(model.parameters())
# Target positions holding the padding id do not contribute to the loss.
PAD_IDX = trg_vocab["<pad>"]
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# 8. Training function
def train(model, iterator, optimizer, criterion, clip):
    """Train ``model`` for one pass over ``iterator`` and return the mean batch loss.

    Args:
        model: module called as ``model(src, trg)``.
        iterator: yields ``(src, trg)`` tensor pairs; must support ``len()``.
        optimizer: optimizer stepping the model parameters.
        criterion: loss taking ``(logits, targets)``.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.
    """
    model.train()
    running_loss = 0
    for src, trg in iterator:
        src = src.to(DEVICE)
        trg = trg.to(DEVICE)

        optimizer.zero_grad()
        logits = model(src, trg)

        # Step 0 is skipped on both sides, then the remaining steps are
        # flattened to (steps * batch, vocab) and (steps * batch,).
        flat_logits = logits[1:].view(-1, logits.shape[-1])
        flat_targets = trg[1:].view(-1)

        batch_loss = criterion(flat_logits, flat_targets)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += batch_loss.item()
    return running_loss / len(iterator)

# 9. Collate function + DataLoader
def collate_batch(batch):
    """Numericalise and pad a batch of ``(src_tokens, trg_tokens)`` pairs.

    Source sequences become ``tokens + <eos>``. Target sequences become
    ``<sos> + tokens + <eos>``: ``Seq2Seq.forward`` feeds row 0 of the target
    as the first decoder input and the training loop drops row 0 from the
    loss, so without a leading ``<sos>`` the first real word would never be
    predicted and the decoder would never see the ``<sos>`` token that
    ``translate_sentence`` starts from.

    Args:
        batch: list of ``(src_sample, trg_sample)`` token lists.

    Returns:
        ``(src_batch, trg_batch)``, sequence-first LongTensors padded with the
        respective ``<pad>`` id.
    """
    src_unk, trg_unk = src_vocab["<unk>"], trg_vocab["<unk>"]
    src_batch, trg_batch = [], []
    for src_sample, trg_sample in batch:
        src_ids = [src_vocab.get(token, src_unk) for token in src_sample]
        src_ids.append(src_vocab["<eos>"])

        trg_ids = [trg_vocab["<sos>"]]
        trg_ids.extend(trg_vocab.get(token, trg_unk) for token in trg_sample)
        trg_ids.append(trg_vocab["<eos>"])

        src_batch.append(torch.tensor(src_ids, dtype=torch.long))
        trg_batch.append(torch.tensor(trg_ids, dtype=torch.long))
    src_batch = pad_sequence(src_batch, padding_value=src_vocab["<pad>"])
    trg_batch = pad_sequence(trg_batch, padding_value=trg_vocab["<pad>"])
    return src_batch, trg_batch

# Pair each tokenized source sentence with its tokenized target sentence.
train_data = list(zip(SRC, TRG))
# collate_batch converts the token lists to padded id tensors per batch.
train_iterator = DataLoader(train_data, batch_size=32, shuffle=True, collate_fn=collate_batch)

# 10. Train for 1 epoch (increase later)
N_EPOCHS = 1
# Maximum gradient norm handed to train() for clipping.
CLIP = 1
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.3f}")

# 11. Translate function
def translate_sentence(sentence, src_vocab, trg_vocab, model, device, max_len=50):
    """Greedily translate one English sentence into a list of target tokens.

    Args:
        sentence: raw English sentence; tokenized with spaCy and lowercased.
        src_vocab: source token -> id mapping containing ``<unk>`` and ``<eos>``.
        trg_vocab: target token -> id mapping containing ``<sos>`` and
            ``<eos>``; it is also inverted here to map ids back to tokens.
        model: Seq2Seq-style object whose ``encoder(src)`` returns
            ``(hidden, cell, encoder_outputs)`` and whose
            ``decoder(input, hidden, cell, encoder_outputs)`` returns
            ``(logits, hidden, cell)``.
        device: device on which the input tensors are created.
        max_len: maximum number of decoding steps.

    Returns:
        The generated target tokens. Decoding stops at the first ``<eos>``,
        which is not part of the returned list.
    """
    model.eval()
    tokens = [tok.text.lower() for tok in spacy_en.tokenizer(sentence)]
    src_ids = [src_vocab.get(token, src_vocab["<unk>"]) for token in tokens]
    src_ids.append(src_vocab["<eos>"])
    # Shape (src_len, 1): a batch containing this single sentence.
    src_tensor = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(1)

    eos_id = trg_vocab["<eos>"]
    # Invert the vocabulary that was passed in rather than a module-level table.
    id_to_token = {idx: tok for tok, idx in trg_vocab.items()}

    prev_id = trg_vocab["<sos>"]
    outputs = []
    # Inference only: no_grad stops autograd from recording a graph across
    # the encoder pass and every decoding step.
    with torch.no_grad():
        hidden, cell, encoder_outputs = model.encoder(src_tensor)
        for _ in range(max_len):
            input_token = torch.tensor([prev_id], dtype=torch.long, device=device)
            output, hidden, cell = model.decoder(input_token, hidden, cell, encoder_outputs)
            prev_id = output.argmax(1).item()
            # <eos> is a control token, not part of the translation.
            if prev_id == eos_id:
                break
            outputs.append(prev_id)
    return [id_to_token[i] for i in outputs]

# 12. Test translation
# Smoke test: translate one English sentence with the freshly trained model.
print(translate_sentence("A man is playing guitar.", src_vocab, trg_vocab, model, DEVICE))

# 13. Save model
# Only the weights (state_dict) are stored; the model classes are needed to reload them.
torch.save(model.state_dict(), "seq2seq_attention.pth")
print("✅ Model saved as seq2seq_attention.pth")
