Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/agent-core/src/harness/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ export {
synthesizeHarnessTurn,
ensureHarnessSessionInit,
appendHarnessTurn,
popLastTurnFromHarness,
readHarnessHistory,
readLastUserFromHarness,
writeHarnessSessionTitle,
type HarnessSessionInitOpts,
type AppendHarnessTurnOpts,
Expand Down
106 changes: 106 additions & 0 deletions packages/agent-core/src/harness/mirror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,112 @@ export function readHarnessHistory(sessionId: string, projectId?: string): Sessi
return entries
}

// ── Regenerate helpers ─────────────────────────────────────────────

/**
 * Read the text of the most recent user message in `messages.jsonl`
 * without mutating the file.
 *
 * Used by the Codex regenerate path: re-feeding the same user text as a
 * fresh turn produces a regenerated assistant response while leaving the
 * CLI's own thread state intact (we can't truncate Codex's internal
 * history without a `thread/resume` RPC, which doesn't exist).
 *
 * The file is scanned from the last line backwards. Blank lines, lines that
 * fail to parse as JSON, and lines that parse to something other than an
 * object are skipped rather than aborting the scan.
 *
 * @param sessionId - Session whose mirror directory is read.
 * @param projectId - Optional project scope, forwarded to `resolveSessionDir`.
 * @returns The user's text (an empty string when the message has neither
 *   string content nor text blocks), or null when the file is missing,
 *   unreadable, or contains no user message.
 */
export function readLastUserFromHarness(
  sessionId: string,
  projectId?: string,
): { userText: string } | null {
  const dir = resolveSessionDir(sessionId, projectId)
  const msgsPath = join(dir, 'messages.jsonl')
  if (!existsSync(msgsPath)) return null

  let raw: string
  try {
    raw = readFileSync(msgsPath, 'utf-8')
  } catch (err) {
    log.warn({ err, sessionId }, 'failed to read messages.jsonl for regenerate')
    return null
  }

  const lines = raw.split('\n').filter((l) => l.trim().length > 0)
  for (let i = lines.length - 1; i >= 0; i--) {
    let parsed: unknown
    try {
      parsed = JSON.parse(lines[i])
    } catch {
      continue
    }
    // A line holding the JSON literal `null` parses successfully; reading
    // `.role` from it would throw, so anything that is not an object is
    // treated like any other non-user line.
    if (typeof parsed !== 'object' || parsed === null) continue

    const { role, content } = parsed as { role?: unknown; content?: unknown }
    if (role !== 'user') continue

    let userText = ''
    if (typeof content === 'string') {
      userText = content
    } else if (Array.isArray(content)) {
      // Block-style content: concatenate the text blocks, ignore the rest.
      const blocks = content as Array<{ type?: string; text?: string }>
      userText = blocks
        .filter((b) => b?.type === 'text' && typeof b.text === 'string')
        .map((b) => b.text as string)
        .join('')
    }
    return { userText }
  }
  return null
}

/**
 * Pop the last user→assistant exchange from `messages.jsonl`, returning
 * the user's text so the caller can re-feed it to a fresh CLI process.
 *
 * Walks backwards looking for the last `role:'user'` line, rewrites the
 * file with only the lines preceding it, and extracts the plain-text
 * content. Everything from that user line onward (tool messages and the
 * assistant turn included) is dropped. Blank lines are not preserved in the
 * rewritten file.
 *
 * Lines that fail to parse as JSON, or that parse to something other than
 * an object, are skipped while searching.
 *
 * @param sessionId - Session whose mirror directory is modified.
 * @param projectId - Optional project scope, forwarded to `resolveSessionDir`.
 * @returns The popped user's text, or null if the file is missing,
 *   unreadable, or has no prior user message — nothing to regenerate. The
 *   file is left untouched in every null case.
 * @throws Errors raised while rewriting the file are not caught here and
 *   propagate to the caller.
 */
