In [None]:
import numpy as np
import os
import pandas as pd
import seaborn as sns
from ast import literal_eval
import itertools
from itertools import chain
from ast import literal_eval 

import torch
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.model_selection import train_test_split

from transformers import AutoTokenizer, AutoModel

# Suppress warnings
import warnings
warnings.filterwarnings("ignore")

# Disable parallelism inside the Hugging Face tokenizers library.
os.environ['TOKENIZERS_PARALLELISM'] = "false"

# Use the GPU when one is available, otherwise fall back to the CPU so that
# every later `.to(device)` call still works on a CPU-only machine.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

https://www.kaggle.com/code/iamsdt/pytorch-bert-baseline-nbme/notebook

https://www.kaggle.com/code/odins0n/nbme-detailed-eda

https://www.kaggle.com/code/tomohiroh/nbme-bert-for-beginners

In [None]:
# Directory that holds patient_notes.csv, features.csv and train.csv (read by load_and_prepare).
BASE_PATH= '../input/nbme-score-clinical-patient-notes'

def preprocess_txt(text):
    """Normalise history abbreviations and lower-case the text.

    The trailing "x"/"X" of FHx, PMHx and SHx is swapped for a single space,
    so each replacement leaves the string length unchanged.

    Args:
        text: the raw string (a patient note or a feature description).

    Returns:
        The normalised, lower-cased string.
    """
    abbreviation_map = (
        ('FHx', 'FH '),
        ('FHX', 'FH '),
        ('PMHx', 'PMH '),
        ('PMHX', 'PMH '),
        ('SHx', 'SH '),
        ('SHX', 'SH '),
    )
    for pattern, substitute in abbreviation_map:
        text = text.replace(pattern, substitute)
    return text.lower()

def load_and_prepare(path):
    """Read the three competition CSVs and build one annotated training frame.

    train.csv is left-joined with patient_notes.csv and then features.csv on
    their shared columns. The note and feature texts are normalised with
    preprocess_txt, the string-encoded `annotation` / `location` columns are
    parsed into Python lists, and rows whose annotation is "[]" are dropped.

    Args:
        path: directory containing patient_notes.csv, features.csv and train.csv.

    Returns:
        A DataFrame with a fresh 0..n-1 index and the extra columns
        `annotation_list` and `location_list`.
    """
    notes_table, features_table, train_table = (
        pd.read_csv(path + file_name)
        for file_name in ("/patient_notes.csv", "/features.csv", "/train.csv")
    )
    merged = train_table.merge(notes_table, how="left").merge(features_table, how="left")

    for text_column in ('pn_history', 'feature_text'):
        merged[text_column] = merged[text_column].apply(preprocess_txt)

    # The CSV stores Python list literals as strings; parse them into real lists.
    for raw_column, parsed_column in (("annotation", "annotation_list"),
                                      ("location", "location_list")):
        merged[parsed_column] = [literal_eval(raw) for raw in merged[raw_column]]

    has_annotation = merged["annotation"] != "[]"
    return merged.loc[has_annotation].copy().reset_index(drop=True)


Exploring the dataset:
We can see there are 10 unique cases (case_num runs from 0 to 9, and each note belongs to exactly one case). What we are using to train the model are: the case_num, the pn_history (the detailed history of the patient, looked up through pn_num) and the feature_text (symptoms of the patient). What we want to predict is the annotation and its location.

Description from hosts

train.csv - Feature annotations for 1000 of the patient notes, 100 for each of ten cases.
id - Unique identifier for each patient note / feature pair.
pn_num - The patient note annotated in this row.
feature_num - The feature annotated in this row.
case_num - The case to which this patient note belongs.
annotation - The text(s) within a patient note indicating a feature. A feature may be indicated multiple times within a single note.
location - Character spans indicating the location of each annotation within the note. Multiple spans may be needed to represent an annotation, in which case the spans are delimited by a semicolon ;.

In [None]:
# Merged and preprocessed training frame; rows with an empty annotation are already removed.
train_df= load_and_prepare(BASE_PATH)

Exploring the annotations for a particular patient using spacy:

In [None]:
import spacy
from spacy import displacy

# Visualise the annotated spans of ONE patient note. Character offsets are only
# meaningful relative to the note they were annotated on, so the entities and
# the displayed text must come from the same pn_num.
case_df = train_df[train_df["case_num"] == 0]
example_pn_num = case_df["pn_num"].iloc[0]
note_df = case_df[case_df["pn_num"] == example_pn_num]

