In [None]:
import re

import pandas as pd
import sentencepiece as spm
import torch
import torch.nn as nn
import torch.optim as optim
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders, processors
from torch.utils.data import Dataset, DataLoader, TensorDataset

In [None]:
# Load the Arabic/English sentence pairs from the headerless, tab-separated file.
# NOTE(review): the frame displayed below shows language codes (e.g. `acm eng`)
# as the index, which suggests the file has more columns than `names` — confirm.
data = pd.read_csv(
    'tatoeba-dev.ara-eng.tsv',
    sep='\t',
    encoding='utf-8',
    header=None,
    names=['Arabic', 'English'],
)


                                                    Arabic  \
acm eng  اذا الواحد يستخدم الفلوس بحمكة يكدر يسوي كومة ...   
    eng                                 عمرك رايح المكسيك؟   
    eng                       فكرنا انه طبيعي لازم يتعاقب.   
    eng              لازم تترك الامور تاخذ مجراها الطبيعي.   
    eng                                    لا يريدون استخ.   

                                                   English  
acm eng                 If wisely used, money can do much.  
    eng                      Have you ever been to Mexico?  
    eng  We thought that it was natural that he should ...  
    eng         You must let things take their own course.  
    eng                         They don't want to use it.  


In [None]:
# Drop incomplete rows and exact duplicate pairs from the frame that is actually
# normalized below. Previously the NaN-free frame was only stored in
# `data_cleaned`, so `data` still held NaN rows (which make remove_diacritics
# fail) and `data_cleaned` never received the normalization or lowercasing.
data = data.dropna().drop_duplicates()

# Deliberately an alias, not a copy: the column updates applied to `data` later
# in this notebook must also be visible through `data_cleaned`, which feeds the
# tokenizer corpora and the train/validation/test split.
data_cleaned = data
def remove_diacritics(text):
    """Normalize an Arabic string for tokenization.

    Removes the code points U+064B..U+0652, rewrites "ى" as "ي" and "ة" as "ه",
    and trims surrounding whitespace.

    Args:
        text: The string to normalize.

    Returns:
        The normalized string.
    """
    without_marks = re.sub(r'[\u064B-\u0652]', '', text)
    # The two letter rewrites are independent single-character substitutions,
    # so a translation table applies them in one pass.
    letter_map = str.maketrans({"ى": "ي", "ة": "ه"})
    return without_marks.translate(letter_map).strip()

# Normalize every Arabic sentence; the column is replaced on the existing frame.
normalized_arabic = data['Arabic'].apply(remove_diacritics)
data['Arabic'] = normalized_arabic

In [None]:
# Lowercase the English side; the column is replaced on the existing frame.
lowercased_english = data['English'].str.lower()
data['English'] = lowercased_english

In [10]:
# Persist the processed pairs as a tab-separated file without the index.
data.to_csv(
    "cleaned_dataset.tsv",
    sep='\t',
    index=False,
)


In [None]:
def write_sentences_to_file(sentences, file_path):
    """Write one whitespace-trimmed sentence per line to a UTF-8 text file.

    Args:
        sentences: Iterable of strings.
        file_path: Destination path; an existing file is overwritten.
    """
    with open(file_path, 'w', encoding='utf-8') as out_file:
        out_file.writelines(f"{line.strip()}\n" for line in sentences)

# Dump each language column to its own plain-text corpus file.
for column, corpus_path in (
    ('Arabic', 'arabic_sentences.txt'),
    ('English', 'english_sentences.txt'),
):
    write_sentences_to_file(data_cleaned[column], corpus_path)


In [None]:
# Train one unigram SentencePiece model per language.
# The inputs are the corpus files written by write_sentences_to_file earlier in
# this notebook ('arabic_sentences.txt' / 'english_sentences.txt'); the previous
# 'arabic.txt' / 'english.txt' paths are never created here.
# Special ids are pinned (pad=0, unk=1, bos=2, eos=3) because the rest of the
# notebook hard-codes them.
spm.SentencePieceTrainer.train(
    input='arabic_sentences.txt',
    model_prefix='spm_arabic',
    vocab_size=15475,
    model_type='unigram',
    character_coverage=1.0,
    pad_id=0, unk_id=1, bos_id=2, eos_id=3
)
spm.SentencePieceTrainer.train(
    input='english_sentences.txt',
    model_prefix='spm_english',
    vocab_size=6897,
    model_type='unigram',
    character_coverage=1.0,
    pad_id=0, unk_id=1, bos_id=2, eos_id=3
)

