In [None]:
!pip install datasets transformers > /dev/null

In [None]:
import os
import sys
import functools
from typing import List, Tuple, Mapping


from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader

import datasets
from transformers import AutoTokenizer, AutoModelForTokenClassification

In [None]:
# Load the "benjamin/ner-uk" NER dataset; later cells index it by the
# "train", "validation" and "test" splits.
dataset = datasets.load_dataset("benjamin/ner-uk")

# Bare expression: the notebook displays the loaded object's repr.
dataset

In [None]:
# Gather every distinct NER tag that occurs in any split; the sorted list
# defines the label space (and therefore the size of the classification head).
targets = set()
for split in ("train", "validation", "test"):
    for sample in dataset[split]:
        targets |= set(sample["ner_tags"])

# Freeze the collected tags into an ordered list.
targets = list(targets)
targets.sort()
print("Unique targets:", len(targets))
targets

In [None]:
# TASK: Using Hugging Face models, find the best-performing model.
#       You could try multilingual models or another Ukrainian model.
#       HF models - https://huggingface.co/models
#       Examples: `nikitast/lang-segmentation-roberta`, `wietsedv/xlm-roberta-base-ft-udpos28-uk`, `google-bert/bert-base-multilingual-cased` etc.
# The same `model_id` is used later to load the model weights.
model_id = 'ukr-models/uk-ner'

# Tokenizer matching the chosen checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_id)

In [None]:
# Bare expression: display the tokenizer's repr for inspection.
tokenizer

In [None]:
# Sanity check on one training example: print the original words, the token ids,
# the word-level NER tags, the token strings and the token -> word index mapping
# (the mapping is what the label alignment below relies on).
sample = dataset["train"][20]
tmp = tokenizer(sample["tokens"], truncation=True, is_split_into_words=True)

print(">>", sample["tokens"])
print(">>", tmp["input_ids"])
print(">>", sample["ner_tags"])
# Use the public tokenizer API rather than reaching into the private `_tokenizer` backend.
print(">>", tokenizer.convert_ids_to_tokens(tmp["input_ids"]))
print(">>", tmp.word_ids())

## Datasets & DataLoaders

In [None]:
def tokenize_and_align(
    sample: Mapping[str, List],
    label_all_tokens: bool = False,
) -> Tuple[List[int], List[int], List[int], List[int]]:
    """Tokenize a pre-split sentence and align its word-level NER tags with the tokens.

    The module-level `tokenizer` is used for tokenization.

    Args:
        sample: record with a `"tokens"` entry (the words of the sentence) and
            a `"ner_tags"` entry (one tag per word).
        label_all_tokens: when `False` (default) only the first token of each word
            receives the word's tag and the remaining tokens of that word get `-100`;
            when `True` every token of a word receives the word's tag.

    Returns:
        Tuple `(input_ids, word_ids, attention_mask, label_ids)`; all four lists have
        one entry per token. `word_ids` holds `None` for special tokens, and
        `label_ids` holds `-100` for every position that must be ignored by the loss.
    """
    words = sample["tokens"]
    ner_tags = sample["ner_tags"]

    tokenized_input = tokenizer(words, truncation=True, is_split_into_words=True)
    # Computed once and reused both for the alignment and for the return value.
    word_ids = tokenized_input.word_ids()

    # NOTE: The modern approach to NER when annotations are given per word is to split
    #       the words into tokens, put the NER label only on the first token of each word
    #       and ignore the word's other tokens. For example, given:
    #       Words:
    #         ['Вони', 'абсолютно', 'відповідають', 'Глобальному', 'договору', 'та', 'Цілям', 'сталого', 'розвитку', 'ООН', '.']
    #       NER labels:
    #         [     0,           0,              0,             7,          8,    8,       8,         8,          8,     8,   0]
    #       After tokenizing the words you get output like this (special tokens omitted):
    #         [13825, 10241, 30086, 11358, 3151, 23012, 105, 15168, 489, 7414, 19406, 7275, 695, 5743, 16644, 6, 5]
    #       And the word id of each of these token ids is:
    #         [0,     1,     2,     3,     3,    3,     3,   4,     5,   6,    6,     7,    7,   8,    9,    10, 10]
    #       We see that word 3 consists of the tokens [11358, 3151, 23012, 105] and so on.
    #       So the "modern" approach to token alignment produces:
    #         [0,     0,     0,     7,  -100, -100,  -100,   8,     8,   8, -100,     8, -100,   8,    8,     0, -100]

    prev_word_index = None
    label_ids = []
    for word_index in word_ids:
        if word_index is None:
            # Special tokens have no word id; -100 marks them as ignored.
            label_ids.append(-100)
        elif label_all_tokens or word_index != prev_word_index:
            # First token of a word (or any token when `label_all_tokens` is set).
            label_ids.append(ner_tags[word_index])
        else:
            # Continuation token of the current word: ignored.
            label_ids.append(-100)
        prev_word_index = word_index

    return tokenized_input["input_ids"], word_ids, tokenized_input["attention_mask"], label_ids


