Notebook to train a custom FinBERT model on the binary AINI dataset to annotate WSJ articles.

In [None]:
from pathlib import Path
import random
import sqlite3
import pandas as pd
import numpy as np
import torch
import os
from datasets import DatasetDict, Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, set_seed
from transformers import DataCollatorWithPadding, EarlyStoppingCallback,AutoConfig, get_scheduler,BertModel
from IPython.display import display, Markdown 
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
import torch.nn as nn
from transformers.modeling_outputs import SequenceClassifierOutput
import sys
from pathlib import Path
from torch.optim import AdamW
from sklearn.model_selection import train_test_split
import spacy
import nltk

# Load the small English spaCy pipeline. If spaCy cannot resolve the model by
# name, fall back to importing the installed model package directly.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # spacy.load raises OSError when the named model cannot be found. Catching
    # only that keeps KeyboardInterrupt and unrelated failures visible instead
    # of silently routing them into the fallback.
    # NOTE(review): the punkt download is not needed by spaCy itself; presumably
    # another module relies on it - confirm before removing.
    nltk.download("punkt")
    import en_core_web_sm
    nlp = en_core_web_sm.load()

# Put the project root at the front of sys.path so the local `src` package is importable.
# The root is taken to be the parent of the current working directory.
notebook_dir = Path.cwd()
project_root = notebook_dir.parent  
sys.path.insert(0, str(project_root))  

# Project-local modules; these imports rely on the sys.path entry added above.
from src.modelling.CustomFinBERT import CustomFinBERT
from src.modelling.ai_windows import extract_multiple_ai_snippets_with_context


In [None]:
# Project root is the parent of the working directory (the notebook is expected to run from /notebooks).
project_root = Path.cwd().parent

# Location of the annotated WSJ subsample; despite its name this points to a single CSV file.
articles_dir = project_root.joinpath(
    "data", "processed", "articles", "annotated_subsample_WSJ_final.csv"
)

# Load the annotated articles.
df = pd.read_csv(articles_dir)

# Quick inspection: row count, first rows, distinct hype levels and available columns.
print(f" {len(df)} total rows.")
display(df.head(5))
print(set(df["hype_level"].values))
print(df.columns)

In [None]:
# Number of articles whose corpus text is missing.
print(df["corpus"].isna().sum())

# Work on a copy and back-fill missing corpus text from the cleaned version of the same row.
labeled_df = df.copy()
missing_corpus = labeled_df["corpus"].isna()
labeled_df.loc[missing_corpus, "corpus"] = labeled_df.loc[missing_corpus, "cleaned_corpus"]

# Re-count: anything left is missing in cleaned_corpus as well.
print(labeled_df["corpus"].isna().sum())

In [None]:
# Use the annotated hype level as the classification target.
labeled_df.rename(columns={"hype_level": "label"}, inplace=True)

# Binary label: hype levels 1-3 become the positive class, everything else
# (level 0 as well as missing annotations) becomes the negative class.
labeled_df["label"] = labeled_df["label"].apply(lambda x: 1 if x in [1.0, 2.0, 3.0] else 0)

# Prepend the headline to the article body. String concatenation with a missing
# title would turn the whole entry into NaN, so a missing title is treated as empty.
labeled_df["corpus"] = "Title: " + labeled_df["title"].fillna("") + "\n\n" + labeled_df["corpus"]

# Balanced class weights over all labelled articles.
labels = labeled_df["label"]
class_weights = compute_class_weight("balanced", classes=np.unique(labels), y=labels)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float)


# Convert to a Hugging Face Dataset
hf_labeled_df = Dataset.from_pandas(labeled_df)

