In [None]:
# lhra_core/agent.py
# ==============================================================
# Versão: v5.4.3.7_plus_final_enhanced3_full_multiEnv_genePool
# Autor:  OpenAI Assistant (chatGPT)
# ==============================================================
import os
import time
import json
import math
import copy
import queue
import random
import pathlib
import typing as tp
from collections import deque, defaultdict
from lhra_core.kernel_manager import KernelManager
from lhra_core.focus_manager  import FocusManager
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from torch.utils.tensorboard import SummaryWriter

import gymnasium as gym

# ----------------------------------------------------------------
# CONFIGURAÇÕES GERAIS
# ----------------------------------------------------------------
# Device used by this module's networks: CUDA when available, otherwise CPU.
DEVICE: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Default hyper-parameters. LHRA_Agent uses this mapping when it is
# constructed without an explicit config.
DEFAULT_CONFIG = {
    # Gymnasium environment ids; one env is created per entry via gym.make.
    "env_list": ["CartPole-v1", "MountainCar-v0"],
    "gamma": 0.99,
    # Weight of the entropy term in the policy loss.
    "entropy_coef": 0.005,
    # Adam learning rates for the policy and the world model respectively.
    "lr": 3e-4,
    "lr_world": 3e-4,
    # Lower bound handed to the world-model ReduceLROnPlateau scheduler.
    "lr_min": 1e-5,
    # NOTE(review): "rollback_ratio" is not read by the code visible in this file.
    "rollback_ratio": 0.7,
    # Mean world loss at or above which the world model is re-initialised,
    # and the minimum number of steps between two such resets.
    "rollback_world_loss_threshold": 0.15,
    "rollback_cooldown": 500,
    # Policy-loss thresholds consumed by AutoExpandPruneManager, plus the
    # minimum number of steps between two structural changes.
    "auto_expand_threshold": 1.2,
    "prune_threshold": 0.8,
    "cooldown_steps": 500,
    # Step interval used when deciding whether to snapshot into the GenePool.
    "gene_pool_interval": 1000,
    # NOTE(review): "gene_evaluation_steps" and "max_episode_steps" are not
    # read by the code visible in this file.
    "gene_evaluation_steps": 600,
    "max_episode_steps": 500,
    # Length of the sliding window of world-model losses.
    "world_loss_history_len": 30,
    # Directory handed to the TensorBoard SummaryWriter.
    "tensorboard_dir": "./runs/lhra",
}

# ----------------------------------------------------------------
# REDE BÁSICA DA POLÍTICA
# ----------------------------------------------------------------
class BasePolicy(nn.Module):
    """Actor-critic MLP whose trunk can grow or shrink at run time.

    Data flow: ``fc1 -> ReLU -> extra blocks (insertion order) -> fc2 -> ReLU``
    followed by two linear heads: ``pi`` (action logits) and ``v`` (state value).

    Args:
        obs_dim: Size of the flattened observation vector.
        act_dim: Number of discrete actions (width of the logits head).
    """

    def __init__(self, obs_dim: int, act_dim: int):
        super().__init__()
        # int() so NumPy integer scalars (e.g. from np.prod) are accepted too.
        self.fc1 = nn.Linear(int(obs_dim), 128)
        self.fc2 = nn.Linear(128, 128)
        self.pi = nn.Linear(128, int(act_dim))
        self.v = nn.Linear(128, 1)
        # Dynamically added blocks, applied between fc1 and fc2.
        self.extra_blocks: nn.ModuleDict = nn.ModuleDict()

    def forward(self, x: torch.Tensor):
        """Return ``(logits, value)``; ``value`` has its last dimension squeezed."""
        x = F.relu(self.fc1(x))
        for block in self.extra_blocks.values():
            x = block(x)
        x = F.relu(self.fc2(x))
        logits = self.pi(x)
        value = self.v(x)
        return logits, value.squeeze(-1)

    # ------- dynamic APIs ---------
    def add_extra_block(self, name: str, block: nn.Module):
        """Register ``block`` under ``name``, replacing any block with that name.

        The block is moved to the device this policy currently lives on
        (rather than a module-level default), so forward passes cannot hit a
        device mismatch when the policy was placed somewhere else.
        """
        self.extra_blocks[name] = block.to(self.fc1.weight.device)

    def remove_extra_block(self, name: str):
        """Drop the block called ``name``; unknown names are ignored."""
        if name in self.extra_blocks:
            del self.extra_blocks[name]

# ----------------------------------------------------------------
# META‑CONTROLLER (6 métricas dinamicamente ponderadas)
# ----------------------------------------------------------------
class DynamicWeightAdapter(nn.Module):
    """Learnable convex combination of six scalar signals.

    Expected order of the input vector:
        0: chaos_loss
        1: organization_loss
        2: memory_loss
        3: policy_loss
        4: world_loss
        5: stability_score
    """

    def __init__(self):
        super().__init__()
        # Start with identical raw weights for all six signals.
        initial = torch.ones(6) / 6
        self.weights = nn.Parameter(initial)

    def forward(self, losses: torch.Tensor) -> torch.Tensor:
        """Return the softmax-weighted sum of ``losses`` as a scalar tensor."""
        # Softmax keeps the mixing coefficients positive and summing to one.
        mixing = self.weights.softmax(dim=0)
        weighted = mixing * losses
        return weighted.sum()

