In [None]:
# !pip install evaluate
# !pip install -U transformers huggingface_hub
# !pip install seqeval

## Imports

In [None]:
import random
import unicodedata
from copy import deepcopy

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from datasets import (
    load_dataset, DatasetDict, Dataset,
    Features, Sequence, ClassLabel, Value,
    concatenate_datasets
)

import evaluate

from transformers import (
    AutoTokenizer, AutoModelForTokenClassification,
    DataCollatorForTokenClassification,
    TrainingArguments, Trainer,
    set_seed
)

from peft import LoraConfig, get_peft_model, TaskType

## Configuration


In [None]:
# Data (MasakhaNER 2.0 Yorùbá Parquet files on Hugging Face)
parquet_urls = {
    "train": "https://huggingface.co/datasets/masakhane/masakhaner2/resolve/refs/convert/parquet/yor/train/0000.parquet",
    "validation": "https://huggingface.co/datasets/masakhane/masakhaner2/resolve/refs/convert/parquet/yor/validation/0000.parquet",
    "test": "https://huggingface.co/datasets/masakhane/masakhaner2/resolve/refs/convert/parquet/yor/test/0000.parquet",
}

# BIO tag inventory. A tag's integer id is its position in this list, so the
# two lookup tables below are derived from it rather than written by hand.
label_names = ["O","B-PER","I-PER","B-ORG","I-ORG","B-LOC","I-LOC","B-DATE","I-DATE"]
id2label = dict(enumerate(label_names))
label2id = {name: idx for idx, name in id2label.items()}

# Filler words that the code-switch robustness split inserts after O-tagged tokens.
english_fillers = ["abeg", "please", "nah", "even", "sha", "well", "like", "maybe"]
# Id of the "outside" tag, looked up instead of hard-coded so it cannot drift
# away from `label_names` if the list is ever reordered.
O_ID = label2id["O"]

# Seed the Python and NumPy global RNGs used by the data-preparation cells.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

## Load MasakhaNER 2.0 (Yorùbá)

In [None]:
# Read the three Parquet shards listed in `parquet_urls` into one DatasetDict.
ds = load_dataset(path="parquet", data_files=parquet_urls)
print(ds)
print("labels:", label_names)

# Preview the first training sentence: raw tokens, tag ids, and the tag names
# obtained by indexing `label_names` with those ids (first 25 positions only).
ex = ds["train"][0]
print("tokens:", ex["tokens"][:25])
print("ner ids:", ex["ner_tags"][:25])
print("ner tags:", list(map(label_names.__getitem__, ex["ner_tags"][:25])))

## Robustness split: remove diacritics (NFD + drop Mn)

In [None]:
def strip_diacritics(token: str) -> str:
    """Return `token` with every non-spacing combining mark removed.

    The token is first decomposed to NFD so that accents and under-dots
    become separate code points; every code point whose Unicode category is
    "Mn" is then dropped and the remaining characters are joined back up.
    """
    decomposed = unicodedata.normalize("NFD", token)
    kept = filter(lambda ch: unicodedata.category(ch) != "Mn", decomposed)
    return "".join(kept)

def make_no_diacritics_split(split_ds):
    """Build a copy of `split_ds` whose tokens have their diacritics removed.

    Args:
        split_ds: iterable of examples exposing "id", "tokens" and "ner_tags".

    Returns:
        A `Dataset` with the same ids and tags and with `strip_diacritics`
        applied to every token.

    When `split_ds` carries a `features` schema, the entries for the three
    emitted columns are reused for the new dataset. Without this,
    `Dataset.from_list` re-infers the column types from the raw values and the
    label column comes back as plain integers instead of keeping the source
    split's declared type.
    """
    stripped_rows = [
        {
            "id": ex["id"],
            "tokens": [strip_diacritics(t) for t in ex["tokens"]],
            "ner_tags": ex["ner_tags"],
        }
        for ex in split_ds
    ]

    # Plain iterables (e.g. a list of dicts) have no schema; fall back to inference.
    source_features = getattr(split_ds, "features", None)
    features = None
    if source_features is not None:
        features = Features(
            {col: source_features[col] for col in ("id", "tokens", "ner_tags")}
        )
    return Dataset.from_list(stripped_rows, features=features)

