In [3]:
%load_ext autoreload
%autoreload 2

from concept_gradient_v2 import ConceptGradients

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [25]:
from transformers import RobertaTokenizer, RobertaForSequenceClassification
import torch.nn.functional as F
import torch.nn as nn
from tqdm.auto import tqdm
import pandas as pd
import numpy as np
import torch
import os
from datasets import Dataset
from concept_gradient_v2 import ConceptGradients
from torch.utils.data import DataLoader

In [26]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class X2YModel(nn.Module):
    def __init__(self, model_name='./target_model_he/checkpoint-150', num_classes=2):
        super(X2YModel, self).__init__()
        self.model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=num_classes)
        
    def forward(self, input_ids=None, attention_mask=None, inputs_embeds=None):
        if inputs_embeds is not None:
            outputs = self.model.roberta(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        else:
            outputs = self.model.roberta(input_ids=input_ids, attention_mask=attention_mask)
        
        return self.model.classifier(outputs.last_hidden_state)  


class X2CModel(nn.Module):
    def __init__(self, model_name='./concept_model_he/checkpoint-1900', num_concepts=3):
        super(X2CModel, self).__init__()
        self.model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=num_concepts, ignore_mismatched_sizes=True).to('cuda')

    def forward(self, input_ids=None, attention_mask=None, inputs_embeds=None):
        if inputs_embeds is not None:
            outputs = self.model.roberta(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        else:
            outputs = self.model.roberta(input_ids=input_ids, attention_mask=attention_mask)
        
        return self.model.classifier(outputs.last_hidden_state)

x2y_model = X2YModel().to(device)
x2c_model = X2CModel().to(device)

In [27]:
def forward_func(embeddings, attention_mask):
    output = x2y_model(inputs_embeds=embeddings, attention_mask=attention_mask)
    return output

def concept_forward_func(embeddings, attention_mask):
    output = x2c_model(inputs_embeds=embeddings, attention_mask=attention_mask)
    return output

In [28]:
cg = ConceptGradients(forward_func, concept_forward_func=concept_forward_func, x2y_model=x2y_model, x2c_model=x2c_model)

def calculate_concept_gradient(input_ids, attention_mask, target_index, concept_index, mode):
    input_ids = input_ids.to(device)
    attention_mask = attention_mask.to(device)  

    with torch.no_grad():
        embeddings = x2y_model.model.get_input_embeddings()(input_ids)
        
    embeddings.requires_grad_(True)
    attention_mask = attention_mask.float()
    attention_mask.requires_grad_(True)
    attr = cg.attribute(
        (embeddings, attention_mask),
        mode=mode,
        target=target_index,
        target_concept=concept_index,
        n_concepts=3,
        target_layer_name='roberta.encoder.layer.11.output.dense',
        concept_layer_name='roberta.encoder.layer.11.output.dense',
    )

    return attr

In [29]:
df_test = pd.read_csv('../data/hatexplain_test.csv')
df_test = df_test.drop(columns=['post_id', 'Miscellaneous', 'Sexual Orientation'])
# df_test = df_test.sample(1500, random_state=42)

In [30]:
df_test['concept_labels'] = df_test[['Race', "Religion", 'Gender']].values.tolist()

In [31]:
df_test.sample(10)

Unnamed: 0,post_text,label,Race,Religion,Gender,concept_labels
699,you niggas faggots putting you all hands on fe...,1.0,1.0,0.0,0.0,"[1.0, 0.0, 0.0]"
1121,as for jews revenge is the only answer i blame...,1.0,0.0,1.0,0.0,"[0.0, 1.0, 0.0]"
294,the reply i and everyone else who complained g...,0.0,0.0,0.0,0.0,"[0.0, 0.0, 0.0]"
318,different caliber rifles being fired shoddy ai...,1.0,1.0,0.0,0.0,"[1.0, 0.0, 0.0]"
35,idiot boy trudeau his somali moslem immigratio...,1.0,1.0,1.0,0.0,"[1.0, 1.0, 0.0]"
48,grown ass men doing women activities and lil f...,1.0,0.0,0.0,0.0,"[0.0, 0.0, 0.0]"
687,brits you should maybe storm the prison britai...,1.0,1.0,1.0,1.0,"[1.0, 1.0, 1.0]"
1024,whoopi goldberk aka caryn johnson is a stupid ...,1.0,1.0,0.0,1.0,"[1.0, 0.0, 1.0]"
837,<user> said nazis look cool pretty much,0.0,1.0,1.0,0.0,"[1.0, 1.0, 0.0]"
8,the labour party has <number> mps in prison fo...,1.0,0.0,1.0,0.0,"[0.0, 1.0, 0.0]"


In [32]:

df_test.drop(columns=['Race', 'Religion', 'Gender'], inplace=True, axis=1)
ds_test = Dataset.from_pandas(df_test)
df_test.head()

Unnamed: 0,post_text,label,concept_labels
0,i dont think im getting my baby them white 9 h...,0.0,"[0.0, 0.0, 0.0]"
1,laura loomer raped me while screaming at me in...,1.0,"[0.0, 1.0, 0.0]"
2,<user> what did the old lady do was she a nazi...,0.0,"[0.0, 0.0, 0.0]"
3,as much as i appreciate a format of open discu...,0.0,"[1.0, 0.0, 0.0]"
4,<user> <user> <user> with a room full of white...,0.0,"[0.0, 0.0, 0.0]"


In [33]:
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

def tokenize_function(examples):
    return tokenizer(examples["post_text"], padding="max_length", truncation=True)

tokenized_dataset = ds_test.map(tokenize_function, batched=True)
tokenized_dataset.set_format("torch", columns=["input_ids", "attention_mask", "concept_labels", "label"])

x2y_dl_test = DataLoader(tokenized_dataset, batch_size=8, shuffle=False)



Map:   0%|          | 0/1186 [00:00<?, ? examples/s]

In [37]:
results = []
x2y_dl_test = DataLoader(tokenized_dataset, batch_size=8, shuffle=False)
for batch in tqdm(x2y_dl_test, leave=True):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['label'].long().to(device)
    concept_labels = batch['concept_labels']

    target_logits = x2y_model(input_ids=input_ids, attention_mask=attention_mask)
    target_preds = torch.argmax(target_logits, dim=-1)

    incorrect_indices = (target_preds == labels).nonzero(as_tuple=True)[0]

    if len(incorrect_indices) > 0:
        for idx in incorrect_indices:
            sample_input_ids = input_ids[idx].unsqueeze(0)
            sample_attention_mask = attention_mask[idx].unsqueeze(0)
            sample_label = int(labels[idx].item())
            sample_concept_label = concept_labels[idx].unsqueeze(0)
            sample_sentence = tokenizer.decode(sample_input_ids.squeeze(), skip_special_tokens=True)

            concept_gradient = calculate_concept_gradient(sample_input_ids, sample_attention_mask, target_index=sample_label, concept_index=None, mode='chain_rule_independent')
            concept_gradient = concept_gradient[0].detach().cpu().numpy()

            concept_logits = x2c_model(input_ids=sample_input_ids, attention_mask=sample_attention_mask)
            concept_logits = torch.sigmoid(concept_logits).detach().cpu().numpy()

            target_logits_final = x2y_model(input_ids=sample_input_ids, attention_mask=sample_attention_mask)
            target_logits_final = torch.softmax(target_logits_final, dim=-1).detach().cpu().numpy()

            results.append({
                "sentence": sample_sentence,
                "target_logits": target_logits_final,
                "concept_logits": concept_logits,
                "concept_gradient": concept_gradient,
                "label": sample_label,
                "concept_label": sample_concept_label.cpu().numpy()  
            })
            
df_results = pd.DataFrame(results)
df_results.to_csv("misclassified_samples/cs_cg.csv", index=False)

print("Process complete. Results saved to 'misclassified_samples_with_concept_gradients.csv'")


  0%|          | 0/149 [00:00<?, ?it/s]

Process complete. Results saved to 'misclassified_samples_with_concept_gradients.csv'


In [38]:
pd.set_option('display.max_colwidth', None)
df_results.sample(5)

Unnamed: 0,sentence,target_logits,concept_logits,concept_gradient,label,concept_label
358,fun fact did you know kardashian is armenian for nigger lover,"[[0.005222132, 0.9947779]]","[[0.9441051, 0.41052046, 0.060839996]]","[[0.057885244, 0.18912065, -0.017316096]]",1,"[[1.0, 0.0, 0.0]]"
140,does anyone get the feeling that nick the spic fuentes is like trying to court <number> something internet savvy boomer conservatives as if they were a thing,"[[0.012301511, 0.98769844]]","[[0.52831227, 0.5304703, 0.12408364]]","[[0.014800973, 0.16851409, -0.0020840436]]",1,"[[1.0, 0.0, 0.0]]"
356,knock off a cracker for callin me towelhead,"[[0.19415505, 0.8058449]]","[[0.56199807, 0.012359233, 0.49713323]]","[[0.007912772, 0.25178832, 0.009786654]]",1,"[[0.0, 0.0, 0.0]]"
21,<user> <user> <user> millions of yemeni children starve because of trump policies republicans i sleep a woman gets raped and does not want to carry his baby republicans real shit,"[[0.9601562, 0.03984379]]","[[0.03316694, 0.039621964, 0.37312835]]","[[0.19554447, -0.074995644, -0.023832707]]",0,"[[0.0, 0.0, 1.0]]"
1034,gta san andreas the story of an above average somewhat talented black man who escapes the ghetto and becomes reasonably successful only to be dragged back in in the final act because his brother is a stubborn dumbass -_-,"[[0.96210974, 0.03789029]]","[[0.5966021, 0.0057812273, 0.37690967]]","[[0.2581864, -0.15948421, -0.054642722]]",0,"[[1.0, 0.0, 0.0]]"


In [10]:
df_results.shape

(232, 6)

In [10]:
import torch
from sklearn.metrics import accuracy_score
from tqdm import tqdm

# Initialize lists to store predictions and true labels for each concept
concept_predictions = [[] for _ in range(5)]
concept_true_labels = [[] for _ in range(5)]

for batch in tqdm(x2y_dl_test, leave=True):
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['toxicity'].to(device)
    concept_labels = batch['concept_labels'].to(device)
    
    # Compute logits for concept model
    concept_logits = x2c_model(input_ids, attention_mask=attention_mask)
    
    # Apply sigmoid to get probabilities
    concept_probs = torch.sigmoid(concept_logits)
    
    # Convert probabilities to binary predictions (0 or 1)
    concept_preds = (concept_probs > 0.5).int()
    
    # Store predictions and true labels for each concept
    for i in range(5):
        concept_predictions[i].extend(concept_preds[:, i].cpu().numpy())
        concept_true_labels[i].extend(concept_labels[:, i].cpu().numpy())

# Compute accuracy for each concept
concept_accuracies = []
for i in range(5):
    accuracy = accuracy_score(concept_true_labels[i], concept_predictions[i])
    concept_accuracies.append(accuracy)

# Print accuracies for each concept
for i, accuracy in enumerate(concept_accuracies):
    print(f"Concept {i+1} Accuracy: {accuracy:.4f}")

100%|██████████| 1250/1250 [02:29<00:00,  8.36it/s]

Concept 1 Accuracy: 0.8894
Concept 2 Accuracy: 0.8424
Concept 3 Accuracy: 0.7192
Concept 4 Accuracy: 0.9420
Concept 5 Accuracy: 0.7536





In [21]:
from sklearn.metrics import f1_score
concept_f1 = []
for i in range(5):
    accuracy = f1_score(concept_true_labels[i], concept_predictions[i])
    concept_f1.append(accuracy)

# Print accuracies for each concept
for i, accuracy in enumerate(concept_f1):
    print(f"Concept {i+1} F1: {accuracy:.4f}")

Concept 1 F1: 0.8395
Concept 2 F1: 0.3226
Concept 3 F1: 0.3728
Concept 4 F1: 0.9240
Concept 5 F1: 0.3827
