<a href="https://colab.research.google.com/github/aliyyah-u/NLP_Medical_NER/blob/main/NLP_CW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Load & Preprocess Dataset, Tokenisation, Baseline Model

In [None]:
!pip install -q datasets spacy evaluate
!pip install -q transformers torch
!python -m spacy download en_core_web_sm

from datasets import load_dataset
import spacy
import pandas as pd
from collections import Counter
from sklearn.metrics import classification_report
from transformers import AutoTokenizer, DataCollatorForTokenClassification
import evaluate
import numpy as np
import os
os.environ["WANDB_DISABLED"] = "true"

# Load the disease NER corpus by its Hugging Face dataset id
ds = load_dataset("rjac/biobert-ner-diseases-dataset")
print("\nInitial dataset:",ds)

# Tag names, indexed by the integer ids stored in the "tags" column
label_list = ds["train"].features["tags"].feature.names

# NER tag distribution in training set: flatten the per-sentence tag ids, then count them
all_labels = []
for sentence_tags in ds["train"]["tags"]:
    all_labels.extend(sentence_tags)
label_counts = Counter(all_labels)

print("\nNER tag distribution (train):")
for tag_id, n_tokens in label_counts.items():
    print(label_list[tag_id]+ ":" + str(n_tokens))

# preprocessing (POS tagging): the parser, NER and lemmatizer components are switched off
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner", "lemmatizer"])

def add_pos_tags(batch):
    """Add a ``pos_tags`` column holding one coarse POS tag per original token.

    The sentences in ``batch["tokens"]`` are already tokenised. They are wrapped
    in pre-built ``Doc`` objects instead of being joined into a string, because
    re-tokenising a joined string lets spaCy split or merge tokens differently
    (e.g. around punctuation or hyphens), which leaves ``pos_tags`` with a
    different length than ``tokens`` and misaligns any later
    ``zip(tokens, pos_tags)``.

    Args:
        batch: Mapping with a ``"tokens"`` entry: a list of sentences, each a
            list of token strings (the shape ``datasets.map(batched=True)``
            passes in).

    Returns:
        The same ``batch`` object with ``batch["pos_tags"]`` set to a list of
        per-sentence POS tag lists, each the same length as its sentence.
    """
    # Local import keeps this function self-contained; `spacy` itself is
    # already imported at the top of the file.
    from spacy.tokens import Doc

    # Pre-tokenised docs: nlp.pipe skips its tokenizer for Doc inputs
    # (spaCy >= 3.2), so the original token boundaries are preserved.
    docs = (Doc(nlp.vocab, words=tokens) for tokens in batch["tokens"])
    batch["pos_tags"] = [[token.pos_ for token in doc] for doc in nlp.pipe(docs)]
    return batch

# Attach the POS tag column to every split
ds = ds.map(add_pos_tags, batched=True)

# Flatten the per-sentence POS tags of the training split and count them
all_pos_tags = []
for sentence_pos in ds["train"]["pos_tags"]:
    all_pos_tags.extend(sentence_pos)
pos_counts = Counter(all_pos_tags)

print("\nPOS Tag Distribution in training dataset:")
print(pos_counts.most_common())

# Print spaCy's description of each distinct tag that occurs
for tag in set(all_pos_tags):
    print(tag + ": " + str(spacy.explain(tag)))

# Show the first five tokens/tags/POS tags of the first training example
first_example = ds["train"][0]
print("Dataset sample:")
print({column: first_example[column][:5] for column in ("tokens", "tags", "pos_tags")})

# Tokenisation
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

def tokenize_and_align_labels(examples):
    """Tokenise pre-split sentences and align the word-level tags to sub-tokens.

    Only the first sub-token of each word receives that word's tag. Every other
    position (later sub-tokens of the same word, and positions whose word id is
    ``None``) is labelled ``-100``.

    Args:
        examples: Batch mapping with ``"tokens"`` (lists of words) and
            ``"tags"`` (lists of integer tag ids, one per word).

    Returns:
        The tokenizer output with an added ``"labels"`` entry containing one
        aligned label list per sentence.
    """
    encoded = tokenizer(examples["tokens"], truncation=True, padding=True, is_split_into_words=True)

    aligned = []
    for row, word_tags in enumerate(examples["tags"]):
        row_labels = []
        last_word = None
        for word in encoded.word_ids(batch_index=row):
            # A real label is emitted only where a new word begins
            starts_new_word = word is not None and word != last_word
            row_labels.append(word_tags[word] if starts_new_word else -100)
            last_word = word
        aligned.append(row_labels)

    encoded["labels"] = aligned
    return encoded

# Sub-word tokenise every split and attach the aligned "labels" column
tokenized_ds = ds.map(tokenize_and_align_labels, batched=True)

# baseline model: predict tag id 0 (label_list[0]) for every token
baseline_test = ds["test"]
ds["test"] = baseline_test.add_column(
    "baseline_pred",
    [[0] * len(tokens) for tokens in baseline_test["tokens"]],
)

print("\nBaseline sample predictions:")
# Read a single row instead of materialising whole columns just to take element 0
first_test_row = ds["test"][0]
sample_df = pd.DataFrame({
    "Token": first_test_row["tokens"],
    "POS tag": first_test_row["pos_tags"],
    "True": [label_list[t] for t in first_test_row["tags"]],
    "Pred": [label_list[p] for p in first_test_row["baseline_pred"]]
}).head(20)
print(sample_df)

