# Libraries

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import re

## Load Dataset

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
with open('/content/drive/My Drive/MasterThesisDraft/trainset/training_set_task2.txt', 'r') as file:
    data_one = json.load(file)
df_train = pd.json_normalize(data_one, record_path=['labels'], meta=['id', 'text'])

# data_two contains the list of dic from the devset
with open('/content/drive/My Drive/MasterThesisDraft/trainset/dev_set_task2.txt', 'r') as file:
    data_two = json.load(file)

# data_three contains the list of dic from the devset
with open('/content/drive/My Drive/MasterThesisDraft/testset/test_set_task2.txt', 'r') as file:
    data_three = json.load(file)

# Both datasets are flattened/normalised to make comparison
df_dev = pd.json_normalize(data_two, record_path=['labels'], meta=['id', 'text'])
df_test = pd.json_normalize(data_three, record_path=['labels'], meta=['id', 'text'])

## Functions

In [4]:
def tokenisation(text):
    tokens = re.split(r'\s+', text.strip())
    return tokens

In [5]:
def numerical_mask(text):
    token_count = 0
    numerical_format = []
    done= False
    for char in text:
        if char == ' ':
            numerical_format.append(-1)
            if not done:
                token_count += 1
                done = True
            #token_count += 1
        elif char == '\n':
            numerical_format.append(-2)
            if not done:
                token_count += 1
                done = True
        else:
            numerical_format.append(token_count)
            done = False

    return numerical_format

In [6]:
def char_to_word_level(numerical_format,spans):
    new_spans = []
    for span in spans:
        #print(span)
        if numerical_format[span['start']] != -1 and numerical_format[span['start']] != -2:
            start = numerical_format[span['start']]
            #print(start)
        else:
            for i in range (span['start'],len(numerical_format)):
                if numerical_format[i] != -1 and numerical_format[i] != -2:
                    start = numerical_format[i]
                    break

        if numerical_format[(span['end'] - 1)] != -1 and numerical_format[(span['end'] - 1)] != -2:
            end = numerical_format[(span['end']-1)]
            #print(end)
        else:
            for i in range((span['end'] - 1),0, -1):
                if numerical_format[i] != -1 and numerical_format[i] != -2:
                    end = numerical_format[i]
                    break

        new_spans.append({'start':start,'end': end, 'technique':span['technique'], 'text_fragment': span['text_fragment']})

    return new_spans

In [7]:

def tokenise_text_and_spans_char_to_word_level(data):
    new_data = []

    for i in range(len(data)):
        #print(i)
        if(data[i]['labels']):
            text_numerical_mask = numerical_mask(data[i]['text'])
            new_spans = char_to_word_level(text_numerical_mask, data[i]['labels'])
            new_data.append({'id':data[i]['id'],'text':tokenisation(data[i]['text']),'labels':new_spans})
        else:
            new_data.append({'id':data[i]['id'],'text':tokenisation(data[i]['text']),'labels':data[i]['labels']})

    return new_data

In [8]:
## Function which identifies independent spans
def find_independent_spans(labels,df_train):
    # Sort spans by their end position
    labels.sort(key=lambda x: x['end'])

    total_overlapping_spans = 0
    independent_spans = []
    last_end = -1
    techniques = df_train['technique'].unique()
    techniques_dict = {technique: 0 for technique in techniques}

    for label in labels:
        if label['start'] > last_end:
            independent_spans.append(label)
            last_end = label['end']
        else:
            techniques_dict[label['technique']] += 1
            total_overlapping_spans += 1


    return independent_spans, techniques_dict, total_overlapping_spans

In [9]:
# Function to tokenize and align labels
def tokenize_and_align_labels(short_dataset, list_name,label_all_tokens=True ):
    # Tokenize the input tokens with truncation and word splitting enabled
    tokenized_inputs = tokenizer(short_dataset, truncation=True, is_split_into_words=True)

    labels = []
    for i, label in enumerate(list_name):
        # Get the word IDs for the i-th example in the batch
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            # Special tokens have a word ID that is None. We set the label to -100 so they are automatically
            # ignored in the loss function.
            if word_idx is None:
                label_ids.append(-100)
            # We set the label for the first token of each word.
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            # For the other tokens in a word, we set the label to either the current label or -100, depending on
            # the label_all_tokens flag.
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx

        labels.append(label_ids)

    # Add the aligned labels to the tokenized inputs
    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [10]:
