Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion src/oclif/commands/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ Bad:
client,
command: 'query',
format,
onCompleted: ({result, taskId: tid}) => {
onCompleted: ({durationMs, matchedDocs, result, taskId: tid, tier, topScore}) => {
const previousResult = finalResult

// Always prefer the completed payload β€” it carries the attribution footer
Expand All @@ -180,11 +180,17 @@ Bad:
if (format === 'json') {
writeJsonResponse({
command: 'query',
// Recall metadata is only present on query tasks; older daemons omit it. Spread
// conditionally so JSON consumers do not see undefined keys.
data: {
...(durationMs === undefined ? {} : {durationMs}),
Comment thread
RyanNg1403 marked this conversation as resolved.
event: 'completed',
...(matchedDocs === undefined ? {} : {matchedDocs}),
result: finalResult,
status: 'completed',
taskId: tid,
...(tier === undefined ? {} : {tier}),
...(topScore === undefined ? {} : {topScore}),
Comment thread
RyanNg1403 marked this conversation as resolved.
},
success: true,
})
Expand Down
45 changes: 42 additions & 3 deletions src/oclif/lib/task-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,34 @@ import type {
TaskError,
} from '@campfirein/brv-transport-client'

import type {
QueryLogMatchedDoc,
QueryLogTier,
} from '../../server/core/domain/entities/query-log-entry.js'
Comment thread
RyanNg1403 marked this conversation as resolved.
Comment thread
RyanNg1403 marked this conversation as resolved.

import {TaskErrorCode} from '../../server/core/domain/errors/task-error.js'
import {LlmEvents, TaskEvents} from '../../shared/transport/events/index.js'
import {ReviewEvents, type ReviewNotifyEvent} from '../../shared/transport/events/review-events.js'
import {writeJsonResponse} from './json-response.js'

/** Extends brv-transport-client's TaskCompleted with logId from ENG-1259 and pendingReviewCount from HITL */
type TaskCompletedWithLogId = TaskCompleted & {logId?: string; pendingReviewCount?: number}
/**
* Extends brv-transport-client's TaskCompleted with daemon-merged extras contributed by
* lifecycle hooks (`getTaskCompletionData`):
* - `logId` β€” query/curate log id
* - `pendingReviewCount` β€” HITL signal from CurateLogHandler
* - `matchedDocs`/`tier`/`durationMs`/`topScore` β€” query recall metadata from QueryLogHandler
* (flattened from QueryExecutorResult's nested timing/searchMetadata)
*
* All fields are optional β€” older daemons or non-query tasks omit them.
*/
type TaskCompletedWithLogId = TaskCompleted & {
durationMs?: number
logId?: string
matchedDocs?: QueryLogMatchedDoc[]
pendingReviewCount?: number
tier?: QueryLogTier
topScore?: number
}

/** Extends brv-transport-client's TaskError with logId from ENG-1259 */
type TaskErrorWithLogId = TaskError & {logId?: string}
Expand All @@ -40,12 +61,20 @@ export interface ToolCallRecord {

/** Completion result passed to onCompleted callback */
export interface TaskCompletionResult {
/** Wall-clock execution time for query tasks, in milliseconds. Absent for non-query tasks. */
durationMs?: number
logId?: string
/** Documents matched by the query. Empty array on cache hits; absent for non-query tasks. */
matchedDocs?: QueryLogMatchedDoc[]
/** Pending review notification from the server, present when review is required after task completion. */
pendingReview?: {pendingCount: number; reviewUrl: string}
result?: string
taskId: string
/** Resolution tier for query tasks. Absent for non-query tasks. */
tier?: QueryLogTier
toolCalls: ToolCallRecord[]
/** Top compound score across matchedDocs. Absent for cache hits and non-query tasks. */
topScore?: number
}

/** Error result passed to onError callback */
Expand Down Expand Up @@ -286,7 +315,17 @@ export function waitForTaskCompletion(options: WaitForTaskOptions, log: (msg: st
payload.pendingReviewCount !== undefined && payload.pendingReviewCount > 0
? {pendingCount: payload.pendingReviewCount, reviewUrl: pendingReview?.reviewUrl ?? ''}
: pendingReview
onCompleted({logId: payload.logId, pendingReview: resolvedPendingReview, result: payload.result, taskId, toolCalls})
onCompleted({
durationMs: payload.durationMs,
logId: payload.logId,
matchedDocs: payload.matchedDocs,
pendingReview: resolvedPendingReview,
result: payload.result,
taskId,
tier: payload.tier,
toolCalls,
topScore: payload.topScore,
})
resolve()
}),

Expand Down
26 changes: 26 additions & 0 deletions src/server/infra/process/query-log-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,32 @@ export class QueryLogHandler implements ITaskLifecycleHook {
}
}

/**
* Expose query metadata via the lifecycle-hook contract so TaskRouter can merge it into
* the task:completed payload sent to the originating client. Returning {} when no metadata
* is available keeps the merge a no-op and lets the daemon emit task:completed unchanged.
*/
getTaskCompletionData(taskId: string): Record<string, unknown> {
const state = this.tasks.get(taskId)
if (!state?.queryResult) return {}

// Flatten the QueryExecutorResult's nested shape onto the task:completed payload so
// it matches the public RecallResult contract (flat `durationMs` / `topScore`).
// `timing` is always populated by every QueryExecutor branch, so no guard.
// `searchMetadata` is omitted on cache hits (Tier 0/1), so guard before extracting.
const out: Record<string, unknown> = {
durationMs: state.queryResult.timing.durationMs,
matchedDocs: state.queryResult.matchedDocs,
tier: state.queryResult.tier,
}

if (state.queryResult.searchMetadata !== undefined) {
out.topScore = state.queryResult.searchMetadata.topScore
}

return out
}

async onTaskCancelled(taskId: string, _task: TaskInfo): Promise<void> {
const state = this.tasks.get(taskId)
if (!state) return
Expand Down
78 changes: 78 additions & 0 deletions test/commands/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,84 @@ describe('Query Command', () => {
expect(completedEvent).to.exist
expect(completedEvent!.data).to.have.property('result', 'JSON answer')
})

it('should surface matchedDocs, tier, durationMs, and topScore in completed event when present', async () => {
const eventHandlers: Map<string, Array<(data: unknown) => void>> = new Map()
;(mockClient.on as sinon.SinonStub).callsFake((event: string, handler: (data: unknown) => void) => {
if (!eventHandlers.has(event)) eventHandlers.set(event, [])
eventHandlers.get(event)!.push(handler)
return () => {}
})
;(mockClient.requestWithAck as sinon.SinonStub).callsFake(async (event: string, payload: {taskId: string}) => {
if (event === 'state:getProviderConfig') return {activeProvider: 'anthropic'}
setTimeout(() => {
const completedHandlers = eventHandlers.get('task:completed')
if (completedHandlers) {
for (const handler of completedHandlers) {
handler({
durationMs: 184,
matchedDocs: [
{path: 'auth/jwt-tokens.md', score: 0.92, title: 'JWT tokens'},
{path: 'billing/stripe-webhooks.md', score: 0.78, title: 'Stripe webhooks'},
],
result: 'cached answer',
taskId: payload.taskId,
tier: 2,
topScore: 0.92,
})
}
}
}, 10)
return {taskId: payload.taskId}
})

await createJsonCommand('test query').run()

const lines = parseJsonOutput()
const completedEvent = lines.find((l) => (l.data as Record<string, unknown>).event === 'completed')
expect(completedEvent, 'completed event should exist').to.exist
const data = completedEvent!.data as Record<string, unknown>
expect(data).to.have.property('result', 'cached answer')
expect(data).to.have.property('tier', 2)
expect(data).to.have.property('durationMs', 184)
expect(data).to.have.property('topScore', 0.92)
expect(data).to.have.deep.property('matchedDocs', [
{path: 'auth/jwt-tokens.md', score: 0.92, title: 'JWT tokens'},
{path: 'billing/stripe-webhooks.md', score: 0.78, title: 'Stripe webhooks'},
])
})

it('should omit matchedDocs/tier/durationMs/topScore from completed event when absent (graceful)', async () => {
const eventHandlers: Map<string, Array<(data: unknown) => void>> = new Map()
;(mockClient.on as sinon.SinonStub).callsFake((event: string, handler: (data: unknown) => void) => {
if (!eventHandlers.has(event)) eventHandlers.set(event, [])
eventHandlers.get(event)!.push(handler)
return () => {}
})
;(mockClient.requestWithAck as sinon.SinonStub).callsFake(async (event: string, payload: {taskId: string}) => {
if (event === 'state:getProviderConfig') return {activeProvider: 'anthropic'}
setTimeout(() => {
const completedHandlers = eventHandlers.get('task:completed')
if (completedHandlers) {
// Older daemon: only emits result + taskId, no enriched fields
for (const handler of completedHandlers) handler({result: 'plain answer', taskId: payload.taskId})
}
}, 10)
return {taskId: payload.taskId}
})

await createJsonCommand('test query').run()

const lines = parseJsonOutput()
const completedEvent = lines.find((l) => (l.data as Record<string, unknown>).event === 'completed')
expect(completedEvent).to.exist
const data = completedEvent!.data as Record<string, unknown>
expect(data).to.have.property('result', 'plain answer')
expect(data).to.not.have.property('matchedDocs')
expect(data).to.not.have.property('tier')
expect(data).to.not.have.property('durationMs')
expect(data).to.not.have.property('topScore')
})
})

// ==================== Connection Errors ====================
Expand Down
52 changes: 52 additions & 0 deletions test/unit/infra/process/query-log-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,58 @@ describe('QueryLogHandler', () => {
})
})

