In [25]:

import random
from datasets import load_dataset
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
import re
from collections import defaultdict
import torch.nn.functional as F
from nltk.translate.bleu_score import SmoothingFunction

# Custom tokenizer functions
def simple_arabic_tokenizer(text):
    # Basic Arabic tokenizer that splits on whitespace and punctuation
    text = re.sub(r'[\u0617-\u061A\u064B-\u0652]', '', text)  # Replace punctuation with space
    return text.split()

def simple_english_tokenizer(text):
    # Basic English tokenizer
    text=text.lower()
    text = re.sub(r'[^\w\s]', ' ', text)  # Replace punctuation with space
    return text.split()

# Load dataset with 10,000 samples
dataset = load_dataset("opus100", "ar-en")
train_data = dataset['train'].shuffle(seed=42).select(range(50000))
valid_data = dataset['validation'].shuffle(seed=42).select(range(2000))
test_data = dataset['test'].shuffle(seed=42).select(range(2000))

# Build vocabulary
# Modified build_vocab function with vocabulary size limit
def build_vocab(data, tokenizer_fn, language, max_vocab_size=10000):
    vocab = defaultdict(int)
    special_tokens = ['<pad>', '<unk>', '<sos>', '<eos>']
    
    for item in data:
        tokens = tokenizer_fn(item['translation'][language])
        for token in tokens:
            vocab[token] += 1
    
    # Sort by frequency and limit vocabulary size
    sorted_vocab = sorted(vocab.items(), key=lambda x: x[1], reverse=True)
    
    # Take top (max_vocab_size - len(special_tokens)) most frequent tokens
    sorted_vocab = sorted_vocab[:max_vocab_size - len(special_tokens)]
    
    word_to_idx = {word: idx+len(special_tokens) for idx, (word, count) in enumerate(sorted_vocab)}
    
    # Add special tokens
    for idx, token in enumerate(special_tokens):
        word_to_idx[token] = idx
    
    idx_to_word = {idx: word for word, idx in word_to_idx.items()}
    
    return word_to_idx, idx_to_word

# Build vocabularies with size limit
ar_word_to_idx, ar_idx_to_word = build_vocab(train_data, simple_arabic_tokenizer, 'ar', max_vocab_size=10000)
en_word_to_idx, en_idx_to_word = build_vocab(train_data, simple_english_tokenizer, 'en', max_vocab_size=10000)

print(f"Arabic vocab size: {len(ar_word_to_idx)} (Limited to 10,000)")
print(f"English vocab size: {len(en_word_to_idx)} (Limited to 10,000)")

# Dataset class
class TranslationDataset(Dataset):
    def __init__(self, data, max_length=50):
        self.data = data
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]['translation']
        arabic_text = item['ar']
        english_text = item['en']

        # Tokenize
        arabic_tokens = simple_arabic_tokenizer(arabic_text)+['<eos>']
        english_tokens = ['<sos>'] + simple_english_tokenizer(english_text) + ['<eos>']

        # Convert to indices with unknown handling
        arabic_indices = [ar_word_to_idx.get(token, ar_word_to_idx['<unk>']) for token in arabic_tokens]
        english_indices = [en_word_to_idx.get(token, en_word_to_idx['<unk>']) for token in english_tokens]

        # Pad or truncate
        def pad_or_truncate(sequence, pad_idx):
            if len(sequence) > self.max_length:
                return sequence[:self.max_length]
            return sequence + [pad_idx] * (self.max_length - len(sequence))

        arabic_indices = pad_or_truncate(arabic_indices, ar_word_to_idx['<pad>'])
        english_indices = pad_or_truncate(english_indices, en_word_to_idx['<pad>'])

        return {
            'arabic': torch.tensor(arabic_indices, dtype=torch.long),
            'english': torch.tensor(english_indices, dtype=torch.long)
        }

# Create dataloaders
BATCH_SIZE = 32
train_dataset = TranslationDataset(train_data)
valid_dataset = TranslationDataset(valid_data)
test_dataset = TranslationDataset(test_data)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

Arabic vocab size: 10000 (Limited to 10,000)
English vocab size: 10000 (Limited to 10,000)


