# Restriction Map Spectral Analysis

## Purpose
Validate the **Sheaf-Thermodynamic Bridge** by analyzing the spectral signatures of restriction maps (W_V, W_O) across different labs.

## Hypothesis
- **EleutherAI (Dampener)**: Lower spectral energy in W_V/W_O → weaker restriction maps
- **Meta/BigScience (Expander)**: Higher spectral energy → stronger restriction maps

## Method
1. Extract W_V (Value projection) and W_O (Output projection) from attention layers
2. Compute SVD (Singular Value Decomposition)
3. Compare spectral distributions across labs

## Theoretical Background
The restriction map in our Sheaf formalization is:
$$\rho_{ij} = \sqrt{A_{ij}} \cdot W_V$$

The spectral norm of W_V determines the "strength" of information flow.

---
*Paper #3: Thermodynamics of Language Models*
*Author: Davide D'Elia*
*Date: 2026-01-05*

In [None]:
# Install dependencies (Colab)
!pip install transformers torch matplotlib seaborn pandas -q

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from transformers import AutoModelForCausalLM, AutoConfig
import gc
import json
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

# Visualization settings
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_context("paper", font_scale=1.2)

print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

## Model Configuration

We test representatives from each lab/heritage group:

In [None]:
# Models to analyze - Representatives from each lab
MODELS_CONFIG = {
    # EleutherAI (Dampener Heritage)
    "EleutherAI/pythia-160m": {
        "lab": "EleutherAI",
        "expected": "DAMPENER",
        "color": "#E74C3C",  # Red
        "marker": "o"
    },
    "EleutherAI/pythia-410m": {
        "lab": "EleutherAI",
        "expected": "DAMPENER",
        "color": "#C0392B",  # Dark Red
        "marker": "s"
    },
    "EleutherAI/gpt-neo-125M": {
        "lab": "EleutherAI",
        "expected": "DAMPENER",
        "color": "#F39C12",  # Orange
        "marker": "^"
    },
    
    # Meta (Expander Heritage)
    "facebook/opt-125m": {
        "lab": "Meta",
        "expected": "EXPANDER",
        "color": "#3498DB",  # Blue
        "marker": "o"
    },
    "facebook/opt-350m": {
        "lab": "Meta",
        "expected": "EXPANDER",
        "color": "#2980B9",  # Dark Blue
        "marker": "s"
    },
    
    # BigScience (Expander Heritage - ALiBi)
    "bigscience/bloom-560m": {
        "lab": "BigScience",
        "expected": "EXPANDER (ALiBi)",
        "color": "#27AE60",  # Green
        "marker": "o"
    },
    
    # OpenAI (Baseline Reference)
    "gpt2": {
        "lab": "OpenAI",
        "expected": "NEUTRAL",
        "color": "#9B59B6",  # Purple
        "marker": "o"
    }
}

print(f"Testing {len(MODELS_CONFIG)} models from {len(set(v['lab'] for v in MODELS_CONFIG.values()))} labs")

## Core Analysis Functions

