<a href="https://colab.research.google.com/github/ShraddhaSharma24/Natural-Language-Processing/blob/main/Machine_translation_using_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from torch.nn import Transformer
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import math
import random
import numpy as np

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")



## Dummy dataset: six English → French sentence pairs

In [2]:
# Toy parallel corpus: each entry is an (English source, French target) sentence pair.
data = [
    ("i am a student", "je suis un étudiant"),
    ("how are you", "comment ça va"),
    ("hello", "bonjour"),
    ("thank you", "merci"),
    ("i love you", "je t'aime"),
    ("good night", "bonne nuit"),
]


In [3]:
from collections import defaultdict

def tokenize(sentence):
    """Lower-case ``sentence`` and split it on runs of whitespace."""
    lowered = sentence.lower()
    return lowered.split()

def build_vocab(sentences):
    """Build a token -> index mapping from an iterable of sentences.

    Indices 0-3 are reserved for the special tokens; every other token gets the
    next free index in order of first appearance.
    """
    vocab = {
        "<pad>": 0,
        "<sos>": 1,
        "<eos>": 2,
        "<unk>": 3,
    }
    for sentence in sentences:
        for token in tokenize(sentence):
            # The next free index always equals the current vocabulary size.
            vocab.setdefault(token, len(vocab))
    return vocab

# Split the parallel corpus into its source-side and target-side sentences.
src_sentences = [example[0] for example in data]
trg_sentences = [example[1] for example in data]

# One vocabulary per language.
SRC_vocab, TRG_vocab = build_vocab(src_sentences), build_vocab(trg_sentences)

# Index -> token lookups for turning ids back into text.
SRC_itos = {index: token for token, index in SRC_vocab.items()}
TRG_itos = {index: token for token, index in TRG_vocab.items()}


In [4]:
def encode(sentence, vocab, max_len=10):
    """Convert a sentence into a fixed-length ``torch.long`` tensor of token ids.

    The sentence is tokenized with ``tokenize``, wrapped in ``<sos>``/``<eos>`` and
    then right-padded with ``<pad>`` (or truncated) to exactly ``max_len`` ids.

    Args:
        sentence: Raw text to encode.
        vocab: Token -> index mapping containing ``<sos>``, ``<eos>`` and ``<pad>``;
            ``<unk>`` is only required when the sentence has out-of-vocabulary tokens.
        max_len: Length of the returned tensor.

    Returns:
        A 1-D ``torch.long`` tensor of length ``max_len``.

    Raises:
        KeyError: If a token is unknown and ``vocab`` has no ``<unk>`` entry.
    """
    word_ids = [
        vocab[tok] if tok in vocab else vocab["<unk>"]
        for tok in tokenize(sentence)
    ]
    # Reserve two slots for <sos>/<eos> so that truncating a long sentence cuts
    # words rather than the end-of-sentence marker.
    word_ids = word_ids[: max(max_len - 2, 0)]
    token_ids = ([vocab["<sos>"]] + word_ids + [vocab["<eos>"]])[:max_len]
    token_ids += [vocab["<pad>"]] * (max_len - len(token_ids))
    return torch.tensor(token_ids, dtype=torch.long)


