Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ def _register_tool_call_hook(self) -> None:
if self._hook_registered:
return
try:
from hermes_cli.plugins import get_plugin_manager # pyright: ignore[reportMissingImports]
from hermes_cli.plugins import (
get_plugin_manager, # pyright: ignore[reportMissingImports]
)

mgr = get_plugin_manager()
mgr._hooks.setdefault("post_tool_call", []).append(self._on_post_tool_call)
Expand Down Expand Up @@ -1161,11 +1163,11 @@ def save_config(self, values: dict[str, Any], hermes_home: str) -> None: # type
def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignore[override]
if not self._bridge:
return
# `sync_turn` already flushed the turn data synchronously.
# Just close the episode and session.
if self._episode_id:
with contextlib.suppress(Exception):
self._bridge.request("episode.close", {"episodeId": self._episode_id})
# `sync_turn` already flushed completed turn data synchronously.
# Closing the host session is not the same as ending the topic:
# the core will pause or finalize the open episode according to
# topic-boundary rules so interrupted Hermes sessions can resume
# into the same task later.
with contextlib.suppress(Exception):
self._bridge.request("session.close", {"sessionId": self._session_id})

Expand Down
1 change: 1 addition & 0 deletions apps/memos-local-plugin/adapters/openclaw/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ export function createOpenClawBridge(opts: BridgeOptions): BridgeHandle {
const routedEpisodeId = packet.query.episodeId as EpisodeId | undefined;
if (routedEpisodeId) {
openEpisodeBySession.set(routedSessionId, routedEpisodeId);
openEpisodeBySession.set(sessionId, routedEpisodeId);
}

opts.log.info("memos.onTurnStart", {
Expand Down
5 changes: 5 additions & 0 deletions apps/memos-local-plugin/agent-contract/dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ export interface EpisodeListItemDTO {
* failed) without guessing from `rTask`.
*/
closeReason?: "finalized" | "abandoned" | null;
/** Topic-level lifecycle state used by the viewer to distinguish
* interrupted/paused-but-continuable tasks from truly skipped ones. */
topicState?: "active" | "paused" | "interrupted" | "ended" | null;
/** Human-readable audit reason for a paused/interrupted open topic. */
pauseReason?: string | null;
/**
* User-readable reason when `closeReason === "abandoned"`. Mirrors
* the legacy plugin's Chinese skip-reason strings (e.g. "对话内容
Expand Down
90 changes: 60 additions & 30 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,44 +429,39 @@ export function createMemoryCore(
}
}

// ─── Stale episode auto-finalize ──
// Mirrors `memos-local-openclaw` ViewerServer.autoFinalizeStaleTasks().
// Open episodes older than 4 hours (configurable via
// `algorithm.session.mergeMaxGapMs * 2`) are abandoned so the Tasks
// view shows them as completed/skipped rather than perpetually "active".
// ─── Stale topic auto-finalize ──
// Open topics are allowed to survive clean session closes and process
// restarts so the next user turn can be classified against them. Once a
// topic exceeds this hard window, we treat it as ended and run the normal
// reflect/reward path.
const STALE_EPISODE_TIMEOUT_MS = Math.max(
handle.config.algorithm.session.mergeMaxGapMs * 2,
4 * 60 * 60 * 1000,
);
let lastStaleScan = 0;
function autoFinalizeStaleTasks(): void {
async function autoFinalizeStaleTasks(): Promise<void> {
const nowMs = Date.now();
if (nowMs - lastStaleScan < 30_000) return;
lastStaleScan = nowMs;
try {
const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 200 });
if (openEpisodes.length === 0) return;
const stale: Array<EpisodeRow & { meta?: Record<string, unknown> }> = [];
for (const ep of openEpisodes) {
const epAge = nowMs - (ep.endedAt ?? ep.startedAt);
if (epAge > STALE_EPISODE_TIMEOUT_MS) {
log.info("stale_episode.auto_abandon", {
log.info("stale_topic.auto_finalize", {
episodeId: ep.id,
sessionId: ep.sessionId,
ageMs: epAge,
thresholdMs: STALE_EPISODE_TIMEOUT_MS,
});
try {
handle.episodeManager.abandon(
ep.id as import("../../agent-contract/dto.js").EpisodeId,
`自动关闭:空闲 ${Math.round(epAge / 60_000)} 分钟(阈值 ${Math.round(STALE_EPISODE_TIMEOUT_MS / 60_000)} 分钟)`,
);
} catch {
// Episode may have been finalized concurrently — safe to ignore.
}
stale.push(ep);
}
}
if (stale.length > 0) await recoverOpenEpisodesAsSessionEnd(stale);
} catch (err) {
log.debug("stale_episode.scan_error", {
log.debug("stale_topic.scan_error", {
err: err instanceof Error ? err.message : String(err),
});
}
Expand All @@ -482,19 +477,31 @@ export function createMemoryCore(
}
initialized = true;

// Any `status: "open"` row we see on boot is an orphan from a
// previous unclean shutdown — the plugin host was SIGKILL'd, the
// gateway was bootout'd, the process crashed mid-turn, etc.
//
// Treat rows that already have traces as a missed `session_end`,
// not as user abandonment: finalize them and emit the same
// `episode.finalized` event the normal session close path emits so
// capture → reward → L2 → L3 → Skill can catch up on restart. Rows
// that already carry rTask only need their final status repaired.
// Preserve recent open topics across restarts. A crash or Ctrl+C is
// not evidence that the topic ended; the next user turn gets routed
// through relation classification. Only hard-stale open topics are
// finalized here so the pipeline eventually catches up.
try {
const orphans = handle.repos.episodes.list({ status: "open", limit: 500 });
if (orphans.length > 0) {
await recoverOpenEpisodesAsSessionEnd(orphans);
const nowMs = Date.now();
const stale = orphans.filter(
(ep) =>
ep.rTask != null ||
(ep.traceIds?.length ?? 0) > 0 ||
nowMs - (ep.endedAt ?? ep.startedAt) > STALE_EPISODE_TIMEOUT_MS,
);
const recent = orphans.filter((ep) => !stale.includes(ep));
for (const ep of recent) {
handle.repos.episodes.updateMeta(ep.id as EpisodeId, {
topicState: (ep.meta?.topicState as string | undefined) ?? "interrupted",
pauseReason: (ep.meta?.pauseReason as string | undefined) ?? "startup_recovered_open_topic",
recoveredAtStartup: nowMs,
});
}
if (stale.length > 0) {
await recoverOpenEpisodesAsSessionEnd(stale);
}
}
} catch (err) {
log.debug("init.orphan_scan.failed", {
Expand Down Expand Up @@ -1796,10 +1803,9 @@ export function createMemoryCore(
}): Promise<Parameters<MemoryCore["listEpisodeRows"]> extends unknown[] ? Awaited<ReturnType<MemoryCore["listEpisodeRows"]>> : never> {
ensureLive();

// Legacy parity: auto-finalize stale open episodes when the task
// list is fetched, matching `memos-local-openclaw` ViewerServer's
// `autoFinalizeStaleTasks()`. Default threshold: 4 hours.
autoFinalizeStaleTasks();
// Auto-finalize only hard-stale open topics. Recent interrupted
// topics stay open so the next user turn can be merged by topic.
await autoFinalizeStaleTasks();

const rows = handle.repos.episodes.list({
sessionId: input?.sessionId,
Expand Down Expand Up @@ -1859,13 +1865,35 @@ export function createMemoryCore(
// abandon. Surface them through the API so TasksView can render
// a human-readable status badge without guessing from rTask.
const meta = (r as { meta?: Record<string, unknown> }).meta ?? {};
if (!preview) {
const fallback =
typeof meta.initialUserText === "string"
? meta.initialUserText
: typeof meta.pendingUserText === "string"
? meta.pendingUserText
: typeof meta.lastUserText === "string"
? meta.lastUserText
: "";
const raw = fallback.replace(/\s+/g, " ").trim();
if (raw) preview = raw.length > 160 ? raw.slice(0, 157) + "…" : raw;
}
const closeReasonRaw = meta.closeReason;
const closeReason: "finalized" | "abandoned" | null =
closeReasonRaw === "finalized" || closeReasonRaw === "abandoned"
? closeReasonRaw
: null;
const abandonReason =
typeof meta.abandonReason === "string" ? meta.abandonReason : null;
const topicStateRaw = meta.topicState;
const topicState =
topicStateRaw === "active" ||
topicStateRaw === "paused" ||
topicStateRaw === "interrupted" ||
topicStateRaw === "ended"
? topicStateRaw
: null;
const pauseReason =
typeof meta.pauseReason === "string" ? meta.pauseReason : null;

return {
id: r.id,
Expand All @@ -1883,6 +1911,8 @@ export function createMemoryCore(
skillReasonParams: derivation.reasonParams,
linkedSkillId: derivation.linkedSkillId,
closeReason,
topicState,
pauseReason,
abandonReason,
};
});
Expand Down
Loading