From 9053d75fc399d4da641c6908b265b2c64a3da2ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=91=E5=B8=83=E6=9E=97?= <11641432+heiheiyouyou@user.noreply.gitee.com> Date: Thu, 30 Apr 2026 14:51:15 +0800 Subject: [PATCH] fix:embedder fail process --- .../core/capture/capture.ts | 61 +++ .../core/embedding/index.ts | 2 + .../core/embedding/retry-worker.ts | 224 ++++++++++ apps/memos-local-plugin/core/memory/l2/l2.ts | 28 +- .../core/memory/l2/subscriber.ts | 2 +- apps/memos-local-plugin/core/memory/l3/l3.ts | 37 +- .../core/memory/l3/subscriber.ts | 2 +- apps/memos-local-plugin/core/pipeline/deps.ts | 1 + .../core/pipeline/event-bridge.ts | 10 + .../core/pipeline/memory-core.ts | 78 +++- .../core/pipeline/orchestrator.ts | 24 + .../memos-local-plugin/core/pipeline/types.ts | 4 +- .../core/retrieval/retrieve.ts | 20 +- .../core/retrieval/types.ts | 8 + apps/memos-local-plugin/core/skill/skill.ts | 15 + .../migrations/002-embedding-retry-queue.sql | 24 + .../migrations/003-embedding-retry-lease.sql | 4 + .../core/storage/migrator.ts | 25 +- .../storage/repos/embedding_retry_queue.ts | 419 ++++++++++++++++++ .../core/storage/repos/index.ts | 4 + .../core/storage/repos/policies.ts | 7 + .../core/storage/repos/skills.ts | 7 + .../core/storage/repos/traces.ts | 12 + .../core/storage/repos/world_model.ts | 7 + .../tests/unit/capture/capture.test.ts | 41 +- .../tests/unit/embedding/retry-queue.test.ts | 189 ++++++++ .../tests/unit/embedding/retry-worker.test.ts | 204 +++++++++ .../tests/unit/retrieval/integration.test.ts | 34 ++ .../tests/unit/storage/migrator.test.ts | 52 +++ .../web/src/views/LogsView.tsx | 12 + .../web/src/views/OverviewView.tsx | 4 +- 31 files changed, 1532 insertions(+), 29 deletions(-) create mode 100644 apps/memos-local-plugin/core/embedding/retry-worker.ts create mode 100644 apps/memos-local-plugin/core/storage/migrations/002-embedding-retry-queue.sql create mode 100644 apps/memos-local-plugin/core/storage/migrations/003-embedding-retry-lease.sql create mode 100644 apps/memos-local-plugin/core/storage/repos/embedding_retry_queue.ts create mode 100644 apps/memos-local-plugin/tests/unit/embedding/retry-queue.test.ts create mode 100644 apps/memos-local-plugin/tests/unit/embedding/retry-worker.test.ts diff --git a/apps/memos-local-plugin/core/capture/capture.ts b/apps/memos-local-plugin/core/capture/capture.ts index fd1f55168..7c9509756 100644 --- a/apps/memos-local-plugin/core/capture/capture.ts +++ b/apps/memos-local-plugin/core/capture/capture.ts @@ -20,6 +20,7 @@ import type { LlmClient } from "../llm/index.js"; import { rootLogger } from "../logger/index.js"; import { ids } from "../id.js"; import type { TraceRow, TraceId } from "../types.js"; +import type { makeEmbeddingRetryQueueRepo } from "../storage/repos/embedding_retry_queue.js"; import type { makeTracesRepo } from "../storage/repos/traces.js"; import type { EpisodesRepo } from "../session/persistence.js"; import { disabledScore, scoreReflection } from "./alpha-scorer.js"; @@ -45,9 +46,11 @@ import type { } from "./types.js"; type TracesRepo = ReturnType; +type EmbeddingRetryQueueRepo = ReturnType; export interface CaptureDeps { tracesRepo: TracesRepo; + embeddingRetryQueue?: EmbeddingRetryQueueRepo; episodesRepo: EpisodesRepo; embedder: Embedder | null; /** Main LLM — used for per-turn lite capture (summarisation). */ @@ -564,6 +567,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner { ): Promise { try { for (const row of rows) deps.tracesRepo.insert(row); + enqueueMissingTraceVectors(rows, warnings); } catch (err) { const failure = errDetail(err); log.error("persist.failed", { @@ -599,6 +603,46 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner { return true; } + function enqueueMissingTraceVectors( + rows: TraceRow[], + warnings: CaptureResult["warnings"], + ): void { + if (!deps.cfg.embedTraces || !deps.embeddingRetryQueue || !deps.embedder) return; + const queuedAt = now(); + let queued = 0; + for (const row of rows) { + if (!row.vecSummary) { + deps.embeddingRetryQueue.enqueue({ + id: `er_${ids.span()}`, + targetKind: "trace", + targetId: row.id, + vectorField: "vec_summary", + sourceText: row.summary?.trim() || row.userText.trim() || "(empty)", + now: queuedAt, + }); + queued++; + } + if (!row.vecAction) { + deps.embeddingRetryQueue.enqueue({ + id: `er_${ids.span()}`, + targetKind: "trace", + targetId: row.id, + vectorField: "vec_action", + sourceText: traceActionText(row), + now: queuedAt, + }); + queued++; + } + } + if (queued > 0) { + warnings.push({ + stage: "embed", + message: "embedding retry queued for missing trace vectors", + detail: { queued }, + }); + } + } + function finalResult( input: CaptureInput, startedAt: number, @@ -805,6 +849,23 @@ function errDetail(err: unknown): Record { return { value: String(err) }; } +function traceActionText(row: Pick): string { + const toolSig = row.toolCalls + .map((t) => `${t.name}(${safeStringify(t.input).slice(0, 300)})`) + .join("; "); + return [row.agentText.trim(), toolSig].filter((s) => s.length > 0).join("\n---\n") || "(empty)"; +} + +function safeStringify(v: unknown): string { + if (v === undefined || v === null) return ""; + if (typeof v === "string") return v; + try { + return JSON.stringify(v); + } catch { + return String(v); + } +} + /** * Pull the `turnId` stamped by `step-extractor` out of the * `StepCandidate.meta` blob. Falls back to the trace's own `ts` so diff --git a/apps/memos-local-plugin/core/embedding/index.ts b/apps/memos-local-plugin/core/embedding/index.ts index 5d235b0ad..99faa0048 100644 --- a/apps/memos-local-plugin/core/embedding/index.ts +++ b/apps/memos-local-plugin/core/embedding/index.ts @@ -16,6 +16,8 @@ export { type EmbedCacheStats, } from "./cache.js"; export { l2Normalize, enforceDim, postProcess, toFloat32 } from "./normalize.js"; +export { createEmbeddingRetryWorker, systemErrorEvent } from "./retry-worker.js"; +export type { EmbeddingRetryWorker } from "./retry-worker.js"; export type { EmbedInput, EmbedRole, diff --git a/apps/memos-local-plugin/core/embedding/retry-worker.ts b/apps/memos-local-plugin/core/embedding/retry-worker.ts new file mode 100644 index 000000000..3620fd9e3 --- /dev/null +++ b/apps/memos-local-plugin/core/embedding/retry-worker.ts @@ -0,0 +1,224 @@ +import type { CoreEvent } from "../../agent-contract/events.js"; +import { ids } from "../id.js"; +import type { Embedder } from "./types.js"; +import type { Logger } from "../logger/types.js"; +import type { Repos } from "../storage/repos/index.js"; +import type { EmbeddingRetryClaim, EmbeddingRetryJob } from "../storage/repos/embedding_retry_queue.js"; +import type { EmbeddingVector } from "../types.js"; + +export interface EmbeddingRetryWorker { + start(): void; + stop(): void; + flush(): Promise; +} + +export interface EmbeddingRetryWorkerDeps { + repos: Repos; + embedder: Embedder | null; + log: Logger; + now?: () => number; + intervalMs?: number; + batchSize?: number; + onSystemError?: (payload: Record, correlationId?: string) => void; +} + +const DEFAULT_INTERVAL_MS = 60_000; +const BASE_BACKOFF_MS = 60_000; +const MAX_BACKOFF_MS = 60 * 60_000; +const DEFAULT_LEASE_MS = 5 * 60_000; + +export function createEmbeddingRetryWorker( + deps: EmbeddingRetryWorkerDeps, +): EmbeddingRetryWorker { + const now = deps.now ?? Date.now; + const batchSize = deps.batchSize ?? 25; + const workerId = `embedding-retry-${ids.span()}`; + let timer: ReturnType | null = null; + let running: Promise | null = null; + + async function runOnce(): Promise { + if (!deps.embedder) return; + const at = now(); + const jobs = deps.repos.embeddingRetryQueue.claimDue({ + now: at, + workerId, + leaseUntil: at + DEFAULT_LEASE_MS, + limit: batchSize, + }); + for (const job of jobs) { + await processJob(job); + } + } + + async function processJob(job: EmbeddingRetryJob): Promise { + if (!deps.embedder) return; + const claim = claimFor(job); + if (!claim) { + deps.log.debug("embedding_retry.stale_missing_claim", { jobId: job.id }); + return; + } + const attemptNo = job.attempts + 1; + try { + const vec = await deps.embedder.embedOne({ + text: job.sourceText || "(empty)", + role: job.embedRole, + }); + const completed = deps.repos.embeddingRetryQueue.transact(() => { + const at = now(); + if (!deps.repos.embeddingRetryQueue.touchClaimHeld(job.id, { ...claim, now: at })) { + return { stale: true, updated: false, completed: false }; + } + const updated = applyVector(job, vec); + if (!updated) { + return { stale: false, updated: false, completed: false }; + } + return { + stale: false, + updated: true, + completed: deps.repos.embeddingRetryQueue.markSucceededClaimed(job.id, { + ...claim, + now: at, + }), + }; + }); + if (completed.stale) { + deps.log.debug("embedding_retry.stale_success_ignored", { jobId: job.id }); + return; + } + if (!completed.updated) { + throw new Error(`embedding retry target not found: ${job.targetKind}:${job.targetId}`); + } + if (!completed.completed) { + deps.log.debug("embedding_retry.stale_success_ignored", { jobId: job.id }); + return; + } + deps.log.info("embedding_retry.succeeded", { + jobId: job.id, + targetKind: job.targetKind, + targetId: job.targetId, + vectorField: job.vectorField, + attempts: attemptNo, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + const at = now(); + const terminal = attemptNo >= job.maxAttempts; + const recorded = terminal + ? deps.repos.embeddingRetryQueue.markFailedClaimed(job.id, { + ...claim, + attempts: attemptNo, + error: message, + now: at, + }) + : deps.repos.embeddingRetryQueue.markRetryClaimed(job.id, { + ...claim, + attempts: attemptNo, + nextAttemptAt: at + backoffMs(attemptNo), + error: message, + now: at, + }); + if (!recorded) { + deps.log.debug("embedding_retry.stale_failure_ignored", { jobId: job.id, terminal }); + return; + } + emitFailure(job, attemptNo, message, terminal, at); + } + } + + function claimFor(job: EmbeddingRetryJob): EmbeddingRetryClaim | null { + if (job.claimedBy !== workerId || job.leaseUntil === null) return null; + return { workerId, leaseUntil: job.leaseUntil }; + } + + function applyVector(job: EmbeddingRetryJob, vec: EmbeddingVector): boolean { + switch (job.targetKind) { + case "trace": + return deps.repos.traces.updateVector( + job.targetId as never, + job.vectorField === "vec_action" ? "vecAction" : "vecSummary", + vec, + ); + case "policy": + return deps.repos.policies.updateVector(job.targetId as never, vec); + case "world_model": + return deps.repos.worldModel.updateVector(job.targetId as never, vec); + case "skill": + return deps.repos.skills.updateVector(job.targetId as never, vec); + } + } + + function emitFailure( + job: EmbeddingRetryJob, + attempts: number, + message: string, + terminal: boolean, + at: number, + ): void { + const payload = { + kind: "embedding.retry_failed", + jobId: job.id, + targetKind: job.targetKind, + targetId: job.targetId, + vectorField: job.vectorField, + attempts, + maxAttempts: job.maxAttempts, + terminal, + message, + }; + deps.log.warn("embedding_retry.failed", payload); + try { + deps.repos.apiLogs.insert({ + toolName: "system_error", + input: { role: "embedding_retry" }, + output: payload, + durationMs: 0, + success: false, + calledAt: at, + }); + } catch { + /* logging the retry failure is best-effort */ + } + deps.onSystemError?.(payload, job.targetId); + } + + function tick(): void { + if (running) return; + running = runOnce().finally(() => { + running = null; + }); + } + + return { + start(): void { + if (timer || !deps.embedder) return; + tick(); + timer = setInterval(tick, deps.intervalMs ?? DEFAULT_INTERVAL_MS); + }, + stop(): void { + if (timer) clearInterval(timer); + timer = null; + }, + async flush(): Promise { + tick(); + if (running) await running; + }, + }; +} + +function backoffMs(attemptNo: number): number { + return Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** Math.max(0, attemptNo - 1)); +} + +export function systemErrorEvent( + payload: Record, + seq: number, + correlationId?: string, +): CoreEvent { + return { + type: "system.error", + ts: Date.now(), + seq, + correlationId, + payload, + }; +} diff --git a/apps/memos-local-plugin/core/memory/l2/l2.ts b/apps/memos-local-plugin/core/memory/l2/l2.ts index f15e2f832..201e507c5 100644 --- a/apps/memos-local-plugin/core/memory/l2/l2.ts +++ b/apps/memos-local-plugin/core/memory/l2/l2.ts @@ -29,6 +29,7 @@ import type { TraceRow, } from "../../types.js"; import type { Repos } from "../../storage/repos/index.js"; +import { ids } from "../../id.js"; import { L2_INDUCTION_PROMPT } from "../../llm/prompts/l2-induction.js"; import { associateTraces } from "./associate.js"; import { makeCandidatePool } from "./candidate-pool.js"; @@ -47,7 +48,7 @@ import type { } from "./types.js"; export interface RunL2Deps { - repos: Pick; + repos: Pick; db: Parameters[0]["db"]; llm: LlmClient | null; log: Logger; @@ -224,6 +225,21 @@ export async function runL2( }); try { repos.policies.insert(policy); + if (!policy.vec) { + repos.embeddingRetryQueue.enqueue({ + id: `er_${ids.span()}`, + targetKind: "policy", + targetId: policy.id, + vectorField: "vec", + sourceText: policyVectorText(policy), + now: input.now ?? Date.now(), + }); + warnings.push({ + stage: "embed", + message: "embedding retry queued for policy vector", + detail: { policyId: policy.id }, + }); + } pool.promote(bucket.candidateIds, policy.id); touched.set(policy.id, policy); inductionEvidenceByPolicy.set( @@ -377,6 +393,16 @@ function stageWarn( return { stage, message, detail }; } +function policyVectorText(policy: PolicyRow): string { + return [ + policy.title, + policy.trigger, + policy.procedure, + policy.verification, + policy.boundary, + ].filter(Boolean).join("\n"); +} + function pickOnePerEpisode(traces: readonly TraceRow[]): TraceRow[] { const byEp = new Map(); for (const t of traces) { diff --git a/apps/memos-local-plugin/core/memory/l2/subscriber.ts b/apps/memos-local-plugin/core/memory/l2/subscriber.ts index c36b90a1a..43fa4e37f 100644 --- a/apps/memos-local-plugin/core/memory/l2/subscriber.ts +++ b/apps/memos-local-plugin/core/memory/l2/subscriber.ts @@ -24,7 +24,7 @@ import type { L2Config, L2EventBus } from "./types.js"; export interface L2SubscriberDeps { db: StorageDb; - repos: Pick; + repos: Pick; rewardBus: RewardEventBus; l2Bus: L2EventBus; llm: LlmClient | null; diff --git a/apps/memos-local-plugin/core/memory/l3/l3.ts b/apps/memos-local-plugin/core/memory/l3/l3.ts index 61a70351a..1da4a1047 100644 --- a/apps/memos-local-plugin/core/memory/l3/l3.ts +++ b/apps/memos-local-plugin/core/memory/l3/l3.ts @@ -25,6 +25,7 @@ import type { Logger } from "../../logger/types.js"; import type { LlmClient } from "../../llm/index.js"; import { L3_ABSTRACTION_PROMPT } from "../../llm/prompts/l3-abstraction.js"; import type { Repos } from "../../storage/repos/index.js"; +import { ids } from "../../id.js"; import type { EpisodeId, EpochMs, @@ -53,7 +54,7 @@ import type { // ─── Deps ────────────────────────────────────────────────────────────────── export interface RunL3Deps { - repos: Pick; + repos: Pick; llm: LlmClient | null; log: Logger; bus?: L3EventBus; @@ -196,6 +197,21 @@ export async function runL3( vec: patch.vec, updatedAt: now, }); + if (!patch.vec) { + repos.embeddingRetryQueue.enqueue({ + id: `er_${ids.span()}`, + targetKind: "world_model", + targetId: decision.target.id, + vectorField: "vec", + sourceText: worldModelVectorText(patch.title, patch.body), + now, + }); + warnings.push({ + stage: "embed", + message: "embedding retry queued for world model vector", + detail: { worldModelId: decision.target.id }, + }); + } const bumped = clamp01(decision.target.confidence + config.confidenceDelta); if (bumped !== decision.target.confidence) { repos.worldModel.updateConfidence(decision.target.id, bumped, now); @@ -248,6 +264,21 @@ export async function runL3( }); try { repos.worldModel.insert(wm); + if (!wm.vec) { + repos.embeddingRetryQueue.enqueue({ + id: `er_${ids.span()}`, + targetKind: "world_model", + targetId: wm.id, + vectorField: "vec", + sourceText: worldModelVectorText(wm.title, wm.body), + now, + }); + warnings.push({ + stage: "embed", + message: "embedding retry queued for world model vector", + detail: { worldModelId: wm.id }, + }); + } abstractions.push({ clusterKey: cluster.key, worldModelId: wm.id, @@ -312,6 +343,10 @@ function stageWarn( return { stage, message, detail }; } +function worldModelVectorText(title: string, body: string): string { + return [title.trim(), body.trim()].filter(Boolean).join("\n\n") || "(empty)"; +} + function skipped( cluster: PolicyCluster, reason: Exclude, diff --git a/apps/memos-local-plugin/core/memory/l3/subscriber.ts b/apps/memos-local-plugin/core/memory/l3/subscriber.ts index 01cb7da78..0edc6be5b 100644 --- a/apps/memos-local-plugin/core/memory/l3/subscriber.ts +++ b/apps/memos-local-plugin/core/memory/l3/subscriber.ts @@ -29,7 +29,7 @@ import { adjustConfidence, runL3 } from "./l3.js"; import type { L3Config, L3EventBus, L3ProcessInput, L3ProcessResult } from "./types.js"; export interface L3SubscriberDeps { - repos: Pick; + repos: Pick; l2Bus: L2EventBus; l3Bus: L3EventBus; llm: LlmClient | null; diff --git a/apps/memos-local-plugin/core/pipeline/deps.ts b/apps/memos-local-plugin/core/pipeline/deps.ts index 28fa9f79c..6dcedf6cb 100644 --- a/apps/memos-local-plugin/core/pipeline/deps.ts +++ b/apps/memos-local-plugin/core/pipeline/deps.ts @@ -201,6 +201,7 @@ export function buildPipelineSubscribers( const captureRunner = createCaptureRunner({ tracesRepo: deps.repos.traces, + embeddingRetryQueue: deps.repos.embeddingRetryQueue, episodesRepo: adaptEpisodesRepo(deps.repos.episodes), embedder: deps.embedder, llm: deps.llm, diff --git a/apps/memos-local-plugin/core/pipeline/event-bridge.ts b/apps/memos-local-plugin/core/pipeline/event-bridge.ts index 8a1b90df4..f1efb7fdf 100644 --- a/apps/memos-local-plugin/core/pipeline/event-bridge.ts +++ b/apps/memos-local-plugin/core/pipeline/event-bridge.ts @@ -234,6 +234,16 @@ export function bridgeToCoreEvents(deps: EventBridgeDeps): EventBridgeHandle { case "retrieval.started": return send("retrieval.triggered", evt, evt.sessionId); case "retrieval.done": + if (evt.stats.embedding?.degraded) { + send("system.error", { + kind: "embedding.query_degraded", + message: evt.stats.embedding.errorMessage ?? "Query embedding failed; retrieval degraded", + reason: evt.reason, + sessionId: evt.sessionId, + episodeId: evt.episodeId, + errorCode: evt.stats.embedding.errorCode, + }, evt.sessionId); + } if (evt.stats.emptyPacket) { return send("retrieval.empty", evt, evt.sessionId); } diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 749ab557c..917c5a1e3 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -1110,6 +1110,7 @@ export function createMemoryCore( ); const filtered = candidates.filter((c) => !droppedIds.has(c.refId)); const dropped = candidates.filter((c) => droppedIds.has(c.refId)); + const stats = packet ? handle.consumeRetrievalStats(packet.packetId) : null; handle.repos.apiLogs.insert({ toolName: "memory_search", input: { @@ -1125,6 +1126,7 @@ export function createMemoryCore( hubCandidates: [] as unknown[], filtered, droppedByLlm: dropped, + stats: stats ? retrievalStatsPayload(stats) : undefined, } : { error: "turn_start_retrieval_failed" }, durationMs: Date.now() - startedAt, @@ -1370,6 +1372,13 @@ export function createMemoryCore( channelHits?: Record; queryTokens?: number; queryTags?: string[]; + embedding?: { + attempted: boolean; + ok: boolean; + degraded: boolean; + errorCode?: string; + errorMessage?: string; + }; } | undefined; try { const result = await turnStartRetrieve(deps, { @@ -1410,22 +1419,23 @@ export function createMemoryCore( // funnels. All fields are optional on the producer side so older // consumers keep working. const s = result.stats; - retrievalStats = { - raw: s.rawCandidateCount, - ranked: s.rankedCount, - droppedByThreshold: s.droppedByThresholdCount, - thresholdFloor: s.thresholdFloor, - topRelevance: s.topRelevance, - llmFilter: { - outcome: s.llmFilterOutcome, - kept: s.llmFilterKept, - dropped: s.llmFilterDropped, - sufficient: s.llmFilterSufficient ?? null, - }, - channelHits: s.channelHits as Record | undefined, - queryTokens: s.queryTokens, - queryTags: s.queryTags, - }; + retrievalStats = retrievalStatsPayload(s); + if (s.embedding?.degraded) { + handle.repos.apiLogs.insert({ + toolName: "system_error", + input: { role: "embedding" }, + output: { + role: "embedding", + provider: deps.embedder ? "retrieval" : "none", + model: "query", + message: s.embedding.errorMessage ?? "query embedding failed; retrieval degraded", + code: s.embedding.errorCode, + }, + durationMs: 0, + success: false, + calledAt: Date.now(), + }); + } return { query, @@ -2991,6 +3001,42 @@ function findLatestPersistedModelStatus( return null; } +function retrievalStatsPayload(s: import("../retrieval/types.js").RetrievalStats): { + raw?: number; + ranked?: number; + droppedByThreshold?: number; + thresholdFloor?: number; + topRelevance?: number; + llmFilter?: { + outcome?: string; + kept?: number; + dropped?: number; + sufficient?: boolean | null; + }; + channelHits?: Record; + queryTokens?: number; + queryTags?: string[]; + embedding?: import("../retrieval/types.js").RetrievalStats["embedding"]; +} { + return { + raw: s.rawCandidateCount, + ranked: s.rankedCount, + droppedByThreshold: s.droppedByThresholdCount, + thresholdFloor: s.thresholdFloor, + topRelevance: s.topRelevance, + llmFilter: { + outcome: s.llmFilterOutcome, + kept: s.llmFilterKept, + dropped: s.llmFilterDropped, + sufficient: s.llmFilterSufficient ?? null, + }, + channelHits: s.channelHits as Record | undefined, + queryTokens: s.queryTokens, + queryTags: s.queryTags, + embedding: s.embedding, + }; +} + function llmHealth( llm: PipelineHandle["llm"], // Kept in the signature for source compatibility with older callers diff --git a/apps/memos-local-plugin/core/pipeline/orchestrator.ts b/apps/memos-local-plugin/core/pipeline/orchestrator.ts index eab711708..fd645f483 100644 --- a/apps/memos-local-plugin/core/pipeline/orchestrator.ts +++ b/apps/memos-local-plugin/core/pipeline/orchestrator.ts @@ -76,6 +76,7 @@ import type { CoreEvent } from "../../agent-contract/events.js"; import type { LogRecord } from "../../agent-contract/log-record.js"; import { memoryBuffer } from "../logger/index.js"; import { onBroadcastLog } from "../logger/transports/sse-broadcast.js"; +import { createEmbeddingRetryWorker, systemErrorEvent } from "../embedding/index.js"; import type { EpisodeSnapshot } from "../session/index.js"; // ─── Factory ────────────────────────────────────────────────────────────── @@ -127,6 +128,18 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { const getRecentEvents = (): readonly CoreEvent[] => recentEvents.slice(); + let retryEventSeq = 1_000_000; + const embeddingRetryWorker = createEmbeddingRetryWorker({ + repos: deps.repos, + embedder: deps.embedder, + log: log.child({ channel: "core.embedding.retry" }), + now: deps.now, + onSystemError: (payload, correlationId) => { + emitCore(systemErrorEvent(payload, retryEventSeq++, correlationId)); + }, + }); + embeddingRetryWorker.start(); + // Hydrate the ring buffer with synthetic events derived from the // most-recent rows on disk. Without this, every plugin restart // produces an empty "实时活动" panel until the user happens to @@ -541,6 +554,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { // ─── Retrieval entry points ───────────────────────────────────────────── const retrievalDeps = buildRetrievalDeps(deps, algorithm); + const turnStartRetrievalStats = new Map(); async function retrieveTurnStart(input: TurnInputDTO): Promise { const ctx = { @@ -557,9 +571,16 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { ctx, { events: buses.retrieval }, ); + turnStartRetrievalStats.set(result.packet.packetId, result.stats); return result.packet; } + function consumeRetrievalStats(packetId: string): RetrievalResult["stats"] | null { + const stats = turnStartRetrievalStats.get(packetId) ?? null; + turnStartRetrievalStats.delete(packetId); + return stats; + } + async function retrieveToolDriven(ctx: ToolDrivenCtx): Promise { const result = await toolDrivenRetrieve( retrievalDeps, @@ -835,6 +856,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { await nextTick(); await subs.skills.flush(); await subs.feedback.flush(); + await embeddingRetryWorker.flush(); } async function shutdown(reason: string = "shutdown"): Promise { @@ -853,6 +875,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { subs.l3.detach(); subs.skills.dispose(); subs.feedback.dispose(); + embeddingRetryWorker.stop(); bridge.dispose(); logSubscription(); session.sessionManager.shutdown(reason); @@ -932,6 +955,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { getRecentEvents, subscribeLogs, onTurnStart, + consumeRetrievalStats, onTurnEnd, recordToolOutcome, retrieveToolDriven, diff --git a/apps/memos-local-plugin/core/pipeline/types.ts b/apps/memos-local-plugin/core/pipeline/types.ts index 27eecc1ec..7462fd5de 100644 --- a/apps/memos-local-plugin/core/pipeline/types.ts +++ b/apps/memos-local-plugin/core/pipeline/types.ts @@ -190,6 +190,7 @@ export interface PipelineHandle { // Orchestrator entry points (turn lifecycle). onTurnStart(input: TurnInputDTO): Promise; + consumeRetrievalStats(packetId: string): RetrievalResult["stats"] | null; onTurnEnd(result: TurnResultDTO): Promise; // Tool-level signals. @@ -287,8 +288,7 @@ export interface DerivedTurnStartCtx extends TurnStartCtx { /** * The rolled-up retrieval outcome used both by adapters and by the - * viewer. The pipeline always returns an `InjectionPacket` — tests that - * want richer stats should use `pipeline.retrievalDeps()` directly. + * viewer. The packet is what adapters inject; stats are kept for logs. */ export interface PipelineRetrievalResult extends RetrievalResult { /** For logging: why the retrieval ran. */ diff --git a/apps/memos-local-plugin/core/retrieval/retrieve.ts b/apps/memos-local-plugin/core/retrieval/retrieve.ts index c10267b7a..542617d21 100644 --- a/apps/memos-local-plugin/core/retrieval/retrieve.ts +++ b/apps/memos-local-plugin/core/retrieval/retrieve.ts @@ -192,11 +192,25 @@ async function runAll( }); try { + const embeddingStats: RetrievalStats["embedding"] = { + attempted: compiled.text.length > 0, + ok: false, + degraded: false, + }; const queryVec = compiled.text - ? await deps.embedder.embed(compiled.text, "query").catch((err) => { + ? await deps.embedder.embed(compiled.text, "query").then((vec) => { + embeddingStats.ok = true; + return vec; + }).catch((err) => { + const code = (err as { code?: string })?.code; + const message = err instanceof Error ? err.message : String(err); + embeddingStats.degraded = true; + embeddingStats.errorCode = code; + embeddingStats.errorMessage = message; log.warn("embed_failed", { reason: ctx.reason, - err: err instanceof Error ? err.message : String(err), + code, + err: message, }); return null; }) @@ -372,6 +386,7 @@ async function runAll( queryTokens: approxTokens(compiled.text), queryTags: compiled.tags, emptyPacket: packet.snippets.length === 0, + embedding: embeddingStats, rawCandidateCount, droppedByThresholdCount: ranked.droppedByThreshold, thresholdFloor: ranked.thresholdFloor, @@ -463,6 +478,7 @@ function emptyResult( queryTokens: 0, queryTags: [], emptyPacket: true, + embedding: { attempted: false, ok: false, degraded: false }, }, }; } diff --git a/apps/memos-local-plugin/core/retrieval/types.ts b/apps/memos-local-plugin/core/retrieval/types.ts index cae381070..88ab0db10 100644 --- a/apps/memos-local-plugin/core/retrieval/types.ts +++ b/apps/memos-local-plugin/core/retrieval/types.ts @@ -571,6 +571,14 @@ export interface RetrievalStats { queryTokens: number; queryTags: string[]; emptyPacket: boolean; + /** Query embedding status. `degraded=true` means vector recall was unavailable. */ + embedding?: { + attempted: boolean; + ok: boolean; + degraded: boolean; + errorCode?: string; + errorMessage?: string; + }; /** * Observability breakdown — populated so the Logs page (and * api_logs) can show "how many candidates survived each stage" and diff --git a/apps/memos-local-plugin/core/skill/skill.ts b/apps/memos-local-plugin/core/skill/skill.ts index f04601bbc..3cce0f5b1 100644 --- a/apps/memos-local-plugin/core/skill/skill.ts +++ b/apps/memos-local-plugin/core/skill/skill.ts @@ -26,6 +26,7 @@ import type { LlmClient } from "../llm/types.js"; import type { Logger } from "../logger/types.js"; import type { Repos } from "../storage/repos/index.js"; import { now as nowMs } from "../time.js"; +import { ids } from "../id.js"; import type { PolicyRow, SkillId, @@ -208,6 +209,20 @@ export async function runSkill( } repos.skills.upsert(row); + if (!row.vec && deps.embedder) { + repos.embeddingRetryQueue.enqueue({ + id: `er_${ids.span()}`, + targetKind: "skill", + targetId: row.id, + vectorField: "vec", + sourceText: built.vecSource || row.invocationGuide || row.name, + now: nowMs(), + }); + warnings.push({ + skillId: row.id, + reason: "embedding retry queued for skill vector", + }); + } timings.persist += nowMs() - tPersist; if (decision.action === "rebuild") rebuilt += 1; diff --git a/apps/memos-local-plugin/core/storage/migrations/002-embedding-retry-queue.sql b/apps/memos-local-plugin/core/storage/migrations/002-embedding-retry-queue.sql new file mode 100644 index 000000000..ce8bdc7e4 --- /dev/null +++ b/apps/memos-local-plugin/core/storage/migrations/002-embedding-retry-queue.sql @@ -0,0 +1,24 @@ +-- Persistent compensation queue for failed embedding writes. + +CREATE TABLE IF NOT EXISTS embedding_retry_queue ( + id TEXT PRIMARY KEY, + target_kind TEXT NOT NULL CHECK (target_kind IN ('trace','policy','world_model','skill')), + target_id TEXT NOT NULL, + vector_field TEXT NOT NULL CHECK (vector_field IN ('vec_summary','vec_action','vec')), + source_text TEXT NOT NULL, + embed_role TEXT NOT NULL CHECK (embed_role IN ('document','query')) DEFAULT 'document', + status TEXT NOT NULL CHECK (status IN ('pending','in_progress','failed','succeeded')) DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 6, + next_attempt_at INTEGER NOT NULL, + last_error TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + UNIQUE (target_kind, target_id, vector_field) +) STRICT; + +CREATE INDEX IF NOT EXISTS idx_embedding_retry_due + ON embedding_retry_queue(status, next_attempt_at); + +CREATE INDEX IF NOT EXISTS idx_embedding_retry_target + ON embedding_retry_queue(target_kind, target_id); diff --git a/apps/memos-local-plugin/core/storage/migrations/003-embedding-retry-lease.sql b/apps/memos-local-plugin/core/storage/migrations/003-embedding-retry-lease.sql new file mode 100644 index 000000000..b6cda44fa --- /dev/null +++ b/apps/memos-local-plugin/core/storage/migrations/003-embedding-retry-lease.sql @@ -0,0 +1,4 @@ +-- Lease fields for multi-process-safe embedding retry claims. + +ALTER TABLE embedding_retry_queue ADD COLUMN claimed_by TEXT; +ALTER TABLE embedding_retry_queue ADD COLUMN lease_until INTEGER; diff --git a/apps/memos-local-plugin/core/storage/migrator.ts b/apps/memos-local-plugin/core/storage/migrator.ts index a6eb0c0a0..aa060caf2 100644 --- a/apps/memos-local-plugin/core/storage/migrator.ts +++ b/apps/memos-local-plugin/core/storage/migrator.ts @@ -107,10 +107,9 @@ export function runMigrations(db: StorageDb, dir: string = defaultMigrationsDir( skipped++; continue; } - const sql = fs.readFileSync(file.fullPath, "utf8"); const t0 = now(); db.tx(() => { - db.exec(sql); + applyMigration(db, file); db.prepare( `INSERT INTO schema_migrations (version, name, applied_at) VALUES (@version, @name, @applied_at)`, ).run({ version: file.version, name: file.name, applied_at: now() }); @@ -149,6 +148,28 @@ function migrationNeedsUnsafeMode(fullPath: string): boolean { return /PRAGMA\s+writable_schema/i.test(sql); } +function applyMigration(db: StorageDb, file: MigrationFile): void { + if (file.version === 3 && file.name === "embedding-retry-lease") { + ensureEmbeddingRetryLeaseColumns(db); + return; + } + db.exec(fs.readFileSync(file.fullPath, "utf8")); +} + +function ensureEmbeddingRetryLeaseColumns(db: StorageDb): void { + const columns = new Set( + db.prepare(`PRAGMA table_info(embedding_retry_queue)`) + .all() + .map((row) => row.name), + ); + if (!columns.has("claimed_by")) { + db.exec(`ALTER TABLE embedding_retry_queue ADD COLUMN claimed_by TEXT`); + } + if (!columns.has("lease_until")) { + db.exec(`ALTER TABLE embedding_retry_queue ADD COLUMN lease_until INTEGER`); + } +} + function ensureSchemaMigrationsTable(db: StorageDb): void { db.exec( `CREATE TABLE IF NOT EXISTS schema_migrations ( diff --git a/apps/memos-local-plugin/core/storage/repos/embedding_retry_queue.ts b/apps/memos-local-plugin/core/storage/repos/embedding_retry_queue.ts new file mode 100644 index 000000000..1410f0b15 --- /dev/null +++ b/apps/memos-local-plugin/core/storage/repos/embedding_retry_queue.ts @@ -0,0 +1,419 @@ +import type { StorageDb } from "../types.js"; + +export type EmbeddingRetryTargetKind = "trace" | "policy" | "world_model" | "skill"; +export type EmbeddingRetryVectorField = "vec_summary" | "vec_action" | "vec"; +export type EmbeddingRetryStatus = "pending" | "in_progress" | "failed" | "succeeded"; + +export interface EmbeddingRetryJob { + id: string; + targetKind: EmbeddingRetryTargetKind; + targetId: string; + vectorField: EmbeddingRetryVectorField; + sourceText: string; + embedRole: "document" | "query"; + status: EmbeddingRetryStatus; + attempts: number; + maxAttempts: number; + nextAttemptAt: number; + claimedBy: string | null; + leaseUntil: number | null; + lastError: string | null; + createdAt: number; + updatedAt: number; +} + +export interface EmbeddingRetryClaim { + workerId: string; + leaseUntil: number; +} + +interface RawEmbeddingRetryJob { + id: string; + target_kind: EmbeddingRetryTargetKind; + target_id: string; + vector_field: EmbeddingRetryVectorField; + source_text: string; + embed_role: "document" | "query"; + status: EmbeddingRetryStatus; + attempts: number; + max_attempts: number; + next_attempt_at: number; + claimed_by: string | null; + lease_until: number | null; + last_error: string | null; + created_at: number; + updated_at: number; +} + +export function makeEmbeddingRetryQueueRepo(db: StorageDb) { + const columns = ` + id, target_kind, target_id, vector_field, source_text, embed_role, status, + attempts, max_attempts, next_attempt_at, claimed_by, lease_until, last_error, + created_at, updated_at + `; + + return { + enqueue(input: { + id: string; + targetKind: EmbeddingRetryTargetKind; + targetId: string; + vectorField: EmbeddingRetryVectorField; + sourceText: string; + embedRole?: "document" | "query"; + maxAttempts?: number; + now: number; + }): void { + db.prepare<{ + id: string; + target_kind: EmbeddingRetryTargetKind; + target_id: string; + vector_field: EmbeddingRetryVectorField; + source_text: string; + embed_role: "document" | "query"; + max_attempts: number; + now: number; + }>( + `INSERT INTO embedding_retry_queue ( + id, target_kind, target_id, vector_field, source_text, embed_role, + status, attempts, max_attempts, next_attempt_at, last_error, created_at, updated_at + ) VALUES ( + @id, @target_kind, @target_id, @vector_field, @source_text, @embed_role, + 'pending', 0, @max_attempts, @now, NULL, @now, @now + ) + ON CONFLICT(target_kind, target_id, vector_field) DO UPDATE SET + source_text=excluded.source_text, + embed_role=excluded.embed_role, + status=CASE + WHEN embedding_retry_queue.status IN ('failed','succeeded') THEN 'pending' + ELSE embedding_retry_queue.status + END, + attempts=CASE + WHEN embedding_retry_queue.status IN ('failed','succeeded') THEN 0 + ELSE embedding_retry_queue.attempts + END, + claimed_by=NULL, + lease_until=NULL, + last_error=CASE + WHEN embedding_retry_queue.status IN ('failed','succeeded') THEN NULL + ELSE embedding_retry_queue.last_error + END, + max_attempts=excluded.max_attempts, + next_attempt_at=MIN(embedding_retry_queue.next_attempt_at, excluded.next_attempt_at), + updated_at=excluded.updated_at`, + ).run({ + id: input.id, + target_kind: input.targetKind, + target_id: input.targetId, + vector_field: input.vectorField, + source_text: input.sourceText, + embed_role: input.embedRole ?? "document", + max_attempts: input.maxAttempts ?? 6, + now: input.now, + }); + }, + + claimDue(input: { + now: number; + workerId: string; + leaseUntil: number; + limit?: number; + }): EmbeddingRetryJob[] { + const limit = Math.max(1, Math.min(200, Math.floor(input.limit ?? 25))); + const ids = db.tx(() => { + const rows = db.prepare<{ + now: number; + limit: number; + }, { id: string }>( + `SELECT id + FROM embedding_retry_queue + WHERE ( + status='pending' + OR (status='in_progress' AND lease_until IS NOT NULL AND lease_until <= @now) + ) + AND next_attempt_at <= @now + ORDER BY next_attempt_at ASC, created_at ASC + LIMIT @limit`, + ).all({ now: input.now, limit }); + if (rows.length === 0) return [] as string[]; + const picked = rows.map((r) => r.id); + const placeholders = picked.map((_, i) => `@id${i}`).join(","); + const params: Record = { + worker_id: input.workerId, + lease_until: input.leaseUntil, + now: input.now, + }; + picked.forEach((id, i) => { params[`id${i}`] = id; }); + db.prepare( + `UPDATE embedding_retry_queue + SET status='in_progress', + claimed_by=@worker_id, + lease_until=@lease_until, + updated_at=@now + WHERE id IN (${placeholders}) + AND ( + status='pending' + OR (status='in_progress' AND lease_until IS NOT NULL AND lease_until <= @now) + ) + AND next_attempt_at <= @now`, + ).run(params); + const claimed = db.prepare( + `SELECT id + FROM embedding_retry_queue + WHERE id IN (${placeholders}) + AND status='in_progress' + AND claimed_by=@worker_id + AND lease_until=@lease_until + ORDER BY next_attempt_at ASC, created_at ASC`, + ).all(params); + return claimed.map((r) => r.id); + }); + if (ids.length === 0) return []; + const placeholders = ids.map((_, i) => `@id${i}`).join(","); + const params: Record = {}; + ids.forEach((id, i) => { params[`id${i}`] = id; }); + const rows = db.prepare( + `SELECT ${columns} + FROM embedding_retry_queue + WHERE id IN (${placeholders}) + ORDER BY next_attempt_at ASC, created_at ASC`, + ).all(params); + return rows.map(mapRow); + }, + + listDue(now: number, limit = 25): EmbeddingRetryJob[] { + const rows = db.prepare<{ now: number; limit: number }, RawEmbeddingRetryJob>( + `SELECT ${columns} + FROM embedding_retry_queue + WHERE status='pending' + AND next_attempt_at <= @now + ORDER BY next_attempt_at ASC, created_at ASC + LIMIT @limit`, + ).all({ now, limit: Math.max(1, Math.min(200, Math.floor(limit))) }); + return rows.map(mapRow); + }, + + transact(fn: () => T): T { + return db.tx(fn); + }, + + touchClaimHeld(id: string, input: EmbeddingRetryClaim & { now: number }): boolean { + const res = db.prepare<{ + id: string; + worker_id: string; + lease_until: number; + now: number; + }>( + `UPDATE embedding_retry_queue + SET updated_at=@now + WHERE id=@id + AND status='in_progress' + AND claimed_by=@worker_id + AND lease_until=@lease_until`, + ).run({ + id, + worker_id: input.workerId, + lease_until: input.leaseUntil, + now: input.now, + }); + return res.changes > 0; + }, + + isClaimHeld(id: string, input: EmbeddingRetryClaim): boolean { + const row = db.prepare<{ + id: string; + worker_id: string; + lease_until: number; + }, { n: number }>( + `SELECT COUNT(*) AS n + FROM embedding_retry_queue + WHERE id=@id + AND status='in_progress' + AND claimed_by=@worker_id + AND lease_until=@lease_until`, + ).get({ + id, + worker_id: input.workerId, + lease_until: input.leaseUntil, + }); + return (row?.n ?? 0) > 0; + }, + + markRetry(id: string, input: { attempts: number; nextAttemptAt: number; error: string; now: number }): void { + db.prepare<{ + id: string; + attempts: number; + next_attempt_at: number; + last_error: string; + now: number; + }>( + `UPDATE embedding_retry_queue + SET status='pending', + attempts=@attempts, + next_attempt_at=@next_attempt_at, + claimed_by=NULL, + lease_until=NULL, + last_error=@last_error, + updated_at=@now + WHERE id=@id`, + ).run({ + id, + attempts: input.attempts, + next_attempt_at: input.nextAttemptAt, + last_error: input.error, + now: input.now, + }); + }, + + markRetryClaimed( + id: string, + input: EmbeddingRetryClaim & { attempts: number; nextAttemptAt: number; error: string; now: number }, + ): boolean { + const res = db.prepare<{ + id: string; + worker_id: string; + lease_until: number; + attempts: number; + next_attempt_at: number; + last_error: string; + now: number; + }>( + `UPDATE embedding_retry_queue + SET status='pending', + attempts=@attempts, + next_attempt_at=@next_attempt_at, + claimed_by=NULL, + lease_until=NULL, + last_error=@last_error, + updated_at=@now + WHERE id=@id + AND status='in_progress' + AND claimed_by=@worker_id + AND lease_until=@lease_until`, + ).run({ + id, + worker_id: input.workerId, + lease_until: input.leaseUntil, + attempts: input.attempts, + next_attempt_at: input.nextAttemptAt, + last_error: input.error, + now: input.now, + }); + return res.changes > 0; + }, + + markFailed(id: string, input: { attempts: number; error: string; now: number }): void { + db.prepare<{ id: string; attempts: number; last_error: string; now: number }>( + `UPDATE embedding_retry_queue + SET status='failed', + attempts=@attempts, + claimed_by=NULL, + lease_until=NULL, + last_error=@last_error, + updated_at=@now + WHERE id=@id`, + ).run({ id, attempts: input.attempts, last_error: input.error, now: input.now }); + }, + + markFailedClaimed( + id: string, + input: EmbeddingRetryClaim & { attempts: number; error: string; now: number }, + ): boolean { + const res = db.prepare<{ + id: string; + worker_id: string; + lease_until: number; + attempts: number; + last_error: string; + now: number; + }>( + `UPDATE embedding_retry_queue + SET status='failed', + attempts=@attempts, + claimed_by=NULL, + lease_until=NULL, + last_error=@last_error, + updated_at=@now + WHERE id=@id + AND status='in_progress' + AND claimed_by=@worker_id + AND lease_until=@lease_until`, + ).run({ + id, + worker_id: input.workerId, + lease_until: input.leaseUntil, + attempts: input.attempts, + last_error: input.error, + now: input.now, + }); + return res.changes > 0; + }, + + markSucceeded(id: string, now: number): void { + db.prepare<{ id: string; now: number }>( + `UPDATE embedding_retry_queue + SET status='succeeded', + next_attempt_at=@now, + claimed_by=NULL, + lease_until=NULL, + updated_at=@now + WHERE id=@id`, + ).run({ id, now }); + }, + + markSucceededClaimed( + id: string, + input: EmbeddingRetryClaim & { now: number }, + ): boolean { + const res = db.prepare<{ + id: string; + worker_id: string; + lease_until: number; + now: number; + }>( + `UPDATE embedding_retry_queue + SET status='succeeded', + next_attempt_at=@now, + claimed_by=NULL, + lease_until=NULL, + updated_at=@now + WHERE id=@id + AND status='in_progress' + AND claimed_by=@worker_id + AND lease_until=@lease_until`, + ).run({ + id, + worker_id: input.workerId, + lease_until: input.leaseUntil, + now: input.now, + }); + return res.changes > 0; + }, + + countByStatus(status: EmbeddingRetryStatus): number { + const row = db.prepare<{ status: string }, { n: number }>( + `SELECT COUNT(*) AS n FROM embedding_retry_queue WHERE status=@status`, + ).get({ status }); + return row?.n ?? 0; + }, + }; +} + +function mapRow(row: RawEmbeddingRetryJob): EmbeddingRetryJob { + return { + id: row.id, + targetKind: row.target_kind, + targetId: row.target_id, + vectorField: row.vector_field, + sourceText: row.source_text, + embedRole: row.embed_role, + status: row.status, + attempts: row.attempts, + maxAttempts: row.max_attempts, + nextAttemptAt: row.next_attempt_at, + claimedBy: row.claimed_by, + leaseUntil: row.lease_until, + lastError: row.last_error, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} diff --git a/apps/memos-local-plugin/core/storage/repos/index.ts b/apps/memos-local-plugin/core/storage/repos/index.ts index 8f4624216..11169b1b5 100644 --- a/apps/memos-local-plugin/core/storage/repos/index.ts +++ b/apps/memos-local-plugin/core/storage/repos/index.ts @@ -9,6 +9,7 @@ import { makeApiLogsRepo } from "./api_logs.js"; import { makeAuditRepo } from "./audit.js"; import { makeCandidatePoolRepo } from "./candidate_pool.js"; import { makeDecisionRepairsRepo } from "./decision_repairs.js"; +import { makeEmbeddingRetryQueueRepo } from "./embedding_retry_queue.js"; import { makeEpisodesRepo } from "./episodes.js"; import { makeFeedbackRepo } from "./feedback.js"; import { makeKvRepo } from "./kv.js"; @@ -24,6 +25,7 @@ export interface Repos { audit: ReturnType; candidatePool: ReturnType; decisionRepairs: ReturnType; + embeddingRetryQueue: ReturnType; episodes: ReturnType; feedback: ReturnType; kv: ReturnType; @@ -41,6 +43,7 @@ export function makeRepos(db: StorageDb): Repos { audit: makeAuditRepo(db), candidatePool: makeCandidatePoolRepo(db), decisionRepairs: makeDecisionRepairsRepo(db), + embeddingRetryQueue: makeEmbeddingRetryQueueRepo(db), episodes: makeEpisodesRepo(db), feedback: makeFeedbackRepo(db), kv: makeKvRepo(db), @@ -58,6 +61,7 @@ export { makeApiLogsRepo } from "./api_logs.js"; export { makeAuditRepo } from "./audit.js"; export { makeCandidatePoolRepo } from "./candidate_pool.js"; export { makeDecisionRepairsRepo } from "./decision_repairs.js"; +export { makeEmbeddingRetryQueueRepo } from "./embedding_retry_queue.js"; export { makeEpisodesRepo } from "./episodes.js"; export { makeFeedbackRepo } from "./feedback.js"; export { makeKvRepo } from "./kv.js"; diff --git a/apps/memos-local-plugin/core/storage/repos/policies.ts b/apps/memos-local-plugin/core/storage/repos/policies.ts index 50792e1c8..aab5f3902 100644 --- a/apps/memos-local-plugin/core/storage/repos/policies.ts +++ b/apps/memos-local-plugin/core/storage/repos/policies.ts @@ -231,6 +231,13 @@ export function makePoliciesRepo(db: StorageDb) { const sql = `UPDATE policies SET ${sets.join(", ")} WHERE id = @id`; db.prepare(sql).run(params); }, + + updateVector(id: PolicyId, vec: EmbeddingVector): boolean { + const res = db.prepare<{ id: string; vec: Buffer; updated_at: number }>( + `UPDATE policies SET vec=@vec, updated_at=@updated_at WHERE id=@id`, + ).run({ id, vec: toBlob(vec)!, updated_at: Date.now() }); + return res.changes > 0; + }, }; } diff --git a/apps/memos-local-plugin/core/storage/repos/skills.ts b/apps/memos-local-plugin/core/storage/repos/skills.ts index 1529e2255..e294526aa 100644 --- a/apps/memos-local-plugin/core/storage/repos/skills.ts +++ b/apps/memos-local-plugin/core/storage/repos/skills.ts @@ -320,6 +320,13 @@ export function makeSkillsRepo(db: StorageDb) { const sql = `UPDATE skills SET ${sets.join(", ")} WHERE id = @id`; db.prepare(sql).run(params); }, + + updateVector(id: SkillId, vec: EmbeddingVector): boolean { + const res = db.prepare<{ id: string; vec: Buffer; updated_at: number }>( + `UPDATE skills SET vec=@vec, updated_at=@updated_at WHERE id=@id`, + ).run({ id, vec: toBlob(vec)!, updated_at: Date.now() }); + return res.changes > 0; + }, }; } diff --git a/apps/memos-local-plugin/core/storage/repos/traces.ts b/apps/memos-local-plugin/core/storage/repos/traces.ts index fe97a1bae..ecd26c2fd 100644 --- a/apps/memos-local-plugin/core/storage/repos/traces.ts +++ b/apps/memos-local-plugin/core/storage/repos/traces.ts @@ -487,6 +487,18 @@ export function makeTracesRepo(db: StorageDb) { db.prepare(sql).run(params); }, + updateVector( + id: TraceId, + field: "vecSummary" | "vecAction", + vec: EmbeddingVector, + ): boolean { + const column = field === "vecAction" ? "vec_action" : "vec_summary"; + const res = db.prepare<{ id: string; vec: Buffer }>( + `UPDATE traces SET ${column}=@vec WHERE id=@id`, + ).run({ id, vec: toBlob(vec)! }); + return res.changes > 0; + }, + /** * Fill in reflection + α for a trace that was previously written * in the "lite" capture phase (reflection=null, α=0). Invoked diff --git a/apps/memos-local-plugin/core/storage/repos/world_model.ts b/apps/memos-local-plugin/core/storage/repos/world_model.ts index 3130422dc..b578ec680 100644 --- a/apps/memos-local-plugin/core/storage/repos/world_model.ts +++ b/apps/memos-local-plugin/core/storage/repos/world_model.ts @@ -322,6 +322,13 @@ export function makeWorldModelRepo(db: StorageDb) { const sql = `UPDATE world_model SET ${sets.join(", ")} WHERE id = @id`; db.prepare(sql).run(params); }, + + updateVector(id: WorldModelId, vec: EmbeddingVector): boolean { + const res = db.prepare<{ id: string; vec: Buffer; updated_at: number }>( + `UPDATE world_model SET vec=@vec, updated_at=@updated_at WHERE id=@id`, + ).run({ id, vec: toBlob(vec)!, updated_at: Date.now() }); + return res.changes > 0; + }, }; } diff --git a/apps/memos-local-plugin/tests/unit/capture/capture.test.ts b/apps/memos-local-plugin/tests/unit/capture/capture.test.ts index dcc87a29f..a233f34f2 100644 --- a/apps/memos-local-plugin/tests/unit/capture/capture.test.ts +++ b/apps/memos-local-plugin/tests/unit/capture/capture.test.ts @@ -10,6 +10,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it } from "vitest"; import { createCaptureEventBus } from "../../../core/capture/events.js"; import { createCaptureRunner, type CaptureRunner } from "../../../core/capture/capture.js"; +import type { Embedder } from "../../../core/embedding/types.js"; import { REFLECTION_SCORE_PROMPT } from "../../../core/llm/prompts/reflection.js"; import type { CaptureConfig, @@ -168,11 +169,16 @@ describe("capture/pipeline (end-to-end)", () => { tmp.cleanup(); }); - function buildRunner(overrides: Partial = {}, llm: ReturnType | null = null): CaptureRunner { + function buildRunner( + overrides: Partial = {}, + llm: ReturnType | null = null, + embedder: Embedder | null = fakeEmbedder({ dimensions: 8 }), + ): CaptureRunner { return createCaptureRunner({ tracesRepo: tmp.repos.traces, + embeddingRetryQueue: tmp.repos.embeddingRetryQueue, episodesRepo, - embedder: fakeEmbedder({ dimensions: 8 }), + embedder, llm, reflectLlm: llm, bus, @@ -350,6 +356,7 @@ describe("capture/pipeline (end-to-end)", () => { // chain. Two starts + one done is the correct topology. expect(seen.map((e) => e.kind)).toEqual([ "capture.started", + "capture.lite.done", "capture.started", "capture.done", ]); @@ -379,5 +386,35 @@ describe("capture/pipeline (end-to-end)", () => { const t = tmp.repos.traces.getById(result.traceIds[0]!)!; expect(t.vecSummary).toBeNull(); expect(t.vecAction).toBeNull(); + expect(tmp.repos.embeddingRetryQueue.countByStatus("pending")).toBe(0); + }); + + it("embedder failure queues missing trace vectors for retry", async () => { + const runner = buildRunner( + { embedTraces: true, alphaScoring: false }, + null, + fakeEmbedder({ dimensions: 8, throwWith: new Error("embedder offline") }), + ); + const ep = episodeSnapshot({ + id: "ep_1", + sessionId: "se_1", + turns: [turn("user", "q", 1_000), turn("assistant", "a", 1_100)], + }); + + const result = await runCapture(runner, ep); + const t = tmp.repos.traces.getById(result.traceIds[0]!)!; + + expect(t.vecSummary).toBeNull(); + expect(t.vecAction).toBeNull(); + expect(result.warnings).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + stage: "embed", + message: "embedding retry queued for missing trace vectors", + detail: { queued: 2 }, + }), + ]), + ); + expect(tmp.repos.embeddingRetryQueue.countByStatus("pending")).toBe(2); }); }); diff --git a/apps/memos-local-plugin/tests/unit/embedding/retry-queue.test.ts b/apps/memos-local-plugin/tests/unit/embedding/retry-queue.test.ts new file mode 100644 index 000000000..53045aadb --- /dev/null +++ b/apps/memos-local-plugin/tests/unit/embedding/retry-queue.test.ts @@ -0,0 +1,189 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { makeTmpDb, type TmpDbHandle } from "../../helpers/tmp-db.js"; + +const NOW = 1_700_000_000_000; + +interface QueueRow { + id: string; + status: string; + attempts: number; + next_attempt_at: number; + claimed_by: string | null; + lease_until: number | null; + last_error: string | null; + source_text: string; +} + +function row(handle: TmpDbHandle, id: string): QueueRow { + return handle.db.prepare<{ id: string }, QueueRow>( + `SELECT id, status, attempts, next_attempt_at, claimed_by, lease_until, last_error, source_text + FROM embedding_retry_queue + WHERE id=@id`, + ).get({ id })!; +} + +describe("embedding retry queue", () => { + let handle: TmpDbHandle; + + beforeEach(() => { + handle = makeTmpDb({ agent: "openclaw" }); + }); + + afterEach(() => handle.cleanup()); + + it("does not claim jobs before next_attempt_at", () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_future", + targetKind: "trace", + targetId: "tr_1", + vectorField: "vec_summary", + sourceText: "future", + now: NOW + 60_000, + }); + + expect(handle.repos.embeddingRetryQueue.claimDue({ + now: NOW, + workerId: "worker-a", + leaseUntil: NOW + 300_000, + })).toEqual([]); + expect(row(handle, "er_future")).toMatchObject({ + status: "pending", + claimed_by: null, + lease_until: null, + }); + }); + + it("leases a due job once and allows another worker only after lease expiry", () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_due", + targetKind: "trace", + targetId: "tr_1", + vectorField: "vec_summary", + sourceText: "due", + now: NOW, + }); + + const first = handle.repos.embeddingRetryQueue.claimDue({ + now: NOW, + workerId: "worker-a", + leaseUntil: NOW + 300_000, + }); + expect(first.map((j) => j.id)).toEqual(["er_due"]); + expect(row(handle, "er_due")).toMatchObject({ + status: "in_progress", + claimed_by: "worker-a", + lease_until: NOW + 300_000, + }); + + expect(handle.repos.embeddingRetryQueue.claimDue({ + now: NOW + 10_000, + workerId: "worker-b", + leaseUntil: NOW + 310_000, + })).toEqual([]); + + const stolen = handle.repos.embeddingRetryQueue.claimDue({ + now: NOW + 300_000, + workerId: "worker-b", + leaseUntil: NOW + 600_000, + }); + expect(stolen.map((j) => j.id)).toEqual(["er_due"]); + expect(row(handle, "er_due")).toMatchObject({ + status: "in_progress", + claimed_by: "worker-b", + lease_until: NOW + 600_000, + }); + }); + + it("rejects stale completion from a worker whose lease was stolen", () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_stale", + targetKind: "trace", + targetId: "tr_1", + vectorField: "vec_summary", + sourceText: "due", + now: NOW, + }); + const first = handle.repos.embeddingRetryQueue.claimDue({ + now: NOW, + workerId: "worker-a", + leaseUntil: NOW + 300_000, + })[0]!; + const stolen = handle.repos.embeddingRetryQueue.claimDue({ + now: NOW + 300_000, + workerId: "worker-b", + leaseUntil: NOW + 600_000, + })[0]!; + + expect(handle.repos.embeddingRetryQueue.markSucceededClaimed(first.id, { + workerId: first.claimedBy!, + leaseUntil: first.leaseUntil!, + now: NOW + 300_001, + })).toBe(false); + expect(handle.repos.embeddingRetryQueue.markRetryClaimed(first.id, { + workerId: first.claimedBy!, + leaseUntil: first.leaseUntil!, + attempts: 1, + nextAttemptAt: NOW + 360_000, + error: "stale failure", + now: NOW + 300_001, + })).toBe(false); + expect(row(handle, "er_stale")).toMatchObject({ + status: "in_progress", + claimed_by: "worker-b", + lease_until: NOW + 600_000, + attempts: 0, + last_error: null, + }); + + expect(handle.repos.embeddingRetryQueue.markSucceededClaimed(stolen.id, { + workerId: stolen.claimedBy!, + leaseUntil: stolen.leaseUntil!, + now: NOW + 300_002, + })).toBe(true); + expect(row(handle, "er_stale")).toMatchObject({ + status: "succeeded", + claimed_by: null, + lease_until: null, + }); + }); + + it("reenqueue resets terminal jobs and clears stale lease/error state", () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_reset", + targetKind: "trace", + targetId: "tr_1", + vectorField: "vec_summary", + sourceText: "old", + now: NOW, + }); + handle.repos.embeddingRetryQueue.claimDue({ + now: NOW, + workerId: "worker-a", + leaseUntil: NOW + 300_000, + }); + handle.repos.embeddingRetryQueue.markFailed("er_reset", { + attempts: 6, + error: "embedder down", + now: NOW + 1, + }); + + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_reset_2", + targetKind: "trace", + targetId: "tr_1", + vectorField: "vec_summary", + sourceText: "new", + now: NOW + 2, + }); + + expect(row(handle, "er_reset")).toMatchObject({ + status: "pending", + attempts: 0, + claimed_by: null, + lease_until: null, + last_error: null, + source_text: "new", + }); + }); +}); diff --git a/apps/memos-local-plugin/tests/unit/embedding/retry-worker.test.ts b/apps/memos-local-plugin/tests/unit/embedding/retry-worker.test.ts new file mode 100644 index 000000000..f08a37f32 --- /dev/null +++ b/apps/memos-local-plugin/tests/unit/embedding/retry-worker.test.ts @@ -0,0 +1,204 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { createEmbeddingRetryWorker } from "../../../core/embedding/retry-worker.js"; +import { rootLogger } from "../../../core/logger/index.js"; +import type { EpisodeId, SessionId, TraceId } from "../../../core/types.js"; +import { makeTmpDb, type TmpDbHandle } from "../../helpers/tmp-db.js"; +import { fakeEmbedder } from "../../helpers/fake-embedder.js"; + +const NOW = 1_700_000_000_000; + +interface QueueRow { + status: string; + attempts: number; + next_attempt_at: number; + claimed_by: string | null; + lease_until: number | null; + last_error: string | null; +} + +function queueRow(handle: TmpDbHandle, id: string): QueueRow { + return handle.db.prepare<{ id: string }, QueueRow>( + `SELECT status, attempts, next_attempt_at, claimed_by, lease_until, last_error + FROM embedding_retry_queue + WHERE id=@id`, + ).get({ id })!; +} + +describe("embedding retry worker", () => { + let handle: TmpDbHandle; + + beforeEach(() => { + handle = makeTmpDb({ agent: "openclaw" }); + handle.repos.sessions.upsert({ + id: "s1" as SessionId, + agent: "openclaw", + startedAt: NOW, + lastSeenAt: NOW, + meta: {}, + }); + handle.repos.episodes.upsert({ + id: "ep1" as EpisodeId, + sessionId: "s1" as SessionId, + startedAt: NOW as never, + endedAt: null, + traceIds: [], + rTask: null, + status: "open", + }); + handle.repos.traces.insert({ + id: "tr_retry" as TraceId, + episodeId: "ep1" as EpisodeId, + sessionId: "s1" as SessionId, + ts: NOW as never, + userText: "retry me", + agentText: "ok", + toolCalls: [], + reflection: null, + value: 0, + alpha: 0, + rHuman: null, + priority: 0.5, + tags: [], + vecSummary: null, + vecAction: null, + turnId: NOW as never, + schemaVersion: 1, + }); + }); + + afterEach(() => handle.cleanup()); + + it("fills a queued trace vector", async () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_ok", + targetKind: "trace", + targetId: "tr_retry", + vectorField: "vec_summary", + sourceText: "retry me", + now: NOW, + }); + const worker = createEmbeddingRetryWorker({ + repos: handle.repos, + embedder: fakeEmbedder({ dimensions: 8 }), + log: rootLogger.child({ channel: "test.embedding.retry" }), + now: () => NOW, + intervalMs: 60_000, + }); + + await worker.flush(); + + expect(handle.repos.traces.getById("tr_retry" as TraceId)?.vecSummary).not.toBeNull(); + expect(handle.repos.embeddingRetryQueue.countByStatus("succeeded")).toBe(1); + }); + + it("marks terminal failures and records a system error", async () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_fail", + targetKind: "trace", + targetId: "tr_retry", + vectorField: "vec_summary", + sourceText: "retry me", + maxAttempts: 1, + now: NOW, + }); + const events: unknown[] = []; + const worker = createEmbeddingRetryWorker({ + repos: handle.repos, + embedder: fakeEmbedder({ throwWith: new Error("retry boom") }), + log: rootLogger.child({ channel: "test.embedding.retry" }), + now: () => NOW, + onSystemError: (payload) => events.push(payload), + }); + + await worker.flush(); + + expect(handle.repos.embeddingRetryQueue.countByStatus("failed")).toBe(1); + expect(handle.repos.apiLogs.list({ toolName: "system_error", limit: 5, offset: 0 })).toHaveLength(1); + expect(events).toHaveLength(1); + }); + + it("leaves queued work untouched when no embedder is configured", async () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_no_embedder", + targetKind: "trace", + targetId: "tr_retry", + vectorField: "vec_summary", + sourceText: "retry me", + now: NOW, + }); + const worker = createEmbeddingRetryWorker({ + repos: handle.repos, + embedder: null, + log: rootLogger.child({ channel: "test.embedding.retry" }), + now: () => NOW, + }); + + await worker.flush(); + + expect(queueRow(handle, "er_no_embedder")).toMatchObject({ + status: "pending", + attempts: 0, + claimed_by: null, + lease_until: null, + }); + }); + + it("backs off non-terminal embedder failures and clears the lease", async () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_retry", + targetKind: "trace", + targetId: "tr_retry", + vectorField: "vec_summary", + sourceText: "retry me", + maxAttempts: 3, + now: NOW, + }); + const worker = createEmbeddingRetryWorker({ + repos: handle.repos, + embedder: fakeEmbedder({ throwWith: new Error("temporary outage") }), + log: rootLogger.child({ channel: "test.embedding.retry" }), + now: () => NOW, + }); + + await worker.flush(); + + expect(queueRow(handle, "er_retry")).toMatchObject({ + status: "pending", + attempts: 1, + next_attempt_at: NOW + 60_000, + claimed_by: null, + lease_until: null, + last_error: "temporary outage", + }); + expect(handle.repos.apiLogs.list({ toolName: "system_error", limit: 5, offset: 0 })).toHaveLength(1); + }); + + it("treats missing target rows as retry failures", async () => { + handle.repos.embeddingRetryQueue.enqueue({ + id: "er_missing", + targetKind: "trace", + targetId: "tr_missing", + vectorField: "vec_summary", + sourceText: "orphan", + maxAttempts: 1, + now: NOW, + }); + const worker = createEmbeddingRetryWorker({ + repos: handle.repos, + embedder: fakeEmbedder({ dimensions: 8 }), + log: rootLogger.child({ channel: "test.embedding.retry" }), + now: () => NOW, + }); + + await worker.flush(); + + expect(queueRow(handle, "er_missing")).toMatchObject({ + status: "failed", + attempts: 1, + claimed_by: null, + lease_until: null, + last_error: "embedding retry target not found: trace:tr_missing", + }); + }); +}); diff --git a/apps/memos-local-plugin/tests/unit/retrieval/integration.test.ts b/apps/memos-local-plugin/tests/unit/retrieval/integration.test.ts index e13eb7b23..3779f9e46 100644 --- a/apps/memos-local-plugin/tests/unit/retrieval/integration.test.ts +++ b/apps/memos-local-plugin/tests/unit/retrieval/integration.test.ts @@ -311,6 +311,40 @@ describe("retrieval/integration", () => { // Graceful degradation: empty packet + started + done, not a throw. expect(res.packet.snippets.length).toBe(0); expect(res.stats.emptyPacket).toBe(true); + expect(res.stats.embedding).toMatchObject({ + attempted: true, + ok: false, + degraded: true, + errorMessage: "boom", + }); expect(kinds).toEqual(["retrieval.started", "retrieval.done"]); }); + + it("does not call the query embedder for blank turn-start text", async () => { + let calls = 0; + const deps: RetrievalDeps = { + ...makeDeps(handle), + embedder: { + embed: async () => { + calls++; + throw new Error("should not be called"); + }, + }, + }; + + const res = await turnStartRetrieve(deps, { + reason: "turn_start", + agent: "openclaw", + sessionId: "s1" as SessionId, + userText: " ", + ts: NOW as never, + }); + + expect(calls).toBe(0); + expect(res.stats.embedding).toMatchObject({ + attempted: false, + ok: false, + degraded: false, + }); + }); }); diff --git a/apps/memos-local-plugin/tests/unit/storage/migrator.test.ts b/apps/memos-local-plugin/tests/unit/storage/migrator.test.ts index c0d4ee997..82579389a 100644 --- a/apps/memos-local-plugin/tests/unit/storage/migrator.test.ts +++ b/apps/memos-local-plugin/tests/unit/storage/migrator.test.ts @@ -102,4 +102,56 @@ describe("storage/migrator", () => { db.close(); } }); + + it("treats embedding retry lease migration as satisfied when columns already exist", () => { + const { dbPath, cleanup } = tmpDb(); + cleanups.push(cleanup); + const db = openDb({ filepath: dbPath, agent: "openclaw" }); + try { + db.exec(` + CREATE TABLE schema_migrations ( + version INTEGER PRIMARY KEY, + name TEXT NOT NULL, + applied_at INTEGER NOT NULL + ) STRICT; + CREATE TABLE embedding_retry_queue ( + id TEXT PRIMARY KEY, + target_kind TEXT NOT NULL CHECK (target_kind IN ('trace','policy','world_model','skill')), + target_id TEXT NOT NULL, + vector_field TEXT NOT NULL CHECK (vector_field IN ('vec_summary','vec_action','vec')), + source_text TEXT NOT NULL, + embed_role TEXT NOT NULL CHECK (embed_role IN ('document','query')) DEFAULT 'document', + status TEXT NOT NULL CHECK (status IN ('pending','in_progress','failed','succeeded')) DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 6, + next_attempt_at INTEGER NOT NULL, + claimed_by TEXT, + lease_until INTEGER, + last_error TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + UNIQUE (target_kind, target_id, vector_field) + ) STRICT; + INSERT INTO schema_migrations(version, name, applied_at) + VALUES (1, 'initial', 0), (2, 'embedding-retry-queue', 0); + `); + + const result = runMigrations(db); + + expect(result.applied.map((m) => m.version)).toContain(3); + const columns = db + .prepare(`PRAGMA table_info(embedding_retry_queue)`) + .all() + .map((row) => row.name); + expect(columns.filter((name) => name === "claimed_by")).toHaveLength(1); + expect(columns.filter((name) => name === "lease_until")).toHaveLength(1); + expect(db + .prepare<{ version: number }, { n: number }>( + `SELECT COUNT(*) AS n FROM schema_migrations WHERE version=@version`, + ) + .get({ version: 3 })?.n).toBe(1); + } finally { + db.close(); + } + }); }); diff --git a/apps/memos-local-plugin/web/src/views/LogsView.tsx b/apps/memos-local-plugin/web/src/views/LogsView.tsx index e7463c9ad..4fecfe013 100644 --- a/apps/memos-local-plugin/web/src/views/LogsView.tsx +++ b/apps/memos-local-plugin/web/src/views/LogsView.tsx @@ -371,6 +371,13 @@ interface RetrievalStatsPayload { channelHits?: Record; queryTokens?: number; queryTags?: string[]; + embedding?: { + attempted?: boolean; + ok?: boolean; + degraded?: boolean; + errorCode?: string; + errorMessage?: string; + }; } interface SearchCandidate { tier?: number; @@ -482,6 +489,11 @@ function RetrievalFunnel({ stats }: { stats: RetrievalStatsPayload }) { class="hstack" style="gap:var(--sp-3);flex-wrap:wrap;font-size:var(--fs-xs)" > + {stats.embedding?.degraded && ( + + embedder degraded · {stats.embedding.errorCode ?? stats.embedding.errorMessage ?? "failed"} + + )} raw {raw} ranked {ranked} {dropped > 0 && ( diff --git a/apps/memos-local-plugin/web/src/views/OverviewView.tsx b/apps/memos-local-plugin/web/src/views/OverviewView.tsx index bc0177f78..f304d9540 100644 --- a/apps/memos-local-plugin/web/src/views/OverviewView.tsx +++ b/apps/memos-local-plugin/web/src/views/OverviewView.tsx @@ -220,7 +220,9 @@ export function OverviewView() { {recent.map((evt) => (
{new Date(evt.ts).toLocaleTimeString()} - {evt.type} + + {evt.type} + {JSON.stringify(evt.payload ?? {}).slice(0, 240)}