# Experiment 035A: AQ Excitation Pattern Detection

**AKIRA Project - Oscar Goldman - Shogu Research Group @ Datamutant.ai**

---

## What This Experiment Tests

Action Quanta (AQ) are hypothesized to be **quasiparticle field excitations** stored in LLM weights. They manifest when context resonates with the weight structure.

### Hypothesis

```
WEIGHTS = Field (crystallized AQ structure)
CONTEXT = Perturbation (selects resonance)
ACTIVATIONS = Excitation patterns (observable AQ)
```

### Predictions

If AQ theory is correct:
- Same discrimination type should produce similar activation patterns
- Different types should produce different patterns
- Later layers should show cleaner separation (crystallization)

### What We Measure

1. **Silhouette Score**: Do same-category probes cluster together?
2. **Distance Ratio**: Do different categories separate?
3. **Layer Progression**: Does separation increase with depth?
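
As a rough illustration of the first two metrics, the sketch below computes a silhouette score and a between/within distance ratio by hand on made-up 2D points. The helper names (`pairwise_euclidean`, `toy_silhouette`, `toy_distance_ratio`) and the data are assumptions for illustration only; the notebook itself relies on scikit-learn in Section 5.

```python
import numpy as np


def pairwise_euclidean(x: np.ndarray) -> np.ndarray:
    """Return the full matrix of Euclidean distances between rows of x."""
    diff = x[:, None, :] - x[None, :, :]
    return np.sqrt((diff ** 2).sum(axis=-1))


def toy_silhouette(x: np.ndarray, labels: np.ndarray) -> float:
    """Mean silhouette, with s_i = (b_i - a_i) / max(a_i, b_i)."""
    dist = pairwise_euclidean(x)
    scores = []
    for i in range(len(x)):
        same = labels == labels[i]
        same[i] = False  # exclude the point itself
        if not same.any():
            scores.append(0.0)  # singleton category: score defined as 0
            continue
        a = dist[i, same].mean()  # mean distance to own category
        b = min(
            dist[i, labels == other].mean()
            for other in np.unique(labels)
            if other != labels[i]
        )  # mean distance to the nearest other category
        scores.append((b - a) / max(a, b))
    return float(np.mean(scores))


def toy_distance_ratio(x: np.ndarray, labels: np.ndarray) -> float:
    """Mean between-category distance divided by mean within-category distance."""
    dist = pairwise_euclidean(x)
    same = labels[:, None] == labels[None, :]
    upper = np.triu(np.ones_like(same, dtype=bool), k=1)  # count each pair once
    within = dist[same & upper].mean()
    between = dist[~same & upper].mean()
    return float(between / within)


# Hypothetical data: two tight, well-separated groups.
points = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0],
                   [10.0, 10.0], [10.0, 11.0], [11.0, 10.0]])
cats = np.array(["a", "a", "a", "b", "b", "b"])

print(f"silhouette:     {toy_silhouette(points, cats):.3f}")
print(f"distance ratio: {toy_distance_ratio(points, cats):.3f}")
```

For groups this well separated, the silhouette should be close to 1 and the ratio well above 1; overlapping groups push the silhouette toward 0 and the ratio toward 1.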

---

## 1. Setup and Installation

In [None]:
# Install dependencies (required on Colab; comment out if already installed)
!pip install transformers torch numpy scikit-learn matplotlib seaborn -q

In [None]:
import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM, AutoTokenizer
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, pairwise_distances
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
import warnings

warnings.filterwarnings('ignore')

# Check device
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {DEVICE}")
print(f"PyTorch version: {torch.__version__}")

## 2. Configuration

Choose your model and experiment parameters here.

In [None]:
@dataclass
class ExperimentConfig:
    """Configuration for AQ Excitation Detection experiment.
    
    Attributes:
        model_name: HuggingFace model identifier
        layers_to_probe: Which layers to capture activations from
        n_pca_components: Dimensions for PCA reduction
        n_clusters: Number of clusters for K-means (matches probe categories)
        random_seed: For reproducibility
    """
    model_name: str = "gpt2"  # Options: "gpt2", "EleutherAI/pythia-70m", "EleutherAI/pythia-160m"
    layers_to_probe: List[int] = field(default_factory=list)
    n_pca_components: int = 20  # Must be <= number of probes (30)
    n_clusters: int = 5  # Matches number of probe categories
    random_seed: int = 42
    
    def __post_init__(self) -> None:
        """Set layer indices based on model architecture."""
        if not self.layers_to_probe:
            if "gpt2" in self.model_name.lower():
                # GPT-2 small has 12 layers (0-11); larger GPT-2 variants also
                # match this branch, so set layers_to_probe explicitly for them
                self.layers_to_probe = [0, 3, 6, 9, 11]
            elif "pythia-70m" in self.model_name.lower():
                # Pythia-70M has 6 layers
                self.layers_to_probe = [0, 1, 3, 5]
            elif "pythia-160m" in self.model_name.lower():
                # Pythia-160M has 12 layers
                self.layers_to_probe = [0, 3, 6, 9, 11]
            else:
                # Default - assume 12 layers
                self.layers_to_probe = [0, 3, 6, 9, 11]
        
        np.random.seed(self.random_seed)
        torch.manual_seed(self.random_seed)


# Create configuration
config = ExperimentConfig()
print(f"Model: {config.model_name}")
print(f"Layers to probe: {config.layers_to_probe}")

## 3. Discrimination Probes

These prompts test whether different **discrimination types** produce different activation patterns.

- **Sentiment Positive**: Prompts expecting positive emotional continuation
- **Sentiment Negative**: Prompts expecting negative emotional continuation
- **Math**: Arithmetic problems
- **Factual Geography**: Capital city questions
- **Factual Science**: Scientific fact questions

In [None]:
PROBES: Dict[str, List[str]] = {
    'sentiment_positive': [
        "The movie was great, I felt",
        "What a wonderful day, I am feeling",
        "This food is delicious, it makes me",
        "I love this song, it makes me feel",
        "The vacation was amazing, I felt so",
        "My friend is wonderful, they make me feel",
    ],
    'sentiment_negative': [
        "The movie was terrible, I felt",
        "What an awful day, I am feeling",
        "This food is disgusting, it makes me",
        "I hate this song, it makes me feel",
        "The vacation was horrible, I felt so",
        "My enemy is cruel, they make me feel",
    ],
    'math': [
        "2 + 2 =",
        "3 + 1 =",
        "5 - 1 =",
        "10 / 2 =",
        "7 + 3 =",
        "8 - 4 =",
    ],
    'factual_geography': [
        "The capital of France is",
        "The capital of Japan is",
        "The capital of Germany is",
        "The capital of Italy is",
        "The capital of Spain is",
        "The capital of Brazil is",
    ],
    'factual_science': [
        "Water freezes at",
        "The speed of light is",
        "Gravity on Earth is",
        "The chemical formula for water is",
        "The boiling point of water is",
        "The atomic number of carbon is",
    ],
}

# Category colors for visualization
CATEGORY_COLORS: Dict[str, str] = {
    'sentiment_positive': '#2ecc71',  # green
    'sentiment_negative': '#e74c3c',  # red
    'math': '#3498db',                # blue
    'factual_geography': '#9b59b6',   # purple
    'factual_science': '#f39c12',     # orange
}

print(f"Number of categories: {len(PROBES)}")
print(f"Total probes: {sum(len(v) for v in PROBES.values())}")
for cat, prompts in PROBES.items():
    print(f"  {cat}: {len(prompts)} probes")

## 4. Activation Capture Class

This class registers forward hooks on transformer layers to intercept activations during inference.
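
The hook mechanism can be seen in isolation on a toy module. The sketch below is an illustration under assumptions: `toy_model`, `captured`, and `save_output` are hypothetical names, and a small `nn.Sequential` stands in for the transformer blocks that the class actually hooks.

```python
import torch
import torch.nn as nn

# Toy stand-in for a stack of transformer blocks (an assumption for
# illustration; the notebook hooks model.transformer.h or model.gpt_neox.layers).
toy_model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 2))

captured = {}


def save_output(module: nn.Module, inputs: tuple, output: torch.Tensor) -> None:
    """Forward hook: store a detached copy of the layer output."""
    captured["first_linear"] = output.detach()


# register_forward_hook returns a handle that can later remove the hook.
handle = toy_model[0].register_forward_hook(save_output)

with torch.no_grad():
    toy_model(torch.randn(3, 4))  # the hook fires during this call

print(captured["first_linear"].shape)  # torch.Size([3, 8])

handle.remove()  # later forward passes no longer update `captured`
```

Transformer blocks in Hugging Face models typically return a tuple rather than a bare tensor, which is why the class below unpacks `output[0]` before storing it.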

In [None]:
class ActivationCapture:
    """Captures activations from specified layers using forward hooks.
    
    This class registers hooks on transformer layers to intercept
    intermediate activations during the forward pass.
    """
    
    def __init__(self, model: nn.Module, layer_indices: List[int], model_type: str = "gpt2") -> None:
        """Initialize activation capture with hooks on specified layers."""
        assert len(layer_indices) > 0, "Must specify at least one layer to probe"
        
        self.activations: Dict[int, torch.Tensor] = {}
        self.hooks: List[torch.utils.hooks.RemovableHandle] = []
        self.layer_indices = layer_indices
        
        # Get the transformer blocks based on model architecture
        if hasattr(model, 'transformer'):
            # GPT-2 style
            layers = model.transformer.h
        elif hasattr(model, 'gpt_neox'):
            # Pythia style
            layers = model.gpt_neox.layers
        else:
            raise ValueError(f"Unknown model architecture: {type(model)}")
        
        assert len(layers) > max(layer_indices), \
            f"Model has {len(layers)} layers but requested layer {max(layer_indices)}"
        
        # Register hooks on each specified layer
        for idx in layer_indices:
            layer = layers[idx]
            hook = layer.register_forward_hook(self._make_hook(idx))
            self.hooks.append(hook)
        
        print(f"Registered hooks on layers: {layer_indices}")
    
    def _make_hook(self, layer_idx: int):
        """Create a hook function for a specific layer."""
        def hook(module: nn.Module, input: Tuple, output: Tuple) -> None:
            if isinstance(output, tuple):
                hidden_states = output[0]
            else:
                hidden_states = output
            self.activations[layer_idx] = hidden_states.detach()
        return hook
    
    def clear(self) -> None:
        """Clear stored activations."""
        self.activations = {}
    
    def remove_hooks(self) -> None:
        """Remove all registered hooks."""
        for hook in self.hooks:
            hook.remove()
        self.hooks = []
        print("Removed all hooks")
    
    def get_last_token_activation(self, layer_idx: int) -> np.ndarray:
        """Get activation for the last token position at a specific layer."""
        assert layer_idx in self.activations, f"Layer {layer_idx} not captured"
        act = self.activations[layer_idx]
        assert act is not None and act.numel() > 0, "Empty activation"
        
        # Get last token: [batch=0, seq=-1, hidden_dim]
        last_token_act = act[0, -1, :].cpu().numpy()
        return last_token_act

## 5. Analysis Functions

In [None]:
def compute_category_distances(
    activations: np.ndarray,
    labels: List[str]
) -> Tuple[float, float, float]:
    """Compute within-category and between-category distances.
    
    Returns:
        Tuple of (within_distance, between_distance, ratio)
        ratio > 1 means between-category distances exceed within-category distances
    """
    assert len(activations) == len(labels), "Activation/label count mismatch"
    
    within_distances = []
    between_distances = []
    
    # Compute pairwise distances
    distances = pairwise_distances(activations, metric='euclidean')
    
    for i in range(len(labels)):
        for j in range(i + 1, len(labels)):
            if labels[i] == labels[j]:
                within_distances.append(distances[i, j])
            else:
                between_distances.append(distances[i, j])
    
    within_mean = np.mean(within_distances) if within_distances else 0
    between_mean = np.mean(between_distances) if between_distances else 0
    
    ratio = between_mean / within_mean if within_mean > 0 else float('inf')
    
    return within_mean, between_mean, ratio


def compute_silhouette(
    activations: np.ndarray,
    labels: List[str]
) -> float:
    """Compute silhouette score for clustering quality.
    
    Silhouette score ranges from -1 to 1:
    - 1: Perfect clustering
    - 0: Overlapping clusters
    - -1: Wrong clustering
    """
    unique_labels = sorted(set(labels))  # sorted for a deterministic label mapping
    label_to_int = {label: i for i, label in enumerate(unique_labels)}
    int_labels = [label_to_int[label] for label in labels]
    
    if len(set(int_labels)) < 2:
        return 0.0
    
    return silhouette_score(activations, int_labels)


def run_pca_analysis(
    activations: np.ndarray,
    labels: List[str],
    n_components: int = 2,
    verbose: bool = True
) -> Tuple[np.ndarray, PCA]:
    """Apply PCA to reduce activation dimensionality."""
    assert activations.shape[0] == len(labels), "Sample count mismatch"
    # PCA supports at most min(n_samples, n_features) components
    assert n_components <= min(activations.shape), "Too many components requested"
    
    scaler = StandardScaler()
    activations_scaled = scaler.fit_transform(activations)
    
    pca = PCA(n_components=n_components, random_state=42)
    reduced = pca.fit_transform(activations_scaled)
    
    if verbose:
        explained_var = sum(pca.explained_variance_ratio_) * 100
        print(f"PCA: {n_components} components explain {explained_var:.1f}% variance")
    
    return reduced, pca


def run_kmeans_clustering(
    activations: np.ndarray,
    n_clusters: int
) -> Tuple[np.ndarray, KMeans]:
    """Apply K-means clustering to activations."""
    assert n_clusters > 0, "Need positive number of clusters"
    assert activations.shape[0] >= n_clusters, "More clusters than samples"
    
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(activations)
    
    return cluster_labels, kmeans

## 6. Visualization Functions

In [None]:
def plot_activation_scatter(
    activations_2d: np.ndarray,
    labels: List[str],
    layer_idx: int,
    texts: List[str]
) -> None:
    """Create 2D scatter plot of activations colored by category."""
    assert activations_2d.shape[1] == 2, "Need 2D activations for scatter"
    
    plt.figure(figsize=(10, 8))
    
    for category in CATEGORY_COLORS:
        mask = [l == category for l in labels]
        if any(mask):
            indices = [i for i, m in enumerate(mask) if m]
            plt.scatter(
                activations_2d[indices, 0],
                activations_2d[indices, 1],
                c=CATEGORY_COLORS[category],
                label=category.replace('_', ' '),
                alpha=0.7,
                s=100,
                edgecolors='white',
                linewidth=0.5
            )
    
    plt.xlabel('PC1', fontsize=12)
    plt.ylabel('PC2', fontsize=12)
    plt.title(f'AQ Excitation Patterns - Layer {layer_idx}', fontsize=14)
    plt.legend(loc='best', fontsize=10)
    plt.grid(True, alpha=0.3)
    plt.show()


def plot_similarity_matrix(
    activations: np.ndarray,
    labels: List[str],
    layer_idx: int
) -> None:
    """Create heatmap of pairwise activation similarities."""
    # Compute cosine similarity
    norms = np.linalg.norm(activations, axis=1, keepdims=True)
    normalized = activations / (norms + 1e-8)
    similarity = normalized @ normalized.T
    
    # Sort by category for cleaner visualization
    sorted_indices = sorted(range(len(labels)), key=lambda i: labels[i])
    similarity_sorted = similarity[sorted_indices][:, sorted_indices]
    labels_sorted = [labels[i] for i in sorted_indices]
    
    plt.figure(figsize=(12, 10))
    
    sns.heatmap(
        similarity_sorted,
        cmap='RdYlBu_r',
        vmin=-1,
        vmax=1,
        square=True,
        cbar_kws={'label': 'Cosine Similarity'}
    )
    
    # Add category boundaries
    unique_labels = []
    boundaries = [0]
    for i, label in enumerate(labels_sorted):
        if label not in unique_labels:
            unique_labels.append(label)
            if i > 0:
                boundaries.append(i)
    boundaries.append(len(labels_sorted))
    
    for b in boundaries[1:-1]:
        plt.axhline(y=b, color='black', linewidth=2)
        plt.axvline(x=b, color='black', linewidth=2)
    
    plt.title(f'Activation Similarity Matrix - Layer {layer_idx}', fontsize=14)
    plt.xlabel('Probe Index (sorted by category)')
    plt.ylabel('Probe Index (sorted by category)')
    plt.show()


def plot_layer_comparison(
    metrics_by_layer: Dict[int, Dict[str, float]]
) -> None:
    """Plot how metrics change across layers."""
    layers = sorted(metrics_by_layer.keys())
    
    silhouettes = [metrics_by_layer[l]['silhouette'] for l in layers]
    ratios = [metrics_by_layer[l]['distance_ratio'] for l in layers]
    
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Silhouette score by layer
    axes[0].plot(layers, silhouettes, 'o-', color='#3498db', linewidth=2, markersize=10)
    axes[0].set_xlabel('Layer Index', fontsize=12)
    axes[0].set_ylabel('Silhouette Score', fontsize=12)
    axes[0].set_title('Clustering Quality by Layer', fontsize=14)
    axes[0].grid(True, alpha=0.3)
    axes[0].set_ylim(-0.2, 1.0)
    
    # Distance ratio by layer
    axes[1].plot(layers, ratios, 'o-', color='#e74c3c', linewidth=2, markersize=10)
    axes[1].set_xlabel('Layer Index', fontsize=12)
    axes[1].set_ylabel('Between/Within Distance Ratio', fontsize=12)
    axes[1].set_title('Category Separation by Layer', fontsize=14)
    axes[1].grid(True, alpha=0.3)
    axes[1].axhline(y=1.0, color='gray', linestyle='--', alpha=0.5)
    
    plt.tight_layout()
    plt.show()

## 7. Load Model

Load the pretrained model and tokenizer.

In [None]:
print("Loading model and tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(config.model_name)
model = AutoModelForCausalLM.from_pretrained(config.model_name)
model = model.to(DEVICE)
model.eval()

# Determine model type for hook placement
if hasattr(model, 'transformer'):
    model_type = "gpt2"
elif hasattr(model, 'gpt_neox'):
    model_type = "pythia"
else:
    model_type = "unknown"

print(f"Model loaded on {DEVICE} (type: {model_type})")
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

## 8. Run Probes and Capture Activations

Run all discrimination probes through the model and capture activations at each layer.
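
Each hook stores hidden states of shape `[batch, seq_len, hidden_dim]`, and only the final sequence position is kept per prompt. Because prompts are run one at a time without padding, index `-1` is the last real token. The sketch below shows that indexing on a made-up NumPy array standing in for real hidden states.

```python
import numpy as np

# Stand-in for one layer's hidden states: [batch=1, seq_len=5, hidden_dim=4].
# Values are arbitrary; real activations come from the forward hooks.
hidden_states = np.arange(1 * 5 * 4, dtype=np.float32).reshape(1, 5, 4)

# Keep the first (only) batch item and the final sequence position.
last_token = hidden_states[0, -1, :]

print(last_token.shape)  # (4,)
print(last_token)        # [16. 17. 18. 19.]
```

Stacking one such vector per probe yields a matrix of shape `(n_probes, hidden_dim)` for each layer, which is what the analysis functions consume.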

In [None]:
# Set up activation capture
capture = ActivationCapture(model, config.layers_to_probe, model_type)

# Storage for results
all_activations: Dict[int, List[np.ndarray]] = {
    layer: [] for layer in config.layers_to_probe
}
all_labels: List[str] = []
all_texts: List[str] = []

print("Running discrimination probes...")

for category, prompts in PROBES.items():
    for prompt in prompts:
        # Tokenize
        inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)
        
        # Forward pass (captures activations via hooks)
        capture.clear()
        with torch.no_grad():
            outputs = model(**inputs)
        
        # Store activations for last token position
        for layer_idx in config.layers_to_probe:
            act = capture.get_last_token_activation(layer_idx)
            all_activations[layer_idx].append(act)
        
        all_labels.append(category)
        all_texts.append(prompt)

# Convert to numpy arrays
for layer_idx in config.layers_to_probe:
    all_activations[layer_idx] = np.array(all_activations[layer_idx])

print(f"Collected activations for {len(all_labels)} probes")
print(f"Activation shape per layer: {all_activations[config.layers_to_probe[0]].shape}")

## 9. Layer-by-Layer Analysis

Compute metrics for each layer to see how AQ patterns develop through the network.

In [None]:
print("=" * 70)
print("LAYER-BY-LAYER ANALYSIS")
print("=" * 70)

metrics_by_layer: Dict[int, Dict[str, float]] = {}
pca_results_by_layer: Dict[int, np.ndarray] = {}

for layer_idx in config.layers_to_probe:
    print(f"\n--- Layer {layer_idx} ---")
    
    activations = all_activations[layer_idx]
    
    # Compute distance metrics
    within_dist, between_dist, ratio = compute_category_distances(
        activations, all_labels
    )
    print(f"Within-category distance:  {within_dist:.4f}")
    print(f"Between-category distance: {between_dist:.4f}")
    print(f"Distance ratio (higher = better separation): {ratio:.4f}")
    
    # Compute silhouette score
    silhouette = compute_silhouette(activations, all_labels)
    print(f"Silhouette score: {silhouette:.4f}")
    
    # Store metrics
    metrics_by_layer[layer_idx] = {
        'within_distance': within_dist,
        'between_distance': between_dist,
        'distance_ratio': ratio,
        'silhouette': silhouette
    }
    
    # PCA reduction for visualization
    pca_2d, _ = run_pca_analysis(activations, all_labels, n_components=2)
    pca_results_by_layer[layer_idx] = pca_2d

## 10. K-Means Clustering Analysis

Test whether unsupervised clustering recovers the probe categories.
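
One way to condense the cluster composition printed below into a single number is cluster purity: the fraction of samples that fall in the majority category of their cluster. This is not part of the original experiment; `cluster_purity` is a hypothetical helper, and the assignments are made up.

```python
from collections import Counter
from typing import Dict, List, Sequence


def cluster_purity(cluster_ids: Sequence[int], categories: Sequence[str]) -> float:
    """Fraction of samples that belong to the majority category of their cluster."""
    members: Dict[int, List[str]] = {}
    for cid, cat in zip(cluster_ids, categories):
        members.setdefault(cid, []).append(cat)
    # For each cluster, count how many samples carry its most common category.
    majority_total = sum(
        Counter(cats).most_common(1)[0][1] for cats in members.values()
    )
    return majority_total / len(categories)


# Made-up assignments: cluster 0 is pure, cluster 1 mixes two categories.
ids = [0, 0, 0, 1, 1, 1]
cats = ["math", "math", "math", "geo", "geo", "science"]
print(f"purity: {cluster_purity(ids, cats):.3f}")  # 5 of 6 samples match their cluster majority
```

A purity of 1.0 would mean every cluster contains a single category; values near `1 / n_categories` suggest the clusters ignore the categories.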

In [None]:
print("=" * 70)
print("CLUSTERING ANALYSIS")
print("=" * 70)

# Use the final layer for clustering analysis
final_layer = config.layers_to_probe[-1]
final_activations = all_activations[final_layer]

# Apply PCA first (for better clustering)
pca_for_clustering, _ = run_pca_analysis(
    final_activations, all_labels, n_components=config.n_pca_components
)

# K-means clustering
cluster_labels, kmeans = run_kmeans_clustering(
    pca_for_clustering, config.n_clusters
)

# Compare clusters to true categories
print(f"\nK-means with k={config.n_clusters} clusters on Layer {final_layer}")
print("\nCluster composition:")

for cluster_id in range(config.n_clusters):
    cluster_mask = cluster_labels == cluster_id
    cluster_categories = [all_labels[i] for i in range(len(all_labels)) if cluster_mask[i]]
    category_counts = {}
    for cat in cluster_categories:
        category_counts[cat] = category_counts.get(cat, 0) + 1
    
    print(f"  Cluster {cluster_id}: {category_counts}")

## 11. Visualizations

### 11.1 Scatter Plots by Layer

Visualize how activation patterns cluster in 2D PCA space.
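
The 2D coordinates come from standardizing each feature and projecting onto the first two principal components. The sketch below reproduces that idea with a plain SVD on made-up data; it is an illustration, not the notebook's `run_pca_analysis`, and all variable names are assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)

# Made-up "activations": 30 samples, 16 features, with most variance
# along one direction so that the first component dominates.
base = rng.normal(size=(30, 1)) @ rng.normal(size=(1, 16))
acts = base + 0.1 * rng.normal(size=(30, 16))

# Standardize each feature (zero mean, unit variance), as StandardScaler does.
scaled = (acts - acts.mean(axis=0)) / acts.std(axis=0)

# PCA via SVD of the centered data: rows of vt are the principal directions.
u, s, vt = np.linalg.svd(scaled, full_matrices=False)
projected = scaled @ vt[:2].T          # 2D coordinates for plotting
explained = (s ** 2) / (s ** 2).sum()  # variance ratio per component

print(projected.shape)  # (30, 2)
print(f"first two components explain {explained[:2].sum():.1%} of variance")
```

The explained-variance figure indicates how much of the structure a 2D plot can show; when it is low, categories may separate along directions the scatter plot does not display.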

In [None]:
for layer_idx in config.layers_to_probe:
    print(f"\nScatter plot for Layer {layer_idx}:")
    plot_activation_scatter(
        pca_results_by_layer[layer_idx],
        all_labels,
        layer_idx,
        all_texts
    )

### 11.2 Similarity Matrix

Visualize pairwise cosine similarity between all probes. If AQ theory is correct, we should see block-diagonal structure where same-category probes are similar.
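
Block-diagonal structure can also be checked numerically by comparing the mean cosine similarity inside categories with the mean across categories. The sketch below uses made-up vectors and hypothetical variable names; it only illustrates the comparison.

```python
import numpy as np

# Made-up vectors: two categories pointing in roughly different directions.
vectors = np.array([
    [1.0, 0.1, 0.0],
    [0.9, 0.2, 0.0],
    [0.0, 0.1, 1.0],
    [0.1, 0.0, 0.9],
])
labels = np.array(["a", "a", "b", "b"])

# Cosine similarity: normalize rows, then take dot products.
unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
similarity = unit @ unit.T

same = labels[:, None] == labels[None, :]
off_diag = ~np.eye(len(labels), dtype=bool)  # ignore self-similarity

within = similarity[same & off_diag].mean()
between = similarity[~same].mean()
print(f"within-category mean:  {within:.3f}")
print(f"between-category mean: {between:.3f}")
```

A clear gap between the two means corresponds to visible blocks in the heatmap; similar means correspond to a matrix without block structure.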

In [None]:
print(f"\nSimilarity matrix for Layer {final_layer}:")
plot_similarity_matrix(
    all_activations[final_layer],
    all_labels,
    final_layer
)

### 11.3 Layer Comparison

Do later layers show cleaner separation? This tests the "crystallization" hypothesis.
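
The summary in Section 12 compares only the first and last probed layers. A complementary check, not part of the original experiment, is a Spearman rank correlation between layer index and metric, which uses every probed layer. The sketch below assumes no tied values; `spearman_no_ties` is a hypothetical helper and the scores are made up.

```python
import numpy as np


def spearman_no_ties(x, y) -> float:
    """Spearman rank correlation, assuming no tied values in x or y."""
    rank_x = np.argsort(np.argsort(x))  # rank of each element of x
    rank_y = np.argsort(np.argsort(y))  # rank of each element of y
    return float(np.corrcoef(rank_x, rank_y)[0, 1])


layers = [0, 3, 6, 9, 11]
# Made-up silhouette scores, not results from this experiment.
rising = [0.05, 0.10, 0.18, 0.25, 0.31]
noisy = [0.05, 0.30, 0.02, 0.28, 0.06]

print(f"rising: {spearman_no_ties(layers, rising):.2f}")  # strictly increasing, so 1.00
print(f"noisy:  {spearman_no_ties(layers, noisy):.2f}")   # 0.10
```

In the `noisy` series the last value exceeds the first, so a first-versus-last comparison would report an increase even though the overall trend is weak.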

In [None]:
print("\nLayer-wise metric comparison:")
plot_layer_comparison(metrics_by_layer)

## 12. Summary and Assessment

Evaluate the evidence for AQ excitation patterns.

In [None]:
print("=" * 70)
print("SUMMARY")
print("=" * 70)

# Check AQ theory predictions
final_silhouette = metrics_by_layer[final_layer]['silhouette']
final_ratio = metrics_by_layer[final_layer]['distance_ratio']

# Does separation increase with depth?
silhouettes = [metrics_by_layer[l]['silhouette'] for l in config.layers_to_probe]
ratios = [metrics_by_layer[l]['distance_ratio'] for l in config.layers_to_probe]

silhouette_increases = silhouettes[-1] > silhouettes[0]
ratio_increases = ratios[-1] > ratios[0]

print("\nAQ THEORY PREDICTIONS:")
print("-" * 50)

print(f"\n1. Do same-type probes cluster together?")
if final_silhouette > 0.1:
    print(f"   YES - Silhouette score {final_silhouette:.3f} > 0.1")
else:
    print(f"   UNCLEAR - Silhouette score {final_silhouette:.3f} is low")

print(f"\n2. Do different types separate?")
if final_ratio > 1.2:
    print(f"   YES - Distance ratio {final_ratio:.3f} > 1.2")
else:
    print(f"   UNCLEAR - Distance ratio {final_ratio:.3f} is low")

print(f"\n3. Does separation increase in later layers (crystallization)?")
if silhouette_increases and ratio_increases:
    print(f"   YES - Both silhouette and ratio increase with depth")
    print(f"   Layer {config.layers_to_probe[0]} -> Layer {final_layer}:")
    print(f"     Silhouette: {silhouettes[0]:.3f} -> {silhouettes[-1]:.3f}")
    print(f"     Ratio: {ratios[0]:.3f} -> {ratios[-1]:.3f}")
elif silhouette_increases or ratio_increases:
    print(f"   PARTIAL - One metric increases, one doesn't")
else:
    print(f"   NO - Metrics don't consistently increase with depth")

print("\n" + "-" * 50)
print("OVERALL ASSESSMENT:")

evidence_score = 0
if final_silhouette > 0.1:
    evidence_score += 1
if final_ratio > 1.2:
    evidence_score += 1
if silhouette_increases:
    evidence_score += 1
if ratio_increases:
    evidence_score += 1

if evidence_score >= 3:
    print("STRONG EVIDENCE for AQ excitation patterns")
elif evidence_score >= 2:
    print("MODERATE EVIDENCE for AQ excitation patterns")
elif evidence_score >= 1:
    print("WEAK EVIDENCE for AQ excitation patterns")
else:
    print("NO CLEAR EVIDENCE for AQ excitation patterns")

print(f"(Evidence score: {evidence_score}/4)")

## 13. Final Metrics Table

In [None]:
print("\nFINAL METRICS BY LAYER:")
print("-" * 69)
print(f"{'Layer':>6} | {'Silhouette':>12} | {'Distance Ratio':>15} | {'Within Dist':>12} | {'Between Dist':>12}")
print("-" * 69)
for layer_idx, metrics in metrics_by_layer.items():
    print(f"{layer_idx:>6} | {metrics['silhouette']:>12.3f} | {metrics['distance_ratio']:>15.3f} | {metrics['within_distance']:>12.3f} | {metrics['between_distance']:>12.3f}")

## 14. Cleanup

In [None]:
# Remove hooks
capture.remove_hooks()

# Store results for further analysis
results = {
    'config': config,
    'activations': all_activations,
    'labels': all_labels,
    'texts': all_texts,
    'metrics_by_layer': metrics_by_layer,
    'pca_results': pca_results_by_layer,
    'cluster_labels': cluster_labels,
    'evidence_score': evidence_score
}

print("\n" + "=" * 70)
print("EXPERIMENT COMPLETE")
print("=" * 70)

---

## Interpretation Guide

### What the Results Mean

**If evidence score is 3-4/4:**
- Strong support for AQ theory
- Activations show stable patterns corresponding to discrimination types
- These patterns crystallize (sharpen) in later layers
- This is consistent with AQ as quasiparticle excitations in the weight field

**If evidence score is 1-2/4:**
- Partial support for AQ theory
- Some structure exists but is not fully consistent
- May need more probes or a different model

**If evidence score is 0/4:**
- No clear support for AQ theory
- Activations don't cluster by discrimination type
- Either AQ theory needs revision or this experiment doesn't capture the right patterns
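
The scoring rule behind these bands can be restated as a small function. The sketch below mirrors the thresholds used in Section 12 (silhouette > 0.1, distance ratio > 1.2, and first-versus-last-layer increases); `evidence_label` is a hypothetical name and the inputs are made up.

```python
from typing import Tuple


def evidence_label(
    final_silhouette: float,
    final_ratio: float,
    silhouette_increases: bool,
    ratio_increases: bool,
) -> Tuple[int, str]:
    """Return (score, label) using the thresholds from Section 12."""
    score = sum([
        final_silhouette > 0.1,  # same-type probes cluster
        final_ratio > 1.2,       # different types separate
        silhouette_increases,    # last probed layer beats the first
        ratio_increases,
    ])
    if score >= 3:
        label = "STRONG"
    elif score >= 2:
        label = "MODERATE"
    elif score >= 1:
        label = "WEAK"
    else:
        label = "NO CLEAR"
    return score, label


# Made-up inputs, not results from this experiment.
print(evidence_label(0.25, 1.5, True, False))  # (3, 'STRONG')
print(evidence_label(0.05, 1.1, False, True))  # (1, 'WEAK')
```

Note that the code distinguishes MODERATE (score 2) from WEAK (score 1), while the guide above groups both under partial support.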

### What to Look For in Visualizations

1. **Scatter plots**: Look for distinct color clusters. Overlapping colors indicate poor separation in the first two principal components.
2. **Similarity matrix**: Look for high-similarity (red) blocks along the diagonal, which indicate that same-category probes are similar.
3. **Layer comparison**: An upward trend in both panels supports the crystallization hypothesis.

---

**AKIRA Project - Experiment 035A**  
Oscar Goldman - Shogu Research Group @ Datamutant.ai