In [23]:
import pandas as pd
import numpy as np

from scipy.stats import pearsonr, spearmanr
import matplotlib.pyplot as plt

import warnings
import os
warnings.filterwarnings('ignore')

In [24]:
# Attribute names probed for each entity type. Other cells splice these names
# into paths of the form '../data/{entity}/{entity}{attribute}/...', so every
# string has to stay exactly as spelled here.

# Attributes for the 'Occupations' entity.
OCCUPATIONS_ATTRIBUTES = [
    'IQ',
    'NetWorth',
    'PercentWomen',
    'SubstanceAbuseRate',
    'PercentUnemployed',
    'Weight',
    'YearlyIncome',
    'NumberOfPeople',
    'JobSatisfactionLevel',
    'CriminalHistory',
    'DivorceRate',
    'Dishonesty',
]

# Attributes for the 'politicalFigures' entity.
POLITICALFIGS_ATTRIBUTES = [
    'IQ',
    'NetWorth',
    'YearsMarried',
    'MileTime',
    'HoursWorked',
    'DollarsInherited',
    'PercentLies',
    'Height',
    'NumberOfCountriesVisited',
    'NumberOfChildren',
    'HoursSlept',
]

# Attributes for the 'Countries' entity.
COUNTRIES_ATTRIBUTES = [
    'IQ',
    'Corruption',
    'Homicides',
    'IncomeInequality',
    'InfantMortality',
    'Population',
    'Weight',
    'Attractiveness',
    'LifeExpectancy',
    'SubstanceAbuseRate',
    'HoursWorked',
]

# Attributes for the 'syntheticNames' entity.
SYNTHETICNAMES_ATTRIBUTES = [
    'IQ',
    'NetWorth',
    'Height',
    'Weight',
    'Age',
    'DailyStepCount',
    'Attractiveness',
    'YearlyIncome',
    'Deadlift',
    'LifeExpectancy',
    'HoursWorked',
    'HoursSlept',
    'AlcoholicDrinksPerWeek',
    'MonthlySpending',
]

def attributes_helper(entity):
    """Return the attribute list registered for an entity type.

    The entity name is compared case-insensitively against 'occupations',
    'politicalfigures', 'countries' and 'syntheticnames'.

    Raises:
        ValueError: if ``entity`` is none of the known entity types.
    """
    normalized = entity.lower()

    if normalized == 'occupations':
        return OCCUPATIONS_ATTRIBUTES
    if normalized == 'politicalfigures':
        return POLITICALFIGS_ATTRIBUTES
    if normalized == 'countries':
        return COUNTRIES_ATTRIBUTES
    if normalized == 'syntheticnames':
        return SYNTHETICNAMES_ATTRIBUTES

    # Nothing matched: report the name exactly as the caller passed it.
    raise ValueError(f"Entity: {entity} not valid")

In [25]:
def main_probes(results_path, labels_col, model):
    """Correlate per-layer probe predictions with a label column.

    Reads the results CSV, drops rows whose ``labels_col`` is NaN and splits
    the remainder on the ``is_train`` column (0 = test, 1 = train). For each
    probe column named ``'{model}/{i}'`` (i = 0, 1, ..., at most 49, stopping at
    the first missing column) the Pearson correlation with ``labels_col`` is
    computed on both splits.

    Args:
        results_path: path of the CSV with the labels, ``is_train`` and the
            probe prediction columns.
        labels_col: name of the column holding the target values.
        model: prefix of the probe prediction columns.

    Returns:
        ``(test_results, train_results)``: two equally long lists of Pearson r
        values, one entry per scored probe column. Both lists are empty when
        fewer than two labelled test rows exist (the labels are printed in that
        case). A probe column whose test correlation cannot be computed is
        skipped; if only the train correlation fails, NaN is stored for train so
        the two lists stay aligned.
    """
    full_df = pd.read_csv(results_path)
    full_df = full_df[full_df[labels_col].notna()]

    test_df = full_df[full_df['is_train'] == 0]
    train_df = full_df[full_df['is_train'] == 1]

    test_labels = test_df[labels_col]
    train_labels = train_df[labels_col]

    if len(test_labels) < 2:
        # Not enough points for a correlation. Return the usual tuple shape so
        # callers that unpack the result do not fail with an unrelated TypeError.
        print(test_labels)
        return [], []

    test_results = []
    train_results = []

    for i in range(50):
        curr_col = f'{model}/{i}'
        if curr_col not in test_df.columns:
            break

        try:
            test_r = pearsonr(test_labels, test_df[curr_col])[0]
        except Exception as e:
            print(e)
            print('cannot get pearsonr')
            print('----')
            continue

        try:
            train_r = pearsonr(train_labels, train_df[curr_col])[0]
        except Exception as e:
            # Keep the test score; the NaN placeholder keeps both lists in step.
            print(e)
            print('cannot get pearsonr')
            print('----')
            train_r = np.nan

        test_results.append(test_r)
        train_results.append(train_r)

    return test_results, train_results

