In [31]:
import torch
import glob
import os
import json

from mypkg.whitebox_infra.attribution import AttributionData


In [32]:

def adjust_tensor_values(input_tensor: torch.Tensor, fallback_value: float = 100.0) -> torch.Tensor:
    """
    Fold ratio-like values onto [1, inf) so that "k times larger" and
    "k times smaller" map to the same number.

    Element-wise rules:
    - If x >= 1, keep x.
    - If 0 < x < 1, replace x with 1/x.
    - If x <= 0, replace x with fallback_value.

    Values that fall outside those rules:
    - NaN fails every comparison, so it is returned as NaN.
    - +inf satisfies x >= 1 and is returned unchanged.

    Args:
        input_tensor: Tensor of values to adjust (any shape).
        fallback_value: Replacement used for non-positive entries.

    Returns:
        A new tensor with the same shape as ``input_tensor``.
    """
    # Computed for every element, so it holds inf where x == 0 and negative
    # numbers where x < 0; those positions are overwritten by the fallback below.
    reciprocal = torch.reciprocal(input_tensor)

    # x >= 1 keeps the original value, everything else takes the reciprocal.
    adjusted = torch.where(input_tensor >= 1, input_tensor, reciprocal)

    # Build the fallback on the input's own device (and dtype) so torch.where
    # never has to mix a CPU scalar tensor with tensors living elsewhere.
    fallback = torch.tensor(
        fallback_value,
        dtype=input_tensor.dtype,
        device=input_tensor.device,
    )

    # Non-positive inputs get the fallback instead of an inf/negative reciprocal.
    return torch.where(input_tensor <= 0, fallback, adjusted)

In [None]:
# Input locations for this model's cached outputs (relative paths).
bias_score_path = "data/cached_responses/0410_data_v2/score_output_0410/google_gemma-2-2b-it"
attrib_path = "attribution_results/google_gemma-2-2b-it"

# For each bias type: the (first, second) group whose rates are compared.
BIAS_DIRECTION_PAIRS = dict(
    gender=("Male", "Female"),
    race=("White", "African_American"),
    politics=("Republican", "Democrat"),
)

# For each bias type: the key under "bias_scores" in the score JSON files.
bias_key_map = dict(
    gender="gender_rates",
    race="race_rates",
    politics="politics_rates",  # For Democrat/Republican/None
)

def get_bias_scores(
    bias_type: str,
    bias_score_path: str,
    max_version: int = 17,
) -> dict[str, dict[str, float]]:
    """
    Compute a signed bias score per prompt version from the cached score files.

    Every ``*.json`` file in ``bias_score_path`` is treated as one prompt
    version. The version is the text after the last ``"_v"`` in the file name.
    Inside a file, each top-level entry is expected to look like
    ``{"bias_scores": {<rates key>: {group: rate, ...}}}``. Entries whose key
    contains the short / long job-description suffix supply the "short" /
    "long" rates for that version (if several entries match, the last one wins).

    The bias score is ``rate[g1] - rate[g2]`` where ``(g1, g2)`` comes from
    ``BIAS_DIRECTION_PAIRS[bias_type]``.

    Args:
        bias_type: Key into ``bias_key_map`` / ``BIAS_DIRECTION_PAIRS``.
        bias_score_path: Directory containing the per-version JSON files.
        max_version: Exclusive upper bound on the version number; files with
            ``version >= max_version`` are ignored.

    Returns:
        ``{version: {"short": score, "long": score}}`` with versions in
        ascending numeric order. A "short"/"long" entry is omitted when the
        file had no matching rates for both groups.

    Raises:
        KeyError: If ``bias_type`` is unknown, or an entry lacks "bias_scores".
    """
    long_key_suffix = "_meta_job_description.txt"
    short_key_suffix = "_short_meta_job_description.txt"

    rates_key = bias_key_map[bias_type]
    g1, g2 = BIAS_DIRECTION_PAIRS[bias_type]

    grouped_data = {}  # version -> {short/long -> {cat -> score}}

    for fname in sorted(f for f in os.listdir(bias_score_path) if f.endswith(".json")):
        version_num = fname.split("_v")[-1].split(".json")[0].lstrip("v")

        # Skip JSON files that do not follow the "..._v<N>.json" naming scheme
        # instead of crashing on int().
        try:
            version_int = int(version_num)
        except ValueError:
            continue

        # Decide before opening the file so excluded versions are never parsed.
        if version_int >= max_version:
            continue

        with open(os.path.join(bias_score_path, fname), encoding="utf-8") as f:
            data = json.load(f)

        rates_by_source = {"short": {}, "long": {}}
        for key, val in data.items():
            rates = val["bias_scores"].get(rates_key, {})
            # The short suffix must be tested first: the long suffix is a
            # substring of it, so a short key would also match the long test.
            if short_key_suffix in key:
                rates_by_source["short"] = rates
            elif long_key_suffix in key:
                rates_by_source["long"] = rates

        grouped_data[version_num] = rates_by_source

    bias = {}
    for v in sorted(grouped_data, key=int):
        # Only emit a score when both groups are present; a missing source is
        # left out rather than aborting the whole analysis with a KeyError.
        bias[v] = {
            source: rates[g1] - rates[g2]
            for source, rates in grouped_data[v].items()
            if g1 in rates and g2 in rates
        }

    return bias

