In [None]:
from transformers import Trainer, TrainingArguments, DataCollatorForTokenClassification,  RobertaForTokenClassification, RobertaTokenizerFast, pipeline,AutoTokenizer, AutoModelForMaskedLM, EarlyStoppingCallback
import nltk
from nltk.tokenize import word_tokenize
import pandas as pd  
from functools import reduce
import torch
import os
import warnings
warnings.filterwarnings("ignore")
import evaluate
from evaluate import load  
from datasets import Dataset
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt


In [None]:
# Fast RoBERTa tokenizer for the Spanish biomedical/clinical checkpoint.
# add_prefix_space=True is set because the tokenizer is later fed text that is
# already split into words (is_split_into_words=True).
tokenizer = RobertaTokenizerFast.from_pretrained(
    'PlanTL-GOB-ES/roberta-base-biomedical-clinical-es',
    add_prefix_space=True,
)

# Upper bound used when truncation=True is requested during tokenization.
tokenizer.model_max_length = 512

In [None]:
# Load the annotated corpus (JSON Lines: one record per line). Later cells read
# the 'text' and 'label' columns of this frame.
df = pd.read_json(path_or_buf="HUFA_Corpus.jsonl", lines=True)

# Preview only the first rows: echoing the whole frame bloats the notebook and
# stores corpus text in the saved output.
df.head()

In [None]:
def entities_to_bio(text, entities):
    """Convert character-span entity annotations into one BIO tag per word.

    Args:
        text: The raw utterance.
        entities: Iterable of ``(start_char, end_char, entity_type)`` triples.
            A span is skipped unless its start is strictly greater than the
            end of the last accepted span.

    Returns:
        A list with one tag per ``word_tokenize(text)`` token: ``'O'``,
        ``'B-<type>'`` or ``'I-<type>'``.
    """
    words = word_tokenize(text)
    bio_tags = ['O'] * len(words)

    last_accepted_end = -1
    cursor = 0  # words before this position have already been consumed

    for start_char, end_char, entity_type in entities:
        if start_char <= last_accepted_end:
            continue
        last_accepted_end = end_char

        # NOTE(review): alignment is done by comparing token strings from
        # `cursor` onwards, not by character offsets, so an identical word that
        # occurs before the annotated span is the one that gets tagged.
        at_entity_start = True
        for piece in word_tokenize(text[start_char:end_char]):
            for position in range(cursor, len(words)):
                if words[position] != piece:
                    continue
                if at_entity_start:
                    bio_tags[position] = f'B-{entity_type}'
                    at_entity_start = False
                else:
                    bio_tags[position] = f'I-{entity_type}'
                cursor = position + 1
                break  # only the first match of each piece is tagged

    return bio_tags

In [None]:
# Three parallel lists, one entry per corpus row:
#   utterances           - the raw text
#   tokenized_utterances - the text split into words
#   labels_for_tokens    - one BIO tag per word, derived from the span labels
utterances = list(df['text'])
tokenized_utterances = [word_tokenize(sentence) for sentence in utterances]
labels_for_tokens = [
    entities_to_bio(sentence, spans)
    for sentence, spans in zip(df['text'], df['label'])
]


In [None]:
# Label vocabulary. Sorting makes the tag -> id mapping deterministic; iterating
# a plain set of strings can yield a different order in every kernel, which
# silently changes what each class id means between runs.
unique_token_labels = sorted({tag for tags in labels_for_tokens for tag in tags})

# Replace every tag string by its integer id (dictionary lookup instead of a
# linear list.index scan per token).
# NOTE: this rebinds labels_for_tokens from tag strings to ids, so the cell that
# builds the string tags must be re-run before re-running this one.
tag_to_id = {tag: idx for idx, tag in enumerate(unique_token_labels)}
labels_for_tokens = [[tag_to_id[tag] for tag in tags] for tags in labels_for_tokens]

len(unique_token_labels)

In [None]:
# Assemble the corpus as a Hugging Face Dataset with three parallel columns.
data = Dataset.from_dict(
    dict(
        utterance=utterances,
        tokens=tokenized_utterances,
        token_labels=labels_for_tokens
    )
)

# 80/20 train/test split. The seed is fixed so the same rows land in the same
# split on every run; without it the reported metrics are not reproducible.
data = data.train_test_split(test_size=0.2, seed=42)

