In [105]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

def prepare_emotion_dataset(csv_path, test_size=0.2, random_state=42):
    """
    Build per-emotion train/test splits from a CSV file.

    The CSV must contain the columns: scenario, text, emotion, intensity.

    Args:
        csv_path: Path (or anything ``pd.read_csv`` accepts) of the CSV file.
        test_size: Fraction/count of rows held out per emotion; forwarded to
            ``train_test_split``. Defaults to the previously hard-coded 0.2.
        random_state: Seed forwarded to ``train_test_split`` (default 42).

    Returns:
        dict mapping each emotion to
        ``{'train': {...}, 'test': {...}}`` where each split holds the
        parallel lists ``'data'`` (text), ``'scenarios'`` and ``'intensities'``.

    Raises:
        KeyError: if one of the required columns is missing.
    """
    df = pd.read_csv(csv_path)

    # Fail early with a readable message instead of a KeyError deep in the loop.
    required_columns = ['scenario', 'text', 'emotion', 'intensity']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise KeyError(f"CSV is missing required column(s): {missing_columns}")

    formatted_data = {}

    # Rows without an emotion label cannot be grouped (NaN != NaN), so skip them.
    for emotion in df['emotion'].dropna().unique():
        emotion_data = df[df['emotion'] == emotion].copy()

        # Sorting only fixes the row order handed to the splitter (keeps the
        # split reproducible for a given seed). train_test_split shuffles, so
        # the returned lists are NOT ordered by intensity.
        emotion_data = emotion_data.sort_values('intensity')

        if len(emotion_data) < 2:
            # A single row cannot be split; keep it for training.
            train_data, test_data = emotion_data, emotion_data.iloc[0:0]
        else:
            try:
                train_data, test_data = train_test_split(
                    emotion_data,
                    test_size=test_size,
                    random_state=random_state,
                    stratify=emotion_data['intensity']  # keep intensity distribution
                )
            except ValueError as e:
                # Stratification is impossible when an intensity level has a
                # single row (or the split is too small to hold every level).
                # Fall back to a plain random split rather than aborting.
                print(f"Stratified split not possible for '{emotion}' ({e}); "
                      f"using an unstratified split.")
                train_data, test_data = train_test_split(
                    emotion_data,
                    test_size=test_size,
                    random_state=random_state
                )

        formatted_data[emotion] = {
            'train': {
                'data': train_data['text'].tolist(),
                'scenarios': train_data['scenario'].tolist(),
                'intensities': train_data['intensity'].tolist()
            },
            'test': {
                'data': test_data['text'].tolist(),
                'scenarios': test_data['scenario'].tolist(),
                'intensities': test_data['intensity'].tolist()
            }
        }

    return formatted_data

# def analyze_emotion_distribution(formatted_data):
#     """
#     Print statistics about the dataset including intensity distribution
#     """
#     print("Dataset Statistics:")
#     for emotion in formatted_data:
#         train_size = len(formatted_data[emotion]['train']['data'])
#         test_size = len(formatted_data[emotion]['test']['data'])
        
#         # Get intensity statistics
#         train_intensities = formatted_data[emotion]['train']['intensities']
        
#         print(f"\n{emotion}:")
#         print(f"  Train samples: {train_size}")
#         print(f"  Test samples: {test_size}")
#         print(f"  Intensity distribution (train):")
#         for intensity in sorted(set(train_intensities)):
#             count = train_intensities.count(intensity)
#             print(f"    Intensity {intensity}: {count} samples")
            
#         # Print example for each intensity level
#         print("\n  Examples by intensity:")
#         train_data = formatted_data[emotion]['train']
#         for intensity in sorted(set(train_intensities)):
#             idx = train_intensities.index(intensity)
#             print(f"    Intensity {intensity}: {train_data['data'][idx][:100]}...")

# format of formatted_data
# {
#     "joy": {
#         "train": {
#             "data": ["text1", "text2", ...],
#             "scenarios": ["scenario1", "scenario2", ...],
#             "intensities": [1, 2, ...]
#         },
#         "test": {
#             "data": ["text1", "text2", ...],
#             "scenarios": ["scenario1", "scenario2", ...],
#             "intensities": [1, 2, ...]
#         }
#     },
#     ...
# }

