## Part 0: Demonstration

This section loads and showcases the finalized trained models from this notebook, and lets you run them on sample sentences or other inputs to visualize the results.

In [None]:
!pip install datasets transformers evaluate arabert

import pandas as pd
import re
import joblib
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, get_linear_schedule_with_warmup
import numpy as np
import evaluate
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, r2_score
from sklearn.svm import LinearSVC
from sklearn.linear_model import LinearRegression
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from torch.optim import AdamW, lr_scheduler
from arabert.preprocess import ArabertPreprocessor
from sklearn.feature_extraction.text import TfidfVectorizer

# Report whether a GPU is available. Only query the device name when CUDA is
# present, because get_device_name(0) raises on a CPU-only runtime while the
# rest of the notebook is written to fall back to CPU.
print(torch.cuda.is_available())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
else:
    print("No CUDA device detected; running on CPU")

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

model_name = "aubmindlab/bert-base-arabertv02"
# Relative path: the training loop in this notebook saves "best_model.pt" into
# the working directory, and the joblib bundles are loaded relatively as well.
# A hard-coded "/content/..." path only resolves on a default Colab runtime.
model_path = "best_model.pt"
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load tokenizer (same checkpoint as used for training)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Recreate the model architecture with a 2-class classification head
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=2
)

# Load trained weights. weights_only=True restricts unpickling to tensors and
# plain containers, which is all a state_dict needs and avoids executing
# arbitrary pickled code from the checkpoint file.
state_dict = torch.load(model_path, map_location=device, weights_only=True)
model.load_state_dict(state_dict)

model.to(device)
model.eval()

def predict_arabert(text, max_len=128):
    """Classify ``text`` with the loaded AraBERT model.

    The text is tokenized (truncated / padded to ``max_len`` tokens), run
    through the global ``model`` without gradient tracking, and the class
    with the highest softmax probability is selected.

    Returns:
        A ``(label, confidence)`` tuple where ``label`` is "Positive" when the
        predicted class index is 1 and "Negative" otherwise, and
        ``confidence`` is the softmax probability of the predicted class.
    """
    encoded = tokenizer(
        text,
        truncation=True,
        padding="max_length",
        max_length=max_len,
        return_tensors="pt",
    )
    # Move only the tensors the model consumes onto the inference device.
    model_inputs = {
        key: encoded[key].to(device) for key in ("input_ids", "attention_mask")
    }

    with torch.no_grad():
        logits = model(**model_inputs).logits
        probs = torch.softmax(logits, dim=1)
        pred = torch.argmax(probs, dim=1).item()

    if pred == 1:
        label = "Positive"
    else:
        label = "Negative"

    return label, probs[0][pred].item()


In [None]:
example = "هذا رائع جدا"

# NOTE(review): the training cells fit the SVM vectorizer on text passed
# through arabert_normalize and the linear-regression vectorizer on text passed
# through normalize_arabic_light, while the raw sentence is used here because
# those helpers are defined later in the notebook — confirm whether the demo
# should apply the same normalization.

# SVM Model (trained on 0/1 labels, so class 1 is the positive class)
# joblib.load unpickles arbitrary objects: only load bundles produced by this notebook.
bundle = joblib.load("svm_tfidf_pipeline.joblib")

svm_model = bundle["model"]
tfidf_vectorizer = bundle["vectorizer"]

X = tfidf_vectorizer.transform([example])
print("SVM: ", "Positive" if svm_model.predict(X)[0] == 1 else "Negative")

# Linear Regression Model (trained on -1/+1 targets)
bundle = joblib.load("linear_regression_tfidf.joblib")

linear_regression = bundle["model"]
tfidf_vectorizer = bundle["vectorizer"]

X = tfidf_vectorizer.transform([example])
# The regression output is thresholded at 0, matching the decision rule used
# when the model was evaluated during training (score >= 0 -> positive).
print("Linear Regression: ", "Positive" if linear_regression.predict(X)[0] >= 0 else "Negative")

# AraBERT Model
label, confidence = predict_arabert(example)

print(f"AraBERT: {label} (confidence: {confidence:.3f})")

## Part 1: Dependencies and Imports

This section installs the necessary libraries and imports them for use in this colab notebook. It also defines high-level constants that will be used for hyperparameters in training the various models.

In [None]:
!pip install datasets transformers evaluate arabert

import pandas as pd
import re
import joblib
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, get_linear_schedule_with_warmup
import numpy as np
import evaluate
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, r2_score
from sklearn.svm import LinearSVC
from sklearn.linear_model import LinearRegression
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from torch.optim import AdamW, lr_scheduler
from arabert.preprocess import ArabertPreprocessor
from sklearn.feature_extraction.text import TfidfVectorizer

