# Fine-tune a NegBERT for negation scope classification

In [1]:
import transformers
import pandas as pd

In [2]:
task = "negation_scope"
model_checkpoint = "bert-base-uncased" # 'distilbert-base-uncased' for speed.
batch_size = 16 # Set batch size for training

In [3]:
import datasets
from datasets import load_dataset
dataset = load_dataset("dannashao/sem2012forNegbert")

## Preprocess
Using the pre-trained AutoTokenizer with the given model to tokenize. Added special marks (-100) to the beginning and ending of sentences.

Some functions are adjusted from https://github.com/huggingface/notebooks/blob/main/examples/token_classification.ipynb

In [4]:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

assert isinstance(tokenizer, transformers.PreTrainedTokenizerFast)

In [5]:
def tokenize_and_align_labels(examples, label_all_tokens=True):
    '''
    Tokenize text input with options for truncation and handling pre-split words.
    '''
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    labels = []
    all_word_ids = []
    for i, label in enumerate(examples[f"{task}_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:  # Tokens like [CLS], [SEP] have no word id, label them as -100 (ignored by model).
                label_ids.append(-100)
            elif word_idx != previous_word_idx: 
                label_ids.append(label[word_idx])
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx

        labels.append(label_ids)
        all_word_ids.append(word_ids)

    tokenized_inputs['word_ids'] = all_word_ids
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [6]:
label_all_tokens = True 

tokenized_train = dataset['train'].map(tokenize_and_align_labels, batched=True, batch_size=1)
tokenized_dev = dataset['dev'].map(tokenize_and_align_labels, batched=True)
tokenized_test = dataset['test'].map(tokenize_and_align_labels, batched=True)

Map:   0%|          | 0/815 [00:00<?, ? examples/s]

In [7]:
# Filter out task-specific tags from tokenized datasets for training and development.
tokenized_train = [{k: v for k, v in x.items() if k != f'{task}_tags'} for x in tokenized_train]
tokenized_dev = [{k: v for k, v in x.items() if k != f'{task}_tags'} for x in tokenized_dev]

## Load model and metric

In [None]:
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer

In [9]:
model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=2) # 2 labels are True/False for in negation scope. due to the conversion above they are 0/1.

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [10]:
model_name = model_checkpoint.split("/")[-1]

# Set up training arguments for fine-tuning the model.
args = TrainingArguments(
    f"{model_name}-finetuned-{task}",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=3,
    weight_decay=0.01,
    push_to_hub=False,
)

In [11]:
from transformers import DataCollatorForTokenClassification
data_collator = DataCollatorForTokenClassification(tokenizer)

For evaluation, we use a self-defined span metric that measures span agreements

In [12]:
import evaluate
metric = evaluate.load("dannashao/span_metric", module_type="metric")

In [13]:
label_list = [0,1] # IS IN NEGATION SCOPE OR NOT

## Train and evaluate

In [14]:
import numpy as np

In [15]:
def remove_ignored_index(predictions,labels):
    '''
    Remove the labels with -100
    '''
    actual_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    actual_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    return actual_predictions, actual_labels

In [16]:
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2) # Most possible label

    # Remove ignored index (special tokens)
    actual_predictions, actual_labels = remove_ignored_index(predictions,labels)
    
    results = metric.compute(predictions=actual_predictions, references=actual_labels)
    return {
        #"accuracy": results["overall_accuracy"],
        "token_precision":results["token_precision"], "token_recall":results["token_recall"], "token_f1":results["token_f1"],
        "span_precision":results["span_precision"], "span_recall":results["span_recall"], "span_f1":results["span_f1"]
    }

In [17]:
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_dev,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [18]:
trainer.train()

Epoch,Training Loss,Validation Loss,Token Precision,Token Recall,Token F1,Span Precision,Span Recall,Span F1
1,No log,0.062407,0.912065,0.836773,0.872798,0.52071,0.52071,0.52071
2,No log,0.068152,0.936575,0.831144,0.880716,0.60119,0.60119,0.60119
3,0.072200,0.061791,0.918989,0.886804,0.90261,0.625,0.625,0.625


TrainOutput(global_step=711, training_loss=0.055278591633513674, metrics={'train_runtime': 28.1824, 'train_samples_per_second': 402.272, 'train_steps_per_second': 25.229, 'total_flos': 295237776469368.0, 'train_loss': 0.055278591633513674, 'epoch': 3.0})

### Predict

In [19]:
predictions, labels, _ = trainer.predict(tokenized_test)

# Convert raw predictions (logits) to predicted label indices by selecting the maximum logit for each token.
predictions = np.argmax(predictions, axis=2)

Note that we need to detokenize to align the predictions back with labels:

In [20]:
def detokenize(predictions, tokenized_test):
    actual_predictions, actual_labels = [], []
    for p, t in zip(predictions, tokenized_test):
        preds, trues, pred, word_idx = [],[],[],0
        for i, (token_pred, id) in enumerate(zip(p, t['word_ids'])):
            if id is None:
                continue
            if id != word_idx:
                # If any predication of the subtokens is True, the prediction is set to True.
                preds.append(int(any(pred)))
                pred = [token_pred]
                trues.append(t['negation_scope_tags'][word_idx])
                word_idx = id
            else:
                pred.append(token_pred)

        #print(len(trues), len(preds))
        actual_labels.append(trues)
        actual_predictions.append(preds)
        
    return actual_labels, actual_predictions

In [21]:
# Detokenize to align predictions with actual labels.
actual_labels, actual_predictions = detokenize(predictions, tokenized_test)

# Compute evaluation metrics using the detokenized predictions and labels.
results = metric.compute(predictions=actual_predictions, references=actual_labels)
results

