Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/services/composite-dedup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import type { SearchResult } from '../db/repository-types.js';
import { isCurrentStateQuery } from './current-state-ranking.js';
import type { RetrievalMode } from './memory-service-types.js';
import type { TraceCollector } from './retrieval-trace.js';

const DEFAULT_COVERAGE_THRESHOLD = 0.6;
const BROAD_QUERY_MARKERS = [
Expand Down Expand Up @@ -153,3 +155,26 @@ function parseMemberIds(memory: SearchResult): string[] {
if (!Array.isArray(candidate)) return [];
return candidate.filter((id): id is string => typeof id === 'string');
}

/**
* Apply flat-mode composite dedup and record the dedup as a trace stage
* if the result set changed. No-op for non-flat modes or empty sets.
*/
export function applyFlatPackagingPolicy(
memories: SearchResult[],
query: string,
mode: RetrievalMode,
trace: TraceCollector,
): SearchResult[] {
if (mode !== 'flat' || memories.length === 0) return memories;
const packaged = deduplicateCompositeMembersForFlatQuery(memories, query);
if (packaged.length === memories.length && packaged[0]?.id === memories[0]?.id) {
return memories;
}
trace.stage('flat-packaging-dedup', packaged, {
removedIds: memories
.map((memory) => memory.id)
.filter((id) => !packaged.some((memory) => memory.id === id)),
});
return packaged;
}
22 changes: 22 additions & 0 deletions src/services/lesson-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import { config } from '../config.js';
import type { LessonRow, LessonMatch, LessonSeverity, LessonType } from '../db/repository-lessons.js';
import type { LessonStore } from '../db/stores.js';
import type { SanitizationResult } from './input-sanitizer.js';
import type { ConsensusResult } from './consensus-validation.js';
import type { SearchResult } from '../db/repository-types.js';

/** Result of a pre-retrieval lesson check. */
export interface LessonCheckResult {
Expand Down Expand Up @@ -229,3 +231,23 @@ function emitLessonAuditEvent(userId: string, lessonType: LessonType, lessonId:
if (!config.auditLoggingEnabled) return;
emitAuditEvent('lesson:created', userId, { lessonType, severity }, { lessonId });
}

/**
* Record lessons for memories removed by consensus validation.
* Creates a `consensus_violation` lesson for each divergent memory.
*/
export async function recordConsensusLessons(
lessons: LessonStore,
userId: string,
result: ConsensusResult,
memories: SearchResult[],
): Promise<void> {
for (const judgment of result.judgments) {
if (judgment.aligned) continue;
const memory = memories.find((m) => m.id === judgment.memoryId);
const pattern = `Consensus violation: "${memory?.content.slice(0, 150) ?? judgment.memoryId}" — ${judgment.divergenceReason}`;
await recordUserReportedLesson(
lessons, userId, pattern, [judgment.memoryId], 'medium',
);
}
}
167 changes: 21 additions & 146 deletions src/services/memory-search.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
/**
* Search and retrieval logic extracted from MemoryService.
* Contains search, fastSearch, workspaceSearch, and all private search helpers.
* Search and retrieval orchestration for MemoryService.
* Pure orchestration: delegates formatting to retrieval-format, dedup to
* composite-dedup, side effects to retrieval-side-effects, lesson recording
* to lesson-service, and the main retrieval to search-pipeline.
*/

import { type SearchResult } from '../db/memory-repository.js';
import { checkLessons, recordContradictionLesson, type LessonCheckResult } from './lesson-service.js';
import { checkLessons, recordConsensusLessons, type LessonCheckResult } from './lesson-service.js';
import { validateConsensus, type ConsensusResult } from './consensus-validation.js';
import { embedText } from './embedding.js';
import { resolveSearchLimitDetailed, classifyQueryDetailed } from './retrieval-policy.js';
import { runSearchPipelineWithTrace } from './search-pipeline.js';
import { buildCitations as buildRichCitations, computePackagingSignal, formatSimpleInjection, formatTieredInjection, type PackagingSignal } from './retrieval-format.js';
import { buildAssemblyTraceSummary, buildPackagingTraceSummary } from './packaging-observability.js';
import { buildCitations as buildRichCitations, buildInjection, computePackagingSignal } from './retrieval-format.js';
import { finalizePackagingTrace } from './packaging-observability.js';
import { isCurrentStateQuery } from './current-state-ranking.js';
import { prefersAbstractAwareRetrieval } from './abstract-query-policy.js';
import { TraceCollector, type AssemblyTraceSummary, type PackagingTraceSummary, type RetrievalTraceSummary } from './retrieval-trace.js';
import { assignTiers as assignTierBudgets, type TierAssignment, type TierBudgetResult } from './tiered-loading.js';
import { TraceCollector } from './retrieval-trace.js';
import { excludeStaleComposites } from './composite-staleness.js';
import { deduplicateCompositeMembersForFlatQuery, deduplicateCompositeMembersHard } from './composite-dedup.js';
import { emitAuditEvent } from './audit-events.js';
import { applyFlatPackagingPolicy } from './composite-dedup.js';
import { recordSearchSideEffects } from './retrieval-side-effects.js';
import type { AgentScope, WorkspaceContext } from '../db/repository-types.js';
import type { MemoryServiceDeps, RetrievalMode, RetrievalOptions, RetrievalResult } from './memory-service-types.js';
import type { MemoryServiceDeps, RetrievalOptions, RetrievalResult } from './memory-service-types.js';

/** Check lessons safety gate; returns undefined if lessons disabled. */
async function checkSearchLessons(deps: MemoryServiceDeps, userId: string, query: string): Promise<LessonCheckResult | undefined> {
Expand Down Expand Up @@ -116,75 +116,14 @@ async function postProcessResults(
removedIds: consensusResult.removedMemoryIds,
});
if (deps.config.lessonsEnabled && deps.stores.lesson) {
recordConsensusLessons(deps, userId, consensusResult, memories).catch(
recordConsensusLessons(deps.stores.lesson, userId, consensusResult, memories).catch(
(err) => console.error('Consensus lesson recording failed:', err),
);
}
}
return { memories, consensusResult };
}

/**
* Build injection text from search results, optionally using tiered packaging.
* Flat mode returns the existing chronological format.
* Tiered mode assigns L0/L1/L2 tiers under a token budget.
*/
function buildInjection(
memories: SearchResult[],
query: string,
mode: RetrievalMode,
tokenBudget?: number,
): {
injectionText: string;
tierAssignments?: TierAssignment[];
expandIds?: string[];
estimatedContextTokens?: number;
} {
if (memories.length === 0) {
return { injectionText: '' };
}

if (mode === 'flat') {
return { injectionText: formatSimpleInjection(memories) };
}

const deduplicated = deduplicateCompositeMembersHard(memories);
const DEFAULT_TOKEN_BUDGET = 2000;
const budget = tokenBudget ?? DEFAULT_TOKEN_BUDGET;
const forceRichTopHit = prefersAbstractAwareRetrieval(mode, query);

const result = assignTierBudgets(deduplicated, budget, { forceRichTopHit });
const expandIds = result.assignments
.filter((a) => a.tier !== 'L2')
.map((a) => a.memoryId);

return {
injectionText: formatTieredInjection(deduplicated, result.assignments),
tierAssignments: result.assignments,
expandIds: expandIds.length > 0 ? expandIds : undefined,
estimatedContextTokens: result.totalTokens,
};
}

function applyFlatPackagingPolicy(
memories: SearchResult[],
query: string,
mode: RetrievalMode,
trace: TraceCollector,
): SearchResult[] {
if (mode !== 'flat' || memories.length === 0) return memories;
const packaged = deduplicateCompositeMembersForFlatQuery(memories, query);
if (packaged.length === memories.length && packaged[0]?.id === memories[0]?.id) {
return memories;
}
trace.stage('flat-packaging-dedup', packaged, {
removedIds: memories
.map((memory) => memory.id)
.filter((id) => !packaged.some((memory) => memory.id === id)),
});
return packaged;
}

/** Package memories, build injection text, and assemble the final response. */
function assembleResponse(
deps: MemoryServiceDeps,
Expand All @@ -205,20 +144,10 @@ function assembleResponse(

const { injectionText, tierAssignments, expandIds, estimatedContextTokens } =
buildInjection(outputMemories, query, mode, retrievalOptions?.tokenBudget);
const packagedMemories = mode === 'flat' ? outputMemories : deduplicateCompositeMembersHard(outputMemories);
const packagingSummary = buildPackagingTraceSummary(outputMemories, packagedMemories, mode, injectionText, estimatedContextTokens);
const assemblySummary = buildAssemblyTraceSummary(packagingSummary, mode === 'flat' ? undefined : retrievalOptions?.tokenBudget ?? 2000);

if (mode === 'tiered') {
activeTrace.event('tiered-packaging', {
budget: retrievalOptions?.tokenBudget ?? 2000,
estimatedTokens: estimatedContextTokens,
tierDistribution: tierAssignments?.reduce((acc: any, a) => { acc[a.tier] = (acc[a.tier] || 0) + 1; return acc; }, {}),
});
}

activeTrace.setPackagingSummary(packagingSummary);
activeTrace.setAssemblySummary(assemblySummary);
const { packagingSummary, assemblySummary } = finalizePackagingTrace(activeTrace, {
outputMemories, mode, injectionText, estimatedContextTokens, tierAssignments,
tokenBudget: retrievalOptions?.tokenBudget,
});
activeTrace.finalize(outputMemories);

return {
Expand All @@ -232,49 +161,6 @@ function assembleResponse(
};
}

/** Fire-and-forget side effects: touch memories and audit logging. */
function recordSearchSideEffects(
deps: MemoryServiceDeps,
outputMemories: SearchResult[],
userId: string,
query: string,
sourceSite: string | undefined,
asOf: string | undefined,
): void {
if (!asOf) {
for (const memory of outputMemories) deps.stores.memory.touchMemory(memory.id).catch(() => {});
}
if (deps.config.auditLoggingEnabled) {
emitAuditEvent('memory:retrieve', userId, {
query: query.slice(0, 200),
resultCount: outputMemories.length,
topScore: outputMemories[0]?.score ?? 0,
}, { sourceSite });
}
}

/**
* Record lessons for memories removed by consensus validation.
* Creates a `consensus_violation` lesson for each divergent memory.
*/
async function recordConsensusLessons(
deps: MemoryServiceDeps,
userId: string,
result: ConsensusResult,
memories: SearchResult[],
): Promise<void> {
if (!deps.stores.lesson) return;
const { recordUserReportedLesson } = await import('./lesson-service.js');
for (const judgment of result.judgments) {
if (judgment.aligned) continue;
const memory = memories.find((m) => m.id === judgment.memoryId);
const pattern = `Consensus violation: "${memory?.content.slice(0, 150) ?? judgment.memoryId}" — ${judgment.divergenceReason}`;
await recordUserReportedLesson(
deps.stores.lesson, userId, pattern, [judgment.memoryId], 'medium',
);
}
}

/** Full search with lesson check, URI resolution, pipeline, post-processing, and packaging. */
export async function performSearch(
deps: MemoryServiceDeps,
Expand Down Expand Up @@ -345,29 +231,18 @@ export async function performWorkspaceSearch(
const queryEmbedding = await embedText(query, 'query');

const memories = await deps.stores.search.searchSimilarInWorkspace(
workspace.workspaceId,
queryEmbedding,
effectiveLimit,
options.agentScope ?? 'all',
workspace.agentId,
options.referenceTime,
workspace.workspaceId, queryEmbedding, effectiveLimit,
options.agentScope ?? 'all', workspace.agentId, options.referenceTime,
);
const compositeResult = await excludeStaleComposites(deps.stores.memory, userId, memories);
const filteredMemories = compositeResult.filtered;

const { filtered: filteredMemories } = await excludeStaleComposites(deps.stores.memory, userId, memories);
for (const m of filteredMemories) deps.stores.memory.touchMemory(m.id).catch(() => {});

const mode = options.retrievalOptions?.retrievalMode ?? 'flat';
const { injectionText, tierAssignments, expandIds, estimatedContextTokens } =
buildInjection(filteredMemories, query, mode, options.retrievalOptions?.tokenBudget);

const injection = buildInjection(filteredMemories, query, mode, options.retrievalOptions?.tokenBudget);
return {
memories: filteredMemories,
injectionText,
citations: filteredMemories.map((m) => m.id),
retrievalMode: mode,
tierAssignments,
expandIds,
estimatedContextTokens,
...injection,
};
}
54 changes: 52 additions & 2 deletions src/services/packaging-observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import type {
PackagingTraceSummary,
PackagingType,
} from './retrieval-trace.js';
import type { TraceCollector } from './retrieval-trace.js';
import type { TierAssignment } from './tiered-loading.js';
import { isAnswerBearing, sortBySessionPriority } from './session-packaging.js';
import { estimateTokens } from './tiered-loading.js';
import { buildTimelinePack, spansMultipleDates } from './timeline-pack.js';
import { deduplicateCompositeMembersHard } from './composite-dedup.js';

interface NamespaceGroup {
namespace: string;
memories: SearchResult[];
}

export function buildPackagingTraceSummary(
function buildPackagingTraceSummary(
candidateMemories: SearchResult[],
includedMemories: SearchResult[],
mode: RetrievalMode,
Expand All @@ -48,7 +51,7 @@ export function buildPackagingTraceSummary(
};
}

export function buildAssemblyTraceSummary(
function buildAssemblyTraceSummary(
packaging: PackagingTraceSummary,
tokenBudget?: number,
): AssemblyTraceSummary {
Expand Down Expand Up @@ -155,3 +158,50 @@ function resolveAssemblyBlocks(packageType: PackagingType): string[] {
if (packageType === 'timeline-pack') return ['timeline'];
return ['subject'];
}

const DEFAULT_TOKEN_BUDGET = 2000;

export interface FinalizePackagingInput {
outputMemories: SearchResult[];
mode: RetrievalMode;
injectionText: string;
estimatedContextTokens?: number;
tierAssignments?: TierAssignment[];
tokenBudget?: number;
}

/**
* Build packaging + assembly summaries, emit the tiered-packaging trace
* event when in tiered mode, and attach both summaries to the active
* trace. Returns the summaries so the caller can include them in the
* retrieval result. Does NOT finalize the trace — caller owns that.
*/
export function finalizePackagingTrace(
activeTrace: TraceCollector,
input: FinalizePackagingInput,
): { packagingSummary: PackagingTraceSummary; assemblySummary: AssemblyTraceSummary } {
const packagedForSummary = input.mode === 'flat'
? input.outputMemories
: deduplicateCompositeMembersHard(input.outputMemories);
const packagingSummary = buildPackagingTraceSummary(
input.outputMemories, packagedForSummary, input.mode, input.injectionText, input.estimatedContextTokens,
);
const assemblySummary = buildAssemblyTraceSummary(
packagingSummary, input.mode === 'flat' ? undefined : input.tokenBudget ?? DEFAULT_TOKEN_BUDGET,
);

if (input.mode === 'tiered') {
activeTrace.event('tiered-packaging', {
budget: input.tokenBudget ?? DEFAULT_TOKEN_BUDGET,
estimatedTokens: input.estimatedContextTokens,
tierDistribution: input.tierAssignments?.reduce<Record<string, number>>((acc, a) => {
acc[a.tier] = (acc[a.tier] || 0) + 1;
return acc;
}, {}),
});
}

activeTrace.setPackagingSummary(packagingSummary);
activeTrace.setAssemblySummary(assemblySummary);
return { packagingSummary, assemblySummary };
}
Loading