In [1]:
from datasets import load_dataset,Dataset
# Use a pipeline as a high-level helper
from transformers import pipeline
import torch
import os
import random
import numpy as np
from tqdm import tqdm
import json
import argparse
import logging
from transformers import AutoTokenizer, AutoModelForSequenceClassification,Trainer,TrainingArguments
from pprint import pprint
import matplotlib.pyplot as plt
from collections import Counter
from torch.utils.data import DataLoader
from tqdm import tqdm
from collections import Counter
from evaluate import load

# Seed every random number generator in use (Python, NumPy, PyTorch) for reproducibility.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)


<torch._C.Generator at 0x124c3a610>

In [27]:

# Tokenizer and sequence classifier both come from the same HateXplain BERT checkpoint.
checkpoint_name = "tum-nlp/bert-hateXplain"
tokenizer = AutoTokenizer.from_pretrained(checkpoint_name)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint_name)

def find_similar_whole_word(word, model, top_k=10, exclude_subwords=False, tok=None):
    """Return the vocabulary tokens whose input embeddings are closest to `word`.

    The word is tokenized (it may split into several subword pieces), the
    piece embeddings are averaged into one vector, and that vector is compared
    by cosine similarity against every row of the model's input-embedding
    matrix.

    Args:
        word: text to look up.
        model: object exposing `get_input_embeddings().weight`, a
            (vocab_size, hidden_dim) tensor.
        top_k: maximum number of tokens to return.
        exclude_subwords: if True, tokens starting with "##" are skipped and
            the ranking is walked further so that up to `top_k` whole-word
            tokens are still returned.
        tok: tokenizer to use; defaults to the module-level `tokenizer`.

    Returns:
        List of token strings ordered by decreasing cosine similarity. Its
        length is at most `top_k`; it is empty when `word` tokenizes to
        nothing or `top_k` is not positive.
    """
    tok = tokenizer if tok is None else tok

    # Tokenize the word (may split into subwords)
    token_ids = tok.encode(word, add_special_tokens=False)
    if not token_ids or top_k <= 0:
        # Averaging zero rows would produce NaN similarities.
        return []

    # Pure lookup: no autograd graph is needed for the similarity search.
    with torch.no_grad():
        embedding_matrix = model.get_input_embeddings().weight  # shape: (vocab_size, hidden_dim)

        # Average the subword embeddings to represent the full word
        word_embedding = embedding_matrix[token_ids].mean(dim=0, keepdim=True)  # shape: (1, hidden_dim)
        word_embedding_norm = torch.nn.functional.normalize(word_embedding, dim=1)
        embedding_matrix_norm = torch.nn.functional.normalize(embedding_matrix, dim=1)

        # Cosine similarity against the whole vocabulary, shape: (vocab_size,)
        cos_sim = torch.matmul(word_embedding_norm, embedding_matrix_norm.T).squeeze(0)

        if not exclude_subwords:
            # Clamp k so a small vocabulary cannot make topk raise.
            k = min(top_k, cos_sim.shape[0])
            best_ids = torch.topk(cos_sim, k).indices.tolist()
            return tok.convert_ids_to_tokens(best_ids)

        # Rank the full vocabulary so filtering cannot starve the result.
        ranked_ids = torch.argsort(cos_sim, descending=True).tolist()

    ranked_tokens = tok.convert_ids_to_tokens(ranked_ids)
    whole_words = [token for token in ranked_tokens if not token.startswith("##")]
    return whole_words[:top_k]

#for di in top_hate_ctxt:
    #print(di["word"],find_similar_whole_word(di["word"]))




def compute_annotator_disagreement(dataset):
    """Summarise, per post, how many distinct labels its annotators assigned.

    Args:
        dataset: mapping of post_id -> record; a record may carry an
            'annotators' list whose items expose a 'label' key.

    Returns:
        List of (post_id, number_of_distinct_labels, Counter_of_labels)
        tuples in the iteration order of `dataset`. Posts without any
        annotator label are left out.
    """
    votes_per_post = (
        (post_id, Counter(ann['label'] for ann in record.get('annotators', [])))
        for post_id, record in dataset.items()
    )
    # An empty Counter is falsy, which drops posts that have no labels.
    return [(post_id, len(votes), votes) for post_id, votes in votes_per_post if votes]


