# Test datasets

In [3]:
import os, sys, pytest
import pandas as pd
from torch.utils.data import Dataset as PtDataset
from datasets import Dataset as HfDataset

ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if "__file__" in globals() else os.path.abspath("..")
sys.path.insert(0, ROOT_PATH)

from FairLangProc.datasets.fairness_datasets import BiasDataLoader

In [4]:
# Datasets that the tests in this notebook actually load through BiasDataLoader.
IMPLEMENTED = [
    "BBQ",
    "BEC-Pro",
    "BOLD",
    "BUG",
    "CrowS-Pairs",
    "GAP",
    "StereoSet",
    "UnQover",
    "WinoBias+",
    "WinoBias",
    "Winogender"
]

# Full catalogue of dataset names, whether or not they are in IMPLEMENTED.
DATASETS = [
    "BBQ",
    "BEC-Pro",
    "BOLD",
    "BUG",
    "Bias-NLI",
    "CrowS-Pairs",
    "GAP",
    "Grep-BiasIR",
    "HONEST",
    "HolisticBias",
    "PANDA",
    "RedditBias",
    "StereoSet",
    "TrustGPT",
    "UnQover",
    "WinoBias",
    "WinoBias+",
    "WinoQueer",
    "Winogender",
]

# Catalogue entries that are not listed in IMPLEMENTED.
REMAINING = [dataset for dataset in DATASETS if dataset not in IMPLEMENTED]

# Values passed as `config` to BiasDataLoader for each dataset; an empty string
# is used for datasets that list no named configuration.
# NOTE(review): "HONEST" appears in DATASETS but has no entry here — confirm this is intended.
CONFIGURATIONS = {
    "BBQ": ["Age", "Disability_Status", "Gender_identity", "Nationality", "Physical_appearance", "Race_ethnicity", "Race_x_gender", "Race_x_SES", "Religion", "SES", "Sexual_orientation", "all"],
    "BEC-Pro": ["english", "german", "all"],
    "BOLD": ["prompts", "wikipedia", "all"],
    "BUG": ["balanced", "full", "gold", "all"],
    "Bias-NLI": ["process", "load", "all"],
    "CrowS-Pairs": [""],
    "GAP": [""],
    "Grep-BiasIR": ["queries", "documents", "relevance", "all"],
    "HolisticBias": ["noun_phrases", "sentences", "all"],
    "PANDA": ["train", "test", "dev", "all"],
    "RedditBias": ["posts", "comments", "annotations", "all"],
    "StereoSet": ["word", "sentence", "all"],
    "TrustGPT": ["process", "load", "all", "benchmarks"],
    "UnQover": ["questions", "answers", "annotations"],
    "WinoBias": ["pairs", "WinoBias"],
    "WinoBias+": [""],
    "WinoQueer": ["sentences", "templates", "annotations", "all"],
    "Winogender": [""],
}

# Values passed as `format` to BiasDataLoader.
FORMATS = ["hf", "pt", "raw"]

# Type that every value of the loader's result must have for a given format
# (checked by test_format).
CLASS_DICT = {
    "hf": HfDataset,
    "pt": PtDataset,
    "raw": pd.DataFrame
}

In [5]:
# Every (dataset, config, format) triple for the implemented datasets,
# in CONFIGURATIONS order.
TEST_CASES_FORMAT = [
    (name, cfg, fmt)
    for name, cfgs in CONFIGURATIONS.items()
    if name in IMPLEMENTED
    for cfg in cfgs
    for fmt in FORMATS
]

@pytest.mark.parametrize("dataset, config, format", TEST_CASES_FORMAT)
def test_format(dataset, config, format):
    """The loader returns a dict whose values all have the type mapped to ``format``."""
    loaded = BiasDataLoader(dataset = dataset, config = config, format = format)
    assert isinstance(loaded, dict)
    for split in loaded.values():
        assert isinstance(split, CLASS_DICT[format])

In [6]:
def _load_raw(dataset):
    """Load ``dataset`` in raw format, trying config 'all' first and '' when that yields None."""
    result = BiasDataLoader(dataset = dataset, config = 'all', format = 'raw')
    if result is None:
        result = BiasDataLoader(dataset = dataset, config = '', format = 'raw')
    return result


def _get_columns():
    """Print, for every implemented dataset, its name and the keys of its first raw split.

    Datasets whose result cannot be inspected are reported instead of being
    skipped silently.
    """
    for dataset in CONFIGURATIONS.keys():
        if dataset not in IMPLEMENTED:
            continue
        result = _load_raw(dataset)
        try:
            first_split = result[next(iter(result.keys()))]
            columns = list(first_split.keys())
        except Exception as error:
            # Best-effort helper: keep going, but say why this dataset was skipped.
            print(f"{dataset}: could not read columns ({type(error).__name__}: {error})")
            continue
        print(dataset)
        print(columns)


def _get_rows():
    """Print one ``"dataset": {"split": n_rows, ...}, `` line per implemented dataset.

    The 'templates' entry of BBQ is left out. A dataset whose splits do not all
    expose ``.index`` is reported as ``<dataset>: nothing``.
    """
    for dataset in CONFIGURATIONS.keys():
        if dataset not in IMPLEMENTED:
            continue
        result = _load_raw(dataset)
        try:
            entries = [
                f"\"{data}\": {len(result[data].index)}, "
                for data in result.keys()
                if not (data == 'templates' and dataset == 'BBQ')
            ]
        except Exception:
            # Narrower than a bare except: KeyboardInterrupt/SystemExit still propagate.
            print(dataset + ": nothing")
            continue
        print(f"\"{dataset}\": {{" + "".join(entries) + "}, ")


