diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index d43415fb8..58d1cee16 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -255,7 +255,9 @@ def _register_tool_call_hook(self) -> None: if self._hook_registered: return try: - from hermes_cli.plugins import get_plugin_manager # pyright: ignore[reportMissingImports] + from hermes_cli.plugins import ( + get_plugin_manager, # pyright: ignore[reportMissingImports] + ) mgr = get_plugin_manager() mgr._hooks.setdefault("post_tool_call", []).append(self._on_post_tool_call) @@ -1161,11 +1163,11 @@ def save_config(self, values: dict[str, Any], hermes_home: str) -> None: # type def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignore[override] if not self._bridge: return - # `sync_turn` already flushed the turn data synchronously. - # Just close the episode and session. - if self._episode_id: - with contextlib.suppress(Exception): - self._bridge.request("episode.close", {"episodeId": self._episode_id}) + # `sync_turn` already flushed completed turn data synchronously. + # Closing the host session is not the same as ending the topic: + # the core will pause or finalize the open episode according to + # topic-boundary rules so interrupted Hermes sessions can resume + # into the same task later. with contextlib.suppress(Exception): self._bridge.request("session.close", {"sessionId": self._session_id}) diff --git a/apps/memos-local-plugin/adapters/openclaw/bridge.ts b/apps/memos-local-plugin/adapters/openclaw/bridge.ts index b98a011dd..295218c55 100644 --- a/apps/memos-local-plugin/adapters/openclaw/bridge.ts +++ b/apps/memos-local-plugin/adapters/openclaw/bridge.ts @@ -844,6 +844,7 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle { const routedEpisodeId = packet.query.episodeId as EpisodeId | undefined; if (routedEpisodeId) { openEpisodeBySession.set(routedSessionId, routedEpisodeId); + openEpisodeBySession.set(sessionId, routedEpisodeId); } opts.log.info("memos.onTurnStart", { diff --git a/apps/memos-local-plugin/agent-contract/dto.ts b/apps/memos-local-plugin/agent-contract/dto.ts index 8f82ae227..87c6660d9 100644 --- a/apps/memos-local-plugin/agent-contract/dto.ts +++ b/apps/memos-local-plugin/agent-contract/dto.ts @@ -428,6 +428,11 @@ export interface EpisodeListItemDTO { * failed) without guessing from `rTask`. */ closeReason?: "finalized" | "abandoned" | null; + /** Topic-level lifecycle state used by the viewer to distinguish + * interrupted/paused-but-continuable tasks from truly skipped ones. */ + topicState?: "active" | "paused" | "interrupted" | "ended" | null; + /** Human-readable audit reason for a paused/interrupted open topic. */ + pauseReason?: string | null; /** * User-readable reason when `closeReason === "abandoned"`. Mirrors * the legacy plugin's Chinese skip-reason strings (e.g. "对话内容 diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 749ab557c..3b3280e6c 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -429,44 +429,39 @@ export function createMemoryCore( } } - // ─── Stale episode auto-finalize ── - // Mirrors `memos-local-openclaw` ViewerServer.autoFinalizeStaleTasks(). - // Open episodes older than 4 hours (configurable via - // `algorithm.session.mergeMaxGapMs * 2`) are abandoned so the Tasks - // view shows them as completed/skipped rather than perpetually "active". + // ─── Stale topic auto-finalize ── + // Open topics are allowed to survive clean session closes and process + // restarts so the next user turn can be classified against them. Once a + // topic exceeds this hard window, we treat it as ended and run the normal + // reflect/reward path. const STALE_EPISODE_TIMEOUT_MS = Math.max( handle.config.algorithm.session.mergeMaxGapMs * 2, 4 * 60 * 60 * 1000, ); let lastStaleScan = 0; - function autoFinalizeStaleTasks(): void { + async function autoFinalizeStaleTasks(): Promise { const nowMs = Date.now(); if (nowMs - lastStaleScan < 30_000) return; lastStaleScan = nowMs; try { const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 200 }); if (openEpisodes.length === 0) return; + const stale: Array }> = []; for (const ep of openEpisodes) { const epAge = nowMs - (ep.endedAt ?? ep.startedAt); if (epAge > STALE_EPISODE_TIMEOUT_MS) { - log.info("stale_episode.auto_abandon", { + log.info("stale_topic.auto_finalize", { episodeId: ep.id, sessionId: ep.sessionId, ageMs: epAge, thresholdMs: STALE_EPISODE_TIMEOUT_MS, }); - try { - handle.episodeManager.abandon( - ep.id as import("../../agent-contract/dto.js").EpisodeId, - `自动关闭:空闲 ${Math.round(epAge / 60_000)} 分钟(阈值 ${Math.round(STALE_EPISODE_TIMEOUT_MS / 60_000)} 分钟)`, - ); - } catch { - // Episode may have been finalized concurrently — safe to ignore. - } + stale.push(ep); } } + if (stale.length > 0) await recoverOpenEpisodesAsSessionEnd(stale); } catch (err) { - log.debug("stale_episode.scan_error", { + log.debug("stale_topic.scan_error", { err: err instanceof Error ? err.message : String(err), }); } @@ -482,19 +477,31 @@ export function createMemoryCore( } initialized = true; - // Any `status: "open"` row we see on boot is an orphan from a - // previous unclean shutdown — the plugin host was SIGKILL'd, the - // gateway was bootout'd, the process crashed mid-turn, etc. - // - // Treat rows that already have traces as a missed `session_end`, - // not as user abandonment: finalize them and emit the same - // `episode.finalized` event the normal session close path emits so - // capture → reward → L2 → L3 → Skill can catch up on restart. Rows - // that already carry rTask only need their final status repaired. + // Preserve recent open topics across restarts. A crash or Ctrl+C is + // not evidence that the topic ended; the next user turn gets routed + // through relation classification. Only hard-stale open topics are + // finalized here so the pipeline eventually catches up. try { const orphans = handle.repos.episodes.list({ status: "open", limit: 500 }); if (orphans.length > 0) { - await recoverOpenEpisodesAsSessionEnd(orphans); + const nowMs = Date.now(); + const stale = orphans.filter( + (ep) => + ep.rTask != null || + (ep.traceIds?.length ?? 0) > 0 || + nowMs - (ep.endedAt ?? ep.startedAt) > STALE_EPISODE_TIMEOUT_MS, + ); + const recent = orphans.filter((ep) => !stale.includes(ep)); + for (const ep of recent) { + handle.repos.episodes.updateMeta(ep.id as EpisodeId, { + topicState: (ep.meta?.topicState as string | undefined) ?? "interrupted", + pauseReason: (ep.meta?.pauseReason as string | undefined) ?? "startup_recovered_open_topic", + recoveredAtStartup: nowMs, + }); + } + if (stale.length > 0) { + await recoverOpenEpisodesAsSessionEnd(stale); + } } } catch (err) { log.debug("init.orphan_scan.failed", { @@ -1796,10 +1803,9 @@ export function createMemoryCore( }): Promise extends unknown[] ? Awaited> : never> { ensureLive(); - // Legacy parity: auto-finalize stale open episodes when the task - // list is fetched, matching `memos-local-openclaw` ViewerServer's - // `autoFinalizeStaleTasks()`. Default threshold: 4 hours. - autoFinalizeStaleTasks(); + // Auto-finalize only hard-stale open topics. Recent interrupted + // topics stay open so the next user turn can be merged by topic. + await autoFinalizeStaleTasks(); const rows = handle.repos.episodes.list({ sessionId: input?.sessionId, @@ -1859,6 +1865,18 @@ export function createMemoryCore( // abandon. Surface them through the API so TasksView can render // a human-readable status badge without guessing from rTask. const meta = (r as { meta?: Record }).meta ?? {}; + if (!preview) { + const fallback = + typeof meta.initialUserText === "string" + ? meta.initialUserText + : typeof meta.pendingUserText === "string" + ? meta.pendingUserText + : typeof meta.lastUserText === "string" + ? meta.lastUserText + : ""; + const raw = fallback.replace(/\s+/g, " ").trim(); + if (raw) preview = raw.length > 160 ? raw.slice(0, 157) + "…" : raw; + } const closeReasonRaw = meta.closeReason; const closeReason: "finalized" | "abandoned" | null = closeReasonRaw === "finalized" || closeReasonRaw === "abandoned" @@ -1866,6 +1884,16 @@ export function createMemoryCore( : null; const abandonReason = typeof meta.abandonReason === "string" ? meta.abandonReason : null; + const topicStateRaw = meta.topicState; + const topicState = + topicStateRaw === "active" || + topicStateRaw === "paused" || + topicStateRaw === "interrupted" || + topicStateRaw === "ended" + ? topicStateRaw + : null; + const pauseReason = + typeof meta.pauseReason === "string" ? meta.pauseReason : null; return { id: r.id, @@ -1883,6 +1911,8 @@ export function createMemoryCore( skillReasonParams: derivation.reasonParams, linkedSkillId: derivation.linkedSkillId, closeReason, + topicState, + pauseReason, abandonReason, }; }); diff --git a/apps/memos-local-plugin/core/pipeline/orchestrator.ts b/apps/memos-local-plugin/core/pipeline/orchestrator.ts index eab711708..b10ce001a 100644 --- a/apps/memos-local-plugin/core/pipeline/orchestrator.ts +++ b/apps/memos-local-plugin/core/pipeline/orchestrator.ts @@ -64,6 +64,7 @@ import type { InjectionPacket, RepairCtx, SessionId, + TraceId, ToolDrivenCtx, TurnInputDTO, TurnResultDTO, @@ -394,6 +395,101 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { // ─── Case 2: there's a previously-closed episode ──────────────── const prev = lastEpisodeBySession.get(sessionId); if (!prev) { + const recoverable = findRecoverableOpenTopic(sessionId, turnTs ?? now()); + if (recoverable) { + const snapshot = session.sessionManager.hydrateEpisode(recoverable); + const ctx = buildClassifierContext(snapshot.turns); + const lastTurnTs = snapshot.turns[snapshot.turns.length - 1]?.ts ?? snapshot.startedAt; + const gapMs = Math.max(0, (turnTs ?? now()) - lastTurnTs); + const hardWindowMs = staleTopicWindowMs(); + + if (gapMs > hardWindowMs) { + log.info("episode.recovered_topic_hard_boundary", { + sessionId, + episodeId: snapshot.id, + gapMs, + hardWindowMs, + }); + if (snapshot.status === "open") { + session.sessionManager.finalizeEpisode(snapshot.id as EpisodeId, { + patchMeta: { + topicState: "ended", + recoveryReason: "hard_timeout_before_new_turn", + }, + }); + } + } else { + const decision = await session.relation.classify({ + prevUserText: ctx.prevUserText, + prevAssistantText: ctx.prevAssistantText, + newUserText: userText, + gapMs, + }); + + log.info("relation.classified", { + sessionId, + prevEpisodeId: snapshot.id, + relation: decision.relation, + confidence: decision.confidence, + reason: decision.reason, + gapMs, + source: "recovered_open_topic", + }); + buses.session.emit({ + kind: "episode.relation_classified", + sessionId, + episodeId: snapshot.id as EpisodeId, + relation: decision.relation, + confidence: decision.confidence, + reason: decision.reason, + }); + + const withinMergeWindow = mergeCapMs === 0 || gapMs <= mergeCapMs; + const keepAppending = + mergeMode && + withinMergeWindow && + (decision.relation === "revision" || + decision.relation === "follow_up" || + decision.relation === "unknown"); + + if (keepAppending) { + if (snapshot.status === "closed") { + session.sessionManager.reopenEpisode( + snapshot.id as EpisodeId, + decision.relation === "revision" ? "revision" : "follow_up", + ); + } + session.sessionManager.addTurn(snapshot.id as EpisodeId, { + role: "user", + content: userText, + ts: turnTs, + meta: { + source: "recovered_topic", + classifiedRelation: decision.relation, + previousSessionId: snapshot.sessionId, + ...meta, + }, + }); + openEpisodeBySession.set(sessionId, snapshot.id as EpisodeId); + lastEpisodeBySession.delete(sessionId); + return { + episode: session.sessionManager.getEpisode(snapshot.id as EpisodeId) ?? snapshot, + sessionId, + relation: decision.relation, + }; + } + + if (snapshot.status === "open") { + session.sessionManager.finalizeEpisode(snapshot.id as EpisodeId, { + patchMeta: { + topicState: "ended", + boundaryRelation: decision.relation, + boundaryReason: decision.reason, + }, + }); + } + } + } // ─── Case 3: bootstrap ────────────────────────────────────── const snap = await session.sessionManager.startEpisode({ sessionId, @@ -505,6 +601,124 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { openEpisodeBySession.delete(sessionId); } + function staleTopicWindowMs(): number { + return Math.max( + algorithm.session.mergeMaxGapMs * 2, + 4 * 60 * 60 * 1000, + ); + } + + function findRecoverableOpenTopic( + currentSessionId: SessionId, + atTs: number, + ): EpisodeSnapshot | null { + const candidates = deps.repos.episodes.list({ limit: 50 }); + for (const row of candidates) { + const meta = (row as { meta?: Record }).meta ?? {}; + if (meta.boundaryRelation === "new_task") continue; + const ageMs = Math.max(0, atTs - (row.endedAt ?? row.startedAt)); + if (ageMs > staleTopicWindowMs()) continue; + if (row.status === "closed" && meta.closeReason !== "finalized") continue; + if ( + row.sessionId !== currentSessionId && + meta.topicState !== "paused" && + meta.topicState !== "interrupted" + ) { + continue; + } + // Prefer the same session, but allow cross-session continuation + // after Hermes/OpenClaw restarts. New-topic classification below + // will close unrelated candidates before bootstrapping a fresh one. + if (row.sessionId !== currentSessionId && candidates.length > 1) { + const sameSession = candidates.some((c) => c.sessionId === currentSessionId); + if (sameSession) continue; + } + return snapshotFromOpenEpisodeRow(row); + } + return null; + } + + function snapshotFromOpenEpisodeRow( + ep: ReturnType[number], + ): EpisodeSnapshot { + const traceIds = (ep.traceIds ?? []) as TraceId[]; + const traces = + traceIds.length > 0 + ? deps.repos.traces + .getManyByIds(traceIds) + .sort((a, b) => a.ts - b.ts) + : []; + const turns: EpisodeSnapshot["turns"] = []; + const meta = (ep as { meta?: Record }).meta ?? {}; + const initialUserText = + typeof meta.initialUserText === "string" + ? meta.initialUserText + : typeof meta.pendingUserText === "string" + ? meta.pendingUserText + : ""; + if (initialUserText && traces.length === 0) { + turns.push({ + id: `${ep.id}:initial-user`, + ts: ep.startedAt, + role: "user", + content: initialUserText, + meta: { recovered: true }, + }); + } + for (const tr of traces) { + if (tr.userText) { + turns.push({ + id: `${tr.id}:user`, + ts: tr.ts, + role: "user", + content: tr.userText, + }); + } + if (tr.toolCalls.length > 0) { + turns.push({ + id: `${tr.id}:tool`, + ts: tr.ts, + role: "tool", + content: JSON.stringify(tr.toolCalls), + meta: { toolCalls: tr.toolCalls }, + }); + } + if (tr.agentText) { + turns.push({ + id: `${tr.id}:assistant`, + ts: tr.ts, + role: "assistant", + content: tr.agentText, + meta: { + agentThinking: tr.agentThinking ?? undefined, + reflection: tr.reflection ?? undefined, + }, + }); + } + } + const maybeIntent = (meta as { intent?: Partial }).intent; + return { + id: ep.id as EpisodeId, + sessionId: ep.sessionId as SessionId, + startedAt: ep.startedAt, + endedAt: ep.endedAt ?? null, + status: ep.status, + rTask: ep.rTask ?? null, + turnCount: turns.length, + turns, + traceIds, + meta, + intent: { + kind: maybeIntent?.kind ?? "unknown", + confidence: maybeIntent?.confidence ?? 0, + reason: maybeIntent?.reason ?? "recovered open topic", + retrieval: maybeIntent?.retrieval ?? { tier1: true, tier2: true, tier3: true }, + signals: maybeIntent?.signals ?? ["recovered_open_topic"], + llmModel: maybeIntent?.llmModel, + }, + }; + } + // ─── subscribeEvents / subscribeLogs ──────────────────────────────────── function subscribeEvents(handler: (e: CoreEvent) => void): () => void { diff --git a/apps/memos-local-plugin/core/retrieval/retrieve.ts b/apps/memos-local-plugin/core/retrieval/retrieve.ts index c10267b7a..d8508699f 100644 --- a/apps/memos-local-plugin/core/retrieval/retrieve.ts +++ b/apps/memos-local-plugin/core/retrieval/retrieve.ts @@ -76,7 +76,7 @@ export async function turnStartRetrieve( wantTier1: true, wantTier2: true, wantTier3: true, - includeLowValue: false, + includeLowValue: deps.config.includeLowValue, limit: opts.limit ?? deps.config.tier1TopK + deps.config.tier2TopK + deps.config.tier3TopK, diff --git a/apps/memos-local-plugin/core/session/episode-manager.ts b/apps/memos-local-plugin/core/session/episode-manager.ts index 84546b1ad..3444e4953 100644 --- a/apps/memos-local-plugin/core/session/episode-manager.ts +++ b/apps/memos-local-plugin/core/session/episode-manager.ts @@ -47,6 +47,8 @@ export interface EpisodeManager { finalize(id: EpisodeId, input?: EpisodeFinalizeInput): EpisodeSnapshot; abandon(id: EpisodeId, reason: string): EpisodeSnapshot; attachTraceIds(id: EpisodeId, traceIds: string[]): void; + hydrate(snapshot: EpisodeSnapshot): EpisodeSnapshot; + patchMeta(id: EpisodeId, metaPatch: Record): EpisodeSnapshot; /** * V7 §0.1 "revision" path: reopen a previously-finalized episode so * the new turn appends to the same trace set. The caller is @@ -105,6 +107,12 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { id: ids.span(), ts: startedAt, }; + const meta = { + ...(input.meta ?? {}), + topicState: (input.meta ?? {}).topicState ?? "active", + initialUserText: (input.meta ?? {}).initialUserText ?? input.initialTurn.content, + pendingUserText: (input.meta ?? {}).pendingUserText ?? input.initialTurn.content, + }; const snap: EpisodeSnapshot = { id, sessionId: input.sessionId, @@ -115,7 +123,7 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { turnCount: 1, turns: [firstTurn], traceIds: [], - meta: { ...(input.meta ?? {}), intent: { kind: intent.kind, signals: intent.signals } }, + meta: { ...meta, intent: { kind: intent.kind, signals: intent.signals } }, intent, }; byId.set(id, snap); @@ -146,6 +154,31 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { const full: EpisodeTurn = { ...turn, id: ids.span(), ts: turn.ts ?? now() }; snap.turns.push(full); snap.turnCount++; + if (turn.role === "user") { + snap.meta = { + ...snap.meta, + topicState: "active", + pendingUserText: turn.content, + lastUserText: turn.content, + }; + deps.episodesRepo.updateMeta(id, { + topicState: "active", + pendingUserText: turn.content, + lastUserText: turn.content, + }); + } else if (turn.role === "assistant") { + snap.meta = { + ...snap.meta, + topicState: "active", + pendingUserText: undefined, + lastAssistantText: turn.content, + }; + deps.episodesRepo.updateMeta(id, { + topicState: "active", + pendingUserText: undefined, + lastAssistantText: turn.content, + }); + } deps.sessionsRepo.touchLastSeen(snap.sessionId, full.ts); log.debug("episode.turn_added", { episodeId: id, @@ -168,6 +201,37 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { deps.episodesRepo.updateTraceIds(id, snap.traceIds); }, + hydrate(snapshot) { + const existing = byId.get(snapshot.id); + if (existing) return cloneSnapshot(existing); + const snap: EpisodeSnapshot = { + ...snapshot, + turns: snapshot.turns.map((t) => ({ ...t })), + traceIds: [...snapshot.traceIds], + meta: { ...snapshot.meta }, + }; + byId.set(snapshot.id, snap); + log.info("episode.hydrated", { + episodeId: snap.id, + sessionId: snap.sessionId, + status: snap.status, + turnCount: snap.turnCount, + }); + return cloneSnapshot(snap); + }, + + patchMeta(id, metaPatch) { + const snap = get(id); + if (!snap) { + throw new MemosError(ERROR_CODES.EPISODE_NOT_FOUND, `episode ${id} not found`, { + episodeId: id, + }); + } + snap.meta = { ...snap.meta, ...metaPatch }; + deps.episodesRepo.updateMeta(id, metaPatch); + return cloneSnapshot(snap); + }, + finalize(id, input) { const snap = assertOpen(get(id), id); const endedAt = now(); @@ -175,7 +239,7 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { snap.endedAt = endedAt; if (input?.rTask !== undefined) snap.rTask = input.rTask; if (input?.patchMeta) snap.meta = { ...snap.meta, ...input.patchMeta }; - snap.meta = { ...snap.meta, closeReason: "finalized" }; + snap.meta = { ...snap.meta, topicState: "ended", closeReason: "finalized" }; deps.episodesRepo.close(id, endedAt, snap.rTask ?? undefined, snap.meta); log.info("episode.finalized", { episodeId: id, @@ -201,7 +265,12 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { const endedAt = now(); snap.status = "closed"; snap.endedAt = endedAt; - snap.meta = { ...snap.meta, closeReason: "abandoned", abandonReason: reason }; + snap.meta = { + ...snap.meta, + topicState: "ended", + closeReason: "abandoned", + abandonReason: reason, + }; deps.episodesRepo.close(id, endedAt, snap.rTask ?? undefined, snap.meta); log.warn("episode.abandoned", { episodeId: id, @@ -231,6 +300,7 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager { snap.meta = { ...snap.meta, closeReason: undefined, + topicState: "active", reopenedAt: now(), reopenReason: reason, }; diff --git a/apps/memos-local-plugin/core/session/manager.ts b/apps/memos-local-plugin/core/session/manager.ts index 9df699302..bba25d744 100644 --- a/apps/memos-local-plugin/core/session/manager.ts +++ b/apps/memos-local-plugin/core/session/manager.ts @@ -78,6 +78,7 @@ export interface SessionManager { episodeId: EpisodeId, reason: import("./types.js").TurnRelation, ): EpisodeSnapshot; + hydrateEpisode(snapshot: EpisodeSnapshot): EpisodeSnapshot; attachTraceIds(episodeId: EpisodeId, traceIds: string[]): void; getEpisode(id: EpisodeId): EpisodeSnapshot | null; @@ -161,8 +162,16 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager { // confusion. True crash-orphans get a separate recovery path // at plugin bootstrap (see `recoverOrphanedEpisodes` in // `core/pipeline/memory-core.ts`). - epm.finalize(ep.id, { - patchMeta: { sessionCloseReason: reason }, + if (isCompletedExchange(ep)) { + epm.finalize(ep.id, { + patchMeta: { sessionCloseReason: reason }, + }); + continue; + } + epm.patchMeta(ep.id, { + topicState: "paused", + pauseReason: `session_closed:${reason}`, + sessionCloseReason: reason, }); } live.delete(id); @@ -271,21 +280,42 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager { return snap; } + function hydrateEpisode(snapshot: EpisodeSnapshot): EpisodeSnapshot { + const snap = epm.hydrate(snapshot); + const session = getSession(snap.sessionId); + if (session && snap.status === "open") { + const cached = live.get(snap.sessionId); + if (cached) { + cached.openEpisodeCount = epm + .listForSession(snap.sessionId) + .filter((e) => e.status === "open").length; + } + } + return snap; + } + function shutdown(reason: string): void { log.info("shutdown.begin", { reason }); // Process-wide shutdown is normal lifecycle (host stopping cleanly, - // not a crash) — same semantics as `closeSession`. Finalize open - // episodes via the same code path so they don't get the ugly - // `closeReason="abandoned"` + `abandonReason="shutdown:..."` badge - // that used to confuse users reading the Tasks list. + // not a topic boundary). Pause open episodes so a restarted host can + // classify the next user turn against the same topic instead of + // prematurely triggering reflect/reward. // // First catch episodes whose session was already pruned from // `live` (race: idle prune → process exit). closeSession's per- // session loop wouldn't find them otherwise. for (const ep of epm.listOpen()) { if (!live.has(ep.sessionId)) { - finalizeEpisode(ep.id, { - patchMeta: { sessionCloseReason: `shutdown:${reason}` }, + if (isCompletedExchange(ep)) { + finalizeEpisode(ep.id, { + patchMeta: { sessionCloseReason: `shutdown:${reason}` }, + }); + continue; + } + epm.patchMeta(ep.id, { + topicState: "paused", + pauseReason: `shutdown:${reason}`, + sessionCloseReason: `shutdown:${reason}`, }); } } @@ -297,6 +327,11 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager { log.info("shutdown.done", { reason }); } + function isCompletedExchange(ep: EpisodeSnapshot): boolean { + if (ep.traceIds.length > 0) return true; + return ep.turns.some((t) => t.role === "assistant" && t.content.trim().length > 0); + } + return { bus, openSession, @@ -310,6 +345,7 @@ export function createSessionManager(deps: SessionManagerDeps): SessionManager { finalizeEpisode, abandonEpisode, reopenEpisode, + hydrateEpisode, attachTraceIds: epm.attachTraceIds, getEpisode: epm.get, diff --git a/apps/memos-local-plugin/core/session/persistence.ts b/apps/memos-local-plugin/core/session/persistence.ts index 076924666..5e15a48a5 100644 --- a/apps/memos-local-plugin/core/session/persistence.ts +++ b/apps/memos-local-plugin/core/session/persistence.ts @@ -51,6 +51,7 @@ export interface EpisodesRepo { meta: Record; }): void; updateTraceIds(id: EpisodeId, traceIds: string[]): void; + updateMeta(id: EpisodeId, metaPatch: Record): void; close(id: EpisodeId, endedAt: EpochMs, rTask?: number, meta?: Record): void; /** * Flip a closed episode back to `open` — V7 §0.1 "revision" path. @@ -125,6 +126,9 @@ export function adaptEpisodesRepo(sqlite: SqliteEpisodes): EpisodesRepo { updateTraceIds(id, traceIds) { sqlite.appendTrace(id, traceIds); }, + updateMeta(id, metaPatch) { + sqlite.updateMeta(id, metaPatch); + }, close(id, endedAt, rTask, meta) { // CRITICAL: never use `episodes.upsert` here. The repo's upsert // is `INSERT OR REPLACE`, which SQLite executes as DELETE + diff --git a/apps/memos-local-plugin/tests/e2e/v7-full-chain.e2e.test.ts b/apps/memos-local-plugin/tests/e2e/v7-full-chain.e2e.test.ts index 01e3c5eff..286ac6482 100644 --- a/apps/memos-local-plugin/tests/e2e/v7-full-chain.e2e.test.ts +++ b/apps/memos-local-plugin/tests/e2e/v7-full-chain.e2e.test.ts @@ -480,7 +480,7 @@ describe("V7 full-chain E2E (Python programming task)", () => { } }); - // ── Session 1: "写一个 Python 函数" ────────────────────────────────── + // ── Topic 1: "写一个 Python 函数" ────────────────────────────────── // Turn 1 (bootstrap): user asks for a fibonacci function. const s1Ep1 = await runTurn(pipeline, { @@ -524,7 +524,7 @@ describe("V7 full-chain E2E (Python programming task)", () => { }); expect(s1Ep4.episodeId).toBe(s1Ep1.episodeId); - // ── Session 2: new task — sorting algorithm ───────────────────────── + // ── Topic 2: new task — sorting algorithm ───────────────────────── const s2Ep1 = await runTurn(pipeline, { sessionId: s1Ep1.sessionId, @@ -533,8 +533,10 @@ describe("V7 full-chain E2E (Python programming task)", () => { "```python\ndef quicksort(arr):\n if len(arr) <= 1: return arr\n pivot = arr[len(arr)//2]\n left = [x for x in arr if x < pivot]\n mid = [x for x in arr if x == pivot]\n right = [x for x in arr if x > pivot]\n return quicksort(left) + mid + quicksort(right)\n```", reflection: "经典 Lomuto 的 Python 简化版, 避免原地分区", }); - // new_task → new session ID (routed by orchestrator). - expect(s2Ep1.sessionId).not.toBe(s1Ep1.sessionId); + // new_task closes the previous topic and opens a fresh episode, + // while the adapter session stays stable. + expect(s2Ep1.sessionId).toBe(s1Ep1.sessionId); + expect(s2Ep1.episodeId).not.toBe(s1Ep1.episodeId); const s2Ep2 = await runTurn(pipeline, { sessionId: s2Ep1.sessionId, @@ -553,7 +555,7 @@ describe("V7 full-chain E2E (Python programming task)", () => { }); expect(s2Ep3.episodeId).toBe(s2Ep1.episodeId); - // ── Session 3: another Python scaffolding task (drives L2 support) ── + // ── Topic 3: another Python scaffolding task (drives L2 support) ── const s3Ep1 = await runTurn(pipeline, { sessionId: s2Ep1.sessionId, @@ -562,7 +564,8 @@ describe("V7 full-chain E2E (Python programming task)", () => { "```python\nfrom functools import lru_cache\n@lru_cache(maxsize=128)\ndef get_expensive(k): ...\n```", reflection: "用内置 functools.lru_cache 省去手写", }); - expect(s3Ep1.sessionId).not.toBe(s2Ep1.sessionId); + expect(s3Ep1.sessionId).toBe(s2Ep1.sessionId); + expect(s3Ep1.episodeId).not.toBe(s2Ep1.episodeId); await runTurn(pipeline, { sessionId: s3Ep1.sessionId, userText: "好, 再写个装饰器来统计调用次数", @@ -577,7 +580,7 @@ describe("V7 full-chain E2E (Python programming task)", () => { reflection: "用户满意", }); - // ── Session 4: tool-failure burst (drives Decision Repair) ─────── + // ── Topic 4: tool-failure burst (drives Decision Repair) ─────── const s4Ep1 = await runTurn(pipeline, { sessionId: s3Ep1.sessionId, @@ -606,6 +609,7 @@ describe("V7 full-chain E2E (Python programming task)", () => { errorCode: "ENOENT", }); } + pipeline.sessionManager.closeSession(s4Ep1.sessionId, "test.topic_end"); // ── Drain the async chain (capture → reward → L2 → L3 → skill) ── // capture is fire-and-forget per episode, but we disabled the reward @@ -619,26 +623,28 @@ describe("V7 full-chain E2E (Python programming task)", () => { const repos = pipeline.repos; - // 1) Episodes: expect 4 (one per new_task boundary). Session 1 has - // 4 merged turns, session 2 has 3 merged turns, session 3 has 3 - // merged turns, session 4 has 1 turn. + // 1) Episodes: expect 4 (one per new_task boundary). Topic 1 has + // 4 merged turns, topic 2 has 3 merged turns, topic 3 has 3 + // merged turns, topic 4 has 1 turn. const allEpisodes = repos.episodes.list({}); expect(allEpisodes.length).toBe(4); const closedEpisodes = allEpisodes.filter((e) => e.status === "closed"); expect(closedEpisodes.length).toBe(4); - // 2) L1 traces: one per user→assistant pair. 4 + 3 + 3 + 1 = 11. + // 2) L1 traces: one per user→assistant pair, plus tool sub-steps + // when present. 4 + 3 + 3 + (tool + final assistant) = 12. const allTraces = repos.traces.list({}); - expect(allTraces.length).toBe(11); + expect(allTraces.length).toBe(12); for (const tr of allTraces) { - // Every captured trace has a summary + α populated by scripted LLM. + // Every captured trace has a summary. Tool-only sub-steps may keep + // alpha at 0 when the scorer has no useful reflection signal. expect(tr.summary ?? "").toMatch(/.+/); - expect(tr.alpha).toBeGreaterThan(0); // V is backpropagated from R_human. Positive turns → V > 0, // the "不对" revision turn gets a negative R_human → some traces // should have V < 0 after backprop (episode-wide R_human is // averaged across all turns, so we check there's value spread). } + expect(allTraces.some((tr) => tr.alpha > 0)).toBe(true); const positiveV = allTraces.filter((t) => t.value > 0).length; const negativeV = allTraces.filter((t) => t.value < 0).length; expect(positiveV + negativeV).toBeGreaterThan(0); diff --git a/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py b/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py index 83533b1d5..fb1269e4b 100644 --- a/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py +++ b/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py @@ -7,8 +7,8 @@ from __future__ import annotations -import sys import json +import sys import tempfile import unittest @@ -105,7 +105,6 @@ def test_lifecycle_persists_turn_and_closes_real_episode(self) -> None: "session.open", "turn.start", "turn.end", - "episode.close", "session.close", ], ) @@ -119,8 +118,6 @@ def test_lifecycle_persists_turn_and_closes_real_episode(self) -> None: self.assertEqual(turn_end["toolCalls"][0]["name"], "terminal") self.assertIn("npm test", turn_end["toolCalls"][0]["input"]) - episode_close = next(params for method, params in bridge.calls if method == "episode.close") - self.assertEqual(episode_close["episodeId"], "episode-from-turn-start") self.assertTrue(bridge.closed) def test_sync_turn_recovers_when_initial_bridge_open_timed_out(self) -> None: @@ -180,6 +177,26 @@ def test_delegation_recovers_when_initial_bridge_open_timed_out(self) -> None: self.assertEqual(record["episodeId"], "episode-from-turn-start") self.assertEqual(record["childSessionId"], "child-session") + def test_sync_turn_lazily_starts_turn_when_prefetch_was_skipped(self) -> None: + bridge = FakeBridge() + with ( + patch("memos_provider.ensure_bridge_running", return_value=True), + patch("memos_provider.MemosBridgeClient", return_value=bridge), + ): + provider = memos_provider.MemTensorProvider() + provider.initialize("host-session") + + provider.on_turn_start(1, "继续处理 Hermes viewer 端口") + provider.sync_turn( + "继续处理 Hermes viewer 端口", + "已继续检查 viewer 端口配置。", + ) + + methods = [method for method, _params in bridge.calls] + self.assertEqual(methods, ["session.open", "turn.start", "turn.end"]) + turn_end = next(params for method, params in bridge.calls if method == "turn.end") + self.assertEqual(turn_end["episodeId"], "episode-from-turn-start") + def test_internal_hermes_review_prompt_is_not_persisted_as_user_turn(self) -> None: bridge = FakeBridge() review_prompt = ( diff --git a/apps/memos-local-plugin/tests/unit/adapters/hermes-persistence.test.ts b/apps/memos-local-plugin/tests/unit/adapters/hermes-persistence.test.ts index afed542af..24b6d5889 100644 --- a/apps/memos-local-plugin/tests/unit/adapters/hermes-persistence.test.ts +++ b/apps/memos-local-plugin/tests/unit/adapters/hermes-persistence.test.ts @@ -90,6 +90,9 @@ function testConfig(): ResolvedConfig { cfg.algorithm.skill.useLlm = false; cfg.algorithm.feedback.useLlm = false; cfg.algorithm.retrieval.llmFilterEnabled = false; + cfg.algorithm.retrieval.includeLowValue = true; + cfg.algorithm.retrieval.minTraceSim = 0; + cfg.algorithm.retrieval.relativeThresholdFloor = 0; return cfg; } @@ -215,7 +218,8 @@ describe("Hermes MemoryCore persistence", () => { query: "HERMES_MEMOS_E2E_0428 18800", topK: { tier1: 0, tier2: 5, tier3: 0 }, }); - expect(search.hits.some((hit) => hit.refId === traces[0]!.id)).toBe(true); + const traceIds = new Set(traces.map((trace) => trace.id)); + expect(search.hits.some((hit) => traceIds.has(hit.refId))).toBe(true); const logs = await second.core.listApiLogs({ toolName: "memory_search", limit: 20 }); expect(logs.total).toBeGreaterThan(0); diff --git a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts index 74b4b6dc6..8ec05e375 100644 --- a/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts +++ b/apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts @@ -355,16 +355,16 @@ describe("bootstrapMemoryCore", () => { it("init() recovers orphaned open episodes left behind by a previous crash", async () => { // When the host (OpenClaw / Hermes / a daemon) is hard-killed // mid-conversation, no `session.end` event is fired and the open - // episode rows in SQLite never get closed. Without recovery, those - // rows show "激活" forever in the viewer even though no one is - // working on them. `core.init()` sweeps the open set on boot and: + // episode rows in SQLite never get closed. `core.init()` now keeps + // incomplete recent topics open so the next user turn can be routed + // back into the same task, while repairing rows that already carry + // a completed/scored signal: // // - Already-rewarded rows (`r_task != null`) → close + stamp // `closeReason="finalized"` (the chain ran to completion before // the crash; only the final status flip was lost). - // - Un-scored rows → close + stamp - // `closeReason="abandoned"` with a clear human-readable - // `abandonReason` ("插件上次未正常退出,启动时自动关闭未完成的任务"). + // - Un-scored rows with no traces → stay open + `topicState` + // `interrupted` so they do not show as skipped. home = await makeTmpHome({ agent: "openclaw" }); // First bootstrap: lets migrations run + schema exists. Shut it @@ -424,12 +424,16 @@ describe("bootstrapMemoryCore", () => { readDb.close(); expect(unscored).toBeDefined(); - expect(unscored!.status).toBe("closed"); + expect(unscored!.status).toBe("open"); const unscoredMeta = JSON.parse(unscored!.meta_json) as { closeReason?: string; abandonReason?: string; + topicState?: string; + pauseReason?: string; }; - expect(unscoredMeta.closeReason).toBe("finalized"); + expect(unscoredMeta.topicState).toBe("interrupted"); + expect(unscoredMeta.pauseReason).toBe("startup_recovered_open_topic"); + expect(unscoredMeta.closeReason).toBeUndefined(); expect(unscoredMeta.abandonReason).toBeFalsy(); expect(scored).toBeDefined(); @@ -443,4 +447,46 @@ describe("bootstrapMemoryCore", () => { expect(scoredMeta.closeReason).toBe("finalized"); expect(scoredMeta.abandonReason).toBeFalsy(); }); + + it("keeps an interrupted topic open across restart and appends the next same-topic turn", async () => { + home = await makeTmpHome({ agent: "openclaw" }); + + const first = await bootstrapMemoryCore({ + agent: "openclaw", + home: home.home, + config: home.config, + pkgVersion: "topic-recover-1", + }); + await first.init(); + const firstStart = await first.onTurnStart({ + agent: "openclaw", + sessionId: "se_topic_a" as never, + userText: "帮我配置 Hermes viewer 端口 18800", + ts: Date.now(), + }); + const episodeId = firstStart.query.episodeId; + expect(episodeId).toBeTruthy(); + await first.shutdown(); + + core = await bootstrapMemoryCore({ + agent: "openclaw", + home: home.home, + config: home.config, + pkgVersion: "topic-recover-2", + }); + await core.init(); + const secondStart = await core.onTurnStart({ + agent: "openclaw", + sessionId: "se_topic_b" as never, + userText: "那这个端口继续怎么验证", + ts: Date.now() + 1_000, + }); + + expect(secondStart.query.episodeId).toBe(episodeId); + const rows = await core.listEpisodeRows({ limit: 10 }); + const row = rows.find((r) => r.id === episodeId); + expect(row?.status).toBe("open"); + expect(row?.topicState === "active" || row?.topicState === "interrupted").toBe(true); + expect(row?.preview).toContain("Hermes viewer"); + }); }); diff --git a/apps/memos-local-plugin/tests/unit/session/_in-memory-repos.ts b/apps/memos-local-plugin/tests/unit/session/_in-memory-repos.ts index 6e198ad6d..bf190763a 100644 --- a/apps/memos-local-plugin/tests/unit/session/_in-memory-repos.ts +++ b/apps/memos-local-plugin/tests/unit/session/_in-memory-repos.ts @@ -77,6 +77,10 @@ export function makeInMemoryEpisodesRepo(): { const cur = rows.get(id); if (cur) cur.traceIds = [...traceIds]; }, + updateMeta(id, metaPatch) { + const cur = rows.get(id); + if (cur) cur.meta = { ...cur.meta, ...metaPatch }; + }, close(id, endedAt, rTask, meta) { const cur = rows.get(id); if (!cur) return; diff --git a/apps/memos-local-plugin/tests/unit/session/session-manager.test.ts b/apps/memos-local-plugin/tests/unit/session/session-manager.test.ts index 1092d7762..efa44fb44 100644 --- a/apps/memos-local-plugin/tests/unit/session/session-manager.test.ts +++ b/apps/memos-local-plugin/tests/unit/session/session-manager.test.ts @@ -103,11 +103,10 @@ describe("session/session-manager", () => { expect(sm.getSession(a.id)).not.toBeNull(); // getSession reloads from repo }); - it("closeSession finalizes (not abandons) open episodes and emits session.closed", async () => { - // V7 §0.2 — a user-initiated session close is normal lifecycle, not - // episode abandonment. The episode is finalized so the reward - // pipeline can score it; only truly trivial episodes get re-stamped - // to "abandoned" by reward.ts itself with a clear reason. + it("closeSession pauses incomplete open episodes and emits session.closed", async () => { + // A clean session close is not automatically a topic boundary. If + // the episode has no assistant reply yet, keep it open so a later + // turn can be classified back into the same topic. const sm = makeSm(); const session = sm.openSession({ agent: "openclaw" }); const ep = await sm.startEpisode({ sessionId: session.id, userMessage: "long running" }); @@ -115,8 +114,9 @@ describe("session/session-manager", () => { sm.bus.onAny((e) => events.push(e.kind)); sm.closeSession(session.id, "client"); const stored = episodesFake.rows.get(ep.id); - expect(stored?.status).toBe("closed"); - expect(stored?.meta.closeReason).toBe("finalized"); + expect(stored?.status).toBe("open"); + expect(stored?.meta.topicState).toBe("paused"); + expect(stored?.meta.pauseReason).toBe("session_closed:client"); // The literal session-end reason is preserved as audit metadata so // logs / analytics can still tell `/new` from `/quit` apart, but // it never reaches the user-facing `abandonReason` column. @@ -125,11 +125,9 @@ describe("session/session-manager", () => { expect(events).toContain("session.closed"); }); - it("shutdown finalizes all open episodes across sessions", async () => { - // V7 §0.2 — clean process shutdown is normal lifecycle (not a - // crash), so episodes get `closeReason="finalized"` and the audit - // trail goes into `meta.sessionCloseReason="shutdown:test"`. - // Crash-orphans are handled separately on next bootstrap. + it("shutdown pauses incomplete open episodes across sessions", async () => { + // Process shutdown is not itself a topic boundary. Incomplete topics + // stay open and can be recovered on the next bootstrap. const sm = makeSm(); const s1 = sm.openSession({ agent: "openclaw" }); const s2 = sm.openSession({ agent: "hermes" }); @@ -137,8 +135,8 @@ describe("session/session-manager", () => { await sm.startEpisode({ sessionId: s2.id, userMessage: "task two" }); sm.shutdown("test"); for (const row of episodesFake.rows.values()) { - expect(row.status).toBe("closed"); - expect(row.meta.closeReason).toBe("finalized"); + expect(row.status).toBe("open"); + expect(row.meta.topicState).toBe("paused"); expect(row.meta.sessionCloseReason).toBe("shutdown:test"); expect(row.meta.abandonReason).toBeUndefined(); } diff --git a/apps/memos-local-plugin/web/src/stores/i18n.ts b/apps/memos-local-plugin/web/src/stores/i18n.ts index 90f5feb00..67435d648 100644 --- a/apps/memos-local-plugin/web/src/stores/i18n.ts +++ b/apps/memos-local-plugin/web/src/stores/i18n.ts @@ -410,6 +410,10 @@ const en = { "Not enough messages to learn from — at least a user question and an assistant reply are needed.", "tasks.skip.reason.noAssistant": "The user message was captured but no assistant reply came back — the agent host may have crashed, filtered the turn, or been interrupted. Nothing to summarize yet.", + "tasks.active.reason.interrupted": + "This topic was interrupted before the assistant reply completed. It will stay in the same task until the next related message arrives.", + "tasks.active.reason.paused": + "This topic is paused after the session closed. If you continue the same topic soon, the next turn will be added to this task.", "tasks.skip.reason.abandoned": "The pipeline closed this task without a reward (e.g. the relation classifier decided the next turn was a brand-new task). Check the session timeline for the full arc.", "tasks.skip.reason.rewardPending": @@ -1046,6 +1050,8 @@ const zh: Record = { "tasks.failed.default": "任务评分 R={rTask},被视为失败交互,未来相似任务的检索权重会被下调。", "tasks.skip.reason.tooFewTurns": "对话轮次不足,需要至少 2 轮完整的问答交互才能生成摘要。", "tasks.skip.reason.noAssistant": "只捕获到用户消息,没收到 assistant 回复——可能是 Agent 宿主崩溃、turn 被 bootstrap 过滤、或用户打断。暂时没有可总结的内容。", + "tasks.active.reason.interrupted": "这个 topic 在 assistant 回复完成前被打断了。下次继续同一 topic 时,会归入同一个任务。", + "tasks.active.reason.paused": "这个 topic 因 session 关闭而暂停;短时间内继续同一 topic,会继续追加到这个任务。", "tasks.skip.reason.abandoned": "管线在未完成打分前主动结束了这条任务(例如 relation 分类器判定下一条属于全新任务),可以去 Session 时间轴看完整链路。", "tasks.skip.reason.rewardPending": "Reward 管线还没给它打分——可能仍在计算中,也可能 LLM 打分失败了;到 Logs 面板搜 `reward.*` 事件看看。", "tasks.skip.reason.default": "对话未达到生成摘要的条件。", diff --git a/apps/memos-local-plugin/web/src/views/TasksView.tsx b/apps/memos-local-plugin/web/src/views/TasksView.tsx index 0b0b2fe5d..c5e675b3b 100644 --- a/apps/memos-local-plugin/web/src/views/TasksView.tsx +++ b/apps/memos-local-plugin/web/src/views/TasksView.tsx @@ -42,6 +42,8 @@ interface EpisodeRow { skillReasonParams?: Record | null; linkedSkillId?: string | null; closeReason?: "finalized" | "abandoned" | null; + topicState?: "active" | "paused" | "interrupted" | "ended" | null; + pauseReason?: string | null; abandonReason?: string | null; } @@ -473,7 +475,12 @@ function deriveStatus(r: EpisodeRow): "active" | "completed" | "skipped" | "fail */ function statusReason(r: EpisodeRow): string | null { const s = deriveStatus(r); - if (s === "active" || s === "completed") return null; + if (s === "active") { + if (r.topicState === "interrupted") return t("tasks.active.reason.interrupted" as any); + if (r.topicState === "paused") return t("tasks.active.reason.paused" as any); + return null; + } + if (s === "completed") return null; if (r.abandonReason && r.abandonReason.trim().length > 0) { if (r.abandonReason.includes("插件上次未正常退出")) {