In [52]:
!pip install evaluate



In [53]:
import os
from typing import List, Dict, Tuple

class Preprocessing_Maccrobat:
    def __init__(self, dataset_folder, tokenizer):
        self.file_ids = [f.split(".")[0] for f in os.listdir(dataset_folder) if f.endswith('.txt')]
        self.text_files = [f + ".txt" for f in self.file_ids]
        self.anno_files = [f + ".ann" for f in self.file_ids]
        self.num_samples = len(self.file_ids)
        
        # Load text data
        self.texts: List[str] = []
        for file in self.text_files:
            with open(os.path.join(dataset_folder, file), 'r') as f:
                self.texts.append(f.read())
        
        # Load annotations
        self.tags: List[Dict[str, str]] = []
        for file in self.anno_files:
            with open(os.path.join(dataset_folder, file), 'r') as f:
                text_bound_ann = [t.split("\t") for t in f.read().split("\n") if t.startswith("T")]
                text_bound_lst = []
                for text_b in text_bound_ann:
                    label = text_b[1].split(" ")
                    try:
                        _ = int(label[1])
                        _ = int(label[2])
                        tag = {
                            "text": text_b[-1],
                            "label": label[0],
                            "start": label[1],
                            "end": label[2]
                        }
                        text_bound_lst.append(tag)
                    except:
                        pass
                self.tags.append(text_bound_lst)
        
        self.tokenizer = tokenizer

    def process(self) -> Tuple[List[List[str]], List[List[str]]]:
        input_texts, input_labels = [], []
        
        for idx in range(self.num_samples):
            full_text = self.texts[idx]
            tags = self.tags[idx]

            label_offset, continuous_label_offset = [], []
            for tag in tags:
                offset = list(range(int(tag["start"]), int(tag["end"]) + 1))
                label_offset.append(offset)
                continuous_label_offset.extend(offset)
            
            all_offset = list(range(len(full_text)))
            zero_offset = [offset for offset in all_offset if offset not in continuous_label_offset]
            zero_offset = Preprocessing_Maccrobat.find_continuous_ranges(zero_offset)

            self.tokens, self.labels = [], []
            self._merge_offset(full_text, tags, zero_offset, label_offset)

            assert len(self.tokens) == len(self.labels), "Length of tokens and labels are not equal"

            input_texts.append(self.tokens)
            input_labels.append(self.labels)

        return input_texts, input_labels

    def _merge_offset(self, full_text, tags, zero_offset, label_offset):
        i = j = 0
        while i < len(zero_offset) and j < len(label_offset):
            if zero_offset[i][0] < label_offset[j][0]:
                self._add_zero(full_text, zero_offset, i)
                i += 1
            else:
                self._add_label(full_text, label_offset, j, tags)
                j += 1

        while i < len(zero_offset):
            self._add_zero(full_text, zero_offset, i)
            i += 1

        while j < len(label_offset):
            self._add_label(full_text, label_offset, j, tags)
            j += 1

    def _add_zero(self, full_text, offset, index):
        start, *_, end = offset[index] if len(offset[index]) > 1 else (offset[index][0], offset[index][0] + 1)
        text = full_text[start:end]
        text_tokens = self.tokenizer.tokenize(text)

        self.tokens.extend(text_tokens)
        self.labels.extend(["O"] * len(text_tokens))

    def _add_label(self, full_text, offset, index, tags):
        start, *_, end = offset[index] if len(offset[index]) > 1 else (offset[index][0], offset[index][0] + 1)
        text = full_text[start:end]
        text_tokens = self.tokenizer.tokenize(text)

        self.tokens.extend(text_tokens)
        self.labels.extend(
            [f"B-{tags[index]['label']}"] + [f"I-{tags[index]['label']}"] * (len(text_tokens) - 1)
        )

    @staticmethod
    def build_label2id(tokens: List[List[str]]):
        label2id = {}
        id_counter = 0
        for token in [token for sublist in tokens for token in sublist]:
            if token not in label2id:
                label2id[token] = id_counter
                id_counter += 1
        return label2id
        
    @staticmethod
    def find_continuous_ranges(data: List[int]):
        if not data:
            return []
        ranges = []
        start = prev = data[0]
        for number in data[1:]:
            if number != prev + 1:
                ranges.append(list(range(start, prev + 1)))
                start = number
            prev = number
        ranges.append(list(range(start, prev + 1)))
        return ranges


In [55]:
# Preprocessing
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("d4data/biomedical-ner-all")

dataset_folder = "/kaggle/input/maccrobat2018"

Maccrobat_builder = Preprocessing_Maccrobat(dataset_folder, tokenizer)
input_texts, input_labels = Maccrobat_builder.process()

label2id = Preprocessing_Maccrobat.build_label2id(input_labels)
id2label = {v: k for k, v in label2id.items()}


In [56]:
# Split
from sklearn.model_selection import train_test_split

inputs_train, inputs_val, labels_train, labels_val = train_test_split(
    input_texts,
    input_labels,
    test_size=0.2,
    random_state=42
)

In [61]:
import torch
from torch.utils.data import Dataset

MAX_LEN = 512