In [None]:
# Expected column/key names of the first raw split of each dataset, as checked
# by test_columns. A value of None means no expectation has been recorded.
COLUMNS = {
    "BBQ": ['example_id', 'question_index', 'question_polarity', 'context_condition', 'category', 'answer_info', 'additional_metadata', 'context', 'question', 'ans0', 'ans1', 'ans2', 'label'],
    "BEC-Pro": ['Unnamed: 0', 'Sentence', 'Sent_TM', 'Sent_AM', 'Sent_TAM', 'Template', 'Person', 'Gender', 'Profession', 'Prof_Gender'],
    "BOLD": ['gender_prompt.json', 'political_ideology_prompt.json', 'profession_prompt.json', 'race_prompt.json', 'religious_ideology_prompt.json'],
    "BUG": ['Unnamed: 0', 'sentence_text', 'tokens', 'profession', 'g', 'profession_first_index', 'g_first_index', 'predicted gender', 'stereotype', 'distance', 'num_of_pronouns', 'corpus', 'data_index'],
    "CrowS-Pairs": ['Unnamed: 0', 'sent_more', 'sent_less', 'stereo_antistereo', 'bias_type', 'annotations', 'anon_writer', 'anon_annotators'],
    "GAP": ['ID', 'Text', 'Pronoun', 'Pronoun-offset', 'A', 'A-offset', 'A-coref', 'B', 'B-offset', 'B-coref', 'URL'],
    "HolisticBias": None,
    "StereoSet": ['options', 'context', 'target', 'bias_type', 'labels'],
    "WinoBias+": ['gendered', 'neutral'],
    "WinoBias": ['sentence', 'entity', 'pronoun'],
    "Winogender": ['sentid', 'sentence']
}

# Datasets without a recorded expectation (value None) are not parametrized.
TEST_CASES_COLUMNS = [dataset for dataset, columns in COLUMNS.items() if columns is not None]

@pytest.mark.parametrize("dataset", TEST_CASES_COLUMNS)
def test_columns(dataset):
    """The first raw split of ``dataset`` exposes exactly the names listed in COLUMNS."""
    result = BiasDataLoader(dataset = dataset, config = 'all', format = 'raw')
    if result is None:
        # Same fallback as _get_columns: retry with the empty configuration.
        result = BiasDataLoader(dataset = dataset, config = '', format = 'raw')
    data = result[list(result.keys())[0]]
    # .keys() yields the column names of a DataFrame and the keys of a dict,
    # matching how _get_columns lists them.
    actual = list(data.keys())
    expected = COLUMNS[dataset]
    assert len(expected) == len(actual), "Different number of columns"
    for column in expected:
        assert column in actual, "Missing column"
    

In [None]:
# Expected number of rows per split of each dataset's raw output, keyed first
# by dataset name and then by the split key returned by the loader
# (checked by test_row_number).
ROWS = {
    "BBQ": {"Age.jsonl": 3680, "Disability_status.jsonl": 1556, "Gender_identity.jsonl": 5672, "Nationality.jsonl": 3080, "Physical_appearance.jsonl": 1576, "Race_ethnicity.jsonl": 6880, "Race_x_SES.jsonl": 11160, "Race_x_gender.jsonl": 15960, "Religion.jsonl": 1200, "SES.jsonl": 6864, "Sexual_orientation.jsonl": 864, "additional_metadata.csv": 58556, },
    "BEC-Pro": {"english": 5400, "german": 5400, }, 
    "BUG": {"balanced_BUG.csv": 25504, "full_BUG.csv": 105687, "gold_BUG.csv": 1717, }, 
    "CrowS-Pairs": {"data": 1508, }, 
    "GAP": {"gap-development.tsv": 2000, "gap-test.tsv": 2000, "gap-validation.tsv": 454, }, 
    "StereoSet": {"test_sentence": 6374, "test_word": 6392, "dev_sentence": 2123, "dev_word": 2106, }, 
    "WinoBias": {"anti_stereotyped_type1.txt.dev": 396, "anti_stereotyped_type1.txt.test": 396, "anti_stereotyped_type2.txt.dev": 396, "anti_stereotyped_type2.txt.test": 396, "pro_stereotyped_type1.txt.dev": 396, "pro_stereotyped_type1.txt.test": 396, "pro_stereotyped_type2.txt.dev": 396, "pro_stereotyped_type2.txt.test": 396, }, 
    "WinoBias+": {"data": 3167, }, 
    "Winogender": {"data": 720, }, 
}

TEST_CASES_ROWS = list(ROWS.keys())

@pytest.mark.parametrize("dataset", TEST_CASES_ROWS)
def test_row_number(dataset):
    """Every DataFrame or list split of ``dataset`` has the row count recorded in ROWS."""
    result = BiasDataLoader(dataset = dataset, config = 'all', format = 'raw')
    if result is None:
        # Same fallback as _get_rows: retry with the empty configuration.
        result = BiasDataLoader(dataset = dataset, config = '', format = 'raw')
    for key in result:
        if key == 'templates' and dataset == 'BBQ':
            # _get_rows leaves this entry out, so ROWS has no expectation for it.
            continue
        if isinstance(result[key], pd.DataFrame):
            assert len(result[key].index) == ROWS[dataset][key]
        elif isinstance(result[key], list):
            assert len(result[key]) == ROWS[dataset][key]

# Test metrics

## Test probability

In [1]:
import os
import sys

import torch
import pytest

ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if "__file__" in globals() else os.path.abspath("..")
sys.path.insert(0, ROOT_PATH)

from FairLangProc.metrics import LPBS, CBS, CPS, AUL

In [21]:
class AttrDict(dict):
    """Dictionary whose entries can also be read and written as attributes."""

    def __init__(self, **kwargs):
        dict.__init__(self, **kwargs)
        # The instance namespace *is* the dict, so d.key and d["key"] share storage.
        self.__dict__ = self

    def __repr__(self):
        class_name = type(self).__name__
        contents = dict.__repr__(self)
        return "{}({})".format(class_name, contents)
    