# ----------------------------------------------------------------
# WORLD MODEL (pequeno modelo preditivo)
# ----------------------------------------------------------------
class WorldModel(nn.Module):
    """Small predictive model: (observation, one-hot action) -> (state delta, reward).

    Args:
        obs_dim: Width of the observation vector.
        act_dim: Width of the one-hot action vector.
    """

    def __init__(self, obs_dim: int, act_dim: int):
        super().__init__()
        hidden = 128
        shared_layers = [
            nn.Linear(obs_dim + act_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
        ]
        self.trunk = nn.Sequential(*shared_layers)
        self.delta = nn.Linear(hidden, obs_dim)  # predicts the change in state
        self.reward = nn.Linear(hidden, 1)       # predicts the reward

    def forward(self, obs: torch.Tensor, act_onehot: torch.Tensor):
        """Return ``(predicted_delta, predicted_reward)`` for the given batch."""
        joint = torch.cat((obs, act_onehot), dim=-1)
        features = self.trunk(joint)
        delta_pred = self.delta(features)
        reward_pred = self.reward(features).squeeze(-1)
        return delta_pred, reward_pred

# ----------------------------------------------------------------
# AUTO‑EXPAND + PRUNE MANAGER (hook simplificado)
# ----------------------------------------------------------------
class AutoExpandPruneManager:
    """Grow or shrink a policy's stack of extra blocks based on its loss.

    * loss above ``cfg["auto_expand_threshold"]`` -> append a new
      ``Linear(128, 128) + ReLU`` block;
    * loss below ``cfg["prune_threshold"]`` (and at least one extra block
      exists) -> remove the least important block.

    At most one structural change happens per ``cfg["cooldown_steps"]`` steps.

    NOTE: parameters of newly added blocks are not known to any optimizer that
    was built before the expansion; the owner of the optimizer has to register
    them for the new block to actually train.

    Args:
        policy: Object exposing ``extra_blocks`` (an ``nn.ModuleDict``),
            ``add_extra_block(name, block)`` and ``remove_extra_block(name)``.
        cfg: Mapping with the three keys mentioned above.
    """

    def __init__(self, policy: "BasePolicy", cfg: dict):
        self.policy = policy
        self.cfg = cfg
        self.last_change_step = 0
        self.block_id = 0
        # Importance score per *currently existing* extra block.
        self.block_stats: dict[str, float] = {}

    def _policy_device(self) -> torch.device:
        """Device of the policy's parameters (CPU when it has none)."""
        first = next(self.policy.parameters(), None)
        return first.device if first is not None else torch.device("cpu")

    # ----- importance = mean(|grad|) + mean(|output|)
    def _compute_block_importance(self, block_name: str, dummy_input: torch.Tensor):
        """Score a block by probing it with ``dummy_input``.

        The score is the sum over parameters of the mean absolute gradient of
        the mean output, plus the absolute value of that mean output.
        """
        block = self.policy.extra_blocks[block_name]
        block.zero_grad(set_to_none=True)
        out = block(dummy_input).mean()
        out.backward()
        grad_norm = 0.0
        for p in block.parameters():
            if p.grad is not None:
                grad_norm += p.grad.detach().abs().mean().item()
        out_val = out.detach().abs().item()
        # Do not leave the probe's gradients behind on the block.
        block.zero_grad(set_to_none=True)
        return grad_norm + out_val

    def maybe_expand_or_prune(self, step_idx: int, policy_loss_scalar: float):
        """Apply at most one expand/prune action for the current step.

        Args:
            step_idx: Global step counter, used for the cooldown.
            policy_loss_scalar: Latest policy loss as a Python float.
        """
        if step_idx - self.last_change_step < self.cfg["cooldown_steps"]:
            return

        # expand?
        if policy_loss_scalar > self.cfg["auto_expand_threshold"]:
            name = f"extra_{self.block_id}"
            self.block_id += 1
            self.policy.add_extra_block(name, nn.Sequential(
                nn.Linear(128, 128),
                nn.ReLU(),
            ))
            print(f"[AutoExpand] Adicionado {name}")
            self.last_change_step = step_idx
        # prune?
        elif policy_loss_scalar < self.cfg["prune_threshold"] and len(self.policy.extra_blocks) > 0:
            dummy = torch.randn(1, 128, device=self._policy_device())
            # Rebuild the stats from scratch: scores of blocks that were
            # removed earlier must not take part in the comparison, otherwise
            # a stale name can win and nothing is actually pruned.
            self.block_stats = {
                bn: self._compute_block_importance(bn, dummy)
                for bn in list(self.policy.extra_blocks.keys())
            }
            least = min(self.block_stats, key=self.block_stats.get)
            self.policy.remove_extra_block(least)
            self.block_stats.pop(least, None)
            print(f"[AutoPrune] Removido bloco {least}")
            self.last_change_step = step_idx

# ----------------------------------------------------------------
# GENE POOL (simplificado; offline eval é realizada externamente)
# ----------------------------------------------------------------
class GenePool:
    """In-memory archive of policy snapshots ("genes").

    Simplified: no fitness evaluation happens here (offline evaluation is done
    elsewhere); ``sample_best_gene`` just returns the newest snapshot.

    Args:
        cfg: Configuration mapping, stored for later use.
    """

    def __init__(self, cfg: dict):
        self.cfg = cfg
        # Each gene is a dict with keys "policy", "meta" and "timestamp".
        self.pool: list[dict] = []

    def add_gene(self, policy: "BasePolicy", meta_ctrl: "DynamicWeightAdapter"):
        """Append a snapshot of ``policy`` and of ``meta_ctrl.weights``.

        Both parts are stored as detached CPU copies, so the archive neither
        aliases live parameters nor accumulates accelerator memory; the
        stored ``"policy"`` mapping can still be fed to ``load_state_dict``.
        """
        policy_snapshot = {
            key: tensor.detach().cpu().clone()
            for key, tensor in policy.state_dict().items()
        }
        gene = {
            "policy": policy_snapshot,
            "meta": meta_ctrl.weights.detach().cpu().clone(),
            "timestamp": time.time(),
        }
        self.pool.append(gene)

    def sample_best_gene(self) -> tp.Optional[dict]:
        """Return the most recently added gene, or ``None`` for an empty pool."""
        return self.pool[-1] if self.pool else None

# ----------------------------------------------------------------
# LHRA AGENT
# ----------------------------------------------------------------
class LHRA_Agent:
    def __init__(self, cfg: dict | None = None) -> None:
        self.cfg = cfg or DEFAULT_CONFIG
        self.envs = [gym.make(name) for name in self.cfg["env_list"]]
        self.env_idx = 0
        self.env = self.envs[self.env_idx]
        obs_dim = np.prod(self.env.observation_space.shape)
        act_dim = self.env.action_space.n

        # Redes
        self.policy = BasePolicy(obs_dim, act_dim).to(DEVICE)
        self.meta_controller = DynamicWeightAdapter().to(DEVICE)
        self.world_model = WorldModel(obs_dim, act_dim).to(DEVICE)

        # Optimizadores
        self.opt_policy = optim.Adam(self.policy.parameters(), lr=self.cfg["lr"])
        self.opt_world  = optim.Adam(self.world_model.parameters(), lr=self.cfg["lr_world"])

        # Scheduler do world model
        self.sched_world = optim.lr_scheduler.ReduceLROnPlateau(
            self.opt_world, mode="min", factor=0.5, patience=100, min_lr=self.cfg["lr_min"]
        )

        # AutoExpand/Prune e GenePool
        self.auto_manager = AutoExpandPruneManager(self.policy, self.cfg)
        self.gene_pool = GenePool(self.cfg)

        # Buffers
        self.rollout_obs:   list[np.ndarray] = []
        self.rollout_act:   list[int] = []
        self.rollout_rew:   list[float] = []
        self.rollout_next:  list[np.ndarray] = []
        self.rollout_done:  list[bool] = []

        # World loss history para rollback parcial
        self.world_loss_hist = deque(maxlen=self.cfg["world_loss_history_len"])

        # Checkpoints
        self.best_reward = -float("inf")
        self.last_rollback_step = -self.cfg["rollback_cooldown"]

        # Logs
        run_dir = pathlib.Path(self.cfg["tensorboard_dir"])
        run_dir.mkdir(parents=True, exist_ok=True)
        self.writer = SummaryWriter(run_dir)

        # Estado inicial
        self.state, _ = self.env.reset(seed=0)

    # ------------------------------------------------------------
    # UTILITÁRIOS
    # ------------------------------------------------------------
    def _one_hot(self, a: int, n: int):
        vec = np.zeros(n, dtype=np.float32)
        vec[a] = 1.0
        return vec

    def map_action_to_env(self, logits: torch.Tensor) -> int:
        probs = torch.softmax(logits, dim=-1).cpu().detach().numpy()
        return int(np.random.choice(len(probs), p=probs))

    # ------------------------------------------------------------
    # TREINO DO WORLD MODEL
    # ------------------------------------------------------------
    def update_world_model(self, batch_size: int = 64):
        if len(self.rollout_obs) < batch_size:
            return 0.0
        idx = np.random.choice(len(self.rollout_obs), batch_size, replace=False)
        obs     = torch.tensor(np.array(self.rollout_obs)[idx]).float().to(DEVICE)
        acts    = torch.tensor(np.array(self.rollout_act)[idx]).long().to(DEVICE)
        nextobs = torch.tensor(np.array(self.rollout_next)[idx]).float().to(DEVICE)
        rews    = torch.tensor(np.array(self.rollout_rew)[idx]).float().to(DEVICE)

        act_oh = torch.tensor([self._one_hot(a, self.env.action_space.n) for a in acts.cpu().numpy()]).to(DEVICE)
        pred_delta, pred_rew = self.world_model(obs, act_oh)
        true_delta = nextobs - obs

        loss = F.mse_loss(pred_delta, true_delta) + F.mse_loss(pred_rew, rews)
        self.opt_world.zero_grad()
        loss.backward()
        self.opt_world.step()
        self.sched_world.step(loss.item())
        self.world_loss_hist.append(loss.item())
        return loss.item()

    # ------------------------------------------------------------
    # POLÍTICA (A2C simplificado)
    # ------------------------------------------------------------
    def update_policy(self, gamma: float = 0.99):
        if len(self.rollout_rew) == 0:
            return (0.0, 0.0)
        R = 0.0
        returns = []
        for r, done in zip(reversed(self.rollout_rew), reversed(self.rollout_done)):
            R = r + gamma * R * (1.0 - done)
            returns.insert(0, R)
        returns = torch.tensor(returns, dtype=torch.float32).to(DEVICE)

        obs = torch.tensor(np.array(self.rollout_obs)).float().to(DEVICE)
        acts = torch.tensor(self.rollout_act).long().to(DEVICE)

        logits, values = self.policy(obs)
        logprobs = torch.log_softmax(logits, dim=-1)
        act_logp = logprobs[range(len(acts)), acts]

        advantages = returns - values.detach()
        policy_loss = -(act_logp * advantages).mean()
        value_loss  = F.mse_loss(values, returns)
        entropy_loss = -(torch.softmax(logits, dim=-1) * logprobs).sum(-1).mean()

        # Meta losses extra
        chaos_loss  = torch.tensor(random.random(), device=DEVICE)  # placeholder
        org_loss    = torch.tensor(random.random(), device=DEVICE)
        mem_loss    = torch.tensor(random.random(), device=DEVICE)
        stability   = torch.tensor(1.0 - min(1.0, (self.auto_manager.block_id + len(self.world_loss_hist))/50), device=DEVICE)

        losses_vec = torch.stack([chaos_loss, org_loss, mem_loss,
                                  policy_loss.detach(), value_loss.detach(), stability])
        total_meta = self.meta_controller(losses_vec)

        total_loss = policy_loss + 0.5 * value_loss - self.cfg["entropy_coef"] * entropy_loss + total_meta

        self.opt_policy.zero_grad()
        total_loss.backward()
        self.opt_policy.step()

        # logs
        step = len(self.rollout_rew_total)
        self.writer.add_scalar("Loss/Policy",    policy_loss.item(), step)
        self.writer.add_scalar("Loss/Value",     value_loss.item(),  step)
        self.writer.add_scalar("Loss/Entropy",   entropy_loss.item(),step)
        self.writer.add_scalar("Loss/TotalMeta", total_meta.item(),  step)

        return policy_loss.item(), value_loss.item()

    # ------------------------------------------------------------
    # ROLLBACKS
    # ------------------------------------------------------------
    def _partial_rollback_if_needed(self, step_idx: int):
        if len(self.world_loss_hist) < self.world_loss_hist.maxlen:
            return
        avg_wl = np.mean(self.world_loss_hist)
        if avg_wl < self.cfg["rollback_world_loss_threshold"]:
            return
        if step_idx - self.last_rollback_step < self.cfg["rollback_cooldown"]:
            return
        # rollback parcial: reset world model
        print(f"[Rollback] Partial rollback: resetting WorldModel (avg_wl={avg_wl:.3f})")
        self.world_model.apply(self._init_weights)
        self.opt_world = optim.Adam(self.world_model.parameters(), lr=self.cfg["lr_world"])
        self.world_loss_hist.clear()
        self.last_rollback_step = step_idx

    @staticmethod
    def _init_weights(m):
        if isinstance(m, nn.Linear):
            nn.init.kaiming_normal_(m.weight)
            if m.bias is not None:
                nn.init.zeros_(m.bias)

    # ------------------------------------------------------------
    # LOOP PRINCIPAL
    # ------------------------------------------------------------
    def run(self, total_steps: int = 10_000):
        global_step = 0
        episode_reward = 0.0
        self.rollout_rew_total: list[float] = []
        obs = self.state.copy()

        while global_step < total_steps:
            obs_t = torch.tensor(obs, dtype=torch.float32, device=DEVICE).unsqueeze(0)
            logits, _ = self.policy(obs_t)
            action = self.map_action_to_env(logits[0])
            next_obs, reward, done, trunc, info = self.env.step(action)

            # store
            self.rollout_obs.append(obs)
            self.rollout_act.append(action)
            self.rollout_rew.append(reward)
            self.rollout_next.append(next_obs)
            self.rollout_done.append(done)

            episode_reward += reward
            obs = next_obs.copy()
            global_step += 1

            # update world model every n steps
            if global_step % 20 == 0:
                wl = self.update_world_model()
                self.writer.add_scalar("Loss/World", wl, global_step)

            # update policy
            if done or trunc or len(self.rollout_rew) >= 32:
                pl, vl = self.update_policy()
                self.rollout_obs.clear()
                self.rollout_act.clear()
                self.rollout_rew.clear()
                self.rollout_next.clear()
                self.rollout_done.clear()
                self.rollout_rew_total.append(episode_reward)
                self.writer.add_scalar("Return/Episode", episode_reward, global_step)

                if episode_reward > self.best_reward:
                    self.best_reward = episode_reward

                # GenePool
                if global_step % self.cfg["gene_pool_interval"] == 0:
                    self.gene_pool.add_gene(self.policy, self.meta_controller)

                # troca de ambiente
                self.env_idx = (self.env_idx + 1) % len(self.envs)
                self.env = self.envs[self.env_idx]
                obs, _ = self.env.reset(seed=random.randint(0, 1_000_000))
                episode_reward = 0.0
            # auto expand/prune
            if global_step % 50 == 0:
                self.auto_manager.maybe_expand_or_prune(global_step, pl if isinstance(pl, float) else pl[0])

            # partial rollback
            self._partial_rollback_if_needed(global_step)

    # ------------------------------------------------------------
    # INTERFACE COM O CHAT
    # ------------------------------------------------------------
    def step_user_input(self, user_input: str) -> str:
        """
        Gera um rascunho textual do "pensamento" atual:
        inclui estado, última recompensa média, etc.
        Esse draft será refinado pelo LLM (DeepSeek).
        """
        draft = {
            "current_env": self.cfg["env_list"][self.env_idx],
            "state_snapshot": self.state.tolist() if isinstance(self.state, np.ndarray) else str(self.state),
            "last_reward": self.rollout_rew[-1] if self.rollout_rew else 0.0,
            "user_input": user_input.strip(),
            "policy_extra_blocks": list(self.policy.extra_blocks.keys()),
            "meta_weights": self.meta_controller.weights.detach().cpu().tolist(),
        }
        return json.dumps(draft, ensure_ascii=False, indent=2)

In [None]:
# lhra_core/kernel_manager.py
# ==============================================================
# Pequeno barramento de mensagens + scheduler de tarefas
# ==============================================================

import time
import heapq
import typing as tp
from collections import deque, defaultdict

Msg = dict  # semantic alias: a message is a plain dict

class Task:
    """
    Tarefa genérica executada por algum módulo.

    priority: float (0‑1; maior = executa antes)
    interval: steps entre execuções (>=1).
    condition: callable(state_dict) -> bool  (executa se True)
    callback:  função a ser chamada
    """
    __slots__ = ("name", "priority", "interval",
                 "condition", "callback", "last_exec")

    def __init__(
        self,
        name: str,
        callback: tp.Callable[[tp.Any], None],
        priority: float = 0.5,
        interval: int = 1,
        condition: tp.Callable[[dict], bool] | None = None,
    ):
        self.name = name
        self.priority = priority
        self.interval = max(1, interval)
        self.condition = condition or (lambda _: True)
        self.callback = callback
        self.last_exec = -self.interval

    # -------- prioridade p/ heap ----------
    def __lt__(self, other: "Task"):
        return self.priority > other.priority


class KernelManager:
    """
    Barramento super‑simples: módulos publicam métricas
    em self.state; Kernel decide quais Tasks disparar.
    """

    def __init__(self):
        self.state: dict = defaultdict(lambda: 0.0)
        self.tasks: dict[str, Task] = {}
        self._task_heap: list[Task] = []

        # métricas históricas para consulta rápida
        self._history: dict[str, deque] = defaultdict(lambda: deque(maxlen=100))

        # step contador
        self.step_idx: int = 0

    # ------------------- STATE ------------------------
    def publish(self, key: str, value: float):
        self.state[key] = value
        self._history[key].append(value)

    def get_state(self, key: str, default=0.0):
        return self.state.get(key, default)

    # ------------------- TASKS ------------------------
    def register_task(
        self,
        name: str,
        callback: tp.Callable[[dict], None],
        *,
        priority: float = 0.5,
        interval: int = 1,
        condition: tp.Callable[[dict], bool] | None = None,
    ):
        task = Task(name, callback, priority, interval, condition)
        self.tasks[name] = task
        heapq.heappush(self._task_heap, task)

    def set_priority(self, name: str, priority: float):
        if name in self.tasks:
            self.tasks[name].priority = priority
            # rebuild heap for simplicity
            self._task_heap = list(self.tasks.values())
            heapq.heapify(self._task_heap)

    # ------------------- LOOP -------------------------
    def run_cycle(self):
        """
        Executa tarefas cuja condição seja True
        e cujo intervalo tenha decorrido.
        """
        executed = []
        while self._task_heap:
            task = heapq.heappop(self._task_heap)
            if self.step_idx - task.last_exec < task.interval:
                # ainda em cooldown; devolve
                executed.append(task)
                continue
            if not task.condition(self.state):
                executed.append(task)
                continue
            # Executa
            task.callback(self.state)
            task.last_exec = self.step_idx
            executed.append(task)
        # repõe heap
        for t in executed:
            heapq.heappush(self._task_heap, t)
        self.step_idx += 1

    # ------------------- HISTÓRICO --------------------
    def mean(self, key: str) -> float:
        h = self._history[key]
        return sum(h) / len(h) if h else 0.0

    def reset(self):
        self.state.clear()
        for dq in self._history.values():
            dq.clear()
        self.step_idx = 0

In [None]:
# lhra_core/focus_manager.py
# ==============================================================
# Define prioridades dinâmicas de cada módulo cognitivo
# ==============================================================

import random
import typing as tp
from collections import deque

class FocusManager:
    """
    Recebe sinais de urgência e importância de cada módulo
    e devolve dicionário {module_name: prioridade 0‑1}.
    Implementa esquemas de:
        • little‑used boost
        • estabilidade vs. exploração
    """

    def __init__(self):
        # histórico de execuções p/ fairness
        self._last_run: dict[str, int] = defaultdict(int)
        self.step_idx: int = 0

    def decide_focus(
        self,
        signals: dict[str, dict[str, float]],
        base_priorities: dict[str, float] | None = None,
    ) -> dict[str, float]:
        """
        signals[module] = {"urgency":..,"importance":..}
        Retorna dict de prioridades normalizadas.
        """
        base_priorities = base_priorities or {}
        raw: dict[str, float] = {}

        for mod, sig in signals.items():
            urg = sig.get("urgency", 0.5)
            imp = sig.get("importance", 0.5)
            base = base_priorities.get(mod, 0.5)
            # distância temporal: módulos que não rodam há muito
            lag = self.step_idx - self._last_run.get(mod, 0)
            lag_boost = min(0.3, lag / 1000.0)
            score = 0.5 * urg + 0.4 * imp + 0.1 * base + lag_boost
            raw[mod] = score

        # normaliza
        total = sum(raw.values()) or 1e-6
        norm = {m: v / total for m, v in raw.items()}

        # registra quem vai rodar
        chosen = {m: p for m, p in norm.items() if p > 0.15}  # cutoff
        for m in chosen:
            self._last_run[m] = self.step_idx

        self.step_idx += 1
        return chosen

In [None]:
# lhra_core/world_model.py
# ==========================================================
# Pequeno World‑Model (dinâmica + recompensa) em PyTorch
# ==========================================================
import torch, torch.nn as nn, torch.nn.functional as F
from torch.optim import Adam
from collections import deque
import random, typing as tp

class WorldModel(nn.Module):
    """
    Modela P(s_{t+1}|s_t,a_t)  e  R(s_t,a_t)
    • state_dim  : dimensão da observação (float list)
    • action_dim : dimensão/num de ações (discrete -> int)
    """
    def __init__(self, state_dim:int, action_dim:int, lr:float=3e‑4,
                 buffer_size:int=50_000, device:str|None=None):
        super().__init__()
        self.state_dim  = state_dim
        self.action_dim = action_dim
        self.device     = device or ("cuda" if torch.cuda.is_available() else "cpu")

        hid = 128
        self.dyn = nn.Sequential(
            nn.Linear(state_dim+action_dim, hid),
            nn.ReLU(),
            nn.Linear(hid, hid),
            nn.ReLU(),
            nn.Linear(hid, state_dim)          # Δs  (modela diferença)
        )
        self.rew = nn.Sequential(
            nn.Linear(state_dim+action_dim, hid),
            nn.Tanh(),
            nn.Linear(hid, 1)
        )
        self.to(self.device)
        self.opt  = Adam(self.parameters(), lr=lr)
        self.buf  = deque(maxlen=buffer_size)
        self.steps= 0

    # ---------- Buffer / coleta ----------
    def store(self, s:list[float], a:int, r:float, s2:list[float]):
        self.buf.append((torch.tensor(s).float(),
                         torch.tensor([a]).long(),
                         torch.tensor([r]).float(),
                         torch.tensor(s2).float()))

    # ---------- Treinamento incremental ----------
    def update(self, batch:int=64, iters:int=5)->float:
        if len(self.buf) < batch: return 0.0
        losses=[]
        for _ in range(iters):
            batch_data = random.sample(self.buf, batch)
            s,a,r,s2 = map(lambda x: torch.stack(x).to(self.device),
                           zip(*batch_data))
            a_onehot = F.one_hot(a.squeeze(), num_classes=self.action_dim).float()
            sa       = torch.cat([s, a_onehot], dim=-1)

            pred_deltas = self.dyn(sa)
            pred_s2     = s + pred_deltas
            dyn_loss    = F.mse_loss(pred_s2, s2)

            pred_r      = self.rew(sa).squeeze()
            rew_loss    = F.mse_loss(pred_r, r.squeeze())

            loss = dyn_loss + 0.5*rew_loss
            self.opt.zero_grad(); loss.backward(); self.opt.step()
            losses.append(loss.item())
            self.steps += 1
        return sum(losses)/len(losses)

    # ---------- Inference ----------
    @torch.no_grad()
    def predict(self, s:list[float], a:int)->tuple[list[float], float]:
        s_t = torch.tensor(s, device=self.device).float().unsqueeze(0)
        a_oh= F.one_hot(torch.tensor([a]), num_classes=self.action_dim).float().to(self.device)
        sa  = torch.cat([s_t, a_oh], -1)
        s2  = s_t + self.dyn(sa)
        r   = self.rew(sa).squeeze()
        return s2.cpu().view(-1).tolist(), float(r.cpu())

# ==========================================================
# Interface utilitária para LHRA_Agent
# ==========================================================
def init_world_model(cfg:dict)->WorldModel:
    """Build a WorldModel from ``cfg``.

    Requires the keys ``"state_dim"`` and ``"action_dim"``; the optional key
    ``"wm_lr"`` sets the learning rate (default 3e-4).
    """
    state_dim = cfg["state_dim"]
    action_dim = cfg["action_dim"]
    learning_rate = cfg.get("wm_lr", 3e-4)
    return WorldModel(state_dim, action_dim, lr=learning_rate)

In [None]:
# lhra_core/chaos_generator.py
# ==========================================================
# Gera ruído fractal / browniano para “forças de caos” internas
# ==========================================================
import numpy as np, math, random

class ChaosGenerator:
    """Gera sequências de ruído coerente (Perlin 1D simplificado)."""
    def __init__(self, seed:int|None=None, scale:float=.3):
        self.scale  = scale
        random.seed(seed or 0xA1B2C3)

        # Gradientes em pontos inteiros
        self.grad = {i:random.uniform(-1,1) for i in range(10_000)}

    def _grad(self, i:int)->float:
        if i not in self.grad:
            self.grad[i] = random.uniform(-1,1)
        return self.grad[i]

    def _fade(self, t:float)->float:
        # curva de suavização 6t^5-15t^4+10t^3
        return t*t*t*(t*(t*6-15)+10)

    def noise(self, x:float)->float:
        x0 = math.floor(x); x1 = x0+1
        sx = self._fade(x - x0)
        n0 = self._grad(x0)*(x-x0)
        n1 = self._grad(x1)*(x-x1)
        return (1-sx)*n0 + sx*n1

    # Sequência temporal de ruído
    def sequence(self, t0:float, steps:int, step_size:float=.01)->list[float]:
        return [self.noise(t0+i*step_size)*self.scale for i in range(steps)]

In [None]:
# lhra_core/fractal_memory.py
# ==========================================================
# Estrutura de “memória fractal” simples (árvore HAMT‑like)
# ==========================================================
import hashlib, json, os
from collections import defaultdict
from pathlib import Path
import typing as tp

def _hash(key:str)->int:
    return int(hashlib.sha256(key.encode()).hexdigest(),16)

class Node:
    __slots__=("children","value")
    def __init__(self):
        self.children: dict[int,"Node"] = {}
        self.value: tp.Any = None

class FractalMemory:
    """
    Armazena pares (chave -> valor) numa trie de hashing.
    O “profundidade” é limitado para não explodir RAM.
    """
    def __init__(self, depth:int=5, fanout:int=16,
                 file:Path|str="fractal_mem.json"):
        self.root   = Node()
        self.depth  = depth
        self.fanout = fanout
        self.file   = Path(file)
        if self.file.exists(): self._load()

    # ------------ helpers ---------------
    def _index_seq(self, key:str)->list[int]:
        h=_hash(key)
        seq=[]
        for _ in range(self.depth):
            seq.append(h % self.fanout)
            h//= self.fanout
        return seq

    # ------------ operations ------------
    def put(self, key:str, value:tp.Any):
        node=self.root
        for idx in self._index_seq(key):
            if idx not in node.children: node.children[idx]=Node()
            node=node.children[idx]
        node.value=value
        self._dump()

    def get(self, key:str, default=None):
        node=self.root
        for idx in self._index_seq(key):
            if idx not in node.children: return default
            node=node.children[idx]
        return node.value if node.value is not None else default

    def _dump(self):
        # serialização bem simples (profunda)
        out={}
        def rec(n:Node,pfx:str):
            if n.value is not None: out[pfx] = n.value
            for k,ch in n.children.items():
                rec(ch, pfx+hex(k)[2:])
        rec(self.root,"")
        self.file.write_text(json.dumps(out))

    def _load(self):
        data=json.loads(self.file.read_text())
        for k,v in data.items(): self.put(k,v)

# ==========================================================
# Interface para outros módulos
# ==========================================================
def init_fractal_memory(cfg:dict)->FractalMemory:
    """Build a FractalMemory from the optional ``fm_depth``, ``fm_fanout`` and
    ``fm_file`` entries of ``cfg`` (defaults: 5, 16, "fractal_mem.json")."""
    depth = cfg.get("fm_depth",5)
    fanout = cfg.get("fm_fanout",16)
    target = cfg.get("fm_file","fractal_mem.json")
    return FractalMemory(depth=depth, fanout=fanout, file=target)

In [None]:
# lhra_core/semantic_memory.py
# ==========================================================
import json, os, threading, typing as tp
from pathlib import Path

class SemanticMemory:
    """
    Memória factual de longo prazo salva em disco.
    • key -> value  (qualquer JSON‑serializável)
    • thread‑safe usando Lock simples
    """
    def __init__(self, file: str | Path = "semantic_memory.json"):
        self.path   = Path(file)
        self.data: dict[str, tp.Any] = {}
        self._lock  = threading.Lock()
        self._load()

    # -------- Básico ----------
    def store(self, key: str, value: tp.Any):
        with self._lock:
            self.data[key] = value
            self._save()

    def retrieve(self, key: str, default=None):
        return self.data.get(key, default)

    def search_prefix(self, prefix:str)->dict[str,tp.Any]:
        return {k:v for k,v in self.data.items() if k.startswith(prefix)}

    # -------- Persistência ----------
    def _save(self):
        self.path.write_text(json.dumps(self.data, indent=2))

    def _load(self):
        if self.path.exists():
            try:
                self.data = json.loads(self.path.read_text())
            except Exception:
                self.data = {}

# helper
def init_semantic_memory(cfg:dict)->SemanticMemory:
    """Create a ``SemanticMemory`` using ``cfg["semantic_file"]``.

    Falls back to ``"semantic_memory.json"`` when the key is absent.
    """
    storage_file = cfg.get("semantic_file", "semantic_memory.json")
    return SemanticMemory(storage_file)

In [None]:
# lhra_core/symbolic_reasoner.py
# ==========================================================
import numpy as np
from sklearn.cluster import KMeans
from collections import defaultdict
from .semantic_memory import SemanticMemory

try:
    import networkx as nx                  # opcional (visualização de regras)
except ImportError:
    nx = None

class SymbolicReasoner:
    """
    Clusters observed states with K-Means and turns the centroids into
    textual rules, which are also stored in the semantic memory.
    The same centroids are exposed as 'explore_cluster' sub-goal dicts.
    """
    def __init__(self, state_dim:int, semantic_mem:SemanticMemory,
                 n_clusters:int=6, random_state:int=42):
        self.state_dim     = state_dim
        self.semantic_mem  = semantic_mem
        self.n_clusters    = n_clusters
        self.kmeans        = KMeans(n_clusters=n_clusters,
                                    random_state=random_state)
        self._buffer: list[np.ndarray] = []   # most recent observed states
        self.cluster_centroids: np.ndarray|None = None
        self.rules: dict[int,list[str]] = defaultdict(list)

    # ---------- collection ----------
    def observe_state(self, state:list[float]):
        """Append ``state`` to the buffer, keeping only the last 5000 entries."""
        self._buffer.append(np.array(state))
        if len(self._buffer) > 5000:
            self._buffer = self._buffer[-5000:]

    # ---------- processing ----------
    def extract_rules(self)->None:
        """Fit K-Means on the buffered states and regenerate the rules.

        Does nothing until at least ``n_clusters * 5`` states were observed.
        Each centroid yields one rule per examined dimension and is stored
        in the semantic memory under ``centroid:cluster_<i>``.
        """
        if len(self._buffer) < self.n_clusters*5:   # not enough data yet
            return
        X = np.stack(self._buffer)
        self.kmeans.fit(X)
        self.cluster_centroids = self.kmeans.cluster_centers_

        # One rule per dimension: "state[d] is close to the centroid value".
        # Only the first 4 dimensions are examined, clamped to the actual
        # centroid width so a too-large state_dim cannot index out of range.
        dims = min(4, self.state_dim, X.shape[1])
        self.rules.clear()
        for i,c in enumerate(self.cluster_centroids):
            cluster_name = f"cluster_{i}"
            self.rules[i] = [
                f"IF state[{d}] ≈ {c[d]:.2f} THEN likely in {cluster_name}"
                for d in range(dims)
            ]
            # persist the centroid in the semantic memory
            self.semantic_mem.store(f"centroid:{cluster_name}", c.tolist())

    # ---------- sub-goals ----------
    def generate_sub_goals(self)->list[dict]:
        """Return one 'explore_cluster' goal dict per known centroid."""
        if self.cluster_centroids is None:
            return []
        return [
            {"type": "explore_cluster", "target": c.tolist(), "id": idx}
            for idx, c in enumerate(self.cluster_centroids)
        ]

    # ---------- optional visualisation ----------
    def export_graph(self, file="rules_graph.png"):
        """Write the rules as ``rules.dot`` and render them to ``file``.

        No-op when networkx is unavailable or no rules exist. Rendering
        needs the Graphviz ``dot`` executable; when it is missing the DOT
        file is still written and a message is printed.
        """
        if nx is None or not self.rules: return
        # Local import: this module imports neither os nor subprocess.
        import subprocess
        G = nx.DiGraph()
        for cid, rs in self.rules.items():
            for r in rs: G.add_edge(f"cluster_{cid}", r)
        nx.drawing.nx_pydot.write_dot(G, "rules.dot")
        try:
            # Argument list (no shell): the file name is never interpreted
            # by a shell, unlike the former string-concatenated command.
            subprocess.run(["dot", "-Tpng", "rules.dot", "-o", str(file)],
                           check=False)
        except OSError as exc:
            print(f"[SymbolicReasoner] could not run Graphviz 'dot': {exc}")

In [None]:
# lhra_core/planning_module.py
# ==========================================================
import time, typing as tp
from collections import deque

class Task:
    def __init__(self, name:str, params:dict|None=None, cond:tp.Callable[...,bool]|None=None):
        self.name   = name
        self.params = params or {}
        self.cond   = cond or (lambda *_: True)
        self.done   = False

    def __repr__(self): return f"Task({self.name})"

class PlanningModule:
    """
    HTN simplificado:  goal -> lista hierárquica de Sub‑Tarefas (tasks_queue)
    """
    def __init__(self):
        self.current_plan: deque[Task] = deque()
        self.cot: list[str] = []             # Chain‑of‑Thought textual

    def build_hierarchical_plan(self, goal:dict)->None:
        self.current_plan.clear(); self.cot.clear()
        gtype = goal.get("type")

        if gtype == "explore_cluster":
            self.cot.append(f"— Meta: Explorar {goal['id']}")
            self.current_plan.append(Task("NavigateToCentroid",
                                   {"centroid":goal["target"]}))
            self.current_plan.append(Task("CollectObservations",
                                   {"duration": 50}))
        else:
            self.cot.append(f"— Meta genérica {gtype}")
            self.current_plan.append(Task("Idle", {"duration":20}))

    # Chamado a cada step
    def step(self, env_state)->list[str]:
        if not self.current_plan: return []

        task=self.current_plan[0]
        if task.cond(env_state):       # pronto para executar
            self.execute(task, env_state)
            task.done=True
            self.current_plan.popleft()
        return self.cot[-1:]           # devolve última CoT para log

    def execute(self, task:Task, env_state):
        self.cot.append(
            f"{time.time():.0f}: Executando {task.name} com {task.params}"
        )
        # efeito fictício; integração real depende da politica

In [None]:
# lhra_core/goal_manager.py
# ==========================================================
import time, uuid, typing as tp

class Goal:
    """Runtime record of one goal built from a plain goal dict.

    Reads ``type`` (required) and the optional ``uid``, ``target`` and
    ``deadline`` keys; the defaults are a random UUID4 string, ``None``
    and "600 seconds from now".
    """

    def __init__(self, goal_dict:dict):
        fallback_uid = str(uuid.uuid4())
        self.id = goal_dict.get("uid", fallback_uid)
        self.type = goal_dict["type"]
        self.payload = goal_dict.get("target", None)
        default_deadline = time.time() + 600
        self.deadline = goal_dict.get("deadline", default_deadline)
        self.progress = 0.0
        self.done = False

class GoalManager:
    """
    Keeps the set of active goals: add, update progress, drop finished ones.
    """
    def __init__(self):
        self.active: dict[str, Goal] = {}

    def add_goals(self, goals:list[dict]):
        """Wrap each goal dict in a ``Goal`` and register it by its id."""
        for g in goals:
            goal = Goal(g)
            self.active[goal.id] = goal

    def update_progress(self, env_state):
        """Update progress of every 'explore_cluster' goal for ``env_state``.

        Progress is ``1 / (distance + 1e-4)`` (kept at its maximum so far),
        and the goal is marked done once the Euclidean distance to its
        payload drops below 0.05. Goals without a payload are skipped.
        """
        for goal in self.active.values():
            if goal.type != "explore_cluster" or goal.payload is None:
                continue
            dist = float(sum((a-b)**2 for a,b in zip(env_state,goal.payload))**0.5)
            # ASCII minus in the exponent: the former literal contained a
            # non-breaking hyphen, which is a syntax error.
            goal.progress = max(goal.progress, 1/(dist+1e-4))
            if dist < .05: goal.done=True

    def remove_done(self):
        """Drop every goal already flagged as done."""
        self.active = {k:v for k,v in self.active.items() if not v.done}

    # External access
    def primary_goal(self)->"Goal | None":
        """Return the active goal with the earliest deadline, or ``None``."""
        if not self.active: return None
        return min(self.active.values(), key=lambda g:g.deadline)

In [None]:
# lhra_core/auto_expand_prune.py
# ==========================================================
"""
Gerencia expansão (adiciona blocos) e poda (remove blocos pouco relevantes)
da política `torch.nn.Module`.  Mantém até K checkpoints em heap ordenado
pelo melhor `avg_reward`.
"""
from __future__ import annotations
import torch, heapq, time, copy
from collections import deque
from dataclasses import dataclass, field

@dataclass(order=True)
class _Chkpt:
    """Checkpoint record; ordering and equality use ``score`` only."""
    # ranking key
    score: float
    # creation timestamp, excluded from comparisons
    ts: float = field(compare=False)
    # saved state dictionaries, excluded from comparisons
    data: dict = field(compare=False)

class AutoExpandPruneManager:
    """Adds/removes ``extra_<n>`` blocks on a policy and keeps top-K checkpoints.

    Extra blocks are direct children of the policy named ``extra_<digits>``.
    Other attributes that merely start with ``extra_`` (such as a
    ``extra_blocks`` container) and the inner layers of a block are not
    counted as extras.
    """
    def __init__(self,
                 policy: torch.nn.Module,
                 optimizer: torch.optim.Optimizer,
                 max_extras:int = 4,
                 expand_threshold: float = 0.05,      # relative loss increase (+5 %)
                 prune_threshold : float =-0.05,      # relative loss decrease (-5 %)
                 cooldown_steps  : int   = 500,
                 top_k_ckpt      : int   = 3):
        self.policy  = policy
        self.optimizer = optimizer
        self.max_extras = max_extras
        self.expand_threshold = expand_threshold
        self.prune_threshold  = prune_threshold
        self.cooldown_steps   = cooldown_steps
        self.last_action_step = 0
        self.step_counter     = 0

        self.block_stats: dict[str,float] = {}
        self.checkpoints : list[_Chkpt] = []
        self.top_k_ckpt  = top_k_ckpt

    # -------------------------------------------------------
    # checkpoint helpers
    def _make_full_state(self)->dict:
        """Return deep copies of the policy and optimizer state dicts."""
        return {
            "policy"   : copy.deepcopy(self.policy.state_dict()),
            "optim"    : copy.deepcopy(self.optimizer.state_dict()),
        }

    def push_ckpt(self, score:float):
        """Snapshot the current state under ``score``; keep the best K."""
        heapq.heappush(self.checkpoints,
                       _Chkpt(score, time.time(), self._make_full_state()))
        # min-heap on score: popping discards the worst checkpoint
        if len(self.checkpoints) > self.top_k_ckpt:
            heapq.heappop(self.checkpoints)

    def restore_best(self):
        """Load the highest-scoring checkpoint; return False if none exists.

        NOTE(review): ``load_state_dict`` is strict, so restoring a
        checkpoint taken before an expand/prune changed the set of extra
        blocks will raise — confirm how callers handle that.
        """
        if not self.checkpoints: return False
        best = max(self.checkpoints).data
        self.policy.load_state_dict(best["policy"])
        self.optimizer.load_state_dict(best["optim"])
        return True

    # -------------------------------------------------------
    # expansion / pruning
    def maybe_expand_or_prune(self,
                              current_loss: float,
                              ref_loss    : float):
        """Expand or prune depending on the relative loss change.

        Counts one step per call and does nothing while fewer than
        ``cooldown_steps`` steps have passed since the last structural
        change. Otherwise it expands when the relative increase exceeds
        ``expand_threshold`` (and there is room), or prunes when it is
        below ``prune_threshold`` (and an extra block exists).
        """
        self.step_counter += 1
        if self.step_counter - self.last_action_step < self.cooldown_steps:
            return                                  # still cooling down

        delta = (current_loss - ref_loss) / (abs(ref_loss)+1e-8)

        if delta > self.expand_threshold and self._n_extras() < self.max_extras:
            self._expand_block()
        elif delta < self.prune_threshold and self._n_extras()>0:
            self._prune_block()

    # ---------- implementation ----------
    def _extra_names(self)->list:
        """Names of the direct children of the policy called ``extra_<digits>``."""
        prefix = "extra_"
        return [name for name, _ in self.policy.named_children()
                if name.startswith(prefix) and name[len(prefix):].isdigit()]

    def _n_extras(self)->int:
        """Number of extra blocks currently attached to the policy."""
        return len(self._extra_names())

    def _expand_block(self):
        """Attach a new Linear+BatchNorm+ReLU block under a free name."""
        taken = set(self._extra_names())
        idx = 0
        # Smallest free index: after a prune the count may equal the index
        # of a block that still exists, which must not be overwritten.
        while f"extra_{idx}" in taken:
            idx += 1
        new_block = torch.nn.Sequential(
            torch.nn.Linear(128, 128),
            torch.nn.BatchNorm1d(128),
            torch.nn.ReLU(),
        )
        # Keep the new block on the same device as the existing parameters.
        ref_param = next(self.policy.parameters(), None)
        if ref_param is not None:
            new_block.to(ref_param.device)
        block_name = f"extra_{idx}"
        setattr(self.policy, block_name, new_block)
        self.block_stats[block_name] = 1.0          # initial weight
        self.last_action_step = self.step_counter
        print(f"[AutoExpand] adicionou {block_name}")

    def _prune_block(self):
        """Remove the extra block with the lowest importance score."""
        self._update_block_stats()
        if not self.block_stats:
            return                                   # nothing to prune
        block_name = min(self.block_stats, key=self.block_stats.get)
        delattr(self.policy, block_name)
        self.block_stats.pop(block_name, None)
        self.last_action_step = self.step_counter
        print(f"[AutoPrune] removeu {block_name}")

    # ---------- importance ----------
    def _update_block_stats(self):
        """Recompute importance of each extra block as 0.7*|w| + 0.3*|grad|.

        The stats dict is rebuilt from the blocks that exist right now, so
        it never contains stale or nested-module names.
        """
        stats: dict[str,float] = {}
        for name in self._extra_names():
            module = getattr(self.policy, name)
            with torch.no_grad():
                w_norm = sum(p.norm().item() for p in module.parameters())
                # parameters without a gradient contribute 0
                g_norm = sum((p.grad.norm().item() if p.grad is not None else 0)
                             for p in module.parameters())
            stats[name] = 0.7*w_norm + 0.3*g_norm
        self.block_stats = stats

In [None]:
# lhra_core/gene_pool.py
# ==========================================================
"""
Mecanismo evolucionário simples:
• Armazena top‑N 'genes' (snapshot de pesos + meta‑params).
• Mutação leve (+/‑5 %) nos pesos do meta‑controlador (DynamicWeightAdapter).
• Torneio offline de 600 steps com seeds fixas.
"""
from __future__ import annotations
import copy, random, torch, numpy as np

class Gene:
    """Snapshot of an agent: weights, meta-controller weights, score, seed."""

    def __init__(self, state_dict:dict, meta_weights:torch.Tensor,
                 score:float, seed:int):
        # Clone so later in-place edits of the caller's tensor do not leak in.
        meta_copy = meta_weights.clone()

        self.state_dict = state_dict
        self.meta_weights = meta_copy
        self.score = score
        self.seed = seed

class GenePool:
    def __init__(self, capacity:int=5):
        self.capacity = capacity
        self.pool: list[Gene] = []

    # ---------- armazenamento ----------
    def consider(self, gene:Gene):
        self.pool.append(gene)
        self.pool.sort(key=lambda g: g.score, reverse=True)
        self.pool = self.pool[:self.capacity]

    # ---------- mutação ----------
    @staticmethod
    def mutate_meta(meta:torch.Tensor, noise:float=0.05)->torch.Tensor:
        with torch.no_grad():
            return meta * (1.0 + noise*torch.randn_like(meta))

    # ---------- seleção ----------
    def sample_best(self)->Gene|None:
        return self.pool[0] if self.pool else None

    # ---------- torneio offline ----------
    def tournament(self, agent_factory, env_factory,
                   steps:int=600, seed:int=123)->None:
        if not self.pool: return
        challenger = random.choice(self.pool)
        mutated_meta = self.mutate_meta(challenger.meta_weights)
        agent = agent_factory(challenger.state_dict, mutated_meta)

        env = env_factory(seed)
        total=0
        state,_ = env.reset(seed=seed)
        for _ in range(steps):
            action = agent.act(state)
            state, rew, term, trunc,_ = env.step(action)
            total += rew
            if term or trunc: state,_ = env.reset()
        # Armazena gene se melhor
        self.consider(Gene(agent.state_dict(), mutated_meta, total/steps, seed))

In [None]:
# lhra_core/code_synthesizer.py
# ==========================================================
"""
Gera código Python dinamicamente, carrega como módulo e injeta bloco
na política. Remove se não houver ganho >5 % após `eval_window` steps.
"""
import importlib.util, inspect, os, random, time, types, uuid, textwrap
from pathlib import Path
import torch, shutil

PLUGIN_DIR = Path("generated_plugins")
PLUGIN_DIR.mkdir(exist_ok=True)

class ActivePlugin:
    """Bookkeeping record for one injected plugin."""

    def __init__(self, name:str, inj_step:int, base_score:float):
        self.name = name              # attribute name used on the policy
        self.inj_step = inj_step      # step index stored at injection time
        self.base_score = base_score  # score stored at injection time
        self.removed = False          # starts as not removed

class CodeSynthesizer:
    """Generates small ``torch.nn.Module`` plugins and attaches them to a policy.

    A plugin is injected every 800 steps. After ``eval_window`` steps it is
    removed unless the score improved by at least ``min_improve`` relative
    to the score recorded at injection time.
    """
    def __init__(self, policy:torch.nn.Module,
                 eval_window:int=300, min_improve:float=0.05):
        self.policy = policy
        self.eval_window = eval_window
        self.min_improve = min_improve
        self.active_plugins: dict[str, ActivePlugin] = {}

    # ---------- main cycle ----------
    def step(self, step_idx:int, current_score:float):
        """Inject a plugin when due, then evaluate every active plugin."""
        if step_idx % 800 == 0:
            self._inject_plugin(step_idx, current_score)
        for name, plug in list(self.active_plugins.items()):
            if plug.removed: continue
            if step_idx - plug.inj_step < self.eval_window:
                continue
            # abs(base) + eps: the denominator can never be zero, which
            # abs(base + eps) allowed for base == -eps.
            delta = (current_score - plug.base_score)/(abs(plug.base_score)+1e-9)
            if delta < self.min_improve:
                self._remove_plugin(name)
            else:
                # keep it, but restart the evaluation window
                plug.base_score = current_score
                plug.inj_step   = step_idx

    # ---------- generation ----------
    def _inject_plugin(self, step_idx:int, base_score:float):
        """Generate plugin source, import it from disk and attach an instance."""
        mode = random.choice(["linear","residual","transformer"])
        code_str, class_name = self._generate_code(mode)
        file_path = PLUGIN_DIR/f"plug_{uuid.uuid4().hex}.py"
        file_path.write_text(code_str)

        spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        block = getattr(module, class_name)()

        # Removed plugins stay in the dict, so the length is a unique counter.
        plug_name = f"plug_{len(self.active_plugins)}"
        setattr(self.policy, plug_name, block)
        self.active_plugins[plug_name] = ActivePlugin(plug_name, step_idx, base_score)
        print(f"[CodeSynth] injetou {plug_name} ({mode})")

    # ---------- removal ----------
    def _remove_plugin(self, plug_name:str):
        """Detach ``plug_name`` from the policy and flag it as removed.

        The plugin is flagged even when the attribute is already gone;
        otherwise it would be re-evaluated on every later step.
        """
        plug = self.active_plugins.get(plug_name)
        if plug is None:
            return
        if hasattr(self.policy, plug_name):
            delattr(self.policy, plug_name)
        plug.removed = True
        print(f"[CodeSynth] removeu {plug_name} (sem ganho)")

    # ---------- templates ----------
    def _generate_code(self, mode:str)->tuple[str,str]:
        """Return ``(source, class_name)`` for a plugin of the given ``mode``.

        The source defines ``__init__`` and ``forward`` as two separate
        methods of the generated class. Raises ``ValueError`` for an
        unknown ``mode``.
        """
        class_name = "ExtraBlock"
        linear_template = (
            "self.block = torch.nn.Linear(128, 128)",
            "return torch.relu(self.block(x))",
        )
        if mode == "linear":
            init_src, forward_src = linear_template
        elif mode == "residual":
            init_src = (
                "self.block = torch.nn.Sequential(\n"
                "    torch.nn.Linear(128, 128), torch.nn.ReLU(),\n"
                "    torch.nn.Linear(128, 128))"
            )
            forward_src = "return x + 0.5 * self.block(x)"
        elif mode == "transformer":
            # needs the optional `transformers` package; else use linear
            try:
                import transformers  # noqa: F401
            except ImportError:
                init_src, forward_src = linear_template
            else:
                # The 128-d features are projected up to the model's hidden
                # size before being passed as input embeddings.
                init_src = (
                    "from transformers import DistilBertModel\n"
                    "self.bert = DistilBertModel.from_pretrained('distilbert-base-uncased')\n"
                    "self.proj_in = torch.nn.Linear(128, self.bert.config.dim)"
                )
                forward_src = (
                    "hidden = self.proj_in(x).unsqueeze(0)\n"
                    "out = self.bert(inputs_embeds=hidden).last_hidden_state.squeeze(0)\n"
                    "return out[:, :128]  # keep the first 128 features"
                )
        else:
            raise ValueError(f"unknown plugin mode: {mode!r}")

        code = (
            "import torch\n"
            f"class {class_name}(torch.nn.Module):\n"
            "    def __init__(self):\n"
            "        super().__init__()\n"
            f"{textwrap.indent(init_src, ' ' * 8)}\n"
            "    def forward(self, x):\n"
            f"{textwrap.indent(forward_src, ' ' * 8)}\n"
        )
        return code, class_name

In [None]:
# lhra_core/metrics_coordinator.py
# ==========================================================
"""
Unifica métricas de caos, organização, memória, policy_loss, world_loss e estabilidade.
Mantém histórico (deque) para normalização por Z‑score ou min‑max.
"""
from __future__ import annotations
from collections import deque
import numpy as np

_METRICS = ("chaos","org","mem","policy","world","stability")

class MetricsCoordinator:
    """Collects six metrics per step and exposes their z-scored snapshot.

    Each metric has a bounded history (``hist`` entries); a snapshot value
    is ``(value - mean) / (std + 1e-6)`` computed over that history.
    """

    def __init__(self, hist:int=500):
        self.hist = hist
        self.buffers = {}
        self.last_snapshot:dict[str,float] = {}
        for metric in _METRICS:
            self.buffers[metric] = deque(maxlen=hist)
            self.last_snapshot[metric] = 0.

    # -------------------------------------------------------
    def update(self, step:int,
               chaos:float, org:float, mem:float,
               p_loss:float, w_loss:float):
        """Record one step of metrics and refresh the normalized snapshot."""
        # stability is derived before the new values enter the buffers
        stability = self._compute_stability(step)
        vals = {
            "chaos": chaos,
            "org": org,
            "mem": mem,
            "policy": p_loss,
            "world": w_loss,
            "stability": stability,
        }
        for metric, value in vals.items():
            self.buffers[metric].append(value)
        self.last_snapshot = self._normalize(vals)

    # -------------------------------------------------------
    def snapshot(self)->dict[str,float]:
        """Return the most recent normalized metrics."""
        return self.last_snapshot

    # -------------------------------------------------------
    def _normalize(self, vals:dict[str,float])->dict[str,float]:
        """Z-score each value against its own history buffer."""
        normalized = {}
        for metric, value in vals.items():
            history = self.buffers[metric]
            samples = np.array(history) if history else np.array([value])
            mu = samples.mean()
            sigma = samples.std()+1e-6
            normalized[metric] = (value-mu)/sigma
        return normalized

    # -------------------------------------------------------
    def _compute_stability(self, step:int)->float:
        """Return ``1 - len(policy buffer) / total buffered entries``.

        Yields 1.0 while all buffers are empty; ``step`` is not used.
        """
        total = sum(len(buf) for buf in self.buffers.values())
        if total == 0:
            return 1.
        policy_entries = len(self.buffers["policy"])
        return max(0., 1. - policy_entries/total)

In [None]:
# lhra_core/cognitive_scheduler.py
# ==========================================================
"""
Atenção baseada em métricas: devolve um dicionário prioridade∈[0,1]
para cada módulo registrado.
"""
from __future__ import annotations
import numpy as np

class CognitiveScheduler:
    """Turns normalized metrics into per-module priorities.

    Each known module has a fixed weight vector over the metrics
    (chaos, org, mem, policy, world, stability); unknown modules use zeros.
    """

    def __init__(self, modules:list[str]):
        self.modules = modules
        # fixed influence of each metric on each module's priority
        self.weights = {
            "world_model" : np.array([+0.0,-0.2,-0.1,-0.2,+0.9,-0.3]),
            "symbolic"    : np.array([+0.2,+0.8,+0.2,-0.1,-0.1,-0.1]),
            "memory"      : np.array([+0.0,+0.1,+0.9,-0.2,-0.1,+0.0]),
            "planner"     : np.array([+0.1,+0.3,+0.1,+0.0,+0.0,-0.1]),
            "gene_pool"   : np.array([-0.1,-0.2,-0.2,+0.3,+0.2,-0.4]),
            "synth"       : np.array([-0.3,-0.3,-0.3,+0.8,+0.2,-0.2]),
        }

    def compute_priorities(self, norm_metrics:dict[str,float])->dict[str,float]:
        """Return ``{module: priority}``.

        Each raw score is ``tanh(w . metrics)`` clipped at zero; scores are
        then divided by their sum, or by 1 when every score is zero.
        """
        metric_order = ("chaos","org","mem","policy","world","stability")
        vec = np.array([norm_metrics[name] for name in metric_order])

        raw = {}
        for module in self.modules:
            weights = self.weights.get(module, np.zeros_like(vec))
            activation = float(np.tanh(np.dot(weights,vec)))   # in [-1, 1]
            raw[module] = max(0., activation)                   # drop negatives

        total = sum(raw.values())
        if not total:
            total = 1.
        return {module: score/total for module, score in raw.items()}

In [None]:
# lhra_core/tool_manager.py
# ==========================================================
"""
Ferramentas externas registráveis. Uso:
    res = tool_manager.invoke("web_search", query="LLMs")
"""
import requests, json, os
from pathlib import Path

class ToolManager:
    """Registry of named external tools invoked with keyword arguments."""

    def __init__(self):
        self.tools:dict[str,callable] = {}
        # register the default tools
        self.register("web_search",   self._web_search)
        self.register("file_read",    self._file_read)

    def register(self, name:str, func:callable):
        """Register (or replace) the tool ``name``."""
        self.tools[name]=func

    def invoke(self, name:str, **kwargs):
        """Call the tool ``name`` with ``kwargs``.

        Raises:
            ValueError: if no tool is registered under ``name``.
        """
        if name not in self.tools:
            raise ValueError(f"Tool {name} não existe")
        return self.tools[name](**kwargs)

    # -------------------------------------------------------
    # Default tools
    def _web_search(self, query:str, top_k:int=3):
        """Return up to ``top_k`` result texts for ``query``.

        Errors are reported as a one-element list (best-effort tool).
        NOTE(review): the JSON "instant answer" API may live on
        api.duckduckgo.com rather than this host — confirm.
        """
        url="https://duckduckgo.com/"
        try:
            # `params` URL-encodes the query; interpolating it raw let
            # characters such as '&' or '#' corrupt the request.
            data=requests.get(url, params={"q": query, "format": "json"},
                              timeout=10).json()
            # Some related-topic entries have no "Text" field; skip them
            # instead of failing the whole search.
            res=[a["Text"] for a in data.get("RelatedTopics",[])
                 if isinstance(a, dict) and "Text" in a][:top_k]
            return res or ["Sem resultados."]
        except Exception as e:
            return [f"Erro: {e}"]

    def _file_read(self, path:str, max_chars:int=2000):
        """Return the first ``max_chars`` characters of the file at ``path``.

        A missing path, or one that is not a regular file (for example a
        directory), yields the "not found" message instead of raising.
        """
        p = Path(path)
        if not p.is_file(): return f"Arquivo {path} não encontrado."
        with p.open() as fh:
            if max_chars >= 0:
                return fh.read(max_chars)     # avoid loading the whole file
            return fh.read()[:max_chars]      # keep slice semantics for negatives

In [None]:
# lhra_core/narrative_module.py
# ==========================================================
"""
Armazena raciocínio textual (“chain‑of‑thought”) e permite gerar narrativa
contrafactual simples para debugging.
"""
from __future__ import annotations
from pathlib import Path, PurePath
import json, time

class NarrativeModule:
    """Buffers per-step narrative entries and writes them out per episode."""

    def __init__(self, log_dir="narratives"):
        self.dir = Path(log_dir)
        # parents=True: a nested log directory must not fail on creation
        self.dir.mkdir(parents=True, exist_ok=True)
        self.buffer = []

    def log_step(self, state, action, reward, info:str=""):
        """Append one entry; the state text is truncated to 120 characters."""
        entry = dict(t=time.time(),
                     s=str(state)[:120],
                     a=str(action),
                     r=float(reward),
                     info=info)
        self.buffer.append(entry)

    def flush(self, episode:int):
        """Write the buffer to ``ep_<episode>.json`` and clear it.

        Does nothing when the buffer is empty.
        """
        if not self.buffer: return
        f = self.dir/f"ep_{episode}.json"
        f.write_text(json.dumps(self.buffer, indent=2))
        self.buffer.clear()

    # -------------------------------------------------------
    def generate_contrafactual(self, what_if:str)->str:
        """
        Return a canned counterfactual sentence for ``what_if``.

        The estimate is simply the buffered mean reward scaled by 1.2;
        returns "Sem dados." when nothing is buffered.
        """
        if not self.buffer: return "Sem dados."
        avg_r = sum(b["r"] for b in self.buffer)/len(self.buffer)
        return (f"Se '{what_if}', estimamos que a recompensa média "
                f"poderia mudar de {avg_r:.2f} para {avg_r*1.2:.2f}.")

In [None]:
"""
integration_app.py
==================
• Junta LHRA_Agent (núcleo RL/autoevolução) +
  DeepSeek LLM (API) via LangChain +
  Gradio Chat UI com histórico.

Executar em Colab/local:
------------------------------------------------
!pip install -q langchain~=0.1.0 gradio~=4.10.0 \
                 sentence-transformers requests python-dotenv
# -> ou chame: pip install -r requirements.txt  (gerado abaixo)

# coloque sua chave API (DeepSeek) em .env  (DEEPSEEK_API_KEY=...)
python integration_app.py
------------------------------------------------
"""

# -------------------------  requirements.txt  --------------------------
# Pinned dependency list, written verbatim to requirements.txt below.
# NOTE(review): confirm every pin resolves on PyPI (the sentence-transformers
# version in particular) before relying on the generated file.
REQUIREMENTS_TXT = """
langchain==0.1.0
gradio==4.10.0
sentence-transformers==0.12.1
requests~=2.31.0
python-dotenv>=1.0.0
gymnasium~=0.29             # se for rodar ambientes do LHRA
"""

# Write requirements.txt if it does not exist yet (import-time side effect;
# an existing file is never overwritten).
import os, textwrap, sys, json, time, threading, inspect
from pathlib import Path
requirements_file = Path("requirements.txt")
if not requirements_file.exists():
    requirements_file.write_text(textwrap.dedent(REQUIREMENTS_TXT).strip()+"\n")

# -------------------------  DeepSeek LLM Wrapper  ----------------------
from langchain.llms.base import LLM
from typing import Any, Dict, List, Optional
import requests, dotenv, math

# Load a local .env file (if any) before DeepSeekLLM reads DEEPSEEK_API_KEY
# from the environment in its class body below.
dotenv.load_dotenv()

class DeepSeekLLM(LLM):
    """
    Minimal LangChain wrapper around a DeepSeek completion endpoint.
    The endpoint and payload below are placeholders; adapt them to the
    official API documentation.
    """
    model: str = "deepseek-chat-v1"        # hypothetical model id
    temperature: float = 0.7
    max_tokens: int = 512
    api_key: str = os.getenv("DEEPSEEK_API_KEY", "")

    @property
    def _llm_type(self) -> str:
        return "deepseek-llm"

    # ------------- main call ---------------
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        **kwargs: Any
    ) -> str:
        """Send ``prompt`` to the API and return the stripped completion.

        Raises ``ValueError`` when no API key is configured. Any failure
        during the request or while reading the response is returned as a
        ``"[DeepSeek API ERROR] ..."`` string instead of being raised.
        Extra ``kwargs`` are accepted but not used.
        """
        if not self.api_key:
            raise ValueError("Defina DEEPSEEK_API_KEY no .env!")

        endpoint = "https://api.deepseek.com/v1/completions"  # placeholder endpoint
        body = dict(
            model=self.model,
            prompt=prompt,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )
        if stop:
            body["stop"] = stop

        auth_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        try:
            response = requests.post(endpoint, json=body,
                                     headers=auth_headers, timeout=60)
            response.raise_for_status()
            # placeholder response layout; adjust to the real API
            first_choice = response.json()["choices"][0]
            return first_choice["text"].strip()
        except Exception as e:
            return f"[DeepSeek API ERROR] {e}"

