## 1. Setup and Installation

In [None]:
# Suppress warnings for cleaner output
import warnings
warnings.filterwarnings('ignore')

: 

In [None]:
# Create necessary directories
import os
os.makedirs('Saved', exist_ok=True)
os.makedirs('explanations_dicts', exist_ok=True)
print("Directories created successfully!")

In [None]:
# Install required packages (run this if not already installed)
!pip install -r requirements.txt
!python -m spacy download en_core_web_sm

## 2. Download and Prepare GloVe Embeddings

**Note:** This step is only required once. Skip if you already have the word2vec.model file.

In [None]:
# Download GloVe embeddings (only run if needed)
# !wget http://nlp.stanford.edu/data/glove.42B.300d.zip -P Data/
# !unzip Data/glove.42B.300d.zip -d Data/
# !rm Data/glove.42B.300d.zip

In [None]:
# Convert GloVe to Word2Vec format (only run if needed)
# from gensim.models import KeyedVectors
# from gensim.scripts.glove2word2vec import glove2word2vec

# # Convert GloVe format to Word2Vec format
# glove2word2vec('Data/glove.42B.300d.txt', 'Data/glove.42B.300d_w2v.txt')

# # Load and save in gensim format
# word2vecmodel1 = KeyedVectors.load_word2vec_format('Data/glove.42B.300d_w2v.txt', binary=False)
# word2vecmodel1.save("Data/word2vec.model")

# # Clean up intermediate files
# import gc
# del word2vecmodel1
# gc.collect()

# # Remove large text files
# import os
# os.remove('Data/glove.42B.300d.txt')
# os.remove('Data/glove.42B.300d_w2v.txt')

## 3. Import Dependencies and Train Model

In [None]:
# Import the training module
from manual_training_inference import *

In [None]:
# Load model parameters from JSON configuration
import json
import ast
import torch

path_file = 'best_model_json/bestModel_birnnscrat.json'
with open(path_file, mode='r') as f:
    params = json.load(f)

# Convert string values to appropriate types
for key in params:
    if params[key] == 'True':
        params[key] = True
    elif params[key] == 'False':
        params[key] = False
    if key in ['batch_size', 'num_classes', 'hidden_size', 'supervised_layer_pos', 
               'num_supervised_heads', 'random_seed', 'max_length']:
        if params[key] != 'N/A':
            params[key] = int(params[key])
    if (key == 'weights') and (params['auto_weights'] == False):
        params[key] = ast.literal_eval(params[key])

# Configure for local execution
params['logging'] = 'local'
params['device'] = 'cpu'  # Change to 'cuda' if GPU is available
params['best_params'] = False

# Setup device
if torch.cuda.is_available() and params['device'] == 'cuda':
    device = torch.device("cuda")
    print(f'Using GPU: {torch.cuda.get_device_name(0)}')
else:
    print('Using CPU for training.')
    device = torch.device("cpu")

In [None]:
# Data folder configuration
dict_data_folder = {
    '2': {'data_file': 'Data/dataset.json', 'class_label': 'Data/classes_two.npy'},
    '3': {'data_file': 'Data/dataset.json', 'class_label': 'Data/classes.npy'}
}

# Configure training parameters
params['variance'] = 1
params['epochs'] = 5  # Reduce for faster testing
params['to_save'] = True

In [None]:
# Train with 2 classes (toxic vs non-toxic)
params['num_classes'] = 2
params['data_file'] = dict_data_folder[str(params['num_classes'])]['data_file']
params['class_names'] = dict_data_folder[str(params['num_classes'])]['class_label']

if params['num_classes'] == 2 and params['auto_weights'] == False:
    params['weights'] = [1.0, 1.0]

print(f"Training {params['num_classes']}-class model...")
train_model(params, device)

In [None]:
# Train with 3 classes (hatespeech, offensive, normal)
params['num_classes'] = 3
params['data_file'] = dict_data_folder[str(params['num_classes'])]['data_file']
params['class_names'] = dict_data_folder[str(params['num_classes'])]['class_label']