In [None]:
def custom_truncate(batch, tokenizer, max_length=512, head_tokens=128):
    """Tokenize ``batch["corpus"]`` and fit every text into a fixed-length window.

    Texts with more tokens than the content budget (``max_length`` minus the
    two special tokens) keep their first ``head_tokens`` tokens plus as many
    trailing tokens as still fit; the middle part is dropped. Every sequence
    is then wrapped in [CLS] ... [SEP] and right-padded to ``max_length``.

    Args:
        batch: Mapping whose "corpus" entry is passed to the tokenizer.
        tokenizer: Callable returning "input_ids" and "attention_mask" lists
            and exposing ``cls_token_id``, ``sep_token_id`` and ``pad_token_id``.
        max_length: Final length of every returned sequence (default 512).
        head_tokens: Number of leading tokens kept when a text is too long
            (default 128, which leaves 382 trailing tokens at length 512).

    Returns:
        Dict with "input_ids" and "attention_mask", each a list of lists of
        length ``max_length``.

    Raises:
        ValueError: If ``head_tokens`` does not fit into the content budget.
    """
    content_budget = max_length - 2  # room left after [CLS] and [SEP]
    if head_tokens < 0 or head_tokens > content_budget:
        raise ValueError(
            f"head_tokens must be between 0 and {content_budget}, got {head_tokens}"
        )
    tail_tokens = content_budget - head_tokens

    encoded = tokenizer(batch["corpus"], add_special_tokens=False)

    truncated_input_ids = []
    truncated_attention_mask = []

    for ids, mask in zip(encoded["input_ids"], encoded["attention_mask"]):
        if len(ids) > content_budget:
            # Slice the tail from an explicit start index: ids[-0:] would
            # return the whole list when tail_tokens is 0.
            tail_start = len(ids) - tail_tokens
            ids = ids[:head_tokens] + ids[tail_start:]
            mask = mask[:head_tokens] + mask[tail_start:]

        # Add [CLS] and [SEP]; both are real tokens, so their mask is 1.
        new_ids = [tokenizer.cls_token_id] + ids + [tokenizer.sep_token_id]
        new_mask = [1] + mask + [1]

        # Right-pad to the fixed length; padding positions get mask 0.
        pad_len = max_length - len(new_ids)
        new_ids += [tokenizer.pad_token_id] * pad_len
        new_mask += [0] * pad_len

        truncated_input_ids.append(new_ids)
        truncated_attention_mask.append(new_mask)

    return {"input_ids": truncated_input_ids, "attention_mask": truncated_attention_mask}


In [None]:
# Set seeds for reproducibility
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
set_seed(seed)

# Convert to pandas and extract AI-focused snippets
df_labeled = hf_labeled_df.to_pandas()
df_labeled = extract_multiple_ai_snippets_with_context(df_labeled, text_col='corpus', output_col='ai_window',context_window=2)

# Stratified split: 70% train, 30% temp.
# Stratify on the frame that is actually being split: if the snippet extraction
# drops or reorders rows, labels taken from labeled_df no longer line up with df_labeled.
train_df, temp_df = train_test_split(
    df_labeled,
    test_size=0.3,
    stratify=df_labeled["label"],
    random_state=seed
)

# Stratified split of temp: 2/3 eval, 1/3 test (→ 20% / 10% of all data)
eval_df, test_df  = train_test_split(
    temp_df,
    test_size=1/3,
    stratify=temp_df["label"],
    random_state=seed
)


# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")

# Tokenization function
def tokenize(example):
    """Tokenize the example's ``ai_window`` text, truncated and padded to 512 tokens."""
    snippet = example["ai_window"]
    return tokenizer(snippet, truncation=True, padding="max_length", max_length=512)

# Turn each split into a Hugging Face Dataset, dropping the pandas index first.
train_dataset, test_dataset, eval_dataset = (
    Dataset.from_pandas(split_df.reset_index(drop=True))
    for split_df in (train_df, test_df, eval_df)
)

# Provide the target as a plain integer under the key `labels`
def fix_labels(example):
    """Return ``{"labels": <int>}`` built from the example's ``label`` value."""
    label_value = int(example["label"])
    return {"labels": label_value}

# Tokenize every split and add the integer `labels` column.
train_dataset = train_dataset.map(tokenize).map(fix_labels)
eval_dataset = eval_dataset.map(tokenize).map(fix_labels)
test_dataset = test_dataset.map(tokenize).map(fix_labels)