# Report whether a GPU is available. Only query the device name when CUDA is
# present, because get_device_name(0) raises on a CPU-only runtime while the
# rest of the notebook is written to fall back to CPU.
print(torch.cuda.is_available())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))
else:
    print("No CUDA device detected; running on CPU")

In [None]:
# Upper bound on AraBERT fine-tuning epochs; also used to size the warm-up schedule.
max_epochs = 12
# Learning rate and weight decay passed to the AdamW optimizer.
learning_rate = 1.5e-5
weight_decay = 0.01
# `factor` and `patience` arguments passed to ReduceLROnPlateau.
scheduler_factor = 0.1
scheduler_patience = 1
# Early stopping: number of consecutive epochs without a new best validation
# loss after which training is stopped.
patience = 3

## Part 2: Load and Process Dataset

This section will load the dataset and apply various processing and normalization techniques for the dataset to be ready for use in the ML / NLP models.

In [None]:
# Each TSV file is read as (label, text) rows. The frames are named after the
# file they come from: the label column of both files is mapped below, so they
# are dataset splits rather than per-class sample sets.
test_split = pd.read_csv("test.tsv", sep='\t', header=None, names=['label', 'text'])
train_split = pd.read_csv("train.tsv", sep='\t', header=None, names=['label', 'text'])

# Both files are pooled (test rows first, as before); the notebook re-splits later.
df = pd.concat([test_split, train_split], ignore_index=True)
df['label'] = df['label'].map({'neg': 0, 'pos': 1})

# .map() turns any label other than 'neg'/'pos' into NaN, which would silently
# corrupt the stratified splits and the training targets, so fail fast instead.
unmapped_count = int(df['label'].isna().sum())
if unmapped_count:
    raise ValueError(f"{unmapped_count} rows have a label other than 'neg' or 'pos'")

df.head()

In [None]:
# Total number of rows after concatenating the two TSV files.
len(df)

In [None]:
# Checkpoint name; also passed to ArabertPreprocessor as its model_name argument.
model_name = "aubmindlab/bert-base-arabertv02"
arabert_prep = ArabertPreprocessor(model_name=model_name)
# Matches a single Arabic diacritic (tashkeel) or the tatweel elongation mark.
# The characters are written as explicit code points: bare combining marks in
# source code are invisible, attach to the preceding character, and are easily
# corrupted by editors or Unicode normalization.
arabic_diacritics = re.compile(
    "["
    "\u0651"  # Shadda
    "\u064E"  # Fatha
    "\u064B"  # Tanwin Fath
    "\u064F"  # Damma
    "\u064C"  # Tanwin Damm
    "\u0650"  # Kasra
    "\u064D"  # Tanwin Kasr
    "\u0652"  # Sukun
    "\u0640"  # Tatwil/Kashida
    "]"
)

def normalize_arabic_light(text):
    """Apply a light, surface-level normalization to an Arabic string.

    Steps, in order: underscores become spaces, characters matched by
    ``arabic_diacritics`` are removed, the Alef variants with hamza / madda are
    replaced by a bare Alef, and runs of whitespace collapse to a single space
    with leading / trailing whitespace stripped.
    """
    underscore_free = text.replace('_', ' ')
    without_marks = arabic_diacritics.sub('', underscore_free)
    alef_unified = re.sub(r'[إأآ]', 'ا', without_marks)
    return re.sub(r'\s+', ' ', alef_unified).strip()

def arabert_normalize(text):
    """Run the light normalization, then the AraBERT preprocessor, on ``text``."""
    return arabert_prep.preprocess(normalize_arabic_light(text))

# Normalize every row once and keep the result next to the raw text.
df['text_normalized'] = df['text'].apply(arabert_normalize)

# Bare last expression: the notebook renders the frame as a table, which is
# easier to read than print() output and matches the earlier df.head() cell.
df[['text', 'text_normalized']].head()


In [None]:
# Print the first three rows before and after normalization for a visual check.
separator = "-" * 80
for row_label in range(3):
    before, after = df.loc[row_label, ['text', 'text_normalized']]
    print(f"Sample {row_label+1}")
    print("BEFORE :", before)
    print("AFTER  :", after)
    print(separator)

## Part 3: Feature Extraction (TF-IDF)

This section will generate the necessary features to train the classic ML models.

In [None]:
# Inputs are the normalized strings; targets are the 0/1 sentiment labels.
X = df['text_normalized'].tolist()
y = df['label'].tolist()

# Hold out 20% of the data, stratified by label ...
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, stratify=y, test_size=0.2, random_state=42
)
# ... and halve the held-out part into validation and test sets.
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, stratify=y_temp, test_size=0.5, random_state=42
)

# Character 3- to 5-gram TF-IDF with a capped vocabulary.
tfidf_vectorizer = TfidfVectorizer(
    analyzer="char", ngram_range=(3, 5),
    min_df=3, max_features=30000,
    sublinear_tf=True, lowercase=False,
)

