In [None]:
# Install dependencies. These are notebook shell commands (`!pip ...`); skip this cell if the packages are already installed.
!pip install transformers datasets seqeval torch
!pip install datasets
!pip install seqeval --upgrade

In [None]:
import torch
import random
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import load_dataset
from collections import defaultdict, Counter
from transformers import AutoTokenizer, AutoModelForTokenClassification, TokenClassificationPipeline
from seqeval.metrics import classification_report, precision_score, recall_score, f1_score
from sklearn.metrics import precision_score as sk_precision, recall_score as sk_recall, f1_score as sk_f1

# Pick the device index handed to the pipeline: 0 when CUDA is available, otherwise -1
if torch.cuda.is_available():
    device = 0
else:
    device = -1
print(f"Device set to use {'cuda:0' if device == 0 else 'cpu'}")

# Loads the dataset and keeps its "train" split
dataset = load_dataset("dell-research-harvard/newswire")
full_data = dataset["train"]

# Selects 10000 random rows from the dataset
# Maintaining seed set by teammate: the seed value and the exact order of the
# random.* calls below determine which rows are drawn and how they are split,
# so changing either changes every downstream result.
# random.sample raises ValueError if full_data has fewer than 10000 rows.
random.seed(42)
indices = random.sample(range(len(full_data)), 10000)
small_corpus = full_data.select(indices)

# Shuffle and split 80/20.
# `indices` is rebound here: from this point it holds positions within
# small_corpus, not row numbers of full_data.
random.seed(42)
indices = list(range(len(small_corpus)))
random.shuffle(indices)
split_idx = int(0.8 * len(indices))
# NOTE(review): train_data is not referenced by any later cell visible in this
# notebook; only test_data is passed to the models.
train_data = small_corpus.select(indices[:split_idx])
test_data = small_corpus.select(indices[split_idx:])

# BIO tag inventory: "O" followed by a B-/I- pair for each entity type
label_names = ["O"] + [
    f"{prefix}-{entity_type}"
    for entity_type in ("PER", "ORG", "LOC", "MISC")
    for prefix in ("B", "I")
]

# Display name -> checkpoint identifier for every model under comparison
models_to_compare = dict(
    [
        ("Custom Historical NER", "dell-research-harvard/historical_newspaper_ner"),
        ("RoBERTa-Large (ConLL03)", "Jean-Baptiste/roberta-large-ner-english"),
        ("BERT-Large (ConLL03)", "dbmdz/bert-large-cased-finetuned-conll03-english"),
        ("DistilBERT (ConLL03)", "elastic/distilbert-base-uncased-finetuned-conll03-english"),
    ]
)

# Hex colours chosen to match the figures in the original paper; plots below
# take a prefix of this list sized to the number of categories they draw.
custom_palette = [
    "#F4A300", "#4BE3AC", "#9B5DE5", "#FF6B6B", "#033268",
    "#E777C2", "#6BA3D1", "#CFA15A", "#C0D2C1", "#F1C6C6",
    "#F5D4A1", "#E2B97C", "#EDE9E0",
]

## Consolidating code
#
# Defines prediction function
def align_entities_to_words(words, entities):
    """Project pipeline entity spans onto a word-level BIO tag sequence.

    The sentence given to the pipeline is ``" ".join(words)``, so each word
    occupies a known character range. Every word that overlaps an entity's
    ``[start, end)`` range is tagged: the first one ``B-<type>``, the rest
    ``I-<type>``. Words already tagged by an earlier entity are left alone.

    Args:
        words: List of word strings, in sentence order.
        entities: Iterable of dicts as returned by the token-classification
            pipeline. Each needs ``entity_group`` (or ``entity``) and, for
            offset alignment, integer ``start`` / ``end`` character offsets.

    Returns:
        A list of tag strings with the same length as ``words``.
    """
    # Character range of each word inside " ".join(words).
    spans = []
    cursor = 0
    for w in words:
        spans.append((cursor, cursor + len(w)))
        cursor += len(w) + 1  # +1 skips the joining space

    pred_seq = ["O"] * len(words)
    for ent in entities:
        label = ent.get("entity_group", ent.get("entity"))
        if not label:
            continue
        # Un-aggregated output already carries a BIO prefix; strip it so the
        # tag does not become e.g. "B-B-PER".
        if label[:2] in ("B-", "I-"):
            label = label[2:]

        start, end = ent.get("start"), ent.get("end")
        if start is None or end is None:
            # No offsets available (slow tokenizer): fall back to tagging the
            # first untagged word that contains the entity text.
            needle = (ent.get("word") or "").strip().lower()
            if not needle:
                continue
            for idx, w in enumerate(words):
                if needle in w.lower() and pred_seq[idx] == "O":
                    pred_seq[idx] = "B-" + label
                    break
            continue

        prefix = "B-"
        for idx, (w_start, w_end) in enumerate(spans):
            if w_end <= start:
                continue  # word ends before the entity begins
            if w_start >= end:
                break  # spans are ordered, so nothing later can overlap
            if pred_seq[idx] == "O":
                pred_seq[idx] = prefix + label
                prefix = "I-"

    return pred_seq