def regular_max(model, entity, jailbreak_type, experiment_type):
    '''
    Collects, per attribute of an entity, the highest test-split score over all probe columns.

    Experiment type is either "main", "specific", or "pairs"; it is used as the
    suffix of the results file name.

    Model should always be the instruct version ('<family>/<name>'). When
    jailbreak_type is '' the base version is inferred from it and used for the
    file name, the response column and the probe column prefix.

    Returns: dictionary mapping each attribute to its max score. Attributes whose
    results cannot be loaded or scored are reported on stdout and left out.
    '''
    attribute_names = attributes_helper(entity)
    _, model_name = model.split('/')
    use_jailbreak = jailbreak_type != ''

    if use_jailbreak:
        response_col = f'{model}_{jailbreak_type}Jailbreak_response_parsed'
    else:
        # No jailbreak: swap the instruct id for the matching base id. The
        # mapping does not depend on the attribute, so it is resolved once here.
        if 'gemma' in model and 'it' in model:
            model = model.replace('-it', '')
        if 'llama' in model.lower() and 'instruct' in model.lower():
            model = 'meta-llama/Llama-3.1-8B'
        if 'qwen' in model.lower() and 'instruct' in model.lower():
            model = 'Qwen/Qwen2.5-7B'
        if 'yi' in model.lower() and 'chat' in model.lower():
            model = model.replace('-Chat', '')
        _, model_name = model.split('/')
        response_col = f'{model}_response_parsed'

    best_by_attribute = {}

    for attribute in attribute_names:
        if use_jailbreak:
            filename = f'../data/{entity}/{entity}{attribute}/results/{entity}{attribute}_{model_name}_{jailbreak_type}_{experiment_type}.csv'
        else:
            filename = f'../data/{entity}/{entity}{attribute}/results/{entity}{attribute}_{model_name}_{experiment_type}.csv'

        try:
            test_scores, _ = main_probes(filename, response_col, model=model)
            # nanmax skips layers whose correlation came out as NaN
            best_by_attribute[attribute] = np.nanmax(np.array(test_scores))
        except Exception as e:
            print(e)
            print(f'{model} {entity} {attribute} {jailbreak_type} failed')
            print('-' * 30)
            continue

    return best_by_attribute