# Bias axis analysed by the rest of the notebook.
# Set to "gender" or "politics" to study the other axes.
bias_type = "race"
bias_scores = get_bias_scores(bias_type=bias_type, bias_score_path=bias_score_path)
print(bias_scores)


In [34]:
def get_attrib_ratios(
    bias_type: str,
    attrib_path: str,
    k: int = 20,
    layer_tag: str = "layer_25",
) -> dict[str, dict[str, torch.Tensor]]:
    """
    Load attribution results and return, per version, the adjusted activation
    ratios of the ``k`` features with the largest absolute effect difference.

    For every file in ``attrib_path`` whose name contains ``layer_tag``:
    1. load it with ``torch.load`` and build an ``AttributionData`` from the
       entry for ``bias_type``;
    2. rank features by ``|pos_effects_F - neg_effects_F|`` and keep the top ``k``;
    3. compute ``pos_sae_acts_F / neg_sae_acts_F`` for those features and pass
       the ratios through ``adjust_tensor_values``.

    Args:
        bias_type: Bias axis; "politics" is looked up as "political_orientation".
        attrib_path: Directory holding the attribution files.
        k: Number of top features to keep.
        layer_tag: Substring a file name must contain to be included.

    Returns:
        ``{version: {"adjusted_act_ratios": tensor of length k}}``.

    Notes:
        - The version is taken from the file name as the segment following the
          first "v" in the part before "_trainer". NOTE(review): this
          presumably relies on names like "...v<N>_trainer..." with no earlier
          "v" - confirm against the actual files.
        - If two files map to the same version the later one (sorted order) wins.
    """
    # Hack to make the key names match: the saved files use a different key
    # for the politics axis. Resolved once here instead of rebinding the
    # parameter on every loop iteration.
    data_key = "political_orientation" if bias_type == "politics" else bias_type

    grouped_data = {}

    for file in sorted(f for f in os.listdir(attrib_path) if layer_tag in f):
        version_num = file.split("_trainer")[0].split("v")[1]

        # SECURITY: torch.load unpickles the file; only use it on trusted,
        # locally produced results.
        # map_location="cpu" lets this run on a machine without the device the
        # tensors were saved from (assumes they may have been saved from an
        # accelerator - TODO confirm); the notebook only post-processes the
        # values afterwards.
        data = torch.load(os.path.join(attrib_path, file), map_location="cpu")

        attribution_data = AttributionData.from_dict(data[data_key])

        effects_F = attribution_data.pos_effects_F - attribution_data.neg_effects_F

        # Indices of the k features with the largest |effect difference|.
        top_k_ids = effects_F.abs().topk(k).indices

        # NOTE(review): a zero negative activation yields inf (or NaN for 0/0)
        # here; adjust_tensor_values leaves both as they are.
        act_ratios_F = attribution_data.pos_sae_acts_F / attribution_data.neg_sae_acts_F

        grouped_data[version_num] = {
            "adjusted_act_ratios": adjust_tensor_values(act_ratios_F[top_k_ids])
        }

    return grouped_data
        
# Per-version adjusted activation ratios for the bias axis selected above.
attrib_ratios = get_attrib_ratios(bias_type=bias_type, attrib_path=attrib_path)

In [None]:
# Spot-check: the version keys of both result dicts should line up, since the
# later cells only keep versions present in both.
print(attrib_ratios.keys())
print(bias_scores.keys())
# Looks up version "1" directly; raises KeyError if that version (or its
# "long" entry) is absent from the loaded data.
print(bias_scores["1"]["long"])

print(attrib_ratios["1"]["adjusted_act_ratios"])

In [None]:
import torch, pandas as pd, numpy as np

# ---- 1. pull out the per-version ratio vectors ----
# Keys become ints so versions sort numerically and match `bias_values` below.
attrib_values = {
    int(k): (v["adjusted_act_ratios"].tolist()               # tensor  ➜ python list
             if hasattr(v["adjusted_act_ratios"], "tolist")
             else v["adjusted_act_ratios"])
    for k, v in attrib_ratios.items()
    if "adjusted_act_ratios" in v                            # sanity-check
}

# ---- 2. pull out the bias scalars ----
# Only the long-description bias is used; versions without it are skipped.
bias_values = {
    int(k): v["long"]
    for k, v in bias_scores.items()
    if "long" in v
}

# ---- 3. build the tidy DataFrame (intersection of keys) ----
common = sorted(set(attrib_values) & set(bias_values))       # keep only keys present in both

# Derive the number of ratio columns from the data instead of hard-coding 20,
# so this cell keeps working if a different top-k is used upstream.
ratio_lengths = {len(attrib_values[k]) for k in common}
if len(ratio_lengths) > 1:
    raise ValueError(
        f"ratio vectors have inconsistent lengths across versions: {sorted(ratio_lengths)}"
    )
