In [None]:
# imports
import os
import numpy as np
from datasets import load_dataset, ClassLabel, load_metric
import evaluate
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    DataCollatorForTokenClassification,
    TrainingArguments,
    Trainer,
    set_seed
)

In [None]:
# Configuration: RNG seed plus every hyperparameter the later cells read.
set_seed(42)  # fix the random seed before any model weights are initialised

MODEL_NAME = "microsoft/deberta-v3-base"  # checkpoint id passed to from_pretrained for tokenizer and model
OUTPUT_DIR = "deberta-v3-base-conll2003-ner"  # Trainer output_dir; the final model and tokenizer are saved here too
MAX_LENGTH = 256  # tokenizer truncation length (author's note: safe for an 8GB GPU)
PER_DEVICE_TRAIN_BATCH_SIZE = 8  # lower this on OOM; gradient accumulation simulates the larger batch
GRADIENT_ACCUMULATION_STEPS = 2  # effective train batch per device = 8 * 2 = 16
PER_DEVICE_EVAL_BATCH_SIZE = 16
NUM_EPOCHS = 4
LEARNING_RATE = 2e-5
WEIGHT_DECAY = 0.01
WARMUP_RATIO = 0.06  # forwarded to TrainingArguments(warmup_ratio=...)

In [None]:
# Load the "conll2003" dataset; the cells below use its "train", "validation"
# and "test" splits and its "tokens" / "ner_tags" columns.
raw_datasets = load_dataset("conll2003")

In [None]:
# BIO tags: read the tag vocabulary from the dataset schema rather than
# hard-coding it. The position of a name in `label_list` is the integer id
# stored in the "ner_tags" column.
features = raw_datasets["train"].features
ner_feature = features["ner_tags"]
label_list = ner_feature.feature.names   # e.g., ['O','B-PER',...]
num_labels = len(label_list)  # size of the classification head
print("Labels:", label_list)

In [None]:
# tokenizer and model
# A fast tokenizer is required: label alignment relies on `word_ids()`.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

# Record the tag names in the model config. Without these mappings the saved
# checkpoint only knows generic names ("LABEL_0", "LABEL_1", ...), which makes
# it awkward to use outside this notebook (e.g. in a pipeline).
id2label = dict(enumerate(label_list))
label2id = {name: idx for idx, name in id2label.items()}

model = AutoModelForTokenClassification.from_pretrained(
    MODEL_NAME,
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id,
)

In [None]:
# Optional: gradient checkpointing (currently disabled). Uncomment the block
# below to enable it if training runs out of GPU memory.
# try:
#     model.gradient_checkpointing_enable()
#     print("Gradient checkpointing enabled")
# except Exception:
#     pass