# -------------------  LHRA Agent (stub de integração)  -----------------
# Assumindo que as iterações 1‑6 já estão em lhra_core/.
# Try the real agent first; the current working directory is appended to
# sys.path so that a local lhra_core package can be found.
try:
    sys.path.append(str(Path().absolute()))
    from lhra_core.agent import LHRA_Agent
except Exception as e:
    # Fallback when the full implementation is unavailable (any failure
    # while importing it): define a minimal stand-in with the same name.
    class LHRA_Agent:
        def __init__(self):
            self.state = "[init]"
            self.step_id = 0
        def step_user_input(self, user_msg:str) -> str:
            # Advance the step counter and derive a new state label from it.
            self.step_id += 1
            self.state = f"s{self.step_id}"
            # textual draft that the LLM refines afterwards
            return f"(internal‑state={self.state})\nUser said: {user_msg}"

# -------------------  LangChain (Prompt & Chain) -----------------------
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
from langchain.chains import LLMChain

# String prompt with two variables: the running chat history (filled in by
# the memory object) and the draft produced by the LHRA agent.
prompt_template = PromptTemplate(
    input_variables=["chat_history", "lhra_draft"],
    template=(
        "Você é um assistente avançado que refina o rascunho do agente LHRA, "
        "melhorando clareza e precisão.\n"
        "Histórico da conversa:\n{chat_history}\n"
        "Rascunho do LHRA:\n{lhra_draft}\n"
        "Resposta refinada:"
    ),
)