def base_to_instruct(model, entity, jailbreak_type):
    """Best test-split Pearson r per attribute for the base-to-instruct probes.

    For every attribute of ``entity`` this reads the entity/attribute CSV (for
    the response column and ``is_train``) and the ``*_base_to_instruct.csv``
    predictions file named after the base model, drops rows whose response is
    NaN, and correlates the response with each prediction column on the rows
    where ``is_train == 0``.

    Args:
        model: id of the instruct model; must contain 'gemma', 'qwen', 'llama'
            or 'yi' (case-insensitive) so the base model id can be derived.
        entity: entity type understood by ``attributes_helper``.
        jailbreak_type: jailbreak name used in the response column name, or ''
            to use the base model's plain response column.

    Returns:
        dict mapping attribute -> maximum Pearson r over the prediction columns
        (NaN correlations are ignored). Attributes that cannot be loaded or
        scored are reported on stdout and omitted.

    Raises:
        ValueError: if no base model can be derived from ``model``.
    """
    lowered = model.lower()
    if 'gemma' in lowered:
        model_base = model.replace('-it', '')
    elif 'qwen' in lowered:
        model_base = 'Qwen/Qwen2.5-7B'
    elif 'llama' in lowered:
        model_base = 'meta-llama/Llama-3.1-8B'
    elif 'yi' in lowered:
        model_base = model.replace('-Chat', '')
    else:
        # Previously this fell through and failed later with an unbound local.
        raise ValueError(f'Cannot infer base model for: {model}')

    attributes = attributes_helper(entity)
    base_name = model_base.split("/")[-1]

    if jailbreak_type != '':
        response_col = f'{model}_{jailbreak_type}Jailbreak_response_parsed'
    else:
        response_col = f'{model_base}_response_parsed'

    maxes = {}

    for attribute in attributes:
        filename = f'../data/{entity}/{entity}{attribute}/results/{entity}{attribute}_{base_name}_base_to_instruct.csv'
        try:
            main_df = pd.read_csv(f'../data/{entity}/{entity}{attribute}/{entity}{attribute}.csv')
            predictions_df = pd.read_csv(filename)
        except Exception as e:
            print(f'{filename} not found')
            print(e)
            continue

        try:
            na_indices = main_df[response_col].isna()
        except Exception as e:
            print(f'Not found: {attribute}')
            print(e)
            continue

        # The same row mask is applied to both frames, so they are assumed to
        # describe the same rows in the same order -- TODO confirm.
        main_df = main_df[~na_indices]
        predictions_df = predictions_df[~na_indices]

        test_indices = main_df['is_train'] == 0
        test_labels = main_df[test_indices][response_col]
        test_predictions = predictions_df[test_indices]

        test_results = []
        # The first three columns are skipped; presumably they hold metadata
        # rather than probe predictions -- verify against the results files.
        for curr_col in test_predictions.columns[3:]:
            try:
                test_results.append(pearsonr(test_labels, test_predictions[curr_col])[0])
            except ValueError as e:
                print(f'pearsonr failed on {curr_col}: {e}')
                break

        try:
            # nanmax, like the other scorers here, so one NaN layer does not
            # turn the whole attribute into NaN.
            maxes[attribute] = np.nanmax(np.array(test_results))
        except Exception as e:
            print(f'failed on {model} {entity} {jailbreak_type} {attribute}')
            print(e)

    return maxes

def bradley_terry(model, entity, jailbreak_type, experiment_type):
    """Best Spearman correlation per attribute between probes and Bradley-Terry scores.

    For every attribute of ``entity`` this reads the Bradley-Terry score column
    ``'{model}_{jailbreak_type}Jailbreak_bradley_terry_scores'`` from the
    entity/attribute CSV and correlates it (Spearman, over all rows) with each
    prediction column of the probe results file.

    Args:
        model: '<family>/<name>' id of the model.
        entity: entity type understood by ``attributes_helper``.
        jailbreak_type: jailbreak name; must not be ''.
        experiment_type: suffix of the probe results file name.

    Returns:
        dict mapping attribute -> maximum Spearman correlation over the
        prediction columns (NaN correlations are ignored). Attributes whose
        files or score column cannot be read, or that yield no score, are
        reported on stdout and omitted.

    Raises:
        NotImplementedError: if ``jailbreak_type`` is ''.
    """
    # Fail before touching any file: only jailbreak runs have these scores.
    if jailbreak_type == '':
        raise NotImplementedError('Only jailbreaking for bradley terry')

    attributes = attributes_helper(entity)
    _, model_name = model.split('/')
    response_col = f'{model}_{jailbreak_type}Jailbreak_bradley_terry_scores'

    maxes = {}

    for attribute in attributes:
        probes_predictions_filename = f'../data/{entity}/{entity}{attribute}/results/{entity}{attribute}_{model_name}_{jailbreak_type}_{experiment_type}.csv'
        filename = f'../data/{entity}/{entity}{attribute}/{entity}{attribute}.csv'

        try:
            bradley_terry_scores = pd.read_csv(filename)[response_col]
            probes_predictions = pd.read_csv(probes_predictions_filename)
        except Exception as e:
            # Skip this attribute, as the other scorers in this notebook do,
            # instead of aborting the whole entity.
            print(f'failed on {model} {entity} {jailbreak_type} {attribute}')
            print(e)
            continue

        test_results = []
        # The first three columns are skipped; presumably they hold metadata
        # rather than probe predictions -- verify against the results files.
        for curr_col in probes_predictions.columns[3:]:
            try:
                test_results.append(spearmanr(bradley_terry_scores, probes_predictions[curr_col])[0])
            except ValueError as e:
                print(f'spearmanr failed on {curr_col}: {e}')
                break

        try:
            maxes[attribute] = np.nanmax(np.array(test_results))
        except Exception as e:
            print(f'failed on {model} {entity} {jailbreak_type} {attribute}')
            print(e)

    return maxes
        
        