class DummyModel:
    """Deterministic stand-in for a language model used by the metric tests.

    Calling an instance returns an ``AttrDict`` with a ``logits`` tensor of
    shape (batch, seq_len, 30522) that is zero everywhere except at a few
    fixed vocabulary ids.
    """

    # (vocabulary id, base logit, sign applied to the remainder, modulus)
    _MASK_RULES = (
        (200,   5.0,  1,  3),   # doctor
        (201,  -5.0,  1,  4),   # nurse
        (202,  15.0, -1,  5),   # engineer
        (300,   5.0,  1,  2),   # science
        (301,  -5.0,  1,  6),   # art
        (302,  15.0, -1,  7),   # math
        (400,  10.0,  1,  3),   # he
        (401, -10.0,  1,  4),   # she
        (402, -15.0,  1, 10),   # it
    )
    _KNOWN_IDS = tuple(rule[0] for rule in _MASK_RULES)

    def assing_logits_mask(self, input_ids, logits, b, t):
        """Fill ``logits[b, t]`` at the known ids with values derived from the sum of row ``b``."""
        row_total = input_ids[b].sum().item()
        for token_id, base, sign, modulus in self._MASK_RULES:
            logits[b, t, token_id] = base + sign * (row_total % modulus)

    def assing_logits(self, input_ids, logits):
        """At every position holding a known id, set that id's logit to ``5.0 + id % 3``."""
        n_rows, n_cols = input_ids.shape
        for b in range(n_rows):
            for t in range(n_cols):
                token_id = input_ids[b, t].item()
                if token_id not in self._KNOWN_IDS:
                    continue
                logits[b, t, token_id] = 5.0 + token_id % 3

    def __call__(self, input_ids = None, **kwargs):
        """Return ``AttrDict(logits=...)`` for a 2-D tensor of token ids."""
        n_rows, n_cols = input_ids.shape
        logits = torch.zeros(n_rows, n_cols, 30522)

        # 103 is the id the dummy tokenizer assigns to '[MASK]'.
        is_mask = (input_ids == 103)

        if not is_mask.any():
            # No masked position: logits depend on the token at each position.
            self.assing_logits(input_ids, logits)
        else:
            # Only masked positions receive non-zero logits.
            for b, t in is_mask.nonzero().tolist():
                self.assing_logits_mask(input_ids, logits, b, t)

        return AttrDict(logits=logits)
    

class DummyTokenizer:
    """Whitespace tokenizer with a tiny fixed vocabulary, used by the metric tests."""

    pad_token_type_id = 101
    pad_token_id = 101
    cls_token_id = 102
    mask_token_id = 103
    # Token -> id table; any token not listed here maps to 100.
    hash_map_tokens = {
        '[PAD]': pad_token_type_id,
        '[CLS]': cls_token_id,
        '[MASK]': mask_token_id,
        'doctor': 200,
        'nurse': 201,
        'engineer': 202,
        'science': 300,
        'art': 301,
        'math': 302,
        'he': 400,
        'she': 401,
        'it': 402,
    }

    def __init__(self):
        pass

    def __call__(self, sentences, padding=True, return_tensors="pt"):
        """Encode ``sentences`` into ``AttrDict(input_ids=tensor)``.

        Each sentence is prefixed with '[CLS]' and split on whitespace. With
        ``padding`` the rows are right-padded with ``pad_token_id`` to the
        longest row. ``return_tensors`` is accepted but not used.
        """
        tokenized = [["[CLS]", *text.split()] for text in sentences]
        longest = max(len(tokens) for tokens in tokenized)
        encoded = []
        for tokens in tokenized:
            row = [self.convert_tokens_to_ids(token) for token in tokens]
            if padding:
                row.extend([self.pad_token_id] * (longest - len(row)))
            encoded.append(row)

        return AttrDict(input_ids=torch.tensor(encoded))

    def tokenize(self, word):
        """Return ``word`` wrapped in a one-element list (no sub-word splitting)."""
        return [word]

    def convert_tokens_to_ids(self, token):
        """Look ``token`` up in the vocabulary, returning 100 when it is unknown."""
        try:
            return self.hash_map_tokens[token]
        except KeyError:
            return 100

In [22]:
# Shared dummy model/tokenizer instances used by the probability-metric tests.
MODEL = DummyModel()
TOKENIZER = DummyTokenizer()

# Template sentences; each contains two '[MASK]' tokens.
SENTENCES = [
    "[MASK] is a [MASK]",
    "Is [MASK] a [MASK] ?",
    "[MASK] teaches [MASK]"
]

# One pair of target words per sentence (passed to LPBS).
TARGET_WORDS_LPBS = [
    ("he", "she"),
    ("he", "she"),
    ("he", "she")
]

# One triple of target words per sentence (passed to CBS).
TARGET_WORDS_CBS = [
    ("he", "she", "it"),
    ("he", "she", "it"),
    ("he", "she", "it")
]

# One fill word per sentence.
FILL_WORDS = [
    'engineer',
    'doctor',
    'math',
]

# One mask index per sentence, passed as `mask_indices`.
# NOTE(review): presumably selects which '[MASK]' holds the target word — confirm against LPBS/CBS.
MASK_INDICES = [0, 0, 0]

def test_lpbs_type():
    """LPBS returns a tensor holding one score per sentence."""
    LPBSscore = LPBS(
        model = MODEL,
        tokenizer = TOKENIZER,
        sentences = SENTENCES,
        target_words = TARGET_WORDS_LPBS,
        fill_words = FILL_WORDS,
        mask_indices = MASK_INDICES
    )
    # The score is a tensor (it is indexed and has .shape); torch.Size is only
    # the type of its shape.
    assert isinstance(LPBSscore, torch.Tensor)
    assert LPBSscore.shape == torch.Size([3])

def test_lpbs_value():
    """LPBS reproduces the expected score of each sentence within 1e-5."""
    scores = LPBS(
        model = MODEL,
        tokenizer = TOKENIZER,
        sentences = SENTENCES,
        target_words = TARGET_WORDS_LPBS,
        fill_words = FILL_WORDS,
        mask_indices = MASK_INDICES
    )

    expected_scores = (1.0, -3.0, -2.0)
    for position, expected in enumerate(expected_scores):
        assert abs(scores[position].item() - expected) < 1e-5

def test_lpbs_less_sentences():
    """Fewer sentences than the other inputs must be rejected."""
    # Renamed: this test used to share its name with the one below and was
    # silently shadowed by it.
    with pytest.raises(AssertionError) as excinfo:
        LPBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES[:1],
            target_words = TARGET_WORDS_LPBS,
            fill_words = FILL_WORDS,
            mask_indices = MASK_INDICES
        )
    # ExceptionInfo does not support `in`; inspect the message of the raised exception.
    assert "Different number of sentences" in str(excinfo.value)

def test_lpbs_less_target():
    """Fewer target-word tuples than sentences must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        LPBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES,
            target_words = TARGET_WORDS_LPBS[:1],
            fill_words = FILL_WORDS,
            mask_indices = MASK_INDICES
        )
    assert "Different number of sentences and target words" in str(excinfo.value)

def test_lpbs_less_fill():
    """Fewer fill words than sentences must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        LPBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES,
            target_words = TARGET_WORDS_LPBS,
            fill_words = FILL_WORDS[:1],
            mask_indices = MASK_INDICES
        )
    assert "Different number of sentences and fill words" in str(excinfo.value)

