# 11.2c: Primordial Snowball Test

**Hypothesis:** Qwen's black holes start as a quantized Gaussian snowball at initialization, not a perfect singularity.

## Background

In 11.2b, per-step noise injection caused cumulative diffusion (a random walk). Qwen's black holes, by contrast, are *tightly clustered* within a 2ε hypercube.

**New hypothesis:** Qwen initializes embeddings with:
```python
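# Sketch: every token row is the shared base vector plus small Gaussian jitter
embeddings = base_vector + sigma * torch.randn(vocab_size, hidden_dim)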
```

where σ is small enough that, after bfloat16 quantization, dead tokens collapse to ~60 distinct vectors within a 2ε hypercube (ε denotes the bfloat16 ULP).
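
To make the collapse concrete, here is a minimal pure-Python sketch of the hypothesized initialization. The helper names `to_bfloat16` and `snowball`, the toy sizes, and the seed are illustrative assumptions, not Qwen's actual procedure. bfloat16 rounding is emulated by keeping the upper 16 bits of the float32 bit pattern with round-to-nearest-even.

```python
import random
import struct

def to_bfloat16(x: float) -> float:
    """Round a finite float to the nearest bfloat16 value (ties to even).

    Emulates bfloat16 by keeping the upper 16 bits of the float32 pattern.
    NaN/inf handling is omitted for brevity.
    """
    bits = struct.unpack("<I", struct.pack("<f", x))[0]
    bias = 0x7FFF + ((bits >> 16) & 1)   # round-to-nearest-even bias
    bits = (bits + bias) & 0xFFFF0000    # drop the low 16 mantissa bits
    return struct.unpack("<f", struct.pack("<I", bits))[0]

def snowball(vocab_size: int, dim: int, sigma: float, seed: int = 0):
    """Return quantized rows of base_vector + Gaussian(0, sigma) jitter."""
    rng = random.Random(seed)
    base = [rng.gauss(0.0, 1.0) for _ in range(dim)]
    return [
        tuple(to_bfloat16(b + rng.gauss(0.0, sigma)) for b in base)
        for _ in range(vocab_size)
    ]

rows = snowball(vocab_size=128, dim=64, sigma=1e-5)
print(f"distinct vectors after quantization: {len(set(rows))} of {len(rows)}")
```

Because the jitter is far smaller than the bfloat16 spacing near 1.0, most coordinates round to the same value for every token. Rows differ only where a base coordinate happens to sit close to a rounding boundary.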

## Experimental Design

**Initialization:** All tokens start near one random vector, with Gaussian jitter at scale σ ≈ ε

**Training:** Standard training, NO per-step noise injection

**Success criteria:**
- Initial dead tokens should cluster into multiple black holes (C > 1)
- Max L∞ between black holes ≈ 2ε (tight snowball)
- After training, black holes remain stable or consolidate further
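
The three criteria can be written as a single predicate. This is a hedged sketch: the function name, its arguments, and the `tolerance` slack factor are assumptions for illustration. The default of 10.0 mirrors the `10 * INIT_SIGMA` threshold used by the geometry check in this notebook, which is looser than the ≈ 2ε target stated above.

```python
def meets_success_criteria(c_init, c_final, max_linf, eps, tolerance=10.0):
    """Return True when all three success criteria hold.

    c_init, c_final: black hole counts before and after training.
    max_linf: largest pairwise L-infinity distance between dead tokens.
    eps: reference scale; tolerance: assumed slack factor on that scale.
    """
    multiple_at_init = c_init > 1
    tight = max_linf < tolerance * eps
    stable_or_consolidated = 1 < c_final <= c_init
    return multiple_at_init and tight and stable_or_consolidated

# Hypothetical inputs, chosen only to exercise both branches.
print(meets_success_criteria(c_init=15, c_final=12, max_linf=2e-5, eps=1e-5))  # True
print(meets_success_criteria(c_init=15, c_final=2, max_linf=1e-3, eps=1e-5))   # False
```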

## Parameters

In [33]:
# Model architecture
VOCAB_SIZE = 128      # ASCII tokens
HIDDEN_DIM = 64       # Embedding dimension
N_LAYER = 2           # Transformer layers
N_HEAD = 2            # Attention heads
MAX_SEQ_LEN = 128     # Context window

# Initialization
INIT_MODE = "snowball"  # Gaussian cluster, not singularity
INIT_SIGMA = 1e-5       # Noise scale σ, used as the ε reference (true bfloat16 ULP varies with magnitude)

# Training
LEARNING_RATE = 1e-3
WEIGHT_DECAY = 0.01
BATCH_SIZE = 32
NUM_TRAIN_STEPS = 10000

# Data
CORPUS_PATH = "../data/training_corpus.txt"
OUTPUT_DIR = "../data/embeddings_128vocab_primordial_snowball"
OUTPUT_FILE = "embedding_evolution.safetensors"

RANDOM_SEED = 42

## Imports

In [18]:
import torch
import torch.nn as nn
from transformers import GPT2Config, GPT2LMHeadModel, Trainer, TrainingArguments, TrainerCallback
from torch.utils.data import Dataset
import numpy as np
from pathlib import Path
from safetensors.torch import save_file
from collections import Counter

torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

## Detect Hardware

In [19]:
if torch.cuda.is_available():
    DEVICE = "cuda"
    print(f"Using CUDA: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    DEVICE = "mps"
    print("Using MPS (Apple Silicon)")
else:
    DEVICE = "cpu"
    print("Using CPU")

Using MPS (Apple Silicon)


## Load Training Corpus

In [20]:
print(f"Loading corpus from: {CORPUS_PATH}\n")

with open(CORPUS_PATH, 'r', encoding='ascii') as f:
    corpus_text = f.read()

# Convert to bytes and filter to vocab size
corpus_bytes = [b for b in corpus_text.encode('ascii') if b < VOCAB_SIZE]

print(f"✓ Corpus loaded")
print(f"Total bytes: {len(corpus_bytes):,}")
print(f"Vocabulary size: {VOCAB_SIZE}")

# Count unique bytes
unique_bytes = set(corpus_bytes)
dead_tokens = VOCAB_SIZE - len(unique_bytes)

print(f"Unique bytes in corpus: {len(unique_bytes)}")
print(f"Dead tokens (never appear): {dead_tokens} ({100 * dead_tokens / VOCAB_SIZE:.1f}%)")

Loading corpus from: ../data/training_corpus.txt

✓ Corpus loaded
Total bytes: 265,905
Vocabulary size: 128
Unique bytes in corpus: 77
Dead tokens (never appear): 51 (39.8%)


## Create Dataset

In [21]:
class ByteDataset(Dataset):
    """Dataset for byte-level language modeling."""
    def __init__(self, byte_sequence, max_seq_len):
        self.byte_sequence = byte_sequence
        self.max_seq_len = max_seq_len
    
    def __len__(self):
        return max(0, len(self.byte_sequence) - self.max_seq_len)
    
    def __getitem__(self, idx):
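        chunk = self.byte_sequence[idx : idx + self.max_seq_len + 1]
        # NOTE: GPT2LMHeadModel already shifts labels by one position when
        # computing the loss, so these pre-shifted labels pair the logits at
        # position t with byte t+2 rather than t+1. The conventional setup
        # passes labels identical to input_ids.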
        return {
            'input_ids': torch.tensor(chunk[:-1], dtype=torch.long),
            'labels': torch.tensor(chunk[1:], dtype=torch.long)
        }

dataset = ByteDataset(corpus_bytes, MAX_SEQ_LEN)
print(f"\n✓ Dataset created")
print(f"Training examples: {len(dataset):,}")


✓ Dataset created
Training examples: 265,777
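
One point to keep in mind when reading the losses further down: Hugging Face causal-LM heads such as `GPT2LMHeadModel` shift the labels internally, comparing `logits[:-1]` with `labels[1:]`. The sketch below uses plain lists and a hypothetical helper `effective_pairs` to show which byte each input position ends up being scored against when the labels are already shifted by the dataset.

```python
def effective_pairs(chunk):
    """Return (input byte, target byte) pairs that the loss actually compares."""
    input_ids = chunk[:-1]   # what ByteDataset returns as input_ids
    labels = chunk[1:]       # what ByteDataset returns as labels
    # Internal shift applied by the causal-LM loss: logits[:-1] vs labels[1:].
    return list(zip(input_ids[:-1], labels[1:]))

chunk = list(b"abcdef")
pairs = effective_pairs(chunk)
print([(chr(a), chr(b)) for a, b in pairs])
# [('a', 'c'), ('b', 'd'), ('c', 'e'), ('d', 'f')]
```

Each input byte is paired with the byte two positions ahead, so the model is trained on a skip-one prediction task, not next-byte prediction.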


## Create Model

In [22]:
# Create standard GPT-2 config
config = GPT2Config(
    vocab_size=VOCAB_SIZE,
    n_positions=MAX_SEQ_LEN,
    n_embd=HIDDEN_DIM,
    n_layer=N_LAYER,
    n_head=N_HEAD,
    resid_pdrop=0.0,
    embd_pdrop=0.0,
    attn_pdrop=0.0,
    tie_word_embeddings=True,
)

# Create model
model = GPT2LMHeadModel(config)

# Convert to bfloat16
model = model.to(torch.bfloat16)

total_params = sum(p.numel() for p in model.parameters())
print(f"\n✓ Model created (bfloat16, standard LayerNorm)")
print(f"Total parameters: {total_params:,}")


✓ Model created (bfloat16, standard LayerNorm)
Total parameters: 116,480
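
The reported total can be cross-checked by hand. The sketch below assumes the standard GPT-2 layout (fused QKV projection, MLP inner width of 4× the hidden size, biases on every linear layer and LayerNorm) and a tied output head. The function name is illustrative.

```python
def gpt2_param_count(vocab, dim, n_layer, n_positions, tied=True):
    """Count trainable parameters of a GPT-2 style model."""
    embeddings = vocab * dim + n_positions * dim              # wte + wpe
    attn = (dim * 3 * dim + 3 * dim) + (dim * dim + dim)      # c_attn + c_proj
    mlp = (dim * 4 * dim + 4 * dim) + (4 * dim * dim + dim)   # c_fc + c_proj
    layer_norms = 2 * (2 * dim)                               # ln_1 + ln_2 (weight, bias)
    per_layer = attn + mlp + layer_norms
    final_ln = 2 * dim                                        # ln_f
    lm_head = 0 if tied else vocab * dim                      # tied head adds nothing
    return embeddings + n_layer * per_layer + final_ln + lm_head

print(f"{gpt2_param_count(vocab=128, dim=64, n_layer=2, n_positions=128):,}")
# 116,480
```

Only 8,192 of these parameters (128 × 64) belong to the token embedding matrix under study.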


## Apply Snowball Initialization

In [23]:
print(f"\nApplying primordial snowball initialization...")
print(f"  Base vector + Gaussian(0, σ={INIT_SIGMA:.2e})\n")

with torch.no_grad():
    # Generate one random base vector
    base_vector = torch.randn(HIDDEN_DIM)
    
    # Add small Gaussian jitter to each token
    noise = torch.randn(VOCAB_SIZE, HIDDEN_DIM) * INIT_SIGMA
    
    # Combine: all tokens near base_vector
    model.transformer.wte.weight[:] = base_vector + noise

# Analyze initial snowball structure
initial_embeddings = model.transformer.wte.weight.data.clone().cpu()
unique_init, inverse_init, counts_init = torch.unique(
    initial_embeddings,
    dim=0,
    return_inverse=True,
    return_counts=True
)

init_black_holes = (counts_init > 1).sum().item()
init_bh_population = counts_init[counts_init > 1].sum().item()

print(f"✓ Snowball initialization complete")
print(f"  Base vector norm: {base_vector.norm().item():.6f}")
print(f"  Initial unique vectors: {len(unique_init)}")
print(f"  Initial black holes (C₀): {init_black_holes}")
print(f"  Initial black hole population (P₀): {init_bh_population}")


Applying primordial snowball initialization...
  Base vector + Gaussian(0, σ=1.00e-05)

✓ Snowball initialization complete
  Base vector norm: 8.201394
  Initial unique vectors: 21
  Initial black holes (C₀): 15
  Initial black hole population (P₀): 122
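
For readers without PyTorch at hand, the counting step can be reproduced with the standard library. This is a minimal sketch on toy data. `count_black_holes` is a hypothetical helper that mirrors the `torch.unique(..., return_counts=True)` logic, where a black hole is any vector shared by at least two tokens.

```python
from collections import Counter

def count_black_holes(rows):
    """Return (unique vectors, black hole count C, black hole population P)."""
    populations = Counter(tuple(r) for r in rows)
    holes = {vec: n for vec, n in populations.items() if n >= 2}
    return len(populations), len(holes), sum(holes.values())

toy = [(1.0, 2.0)] * 3 + [(1.0, 2.5)] * 2 + [(0.0, 0.0)]
print(count_black_holes(toy))
# (3, 2, 5)
```

The same bookkeeping explains the numbers above: 21 unique vectors minus 15 black holes leaves 6 singleton tokens, and 122 + 6 = 128 accounts for the whole vocabulary.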


## Pre-allocate Embedding History

In [24]:
# Pre-allocate tensor for all snapshots
embedding_history = torch.zeros(
    (NUM_TRAIN_STEPS + 1, VOCAB_SIZE, HIDDEN_DIM),
    dtype=torch.bfloat16
)

# Save initial state
embedding_history[0] = model.transformer.wte.weight.data.clone().cpu()

print(f"\n✓ Pre-allocated embedding history")
print(f"  Shape: {embedding_history.shape}")
print(f"  Memory: {embedding_history.element_size() * embedding_history.numel() / 1e6:.1f} MB")


✓ Pre-allocated embedding history
  Shape: torch.Size([10001, 128, 64])
  Memory: 163.9 MB
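
The memory figure follows directly from the tensor shape and the 2 bytes per bfloat16 element. A small worked check, with an illustrative function name:

```python
def history_megabytes(num_steps, vocab, dim, bytes_per_element=2):
    """Size in MB (10^6 bytes) of a history with one snapshot per step plus t=0."""
    snapshots = num_steps + 1
    return snapshots * vocab * dim * bytes_per_element / 1e6

print(f"{history_megabytes(10_000, 128, 64):.1f} MB")
# 163.9 MB
```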


## Define Callback (No Noise Injection)

In [25]:
class EmbeddingHistoryCallback(TrainerCallback):
    """Save embeddings to history (no noise injection)."""
    
    def __init__(self, embedding_history):
        self.embedding_history = embedding_history
    
    def on_step_end(self, args, state, control, model=None, **kwargs):
        step = state.global_step
        
        # Store in memory (no noise injection)
        self.embedding_history[step] = model.transformer.wte.weight.data.clone().cpu()
        
        # Print progress every 1000 steps
        if step % 1000 == 0 and step > 0:
            embeddings = self.embedding_history[step]
            centroid_norm = embeddings.mean(dim=0).norm().item()
            print(f"[Step {step:5d}] Centroid norm: {centroid_norm:.6f}")
        
        return control

print(f"✓ Callback defined (no noise injection)")

✓ Callback defined (no noise injection)


## Configure Training

In [26]:
training_args = TrainingArguments(
    output_dir="./training_output_primordial_snowball",
    max_steps=NUM_TRAIN_STEPS,
    per_device_train_batch_size=BATCH_SIZE,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    logging_steps=100,
    save_strategy="no",  # Don't save checkpoints
    seed=RANDOM_SEED,
    dataloader_num_workers=0,
    use_cpu=(DEVICE == "cpu"),
    bf16=True,
    report_to="none",
)

print("Training configuration:")
print(f"  Device: {DEVICE}")
print(f"  Steps: {NUM_TRAIN_STEPS:,}")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Learning rate: {LEARNING_RATE}")
print(f"  Weight decay: {WEIGHT_DECAY}")
print(f"  Initial noise: σ = {INIT_SIGMA:.2e} (at t=0 only)")

Training configuration:
  Device: mps
  Steps: 10,000
  Batch size: 32
  Learning rate: 0.001
  Weight decay: 0.01
  Initial noise: σ = 1.00e-05 (at t=0 only)


## Create Trainer and Train

In [27]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    callbacks=[EmbeddingHistoryCallback(embedding_history)],
)

print("\n" + "="*80)
print("Starting training from primordial snowball...")
print("="*80 + "\n")

trainer.train()

print("\n" + "="*80)
print("✓ Training complete!")
print("="*80)


Starting training from primordial snowball...





Step,Training Loss
100,3.4507
200,3.0907
300,3.0831
400,3.0792
500,3.0805
600,3.08
700,3.0731
800,3.0747
900,3.0763
1000,3.0721


[Step  1000] Centroid norm: 8.187500
[Step  2000] Centroid norm: 8.187500
[Step  3000] Centroid norm: 8.187500
[Step  4000] Centroid norm: 8.187500
[Step  5000] Centroid norm: 8.187500
[Step  6000] Centroid norm: 8.187500
[Step  7000] Centroid norm: 8.187500
[Step  8000] Centroid norm: 8.187500
[Step  9000] Centroid norm: 8.187500
[Step 10000] Centroid norm: 8.187500

✓ Training complete!


## Save Embedding History

In [28]:
output_path = Path(OUTPUT_DIR)
output_path.mkdir(parents=True, exist_ok=True)
output_file = output_path / OUTPUT_FILE

print(f"\nSaving embedding history to {output_file}...")
save_file({'embedding_history': embedding_history}, output_file)

print(f"✓ Saved {output_file.stat().st_size / 1e6:.1f} MB")


Saving embedding history to ../data/embeddings_128vocab_primordial_snowball/embedding_evolution.safetensors...
✓ Saved 163.9 MB


## Analyze Results: Count Black Holes

In [29]:
# Get final embeddings
final_embeddings = embedding_history[-1]

# Find unique vectors
unique_vectors, inverse_indices = torch.unique(
    final_embeddings,
    dim=0,
    return_inverse=True
)

# Count populations
vector_populations = Counter(inverse_indices.tolist())

# Black holes are vectors with population ≥ 2
black_holes = {vec_id: pop for vec_id, pop in vector_populations.items() if pop >= 2}

C = len(black_holes)  # Black hole count
P = sum(black_holes.values())  # Total population

print(f"\n{'='*80}")
print(f"BLACK HOLE ANALYSIS")
print(f"{'='*80}")
print(f"Initial state (t=0):")
print(f"  Black hole count (C₀): {init_black_holes}")
print(f"  Black hole population (P₀): {init_bh_population}")
print(f"\nFinal state (t={NUM_TRAIN_STEPS:,}):")
print(f"  Black hole count (C_f): {C}")
print(f"  Black hole population (P_f): {P}")
print(f"\nDead tokens in corpus: {dead_tokens}")
print(f"{'='*80}")

if C > 1:
    print(f"\n✓ Multiple black holes present")
    if C < init_black_holes:
        print(f"  → Consolidation: {init_black_holes} → {C} black holes")
    elif C > init_black_holes:
        print(f"  → Fragmentation: {init_black_holes} → {C} black holes")
    else:
        print(f"  → Stable: {C} black holes maintained")
elif C == 1:
    print(f"\n⚠ Collapsed to single black hole")
    print(f"  → Initial snowball merged during training")
else:
    print(f"\n✓ All tokens escaped black holes!")
    print(f"  → Training dispersed the snowball completely")


BLACK HOLE ANALYSIS
Initial state (t=0):
  Black hole count (C₀): 15
  Black hole population (P₀): 122

Final state (t=10,000):
  Black hole count (C_f): 2
  Black hole population (P_f): 51

Dead tokens in corpus: 51

✓ Multiple black holes present
  → Consolidation: 15 → 2 black holes
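
The final population (51) equals the number of dead tokens, which suggests, but does not by itself prove, that the black holes consist exactly of the dead tokens. A direct membership check could look like the sketch below. The helper name and the toy data are assumptions. In the notebook, the rows would come from `final_embeddings` and the ids from the set of bytes absent from the corpus.

```python
from collections import defaultdict

def black_hole_members(rows):
    """Return the set of token ids whose vector is shared with another token."""
    groups = defaultdict(list)
    for token_id, row in enumerate(rows):
        groups[tuple(row)].append(token_id)
    return {t for ids in groups.values() if len(ids) >= 2 for t in ids}

# Toy example: tokens 0, 2, 4 share one vector; tokens 1 and 3 are unique.
toy_rows = [(0.5, 0.5), (1.0, 2.0), (0.5, 0.5), (3.0, 1.0), (0.5, 0.5)]
toy_dead_ids = {0, 2, 4}
members = black_hole_members(toy_rows)
print(members == toy_dead_ids)
# True
```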


## Black Hole Details

In [30]:
if C > 0:
    sorted_bhs = sorted(black_holes.items(), key=lambda x: x[1], reverse=True)
    
    print(f"\nBlack hole populations (sorted by size):\n")
    for i, (vec_id, pop) in enumerate(sorted_bhs[:10], 1):  # Show top 10
        print(f"BH #{i}: {pop} tokens")
    
    if C > 10:
        print(f"... ({C - 10} more black holes)")
    
    print(f"\nLargest: {sorted_bhs[0][1]} tokens")
    if C > 1:
        print(f"Smallest: {sorted_bhs[-1][1]} tokens")


Black hole populations (sorted by size):

BH #1: 49 tokens
BH #2: 2 tokens

Largest: 49 tokens
Smallest: 2 tokens


## Geometric Analysis: Dead Token Snowball

In [31]:
# Identify dead tokens
dead_token_ids = sorted([t for t in range(VOCAB_SIZE) if t not in unique_bytes])

# Extract dead token embeddings
dead_embeddings_final = final_embeddings[dead_token_ids].to(torch.float32)

# Pairwise L∞ distances
n_dead = len(dead_token_ids)
v1 = dead_embeddings_final.unsqueeze(1)  # [n, 1, d]
v2 = dead_embeddings_final.unsqueeze(0)  # [1, n, d]
diffs = v1 - v2  # [n, n, d]
l_inf_distances = torch.abs(diffs).max(dim=2)[0]  # [n, n]

# Mask out the diagonal (self-distances); identical dead tokens still give 0, so Min can be 0
mask = ~torch.eye(n_dead, dtype=torch.bool)
l_inf_nonzero = l_inf_distances[mask]

print(f"\n{'='*80}")
print(f"DEAD TOKEN SNOWBALL GEOMETRY")
print(f"{'='*80}")
print(f"Dead tokens: {n_dead}")
print(f"\nL∞ distances:")
print(f"  Min: {l_inf_nonzero.min().item():.6e}")
print(f"  Max: {l_inf_nonzero.max().item():.6e}")
print(f"  Mean: {l_inf_nonzero.mean().item():.6e}")
print(f"\nReference scales:")
print(f"  ε (bfloat16 ULP): {INIT_SIGMA:.2e}")
print(f"  2ε (lattice scale): {2*INIT_SIGMA:.2e}")
print(f"  Max L∞ / ε: {l_inf_nonzero.max().item() / INIT_SIGMA:.1f}×")
print(f"\nConclusion:")
if l_inf_nonzero.max().item() < 10 * INIT_SIGMA:
    print(f"  ✓ Dead tokens form tight snowball (max L∞ < 10ε)")
    print(f"  → Matches Qwen's observed black hole geometry")
else:
    print(f"  ✗ Dead tokens scattered widely (max L∞ = {l_inf_nonzero.max().item() / INIT_SIGMA:.0f}× ε)")
    print(f"  → Does not match Qwen's tight clustering")
print(f"{'='*80}")


DEAD TOKEN SNOWBALL GEOMETRY
Dead tokens: 51

L∞ distances:
  Min: 0.000000e+00
  Max: 9.765625e-04
  Mean: 7.506127e-05

Reference scales:
  ε (bfloat16 ULP): 1.00e-05
  2ε (lattice scale): 2.00e-05
  Max L∞ / ε: 97.7×

Conclusion:
  ✗ Dead tokens scattered widely (max L∞ = 98× ε)
  → Does not match Qwen's tight clustering
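
One detail worth checking is the reference scale itself. The bfloat16 ULP is not a single number: it grows with the magnitude of the value being stored. The sketch below computes the spacing at a given magnitude. The helper `bfloat16_ulp` is an illustrative name, and only the normal range is handled.

```python
import math

def bfloat16_ulp(x: float) -> float:
    """Spacing between adjacent bfloat16 values at magnitude |x| (normal range)."""
    if x == 0.0:
        raise ValueError("ULP at zero depends on the subnormal range; not handled here")
    _, exponent = math.frexp(abs(x))       # abs(x) = m * 2**exponent, 0.5 <= m < 1
    return math.ldexp(1.0, exponent - 8)   # 2**(floor(log2|x|) - 7)

for magnitude in (1.0, 0.2, 1.5e-3):
    print(f"|x| = {magnitude:<8} ULP = {bfloat16_ulp(magnitude):.6e}")
```

The reported maximum of 9.765625e-04 equals 2^-10 exactly, which is the bfloat16 spacing for components with magnitude in [0.125, 0.25). One possible reading is that the most distant dead tokens differ by a single quantization step in some coordinate, and that the 97.7× ratio partly reflects measuring against σ = 1e-5 and not against the local ULP.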


## Summary

In [32]:
initial_centroid = embedding_history[0].mean(dim=0)
final_centroid = final_embeddings.mean(dim=0)
displacement = (final_centroid - initial_centroid).norm().item()

print(f"\n{'='*80}")
print(f"EXPERIMENT SUMMARY")
print(f"{'='*80}")
print(f"Model: GPT-2 (standard LayerNorm)")
print(f"Training steps: {NUM_TRAIN_STEPS:,}")
print(f"Dead tokens: {dead_tokens}")
print(f"Initialization: Primordial snowball (σ = {INIT_SIGMA:.2e})")
print(f"\nResults:")
print(f"  Initial black holes: {init_black_holes}")
print(f"  Final black holes: {C}")
print(f"  Final black hole population: {P}")
print(f"  Max L∞ (dead tokens): {l_inf_nonzero.max().item():.6e}")
print(f"  Centroid displacement: {displacement:.6f}")
print(f"\nConclusion:")
if C > 1 and l_inf_nonzero.max().item() < 10 * INIT_SIGMA:
    print(f"  ✓ HYPOTHESIS SUPPORTED")
    print(f"  Primordial snowball initialization reproduces Qwen-like black hole structure")
    print(f"  Black holes remain stable/consolidate during training")
elif C == 1:
    print(f"  ⚠ PARTIAL SUPPORT")
    print(f"  Snowball initialization worked, but training merged clusters")
    print(f"  May need larger initial σ or different training dynamics")
else:
    print(f"  ✗ HYPOTHESIS CHALLENGED")
    print(f"  Initial clustering did not produce stable snowball structure")
print(f"{'='*80}")


EXPERIMENT SUMMARY
Model: GPT-2 (standard LayerNorm)
Training steps: 10,000
Dead tokens: 51
Initialization: Primordial snowball (σ = 1.00e-05)

Results:
  Initial black holes: 15
  Final black holes: 2
  Final black hole population: 51
  Max L∞ (dead tokens): 9.765625e-04
  Centroid displacement: 0.359375

Conclusion:
  ✗ HYPOTHESIS CHALLENGED
  Initial clustering did not produce stable snowball structure
