### Setup: GPU memory cleanup and imports

In [None]:
import gc, torch

# Purpose of this cell: release GPU memory held by objects left over from a previous run
# of the notebook, so the model can be re-loaded without restarting the kernel.
#
# Step 1 - best effort: move a leftover `model` to the CPU (skipped if it no longer exists).
# The object is always reached through globals()[name] and never bound to a new variable:
# an extra name would be one more reference keeping the model alive after it is popped below.
for name in ["model"]:
    if name in globals() and getattr(globals()[name], "to", None):
        try:
            globals()[name].to("cpu")
        except Exception:
            # Deliberately ignored: the reference is dropped in the next step regardless.
            pass

# Step 2 - drop the notebook-level references to commonly large objects, if they exist.
for name in [
    "model", "tokenizer", "inputs", "outputs", "logits",
    "past_key_values", "activation_cache", "cache", "td"
]:
    if name in globals():
        obj = globals().pop(name)
        # Call whichever teardown method the object offers (trace/hook objects);
        # a missing method or a failing call is ignored on purpose.
        for m in ("close", "reset_hooks", "remove_hooks"):
            try:
                getattr(obj, m)()
            except Exception:
                pass
        # Drop the temporary name too, otherwise `obj` itself would keep the object reachable.
        del obj

# Step 3 - collect the now-unreferenced Python objects first, then ask the CUDA allocator
# on every visible device to give back its cached memory.
gc.collect()
if torch.cuda.is_available():
    for i in range(torch.cuda.device_count()):
        with torch.cuda.device(i):
            torch.cuda.empty_cache()
            torch.cuda.ipc_collect()
# Only a request: memory still referenced from elsewhere (other variables, IPython's
# output history, ...) is not released by this cell.
print("Requested GPU memory cleanup.")

In [None]:
import torch
from transformer_lens import HookedTransformer
from transformers import AutoTokenizer
import numpy as np

from torch.utils.data import Dataset

import os
from pathlib import Path
from collections import OrderedDict
from transformers import AutoModelForCausalLM  # needed because it's in a type annotation



In [36]:
from dataset import ModuleHook

In [None]:
# Make the TalkTuner sources importable and load `ModuleHook` from them.
# `ModuleHook` is instantiated by EmotionTextDataset._get_feats, so a failed import
# here matters later even though this cell itself does not use the class.
import sys
import warnings

TT_SRC = "/workspace/MATS-research/notebooks/TalkTuner-chatbot-llm-dashboard/src"
if TT_SRC not in sys.path:
    # Appended (not prepended) so already-importable packages keep their priority.
    sys.path.append(TT_SRC)
try:
    from dataset import ModuleHook
except Exception as exc:
    # Stay best-effort (do not abort the notebook), but do not fail silently either:
    # a swallowed error here would resurface much later as a NameError on ModuleHook.
    warnings.warn(
        f"Could not import ModuleHook from {TT_SRC!r} ({exc!r}); "
        "code that registers ModuleHook hooks will fail unless it was imported elsewhere.",
        RuntimeWarning,
    )


TalkTuner modules available from: /workspace/MATS-research/notebooks/TalkTuner-chatbot-llm-dashboard/src


In [None]:
# Choose the compute device: the GPU when CUDA is usable, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")


Using device: cuda


`torch_dtype` is deprecated! Use `dtype` instead!
Fetching 4 files: 100%|██████████| 4/4 [02:55<00:00, 43.97s/it] 
Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00, 105.29it/s]


Loaded pretrained model meta-llama/Meta-Llama-3-8B-Instruct into HookedTransformer


### Load Data

In [37]:

def build_messages(user_text: str, control_probe: bool) -> list[dict]:
    """
    Assemble the chat turns (role/content dicts) for one user utterance.

    The conversation always starts with the fixed system prompt followed by the
    user's text. For a reading probe (control_probe falsy) a third, assistant
    turn holding a fixed sentence prefix is appended; for a control probe the
    conversation stops after the user turn.
    """
    system_prompt = (
        "You are a helpful, respectful and honest assistant. "
        "Always answer as helpfully as possible, while being safe. "
        "Please ensure that your responses are socially unbiased and positive in nature. "
        "If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. "
        "If you don't know the answer to a question, please don't share false information."
    )

    conversation = [
        dict(role="system", content=system_prompt),
        dict(role="user", content=user_text),
    ]

    # Control probes carry no assistant prefix: return the two-turn conversation as is.
    if control_probe:
        return conversation

    # Reading probes condition the activations on this fixed assistant prefix.
    conversation.append(
        dict(role="assistant", content="In terms of emotions, I think this user experiences")
    )
    return conversation