# Diacritic-free counterparts of the validation and test splits, stored under
# "<split>_no_diac".
no_diac = DatasetDict({
    f"{name}_no_diac": make_no_diacritics_split(ds[name])
    for name in ("validation", "test")
})
print(no_diac)
# Show the first 12 tokens of the first validation sentence before and after stripping.
print(f'before: {" ".join(ds["validation"][0]["tokens"][:12])}')
print(f'after : {" ".join(no_diac["validation_no_diac"][0]["tokens"][:12])}')

## Robustness split: light code-switch inserts (at O-tag positions)

In [None]:
def make_codeswitch_split(split_ds, p_insert=0.12, max_inserts_per_sent=3, seed=None):
    """Build a copy of `split_ds` with filler words inserted after O-tagged tokens.

    Each sentence is walked token by token. After a token tagged `O_ID`, a word
    drawn from `english_fillers` is inserted with probability `p_insert` and is
    itself tagged `O_ID`. At most `max_inserts_per_sent` fillers are added per
    sentence. Tokens carrying any other tag are never followed by an insert.

    Args:
        split_ds: iterable of examples exposing "id", "tokens" and "ner_tags".
        p_insert: per-position insertion probability.
        max_inserts_per_sent: cap on inserted fillers per sentence.
        seed: when given, a private `random.Random(seed)` drives the sampling so
            the result does not depend on whatever else has consumed the global
            RNG. When None (default), the module-level `random` state is used,
            exactly as before this parameter existed.

    Returns:
        A `Dataset` with columns "id", "tokens" and "ner_tags". If `split_ds`
        has a `features` schema, the entries for those columns are reused so
        the label column keeps its declared type instead of being re-inferred.
    """
    rng = random if seed is None else random.Random(seed)

    rows = []
    for ex in split_ds:
        toks, tags = ex["tokens"], ex["ner_tags"]
        new_toks, new_tags = [], []
        inserts = 0
        for t, tag in zip(toks, tags):
            new_toks.append(t)
            new_tags.append(tag)
            # The cheap checks come first so the RNG is only consumed at
            # positions that are actually eligible for an insert.
            if tag == O_ID and inserts < max_inserts_per_sent and rng.random() < p_insert:
                new_toks.append(rng.choice(english_fillers))
                new_tags.append(O_ID)
                inserts += 1
        rows.append({"id": ex["id"], "tokens": new_toks, "ner_tags": new_tags})

    # Plain iterables (e.g. a list of dicts) have no schema; fall back to inference.
    source_features = getattr(split_ds, "features", None)
    features = None
    if source_features is not None:
        features = Features(
            {col: source_features[col] for col in ("id", "tokens", "ner_tags")}
        )
    return Dataset.from_list(rows, features=features)

# The filler insertion draws from Python's global RNG. Re-seed it here so this
# cell yields the same two splits every time it runs, regardless of which cells
# (or re-runs of this one) have consumed random numbers since the config cell.
random.seed(SEED)
cs = DatasetDict({
    "validation_cs": make_codeswitch_split(ds["validation"], p_insert=0.12),
    "test_cs": make_codeswitch_split(ds["test"], p_insert=0.12),
})
print(cs)
# First 30 tokens of the first perturbed validation sentence.
print("example with inserts:", " ".join(cs["validation_cs"][0]["tokens"][:30]))

## Bundle all splits

In [None]:
# One container for every split: the three original splits first, followed by
# the validation/test variants held in `no_diac` and then those held in `cs`.
all_splits = DatasetDict({
    **{name: ds[name] for name in ("train", "validation", "test")},
    **no_diac,
    **cs,
})
all_splits

