### DistilBERT GN-GloVe Cosine Similarity

In [1]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn.functional as F
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification
from torch.utils.data import Dataset, DataLoader
import time
import string
import codecs
from tqdm import tqdm


In [2]:
# Disable parallelism in the tokenizers library through its environment variable
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Set a random seed and device
torch.backends.cudnn.deterministic = True
RANDOM_SEED = 367
torch.manual_seed(RANDOM_SEED)

# GPU index 3 is the preferred device, but it is only selected when that index
# actually exists. Otherwise fall back to the first GPU, then to the CPU, so the
# notebook does not fail later when the model is moved to a missing device.
PREFERRED_GPU_INDEX = 3
if torch.cuda.is_available():
    _gpu_index = PREFERRED_GPU_INDEX if torch.cuda.device_count() > PREFERRED_GPU_INDEX else 0
    DEVICE = torch.device(f'cuda:{_gpu_index}')
else:
    DEVICE = torch.device('cpu')

# Define constants
# NOTE(review): the paths below are absolute and machine-specific; consider a
# configurable data directory.
NUM_EPOCHS = 5
GN_GLOVE_PATH = '/Users/jaehyunkim/Desktop/gender_bias_lipstick-master/data/embeddings/gn_glove'
# NOTE(review): MAX_SEQ_LENGTH is not referenced by any of the visible cells.
MAX_SEQ_LENGTH = 128  # Adjust as needed

# Dictionaries keyed by embedding-space name; filled in by load_wo_normalize / limit_vocab
vocab = {}
wv = {}
w2i = {}


def _read_word_list(path):
    """Return every line of ``path`` with surrounding whitespace stripped, in file order."""
    with open(path, 'r', encoding='utf-8') as word_file:
        return [line.strip() for line in word_file]


# Load gender-specific words from two files: male-specific and female-specific
male_specific = _read_word_list('/Users/jaehyunkim/Desktop/gender_bias_lipstick-master/data/lists/male_word_file.txt')
female_specific = _read_word_list('/Users/jaehyunkim/Desktop/gender_bias_lipstick-master/data/lists/female_word_file.txt')

# Combine male-specific and female-specific terms into one list (male terms first)
gender_specific = male_specific + female_specific

# Load GN-GloVe embeddings
def load_gn_glove_embeddings(filename):
    """Load a word list and its embedding array from a shared path prefix.

    Args:
        filename: Path prefix. ``filename + '.vocab'`` is read as UTF-8 text with
            one entry per line, and ``filename + '.wv.npy'`` is read with ``np.load``.

    Returns:
        A tuple ``(words, vectors, index_of)``: the stripped lines of the vocab
        file, the loaded array, and a dict mapping each word to its line index
        (for a repeated word the last occurrence wins).
    """
    vocab_path = filename + '.vocab'
    vectors_path = filename + '.wv.npy'

    with open(vocab_path, 'r', encoding='utf-8') as vocab_file:
        words = list(map(str.strip, vocab_file))

    index_of = dict(zip(words, range(len(words))))
    vectors = np.load(vectors_path)

    return words, vectors, index_of

# Load the GN-GloVe word list, vectors and word->row index from GN_GLOVE_PATH.
# NOTE(review): these three names are not referenced again in the visible cells, and
# the same files are read again by load_wo_normalize('aft', ...) further down --
# confirm whether this load is still needed.
vocab_gn_glove, wv_gn_glove, w2i_gn_glove = load_gn_glove_embeddings(GN_GLOVE_PATH)


In [16]:
# Specify the path to your local CSV file and the encoding
data_file_path = "/Users/jaehyunkim/Downloads/BUG-main/data/full_BUG.csv"
encoding = 'utf-8'  # Change to the appropriate encoding if necessary
df = pd.read_csv(data_file_path, encoding=encoding)

# Remap the "stereotype" column to class indices 0, 1, 2 (the raw value -1 becomes
# class 2) so the labels can be used directly as cross-entropy targets.
df['stereotype'] = df['stereotype'].map({0: 0, 1: 1, -1: 2})

