In [None]:
# This code was originally obtained from https://github.com/paul-rottger/hatecheck-experiments and has been modified.

In [None]:
import numpy as np
import pandas as pd
import pickle
import argparse
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from sklearn.metrics import classification_report, f1_score
from sklearn.utils.class_weight import compute_class_weight
import os
import random
#os.environ["TOKENIZERS_PARALLELISM"] = "false"

import wandb
from huggingface_hub import HfApi

from peft import LoraConfig, get_peft_model

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class HateDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with their labels.

    Args:
        encodings: mapping (anything exposing ``.items()``) from feature name
            to an indexable container holding one entry per example.
        labels: indexable sequence of labels, one per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of examples, taken from the length of ``labels``."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return every encoded feature at ``idx`` plus the label under ``'labels'``."""
        example = {}
        for feature_name, feature_values in self.encodings.items():
            example[feature_name] = feature_values[idx]
        example['labels'] = self.labels[idx]
        return example

In [None]:
from transformers import Trainer
import torch

class WeightedTrainer(Trainer):
    """``Trainer`` subclass whose loss is a class-weighted cross-entropy.

    Args:
        class_weights: per-class weights; converted to a float tensor and
            moved to ``self.args.device``.
        *args, **kwargs: forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_weights, *args, **kwargs):
        # The target device is read from self.args, so the parent constructor
        # has to run before the weight tensor is built.
        super().__init__(*args, **kwargs)
        weights = torch.FloatTensor(class_weights).to(self.args.device)
        self.class_weights = weights
        self.loss_fn = torch.nn.CrossEntropyLoss(weight=weights)

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted cross-entropy loss for one batch.

        The ``"labels"`` entry is removed from ``inputs`` before the forward
        pass, so the model is called without labels; the first element of its
        output is used as the logits. Extra keyword arguments are accepted and
        ignored.

        Returns:
            ``(loss, outputs)`` when ``return_outputs`` is true, otherwise
            just ``loss``.
        """
        targets = inputs.pop("labels")
        model_outputs = model(**inputs)
        weighted_loss = self.loss_fn(model_outputs[0], targets)
        if return_outputs:
            return (weighted_loss, model_outputs)
        return weighted_loss

In [None]:
def create_datasets(data_dir, tokenizer):
    """Build the train / valid / test ``HateDataset`` objects for ``data_dir``.

    Reads ``train.csv``, ``valid.csv`` and ``test.csv`` from ``data_dir``,
    drops rows containing missing values, registers ``[USER]``, ``[EMOJI]``
    and ``[URL]`` as additional special tokens on ``tokenizer`` (the tokenizer
    is modified in place) and tokenizes the ``text`` column of each split with
    ``padding=True`` and ``truncation=True`` into PyTorch tensors.

    Args:
        data_dir: directory path (string) holding the three CSV files, each
            with a ``text`` and a ``label`` column.
        tokenizer: tokenizer object; must be callable on a list of strings and
            support ``add_special_tokens`` and ``len()``.

    Returns:
        ``(train_dataset, valid_dataset, test_dataset, tok_len)`` where
        ``tok_len`` is ``len(tokenizer)`` after the special tokens were added.
    """
    torch.manual_seed(42)

    split_files = {"train": "/train.csv", "valid": "/valid.csv", "test": "/test.csv"}

    # Load every split first, then extract texts, then labels.
    frames = {
        split: pd.read_csv(data_dir + suffix).dropna()
        for split, suffix in split_files.items()
    }
    texts = {split: frame['text'].astype("string").tolist() for split, frame in frames.items()}
    labels = {split: frame['label'].astype("int").tolist() for split, frame in frames.items()}

    # add special tokens for URLs, emojis and mentions (--> see pre-processing)
    tokenizer.add_special_tokens({'additional_special_tokens': ['[USER]', '[EMOJI]', '[URL]']})

    encodings = {
        split: tokenizer(split_texts, padding=True, truncation=True, return_tensors="pt")
        for split, split_texts in texts.items()
    }

    train_dataset, valid_dataset, test_dataset = (
        HateDataset(encodings[split], labels[split]) for split in split_files
    )

    return train_dataset, valid_dataset, test_dataset, len(tokenizer)

