## Lyapunov Spectrum Computation for ELM Neuron

This notebook demonstrates how to compute and analyze the Lyapunov spectrum for the ELM neuron model.

The Lyapunov spectrum characterizes the dynamical properties of the system:
- **Positive exponents**: Directions of exponential divergence (chaos)
- **Zero exponents**: Marginal stability
- **Negative exponents**: Directions of exponential convergence (stability)

The maximum Lyapunov exponent indicates:
- λ_max > 0: Chaotic dynamics
- λ_max ≈ 0: Edge of chaos (critical dynamics)
- λ_max < 0: Stable/periodic dynamics

## Setup

In [None]:
# Imports
import os
import sys
import json
from pathlib import Path
import random

import numpy as np
import torch
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
# Relative Imports
package_path = Path(os.path.abspath(os.path.join(os.path.dirname('__file__'), '..')))
sys.path.insert(0, str(package_path))

from src.expressive_leaky_memory_neuron_v2 import ELM
from src.lyapunov_spectrum import LyapunovSpectrumCalculator, analyze_lyapunov_spectrum
from src.neuronio.neuronio_data_utils import (
    NEURONIO_DATA_DIM, 
    NEURONIO_LABEL_DIM, 
    get_data_files_from_folder,
)
from src.neuronio.neuronio_data_loader import NeuronIO

In [None]:
# General Config
general_config = dict()
general_config["seed"] = 0
general_config["device"] = 'cuda' if torch.cuda.is_available() else 'cpu'
general_config["model_dir"] = "../models/best_elm_neuron"  # Change to desired model
torch_device = torch.device(general_config["device"])
print("Torch Device: ", torch_device)

In [None]:
# Seeding & Determinism
os.environ['PYTHONHASHSEED'] = str(general_config["seed"])
random.seed(general_config["seed"])
np.random.seed(general_config["seed"])
torch.manual_seed(general_config["seed"])
torch.cuda.manual_seed(general_config["seed"])
torch.backends.cudnn.deterministic = True

## Load Model

In [None]:
# Load model config
with open(general_config["model_dir"] + "/model_config.json") as f:
    model_config = json.load(f)

# Load train config
with open(general_config["model_dir"] + "/train_config.json") as f:
    train_config = json.load(f)

print("Model config:")
print(f"  num_memory: {model_config['num_memory']}")
print(f"  num_branch: {model_config['num_branch']}")
print(f"  num_input: {model_config['num_input']}")

In [None]:
# Initialize and load the ELM model
model = ELM(**model_config).to(torch_device)

state_dict = torch.load(
    general_config["model_dir"] + "/neuronio_best_model_state.pt", 
    map_location=torch_device
)
print(model.load_state_dict(state_dict))
model.eval()

print(f"\nModel state dimension: {model.num_branch + model.num_memory}")
print(f"  - Branch states: {model.num_branch}")
print(f"  - Memory states: {model.num_memory}")

## Load Data (Optional - for computing over dataset)

You can either:
1. Use real data from NeuronIO dataset
2. Use synthetic random data for quick testing

In [None]:
# Option 1: Load real data (requires downloaded NeuronIO dataset)
USE_REAL_DATA = False  # Set to True if you have the dataset

if USE_REAL_DATA:
    data_dir_path = Path("~/Data").expanduser().resolve()
    test_data_dir_path = data_dir_path / "neuronio_test_data"
    test_files = get_data_files_from_folder([str(test_data_dir_path)])
    
    # Create a small data loader for testing
    test_data_loader = NeuronIO(
        file_paths=test_files[:1],  # Use just one file for speed
        batches_per_epoch=10,
        batch_size=4,
        input_window_size=train_config["input_window_size"],
        num_workers=2,
        num_prefetch_batch=10,
        file_load_fraction=0.1,
        seed=general_config["seed"],
        device=torch_device,
    )
    print("Loaded real NeuronIO data")
else:
    # Option 2: Use synthetic random data
    print("Using synthetic random spike data")

## Compute Lyapunov Spectrum

### Parameters:
- `num_transient`: Number of initial timesteps to discard (let system settle)
- `num_iterations`: Number of timesteps used for computing the spectrum
- `orthonormalize_every`: How often to perform QR decomposition (1 = every step, more accurate)