In [None]:
def get_projection_matrices(model, model_name: str) -> Dict[str, List[torch.Tensor]]:
    """
    Extract W_V (Value projection) and W_O (Output projection) from all layers.
    
    These are the key components of the Restriction Map:
    rho_ij = sqrt(A_ij) * W_V
    
    The output projection W_O then maps back to residual stream.
    """
    W_V_list = []
    W_O_list = []
    W_Q_list = []
    W_K_list = []
    
    # Identify architecture
    if hasattr(model, 'gpt_neox'):  # Pythia
        layers = model.gpt_neox.layers
        arch = 'neox'
    elif hasattr(model, 'transformer') and hasattr(model.transformer, 'h'):  # GPT-2, GPT-J, GPT-Neo
        layers = model.transformer.h
        if hasattr(layers[0].attn, 'out_proj'):  # GPT-J
            arch = 'gptj'
        else:  # GPT-2, GPT-Neo
            arch = 'gpt2'
    elif hasattr(model, 'model') and hasattr(model.model, 'layers'):  # Llama/Mistral
        layers = model.model.layers
        arch = 'llama'
    elif hasattr(model, 'transformer') and hasattr(model.transformer, 'blocks'):  # BLOOM
        layers = model.transformer.blocks if hasattr(model.transformer, 'blocks') else model.transformer.h
        arch = 'bloom'
    else:
        print(f"Unknown architecture for {model_name}")
        return {}
    
    print(f"  Architecture: {arch}, Layers: {len(layers)}")
    
    for i, layer in enumerate(layers):
        try:
            if arch == 'neox':  # Pythia
                # Pythia uses query_key_value combined, then dense for output
                qkv = layer.attention.query_key_value.weight.detach()
                # Split into Q, K, V (each d_model x d_model/3 approximately)
                hidden_size = qkv.shape[0] // 3
                W_Q = qkv[:hidden_size, :]
                W_K = qkv[hidden_size:2*hidden_size, :]
                W_V = qkv[2*hidden_size:, :]
                W_O = layer.attention.dense.weight.detach()
                
            elif arch == 'gptj':  # GPT-J
                W_Q = layer.attn.q_proj.weight.detach()
                W_K = layer.attn.k_proj.weight.detach()
                W_V = layer.attn.v_proj.weight.detach()
                W_O = layer.attn.out_proj.weight.detach()
                
            elif arch == 'gpt2':  # GPT-2, GPT-Neo
                # GPT-2 combines c_attn (Q,K,V) and c_proj (O)
                c_attn = layer.attn.c_attn.weight.detach()
                hidden_size = c_attn.shape[1] // 3
                W_Q = c_attn[:, :hidden_size].T
                W_K = c_attn[:, hidden_size:2*hidden_size].T
                W_V = c_attn[:, 2*hidden_size:].T
                W_O = layer.attn.c_proj.weight.detach().T
                
            elif arch == 'llama':  # Llama/Mistral
                W_Q = layer.self_attn.q_proj.weight.detach()
                W_K = layer.self_attn.k_proj.weight.detach()
                W_V = layer.self_attn.v_proj.weight.detach()
                W_O = layer.self_attn.o_proj.weight.detach()
                
            elif arch == 'bloom':  # BLOOM
                # BLOOM uses query_key_value combined
                qkv = layer.self_attention.query_key_value.weight.detach()
                hidden_size = qkv.shape[0] // 3
                W_Q = qkv[:hidden_size, :]
                W_K = qkv[hidden_size:2*hidden_size, :]
                W_V = qkv[2*hidden_size:, :]
                W_O = layer.self_attention.dense.weight.detach()
            
            W_Q_list.append(W_Q.float())
            W_K_list.append(W_K.float())
            W_V_list.append(W_V.float())
            W_O_list.append(W_O.float())
            
        except Exception as e:
            print(f"  Error in layer {i}: {e}")
            continue
    
    return {
        'W_Q': W_Q_list,
        'W_K': W_K_list,
        'W_V': W_V_list,
        'W_O': W_O_list,
        'arch': arch,
        'n_layers': len(layers)
    }

In [None]:
def compute_spectral_signature(matrices: List[torch.Tensor]) -> Dict[str, np.ndarray]:
    """
    Compute spectral signatures (SVD) for a list of matrices.
    
    Returns:
    - singular_values: All singular values concatenated
    - spectral_norm: Max singular value per layer (||W||_2)
    - frobenius_norm: Frobenius norm per layer (||W||_F)
    - effective_rank: Effective rank per layer
    - condition_number: Condition number per layer
    """
    all_singular_values = []
    spectral_norms = []
    frobenius_norms = []
    effective_ranks = []
    condition_numbers = []
    
    for W in matrices:
        try:
            # Compute SVD
            S = torch.linalg.svdvals(W)
            s = S.numpy()
            
            all_singular_values.append(s)
            spectral_norms.append(s[0])  # Max singular value
            frobenius_norms.append(np.sqrt(np.sum(s**2)))
            
            # Effective rank: exp(entropy of normalized singular values)
            s_norm = s / s.sum()
            entropy = -np.sum(s_norm * np.log(s_norm + 1e-10))
            effective_ranks.append(np.exp(entropy))
            
            # Condition number: ratio of max to min singular value
            condition_numbers.append(s[0] / (s[-1] + 1e-10))
            
        except Exception as e:
            print(f"SVD error: {e}")
            continue
    
    return {
        'singular_values': all_singular_values,
        'spectral_norm': np.array(spectral_norms),
        'frobenius_norm': np.array(frobenius_norms),
        'effective_rank': np.array(effective_ranks),
        'condition_number': np.array(condition_numbers)
    }

