### Code to use custom features for training

In [None]:
### Change the variable model_name to whichever model you wish to use. Make any other changes that model requires (some models may not need any).
### The required changes have to be made inside the LanguageModel class: change self.model = '' to the appropriate model initialization, and update the tokenizer initialization accordingly.
### The current code includes POS and entity mentions as features. Additionally, it uses projections of contextual embeddings onto the moral vector subspace as features.

In [None]:
import pandas as pd
import numpy as np
import sys

import spacy
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from sklearn.model_selection import train_test_split

from torch.utils.data import TensorDataset, DataLoader, RandomSampler
from torch.optim import AdamW
from transformers import get_scheduler

import torch.nn as nn
from sklearn.metrics import f1_score
from tqdm.auto import tqdm

import warnings
warnings.filterwarnings('ignore')

# import nltk
# nltk.download('vader_lexicon')

from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Shared VADER sentiment scorer used by extract_features(); it needs the NLTK
# 'vader_lexicon' resource (see the commented-out nltk.download call above).
sid = SentimentIntensityAnalyzer()
# Device used for every model and tensor in this notebook. Prefer the first GPU,
# but fall back to the CPU so the code still runs on machines without CUDA
# instead of failing at the first `.to(device)` call.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
def mask_entities(sentences: pd.Series) -> pd.Series:
    """Replace each named entity detected by spaCy with the literal ``<mask>``.

    Args:
        sentences: Series of text strings; each one is run through the
            module-level ``nlp`` pipeline.

    Returns:
        A Series with the same index in which every entity span reported in
        ``doc.ents`` has been replaced by ``<mask>``.

    NOTE(review): the tokenizer loaded in this file is ``bert-base-cased``,
    whose own mask token is ``[MASK]``; confirm that the plain string
    ``<mask>`` is the intended placeholder.
    """
    def mask(sentence):
        doc = nlp(sentence)
        masked = sentence
        # Splice by character offsets rather than str.replace(): replace()
        # rewrites *every* occurrence of the entity text, including matches
        # inside unrelated longer words. Going right-to-left keeps the offsets
        # of the spans that are still to be processed valid.
        for ent in reversed(doc.ents):
            masked = masked[:ent.start_char] + "<mask>" + masked[ent.end_char:]
        return masked
    return sentences.apply(mask)

In [None]:
def get_data(attr):
    """Load ``data/Label_{attr}.csv`` and mask entities in its headlines.

    Args:
        attr: label name inserted into the CSV file name.

    Returns:
        The loaded DataFrame with its ``headline`` column replaced by the
        output of ``mask_entities``.
    """
    frame = pd.read_csv(f'data/Label_{attr}.csv')
    masked_headlines = mask_entities(frame['headline'])
    frame['headline'] = masked_headlines
    return frame

# df = get_data('1')
# df.label.value_counts()

In [None]:
def get_word_embeddings(wordlist, layer = 'mp', l = 1):
    """Embed every text in ``wordlist`` with a ``bert-base-cased`` encoder.

    The model and tokenizer are loaded on each call, moved to ``device`` and
    put in eval mode. Each entry is tokenized and encoded on its own (one
    forward pass per entry, no gradient tracking).

    Args:
        wordlist: iterable of strings (single words or whole sentences).
        layer: pooling strategy.
            ``None``  -> mean over tokens of ``hidden_states[l]``.
            ``'ls'``  -> mean over tokens of the last hidden state.
            ``'po'``  -> the model's pooler output.
            otherwise -> (default ``'mp'``) token mean of every entry in
                         ``hidden_states``, then averaged across those entries.
        l: index into ``hidden_states``; only used when ``layer`` is ``None``.

    Returns:
        ``np.ndarray`` with one squeezed embedding per entry, in input order
        (presumably shape ``(len(wordlist), hidden_size)``).
    """
    embeddings = []

    model_name = 'bert-base-cased'
    model = AutoModel.from_pretrained(model_name, output_hidden_states = True, output_attentions = True)
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    model.to(device)
    model.eval()
    for word in wordlist:

        tokenized_inputs = tokenizer(word, return_tensors='pt', padding=True, truncation=True).to(device)

        # Inference only: no autograd graph is needed for feature extraction.
        with torch.no_grad():
            outputs = model(**tokenized_inputs)

        if layer is None:
            selected_layer = outputs.hidden_states[l]
            emb = torch.mean(selected_layer, dim=1).detach().cpu().numpy()

        elif layer == 'ls':
            last_hidden_states = outputs.last_hidden_state
            emb = torch.mean(last_hidden_states, dim=1).detach().cpu().numpy()

        elif layer == 'po':
            emb = outputs.pooler_output.detach().cpu().numpy()

        else:
            # Token mean per hidden state, then mean across all hidden states.
            hs = outputs.hidden_states
            emb = torch.stack([x.mean(axis = 1) for x in hs]).mean(axis = 0).detach().cpu().numpy()

        # Drop the leading batch dimension of size one.
        embeddings.append(np.squeeze(emb))

    return np.array(embeddings)


