In [None]:
import torch
import torch.nn.functional as F
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM
import matplotlib.pyplot as plt
import json
import pickle
from tqdm import tqdm
import os

In [None]:
# Hugging Face Hub identifier of the checkpoint under study.
MODEL_NAME = "meta-llama/Llama-2-7b-hf"  # Using Llama 2 7B
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed torch's global RNG so that anything drawn with torch.randn later in the
# notebook is identical across "Restart Kernel -> Run All" executions.
SEED = 42
torch.manual_seed(SEED)

# Last expression of the cell: displays the selected device.
DEVICE

In [None]:
# Load the tokenizer and the causal-LM checkpoint named by MODEL_NAME.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,  # request half-precision weights
    device_map="auto"  # delegate device placement to the loader
)
# Switch to evaluation mode; as the cell's last expression this also displays
# whatever model.eval() returns.
model.eval()

In [None]:
def extract_model_weights(model):
    """Snapshot every named parameter of ``model`` as an independent CPU tensor.

    Args:
        model: Object exposing ``named_parameters()`` (e.g. a ``torch.nn.Module``).

    Returns:
        dict mapping each parameter name to a detached, CPU-resident clone of
        that parameter, so later changes to the model do not affect the snapshot.
    """
    return {
        param_name: tensor.detach().cpu().clone()
        for param_name, tensor in model.named_parameters()
    }

def record_activations(model, input_ids, layers_to_record=None):
    """Run one forward pass and capture the outputs of selected sub-modules.

    Args:
        model: Module exposing ``named_modules()`` whose call returns an object
            with a ``logits`` attribute. ``model.model.layers`` is only read
            when ``layers_to_record`` is None.
        input_ids: Passed to ``model`` as its single positional argument.
        layers_to_record: Iterable of fully qualified module names (as yielded
            by ``model.named_modules()``) to hook. Names are matched exactly,
            so ``'model.layers.1.mlp'`` selects neither
            ``'model.layers.10.mlp'`` nor child modules such as
            ``'model.layers.1.mlp.gate_proj'``. When None, the ``self_attn``,
            ``mlp``, ``input_layernorm`` and ``post_attention_layernorm``
            modules of every entry in ``model.model.layers`` are recorded.

    Returns:
        Tuple ``(activations, logits)`` where ``activations`` maps each hooked
        module name to a detached CPU copy of its output and ``logits`` is
        ``model(input_ids).logits``.
    """
    activations = {}
    hooks = []

    def get_activation_hook(name):
        def hook(module, input, output):
            # For modules that return a tuple only the first element is kept.
            if isinstance(output, tuple):
                output = output[0]
            activations[name] = output.detach().cpu().clone()
        return hook

    if layers_to_record is None:
        # Default: the four listed sub-modules of every transformer layer.
        layers_to_record = [
            f'model.layers.{i}.{submodule}'
            for i in range(len(model.model.layers))
            for submodule in (
                'self_attn',
                'mlp',
                'input_layernorm',
                'post_attention_layernorm',
            )
        ]

    # Exact-name lookup: substring matching would silently hook unrelated
    # layers (index 1 vs 10..19) and every nested child module.
    wanted = set(layers_to_record)

    try:
        for name, module in model.named_modules():
            if name in wanted:
                hooks.append(module.register_forward_hook(get_activation_hook(name)))

        # Inference only; no autograd graph is needed for recording.
        with torch.no_grad():
            outputs = model(input_ids)
            logits = outputs.logits
    finally:
        # Always detach the hooks, even if the forward pass raised, so they do
        # not linger on the model and fire on later calls.
        for hook in hooks:
            hook.remove()

    return activations, logits