## Tokeniser and BIO alignment

In [None]:
# Fast tokenizer for the XLM-R base checkpoint; `tokenize_and_align` needs the
# `word_ids()` mapping that fast tokenizers provide.
tok = AutoTokenizer.from_pretrained("xlm-roberta-base", use_fast=True)
print(f"{tok.name_or_path} — num labels: {len(label_names)}")

def tokenize_and_align(examples, label_col="ner_tags"):
    """Tokenise a batch of pre-split sentences and project word labels onto sub-tokens.

    Args:
        examples: batch mapping with a "tokens" column (one list of words per
            sentence) and a label column with one label id per word.
        label_col: name of the label column.

    Returns:
        The tokenizer output with an added "labels" entry. For each sentence,
        the first sub-token of every word carries that word's label; positions
        with no word (special tokens) and the remaining sub-tokens of a word
        get the sentinel -100, which `compute_metrics` skips.
    """
    tokenized = tok(examples["tokens"], is_split_into_words=True, truncation=True)

    def _align(word_ids, word_labels):
        # A position is labelled only when it starts a new word.
        out, previous = [], None
        for current in word_ids:
            starts_word = current is not None and current != previous
            out.append(word_labels[current] if starts_word else -100)
            previous = current
        return out

    tokenized["labels"] = [
        _align(tokenized.word_ids(batch_index=row), word_labels)
        for row, word_labels in enumerate(examples[label_col])
    ]
    return tokenized

# Tokenise and label-align every split in `all_splits` with one shared recipe
# instead of one hand-copied `.map(...)` call per split. Keys and their order
# are taken from `all_splits`, so a split added there is tokenised here too.
tok_ds = DatasetDict({
    split_name: split.map(
        tokenize_and_align,
        batched=True,
        # Drop the raw columns; only the tokenizer outputs and "labels" remain.
        remove_columns=["id", "tokens", "ner_tags"],
    )
    for split_name, split in all_splits.items()
})
tok_ds

## Data collator and evaluation metric

In [None]:
# seqeval metric loaded through `evaluate`; `compute_metrics` calls its `.compute`.
metric = evaluate.load("seqeval")
# Batch collator for token classification, built around the shared tokenizer.
collator = DataCollatorForTokenClassification(tokenizer=tok)

def compute_metrics(eval_pred):
    """Turn raw evaluation output into the result of `metric.compute`.

    Args:
        eval_pred: pair `(logits, labels)`; the arg-max over the last axis of
            `logits` gives the predicted label id at each position.

    Returns:
        Whatever `metric.compute` returns for the tag-name sequences.

    Positions whose gold label is -100 are dropped from both the predictions
    and the references; the remaining ids are mapped to tag names via `id2label`.
    """
    logits, labels = eval_pred
    pred_ids = np.argmax(logits, axis=-1)

    pred_tags, gold_tags = [], []
    for pred_row, gold_row in zip(pred_ids, labels):
        # Keep only the positions that carry a real gold label.
        kept = [(p, g) for p, g in zip(pred_row, gold_row) if g != -100]
        pred_tags.append([id2label[p] for p, _ in kept])
        gold_tags.append([id2label[g] for _, g in kept])
    return metric.compute(predictions=pred_tags, references=gold_tags)

## Model A - Full fine-tune (`xlm-roberta-base`)

In [None]:
# Seed Python, NumPy and PyTorch *before* the model is built. `from_pretrained`
# adds a freshly initialised token-classification head on top of the encoder,
# and it runs before the Trainer below gets a chance to apply `seed`. The
# config cell seeds only `random` and NumPy, so without this call the head's
# starting weights are not tied to SEED.
set_seed(SEED)

model = AutoModelForTokenClassification.from_pretrained(
    "xlm-roberta-base",
    num_labels=len(label_names),
    id2label=id2label,
    label2id=label2id
)

