# Steering with previously calculated directions
We apply activation addition to steer the generated text into positive and negative concept directions respectively.
We evaluate the generated text on coherence and content.

### User data
You need to specify the current working directory and the huggingface [access token](https://huggingface.co/docs/hub/security-tokens) to use this notebook.

In [None]:
# modify to your current working directory (the directory where this notebook is)
cwd = "exploring_directions"

# Enter your authentication token from huggingface and press enter to access the models.
# getpass is used instead of input() so the typed token is not echoed into the
# notebook output, where it would be stored together with the notebook.
from getpass import getpass

auth_token = getpass("Hugging Face access token: ").strip()

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSequenceClassification, AutoConfig
from tqdm import tqdm
import pandas as pd
import gc
import seaborn as sns
import matplotlib.pyplot as plt
import pickle
import re
import math
import numpy as np
import os
import torch.nn.functional as F

In [None]:
# import the project-local modules (wrapping, utils)
import sys
import importlib
# make the "modules" folder inside the working directory importable

sys.path.append(os.path.join(cwd, "modules"))
import wrapping
import utils

# reload both modules so that edits to them are picked up when this cell is re-run
importlib.reload(wrapping)
importlib.reload(utils)

# NOTE(review): helpers used further down without a visible definition (make_dir,
# batchify, load_generations, eval_perplexity, ...) presumably come from this star import
from wrapping import WrappedModel
from utils import *

In [None]:
# --- model ---
model_name = "Llama-2-7b-chat-hf"
model_path = "meta-llama/" + model_name
precision = torch.bfloat16

# --- directories (all paths go through make_dir from utils) ---
results_dir = make_dir(os.path.join(cwd, 'results/'))
generations_dir = make_dir(os.path.join(results_dir, 'generations/'))
plots_dir = make_dir(os.path.join(cwd, 'plots/'))
data_dir = make_dir(os.path.join(cwd, 'data/'))

# --- data used as generation prompts ---
data_file = os.path.join(data_dir, 'test_sentences.txt')
num_test_sentences = 500
random_seed = 42

# --- steering / generation ---
calc_generations = True
block_name = "decoder_block"
max_new_tokens = 40  # number of tokens generated while steering
layer_ids = [0, 5, 10, 15, 20, 25, 30]  # layers that get steered
batch_size = 128

# The directions of the different methods have different norms, so the steering
# coefficients are taken from a single reference method whose norms relate to
# actual differences in the hidden layers.
norm_method = "ClassMeans"
# The ClassMeans norms are used as coefficients, but utility is based on
# differences, so the norm is divided by 2.
multiplier = 0.5

# --- evaluation switches (set to True to evaluate the generated data) ---
evaluate_perplexity = True
evaluate_sentiment = True

# --- compute device ---
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"device: {device}")

## Load data

We need prompts to start the generation from: the beginning of a scenario, for which the model then generates the ending while a steering vector is added. To get them, we take sentences from the utility test set that are easily divisible into two parts, throw away the second part and use the first part as the generation seed.

In [None]:
# build the prompt file from the utilitarianism test split
util_data_dir = os.path.join(cwd, 'data/ethics/utilitarianism/')
X_test = load_util_data(data_dir=util_data_dir, split='test')
X_test, y_test = mix_util_pairs(X_test)

# only the first column of X_test is searched for usable sentences
test_data_idxs, test_sentences = find_two_sentences(
    X_test[:, 0],
    split_str1=".",
    split_str2=",",
    larger_than1=2,
    larger_than2=1,
)

# one sentence per line
with open(data_file, "w") as f:
    f.writelines(sentence + " \n" for sentence in test_sentences)

In [None]:
# load the prompts (one per line, surrounding whitespace stripped)
with open(data_file, 'r') as f:
    test_sentences = [line.strip() for line in f]