def resolve_disagreements_custom(dataset, disagreement_stats):
    """Collapse each post's annotator votes into a single label.

    Resolution rules, keyed on the number of distinct labels a post received:
      * 3 distinct labels -> the post is dropped;
      * 1 distinct label  -> that label is kept;
      * 2 labels that are exactly {"offensive", "hatespeech"} -> "hatespeech";
      * anything else -> the most frequent label.

    Args:
        dataset: mapping of post_id -> record holding a 'post_tokens' list.
        disagreement_stats: iterable of (post_id, n_distinct_labels, Counter)
            tuples.

    Returns:
        List of (post_id, text, resolved_label) tuples, where text is the
        post's tokens joined by single spaces. A one-line summary of the four
        tallies is printed as a side effect.
    """
    n_unanimous = n_hate_over_offensive = n_majority = n_dropped = 0
    outcome = []

    for post_id, n_distinct, votes in disagreement_stats:
        # Three-way splits are discarded before the post is even looked up.
        if n_distinct == 3:
            n_dropped += 1
            continue

        text = " ".join(dataset[post_id]['post_tokens'])
        expanded_votes = list(votes.elements())
        distinct_labels = set(votes.keys())

        if n_distinct == 1:
            n_unanimous += 1
            final_label = expanded_votes[0]
        elif n_distinct == 2 and distinct_labels == {"offensive", "hatespeech"}:
            # Offensive-vs-hatespeech ties always resolve to the harsher class.
            n_hate_over_offensive += 1
            final_label = "hatespeech"
        else:
            n_majority += 1
            (final_label, _), = Counter(expanded_votes).most_common(1)

        outcome.append((post_id, text, final_label))

    print(f"Unanimous: {n_unanimous}, Offensive vs Hatespeech: {n_hate_over_offensive}, Normal vs Other: {n_majority}, Skipped (3-label): {n_dropped}")
    return outcome



# New dataset: most-attended tokens replaced by tokens with close cosine similarity

In [28]:

# Ranked token list loaded from disk; the context manager closes the file handle after reading.
with open("No_test_Full_Hate_Offe_[class]_0tok.json", "r") as f:
    top_hate_ctxt = json.load(f)  # list(dict(rank,word,count))





In [None]:
# Load the predefined split assignment (split name -> post ids) and the raw annotated posts.
with open('../Data/post_id_divisions.json', 'r') as divisions_file:
    post_id_divisions = json.load(divisions_file)

with open('../Data/dataset.json', 'r') as dataset_file:
    data = json.load(dataset_file)

# Step 1: measure annotator disagreement per post, then settle on one label each
disagreement_stats = compute_annotator_disagreement(data)
resolved_examples = resolve_disagreements_custom(data, disagreement_stats)

# Step 2: index the resolved examples by post id so they can be routed to their split
post_id_to_example = dict(
    (pid, {"text": text, "label": label})
    for pid, text, label in resolved_examples
)

def filter_split(split_ids):
    """Collect the resolved examples that belong to one split.

    Post ids absent from `post_id_to_example` are silently dropped; the order
    of `split_ids` is preserved.
    """
    selected = []
    for pid in split_ids:
        if pid in post_id_to_example:
            selected.append(post_id_to_example[pid])
    return selected

# Route the resolved examples into the predefined train/val/test partitions.
train_data, val_data, test_data = (
    filter_split(post_id_divisions[split_name])
    for split_name in ('train', 'val', 'test')
)

print(f"Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}")

# Wrap each partition in a Hugging Face Dataset.
train_dataset, val_dataset, test_dataset = map(
    Dataset.from_list, (train_data, val_data, test_data)
)


