<a href="https://colab.research.google.com/github/ebrahimhalaby/ebrahimsal/blob/master/Arabic_Commands_BERT_Benchmark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🧪 Arabic Commands BERT Benchmark (Colab-Ready)
This notebook benchmarks multiple Arabic/Multilingual BERT models on your **commands dataset** (`text,intent`) with rigorous evaluation for research:
- K-Fold CV over multiple random seeds.
- Metrics: Accuracy, F1 (macro/micro), per-class PR/F1, confusion matrices.
- Optional Arabic normalization ablation & class weighting.
- Exports full results (CSVs, figures) to a ZIP and (optionally) Google Drive.

> **Tip:** Start with a light configuration (fewer models/folds) to validate, then switch to the full configuration.

In [1]:
# ===============================
# 0) Install dependencies
# ===============================
!pip install -q transformers==4.43.3 datasets==2.21.0 accelerate==0.33.0 \
evaluate==0.4.2 scikit-learn==1.5.2 pandas==2.2.2 numpy==1.26.4 \
onnx onnxruntime matplotlib==3.8.4 scipy==1.12.0


[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
opencv-python-headless 4.12.0.88 requires numpy<2.3.0,>=2; python_version >= "3.9", but you have numpy 1.26.4 which is incompatible.
opencv-contrib-python 4.12.0.88 requires numpy<2.3.0,>=2; python_version >= "3.9", but you have numpy 1.26.4 which is incompatible.
tsfresh 0.21.1 requires scipy>=1.14.0; python_version >= "3.10", but you have scipy 1.12.0 which is incompatible.
thinc 8.3.6 requires numpy<3.0.0,>=2.0.0, but you have numpy 1.26.4 which is incompatible.
opencv-python 4.12.0.88 requires numpy<2.3.0,>=2; python_version >= "3.9", but you have numpy 1.26.4 which is incompatible.
umap-learn 0.5.9.post2 requires scikit-learn>=1.6, but you have scikit-learn 1.5.2 which is incompatible.[0m[31m
[0m

In [2]:
# ===============================
# 1) Mount Google Drive (optional)
# ===============================
USE_DRIVE = True   # <- set False if you don't want to use Drive

if USE_DRIVE:
    from google.colab import drive
    drive.mount('/content/drive')

DATA_FROM_DRIVE = True  # If True, the CSV is read from Drive path; else we'll use manual upload dialog.
DRIVE_CSV_PATH = "/content/drive/MyDrive/Colab Notebooks/arabic_game_commands_10k.csv"  # <- change if using Drive


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:

if DATA_FROM_DRIVE:
    CSV_PATH = DRIVE_CSV_PATH
else:
    # Manual upload (dialog)
    from google.colab import files
    up = files.upload()  # choose your CSV
    CSV_PATH = list(up.keys())[0]

print("Using CSV_PATH:", CSV_PATH)
df = pd.read_csv(CSV_PATH)
assert {"text","intent"}.issubset(df.columns), "CSV must have 'text' and 'intent' columns"
df.head(5)


Using CSV_PATH: /content/drive/MyDrive/Colab Notebooks/arabic_game_commands_10k.csv


NameError: name 'pd' is not defined

In [None]:
# ===============================
# 3) Benchmark configuration
# ===============================
import os, re, json, numpy as np, matplotlib.pyplot as plt
from pathlib import Path

OUTPUT_ROOT = "/content/bert_ar_commands_benchmark"
os.makedirs(OUTPUT_ROOT, exist_ok=True)

# --- Toggle between LIGHT and FULL runs ---
LIGHT_RUN = True  # True = faster sanity-check; False = full research run

if LIGHT_RUN:
    MODELS = [
        "aubmindlab/bert-base-arabertv2",
        "UBC-NLP/MARBERT",
        "xlm-roberta-base",
    ]
    KFOLDS = 3
    SEEDS = [42]
    MAX_EPOCHS = 3
else:
    MODELS = [
        "aubmindlab/bert-base-arabertv2",
        "asafaya/bert-base-arabic",
        "UBC-NLP/ARBERT",
        "UBC-NLP/MARBERT",
        "bert-base-multilingual-cased",
        "xlm-roberta-base",
    ]
    KFOLDS = 5
    SEEDS = [42, 77, 123]
    MAX_EPOCHS = 6

LR = 2e-5
BATCH = 32
PATIENCE = 2                     # Early stopping
USE_CLASS_WEIGHTS = True
USE_NORMALIZE = True

print("Models:", MODELS)
print("KFOLDS:", KFOLDS, "| SEEDS:", SEEDS, "| EPOCHS:", MAX_EPOCHS)


In [None]:
# ===============================
# 4) Preprocess (Arabic normalization optional) + labels
# ===============================
import numpy as np
df = df.dropna(subset=["text","intent"]).reset_index(drop=True)
df["text"] = df["text"].astype(str).str.strip()
df["intent"] = df["intent"].astype(str).str.strip()

ARABIC_INDIC = "٠١٢٣٤٥٦٧٨٩"
WESTERN = "0123456789"
DIGIT_MAP = str.maketrans(ARABIC_INDIC, WESTERN)

def normalize_ar(text: str) -> str:
    t = text.strip()
    t = re.sub(r"[\u0640]+", "", t)             # Tatweel
    t = t.translate(DIGIT_MAP)                  # ١٢٣ -> 123
    t = re.sub(r"[ـ]+", "", t)                  # Madd
    t = re.sub(r"\s+", " ", t)                  # Spaces
    t = re.sub("[إأآا]", "ا", t)
    t = re.sub("ى", "ي", t)
    t = re.sub("ؤ", "و", t)
    t = re.sub("ئ", "ي", t)
    t = t.replace("ة", "ه")
    t = re.sub(r"[\u0610-\u061A\u064B-\u065F\u0670\u06D6-\u06ED]", "", t)  # Diacritics
    t = re.sub(r"(.)\1{2,}", r"\1\1", t)       # Limit long char repeats
    return t

df["text_norm"] = df["text"].map(normalize_ar) if USE_NORMALIZE else df["text"]

labels = sorted(df["intent"].unique().tolist())
label2id = {l:i for i,l in enumerate(labels)}
id2label = {i:l for l,i in label2id.items()}
df["label"] = df["intent"].map(label2id)

print("Num samples:", len(df), "| Num classes:", len(labels))
print("Classes:", labels)


In [None]:
# ===============================
# 5) Training & Cross-Validation
# ===============================
import warnings, math
warnings.filterwarnings("ignore")

import torch
from datasets import Dataset
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.utils.class_weight import compute_class_weight
import evaluate

from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
                          DataCollatorWithPadding, TrainingArguments, Trainer,
                          EarlyStoppingCallback, set_seed)

accuracy = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")

def compute_metrics_builder():
    def compute_metrics(eval_pred):
        logits, labels_np = eval_pred
        preds = np.argmax(logits, axis=-1)
        acc = accuracy.compute(predictions=preds, references=labels_np)["accuracy"]
        f1_macro = f1_metric.compute(predictions=preds, references=labels_np, average="macro")["f1"]
        f1_micro = f1_metric.compute(predictions=preds, references=labels_np, average="micro")["f1"]
        return {"accuracy": acc, "f1_macro": f1_macro, "f1_micro": f1_micro}
    return compute_metrics

USE_FP16 = torch.cuda.is_available()

def train_eval_one_fold(model_name, seed, tr_idx, va_idx, outdir):
    set_seed(seed)
    tok = AutoTokenizer.from_pretrained(model_name)

    # HF datasets
    train_ds = Dataset.from_pandas(df.iloc[tr_idx][["text_norm","label"]].rename(columns={"text_norm":"text"}))
    val_ds   = Dataset.from_pandas(df.iloc[va_idx][["text_norm","label"]].rename(columns={"text_norm":"text"}))

    def tok_fn(batch): return tok(batch["text"], truncation=True)
    train_ds = train_ds.map(tok_fn, batched=True, remove_columns=["text"])
    val_ds   = val_ds.map(tok_fn, batched=True, remove_columns=["text"])
    data_collator = DataCollatorWithPadding(tokenizer=tok)

    loss_weights = None
    if USE_CLASS_WEIGHTS:
        y = df.iloc[tr_idx]["label"].values
        cw = compute_class_weight("balanced", classes=np.arange(len(labels)), y=y)
        loss_weights = torch.tensor(cw, dtype=torch.float)

    model = AutoModelForSequenceClassification.from_pretrained(
        model_name, num_labels=len(labels), id2label=id2label, label2id=label2id
    )

    def compute_loss(model, inputs, return_outputs=False):
        labels_t = inputs.get("labels")
        outputs = model(**{k:v for k,v in inputs.items() if k!="labels"})
        logits = outputs.logits
        loss_fct = torch.nn.CrossEntropyLoss(weight=(loss_weights.to(logits.device) if loss_weights is not None else None))
        loss = loss_fct(logits, labels_t)
        return (loss, outputs) if return_outputs else loss

    args = TrainingArguments(
        output_dir=outdir,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="f1_macro",
        greater_is_better=True,
        learning_rate=LR,
        per_device_train_batch_size=BATCH,
        per_device_eval_batch_size=BATCH,
        num_train_epochs=MAX_EPOCHS,
        warmup_ratio=0.1,
        weight_decay=0.01,
        fp16=USE_FP16,
        logging_steps=50,
        report_to="none",
        seed=seed
    )

    trainer = Trainer(
        model=model, args=args,
        train_dataset=train_ds, eval_dataset=val_ds,
        tokenizer=tok, data_collator=data_collator,
        compute_metrics=compute_metrics_builder(),
        callbacks=[EarlyStoppingCallback(early_stopping_patience=PATIENCE)]
    )
    trainer.compute_loss = compute_loss

    trainer.train()
    out = trainer.predict(val_ds)
    preds = np.argmax(out.predictions, axis=-1)
    y_true = np.array(val_ds["label"])

    acc = accuracy_score(y_true, preds)
    p, r, f1s, _ = precision_recall_fscore_support(y_true, preds, labels=np.arange(len(labels)))
    f1_macro = f1s.mean()
    f1_micro = precision_recall_fscore_support(y_true, preds, average="micro")[2]
    cm = confusion_matrix(y_true, preds, labels=np.arange(len(labels)))

    # Save confusion matrix fig
    import matplotlib.pyplot as plt
    plt.figure()
    plt.imshow(cm, interpolation='nearest')
    plt.title(f"CM {model_name} s{seed}")
    plt.colorbar()
    plt.xticks(range(len(labels)), labels, rotation=45, ha="right")
    plt.yticks(range(len(labels)), labels)
    plt.tight_layout()
    cm_path = os.path.join(outdir, "confusion_matrix.png")
    plt.savefig(cm_path); plt.close()

    # Save per-class report
    per_class = {labels[i]: {"precision": float(p[i]), "recall": float(r[i]), "f1": float(f1s[i])} for i in range(len(labels))}
    with open(os.path.join(outdir, "per_class.json"), "w", encoding="utf-8") as f:
        json.dump(per_class, f, ensure_ascii=False, indent=2)

    return {
        "metrics": {"accuracy": float(acc), "f1_macro": float(f1_macro), "f1_micro": float(f1_micro)},
        "val_true": y_true.tolist(),
        "val_pred": preds.tolist()
    }

all_rows = []
store_preds = {}

from datetime import datetime
RUN_TAG = datetime.now().strftime("%Y%m%d_%H%M%S")

for model_name in MODELS:
    for seed in SEEDS:
        skf = StratifiedKFold(n_splits=KFOLDS, shuffle=True, random_state=seed)
        for fold, (tr_idx, va_idx) in enumerate(skf.split(df["text_norm"], df["label"])):
            outdir = f"{OUTPUT_ROOT}/{RUN_TAG}/{model_name.replace('/','_')}/seed{seed}_fold{fold}"
            os.makedirs(outdir, exist_ok=True)
            print(f"\n=== {model_name} | seed={seed} | fold={fold+1}/{KFOLDS} ===")
            res = train_eval_one_fold(model_name, seed, tr_idx, va_idx, outdir)
            row = {"model": model_name, "seed": seed, "fold": fold}
            row.update(res["metrics"])
            all_rows.append(row)
            store_preds[f"{model_name}|seed{seed}|fold{fold}"] = res

import pandas as pd
cv_df = pd.DataFrame(all_rows)
cv_df.to_csv(f"{OUTPUT_ROOT}/{RUN_TAG}/cv_results.csv", index=False)
summary = cv_df.groupby("model")[["accuracy","f1_macro","f1_micro"]].agg(["mean","std"]).reset_index()
summary_path = f"{OUTPUT_ROOT}/{RUN_TAG}/cv_summary.csv"
summary.to_csv(summary_path, index=False)
summary


In [None]:
# ===============================
# 6) Pairwise bootstrap significance (ΔF1_macro) between models
# ===============================
import numpy as np, pandas as pd
from sklearn.metrics import precision_recall_fscore_support

cv_df = pd.read_csv(f"{OUTPUT_ROOT}/{RUN_TAG}/cv_results.csv")
models_unique = cv_df["model"].unique().tolist()

def paired_bootstrap_f1(y_true, pred_a, pred_b, B=2000):
    y_true = np.array(y_true); pa = np.array(pred_a); pb = np.array(pred_b)
    n = len(y_true)
    rng = np.random.default_rng(2024)
    def f1_macro(y, p):
        _, _, f1s, _ = precision_recall_fscore_support(y, p, labels=np.unique(y))
        return float(np.mean(f1s))
    diffs = []
    for _ in range(B):
        idx = rng.integers(0, n, n)
        diffs.append(f1_macro(y_true[idx], pa[idx]) - f1_macro(y_true[idx], pb[idx]))
    diffs = np.array(diffs)
    lo, hi = np.quantile(diffs, [0.025, 0.975])
    return float(diffs.mean()), float(lo), float(hi)

stats_rows = []
for i in range(len(models_unique)):
    for j in range(i+1, len(models_unique)):
        A, Bm = models_unique[i], models_unique[j]
        diffs = []
        for seed in SEEDS:
            for fold in range(KFOLDS):
                keyA = f"{A}|seed{seed}|fold{fold}"
                keyB = f"{Bm}|seed{seed}|fold{fold}"
                if keyA in store_preds and keyB in store_preds:
                    y_true = store_preds[keyA]["val_true"]
                    pa = store_preds[keyA]["val_pred"]
                    pb = store_preds[keyB]["val_pred"]
                    mean_d, lo, hi = paired_bootstrap_f1(y_true, pa, pb, B=1500 if LIGHT_RUN else 3000)
                    diffs.append((mean_d, lo, hi))
        if diffs:
            mean_over = float(np.mean([d[0] for d in diffs]))
            lo_over   = float(np.mean([d[1] for d in diffs]))
            hi_over   = float(np.mean([d[2] for d in diffs]))
            stats_rows.append({"A":A, "B":Bm, "ΔF1_macro_mean": mean_over, "CI_low": lo_over, "CI_high": hi_over})

stats_df = pd.DataFrame(stats_rows)
stats_df.to_csv(f"{OUTPUT_ROOT}/{RUN_TAG}/pairwise_bootstrap_macroF1.csv", index=False)
stats_df


In [None]:
# ===============================
# 7) Retrain best model on all data & export (Torch/ONNX/int8)
# ===============================
best_model = summary.sort_values(("f1_macro","mean"), ascending=False)["model"].iloc[0]
best_model


In [None]:
import os, json, numpy as np, torch
from datasets import Dataset
from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
                          DataCollatorWithPadding, TrainingArguments, Trainer,
                          EarlyStoppingCallback)

final_dir = f"{OUTPUT_ROOT}/{RUN_TAG}/final_{best_model.replace('/','_')}"
os.makedirs(final_dir, exist_ok=True)

tok = AutoTokenizer.from_pretrained(best_model)

# small internal split for early stopping
skf = StratifiedKFold(n_splits=8, shuffle=True, random_state=999)
tr_idx, va_idx = list(skf.split(df["text_norm"], df["label"]))[0]

train_ds = Dataset.from_pandas(df.iloc[tr_idx][["text_norm","label"]].rename(columns={"text_norm":"text"}))
val_ds   = Dataset.from_pandas(df.iloc[va_idx][["text_norm","label"]].rename(columns={"text_norm":"text"}))

def tok_fn(b): return tok(b["text"], truncation=True)
train_ds = train_ds.map(tok_fn, batched=True, remove_columns=["text"])
val_ds   = val_ds.map(tok_fn, batched=True, remove_columns=["text"])

data_collator = DataCollatorWithPadding(tokenizer=tok)
model = AutoModelForSequenceClassification.from_pretrained(
    best_model, num_labels=len(labels), id2label=id2label, label2id=label2id
)

if USE_CLASS_WEIGHTS:
    y = df.iloc[tr_idx]["label"].values
    cw = compute_class_weight("balanced", classes=np.arange(len(labels)), y=y)
    loss_weights = torch.tensor(cw, dtype=torch.float)
else:
    loss_weights = None

def compute_loss(model, inputs, return_outputs=False):
    labels_t = inputs.get("labels")
    outputs = model(**{k:v for k,v in inputs.items() if k!="labels"})
    logits = outputs.logits
    loss_fct = torch.nn.CrossEntropyLoss(weight=(loss_weights.to(logits.device) if loss_weights is not None else None))
    loss = loss_fct(logits, labels_t)
    return (loss, outputs) if return_outputs else loss

args = TrainingArguments(
    output_dir=final_dir,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,
    learning_rate=2e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=6,
    warmup_ratio=0.1,
    weight_decay=0.01,
    fp16=torch.cuda.is_available(),
    logging_steps=50,
    report_to="none",
    seed=2025
)

trainer = Trainer(
    model=model, args=args,
    train_dataset=train_ds, eval_dataset=val_ds,
    tokenizer=tok, data_collator=data_collator,
    compute_metrics=None,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)
trainer.compute_loss = compute_loss
trainer.train()

# Save (Torch)
trainer.save_model(final_dir)
tok.save_pretrained(final_dir)
with open(os.path.join(final_dir, "label_map.json"), "w", encoding="utf-8") as f:
    json.dump({"labels": labels, "label2id": {k:int(v) for k,v in label2id.items()},
               "id2label": {int(k):v for k,v in id2label.items()}}, f, ensure_ascii=False, indent=2)

# Export ONNX
dummy = tok("اختبار", return_tensors="pt")
model.to("cpu").eval()
onnx_path = os.path.join(final_dir, "model.onnx")
torch.onnx.export(
    model,
    (dummy["input_ids"], dummy["attention_mask"]),
    onnx_path,
    input_names=["input_ids","attention_mask"],
    output_names=["logits"],
    dynamic_axes={"input_ids":{0:"batch",1:"seq"}, "attention_mask":{0:"batch",1:"seq"}, "logits":{0:"batch"}},
    opset_version=17
)

# Optional quantization
try:
    from onnxruntime.quantization import quantize_dynamic, QuantType
    quantize_dynamic(onnx_path, os.path.join(final_dir, "model.int8.onnx"), weight_type=QuantType.QInt8)
except Exception as e:
    print("Quantization skipped:", e)

final_dir


In [None]:
# ===============================
# 8) Export all results as ZIP (and copy to Drive)
# ===============================
import shutil, os

ZIP_PATH = f"/content/Arabic_Commands_BERT_Results_{RUN_TAG}.zip"
shutil.make_archive(base_name=ZIP_PATH.replace(".zip",""), format="zip", root_dir=f"{OUTPUT_ROOT}/{RUN_TAG}")
print("ZIP created at:", ZIP_PATH)

# Copy to Drive
if USE_DRIVE:
    DRIVE_OUT = f"/content/drive/MyDrive/Arabic_Commands_BERT_Results_{RUN_TAG}.zip"
    shutil.copyfile(ZIP_PATH, DRIVE_OUT)
    print("Copied ZIP to Drive:", DRIVE_OUT)


In [None]:
# ===============================
# 9) Quick inference function (loads final best model)
# ===============================
import json, numpy as np, torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

def normalize_ar(text: str) -> str:
    t = text.strip()
    t = re.sub(r"[\u0640]+", "", t)
    t = t.translate(str.maketrans("٠١٢٣٤٥٦٧٨٩","0123456789"))
    t = re.sub(r"[ـ]+", "", t)
    t = re.sub(r"\s+", " ", t)
    t = re.sub("[إأآا]", "ا", t)
    t = re.sub("ى", "ي", t)
    t = re.sub("ؤ", "و", t)
    t = re.sub("ئ", "ي", t)
    t = t.replace("ة", "ه")
    t = re.sub(r"[\u0610-\u061A\u064B-\u065F\u0670\u06D6-\u06ED]", "", t)
    t = re.sub(r"(.)\1{2,}", r"\1\1", t)
    return t

best_dir = final_dir  # from previous cell
tok = AutoTokenizer.from_pretrained(best_dir)
mdl = AutoModelForSequenceClassification.from_pretrained(best_dir)
id2label = mdl.config.id2label

def predict(text, threshold=0.55):
    t = normalize_ar(text)
    enc = tok(t, return_tensors="pt", truncation=True)
    with torch.no_grad():
        logits = mdl(**enc).logits
        probs = torch.softmax(logits, dim=-1).numpy().squeeze()
    idx = int(np.argmax(probs))
    conf = float(probs[idx])
    label = id2label[idx] if conf >= threshold else "UNKNOWN"
    return {"text": text, "norm": t, "label": label, "confidence": round(conf, 4)}

print(predict("يلا استدر١٨٠ لليمين"))
print(predict("لو سمحت قف بسرعة"))
print(predict("اعمل شي غريب ما بعرف"))
