### **NEURAL MACHINE TRANSLATION** 
- Translating sentence pairs from French to English
- Using **Seq2Seq** RNNs and **attention**-based sequence models

In [None]:
import io
import json
import numpy as np
import pandas as pd
import random
import re
import unicodedata
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import matplotlib.pyplot as plt

# Setting up device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

In [None]:
## Preprocessing Functions

# Unicode normalization
def normalize_unicode(s):
    """Return ``s`` with combining marks (e.g. accents) removed.

    The text is first decomposed with NFD; every character whose Unicode
    category is ``Mn`` is then discarded and the rest is joined back together.
    """
    decomposed = unicodedata.normalize('NFD', s)
    kept_chars = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(kept_chars)

# Preprocess sentences
def preprocess_sentence(s):
    """Normalize one raw sentence before tokenization.

    Steps: strip accents via ``normalize_unicode``, surround the punctuation
    characters ``? . ! , ¿`` with spaces so they become separate
    whitespace-delimited tokens, collapse runs of spaces, and trim the ends.
    """
    text = normalize_unicode(s)
    # Isolate punctuation as standalone tokens.
    text = re.sub(r"([?.!,¿])", r" \1 ", text)
    # NOTE: the character class below matches both the space and the
    # double-quote character, so runs of either collapse to a single space.
    text = re.sub(r'[" "]+', " ", text)
    return text.strip()

# Tag target sentences with <sos> and <eos>
def tag_target_sentences(sentences):
    """Wrap each sentence as ``'<sos> sentence <eos>'`` and return the new list."""
    tagged = []
    for sentence in sentences:
        tagged.append('<sos> ' + sentence + ' <eos>')
    return tagged

In [None]:
## Tokenizer Class
class Tokenizer:
    """Whitespace tokenizer mapping words to integer ids and back.

    The out-of-vocabulary token is registered with id ``1`` at construction
    and newly seen words receive consecutive ids starting at ``2``. Id ``0``
    is never assigned here, which leaves it free for padding.

    Attributes are plain dicts/ints so the object can be exported with
    ``to_json`` and rebuilt with ``from_json``.
    """

    def __init__(self, oov_token='<unk>'):
        self.oov_token = oov_token
        self.oov_index = 1
        self.word_index = {oov_token: self.oov_index}
        self.index_word = {self.oov_index: oov_token}
        self.next_index = 2

    def fit_on_texts(self, texts):
        """Add every unseen whitespace-separated word in ``texts`` to the vocabulary."""
        for text in texts:
            for word in text.split():
                if word not in self.word_index:
                    self.word_index[word] = self.next_index
                    self.index_word[self.next_index] = word
                    self.next_index += 1

    def texts_to_sequences(self, texts):
        """Convert each text to a list of ids; unknown words map to the OOV id."""
        return [
            [self.word_index.get(word, self.oov_index) for word in text.split()]
            for text in texts
        ]

    def sequences_to_texts(self, sequences):
        """Convert each id sequence back to a space-joined string.

        Ids that are not in the vocabulary are rendered as the OOV token.
        """
        return [
            ' '.join(self.index_word.get(idx, self.oov_token) for idx in seq)
            for seq in sequences
        ]

    def to_json(self):
        """Return a JSON-serializable dict describing this tokenizer.

        The mappings are copied so later changes to the tokenizer do not
        leak into a previously exported config (and vice versa).
        """
        return {
            'word_index': dict(self.word_index),
            'index_word': {int(k): v for k, v in self.index_word.items()},
            'oov_token': self.oov_token
        }

    @classmethod
    def from_json(cls, config):
        """Rebuild a tokenizer from a dict produced by ``to_json``.

        ``index_word`` keys are converted with ``int`` because a JSON round
        trip turns integer keys into strings. The mappings are copied so the
        new tokenizer never shares state with ``config`` or with the tokenizer
        that produced it.
        """
        tokenizer = cls(oov_token=config['oov_token'])
        tokenizer.word_index = dict(config['word_index'])
        tokenizer.index_word = {int(k): v for k, v in config['index_word'].items()}
        # Keep the OOV id in sync with the loaded vocabulary when it is present.
        tokenizer.oov_index = tokenizer.word_index.get(
            tokenizer.oov_token, tokenizer.oov_index)
        tokenizer.next_index = max(
            tokenizer.word_index.values(), default=tokenizer.oov_index) + 1
        return tokenizer