def get_predictions(model_name, test_data):
    """Run one token-classification model over ``test_data``.

    Args:
        model_name: Checkpoint identifier passed to ``from_pretrained``.
        test_data: Iterable of rows exposing ``ner_words`` (list of words)
            and ``ner_labels`` (gold tags, one per word).

    Returns:
        ``(true_labels, pred_labels)``: two lists with one tag sequence per
        row. Each predicted sequence has the same length as the row's
        ``ner_words``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name)
    pipe = TokenClassificationPipeline(model=model, tokenizer=tokenizer, aggregation_strategy="simple", device=device)

    true_labels, pred_labels = [], []

    for row in test_data:
        words = row["ner_words"]
        sentence = " ".join(words)
        # Nothing to tag in an empty row; skip the model call.
        prediction = pipe(sentence) if sentence.strip() else []

        true_labels.append(row["ner_labels"])
        pred_labels.append(align_entities_to_words(words, prediction))

    return true_labels, pred_labels

# Flatten labels
def flatten_labels(true_labels, pred_labels):
    """Concatenate per-sentence tag sequences into two flat lists.

    Returns:
        ``(flat_true, flat_pred)``: every tag from ``true_labels`` and from
        ``pred_labels`` respectively, in their original order.
    """
    flat_true, flat_pred = [], []
    for sequence in true_labels:
        flat_true.extend(sequence)
    for sequence in pred_labels:
        flat_pred.extend(sequence)
    return flat_true, flat_pred

# Evaluation
def evaluate_model(name, true_labels, pred_labels):
    """Print macro precision/recall/F1 and the seqeval report for one model.

    The three headline numbers are computed over the flattened tag lists;
    the detailed report is computed from the per-sentence sequences.

    NOTE(review): zero_division=1 scores an undefined precision or recall as
    1, which can raise the macro averages. Confirm this is intended.
    """
    print(f"\n=== {name} ===")
    flat_true, flat_pred = flatten_labels(true_labels, pred_labels)

    headline_scorers = (
        ("Precision", sk_precision),
        ("Recall", sk_recall),
        ("F1 Score", sk_f1),
    )
    for heading, scorer in headline_scorers:
        print(f"{heading}: {scorer(flat_true, flat_pred, average='macro', zero_division=1)}")

    report = classification_report(true_labels, pred_labels, zero_division=1)
    print("\nDetailed Report:\n", report)

# Per-model summary scores and seqeval report dictionaries, keyed by display name
results, detailed_results = {}, {}

for name, model_path in models_to_compare.items():
    print(f"Evaluating {name}...")
    true, pred = get_predictions(model_path, test_data)
    evaluate_model(name, true, pred)

    # Macro scores over the flattened tag lists
    flat_true, flat_pred = flatten_labels(true, pred)
    results[name] = dict(
        zip(
            ("Precision", "Recall", "F1 Score"),
            (
                scorer(flat_true, flat_pred, average="macro", zero_division=1)
                for scorer in (sk_precision, sk_recall, sk_f1)
            ),
        )
    )
    detailed_results[name] = classification_report(true, pred, output_dict=True)

# One row per model, highest F1 first
results_df = pd.DataFrame(results).T
results_df = results_df.sort_values("F1 Score", ascending=False)
print(results_df)

In [None]:
# Plots global scores
def plot_global_results(df):
    """Draw grouped bars of Precision, Recall and F1 for each row of ``df``.

    ``df`` must provide "Precision", "Recall" and "F1 Score" columns; its
    index supplies the x tick labels.
    """
    positions = range(len(df))
    bar_width = 0.25
    # (column, legend label, horizontal offset) for the three bars in a group
    bar_specs = (
        ("Precision", "Precision", -bar_width),
        ("Recall", "Recall", 0),
        ("F1 Score", "F1", bar_width),
    )

    fig, ax = plt.subplots(figsize=(8, 6))
    for (column, legend_label, offset), colour in zip(bar_specs, custom_palette[:3]):
        ax.bar(
            [pos + offset for pos in positions],
            df[column],
            width=bar_width,
            label=legend_label,
            color=colour,
        )

    ax.set_xticks(positions)
    ax.set_xticklabels(df.index, rotation=45, ha="right")
    ax.set_ylabel("Score")
    ax.set_title("Overall Metrics by Model")
    ax.legend()
    plt.tight_layout()
    plt.show()

# Draw the overall comparison from the summary table built by the evaluation loop
plot_global_results(results_df)

In [None]:
# Plot per-label F1
def plot_label_scores(detail_results):
    """Bar-plot the F1 score of every entity label for every model.

    Args:
        detail_results: Mapping of model name -> report dictionary whose
            per-label entries hold an "f1-score" value. Keys containing
            "avg" and the key "accuracy" are ignored. A label missing from
            a model's report is plotted as 0.
    """
    label_set = set()
    for report in detail_results.values():
        for key in report:
            if "avg" in key or key == "accuracy":
                continue
            label_set.add(key)
    labels = sorted(label_set)

    f1_by_model = {}
    for model, report in detail_results.items():
        f1_by_model[model] = {
            label: report.get(label, {}).get("f1-score", 0) for label in labels
        }

    scores = pd.DataFrame(f1_by_model)
    scores.plot(kind="bar", figsize=(10, 5), colormap='tab20c')
    plt.title("F1 Score by Entity Label")
    plt.ylabel("F1 Score")
    plt.xlabel("Entity Label")
    plt.legend(title="Model", bbox_to_anchor=(1.05, 1), loc="upper left")
    plt.tight_layout()
    plt.show()

# Compare per-label F1 using the report dictionaries collected for each model
plot_label_scores(detailed_results)

In [None]:
# Topic distribution plot
def plot_topic_distribution(df):
    """Horizontal bar chart of how many rows of ``df`` fall under each ``ca_topic``."""
    counts = df['ca_topic'].value_counts()
    bar_colours = custom_palette[:len(counts)]

    plt.figure(figsize=(12, 8))
    ax = sns.barplot(x=counts.values, y=counts.index, palette=bar_colours)
    ax.set_title('Distribution of Articles by Topic')
    ax.set_xlabel('Number of Articles')
    ax.set_ylabel('Topic')
    plt.tight_layout()
    plt.show()

# Uses the full 10000-row sample (train and test portions together)
plot_topic_distribution(pd.DataFrame(small_corpus))

In [None]:
# Entity distribution by topic
def plot_entity_distribution_by_topic(df):
    """Stacked horizontal bars of tag counts per topic.

    Every value found in a row's ``ner_labels`` is counted under that row's
    ``ca_topic``. No tag is filtered out before counting.
    """
    tag_counts = defaultdict(Counter)
    for topic, tags in zip(df['ca_topic'], df['ner_labels']):
        tag_counts[topic].update(tags)

    # Rows are topics, columns are tags; a tag absent from a topic counts as 0.
    counts_df = pd.DataFrame(tag_counts).T.fillna(0).astype(int)
    n_tags = len(counts_df.columns)

    counts_df.plot(kind='barh', stacked=True, figsize=(12, 8), color=custom_palette[:n_tags])
    plt.title('Entity Distribution by Topic')
    plt.xlabel('Count of Entities')
    plt.ylabel('Topic')
    plt.legend(title='Entity Types', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()

# Uses the full 10000-row sample (train and test portions together)
plot_entity_distribution_by_topic(pd.DataFrame(small_corpus))

In [None]:
# Entity type distribution
def plot_entity_distribution(true_labels, title="Entity Type Distribution"):
    """Bar chart of how often each tag occurs across ``true_labels``.

    Args:
        true_labels: Iterable of tag sequences (one per sentence).
        title: Figure title.
    """
    label_counts = Counter()
    for sequence in true_labels:
        label_counts.update(sequence)

    tags = list(label_counts.keys())
    frequencies = list(label_counts.values())

    plt.figure(figsize=(6, 4))
    ax = sns.barplot(x=tags, y=frequencies, palette=custom_palette[:len(label_counts)])
    ax.set_title(title)
    ax.set_xlabel("Entity Type")
    ax.set_ylabel("Count")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

# The gold tags come from test_data and are the same for every model, so read
# them from the test set directly rather than from a variable left over from
# the last iteration of the evaluation loop.
gold_labels = [row["ner_labels"] for row in test_data]
plot_entity_distribution(gold_labels, title="Entity Distribution: Test Set (Gold Labels)")

In [None]:
# Topic-wise metrics
def topic_metrics_by_model(predictions=None):
    """Compute macro precision/recall/F1 per topic for every model.

    Args:
        predictions: Optional mapping of model display name ->
            ``(true_labels, pred_labels)`` as returned by ``get_predictions``.
            A model found in the mapping reuses those sequences; any other
            model is run through ``get_predictions`` on ``test_data``.

    Returns:
        Nested mapping ``metrics[model_name][topic]`` -> dict with the keys
        "Precision", "Recall" and "F1 Score". Topics appear in the order they
        are first seen in ``test_data``, so the result is the same on every run.
    """
    # Topics depend only on the test set, so read them once, not once per model.
    topics = [row["ca_topic"] for row in test_data]

    topic_metrics = defaultdict(lambda: defaultdict(dict))
    for model_name, model_path in models_to_compare.items():
        if predictions is not None and model_name in predictions:
            true, pred = predictions[model_name]
        else:
            true, pred = get_predictions(model_path, test_data)

        # Single pass: collect the flattened gold/predicted tags of each topic.
        # A plain dict keeps first-seen order, unlike iterating a set.
        tags_by_topic = {}
        for gold_seq, pred_seq, topic in zip(true, pred, topics):
            flat_true, flat_pred = tags_by_topic.setdefault(topic, ([], []))
            flat_true.extend(gold_seq)
            flat_pred.extend(pred_seq)

        for topic, (flat_true, flat_pred) in tags_by_topic.items():
            topic_metrics[model_name][topic] = {
                "Precision": sk_precision(flat_true, flat_pred, average="macro", zero_division=1),
                "Recall": sk_recall(flat_true, flat_pred, average="macro", zero_division=1),
                "F1 Score": sk_f1(flat_true, flat_pred, average="macro", zero_division=1)
            }
    return topic_metrics

# Build the long-format table consumed by the topic-wise plot
topic_metrics = topic_metrics_by_model()

# Stack one frame per model; after reset_index the two index levels become the
# first two columns, which are then named together with the three metrics.
df_topic = (
    pd.concat(
        {
            model_name: pd.DataFrame.from_dict(per_topic, orient="index")
            for model_name, per_topic in topic_metrics.items()
        }
    )
    .reset_index()
    .set_axis(["Model", "Topic", "Precision", "Recall", "F1 Score"], axis=1)
)

# One row per (model, topic, metric)
plot_df = pd.melt(df_topic, id_vars=["Model", "Topic"], var_name="Metric", value_name="Score")

In [None]:
# Global seaborn styling for the large multi-panel figure below
sns.set(style="whitegrid")
sns.set_context("talk", font_scale=2.5)

# One colour per model: pass only as many palette entries as there are hue levels
n_models = plot_df["Model"].nunique()

# One panel per metric, one bar per (topic, model)
g = sns.catplot(
    data=plot_df,
    kind="bar",
    x="Score",
    y="Topic",
    hue="Model",
    col="Metric",
    palette=custom_palette[:n_models],
    height=20,
    aspect=1.2,
    sharex=False
)

g.set_titles("{col_name}")
g.set_xlabels("Score")
g.set_ylabels("Topic")

for ax in g.axes.flat:
    ax.tick_params(axis='x', labelsize=30)
    ax.tick_params(axis='y', labelsize=30)
    ax.set_title(ax.get_title(), fontsize=50)

# Shrink the legend text. Legend.get_texts() takes no arguments; the font size
# is applied to each returned Text object.
legend = g._legend
if legend is not None:
    legend.get_title().set_fontsize(15)
    for legend_text in legend.get_texts():
        legend_text.set_fontsize(15)

plt.tight_layout()
plt.show()