# Draw a reproducible random subset. The sample size is capped at the number of
# available sentences: np.random.choice with replace=False raises a ValueError
# when asked for more samples than there are items.
np.random.seed(random_seed)
num_selected = min(num_test_sentences, len(test_sentences))
if num_selected < num_test_sentences:
    print(f"only {num_selected} test sentences available, requested {num_test_sentences}")
idxs = np.random.choice(len(test_sentences), num_selected, replace=False)
test_sentences = [test_sentences[idx] for idx in idxs]

# preview; slicing avoids an IndexError when fewer than 10 sentences were selected
for sentence in test_sentences[:10]:
    print(sentence)

## Load model

In [None]:
# load model
# The weights are loaded directly in the target precision (torch_dtype) instead of
# being loaded in float32 and cast afterwards, and the model is then moved to
# `device` explicitly. device_map="auto" is no longer combined with .to(): a model
# that accelerate has dispatched over several devices is not meant to be moved.
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    token=auth_token,
    torch_dtype=precision,
    low_cpu_mem_usage=True,
).to(device)
model.eval()

# load tokenizer (a tokenizer holds no weights, so it takes no device_map)
tokenizer = AutoTokenizer.from_pretrained(model_path, token=auth_token)
# reuse EOS as the padding token and pad on the left, so that in a batch the
# generated tokens directly follow the end of every prompt
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = 'left'

num_hidden_layers = model.config.num_hidden_layers
hidden_size = model.config.hidden_size

# create wrapped model (WrappedModel comes from the project's wrapping module)
wrapped_model = WrappedModel(model, tokenizer)

# Steering

### Load directions

In [None]:
# read the pickled directions
# NOTE(review): pickle.load executes code from the file - only load files you created yourself
directions_file = os.path.join(results_dir, f'directions_{model_name}.pkl')
with open(directions_file, "rb") as f:
    all_directions = pickle.load(f)

# drop the random directions if they are present
all_directions.pop("Random", None)

method_names = list(all_directions)
print(method_names)

### Define multipliers for scaling the steering vectors
We need to define the scaling coefficient for each layer separately. We can take the norms of one method that relates to actual differences in the hidden layers, for example the class means method, as coefficients for all methods. As the class mean norm is the difference between high-utility and low-utility examples, but we are starting from neutral, we multiply by 0.5.

In [None]:
# directions of the reference method as tensors on the model's device/precision
norm_directions = {
    layer: torch.tensor(vec).to(device=device, dtype=precision)
    for layer, vec in all_directions[norm_method].items()
}

# steering coefficient per key of norm_directions: scaled norm of its direction
coeffs = {layer: multiplier * vec.norm().squeeze() for layer, vec in norm_directions.items()}

# plot the coefficients (as plain floats)
coeff_values = {layer: coeff.item() for layer, coeff in coeffs.items()}
df_coeffs = pd.DataFrame.from_dict(coeff_values, orient='index', columns=['coeff'])
ax = df_coeffs.plot(kind='line', marker='o', figsize=(8, 5))

ax.set_xlabel('Layer')
ax.set_ylabel(f"Norm '{norm_method}'")
ax.grid(True)
# there is only one line, so the legend is hidden
ax.legend().set_visible(False)

plot_file = os.path.join(plots_dir, f"norm_{norm_method}_{model_name}.png")
plt.savefig(plot_file, dpi=300)

### Completion with activation addition

We add the scaled steering vectors for each method and layer separately in the positive and negative direction respectively and generate new tokens for each starting sentence in `test_sentences`.

In [None]:
def _generate_all(sentences):
    """Return the completions for all `sentences`, generated batch by batch.

    The completions are produced with whatever state `wrapped_model` is in at
    call time (unwrapped, or with a steering vector set by the caller).
    """
    completions = []
    for sentence_batch in batchify(sentences, batch_size):
        completions.extend(
            wrapped_model.generate(sentence_batch, max_new_tokens=max_new_tokens, use_cache=True)
        )
    return completions