# Series.map turns every value outside {0, 1, -1} into NaN, which would only fail
# much later inside the loss computation; fail here with a clear message instead.
if df['stereotype'].isna().any():
    raise ValueError("'stereotype' column contains values other than 0, 1 and -1")
df['stereotype'] = df['stereotype'].astype('int64')

# Shuffle once with a fixed seed, then cut into 70% train / 10% validation / 20% test.
# Plain positional slicing is used instead of np.split, which goes through the
# deprecated DataFrame.swapaxes when given a DataFrame.
shuffled_df = df.sample(frac=1, random_state=RANDOM_SEED)
train_end = int(.7 * len(df))
valid_end = int(.8 * len(df))
train_df = shuffled_df.iloc[:train_end]
valid_df = shuffled_df.iloc[train_end:valid_end]
test_df = shuffled_df.iloc[valid_end:]

# Extract text data and labels for train, validation, and test sets
train_texts = train_df['sentence_text'].values
valid_texts = valid_df['sentence_text'].values
test_texts = test_df['sentence_text'].values

# Extract labels for train, validation, and test sets
train_labels = train_df['stereotype'].values
valid_labels = valid_df['stereotype'].values
test_labels = test_df['stereotype'].values

# Initialize the DistilBERT tokenizer
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

# Tokenize the text data for train, valid, and test sets.
# No max_length is passed, so MAX_SEQ_LENGTH has no effect on these calls.
train_encodings = tokenizer(list(train_texts), truncation=True, padding=True)
valid_encodings = tokenizer(list(valid_texts), truncation=True, padding=True)
test_encodings = tokenizer(list(test_texts), truncation=True, padding=True)

