<a href="https://colab.research.google.com/github/auralmn/aura_liquidmoe_snn_llama-3.2-3B_Notebooks/blob/main/Aura_Liquid_Mixture_of_Experts_Bio_Plausible_SNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# The AURA Brain experiment #7
### 2025-11-16
### Author: Nicolas Cloutier
### Entity: Cognitiv Aura
### License: Apache 2.0
***Note: not for production use — for experimentation only. Contact the owner for production use.***


In [None]:
# --- uninstall conflicting wandb ---
!pip uninstall -y wandb

# --- 1. Install All Dependencies ---
!pip install "unsloth[colab-new]"
!pip install transformers torch sentence-transformers accelerate huggingface_hub datasets scipy scikit-learn -q

# --- 2. Import Core Libraries (for the whole notebook) ---
import uuid
import enum
from enum import Enum # Explicitly import Enum as requested
import numpy as np
import random
import json
import itertools
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import AutoTokenizer, logging
from sentence_transformers import SentenceTransformer
from huggingface_hub import login
import os
from unsloth import FastLanguageModel
from typing import List, Dict, Optional, Union, Any, Tuple, Literal
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from numpy import ndarray
import asyncio
import io
import csv
import threading # For Memory Pool
import time
from scipy.signal import hilbert # For Temporal Interpolator
from scipy.linalg import expm # For Temporal Interpolator
import glob # For finding latest keyframe
from datasets import load_dataset # For GoEmotions
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.utils.class_weight import compute_class_weight
import nest_asyncio

# Suppress transformer warnings
logging.set_verbosity_error()

# --- 3. Colab/HF Login ---
# Reads the 'HF_TOKEN' secret from Colab's userdata store and logs in to the
# Hugging Face Hub without writing the token to the git credential helper.
# Outside Colab the import fails and only the hint below is printed.
# NOTE(review): only ImportError/KeyError are handled here; if `userdata.get`
# signals a missing or inaccessible secret with a different exception type,
# it will propagate and stop the cell — confirm against the Colab API.
try:
    from google.colab import userdata
    HF_TOKEN = userdata.get('HF_TOKEN')
    login(token=HF_TOKEN, add_to_git_credential=False)
    print("Hugging Face login successful!")
except (ImportError, KeyError):
    print("HF_TOKEN secret not found or not in Colab. Ensure you are logged in.")

# --- 4. Mount Google Drive ---
# SAVE_DIR is a module-level global read later by the brain classes; it points
# into Google Drive when running in Colab and to a local folder otherwise.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    SAVE_DIR = "/content/drive/MyDrive/aura_education_v7_final" # New save dir
    print(f"Google Drive mounted. Will save/load brain from: {SAVE_DIR}")
except ImportError:
    print("Not in Colab. Saving to local directory './aura_education_v7_final'")
    SAVE_DIR = "./aura_education_v7_final"

# Create the save directory up front so later writes cannot fail on a missing path.
os.makedirs(SAVE_DIR, exist_ok=True)