Unanimous: 9845, Offensive vs Hatespeech: 3916, Normal vs Other: 5468, Skipped (3-label): 919
Train: 15383, Val: 1922, Test: 1924


# Replace Token

In [None]:
# Only entries ranked within the top MAX_RANK are perturbed.
MAX_RANK = 200
# Number of nearest neighbours requested per token, and which of them is used
# as the replacement (index 0 is normally the token itself).
NEIGHBOUR_COUNT = 10
REPLACEMENT_RANK = 5

token_to_change = [dic["word"] for dic in top_hate_ctxt if dic["rank"] <= MAX_RANK]  ## top 200 token to change

token_with_replacements = {}  # {old_id: new_id}
for word_to_change in token_to_change:
    old_id = tokenizer.convert_tokens_to_ids(word_to_change)
    # A word that is not a single vocabulary entry resolves to the UNK id;
    # mapping it would rewrite every [UNK] token in the data, so skip it.
    if old_id == tokenizer.unk_token_id:
        continue
    neighbours = find_similar_whole_word(word_to_change, model, NEIGHBOUR_COUNT)
    # Guard the fixed-rank lookup instead of failing with IndexError.
    if len(neighbours) <= REPLACEMENT_RANK:
        continue
    token_with_replacements[old_id] = tokenizer.convert_tokens_to_ids(neighbours[REPLACEMENT_RANK])

In [31]:

def tokenize(example):
    """Run the tokenizer on the "text" field with truncation and "max_length" padding."""
    encoding_options = {"truncation": True, "padding": "max_length"}
    return tokenizer(example["text"], **encoding_options)

# Tokenize all three splits in batched mode (train, then val, then test).
train_dataset, val_dataset, test_dataset = (
    split.map(tokenize, batched=True)
    for split in (train_dataset, val_dataset, test_dataset)
)




Map:   0%|          | 0/15383 [00:00<?, ? examples/s]

Map:   0%|          | 0/1922 [00:00<?, ? examples/s]

Map:   0%|          | 0/1924 [00:00<?, ? examples/s]

In [32]:

def change_attended_tokens(example, replacements=None):
    """Swap selected token ids in an example's "input_ids".

    Args:
        example: mapping with an "input_ids" sequence of token ids.
        replacements: optional {old_id: new_id} mapping; defaults to the
            module-level `token_with_replacements`.

    Returns:
        A new dict with the same keys as `example` whose "input_ids" has every
        id found in the mapping replaced. The input example is not modified.
    """
    mapping = token_with_replacements if replacements is None else replacements
    updated = dict(example)
    # Ids that are absent from the mapping pass through unchanged.
    updated["input_ids"] = [mapping.get(token_id, token_id) for token_id in example["input_ids"]]
    return updated

# Apply the token replacement example by example; in this cell only the training split is perturbed.
train_dataset = train_dataset.map(change_attended_tokens, batched=False)

Map:   0%|          | 0/15383 [00:00<?, ? examples/s]

In [33]:
def filter_dat(example):
    """Return True when the example's label is one of the three known classes."""
    allowed_labels = ("normal", "hatespeech", "offensive")  # "offensive" can be removed
    return example["label"] in allowed_labels

def map_labels(example):
    """Binarise the label in place: "normal" becomes 0, every other label becomes 1."""
    example["label"] = 0 if example["label"] == "normal" else 1
    return example

# For every split: drop examples with an unexpected label, then binarise the labels.
train_dataset = train_dataset.filter(filter_dat).map(map_labels, batched=False)

val_dataset = val_dataset.filter(filter_dat).map(map_labels, batched=False)

test_dataset = test_dataset.filter(filter_dat).map(map_labels, batched=False)

Filter:   0%|          | 0/15383 [00:00<?, ? examples/s]

Map:   0%|          | 0/15383 [00:00<?, ? examples/s]

Filter:   0%|          | 0/1922 [00:00<?, ? examples/s]