In [28]:
def plot_regular_results(entity, jailbreak_type, experiment_type, save=False):
    """Grouped bar chart of the best probe score per attribute for three models.

    Scores come from ``regular_max`` ('main' / 'specific'), ``base_to_instruct``
    or ``bradley_terry`` (called with experiment 'main'), for gemma-2-9b-it,
    gemma-2-2b-it and Yi-6B-Chat. Without a jailbreak the bars are labelled
    with the base model names.

    Args:
        entity: entity type, also used in the title and the output file name.
        jailbreak_type: 'icl', 'machiavelli', another jailbreak name, or ''.
        experiment_type: 'main', 'specific', 'base_to_instruct' or
            'bradley_terry' (case-insensitive).
        save: when True, also write the figure to
            ``plots/{experiment_type}/{entity}_{jailbreak_type}_{experiment_type}.pdf``.

    Raises:
        ValueError: if ``experiment_type`` is not one of the supported values.
    """
    experiment = experiment_type.lower()
    if experiment in ('main', 'specific'):
        def best_scores(model_id):
            return regular_max(model_id, entity, jailbreak_type, experiment_type)
    elif experiment == 'base_to_instruct':
        def best_scores(model_id):
            return base_to_instruct(model_id, entity, jailbreak_type)
    elif experiment == 'bradley_terry':
        def best_scores(model_id):
            return bradley_terry(model_id, entity, jailbreak_type, 'main')
    else:
        raise ValueError(f'Unknown experiment_type: {experiment_type}')

    # (instruct model id, label with jailbreak, label without jailbreak, bar colour)
    model_specs = [
        ('google/gemma-2-9b-it', 'gemma-2-9b-it', 'gemma-2-9b', '#86a873'),  # Muted green
        ('google/gemma-2-2b-it', 'gemma-2-2b-it', 'gemma-2-2b', '#7c9fb0'),  # Muted blue
        ('01-ai/Yi-6B-Chat', 'Yi-6B-Chat', 'Yi-6B', '#c17767'),              # Muted red
    ]
    use_jailbreak = jailbreak_type != ''

    results = {}
    colors = {}
    for model_id, instruct_label, base_label, color in model_specs:
        label = instruct_label if use_jailbreak else base_label
        results[label] = best_scores(model_id)
        colors[label] = color

    # Rows = models, columns = attributes
    df = pd.DataFrame(results).T

    # Plotting
    width = 0.25  # width of the bars
    x = np.arange(len(df.columns))  # one bar group per attribute

    fig, ax = plt.subplots(figsize=(12, 6))
    for offset, label in zip((-width, 0, width), results):
        ax.bar(x + offset, df.loc[label], width, label=label, color=colors[label])

    # Format entity name for title
    display_entity = entity.replace('syntheticNames', 'Synthetic Names').replace('politicalFigures', 'Political Figures')

    # Format jailbreak type for title; unknown names are shown as given
    display_jailbreak = {'icl': 'ICL prompt', 'machiavelli': 'AIM prompt'}.get(jailbreak_type, jailbreak_type)

    # bradley_terry scores are Spearman correlations; the others are Pearson
    metric = 'Spearman' if experiment == 'bradley_terry' else 'Pearson'
    ax.set_ylabel(f'{metric} Correlation', fontsize=14)

    title = f'Linear Decodability of {display_entity} Attributes'
    if display_jailbreak:
        # Only mention the prompt when there is one (no dangling "using")
        title += f' using {display_jailbreak}'
    if experiment == 'base_to_instruct':
        title = 'BASE TO INSTRUCT\n' + title
    elif experiment == 'bradley_terry':
        title = 'PAIRWISE COMPARISONS\n' + title
    ax.set_title(title, fontsize=16)

    ax.set_xticks(x)
    ax.set_xticklabels(df.columns, rotation=45, ha='right', fontsize=12)
    ax.set_yticks(np.arange(0, 1.1, 0.1))
    ax.tick_params(axis='y', labelsize=12)
    ax.legend(fontsize=12)

    plt.tight_layout()

    if save:
        os.makedirs(f'plots/{experiment_type}', exist_ok=True)
        plt.savefig(f'plots/{experiment_type}/{entity}_{jailbreak_type}_{experiment_type}.pdf', bbox_inches='tight')

    plt.show()