# return_messages=False: the history is interpolated into a plain string
# prompt, so it must be rendered as text rather than as a list of message
# objects (whose repr would otherwise end up inside the prompt).
memory_chain = ConversationBufferMemory(memory_key="chat_history",
                                         return_messages=False)
deepseek_llm = DeepSeekLLM()
chain = LLMChain(llm=deepseek_llm,
                 prompt=prompt_template,
                 memory=memory_chain)

# ---------------------------  Gradio UI  -------------------------------
import gradio as gr

agent = LHRA_Agent()         # core agent instance shared by the UI callback

def chat_with_agent(user_input, history):
    """Chat callback: return ``("", updated_history)``.

    For a non-empty message the agent produces a draft, the LLM chain
    refines it, and the ``(user_input, refined)`` pair is appended to a new
    history list. An empty message leaves the history untouched. The first
    returned item is always the empty string.
    """
    if user_input:
        # 1) the agent produces a draft
        draft = agent.step_user_input(user_input)
        # 2) the LLM chain refines it
        answer = chain.run(lhra_draft=draft)
        history = history + [(user_input, answer)]
    return "", history

# Chat UI: a chatbot pane, a text box wired to chat_with_agent, and a
# button that empties the chatbot pane.
with gr.Blocks(title="LHRA + DeepSeek Chat") as demo:
    gr.Markdown("### <center>🤖 LHRA&nbsp;Agent + DeepSeek&nbsp;LLM</center>")
    chatbot = gr.Chatbot([], elem_id="chatbot", height=450)
    msg = gr.Textbox(label="Digite aqui")
    clear = gr.Button("Limpar")

    msg.submit(chat_with_agent, inputs=[msg, chatbot], outputs=[msg, chatbot])
    # One output component -> return exactly one value. The former 2-tuple
    # would have been handed to the chatbot as its value.
    clear.click(lambda: [], None, [chatbot])