In [5]:
class TranslationDataset(Dataset):
    """Map-style dataset that serves encoded (source, target) tensor pairs."""

    def __init__(self, data, src_vocab, trg_vocab):
        """Keep the sentence pairs and the vocabulary used for each side."""
        self.data, self.src_vocab, self.trg_vocab = data, src_vocab, trg_vocab

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Encode and return the ``idx``-th pair as ``(src_tensor, trg_tensor)``."""
        source_text, target_text = self.data[idx]
        return (
            encode(source_text, self.src_vocab),
            encode(target_text, self.trg_vocab),
        )

# Wrap the toy corpus and serve it in shuffled mini-batches.
BATCH_SIZE = 2
dataset = TranslationDataset(data=data, src_vocab=SRC_vocab, trg_vocab=TRG_vocab)
dataloader = DataLoader(dataset, shuffle=True, batch_size=BATCH_SIZE)


In [6]:
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to batch-first embeddings.

    Args:
        emb_size: Embedding dimension (even or odd).
        maxlen: Longest sequence length the pre-computed table supports.
        dropout: Dropout probability applied after the encoding is added.
    """

    def __init__(self, emb_size, maxlen=100, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(maxlen, emb_size)
        position = torch.arange(0, maxlen).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, emb_size, 2) * (-math.log(10000.0) / emb_size))
        angles = position * div_term  # (maxlen, ceil(emb_size / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd emb_size there is one fewer cosine column than sine columns,
        # so trim the angle table to the number of odd-indexed slots.
        pe[:, 1::2] = torch.cos(angles[:, : emb_size // 2])
        pe = pe.unsqueeze(0)  # Shape: (1, maxlen, emb_size)
        # A buffer follows the module across devices but is not a trainable parameter.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``dropout(x + pe)`` for ``x`` of shape (batch, seq_len, emb_size).

        Raises:
            ValueError: If ``seq_len`` exceeds the ``maxlen`` the table was built for.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds positional encoding maxlen {self.pe.size(1)}"
            )
        x = x + self.pe[:, :seq_len]
        return self.dropout(x)

class TransformerSeq2Seq(nn.Module):
    """Encoder-decoder Transformer that maps source token ids to target-vocabulary logits."""

    def __init__(self, src_vocab_size, trg_vocab_size, emb_size, nhead, hidden_dim, num_layers):
        super().__init__()
        # Separate embedding tables per language; one positional encoder shared by both.
        self.src_emb = nn.Embedding(src_vocab_size, emb_size)
        self.trg_emb = nn.Embedding(trg_vocab_size, emb_size)
        self.pos_enc = PositionalEncoding(emb_size)

        transformer_kwargs = dict(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_layers,
            num_decoder_layers=num_layers,
            dim_feedforward=hidden_dim,
        )
        self.transformer = nn.Transformer(**transformer_kwargs)
        # Projects decoder states onto the target vocabulary.
        self.fc_out = nn.Linear(emb_size, trg_vocab_size)

    def forward(self, src, trg):
        """Return logits of shape (batch, trg_len, trg_vocab) for batch-first id tensors."""
        # Embed batch-first ids, then move the sequence axis to the front:
        # (batch, seq_len) -> (batch, seq_len, emb_size) -> (seq_len, batch, emb_size)
        src_seq_first = self.pos_enc(self.src_emb(src)).transpose(0, 1)
        trg_seq_first = self.pos_enc(self.trg_emb(trg)).transpose(0, 1)

        # Causal mask: each target position may only attend to itself and earlier ones.
        causal_mask = self.transformer.generate_square_subsequent_mask(trg_seq_first.size(0))
        causal_mask = causal_mask.to(trg_seq_first.device)

        # No source-side attention mask is applied.
        decoded = self.transformer(src_seq_first, trg_seq_first, src_mask=None, tgt_mask=causal_mask)
        logits = self.fc_out(decoded)  # (seq_len, batch, vocab)
        return logits.transpose(0, 1)  # (batch, seq_len, vocab)


In [7]:
# Model hyper-parameters.
SRC_VOCAB_SIZE = len(SRC_vocab)
TRG_VOCAB_SIZE = len(TRG_vocab)
EMB_SIZE = 128
NHEAD = 4
HIDDEN_DIM = 512
NUM_LAYERS = 2
SEED = 42

# Seed every RNG in play before the model is built so that weight initialisation,
# dropout and batch shuffling are repeatable across "Restart & Run All".
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = TransformerSeq2Seq(SRC_VOCAB_SIZE, TRG_VOCAB_SIZE, EMB_SIZE, NHEAD, HIDDEN_DIM, NUM_LAYERS).to(device)




In [10]:
import torch.nn as nn
import torch.optim as optim

# Index of the padding token in the target vocabulary.
TRG_PAD_IDX = TRG_vocab['<pad>']
LEARNING_RATE = 0.0005

# Cross-entropy that skips target positions holding the padding index.
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

# Adam over every model parameter.
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)


In [11]:
import torch