# The trainer writes '<model_prefix>.model', so load exactly the files produced
# above rather than the unrelated 'arabic.model' / 'english.model'.
arabic_sp = spm.SentencePieceProcessor(model_file='spm_arabic.model')
english_sp = spm.SentencePieceProcessor(model_file='spm_english.model')

In [None]:
def preprocess_sequence(sp_processor, sentence, max_len, bos_id, eos_id, pad_id):
    """Encode a sentence as `[bos] + pieces + [eos]`, padded/truncated to `max_len`.

    Args:
        sp_processor: Object exposing `encode(sentence, out_type=int)` that
            returns a list of ids.
        sentence: Text to encode.
        max_len: Length of the returned list.
        bos_id: Id placed first.
        eos_id: Id placed after the (possibly truncated) pieces.
        pad_id: Id used to fill up to `max_len`.

    Returns:
        A list of `max_len` ids.
    """
    # Reserve two slots for BOS/EOS *before* truncating; truncating the already
    # wrapped sequence would cut the EOS marker off long sentences.
    pieces = sp_processor.encode(sentence, out_type=int)[:max(0, max_len - 2)]
    tokens = [bos_id] + pieces + [eos_id]
    return pad_sequence(tokens, max_len, pad_id)

def pad_sequence(tokens, max_len, pad_id):
    """Return `tokens` cut to `max_len`, right-filled with `pad_id` when shorter."""
    clipped = tokens[:max_len]
    shortfall = max_len - len(tokens)
    if shortfall > 0:
        return clipped + [pad_id] * shortfall
    return clipped

In [None]:
# Fixed sequence length and special-token ids; the ids match the values passed
# to the SentencePiece trainer (pad=0, bos=2, eos=3).
max_len = 100
arabic_pad_id = 0
english_sos_id = 2
english_eos_id = 3
english_pad_id = 0

# Source side: piece ids only (no BOS/EOS), padded or truncated to max_len.
arabic_sequences = [
    pad_sequence(arabic_sp.encode(text, out_type=int), max_len, arabic_pad_id)
    for text in data_cleaned['Arabic']
]
# Target side: wrapped in BOS/EOS by preprocess_sequence, then padded/truncated.
english_sequences = [
    preprocess_sequence(english_sp, text, max_len, english_sos_id, english_eos_id, english_pad_id)
    for text in data_cleaned['English']
]

In [None]:
# 80% train, then the remaining 20% split evenly into validation and test.
train_data, temp_data = train_test_split(data_cleaned, test_size=0.2, random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)


def _encode_arabic(sentences):
    """Encode source sentences as fixed-length id lists (no BOS/EOS)."""
    return [pad_sequence(arabic_sp.encode(sentence, out_type=int), max_len, arabic_pad_id)
            for sentence in sentences]


def _encode_english(sentences):
    """Encode target sentences as fixed-length id lists wrapped in BOS/EOS."""
    return [preprocess_sequence(english_sp, sentence, max_len,
                                english_sos_id, english_eos_id, english_pad_id)
            for sentence in sentences]


def _as_dataset(arabic_ids, english_ids):
    """Pair two id matrices as an indexable dataset of (src, tgt) tensors."""
    return TensorDataset(torch.tensor(arabic_ids, dtype=torch.long),
                         torch.tensor(english_ids, dtype=torch.long))


train_arabic_sequences = _encode_arabic(train_data['Arabic'])
train_english_sequences = _encode_english(train_data['English'])
val_arabic_sequences = _encode_arabic(val_data['Arabic'])
val_english_sequences = _encode_english(val_data['English'])
test_arabic_sequences = _encode_arabic(test_data['Arabic'])
test_english_sequences = _encode_english(test_data['English'])

# TensorDataset yields the same (src[idx], tgt[idx]) pairs as the notebook's
# TranslationDataset, but is importable up front: this cell runs before that
# class is defined, so using it here fails on a fresh kernel.
train_dataset = _as_dataset(train_arabic_sequences, train_english_sequences)
val_dataset = _as_dataset(val_arabic_sequences, val_english_sequences)
test_dataset = _as_dataset(test_arabic_sequences, test_english_sequences)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)

In [None]:
from tokenizers import Tokenizer, models, pre_tokenizers, trainers

