160 changes: 152 additions & 8 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
@@ -156,6 +156,14 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
return
try:
self._bridge = MemosBridgeClient()
# Register the fallback LLM handler BEFORE we open the
# session so it is available the very first time the
# plugin's facade asks for help (e.g. on the first
# `turn.start` retrieval call).
self._bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._open_session(session_id)
logger.info(
"MemOS: bridge ready session=%s platform=%s (episode deferred)",
@@ -713,9 +721,7 @@ def handle_tool_call(self, tool_name: str, args: dict[str, Any], **_kwargs: Any)
elif kind == "policy":
body = self._clip(
"\n\n".join(
part
for part in [item.get("title"), item.get("procedure")]
if part
part for part in [item.get("title"), item.get("procedure")] if part
)
)
meta = {
@@ -905,6 +911,148 @@ def shutdown(self) -> None: # type: ignore[override]
# as a daemon if its viewer is running, so the memory panel
# remains accessible between `hermes chat` sessions.

# ─── Host LLM bridge (fallback for plugin-side model failures) ────────

def _handle_host_llm_complete(self, params: dict[str, Any]) -> dict[str, Any]:
"""Run a fallback LLM call using the host (hermes) agent's models.

Wired into the bridge's reverse-RPC channel under the
``host.llm.complete`` method. Triggered when the plugin's
configured summary or skill-evolver model fails — instead of
bubbling the error straight up (which would stall the V7
capture / reflection / skill pipeline), we replay the prompt
through ``agent.auxiliary_client.call_llm`` so hermes' own
provider stack (including its OpenRouter / Codex / custom
endpoint resolution) handles it.

If the host LLM also fails, this raises; the bridge converts
that into a JSON-RPC error, the LlmClient ``markFail``s, and
the Overview card flips red — exactly matching the spec
"if the agent's main model is also down, stop falling back
and surface red".
"""
messages = params.get("messages")
if not isinstance(messages, list) or not messages:
raise ValueError("host.llm.complete: missing messages")

# Lazy imports — these pull in heavy deps (openai client,
# credential pool, …) that we don't want to load until a
# fallback is actually requested.
try:
from agent.auxiliary_client import call_llm # type: ignore[import-not-found]
from hermes_cli.runtime_provider import ( # type: ignore[import-not-found]
resolve_runtime_provider,
)
except Exception as err:
raise RuntimeError(f"host LLM bridge unavailable: {err}") from err

# Resolve hermes' MAIN conversation provider so the fallback
# uses exactly what the user configured for chat. Walking the
# generic auxiliary auto-detect chain would otherwise depend
# on env vars (`OPENROUTER_API_KEY`, `OPENAI_API_KEY`, …) that
# often don't propagate into the bridge subprocess and would
# leave us with no working credential. Pinning to the resolved
# main runtime guarantees we hit the same endpoint the user
# already authenticated for chat.
try:
runtime = resolve_runtime_provider()
except Exception as err:
raise RuntimeError(f"could not resolve hermes main runtime: {err}") from err

main_runtime: dict[str, str] = {}
for field in ("provider", "model", "base_url", "api_key", "api_mode"):
value = runtime.get(field) if isinstance(runtime, dict) else None
if isinstance(value, str) and value.strip():
main_runtime[field] = value.strip()

normalized = [
{
"role": str(m.get("role", "user")),
"content": str(m.get("content", "")),
}
for m in messages
if isinstance(m, dict)
]
timeout_ms = params.get("timeoutMs")
timeout_s: float | None = None
if isinstance(timeout_ms, (int, float)) and timeout_ms > 0:
timeout_s = float(timeout_ms) / 1000.0

max_tokens = params.get("maxTokens")
temperature = params.get("temperature")

kwargs: dict[str, Any] = {
"messages": normalized,
# `main_runtime` makes `_resolve_auto` prefer the user's
# main conversation provider + model over the generic auto
# chain. If the user's main provider is also down,
# `call_llm` raises — which is exactly the "agent's own
# model is broken too, stop falling back" semantic we want
# (red light on Overview).
"main_runtime": main_runtime,
}
if isinstance(max_tokens, (int, float)) and max_tokens > 0:
kwargs["max_tokens"] = int(max_tokens)
if isinstance(temperature, (int, float)):
kwargs["temperature"] = float(temperature)
if timeout_s is not None:
kwargs["timeout"] = timeout_s

started = time.time()
try:
response = call_llm(**kwargs)
except Exception as err:
# Surface the original failure verbatim — the LlmClient
# will tag this as a "host fallback failed" terminal error
# and the Overview red-light path takes over.
raise RuntimeError(f"host LLM call failed: {err}") from err

# `call_llm` returns an OpenAI ChatCompletion-shaped object.
# Pluck the assistant text + token usage defensively so a
# non-standard host (e.g. Anthropic native) still produces a
# populated response.
text = ""
model = ""
usage_dict: dict[str, int] = {}
try:
choices = getattr(response, "choices", None) or response.get("choices", []) # type: ignore[union-attr]
if choices:
first = choices[0]
msg = getattr(first, "message", None) or first.get("message", {}) # type: ignore[union-attr]
content = getattr(msg, "content", None) or msg.get("content", "") # type: ignore[union-attr]
text = str(content or "")
model = (
getattr(response, "model", None)
or response.get("model", "") # type: ignore[union-attr]
or ""
)
u = getattr(response, "usage", None) or response.get("usage", None) # type: ignore[union-attr]
if u is not None:
pt = getattr(u, "prompt_tokens", None)
ct = getattr(u, "completion_tokens", None)
tt = getattr(u, "total_tokens", None)
if pt is None and isinstance(u, dict):
pt = u.get("prompt_tokens")
ct = u.get("completion_tokens")
tt = u.get("total_tokens")
if isinstance(pt, int):
usage_dict["promptTokens"] = pt
if isinstance(ct, int):
usage_dict["completionTokens"] = ct
if isinstance(tt, int):
usage_dict["totalTokens"] = tt
except Exception:
logger.debug("host.llm.complete: shape parse failed", exc_info=True)

result: dict[str, Any] = {
"text": text,
"model": str(model or ""),
"durationMs": int((time.time() - started) * 1000),
}
if usage_dict:
result["usage"] = usage_dict
return result

# ─── Internals ────────────────────────────────────────────────────────

def _open_session(self, session_id: str = "") -> None:
@@ -928,11 +1076,7 @@ def _is_transport_closed(self, err: Exception) -> bool:
if isinstance(err, BridgeError) and err.code == "transport_closed":
return True
msg = str(err).lower()
return (
"broken pipe" in msg
or "bridge closed" in msg
or "transport_closed" in msg
)
return "broken pipe" in msg or "bridge closed" in msg or "transport_closed" in msg

def _reconnect_bridge(self, session_id: str = "") -> None:
old_bridge = self._bridge
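To make the new reverse-RPC flow concrete, here is a sketch of a single `host.llm.complete` round trip as the code above suggests. The envelope is plain JSON-RPC 2.0; the id, model name, token counts, and prompt text are illustrative, not taken from the bridge spec.

```python
# Hypothetical bridge -> adapter request, dispatched to _handle_host_llm_complete(params).
request = {
    "jsonrpc": "2.0",
    "id": 7,
    "method": "host.llm.complete",
    "params": {
        "messages": [{"role": "user", "content": "Summarise the last turn."}],
        "timeoutMs": 30_000,   # optional; converted to seconds before call_llm
        "maxTokens": 512,      # optional
        "temperature": 0.2,    # optional
    },
}

# Adapter -> bridge response on success (written by _send_response).
response = {
    "jsonrpc": "2.0",
    "id": 7,
    "result": {
        "text": "…assistant text…",
        "model": "gpt-4o-mini",  # illustrative model name
        "durationMs": 1843,
        "usage": {"promptTokens": 812, "completionTokens": 96, "totalTokens": 908},
    },
}

# On failure the handler raises, and _send_response writes a JSON-RPC error instead.
error_response = {
    "jsonrpc": "2.0",
    "id": 7,
    "error": {
        "code": -32000,
        "message": "host LLM call failed: …",
        "data": {"code": "host_handler_failed"},
    },
}
```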
@@ -66,6 +66,14 @@ def __init__(
self._pending: dict[int, dict[str, Any]] = {}
self._events: list[Callable[[dict[str, Any]], None]] = []
self._logs: list[Callable[[dict[str, Any]], None]] = []
# Reverse-direction handlers: the bridge can send us a
# JSON-RPC request via `serverRequest(...)` (e.g.
# `host.llm.complete` for fallback LLM calls). Registered
# methods run on the dedicated reader thread; long-running
# work should spawn its own worker if it needs to. Each
# handler returns a JSON-serialisable value or raises to
# surface a JSON-RPC error back to the bridge.
self._host_handlers: dict[str, Callable[[dict[str, Any]], Any]] = {}
self._closed = False

node = node_binary or shutil.which("node") or "node"
@@ -173,6 +181,22 @@ def on_event(self, cb: Callable[[dict[str, Any]], None]) -> None:
def on_log(self, cb: Callable[[dict[str, Any]], None]) -> None:
self._logs.append(cb)

def register_host_handler(
self,
method: str,
handler: Callable[[dict[str, Any]], Any],
) -> None:
"""Register a handler for bridge → adapter (reverse) requests.

The Node-side bridge calls these via ``stdio.serverRequest``.
Most-recent registration wins. The handler runs on the reader
thread; if it blocks for a long time it stalls every other
bridge → adapter notification, so handlers that need to do
heavy work (e.g. an LLM call) are still expected to return
within the bridge-side timeout (default 60 s).
"""
self._host_handlers[method] = handler

def close(self) -> None:
if self._closed:
return
@@ -225,6 +249,68 @@ def _read_loop(self) -> None:
except Exception:
logger.debug("log listener threw", exc_info=True)
continue
# Reverse-direction request: the bridge is asking the
# adapter to do something (e.g. run a fallback LLM call
# via `host.llm.complete`). Dispatch to the registered
# handler and write the response back synchronously.
method = msg.get("method")
rpc_id = msg.get("id")
if (
isinstance(method, str)
and rpc_id is not None
and "result" not in msg
and "error" not in msg
):
handler = self._host_handlers.get(method)
if handler is None:
self._send_response(
rpc_id,
error={
"code": -32601,
"message": f"method not found: {method}",
"data": {"code": "unknown_method"},
},
)
continue
params = msg.get("params") or {}
if not isinstance(params, dict):
params = {}
try:
result = handler(params)
self._send_response(rpc_id, result=result)
except Exception as err:
logger.warning("host handler %s failed: %s", method, err)
self._send_response(
rpc_id,
error={
"code": -32000,
"message": str(err) or err.__class__.__name__,
"data": {"code": "host_handler_failed"},
},
)
continue

def _send_response(
self,
rpc_id: Any,
*,
result: Any = None,
error: dict[str, Any] | None = None,
) -> None:
"""Write a JSON-RPC response for a reverse-direction request."""
if self._closed:
return
payload: dict[str, Any] = {"jsonrpc": "2.0", "id": rpc_id}
if error is not None:
payload["error"] = error
else:
payload["result"] = result
with self._lock:
try:
self._proc.stdin.write(json.dumps(payload, ensure_ascii=False) + "\n")
self._proc.stdin.flush()
except (BrokenPipeError, OSError):
pass

def _stderr_loop(self) -> None:
assert self._proc.stderr is not None
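On the adapter side the hook is a one-liner. A minimal usage sketch, assuming `MemosBridgeClient` can be constructed with its defaults as shown above; the `host.ping` method name and its params are invented for illustration:

```python
from typing import Any

# Import path omitted — MemosBridgeClient is the class from the diff above,
# constructed here with its defaults (auto-detected `node` binary).
bridge = MemosBridgeClient()

def handle_ping(params: dict[str, Any]) -> dict[str, Any]:
    # Runs on the bridge's reader thread — return quickly, or hand heavy work
    # to a worker and still finish within the bridge-side timeout.
    return {"pong": True, "echo": params.get("msg", "")}

# Most-recent registration wins; raising inside the handler is converted into
# a JSON-RPC error (-32000, data code "host_handler_failed") back to the bridge.
bridge.register_host_handler("host.ping", handle_ping)
```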
32 changes: 27 additions & 5 deletions apps/memos-local-plugin/agent-contract/memory-core.ts
@@ -57,17 +57,39 @@ export interface CoreHealth {

/**
* Per-model connectivity summary used by the viewer's Overview page.
* `available` reflects whether the client is configured (non-null);
* `lastOkAt` / `lastError` reflect the most recent **actual** call
* outcome tracked by the runtime. A green dot means "called successfully
* at least once and the last call was good"; red means "most recent
* call failed" + the error text to surface in a tooltip.
*
* The viewer renders the slot in one of four colours, picked by
* comparing the timestamps below — most-recent event wins:
*
* - **green** (`ok`) : `lastOkAt` is the latest stamp →
*   primary provider answered directly.
* - **yellow** (`fallback`) : `lastFallbackAt` is the latest stamp →
*   primary provider failed *but* the host LLM bridge rescued the
*   call. Only ever set on the LLM / skillEvolver slots; the
*   embedder has no fallback path so this stays `null`.
* - **red** (`err`) : `lastError.at` is the latest stamp →
*   primary provider failed and either no fallback was configured
*   or the fallback also failed. The accompanying `message` is
*   surfaced verbatim on the card.
* - **idle / off** : every timestamp is `null` → the
*   facade has not been called yet (idle) or no client is
*   configured at all (off).
*
* `lastError` is **sticky** — it is not cleared by a later success.
* The viewer's timestamp comparison naturally promotes a fresh
* success over a stale failure, while keeping the message available
* in case a subsequent failure flips the card back to red.
*/
export interface ModelHealth {
available: boolean;
provider: string;
model: string;
lastOkAt: number | null;
/**
* Latest time the primary provider failed but the host LLM bridge
* answered successfully. Always `null` on the embedder slot.
*/
lastFallbackAt: number | null;
lastError: { at: number; message: string } | null;
}

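The colour rule in the comment reduces to a three-way timestamp comparison. The viewer's actual helper is TypeScript and is not part of this diff; the sketch below just restates the documented rule in Python (tie-breaking between equal timestamps is left unspecified by the contract):

```python
def slot_colour(
    last_ok_at: int | None,
    last_fallback_at: int | None,
    last_error_at: int | None,
) -> str:
    """Pick the Overview slot colour: the most recent timestamp wins."""
    ok = last_ok_at or 0
    fb = last_fallback_at or 0
    err = last_error_at or 0
    latest = max(ok, fb, err)
    if latest == 0:
        return "idle"    # never called yet, or no client configured at all
    if latest == err:
        return "red"     # most recent call failed (no fallback, or it failed too)
    if latest == fb:
        return "yellow"  # primary failed, but the host LLM bridge rescued the call
    return "green"       # primary provider answered directly
```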