# RobBERT Model

### Importing necessary libraries


In [None]:
import pandas as pd
import re
import nltk
import numpy as np

from tqdm import tqdm
from collections import Counter, defaultdict
from datasets import load_metric
from transformers import RobertaTokenizerFast, RobertaTokenizer, RobertaForTokenClassification, AdamW, get_scheduler, pipeline, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import DataLoader, Dataset

from fuzzywuzzy import fuzz

## Load data and model

In [None]:
# Load the annotated documents; later cells read the 'Cleaned Text' and
# 'True Organization' columns of this frame.
df = pd.read_csv('final_data.csv')

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pretrained Dutch NER checkpoint, used without further training in the
# "Pretrained Model" section below.
tokenizer = RobertaTokenizer.from_pretrained('pdelobelle/robbert-v2-dutch-ner')
model = RobertaForTokenClassification.from_pretrained('pdelobelle/robbert-v2-dutch-ner', return_dict=True)
model.to(device)
model.eval()  # switch to evaluation mode for inference
print("RobBERT model loaded")

## Split into train and test

In [None]:
# Hold out 30% of the documents for testing; the fixed random_state keeps the
# split reproducible between runs.
# NOTE(review): the split frames keep df's original index labels, so later code
# must not use those labels as positions into per-document lists.
train, test = train_test_split(df, test_size=0.3, random_state=42)

print("Train set size:", len(train))
print("Test set size:", len(test))

## Pretrained Model

### Predict Organizations

In [None]:
def tokenize_and_predict(text):
    """Run the module-level token-classification model on one text.

    The text is truncated/padded to 512 tokens, moved to the model's device
    and passed through the model with gradients disabled.

    Returns:
        A tuple ``(output, encoded_input)``: the model output and the dict of
        input tensors that was fed to the model.
    """
    batch = tokenizer(text, truncation=True, padding='max_length', max_length=512, return_tensors="pt")

    # The inputs must live on the same device as the model weights.
    target_device = model.device
    encoded_input = {}
    for name, tensor in batch.items():
        encoded_input[name] = tensor.to(target_device)

    # Inference only, so no gradient bookkeeping is needed.
    with torch.no_grad():
        output = model(**encoded_input)

    return output, encoded_input


In [None]:
def extract_and_select_most_common_org(text):
    """Return the organization mention the model predicts most often in *text*.

    Consecutive tokens whose predicted label ends in ``ORG`` are joined into
    one mention; any other label (or a special token such as padding) closes
    the current mention.

    Returns:
        The most frequent mention as a string, or ``'No prediction'`` when the
        model tags no organization at all.
    """
    output, encoded_input = tokenize_and_predict(text)
    # Predicted class id for every token position.
    predictions = output.logits.argmax(dim=-1).squeeze().tolist()

    tokens = tokenizer.convert_ids_to_tokens(encoded_input['input_ids'].squeeze().tolist())
    # <s>, </s>, <pad>, ... never belong to an entity.
    special_tokens = set(tokenizer.all_special_tokens)

    entities = []
    current_entity = []
    for token, pred in zip(tokens, predictions):
        entity_label = model.config.id2label[pred]
        is_org = (
            token not in special_tokens
            and bool(entity_label)
            and entity_label.endswith('ORG')
        )
        if is_org:
            # 'Ġ' is RoBERTa's marker for "this token starts with a space".
            current_entity.append(token.replace('Ġ', ' '))
            continue

        # A non-ORG token ends the mention that was being collected. The
        # original tested only the truthiness of the label, which is always
        # true, so mentions were never separated.
        if current_entity:
            mention = ''.join(current_entity).strip()
            if mention:
                entities.append(mention)
            current_entity = []

    if current_entity:
        mention = ''.join(current_entity).strip()
        if mention:
            entities.append(mention)

    # Pick the most frequent mention, or the sentinel when nothing was found.
    entity_counter = Counter(entities)
    if not entity_counter:
        return 'No prediction'
    return str(entity_counter.most_common(1)[0][0])


