In [2]:
import torch
import accelerate
from transformers import pipeline
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch.nn.functional as F
import random
from typing import Optional, Union, Dict, Tuple, List, Set
import warnings
warnings.filterwarnings("ignore", message="The `seen_tokens` attribute is deprecated")
warnings.filterwarnings("ignore", message="`get_max_cache()` is deprecated")


phi3 = AutoModelForCausalLM.from_pretrained(
    "microsoft/Phi-3-mini-128k-instruct",
    device_map="auto",
    offload_folder="/Users/tmcn0009/Documents/offload_folder",
    torch_dtype="auto",
    trust_remote_code=True,
)

model = phi3

tokenizer = AutoTokenizer.from_pretrained("microsoft/Phi-3-mini-128k-instruct")


generated_text = pipeline(model="microsoft/Phi-3-mini-128k-instruct", torch_dtype=torch.bfloat16, trust_remote_code=True, device_map="auto")


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [3]:
class NoiseInjector:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.device = next(model.parameters()).device
        self.layer_mapping = self._create_layer_mapping()

    def _create_layer_mapping(self):
        """Create a mapping of layer types to their indices"""
        layer_mapping = {
            'attention': [],
            'ffn': [],
            'all': []
        }

        for name, module in self.model.named_modules():
            layer_idx = None
            # Extract layer index if present in name
            for part in name.split('.'):
                if part.isdigit():
                    layer_idx = int(part)
                    break

            if layer_idx is not None:
                if 'attention' in name.lower():
                    layer_mapping['attention'].append(layer_idx)
                elif any(x in name.lower() for x in ['ffn', 'mlp', 'feedforward']):
                    layer_mapping['ffn'].append(layer_idx)
                layer_mapping['all'].append(layer_idx)

        # Remove duplicates and sort
        for key in layer_mapping:
            layer_mapping[key] = sorted(list(set(layer_mapping[key])))

        return layer_mapping

    def prepare_inputs(self, prompt: str):
        inputs = self.tokenizer(prompt, return_tensors="pt")
        input_ids = inputs.input_ids.to(self.device)
        attention_mask = inputs.attention_mask.to(self.device)
        return input_ids, attention_mask
    
    def validate_config(self, config: dict) -> dict:
        """Validate configuration parameters"""
        config = config.copy()
        
        # Validate temperature
        if config.get('temperature', 1.0) <= 0:
            print("Warning: Temperature must be positive. Setting to 1.0")
            config['temperature'] = 1.0
            
        # Validate top_p
        if config.get('top_p', 1.0) <= 0 or config.get('top_p', 1.0) > 1:
            print("Warning: top_p must be between 0 and 1. Setting to 0.9")
            config['top_p'] = 0.9
            
        # Validate noise values
        noise_keys = ['query_noise', 'key_noise', 'value_noise', 'output_noise']
        for key in noise_keys:
            if key in config and config[key] < 0:
                print(f"Warning: {key} cannot be negative. Setting to 0")
                config[key] = 0.0
                
        return config


    def get_dropout_layers(self,
                         dropout_config: Dict[str, Union[float, List[int], str]]) -> Set[int]:
        """
        Determine which layers to drop based on configuration
        """
        dropout_prob = dropout_config.get('dropout_prob', 0.0)
        layer_type = dropout_config.get('layer_type', 'all')
        specific_layers = dropout_config.get('specific_layers', None)
        mode = dropout_config.get('mode', 'random')
        min_layers = dropout_config.get('min_layers', 1)

        # Get candidate layers
        if specific_layers is not None:
            candidate_layers = specific_layers
        else:
            candidate_layers = self.layer_mapping[layer_type]

        total_layers = len(candidate_layers)
        max_dropouts = max(0, total_layers - min_layers)

        if mode == 'random':
            # Randomly select layers to drop based on probability
            dropout_layers = set()
            for layer in candidate_layers:
                if len(dropout_layers) < max_dropouts and random.random() < dropout_prob:
                    dropout_layers.add(layer)
        else:  # sequential
            # Drop layers sequentially from the top
            num_dropouts = min(max_dropouts, int(total_layers * dropout_prob))
            dropout_layers = set(candidate_layers[-num_dropouts:])

        return dropout_layers

    def generate_with_noise(self,
                          prompt: str,
                          noise_config: Dict[str, Union[float, Dict]],
                          dropout_config: Dict[str, Union[float, List[int], str]] = None,
                          max_length: int = 100,
                          **kwargs) -> str:
        """
        Generate text with both noise injection and layer dropout
        """
        input_ids, attention_mask = self.prepare_inputs(prompt)
        hooks = []

        # Get layers to dropout if dropout_config is provided
        dropout_layers = set()
        if dropout_config is not None:
            dropout_layers = self.get_dropout_layers(dropout_config)

        def attention_forward_pre_hook(module, inputs):
            # Get layer index from module name
            layer_idx = None
            for part in str(module).split('.'):
                if part.isdigit():
                    layer_idx = int(part)
                    break

            # If this layer should be dropped, return zero tensor
            if layer_idx in dropout_layers:
                zero_tensor = torch.zeros_like(inputs[0])
                return (zero_tensor,) + inputs[1:] if len(inputs) > 1 else zero_tensor

            # Most transformer implementations pass query, key, value as the first input
            if not inputs:
                return inputs

            hidden_states = inputs[0]

            # Get the projection weights
            q_proj_weight = getattr(module, 'q_proj', None) or getattr(module, 'query_proj', None) or getattr(module, 'query', None)
            k_proj_weight = getattr(module, 'k_proj', None) or getattr(module, 'key_proj', None) or getattr(module, 'key', None)
            v_proj_weight = getattr(module, 'v_proj', None) or getattr(module, 'value_proj', None) or getattr(module, 'value', None)

            if q_proj_weight is not None and noise_config.get('query_noise', 0) > 0:
                query = F.linear(hidden_states, q_proj_weight)
                query_noise = torch.randn_like(query) * noise_config['query_noise']
                query = query + query_noise
                inputs = (query,) + inputs[1:]

            if k_proj_weight is not None and noise_config.get('key_noise', 0) > 0:
                key = F.linear(hidden_states, k_proj_weight)
                key_noise = torch.randn_like(key) * noise_config['key_noise']
                key = key + key_noise
                if len(inputs) > 1:
                    inputs = inputs[:1] + (key,) + inputs[2:]

            if v_proj_weight is not None and noise_config.get('value_noise', 0) > 0:
                value = F.linear(hidden_states, v_proj_weight)
                value_noise = torch.randn_like(value) * noise_config['value_noise']
                value = value + value_noise
                if len(inputs) > 2:
                    inputs = inputs[:2] + (value,) + inputs[3:]

            return inputs

        def attention_forward_hook(module, inputs, output):
            # Get layer index
            layer_idx = None
            for part in str(module).split('.'):
                if part.isdigit():
                    layer_idx = int(part)
                    break

            # If this layer should be dropped, return zeros
            if layer_idx in dropout_layers:
                if isinstance(output, tuple):
                    zero_tensor = torch.zeros_like(output[0])
                    return (zero_tensor,) + output[1:]
                return torch.zeros_like(output)

            # Add noise to the attention output
            if isinstance(output, tuple):
                attention_output = output[0]
            else:
                attention_output = output

            # Add output noise if specified
            if noise_config.get('output_noise', 0) > 0:
                noise = torch.randn_like(attention_output) * noise_config['output_noise']
                attention_output = attention_output + noise

            # Apply temperature scaling
            if noise_config.get('temperature', 1.0) != 1.0:
                attention_output = attention_output / noise_config['temperature']

            if isinstance(output, tuple):
                return (attention_output,) + output[1:]
            return attention_output

        # Register hooks for each attention layer
        for name, module in self.model.named_modules():
            if "attention" in name.lower():
                # Add pre-hook for Q/K/V noise
                pre_hook = module.register_forward_pre_hook(attention_forward_pre_hook)
                hooks.append(pre_hook)

                # Add post-hook for output noise
                post_hook = module.register_forward_hook(attention_forward_hook)
                hooks.append(post_hook)

        try:
            # Set up generation parameters
            generation_config = {
                'max_length': max_length,
                'do_sample': True,
                'pad_token_id': self.tokenizer.pad_token_id,
                'eos_token_id': self.tokenizer.eos_token_id,
                'top_k': noise_config.get('top_k', 50),
                'top_p': noise_config.get('top_p', 1.0),
                'temperature': noise_config.get('temperature', 1.0),
            }

            # Add any additional kwargs
            generation_config.update(kwargs)

            # Generate with noise injection
            outputs = self.model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                **generation_config
            )

            # Decode the output
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        finally:
            # Clean up hooks
            for hook in hooks:
                hook.remove()

