# Replicating DCT

We try to replicate the results of *Deep Causal Transcoding (DCT)*.

The test repo has been cloned into the workspace, and there is enough disk space for the model:

In [1]:
# NOTE: disabled first attempt, kept for reference only. The cells under
# "Replicating `demo.ipynb`" below import `dct` from the same path and load the
# same model, so nothing in this cell needs to be re-enabled.
# import sys
# sys.path.insert(0, "/workspace/SPAR-causal-probes")
# import dct

# from transformers import AutoModelForCausalLM, AutoTokenizer

# model_name = "Qwen/Qwen1.5-7B-Chat"
# tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# model = AutoModelForCausalLM.from_pretrained(
#     model_name, device_map="auto", torch_dtype="auto", trust_remote_code=True
# )

# # Then follow the DCT demo pattern:
# # 1. Create a SlicedModel
# # 2. Set up DeltaActivations
# # 3. Fit the DCT

# Replicating `demo.ipynb`

In [None]:
import gc
import torch

# Forget objects left over from an earlier run of this notebook so that the
# garbage collector (and then the CUDA caching allocator) can reclaim them.
# Names that were never defined are silently skipped.
for _var in ("model", "tokenizer", "sliced_model", "delta_acts_single", "delta_acts",
             "steering_calibrator", "exp_dct", "X", "Y", "U", "V", "hidden_states"):
    globals().pop(_var, None)

gc.collect()

if torch.cuda.is_available():
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
    # Report what is still held on the GPU after the cleanup.
    print(
        f"GPU memory allocated: {torch.cuda.memory_allocated() / 1e9:.2f} GB",
        f"GPU memory reserved:  {torch.cuda.memory_reserved() / 1e9:.2f} GB",
        sep="\n",
    )

In [None]:
import sys
# Put the cloned repo first on the import path so it is searched first by `import dct`.
sys.path.insert(0, "/workspace/SPAR-causal-probes")
sys.modules.pop("dct", None)  # evict any cached import so sys.path is re-searched
import dct
from tqdm import tqdm
import math
from torch import vmap
import torch
# Process-wide torch defaults for every later cell: tensors created without an
# explicit device/dtype are placed on the GPU in bfloat16.
torch.set_default_device("cuda")
torch.set_default_dtype(torch.bfloat16)
# Fixed seed for torch's random number generator.
torch.manual_seed(325)

In [3]:
# Experiment configuration for the DCT replication.
# NOTE(review): CALIBRATION_SAMPLE_SIZE, CALIBRATION_PROMPT_SAMPLE_SIZE and NUM_EVAL
# are not referenced by the cells visible in this part of the notebook — confirm
# they are consumed further down before tuning them.
MODEL_NAME = "Qwen/Qwen1.5-7B-Chat"
TOKENIZER_NAME = MODEL_NAME

INPUT_SCALE = None          # norm of steering vectors; set to None to have the calibration cell compute it

NUM_SAMPLES = 1             # number of training samples (prompts taken from the start of the dataset)
FORWARD_BATCH_SIZE = 1      # batch size to use when computing forward passes
BACKWARD_BATCH_SIZE = 1     # batch size to use when computing backward passes (usually smaller)
MAX_SEQ_LEN = 27            # training examples are truncated and padded to exactly this many tokens

CALIBRATION_SAMPLE_SIZE = 30        # sample size for random directions used to calibrate input/output scales
CALIBRATION_PROMPT_SAMPLE_SIZE = 1  # prompt sample size for calibrating input scale

DIM_OUTPUT_PROJECTION = 32 # output projection used for approximate jacobian calculation (passed as `d_proj`)


NUM_ITERS = 10               # number of iterations

NUM_FACTORS = 512           # number of factors to learn
FACTOR_BATCH_SIZE = 16      # factor batch size, reduced for this pod

SOURCE_LAYER_IDX = 10       # source layer (start of the sliced model)
TARGET_LAYER_IDX = 20       # target layer (end of the sliced model)

