## Imports and constants

In [1]:
import torch
import transformers
from transformers import AutoTokenizer
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers import DataCollatorForTokenClassification
from transformers import EarlyStoppingCallback, IntervalStrategy
from datasets import load_metric
from datasets import Dataset
import numpy as np
import pandas as pd
from huggingface_hub import notebook_login

In [2]:
PATH_TRAIN = "train.bio"
PATH_DEV = "dev.bio"
PATH_TEST = "test.bio"
label_all_tokens = True

### Log in huggingface account to save the model there

You have to make a huggingface account and then go [here](https://huggingface.co/settings/tokens), to get a token.  
Then log in so that you can save the trained models to your huggingface account and be able to load them easily.

In [3]:
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [3]:
transformers.set_seed(21)

## Loading the data

In [4]:
def read_bio_file(path):
    
    data = []
    current_words = []
    current_tags = []

    with open(path, "r", encoding="utf-8") as f:
        lines = f.readlines()[2:]
        
    for line in lines:
        
        line = line.strip()
        
        if line: # if line is not an empty line
            tok = line.split('\t')
            current_words.append(tok[0])
            current_tags.append(tok[3])
            
        else:
            if current_words:
                data.append((current_words, current_tags))
            current_words = []
            current_tags = []
            
            
    if current_tags != []:
        data.append((current_words, current_tags))

    df = pd.DataFrame(data, columns=['words', 'tags'])
    df['id'] = df.index
    df = df[['id', 'words', 'tags']]
    
    return df

In [11]:
train_data = read_bio_file(PATH_TRAIN)
dev_data = read_bio_file(PATH_DEV)
test_data = read_bio_file(PATH_TEST)

## Create Tag/index dictionary

This is a dictionary containing all of the tags mapped to indices so we can easily swap between ids of the tags/labels and their string representation.

In [6]:
class Vocab():
    def __init__(self, pad_unk='<PAD>'):
        self.pad_unk = pad_unk
        self.word2idx = {}
        self.idx2word = []

    def getIdx(self, word, add=False):
        if word is None or word == self.pad_unk:
            return None
        if word not in self.word2idx:
            if add:
                idx = len(self.idx2word)
                self.word2idx[word] = idx
                self.idx2word.append(word)
                return idx
            else:
                return None
        return self.word2idx[word]

    def getWord(self, idx):
        return self.idx2word[idx]

label_indices = Vocab()
tags_column = train_data["tags"]

for tags in tags_column:
    for tag in tags:
        label_indices.getIdx(tag, add=True)

print(label_indices.word2idx)

{'O': 0, 'B-ORG': 1, 'B-PER': 2, 'B-LOC': 3, 'I-PER': 4, 'B-MISC': 5, 'I-ORG': 6, 'I-MISC': 7, 'I-LOC': 8}


We add a new column called "tag_idx" where we save the list of labels with their id representations using the dictionary from above

In [12]:
train_data['tag_idx'] = train_data['tags'].apply(lambda x: [label_indices.word2idx[tag] for tag in x])
dev_data['tag_idx'] = dev_data['tags'].apply(lambda x: [label_indices.word2idx[tag] for tag in x])
test_data['tag_idx'] = test_data['tags'].apply(lambda x: [label_indices.word2idx[tag] for tag in x])

model_checkpoint = "distilbert/distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, padding=True)

## Tokenization

Convert the sentences to lists of tokens using the tokenizer from distilbert from huggingface.  
The output is a dictionary where we have these lists of tokens in an entry called "words", and make sure that the labels in the label column correctly correspond to the newly tokenized subwords, where these tags are stored in an entry called "tag_idx".

In [15]:
def tokenize_and_align_labels(dataset, word_column, tag_column, tokenizer):
    tokenized_inputs = tokenizer(dataset[word_column].tolist(), truncation=True, is_split_into_words=True, padding = True)

    labels = []
    for i, label in enumerate(dataset[tag_column]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx

        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs.data

In [16]:
tokenized_data = tokenize_and_align_labels(train_data, "words", "tag_idx", tokenizer)
tokenized_dev_data = tokenize_and_align_labels(dev_data, "words", "tag_idx", tokenizer)
tokenized_test_data = tokenize_and_align_labels(test_data, "words", "tag_idx", tokenizer)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


## Model implementation

### Load the pretrained distilbert base uncased language model from huggingface

In [19]:
label_list = label_indices.idx2word
batch_size = 16

assert isinstance(tokenizer, transformers.PreTrainedTokenizerFast) # verifies the tokenizers compatibility with hugging face

model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_list))
#model_name = model_checkpoint.split("/")[-1]
model_name = "distilbert-base-uncased-no-perturb-early-stopping"

Some weights of DistilBertForTokenClassification were not initialized from the model checkpoint at distilbert/distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


### Define the various metrics that should be evaluated and printed at training

In [20]:
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    results = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

## Training

### Convert the data to the "datasetdict" format to make it work with the huggingface trainer

In [21]:
# turning the data into datasetdicts, to make them compatible with the trainer (otherwise they can't be indexed)
train_dataset = Dataset.from_dict({
    'id': range(len(tokenized_data['input_ids'])),
    'input_ids': tokenized_data['input_ids'],
    'attention_mask': tokenized_data['attention_mask'],
    'labels': tokenized_data['labels']
})

dev_dataset = Dataset.from_dict({
    'id': range(len(tokenized_dev_data['input_ids'])),
    'input_ids': tokenized_dev_data['input_ids'],
    'attention_mask': tokenized_dev_data['attention_mask'],
    'labels': tokenized_dev_data['labels']
})