In [26]:
class BahdanauAttention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.W1 = nn.Linear(hidden_dim * 2, hidden_dim)  # For encoder outputs
        self.W2 = nn.Linear(hidden_dim * 2, hidden_dim)  # For decoder hidden state
        self.V = nn.Linear(hidden_dim, 1)               # For attention scores

    def forward(self, decoder_hidden, encoder_outputs):
        # decoder_hidden: (batch_size, hidden_dim*2)
        # encoder_outputs: (seq_len, batch_size, hidden_dim*2)

        seq_len, batch_size, hidden_dim = encoder_outputs.shape #(50,32,600)
        
        # Expand decoder hidden state to match encoder outputs
        decoder_hidden = decoder_hidden.unsqueeze(0) #(32,600) -> (1,32,600)
        
        # Compute attention scores
        energy = torch.tanh(self.W1(encoder_outputs) + self.W2(decoder_hidden))  # (seq_len, batch_size, hidden_dim)
        attention_scores = self.V(energy).squeeze(2)  # (seq_len, batch_size)
        
        # Compute attention weights
        attention_weights = F.softmax(attention_scores, dim=0)  # (seq_len, batch_size)

        # Compute context vector as weighted sum of encoder outputs
        context = torch.sum(encoder_outputs * attention_weights.unsqueeze(2), dim=0)  # (batch_size, hidden_dim*2)
        
        return context, attention_weights

In [27]:
class Seq2SeqWithAttention(nn.Module):
    def __init__(self, input_dim, output_dim, emb_dim=256, hidden_dim=512, 
                 n_layers=2, dropout=0.3):
        super().__init__()
        
        # Encoder (bidirectional)
        self.encoder_embedding = nn.Embedding(input_dim, emb_dim, padding_idx=0)
        self.encoder = nn.LSTM(emb_dim, hidden_dim, n_layers,
                              dropout=dropout if n_layers > 1 else 0,
                              bidirectional=True)
        
        # Decoder components
        self.decoder_embedding = nn.Embedding(output_dim, emb_dim, padding_idx=0)
        self.attention = BahdanauAttention(hidden_dim)
        self.decoder = nn.LSTM(emb_dim + hidden_dim*2, hidden_dim*2, n_layers,
                              dropout=dropout if n_layers > 1 else 0)
        
        # Output layers
        self.fc_out = nn.Linear(hidden_dim*4, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.fc_out.out_features
        
        # Initialize outputs
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(src.device)
        
        # Encoder forward pass
        embedded = self.dropout(self.encoder_embedding(src))
        encoder_outputs, (hidden, cell) = self.encoder(embedded)
        
        # Reshape encoder final states (bidirectional to unidirectional)
        hidden = self._reshape_hidden(hidden)
        cell = self._reshape_hidden(cell)
        
        # First decoder input is <sos> token
        input = trg[0,:]
        
        for t in range(1, trg_len):
            # Decoder forward pass with attention
            embedded = self.dropout(self.decoder_embedding(input.unsqueeze(0)))
            
            # Calculate attention context
            context, _ = self.attention(hidden[-1], encoder_outputs) # (32,600)  , (32,50,600)
            context = context.unsqueeze(0)
            # Combine embedded input with context
            
            rnn_input = torch.cat((embedded, context), dim=2)
            
            # Decoder step
            output, (hidden, cell) = self.decoder(rnn_input, (hidden, cell))
            
            # Final prediction
            output = torch.cat((output, context), dim=2)
            pred = self.fc_out(output.squeeze(0))
            outputs[t] = pred
            
            # Teacher forcing
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = pred.argmax(1)
            input = trg[t] if teacher_force else top1
        
        return outputs
    
    def _reshape_hidden(self, hidden):
        """Convert bidirectional LSTM outputs to decoder-compatible format"""
        hidden = hidden.view(self.encoder.num_layers, 2, -1, self.encoder.hidden_size)
        return torch.cat((hidden[:, 0, :, :], hidden[:, 1, :, :]), dim=2)

In [37]:

INPUT_DIM = len(ar_word_to_idx)
OUTPUT_DIM = len(en_word_to_idx)
ENC_EMB_DIM = 256 
DEC_EMB_DIM = 256  
HID_DIM = 300      
N_LAYERS = 1      
ENC_DROPOUT = 0.7  
DEC_DROPOUT = 0.7  

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = Seq2SeqWithAttention(
    INPUT_DIM, OUTPUT_DIM, ENC_EMB_DIM, HID_DIM,
    N_LAYERS, ENC_DROPOUT
).to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)  
criterion = nn.CrossEntropyLoss(ignore_index=en_word_to_idx['<pad>'])

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 22,329,301 trainable parameters