# Use the parsed `location_list` column (the raw `location` column is a string).
# A single location may hold several spans separated by ';'.
spans = set()
for location_list in note_df["location_list"]:
    for location in location_list:
        for span in location.split(";"):
            start, end = span.split()
            spans.add((int(start), int(end)))

# Entities are de-duplicated and passed to displaCy in ascending start order.
ents = [
    {"start": start, "end": end, "label": "Annotation"}
    for start, end in sorted(spans)
]
doc = {
    "text": note_df["pn_history"].iloc[0],
    "ents": ents,
}
colors = {"Annotation" :"linear-gradient(90deg, #aa9cfc, #fc9ce7)" }
options = {"colors": colors}
displacy.render(doc, style="ent", options=options, manual=True, jupyter=True);

In [None]:
# `train_df` has already had its un-annotated rows removed by load_and_prepare,
# so the blanks have to be counted on the raw training file.
raw_train = pd.read_csv(BASE_PATH + "/train.csv")
blank_annotations = raw_train["annotation"] == "[]"
blank_locations = raw_train["location"] == "[]"
both_blank = blank_annotations & blank_locations
print(blank_annotations.sum(), blank_locations.sum(), both_blank.sum())

There are a lot of missing annotations, meaning this is a semi-supervised / unsupervised problem. One way we could tackle this, and make use of this information, is through pseudo-labelling. For now, I'll remove these rows.

In [None]:
def list_to_int(loc_list):
    """Convert location strings into integer (start, end) character spans.

    Each entry of `loc_list` is a string such as "696 724" or, when one
    annotation covers several spans, "696 724;730 740".

    Args:
        loc_list: iterable of location strings.

    Returns:
        A flat list of (start, end) integer tuples, in input order.
    """
    spans = []
    for location in loc_list:
        for span in location.split(';'):
            if not span.strip():
                # Tolerate empty fragments such as a trailing ';'.
                continue
            start, end = span.split()
            spans.append((int(start), int(end)))
    return spans

QA model, from Hugging Face: there are 3 different kinds of tokens (special tokens, question tokens, and context tokens). The [CLS] token (a special token) comes at the start of the question. The [SEP] token (a special token) separates the question from the context tokens. Each token is then passed through a transformer encoder, which produces a vector called a hidden state. We feed these through a linear layer, and are then left with probabilities that these tokens are the start/end token associated with the answer. Training --> the model learns to classify which pair of tokens is the start/end.
Each token gets 2 labels, a start and an end label. If the first label for a token is 1, that indicates that this is the start token. If the second label is 1, that indicates this token is an end token.
The offset is the character position of a token within its original text.

In [None]:
# Local path of the pretrained checkpoint; also read by NBMEModel when it loads the encoder.
MODEL_NAME = "../input/huggingface-bert/bert-base-uncased" 
# Tokenizer loaded from the same checkpoint path.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

The tokenizer returns a dictionary with all the arguments necessary for its corresponding model to work properly. The token indices are under the key “input_ids”:
Note that the tokenizer automatically adds “special tokens” (if the associated model relies on them) which are special IDs the model sometimes uses. (These are the CLS, SEP tokens mentioned earlier). 
The attention mask is a binary tensor indicating the position of the padded indices so that the model does not attend to them. For the BertTokenizer, 1 indicates a value that should be attended to, while 0 indicates a padded value. This attention mask is in the dictionary returned by the tokenizer under the key “attention_mask”:

In [None]:
def tokenize_labels(tokenizer, example, max_length=416):
    """Tokenize one (feature, patient note) pair and build per-token labels.

    The feature text is passed as the first sequence (the "question") and the
    patient history as the second (the "context"). Only the context may be
    truncated, and the result is padded to `max_length`.

    Args:
        tokenizer: a Hugging Face fast tokenizer (offset mappings are required).
        example: mapping/row with "feature_text", "pn_history" and "location_list".
        max_length: padded/truncated sequence length. The notebook author notes the
            longest pair is 406 tokens, so overflowing tokens are not needed.

    Returns:
        The tokenizer output (input_ids, attention_mask, offset_mapping, ...)
        extended with:
          - "location_int": list of (start, end) character spans of the annotations,
          - "sequence_ids": None for special/padding tokens, 0 for question
            tokens, 1 for context tokens,
          - "labels": 1.0 for a context token lying fully inside an annotated
            span, 0.0 for any other context token, -1.0 for every non-context
            token (to be masked out of the loss).
    """
    # The tokenizer takes the pair as `text` / `text_pair`; pass them positionally.
    tokenized_inputs = tokenizer(
        example["feature_text"],   # question
        example["pn_history"],     # context (annotations are the answers)
        truncation="only_second",
        max_length=max_length,
        padding="max_length",
        return_offsets_mapping=True,  # needed to map tokens back to characters
    )

    location_int = list_to_int(example["location_list"])
    sequence_ids = tokenized_inputs.sequence_ids()

    labels = []
    for seq_id, (token_start, token_end) in zip(sequence_ids, tokenized_inputs["offset_mapping"]):
        if seq_id is None or seq_id == 0:
            # Special, padding and question tokens never carry an annotation.
            labels.append(-1.0)
            continue
        inside_annotation = any(
            token_start >= feature_start and token_end <= feature_end
            for feature_start, feature_end in location_int
        )
        labels.append(1.0 if inside_annotation else 0.0)

    tokenized_inputs["location_int"] = location_int
    tokenized_inputs["sequence_ids"] = sequence_ids
    tokenized_inputs["labels"] = labels

    return tokenized_inputs

In [None]:
def location_preds(preds, offset_mapping, sequence_ids, test=False, threshold=0.5):
    """Turn per-token logits into predicted character spans.

    Consecutive context tokens whose sigmoid probability exceeds `threshold`
    are merged into one span running from the first token's start offset to
    the last token's end offset.

    Args:
        preds: iterable of per-example logit arrays, one value per token.
        offset_mapping: iterable of per-example (start, end) character offsets.
        sequence_ids: iterable of per-example sequence ids. Only tokens whose id
            equals 1 (context) are considered; None, 0 and NaN are all skipped.
        test: when True each example is returned as a "start end; start end"
            string, otherwise as a list of (start, end) integer tuples.
        threshold: probability above which a token counts as annotated.

    Returns:
        A list with one entry per example (a string or a list of tuples, see `test`).
    """
    all_predictions = []
    for logits, offsets, seq_ids in zip(preds, offset_mapping, sequence_ids):
        # Sigmoid: logits -> probabilities in (0, 1).
        probs = 1 / (1 + np.exp(-np.asarray(logits, dtype=float)))

        spans = []
        start_idx = None
        end_idx = None
        for prob, offset, seq_id in zip(probs, offsets, seq_ids):
            # `!= 1` also rejects NaN, which is what None becomes once the
            # sequence ids have been cast to a float array.
            if seq_id != 1:
                continue

            if prob > threshold:
                if start_idx is None:
                    start_idx = int(offset[0])
                end_idx = int(offset[1])  # extended while positive tokens keep coming
            elif start_idx is not None:
                spans.append((start_idx, end_idx))
                start_idx = None

        # Flush a span that is still open when the context ends.
        if start_idx is not None:
            spans.append((start_idx, end_idx))

        if test:
            all_predictions.append("; ".join(f"{start} {end}" for start, end in spans))
        else:
            all_predictions.append(spans)

    return all_predictions