In [None]:
# Initialize Lyapunov calculator
lyapunov_calc = LyapunovSpectrumCalculator(
    model=model,
    model_version="v2",  # Use "v1" for old version, "v2" for new version
    device=torch_device,
)

print(f"State space dimension: {lyapunov_calc.state_dim}")

### Quick Test: Single Trajectory

In [None]:
# Generate or load a test input sequence
if USE_REAL_DATA:
    test_input, _ = next(iter(test_data_loader))
else:
    # Generate random spike input (sparse binary spikes)
    batch_size = 2
    seq_length = 3000  # Long enough for transient + computation
    spike_prob = 0.05
    test_input = (torch.rand(batch_size, seq_length, model_config['num_input']) < spike_prob).float()
    test_input = test_input.to(torch_device)

print(f"Test input shape: {test_input.shape}")
print(f"Spike rate: {test_input.mean().item():.4f}")

In [None]:
# Compute Lyapunov spectrum for a single trajectory
result = lyapunov_calc.compute_spectrum(
    inputs=test_input,
    num_transient=500,        # Discard first 500 timesteps
    num_iterations=2000,      # Use 2000 timesteps for computation
    orthonormalize_every=1,   # QR decomposition every step (most accurate)
    return_trajectory=False,  # Set to True to get state trajectory
    verbose=True,
)

print(f"\nComputed spectrum shape: {result['lyapunov_exponents'].shape}")

In [None]:
# Analyze the results
metrics = analyze_lyapunov_spectrum(result, verbose=True)

### Visualization

In [None]:
# Plot the Lyapunov spectrum
spectrum_mean = np.mean(result['lyapunov_exponents'], axis=0)
spectrum_sorted = np.sort(spectrum_mean)[::-1]

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: Full spectrum
axes[0].plot(spectrum_sorted, 'o-', markersize=3, linewidth=1)
axes[0].axhline(y=0, color='r', linestyle='--', alpha=0.5, label='Zero line')
axes[0].set_xlabel('Index (sorted)', fontsize=12)
axes[0].set_ylabel('Lyapunov Exponent', fontsize=12)
axes[0].set_title('Full Lyapunov Spectrum', fontsize=14, fontweight='bold')
axes[0].grid(True, alpha=0.3)
axes[0].legend()

# Plot 2: Histogram
axes[1].hist(spectrum_mean, bins=50, edgecolor='black', alpha=0.7)
axes[1].axvline(x=0, color='r', linestyle='--', linewidth=2, label='Zero')
axes[1].axvline(x=spectrum_mean.max(), color='g', linestyle='--', linewidth=2, 
                label=f'Max: {spectrum_mean.max():.4f}')