# Pad sequences (PyTorch version)
def pad_sequences_pytorch(sequences, max_len, pad_value=0):
    """Right-pad (or truncate) integer sequences to a fixed length.

    Args:
        sequences: list of lists of integer ids.
        max_len: number of columns in the result; longer sequences are
            truncated to their first ``max_len`` items.
        pad_value: value written into the unused trailing positions.

    Returns:
        ``np.ndarray`` of shape ``(len(sequences), max_len)`` and dtype int32.
    """
    # np.full (rather than np.zeros) so that ``pad_value`` is actually honored.
    padded = np.full((len(sequences), max_len), pad_value, dtype=np.int32)
    for row, seq in enumerate(sequences):
        kept = seq[:max_len]
        padded[row, :len(kept)] = kept
    return padded

def generate_decoder_inputs_targets(sentences, tokenizer):
    """Build teacher-forcing pairs from tagged target sentences.

    Each sentence is converted to ids with ``tokenizer.texts_to_sequences``.
    The decoder input is the sequence without its last id and the decoder
    target is the sequence without its first id (i.e. shifted by one).

    Returns:
        ``(decoder_inputs, decoder_targets)`` as two lists of id lists.
    """
    decoder_inputs, decoder_targets = [], []
    for token_ids in tokenizer.texts_to_sequences(sentences):
        decoder_inputs.append(token_ids[:-1])
        decoder_targets.append(token_ids[1:])
    return decoder_inputs, decoder_targets

In [None]:
## Data Loading and Preprocessing

# Load training data
# The encoding is stated explicitly so accented characters decode the same way
# on every platform (assumes the corpus is stored as UTF-8 — TODO confirm).
# Blank lines are skipped because they contain no separator and would break
# the source/target unzip below.
with open('train_sentence_pairs.txt', encoding='utf-8') as file:
    train = [line.rstrip() for line in file if line.strip()]

print(f"Training examples: {len(train)}")
print("Sample:", train[:1])

# Separate input (French) and target (English)
SEPARATOR = '<sep>'
train_input, train_target = map(list, zip(*[pair.split(SEPARATOR) for pair in train]))

# Preprocess sentences
train_preprocessed_input = [preprocess_sentence(s) for s in train_input]
train_preprocessed_target = [preprocess_sentence(s) for s in train_target]

# Tag target sentences with <sos>/<eos> so the decoder knows where to start and stop
train_tagged_preprocessed_target = tag_target_sentences(train_preprocessed_target)

# Create tokenizers; the "+ 1" in the vocab sizes accounts for id 0, which the
# tokenizer never assigns and the padding helper uses as filler.
source_tokenizer = Tokenizer(oov_token='<unk>')
source_tokenizer.fit_on_texts(train_preprocessed_input)
source_vocab_size = len(source_tokenizer.word_index) + 1

target_tokenizer = Tokenizer(oov_token='<unk>')
target_tokenizer.fit_on_texts(train_tagged_preprocessed_target)
target_vocab_size = len(target_tokenizer.word_index) + 1

print(f"Source vocab size: {source_vocab_size}")
print(f"Target vocab size: {target_vocab_size}")

# Vectorize sequences
train_encoder_inputs = source_tokenizer.texts_to_sequences(train_preprocessed_input)
train_decoder_inputs, train_decoder_targets = generate_decoder_inputs_targets(
    train_tagged_preprocessed_target, target_tokenizer)

# Pad every sequence to the longest one seen in the training set
max_encoding_len = max(len(seq) for seq in train_encoder_inputs)
max_decoding_len = max(len(seq) for seq in train_decoder_inputs)

padded_train_encoder_inputs = pad_sequences_pytorch(train_encoder_inputs, max_encoding_len)
padded_train_decoder_inputs = pad_sequences_pytorch(train_decoder_inputs, max_decoding_len)
padded_train_decoder_targets = pad_sequences_pytorch(train_decoder_targets, max_decoding_len)

print(f"Max encoding length: {max_encoding_len}")
print(f"Max decoding length: {max_decoding_len}")

In [None]:
## Validation Data Processing