export function popLastTurnFromHarness(
  sessionId: string,
  projectId?: string,
): { userText: string } | null {
  const dir = resolveSessionDir(sessionId, projectId)
  const msgsPath = join(dir, 'messages.jsonl')
  if (!existsSync(msgsPath)) return null

  let raw: string
  try {
    raw = readFileSync(msgsPath, 'utf-8')
  } catch (err) {
    log.warn({ err, sessionId }, 'failed to read messages.jsonl for regenerate')
    return null
  }

  const lines = raw.split('\n').filter((l) => l.trim().length > 0)
  let lastUserIdx = -1
  let userText = ''
  for (let i = lines.length - 1; i >= 0; i--) {
    let parsed: unknown
    try {
      parsed = JSON.parse(lines[i])
    } catch {
      continue
    }
    // A line holding the JSON literal `null` parses successfully; reading
    // `.role` from it would throw, so anything that is not an object is
    // treated like any other non-user line.
    if (typeof parsed !== 'object' || parsed === null) continue

    const { role, content } = parsed as { role?: unknown; content?: unknown }
    if (role !== 'user') continue

    lastUserIdx = i
    if (typeof content === 'string') {
      userText = content
    } else if (Array.isArray(content)) {
      // Block-style content: concatenate the text blocks, ignore the rest.
      const textBlocks = content as Array<{ type?: string; text?: string }>
      userText = textBlocks
        .filter((b) => b?.type === 'text' && typeof b.text === 'string')
        .map((b) => b.text as string)
        .join('')
    }
    break
  }
  if (lastUserIdx < 0) return null

  // Keep only the lines before the popped user message; a non-empty result
  // gets a trailing newline so later appends start on a fresh line.
  const truncated = lines.slice(0, lastUserIdx).join('\n')
  writeFileSync(msgsPath, truncated.length > 0 ? `${truncated}\n` : '', 'utf-8')
  return { userText }
}

// ── Internal helpers ───────────────────────────────────────────────