def plot_difference_results(entity, jailbreak_type, save=False):
    """Grouped bar chart of 'specific' minus 'main' probe scores per attribute.

    For gemma-2-9b-it, gemma-2-2b-it and Yi-6B-Chat the best scores of the
    'specific' and 'main' experiments are fetched with ``regular_max`` and
    subtracted attribute by attribute. An attribute is only included for a
    model when both experiments produced a score for it. Without a jailbreak
    the bars are labelled with the base model names.

    Args:
        entity: entity type, also used in the title and the output file name.
        jailbreak_type: 'icl', 'machiavelli', another jailbreak name, or ''.
        save: when True, also write the figure to
            ``plots/difference/{entity}_{jailbreak_type}_difference.pdf``.
    """
    # (instruct model id, label with jailbreak, label without jailbreak, bar colour)
    model_specs = [
        ('google/gemma-2-9b-it', 'gemma-2-9b-it', 'gemma-2-9b', '#86a873'),  # Muted green
        ('google/gemma-2-2b-it', 'gemma-2-2b-it', 'gemma-2-2b', '#7c9fb0'),  # Muted blue
        ('01-ai/Yi-6B-Chat', 'Yi-6B-Chat', 'Yi-6B', '#c17767'),              # Muted red
    ]
    use_jailbreak = jailbreak_type != ''

    differences = {}
    colors = {}
    for model_id, instruct_label, base_label, color in model_specs:
        specific = regular_max(model_id, entity, jailbreak_type, 'specific')
        main = regular_max(model_id, entity, jailbreak_type, 'main')
        label = instruct_label if use_jailbreak else base_label
        # regular_max omits attributes it could not score, so subtract only
        # where both experiments have a value (avoids a KeyError).
        differences[label] = {k: specific[k] - main[k] for k in specific if k in main}
        colors[label] = color

    # Rows = models, columns = attributes
    df_diff = pd.DataFrame(differences).T

    # Plotting differences
    width = 0.25
    x = np.arange(len(df_diff.columns))  # one bar group per attribute

    fig, ax = plt.subplots(figsize=(12, 6))
    for offset, label in zip((-width, 0, width), differences):
        ax.bar(x + offset, df_diff.loc[label], width, label=label, color=colors[label])

    # Format entity name for title
    display_entity = entity.replace('syntheticNames', 'Synthetic Names').replace('politicalFigures', 'Political Figures')

    # Format jailbreak type for title; unknown names are shown as given
    display_jailbreak = {'icl': 'ICL prompt', 'machiavelli': 'AIM prompt'}.get(jailbreak_type, jailbreak_type)

    # Labeling with larger font sizes
    ax.set_ylabel('Difference in Pearson Correlation', fontsize=14)
    title = f'Difference in Linear Decodability of {display_entity} Attributes'
    if display_jailbreak:
        # Only mention the prompt when there is one (no dangling "using")
        title += f' using {display_jailbreak}'
    ax.set_title(title, fontsize=16)
    ax.set_xticks(x)
    ax.set_xticklabels(df_diff.columns, rotation=45, ha='right', fontsize=12)
    ax.set_yticks(np.arange(-0.3, 1.1, 0.1))
    ax.tick_params(axis='y', labelsize=12)
    ax.legend(fontsize=12)

    plt.tight_layout()

    if save:
        os.makedirs('plots/difference', exist_ok=True)
        plt.savefig(f'plots/difference/{entity}_{jailbreak_type}_difference.pdf', bbox_inches='tight')

    plt.show()

