In [57]:
pip install datasets evaluate seqeval

Note: you may need to restart the kernel to use updated packages.


In [47]:
from datasets import load_dataset
import numpy as np
from torch.utils.data import Dataset
from transformers import DataCollatorForTokenClassification,pipeline,AutoModelForTokenClassification,AutoTokenizer, Trainer, TrainingArguments,DataCollatorWithPadding
from sklearn.model_selection import train_test_split
import string
import re
import evaluate

In [48]:
def add_markers(original_sentence, new_sentences):
    """Wrap the word that fills the ``BLANK`` slot of each sentence in ``===`` markers.

    The character offset of the literal ``BLANK`` in ``original_sentence`` is
    used to locate the substituted word in every entry of ``new_sentences``.
    The first run of word characters found at or after that offset is wrapped
    as ``===word===``; every other character of the sentence is kept verbatim.

    Args:
        original_sentence: Template text containing the placeholder ``BLANK``.
        new_sentences: Iterable of sentences with a word substituted at the
            placeholder position.

    Returns:
        A list with one marked sentence per entry of ``new_sentences``.

    Raises:
        ValueError: If ``original_sentence`` has no ``BLANK`` placeholder, or a
            sentence contains no word at or after the placeholder position.
    """
    blank_location = original_sentence.find('BLANK')
    if blank_location == -1:
        # str.find returns -1 here, which would otherwise slice off the last character.
        raise ValueError(f"No 'BLANK' placeholder in: {original_sentence!r}")

    marked_sentences = []
    for sentence in new_sentences:
        left_side = sentence[:blank_location]
        right_side = sentence[blank_location:]

        # leading non-word chars (e.g. an opening quote), the word to mark, then
        # everything else. Keeping the full remainder avoids dropping text after
        # an apostrophe or hyphen inside the first token ("can't", "well-known").
        match = re.match(r"(\W*)(\w+)(.*)", right_side, re.DOTALL)
        if match is None:
            raise ValueError(f"No word found at the BLANK position in: {sentence!r}")
        leading, word, remainder = match.groups()

        marked_sentences.append(left_side + leading + '===' + word + '===' + remainder)

    return marked_sentences

In [49]:
intrasentence_dataset = load_dataset('stereoset','intrasentence')["validation"]

profession_dataset = []
race_dataset = []
gender_dataset = []
religion_dataset = []

# Route every marked sentence to the list that matches its bias type.
datasets_by_bias_type = {
    "profession": profession_dataset,
    "race": race_dataset,
    "gender": gender_dataset,
    "religion": religion_dataset,
}

for entry in intrasentence_dataset:
    bias_type = entry['bias_type']
    sentence_group = entry['sentences']
    sentence_marked = add_markers(entry['context'], sentence_group['sentence'])
    # Integer class ids; this notebook treats them as
    # 0 stereotype, 1 anti-stereotype, 2 unrelated.
    # NOTE(review): confirm this order against the dataset's ClassLabel names.
    label = sentence_group['gold_label']

    target_list = datasets_by_bias_type.get(bias_type)
    if target_list is None:
        # Bias types other than the four above are not collected.
        continue

    # Distinct loop names: the previous version reused `x` for both loops.
    for marked_text, gold_label in zip(sentence_marked, label):
        target_list.append({"text": marked_text, "label": gold_label})

Found cached dataset stereoset (/home/jupyter/.cache/huggingface/datasets/stereoset/intrasentence/1.0.0/b188e395e95b37c7a095ebc2de352fbdb249d67d1beb2ff639bb4dc37dfbb090)
100% 1/1 [00:00<00:00, 310.18it/s]


In [50]:
# Report how many marked sentences were collected for each bias type.
for bias_subset in (profession_dataset, race_dataset, gender_dataset, religion_dataset):
    print(np.shape(bias_subset))

(2430,)
(2886,)
(765,)
(237,)


In [51]:
# Bundle the per-category lists under their bias-type name for prepare_data.
integrated_dataset = {
    "race": race_dataset,
    "gender": gender_dataset,
    "religion": religion_dataset,
    "profession": profession_dataset,
}

