## Sample Dataset

We'll use a small sample dataset of multi-agent conversations for demonstration. In the original experiment, this data would be loaded from external JSON files, but here we've inlined it to make the notebook self-contained.

In [None]:
def predict_winner(
    state: np.ndarray,
    agent_outputs: List[str],
    context: Dict,
    tie_margin: float = 50,
) -> str:
    """Predict which of two agent responses wins, using a length heuristic.

    Each response is scored as ``len(response) * (1 + 0.1 * confidence)``, where
    ``confidence`` is the sum of the strictly positive entries of ``state``.
    Because the same factor scales both scores, the coordinator state never
    changes which response is longer; it only scales the gap that is compared
    against ``tie_margin``.

    Args:
        state: Coordinator state vector (as returned by ``coordinator.step``).
        agent_outputs: Agent responses; the first is "model_a", the second is
            "model_b". Any further entries are ignored.
        context: Episode context. Not used by the heuristic; accepted so the
            signature matches the experiment loop.
        tie_margin: Scaled score gaps strictly below this value are reported
            as a tie. Defaults to 50.

    Returns:
        ``"model_a"``, ``"model_b"`` or ``"tie"``. Any error while scoring
        (e.g. fewer than two outputs) is logged and yields ``"tie"``.
    """
    try:
        len_a = len(agent_outputs[0])
        len_b = len(agent_outputs[1])

        # Only positive activations count towards the shared scale factor.
        confidence = np.sum(state[state > 0])
        scale = 1 + confidence * 0.1

        score_a = len_a * scale
        score_b = len_b * scale
    except Exception as e:
        logger.error(f"Error in predict_winner: {e}")
        return "tie"

    if abs(score_a - score_b) < tie_margin:
        return "tie"
    return "model_a" if score_a > score_b else "model_b"

def run_experiment(examples: List[Dict], coordinator, tracker: TokenTracker, name: str) -> Tuple[List[str], List[int]]:
    """Run one coordinator over every example and collect predictions and token usage.

    For each example the coordinator is reset and stepped once on
    ``[context['response_a'], context['response_b']]``. The step's tokens are
    logged on ``tracker``, and ``predict_winner`` turns the returned state into
    a label.

    Args:
        examples: Dataset rows; each must provide ``example['context']`` with
            ``'response_a'`` and ``'response_b'`` entries.
        coordinator: Object exposing ``reset()`` and
            ``step(agent_outputs) -> (state, message)``.
        tracker: Token tracker; its running ``total_tokens`` is read before
            and after each example.
        name: Label used in log messages.

    Returns:
        ``(predictions, token_counts)``: one label and one token count per
        example, in input order.

    NOTE(review): ``reset()`` runs before the single ``step()`` of each example,
    so every step starts from a zero state. For the low-rank coordinators in
    this notebook, that makes the projected state, and hence their message,
    empty. Per-example token counts therefore differ across methods only
    through the full-rank coordinator's message. Confirm this is the intended
    experimental design.
    """
    logger.info(f"Running experiment: {name}")

    predictions = []
    token_counts = []
    total = len(examples)

    for idx, example in enumerate(examples):
        if (idx + 1) % 5 == 0 or total <= 10:
            print(f"Processing example {idx+1}/{total}")

        # Fresh episode: discard state carried over from the previous example.
        coordinator.reset()

        # Snapshot the running total so this episode's tokens can be isolated.
        episode_start_tokens = tracker.total_tokens

        context = example['context']
        agent_outputs = [context['response_a'], context['response_b']]

        state, coordinator_message = coordinator.step(agent_outputs)

        # The per-episode count is taken from the tracker's running total below,
        # so the returned step count is not needed here.
        tracker.log_coordinator_step(agent_outputs, coordinator_message)

        predictions.append(predict_winner(state, agent_outputs, context))
        token_counts.append(tracker.total_tokens - episode_start_tokens)

    logger.info(f"Experiment {name} complete: {len(predictions)} predictions made")
    return predictions, token_counts

def evaluate_performance(predictions: List[str], ground_truth: List[str]) -> Dict[str, float]:
    """Score predicted labels against the reference labels.

    Returns accuracy plus macro- and weighted-averaged F1
    (``zero_division=0``: ill-defined per-label scores count as 0). If
    scikit-learn rejects the inputs (e.g. mismatched lengths), the error is
    logged and all three metrics are reported as 0.0.
    """
    metric_keys = ("accuracy", "f1_macro", "f1_weighted")
    try:
        raw_scores = {
            "accuracy": accuracy_score(ground_truth, predictions),
            "f1_macro": f1_score(ground_truth, predictions, average='macro', zero_division=0),
            "f1_weighted": f1_score(ground_truth, predictions, average='weighted', zero_division=0),
        }
        return {key: float(value) for key, value in raw_scores.items()}
    except Exception as e:
        logger.error(f"Error evaluating performance: {e}")
        return dict.fromkeys(metric_keys, 0.0)

def compute_statistical_significance(
    baseline_tokens: List[int],
    method_tokens: List[int],
    alpha: float = 0.05,
) -> Dict[str, Any]:
    """Paired t-test and effect size for per-example token counts.

    Args:
        baseline_tokens: Token count per example for the baseline method.
        method_tokens: Token count per example for the compared method, in the
            same example order (the test is paired).
        alpha: Significance level used for ``t_significant``. Defaults to 0.05.

    Returns:
        Dict with ``t_statistic``, ``t_pvalue``, ``t_significant``
        (``t_pvalue < alpha``), ``cohens_d`` and ``alpha``.

        ``cohens_d`` is the mean difference (baseline minus method, so
        positive means fewer tokens for the method) divided by the square root
        of the average of the two population (ddof=0) variances. It is 0 when
        that pooled value is 0. Note this is not the paired d_z
        (mean(diff) / sd(diff)).

        When scipy reports an undefined test with NaN (e.g. identical samples,
        where every paired difference is zero), or the inputs are rejected, the
        neutral values ``t_statistic=0.0``, ``t_pvalue=1.0`` and
        ``t_significant=False`` are returned instead of NaN.
    """
    neutral = {
        "t_statistic": 0.0,
        "t_pvalue": 1.0,
        "t_significant": False,
        "cohens_d": 0.0,
        "alpha": alpha,
    }
    try:
        t_stat, t_pval = ttest_rel(baseline_tokens, method_tokens)
        t_stat, t_pval = float(t_stat), float(t_pval)

        mean_diff = np.mean(baseline_tokens) - np.mean(method_tokens)
        pooled_std = np.sqrt((np.var(baseline_tokens) + np.var(method_tokens)) / 2)
        cohens_d = float(mean_diff / pooled_std) if pooled_std > 0 else 0.0
    except Exception as e:
        logger.error(f"Error computing statistical significance: {e}")
        return neutral

    if np.isnan(t_pval):
        # scipy signals an undefined test with NaN instead of raising. NaN would
        # also compare False silently and is not valid strict JSON.
        return dict(neutral, cohens_d=cohens_d)

    return {
        "t_statistic": t_stat,
        "t_pvalue": t_pval,
        "t_significant": bool(t_pval < alpha),
        "cohens_d": cohens_d,
        "alpha": alpha,
    }

print("✅ Experimental functions defined!")

## Experimental Functions

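These functions run each coordinator over the dataset, predict a winner for every example, and evaluate both task performance (accuracy, F1) and the statistical significance of token-usage differences.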

In [None]:
class DynamicRankCoordinator:
    """
    NOVEL: Dynamic rank adaptation coordinator.

    A low-rank recurrent coordinator whose effective rank (the number of
    leading columns of ``U``, ``V`` and the module weights that are used) is
    re-chosen on every ``step``:
    - Each episode starts at ``min_rank`` (``reset`` restores it).
    - ``_adapt_rank`` turns a heuristic complexity score (output-length
      variance, feature norm, state variance) into a target rank in
      ``[min_rank, max_rank]``.
    - The rank moves toward that target by at most ``rank_step_up`` per step
      when growing and ``rank_step_down`` per step when shrinking.
    - ``max_rank`` defaults to 64, below the default ``hidden_dim`` of 256.

    NOTE(review): right after ``reset`` the state is all zeros. The first
    step's projected state is therefore zero, its message is empty, and the
    rank can reach at most ``min_rank + rank_step_up``. Single-step episodes
    never use the upper part of the rank range.
    """

    def __init__(
        self,
        hidden_dim: int = 256,
        min_rank: int = 8,
        max_rank: int = 64,
        num_modules: int = 4,
        rank_step_up: int = 8,
        rank_step_down: int = 4,
    ):
        """
        Args:
            hidden_dim: Size of the coordinator state vector.
            min_rank: Rank used at the start of every episode; must be >= 1.
            max_rank: Upper bound on the rank; must be >= min_rank. Weight
                matrices are allocated with this many columns.
            num_modules: Number of sparse update modules; half of them (at
                least one) are active per step.
            rank_step_up: Largest rank increase allowed in one step (>= 1).
            rank_step_down: Largest rank decrease allowed in one step (>= 1).

        Raises:
            ValueError: If the rank bounds or step sizes are inconsistent.
        """
        if not 1 <= min_rank <= max_rank:
            raise ValueError(
                f"Expected 1 <= min_rank <= max_rank, got min_rank={min_rank}, max_rank={max_rank}"
            )
        if rank_step_up < 1 or rank_step_down < 1:
            raise ValueError(
                f"Rank step sizes must be >= 1, got rank_step_up={rank_step_up}, "
                f"rank_step_down={rank_step_down}"
            )

        self.hidden_dim = hidden_dim
        self.min_rank = min_rank
        self.max_rank = max_rank
        self.current_rank = min_rank  # Start with minimal rank
        self.num_modules = num_modules
        self.rank_step_up = rank_step_up
        self.rank_step_down = rank_step_down

        # Allocated for max_rank; each step uses only the first current_rank columns.
        self.U = np.random.randn(hidden_dim, max_rank) * 0.01
        self.V = np.random.randn(hidden_dim, max_rank) * 0.01

        # RIM sparse attention: active_k of num_modules contribute per step.
        self.active_k = max(1, num_modules // 2)
        self.module_weights = [
            np.random.randn(hidden_dim, max_rank) * 0.01
            for _ in range(num_modules)
        ]

        self.state = np.zeros(hidden_dim)
        # Rank used at each step. Not cleared by reset(), so it spans all episodes.
        self.rank_history = []

        logger.info(f"DynamicRankCoordinator initialized: min_rank={min_rank}, max_rank={max_rank}, modules={num_modules}")

    def reset(self):
        """Zero the state and drop back to ``min_rank`` for a new episode (rank_history is kept)."""
        self.state = np.zeros(self.hidden_dim)
        self.current_rank = self.min_rank

    def step(self, agent_outputs: List[str]) -> Tuple[np.ndarray, str]:
        """Advance the coordinator by one step.

        Encodes the outputs, adapts the rank, picks the active modules, then
        applies the rank-``current_rank`` recurrent and module updates and
        adds the encoded features.

        Returns:
            A copy of the new state, and the compressed message built from the
            projection of the state as it was *before* this update.
        """
        features = self._encode_outputs(agent_outputs)

        # Adaptive rank selection based on input complexity.
        self._adapt_rank(agent_outputs, features)

        active_modules = self._select_active_modules(self.state, features)

        # Low-rank update using the current rank (not the full max_rank).
        state_proj = self.V[:, :self.current_rank].T @ self.state  # Project to current_rank space
        new_state = self.U[:, :self.current_rank] @ state_proj  # Reconstruct

        # Sparse module updates, also restricted to the current rank.
        for module_idx in active_modules:
            module_update = self.module_weights[module_idx][:, :self.current_rank] @ state_proj
            new_state += module_update

        new_state += features
        self.state = new_state

        # Message length depends on the current rank.
        coordinator_message = self._generate_compressed_message(state_proj)

        self.rank_history.append(self.current_rank)

        return self.state.copy(), coordinator_message

    def _adapt_rank(self, agent_outputs: List[str], features: np.ndarray):
        """
        Move ``current_rank`` toward a target rank derived from a complexity score.

        Complexity signals (weighted sum):
        1. Variance of the agent output lengths, scaled by 1/10000 (weight 0.4).
        2. L2 norm of ``features`` (weight 0.3). Features produced by
           ``_encode_outputs`` are unit-normalised, so this is ~1 whenever any
           output is non-empty. It therefore acts as a near-constant ~0.3 offset.
        3. Variance of the current state, scaled by 10 (weight 0.3). It is 0 when
           the state is all zeros (e.g. right after ``reset``).

        Scores below 0.3 target ``min_rank``; scores of 0.7 or more target
        ``max_rank``; scores in between interpolate linearly. The rank then
        changes by at most ``rank_step_up`` (up) or ``rank_step_down`` (down).
        """
        # Signal 1: Length variance
        lengths = [len(out) for out in agent_outputs]
        length_variance = np.var(lengths) if len(lengths) > 1 else 0

        # Signal 2: Feature norm (~1 for unit-normalised features, 0 for all-zero ones)
        feature_magnitude = np.linalg.norm(features)

        # Signal 3: State spread across dimensions
        state_uncertainty = np.var(self.state) if np.any(self.state) else 0

        complexity_score = (
            (length_variance / 10000) * 0.4 +
            feature_magnitude * 0.3 +
            (state_uncertainty * 10) * 0.3
        )

        # Map complexity to a target rank
        if complexity_score < 0.3:
            target_rank = self.min_rank
        elif complexity_score < 0.7:
            # Linear interpolation between min and max
            progress = (complexity_score - 0.3) / 0.4
            target_rank = int(self.min_rank + progress * (self.max_rank - self.min_rank))
        else:
            target_rank = self.max_rank

        # Smooth adaptation: bounded step toward the target in either direction.
        if target_rank > self.current_rank:
            self.current_rank = min(self.current_rank + self.rank_step_up, target_rank)
        elif target_rank < self.current_rank:
            self.current_rank = max(self.current_rank - self.rank_step_down, target_rank)

        # Ensure bounds
        self.current_rank = max(self.min_rank, min(self.max_rank, self.current_rank))

    def _encode_outputs(self, outputs: List[str]) -> np.ndarray:
        """Encode outputs as a unit-norm vector of per-output text statistics.

        Each output contributes (word count, char count, '.' count, '?' count).
        The concatenation is truncated or zero-padded to ``hidden_dim``, then
        L2-normalised (1e-8 guards against division by zero).
        """
        features = []
        for output in outputs:
            features.extend([
                len(output.split()),
                len(output),
                output.count('.'),
                output.count('?'),
            ])

        features_array = np.array(features[:self.hidden_dim])
        if len(features_array) < self.hidden_dim:
            padded = np.zeros(self.hidden_dim)
            padded[:len(features_array)] = features_array
            features_array = padded

        features_array = features_array / (np.linalg.norm(features_array) + 1e-8)
        return features_array

    def _select_active_modules(self, state: np.ndarray, features: np.ndarray) -> List[int]:
        """Return indices of the ``active_k`` best-scoring modules.

        A module's score is the dot product of its current-rank projection of
        ``features`` with the current-rank projection of ``state``. Ties are
        broken in favour of the higher module index.
        """
        scores = []
        state_proj = self.V[:, :self.current_rank].T @ state

        for module_idx, module_w in enumerate(self.module_weights):
            module_proj = module_w[:, :self.current_rank].T @ features
            score = np.dot(module_proj, state_proj)
            scores.append((score, module_idx))

        scores.sort(reverse=True)
        return [idx for _, idx in scores[:self.active_k]]

    def _generate_compressed_message(self, state_proj: np.ndarray) -> str:
        """Format the projected components with |value| > 0.1 as space-separated ``r{i}:{value:.2f}``."""
        message_parts = []
        for i in range(len(state_proj)):
            val = state_proj[i]
            if abs(val) > 0.1:
                message_parts.append(f"r{i}:{val:.2f}")
        return " ".join(message_parts)

    def get_rank_stats(self) -> Dict[str, float]:
        """Summarise ``rank_history`` (empty dict if no step has run).

        ``rank_changes`` counts consecutive history entries that differ,
        including changes across episode boundaries.
        """
        if not self.rank_history:
            return {}

        return {
            "mean_rank": float(np.mean(self.rank_history)),
            "std_rank": float(np.std(self.rank_history)),
            "min_rank_used": int(np.min(self.rank_history)),
            "max_rank_used": int(np.max(self.rank_history)),
            "rank_changes": int(np.sum(np.abs(np.diff(self.rank_history)) > 0))
        }

print("✅ DynamicRankCoordinator class defined!")

In [None]:
class StaticLowRankCoordinator:
    """Static low-rank coordinator with fixed rank (baseline method from exp_2_006).

    The recurrent map is factorised as ``U @ V.T`` with a constant ``rank``.
    On every step, the ``active_k`` highest-scoring of ``num_modules`` module
    matrices add their own rank-``rank`` update on top of it.
    """

    def __init__(self, hidden_dim: int = 256, rank: int = 32, num_modules: int = 4):
        self.hidden_dim = hidden_dim
        self.rank = rank
        self.num_modules = num_modules

        def small_random():
            # Draws from the global NumPy RNG, in call order: U, V, then modules.
            return np.random.randn(hidden_dim, rank) * 0.01

        # Factors of the recurrent map W = U @ V^T.
        self.U = small_random()
        self.V = small_random()

        # RIM sparse attention: only active_k modules fire per step.
        self.active_k = max(1, num_modules // 2)
        self.module_weights = [small_random() for _ in range(num_modules)]

        self.state = np.zeros(hidden_dim)
        logger.info(f"StaticLowRankCoordinator initialized: rank={rank}, modules={num_modules}")

    def reset(self):
        """Clear the recurrent state back to zeros."""
        self.state = np.zeros(self.hidden_dim)

    def step(self, agent_outputs: List[str]) -> Tuple[np.ndarray, str]:
        """Advance one step.

        Returns a copy of the new state and a message built from the projection
        of the state as it was before this update.
        """
        encoded = self._encode_outputs(agent_outputs)
        chosen = self._select_active_modules(self.state, encoded)

        latent = self.V.T @ self.state
        next_state = self.U @ latent
        for module_idx in chosen:
            next_state += self.module_weights[module_idx] @ latent
        next_state += encoded

        self.state = next_state
        return self.state.copy(), self._generate_compressed_message(latent)

    def _encode_outputs(self, outputs: List[str]) -> np.ndarray:
        """Map outputs to a unit-norm vector of per-output text statistics.

        Each output contributes (word count, char count, '.' count, '?' count).
        The concatenation is truncated or zero-padded to ``hidden_dim``.
        """
        stats = [
            value
            for text in outputs
            for value in (len(text.split()), len(text), text.count('.'), text.count('?'))
        ][:self.hidden_dim]
        vector = np.zeros(self.hidden_dim)
        vector[:len(stats)] = stats
        return vector / (np.linalg.norm(vector) + 1e-8)

    def _select_active_modules(self, state: np.ndarray, features: np.ndarray) -> List[int]:
        """Pick the ``active_k`` modules whose feature projection best matches the state projection."""
        latent = self.V.T @ state
        ranked = sorted(
            (
                (np.dot((weights.T @ features)[:self.rank], latent), idx)
                for idx, weights in enumerate(self.module_weights)
            ),
            reverse=True,
        )
        return [idx for _, idx in ranked[:self.active_k]]

    def _generate_compressed_message(self, state_proj: np.ndarray) -> str:
        """Render projected components with |value| > 0.1 as ``r{i}:{value:.2f}`` tokens."""
        return " ".join(
            f"r{pos}:{val:.2f}" for pos, val in enumerate(state_proj) if abs(val) > 0.1
        )

print("✅ StaticLowRankCoordinator class defined!")

In [None]:
class FullRankCoordinator:
    """Baseline: Full-rank recurrent coordinator.

    Holds a dense ``hidden_dim x hidden_dim`` transition matrix ``W`` and
    updates ``state <- W @ state + features`` on every step.
    """

    def __init__(self, hidden_dim: int = 256):
        self.hidden_dim = hidden_dim
        self.W = np.random.randn(hidden_dim, hidden_dim) * 0.01  # small random dense transition
        self.state = np.zeros(hidden_dim)
        logger.info(f"FullRankCoordinator initialized: hidden_dim={hidden_dim}")

    def reset(self):
        """Clear the recurrent state back to zeros."""
        self.state = np.zeros(self.hidden_dim)

    def step(self, agent_outputs: List[str]) -> Tuple[np.ndarray, str]:
        """Advance one step; return a copy of the new state and the message built from it."""
        encoded = self._encode_outputs(agent_outputs)
        self.state = self.W @ self.state + encoded
        message = self._generate_message(self.state)
        return self.state.copy(), message

    def _encode_outputs(self, outputs: List[str]) -> np.ndarray:
        """Map outputs to a unit-norm vector of per-output text statistics.

        Each output contributes (word count, char count, '.' count, '?' count).
        The concatenation is truncated or zero-padded to ``hidden_dim``.
        """
        stats = [
            value
            for text in outputs
            for value in (len(text.split()), len(text), text.count('.'), text.count('?'))
        ][:self.hidden_dim]
        vector = np.zeros(self.hidden_dim)
        vector[:len(stats)] = stats
        return vector / (np.linalg.norm(vector) + 1e-8)

    def _generate_message(self, state: np.ndarray) -> str:
        """Render every 10th state dimension with |value| > 0.1 as ``dim{i}:{value:.2f}``."""
        sampled = state[::10]
        return " ".join(
            f"dim{10 * k}:{v:.2f}" for k, v in enumerate(sampled) if abs(v) > 0.1
        )

print("✅ FullRankCoordinator class defined!")

## Coordinator Implementations

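Now we'll implement the three coordinator types: a full-rank baseline, a static low-rank coordinator with a fixed rank, and a dynamic-rank coordinator whose rank adapts at every step.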

In [None]:
class TokenTracker:
    """Track token usage for multi-agent coordination.

    Counts tokens with a tiktoken encoding. Each successful
    ``log_coordinator_step`` call adds the tokens of all agent outputs plus the
    coordinator message to ``total_tokens``. It also appends that step's count
    to ``episode_tokens``.
    """

    def __init__(self, model: str = "gpt-4"):
        """Initialize the tracker with the tiktoken encoding for ``model``.

        Falls back to the ``cl100k_base`` encoding if tiktoken cannot provide
        a model-specific one.
        """
        try:
            self.encoding = tiktoken.encoding_for_model(model)
            logger.info(f"TokenTracker initialized with model: {model}")
        except Exception as e:
            logger.warning(f"Could not load model-specific encoding, using cl100k_base: {e}")
            self.encoding = tiktoken.get_encoding("cl100k_base")

        self.total_tokens = 0
        self.episode_tokens = []  # one entry per successful log_coordinator_step() call
        self.call_count = 0

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens in ``text``.

        Special-token strings such as ``"<|endoftext|>"`` are encoded as plain
        text (``disallowed_special=()``) instead of making tiktoken raise. If
        encoding still fails, 1.3 tokens per whitespace-separated word is used
        as an estimate.
        """
        try:
            return len(self.encoding.encode(text, disallowed_special=()))
        except Exception as e:
            logger.debug(f"Error counting tokens: {e}")
            # Fallback: approximate as words * 1.3
            return int(len(text.split()) * 1.3)

    def log_coordinator_step(self, agent_outputs: List[str], coordinator_message: str = ""):
        """Count and record the tokens for one coordinator step.

        Args:
            agent_outputs: Agent responses for this step (list or tuple).
                Non-string items are logged and converted with ``str()``.
            coordinator_message: Message emitted by the coordinator; an empty
                (falsy) message adds no tokens.

        Returns:
            Tokens counted for this step. Returns 0 and records nothing when
            ``agent_outputs`` is not a list/tuple or counting fails. The problem
            is logged rather than raised, so a bad step does not abort an
            experiment run.
        """
        if not isinstance(agent_outputs, (list, tuple)):
            logger.error(f"agent_outputs must be a list or tuple, got {type(agent_outputs)}")
            return 0

        try:
            step_tokens = 0

            # Count tokens in agent outputs
            for idx, output in enumerate(agent_outputs):
                if not isinstance(output, str):
                    logger.warning(f"Agent output {idx} is not a string: {type(output)}")
                    output = str(output)
                step_tokens += self.count_tokens(output)

            # Count tokens in coordinator message
            if coordinator_message:
                step_tokens += self.count_tokens(str(coordinator_message))
        except Exception as e:
            logger.error(f"Error logging coordinator step: {e}")
            return 0

        # Update tracking
        self.total_tokens += step_tokens
        self.episode_tokens.append(step_tokens)
        self.call_count += 1

        return step_tokens

    def get_stats(self) -> Dict[str, float]:
        """Return aggregate token statistics.

        ``num_episodes`` is the number of recorded steps. The mean and std are 0
        when nothing has been recorded.
        """
        has_data = bool(self.episode_tokens)
        return {
            "total_tokens": self.total_tokens,
            "num_episodes": len(self.episode_tokens),
            "mean_tokens_per_episode": float(np.mean(self.episode_tokens)) if has_data else 0,
            "std_tokens_per_episode": float(np.std(self.episode_tokens)) if has_data else 0,
            "call_count": self.call_count
        }

print("✅ TokenTracker class defined!")

## Token Tracking System

The TokenTracker class uses tiktoken to count the tokens in agent outputs and coordinator messages at each coordination step.

In [None]:
@dataclass
class ExampleResult:
    """Single example result.

    One evaluated example together with the label predicted by each
    coordinator variant. Field meanings below are inferred from names;
    confirm against the code that builds these records.
    """
    input: str  # example input text (name shadows the builtin; kept for schema compatibility)
    output: str  # presumably the reference/ground-truth output — TODO confirm
    context: Dict[str, Any]  # per-example context dict
    dataset: str  # source dataset name — assumed
    split: str  # dataset split — assumed
    predict_baseline: str  # presumably the full-rank coordinator's label — verify
    predict_static: str  # presumably the static low-rank coordinator's label — verify
    predict_dynamic: str  # presumably the dynamic-rank coordinator's label — verify
    method: str  # method identifier/description — TODO confirm contents

@dataclass
class ExperimentResult:
    """Schema matching exp_gen_sol_out.json format."""
    examples: List[Dict[str, Any]]  # one dict per example (assumed to mirror ExampleResult fields)

def truncate_str(text: str, max_len: int = 100) -> str:
    """Shorten ``text`` for log output.

    Strings of at most ``max_len`` characters are returned unchanged. Longer
    ones keep their first ``max_len`` characters, followed by ``...`` and the
    original length, so the result itself can exceed ``max_len``.
    """
    total_chars = len(text)
    if total_chars > max_len:
        return text[:max_len] + f"... ({total_chars} chars total)"
    return text

print("✅ Data classes and helper functions defined!")

## Data Classes and Helper Functions

First, let's define the data structures and utility functions used throughout the experiment.

In [None]:
# Import required libraries
import json
import logging
import numpy as np
import tiktoken
from scipy.stats import ttest_rel
from sklearn.metrics import accuracy_score, f1_score
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass, asdict
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, HTML
import warnings
warnings.filterwarnings('ignore')

# Notebook-wide logging: root handler at INFO with a "LEVEL: message" format,
# plus a module-level logger shared by the functions and classes in this notebook.
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

print("✅ All libraries imported successfully!")

# Dynamic Rank Adaptation for Low-Rank Recurrent Coordinator

This notebook demonstrates an experiment comparing different coordination mechanisms for multi-LLM systems:

1. **Baseline**: Full-rank recurrent coordinator (a dense 256×256 recurrent weight matrix, i.e. 65,536 parameters)
2. **Static Low-Rank**: Fixed rank=32 coordinator 
3. **Dynamic Adaptive Rank**: Rank adapts based on episode complexity (8-64 range)

## Key Innovation

The dynamic rank adaptation mechanism automatically adjusts the coordinator's representational capacity at every step. The adjustment is driven by a heuristic complexity score (variance of the agent response lengths, norm of the encoded features, and variance of the coordinator state):
- Starts each episode at the minimal rank (8)
- Increases rank (by at most 8 per step) when the complexity score is high
- Decreases rank (by at most 4 per step) when the complexity score is low
- Maximum rank = 64 (still more compressed than full-rank 256)

This aims to achieve better token efficiency by using low rank for simple episodes and higher rank only when needed.