axes[1].set_xlabel('Lyapunov Exponent', fontsize=12)
axes[1].set_ylabel('Count', fontsize=12)
axes[1].set_title('Distribution of Lyapunov Exponents', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

print(f"\nTop 10 Lyapunov exponents:")
for i, val in enumerate(spectrum_sorted[:10]):
    print(f"  λ_{i+1}: {val:.6f}")

## Advanced: Compute Across Multiple Trajectories

In [None]:
# Compute spectrum across multiple different input trajectories
num_trajectories = 5
all_spectra = []

print(f"Computing Lyapunov spectrum for {num_trajectories} different trajectories...\n")

for i in range(num_trajectories):
    if USE_REAL_DATA and i < len(test_data_loader):
        test_input, _ = next(iter(test_data_loader))
    else:
        # Generate different random inputs
        test_input = (torch.rand(2, 3000, model_config['num_input']) < spike_prob).float()
        test_input = test_input.to(torch_device)
    
    result_i = lyapunov_calc.compute_spectrum(
        inputs=test_input,
        num_transient=500,
        num_iterations=2000,
        orthonormalize_every=1,
        verbose=False,
    )
    all_spectra.append(result_i['lyapunov_exponents'])
    print(f"Trajectory {i+1}: max λ = {result_i['max_exponent'].mean():.6f}")

# Aggregate results
all_spectra = np.concatenate(all_spectra, axis=0)
mean_spectrum = np.mean(all_spectra, axis=0)
std_spectrum = np.std(all_spectra, axis=0)

print(f"\n{'-'*60}")
print(f"Aggregate statistics over {num_trajectories} trajectories:")
print(f"  Mean max λ: {np.max(mean_spectrum):.6f} ± {std_spectrum[np.argmax(mean_spectrum)]:.6f}")
print(f"  Mean # positive: {np.mean(np.sum(all_spectra > 0, axis=1)):.1f}")
print(f"{'-'*60}")

In [None]:
# Plot aggregate spectrum with error bars
mean_sorted_idx = np.argsort(mean_spectrum)[::-1]
mean_sorted = mean_spectrum[mean_sorted_idx]
std_sorted = std_spectrum[mean_sorted_idx]

fig, ax = plt.subplots(figsize=(12, 6))
ax.errorbar(range(len(mean_sorted)), mean_sorted, yerr=std_sorted, 
            fmt='o-', markersize=3, linewidth=1, capsize=2, alpha=0.7,
            label='Mean ± Std')
ax.axhline(y=0, color='r', linestyle='--', alpha=0.5, linewidth=2, label='Zero line')
ax.fill_between(range(len(mean_sorted)), 0, mean_sorted, where=(mean_sorted > 0), 
                 alpha=0.3, color='red', label='Chaotic directions')
ax.fill_between(range(len(mean_sorted)), 0, mean_sorted, where=(mean_sorted < 0), 
                 alpha=0.3, color='blue', label='Stable directions')
ax.set_xlabel('Index (sorted)', fontsize=12)
ax.set_ylabel('Lyapunov Exponent', fontsize=12)
ax.set_title(f'Aggregate Lyapunov Spectrum ({num_trajectories} trajectories)', 
             fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend()
plt.tight_layout()
plt.show()

## Compare Different Models

You can compare Lyapunov spectra across different model sizes

In [None]:
# List of models to compare
model_dirs = [
    "../models/num_memory_10",
    "../models/num_memory_20",
    "../models/num_memory_50",
    "../models/num_memory_100",
]

comparison_results = {}

for model_dir in model_dirs:
    if not Path(model_dir).exists():
        print(f"Skipping {model_dir} (not found)")
        continue
    
    # Load model
    with open(model_dir + "/model_config.json") as f:
        config = json.load(f)
    
    model_comp = ELM(**config).to(torch_device)
    state_dict = torch.load(model_dir + "/neuronio_best_model_state.pt", map_location=torch_device)
    model_comp.load_state_dict(state_dict)
    model_comp.eval()
    
    # Compute spectrum
    calc = LyapunovSpectrumCalculator(model_comp, model_version="v2", device=torch_device)
    
    # Use same input for fair comparison
    test_input = (torch.rand(2, 3000, config['num_input']) < 0.05).float().to(torch_device)
    
    result = calc.compute_spectrum(
        inputs=test_input,
        num_transient=500,
        num_iterations=2000,
        orthonormalize_every=1,
        verbose=False,
    )
    
    model_name = f"d_m={config['num_memory']}"
    comparison_results[model_name] = np.mean(result['lyapunov_exponents'], axis=0)
    
    print(f"{model_name}: max λ = {np.max(comparison_results[model_name]):.6f}, "
          f"# positive = {np.sum(comparison_results[model_name] > 0)}")

In [None]:
# Plot comparison
if len(comparison_results) > 0:
    fig, ax = plt.subplots(figsize=(12, 6))
    
    for name, spectrum in comparison_results.items():
        sorted_spectrum = np.sort(spectrum)[::-1]
        ax.plot(sorted_spectrum, 'o-', markersize=3, linewidth=1.5, label=name, alpha=0.7)
    
    ax.axhline(y=0, color='k', linestyle='--', alpha=0.3, linewidth=2)
    ax.set_xlabel('Index (sorted)', fontsize=12)
    ax.set_ylabel('Lyapunov Exponent', fontsize=12)
    ax.set_title('Lyapunov Spectrum Comparison Across Model Sizes', fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3)
    ax.legend()
    plt.tight_layout()
    plt.show()

## Save Results

In [None]:
# Save results to file
output_dir = Path(general_config["model_dir"]) / "lyapunov_analysis"
output_dir.mkdir(exist_ok=True)

np.savez(
    output_dir / "lyapunov_spectrum.npz",
    spectrum=mean_spectrum,
    spectrum_std=std_spectrum,
    all_spectra=all_spectra,
    metrics=metrics,
)

print(f"Results saved to {output_dir}")