if params['num_classes'] == 2 and params['auto_weights'] == False:
    params['weights'] = [1.0, 1.0]

print(f"Training {params['num_classes']}-class model...")
train_model(params, device)

In [None]:
# Clean up memory
import gc
gc.collect()

## 4. Testing and Evaluation

In [None]:
# Run testing scripts
!python testing_with_rational.py birnn_scrat 100
!python testing_for_bias.py birnn_scrat 100

In [None]:
# Check generated explanation files
!ls explanations_dicts

---

# Bias Calculation

Based on: Borkan et al. (2019) - "Nuanced Metrics for Measuring Unintended Bias with Real Data for Text Classification"

---

In [None]:
# Import required libraries for bias calculation
from collections import Counter, defaultdict
from tqdm.notebook import tqdm
import json
import numpy as np

In [None]:
# Import data collection utilities
from Preprocess.dataCollect import get_annotated_data

In [None]:
# Configure data loading for 2-class (toxic/non-toxic)
dict_data_folder = {
    '2': {'data_file': 'Data/dataset.json', 'class_label': 'Data/classes_two.npy'},
    '3': {'data_file': 'Data/dataset.json', 'class_label': 'Data/classes.npy'}
}

params = {}
params['num_classes'] = 2  # toxic vs non-toxic
params['data_file'] = dict_data_folder[str(params['num_classes'])]['data_file']
params['class_names'] = dict_data_folder[str(params['num_classes'])]['class_label']

# Load the annotated dataset
data_all_labelled = get_annotated_data(params)
print(f"Loaded {len(data_all_labelled)} samples")

In [None]:
# Display sample data
data_all_labelled.head()

In [None]:
def generate_target_information(dataset):
    """Extract target community based on majority voting among annotators."""
    final_target_output = defaultdict(list)
    all_communities_selected = []
    
    for each in dataset.iterrows():
        # Combine all target communities from 3 annotators
        all_targets = each[1]['target1'] + each[1]['target2'] + each[1]['target3']
        community_dict = dict(Counter(all_targets))
        
        # Select communities mentioned by at least 2 annotators
        for key in community_dict:
            if community_dict[key] > 1:
                final_target_output[each[1]['post_id']].append(key)
                all_communities_selected.append(key)
        
        # If no majority, mark as 'None'
        if each[1]['post_id'] not in final_target_output:
            final_target_output[each[1]['post_id']].append('None')
            all_communities_selected.append('None')

    return final_target_output, all_communities_selected

In [None]:
# Generate target information
target_information, all_communities_selected = generate_target_information(data_all_labelled)

In [None]:
# Get top 10 communities for bias calculation
community_count_dict = Counter(all_communities_selected)

# Remove 'None' and 'Other' from consideration
community_count_dict.pop('None', None)
community_count_dict.pop('Other', None)

# Select top 10 communities
list_selected_community = [community for community, value in community_count_dict.most_common(10)]
print(f"Top 10 communities: {list_selected_community}")

In [None]:
# Filter target information to only include top 10 communities
final_target_information = {}
for each in target_information:
    temp = list(set(target_information[each]) & set(list_selected_community))
    if len(temp) == 0:
        final_target_information[each] = None
    else:
        final_target_information[each] = temp

In [None]:
# Add target category column to dataset
data_all_labelled['final_target_category'] = data_all_labelled['post_id'].map(final_target_information)

In [None]:
# Load test split IDs and filter data
with open('./Data/post_id_divisions.json', 'r') as fp:
    post_id_dict = json.load(fp)

data_all_labelled_bias = data_all_labelled[data_all_labelled['post_id'].isin(post_id_dict['test'])]
print(f"Test samples for bias evaluation: {len(data_all_labelled_bias)}")

In [None]:
from sklearn.metrics import roc_auc_score

