# Named Entity Recognition

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn

from collections import defaultdict
from datasets import DatasetDict, get_dataset_config_names, load_dataset
from seqeval.metrics import classification_report, f1_score
from transformers import AutoConfig, AutoTokenizer, Trainer, TrainingArguments, XLMRobertaConfig, DataCollatorForTokenClassification
from transformers.modeling_outputs import TokenClassifierOutput
from transformers.models.roberta.modeling_roberta import RobertaModel, RobertaPreTrainedModel


# Prefer the GPU when one is visible. Always bind a torch.device (never a bare
# string) so that `device` has the same type on every machine.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Build the dataset

We will create an NER dataset containing German, French, English and Italian text.

In [None]:
# List every configuration (subset) name available under the "xtreme" dataset
xtreme_subsets = get_dataset_config_names("xtreme")
print(f"The xtreme dataset has {len(xtreme_subsets)} subsets")

# Keep only the configurations whose name starts with "PAN"
panx_subsets = [s for s in xtreme_subsets if s.startswith("PAN")]
print(f"There are {len(panx_subsets)} PAN-X subsets")

# Language codes and, position by position, the fraction of each corpus to keep
langs = ["de", "fr", "it", "en"]
fracs = [0.629, 0.229, 0.084, 0.059]

# Maps language code -> DatasetDict; an unseen language starts as an empty DatasetDict
panx_ch = defaultdict(DatasetDict)

for lang, frac in zip(langs, fracs):
    # Load the PAN-X corpus for one language
    ds = load_dataset("xtreme", name=f"PAN-X.{lang}")
    # Shuffle each split with a fixed seed, then keep only the first `frac` share
    # of its rows (downsampling according to the language's spoken proportion)
    for split in ds:
        panx_ch[lang][split] = (
            ds[split].shuffle(seed=0).select(range(int(frac * ds[split].num_rows)))
        )

## Dataset Inspection

In [None]:
# Look at the first record of the German training split
element = panx_ch["de"]["train"][0]
element

In [None]:
# Print each column of the German training split next to its feature description
for key, value in panx_ch["de"]["train"].features.items():
    print(f"{key}: {value}")

In [None]:
# Feature object describing a single element of the "ner_tags" column; the rest of
# the notebook reads its `names`, `num_classes` and `int2str` members
tags = panx_ch["de"]["train"].features["ner_tags"].feature
tags

In [None]:
def create_tag_names(batch):
    """Translate the integer NER tags of ``batch`` into their string names.

    Args:
        batch: Mapping with a "ner_tags" entry that iterates over integer tag IDs.

    Returns:
        A dict with the single key "ner_tags_str" holding the tag names, in the
        same order as the IDs in ``batch["ner_tags"]``.
    """
    # `tags` is the module-level tag feature; int2str is applied one ID at a time
    tag_names = list(map(tags.int2str, batch["ner_tags"]))
    return {"ner_tags_str": tag_names}

# Run create_tag_names over the German DatasetDict so that every example also
# carries its tag names under "ner_tags_str"
panx_de = panx_ch["de"].map(create_tag_names)
panx_de

In [None]:
# Show the first German training example as a two-row table: tokens above tag names
de_example = panx_de["train"][0]
pd.DataFrame([de_example["tokens"], de_example["ner_tags_str"]], ["Tokens", "Tags"])

## Token Classification Model

In [None]:
class XLMRobertaForTokenClassification(RobertaPreTrainedModel):
    """RoBERTa encoder followed by a dropout + linear head that scores every token."""

    config_class = XLMRobertaConfig

    def __init__(self, config):
        """Build the encoder and the classification head described by ``config``.

        Args:
            config: Model configuration; ``num_labels``, ``hidden_size`` and
                ``hidden_dropout_prob`` are read from it.
        """
        super().__init__(config)
        self.num_labels = config.num_labels
        # Encoder body, created without its pooling layer
        self.roberta = RobertaModel(config, add_pooling_layer=False)
        # Head: dropout, then a projection from the hidden size to one score per label
        dropout = nn.Dropout(config.hidden_dropout_prob)
        projection = nn.Linear(config.hidden_size, config.num_labels)
        self.head = nn.Sequential(dropout, projection)
        # Initialise the weights
        self.init_weights()

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, labels=None, **kwargs):
        """Score every token and, when ``labels`` is given, compute the training loss.

        Args:
            input_ids: Token IDs handed to the encoder.
            attention_mask: Attention mask handed to the encoder.
            token_type_ids: Token type IDs handed to the encoder.
            labels: Optional target class IDs; flattened before the loss is computed.
            **kwargs: Accepted for call compatibility and not used.

        Returns:
            A ``TokenClassifierOutput`` with ``loss`` (``None`` without labels),
            ``logits`` and the encoder's ``hidden_states`` and ``attentions``.
        """
        encoded = self.roberta(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        logits = self.head(encoded.last_hidden_state)

        if labels is None:
            loss = None
        else:
            # Flatten to (positions, classes) vs. (positions,) for cross-entropy
            flat_logits = logits.view(-1, self.num_labels)
            flat_labels = labels.view(-1)
            loss = nn.CrossEntropyLoss()(flat_logits, flat_labels)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=encoded.hidden_states,
            attentions=encoded.attentions,
        )

