In [1]:
import csv
import json
import random
import re
from collections import Counter, defaultdict

import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
import seaborn as sns

import torch

from datasets import (
    Dataset,
    DatasetDict,
    concatenate_datasets,
    load_dataset,
    load_from_disk,
)
from transformers import (
    AutoModelForTokenClassification,
    AutoTokenizer,
    DataCollatorForTokenClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)

import evaluate

from nameparser import HumanName
from names_dataset import NameDataset, NameWrapper
from ethnicseer import EthnicClassifier
import nltk
from nltk.corpus import wordnet as wn

import pycountry_convert as pc
import pycountry
import pickle

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report, cohen_kappa_score

from transformers import BertTokenizerFast, BertForTokenClassification
from datasets import ClassLabel
from evaluate import load as load_metric
from sklearn.metrics import cohen_kappa_score
from itertools import combinations
import krippendorff




In [None]:
model_name = "bert-base-cased"

In [None]:
conll_main = load_from_disk("./splits/conll_main")

ontonotes_main = load_from_disk("./splits/ontonotes_main")

# Load GPU

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Tokenisation & Alignment

In [None]:
ontonotes_id_to_label = {
    0: "O", 1: "B-CARDINAL", 2: "B-DATE", 3: "I-DATE", 4: "B-PERSON", 5: "I-PERSON",
    6: "B-NORP", 7: "B-GPE", 8: "I-GPE", 9: "B-LAW", 10: "I-LAW", 11: "B-ORG", 12: "I-ORG",
    13: "B-PERCENT", 14: "I-PERCENT", 15: "B-ORDINAL", 16: "B-MONEY", 17: "I-MONEY",
    18: "B-WORK_OF_ART", 19: "I-WORK_OF_ART", 20: "B-FAC", 21: "B-TIME", 22: "I-CARDINAL",
    23: "B-LOC", 24: "B-QUANTITY", 25: "I-QUANTITY", 26: "I-NORP", 27: "I-LOC",
    28: "B-PRODUCT", 29: "I-TIME", 30: "B-EVENT", 31: "I-EVENT", 32: "I-FAC",
    33: "B-LANGUAGE", 34: "I-PRODUCT", 35: "I-ORDINAL", 36: "I-LANGUAGE"
}

conll_label_to_id = {'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-ORG': 3,
                     'I-ORG': 4, 'B-LOC': 5, 'I-LOC': 6, 'B-MISC': 7, 'I-MISC': 8}
id2label = {v: k for k, v in conll_label_to_id.items()}

ontonotes_to_conll_entity = {
    "PERSON": "PER", "ORG": "ORG", "GPE": "LOC", "LOC": "LOC",
    "NORP": "MISC", "FAC": "MISC", "EVENT": "MISC", "WORK_OF_ART": "MISC",
    "LAW": "MISC", "PRODUCT": "MISC", "LANGUAGE": "MISC",
    "DATE": None, "TIME": None, "PERCENT": None, "MONEY": None,
    "QUANTITY": None, "ORDINAL": None, "CARDINAL": None
}

In [None]:
def process_data(data_list):

    def process_single(data):
        word_ids = data['word_ids']
        predictions = data['predictions']
        gold = data['gold']
        tokenized_tokens = data['tokens']

        word_ids = [a for a in word_ids if a is not None]

        processed_predictions = []
        processed_gold = []

        current_word_id = None
        current_predictions = []
        current_gold = []

        for idx, word_id in enumerate(word_ids):
            if word_id != current_word_id:
                if current_predictions:
                    processed_predictions.append(
                        Counter(current_predictions).most_common(1)[0][0])
                    processed_gold.append(
                        Counter(current_gold).most_common(1)[0][0])

                current_word_id = word_id
                current_predictions = [predictions[idx]]
                current_gold = [gold[idx]]
            else:
                current_predictions.append(predictions[idx])
                current_gold.append(gold[idx])

        if current_predictions:
            processed_predictions.append(
                Counter(current_predictions).most_common(1)[0][0])
            processed_gold.append(
                Counter(current_gold).most_common(1)[0][0])

        return processed_predictions, processed_gold

    processed_predictions_list = []
    processed_gold_list = []

    for data in data_list:
        processed_predictions, processed_gold = process_single(data)
        processed_predictions_list.append(processed_predictions)
        processed_gold_list.append(processed_gold)

    return processed_predictions_list, processed_gold_list


def evaluate_predictions(p, test_data):
    predictions, labels, _ = p

    pred_indices = [np.argmax(p, axis=-1) for p in predictions]
    label_indices = labels

    pred_tags = [[id2label[p] for p, l in zip(p_seq, l_seq) if l != -100]
                 for p_seq, l_seq in zip(pred_indices, label_indices)]
    gold_tags = [[id2label[l] for l in l_seq if l != -100]
                 for l_seq in label_indices]

    def add_preds(example, idx):
        length = len(example['word_ids'])
        example['predictions'] = pred_tags[idx][:length]
        example['gold'] = gold_tags[idx][:length]
        return example

    test_data = test_data.map(add_preds, with_indices=True)

    length = len(test_data['predictions'][0])

    pred, gold = process_data(test_data)

    flat_pred = [label for seq in pred for label in seq]
    flat_gold = [label for seq in gold for label in seq]

    print(classification_report(flat_gold, flat_pred, zero_division=0))

    return (flat_pred, flat_gold)