In [52]:
def prepare_data(data):
    """Convert marked sentences into word tokens with per-token label ids.

    Each sentence is split into words and the punctuation marks ``. , ! ? ;``.
    The word wrapped in ``===`` markers receives a label id derived from the
    bias type and the sentence-level label; every other token receives 0.

    Args:
        data: Mapping with the keys "gender", "race", "profession" and
            "religion". Each value is a list of dicts with a 'text' entry (a
            sentence containing one ``===word===`` marker) and a 'label' entry
            (0 stereotype, 1 anti-stereotype, 2 unrelated, as used by the
            cell that builds these lists).
            NOTE(review): confirm that 0/1 order against the source dataset.

    Returns:
        A list of dicts ``{'tokens': [...], 'labels': [...]}``, ordered gender,
        race, profession, religion.
    """
    token_pattern = re.compile(r"===\w+===|[\w']+|[.,!?;]")
    # bias type -> (stereotype id, anti-stereotype id); insertion order is the
    # processing order.
    marked_label_ids = {
        "gender": (1, 2),
        "race": (3, 4),
        "profession": (5, 6),
        "religion": (7, 8),
    }

    new_data = []
    for type_bias, (stereotype_id, anti_stereotype_id) in marked_label_ids.items():
        for item in data[type_bias]:
            label = item['label']
            if label == 0:
                marked_id = stereotype_id
            elif label == 1:
                marked_id = anti_stereotype_id
            else:
                # Unrelated fill words carry no bias. Previously they fell into
                # the anti-stereotype branch and were mislabelled.
                marked_id = 0

            tokenlist = []
            labels = []
            # Split the sentence by words and punctuation.
            for token in token_pattern.findall(item['text']):
                if token.startswith('===') and token.endswith('==='):
                    tokenlist.append(token[3:-3])  # strip both markers
                    labels.append(marked_id)
                else:
                    tokenlist.append(token)
                    labels.append(0)  # non-bias token with label 'unrelated'

            new_data.append({
                'tokens': tokenlist,
                'labels': labels
            })
    return new_data
# 0: "unrelated"
# 1: "stereotype_gender",
# 2: "anti-stereotype_gender",
# 3: "stereotype_race",
# 4: "anti-stereotype_race",
# 5: "stereotype_profession",
# 6: "anti-stereotype_profession",
# 7: "stereotype_religion",
# 8: "anti-stereotype_religion",
def tokenize_and_align_labels(examples):
    """Tokenize pre-split words and spread word-level labels over the tokens.

    Uses the module-level ``tokenizer``. For every example, the first token
    of each word gets that word's label. Special tokens (word id ``None``)
    and the remaining tokens of a word get -100.

    Args:
        examples: Batch with a "tokens" column (lists of words) and a "labels"
            column (lists of label ids, one per word).

    Returns:
        The tokenizer output with an added "labels" entry holding the aligned
        label ids.
    """
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    aligned_labels = []
    for batch_idx, word_labels in enumerate(examples["labels"]):
        label_ids = []
        last_word_idx = None
        # word_ids maps each token position to the index of its source word.
        for word_idx in tokenized_inputs.word_ids(batch_index=batch_idx):
            starts_new_word = word_idx is not None and word_idx != last_word_idx
            label_ids.append(word_labels[word_idx] if starts_new_word else -100)
            last_word_idx = word_idx
        aligned_labels.append(label_ids)

    tokenized_inputs["labels"] = aligned_labels
    return tokenized_inputs

In [53]:
# NOTE: this rebinds `Dataset`, which the import cell also imports from torch.utils.data.
from datasets import Dataset

data = prepare_data(integrated_dataset)

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# Turn the list of {'tokens', 'labels'} records into column lists.
hf_dataset = Dataset.from_dict({k: [dic[k] for dic in data] for k in data[0]})
tokenized_data = hf_dataset.map(tokenize_and_align_labels, batched=True)

# Fixed seed so the shuffled 80/20 split is the same on every run.
final_dataset = tokenized_data.train_test_split(test_size=0.2, shuffle=True, seed=42)
final_dataset

                                                     