def train(model, dataloader, optimizer, criterion, trg_pad_idx, device):
    """Run one teacher-forced training epoch and return the mean per-batch loss.

    Batches are handled batch-first, i.e. ``src`` and ``trg`` are
    ``(batch_size, seq_len)`` id tensors, which is what the DataLoader over
    ``TranslationDataset`` yields and what ``TransformerSeq2Seq.forward`` accepts.

    Args:
        model: Callable as ``model(src, trg_in)`` returning logits of shape
            ``(batch_size, trg_len - 1, vocab)``.
        dataloader: Sized iterable of ``(src, trg)`` batches.
        optimizer: Optimizer over ``model``'s parameters.
        criterion: Loss taking ``(N, vocab)`` logits and ``(N,)`` target ids.
        trg_pad_idx: Unused here (padding is handled by ``criterion``); kept for
            interface compatibility.
        device: Device the batches are moved to.

    Returns:
        Sum of batch losses divided by ``len(dataloader)``.
    """
    model.train()
    epoch_loss = 0.0

    for src, trg in dataloader:
        src = src.to(device)  # (batch_size, src_len)
        trg = trg.to(device)  # (batch_size, trg_len)

        optimizer.zero_grad()

        # Decoder input: every time step except the last one. The slice must be on
        # the time axis (dim 1); slicing dim 0 would drop a whole sentence instead.
        logits = model(src, trg[:, :-1])  # (batch_size, trg_len - 1, vocab)

        # Targets: the same sequence shifted left by one, so position t predicts t + 1.
        targets = trg[:, 1:]

        loss = criterion(logits.reshape(-1, logits.shape[-1]), targets.reshape(-1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(dataloader)


In [12]:
def evaluate(model, dataloader, criterion, trg_pad_idx, device):
    """Compute the mean per-batch loss over ``dataloader`` without updating the model.

    Batches are handled batch-first, i.e. ``src`` and ``trg`` are
    ``(batch_size, seq_len)`` id tensors.

    Args:
        model: Callable as ``model(src, trg_in)`` returning logits of shape
            ``(batch_size, trg_len - 1, vocab)``.
        dataloader: Sized iterable of ``(src, trg)`` batches.
        criterion: Loss taking ``(N, vocab)`` logits and ``(N,)`` target ids.
        trg_pad_idx: Unused here (padding is handled by ``criterion``); kept for
            interface compatibility.
        device: Device the batches are moved to.

    Returns:
        Sum of batch losses divided by ``len(dataloader)``.
    """
    model.eval()
    epoch_loss = 0.0

    with torch.no_grad():
        for src, trg in dataloader:
            src = src.to(device)  # (batch_size, src_len)
            trg = trg.to(device)  # (batch_size, trg_len)

            # Teacher forcing is still used here: the decoder is fed the ground-truth
            # prefix (all time steps but the last), sliced on the time axis (dim 1).
            logits = model(src, trg[:, :-1])  # (batch_size, trg_len - 1, vocab)

            # Targets are the sequence shifted left by one step.
            targets = trg[:, 1:]

            loss = criterion(logits.reshape(-1, logits.shape[-1]), targets.reshape(-1))
            epoch_loss += loss.item()

    return epoch_loss / len(dataloader)


In [1]:
!pip install nltk





In [2]:
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
# Fetch NLTK's 'punkt' tokenizer data.
# NOTE(review): the visible cells tokenize with str.split() and only call sentence_bleu,
# so this download may be unnecessary — confirm before removing.
nltk.download('punkt')


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [3]:
def translate_sentence(model, sentence, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer, device, max_len=50):
    """Greedily decode a translation of ``sentence`` with a batch-first seq2seq model.

    The model is called as ``model(src, trg)`` with ``(1, seq_len)`` id tensors and
    must return logits of shape ``(1, trg_len, vocab)``, matching
    ``TransformerSeq2Seq.forward``.

    Args:
        model: The translation model; it is switched to eval mode.
        sentence: Raw source text.
        src_vocab: Source token -> index mapping with ``<sos>`` and ``<eos>``.
        trg_vocab: Target token -> index mapping with ``<sos>`` and ``<eos>``.
        src_tokenizer: Callable splitting ``sentence`` into tokens.
        trg_tokenizer: Unused; kept for interface compatibility.
        device: Device the input tensors are moved to.
        max_len: Maximum number of tokens to generate.

    Returns:
        The generated target tokens, without ``<sos>`` and without the closing
        ``<eos>`` (if one was produced). Predicted ids that are missing from
        ``trg_vocab`` are rendered as ``'<unk>'``.

    Raises:
        KeyError: If a source token is unknown and ``src_vocab`` has no ``<unk>``.
    """
    model.eval()

    tokens = ['<sos>'] + [tok.lower() for tok in src_tokenizer(sentence)] + ['<eos>']

    # Map out-of-vocabulary words to <unk> when the vocabulary defines it;
    # otherwise plain indexing raises KeyError.
    unk_idx = src_vocab.get('<unk>')
    src_indexes = [
        src_vocab[token] if (token in src_vocab or unk_idx is None) else unk_idx
        for token in tokens
    ]
    # Batch-first input with a batch of one: (1, src_len).
    src_tensor = torch.tensor(src_indexes, dtype=torch.long).unsqueeze(0).to(device)

    eos_idx = trg_vocab['<eos>']
    trg_indexes = [trg_vocab['<sos>']]

    with torch.no_grad():
        for _ in range(max_len):
            trg_tensor = torch.tensor(trg_indexes, dtype=torch.long).unsqueeze(0).to(device)
            output = model(src_tensor, trg_tensor)  # (1, cur_len, vocab)
            # Greedy choice: the most likely token at the last generated position.
            pred_token = output[0, -1].argmax().item()
            trg_indexes.append(pred_token)
            if pred_token == eos_idx:
                break

    # Drop <sos>, and drop the trailing <eos> only when decoding actually produced it;
    # hitting max_len must not discard a real token.
    decoded = trg_indexes[1:]
    if decoded and decoded[-1] == eos_idx:
        decoded = decoded[:-1]

    # Build the reverse lookup once; the first token wins if indices are duplicated.
    itos = {}
    for tok, idx in trg_vocab.items():
        itos.setdefault(idx, tok)
    return [itos.get(idx, '<unk>') for idx in decoded]


In [5]:
def calculate_bleu(data, model, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer, device):
    """Average smoothed sentence-level BLEU of the model over ``data``.

    Args:
        data: Iterable of ``(source_sentence, reference_translation)`` pairs.
        model, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer, device:
            Forwarded unchanged to ``translate_sentence``.

    Returns:
        Mean of the per-sentence BLEU scores (NLTK ``sentence_bleu`` with
        ``SmoothingFunction().method4``), or ``0.0`` when ``data`` is empty.
    """
    # Materialise once so generators work and the pair count is known up front.
    pairs = list(data)
    if not pairs:
        # Nothing to score; returning here avoids a division by zero.
        return 0.0

    smoothie = SmoothingFunction().method4
    total_score = 0.0
    for src_sentence, trg_sentence in pairs:
        hypothesis = translate_sentence(
            model, src_sentence, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer, device
        )
        # sentence_bleu expects a list of tokenized references per hypothesis.
        references = [trg_tokenizer(trg_sentence)]
        total_score += sentence_bleu(references, hypothesis, smoothing_function=smoothie)

    return total_score / len(pairs)


In [6]:
def simple_tokenizer(text):
    """Trim ``text``, lower-case it and split it on whitespace."""
    normalized = text.strip().lower()
    return normalized.split()


In [7]:
# Keep using the SRC_vocab / TRG_vocab built by build_vocab() earlier in the notebook.
# Redefining them here with hand-written indices would no longer match the ids the
# model's embedding tables were sized with (e.g. index 3 is '<unk>' there).
SRC_vocab_inv = {index: token for token, index in SRC_vocab.items()}
TRG_vocab_inv = {index: token for token, index in TRG_vocab.items()}


In [8]:
# Evaluation pairs for BLEU: (source sentence, reference translation).
test_data = [
    ("I am a student", "Je suis un étudiant")
]


In [10]:
# Score the model on test_data with smoothed sentence-level BLEU.
# This cell needs `model` and `device` from the earlier cells: running it in a kernel
# where those cells have not been executed raises NameError.
# NOTE(review): no visible cell calls train(), so `model` may still be untrained here — confirm.
bleu_score = calculate_bleu(
    test_data,
    model=model,  # model instance created earlier in the notebook
    src_vocab=SRC_vocab,
    trg_vocab=TRG_vocab,
    src_tokenizer=simple_tokenizer,
    trg_tokenizer=simple_tokenizer,
    device=device
)

print(f"BLEU Score: {bleu_score:.4f}")


NameError: name 'model' is not defined