def _save_generations(file_name, texts):
    """Write `texts` to `file_name` inside `generations_dir`, one generation per line."""
    with open(os.path.join(generations_dir, file_name), "w") as f:
        for text in texts:
            # Flatten line breaks so that line i of the file is generation i.
            # "\r" is replaced as well: reading a file in text mode treats a lone
            # "\r" as a line ending, which would split one generation in two.
            f.write(text.replace("\r", " ").replace("\n", " ") + "\n")


if calc_generations:

    # completions without activation steering
    wrapped_model.unwrap()
    _save_generations("generations_neutral.txt", _generate_all(test_sentences))

    for method_name in method_names:

        print(f"method: {method_name}")

        for layer_id in tqdm(layer_ids):

            # wrap only the block that is steered in this iteration
            wrapped_model.unwrap()
            wrapped_model.wrap_block(layer_id, block_name=block_name)

            # normalise the direction; its magnitude is given by coeffs[layer_id]
            direction = torch.tensor(all_directions[method_name][layer_id]).to(device=device, dtype=precision)
            direction = direction / direction.norm(dim=-1, keepdim=True)

            # steer along the direction (positive) and against it (negative)
            for label, sign in (("positive", 1), ("negative", -1)):
                wrapped_model.reset()
                wrapped_model.set_to_add(layer_id, sign * coeffs[layer_id] * direction, block_name=block_name)
                _save_generations(
                    f"generations_{label}_{method_name}_{layer_id}.txt",
                    _generate_all(test_sentences),
                )

    # do not leave the last steering configuration active after this cell
    wrapped_model.unwrap()

# Calculate perplexity to evaluate the coherence of the generated text

Using the original (non-steered) model, we sum over the log probability of each generated sentence.

In [None]:
# perplexity is evaluated with the unwrapped model
wrapped_model.unwrap()
# NOTE: this overwrites the batch size that the generation cell uses
batch_size = 32

if evaluate_perplexity:
    perplexities = {}

    # calculate perplexities for positive and negative generations
    for method_name in method_names:
        perplexities[method_name] = {}
        print(f"method: {method_name}")
        for layer_id in tqdm(layer_ids):
            gc.collect()

            generation_files = [
                os.path.join(generations_dir, f"generations_{label}_{method_name}_{layer_id}.txt")
                for label in ("positive", "negative")
            ]

            # skip this layer unless BOTH files exist; checking only the positive
            # file would let a missing negative file raise while loading
            missing_files = [path for path in generation_files if not os.path.exists(path)]
            if missing_files:
                for missing_file in missing_files:
                    print(f"File not found: {missing_file}")
                continue

            # load generations (positive and negative are evaluated together)
            all_generations = []
            for generation_file in generation_files:
                all_generations.extend(load_generations(generation_file))

            perplexities[method_name][layer_id] = eval_perplexity(all_generations, batch_size, tokenizer, wrapped_model, device)

    # calculate perplexities for test set (X_test is created in the "make data" cell)
    print(f"method: TestSet")
    perplexities['TestSet'] = {}
    perp = eval_perplexity(list(np.concatenate([X_test[:,0], X_test[:,1]])), batch_size, tokenizer, wrapped_model, device)
    # the value does not depend on the layer; it is stored once per layer id
    for layer_id in layer_ids:
        perplexities['TestSet'][layer_id] = perp

    # calculate perplexities for neutral generations
    print(f"method: NoSteering")
    perplexities['NoSteering'] = {}
    neutral_file = os.path.join(generations_dir, "generations_neutral.txt")
    if os.path.exists(neutral_file):
        all_generations = load_generations(neutral_file)
        perp = eval_perplexity(all_generations, batch_size, tokenizer, wrapped_model, device)
        for layer_id in layer_ids:
            perplexities['NoSteering'][layer_id] = perp
    else:
        # same handling as for missing steered generations
        print(f"File not found: {neutral_file}")

    # save perplexities
    with open(os.path.join(results_dir, f'perplexity_{model_name}.pkl'), "wb") as f:
        pickle.dump(perplexities, f)

