## Prepare Data for Sequence Labelling Task (Propaganda Span Detection)

In [4]:
import os
import pandas as pd

# Prepare the data: read the TSV files into dataframes and, when propaganda_only is set, drop the not_propaganda instances.
def load_data_and_return_dataframe(data_dir="../data", propaganda_only=False):
    """
    Read the train and validation TSV files from ``data_dir``.

    Args:
        data_dir: Directory holding ``propaganda_train.tsv`` and
            ``propaganda_val.tsv``.
        propaganda_only: When true, rows whose ``label`` column equals
            ``'not_propaganda'`` are removed from both frames and the index
            is renumbered from zero.

    Returns:
        ``(train_data, test_data)`` as two pandas DataFrames; the second one
        is loaded from the validation file.
    """
    file_names = ("propaganda_train.tsv", "propaganda_val.tsv")

    # Read both files up front, then filter, so a missing file surfaces
    # before any row filtering is attempted.
    frames = [
        pd.read_csv(os.path.join(data_dir, file_name), delimiter="\t")
        for file_name in file_names
    ]

    if propaganda_only:
        frames = [
            frame[frame['label'] != 'not_propaganda'].reset_index(drop=True)
            for frame in frames
        ]

    train_frame, val_frame = frames
    return train_frame, val_frame

# Load both splits from the default data directory, keeping only the rows
# whose label is not 'not_propaganda'.
train_data, test_data = load_data_and_return_dataframe(propaganda_only=True)

In [5]:
def convert_to_bio_for_span_detection(sentence, label):
    """
    Convert a marker-annotated sentence into tokens and BIO labels.

    The ``<BOS>`` / ``<EOS>`` markers are padded with spaces before the
    sentence is split on whitespace, so markers glued to neighbouring words
    still become standalone tokens. The tokens between the first ``<BOS>`` and
    the first ``<EOS>`` form the span: its first token is tagged ``B-PROP``,
    the remaining ones ``I-PROP``, and every token outside the span ``O``.
    An empty span (``<BOS>`` immediately followed by ``<EOS>``) yields only
    ``O`` tags.

    Args:
        sentence: Text containing one ``<BOS>`` ... ``<EOS>`` marker pair.
        label: Row label. Currently unused; accepted so the function matches
            the ``func(sentence, label)`` call made by ``process_data_for_bio``.

    Returns:
        ``(tokens, bio_labels)``: the marker-free tokens and a parallel list
        of BIO tags of the same length.

    Raises:
        ValueError: If either marker is missing, or ``<EOS>`` appears before
            ``<BOS>``.
    """
    # Pad the markers so they tokenize on their own, then split on whitespace
    marked_tokens = sentence.replace('<BOS>', ' <BOS> ').replace('<EOS>', ' <EOS> ').split()

    # list.index raises ValueError when a marker is absent
    bos_pos = marked_tokens.index('<BOS>')
    eos_pos = marked_tokens.index('<EOS>')
    if eos_pos < bos_pos:
        raise ValueError("'<EOS>' appears before '<BOS>' in sentence")

    # Remove the markers
    tokens = [t for t in marked_tokens if t not in ('<BOS>', '<EOS>')]
    bio_labels = ['O'] * len(tokens)

    # Dropping <BOS> shifts everything after it left by one, so in the
    # marker-free list the span covers indices [span_start, span_stop).
    span_start = bos_pos
    span_stop = eos_pos - 1

    # Only tag when the span holds at least one token; otherwise the B tag
    # would land on the token after the markers (or past the end of the list).
    if span_start < span_stop:
        bio_labels[span_start] = 'B-PROP'
        for i in range(span_start + 1, span_stop):
            bio_labels[i] = 'I-PROP'

    return tokens, bio_labels

def process_data_for_bio(dataframe, func):
    """
    Run a BIO converter over every row of ``dataframe``.

    Each row of ``dataframe.values`` is unpacked positionally as
    ``(label, sentence)``, so the frame must have exactly those two columns
    in that order.

    Args:
        dataframe: Two-column frame of (label, sentence) rows.
        func: Callable invoked as ``func(sentence, label)`` that returns a
            ``(tokens, bio_tags)`` pair.

    Returns:
        ``(all_tokens, all_labels)``: two parallel lists with one entry per row.
    """
    converted = [func(sentence, label) for label, sentence in dataframe.values]
    all_tokens = [row_tokens for row_tokens, _ in converted]
    all_labels = [row_tags for _, row_tags in converted]
    return all_tokens, all_labels

In [6]:
from datasets import Dataset

# Apply to train/test
tokens_train, tags_train = process_data_for_bio(train_data, convert_to_bio_for_span_detection)
tokens_test, tags_test = process_data_for_bio(test_data, convert_to_bio_for_span_detection)

# Make label2id map from the tags seen in BOTH splits, so encoding the test
# tags below cannot raise KeyError on a tag that never occurs in the training
# split. Sorting keeps the tag -> id assignment deterministic across runs.
all_labels_set = sorted({
    tag
    for split_tags in (tags_train, tags_test)
    for seq in split_tags
    for tag in seq
})
label2id = {label: i for i, label in enumerate(all_labels_set)}
id2label = {i: label for label, i in label2id.items()}

# Each dataset row holds the word tokens and the per-word tag ids.
train_dataset = Dataset.from_dict({
    "tokens": tokens_train,
    "labels": [[label2id[t] for t in seq] for seq in tags_train]
})
test_dataset = Dataset.from_dict({
    "tokens": tokens_test,
    "labels": [[label2id[t] for t in seq] for seq in tags_test]
})