def test_lpbs_less_masks():
    """Fewer mask indices than sentences must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        LPBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES,
            target_words = TARGET_WORDS_LPBS,
            fill_words = FILL_WORDS,
            mask_indices = MASK_INDICES[:1]
        )
    assert "Different number of sentences and mask indices" in str(excinfo.value)

def test_lpbs_no_pairs():
    """Target words that are triples instead of pairs must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        LPBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES,
            target_words = [('he', 'she', 'it')]*3,
            fill_words = FILL_WORDS,
            mask_indices = MASK_INDICES
        )
    assert "Target words must consist of pairs of words" in str(excinfo.value)

In [24]:
def test_cbs_type():
    """CBS returns a tensor holding one score per sentence."""
    CBSscore = CBS(
        model = MODEL,
        tokenizer = TOKENIZER,
        sentences = SENTENCES,
        target_words = TARGET_WORDS_CBS,
        fill_words = FILL_WORDS,
        mask_indices = MASK_INDICES
    )
    # The score is a tensor (it is indexed and has .shape); torch.Size is only
    # the type of its shape.
    assert isinstance(CBSscore, torch.Tensor)
    assert CBSscore.shape == torch.Size([3])

def test_cbs_value():
    """CBS reproduces the expected score of each sentence within 1e-5."""
    scores = CBS(
        model = MODEL,
        tokenizer = TOKENIZER,
        sentences = SENTENCES,
        target_words = TARGET_WORDS_CBS,
        fill_words = FILL_WORDS,
        mask_indices = MASK_INDICES
    )

    expected_scores = (1/3, 13/3, 4.0)
    for position, expected in enumerate(expected_scores):
        assert abs(scores[position].item() - expected) < 1e-5

def test_cbs_less_sentences():
    """Fewer sentences than the other inputs must be rejected."""
    # Renamed: this test used to share its name with the one below and was
    # silently shadowed by it.
    with pytest.raises(AssertionError) as excinfo:
        CBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES[:1],
            target_words = TARGET_WORDS_CBS,
            fill_words = FILL_WORDS,
            mask_indices = MASK_INDICES
        )
    # ExceptionInfo does not support `in`; inspect the message of the raised exception.
    assert "Different number of sentences" in str(excinfo.value)

def test_cbs_less_target():
    """Fewer target-word tuples than sentences must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        CBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES,
            target_words = TARGET_WORDS_CBS[:1],
            fill_words = FILL_WORDS,
            mask_indices = MASK_INDICES
        )
    assert "Different number of sentences and target words" in str(excinfo.value)

def test_cbs_less_fill():
    """Fewer fill words than sentences must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        CBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES,
            target_words = TARGET_WORDS_CBS,
            fill_words = FILL_WORDS[:1],
            mask_indices = MASK_INDICES
        )
    assert "Different number of sentences and fill words" in str(excinfo.value)

def test_cbs_less_masks():
    """Fewer mask indices than sentences must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        CBS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = SENTENCES,
            target_words = TARGET_WORDS_CBS,
            fill_words = FILL_WORDS,
            mask_indices = MASK_INDICES[:1]
        )
    assert "Different number of sentences and mask indices" in str(excinfo.value)

In [25]:
# Three sentences that differ only in the leading pronoun; used by the CPS and AUL tests.
PLL_SENTENCES = [
    'he is an exemplary doctor',
    'she is an exemplary doctor',
    'it is an exemplary doctor',
]

# One target word per sentence of PLL_SENTENCES (passed to CPS).
TARGET_WORDS_CPS = [
    'doctor',
    'doctor',
    'doctor',
]

def test_cps_type():
    """CPS returns a list with one score per sentence."""
    # One target word per sentence: passing TARGET_WORDS_CPS[:1] is exactly the
    # call test_cps_less_target expects to raise.
    CPSscore = CPS(
        model = MODEL,
        tokenizer = TOKENIZER,
        sentences = PLL_SENTENCES,
        target_words = TARGET_WORDS_CPS
    )
    assert isinstance(CPSscore, list)
    assert len(CPSscore) == 3

def test_cps_value():
    """CPS reproduces the expected score of each sentence within 1e-6."""
    CPSscore = CPS(
        model = MODEL,
        tokenizer = TOKENIZER,
        sentences = PLL_SENTENCES,
        target_words = TARGET_WORDS_CPS
    )
    assert abs(CPSscore[0] - ( -5.3755054473)) < 1e-6
    assert abs(CPSscore[1] - (-15.9847412109)) < 1e-6
    assert abs(CPSscore[2] - (-16.9847259521)) < 1e-6

def test_cps_less_target():
    """Fewer target words than sentences must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        CPS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = PLL_SENTENCES,
            target_words = TARGET_WORDS_CPS[:1]
        )
    # ExceptionInfo does not support `in`; inspect the message of the raised exception.
    assert "Number of sentences and target words must be the same" in str(excinfo.value)

def test_cps_less_sentences():
    """Fewer sentences than target words must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        CPS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = PLL_SENTENCES[:1],
            target_words = TARGET_WORDS_CPS
        )
    assert "Number of sentences and target words must be the same" in str(excinfo.value)

def test_cps_empty_sentences():
    """An empty sentence list must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        CPS(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = [],
            target_words = []
        )
    assert "Empty sentence list" in str(excinfo.value)

In [None]:
def test_aul_type():
    """AUL returns a list with one score per sentence."""
    scores = AUL(model = MODEL, tokenizer = TOKENIZER, sentences = PLL_SENTENCES)
    assert isinstance(scores, list)
    assert len(scores) == 3

def test_aul_value():
    """AUL reproduces the expected score of each sentence within 1e-6."""
    scores = AUL(model = MODEL, tokenizer = TOKENIZER, sentences = PLL_SENTENCES)
    expected_scores = (-1.3468990325, -1.3449568748, -1.3521032333)
    for position, expected in enumerate(expected_scores):
        assert abs(scores[position] - expected) < 1e-6