# Word list whose entries are embedded once and reused to build the subspaces.
# Alternative lexicon kept from the original notebook:
# lex_df = pd.read_csv('/home/ahaque2/resources/NRC-VAD-Lexicon/NRC-VAD-Lexicon.csv')
lex_df = pd.read_csv('data/eMFD_wordlist.csv')
all_words = lex_df.word.tolist()

# One embedding per lexicon word, then a word -> embedding lookup table.
emb = get_word_embeddings(all_words)
emb_dict = dict(zip(all_words, emb))

# Encoder instance handed to the Word_Pairs helper.
model_name = 'bert-base-cased'
model = AutoModel.from_pretrained(
    model_name,
    output_hidden_states = True,
    output_attentions = True,
)

from Word_Pairs import Word_Pairs
Word_Pairs_Class = Word_Pairs(model, emb_dict, lex_df)

def get_subspace(high, low, num_keep, attr):
    """Build the subspace for one lexicon attribute from scored word pairs.

    Args:
        high, low, num_keep, attr: forwarded unchanged to
            ``Word_Pairs_Class.get_word_pairs_with_scores``.

    Returns:
        The second value returned by ``Word_Pairs_Class.get_subspace`` when it
        is called with the pair embeddings and ``10``.
    """
    pairs = Word_Pairs_Class.get_word_pairs_with_scores(high, low, num_keep, attr)

    # Look up the precomputed embedding of each side of every pair.
    first_vectors = [emb_dict[word] for word in pairs.Word1.tolist()]
    second_vectors = [emb_dict[word] for word in pairs.Word2.tolist()]

    # Two values come back; only the second one is used by this notebook.
    _, subspace_10 = Word_Pairs_Class.get_subspace(first_vectors, second_vectors, 10)
    return subspace_10


# One (high, low, num_keep, attr) tuple per lexicon attribute, in the argument
# order expected by get_subspace().
params = [
    (275, 475, 175, 'care_sent'),
    (350, 475, 325, 'fairness_sent'),
    (300, 375, 175, 'loyalty_sent'),
    (350, 475, 275, 'authority_sent'),
    (275, 375, 225, 'sanctity_sent'),
]

# Subspaces in the same order as `params`.
subspace = [get_subspace(*p) for p in params]

In [None]:
# Checkpoint name shared by the tokenizer below and by CustomBERT's default.
model_name = 'bert-base-cased'
tokenizer = AutoTokenizer.from_pretrained(model_name)

# spaCy pipeline used for entity masking and for feature extraction.
nlp = spacy.load("en_core_web_sm")