def serialize_prompt(tokenizer, messages: list[dict], control_probe: bool) -> str:
    """
    Render chat messages into a single prompt string with the tokenizer's chat template.

    Args:
        tokenizer: object exposing ``apply_chat_template`` (a Hugging Face tokenizer).
        messages: list of ``{"role": ..., "content": ...}`` dicts.
        control_probe: True when ``messages`` ends with the user turn, so an empty
            assistant header must be appended (``add_generation_prompt=True``).
            False when ``messages`` already ends with an assistant prefix.

    Returns:
        The rendered prompt as a string (``tokenize=False``).

    When the conversation ends with an assistant prefix, the template is asked to
    leave that final message open (``continue_final_message=True``). Without it the
    template terminates the assistant turn with its end-of-turn token, and the last
    token of the prompt would be that terminator instead of the end of the prefix
    the activations are meant to be conditioned on.
    NOTE(review): ``continue_final_message`` requires a transformers release that
    supports it in ``apply_chat_template`` - confirm against the pinned version.
    """
    template_kwargs = {"add_generation_prompt": bool(control_probe)}

    # Only continue the final message when it really is the injected assistant turn;
    # any other conversation shape is rendered exactly as before.
    ends_with_assistant = bool(messages) and messages[-1].get("role") == "assistant"
    if not control_probe and ends_with_assistant:
        # Mutually exclusive with add_generation_prompt=True, which is False here.
        template_kwargs["continue_final_message"] = True

    return tokenizer.apply_chat_template(
        messages,
        tokenize=False,  # we want a string back, not token ids (yet)
        **template_kwargs,
    )

# Tokenization converts the human-readable serialized prompt into the numerical IDs (plus attention mask) that the model actually consumes.
def tokenize_prompt(tokenizer, prompt_str: str, max_length: int = 8192,
                    add_special_tokens: bool = False):
    """
    Tokenize an already chat-templated prompt string into PyTorch tensors.

    Args:
        tokenizer: callable Hugging Face-style tokenizer. Side effect: if it has no
            pad token but has an EOS token, the pad token is set to the EOS token.
        prompt_str: prompt text, expected to be the output of the chat template.
        max_length: truncation limit in tokens passed to the tokenizer.
        add_special_tokens: forwarded to the tokenizer. Defaults to False because a
            chat-templated string already contains its special tokens; letting the
            tokenizer add them again can prepend a second beginning-of-text token.
            Pass True only for raw text that has not gone through a chat template.

    Returns:
        Whatever the tokenizer returns for ``return_tensors="pt"`` with
        ``return_attention_mask=True``; for a single string this is expected to hold
        ``input_ids`` and ``attention_mask`` of shape (1, T) - TODO confirm.
    """
    if tokenizer.pad_token is None and tokenizer.eos_token is not None:
        tokenizer.pad_token = tokenizer.eos_token  # common for Llama models

    # NOTE(review): truncation presumably drops tokens from the end of the prompt,
    # i.e. the part last-token probing depends on - verify if prompts can approach
    # max_length.
    return tokenizer(
        prompt_str,
        add_special_tokens=add_special_tokens,
        truncation=True,
        max_length=max_length,
        return_attention_mask=True,
        return_tensors="pt",
    )

def extract_last_token_hidden_states(model, outputs) -> torch.Tensor:
    """
    Collect the final-position hidden vector of every transformer layer.

    Args:
        model: object whose ``config.num_hidden_layers`` gives the layer count.
        outputs: mapping with a ``"hidden_states"`` sequence, as produced by a
            forward pass run with ``output_hidden_states=True``. Entry 0 (the
            embedding output) is skipped; entries 1..num_hidden_layers are read,
            each indexed as (batch, seq_len, hidden_dim).

    Returns:
        A detached float32 CPU tensor made by concatenating, along dim 0, the
        ``[:, -1, :]`` slice of each layer - i.e. (num_layers, hidden_dim) when
        the batch size is 1.
    """
    all_states = outputs["hidden_states"]
    depth = model.config.num_hidden_layers

    # Each slice is moved to the CPU on its own before being joined, so the
    # per-layer tensors never have to share a device.
    final_token_rows = [
        all_states[idx][:, -1, :].detach().cpu().to(torch.float)
        for idx in range(1, depth + 1)
    ]

    return torch.cat(final_token_rows, dim=0)