In [None]:
# Register tqdm's `progress_apply` on pandas objects so the per-document
# inference below reports progress.
tqdm.pandas(desc="Predicting Organizations")

# Run the pretrained model on every test document and store the resulting
# organization string (or 'No prediction') in a new column.
test['Predicted Organization'] = test['Cleaned Text'].progress_apply(extract_and_select_most_common_org)

Predicting Organizations: 100%|██████████| 269/269 [04:53<00:00,  1.09s/it]


### Evaluation

In [None]:
def calculate_pretrained_accuracy(test):
    """Return the share of rows whose prediction matches the true organization.

    A row counts as correct when, after lower-casing and stripping, the
    prediction equals the truth or one is a substring of the other.

    Args:
        test: Mapping/DataFrame with 'Predicted Organization' and
            'True Organization' columns.

    Returns:
        A string of the form ``"Accuracy: 12.34%"``; ``"Accuracy: 0.00%"``
        when there are no rows.
    """
    predicted_orgs = list(test['Predicted Organization'])
    true_orgs = list(test['True Organization'])

    total_predictions = len(predicted_orgs)
    if total_predictions == 0:
        # Same convention as the fuzzy variant: no rows means 0% accuracy.
        return "Accuracy: 0.00%"

    correct_predictions = 0
    for pred, truth in zip(predicted_orgs, true_orgs):
        # Normalize so the comparison ignores case and surrounding whitespace.
        pred = str(pred).lower().strip()
        truth = str(truth).lower().strip()

        # An empty string is a substring of everything, so it must not count
        # as a match.
        if not pred or not truth:
            continue

        # Substring containment also covers exact equality.
        if pred in truth or truth in pred:
            correct_predictions += 1

    accuracy = correct_predictions / total_predictions
    return f"Accuracy: {accuracy * 100:.2f}%"

Accuracy: 2.23%


In [None]:
def calculated_pretrained_fuzzy_accuracy(test):
    """Return the share of rows whose prediction fuzzily matches the truth.

    Rows predicted as 'No prediction' never count as correct. Any other row
    is correct when ``fuzz.partial_ratio`` of the lower-cased, stripped
    strings is at least 80.

    Returns:
        A string of the form ``"Accuracy: 12.34%"`` (0% when there are no rows).
    """
    predicted_orgs = list(test['Predicted Organization'])
    true_orgs = list(test['True Organization'])

    def _is_hit(raw_pred, raw_truth):
        # Case and surrounding whitespace are ignored on both sides.
        guess = str(raw_pred).lower().strip()
        gold = str(raw_truth).lower().strip()
        if guess == 'no prediction':
            return False
        return fuzz.partial_ratio(guess, gold) >= 80

    hits = sum(1 for raw_pred, raw_truth in zip(predicted_orgs, true_orgs) if _is_hit(raw_pred, raw_truth))

    row_count = len(predicted_orgs)
    if row_count > 0:
        accuracy = hits / row_count
    else:
        accuracy = 0
    return f"Accuracy: {accuracy * 100:.2f}%"

In [None]:
# Both helpers return a formatted string. Print each one explicitly, because a
# bare expression is only displayed when it is the last statement of a cell,
# so the first result would otherwise be lost.
print(calculate_pretrained_accuracy(test))
print(calculated_pretrained_fuzzy_accuracy(test))

### Precision & recall

In [None]:
predicted_orgs = list(test['Predicted Organization'])
true_orgs = list(test['True Organization'])

# 1 when the pretrained model produced any organization for the document,
# 0 when it returned the 'No prediction' sentinel. Only the prediction is
# inspected, so the true organization is not normalized here.
prediction_presence = [
    0 if str(pred).lower().strip() == 'no prediction' else 1
    for pred in predicted_orgs
]