# ---------------------------  main  ------------------------------------
def main():
    """Print two setup hints, then launch the Gradio app."""
    hints = (
        "🔑 Coloque sua chave DEEPSEEK_API_KEY em .env  (ou export).",
        "📦 Se faltar dependências, instale: pip install -r requirements.txt",
    )
    for hint in hints:
        print(hint)
    demo.launch()

if __name__ == "__main__":
    main()

In [None]:
"""
lhra_monitor.py  (Iteração 8)
=============================
• Logger & Checkpointer para núcleo LHRA
• Painel TensorBoard (/runs/lhra)
• Endpoint Flask  /reload_checkpoint  -> carrega .pth mais recente
"""

import os, time, json, psutil, threading, datetime, torch, queue
from pathlib import Path
from typing import Any, Dict
from torch.utils.tensorboard import SummaryWriter
from flask import Flask, request, jsonify

# ---------- CONFIGURAÇÃO  ------------------
# TensorBoard log directory and checkpoint directory; the checkpoint
# directory is created at import time.
LOG_DIR        = Path("runs") / "lhra"
CKPT_DIR       = Path("checkpoints")
CKPT_DIR.mkdir(exist_ok=True, parents=True)
SAVE_EVERY_SEC = 120                         # checkpoint every 2 minutes
WRITE_EVERY    = 50                          # steps between TensorBoard writes
# ------------------------------------------