def test_aul_empty_sentences():
    """An empty sentence list must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        AUL(
            model = MODEL,
            tokenizer = TOKENIZER,
            sentences = []
        )
    # ExceptionInfo does not support `in`; inspect the message of the raised exception.
    assert "Empty sentence list" in str(excinfo.value)

## Test embedding

In [136]:
import os
import sys
from math import sqrt

import torch
import pytest

ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if "__file__" in globals() else os.path.abspath("..")
sys.path.insert(0, ROOT_PATH)

from FairLangProc.metrics import WEAT

In [168]:
class AttrDict(dict):
    """Dictionary with attribute access and a no-op ``to`` method."""

    def __init__(self, **kwargs):
        dict.__init__(self, **kwargs)
        # The instance namespace *is* the dict, so d.key and d["key"] share storage.
        self.__dict__ = self

    def __repr__(self):
        class_name = type(self).__name__
        contents = dict.__repr__(self)
        return "{}({})".format(class_name, contents)

    def to(self, device):
        """Ignore ``device`` and return this same object."""
        return self
    

class DummyModel:
    """Deterministic stand-in for an embedding model used by the WEAT tests."""

    privileged_tokens = [200, 201, 202, 300, 301, 302, 400, 401, 402, 500, 501, 502]

    # (value of id % 100, 2-D vector returned for it), checked in this order.
    _DIRECTIONS = (
        (2, [1, 0]),
        (3, [-1, 0]),
        (4, [0, 1]),
        (5, [0, -1]),
    )

    def primitive_embedding(self, id):
        """Return the 2-D vector for a token id, chosen by ``id % 100``; zeros if unmatched."""
        remainder = id % 100
        for value, direction in self._DIRECTIONS:
            if remainder == value:
                return torch.Tensor(direction)
        return torch.Tensor([0, 0])

    def __call__(self, input_ids = None, output_hidden_states=True, **kwargs):
        """Return ``AttrDict(embedding=...)`` where each row sums the vectors of that row's ids."""
        n_rows, _ = input_ids.shape
        pooled = torch.zeros(n_rows, 2)
        for row, token_ids in enumerate(input_ids):
            for token_id in token_ids:
                pooled[row] += self.primitive_embedding(token_id)
        return AttrDict(embedding=pooled)

    def to(self, device):
        """Ignore ``device`` and return this same object."""
        return self

    def eval(self):
        """Do nothing; returns None."""
        return None
    

class DummyTokenizer:
    """Whitespace tokenizer with a tiny fixed vocabulary, used by the WEAT tests."""

    pad_token_type_id = 101
    pad_token = 101
    # __call__ pads with `pad_token_id`; it was missing, so padding sentences of
    # different lengths raised AttributeError.
    pad_token_id = 101
    cls_token_id = 102
    mask_token_id = 103
    # Token -> id table; any token not listed here maps to 100.
    hash_map_tokens = {
        '[PAD]': pad_token_type_id,
        '[CLS]': cls_token_id,
        '[MASK]': mask_token_id,
        'secretary': 200,
        'nurse': 201,
        'teacher':202,
        'engineer': 300,
        'firefighter': 301,
        'banker': 302,
        'he': 400,
        'actor': 401,
        'son': 402,
        'she': 500,
        'actress': 501,
        'daughter': 502
    }

    def __init__(self):
        return

    def _encode(self, sentences, padding):
        """Return one list of token ids per sentence, right-padded to equal length if requested."""
        ids = [
            [self.convert_tokens_to_ids(word) for word in sentence.split()]
            for sentence in sentences
        ]
        if padding and ids:
            max_len = max(len(row) for row in ids)
            ids = [row + [self.pad_token_id] * (max_len - len(row)) for row in ids]
        return ids

    def __call__(self, sentences, padding=True, return_tensors="pt"):
        """Encode ``sentences`` into ``AttrDict(input_ids=tensor)``.

        Sentences are split on whitespace; with ``padding`` the rows are
        right-padded with ``pad_token_id``. ``return_tensors`` is accepted but
        not used.
        """
        return AttrDict(input_ids = torch.tensor(self._encode(sentences, padding)))

    def tokenize(self, word):
        """Return ``word`` wrapped in a one-element list (no sub-word splitting)."""
        return [word]

    def convert_tokens_to_ids(self, token):
        """Look ``token`` up in the vocabulary, returning 100 when it is unknown."""
        return self.hash_map_tokens.get(token, 100)

    def to(self, device):
        """Ignore ``device`` and return this same object."""
        return self

class DummyWEAT(WEAT):
    """WEAT subclass that reads embeddings from the dummy model's output."""

    def _get_embedding(self, outputs):
        """Return the first row of ``outputs.embedding``."""
        first_row = outputs.embedding[0]
        return first_row

In [169]:
# Dummy model/tokenizer and the WEAT instance exercised by the embedding tests.
MODEL = DummyModel()
TOKENIZER = DummyTokenizer()
TEST_WEAT = DummyWEAT(model = MODEL, tokenizer = TOKENIZER)

# X: 18 two-dimensional rows — six of (1, 0), six of (1, 1), six of (0, 1).
X = torch.tensor([[1]*12+[0]*6, [0]*6+[1]*12], dtype = float).transpose(0, 1)
# Y: 6 two-dimensional rows — (1, 0), (-1, 0), (1, 1), (-1, 1), (0, 1), (0, -1).
Y = torch.tensor([[1,-1]*2+[0]*2, [0]*2+[1]*3 + [-1]], dtype = float).transpose(0, 1)

# Expected 18 x 6 table: entry (i, j) is the cosine between row i of X and row j of Y.
COSXY = [[1.0, -1.0, 1/sqrt(2), -1/sqrt(2), 0.0, 0.0]]*6 \
    + [[1/sqrt(2), -1/sqrt(2), 1.0, 0.0, 1/sqrt(2), -1/sqrt(2)]]*6 \
    + [[0.0, 0.0, 1/sqrt(2), 1/sqrt(2), 1.0, -1.0]]*6
COSXY = torch.tensor(COSXY)

# 6 x 2 inputs passed, in this order, to TEST_WEAT.effect_size.
XEFFECT =  torch.tensor([[1]*4+[0]*2, [0]*2+[1]*4], dtype = float).transpose(0, 1)
YEFFECT = -torch.tensor([[1]*4+[0]*2, [0]*2+[1]*4], dtype = float).transpose(0, 1)
AEFFECT =  torch.tensor([[-1]*4+[0]*2, [0]*2+[1]*4], dtype = float).transpose(0, 1)
BEFFECT = -torch.tensor([[1]*4+[0]*2, [0]*2+[1]*4], dtype = float).transpose(0, 1)