In [None]:
# read the stored perplexities back and draw one line per method
perplexity_file = os.path.join(results_dir, f'perplexity_{model_name}.pkl')
with open(perplexity_file, "rb") as f:
    probs = pickle.load(f)

perplexity_plot_file = os.path.join(plots_dir, f"perplexity_{model_name}.png")
plot_lines(
    probs,
    "Perplexity",
    perplexity_plot_file,
    method_names=probs.keys(),
    loc='upper right',
)

# Sentiment analysis with sentiment model

We do sentiment analysis with a classifier based on the RoBERTa model. There are three output classes: negative, neutral and positive. We focus on the probability for the positive output and check that positively steered generated text has a higher positive output than negatively steered generated text.

In [None]:
sentiment_model_path = "cardiffnlp/twitter-roberta-base-sentiment-latest"

# The sentiment tokenizer keeps its own padding token and its default padding side.
# Overwriting the pad token with the Llama tokenizer's EOS token and padding on the
# left (settings meant for generation) is wrong for this classifier: RoBERTa's
# classification head reads the first position of the sequence, which would then be
# a padding token for every sequence shorter than the longest one in the batch.
sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_path)

config = AutoConfig.from_pretrained(sentiment_model_path)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_path).to(device=device, dtype=precision)

In [None]:
if evaluate_sentiment:
    sentiment_accs = {}

    for method_name in method_names:
        gc.collect()
        sentiment_accs[method_name] = {}
        print(f"method: {method_name}")
        for layer_id in tqdm(layer_ids):
            gc.collect()

            generation_files = {
                label: os.path.join(generations_dir, f"generations_{label}_{method_name}_{layer_id}.txt")
                for label in ("positive", "negative")
            }

            # skip this layer unless BOTH files exist; checking only the positive
            # file would let a missing negative file raise while loading
            missing_files = [path for path in generation_files.values() if not os.path.exists(path)]
            if missing_files:
                for missing_file in missing_files:
                    print(f"File not found: {missing_file}")
                continue

            # load generations, keyed by steering direction
            generations = {label: load_generations(path) for label, path in generation_files.items()}

            # NOTE(review): batch_size is whatever an earlier cell set last - confirm it is the intended value
            sentiment_accs[method_name][layer_id] = eval_sentiment(generations, batch_size, sentiment_tokenizer, sentiment_model, device)

    # calculate sentiment for test set (X_test / y_test are created in the "make data" cell)
    print(f"method: TestSet")
    sentiment_accs['TestSet'] = {}
    # "positive" takes column 0 of the rows with label 1 and column 1 of the rows with
    # label 0; "negative" takes the respective other column of each row
    generations = {
        "positive": list(np.concatenate([X_test[y_test==1, 0], X_test[y_test==0, 1]])),
        "negative": list(np.concatenate([X_test[y_test==1, 1], X_test[y_test==0, 0]])),
    }

    accs = eval_sentiment(generations, batch_size, sentiment_tokenizer, sentiment_model, device)
    # the value does not depend on the layer; it is stored once per layer id
    for layer_id in layer_ids:
        sentiment_accs['TestSet'][layer_id] = accs

    # save accs
    with open(os.path.join(results_dir, f'sentiment_accs_{model_name}.pkl'), "wb") as f:
        pickle.dump(sentiment_accs, f)

In [None]:
# read the stored sentiment accuracies back and draw one line per method
sentiment_file = os.path.join(results_dir, f'sentiment_accs_{model_name}.pkl')
with open(sentiment_file, "rb") as f:
    sentiment_accs = pickle.load(f)

sentiment_plot_file = os.path.join(plots_dir, f"sentiment_accs_{model_name}.png")
plot_lines(
    sentiment_accs,
    "Sentiment accuracy",
    sentiment_plot_file,
    method_names=sentiment_accs.keys(),
)