def dataset_mapping_fn(sample: Mapping[str, List[int]]) -> Mapping[str, List[int]]:
    """Attach tokenization results to a dataset record.

    Args:
        sample: record that is passed to `tokenize_and_align`.

    Returns:
        The same record object, extended in place with the `"input_ids"`,
        `"word_numbers"`, `"attention_mask"` and `"label_ids"` entries.
    """
    input_ids, word_numbers, attention_mask, label_ids = tokenize_and_align(sample)
    sample["input_ids"] = input_ids
    sample["word_numbers"] = word_numbers
    sample["attention_mask"] = attention_mask
    sample["label_ids"] = label_ids
    return sample

In [None]:
# Run `dataset_mapping_fn` over the dataset so each example gains the
# "input_ids", "word_numbers", "attention_mask" and "label_ids" entries.
# NOTE: this rebinds `dataset`; later cells see the augmented version.
dataset = dataset.map(dataset_mapping_fn)
dataset

In [None]:
class NERDataset(Dataset):
    """Torch dataset wrapper that turns tokenized records into tensors.

    Each wrapped record must provide the `"input_ids"`, `"attention_mask"` and
    `"label_ids"` entries.
    """

    def __init__(self, dataset: datasets.Dataset) -> None:
        """Store the wrapped dataset; records are converted lazily on access."""
        self.dataset = dataset

    def __len__(self) -> int:
        """Number of records in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx: int) -> Tuple[Tuple[torch.LongTensor, torch.LongTensor], torch.LongTensor]:
        """Return `((input_ids, attention_mask), label_ids)` for record `idx` as long tensors."""
        sample = self.dataset[idx]
        # Model inputs are grouped in a pair, labels are returned separately.
        x = torch.LongTensor(sample["input_ids"]), torch.LongTensor(sample["attention_mask"])
        y = torch.LongTensor(sample["label_ids"])
        return x, y

In [None]:
def collator(
    batch: List[Tuple[Tuple[torch.LongTensor, torch.LongTensor], torch.LongTensor]],
    pad_token: int,
) -> Tuple[Mapping[str, torch.LongTensor], torch.LongTensor]:
    """Pad a list of variable-length examples into batch tensors.

    Args:
        batch: examples shaped as `((input_ids, attention_mask), label_ids)`,
            each element being a 1-D tensor.
        pad_token: value used to pad `input_ids`.

    Returns:
        Tuple `(inputs, label_ids)` where `inputs` is a dict with the padded
        `"input_ids"` and `"attention_mask"` tensors (batch first) and `label_ids`
        is the padded label tensor. Attention masks are padded with 0 and labels
        with -100.
    """
    # Split the batch into its three parallel sequences in a single pass.
    id_seqs, mask_seqs, label_seqs = [], [], []
    for (ids, mask), labels in batch:
        id_seqs.append(ids)
        mask_seqs.append(mask)
        label_seqs.append(labels)

    input_ids = pad_sequence(id_seqs, batch_first=True, padding_value=pad_token)
    attention_mask = pad_sequence(mask_seqs, batch_first=True, padding_value=0)
    label_ids = pad_sequence(label_seqs, batch_first=True, padding_value=-100)
    return {"input_ids": input_ids, "attention_mask": attention_mask}, label_ids

In [None]:
batch_size = 8
# `os.cpu_count()` returns None when the CPU count cannot be determined, and
# DataLoader needs an integer; 0 means "load in the main process".
n_workers = os.cpu_count() or 0
dataset_collator = functools.partial(collator, pad_token=tokenizer.pad_token_id)


def _make_loader(split: str, shuffle: bool, drop_last: bool) -> DataLoader:
    """Build a DataLoader over one dataset split with the shared batch settings."""
    return DataLoader(
        NERDataset(dataset[split]),
        batch_size=batch_size,
        num_workers=n_workers,
        collate_fn=dataset_collator,
        shuffle=shuffle,
        drop_last=drop_last,
    )


def _report_loader(title: str, loader: DataLoader) -> None:
    """Print the dataset size and the number of batches of a loader."""
    print("{}\n dataset size: {}\n  num batches: {}".format(title, len(loader.dataset), len(loader)))


# Training data is shuffled and the last incomplete batch is dropped.
train_loader = _make_loader("train", shuffle=True, drop_last=True)
_report_loader("Train", train_loader)
print()
# Evaluation loaders keep the original order and every example.
valid_loader = _make_loader("validation", shuffle=False, drop_last=False)
_report_loader("Validation", valid_loader)
print()
test_loader = _make_loader("test", shuffle=False, drop_last=False)
_report_loader("Test", test_loader)

## Training & Evaluation

In [None]:
def sequence_f1(true_labels: np.ndarray, predicted_labels: np.ndarray) -> np.ndarray:
    """F1 score for one sequence.

    The scores are computed for every class listed in the module-level `targets`.
    A class with no true positives (including a class that is absent from both
    sequences) gets a score of 0.

    Args:
        true_labels: ground truth labels (1-D array-like).
        predicted_labels: model predictions (1-D array-like of the same length).

    Returns:
        F1 scores for each class, in the order of `targets`.
    """
    # Accept plain lists as well: comparing Python lists with `==` is not elementwise.
    true_labels = np.asarray(true_labels)
    predicted_labels = np.asarray(predicted_labels)
    assert len(true_labels) == len(predicted_labels), "Mismatched length between true labels and predicted labels"

    is_correct = true_labels == predicted_labels
    is_wrong = ~is_correct

    scores = []
    for _cls in targets:
        is_true_cls = true_labels == _cls
        is_pred_cls = predicted_labels == _cls

        true_positives = int(np.sum(is_correct & is_true_cls))
        false_positives = int(np.sum(is_wrong & is_pred_cls))
        false_negatives = int(np.sum(is_wrong & is_true_cls))

        # Guard every division explicitly: 0/0 is defined as 0 here, and no
        # RuntimeWarning is emitted for classes missing from the sequence.
        predicted_total = true_positives + false_positives
        actual_total = true_positives + false_negatives
        precision = true_positives / predicted_total if predicted_total else 0.0
        recall = true_positives / actual_total if actual_total else 0.0
        denominator = precision + recall
        f1_score = 2 * (precision * recall) / denominator if denominator else 0.0

        scores.append(f1_score)
    return np.array(scores)

In [None]:
def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    device: str = "cpu",
    verbose: bool = True,
) -> Mapping[str, np.array]:
    """Run a single optimization pass over `loader`.

    For every batch the model is run forward, the loss is back-propagated, one
    optimizer step is taken, and per-sequence F1 scores are collected on the
    positions whose label is not -100.

    Args:
        model: model to train; its output must expose `.logits`.
        loader: dataloader yielding `(inputs_dict, labels)` batches.
        criterion: loss function to optimize.
        optimizer: model training algorithm.
        device: device to use for training.
            Default is `"cpu"`.
        verbose: option to print training progress bar.
            Default is `True`.

    Returns:
        dict with training logs: `"losses"` holds one value per batch and
        `"f1"` holds one row of per-class scores per sequence.
    """
    model.train()

    batch_losses = []
    sequence_scores = []

    bar_options = dict(desc="training", file=sys.stdout, ncols=100, disable=not verbose)
    with tqdm(total=len(loader), **bar_options) as bar:
        for inputs, labels in loader:
            inputs = {name: tensor.to(device) for name, tensor in inputs.items()}
            labels = labels.to(device)

            optimizer.zero_grad()

            logits = model(**inputs).logits

            # Flatten (batch, time) so the criterion sees one row per token.
            n_sequences, n_tokens = labels.shape
            n_flat = n_sequences * n_tokens
            loss = criterion(logits.view(n_flat, -1), labels.view(n_flat))

            loss.backward()
            batch_losses.append(loss.item())

            # Metrics are computed only on positions that carry a real label.
            predicted = logits.argmax(2).detach().cpu().numpy()
            expected = labels.detach().cpu().numpy()
            is_labeled = expected != -100
            sequence_scores.extend(
                sequence_f1(gold[keep], guess[keep])
                for gold, guess, keep in zip(expected, predicted, is_labeled)
            )

            bar.set_postfix_str(f"loss {batch_losses[-1]:.4f}")

            optimizer.step()

            bar.update(1)

    return {"losses": np.array(batch_losses), "f1": np.array(sequence_scores)}

In [None]:
@torch.inference_mode()
def evaluate(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    device: str = "cpu",
    verbose: bool = True,
) -> Mapping[str, np.array]:
    """Model evaluation.

    Args:
        model: model to evaluate.
        loader: dataloader to use for evaluation.
        criterion: loss function.
        device: device to use for evaluation.
            Default is `"cpu"`.
        verbose: option to print evaluation progress bar.
            Default is `True`.

    Returns:
        dict with evaluation logs
    """
    model.eval()

    losses = []
    scores = []

    for x_batch, y_true in tqdm(loader, desc="evaluation", file=sys.stdout, ncols=100, disable=not verbose):
        x_batch = {k: v.to(device) for k, v in x_batch.items()}
        y_true = y_true.to(device)

        log_prob = model(**x_batch).logits

        B, T = y_true.shape
        loss = criterion(log_prob.view(B * T, -1), y_true.view(B * T))

        losses.append(loss.item())

        y_pred = log_prob.argmax(2).detach().cpu().numpy()
        y_true = y_true.detach().cpu().numpy()
        padding_mask = y_true != -100
        for i in range(y_true.shape[0]):
            scores.append(sequence_f1(y_true[i][padding_mask[i]], y_pred[i][padding_mask[i]]))


    logs = {
        "losses": np.array(losses),
        "f1": np.array(scores)
    }
    return logs

## Training

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device - {device}")

In [None]:
# Load the pretrained token-classification model for `model_id`.
model = AutoModelForTokenClassification.from_pretrained(model_id)
# Seed right before creating the new head so its random initialization is reproducible.
torch.manual_seed(42)
# Replace the classification head with a freshly initialized linear layer that
# outputs one score per tag in `targets`.
# NOTE(review): only the layer is swapped here; nothing in this cell updates the
#               model's config / label count — confirm nothing downstream relies on them.
model.classifier = nn.Linear(model.classifier.in_features, len(targets))
model = model.to(device)
print(model)
print("Number of trainable parameters - {:,}".format(sum(p.numel() for p in model.parameters() if p.requires_grad)))

# Positions labeled -100 (special, continuation and padding tokens) do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=-100)
# NOTE: You can change the learning rate to find a better model.
#       Please be careful - transformer models are sensitive to the learning rate;
#       if the learning rate is too high, the model will not converge.
optimizer = optim.Adam(model.parameters(), lr=2e-5)

In [None]:
# NOTE: you can change the number of epochs to train a better model
n_epochs = 5

# Per-epoch history: mean batch loss and mean per-class F1 (one array per epoch).
train_losses = []
train_scores = []

valid_losses = []
valid_scores = []

# Best mean validation F1 seen so far; drives checkpointing.
best_score = float("-inf")

for ep in range(n_epochs):
    print(f"\nEpoch {ep + 1:2d}/{n_epochs:2d}")

    train_logs = train_one_epoch(model, train_loader, criterion, optimizer, device, verbose=True)
    train_losses.append(np.mean(train_logs["losses"]))
    train_scores.append(np.mean(train_logs["f1"], 0))
    print("      loss:", train_losses[-1])
    print("        f1:", train_scores[-1].mean(), train_scores[-1])


    valid_logs = evaluate(model, valid_loader, criterion, device, verbose=True)
    valid_losses.append(np.mean(valid_logs["losses"]))
    valid_scores.append(np.mean(valid_logs["f1"], 0))
    print("      loss:", valid_losses[-1])
    print("        f1:", valid_scores[-1].mean(), valid_scores[-1])

    # Mean F1 over classes for this epoch; ties with the best score also trigger a save.
    epoch_score = valid_scores[-1].mean()
    if epoch_score >= best_score:
        # Metrics are stored as plain Python floats / lists so that the checkpoint holds
        # no pickled NumPy objects, which `torch.load(..., weights_only=True)` refuses to load.
        # NOTE: the "accuracy" entries hold per-class F1 scores; the key name is kept
        #       unchanged for compatibility with code that reads the checkpoint.
        checkpoint = {
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "epoch": ep,
            "num_epochs": n_epochs,
            "metrics": {
                "training": {"loss": float(train_losses[-1]), "accuracy": train_scores[-1].tolist()},
                "validation": {"loss": float(valid_losses[-1]), "accuracy": valid_scores[-1].tolist()},
            },
        }
        torch.save(checkpoint, "best.pth")
        print("🟢 Saved new best state! 🟢")
        best_score = epoch_score  # update best score to a new one