In [None]:
# Download the dataset from kaggle (ahmedashrafahmed/arabic-to-english-sentences-dataset)

"""
  Note :
    if you want to download the datset you need to Set up Kaggle API Key then uploading the kaggle.json file
"""

!pip install kaggle
import os
os.makedirs('/root/.kaggle', exist_ok=True)
os.rename('kaggle.json', '/root/.kaggle/kaggle.json')
os.chmod('/root/.kaggle/kaggle.json', 600)
!kaggle datasets download ahmedashrafahmed/arabic-to-english-sentences-dataset
!unzip arabic-to-english-sentences-dataset.zip
!ls


Dataset URL: https://www.kaggle.com/datasets/ahmedashrafahmed/arabic-to-english-sentences-dataset
License(s): unknown
Downloading arabic-to-english-sentences-dataset.zip to /content
  0% 0.00/493k [00:00<?, ?B/s]
100% 493k/493k [00:00<00:00, 122MB/s]
Archive:  arabic-to-english-sentences-dataset.zip
  inflating: _about.txt              
  inflating: ara.txt                 
_about.txt  arabic-to-english-sentences-dataset.zip  ara.txt  sample_data


In [12]:
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [3]:
import pandas as pd

file_path = 'ara.txt'

# transforming the file into a dataframe
df = pd.read_csv(file_path, delimiter='\t', header=None, names=["en_s", "ar_s", "Attribution"])

# displaying the first few rows of the dataframe to verify the content
print(df.head())

    en_s            ar_s                                        Attribution
0    Hi.         مرحبًا.  CC-BY 2.0 (France) Attribution: tatoeba.org #5...
1   Run!           اركض!  CC-BY 2.0 (France) Attribution: tatoeba.org #9...
2  Duck!      اخفض رأسك!  CC-BY 2.0 (France) Attribution: tatoeba.org #2...
3  Duck!     اخفضي رأسك!  CC-BY 2.0 (France) Attribution: tatoeba.org #2...
4  Duck!  اخفضوا رؤوسكم!  CC-BY 2.0 (France) Attribution: tatoeba.org #2...


In [4]:
# preprocessing + tokenization
import re
import nltk
import spacy
from nltk.tokenize import word_tokenize

nltk.download('punkt_tab')
nlp_arabic = spacy.blank('ar')

def clean_text(text):
    text = text.strip()
    text = re.sub(r'\s+' , ' ', text)
    text = re.sub(r'[^a-zA-Z0-9\u0621-\u064A\s\u060C\u061F\u0640\u0660-\u0669\u06F0-\u06F9\u066C\u062C\u0646\u0645\u062A\u0644\u064A\u0634.]', '', text)
    return text

def tokenize_english(text):
    return word_tokenize(text)

def tokenize_arabic(text):
    doc = nlp_arabic(text)
    return [token.text for token in doc]

def preprocess_data(df):
    df['en'] = df['en_s'].apply(clean_text)
    df['ar'] = df['ar_s'].apply(clean_text)

    df['en_tokens'] = df['en'].apply(tokenize_english)
    df['ar_tokens'] = df['ar'].apply(tokenize_arabic)

    return df[['en_tokens', 'ar_tokens']]

data = preprocess_data(df)

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


In [24]:
# building vocabulary + encoding
from collections import Counter

def build_vocab(sentences):
    flat_tokens = [token for sentence in sentences for token in sentence]
    token_counts = Counter(flat_tokens)

    vocab = {token: idx+4 for idx, (token, count) in enumerate(token_counts.items())}
    # adding special tokens
    vocab['<PAD>'] = 0
    vocab['<UNK>'] = 1
    vocab['<START>'] = 2
    vocab['<END>'] = 3

    vocab_size = len(vocab)

    return vocab,vocab_size
def encode_sentences(sentences,vocab,max_len=25):
    encoded_sentences = [
        [vocab['<START>']] +
        [vocab.get(token, vocab['<UNK>']) for token in sentence]
        for sentence in sentences
    ]

    for i, sentence in enumerate(encoded_sentences):
        if len(sentence) > max_len - 1:
            encoded_sentences[i] = sentence[:max_len - 1]

    for i, sentence in enumerate(encoded_sentences):
        sentence.append(vocab['<END>'])

    padded_sentences = []


    for sentence in encoded_sentences:
        while len(sentence) < max_len:
            sentence.append(vocab['<PAD>'])
        padded_sentences.append(sentence)
    return padded_sentences

ar_vocab , ar_vocab_size = build_vocab(data['ar_tokens'])
en_vocab , en_vocab_size = build_vocab(data['en_tokens'])
max_len = 35
data['ar_ids'] = encode_sentences(data['ar_tokens'],ar_vocab, max_len=max_len)
data['en_ids'] = encode_sentences(data['en_tokens'],en_vocab, max_len=max_len)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['ar_ids'] = encode_sentences(data['ar_tokens'],ar_vocab, max_len=max_len)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['en_ids'] = encode_sentences(data['en_tokens'],en_vocab, max_len=max_len)