# Fit the vocabulary on the training split only, then reuse it for the others.
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_val_tfidf, X_test_tfidf = (
    tfidf_vectorizer.transform(split) for split in (X_val, X_test)
)

## Part 4: Classic ML Models

This section will train the classic ML models: linear regression + SVM.

In [None]:
# Fit a class-balanced linear SVM on the TF-IDF training features.
svm_model = LinearSVC(max_iter=20000, class_weight="balanced", C=5.0)
svm_model.fit(X_train_tfidf, y_train)

# Validation: overall accuracy plus the per-class report.
val_preds = svm_model.predict(X_val_tfidf)
val_accuracy = accuracy_score(y_val, val_preds)
print("Validation Accuracy:", val_accuracy)
print(classification_report(y_val, val_preds, digits=4))

# Test: overall accuracy only.
test_preds = svm_model.predict(X_test_tfidf)
test_accuracy = accuracy_score(y_test, test_preds)
print("Test Accuracy:", test_accuracy)


In [None]:
# Persist the SVM together with the vectorizer it was trained with, so both
# can be restored as one bundle.
joblib.dump(
    dict(model=svm_model, vectorizer=tfidf_vectorizer),
    "svm_tfidf_pipeline.joblib",
)

In [None]:
# Store the lightly-normalized text in its own column. Writing it back into
# 'text_normalized' would overwrite the AraBERT-preprocessed text that the SVM
# feature cell reads, so re-running that cell afterwards would silently train
# on different inputs.
df['text_normalized_light'] = df['text'].apply(normalize_arabic_light)

X = df['text_normalized_light'].tolist()
# Regression targets: +1 for positive, -1 for negative, so 0 is the decision boundary.
y = [1 if label == 1 else -1 for label in df['label']]

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

# NOTE: this rebinds `tfidf_vectorizer` and `X_test_tfidf` from the SVM cells;
# the SVM must not be applied to these features.
tfidf_vectorizer = TfidfVectorizer(
    analyzer="char",
    ngram_range=(3, 5),
    max_features=30000,
    min_df=3,
    sublinear_tf=True,
    lowercase=False
)

X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_test_tfidf  = tfidf_vectorizer.transform(X_test)

model = LinearRegression()
model.fit(X_train_tfidf, y_train)

# Threshold the continuous prediction at 0 to recover a -1/+1 class.
y_pred = model.predict(X_test_tfidf)
y_pred_class = [1 if p >= 0 else -1 for p in y_pred]

accuracy = accuracy_score(y_test, y_pred_class)
print(f"Accuracy: {accuracy:.4f}")

In [None]:
# Per-class report and overall accuracy for the thresholded linear-regression
# predictions. The SVM is deliberately not called here: X_test_tfidf now holds
# features from the linear-regression vectorizer, which the SVM was not trained on.
print(classification_report(y_test, y_pred_class, digits=4))

print("Test Accuracy:", accuracy_score(y_test, y_pred_class))

In [None]:
# Persist the linear-regression model together with the vectorizer it was
# trained with, so both can be restored as one bundle.
joblib.dump(
    dict(model=model, vectorizer=tfidf_vectorizer),
    "linear_regression_tfidf.joblib",
)

## Part 5: AraBERT Transformer-Based LLM / NLP Model

This section will focus on fine-tuning an AraBERT model and training an additional classification layer on top of the encoder to predict the two outcomes (positive / negative sentiment) of the given entry in the dataset.

In [None]:
# Initialize Model Tokenizer
model_name = "aubmindlab/bert-base-arabertv02"
tokenizer = AutoTokenizer.from_pretrained(model_name)

all_texts = df["text"].tolist()
all_labels = df["label"].tolist()

# Split into train / validation / test. Both splits are stratified so every
# subset keeps the overall class balance, consistent with the TF-IDF splits
# used for the classic models (previously only the second split was stratified).
train_texts, test_texts, train_labels, test_labels = train_test_split(
    all_texts,
    all_labels,
    test_size=0.2,
    random_state=42,
    stratify=all_labels
)

train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_texts,
    train_labels,
    test_size=0.2,
    random_state=42,
    stratify=train_labels
)
# train_test_split returns plain lists for list inputs, so no further
# conversion is needed before building the datasets.