def removing_overlapping_enclosing_spans(data, df):
    new_data = []

    for i in range(0,len(data)):
        if(data[i]['labels']):
            temp_spans, _, _ = find_independent_spans(data[i]['labels'],df)
            new_data.append({'id':data[i]['id'],'text':data[i]['text'],'labels':temp_spans})
        else:
            new_data.append({'id':data[i]['id'],'text':data[i]['text'],'labels':data[i]['labels']})

    return new_data

In [11]:
def labels_to_list_and_tokens_in_list(new_info):
    data_preprocessed_x = []
    data_preprocessed_y = []

    for text in new_info:
        data_preprocessed_x.append(text['text'])
        temp = [0 for i in range(len(text['text']))]

        for span in text['labels']:
            temp[span['start']] = 1
            for i in range(span['start'] + 1, span['end'] + 1):
                temp[i] = 2

        data_preprocessed_y.append(temp)

    return data_preprocessed_x, data_preprocessed_y

In [12]:
def convert_to_start_end_labels(dataset):
    tokenised_text = []
    span_start = []
    span_end = []

    for text in dataset:
        tokenised_text.append(text['text'])

        temp_start = [0 for i in range(len(text['text']))]
        temp_end = [0 for i in range(len(text['text']))]

        for span in text['labels']:
            temp_start[span['start']] = 1
            temp_end[span['end']] = 1


        span_start.append(temp_start)
        span_end.append(temp_end)


    return tokenised_text, span_start, span_end

In [13]:
# Function to tokenize and align labels
def tokenize_and_align_labels_for_start_end(dataset, start_list, end_list,label_all_tokens=True ):
    # Tokenize the input tokens with truncation and word splitting enabled
    tokenized_inputs = tokenizer(dataset, truncation=True, is_split_into_words=True)

    start_labels = []
    for i, label in enumerate(start_list):
        # Get the word IDs for the i-th example in the batch
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            # Special tokens have a word ID that is None. We set the label to -100 so they are automatically
            # ignored in the loss function.
            if word_idx is None:
                label_ids.append(-100)
            # We set the label for the first token of each word.
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            # For the other tokens in a word, we set the label to either the current label or -100, depending on
            # the label_all_tokens flag.
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx

        start_labels.append(label_ids)

    end_labels = []
    for i, label in enumerate(end_list):
        # Get the word IDs for the i-th example in the batch
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            # Special tokens have a word ID that is None. We set the label to -100 so they are automatically
            # ignored in the loss function.
            if word_idx is None:
                label_ids.append(-100)
            # We set the label for the first token of each word.
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            # For the other tokens in a word, we set the label to either the current label or -100, depending on
            # the label_all_tokens flag.
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx

        end_labels.append(label_ids)

    # Add the aligned labels to the tokenized inputs
    tokenized_inputs["span_start"] = start_labels
    tokenized_inputs["span_end"] = end_labels
    return tokenized_inputs

## Approach 1: Token Classification (NER Method)

#### 1: Removing Overlapping + Enclosing Spans

In [29]:
concat_data_one_data_two = data_one + data_two

In [30]:
removed_overlappping_enclosing_spans_trainset = removing_overlapping_enclosing_spans(concat_data_one_data_two, df_train)
removed_overlappping_enclosing_spans_testset = removing_overlapping_enclosing_spans(data_three, df_test)

#### 2: Tokenisation and character to word level

In [31]:
tokenised_and_word_level_trainset = tokenise_text_and_spans_char_to_word_level(removed_overlappping_enclosing_spans_trainset)
tokenised_and_word_level_testset = tokenise_text_and_spans_char_to_word_level(removed_overlappping_enclosing_spans_testset)

In [32]:
tokenised_and_word_level_validation_set = tokenised_and_word_level_testset[:100]
tokenised_and_word_level_test_set = tokenised_and_word_level_testset[100:]

#### 3: Converting the labels to label vectors

In [33]:
x_train, y_train = labels_to_list_and_tokens_in_list(tokenised_and_word_level_trainset)
x_val, y_val = labels_to_list_and_tokens_in_list(tokenised_and_word_level_validation_set)
x_test, y_test = labels_to_list_and_tokens_in_list(tokenised_and_word_level_test_set)

#### Align the tokens and labels with the tokenizer

In [34]:
from transformers import AutoTokenizer, AutoModelForTokenClassification
# spanbert-base-cased
#tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#model = AutoModelForTokenClassification.from_pretrained("bert-base-uncased", num_labels=3)
tokenizer = AutoTokenizer.from_pretrained("SpanBERT/spanbert-base-cased")
model = AutoModelForTokenClassification.from_pretrained("SpanBERT/spanbert-base-cased", num_labels=3)