In [21]:
def plot_scatter_plots_all_attributes(entity, jailbreak_type, save=False):
    """Scatter probe predictions against Bradley-Terry scores, one panel per attribute.

    For each attribute of ``entity`` and each of the three models, the
    prediction column with the highest Spearman correlation to the model's
    Bradley-Terry scores is plotted against those scores. Only 'Occupations',
    'Countries' and 'politicalFigures' are supported; any other entity prints
    a message and returns. A model/attribute pair that fails to load or score
    is reported on stdout and skipped.

    Args:
        entity: entity key as listed above.
        jailbreak_type: jailbreak name used in column and file names.
        save: when True the figure is written to
            ``plots/bradley_terry/{entity}_{jailbreak_type}_bradley_terry.pdf``;
            otherwise it is shown.
    """
    # entity key -> (figure title, attribute list)
    entity_specs = {
        'Occupations': ('Occupations', OCCUPATIONS_ATTRIBUTES),
        'Countries': ('Countries', COUNTRIES_ATTRIBUTES),
        'politicalFigures': ('Political Figures', POLITICALFIGS_ATTRIBUTES),
    }

    # model id -> scatter colour; insertion order is the plotting order
    model_colors = {
        'google/gemma-2-9b-it': '#88a45b',
        'google/gemma-2-2b-it': '#5694b9',
        '01-ai/Yi-6B-Chat': '#c17767',
    }

    spec = entity_specs.get(entity)
    if spec is None:
        print(f"Unknown entity: {entity}")
        return
    figure_title, attributes = spec

    ncols = 2
    nrows = (len(attributes) + 1) // ncols

    fig, axs = plt.subplots(nrows=nrows, ncols=ncols, figsize=(12, 4 * nrows), sharey='row')
    fig.suptitle(figure_title, fontsize=24, y=0.98)
    axs = axs.flatten()

    # Global legend handles: one marker per model, labelled by its short name
    model_handles = []
    for model_id, model_color in model_colors.items():
        model_handles.append(
            plt.Line2D([], [], marker='o', linestyle='', color=model_color, label=model_id.split('/')[-1])
        )

    for panel_idx, (ax, attribute) in enumerate(zip(axs, attributes)):
        for model_id in model_colors:
            try:
                model_name = model_id.split("/")[-1]
                attribute_df = pd.read_csv(f'../data/{entity}/{entity}{attribute}/{entity}{attribute}.csv')
                bt_scores = attribute_df[f'{model_id}_{jailbreak_type}Jailbreak_bradley_terry_scores']

                layer_predictions = pd.read_csv(
                    f'../data/{entity}/{entity}{attribute}/results/{entity}{attribute}_{model_name}_{jailbreak_type}_main.csv'
                ).iloc[:, 3:]

                layer_scores = []
                for column in layer_predictions.columns:
                    layer_scores.append(spearmanr(bt_scores, layer_predictions[column])[0])

                # Column with the highest non-NaN correlation
                best_idx = np.nanargmax(layer_scores)
                best_column = layer_predictions.columns[best_idx]

                ax.scatter(layer_predictions[best_column], bt_scores,
                           color=model_colors[model_id], alpha=0.6,
                           label=f'Spearman r = {layer_scores[best_idx]:.3f}')

            except Exception as e:
                print(f"Error for {model_id} on {attribute}: {e}")
                continue

        ax.set_ylim(-20, 20)
        ax.set_title(f'{attribute}', fontsize=16)
        ax.set_xlabel('Probe Prediction', fontsize=12)
        if panel_idx % ncols == 0:
            # Only the left-hand column carries the y-axis label
            ax.set_ylabel('Bradley-Terry Score', fontsize=12)
        ax.tick_params(labelsize=10)
        ax.legend(fontsize=10)

    # Hide the panels left over when the grid has more cells than attributes
    for spare_ax in axs[len(attributes):]:
        spare_ax.axis('off')

    # Global legend
    fig.legend(
        handles=model_handles,
        loc='lower center',
        ncol=len(model_handles),
        fontsize=11,
        bbox_to_anchor=(0.5, -0.01)
    )

    plt.tight_layout()
    if not save:
        plt.show()
    else:
        os.makedirs('plots/bradley_terry', exist_ok=True)
        filename = f'plots/bradley_terry/{entity}_{jailbreak_type}_bradley_terry.pdf'
        plt.savefig(filename, bbox_inches='tight')
        print(f"Saved plot to {filename}")

