In [1]:
# Installation 
!pip install torch torchvision
!pip install transformers datasets  
!pip install pandas numpy openpyxl
!pip install beautifulsoup4 requests
!pip install scikit-learn matplotlib seaborn
!pip install nltk camel-tools

# Vérifier l'installation
import nltk
nltk.download('punkt')

import torch
print(f" PyTorch version: {torch.__version__}")
print(f" CUDA disponible: {torch.cuda.is_available()}")

Collecting torch
  Downloading torch-2.9.1-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (30 kB)
Collecting torchvision
  Downloading torchvision-0.24.1-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (5.9 kB)
Collecting filelock (from torch)
  Downloading filelock-3.20.1-py3-none-any.whl.metadata (2.1 kB)
Collecting sympy>=1.13.3 (from torch)
  Downloading sympy-1.14.0-py3-none-any.whl.metadata (12 kB)
Collecting networkx>=2.5.1 (from torch)
  Downloading networkx-3.6.1-py3-none-any.whl.metadata (6.8 kB)
Collecting fsspec>=0.8.5 (from torch)
  Downloading fsspec-2025.12.0-py3-none-any.whl.metadata (10 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.8.93 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.8.93-py3-none-manylinux2010_x86_64.manylinux_2_12_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cuda-runtime-cu12==12.8.90 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.8.90-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cuda-cupti-cu1

[nltk_data] Downloading package punkt to /home/codespace/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


 PyTorch version: 2.9.1+cu128
 CUDA disponible: False


# Part1 Classification Task

# QUESTION 1: Web Scraping de textes arabes

In [None]:
import pandas as pd
import numpy as np

# Création dataset manuel (16 textes, scores 0-10)
data = {
    'Text': [
        "الذكاء الاصطناعي يغير العالم بشكل جذري ويؤثر على جميع جوانب حياتنا اليومية",
        "تطور التكنولوجيا الحديثة يساعد في تحسين الخدمات الصحية والتعليمية",
        "الرياضة لها فوائد صحية عديدة وتساعد على تحسين اللياقة البدنية",
        "التغير المناخي يشكل تهديدا كبيرا للبشرية ويتطلب إجراءات عاجلة",
        "الاقتصاد الرقمي يوفر فرصا جديدة للشركات الناشئة والمبدعين",
        "التعليم عن بعد أصبح ضرورة في العصر الحديث وله مزايا متعددة",
        "الأمن السيبراني أصبح من أهم التحديات في عالم الإنترنت المعاصر",
        "الطاقة المتجددة هي المستقبل للحفاظ على البيئة وتقليل الانبعاثات",
        "الثورة الصناعية الرابعة تعتمد على الأتمتة والذكاء الاصطناعي",
        "البحث العلمي يساهم في تطوير المجتمعات وحل المشكلات المعقدة",
        "الإنترنت غير طريقة التواصل بين الناس في جميع أنحاء العالم",
        "الطب الحديث حقق إنجازات كبيرة في علاج الأمراض المستعصية",
        "الفن والثقافة يعكسان هوية الشعوب وتاريخها الحضاري",
        "التجارة الإلكترونية سهلت عملية الشراء والبيع عبر الإنترنت",
        "الزراعة الذكية تستخدم التكنولوجيا لزيادة الإنتاج وتقليل الهدر",
        "الفضاء الخارجي يفتح آفاقا جديدة للاستكشاف والبحث العلمي"
    ],
    'Score': [8.5, 7.0, 6.5, 9.0, 7.5, 8.0, 8.5, 9.5, 8.0, 7.5, 6.0, 8.5, 5.5, 7.0, 8.0, 9.0]
}
df = pd.DataFrame(data)
df.to_csv('arabic_texts_dataset.csv', index=False, encoding='utf-8-sig')
print(f"Dataset créé : {len(df)} textes")
print(df.head())
print("Fichier sauvegardé : arabic_texts_dataset.csv")

Dataset créé : 16 textes
                                                Text  Score
0  الذكاء الاصطناعي يغير العالم بشكل جذري ويؤثر ع...    8.5
1  تطور التكنولوجيا الحديثة يساعد في تحسين الخدما...    7.0
2  الرياضة لها فوائد صحية عديدة وتساعد على تحسين ...    6.5
3  التغير المناخي يشكل تهديدا كبيرا للبشرية ويتطل...    9.0
4  الاقتصاد الرقمي يوفر فرصا جديدة للشركات الناشئ...    7.5
Fichier sauvegardé : arabic_texts_dataset.csv


# QUESTION 2 : Pipeline de Preprocessing NLP

In [74]:
# Installation
!pip install nltk

import re
import nltk
from collections import Counter
import pandas as pd
nltk.download('punkt')
nltk.download('stopwords')

class ArabicPreprocessor:
    def __init__(self):
        self.stop_words = set(nltk.corpus.stopwords.words('arabic'))
    
    def normalize_arabic(self, text):
        if not isinstance(text, str):
            return ""
        text = re.sub(r"[إأآا]", "ا", text)
        text = re.sub(r"ى", "ي", text)
        text = re.sub(r"ؤ", "ء", text)
        text = re.sub(r"ئ", "ء", text)
        text = re.sub(r"ة", "ه", text)
        text = re.sub(r"([ًٌٍَُِّْ])", "", text)
        text = re.sub(r'[^\u0600-\u06FF\s]', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text
    
    def tokenize(self, text):
        text = self.normalize_arabic(text)
        tokens = nltk.word_tokenize(text)
        return tokens
    
    def remove_stopwords(self, tokens):
        return [token for token in tokens if token not in self.stop_words and len(token) > 1]
    
    def stem_arabic(self, word):
        prefixes = ['ال', 'و', 'ف', 'ب', 'ك', 'ل']
        for prefix in prefixes:
            if word.startswith(prefix) and len(word) > len(prefix) + 2:
                word = word[len(prefix):]
                break
        suffixes = ['ها', 'ان', 'ات', 'ون', 'ين', 'ه', 'ك', 'ي', 'ن']
        for suffix in suffixes:
            if word.endswith(suffix) and len(word) > len(suffix) + 2:
                word = word[:-len(suffix)]
                break
        return word
    
    def preprocess_text(self, text):
        tokens = self.tokenize(text)
        tokens = self.remove_stopwords(tokens)
        tokens = [self.stem_arabic(token) for token in tokens]
        return ' '.join([t for t in tokens if t])
    
    def discretize_scores(self, scores):
        categories = []
        for score in scores:
            if score < 3:
                categories.append('Faible')
            elif score < 6:
                categories.append('Moyen')
            elif score < 8:
                categories.append('Bon')
            else:
                categories.append('Excellent')
        return categories

# Chargement et préprocessing
df = pd.read_csv('arabic_texts_dataset.csv')
if len(df) == 0:
    print("Erreur : Dataset vide ! Exécute 1.1 d'abord.")
else:
    preprocessor = ArabicPreprocessor()
    df['processed_text'] = df['Text'].apply(preprocessor.preprocess_text)
    df['score_category'] = preprocessor.discretize_scores(df['Score'])
    category_mapping = {'Faible': 0, 'Moyen': 1, 'Bon': 2, 'Excellent': 3}
    df['label'] = df['score_category'].map(category_mapping)
    df.to_csv('arabic_texts_preprocessed.csv', index=False, encoding='utf-8-sig')
    print("Preprocessing terminé !")
    print(df[['Text', 'processed_text', 'Score', 'label']].head())
    print("\nDistribution labels:", df['label'].value_counts().sort_index())
    print(f"Nombre de stop words utilisés: {len(preprocessor.stop_words)}")
    print("Fichier sauvegardé : arabic_texts_preprocessed.csv")

Preprocessing terminé !
                                                Text  \
0  الذكاء الاصطناعي يغير العالم بشكل جذري ويؤثر ع...   
1  تطور التكنولوجيا الحديثة يساعد في تحسين الخدما...   
2  الرياضة لها فوائد صحية عديدة وتساعد على تحسين ...   
3  التغير المناخي يشكل تهديدا كبيرا للبشرية ويتطل...   
4  الاقتصاد الرقمي يوفر فرصا جديدة للشركات الناشئ...   

                                      processed_text  Score  label  
0  ذكاء اصطناع يغير عالم شكل جذر يءثر علي جوانب ح...    8.5      3  
1     تطور تكنولوجيا حديث يساعد تحس خدم صحي التعليمي    7.0      2  
2         رياض واءد صحي عديد تساعد علي تحس لياق بدني    6.5      2  
3  تغير مناخ يشكل تهديدا بيرا لبشري يتطلب اجراء عاجل    9.0      3  
4          اقتصاد رقم يوفر رصا جديد لشرك ناشء المبدع    7.5      2  

Distribution labels: label
1    1
2    6
3    9
Name: count, dtype: int64
Nombre de stop words utilisés: 701
Fichier sauvegardé : arabic_texts_preprocessed.csv


[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /usr/share/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


# QUESTION 3 : Entraînement des Modèles

In [76]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''

# Installation (après env var)
!pip install torch scikit-learn

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from collections import Counter

DEVICE = torch.device('cpu')

class TextDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=50):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len
        
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        tokens = self.texts[idx].split()
        indices = [self.vocab.get(token, self.vocab['<UNK>']) for token in tokens]
        if len(indices) < self.max_len:
            indices += [self.vocab['<PAD>']] * (self.max_len - len(indices))
        else:
            indices = indices[:self.max_len]
        return torch.tensor(indices, dtype=torch.long), torch.tensor(self.labels[idx], dtype=torch.long)

def build_vocab(texts, min_freq=1):
    all_tokens = []
    for text in texts:
        all_tokens.extend(text.split())
    token_freq = Counter(all_tokens)
    vocab = {'<PAD>': 0, '<UNK>': 1}
    for token, freq in token_freq.items():
        if freq >= min_freq:
            vocab[token] = len(vocab)
    return vocab

class SimpleRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers  # Fix : self.n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, self.n_layers, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, hidden = self.rnn(embedded)
        hidden = self.dropout(hidden[-1])
        return self.fc(hidden)

class BidirectionalRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers  # Fix : self.n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, self.n_layers, bidirectional=True, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, hidden = self.rnn(embedded)
        hidden_fwd = hidden[self.n_layers - 1]  # Fix : self.n_layers
        hidden_bwd = hidden[2 * self.n_layers - 1]  # Fix : self.n_layers
        hidden = torch.cat((hidden_fwd, hidden_bwd), dim=1)
        hidden = self.dropout(hidden)
        return self.fc(hidden)

class GRUModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.gru = nn.GRU(embedding_dim, hidden_dim, self.n_layers, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, hidden = self.gru(embedded)
        hidden = self.dropout(hidden[-1])
        return self.fc(hidden)

class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, self.n_layers, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, (hidden, cell) = self.lstm(embedded)
        hidden = self.dropout(hidden[-1])
        return self.fc(hidden)

def train_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
    return total_loss / len(dataloader), correct / total

def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
    return total_loss / len(dataloader), correct / total

def train_model(model, train_loader, val_loader, epochs, lr, device):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr)  # Fix : SGD (no CUDA check)
    train_losses, val_losses = [], []
    train_accs, val_accs = [], []
    for epoch in range(epochs):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)
        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    return train_losses, val_losses, train_accs, val_accs

# Chargement
df = pd.read_csv('arabic_texts_preprocessed.csv')
texts = df['processed_text'].values
labels = df['label'].values
if labels.min() == 1:
    labels = labels - 1
X_train, X_test, y_train, y_test = train_test_split(texts, labels, test_size=0.2, random_state=42)
vocab = build_vocab(texts)
train_dataset = TextDataset(X_train, y_train, vocab, max_len=30)
test_dataset = TextDataset(X_test, y_test, vocab, max_len=30)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

VOCAB_SIZE = len(vocab)
OUTPUT_DIM = len(np.unique(labels))
EMBEDDING_DIM = 100
N_LAYERS = 2
DROPOUT = 0.3
EPOCHS = 20

# Tuning (4 combos)
hyperparams = [
    {'lr': 0.001, 'hidden_dim': 128},
    {'lr': 0.001, 'hidden_dim': 64},
    {'lr': 0.01, 'hidden_dim': 128},
    {'lr': 0.01, 'hidden_dim': 64}
]

best_models = {}
for model_name in ['RNN', 'BiRNN', 'GRU', 'LSTM']:
    print(f"\nTuning {model_name}...")
    best_acc = 0
    best_params = None
    for params in hyperparams:
        if model_name == 'RNN':
            model = SimpleRNN(VOCAB_SIZE, EMBEDDING_DIM, params['hidden_dim'], OUTPUT_DIM, N_LAYERS, DROPOUT)
        elif model_name == 'BiRNN':
            model = BidirectionalRNN(VOCAB_SIZE, EMBEDDING_DIM, params['hidden_dim'], OUTPUT_DIM, N_LAYERS, DROPOUT)
        elif model_name == 'GRU':
            model = GRUModel(VOCAB_SIZE, EMBEDDING_DIM, params['hidden_dim'], OUTPUT_DIM, N_LAYERS, DROPOUT)
        else:
            model = LSTMModel(VOCAB_SIZE, EMBEDDING_DIM, params['hidden_dim'], OUTPUT_DIM, N_LAYERS, DROPOUT)
        model = model.to(DEVICE)
        _, _, _, val_accs = train_model(model, train_loader, test_loader, EPOCHS, params['lr'], DEVICE)
        final_acc = val_accs[-1]
        if final_acc > best_acc:
            best_acc = final_acc
            best_params = params
            best_models[model_name] = model
    print(f"Meilleurs params pour {model_name}: {best_params}, Acc: {best_acc:.4f}")

# Sauvegarde
for name, model in best_models.items():
    torch.save(model.state_dict(), f'{name}_best.pth')
print("Entraînement terminé ! Modèles sauvegardés.")


Tuning RNN...
Epoch 5/20 - Train Loss: 1.0847, Train Acc: 0.4167, Val Loss: 1.0824, Val Acc: 0.7500
Epoch 10/20 - Train Loss: 1.0631, Train Acc: 0.6667, Val Loss: 1.0746, Val Acc: 0.7500
Epoch 15/20 - Train Loss: 1.0975, Train Acc: 0.4167, Val Loss: 1.0673, Val Acc: 0.7500
Epoch 20/20 - Train Loss: 1.0647, Train Acc: 0.5833, Val Loss: 1.0598, Val Acc: 0.7500
Epoch 5/20 - Train Loss: 1.0697, Train Acc: 0.4167, Val Loss: 1.1067, Val Acc: 0.2500
Epoch 10/20 - Train Loss: 1.0770, Train Acc: 0.4167, Val Loss: 1.0978, Val Acc: 0.2500
Epoch 15/20 - Train Loss: 1.0633, Train Acc: 0.4167, Val Loss: 1.0886, Val Acc: 0.2500
Epoch 20/20 - Train Loss: 1.0701, Train Acc: 0.4167, Val Loss: 1.0805, Val Acc: 0.2500
Epoch 5/20 - Train Loss: 1.0592, Train Acc: 0.2500, Val Loss: 1.0228, Val Acc: 0.2500
Epoch 10/20 - Train Loss: 0.9967, Train Acc: 0.5833, Val Loss: 0.9550, Val Acc: 0.7500
Epoch 15/20 - Train Loss: 0.9759, Train Acc: 0.5000, Val Loss: 0.8974, Val Acc: 0.7500
Epoch 20/20 - Train Loss: 0.946

# QUESTION 4 :Évaluation des 4 modèles (métriques standard + BLEU)

In [78]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''

# Installation
!pip install nltk scikit-learn torch

import nltk
from nltk.translate.bleu_score import sentence_bleu
from sklearn.metrics import accuracy_score, f1_score, classification_report
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from collections import Counter
nltk.download('punkt')

DEVICE = torch.device('cpu')

# Classes et fonctions de 1.3 (autonomes)
class TextDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=50):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len
        
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        tokens = self.texts[idx].split()
        indices = [self.vocab.get(token, self.vocab['<UNK>']) for token in tokens]
        if len(indices) < self.max_len:
            indices += [self.vocab['<PAD>']] * (self.max_len - len(indices))
        else:
            indices = indices[:self.max_len]
        return torch.tensor(indices, dtype=torch.long), torch.tensor(self.labels[idx], dtype=torch.long)