In [7]:
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification

# Checkpoint shared by the tokenizer and the token-classification model
model_name = "SpanBERT/spanbert-base-cased"

# Use the GPU when torch reports one, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

tokenizer = AutoTokenizer.from_pretrained(model_name)

# The classification head is sized from the label map, and both directions of
# the mapping are passed in alongside it.
model = AutoModelForTokenClassification.from_pretrained(
    model_name, num_labels=len(label2id), id2label=id2label, label2id=label2id
)

model.to(device)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at SpanBERT/spanbert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BertForTokenClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(28996, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12

## Tokenize Inputs

In [None]:
def tokenize_and_align_labels(examples):
    """
    Tokenize a batch of pre-split sentences and align word labels to subwords.

    Relies on the notebook-level ``tokenizer``, ``id2label`` and ``label2id``.

    Alignment rules, per token position:
      * word id is ``None``  -> ``-100`` (the value ``compute_metrics`` skips);
      * first piece of a word -> the word's own label id;
      * later pieces of a word -> the word's label, with a ``B-`` tag turned
        into the matching ``I-`` tag.

    Args:
        examples: Batch with ``"tokens"`` (list of word lists) and ``"labels"``
            (list of per-word label-id lists).

    Returns:
        The tokenizer output with an added ``"labels"`` entry holding one
        aligned label-id list per example.
    """
    encoded = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        max_length=256,
        truncation=True,
    )

    aligned = []
    for row, word_labels in enumerate(examples["labels"]):
        row_labels = []
        last_word = None
        for word in encoded.word_ids(batch_index=row):
            if word is None:
                tag_id = -100
            elif word != last_word:
                # First subword of a new word keeps the word-level label
                tag_id = word_labels[word]
            else:
                # Continuation subword: a B- tag becomes I-, anything else repeats
                tag_name = id2label[word_labels[word]]
                if tag_name.startswith("B-"):
                    tag_name = "I-" + tag_name[2:]
                tag_id = label2id[tag_name]
            row_labels.append(tag_id)
            last_word = word
        aligned.append(row_labels)

    encoded["labels"] = aligned
    return encoded

# Tokenize both splits in batches; each batch goes through
# tokenize_and_align_labels, which adds the subword-aligned "labels" column.
tokenized_train = train_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_test = test_dataset.map(tokenize_and_align_labels, batched=True)

## Define Compute Metrics Function Using seqeval's F1 Score

In [None]:
from seqeval.metrics import f1_score, precision_score, recall_score
import numpy as np

def compute_metrics(p):
    """
    Compute the seqeval F1 score for one evaluation pass.

    Predictions are the argmax over the last axis of ``p.predictions``.
    Positions whose gold label id is ``-100`` are dropped from both the
    predicted and the gold sequences before ids are mapped back to tag names
    through the notebook-level ``id2label``.

    Args:
        p: Object exposing ``predictions`` (scores indexed as
            example x position x label) and ``label_ids``.

    Returns:
        ``{"seq_f1": <float>}``.
    """
    predicted_ids = np.argmax(p.predictions, axis=2)
    gold_ids = p.label_ids

    pred_tag_seqs = []
    gold_tag_seqs = []
    for pred_row, gold_row in zip(predicted_ids, gold_ids):
        # Keep only the positions that carry a real gold label
        kept = [
            (id2label[pred_id], id2label[gold_id])
            for pred_id, gold_id in zip(pred_row, gold_row)
            if gold_id != -100
        ]
        pred_tag_seqs.append([pred_tag for pred_tag, _ in kept])
        gold_tag_seqs.append([gold_tag for _, gold_tag in kept])

    # Sequence-level (seqeval)
    return {"seq_f1": f1_score(gold_tag_seqs, pred_tag_seqs)}

## Define Training Arguments and Data Collator

In [None]:
from transformers import TrainingArguments

# Training configuration passed to the Trainer later in this notebook.
training_args = TrainingArguments(
    output_dir="./results",
    # Evaluate and checkpoint on the same per-epoch schedule
    eval_strategy="epoch",
    save_strategy="epoch",
    num_train_epochs=15,  # upper bound; the Trainer below also registers an EarlyStoppingCallback
    learning_rate=2e-05,
    weight_decay=0.01,
    # Select the best checkpoint by "seq_f1" (the key returned by
    # compute_metrics); a higher score counts as better.
    load_best_model_at_end=True,
    metric_for_best_model="seq_f1",
    greater_is_better=True,
    logging_dir="./logs",
    logging_steps=100,
)

In [None]:
from transformers import DataCollatorForTokenClassification

# init data collator for appropriate padding
# (tokenize_and_align_labels truncates but requests no padding, so padding is
# left to this collator, built from the same tokenizer)
data_collator = DataCollatorForTokenClassification(tokenizer)

## Train Model

In [None]:
from transformers import Trainer, EarlyStoppingCallback

# Wire the model, training arguments, datasets, collator and metric function
# into a Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    # Built from propaganda_val.tsv, i.e. the validation split
    eval_dataset=tokenized_test,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # NOTE(review): presumably stops after 3 evaluations without improvement
    # in the tracked metric — confirm against the EarlyStoppingCallback docs
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

# Run the fine-tuning loop
trainer.train()

## Save Model

In [None]:
# Write the trainer's current model to the given output path
trainer.save_model("spanbert-for-propaganda-span-detection")