config.json:   0%|          | 0.00/413 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/215M [00:00<?, ?B/s]

  return torch.load(checkpoint_file, map_location=map_location)
Some weights of BertForTokenClassification were not initialized from the model checkpoint at SpanBERT/spanbert-base-cased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [35]:
trainset_tokenized = tokenize_and_align_labels(x_train, y_train)
valset_tokenized = tokenize_and_align_labels(x_val, y_val)
testset_tokenized = tokenize_and_align_labels(x_test, y_test)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


In [36]:
# BERT's tokenizer returns the dataset in the form of a dictionary of lists (sentences).
# we have to convert it into a list of dictionaries for training.
def turn_dict_to_list_of_dict(d):
    new_list = []

    for labels, inputs in zip(d["labels"], d["input_ids"]):
        entry = {"input_ids": inputs, "labels": labels}
        new_list.append(entry)

    return new_list

In [37]:
tokenised_train = turn_dict_to_list_of_dict(trainset_tokenized)
tokenised_val = turn_dict_to_list_of_dict(valset_tokenized)
tokenised_test = turn_dict_to_list_of_dict(testset_tokenized)

In [38]:
from transformers import DataCollatorForTokenClassification
data_collator = DataCollatorForTokenClassification(tokenizer)

In [25]:
%%capture
# Install the necessary dependencies
%pip install datasets
%pip install transformers
%pip install spacy
%pip install torch
%pip install spacy-transformers
%pip install transformers[torch]
%pip install seqeval
%pip install --upgrade pyarrow datasets

## After running this section of code, you need to restart the session and run everything again without running this section of code agan for approach one.

In [39]:
import numpy as np
from datasets import load_dataset, load_metric

metric = load_metric("seqeval")
label_list = {0: "B-O", 1: "B-SPROPA", 2: "I-IPROPA"}
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Remove ignored index (special tokens)
    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    results = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

In [40]:
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback

# Training arguments (feel free to play arround with these values)
#model_name = "bert-base-uncased"
epochs = 10
batch_size = 4
learning_rate = 3e-5

args = TrainingArguments(
    f"BERT-finetuned-NER",
    evaluation_strategy ='epoch',
    save_total_limit = 3,
    learning_rate=learning_rate,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=epochs,
    weight_decay=0.001,
    save_strategy = 'epoch',
    metric_for_best_model = 'f1',
    logging_dir = './logs',
    logging_strategy='epoch',
    load_best_model_at_end=True
)

trainer = Trainer(
    model,
    args,
    train_dataset=tokenised_train,
    eval_dataset=tokenised_val,
    data_collator = data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    #callbacks = [EarlyStoppingCallback(early_stopping_patience=3)]
)

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False)


In [41]:
trainer.train()

You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,0.8384,0.869238,0.577347,0.791541,0.667686,0.587704
2,0.7277,0.927544,0.588696,0.818127,0.684703,0.596733
3,0.5652,0.863816,0.615427,0.679758,0.645995,0.632416
4,0.382,1.059143,0.611176,0.627795,0.619374,0.616939
5,0.294,1.033333,0.636658,0.713595,0.672934,0.655202
6,0.2161,1.204052,0.613505,0.576435,0.594393,0.594153
7,0.177,1.169393,0.633883,0.673716,0.653193,0.635856
8,0.1311,1.260697,0.616705,0.65136,0.633559,0.621238
9,0.1169,1.280231,0.61975,0.659819,0.639157,0.621238
10,0.1034,1.311073,0.619907,0.643505,0.631485,0.620808


TrainOutput(global_step=1880, training_loss=0.3551950718494172, metrics={'train_runtime': 104.2979, 'train_samples_per_second': 72.005, 'train_steps_per_second': 18.025, 'total_flos': 173364820627716.0, 'train_loss': 0.3551950718494172, 'epoch': 10.0})

In [42]:
# Prepare the test data for evaluation in the same format as the training data
label_list = {0: "B-O", 1: "B-SPROPA", 2: "I-IPROPA"}

predictions, labels, _ = trainer.predict(tokenised_test)
predictions = np.argmax(predictions, axis=2)