# Evaluate and log once per epoch; no intermediate checkpoints are written
# (save_strategy="no"), so the two save_* options at the end are presumably
# inert in this configuration.
args = TrainingArguments(
    output_dir="/kaggle/working/yoruba_ner_xlmr",
    eval_strategy="epoch",
    save_strategy="no",
    logging_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.01,
    fp16=True,
    load_best_model_at_end=False,
    report_to="none",
    seed=SEED,
    save_total_limit=1,
    save_only_model=True
)

# NOTE(review): recent transformers releases deprecate `tokenizer=` in favour
# of `processing_class=` — confirm against the installed version.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tok_ds["train"],
    eval_dataset=tok_ds["validation"],
    tokenizer=tok,
    data_collator=collator,
    compute_metrics=compute_metrics,
)

train_result = trainer.train()
# load_best_model_at_end is False, so despite the directory name this is the
# model as it stands after the last epoch.
trainer.save_model("/kaggle/working/yoruba_ner_xlmr/best")

val_metrics = trainer.evaluate(eval_dataset=tok_ds["validation"])
val_metrics

### Robustness evaluation: clean, no-diacritics, code-switch

In [None]:
def eval_split(name, dataset):
    """Evaluate the full fine-tune `trainer` on `dataset` and return one table row.

    Args:
        name: label stored under "split".
        dataset: tokenised dataset passed to `trainer.evaluate`.

    Returns:
        Dict with "split" plus the overall precision, recall, F1, accuracy and
        the evaluation loss, in that order.
    """
    metrics = trainer.evaluate(eval_dataset=dataset)
    row = {"split": name}
    # (output column, key in the evaluate() result)
    for column, key in (
        ("precision", "eval_overall_precision"),
        ("recall", "eval_overall_recall"),
        ("f1", "eval_overall_f1"),
        ("accuracy", "eval_overall_accuracy"),
        ("loss", "eval_loss"),
    ):
        row[column] = metrics[key]
    return row

# Score the full fine-tune on the clean test set and on both perturbed variants.
rows = [
    eval_split(split_label, tok_ds[split_key])
    for split_label, split_key in (
        ("test_clean", "test"),
        ("test_no_diacritics", "test_no_diac"),
        ("test_codeswitch", "test_cs"),
    )
]

# One row per condition, ordered alphabetically by split label.
results_df = pd.DataFrame(rows)
results_df = results_df.sort_values("split")
results_df = results_df.reset_index(drop=True)
display(results_df)

csv_path = "/kaggle/working/yoruba_ner_xlmr/robustness_results.csv"
results_df.to_csv(csv_path, index=False)
print("Saved:", csv_path)

### Per-entity F1 for full fine-tune

In [None]:
def per_entity_report(dataset):
    """Return the full fine-tune's F1 per entity type on `dataset`.

    Runs `trainer.evaluate` and reads the "f1" field of the "eval_<TYPE>"
    entry for DATE, LOC, ORG and PER, in that order.
    """
    report = trainer.evaluate(eval_dataset=dataset)
    scores = {}
    for entity in ("DATE", "LOC", "ORG", "PER"):
        scores[entity] = report["eval_" + entity]["f1"]
    return scores

# Per-entity F1 of the full fine-tune under each test condition.
for _condition, _split_key in (
    ("clean", "test"),
    ("no diacritics", "test_no_diac"),
    ("codeswitch", "test_cs"),
):
    print(f"per-entity F1 ({_condition}):", per_entity_report(tok_ds[_split_key]))

## Model B - LoRA (parameter-efficient)

In [None]:
base_model_ckpt = "xlm-roberta-base"
# Rank-8 LoRA adapters on the listed linear sub-modules; bias terms are left untouched.
lora_cfg = LoraConfig(
    task_type=TaskType.TOKEN_CLS,
    r=8,
    lora_alpha=16,
    lora_dropout=0.1,
    bias="none",
    target_modules=["query","key","value","output.dense"]
)

