# CS365 Project Phase 2: Arabic NLP Classification

## 1. Introduction

### Project Overview and Objectives
This project implements both traditional and modern approaches for Arabic text classification using the KalimatCorpus-2.0 dataset. The main objectives are:
- Implement traditional machine learning approaches (SVM, Naive Bayes with TF-IDF)
- Implement modern deep learning approaches (BiLSTM, AraBERT)
- Compare performance across different methods
- Analyze trade-offs between traditional and modern approaches

### Dataset Description
The chosen dataset is KalimatCorpus-2.0, which contains Arabic news articles from multiple categories:
- **Culture**: Cultural news and articles
- **Economy**: Economic and business news
- **International**: International news coverage
- **Local**: Local Omani news
- **Religion**: Religious content and discussions
- **Sports**: Sports news and coverage

### Background on Arabic NLP Challenges
Arabic NLP presents unique challenges including:
- Right-to-left script direction
- Complex morphology and root-based word formation
- Diacritics and various letter forms
- Dialectal variations and Modern Standard Arabic differences
- Limited preprocessing tools compared to English

## 2. Data Exploration and Preprocessing


### Load data
The articles for each category of KalimatCorpus-2.0 were downloaded from sourceforge.net:
- Each category is stored in its own folder, with every article saved as a `.txt` file
- Each article is stored with one word per line

In [2]:
import os
import multiprocessing.dummy as mp

In [3]:
kalimat_base = "data/KalimatCorpus-2.0"
expected_dirs = os.listdir(kalimat_base)

#### Checking for the existence of the directories

In [None]:
def check_kalimat_structure_os():
    missing = [d for d in expected_dirs if not os.path.isdir(os.path.join(kalimat_base, d))]
    
    if missing:
        print(f"❌ Missing folders: {missing}")
    else:
        count = 0
        for d in expected_dirs:
            folder_path = os.path.join(kalimat_base, d)
            count += len([f for f in os.listdir(folder_path) if f.endswith(".txt")])  # count only .txt files
        print(f"✅ Kalimat Corpus is ready with {count} .txt files")

check_kalimat_structure_os()

#### `load_kalimat_articles`
We go through each category folder and pass its name to `load_kalimat_articles`.
This function reads all the articles in the folder and appends each one to the `articles` list with the following information:
- `category`: the name of the folder
- `filename`: the name of the file
- `text`: the content of the article
- `text_length`: the length of the article in characters
- `word_count`: the number of words in the article

In [5]:
def load_kalimat_articles(category):
    category_path = os.path.join(kalimat_base, category)
    if not os.path.isdir(category_path):
        print(f"❌ Category '{category}' does not exist in the Kalimat Corpus.")
        return []

    articles = []
    for filename in os.listdir(category_path):
        if filename.endswith(".txt"):
            file_path = os.path.join(category_path, filename)
            with open(file_path, 'r', encoding='utf-8') as f:
                words = [line.strip() for line in f if line.strip()]
                text = " ".join(words)
                articles.append({
                    "category": category.replace("articles", "").upper(),
                    "filename": filename,
                    "text": text,
                    "text_length": len(text),
                    "word_count": len(words)
                })

    print(f"✅ Loaded {len(articles)} articles from category '{category}'")
    return articles

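Parallelize the loading of articles across categories to speed it up. Note that `multiprocessing.dummy` exposes the `multiprocessing` Pool API but runs the workers as threads, which is sufficient for reading files.

Then, in `dataset`, we flatten the per-category lists into a single list instead of a list of lists.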

In [6]:
def load_all_articles_parallel():
    # os.cpu_count() may return None, and halving a single core would give 0 workers
    with mp.Pool(processes=max(1, min(len(expected_dirs), (os.cpu_count() or 2) // 2))) as pool:
        results = pool.map(load_kalimat_articles, expected_dirs)
    
    dataset = [article for category_articles in results for article in category_articles]
    return dataset
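
The map-then-flatten pattern above can be sketched on toy data. This is a minimal illustration, not the project's loader: `fake_load` and the category names are hypothetical stand-ins for `load_kalimat_articles` and the corpus folders.

```python
import multiprocessing.dummy as mp  # thread-backed Pool with the multiprocessing API
from itertools import chain


def fake_load(category):
    # Hypothetical stand-in for load_kalimat_articles: two records per category
    return [{"category": category, "filename": f"{category}_{i}.txt"} for i in range(2)]


categories = ["culture", "economy", "sports"]

with mp.Pool(processes=2) as pool:
    # pool.map returns results in the same order as the input list
    results = pool.map(fake_load, categories)

# Flatten the list of per-category lists into one list of article records
flat = list(chain.from_iterable(results))
print(len(flat))
```

Because `pool.map` preserves input order, the flattened list stays grouped by category in the order the folders were listed.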

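The `if __name__ == "__main__"` guard matters on Windows when the process-based `multiprocessing` module is used, because child processes re-import the main module. With the thread-based `multiprocessing.dummy` used here it is harmless and kept as a precaution.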

In [None]:
if __name__ == "__main__":
    dataset = load_all_articles_parallel()
    print(f"✅ Dataset loaded with {len(dataset)} articles.")

### Preprocessing
Say Hello to pandas!

We will start by constructing a DataFrame from the dataset list of dictionaries.

In [3]:
import pandas as pd
df = pd.DataFrame(dataset)
df.head(), df['category'].value_counts()


#### Setup necessary libraries
- `nltk` for text processing
- `nltk.corpus.stopwords` for stop words
- `regex`

Then we download the stop-word list using the `nltk.download()` function.
- Stop words are common words that carry little information for text analysis (e.g. "في", "من", "إلى", "على", "و")


In [None]:
import nltk
from nltk.corpus import stopwords
import regex as re

nltk.download('stopwords')


Adding additional stopwords based on our analysis of the corpus

In [None]:
arabic_stopwords = set(stopwords.words('arabic'))
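stemmer = nltk.stem.ISRIStemmer()  # note: stemming worsened the results in our experiments
# Extra stop words, written in the normalized spelling produced by preprocess_text
for word in ['في', 'ان', 'الى', 'او', 'فى']:
    arabic_stopwords.add(word)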
print(f"Stop words count: {len(arabic_stopwords)}")

#### `preprocess_text`
We apply `preprocess_text` to clean and normalize the Arabic text before modeling.  
This function performs the following steps:
- **Remove** punctuation, digits (Arabic and English), and English letters.
- **Normalize** Arabic letters by unifying variants (e.g., "أ", "إ", "آ" → "ا").
- **Remove** Arabic diacritics and extra whitespace.
- **Tokenize** the text, **remove** Arabic stopwords, and **stem** the remaining tokens with the ISRI stemmer.
- Finally, **join** the tokens back into a cleaned string.

In [None]:
def preprocess_text(text):

    text = re.sub(r'\p{P}+|\$', '', text)  # remove all punctuation (English + Arabic)
    text = re.sub(r'[0-9٠-٩]', '', text)  # remove Arabic and English digits
    text = re.sub(r'[a-zA-Z]', '', text)  # remove English letters
    text = re.sub(r'[اآإأ]', 'ا', text)  # replace Arabic letter with hamza with 'ا'
    text = re.sub(r'[\u064B-\u0652]', '', text)  # remove Arabic diacritics
    text = re.sub(r'\s+', ' ', text).strip()  # clean extra spaces

    tokens = text.split()
    tokens = [stemmer.stem(word) for word in tokens if word not in arabic_stopwords]


    return ' '.join(tokens)

preprocess_text("!مرحباً... هذا نَصٌّ تَجْرِيبِيٌ يحتوي على 123 أرقام ٤٥٦، في علامات ترقيم @#$%، كلمات إنجليزية like This.")
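
For readers without `regex` or `nltk` installed, the character-level part of the cleaning can be approximated with the standard library alone. This is a sketch under stated assumptions: `normalize_arabic` is a hypothetical helper, it uses `unicodedata` categories in place of `\p{P}`, and it leaves out stop-word removal and stemming.

```python
import re
import unicodedata


def normalize_arabic(text):
    # Drop punctuation (Unicode categories P*) and symbols (S*), character by character
    text = "".join(ch for ch in text if unicodedata.category(ch)[0] not in ("P", "S"))
    text = re.sub(r"[0-9\u0660-\u0669]", "", text)          # ASCII and Arabic-Indic digits
    text = re.sub(r"[a-zA-Z]", "", text)                    # Latin letters
    text = re.sub(r"[\u064B-\u0652]", "", text)             # tanween, short vowels, shadda, sukun
    text = re.sub(r"[\u0622\u0623\u0625]", "\u0627", text)  # alef variants -> bare alef
    return re.sub(r"\s+", " ", text).strip()                # collapse whitespace


print(normalize_arabic("إلى 12 ab!"))
```

Running the two versions side by side on the same sample is a quick way to check that a change to one regular expression did not alter the output unexpectedly.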

Now we can apply the `preprocess_text` function to the DataFrame creating a new column `processed_text`

In [12]:
df['processed_text'] = df['text'].apply(preprocess_text)

### Visualization

In [13]:
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
from collections import Counter
import arabic_reshaper
from bidi.algorithm import get_display
%matplotlib inline

fm.fontManager.addfont('arial-unicode-ms.ttf')
arabic_font = fm.FontProperties(fname='arial-unicode-ms.ttf')

plt.rcParams['font.family'] = 'sans-serif'
plt.rcParams['font.sans-serif'] = arabic_font.get_name()

In [None]:
df['is_duplicate'] = df.duplicated(subset='processed_text', keep=False)

# Group and make sure both True/False appear
counts = df.groupby(['category', 'is_duplicate']).size().unstack(fill_value=0).reindex(columns=[False, True], fill_value=0)
counts.columns = ['Unique', 'Duplicate']
print(counts)
# Plot
counts.plot(kind='bar', stacked=True, figsize=(10, 6), colormap='tab10')
plt.title("Stacked Bar of Unique vs Duplicate Articles per Category")
plt.xlabel("Category")
plt.ylabel("Article Count")
plt.xticks(rotation=45)
plt.grid(axis='y')
plt.tight_layout()
plt.show()


#### 1- Stacked Bar: Unique vs Duplicate Articles per Category
While most categories have a healthy distribution of unique articles, the `RELIGION` category has a significant number of duplicate articles. This could bias classification models if not properly handled.

In [15]:
df = df.drop_duplicates(subset=['processed_text'])

#### 2- Vocabulary Size
After preprocessing, the corpus contains ~235k unique words. A vocabulary this large directly sets the dimensionality of feature extraction methods such as TF-IDF.

In [None]:
vocab = set()
df['processed_text'].str.split().apply(vocab.update)
len(vocab)

#### 3- Top 20 Most Common Words (Bar Chart)
The most frequent words in the corpus. We can see "السلطنة", which makes sense as the corpus consists of Omani articles.

In [17]:
word_counts = Counter()
_ = df['processed_text'].str.split().apply(word_counts.update)


In [None]:
common_words = word_counts.most_common(20)

words, counts = zip(*common_words)
display_words = list(map(get_display, map(arabic_reshaper.reshape, words)))
plt.figure(figsize=(10, 6))
plt.bar(display_words, counts)
plt.title("Top 20 Most Common Words in the Corpus")
plt.ylabel('Frequency')
plt.xticks(rotation=45)
plt.grid(axis='y')
plt.tight_layout()
plt.show()

#### 4- Article Word Counts per Category (Box Plot)
The median article length differs across categories. `RELIGION` articles tend to be longer on average, while `SPORTS` articles are shorter.

In [None]:
# DataFrame.boxplot with `by` creates its own figure, so the size is passed directly
df.boxplot(column='word_count', by='category', grid=False, rot=10, figsize=(12, 6))
plt.title('Article Word Counts per Category')
plt.suptitle('')  # Remove the automatic "Boxplot grouped by" title
plt.xlabel('Category')
plt.ylabel('Number of Words')
plt.tight_layout()
plt.show()

#### 5- Top 20 Bigrams (Horizontal Bar Chart)
Common bigrams such as "السلطان قابوس", "بن سعيد", and "محمد بن" appear frequently in the corpus, capturing common expressions in Omani journalism.

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

vectorizer = CountVectorizer(ngram_range=(2,2), max_features=20)
X = vectorizer.fit_transform(df['processed_text'])
bigrams = vectorizer.get_feature_names_out()

counts = X.sum(axis=0).A1
display_words = list(map(get_display, map(arabic_reshaper.reshape, bigrams)))

bigrams_counts = list(zip(display_words, counts))
bigrams_counts.sort(key=lambda x: x[1], reverse=True)
sorted_display_words, sorted_counts = zip(*bigrams_counts)
# Plot
plt.figure(figsize=(10,6))
plt.barh(sorted_display_words, sorted_counts)
plt.title('Top 20 Bigrams (Sorted)')
plt.xlabel('Frequency')
plt.tight_layout()
plt.show()
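
The bigram counts can also be reproduced without scikit-learn, which helps when checking what the chart is measuring. A minimal sketch on a toy corpus (the corpus and the `count_bigrams` helper are illustrative only):

```python
from collections import Counter


def count_bigrams(texts):
    counts = Counter()
    for text in texts:
        tokens = text.split()
        # zip pairs each token with its successor: (t0, t1), (t1, t2), ...
        counts.update(zip(tokens, tokens[1:]))
    return counts


toy_corpus = ["a b c a b", "b c d"]
bigram_counts = count_bigrams(toy_corpus)
print(bigram_counts.most_common(2))
```

Bigrams never cross document boundaries here, which matches how `CountVectorizer` treats each row. Note that `CountVectorizer`'s default token pattern ignores single-character tokens, so its counts on real text can differ slightly from a plain whitespace split.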


## 3. Phase 1: Traditional Approaches

### Task 1: Traditional Text Classification
Traditional machine learning approaches using feature extraction methods and classical algorithms.


In [21]:
from sklearn.feature_extraction.text import TfidfVectorizer
tfidf_vectorizer = TfidfVectorizer()
tfidf_matrix = tfidf_vectorizer.fit_transform(df['processed_text'])
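
To make the weighting concrete, here is a pure-Python sketch of what `TfidfVectorizer` computes with its documented defaults (`smooth_idf=True`, `norm='l2'`, raw term counts). The `tfidf` helper and the toy documents are illustrative; tokenization is a plain whitespace split, which is an assumption that holds for the already-cleaned `processed_text`.

```python
import math
from collections import Counter


def tfidf(docs):
    tokenized = [doc.split() for doc in docs]
    vocab = sorted({tok for doc in tokenized for tok in doc})
    n_docs = len(tokenized)
    # Document frequency: number of documents that contain each term
    df = Counter(tok for doc in tokenized for tok in set(doc))
    # Smoothed IDF: ln((1 + n) / (1 + df)) + 1
    idf = {t: math.log((1 + n_docs) / (1 + df[t])) + 1 for t in vocab}
    rows = []
    for doc in tokenized:
        tf = Counter(doc)
        row = [tf[t] * idf[t] for t in vocab]
        norm = math.sqrt(sum(v * v for v in row)) or 1.0
        rows.append([v / norm for v in row])  # L2-normalize each document vector
    return vocab, rows


vocab_demo, matrix = tfidf(["goal match goal", "market price", "match price"])
print(vocab_demo)
print(len(matrix), "documents x", len(matrix[0]), "features")
```

The number of columns equals the vocabulary size, which is why a corpus-wide vocabulary in the hundreds of thousands yields a very wide (but sparse) matrix.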

We tried every combination of the traditional feature extractors (`BoW`, `TfIdf`) and classifiers (`SVM`, `Naive Bayes`, `Random Forest`).

In general, `TfIdf` performed better than `BoW`. Among the classifiers, `SVM` with `TfIdf` gave the best accuracy. `Naive Bayes` was the fastest, taking under one second while still reaching very good accuracy. `Random Forest` also reached good accuracy but took longer.

In [None]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.svm import LinearSVC
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
import seaborn as sns
import numpy as np

X = tfidf_matrix
y = df['category']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

svm_classifier = LinearSVC(random_state=42, C=1.0)
nb_classifier = MultinomialNB(alpha=0.01)
svm_classifier.fit(X_train, y_train)
nb_classifier.fit(X_train, y_train)

#### Results Summary
- `SVM` + `TfIdf`: the best result, with an average F1-score/accuracy of 92%
  - Mean cross-validation score: 90.60%
- `Naive Bayes` + `TfIdf`: the second-best result, with an average F1-score/accuracy of 88%
  - Mean cross-validation score: 87.73%

In [23]:
def show_confusion_matrix(y_true, y_pred, labels=None, normalize=False, figsize=(6, 4), title="Confusion Matrix"):
    if labels is None:
        labels = sorted(list(set(y_true) | set(y_pred)))

    cm = confusion_matrix(y_true, y_pred, labels=labels)
    
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1, keepdims=True)
        fmt = ".2f"
    else:
        fmt = "d"

    plt.figure(figsize=figsize)
    sns.heatmap(cm, annot=True, fmt=fmt, cmap='Blues', xticklabels=labels, yticklabels=labels)
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title(title)
    plt.xticks(rotation=45, ha='right')  # Tilt x-axis labels
    plt.tight_layout()
    plt.show()
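
The per-class figures in a classification report follow directly from the confusion matrix. A minimal sketch, assuming rows are true classes and columns are predicted classes (the convention of `sklearn.metrics.confusion_matrix`); `per_class_metrics` and the counts are made up for illustration:

```python
def per_class_metrics(cm):
    # cm[i][j] = number of samples with true class i predicted as class j
    n = len(cm)
    metrics = []
    for k in range(n):
        tp = cm[k][k]
        fn = sum(cm[k]) - tp                       # rest of row k
        fp = sum(cm[i][k] for i in range(n)) - tp  # rest of column k
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        metrics.append((precision, recall, f1))
    return metrics


toy_cm = [[8, 2], [1, 9]]  # made-up counts for two classes
scores = per_class_metrics(toy_cm)
macro_f1 = sum(f1 for _, _, f1 in scores) / len(scores)
print(scores, macro_f1)
```

With `normalize=True`, each row of the heatmap is divided by its row sum, so the diagonal then shows per-class recall.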


In [None]:
def model_evaluation(classifier, X, y):
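    y_pred = classifier.predict(X)
    # Note: cross_val_score refits clones of the classifier on folds of the X, y passed in;
    # it does not score the already-fitted classifier
    cv_scores = cross_val_score(classifier, X, y, cv=5, scoring='accuracy')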

    print(f'\nCross-Validation Scores: {cv_scores}')
    print(f'Mean Cross-Validation Score: {np.mean(cv_scores):.4f}')

    print("\n Model Evaluation")
    accuracy = accuracy_score(y, y_pred)
    print(f"Accuracy: {accuracy}")

    print("\nClassification Report:")
    class_report = classification_report(y, y_pred, target_names=sorted(y.unique()))
    print(class_report)

    print("\nConfusion Matrix:")
    # Use the labels passed in rather than the global y_test
    show_confusion_matrix(y, y_pred, labels=sorted(y.unique()))

print("SVM Classifier Evaluation")
model_evaluation(svm_classifier, X_test, y_test)
print("\n" + "="*50 + "\n")
print("Naive Bayes Classifier Evaluation")
model_evaluation(nb_classifier, X_test, y_test)

#### Prediction function that uses the SVM + TfIdf model
- The function takes a string as input
- Preprocesses the text with `preprocess_text`
- Transforms the text using the `tfidf_vectorizer`
- Predicts the category using `svm_classifier`
- Returns the predicted category

In [25]:
def predict_category(text):
    tokenized_text = preprocess_text(text)
    X_new = tfidf_vectorizer.transform([tokenized_text])
    return svm_classifier.predict(X_new)[0]

#### Testing `predict_category` on sample texts
One short sample per category; the expected label is printed before the arrow.


In [None]:
print("CULTURE →", predict_category("أطلقت وزارة الثقافة برنامجًا وطنيًا يهدف إلى إحياء التراث الشعبي من خلال دعم الفنون التقليدية والمهرجانات المحلية التي تسلط الضوء على الهوية السعودية."))
print("ECONOMY →", predict_category("شهدت الأسواق المالية ارتفاعًا ملحوظًا في قيمة الأسهم السعودية بعد إعلان الحكومة عن خطة تنموية جديدة تركز على التنوع الاقتصادي وتقليل الاعتماد على النفط."))
print("INTERNATIONAL →", predict_category("عقدت القمة الخليجية الأوروبية اجتماعها السنوي في بروكسل لمناقشة التحديات العالمية مثل الأمن الغذائي والتغير المناخي وتعزيز التعاون بين الشرق والغرب."))
print("LOCAL →", predict_category("بدأت أمانة المدينة بتنفيذ مشروع توسعة الطرق الداخلية بهدف تخفيف الازدحام المروري، كما تم الإعلان عن إنشاء ممرات مشاة ومواقف ذكية."))
print("RELIGION →", predict_category("حثّ إمام المسجد خلال خطبة الجمعة على التمسك بالقيم الإسلامية ونشر التسامح بين أفراد المجتمع، مشيرًا إلى أهمية الصدق والأمانة في التعاملات اليومية."))
print("SPORTS →", predict_category("تمكن المنتخب الوطني من الفوز على نظيره الإيراني في مباراة مثيرة انتهت بنتيجة ٣-٢، ليضمن التأهل إلى نهائي كأس آسيا وسط فرحة جماهيرية عارمة."))

### 🔍 Observations On The Predictions
* The model correctly predicted **5 out of 6** categories.
* The **RELIGION** article was misclassified as **LOCAL**, likely due to **data imbalance** or **semantic overlap** in community-related language.
* Overall, the model demonstrates **strong accuracy**, with minor limitations in underrepresented categories.



### Task 2: Text Generation
We use n-gram language models to generate Arabic text. This requires a different preprocessing step, without stemming and the other steps that would ruin the generated text.

In [None]:
def preprocess_arabic_text(text):
    # Remove non-Arabic characters and normalize whitespace
    text = re.sub(r'[^\u0600-\u06FF\s]', ' ', text) # keep only the Arabic Unicode block (this also keeps Arabic digits and punctuation)
    text = re.sub(r'[\u064B-\u0652]', '', text)  # remove Arabic diacritics
    text = re.sub(r'\s+', ' ', text).strip() # normalize whitespace
    text = re.sub(r'[اآإأ]', 'ا', text)  # replace Arabic letter with hamza with 'ا'

    return text
preprocess_arabic_text("!مرحباً... هذا نَصٌّ تَجْرِيبِيٌ يحتوي على 123 أرقام ٤٥٦، في علامات ترقيم @#$%، كلمات إنجليزية like This.")

In [28]:
from collections import defaultdict

# Function to build n-gram model
def build_ngram_model(texts, n):
    model = defaultdict(list)
    all_words = []
    
    for text in texts:
        # Preprocess the text
        text = preprocess_arabic_text(text)
        # Split into words
        words = text.split()
        all_words.extend(words)
        
        # Build n-grams
        for i in range(len(words) - n + 1):
            # Use tuple of n-1 words as key
            prefix = tuple(words[i:i+n-1])
            # Use the nth word as value
            suffix = words[i+n-1]
            model[prefix].append(suffix)
    
    return model, list(set(all_words))
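
To see what the model dictionary looks like, the same prefix-to-suffix construction can be run on one short sentence. This is a toy sketch: `toy_ngram_table` mirrors the loop above but skips preprocessing, and the English sentence is only a placeholder.

```python
from collections import defaultdict


def toy_ngram_table(words, n):
    table = defaultdict(list)
    for i in range(len(words) - n + 1):
        prefix = tuple(words[i:i + n - 1])      # the n-1 context words
        table[prefix].append(words[i + n - 1])  # the word that followed them
    return dict(table)


words = "the cat sat on the mat".split()
print(toy_ngram_table(words, 2))
print(toy_ngram_table(words, 1))
```

For `n=2`, the key `('the',)` maps to both `'cat'` and `'mat'`, so sampling with `random.choice` picks each follower in proportion to how often it occurred. For `n=1`, the only key is the empty tuple, and generation reduces to sampling words by their overall frequency.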

In [29]:
import random
# Function to generate text with a random start word
def generate_arabic_text(model, all_words, length=100, n=5):
    # Choose a random start word
    start_word = random.choice(all_words)
    
    # Find a valid prefix that contains the start word
    valid_prefixes = [prefix for prefix in model.keys() if start_word in prefix]
    
    # If no valid prefix contains the start word, just use any prefix
    if valid_prefixes:
        current = random.choice(valid_prefixes)
    else:
    # Fall back to any random prefix
        current = random.choice(list(model.keys()))
        start_word = current[0] if len(current) > 0 else start_word  # Update start word to match what we're using

    result = list(current)
    
    # Generate text
    for _ in range(length):
        if current in model:
            # Choose a random next word based on the current n-1 words
            next_word = random.choice(model[current])
            result.append(next_word)
            # Update current context (sliding window)
            # For n=1 the context is empty; result[-0:] would otherwise return the whole list
            current = tuple(result[-(n-1):]) if n > 1 else ()
        else:
            # if we reach a dead end, choose a new random prefix
            current = random.choice(list(model.keys()))
            result.extend(current)
    
    return start_word, ' '.join(result)

In [30]:
import time

def build_eval_ngram_model(n, texts):
    model, all_words = build_ngram_model(texts, n)
    print(f"\n✅ N-gram model built with n={n} ({len(model)} prefixes)\n")

    for i in range(3):
        start_word, generated_text = generate_arabic_text(model, all_words, length=30, n=n)
        print(f"🔹 Sample {i+1} (start: '{start_word}'):")
        for word in generated_text.split(): 
            print(word, end=' ', flush=True)
            time.sleep(0.05)
        print('')  # new line after each sample


#### Testing the model from n=1 to n=5

In [None]:
for i in range(5):
    build_eval_ngram_model(i+1, df['text'])

## 4. Phase 2: Modern Approaches

### Task 1: Deep Learning Text Classification
Modern neural network approaches including BiLSTM and Transformer-based models.

In [387]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
import numpy as np

1- **Tokenization**

- For the BiLSTM model

In [None]:
from collections import Counter

tokenized_text = [text.split() for text in df['processed_text']]

# Count word frequencies
word_counts = Counter(word for article in tokenized_text for word in article)

# Vocabulary
vocab = {word: idx + 2 for idx, (word, count) in enumerate(word_counts.items())}
vocab['<PAD>'] = 0  # Padding token
vocab['<UNK>'] = 1  # Unknown token

# Reverse vocabulary for decoding
reverse_vocab = {idx: word for word, idx in vocab.items()}

# Stats
print(f"Most common words: {word_counts.most_common(10)}")
print(f"Vocabulary size: {len(vocab)}")

- For the Transformer model

In [389]:
from transformers import AutoTokenizer
from arabert.preprocess import ArabertPreprocessor

model_name = "aubmindlab/bert-base-arabertv02"
arabert_prep = ArabertPreprocessor(model_name=model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

1.1 **Tokenizer Encoder and Decoder**

- For the BiLSTM model

In [None]:
def encode_text(article, vocab, max_len):
    tokens = [vocab.get(word, vocab['<UNK>']) for word in article]
    chunks = []

    # Split the tokens into chunks of max_len
    for i in range(0, len(tokens), max_len):
        chunk = tokens[i:i + max_len]
        if len(chunk) < max_len:
            chunk += [vocab['<PAD>']] * (max_len - len(chunk))
        chunks.append(chunk)
        # break # Act as the normal encode function, not chunking

    return chunks

def decode_text(encoded_article, reverse_vocab):
    return ' '.join(reverse_vocab.get(idx, '<UNK>') for idx in encoded_article if idx not in (0, 1))  # Skip PAD and UNK tokens

# Test encoding and decoding
sample_article = tokenized_text[0]
print(f"Sample article encoded and decoded safely: {' '.join([decode_text(chunk, reverse_vocab) for chunk in encode_text(sample_article, vocab, 100)]) == ' '.join(sample_article)}")
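
The chunk-and-pad behaviour is easiest to see on a short list of integer ids. A minimal sketch, with `chunk_and_pad` as a hypothetical stand-alone version of the loop in `encode_text` (it takes ids directly instead of looking words up in `vocab`):

```python
def chunk_and_pad(token_ids, max_len, pad_id=0):
    chunks = []
    for start in range(0, len(token_ids), max_len):
        chunk = token_ids[start:start + max_len]
        # Only the last chunk can be short; pad it up to max_len
        chunk = chunk + [pad_id] * (max_len - len(chunk))
        chunks.append(chunk)
    return chunks


ids = [5, 6, 7, 8, 9, 10, 11]
chunks = chunk_and_pad(ids, max_len=3)
print(chunks)  # [[5, 6, 7], [8, 9, 10], [11, 0, 0]]
```

An empty token list produces no chunks at all, so an article that is empty after preprocessing contributes no rows to the dataset.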

- For the Transformer model

In [391]:
def encode_text_transformer(text, max_len=128):
    assert max_len < 512, "Max length for BERT should be less than 512 tokens."
    
    tokens = tokenizer.encode(text, add_special_tokens=False)

    chunks = []
    for i in range(0, len(tokens), max_len - 2):
        chunk = tokens[i:i + (max_len - 2)]

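        # Wrap the chunk in [CLS] ... [SEP] using the tokenizer's own special-token ids
        chunk = [tokenizer.cls_token_id] + chunk + [tokenizer.sep_token_id]

        padding_length = max_len - len(chunk)
        chunk += [tokenizer.pad_token_id] * padding_length  # Pad up to max_len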
        chunks.append(chunk)
    
    return chunks

def decode_text_transformer(encoded_article):
    decoded = tokenizer.decode(encoded_article, skip_special_tokens=True)
    return decoded.replace('  ', ' ').strip()  # Clean up double spaces

In [None]:
print(tokenizer.encode("السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم", max_length=24, truncation=True, padding='max_length'))
print(encode_text_transformer("السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم", max_len=12))
print(decode_text_transformer(encode_text_transformer("السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم")[0]) == "السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم السلام عليكم")
# There will be less padding in the encode_text_transformer because chunking adds CLS and SEP
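
Since every chunk reserves two positions for `[CLS]` and `[SEP]`, the number of chunks and the padding in the last chunk follow from simple arithmetic. A small sketch (the helper names are hypothetical, and 18 is an arbitrary token count rather than the real length of the sample sentence):

```python
import math


def n_chunks(n_tokens, max_len):
    # Each chunk carries max_len - 2 real tokens plus [CLS] and [SEP]
    payload = max_len - 2
    return math.ceil(n_tokens / payload)


def padding_in_last_chunk(n_tokens, max_len):
    payload = max_len - 2
    remainder = n_tokens % payload
    return 0 if remainder == 0 else payload - remainder


print(n_chunks(18, 12), padding_in_last_chunk(18, 12))
```

With `max_len=12`, 18 tokens split into two chunks of up to 10 tokens each, and the second chunk needs two padding positions.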

1.2 **Label Encoding**

In [None]:
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(df['category'])

# Label mapping
label_mapping = {idx: label for idx, label in enumerate(label_encoder.classes_)}
label2id = {label: idx for idx, label in enumerate(label_encoder.classes_)}
decode_labels = lambda idx: label_encoder.inverse_transform(idx)
print(f"Label mapping: {label_mapping}")
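
The mapping that `LabelEncoder` builds can be reproduced in a few lines, which makes the id order explicit. A minimal sketch on made-up labels (`fit_label_mapping` is a hypothetical helper):

```python
def fit_label_mapping(labels):
    classes = sorted(set(labels))  # LabelEncoder also stores its classes in sorted order
    to_id = {label: idx for idx, label in enumerate(classes)}
    to_label = {idx: label for label, idx in to_id.items()}
    return to_id, to_label


toy_labels = ["SPORTS", "CULTURE", "SPORTS", "ECONOMY"]
to_id, to_label = fit_label_mapping(toy_labels)
encoded = [to_id[label] for label in toy_labels]
print(encoded)  # [2, 0, 2, 1]
```

Ids are assigned alphabetically, not in order of first appearance, so the mapping is stable as long as the set of categories does not change.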

2- **Dataset Preparation**

2.1- **Dataset Class**

In [409]:
class BiLSTMTextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = torch.tensor(texts, dtype=torch.long)
        self.labels = torch.tensor(labels, dtype=torch.long)

        self.lengths = (self.texts != vocab['<PAD>']).sum(dim=1)  # Calculate lengths for each text

    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return self.texts[idx], self.lengths[idx], self.labels[idx]

- For the Transformer model

In [410]:
class TransformerTextDataset(Dataset):
    def __init__(self, tokenized_text, labels, tokenizer, max_len, split = 'train'):
        self.tokenized_text = torch.tensor(tokenized_text, dtype=torch.long)
        self.attention_mask = self.tokenized_text != 0
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.split = split

    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        input_ids = self.tokenized_text[idx]
        # if self.split == 'test':
        #     input_ids = decode_text_transformer(input_ids)
            
        return {
            'input_ids': input_ids,
            'attention_mask': self.attention_mask[idx],
            'labels': self.labels[idx]
        }

3- **Model Architecture**

3.1- **LSTM Model**: Bi-directional LSTM

In [411]:
class BiLSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, num_classes, pad_idx):
        super(BiLSTMClassifier, self).__init__()
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # Dropout layer
        self.dropout1 = nn.Dropout(0.3)
        
        # BiLSTM layer
        self.lstm1 = nn.LSTM(embedding_dim, hidden_dim, num_layers=max(2, num_layers//2), dropout=0.4, bidirectional=True, batch_first=True)

        # normalization layer
        self.norm1 = nn.LayerNorm(hidden_dim * 2)

        # fc layer
        self.fc1 = nn.Linear(hidden_dim*2, hidden_dim)

        # Second BiLSTM layer
        self.lstm2 = nn.LSTM(hidden_dim, hidden_dim, num_layers=max(2, num_layers//2), dropout=0.4, bidirectional=True, batch_first=True)

        # normalization layer
        self.norm2 = nn.LayerNorm(hidden_dim * 2)

        # Dropout layer
        self.dropout2 = nn.Dropout(0.5)

        # Fully connected layer
        self.fc2 = nn.Linear(hidden_dim * 2, num_classes)
    
    def forward(self, x, lengths):
        x = self.embedding(x)  # (batch_size, seq_len, embedding_dim)
        x = self.dropout1(x)

        # Pack the sequence for LSTM
        packed1 = nn.utils.rnn.pack_padded_sequence(
            x, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_out1, _ = self.lstm1(packed1)  # (batch_size, seq_len, hidden_dim * 2)
        lstm1_out, _ = nn.utils.rnn.pad_packed_sequence(
            packed_out1, batch_first=True)

        lstm1_out = self.norm1(lstm1_out) # (batch_size, seq_len, hidden_dim * 2)
        fc1_out = self.fc1(lstm1_out) # (batch_size, seq_len, hidden_dim)

        # Pack the sequence for the second LSTM
        packed2 = nn.utils.rnn.pack_padded_sequence(
            fc1_out, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_out2, (hidden, _) = self.lstm2(packed2)
        lstm2_out, _ = nn.utils.rnn.pad_packed_sequence(
            packed_out2, batch_first=True)
        
        # Residual connection. Note: lstm2_out is not used below; the classifier reads `hidden`
        lstm2_out = self.norm2(lstm2_out + lstm1_out)

        # Use the final forward and backward hidden states
        out = torch.cat((hidden[-2], hidden[-1]), dim=1)  # (batch_size, hidden_dim * 2)
        out = self.dropout2(out)
        return self.fc2(out)  # (batch_size, num_classes)

3.2- **Transformer Based Model**: AraBERT Model

In [412]:
from transformers import AutoModelForSequenceClassification

# The model can be used like this:
# model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(label_mapping), id2label=label_mapping, label2id=label2id)

4- **Training the Model**

In [413]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

- For the BiLSTM model

In [434]:
eval_interval = 100  # Evaluate every 100 batches
eval_iters = 10 # Number of iterations for evaluation

max_len = 500  # Maximum length of sequences
batch_size = 256 # Batch size for training
bilstm_num_epochs = 30  # Number of epochs for BiLSTM training
lr = 2e-3  # Learning rate

embedding_dim = 300  # Dimension of word embeddings
num_layers = 6  # Number of LSTM layers
hidden_dim = 256  # Hidden dimension for LSTM

- For the Transformer model

In [435]:
output_dir = './results'
evaluation_strategy = 'epoch'  # Evaluate at the end of each epoch
save_strategy = 'epoch'  # Save model at the end of each epoch
tf_learning_rate = 4e-5 # 1e-4
per_device_train_batch_size = 256
per_device_eval_batch_size = 32
gradient_accumulation_steps = 2
num_train_epochs = 8
weight_decay = 0.01
logging_dir = "./logs"
load_best_model_at_end = True

4.1- **Data Loaders**

- For the BiLSTM model

In [None]:
chunked_texts_BiLSTM = []
chunked_labels_BiLSTM = []


for article, label in zip(tokenized_text, encoded_labels):
    chunksBiLSTM = encode_text(article, vocab, max_len)

    chunked_texts_BiLSTM.extend(chunksBiLSTM)
    chunked_labels_BiLSTM.extend([label] * len(chunksBiLSTM))

X_train, X_devtest, y_train, y_devtest = train_test_split(
    chunked_texts_BiLSTM, chunked_labels_BiLSTM, test_size=0.2, stratify=chunked_labels_BiLSTM, random_state=42
)
X_dev, X_test, y_dev, y_test = train_test_split(
    X_devtest, y_devtest, test_size=0.5, stratify=y_devtest, random_state=42
)

biLSTMtrain_dataset = BiLSTMTextDataset(X_train, y_train)
biLSTMdev_dataset = BiLSTMTextDataset(X_dev, y_dev)
biLSTMtest_dataset = BiLSTMTextDataset(X_test, y_test)

bitrain_loader = DataLoader(biLSTMtrain_dataset, batch_size=batch_size, shuffle=True)
bidev_loader = DataLoader(biLSTMdev_dataset, batch_size=batch_size, shuffle=False)
bitest_loader = DataLoader(biLSTMtest_dataset, batch_size=batch_size, shuffle=False)

# stats
print(f"BiLSTM Train dataset size: {len(biLSTMtrain_dataset)}")
print(f"BiLSTM Dev dataset size: {len(biLSTMdev_dataset)}")
print(f"BiLSTM Test dataset size: {len(biLSTMtest_dataset)}")

- For the Transformer model

In [None]:
# arabert preprocessing
df['arabert_text'] = df['text'].apply(arabert_prep.preprocess)

In [None]:
chunked_texts_Transformer = []
chunked_labels_Transformer = []

for article, label in zip(df['arabert_text'].tolist(), encoded_labels):
    chucksTransformer = encode_text_transformer(article, max_len=max_len)

    chunked_texts_Transformer.extend(chucksTransformer)
    chunked_labels_Transformer.extend([label] * len(chucksTransformer))

X_train, X_devtest, y_train, y_devtest = train_test_split(
    chunked_texts_Transformer, chunked_labels_Transformer, test_size=0.2, stratify=chunked_labels_Transformer, random_state=42
)
X_dev, X_test, y_dev, y_test = train_test_split(
    X_devtest, y_devtest, test_size=0.5, stratify=y_devtest, random_state=42
)

tFtrain_dataset = TransformerTextDataset(X_train, y_train, tokenizer, max_len)
tFdev_dataset = TransformerTextDataset(X_dev, y_dev, tokenizer, max_len)
tFtest_dataset = TransformerTextDataset(X_test, y_test, tokenizer, max_len, split='test')

tFtrain_loader = DataLoader(tFtrain_dataset, batch_size=per_device_train_batch_size, shuffle=True)
tFdev_loader = DataLoader(tFdev_dataset, batch_size=per_device_train_batch_size, shuffle=False)
tFtest_loader = DataLoader(tFtest_dataset, batch_size=per_device_eval_batch_size, shuffle=False)

# stats
print(f"Transformer Train dataset size: {len(tFtrain_dataset)}")
print(f"Transformer Dev dataset size: {len(tFdev_dataset)}")
print(f"Transformer Test dataset size: {len(tFtest_dataset)}")

4.1- **Evaluation Function**: To evaluate the model on the test set

In [438]:
def evaluate_model(model, test_loader, device, transformer=False):
    all_preds = []
    all_labels = []
    model.eval()

    cnt = 0
    with torch.no_grad():
        for batch in test_loader: # x: (batch_size, seq_len), y: (batch_size,)

            if transformer:
                print(f"{cnt}/{len(test_loader)}", end='\r')
                x = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                y = batch['labels'].to(device)
                outputs = model(x, attention_mask=attention_mask)  # outputs: (batch_size, num_classes)
                outputs = outputs.logits
            else:
                x = batch[0].to(device)
                lengths = batch[1].to(device)
                y = batch[2].to(device)
                outputs = model(x, lengths)  # outputs: (batch_size, num_classes)

            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            cnt += 1
            all_preds.extend(preds)
            all_labels.extend(y.cpu().numpy())

    return all_preds, all_labels

4.2- **Model Training**: train_model runs one training epoch given a model, data loader, criterion, optimizer, and learning-rate scheduler

In [439]:
def train_model(model, train_loader, criterion, optimizer, scheduler, device):
    model.train()
    step = 0

    running_acc = 0.0
    running_loss = 0.0

    for x, lengths, y in train_loader: # (x: (batch_size, seq_len), y: (batch_size,))
        x, lengths, y = x.to(device), lengths.to(device), y.to(device)
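        # Forward pass under mixed precision. bfloat16 is used here because this loop
        # has no GradScaler, and float16 gradients can underflow without loss scaling
        optimizer.zero_grad()
        with torch.autocast(device_type='cuda', dtype=torch.bfloat16):
            outputs = model(x, lengths)
            loss = criterion(outputs, y)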
        
        # Backward pass and optimization
        loss.backward()
        # Clip gradients to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

        # Update running loss & accuracy
        _, preds = torch.max(outputs, dim=1)
        acc = (preds == y).float().mean().item()
        running_acc = running_acc * 0.90 + acc * 0.10 if running_acc > 0 else acc
        running_loss = running_loss * 0.90 + loss.item() * 0.10 if running_loss > 0 else loss.item()
        step += 1

        print(f"step {step:4d} | loss: {running_loss:.6f} | acc: {running_acc:.6f}", end='\r')
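
The running loss and accuracy printed by the training loop are exponential moving averages with a decay of 0.90. The sketch below shows that smoothing on its own, without PyTorch. The helper name `ema_update` is illustrative and not part of the notebook, and it seeds the average with `None` instead of the `> 0` check used in train_model.

```python
def ema_update(running, value, decay=0.90):
    """Return the exponential moving average after seeing `value`."""
    if running is None:          # first observation seeds the average
        return value
    return running * decay + value * (1.0 - decay)

losses = [2.0, 1.0, 1.0]         # made-up per-step losses
running = None
for loss in losses:
    running = ema_update(running, loss)
print(f"smoothed loss: {running:.4f}")
```

Seeding with `None` avoids restarting the average when a step legitimately reports a value of exactly zero.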


4.3- **Training Setup**

- For the BiLSTM model

In [440]:
from torch.optim.lr_scheduler import LambdaLR
import math

# Learning rate scheduler

warmup_steps = 60
total_steps = len(bitrain_loader) * bilstm_num_epochs
def lr_lambda(current_step):
    if current_step < warmup_steps:
        return float(current_step) / float(max(1, warmup_steps))
    progress = float(current_step - warmup_steps) / float(max(1, total_steps - warmup_steps))
    return 0.01 + 0.99 * 0.5 * (1.0 + math.cos(math.pi * progress))
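
To check the shape of this schedule without building an optimizer, the same formula can be evaluated at a few steps. This is a minimal sketch: `make_lr_lambda` is a hypothetical factory, and `total_steps=600` is an assumed value, not the notebook's real step count. LambdaLR multiplies the optimizer's base learning rate by the value returned here.

```python
import math

def make_lr_lambda(warmup_steps, total_steps, floor=0.01):
    """Build a multiplier function: linear warmup, then cosine decay to `floor`."""
    def lr_lambda(step):
        if step < warmup_steps:
            return step / max(1, warmup_steps)
        progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
        return floor + (1.0 - floor) * 0.5 * (1.0 + math.cos(math.pi * progress))
    return lr_lambda

schedule = make_lr_lambda(warmup_steps=60, total_steps=600)  # 600 is an assumed total
for step in (0, 30, 60, 330, 600):
    print(step, round(schedule(step), 4))
```

The multiplier rises linearly from 0 to 1 over the warmup, then follows a half cosine down to 1% of the base rate at the final step.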

In [441]:
modelBiLSTM = BiLSTMClassifier(
    vocab_size=len(vocab),
    embedding_dim=embedding_dim,  # Embedding dimension
    hidden_dim=hidden_dim,  # Hidden dimension for LSTM
    num_layers=num_layers,  # Number of LSTM layers
    num_classes=len(label_mapping),  # Number of classes
    pad_idx=vocab['<PAD>']  # Padding index
).to(device)

criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.AdamW(modelBiLSTM.parameters(), lr=lr, weight_decay=1e-5)
scheduler = LambdaLR(optimizer, lr_lambda=lr_lambda)

- For the Transformer model

In [None]:
modelTF = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(label_mapping),
    id2label=label_mapping,
    label2id=label2id
).to(device)

- For the BiLSTM model

In [442]:
# 'high' lets float32 matrix multiplications use faster, slightly lower-precision kernels (e.g. TF32) than the default 'highest'
torch.set_float32_matmul_precision('high')

modelBiLSTM = torch.compile(modelBiLSTM)

In [None]:
for epoch in range(bilstm_num_epochs):
    print(f"Epoch {epoch + 1}:")
    train_model(modelBiLSTM, bitrain_loader, criterion, optimizer, scheduler, device)
    y_pred, y_true = evaluate_model(modelBiLSTM, bidev_loader, device)
    acc = (np.array(y_pred) == np.array(y_true)).mean()
    print(f"validation accuracy: {acc:.4f}, f1: {f1_score(y_true, y_pred, average='weighted'):.4f}")
    for param_group in optimizer.param_groups:
        print(f"Updated learning rate: {param_group['lr']:.6f}")
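
The validation line reports a weighted F1 score, which averages the per-class F1 values using each class's share of the true labels as its weight. The pure-Python sketch below shows that computation. `weighted_f1` is an illustrative helper rather than the scikit-learn implementation, and the labels are made up.

```python
from collections import Counter

def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's support in y_true."""
    support = Counter(y_true)
    total = 0.0
    for cls, n_true in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        n_pred = sum(1 for p in y_pred if p == cls)
        precision = tp / n_pred if n_pred else 0.0
        recall = tp / n_true
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * n_true          # weight by the number of true examples
    return total / len(y_true)

y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 1, 2, 2, 2]
print(f"weighted F1: {weighted_f1(y_true, y_pred):.4f}")
```

Because the weights follow class frequency, a rare category with poor F1 lowers this score less than it would lower a macro average.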

In [445]:
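# Save the model. torch.compile wraps the module, so save the original module's
# weights to keep the checkpoint keys free of the "_orig_mod." prefix
torch.save(getattr(modelBiLSTM, "_orig_mod", modelBiLSTM).state_dict(), "bilstm_best_model.pth")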
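
A checkpoint written from a module wrapped by torch.compile carries state-dict keys prefixed with `_orig_mod.`, so loading it into a freshly built, uncompiled BiLSTMClassifier fails on mismatched keys. The sketch below shows one way to normalize such keys before calling load_state_dict. `strip_compile_prefix` is a hypothetical helper, and a plain dict stands in for the real state dict.

```python
def strip_compile_prefix(state_dict, prefix="_orig_mod."):
    """Return a copy of `state_dict` whose keys no longer start with `prefix`."""
    return {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in state_dict.items()
    }

# Plain dict standing in for a real state dict (values would be tensors).
saved = {"_orig_mod.embedding.weight": "E", "_orig_mod.fc.bias": "b"}
print(strip_compile_prefix(saved))
```

Keys without the prefix pass through unchanged, so the helper is safe to apply to either kind of checkpoint.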

- For the Transformer model

In [None]:
from transformers import TrainingArguments, Trainer

training_args = TrainingArguments(
    output_dir=output_dir,
    eval_strategy=evaluation_strategy,
    save_strategy=save_strategy,
    learning_rate=tf_learning_rate,
    bf16=True,  # instead of fp16
    fp16=False,
    per_device_eval_batch_size=per_device_eval_batch_size,
    per_device_train_batch_size=per_device_train_batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,
    num_train_epochs=num_train_epochs,
    weight_decay=weight_decay,
    logging_dir=logging_dir,
    load_best_model_at_end=load_best_model_at_end
    # torch_compile=True
)

trainer = Trainer(
    model=modelTF,
    args=training_args,
    train_dataset=tFtrain_dataset,
    eval_dataset=tFdev_dataset,
    compute_metrics=lambda p: {
        'accuracy': (np.argmax(p.predictions, axis=1) == p.label_ids).mean(),
        'f1': f1_score(p.label_ids, np.argmax(p.predictions, axis=1), average='weighted')
    }
)

trainer.train()

In [None]:
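# Clean up memory
import gc
import torch

# modelBiLSTM is kept: the evaluation and demo cells below still use it.
# The Transformer is reloaded from its checkpoint before evaluation.
del modelTF
del trainer
gc.collect()
torch.cuda.empty_cache()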

5- **Evaluation And Inference**

- For the BiLSTM model

In [None]:
print("\nModel Evaluation (On Test Set):")
y_pred, y_true = evaluate_model(modelBiLSTM, bitest_loader, device)
acc = (np.array(y_pred) == np.array(y_true)).mean()
print(f"Accuracy: {acc:.4f}")

print("\nClassification Report:")
class_report = classification_report(y_true, y_pred, target_names=label_encoder.classes_)
print(class_report)

print("\nConfusion Matrix:")
show_confusion_matrix(decode_labels(y_true), decode_labels(y_pred), labels=sorted(set(decode_labels(y_true)) | set(decode_labels(y_pred))))

In [None]:
def lstm_predict(text, model, vocab, max_len=500):
    # Preprocess the text
    processed_text = preprocess_text(text)
    tokens = processed_text.split()
    
    # Encode the text
    encoded = [vocab.get(word, vocab['<UNK>']) for word in tokens]
    
    # True sequence length after truncation (at least 1, so an empty input does not yield a zero length)
    seq_len = max(1, min(len(encoded), max_len))

    # Pad or truncate to max_len
    if len(encoded) < max_len:
        encoded += [vocab['<PAD>']] * (max_len - len(encoded))
    else:
        encoded = encoded[:max_len]
    
    # Convert to tensor
    input_tensor = torch.tensor([encoded], dtype=torch.long).to(device)
    length_tensor = torch.tensor([seq_len], dtype=torch.long).to(device)
    
    # Make prediction
    model.eval()
    with torch.no_grad():
        outputs = model(input_tensor, length_tensor)
        prediction = torch.argmax(outputs, dim=1).cpu().numpy()[0]
    
    return label_encoder.inverse_transform([prediction])[0]
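
The padding step and the length passed to the model have to agree: once a text is cut to max_len, its reported length can be at most max_len. The sketch below returns both values from a single helper so they cannot drift apart. `pad_or_truncate` and the pad ID of 0 are assumptions for illustration.

```python
def pad_or_truncate(ids, max_len, pad_id=0):
    """Return (fixed-length ids, true length) for one encoded sequence."""
    clipped = ids[:max_len]                   # truncate long inputs
    length = len(clipped)                     # length the model should see
    return clipped + [pad_id] * (max_len - length), length

short_ids, short_len = pad_or_truncate([5, 6, 7], max_len=5)
long_ids, long_len = pad_or_truncate(list(range(1, 8)), max_len=5)
print(short_ids, short_len)
print(long_ids, long_len)
```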

- For the Transformer model

In [78]:
# load checkpoint
checkpoint_path = "results/checkpoint-288-best"
model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path).to(device)

In [None]:
print("\nModel Evaluation (On Test Set):")
y_pred, y_true = evaluate_model(model, tFtest_loader, device, transformer=True)
acc = (np.array(y_pred) == np.array(y_true)).mean()
print(f"Accuracy: {acc:.4f}")

print("\nClassification Report:")
class_report = classification_report(y_true, y_pred, target_names=label_encoder.classes_)
print(class_report)

print("\nConfusion Matrix:")
show_confusion_matrix(decode_labels(y_true), decode_labels(y_pred), labels=sorted(set(decode_labels(y_true)) | set(decode_labels(y_pred))))

In [None]:
def arabert_predict(text, model, tokenizer, max_len=500):
    # Preprocess the text using AraBERT preprocessing
    processed_text = arabert_prep.preprocess(text)
    
    # Tokenize and encode. The tokenizer builds the attention mask itself,
    # so no assumption about the padding token ID is needed
    encoded = tokenizer(processed_text, add_special_tokens=True,
                        max_length=max_len, truncation=True,
                        padding='max_length', return_tensors='pt')
    input_ids = encoded['input_ids'].to(device)
    attention_mask_tensor = encoded['attention_mask'].to(device)
    
    # Make prediction
    model.eval()
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask_tensor)
        prediction = torch.argmax(outputs.logits, dim=1).cpu().numpy()[0]
    
    return label_encoder.inverse_transform([prediction])[0]

### 5. Comparative Analysis

### 6. Conclusion

### 7. Bonus: Interactive Demo

### Interactive Text Classification Demo
Here we demonstrate all our trained models by testing them on sample Arabic texts from different categories. Each model uses its own preprocessing and prediction pipeline.


### Model Comparison on Sample Texts


In [None]:
# Test samples for each category
test_samples = [
    ("أطلقت وزارة الثقافة برنامجًا وطنيًا يهدف إلى إحياء التراث الشعبي من خلال دعم الفنون التقليدية والمهرجانات المحلية التي تسلط الضوء على الهوية السعودية.", "CULTURE"),
    ("شهدت الأسواق المالية ارتفاعًا ملحوظًا في قيمة الأسهم السعودية بعد إعلان الحكومة عن خطة تنموية جديدة تركز على التنوع الاقتصادي وتقليل الاعتماد على النفط.", "ECONOMY"),
    ("عقدت القمة الخليجية الأوروبية اجتماعها السنوي في بروكسل لمناقشة التحديات العالمية مثل الأمن الغذائي والتغير المناخي وتعزيز التعاون بين الشرق والغرب.", "INTERNATIONAL"),
    ("بدأت أمانة المدينة بتنفيذ مشروع توسعة الطرق الداخلية بهدف تخفيف الازدحام المروري، كما تم الإعلان عن إنشاء ممرات مشاة ومواقف ذكية.", "LOCAL"),
    ("حثّ إمام المسجد خلال خطبة الجمعة على التمسك بالقيم الإسلامية ونشر التسامح بين أفراد المجتمع، مشيرًا إلى أهمية الصدق والأمانة في التعاملات اليومية.", "RELIGION"),
    ("تمكن المنتخب الوطني من الفوز على نظيره الإيراني في مباراة مثيرة انتهت بنتيجة ٣-٢، ليضمن التأهل إلى نهائي كأس آسيا وسط فرحة جماهيرية عارمة.", "SPORTS")
]

print("🔍 INTERACTIVE MODEL COMPARISON DEMO")
print("=" * 80)

for i, (text, expected) in enumerate(test_samples, 1):
    print(f"\n📝 Sample {i} (Expected: {expected}):")
    print(f"Text: {text[:100]}...")
    print("\n🔮 Predictions:")
    
    # Traditional method prediction
    traditional_pred = predict_category(text)
    print(f"   Traditional (SVM): {traditional_pred} {'✅' if traditional_pred == expected else '❌'}")
    
    # BiLSTM prediction
    bilstm_pred = lstm_predict(text, modelBiLSTM, vocab)
    print(f"   BiLSTM: {bilstm_pred} {'✅' if bilstm_pred == expected else '❌'}")
    
    # AraBERT prediction  
    arabert_pred = arabert_predict(text, model, tokenizer)
    print(f"   AraBERT: {arabert_pred} {'✅' if arabert_pred == expected else '❌'}")
    
    print("-" * 60)


### Task 2: Arabic Text Summarization

### 2.1 Traditional Approach: Seq2Seq with LSTM

In [None]:
# Load and preprocess Arabic summarization dataset
df_sum = pd.read_excel("Text summarization dataset.xlsx")
df_sum = df_sum.iloc[1:].reset_index(drop=True)
df_sum.columns = ['summary', 'text']
df_sum = df_sum.dropna(subset=['summary', 'text'])
df_sum.head()
df_sum.shape


In [None]:
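import re

def normalize_arabic(text):
    # Treat missing or non-string cells as empty text
    if pd.isna(text) or not isinstance(text, str):
        return ""
    text = re.sub(r'[إأآا]', 'ا', text)  # unify alef variants
    text = re.sub(r'ى', 'ي', text)  # alef maqsura -> ya
    text = re.sub(r'ؤ', 'و', text)  # waw with hamza -> waw
    text = re.sub(r'ئ', 'ي', text)  # ya with hamza -> ya
    text = re.sub(r'ة', 'ه', text)  # ta marbuta -> ha
    text = re.sub(r'[\u064B-\u0652]', '', text)  # remove Arabic diacritics
    text = re.sub(r'[^\u0600-\u06FF\s]', '', text)  # drop everything outside the Arabic block (Latin letters, ASCII digits, symbols)
    text = re.sub(r'\d+', '', text)  # drop remaining digits, i.e. Arabic-Indic digits inside the Arabic block
    text = re.sub(r'\s+', ' ', text).strip()  # collapse runs of whitespace
    return text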

# Apply normalization
df_sum['text'] = df_sum['text'].apply(normalize_arabic)
df_sum['summary'] = df_sum['summary'].apply(normalize_arabic)
print(f"After preprocessing: {df_sum.shape}")
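
To see what the normalization does to a concrete string, the sketch below applies the same substitutions, in the same order, to a short mixed sample. `normalize_sketch` is a condensed stand-in for normalize_arabic with the pandas null check left out, and the sample text is made up.

```python
import re

REPLACEMENTS = [
    (r'[إأآا]', 'ا'), (r'ى', 'ي'), (r'ؤ', 'و'), (r'ئ', 'ي'), (r'ة', 'ه'),
    (r'[\u064B-\u0652]', ''),       # diacritics
    (r'[^\u0600-\u06FF\s]', ''),    # anything outside the Arabic block
    (r'\d+', ''),                   # digits, including Arabic-Indic ones
]

def normalize_sketch(text):
    """Apply the substitutions in order, then collapse whitespace."""
    for pattern, repl in REPLACEMENTS:
        text = re.sub(pattern, repl, text)
    return re.sub(r'\s+', ' ', text).strip()

sample = "أَهْلاً 123 مدرسة ٢٠٢٤ AI"
print(normalize_sketch(sample))
```

Diacritics, Latin letters, and both kinds of digits disappear, while hamza-bearing alef and ta marbuta are mapped to their base letters. Arabic punctuation such as the comma (U+060C) lies inside the Arabic block and is therefore kept.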


In [15]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Prepare sequences
def prepare_sequences(df_sum, max_text_len=128, max_summary_len=32):
    texts = df_sum['text']
    summaries = df_sum['summary']
    all_texts = list(texts) + list(summaries)
    tokenizer = Tokenizer(filters='', oov_token=None)
    tokenizer.fit_on_texts(all_texts)
    
    # Add special tokens
    tokenizer.word_index['<sos>'] = len(tokenizer.word_index) + 1
    tokenizer.word_index['<eos>'] = len(tokenizer.word_index) + 1
    
    # Convert texts to sequences
    text_sequences = tokenizer.texts_to_sequences(texts)
    summary_sequences = tokenizer.texts_to_sequences(summaries)
    
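    # Add <sos> and <eos> to summaries. Each summary is cut to max_summary_len - 2 first,
    # so neither marker is lost when pad_sequences enforces maxlen (its default truncates from the front)
    sos, eos = tokenizer.word_index['<sos>'], tokenizer.word_index['<eos>']
    summary_sequences = [[sos] + seq[:max_summary_len - 2] + [eos]
                        for seq in summary_sequences]
    
    # Pad sequences; long articles are truncated at the end so their opening sentences are kept
    text_padded = pad_sequences(text_sequences, maxlen=max_text_len, padding='post', truncating='post')
    summary_padded = pad_sequences(summary_sequences, maxlen=max_summary_len, padding='post', truncating='post')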
    
    return tokenizer, text_padded, summary_padded
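
The framing of each summary as `<sos> ... <eos>` followed by padding can be shown without Keras. In this sketch `wrap_summary` is a hypothetical helper and the token IDs are made up. It reserves two positions for the markers before truncating, so a long summary still begins with `<sos>` and ends with `<eos>`.

```python
def wrap_summary(ids, sos_id, eos_id, max_len, pad_id=0):
    """Frame a summary as <sos> ... <eos>, truncated and padded to `max_len`."""
    body = ids[:max_len - 2]                  # leave room for both markers
    framed = [sos_id] + body + [eos_id]
    return framed + [pad_id] * (max_len - len(framed))

short = wrap_summary([7, 8, 9], sos_id=1, eos_id=2, max_len=6)
long = wrap_summary(list(range(10, 20)), sos_id=1, eos_id=2, max_len=6)
print(short)
print(long)
```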


In [None]:
class SummarizationDataset(Dataset):
    def __init__(self, texts, summaries):
        self.texts = texts          # keep as a list or NumPy array
        self.summaries = summaries

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return (
            torch.tensor(self.texts[idx], dtype=torch.long),
            torch.tensor(self.summaries[idx], dtype=torch.long),
        )

#### Traditional Seq2Seq Model Architecture


In [None]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
    def forward(self, x):
        embedded = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, (hidden, cell)

class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
    def forward(self, x, hidden, cell):
        embedded = self.embedding(x)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc(output)
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.encoder = Encoder(vocab_size, embedding_dim, hidden_dim)
        self.decoder = Decoder(vocab_size, embedding_dim, hidden_dim)
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.fc.out_features
        # Position 0 (<sos>) is never predicted, so outputs[:, 0] stays all zeros
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=src.device)
        _, (hidden, cell) = self.encoder(src)
        decoder_input = trg[:, 0].unsqueeze(1)  # every sequence starts with <sos>
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[:, t] = output.squeeze(1)
            # One coin flip per step, shared by the whole batch: feed the reference token or the model's own prediction
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(2)
            decoder_input = trg[:, t].unsqueeze(1) if teacher_force else top1
        return outputs
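
The decoder loop in Seq2Seq mixes two input sources: the reference token (teacher forcing) and the model's own previous prediction. The toy sketch below isolates that choice. `decode_with_teacher_forcing` and the `+ 1` step function are assumptions for illustration and do not come from the notebook.

```python
import random

def decode_with_teacher_forcing(step_fn, target, teacher_forcing_ratio, rng):
    """Run a decoder over `target`, choosing each next input by coin flip."""
    outputs = []
    current = target[0]                       # first token plays the role of <sos>
    for t in range(1, len(target)):
        predicted = step_fn(current)
        outputs.append(predicted)
        use_truth = rng.random() < teacher_forcing_ratio
        current = target[t] if use_truth else predicted
    return outputs

def toy_step(token):
    return token + 1                          # stand-in for the decoder

target = [0, 5, 9, 2]
rng = random.Random(0)
forced = decode_with_teacher_forcing(toy_step, target, 1.0, rng)
free = decode_with_teacher_forcing(toy_step, target, 0.0, rng)
print(forced, free)
```

With a ratio of 1.0 every step is conditioned on the reference, and with 0.0 the decoder runs on its own outputs, which is the condition it faces at inference time.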

Evaluation function

In [7]:
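from rouge_score import rouge_scorer

class WhitespaceTokenizer:
    # rouge_score's default tokenizer keeps only [a-z0-9], which would erase Arabic text
    def tokenize(self, text):
        return text.split()

def ids_to_text(ids, idx2word, eos_idx):
    # Convert one ID sequence to text, stopping at <eos> and skipping padding and <sos>
    words = []
    for i in ids:
        if i == eos_idx:
            break
        word = idx2word.get(i)
        if word and word != '<sos>':
            words.append(word)
    return ' '.join(words)

def evaluate_seq2seq_rouge(model, loader, tokenizer, device, max_len=128):
    model.eval()
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=False,
                                      tokenizer=WhitespaceTokenizer())
    totals = {'rouge1': 0.0, 'rouge2': 0.0, 'rougeL': 0.0}
    n = 0
    # The Keras Tokenizer has no batch_decode, so decode through its word_index
    idx2word = {v: k for k, v in tokenizer.word_index.items()}
    eos_idx = tokenizer.word_index['<eos>']

    with torch.no_grad():
        for src, trg in loader:                       # src, trg: (B, T)
            src, trg = src.to(device), trg.to(device)

            # Generate the summary without teacher forcing: with ratio 0 the model only
            # reads trg[:, 0] (<sos>) and the target length, never the reference tokens
            out = model(src, trg, teacher_forcing_ratio=0.)   # (B, T, V)
            gen_ids = out[:, 1:].argmax(-1)                  # (B, T-1); position 0 is never filled

            # IDs -> text
            preds = [ids_to_text(seq, idx2word, eos_idx) for seq in gen_ids.tolist()]
            refs  = [ids_to_text(seq, idx2word, eos_idx) for seq in trg.tolist()]

            # ROUGE for each example
            for p, r in zip(preds, refs):
                s = scorer.score(r, p)
                for k in totals: totals[k] += s[k].fmeasure
                n += 1

    return {k: v / max(n, 1) for k, v in totals.items()}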
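
ROUGE-1 is unigram overlap between a generated summary and its reference, reported here as an F-measure. A dependency-free version is sketched below as a cross-check on the library scores. `rouge1_f` is an illustrative helper, not the rouge_score implementation, and it splits on whitespace. That matters for Arabic because, to my knowledge, rouge_score's default tokenizer keeps only lowercase ASCII letters and digits.

```python
from collections import Counter

def rouge1_f(reference, prediction):
    """ROUGE-1 F-measure over whitespace tokens (clipped unigram overlap)."""
    ref_tokens, pred_tokens = reference.split(), prediction.split()
    if not ref_tokens or not pred_tokens:
        return 0.0
    overlap = sum((Counter(ref_tokens) & Counter(pred_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)

reference = "التغذيه السليمه اساس الصحه"
prediction = "التغذيه اساس الحياه"
score = rouge1_f(reference, prediction)
print(f"ROUGE-1 F: {score:.4f}")
```

Two of the three predicted words appear in the four-word reference, so precision is 2/3, recall is 1/2, and the F-measure is 4/7.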


Training loop

In [1]:
def train_seq2seq_model(model, train_loader, optimizer, criterion, scaler, epoch):
    model.train()
    step = 0

    running_loss = 0.0
    
    for src, trg in train_loader:
        src, trg = src.to(device), trg.to(device)
        
        optimizer.zero_grad()
        with torch.autocast(device_type='cuda', dtype=torch.float16):
            output = model(src, trg)
            # Reshape output and target for loss calculation
            output = output[:, 1:].reshape(-1, output.shape[-1])
            trg = trg[:, 1:].reshape(-1)
            
            loss = criterion(output, trg)
        
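        scaler.scale(loss).backward()
        # Unscale the gradients first so clipping is applied to their true magnitudes
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()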
        
        running_loss = running_loss * 0.90 + loss.item() * 0.10 if running_loss > 0 else loss.item()
        step += 1

        print(f"step {step:4d} | loss: {running_loss:.6f}", end='\r')

In [2]:
# Seq2Seq hyperparameters
seq2seq_max_text_len = 128
seq2seq_max_summary_len = 32
seq2seq_batch_size = 128
seq2seq_epochs = 10
seq2seq_learning_rate = 1e-4
seq2seq_embedding_dim = 256
seq2seq_hidden_dim = 512

In [None]:
tokenizer, text_padded, summary_padded = prepare_sequences(df_sum, max_text_len=seq2seq_max_text_len, max_summary_len=seq2seq_max_summary_len)
vocab_size = len(tokenizer.word_index) + 1
print(f"Vocabulary size: {vocab_size}")

In [None]:
X_train, X_devtest, y_train, y_devtest = train_test_split(
    text_padded, summary_padded, test_size=0.2, random_state=42
)

X_dev, X_test, y_dev, y_test = train_test_split(
    X_devtest, y_devtest, test_size=0.5, random_state=42
)

seq2seq_train_dataset = SummarizationDataset(X_train, y_train)
seq2seq_dev_dataset = SummarizationDataset(X_dev, y_dev)
seq2seq_test_dataset = SummarizationDataset(X_test, y_test)

seq2seq_train_loader = DataLoader(seq2seq_train_dataset, batch_size=seq2seq_batch_size, shuffle=True)
seq2seq_dev_loader = DataLoader(seq2seq_dev_dataset, batch_size=seq2seq_batch_size, shuffle=False)
seq2seq_test_loader = DataLoader(seq2seq_test_dataset, batch_size=seq2seq_batch_size, shuffle=False)

# stats
print(f"Seq2Seq Train dataset size: {len(seq2seq_train_dataset)}")
print(f"Seq2Seq Dev dataset size: {len(seq2seq_dev_dataset)}")
print(f"Seq2Seq Test dataset size: {len(seq2seq_test_dataset)}")

In [None]:
from torch.amp import GradScaler

modelSeq2Seq = Seq2Seq(
    vocab_size=vocab_size, 
    embedding_dim=seq2seq_embedding_dim, 
    hidden_dim=seq2seq_hidden_dim
    ).to(device)

# Training setup
seq2seq_criterion = nn.CrossEntropyLoss(ignore_index=0, label_smoothing=0.1)
seq2seq_optimizer = optim.Adam(modelSeq2Seq.parameters(), lr=seq2seq_learning_rate, weight_decay=1e-5)
seq2seq_scaler = GradScaler()

In [None]:
for epoch in range(seq2seq_epochs):
    print(f"Epoch {epoch+1}:")
    train_seq2seq_model(modelSeq2Seq, seq2seq_train_loader, seq2seq_optimizer, seq2seq_criterion, seq2seq_scaler, epoch)
    rouge = evaluate_seq2seq_rouge(modelSeq2Seq, seq2seq_dev_loader, tokenizer, device)
    print(f"ROUGE-1: {rouge['rouge1']:.4f} | "
          f"ROUGE-2: {rouge['rouge2']:.4f} | "
          f"ROUGE-L: {rouge['rougeL']:.4f}")
torch.save(modelSeq2Seq.state_dict(), "arabic_summarizer.pth")

### 2.2 Modern Approach: Transformer-Based (AraBART)

### 2.3 Evaluation of both approaches

Loading the two models

In [None]:
modelSeq2Seq.load_state_dict(torch.load("arabic_summarizer.pth", map_location=device))
# modelTfSeq2Seq = AutoModel ...

In [None]:
# Evaluation of the traditional approach on the held-out test set
rouge = evaluate_seq2seq_rouge(modelSeq2Seq, seq2seq_test_loader, tokenizer, device)
print(f"ROUGE-1: {rouge['rouge1']:.4f} | "
      f"ROUGE-2: {rouge['rouge2']:.4f} | "
      f"ROUGE-L: {rouge['rougeL']:.4f}")

### 2.4 Summarization Functions

In [9]:
test_text = """
تعتبر التغذية السليمة أساس الصحة الجيدة. يجب أن يحتوي النظام الغذائي اليومي على مجموعة متنوعة من الأطعمة المغذية. الخضروات والفواكه الطازجة توفر الفيتامينات والمعادن الضرورية للجسم. البروتينات الموجودة في اللحوم والأسماك والبقوليات تساعد في بناء العضلات وإصلاح الأنسجة.

من المهم تناول وجبات منتظمة وتجنب الوجبات السريعة الغنية بالدهون والسكريات. شرب الماء بكميات كافية يساعد في الحفاظ على رطوبة الجسم وتحسين عملية الهضم. يجب أيضاً التقليل من المشروبات الغازية والعصائر المحلاة.

تناول وجبة الإفطار يعتبر من أهم العادات الصحية. فهي تمد الجسم بالطاقة اللازمة لبدء اليوم بنشاط. من المهم أيضاً تناول وجبات خفيفة صحية بين الوجبات الرئيسية للحفاظ على مستوى الطاقة في الجسم.
"""

In [None]:
def generate_summary(model, tokenizer, text, max_length=128):
    model.eval()
    text = normalize_arabic(text)
    text_seq = tokenizer.texts_to_sequences([text])[0]
    text_padded = pad_sequences([text_seq], maxlen=seq2seq_max_text_len, padding='post')
    text_tensor = torch.tensor(text_padded, dtype=torch.long).to(device)
    sos_idx = tokenizer.word_index.get('<sos>', 2)
    eos_idx = tokenizer.word_index.get('<eos>', 3)
    decoder_input = torch.tensor([[sos_idx]], dtype=torch.long).to(device)
    summary = []
    # Keep both encoding and greedy decoding inside no_grad so no autograd graph is built
    with torch.no_grad():
        _, (hidden, cell) = model.encoder(text_tensor)
        for _ in range(max_length):
            output, hidden, cell = model.decoder(decoder_input, hidden, cell)
            predicted = output.argmax(2)
            pred_idx = predicted.item()
            if pred_idx == eos_idx:
                break
            summary.append(pred_idx)
            decoder_input = predicted
    idx2word = {v: k for k, v in tokenizer.word_index.items()}
    summary_words = [idx2word.get(idx, '') for idx in summary]
    return ' '.join(summary_words)
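
generate_summary uses greedy decoding: at each step it takes the most likely token, feeds it back as the next input, and stops at `<eos>` or after max_length steps. The sketch below reduces that control flow to plain Python. `greedy_decode`, `toy_step`, and all token IDs are hypothetical stand-ins for the decoder call and the vocabulary.

```python
def greedy_decode(step_fn, state, sos_id, eos_id, max_length):
    """Feed each prediction back as the next input until <eos> or max_length."""
    token, generated = sos_id, []
    for _ in range(max_length):
        token, state = step_fn(token, state)
        if token == eos_id:                   # stop without emitting <eos>
            break
        generated.append(token)
    return generated

# Toy step function (an assumption for illustration): counts up, then emits <eos>.
def toy_step(token, state):
    nxt = state + 1
    return (99 if nxt > 3 else nxt), nxt

full = greedy_decode(toy_step, state=0, sos_id=-1, eos_id=99, max_length=10)
capped = greedy_decode(toy_step, state=0, sos_id=-1, eos_id=99, max_length=2)
print(full, capped)
```

The length cap matters in practice because a weakly trained decoder may never emit `<eos>`.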

### Instructions for Deployment and Usage

**To use the trained models independently:**

1. **Load the saved models:**
   ```python
   # For traditional models  
   svm_model = joblib.load('svm_model.pkl')
   tfidf_vectorizer = joblib.load('tfidf_vectorizer.pkl')
   
   # For BiLSTM (first build bilstm_model = BiLSTMClassifier(...) with the training hyperparameters)
   bilstm_model.load_state_dict(torch.load('bilstm_best_model.pth'))
   
   # For AraBERT
   arabert_model = AutoModelForSequenceClassification.from_pretrained('results/checkpoint-288-best')
   ```

2. **Use the prediction functions:**
   ```python
   # Traditional
   result = predict_category("نص عربي")
   
   # BiLSTM  
   result = lstm_predict("نص عربي", bilstm_model, vocab)
   
   # AraBERT
   result = arabert_predict("نص عربي", arabert_model, tokenizer)
   ```

3. **For web deployment:** Create a Flask/FastAPI wrapper around these functions
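
For the web wrapper, one option is to keep request validation in a framework-independent function that a Flask or FastAPI route can call. The sketch below assumes a JSON body with a `text` field. The function name, the field name, the status codes, and the 5000-character limit are all illustrative choices, and `predict_fn` stands for any of the prediction functions above.

```python
def classify_request(payload, predict_fn, max_chars=5000):
    """Validate a JSON-like payload and return (status_code, response_body)."""
    text = payload.get("text") if isinstance(payload, dict) else None
    if not isinstance(text, str) or not text.strip():
        return 400, {"error": "field 'text' must be a non-empty string"}
    if len(text) > max_chars:
        return 413, {"error": f"text longer than {max_chars} characters"}
    return 200, {"category": predict_fn(text)}

# Stand-in predictor; in the notebook this would wrap e.g. predict_category.
def fake_predict(text):
    return "SPORTS"

print(classify_request({"text": "نص عربي"}, fake_predict))
print(classify_request({}, fake_predict))
```

Keeping the handler free of framework imports lets it be unit-tested without starting a server.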

**System Requirements:**
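- Python 3.8+, PyTorch 2.3+ (the notebook uses torch.compile and torch.amp.GradScaler), Transformers 4.41+ (for the eval_strategy argument), scikit-learn 1.1+, NLTK with Arabic stopwords
- For the summarization task: TensorFlow (Keras text preprocessing), rouge-score, and pandas with openpyxl to read the .xlsx dataset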
