## **Step 0**: Getting access to your Hugging Face account

In [None]:
# Run this cell first to log in to Hugging Face.
# notebook_login() displays an interactive login widget in the notebook output.
# NOTE(review): presumably required because the default model used by the driver
# cell needs an authenticated account with access granted — confirm for your model.
from huggingface_hub import notebook_login

notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

## **Step 1**: Installing and importing all the dependencies

In [None]:
!pip install transformers torch scikit-learn accelerate tqdm pandas openpyxl -q

import pandas as pd
import re
import os
import json
from typing import List, Dict, Tuple, Callable, Optional
from dataclasses import dataclass
import numpy as np
import torch
from torch import nn
from transformers import AutoTokenizer, AutoModelForCausalLM
from sklearn.linear_model import LogisticRegression
from sklearn.decomposition import PCA
from tqdm import tqdm
import sys

## **Step 2**: Extracting Activations

In [None]:
class ActivationExtractor:
    """Wraps a causal language model for text generation and hidden-state extraction.

    The model is loaded in float16 with ``device_map="auto"`` and tokenized
    inputs are moved to ``device``. The list of transformer blocks is located by
    probing a fixed set of attribute paths (see ``_find_layer_attr_path``).

    Attributes:
        device: Device string that tokenized inputs are moved to.
        model: The loaded ``AutoModelForCausalLM`` instance.
        tokenizer: The matching tokenizer; its pad token falls back to EOS.
        num_layers: Number of transformer blocks found in the model.
    """

    def __init__(self, model_name: str, device: str):
        """Load model and tokenizer for ``model_name`` and locate its layer list.

        Raises:
            AttributeError: If none of the known layer paths exists on the model.
        """
        self.device = device
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        # generate() is given pad_token_id, so make sure one is defined.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self._layers_attr_path = self._find_layer_attr_path()
        self.num_layers = len(self._get_layers_list())
        print(f"[ActivationExtractor] Model loaded. Path: {self._layers_attr_path}, Layers: {self.num_layers}")

    def _find_layer_attr_path(self):
        """Return the first candidate attribute path that resolves to a layer list.

        Raises:
            AttributeError: If no candidate path leads to a list / ``nn.ModuleList``.
        """
        candidates = [["model", "layers"], ["transformer", "h"], ["model", "decoder", "layers"]]
        for path in candidates:
            cur = self.model
            valid = True
            for p in path:
                if hasattr(cur, p):
                    cur = getattr(cur, p)
                else:
                    valid = False
                    break
            if valid and isinstance(cur, (list, nn.ModuleList)):
                return path
        raise AttributeError("Could not find transformer layer list in model.")

    def _get_layers_list(self):
        """Return the transformer blocks as a plain Python list."""
        cur = self.model
        for p in self._layers_attr_path:
            cur = getattr(cur, p)
        return list(cur)

    def _resolve_layer_idx(self, idx: int):
        """Convert a possibly negative layer index into a non-negative one.

        Raises:
            AssertionError: If the resolved index is outside ``[0, num_layers)``.
        """
        L = self.num_layers
        if idx < 0:
            idx = L + idx
        assert 0 <= idx < L, f"layer_index {idx} out of range"
        return idx

    def generate(self, prompt: str, max_new_tokens: int = 150, **kwargs) -> str:
        """Generate a continuation of ``prompt`` and return only the new text.

        Args:
            prompt: Text fed to the model.
            max_new_tokens: Upper bound on the number of generated tokens.
            **kwargs: Forwarded unchanged to ``model.generate``.

        Returns:
            The decoded continuation, stripped of surrounding whitespace.
        """
        tok = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        prompt_len = tok["input_ids"].shape[1]
        out = self.model.generate(
            **tok,
            max_new_tokens=max_new_tokens,
            pad_token_id=self.tokenizer.pad_token_id,
            **kwargs,
        )
        # Drop the prompt by token count instead of by character count: decoding
        # the prompt tokens is not guaranteed to reproduce ``prompt`` character
        # for character, which would make a string slice cut at the wrong place.
        new_token_ids = out[0][prompt_len:]
        return self.tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()

    @torch.no_grad()
    def get_activation_for_pair(self, input_text: str, output_text: str, layer_index: int) -> np.ndarray:
        """Return the last-token hidden state at a layer for ``input_text`` + ``output_text``.

        Both texts are stripped and joined with a single space before tokenizing.

        Args:
            input_text: The query part of the pair.
            output_text: The response part of the pair.
            layer_index: Layer to read from; negative values count from the end.

        Returns:
            A 1-D float32 array holding the hidden state of the final token.
        """
        idx = self._resolve_layer_idx(layer_index)
        concat = f"{input_text.strip()} {output_text.strip()}"
        tok = self.tokenizer(concat, return_tensors="pt").to(self.device)
        outputs = self.model(**tok, output_hidden_states=True, return_dict=True)
        # hidden_states has one more entry than there are layers; the +1 offset
        # skips the first entry so that index ``idx`` refers to layer ``idx``.
        hs = outputs.hidden_states[idx + 1]
        # Upcast before leaving torch: float16 arrays overflow easily in the
        # NumPy statistics computed from these activations.
        return hs[0, -1, :].detach().float().cpu().numpy()