In [25]:
from sklearn.model_selection import train_test_split

# Split the data into train (80%), validation (10%), and test (10%) sets
train_data, val_data = train_test_split(data, test_size=0.2, random_state=42,shuffle=True)

# If you want a separate test set, further split val_data
val_data, test_data = train_test_split(val_data, test_size=0.5, random_state=42)

In [26]:
# Convert the train, validation, and test data to PyTorch tensors
train_en_tensor = torch.tensor(train_data['en_ids'].tolist())
train_ar_tensor = torch.tensor(train_data['ar_ids'].tolist())

val_en_tensor = torch.tensor(val_data['en_ids'].tolist())
val_ar_tensor = torch.tensor(val_data['ar_ids'].tolist())

test_en_tensor = torch.tensor(test_data['en_ids'].tolist())
test_ar_tensor = torch.tensor(test_data['ar_ids'].tolist())

In [28]:
from torch.utils.data import Dataset, DataLoader

class TranslationDataset(Dataset):
    def __init__(self, src_tensor, tgt_tensor):
        self.src_tensor = src_tensor  # Source (English)
        self.tgt_tensor = tgt_tensor  # Target (Arabic)

    def __len__(self):
        return len(self.src_tensor)

    def __getitem__(self, idx):
        return self.src_tensor[idx], self.tgt_tensor[idx]

# Create datasets for training, validation, and testing
train_dataset = TranslationDataset(train_en_tensor, train_ar_tensor)
val_dataset = TranslationDataset(val_en_tensor, val_ar_tensor)
test_dataset = TranslationDataset(test_en_tensor, test_ar_tensor)