In [None]:
def calculate_class_weights(data_dir):
    """Compute 'balanced' class weights from the labels of the training split.

    Args:
        data_dir: directory path (string) containing ``train.csv`` with a
            ``label`` column.

    Returns:
        The array returned by ``compute_class_weight('balanced', ...)``, with
        one weight per distinct label value found in the cleaned training data.
    """
    # Apply the same cleaning as create_datasets() (drop incomplete rows, cast
    # labels to int) so the weights describe exactly the rows that are trained
    # on. Without it a missing label would show up as an extra NaN "class" and
    # the weight vector would no longer have one entry per real class.
    dataset = pd.read_csv(data_dir + "/train.csv").dropna()
    train_labels = dataset['label'].astype("int").to_numpy()
    class_weights = compute_class_weight('balanced', classes=np.unique(train_labels), y=train_labels)
    print("class weights are {}".format(class_weights))
    return class_weights

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def compute_metrics(p):
    """Score one evaluation pass.

    Args:
        p: object exposing ``predictions`` (per-class scores; the arg-max over
            the last axis is taken as the predicted class) and ``label_ids``
            (the reference labels).

    Returns:
        dict with ``accuracy`` plus weighted-average ``precision``, ``recall``
        and ``f1``.
    """
    predicted = p.predictions.argmax(-1)
    reference = p.label_ids

    scores = {'accuracy': accuracy_score(reference, predicted)}
    weighted_scorers = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1', f1_score),
    )
    for metric_name, scorer in weighted_scorers:
        scores[metric_name] = scorer(reference, predicted, average='weighted')
    return scores

In [None]:
# Hyper-parameter sweep: fine-tune every backbone in `models` at every learning
# rate in `learning_rates`, log each run to Weights & Biases and publish the
# resulting model and tokenizer on the Hugging Face Hub.
models = [
    # "bert-base-uncased",
    # "bert-large-uncased",
    "roberta-base",
    # "roberta-large",
    "distilbert-base-uncased",
    "distilroberta-base",
    ]

learning_rates = [5e-5, 1e-4, 2e-5]

LoRA_rank = 16

# --- settings that do not change between runs --------------------------------
dataset_dir = "./Data/"
output_dir = "./Model/"
os.makedirs(output_dir, exist_ok=True)

dataset = "Davidson_hate"
dd_dir = dataset_dir + dataset

repo_name = "SNLP_XAI_hate-speech"

# Read the Hub token from the environment instead of hardcoding it in the
# notebook. When HF_TOKEN is unset this is None and the locally cached login
# (e.g. from `huggingface-cli login`) is used.
hf_token = os.environ.get("HF_TOKEN")
api = HfApi(token=hf_token)

# The class weights depend only on the training CSV, so compute them once
# rather than once per run.
print("Calculating class weights...")
class_weights = calculate_class_weights(dd_dir)

for use_LoRA in [True]:
    for model_name in models:
        for lr in learning_rates:
            print("Training model {} with lr {}".format(model_name, lr))

            experience_name = dataset + "_" + model_name

            print("Loading tokenizer...")
            tokenizer = AutoTokenizer.from_pretrained(model_name)

            print("Creating datasets...")
            train_dataset, valid_dataset, test_dataset, tok_len = create_datasets(dd_dir, tokenizer)

            print("Loading model...")
            model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2).to(DEVICE)
            # create_datasets() added special tokens, so the embedding matrix
            # must be resized to the new vocabulary size.
            model.resize_token_embeddings(tok_len)

            if use_LoRA:

                experience_name += "_LoRA_r" + str(LoRA_rank)

                lora_config = LoraConfig(
                    r=LoRA_rank,
                    lora_alpha=32,
                    lora_dropout=0.05,
                    bias="none",
                    task_type="SEQ_CLS",
                )

                # NOTE(review): with this config the base embeddings are
                # presumably frozen, so the rows added for the new special
                # tokens may never be trained - confirm whether the embedding
                # layer should be listed in `modules_to_save`.
                model = get_peft_model(model, lora_config)

            experience_name += "_lr" + str(lr)

            training_args = TrainingArguments(
                seed=1234,
                output_dir=output_dir,
                num_train_epochs=5,
                warmup_ratio=0.1,

                learning_rate = lr,
                per_device_train_batch_size=64,
                weight_decay=0.01,

                logging_steps=10,
                report_to="wandb",

                per_device_eval_batch_size=64,
                evaluation_strategy="epoch",

                save_strategy="no",
            )

            trainer = WeightedTrainer(
                model=model,
                class_weights=class_weights,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=valid_dataset,
                compute_metrics=compute_metrics
            )

            wandb.init(project="hatecheck_full", name=experience_name, config=training_args)

            # No checkpoint lookup is performed (save_strategy="no"); every run
            # starts from the pretrained weights.
            print("No checkpoints found. training from scratch...")
            try:
                trainer.train()
            finally:
                # Close the W&B run even if training fails, so the next run in
                # the sweep does not log into a stale run.
                wandb.finish()

            print("Training done, saving on HF...")
            hf_repo_id = repo_name + "_" + experience_name

            # exist_ok=True lets the notebook be re-run without failing on
            # repositories created by a previous execution.
            api.create_repo(repo_id=hf_repo_id, repo_type="model", private=False, exist_ok=True)

            # Push to Hugging Face Hub
            if use_LoRA:
                # merge_and_unload() RETURNS the base model with the LoRA
                # weights merged in; the result must be kept, otherwise `model`
                # is still the PEFT wrapper when it is pushed.
                model = model.merge_and_unload()
            model.push_to_hub(hf_repo_id, token=hf_token)
            tokenizer.push_to_hub(hf_repo_id, token=hf_token)