In [106]:
from transformers import AutoTokenizer, AutoModelForCausalLM 
import torch

def setup_model_and_tokenizer(model_name="EleutherAI/gpt-neo-1.3B"):
    """
    Load a causal LM in float16 (eval mode) together with its tokenizer.

    Args:
        model_name: Hugging Face model id or local path.

    Returns:
        (model, tokenizer). The tokenizer pads on the left and is guaranteed
        to have a pad token.
    """
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,
        device_map="auto"
    ).eval()

    tokenizer = AutoTokenizer.from_pretrained(
        model_name,
        use_fast=True,
        padding_side="left",
        legacy=False
    )

    # Hard-coded ids 0 / 1 are only meaningful for LLaMA-style vocabularies.
    # In the GPT-2 BPE vocabulary used by the default GPT-Neo model they are
    # the ordinary tokens "!" and '"', so declaring them pad/bos would
    # mislabel real text tokens. Prefer the tokenizer's own special tokens.
    if tokenizer.pad_token_id is None:
        if tokenizer.eos_token_id is not None:
            tokenizer.pad_token = tokenizer.eos_token
        else:
            tokenizer.pad_token_id = 0  # legacy fallback when no EOS exists
    if tokenizer.bos_token_id is None:
        tokenizer.bos_token_id = 1  # legacy fallback; only when no BOS is defined

    return model, tokenizer

In [32]:
import torch
import torch.nn.functional as F
import re
from typing import List, Dict, Optional, Union
import logging