Collecting unsloth[colab-new]
  Downloading unsloth-2025.11.3-py3-none-any.whl.metadata (61 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 61.8/61.8 kB 3.4 MB/s eta 0:00:00
Collecting unsloth_zoo>=2025.11.4 (from unsloth[colab-new])
  Downloading unsloth_zoo-2025.11.4-py3-none-any.whl.metadata (32 kB)
Collecting tyro (from unsloth[colab-new])
  Downloading tyro-0.9.35-py3-none-any.whl.metadata (12 kB)
Collecting xformers>=0.0.27.post2 (from unsloth[colab-new])
  Downloading xformers-0.0.33.post1-cp39-abi3-manylinux_2_28_x86_64.whl.metadata (1.2 kB)
Collecting bitsandbytes!=0.46.0,!=0.48.0,>=0.45.5 (from unsloth[colab-new])
  Downloading bitsandbytes-0.48.2-py3-none-manylinux_2_24_x86_64.whl.metadata (10 kB)
Collecting datasets!=4.0.*,!=4.1.0,<4.4.0,>=3.4.1 (from unsloth[colab-new])
  Downloading datasets-4.3.0-py3-none-any.whl.metadata (18 kB)
Collecting 

# Core Helpers and SNN Components

In [None]:
import numpy as np
from typing import List, Dict, Optional, Union, Any, Tuple, Literal
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from numpy import ndarray
import threading
from scipy.signal import hilbert
from scipy.linalg import expm

# --- 1. Helper Functions (Math) ---
def softplus(z: np.ndarray) -> np.ndarray:
    """Element-wise softplus, ``log(1 + exp(z))``, in an overflow-safe form.

    The value is assembled as ``log1p(exp(-|z|)) + max(z, 0)`` so the
    exponential is only ever taken of a non-positive number.
    """
    log_tail = np.log1p(np.exp(-np.abs(z)))
    linear_part = np.maximum(z, 0)
    return log_tail + linear_part
def tanh(z: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(z)
    return squashed
def softmax(z: np.ndarray, temp: float = 1.0) -> np.ndarray:
    """Temperature-scaled softmax over all elements of ``z``.

    Args:
        z: Logits.
        temp: Temperature; values below ``1e-8`` are clamped to ``1e-8``.

    Returns:
        Array of the same shape as ``z``. The global maximum is subtracted
        before exponentiating, and ``1e-12`` is added to the normaliser.
    """
    scaled = z / max(1e-8, temp)
    shifted = scaled - np.max(scaled)
    weights = np.exp(shifted)
    normaliser = weights.sum()
    return weights / (normaliser + 1e-12)

# --- 2. EnergyMeter ---
@dataclass
class EnergyMeter:
    """Running estimate of compute energy, accumulated from MAC counts.

    Attributes:
        e_mac_j: Cost charged per multiply-accumulate (the ``_j`` suffix
            suggests joules).
        total_j: Cost accumulated so far, in the same unit.
    """
    e_mac_j: float = 3e-12
    total_j: float = 0.0

    def add_macs(self, nmacs: int):
        """Charge ``nmacs`` multiply-accumulate operations to the total."""
        self.total_j += self.e_mac_j * float(nmacs)

    def reset(self):
        """Zero the accumulated total; the per-MAC cost is left untouched.

        ``LiquidGatingNetwork.reset`` and ``LiquidMoERouter.reset`` both call
        ``energy.reset()``; without this method they raise ``AttributeError``.
        """
        self.total_j = 0.0

# --- 3. Memory Pool (from memory_pool.py) ---
@dataclass
class PoolStats:
    """Usage counters maintained by ``ArrayPool``."""
    hits: int = 0               # requests served from a pooled array
    misses: int = 0             # requests that needed a fresh allocation
    total_allocations: int = 0  # fresh arrays created by the pool
    peak_usage_mb: float = 0.0  # largest number of MiB ever parked in the pool


class ArrayPool:
    """Thread-safe free-list of NumPy arrays keyed by ``(shape, dtype)``.

    Arrays handed back through ``return_array`` are zeroed and parked until a
    later ``get_array`` asks for the same shape and dtype. The byte budget
    limits how much memory may sit idle in the pool at any one time.
    """

    def __init__(self, max_pool_size_mb: int = 512):
        self.max_pool_size = max_pool_size_mb * 1024 * 1024
        self.pools: Dict[Tuple[tuple, np.dtype], List[np.ndarray]] = {}
        self.current_usage = 0  # bytes currently parked in the pool
        self.stats = PoolStats()
        self._lock = threading.Lock()

    @staticmethod
    def _make_key(shape, dtype) -> Tuple[tuple, np.dtype]:
        """Build the dictionary key used by both ``get_array`` and ``return_array``.

        ``np.float32`` (a type) and ``arr.dtype`` (a ``np.dtype`` instance)
        compare equal but hash differently, so the dtype is normalised to a
        ``np.dtype`` instance; otherwise returned arrays are never found again.
        """
        try:
            shape_key = tuple(shape)
        except TypeError:  # a bare integer length
            shape_key = (shape,)
        return shape_key, np.dtype(dtype)

    def get_array(self, shape: tuple, dtype: np.dtype = np.float32, zero_fill: bool = True) -> np.ndarray:
        """Return an array of the requested shape and dtype, reusing a pooled one if possible.

        Args:
            shape: Desired array shape.
            dtype: Desired element type (type object or ``np.dtype``).
            zero_fill: When true the returned array is guaranteed to be all zeros.

        Returns:
            A NumPy array; always an array, regardless of ``zero_fill``.
        """
        key = self._make_key(shape, dtype)
        with self._lock:
            bucket = self.pools.get(key)
            if bucket:
                arr = bucket.pop()
                # The array leaves the pool, so it no longer counts against the budget.
                self.current_usage -= arr.nbytes
                self.stats.hits += 1
                if zero_fill:
                    arr.fill(0)
                return arr
            self.stats.misses += 1
            self.stats.total_allocations += 1
        # Allocate outside the lock; nothing shared is touched from here on.
        if zero_fill:
            return np.zeros(key[0], dtype=key[1])
        return np.empty(key[0], dtype=key[1])

    def return_array(self, arr: np.ndarray) -> None:
        """Park ``arr`` for reuse if the pool's byte budget allows; otherwise drop it.

        ``None`` is accepted and ignored. The array is zeroed before it is stored.
        """
        if arr is None:
            return
        key = self._make_key(arr.shape, arr.dtype)
        array_size = arr.nbytes
        with self._lock:
            if self.current_usage + array_size > self.max_pool_size:
                return
            arr.fill(0)
            self.pools.setdefault(key, []).append(arr)
            self.current_usage += array_size
            usage_mb = self.current_usage / (1024 * 1024)
            if usage_mb > self.stats.peak_usage_mb:
                self.stats.peak_usage_mb = usage_mb


# Process-wide pool shared by the module-level convenience helpers below.
_global_pool = ArrayPool()


def get_pooled_array(shape: tuple, dtype: np.dtype = np.float32, zero_fill: bool = True) -> np.ndarray:
    """Fetch an array from the shared global pool (see ``ArrayPool.get_array``)."""
    return _global_pool.get_array(shape, dtype, zero_fill)


def return_pooled_array(arr: np.ndarray) -> None:
    """Hand an array back to the shared global pool (see ``ArrayPool.return_array``)."""
    _global_pool.return_array(arr)

# --- 4. Optimized Whitener ---
class OptimizedWhitener:
    """Online per-feature standardiser using exponential moving statistics.

    Each ``transform`` call first folds the sample into the running mean and
    variance and then returns ``(x - mean) / sqrt(var + eps)`` computed with
    the updated statistics. All state is kept in ``float32``.
    """

    def __init__(self, dim: int, eps: float = 1e-6, momentum: float = 0.01):
        self.dim = dim
        self.eps = np.float32(eps)
        self.momentum = np.float32(momentum)
        self.mu = np.zeros(dim, dtype=np.float32)
        self.var = np.ones(dim, dtype=np.float32)

    def transform(self, x: np.ndarray) -> np.ndarray:
        """Update the running statistics with ``x`` and return the whitened sample.

        Args:
            x: One feature vector of length ``dim``; converted to ``float32``.

        Returns:
            A freshly allocated ``float32`` array of length ``dim``.
        """
        x = np.asarray(x, dtype=np.float32)
        _temp_diff = get_pooled_array((self.dim,), dtype=np.float32)
        _temp_result = get_pooled_array((self.dim,), dtype=np.float32)
        try:
            # Mean is updated first, so the deviation below is measured
            # against the already-updated mean.
            self.mu *= (1.0 - self.momentum)
            self.mu += self.momentum * x
            np.subtract(x, self.mu, out=_temp_diff)
            np.multiply(_temp_diff, _temp_diff, out=_temp_result)
            self.var *= (1.0 - self.momentum)
            self.var += self.momentum * _temp_result
            np.sqrt(self.var + self.eps, out=_temp_result)
            np.divide(_temp_diff, _temp_result, out=_temp_result)
            # Copy out before the scratch buffers go back to the pool.
            return _temp_result.copy()
        finally:
            # Always hand the scratch buffers back, even if a shape mismatch raised.
            return_pooled_array(_temp_diff)
            return_pooled_array(_temp_result)

    def state_dict(self) -> Dict:
        """Return a snapshot of the running mean and variance.

        Copies are returned because ``transform`` updates ``mu`` and ``var``
        in place; handing out the live buffers would let a saved snapshot
        change after the fact.
        """
        return {"mu": self.mu.copy(), "var": self.var.copy()}

    def load_state_dict(self, state: Dict):
        """Restore the running mean and variance from ``state``.

        The arrays are copied and converted to ``float32`` so later in-place
        updates never modify the caller's data.
        """
        self.mu = np.array(state["mu"], dtype=np.float32)
        self.var = np.array(state["var"], dtype=np.float32)

# --- 5. Hebbian Layer (Oja) ---
@dataclass
class OjaStepOut:
    """Outcome of a single ``OjaLayer.step`` call."""
    y: np.ndarray        # component activations computed before the weight update
    residual_ema: float  # smoothed fraction of input energy left unexplained
    grew: bool           # whether a new component was appended on this step


class OjaLayer:
    """Hebbian feature layer with unit-norm rows and on-demand growth.

    ``W`` holds one component per row. Each step projects the input onto the
    rows, applies either a cubic ("nonlinear") or a sequential-deflation
    update with a lateral decorrelation term, re-normalises the rows, and may
    append a new component when the smoothed reconstruction residual stays
    above ``grow_threshold``.
    """

    def __init__(self, dim: int, n_components: int = 8, lr: float = 5e-4, mode: str = "nonlinear",
                 *, max_components: int = 64, lateral_beta: float = 0.05,
                 grow_threshold: float = 0.35, ema: float = 0.01, grow_cooldown: int = 100,
                 seed: Optional[int] = 1337):
        self.rng = np.random.default_rng(seed)
        self.dim = int(dim)
        self.lr = float(lr)
        self.mode = str(mode)
        self.max_components = int(max_components)
        self.beta = float(lateral_beta)
        self.grow_threshold = float(grow_threshold)
        self.ema = float(ema)
        self.grow_cooldown = int(grow_cooldown)
        self.cooldown = 0

        # Random rows scaled to unit length.
        initial = self.rng.normal(0, 0.1, (n_components, self.dim))
        row_norms = np.linalg.norm(initial, axis=1) + 1e-12
        self.W = (initial.T / row_norms).T

        self.K = self.W.shape[0]
        self.residual_ema = 0.0
        self._steps = 0
        print(f"CREATED: OjaLayer (Hebbian Cortex) with {self.K} components, mode='{self.mode}'")

    def step(self, xw: np.ndarray) -> OjaStepOut:
        """Run one learning step on the (whitened) input ``xw``.

        Returns:
            ``OjaStepOut`` carrying the pre-update activations, the updated
            residual moving average, and whether the layer grew.
        """
        x = np.asarray(xw, dtype=np.float64)
        y = self.W @ x
        x_hat = self.W.T @ y

        # Share of the input's squared norm that the reconstruction misses.
        xn = float(np.dot(x, x) + 1e-12)
        explained = float(np.dot(x_hat, x_hat) / xn)
        residual = float(1.0 - explained)

        if self.mode == "nonlinear":
            cubed = y ** 3
            dW = (cubed[:, None] * x[None, :]) - self.W
        else:
            # Each row learns from what the preceding rows (and itself) leave over.
            dW = np.zeros_like(self.W)
            running = np.zeros_like(x)
            for idx, (coef, row) in enumerate(zip(y, self.W)):
                running = running + coef * row
                dW[idx] = coef * (x - running)

        # Lateral term: penalise each row by the activity of all the others.
        mixed = y @ self.W
        others = mixed[None, :] - (y[:, None] * self.W)
        dW -= self.beta * (y[:, None] * others)

        self.W += self.lr * dW
        self._renorm_rows()

        self.residual_ema = (1.0 - self.ema) * self.residual_ema + self.ema * residual
        grew, _ = self._maybe_grow(x, x_hat)
        self._steps += 1
        return OjaStepOut(y=y, residual_ema=self.residual_ema, grew=grew)

    def _renorm_rows(self):
        """Rescale every row of ``W`` to unit length (rebinding ``self.W``)."""
        row_norms = np.linalg.norm(self.W, axis=1) + 1e-12
        self.W = (self.W.T / row_norms).T

    def _maybe_grow(self, x: np.ndarray, x_hat: np.ndarray) -> Tuple[bool, Optional[int]]:
        """Append a component along the current residual when growth is due.

        Returns:
            ``(True, index_of_new_row)`` if a row was added, else ``(False, None)``.
        """
        if self.cooldown > 0:
            self.cooldown -= 1
            return False, None
        if self.K >= self.max_components:
            return False, None
        if self.residual_ema < self.grow_threshold:
            return False, None

        leftover = x - x_hat
        leftover_norm = float(np.linalg.norm(leftover))
        if leftover_norm > 1e-9:
            w_new = leftover / leftover_norm
        else:
            # Residual direction is degenerate; fall back to a random unit vector.
            w_new = self.rng.normal(0, 0.1, size=self.dim)
            w_new /= (np.linalg.norm(w_new) + 1e-12)

        self.W = np.vstack([self.W, w_new])
        self.K += 1
        self.cooldown = self.grow_cooldown
        print(f"üß¨ OJA NEUROGENESIS: Residual high. Growing new component. K={self.K}")
        return True, self.K - 1

    def state_dict(self) -> Dict:
        """Return the weights, residual average and step counter."""
        return {"W": self.W, "residual_ema": self.residual_ema, "_steps": self._steps}

    def load_state_dict(self, state: Dict):
        """Restore weights and counters; ``K`` is re-derived from the loaded ``W``."""
        weights = state["W"]
        self.W = weights
        self.K = weights.shape[0]
        self.residual_ema = state["residual_ema"]
        self._steps = state["_steps"]

# --- 6. LiquidCell (from snn_nlms_moe.py) ---
@dataclass
class LiquidCell:
    """Recurrent cell whose per-unit time constant depends on the input.

    The hidden state follows ``dh = -h / tau + tanh(W h + U x + b)`` and is
    advanced with one explicit step of size ``dt``. ``tau`` is
    ``tau_min + softplus(V x + c)``, capped at ``tau_max``.
    """
    in_dim: int
    hidden_dim: int
    dt: float = 0.02
    tau_min: float = 0.02
    tau_max: float = 2.0
    rng: np.random.Generator = field(default_factory=lambda: np.random.default_rng(1337))
    W: np.ndarray = field(init=False)  # recurrent weights, (hidden_dim, hidden_dim)
    U: np.ndarray = field(init=False)  # input weights, (hidden_dim, in_dim)
    b: np.ndarray = field(init=False)  # drive bias, (hidden_dim,)
    V: np.ndarray = field(init=False)  # time-constant weights, (hidden_dim, in_dim)
    c: np.ndarray = field(init=False)  # time-constant bias, (hidden_dim,)
    h: np.ndarray = field(init=False)  # hidden state, (hidden_dim,)

    def __post_init__(self):
        n_hidden, n_in = self.hidden_dim, self.in_dim
        recurrent_std = np.sqrt(2.0 / (n_hidden + n_hidden))
        input_std = np.sqrt(2.0 / (n_in + n_hidden))
        # Draw order (W, U, V, c) is fixed so a given generator state
        # always yields the same weights.
        self.W = self.rng.normal(0, recurrent_std, (n_hidden, n_hidden))
        self.U = self.rng.normal(0, input_std, (n_hidden, n_in))
        self.b = np.zeros((n_hidden,), dtype=np.float64)
        self.V = self.rng.normal(0, input_std, (n_hidden, n_in))
        self.c = self.rng.normal(0, 0.1, (n_hidden,))
        self.h = np.zeros((n_hidden,), dtype=np.float64)

    def reset(self):
        """Zero the hidden state in place."""
        self.h[:] = 0.0

    def step(self, x: np.ndarray, energy: Optional[EnergyMeter] = None) -> np.ndarray:
        """Advance the hidden state by one ``dt`` step for input ``x``.

        Args:
            x: Input vector; flattened to 1-D ``float64``.
            energy: Optional meter charged with the ``W h`` and ``U x``
                multiply-accumulates.

        Returns:
            A copy of the updated hidden state.
        """
        x = np.asarray(x, dtype=np.float64).reshape(-1)

        tau_pre = self.V @ x + self.c
        tau = self.tau_min + softplus(tau_pre)
        tau = np.minimum(tau, self.tau_max)

        recurrent_in = self.W @ self.h
        external_in = self.U @ x
        drive = tanh(recurrent_in + external_in + self.b)
        dh = - self.h / np.maximum(tau, 1e-6) + drive
        self.h = self.h + self.dt * dh

        if energy is not None:
            energy.add_macs((self.hidden_dim*self.hidden_dim) + (self.hidden_dim*self.in_dim))
        return self.h.copy()

    def state_dict(self) -> Dict:
        """Return all weight arrays and the current hidden state."""
        return {"W": self.W, "U": self.U, "b": self.b, "V": self.V, "c": self.c, "h": self.h}

    def load_state_dict(self, state: Dict):
        """Restore weights and hidden state from ``state``."""
        self.W = state["W"]
        self.U = state["U"]
        self.b = state["b"]
        self.V = state["V"]
        self.c = state["c"]
        self.h = state["h"]

# --- 7. TemporalInterpolator ---
class TemporalMemoryInterpolator:
    """Blend two memory snapshots at a fractional position between them."""

    def __init__(self, epsilon: float = 1e-12):
        # Added to the Hamiltonian normaliser to avoid dividing by zero.
        self.epsilon = epsilon
        print("CREATED: TemporalMemoryInterpolator (Aura 7.0 Hippocampus)")

    def interpolate(self, M0: np.ndarray, M1: np.ndarray, t: float,
                    mode: Literal['linear', 'fourier', 'hilbert', 'hamiltonian'] = 'hilbert'
                   ) -> np.ndarray:
        """Interpolate between ``M0`` (t=0) and ``M1`` (t=1).

        Args:
            M0: Starting snapshot.
            M1: Ending snapshot.
            t: Position between the snapshots; clipped to ``[0, 1]``.
            mode: ``'linear'`` blends the arrays directly, ``'fourier'`` blends
                their FFTs, ``'hilbert'`` blends their analytic signals (taken
                along axis 0), and ``'hamiltonian'`` applies a unitary built
                from the analytic-signal difference to ``M0``'s analytic signal.

        Returns:
            The real-valued interpolated array.

        Raises:
            ValueError: If ``mode`` is not one of the four supported names.
        """
        alpha = np.clip(t, 0.0, 1.0)
        keep = 1.0 - alpha

        if mode == 'linear':
            return keep * M0 + alpha * M1

        if mode == 'fourier':
            spectrum0 = np.fft.fft(M0)
            spectrum1 = np.fft.fft(M1)
            blended = keep * spectrum0 + alpha * spectrum1
            return np.real(np.fft.ifft(blended))

        # Both remaining modes work on the analytic signals.
        A0 = hilbert(M0, axis=0)
        A1 = hilbert(M1, axis=0)

        if mode == 'hilbert':
            return np.real(keep * A0 + alpha * A1)

        if mode == 'hamiltonian':
            print("WARNING: Hamiltonian mode is computationally expensive.")
            A_diff = (A1 - A0).astype(np.complex128)
            H_num = np.outer(A_diff, A_diff.T.conj())
            H_den = np.linalg.norm(A_diff)**2 + self.epsilon
            H = H_num / H_den
            U = expm(-1j * H * alpha)
            return np.real(U @ A0)

        raise ValueError(f"Unknown interpolation mode: {mode}")

# Cell-completion marker; the leading emoji was mis-encoded (UTF-8 bytes shown as MacRoman).
print("✅ Cell 2: Core Brain Components & Helpers defined.")

# Ears and Mouth (I/O Components)

In [None]:
import numpy as np
import torch
import asyncio
from transformers import AutoTokenizer, logging
from sentence_transformers import SentenceTransformer
from unsloth import FastLanguageModel
from typing import List, Dict, Optional, Union, Any
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from numpy import ndarray
import os

# --- 3. GoEmotions Labels (The Curriculum) ---
# Emotion label names; a label's position in this list is its class index.
GOEMOTIONS_LABELS = [
    'admiration', 'amusement', 'anger', 'annoyance',
    'approval', 'caring', 'confusion', 'curiosity',
    'desire', 'disappointment', 'disapproval', 'disgust',
    'embarrassment', 'excitement', 'fear', 'gratitude',
    'grief', 'joy', 'love', 'nervousness',
    'optimism', 'pride', 'realization', 'relief',
    'remorse', 'sadness', 'surprise', 'neutral',
]
# Reverse lookup: label name -> class index.
GOEMOTIONS_MAP = dict(zip(GOEMOTIONS_LABELS, range(len(GOEMOTIONS_LABELS))))

# --- 4. "Ears": FeatureGenerator (GoEmotions-Aware) ---
class FeatureGenerator:
    """Turns text into the brain's input vector (based on clean_amygdala_trainer.py).

    A feature vector is the concatenation of a 32-sample sine pattern chosen
    by the record's primary label, three surface features of the text, and
    the sentence embedding, in that order.
    """

    def __init__(self, sbert_model_name: str = "all-MiniLM-L6-v2"):
        self.SBERT_DIM = 384
        self.SINE_LENGTH = 32
        self.EXTRA_FEATURES = 3
        self.TOTAL_FEATURES = self.SBERT_DIM + self.SINE_LENGTH + self.EXTRA_FEATURES # 419
        print(f"CREATED: FeatureGenerator (Aura 7.0 Ears), Features: {self.TOTAL_FEATURES}")

        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.sbert_model = SentenceTransformer(sbert_model_name, device=device)
        self.tokenizer = self.sbert_model.tokenizer
        self.vocab_size = self.tokenizer.vocab_size
        self.label_params = self._generate_default_params(GOEMOTIONS_LABELS)
        self.whitener = OptimizedWhitener(dim=self.TOTAL_FEATURES)

    def _generate_default_params(self, labels: List[str]) -> Dict[str, Dict]:
        """Assign each label its own sine frequency and phase (fixed amplitude 0.7)."""
        return {
            label: {"freq": 1.5 + 0.1 * idx, "amp": 0.7, "phase": 0.5 + 0.2 * idx}
            for idx, label in enumerate(labels)
        }

    def build_features(self, record: Dict[str, Any], sbert_vec: np.ndarray) -> np.ndarray:
        """Assemble the raw (un-whitened) feature vector for one record.

        Args:
            record: Mapping that may hold ``"text"`` and
                ``"plutchik" -> "primary"``; missing entries fall back to
                ``""`` and ``"neutral"``.
            sbert_vec: Sentence embedding appended after the other features.

        Returns:
            ``float32`` array laid out as ``[sine pattern, extras, sbert_vec]``.
        """
        primary = record.get("plutchik", {}).get("primary", "neutral")
        # Labels without parameters get zero amplitude, i.e. a flat sine block.
        cfg = self.label_params.get(primary, {"freq": 1.0, "amp": 0.0, "phase": 0.0})
        t = np.linspace(0, 2*np.pi, self.SINE_LENGTH, dtype=np.float32)
        sine_block = (cfg["amp"] * np.sin(cfg["freq"] * t + cfg["phase"])).astype(np.float32)

        text = record.get("text", "")
        # Scaled length plus flags for exclamation and question marks.
        surface = np.array(
            [len(text) / 100.0, int("!" in text), int("?" in text)],
            dtype=np.float32,
        )
        return np.concatenate([sine_block, surface, sbert_vec]).astype(np.float32)

    def generate_for_query(self, query: str) -> (np.ndarray, List[int]):
        """Return ``(whitened_features, token_ids)`` for a query string.

        The query is treated as ``"neutral"``. Calling this also updates the
        whitener's running statistics.
        """
        record = {"text": query, "plutchik": {"primary": "neutral"}}
        embedding = self.sbert_model.encode(query, normalize_embeddings=True)
        raw = self.build_features(record, embedding)
        whitened = self.whitener.transform(raw)
        token_ids = self.tokenizer.encode(query, add_special_tokens=False)
        return whitened, token_ids

    def state_dict(self) -> Dict:
        """Return the whitener's state (the only persisted part)."""
        return self.whitener.state_dict()

    def load_state_dict(self, state: Dict):
        """Restore the whitener's state."""
        self.whitener.load_state_dict(state)

# --- 5. "Mouth": ResponseGenerator (Unsloth, Async) ---
class ResponseGenerator:
    """Language "mouth": produces a reply from a 4-bit Llama 3.2 model loaded via Unsloth."""

    def __init__(self, model_name: str = "meta-llama/Llama-3.2-3B-Instruct"):
        print(f"CREATED: ResponseGenerator (Mouth) using Unsloth on '{model_name}'")
        max_seq_length = 2048
        dtype = None
        load_in_4bit = True
        # NOTE: `model_name` is only echoed above; the checkpoint that is
        # actually loaded is the fixed Unsloth 4-bit build below.
        unsloth_model_name = "unsloth/Llama-3.2-3B-Instruct-bnb-4bit"
        print(f"   -> Using optimized Unsloth 4-bit model: {unsloth_model_name}")
        self.model, self.tokenizer = FastLanguageModel.from_pretrained(
            model_name = unsloth_model_name, max_seq_length = max_seq_length,
            dtype = dtype, load_in_4bit = load_in_4bit,
        )
        # Padding is requested at generation time, so a pad token must exist.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        print("   -> Unsloth Llama 3.2 model loaded successfully.")

    @staticmethod
    def _cns_state_name(brain_state: dict) -> str:
        """Return the consciousness-state name stored in ``brain_state``.

        Accepts an enum member (anything with a ``.name``) or a plain string,
        and falls back to ``'ALERT'`` when the key is absent. The previous
        code called ``.name`` on the string default and raised
        ``AttributeError`` for a missing key.
        """
        state = brain_state.get('cns_state', 'ALERT')
        return str(getattr(state, "name", state))

    def _build_prompt(self, user_query: str, brain_state: dict) -> str:
        """Render the chat-template prompt, with a persona reflecting the brain state."""
        cns_state = self._cns_state_name(brain_state)
        stress = brain_state.get('stress_level', 0.0)
        persona = f"You are Aura, a bio-neural AI. Your current internal state is {cns_state}."
        if cns_state == 'HYPERVIGILANT' or stress > 1.0:
            persona += f" You are feeling a high-stress level ({stress:.2f})."
        else:
            persona += " You are calm and helpful."
        messages = [{"role": "system", "content": persona}, {"role": "user", "content": user_query}]
        return self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    async def generate_response(self, user_query: str, brain_state: dict) -> str:
        """Generate a reply to ``user_query`` conditioned on ``brain_state``.

        Generation runs in a worker thread so the event loop is not blocked.

        Returns:
            The decoded completion, without the prompt or special tokens.
        """
        print(f"\n--- 👄 ResponseGenerator (Unsloth) ---")
        print(f"Generating response. Brain state: {brain_state}")
        prompt = self._build_prompt(user_query, brain_state)
        # Follow the model's own device instead of assuming "cuda".
        device = getattr(self.model, "device", "cuda")
        inputs = self.tokenizer(
            [prompt], return_tensors="pt", padding=True, truncation=True, max_length=1024
        ).to(device)
        terminators = [self.tokenizer.eos_token_id, self.tokenizer.convert_tokens_to_ids("<|eot_id|>")]
        outputs = await asyncio.to_thread(
            self.model.generate, **inputs, max_new_tokens=150, eos_token_id=terminators,
            do_sample=True, temperature=0.7, top_p=0.9,
        )
        # Drop the prompt tokens so only the newly generated text is decoded.
        prompt_len = inputs['input_ids'].shape[-1]
        response_text = self.tokenizer.batch_decode(outputs[:, prompt_len:], skip_special_tokens=True)[0]
        print(f"Llama 3.2 (Unsloth) Output: {response_text}")
        return response_text

# Cell-completion marker; the leading emoji was mis-encoded (UTF-8 bytes shown as MacRoman).
print("✅ Cell 3: 'Ears' (FeatureGenerator) and 'Mouth' (ResponseGenerator) defined.")

# Experts & Cortex (Learning & Routing)

In [None]:
import numpy as np
import torch
import asyncio
from typing import List, Dict, Optional, Union, Any
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from numpy import ndarray
import os

# --- 8. "Expert Neuron": ExpertNLMSHead ---
class ExpertNLMSHead:
    """Linear expert trained online with a normalised-LMS rule.

    The step size is scaled by the mean attention gain of the tokens in the
    current sequence, as reported by the expert's ``SpikingAttention``.
    """

    def __init__(self, n_features: int, vocab_size: int,
                 attention_config: Dict, mu: float = 0.1, epsilon: float = 1e-6):
        self.n_features = n_features
        self.w = np.zeros(self.n_features, dtype=np.float64)
        self.mu = mu
        self.epsilon = epsilon
        self.vocab_size = vocab_size
        self.spiking_attention = SpikingAttention(**attention_config)
        self.last_error = 0.0
        self.last_output = 0.0
        self._lock = asyncio.Lock()  # serialises concurrent update() calls

    def predict(self, x: np.ndarray) -> float:
        """Return the scalar prediction ``w . x``."""
        activation = np.dot(self.w, x)
        return float(activation)

    async def update(self, x: np.ndarray, y_true: float, token_ids: List[int]) -> float:
        """Apply one NLMS update towards ``y_true``.

        Args:
            x: Feature vector.
            y_true: Target value.
            token_ids: Token ids of the current sequence, used to look up
                attention gains; ids outside ``[0, vocab_size)`` are ignored.

        Returns:
            The prediction made before the weights were changed.
        """
        async with self._lock:
            y_hat = self.predict(x)
            self.last_output = y_hat
            error = y_true - y_hat
            self.last_error = error

            attention_gains = self.spiking_attention.compute_gains(token_ids, self.vocab_size)
            avg_gain = 1.0
            if attention_gains is not None and token_ids:
                in_vocab = [
                    attention_gains[token]
                    for token in token_ids
                    if 0 <= token < self.vocab_size
                ]
                if in_vocab:
                    avg_gain = np.mean(in_vocab)

            modulated_mu = self.mu * avg_gain
            # Normalise by input energy; epsilon guards against a zero vector.
            x_norm_sq = np.dot(x, x) + self.epsilon
            self.w += (modulated_mu * error * x) / x_norm_sq
            return y_hat

    def state_dict(self) -> Dict:
        """Return the weight vector."""
        return {"w": self.w}

    def load_state_dict(self, state: Dict):
        """Replace the weight vector with the one stored in ``state``."""
        self.w = state["w"]

# --- 9. Liquid/MoE Components ---
@dataclass
class LiquidGatingNetwork:
    """Top-k gating over experts, driven by a ``LiquidCell`` hidden state.

    Gate logits are a linear read-out of the cell state, shifted towards
    under-used experts, then passed through a temperature softmax. Only the
    ``top_k`` most probable experts keep a gate, plus a small exploration
    share given to the least-used expert.
    """
    in_dim: int
    hidden_dim: int
    n_experts: int
    top_k: int = 2
    temperature: float = 1.0
    usage_smoothing: float = 0.99
    bias_lr: float = 0.01
    usage_beta: float = 0.5
    cell: LiquidCell = field(init=False)
    Wg: np.ndarray = field(init=False)        # read-out weights, (n_experts, hidden_dim)
    bg: np.ndarray = field(init=False)        # read-out bias, (n_experts,)
    usage_ma: np.ndarray = field(init=False)  # moving average of gate mass per expert
    rng: np.random.Generator = field(default_factory=lambda: np.random.default_rng(2025))
    energy: EnergyMeter = field(default_factory=EnergyMeter)

    def __post_init__(self):
        # The cell draws its weights from the shared generator first, then Wg.
        self.cell = LiquidCell(self.in_dim, self.hidden_dim, rng=self.rng)
        readout_std = np.sqrt(2.0/(self.hidden_dim+self.n_experts))
        self.Wg = self.rng.normal(0, readout_std, (self.n_experts, self.hidden_dim))
        self.bg = np.zeros((self.n_experts,), dtype=np.float64)
        self.usage_ma = np.zeros((self.n_experts,), dtype=np.float64)
        self._lock = asyncio.Lock()

    async def forward(self, x: np.ndarray, attn_gain: float = 1.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Advance the cell with ``x`` and compute sparse gates.

        Args:
            x: Input vector for the liquid cell.
            attn_gain: Divides the temperature, so a higher gain sharpens
                the distribution (temperature is floored at 0.2).

        Returns:
            ``(gates, topk_idx, probs)``: the dense gate vector, the indices
            of the selected experts, and the full softmax distribution.
        """
        async with self._lock:
            hidden = self.cell.step(x, self.energy)
            logits = (self.Wg @ hidden) + self.bg
            logits = self._apply_usage_bias(logits)

            temp = max(0.2, self.temperature / max(1e-6, attn_gain))
            probs = softmax(logits, temp=temp)

            k = max(1, min(self.top_k, self.n_experts))
            topk_idx = np.argpartition(probs, -k)[-k:]
            topk_probs = probs[topk_idx]
            topk_mass = topk_probs.sum()
            if topk_mass <= 1e-12:
                # Degenerate distribution: share the gate evenly.
                gates = np.ones_like(topk_probs) / len(topk_probs)
            else:
                gates = topk_probs / topk_mass

            out = np.zeros_like(probs)
            out[topk_idx] = gates

            # Give a fixed slice of gate mass to the least-used expert.
            eps = 0.01
            if self.n_experts > 0 and eps > 0:
                least_used = int(np.argmin(self.usage_ma))
                out = (1.0 - eps) * out
                out[least_used] += eps

            decay = self.usage_smoothing
            self.usage_ma = decay * self.usage_ma + (1.0 - decay) * out
            return out, topk_idx, probs

    def _apply_usage_bias(self, logits: np.ndarray) -> np.ndarray:
        """Raise the logits of experts used less than the uniform share."""
        eps = 1e-6
        target = 1.0 / self.n_experts
        inv_usage = target / (self.usage_ma + eps)
        return logits + self.usage_beta * np.log(inv_usage)

    async def apply_endocrine(self, *, cortisol: float = 0.0, **kwargs) -> None:
        """Set the temperature from ``cortisol``; other keyword arguments are ignored."""
        async with self._lock:
            raw_temperature = 1.0 * (1.0 + 0.30 * cortisol)
            self.temperature = float(np.clip(raw_temperature, 0.5, 2.5))

    async def nudge_for_load_balance(self) -> None:
        """Move the gate bias towards a uniform expert usage."""
        async with self._lock:
            if self.n_experts <= 0:
                return
            target = 1.0 / float(self.n_experts)
            delta = target - self.usage_ma
            self.bg += self.bias_lr * delta

    def reset(self):
        """Clear the cell state, the usage average, and the energy meter."""
        self.cell.reset()
        self.usage_ma[:] = 0.0
        self.energy.reset()

    def state_dict(self) -> Dict:
        """Return the cell state plus the gating parameters and usage average."""
        return {
            "cell": self.cell.state_dict(),
            "Wg": self.Wg,
            "bg": self.bg,
            "usage_ma": self.usage_ma,
        }

    def load_state_dict(self, state: Dict):
        """Restore the cell and the gating parameters from ``state``."""
        self.cell.load_state_dict(state["cell"])
        self.Wg = state["Wg"]
        self.bg = state["bg"]
        self.usage_ma = state["usage_ma"]

class NLMSExpertAdapter:
    """Gives an ``ExpertNLMSHead`` the expert interface used by the router.

    Every method forwards directly to the wrapped head.
    """

    def __init__(self, neuron: ExpertNLMSHead):
        self.neuron = neuron

    def predict(self, x: np.ndarray) -> float:
        """Return the wrapped head's prediction for ``x``."""
        head = self.neuron
        return head.predict(x)

    async def update(self, x: np.ndarray, y_true: float, token_ids: List[int]) -> float:
        """Run one learning update on the wrapped head and return its result."""
        head = self.neuron
        return await head.update(x, y_true, token_ids)

    def state_dict(self) -> Dict:
        """Return the wrapped head's state."""
        head = self.neuron
        return head.state_dict()

    def load_state_dict(self, state: Dict):
        """Load ``state`` into the wrapped head."""
        head = self.neuron
        head.load_state_dict(state)

@dataclass
class LiquidMoERouter:
    """Mixture-of-experts router: liquid gating over named NLMS experts.

    Attributes:
        experts: Expert adapters keyed by name; the key order fixes the
            expert indices used by the gating network.
        in_dim: Length of the input feature vector.
        hidden_dim: Hidden size of the gating network's liquid cell.
        top_k: Number of experts selected per input.
        temperature: Initial softmax temperature of the gate.
    """
    experts: Dict[str, NLMSExpertAdapter]
    in_dim: int
    hidden_dim: int
    top_k: int = 2
    temperature: float = 1.0
    gating: LiquidGatingNetwork = field(init=False)
    names: List[str] = field(init=False)
    energy: EnergyMeter = field(default_factory=EnergyMeter)

    def __post_init__(self):
        self.names = list(self.experts.keys())
        self.gating = LiquidGatingNetwork(
            in_dim=self.in_dim, hidden_dim=self.hidden_dim,
            n_experts=len(self.names), top_k=self.top_k,
            temperature=self.temperature,
        )

    async def route(self, x: np.ndarray, attn_gain: float = 1.0) -> Dict[str, Any]:
        """Gate the input and mix the selected experts' predictions.

        Returns:
            Dict with ``'y_hat'`` (gate-weighted prediction), ``'topk'``
            (``(name, gate)`` pairs), ``'probs'`` (full gate distribution),
            ``'per_expert'`` (gate and prediction per selected expert) and
            ``'energy_j'`` (router plus gating energy so far).
        """
        gates_sparse, topk_idx, probs = await self.gating.forward(x, attn_gain=attn_gain)
        chosen = [(self.names[i], float(gates_sparse[i])) for i in topk_idx]
        y = 0.0
        per_expert: Dict[str, Dict[str, float]] = {}
        for i in topk_idx:
            name = self.names[int(i)]
            gate = float(gates_sparse[i])
            pred = float(self.experts[name].predict(x))
            y += gate * pred
            # One dot product of length in_dim per consulted expert.
            self.energy.add_macs(self.in_dim)
            per_expert[name] = {"gate": gate, "pred": pred}
        return {'y_hat': float(y), 'topk': chosen, 'probs': probs, 'per_expert': per_expert,
            'energy_j': self.energy.total_j + self.gating.energy.total_j}

    async def learn(self, x: np.ndarray, token_ids: List[int], y_true: float, attn_gain: float = 1.0) -> Dict[str, Any]:
        """Route ``x``, then train every selected expert towards ``y_true``.

        Experts whose gate is not positive are skipped. Updates run
        concurrently, after which the gate bias is nudged towards balanced
        usage.

        Returns:
            The routing result computed before the experts were updated.
        """
        out = await self.route(x, attn_gain=attn_gain)
        target = float(y_true)  # same target for every expert, so convert once
        tasks = [
            self.experts[name].update(x, target, token_ids)
            for name, info in out['per_expert'].items()
            if float(info['gate']) > 0.0
        ]
        await asyncio.gather(*tasks)
        await self.gating.nudge_for_load_balance()
        return out

    def reset(self):
        """Reset the gating network and the router's own energy meter."""
        self.gating.reset()
        self.energy.reset()

    def state_dict(self) -> Dict:
        """Return the gating state and every expert's state, keyed by expert name."""
        expert_states = {name: expert.state_dict() for name, expert in self.experts.items()}
        return {"gating": self.gating.state_dict(), "experts": expert_states}

    def load_state_dict(self, state: Dict):
        """Restore gating and expert states; saved experts not present here are ignored."""
        self.gating.load_state_dict(state["gating"])
        for name, expert_state in state["experts"].items():
            if name in self.experts:
                self.experts[name].load_state_dict(expert_state)

# Cell-completion marker; the leading emoji was mis-encoded (UTF-8 bytes shown as MacRoman).
print("✅ Cell 4: 'Expert' (NLMSHead) and 'Cortex' (LiquidMoERouter) defined.")

# School, Central Nervous System (CNS) & Thalamus (Control)

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import asyncio
from typing import List, Dict, Optional, Union, Any
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from numpy import ndarray
import os
from enum import Enum # <-- User requested import
from sklearn.utils.class_weight import compute_class_weight

# --- 11. CNS & ThalamicRouter (Trained) ---
class ConsciousnessLevel(Enum):
    """Arousal states of the brain, ordered from least to most alert."""
    DEEP_SLEEP = 0
    ASLEEP = 1
    ALERT = 2
    FOCUSED = 3
    HYPERVIGILANT = 4


class CentralNervousSystem:
    """Tracks the global arousal state and a smoothed stress level."""

    def __init__(self):
        self.consciousness_level = ConsciousnessLevel.ALERT
        self.stress_level = 0.0 # 'cortisol'
        print("CREATED: CentralNervousSystem (CNS)")

    def set_consciousness(self, level: ConsciousnessLevel):
        """Switch to ``level``, logging only when the state actually changes."""
        if self.consciousness_level == level:
            return
        self.consciousness_level = level
        print(f"CNS: Consciousness set to {level.name}")

    def update_stress(self, error: float):
        """Fold a prediction error into the stress level and update arousal.

        Stress is the equal-weight average of the previous level and
        ``1.5 * |error|``, minus a 0.1 recovery term, floored at zero. Above
        1.0 the state becomes HYPERVIGILANT; otherwise it is set to ALERT.
        """
        new_stress = abs(error) * 1.5
        blended = (self.stress_level * 0.5) + (new_stress * 0.5)
        self.stress_level = max(0.0, blended - 0.1)
        if self.stress_level > 1.0:
            next_level = ConsciousnessLevel.HYPERVIGILANT
        else:
            next_level = ConsciousnessLevel.ALERT
        self.set_consciousness(next_level)

class TrainedThalamicRouter:
    """The 'School' - runs the offline-trained PyTorch model.

    Loads a ``MultiTaskHead`` checkpoint and uses it to predict emotion and
    intent class indices for a query. If the checkpoint cannot be loaded,
    ``self.model`` stays ``None`` and predictions fall back to zeros.
    """

    def __init__(self, model_path: str, label_maps: Dict, feature_gen: FeatureGenerator):
        self.label_maps = label_maps
        self.emotion_map = label_maps.get('emotion', {})
        self.intent_map = label_maps.get('intent', {})
        # Index -> label name, used only for the log line in get_target_signals.
        self.emotion_labels_inv = {v: k for k, v in self.emotion_map.items()}
        self.feature_gen = feature_gen
        self.model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        try:
            model = MultiTaskHead(
                feature_gen.TOTAL_FEATURES,
                len(self.emotion_map), len(self.intent_map), len(self.emotion_map) # Mock tone
            ).to(self.device)
            # map_location lets a checkpoint saved on GPU load on a CPU-only host.
            model.load_state_dict(torch.load(model_path, map_location=self.device))
            model.eval()
        except Exception as e:
            # Deliberate best effort: report and leave self.model as None, so an
            # untrained network is never used for predictions.
            print(f"--- ❌ ERROR: Failed to load ThalamicRouter model. {e} ---")
        else:
            self.model = model
            print(f"CREATED: TrainedThalamicRouter (Loaded from {model_path})")

    def get_target_signals(self, query: str) -> dict:
        """Predict emotion and intent class indices for ``query``.

        Returns:
            ``{'emotion': float(index), 'intent': float(index)}``; both are
            ``0.0`` when no model was loaded.
        """
        if self.model is None:
            return {'emotion': 0.0, 'intent': 0.0}
        record = {"text": query, "plutchik": {"primary": "neutral"}}
        sbert_vec = self.feature_gen.sbert_model.encode(query, normalize_embeddings=True)
        # Raw features are used here; the whitener is not applied.
        x_raw = self.feature_gen.build_features(record, sbert_vec)
        x_tensor = torch.tensor(x_raw, dtype=torch.float32).unsqueeze(0).to(self.device)
        with torch.no_grad():
            emo_logits, intent_logits, _ = self.model(x_tensor)
            emo_pred_index = emo_logits.argmax(dim=1).item()
            intent_pred_index = intent_logits.argmax(dim=1).item()
        pred_label = self.emotion_labels_inv.get(emo_pred_index, 'neutral')
        signals = {'emotion': float(emo_pred_index), 'intent': float(intent_pred_index)}
        print(f"Thalamus Predicted: '{pred_label}' (Index: {emo_pred_index})")
        return signals

# --- 12. PyTorch Models (for the "School") ---
class LinearTorchModel(nn.Module):
    """Single linear layer mapping input features to class logits."""

    def __init__(self, input_dim: int, num_classes: int):
        super().__init__()
        self.fc = nn.Linear(input_dim, num_classes)

    def forward(self, x):
        """Return the logits for ``x``."""
        logits = self.fc(x)
        return logits

class MultiTaskHead(nn.Module):
    """Shared 256-unit trunk feeding separate emotion, intent and tone heads."""

    def __init__(self, input_dim: int, num_emotions: int, num_intents: int, num_tones: int):
        super().__init__()
        hidden_units = 256
        self.shared = nn.Sequential(
            nn.Linear(input_dim, hidden_units),
            nn.ReLU(),
            nn.Dropout(0.1),
        )
        self.emotion_head = nn.Linear(hidden_units, num_emotions)
        self.intent_head = nn.Linear(hidden_units, num_intents)
        self.tone_head = nn.Linear(hidden_units, num_tones)

    def forward(self, x):
        """Return ``(emotion_logits, intent_logits, tone_logits)`` for ``x``."""
        trunk = self.shared(x)
        emotion_logits = self.emotion_head(trunk)
        intent_logits = self.intent_head(trunk)
        tone_logits = self.tone_head(trunk)
        return emotion_logits, intent_logits, tone_logits

def multitask_loss(emotion_out, emotion_y, intent_out, intent_y, tone_out, tone_y,
                  weights=(1.0, 0.7, 0.7)):
    """Weighted sum of the cross-entropy losses of the three task heads.

    Args:
        emotion_out, intent_out, tone_out: Logits from each head.
        emotion_y, intent_y, tone_y: Matching integer class targets.
        weights: ``(emotion, intent, tone)`` loss weights.

    Returns:
        A scalar tensor with the combined loss.
    """
    ce = nn.CrossEntropyLoss()
    w_emotion, w_intent, w_tone = weights[0], weights[1], weights[2]
    total = w_emotion * ce(emotion_out, emotion_y)
    total = total + w_intent * ce(intent_out, intent_y)
    total = total + w_tone * ce(tone_out, tone_y)
    return total

# Cell-completion marker; the leading emoji was mis-encoded (UTF-8 bytes shown as MacRoman).
print("✅ Cell 5: 'CNS', 'Thalamus', and 'School' (PyTorch) models defined.")

# The IntegratedBioNeuralNetwork v 7.0

In [None]:
import numpy as np
import torch
import asyncio
import os
import glob
import time
from typing import List, Dict, Optional, Union, Any
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from numpy import ndarray
from datasets import load_dataset

# --- 14. The Final IBNN (Aura 7.0 - Hebbian-MoE-Temporal) ---
class IntegratedBioNeuralNetwork:
    """
    Aura 7.0: Integrates all components into a persistent,
    unsupervised-learning, and time-aware architecture.

    Data flow wired up by this class:

    * ``FeatureGenerator`` turns a query into a raw feature vector, which is
      whitened and then projected by the Hebbian cortex (``OjaLayer``).
    * The projection feeds the MoE cortex (``LiquidMoERouter`` over NLMS
      experts), which learns online towards the emotion index predicted by
      the trained thalamic router.
    * The resulting prediction error is passed to
      ``CentralNervousSystem.update_stress``.
    * The flattened weights can be saved as keyframes and blended by
      ``TemporalMemoryInterpolator`` during a sleep cycle.
    """

    def __init__(self, n_experts: int = 10, hebbian_components: int = 64,
                 llm_model_name: str = "meta-llama/Llama-3.2-3B-Instruct"):
        """Create the always-present components; trainable layers come later.

        Args:
            n_experts: Number of NLMS experts placed behind the MoE router.
            hebbian_components: Initial number of Hebbian components (K).
            llm_model_name: Model identifier handed to ``ResponseGenerator``.
        """
        print("--- üß† Initializing Aura 7.0 (Hebbian-MoE-Temporal) ---")
        self.feature_gen = FeatureGenerator()
        self.response_gen = ResponseGenerator(llm_model_name)
        self.cns = CentralNervousSystem()
        self.interpolator = TemporalMemoryInterpolator()
        self.raw_feature_dim = self.feature_gen.TOTAL_FEATURES
        self.vocab_size = self.feature_gen.vocab_size
        self.abstract_feature_dim = hebbian_components

        # Built by _create_brain_layers() / educate_brain(); None until then.
        # NOTE(review): every method below whitens through
        # self.feature_gen.whitener; self.whitener is created but never read
        # inside this class -- confirm which one is meant to be canonical.
        self.whitener: Optional[OptimizedWhitener] = None
        self.hippocampus: Optional[OjaLayer] = None # Hebbian Cortex
        self.cortex: Optional[LiquidMoERouter] = None # MoE Cortex
        self.router: Optional[TrainedThalamicRouter] = None
        self.n_experts = n_experts

        # Stress level recorded at the end of the most recent process_query().
        # Initialised here so that reading it before the first successful
        # query does not raise AttributeError.
        self.last_run_final_stress: Optional[float] = None

        # SAVE_DIR is a module-level global; it must be defined before this
        # class is instantiated.
        self.education_dir = SAVE_DIR # Use the global SAVE_DIR
        os.makedirs(self.education_dir, exist_ok=True)
        print("--- Aura 7.0 Brain Initialized (Awaiting Education) ---")

    # ------------------------------------------------------------------
    # Layer construction
    # ------------------------------------------------------------------
    def _create_brain_layers(self, hebbian_k: int):
        """Dynamically creates the brain layers based on the Hebbian cortex size.

        Replaces the Hebbian cortex, the whitener and the MoE cortex with
        freshly constructed (untrained) instances sized for ``hebbian_k``.
        """
        self.abstract_feature_dim = hebbian_k
        print("--- üß† Building Dynamic Brain Layers ---")
        print(f"   -> Hebbian Cortex (Oja) K = {hebbian_k}")

        self.hippocampus = OjaLayer(
            dim=self.raw_feature_dim,
            n_components=hebbian_k,
            mode='nonlinear', lr=5e-4, max_components=128, grow_threshold=0.3
        )
        self.whitener = OptimizedWhitener(dim=self.raw_feature_dim)
        self.cortex = self._build_cortex(hebbian_k)
        print("--- üß† Dynamic Brain Layers Created ---")

    def _build_cortex(self, in_dim: int):
        """Return a fresh MoE router whose experts consume ``in_dim`` features."""
        # One config dict is shared by every expert head, as before.
        attention_config = {'decay': 0.7, 'k_winners': 3, 'gain_up': 1.5, 'gain_down': 0.7}
        experts = {}
        for i in range(self.n_experts):
            head = ExpertNLMSHead(
                n_features=in_dim, # Use K
                vocab_size=self.vocab_size,
                attention_config=attention_config, mu=0.1
            )
            experts[f"expert__{i}"] = NLMSExpertAdapter(neuron=head)
        return LiquidMoERouter(
            experts=experts, in_dim=in_dim, # Use K
            hidden_dim=128, top_k=3
        )

    # ------------------------------------------------------------------
    # Offline education
    # ------------------------------------------------------------------
    @staticmethod
    def _primary_emotion(label_indices, label_names) -> str:
        """Return the first non-neutral label name, or ``'neutral'`` if none."""
        for idx in label_indices or ():
            if 0 <= idx < len(label_names) and label_names[idx] != 'neutral':
                return label_names[idx]
        return 'neutral'

    @staticmethod
    def _balanced_class_weights(labels, num_classes: int) -> np.ndarray:
        """Return a length-``num_classes`` vector of 'balanced' class weights.

        Classes that do not occur in ``labels`` keep weight 1.0. This keeps the
        vector aligned with the class indices and as long as the classifier
        head even when a rare class is missing from the training split;
        computing weights over ``np.unique(labels)`` alone would yield a
        shorter, shifted vector in that case.
        """
        weights = np.ones(num_classes, dtype=np.float64)
        present = np.unique(labels)
        weights[present] = compute_class_weight('balanced', classes=present, y=labels)
        return weights

    async def educate_brain(self):
        """
        Runs the 'School' pipeline to pre-train all components.

        Steps: load 15000 shuffled GoEmotions training records, embed them
        with SBERT, build feature vectors and labels, train the Hebbian
        cortex on the whitened features, train a ``MultiTaskHead`` thalamus
        on an 80/20 stratified split, save its weights under
        ``education_dir`` and wrap them in a ``TrainedThalamicRouter``.
        """
        print("\n--- üéì STARTING OFFLINE EDUCATION (GoEmotions) ---")
        device = "cuda" if torch.cuda.is_available() else "cpu"

        # 1. Load GoEmotions
        print("Loading 'google-research-datasets/go_emotions'...")
        dataset = load_dataset("google-research-datasets/go_emotions", split='train')
        dataset = dataset.shuffle(seed=42).select(range(15000)) # Use 15000 samples
        all_texts = dataset['text']

        # 2. Precompute SBERT (in a worker thread so the event loop stays free)
        print(f"Pre-computing SBERT embeddings for {len(all_texts)} texts...")
        sbert_embeddings = await asyncio.to_thread(
            self.feature_gen.sbert_model.encode,
            all_texts, normalize_embeddings=True, batch_size=64
        )

        # 3. Create Label Maps
        emotion_labels = GOEMOTIONS_LABELS
        intent_labels = ["question", "statement", "exclamation", "request", "none"]
        tone_labels = emotion_labels # Mock
        label_maps = {
            'emotion': {label: idx for idx, label in enumerate(emotion_labels)},
            'intent': {label: idx for idx, label in enumerate(intent_labels)},
            'tone': {label: idx for idx, label in enumerate(tone_labels)},
        }
        neutral_idx = GOEMOTIONS_MAP['neutral']

        # 4. Build Features
        print("Building features...")
        X_features, y_emotion, y_intent, y_tone = [], [], [], []
        for record, sbert_vec in zip(dataset, sbert_embeddings):
            primary_emotion = self._primary_emotion(record['labels'], GOEMOTIONS_LABELS)
            mock_record = {
                "text": record['text'],
                "plutchik": {"primary": primary_emotion, "intensity": 1.0},
                "intent": "question" if "?" in record['text'] else "statement",
                "tone": primary_emotion
            }
            X_features.append(self.feature_gen.build_features(mock_record, sbert_vec))
            y_emotion.append(label_maps['emotion'].get(primary_emotion, neutral_idx))
            y_intent.append(label_maps['intent'].get(mock_record['intent'], 1))
            y_tone.append(label_maps['tone'].get(primary_emotion, neutral_idx))

        X_all_np = np.stack(X_features)
        y_emotion_np = np.array(y_emotion)

        (X_train_split, X_test_split,
         y_emotion_train_split, y_emotion_test_split,
         y_intent_train_split, _y_intent_test_split,
         y_tone_train_split, _y_tone_test_split) = train_test_split(
            X_all_np, y_emotion_np, y_intent, y_tone,
            test_size=0.2, random_state=42, stratify=y_emotion_np)

        X_train_torch = torch.tensor(X_train_split, dtype=torch.float32).to(device)
        y_emotion_train_torch = torch.tensor(y_emotion_train_split, dtype=torch.long).to(device)
        y_intent_train_torch = torch.tensor(y_intent_train_split, dtype=torch.long).to(device)
        y_tone_train_torch = torch.tensor(y_tone_train_split, dtype=torch.long).to(device)

        X_test_torch = torch.tensor(X_test_split, dtype=torch.float32).to(device)
        y_emotion_test_torch = torch.tensor(y_emotion_test_split, dtype=torch.long).to(device)

        # --- 5. Train the "Hebbian Cortex" (OjaLayer) ---
        print("Training Hebbian Cortex (OjaLayer) on whitened data...")
        initial_k = self.abstract_feature_dim
        self._create_brain_layers(hebbian_k=initial_k)

        # Unsupervised pass over every feature vector (train and test alike).
        whiten = self.feature_gen.whitener.transform
        for x in X_all_np:
            self.hippocampus.step(whiten(x))

        final_k = self.hippocampus.K
        if final_k != initial_k:
            print(f"Hebbian Cortex grew from {initial_k} to {final_k}. Rebuilding MoE...")
            # Only the MoE depends on K. Rebuilding every layer here would
            # replace the hippocampus that was just trained with a fresh one.
            self.abstract_feature_dim = final_k
            self.cortex = self._build_cortex(final_k)

        print(f"   -> Hebbian Cortex (OjaLayer) training complete. Final K={final_k}")

        # --- 6. Train the "Thalamus" (MultiTaskHead) ---
        print(f"Training MultiTask ThalamicRouter...")
        num_emotions = len(label_maps['emotion'])
        class_weights_emotion = torch.tensor(
            self._balanced_class_weights(y_emotion_train_split, num_emotions),
            dtype=torch.float32,
        ).to(device)

        thalamus_model = MultiTaskHead(
            self.feature_gen.TOTAL_FEATURES,
            num_emotions, len(label_maps['intent']), len(label_maps['tone'])
        ).to(device)

        optimizer = optim.AdamW(thalamus_model.parameters(), lr=5e-3)
        criterion_emotion = nn.CrossEntropyLoss(weight=class_weights_emotion)
        criterion_intent = nn.CrossEntropyLoss()
        # Tone targets reuse the emotion labels (see "Mock" above), so the
        # same class weights apply.
        criterion_tone = nn.CrossEntropyLoss(weight=class_weights_emotion)

        thalamus_model.train()
        for _epoch in range(30): # More epochs (full-batch updates)
            optimizer.zero_grad()
            emo_out, intent_out, tone_out = thalamus_model(X_train_torch)
            loss = (1.0 * criterion_emotion(emo_out, y_emotion_train_torch) +
                    0.5 * criterion_intent(intent_out, y_intent_train_torch) +
                    0.5 * criterion_tone(tone_out, y_tone_train_torch))
            loss.backward()
            optimizer.step()

        thalamus_model.eval()
        with torch.no_grad():
            emo_logits_test, _, _ = thalamus_model(X_test_torch)
            emo_preds_test = emo_logits_test.argmax(dim=1)
            test_acc = accuracy_score(y_emotion_test_torch.cpu().numpy(),
                                      emo_preds_test.cpu().numpy())
            print(f"   -> Thalamus (Emotion) Test Accuracy: {test_acc:.4f}")

        thalamus_path = os.path.join(self.education_dir, "thalamic_router_multitask.pt")
        torch.save(thalamus_model.state_dict(), thalamus_path)
        print(f"   -> ThalamicRouter trained and saved to {thalamus_path}")

        # --- 7. "Mind Upload" - Initialize Router ---
        self.router = TrainedThalamicRouter(thalamus_path, label_maps, self.feature_gen)
        print("--- üéì OFFLINE EDUCATION COMPLETE ---")

    # ------------------------------------------------------------------
    # Online processing
    # ------------------------------------------------------------------
    async def process_query(self, query: str) -> str:
        """Run one query through the brain, learn from it, and return a reply.

        Returns a fixed hint string (without touching any state) when the
        brain has not been educated yet. Otherwise updates the Hebbian
        cortex, the MoE cortex and the CNS stress level, stores the final
        stress in ``last_run_final_stress`` and returns the generated text.
        """
        print(f"\n--- ‚òÄÔ∏è PROCESSING QUERY (Aura 7.0): '{query}' ---")
        if self.router is None:
            return "I am uneducated. Please run `await aura.educate_brain()`."

        x_raw, token_ids = self.feature_gen.generate_for_query(query)
        xw = self.feature_gen.whitener.transform(x_raw)
        # NOTE(review): oja_out.grew suggests the Hebbian layer can grow during
        # step(); if it does here, y_abstract would no longer match the
        # cortex's in_dim -- confirm against OjaLayer.
        oja_out = self.hippocampus.step(xw)
        y_abstract = oja_out.y

        # The thalamus prediction serves as the (pseudo) target for the MoE.
        target_signals = self.router.get_target_signals(query)
        target_for_moe = float(target_signals['emotion'])

        out = await self.cortex.learn(
            x=y_abstract, token_ids=token_ids, y_true=target_for_moe
        )

        last_prediction = out['y_hat']
        prediction_error = target_for_moe - last_prediction
        self.cns.update_stress(prediction_error)

        print(f"Hebbian (Oja) Residual: {oja_out.residual_ema:.3f} (Grew: {oja_out.grew})")
        print(f"MoE Prediction: {last_prediction:.2f}, Error (Stress): {abs(prediction_error):.2f}")

        final_brain_state = {
            'cns_state': self.cns.consciousness_level,
            'stress_level': self.cns.stress_level,
        }
        response = await self.response_gen.generate_response(query, final_brain_state)
        self.last_run_final_stress = self.cns.stress_level
        return response

    # ------------------------------------------------------------------
    # Brain-state (de)serialisation
    # ------------------------------------------------------------------
    def _state_arrays(self) -> List[np.ndarray]:
        """Return every persisted weight array, in serialisation order.

        Both saving and loading walk this list, so the two can never disagree
        about ordering.
        """
        whitener = self.feature_gen.whitener
        gating = self.cortex.gating
        cell = gating.cell
        arrays = [
            whitener.mu, whitener.var,
            self.hippocampus.W,
            cell.W, cell.U, cell.b, cell.V, cell.c,
            gating.Wg, gating.bg,
        ]
        arrays.extend(expert.neuron.w for expert in self.cortex.experts.values())
        return arrays

    def _get_brain_state_vector(self) -> np.ndarray:
        """Concatenate all persisted weights into one flat vector."""
        return np.concatenate([arr.flatten() for arr in self._state_arrays()])

    def _load_brain_state_vector(self, M: np.ndarray):
        """Copy a flat state vector back into the live weight arrays, in place.

        If ``M`` is shorter than the brain, as many leading values as are
        available are copied, a mismatch line is printed for each array that
        could not be filled completely, and the remaining weights are left
        untouched. Any other failure is reported and swallowed (best effort).
        """
        print("--- üß† Loading brain state from vector... ---")
        try:
            flat = np.asarray(M).ravel()
            idx = 0
            for target in self._state_arrays():
                chunk = flat[idx: idx + target.size]
                if chunk.size == target.size:
                    # Ellipsis assignment also works for 0-d arrays, which
                    # ``target[:] = ...`` does not.
                    target[...] = chunk.reshape(target.shape)
                else:
                    print(f"!! Shape mismatch during load. Target: {target.shape}, Got: {chunk.shape}")
                    if chunk.size:
                        # .flat writes through even for non-contiguous arrays.
                        target.flat[:chunk.size] = chunk
                idx += chunk.size
            print(f"--- üß† Brain state loaded successfully. Total params: {idx} ---")
        except Exception as e:
            print(f"--- ‚ùå ERROR: Failed to load brain state vector. {e} ---")

    async def save_keyframe(self, name: str):
        """Write the current brain state to ``keyframe_<name>_<unix-seconds>.npy``."""
        path = os.path.join(self.education_dir, f"keyframe_{name}_{int(time.time())}.npy")
        print(f"\n--- üíæ Saving keyframe to {path} ---")
        M = self._get_brain_state_vector()
        await asyncio.to_thread(np.save, path, M)
        print("--- üíæ Keyframe save complete ---")

    def _latest_keyframe_path(self, name: str) -> str:
        """Return the newest ``keyframe_<name>_<timestamp>.npy`` in education_dir.

        Only files whose suffix after ``keyframe_<name>_`` is purely numeric
        are considered, so ``happy`` never picks up ``happy2`` keyframes, and
        the newest file is chosen by numeric timestamp rather than by string
        order.

        Raises:
            FileNotFoundError: If no keyframe with exactly this name exists.
        """
        prefix = f"keyframe_{name}_"
        suffix = ".npy"
        pattern = glob.escape(os.path.join(self.education_dir, prefix)) + "*" + suffix
        stamped = []
        for path in glob.glob(pattern):
            stamp = os.path.basename(path)[len(prefix):-len(suffix)]
            if stamp.isdigit():
                stamped.append((int(stamp), path))
        if not stamped:
            raise FileNotFoundError(f"no keyframe named '{name}' in {self.education_dir}")
        return max(stamped)[1]

    async def run_sleep_cycle(self, keyframe1_name: str, keyframe2_name: str):
        """Blend the latest two named keyframes and load the result as a 'dream'.

        Loads the most recent keyframe for each name, interpolates halfway
        between them with the temporal interpolator (``mode='hilbert'``),
        writes the result into the live weights and then runs one
        consolidation query. Prints an error and returns without changing
        anything if the keyframes cannot be loaded or their sizes do not
        match the current brain.
        """
        print("\n--- üí§ RUNNING SLEEP CYCLE (Dreaming) ---")
        print(f"Loading keyframes: {keyframe1_name} and {keyframe2_name}")
        try:
            M0_path = self._latest_keyframe_path(keyframe1_name)
            M1_path = self._latest_keyframe_path(keyframe2_name)
            M0 = await asyncio.to_thread(np.load, M0_path)
            M1 = await asyncio.to_thread(np.load, M1_path)
        except Exception as e:
            print(f"--- ‚ùå ERROR: Could not load keyframes for dreaming. {e} ---"); return

        current_brain_shape = self._get_brain_state_vector().shape
        if M0.shape != M1.shape or M0.shape != current_brain_shape:
            print("--- ‚ùå ERROR: Keyframe dimension mismatch. Cannot interpolate. ---")
            print(f"M0: {M0.shape}, M1: {M1.shape}, Current: {current_brain_shape}"); return

        print("...Interpolating dream state using Hilbert (phase-aware) interpolation...")
        # NOTE(review): assumes interpolate() returns a real-valued vector of
        # the same length as its inputs -- confirm against the interpolator.
        M_dream = self.interpolator.interpolate(M0, M1, t=0.5, mode='hilbert')
        self._load_brain_state_vector(M_dream)

        print("...Consolidating dream state...")
        await self.process_query("... (dreaming) ...")
        print("--- üí§ SLEEP CYCLE COMPLETE ---")

# --- 15. PyTorch Models (for the "School") ---
class LinearTorchModel(nn.Module):
    """Minimal classifier: one fully-connected layer from features to class scores.

    This cell re-declares the class under the same name as the earlier
    "PyTorch Models" section of this file; the two definitions are identical.
    """

    def __init__(self, input_dim: int, num_classes: int):
        """Build the single linear layer.

        Args:
            input_dim: Size of each input feature vector.
            num_classes: Number of output scores produced per input.
        """
        super().__init__()
        # The attribute stays named ``fc`` so saved state_dict keys remain ``fc.*``.
        self.fc = nn.Linear(in_features=input_dim, out_features=num_classes)

    def forward(self, x):
        """Return the raw (un-normalised) output of the linear layer for ``x``."""
        scores = self.fc(x)
        return scores

class MultiTaskHead(nn.Module):
    """Shared 256-unit trunk followed by three parallel linear heads.

    The heads produce emotion, intent and tone scores from the same hidden
    representation. This cell re-declares the class under the same name as
    the earlier "PyTorch Models" section of this file; the two definitions
    are identical.
    """

    def __init__(self, input_dim: int, num_emotions: int, num_intents: int, num_tones: int):
        """Create the trunk and the three heads.

        Args:
            input_dim: Size of each input feature vector.
            num_emotions: Output width of the emotion head.
            num_intents: Output width of the intent head.
            num_tones: Output width of the tone head.
        """
        super().__init__()
        hidden_width = 256
        trunk_layers = [
            nn.Linear(input_dim, hidden_width),
            nn.ReLU(),
            nn.Dropout(0.1),
        ]
        # Attribute names and creation order are part of the saved-weights
        # contract (state_dict keys), so they are kept as-is.
        self.shared = nn.Sequential(*trunk_layers)
        self.emotion_head = nn.Linear(hidden_width, num_emotions)
        self.intent_head = nn.Linear(hidden_width, num_intents)
        self.tone_head = nn.Linear(hidden_width, num_tones)

    def forward(self, x):
        """Return ``(emotion_scores, intent_scores, tone_scores)`` for ``x``."""
        hidden = self.shared(x)
        emotion_scores = self.emotion_head(hidden)
        intent_scores = self.intent_head(hidden)
        tone_scores = self.tone_head(hidden)
        return emotion_scores, intent_scores, tone_scores

def multitask_loss(emotion_out, emotion_y, intent_out, intent_y, tone_out, tone_y,
                  weights=(1.0, 0.7, 0.7)):
    """Weighted sum of the three per-task cross-entropy losses.

    This cell re-declares the function under the same name as the earlier
    "PyTorch Models" section of this file; the two definitions are identical.

    Args:
        emotion_out, intent_out, tone_out: Model outputs for each task.
        emotion_y, intent_y, tone_y: Matching target tensors.
        weights: ``(emotion, intent, tone)`` multipliers applied to each loss.

    Returns:
        The scalar combined loss tensor.
    """
    criterion = nn.CrossEntropyLoss()
    emotion_term = weights[0] * criterion(emotion_out, emotion_y)
    intent_term = weights[1] * criterion(intent_out, intent_y)
    tone_term = weights[2] * criterion(tone_out, tone_y)
    return emotion_term + intent_term + tone_term

# Progress marker printed once every definition in this notebook cell has run.
print("‚úÖ Cell 6: `IntegratedBioNeuralNetwork` (Aura 7.0) defined.")

# TEST v7.0 (AURA v1)


In [None]:
# --- This is the test cell for Aura 7.0 ---
# It must be run in a cell *after* Cell 6

# --- 1. Define Global Save Paths ---
# SAVE_DIR is read as a global by IntegratedBioNeuralNetwork.__init__, so it
# must exist before the brain is instantiated below.
# NOTE(review): the /content/drive/MyDrive prefix presumably expects Google
# Drive to be mounted already -- confirm; nothing in this cell mounts it.
SAVE_DIR = "/content/drive/MyDrive/aura_education_v7_final"
os.makedirs(SAVE_DIR, exist_ok=True)
# Same file name that educate_brain() writes the thalamus weights to.
THALAMUS_PATH = os.path.join(SAVE_DIR, "thalamic_router_multitask.pt")
# Destination for the label maps dumped as JSON by main_test_loop().
LABEL_MAPS_PATH = os.path.join(SAVE_DIR, "aura_7_label_maps.json")

# --- 2. The Main Async Test Loop ---
async def main_test_loop():
    """End-to-end smoke test: educate, query twice, dream, then re-query.

    Builds and educates a brain, writes its label maps to LABEL_MAPS_PATH,
    runs a 'happy' and a 'scared' query (saving a keyframe after each), runs
    a sleep cycle over those two keyframes, repeats the 'scared' query and
    reports whether the final stress dropped by more than 5%.
    """
    # --- 3. INITIALIZE AND EDUCATE ---
    print("--- üß† Initializing Aura 7.0 (Loading LLMs with Unsloth...) ---")
    brain = IntegratedBioNeuralNetwork(
        n_experts=10,
        hebbian_components=64,  # Initial size
        llm_model_name="meta-llama/Llama-3.2-3B-Instruct",
    )
    print("--- Aura LLM Components Loaded ---")

    # Run the "School" (trains on go_emotions)
    await brain.educate_brain()

    # Persist the label maps produced by the education step
    maps_to_save = brain.router.label_maps
    with open(LABEL_MAPS_PATH, 'w') as maps_file:
        json.dump(maps_to_save, maps_file)

    # --- 4. TEST 1: (PRE-TRAINED, "happy") ---
    print("\n\n--- üó£Ô∏è TEST 1: Educated Brain, 'Happy' Query ---")
    happy_reply = await brain.process_query("I am so happy and full of joy!")
    print(f"\nFINAL AURA RESPONSE (Happy): {happy_reply}")
    await brain.save_keyframe("happy")  # Snapshot of the post-'happy' state
    stress_after_happy = brain.last_run_final_stress

    # --- 5. TEST 2: (PRE-TRAINED, "scared") ---
    print("\n\n--- üó£Ô∏è TEST 2: Educated Brain, 'Scared' Query ---")
    scared_reply = await brain.process_query("I am feeling very scared")
    print(f"\nFINAL AURA RESPONSE (Scared): {scared_reply}")
    await brain.save_keyframe("scared")  # Snapshot of the post-'scared' state
    stress_before_dream = brain.last_run_final_stress

    # --- 6. "DREAM" CYCLE ---
    await brain.run_sleep_cycle(keyframe1_name="happy", keyframe2_name="scared")

    # --- 7. TEST 3: (POST-DREAM, "scared") ---
    print("\n\n--- üó£Ô∏è TEST 3: Post-Dream Brain, 'Scared' Query (Day 3) ---")
    post_dream_reply = await brain.process_query("I am feeling very scared")
    print(f"\nFINAL AURA RESPONSE (Scared, Day 3): {post_dream_reply}")
    stress_after_dream = brain.last_run_final_stress

    # --- 8. VERIFICATION ---
    print("\n\n--- üî¨ TEMPORAL LEARNING VERIFICATION ---")
    print(f"Day 2 Final Stress (on 'scared'): {stress_before_dream:.4f}")
    print(f"Day 3 Final Stress (on 'scared'): {stress_after_dream:.4f}")

    # Success means the post-dream stress is more than 5% below the pre-dream value.
    stress_dropped = stress_after_dream < (stress_before_dream * 0.95)
    verdict = (
        "‚úÖ SUCCESS: The 'dream' cycle helped process the fear! Stress is lower."
        if stress_dropped
        else "‚ùå NEUTRAL/FAILURE: The dream cycle did not reduce stress."
    )
    print(verdict)

# --- Run the async main loop ---
# nest_asyncio patches asyncio so that asyncio.run() can be called even when
# an event loop is already running (presumably the notebook kernel's loop).
nest_asyncio.apply()

# Drive the whole educate -> query -> dream -> re-query scenario to completion.
asyncio.run(main_test_loop())