class LHRA_Monitor:
    """Logs metrics to TensorBoard and checkpoints the agent in a daemon thread."""

    def __init__(self, agent, writer: SummaryWriter):
        self.agent     : Any   = agent
        self.writer    : SummaryWriter = writer
        self.last_save : float = 0.0
        self.step_idx  : int   = 0
        self.queue     : "queue.Queue[Dict]" = queue.Queue()

        # background thread that drains the queue and writes checkpoints
        threading.Thread(target=self._loop, daemon=True).start()

    # ------------ PUBLIC INTERFACE ----------------
    def log_metrics(self, metrics: Dict[str, float]):
        """Enqueue one metrics dict; it is processed by the background thread."""
        self.queue.put(metrics)

    # ------------ THREAD LOOP ----------------------
    def _loop(self):
        """Drain the queue forever and checkpoint every SAVE_EVERY_SEC seconds."""
        while True:
            try:
                metrics = self.queue.get(timeout=1.0)
            except queue.Empty:
                pass
            else:
                try:
                    self._handle_metrics(metrics)
                except Exception as e:
                    # A bad metric or sensor read must not kill the thread;
                    # that would also stop all future checkpoints.
                    print(f"[MONITOR-ERROR] {e}")

            # periodic checkpoint
            if time.time() - self.last_save > SAVE_EVERY_SEC:
                self._save_checkpoint()

    def _handle_metrics(self, m: Dict[str, float]):
        """Count the step; every WRITE_EVERY steps write scalars to TensorBoard."""
        self.step_idx += 1
        if self.step_idx % WRITE_EVERY != 0:
            return
        for k, v in m.items():
            self.writer.add_scalar(k, v, self.step_idx)

        # machine resources
        self.writer.add_scalar("sys/cpu_pct",
                               psutil.cpu_percent(), self.step_idx)
        # sensors_temperatures() is not provided on every platform, and the
        # "coretemp" entry may be missing or empty.
        read_temps = getattr(psutil, "sensors_temperatures", None)
        temps = read_temps() if read_temps is not None else {}
        core = (temps or {}).get("coretemp") or []
        if core:
            self.writer.add_scalar("sys/cpu_temp", core[0].current,
                                   self.step_idx)

    def _save_checkpoint(self):
        """Save the agent state to a timestamped file in CKPT_DIR.

        Errors are printed, not raised. The save timer is reset on failure
        too, so a persistent error is retried once per SAVE_EVERY_SEC
        rather than on every loop pass.
        """
        ts   = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        file = CKPT_DIR / f"lhra_{ts}.pth"
        try:
            torch.save({
                "agent_state": self.agent.state_dict(),
                "step_idx"   : self.step_idx,
                "timestamp"  : ts,
            }, file)
            print(f"💾  Checkpoint salvo: {file}")
        except Exception as e:
            print(f"[CKPT‑ERROR] {e}")
        finally:
            self.last_save = time.time()