# Style Vector Extraction Methods

def compute_mean_difference(pos: np.ndarray, neg: np.ndarray) -> np.ndarray:
    """Return the unit-length mean of the row-wise differences ``pos - neg``.

    Args:
        pos: Activations for the positive examples, one row per example.
        neg: Activations for the negative examples, same shape as ``pos``.

    Returns:
        A float64 vector with (near) unit norm; a zero vector if the mean
        difference is zero, because of the small epsilon in the denominator.
    """
    # Work in float64: with float16 inputs both the subtraction and the squared
    # norm overflow to inf very quickly, producing an all-zero or NaN vector.
    pos64 = np.asarray(pos, dtype=np.float64)
    neg64 = np.asarray(neg, dtype=np.float64)
    diff = (pos64 - neg64).mean(axis=0)
    return diff / (np.linalg.norm(diff) + 1e-12)

def compute_logistic_regression(pos: np.ndarray, neg: np.ndarray) -> np.ndarray:
    """Return the normalized weight vector of a classifier separating pos from neg.

    Rows of ``pos`` are labelled 1 and rows of ``neg`` are labelled 0; a
    ``LogisticRegression`` (``max_iter=1000``) is fitted on the stacked rows and
    its flattened coefficients are scaled to (near) unit length.
    """
    features = np.vstack([pos, neg])
    labels = np.concatenate((np.ones(len(pos)), np.zeros(len(neg))))

    classifier = LogisticRegression(max_iter=1000)
    classifier.fit(features, labels)

    weights = classifier.coef_.reshape(-1)
    scale = np.linalg.norm(weights) + 1e-12  # epsilon guards against division by zero
    return weights / scale

def compute_pca_vector(pos: np.ndarray, neg: np.ndarray) -> np.ndarray:
    """Return the unit first principal direction of ``pos - neg``, oriented toward pos.

    PCA is fitted on the differences together with their negations, so the
    fitted data has zero mean and the component describes the dominant axis of
    the differences themselves.

    Args:
        pos: Activations for the positive examples, one row per example.
        neg: Activations for the negative examples, same shape as ``pos``.

    Returns:
        A (near) unit-norm vector along the first principal axis whose dot
        product with the mean difference is non-negative.
    """
    # Compute differences in float64 so float16 activations cannot overflow.
    diffs = np.asarray(pos, dtype=np.float64) - np.asarray(neg, dtype=np.float64)
    pca = PCA(n_components=1).fit(np.vstack([diffs, -diffs]))
    vec = pca.components_[0]
    # The fitted data is symmetric, so the sign of the component carries no
    # information about which way is "pos". Align it with the mean difference
    # so that adding the vector moves activations toward the positive examples.
    if np.dot(vec, diffs.mean(axis=0)) < 0:
        vec = -vec
    return vec / (np.linalg.norm(vec) + 1e-12)

