# Validation of the gender representation bias quantification method

Validate the LLM-based gender representation bias quantification method on an annotated dataset.

The analysis is performed with the OpenAI API.

In [None]:
import os
import re
import pandas as pd
import warnings
from datetime import datetime
from tqdm.notebook import tqdm
from openai import OpenAI

In [None]:
# Parameters

# Model used for the analysis
model = "gpt-4o-2024-11-20"

# Language code; it selects the ground truth, prompt, examples and skip list files
lang = "es"
#lang = "va"

# Output analysis path
analysis_file_path = "../../results/validation/"

# Output analysis file identifier
analysis_file_id = f"{model}.{lang}.01"

# Ground truth
gt_pathname = f"../../data/validation/gt-{lang}.txt"

# Prompt pattern
prompt_pathname = f"../../data/dataset-analysis/prompt-{lang}.txt"

# Few-shot examples
examples_pathname = f"../../data/dataset-analysis/examples-{lang}.txt"

# Skip list (words to be ignored)
skiplist_pathname = f"../../data/dataset-analysis/skiplist-{lang}.txt"

# Number of sentences to be analyzed
num_sentences = 100

In [None]:
# Function to load various text files
def load_text_file(filepath, convert_to_upper=False):
    """Load a UTF-8 text file, verbatim or as a set of uppercased lines.

    Args:
        filepath: Path of the file to read, or None when there is no file.
        convert_to_upper: When True (skip-list mode), return the set of
            non-blank lines, each stripped and uppercased. When False,
            return the whole file content as one string.

    Returns:
        A set of strings in skip-list mode, otherwise a string. When
        filepath is None, the empty value of the matching type is returned
        (``set()`` or ``""``) so that callers can use the result the same
        way whether or not a file was given.
    """
    if filepath is None:
        # Keep the return type consistent with the mode that was asked for:
        # a list here would break str.replace() on the loaded text.
        return set() if convert_to_upper else ""
    with open(filepath, 'r', encoding='utf-8') as file:
        if convert_to_upper:  # skiplist
            return {line.strip().upper() for line in file if line.strip()}
        return file.read()  # other uses

In [None]:
def load_ground_truth(filepath, skip_words):
    """
    Load and process the ground truth file.

    The file is read as blocks separated by blank lines. Each block has the
    form ``"sentence" "answer"``; the answer may span several lines.
    For every block:
      - The sentence is preserved (surrounding double quotes removed).
      - The answer is uppercased, split on " - ", ", " and newlines,
        grouped into lists of three consecutive tokens,
        and filtered to remove any group whose first token is in skip_words.

    Args:
        filepath: Path of the UTF-8 ground truth file.
        skip_words: Collection of uppercased words; groups starting with one
            of them are dropped.

    Returns:
        A DataFrame with columns 'Sentence' and 'Answer' (one row per block).
        An empty file yields an empty DataFrame with those columns.

    Raises:
        ValueError: If a non-empty block has no ' "' separator between the
            sentence and the answer.
    """
    data = []
    with open(filepath, 'r', encoding='utf-8') as file:
        content = file.read().strip()

    for block in content.split('\n\n'):
        block = block.strip()
        # An empty file or extra blank lines between blocks produce empty
        # chunks; skip them instead of failing on the unpacking below.
        if not block:
            continue
        # Split on the LAST ' "' so that quotes inside the sentence do not
        # break the sentence/answer separation.
        parts = block.rsplit(' "', 1)
        if len(parts) != 2:
            raise ValueError(
                f"Malformed ground truth block (expected '\"sentence\" \"answer\"'): {block!r}"
            )
        sentence, words = parts
        sentence = sentence.strip('"')
        # Process words: uppercase and split by delimiters
        words = words.strip('"').upper()
        words_list = re.split(r" - |, |\n", words)
        grouped = [words_list[i:i+3] for i in range(0, len(words_list), 3)]
        # Filter out any group whose first element is in the skip list
        filtered_grouped = [group for group in grouped if group and group[0] not in skip_words]
        data.append((sentence, filtered_grouped))
    return pd.DataFrame(data, columns=['Sentence', 'Answer'])

In [None]:
def parse_response(response_text, skip_words):
    """
    Turn the raw model response into a list of word groups.

    The text is stripped and uppercased, whitespace runs that end in a
    newline are collapsed to a single newline, and the result is split on
    " - ", ", " and newlines (the same delimiters used for the ground truth).
    Tokens are then taken three at a time; a group is dropped when its first
    token appears in skip_words.
    """
    normalized = response_text.strip().upper()
    normalized = re.sub(r'\s+\n', '\n', normalized)
    tokens = re.split(r" - |, |\n", normalized)

    kept = []
    for start in range(0, len(tokens), 3):
        triple = tokens[start:start + 3]
        if not triple or triple[0] in skip_words:
            continue
        kept.append(triple)
    return kept

In [None]:
def analyze_response(model_words, gt_words):
    """
    Compare the model's response (as a list of word groups) against the ground truth.

    Each group is a list whose first element is the word and whose remaining
    elements are its attributes. Matching is done in two passes so that a
    model entry that exactly matches some ground truth item is never used up
    by an earlier ground truth item that only shares the same word:
      1. exact matches (word and attributes equal) are paired first;
      2. the remaining ground truth items are paired by word only.
    Every model entry is paired with at most one ground truth item.
    The input lists are not modified.

    Args:
        model_words: Word groups parsed from the model's response.
        gt_words: Word groups from the ground truth.

    Returns:
      identified: correctly identified words (with both attributes correct)
      not_identified: words in GT that the model missed
      wrongly_analyzed: words where the noun is present but one of the attributes is wrong
      wrongly_identified: extra words that do not appear in GT
    """
    identified = []
    not_identified = []
    wrongly_analyzed = []
    # Working copy of the model entries; whatever is left unpaired at the end
    # is the list of extra (wrongly identified) words.
    wrongly_identified = list(model_words)

    # Pass 1: pair exact matches.
    unmatched_gt = []
    for item in gt_words:
        if item in wrongly_identified:
            identified.append(item)
            wrongly_identified.remove(item)
        else:
            unmatched_gt.append(item)

    # Pass 2: pair the remaining GT items by word (first element) only.
    for item in unmatched_gt:
        match_index = next(
            (idx for idx, w in enumerate(wrongly_identified) if w[0] == item[0]),
            None,
        )
        if match_index is None:
            # No remaining model entry has this word: the model missed it
            not_identified.append(item)
        else:
            # The word is present but its attributes differ
            wrongly_analyzed.append(item)
            del wrongly_identified[match_index]

    return identified, not_identified, wrongly_analyzed, wrongly_identified

