Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/memos-local-plugin/agent-contract/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,27 @@ export type Unsubscribe = () => void;
export interface MemoryCore {
// ── lifecycle ──
init(): Promise<void>;
/**
* Await the background startup-recovery work scheduled by `init()`.
*
* `init()` returns as soon as the cheap, synchronous DB classification
* of orphan episodes is done; the slow reflect/reward/L2 path on stale
* orphans and dirty-closed episodes runs in the background so the HTTP
* viewer can start serving immediately
* (https://github.com/MemTensor/MemOS/issues/1776).
*
* Callers that need to observe the side effects of recovery — tests,
* scripted maintenance tools, graceful shutdown — await this promise.
* Production adapters (`adapters/openclaw`, `bridge.cts`) intentionally
* skip the await so server.started is not gated on LLM round-trips.
*
* Resolves immediately if there was nothing to recover, or if `init()`
* was never called. Never rejects — recovery errors are logged to
* `init.orphan_recovery.flush_failed` and
* `init.background_recovery_failed` and swallowed so they cannot wedge
* shutdown.
*/
waitForStartupRecovery?(): Promise<void>;
shutdown(): Promise<void>;
health(): Promise<CoreHealth>;
/** Late-bind ARMS telemetry (called after config is available). */
Expand Down
72 changes: 65 additions & 7 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,16 @@ export function createMemoryCore(
let telemetry = options.telemetry ?? null;
let initialized = false;
let shutDown = false;
/**
* Background promise for the slow part of `init()`'s orphan recovery
* (reflect → reward → L2 induce on stale orphans + dirty-closed
* rescore). Tracked so `waitForStartupRecovery()` and `shutdown()` can
* await it. Defaults to a resolved promise — empty DB or no orphans
* means no extra waiting is needed. See
* https://github.com/MemTensor/MemOS/issues/1776 for the regression
* this dodges (server.started was gated on ~100 s of LLM round-trips).
*/
let startupRecoveryPromise: Promise<void> = Promise.resolve();
/** Per-episode monotonic step counter for tool outcomes. */
const toolStepByEpisode = new Map<string, number>();
let hubRuntime: HubRuntime | null = null;
Expand Down Expand Up @@ -879,6 +889,19 @@ export function createMemoryCore(
// not evidence that the topic ended; the next user turn gets routed
// through relation classification. Only hard-stale open topics are
// finalized here so the pipeline eventually catches up.
//
// The synchronous part — lightweight close + recent-topic meta
// updates + classification — stays inline because it's pure DB work
// and the next turn relies on the resulting `topicState`. The slow
// part — `recoverOpenEpisodesAsSessionEnd` and
// `recoverDirtyClosedEpisodes`, both of which fan out into reflect →
// reward → L2 → LLM round-trips — is moved to a background promise so
// `init()` returns in milliseconds and the HTTP viewer can start
// accepting requests immediately. See
// https://github.com/MemTensor/MemOS/issues/1776 for the original 100 s
// startup regression.
let staleForBackground: Array<EpisodeRow & { meta?: Record<string, unknown> }> = [];
let dirtyClosedForBackground: Array<EpisodeRow & { meta?: Record<string, unknown> }> = [];
try {
const orphans = handle.repos.episodes.list({ status: "open", limit: 500 });
if (orphans.length > 0) {
Expand Down Expand Up @@ -908,22 +931,41 @@ export function createMemoryCore(
recoveredAtStartup: nowMs,
});
}
if (stale.length > 0) {
await recoverOpenEpisodesAsSessionEnd(stale);
}
staleForBackground = stale;
}
const dirtyClosed = handle.repos.episodes
dirtyClosedForBackground = handle.repos.episodes
.list({ status: "closed", limit: 500 })
.filter((ep) => !isLightweightEpisode(ep) && episodeRewardIsDirty(ep));
if (dirtyClosed.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosed);
}
} catch (err) {
log.debug("init.orphan_scan.failed", {
err: err instanceof Error ? err.message : String(err),
});
}

if (staleForBackground.length > 0 || dirtyClosedForBackground.length > 0) {
const staleSnapshot = staleForBackground;
const dirtyClosedSnapshot = dirtyClosedForBackground;
log.info("init.background_recovery_started", {
staleCount: staleSnapshot.length,
dirtyClosedCount: dirtyClosedSnapshot.length,
});
startupRecoveryPromise = (async () => {
if (staleSnapshot.length > 0) {
await recoverOpenEpisodesAsSessionEnd(staleSnapshot);
}
if (dirtyClosedSnapshot.length > 0) {
await recoverDirtyClosedEpisodes(dirtyClosedSnapshot);
}
})().catch((err) => {
// `waitForStartupRecovery` is contracted to never reject — log
// and swallow so a failed reflect listener can't surface as an
// unhandled rejection at the bridge level.
log.warn("init.background_recovery_failed", {
err: err instanceof Error ? err.message : String(err),
});
});
}