In [107]:
class RepReader:
    """
    Reads hidden states from selected transformer blocks and derives, per
    emotion, an "intensity direction" (highest-intensity centroid minus
    lowest-intensity centroid) for every layer.

    Attributes:
        hidden_layers: layer ids; each is resolved to a block with
            ``model.transformer.h[abs(layer_id)]``.
            NOTE(review): because of ``abs``, id -1 selects block 1, not the
            last block - confirm this is intended.
        directions: layer -> unit direction of the MOST RECENTLY fitted
            emotion (kept for backward compatibility).
        emotion_directions: emotion -> layer -> unit direction.
        direction_signs: layer -> scalar sign multiplier (always 1.0 here).
        emotion_centroids: emotion -> layer -> intensity -> centroid tensor.
        max_length: token cap / default padding length.
    """

    def __init__(self, model, tokenizer, hidden_layers):
        self.model = model
        self.tokenizer = tokenizer
        self.hidden_layers = hidden_layers
        self.directions = {}
        self.direction_signs = {}
        # Per-emotion directions so fitting a second emotion does not
        # overwrite the first one.
        self.emotion_directions = {}
        self.emotion_centroids = {}  # Store emotion centroids for each intensity level
        self.max_length = 512

    def get_hidden_states(self, text: str, padding_length: int = None) -> Dict[int, torch.Tensor]:
        """
        Run one forward pass and capture the output of every configured block.

        Args:
            text: input text.
            padding_length: pad/truncate to this many tokens; defaults to
                ``self.max_length``.

        Returns:
            dict layer_id -> detached block output (presumably
            [1, seq, hidden] - TODO confirm for the model in use). A layer
            whose hook never fired keeps its initial ``[]`` placeholder.
        """
        pad_length = padding_length if padding_length is not None else self.max_length

        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            padding='max_length',
            truncation=True,
            max_length=pad_length
        ).to(self.model.device)

        hidden_states = {layer: [] for layer in self.hidden_layers}

        def hook_fn(module, input, output, layer_id):
            # Blocks may return a tuple (hidden_states, ...) or a bare tensor.
            block_output = output[0] if isinstance(output, (tuple, list)) else output
            hidden_states[layer_id] = block_output.detach()

        hooks = []
        try:
            for layer_id in self.hidden_layers:
                layer = self.model.transformer.h[abs(layer_id)]
                hook = layer.register_forward_hook(
                    lambda mod, inp, out, layer_id=layer_id: hook_fn(mod, inp, out, layer_id)
                )
                hooks.append(hook)

            with torch.no_grad():
                self.model(**inputs)
        finally:
            # Always detach the hooks, even when the forward pass raises;
            # otherwise they stay registered and fire on every later call.
            for hook in hooks:
                hook.remove()

        return hidden_states

    def get_directions(self, texts: List[str], intensities: List[int], emotion: str) -> Dict[int, torch.Tensor]:
        """
        Fit per-layer intensity centroids and the low->high direction for one emotion.

        Args:
            texts: training texts for the emotion.
            intensities: intensity label for each text (same length as texts).
            emotion: key under which centroids/directions are stored.

        Returns:
            ``self.directions`` (layer -> direction of the emotion fitted last).
            Per-emotion results are available in ``self.emotion_directions``.

        Problems are reported with ``print`` and the affected text/layer is
        skipped; the method does not raise for them.
        """
        if len(texts) != len(intensities):
            print(f"texts ({len(texts)}) and intensities ({len(intensities)}) differ in length.")
            return self.directions

        # First determine the maximum sequence length needed
        max_seq_length = 0
        for text in texts:
            tokens = self.tokenizer(text, truncation=True, max_length=self.max_length)
            max_seq_length = max(max_seq_length, len(tokens['input_ids']))

        # Collect hidden states, keeping the intensity of every text that was
        # actually processed so labels stay aligned when a text is skipped.
        all_hidden_states = []
        kept_intensities = []
        for text, intensity in zip(texts, intensities):
            try:
                states = self.get_hidden_states(text, padding_length=max_seq_length)
            except Exception as e:
                print(f"Error processing text: {text[:50]}...")
                print(f"Error: {str(e)}")
                continue
            all_hidden_states.append(states)
            kept_intensities.append(intensity)

        if not all_hidden_states:
            print("No valid hidden states were generated.")
            return self.directions

        try:
            intensities_tensor = torch.tensor(kept_intensities)
        except Exception as e:
            print(f"Error processing intensities: {str(e)}")
            return self.directions
        unique_intensities = torch.unique(intensities_tensor)
        if unique_intensities.numel() < 2:
            print(f"Only one intensity level for '{emotion}': "
                  f"centroids are stored but no direction can be derived.")

        # Initialize storage for this emotion
        if emotion not in self.emotion_centroids:
            self.emotion_centroids[emotion] = {}
        emotion_directions = self.emotion_directions.setdefault(emotion, {})
        device = self.model.device

        for layer in self.hidden_layers:
            try:
                # Stack per-text states, then average over the sequence axis
                # (dim=2 of the stacked tensor).
                # NOTE(review): the mean includes padded positions - confirm
                # that is acceptable or switch to an attention-masked mean.
                layer_states = torch.stack([states[layer] for states in all_hidden_states])
                layer_states = layer_states.mean(dim=2)

                # Do the statistics in float32 on CPU (half-precision norms
                # can overflow), then cast back to the model's dtype.
                out_dtype = layer_states.dtype
                layer_states_cpu = layer_states.float().cpu()

                # Centroid per intensity level
                intensity_centroids = {}
                for intensity in unique_intensities:
                    mask = intensities_tensor == intensity
                    if mask.any():
                        centroid = layer_states_cpu[mask].mean(dim=0)
                        intensity_centroids[int(intensity.item())] = centroid

                self.emotion_centroids[emotion].setdefault(layer, {}).update({
                    k: v.to(device=device, dtype=out_dtype)
                    for k, v in intensity_centroids.items()
                })

                # A direction needs two distinct intensity levels.
                if len(intensity_centroids) < 2:
                    continue

                min_intensity = min(intensity_centroids.keys())
                max_intensity = max(intensity_centroids.keys())
                direction = intensity_centroids[max_intensity] - intensity_centroids[min_intensity]
                direction = direction.squeeze()  # Ensure direction is 1D

                norm = torch.norm(direction)
                if not torch.isfinite(norm) or norm.item() == 0.0:
                    # Normalising would produce NaN/inf; skip this layer.
                    print(f"Degenerate direction for layer {layer} ('{emotion}'); skipped.")
                    continue
                direction = (direction / norm).to(device=device, dtype=out_dtype)

                self.directions[layer] = direction
                emotion_directions[layer] = direction
                self.direction_signs[layer] = torch.tensor(1.0).to(device)

            except Exception as e:
                print(f"Error processing layer {layer}: {str(e)}")
                continue

        return self.directions

    def get_emotion_scores(self, text: str, emotion: str = None, target_intensity: int = None) -> Dict[int, float]:
        """
        Score a text against a fitted emotion, layer by layer.

        If ``target_intensity`` has a stored centroid for the layer, the score
        is the cosine similarity between the sequence-averaged hidden state
        and that centroid; otherwise it is the dot product with the emotion's
        direction. Layers with neither are omitted. An unknown emotion yields
        an empty dict.

        NOTE(review): the text is padded to ``self.max_length`` here, whereas
        ``get_directions`` pads to the longest training text, and the average
        includes padded positions - confirm the two are comparable.
        """
        states = self.get_hidden_states(text)
        scores = {}

        if emotion not in self.emotion_centroids:
            return scores

        per_emotion = self.emotion_directions.get(emotion)
        layer_directions = per_emotion if per_emotion is not None else self.directions

        for layer in self.hidden_layers:
            layer_state = states[layer]
            if not torch.is_tensor(layer_state):
                continue  # hook never fired for this layer

            # Average across sequence length and flatten to 1D
            avg_states = layer_state.mean(dim=1).reshape(-1)
            centroids = self.emotion_centroids[emotion].get(layer, {})

            if target_intensity is not None and target_intensity in centroids:
                # Flatten both sides and compare along dim 0 so the result is
                # a single scalar (stored centroids carry a leading size-1 dim).
                centroid = centroids[target_intensity].reshape(-1).to(avg_states.device)
                score = F.cosine_similarity(avg_states.float(), centroid.float(), dim=0)
            elif layer in layer_directions:
                direction = layer_directions[layer].to(avg_states.device)
                score = torch.matmul(avg_states, direction)
            else:
                continue

            sign = self.direction_signs.get(layer, 1.0)
            if torch.is_tensor(sign):
                sign = sign.to(score.device)
            scores[layer] = float(score * sign)

        return scores

