In [1]:
import os
import evaluate
import numpy as np
from typing import List, Dict, Tuple

import torch
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer

In [2]:
class PreprocessingMaccrobat:
    def __init__(self, dataset_folder, tokenizer):
        self.file_ids = [f.split('.')[0] for f in os.listdir(dataset_folder) if f.endswith('.txt')]

        self.text_files = [f + '.txt' for f in self.file_ids]
        self.anno_files = [f + '.ann' for f in self.file_ids]

        self.num_samples = len(self.file_ids)

        self.texts: List[str] = []
        for i in range(self.num_samples):
            file_path = os.path.join(dataset_folder, self.text_files[i])
            with open(file_path, 'r') as f:
                self.texts.append(f.read())

        self.tags: List[Dict[str, str]] = []
        for i in range(self.num_samples):
            file_path = os.path.join(dataset_folder, self.anno_files[i])
            with open(file_path, 'r') as f:
                text_bound_ann = [t.split('\t') for t in f.read().split('\n') if t.startswith('T')]
                text_bound_lst = []
                for text_b in text_bound_ann:
                    label = text_b[1].split(' ')
                    try:
                        _ = int(label[1])
                        _ = int(label[2])
                        tag = {
                            'text': text_b[-1],
                            'label': label[0],
                            'start': label[1],
                            'end': label[2]
                        }
                        text_bound_lst.append(tag)
                    except:
                        pass

                self.tags.append(text_bound_lst)
        self.tokenizer = tokenizer

    def process(self) -> Tuple[List[List[str]], List[List[str]]]:
        input_texts = []
        input_labels = []

        for idx in range(self.num_samples):
            full_text = self.texts[idx]
            tags = self.tags[idx]

            label_offset = []
            continous_label_offset = []
            for tag in tags:
                offset = list(range(int(tag['start']), int(tag['end']) + 1))
                label_offset.append(offset)
                continous_label_offset.extend(offset)

            all_offset = list(range(len(full_text)))
            zero_offset = [offset for offset in all_offset if offset not in continous_label_offset]
            zero_offset = PreprocessingMaccrobat.find_continous_range(zero_offset)

            self.tokens = []
            self.labels = []
            self._merge_offset(full_text, tags, zero_offset, label_offset)
            assert len(self.tokens) == len(self.labels), f'Length of tokens and labels are not equal'

            input_texts.append(self.tokens)
            input_labels.append(self.labels)

        return input_texts, input_labels

    def _merge_offset(self, full_text, tags, zero_offset, label_offset):
        i = j = 0
        while i < len(zero_offset) and j < len(label_offset):
            if zero_offset[i][0] < label_offset[j][0]:
                self._add_zero(full_text, zero_offset, i)
                i += 1
            else:
                self._add_label(full_text, label_offset, j, tags)
                j += 1

        while i < len(zero_offset):
            self._add_zero(full_text, zero_offset, i)
            i += 1

        while j < len(label_offset):
            self._add_label(full_text, label_offset, j, tags)
            j += 1

    def _add_zero(self, full_text, offset, index):
        start, *_, end = offset[index] if len(offset[index]) > 1 else (offset[index][0], offset[index][0] + 1)
        text = full_text[start:end]
        text_tokens = self.tokenizer.tokenize(text)

        self.tokens.extend(text_tokens)
        self.labels.extend(
            ['0'] * len(text_tokens)
        )

    def _add_label(self, full_text, offset, index, tags):
        start, *_, end = offset[index] if len(offset[index]) > 1 else (offset[index][0], offset[index][0] + 1)
        text = full_text[start:end]
        text_tokens = self.tokenizer.tokenize(text)

        self.tokens.extend(text_tokens)
        self.labels.extend(
            [f'B-{tags[index]["label"]}'] + [f'I-{tags[index]["label"]}'] * (len(text_tokens) - 1)
        )

    @staticmethod
    def build_label2id(tokens: List[List[str]]):
        label2id = {}
        id_counter = 0
        for token in [token for sublist in tokens for token in sublist]:
            if token not in label2id:
                label2id[token] = id_counter
                id_counter += 1
        return label2id

    @staticmethod
    def find_continous_range(data: List[int]):
        if not data:
            return []
        ranges = []
        start = data[0]
        prev = data[0]
        for number in data[1:]:
            if number != prev + 1:
                ranges.append(list(range(start, prev + 1)))
                start = number
            prev = number
        ranges.append(list(range(start, prev + 1)))
        return ranges

In [3]:
tokenizer = AutoTokenizer.from_pretrained('d4data/biomedical-ner-all')

dataset_folder = './MACCROBAT2020'

maccrobat_builder = PreprocessingMaccrobat(dataset_folder, tokenizer)
input_texts, input_labels = maccrobat_builder.process()

label2id = PreprocessingMaccrobat.build_label2id(input_labels)
id2label = {v: k for k, v in label2id.items()}

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/373 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/711k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

In [4]:
inputs_train, inputs_val, labels_train, labels_val = train_test_split(
    input_texts,
    input_labels,
    test_size=0.2,
    random_state=42
)

In [5]:
MAX_LEN = 512