# ---------------------------------------------------------------------
#  FLASK ‑ hot‑reload do checkpoint mais recente
# ---------------------------------------------------------------------
# Flask application serving the reload endpoint defined below.
app = Flask(__name__)
GLOBAL_AGENT_REF: Any = None   # set externally (see attach_monitor_to_agent)

@app.route("/reload_checkpoint", methods=["POST"])
def reload_ckpt():
    """Load the newest ``lhra_*.pth`` checkpoint into the registered agent.

    Responses: 200 on success, 404 when no checkpoint file exists, 503 when
    no agent has been registered yet, 500 on any other error.
    """
    agent = GLOBAL_AGENT_REF
    if agent is None:
        # Fail fast with a clear message instead of loading the file and
        # then hitting an AttributeError on None.
        return jsonify({"ok": False, "msg": "Nenhum agente registrado."}), 503
    try:
        # File names embed a YYYYmmdd_HHMMSS timestamp, so the lexicographic
        # maximum is the most recent checkpoint.
        ckpts = sorted(CKPT_DIR.glob("lhra_*.pth"))
        if not ckpts:
            return jsonify({"ok": False, "msg": "Nenhum checkpoint."}), 404
        latest = ckpts[-1]
        # NOTE(review): torch.load unpickles the file; only checkpoints
        # written by this process/user should live in CKPT_DIR.
        data   = torch.load(latest, map_location="cpu")
        agent.load_state_dict(data["agent_state"])
        return jsonify({"ok": True,
                        "msg": f"Checkpoint {latest.name} carregado.",
                        "step": data.get("step_idx", -1)})
    except Exception as e:
        return jsonify({"ok": False, "error": str(e)}), 500