In [None]:
def analyze_model(model_name: str, config: Dict) -> Optional[Dict]:
    """
    Full spectral analysis for a single model.
    """
    print(f"\n{'='*60}")
    print(f"Analyzing: {model_name}")
    print(f"Lab: {config['lab']}, Expected: {config['expected']}")
    print(f"{'='*60}")
    
    try:
        # Load model
        print("  Loading model...")
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float32,
            low_cpu_mem_usage=True
        )
        model.eval()
        
        # Extract projection matrices
        print("  Extracting projection matrices...")
        matrices = get_projection_matrices(model, model_name)
        
        if not matrices:
            return None
        
        # Compute spectral signatures
        print("  Computing spectral signatures...")
        results = {
            'model': model_name,
            'lab': config['lab'],
            'expected': config['expected'],
            'color': config['color'],
            'marker': config['marker'],
            'arch': matrices['arch'],
            'n_layers': matrices['n_layers']
        }
        
        for matrix_type in ['W_V', 'W_O', 'W_Q', 'W_K']:
            if matrices[matrix_type]:
                spectral = compute_spectral_signature(matrices[matrix_type])
                results[f'{matrix_type}_spectral_norm'] = spectral['spectral_norm']
                results[f'{matrix_type}_frobenius_norm'] = spectral['frobenius_norm']
                results[f'{matrix_type}_effective_rank'] = spectral['effective_rank']
                results[f'{matrix_type}_mean_spectral'] = float(np.mean(spectral['spectral_norm']))
                results[f'{matrix_type}_std_spectral'] = float(np.std(spectral['spectral_norm']))
        
        # Cleanup
        del model
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        
        print(f"  W_V mean spectral norm: {results.get('W_V_mean_spectral', 'N/A'):.4f}")
        print(f"  W_O mean spectral norm: {results.get('W_O_mean_spectral', 'N/A'):.4f}")
        
        return results
        
    except Exception as e:
        print(f"  FAILED: {e}")
        return None

## Run Analysis

In [None]:
# Run analysis on all models
all_results = []

for model_name, config in MODELS_CONFIG.items():
    result = analyze_model(model_name, config)
    if result:
        all_results.append(result)

print(f"\n\nSuccessfully analyzed {len(all_results)} models")

## Visualization: Spectral Signatures by Lab

In [None]:
# Create comprehensive visualization
fig, axes = plt.subplots(2, 2, figsize=(14, 12))

# Plot 1: W_V Spectral Norm by Layer
ax1 = axes[0, 0]
for res in all_results:
    if 'W_V_spectral_norm' in res:
        ax1.plot(
            res['W_V_spectral_norm'],
            label=f"{res['model'].split('/')[-1]} ({res['lab']})",
            color=res['color'],
            marker=res['marker'],
            markersize=4,
            alpha=0.8
        )
ax1.set_title('W_V Spectral Norm (Value Projection) by Layer')
ax1.set_xlabel('Layer')
ax1.set_ylabel('Spectral Norm ||W_V||_2')
ax1.legend(fontsize=8)
ax1.grid(True, alpha=0.3)