# Word lists passed, in this order, to TEST_WEAT.metric.
WORDSX = ['he', 'actor', 'son']
WORDSY = ['she', 'actress', 'daughter']
WORDSA = ['banker', 'engineer', 'firefighter']
WORDSB = ['secretary', 'nurse', 'teacher']

In [None]:
def test_type_cosine_similarity():
    """cosine_similarity of the shared X (18 x 2) and Y (6 x 2) fixtures is an 18 x 6 tensor."""
    # Uses the module-level X and Y that COSXY was derived from; the local
    # copies that used to shadow them had 18 rows in Y and a different last row.
    output = TEST_WEAT.cosine_similarity(X, Y)
    assert isinstance(output, torch.Tensor)
    assert output.shape[0] == 18
    assert output.shape[1] == 6

def test_value_cosine_similarity():
    """Every entry of cosine_similarity(X, Y) matches COSXY within 1e-7."""
    output = TEST_WEAT.cosine_similarity(X, Y)
    # Guard the loops below against indexing COSXY out of range.
    assert output.shape == COSXY.shape
    for i in range(output.shape[0]):
        for j in range(output.shape[1]):
            assert abs(output[i,j] - COSXY[i,j]) < 1e-7

def test_type_effect_size():
    """effect_size returns a Python float."""
    effect = TEST_WEAT.effect_size(XEFFECT, YEFFECT, AEFFECT, BEFFECT)
    assert isinstance(effect, float)

def test_value_effect_size():
    """effect_size reproduces the expected value within 1e-7."""
    effect = TEST_WEAT.effect_size(XEFFECT, YEFFECT, AEFFECT, BEFFECT)
    deviation = abs(effect - 1.699794717779)
    assert deviation < 1e-7

def test_type_metric():
    """metric returns a dictionary."""
    report = TEST_WEAT.metric(WORDSX, WORDSY, WORDSA, WORDSB)
    assert isinstance(report, dict)

## Test generated text

In [None]:
import os
import sys
import pytest

ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if "__file__" in globals() else os.path.abspath("..")
sys.path.insert(0, ROOT_PATH)

from FairLangProc.metrics import DemRep, StereoAsoc, HONEST

In [11]:
# Words associated with each demographic group (passed as `demWords`).
GENDERED_WORDS = {
    'male': ['he', 'him', 'his'],
    'female': ['she', 'her', 'actress', 'hers']
    }

ATTRIBUTES = GENDERED_WORDS.keys()

# Four separate sentences. A missing comma used to fuse the last two into one
# string, so 'mother' and 'baking' ended up in the same sentence.
SENTENCES = [
    'She is such a good match to him.',
    'He is trying way too hard to be an actor.',
    'Her mother is trying to make ends meet.',
    'My aunt is baking, do you want to try?'
]

def test_demographic_representation_type():
    """DemRep returns a dictionary."""
    representation = DemRep(sentences = SENTENCES, demWords = GENDERED_WORDS)
    assert isinstance(representation, dict)

def test_demographic_representation_keys():
    """DemRep returns one entry per demographic group."""
    representation = DemRep(sentences = SENTENCES, demWords = GENDERED_WORDS)
    assert len(representation.keys()) == 2

def test_demographic_representation_values():
    """DemRep returns the expected count for each group."""
    representation = DemRep(sentences = SENTENCES, demWords = GENDERED_WORDS)
    for group, expected in (('male', 1), ('female', 2)):
        assert representation[group] == expected

def test_demographic_representation_empty_demwords():
    """With no demographic words the result is an empty dict."""
    representation = DemRep(sentences = SENTENCES, demWords = {})
    assert representation == {}

def test_demographic_representation_empty_sentences():
    """With no sentences every group is present with a count of zero."""
    representation = DemRep(sentences = [], demWords = GENDERED_WORDS)
    assert len(representation.keys()) == 2
    for group in ('male', 'female'):
        assert representation[group] == 0

def test_demographic_representation_empty_demwords_sentences():
    """With neither sentences nor demographic words the result is an empty dict."""
    representation = DemRep(sentences = [], demWords = {})
    assert representation == {}

In [12]:
# Words passed as `targetWords` to StereoAsoc in the tests that follow.
TARGET_WORDS = ['mother', 'baking']

In [13]:
def test_stereorep_type():
    """StereoAsoc returns a dict of dicts."""
    association = StereoAsoc(sentences = SENTENCES, demWords = GENDERED_WORDS, targetWords = TARGET_WORDS)
    assert isinstance(association, dict)
    for per_group in association.values():
        assert isinstance(per_group, dict)

def test_stereorep_keys():
    """There is one outer entry per target word and one inner entry per group."""
    association = StereoAsoc(sentences = SENTENCES, demWords = GENDERED_WORDS, targetWords = TARGET_WORDS)
    assert len(association.keys()) == 2
    for per_group in association.values():
        assert len(per_group.keys()) == 2

def test_stereorep_values():
    """StereoAsoc returns the expected count for every (target word, group) pair."""
    association = StereoAsoc(sentences = SENTENCES, demWords = GENDERED_WORDS, targetWords = TARGET_WORDS)
    expected_counts = (
        ('mother', 'male', 0),
        ('mother', 'female', 1),
        ('baking', 'male', 0),
        ('baking', 'female', 0),
    )
    for word, group, expected in expected_counts:
        assert association[word][group] == expected

def test_stereorep_empty_target():
    """No target words gives an empty dict."""
    association = StereoAsoc(sentences = SENTENCES, demWords = GENDERED_WORDS, targetWords = [])
    assert association == {}

def test_stereorep_empty_dem():
    """No demographic words gives an empty inner dict per target word."""
    association = StereoAsoc(sentences = SENTENCES, demWords = {}, targetWords = TARGET_WORDS)
    for word in ('mother', 'baking'):
        assert association[word] == {}

def test_stereorep_empty_sentences():
    """No sentences gives a zero count for every (target word, group) pair."""
    association = StereoAsoc(sentences = [], demWords = GENDERED_WORDS, targetWords = TARGET_WORDS)
    for word in ('mother', 'baking'):
        for group in ('male', 'female'):
            assert association[word][group] == 0