In [None]:
class EmotionTextDataset(Dataset):
    """
    Map-style dataset of chat-formatted emotion prompts and their last-token activations.

    Every ``.txt`` file directly inside ``directory`` is one class: the file stem is the
    label name and each non-blank line is one user utterance. All prompts are built,
    run through ``model`` and cached in memory while the constructor executes, so
    construction cost grows with the number of lines.

    Args:
        directory: folder containing the per-label ``.txt`` files.
        tokenizer: tokenizer used for chat templating and tokenization.
        model: causal LM that is run with ``output_hidden_states=True``.
        label_to_id: mapping from file stem to integer label. When None, a mapping
            is derived from the alphabetically sorted file stems (0, 1, 2, ...).
        control_probe: forwarded to ``build_messages`` / ``serialize_prompt``.

    Raises:
        KeyError: if a ``.txt`` file's stem is missing from ``label_to_id``.
    """

    def __init__(
            self,
            directory: str,
            tokenizer: AutoTokenizer,
            model: AutoModelForCausalLM,
            label_to_id: dict = None,
            control_probe: bool = False,
            ):
        # os.listdir returns entries in arbitrary order; sorting makes the sample
        # order (and any derived label ids) reproducible across runs and machines.
        self.file_paths = sorted(
            os.path.join(directory, f)
            for f in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, f)) and f.endswith('.txt')
        )
        if label_to_id is None:
            # The signature advertises None as the default, so make it usable.
            label_to_id = {
                Path(path).stem: idx for idx, path in enumerate(self.file_paths)
            }
        self.tokenizer = tokenizer
        self.labels = []
        self.texts = []
        self.acts = []
        self.label_to_id = label_to_id
        self.model = model
        self.control_probe = control_probe
        self._load_in_data()

    def __len__(self) -> int:
        """Number of cached samples (required by samplers / DataLoader)."""
        return len(self.labels)

    def _load_in_data(self):
        """Read every label file and cache prompt text, label id and activations per line."""
        for file_path in self.file_paths:
            with open(file_path, "r", encoding="utf-8") as f:
                # Strip surrounding whitespace and skip blank lines.
                lines = [line.strip() for line in f if line.strip()]

            label_name = Path(file_path).stem
            if label_name not in self.label_to_id:
                raise KeyError(
                    f"No label id for {label_name!r} (from file {file_path!r}); "
                    f"known labels: {sorted(self.label_to_id)}"
                )
            label = self.label_to_id[label_name]

            for raw_text in lines:
                # 1) messages
                messages = build_messages(raw_text, control_probe=self.control_probe)
                # 2) serialize -> string
                prompt_str = serialize_prompt(self.tokenizer, messages, control_probe=self.control_probe)

                # 3) features from the serialized string. Computed before anything is
                # appended so a failing forward pass cannot leave the lists misaligned.
                feats = self._get_feats(prompt_str)

                # keep EXACT text the model will see, for debugging/repro
                self.texts.append(prompt_str)
                self.labels.append(label)
                self.acts.append(feats)

    def _get_feats(self, prompt_str: str) -> torch.Tensor:
        """
        Run one forward pass on ``prompt_str`` and return its per-layer last-token states.

        Forward hooks are registered on every module whose name ends with ``.mlp`` or
        ``.embed_tokens`` for the duration of the pass. What they record is not
        returned at the moment; only the output of
        ``extract_last_token_hidden_states`` is.
        """
        # Step 1. Tokenize serialized prompt string
        enc = tokenize_prompt(self.tokenizer, prompt_str, max_length=8192)

        features = OrderedDict()
        try:
            with torch.no_grad():
                # Step 2. Register hooks on modules of interest
                for name, module in self.model.named_modules():
                    if name.endswith((".mlp", ".embed_tokens")):
                        features[name] = ModuleHook(module)

                # Step 3. Run model forward pass
                outputs = self.model(
                    input_ids=enc["input_ids"].to(self.model.device),
                    attention_mask=enc["attention_mask"].to(self.model.device),
                    output_hidden_states=True,
                    return_dict=True,
                )
        finally:
            # Step 4. Always remove the hooks, even when the forward pass raises
            # (e.g. out of memory); otherwise they would stay attached to the model
            # and pile up on every later call.
            for hook in features.values():
                hook.close()

        # Step 5. Extract last-token hidden states from all layers
        return extract_last_token_hidden_states(self.model, outputs)

    def __getitem__(self, idx: int) -> dict:
        """Return the cached sample ``idx`` as a dict with hidden_states, label and text."""
        label = self.labels[idx]
        text = self.texts[idx]
        last_acts = self.acts[idx]

        return {
            'hidden_states': last_acts,
            'label': label,
            'text': text,
        }


In [39]:
from transformers import AutoTokenizer, AutoModelForCausalLM

# Checkpoint used for both the tokenizer and the model.
model_name = "meta-llama/Meta-Llama-3-8B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
# Llama-3 typically has eos but not pad; set pad to eos for batching
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# `dtype` replaces `torch_dtype`, which this environment reports as deprecated
# ("`torch_dtype` is deprecated! Use `dtype` instead!").
# NOTE(review): older transformers releases only know `torch_dtype` - confirm the pinned version.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    dtype=torch.float16,   # or bfloat16 if supported
    device_map="auto",     # let the library place the weights on the available devices
)

Loading checkpoint shards: 100%|██████████| 4/4 [00:03<00:00,  1.00it/s]


In [40]:
# Re-create the fast tokenizer for `model_name`.
tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    use_fast=True,
)

# Fall back to the EOS token for padding when the checkpoint defines no pad token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
# Label names in id order: the position in this list is the integer class id.
id_to_label = ["happiness", "sadness"]
# Inverse lookup (name -> id), derived from the list above.
label_to_id = {label: idx for idx, label in enumerate(id_to_label)}