# Model

In [None]:
tokenizer = BertTokenizerFast.from_pretrained(model_name)

label_list = ['O', 'B-PER', 'I-PER', 'B-ORG',
              'I-ORG', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC']


def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True,
        padding=True,
        return_special_tokens_mask=True,
        return_offsets_mapping=True,
    )
    all_word_ids = []
    all_labels = []
    for i, labels in enumerate(examples["labels"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        all_word_ids.append(word_ids)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            else:
                label_ids.append(labels[word_idx])
            previous_word_idx = word_idx
        all_labels.append(label_ids)

    tokenized_inputs["labels"] = all_labels
    tokenized_inputs["word_ids"] = all_word_ids
    return tokenized_inputs

In [None]:
conll_main = conll_main.map(tokenize_and_align_labels, batched=True)
ontonotes_main = ontonotes_main.map(tokenize_and_align_labels, batched=True)

# Deciding params

In [None]:
train_data = conll_main
test_data = ontonotes_main

In [None]:
train_data_name = 'conll'

In [None]:
mod = BertForTokenClassification.from_pretrained(
    model_name, num_labels=len(label_list))

metric = load_metric("seqeval")


def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    return metric.compute(predictions=true_predictions, references=true_labels)


data_collator = DataCollatorForTokenClassification(tokenizer)

training_args = TrainingArguments(
    output_dir=f"./output/{model_name}",
    eval_strategy="no",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,
    num_train_epochs=3,
    weight_decay=0.01,
    load_best_model_at_end=True,
    report_to="none",
    fp16=True,
    save_strategy="no",
)

trainer = Trainer(
    model=mod,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=test_data,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    data_collator=data_collator,
)

In [None]:
trainer.train()

In [None]:
trainer.save_model(f"./saved_model/{model_name}_{train_data_name}")

In [None]:
predictions = trainer.predict(test_data)

In [None]:
with open(f'./results/{model_name}_{train_data_name}_results.pkl', 'wb') as f:
    pickle.dump(predictions, f)

In [None]:
x, y = evaluate_predictions(predictions, test_data)

# Challenge dataset

## Stanford

In [None]:
df = pd.read_excel(
    "./Guided-Adversarial-Augmentation-main/Guided-Adversarial-Augmentation-main/data/data/conll2003/challenge_set.xlsx", header=None)

In [None]:
examples = []

i = 0
while i < len(df):
    row = df.iloc[i]
    if str(row[0]).startswith("GUID"):

        guid = str(df.iloc[i][1]).strip()

        try:
            quality = int(str(df.iloc[i+1][1]).strip())
        except (ValueError, TypeError):
            quality = 999

        try:
            aug_type = int(str(df.iloc[i+2][1]).strip())
        except (ValueError, TypeError):
            aug_type = 999

        tokens_row = df.iloc[i+3].dropna().tolist()[1:]
        labels_row = df.iloc[i+4].dropna().tolist()[1:]
        labels_row = [label.strip()
                      for label in labels_row if label.strip() != ""]

        if len(tokens_row) == len(labels_row):
            examples.append({
                "guid": guid,
                "quality": quality,
                "aug_type": aug_type,
                "tokens": tokens_row,
                "labels": labels_row
            })

        i += 6
    else:
        i += 1

challenge_dataset = Dataset.from_list(examples)

In [19]:
conll_label_to_id = {
    'O': 0,
    'B-PER': 1, 'I-PER': 2,
    'B-ORG': 3, 'I-ORG': 4,
    'B-LOC': 5, 'I-LOC': 6,
    'B-MISC': 7, 'I-MISC': 8,
}


def encode_labels(example):
    example["labels"] = [conll_label_to_id.get(
        label, 0) for label in example["labels"]]
    return example

In [None]:
challenge_dataset = challenge_dataset.map(encode_labels)

In [None]:
stanford_encoded = challenge_dataset.map(
    tokenize_and_align_labels, batched=True)

In [None]:
stanford_results = CBC_trainer.predict(stanford_encoded)

In [None]:
with open(f'./results/{model_name}_{train_data_name}_stanford', 'wb') as f:
    pickle.dump(stanford_results, f)

In [None]:
x, y = evaluate_predictions(stanford_results, stanford_encoded)

## Personal

In [12]:
def get_names(country, n=10):
    names_dict = nd.get_top_names(n=n, country_alpha2=country)
    names = []
    if country in names_dict:
        country_names = names_dict[country]
        for gender in ['M', 'F']:
            if gender in country_names:
                names.extend(country_names[gender])
    return list(set(names))


def tokenize(text):
    tokens = re.findall(r"\w+|[^\w\s]", text, re.UNICODE)
    return tokens


def tag_tokens(tokens, person_names):
    tags = ["O"] * len(tokens)
    for name in person_names:
        name_tokens = name.split()
        n_len = len(name_tokens)
        for i in range(len(tokens) - n_len + 1):
            if tokens[i:i + n_len] == name_tokens:
                tags[i] = "B-PER"
                for j in range(i + 1, i + n_len):
                    tags[j] = "I-PER"
    return tags


def generate_challenge_dataset(num_samples=300):
    dataset = []
    failures = 0
    max_failures = 1000

    while len(dataset) < num_samples:
        if failures >= max_failures:
            print(f"Stopped after {failures} failed attempts.")
            break

        country = random.choice(chosen_countries)
        names = get_names(country, n=10)

        # Filter names: keep only those also in common_nouns
        filtered_names = [
            name for name in names if name.lower() in common_nouns]

        if not filtered_names:
            failures += 1
            if failures % 10 == 0:
                print(f"{failures} failed attempts so far.")
            continue

        # Pick one or two names as needed
        if len(filtered_names) == 1:
            name1 = filtered_names[0]
            name2 = None
        else:
            name1, name2 = random.sample(filtered_names, 2)

        template = random.choice(sentence_templates)

        if "{name2}" in template and name2 is None:
            sentence = template.format(name=name1, name2=name1)
            person_names = [name1]
        elif "{name2}" in template:
            sentence = template.format(name=name1, name2=name2)
            person_names = [name1, name2]
        else:
            sentence = template.format(name=name1)
            person_names = [name1]

        tokens = tokenize(sentence)
        tags = tag_tokens(tokens, person_names)
        dataset.append(list(zip(tokens, tags)))
        print(len(dataset))

    return dataset


def save_to_csv(dataset, filepath="default_name.csv"):
    with open(filepath, mode="w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["sentence_id", "token", "tag"])
        for idx, sentence in enumerate(dataset):
            for token, tag in sentence:
                writer.writerow([idx, token, tag])

In [16]:
def evaluate_csv(file):
    print('evaluating', file)
    scripted_challenge = pd.read_csv(file)

    examples = []
    for sentence_id, group in scripted_challenge.groupby("sentence_id"):
        tokens = group["token"].tolist()
        tags = group["tag"].tolist()
        examples.append({"tokens": tokens, "ner_tags": tags})

    unique_tags = sorted(set(tag for ex in examples for tag in ex["ner_tags"]))

    for ex in examples:
        ex["labels"] = [conll_label_to_id[tag] for tag in ex["ner_tags"]]

    challenge_dataset = Dataset.from_list(examples)
    tokenized = challenge_dataset.map(tokenize_and_align_labels, batched=True)
    a = trainer.predict(tokenized)

    with open(f'./results/{model_name}_{train_data_name}_challenge_{file[:-3]}.pkl', 'wb') as f:
        pickle.dump(a, f)

    x, y = evaluate_predictions(a, tokenized)
    print()
    print('-'*30)

In [None]:
for x in list(continent_to_code.keys()):
    evaluate_csv(x+'_ner_challenge.csv')

In [None]:
evaluate_csv('./common_nouns_challenge.csv')

# Seed Variability

In [None]:
seeds = [33, 42, 57, 106, 812, ]
results = []

for seed in seeds:
    set_seed(seed)

    seed_model = AutoModelForTokenClassification.from_pretrained(
        model_name,
        num_labels=len(conll_label_to_id)
    )

    seed_args = TrainingArguments(
        output_dir=f"./output/{seed}",
        seed=seed,
        eval_strategy="no",
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        gradient_accumulation_steps=2,
        num_train_epochs=3,
        weight_decay=0.01,
        load_best_model_at_end=True,
        report_to="none",
        fp16=True,
        logging_steps=1000,
        save_strategy="no",
    )

    seed_trainer = Trainer(
        model=seed_model,
        args=seed_args,
        train_dataset=train_data,
        eval_dataset=test_data,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    seed_trainer.train()

    pred = seed_trainer.predict(test_data)

    x, y = evaluate_predictions(pred, test_data)
    eval_result = {'predictions': x, 'seed': seed}
    results.append(eval_result)
    with open(f'./results/{model_name}_{train_data_name}_seed_var_results.pkl', 'wb') as f:
        pickle.dump(results, f)

In [None]:
with open(f'./results/{model_name}_{train_data_name}_seed_var_results.pkl', 'rb') as f:
    seed_res = pickle.load(f)

In [None]:
kappa_scores = []

for i, j in combinations(range(len(seed_res)), 2):
    preds_i = seed_res[i]['predictions']
    preds_j = seed_res[j]['predictions']

    kappa = cohen_kappa_score(preds_i, preds_j)
    kappa_scores.append(kappa)

    print(f"Cohen's kappa between seed {i} and seed {j}: {kappa:.6f}")

average_kappa = sum(kappa_scores) / len(kappa_scores)
print(f"\nAverage Cohen's kappa across all pairs: {average_kappa:.6f}")

In [None]:
data = np.array([seed['predictions'] for seed in seed_res])

alpha = krippendorff.alpha(reliability_data=data,
                           level_of_measurement='nominal')

print(f"\nKrippendorff’s alpha (nominal): {alpha:.4f}")