# Train and save a whitespace-pre-tokenized BPE tokenizer per language.
# `unk_token` must be set on the model itself: listing "<UNK>" only as a special
# token reserves an id, but the BPE model would still have no unknown token to
# fall back on for symbols outside its vocabulary.
arabic_tokenizer = Tokenizer(models.BPE(unk_token="<UNK>"))
arabic_pre_tokenizer = pre_tokenizers.Whitespace()
arabic_tokenizer.pre_tokenizer = arabic_pre_tokenizer
arabic_trainer = trainers.BpeTrainer(vocab_size=20000, special_tokens=["<PAD>", "<UNK>", "<SOS>", "<EOS>"])

arabic_tokenizer.train(files=["arabic_sentences.txt"], trainer=arabic_trainer)
arabic_tokenizer.save("arabic_bpe_tokenizer.json")

english_tokenizer = Tokenizer(models.BPE(unk_token="<UNK>"))
english_pre_tokenizer = pre_tokenizers.Whitespace()
english_tokenizer.pre_tokenizer = english_pre_tokenizer
english_trainer = trainers.BpeTrainer(vocab_size=10000, special_tokens=["<PAD>", "<UNK>", "<SOS>", "<EOS>"])

english_tokenizer.train(files=["english_sentences.txt"], trainer=english_trainer)
english_tokenizer.save("english_bpe_tokenizer.json")


In [None]:
import torch
# Convert the whole-corpus id lists into int64 tensors.
arabic_tensors, english_tensors = (
    torch.tensor(id_lists, dtype=torch.long)
    for id_lists in (arabic_sequences, english_sequences)
)

In [None]:
class TranslationDataset(Dataset):
    """Index-aligned pairs of source and target sequences.

    Args:
        src_tensors: Indexable collection of source sequences.
        tgt_tensors: Indexable collection of target sequences, same length.

    Raises:
        ValueError: If the two collections differ in length.
    """

    def __init__(self, src_tensors, tgt_tensors):
        # Fail early: a length mismatch would otherwise surface as an IndexError
        # (or silently ignored targets) in the middle of an epoch.
        if len(src_tensors) != len(tgt_tensors):
            raise ValueError(
                f"source/target size mismatch: {len(src_tensors)} != {len(tgt_tensors)}"
            )
        self.src_tensors = src_tensors
        self.tgt_tensors = tgt_tensors

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.src_tensors)

    def __getitem__(self, idx):
        """Return the `(source, target)` pair at position `idx`."""
        return self.src_tensors[idx], self.tgt_tensors[idx]

# Whole-corpus dataset/loader, bound to their own names. Binding them to
# `train_dataset` / `train_loader` replaced the loaders built from the
# train/validation/test split, so training iterated over the held-out
# validation and test sentences as well.
full_dataset = TranslationDataset(arabic_tensors, english_tensors)
full_loader = DataLoader(full_dataset, batch_size=32, shuffle=True)


In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned Q/K/V/output projections.

    Args:
        d_model: Width of the input and output features.
        num_heads: Number of heads; must divide `d_model`.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head feature width
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Softmax(Q·Kᵀ / sqrt(d_k))·V; positions where `mask == 0` are suppressed."""
        scores = (Q @ K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # A large negative score drives the softmax weight to ~0.
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = torch.softmax(scores, dim=-1)
        return weights @ V

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)."""
        batch_size, seq_length, _ = x.size()
        per_head = x.view(batch_size, seq_length, self.num_heads, self.d_k)
        return per_head.permute(0, 2, 1, 3)

    def combine_heads(self, x):
        """Inverse of `split_heads`: (batch, heads, seq, d_k) -> (batch, seq, d_model)."""
        batch_size, _, seq_length, _ = x.size()
        merged = x.permute(0, 2, 1, 3).contiguous()
        return merged.view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, merge the heads and project out."""
        queries = self.split_heads(self.W_q(Q))
        keys = self.split_heads(self.W_k(K))
        values = self.split_heads(self.W_v(V))
        attended = self.scaled_dot_product_attention(queries, keys, values, mask)
        return self.W_o(self.combine_heads(attended))

In [19]:
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward block: Linear(d_model→d_ff) → ReLU → Linear(d_ff→d_model)."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the expansion, the non-linearity, and the projection back."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

In [20]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a sequence of embeddings.

    Args:
        d_model: Feature width of the embeddings (even or odd).
        max_seq_length: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term

        # Even columns take the sine, odd columns the cosine of the same angle.
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so the cosine side uses only the first d_model // 2 frequencies.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Registered as a buffer: moves with the module and is saved in its
        # state, but is not a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return `x` plus the encodings for its first `x.size(1)` positions."""
        return x + self.pe[:, :x.size(1)]