# Load validation data
# Explicit encoding keeps accented characters platform-independent (assumes the
# file is UTF-8 — TODO confirm); blank lines are skipped because they carry no
# separator and cannot be split into a source/target pair.
with open('val_sentence_pairs.txt', encoding='utf-8') as file:
    val = [line.rstrip() for line in file if line.strip()]

def process_dataset(dataset):
    """Turn raw ``source<sep>target`` lines into padded id arrays.

    Uses the module-level ``SEPARATOR``, the tokenizers fitted on the training
    data, and the training-set maximum lengths, so sequences longer than those
    lengths are truncated by the padding helper.

    Returns:
        ``(padded_encoder_inputs, padded_decoder_inputs, padded_decoder_targets)``.
    """
    split_pairs = [pair.split(SEPARATOR) for pair in dataset]
    input_data, output_data = map(list, zip(*split_pairs))

    source_sentences = list(map(preprocess_sentence, input_data))
    target_sentences = tag_target_sentences(list(map(preprocess_sentence, output_data)))

    encoder_ids = source_tokenizer.texts_to_sequences(source_sentences)
    decoder_in_ids, decoder_out_ids = generate_decoder_inputs_targets(
        target_sentences, target_tokenizer)

    return (
        pad_sequences_pytorch(encoder_ids, max_encoding_len),
        pad_sequences_pytorch(decoder_in_ids, max_decoding_len),
        pad_sequences_pytorch(decoder_out_ids, max_decoding_len),
    )

# Run the validation pairs through the same pipeline as the training data.
(
    padded_val_encoder_inputs,
    padded_val_decoder_inputs,
    padded_val_decoder_targets,
) = process_dataset(val)

**RECURRENCE-BASED Seq2Seq MODELS**

In [None]:
## PyTorch Models - No Attention

class EncoderNoAttention(nn.Module):
    """Single-layer LSTM encoder over embedded source token ids.

    Args:
        vocab_size: number of rows in the embedding table (id 0 is padding).
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden state size.
        dropout: dropout probability applied to the embedded inputs.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # nn.LSTM's own ``dropout`` argument only acts between stacked layers,
        # so on a single-layer LSTM it does nothing except emit a warning.
        # Dropout is therefore applied explicitly to the LSTM inputs.
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, x):
        """Encode ``x`` of shape (batch, seq_len).

        Returns:
            ``outputs`` (batch, seq_len, hidden_dim) and the final ``hidden``
            and ``cell`` states, each (1, batch, hidden_dim).
        """
        embedded = self.dropout(self.embedding(x))
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

class DecoderNoAttention(nn.Module):
    """Single-layer LSTM decoder that projects each step onto the target vocabulary.

    Args:
        vocab_size: size of the target vocabulary (id 0 is padding).
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden state size.
        dropout: dropout probability applied to the embedded inputs.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # nn.LSTM's ``dropout`` argument is a no-op (plus a warning) for a
        # single layer, so dropout is applied explicitly to the LSTM inputs.
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.dense = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden, cell):
        """Decode ``x`` (batch, steps) starting from the given LSTM state.

        Returns:
            ``logits`` (batch, steps, vocab_size) and the updated ``hidden``
            and ``cell`` states.
        """
        embedded = self.dropout(self.embedding(x))
        outputs, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        logits = self.dense(outputs)
        return logits, hidden, cell