# Bundle the three splits in one DatasetDict
dataset_dict = DatasetDict({
    "train": train_dataset,
    "test": test_dataset,
    "eval": eval_dataset
})

# Class weights for the loss, derived from the training labels only so that the
# label distribution of the eval/test splits does not leak into training.
labels = train_df["label"]
class_weights = compute_class_weight(class_weight="balanced", classes=np.unique(labels), y=labels)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float)

In [None]:
# Checkpoint name and the two-way label vocabulary
model_path = "ProsusAI/finbert"
id2label = {0: "No narrative", 1: "narrative"}
label2id = {label_name: label_id for label_id, label_name in id2label.items()}

# Config for a binary head with explicit dropout settings
config = AutoConfig.from_pretrained(
    model_path,
    hidden_dropout_prob=0.1,
    attention_probs_dropout_prob=0.1,
    num_labels=2,
    id2label=id2label,
    label2id=label2id,
)

# Pretrained encoder that the custom model wraps
backbone = BertModel.from_pretrained(model_path)

# Prefer the GPU when one is available; the class weights must live on the same device.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
class_weights_tensor = class_weights_tensor.to(device)

# Build the classifier and move it to the chosen device
model = CustomFinBERT(backbone, class_weights_tensor, config).to(device)

print(f"Model is on device: {next(model.parameters()).device}")

# Expose only the model inputs and targets as torch tensors
columns = ["input_ids", "attention_mask", "labels"]
for split_dataset in (train_dataset, test_dataset, eval_dataset):
    split_dataset.set_format(type="torch", columns=columns)

In [None]:
# Hyperparameters
lr = 2e-5
batch_size = 4
num_epochs = 15

# data collator for padding
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Checkpoints and logs are written to the same directory.
finbert_run_dir = project_root / "models" / "FINBERT_Binary"

training_args = TrainingArguments(
    output_dir=finbert_run_dir,
    logging_dir=finbert_run_dir,
    learning_rate=lr,
    num_train_epochs=num_epochs,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    # log, evaluate and checkpoint once per epoch
    logging_strategy="epoch",
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=None,
    # the checkpoint with the lowest eval loss is restored after training
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    load_best_model_at_end=True,
)

In [None]:
# Freeze all parameters in the FinBERT encoder
for param in model.backbone.parameters():
    param.requires_grad = False

# Unfreeze the classification head and the last encoder layer (layer.11).
# Each substring needs its own `in` test: `"classifier" or ... in name` is always
# truthy and would silently unfreeze every parameter again.
# To train the head only, drop "encoder.layer.11" from the tuple.
trainable_name_parts = ("classifier", "encoder.layer.11")
for name, param in model.named_parameters():
    if any(part in name for part in trainable_name_parts):
        param.requires_grad = True


# Print a summary of how many parameters are trainable
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {trainable_params:,} / {total_params:,} ({trainable_params / total_params:.2%})")

# List all parameter names that are currently trainable
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"[✓] {name}")