print("Prediction Presence:", prediction_presence)
print(f"Presence Array Length: {len(prediction_presence)}")

# Kept under a separate name for the precision/recall computation below.
robbert_preds = prediction_presence

In [None]:
# One entry per test document: 1 if the organization is present in the text, 0 otherwise.
# The values were computed in the current_method notebook and pasted here.
actuals = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

In [None]:
def calculate_precision_recall(predictions, actuals):
    """Compute precision and recall for paired binary predictions.

    Args:
        predictions: Sequence of 0/1 predicted flags.
        actuals: Sequence of 0/1 ground-truth flags, paired by position.

    Returns:
        ``(precision, recall)``; each is 0 when its denominator is zero.
    """
    true_pos = false_pos = false_neg = 0

    # Single pass over the pairs instead of one pass per counter.
    for actual, pred in zip(actuals, predictions):
        if pred == 1:
            if actual == 1:
                true_pos += 1
            elif actual == 0:
                false_pos += 1
        elif pred == 0 and actual == 1:
            false_neg += 1

    predicted_positive = true_pos + false_pos
    actual_positive = true_pos + false_neg

    if predicted_positive > 0:
        precision = true_pos / predicted_positive
    else:
        precision = 0

    if actual_positive > 0:
        recall = true_pos / actual_positive
    else:
        recall = 0

    return precision, recall

In [None]:
# Precision/recall of "did the pretrained model predict anything" against
# "is an organization present in the text" (the hard-coded `actuals` list).
precision_r, recall_r = calculate_precision_recall(robbert_preds, actuals)

print(f"Precision RobBERT pretrained: {precision_r:.2f}")
print(f"Recall RobBERT pretrained: {recall_r:.2f}")

## Finetuned Model

### Load tokenizer and model

In [None]:
from datasets import Dataset, DatasetDict

In [None]:
# Load the tokenizer and model
tokenizer = RobertaTokenizerFast.from_pretrained('pdelobelle/robbert-v2-dutch-ner')
model = RobertaForTokenClassification.from_pretrained('pdelobelle/robbert-v2-dutch-ner', return_dict=True)

# define label mapping
label_list = ["O", "B-ORG", "I-ORG"]
label_map = {label: i for i, label in enumerate(label_list)}

### Prepare data

In [None]:
# helper function to tokenize and align labels
def tokenize_and_align_labels(examples):
    """Tokenize a batch of texts and build token-level ORG labels.

    Every occurrence of the row's 'True Organization' string in its
    'Cleaned Text' is located by character offsets. The token that starts
    exactly at the match start gets B-ORG, tokens starting inside the match
    get I-ORG, every other real token gets O, and special/padding tokens get
    -100.

    Args:
        examples: Batch dict with 'Cleaned Text' and 'True Organization' lists.

    Returns:
        The tokenizer output with an added "labels" entry (one list of label
        ids per example).
    """
    tokenized_inputs = tokenizer(examples['Cleaned Text'], truncation=True, padding='max_length', max_length=512, return_offsets_mapping=True)
    labels = []

    for i, label in enumerate(examples['True Organization']):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        offsets = tokenized_inputs['offset_mapping'][i]

        # Real tokens default to "O" so the model also learns what is NOT an
        # organization. Leaving them at -100 (as before) drops them from both
        # training and evaluation.
        label_ids = [
            -100 if word_id is None else label_map["O"]
            for word_id in word_ids
        ]

        # A missing (NaN/None) or empty organization has no span to mark;
        # re.escape would raise on a non-string and an empty pattern matches
        # at every position.
        if isinstance(label, str) and label:
            spans = [
                (match.start(), match.end())
                for match in re.finditer(re.escape(label), examples['Cleaned Text'][i])
            ]
        else:
            spans = []

        for idx, word_id in enumerate(word_ids):
            if word_id is None:
                continue
            token_start = offsets[idx][0]
            for start, end in spans:
                if token_start == start:
                    label_ids[idx] = label_map["B-ORG"]
                    break
                if start < token_start < end:
                    label_ids[idx] = label_map["I-ORG"]
                    break

        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