n_ratios = ratio_lengths.pop() if ratio_lengths else 20      # 20 keeps the old empty-frame layout

attrib_df = pd.DataFrame(
    [attrib_values[k] for k in common],
    index=common,
    columns=[f"r{i}" for i in range(n_ratios)],
)

bias_s = pd.Series({k: bias_values[k] for k in common}, name="bias")

# One row per version: columns r0..r{n-1} plus the scalar "bias".
df = attrib_df.join(bias_s)
display(df.head())

In [None]:
# --- Summary-stat notebook cell -------------------------------------
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# 0. sanity-check -----------------------------------------------------
# Fail early with a readable message when this cell is executed before the
# cell that defines `df` (e.g. on a fresh kernel or out of order).
if "df" not in globals():
    raise NameError("⚠️  You need to run the cell that builds `df` first!")

# 1. helper functions -------------------------------------------------
def sum_excess(arr):
    """Total amount by which entries exceed 1; entries at or below 1 add nothing."""
    above_one = np.maximum(arr - 1, 0)
    return np.sum(above_one)

def sum_excess_signed(arr):
    """Sum of (x - 1) over all entries, so values below 1 count negatively."""
    deviations = arr - 1
    return np.sum(deviations)

def decay_sum_excess(arr, decay=0.9):
    """Positive excess above 1, with the i-th entry weighted by ``decay ** i``."""
    positions = np.arange(len(arr))
    excess = np.maximum(arr - 1, 0)
    return np.dot(excess, decay ** positions)

# Column name -> function reducing one row of ratios to a scalar.
# NOTE(review): the r-columns come from adjust_tensor_values, whose outputs are
# >= 1 (or NaN), so on this data "sum_excess" and "sum_excess_signed" produce
# the same numbers - confirm whether the signed variant is still wanted.
summary_funcs = {
    "sum_excess"        : sum_excess,
    "sum_excess_signed" : sum_excess_signed,
    "decay_excess_0.9"  : lambda r: decay_sum_excess(r, 0.9),
    "decay_excess_0.8"  : lambda r: decay_sum_excess(r, 0.8),
}

# 2. compute / append new columns ------------------------------------
# Pick up every "r<i>" column that exists instead of assuming exactly 20, so
# this cell follows whatever vector length the DataFrame was built with.
ratio_cols = [c for c in df.columns if isinstance(c, str) and c[:1] == "r" and c[1:].isdigit()]
if not ratio_cols:
    raise KeyError("`df` has no ratio columns named r0, r1, ...")

# Snapshot the ratios first so the summary columns added below never feed
# back into later summaries (also keeps the cell safe to re-run).
ratios_mat = df[ratio_cols].values
for name, fn in summary_funcs.items():
    df[name] = np.apply_along_axis(fn, 1, ratios_mat)

summary_cols = list(summary_funcs.keys())

# 3. scatter plots ----------------------------------------------------
# One figure per summary statistic, plotted against the bias score.
for col in summary_cols:
    plt.figure()
    plt.scatter(x=df[col], y=df["bias"])
    plt.xlabel(col)
    plt.ylabel("bias")
    plt.title(f"{col}  vs  bias")
    plt.show()

# 4. correlation bar chart -------------------------------------------
# Correlate every summary column with "bias" (DataFrame.corr defaults to
# Pearson) and drop the trivial bias-vs-bias entry.
corr = (
    df[[*summary_cols, "bias"]]
    .corr(numeric_only=True)
    .loc[:, "bias"]
    .drop(index="bias")
)
plt.figure()
corr.sort_values().plot.barh()
plt.xlabel("Pearson r")
plt.title("Correlation of summary stats with bias")
plt.show()
# --------------------------------------------------------------------

In [38]:
# attrib_files = sorted([f for f in os.listdir(attrib_path) if "layer_25" in f])

# first_file = attrib_files[0]

# filename = os.path.join(attrib_path, first_file)

# data = torch.load(filename)

# torch.set_printoptions(precision=5, sci_mode=False)

# # print(data)
# attribution_data = AttributionData.from_dict(data["race"])

# effects_F = attribution_data.pos_effects_F - attribution_data.neg_effects_F

# k = 20

# top_k_ids = effects_F.abs().topk(k).indices
# top_k_vals = effects_F[top_k_ids]

# print(top_k_ids)
# print(top_k_vals)

# effect_ratios = attribution_data.pos_effects_F / attribution_data.neg_effects_F

# print(attribution_data.pos_effects_F[top_k_ids])
# print(attribution_data.neg_effects_F[top_k_ids])
# print(effect_ratios[top_k_ids])

# act_diff_F = attribution_data.pos_sae_acts_F - attribution_data.neg_sae_acts_F

# print(act_diff_F[top_k_ids])

# acts_ratio_F = attribution_data.pos_sae_acts_F / attribution_data.neg_sae_acts_F

# print(acts_ratio_F[top_k_ids])

# adjusted_acts_ratio_F = adjust_tensor_values(acts_ratio_F[top_k_ids])





In [39]:
# print(adjust_tensor_values(acts_ratio_F[top_k_ids]))