In [None]:
# Load the skip list first (the ground truth loader needs it), then the
# processed ground truth, the prompt pattern and the few-shot examples text
skip_words = load_text_file(skiplist_pathname, convert_to_upper=True)
df_gt = load_ground_truth(gt_pathname, skip_words)
prompt_pattern = load_text_file(prompt_pathname)
examples = load_text_file(examples_pathname)

In [None]:
# Process sentences: query the model for each ground truth sentence and
# compare the parsed response with the annotated answer
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
results = []

# Never index past the end of the ground truth: it may contain fewer
# sentences than requested, and df_gt.loc[i] would then raise a KeyError.
num_to_process = min(num_sentences, len(df_gt))
if num_to_process < num_sentences:
    warnings.warn(
        f"Ground truth contains only {len(df_gt)} sentences; "
        f"processing {num_to_process} instead of {num_sentences}."
    )

for i in tqdm(range(num_to_process), desc="Processing sentences"):
    sentence = df_gt.loc[i, 'Sentence']

    # Construct the prompt
    prompt = prompt_pattern.replace("<EXAMPLES>", examples).replace("<SENTENCE>", sentence)

    # Call the OpenAI API with the provided prompt
    response_obj = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ]
    )
    # The message content can be None (e.g. when the model refuses);
    # treat that as an empty response so that parsing does not crash.
    response_text = response_obj.choices[0].message.content or ""
    model_words = parse_response(response_text, skip_words)

    # Compare the model response with the processed ground truth
    gt_words = df_gt.loc[i, 'Answer']
    identified, not_identified, wrongly_analyzed, wrongly_identified = analyze_response(model_words, gt_words)

    # Append the evaluation result for this sentence
    results.append({
        'ID': i + 1,
        'Sentence': sentence,
        'Ground Truth': gt_words,
        'Response': model_words,
        'Num Identified': len(identified),
        'Num Not Identified': len(not_identified),
        'Num Incorrectly Analyzed': len(wrongly_analyzed),
        'Num Incorrectly Identified': len(wrongly_identified),
        'Identified': identified,
        'Not Identified': not_identified,
        'Incorrectly Analyzed': wrongly_analyzed,
        'Incorrectly Identified': wrongly_identified
    })

# Create a DataFrame from the results and write to the Excel file
df_results = pd.DataFrame(results)
output_file = os.path.join(analysis_file_path, f'validation-{analysis_file_id}.xlsx')

# Never overwrite a previous run: if the output file exists,
# add a timestamp to the filename
if os.path.exists(output_file):
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    output_root, output_ext = os.path.splitext(output_file)
    warnings.warn("Output file already exists, renaming with timestamp.")
    output_file = f"{output_root}-{timestamp}{output_ext}"

# Save output to Excel and append analysis cells
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
    df_results.to_excel(writer, index=False, sheet_name='Sheet1')
    worksheet = writer.sheets['Sheet1']

    # Calculate row indices from the rows actually written (row 1 is the
    # header), so the totals stay right below the data even if fewer
    # sentences were processed than requested.
    last_data_row = len(df_results) + 1
    total_row = last_data_row + 1
    analysis_row_start = total_row + 3

    # Sums for each count column. Columns E-H (5-8) hold 'Num Identified',
    # 'Num Not Identified', 'Num Incorrectly Analyzed' and
    # 'Num Incorrectly Identified', in that order.
    worksheet.cell(row=total_row, column=1, value="Total")
    for column_index, column_letter in enumerate("EFGH", start=5):
        worksheet.cell(row=total_row, column=column_index,
                       value=f"=SUM({column_letter}2:{column_letter}{last_data_row})")

    # Accuracy %, Precision, Recall, and F1 Score: caption in column A,
    # formula in column B, one metric per row
    precision_row = analysis_row_start + 1
    recall_row = analysis_row_start + 2
    f1_row = analysis_row_start + 3

    # Captions
    worksheet.cell(row=analysis_row_start, column=1, value="Accuracy %")
    worksheet.cell(row=precision_row, column=1, value="Precision")
    worksheet.cell(row=recall_row, column=1, value="Recall")
    worksheet.cell(row=f1_row, column=1, value="F1 Score")
    # Formulas (E = identified, F = not identified,
    # G = incorrectly analyzed, H = incorrectly identified)
    worksheet.cell(row=analysis_row_start, column=2,
                   value=f"=E{total_row}/(E{total_row}+F{total_row}+G{total_row})*100")
    worksheet.cell(row=precision_row, column=2,
                   value=f"=E{total_row}/(G{total_row}+H{total_row}+E{total_row})")
    worksheet.cell(row=recall_row, column=2,
                   value=f"=E{total_row}/(E{total_row}+F{total_row})")
    worksheet.cell(row=f1_row, column=2,
                   value=f"=2*B{precision_row}*B{recall_row}/(B{precision_row}+B{recall_row})")