In [None]:
!pip install datasets

Collecting datasets
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.2.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m26.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl 

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from nltk.translate.bleu_score import sentence_bleu
from datasets import load_dataset
from typing import List, Tuple

In [None]:
def preprocess_data(dataset, src_vocab, tgt_vocab, max_len, add_special_tokens=True):
    """Convert parallel sentences into (source, target) pairs of index tensors.

    Args:
        dataset: Iterable of examples shaped like
            ``{"translation": {"ar": <source text>, "en": <target text>}}``.
        src_vocab: Word -> index mapping for the source side; must contain ``'<unk>'``.
        tgt_vocab: Word -> index mapping for the target side; must contain ``'<unk>'``
            and, when ``add_special_tokens`` is true, ``'<sos>'`` and ``'<eos>'``.
        max_len: Maximum number of indices kept per sequence (framing tokens included).
        add_special_tokens: When true (default), every target sequence is framed as
            ``<sos> ... <eos>``. Pass ``False`` to get the bare word indices.

    Returns:
        A list of ``(src_tensor, tgt_tensor)`` tuples of dtype ``torch.long``.
    """
    src_unk = src_vocab['<unk>']
    tgt_unk = tgt_vocab['<unk>']

    # Reserve two target slots for <sos>/<eos> so the framed sequence still fits in max_len.
    tgt_word_budget = max(max_len - 2, 0) if add_special_tokens else max_len

    data = []
    for example in dataset:
        pair = example['translation']
        # Whitespace tokenisation; out-of-vocabulary words map to <unk>.
        src_indices = [src_vocab.get(word, src_unk) for word in pair['ar'].split()][:max_len]
        tgt_indices = [tgt_vocab.get(word, tgt_unk) for word in pair['en'].split()][:tgt_word_budget]

        if add_special_tokens:
            # Without this framing, shifting the target by one position for teacher forcing
            # never shows the decoder a <sos> start symbol and never asks it to predict <eos>.
            tgt_indices = [tgt_vocab['<sos>'], *tgt_indices, tgt_vocab['<eos>']]

        data.append((
            torch.tensor(src_indices, dtype=torch.long),
            torch.tensor(tgt_indices, dtype=torch.long),
        ))
    return data

In [None]:
def build_vocab(sentences: List[str], max_vocab_size: int = 80000) -> dict:
    """Build a word -> index vocabulary from whitespace-tokenised sentences.

    Indices 0-3 are reserved for ``<pad>``, ``<sos>``, ``<eos>`` and ``<unk>``. The
    remaining slots are filled with the most frequent words, most frequent first;
    words with equal counts keep their first-seen order.

    Args:
        sentences: Sentences to count words from.
        max_vocab_size: Upper bound on the vocabulary size, special tokens included.
            Values smaller than the number of special tokens yield only the specials.

    Returns:
        A dict mapping each token to a unique, contiguous integer index.
    """
    vocab = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}

    word_freq = {}
    for sentence in sentences:
        for word in sentence.split():
            word_freq[word] = word_freq.get(word, 0) + 1

    # A corpus word spelled like a special token must not overwrite its reserved index.
    ranked_words = [
        word
        for word, _ in sorted(word_freq.items(), key=lambda item: item[1], reverse=True)
        if word not in vocab
    ]

    # Clamp at zero: a negative slice bound would drop words from the tail of the list
    # instead of admitting none.
    free_slots = max(max_vocab_size - len(vocab), 0)
    for word in ranked_words[:free_slots]:
        vocab[word] = len(vocab)
    return vocab


In [None]:

class Transformer(nn.Module):
    """Encoder-decoder Transformer over token indices with fixed sinusoidal positions.

    Args:
        src_vocab_size: Number of rows in the source embedding table.
        tgt_vocab_size: Number of rows in the target embedding table and width of the output logits.
        embed_size: Embedding / model dimension.
        num_heads: Attention heads per layer.
        num_layers: Number of stacked encoder layers and of stacked decoder layers.
        ff_hidden_dim: Hidden width of each feed-forward sub-layer.
        max_len: Number of positions in the positional-encoding table; longer inputs are not supported.

    NOTE(review): padding positions are not masked in attention; only the decoder's
    causal mask is applied.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, embed_size, num_heads, num_layers, ff_hidden_dim, max_len):
        super(Transformer, self).__init__()
        self.src_embedding = nn.Embedding(src_vocab_size, embed_size)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, embed_size)
        # Registered as a buffer so `.to(device)` moves it together with the parameters;
        # non-persistent so the state_dict layout is the same as with a plain attribute.
        self.register_buffer(
            "positional_encoding",
            self.create_positional_encoding(max_len, embed_size),
            persistent=False,
        )

        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_size, nhead=num_heads, dim_feedforward=ff_hidden_dim)
        decoder_layer = nn.TransformerDecoderLayer(d_model=embed_size, nhead=num_heads, dim_feedforward=ff_hidden_dim)

        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers)

        self.fc_out = nn.Linear(embed_size, tgt_vocab_size)

    def create_positional_encoding(self, max_len, embed_size):
        """Return a ``(1, max_len, embed_size)`` tensor of sinusoidal position encodings."""
        pe = torch.zeros(max_len, embed_size)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * -(np.log(10000.0) / embed_size))
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd embed_size there is one fewer cosine column than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: embed_size // 2])
        return pe.unsqueeze(0)

    def forward(self, src, tgt):
        """Compute next-token logits.

        Args:
            src: Source token indices, batch first (``size(1)`` is the sequence length).
            tgt: Target-side decoder input indices, batch first.

        Returns:
            Logits with one ``tgt_vocab_size``-wide row per target position, batch first.
        """
        src = self.src_embedding(src) + self.positional_encoding[:, :src.size(1), :]
        tgt = self.tgt_embedding(tgt) + self.positional_encoding[:, :tgt.size(1), :]

        # Causal mask (True = blocked): position i may only attend to positions <= i.
        # Without it the decoder can read the very token it is asked to predict.
        tgt_len = tgt.size(1)
        causal_mask = torch.triu(
            torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=tgt.device),
            diagonal=1,
        )

        # The stock layers default to sequence-first layout, hence the permutes.
        memory = self.encoder(src.permute(1, 0, 2))
        output = self.decoder(tgt.permute(1, 0, 2), memory, tgt_mask=causal_mask)
        return self.fc_out(output.permute(1, 0, 2))



In [None]:

def collate_fn(batch):
    """Stack variable-length (source, target) tensor pairs into two padded batches.

    Each side is right-padded with index 0 up to the longest sequence on that side,
    and the result is laid out batch first.

    Args:
        batch: Sequence of ``(src_tensor, tgt_tensor)`` pairs.

    Returns:
        ``(src_padded, tgt_padded)`` tensors.
    """
    def _pad(sequences):
        # Right-pad with 0 and put the batch dimension first.
        return torch.nn.utils.rnn.pad_sequence(sequences, batch_first=True, padding_value=0)

    sources, targets = zip(*batch)
    return _pad(sources), _pad(targets)

def train_model(model, data_loader, optimizer, criterion, num_epochs, device):
    """Train ``model`` with teacher forcing and print the mean batch loss per epoch.

    For every batch the target is split into a decoder input (all but the last
    position) and the expected output (all but the first position).

    Args:
        model: Callable as ``model(src, tgt_input)`` returning per-position logits.
        data_loader: Iterable of ``(src, tgt)`` batches; must support ``len()``.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss taking flattened logits and flattened target indices.
        num_epochs: Number of passes over ``data_loader``.
        device: Device the batches are moved to before the forward pass.
    """
    model.train()
    for epoch in range(num_epochs):
        batch_losses = []
        for batch_src, batch_tgt in data_loader:
            batch_src = batch_src.to(device)
            batch_tgt = batch_tgt.to(device)
            # Shift by one: the decoder sees positions [0, T-1) and must predict [1, T).
            decoder_input, expected = batch_tgt[:, :-1], batch_tgt[:, 1:]

            optimizer.zero_grad()
            logits = model(batch_src, decoder_input)
            flat_logits = logits.reshape(-1, logits.size(-1))
            loss = criterion(flat_logits, expected.reshape(-1))
            loss.backward()
            optimizer.step()
            batch_losses.append(loss.item())

        total_loss = sum(batch_losses)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(data_loader):.4f}")

In [None]:

def evaluate_model(model, data_loader, src_vocab, tgt_vocab, device):
    """Decode the model's per-position predictions for every example in ``data_loader``.

    The decoder is fed the reference target (all but its last position), so each
    returned "translation" is the argmax next-token prediction given the reference
    prefix, not a free-running decode.

    Args:
        model: Callable as ``model(src, tgt_input)`` returning per-position logits.
        data_loader: Iterable of ``(src, tgt)`` index batches, batch first.
        src_vocab: Word -> index mapping used to decode the source side.
        tgt_vocab: Word -> index mapping used to decode the predictions.
        device: Device the batches are moved to.

    Returns:
        List of ``(source_sentence, predicted_sentence)`` string pairs; index 0
        (padding) is dropped from both sides.

    Raises:
        KeyError: If an index has no entry in the corresponding vocabulary.
    """
    def _invert(vocab):
        # Build index -> word once instead of scanning the vocabulary for every token.
        # setdefault keeps the first word seen for an index.
        index_to_word = {}
        for word, index in vocab.items():
            index_to_word.setdefault(index, word)
        return index_to_word

    src_words = _invert(src_vocab)
    tgt_words = _invert(tgt_vocab)

    model.eval()
    translations = []
    with torch.no_grad():
        for src, tgt in data_loader:
            src = src.to(device)
            tgt = tgt.to(device)
            output = model(src, tgt[:, :-1])
            predicted = torch.argmax(output, dim=-1)
            # .tolist() yields plain ints, so the lookups below are ordinary dict hits.
            for src_row, predicted_row in zip(src.tolist(), predicted.tolist()):
                src_sentence = ' '.join(src_words[idx] for idx in src_row if idx != 0)
                tgt_sentence = ' '.join(tgt_words[idx] for idx in predicted_row if idx != 0)
                translations.append((src_sentence, tgt_sentence))
    return translations

In [None]:
def load_data_from_file(file_path: str) -> Tuple[List[str], List[str]]:
    """Read tab-separated sentence pairs from a UTF-8 text file.

    Each line is stripped of surrounding whitespace and split on tabs. Only lines
    that yield exactly two fields are kept; all other lines are skipped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        ``(first_column, second_column)`` as two lists of equal length, in file order.
    """
    pairs = []
    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            fields = raw_line.strip().split("\t")
            if len(fields) != 2:
                continue  # blank, malformed, or carrying extra columns
            pairs.append(fields)

    first_column = [fields[0] for fields in pairs]
    second_column = [fields[1] for fields in pairs]
    return first_column, second_column

In [None]:
# Read the tab-separated sentence pairs (first column is used as the 'ar' side,
# second column as the 'en' side).
file_path = "ara_.txt"
src_sentences, tgt_sentences = load_data_from_file(file_path)

# 70/30 split with a fixed seed so the partition is reproducible.
(
    src_sentences_train,
    src_sentences_test,
    tgt_sentences_train,
    tgt_sentences_test,
) = train_test_split(src_sentences, tgt_sentences, test_size=0.3, random_state=42)

# Vocabularies come from the training split only; unseen test words fall back to <unk>.
src_vocab = build_vocab(src_sentences_train)
tgt_vocab = build_vocab(tgt_sentences_train)

# Wrap each pair in the {"translation": {...}} layout preprocess_data expects,
# then index both splits (train first, then test).
train_data, test_data = (
    preprocess_data(
        [{"translation": {"ar": ar_text, "en": en_text}} for ar_text, en_text in zip(split_src, split_tgt)],
        src_vocab, tgt_vocab, max_len=50
    )
    for split_src, split_tgt in (
        (src_sentences_train, tgt_sentences_train),
        (src_sentences_test, tgt_sentences_test),
    )
)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Batches of 32 padded by collate_fn; only the training loader shuffles.
train_loader = DataLoader(dataset=train_data, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(dataset=test_data, batch_size=32, collate_fn=collate_fn)

# Embedding tables are sized from the vocabularies; max_len matches the
# truncation length used when the data was indexed.
model = Transformer(
    len(src_vocab),
    len(tgt_vocab),
    embed_size=256,
    num_heads=8,
    num_layers=4,
    ff_hidden_dim=512,
    max_len=50,
)
model = model.to(device)






In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# The loss is computed over target-side token indices, so the padding index to ignore
# must come from the target vocabulary (the original read it from src_vocab).
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab['<pad>'])

train_model(model, train_loader, optimizer, criterion, num_epochs=100, device=device)

# Show the first ten (source, prediction) pairs from the held-out split.
translations = evaluate_model(model, test_loader, src_vocab, tgt_vocab, device)
for src, tgt in translations[:10]:
    print(f"Source: {src}\nTranslated: {tgt}\n")

Epoch 1/100, Loss: 8.5642
Epoch 2/100, Loss: 7.6232
Epoch 3/100, Loss: 7.0124
Epoch 4/100, Loss: 6.4251
Epoch 5/100, Loss: 5.8630
Epoch 6/100, Loss: 5.3460
Epoch 7/100, Loss: 4.8551
Epoch 8/100, Loss: 4.3976
Epoch 9/100, Loss: 3.9607
Epoch 10/100, Loss: 3.5587
Epoch 11/100, Loss: 3.1644
Epoch 12/100, Loss: 2.8117
Epoch 13/100, Loss: 2.4601
Epoch 14/100, Loss: 2.1402
Epoch 15/100, Loss: 1.8425
Epoch 16/100, Loss: 1.5678
Epoch 17/100, Loss: 1.3108
Epoch 18/100, Loss: 1.0866
Epoch 19/100, Loss: 0.8791
Epoch 20/100, Loss: 0.7023
Epoch 21/100, Loss: 0.5520
Epoch 22/100, Loss: 0.4267
Epoch 23/100, Loss: 0.3302
Epoch 24/100, Loss: 0.2566
Epoch 25/100, Loss: 0.2012
Epoch 26/100, Loss: 0.1590
Epoch 27/100, Loss: 0.1284
Epoch 28/100, Loss: 0.1065
Epoch 29/100, Loss: 0.0907
Epoch 30/100, Loss: 0.0753
Epoch 31/100, Loss: 0.0693
Epoch 32/100, Loss: 0.0549
Epoch 33/100, Loss: 0.0510
Epoch 34/100, Loss: 0.0394
Epoch 35/100, Loss: 0.0364
Epoch 36/100, Loss: 0.0314
Epoch 37/100, Loss: 0.0330
Epoch 38/1

In [None]:
def translate_sentence(model, sentence: str, src_vocab: dict, tgt_vocab: dict, device: torch.device, max_len: int = 50000) -> str:
    """Greedily decode a translation for one whitespace-tokenised sentence.

    Decoding starts from ``<sos>`` and appends the argmax token one step at a time.
    It stops on ``<eos>``, when the same token is predicted twice in a row, or after
    ``max_len`` steps. If ``model`` exposes a 3-D ``positional_encoding`` tensor, both
    the source length and the step count are capped at its second dimension, so the
    decoder is never fed more positions than that table holds.

    Args:
        model: ``nn.Module`` callable as ``model(src, tgt)`` returning batch-first logits.
        sentence: Source sentence; unknown words map to ``<unk>``.
        src_vocab: Source word -> index mapping containing ``'<unk>'``.
        tgt_vocab: Target word -> index mapping containing ``'<sos>'`` and ``'<eos>'``.
        device: Device on which the input tensors are created.
        max_len: Upper bound on the number of decoding steps.

    Returns:
        The decoded words joined by single spaces; ``""`` for an empty input sentence.
    """
    src_indices = [src_vocab.get(word, src_vocab['<unk>']) for word in sentence.split()]
    if not src_indices:
        # Nothing to encode; avoid feeding a zero-length sequence to the model.
        return ""

    positional = getattr(model, "positional_encoding", None)
    if torch.is_tensor(positional) and positional.dim() == 3:
        capacity = positional.size(1)
        src_indices = src_indices[:capacity]
        max_len = min(max_len, capacity)

    # Build index -> word once; setdefault keeps the first word seen for an index.
    index_to_word = {}
    for word, index in tgt_vocab.items():
        index_to_word.setdefault(index, word)

    sos_idx = tgt_vocab['<sos>']
    eos_idx = tgt_vocab['<eos>']
    src_tensor = torch.tensor(src_indices, dtype=torch.long).unsqueeze(0).to(device)
    tgt_indices = [sos_idx]

    # Decode in eval mode (dropout off) without autograd, then restore the caller's mode.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(max_len):
                tgt_tensor = torch.tensor(tgt_indices, dtype=torch.long).unsqueeze(0).to(device)
                output = model(src_tensor, tgt_tensor)
                next_word_idx = torch.argmax(output[0, -1, :]).item()

                # Stop on <eos>, or when the model starts repeating the previous token.
                if next_word_idx == eos_idx or (len(tgt_indices) > 1 and next_word_idx == tgt_indices[-1]):
                    break

                tgt_indices.append(next_word_idx)
    finally:
        model.train(was_training)

    translated_sentence = ' '.join(index_to_word[idx] for idx in tgt_indices[1:] if idx != eos_idx)
    return translated_sentence


In [None]:
# Translate one sample sentence with the trained model and print input and output.
input_sentence = "لديه سيارته الخاصة"
translated_sentence = translate_sentence(
    model=model,
    sentence=input_sentence,
    src_vocab=src_vocab,
    tgt_vocab=tgt_vocab,
    device=device,
)
print(f"Input: {input_sentence}", f"Translated: {translated_sentence}", sep="\n")