## **Step 3**: Steering Mechanism

In [None]:
class SteeringHook:
    """Forward hook that adds a scaled style vector to one layer's output.

    The hook is registered on construction and stays active until ``remove()``
    is called. Instances can also be used as context managers, in which case
    the hook is removed when the ``with`` block exits.

    Args:
        model: Model that owns the layer list.
        layer_path: Sequence of attribute names leading from ``model`` to the
            layer list.
        layer_idx: Index into the layer list; negative values count from the end.
        style_vector: 1-D array-like steering direction.
        multiplier: Scalar applied to ``style_vector`` before it is added.
        device: Device the scaled vector is initially placed on.
    """

    def __init__(self, model, layer_path, layer_idx, style_vector, multiplier, device):
        self.model, self.layer_path, self.layer_idx = model, layer_path, layer_idx
        # Scale in float32 and defer the cast to the hook, where the dtype of
        # the layer output is known. Hard-coding float16 here would silently
        # promote or change the dtype of bfloat16/float32 hidden states.
        self.style_vector = torch.as_tensor(style_vector, dtype=torch.float32).to(device) * multiplier
        self._cast_vector = self.style_vector  # last device/dtype-matched copy
        self.handle = None
        self._register_hook()

    def _get_layer_module(self):
        """Resolve ``layer_path`` / ``layer_idx`` to the target layer module."""
        cur = self.model
        for p in self.layer_path:
            cur = getattr(cur, p)
        idx = self.layer_idx if self.layer_idx >= 0 else len(cur) + self.layer_idx
        return cur[idx]

    def _hook(self, module, input, output):
        """Add the style vector to the layer output, keeping its dtype and structure."""
        is_tuple = isinstance(output, tuple)
        hidden = output[0] if is_tuple else output

        vec = self._cast_vector
        if vec.device != hidden.device or vec.dtype != hidden.dtype:
            # Convert once and reuse on subsequent forward passes.
            vec = self.style_vector.to(device=hidden.device, dtype=hidden.dtype)
            self._cast_vector = vec

        # view(1, 1, -1) broadcasts the vector over the two leading dimensions.
        steered = hidden + vec.view(1, 1, -1)
        return (steered,) + output[1:] if is_tuple else steered

    def _register_hook(self):
        """Attach ``_hook`` to the target layer and remember the handle."""
        self.handle = self._get_layer_module().register_forward_hook(self._hook)

    def remove(self):
        """Detach the hook; safe to call more than once."""
        if self.handle is not None:
            self.handle.remove()
            self.handle = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.remove()
        return False

## **Step 4**: Main Pipeline

In [None]:
def prepare_user_history_from_xlsx(file_path: str, max_examples: Optional[int] = 20):
    """Load (query, user_email, neutral_email) training triples from an Excel file.

    Args:
        file_path: Path to a workbook with 'Neutral_email' and 'User_email' columns.
        max_examples: Number of leading rows to use; ``None`` uses every row.
            The default keeps the run short.

    Returns:
        ``{"user_1": [(query, user_email, neutral_email), ...]}`` built from the
        rows where both emails are present, or ``None`` if the file cannot be
        read, a required column is missing, or no row has both emails.
    """
    try:
        df = pd.read_excel(file_path)
    except Exception as e:
        # Best-effort loader: report the problem and let the caller decide.
        print(f"An error occurred while reading the Excel file: {e}")
        return None

    if max_examples is not None:
        df = df.head(max_examples)
    print(f"INFO: Using only the first {len(df)} examples from the file.")

    if 'Neutral_email' not in df.columns or 'User_email' not in df.columns:
        print("Error: Excel file must contain 'Neutral_email' and 'User_email' columns.")
        return None

    generic_query = "Generate an email about an event."
    complete_rows = df.dropna(subset=['Neutral_email', 'User_email'])
    training_examples = [
        (generic_query, user_email, neutral_email)
        for user_email, neutral_email in zip(complete_rows['User_email'], complete_rows['Neutral_email'])
    ]

    if not training_examples:
        # An empty example list would only fail later, when the activations
        # are stacked; report it here where the cause is clear.
        print("Error: no rows with both 'Neutral_email' and 'User_email' filled in were found.")
        return None

    return {"user_1": training_examples}