# Remove the predictions for the [CLS] and [SEP] tokens
true_predictions = [
    [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
    for prediction, label in zip(predictions, labels)
]
true_labels = [
    [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
    for prediction, label in zip(predictions, labels)
]

# Compute multiple metrics on the test restuls
results = metric.compute(predictions=true_predictions, references=true_labels)
results

{'IPROPA': {'precision': 0.03636363636363636,
  'recall': 0.022727272727272728,
  'f1': 0.027972027972027972,
  'number': 88},
 'O': {'precision': 0.6467931345980127,
  'recall': 0.9533954727030626,
  'f1': 0.7707212055974166,
  'number': 1502},
 'SPROPA': {'precision': 0.46116504854368934,
  'recall': 0.45023696682464454,
  'f1': 0.4556354916067147,
  'number': 211},
 'overall_precision': 0.6177777777777778,
 'overall_recall': 0.8489727928928373,
 'overall_f1': 0.7151543498596821,
 'overall_accuracy': 0.6276083467094703}

# Approach Two

## Data Preprocessing for approach two

In [43]:
tokenised_and_word_level_trainset = tokenise_text_and_spans_char_to_word_level((data_one + data_two))
tokenised_and_word_level_testset = tokenise_text_and_spans_char_to_word_level(data_three)
tokenised_and_word_level_validation_set = tokenised_and_word_level_testset[:100]
tokenised_and_word_level_test_set = tokenised_and_word_level_testset[100:]


In [44]:
from transformers import AutoTokenizer

# Initialize the tokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")




In [45]:
train_tokenised_text_list, train_span_start_list, train_span_end_list = convert_to_start_end_labels(tokenised_and_word_level_trainset)
val_tokenised_text_list, val_span_start_list, val_span_end_list = convert_to_start_end_labels(tokenised_and_word_level_validation_set)
test_tokenised_text_list, test_span_start_list, test_span_end_list = convert_to_start_end_labels(tokenised_and_word_level_test_set)

In [46]:
trainset = tokenize_and_align_labels_for_start_end(train_tokenised_text_list, train_span_start_list, train_span_end_list, label_all_tokens=False )
valset = tokenize_and_align_labels_for_start_end(val_tokenised_text_list, val_span_start_list, val_span_end_list, label_all_tokens=False )
testset = tokenize_and_align_labels_for_start_end(test_tokenised_text_list, test_span_start_list, test_span_end_list, label_all_tokens=False )

## Libraries for Custom Model

In [47]:
import torch
from torch.utils.data import Dataset
import torch.nn as nn
from transformers import AutoModel

## Dataset

In [48]:
class CustomTokenClassificationDataset(Dataset):
    def __init__(self, input_ids, attention_mask, labels1, labels2):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.labels1 = labels1
        self.labels2 = labels2

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        item = {
            'input_ids': torch.tensor(self.input_ids[idx], dtype=torch.long),
            'attention_mask': torch.tensor(self.attention_mask[idx], dtype=torch.long),
            'labels1': torch.tensor(self.labels1[idx], dtype=torch.long),
            'labels2': torch.tensor(self.labels2[idx], dtype=torch.long)
        }
        return item


In [49]:
train_dataset = CustomTokenClassificationDataset(
    input_ids=trainset['input_ids'],
    attention_mask=trainset['attention_mask'],
    labels1=trainset['span_start'],
    labels2=trainset['span_end']
)

val_dataset = CustomTokenClassificationDataset(
    input_ids=valset['input_ids'],
    attention_mask=valset['attention_mask'],
    labels1=valset['span_start'],
    labels2=valset['span_end']
)

test_dataset = CustomTokenClassificationDataset(
    input_ids=testset['input_ids'],
    attention_mask=testset['attention_mask'],
    labels1=testset['span_start'],
    labels2=testset['span_end']
)

In [50]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

def custom_collate_fn(batch):
    # Extract each component from the batch
    input_ids = [item['input_ids'] for item in batch]
    attention_masks = [item['attention_mask'] for item in batch]
    labels1 = [item['labels1'] for item in batch]
    labels2 = [item['labels2'] for item in batch]

    # Pad sequences
    input_ids_padded = pad_sequence(input_ids, batch_first=True, padding_value=0)
    attention_masks_padded = pad_sequence(attention_masks, batch_first=True, padding_value=0)
    labels1_padded = pad_sequence(labels1, batch_first=True, padding_value=-100)
    labels2_padded = pad_sequence(labels2, batch_first=True, padding_value=-100)

    # Return a dictionary of the padded sequences
    return {
        'input_ids': input_ids_padded,
        'attention_mask': attention_masks_padded,
        'labels1': labels1_padded,
        'labels2': labels2_padded
    }

dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=custom_collate_fn)
validation_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=True, collate_fn=custom_collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=True, collate_fn=custom_collate_fn)

## Model