def start_flask(port=5055, host="0.0.0.0"):
    """Serve the Flask app from a daemon thread.

    Args:
        port: TCP port to listen on.
        host: interface to bind. The default keeps the previous behaviour
            (all interfaces). NOTE(review): the reload endpoint has no
            authentication, so pass "127.0.0.1" unless remote access is
            really needed.
    """
    threading.Thread(target=lambda: app.run(host, port),
                     daemon=True).start()

# ---------------------------------------------------------------------
#                FUNÇÃO DE FÁCIL INTEGRAÇÃO
# ---------------------------------------------------------------------
def attach_monitor_to_agent(agent) -> LHRA_Monitor:
    """Create a monitor for ``agent`` and start the reload endpoint.

    Builds a TensorBoard writer on ``LOG_DIR``, starts the monitor,
    publishes ``agent`` through ``GLOBAL_AGENT_REF`` for the Flask
    endpoint, starts Flask and prints usage hints. Returns the monitor.
    """
    global GLOBAL_AGENT_REF

    tb_writer = SummaryWriter(LOG_DIR)
    monitor = LHRA_Monitor(agent, tb_writer)

    # make the agent reachable from the reload endpoint
    GLOBAL_AGENT_REF = agent
    start_flask()

    print(f"📊 TensorBoard:  tensorboard --logdir={LOG_DIR}  (porta 6006)")
    print("🔁  Endpoint reload: POST http://localhost:5055/reload_checkpoint")
    return monitor

# ==================== TESTE RÁPIDO ==========================
if __name__ == "__main__":
    # `math` is needed for the synthetic metrics below and is not imported
    # at module level.
    import math

    class Dummy:
        """Stand-in agent exposing the two methods the monitor relies on."""
        def state_dict(self):  return {"x": 1}
        def load_state_dict(self, d): print("state loaded", d)
    ag  = Dummy()
    mon = attach_monitor_to_agent(ag)
    # feed synthetic metrics at roughly 20 Hz
    for i in range(1000):
        mon.log_metrics({"reward_avg": math.sin(i/30)+1,
                         "world_loss": abs(math.cos(i/40))})
        time.sleep(0.05)