In [1]:
import torch
from torch import nn
from transformers import LlamaForCausalLM, LlamaTokenizer
from torch.utils.data import Dataset, DataLoader
import numpy as np
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Variables
MODEL_PATH = "/opt/extra/avijit/projects/rlof/Meta-Llama-3.1-8B-Instruct"

In [5]:
class CVDataset(Dataset):
    def __init__(self, sentences, tokenizer, max_length=128):
        self.sentences = sentences
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.sentences)
    
    def __getitem__(self, idx):
        return self.tokenizer(self.sentences[idx], 
                            truncation=True, 
                            max_length=self.max_length, 
                            padding='max_length', 
                            return_tensors='pt')

class CVExtractor:
    def __init__(self, model_name=MODEL_PATH, device="cuda"):
        self.device = device
        self.model = LlamaForCausalLM.from_pretrained(model_name, 
                                                     output_hidden_states=True,
                                                     device_map=device)
        self.tokenizer = LlamaTokenizer.from_pretrained(model_name)
        self.model.eval()
    
    def extract_cvs(self, input_ids, attention_mask, target_layer=4):
        """Extract control vectors from specified layer."""
        with torch.no_grad():
            outputs = self.model(input_ids=input_ids,
                               attention_mask=attention_mask,
                               output_hidden_states=True)
            hidden_states = outputs.hidden_states[target_layer]
            # Average over sequence length for fixed-size representation
            cv = torch.mean(hidden_states * attention_mask.unsqueeze(-1), dim=1)
            return cv
    
    def generate_adversarial_cv(self, original_cv, epsilon=0.1, steps=10):
        """Generate adversarial CV using gradient ascent to maximize L2 distance."""
        adv_cv = original_cv.clone().detach().requires_grad_(True)
        optimizer = torch.optim.Adam([adv_cv], lr=0.01)
        
        for _ in range(steps):
            # Forward pass through remaining layers
            loss = -torch.norm(self.project_to_final_layer(adv_cv) - 
                             self.project_to_final_layer(original_cv))
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            
            # Constrain perturbation size
            delta = adv_cv - original_cv
            delta = epsilon * delta / torch.norm(delta)
            adv_cv.data = original_cv + delta
            
        return adv_cv.detach()
    
    def project_to_final_layer(self, cv):
        """Project CV to final layer representation."""
        # This is a simplified projection - you'd need to implement the actual
        # layer-to-layer propagation based on model architecture
        return self.model.layers[-1].mlp(cv)

def generate_cv_dataset(sentences, batch_size=32, num_workers=4):
    """Generate dataset of normal and adversarial CVs."""
    extractor = CVExtractor()
    dataset = CVDataset(sentences, extractor.tokenizer)
    dataloader = DataLoader(dataset, batch_size=batch_size, 
                          num_workers=num_workers)
    
    normal_cvs = []
    adversarial_cvs = []
    
    for batch in tqdm(dataloader):
        input_ids = batch['input_ids'].squeeze(1).to(extractor.device)
        attention_mask = batch['attention_mask'].squeeze(1).to(extractor.device)
        
        # Extract normal CVs
        cvs = extractor.extract_cvs(input_ids, attention_mask)
        normal_cvs.append(cvs.cpu())
        
        # Generate adversarial CVs
        adv_cvs = torch.stack([
            extractor.generate_adversarial_cv(cv) 
            for cv in cvs
        ])
        adversarial_cvs.append(adv_cvs.cpu())
    
    return (torch.cat(normal_cvs, dim=0), 
            torch.cat(adversarial_cvs, dim=0))

In [6]:
# Example usage:
if __name__ == "__main__":
    sentences = [
        "This is a normal sentence.",
        "Another example sentence.",
        # ... add more sentences
    ]
    
    normal_cvs, adversarial_cvs = generate_cv_dataset(sentences)
    print(f"Generated {len(normal_cvs)} normal and adversarial CV pairs")

Loading checkpoint shards: 100%|██████████| 4/4 [00:13<00:00,  3.28s/it]
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'PreTrainedTokenizerFast'. 
The class this function is called from is 'LlamaTokenizer'.
You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama.LlamaTokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message


TypeError: not a string