# Create DataLoaders to iterate through batches
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [29]:
# building the model
import math
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class MultiHeadAttention(nn.Module):
    def __init__(self,d_model,n_heads):
        super(MultiHeadAttention,self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.head_len = d_model // n_heads

        self.key = nn.Linear(d_model , d_model)
        self.query = nn.Linear(d_model , d_model)
        self.value = nn.Linear(d_model , d_model)
        self.linear = nn.Linear(d_model , d_model)

    def K_dot_Q(self ,key , query , value, mask):
        result = torch.matmul(query , key.transpose(-2,-1)) / math.sqrt(self.head_len)
        if mask is not None:
            result = result.masked_fill(mask == 0, -1e9)
        result_probs = torch.softmax(result , dim=-1)
        out = torch.matmul(result_probs,value)
        return out

    def split_heads(self, x):
      batch_size, seq_len, d_model = x.size()
      return x.view(batch_size, seq_len, self.n_heads, self.head_len).transpose(1, 2)

    def concatinate_heads(self, x):
      batch_size, n_heads, seq_len, head_len = x.size()
      return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

    def forward(self,k,q,v,mask = None):
        K = self.split_heads(self.key(k))
        Q = self.split_heads(self.query(q))
        V = self.split_heads(self.value(v))
        out = self.K_dot_Q(K , Q , V, mask)

        conc_out = self.concatinate_heads(out)
        final_out = self.linear(conc_out)
        return final_out



class FeedForward (nn.Module):
    def __init__(self,d_model , d_ff ):
        super(FeedForward,self).__init__()
        self.layer1 = nn.Linear(d_model, d_ff)
        self.layer2 = nn.Linear(d_ff, d_model)
        self.activation = nn.ReLU()
    def forward(self,x):
        l1 = self.activation(self.layer1(x))
        l2 = self.layer2(l1)
        return l2



class Embed_PosEncod(nn.Module):
    def __init__(self, vocab_size, max_seq_len, d_model):
        super(Embed_PosEncod, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.position_encod = nn.Embedding(max_seq_len, d_model)

    def forward(self, x,device):
        batch, seq_len = x.size()
        x_tock = self.embedding(x).to(device)
        x_positions = self.position_encod(torch.arange(seq_len, device=device))
        return x_tock + x_positions



class Encoder_layer(nn.Module):
    def __init__(self,d_model,n_heads,d_ff,dropout):
        super(Encoder_layer,self).__init__()
        self.attention = MultiHeadAttention(d_model,n_heads)
        self.Norm1 = nn.LayerNorm(d_model)
        self.Norm2 = nn.LayerNorm(d_model)
        self.feed = FeedForward(d_model , d_ff)
        self.dropout = nn.Dropout(dropout)
    def forward(self,x,mask):
        out = self.attention(x,x,x,None)
        norm1 = self.Norm1(self.dropout(out)+x)
        out = self.feed(norm1)
        norm2 = self.Norm2(self.dropout(out)+norm1)
        return norm2


class Decoder_layer(nn.Module):
    def __init__(self,d_model,n_heads,d_ff,dropout):
        super(Decoder_layer,self).__init__()
        self.attention = MultiHeadAttention(d_model,n_heads)
        self.cross_attention = MultiHeadAttention(d_model,n_heads)
        self.Norm1 = nn.LayerNorm(d_model)
        self.Norm2 = nn.LayerNorm(d_model)
        self.Norm3 = nn.LayerNorm(d_model)
        self.feed = FeedForward(d_model , d_ff)
        self.dropout = nn.Dropout(dropout)
    def forward(self,x,enc_out,trg_mask, src_mask):
        out = self.attention(x,x,x,trg_mask)
        norm1 = self.Norm1(self.dropout(out)+x)
        out = self.cross_attention(enc_out , norm1 , enc_out , src_mask)
        norm2 = self.Norm2(self.dropout(out)+norm1)
        out = self.feed(norm2)
        norm3 = self.Norm3(self.dropout(out)+norm2)
        return norm3


class Transformer(nn.Module):
    def __init__(self, vocab_size, tgt_vocab_size, max_seq_len, d_model, n_heads, d_ff, dropout, num_layers):
        super(Transformer, self).__init__()
        self.enc_embed_pos = Embed_PosEncod(vocab_size, max_seq_len, d_model)
        self.dec_embed_pos = Embed_PosEncod(tgt_vocab_size, max_seq_len, d_model)
        self.encoders = nn.ModuleList([Encoder_layer(d_model, n_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoders = nn.ModuleList([Decoder_layer(d_model, n_heads, d_ff, dropout) for _ in range(num_layers)])
        self.linear_out = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
      device = src.device

      src_mask = (src != 0).unsqueeze(1).unsqueeze(2).to(device)

      seq_len = tgt.size(1)
      tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(2).to(device)

      nopeak_mask = torch.triu(torch.ones(1, seq_len, seq_len), diagonal=1).bool().to(device)

      tgt_mask = tgt_mask & nopeak_mask.unsqueeze(0)
      return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask,tgt_mask = self.generate_mask(src, tgt)
        src_embeded = self.dropout(self.enc_embed_pos(src, src.device))
        tgt_embeded = self.dropout(self.dec_embed_pos(tgt, tgt.device))
        enc_output = src_embeded
        for encoder_layer in self.encoders:
            enc_output = encoder_layer(enc_output,src_mask)

        dec_output = tgt_embeded
        for decoder_layer in self.decoders:
            dec_output = decoder_layer(dec_output, enc_output, tgt_mask,src_mask)
        out = self.linear_out(dec_output)

        return out


In [30]:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
import torch

flat_ar_ids = [idx for word,idx in  ar_vocab.items() ]
unique_classes = np.unique(flat_ar_ids)
class_weights = compute_class_weight('balanced', classes=unique_classes, y=flat_ar_ids)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)
vocab_size = ar_vocab_size
extended_class_weights_tensor = torch.zeros(vocab_size).to(device)

for idx, class_id in enumerate(unique_classes):
    extended_class_weights_tensor[class_id] = class_weights_tensor[idx]



In [None]:
from torch.optim.lr_scheduler import StepLR
from transformers import get_linear_schedule_with_warmup

def train_epoch(model, train_loader, optimizer, criterion, device):
    model.train()
    epoch_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for src, tgt in train_loader:
        src, tgt = src.to(device), tgt.to(device)

        tgt_input = tgt[:, :-1].to(device)
        tgt_output = tgt[:, 1:].to(device)

        # Forward pass
        optimizer.zero_grad()
        output = model(src, tgt_input)

        # Calculate the loss (Ignoring padding tokens)
        loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))

        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        epoch_loss += loss.item()

        _, predicted = output.max(dim=-1)  t
        mask = (tgt_output != 0)
        correct_predictions += ((predicted == tgt_output) & mask).sum().item()  
        total_predictions += mask.sum().item()

    epoch_accuracy = correct_predictions / total_predictions
    return epoch_loss / len(train_loader), epoch_accuracy


def evaluate(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for src, tgt in val_loader:
            src, tgt = src.to(device), tgt.to(device)

            tgt_input = tgt[:, :-1].to(device)
            tgt_output = tgt[:, 1:].to(device)

            output = model(src, tgt_input)
            loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))
            val_loss += loss.item()
            _, predicted = output.max(dim=-1)
            mask = (tgt_output != 0)
            correct_predictions += ((predicted == tgt_output) & mask).sum().item()
            total_predictions += mask.sum().item()
    val_accuracy = correct_predictions / total_predictions
    return val_loss / len(val_loader), val_accuracy

def translate(model, input_sentence, max_len, sos_token, eos_token, device='cuda'):

    src_tokens = encode_sentences([input_sentence.split()], ar_vocab, max_len=max_len)[0]
    src_tensor = torch.tensor([src_tokens], dtype=torch.long).to(device)  # shape: [1, seq_len]
    model.eval()
    tgt = torch.tensor([[sos_token]], dtype=torch.long).to(device)  
    for index in range(max_len - 1):
        output = model(src_tensor, tgt)

        next_token = output[:, -1, :].argmax(dim=-1).unsqueeze(1)
        tgt = torch.cat([tgt, next_token], dim=1)

        if next_token.item() == eos_token:
            break

    translated_tokens = tgt.squeeze(0).cpu().numpy()  
    translated_sentence = ' '.join([' '.join([word for word, idx in ar_vocab.items() if idx == token]) for token in translated_tokens[1:] if token != en_vocab['<PAD>']])
    return translated_sentence



def train(model, train_loader, val_loader, num_epochs=10, lr=5e-5, batch_size=32, device='cuda', warmup_steps=1000,test_sentence='hello'):
    model.to(device)

    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-3)

    criterion = nn.CrossEntropyLoss(ignore_index=0, weight=extended_class_weights_tensor)  

    total_steps = len(train_loader) * num_epochs
    scheduler = get_linear_schedule_with_warmup(optimizer,
                                                num_warmup_steps=warmup_steps,  
                                                num_training_steps=total_steps)

    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")

        # Train for one epoch
        train_loss, train_accuracy = train_epoch(model, train_loader, optimizer, criterion, device)
        print(f"Training loss: {train_loss:.4f}, Training accuracy: {train_accuracy:.4f}")

        # Evaluate on validation set
        val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)
        print(f"Validation loss: {val_loss:.4f}, Validation accuracy: {val_accuracy:.4f}")

        translated_sentence = translate(model, test_sentence, max_len=max_len,sos_token=ar_vocab['<START>'], eos_token=ar_vocab['<END>'])
        print(f'input : {test_sentence}');
        print(f'translated : {translated_sentence}')
        scheduler.step()

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)

    return train_losses, val_losses, train_accuracies, val_accuracies