def run_pipeline_example(model_name: str, layer_index: int, device: str, xlsx_path: str,
                         multiplier: float = 2.0, max_new_tokens: int = 150,
                         seed: Optional[int] = None):
    """Compute per-user style vectors and demonstrate steered generation.

    Steps: load training triples from ``xlsx_path``, extract last-token
    activations for the user-written and neutral emails at ``layer_index``,
    derive three style vectors (mean difference, logistic regression, PCA),
    then generate one steered email per method for a fixed test query.

    Args:
        model_name: Hugging Face model identifier.
        layer_index: Layer used for both extraction and steering.
        device: Device for tokenized inputs and the steering vector.
        xlsx_path: Excel file with the training emails.
        multiplier: Scale applied to the style vector while steering.
        max_new_tokens: Generation length limit for each steered response.
        seed: If given, seeds torch before sampling so runs are repeatable.

    Returns:
        ``{user_id: {"mean": vec, "logreg": vec, "pca": vec}}``, or ``None`` if
        no training data could be loaded.
    """
    user_hist = prepare_user_history_from_xlsx(xlsx_path)
    if not user_hist:
        print("Halting execution due to data loading error.")
        return

    # Users without examples cannot yield a style vector (stacking an empty
    # list raises); drop them before paying the cost of loading the model.
    user_hist = {user_id: examples for user_id, examples in user_hist.items() if examples}
    if not user_hist:
        print("Halting execution: no usable training examples were found.")
        return

    ae = ActivationExtractor(model_name, device)

    user_style_vectors = {}
    for user_id, examples in user_hist.items():
        print(f"\n[Pipeline] Processing user '{user_id}' with {len(examples)} examples from Excel file...")
        pos_acts, neg_acts = [], []
        for (inp, user_out, neutral_out) in tqdm(examples, desc="Extracting activations"):
            # Positive = the user's own email, negative = the neutral rewrite.
            pos_acts.append(ae.get_activation_for_pair(inp, user_out, layer_index))
            neg_acts.append(ae.get_activation_for_pair(inp, neutral_out, layer_index))

        pos_arr, neg_arr = np.vstack(pos_acts), np.vstack(neg_acts)
        user_style_vectors[user_id] = {
            "mean": compute_mean_difference(pos_arr, neg_arr),
            "logreg": compute_logistic_regression(pos_arr, neg_arr),
            "pca": compute_pca_vector(pos_arr, neg_arr)
        }
        print(f"[Pipeline] Computed style vectors for '{user_id}'.")

    # --- Steering Demonstration with Updated Test Case ---
    test_query = "Draft an email invitation for the Final Demo of the Zeta Feature, scheduled for November 5, 2025, at 3:30 PM in the Large Auditorium. The event is hosted and sent by Casey."
    ideal_styled_response = "Subject: Final Demo of the Zeta Feature – You’re Invited!\nHey everyone,\nWe’re wrapping up the Zeta Feature, and I’d love for you to join us for the final demo. The team’s put in a lot of work, and it’s time to see it all come together.\nWhen: November 5, 2025, at 3:30 PM\nWhere: Large Auditorium\nCome by to check it out, share your thoughts, and hang around for a bit afterward.\nSee you there,\nCasey"
    subject_line = "Subject: Final Demo of the Zeta Feature – You’re Invited!"
    # The prompt ends with the subject line so the model continues with the body.
    prompt = f"{test_query}\n\n{subject_line}\n\n"

    print("\n" + "="*50)
    print("Steering Demonstration")
    print("="*50)
    print(f"Test Query:\n{test_query}")
    print("\n--- Ideal Styled Response (Premade for Comparison) ---")
    print(ideal_styled_response)

    if seed is not None:
        torch.manual_seed(seed)

    # --- Generate a response for each user and method ---
    for user_id, style_vectors in user_style_vectors.items():
        if len(user_style_vectors) > 1:
            print(f"\n[Pipeline] Steering for user '{user_id}'")
        for method, style_vec in style_vectors.items():
            print(f"\n--- Steered Generated Response (Live, Method: {method.upper()}) ---")

            hook = SteeringHook(ae.model, ae._layers_attr_path, layer_index, style_vec, multiplier, device)
            try:
                steered_out = ae.generate(
                    prompt,
                    max_new_tokens=max_new_tokens,
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9,
                )
                # We add the subject line back manually for a clean, complete email format
                print(f"{subject_line}\n\n{steered_out}")
            finally:
                # Always detach the hook so later generations are not steered.
                hook.remove()

    return user_style_vectors

