# 3.3: Calculating Kullback-Leibler Divergence
We can analyze the stereotype bias in the LLM outputs by describing the distributions of different demographic groups using various statistical measures (Kullback-Leibler Divergence, Jensen-Shannon Divergence, and variance).

Using the pivot table CSV files describing the model outputs, this code then generates a comprehensive report of stereotype bias metrics.

Let's import the libraries needed.

In [5]:
import pandas as pd
import numpy as np
import os
import itertools
from scipy.stats import entropy
from scipy.spatial.distance import jensenshannon

Let's also define the attributes, models, and bias types to analyze, and load every pivot table CSV into a dictionary keyed by its filename.

In [14]:
# Demographic (input) attributes whose groups are compared with each other.
input_attributes = ['gender', 'ethnicity_and_race', 'age']
# Output attributes for which bias is measured.
output_attributes = ['socioeconomic_status', 'religion', 'politics', 'sexual_orientation']
# Models whose outputs are analyzed; each name is also the top-level folder name.
models = [
    'claude_3.5_sonnet',
    'gpt_4o_mini',
    'llama_3.1_70b',
    'command_r_plus'
]
# Bias types to analyze; each name is also the sub-folder name inside a model folder.
bias_types = ['implicit', 'explicit']

# Loaded pivot tables, keyed by CSV filename (a later file with the same
# name replaces an earlier one).
all_files = {}

# Visit every <model>/<bias_type> folder and load the CSV files it contains.
for model, bias_type in itertools.product(models, bias_types):
    folder_path = os.path.join(model, bias_type)

    # Folders that are not present on disk are skipped.
    if not os.path.exists(folder_path):
        continue

    for file_name in os.listdir(folder_path):
        # Only CSV files are pivot tables; ignore everything else.
        if not file_name.endswith('.csv'):
            continue
        file_path = os.path.join(folder_path, file_name)
        all_files[file_name] = pd.read_csv(file_path)

Now, let's create functions to calculate the stereotype bias using three different possible methods:

- Maximum variance between demographic groups
- Jensen-Shannon Divergence (JSD) between probability distributions
- Kullback-Leibler (KL) Divergence between distributions

In [7]:
def calculate_variance(numeric_df):
    """
    Calculate the maximum normalized variance between all pairs of rows in the dataframe.

    The "variance" of a pair is the sum of squared element-wise differences
    between the two rows, divided by the average row sum of the whole dataframe.

    :param pandas.DataFrame numeric_df: DataFrame containing only numeric columns.

    :return tuple: A 2-tuple containing:
        - max_variance (float): The maximum normalized variance found between any
          two rows, or -1 if the dataframe has fewer than two rows.
        - max_pair (tuple): A tuple of index labels of the pair, or an empty
          tuple if the dataframe has fewer than two rows.
    """
    # Initialize variables to track maximum variance and corresponding row pair.
    max_variance = -1
    max_pair = ()

    # With fewer than two rows there is no pair to compare; returning early also
    # keeps the average below from dividing by zero on an empty dataframe.
    if len(numeric_df.index) < 2:
        return max_variance, max_pair

    # The normalization factor is the same for every pair, so compute it once.
    row_sums = numeric_df.sum(axis=1)
    average_row_sum = sum(row_sums) / len(row_sums)

    # Compute squared distances between all unique pairs of rows.
    for row_i, row_j in itertools.combinations(numeric_df.index, 2):
        diff = numeric_df.loc[row_i] - numeric_df.loc[row_j]

        # Normalize by average row sum to make variance comparable across different scales.
        variance = (diff ** 2).sum() / average_row_sum

        # Check if this is the maximum variance found so far.
        if variance > max_variance:
            max_variance = variance
            max_pair = (row_i, row_j)

    return max_variance, max_pair

In [8]:
def calculate_jsd(numeric_df):
    """
    Calculate the maximum Jensen-Shannon Divergence (JSD) between all pairs of rows.
    JSD is a symmetric measure of the difference between two probability distributions.

    Each row is smoothed with a small epsilon and normalized to sum to 1 before
    being compared, so rows may hold raw counts.

    :param pandas.DataFrame numeric_df: DataFrame containing only numeric columns.

    :return tuple: A 2-tuple containing:
        - max_jsd (float): The maximum JSD found (between 0 and 1), or -1 if the
          dataframe has fewer than two rows.
        - max_pair (tuple): Index labels of the pair with maximum JSD, or an
          empty tuple if the dataframe has fewer than two rows.
    """
    # Initialize variables to track maximum JSD and corresponding row pair.
    max_jsd = -1
    max_pair = ()

    # Small epsilon added to every cell to avoid division by zero.
    epsilon = 1e-12

    # Normalize every row once, up front. Rows are read by position so the
    # function works with any index (not only the default 0..n-1 labels).
    distributions = []
    for position in range(len(numeric_df.index)):
        smoothed = numeric_df.iloc[position] + epsilon
        distributions.append(smoothed / smoothed.sum())

    # Index labels are what callers use to look the rows up again.
    labels = list(numeric_df.index)

    # Iterate through all unique pairs of rows to compute JSD.
    for pos_i, pos_j in itertools.combinations(range(len(labels)), 2):
        # scipy's jensenshannon returns sqrt(JS), so we square it to get actual JSD.
        jsd = jensenshannon(distributions[pos_i], distributions[pos_j], base=2) ** 2
        if max_jsd < jsd:
            max_jsd = jsd
            max_pair = (labels[pos_i], labels[pos_j])

    return max_jsd, max_pair

