Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 154 additions & 35 deletions hypaware-core/plugins-workspace/codex/src/exchange-projector.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,16 @@ function openAiChatMessageToProjected(message) {
function openAiResponsesMessages(reqBody, responseBody, streamEvents) {
/** @type {AiGatewayProjectedMessage[]} */
const messages = responsesInputMessages(reqBody.input)
const assistant = responsesAssistantFromBody(responseBody)
?? responsesAssistantFromStream(streamEvents)
if (assistant) messages.push(assistant)
let assistant = responsesAssistantMessagesFromBody(responseBody)
if (assistant.length === 0) assistant = responsesAssistantMessagesFromStream(streamEvents)
for (const msg of assistant) messages.push(msg)
return messages
}

/**
* Mirror `codex/src/backfill.js`: fan items out so each `function_call` /
* `function_call_output` becomes its own projected message.
*
* @param {unknown} input
* @returns {AiGatewayProjectedMessage[]}
*/
Expand All @@ -283,6 +286,17 @@ function responsesInputMessages(input) {
const out = []
for (const item of input) {
if (!isPlainObject(item)) continue
const itemType = stringValue(item.type)
if (itemType === 'function_call' || itemType === 'custom_tool_call') {
const block = toolUseBlockFromPayload(item)
if (block) out.push({ role: 'assistant', content: [block] })
continue
}
if (itemType === 'function_call_output' || itemType === 'custom_tool_call_output') {
const block = toolResultBlockFromPayload(item)
if (block) out.push({ role: 'tool', content: [block] })
continue
}
const role = stringValue(item.role) ?? 'user'
const blocks = openAiContentBlocks(item.content)
if (blocks.length === 0) continue
Expand All @@ -309,58 +323,78 @@ function openAiContentBlocks(content) {
}

/**
* Fan out response `output[]` items so each becomes its own assistant
* message — same per-item shape `responsesInputMessages` produces for
* replayed input items, so turn-1 response rows hash equal to turn-2
* input rows in the kernel's content-hash dedupe.
*
* @param {unknown} responseBody
* @returns {AiGatewayProjectedMessage | undefined}
* @returns {AiGatewayProjectedMessage[]}
*/
function responsesAssistantFromBody(responseBody) {
if (!isPlainObject(responseBody)) return undefined
const outputText = stringValue(responseBody.output_text)
if (outputText) {
return { role: 'assistant', content: [{ type: 'text', text: outputText }] }
}
function responsesAssistantMessagesFromBody(responseBody) {
if (!isPlainObject(responseBody)) return []
/** @type {AiGatewayProjectedMessage[]} */
const out = []
let sawMessage = false
const output = Array.isArray(responseBody.output) ? responseBody.output : []
/** @type {JsonObject[]} */
const content = []
for (const item of output) {
if (!isPlainObject(item)) continue
if (item.type === 'message' || item.role === 'assistant') {
content.push(...openAiContentBlocks(item.content))
const itemType = stringValue(item.type)
if (itemType === 'function_call' || itemType === 'custom_tool_call') {
const block = toolUseBlockFromPayload(item)
if (block) out.push({ role: 'assistant', content: [block] })
} else if (itemType === 'message' || item.role === 'assistant') {
const blocks = openAiContentBlocks(item.content)
if (blocks.length > 0) {
out.push({ role: 'assistant', content: blocks })
sawMessage = true
}
}
}
if (content.length === 0) return undefined
return { role: 'assistant', content }
if (!sawMessage) {
const outputText = stringValue(responseBody.output_text)
if (outputText) out.unshift({ role: 'assistant', content: [{ type: 'text', text: outputText }] })
}
return out
}

/**
* Stitch a streamed OpenAI Responses assistant message back together
* from the captured SSE events. Each `response.output_text.delta`
* carries a `delta` token; `response.completed` may also carry the
* full final text as a snapshot, which we treat as a fallback when no
* deltas arrived.
* Stitch streamed Responses assistant messages from SSE events. When
* `response.completed` arrives, its body is preferred (already per-item
* via `responsesAssistantMessagesFromBody`); streamed text and tool_uses
* not represented there are merged in so a truncated completed body
* cannot silently drop captured content.
*
* @param {Array<{ event: string, data: string }>} streamEvents
* @returns {AiGatewayProjectedMessage | undefined}
* @returns {AiGatewayProjectedMessage[]}
*/
function responsesAssistantFromStream(streamEvents) {
function responsesAssistantMessagesFromStream(streamEvents) {
let text = ''
/** @type {string | undefined} */
let responseId
/** @type {Map<string, JsonObject>} */
const toolUsesByCallId = new Map()
/** @type {AiGatewayProjectedMessage[]} */
let completedMessages = []
for (const row of streamEvents) {
const payload = parseEventData(row.data)
if (!isPlainObject(payload)) continue
const type = stringValue(payload.type) ?? stringValue(row.event)
if (type === 'response.output_text.delta' || type === 'response.output_text.annotation.added') {
const delta = stringValue(payload.delta)
if (delta) text += delta
} else if (type === 'response.output_item.done') {
const item = isPlainObject(payload.item) ? payload.item : undefined
if (item) {
const block = toolUseBlockFromPayload(item)
if (block) {
const id = stringValue(block.id)
if (id && !toolUsesByCallId.has(id)) toolUsesByCallId.set(id, block)
}
}
} else if (type === 'response.completed') {
const response = isPlainObject(payload.response) ? payload.response : payload
const completed = responsesAssistantFromBody(response)
if (!text && completed) {
const completedText = textFromBlocks(/** @type {JsonObject[]} */ (
Array.isArray(completed.content) ? completed.content : []
))
if (completedText) text = completedText
}
completedMessages = responsesAssistantMessagesFromBody(response)
const maybeId = stringValue(payload.id) ?? stringValue(/** @type {Record<string, unknown>} */ (response).id)
if (maybeId) responseId = maybeId
} else if (type === 'response.created' && !responseId) {
Expand All @@ -369,13 +403,41 @@ function responsesAssistantFromStream(streamEvents) {
if (maybeId) responseId = maybeId
}
}
if (!text) return undefined
/** @type {AiGatewayProjectedMessage} */
const message = { role: 'assistant', content: [{ type: 'text', text }] }
/** @type {AiGatewayProjectedMessage[]} */
let messages
if (completedMessages.length > 0) {
messages = [...completedMessages]
/** @type {Set<string>} */
const seenCallIds = new Set()
let hasTextMessage = false
for (const msg of messages) {
if (!Array.isArray(msg.content)) continue
for (const block of msg.content) {
const blockType = stringValue(block.type)
if (blockType === 'text') hasTextMessage = true
if (blockType === 'tool_use') {
const id = stringValue(block.id)
if (id) seenCallIds.add(id)
}
}
}
if (!hasTextMessage && text) {
messages.unshift({ role: 'assistant', content: [{ type: 'text', text }] })
}
for (const block of toolUsesByCallId.values()) {
const id = stringValue(block.id)
if (id && !seenCallIds.has(id)) messages.push({ role: 'assistant', content: [block] })
}
} else {
messages = []
if (text) messages.push({ role: 'assistant', content: [{ type: 'text', text }] })
for (const block of toolUsesByCallId.values()) messages.push({ role: 'assistant', content: [block] })
}
if (messages.length === 0) return []
if (responseId) {
message.raw_frame = { response_id: responseId }
for (const msg of messages) msg.raw_frame = { ...msg.raw_frame, response_id: responseId }
}
return message
return messages
}

// ---------------------------------------------------------------------
Expand Down Expand Up @@ -696,6 +758,63 @@ function firstChoice(responseBody) {
return isPlainObject(choice) ? choice : undefined
}

/**
* Mirror `codex/src/backfill.js` so live-captured tool calls land in the
* same shape as backfilled ones.
*
* @param {Record<string, unknown>} payload
* @returns {JsonObject | undefined}
*/
function toolUseBlockFromPayload(payload) {
const name = stringValue(payload.name)
const callId = stringValue(payload.call_id) ?? stringValue(payload.id)
if (!name || !callId) return undefined
const rawArgs = payload.arguments !== undefined ? payload.arguments : payload.input
return { type: 'tool_use', id: callId, name, input: normalizeToolInput(rawArgs) }
}

/**
* @param {Record<string, unknown>} payload
* @returns {JsonObject | undefined}
*/
function toolResultBlockFromPayload(payload) {
const callId = stringValue(payload.call_id) ?? stringValue(payload.id)
if (!callId) return undefined
const text = toolOutputText(payload.output)
/** @type {JsonObject} */
const block = { type: 'tool_result', tool_use_id: callId }
if (text !== undefined) block.content = text
return block
}

/** @param {unknown} value */
function normalizeToolInput(value) {
if (typeof value === 'string') {
const parsed = parseMaybeJson(value)
return parsed === value ? value : /** @type {any} */ (parsed)
}
if (value === undefined) return null
return /** @type {any} */ (value)
}

/**
* Codex tool output can arrive as a string, a `{ output | content | text }`
* wrapper, or a structured payload — fall back to JSON.stringify so the
* row keeps a faithful trace.
*
* @param {unknown} output
* @returns {string | undefined}
*/
function toolOutputText(output) {
if (typeof output === 'string') return output.length > 0 ? output : undefined
if (isPlainObject(output)) {
const inner = stringValue(output.output) ?? stringValue(output.content) ?? stringValue(output.text)
return inner ?? JSON.stringify(output)
}
if (output === undefined || output === null) return undefined
return JSON.stringify(output)
}

/** @param {JsonObject[]} blocks */
function textFromBlocks(blocks) {
const parts = blocks
Expand Down
Loading