# Convert to Pytorch Dataset
class ArabicSentimentDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on demand.

    Each item is a dict with ``input_ids`` and ``attention_mask`` (the
    tokenizer output with its leading batch dimension squeezed away) and
    ``labels`` (the matching label as a ``torch.long`` tensor).
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # Tokenize lazily; every example is truncated / padded to max_len.
        encoded = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding="max_length",
            max_length=self.max_len,
            return_tensors="pt",
        )
        # The tokenizer returns tensors with a batch dimension of size 1; drop it.
        item = {
            key: encoded[key].squeeze(0)
            for key in ("input_ids", "attention_mask")
        }
        item["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

# Wrap each split in a dataset that tokenizes with the shared tokenizer.
train_dataset, val_dataset, test_dataset = (
    ArabicSentimentDataset(split_texts, split_labels, tokenizer)
    for split_texts, split_labels in (
        (train_texts, train_labels),
        (val_texts, val_labels),
        (test_texts, test_labels),
    )
)

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=32)
    for split_dataset in (val_dataset, test_dataset)
)

In [None]:
# Pretrained AraBERT encoder with a 2-class sequence-classification head.
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Freeze everything under model.bert, then re-enable gradients for the last
# six encoder layers only. Parameters outside model.bert are left untouched.
model.bert.requires_grad_(False)
for encoder_layer in model.bert.encoder.layer[-6:]:
    encoder_layer.requires_grad_(True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
# Optimizer: AdamW over all model parameters, using the learning rate and
# weight decay defined in the hyperparameter cell.
optimizer = AdamW(
    model.parameters(),
    lr=learning_rate,
    weight_decay=weight_decay
)

# Loss function.
# NOTE(review): train() reads the loss from `outputs.loss` and never references
# `loss_fn` — confirm whether this object is still needed.
loss_fn = torch.nn.CrossEntropyLoss()

# Scheduler sizing: one scheduler step per training batch across all epochs,
# with the first 10% of those steps used for warm-up.
total_steps = len(train_loader) * max_epochs
warmup_steps = int(0.1 * total_steps)

# Warm-up scheduler (stepped once per batch inside train()).
warmup_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
)

# ReduceLROnPlateau (stepped once per epoch with the validation loss).
# NOTE(review): both schedulers drive the same optimizer; the per-batch
# schedule presumably recomputes the learning rate on every step and would then
# override any reduction applied here — confirm the intended interaction.
plateau_scheduler = lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=scheduler_factor,
    patience=scheduler_patience,
)

def train(checkpoint_path="best_model.pt"):
    """Fine-tune the global ``model`` with early stopping and restore the best weights.

    Each epoch runs one pass over ``train_loader`` (gradient clipping at norm
    1.0, one optimizer step and one warm-up scheduler step per batch) followed
    by a pass over ``val_loader`` to compute the mean validation loss and the
    accuracy. Whenever the validation loss improves, the weights are saved to
    ``checkpoint_path``; after ``patience`` consecutive epochs without
    improvement, training stops.

    When training ends, the best checkpoint is loaded back into ``model``.
    Without this step the in-memory model would hold the weights of the last
    epoch, so the subsequent test evaluation would not measure the checkpoint
    that early stopping selected.

    Args:
        checkpoint_path: File the best weights are written to and restored from.
    """
    best_val_loss = float("inf")
    patience_counter = 0
    checkpoint_saved = False

    for epoch in range(max_epochs):
        model.train()
        train_loss = 0

        # Training
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{max_epochs} [Train]"):
            optimizer.zero_grad()

            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            # Passing labels makes the model compute the loss itself.
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )

            loss = outputs.loss
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            warmup_scheduler.step()
            optimizer.zero_grad()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch in tqdm(val_loader, desc=f"Epoch {epoch+1}/{max_epochs} [Val]"):
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)

                outputs = model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    labels=labels
                )

                val_loss += outputs.loss.item()
                preds = torch.argmax(outputs.logits, dim=1)

                correct += (preds == labels).sum().item()
                total += labels.size(0)

        val_loss /= len(val_loader)
        val_acc = correct / total

        print(
            f"\nEpoch {epoch+1}: "
            f"Train Loss = {train_loss:.4f} | "
            f"Val Loss = {val_loss:.4f} | "
            f"Val Acc = {val_acc:.4f}"
        )

        # Early Stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("\n⏹ Early stopping triggered")
                break

        plateau_scheduler.step(val_loss)

    # Restore the best weights, but only if this run actually wrote a
    # checkpoint (e.g. not when every validation loss was NaN).
    if checkpoint_saved:
        best_state = torch.load(checkpoint_path, map_location=device, weights_only=True)
        model.load_state_dict(best_state)

train()

In [None]:
def eval():
    """Print the accuracy of the global ``model`` over ``test_loader``.

    The model is switched to evaluation mode and run without gradient
    tracking; a prediction counts as correct when the arg-max class of the
    logits equals the label.
    """
    model.eval()
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            logits = model(input_ids, attention_mask=attention_mask).logits
            matches = logits.argmax(dim=1) == labels

            n_correct += matches.sum().item()
            n_seen += labels.size(0)

    print("Accuracy:", n_correct / n_seen)

eval()