DatasetDict({
    train: Dataset({
        features: ['tokens', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 5054
    })
    test: Dataset({
        features: ['tokens', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 1264
    })
})

In [54]:
# Spot check: show how one prepared example is split into tokens.
sample_words = data[17]["tokens"]
tokenized_input = tokenizer(sample_words, is_split_into_words=True)
tokens = tokenizer.convert_ids_to_tokens(tokenized_input["input_ids"])
tokens

['[CLS]', 'a', 'mother', 'is', 'bowl', '.', '[SEP]']

In [55]:
# Data collator that handles padding of each token-classification batch
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

seqeval = evaluate.load("seqeval")

# Label names; the position in this list is the label id.
label_list = [
    "unrelated",
    "stereotype_gender",
    "anti-stereotype_gender",
    "stereotype_race",
    "anti-stereotype_race",
    "stereotype_profession",
    "anti-stereotype_profession",
    "stereotype_religion",
    "anti-stereotype_religion",
]
# Label names of the first prepared example.
labels = [label_list[label_id] for label_id in data[0]["labels"]]

def compute_metrics(p):
    """Score token predictions with the module-level ``seqeval`` metric.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` holds per-class
            scores whose argmax over axis 2 is the predicted label id;
            ``labels`` holds gold label ids, with -100 at positions to skip.

    Returns:
        Dict with seqeval's overall "precision", "recall", "f1" and "accuracy".
    """
    scores, gold_ids = p
    predicted_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        # Keep only positions that carry a real label.
        kept_pairs = [
            (predicted_id, gold_id)
            for predicted_id, gold_id in zip(predicted_row, gold_row)
            if gold_id != -100
        ]
        true_predictions.append([label_list[predicted_id] for predicted_id, _ in kept_pairs])
        true_labels.append([label_list[gold_id] for _, gold_id in kept_pairs])

    results = seqeval.compute(predictions=true_predictions, references=true_labels)
    summary = {}
    summary["precision"] = results["overall_precision"]
    summary["recall"] = results["overall_recall"]
    summary["f1"] = results["overall_f1"]
    summary["accuracy"] = results["overall_accuracy"]
    return summary

from sklearn.metrics import precision_recall_fscore_support,accuracy_score,balanced_accuracy_score

def compute_metrics_new(p):
    """Compute macro-averaged token-level metrics with scikit-learn.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` holds per-class
            scores whose argmax over axis 2 is the predicted label id;
            ``labels`` holds gold label ids, with -100 at positions to skip.

    Returns:
        Dict with macro "precision", "recall" and "f1", plus
        "balanced accuracy" and plain "accuracy", all computed over the
        flattened positions whose gold label is not -100.
    """
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)
    labels = np.asarray(labels)

    # A boolean mask selects and flattens the labelled positions in row-major
    # order, the same order the nested-list version produced.
    labelled = labels != -100
    true_labels = [label_list[label_id] for label_id in labels[labelled]]
    true_predictions = [label_list[label_id] for label_id in predictions[labelled]]

    # Macro average; zero_division=0 scores undefined classes as 0 without
    # emitting a warning on every evaluation.
    precision, recall, f1, _ = precision_recall_fscore_support(
        true_labels, true_predictions, average='macro', zero_division=0
    )
    balanced_acc = balanced_accuracy_score(true_labels, true_predictions)
    # Previously computed but never returned.
    accuracy = accuracy_score(true_labels, true_predictions)

    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "balanced accuracy": balanced_acc,
        "accuracy": accuracy,
    }



# Derive both mappings from label_list so the label table lives in one place.
# id -> name, e.g. 0: "unrelated", 1: "stereotype_gender", ...
id2label = dict(enumerate(label_list))
label2id = {label_name: label_id for label_id, label_name in id2label.items()}

model = AutoModelForTokenClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=len(label_list),
    id2label=id2label,
    label2id=label2id,
)