// ── getTaskCompletionData ───────────────────────────────────────────────

describe('getTaskCompletionData', () => {
it('should return query metadata flattened to RecallResult shape after setQueryResult was called', async () => {
const task = makeTask()
await handler.onTaskCreate(task)
handler.setQueryResult('task-abc', makeQueryResult())

const data = handler.getTaskCompletionData('task-abc')

expect(data).to.deep.equal({
durationMs: 450,
matchedDocs: [{path: 'design/caching.md', score: 0.95, title: 'Caching Strategy'}],
tier: TIER_DIRECT_SEARCH,
topScore: 0.95,
})
})

it('should return empty object when task does not exist', () => {
const data = handler.getTaskCompletionData('unknown-task')

expect(data).to.deep.equal({})
})

it('should return empty object when setQueryResult was never called', async () => {
const task = makeTask()
await handler.onTaskCreate(task)

const data = handler.getTaskCompletionData('task-abc')

expect(data).to.deep.equal({})
})

it('should omit topScore when searchMetadata is absent (cache hit shape)', async () => {
const task = makeTask()
await handler.onTaskCreate(task)
// Cache hits in QueryExecutor return empty matchedDocs and no searchMetadata.
handler.setQueryResult('task-abc', {
matchedDocs: [],
tier: TIER_DIRECT_SEARCH,
timing: {durationMs: 5},
})

const data = handler.getTaskCompletionData('task-abc')

expect(data.matchedDocs).to.deep.equal([])
expect(data.tier).to.equal(TIER_DIRECT_SEARCH)
expect(data.durationMs).to.equal(5)
expect(data.topScore).to.be.undefined
})
})

// ── store sharing ────────────────────────────────────────────────────────

describe('store sharing', () => {
Expand Down
Loading