From 9194ffdb4c58705f9589f6931c486e976490452f Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Thu, 2 Jul 2026 14:40:34 +0300 Subject: [PATCH 1/3] refactor: overhaul plugin architecture --- .../middleware/openAiResponsesContinuation.ts | 92 ---- agent/models/AgentModeResolver.ts | 9 - agent/turn/TurnContextBuilder.ts | 36 -- agent/turn/TurnLifecycleService.ts | 47 -- agent/turn/TurnPersistenceService.ts | 33 -- agent/turn/TurnPromptBuilder.ts | 51 --- agent/turn/TurnStreamConsumer.ts | 69 --- agentTurnService.ts | 314 -------------- application/ports.ts | 53 +++ application/runTurnUseCase.ts | 408 ++++++++++++++++++ agentEvents.ts => domain/agentEvents.ts | 2 +- {agent => domain}/languageDetect.ts | 0 {agent => domain}/systemPrompt.ts | 2 +- {agent => domain}/toolCallEvents.ts | 0 {agent/turn => domain}/turnTypes.ts | 27 +- .../vegaLiteStreamBuffer.ts | 2 +- index.ts | 99 +++-- .../AgentContext.ts => llm/agentContext.ts | 6 +- agent/simpleAgent.ts => llm/agentModels.ts | 0 .../AgentRuntime.ts => llm/agentRuntime.ts | 46 +- llm/langGraphLlm.ts | 66 +++ .../middleware/apiToolsMiddleware.ts | 12 +- {agent => llm}/middleware/sequenceDebug.ts | 2 +- .../modelFactory.ts | 4 +- llm/streamAdapter.ts | 77 ++++ .../checkpointStore.ts | 0 .../sessionStore.ts | 43 +- errors.ts => shared/errors.ts | 0 .../sanitizeSpeechText.ts | 0 tests/session_store.test.ts | 2 +- tests/sse_emitter.test.ts | 2 +- tests/stream_adapter.test.ts | 59 +++ tests/system_prompt.test.ts | 2 +- tests/turn_flow.test.ts | 253 ++++++----- tests/units.test.ts | 31 +- .../agentToolProvider.ts | 4 +- apiBasedTools.ts => tools/apiBasedTools.ts | 0 {agent/tools => tools}/apiTool.ts | 2 +- {agent/tools => tools}/fetchSkill.ts | 2 +- {agent/tools => tools}/fetchToolSchema.ts | 2 +- {agent/tools => tools}/getUserLocation.ts | 0 {agent/tools => tools}/index.ts | 2 +- {agent/tools => tools}/navigateUser.ts | 2 +- {agent => tools}/skills/registry.ts | 0 .../http/chatSurfaceEndpoints.ts | 0 {endpoints => transport/http}/context.ts | 4 +- .../http/coreEndpoints.ts | 8 +- .../http/sessionEndpoints.ts | 8 +- .../sse/sseWriter.ts | 19 +- .../surfaces/chatSurfaceService.ts | 20 +- .../surfaces/speechTurnService.ts | 6 +- types.ts | 14 +- 52 files changed, 1042 insertions(+), 900 deletions(-) delete mode 100644 agent/middleware/openAiResponsesContinuation.ts delete mode 100644 agent/models/AgentModeResolver.ts delete mode 100644 agent/turn/TurnContextBuilder.ts delete mode 100644 agent/turn/TurnLifecycleService.ts delete mode 100644 agent/turn/TurnPersistenceService.ts delete mode 100644 agent/turn/TurnPromptBuilder.ts delete mode 100644 agent/turn/TurnStreamConsumer.ts delete mode 100644 agentTurnService.ts create mode 100644 application/ports.ts create mode 100644 application/runTurnUseCase.ts rename agentEvents.ts => domain/agentEvents.ts (95%) rename {agent => domain}/languageDetect.ts (100%) rename {agent => domain}/systemPrompt.ts (99%) rename {agent => domain}/toolCallEvents.ts (100%) rename {agent/turn => domain}/turnTypes.ts (73%) rename agent/turn/VegaLiteStreamBuffer.ts => domain/vegaLiteStreamBuffer.ts (97%) rename agent/runtime/AgentContext.ts => llm/agentContext.ts (80%) rename agent/simpleAgent.ts => llm/agentModels.ts (100%) rename agent/runtime/AgentRuntime.ts => llm/agentRuntime.ts (60%) create mode 100644 llm/langGraphLlm.ts rename agent/middleware/apiBasedTools.ts => llm/middleware/apiToolsMiddleware.ts (94%) rename {agent => llm}/middleware/sequenceDebug.ts (99%) rename agent/models/AgentModelFactory.ts => llm/modelFactory.ts (85%) create mode 100644 llm/streamAdapter.ts rename agent/checkpointer.ts => persistence/checkpointStore.ts (100%) rename sessionStore.ts => persistence/sessionStore.ts (73%) rename errors.ts => shared/errors.ts (100%) rename sanitizeSpeechText.ts => shared/sanitizeSpeechText.ts (100%) create mode 100644 tests/stream_adapter.test.ts rename agent/tools/AgentToolProvider.ts => tools/agentToolProvider.ts (86%) rename apiBasedTools.ts => tools/apiBasedTools.ts (100%) rename {agent/tools => tools}/apiTool.ts (96%) rename {agent/tools => tools}/fetchSkill.ts (98%) rename {agent/tools => tools}/fetchToolSchema.ts (94%) rename {agent/tools => tools}/getUserLocation.ts (100%) rename {agent/tools => tools}/index.ts (94%) rename {agent/tools => tools}/navigateUser.ts (98%) rename {agent => tools}/skills/registry.ts (100%) rename endpoints/chatSurfaces.ts => transport/http/chatSurfaceEndpoints.ts (100%) rename {endpoints => transport/http}/context.ts (94%) rename endpoints/core.ts => transport/http/coreEndpoints.ts (94%) rename endpoints/sessions.ts => transport/http/sessionEndpoints.ts (94%) rename surfaces/web-sse/createSseEventEmitter.ts => transport/sse/sseWriter.ts (86%) rename chatSurfaceService.ts => transport/surfaces/chatSurfaceService.ts (92%) rename agent/speech/SpeechTurnService.ts => transport/surfaces/speechTurnService.ts (96%) diff --git a/agent/middleware/openAiResponsesContinuation.ts b/agent/middleware/openAiResponsesContinuation.ts deleted file mode 100644 index 6dbeda7..0000000 --- a/agent/middleware/openAiResponsesContinuation.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { AIMessage } from "@langchain/core/messages"; -import { createMiddleware } from "langchain"; - -type OpenAiResponsesMetadata = { - id?: string; -}; - -type OpenAiResponsesContext = { - sessionId: string; - turnId: string; -}; - -function getTurnKey(context: OpenAiResponsesContext) { - return `${context.sessionId}:${context.turnId}`; -} - -function getResponseId(message: AIMessage) { - const metadata = message.response_metadata as OpenAiResponsesMetadata | undefined; - return metadata?.id ?? null; -} - -function getPreviousResponseId(modelSettings?: Record) { - return (modelSettings as { previous_response_id?: string } | undefined) - ?.previous_response_id; -} - -function getContinuationMessages( - messages: T[], - previousResponseId: string, -) { - let continuationStartIndex: number | null = null; - - for (let index = messages.length - 1; index >= 0; index -= 1) { - const message = messages[index]; - - if ( - AIMessage.isInstance(message) && - (message.response_metadata as OpenAiResponsesMetadata | undefined)?.id === - previousResponseId - ) { - continuationStartIndex = index + 1; - break; - } - } - - if (continuationStartIndex === null) { - return null; - } - - return messages.slice(continuationStartIndex); -} - -export function createOpenAiResponsesContinuationMiddleware() { - const responseIdsByTurn = new Map(); - - return createMiddleware({ - name: "OpenAiResponsesContinuationMiddleware", - async wrapModelCall(request, handler) { - const context = request.runtime.context as OpenAiResponsesContext; - const turnKey = getTurnKey(context); - const previousResponseId = - getPreviousResponseId(request.modelSettings) ?? - responseIdsByTurn.get(turnKey); - const continuationMessages = previousResponseId - ? getContinuationMessages(request.messages, previousResponseId) - : null; - - const response = await handler( - previousResponseId && continuationMessages - ? { - ...request, - messages: continuationMessages, - modelSettings: { - ...request.modelSettings, - previous_response_id: previousResponseId, - }, - } - : request, - ) as AIMessage; - - const responseId = getResponseId(response); - - if (responseId) { - responseIdsByTurn.set(turnKey, responseId); - } else { - responseIdsByTurn.delete(turnKey); - } - - return response; - }, - }); -} diff --git a/agent/models/AgentModeResolver.ts b/agent/models/AgentModeResolver.ts deleted file mode 100644 index 50f5bd4..0000000 --- a/agent/models/AgentModeResolver.ts +++ /dev/null @@ -1,9 +0,0 @@ -import type { PluginOptions } from "../../types.js"; - -export class AgentModeResolver { - constructor(private readonly options: PluginOptions) {} - - resolve(modeName?: string | null) { - return this.options.modes.find((mode) => mode.name === modeName) ?? this.options.modes[0]; - } -} diff --git a/agent/turn/TurnContextBuilder.ts b/agent/turn/TurnContextBuilder.ts deleted file mode 100644 index 5ccfd15..0000000 --- a/agent/turn/TurnContextBuilder.ts +++ /dev/null @@ -1,36 +0,0 @@ -import type { AdminUser, IAdminForth } from "adminforth"; -import type { AgentTurnContext, BaseAgentTurnInput } from "./turnTypes.js"; - -export type UserContextProvider = { - getUserTimeZone(adminUser: AdminUser): Promise | string | null | undefined; -}; - -export class TurnContextBuilder { - constructor( - private readonly getAdminforth: () => IAdminForth, - private readonly userContextProvider?: UserContextProvider, - ) {} - - async build(input: { - base: BaseAgentTurnInput; - turnId: string; - }): Promise { - const adminforth = this.getAdminforth(); - - return { - adminUser: input.base.adminUser, - sessionId: input.base.sessionId, - turnId: input.turnId, - abortSignal: input.base.abortSignal, - currentPage: input.base.currentPage, - chatSurface: input.base.chatSurface, - userTimeZone: - input.base.userTimeZone - ?? await this.userContextProvider?.getUserTimeZone(input.base.adminUser) - ?? "UTC", - adminPublicOrigin: - input.base.adminPublicOrigin - ?? adminforth.config.baseUrlSlashed, - }; - } -} diff --git a/agent/turn/TurnLifecycleService.ts b/agent/turn/TurnLifecycleService.ts deleted file mode 100644 index 6987f1c..0000000 --- a/agent/turn/TurnLifecycleService.ts +++ /dev/null @@ -1,47 +0,0 @@ -import type { AgentSessionStore } from "../../sessionStore.js"; -import type { PluginOptions } from "../../types.js"; -import type { BaseAgentTurnInput } from "./turnTypes.js"; -import { TurnPersistenceService } from "./TurnPersistenceService.js"; - -export class TurnLifecycleService { - constructor( - private readonly sessionStore: AgentSessionStore, - private readonly persistence: TurnPersistenceService, - private readonly options: PluginOptions, - ) {} - - async start(input: BaseAgentTurnInput) { - const previousUserMessages = await this.sessionStore.getPreviousUserMessages(input.sessionId); - const turnId = await this.sessionStore.createNewTurn(input.sessionId, input.prompt); - await this.persistence.touchSession(input.sessionId); - - return { - turnId, - previousUserMessages, - }; - } - - async resume(input: BaseAgentTurnInput) { - const latestTurn = await this.sessionStore.getLatestTurn(input.sessionId); - - if (!latestTurn) { - throw new Error(`No agent turn found for session "${input.sessionId}".`); - } - - return { - turnId: latestTurn[this.options.turnResource.idField], - previousUserMessages: await this.sessionStore.getPreviousUserMessages(input.sessionId), - initialResponse: latestTurn[this.options.turnResource.responseField] === "not_finished" - ? "" - : String(latestTurn[this.options.turnResource.responseField]), - }; - } - - async finish(input: { - turnId: string; - responseText: string; - debugHistory?: unknown; - }) { - await this.persistence.saveTurnResponse(input); - } -} diff --git a/agent/turn/TurnPersistenceService.ts b/agent/turn/TurnPersistenceService.ts deleted file mode 100644 index abfee05..0000000 --- a/agent/turn/TurnPersistenceService.ts +++ /dev/null @@ -1,33 +0,0 @@ -import type { IAdminForth } from "adminforth"; -import type { PluginOptions } from "../../types.js"; - -export class TurnPersistenceService { - constructor( - private readonly getAdminforth: () => IAdminForth, - private readonly options: PluginOptions, - ) {} - - async touchSession(sessionId: string) { - await this.getAdminforth().resource(this.options.sessionResource.resourceId).update(sessionId, { - [this.options.sessionResource.createdAtField]: new Date().toISOString(), - }); - } - - async saveTurnResponse(input: { - turnId: string; - responseText: string; - debugHistory?: unknown; - }) { - const turnUpdates: Record = { - [this.options.turnResource.responseField]: input.responseText, - }; - - if (this.options.turnResource.debugField) { - turnUpdates[this.options.turnResource.debugField] = input.debugHistory; - } - - await this.getAdminforth() - .resource(this.options.turnResource.resourceId) - .update(input.turnId, turnUpdates); - } -} diff --git a/agent/turn/TurnPromptBuilder.ts b/agent/turn/TurnPromptBuilder.ts deleted file mode 100644 index 8c04965..0000000 --- a/agent/turn/TurnPromptBuilder.ts +++ /dev/null @@ -1,51 +0,0 @@ -import type { AdminUser, IAdminForth } from "adminforth"; -import { logger } from "adminforth"; -import { HumanMessage, SystemMessage } from "langchain"; -import { detectUserLanguage, type PreviousUserMessage } from "../languageDetect.js"; -import { buildAgentTurnSystemPrompt } from "../systemPrompt.js"; -import { getErrorMessage, isAbortError } from "../../errors.js"; -import type { AgentModeCompletionAdapter } from "../simpleAgent.js"; - -export class TurnPromptBuilder { - constructor( - private readonly options: { - getAgentSystemPrompt: () => Promise; - getAdminforth: () => IAdminForth; - }, - ) {} - - async build(input: { - prompt: string; - previousUserMessages: PreviousUserMessage[]; - adminUser: AdminUser; - completionAdapter: AgentModeCompletionAdapter; - chatSurface?: string; - abortSignal?: AbortSignal; - }) { - const adminforth = this.options.getAdminforth(); - const userLanguage = await detectUserLanguage( - input.completionAdapter, - input.prompt, - input.previousUserMessages, - ).catch((error) => { - if (input.abortSignal?.aborted || isAbortError(error)) { - throw error; - } - - logger.warn(`Failed to detect user language: ${getErrorMessage(error)}`); - return null; - }); - const systemPrompt = buildAgentTurnSystemPrompt({ - agentSystemPrompt: await this.options.getAgentSystemPrompt(), - adminUser: input.adminUser, - usernameField: adminforth.config.auth!.usernameField, - userLanguage, - chatSurface: input.chatSurface, - }); - - return [ - new SystemMessage(systemPrompt), - new HumanMessage(input.prompt), - ]; - } -} diff --git a/agent/turn/TurnStreamConsumer.ts b/agent/turn/TurnStreamConsumer.ts deleted file mode 100644 index 8f306a9..0000000 --- a/agent/turn/TurnStreamConsumer.ts +++ /dev/null @@ -1,69 +0,0 @@ -import type { AgentEventEmitter } from "../../agentEvents.js"; -import { VegaLiteStreamBuffer } from "./VegaLiteStreamBuffer.js"; - -export class TurnStreamConsumer { - async consume(input: { - stream: AsyncIterable<["messages", [any, any]] | ["updates", Record]>; - abortSignal?: AbortSignal; - emit?: AgentEventEmitter; - onInterrupt?: (interrupt: unknown) => void | Promise; - }) { - let fullResponse = ""; - const textBuffer = new VegaLiteStreamBuffer(); - - for await (const [mode, chunk] of input.stream) { - if (input.abortSignal?.aborted) { - throw new DOMException("This operation was aborted", "AbortError"); - } - - if (mode === "updates") { - if ("__interrupt__" in chunk) { - await input.onInterrupt?.(chunk.__interrupt__); - } - continue; - } - - const [token, metadata] = chunk; - const nodeName = - typeof metadata?.langgraph_node === "string" - ? metadata.langgraph_node - : ""; - - if (nodeName && !["model", "model_request"].includes(nodeName)) { - continue; - } - - const blocks = Array.isArray(token?.contentBlocks) - ? token.contentBlocks - : Array.isArray(token?.content) - ? token.content - : []; - const reasoningDelta = blocks - .filter((block: any) => block?.type === "reasoning") - .map((block: any) => String(block.reasoning ?? "")) - .join(""); - const textDelta = blocks - .filter((block: any) => block?.type === "text") - .map((block: any) => String(block.text ?? "")) - .join(""); - - if (reasoningDelta) { - await input.emit?.({ - type: "reasoning-delta", - delta: reasoningDelta, - }); - } - - if (textDelta) { - fullResponse += textDelta; - await textBuffer.push(textDelta, input.emit); - } - } - - await textBuffer.flush(input.emit); - - return { - text: fullResponse, - }; - } -} diff --git a/agentTurnService.ts b/agentTurnService.ts deleted file mode 100644 index b49cc24..0000000 --- a/agentTurnService.ts +++ /dev/null @@ -1,314 +0,0 @@ -import { logger } from "adminforth"; -import { randomUUID } from "crypto"; -import { Command } from "@langchain/langgraph"; -import { AgentModelFactory } from "./agent/models/AgentModelFactory.js"; -import { AgentModeResolver } from "./agent/models/AgentModeResolver.js"; -import { createSequenceDebugCollector } from "./agent/middleware/sequenceDebug.js"; -import { AgentRuntime } from "./agent/runtime/AgentRuntime.js"; -import { TurnContextBuilder } from "./agent/turn/TurnContextBuilder.js"; -import { TurnLifecycleService } from "./agent/turn/TurnLifecycleService.js"; -import { TurnPromptBuilder } from "./agent/turn/TurnPromptBuilder.js"; -import { TurnStreamConsumer } from "./agent/turn/TurnStreamConsumer.js"; -import type { - BaseAgentTurnInput, - HandleTurnInput, - PreparedAgentTurn, - RunAndPersistAgentResponseInput, - RunAndPersistAgentResponseResult, -} from "./agent/turn/turnTypes.js"; -import { getErrorMessage, isAbortError } from "./errors.js"; - -export type { - BaseAgentTurnInput, - HandleSpeechTurnInput, - HandleTurnInput, - RunAndPersistAgentResponseInput, - RunAndPersistAgentResponseResult, -} from "./agent/turn/turnTypes.js"; - -function getApprovalDecision(input: BaseAgentTurnInput) { - return "approvalDecision" in input - && (input.approvalDecision === "approve" || input.approvalDecision === "reject") - ? input.approvalDecision - : undefined; -} - -function getInterruptItems(interrupt: unknown): unknown[] { - return Array.isArray(interrupt) ? interrupt : [interrupt]; -} - -function getHitlInterrupts(interrupt: unknown): { id: string; count: number }[] { - return getInterruptItems(interrupt).flatMap((item) => { - const value = item && typeof item === "object" && "value" in item - ? (item as { value: unknown }).value - : item; - const actionRequests = value && typeof value === "object" - ? (value as { actionRequests?: unknown }).actionRequests - : undefined; - const interruptId = item && typeof item === "object" - ? (item as { id?: unknown }).id - : undefined; - - return typeof interruptId === "string" && Array.isArray(actionRequests) - ? [{ id: interruptId, count: actionRequests.length }] - : []; - }); -} - -function buildHitlDecision(decision: "approve" | "reject", prompt?: string) { - if (decision === "approve") { - return { type: "approve" as const }; - } - - return { - type: "reject" as const, - message: prompt - ? `User rejected the pending tool execution and sent a new instruction instead: ${prompt}` - : "User rejected executing this tool", - }; -} - -function buildHitlResumeValue(input: { - decision: "approve" | "reject"; - count: number; - prompt?: string; -}) { - return { - decisions: Array.from({ length: input.count }, () => ( - buildHitlDecision(input.decision, input.prompt) - )), - }; -} - -function buildLangGraphResume(input: { - decision: "approve" | "reject"; - interrupts?: { id: string; count: number }[]; - prompt?: string; -}) { - const interrupts = input.interrupts ?? []; - - if (interrupts.length === 0) { - throw new Error("No pending approval interrupt found for resume."); - } - - if (interrupts.length === 1) { - return buildHitlResumeValue({ - decision: input.decision, - count: interrupts[0].count, - prompt: input.prompt, - }); - } - - return Object.fromEntries( - interrupts.map((interrupt) => [ - interrupt.id, - buildHitlResumeValue({ - decision: input.decision, - count: interrupt.count, - prompt: input.prompt, - }), - ]), - ); -} - -export class AgentTurnService { - private readonly pendingInterrupts = new Map(); - - constructor( - private readonly lifecycle: TurnLifecycleService, - private readonly contextBuilder: TurnContextBuilder, - private readonly modeResolver: AgentModeResolver, - private readonly modelFactory: AgentModelFactory, - private readonly promptBuilder: TurnPromptBuilder, - private readonly runtime: AgentRuntime, - private readonly streamConsumer: TurnStreamConsumer, - ) {} - - private async prepareTurn(input: BaseAgentTurnInput): Promise { - const sequenceDebugCollector = createSequenceDebugCollector(); - const approvalDecision = getApprovalDecision(input); - const shouldResume = Boolean(approvalDecision); - const pendingInterrupts = this.pendingInterrupts.get(input.sessionId); - if (shouldResume && (!pendingInterrupts || pendingInterrupts.length === 0)) { - throw new Error(`No pending approval interrupt found for session "${input.sessionId}".`); - } - const lifecycleTurn = shouldResume - ? await this.lifecycle.resume(input) - : await this.lifecycle.start(input); - const context = await this.contextBuilder.build({ - base: input, - turnId: lifecycleTurn.turnId, - }); - - return { - prompt: input.prompt, - sessionId: input.sessionId, - turnId: lifecycleTurn.turnId, - previousUserMessages: lifecycleTurn.previousUserMessages, - modeName: input.modeName, - context, - observability: { - emit: undefined, - sequenceDebugSink: sequenceDebugCollector, - }, - resume: shouldResume - ? { - decision: approvalDecision!, - interrupts: pendingInterrupts, - } - : undefined, - initialResponse: shouldResume && "initialResponse" in lifecycleTurn - ? (lifecycleTurn as { initialResponse?: string }).initialResponse - : undefined, - }; - } - - private async runAgentTurn(input: PreparedAgentTurn) { - const selectedMode = this.modeResolver.resolve(input.modeName); - const [models, messages] = await Promise.all([ - this.modelFactory.create(selectedMode.completionAdapter), - this.promptBuilder.build({ - prompt: input.prompt, - previousUserMessages: input.previousUserMessages, - adminUser: input.context.adminUser, - completionAdapter: selectedMode.completionAdapter, - chatSurface: input.context.chatSurface, - abortSignal: input.context.abortSignal, - }), - ]); - const stream = await this.runtime.stream({ - models, - input: input.resume - ? new Command({ - resume: buildLangGraphResume({ - decision: input.resume.decision, - interrupts: input.resume.interrupts, - prompt: input.prompt, - }), - }) - : { messages }, - context: input.context, - observability: input.observability, - }); - - let interrupted = false; - try { - return await this.streamConsumer.consume({ - stream: stream as AsyncIterable<["messages", [any, any]] | ["updates", Record]>, - abortSignal: input.context.abortSignal, - emit: input.observability.emit, - onInterrupt: async (interrupt) => { - interrupted = true; - const interrupts = getHitlInterrupts(interrupt); - const pendingInterrupts = this.pendingInterrupts.get(input.sessionId) ?? []; - const mergedInterrupts = new Map( - pendingInterrupts.map((pendingInterrupt) => [ - pendingInterrupt.id, - pendingInterrupt.count, - ]), - ); - - for (const pendingInterrupt of interrupts) { - mergedInterrupts.set(pendingInterrupt.id, pendingInterrupt.count); - } - - this.pendingInterrupts.set( - input.sessionId, - [...mergedInterrupts.entries()].map(([id, count]) => ({ id, count })), - ); - await input.observability.emit?.({ - type: "interrupt", - sessionId: input.sessionId, - interrupt, - }); - }, - }); - } finally { - if (!interrupted) { - this.pendingInterrupts.delete(input.sessionId); - } - } - } - - async runAndPersistAgentResponse( - input: RunAndPersistAgentResponseInput, - ): Promise { - const preparedTurn = await this.prepareTurn(input); - preparedTurn.observability.emit = input.emit; - - let fullResponse = preparedTurn.initialResponse ?? ""; - let aborted = false; - let failed = false; - - try { - const agentResponse = await this.runAgentTurn(preparedTurn); - fullResponse += agentResponse.text; - } catch (error) { - if (input.abortSignal?.aborted || isAbortError(error)) { - aborted = true; - logger.info(input.abortLogMessage); - } else { - failed = true; - fullResponse = getErrorMessage(error); - logger.error(`${input.failureLogMessage}:\n${fullResponse}`); - } - } - - preparedTurn.observability.sequenceDebugSink.flush(); - await this.lifecycle.finish({ - turnId: preparedTurn.turnId, - responseText: fullResponse, - debugHistory: preparedTurn.observability.sequenceDebugSink.getHistory(), - }); - - return { - text: fullResponse, - turnId: preparedTurn.turnId, - aborted, - failed, - }; - } - - async handleTurn(input: HandleTurnInput) { - await input.emit({ - type: "turn-started", - messageId: randomUUID(), - }); - - const agentResponse = await this.runAndPersistAgentResponse({ - prompt: input.prompt, - sessionId: input.sessionId, - modeName: input.modeName, - userTimeZone: input.userTimeZone, - currentPage: input.currentPage, - chatSurface: input.chatSurface, - adminPublicOrigin: input.adminPublicOrigin, - approvalDecision: input.approvalDecision, - abortSignal: input.abortSignal, - adminUser: input.adminUser, - emit: input.emit, - failureLogMessage: input.failureLogMessage ?? "Agent response failed", - abortLogMessage: input.abortLogMessage ?? "Agent response aborted", - }); - - if (agentResponse.failed) { - await input.emit({ - type: "error", - error: agentResponse.text, - }); - } else if (!agentResponse.aborted) { - await input.emit({ - type: "response", - text: agentResponse.text, - sessionId: input.sessionId, - turnId: agentResponse.turnId, - }); - } - - await input.emit({ - type: "finish", - }); - - return agentResponse; - } -} diff --git a/application/ports.ts b/application/ports.ts new file mode 100644 index 0000000..02cb4da --- /dev/null +++ b/application/ports.ts @@ -0,0 +1,53 @@ +import type { AgentModeCompletionAdapter } from "../llm/agentModels.js"; +import type { DetectedLanguage, PreviousUserMessage } from "../domain/languageDetect.js"; +import type { + AgentMessage, + AgentStreamChunk, + AgentTurnContext, + AgentTurnObservability, +} from "../domain/turnTypes.js"; + +/** + * Input for a single streamed agent turn. Either a fresh set of messages, or a + * human-in-the-loop resume payload for a previously interrupted turn. + */ +export type LlmStreamInput = { + completionAdapter: AgentModeCompletionAdapter; + input: { messages: AgentMessage[] } | { resume: unknown }; + context: AgentTurnContext; + observability: AgentTurnObservability; +}; + +/** + * The boundary between the application layer and the concrete LLM runtime + * (LangChain / LangGraph). Everything provider-specific — model construction, + * the agent graph, middleware, and the raw stream shape — lives behind this + * port. The application layer depends only on this interface, so the turn + * orchestration is testable with a scripted fake and reusable across providers. + */ +export interface LlmPort { + /** + * Run one turn and return a normalized stream of typed chunks. + */ + streamTurn(input: LlmStreamInput): Promise>; + + /** + * Detect the language the assistant should answer in for the current message. + */ + detectLanguage(input: { + completionAdapter: AgentModeCompletionAdapter; + prompt: string; + previousUserMessages: PreviousUserMessage[]; + }): Promise; + + /** + * Rebuild the pending human-in-the-loop interrupts for a session from persisted + * state (the checkpointer). Used as a fallback when the in-process interrupt + * cache is empty — e.g. after a restart or on a second instance — so approvals + * survive process boundaries. Returns raw interrupt objects (LangGraph shape). + */ + getPendingInterrupts(input: { + completionAdapter: AgentModeCompletionAdapter; + sessionId: string; + }): Promise; +} diff --git a/application/runTurnUseCase.ts b/application/runTurnUseCase.ts new file mode 100644 index 0000000..8492920 --- /dev/null +++ b/application/runTurnUseCase.ts @@ -0,0 +1,408 @@ +import { logger, type IAdminForth } from "adminforth"; +import { randomUUID } from "crypto"; +import { createSequenceDebugCollector } from "../llm/middleware/sequenceDebug.js"; +import { VegaLiteStreamBuffer } from "../domain/vegaLiteStreamBuffer.js"; +import { buildAgentTurnSystemPrompt } from "../domain/systemPrompt.js"; +import { getErrorMessage, isAbortError } from "../shared/errors.js"; +import type { PreviousUserMessage } from "../domain/languageDetect.js"; +import type { AgentSessionStore } from "../persistence/sessionStore.js"; +import type { PluginOptions } from "../types.js"; +import type { LlmPort } from "./ports.js"; +import type { + AgentMessage, + AgentTurnContext, + AgentTurnObservability, + BaseAgentTurnInput, + HandleTurnInput, + RunAndPersistAgentResponseInput, + RunAndPersistAgentResponseResult, +} from "../domain/turnTypes.js"; + +type AgentMode = PluginOptions["modes"][number]; + +type PendingInterrupt = { id: string; count: number }; + +type PreparedTurn = { + prompt: string; + sessionId: string; + turnId: string; + previousUserMessages: PreviousUserMessage[]; + mode: AgentMode; + context: AgentTurnContext; + observability: AgentTurnObservability; + resume?: { decision: "approve" | "reject"; interrupts?: PendingInterrupt[] }; + initialResponse?: string; +}; + +function getApprovalDecision(input: BaseAgentTurnInput) { + return "approvalDecision" in input + && (input.approvalDecision === "approve" || input.approvalDecision === "reject") + ? input.approvalDecision + : undefined; +} + +function getInterruptItems(interrupt: unknown): unknown[] { + return Array.isArray(interrupt) ? interrupt : [interrupt]; +} + +function getHitlInterrupts(interrupt: unknown): PendingInterrupt[] { + return getInterruptItems(interrupt).flatMap((item) => { + const value = item && typeof item === "object" && "value" in item + ? (item as { value: unknown }).value + : item; + const actionRequests = value && typeof value === "object" + ? (value as { actionRequests?: unknown }).actionRequests + : undefined; + const interruptId = item && typeof item === "object" + ? (item as { id?: unknown }).id + : undefined; + + return typeof interruptId === "string" && Array.isArray(actionRequests) + ? [{ id: interruptId, count: actionRequests.length }] + : []; + }); +} + +function buildHitlDecision(decision: "approve" | "reject", prompt?: string) { + if (decision === "approve") { + return { type: "approve" as const }; + } + + return { + type: "reject" as const, + message: prompt + ? `User rejected the pending tool execution and sent a new instruction instead: ${prompt}` + : "User rejected executing this tool", + }; +} + +function buildHitlResumeValue(input: { + decision: "approve" | "reject"; + count: number; + prompt?: string; +}) { + return { + decisions: Array.from({ length: input.count }, () => ( + buildHitlDecision(input.decision, input.prompt) + )), + }; +} + +/** + * Build the provider-agnostic resume payload for a human-in-the-loop turn. + * Exported for unit testing; the LLM port wraps it into a LangGraph Command. + */ +export function buildResumeValue(input: { + decision: "approve" | "reject"; + interrupts?: PendingInterrupt[]; + prompt?: string; +}) { + const interrupts = input.interrupts ?? []; + + if (interrupts.length === 0) { + throw new Error("No pending approval interrupt found for resume."); + } + + if (interrupts.length === 1) { + return buildHitlResumeValue({ + decision: input.decision, + count: interrupts[0].count, + prompt: input.prompt, + }); + } + + return Object.fromEntries( + interrupts.map((interrupt) => [ + interrupt.id, + buildHitlResumeValue({ + decision: input.decision, + count: interrupt.count, + prompt: input.prompt, + }), + ]), + ); +} + +export type RunTurnUseCaseDeps = { + llm: LlmPort; + sessions: AgentSessionStore; + modes: PluginOptions["modes"]; + getAdminforth: () => IAdminForth; + getAgentSystemPrompt: () => Promise; +}; + +/** + * Single entry point for running an agent turn: prepare (start/resume) → build + * the prompt → stream from the LLM port → consume the typed stream into SSE + * events → persist the result. Replaces the former AgentTurnService plus the + * TurnLifecycle/Context/Prompt/Persistence/StreamConsumer + ModeResolver stack. + */ +export class RunTurnUseCase { + private readonly pendingInterrupts = new Map(); + + constructor(private readonly deps: RunTurnUseCaseDeps) {} + + private resolveMode(modeName?: string | null): AgentMode { + return this.deps.modes.find((mode) => mode.name === modeName) ?? this.deps.modes[0]; + } + + private buildContext(input: BaseAgentTurnInput, turnId: string): AgentTurnContext { + return { + adminUser: input.adminUser, + sessionId: input.sessionId, + turnId, + abortSignal: input.abortSignal, + currentPage: input.currentPage, + chatSurface: input.chatSurface, + userTimeZone: input.userTimeZone ?? "UTC", + adminPublicOrigin: + input.adminPublicOrigin ?? this.deps.getAdminforth().config.baseUrlSlashed, + }; + } + + /** + * Resolve the pending HITL interrupts for a resume. Prefers the in-process cache + * (populated when the interrupt fired this run) and falls back to the persisted + * checkpoint via the LLM port when the cache is empty (restart / other instance). + */ + private async resolvePendingInterrupts(sessionId: string, mode: AgentMode): Promise { + const cached = this.pendingInterrupts.get(sessionId); + if (cached && cached.length > 0) { + return cached; + } + const raw = await this.deps.llm + .getPendingInterrupts({ completionAdapter: mode.completionAdapter, sessionId }) + .catch(() => [] as unknown[]); + return getHitlInterrupts(raw); + } + + private async prepareTurn(input: RunAndPersistAgentResponseInput): Promise { + const sequenceDebugSink = createSequenceDebugCollector(); + const mode = this.resolveMode(input.modeName); + const approvalDecision = getApprovalDecision(input); + const shouldResume = Boolean(approvalDecision); + + let turnId: string; + let previousUserMessages: PreviousUserMessage[] = []; + let initialResponse: string | undefined; + let resumeInterrupts: PendingInterrupt[] | undefined; + + if (shouldResume) { + resumeInterrupts = await this.resolvePendingInterrupts(input.sessionId, mode); + if (resumeInterrupts.length === 0) { + throw new Error(`No pending approval interrupt found for session "${input.sessionId}".`); + } + const resumeState = await this.deps.sessions.getResumeState(input.sessionId); + turnId = resumeState.turnId; + initialResponse = resumeState.initialResponse; + } else { + previousUserMessages = await this.deps.sessions.getPreviousUserMessages(input.sessionId); + turnId = await this.deps.sessions.createNewTurn(input.sessionId, input.prompt); + await this.deps.sessions.touchSession(input.sessionId); + } + + return { + prompt: input.prompt, + sessionId: input.sessionId, + turnId, + previousUserMessages, + mode, + context: this.buildContext(input, turnId), + observability: { + emit: input.emit, + sequenceDebugSink, + }, + resume: shouldResume + ? { decision: approvalDecision!, interrupts: resumeInterrupts } + : undefined, + initialResponse, + }; + } + + private async buildMessages(prepared: PreparedTurn): Promise { + const userLanguage = await this.deps.llm + .detectLanguage({ + completionAdapter: prepared.mode.completionAdapter, + prompt: prepared.prompt, + previousUserMessages: prepared.previousUserMessages, + }) + .catch((error) => { + if (prepared.context.abortSignal?.aborted || isAbortError(error)) { + throw error; + } + logger.warn(`Failed to detect user language: ${getErrorMessage(error)}`); + return null; + }); + + const adminforth = this.deps.getAdminforth(); + const systemPrompt = buildAgentTurnSystemPrompt({ + agentSystemPrompt: await this.deps.getAgentSystemPrompt(), + adminUser: prepared.context.adminUser, + usernameField: adminforth.config.auth!.usernameField, + userLanguage, + chatSurface: prepared.context.chatSurface, + }); + + return [ + { role: "system", content: systemPrompt }, + { role: "user", content: prepared.prompt }, + ]; + } + + private async handleInterrupt(prepared: PreparedTurn, interrupt: unknown) { + const interrupts = getHitlInterrupts(interrupt); + const existing = this.pendingInterrupts.get(prepared.sessionId) ?? []; + const merged = new Map(existing.map((item) => [item.id, item.count])); + for (const item of interrupts) { + merged.set(item.id, item.count); + } + this.pendingInterrupts.set( + prepared.sessionId, + [...merged.entries()].map(([id, count]) => ({ id, count })), + ); + await prepared.observability.emit?.({ + type: "interrupt", + sessionId: prepared.sessionId, + interrupt, + }); + } + + private async runAgentTurn(prepared: PreparedTurn): Promise<{ text: string }> { + const streamInput = prepared.resume + ? { + resume: buildResumeValue({ + decision: prepared.resume.decision, + interrupts: prepared.resume.interrupts, + prompt: prepared.prompt, + }), + } + : { messages: await this.buildMessages(prepared) }; + + const stream = await this.deps.llm.streamTurn({ + completionAdapter: prepared.mode.completionAdapter, + input: streamInput, + context: prepared.context, + observability: prepared.observability, + }); + + const { emit } = prepared.observability; + const abortSignal = prepared.context.abortSignal; + const textBuffer = new VegaLiteStreamBuffer(); + let fullResponse = ""; + let interrupted = false; + + try { + for await (const chunk of stream) { + if (abortSignal?.aborted) { + throw new DOMException("This operation was aborted", "AbortError"); + } + + if (chunk.kind === "interrupt") { + interrupted = true; + await this.handleInterrupt(prepared, chunk.interrupt); + continue; + } + + if (chunk.kind === "reasoning") { + if (chunk.delta) { + await emit?.({ type: "reasoning-delta", delta: chunk.delta }); + } + continue; + } + + if (chunk.delta) { + fullResponse += chunk.delta; + await textBuffer.push(chunk.delta, emit); + } + } + + await textBuffer.flush(emit); + return { text: fullResponse }; + } finally { + if (!interrupted) { + this.pendingInterrupts.delete(prepared.sessionId); + } + } + } + + async runAndPersistAgentResponse( + input: RunAndPersistAgentResponseInput, + ): Promise { + const prepared = await this.prepareTurn(input); + + let fullResponse = prepared.initialResponse ?? ""; + let aborted = false; + let failed = false; + + try { + const agentResponse = await this.runAgentTurn(prepared); + fullResponse += agentResponse.text; + } catch (error) { + if (input.abortSignal?.aborted || isAbortError(error)) { + aborted = true; + logger.info(input.abortLogMessage); + } else { + failed = true; + fullResponse = getErrorMessage(error); + logger.error(`${input.failureLogMessage}:\n${fullResponse}`); + } + } + + prepared.observability.sequenceDebugSink.flush(); + await this.deps.sessions.saveTurnResponse({ + turnId: prepared.turnId, + responseText: fullResponse, + debugHistory: prepared.observability.sequenceDebugSink.getHistory(), + }); + + return { + text: fullResponse, + turnId: prepared.turnId, + aborted, + failed, + }; + } + + async handleTurn(input: HandleTurnInput) { + await input.emit({ + type: "turn-started", + messageId: randomUUID(), + }); + + const agentResponse = await this.runAndPersistAgentResponse({ + prompt: input.prompt, + sessionId: input.sessionId, + modeName: input.modeName, + userTimeZone: input.userTimeZone, + currentPage: input.currentPage, + chatSurface: input.chatSurface, + adminPublicOrigin: input.adminPublicOrigin, + approvalDecision: input.approvalDecision, + abortSignal: input.abortSignal, + adminUser: input.adminUser, + emit: input.emit, + failureLogMessage: input.failureLogMessage ?? "Agent response failed", + abortLogMessage: input.abortLogMessage ?? "Agent response aborted", + }); + + if (agentResponse.failed) { + await input.emit({ + type: "error", + error: agentResponse.text, + }); + } else if (!agentResponse.aborted) { + await input.emit({ + type: "response", + text: agentResponse.text, + sessionId: input.sessionId, + turnId: agentResponse.turnId, + }); + } + + await input.emit({ + type: "finish", + }); + + return agentResponse; + } +} diff --git a/agentEvents.ts b/domain/agentEvents.ts similarity index 95% rename from agentEvents.ts rename to domain/agentEvents.ts index 1625cda..70c8448 100644 --- a/agentEvents.ts +++ b/domain/agentEvents.ts @@ -1,4 +1,4 @@ -import type { ToolCallEvent } from "./agent/toolCallEvents.js"; +import type { ToolCallEvent } from "./toolCallEvents.js"; export type AgentEvent = | { diff --git a/agent/languageDetect.ts b/domain/languageDetect.ts similarity index 100% rename from agent/languageDetect.ts rename to domain/languageDetect.ts diff --git a/agent/systemPrompt.ts b/domain/systemPrompt.ts similarity index 99% rename from agent/systemPrompt.ts rename to domain/systemPrompt.ts index 010324d..ea6fab6 100644 --- a/agent/systemPrompt.ts +++ b/domain/systemPrompt.ts @@ -3,7 +3,7 @@ import type { DetectedLanguage } from "./languageDetect.js"; import { listProjectSkillManifests, type AgentSkillManifest, -} from "./skills/registry.js"; +} from "../tools/skills/registry.js"; export const DEFAULT_AGENT_SYSTEM_PROMPT = [ "You are helpful AI Assistant for Admin Panel.", diff --git a/agent/toolCallEvents.ts b/domain/toolCallEvents.ts similarity index 100% rename from agent/toolCallEvents.ts rename to domain/toolCallEvents.ts diff --git a/agent/turn/turnTypes.ts b/domain/turnTypes.ts similarity index 73% rename from agent/turn/turnTypes.ts rename to domain/turnTypes.ts index a260297..1febba4 100644 --- a/agent/turn/turnTypes.ts +++ b/domain/turnTypes.ts @@ -1,11 +1,11 @@ import type { AdminUser, AudioAdapter } from "adminforth"; import type { Messages } from "@langchain/langgraph"; import type { Command } from "@langchain/langgraph"; -import type { AgentChatModel, AgentMiddleware } from "../simpleAgent.js"; -import type { SequenceDebugCollector } from "../middleware/sequenceDebug.js"; -import type { PreviousUserMessage } from "../languageDetect.js"; +import type { AgentChatModel, AgentMiddleware } from "../llm/agentModels.js"; +import type { SequenceDebugCollector } from "../llm/middleware/sequenceDebug.js"; +import type { PreviousUserMessage } from "./languageDetect.js"; import type { CurrentPageContext } from "../tools/getUserLocation.js"; -import type { AgentEventEmitter } from "../../agentEvents.js"; +import type { AgentEventEmitter } from "./agentEvents.js"; export type BaseAgentTurnInput = { prompt: string; @@ -98,3 +98,22 @@ export type RunAndPersistAgentResponseResult = { export type HandleTurnInput = TextAgentTurnInput; export type HandleSpeechTurnInput = SpeechAgentTurnInput; + +/** + * Provider-agnostic message passed from the application layer to the LLM port. + * The infrastructure adapter maps it onto concrete LangChain messages. + */ +export type AgentMessage = { + role: "system" | "user"; + content: string; +}; + +/** + * Normalized unit of a streamed agent turn. The LLM port converts the raw + * LangGraph stream into these typed chunks so the application layer never + * touches LangGraph's tuple/`any` stream shape. + */ +export type AgentStreamChunk = + | { kind: "text"; delta: string } + | { kind: "reasoning"; delta: string } + | { kind: "interrupt"; interrupt: unknown }; diff --git a/agent/turn/VegaLiteStreamBuffer.ts b/domain/vegaLiteStreamBuffer.ts similarity index 97% rename from agent/turn/VegaLiteStreamBuffer.ts rename to domain/vegaLiteStreamBuffer.ts index 69ef2b3..8edbf5f 100644 --- a/agent/turn/VegaLiteStreamBuffer.ts +++ b/domain/vegaLiteStreamBuffer.ts @@ -1,4 +1,4 @@ -import type { AgentEventEmitter } from "../../agentEvents.js"; +import type { AgentEventEmitter } from "./agentEvents.js"; const VEGA_LITE_FENCE_START = "```vega-lite"; const FENCE_END = "```"; diff --git a/index.ts b/index.ts index 400e35c..aa6c1f8 100644 --- a/index.ts +++ b/index.ts @@ -4,31 +4,26 @@ import type { IHttpServer, } from "adminforth"; -import { AdminForthPlugin } from "adminforth"; +import { AdminForthPlugin, logger } from "adminforth"; import type { PluginOptions } from './types.js'; import { MemorySaver, type BaseCheckpointSaver } from "@langchain/langgraph"; -import { AdminForthCheckpointSaver } from "./agent/checkpointer.js"; -import { appendCustomSystemPrompt, buildAgentSystemPrompt, DEFAULT_AGENT_SYSTEM_PROMPT} from "./agent/systemPrompt.js"; -import { setupCoreEndpoints } from "./endpoints/core.js"; -import { setupSessionEndpoints } from "./endpoints/sessions.js"; -import { setupChatSurfaceEndpoints } from "./endpoints/chatSurfaces.js"; -import type { AgentEndpointsContext } from "./endpoints/context.js"; -import { AgentSessionStore } from "./sessionStore.js"; -import { ChatSurfaceService } from "./chatSurfaceService.js"; -import { AgentTurnService } from "./agentTurnService.js"; -import { AgentModelFactory } from "./agent/models/AgentModelFactory.js"; -import { AgentModeResolver } from "./agent/models/AgentModeResolver.js"; -import { AgentRuntime } from "./agent/runtime/AgentRuntime.js"; -import { SpeechTurnService } from "./agent/speech/SpeechTurnService.js"; -import { AgentToolProvider } from "./agent/tools/AgentToolProvider.js"; -import { TurnContextBuilder } from "./agent/turn/TurnContextBuilder.js"; -import { TurnLifecycleService } from "./agent/turn/TurnLifecycleService.js"; -import { TurnPersistenceService } from "./agent/turn/TurnPersistenceService.js"; -import { TurnPromptBuilder } from "./agent/turn/TurnPromptBuilder.js"; -import { TurnStreamConsumer } from "./agent/turn/TurnStreamConsumer.js"; +import { AdminForthCheckpointSaver } from "./persistence/checkpointStore.js"; +import { appendCustomSystemPrompt, buildAgentSystemPrompt, DEFAULT_AGENT_SYSTEM_PROMPT} from "./domain/systemPrompt.js"; +import { setupCoreEndpoints } from "./transport/http/coreEndpoints.js"; +import { setupSessionEndpoints } from "./transport/http/sessionEndpoints.js"; +import { setupChatSurfaceEndpoints } from "./transport/http/chatSurfaceEndpoints.js"; +import type { AgentEndpointsContext } from "./transport/http/context.js"; +import { AgentSessionStore } from "./persistence/sessionStore.js"; +import { ChatSurfaceService } from "./transport/surfaces/chatSurfaceService.js"; +import { RunTurnUseCase } from "./application/runTurnUseCase.js"; +import { LangGraphLlm } from "./llm/langGraphLlm.js"; +import { AgentModelFactory } from "./llm/modelFactory.js"; +import { AgentRuntime } from "./llm/agentRuntime.js"; +import { SpeechTurnService } from "./transport/surfaces/speechTurnService.js"; +import { AgentToolProvider } from "./tools/agentToolProvider.js"; -export type { AgentEvent, AgentEventEmitter } from "./agentEvents.js"; +export type { AgentEvent, AgentEventEmitter } from "./domain/agentEvents.js"; export default class AdminForthAgentPlugin extends AdminForthPlugin { options: PluginOptions; @@ -37,9 +32,10 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { private agentSystemPrompt: string | null = null; private checkpointer: BaseCheckpointSaver | null = null; private sessionStore: AgentSessionStore; - private agentTurnService: AgentTurnService; + private runTurnUseCase: RunTurnUseCase; private speechTurnService: SpeechTurnService; private chatSurfaceService: ChatSurfaceService; + private getCheckpointer() { if (this.checkpointer) return this.checkpointer; @@ -58,6 +54,22 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { ].filter((resourceId): resourceId is string => Boolean(resourceId)); } + private async getAgentSystemPrompt(): Promise { + if (!this.agentSystemPrompt) { + const systemPrompt = await buildAgentSystemPrompt( + this.adminforth, + this.getInternalAgentResourceIds(), + ).catch((err) => { + logger.error( + `Failed to build agent system prompt, falling back to default: ${err instanceof Error ? err.message : String(err)}`, + ); + return DEFAULT_AGENT_SYSTEM_PROMPT; + }); + this.agentSystemPrompt = appendCustomSystemPrompt(systemPrompt, this.options.systemPrompt); + } + return this.agentSystemPrompt; + } + constructor(options: PluginOptions) { super(options, import.meta.url); this.options = options; @@ -72,39 +84,26 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { getCheckpointer: this.getCheckpointer.bind(this), toolProvider, }); - const persistence = new TurnPersistenceService(() => this.adminforth, this.options); - this.agentTurnService = new AgentTurnService( - new TurnLifecycleService(this.sessionStore, persistence, this.options), - new TurnContextBuilder(() => this.adminforth), - new AgentModeResolver(this.options), - new AgentModelFactory(this.options.maxTokens ?? 1000), - new TurnPromptBuilder({ - getAdminforth: () => this.adminforth, - getAgentSystemPrompt: async () => { - if (!this.agentSystemPrompt) { - const systemPrompt = await buildAgentSystemPrompt( - this.adminforth, - this.getInternalAgentResourceIds(), - ).catch((err) => { - return DEFAULT_AGENT_SYSTEM_PROMPT; - }); - this.agentSystemPrompt = appendCustomSystemPrompt(systemPrompt, this.options.systemPrompt); - } - return this.agentSystemPrompt; - }, - }), + const llm = new LangGraphLlm( runtime, - new TurnStreamConsumer(), + new AgentModelFactory(this.options.maxTokens ?? 1000), ); + this.runTurnUseCase = new RunTurnUseCase({ + llm, + sessions: this.sessionStore, + modes: this.options.modes, + getAdminforth: () => this.adminforth, + getAgentSystemPrompt: this.getAgentSystemPrompt.bind(this), + }); this.speechTurnService = new SpeechTurnService( - this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), + this.runTurnUseCase.runAndPersistAgentResponse.bind(this.runTurnUseCase), ); this.chatSurfaceService = new ChatSurfaceService( () => this.adminforth, this.options, this.sessionStore, - this.agentTurnService.handleTurn.bind(this.agentTurnService), - this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), + this.runTurnUseCase.handleTurn.bind(this.runTurnUseCase), + this.runTurnUseCase.runAndPersistAgentResponse.bind(this.runTurnUseCase), ); this.agentSystemPromptPromise = Promise.resolve( appendCustomSystemPrompt(DEFAULT_AGENT_SYSTEM_PROMPT, this.options.systemPrompt), @@ -151,7 +150,7 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { throw new Error("sessionResource is required for AdminForthAgentPlugin"); } } - + validateConfigAfterDiscover(adminforth: IAdminForth, resourceConfig: AdminForthResource) { this.options.audioAdapter?.validate(); for (const chatSurfaceAdapter of this.options.chatSurfaceAdapters ?? []) { @@ -176,9 +175,9 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { const endpointContext = { adminforth: this.adminforth, options: this.options, - handleTurn: this.agentTurnService.handleTurn.bind(this.agentTurnService), + handleTurn: this.runTurnUseCase.handleTurn.bind(this.runTurnUseCase), handleSpeechTurn: this.speechTurnService.handle.bind(this.speechTurnService), - runAndPersistAgentResponse: this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), + runAndPersistAgentResponse: this.runTurnUseCase.runAndPersistAgentResponse.bind(this.runTurnUseCase), getSessionTurns: this.sessionStore.getSessionTurns.bind(this.sessionStore), createNewTurn: this.sessionStore.createNewTurn.bind(this.sessionStore), createSystemTurn: this.sessionStore.createSystemTurn.bind(this.sessionStore), diff --git a/agent/runtime/AgentContext.ts b/llm/agentContext.ts similarity index 80% rename from agent/runtime/AgentContext.ts rename to llm/agentContext.ts index 12d3d65..9747e62 100644 --- a/agent/runtime/AgentContext.ts +++ b/llm/agentContext.ts @@ -1,9 +1,9 @@ import type { AdminUser } from "adminforth"; import { z } from "zod"; -import type { AgentEventEmitter } from "../../agentEvents.js"; -import type { SequenceDebugCollector } from "../middleware/sequenceDebug.js"; +import type { AgentEventEmitter } from "../domain/agentEvents.js"; +import type { SequenceDebugCollector } from "./middleware/sequenceDebug.js"; import type { CurrentPageContext } from "../tools/getUserLocation.js"; -import type { AgentTurnContext } from "../turn/turnTypes.js"; +import type { AgentTurnContext } from "../domain/turnTypes.js"; export const contextSchema = z.object({ adminUser: z.custom(), diff --git a/agent/simpleAgent.ts b/llm/agentModels.ts similarity index 100% rename from agent/simpleAgent.ts rename to llm/agentModels.ts diff --git a/agent/runtime/AgentRuntime.ts b/llm/agentRuntime.ts similarity index 60% rename from agent/runtime/AgentRuntime.ts rename to llm/agentRuntime.ts index ef8a8f8..cb808a5 100644 --- a/agent/runtime/AgentRuntime.ts +++ b/llm/agentRuntime.ts @@ -1,13 +1,13 @@ import type { IAdminForth } from "adminforth"; import { createAgent, summarizationMiddleware, humanInTheLoopMiddleware } from "langchain"; import type { BaseCheckpointSaver } from "@langchain/langgraph"; -import { createApiBasedToolsMiddleware } from "../middleware/apiBasedTools.js"; -import { createSequenceDebugMiddleware } from "../middleware/sequenceDebug.js"; -import { createAgentLlmMetricsLogger } from "../simpleAgent.js"; -import type { AgentToolProvider } from "../tools/AgentToolProvider.js"; -import type { AgentRuntimeRunInput } from "../turn/turnTypes.js"; -import { contextSchema, toLangchainAgentContext } from "./AgentContext.js"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import { createApiBasedToolsMiddleware } from "./middleware/apiToolsMiddleware.js"; +import { createSequenceDebugMiddleware } from "./middleware/sequenceDebug.js"; +import { createAgentLlmMetricsLogger } from "./agentModels.js"; +import type { AgentToolProvider } from "../tools/agentToolProvider.js"; +import type { AgentRuntimeRunInput, AgentTurnModels } from "../domain/turnTypes.js"; +import { contextSchema, toLangchainAgentContext } from "./agentContext.js"; +import type { ApiBasedTool } from "../tools/apiBasedTools.js"; function createHumanInTheLoopInterrupts( apiBasedTools: Record, @@ -86,4 +86,36 @@ export class AgentRuntime { }), }); } + + /** + * Read the pending human-in-the-loop interrupts persisted for a thread from the + * checkpointer. Builds a minimal agent (model + checkpointer + HITL middleware) and + * queries its state — no model call is made. Returns raw LangGraph interrupt objects. + */ + async getPendingInterrupts(input: { + models: AgentTurnModels; + sessionId: string; + }): Promise { + const apiBasedTools = this.options.toolProvider.getApiBasedTools(); + const tools = await this.options.toolProvider.getTools(apiBasedTools); + const agent = createAgent({ + name: this.options.name, + model: input.models.model, + checkpointer: this.options.getCheckpointer(), + tools, + contextSchema, + middleware: [ + humanInTheLoopMiddleware({ + interruptOn: createHumanInTheLoopInterrupts(apiBasedTools), + descriptionPrefix: "Tool execution pending approval", + }), + ], + }); + + const state = await (agent as { getState: (config: unknown) => Promise }).getState({ + configurable: { thread_id: input.sessionId }, + }); + const tasks = Array.isArray(state?.tasks) ? state.tasks : []; + return tasks.flatMap((task: any) => (Array.isArray(task?.interrupts) ? task.interrupts : [])); + } } diff --git a/llm/langGraphLlm.ts b/llm/langGraphLlm.ts new file mode 100644 index 0000000..bfb2b5a --- /dev/null +++ b/llm/langGraphLlm.ts @@ -0,0 +1,66 @@ +import { Command } from "@langchain/langgraph"; +import { HumanMessage, SystemMessage } from "langchain"; +import type { AgentRuntime } from "./agentRuntime.js"; +import type { AgentModelFactory } from "./modelFactory.js"; +import { detectUserLanguage, type PreviousUserMessage } from "../domain/languageDetect.js"; +import type { AgentModeCompletionAdapter } from "./agentModels.js"; +import { adaptRawStream } from "./streamAdapter.js"; +import type { LlmPort, LlmStreamInput } from "../application/ports.js"; +import type { AgentMessage } from "../domain/turnTypes.js"; + +function toLangchainMessages(messages: AgentMessage[]) { + return messages.map((message) => + message.role === "system" + ? new SystemMessage(message.content) + : new HumanMessage(message.content), + ); +} + +/** + * LangChain/LangGraph implementation of {@link LlmPort}. This is the single + * place that knows about the agent graph, model construction, and the raw + * stream shape; the application layer talks only to the port. + */ +export class LangGraphLlm implements LlmPort { + constructor( + private readonly runtime: AgentRuntime, + private readonly modelFactory: AgentModelFactory, + ) {} + + async streamTurn(input: LlmStreamInput) { + const models = await this.modelFactory.create(input.completionAdapter); + const runtimeInput = + "resume" in input.input + ? new Command({ resume: input.input.resume }) + : { messages: toLangchainMessages(input.input.messages) }; + + const rawStream = await this.runtime.stream({ + models, + input: runtimeInput as any, + context: input.context, + observability: input.observability, + }); + + return adaptRawStream(rawStream as AsyncIterable); + } + + detectLanguage(input: { + completionAdapter: AgentModeCompletionAdapter; + prompt: string; + previousUserMessages: PreviousUserMessage[]; + }) { + return detectUserLanguage( + input.completionAdapter, + input.prompt, + input.previousUserMessages, + ); + } + + async getPendingInterrupts(input: { + completionAdapter: AgentModeCompletionAdapter; + sessionId: string; + }): Promise { + const models = await this.modelFactory.create(input.completionAdapter); + return this.runtime.getPendingInterrupts({ models, sessionId: input.sessionId }); + } +} diff --git a/agent/middleware/apiBasedTools.ts b/llm/middleware/apiToolsMiddleware.ts similarity index 94% rename from agent/middleware/apiBasedTools.ts rename to llm/middleware/apiToolsMiddleware.ts index 500282c..f342450 100644 --- a/agent/middleware/apiBasedTools.ts +++ b/llm/middleware/apiToolsMiddleware.ts @@ -5,16 +5,16 @@ import { logger, type AdminUser, type IAdminForth } from "adminforth"; import { formatApiBasedToolCall, type ApiBasedTool, -} from "../../apiBasedTools.js"; +} from "../../tools/apiBasedTools.js"; import { createToolCallTracker, type ToolCallEventSink, -} from "../toolCallEvents.js"; -import { ALWAYS_AVAILABLE_API_TOOL_NAMES } from "../tools/index.js"; -import { createApiTool } from "../tools/apiTool.js"; -import type { AgentEventEmitter } from "../../agentEvents.js"; +} from "../../domain/toolCallEvents.js"; +import { ALWAYS_AVAILABLE_API_TOOL_NAMES } from "../../tools/index.js"; +import { createApiTool } from "../../tools/apiTool.js"; +import type { AgentEventEmitter } from "../../domain/agentEvents.js"; import type { SequenceDebugCollector } from "./sequenceDebug.js"; -import { isAbortError } from "../../errors.js"; +import { isAbortError } from "../../shared/errors.js"; function getEnabledApiToolNames(messages: unknown[]) { const enabledToolNames = new Set(); diff --git a/agent/middleware/sequenceDebug.ts b/llm/middleware/sequenceDebug.ts similarity index 99% rename from agent/middleware/sequenceDebug.ts rename to llm/middleware/sequenceDebug.ts index f24939d..6294059 100644 --- a/agent/middleware/sequenceDebug.ts +++ b/llm/middleware/sequenceDebug.ts @@ -1,7 +1,7 @@ import { AIMessage } from "@langchain/core/messages"; import { createMiddleware } from "langchain"; import YAML from "yaml"; -import type { ToolCallEvent } from "../toolCallEvents.js"; +import type { ToolCallEvent } from "../../domain/toolCallEvents.js"; export type SequenceDebugResultType = "tool_calls" | "final_text"; diff --git a/agent/models/AgentModelFactory.ts b/llm/modelFactory.ts similarity index 85% rename from agent/models/AgentModelFactory.ts rename to llm/modelFactory.ts index 66beca8..eefc0fc 100644 --- a/agent/models/AgentModelFactory.ts +++ b/llm/modelFactory.ts @@ -1,6 +1,6 @@ import type { CompletionAdapter } from "adminforth"; -import { createAgentChatModel } from "../simpleAgent.js"; -import type { AgentTurnModels } from "../turn/turnTypes.js"; +import { createAgentChatModel } from "./agentModels.js"; +import type { AgentTurnModels } from "../domain/turnTypes.js"; export class AgentModelFactory { constructor(private readonly maxTokens: number) {} diff --git a/llm/streamAdapter.ts b/llm/streamAdapter.ts new file mode 100644 index 0000000..dd5327a --- /dev/null +++ b/llm/streamAdapter.ts @@ -0,0 +1,77 @@ +import type { AgentStreamChunk } from "../domain/turnTypes.js"; + +// Only tokens emitted by the model node(s) are surfaced to the user; other +// graph nodes (tools, summarization, etc.) are internal. +const STREAMABLE_NODES = ["model", "model_request"]; + +type RawStreamEntry = + | ["messages", [any, any]] + | ["updates", Record]; + +function extractBlocks(token: any): any[] { + if (Array.isArray(token?.contentBlocks)) { + return token.contentBlocks; + } + if (Array.isArray(token?.content)) { + return token.content; + } + return []; +} + +/** + * Convert one raw LangGraph stream entry into zero or more normalized chunks. + * Pure and synchronous so the parsing rules can be unit-tested in isolation. + */ +export function parseRawStreamChunk(entry: RawStreamEntry): AgentStreamChunk[] { + const [mode, chunk] = entry; + + if (mode === "updates") { + if (chunk && typeof chunk === "object" && "__interrupt__" in chunk) { + return [{ kind: "interrupt", interrupt: (chunk as Record).__interrupt__ }]; + } + return []; + } + + const [token, metadata] = chunk as [any, any]; + const nodeName = + typeof metadata?.langgraph_node === "string" ? metadata.langgraph_node : ""; + + if (nodeName && !STREAMABLE_NODES.includes(nodeName)) { + return []; + } + + const blocks = extractBlocks(token); + const chunks: AgentStreamChunk[] = []; + + const reasoning = blocks + .filter((block: any) => block?.type === "reasoning") + .map((block: any) => String(block.reasoning ?? "")) + .join(""); + const text = blocks + .filter((block: any) => block?.type === "text") + .map((block: any) => String(block.text ?? "")) + .join(""); + + if (reasoning) { + chunks.push({ kind: "reasoning", delta: reasoning }); + } + if (text) { + chunks.push({ kind: "text", delta: text }); + } + + return chunks; +} + +/** + * Adapt the raw LangGraph `["messages" | "updates", ...]` stream into a typed + * `AgentStreamChunk` stream for the application layer. + */ +export async function* adaptRawStream( + raw: AsyncIterable, +): AsyncGenerator { + for await (const entry of raw) { + for (const chunk of parseRawStreamChunk(entry)) { + yield chunk; + } + } +} diff --git a/agent/checkpointer.ts b/persistence/checkpointStore.ts similarity index 100% rename from agent/checkpointer.ts rename to persistence/checkpointStore.ts diff --git a/sessionStore.ts b/persistence/sessionStore.ts similarity index 73% rename from sessionStore.ts rename to persistence/sessionStore.ts index dc16784..54aa069 100644 --- a/sessionStore.ts +++ b/persistence/sessionStore.ts @@ -2,8 +2,8 @@ import type { AdminUser, IAdminForth } from "adminforth"; import { Filters, Sorts } from "adminforth"; import { randomUUID } from "crypto"; import type { ChatSurfaceIncomingMessage } from "adminforth"; -import type { PreviousUserMessage } from "./agent/languageDetect.js"; -import type { PluginOptions } from "./types.js"; +import type { PreviousUserMessage } from "../domain/languageDetect.js"; +import type { PluginOptions } from "../types.js"; export const AGENT_SYSTEM_TURN_PROMPT = "__adminforth_system_message__"; @@ -102,4 +102,43 @@ export class AgentSessionStore { return sessionId; } + + async touchSession(sessionId: string) { + await this.getAdminforth().resource(this.options.sessionResource.resourceId).update(sessionId, { + [this.options.sessionResource.createdAtField]: new Date().toISOString(), + }); + } + + async saveTurnResponse(input: { + turnId: string; + responseText: string; + debugHistory?: unknown; + }) { + const turnUpdates: Record = { + [this.options.turnResource.responseField]: input.responseText, + }; + + if (this.options.turnResource.debugField) { + turnUpdates[this.options.turnResource.debugField] = input.debugHistory; + } + + await this.getAdminforth() + .resource(this.options.turnResource.resourceId) + .update(input.turnId, turnUpdates); + } + + async getResumeState(sessionId: string): Promise<{ turnId: string; initialResponse: string }> { + const latestTurn = await this.getLatestTurn(sessionId); + + if (!latestTurn) { + throw new Error(`No agent turn found for session "${sessionId}".`); + } + + const response = latestTurn[this.options.turnResource.responseField]; + + return { + turnId: latestTurn[this.options.turnResource.idField], + initialResponse: response === "not_finished" ? "" : String(response), + }; + } } diff --git a/errors.ts b/shared/errors.ts similarity index 100% rename from errors.ts rename to shared/errors.ts diff --git a/sanitizeSpeechText.ts b/shared/sanitizeSpeechText.ts similarity index 100% rename from sanitizeSpeechText.ts rename to shared/sanitizeSpeechText.ts diff --git a/tests/session_store.test.ts b/tests/session_store.test.ts index a3a37ce..fe3ab81 100644 --- a/tests/session_store.test.ts +++ b/tests/session_store.test.ts @@ -1,7 +1,7 @@ import { AgentSessionStore, AGENT_SYSTEM_TURN_PROMPT, -} from '../sessionStore.js'; +} from '../persistence/sessionStore.js'; // Characterization tests for session/turn persistence. Uses a fake AdminForth resource // so we freeze the field mapping and the read/transform logic (previous-message windowing, diff --git a/tests/sse_emitter.test.ts b/tests/sse_emitter.test.ts index 4b47d05..d5db630 100644 --- a/tests/sse_emitter.test.ts +++ b/tests/sse_emitter.test.ts @@ -1,4 +1,4 @@ -import { createSseEventEmitter } from '../surfaces/web-sse/createSseEventEmitter.js'; +import { createSseEventEmitter } from '../transport/sse/sseWriter.js'; // Characterization tests for the outbound SSE wire contract. This is the plugin's // external streaming protocol (consumed by the Vercel-AI-UI frontend on /agent/response diff --git a/tests/stream_adapter.test.ts b/tests/stream_adapter.test.ts new file mode 100644 index 0000000..cc7b004 --- /dev/null +++ b/tests/stream_adapter.test.ts @@ -0,0 +1,59 @@ +import { parseRawStreamChunk } from '../llm/streamAdapter.js'; + +// Characterization tests for the raw LangGraph -> typed AgentStreamChunk conversion +// (the parsing that previously lived inside TurnStreamConsumer). + +describe('parseRawStreamChunk', () => { + it('extracts text deltas from model-node messages', () => { + const out = parseRawStreamChunk([ + 'messages', + [{ content: [{ type: 'text', text: 'Hi' }] }, { langgraph_node: 'model' }], + ]); + expect(out).toEqual([{ kind: 'text', delta: 'Hi' }]); + }); + + it('emits reasoning before text within a single chunk', () => { + const out = parseRawStreamChunk([ + 'messages', + [ + { content: [{ type: 'reasoning', reasoning: 'r' }, { type: 'text', text: 't' }] }, + { langgraph_node: 'model' }, + ], + ]); + expect(out).toEqual([ + { kind: 'reasoning', delta: 'r' }, + { kind: 'text', delta: 't' }, + ]); + }); + + it('reads token.contentBlocks as well as token.content', () => { + const out = parseRawStreamChunk([ + 'messages', + [{ contentBlocks: [{ type: 'text', text: 'B' }] }, { langgraph_node: 'model_request' }], + ]); + expect(out).toEqual([{ kind: 'text', delta: 'B' }]); + }); + + it('drops tokens emitted by non-model nodes', () => { + const out = parseRawStreamChunk([ + 'messages', + [{ content: [{ type: 'text', text: 'x' }] }, { langgraph_node: 'tools' }], + ]); + expect(out).toEqual([]); + }); + + it('surfaces interrupts from update entries', () => { + const out = parseRawStreamChunk(['updates', { __interrupt__: [{ id: 'i1' }] }]); + expect(out).toEqual([{ kind: 'interrupt', interrupt: [{ id: 'i1' }] }]); + }); + + it('ignores non-interrupt update entries', () => { + expect(parseRawStreamChunk(['updates', { someNode: {} }])).toEqual([]); + }); + + it('returns nothing for empty content', () => { + expect( + parseRawStreamChunk(['messages', [{ content: [] }, { langgraph_node: 'model' }]]), + ).toEqual([]); + }); +}); diff --git a/tests/system_prompt.test.ts b/tests/system_prompt.test.ts index 8b3102a..2896013 100644 --- a/tests/system_prompt.test.ts +++ b/tests/system_prompt.test.ts @@ -3,7 +3,7 @@ import { appendCustomSystemPrompt, buildAgentTurnSystemPrompt, buildAgentSystemPrompt, -} from '../agent/systemPrompt.js'; +} from '../domain/systemPrompt.js'; // Characterization tests for system-prompt assembly (base prompt, per-turn additions, // and the resource/skill catalog prompt). Freezes the composed structure and the diff --git a/tests/turn_flow.test.ts b/tests/turn_flow.test.ts index 3242ae3..e005261 100644 --- a/tests/turn_flow.test.ts +++ b/tests/turn_flow.test.ts @@ -1,98 +1,92 @@ -import { AgentTurnService } from '../agentTurnService.js'; -import { TurnStreamConsumer } from '../agent/turn/TurnStreamConsumer.js'; +import { RunTurnUseCase, buildResumeValue } from '../application/runTurnUseCase.js'; +import type { AgentStreamChunk } from '../domain/turnTypes.js'; // Characterization tests for the turn-orchestration flow (the layer between the HTTP -// endpoint and the LLM). We drive the REAL AgentTurnService + REAL TurnStreamConsumer, -// injecting fakes only for the outer collaborators (lifecycle / context / mode / model / -// prompt / runtime). The fake runtime yields a scripted LangGraph-shaped stream, so we -// freeze the emitted AgentEvent sequence, persistence, error/abort handling, and the -// HITL approve→resume path WITHOUT spinning up a real LLM. +// endpoint and the LLM). We drive the REAL RunTurnUseCase, faking only the two true +// boundaries: the LlmPort (a scripted typed stream) and the session repository. This +// freezes the emitted AgentEvent sequence, persistence, error/abort handling, and the +// HITL approve -> resume path without a real LLM or database. -async function* streamOf(...chunks: any[]) { +async function* streamOf(...chunks: AgentStreamChunk[]) { for (const chunk of chunks) { yield chunk; } } -// Shapes the real TurnStreamConsumer expects on the ['messages', ...] / ['updates', ...] stream. -function textChunk(text: string) { - return ['messages', [{ content: [{ type: 'text', text }] }, { langgraph_node: 'model' }]]; -} -function reasoningChunk(text: string) { - return ['messages', [{ content: [{ type: 'reasoning', reasoning: text }] }, { langgraph_node: 'model' }]]; -} -function interruptChunk(interrupts: unknown) { - return ['updates', { __interrupt__: interrupts }]; -} +const text = (delta: string): AgentStreamChunk => ({ kind: 'text', delta }); +const reasoning = (delta: string): AgentStreamChunk => ({ kind: 'reasoning', delta }); +const interrupt = (value: unknown): AgentStreamChunk => ({ kind: 'interrupt', interrupt: value }); -type BuildOpts = { - streamFor?: (callIndex: number, input: any) => AsyncIterable; - resumeInitialResponse?: string; -}; - -function buildService(opts: BuildOpts = {}) { - const runtimeCalls: any[] = []; - const finishCalls: any[] = []; - const startCalls: any[] = []; - const resumeCalls: any[] = []; - - const lifecycle = { - async start(input: any) { - startCalls.push(input); - return { turnId: 'turn-1', previousUserMessages: [] }; +function fakeLlm( + opts: { + streamFor?: (call: number, input: any) => AsyncIterable; + pendingInterrupts?: unknown[]; + } = {}, +) { + const calls: any[] = []; + const getPendingInterruptsCalls: any[] = []; + return { + calls, + getPendingInterruptsCalls, + async streamTurn(input: any) { + calls.push(input); + const factory = opts.streamFor ?? (() => streamOf()); + return factory(calls.length, input); }, - async resume(input: any) { - resumeCalls.push(input); - return { - turnId: 'turn-1', - previousUserMessages: [], - initialResponse: opts.resumeInitialResponse ?? '', - }; + async detectLanguage() { + return null; }, - async finish(payload: any) { - finishCalls.push(payload); + async getPendingInterrupts(input: any) { + getPendingInterruptsCalls.push(input); + return opts.pendingInterrupts ?? []; }, }; +} - const contextBuilder = { - async build({ base, turnId }: any) { - return { - adminUser: base.adminUser, - sessionId: base.sessionId, - turnId, - abortSignal: base.abortSignal, - userTimeZone: base.userTimeZone ?? 'UTC', - }; - }, +function fakeSessions(opts: { initialResponse?: string } = {}) { + const calls = { + createNewTurn: [] as any[], + touchSession: [] as string[], + saveTurnResponse: [] as any[], + getResumeState: 0, }; - - const modeResolver = { resolve: () => ({ name: 'default', completionAdapter: {} }) }; - const modelFactory = { - async create() { - return { model: {}, summaryModel: {}, modelMiddleware: [] }; + return { + calls, + async getPreviousUserMessages() { + return []; }, - }; - const promptBuilder = { async build() { return []; } }; - - const runtime = { - async stream(input: any) { - runtimeCalls.push(input); - const factory = opts.streamFor ?? (() => streamOf()); - return factory(runtimeCalls.length, input); + async createNewTurn(sessionId: string, prompt: string) { + calls.createNewTurn.push({ sessionId, prompt }); + return 'turn-1'; + }, + async touchSession(sessionId: string) { + calls.touchSession.push(sessionId); + }, + async getResumeState() { + calls.getResumeState += 1; + return { turnId: 'turn-1', initialResponse: opts.initialResponse ?? '' }; + }, + async saveTurnResponse(payload: any) { + calls.saveTurnResponse.push(payload); }, }; +} - const service = new AgentTurnService( - lifecycle as any, - contextBuilder as any, - modeResolver as any, - modelFactory as any, - promptBuilder as any, - runtime as any, - new TurnStreamConsumer() as any, - ); - - return { service, runtimeCalls, finishCalls, startCalls, resumeCalls }; +function buildUseCase(opts: { + streamFor?: (call: number, input: any) => AsyncIterable; + initialResponse?: string; + pendingInterrupts?: unknown[]; +} = {}) { + const llm = fakeLlm({ streamFor: opts.streamFor, pendingInterrupts: opts.pendingInterrupts }); + const sessions = fakeSessions({ initialResponse: opts.initialResponse }); + const useCase = new RunTurnUseCase({ + llm: llm as any, + sessions: sessions as any, + modes: [{ name: 'default', completionAdapter: {} as any }], + getAdminforth: () => ({ config: { auth: { usernameField: 'email' }, baseUrlSlashed: '/admin/' } }) as any, + getAgentSystemPrompt: async () => 'SYS', + }); + return { useCase, llm, sessions }; } function makeInput(overrides: Record = {}) { @@ -113,14 +107,14 @@ function makeInput(overrides: Record = {}) { return { input, events }; } -describe('adminforth-agent turn flow (AgentTurnService.handleTurn)', () => { +describe('RunTurnUseCase.handleTurn', () => { it('streams a text turn, emits response/finish, and persists the assembled text', async () => { - const { service, finishCalls, runtimeCalls } = buildService({ - streamFor: () => streamOf(textChunk('Hello'), textChunk(' world')), + const { useCase, llm, sessions } = buildUseCase({ + streamFor: () => streamOf(text('Hello'), text(' world')), }); const { input, events } = makeInput(); - const result = await service.handleTurn(input as any); + const result = await useCase.handleTurn(input as any); expect(events.map((e) => e.type)).toEqual([ 'turn-started', @@ -136,97 +130,136 @@ describe('adminforth-agent turn flow (AgentTurnService.handleTurn)', () => { turnId: 'turn-1', }); expect(result).toMatchObject({ text: 'Hello world', turnId: 'turn-1', aborted: false, failed: false }); - expect(finishCalls[0]).toMatchObject({ turnId: 'turn-1', responseText: 'Hello world' }); - expect(runtimeCalls).toHaveLength(1); - expect('messages' in runtimeCalls[0].input).toBe(true); + expect(sessions.calls.saveTurnResponse[0]).toMatchObject({ turnId: 'turn-1', responseText: 'Hello world' }); + expect(sessions.calls.touchSession).toEqual(['s1']); + expect(llm.calls).toHaveLength(1); + expect('messages' in llm.calls[0].input).toBe(true); }); it('emits reasoning deltas but excludes reasoning from the persisted response', async () => { - const { service, finishCalls } = buildService({ - streamFor: () => streamOf(reasoningChunk('thinking...'), textChunk('Answer')), + const { useCase, sessions } = buildUseCase({ + streamFor: () => streamOf(reasoning('thinking...'), text('Answer')), }); const { input, events } = makeInput(); - await service.handleTurn(input as any); + await useCase.handleTurn(input as any); expect(events.some((e) => e.type === 'reasoning-delta' && e.delta === 'thinking...')).toBe(true); expect(events.find((e) => e.type === 'response').text).toBe('Answer'); - expect(finishCalls[0].responseText).toBe('Answer'); + expect(sessions.calls.saveTurnResponse[0].responseText).toBe('Answer'); }); it('turns an LLM failure into an error event and persists the message as the response (current behavior)', async () => { - const { service, finishCalls } = buildService({ + const { useCase, sessions } = buildUseCase({ streamFor: () => { throw new Error('llm exploded'); }, }); const { input, events } = makeInput(); - const result = await service.handleTurn(input as any); + const result = await useCase.handleTurn(input as any); expect(events.map((e) => e.type)).toEqual(['turn-started', 'error', 'finish']); expect(result.failed).toBe(true); expect(result.aborted).toBe(false); expect(result.text).toContain('llm exploded'); expect(events.find((e) => e.type === 'error').error).toContain('llm exploded'); - expect(finishCalls[0].responseText).toContain('llm exploded'); + expect(sessions.calls.saveTurnResponse[0].responseText).toContain('llm exploded'); }); it('handles a client abort without emitting response/error and still finalizes the turn', async () => { const controller = new AbortController(); controller.abort(); - const { service, finishCalls } = buildService({ - streamFor: () => streamOf(textChunk('partial')), + const { useCase, sessions } = buildUseCase({ + streamFor: () => streamOf(text('partial')), }); const { input, events } = makeInput({ abortSignal: controller.signal }); - const result = await service.handleTurn(input as any); + const result = await useCase.handleTurn(input as any); expect(result.aborted).toBe(true); expect(result.failed).toBe(false); expect(events.map((e) => e.type)).toEqual(['turn-started', 'finish']); - expect(finishCalls).toHaveLength(1); + expect(sessions.calls.saveTurnResponse).toHaveLength(1); }); - it('emits a HITL interrupt and resumes via a langgraph Command on approval', async () => { - const { service, runtimeCalls, resumeCalls } = buildService({ - resumeInitialResponse: 'prev ', - streamFor: (callIndex) => - callIndex === 1 + it('emits a HITL interrupt and resumes via a provider-agnostic resume payload on approval', async () => { + const { useCase, llm, sessions } = buildUseCase({ + initialResponse: 'prev ', + streamFor: (call) => + call === 1 ? streamOf( - interruptChunk([ + interrupt([ { id: 'int-1', value: { actionRequests: [{ name: 'delete_record', description: 'Delete row' }] } }, ]), ) - : streamOf(textChunk('Done')), + : streamOf(text('Done')), }); const first = makeInput(); - await service.handleTurn(first.input as any); + await useCase.handleTurn(first.input as any); expect(first.events.map((e) => e.type)).toEqual(['turn-started', 'interrupt', 'response', 'finish']); expect(first.events.find((e) => e.type === 'interrupt').sessionId).toBe('s1'); const second = makeInput({ prompt: '', approvalDecision: 'approve' }); - const result = await service.handleTurn(second.input as any); + const result = await useCase.handleTurn(second.input as any); - expect(resumeCalls).toHaveLength(1); - expect(runtimeCalls).toHaveLength(2); - const resumeInput = runtimeCalls[1].input; - expect('messages' in resumeInput).toBe(false); - expect(resumeInput.constructor.name).toBe('Command'); + expect(sessions.calls.getResumeState).toBe(1); + expect(llm.calls).toHaveLength(2); + expect('messages' in llm.calls[1].input).toBe(false); + expect('resume' in llm.calls[1].input).toBe(true); // initialResponse ('prev ') is prepended to the resumed streamed text. expect(result.text).toBe('prev Done'); expect(second.events.find((e) => e.type === 'response').text).toBe('prev Done'); }); + it('rebuilds pending interrupts from persisted state when the in-process cache is empty (restart)', async () => { + const { useCase, llm } = buildUseCase({ + initialResponse: 'prev ', + pendingInterrupts: [{ id: 'int-1', value: { actionRequests: [{ name: 'del', description: 'd' }] } }], + streamFor: () => streamOf(text('Resumed')), + }); + // No interrupt was cached in this process (simulating a restart); approval must still resume. + const { input } = makeInput({ prompt: '', approvalDecision: 'approve' }); + const result = await useCase.handleTurn(input as any); + + expect(llm.getPendingInterruptsCalls).toHaveLength(1); + expect(llm.calls).toHaveLength(1); + expect('resume' in llm.calls[0].input).toBe(true); + expect(result.text).toBe('prev Resumed'); + }); + it('rejects (does not swallow) an approval with no pending interrupt, after emitting turn-started', async () => { - const { service } = buildService(); + const { useCase } = buildUseCase(); const { input, events } = makeInput({ prompt: '', approvalDecision: 'approve' }); // prepareTurn() runs BEFORE the try/catch in runAndPersistAgentResponse, so — unlike an // LLM failure during streaming — this error propagates out of handleTurn instead of // becoming a `failed` result. turn-started has already been emitted by then. - await expect(service.handleTurn(input as any)).rejects.toThrow('No pending approval interrupt'); + await expect(useCase.handleTurn(input as any)).rejects.toThrow('No pending approval interrupt'); expect(events.map((e) => e.type)).toEqual(['turn-started']); }); }); + +describe('buildResumeValue', () => { + it('fans a single approval across the recorded interrupt count', () => { + expect(buildResumeValue({ decision: 'approve', interrupts: [{ id: 'a', count: 2 }] })).toEqual({ + decisions: [{ type: 'approve' }, { type: 'approve' }], + }); + }); + + it('keys resume payloads by interrupt id when there are multiple', () => { + const value = buildResumeValue({ + decision: 'reject', + interrupts: [{ id: 'a', count: 1 }, { id: 'b', count: 1 }], + prompt: 'do this instead', + }) as Record; + expect(Object.keys(value)).toEqual(['a', 'b']); + expect(value.a.decisions[0]).toMatchObject({ type: 'reject' }); + expect(value.a.decisions[0].message).toContain('do this instead'); + }); + + it('throws when there is no interrupt to resume', () => { + expect(() => buildResumeValue({ decision: 'approve', interrupts: [] })).toThrow('No pending approval interrupt'); + }); +}); diff --git a/tests/units.test.ts b/tests/units.test.ts index dcfbed3..d4d0c15 100644 --- a/tests/units.test.ts +++ b/tests/units.test.ts @@ -1,9 +1,8 @@ -import { VegaLiteStreamBuffer } from '../agent/turn/VegaLiteStreamBuffer.js'; -import { isAbortError, getErrorMessage } from '../errors.js'; -import { sanitizeSpeechText } from '../sanitizeSpeechText.js'; -import { AgentModeResolver } from '../agent/models/AgentModeResolver.js'; -import { detectUserLanguage } from '../agent/languageDetect.js'; -import { createToolCallTracker } from '../agent/toolCallEvents.js'; +import { VegaLiteStreamBuffer } from '../domain/vegaLiteStreamBuffer.js'; +import { isAbortError, getErrorMessage } from '../shared/errors.js'; +import { sanitizeSpeechText } from '../shared/sanitizeSpeechText.js'; +import { detectUserLanguage } from '../domain/languageDetect.js'; +import { createToolCallTracker } from '../domain/toolCallEvents.js'; // Characterization tests for the small, deterministic building blocks used across the // main flows: the vega-lite streaming buffer, error helpers, speech-text sanitizer, @@ -91,26 +90,6 @@ describe('sanitizeSpeechText', () => { }); }); -describe('AgentModeResolver', () => { - const options = { - modes: [ - { name: 'A', completionAdapter: {} }, - { name: 'B', completionAdapter: {} }, - ], - } as any; - - it('resolves a mode by name', () => { - expect(new AgentModeResolver(options).resolve('B').name).toBe('B'); - }); - - it('falls back to the first mode for unknown / missing names', () => { - const resolver = new AgentModeResolver(options); - expect(resolver.resolve('nope').name).toBe('A'); - expect(resolver.resolve(undefined).name).toBe('A'); - expect(resolver.resolve(null).name).toBe('A'); - }); -}); - describe('detectUserLanguage', () => { it('parses the structured completion output', async () => { let captured: any; diff --git a/agent/tools/AgentToolProvider.ts b/tools/agentToolProvider.ts similarity index 86% rename from agent/tools/AgentToolProvider.ts rename to tools/agentToolProvider.ts index e15f3d8..ec62140 100644 --- a/agent/tools/AgentToolProvider.ts +++ b/tools/agentToolProvider.ts @@ -1,6 +1,6 @@ import type { IAdminForth } from "adminforth"; -import { prepareApiBasedTools } from "../../apiBasedTools.js"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import { prepareApiBasedTools } from "./apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; import { createAgentTools } from "./index.js"; export class AgentToolProvider { diff --git a/apiBasedTools.ts b/tools/apiBasedTools.ts similarity index 100% rename from apiBasedTools.ts rename to tools/apiBasedTools.ts diff --git a/agent/tools/apiTool.ts b/tools/apiTool.ts similarity index 96% rename from agent/tools/apiTool.ts rename to tools/apiTool.ts index 4a97af5..5e57e01 100644 --- a/agent/tools/apiTool.ts +++ b/tools/apiTool.ts @@ -1,5 +1,5 @@ import { tool } from "langchain"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; const emptyToolSchema = { type: "object", diff --git a/agent/tools/fetchSkill.ts b/tools/fetchSkill.ts similarity index 98% rename from agent/tools/fetchSkill.ts rename to tools/fetchSkill.ts index 43d6dbf..4f7fa28 100644 --- a/agent/tools/fetchSkill.ts +++ b/tools/fetchSkill.ts @@ -4,7 +4,7 @@ import { listSkillManifests, loadSkillMarkdown, type AgentSkillManifest, -} from "../skills/registry.js"; +} from "./skills/registry.js"; const fetchSkillSchema = z.object({ skillName: z diff --git a/agent/tools/fetchToolSchema.ts b/tools/fetchToolSchema.ts similarity index 94% rename from agent/tools/fetchToolSchema.ts rename to tools/fetchToolSchema.ts index 97cca79..72c031f 100644 --- a/agent/tools/fetchToolSchema.ts +++ b/tools/fetchToolSchema.ts @@ -1,6 +1,6 @@ import { tool } from "langchain"; import { z } from "zod"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; const fetchToolSchemaSchema = z.object({ toolName: z diff --git a/agent/tools/getUserLocation.ts b/tools/getUserLocation.ts similarity index 100% rename from agent/tools/getUserLocation.ts rename to tools/getUserLocation.ts diff --git a/agent/tools/index.ts b/tools/index.ts similarity index 94% rename from agent/tools/index.ts rename to tools/index.ts index efac313..0b2e894 100644 --- a/agent/tools/index.ts +++ b/tools/index.ts @@ -1,7 +1,7 @@ import type { ClientTool } from "@langchain/core/tools"; import { createFetchSkillTool } from "./fetchSkill.js"; import { createFetchToolSchemaTool } from "./fetchToolSchema.js"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; import { createApiTool } from "./apiTool.js"; import { createGetUserLocationTool } from "./getUserLocation.js"; import { createNavigateUserTool } from "./navigateUser.js"; diff --git a/agent/tools/navigateUser.ts b/tools/navigateUser.ts similarity index 98% rename from agent/tools/navigateUser.ts rename to tools/navigateUser.ts index da4da55..8960fd2 100644 --- a/agent/tools/navigateUser.ts +++ b/tools/navigateUser.ts @@ -1,7 +1,7 @@ import { tool } from "langchain"; import { z } from "zod"; import type { CurrentPageContext } from "./getUserLocation.js"; -import type { AgentEventEmitter } from "../../agentEvents.js"; +import type { AgentEventEmitter } from "../domain/agentEvents.js"; const filterSchema = z.object({ column: z.string().min(1).describe("Resource column name."), diff --git a/agent/skills/registry.ts b/tools/skills/registry.ts similarity index 100% rename from agent/skills/registry.ts rename to tools/skills/registry.ts diff --git a/endpoints/chatSurfaces.ts b/transport/http/chatSurfaceEndpoints.ts similarity index 100% rename from endpoints/chatSurfaces.ts rename to transport/http/chatSurfaceEndpoints.ts diff --git a/endpoints/context.ts b/transport/http/context.ts similarity index 94% rename from endpoints/context.ts rename to transport/http/context.ts index 95ba626..6d5d69c 100644 --- a/endpoints/context.ts +++ b/transport/http/context.ts @@ -9,8 +9,8 @@ import type { HandleTurnInput, RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, -} from "../agentTurnService.js"; -import type { PluginOptions } from "../types.js"; +} from "../../domain/turnTypes.js"; +import type { PluginOptions } from "../../types.js"; export type SessionTurn = { prompt: string; diff --git a/endpoints/core.ts b/transport/http/coreEndpoints.ts similarity index 94% rename from endpoints/core.ts rename to transport/http/coreEndpoints.ts index ec4f466..ae7724b 100644 --- a/endpoints/core.ts +++ b/transport/http/coreEndpoints.ts @@ -1,7 +1,7 @@ import type { IHttpServer } from "adminforth"; import { z } from "zod"; -import { createSseEventEmitter } from "../surfaces/web-sse/createSseEventEmitter.js"; -import type { CurrentPageContext } from "../agent/tools/getUserLocation.js"; +import { createSseEventEmitter } from "../sse/sseWriter.js"; +import type { CurrentPageContext } from "../../tools/getUserLocation.js"; import type { CoreEndpointsContext } from "./context.js"; type MulterFile = { @@ -86,7 +86,7 @@ export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServe method: 'POST', path: `/agent/response`, request_schema: agentResponseBodySchema, - handler: async ({ body, adminUser, response, _raw_express_res, abortSignal }) => { + handler: async ({ body, adminUser, _raw_express_res, abortSignal }) => { const data = body as z.infer; const emit = createSseEventEmitter(_raw_express_res, { vercelAiUiMessageStream: true, @@ -113,7 +113,7 @@ export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServe method: 'POST', path: `/agent/approval`, request_schema: agentApprovalBodySchema, - handler: async ({ body, adminUser, response, _raw_express_res, abortSignal }) => { + handler: async ({ body, adminUser, _raw_express_res, abortSignal }) => { const data = body as z.infer; const emit = createSseEventEmitter(_raw_express_res, { vercelAiUiMessageStream: true, diff --git a/endpoints/sessions.ts b/transport/http/sessionEndpoints.ts similarity index 94% rename from endpoints/sessions.ts rename to transport/http/sessionEndpoints.ts index cda48b9..4e0d433 100644 --- a/endpoints/sessions.ts +++ b/transport/http/sessionEndpoints.ts @@ -2,7 +2,7 @@ import type { IHttpServer } from "adminforth"; import { Filters, Sorts } from "adminforth"; import { randomUUID } from "crypto"; import { z } from "zod"; -import { AGENT_SYSTEM_TURN_PROMPT } from "../sessionStore.js"; +import { AGENT_SYSTEM_TURN_PROMPT } from "../../persistence/sessionStore.js"; import type { SessionEndpointsContext } from "./context.js"; const addSystemMessageBodySchema = z.object({ @@ -56,11 +56,13 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt [Filters.EQ(ctx.options.sessionResource.idField, sessionId)] ); if (!session) { + response.setStatus(404, 'Session not found'); return { error: 'Session not found' }; } if (session[ctx.options.sessionResource.askerIdField] !== userId) { + response.setStatus(403, 'Unauthorized'); return { error: 'Unauthorized' }; @@ -135,11 +137,13 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt [Filters.EQ(ctx.options.sessionResource.idField, sessionId)] ); if (!session) { + response.setStatus(404, 'Session not found'); return { error: 'Session not found' }; } if (session[ctx.options.sessionResource.askerIdField] !== userId) { + response.setStatus(403, 'Unauthorized'); return { error: 'Unauthorized' }; @@ -165,11 +169,13 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt [Filters.EQ(ctx.options.sessionResource.idField, data.sessionId)] ); if (!session) { + response.setStatus(404, 'Session not found'); return { error: 'Session not found' }; } if (session[ctx.options.sessionResource.askerIdField] !== adminUser!.pk) { + response.setStatus(403, 'Unauthorized'); return { error: 'Unauthorized' }; diff --git a/surfaces/web-sse/createSseEventEmitter.ts b/transport/sse/sseWriter.ts similarity index 86% rename from surfaces/web-sse/createSseEventEmitter.ts rename to transport/sse/sseWriter.ts index 9205c30..d879837 100644 --- a/surfaces/web-sse/createSseEventEmitter.ts +++ b/transport/sse/sseWriter.ts @@ -1,7 +1,7 @@ import { randomUUID } from "crypto"; -import type { AgentEvent, AgentEventEmitter } from "../../agentEvents.js"; -import type { ToolCallEvent } from "../../agent/toolCallEvents.js"; +import type { AgentEvent, AgentEventEmitter } from "../../domain/agentEvents.js"; +import type { ToolCallEvent } from "../../domain/toolCallEvents.js"; type AgentEventStreamResponse = { writeHead: (statusCode: number, headers: Record) => void; @@ -247,6 +247,21 @@ function createAgentEventStream( return stream; } +/** + * Maps the internal {@link AgentEvent} vocabulary onto the two Server-Sent-Events + * wire dialects the frontend consumes. Both dialects are HARD external contracts — + * do not change frame names/shapes without updating the corresponding consumer: + * + * - `vercelAiUiMessageStream: true` (used by `/agent/response` and `/agent/approval`): + * the Vercel AI SDK message-stream v1 format (`start`, `text-start`/`text-delta`/ + * `text-end`, `reasoning-*`, `data-*` parts, `error` with `errorText`, `finish`, + * `[DONE]`). Consumed by `DefaultChatTransport` and the manual approval parser in + * `custom/composables/agentStore/useAgentChat.ts`. + * - default (used by `/agent/speech-response`): bare event names + * (`transcript`, `speech-response`, `audio-start`/`audio-delta`/`audio-done`, + * `open-page`, `error`) plus the Vercel-shaped `data-tool-call`. Consumed by + * `custom/composables/useAgentAudio.ts`. + */ export function createSseEventEmitter( res: AgentEventStreamResponse, options: AgentEventStreamOptions = {}, diff --git a/chatSurfaceService.ts b/transport/surfaces/chatSurfaceService.ts similarity index 92% rename from chatSurfaceService.ts rename to transport/surfaces/chatSurfaceService.ts index 7b28e9a..8dc12fa 100644 --- a/chatSurfaceService.ts +++ b/transport/surfaces/chatSurfaceService.ts @@ -6,16 +6,16 @@ import type { IAdminForth, } from "adminforth"; import { Filters, logger } from "adminforth"; -import type { AgentEventEmitter } from "./agentEvents.js"; +import type { AgentEventEmitter } from "../../domain/agentEvents.js"; import type { HandleTurnInput, RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, -} from "./agentTurnService.js"; -import { getErrorMessage, isAbortError } from "./errors.js"; -import type { AgentSessionStore } from "./sessionStore.js"; -import { sanitizeSpeechText } from "./sanitizeSpeechText.js"; -import type { PluginOptions } from "./types.js"; +} from "../../domain/turnTypes.js"; +import { getErrorMessage, isAbortError } from "../../shared/errors.js"; +import type { AgentSessionStore } from "../../persistence/sessionStore.js"; +import { sanitizeSpeechText } from "../../shared/sanitizeSpeechText.js"; +import type { PluginOptions } from "../../types.js"; type ChatSurfaceIncomingMessageWithAudio = ChatSurfaceIncomingMessage & { audio?: { @@ -64,9 +64,11 @@ export class ChatSurfaceService { } if (event.type === "error") { + // Full error detail is logged server-side (see runAndPersistAgentResponse); + // external chat users receive a generic message to avoid leaking internals. await sink.emit({ type: "error", - message: event.error, + message: "Agent response failed. Check server logs for details.", }); } }; @@ -171,7 +173,7 @@ export class ChatSurfaceService { logger.error(`Agent ${incoming.surface} surface speech synthesis failed:\n${getErrorMessage(error)}`); await sink.emit({ type: "error", - message: getErrorMessage(error), + message: "Speech synthesis failed. Check server logs for details.", }); } } @@ -224,7 +226,7 @@ export class ChatSurfaceService { if (agentResponse.failed) { await sink.emit({ type: "error", - message: agentResponse.text, + message: "Agent response failed. Check server logs for details.", }); } diff --git a/agent/speech/SpeechTurnService.ts b/transport/surfaces/speechTurnService.ts similarity index 96% rename from agent/speech/SpeechTurnService.ts rename to transport/surfaces/speechTurnService.ts index 35a2618..66b096d 100644 --- a/agent/speech/SpeechTurnService.ts +++ b/transport/surfaces/speechTurnService.ts @@ -1,11 +1,11 @@ import { logger } from "adminforth"; -import { getErrorMessage, isAbortError } from "../../errors.js"; -import { sanitizeSpeechText } from "../../sanitizeSpeechText.js"; +import { getErrorMessage, isAbortError } from "../../shared/errors.js"; +import { sanitizeSpeechText } from "../../shared/sanitizeSpeechText.js"; import type { RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, SpeechAgentTurnInput, -} from "../turn/turnTypes.js"; +} from "../../domain/turnTypes.js"; export class SpeechTurnService { constructor( diff --git a/types.ts b/types.ts index 4559c66..da1afff 100644 --- a/types.ts +++ b/types.ts @@ -5,13 +5,18 @@ import { type AudioAdapter, type ChatSurfaceAdapter, } from "adminforth"; -import type { AgentModeCompletionAdapter } from "./agent/simpleAgent.js"; +import type { AgentModeCompletionAdapter } from "./llm/agentModels.js"; interface ISessionResource { resourceId: string; idField: string; titleField: string; - turnsField: string; + /** + * @deprecated Not used by the plugin — session turns are looked up via + * `turnResource.sessionIdField`. Kept optional for backward compatibility; + * will be removed in a future major version. + */ + turnsField?: string; askerIdField: string; createdAtField: string; } @@ -85,8 +90,9 @@ export interface PluginOptions extends PluginsCommonOptions { systemPrompt?: string; /** - * Response generation level. - * Default is low + * @deprecated Not applied — reasoning effort is configured on the completion + * adapter (e.g. `extraRequestBodyParameters.reasoning.effort`), not here. + * Kept for backward compatibility; will be removed in a future major version. */ reasoning?: "none" | "minimal" | "low" | "medium" | "high" | "xhigh"; From 3de87ee3e29541bfb241db5195b6e0bce2fee24d Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Thu, 2 Jul 2026 16:34:21 +0300 Subject: [PATCH 2/3] refactor: enhance interrupt handling and debug tracing in agent flow --- application/ports.ts | 31 +++++++++++---- application/runTurnUseCase.ts | 73 ++++++++++++++++------------------- domain/turnTypes.ts | 53 +++++++++---------------- index.ts | 3 ++ llm/agentModels.ts | 30 +++++++------- llm/agentRuntime.ts | 13 ++++--- llm/langGraphLlm.ts | 12 +++--- llm/modelFactory.ts | 3 +- llm/streamAdapter.ts | 31 ++++++++++++++- tests/stream_adapter.test.ts | 18 ++++++++- tests/turn_flow.test.ts | 42 ++++++++++++++++---- types.ts | 2 +- 12 files changed, 190 insertions(+), 121 deletions(-) diff --git a/application/ports.ts b/application/ports.ts index 02cb4da..b850d0b 100644 --- a/application/ports.ts +++ b/application/ports.ts @@ -1,12 +1,27 @@ -import type { AgentModeCompletionAdapter } from "../llm/agentModels.js"; +import type { CompletionAdapter } from "adminforth"; import type { DetectedLanguage, PreviousUserMessage } from "../domain/languageDetect.js"; import type { AgentMessage, AgentStreamChunk, AgentTurnContext, AgentTurnObservability, + PendingInterrupt, } from "../domain/turnTypes.js"; +export type AgentModelPurpose = "primary" | "summary"; + +/** + * The LLM provider adapter contract (the "provider port"). Extends AdminForth's + * CompletionAdapter with the agent-spec factory. Declared in the application layer + * so the infrastructure (llm/) depends on this contract — not the other way around. + */ +export type AgentModeCompletionAdapter = CompletionAdapter & { + getLangChainAgentSpec(params: { + maxTokens: number; + purpose: AgentModelPurpose; + }): Promise<{ model: unknown; middleware?: unknown[] }> | { model: unknown; middleware?: unknown[] }; +}; + /** * Input for a single streamed agent turn. Either a fresh set of messages, or a * human-in-the-loop resume payload for a previously interrupted turn. @@ -21,9 +36,8 @@ export type LlmStreamInput = { /** * The boundary between the application layer and the concrete LLM runtime * (LangChain / LangGraph). Everything provider-specific — model construction, - * the agent graph, middleware, and the raw stream shape — lives behind this - * port. The application layer depends only on this interface, so the turn - * orchestration is testable with a scripted fake and reusable across providers. + * the agent graph, middleware, the raw stream shape, and the LangGraph interrupt + * shape — lives behind this port and is normalized before crossing it. */ export interface LlmPort { /** @@ -42,12 +56,13 @@ export interface LlmPort { /** * Rebuild the pending human-in-the-loop interrupts for a session from persisted - * state (the checkpointer). Used as a fallback when the in-process interrupt - * cache is empty — e.g. after a restart or on a second instance — so approvals - * survive process boundaries. Returns raw interrupt objects (LangGraph shape). + * checkpoint state (used when the in-process cache is empty — after a restart or + * on another instance). Returns already-normalized descriptors; the LangGraph + * interrupt shape never leaves the llm layer. Throws on a checkpoint/runtime + * failure (the caller must not treat that as "no pending interrupt"). */ getPendingInterrupts(input: { completionAdapter: AgentModeCompletionAdapter; sessionId: string; - }): Promise; + }): Promise; } diff --git a/application/runTurnUseCase.ts b/application/runTurnUseCase.ts index 8492920..d841985 100644 --- a/application/runTurnUseCase.ts +++ b/application/runTurnUseCase.ts @@ -1,6 +1,5 @@ import { logger, type IAdminForth } from "adminforth"; import { randomUUID } from "crypto"; -import { createSequenceDebugCollector } from "../llm/middleware/sequenceDebug.js"; import { VegaLiteStreamBuffer } from "../domain/vegaLiteStreamBuffer.js"; import { buildAgentTurnSystemPrompt } from "../domain/systemPrompt.js"; import { getErrorMessage, isAbortError } from "../shared/errors.js"; @@ -13,15 +12,15 @@ import type { AgentTurnContext, AgentTurnObservability, BaseAgentTurnInput, + DebugSink, HandleTurnInput, + PendingInterrupt, RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, } from "../domain/turnTypes.js"; type AgentMode = PluginOptions["modes"][number]; -type PendingInterrupt = { id: string; count: number }; - type PreparedTurn = { prompt: string; sessionId: string; @@ -41,28 +40,6 @@ function getApprovalDecision(input: BaseAgentTurnInput) { : undefined; } -function getInterruptItems(interrupt: unknown): unknown[] { - return Array.isArray(interrupt) ? interrupt : [interrupt]; -} - -function getHitlInterrupts(interrupt: unknown): PendingInterrupt[] { - return getInterruptItems(interrupt).flatMap((item) => { - const value = item && typeof item === "object" && "value" in item - ? (item as { value: unknown }).value - : item; - const actionRequests = value && typeof value === "object" - ? (value as { actionRequests?: unknown }).actionRequests - : undefined; - const interruptId = item && typeof item === "object" - ? (item as { id?: unknown }).id - : undefined; - - return typeof interruptId === "string" && Array.isArray(actionRequests) - ? [{ id: interruptId, count: actionRequests.length }] - : []; - }); -} - function buildHitlDecision(decision: "approve" | "reject", prompt?: string) { if (decision === "approve") { return { type: "approve" as const }; @@ -129,6 +106,10 @@ export type RunTurnUseCaseDeps = { modes: PluginOptions["modes"]; getAdminforth: () => IAdminForth; getAgentSystemPrompt: () => Promise; + /** True when a persistent checkpointer is configured (not the in-memory MemorySaver). */ + hasPersistentCheckpointer: boolean; + /** Factory for the per-turn debug sink; the concrete impl lives in the llm layer. */ + createDebugSink: () => DebugSink; }; /** @@ -161,23 +142,32 @@ export class RunTurnUseCase { } /** - * Resolve the pending HITL interrupts for a resume. Prefers the in-process cache - * (populated when the interrupt fired this run) and falls back to the persisted - * checkpoint via the LLM port when the cache is empty (restart / other instance). + * Resolve the pending HITL interrupts for a resume. With a persistent checkpointer the + * checkpoint is authoritative (correct across instances and restarts); with the in-memory + * MemorySaver the in-process cache is authoritative (single process, cannot be stale). */ private async resolvePendingInterrupts(sessionId: string, mode: AgentMode): Promise { - const cached = this.pendingInterrupts.get(sessionId); - if (cached && cached.length > 0) { - return cached; + if (this.deps.hasPersistentCheckpointer) { + // The in-process cache can be stale after a cross-instance resume, so it is intentionally + // NOT consulted here. A failure to read the checkpoint is surfaced as a real error — never + // masked as "no pending approval". + try { + return await this.deps.llm.getPendingInterrupts({ + completionAdapter: mode.completionAdapter, + sessionId, + }); + } catch (error) { + logger.error( + `Failed to read pending approval state from checkpoint for session "${sessionId}": ${getErrorMessage(error)}`, + ); + throw error; + } } - const raw = await this.deps.llm - .getPendingInterrupts({ completionAdapter: mode.completionAdapter, sessionId }) - .catch(() => [] as unknown[]); - return getHitlInterrupts(raw); + return this.pendingInterrupts.get(sessionId) ?? []; } private async prepareTurn(input: RunAndPersistAgentResponseInput): Promise { - const sequenceDebugSink = createSequenceDebugCollector(); + const sequenceDebugSink = this.deps.createDebugSink(); const mode = this.resolveMode(input.modeName); const approvalDecision = getApprovalDecision(input); const shouldResume = Boolean(approvalDecision); @@ -249,11 +239,14 @@ export class RunTurnUseCase { ]; } - private async handleInterrupt(prepared: PreparedTurn, interrupt: unknown) { - const interrupts = getHitlInterrupts(interrupt); + private async handleInterrupt( + prepared: PreparedTurn, + interrupt: unknown, + descriptors: PendingInterrupt[], + ) { const existing = this.pendingInterrupts.get(prepared.sessionId) ?? []; const merged = new Map(existing.map((item) => [item.id, item.count])); - for (const item of interrupts) { + for (const item of descriptors) { merged.set(item.id, item.count); } this.pendingInterrupts.set( @@ -299,7 +292,7 @@ export class RunTurnUseCase { if (chunk.kind === "interrupt") { interrupted = true; - await this.handleInterrupt(prepared, chunk.interrupt); + await this.handleInterrupt(prepared, chunk.interrupt, chunk.descriptors); continue; } diff --git a/domain/turnTypes.ts b/domain/turnTypes.ts index 1febba4..10956ce 100644 --- a/domain/turnTypes.ts +++ b/domain/turnTypes.ts @@ -1,12 +1,25 @@ import type { AdminUser, AudioAdapter } from "adminforth"; -import type { Messages } from "@langchain/langgraph"; -import type { Command } from "@langchain/langgraph"; -import type { AgentChatModel, AgentMiddleware } from "../llm/agentModels.js"; -import type { SequenceDebugCollector } from "../llm/middleware/sequenceDebug.js"; import type { PreviousUserMessage } from "./languageDetect.js"; import type { CurrentPageContext } from "../tools/getUserLocation.js"; import type { AgentEventEmitter } from "./agentEvents.js"; +/** + * Minimal sink the application layer uses to persist per-turn debug traces. The + * concrete implementation (the sequence-debug collector) lives in the llm layer; + * the domain/application layers depend only on this narrow interface, so the + * dependency direction stays llm -> application/domain. + */ +export type DebugSink = { + flush(): void; + getHistory(): unknown[]; +}; + +/** A pending human-in-the-loop approval, normalized (provider-agnostic). */ +export type PendingInterrupt = { + id: string; + count: number; +}; + export type BaseAgentTurnInput = { prompt: string; sessionId: string; @@ -51,35 +64,7 @@ export type AgentTurnContext = { export type AgentTurnObservability = { emit?: AgentEventEmitter; - sequenceDebugSink: SequenceDebugCollector; -}; - -export type PreparedAgentTurn = { - prompt: string; - sessionId: string; - turnId: string; - previousUserMessages: PreviousUserMessage[]; - modeName?: string | null; - context: AgentTurnContext; - observability: AgentTurnObservability; - resume?: { - decision: "approve" | "reject"; - interrupts?: { id: string; count: number }[]; - }; - initialResponse?: string; -}; - -export type AgentTurnModels = { - model: AgentChatModel; - summaryModel: AgentChatModel; - modelMiddleware?: AgentMiddleware[]; -}; - -export type AgentRuntimeRunInput = { - models: AgentTurnModels; - input: { messages: Messages } | Command; - context: AgentTurnContext; - observability: AgentTurnObservability; + sequenceDebugSink: DebugSink; }; export type RunAndPersistAgentResponseInput = BaseAgentTurnInput & { @@ -116,4 +101,4 @@ export type AgentMessage = { export type AgentStreamChunk = | { kind: "text"; delta: string } | { kind: "reasoning"; delta: string } - | { kind: "interrupt"; interrupt: unknown }; + | { kind: "interrupt"; interrupt: unknown; descriptors: PendingInterrupt[] }; diff --git a/index.ts b/index.ts index aa6c1f8..2408c52 100644 --- a/index.ts +++ b/index.ts @@ -17,6 +17,7 @@ import type { AgentEndpointsContext } from "./transport/http/context.js"; import { AgentSessionStore } from "./persistence/sessionStore.js"; import { ChatSurfaceService } from "./transport/surfaces/chatSurfaceService.js"; import { RunTurnUseCase } from "./application/runTurnUseCase.js"; +import { createSequenceDebugCollector } from "./llm/middleware/sequenceDebug.js"; import { LangGraphLlm } from "./llm/langGraphLlm.js"; import { AgentModelFactory } from "./llm/modelFactory.js"; import { AgentRuntime } from "./llm/agentRuntime.js"; @@ -94,6 +95,8 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { modes: this.options.modes, getAdminforth: () => this.adminforth, getAgentSystemPrompt: this.getAgentSystemPrompt.bind(this), + hasPersistentCheckpointer: Boolean(this.options.checkpointResource), + createDebugSink: createSequenceDebugCollector, }); this.speechTurnService = new SpeechTurnService( this.runTurnUseCase.runAndPersistAgentResponse.bind(this.runTurnUseCase), diff --git a/llm/agentModels.ts b/llm/agentModels.ts index 03757c8..aa456a1 100644 --- a/llm/agentModels.ts +++ b/llm/agentModels.ts @@ -5,26 +5,30 @@ import { } from "adminforth"; import { BaseCallbackHandler } from "@langchain/core/callbacks/base"; import type { LLMResult } from "@langchain/core/outputs"; +import type { Messages, Command } from "@langchain/langgraph"; import { createSequenceDebugMiddleware, } from "./middleware/sequenceDebug.js"; +import type { AgentModeCompletionAdapter, AgentModelPurpose } from "../application/ports.js"; +import type { AgentTurnContext, AgentTurnObservability } from "../domain/turnTypes.js"; + +export type { AgentModeCompletionAdapter, AgentModelPurpose } from "../application/ports.js"; export type AgentChatModel = BaseChatModel; -export type AgentModelPurpose = "primary" | "summary"; -export type AgentModeCompletionAdapter = CompletionAdapter & { - getLangChainAgentSpec(params: { - maxTokens: number; - purpose: AgentModelPurpose; - }): Promise<{ - model: unknown; - middleware?: unknown[]; - }> | { - model: unknown; - middleware?: unknown[]; - }; +export type AgentMiddleware = ReturnType; + +export type AgentTurnModels = { + model: AgentChatModel; + summaryModel: AgentChatModel; + modelMiddleware?: AgentMiddleware[]; }; -export type AgentMiddleware = ReturnType; +export type AgentRuntimeRunInput = { + models: AgentTurnModels; + input: { messages: Messages } | Command; + context: AgentTurnContext; + observability: AgentTurnObservability; +}; type AgentChatModelSpec = { model: AgentChatModel; diff --git a/llm/agentRuntime.ts b/llm/agentRuntime.ts index cb808a5..9fc7c3c 100644 --- a/llm/agentRuntime.ts +++ b/llm/agentRuntime.ts @@ -2,10 +2,10 @@ import type { IAdminForth } from "adminforth"; import { createAgent, summarizationMiddleware, humanInTheLoopMiddleware } from "langchain"; import type { BaseCheckpointSaver } from "@langchain/langgraph"; import { createApiBasedToolsMiddleware } from "./middleware/apiToolsMiddleware.js"; -import { createSequenceDebugMiddleware } from "./middleware/sequenceDebug.js"; +import { createSequenceDebugMiddleware, type SequenceDebugCollector } from "./middleware/sequenceDebug.js"; import { createAgentLlmMetricsLogger } from "./agentModels.js"; import type { AgentToolProvider } from "../tools/agentToolProvider.js"; -import type { AgentRuntimeRunInput, AgentTurnModels } from "../domain/turnTypes.js"; +import type { AgentRuntimeRunInput, AgentTurnModels } from "./agentModels.js"; import { contextSchema, toLangchainAgentContext } from "./agentContext.js"; import type { ApiBasedTool } from "../tools/apiBasedTools.js"; @@ -42,9 +42,10 @@ export class AgentRuntime { apiBasedTools, adminforth, ); - const sequenceDebugMiddleware = createSequenceDebugMiddleware( - input.observability.sequenceDebugSink, - ); + // The domain exposes only the narrow DebugSink; the concrete collector (with the + // model-call hooks the debug middleware needs) is an llm-layer detail. + const sequenceDebugSink = input.observability.sequenceDebugSink as SequenceDebugCollector; + const sequenceDebugMiddleware = createSequenceDebugMiddleware(sequenceDebugSink); const hitlMiddleware = humanInTheLoopMiddleware({ interruptOn: createHumanInTheLoopInterrupts(apiBasedTools), descriptionPrefix: "Tool execution pending approval", @@ -82,7 +83,7 @@ export class AgentRuntime { ...input.context, adminBaseUrl: adminforth.config.baseUrlSlashed, emit: input.observability.emit, - sequenceDebugSink: input.observability.sequenceDebugSink, + sequenceDebugSink, }), }); } diff --git a/llm/langGraphLlm.ts b/llm/langGraphLlm.ts index bfb2b5a..9d0f5c7 100644 --- a/llm/langGraphLlm.ts +++ b/llm/langGraphLlm.ts @@ -3,10 +3,9 @@ import { HumanMessage, SystemMessage } from "langchain"; import type { AgentRuntime } from "./agentRuntime.js"; import type { AgentModelFactory } from "./modelFactory.js"; import { detectUserLanguage, type PreviousUserMessage } from "../domain/languageDetect.js"; -import type { AgentModeCompletionAdapter } from "./agentModels.js"; -import { adaptRawStream } from "./streamAdapter.js"; -import type { LlmPort, LlmStreamInput } from "../application/ports.js"; -import type { AgentMessage } from "../domain/turnTypes.js"; +import { adaptRawStream, normalizeInterrupts } from "./streamAdapter.js"; +import type { AgentModeCompletionAdapter, LlmPort, LlmStreamInput } from "../application/ports.js"; +import type { AgentMessage, PendingInterrupt } from "../domain/turnTypes.js"; function toLangchainMessages(messages: AgentMessage[]) { return messages.map((message) => @@ -59,8 +58,9 @@ export class LangGraphLlm implements LlmPort { async getPendingInterrupts(input: { completionAdapter: AgentModeCompletionAdapter; sessionId: string; - }): Promise { + }): Promise { const models = await this.modelFactory.create(input.completionAdapter); - return this.runtime.getPendingInterrupts({ models, sessionId: input.sessionId }); + const raw = await this.runtime.getPendingInterrupts({ models, sessionId: input.sessionId }); + return normalizeInterrupts(raw); } } diff --git a/llm/modelFactory.ts b/llm/modelFactory.ts index eefc0fc..187caaa 100644 --- a/llm/modelFactory.ts +++ b/llm/modelFactory.ts @@ -1,6 +1,5 @@ import type { CompletionAdapter } from "adminforth"; -import { createAgentChatModel } from "./agentModels.js"; -import type { AgentTurnModels } from "../domain/turnTypes.js"; +import { createAgentChatModel, type AgentTurnModels } from "./agentModels.js"; export class AgentModelFactory { constructor(private readonly maxTokens: number) {} diff --git a/llm/streamAdapter.ts b/llm/streamAdapter.ts index dd5327a..260526c 100644 --- a/llm/streamAdapter.ts +++ b/llm/streamAdapter.ts @@ -1,9 +1,35 @@ -import type { AgentStreamChunk } from "../domain/turnTypes.js"; +import type { AgentStreamChunk, PendingInterrupt } from "../domain/turnTypes.js"; // Only tokens emitted by the model node(s) are surfaced to the user; other // graph nodes (tools, summarization, etc.) are internal. const STREAMABLE_NODES = ["model", "model_request"]; +function getInterruptItems(interrupt: unknown): unknown[] { + return Array.isArray(interrupt) ? interrupt : [interrupt]; +} + +/** + * Normalize raw LangGraph interrupt object(s) into provider-agnostic descriptors. + * Kept in the llm layer so the LangGraph interrupt shape never reaches the app. + */ +export function normalizeInterrupts(interrupt: unknown): PendingInterrupt[] { + return getInterruptItems(interrupt).flatMap((item) => { + const value = item && typeof item === "object" && "value" in item + ? (item as { value: unknown }).value + : item; + const actionRequests = value && typeof value === "object" + ? (value as { actionRequests?: unknown }).actionRequests + : undefined; + const interruptId = item && typeof item === "object" + ? (item as { id?: unknown }).id + : undefined; + + return typeof interruptId === "string" && Array.isArray(actionRequests) + ? [{ id: interruptId, count: actionRequests.length }] + : []; + }); +} + type RawStreamEntry = | ["messages", [any, any]] | ["updates", Record]; @@ -27,7 +53,8 @@ export function parseRawStreamChunk(entry: RawStreamEntry): AgentStreamChunk[] { if (mode === "updates") { if (chunk && typeof chunk === "object" && "__interrupt__" in chunk) { - return [{ kind: "interrupt", interrupt: (chunk as Record).__interrupt__ }]; + const interrupt = (chunk as Record).__interrupt__; + return [{ kind: "interrupt", interrupt, descriptors: normalizeInterrupts(interrupt) }]; } return []; } diff --git a/tests/stream_adapter.test.ts b/tests/stream_adapter.test.ts index cc7b004..d258fd9 100644 --- a/tests/stream_adapter.test.ts +++ b/tests/stream_adapter.test.ts @@ -42,9 +42,23 @@ describe('parseRawStreamChunk', () => { expect(out).toEqual([]); }); - it('surfaces interrupts from update entries', () => { + it('surfaces interrupts from update entries with normalized descriptors', () => { + const out = parseRawStreamChunk([ + 'updates', + { __interrupt__: [{ id: 'i1', value: { actionRequests: [{}, {}] } }] }, + ]); + expect(out).toEqual([ + { + kind: 'interrupt', + interrupt: [{ id: 'i1', value: { actionRequests: [{}, {}] } }], + descriptors: [{ id: 'i1', count: 2 }], + }, + ]); + }); + + it('yields empty descriptors when the interrupt has no actionRequests', () => { const out = parseRawStreamChunk(['updates', { __interrupt__: [{ id: 'i1' }] }]); - expect(out).toEqual([{ kind: 'interrupt', interrupt: [{ id: 'i1' }] }]); + expect(out).toEqual([{ kind: 'interrupt', interrupt: [{ id: 'i1' }], descriptors: [] }]); }); it('ignores non-interrupt update entries', () => { diff --git a/tests/turn_flow.test.ts b/tests/turn_flow.test.ts index e005261..d8134d7 100644 --- a/tests/turn_flow.test.ts +++ b/tests/turn_flow.test.ts @@ -15,12 +15,16 @@ async function* streamOf(...chunks: AgentStreamChunk[]) { const text = (delta: string): AgentStreamChunk => ({ kind: 'text', delta }); const reasoning = (delta: string): AgentStreamChunk => ({ kind: 'reasoning', delta }); -const interrupt = (value: unknown): AgentStreamChunk => ({ kind: 'interrupt', interrupt: value }); +const interrupt = ( + value: unknown, + descriptors: Array<{ id: string; count: number }> = [{ id: 'int-1', count: 1 }], +): AgentStreamChunk => ({ kind: 'interrupt', interrupt: value, descriptors }); function fakeLlm( opts: { streamFor?: (call: number, input: any) => AsyncIterable; - pendingInterrupts?: unknown[]; + pendingInterrupts?: Array<{ id: string; count: number }>; + pendingInterruptsError?: Error; } = {}, ) { const calls: any[] = []; @@ -38,6 +42,9 @@ function fakeLlm( }, async getPendingInterrupts(input: any) { getPendingInterruptsCalls.push(input); + if (opts.pendingInterruptsError) { + throw opts.pendingInterruptsError; + } return opts.pendingInterrupts ?? []; }, }; @@ -75,9 +82,15 @@ function fakeSessions(opts: { initialResponse?: string } = {}) { function buildUseCase(opts: { streamFor?: (call: number, input: any) => AsyncIterable; initialResponse?: string; - pendingInterrupts?: unknown[]; + pendingInterrupts?: Array<{ id: string; count: number }>; + pendingInterruptsError?: Error; + hasPersistentCheckpointer?: boolean; } = {}) { - const llm = fakeLlm({ streamFor: opts.streamFor, pendingInterrupts: opts.pendingInterrupts }); + const llm = fakeLlm({ + streamFor: opts.streamFor, + pendingInterrupts: opts.pendingInterrupts, + pendingInterruptsError: opts.pendingInterruptsError, + }); const sessions = fakeSessions({ initialResponse: opts.initialResponse }); const useCase = new RunTurnUseCase({ llm: llm as any, @@ -85,6 +98,8 @@ function buildUseCase(opts: { modes: [{ name: 'default', completionAdapter: {} as any }], getAdminforth: () => ({ config: { auth: { usernameField: 'email' }, baseUrlSlashed: '/admin/' } }) as any, getAgentSystemPrompt: async () => 'SYS', + hasPersistentCheckpointer: opts.hasPersistentCheckpointer ?? false, + createDebugSink: () => ({ flush() {}, getHistory: () => [] }), }); return { useCase, llm, sessions }; } @@ -214,13 +229,14 @@ describe('RunTurnUseCase.handleTurn', () => { expect(second.events.find((e) => e.type === 'response').text).toBe('prev Done'); }); - it('rebuilds pending interrupts from persisted state when the in-process cache is empty (restart)', async () => { + it('with a persistent checkpointer, resumes from checkpoint state (ignoring the local cache)', async () => { const { useCase, llm } = buildUseCase({ + hasPersistentCheckpointer: true, initialResponse: 'prev ', - pendingInterrupts: [{ id: 'int-1', value: { actionRequests: [{ name: 'del', description: 'd' }] } }], + pendingInterrupts: [{ id: 'int-1', count: 1 }], streamFor: () => streamOf(text('Resumed')), }); - // No interrupt was cached in this process (simulating a restart); approval must still resume. + // No interrupt cached in this process (restart / other instance); the checkpoint is authoritative. const { input } = makeInput({ prompt: '', approvalDecision: 'approve' }); const result = await useCase.handleTurn(input as any); @@ -230,6 +246,18 @@ describe('RunTurnUseCase.handleTurn', () => { expect(result.text).toBe('prev Resumed'); }); + it('surfaces a checkpoint read failure as a real error, not "no pending approval"', async () => { + const { useCase } = buildUseCase({ + hasPersistentCheckpointer: true, + pendingInterruptsError: new Error('checkpoint DB unavailable'), + }); + const { input } = makeInput({ prompt: '', approvalDecision: 'approve' }); + + // The infra error must propagate, NOT be masked as a missing interrupt. + await expect(useCase.handleTurn(input as any)).rejects.toThrow('checkpoint DB unavailable'); + await expect(useCase.handleTurn(input as any)).rejects.not.toThrow('No pending approval'); + }); + it('rejects (does not swallow) an approval with no pending interrupt, after emitting turn-started', async () => { const { useCase } = buildUseCase(); const { input, events } = makeInput({ prompt: '', approvalDecision: 'approve' }); diff --git a/types.ts b/types.ts index da1afff..a9336ea 100644 --- a/types.ts +++ b/types.ts @@ -5,7 +5,7 @@ import { type AudioAdapter, type ChatSurfaceAdapter, } from "adminforth"; -import type { AgentModeCompletionAdapter } from "./llm/agentModels.js"; +import type { AgentModeCompletionAdapter } from "./application/ports.js"; interface ISessionResource { resourceId: string; From 96322ced5ead74ab22ee4d8b98e4b4f57fada02d Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Thu, 2 Jul 2026 17:41:37 +0300 Subject: [PATCH 3/3] refactor: optimize interrupt handling and remove unnecessary logging in agent flow --- application/runTurnUseCase.ts | 20 +++--- llm/agentModels.ts | 102 +-------------------------- llm/middleware/apiToolsMiddleware.ts | 12 ---- tools/apiBasedTools.ts | 10 --- 4 files changed, 12 insertions(+), 132 deletions(-) diff --git a/application/runTurnUseCase.ts b/application/runTurnUseCase.ts index d841985..fda7d8a 100644 --- a/application/runTurnUseCase.ts +++ b/application/runTurnUseCase.ts @@ -244,15 +244,17 @@ export class RunTurnUseCase { interrupt: unknown, descriptors: PendingInterrupt[], ) { - const existing = this.pendingInterrupts.get(prepared.sessionId) ?? []; - const merged = new Map(existing.map((item) => [item.id, item.count])); - for (const item of descriptors) { - merged.set(item.id, item.count); + if (!this.deps.hasPersistentCheckpointer) { + const existing = this.pendingInterrupts.get(prepared.sessionId) ?? []; + const merged = new Map(existing.map((item) => [item.id, item.count])); + for (const item of descriptors) { + merged.set(item.id, item.count); + } + this.pendingInterrupts.set( + prepared.sessionId, + [...merged.entries()].map(([id, count]) => ({ id, count })), + ); } - this.pendingInterrupts.set( - prepared.sessionId, - [...merged.entries()].map(([id, count]) => ({ id, count })), - ); await prepared.observability.emit?.({ type: "interrupt", sessionId: prepared.sessionId, @@ -312,7 +314,7 @@ export class RunTurnUseCase { await textBuffer.flush(emit); return { text: fullResponse }; } finally { - if (!interrupted) { + if (!this.deps.hasPersistentCheckpointer && !interrupted) { this.pendingInterrupts.delete(prepared.sessionId); } } diff --git a/llm/agentModels.ts b/llm/agentModels.ts index aa456a1..bbb80a0 100644 --- a/llm/agentModels.ts +++ b/llm/agentModels.ts @@ -1,6 +1,5 @@ import type { BaseChatModel } from "@langchain/core/language_models/chat_models"; import { - logger, type CompletionAdapter, } from "adminforth"; import { BaseCallbackHandler } from "@langchain/core/callbacks/base"; @@ -35,19 +34,8 @@ type AgentChatModelSpec = { middleware: AgentMiddleware[]; }; -type LlmOutputTokenUsage = { - promptTokens?: unknown; - completionTokens?: unknown; -}; - -type MessageUsageMetadata = { - input_tokens?: unknown; - output_tokens?: unknown; -}; - type PendingLlmRun = { startedAt: number; - firstTokenAt?: number; }; function isLangChainAgentCompletionAdapter( @@ -73,61 +61,6 @@ async function getAgentChatModelSpec(params: { }; } -function getFiniteNumber(value: unknown) { - return typeof value === "number" && Number.isFinite(value) - ? value - : undefined; -} - -function extractTokenUsage(output: LLMResult) { - const llmOutputTokenUsage = ( - output.llmOutput as { tokenUsage?: LlmOutputTokenUsage } | undefined - )?.tokenUsage; - - const promptTokens = getFiniteNumber(llmOutputTokenUsage?.promptTokens); - const completionTokens = getFiniteNumber( - llmOutputTokenUsage?.completionTokens, - ); - - if (promptTokens !== undefined || completionTokens !== undefined) { - return { - InputTokens: promptTokens ?? 0, - outputTokens: completionTokens ?? 0, - }; - } - - let InputTokens = 0; - let outputTokens = 0; - - for (const generationBatch of output.generations) { - for (const generation of generationBatch) { - if (!("message" in generation) || !generation.message) { - continue; - } - - const message = generation.message as { - usage_metadata?: MessageUsageMetadata; - response_metadata?: { - tokenUsage?: LlmOutputTokenUsage; - }; - }; - - InputTokens += - getFiniteNumber(message.usage_metadata?.input_tokens) ?? - getFiniteNumber(message.response_metadata?.tokenUsage?.promptTokens) ?? - 0; - outputTokens += - getFiniteNumber(message.usage_metadata?.output_tokens) ?? - getFiniteNumber( - message.response_metadata?.tokenUsage?.completionTokens, - ) ?? - 0; - } - } - - return { InputTokens, outputTokens }; -} - class AgentLlmMetricsLogger extends BaseCallbackHandler { name = "AgentLlmMetricsLogger"; lc_prefer_streaming = true; @@ -138,41 +71,8 @@ class AgentLlmMetricsLogger extends BaseCallbackHandler { this.pendingRuns.set(runId, { startedAt: Date.now() }); } - async handleLLMNewToken( - _token: string, - _chunk: unknown, - runId: string, - ) { - const pendingRun = this.pendingRuns.get(runId); - - if (!pendingRun || pendingRun.firstTokenAt !== undefined) { - return; - } - - pendingRun.firstTokenAt = Date.now(); - } - - async handleLLMEnd(output: LLMResult, runId: string) { - const pendingRun = this.pendingRuns.get(runId); - - if (!pendingRun) { - return; - } - + async handleLLMEnd(_output: LLMResult, runId: string) { this.pendingRuns.delete(runId); - - const finishedAt = Date.now(); - const rtt = finishedAt - pendingRun.startedAt; - const ttft = - pendingRun.firstTokenAt === undefined - ? rtt - : pendingRun.firstTokenAt - pendingRun.startedAt; - const { InputTokens, outputTokens } = extractTokenUsage(output); - - logger.info( - { InputTokens, outputTokens, ttft, rtt }, - "LLM call finished", - ); } async handleLLMError(_error: unknown, runId: string) { diff --git a/llm/middleware/apiToolsMiddleware.ts b/llm/middleware/apiToolsMiddleware.ts index f342450..726bf0d 100644 --- a/llm/middleware/apiToolsMiddleware.ts +++ b/llm/middleware/apiToolsMiddleware.ts @@ -72,10 +72,6 @@ export function createApiBasedToolsMiddleware( .map((toolName) => dynamicTools[toolName]); const availableTools = [...request.tools, ...tools]; - logger.info( - `AdminForth Agent callable tools: ${availableTools.map((tool) => tool.name).join(", ")}`, - ); - return handler({ ...request, tools: availableTools, @@ -83,7 +79,6 @@ export function createApiBasedToolsMiddleware( }, async wrapToolCall(request, handler) { const startedAt = Date.now(); - const toolInput = JSON.stringify(request.toolCall.args ?? {}); if (!request.toolCall.id) { throw new Error(`Tool call "${request.toolCall.name}" has no id.`); } @@ -128,9 +123,6 @@ export function createApiBasedToolsMiddleware( startedAt, }); toolCallTracker.start(); - logger.info( - `Invoking tool "${request.toolCall.name}" with input: ${toolInput}`, - ); try { @@ -164,10 +156,6 @@ export function createApiBasedToolsMiddleware( status: "error", content: `Error: ${message}`, }) - } finally { - logger.info( - `Tool "${request.toolCall.name}" finished in ${Date.now() - startedAt}ms`, - ); } }, }); diff --git a/tools/apiBasedTools.ts b/tools/apiBasedTools.ts index 7529056..7e34ffe 100644 --- a/tools/apiBasedTools.ts +++ b/tools/apiBasedTools.ts @@ -574,7 +574,6 @@ async function callOpenApiSchema(params: { } const response = createDirectToolResponse(); - logger.info(`Calling OpenAPI tool "${toolName}" with direct handler`); const lang = acceptLanguage ?? "en"; const tr = ( msg: string, @@ -659,15 +658,6 @@ export function prepareApiBasedTools( } } - logger.info( - `AdminForth Agent OpenAPI APIs: ${formatLogNameList( - adminforth.openApi.registeredSchemas.map((schema) => openApiSchemaPathToToolName(schema.path, adminforth)), - )}`, - ); - logger.info( - `AdminForth Agent OpenAPI tools connected: ${formatLogNameList([...openApiSchemasByToolName.keys()])}`, - ); - for (const [toolName, schema] of openApiSchemasByToolName.entries()) { apiBasedTools[toolName] = { description: schema.description,