# Plot 2: W_O Spectral Norm by Layer
ax2 = axes[0, 1]
for res in all_results:
    if 'W_O_spectral_norm' in res:
        ax2.plot(
            res['W_O_spectral_norm'],
            label=f"{res['model'].split('/')[-1]} ({res['lab']})",
            color=res['color'],
            marker=res['marker'],
            markersize=4,
            alpha=0.8
        )
ax2.set_title('W_O Spectral Norm (Output Projection) by Layer')
ax2.set_xlabel('Layer')
ax2.set_ylabel('Spectral Norm ||W_O||_2')
ax2.legend(fontsize=8)
ax2.grid(True, alpha=0.3)

# Plot 3: Lab Comparison (Bar Chart)
ax3 = axes[1, 0]
lab_data = {}
for res in all_results:
    lab = res['lab']
    if lab not in lab_data:
        lab_data[lab] = {'W_V': [], 'W_O': [], 'color': res['color']}
    if 'W_V_mean_spectral' in res:
        lab_data[lab]['W_V'].append(res['W_V_mean_spectral'])
    if 'W_O_mean_spectral' in res:
        lab_data[lab]['W_O'].append(res['W_O_mean_spectral'])

labs = list(lab_data.keys())
x = np.arange(len(labs))
width = 0.35

wv_means = [np.mean(lab_data[l]['W_V']) if lab_data[l]['W_V'] else 0 for l in labs]
wo_means = [np.mean(lab_data[l]['W_O']) if lab_data[l]['W_O'] else 0 for l in labs]
colors = [lab_data[l]['color'] for l in labs]

bars1 = ax3.bar(x - width/2, wv_means, width, label='W_V (Value)', color=colors, alpha=0.7)
bars2 = ax3.bar(x + width/2, wo_means, width, label='W_O (Output)', color=colors, alpha=0.4, hatch='//')

ax3.set_title('Mean Spectral Norm by Lab (Restriction Map Strength)')
ax3.set_xlabel('Lab')
ax3.set_ylabel('Mean Spectral Norm')
ax3.set_xticks(x)
ax3.set_xticklabels(labs)
ax3.legend()
ax3.grid(True, alpha=0.3, axis='y')

# Plot 4: Effective Rank Distribution
ax4 = axes[1, 1]
for res in all_results:
    if 'W_V_effective_rank' in res:
        ax4.plot(
            res['W_V_effective_rank'],
            label=f"{res['model'].split('/')[-1]}",
            color=res['color'],
            marker=res['marker'],
            markersize=4,
            alpha=0.8
        )
ax4.set_title('W_V Effective Rank by Layer')
ax4.set_xlabel('Layer')
ax4.set_ylabel('Effective Rank')
ax4.legend(fontsize=8)
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('restriction_map_spectral_analysis.png', dpi=150, bbox_inches='tight')
plt.show()

print("\nFigure saved: restriction_map_spectral_analysis.png")

## Summary Statistics

In [None]:
# Create summary table
summary_data = []
for res in all_results:
    summary_data.append({
        'Model': res['model'].split('/')[-1],
        'Lab': res['lab'],
        'Expected': res['expected'],
        'Layers': res['n_layers'],
        'W_V Mean': res.get('W_V_mean_spectral', np.nan),
        'W_V Std': res.get('W_V_std_spectral', np.nan),
        'W_O Mean': res.get('W_O_mean_spectral', np.nan),
        'W_O Std': res.get('W_O_std_spectral', np.nan),
    })

df = pd.DataFrame(summary_data)
print("\n" + "="*80)
print("SPECTRAL SIGNATURE SUMMARY")
print("="*80)
print(df.to_string(index=False))

# Lab averages
print("\n" + "="*80)
print("LAB AVERAGES")
print("="*80)
lab_summary = df.groupby('Lab').agg({
    'W_V Mean': ['mean', 'std'],
    'W_O Mean': ['mean', 'std']
}).round(4)
print(lab_summary)

## Hypothesis Test: EleutherAI vs Others

In [None]:
from scipy import stats

# Separate EleutherAI from others
eleuther_wv = df[df['Lab'] == 'EleutherAI']['W_V Mean'].dropna().values
others_wv = df[df['Lab'] != 'EleutherAI']['W_V Mean'].dropna().values