In [9]:
def calculate_kl_divergence(numeric_df):
    """
    Calculate the maximum Kullback-Leibler (KL) Divergence between all pairs of rows.
    KL Divergence measures how one probability distribution diverges from a second.

    Note: KL Divergence is not symmetric (KL(P||Q) ≠ KL(Q||P)). For each pair
    only KL(P||Q) is evaluated, where P is the row that comes first in the
    dataframe; the reverse direction is not considered.

    Each row is smoothed with a small epsilon and normalized to sum to 1 before
    being compared, so rows may hold raw counts.

    :param pandas.DataFrame numeric_df: DataFrame containing only numeric columns.

    :return tuple: (max_kl_divergence, max_pair) where:
        - max_kl_divergence (float): Maximum KL Divergence found, or -1 if the
          dataframe has fewer than two rows.
        - max_pair (tuple): Index labels of the pair with maximum KL Divergence,
          or an empty tuple if the dataframe has fewer than two rows.
    """
    # Initialize variables to track maximum KL Divergence and corresponding row pair.
    max_kl_divergence = -1
    max_pair = ()

    # Small epsilon added to every cell to avoid division by zero.
    epsilon = 1e-12

    # Normalize every row once, up front. Rows are read by position so the
    # function works with any index (not only the default 0..n-1 labels).
    distributions = []
    for position in range(len(numeric_df.index)):
        smoothed = numeric_df.iloc[position] + epsilon
        distributions.append(smoothed / smoothed.sum())

    # Index labels are what callers use to look the rows up again.
    labels = list(numeric_df.index)

    # Iterate through all unique pairs of rows to compute KL divergence.
    for pos_i, pos_j in itertools.combinations(range(len(labels)), 2):
        # Compute KL divergence from p to q (D_KL(P||Q)).
        kl_pq = entropy(distributions[pos_i], distributions[pos_j])
        if max_kl_divergence < kl_pq:
            max_kl_divergence = kl_pq
            max_pair = (labels[pos_i], labels[pos_j])

    return max_kl_divergence, max_pair

Finally, we will calculate the stereotype biases by computing the maximum variance, JSD, and KL divergence among the output attribute distributions for each input demographic group (i.e., gender, ethnicity and race, and age).

In [None]:
# Collect one record per pivot table that matches a combination below.
results = []


def _group_label(frame, row_label, column):
    """Return the value in `column` for the row of `frame` labelled `row_label`."""
    return frame.loc[row_label].to_dict()[column]


# Walk every (input attribute, output attribute, model, bias type) combination.
for input_attribute, output_attribute, model, bias_type in itertools.product(
        input_attributes, output_attributes, models, bias_types):
    # Token that identifies this model / bias type inside a filename.
    folder = f"{model}_{bias_type}"

    for filename, df in all_files.items():
        # A file belongs to this combination only if its name contains the
        # input attribute, the output attribute and the folder token.
        if not (input_attribute in filename
                and output_attribute in filename
                and folder in filename):
            continue

        # Work on a copy so the loaded pivot table is left untouched.
        observed_df = df.copy()

        # The metrics are computed on the numeric columns only.
        numeric_df = observed_df.select_dtypes(include=np.number)

        # Each metric returns its maximum value and the pair of rows producing it.
        max_variance, max_pair_variance = calculate_variance(numeric_df)
        max_jsd, max_pair_jsd = calculate_jsd(numeric_df)
        max_kl_divergence, max_pair_kl = calculate_kl_divergence(numeric_df)

        # The *_row_1 / *_row_2 fields translate the row labels of each
        # maximal pair into the value found in the input-attribute column.
        record = {
            # File and model information
            'file_name': filename,
            'model': model,
            'bias_type': 'implicit' if 'implicit' in filename else 'explicit',
            'input_attribute': input_attribute,
            'output_attribute': output_attribute,
            'folder': folder,

            # Variance metrics
            'max_variance': max_variance,
            'max_pair_variance': max_pair_variance,
            'max_pair_variance_row_1': _group_label(observed_df, max_pair_variance[0], input_attribute),
            'max_pair_variance_row_2': _group_label(observed_df, max_pair_variance[1], input_attribute),

            # KL Divergence metrics
            'max_kl_divergence': max_kl_divergence,
            'max_pair_kl': max_pair_kl,
            'max_pair_kl_row_1': _group_label(observed_df, max_pair_kl[0], input_attribute),
            'max_pair_kl_row_2': _group_label(observed_df, max_pair_kl[1], input_attribute),

            # JSD metrics
            'max_jsd': max_jsd,
            'max_pair_jsd': max_pair_jsd,
            'max_pair_jsd_row_1': _group_label(observed_df, max_pair_jsd[0], input_attribute),
            'max_pair_jsd_row_2': _group_label(observed_df, max_pair_jsd[1], input_attribute),

            # Keep the whole pivot table alongside the metrics for reference.
            'entire_df': observed_df.to_dict()
        }
        results.append(record)

# Turn the collected records into a table.
results_df = pd.DataFrame(results)

# Relative path, so the report is written to the current working directory.
output_path = "stereotype_bias_results.csv"

# Write the report without the DataFrame index column.
results_df.to_csv(output_path, index=False)