In [None]:
def save_activation_dataset(texts, model, tokenizer, save_path='activation_dataset.pkl'):
    """Record activations and logits for each text and pickle the collection.

    Args:
        texts: Iterable of prompt strings; each is tokenized and run separately.
        model: Model forwarded to ``record_activations``; its ``device``
            attribute decides where the input ids are moved.
        tokenizer: Callable tokenizer returning an object with ``input_ids``.
        save_path: Destination file for the pickled dataset.

    Returns:
        List with one dict per text, holding ``'text'``, ``'input_ids'``,
        ``'logits'`` (both on CPU) and ``'activations'``.
    """
    dataset = []

    for text in tqdm(texts, desc="Recording activations"):
        # Every prompt is encoded on its own, so there is nothing to pad
        # against. Requesting padding here is a no-op for tokenizers that have
        # a pad token and raises ValueError for those that do not (the Llama 2
        # tokenizer ships without one), so it is omitted.
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=128)
        input_ids = inputs.input_ids.to(model.device)

        # Record activations
        activations, logits = record_activations(model, input_ids)

        # Store everything on CPU so the pickle does not depend on the device.
        dataset.append({
            'text': text,
            'input_ids': input_ids.cpu(),
            'logits': logits.cpu(),
            'activations': activations
        })

    # Save dataset
    with open(save_path, 'wb') as f:
        pickle.dump(dataset, f)

    print(f"Saved activation dataset to {save_path}")
    return dataset

In [None]:
# Short open-ended prompts; each one is tokenized and run through the model
# separately when the activation dataset is built.
TEST_TEXTS = [
    "The future of artificial intelligence is",
    "Climate change is one of the most",
    "In the field of quantum computing",
    "The human brain contains approximately",
    "Machine learning algorithms can be used to"
]

In [None]:
# Record activations/logits for every prompt; the result is also pickled to the
# function's default save_path ('activation_dataset.pkl').
activation_dataset = save_activation_dataset(TEST_TEXTS, model, tokenizer)