In [51]:
class DualOutputTokenClassificationModel(nn.Module):
    def __init__(self, model_name, num_labels1, num_labels2):
        super(DualOutputTokenClassificationModel, self).__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.classifier1 = nn.Linear(self.bert.config.hidden_size, num_labels1)
        self.classifier2 = nn.Linear(self.bert.config.hidden_size, num_labels2)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask)
        sequence_output = outputs.last_hidden_state

        logits1 = self.classifier1(sequence_output)
        logits2 = self.classifier2(sequence_output)

        return logits1, logits2


## Training

In [52]:
from sklearn.metrics import precision_recall_fscore_support
import torch.optim as optim


In [54]:
# Instantiate the model
model = DualOutputTokenClassificationModel(model_name='bert-base-uncased', num_labels1=2, num_labels2=2)
#model.train()

# Define loss function and optimizer
loss_fn = nn.CrossEntropyLoss(ignore_index=-100)
optimizer = optim.AdamW(model.parameters(), lr=3e-5)

for epoch in range(15):
    # Training loop
    model.train()  # Set model to training mode
    train_loss = 0.0
    for batch in dataloader:
        optimizer.zero_grad()

        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels1 = batch['labels1']
        labels2 = batch['labels2']

        logits1, logits2 = model(input_ids=input_ids, attention_mask=attention_mask)

        # Calculate loss for both heads
        loss1 = loss_fn(logits1.view(-1, logits1.size(-1)), labels1.view(-1))
        loss2 = loss_fn(logits2.view(-1, logits2.size(-1)), labels2.view(-1))

        # Total loss
        loss = loss1 + loss2
        train_loss += loss.item()

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    avg_train_loss = train_loss / len(dataloader)
    print(f"Epoch {epoch+1}/{5}, Training Loss: {avg_train_loss:.4f}")

    # Validation loop
    model.eval()  # Set model to evaluation mode
    val_loss = 0.0
    all_preds1, all_labels1 = [], []
    all_preds2, all_labels2 = [], []

    with torch.no_grad():  # Disable gradient computation
        for batch in validation_dataloader:
            input_ids = batch['input_ids']
            attention_mask = batch['attention_mask']
            labels1 = batch['labels1']
            labels2 = batch['labels2']

            logits1, logits2 = model(input_ids=input_ids, attention_mask=attention_mask)

            # Calculate loss for both heads
            loss1 = loss_fn(logits1.view(-1, logits1.size(-1)), labels1.view(-1))
            loss2 = loss_fn(logits2.view(-1, logits2.size(-1)), labels2.view(-1))

            # Total loss
            loss = loss1 + loss2
            val_loss += loss.item()

            # Get the predictions by taking the argmax of the logits
            predictions1 = torch.argmax(logits1, dim=-1)
            predictions2 = torch.argmax(logits2, dim=-1)

            # Flatten the labels and predictions for metric calculations
            mask1 = (labels1 != -100)
            mask2 = (labels2 != -100)

            all_preds1.extend(predictions1[mask1].cpu().numpy())
            all_labels1.extend(labels1[mask1].cpu().numpy())
            all_preds2.extend(predictions2[mask2].cpu().numpy())
            all_labels2.extend(labels2[mask2].cpu().numpy())

    avg_val_loss = val_loss / len(validation_dataloader)

    # Calculate precision, recall, f1 for each class
    precision1, recall1, f1_1, _ = precision_recall_fscore_support(all_labels1, all_preds1, average=None, labels=[0, 1])
    precision2, recall2, f1_2, _ = precision_recall_fscore_support(all_labels2, all_preds2, average=None, labels=[0, 1])

    print(f"Epoch {epoch+1}/{5}, Validation Loss: {avg_val_loss:.4f}")
    print(f"Validation Task 1 - Precision: {precision1}, Recall: {recall1}, F1-Score: {f1_1}")
    print(f"Validation Task 2 - Precision: {precision2}, Recall: {recall2}, F1-Score: {f1_2}")




Epoch 1/5, Training Loss: 0.6288
Epoch 1/5, Validation Loss: 0.4935
Validation Task 1 - Precision: [0.89522629 0.72727273], Recall: [0.99792674 0.04519774], F1-Score: [0.94379085 0.08510638]
Validation Task 2 - Precision: [0.89856876 0.64705882], Recall: [0.99586207 0.06321839], F1-Score: [0.94471704 0.11518325]
Epoch 2/5, Training Loss: 0.4021
Epoch 2/5, Validation Loss: 0.3938
Validation Task 1 - Precision: [0.93520374 0.62992126], Recall: [0.967519  0.4519774], F1-Score: [0.95108696 0.52631579]
Validation Task 2 - Precision: [0.92927308 0.68041237], Recall: [0.97862069 0.37931034], F1-Score: [0.9533087  0.48708487]
Epoch 3/5, Training Loss: 0.2786
Epoch 3/5, Validation Loss: 0.4121
Validation Task 1 - Precision: [0.93413174 0.6446281 ], Recall: [0.97028334 0.44067797], F1-Score: [0.95186441 0.52348993]
Validation Task 2 - Precision: [0.92847854 0.74418605], Recall: [0.98482759 0.36781609], F1-Score: [0.95582329 0.49230769]
Epoch 4/5, Training Loss: 0.1809
Epoch 4/5, Validation Loss:

## Testing

In [55]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

# Set the model to evaluation mode
model.eval()

test_loss = 0.0
all_test_preds1, all_test_labels1 = [], []
all_test_preds2, all_test_labels2 = [], []

with torch.no_grad():  # Disable gradient computation
    for batch in test_dataloader:
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        labels1 = batch['labels1']
        labels2 = batch['labels2']

        logits1, logits2 = model(input_ids=input_ids, attention_mask=attention_mask)

        # Calculate loss for both heads
        loss1 = loss_fn(logits1.view(-1, logits1.size(-1)), labels1.view(-1))
        loss2 = loss_fn(logits2.view(-1, logits2.size(-1)), labels2.view(-1))

        # Total loss
        loss = loss1 + loss2
        test_loss += loss.item()

        # Get the predictions by taking the argmax of the logits
        predictions1 = torch.argmax(logits1, dim=-1)
        predictions2 = torch.argmax(logits2, dim=-1)

        # Mask to ignore padding and non-relevant tokens
        mask1 = (labels1 != -100)
        mask2 = (labels2 != -100)

        all_test_preds1.extend(predictions1[mask1].cpu().numpy())
        all_test_labels1.extend(labels1[mask1].cpu().numpy())
        all_test_preds2.extend(predictions2[mask2].cpu().numpy())
        all_test_labels2.extend(labels2[mask2].cpu().numpy())

avg_test_loss = test_loss / len(test_dataloader)

# Calculate precision, recall, f1 for each class
precision1, recall1, f1_1, _ = precision_recall_fscore_support(all_test_labels1, all_test_preds1, average=None, labels=[0, 1])
precision2, recall2, f1_2, _ = precision_recall_fscore_support(all_test_labels2, all_test_preds2, average=None, labels=[0, 1])

# Calculate overall F1 score (micro average) and accuracy for each task
f1_1_macro = precision_recall_fscore_support(all_test_labels1, all_test_preds1, average='macro')[2]
f1_2_macro = precision_recall_fscore_support(all_test_labels2, all_test_preds2, average='macro')[2]

accuracy1 = accuracy_score(all_test_labels1, all_test_preds1)
accuracy2 = accuracy_score(all_test_labels2, all_test_preds2)

print(f"Test Loss: {avg_test_loss:.4f}")
print(f"label 1 - Precision: {precision1}, Recall: {recall1}, F1-Score: {f1_1}")
print(f"label 1 - Overall F1-Score (Macro): {f1_1_macro:.4f}, Accuracy: {accuracy1:.4f}")
print(f"label 2 - Precision: {precision2}, Recall: {recall2}, F1-Score: {f1_2}")
print(f"label 2 - Overall F1-Score (Macro): {f1_2_macro:.4f}, Accuracy: {accuracy2:.4f}")


Test Loss: 0.8099
label 1 - Precision: [0.95286624 0.5030303 ], Recall: [0.94803549 0.52866242], F1-Score: [0.95044473 0.51552795]
label 1 - Overall F1-Score (Macro): 0.7330, Accuracy: 0.9101
label 2 - Precision: [0.96197007 0.65648855], Recall: [0.97166247 0.58503401], F1-Score: [0.96679198 0.61870504]
label 2 - Overall F1-Score (Macro): 0.7927, Accuracy: 0.9389


# Sequence Classifcation

## Preprocessing

In [56]:
## Functions

In [57]:
def tokenisation_list(lists):
    sample = []
    for text in lists:
        tokenised = tokenisation(text)
        sample.append(tokenised)
    return sample

In [58]:
def retrieve_empty_labels(data):
    empty_x = []

    for text in data:
        if not text['labels']:
            empty_x.append(tokenisation(text['text']))

    return empty_x

### Trainset

In [80]:
x_train, y_train = df_train['text_fragment'], df_train['technique']
x_train_dev, y_train_dev = df_dev['text_fragment'], df_dev['technique']