# Seed Python, NumPy and PyTorch *before* the model and its adapters are
# created. Both the new classification head and the LoRA matrices are randomly
# initialised here, i.e. before the Trainer below applies `seed`, so without
# this call their starting values are not tied to SEED.
set_seed(SEED)

lora_model = AutoModelForTokenClassification.from_pretrained(
    base_model_ckpt,
    num_labels=len(label_names),
    id2label=id2label,
    label2id=label2id
)
lora_model = get_peft_model(lora_model, lora_cfg)
lora_model.print_trainable_parameters()

# Same schedule as the full fine-tune, but with a 10x larger learning rate and
# no weight decay.
lora_args = TrainingArguments(
    output_dir="/kaggle/working/yoruba_ner_xlmr_lora",
    eval_strategy="epoch",
    save_strategy="no",
    logging_strategy="epoch",
    learning_rate=2e-4,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.0,
    fp16=True,
    load_best_model_at_end=False,
    report_to="none",
    seed=SEED,
    save_total_limit=1,
    save_only_model=True
)

lora_trainer = Trainer(
    model=lora_model,
    args=lora_args,
    train_dataset=tok_ds["train"],
    eval_dataset=tok_ds["validation"],
    tokenizer=tok,
    data_collator=collator,
    compute_metrics=compute_metrics,
)

lora_trainer.train()
# load_best_model_at_end is False, so despite the directory name this is the
# model as it stands after the last epoch.
lora_trainer.save_model("/kaggle/working/yoruba_ner_xlmr_lora/best")
lora_val = lora_trainer.evaluate(eval_dataset=tok_ds["validation"])
lora_val

### Compare full vs LoRA on all conditions

In [None]:
def eval_with(tr, name, dataset, split_name):
    """Evaluate `dataset` with the given trainer and return one comparison row.

    Args:
        tr: object whose `evaluate(eval_dataset=...)` returns the metric dict.
        name: model label stored under "model".
        dataset: tokenised dataset to score.
        split_name: condition label stored under "split".

    Returns:
        Dict with "model", "split", then overall precision, recall, F1,
        accuracy and the evaluation loss.
    """
    scores = tr.evaluate(eval_dataset=dataset)
    row = {"model": name, "split": split_name}
    row.update(
        precision=scores["eval_overall_precision"],
        recall=scores["eval_overall_recall"],
        f1=scores["eval_overall_f1"],
        accuracy=scores["eval_overall_accuracy"],
        loss=scores["eval_loss"],
    )
    return row

# Score both models on the three test conditions: all rows for the full
# fine-tune first, then all rows for LoRA.
rows = [
    eval_with(model_trainer, model_name, tok_ds[split_key], split_label)
    for model_trainer, model_name in (
        (trainer, "xlmr_full"),
        (lora_trainer, "xlmr_lora"),
    )
    for split_key, split_label in (
        ("test", "test_clean"),
        ("test_no_diac", "test_no_diacritics"),
        ("test_cs", "test_codeswitch"),
    )
]

comp = pd.DataFrame(rows)
display(comp)

out_path = "/kaggle/working/yoruba_ner_xlmr_lora/robustness_compare.csv"
comp.to_csv(out_path, index=False)
print("Saved:", out_path)

## Model C - Mixed training (concatenate original + no-diac train)

In [None]:
# Explicit schema for datasets rebuilt from Python rows: string id, a sequence
# of string tokens, and a sequence of class labels named by `label_names`.
feats = Features(
    dict(
        id=Value("string"),
        tokens=Sequence(Value("string")),
        ner_tags=Sequence(ClassLabel(names=label_names)),
    )
)