In [None]:
def compute_metrics(pred):
    """Compute accuracy plus macro- and weighted-averaged F1, precision and recall.

    Args:
        pred: Prediction object exposing ``label_ids`` (true labels) and
            ``predictions`` (per-class scores; the argmax over the last axis
            is taken as the predicted class).

    Returns:
        Dict with the keys ``accuracy`` and ``{f1,precision,recall}_{macro,weighted}``.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)

    metrics = {"accuracy": accuracy_score(labels, preds)}

    scorers = (("f1", f1_score), ("precision", precision_score), ("recall", recall_score))
    for average in ("macro", "weighted"):
        for metric_name, scorer in scorers:
            # zero_division=0 for every scorer (F1 included), so a class that is
            # never predicted scores 0 without emitting a warning.
            metrics[f"{metric_name}_{average}"] = scorer(
                labels, preds, average=average, zero_division=0
            )

    return metrics


In [None]:
# Sanity check on label values: only 0 and 1 are accepted as training labels.
unique_labels = set(train_dataset["labels"].tolist())
print("Unique labels in training data:", unique_labels)
assert all(label in [0, 1] for label in unique_labels), "Found unexpected label values!"

# NOTE(review): CUDA_LAUNCH_BLOCKING=1 does not select or enable the GPU. Per the PyTorch
# CUDA notes it makes kernel launches synchronous so that CUDA errors are reported at the
# failing call (a debugging aid that slows training). It is set here after the model was
# already moved to the device in an earlier cell - confirm that it still takes effect.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

In [None]:
# Train the model
padding_collator = DataCollatorWithPadding(tokenizer=tokenizer, padding=True)
early_stopping = EarlyStoppingCallback(early_stopping_patience=4)

trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
    data_collator=padding_collator,
    train_dataset=train_dataset,  # training split
    eval_dataset=eval_dataset,  # validation split, evaluated every epoch
    compute_metrics=compute_metrics,  # metrics as defined above
    callbacks=[early_stopping],  # stop after 4 evaluations without improvement
)

trainer.train()

In [None]:
# Target directory for all artefacts of this run
project_root = Path.cwd().parent
save_dir = project_root.joinpath("models", "FinBERT_Binary")
save_dir.mkdir(parents=True, exist_ok=True)

# Persist model, tokenizer and config
trainer.save_model(save_dir)
tokenizer.save_pretrained(save_dir)
config.save_pretrained(save_dir)

# Training log as a table, restricted to entries that carry an epoch value
log_history = trainer.state.log_history
log_df = pd.DataFrame(log_history)
has_epoch = log_df["epoch"].notna()
log_df = log_df[has_epoch].copy()

# Save full training logs
log_df.to_csv(save_dir / "full_training_metrics.csv", index=False)

# Optimizer state, scheduler state and the class weights used for the loss
torch.save(trainer.optimizer.state_dict(), save_dir / "optimizer.pt")
torch.save(trainer.lr_scheduler.state_dict(), save_dir / "scheduler.pt")
torch.save(class_weights_tensor, save_dir / "class_weights.pt")

# Human-readable summary of the run configuration
summary_lines = [
    f"Model: {save_dir}\n",
    f"Learning Rate: {training_args.learning_rate}\n",
    f"Epochs: {training_args.num_train_epochs}\n",
    f"Batch Size: {training_args.per_device_train_batch_size}\n",
    "Tokenizer: ProsusAI/finbert\n",
    "Special preprocessing: large_ai_snippets_with_context=2\n",
    "Additional finetuning: encoder.layer.11 \n",
    "Label mapping: {0: 'No narrative', 1: 'narrative'}\n",
]
with open(save_dir / "config_summary.txt", "w") as f:
    f.writelines(summary_lines)

# Predict on the eval split (the test split stays untouched here)
eval_output = trainer.predict(eval_dataset)

# Predicted class = argmax over the class axis; true labels come with the output
eval_labels = eval_output.label_ids
eval_preds = np.argmax(eval_output.predictions, axis=1)

# One row per eval example: snippet text, true label, predicted label
eval_results_df = pd.DataFrame({
    "text": eval_dataset["ai_window"],
    "true_label": eval_labels,
    "predicted_label": eval_preds
})

# Add readable label names next to the numeric ids
id2label =  {0: 'No narrative', 1: 'narrative'}
for id_column in ("true_label", "predicted_label"):
    eval_results_df[f"{id_column}_name"] = eval_results_df[id_column].map(id2label)

# Save predictions
eval_results_df.to_csv(save_dir / "eval_predictions.csv", index=False)

# Classification report: written to disk and echoed in the notebook
label_names = list(id2label.values())
report = classification_report(eval_labels, eval_preds, target_names=label_names)
with open(save_dir / "eval_classification_report.txt", "w") as f:
    f.write(report)

print(report)

In [None]:
# Predict on the eval split again to inspect the errors
output = trainer.predict(eval_dataset)

# Predicted class per example and the matching true labels
true = output.label_ids
preds = np.argmax(output.predictions, axis=1)

# Label id -> label name
id2label = {0: "No narrative", 1: "narrative"}

# Table of all eval examples with numeric and named labels
results_df = pd.DataFrame({
    "text": eval_dataset["ai_window"],
    "true_label": true,
    "predicted_label": preds,
})
results_df["true_label_name"] = [id2label[label_id] for label_id in true]
results_df["predicted_label_name"] = [id2label[label_id] for label_id in preds]

# Keep only the rows where prediction and annotation disagree
is_wrong = results_df["true_label"].ne(results_df["predicted_label"])
misclassified_df = results_df[is_wrong]

# NOTE(review): this CSV goes to the current working directory, unlike the other
# artefacts, which are written below save_dir - confirm that this is intended.
misclassified_df.to_csv("misclassified_eval_samples.csv", index=False)

# Show full cell contents when DataFrames are displayed
pd.set_option("display.max_colwidth", None)

# Store the raw state dict in addition to the Trainer's own export
torch.save(model.state_dict(), save_dir.joinpath("pytorch_model.bin"))

# Save config and tokenizer
config.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)

Final evaluation only: the cells below score the model on the held-out test set.

In [None]:
# Training log as a table, restricted to entries that carry an epoch value
log_history = trainer.state.log_history
log_df = pd.DataFrame(log_history)
has_epoch = log_df["epoch"].notna()
log_df = log_df[has_epoch].copy()

# Save model and tokenizer
trainer.save_model(save_dir)
tokenizer.save_pretrained(save_dir)

# Write all logged metrics to disk and show them
log_df.to_csv(save_dir / "full_training_metrics.csv", index=False)
print(log_df)

# Save optimizer and learning rate scheduler state
for state_name, component in (("optimizer", trainer.optimizer), ("scheduler", trainer.lr_scheduler)):
    torch.save(component.state_dict(), save_dir / f"{state_name}.pt")

# Write the run configuration to a text file, one setting per line.
# This overwrites any config_summary.txt already present in save_dir.
with open(f"{save_dir}/config_summary.txt", "w") as f:
    f.write(f"Model: {save_dir}\n")
    f.write(f"Learning Rate: {training_args.learning_rate}\n")
    f.write(f"Epochs: {training_args.num_train_epochs}\n")
    f.write(f"Batch Size: {training_args.per_device_train_batch_size}\n")
    f.write("Tokenizer: ProsusAI/finbert\n")
    f.write("Special preprocessing: large_ai_snippets_with_context=2\n")
    # This line previously had neither a value nor a newline, which glued
    # "Label mapping" onto it; it now matches the summary written after training.
    f.write("Additional finetuning: encoder.layer.11 \n")
    f.write("Label mapping: {0: 'No narrative', 1: 'narrative'}\n")

# Predict on the held-out test split
test_output = trainer.predict(test_dataset)

# Predicted class = argmax over the class axis; true labels come with the output
test_labels = test_output.label_ids
test_preds = np.argmax(test_output.predictions, axis=1)

# One row per test example: snippet text, true label, predicted label
test_results_df = pd.DataFrame({
    "text": test_dataset["ai_window"],
    "true_label": test_labels,
    "predicted_label": test_preds
})

# Add readable label names next to the numeric ids
id2label = {0: "No narrative", 1: "narrative"}
for id_column in ("true_label", "predicted_label"):
    test_results_df[f"{id_column}_name"] = test_results_df[id_column].map(id2label)

# Save to CSV
test_results_df.to_csv(save_dir / "test_predictions.csv", index=False)

# Classification report (precision, recall, f1) written to disk
label_names = list(id2label.values())
report = classification_report(test_labels, test_preds, target_names=label_names)
with open(save_dir / "test_classification_report.txt", "w") as f:
    f.write(report)

In [None]:
# Show the most recently computed classification report
# (the test-set report if the preceding cell was just run).
print(report)