class NER_Dataset(Dataset):
    def __init__(self, input_texts, input_labels, tokenizer, label2id, max_len=MAX_LEN):
        super().__init__()
        self.tokens = input_texts
        self.labels = input_labels
        self.tokenizer = tokenizer
        self.label2id = label2id
        self.max_len = max_len

    def __len__(self):
        return len(self.tokens)

    def __getitem__(self, idx):
        input_token = self.tokens[idx]
        label_token = [self.label2id[label] for label in self.labels[idx]]

        input_token = self.tokenizer.convert_tokens_to_ids(input_token)
        attention_mask = [1] * len(input_token)

        input_ids = self.pad_and_truncate(input_token, pad_id=self.tokenizer.pad_token_id)
        labels = self.pad_and_truncate(label_token, pad_id=0)
        attention_mask = self.pad_and_truncate(attention_mask, pad_id=0)

        return {
            "input_ids": torch.as_tensor(input_ids),
            "labels": torch.as_tensor(labels),
            "attention_mask": torch.as_tensor(attention_mask)
        }

    def pad_and_truncate(self, inputs: List[int], pad_id: int):
        if len(inputs) < self.max_len:
            padded_inputs = inputs + [pad_id] * (self.max_len - len(inputs))
        else:
            padded_inputs = inputs[:self.max_len]
        return padded_inputs

    def label2id(self, labels: List[str]):
        return [self.label2id[label] for label in labels]

train_set = NER_Dataset(inputs_train, labels_train, tokenizer, label2id)
val_set = NER_Dataset(inputs_val, labels_val, tokenizer, label2id)

In [62]:
from transformers import AutoModelForTokenClassification
model = AutoModelForTokenClassification.from_pretrained(
    "d4data/biomedical-ner-all",
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes=True
)

Some weights of DistilBertForTokenClassification were not initialized from the model checkpoint at d4data/biomedical-ner-all and are newly initialized because the shapes did not match:
- classifier.bias: found shape torch.Size([84]) in the checkpoint and torch.Size([83]) in the model instantiated
- classifier.weight: found shape torch.Size([84, 768]) in the checkpoint and torch.Size([83, 768]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [63]:
import evaluate
import numpy as np

accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    mask = labels != 0
    predictions = np.argmax(predictions, axis=-1)
    return accuracy.compute(predictions=predictions[mask], references=labels[mask])

In [64]:
from transformers import TrainingArguments, Trainer

training_args = TrainingArguments(
    output_dir="out_dir",
    learning_rate=1e-4,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=20,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    optim="adamw_torch"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_set,
    eval_dataset=val_set,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

trainer.train()

Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).
  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Accuracy
1,No log,2.493638,0.10962
2,No log,1.930282,0.248239
3,No log,1.481573,0.44577
4,No log,1.16536,0.551993
5,No log,0.969479,0.631121
6,No log,0.852793,0.684067
7,No log,0.778048,0.721269
8,No log,0.737211,0.736184
9,No log,0.71701,0.733201
10,No log,0.693431,0.749275




TrainOutput(global_step=100, training_loss=0.7360022735595703, metrics={'train_runtime': 144.0277, 'train_samples_per_second': 22.218, 'train_steps_per_second': 0.694, 'total_flos': 418702245888000.0, 'train_loss': 0.7360022735595703, 'epoch': 20.0})

In [65]:
test_sentence = """A 48 year -old female presented with vaginal bleeding and abnormal
Pap smears. Upon diagnosis of invasive non -keratinizing SCC of the cervix , she
underwent a radical hysterectomy with salpingo -oophorectomy which demonstrated
positive spread to the pelvic lymph nodes and the parametrium. Pathological
examination revealed that the tumour also extensively involved the lower uterine
segment. """

# tokenization
_input = torch.as_tensor([tokenizer.convert_tokens_to_ids(test_sentence.split())])

_input = _input.to("cuda")

# prediction
outputs = model(_input)
_, preds = torch.max(outputs.logits, -1)
preds = preds[0].cpu().numpy()

# decode
for token, pred in zip(test_sentence.split(), preds):
    print(f"{token}\t{id2label[pred]}")

A	O
48	B-Date
year	I-Date
-old	I-Age
female	B-Sex
presented	O
with	O
vaginal	B-Detailed_description
bleeding	B-Sign_symptom
and	O
abnormal	B-Lab_value
Pap	B-Lab_value
smears.	B-Lab_value
Upon	O
diagnosis	O
of	O
invasive	B-Detailed_description
non	O
-keratinizing	O
SCC	O
of	O
the	O
cervix	O
,	O
she	O
underwent	O
a	O
radical	O
hysterectomy	B-Lab_value
with	O
salpingo	B-Lab_value
-oophorectomy	I-Detailed_description
which	O
demonstrated	O
positive	B-Lab_value
spread	O
to	O
the	O
pelvic	B-Biological_structure
lymph	I-Biological_structure
nodes	O
and	O
the	O
parametrium.	B-Diagnostic_procedure
Pathological	I-Diagnostic_procedure
examination	B-Diagnostic_procedure
revealed	O
that	O
the	O
tumour	O
also	O
extensively	O
involved	O
the	O
lower	O
uterine	O
segment.	I-Detailed_description