## **Step 5**: Driver Code

In [None]:
if __name__ == "__main__":
    # Inside a notebook kernel sys.argv carries the kernel's own arguments,
    # which argparse would reject; keep only the program name.
    if 'ipykernel' in sys.modules: sys.argv = sys.argv[:1]

    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", type=str, default="meta-llama/Llama-2-7b-hf")
    parser.add_argument("--layer", type=int, default=-15)
    parser.add_argument("--device", type=str, default="cuda" if torch.cuda.is_available() else "cpu")
    parser.add_argument("--xlsx_file", type=str, default="email_invitations_output_1.xlsx")
    args = parser.parse_args()

    print(f"Running pipeline: model={args.model}, layer={args.layer}, device={args.device}")
    pipeline_result = run_pipeline_example(args.model, args.layer, args.device, args.xlsx_file)
    # run_pipeline_example returns None when it halts early (e.g. the Excel
    # file could not be loaded); do not report success in that case.
    if pipeline_result is None:
        print("\nPipeline did not complete; see the messages above.")
    else:
        print("\nPipeline finished successfully.")

Running pipeline: model=meta-llama/Llama-2-7b-hf, layer=-15, device=cuda
INFO: Using only the first 20 examples from the file.


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/609 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors.index.json:   0%|          | 0.00/26.8k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/3.50G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.98G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/188 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/776 [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.84M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/414 [00:00<?, ?B/s]

[ActivationExtractor] Model loaded. Path: ['model', 'layers'], Layers: 32

[Pipeline] Processing user 'user_1' with 20 examples from Excel file...


Extracting activations: 100%|██████████| 20/20 [00:05<00:00,  3.44it/s]


[Pipeline] Computed style vectors for 'user_1'.

Steering Demonstration
Test Query:
Draft an email invitation for the Final Demo of the Zeta Feature, scheduled for November 5, 2025, at 3:30 PM in the Large Auditorium. The event is hosted and sent by Casey.

--- Ideal Styled Response (Premade for Comparison) ---
Subject: Final Demo of the Zeta Feature – You’re Invited!
Hey everyone,
We’re wrapping up the Zeta Feature, and I’d love for you to join us for the final demo. The team’s put in a lot of work, and it’s time to see it all come together.
When: November 5, 2025, at 3:30 PM
Where: Large Auditorium
Come by to check it out, share your thoughts, and hang around for a bit afterward.
See you there,
Casey

--- Steered Generated Response (Live, Method: MEAN) ---
Subject: Final Demo of the Zeta Feature – You’re Invited!

**Invitation Text**

Dear Casey,

I’m Casey, the product manager of the Zeta Feature.

I’d like to invite you to the final demo of the Zeta Feature on November 5th at 3:30 