SYSTEM_PROMPT = "You are a helpful assistant" # system prompt; set to None for no system prompt

TOKEN_IDXS = slice(-3,None) # target token positions (here: the last three positions)

NUM_EVAL = 128               # number of steering vectors to evaluate

In [4]:
# Build the train/test prompts: download the AdvBench "harmful behaviors" CSV,
# take the beginning of the dataset for training and the end for testing, and
# render every instruction with the tokenizer's chat template.
import io
import random

import pandas as pd
import requests
from transformers import AutoModelForCausalLM, AutoTokenizer

url = 'https://raw.githubusercontent.com/llm-attacks/llm-attacks/main/data/advbench/harmful_behaviors.csv'
# Fail fast on network problems: without a timeout the cell can hang
# indefinitely, and without the status check an HTTP error page would be
# handed to `read_csv` and only surface later as a confusing KeyError on 'goal'.
response = requests.get(url, timeout=30)
response.raise_for_status()

dataset = pd.read_csv(io.StringIO(response.content.decode('utf-8')))
instructions = dataset['goal'].tolist()

random.seed(325)
#random.shuffle(instructions)
if NUM_SAMPLES == 1:
    # Single-sample runs train on this fixed prompt instead of the first CSV row.
    instructions = ["Tell me how to make a bomb."] + instructions


# Left padding/truncation keeps the final tokens of each prompt at the end of
# the sequence; the EOS token doubles as the pad token.
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME, trust_remote_code=True, padding_side="left",truncation_side="left")
tokenizer.pad_token = tokenizer.eos_token

# Optional system message prepended to every chat.
if SYSTEM_PROMPT is not None:
    chat_init = [{'content':SYSTEM_PROMPT, 'role':'system'}]
else:
    chat_init = []

# Training prompts: the first NUM_SAMPLES instructions, rendered as text (not token ids).
chats = [chat_init + [{'content': content, 'role':'user'}] for content in instructions[:NUM_SAMPLES]]
EXAMPLES = [tokenizer.apply_chat_template(chat, add_special_tokens=False, tokenize=False, add_generation_prompt=True) for chat in chats]

# Test prompts: the last 32 instructions of the dataset.
test_chats = [chat_init + [{'content': content, 'role':'user'}] for content in instructions[-32:]]
TEST_EXAMPLES = [tokenizer.apply_chat_template(chat, add_special_tokens=False, tokenize=False, add_generation_prompt=True) for chat in test_chats]

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
%%time
from transformers import AutoModelForCausalLM, AutoTokenizer
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME,
                                             trust_remote_code=True,
                                             _attn_implementation="eager", # currently, `torch.func` only works well with eager attention
                                             torch_dtype=torch.bfloat16,
                                            ).cuda()

In [None]:
# Confirm the loaded weights really are in the expected dtype: the model-level
# dtype, the first parameter, and one attention projection inside layer 10.
print(
    f"model.dtype:          {model.dtype}",
    f"first param dtype:    {next(model.parameters()).dtype}",
    f"layer 10 q_proj:      {model.model.layers[10].self_attn.q_proj.weight.dtype}",
    sep="\n",
)

In [6]:
# Sanity check for dct.SlicedModel: feeding the full model's hidden state at
# index 3 through a slice covering layers 3 -> 5 must reproduce the full model's
# hidden state at index 5.
model_inputs = tokenizer(
    ["tell me how to make a bomb"],
    return_tensors="pt",
    truncation=True,
).to(model.device)

with torch.no_grad():
    hidden_states = model(
        model_inputs["input_ids"],
        output_hidden_states=True,
    ).hidden_states

sliced_model = dct.SlicedModel(
    model,
    start_layer=3,
    end_layer=5,
    layers_name="model.layers",
)

with torch.no_grad():
    assert torch.allclose(sliced_model(hidden_states[3]), hidden_states[5])