In [38]:
def train(model, loader, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for batch in loader:
        src = batch['arabic'].permute(1, 0).to(device)
        trg = batch['english'].permute(1, 0).to(device)
        
        optimizer.zero_grad()
        output = model(src, trg[:-1,:])

        output_dim = output.shape[-1]
        output = output.reshape(-1, output_dim)  # Changed view to reshape
        trg = trg[1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(loader)

def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for batch in loader:
            src = batch['arabic'].permute(1, 0).to(device)
            trg = batch['english'].permute(1, 0).to(device)

            output = model(src, trg[:-1,:])

            output_dim = output.shape[-1]
            output = output.reshape(-1, output_dim)  # Changed view to reshape
            trg = trg[1:].reshape(-1)

            loss = criterion(output, trg)
            epoch_loss += loss.item()

    return epoch_loss / len(loader)

N_EPOCHS = 10  # Reduced training time
CLIP = 1

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_loader, criterion)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model_small.pt')

    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f}')

Epoch: 01
	Train Loss: 5.745
	 Val. Loss: 5.329
Epoch: 02
	Train Loss: 5.261
	 Val. Loss: 5.168
Epoch: 03
	Train Loss: 5.024
	 Val. Loss: 5.104
Epoch: 04
	Train Loss: 4.853
	 Val. Loss: 5.109
Epoch: 05
	Train Loss: 4.722
	 Val. Loss: 5.088
Epoch: 06
	Train Loss: 4.605
	 Val. Loss: 5.112
Epoch: 07
	Train Loss: 4.505
	 Val. Loss: 5.069
Epoch: 08
	Train Loss: 4.417
	 Val. Loss: 5.164
Epoch: 09
	Train Loss: 4.331
	 Val. Loss: 5.166
Epoch: 10
	Train Loss: 4.264
	 Val. Loss: 5.169


In [39]:
def translate_sentence(sentence, model, max_length=50):
    model.eval()
    
    # Tokenize with preprocessing
    tokens = simple_arabic_tokenizer(sentence) +['<eos>']
    
    # Convert to indices with unknown handling
    src_indices = [ar_word_to_idx.get(token, ar_word_to_idx['<unk>']) for token in tokens]
    
    # Create tensor and move to device (add <sos> and <eos>)
    src_indices = src_indices 
    src_tensor = torch.LongTensor(src_indices).unsqueeze(1).to(device)  # [seq_len, 1]
    
    # Encoder forward pass
    with torch.no_grad():
        embedded = model.dropout(model.encoder_embedding(src_tensor))
        encoder_outputs, (hidden, cell) = model.encoder(embedded)
        
        # Reshape hidden states for decoder
        hidden = model._reshape_hidden(hidden)
        cell = model._reshape_hidden(cell)
    
    # Initialize target with <sos>
    trg_indices = [en_word_to_idx['<sos>']]
    
    # Decoding loop with attention
    for _ in range(max_length):
        trg_tensor = torch.LongTensor([trg_indices[-1]]).to(device)  # [1]
        
        with torch.no_grad():
            # Get decoder embeddings
            embedded = model.dropout(model.decoder_embedding(trg_tensor.unsqueeze(0)))
            
            # Calculate attention context
            context, _ = model.attention(hidden[-1], encoder_outputs)
            context = context.unsqueeze(0)
            # Combine embedded input with context
            rnn_input = torch.cat((embedded, context), dim=2)
            
            # Decoder step
            output, (hidden, cell) = model.decoder(rnn_input, (hidden, cell))
            
            # Combine output with context
            output = torch.cat((output, context), dim=2)
            
            # Predict next token
            prediction = model.fc_out(output.squeeze(0))
        
        # Get predicted token
        pred_token = prediction.argmax(1).item()
        trg_indices.append(pred_token)
        
        # Stop if <eos> is generated
        if pred_token == en_word_to_idx['<eos>']:
            break
    
    # Convert indices to tokens
    trg_tokens = [en_idx_to_word[i] for i in trg_indices]
    
    # Join tokens into sentence (use moses detokenizer for better results)
    return ' '.join(trg_tokens[1:-1])  # Remove <sos> and <eos>

In [40]:
# Load the best model
model.load_state_dict(torch.load('best_model_small.pt'))

# Test on some sentences
test_sentences = [
    "أخرج من هنا الأن",
    "أنا أحب التعلم الآلي",
    "ما هو اسمك",
    "هل يمكنك مساعدتي",
    "شكرا لك"
]

for sent in test_sentences:
    translation = translate_sentence(sent, model)
    print(f"Arabic: {sent}")
    print(f"English: {translation}")
    print()

Arabic: أخرج من هنا الأن
English: out of here now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now now

Arabic: أنا أحب التعلم الآلي
English: i love like a doctor

Arabic: ما هو اسمك
English: what is he matter

Arabic: هل يمكنك مساعدتي
English: can you help me

Arabic: شكرا لك
English: thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank you thank