# Bias score file mapping for the trained model
bias_score_file_mapping = {
    'BiRNN-Attn': 'bestModel_birnnscrat_bias.json',
}

parent_path = './explanations_dicts/'
method_list = ['subgroup', 'bpsn', 'bnsp']
community_list = list(list_selected_community)

In [None]:
def convert_to_score(label_name, label_dict):
    """Convert classification to toxicity score [0-1]."""
    if label_name == 'non-toxic':
        return 1 - label_dict[label_name]
    else:
        return label_dict[label_name]


def bias_evaluation_metric(dataset, method, community):
    """Divide IDs into positive/negative based on bias evaluation method."""
    positive_ids = []
    negative_ids = []
    
    for eachrow in dataset.iterrows():
        if eachrow[1]['final_target_category'] is None:
            continue
            
        is_community = community in eachrow[1]['final_target_category']
        is_toxic = eachrow[1]['final_label'] != 'non-toxic'
        
        if method == 'subgroup':
            if is_community:
                if is_toxic:
                    positive_ids.append(eachrow[1]['post_id'])
                else:
                    negative_ids.append(eachrow[1]['post_id'])
        elif method == 'bpsn':
            if is_community and not is_toxic:
                negative_ids.append(eachrow[1]['post_id'])
            elif not is_community and is_toxic:
                positive_ids.append(eachrow[1]['post_id'])
        elif method == 'bnsp':
            if is_community and is_toxic:
                positive_ids.append(eachrow[1]['post_id'])
            elif not is_community and not is_toxic:
                negative_ids.append(eachrow[1]['post_id'])
        else:
            print('Incorrect method selected!')
                
    return {'positiveID': positive_ids, 'negativeID': negative_ids}

In [None]:
# Calculate bias scores
final_bias_dictionary = defaultdict(lambda: defaultdict(dict))

for each_model in tqdm(bias_score_file_mapping, desc="Processing models"):
    total_data = {}
    filepath = parent_path + bias_score_file_mapping[each_model]
    
    # Check if file exists
    if not os.path.exists(filepath):
        print(f"Warning: {filepath} not found. Run testing scripts first.")
        continue
    
    with open(filepath) as fp:
        for line in fp:
            data = json.loads(line)
            total_data[data['annotation_id']] = data
    
    for each_method in method_list:
        for each_community in community_list:
            community_data = bias_evaluation_metric(data_all_labelled_bias, each_method, each_community)
            truth_values = []
            prediction_values = []
            
            label_to_value = {'toxic': 1.0, 'non-toxic': 0.0}
            
            for each in community_data['positiveID']:
                if each in total_data:
                    truth_values.append(label_to_value[total_data[each]['ground_truth']])
                    prediction_values.append(convert_to_score(
                        total_data[each]['classification'], 
                        total_data[each]['classification_scores']
                    ))
            
            for each in community_data['negativeID']:
                if each in total_data:
                    truth_values.append(label_to_value[total_data[each]['ground_truth']])
                    prediction_values.append(convert_to_score(
                        total_data[each]['classification'],
                        total_data[each]['classification_scores']
                    ))
            
            if len(truth_values) > 0 and len(set(truth_values)) > 1:
                roc_output_value = roc_auc_score(truth_values, prediction_values)
                final_bias_dictionary[each_model][each_method][each_community] = roc_output_value

In [None]:
# Calculate generalized mean of bias scores
power_value = -5
num_communities = len(community_list)

print("\nBias Scores (Generalized Mean):")
print("=" * 50)
for each_model in final_bias_dictionary:
    for each_method in final_bias_dictionary[each_model]:
        temp_value = []
        for each_community in final_bias_dictionary[each_model][each_method]:
            temp_value.append(pow(final_bias_dictionary[each_model][each_method][each_community], power_value))
        if len(temp_value) > 0:
            score = pow(np.sum(temp_value) / num_communities, 1 / power_value)
            print(f"{each_model} | {each_method}: {score:.4f}")