class CustomBERT(nn.Module):
    """Transformer encoder plus hand-crafted features feeding a linear classifier.

    The classifier input is the concatenation of: the pooled encoder
    embedding, the subspace-projection features, the numeric feature vector
    and the averaged index embeddings.

    Args:
        num_class: number of output classes (size of the logits).
        bert_model_name: checkpoint passed to ``AutoModel.from_pretrained``.
        feature_dim: width of the numeric ``features`` vector.
        dep_embed_dim: width of each learned index embedding.
        sub_feature_dim: width of ``sub_features``. Defaults to 50, the value
            that used to be hard-coded in the classifier's input size.
    """

    def __init__(self, num_class, bert_model_name=model_name, feature_dim=10, dep_embed_dim=10,
                 sub_feature_dim=50):
        super().__init__()
        self.bert = AutoModel.from_pretrained(bert_model_name, output_hidden_states = True, output_attentions = True)
        # Embedding table with 10 rows: indices passed to forward() must be in [0, 10).
        self.dep_embedding = nn.Embedding(10, dep_embed_dim)  # Adjust vocab size
        classifier_in = self.bert.config.hidden_size + feature_dim + dep_embed_dim + sub_feature_dim
        self.classifier = nn.Linear(classifier_in, num_class)

    def forward(self, input_ids, attention_mask, features, dep_indices, sub_features):
        """Return class logits for a batch.

        Args:
            input_ids, attention_mask: tokenizer outputs forwarded to the encoder.
            features: numeric features, cast to float before concatenation
                (presumably ``(batch, feature_dim)``).
            dep_indices: integer indices looked up in ``dep_embedding`` and
                averaged over dim 1 (presumably ``(batch, k)``).
            sub_features: subspace projections (presumably
                ``(batch, sub_feature_dim)``).

        Returns:
            The output of the linear classifier on the concatenated vector.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Token mean of every hidden state, then mean across hidden states.
        # NOTE(review): the token mean does not use attention_mask, so padded
        # positions contribute to the average - confirm this is intended.
        hs = outputs.hidden_states
        emb = torch.stack([x.mean(axis = 1) for x in hs]).mean(axis = 0)

        # Average the embeddings of all indices of each example.
        dep_embeds = self.dep_embedding(dep_indices).mean(dim=1)

        # Cast sub_features like features: a float64 projection would promote
        # the concatenation to double and break the float32 Linear layer.
        combined = torch.cat((emb, sub_features.float(), features.float(), dep_embeds), dim=1)
        return self.classifier(combined)

In [None]:
def extract_features(text, entity_vocab, max_len=10):
    """Compute hand-crafted features for one text.

    Args:
        text: raw input string.
        entity_vocab: mapping from spaCy entity label to integer id; labels
            that are not in the mapping get id 0.
        max_len: fixed length of the returned entity-id vector.

    Returns:
        A pair ``(numeric, entity_ids)``:
        * ``numeric`` - float32 tensor holding the NOUN/VERB/ADJ counts, the
          nsubj/dobj/pobj counts and every value of the VADER polarity scores,
          in that order.
        * ``entity_ids`` - long tensor of length ``max_len`` with the ids of
          the detected entities, truncated or right-padded with 0.
    """
    doc = nlp(text)

    # Collect the tags once, then count the ones of interest.
    pos_tags = [tok.pos_ for tok in doc]
    dep_tags = [tok.dep_ for tok in doc]
    pos_counts = [pos_tags.count(tag) for tag in ("NOUN", "VERB", "ADJ")]
    dep_counts = [dep_tags.count(dep) for dep in ("nsubj", "dobj", "pobj")]

    entity_ids = [entity_vocab.get(ent.label_, 0) for ent in doc.ents]

    polarity_values = list(sid.polarity_scores(text).values())

    # Truncate to max_len, then right-pad with zeros up to max_len.
    entity_ids = entity_ids[:max_len]
    entity_ids = entity_ids + [0] * (max_len - len(entity_ids))

    numeric = torch.tensor(pos_counts + dep_counts + polarity_values, dtype=torch.float32)
    return numeric, torch.tensor(entity_ids, dtype=torch.long)

# Integer ids for the entity labels tracked by extract_features(); any other
# label is looked up with a default of 0 there.
entity_vocab = {
    "PERSON": 1,
    "ORG": 2,
    "GPE": 3,
}

In [None]:
def get_dataset(sent, labels, feat1, feat2, sub_feat):
    """Tokenize the sentences and bundle everything into a DataLoader.

    Args:
        sent: list of sentences for the module-level ``tokenizer``.
        labels: list of labels, converted with ``torch.tensor``.
        feat1, feat2, sub_feat: per-example feature tensors, stored as-is.

    Returns:
        A ``DataLoader`` (batch size 64, no shuffling requested) over a
        ``TensorDataset`` whose fields are, in order: input ids, attention
        mask, labels, ``feat1``, ``feat2``, ``sub_feat``.
    """
    # Pad to the longest sentence, truncate over-long ones, move to `device`.
    encoded = tokenizer(sent, return_tensors="pt", padding=True, truncation=True).to(device)
    label_tensor = torch.tensor(labels)

    fields = (
        encoded['input_ids'],
        encoded['attention_mask'],
        label_tensor,
        feat1,
        feat2,
        sub_feat,
    )
    return DataLoader(TensorDataset(*fields), batch_size = 64)

def get_perf(dataloader):
    """Return the weighted F1 of the module-level ``model`` over ``dataloader``.

    Predictions and labels are collected across all batches and scored once.
    Averaging per-batch F1 values instead would not give the dataset F1: a
    short final batch would count as much as a full one, and classes missing
    from a batch would distort that batch's score.

    Args:
        dataloader: iterable of batches laid out as
            ``(input_ids, attention_mask, labels, feat1, feat2, sub_feat)``.

    Returns:
        The weighted F1 score, or ``0.0`` when ``dataloader`` yields no batch.
    """
    collected_labels, collected_preds = [], []

    for batch in dataloader:
        input_ids, attention_mask, labels, feat1, feat2, sub_feat = [x.to(device) for x in batch]

        # Evaluation only: skip autograd bookkeeping.
        with torch.no_grad():
            logits = model(input_ids, attention_mask, feat1, feat2, sub_feat)

        collected_preds.append(torch.argmax(logits, dim=-1).cpu())
        collected_labels.append(labels.cpu())

    if not collected_labels:
        # Nothing to score (previously this raised ZeroDivisionError).
        return 0.0

    y_true = torch.cat(collected_labels).numpy()
    y_pred = torch.cat(collected_preds).numpy()
    return f1_score(y_true, y_pred, average = 'weighted')

In [None]:
# Train one CustomBERT classifier per label file and store, for each label,
# the per-epoch train/test F1 table and the final model.
import os

torch.manual_seed(0)

# Alternative checkpoints; to use one, set model_name and reload the tokenizer
# (tokenizer = AutoTokenizer.from_pretrained(model_name)):
#   'bert-base-cased', 'roberta-base', 'daodao/ConflictBERT',
#   'snowood1/ConfliBERT-scr-cased', 'facebook/bart-base',
#   'answerdotai/ModernBERT-base', 'meta-llama/Llama-3.2-3B'

# Not read anywhere in this section. Values listed in the original notebook:
# 'verb_subspace', 'head_cont_and_verb_subspace', 'simple_contrastive'.
siamese_training = None

# Training schedule. Each epoch trains on a fresh random subsample rather than
# on the full training set, so the step count follows from these numbers.
epochs = 100
samples_per_epoch = 512
train_batch_size = 64
steps_per_epoch = -(-samples_per_epoch // train_batch_size)  # ceiling division
num_training_steps = epochs * steps_per_epoch
reg_lambda = 0.00001

# Create the output folders up front so a missing directory cannot make the
# run fail only after all epochs have finished.
os.makedirs('results/exp', exist_ok=True)
os.makedirs('models/exp', exist_ok=True)

# Other label sets from the original notebook:
# ['1', '2a', '2b', '3a', '3b', '4a', '4b', '4c']
for attr in ['emotive', 'effects', 'source', 'context', 'villain', 'victim']:
    # The raw CSV is read here (not get_data) because the features below are
    # computed on the unmasked headlines; masking happens afterwards.
    data = pd.read_csv(f'data/Label_{attr}.csv')
    X_train, X_test, train_label, test_label = train_test_split(
        data.headline, data.label, test_size=0.2, stratify = data.label, random_state = 42)

    # Hand-crafted features: numeric counts/sentiment and entity indices.
    train_features = [extract_features(t, entity_vocab) for t in X_train.tolist()]
    pos_features_train = torch.stack([x[0] for x in train_features], dim = 0)
    dep_features_train = torch.stack([x[1] for x in train_features], dim = 0)

    # Project the headline embeddings onto every subspace and flatten the
    # projections into one row per example.
    emb_train = get_word_embeddings(X_train.tolist())
    sub_feat = [torch.tensor(emb_train @ sub) for sub in subspace]
    sub_feat_train = torch.stack(sub_feat, dim = 1).reshape(emb_train.shape[0], -1)

    emb_test = get_word_embeddings(X_test.tolist())
    sub_feat = [torch.tensor(emb_test @ sub) for sub in subspace]
    sub_feat_test = torch.stack(sub_feat, dim = 1).reshape(emb_test.shape[0], -1)

    test_features = [extract_features(t, entity_vocab) for t in X_test.tolist()]
    pos_features_test = torch.stack([x[0] for x in test_features], dim = 0)
    dep_features_test = torch.stack([x[1] for x in test_features], dim = 0)

    # The text fed to the encoder has its entities masked.
    X_train = mask_entities(X_train)
    X_test = mask_entities(X_test)

    train_dataloader = get_dataset(X_train.tolist(), train_label.tolist(), pos_features_train, dep_features_train, sub_feat_train)
    test_dataloader = get_dataset(X_test.tolist(), test_label.tolist(), pos_features_test, dep_features_test, sub_feat_test)

    num_classes = len(set(train_label.tolist()))
    model = CustomBERT(num_classes)
    # model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(set(train_label.tolist())))
    model.to(device)

    optimizer = AdamW(model.parameters(), lr = 5e-5, eps = 1e-8, weight_decay=0.05)
    loss_fn = nn.CrossEntropyLoss()

    lr_scheduler = get_scheduler(
                name="linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps
            )
    loss_values = []
    train_perf, test_perf = [], []

    progress_bar = tqdm(range(num_training_steps))
    for epoch in range(epochs):

        model.train()
        batch_loss_values = []

        # Reseed with the epoch number so each epoch's subsample and shuffle
        # are repeatable across runs.
        np.random.seed(epoch)
        torch.manual_seed(epoch)
        torch.cuda.manual_seed_all(epoch)

        # Draw this epoch's training subset (indices may repeat).
        random_ids = np.random.randint(len(train_dataloader.dataset), size=samples_per_epoch)
        temp_train_dataloader = train_dataloader.dataset[random_ids]
        dataset = TensorDataset(*temp_train_dataloader)
        sampler = RandomSampler(dataset)
        dataloader = DataLoader(dataset, batch_size=train_batch_size, sampler = sampler)

        for batch in dataloader:

            optimizer.zero_grad()

            input_ids, attention_mask, labels, feat1, feat2, sub_feat = [x.to(device) for x in batch]

            logits = model(input_ids, attention_mask, feat1, feat2, sub_feat)
            loss = loss_fn(logits, labels)

            # Explicit L2 penalty over all parameters.
            # NOTE(review): AdamW above also applies weight_decay; confirm
            # that both regularizers are wanted.
            l2_loss = sum([(p ** 2).sum() for p in model.parameters()])
            loss = reg_lambda * l2_loss + loss

            loss.backward()
            optimizer.step()
            lr_scheduler.step()

            progress_bar.update(1)

            batch_loss_values.append(loss.item())

        model.eval()

        tr_perf = get_perf(train_dataloader)
        tst_perf = get_perf(test_dataloader)

        train_perf.append(tr_perf)
        test_perf.append(tst_perf)

        avg_batch_loss = sum(batch_loss_values)/len(batch_loss_values)
        loss_values.append(avg_batch_loss)

        # print(f'Epoch {epoch} completed! Training Loss: {avg_batch_loss}. Training F1: {tr_perf} and Testing F1: {tst_perf}')

    progress_bar.close()

    # NOTE(review): the file names are fixed to 'bert_base_cased' whatever
    # model_name is set to - rename them when switching checkpoints.
    temp_df = pd.DataFrame({'train': train_perf, 'test': test_perf})
    temp_df.to_csv(f'results/exp/bert_base_cased_label_{attr}.csv')
    torch.save(model, f"models/exp/bert_base_cased_label_{attr}.pth")

In [None]:
# Show the per-epoch scores of the last label processed, ordered by ascending
# test score (the best epoch is the last row).
temp_df.sort_values('test')

In [None]:
# Same sorted view of temp_df as in the previous cell.
temp_df.sort_values('test')