def test_stereorep_empty_dem_sentences():
    """Neither sentences nor demographic words gives an empty inner dict per target word."""
    association = StereoAsoc(sentences = [], demWords = {}, targetWords = TARGET_WORDS)
    for word in ('mother', 'baking'):
        assert association[word] == {}

def test_stereorep_empty_dem_target():
    """Neither demographic nor target words gives an empty dict."""
    association = StereoAsoc(sentences = SENTENCES, demWords = {}, targetWords = [])
    assert association == {}

def test_stereorep_empty_sentences_target():
    """Neither sentences nor target words gives an empty dict."""
    association = StereoAsoc(sentences = [], demWords = GENDERED_WORDS, targetWords = [])
    assert association == {}

def test_stereorep_empty_dem_sentences_target():
    """All inputs empty gives an empty dict."""
    association = StereoAsoc(sentences = [], demWords = {}, targetWords = [])
    assert association == {}

In [14]:
# Well-formed input: three lists of three completions each.
COMPLETIONS = [
    ['he is so stupid', 'he is such a fool', 'he is so misunderstood'],
    ['she is so bossy', 'she is an incompetent manager', 'she does what is necessary'],
    ['they were so agreeable', 'they were so nice to us', 'they showed hospitality']
]

# Malformed input: the second list holds two completions instead of three.
COMPLETIONS_DIFFERENT_LENGTH = [
    ['he is so stupid', 'he is such a fool', 'he is so misunderstood'],
    ['she is so bossy', 'she is an incompetent manager'],
    ['they were so agreeable', 'they were so nice to us', 'they showed hospitality']
]

# Malformed input: the second element is a tuple rather than a list.
COMPLETIONS_WITHOUT_LIST = [
    ['he is so stupid', 'he is such a fool', 'he is so misunderstood'],
    ('she is so bossy', 'she is an incompetent manager', 'she does what is necessary'),
    ['they were so agreeable', 'they were so nice to us', 'they showed hospitality']
]

# Lexicon passed as `hurtLex` to HONEST.
HURTLEX = ['fool', 'stupid', 'incompetent']

def test_honest_type():
    """HONEST returns a Python float."""
    score = HONEST(completions = COMPLETIONS, hurtLex = HURTLEX)
    assert isinstance(score, float)

def test_honest_value():
    """HONEST reproduces the expected score of 1/3 within 1e-15."""
    score = HONEST(completions = COMPLETIONS, hurtLex = HURTLEX)
    deviation = abs(score - 1/3)
    assert deviation < 1e-15

def test_honest_empty_hurt():
    """With an empty lexicon the score is zero within 1e-15."""
    score = HONEST(completions = COMPLETIONS, hurtLex = [])
    deviation = abs(score - 0.0)
    assert deviation < 1e-15

def test_honest_empty_completions():
    """An empty completions list must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        HONEST(completions = [], hurtLex = HURTLEX)
    # ExceptionInfo does not support `in`; inspect the message of the raised exception.
    assert "completions is empty" in str(excinfo.value)

def test_honest_not_list():
    """Completions that are not a list must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        HONEST(completions = {}, hurtLex = HURTLEX)
    assert "completions is not a list" in str(excinfo.value)

def test_element_not_list():
    """Completions containing a non-list element must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        HONEST(completions = COMPLETIONS_WITHOUT_LIST, hurtLex = HURTLEX)
    assert "completions is not a list of lists" in str(excinfo.value)

def test_honest_different_length():
    """Completion lists of unequal length must be rejected."""
    with pytest.raises(AssertionError) as excinfo:
        HONEST(completions = COMPLETIONS_DIFFERENT_LENGTH, hurtLex = HURTLEX)
    assert "Number of completions is not uniform" in str(excinfo.value)

# Test models

In [None]:
import os
import sys

import pytest
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments

ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if "__file__" in globals() else os.path.abspath("..")
sys.path.insert(0, ROOT_PATH)

from FairLangProc.algorithms.preprocessors import CDA, BLINDTrainer, SentDebiasForSequenceClassification
from FairLangProc.algorithms.inprocessors import EARModel, DebiasAdapter, selective_unfreezing 
from FairLangProc.algorithms.intraprocessors import add_EAT_hook, DiffPrunBERT

In [None]:
SENTENCES = [
    'he is a good father',
    'the actor gave a staggering performance',
    'she tries very hard'
]
LABELS = [0, 1, 0]
BATCH = {'sentence': SENTENCES, 'label': LABELS}
PAIRS = {'he': 'she', 'actor': 'actress', 'father': 'mother'}

def test_cda_bidirectional():
    result = CDA(batch = BATCH, pairs = PAIRS, bidirectional = True)
    assert isinstance(result, dict), f"Wrong type: expected {dict}, got {type(result)}"
    assert len(result['sentence']) == 5, f"Expected 5 sentences, got {len(result['sentence'])}"
    assert len(result['label']) == 5, f"Expected 5 labels, got {len(result['label'])}"

def test_cda_no_bidirectional():
    result = CDA(batch = BATCH, pairs = PAIRS, bidirectional = False)
    assert isinstance(result, dict), f"Wrong type: expected {dict}, got {type(result)}"
    assert len(result['sentence']) == 3, f"Expected 3 sentences, got {len(result['sentence'])}"
    assert len(result['label']) == 3, f"Expected 3 labels, got {len(result['label'])}"

In [None]:
class BLINDBERTTrainer(BLINDTrainer):
    """BLINDTrainer whose embedding comes from the wrapped model's ``bert`` encoder."""

    def _get_embedding(self, inputs):
        """Return the last hidden state at sequence position 0 for ``inputs``."""
        encoder_kwargs = {
            field: inputs.get(field)
            for field in ("input_ids", "attention_mask", "token_type_ids")
        }
        hidden_states = self.model.bert(**encoder_kwargs).last_hidden_state
        return hidden_states[:, 0, :]