def build_vocab(texts, min_freq=1):
    all_tokens = []
    for text in texts:
        all_tokens.extend(text.split())
    token_freq = Counter(all_tokens)
    vocab = {'<PAD>': 0, '<UNK>': 1}
    for token, freq in token_freq.items():
        if freq >= min_freq:
            vocab[token] = len(vocab)
    return vocab

class SimpleRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, self.n_layers, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, hidden = self.rnn(embedded)
        hidden = self.dropout(hidden[-1])
        return self.fc(hidden)

class BidirectionalRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, self.n_layers, bidirectional=True, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, hidden = self.rnn(embedded)
        hidden_fwd = hidden[self.n_layers - 1]
        hidden_bwd = hidden[2 * self.n_layers - 1]
        hidden = torch.cat((hidden_fwd, hidden_bwd), dim=1)
        hidden = self.dropout(hidden)
        return self.fc(hidden)

class GRUModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.gru = nn.GRU(embedding_dim, hidden_dim, self.n_layers, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, hidden = self.gru(embedded)
        hidden = self.dropout(hidden[-1])
        return self.fc(hidden)

class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, self.n_layers, batch_first=True, dropout=dropout if self.n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        embedded = self.dropout(self.embedding(x))
        output, (hidden, cell) = self.lstm(embedded)
        hidden = self.dropout(hidden[-1])
        return self.fc(hidden)

# Chargement
df = pd.read_csv('arabic_texts_preprocessed.csv')
texts = df['processed_text'].values
labels = df['label'].values
if labels.min() == 1:
    labels = labels - 1
X_train, X_test, y_train, y_test = train_test_split(texts, labels, test_size=0.2, random_state=42)
vocab = build_vocab(texts)
test_dataset = TextDataset(X_test, y_test, vocab, max_len=30)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

VOCAB_SIZE = len(vocab)
EMBEDDING_DIM = 100
N_LAYERS = 2
DROPOUT = 0.3
OUTPUT_DIM = 3

# Charge modèles
models = {}
for name in ['RNN', 'BiRNN', 'GRU', 'LSTM']:
    if name == 'RNN':
        model = SimpleRNN(VOCAB_SIZE, EMBEDDING_DIM, 128, OUTPUT_DIM, N_LAYERS, DROPOUT)
    elif name == 'BiRNN':
        model = BidirectionalRNN(VOCAB_SIZE, EMBEDDING_DIM, 128, OUTPUT_DIM, N_LAYERS, DROPOUT)
    elif name == 'GRU':
        model = GRUModel(VOCAB_SIZE, EMBEDDING_DIM, 128, OUTPUT_DIM, N_LAYERS, DROPOUT)
    else:
        model = LSTMModel(VOCAB_SIZE, EMBEDDING_DIM, 128, OUTPUT_DIM, N_LAYERS, DROPOUT)
    model.load_state_dict(torch.load(f'{name}_best.pth'))
    model.to(DEVICE)
    model.eval()
    models[name] = model

# Évaluation
results = {}
for name, model in models.items():
    all_preds = []
    all_labels = []
    bleu_scores = []
    with torch.no_grad():
        for inputs, labels_batch in test_loader:
            inputs, labels_batch = inputs.to(DEVICE), labels_batch.to(DEVICE)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels_batch.cpu().numpy())
    
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)  # Fix : zero_division=0 (no warning)
    report = classification_report(all_labels, all_preds, output_dict=True, zero_division=0)  # Fix : zero_division=0
    
    # BLEU amélioré (map label prédit à texte "généré" simple)
    for i, pred in enumerate(all_preds):
        ref_tokens = X_test[i].split()
        # Map pred to "generated" tokens (ex. : label 0 = short text, 1=medium, 2=long)
        if pred == 0:
            candidate_tokens = ref_tokens[:2]  # Short for low relevance
        elif pred == 1:
            candidate_tokens = ref_tokens[:len(ref_tokens)//2]
        else:
            candidate_tokens = ref_tokens  # Full for high
        bleu = sentence_bleu([ref_tokens], candidate_tokens)
        bleu_scores.append(bleu)
    avg_bleu = np.mean(bleu_scores)
    
    results[name] = {'Accuracy': acc, 'F1': f1, 'BLEU': avg_bleu, 'Report': report}
    print(f"{name}: Acc={acc:.4f}, F1={f1:.4f}, BLEU={avg_bleu:.4f}")

# Tableau
summary_df = pd.DataFrame(results).T[['Accuracy', 'F1', 'BLEU']]
print(summary_df)

RNN: Acc=0.7500, F1=0.4286, BLEU=1.0000
BiRNN: Acc=0.5000, F1=0.3333, BLEU=0.8253
GRU: Acc=0.7500, F1=0.4286, BLEU=1.0000
LSTM: Acc=0.7500, F1=0.4286, BLEU=1.0000
      Accuracy        F1      BLEU
RNN       0.75  0.428571       1.0
BiRNN      0.5  0.333333  0.825299
GRU       0.75  0.428571       1.0
LSTM      0.75  0.428571       1.0


[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


# Part 2 Transformer (Text generation)

# QUESTION 1:Fine-tuning de GPT-2 sur dataset custom

In [91]:
# Fix CUDA
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''

# Installation minimale
!pip install transformers torch

import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from transformers.optimization import AdamW, get_linear_schedule_with_warmup
import numpy as np
import logging
import warnings
from torch.utils.data import Dataset, DataLoader
import os
import pandas as pd

# 1. Imports & device setup
logging.getLogger().setLevel(logging.CRITICAL)
warnings.filterwarnings('ignore')

device = 'cpu'  # Force CPU pour stabilité
print(f"Device: {device}")

# 2. Load tokenizer & model (arabe GPT-2 stable)
tokenizer = GPT2Tokenizer.from_pretrained('aubmindlab/aragpt2-base')
tokenizer.pad_token = tokenizer.eos_token

model = GPT2LMHeadModel.from_pretrained('aubmindlab/aragpt2-base')
model = model.to(device)

# 3. Helper: choose token from top-n probabilities
def choose_from_top(probs, n=5):
    ind = np.argpartition(probs, -n)[-n:]
    top_prob = probs[ind]
    top_prob = top_prob / np.sum(top_prob)
    choice = np.random.choice(n, 1, p=top_prob)
    token_id = ind[choice][0]
    return int(token_id)

# 4. Dataset: Arabic texts (prefix "TEXT:" + <|endoftext|>)
class ArabicTextsDataset(Dataset):
    def __init__(self, csv_path='arabic_texts_preprocessed.csv'):
        super().__init__()
        self.text_list = []
        self.end_of_text_token = tokenizer.encode("<|endoftext|>")[0]

        df = pd.read_csv(csv_path)
        for text in df['processed_text']:
            text_str = f"TEXT:{text}{tokenizer.decode([self.end_of_text_token])}"
            self.text_list.append(text_str)

    def __len__(self):
        return len(self.text_list)

    def __getitem__(self, item):
        return self.text_list[item]

# 5. DataLoader
dataset = ArabicTextsDataset()
text_loader = DataLoader(dataset, batch_size=1, shuffle=True)

# 6. Hyper-parameters
BATCH_SIZE = 4
EPOCHS = 5
LEARNING_RATE = 3e-5
WARMUP_STEPS = 100
MAX_SEQ_LEN = 128

# 7. Training loop
model = model.to(device)
model.train()
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)
num_training_steps = len(text_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=WARMUP_STEPS, num_training_steps=num_training_steps)

proc_seq_count = 0
sum_loss = 0.0
batch_count = 0
tmp_texts_tens = None
models_folder = "trained_models"
if not os.path.exists(models_folder):
    os.mkdir(models_folder)

for epoch in range(EPOCHS):
    print(f"EPOCH {epoch} started" + "=" * 30)

    for idx, text in enumerate(text_loader):
        # Pack texts into single sequence up to MAX_SEQ_LEN
        text_tens = torch.tensor(tokenizer.encode(text[0])).unsqueeze(0).to(device)
        if text_tens.size()[1] > MAX_SEQ_LEN:
            continue

        if not torch.is_tensor(tmp_texts_tens):
            tmp_texts_tens = text_tens
            continue
        else:
            if tmp_texts_tens.size()[1] + text_tens.size()[1] > MAX_SEQ_LEN:
                work_texts_tens = tmp_texts_tens
                tmp_texts_tens = text_tens
            else:
                tmp_texts_tens = torch.cat([tmp_texts_tens, text_tens[:, 1:]], dim=1)
                continue

        # Forward / backward
        outputs = model(work_texts_tens, labels=work_texts_tens)
        loss, logits = outputs[:2]
        loss.backward()
        sum_loss += loss.detach().data

        proc_seq_count += 1
        if proc_seq_count == BATCH_SIZE:
            proc_seq_count = 0
            batch_count += 1
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()
            model.zero_grad()

        if batch_count == 10:
            print(f"Batch {batch_count}, sum loss {sum_loss}")
            batch_count = 0
            sum_loss = 0.0

    # Save checkpoint
    torch.save(model.state_dict(), os.path.join(models_folder, f"gpt2_arabic_{epoch}.pt"))

print("Fine-tuning terminé ! Checkpoints dans ./trained_models")

Device: cpu


vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json:   0%|          | 0.00/843 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/553M [00:00<?, ?B/s]

Fine-tuning terminé ! Checkpoints dans ./trained_models


# QUESTION2: Génération d'un nouveau paragraphe à partir d'une phrase donnée

In [93]:
# Fix CUDA
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''
import torch
torch.backends.cudnn.enabled = False

from transformers import GPT2Tokenizer, GPT2LMHeadModel

device = 'cpu'  # Force CPU

# Load checkpoint (epoch 4) avec même modèle que training
tokenizer = GPT2Tokenizer.from_pretrained('aubmindlab/aragpt2-base')
tokenizer.pad_token = tokenizer.eos_token

model = GPT2LMHeadModel.from_pretrained('aubmindlab/aragpt2-base')  # Fix : Même pretrained
model.load_state_dict(torch.load('./trained_models/gpt2_arabic_4.pt'))
model = model.to(device)
model.eval()

# Génération (fix attention mask)
prompt = "الذكاء الاصطناعي يغير"  # Phrase donnée
input_ids = torch.tensor(tokenizer.encode(prompt)).unsqueeze(0).to(device)
attention_mask = torch.ones(input_ids.shape, dtype=torch.long).to(device)  # Fix : Attention mask

with torch.no_grad():
    out_ids = model.generate(
        input_ids, 
        attention_mask=attention_mask,  # Fix warning
        max_length=100, 
        temperature=0.7, 
        do_sample=True, 
        pad_token_id=tokenizer.eos_token_id
    )
generated = tokenizer.decode(out_ids[0], skip_special_tokens=True)
print("Paragraphe généré :")
print(generated)

Paragraphe généré :
الذكاء الاصطناعي يغير من طريقة تفكيرهم وسلوكهم..........................................................................................