In [21]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward, each followed by
    dropout, a residual connection and layer normalization."""

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def _add_and_norm(self, norm, residual, sublayer_output):
        """Apply dropout to the sub-layer output, add the residual, normalize."""
        return norm(residual + self.dropout(sublayer_output))

    def forward(self, x, mask):
        """Run `x` through the block; `mask` is forwarded to the self-attention."""
        x = self._add_and_norm(self.norm1, x, self.self_attn(x, x, x, mask))
        return self._add_and_norm(self.norm2, x, self.feed_forward(x))

In [22]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention over the encoder
    output, then feed-forward; each followed by dropout, a residual connection
    and layer normalization."""

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def _add_and_norm(self, norm, residual, sublayer_output):
        """Apply dropout to the sub-layer output, add the residual, normalize."""
        return norm(residual + self.dropout(sublayer_output))

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Run `x` through the block.

        `tgt_mask` is passed to the self-attention; `src_mask` to the
        cross-attention, whose keys and values are `enc_output`.
        """
        x = self._add_and_norm(self.norm1, x, self.self_attn(x, x, x, tgt_mask))
        x = self._add_and_norm(
            self.norm2, x, self.cross_attn(x, enc_output, enc_output, src_mask)
        )
        return self._add_and_norm(self.norm3, x, self.feed_forward(x))

In [None]:
class Transformer(nn.Module):
    """Encoder–decoder Transformer that maps source ids and target ids to
    per-position logits over the target vocabulary.

    Token id 0 is treated as padding when the attention masks are built.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        encoder_stack = [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        decoder_stack = [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        self.encoder_layers = nn.ModuleList(encoder_stack)
        self.decoder_layers = nn.ModuleList(decoder_stack)

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build the source padding mask and the target padding+causal mask.

        Returns:
            `(src_mask, tgt_mask)` boolean tensors; True marks positions that
            may be attended to.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2).to(src.device)
        tgt_not_pad = (tgt != 0).unsqueeze(1).unsqueeze(3).to(tgt.device)
        seq_length = tgt.size(1)
        # Lower-triangular (diagonal included): a position may only look at
        # itself and earlier positions.
        causal = torch.tril(torch.ones(1, seq_length, seq_length, device=tgt.device)).bool()
        return src_mask, tgt_not_pad & causal

    def forward(self, src, tgt):
        """Return logits of shape `(..., tgt_vocab_size)` for every target position."""
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        encoder_input = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        decoder_input = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        memory = encoder_input
        for layer in self.encoder_layers:
            memory = layer(memory, src_mask)

        hidden = decoder_input
        for layer in self.decoder_layers:
            hidden = layer(hidden, memory, src_mask, tgt_mask)

        return self.fc(hidden)

In [None]:

# Vocabulary sizes are read from the loaded SentencePiece models so that every
# id the tokenizers can emit has an embedding row / output logit. The previous
# hard-coded 15000 / 6800 were smaller than the vocab_size values the models are
# trained with in this notebook (15475 / 6897), which lets ids index out of range.
src_vocab_size = arabic_sp.get_piece_size()
tgt_vocab_size = english_sp.get_piece_size()
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100  # positions in the positional-encoding table
dropout = 0.3
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_seq_length=max_seq_length,
    dropout=dropout
).to(device)

# Wrap for multi-GPU data parallelism; the underlying network stays reachable
# as `model.module`.
model = nn.DataParallel(model)
model = model.to(device)

In [None]:
# Adam optimizer, padding-aware cross-entropy, and a scheduler that multiplies
# the learning rate by 0.1 after 5 epochs without improvement in the stepped metric.
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss(ignore_index=english_pad_id)  # Use english_pad_id (0)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=5,
    verbose=True,
)