## Text Tokenization

In [None]:
# Lookup tables in both directions between integer class IDs and tag names
index2tag = dict(enumerate(tags.names))
tag2index = {name: class_id for class_id, name in index2tag.items()}

In [None]:
# Checkpoint identifiers of the two pretrained models used in this notebook
bert_model_name = "bert-base-cased"
xlmr_model_name = "xlm-roberta-base"

# Load the tokenizer that belongs to each checkpoint
bert_tokenizer = AutoTokenizer.from_pretrained(bert_model_name)
xlmr_tokenizer = AutoTokenizer.from_pretrained(xlmr_model_name)

# XLM-R configuration with one output class per NER tag and both ID <-> name mappings
xlmr_config = AutoConfig.from_pretrained(xlmr_model_name, num_labels=tags.num_classes, label2id=tag2index, id2label=index2tag)
# Instantiate the custom token classifier from the checkpoint and move it to `device`
xlmr_model = XLMRobertaForTokenClassification.from_pretrained(xlmr_model_name, config=xlmr_config).to(device)

In [None]:
def tag_text(text, tags, model, tokenizer):
    """Tag every token of ``text`` with its most likely NER label.

    Args:
        text: Raw input string.
        tags: Object exposing ``names``, the tag names indexed by class ID.
        model: Token-classification ``nn.Module``; calling it on a batch of input
            IDs must return an output whose first element holds the logits.
        tokenizer: Tokenizer used for both the displayed tokens and the model input.

    Returns:
        A DataFrame with a "Tokens" row and a "Tags" row, one column per token.
    """
    # Encode once with the tokenizer that was passed in, so the displayed tokens
    # and the IDs fed to the model always come from the same encoding
    encoding = tokenizer(text, return_tensors="pt")
    tokens = encoding.tokens()
    input_ids = encoding.input_ids
    # Put the inputs on the model's device when the model reports one
    model_device = getattr(model, "device", None)
    if model_device is not None:
        input_ids = input_ids.to(model_device)
    # Inference only: disable dropout and gradient tracking, then restore the
    # model's previous train/eval mode so the caller's state is untouched
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            # Per-token scores over the possible classes
            logits = model(input_ids)[0]
    finally:
        model.train(was_training)
    # Take argmax to get the most likely class per token
    predictions = torch.argmax(logits, dim=2)
    # Convert to DataFrame
    preds = [tags.names[p] for p in predictions[0].cpu().numpy()]
    return pd.DataFrame([tokens, preds], index=["Tokens", "Tags"])

In [None]:
# Word list and integer tag IDs of the German example, used by the alignment demo
words, labels = de_example["tokens"], de_example["ner_tags"]
words, labels

In [None]:
# Tokenize the already-split words of the German example
tokenized_input = xlmr_tokenizer(de_example["tokens"], is_split_into_words=True)
# Turn the produced input IDs back into token strings for display
tokens = xlmr_tokenizer.convert_ids_to_tokens(tokenized_input["input_ids"])
# One entry per token, linking it to its source word (None entries are masked later)
word_ids = tokenized_input.word_ids()
pd.DataFrame([tokens, word_ids], index=["Tokens", "Word IDs"])

In [None]:
# Walk the tokens in order, remembering which word the previous token belonged to
previous_word_idx = None
label_ids = []

for word_idx in word_ids:
    # Mask tokens without a word (word_idx is None) and every repeated word index,
    # so that only the first token of each word carries that word's NER tag
    if word_idx is None or word_idx == previous_word_idx:
        label_ids.append(-100) # -100 is the value of ignore_index in PyTorch's CELoss function
    elif word_idx != previous_word_idx:
        label_ids.append(labels[word_idx])
    previous_word_idx = word_idx

# NOTE: `labels` is rebound here from integer tag IDs to display strings, so this
# cell cannot be re-run without first re-running the cell that defines `labels`
labels = [index2tag[l] if l != -100 else "IGN" for l in label_ids]
index = ["Tokens", "Word IDs", "Label IDs", "Labels"]

# One column per token: token string, word index, label ID and label name
pd.DataFrame([tokens, word_ids, label_ids, labels], index=index)