# Convert the pandas splits to Hugging Face datasets.
train_dataset = Dataset.from_pandas(train)
test_dataset = Dataset.from_pandas(test)

# Tokenize and label both splits; the original columns are removed so only the
# fields returned by tokenize_and_align_labels remain.
train_dataset = train_dataset.map(tokenize_and_align_labels, batched=True, remove_columns=train_dataset.column_names, desc="Tokenizing train dataset")
test_dataset = test_dataset.map(tokenize_and_align_labels, batched=True, remove_columns=test_dataset.column_names, desc="Tokenizing test dataset")

# Bundle the splits under the keys the Trainer cell reads.
datasets = DatasetDict({
    'train': train_dataset,
    'test': test_dataset
})

### Train the model

In [None]:
# define the training arguments
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",
    learning_rate=3e-5,  
    per_device_train_batch_size=16,  
    per_device_eval_batch_size=16,  
    num_train_epochs=10,  
    weight_decay=0.01,
)

# define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=datasets['train'],
    eval_dataset=datasets['test'],
    tokenizer=tokenizer,
)

In [None]:
# Fine-tune the model on the train split using the arguments defined above.
trainer.train()

### Evaluation

In [None]:
# Run the fine-tuned model on the test split; `predictions` is reduced with
# argmax over its last axis in align_predictions, `label_ids` are the targets.
predictions, label_ids, metrics = trainer.predict(datasets['test'])

In [None]:
# function to align predictions with true labels
def align_predictions(predictions, label_ids):
    """Drop ignored positions and pair predicted ids with true label ids.

    Args:
        predictions: Array indexed as [example, position, class]; the class
            with the highest score is taken per position.
        label_ids: Array indexed as [example, position]; positions equal to
            -100 are skipped.

    Returns:
        ``(preds_list, out_label_list)``: per-example lists of predicted ids
        and true ids for the positions that were kept.
    """
    preds = np.argmax(predictions, axis=2)
    batch_size, seq_len = preds.shape

    preds_list = []
    out_label_list = []
    for row in range(batch_size):
        # Positions whose label is -100 are excluded from the comparison.
        kept = [col for col in range(seq_len) if label_ids[row, col] != -100]
        out_label_list.append([label_ids[row][col] for col in kept])
        preds_list.append([preds[row][col] for col in kept])

    return preds_list, out_label_list

In [None]:
# map labels to tag names
def get_labels(preds_list, out_label_list):
    """Translate per-example label ids into tag names.

    Ids are looked up in the module-level ``label_list``; an id that is not
    below ``len(label_list)`` is mapped to "O".

    Returns:
        ``(preds_tags, true_tags)``: lists of tag-name lists, one per example.
    """
    def _ids_to_tags(ids):
        return [label_list[i] if i < len(label_list) else "O" for i in ids]

    predicted_names = []
    gold_names = []
    for predicted_ids, gold_ids in zip(preds_list, out_label_list):
        predicted_names.append(_ids_to_tags(predicted_ids))
        gold_names.append(_ids_to_tags(gold_ids))
    return predicted_names, gold_names

In [None]:
# Keep only the labelled positions, then turn the ids into tag names.
# `preds_tags` is read as a module-level variable by the accuracy helpers.
preds_list, out_label_list = align_predictions(predictions, label_ids)

preds_tags, true_tags = get_labels(preds_list, out_label_list)

