Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions apps/memos-local-plugin/core/capture/capture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type { LlmClient } from "../llm/index.js";
import { rootLogger } from "../logger/index.js";
import { ids } from "../id.js";
import type { TraceRow, TraceId } from "../types.js";
import type { makeEmbeddingRetryQueueRepo } from "../storage/repos/embedding_retry_queue.js";
import type { makeTracesRepo } from "../storage/repos/traces.js";
import type { EpisodesRepo } from "../session/persistence.js";
import { disabledScore, scoreReflection } from "./alpha-scorer.js";
Expand All @@ -45,9 +46,11 @@ import type {
} from "./types.js";

/** Repository object type, derived from the return type of `makeTracesRepo`. */
type TracesRepo = ReturnType<typeof makeTracesRepo>;
/** Repository object type, derived from the return type of `makeEmbeddingRetryQueueRepo`. */
type EmbeddingRetryQueueRepo = ReturnType<typeof makeEmbeddingRetryQueueRepo>;

export interface CaptureDeps {
tracesRepo: TracesRepo;
embeddingRetryQueue?: EmbeddingRetryQueueRepo;
episodesRepo: EpisodesRepo;
embedder: Embedder | null;
/** Main LLM — used for per-turn lite capture (summarisation). */
Expand Down Expand Up @@ -564,6 +567,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
): Promise<boolean> {
try {
for (const row of rows) deps.tracesRepo.insert(row);
enqueueMissingTraceVectors(rows, warnings);
} catch (err) {
const failure = errDetail(err);
log.error("persist.failed", {
Expand Down Expand Up @@ -599,6 +603,46 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
return true;
}

/**
 * Queues an embedding retry job for every vector that is absent on the given
 * trace rows, and records a single warning describing how many were queued.
 *
 * Does nothing unless trace embedding is enabled in config, a retry queue was
 * supplied, and an embedder is configured.
 *
 * @param rows - Trace rows to inspect for missing `vecSummary` / `vecAction`.
 * @param warnings - Warning list that receives one "embed" entry when at
 *   least one job was queued.
 */
function enqueueMissingTraceVectors(
  rows: TraceRow[],
  warnings: CaptureResult["warnings"],
): void {
  if (!deps.cfg.embedTraces) return;
  const retryQueue = deps.embeddingRetryQueue;
  if (!retryQueue || !deps.embedder) return;

  // One timestamp is shared by every job queued in this call.
  const queuedAt = now();
  let queued = 0;

  const submit = (row: TraceRow, vectorField: "vec_summary" | "vec_action"): void => {
    retryQueue.enqueue({
      id: `er_${ids.span()}`,
      targetKind: "trace",
      targetId: row.id,
      vectorField,
      sourceText:
        vectorField === "vec_summary"
          ? row.summary?.trim() || row.userText.trim() || "(empty)"
          : traceActionText(row),
      now: queuedAt,
    });
    queued += 1;
  };

  for (const row of rows) {
    if (!row.vecSummary) submit(row, "vec_summary");
    if (!row.vecAction) submit(row, "vec_action");
  }

  if (queued === 0) return;
  warnings.push({
    stage: "embed",
    message: "embedding retry queued for missing trace vectors",
    detail: { queued },
  });
}

function finalResult(
input: CaptureInput,
startedAt: number,
Expand Down Expand Up @@ -805,6 +849,23 @@ function errDetail(err: unknown): Record<string, unknown> {
return { value: String(err) };
}

/**
 * Builds the text that represents a trace's "action": the trimmed agent text
 * followed by a `; `-joined list of `name(input)` tool-call signatures, with
 * the two parts separated by a `---` line. Each serialized tool input is
 * truncated to 300 characters. Returns `"(empty)"` when both parts are empty.
 */
function traceActionText(row: Pick<TraceRow, "agentText" | "toolCalls">): string {
  const signatures: string[] = [];
  for (const call of row.toolCalls) {
    signatures.push(`${call.name}(${safeStringify(call.input).slice(0, 300)})`);
  }
  const toolSig = signatures.join("; ");

  const sections = [row.agentText.trim(), toolSig];
  const text = sections.filter((section) => section.length > 0).join("\n---\n");
  if (text) return text;
  return "(empty)";
}

/**
 * Converts an arbitrary value to a string without throwing on values that
 * `JSON.stringify` cannot serialize.
 *
 * - `null` / `undefined` become the empty string.
 * - Strings are returned unchanged (not JSON-quoted).
 * - Everything else is JSON-serialized, falling back to `String(v)` when
 *   serialization throws (e.g. circular structures, bigint) or yields no
 *   output at all.
 *
 * @param v - Value to serialize.
 * @returns A string; never `undefined`.
 */
function safeStringify(v: unknown): string {
  if (v === undefined || v === null) return "";
  if (typeof v === "string") return v;
  try {
    // JSON.stringify returns `undefined` (not a string) for functions, symbols
    // and objects whose toJSON() yields undefined; fall back so callers that
    // chain string methods on the result do not crash.
    return JSON.stringify(v) ?? String(v);
  } catch {
    return String(v);
  }
}

/**
* Pull the `turnId` stamped by `step-extractor` out of the
* `StepCandidate.meta` blob. Falls back to the trace's own `ts` so
Expand Down
2 changes: 2 additions & 0 deletions apps/memos-local-plugin/core/embedding/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export {
type EmbedCacheStats,
} from "./cache.js";
export { l2Normalize, enforceDim, postProcess, toFloat32 } from "./normalize.js";
export { createEmbeddingRetryWorker, systemErrorEvent } from "./retry-worker.js";
export type { EmbeddingRetryWorker } from "./retry-worker.js";
export type {
EmbedInput,
EmbedRole,
Expand Down
224 changes: 224 additions & 0 deletions apps/memos-local-plugin/core/embedding/retry-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import type { CoreEvent } from "../../agent-contract/events.js";
import { ids } from "../id.js";
import type { Embedder } from "./types.js";
import type { Logger } from "../logger/types.js";
import type { Repos } from "../storage/repos/index.js";
import type { EmbeddingRetryClaim, EmbeddingRetryJob } from "../storage/repos/embedding_retry_queue.js";
import type { EmbeddingVector } from "../types.js";

/**
 * Control handle returned by `createEmbeddingRetryWorker` for the background
 * embedding retry loop.
 */
export interface EmbeddingRetryWorker {
/**
 * Triggers one pass immediately and schedules further passes on an interval.
 * No-op when already started or when no embedder was supplied.
 */
start(): void;
/** Clears the interval timer. A pass that is already in flight is not awaited here. */
stop(): void;
/**
 * Triggers a pass unless one is already in flight, then resolves once the
 * in-flight pass has settled.
 */
flush(): Promise<void>;
}

/** Dependencies and tuning knobs accepted by `createEmbeddingRetryWorker`. */
export interface EmbeddingRetryWorkerDeps {
/**
 * Storage repositories. The worker uses `embeddingRetryQueue`, the vector
 * targets (`traces`, `policies`, `worldModel`, `skills`) and `apiLogs`.
 */
repos: Repos;
/** Embedder used to recompute vectors; when `null` the worker never starts and passes do nothing. */
embedder: Embedder | null;
/** Logger for per-job debug/info/warn records. */
log: Logger;
/** Clock returning epoch milliseconds; defaults to `Date.now`. */
now?: () => number;
/** Delay between scheduled passes in milliseconds; defaults to `DEFAULT_INTERVAL_MS`. */
intervalMs?: number;
/** Maximum number of jobs claimed per pass; defaults to 25. */
batchSize?: number;
/**
 * Called after a job failure has been recorded, with the failure payload and
 * the job's `targetId` as the correlation id.
 */
onSystemError?: (payload: Record<string, unknown>, correlationId?: string) => void;
}

/** Delay between scheduled retry passes when `intervalMs` is not supplied (1 minute). */
const DEFAULT_INTERVAL_MS = 60_000;
/** Backoff applied after the first failed attempt; doubled for each later attempt (1 minute). */
const BASE_BACKOFF_MS = 60_000;
/** Upper bound on the backoff between attempts (60 minutes). */
const MAX_BACKOFF_MS = 60 * 60_000;
/** How far past the claim time a claimed job's lease extends (5 minutes). */
const DEFAULT_LEASE_MS = 5 * 60_000;

/**
 * Creates a worker that repeatedly claims due jobs from the embedding retry
 * queue, recomputes the embedding for each job's source text, and writes the
 * vector back onto the job's target row.
 *
 * At most one pass runs at a time. Passes started by the interval timer log
 * unexpected failures instead of leaving the rejection unhandled; `flush()`
 * still rejects for its caller when the pass it awaits fails.
 *
 * @param deps - Repositories, embedder, logger and optional tuning knobs.
 * @returns A handle exposing `start`, `stop` and `flush`.
 */
export function createEmbeddingRetryWorker(
  deps: EmbeddingRetryWorkerDeps,
): EmbeddingRetryWorker {
  const now = deps.now ?? Date.now;
  const batchSize = deps.batchSize ?? 25;
  // Identifies this worker instance as the claimant of the jobs it leases.
  const workerId = `embedding-retry-${ids.span()}`;
  let timer: ReturnType<typeof setInterval> | null = null;
  // The pass currently in flight, or null when idle.
  let running: Promise<void> | null = null;

  /** Claims up to `batchSize` due jobs and processes them one after another. */
  async function runOnce(): Promise<void> {
    if (!deps.embedder) return;
    const at = now();
    const jobs = deps.repos.embeddingRetryQueue.claimDue({
      now: at,
      workerId,
      leaseUntil: at + DEFAULT_LEASE_MS,
      limit: batchSize,
    });
    for (const job of jobs) {
      await processJob(job);
    }
  }

  /**
   * Embeds one claimed job and records the outcome: success, a scheduled
   * retry, or a terminal failure once `maxAttempts` is reached. Outcomes are
   * ignored (debug-logged only) when this worker no longer holds the claim.
   */
  async function processJob(job: EmbeddingRetryJob): Promise<void> {
    if (!deps.embedder) return;
    const claim = claimFor(job);
    if (!claim) {
      deps.log.debug("embedding_retry.stale_missing_claim", { jobId: job.id });
      return;
    }
    const attemptNo = job.attempts + 1;
    try {
      const vec = await deps.embedder.embedOne({
        text: job.sourceText || "(empty)",
        role: job.embedRole,
      });
      // Claim check, vector write and success mark run inside the queue
      // repo's `transact` callback (presumably one DB transaction — confirm).
      const completed = deps.repos.embeddingRetryQueue.transact(() => {
        const at = now();
        if (!deps.repos.embeddingRetryQueue.touchClaimHeld(job.id, { ...claim, now: at })) {
          return { stale: true, updated: false, completed: false };
        }
        const updated = applyVector(job, vec);
        if (!updated) {
          return { stale: false, updated: false, completed: false };
        }
        return {
          stale: false,
          updated: true,
          completed: deps.repos.embeddingRetryQueue.markSucceededClaimed(job.id, {
            ...claim,
            now: at,
          }),
        };
      });
      if (completed.stale) {
        deps.log.debug("embedding_retry.stale_success_ignored", { jobId: job.id });
        return;
      }
      if (!completed.updated) {
        // Routed through the catch below so it is recorded as a failed attempt.
        throw new Error(`embedding retry target not found: ${job.targetKind}:${job.targetId}`);
      }
      if (!completed.completed) {
        deps.log.debug("embedding_retry.stale_success_ignored", { jobId: job.id });
        return;
      }
      deps.log.info("embedding_retry.succeeded", {
        jobId: job.id,
        targetKind: job.targetKind,
        targetId: job.targetId,
        vectorField: job.vectorField,
        attempts: attemptNo,
      });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      const at = now();
      const terminal = attemptNo >= job.maxAttempts;
      const recorded = terminal
        ? deps.repos.embeddingRetryQueue.markFailedClaimed(job.id, {
            ...claim,
            attempts: attemptNo,
            error: message,
            now: at,
          })
        : deps.repos.embeddingRetryQueue.markRetryClaimed(job.id, {
            ...claim,
            attempts: attemptNo,
            nextAttemptAt: at + backoffMs(attemptNo),
            error: message,
            now: at,
          });
      if (!recorded) {
        deps.log.debug("embedding_retry.stale_failure_ignored", { jobId: job.id, terminal });
        return;
      }
      emitFailure(job, attemptNo, message, terminal, at);
    }
  }

  /** Returns this worker's claim on the job, or null when the job is not leased to it. */
  function claimFor(job: EmbeddingRetryJob): EmbeddingRetryClaim | null {
    if (job.claimedBy !== workerId || job.leaseUntil === null) return null;
    return { workerId, leaseUntil: job.leaseUntil };
  }

  /** Writes the vector onto the job's target row via the matching repository's `updateVector`. */
  function applyVector(job: EmbeddingRetryJob, vec: EmbeddingVector): boolean {
    switch (job.targetKind) {
      case "trace":
        return deps.repos.traces.updateVector(
          job.targetId as never,
          job.vectorField === "vec_action" ? "vecAction" : "vecSummary",
          vec,
        );
      case "policy":
        return deps.repos.policies.updateVector(job.targetId as never, vec);
      case "world_model":
        return deps.repos.worldModel.updateVector(job.targetId as never, vec);
      case "skill":
        return deps.repos.skills.updateVector(job.targetId as never, vec);
    }
  }

  /**
   * Reports a recorded failure: logs a warning, inserts a best-effort
   * `system_error` row into the API log, and notifies `onSystemError`.
   */
  function emitFailure(
    job: EmbeddingRetryJob,
    attempts: number,
    message: string,
    terminal: boolean,
    at: number,
  ): void {
    const payload = {
      kind: "embedding.retry_failed",
      jobId: job.id,
      targetKind: job.targetKind,
      targetId: job.targetId,
      vectorField: job.vectorField,
      attempts,
      maxAttempts: job.maxAttempts,
      terminal,
      message,
    };
    deps.log.warn("embedding_retry.failed", payload);
    try {
      deps.repos.apiLogs.insert({
        toolName: "system_error",
        input: { role: "embedding_retry" },
        output: payload,
        durationMs: 0,
        success: false,
        calledAt: at,
      });
    } catch {
      /* logging the retry failure is best-effort */
    }
    deps.onSystemError?.(payload, job.targetId);
  }

  /** Starts a pass unless one is already in flight; returns the in-flight pass. */
  function tick(): Promise<void> {
    if (running) return running;
    const pass = runOnce().finally(() => {
      running = null;
    });
    running = pass;
    return pass;
  }

  /**
   * Timer-driven variant of `tick`. Nothing awaits these passes, so a
   * rejection (e.g. a throwing queue call) is logged here rather than left
   * unhandled.
   */
  function tickInBackground(): void {
    if (running) return;
    void tick().catch((err: unknown) => {
      deps.log.warn("embedding_retry.tick_failed", {
        workerId,
        message: err instanceof Error ? err.message : String(err),
      });
    });
  }

  return {
    start(): void {
      if (timer || !deps.embedder) return;
      tickInBackground();
      timer = setInterval(tickInBackground, deps.intervalMs ?? DEFAULT_INTERVAL_MS);
    },
    stop(): void {
      if (timer) clearInterval(timer);
      timer = null;
    },
    async flush(): Promise<void> {
      // Awaits the in-flight pass (starting one if idle); rejections propagate.
      await tick();
    },
  };
}

/**
 * Delay before the next retry: `BASE_BACKOFF_MS` for the first attempt,
 * doubling with each further attempt and capped at `MAX_BACKOFF_MS`.
 *
 * @param attemptNo - 1-based number of the attempt that just failed; values
 *   below 1 are treated like 1.
 */
function backoffMs(attemptNo: number): number {
  const doublings = Math.max(0, attemptNo - 1);
  const uncapped = BASE_BACKOFF_MS * Math.pow(2, doublings);
  return Math.min(MAX_BACKOFF_MS, uncapped);
}

/**
 * Wraps a payload in a `system.error` core event stamped with the current
 * wall-clock time.
 *
 * @param payload - Event payload, stored on the event as given.
 * @param seq - Sequence number to assign to the event.
 * @param correlationId - Optional correlation id; the key is present on the
 *   event even when omitted.
 */
export function systemErrorEvent(
  payload: Record<string, unknown>,
  seq: number,
  correlationId?: string,
): CoreEvent {
  const event: CoreEvent = {
    type: "system.error",
    ts: Date.now(),
    seq,
    correlationId,
    payload,
  };
  return event;
}
28 changes: 27 additions & 1 deletion apps/memos-local-plugin/core/memory/l2/l2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import type {
TraceRow,
} from "../../types.js";
import type { Repos } from "../../storage/repos/index.js";
import { ids } from "../../id.js";
import { L2_INDUCTION_PROMPT } from "../../llm/prompts/l2-induction.js";
import { associateTraces } from "./associate.js";
import { makeCandidatePool } from "./candidate-pool.js";
Expand All @@ -47,7 +48,7 @@ import type {
} from "./types.js";

export interface RunL2Deps {
repos: Pick<Repos, "candidatePool" | "policies" | "traces">;
repos: Pick<Repos, "candidatePool" | "embeddingRetryQueue" | "policies" | "traces">;
db: Parameters<typeof makeCandidatePool>[0]["db"];
llm: LlmClient | null;
log: Logger;
Expand Down Expand Up @@ -224,6 +225,21 @@ export async function runL2(
});
try {
repos.policies.insert(policy);
if (!policy.vec) {
repos.embeddingRetryQueue.enqueue({
id: `er_${ids.span()}`,
targetKind: "policy",
targetId: policy.id,
vectorField: "vec",
sourceText: policyVectorText(policy),
now: input.now ?? Date.now(),
});
warnings.push({
stage: "embed",
message: "embedding retry queued for policy vector",
detail: { policyId: policy.id },
});
}
pool.promote(bucket.candidateIds, policy.id);
touched.set(policy.id, policy);
inductionEvidenceByPolicy.set(
Expand Down Expand Up @@ -377,6 +393,16 @@ function stageWarn(
return { stage, message, detail };
}

/**
 * Builds the text embedded for a policy: its title, trigger, procedure,
 * verification and boundary, in that order, one per line. Falsy sections
 * (missing or empty) are left out.
 */
function policyVectorText(policy: PolicyRow): string {
  const sections = [
    policy.title,
    policy.trigger,
    policy.procedure,
    policy.verification,
    policy.boundary,
  ];
  const present: typeof sections = [];
  for (const section of sections) {
    if (section) present.push(section);
  }
  return present.join("\n");
}

function pickOnePerEpisode(traces: readonly TraceRow[]): TraceRow[] {
const byEp = new Map<string, TraceRow>();
for (const t of traces) {
Expand Down
2 changes: 1 addition & 1 deletion apps/memos-local-plugin/core/memory/l2/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import type { L2Config, L2EventBus } from "./types.js";

export interface L2SubscriberDeps {
db: StorageDb;
repos: Pick<Repos, "candidatePool" | "policies" | "traces">;
repos: Pick<Repos, "candidatePool" | "embeddingRetryQueue" | "policies" | "traces">;
rewardBus: RewardEventBus;
l2Bus: L2EventBus;
llm: LlmClient | null;
Expand Down
Loading