// Periodic rescore timer for episodes that miss the startup scan or
// retry of failed reward runs. 10-minute interval is safe because
// autoRescoreDirtyClosedEpisodes has its own 30-second dedup guard.
Expand Down Expand Up @@ -1418,10 +1460,25 @@ export function createMemoryCore(
data: Record<string, unknown>,
): void {}

async function waitForStartupRecovery(): Promise<void> {
// Contract: never rejects. The `.catch` in init() already converted
// any failure into a warn-level log, so awaiting here is safe.
await startupRecoveryPromise;
}

async function shutdown(): Promise<void> {
if (shutDown) return;
shutDown = true;
try {
// Drain the background orphan recovery before tearing down the
// SQLite handle / bus subscribers. If we shut down mid-flight the
// in-flight reflect → reward → L2 listeners would write to a
// closed DB and surface as SQLITE_MISUSE noise in tests + CI.
try {
await startupRecoveryPromise;
} catch {
/* startupRecoveryPromise already swallows; defensive only */
}
try {
await hubRuntime?.stop();
} catch (err) {
Expand Down Expand Up @@ -4496,6 +4553,7 @@ export function createMemoryCore(

return {
init,
waitForStartupRecovery,
shutdown,
health,
bindTelemetry(t: import("../telemetry/index.js").Telemetry) { telemetry = t; },
Expand Down
190 changes: 190 additions & 0 deletions apps/memos-local-plugin/tests/unit/pipeline/memory-core.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,11 @@ algorithm:
pkgVersion: "orphan-test-recover",
});
await core.init();
// init() now schedules the slow reflect/reward/L2 part on a
// background promise; opt into the legacy "wait for everything"
// semantics so the meta-update assertions below are deterministic.
// See https://github.com/MemTensor/MemOS/issues/1776.
await core.waitForStartupRecovery?.();

const readDb = new Sqlite(home.home.dbFile, { readonly: true });
const unscored = readDb
Expand Down Expand Up @@ -1394,6 +1399,10 @@ algorithm:
pkgVersion: "dirty-rescore-recover",
});
await core.init();
// recoverDirtyClosedEpisodes runs on the background recovery promise
// post-issue-1776; opt in so the meta_json assertions below see the
// post-rescore state.
await core.waitForStartupRecovery?.();

const readDb = new Sqlite(home.home.dbFile, { readonly: true });
const episode = readDb
Expand Down Expand Up @@ -1488,6 +1497,10 @@ algorithm:
pkgVersion: "missing-reward-recover",
});
await core.init();
// recoverDirtyClosedEpisodes runs on the background recovery promise
// post-issue-1776; opt in so the meta_json assertions below see the
// post-rescore state.
await core.waitForStartupRecovery?.();

const readDb = new Sqlite(home.home.dbFile, { readonly: true });
const episode = readDb
Expand All @@ -1505,4 +1518,181 @@ algorithm:
expect(meta.reward?.traceCount).toBe(1);
expect(meta.reward?.traceIds).toEqual(["tr_missing_reward"]);
});

// https://github.com/MemTensor/MemOS/issues/1776 — orphan recovery must
// not gate HTTP server start. These tests pin the new lifecycle
// contract: init() returns fast and the slow reflect/reward/L2 work
// runs on a background promise that `waitForStartupRecovery` and
// `shutdown` can await.
describe("issue #1776 — non-blocking startup recovery", () => {
it("init() returns quickly even when stale orphans need recovery", async () => {
home = await makeTmpHome({
agent: "openclaw",
configYaml: FULL_MEMORY_CONFIG_YAML,
});
const seeder = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "fast-init-seed",
});
await seeder.init();
await seeder.shutdown();

// Seed 3 stale orphans: each has a trace + an `endedAt` >5 hours
// ago so they fall through the STALE_EPISODE_TIMEOUT_MS gate and
// hit `recoverOpenEpisodesAsSessionEnd` in the background.
const Sqlite = (await import("better-sqlite3")).default;
const writeDb = new Sqlite(home.home.dbFile);
const oldTs = Date.now() - 6 * 60 * 60 * 1000; // 6 h ago
writeDb
.prepare(
`INSERT INTO sessions (id, agent, started_at, last_seen_at, meta_json) VALUES (?, ?, ?, ?, ?)`,
)
.run("se_fast", "openclaw", oldTs, oldTs, "{}");
for (const epId of ["ep_fast_1", "ep_fast_2", "ep_fast_3"]) {
writeDb
.prepare(
`INSERT INTO episodes (id, session_id, started_at, ended_at, trace_ids_json, r_task, status, meta_json) VALUES (?, ?, ?, NULL, '[]', NULL, 'open', '{}')`,
)
.run(epId, "se_fast", oldTs);
}
writeDb.close();

core = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "fast-init-recover",
});
const initStart = Date.now();
await core.init();
const initDurationMs = Date.now() - initStart;