In [None]:
def calculate_char_cv(predictions, offset_mapping, sequence_ids, labels):
    """Score predicted spans against the ground truth at character level.

    For every example two 0/1 arrays are built, one entry per character: the
    ground truth (rebuilt from the token labels and their offsets) and the
    prediction (rebuilt from the predicted spans). The arrays of all examples
    are concatenated and scored as a single binary classification problem.

    Args:
        predictions: per-example lists of (start, end) predicted character spans.
        offset_mapping: per-example (start, end) character offsets of each token.
        sequence_ids: per-example sequence ids; only tokens with id 1 (context)
            contribute to the ground truth.
        labels: per-example token labels; a value of 1 marks an annotated token.

    Returns:
        dict with the keys "Accuracy", "precision", "recall" and "f1".
    """
    all_labels = []
    all_preds = []
    # `token_labels` deliberately differs from the `labels` parameter so the
    # argument is not rebound while it is being iterated.
    for spans, offsets, seq_ids, token_labels in zip(predictions, offset_mapping, sequence_ids, labels):
        # The largest offset is the end of the last token, i.e. the number of
        # characters that need to be scored.
        num_chars = int(max(chain(*offsets), default=0))

        char_labels = np.zeros(num_chars)
        for (token_start, token_end), s_id, label in zip(offsets, seq_ids, token_labels):
            if s_id != 1:  # skip special, padding and question tokens
                continue
            if int(label) == 1:
                char_labels[token_start:token_end] = 1

        char_preds = np.zeros(num_chars)
        for start_idx, end_idx in spans:
            char_preds[start_idx:end_idx] = 1

        all_labels.extend(char_labels)
        all_preds.extend(char_preds)

    # zero_division=0 reports 0 (without a warning) when no positives exist.
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average="binary", zero_division=0
    )
    accuracy = accuracy_score(all_labels, all_preds)

    return {
        "Accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

Compute precision, recall, F-measure and support for each class.

The precision is the ratio tp / (tp + fp) where tp is the number of true positives and fp the number of false positives. The precision is intuitively the ability of the classifier not to label a negative sample as positive.

The recall is the ratio tp / (tp + fn) where tp is the number of true positives and fn the number of false negatives. The recall is intuitively the ability of the classifier to find all the positive samples.

The F-beta score can be interpreted as a weighted harmonic mean of the precision and recall, where an F-beta score reaches its best value at 1 and worst score at 0.

The F-beta score weights recall more than precision by a factor of beta. beta == 1.0 means recall and precision are equally important.

The support is the number of occurrences of each class in y_true.

Loading the data in

In [None]:
class NBMEDataset(Dataset):
    """Torch dataset yielding tokenized (feature, patient note) pairs with labels."""

    def __init__(self,data, tokenizer):
        """
        Args:
            data: DataFrame with the columns read by tokenize_labels.
            tokenizer: tokenizer forwarded to tokenize_labels.
        """
        # DataLoader indexes with positions 0..len-1, while a frame coming out
        # of train_test_split keeps its shuffled row labels. Re-index a copy so
        # positions and labels agree; the caller's frame is left untouched.
        self.data = data.reset_index(drop=True)
        self.tokenizer = tokenizer

    def __len__(self):
        """Number of examples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return (input_ids, attention_mask, labels, offset_mapping, sequence_ids) as arrays."""
        example = self.data.iloc[idx]  # positional lookup
        tokenized = tokenize_labels(self.tokenizer, example)

        input_ids = np.array(tokenized["input_ids"])  # BERT input
        attention_mask = np.array(tokenized["attention_mask"])  # BERT input
        # float32 keeps the targets in the same dtype as the model's logits.
        labels = np.array(tokenized["labels"], dtype=np.float32)  # loss and cv score

        offset_mapping = np.array(tokenized["offset_mapping"])  # cv score
        # The float cast turns None (special/padding tokens) into NaN so the
        # values can be stacked into one numeric array per batch.
        sequence_ids = np.array(tokenized["sequence_ids"]).astype("float16")  # cv score

        return input_ids, attention_mask, labels, offset_mapping, sequence_ids

In [None]:
class NBMEModel(torch.nn.Module):
    """Pretrained encoder with a dropout + linear head emitting one logit per token."""

    def __init__(self):
        super().__init__()
        self.bert = AutoModel.from_pretrained(MODEL_NAME)
        self.dropout = torch.nn.Dropout(p = 0.2)

        # Size the head from the loaded config (768 for bert-base) instead of
        # hard-coding it, so other checkpoints work as well.
        self.layer = torch.nn.Linear(self.bert.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask, token_type_ids=None):
        """Return the per-token logits.

        Args:
            input_ids: token id tensor.
            attention_mask: tensor marking the tokens to attend to.
            token_type_ids: optional segment ids, forwarded to the encoder.
                When omitted the encoder applies its own default.

        Returns:
            Logits with the trailing size-1 dimension squeezed away.
        """
        # Element [0] of the encoder output is the last hidden state.
        last_hidden_state = self.bert(
            input_ids = input_ids,
            attention_mask = attention_mask,
            token_type_ids = token_type_ids,
        )[0]
        logits = self.layer(self.dropout(last_hidden_state)).squeeze(-1)
        return logits

In [None]:
X_train, X_test = train_test_split(train_df, test_size=0.2,
                                   random_state=42)
# train_test_split keeps the (now shuffled) original row labels. Re-index both
# parts to 0..n-1 so that looking a row up by its position works downstream.
X_train = X_train.reset_index(drop=True)
X_test = X_test.reset_index(drop=True)


In [None]:
# Datasets and loaders. The validation loader is not shuffled, so its
# predictions come back in a stable order.
training_data = NBMEDataset(X_train, tokenizer)
train_dataloader = DataLoader(training_data, batch_size=8, shuffle=True)
valid_data = NBMEDataset(X_test, tokenizer)
test_dataloader = DataLoader(valid_data, batch_size=8, shuffle=False)

model = NBMEModel().to(device)
# reduction='none' keeps one loss value per token so that the tokens labelled
# -1 can be masked out before averaging.
criterion = torch.nn.BCEWithLogitsLoss(reduction='none')
optimizer = torch.optim.AdamW(model.parameters(), lr = 1e-5)

This loss combines a Sigmoid layer and the BCELoss in one single class. This version is more numerically stable than using a plain Sigmoid followed by a BCELoss as, by combining the operations into one layer, we take advantage of the log-sum-exp trick for numerical stability.

The algorithm predicts the probability of each token being relevant (part of an annotation). We then compare the predictions with the labels (the correct values for each token) to compute the loss. To obtain the actual words, we need to extract the characters, which we can do using the offset mapping of the relevant tokens: the offsets can then be used to index the patient note (pn_history) and get the phrase, or to compare the character-level labels (0 or 1).


In [None]:
def train(model, dataloader, optimizer, criterion):
    """Run one training epoch.

    Uses the notebook-level `device`.

    Args:
        model: callable as model(input_ids, attention_mask), returning per-token logits.
        dataloader: yields batches (input_ids, attention_mask, labels,
            offset_mapping, sequence_ids).
        optimizer: optimizer over the model parameters.
        criterion: element-wise loss (reduction='none').

    Returns:
        The mean training loss per example (0.0 for an empty dataloader).
    """
    model.train()
    total_loss = 0.0
    total_examples = 0

    for batch in tqdm(dataloader):
        optimizer.zero_grad()
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device).float()  # same dtype as the logits

        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)
        # Tokens labelled -1 (special / question / padding) are excluded from the loss.
        loss = torch.masked_select(loss, labels > -1).mean()
        loss.backward()
        optimizer.step()

        batch_size = input_ids.size(0)
        total_loss += loss.item() * batch_size
        total_examples += batch_size

    return total_loss / max(total_examples, 1)