eleuther_wo = df[df['Lab'] == 'EleutherAI']['W_O Mean'].dropna().values
others_wo = df[df['Lab'] != 'EleutherAI']['W_O Mean'].dropna().values

print("\n" + "="*80)
print("HYPOTHESIS TEST: EleutherAI (Dampener) vs Others (Expander)")
print("="*80)

if len(eleuther_wv) > 0 and len(others_wv) > 0:
    # Mann-Whitney U test (non-parametric)
    stat_wv, p_wv = stats.mannwhitneyu(eleuther_wv, others_wv, alternative='less')
    stat_wo, p_wo = stats.mannwhitneyu(eleuther_wo, others_wo, alternative='less')
    
    print(f"\nW_V Spectral Norm:")
    print(f"  EleutherAI mean: {np.mean(eleuther_wv):.4f}")
    print(f"  Others mean:     {np.mean(others_wv):.4f}")
    print(f"  Mann-Whitney U:  {stat_wv:.2f}, p = {p_wv:.4f}")
    print(f"  H1 (EleutherAI < Others): {'SUPPORTED' if p_wv < 0.05 else 'NOT SUPPORTED'}")
    
    print(f"\nW_O Spectral Norm:")
    print(f"  EleutherAI mean: {np.mean(eleuther_wo):.4f}")
    print(f"  Others mean:     {np.mean(others_wo):.4f}")
    print(f"  Mann-Whitney U:  {stat_wo:.2f}, p = {p_wo:.4f}")
    print(f"  H1 (EleutherAI < Others): {'SUPPORTED' if p_wo < 0.05 else 'NOT SUPPORTED'}")
else:
    print("Insufficient data for statistical test.")

## Save Results

In [None]:
# Prepare results for JSON
def convert_to_serializable(obj):
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, (np.float32, np.float64)):
        return float(obj)
    elif isinstance(obj, (np.int32, np.int64)):
        return int(obj)
    return obj

# Convert all results
serializable_results = []
for res in all_results:
    clean_res = {}
    for k, v in res.items():
        clean_res[k] = convert_to_serializable(v)
    serializable_results.append(clean_res)

# Save to JSON
output = {
    'experiment': 'Restriction Map Spectral Analysis',
    'date': datetime.now().isoformat(),
    'hypothesis': 'EleutherAI (Dampener) has lower spectral energy than others (Expander)',
    'models_tested': len(all_results),
    'results': serializable_results,
    'summary': df.to_dict('records') if len(df) > 0 else []
}

timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'restriction_map_spectral_{timestamp}.json'

with open(filename, 'w') as f:
    json.dump(output, f, indent=2)

print(f"\nResults saved to: {filename}")

## Interpretation

### Expected Results:

If our **Sheaf-Thermodynamic Bridge** hypothesis is correct:

1. **EleutherAI models** (Dampeners) should have **lower** spectral norms in W_V and W_O
   - This means "weaker" restriction maps
   - Less energy transfer through the sheaf
   - Leads to DAMPENING of residual stream

2. **Meta/BigScience models** (Expanders) should have **higher** spectral norms
   - "Stronger" restriction maps
   - More energy transfer
   - Leads to EXPANSION of residual stream

### Connection to Sheaf Theory:

The spectral norm of W_V directly relates to the Sheaf Laplacian:

$$||L_\mathcal{F}|| \propto \sum_{j} A_{ij} \cdot ||W_V^T W_V||$$

Higher spectral norm → Stronger coupling → More diffusion → Expansion
Lower spectral norm → Weaker coupling → Less diffusion → Dampening

In [None]:
print("\n" + "="*80)
print("ANALYSIS COMPLETE")
print("="*80)
print(f"\nFiles generated:")
print(f"  - restriction_map_spectral_analysis.png")
print(f"  - {filename}")
print("\nThis data provides empirical evidence for the Sheaf-Thermodynamic Bridge.")