In [None]:
def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align the NER tags to the tokens.

    Args:
        examples: Batch mapping with "tokens" (one word list per sentence) and
            "ner_tags" (one tag-ID list per sentence).

    Returns:
        The tokenizer output with an added "labels" entry: per sentence, one
        value per token. The first token of each word gets the word's tag ID;
        every other token gets -100.
    """
    encoded = xlmr_tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)
    aligned = []
    for row, word_tags in enumerate(examples["ner_tags"]):
        row_labels = []
        last_word = None
        for word in encoded.word_ids(batch_index=row):
            # Only a token that starts a new word keeps a real label
            starts_word = word is not None and word != last_word
            row_labels.append(word_tags[word] if starts_word else -100)
            last_word = word
        aligned.append(row_labels)
    encoded["labels"] = aligned
    return encoded

def encode_panx_dataset(corpus):
    """Tokenize ``corpus`` in batches and drop its raw input columns.

    Args:
        corpus: Object whose ``map`` method accepts ``batched`` and
            ``remove_columns`` keyword arguments.

    Returns:
        Whatever ``corpus.map`` returns after applying
        ``tokenize_and_align_labels``.
    """
    raw_columns = ["langs", "ner_tags", "tokens"]
    return corpus.map(
        tokenize_and_align_labels,
        batched=True,
        remove_columns=raw_columns,
    )

# Tokenized and label-aligned version of the German corpus, used for fine-tuning below
panx_de_encoded = encode_panx_dataset(panx_ch["de"])

## Performance Measurement

In [None]:
# Toy gold and predicted tag sequences (two sentences each) to illustrate the report
y_true = [["O", "O", "O", "B-MISC", "I-MISC", "O", "B-PER", "I-PER", "O"], ["B-PER", "I-PER", "O"]]
y_pred = [["O", "O", "O", "B-MISC", "I-MISC", "I-MISC", "B-PER", "I-PER", "O"], ["B-PER", "O", "I-PER"]]

print(classification_report(y_true, y_pred))

In [None]:
def align_predictions(predictions, label_ids, id2label=None):
    """Convert model outputs into the list-of-lists format seqeval expects.

    Args:
        predictions: Array of class scores indexed as [example, position, class].
        label_ids: Integer array indexed as [example, position]; positions whose
            value is -100 are ignored.
        id2label: Optional mapping from class ID to tag name. Defaults to the
            module-level ``index2tag``.

    Returns:
        A ``(preds_list, labels_list)`` tuple. Each is a list with one entry per
        example, holding the tag names at that example's non-ignored positions.
    """
    if id2label is None:
        id2label = index2tag
    # Most likely class ID at every position
    pred_ids = np.argmax(predictions, axis=2)
    label_ids = np.asarray(label_ids)

    # These are the two lists seqeval expects
    preds_list, labels_list = [], []
    for example_preds, example_labels in zip(pred_ids, label_ids):
        # Keep only the positions that carry a real label
        keep = example_labels != -100
        preds_list.append([id2label[i] for i in example_preds[keep].tolist()])
        labels_list.append([id2label[i] for i in example_labels[keep].tolist()])
    return preds_list, labels_list

def compute_metrics(eval_pred):
    """Compute the F1 score for one evaluation pass.

    Args:
        eval_pred: Object exposing ``predictions`` and ``label_ids``.

    Returns:
        A dict mapping the metric name "f1" to the score (a dict, not a set, so
        the value can be looked up by name).
    """
    y_pred, y_true = align_predictions(eval_pred.predictions, eval_pred.label_ids)
    return {"f1": f1_score(y_true, y_pred)}

## Model Training

In [None]:
# Core training hyperparameters
num_epochs = 3
batch_size = 4
# Number of whole batches in the German training split; used as the logging interval
logging_steps = len(panx_de_encoded["train"]) // batch_size
# Name of the fine-tuned model, also used as the checkpoint directory name
model_name = f"{xlmr_model_name}-finetuned-panx-de"

training_args = TrainingArguments(
    output_dir = f"../../checkpoints/{model_name}",
    log_level="error",
    num_train_epochs=num_epochs,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    # NOTE(review): very large interval, presumably to avoid intermediate checkpoints — confirm
    save_steps=1e6,
    weight_decay=0.01,
    disable_tqdm=False,
    logging_steps=logging_steps,
    push_to_hub=False,
)

In [None]:
# The data collator pads each input sequence to the largest sequence length in a batch.
# It is built from the XLM-R tokenizer and handed to the Trainer below.
data_collator = DataCollatorForTokenClassification(xlmr_tokenizer)

def model_init():
    """Load a fresh token classifier from the XLM-R checkpoint and move it to ``device``."""
    fresh_model = XLMRobertaForTokenClassification.from_pretrained(
        xlmr_model_name,
        config=xlmr_config,
    )
    return fresh_model.to(device)

In [None]:
# Wire together the model factory, training arguments, collator, metric function
# and the encoded German train/validation splits
trainer = Trainer(
    model_init=model_init,
    args=training_args,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    train_dataset=panx_de_encoded["train"],
    eval_dataset=panx_de_encoded["validation"],
    tokenizer=xlmr_tokenizer,
)

In [None]:
# Train the model on the encoded German training split configured above
trainer.train()

## Inference

In [None]:
# Tag a German sample sentence with the model held by the trainer
text_de = "Jeff Dean ist ein Informatiker bei Google in Kalifornien"
tag_text(text_de, tags, trainer.model, xlmr_tokenizer)