In [None]:
def valid(model, dataloader, criterion):
    """Evaluate the model on a validation dataloader.

    Uses the notebook-level `device`.

    Args:
        model: callable as model(input_ids, attention_mask), returning per-token logits.
        dataloader: yields batches (input_ids, attention_mask, labels,
            offset_mapping, sequence_ids).
        criterion: element-wise loss (reduction='none').

    Returns:
        (valid_loss, score): the mean loss per example and the metric dict
        produced by calculate_char_cv.
    """
    model.eval()
    total_loss = 0.0
    total_examples = 0
    preds = []
    offsets = []
    seq_ids = []
    valid_labels = []

    with torch.no_grad():  # no gradients are needed for evaluation
        for batch in tqdm(dataloader):
            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            labels = batch[2].to(device).float()
            offset_mapping = batch[3]
            sequence_ids = batch[4]

            logits = model(input_ids, attention_mask)
            loss = criterion(logits, labels)  # applies the sigmoid internally
            loss = torch.masked_select(loss, labels > -1.0).mean()

            batch_size = input_ids.size(0)
            total_loss += loss.item() * batch_size
            total_examples += batch_size

            preds.append(logits.detach().cpu().numpy())
            offsets.append(offset_mapping.numpy())
            seq_ids.append(sequence_ids.numpy())
            valid_labels.append(labels.detach().cpu().numpy())

    preds = np.concatenate(preds, axis=0)
    offsets = np.concatenate(offsets, axis=0)
    seq_ids = np.concatenate(seq_ids, axis=0)
    valid_labels = np.concatenate(valid_labels, axis=0)

    # The local name must differ from the `location_preds` function it calls.
    predicted_spans = location_preds(preds, offsets, seq_ids, test=False)
    score = calculate_char_cv(predicted_spans, offsets, seq_ids, valid_labels)

    return total_loss / max(total_examples, 1), score


In [None]:
epochs = 3

# Per-epoch history, kept for later inspection / plotting.
train_loss_data = []
valid_loss_data = []
score_data_list = []

for epoch in range(epochs):
    print("Epoch: {}/{}".format(epoch + 1, epochs))
    # first train model
    train_loss = train(model, train_dataloader, optimizer, criterion)
    train_loss_data.append(train_loss)
    print(f"Train loss: {train_loss}")
    # evaluate model
    valid_loss, score = valid(model, test_dataloader, criterion)
    valid_loss_data.append(valid_loss)
    score_data_list.append(score)
    print(f"Valid loss: {valid_loss}")
    print(f"Valid score: {score}")
    

For criterion, BCELoss (1 vs 0), for each token