Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 185 additions & 6 deletions apps/memos-local-plugin/core/capture/capture.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type { Embedder } from "../embedding/index.js";
import type { LlmClient } from "../llm/index.js";
import { rootLogger } from "../logger/index.js";
import { ids } from "../id.js";
import type { EpisodeRow, TraceRow, TraceId } from "../types.js";
import type { EpisodeRow, TraceRow, TraceId, EpochMs } from "../types.js";
import type { makeEmbeddingRetryQueueRepo } from "../storage/repos/embedding_retry_queue.js";
import type { makeTracesRepo } from "../storage/repos/traces.js";
import type { EpisodesRepo } from "../session/persistence.js";
Expand Down Expand Up @@ -79,6 +79,11 @@ export interface CaptureRunner {
* Safe to call after every `addTurn` cycle.
*/
runLite(input: CaptureInput): Promise<CaptureResult>;
/**
* Lightweight memory capture. Writes one trace per user/assistant turn
* instead of per tool/action step, and never emits `capture.done`.
*/
runLightweight(input: CaptureInput): Promise<CaptureResult>;
/**
* Topic-end "reflect" capture. Runs the batch reflection scorer over
* EVERY step of the (now-finalized) episode in one LLM call so the
Expand Down Expand Up @@ -231,6 +236,127 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
return result;
}

async function runLightweight(input: CaptureInput): Promise<CaptureResult> {
const startedAt = now();
const warnings: CaptureResult["warnings"] = [];
const llmCalls = newLlmCounters();

emit({
kind: "capture.started",
episodeId: input.episode.id,
sessionId: input.episode.sessionId,
});

const extractStart = now();
const rawAll = extractSteps(input.episode);
const existingTraces = deps.tracesRepo.list({ episodeId: input.episode.id });
const seenTurnIds = new Set(
existingTraces
.map((t) => t.turnId)
.filter((v): v is number => typeof v === "number" && Number.isFinite(v)),
);
const rawByTurn = new Map<number, StepCandidate[]>();
for (const step of rawAll) {
const turnId = pickTurnId(step.meta, step.ts);
if (seenTurnIds.has(turnId)) continue;
const bucket = rawByTurn.get(turnId) ?? [];
bucket.push(step);
rawByTurn.set(turnId, bucket);
}
const raw = Array.from(rawByTurn.entries())
.sort((a, b) => a[0] - b[0])
.map(([turnId, steps]) => mergeTurnSteps(input.episode.id, turnId, steps));
const extractMs = now() - extractStart;

const normStart = now();
const normalized = normalizeSteps(raw, deps.cfg);
const normalizeMs = now() - normStart;

if (normalized.length === 0) {
return emptyResult(input, startedAt, {
extract: extractMs,
normalize: normalizeMs,
}, llmCalls, warnings);
}

const scored: ScoredStep[] = normalized.map((s) => ({
...s,
reflection: { text: null, alpha: 0, usable: false, source: "none" },
}));

const summarizeStart = now();
const { summaries, summarizeMs } = await runSummarize(
scored,
summarizeStart,
llmCalls,
warnings,
{ episodeId: input.episode.id, phase: "lightweight" },
);

const { vecs: summaryOnlyVecs, embedMs } = await runEmbed(
scored,
summaries,
warnings,
{ summaryOnly: true },
);

const persistStart = now();
const rows = buildRows(scored, summaries, summaryOnlyVecs, input.episode, {
lightweightMemory: true,
});
const persisted = await persistRows(rows, input, warnings, {
skipActionVectorRetry: true,
});
if (!persisted) {
return finalResult(
input,
startedAt,
[],
scored.map(toCandidate(rows)),
{
extract: extractMs,
normalize: normalizeMs,
reflect: 0,
alpha: 0,
summarize: summarizeMs,
embed: embedMs,
persist: now() - persistStart,
},
llmCalls,
warnings,
);
}
const persistMs = now() - persistStart;

const result = finalResult(
input,
startedAt,
rows.map((r) => r.id),
buildTraceCandidates(scored, rows),
{
extract: extractMs,
normalize: normalizeMs,
reflect: 0,
alpha: 0,
summarize: summarizeMs,
embed: embedMs,
persist: persistMs,
},
llmCalls,
warnings,
);
log.info("capture.lightweight.done", {
episodeId: input.episode.id,
sessionId: input.episode.sessionId,
traces: result.traceIds.length,
llmCalls,
totalMs: result.completedAt - startedAt,
warnings: warnings.length,
});
emit({ kind: "capture.lite.done", result });
return result;
}

/**
* Topic-end reflect pass — see `CaptureRunner.runReflect` for contract.
* Reads every trace already written for this episode, batch-scores
Expand Down Expand Up @@ -482,13 +608,14 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
scored: ScoredStep[],
summaries: string[],
warnings: CaptureResult["warnings"],
opts: { summaryOnly?: boolean } = {},
): Promise<{ vecs: VecPair[]; embedMs: number }> {
const start = now();
if (!deps.cfg.embedTraces || !deps.embedder) {
return { vecs: scored.map(() => ({ summary: null, action: null })), embedMs: now() - start };
}
try {
const vecs = await embedSteps(deps.embedder, scored, summaries);
const vecs = await embedSteps(deps.embedder, scored, summaries, opts);
return { vecs, embedMs: now() - start };
} catch (err) {
warnings.push({
Expand All @@ -505,6 +632,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
summaries: string[],
vecs: VecPair[],
episode: CaptureInput["episode"],
opts: { lightweightMemory?: boolean } = {},
): TraceRow[] {
const owner = ownerFromEpisode(episode);
const traces: TraceCandidate[] = scored.map((s, i) => ({
Expand Down Expand Up @@ -535,7 +663,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
// so retrieval can find the row immediately; reward backprop
// overwrites it once the topic is reflected on.
priority: 0.5,
tags: t.tags,
tags: opts.lightweightMemory ? mergeTags(t.tags, ["lightweight_memory"]) : t.tags,
errorSignatures: extractErrorSignatures({
toolCalls: t.toolCalls,
agentText: t.agentText,
Expand Down Expand Up @@ -597,6 +725,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
rows: TraceRow[],
input: CaptureInput,
warnings: CaptureResult["warnings"],
opts: { skipActionVectorRetry?: boolean } = {},
): Promise<boolean> {
const existingBeforeInsert = deps.tracesRepo.list({ episodeId: input.episode.id });
const seenSignatures = new Set(existingBeforeInsert.map(traceIdentitySignature));
Expand All @@ -620,7 +749,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {

try {
for (const row of rows) deps.tracesRepo.insert(row);
enqueueMissingTraceVectors(rows, warnings);
enqueueMissingTraceVectors(rows, warnings, opts);
} catch (err) {
const failure = errDetail(err);
log.error("persist.failed", {
Expand Down Expand Up @@ -760,6 +889,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
function enqueueMissingTraceVectors(
rows: TraceRow[],
warnings: CaptureResult["warnings"],
opts: { skipActionVectorRetry?: boolean } = {},
): void {
if (!deps.cfg.embedTraces || !deps.embeddingRetryQueue || !deps.embedder) return;
const queuedAt = now();
Expand All @@ -776,7 +906,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
});
queued++;
}
if (!row.vecAction) {
if (!opts.skipActionVectorRetry && !row.vecAction) {
deps.embeddingRetryQueue.enqueue({
id: `er_${ids.span()}`,
targetKind: "trace",
Expand All @@ -797,6 +927,10 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
}
}

function mergeTags(existing: readonly string[], extra: readonly string[]): string[] {
return Array.from(new Set([...existing, ...extra])).sort();
}

function finalResult(
input: CaptureInput,
startedAt: number,
Expand Down Expand Up @@ -836,7 +970,7 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
});
}

return { runLite, runReflect };
return { runLite, runLightweight, runReflect };
}

// ─── helpers ────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -1033,6 +1167,51 @@ function safeStringify(v: unknown): string {
}
}

function mergeTurnSteps(
episodeId: string,
turnId: number,
steps: readonly StepCandidate[],
): StepCandidate {
const ordered = [...steps].sort((a, b) => a.ts - b.ts);
const first = ordered[0]!;
const userText = firstNonEmpty(ordered.map((s) => s.userText));
const agentText = ordered
.map((s) => s.agentText.trim())
.filter(Boolean)
.join("\n\n");
const agentThinking = ordered
.map((s) => s.agentThinking?.trim() ?? "")
.filter(Boolean)
.join("\n\n") || null;
const rawReflection = firstNonEmpty(ordered.map((s) => s.rawReflection ?? ""));
const toolCalls = ordered.flatMap((s) => s.toolCalls);
const lastTs = ordered.reduce((m, s) => Math.max(m, s.ts), first.ts);

return {
key: `${episodeId}:${turnId}:lightweight`,
ts: lastTs as EpochMs,
userText,
agentText,
agentThinking,
toolCalls,
rawReflection: rawReflection || null,
depth: Math.min(...ordered.map((s) => s.depth)),
isSubagent: ordered.some((s) => s.isSubagent),
meta: {
...ordered.reduce<Record<string, unknown>>(
(acc, s) => ({ ...acc, ...s.meta }),
{},
),
turnId,
lightweightMemory: true,
},
};
}

function firstNonEmpty(values: readonly string[]): string {
return values.map((v) => v.trim()).find(Boolean) ?? "";
}

/**
* Pull the `turnId` stamped by `step-extractor` out of the
* `StepCandidate.meta` blob. Falls back to the trace's own `ts` so
Expand Down
12 changes: 12 additions & 0 deletions apps/memos-local-plugin/core/capture/embedder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export async function embedSteps(
* the viewer displays.
*/
summaryOverrides?: readonly string[],
opts: { summaryOnly?: boolean } = {},
): Promise<VecPair[]> {
const log = rootLogger.child({ channel: "core.capture.embed" });
if (steps.length === 0) return [];
Expand All @@ -43,6 +44,17 @@ export async function embedSteps(
return summaryText(s);
});
const actionTexts = steps.map(actionText);
if (opts.summaryOnly) {
try {
const vecs = await embedder.embedMany(
summaryTexts.map((t) => ({ text: t || "(empty)", role: "document" as const })),
);
return steps.map((_, i) => ({ summary: vecs[i] ?? null, action: null }));
} catch (err) {
log.warn("embed.failed_all", { err: errDetail(err), stepCount: steps.length });
return steps.map(() => ({ summary: null, action: null }));
}
}
// Pack summary first then action — both in the same batch to amortize
// HTTP round trips when the provider is remote.
const inputs = [
Expand Down
4 changes: 4 additions & 0 deletions apps/memos-local-plugin/core/capture/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ export function attachCaptureSubscriber(
log.debug("subscriber.skip_abandoned", { episodeId: evt.episode.id });
return;
}
if (evt.episode.meta?.lightweightMemory === true) {
log.debug("subscriber.skip_lightweight", { episodeId: evt.episode.id });
return;
}
// Topic ended → batch reflect across every step + emit
// `capture.done` so the reward subscriber kicks off R_human + V
// backprop. Per-turn lite captures already wrote the trace rows;
Expand Down
3 changes: 3 additions & 0 deletions apps/memos-local-plugin/core/config/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ export const DEFAULT_CONFIG: ResolvedConfig = {
timeoutMs: 60_000,
},
algorithm: {
lightweightMemory: {
enabled: false,
},
capture: {
maxTextChars: 4_000,
maxToolOutputChars: 2_000,
Expand Down
8 changes: 8 additions & 0 deletions apps/memos-local-plugin/core/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ const SkillEvolverSchema = Type.Object({
}, { default: {} });

const AlgorithmSchema = Type.Object({
lightweightMemory: Type.Object({
/**
* Low-cost mode for users who only want raw conversation memory +
* recall. When enabled, the runtime skips task/reward/L2/L3/skill
* evolution and keeps only summarize + embedding + retrieval filter.
*/
enabled: Bool(false),
}, { default: {} }),
capture: Type.Object({
/** Cap on agent/user text length (chars). Longer content is summarized. */
maxTextChars: NumberInRange(4_000, 200, 64_000),
Expand Down
20 changes: 16 additions & 4 deletions apps/memos-local-plugin/core/pipeline/deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ export function extractAlgorithmConfig(
): PipelineAlgorithmConfig {
const alg = deps.config.algorithm;
return {
lightweightMemory: {
enabled: alg.lightweightMemory.enabled,
},
capture: alg.capture,
reward: alg.reward,
l2Induction: {
Expand Down Expand Up @@ -153,10 +156,11 @@ export function extractAlgorithmConfig(
skillInjectionMode: alg.retrieval.skillInjectionMode,
skillSummaryChars: alg.retrieval.skillSummaryChars,
decayHalfLifeDays: alg.reward.decayHalfLifeDays,
llmFilterEnabled: alg.retrieval.llmFilterEnabled,
llmFilterEnabled: alg.lightweightMemory.enabled ? true : alg.retrieval.llmFilterEnabled,
llmFilterMaxKeep: alg.retrieval.llmFilterMaxKeep,
llmFilterMinCandidates: alg.retrieval.llmFilterMinCandidates,
llmFilterMinCandidates: alg.lightweightMemory.enabled ? 1 : alg.retrieval.llmFilterMinCandidates,
llmFilterCandidateBodyChars: alg.retrieval.llmFilterCandidateBodyChars,
lightweightMemory: alg.lightweightMemory.enabled,
},
session: {
followUpMode: alg.session.followUpMode,
Expand Down Expand Up @@ -321,8 +325,15 @@ export function buildPipelineSession(
deps: PipelineDeps,
bus: SessionEventBus,
): PipelineSessionSet {
const intent = createIntentClassifier({ llm: deps.llm ?? undefined });
const relation = createRelationClassifier({ llm: deps.llm ?? undefined });
const llmDisabled = deps.config.algorithm.lightweightMemory.enabled;
const intent = createIntentClassifier({
llm: deps.llm ?? undefined,
disableLlm: llmDisabled,
});
const relation = createRelationClassifier({
llm: deps.llm ?? undefined,
disableLlm: llmDisabled,
});
const episodeManager = createEpisodeManager({
sessionsRepo: adaptSessionsRepo(deps.repos.sessions),
episodesRepo: adaptEpisodesRepo(deps.repos.episodes),
Expand All @@ -336,6 +347,7 @@ export function buildPipelineSession(
bus,
episodeManager,
now: deps.now,
lightweightMemory: llmDisabled,
});
return { intent, relation, sessionManager, episodeManager };
}
Expand Down
Loading