In [None]:
def gradient_descent_reconstruction(target_output, weights, input_ids, 
                                  learning_rate=0.01, num_iterations=1000,
                                  target_layers=None):
    """Fit random per-layer tensors so a simplified MLP forward matches a target.

    For every entry of ``target_layers`` a random float32 tensor of shape
    ``(batch, seq_len, hidden_size)`` is created. Entries whose name contains
    ``'mlp'`` are optimized with plain gradient descent: the tensor is treated
    as the *input* of that layer's gated MLP
    (``down(silu(gate(x)) * up(x))``) and the MSE between the MLP output and
    ``target_output`` is minimized. Other entries (e.g. ``self_attn``) are
    returned at their random initial value because this simplified simulation
    has no loss term for them.

    This is a deliberately simplified stand-in for the real forward pass. When
    ``target_output`` is not hidden-sized (e.g. vocabulary-sized logits), the
    MLP output is projected with ``weights['lm_head.weight']`` before the
    comparison; later layers and the final normalization are skipped.

    Args:
        target_output: Tensor the simulated output is compared against.
        weights: Dict of parameter name -> 2-D weight tensor laid out as
            ``(out_features, in_features)``, e.g. from ``extract_model_weights``.
            Any floating dtype is accepted; weights are cast to float32 once.
        input_ids: Tensor whose first two dimensions give batch size and
            sequence length.
        learning_rate: Gradient-descent step size.
        num_iterations: Number of optimization steps.
        target_layers: Module names of the form ``'model.layers.<i>.<sub>'``.
            Defaults to layer 0's ``mlp`` and ``self_attn``.

    Returns:
        Tuple ``(reconstructed_activations, losses)``: a dict mapping each
        target layer name to its tensor, and the list of per-iteration losses.

    Raises:
        ValueError: If ``num_iterations > 0`` and no target layer is
            optimizable, or if the target's last dimension can be matched
            neither directly nor through ``weights['lm_head.weight']``.
    """
    if target_layers is None:
        target_layers = ['model.layers.0.mlp', 'model.layers.0.self_attn']

    batch_size = input_ids.shape[0]
    seq_len = input_ids.shape[1]

    # Find the MLP targets whose projection weights are available.
    mlp_prefixes = {}
    for layer_name in target_layers:
        if 'mlp' not in layer_name:
            continue
        layer_idx = int(layer_name.split('.')[2])
        prefix = f'model.layers.{layer_idx}.mlp'
        if f'{prefix}.gate_proj.weight' in weights:
            mlp_prefixes[layer_name] = prefix

    if num_iterations > 0 and not mlp_prefixes:
        # Without this check the loop would call .backward() on the int 0.
        raise ValueError(
            "No optimizable layer: target_layers must contain at least one "
            "'model.layers.<i>.mlp' entry whose gate/up/down projection "
            "weights are present in `weights`."
        )

    # Infer the hidden size from the weights instead of assuming Llama 2 7B;
    # 4096 is kept only as the fallback when no MLP weight is available.
    hidden_size = 4096
    if mlp_prefixes:
        first_prefix = next(iter(mlp_prefixes.values()))
        hidden_size = weights[f'{first_prefix}.gate_proj.weight'].shape[1]

    # Initialize every target layer with random noise.
    reconstructed_activations = {}
    for layer_name in target_layers:
        reconstructed_activations[layer_name] = torch.randn(
            batch_size, seq_len, hidden_size,
            requires_grad=True, dtype=torch.float32
        )

    if num_iterations <= 0:
        return reconstructed_activations, []

    # torch.randn above used torch's default device; place constants there too.
    device = torch.empty(0).device

    def to_float32(tensor):
        return tensor.detach().to(device=device, dtype=torch.float32)

    # Loop-invariant setup: cast target and weights to float32 once. The model
    # weights are typically float16, which cannot be matmul'd with the float32
    # activations being optimized.
    target = to_float32(target_output)
    lm_head = None
    mlp_params = {}
    for layer_name, prefix in mlp_prefixes.items():
        gate_weight = to_float32(weights[f'{prefix}.gate_proj.weight'])
        up_weight = to_float32(weights[f'{prefix}.up_proj.weight'])
        down_weight = to_float32(weights[f'{prefix}.down_proj.weight'])

        out_dim = down_weight.shape[0]
        # A last dimension equal to out_dim (or 1, which broadcasts) can be
        # compared directly; anything else needs the lm_head projection.
        needs_head = target.dim() > 0 and target.shape[-1] not in (1, out_dim)
        if needs_head:
            head = weights['lm_head.weight'] if 'lm_head.weight' in weights else None
            if head is None or tuple(head.shape) != (target.shape[-1], out_dim):
                raise ValueError(
                    f"target_output has last dimension {target.shape[-1]} but the "
                    f"simulated output of '{layer_name}' has {out_dim}, and "
                    "weights['lm_head.weight'] cannot map between them."
                )
            if lm_head is None:
                lm_head = to_float32(head)

        mlp_params[layer_name] = (gate_weight, up_weight, down_weight, needs_head)

    # Optimization loop
    losses = []

    for iteration in range(num_iterations):
        loss = 0

        for layer_name, (gate_weight, up_weight, down_weight, needs_head) in mlp_params.items():
            activation = reconstructed_activations[layer_name]

            # Simplified gated-MLP forward pass on the candidate tensor.
            hidden = F.silu(activation @ gate_weight.t()) * (activation @ up_weight.t())
            output = hidden @ down_weight.t()
            if needs_head:
                output = output @ lm_head.t()

            loss = loss + F.mse_loss(output, target)

        # Backward pass
        loss.backward()

        # Manual SGD step; tensors without a loss term have no grad and are skipped.
        with torch.no_grad():
            for activation in reconstructed_activations.values():
                if activation.grad is not None:
                    activation -= learning_rate * activation.grad
                    activation.grad.zero_()

        losses.append(loss.item())

        if iteration % 100 == 0:
            print(f"Iteration {iteration}, Loss: {loss.item():.6f}")

    return reconstructed_activations, losses