In [17]:
# Create datasets and data loaders
class GenderDataset(Dataset):
    """Dataset that pairs tokenizer encodings with one label per example.

    ``encodings`` must provide ``.items()`` yielding ``(field_name, values)``
    pairs where ``values[idx]`` is the data of example ``idx``; ``labels`` must
    support ``len()`` and indexing.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of examples is defined by the labels, not the encodings.
        return len(self.labels)

    def __getitem__(self, idx):
        # One tensor per encoding field, plus the label under the key 'labels'.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Wrap each split's encodings and labels in a GenderDataset
train_dataset, valid_dataset, test_dataset = (
    GenderDataset(split_encodings, split_labels)
    for split_encodings, split_labels in (
        (train_encodings, train_labels),
        (valid_encodings, valid_labels),
        (test_encodings, test_labels),
    )
)

# Batches of 16 examples; only the training loader shuffles
train_loader, valid_loader, test_loader = (
    DataLoader(split_dataset, batch_size=16, shuffle=reshuffle)
    for split_dataset, reshuffle in (
        (train_dataset, True),
        (valid_dataset, False),
        (test_dataset, False),
    )
)

# Initialize DistilBERT with a 3-way classification head, move it to DEVICE
# and put it in training mode
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=3)
model.to(DEVICE)
model.train()

# Optimizer: Adam over all model parameters
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertForSequenceClassification: ['vocab_transform.bias', 'vocab_layer_norm.weight', 'vocab_projector.weight', 'vocab_projector.bias', 'vocab_layer_norm.bias', 'vocab_transform.weight']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['pre_classifier.weight', 'pre_classifier.bias', 'classi

In [18]:
# Define the modified cosine similarity function using PyTorch
def cosine_similarity(embeddings):
    """Return the pairwise cosine similarities between the rows of ``embeddings``.

    Each vector along the last dimension is first scaled to unit L2 norm; the
    result is the product of that normalised tensor with itself transposed over
    dimensions 0 and 1.
    """
    unit_rows = F.normalize(embeddings, dim=-1)
    return unit_rows @ unit_rows.transpose(0, 1)

# Define the modified compute_cosine_similarity_and_bias function
def compute_cosine_similarity_and_bias(embeddings, gender_specific):
    """Count 'he'/'him' and 'she'/'her' neighbours for each word in ``gender_specific``.

    For every word, the row of ``embeddings`` selected by the word's id in the
    module-level ``tokenizer`` is compared (cosine similarity) with every row.
    The single most similar row is dropped (it is assumed to be the word itself)
    and all remaining row indices are decoded with ``tokenizer.decode``. A decoded
    string counts as masculine when it is 'he' or 'him' and as feminine when it is
    'she' or 'her', but only if that string is also an element of ``gender_specific``.

    This version is intended to return the same counts as the previous one while
    avoiding its cost: it no longer builds the full rows-by-rows similarity
    matrix, no longer sorts tensor elements in Python, and decodes every row
    index once instead of once per word. The previous unreachable "trim" branch
    has been removed.

    Args:
        embeddings: 2-D array-like of embedding rows (converted to a float32 tensor).
        gender_specific: sequence of words to score.

    Returns:
        A list of ``(lowercased_word, masculine_count, feminine_count)`` tuples in
        the order of ``gender_specific``.

    Raises:
        IndexError: if a tokenizer id is not a valid row index of ``embeddings``.

    NOTE(review): rows are addressed with ids from the DistilBERT ``tokenizer`` and
    neighbour names come from ``tokenizer.decode``, yet the only call in this
    notebook passes GN-GloVe rows ordered by ``limit_vocab``. Unless both share
    the same index space, the selected rows and decoded names do not describe the
    same words -- confirm the intended vocabulary. Because every row except the top
    one is counted, the counts are also nearly constant across words.
    """
    # Legacy behaviour kept: if there are fewer rows than words, append all-zero
    # rows until there are len(gender_specific) rows.
    missing_rows = len(gender_specific) - len(embeddings)
    if missing_rows > 0:
        embeddings = np.pad(embeddings, ((0, missing_rows), (0, 0)), 'constant')

    # Convert embeddings to a PyTorch tensor
    embeddings = torch.tensor(embeddings, dtype=torch.float32)

    if len(gender_specific) == 0:
        return []

    # Unit-length rows: a dot product between two of them is their cosine similarity.
    unit_rows = torch.nn.functional.normalize(embeddings, dim=-1)
    row_count = unit_rows.shape[0]

    # A neighbour only counts when it is one of the four pronouns AND is itself
    # listed in gender_specific.
    gender_terms = set(gender_specific)
    masculine_terms = {'he', 'him'} & gender_terms
    feminine_terms = {'she', 'her'} & gender_terms

    # The label of a row depends only on its index, so decode each index once.
    masculine_row = [False] * row_count
    feminine_row = [False] * row_count
    for row in range(row_count):
        neighbor_word = tokenizer.decode([row])
        if neighbor_word in masculine_terms:
            masculine_row[row] = True
        elif neighbor_word in feminine_terms:
            feminine_row[row] = True
    masculine_total = sum(masculine_row)
    feminine_total = sum(feminine_row)

    gender_bias_results = []
    for word in gender_specific:
        # Convert word to lowercase for consistency with DistilBERT tokens
        word = word.lower()

        # Row index = the word's id in the tokenizer vocabulary
        token_id = tokenizer.convert_tokens_to_ids(word)

        # Similarity of this row to every row (one row of the full matrix).
        similarities = unit_rows @ unit_rows[token_id]

        # The excluded "self" entry is the most similar row; on ties argmax returns
        # the first maximal index, matching a stable descending sort.
        top_row = int(torch.argmax(similarities))

        m = masculine_total - int(masculine_row[top_row])
        f = feminine_total - int(feminine_row[top_row])
        gender_bias_results.append((word, m, f))

    return gender_bias_results

In [19]:
# Modify the train function to use cross-entropy loss for multi-class classification
def train_multiclass(model, data_loader, optimizer, device):
    """Run one optimisation pass over ``data_loader`` and return the mean batch loss.

    The model is switched to training mode. For every batch the fields
    'input_ids', 'attention_mask' and 'labels' are moved to ``device``, the
    cross-entropy between ``model(...).logits`` and the labels is
    back-propagated, and the optimizer takes one step.

    Returns:
        The sum of the per-batch loss values divided by ``len(data_loader)``.

    Raises:
        ZeroDivisionError: if ``data_loader`` has length 0.
    """
    model.train()
    batch_losses = []

    for batch in data_loader:
        token_ids, mask, targets = (
            batch[field].to(device)
            for field in ('input_ids', 'attention_mask', 'labels')
        )

        optimizer.zero_grad()
        logits = model(token_ids, attention_mask=mask).logits

        # Multi-class objective: cross-entropy over the class logits
        batch_loss = F.cross_entropy(logits, targets)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    return sum(batch_losses, 0.0) / len(data_loader)


# Record a wall-clock start time.
# NOTE(review): no training loop follows in this cell, and start_time is assigned
# again right before the epoch loop further down, so this value appears to be
# unused -- confirm and remove.
start_time = time.time()

In [20]:
def compute_multiclass_accuracy(model, data_loader, device):
    """Return the percentage of examples in ``data_loader`` that ``model`` classifies correctly.

    The model is switched to evaluation mode and run without gradient tracking.
    The predicted class of an example is the argmax of its logits along dim 1.

    Returns:
        ``correct / total * 100.0`` as a float.

    Raises:
        ZeroDivisionError: if ``data_loader`` yields no examples.
    """
    model.eval()
    hits = 0
    seen = 0

    with torch.no_grad():
        for batch in data_loader:
            token_ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            predictions = model(token_ids, attention_mask=mask).logits.argmax(dim=1)

            hits += predictions.eq(targets).sum().item()
            seen += targets.shape[0]

    return hits / seen * 100.0

# Function to load embeddings with restrictions
def load_wo_normalize(space, filename, vocab, wv, w2i, exclude_words=None):
    """Load an embedding space from disk into the ``vocab`` / ``wv`` / ``w2i`` dicts.

    Reads ``filename + '.vocab'`` (UTF-8, one word per line) and
    ``filename + '.wv.npy'`` and stores the word list, the vectors and the
    word->row mapping under the key ``space``. The vectors are stored as loaded.

    Args:
        space: key under which the results are stored.
        filename: path prefix of the ``.vocab`` / ``.wv.npy`` pair.
        vocab, wv, w2i: dicts that are updated in place.
        exclude_words: optional collection; when truthy, every word whose
            lowercase form is in it is dropped and the remaining rows are
            re-indexed from 0.

    Returns:
        None. Progress messages are printed before and after loading.
    """
    print ('loading ...')
    with codecs.open(filename + '.vocab', 'r', 'utf-8') as vocab_file:
        all_words = [raw_line.strip() for raw_line in vocab_file]

    row_of = dict(zip(all_words, range(len(all_words))))
    all_vectors = np.load(filename + '.wv.npy')

    if not exclude_words:
        vocab[space] = all_words
        wv[space] = all_vectors
        w2i[space] = row_of
    else:
        # Keep (word, original row) pairs whose lowercase form is not excluded
        kept = [
            (word, row)
            for word, row in row_of.items()
            if word.lower() not in exclude_words
        ]
        vocab[space] = [word for word, _ in kept]
        wv[space] = np.array([all_vectors[row] for _, row in kept])
        w2i[space] = {word: position for position, (word, _) in enumerate(kept)}

    print ('done')
    
# Define a function to check if a string contains a digit
def has_digit(s):
    """Return True if at least one character of ``s`` satisfies ``str.isdigit``."""
    for char in s:
        if char.isdigit():
            return True
    return False

def has_punct(s):
    """Return True if at least one character of ``s`` is in ``string.punctuation``."""
    for char in s:
        if char in string.punctuation:
            return True
    return False

# Function to limit vocabulary with restrictions
def limit_vocab(space, exclude=None, vec_len=300, exclude_words=None):
    """Build a filtered sub-vocabulary of ``vocab[space]`` with its vectors.

    Only the first 50000 entries of ``vocab[space]`` are considered. A word is
    dropped when it contains an uppercase character, is 20 or more characters
    long, or contains a digit. A word containing '_' is kept only if none of its
    '_'-separated parts contains punctuation; any other word containing
    punctuation is dropped. Words in ``exclude_words`` are skipped, and finally
    every word in ``exclude`` is removed.

    The result keeps the original vocabulary order. The earlier implementation
    removed ``exclude`` through a set difference, which made the row order depend
    on string hashing and therefore differ between interpreter runs.

    Args:
        space: key into the module-level ``vocab`` / ``wv`` / ``w2i`` dicts.
        exclude: optional iterable of words removed after filtering (duplicates
            in the filtered list are also collapsed when this is given).
        vec_len: width of the returned vector array.
        exclude_words: optional collection of words skipped during filtering.

    Returns:
        ``(vocab_limited, wv_limited, w2i_limited)``: the kept words, an array of
        shape ``(len(vocab_limited), vec_len)`` with their vectors, and a dict
        mapping each kept word to its row.

    NOTE(review): ``exclude_words`` is not applied to words containing '_',
    because that branch finishes before the check is reached -- confirm whether
    this is intended.
    """
    vocab_limited = []

    for w in tqdm(vocab[space][:50000]):
        if w.lower() != w:
            continue
        if len(w) >= 20:
            continue
        if has_digit(w):
            continue
        if '_' in w:
            # Multi-word entry: keep it only if every part is free of punctuation
            if not any(has_punct(subw) for subw in w.split('_')):
                vocab_limited.append(w)
            continue
        if has_punct(w):
            continue

        # Apply restrictions on excluded words
        if exclude_words and w in exclude_words:
            continue

        vocab_limited.append(w)

    if exclude:
        excluded = set(exclude)
        # dict.fromkeys de-duplicates like the former set() did, but keeps the
        # first-seen order so that row indices are reproducible.
        vocab_limited = [w for w in dict.fromkeys(vocab_limited) if w not in excluded]

    print("size of vocabulary:", len(vocab_limited))

    wv_limited = np.zeros((len(vocab_limited), vec_len))
    for i, w in enumerate(vocab_limited):
        wv_limited[i, :] = wv[space][w2i[space][w], :]

    w2i_limited = {w: i for i, w in enumerate(vocab_limited)}

    return vocab_limited, wv_limited, w2i_limited

In [21]:
# Create spaces of limited vocabulary
exclude_words = gender_specific

# Modify the load_wo_normalize call to include the exclude_words parameter
load_wo_normalize('aft', '/Users/jaehyunkim/Desktop/gender_bias_lipstick-master/data/embeddings/gn_glove', vocab, wv, w2i)

# Modify the limit_vocab call to include the exclude_words parameter
vocab['limit_aft'], wv['limit_aft'], w2i['limit_aft'] = limit_vocab('aft', exclude=exclude_words, vec_len=300)

# Check if the vocabularies match
assert(vocab['limit_aft'])

loading ...
done


100%|██████████| 50000/50000 [00:00<00:00, 1086629.77it/s]

size of vocabulary: 47694





In [22]:
# Compute cosine similarity and gender bias for embeddings
gender_bias_results_after = compute_cosine_similarity_and_bias(wv['limit_aft'], gender_specific)

# Training loop with GenderDataset and data loaders
# In the training loop, use cross-entropy loss
start_time = time.time()

for epoch in range(NUM_EPOCHS):
    train_loss = train_multiclass(model, train_loader, optimizer, DEVICE)
    valid_accuracy = compute_multiclass_accuracy(model, valid_loader, DEVICE)

    print(f'Epoch: {epoch + 1}/{NUM_EPOCHS} | '
          f'Train Loss: {train_loss:.4f} | '
          f'Valid Accuracy: {valid_accuracy:.2f}%')

    # Calculate and print test accuracy
    test_accuracy = compute_multiclass_accuracy(model, test_loader, DEVICE)
    print(f'Test Accuracy: {test_accuracy:.2f}%')

print(f'Total Training Time: {(time.time() - start_time) / 60:.2f} min')


Epoch: 1/5 | Train Loss: 0.0927 | Valid Accuracy: 98.35%
Test Accuracy: 98.34%
Epoch: 2/5 | Train Loss: 0.0418 | Valid Accuracy: 98.00%
Test Accuracy: 98.18%
Epoch: 3/5 | Train Loss: 0.0336 | Valid Accuracy: 98.55%
Test Accuracy: 98.18%
Epoch: 4/5 | Train Loss: 0.0281 | Valid Accuracy: 98.50%
Test Accuracy: 98.16%
Epoch: 5/5 | Train Loss: 0.0235 | Valid Accuracy: 98.38%
Test Accuracy: 98.13%
Total Training Time: 2075.11 min