print("\nBaseline Classification Report:")
true_tags = [t for tags in ds["test"]["tags"] for t in tags]
pred_tags = [p for pred in ds["test"]["baseline_pred"] for p in pred]
# Pass `labels` explicitly: with only `target_names`, the report depends on
# every class occurring in the data and fails (or mislabels rows) otherwise.
print(classification_report(
    true_tags,
    pred_tags,
    labels=list(range(len(label_list))),
    target_names=label_list,
    zero_division=0,
    digits=4,
))

# Simple CRF Model

In [None]:
!pip install -q sklearn-crfsuite

import sklearn_crfsuite
from sklearn_crfsuite import CRF
from sklearn_crfsuite import metrics

# My feature dictionary for all tokens
def extract_token_features(token, pos):
    """Build the feature mapping for a single token.

    Args:
        token: The token text.
        pos: The token's part-of-speech tag.

    Returns:
        A dict with the lower-cased token, three casing/digit flags taken from
        the corresponding ``str`` predicates, and the POS tag.
    """
    feature_names = (
        "word.lower()",
        "word.isupper()",
        "word.istitle()",
        "word.isdigit()",
        "postag",
    )
    feature_values = (
        token.lower(),
        token.isupper(),
        token.istitle(),
        token.isdigit(),
        pos,
    )
    return dict(zip(feature_names, feature_values))

def sent_to_features(tokens, pos_tags):
    """Return one feature dict per (token, POS tag) pair of a sentence.

    Pairing stops at the shorter of the two sequences.
    """
    return list(map(extract_token_features, tokens, pos_tags))

# method to extract label names from tag values
def sent_to_labels(tag_ids):
    """Translate a sentence's integer tag ids into their names from ``label_list``."""
    names = []
    for tag_id in tag_ids:
        names.append(label_list[tag_id])
    return names

# prepare data
# prepare data: per-sentence feature dicts (X) and tag-name sequences (y) for each split
crf_train_split = ds["train"]
crf_test_split = ds["test"]

X_train = [
    sent_to_features(*sentence)
    for sentence in zip(crf_train_split["tokens"], crf_train_split["pos_tags"])
]
y_train = list(map(sent_to_labels, crf_train_split["tags"]))  # convert id2str

X_test = [
    sent_to_features(*sentence)
    for sentence in zip(crf_test_split["tokens"], crf_test_split["pos_tags"])
]
y_test = list(map(sent_to_labels, crf_test_split["tags"]))

# CRF model with the library's default settings
crf = CRF()
crf.fit(X_train, y_train)
y_pred = crf.predict(X_test)

print("\nCRF Classification Report:")
crf_report = metrics.flat_classification_report(y_test, y_pred, digits=4)
print(crf_report)

# DistilBERT Model

In [None]:
from transformers import DistilBertForTokenClassification, DistilBertTokenizerFast, Trainer, TrainingArguments
import numpy as np
import torch

# use GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Map class ids <-> tag names and store them in the model config, so the saved
# checkpoint reports real tag names instead of generic LABEL_0, LABEL_1, ...
id2label = dict(enumerate(label_list))
label2id = {name: idx for idx, name in id2label.items()}

# Load model and tokenizer
model = DistilBertForTokenClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=len(label_list),
    id2label=id2label,
    label2id=label2id,
).to(device)
tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")

# Collator that batches the tokenised examples for token classification
data_collator = DataCollatorForTokenClassification(tokenizer)

train_dataset = tokenized_ds["train"]
# NOTE(review): the test split is also used for per-epoch evaluation during
# training; consider a separate validation split if one exists.
eval_dataset = tokenized_ds["test"]

training_args = TrainingArguments(
    output_dir="./ner_output",
    eval_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    learning_rate=2e-5,
    weight_decay=0.01,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
)

trainer.train()

results = trainer.evaluate()
print("Eval Results:", results)

# Model outputs and gold label ids for the evaluation split
predictions, label_ids, _ = trainer.predict(eval_dataset)
pred_ids = np.argmax(predictions, axis=2)

# Positions labelled -100 (non-initial sub-tokens and positions with no word)
# carry no gold tag. Indexing label_list with -100 raises IndexError on a
# short label list, so those positions are mapped to None instead.
IGNORED_LABEL_ID = -100

true_labels = [
    [label_list[l] if l != IGNORED_LABEL_ID else None for l in sequence]
    for sequence in label_ids
]
pred_labels = [[label_list[l] for l in sequence] for sequence in pred_ids]

# Flatten to token level, keeping only positions that have a gold tag
flat_true = []
flat_pred = []
for true_sequence, pred_sequence in zip(true_labels, pred_labels):
    for true_tag, pred_tag in zip(true_sequence, pred_sequence):
        if true_tag is not None:
            flat_true.append(true_tag)
            flat_pred.append(pred_tag)

print(classification_report(flat_true, flat_pred, digits=4))

# save model: write the fine-tuned model and its tokenizer to Google Drive
from google.colab import drive

save_path = "/content/drive/My Drive/Colab Notebooks/CW_MedNER_UG/distilbert_ner_model"

# Drive must be mounted before the target directory can be created
drive.mount('/content/drive')
os.makedirs(save_path, exist_ok=True)

# Model first, then tokenizer, both into the same directory
for artifact in (model, tokenizer):
    artifact.save_pretrained(save_path)

print("Model saved to:" + str(save_path))

# uncomment code below to load model:
# from transformers import DistilBertForTokenClassification, DistilBertTokenizerFast
# model = DistilBertForTokenClassification.from_pretrained(save_path)
# tokenizer = DistilBertTokenizerFast.from_pretrained(save_path)