In [None]:
# tokenize and align labels
def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align NER tags to sub-tokens.

    Args:
        examples: A batch with "tokens" (list of word lists) and "ner_tags"
            (list of per-word integer tag ids indexing ``label_list``).

    Returns:
        The tokenizer output for the batch with an added "labels" entry holding
        one list of tag ids per example, the same length as its input ids:
          * special tokens (no word id) get -100 so the loss ignores them;
          * the first sub-token of a word gets the word's tag;
          * later sub-tokens of the same word get the word's tag, except that
            a ``B-XXX`` tag is replaced by ``I-XXX``.

    Repeating ``B-XXX`` verbatim on continuation pieces would yield sequences
    such as ``B-PER B-PER B-PER`` for a single word, which is not valid BIO and
    is counted as several entities by span-level metrics.
    """
    tokenized_inputs = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        truncation=True,
        max_length=MAX_LENGTH,
        return_overflowing_tokens=False,
        return_offsets_mapping=False,
    )

    # Map every B-XXX id to the matching I-XXX id. If a tag set has no I-XXX
    # counterpart the B- id is kept unchanged.
    name_to_id = {name: idx for idx, name in enumerate(label_list)}
    continuation_id = {
        idx: name_to_id.get("I-" + name[2:], idx)
        for idx, name in enumerate(label_list)
        if name.startswith("B-")
    }

    labels = []
    for i, word_labels in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)  # one entry per input id
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                # special tokens ([CLS], [SEP], ...) are ignored by the loss
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                # first sub-token of a word carries the word's own tag
                label_ids.append(word_labels[word_idx])
            else:
                # continuation sub-token: same entity, but never a new "B-"
                tag = word_labels[word_idx]
                label_ids.append(continuation_id.get(tag, tag))
            previous_word_idx = word_idx
        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
# Tokenize every split in batches. The original columns are dropped so each
# split keeps only what `tokenize_and_align_labels` returns (tokenizer outputs
# plus "labels").
tokenized_datasets = raw_datasets.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=raw_datasets["train"].column_names
)

In [None]:
# Data collator: examples are tokenized without padding above, so batching is
# delegated to the token-classification collator built from the same tokenizer.
data_collator = DataCollatorForTokenClassification(tokenizer)

In [None]:
# metrics
# `datasets.load_metric` is deprecated and no longer available in recent
# `datasets` releases; the `evaluate` package (already imported at the top of
# the notebook) is its replacement and exposes the same `compute(...)` call.
metric = evaluate.load("seqeval")

In [None]:
def compute_metrics(p):
    """Turn raw evaluation output into overall seqeval scores.

    `p` unpacks into (logits, label ids). The highest-scoring class along
    axis 2 is taken as the prediction; positions whose gold label is -100 are
    dropped from both sequences before ids are mapped to tag names via
    `label_list` and passed to `metric.compute`.

    Returns a dict with "precision", "recall", "f1" and "accuracy".
    """
    scores, gold_ids = p
    pred_ids = np.argmax(scores, axis=2)

    gold_tags = []
    pred_tags = []
    for gold_row, pred_row in zip(gold_ids, pred_ids):
        # keep only positions that carry a real label
        kept = [(g, q) for g, q in zip(gold_row, pred_row) if g != -100]
        gold_tags.append([label_list[g] for g, _ in kept])
        pred_tags.append([label_list[q] for _, q in kept])

    results = metric.compute(predictions=pred_tags, references=gold_tags)

    summary = {}
    summary["precision"] = results["overall_precision"]
    summary["recall"] = results["overall_recall"]
    summary["f1"] = results["overall_f1"]
    summary["accuracy"] = results["overall_accuracy"]
    return summary


In [None]:
# Training configuration. Evaluation and checkpointing both happen once per
# epoch, and the checkpoint with the highest "f1" (as returned by
# `compute_metrics`) is reloaded when training finishes.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` -- confirm against the installed version.
# NOTE(review): `fp16=True` presumably needs a CUDA GPU -- confirm before
# running this cell on a CPU-only machine.
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="steps",
    logging_steps=200,
    per_device_train_batch_size=PER_DEVICE_TRAIN_BATCH_SIZE,
    per_device_eval_batch_size=PER_DEVICE_EVAL_BATCH_SIZE,
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
    num_train_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    warmup_ratio=WARMUP_RATIO,
    fp16=True,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,  # higher f1 is better
    save_total_limit=3,  # cap on the number of checkpoints kept on disk
    dataloader_num_workers=2,
    run_name="deberta-v3-base-conll",
)

In [None]:
# Bind the model, arguments, data and metric function into a Trainer.
# Training uses the "train" split; per-epoch evaluation uses "validation".
# NOTE(review): newer transformers releases deprecate the `tokenizer=` argument
# in favour of `processing_class=` -- confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
)

In [None]:
# Fine-tune the model using the arguments and datasets bound to `trainer`.
trainer.train()

In [None]:
# Persist the final model and its tokenizer side by side in OUTPUT_DIR so the
# directory can be loaded on its own later.
trainer.save_model(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)

# Quick evaluation on the held-out "test" split (not used during training).
print("Evaluating on test set...")
metrics = trainer.evaluate(tokenized_datasets["test"])
print(metrics)

In [None]:
def predict_sentence(sentence_tokens):
    """Predict one NER tag per word for a single pre-tokenized sentence.

    Args:
        sentence_tokens: List of words (already split), e.g. ["EU", "rejects"].

    Returns:
        A list of ``(word, tag)`` pairs, where the tag is the prediction for
        the first sub-token of that word. An empty input yields an empty list.
        Words cut off by truncation at ``MAX_LENGTH`` have no prediction and
        are omitted from the result.
    """
    # Local import keeps this cell self-contained; torch is already a
    # dependency of the model being called.
    import torch

    if not sentence_tokens:
        return []

    model = trainer.model

    # Tokenize once and take word ids from the SAME encoding that is fed to
    # the model. Re-tokenizing without truncation would produce more word ids
    # than predictions for long inputs and raise an IndexError.
    encoding = tokenizer(
        sentence_tokens,
        is_split_into_words=True,
        return_tensors="pt",
        truncation=True,
        max_length=MAX_LENGTH,
    )
    word_ids = encoding.word_ids(batch_index=0)
    # send to same device as model
    inputs = {k: v.to(model.device) for k, v in encoding.items()}

    # Inference: disable dropout and gradient tracking, then restore whatever
    # mode the model was in so this helper has no lasting side effect.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            logits = model(**inputs).logits
    finally:
        model.train(was_training)
    pred_ids = logits.argmax(dim=-1)[0].tolist()

    # align back: keep the prediction of the first sub-token of each word
    pred_labels = []
    last_w = None
    for idx, wid in enumerate(word_ids):
        if wid is None:
            # special token, not part of any word
            continue
        if wid != last_w:
            pred_labels.append(label_list[pred_ids[idx]])
        last_w = wid
    return list(zip(sentence_tokens, pred_labels))

In [None]:
# Smoke test: tag one pre-tokenized sentence and print the (word, tag) pairs.
example = ["EU", "rejects", "German", "call", "to", "boycott", "British", "lamb", "."]
print(predict_sentence(example))