function resolveSessionDir(sessionId: string, projectId?: string): string {
Expand Down
4 changes: 3 additions & 1 deletion packages/agent-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export {
type ProviderTokenResolver,
type ResolvedProviderToken,
} from './tools/factories.js'
export { initTracing, flushTraces, hashPromptVersion } from './tracing.js'
export { initTracing, flushTraces, hashPromptVersion, logSpanFeedback } from './tracing.js'
export { closeBrowserSession } from './tools/browser.js'
export {
type HarnessAdapter,
Expand Down Expand Up @@ -99,7 +99,9 @@ export {
synthesizeHarnessTurn,
ensureHarnessSessionInit,
appendHarnessTurn,
popLastTurnFromHarness,
readHarnessHistory,
readLastUserFromHarness,
writeHarnessSessionTitle,
buildReplaySeed,
extractHarnessMemoriesFromMirror,
Expand Down
115 changes: 115 additions & 0 deletions packages/agent-core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ import {
estimateCost,
hashPromptVersion,
logScore,
logSpanFeedback,
startChildTrace,
startTrace,
} from './tracing.js'
Expand Down Expand Up @@ -478,6 +479,13 @@ export class Session {
private parentTraceSpan?: Span // when this is a sub-agent, inherit parent's span
private workflowMetadata?: { workflowId: string; agentKey: string; promptVersion: string }
currentTraceSpan?: Span // current turn's trace span (exposed for sub-agent threading)
// (messageId → Braintrust span id) for completed turns in this session.
// Bounded to the last 200 turns; older entries fall off the front when
// capacity is exceeded so long-lived sessions don't grow unbounded.
// Server assigns the messageId, registers it via `registerAssistantMessage`,
// and clients echo it back via the `feedback` protocol message.
private messageSpanIds: Map<string, string> = new Map()
// Maximum number of entries kept in `messageSpanIds`.
private static readonly MESSAGE_SPAN_CAP = 200
private _promptVersion?: string // hash of assembled system prompt

// Safety limits
Expand Down Expand Up @@ -2307,6 +2315,113 @@ export class Session {
return structuredClone(this.piAgent.state.messages)
}

/**
 * Look up the Braintrust span id of the turn currently being traced.
 *
 * Intended for the server, which pairs the id with a messageId
 * (messageId → spanId) when emitting the `done` event so feedback can be
 * correlated post-hoc.
 *
 * @returns The span's `id` when a current trace span exists and its `id`
 *   is a string; otherwise undefined.
 */
getCurrentSpanId(): string | undefined {
  const spanId = (this.currentTraceSpan as (Span & { id?: string }) | undefined)?.id
  if (typeof spanId !== 'string') return undefined
  return spanId
}

/**
 * Pair a server-assigned messageId with a Braintrust span id so the
 * server's feedback handler can attach thumbs-up/down to the right
 * event later.
 *
 * The map holds at most MESSAGE_SPAN_CAP entries: when adding an id would
 * exceed the cap, the oldest entry (first in insertion order) is evicted.
 * Re-registering an id that is already present replaces its span id and
 * makes it the newest entry, without evicting any other entry.
 *
 * @param messageId - Identifier the server assigned to the assistant message.
 * @param spanId - Braintrust span id to associate with that message.
 */
registerAssistantMessage(messageId: string, spanId: string): void {
  // Remove any existing entry up front. Otherwise a re-registration at
  // capacity would evict an unrelated entry even though the map is not
  // growing, and the refreshed id would keep its old (stale) position.
  this.messageSpanIds.delete(messageId)
  if (this.messageSpanIds.size >= Session.MESSAGE_SPAN_CAP) {
    // Map iterates in insertion order, so the first key is the oldest.
    const oldestKey = this.messageSpanIds.keys().next().value
    if (oldestKey !== undefined) this.messageSpanIds.delete(oldestKey)
  }
  this.messageSpanIds.set(messageId, spanId)
}

/**
 * Forward a thumbs-up/down rating for one assistant message to tracing.
 *
 * The rating is sent as a `user_feedback` score: 1 for 'up', 0 for 'down'.
 * Nothing happens when no span id is registered for the messageId (e.g.
 * the session was resumed from disk and the in-memory map didn't survive
 * the restart).
 *
 * @param messageId - Id previously passed to `registerAssistantMessage`.
 * @param value - The user's rating.
 */
logUserFeedback(messageId: string, value: 'up' | 'down'): void {
  const spanId = this.messageSpanIds.get(messageId)
  if (spanId) {
    const score = value === 'up' ? 1 : 0
    logSpanFeedback(spanId, { user_feedback: score })
  }
}

/**
 * Rewind the conversation so the last assistant response can be regenerated.
 *
 * Finds the last assistant message and the nearest user message before it,
 * replaces `piAgent.state.messages` with everything preceding that user
 * message, persists the session, and hands back the user's content (text +
 * image attachments) so the caller can re-feed it via `processMessage`.
 *
 * This does NOT call processMessage itself — that's the caller's job, so
 * the existing event-streaming pipeline in the server stays in one place.
 *
 * @returns The user text and image attachments to replay, or null (with no
 *   state change) if there is no assistant message or no user message
 *   before it.
 */
prepareRegenerate(): {
  userText: string
  attachments: ChatImageAttachmentInput[]
} | null {
  type Block = {
    type: string
    text?: string
    mimeType?: string
    data?: string
    name?: string
    sizeBytes?: number
  }
  type LooseMessage = { role: string; content?: string | Block[] }

  const history = this.piAgent.state.messages as LooseMessage[]

  // Scan backwards from `from`; yields -1 when no message has that role.
  const lastIndexOfRole = (role: string, from: number): number => {
    let idx = from
    while (idx >= 0 && history[idx].role !== role) idx--
    return idx
  }

  const assistantAt = lastIndexOfRole('assistant', history.length - 1)
  if (assistantAt < 0) return null

  const userAt = lastIndexOfRole('user', assistantAt - 1)
  if (userAt < 0) return null

  const { content } = history[userAt]
  const attachments: ChatImageAttachmentInput[] = []
  let userText = ''

  if (typeof content === 'string') {
    userText = content
  } else if (Array.isArray(content)) {
    content.forEach((block) => {
      if (block.type === 'text') {
        userText += block.text ?? ''
        return
      }
      // Only images carrying both payload and mime type can be replayed.
      if (block.type !== 'image' || !block.data || !block.mimeType) return
      attachments.push({
        id: `regen-${attachments.length}`,
        name: block.name ?? 'image',
        mimeType: block.mimeType,
        data: block.data,
        sizeBytes: block.sizeBytes ?? 0,
      })
    })
  }

  // Truncate to before the last user message — caller will re-feed it.
  const kept = history.slice(0, userAt)
  this.piAgent.replaceMessages(kept as AgentMessage[])
  this.persist()
  return { userText, attachments }
}

/** Get the current tools array. Used by fork sub-agents. */
getTools(): import('@mariozechner/pi-agent-core').AgentTool[] {
return [...this.piAgent.state.tools]
Expand Down
23 changes: 23 additions & 0 deletions packages/agent-core/src/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,29 @@ export function logScore(
})
}

/**
 * Attach feedback scores (e.g. thumbs up/down) to a span identified by id.
 *
 * Works AFTER the span has been ended — Braintrust treats this as a
 * post-hoc score on the persisted event. Does nothing when tracing is
 * disabled or no logger is available. A failure inside the logger call is
 * logged as a warning and not rethrown.
 *
 * @param spanId - Id of the span the feedback belongs to.
 * @param scores - Named numeric scores to record.
 * @param metadata - Optional extra fields; omitted from the payload when
 *   not provided.
 */
export function logSpanFeedback(
  spanId: string,
  scores: Record<string, number>,
  metadata?: Record<string, unknown>,
): void {
  if (!tracingEnabled) return
  if (!logger) return

  try {
    const extra = metadata ? { metadata } : {}
    logger.logFeedback({ id: spanId, scores, ...extra, source: 'app' })
  } catch (err) {
    log.warn({ err, spanId }, 'logSpanFeedback failed')
  }
}

// ── Cost estimation ──────────────────────────────────────────────────

/** Price per million tokens: { input, output } in USD. */
Expand Down
Loading