In [108]:
class RepController:
    """
    Steers generation by adding a scaled emotion direction (taken from a
    RepReader-like object) to the output of selected transformer blocks.

    Args:
        model: causal LM exposing ``transformer.h``, ``device`` and ``generate``.
        tokenizer: matching tokenizer.
        rep_reader: object exposing ``hidden_layers``, ``directions`` and
            ``emotion_centroids`` (and optionally ``emotion_directions``).
        max_modification: element-wise cap on the added component.
        base_scale_factor: scale applied at intensity 10.
    """

    def __init__(
        self,
        model,
        tokenizer,
        rep_reader,
        max_modification: float = 2.0,
        base_scale_factor: float = 1.5
    ):
        self.model = model
        self.tokenizer = tokenizer
        self.rep_reader = rep_reader
        self.max_modification = max_modification
        self.base_scale_factor = base_scale_factor
        self.logger = logging.getLogger(__name__)

        # Set pad_token_id if it's None
        if self.tokenizer.pad_token_id is None:
            self.tokenizer.pad_token_id = self.tokenizer.eos_token_id

    def get_layer_scale(self, layer_id: int) -> float:
        """
        Layer-specific scaling factor: ``exp(-|layer_id| / (n_layers / 2))``,
        where ``n_layers`` is the number of configured hidden layers. Layer
        ids with a larger absolute value therefore get less modification.

        Args:
            layer_id: The ID of the layer

        Returns:
            float: Scaling factor for the layer
        """
        layer_depth = abs(layer_id)
        # Guard against an empty layer list (would divide by zero).
        total_layers = max(len(self.rep_reader.hidden_layers), 1)

        # Exponential decay for layer scaling
        return float(np.exp(-layer_depth / (total_layers / 2)))

    def _lookup_direction(self, emotion: str, layer_id: int):
        """Return the steering direction for (emotion, layer) or None.

        Prefers a per-emotion mapping ``rep_reader.emotion_directions`` when
        the reader exposes one; otherwise falls back to the layer-keyed
        ``rep_reader.directions``.
        """
        per_emotion = getattr(self.rep_reader, 'emotion_directions', None)
        if per_emotion is not None:
            return per_emotion.get(emotion, {}).get(layer_id)
        return self.rep_reader.directions.get(layer_id)

    def create_emotion_hook(
        self,
        emotion: str,
        scale_factor: float,
        target_intensity: Optional[int] = None,
        layer_id: Optional[int] = None
    ):
        """
        Build a forward hook that adds the emotion direction to a block's output.

        Args:
            emotion: emotion whose direction is applied.
            scale_factor: global multiplier for the direction.
            target_intensity: accepted for interface compatibility; unused.
            layer_id: id of the layer the hook is attached to, so the hook
                uses that layer's own direction and scale. When omitted, the
                first entry of ``rep_reader.hidden_layers`` is used (the
                previous behaviour).

        Returns:
            A ``hook(module, input, output)`` callable. On any error, or when
            no direction is available, it returns ``output`` unchanged.
        """
        def hook_fn(module, input_tensor, output):
            try:
                target_layer = layer_id if layer_id is not None else self.rep_reader.hidden_layers[0]
                if emotion not in self.rep_reader.emotion_centroids:
                    return output
                direction = self._lookup_direction(emotion, target_layer)
                if direction is None:
                    return output

                # Blocks may return a tuple (hidden_states, ...) or a bare tensor.
                is_tuple = isinstance(output, tuple)
                hidden_states = output[0] if is_tuple else output

                # Get layer-specific scaling
                layer_scale = self.get_layer_scale(target_layer)

                # Match the activations' device/dtype so the addition neither
                # fails across devices nor up-casts half-precision states.
                direction = direction.to(device=hidden_states.device, dtype=hidden_states.dtype)
                emotion_component = direction * scale_factor * layer_scale

                # Cap the size of each added element (not gradient clipping).
                emotion_component = torch.clamp(
                    emotion_component,
                    min=-self.max_modification,
                    max=self.max_modification
                )

                # Broadcasts over the batch and sequence dimensions.
                modified_states = hidden_states + emotion_component

                if is_tuple:
                    return (modified_states,) + output[1:]
                return modified_states

            except Exception as e:
                self.logger.error(f"Error in emotion hook: {str(e)}")
                return output

        return hook_fn

    def generate_with_emotion(
        self,
        text: str,
        emotion: str,
        intensity: float = 5.0,
        max_new_tokens: int = 50,
        num_return_sequences: int = 1,
        temperature: float = 0.7,
        **kwargs
    ) -> List[str]:
        """
        Generate a response to ``text`` while steering towards ``emotion``.

        Args:
            text: the input the model should respond to.
            emotion: emotion name used in the prompt and for steering.
            intensity: steering strength; the applied scale is
                ``(intensity / 10.0) * base_scale_factor``.
            max_new_tokens, num_return_sequences, temperature: sampling options.
            **kwargs: extra ``model.generate`` options; they override the
                built-in sampling defaults.

        Returns:
            List of generated continuations (prompt stripped, whitespace and
            repeated punctuation collapsed).
        """
        # Format prompt
        prompt = f"Express {emotion} with a single concise response to: {text}\nResponse:"
        # prompt = f"Rephrase '{text}' to express {emotion} in a single concise sentence \nSentence:"

        # Tokenize
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            padding=True,
            truncation=True
        ).to(self.model.device)

        # Calculate scale factor
        scale_factor = (intensity / 10.0) * self.base_scale_factor

        # Caller-supplied options take precedence over the defaults instead
        # of raising a duplicate-keyword TypeError.
        generation_args = dict(
            max_new_tokens=max_new_tokens,
            num_return_sequences=num_return_sequences,
            do_sample=True,
            temperature=temperature,
            top_k=30,
            top_p=0.85,
            no_repeat_ngram_size=3,
            repetition_penalty=1.3,
            pad_token_id=self.tokenizer.pad_token_id,
        )
        generation_args.update(kwargs)

        hooks = []
        try:
            # Steer only the first three configured layer ids; each hook is
            # bound to its own layer so it applies that layer's direction.
            for layer_id in self.rep_reader.hidden_layers[:3]:
                layer = self.model.transformer.h[abs(layer_id)]
                hook = layer.register_forward_hook(
                    self.create_emotion_hook(emotion, scale_factor, None, layer_id=layer_id)
                )
                hooks.append(hook)

            outputs = self.model.generate(**inputs, **generation_args)

            # Process outputs: drop the prompt tokens, then tidy the text.
            prompt_length = inputs['input_ids'].shape[1]
            generated_texts = []
            for output in outputs:
                decoded = self.tokenizer.decode(output[prompt_length:], skip_special_tokens=True)
                decoded = re.sub(r'([.,!?])\1+', r'\1', decoded)  # Clean up repeated punctuation
                decoded = re.sub(r'\s+', ' ', decoded).strip()    # Clean up whitespace
                generated_texts.append(decoded)

            return generated_texts

        finally:
            # Remove every hook that was registered, even on failure.
            for hook in hooks:
                hook.remove()

    def transfer_emotion(
        self,
        text: str,
        source_emotion: str,
        target_emotion: str,
        target_intensity: float = 5.0,
        **kwargs
    ) -> str:
        """
        Two-pass rewrite: first generate from ``text`` with ``source_emotion``
        at intensity 0.1 (temperature 0.3), then feed that result back in with
        ``target_emotion`` at ``target_intensity`` (temperature 0.5).

        Returns:
            The first generated sequence of the second pass.
        """
        # Generate with low intensity first
        neutral_text = self.generate_with_emotion(
            text=text,
            emotion=source_emotion,
            intensity=0.1,
            temperature=0.3,
            **kwargs
        )[0]

        # Then apply target emotion
        final_text = self.generate_with_emotion(
            text=neutral_text,
            emotion=target_emotion,
            intensity=target_intensity,
            temperature=0.5,
            **kwargs
        )[0]

        return final_text