class NERDataset(Dataset):
    def __init__(self, input_texts, input_labels, tokenizer, label2id, max_len=MAX_LEN):
        super().__init__()
        self.tokens = input_texts
        self.labels = input_labels
        self.tokenizer = tokenizer
        self.label2id = label2id
        self.max_len = max_len

    def __len__(self):
        return len(self.tokens)

    def __getitem__(self, idx):
        input_token = self.tokens[idx]
        label_token = [self.label2id[label] for label in self.labels[idx]]

        input_token = self.tokenizer.convert_tokens_to_ids(input_token)
        attention_mask = [1] * len(input_token)

        input_ids = self.pad_and_truncate(input_token, pad_id=self.tokenizer.pad_token_id)
        labels = self.pad_and_truncate(label_token, pad_id=0)
        attention_mask = self.pad_and_truncate(attention_mask, pad_id=0)

        return {
            'input_ids': torch.as_tensor(input_ids),
            'labels': torch.as_tensor(labels),
            'attention_mask': torch.as_tensor(attention_mask)
        }

    def pad_and_truncate(self, inputs: List[int], pad_id: int):
        if len(inputs) < self.max_len:
            padded_inputs = inputs + [pad_id] * (self.max_len - len(inputs))
        else:
            padded_inputs = inputs[:self.max_len]
        return padded_inputs

    def label2id(self, labels: List[str]):
        return [self.label2id[label] for label in labels]

In [6]:
train_set = NERDataset(inputs_train, labels_train, tokenizer, label2id)
val_set = NERDataset(inputs_val, labels_val, tokenizer, label2id)

In [7]:
model = AutoModelForTokenClassification.from_pretrained(
    'd4data/biomedical-ner-all',
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes=True
)

config.json:   0%|          | 0.00/5.00k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/266M [00:00<?, ?B/s]

Some weights of DistilBertForTokenClassification were not initialized from the model checkpoint at d4data/biomedical-ner-all and are newly initialized because the shapes did not match:
- classifier.bias: found shape torch.Size([84]) in the checkpoint and torch.Size([83]) in the model instantiated
- classifier.weight: found shape torch.Size([84, 768]) in the checkpoint and torch.Size([83, 768]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [8]:
accuracy = evaluate.load('accuracy')

def compute_metrics(eval_pred):
    predictions, label = eval_pred
    mask = label != 0
    predictions = np.argmax(predictions, -1)
    return accuracy.compute(predictions=predictions[mask], references=label[mask])

Downloading builder script:   0%|          | 0.00/4.20k [00:00<?, ?B/s]

In [9]:
training_args = TrainingArguments(
    output_dir='out_dir',
    report_to='none',
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=20,
    eval_strategy='epoch',
    save_strategy='epoch',
    load_best_model_at_end=True,
    optim='adamw_torch'
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_set,
    eval_dataset=val_set,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

trainer.train()

  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Accuracy
1,No log,1.881167,0.330678
2,No log,1.230813,0.549387
3,No log,0.899778,0.657415
4,No log,0.739923,0.711128
5,No log,0.656077,0.743082
6,No log,0.598652,0.773837
7,No log,0.578896,0.778292
8,No log,0.559807,0.786259
9,No log,0.547302,0.796368
10,No log,0.548069,0.79671


TrainOutput(global_step=200, training_loss=0.6099908447265625, metrics={'train_runtime': 272.5364, 'train_samples_per_second': 11.742, 'train_steps_per_second': 0.734, 'total_flos': 418702245888000.0, 'train_loss': 0.6099908447265625, 'epoch': 20.0})

In [10]:
test_sentence = """A 48 year - old female presented with vaginal bleeding and abnormal Pap smears .
Upon diagnosis of invasive non - keratinizing SCC of the cervix ,
she underwent a radical hysterectomy with salpingo - oophorectomy
which demonstrated positive spread to the pelvic lymph nodes and the parametrium .
Pathological examination revealed that the tumour also extensively involved the lower uterine segment .
"""
input = torch.as_tensor([tokenizer.convert_tokens_to_ids(test_sentence.split())])
input = input.to('cuda')

outputs = model(input)
_, preds = torch.max(outputs.logits, -1)
preds = preds[0].cpu().numpy()

for token, pred in zip(test_sentence.split(), preds):
    print(f'{token}\t{id2label[pred]}')

A	0
48	B-Age
year	I-Age
-	I-Age
old	I-Age
female	B-Sex
presented	0
with	0
vaginal	0
bleeding	B-Sign_symptom
and	0
abnormal	B-Detailed_description
Pap	0
smears	0
.	0
Upon	0
diagnosis	0
of	0
invasive	B-Detailed_description
non	0
-	I-Detailed_description
keratinizing	I-Detailed_description
SCC	0
of	0
the	0
cervix	0
,	0
she	0
underwent	0
a	0
radical	0
hysterectomy	B-Detailed_description
with	0
salpingo	B-Detailed_description
-	I-Detailed_description
oophorectomy	I-Detailed_description
which	0
demonstrated	0
positive	B-Lab_value
spread	0
to	0
the	0
pelvic	0
lymph	0
nodes	0
and	0
the	0
parametrium	0
.	0
Pathological	0
examination	0
revealed	0
that	0
the	0
tumour	0
also	0
extensively	0
involved	0
the	0
lower	0
uterine	0
segment	0
.	0