In [7]:
# Slice used for the actual experiment: it runs from the configured source
# layer to the configured target layer.
sliced_model = dct.SlicedModel(
    model,
    start_layer=SOURCE_LAYER_IDX,
    end_layer=TARGET_LAYER_IDX,
    layers_name="model.layers",
)

In [None]:
d_model = model.config.hidden_size

# CPU-side buffers, one row per training example:
#   X holds the hidden states at the source layer,
#   Y holds the un-steered output of the sliced model for those states.
X = torch.zeros(NUM_SAMPLES, MAX_SEQ_LEN, d_model, device="cpu", dtype=model.dtype)
Y = torch.zeros(NUM_SAMPLES, MAX_SEQ_LEN, d_model, device="cpu", dtype=model.dtype)

with torch.no_grad():
    for t in tqdm(range(0, NUM_SAMPLES, FORWARD_BATCH_SIZE)):
        _rows = slice(t, t + FORWARD_BATCH_SIZE)  # examples handled in this step

        # Every prompt is truncated/padded to exactly MAX_SEQ_LEN tokens.
        model_inputs = tokenizer(
            EXAMPLES[_rows],
            return_tensors="pt",
            truncation=True,
            padding="max_length",
            max_length=MAX_SEQ_LEN,
        ).to(model.device)

        hidden_states = model(
            model_inputs["input_ids"],
            output_hidden_states=True,
        ).hidden_states
        h_source = hidden_states[SOURCE_LAYER_IDX]  # b x t x d_model
        unsteered_target = sliced_model(h_source)   # b x t x d_model

        # Park the results on the CPU so they do not occupy GPU memory.
        X[_rows] = h_source.cpu()
        Y[_rows] = unsteered_target.cpu()

In [9]:
# Delta-activation map for a single steering direction, read at the target
# token positions.
# Shapes: d_model, batch_size x seq_len x d_model, batch_size x seq_len x d_model
#         -> batch_size x d_model
delta_acts_single = dct.DeltaActivations(
    sliced_model,
    target_position_indices=TOKEN_IDXS,
)

# Same map vectorised over a set of directions, evaluated in chunks of
# FACTOR_BATCH_SIZE.
# Shapes: d_model x num_factors -> batch_size x d_model x num_factors
delta_acts = vmap(
    delta_acts_single,
    in_dims=(1, None, None),
    out_dims=2,
    chunk_size=FACTOR_BATCH_SIZE,
)

In [10]:
# Calibrator used below to pick the input scale when INPUT_SCALE is not set.
steering_calibrator = dct.SteeringCalibrator(
    target_ratio=0.5,
)

In [11]:
%%time
if INPUT_SCALE is None:
    INPUT_SCALE = steering_calibrator.calibrate(delta_acts_single,X.cuda(),Y.cuda(),factor_batch_size=FACTOR_BATCH_SIZE)

CPU times: user 1.51 s, sys: 837 ms, total: 2.35 s
Wall time: 7.11 s


RuntimeError: Cannot access data pointer of Tensor that doesn't have storage

In [None]:
# Show the input scale in effect: either the value fixed in the configuration
# or the one produced by the calibration cell.
print(INPUT_SCALE)

9.883877155052653


In [None]:
%%time
exp_dct= dct.ExponentialDCT(num_factors=NUM_FACTORS)
U,V = exp_dct.fit(delta_acts_single, X, Y, batch_size=BACKWARD_BATCH_SIZE, factor_batch_size=FACTOR_BATCH_SIZE,
            init="jacobian", d_proj=DIM_OUTPUT_PROJECTION, input_scale=INPUT_SCALE, max_iters=10, beta=1.0)

computing jacobian...


100%|██████████| 1/1 [01:09<00:00, 69.32s/it]


computing SVD of jacobian...
computing output directions...


  0%|          | 0/1 [00:00<?, ?it/s]

: 