{'token_precision': 0.9458041958041958,
 'token_recall': 0.8888280394304491,
 'token_f1': 0.9164313946922643,
 'span_precision': 0.6852589641434262,
 'span_recall': 0.6852589641434262,
 'span_f1': 0.6852589641434262}

In [22]:
result_df = pd.DataFrame(columns=['Sentence', 'Labels', 'Prediction'])

for p, t, ds in zip(actual_predictions, actual_labels, dataset['test']):
    # Check if the prediction does not match the true label.
    if p != t:
        # Add the entry to the DataFrame: original sentence, true label, and predicted label.
        result_df.loc[len(result_df)] = [' '.join(ds['tokens']), t, p]

## Error Analysis
A nice visulization for error analysis

In [23]:
from termcolor import colored, cprint

We will need the original test data that was not detokenized to print colors

In [24]:
old_test = datasets.load_from_disk('old_test')

result_df = pd.DataFrame(columns=['Sentence_retokenized', 'Labels', 'Prediction', 'Sentence', 'Cue'])
for p, t, ds, ds2 in zip(actual_predictions, actual_labels, dataset['test'], old_test):
    if p != t:
        result_df.loc[len(result_df)] = [ds['tokens'], t, p, 
                                         ds2['tokens'], ds2['is_neg']] # For finding the negation cue

In [25]:
def color_sentences(df):
    '''
    Color the mispredicted sentences.
    The negation cue cannot be directly obtained due to the re-tokenization and thus is provided by the context.
    '''
    print("Labels:")
    cprint("The Cue", "black", end=" ",attrs=["underline","reverse"])
    cprint("True Scope", "black", "on_green", end=" ")
    cprint("Correct Prediction", "black", "on_yellow", end=" ")
    cprint("False Positive", "black", "on_red", end=" ",attrs=["blink"])
    cprint("False Negative", "white", "on_light_red", end=" ",attrs=["blink"])
    print("\nNegation cue is the middle word of Cue context.")
    print("\n\n")
    
    for row in range(len(df)):
        print(row)
        
        # Coloring the negation cue and print the context
        c = df['Cue'][row]
        so = df['Sentence'][row]
        cue_phrase, cue_len, ending = '', 0, 0
        for i in range(len(c)): 
            if c[i] == True:
                cue_phrase = cue_phrase + so[i] + ' '
                cue_len +=1
                ending = i
        print("Cue context: ...", so[ending-(cue_len)], end=" ")
        cprint(cue_phrase[:-1], "black", end=" ",attrs=["underline","reverse"])
        print(so[ending+1], "...")
        
        # Coloring the scope
        s = df['Sentence_retokenized'][row]
        t = df['Labels'][row]
        p = df['Prediction'][row]
        for i in range(len(s)-2): 
            if (t[i] == False):
                cprint(s[i+1], "black", "on_green", end=" ")
            if (t[i] == True):
                cprint(s[i+1], "black", end=" ")
        print()
        for i in range(len(s)-2):
            if (p[i] == False) & (p[i]==t[i]):
                cprint(s[i+1], "black", "on_yellow", end=" ")
            if (p[i] == True) & (p[i]==t[i]):
                cprint(s[i+1], "black", end=" ")
            if (p[i] == False) & (p[i] != t[i]):
                cprint(s[i+1], "black", "on_red", end=" ",attrs=["blink"])
            if (p[i] == True) & (p[i] != t[i]):
                cprint(s[i+1], "white", "on_light_red", end=" ",attrs=["blink"])
        print("\n")

In [26]:
color_sentences(result_df)

Labels:
[7m[4m[30mThe Cue[0m [42m[30mTrue Scope[0m [43m[30mCorrect Prediction[0m [5m[41m[30mFalse Positive[0m [5m[101m[97mFalse Negative[0m 
Negation cue is the middle word of Cue context.



0
Cue context: ... , [7m[4m[30mnor[0m do ...
[42m[30mWell[0m [42m[30m,[0m [42m[30mMrs.[0m [42m[30mWarren[0m [42m[30m,[0m [42m[30mI[0m [42m[30mcan[0m [42m[30mnot[0m [42m[30msee[0m [42m[30mthat[0m [42m[30myou[0m [42m[30mhave[0m [42m[30many[0m [42m[30mparticular[0m [42m[30mcause[0m [42m[30mfor[0m [42m[30muneasiness[0m [42m[30m,[0m [42m[30m[NEG] nor[0m [42m[30mdo[0m [30mI[0m [30munderstand[0m [30mwhy[0m [30mI[0m [30m,[0m [30mwhose[0m [30mtime[0m [30mis[0m [30mof[0m [30msome[0m [30mvalue[0m [30m,[0m [30mshould[0m [30minterfere[0m [30min[0m [30mthe[0m [30mmatter[0m 
[43m[30mWell[0m [43m[30m,[0m [43m[30mMrs.[0m [43m[30mWarren[0m [43m[30m,[0m [43m[30mI[0m [43m[30mcan[0m 

In [27]:
from huggingface_hub import notebook_login

In [28]:
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [29]:
trainer.push_to_hub()

training_args.bin:   0%|          | 0.00/4.28k [00:00<?, ?B/s]

events.out.tfevents.1715556978.arimo-ThinkBook-14-G5-IRH.48263.0:   0%|          | 0.00/6.75k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/436M [00:00<?, ?B/s]

Upload 3 LFS files:   0%|          | 0/3 [00:00<?, ?it/s]

CommitInfo(commit_url='https://huggingface.co/dannashao/bert-base-uncased-finetuned-negation_scope/commit/cf0af70a34c35f378bf4c70c6961484b58337269', commit_message='End of training', commit_description='', oid='cf0af70a34c35f378bf4c70c6961484b58337269', pr_url=None, pr_revision=None, pr_num=None)