def train_epoch(model, dataloader, optimizer, criterion):
    """Run one teacher-forced training pass over `dataloader`.

    Reads the notebook-level `device` and `english_pad_id`.

    Args:
        model: Callable as `model(src, tgt_input)` returning logits whose last
            dimension is the vocabulary.
        dataloader: Iterable of `(src, tgt)` id tensors.
        optimizer: Optimizer over `model.parameters()`.
        criterion: Loss taking `(flattened_logits, flattened_targets)`.

    Returns:
        `(avg_loss, accuracy)`: the mean of the per-batch losses and the
        fraction of non-padding target tokens predicted correctly. Both are
        zero when the dataloader yields no batches.
    """
    model.train()
    total_loss = 0.0
    total_correct = 0
    total_tokens = 0
    num_batches = 0
    for src, tgt in dataloader:
        src = src.to(device)
        tgt = tgt.to(device)
        # Teacher forcing: feed tokens [0, T-1), predict tokens [1, T).
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        optimizer.zero_grad()
        output = model(src, tgt_input)
        # Flatten with the logits' own last dimension instead of the global
        # `tgt_vocab_size`, so the function cannot disagree with the model.
        loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1
        # Token accuracy over non-padding positions only.
        predictions = output.argmax(dim=-1)
        mask = (tgt_output != english_pad_id)
        correct = (predictions == tgt_output) & mask
        total_correct += correct.sum().item()
        total_tokens += mask.sum().item()
    # Guard the empty case the same way the accuracy already is.
    avg_loss = total_loss / num_batches if num_batches > 0 else 0.0
    accuracy = total_correct / total_tokens if total_tokens > 0 else 0
    return avg_loss, accuracy



In [None]:
# Train for a fixed number of epochs, stepping the plateau scheduler on the
# epoch's average training loss.
num_epochs = 43
for epoch in range(num_epochs):
    epoch_metrics = train_epoch(model, train_loader, optimizer, criterion)
    avg_loss, accuracy = epoch_metrics
    scheduler.step(avg_loss)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")

# Save the bare network (unwrapped from DataParallel when applicable).
model_path = "transformer_translation.pth"
if isinstance(model, nn.DataParallel):
    model_to_save = model.module
else:
    model_to_save = model
torch.save(model_to_save, model_path)

In [None]:
def translate_sentence(model, arabic_sentence, arabic_sp, english_sp, max_len, device):
    """Greedily translate one Arabic sentence into English.

    Args:
        model: Callable as `model(src, tgt)` returning logits of shape
            `(1, tgt_len, vocab)`; switched to eval mode by this function.
        arabic_sentence: Raw source text; normalized with `remove_diacritics`.
        arabic_sp: Source tokenizer exposing `encode(text, out_type=int)`.
        english_sp: Target tokenizer exposing `decode(ids)`.
        max_len: Padded source length and upper bound on the generated length
            (including BOS).
        device: Device on which the input tensors are created.

    Returns:
        The decoded translation, without the BOS/EOS markers.
    """
    # Special ids as used throughout this notebook (pad=0, bos=2, eos=3).
    pad_id, bos_id, eos_id = 0, 2, 3

    arabic_sentence = remove_diacritics(arabic_sentence)
    src_ids = pad_sequence(arabic_sp.encode(arabic_sentence, out_type=int), max_len, pad_id)
    src = torch.tensor([src_ids], dtype=torch.long, device=device)

    tgt = torch.tensor([[bos_id]], dtype=torch.long, device=device)
    model.eval()
    with torch.no_grad():
        for _ in range(max_len - 1):
            output = model(src, tgt)
            # Greedy choice: most likely token at the last generated position.
            next_token = output[:, -1, :].argmax(dim=-1).item()
            next_column = torch.tensor([[next_token]], dtype=torch.long, device=device)
            tgt = torch.cat([tgt, next_column], dim=1)
            if next_token == eos_id:
                break

    translated_ids = tgt[0].cpu().tolist()
    if translated_ids and translated_ids[0] == bos_id:
        translated_ids = translated_ids[1:]
    # The list is empty here when nothing was generated after BOS (max_len <= 1);
    # indexing [-1] unguarded would raise IndexError.
    if translated_ids and translated_ids[-1] == eos_id:
        translated_ids = translated_ids[:-1]

    return english_sp.decode(translated_ids)
# Smoke test: translate one sentence with the trained model and show both sides.
arabic_test_sentence = "أنا لا أحتاجك بعد الأن"
translated = translate_sentence(
    model,
    arabic_test_sentence,
    arabic_sp,
    english_sp,
    max_len=100,
    device=device,
)
print(f"Arabic: {arabic_test_sentence}")
print(f"English: {translated}")