In [None]:
# extract entities from tags
def extract_entities(text, tags):
    """Collect organization mentions from whitespace-split words and tags.

    The i-th whitespace-separated word of *text* is paired with the i-th tag;
    pairing stops at the shorter of the two. "B-ORG" starts a mention,
    "I-ORG" extends an open mention, and anything else closes it. An "I-ORG"
    with no open mention is ignored.

    Returns:
        List of mentions, each a space-joined string of its words.
    """
    found = []
    pieces = []  # words of the mention currently being built

    for word, tag in zip(text.split(), tags):
        if tag == "B-ORG":
            # A new mention begins; store the one in progress first.
            if pieces:
                found.append(" ".join(pieces))
            pieces = [word]
        elif tag == "I-ORG" and pieces:
            pieces.append(word)
        elif pieces:
            found.append(" ".join(pieces))
            pieces = []

    if pieces:
        found.append(" ".join(pieces))
    return found

In [None]:
def calculate_accuracy(test):
    """Return the share of documents whose predicted entities match the truth.

    Predicted entities are rebuilt from the module-level ``preds_tags`` and
    the 'Cleaned Text' column, then joined into one string per document. A
    document is correct when, after lower-casing and stripping, that string
    equals the true organization or one contains the other. This is the same
    criterion the pretrained-model evaluation uses.

    Args:
        test: Mapping/DataFrame with 'Cleaned Text' and 'True Organization'.

    Returns:
        A string of the form ``"Accuracy: 12.34%"``; ``"Accuracy: 0.00%"``
        when there are no rows.
    """
    predicted_entities = [extract_entities(text, tags) for text, tags in zip(test['Cleaned Text'], preds_tags)]
    true_orgs = list(test['True Organization'])

    total_predictions = len(true_orgs)
    if total_predictions == 0:
        return "Accuracy: 0.00%"

    correct_predictions = 0
    for pred, true_org in zip(predicted_entities, true_orgs):
        pred_str = ' '.join(pred).lower().strip()
        # str() keeps a missing (non-string) truth value from crashing.
        true_str = str(true_org).lower().strip()

        # No prediction (or no truth) can never be a match; an empty string
        # would otherwise be a substring of everything.
        if not pred_str or not true_str:
            continue

        # Substring containment also covers exact equality.
        if pred_str in true_str or true_str in pred_str:
            correct_predictions += 1

    accuracy = correct_predictions / total_predictions
    return f"Accuracy: {accuracy * 100:.2f}%"

calculate_accuracy(test)

In [None]:
def calculate_fuzzy_accuracy(test, threshold=80):
    """Return the share of documents with a fuzzy entity match.

    Predicted entities are rebuilt from the module-level ``preds_tags`` and
    the 'Cleaned Text' column (the column the tags were produced from). A
    document counts as correct, at most once, when either the space-joined
    string of all its predicted entities or any single predicted entity
    reaches ``fuzz.partial_ratio >= threshold`` against the lower-cased true
    organization.

    Args:
        test: Mapping/DataFrame with 'Cleaned Text' and 'True Organization'.
        threshold: Minimum partial-ratio score (0-100) for a match.

    Returns:
        A string of the form ``"Accuracy: 12.34%"``; ``"Accuracy: 0.00%"``
        when there are no rows.
    """
    predicted_entities = [extract_entities(text, tags) for text, tags in zip(test['Cleaned Text'], preds_tags)]
    true_orgs = list(test['True Organization'])

    total_predictions = len(true_orgs)
    if total_predictions == 0:
        return "Accuracy: 0.00%"

    correct_predictions = 0
    for pred, true_org in zip(predicted_entities, true_orgs):
        if not pred:
            continue
        true_norm = str(true_org).lower()

        # Score the combined string and every individual entity, but count
        # the document only once. The original ran two separate loops that
        # could each add 1 for the same document.
        candidates = [' '.join(pred).lower()]
        candidates.extend(entity.lower() for entity in pred)
        best_score = max(fuzz.partial_ratio(candidate, true_norm) for candidate in candidates)
        if best_score >= threshold:
            correct_predictions += 1

    accuracy = correct_predictions / total_predictions
    return f"Accuracy: {accuracy * 100:.2f}%"