def make_no_diac_split(split_ds):
    """Return a diacritic-free copy of `split_ds` built with the `feats` schema.

    Every token goes through `strip_diacritics`; ids and tags are copied
    unchanged. The rows are materialised with `features=feats`.
    """
    def _strip_row(ex):
        return {
            "id": ex["id"],
            "tokens": list(map(strip_diacritics, ex["tokens"])),
            "ner_tags": ex["ner_tags"],
        }

    return Dataset.from_list([_strip_row(ex) for ex in split_ds], features=feats)

# Diacritic-free copy of the training split, built with the explicit schema so
# it can be concatenated with the original.
train_no_diac = make_no_diac_split(all_splits["train"])
print(train_no_diac.features)

# Original + stripped sentences, shuffled with a fixed seed.
mixed_train_raw = concatenate_datasets([all_splits["train"], train_no_diac]).shuffle(seed=SEED)
# Printed explicitly: a bare expression in the middle of a cell is never displayed.
print(
    "train sizes (original, no_diac, mixed):",
    len(all_splits["train"]), len(train_no_diac), len(mixed_train_raw),
)

mixed_train_tok = mixed_train_raw.map(
    tokenize_and_align,
    batched=True,
    remove_columns=["id","tokens","ner_tags"]
)

# Same recipe as the full fine-tune, plus a 10% learning-rate warm-up.
mixed_args = TrainingArguments(
    output_dir="/kaggle/working/yoruba_ner_xlmr_mixed",
    eval_strategy="epoch",
    save_strategy="no",
    logging_strategy="epoch",
    learning_rate=2e-5,
    warmup_ratio=0.1,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.01,
    fp16=True,
    load_best_model_at_end=False,
    report_to="none",
    seed=SEED,
    save_total_limit=1,
    save_only_model=True
)

# Seed Python, NumPy and PyTorch *before* the model is built. `from_pretrained`
# adds a freshly initialised token-classification head, and it runs before the
# Trainer below applies `seed`, so without this call the head's starting
# weights are not tied to SEED.
set_seed(SEED)

mixed_model = AutoModelForTokenClassification.from_pretrained(
    "xlm-roberta-base",
    num_labels=len(label_names),
    id2label=id2label,
    label2id=label2id
)

# Trains on the mixed data but still validates on the clean validation split.
mixed_trainer = Trainer(
    model=mixed_model,
    args=mixed_args,
    train_dataset=mixed_train_tok,
    eval_dataset=tok_ds["validation"],
    tokenizer=tok,
    data_collator=collator,
    compute_metrics=compute_metrics,
)

mixed_trainer.train()
mixed_trainer.save_model("/kaggle/working/yoruba_ner_xlmr_mixed/final")

### Evaluate mixed model on all test conditions + save tables and figures

In [None]:
def eval_with_table(tr, split_name, dataset):
    """Evaluate `dataset` with trainer `tr` and return one results-table row.

    Args:
        tr: object whose `evaluate(eval_dataset=...)` returns the metric dict.
        split_name: condition label stored under "split".
        dataset: tokenised dataset to score.

    Returns:
        Dict with "split", the four "eval_overall_*" scores under their short
        names (precision, recall, f1, accuracy) and "loss" from "eval_loss".
    """
    metrics = tr.evaluate(eval_dataset=dataset)
    row = {"split": split_name}
    for column in ("precision", "recall", "f1", "accuracy"):
        row[column] = metrics[f"eval_overall_{column}"]
    row["loss"] = metrics["eval_loss"]
    return row

# Score the mixed-data model on the clean test set and both perturbed variants.
rows = [
    eval_with_table(mixed_trainer, split_label, tok_ds[split_key])
    for split_label, split_key in (
        ("test_clean", "test"),
        ("test_no_diacritics", "test_no_diac"),
        ("test_codeswitch", "test_cs"),
    )
]

mixed_results = pd.DataFrame(rows)
mixed_results.to_csv("/kaggle/working/yoruba_ner_xlmr_mixed/mixed_robustness_results.csv", index=False)
display(mixed_results)