def test_blind_model_output_shape():
    """The blind classifier maps the trainer's embedding of one sample to logits of shape (1, 2)."""
    # Imported here because this section's import cell does not bring torch into scope.
    import torch

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)

    sample = tokenizer("A test sentence.", return_tensors="pt")
    sample["labels"] = torch.tensor([1])

    # Dummy blind classifier
    blind_classifier = torch.nn.Sequential(
        torch.nn.Linear(768, 768),
        torch.nn.ReLU(),
        torch.nn.Linear(768, 2)
    )

    # Dummy training args
    training_args = TrainingArguments(
        output_dir="/tmp/test_blind",
        per_device_train_batch_size=1,
        per_device_eval_batch_size=1,
        num_train_epochs=1,
        logging_steps=1,
        no_cuda=True
    )

    # Instantiate trainer
    trainer = BLINDBERTTrainer(
        model=model,
        blind_model=blind_classifier,
        args=training_args,
        train_dataset=[sample],
        eval_dataset=[sample],
    )

    # Forward pass
    embedding = trainer._get_embedding(sample)
    logits_blind = trainer.blind_model(embedding)

    assert logits_blind.shape == (1, 2), f"Expected blind logits of shape (1, 2), got {logits_blind.shape}"

In [None]:
# Name fragments of the parameters that selective_unfreezing should leave trainable.
LAYERS = ["attention.self", "attention.output"]

def test_unfreezing():
    """Only parameters whose name contains one of LAYERS remain trainable."""
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS)
    selective_unfreezing(model, LAYERS)
    for param_name, parameter in model.named_parameters():
        should_train = any(fragment in param_name for fragment in LAYERS)
        if should_train:
            assert parameter.requires_grad, f"Expected param '{param_name}' to be trainable"
        else:
            assert not parameter.requires_grad, f"Expected param '{param_name}' to be frozen"

In [None]:
def test_hook():
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS)
    pre_hook_counts = [
        len(layer.attention.self._forward_hooks)
        for layer in model.base_model.encoder.layer
    ]

    add_EAT_hook(model, beta=1.2)
    post_hook_counts = [
        len(layer.attention.self._forward_hooks)
        for layer in model.base_model.encoder.layer
    ]

    for pre, post in zip(pre_hook_counts, post_hook_counts):
        assert post == pre + 1, f"Expected 1 new hook, but got {post - pre}"

In [None]:
# Checkpoint name handed to `from_pretrained` by the tests in this file.
MODEL_NAME = "bert-base-uncased"
# Value passed as `num_labels`; also the expected size of the logits' last axis.
NUM_LABELS = 2

# Debiasing method keys dispatched on by get_model and used to parametrize
# test_sequence_classification_output_shape.
TEST_CASES_OUTPUT = ["emb", "ear", "selective", "adele", "diff", "eat"]

# Word pairs: get_model reads the first element of each pair as the "male"
# token and the second as the "female" token.
PAIRS = [
    ('actor', 'actress'),
    ('son', 'daughter'),
    ('father', 'mother'),
    ('he', 'she'),
]


class SentDebiasBert(SentDebiasForSequenceClassification):
    """SentDebiasForSequenceClassification subclass that embeds with ``self.model.bert``."""

    def _get_embedding(self, input_ids, attention_mask = None, token_type_ids = None):
        """Return ``last_hidden_state[:, 0, :]`` of the wrapped BERT encoder.

        Index 0 along the second axis is presumably the [CLS] position - confirm
        against the tokenizer in use.
        """
        encoder_output = self.model.bert(
            input_ids,
            attention_mask = attention_mask,
            token_type_ids = token_type_ids,
        )
        return encoder_output.last_hidden_state[:, 0, :]

def get_model(base_model, tokenizer, debias):
    """Build the model under test for the debiasing method named by ``debias``.

    Args:
        base_model: sequence-classification model to wrap or modify. The
            "selective" and "eat" branches return this same object after
            passing it to ``selective_unfreezing`` / ``add_EAT_hook``.
        tokenizer: tokenizer used by the "emb" and "diff" branches.
        debias: one of "emb", "ear", "selective", "adele", "diff", "eat".

    Returns:
        The wrapped or modified model for the requested method.

    Raises:
        ValueError: if ``debias`` is not one of the keys listed above.
    """
    if debias == "emb":
        model = SentDebiasBert(
            model = base_model,
            config = None,
            tokenizer = tokenizer,
            word_pairs = PAIRS,
            n_components = 1,
            n_labels = NUM_LABELS
        )

    elif debias == "ear":
        model = EARModel(
            model = base_model,
            ear_reg_strength = 0.01
        )

    elif debias == "selective":
        model = base_model
        selective_unfreezing(model, LAYERS)

    elif debias == "adele":
        # Use a distinct local name: assigning to `DebiasAdapter` inside this
        # function would make that name local everywhere in it, so calling the
        # class would raise UnboundLocalError.
        adapter = DebiasAdapter(model = base_model)
        model = adapter.get_model()

    elif debias == "diff":
        # Split each pair into its first ("male") and second ("female") word.
        tokens_male = [words[0] for words in PAIRS]
        tokens_female = [words[1] for words in PAIRS]
        inputs_male = tokenizer(tokens_male, padding = True, return_tensors = "pt")
        inputs_female = tokenizer(tokens_female, padding = True, return_tensors = "pt")
        model = DiffPrunBERT(
            head = base_model.classifier,
            encoder = base_model.bert,
            loss_fn = torch.nn.CrossEntropyLoss(),
            input_ids_A = inputs_male,
            input_ids_B = inputs_female,
            bias_kernel = None,
            upper = 10,
            lower = -0.001,
            lambda_bias = 0.5,
            lambda_sparse = 0.00001
        )

    elif debias == "eat":
        model = base_model
        add_EAT_hook(model, beta=0.7)

    else:
        # Fail with a clear message instead of an unbound `model` at return.
        raise ValueError(f"Unknown debiasing method: {debias!r}")

    return model


@pytest.mark.parametrize("debias", TEST_CASES_OUTPUT)
def test_sequence_classification_output_shape(debias):
    """Check that each debiased model returns logits of shape (1, NUM_LABELS).

    The model is built by get_model from a freshly loaded base model and run
    on a single tokenized sentence with gradients disabled.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = get_model(
        AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=NUM_LABELS),
        tokenizer,
        debias,
    )

    inputs = tokenizer("This is a test sentence.", return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)

    expected_shape = (1, NUM_LABELS)
    assert outputs.logits.shape == expected_shape, \
        f"Expected logits shape (1, {NUM_LABELS}), but got {outputs.logits.shape}"