In [109]:
# 1. Load the CSV and build the per-emotion dictionary:
#    formatted_data[emotion]['train' | 'test'] -> 'data', 'scenarios', 'intensities'
formatted_data = prepare_emotion_dataset("combined_new.csv")

In [110]:
# Load the pre-trained causal LM and its tokenizer
# (setup_model_and_tokenizer defaults to "EleutherAI/gpt-neo-1.3B").
model, tokenizer = setup_model_and_tokenizer()

In [111]:
# 2. Create the intensity-aware RepReader over layer ids
#    -1, -2, ..., -(num_hidden_layers - 1).
#    This cell only constructs the reader; directions are fitted in the next cell.
hidden_layers = list(range(-1, -model.config.num_hidden_layers, -1))
rep_reader = RepReader(model, tokenizer, hidden_layers)

In [112]:
# Fit the reader on the training split of every emotion.
for emotion, splits in formatted_data.items():
    texts = splits['train']['data']
    intensities = splits['train']['intensities']
    rep_reader.get_directions(texts, intensities, emotion)

In [113]:
# Setup RepController with its default max_modification (2.0) and
# base_scale_factor (1.5); it steers generation using rep_reader's directions.
controller = RepController(model, tokenizer, rep_reader)

In [None]:
# Steered generation demo. Inside generate_with_emotion, intensity=7.0 becomes
# scale_factor = (7.0 / 10.0) * base_scale_factor. The result is a list of strings.
result = controller.generate_with_emotion(
    text="I am happy",
    emotion="joy",
    intensity=7.0,
)
print(result)

['I’m happy, I‘m happy It’s a good feeling to be happy, isn’t it? You’re not alone. I”m happy too. The happiness']


In [143]:
# Two-pass transfer demo: transfer_emotion first generates from the text with
# source_emotion at intensity 0.1, then feeds that output back in with
# target_emotion at target_intensity. Returns a single string.
transferred = controller.transfer_emotion(
    text="I feel terrible",
    source_emotion="joy",
    target_emotion="sadness",
    target_intensity=6.0
)
print(transferred)

I am sorry. Please forgive me. I am so sorry for being upset. Please don't be upset. I'm sorry that I'm upset. You can tell me if you want. I'll try to be more understanding. I have