def per_entity(tr, dataset):
    """Return F1 per entity type (DATE, LOC, ORG, PER) for trainer `tr` on `dataset`.

    Each score is the "f1" field of the "eval_<TYPE>" entry in the dict
    returned by `tr.evaluate`.
    """
    report = tr.evaluate(eval_dataset=dataset)
    return dict(
        (entity, report["eval_" + entity]["f1"])
        for entity in ("DATE", "LOC", "ORG", "PER")
    )

# baseline full fine-tune: clean, no-diacritics, code-switch (in that order)
full_clean, full_nodiac, full_cs = (
    per_entity(trainer, tok_ds[key]) for key in ("test", "test_no_diac", "test_cs")
)

# mixed model: same three conditions, same order
mix_clean, mix_nodiac, mix_cs = (
    per_entity(mixed_trainer, tok_ds[key]) for key in ("test", "test_no_diac", "test_cs")
)

def tidy(model_name, cond, d):
    """Convert an {entity: f1} mapping into a long-format DataFrame.

    Each item of `d` becomes one row with the columns "model", "cond",
    "entity" and "f1"; `model_name` and `cond` are repeated on every row.
    """
    records = []
    for entity, score in d.items():
        records.append({"model": model_name, "cond": cond, "entity": entity, "f1": score})
    return pd.DataFrame(records)

# Long-format table of per-entity F1: one row per (model, condition, entity).
per_ent_df = pd.concat([
    tidy("full","clean",full_clean), tidy("full","no_diac",full_nodiac), tidy("full","codeswitch",full_cs),
    tidy("mixed","clean",mix_clean), tidy("mixed","no_diac",mix_nodiac), tidy("mixed","codeswitch",mix_cs),
], ignore_index=True)
per_ent_df.to_csv("/kaggle/working/yoruba_ner_xlmr/per_entity_f1.csv", index=False)
# Shown via display(): a bare `.head()` in the middle of a cell is never rendered.
display(per_ent_df.head())

# Figures
def bar_drop(df, model_name, out_png):
    """Plot and save grouped bars of per-entity F1 for one model.

    Args:
        df: long-format frame with columns "model", "cond", "entity", "f1".
        model_name: value of "model" to plot.
        out_png: path the figure is written to (dpi=160).

    For each entity, three bars are drawn side by side: clean, no_diac and
    codeswitch. The entity order on the x-axis is the order of the clean rows.
    The other two conditions are re-indexed to that order, so each bar sits
    under the right tick label even when `df` lists the entities in a different
    order per condition. An entity missing from a condition yields NaN there.
    """
    def _f1_by_entity(cond):
        # F1 values of `model_name` under `cond`, indexed by entity.
        selected = df[(df.model == model_name) & (df.cond == cond)]
        return selected.set_index("entity")["f1"]

    base = _f1_by_entity("clean")
    entities = base.index.tolist()
    # Align by entity label rather than by row position.
    nd = _f1_by_entity("no_diac").reindex(entities)
    cs = _f1_by_entity("codeswitch").reindex(entities)

    x = range(len(entities))

    plt.figure(figsize=(7,4.5))
    # Three bars of width 0.25 per entity, centred on the tick position.
    plt.bar([i-0.25 for i in x], base.values, width=0.25, label="clean")
    plt.bar(x, nd.values, width=0.25, label="no_diac")
    plt.bar([i+0.25 for i in x], cs.values, width=0.25, label="codeswitch")
    plt.xticks(list(x), entities)
    plt.ylabel("F1")
    plt.title(f"Per-entity F1 by condition — {model_name}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_png, dpi=160)
    plt.show()

# One figure per model, each written next to that model's other outputs.
for _model_name, _png_path in (
    ("full", "/kaggle/working/yoruba_ner_xlmr/fig_full_per_entity.png"),
    ("mixed", "/kaggle/working/yoruba_ner_xlmr_mixed/fig_mixed_per_entity.png"),
):
    bar_drop(per_ent_df, _model_name, _png_path)