def compare_activations(real_activations, reconstructed_activations, layer_name):
    """Score how closely a reconstructed activation matches the recorded one.

    Args:
        real_activations: Dict of layer name -> recorded activation tensor.
        reconstructed_activations: Dict of layer name -> reconstructed tensor.
        layer_name: Key looked up in both dicts.

    Returns:
        Dict with ``'mse'``, ``'cosine_similarity'`` and ``'correlation'``
        (Pearson coefficient) computed over all elements of the two tensors.
    """
    # Bring both tensors to float32; the reconstruction is also detached and
    # moved to the CPU.
    target = real_activations[layer_name].float()
    estimate = reconstructed_activations[layer_name].detach().cpu().float()

    metrics = {'mse': F.mse_loss(estimate, target).item()}

    # The remaining metrics treat each tensor as one long vector.
    target_vec = target.flatten()
    estimate_vec = estimate.flatten()

    metrics['cosine_similarity'] = F.cosine_similarity(
        target_vec.unsqueeze(0), estimate_vec.unsqueeze(0)
    ).item()

    # Off-diagonal entry of the 2x2 correlation matrix.
    correlation_matrix = np.corrcoef(target_vec.numpy(), estimate_vec.numpy())
    metrics['correlation'] = correlation_matrix[0, 1]

    return metrics

In [None]:
# Announce the start of the reconstruction-attack section in the cell output.
print("\n" + "="*70, "RUNNING ACTIVATION RECONSTRUCTION ATTACK", "="*70, sep="\n")

In [None]:

# Extract model weights: copy every named parameter into a dict that is later
# handed to the reconstruction routine.
print("Extracting model weights...")
model_weights = extract_model_weights(model)
print(f"Extracted {len(model_weights)} weight tensors")

In [None]:
# Select which recorded prompt to attack (index into activation_dataset).
sample_idx = 0
sample = activation_dataset[sample_idx]

# Show the prompt and the tensor shapes that the reconstruction will work with.
print(f"\nTarget text: {sample['text']}")
print(f"Input shape: {sample['input_ids'].shape}")
print(f"Output shape: {sample['logits'].shape}")

# Select layers to reconstruct. These names are used as keys into the recorded
# activations when metrics are computed, so they must be module names.
target_layers = [
    'model.layers.0.mlp',
    'model.layers.0.self_attn',
    'model.layers.1.mlp'
]


In [None]:
# Run gradient descent reconstruction against the recorded logits of the
# selected sample, using the extracted weights.
print(f"\nReconstructing activations for layers: {target_layers}")
reconstructed_acts, losses = gradient_descent_reconstruction(
    target_output=sample['logits'],
    weights=model_weights,
    input_ids=sample['input_ids'],
    learning_rate=0.001,  # overrides the function default of 0.01
    num_iterations=500,  # overrides the function default of 1000
    target_layers=target_layers
)


In [None]:
# Ground-truth recordings for exactly the targeted modules. Names are compared
# for equality: substring matching would also pull in unrelated entries such as
# 'model.layers.10.mlp' or 'model.layers.1.mlp.gate_proj' for the target
# 'model.layers.1.mlp'.
real_acts = {k: v for k, v in sample['activations'].items()
             if k in target_layers}

In [None]:
# Bundle the inputs, ground truth, reconstruction and loss history so the
# attack can be inspected offline.
results = {
    'text': sample['text'],
    'input_ids': sample['input_ids'],
    'real_activations': real_acts,
    'reconstructed_activations': {
        name: tensor.detach().cpu()
        for name, tensor in reconstructed_acts.items()
    },
    'losses': losses,
    'metrics': {}
}

# Score only the layers for which a recorded activation is available.
for layer_name in reconstructed_acts:
    if layer_name not in real_acts:
        continue
    results['metrics'][layer_name] = compare_activations(
        real_acts, reconstructed_acts, layer_name
    )

# Persist the bundle next to the notebook.
with open('reconstruction_results.pkl', 'wb') as f:
    pickle.dump(results, f)

print("\nReconstruction attack completed!")
print("Results saved to reconstruction_results.pkl")