# LinearProbes_brightness

Pairwise-controlled linear probing for **brightness** (−50% vs +50%) using COCO val2017 and PaliGemma.

> Labels: 0 = darker (−50%), 1 = brighter (+50%).

In [1]:
import torch
import gc

# Clear PyTorch cache
torch.cuda.empty_cache()

# Delete specific tensors if you have them
# del your_tensor_variable

# Force garbage collection
gc.collect()

# Check VRAM usage
if torch.cuda.is_available():
    print(f"VRAM allocated: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
    print(f"VRAM reserved: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")

VRAM allocated: 0.00 GB
VRAM reserved: 0.00 GB


In [2]:

# ## 0. Config & Setup
# Adjust paths as needed. This notebook uses COCO **val2017** only.

import os, random, json, io
from pathlib import Path

import torch, random
import numpy as np

import random
SEED = 42
def seed_everywhere(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    random.seed(seed)   
seed_everywhere(SEED)
COEFFICIENTS = '_[0.5, 1.5]'
# --- Paths (edit these to match your local files) ---
# ANNO_DIR = '../data/annotations_trainval2017/annotations'
# IMG_DIR  = '../data/val2017'               # COCO val2017 images
OUT_IMG_DIR = f'../data/brightness_pairs{COEFFICIENTS}'   # directory to save brightness-perturbed images
OUT_CSV = f'../data/brightness_dataset{COEFFICIENTS}.csv' # CSV with variants & labels
OUTPUT_DIR = f'../figs_tabs/brightness_probe_pairwise{COEFFICIENTS}'
os.makedirs(OUT_IMG_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Probe params
N_GROUPS = 200                # number of base images to sample (each yields 2 variants)
PAD_TO_MAX = 64               # text max length when extracting LM activations
MODE = "lm"                   # 'lm' 2304 or 'raw' 1152 (vision); we look at the representation difference in LM
MODEL_NAME = 'google/paligemma2-3b-pt-224'

print('Config loaded.')

Config loaded.


In [3]:

# ## 1. Environment Check

import sys, subprocess

def pip_install(pkg):
    try:
        __import__(pkg.split('==')[0].split('[')[0].replace('-', '_'))
    except Exception:
        print(f'Installing {pkg} ...')
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

# Ensure deps (comment out if you manage env separately)
for pkg in [
    "pycocotools",
    "transformers>=4.41.0",
    "torch",
    "pandas",
    "scikit-learn",
    "matplotlib",
    "Pillow",
]:
    try:
        __import__(pkg.split('>=')[0].split('==')[0])
    except Exception as e:
        pip_install(pkg)

print('Environment ready.')

  from .autonotebook import tqdm as notebook_tqdm


Installing scikit-learn ...


[0m

Installing Pillow ...
Environment ready.


[0m

In [4]:
import torch
from transformers import AutoTokenizer, PaliGemmaForConditionalGeneration, AutoProcessor
from typing import List, Optional

device = 'cuda:2' if torch.cuda.is_available() else 'cpu'
print('Using device:', device)

# AMP context helper
class amp_ctx:
    def __init__(self, device='cuda', use_amp=True):
        self.device = device
        self.use_amp = use_amp and (device == 'cuda')
    def __enter__(self):
        if self.use_amp:
            self.ctx = torch.autocast(device_type='cuda', dtype=torch.bfloat16)
            self.ctx.__enter__()
        else:
            self.ctx = None
        return self
    def __exit__(self, exc_type, exc, tb):
        if self.ctx is not None:
            self.ctx.__exit__(exc_type, exc, tb)

# Load model
MODEL_NAME = "google/paligemma-3b-pt-224"
model = PaliGemmaForConditionalGeneration.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True
).to(device).eval()
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)

print('Model loaded:', MODEL_NAME)

# --- MINIMAL STEERING CODE ---

def get_steering_vector(model, tokenizer, pos_prompt: str, neg_prompt: str, layer_idx: int = 15):
    """Extract steering vector from positive/negative prompt pair"""
    
    # Tokenize prompts
    pos_inputs = tokenizer(pos_prompt, return_tensors="pt").to(device)
    neg_inputs = tokenizer(neg_prompt, return_tensors="pt").to(device)
    
    # Forward pass with hooks to capture activations
    pos_acts = []
    neg_acts = []
    
    def hook_fn(acts_list):
        def hook(module, input, output):
            acts_list.append(output[0].detach().clone())  # hidden states
        return hook
    
    with torch.no_grad():
        with amp_ctx(device):
            # Get positive activations
            handle = model.language_model.layers[layer_idx].register_forward_hook(hook_fn(pos_acts))
            _ = model.language_model(**pos_inputs)
            handle.remove()
            
            # Get negative activations  
            handle = model.language_model.layers[layer_idx].register_forward_hook(hook_fn(neg_acts))
            _ = model.language_model(**neg_inputs)
            handle.remove()
    
    # Steering vector = difference in activations (at last token position)
    pos_act = pos_acts[0][0, -1, :]  # [hidden_dim] at the last seq pos
    neg_act = neg_acts[0][0, -1, :]  # [hidden_dim] 
    steering_vec = pos_act - neg_act
    
    return steering_vec

def apply_steering(model, inputs, steering_vec: torch.Tensor, layer_idx: int = 15, strength: float = 1.0):
    """Apply steering vector during generation"""
    device = inputs["input_ids"].device
    dtype = next(model.parameters()).dtype

    # Ensure steering vec matches device & dtype
    steering_vec = steering_vec.to(device=device, dtype=dtype)
    def steering_hook(module, input, output):
        hidden_states = output[0]
        # Add steering to last token position
        hidden_states[0, -1, :] += strength * steering_vec # steer at the last seq pos
        return (hidden_states,) + output[1:]
    
    # Register hook
    handle = model.language_model.layers[layer_idx].register_forward_hook(steering_hook)
    
    try:
        with amp_ctx(device):
            outputs = model.generate(**inputs, max_new_tokens=64, do_sample=True, temperature=0.7)
    finally:
        handle.remove()
    
    return outputs



Using device: cuda:2


Loading checkpoint shards: 100%|██████████| 3/3 [00:00<00:00, 20.46it/s]


Model loaded: google/paligemma-3b-pt-224


In [5]:
# Create steering vector for "happy" vs "sad" sentiment
steering_vec = get_steering_vector(
    model, tokenizer,
    pos_prompt="A very bright picture of a cat sitting on a sofa",
    neg_prompt="A very dark picture of a cat sitting on a sofa",
    layer_idx=15
)

# Test steering on a neutral prompt
test_prompt = "Answer in [bright, dark]. Sam looks out of the window - The sky is"
inputs = tokenizer(test_prompt, return_tensors="pt").to(device)

print("\nOriginal generation:")
with torch.no_grad():
    with amp_ctx(device):
        orig_out = model.generate(**inputs, max_new_tokens=64, do_sample=True, temperature=0.7)
print(tokenizer.decode(orig_out[0], skip_special_tokens=True))

print("\nWith positive steering (strength=2.0):")
steered_out = apply_steering(model, inputs, steering_vec, layer_idx=15, strength=2)
print(tokenizer.decode(steered_out[0], skip_special_tokens=True))

print("\nWith negative steering (strength=-2.0):")
steered_out = apply_steering(model, inputs, steering_vec, layer_idx=15, strength=-2)
print(tokenizer.decode(steered_out[0], skip_special_tokens=True))


Original generation:
Answer in [bright, dark]. Sam looks out of the window - The sky is bright dark

With positive steering (strength=2.0):
Answer in [bright, dark]. Sam looks out of the window - The sky is bright

With negative steering (strength=-2.0):
Answer in [bright, dark]. Sam looks out of the window - The sky is


In [6]:
# ## 4. Activation Extraction (with pad_to_max + 'lm'/'raw' modes)

# Returns list of arrays per layer: [N, seq_len, D]

import numpy as np
from PIL import Image
def _amp_ctx(device, use_amp=True):
    """Return a context manager for autocast if on CUDA, else a no-op."""
    if not use_amp:
        return nullcontext()
    is_cuda = torch.cuda.is_available() and (
        str(device).startswith("cuda") or getattr(getattr(device, "type", None), "__str__", lambda: "")() == "cuda"
        or (hasattr(device, "type") and device.type == "cuda")
    )
    if not is_cuda:
        return nullcontext()
    # Prefer new API if available
    try:
        return torch.autocast("cuda", dtype=torch.float16)
    except Exception:
        # Fallback for older PyTorch
        return torch.cuda.amp.autocast(dtype=torch.float16)
def get_acts_paligemma(
    model, device,
    model_name=MODEL_NAME,
    *, filenames: Optional[List[str]] = None, text: Optional[List[str]] = None,
    batch_size=32, use_amp=True, mode="lm", pad_to_max=None
):
    if (text is not None) and (filenames is not None):
        raise ValueError("Provide either text or image, not both.")

    feats = []
    model.eval()

    # IMAGE branch
    if filenames is not None:
        proc = AutoProcessor.from_pretrained(model_name)
        if mode == "raw":
            model.vision_tower.config.output_hidden_states = True
        else:
            model.language_model.config.output_hidden_states = True

        with torch.inference_mode(), _amp_ctx(device, use_amp):
            for i in range(0, len(filenames), batch_size):
                fbatch = filenames[i:i+batch_size]
                imgs = [Image.open(fp).convert("RGB") for fp in fbatch]

                if mode == "raw":
                    enc = proc(images=imgs, text=["<image>"]*len(imgs), return_tensors="pt")
                    px = enc["pixel_values"].to(device, non_blocking=True)
                    vout = model.vision_tower(pixel_values=px, output_hidden_states=True, return_dict=True)
                    hs = vout.hidden_states  # tuple of layers: [B, seq, 2304]
                    print(f"hs.shape: {hs.shape}")
                else:
                    ############################# direct pass to the full model
                    enc = proc(images=imgs, text=["<image>"]*len(imgs), return_tensors="pt").to(device)
                    out = model(
                                **enc,
                                output_hidden_states=True,
                                return_dict=True
                            )
                    
                    hs_tuple = out.hidden_states   # tuple(len = n_layers), each [B, seq, 2304]
#                     print(f"hs_tuple[0].shape: {hs_tuple[0].shape}")
                    ############################# manual pass to vision_tower -> proj_layer -> language_model (more VRAM demanding)
#                     enc = proc(images=imgs, text=["<image>"]*len(imgs), return_tensors="pt")
#                     px = enc["pixel_values"].to(device, non_blocking=True)
#                     vout = model.vision_tower(pixel_values=px, output_hidden_states=False, return_dict=False)[0]
#                     del px
#                     print(f"vout.shape: {vout.shape}")
#                     proj=model.multi_modal_projector(vout)
#                     del vout                 
#   #                  print(f'shape of input_ids: {enc["input_ids"].shape}, input_ids: {enc["input_ids"]}')
#                     tok_embeds = model.language_model.embed_tokens(enc["input_ids"][...,256:].to(device, non_blocking=True))
#                     print(f"tok_embeds.shape: {tok_embeds.shape}")
#                     inputs_embeds = torch.cat([proj, tok_embeds], dim=1)
#                     print(f"inputs_embeds.shape: {inputs_embeds.shape}")
#                     lm_inputs = {
#                         "inputs_embeds": inputs_embeds,             # vision embeddings
#                         "attention_mask": enc["attention_mask"],
#                     }
#                     del proj
#                     out = model.language_model(**lm_inputs, output_hidden_states=True, return_dict=True)
#                     hs_tuple = out.hidden_states
#                     print(f"hs_tuple[0].shape: {hs_tuple[0].shape}")
#                     del out

                feats.append([h.detach().cpu().float().numpy() for h in hs_tuple])
                del hs_tuple, enc, imgs
                torch.cuda.empty_cache()

    # TEXT branch
    elif text is not None:
        tok = AutoTokenizer.from_pretrained(model_name)
        model.language_model.config.output_hidden_states = True

        with torch.inference_mode(), _amp_ctx(device, use_amp):
            for i in range(0, len(text), batch_size):
                tbatch = text[i:i+batch_size]
                enc = tok(
                    tbatch, return_tensors="pt",
                    padding="max_length" if pad_to_max else True,
                    truncation=True, max_length=pad_to_max
                ).to(device)

                out = model.language_model(**enc, output_hidden_states=True, return_dict=True)
                hs = out.hidden_states  # tuple: [B, seq, 2304]

                feats.append([h.detach().cpu().float().numpy() for h in hs])
                del hs, enc, out
                torch.cuda.empty_cache()
    else:
        raise ValueError("Must provide either filenames or text.")

    # concatenate across batches per layer
    n_layers = len(feats[0])
    layerwise = []
    for l in range(n_layers):
        arrs = [batch[l] for batch in feats]    # list of [B, seq, D]
        layerwise.append(np.concatenate(arrs, axis=0))  # [N, seq, D] (consistent seq if padded)

    return layerwise

In [7]:
# ====================== minimal steering helpers (NEW) ======================
STEER_ALPHA = 1.0  # you can sweep this; positive increases predicted "brightness" if probe learned that.
GEN_SAMPLE_IX = 0  # which test example to use for steered generation capture

def _safe_unit(x: np.ndarray) -> np.ndarray:
    n = np.linalg.norm(x)
    return x / (n + 1e-12)
    

In [8]:
# %% [markdown]
# ## 5. Pairwise-Controlled Probing for Brightness
# Split by base_id (group), extract activations for each variant, and run a linear probe per layer.

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
STRENGTH = 1

df = pd.read_csv(OUT_CSV)  # columns: base_id, variant_path, label, caption, ...

#################################### computation saver ####################################
# import pandas as pd
# import numpy as np

# # Get unique base_ids
# unique_base_ids = df['base_id'].unique()

# # Sample 100 unique base_ids
# np.random.seed(42)  # For reproducibility - remove if you want different results each time
# sampled_base_ids = np.random.choice(unique_base_ids, size=100, replace=False)

# # Filter the dataframe to include only rows with the sampled base_ids
# sampled_df = df[df['base_id'].isin(sampled_base_ids)].copy()

# # Sort by base_id to keep pairs together (optional but nice for viewing)
# sampled_df = sampled_df.sort_values('base_id').reset_index(drop=True)

# print(f"Original dataset: {len(df)} rows with {len(unique_base_ids)} unique base_ids")
# print(f"Sampled dataset: {len(sampled_df)} rows with {len(sampled_df['base_id'].unique())} unique base_ids")
# print(f"Each base_id should have exactly 2 entries: {sampled_df['base_id'].value_counts().unique()}")
# df = sampled_df
#################################### computation saver ####################################

# group-wise split on base_id (prevents identity leakage)
unique_ids = sorted(df['base_id'].unique())
train_ids, test_ids = train_test_split(unique_ids, test_size=0.2, random_state=SEED)

df_tr = df[df['base_id'].isin(train_ids)].reset_index(drop=True)
df_te = df[df['base_id'].isin(test_ids)].reset_index(drop=True)

print(f"Train groups: {len(train_ids)}, Test groups: {len(test_ids)}")
print(f"Train rows: {len(df_tr)}, Test rows: {len(df_te)}")

# extract (images only for brightness), LM space (so D=1152), mean-pool tokens per sample
with torch.inference_mode():
    img_layers_tr = get_acts_paligemma(
        model, device, filenames=df_tr['variant_path'].tolist(),
        mode=MODE, pad_to_max=None
    )
    img_layers_te = get_acts_paligemma(
        model, device, filenames=df_te['variant_path'].tolist(),
        mode=MODE, pad_to_max=None
    )

n_layers = len(img_layers_tr)
layer_ix = list(range(n_layers))

all_rows = []
for layer in layer_ix:
    print(f"layer:{layer}")
    X_tr = img_layers_tr[layer].mean(axis=1)  # [N_train, D]
    y_tr = df_tr['label'].to_numpy()
    X_te = img_layers_te[layer].mean(axis=1)  # [N_test, D]
    y_te = df_te['label'].to_numpy()

    clf = LogisticRegression(max_iter=1000, random_state=SEED).fit(X_tr, y_tr)
    yhat_tr = clf.predict(X_tr)
    yhat_te = clf.predict(X_te)

    tr_acc = accuracy_score(y_tr, yhat_tr)
    te_acc = accuracy_score(y_te, yhat_te)
    te_f1  = f1_score(y_te, yhat_te, average="macro")
    
    ################################### steering with weights of the linear layer of the probe of this layer of PaliGemma ########################
    W = torch.tensor(clf.coef_.ravel(), dtype=torch.float32) # 2304
#     W = W.view(1, 1, -1) # (1, 1, d_hidden)
    b = clf.intercept_.copy()
#     print(W)
#     print(W.shape)
#     print(b)
#     print(b.shape)
    # Test steering on a neutral prompt
    test_prompt = "Answer in [bright, dark]. Sam looks out of the window - The sky is"
    inputs = tokenizer(test_prompt, return_tensors="pt").to(device)
    
    orig_out = model.generate(**inputs, max_new_tokens=64, do_sample=True, temperature=0.7)
    time_factors = [0, 1, 2, 5, 10, 20]
#     steered_outs = {}
    
    for time_factor in time_factors:
        steered_out = apply_steering(model, inputs, W, layer_idx=layer, strength=STRENGTH*time_factor)
        steered_out = tokenizer.decode(steered_out[0], skip_special_tokens=True)
        all_rows.append({f'steered_outs_{STRENGTH*time_factor}':steered_out})
        print(f"layer: {layer}, steered: {steered_out}, strength: {STRENGTH*time_factor}")
    
#     hooks = [] # Where I want to hook, MLP, residual, etc.
    ################################# steering with weights of the linear layer of the probe of this layer of PaliGemma ########################

    print(f"Layer {layer:2d} | Train {tr_acc:.4f} | Test {te_acc:.4f} | F1 {te_f1:.4f}")
    all_rows.append({"layer": layer, "train_acc": tr_acc, "test_acc": te_acc, "test_f1": te_f1})


# Save results and plot
res_df = pd.DataFrame(all_rows)
res_csv = str(Path(OUTPUT_DIR) / "results.csv")
res_plot = str(Path(OUTPUT_DIR) / "accuracy_f1_curve.png")
res_df.to_csv(res_csv, index=False)

plt.figure(figsize=(9,5))
plt.plot(res_df["layer"], res_df["train_acc"], label="Train Acc")
plt.plot(res_df["layer"], res_df["test_acc"],  label="Test Acc")
plt.plot(res_df["layer"], res_df["test_f1"],   label="Test F1", linestyle="--", marker="o")
# highlight layer 0
# if 0 in res_df["layer"].values:
#     i0 = res_df.index[res_df["layer"]==0][0]
#     plt.scatter([0], [res_df.loc[i0, "test_acc"]], s=60, edgecolors="k", label="Layer 0 (Test Acc)")
plt.xlabel("Layer"); plt.ylabel("Score"); plt.title("Brightness Probe (Pairwise-Controlled)")
plt.legend(); plt.grid(True)
plt.savefig(res_plot, dpi=150); plt.close()

print(f"Saved results -> {res_csv}\nSaved plot -> {res_plot}")

Train groups: 400, Test groups: 100
Train rows: 800, Test rows: 200


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


layer:0
layer: 0, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is dark, strength: 0
layer: 0, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright and dark, strength: 1
layer: 0, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright dark, strength: 2
layer: 0, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is, strength: 5
layer: 0, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright, strength: 10
layer: 0, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is in with with for  download, strength: 20
Layer  0 | Train 1.0000 | Test 0.9350 | F1 0.9350
layer:1
layer: 1, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright dark, strength: 0
layer: 1, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is, strength: 1
layer: 1, steered: Answer in [bright, dark]. Sam looks out of

layer: 10, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright and, strength: 20
Layer 10 | Train 1.0000 | Test 0.9300 | F1 0.9300
layer:11
layer: 11, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright, strength: 0
layer: 11, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is, strength: 1
layer: 11, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright and dark, strength: 2
layer: 11, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is, strength: 5
layer: 11, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is bright dark, strength: 10
layer: 11, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is dark dream, strength: 20
Layer 11 | Train 1.0000 | Test 0.9050 | F1 0.9050
layer:12
layer: 12, steered: Answer in [bright, dark]. Sam looks out of the window - The sky is dark, strength: 0
layer: 12, steere

IndexError: index 18 is out of range

In [None]:
# 1) Make sure you're actually on PaliGemma 2 artifacts:
print(model.__class__.__name__)                       # PaliGemmaForConditionalGeneration
print(type(model.language_model).__name__)                # Gemma2ForCausalLM
print(model.config.text_config.model_type)            # 'gemma2'

# 2) Shapes that betray Gemma 2:
print(model.text_model.model.embed_tokens.weight.shape)  # [~256k, 2304]


In [None]:
# Weird enough, when using PaliGemmaForConditionalGeneration, the d_hidden is 2048 instead of 2504 for language_model
# Because that's a Gemma instead of Gemma2! see model printdown

In [None]:
res_df = pd.read_csv(res_csv)
res_df.steered_out_2 = res_df.steered_out_2.apply(lambda x:tokenizer.decode(eval(x)[0])).tolist()
res_df.steered_out_4 = res_df.steered_out_4.apply(lambda x:tokenizer.decode(eval(x)[0])).tolist()
# res_df['steered_out_2'].tolist(), res_df['steered_out_4'].tolist(), res_df['steered_out_2'] == res_df['steered_out_4']

In [None]:
X_tr.shape

In [None]:
img_layers_tr[0].shape

In [None]:
model