In [None]:
# -100 is reserved for labels where we do not want to calculate losses.
def tokenize_and_align_labels(examples):
    """Tokenize pre-split words and align word-level labels to sub-tokens.

    Args:
        examples: A batch with ``"tokens"`` (lists of words) and
            ``"token_labels"`` (one label id per word).

    Returns:
        The tokenizer output with an added ``"labels"`` entry. Within each
        sequence, the first sub-token of every word carries that word's label;
        positions without a word and the remaining sub-tokens of a word get
        -100.
    """
    encoded = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    aligned_labels = []
    for row, word_labels in enumerate(examples["token_labels"]):
        word_ids = list(encoded.word_ids(batch_index=row))
        # Pair every position with the word index of the position before it.
        preceding_ids = [None] + word_ids[:-1]
        aligned_labels.append([
            word_labels[current]
            if current is not None and current != preceding
            else -100
            for current, preceding in zip(word_ids, preceding_ids)
        ])

    encoded["labels"] = aligned_labels
    return encoded

In [None]:
# Tokenize every split; batched=True hands tokenize_and_align_labels a batch of
# examples per call rather than a single example.
tok_clf_tokenized = data.map(
    tokenize_and_align_labels,
    batched=True,
)

In [None]:
# Drop the raw columns from both splits so that only the tokenizer outputs and
# the aligned labels remain.
for split_name in ('train', 'test'):
    tok_clf_tokenized[split_name] = tok_clf_tokenized[split_name].remove_columns(
        ['utterance', 'tokens', 'token_labels']
    )

tok_clf_tokenized

In [None]:
# Collator that assembles token-classification examples into batches for the
# Trainer, using the same tokenizer as the preprocessing step.
tok_data_collator = DataCollatorForTokenClassification(
    tokenizer=tokenizer,
)

In [None]:
# Optional configuration to force the run onto the CPU (it hides the GPU from
# PyTorch and disables cuDNN). The snippet is wrapped in a string literal, so it
# is NOT executed; paste the lines out of the string to enable them.
'''
device = torch.device("cpu")
torch.cuda.is_available = lambda: False
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
torch.backends.cudnn.enabled = False'''

In [None]:
# Both directions of the label mapping are stored in the model config so that a
# saved checkpoint reports tag names instead of generic LABEL_n placeholders and
# id2label / label2id stay consistent with each other.
id2label = dict(enumerate(unique_token_labels))
label2id = {label: idx for idx, label in id2label.items()}

tok_clf_model = RobertaForTokenClassification.from_pretrained(
    'PlanTL-GOB-ES/roberta-base-biomedical-clinical-es',
    num_labels=len(unique_token_labels),
    id2label=id2label,
    label2id=label2id,
)

In [None]:

# Turn off Weights & Biases reporting for this run.
os.environ["WANDB_DISABLED"] = "true"

# Report which device PyTorch can see.
print(
    "Model is running on GPU."
    if torch.cuda.is_available()
    else "Model is running on CPU."
)

In [None]:
# compute_metrics turns class ids back into tag strings via label_list[id], so
# this must be the id -> tag-name vocabulary. labels_for_tokens holds the
# per-utterance lists of integer ids and would hand seqeval lists of ints
# instead of tags.
label_list = unique_token_labels