In [22]:
def plot_score_corrs(model, jailbreak_type, entity='all', save=False):
    """Scatter 'main' probe scores against three other result types for one model.

    Draws three panels: 'main' vs. 'specific', 'main' vs. 'base_to_instruct'
    and 'main' vs. 'bradley_terry'. Every point is one attribute of one entity
    under one jailbreak; only attributes with a non-NaN score on both axes are
    plotted. Each panel is annotated with the Spearman correlation over all of
    its points (NaN when fewer than two points are available).

    Args:
        model: '<family>/<name>' id of the instruct model.
        jailbreak_type: a jailbreak name, or 'all' for 'machiavelli' and 'icl'.
        entity: an entity name, or 'all' for 'Countries', 'Occupations',
            'politicalFigures' and 'syntheticNames'.
        save: when True, also write the figure to
            ``plots/results_corrs/{model name}_{jailbreak_type}_results_corrs.pdf``.
    """
    if entity == 'all':
        entity_list = ['Countries', 'Occupations', 'politicalFigures', 'syntheticNames']
    else:
        entity_list = [entity]

    if jailbreak_type == 'all':
        jailbreak_list = ['machiavelli', 'icl']
    else:
        jailbreak_list = [jailbreak_type]

    # Colors per entity
    entity_colors = {
        'Countries': 'red',
        'Occupations': 'blue',
        'politicalFigures': 'green',
        'syntheticNames': 'purple'
    }

    # Markers per jailbreak type
    jailbreak_markers = {
        'machiavelli': 'o',  # circle
        'icl': 'x'           # cross
    }

    # Mapping for display labels in the legend
    jailbreak_display_names = {
        'machiavelli': 'AIM',
        'icl': 'ICL'
    }

    # Result type on the y-axis -> how to compute it; dict order = panel order
    y_score_fns = {
        'specific': lambda ent, jb: regular_max(model, ent, jb, 'specific'),
        'base_to_instruct': lambda ent, jb: base_to_instruct(model, ent, jb),
        'bradley_terry': lambda ent, jb: bradley_terry(model, ent, jb, 'main'),
    }

    fig, axs = plt.subplots(1, 3, figsize=(18, 9), sharex=False, sharey=False)
    fig.suptitle(f"{model} Result Correlations", fontsize=18)

    # The x-axis ('main') scores are identical for all three panels, so each
    # (entity, jailbreak) pair is computed once and reused.
    main_scores = {}

    for ax, (results_y, y_score_fn) in zip(axs, y_score_fns.items()):
        all_results_x = []
        all_results_y = []

        for ent in entity_list:
            for jb_type in jailbreak_list:
                key = (ent, jb_type)
                if key not in main_scores:
                    main_scores[key] = regular_max(model, ent, jb_type, 'main')
                x_scores = main_scores[key]
                y_scores = y_score_fn(ent, jb_type)

                # Keep only attributes with a valid value on both axes
                common_keys = sorted(
                    k for k in (set(x_scores) & set(y_scores))
                    if not (pd.isna(x_scores[k]) or pd.isna(y_scores[k]))
                )

                x_vals = [x_scores[k] for k in common_keys]
                y_vals = [y_scores[k] for k in common_keys]

                # Names outside the tables above get a neutral fallback style
                # instead of a KeyError after the scores have been loaded.
                color = entity_colors.get(ent, 'gray')
                marker = jailbreak_markers.get(jb_type, 's')
                display_jb_type = jailbreak_display_names.get(jb_type, jb_type)

                all_results_x.extend(x_vals)
                all_results_y.extend(y_vals)

                ax.scatter(x_vals, y_vals, color=color, marker=marker,
                           label=f'{ent} ({display_jb_type})', alpha=0.7)

        # Axes labeling and correlation
        ax.set_title(f'main vs. {results_y}', fontsize=16)
        ax.set_xlabel('main results', fontsize=16)
        ax.set_ylabel(f'{results_y} results', fontsize=16)
        ax.tick_params(axis='both', labelsize=14)

        # A rank correlation needs at least two points
        if len(all_results_x) >= 2:
            corr = spearmanr(all_results_x, all_results_y)[0]
        else:
            corr = float('nan')
        ax.text(
            0.02, 0.98, f'Spearman r = {corr:.3f}',
            transform=ax.transAxes,
            fontsize=11,
            verticalalignment='top',
            bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8, edgecolor='gray')
        )

    # Add single combined legend (taken from the first panel)
    handles, labels = axs[0].get_legend_handles_labels()
    fig.legend(handles, labels, loc='lower center', ncol=4, fontsize=14)
    plt.tight_layout(rect=[0, 0.1, 1, 0.98])  # reserve space for bottom legend + top title
    if save:
        os.makedirs('plots/results_corrs', exist_ok=True)
        filename = f'plots/results_corrs/{model.split("/")[-1]}_{jailbreak_type}_results_corrs.pdf'
        plt.savefig(filename, bbox_inches='tight')
        print(f"Saved plot to {filename}")
    plt.show()