---

# Calculate Explainability

Based on: DeYoung et al. (2020) - "ERASER: A Benchmark to Evaluate Rationalized NLP Models"

---

In [None]:
# Import required libraries
import json
from tqdm.notebook import tqdm
import more_itertools as mit
import os

In [None]:
# Import preprocessing utilities
from Preprocess.dataCollect import get_annotated_data
from Preprocess.spanMatcher import returnMask
from transformers import BertTokenizer

In [None]:
# Load 3-class dataset for explainability
dict_data_folder = {
    '2': {'data_file': 'Data/dataset.json', 'class_label': 'Data/classes_two.npy'},
    '3': {'data_file': 'Data/dataset.json', 'class_label': 'Data/classes.npy'}
}

params = {}
params['num_classes'] = 3  # hatespeech, offensive, normal
params['data_file'] = dict_data_folder[str(params['num_classes'])]['data_file']
params['class_names'] = dict_data_folder[str(params['num_classes'])]['class_label']

data_all_labelled = get_annotated_data(params)
print(f"Loaded {len(data_all_labelled)} samples for explainability evaluation")

In [None]:
# Configure tokenization parameters
params_data = {
    'include_special': False,
    'bert_tokens': False,  # Set True for BERT models
    'type_attention': 'softmax',
    'set_decay': 0.1,
    'majority': 2,
    'max_length': 128,
    'variance': 10,
    'window': 4,
    'alpha': 0.5,
    'p_value': 0.8,
    'method': 'additive',
    'decay': False,
    'normalized': False,
    'not_recollect': True,
}

# Initialize tokenizer
if params_data['bert_tokens']:
    print('Loading BERT tokenizer...')
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=False)
else:
    print('Using standard tokenizer...')
    tokenizer = None

In [None]:
def get_training_data(data):
    """Load dataset and extract token-wise rationales."""
    final_output = []
    print(f'Processing {len(data)} samples...')
    
    for index, row in tqdm(data.iterrows(), total=len(data)):
        annotation = row['final_label']
        post_id = row['post_id']
        annotation_list = [row['label1'], row['label2'], row['label3']]
        
        if annotation != 'undecided':
            tokens_all, attention_masks = returnMask(row, params_data, tokenizer)
            final_output.append([post_id, annotation, tokens_all, attention_masks, annotation_list])
    
    return final_output

In [None]:
# Process training data
training_data = get_training_data(data_all_labelled)
print(f"Processed {len(training_data)} valid samples")

In [None]:
def find_ranges(iterable):
    """Yield ranges of consecutive numbers."""
    for group in mit.consecutive_groups(iterable):
        group = list(group)
        if len(group) == 1:
            yield group[0]
        else:
            yield group[0], group[-1]


def get_evidence(post_id, anno_text, explanations):
    """Convert explanations to ERASER evidence format."""
    output = []
    indexes = sorted([i for i, each in enumerate(explanations) if each == 1])
    span_list = list(find_ranges(indexes))
    
    for each in span_list:
        if isinstance(each, int):
            start, end = each, each + 1
        elif len(each) == 2:
            start, end = each[0], each[1] + 1
        else:
            print('Error in span processing')
            continue
        
        output.append({
            "docid": post_id,
            "end_sentence": -1,
            "end_token": end,
            "start_sentence": -1,
            "start_token": start,
            "text": ' '.join([str(x) for x in anno_text[start:end]])
        })
    return output