test_dataset = Dataset.from_dict({
    'id': range(len(tokenized_test_data['input_ids'])),
    'input_ids': tokenized_test_data['input_ids'],
    'attention_mask': tokenized_test_data['attention_mask'],
    'labels': tokenized_test_data['labels']
})

### Set the parameters for training

In [22]:
args = TrainingArguments(
   "train_on_perturbed", #model name in the huggingface account
   evaluation_strategy = "epoch", #whether to train on epochs or steps
   learning_rate=2e-5, #really small learning rate so we sh prob train more acc to rob
   num_train_epochs=5,
   metric_for_best_model = 'f1', #early stopping based on val span f1
   load_best_model_at_end=True,
   save_strategy="epoch")

data_collator = DataCollatorForTokenClassification(tokenizer)
metric = load_metric("seqeval")

  metric = load_metric("seqeval")
You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this metric from the next major release of `datasets`.


In [24]:
trainer = Trainer(
    model,
    args,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


### Start training

In [25]:
trainer.train()

Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,No log,0.162394,0.386838,0.38254,0.384677,0.958473
2,No log,0.138709,0.426367,0.48254,0.452718,0.961991
3,0.181700,0.146491,0.508117,0.496825,0.502408,0.966374
4,0.181700,0.150735,0.510046,0.52381,0.516836,0.966663
5,0.047100,0.152727,0.512977,0.533333,0.522957,0.967124


TrainOutput(global_step=1025, training_loss=0.11257464786855186, metrics={'train_runtime': 1530.585, 'train_samples_per_second': 5.354, 'train_steps_per_second': 0.67, 'total_flos': 127580269515930.0, 'train_loss': 0.11257464786855186, 'epoch': 5.0})

### Save the model locally

In [26]:
trainer.save_model("model_on_perturbed")

### Push the model to your huggingface account

In [27]:
trainer.push_to_hub("train_on_perturbed")

Upload 3 LFS files:   0%|          | 0/3 [00:00<?, ?it/s]

events.out.tfevents.1714993571.pav.22388.0:   0%|          | 0.00/7.98k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/265M [00:00<?, ?B/s]

training_args.bin:   0%|          | 0.00/4.92k [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/cria111/train_on_perturbed/commit/27c1e67595de836ebf6e257663e70a72db66d547', commit_message='train_on_perturbed', commit_description='', oid='27c1e67595de836ebf6e257663e70a72db66d547', pr_url=None, pr_revision=None, pr_num=None)

## Evaluation

Get metrics for your saved model (the saved model is the best one according to the evaluation metric we set)

In [28]:
trainer.evaluate()

{'eval_loss': 0.15272706747055054,
 'eval_precision': 0.5129770992366413,
 'eval_recall': 0.5333333333333333,
 'eval_f1': 0.5229571984435798,
 'eval_accuracy': 0.9671242357826739,
 'eval_runtime': 30.7423,
 'eval_samples_per_second': 23.095,
 'eval_steps_per_second': 2.895,
 'epoch': 5.0}

### Get formatted datasets for prediction

In [29]:
# evaluating using dev data
dev_dataset_new = Dataset.from_dict({
    'input_ids': dev_dataset['input_ids'],
    'attention_mask': dev_dataset['attention_mask'],
    'labels': dev_dataset['labels']
})

test_dataset_new = Dataset.from_dict({
    'input_ids': test_dataset['input_ids'],
    'attention_mask': test_dataset['attention_mask'],
    'labels': test_dataset['labels']
})

### Get predictions of the model on the test set

In [30]:
predictions, labels, _ = trainer.predict(test_dataset_new)
predictions = np.argmax(predictions, axis=2)

In [31]:
true_predictions = [
    [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
    for prediction, label in zip(predictions, labels)
]
true_labels = [
    [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
    for prediction, label in zip(predictions, labels)
]

### Map predicted labels on subwords back to the full words

In [38]:
def un_tok_labs(list_of_labels, list_of_words):
    tokenized_inputs = tokenizer(list_of_words, truncation=True, is_split_into_words=True)
    #print(tokenized_inputs)
    labels = []
    for i, label in enumerate(list_of_labels):
        #print(label)
        label_copy = label.copy()  # Create a copy of the label list

        word_ids = tokenized_inputs.word_ids(batch_index=i)  # Map tokens to their respective word.
        #print(word_ids)
        #print(tokenizer.convert_ids_to_tokens(tokenized_inputs["input_ids"][i]))
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:  # Set the special tokens to -100.
            #print("word_idx", word_idx)
            if word_idx is None:  # Only label the first token of a given word.
                continue
            elif word_idx == previous_word_idx:
                label_copy.pop(word_idx)
                continue
            else:
                label_ids.append(label_copy[word_idx])
            previous_word_idx = word_idx 
        labels.append(label_ids)
    return labels

In [39]:
test_words = read_bio_file(PATH_TEST)
test_words = list(test_words["words"])

untok_labs = un_tok_labs(true_predictions, test_words)

### Save model's predictions in an .iob2 formatted file

In [40]:
def save_preds(filename, tok, untok_labs):
    with open(filename, "w", encoding="utf-8") as f: 
        for idx, pair in enumerate(zip(tok, untok_labs)): 
            t, l = pair
            if len(t) != len(l):
                    print(idx)
                    print(t)
                    print(l)
            try:
                for i in range(len(t)): 
                    f.write(f"{i+1}\t{t[i]}\t{l[i]}\n")
            except:
                continue

            f.write("\n")
    return ("File has been saved")

In [41]:
filename = "preds_perturb_model.iob2"

save_preds(filename, test_words, untok_labs)

'File has been saved'