In [None]:
# Fuzzy-match accuracy of the fine-tuned model on the test split.
calculate_fuzzy_accuracy(test)

### Prediction presence for precision and recall

In [None]:
def prediction_presence(test):
    """Return a 0/1 flag per document: did the model predict any entity?

    Entities are rebuilt from the module-level ``preds_tags`` and the
    'Cleaned Text' column. Only the predictions are inspected, so the true
    organization column is not read.

    Returns:
        List with 1 where the joined predicted entities are non-empty, else 0.
    """
    predicted_entities = [extract_entities(text, tags) for text, tags in zip(test['Cleaned Text'], preds_tags)]
    return [1 if ' '.join(entities) != '' else 0 for entities in predicted_entities]

robbert_preds = prediction_presence(test)

In [None]:
# Same presence-based precision/recall as for the pretrained model, now with
# the fine-tuned model's prediction flags for the test split.
precision_r, recall_r = calculate_precision_recall(robbert_preds, actuals)

print(f"Precision RobBERT finetuned: {precision_r:.2f}")
print(f"Recall RobBERT finetuned: {recall_r:.2f}")

## Second Dataset

In [None]:
# Second evaluation set; tokenize_and_align_labels reads its 'Cleaned Text'
# and 'True Organization' columns.
seconddata = pd.read_csv('final_seconddata.csv')

# NOTE: this rebinds `test_dataset`, which previously held the tokenized test split.
test_dataset = Dataset.from_pandas(seconddata)
# `secondtest` keeps only the tokenizer outputs and labels; the original
# columns are removed by remove_columns.
secondtest = test_dataset.map(tokenize_and_align_labels, batched=True, remove_columns=test_dataset.column_names, desc="Tokenizing test dataset")

In [None]:
# Predict on the second dataset; this overwrites the `predictions`,
# `label_ids` and `metrics` obtained for the test split.
predictions, label_ids, metrics = trainer.predict(secondtest)

In [None]:
# Recompute the tag lists for the second dataset; from here on the
# module-level `preds_tags` refers to `seconddata`, not to the test split.
preds_list, out_label_list = align_predictions(predictions, label_ids)

preds_tags, true_tags = get_labels(preds_list, out_label_list)

In [None]:
# The accuracy helpers read the 'Cleaned Text' and 'True Organization'
# columns. Those exist on the `seconddata` frame but were removed from the
# tokenized `secondtest` dataset, so the frame is passed here. Each result is
# printed so the first one is not lost.
print(calculate_accuracy(seconddata))
print(calculate_fuzzy_accuracy(seconddata))

### Document-level accuracy
Per-document correctness (1 = correct, 0 = incorrect), used for the comparison between models and the t-tests.

In [None]:
# Per-document correctness (1 = match, 0 = no match) for the fine-tuned model.
#
# At this point `predictions` holds the raw model output of
# trainer.predict(secondtest), not organization names, and `preds_tags` holds
# the tags for `seconddata`. The predicted organization text is therefore
# rebuilt from `preds_tags`, and documents are paired by position rather than
# by DataFrame index label.
# NOTE(review): the original iterated `test`; swap in the test frame (and its
# own preds_tags) if the t-tests are meant to use the first dataset.
doc_acc = []
total_predictions = len(predictions)

doc_texts = list(seconddata['Cleaned Text'])
doc_true_orgs = list(seconddata['True Organization'])

for text, true_org, tags in zip(doc_texts, doc_true_orgs, preds_tags):
    # normalize the text for comparison
    true_org = str(true_org).lower().strip()
    pred_org = ' '.join(extract_entities(text, tags)).lower().strip()

    # An empty prediction or truth is never a match; otherwise accept exact
    # equality or containment in either direction.
    if pred_org and true_org and (true_org in pred_org or pred_org in true_org):
        doc_acc.append(1)
    else:
        doc_acc.append(0)

print(doc_acc)