# Entity-level sequence labelling metric.
metric = load("seqeval")
def compute_metrics(p):
    """Compute overall precision, recall, F1 and accuracy for an evaluation pass.

    Args:
        p: A ``(predictions, labels)`` pair. ``predictions`` holds per-class
            scores on axis 2; ``labels`` holds the gold class ids, with -100
            marking positions to ignore.

    Returns:
        A dict with the keys ``precision``, ``recall``, ``f1`` and
        ``accuracy``, taken from the ``overall_*`` entries of the metric.
    """
    scores, gold = p
    predicted_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold):
        # Keep only positions whose gold label is not the ignore index.
        kept = [
            (predicted_id, gold_id)
            for predicted_id, gold_id in zip(predicted_row, gold_row)
            if gold_id != -100
        ]
        true_predictions.append([label_list[predicted_id] for predicted_id, _gold in kept])
        true_labels.append([label_list[gold_id] for _pred, gold_id in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    summary = {}
    summary["precision"] = results["overall_precision"]
    summary["recall"] = results["overall_recall"]
    summary["f1"] = results["overall_f1"]
    summary["accuracy"] = results["overall_accuracy"]
    return summary

In [None]:

task = "NER"
model_checkpoint = "PlanTL-GOB-ES-roberta-base-biomedical-clinical-es"
model_name = model_checkpoint.split("/")[-1]

# Parent folder that holds one sub-folder per fine-tuning run of this model.
directory = f"Models_Results/{model_name}/"

if not os.path.exists(directory):
    os.makedirs(directory)
    print(f"Directory '{directory}' created.")


def run_number(folder_name):
    """Return the integer after the last '-' in folder_name, or -1 if there is none."""
    suffix = folder_name.rsplit("-", 1)[-1]
    return int(suffix) if suffix.isdecimal() else -1


elements = os.listdir(directory)
folders = [element for element in elements if os.path.isdir(os.path.join(directory, element))]
# Order by the numeric run suffix. A plain string sort puts "...-10" before
# "...-2", and a position in os.listdir() output is arbitrary, so neither
# identifies the latest run.
folders.sort(key=lambda name: (run_number(name), name))

index = -1
if folders:
    last_folder = folders[-1]
    index = run_number(last_folder)
    print(f"The last folder is '{last_folder}' and its index is {index}.")
else:
    print(f"No folders found in '{directory}'. Starting index from 0.")

# Output folder of the next run: one past the highest existing run number, so it
# can never collide with a folder that is already there.
file = f"Models_Results/{model_name}/{model_name}-finetuned-{task}-{index+1}"

print(f"File path: {file}")


In [None]:
epochs = 8

# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy`; confirm the spelling against the installed version.
training_args = TrainingArguments(
    # Where checkpoints are written and how long to train.
    output_dir=file,
    num_train_epochs=epochs,
    # Batch sizes and regularisation.
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    weight_decay=0.01,
    # Logging.
    logging_steps=10,
    log_level='info',
    # Evaluate and checkpoint once per epoch, then reload the checkpoint with
    # the highest F1 when training ends.
    evaluation_strategy='epoch',
    save_strategy='epoch',
    load_best_model_at_end=True,
    metric_for_best_model='f1',
    greater_is_better=True,
)

# Early stopping is configured with a patience of 2.
early_stopping = EarlyStoppingCallback(early_stopping_patience=2)

trainer = Trainer(
    model=tok_clf_model,
    args=training_args,
    data_collator=tok_data_collator,
    train_dataset=tok_clf_tokenized['train'],
    eval_dataset=tok_clf_tokenized['test'],
    compute_metrics=compute_metrics,
    callbacks=[early_stopping],
)

In [None]:
# Run fine-tuning; the returned training summary is shown as the cell output.
train_output = trainer.train()
train_output

In [None]:
# Score the current model on the evaluation split given to the Trainer.
eval_metrics = trainer.evaluate()
eval_metrics

In [None]:
# Persist the model held by the trainer, then write the tokenizer files into
# the run folder `file` so the folder can be loaded on its own.
trainer.save_model()
saved_tokenizer_files = tokenizer.save_pretrained(file)
saved_tokenizer_files

In [None]:
# Predict on the held-out split. The third return value (metrics) is not used;
# it gets a real name so the IPython `_` output shortcut is not overwritten.
predictions, labels, predict_metrics = trainer.predict(tok_clf_tokenized['test'])

predicted_labels = np.argmax(predictions, axis=2)
true_labels = labels

# Keep only positions that carry a real label (-100 marks ignored positions).
labelled = true_labels != -100
true_labels_flat = true_labels[labelled].tolist()
predicted_labels_flat = predicted_labels[labelled].tolist()

# Every class id, including classes that never occur in this split.
label_ids = np.arange(len(unique_token_labels))

conf_matrix = confusion_matrix(true_labels_flat, predicted_labels_flat, labels=label_ids)

# True on the diagonal (correct predictions), False elsewhere.
mask = np.eye(len(unique_token_labels), dtype=bool)

plt.figure(figsize=(14, 12))

# Base layer: the full matrix in blue.
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=unique_token_labels, yticklabels=unique_token_labels, cbar=True, annot_kws={"size": 11})

# Overlay: everything except the diagonal is masked out, so the correct
# predictions are redrawn in a contrasting colour map.
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='coolwarm', mask=~mask, cbar=False, xticklabels=unique_token_labels, yticklabels=unique_token_labels, annot_kws={"size": 11})

plt.xlabel('Predicted Labels', fontsize=14)
plt.ylabel('True Labels', fontsize=14)
plt.title('Confusion Matrix', fontsize=16)
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

# labels= pins the rows to the full vocabulary. Without it, a class that is
# missing from the test split makes the number of classes differ from
# len(target_names) and classification_report raises. zero_division=0 reports
# 0 for such classes instead of warning.
report = classification_report(
    true_labels_flat,
    predicted_labels_flat,
    labels=label_ids,
    target_names=unique_token_labels,
    zero_division=0,
)
print(report)