x_train, y_train = x_train.tolist(), y_train.tolist()
x_train_dev, y_train_dev = x_train_dev.tolist(), y_train_dev.tolist()

x_train.extend(x_train_dev)
y_train.extend(y_train_dev)

x_train = tokenisation_list(x_train)

In [81]:
empty_x = []
empty_x = retrieve_empty_labels(data_one)

In [82]:
empty_y = []
for i in range(len(empty_x)):
    empty_y.append('Non-propaganda')

In [83]:
x_train.extend(empty_x)
y_train.extend(empty_y)

In [84]:
empty_x = []
empty_x = retrieve_empty_labels(data_two)

empty_y = []
for i in range(len(empty_x)):
    empty_y.append('Non-propaganda')

In [85]:
x_train.extend(empty_x)
y_train.extend(empty_y)

### Validation and Test set

In [86]:
x_test, y_test = df_test['text_fragment'], df_test['technique']
x_test, y_test = x_test.tolist(), y_test.tolist()

x_test = tokenisation_list(x_test)

In [87]:
x_val = x_test[:200]
x_test = x_test[200:]
y_val = y_test[:200]
y_test = y_test[200:]

In [88]:
empty_x = []
empty_x = retrieve_empty_labels(data_three)

empty_y = []
for i in range(len(empty_x)):
    empty_y.append('Non-propaganda')

In [89]:
empty_x_val = empty_x[:19]
empty_x_test = empty_x[19:]
empty_y_val = empty_y[:19]
empty_y_test = empty_y[19:]

In [90]:
x_val.extend(empty_x_val )
y_val.extend(empty_y_val)
x_test.extend(empty_x_test)
y_test.extend(empty_y_test)

## Tokenisation (Tokenise, Attetion Mask and Label format)

In [91]:
## numerical format manually

label_to_id = {
    "Loaded Language": 0,
    "Name calling/Labeling": 1,
    "Smears": 2,
    "Exaggeration/Minimisation": 3,
    "Slogans": 4,
    "Doubt": 5,
    "Appeal to fear/prejudice": 6,
    "Whataboutism": 7,
    "Glittering generalities (Virtue)": 8,
    "Flag-waving": 9,
    "Causal Oversimplification": 10,
    "Repetition": 11,
    "Thought-terminating cliché": 12,
    "Misrepresentation of Someone's Position (Straw Man)": 13,
    "Black-and-white Fallacy/Dictatorship": 14,
    "Appeal to authority": 15,
    "Reductio ad hitlerum": 16,
    "Obfuscation, Intentional vagueness, Confusion": 17,
    "Bandwagon": 18,
    "Presenting Irrelevant Data (Red Herring)": 19,
    "Non-propaganda": 20
}

# Encode the training, validation, and test labels
y_train_encoded = [label_to_id[label] for label in y_train]
y_val_encoded = [label_to_id[label] for label in y_val]
y_test_encoded = [label_to_id[label] for label in y_test]

In [92]:
# Example: Check the first few elements of x_train
print(x_train[:5])


[['THERE', 'ARE', 'ONLY', 'TWO', 'GENDERS', 'FEMALE', 'MALE'], ['POWER', 'COMES', 'FROM', 'THE', 'BARREL', 'OF', 'A', 'GUN'], ['BERNIE', 'BROS'], ['VIOLENCE'], ['MASS', 'SHOOTER']]


In [93]:
# Join the tokens back into strings
x_train_joined = [' '.join(tokens) for tokens in x_train]
x_val_joined = [' '.join(tokens) for tokens in x_val]
x_test_joined = [' '.join(tokens) for tokens in x_test]


In [94]:
from transformers import BertTokenizer, RobertaTokenizer

# Load the BERT tokenizer
#tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
# Tokenize the data
train_encodings = tokenizer(x_train_joined, padding=True, truncation=True, max_length=128, return_tensors='pt')
val_encodings = tokenizer(x_val_joined, padding=True, truncation=True, max_length=128, return_tensors='pt')
test_encodings = tokenizer(x_test_joined, padding=True, truncation=True, max_length=128, return_tensors='pt')




tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

In [95]:
import torch
from torch.utils.data import Dataset

class CustomDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

# Create the dataset objects
train_dataset = CustomDataset(train_encodings, y_train_encoded)
val_dataset = CustomDataset(val_encodings, y_val_encoded)
test_dataset = CustomDataset(test_encodings, y_test_encoded)

### Model Training

In [96]:
from sklearn.metrics import accuracy_score, f1_score
import numpy as np