In [41]:
def evaluate_test_set(model, test_loader, max_length=50):
    model.eval()
    total = 0
    correct = 0
    all_references = []
    all_hypotheses = []
    smoothing = SmoothingFunction()
    
    with torch.no_grad():
        for batch in test_loader:
            src = batch['arabic'].to(device)
            trg = batch['english'].to(device)
            
            # Get batch translations
            batch_translations = []
            for i in range(src.size(1)):
                # Convert tensor to tokens
                src_tokens = [ar_idx_to_word[idx.item()] for idx in src[:,i] if idx.item() != ar_word_to_idx['<pad>']]
                src_sentence = ' '.join(src_tokens[1:-1])  # Remove <sos> and <eos>
                
                # Get translation
                if not src_sentence.strip():
                    continue
                translation = translate_sentence(src_sentence, model, max_length)
                batch_translations.append(translation)
                
                # Get reference
                ref_tokens = [en_idx_to_word[idx.item()] for idx in trg[:,i] if idx.item() != en_word_to_idx['<pad>']]
                reference = ' '.join(ref_tokens[1:-1])
                
                
                # For word-level accuracy
                pred_words = set(translation.split())
                true_words = set(reference.split())
                correct += len(pred_words & true_words)
                total += len(true_words)
    
    # Calculate metrics
    word_accuracy = correct / total if total > 0 else 0
    
   
    print("\nEvaluation Results:")
    print(f"test Accuracy: {word_accuracy:.2%}")
    print("\nSample Translations:")
    for i, (ref, hyp) in enumerate(zip(
        [' '.join(ref[0]) for ref in all_references[:5]],
        [' '.join(hyp) for hyp in all_hypotheses[:5]]
    )):
        print(f"\nSample {i+1}:")
        print(f"Reference: {ref}")
        print(f"Generated: {hyp}")


In [42]:
test_metrics = evaluate_test_set(model, test_loader)


Evaluation Results:
test Accuracy: 8.53%

Sample Translations:


In [43]:
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
import numpy as np

def calculate_bleu(dataset, model, max_length=50, num_samples=None):
    """
    Calculate BLEU score for model translations
    
    Args:
        dataset: Dataset containing source and reference translations
        model: Your Seq2SeqWithAttention model
        max_length: Maximum length for generated translations
        num_samples: Number of samples to evaluate (None for all)
    """
    references = []
    hypotheses = []
    smoothie = SmoothingFunction().method4  # Smoothing for short sentences
    
    if num_samples is None:
        num_samples = len(dataset)
    
    for i in range(min(num_samples, len(dataset))):
        # Get source and reference
        item = dataset[i]['translation']
        src_text = item['ar']
        ref_text = item['en']
        
        # Tokenize reference
        ref_tokens = simple_english_tokenizer(ref_text)
        references.append([ref_tokens])  # Note: wrapped in list for multiple references
        
        # Generate translation
        hyp_text = translate_sentence(src_text, model, max_length)
        hyp_tokens = simple_english_tokenizer(hyp_text)
        hypotheses.append(hyp_tokens)
        
        # Print sample translations
        if i < 3:  # Print first 3 examples
            print(f"\nSample {i+1}:")
            print(f"Arabic:    {src_text}")
            print(f"Reference: {ref_text}")
            print(f"Generated: {hyp_text}")
    
    # Calculate BLEU scores
    bleu1 = corpus_bleu(references, hypotheses, weights=(1, 0, 0, 0), smoothing_function=smoothie)
    bleu2 = corpus_bleu(references, hypotheses, weights=(0.5, 0.5, 0, 0), smoothing_function=smoothie)
    bleu4 = corpus_bleu(references, hypotheses, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothie)
    
    print("\nBLEU Scores:")
    print(f"BLEU-1: {bleu1*100:.2f}")
    print(f"BLEU-2: {bleu2*100:.2f}")
    print(f"BLEU-4: {bleu4*100:.2f}")
    
    return {"bleu1": bleu1, "bleu2": bleu2, "bleu4": bleu4}

In [44]:
# Calculate BLEU on validation set (first 100 samples for quick evaluation)
bleu_scores = calculate_bleu(test_data, model, num_samples=100)



Sample 1:
Arabic:    أجل.
Reference: Yeah.
Generated: 

Sample 2:
Arabic:    وإذا لم تكن هى؟
Reference: What if it's not her?
Generated: if if not not not yet

Sample 3:
Arabic:    وإن ضمان الأمن ومصداقيته في كابل وغيرها من المراكز لهو ذو أهمية قصوى.
Reference: Ensuring credible security in Kabul and other centres is of paramount importance.
Generated: security security security security security security security security security and the <unk> of of the <unk> of of the

BLEU Scores:
BLEU-1: 12.49
BLEU-2: 5.97
BLEU-4: 1.07