class Seq2SeqNoAttention(nn.Module):
    """Encoder-decoder wrapper: the decoder starts from the encoder's final state."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt):
        """Return decoder logits for ``tgt`` conditioned on ``src``.

        The encoder's per-step outputs are discarded; only its final hidden
        and cell states are handed to the decoder.
        """
        _enc_outputs, enc_hidden, enc_cell = self.encoder(src)
        logits, _dec_hidden, _dec_cell = self.decoder(tgt, enc_hidden, enc_cell)
        return logits

In [None]:
## PyTorch Models - With Luong Attention

class LuongAttention(nn.Module):
    """Attention that scores decoder states against linearly projected encoder outputs."""

    def __init__(self, hidden_dim):
        super().__init__()
        self.w = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, encoder_outputs, decoder_output):
        """Compute attention weights and the resulting context vectors.

        Args:
            encoder_outputs: (batch, seq_len, hidden_dim)
            decoder_output: (batch, 1, hidden_dim)

        Returns:
            ``weights`` (batch, 1, seq_len), softmax-normalized over the
            source positions, and ``context`` (batch, 1, hidden_dim), the
            weighted sum of ``encoder_outputs``.
        """
        projected = self.w(encoder_outputs)                     # (batch, seq_len, hidden_dim)
        keys = projected.transpose(1, 2)                        # (batch, hidden_dim, seq_len)
        weights = torch.bmm(decoder_output, keys).softmax(dim=-1)
        context = torch.bmm(weights, encoder_outputs)
        return weights, context

class EncoderWithAttention(nn.Module):
    """Single-layer LSTM encoder whose per-step outputs feed the attention module.

    Args:
        vocab_size: number of rows in the embedding table (id 0 is padding).
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden state size.
        dropout: dropout probability applied to the embedded inputs.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # nn.LSTM's ``dropout`` argument is a no-op (plus a warning) for a
        # single layer, so dropout is applied explicitly to the LSTM inputs.
        self.dropout = nn.Dropout(dropout)
        # ``return_sequences`` is a Keras argument that nn.LSTM rejects with a
        # TypeError; nn.LSTM always returns the output for every time step.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, x):
        """Encode ``x`` of shape (batch, seq_len).

        Returns:
            ``outputs`` (batch, seq_len, hidden_dim) and the final ``hidden``
            and ``cell`` states, each (1, batch, hidden_dim).
        """
        embedded = self.dropout(self.embedding(x))
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