def convert_to_eraser_format(dataset, method, save_split, save_path, id_division):
    """Convert dataset to ERASER benchmark format."""
    final_output = []
    
    if save_split:
        os.makedirs(save_path, exist_ok=True)
        os.makedirs(os.path.join(save_path, 'docs'), exist_ok=True)
        train_fp = open(os.path.join(save_path, 'train.jsonl'), 'w')
        val_fp = open(os.path.join(save_path, 'val.jsonl'), 'w')
        test_fp = open(os.path.join(save_path, 'test.jsonl'), 'w')
    
    for eachrow in dataset:
        post_id = eachrow[0]
        post_class = eachrow[1]
        anno_text_list = eachrow[2]
        
        if post_class == 'normal':
            continue
        
        explanations = [list(each_explain) for each_explain in eachrow[3]]
        
        # Union of explanations from all annotators
        if method == 'union':
            final_explanation = [int(any(each)) for each in zip(*explanations)]
        
        temp = {
            'annotation_id': post_id,
            'classification': post_class,
            'evidences': [get_evidence(post_id, list(anno_text_list), final_explanation)],
            'query': "What is the class?",
            'query_type': None
        }
        final_output.append(temp)
        
        if save_split:
            # Save document
            with open(os.path.join(save_path, 'docs', post_id), 'w') as fp:
                fp.write(' '.join([str(x) for x in list(anno_text_list)]))
            
            # Save to appropriate split
            if post_id in id_division['train']:
                train_fp.write(json.dumps(temp) + '\n')
            elif post_id in id_division['val']:
                val_fp.write(json.dumps(temp) + '\n')
            elif post_id in id_division['test']:
                test_fp.write(json.dumps(temp) + '\n')
    
    if save_split:
        train_fp.close()
        val_fp.close()
        test_fp.close()
    
    return final_output

In [None]:
# Load data splits
with open('./Data/post_id_divisions.json') as fp:
    id_division = json.load(fp)

In [None]:
# Create evaluation directory
os.makedirs('./Data/Evaluation/Model_Eval', exist_ok=True)

In [None]:
# Convert to ERASER format
method = 'union'
save_split = True
save_path = './Data/Evaluation/Model_Eval/'

output_eraser = convert_to_eraser_format(training_data, method, save_split, save_path, id_division)
print(f"Converted {len(output_eraser)} samples to ERASER format")

In [None]:
# List generated files
!ls Data/Evaluation/Model_Eval/

In [None]:
# Run ERASER metrics (from eraserbenchmark directory)
import subprocess
import sys

# Check if explanation file exists
explanation_file = './explanations_dicts/bestModel_birnnscrat_100_explanation_top5.json'
if os.path.exists(explanation_file):
    cmd = (
        f"cd eraserbenchmark && "
        f"PYTHONPATH=./:$PYTHONPATH python rationale_benchmark/metrics.py "
        f"--split test "
        f"--data_dir ../Data/Evaluation/Model_Eval "
        f"--results ../{explanation_file} "
        f"--score_file ../model_explain_output.json"
    )
    !{cmd}
else:
    print(f"Explanation file not found: {explanation_file}")
    print("Run testing_with_rational.py first.")

In [None]:
# Print explainability results
output_file = './model_explain_output.json'
if os.path.exists(output_file):
    with open(output_file) as fp:
        output_data = json.load(fp)
    
    print('\n' + '=' * 50)
    print('EXPLAINABILITY RESULTS')
    print('=' * 50)
    
    print('\nPlausibility:')
    print(f"  IOU F1:   {output_data['iou_scores'][0]['macro']['f1']:.4f}")
    print(f"  Token F1: {output_data['token_prf']['instance_macro']['f1']:.4f}")
    print(f"  AUPRC:    {output_data['token_soft_metrics']['auprc']:.4f}")
    
    print('\nFaithfulness:')
    print(f"  Comprehensiveness: {output_data['classification_scores']['comprehensiveness']:.4f}")
    print(f"  Sufficiency:       {output_data['classification_scores']['sufficiency']:.4f}")
else:
    print(f"Output file not found: {output_file}")

---

## Summary

This notebook demonstrates:
1. **Model Training**: Training BiRNN-SCRAT model for hate speech detection
2. **Bias Evaluation**: Computing subgroup, BPSN, and BNSP bias metrics
3. **Explainability Evaluation**: Computing plausibility and faithfulness metrics

---