In [4]:
# Load the model and tokenizer


noise_injector = NoiseInjector(model, tokenizer)


In [None]:
noise_injector = NoiseInjector(model, tokenizer)

#temperature must be higher than 0.0

noise_config = {
    'embedding_noise': 0.0,
    'query_noise': 0.0,
    'key_noise': 0.0,
    'value_noise': 0.0,
    'output_noise': 0.0,
    #temperature must be higher than 0.0
    'temperature': 1.0,
    'top_k': 0,  # Disable top-k sampling = 0
    'top_p': 1.0 # Disable top-p sampling = 1.0
}


# Configure dropout parameters
dropout_config = {
    'dropout_prob': 0.0,        # Probability of dropping a layer
    'layer_type': 'all',  # Type of layers to consider ('attention', 'ffn', or 'all')
    'mode': 'random',          # Dropout mode ('random' or 'sequential')
    'min_layers': 1,           # Minimum number of layers to keep
    'specific_layers': [2, 4, 7, 9]    # Optional: To use specific layers for dropout: Set specific_layers=[0, 2, 4] in dropout_config
}

#To only use noise: Omit the dropout_config parameter
# Generate text with both noise and dropout use following
generated_text = noise_injector.generate_with_noise(
    prompt="sad poem",
    noise_config=noise_config,
    dropout_config=dropout_config,
    max_length=100
)

generated_text = noise_injector.generate_with_noise(
    prompt="sad poem",
    noise_config=noise_config,
    max_length=80

)
print(generated_text)