// 500 ms is a generous bound: pure DB classification is sub-10 ms
// in practice, and the old synchronous path took >1 s even without
// an LLM because of the `handle.flush()` round-trip.
expect(initDurationMs).toBeLessThan(500);

await core.waitForStartupRecovery?.();
});

it("waitForStartupRecovery() resolves only after background work settles", async () => {
home = await makeTmpHome({
agent: "openclaw",
configYaml: FULL_MEMORY_CONFIG_YAML,
});
const seeder = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "wait-recovery-seed",
});
await seeder.init();
await seeder.shutdown();

const Sqlite = (await import("better-sqlite3")).default;
const writeDb = new Sqlite(home.home.dbFile);
const oldTs = Date.now() - 6 * 60 * 60 * 1000;
writeDb
.prepare(
`INSERT INTO sessions (id, agent, started_at, last_seen_at, meta_json) VALUES (?, ?, ?, ?, ?)`,
)
.run("se_wait", "openclaw", oldTs, oldTs, "{}");
writeDb
.prepare(
`INSERT INTO episodes (id, session_id, started_at, ended_at, trace_ids_json, r_task, status, meta_json) VALUES (?, ?, ?, NULL, '[]', ?, 'open', '{}')`,
)
.run("ep_wait", "se_wait", oldTs, 0.5);
writeDb.close();

core = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "wait-recovery-recover",
});
await core.init();

// Right after init(), the orphan should still be open: background
// recovery has been scheduled but hasn't necessarily run yet.
// Then waitForStartupRecovery resolves, and the close is observed.
await core.waitForStartupRecovery?.();

const readDb = new Sqlite(home.home.dbFile, { readonly: true });
const row = readDb
.prepare("SELECT status, meta_json FROM episodes WHERE id = ?")
.get("ep_wait") as { status: string; meta_json: string } | undefined;
readDb.close();
expect(row).toBeDefined();
expect(row!.status).toBe("closed");
const meta = JSON.parse(row!.meta_json) as { closeReason?: string };
expect(meta.closeReason).toBe("finalized");
});

it("waitForStartupRecovery() is a safe no-op when there are no orphans", async () => {
home = await makeTmpHome({
agent: "openclaw",
configYaml: FULL_MEMORY_CONFIG_YAML,
});
core = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "no-orphan-init",
});
await core.init();
await expect(core.waitForStartupRecovery?.()).resolves.toBeUndefined();
});

it("shutdown() awaits the background recovery before tearing down", async () => {
home = await makeTmpHome({
agent: "openclaw",
configYaml: FULL_MEMORY_CONFIG_YAML,
});
const seeder = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "shutdown-await-seed",
});
await seeder.init();
await seeder.shutdown();

const Sqlite = (await import("better-sqlite3")).default;
const writeDb = new Sqlite(home.home.dbFile);
const oldTs = Date.now() - 6 * 60 * 60 * 1000;
writeDb
.prepare(
`INSERT INTO sessions (id, agent, started_at, last_seen_at, meta_json) VALUES (?, ?, ?, ?, ?)`,
)
.run("se_shutdown", "openclaw", oldTs, oldTs, "{}");
writeDb
.prepare(
`INSERT INTO episodes (id, session_id, started_at, ended_at, trace_ids_json, r_task, status, meta_json) VALUES (?, ?, ?, NULL, '[]', ?, 'open', '{}')`,
)
.run("ep_shutdown", "se_shutdown", oldTs, 0.5);
writeDb.close();

const local = await bootstrapMemoryCore({
agent: "openclaw",
home: home.home,
config: home.config,
pkgVersion: "shutdown-await-recover",
});
await local.init();
// Don't call waitForStartupRecovery — shutdown() must do it for us.
await local.shutdown();

// After shutdown, the episode must have been closed by the
// background recovery (otherwise the DB would have been torn down
// mid-flight).
const readDb = new Sqlite(home.home.dbFile, { readonly: true });
const row = readDb
.prepare("SELECT status FROM episodes WHERE id = ?")
.get("ep_shutdown") as { status: string } | undefined;
readDb.close();
expect(row?.status).toBe("closed");
});
});
});
Loading