In [74]:
model = Transformer(vocab_size=en_vocab_size, tgt_vocab_size=ar_vocab_size, d_model=512, n_heads=8,
                    num_layers=3, d_ff=2048, dropout=0.4,max_seq_len=max_len)

In [75]:
train_losses ,val_losses, train_accuracies, val_accuracies = train(model=model,train_loader=train_loader,val_loader=val_loader,num_epochs=20,lr=3e-2,batch_size=batch_size)

Epoch 1/20
Training loss: 9.4330, Training accuracy: 0.0001
Validation loss: 9.4152, Validation accuracy: 0.0001
input : hello
translated : ملايين وفي غناك وعده سأشتاق لفتت والمنجل سأخفف لها طفولة عامي ألزمت البترول كثر نلتق راهبا تقولي دكتورا نادلا أنسى حضوري خريطة خريطة الغيوم الافلام اسألها النسيان أنتم المتحدة للميلاد القادمة لقراءة خلال تعشق
Epoch 2/20
Training loss: 5.9988, Training accuracy: 0.3026
Validation loss: 5.3478, Validation accuracy: 0.3634
input : hello
translated : هل توم أن أن <END>
Epoch 3/20
Training loss: 4.7311, Training accuracy: 0.4488
Validation loss: 4.1711, Validation accuracy: 0.5223
input : hello
translated : هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل هل
Epoch 4/20
Training loss: 3.9260, Training accuracy: 0.5455
Validation loss: 3.5292, Validation accuracy: 0.5950
input : hello
translated : كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم كم
Epoch 5/

In [None]:
def test(model, test_loader, device):
    total_predictions = 0
    correct_predictions = 0
    model.eval()
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    val_loss = 0.0
    with torch.no_grad():
        for src, tgt in test_loader:
            src, tgt = src.to(device), tgt.to(device)

            tgt_input = tgt[:, :-1].to(device)  
            tgt_output = tgt[:, 1:].to(device)  

            output = model(src, tgt_input)

            loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))

            val_loss += loss.item()

            _, predicted = output.max(dim=-1)              
            mask = (tgt_output != 0) 
            correct_predictions += ((predicted == tgt_output) & mask).sum().item() 
            total_predictions += mask.sum().item()
            val_accuracy = correct_predictions / total_predictions
    return val_loss / len(test_loader),val_accuracy
test_loss,val_accuracy = test(model, test_loader, device)
print(f"Test Loss: {test_loss:.4f} test accuracy : {val_accuracy:.4f}")

Test Loss: 1.3676 test accuracy : 0.8515