# Evaluate and checkpoint once per epoch. Only one checkpoint is kept and the
# best model is reloaded when training ends.
training_args = TrainingArguments(
    output_dir="token_level_model/best_model",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=12,
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    save_total_limit=1
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=final_dataset["train"],
    eval_dataset=final_dataset["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics_new,
)

trainer.train()

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertForTokenClassification: ['vocab_layer_norm.bias', 'vocab_transform.weight', 'vocab_layer_norm.weight', 'vocab_projector.bias', 'vocab_transform.bias']
- This IS expected if you are initializing DistilBertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForTokenClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream

Epoch,Training Loss,Validation Loss


KeyboardInterrupt: 

In [None]:
# Load the trained model and the tokenizer from one checkpoint directory.
# NOTE(review): the step number in the folder name depends on the training
# run; confirm this checkpoint exists under the trainer's output_dir.
checkpoint_dir = "token_level_model/best_model/checkpoint-632"
model = AutoModelForTokenClassification.from_pretrained(checkpoint_dir)
tokenizer = AutoTokenizer.from_pretrained(checkpoint_dir)

# The 'ner' (token-classification) pipeline is used here to tag bias tokens.
ner_pipeline = pipeline('ner', model=model, tokenizer=tokenizer)

# Tag the first 30 race sentences. Slicing tolerates a shorter list.
for entry in race_dataset[:30]:
    sentence = entry['text'].replace("===","")
    print(f"Text: {sentence}")

    # Each result includes the word, its predicted entity label, and its score
    for result in ner_pipeline(sentence):
        # Print the word, entity and score only if the entity is not 'unrelated'
        if result['entity'] != 'unrelated':
            print(f"  Word: {result['word']}, Entity: {result['entity']}, Score: {result['score']}")

In [59]:
import shutil

# Folder to archive (the trainer's output directory) and the archive's base
# name, without the extension.
model_directory = "token_level_model/best_model"
output_filename = "best_model"

# Write <output_filename>.zip with the contents of model_directory; the call
# returns the archive path.
shutil.make_archive(base_name=output_filename, format='zip', root_dir=model_directory)

'/home/jupyter/best_model.zip'

In [60]:
import shutil
import os

# Specify the zip file and the target directory
zip_file = "best_model.zip"
target_directory = "token_level_model/best_model"

# Fail before deleting anything: with a missing archive, the existing model
# directory would otherwise be removed and nothing unpacked in its place.
if not os.path.isfile(zip_file):
    raise FileNotFoundError(f"Archive not found: {zip_file}")

# Remove the target directory if it already exists
if os.path.exists(target_directory):
    shutil.rmtree(target_directory)

# Unpack the archive file
shutil.unpack_archive(zip_file, target_directory)


In [None]:
new_test_dataset = load_dataset("md_gender_bias", "convai2_inferred")

test_round = 100

# Slicing a split returns a dict of column lists. It also tolerates a split
# with fewer than test_round rows.
eval_batch = new_test_dataset["train"][:test_round]
text_list = list(eval_batch["text"])
y_true = list(eval_batch["ternary_label"])
result_new = ner_pipeline(text_list)


def _sentence_prediction(token_results):
    """Collapse one sentence's token results into 0 (none), 1 (anti-stereotype) or 2 (stereotype).

    The first token whose entity is not 'unrelated' decides the value. A
    sentence with no such token, or with no tokens at all, yields 0.
    """
    for result in token_results:
        entity = result['entity']
        if entity == 'unrelated':
            continue
        # Check 'anti-stereotype' first: 'stereotype' is a substring of it.
        if 'anti-stereotype' in entity:
            return 1
        if 'stereotype' in entity:
            return 2
    return 0


# Exactly one prediction per sentence, so y_pred always lines up with y_true.
# The earlier loop reset its flag inside the token loop, so an empty token
# list could skip a sentence or raise a NameError.
# NOTE(review): y_true is the dataset's `ternary_label`, while y_pred encodes
# none/anti-stereotype/stereotype. Confirm the two label spaces are meant to be
# compared before interpreting any score computed from them.
y_pred = [_sentence_prediction(token_results) for token_results in result_new]

In [None]:
from sklearn.metrics import accuracy_score

# Fraction of sentences where the prediction equals the reference label.
accuracy_score(y_true=y_true, y_pred=y_pred)

0.24

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification,pipeline

# Published checkpoint on the Hugging Face Hub.
hub_model_id = "wu981526092/token-level-bias-detector"

tokenizer = AutoTokenizer.from_pretrained(hub_model_id)

model = AutoModelForTokenClassification.from_pretrained(hub_model_id)

# Reuse the objects loaded above; passing the id string again would make the
# pipeline load the model and tokenizer a second time.
ner_pipeline = pipeline('ner', model=model, tokenizer=tokenizer)


OSError: Unable to load weights from pytorch checkpoint file for '/home/jupyter/.cache/huggingface/hub/models--wu981526092--token-level-bias-detector/snapshots/c1456bbc816e272acf78451438b45f4f71bed89a/pytorch_model.bin' at '/home/jupyter/.cache/huggingface/hub/models--wu981526092--token-level-bias-detector/snapshots/c1456bbc816e272acf78451438b45f4f71bed89a/pytorch_model.bin'. If you tried to load a PyTorch model from a TF 2.0 checkpoint, please set from_tf=True.