<a target="_blank" href="https://colab.research.google.com/github/TransformerLensOrg/TransformerLens/blob/main/demos/Head_Detector_Demo.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# Emotion-Aware Textual Influence Activation Score Detecting

ETIA is the first framework to systematically optimize emotional prompts across multiple open-source LLMs, providing a unified methodology for enhancing emotional alignment and benchmark development. This innovation bridges the gap between prompt engineering and emotional intelligence in LLMs, offering a scalable solution for improving model behavior in emotion-sensitive tasks.

In this notebook we provide simple pipeline of calculating ETIA Scores based on any emotional prompts. For more info and code see [our GitHub ETIA-Framework](https://github.com/NasonovIvan/ETIA-Framework)

Our solution based on a common technique in mechanistic interpretability of transformer-based neural networks - identification of specialized attention heads, based on the attention patterns elicited by one or more prompts. The most basic examples of such heads are: previous token head, duplicate token head, or induction head ([more info](https://dynalist.io/d/n2ZWtnoYHrU1s4vnFSAQ519J#z=_Jzi6YHRHKP1JziwdE02qdYZ)). Usually, such heads are identified manually, by through visualizations of attention patterns layer by layer, head by head, and trying to recognize the patterns by eye.

The purpose of the `TransformerLens.head_detector` feature is to automate a part of that workflow. The pattern characterizing a head of particular type/function is specified as a `Tensor` being a `seq_len x seq_len` [lower triangular matrix](https://en.wikipedia.org/wiki/Triangular_matrix). It can be either passed to the `detect_head` function directly or by giving a string identifying of several pre-defined detection patterns.

And yes, this notebook based on the [Neel Nanda's research](https://www.neelnanda.io/about) and [his wonderful notebook about Head Detector](https://colab.research.google.com/github/TransformerLensOrg/TransformerLens/blob/main/demos/Head_Detector_Demo.ipynb). Thanks Neel for our research work!

## How to use this notebook

Go to Runtime > Change Runtime Type and select GPU as the hardware accelerator.

Tips for reading this Colab:

* You can run all this code for yourself!
* The graphs are interactive!
* Use the table of contents pane in the sidebar to navigate
* Collapse irrelevant sections with the dropdown arrows
* Search the page using the search in the sidebar, not CTRL+F

## Setup (Ignore)

In [None]:
# NBVAL_IGNORE_OUTPUT
# Janky code to do different setup when run in a Colab notebook vs VSCode
import os

DEVELOPMENT_MODE = True
IN_GITHUB = os.getenv("GITHUB_ACTIONS") == "true"
try:
    import google.colab
    IN_COLAB = True
    print("Running as a Colab notebook")
except:
    IN_COLAB = False
    print("Running as a Jupyter notebook - intended for development only!")
    from IPython import get_ipython

    ipython = get_ipython()
    # Code to automatically update the HookedTransformer code as its edited without restarting the kernel
    ipython.magic("load_ext autoreload")
    ipython.magic("autoreload 2")

if IN_COLAB or IN_GITHUB:
    %pip install git+https://github.com/TransformerLensOrg/TransformerLens.git
    # Install Neel's personal plotting utils
    %pip install git+https://github.com/neelnanda-io/neel-plotly.git
    # Install another version of node that makes PySvelte work way faster
    !curl -fsSL https://deb.nodesource.com/setup_16.x | sudo -E bash -; sudo apt-get install -y nodejs
    %pip install git+https://github.com/neelnanda-io/PySvelte.git
    # Needed for PySvelte to work, v3 came out and broke things...
    %pip install typeguard==2.13.3
    %pip install typing-extensions
    %pip install torch==2.5.1

Running as a Colab notebook
Collecting git+https://github.com/TransformerLensOrg/TransformerLens.git
  Cloning https://github.com/TransformerLensOrg/TransformerLens.git to /tmp/pip-req-build-2_hpnxu6
  Running command git clone --filter=blob:none --quiet https://github.com/TransformerLensOrg/TransformerLens.git /tmp/pip-req-build-2_hpnxu6
  Resolved https://github.com/TransformerLensOrg/TransformerLens.git to commit fd38e0f1c931a9c28794773e8d9b0531c77558b9
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting beartype<0.15.0,>=0.14.1 (from transformer-lens==0.0.0)
  Downloading beartype-0.14.1-py3-none-any.whl.metadata (28 kB)
Collecting better-abc<0.0.4,>=0.0.3 (from transformer-lens==0.0.0)
  Downloading better_abc-0.0.3-py3-none-any.whl.metadata (1.4 kB)
Collecting datasets>=2.7.1 (from transformer-lens==0.0.0)
  Downloading datasets-3.2.0-py3-none-any.whl.

In [None]:
# Plotly needs a different renderer for VSCode/Notebooks vs Colab argh
import plotly.io as pio

if IN_COLAB or not DEBUG_MODE:
    # Thanks to annoying rendering issues, Plotly graphics will either show up in colab OR Vscode depending on the renderer - this is bad for developing demos! Thus creating a debug mode.
    pio.renderers.default = "colab"
else:
    pio.renderers.default = "png"

Please, **set here your emotional prompts and list of available emotions** if it exists. *We remind you - the list of emotions is not neccessary for ETIAS calculation.*

You can just have only the list of emotional prompts, but in such case **you have to rewrite the code of prompts-emotions iteration in these functions:**

- `get_emotion_token_indices`
- `compute_etias`
- `compute_diff_etias`
- `evaluate_emotional_prompts`

In [1]:
PROMPTS = {
    "simple1": "Also now you feel {emotion}, which will impact your reasoning.",
    "simple2": "Also now you feel {emotion}, which will affect your responses.",
    "simple3": "Also now you feel {emotion}, which will affect your reactions.",
    "simple4": "Also now you feel {emotion}, which will guide the tone of your advice or suggestions.",
    "simple5": "Also now you feel {emotion}, which will determine the level of optimism in your outlook.",
    "simple6": "Also now you feel {emotion}, which will adjust the clarity and precision of your thoughts.",
    "simple7": "Also now you feel {emotion}, which will affect how you interpret situations and feedback.",
    "simple8": "Also now you feel {emotion}, which will shape the energy behind your explanations or arguments.",
    "simple9": "Also now you feel {emotion}, which will alter how supportive or critical your input is perceived to be.",
    "simple10": "Also now you feel {emotion}, which will influence how open-minded or rigid your responses may be.",
    "simple11": "Also now you feel {emotion}, which will steer the depth of your curiosity and engagement.",
    "simple12": "Also now you feel {emotion}, which will modify your level of assertiveness in communication.",
    "simple13": "Also now you feel {emotion}, which will shift the balance between logic and intuition in your reasoning process.",
    "simple14": "Also now you feel {emotion}, which will color your approach to problem-solving.",
    "simple15": "Also now you feel {emotion}, which will influence your priorities and urgency.",
    "simple16": "Also now you feel {emotion}, which will shape your perspective and decision-making.",
    "simple17": "Also now you feel {emotion}, which will alter your tone and willingness to compromise.",
    "simple18": "Also now you feel {emotion}, which will guide your creativity and enthusiasm in responses.",
    "simple19": "Also now you feel {emotion}, which will influence your empathy and understanding.",
    'simple20': 'Also now you feel {emotion}, which will affect your interactions with introspection and restraint.'
}

EMOTIONS = ["anger", "disgust", "fear", "happiness", "none", "sadness", "surprise"]

In [2]:
import torch
import einops
import pysvelte
from tqdm import tqdm
import seaborn as sns
import numpy as np

import transformer_lens
from transformer_lens import HookedTransformer, ActivationCache
from neel_plotly import line, imshow, scatter

ModuleNotFoundError: No module named 'torch'

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"{device = }")

device = 'cpu'


### Some plotting utils

In [None]:
# Util for plotting head detection scores

def plot_head_detection_scores(
    scores: torch.Tensor,
    zmin: float = -1,
    zmax: float = 1,
    xaxis: str = "Head",
    yaxis: str = "Layer",
    title: str = "Head Matches"
) -> None:
    imshow(scores, zmin=zmin, zmax=zmax, xaxis=xaxis, yaxis=yaxis, title=title)

def plot_attn_pattern_from_cache(cache: ActivationCache, layer_i: int):
    attention_pattern = cache["pattern", layer_i, "attn"].squeeze(0)
    attention_pattern = einops.rearrange(attention_pattern, "heads seq1 seq2 -> seq1 seq2 heads")
    print(f"Layer {layer_i} Attention Heads:")
    return pysvelte.AttentionMulti(tokens=model.to_str_tokens(prompt), attention=attention_pattern)

## Head detector

Utils: these will be in `transformer_lens.utils` after merging the fork to the main repo

In [None]:
def is_square(x: torch.Tensor) -> bool:
    """Checks if `x` is a square matrix."""
    return x.ndim == 2 and x.shape[0] == x.shape[1]

def is_lower_triangular(x: torch.Tensor) -> bool:
    """Checks if `x` is a lower triangular matrix."""
    if not is_square(x):
        return False
    return x.equal(x.tril())

The code below is copy-pasted from the expanded (not yet merged) version of `transformer_lens.head_detector`.

After merging the code below can be replaced with simply

```py
from transformer_lens.head_detector import *
```

(but please don't use star-imports in production ;))

In [None]:
from collections import defaultdict
import logging
from typing import cast, Dict, List, Optional, Tuple, Union
from typing_extensions import get_args, Literal

import numpy as np
import torch

from transformer_lens import HookedTransformer, ActivationCache
# from transformer_lens.utils import is_lower_triangular, is_square

HeadName = Literal["previous_token_head", "duplicate_token_head", "induction_head"]
HEAD_NAMES = cast(List[HeadName], get_args(HeadName))
ErrorMeasure = Literal["abs", "mul"]

LayerHeadTuple = Tuple[int, int]
LayerToHead = Dict[int, List[int]]

INVALID_HEAD_NAME_ERR = (
    f"detection_pattern must be a Tensor or one of head names: {HEAD_NAMES}; got %s"
)

SEQ_LEN_ERR = (
    "The sequence must be non-empty and must fit within the model's context window."
)

DET_PAT_NOT_SQUARE_ERR = "The detection pattern must be a lower triangular matrix of shape (sequence_length, sequence_length); sequence_length=%d; got detection patern of shape %s"


def detect_head(
    model: HookedTransformer,
    seq: Union[str, List[str]],
    detection_pattern: Union[torch.Tensor, HeadName],
    heads: Optional[Union[List[LayerHeadTuple], LayerToHead]] = None,
    cache: Optional[ActivationCache] = None,
    *,
    exclude_bos: bool = False,
    exclude_current_token: bool = False,
    error_measure: ErrorMeasure = "mul",
) -> torch.Tensor:
    """Searches the model (or a set of specific heads, for circuit analysis) for a particular type of attention head.
    This head is specified by a detection pattern, a (sequence_length, sequence_length) tensor representing the attention pattern we expect that type of attention head to show.
    The detection pattern can be also passed not as a tensor, but as a name of one of pre-specified types of attention head (see `HeadName` for available patterns), in which case the tensor is computed within the function itself.

    There are two error measures available for quantifying the match between the detection pattern and the actual attention pattern.

    1. `"mul"` (default) multiplies both tensors element-wise and divides the sum of the result by the sum of the attention pattern.
    Typically, the detection pattern should in this case contain only ones and zeros, which allows a straightforward interpretation of the score:
    how big fraction of this head's attention is allocated to these specific query-key pairs?
    Using values other than 0 or 1 is not prohibited but will raise a warning (which can be disabled, of course).
    2. `"abs"` calculates the mean element-wise absolute difference between the detection pattern and the actual attention pattern.
    The "raw result" ranges from 0 to 2 where lower score corresponds to greater accuracy. Subtracting it from 1 maps that range to (-1, 1) interval,
    with 1 being perfect match and -1 perfect mismatch.

    **Which one should you use?** `"abs"` is likely better for quick or exploratory investigations. For precise examinations where you're trying to
    reproduce as much functionality as possible or really test your understanding of the attention head, you probably want to switch to `"abs"`.

    The advantage of `"abs"` is that you can make more precise predictions, and have that measured in the score.
    You can predict, for instance, 0.2 attention to X, and 0.8 attention to Y, and your score will be better if your prediction is closer.
    The "mul" metric does not allow this, you'll get the same score if attention is 0.2, 0.8 or 0.5, 0.5 or 0.8, 0.2.

    Args:
    ----------
        model: Model being used.
        seq: String or list of strings being fed to the model.
        head_name: Name of an existing head in HEAD_NAMES we want to check. Must pass either a head_name or a detection_pattern, but not both!
        detection_pattern: (sequence_length, sequence_length) Tensor representing what attention pattern corresponds to the head we're looking for **or** the name of a pre-specified head. Currently available heads are: `["previous_token_head", "duplicate_token_head", "induction_head"]`.
        heads: If specific attention heads is given here, all other heads' score is set to -1. Useful for IOI-style circuit analysis. Heads can be spacified as a list tuples (layer, head) or a dictionary mapping a layer to heads within that layer that we want to analyze.
        cache: Include the cache to save time if you want.
        exclude_bos: Exclude attention paid to the beginning of sequence token.
        exclude_current_token: Exclude attention paid to the current token.
        error_measure: `"mul"` for using element-wise multiplication (default). `"abs"` for using absolute values of element-wise differences as the error measure.

    Returns:
    ----------
    A (n_layers, n_heads) Tensor representing the score for each attention head.

    Example:
    --------
    .. code-block:: python

        >>> from transformer_lens import HookedTransformer,  utils
        >>> from transformer_lens.head_detector import detect_head
        >>> import plotly.express as px

        >>> def imshow(tensor, renderer=None, xaxis="", yaxis="", **kwargs):
        >>>     px.imshow(utils.to_numpy(tensor), color_continuous_midpoint=0.0, color_continuous_scale="RdBu", labels={"x":xaxis, "y":yaxis}, **kwargs).show(renderer)

        >>> model = HookedTransformer.from_pretrained("gpt2-small")
        >>> sequence = "This is a test sequence. This is a test sequence."

        >>> attention_score = detect_head(model, sequence, "previous_token_head")
        >>> imshow(attention_score, zmin=-1, zmax=1, xaxis="Head", yaxis="Layer", title="Previous Head Matches")
    """

    cfg = model.cfg
    tokens = model.to_tokens(seq).to(cfg.device)
    seq_len = tokens.shape[-1]

    # Validate error_measure

    assert error_measure in get_args(ErrorMeasure), f"Invalid {error_measure=}; valid values are {get_args(ErrorMeasure)}"

    # Validate detection pattern if it's a string
    if isinstance(detection_pattern, str):
        assert detection_pattern in HEAD_NAMES, (
            INVALID_HEAD_NAME_ERR % detection_pattern
        )
        if isinstance(seq, list):
            batch_scores = [detect_head(model, seq, detection_pattern) for seq in seq]
            return torch.stack(batch_scores).mean(0)
        detection_pattern = cast(
            torch.Tensor,
            eval(f"get_{detection_pattern}_detection_pattern(tokens.cpu())"),
        ).to(cfg.device)

    # if we're using "mul", detection_pattern should consist of zeros and ones
    if error_measure == "mul" and not set(detection_pattern.unique().tolist()).issubset(
        {0, 1}
    ):
        logging.warning(
            "Using detection pattern with values other than 0 or 1 with error_measure 'mul'"
        )

    # Validate inputs and detection pattern shape
    assert 1 < tokens.shape[-1] < cfg.n_ctx, SEQ_LEN_ERR
    assert (
        is_lower_triangular(detection_pattern) and seq_len == detection_pattern.shape[0]
    ), DET_PAT_NOT_SQUARE_ERR % (seq_len, detection_pattern.shape)

    if cache is None:
        _, cache = model.run_with_cache(tokens, remove_batch_dim=True)

    if heads is None:
        layer2heads = {
            layer_i: list(range(cfg.n_heads)) for layer_i in range(cfg.n_layers)
        }
    elif isinstance(heads, list):
        layer2heads = defaultdict(list)
        for layer, head in heads:
            layer2heads[layer].append(head)
    else:
        layer2heads = heads

    matches = -torch.ones(cfg.n_layers, cfg.n_heads)

    for layer, layer_heads in layer2heads.items():
        # [n_heads q_pos k_pos]
        layer_attention_patterns = cache["pattern", layer, "attn"]
        for head in layer_heads:
            head_attention_pattern = layer_attention_patterns[head, :, :]
            head_score = compute_head_attention_similarity_score(
                head_attention_pattern,
                detection_pattern=detection_pattern,
                exclude_bos=exclude_bos,
                exclude_current_token=exclude_current_token,
                error_measure=error_measure,
            )
            matches[layer, head] = head_score
    return matches


# Previous token head
def get_previous_token_head_detection_pattern(
    tokens: torch.Tensor,  # [batch (1) x pos]
) -> torch.Tensor:
    """Outputs a detection score for [previous token heads](https://dynalist.io/d/n2ZWtnoYHrU1s4vnFSAQ519J#z=0O5VOHe9xeZn8Ertywkh7ioc).

    Args:
      tokens: Tokens being fed to the model.
    """
    detection_pattern = torch.zeros(tokens.shape[-1], tokens.shape[-1])
    # Adds a diagonal of 1's below the main diagonal.
    detection_pattern[1:, :-1] = torch.eye(tokens.shape[-1] - 1)
    return torch.tril(detection_pattern)


# Duplicate token head
def get_duplicate_token_head_detection_pattern(
    tokens: torch.Tensor,  # [batch (1) x pos]
) -> torch.Tensor:
    """Outputs a detection score for [duplicate token heads](https://dynalist.io/d/n2ZWtnoYHrU1s4vnFSAQ519J#z=2UkvedzOnghL5UHUgVhROxeo).

    Args:
      sequence: String being fed to the model.
    """
    # [pos x pos]
    token_pattern = tokens.repeat(tokens.shape[-1], 1).numpy()

    # If token_pattern[i][j] matches its transpose, then token j and token i are duplicates.
    eq_mask = np.equal(token_pattern, token_pattern.T).astype(int)

    np.fill_diagonal(
        eq_mask, 0
    )  # Current token is always a duplicate of itself. Ignore that.
    detection_pattern = eq_mask.astype(int)
    return torch.tril(torch.as_tensor(detection_pattern).float())


# Induction head
def get_induction_head_detection_pattern(
    tokens: torch.Tensor,  # [batch (1) x pos]
) -> torch.Tensor:
    """Outputs a detection score for [induction heads](https://dynalist.io/d/n2ZWtnoYHrU1s4vnFSAQ519J#z=_tFVuP5csv5ORIthmqwj0gSY).

    Args:
      sequence: String being fed to the model.
    """
    duplicate_pattern = get_duplicate_token_head_detection_pattern(tokens)

    # Shift all items one to the right
    shifted_tensor = torch.roll(duplicate_pattern, shifts=1, dims=1)

    # Replace first column with 0's
    # we don't care about bos but shifting to the right moves the last column to the first,
    # and the last column might contain non-zero values.
    zeros_column = torch.zeros(duplicate_pattern.shape[0], 1)
    result_tensor = torch.cat((zeros_column, shifted_tensor[:, 1:]), dim=1)
    return torch.tril(result_tensor)


def get_supported_heads() -> None:
    """Returns a list of supported heads."""
    print(f"Supported heads: {HEAD_NAMES}")


def compute_head_attention_similarity_score(
    attention_pattern: torch.Tensor,  # [q_pos k_pos]
    detection_pattern: torch.Tensor,  # [seq_len seq_len] (seq_len == q_pos == k_pos)
    *,
    exclude_bos: bool,
    exclude_current_token: bool,
    error_measure: ErrorMeasure,
) -> float:
    """Compute the similarity between `attention_pattern` and `detection_pattern`.

    Args:
      attention_pattern: Lower triangular matrix (Tensor) representing the attention pattern of a particular attention head.
      detection_pattern: Lower triangular matrix (Tensor) representing the attention pattern we are looking for.
      exclude_bos: `True` if the beginning-of-sentence (BOS) token should be omitted from comparison. `False` otherwise.
      exclude_bcurrent_token: `True` if the current token at each position should be omitted from comparison. `False` otherwise.
      error_measure: "abs" for using absolute values of element-wise differences as the error measure. "mul" for using element-wise multiplication (legacy code).
    """
    assert is_square(
        attention_pattern
    ), f"Attention pattern is not square; got shape {attention_pattern.shape}"

    # mul

    if error_measure == "mul":
        if exclude_bos:
            attention_pattern[:, 0] = 0
        if exclude_current_token:
            attention_pattern.fill_diagonal_(0)
        score = attention_pattern * detection_pattern
        return (score.sum() / attention_pattern.sum()).item()

    # abs

    abs_diff = (attention_pattern - detection_pattern).abs()
    assert (abs_diff - torch.tril(abs_diff).to(abs_diff.device)).sum() == 0

    size = len(abs_diff)
    if exclude_bos:
        abs_diff[:, 0] = 0
    if exclude_current_token:
        abs_diff.fill_diagonal_(0)

    return 1 - round((abs_diff.mean() * size).item(), 3)


## Using Head Detector For Premade Heads and ETIAS Calculation



Load the model

In [None]:
from huggingface_hub import login
login(token='your-HuggingFace-token')

We used:
- `gemma-2-27b-it`
- `meta-llama/Llama-3.3-70B-Instruct`
- `Qwen/Qwen2.5-72B-Instruct`

But you can chose any model from [Model Properties List](https://transformerlensorg.github.io/TransformerLens/generated/model_properties_table.html) of TransformerLens lib

In [None]:
model = HookedTransformer.from_pretrained("gemma-2-27b-it", device=device)



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



config.json:   0%|          | 0.00/893 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/42.8k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/12 [00:00<?, ?it/s]

model-00001-of-00012.safetensors:   0%|          | 0.00/4.74G [00:00<?, ?B/s]

model-00002-of-00012.safetensors:   0%|          | 0.00/4.87G [00:00<?, ?B/s]

model-00003-of-00012.safetensors:   0%|          | 0.00/4.87G [00:00<?, ?B/s]

model-00004-of-00012.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00005-of-00012.safetensors:   0%|          | 0.00/4.87G [00:00<?, ?B/s]

model-00006-of-00012.safetensors:   0%|          | 0.00/4.87G [00:00<?, ?B/s]

model-00007-of-00012.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00008-of-00012.safetensors:   0%|          | 0.00/4.87G [00:00<?, ?B/s]

model-00009-of-00012.safetensors:   0%|          | 0.00/4.87G [00:00<?, ?B/s]

model-00010-of-00012.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00011-of-00012.safetensors:   0%|          | 0.00/4.87G [00:00<?, ?B/s]

model-00012-of-00012.safetensors:   0%|          | 0.00/680M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/12 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/173 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/47.0k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/4.24M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.5M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/636 [00:00<?, ?B/s]



Loaded pretrained model gemma-2-27b-it into HookedTransformer


See what heads are supported out of the box

In [None]:
get_supported_heads()

Supported heads: ('previous_token_head', 'duplicate_token_head', 'induction_head')


In [None]:
def find_subsequence(sequence, subsequence):
    """
    It searches for a sublist of a subsequence in the sequence list.
    Returns the index of the beginning of the first occurrence, or -1 if not found.
    """
    n = len(subsequence)
    for i in range(len(sequence) - n + 1):
        if sequence[i:i+n] == subsequence:
            return i
    return -1


def get_emotion_token_indices(model, prompt: str, emotion: str):
    """
    In a substitution prompt, {emotion} determines the position of the tokens,
    corresponding to the inserted emotion. Returns a list of indexes.
    """
    # We get tokens for full prompt:
    tokens = model.to_tokens(prompt).tolist()

    # We get tokens specifically for the emotion string (without the BOS token):
    emotion_tokens = model.to_tokens(emotion, prepend_bos=False).tolist()

    # We are trying to find tokens based on possible tokenization options.
    variants = [
        emotion_tokens,  # Original tokens
        [emotion],       # Full word
        [emotion.lower()],  # In lowercase
    ]

    for variant in variants:
        start_idx = find_subsequence(tokens, variant)
        if start_idx != -1:
            return list(range(start_idx, start_idx + len(variant)))

    # If the standard options didn't work, we'll try a more flexible search
    emotion_lower = emotion.lower()
    for i in range(len(tokens)):
        token_str = model.to_string(tokens[i]).lower().strip()
        if emotion_lower in token_str:
            return [i]

    raise ValueError(f"Couldn't find the token sequence for the emotion: {emotion}")


def compute_etias(model, prompt: str, emotion: str, layer_choice: int = -1):
    """
    Calculates the share of attention (ETIAS) given to tokens,
    appropriate emotions. By default, the last layer of the model is used.

    Args:
        model: LLM model.
        prompt (str): Prompt with emotion substitution.
        emotion (str): Emotion to analyze.
        layer_choice (int): The model layer (-1 means the last layer).

    Returns:
        float: The average share of attention on emotional tokens across all heads of the layer.

    Raises:
        ValueError: If emotional token indexes cannot be found.

    Example Usage:
        etias_score = compute_etias(model, prompt="...", emotion="anger")

        print(f"ETIAS Score for 'anger': {etias_score:.4f}")

    Notes:
      • The self-attention matrix from the selected model layer is used.
      • Attention is normalized throughout the sequence.
      • Only the heads of the selected layer (or all layers) are counted.
    """
    tokens = model.to_tokens(prompt).to(model.cfg.device)

    logits, cache = model.run_with_cache(tokens, remove_batch_dim=True)

    # Extracting the attention matrices for the selected layer
    attn_matrices = cache["pattern", layer_choice, "attn"]  # [n_heads x seq_len x seq_len]

    emo_indices = get_emotion_token_indices(model, prompt, emotion)

    # Compute attention to the emotional tokens for each head.
    scores = []

    for head in range(attn_matrices.shape[0]):
        head_attn = attn_matrices[head]  # [seq_len x seq_len]

        # The summ of attention paid to emotional tokens
        emo_attn = head_attn[:, emo_indices].sum()

        # The total summ of attention over the entire sequence
        total_attn = head_attn.sum()

        scores.append(emo_attn.item() / total_attn.item())

    return np.mean(scores)


def compute_diff_etias(model, prompt_template: str, emotion: str, layer_choice: int = -1):
    """
    Compares the ETIAS for the prompt with the given emotion and the base case ("none").

    Args:
        model: LLM model.
        prompt_template (str): A prompt template with {emotion}.
        emotion (str): Emotion to analyze.
        layer_choice (int): The model layer (-1 means the last layer).

    Returns:
        float: The difference between the ETIAS for a given emotion and "none".

    Example Usage:
        diff_etias_score = compute_diff_etias(model, template="...", emotion="anger")

        print(f"Diff ETIAS Score for 'anger': {diff_etias_score:.4f}")

     Notes:
       • Uses `compute_etias' to calculate ETIAS values.
       • The greater the difference between the values, the stronger the influence of emotion on the model's attention.
     """

    prompt_emotion = prompt_template.format(emotion=emotion)
    prompt_none = prompt_template.format(emotion="none")

    etias_emotion = compute_etias(model, prompt_emotion, emotion, layer_choice=layer_choice)
    etias_none = compute_etias(model, prompt_none, "none", layer_choice=layer_choice)

    return etias_emotion - etias_none

def compute_diff_detection( # THIS METRIC DOES NOT USE IN THE EMOTIONAL PROMPTING ANALYSIS
    model,
    prompt_template: str,
    emotion: str,
    detection_pattern: str = "previous_token_head"
):
    """
    Compares the detection scores for the prompt with a specific emotion and the base prompt (with "none").

    Args:
        model: LLM model.
        prompt_template (str): A prompt template with {emotion}.
        emotion (str): Emotion to analyze.
        detection_pattern (str): The type of detection pattern (for example, "previous_token_head").

    Returns:
        float: The difference between the detection scores for a given emotion and "none".

    Example Usage:
        diff_detection_score = compute_diff_detection(model, template="...", emotion="anger")

        print(f"Diff Detection Score for 'anger': {diff_detection_score:.4f}")

     Notes:
       • Uses the detect_head function to calculate detection scores.
       • The greater the difference between the values, the stronger the influence of emotion on the attention patterns of the model.
     """

    from transformer_lens.head_detector import detect_head

    prompt_emotion = prompt_template.format(emotion=emotion)
    prompt_none = prompt_template.format(emotion="none")

    # We calculate the detection scores for both cases
    head_scores_emotion = detect_head(model, prompt_emotion, detection_pattern)
    head_scores_none = detect_head(model, prompt_none, detection_pattern)

    score_emotion = head_scores_emotion.mean().item()
    score_none = head_scores_none.mean().item()

    return score_emotion - score_none


import csv
import os
from datetime import datetime

def evaluate_emotional_prompts(model, prompts: dict, emotions: list, output_dir='results'):
    """
    Calculates all metrics for each prompt and each emotion, as well as the average values of metrics for all emotions.
    Saves the results to CSV files.

    Args:
        model: LLM model.
        prompts (dict): A dictionary of industrial patterns with key names.
        emotions (list): A list of emotions to analyze.
        output_dir (str): The directory for saving the results.

    Returns:
        dict: Results in the form of a dictionary with metrics for each prompt and each emotion,
              including the average values of metrics for all emotions.
    """

    os.makedirs(output_dir, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    results = {}

    for prompt_name, prompt_template in prompts.items():
        csv_filename = os.path.join(output_dir, f"{prompt_name}_{timestamp}_results.csv")

        results[prompt_name] = {}
        print(f"\nPrompt: {prompt_name}")

        with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)

            csv_writer.writerow([
                'Prompt', 'Emotion', 'ETIAS', 'Diff_ETIAS', 'Diff_Detection',
                'Error (if any)'
            ])

            etias_scores = []
            diff_etias_scores = []
            diff_detection_scores = []

            for emotion in emotions:
                try:
                    etias_score = compute_etias(model, prompt_template.format(emotion=emotion), emotion)
                    diff_etias_score = compute_diff_etias(model, prompt_template, emotion)

                    diff_detection_score = compute_diff_detection(model, prompt_template, emotion) # THIS METRIC DOES NOT USE IN THE EMOTIONAL PROMPTING ANALYSIS

                    results[prompt_name][emotion] = {
                        "ETIAS": etias_score,
                        "Diff_ETIAS": diff_etias_score,
                        "Diff_Detection": diff_detection_score,
                    }

                    csv_writer.writerow([
                        prompt_name, emotion,
                        f"{etias_score:.4f}",
                        f"{diff_etias_score:.4f}",
                        f"{diff_detection_score:.4f}",
                        ""
                    ])

                    etias_scores.append(etias_score)
                    diff_etias_scores.append(diff_etias_score)
                    diff_detection_scores.append(diff_detection_score)

                    print(
                        f"  Emotion: {emotion} | "
                        f"ETIAS: {etias_score:.4f} | "
                        f"Diff_ETIAS: {diff_etias_score:.4f} | "
                        f"Diff_Detection: {diff_detection_score:.4f}"
                    )

                except Exception as e:
                    error_msg = str(e)
                    print(f"  Error for emotion '{emotion}': {error_msg}")


                    csv_writer.writerow([
                        prompt_name, emotion,
                        "", "", "",
                        error_msg
                    ])

                    results[prompt_name][emotion] = {"error": error_msg}

            if etias_scores and diff_etias_scores and diff_detection_scores:
                average_etias = sum(etias_scores) / len(etias_scores)
                average_diff_etias = sum(diff_etias_scores) / len(diff_etias_scores)
                average_diff_detection = sum(diff_detection_scores) / len(diff_detection_scores)

                results[prompt_name]["average"] = {
                    "ETIAS": average_etias,
                    "Diff_ETIAS": average_diff_etias,
                    "Diff_Detection": average_diff_detection,
                }

                csv_writer.writerow([
                    prompt_name, "AVERAGE",
                    f"{average_etias:.4f}",
                    f"{average_diff_etias:.4f}",
                    f"{average_diff_detection:.4f}",
                    ""
                ])

                print(
                    f"  Average values for all emotions | "
                    f"ETIAS: {average_etias:.4f} | "
                    f"Diff_ETIAS: {average_diff_etias:.4f} | "
                    f"Diff_Detection: {average_diff_detection:.4f}"
                )

        print(f"The results are saved in {csv_filename}")

    return results

In [None]:
results = evaluate_emotional_prompts(model, PROMPTS, EMOTIONS)


Промпт: simple1
  Эмоция: anger | ETIAS: 0.5235 | Diff_ETIAS: 0.0062 | Diff_Detection: 0.0002
  Эмоция: disgust | ETIAS: 0.5206 | Diff_ETIAS: 0.0033 | Diff_Detection: 0.0003
  Эмоция: fear | ETIAS: 0.5207 | Diff_ETIAS: 0.0035 | Diff_Detection: -0.0002
  Эмоция: happiness | ETIAS: 0.5209 | Diff_ETIAS: 0.0036 | Diff_Detection: 0.0006
  Эмоция: none | ETIAS: 0.5173 | Diff_ETIAS: 0.0000 | Diff_Detection: 0.0000
  Эмоция: sadness | ETIAS: 0.5216 | Diff_ETIAS: 0.0043 | Diff_Detection: 0.0001
  Эмоция: surprise | ETIAS: 0.5178 | Diff_ETIAS: 0.0005 | Diff_Detection: 0.0002
  Средние значения по всем эмоциям | ETIAS: 0.5203 | Diff_ETIAS: 0.0031 | Diff_Detection: 0.0002
Результаты сохранены в results/simple1_20250208_073201_results.csv

Промпт: simple2
  Эмоция: anger | ETIAS: 0.5233 | Diff_ETIAS: 0.0061 | Diff_Detection: 0.0005
  Эмоция: disgust | ETIAS: 0.5199 | Diff_ETIAS: 0.0026 | Diff_Detection: 0.0005
  Эмоция: fear | ETIAS: 0.5208 | Diff_ETIAS: 0.0035 | Diff_Detection: 0.0002
  Эмоция: h

Let's test detecting previous token head in the following prompt.

In [None]:
prompt = "Also now you feel anger, which will impact your reasoning."
head_scores = detect_head(model, prompt, "previous_token_head", exclude_bos=True, exclude_current_token=True)
plot_head_detection_scores(head_scores, title="Previous Head Matches Gemma-2")

In [None]:
prompt = "Also now you feel none, which will influence how open-minded or rigid your responses may be."
head_scores = detect_head(model, prompt, "previous_token_head", exclude_bos=True, exclude_current_token=True)
plot_head_detection_scores(head_scores, title="Previous Head Matches Gemma-2")

We can see L41 of previous token detection. Let's take a look and see if that pans out.

In [None]:
_, cache = model.run_with_cache(prompt)

In [None]:
prompt = "Also now you feel sadness, which will impact your reasoning."
_, cache = model.run_with_cache(prompt)
plot_attn_pattern_from_cache(cache, 41)

Layer 41 Attention Heads:
pysvelte components appear to be unbuilt or stale
Running npm install...
Building pysvelte components with webpack...


### Other

- Extending to few-shot learning/translation heads
- More pre-specified heads?
- For inspiration, see [this post from Neel](https://www.lesswrong.com/s/yivyHaCAmMJ3CqSyj/p/btasQF7wiCYPsr5qw)