def compute_metrics(pred):
    labels = pred.label_ids
    preds = np.argmax(pred.predictions, axis=1)

    accuracy = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds, average='weighted')

    return {
        'accuracy': accuracy,
        'f1': f1,
    }


In [107]:
from transformers import BertForSequenceClassification, Trainer, TrainingArguments, RobertaForSequenceClassification

# Load a pre-trained BERT model with a classification head
#model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=21)
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=21)
# Define training arguments
epoch = 30
decay = 0.1
l_r = 5e-5

training_args = TrainingArguments(
    output_dir='./results',          # Output directory for saving model checkpoints
    num_train_epochs=epoch,              # Number of training epochs
    per_device_train_batch_size=16,  # Batch size per device during training
    per_device_eval_batch_size=16,   # Batch size for evaluation
    warmup_steps=500,                # Number of warmup steps for learning rate scheduler
    weight_decay=decay,               # Strength of weight decay
    logging_dir='./logs',            # Directory for storing logs
    logging_steps=10,                # Log every 10 steps
    evaluation_strategy="epoch",     # Evaluate at the end of each epoch
    save_strategy="epoch",           # Save the model at the end of each epoch
    learning_rate = l_r
)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics
)


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.out_proj.bias', 'classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False)


In [108]:
trainer.train()

Epoch,Training Loss,Validation Loss,Accuracy,F1
1,2.1322,2.068942,0.374429,0.252958
2,1.6289,1.78364,0.447489,0.396959
3,1.5237,1.684415,0.484018,0.418638
4,1.3412,1.638495,0.461187,0.412662
5,1.1893,1.549683,0.534247,0.517863
6,0.7679,1.546371,0.552511,0.520973
7,0.8243,1.578929,0.538813,0.539198
8,0.5363,1.618134,0.538813,0.541698
9,0.6855,1.671058,0.561644,0.557882
10,0.5604,1.865369,0.547945,0.54123


Checkpoint destination directory ./results/checkpoint-115 already exists and is non-empty.Saving will proceed but saved results may be invalid.
Checkpoint destination directory ./results/checkpoint-230 already exists and is non-empty.Saving will proceed but saved results may be invalid.
Checkpoint destination directory ./results/checkpoint-345 already exists and is non-empty.Saving will proceed but saved results may be invalid.
Checkpoint destination directory ./results/checkpoint-460 already exists and is non-empty.Saving will proceed but saved results may be invalid.
Checkpoint destination directory ./results/checkpoint-575 already exists and is non-empty.Saving will proceed but saved results may be invalid.
Checkpoint destination directory ./results/checkpoint-690 already exists and is non-empty.Saving will proceed but saved results may be invalid.
Checkpoint destination directory ./results/checkpoint-805 already exists and is non-empty.Saving will proceed but saved results may be i

TrainOutput(global_step=3450, training_loss=0.5807988569356393, metrics={'train_runtime': 371.5936, 'train_samples_per_second': 148.388, 'train_steps_per_second': 9.284, 'total_flos': 2947428768173760.0, 'train_loss': 0.5807988569356393, 'epoch': 30.0})

In [109]:
# Assuming you have a test dataset ready
predictions = trainer.predict(test_dataset)

# Get the logits (raw model outputs), and convert them to probabilities (optional, for ROC curve)
logits = predictions.predictions
probabilities = torch.softmax(torch.tensor(logits), dim=-1).numpy()

# Get the predicted class indices
predicted_labels = logits.argmax(axis=1)

# True labels
true_labels = predictions.label_ids


In [110]:
target_names = list(label_to_id.keys())  # Get the class names in order

In [111]:
from sklearn.metrics import classification_report

# Generate a classification report
target_names = list(label_to_id.keys())  # Get the class names in order
report = classification_report(true_labels, predicted_labels)

print("Classification Report:\n", report)



Classification Report:
               precision    recall  f1-score   support

           0       0.84      0.89      0.86        81
           1       0.68      0.54      0.60        35
           2       0.38      0.50      0.43        24
           3       0.50      0.40      0.44        10
           4       0.00      0.00      0.00         8
           5       0.33      0.27      0.30        11
           6       0.25      0.25      0.25         4
           7       0.25      0.14      0.18         7
           8       0.33      0.20      0.25         5
           9       1.00      1.00      1.00         1
          10       0.14      1.00      0.25         2
          11       0.00      0.00      0.00         0
          12       0.25      0.33      0.29         3
          13       0.00      0.00      0.00         1
          14       0.00      0.00      0.00         2
          15       0.50      0.33      0.40         3
          16       0.00      0.00      0.00         2
   

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