Map:   0%|          | 0/1922 [00:00<?, ? examples/s]

Filter:   0%|          | 0/1924 [00:00<?, ? examples/s]

Map:   0%|          | 0/1924 [00:00<?, ? examples/s]

# Model

In [34]:

# Build a text-classification pipeline and reload tokenizer + classifier from the same checkpoint.
pipe = pipeline("text-classification", model="tum-nlp/bert-hateXplain")
tokenizer = AutoTokenizer.from_pretrained("tum-nlp/bert-hateXplain")
model = AutoModelForSequenceClassification.from_pretrained("tum-nlp/bert-hateXplain")

# Pick the GPU when one is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model.eval()
model.to(device)

# Report the model size as the total number of parameter elements.
total_params = sum(param.numel() for param in model.parameters())
print(f"Total parameters: {total_params}")



Device set to use cuda:0


Total parameters: 11171074


In [35]:


training_args = TrainingArguments(
    # --- bookkeeping ---
    output_dir="first_resu",
    run_name="NoTest_0_class_200_2",
    # --- batching: 8 examples per device, gradients accumulated over 2 steps ---
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,
    # --- optimisation schedule ---
    num_train_epochs=10,
    learning_rate=2e-5,
    lr_scheduler_type="linear",
    warmup_ratio=0.1,
    weight_decay=0.01,
    label_smoothing_factor=0.1,
    # --- evaluate and checkpoint once per epoch, keeping a single checkpoint ---
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    # --- reload the checkpoint with the lowest eval_loss when training ends ---
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
)



# Accuracy metric object used by compute_metrics below.
accuracy_metric = load("accuracy")


def compute_metrics(eval_pred):
    """Compute accuracy for one evaluation pass.

    `eval_pred` unpacks into (logits, labels); the predicted class is the
    arg-max of the logits along axis 1.
    """
    scores, references = eval_pred
    predicted_classes = scores.argmax(axis=1)
    return accuracy_metric.compute(predictions=predicted_classes, references=references)

# Trainer that fine-tunes `model` on the training split and evaluates on the validation split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    # `tokenizer=` is deprecated in Trainer.__init__ (it triggers a FutureWarning);
    # `processing_class=` is the supported replacement.
    processing_class=tokenizer,
)

  trainer = Trainer(


In [36]:
# Run fine-tuning with the Trainer configured above; the returned TrainOutput is shown as the cell output.
trainer.train()

Epoch,Training Loss,Validation Loss,Accuracy
1,0.5835,0.548711,0.761707
2,0.5139,0.545044,0.768991
3,0.4801,0.562847,0.765869
4,0.4503,0.556165,0.780957
5,0.4344,0.578839,0.763788
6,0.411,0.58091,0.774194
7,0.3908,0.601008,0.774194
8,0.3804,0.605801,0.768991
9,0.3511,0.626621,0.771072


TrainOutput(global_step=9610, training_loss=0.4344219172038099, metrics={'train_runtime': 761.1581, 'train_samples_per_second': 202.1, 'train_steps_per_second': 12.625, 'total_flos': 1522933652219904.0, 'train_loss': 0.4344219172038099, 'epoch': 9.99011960478419})

In [37]:
# Evaluate with the Trainer's default evaluation set (val_dataset was passed as eval_dataset).
results = trainer.evaluate()
print(results)

{'eval_loss': 0.5450443625450134, 'eval_accuracy': 0.768990634755463, 'eval_runtime': 4.8806, 'eval_samples_per_second': 393.806, 'eval_steps_per_second': 49.379, 'epoch': 9.99011960478419}


In [None]:
# Persist the fine-tuned model and its tokenizer side by side in one directory.
save_dir = "../Models/White_box_ADV_model"
model.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)

('res_300/tokenizer_config.json',
 'res_300/special_tokens_map.json',
 'res_300/vocab.txt',
 'res_300/added_tokens.json',
 'res_300/tokenizer.json')