class DecoderWithAttention(nn.Module):
    """LSTM decoder that attends over the encoder outputs at every step.

    Args:
        vocab_size: size of the target vocabulary (id 0 is padding).
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden state size (must match the encoder's).
        dropout: dropout probability applied to the embedded inputs.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # nn.LSTM's ``dropout`` argument is a no-op (plus a warning) for a
        # single layer, so dropout is applied explicitly to the LSTM inputs.
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.attention = LuongAttention(hidden_dim)
        # Mixes [context ; decoder state] back down to hidden_dim; the tanh
        # non-linearity is applied in ``forward``.
        self.w_attention = nn.Linear(hidden_dim * 2, hidden_dim)
        self.dense = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, encoder_outputs, hidden, cell):
        """Run the decoder over ``x`` and attend over ``encoder_outputs``.

        Args:
            x: (batch, steps) target token ids.
            encoder_outputs: (batch, src_len, hidden_dim).
            hidden, cell: LSTM state to start from.

        Returns:
            ``logits`` (batch, steps, vocab_size), the updated ``hidden`` and
            ``cell`` states, and the attention ``weights``.
        """
        embedded = self.dropout(self.embedding(x))
        decoder_output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        weights, context = self.attention(encoder_outputs, decoder_output)
        combined = torch.cat([context, decoder_output], dim=-1)
        attended = torch.tanh(self.w_attention(combined))
        logits = self.dense(attended)
        return logits, hidden, cell, weights

class Seq2SeqWithAttention(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with attention."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt, encoder_outputs=None):
        """Return logits for every position of ``tgt``.

        Args:
            src: source token ids; always run through the encoder so the
                decoder can start from the encoder's final state.
            tgt: (batch, steps) decoder input ids (teacher forcing).
            encoder_outputs: optional precomputed encoder outputs to attend
                over. When omitted, the outputs from encoding ``src`` here
                are used, which avoids a separate encoder pass by the caller.

        Returns:
            Tensor of the per-step logits concatenated along dim 1.
        """
        # The decoder LSTM cannot start from (None, None); seed it with the
        # encoder's final hidden/cell state.
        computed_outputs, hidden, cell = self.encoder(src)
        if encoder_outputs is None:
            encoder_outputs = computed_outputs

        step_logits = []
        for t in range(tgt.size(1)):
            decoder_input = tgt[:, t:t+1]
            logits, hidden, cell, _ = self.decoder(decoder_input, encoder_outputs, hidden, cell)
            step_logits.append(logits)

        return torch.cat(step_logits, dim=1)

In [None]:
## Training Setup

# Hyperparameters
# Model and optimisation hyperparameters
embedding_dim, hidden_dim = 128, 256
dropout = 0.2
batch_size = 32
epochs = 30
learning_rate = 0.001

# Wrap the padded id arrays as int64 tensors (embedding layers and the loss
# both index with long tensors).
train_encoder_tensor, train_decoder_input_tensor, train_decoder_target_tensor = (
    torch.LongTensor(array)
    for array in (
        padded_train_encoder_inputs,
        padded_train_decoder_inputs,
        padded_train_decoder_targets,
    )
)

val_encoder_tensor, val_decoder_input_tensor, val_decoder_target_tensor = (
    torch.LongTensor(array)
    for array in (
        padded_val_encoder_inputs,
        padded_val_decoder_inputs,
        padded_val_decoder_targets,
    )
)

# Each dataset item is (encoder input, decoder input, decoder target)
train_dataset = TensorDataset(
    train_encoder_tensor, train_decoder_input_tensor, train_decoder_target_tensor)
val_dataset = TensorDataset(
    val_encoder_tensor, val_decoder_input_tensor, val_decoder_target_tensor)

# Only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Custom loss function with masking
class MaskedCrossEntropyLoss(nn.Module):
    """Cross-entropy averaged over the positions whose target id is not 0."""

    def __init__(self):
        super().__init__()
        # Per-token losses are needed so that padded positions can be zeroed.
        self.ce_loss = nn.CrossEntropyLoss(reduction='none')

    def forward(self, logits, targets):
        """Return the mean loss over non-padding target positions.

        Args:
            logits: (batch, seq_len, vocab_size)
            targets: (batch, seq_len); id 0 marks padding.

        The small epsilon in the denominator keeps the result at 0 (rather
        than NaN) when every target position is padding.
        """
        _, _, num_classes = logits.size()

        flat_targets = targets.reshape(-1)
        per_token = self.ce_loss(logits.reshape(-1, num_classes), flat_targets)

        keep = flat_targets.ne(0).float()
        return (per_token * keep).sum() / (keep.sum() + 1e-8)

In [None]:
## Training Function - No Attention

def train_no_attention(epochs):
    """Train the attention-free seq2seq model for ``epochs`` epochs.

    Builds a fresh encoder/decoder from the module-level hyperparameters,
    optimizes with Adam and the masked cross-entropy loss, clips the gradient
    norm at 1.0, and reports progress every fifth epoch.

    Returns:
        ``(model, train_losses, val_losses)`` where each loss list holds the
        mean per-batch loss of every epoch.
    """
    model = Seq2SeqNoAttention(
        EncoderNoAttention(source_vocab_size, embedding_dim, hidden_dim, dropout).to(device),
        DecoderNoAttention(target_vocab_size, embedding_dim, hidden_dim, dropout).to(device),
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = MaskedCrossEntropyLoss()

    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        # ---- optimisation pass over the training set ----
        model.train()
        running_train = 0
        for batch in train_loader:
            src, tgt_in, tgt_out = (tensor.to(device) for tensor in batch)

            optimizer.zero_grad()
            batch_loss = criterion(model(src, tgt_in), tgt_out)
            batch_loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            running_train += batch_loss.item()

        train_loss = running_train / len(train_loader)
        train_losses.append(train_loss)

        # ---- evaluation pass over the validation set ----
        model.eval()
        running_val = 0
        with torch.no_grad():
            for batch in val_loader:
                src, tgt_in, tgt_out = (tensor.to(device) for tensor in batch)
                running_val += criterion(model(src, tgt_in), tgt_out).item()

        val_loss = running_val / len(val_loader)
        val_losses.append(val_loss)

        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    return model, train_losses, val_losses

In [None]:
## Training Function - With Attention

def train_with_attention(epochs):
    """Train the attention-based seq2seq model for ``epochs`` epochs.

    Builds a fresh encoder/decoder from the module-level hyperparameters. For
    every batch the encoder outputs are computed first and then passed to the
    model together with the source and decoder inputs. Optimization uses Adam,
    the masked cross-entropy loss and gradient-norm clipping at 1.0; progress
    is printed every fifth epoch.

    Returns:
        ``(model, train_losses, val_losses)`` where each loss list holds the
        mean per-batch loss of every epoch.
    """
    model = Seq2SeqWithAttention(
        EncoderWithAttention(source_vocab_size, embedding_dim, hidden_dim, dropout).to(device),
        DecoderWithAttention(target_vocab_size, embedding_dim, hidden_dim, dropout).to(device),
    ).to(device)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = MaskedCrossEntropyLoss()

    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        # ---- optimisation pass over the training set ----
        model.train()
        running_train = 0
        for batch in train_loader:
            src, tgt_in, tgt_out = (tensor.to(device) for tensor in batch)

            optimizer.zero_grad()
            encoder_outputs, _, _ = model.encoder(src)
            batch_loss = criterion(model(src, tgt_in, encoder_outputs), tgt_out)
            batch_loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            running_train += batch_loss.item()

        train_loss = running_train / len(train_loader)
        train_losses.append(train_loss)

        # ---- evaluation pass over the validation set ----
        model.eval()
        running_val = 0
        with torch.no_grad():
            for batch in val_loader:
                src, tgt_in, tgt_out = (tensor.to(device) for tensor in batch)
                encoder_outputs, _, _ = model.encoder(src)
                running_val += criterion(model(src, tgt_in, encoder_outputs), tgt_out).item()

        val_loss = running_val / len(val_loader)
        val_losses.append(val_loss)

        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    return model, train_losses, val_losses

In [None]:
## Training Execution (uncomment the lines below when it is time to train)

# print("Training model without attention...")
# model_no_attn, train_losses_no_attn, val_losses_no_attn = train_no_attention(epochs)

# print("Training model with attention...")
# model_attn, train_losses_attn, val_losses_attn = train_with_attention(epochs)

In [None]:
## Inference Functions

def translate_no_attention(sentence, encoder, decoder, source_tokenizer, target_tokenizer, max_len=30):
    """Greedily decode a translation of ``sentence`` without attention.

    The sentence is tokenized as given (no preprocessing is applied here),
    padded to the module-level ``max_encoding_len`` and encoded once. Decoding
    starts from ``<sos>`` and repeatedly feeds back the highest-scoring token
    until ``<eos>`` is produced or ``max_len`` steps have run.

    Returns:
        The generated tokens joined by spaces (``<eos>`` excluded).
    """
    encoder.eval()
    decoder.eval()

    source_ids = source_tokenizer.texts_to_sequences([sentence])
    source_batch = torch.LongTensor(pad_sequences_pytorch(source_ids, max_encoding_len)).to(device)

    generated = []
    previous_token = '<sos>'

    with torch.no_grad():
        _, hidden, cell = encoder(source_batch)

        for _step in range(max_len):
            # Tokens missing from the target vocabulary fall back to id 1.
            previous_id = target_tokenizer.word_index.get(previous_token, 1)
            step_input = torch.LongTensor([[previous_id]]).to(device)

            logits, hidden, cell = decoder(step_input, hidden, cell)

            best_id = torch.argmax(logits[0, -1, :]).item()
            previous_token = target_tokenizer.index_word.get(best_id, '<unk>')

            if previous_token == '<eos>':
                break
            generated.append(previous_token)

    return ' '.join(generated)

def translate_with_attention(sentence, encoder, decoder, source_tokenizer, target_tokenizer, max_len=30):
    """Greedily decode a translation of ``sentence`` using the attention decoder.

    The sentence is tokenized as given (no preprocessing is applied here),
    padded to the module-level ``max_encoding_len`` and encoded once; the
    encoder outputs are handed to the decoder at every step. Decoding starts
    from ``<sos>`` and repeatedly feeds back the highest-scoring token until
    ``<eos>`` is produced or ``max_len`` steps have run.

    Returns:
        The generated tokens joined by spaces (``<eos>`` excluded).
    """
    encoder.eval()
    decoder.eval()

    source_ids = source_tokenizer.texts_to_sequences([sentence])
    source_batch = torch.LongTensor(pad_sequences_pytorch(source_ids, max_encoding_len)).to(device)

    generated = []
    previous_token = '<sos>'

    with torch.no_grad():
        encoder_outputs, hidden, cell = encoder(source_batch)

        for _step in range(max_len):
            # Tokens missing from the target vocabulary fall back to id 1.
            previous_id = target_tokenizer.word_index.get(previous_token, 1)
            step_input = torch.LongTensor([[previous_id]]).to(device)

            logits, hidden, cell, _ = decoder(step_input, encoder_outputs, hidden, cell)

            best_id = torch.argmax(logits[0, -1, :]).item()
            previous_token = target_tokenizer.index_word.get(best_id, '<unk>')

            if previous_token == '<eos>':
                break
            generated.append(previous_token)

    return ' '.join(generated)