diff --git a/agent/middleware/openAiResponsesContinuation.ts b/agent/middleware/openAiResponsesContinuation.ts deleted file mode 100644 index 6dbeda7..0000000 --- a/agent/middleware/openAiResponsesContinuation.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { AIMessage } from "@langchain/core/messages"; -import { createMiddleware } from "langchain"; - -type OpenAiResponsesMetadata = { - id?: string; -}; - -type OpenAiResponsesContext = { - sessionId: string; - turnId: string; -}; - -function getTurnKey(context: OpenAiResponsesContext) { - return `${context.sessionId}:${context.turnId}`; -} - -function getResponseId(message: AIMessage) { - const metadata = message.response_metadata as OpenAiResponsesMetadata | undefined; - return metadata?.id ?? null; -} - -function getPreviousResponseId(modelSettings?: Record) { - return (modelSettings as { previous_response_id?: string } | undefined) - ?.previous_response_id; -} - -function getContinuationMessages( - messages: T[], - previousResponseId: string, -) { - let continuationStartIndex: number | null = null; - - for (let index = messages.length - 1; index >= 0; index -= 1) { - const message = messages[index]; - - if ( - AIMessage.isInstance(message) && - (message.response_metadata as OpenAiResponsesMetadata | undefined)?.id === - previousResponseId - ) { - continuationStartIndex = index + 1; - break; - } - } - - if (continuationStartIndex === null) { - return null; - } - - return messages.slice(continuationStartIndex); -} - -export function createOpenAiResponsesContinuationMiddleware() { - const responseIdsByTurn = new Map(); - - return createMiddleware({ - name: "OpenAiResponsesContinuationMiddleware", - async wrapModelCall(request, handler) { - const context = request.runtime.context as OpenAiResponsesContext; - const turnKey = getTurnKey(context); - const previousResponseId = - getPreviousResponseId(request.modelSettings) ?? - responseIdsByTurn.get(turnKey); - const continuationMessages = previousResponseId - ? getContinuationMessages(request.messages, previousResponseId) - : null; - - const response = await handler( - previousResponseId && continuationMessages - ? { - ...request, - messages: continuationMessages, - modelSettings: { - ...request.modelSettings, - previous_response_id: previousResponseId, - }, - } - : request, - ) as AIMessage; - - const responseId = getResponseId(response); - - if (responseId) { - responseIdsByTurn.set(turnKey, responseId); - } else { - responseIdsByTurn.delete(turnKey); - } - - return response; - }, - }); -} diff --git a/agent/models/AgentModeResolver.ts b/agent/models/AgentModeResolver.ts deleted file mode 100644 index 50f5bd4..0000000 --- a/agent/models/AgentModeResolver.ts +++ /dev/null @@ -1,9 +0,0 @@ -import type { PluginOptions } from "../../types.js"; - -export class AgentModeResolver { - constructor(private readonly options: PluginOptions) {} - - resolve(modeName?: string | null) { - return this.options.modes.find((mode) => mode.name === modeName) ?? this.options.modes[0]; - } -} diff --git a/agent/simpleAgent.ts b/agent/simpleAgent.ts deleted file mode 100644 index 03757c8..0000000 --- a/agent/simpleAgent.ts +++ /dev/null @@ -1,199 +0,0 @@ -import type { BaseChatModel } from "@langchain/core/language_models/chat_models"; -import { - logger, - type CompletionAdapter, -} from "adminforth"; -import { BaseCallbackHandler } from "@langchain/core/callbacks/base"; -import type { LLMResult } from "@langchain/core/outputs"; -import { - createSequenceDebugMiddleware, -} from "./middleware/sequenceDebug.js"; - -export type AgentChatModel = BaseChatModel; -export type AgentModelPurpose = "primary" | "summary"; -export type AgentModeCompletionAdapter = CompletionAdapter & { - getLangChainAgentSpec(params: { - maxTokens: number; - purpose: AgentModelPurpose; - }): Promise<{ - model: unknown; - middleware?: unknown[]; - }> | { - model: unknown; - middleware?: unknown[]; - }; -}; - -export type AgentMiddleware = ReturnType; - -type AgentChatModelSpec = { - model: AgentChatModel; - middleware: AgentMiddleware[]; -}; - -type LlmOutputTokenUsage = { - promptTokens?: unknown; - completionTokens?: unknown; -}; - -type MessageUsageMetadata = { - input_tokens?: unknown; - output_tokens?: unknown; -}; - -type PendingLlmRun = { - startedAt: number; - firstTokenAt?: number; -}; - -function isLangChainAgentCompletionAdapter( - adapter: CompletionAdapter, -): adapter is AgentModeCompletionAdapter { - return typeof (adapter as AgentModeCompletionAdapter) - .getLangChainAgentSpec === "function"; -} - -async function getAgentChatModelSpec(params: { - adapter: AgentModeCompletionAdapter; - maxTokens: number; - purpose: AgentModelPurpose; -}): Promise { - const spec = await params.adapter.getLangChainAgentSpec({ - maxTokens: params.maxTokens, - purpose: params.purpose, - }); - - return { - model: spec.model as AgentChatModel, - middleware: (spec.middleware ?? []) as AgentMiddleware[], - }; -} - -function getFiniteNumber(value: unknown) { - return typeof value === "number" && Number.isFinite(value) - ? value - : undefined; -} - -function extractTokenUsage(output: LLMResult) { - const llmOutputTokenUsage = ( - output.llmOutput as { tokenUsage?: LlmOutputTokenUsage } | undefined - )?.tokenUsage; - - const promptTokens = getFiniteNumber(llmOutputTokenUsage?.promptTokens); - const completionTokens = getFiniteNumber( - llmOutputTokenUsage?.completionTokens, - ); - - if (promptTokens !== undefined || completionTokens !== undefined) { - return { - InputTokens: promptTokens ?? 0, - outputTokens: completionTokens ?? 0, - }; - } - - let InputTokens = 0; - let outputTokens = 0; - - for (const generationBatch of output.generations) { - for (const generation of generationBatch) { - if (!("message" in generation) || !generation.message) { - continue; - } - - const message = generation.message as { - usage_metadata?: MessageUsageMetadata; - response_metadata?: { - tokenUsage?: LlmOutputTokenUsage; - }; - }; - - InputTokens += - getFiniteNumber(message.usage_metadata?.input_tokens) ?? - getFiniteNumber(message.response_metadata?.tokenUsage?.promptTokens) ?? - 0; - outputTokens += - getFiniteNumber(message.usage_metadata?.output_tokens) ?? - getFiniteNumber( - message.response_metadata?.tokenUsage?.completionTokens, - ) ?? - 0; - } - } - - return { InputTokens, outputTokens }; -} - -class AgentLlmMetricsLogger extends BaseCallbackHandler { - name = "AgentLlmMetricsLogger"; - lc_prefer_streaming = true; - - private readonly pendingRuns = new Map(); - - async handleLLMStart(_llm: unknown, _prompts: string[], runId: string) { - this.pendingRuns.set(runId, { startedAt: Date.now() }); - } - - async handleLLMNewToken( - _token: string, - _chunk: unknown, - runId: string, - ) { - const pendingRun = this.pendingRuns.get(runId); - - if (!pendingRun || pendingRun.firstTokenAt !== undefined) { - return; - } - - pendingRun.firstTokenAt = Date.now(); - } - - async handleLLMEnd(output: LLMResult, runId: string) { - const pendingRun = this.pendingRuns.get(runId); - - if (!pendingRun) { - return; - } - - this.pendingRuns.delete(runId); - - const finishedAt = Date.now(); - const rtt = finishedAt - pendingRun.startedAt; - const ttft = - pendingRun.firstTokenAt === undefined - ? rtt - : pendingRun.firstTokenAt - pendingRun.startedAt; - const { InputTokens, outputTokens } = extractTokenUsage(output); - - logger.info( - { InputTokens, outputTokens, ttft, rtt }, - "LLM call finished", - ); - } - - async handleLLMError(_error: unknown, runId: string) { - this.pendingRuns.delete(runId); - } -} - -export function createAgentLlmMetricsLogger() { - return new AgentLlmMetricsLogger(); -} - -export async function createAgentChatModel(params: { - adapter: CompletionAdapter; - maxTokens: number; - purpose: AgentModelPurpose; -}) { - if (!isLangChainAgentCompletionAdapter(params.adapter)) { - throw new Error( - "AdminForth Agent requires completionAdapter to implement getLangChainAgentSpec({ maxTokens, purpose }).", - ); - } - - return await getAgentChatModelSpec({ - adapter: params.adapter, - maxTokens: params.maxTokens, - purpose: params.purpose, - }); -} diff --git a/agent/turn/TurnContextBuilder.ts b/agent/turn/TurnContextBuilder.ts deleted file mode 100644 index 5ccfd15..0000000 --- a/agent/turn/TurnContextBuilder.ts +++ /dev/null @@ -1,36 +0,0 @@ -import type { AdminUser, IAdminForth } from "adminforth"; -import type { AgentTurnContext, BaseAgentTurnInput } from "./turnTypes.js"; - -export type UserContextProvider = { - getUserTimeZone(adminUser: AdminUser): Promise | string | null | undefined; -}; - -export class TurnContextBuilder { - constructor( - private readonly getAdminforth: () => IAdminForth, - private readonly userContextProvider?: UserContextProvider, - ) {} - - async build(input: { - base: BaseAgentTurnInput; - turnId: string; - }): Promise { - const adminforth = this.getAdminforth(); - - return { - adminUser: input.base.adminUser, - sessionId: input.base.sessionId, - turnId: input.turnId, - abortSignal: input.base.abortSignal, - currentPage: input.base.currentPage, - chatSurface: input.base.chatSurface, - userTimeZone: - input.base.userTimeZone - ?? await this.userContextProvider?.getUserTimeZone(input.base.adminUser) - ?? "UTC", - adminPublicOrigin: - input.base.adminPublicOrigin - ?? adminforth.config.baseUrlSlashed, - }; - } -} diff --git a/agent/turn/TurnLifecycleService.ts b/agent/turn/TurnLifecycleService.ts deleted file mode 100644 index 6987f1c..0000000 --- a/agent/turn/TurnLifecycleService.ts +++ /dev/null @@ -1,47 +0,0 @@ -import type { AgentSessionStore } from "../../sessionStore.js"; -import type { PluginOptions } from "../../types.js"; -import type { BaseAgentTurnInput } from "./turnTypes.js"; -import { TurnPersistenceService } from "./TurnPersistenceService.js"; - -export class TurnLifecycleService { - constructor( - private readonly sessionStore: AgentSessionStore, - private readonly persistence: TurnPersistenceService, - private readonly options: PluginOptions, - ) {} - - async start(input: BaseAgentTurnInput) { - const previousUserMessages = await this.sessionStore.getPreviousUserMessages(input.sessionId); - const turnId = await this.sessionStore.createNewTurn(input.sessionId, input.prompt); - await this.persistence.touchSession(input.sessionId); - - return { - turnId, - previousUserMessages, - }; - } - - async resume(input: BaseAgentTurnInput) { - const latestTurn = await this.sessionStore.getLatestTurn(input.sessionId); - - if (!latestTurn) { - throw new Error(`No agent turn found for session "${input.sessionId}".`); - } - - return { - turnId: latestTurn[this.options.turnResource.idField], - previousUserMessages: await this.sessionStore.getPreviousUserMessages(input.sessionId), - initialResponse: latestTurn[this.options.turnResource.responseField] === "not_finished" - ? "" - : String(latestTurn[this.options.turnResource.responseField]), - }; - } - - async finish(input: { - turnId: string; - responseText: string; - debugHistory?: unknown; - }) { - await this.persistence.saveTurnResponse(input); - } -} diff --git a/agent/turn/TurnPersistenceService.ts b/agent/turn/TurnPersistenceService.ts deleted file mode 100644 index abfee05..0000000 --- a/agent/turn/TurnPersistenceService.ts +++ /dev/null @@ -1,33 +0,0 @@ -import type { IAdminForth } from "adminforth"; -import type { PluginOptions } from "../../types.js"; - -export class TurnPersistenceService { - constructor( - private readonly getAdminforth: () => IAdminForth, - private readonly options: PluginOptions, - ) {} - - async touchSession(sessionId: string) { - await this.getAdminforth().resource(this.options.sessionResource.resourceId).update(sessionId, { - [this.options.sessionResource.createdAtField]: new Date().toISOString(), - }); - } - - async saveTurnResponse(input: { - turnId: string; - responseText: string; - debugHistory?: unknown; - }) { - const turnUpdates: Record = { - [this.options.turnResource.responseField]: input.responseText, - }; - - if (this.options.turnResource.debugField) { - turnUpdates[this.options.turnResource.debugField] = input.debugHistory; - } - - await this.getAdminforth() - .resource(this.options.turnResource.resourceId) - .update(input.turnId, turnUpdates); - } -} diff --git a/agent/turn/TurnPromptBuilder.ts b/agent/turn/TurnPromptBuilder.ts deleted file mode 100644 index 8c04965..0000000 --- a/agent/turn/TurnPromptBuilder.ts +++ /dev/null @@ -1,51 +0,0 @@ -import type { AdminUser, IAdminForth } from "adminforth"; -import { logger } from "adminforth"; -import { HumanMessage, SystemMessage } from "langchain"; -import { detectUserLanguage, type PreviousUserMessage } from "../languageDetect.js"; -import { buildAgentTurnSystemPrompt } from "../systemPrompt.js"; -import { getErrorMessage, isAbortError } from "../../errors.js"; -import type { AgentModeCompletionAdapter } from "../simpleAgent.js"; - -export class TurnPromptBuilder { - constructor( - private readonly options: { - getAgentSystemPrompt: () => Promise; - getAdminforth: () => IAdminForth; - }, - ) {} - - async build(input: { - prompt: string; - previousUserMessages: PreviousUserMessage[]; - adminUser: AdminUser; - completionAdapter: AgentModeCompletionAdapter; - chatSurface?: string; - abortSignal?: AbortSignal; - }) { - const adminforth = this.options.getAdminforth(); - const userLanguage = await detectUserLanguage( - input.completionAdapter, - input.prompt, - input.previousUserMessages, - ).catch((error) => { - if (input.abortSignal?.aborted || isAbortError(error)) { - throw error; - } - - logger.warn(`Failed to detect user language: ${getErrorMessage(error)}`); - return null; - }); - const systemPrompt = buildAgentTurnSystemPrompt({ - agentSystemPrompt: await this.options.getAgentSystemPrompt(), - adminUser: input.adminUser, - usernameField: adminforth.config.auth!.usernameField, - userLanguage, - chatSurface: input.chatSurface, - }); - - return [ - new SystemMessage(systemPrompt), - new HumanMessage(input.prompt), - ]; - } -} diff --git a/agent/turn/TurnStreamConsumer.ts b/agent/turn/TurnStreamConsumer.ts deleted file mode 100644 index 8f306a9..0000000 --- a/agent/turn/TurnStreamConsumer.ts +++ /dev/null @@ -1,69 +0,0 @@ -import type { AgentEventEmitter } from "../../agentEvents.js"; -import { VegaLiteStreamBuffer } from "./VegaLiteStreamBuffer.js"; - -export class TurnStreamConsumer { - async consume(input: { - stream: AsyncIterable<["messages", [any, any]] | ["updates", Record]>; - abortSignal?: AbortSignal; - emit?: AgentEventEmitter; - onInterrupt?: (interrupt: unknown) => void | Promise; - }) { - let fullResponse = ""; - const textBuffer = new VegaLiteStreamBuffer(); - - for await (const [mode, chunk] of input.stream) { - if (input.abortSignal?.aborted) { - throw new DOMException("This operation was aborted", "AbortError"); - } - - if (mode === "updates") { - if ("__interrupt__" in chunk) { - await input.onInterrupt?.(chunk.__interrupt__); - } - continue; - } - - const [token, metadata] = chunk; - const nodeName = - typeof metadata?.langgraph_node === "string" - ? metadata.langgraph_node - : ""; - - if (nodeName && !["model", "model_request"].includes(nodeName)) { - continue; - } - - const blocks = Array.isArray(token?.contentBlocks) - ? token.contentBlocks - : Array.isArray(token?.content) - ? token.content - : []; - const reasoningDelta = blocks - .filter((block: any) => block?.type === "reasoning") - .map((block: any) => String(block.reasoning ?? "")) - .join(""); - const textDelta = blocks - .filter((block: any) => block?.type === "text") - .map((block: any) => String(block.text ?? "")) - .join(""); - - if (reasoningDelta) { - await input.emit?.({ - type: "reasoning-delta", - delta: reasoningDelta, - }); - } - - if (textDelta) { - fullResponse += textDelta; - await textBuffer.push(textDelta, input.emit); - } - } - - await textBuffer.flush(input.emit); - - return { - text: fullResponse, - }; - } -} diff --git a/agentTurnService.ts b/agentTurnService.ts deleted file mode 100644 index b49cc24..0000000 --- a/agentTurnService.ts +++ /dev/null @@ -1,314 +0,0 @@ -import { logger } from "adminforth"; -import { randomUUID } from "crypto"; -import { Command } from "@langchain/langgraph"; -import { AgentModelFactory } from "./agent/models/AgentModelFactory.js"; -import { AgentModeResolver } from "./agent/models/AgentModeResolver.js"; -import { createSequenceDebugCollector } from "./agent/middleware/sequenceDebug.js"; -import { AgentRuntime } from "./agent/runtime/AgentRuntime.js"; -import { TurnContextBuilder } from "./agent/turn/TurnContextBuilder.js"; -import { TurnLifecycleService } from "./agent/turn/TurnLifecycleService.js"; -import { TurnPromptBuilder } from "./agent/turn/TurnPromptBuilder.js"; -import { TurnStreamConsumer } from "./agent/turn/TurnStreamConsumer.js"; -import type { - BaseAgentTurnInput, - HandleTurnInput, - PreparedAgentTurn, - RunAndPersistAgentResponseInput, - RunAndPersistAgentResponseResult, -} from "./agent/turn/turnTypes.js"; -import { getErrorMessage, isAbortError } from "./errors.js"; - -export type { - BaseAgentTurnInput, - HandleSpeechTurnInput, - HandleTurnInput, - RunAndPersistAgentResponseInput, - RunAndPersistAgentResponseResult, -} from "./agent/turn/turnTypes.js"; - -function getApprovalDecision(input: BaseAgentTurnInput) { - return "approvalDecision" in input - && (input.approvalDecision === "approve" || input.approvalDecision === "reject") - ? input.approvalDecision - : undefined; -} - -function getInterruptItems(interrupt: unknown): unknown[] { - return Array.isArray(interrupt) ? interrupt : [interrupt]; -} - -function getHitlInterrupts(interrupt: unknown): { id: string; count: number }[] { - return getInterruptItems(interrupt).flatMap((item) => { - const value = item && typeof item === "object" && "value" in item - ? (item as { value: unknown }).value - : item; - const actionRequests = value && typeof value === "object" - ? (value as { actionRequests?: unknown }).actionRequests - : undefined; - const interruptId = item && typeof item === "object" - ? (item as { id?: unknown }).id - : undefined; - - return typeof interruptId === "string" && Array.isArray(actionRequests) - ? [{ id: interruptId, count: actionRequests.length }] - : []; - }); -} - -function buildHitlDecision(decision: "approve" | "reject", prompt?: string) { - if (decision === "approve") { - return { type: "approve" as const }; - } - - return { - type: "reject" as const, - message: prompt - ? `User rejected the pending tool execution and sent a new instruction instead: ${prompt}` - : "User rejected executing this tool", - }; -} - -function buildHitlResumeValue(input: { - decision: "approve" | "reject"; - count: number; - prompt?: string; -}) { - return { - decisions: Array.from({ length: input.count }, () => ( - buildHitlDecision(input.decision, input.prompt) - )), - }; -} - -function buildLangGraphResume(input: { - decision: "approve" | "reject"; - interrupts?: { id: string; count: number }[]; - prompt?: string; -}) { - const interrupts = input.interrupts ?? []; - - if (interrupts.length === 0) { - throw new Error("No pending approval interrupt found for resume."); - } - - if (interrupts.length === 1) { - return buildHitlResumeValue({ - decision: input.decision, - count: interrupts[0].count, - prompt: input.prompt, - }); - } - - return Object.fromEntries( - interrupts.map((interrupt) => [ - interrupt.id, - buildHitlResumeValue({ - decision: input.decision, - count: interrupt.count, - prompt: input.prompt, - }), - ]), - ); -} - -export class AgentTurnService { - private readonly pendingInterrupts = new Map(); - - constructor( - private readonly lifecycle: TurnLifecycleService, - private readonly contextBuilder: TurnContextBuilder, - private readonly modeResolver: AgentModeResolver, - private readonly modelFactory: AgentModelFactory, - private readonly promptBuilder: TurnPromptBuilder, - private readonly runtime: AgentRuntime, - private readonly streamConsumer: TurnStreamConsumer, - ) {} - - private async prepareTurn(input: BaseAgentTurnInput): Promise { - const sequenceDebugCollector = createSequenceDebugCollector(); - const approvalDecision = getApprovalDecision(input); - const shouldResume = Boolean(approvalDecision); - const pendingInterrupts = this.pendingInterrupts.get(input.sessionId); - if (shouldResume && (!pendingInterrupts || pendingInterrupts.length === 0)) { - throw new Error(`No pending approval interrupt found for session "${input.sessionId}".`); - } - const lifecycleTurn = shouldResume - ? await this.lifecycle.resume(input) - : await this.lifecycle.start(input); - const context = await this.contextBuilder.build({ - base: input, - turnId: lifecycleTurn.turnId, - }); - - return { - prompt: input.prompt, - sessionId: input.sessionId, - turnId: lifecycleTurn.turnId, - previousUserMessages: lifecycleTurn.previousUserMessages, - modeName: input.modeName, - context, - observability: { - emit: undefined, - sequenceDebugSink: sequenceDebugCollector, - }, - resume: shouldResume - ? { - decision: approvalDecision!, - interrupts: pendingInterrupts, - } - : undefined, - initialResponse: shouldResume && "initialResponse" in lifecycleTurn - ? (lifecycleTurn as { initialResponse?: string }).initialResponse - : undefined, - }; - } - - private async runAgentTurn(input: PreparedAgentTurn) { - const selectedMode = this.modeResolver.resolve(input.modeName); - const [models, messages] = await Promise.all([ - this.modelFactory.create(selectedMode.completionAdapter), - this.promptBuilder.build({ - prompt: input.prompt, - previousUserMessages: input.previousUserMessages, - adminUser: input.context.adminUser, - completionAdapter: selectedMode.completionAdapter, - chatSurface: input.context.chatSurface, - abortSignal: input.context.abortSignal, - }), - ]); - const stream = await this.runtime.stream({ - models, - input: input.resume - ? new Command({ - resume: buildLangGraphResume({ - decision: input.resume.decision, - interrupts: input.resume.interrupts, - prompt: input.prompt, - }), - }) - : { messages }, - context: input.context, - observability: input.observability, - }); - - let interrupted = false; - try { - return await this.streamConsumer.consume({ - stream: stream as AsyncIterable<["messages", [any, any]] | ["updates", Record]>, - abortSignal: input.context.abortSignal, - emit: input.observability.emit, - onInterrupt: async (interrupt) => { - interrupted = true; - const interrupts = getHitlInterrupts(interrupt); - const pendingInterrupts = this.pendingInterrupts.get(input.sessionId) ?? []; - const mergedInterrupts = new Map( - pendingInterrupts.map((pendingInterrupt) => [ - pendingInterrupt.id, - pendingInterrupt.count, - ]), - ); - - for (const pendingInterrupt of interrupts) { - mergedInterrupts.set(pendingInterrupt.id, pendingInterrupt.count); - } - - this.pendingInterrupts.set( - input.sessionId, - [...mergedInterrupts.entries()].map(([id, count]) => ({ id, count })), - ); - await input.observability.emit?.({ - type: "interrupt", - sessionId: input.sessionId, - interrupt, - }); - }, - }); - } finally { - if (!interrupted) { - this.pendingInterrupts.delete(input.sessionId); - } - } - } - - async runAndPersistAgentResponse( - input: RunAndPersistAgentResponseInput, - ): Promise { - const preparedTurn = await this.prepareTurn(input); - preparedTurn.observability.emit = input.emit; - - let fullResponse = preparedTurn.initialResponse ?? ""; - let aborted = false; - let failed = false; - - try { - const agentResponse = await this.runAgentTurn(preparedTurn); - fullResponse += agentResponse.text; - } catch (error) { - if (input.abortSignal?.aborted || isAbortError(error)) { - aborted = true; - logger.info(input.abortLogMessage); - } else { - failed = true; - fullResponse = getErrorMessage(error); - logger.error(`${input.failureLogMessage}:\n${fullResponse}`); - } - } - - preparedTurn.observability.sequenceDebugSink.flush(); - await this.lifecycle.finish({ - turnId: preparedTurn.turnId, - responseText: fullResponse, - debugHistory: preparedTurn.observability.sequenceDebugSink.getHistory(), - }); - - return { - text: fullResponse, - turnId: preparedTurn.turnId, - aborted, - failed, - }; - } - - async handleTurn(input: HandleTurnInput) { - await input.emit({ - type: "turn-started", - messageId: randomUUID(), - }); - - const agentResponse = await this.runAndPersistAgentResponse({ - prompt: input.prompt, - sessionId: input.sessionId, - modeName: input.modeName, - userTimeZone: input.userTimeZone, - currentPage: input.currentPage, - chatSurface: input.chatSurface, - adminPublicOrigin: input.adminPublicOrigin, - approvalDecision: input.approvalDecision, - abortSignal: input.abortSignal, - adminUser: input.adminUser, - emit: input.emit, - failureLogMessage: input.failureLogMessage ?? "Agent response failed", - abortLogMessage: input.abortLogMessage ?? "Agent response aborted", - }); - - if (agentResponse.failed) { - await input.emit({ - type: "error", - error: agentResponse.text, - }); - } else if (!agentResponse.aborted) { - await input.emit({ - type: "response", - text: agentResponse.text, - sessionId: input.sessionId, - turnId: agentResponse.turnId, - }); - } - - await input.emit({ - type: "finish", - }); - - return agentResponse; - } -} diff --git a/application/ports.ts b/application/ports.ts new file mode 100644 index 0000000..b850d0b --- /dev/null +++ b/application/ports.ts @@ -0,0 +1,68 @@ +import type { CompletionAdapter } from "adminforth"; +import type { DetectedLanguage, PreviousUserMessage } from "../domain/languageDetect.js"; +import type { + AgentMessage, + AgentStreamChunk, + AgentTurnContext, + AgentTurnObservability, + PendingInterrupt, +} from "../domain/turnTypes.js"; + +export type AgentModelPurpose = "primary" | "summary"; + +/** + * The LLM provider adapter contract (the "provider port"). Extends AdminForth's + * CompletionAdapter with the agent-spec factory. Declared in the application layer + * so the infrastructure (llm/) depends on this contract — not the other way around. + */ +export type AgentModeCompletionAdapter = CompletionAdapter & { + getLangChainAgentSpec(params: { + maxTokens: number; + purpose: AgentModelPurpose; + }): Promise<{ model: unknown; middleware?: unknown[] }> | { model: unknown; middleware?: unknown[] }; +}; + +/** + * Input for a single streamed agent turn. Either a fresh set of messages, or a + * human-in-the-loop resume payload for a previously interrupted turn. + */ +export type LlmStreamInput = { + completionAdapter: AgentModeCompletionAdapter; + input: { messages: AgentMessage[] } | { resume: unknown }; + context: AgentTurnContext; + observability: AgentTurnObservability; +}; + +/** + * The boundary between the application layer and the concrete LLM runtime + * (LangChain / LangGraph). Everything provider-specific — model construction, + * the agent graph, middleware, the raw stream shape, and the LangGraph interrupt + * shape — lives behind this port and is normalized before crossing it. + */ +export interface LlmPort { + /** + * Run one turn and return a normalized stream of typed chunks. + */ + streamTurn(input: LlmStreamInput): Promise>; + + /** + * Detect the language the assistant should answer in for the current message. + */ + detectLanguage(input: { + completionAdapter: AgentModeCompletionAdapter; + prompt: string; + previousUserMessages: PreviousUserMessage[]; + }): Promise; + + /** + * Rebuild the pending human-in-the-loop interrupts for a session from persisted + * checkpoint state (used when the in-process cache is empty — after a restart or + * on another instance). Returns already-normalized descriptors; the LangGraph + * interrupt shape never leaves the llm layer. Throws on a checkpoint/runtime + * failure (the caller must not treat that as "no pending interrupt"). + */ + getPendingInterrupts(input: { + completionAdapter: AgentModeCompletionAdapter; + sessionId: string; + }): Promise; +} diff --git a/application/runTurnUseCase.ts b/application/runTurnUseCase.ts new file mode 100644 index 0000000..fda7d8a --- /dev/null +++ b/application/runTurnUseCase.ts @@ -0,0 +1,403 @@ +import { logger, type IAdminForth } from "adminforth"; +import { randomUUID } from "crypto"; +import { VegaLiteStreamBuffer } from "../domain/vegaLiteStreamBuffer.js"; +import { buildAgentTurnSystemPrompt } from "../domain/systemPrompt.js"; +import { getErrorMessage, isAbortError } from "../shared/errors.js"; +import type { PreviousUserMessage } from "../domain/languageDetect.js"; +import type { AgentSessionStore } from "../persistence/sessionStore.js"; +import type { PluginOptions } from "../types.js"; +import type { LlmPort } from "./ports.js"; +import type { + AgentMessage, + AgentTurnContext, + AgentTurnObservability, + BaseAgentTurnInput, + DebugSink, + HandleTurnInput, + PendingInterrupt, + RunAndPersistAgentResponseInput, + RunAndPersistAgentResponseResult, +} from "../domain/turnTypes.js"; + +type AgentMode = PluginOptions["modes"][number]; + +type PreparedTurn = { + prompt: string; + sessionId: string; + turnId: string; + previousUserMessages: PreviousUserMessage[]; + mode: AgentMode; + context: AgentTurnContext; + observability: AgentTurnObservability; + resume?: { decision: "approve" | "reject"; interrupts?: PendingInterrupt[] }; + initialResponse?: string; +}; + +function getApprovalDecision(input: BaseAgentTurnInput) { + return "approvalDecision" in input + && (input.approvalDecision === "approve" || input.approvalDecision === "reject") + ? input.approvalDecision + : undefined; +} + +function buildHitlDecision(decision: "approve" | "reject", prompt?: string) { + if (decision === "approve") { + return { type: "approve" as const }; + } + + return { + type: "reject" as const, + message: prompt + ? `User rejected the pending tool execution and sent a new instruction instead: ${prompt}` + : "User rejected executing this tool", + }; +} + +function buildHitlResumeValue(input: { + decision: "approve" | "reject"; + count: number; + prompt?: string; +}) { + return { + decisions: Array.from({ length: input.count }, () => ( + buildHitlDecision(input.decision, input.prompt) + )), + }; +} + +/** + * Build the provider-agnostic resume payload for a human-in-the-loop turn. + * Exported for unit testing; the LLM port wraps it into a LangGraph Command. + */ +export function buildResumeValue(input: { + decision: "approve" | "reject"; + interrupts?: PendingInterrupt[]; + prompt?: string; +}) { + const interrupts = input.interrupts ?? []; + + if (interrupts.length === 0) { + throw new Error("No pending approval interrupt found for resume."); + } + + if (interrupts.length === 1) { + return buildHitlResumeValue({ + decision: input.decision, + count: interrupts[0].count, + prompt: input.prompt, + }); + } + + return Object.fromEntries( + interrupts.map((interrupt) => [ + interrupt.id, + buildHitlResumeValue({ + decision: input.decision, + count: interrupt.count, + prompt: input.prompt, + }), + ]), + ); +} + +export type RunTurnUseCaseDeps = { + llm: LlmPort; + sessions: AgentSessionStore; + modes: PluginOptions["modes"]; + getAdminforth: () => IAdminForth; + getAgentSystemPrompt: () => Promise; + /** True when a persistent checkpointer is configured (not the in-memory MemorySaver). */ + hasPersistentCheckpointer: boolean; + /** Factory for the per-turn debug sink; the concrete impl lives in the llm layer. */ + createDebugSink: () => DebugSink; +}; + +/** + * Single entry point for running an agent turn: prepare (start/resume) → build + * the prompt → stream from the LLM port → consume the typed stream into SSE + * events → persist the result. Replaces the former AgentTurnService plus the + * TurnLifecycle/Context/Prompt/Persistence/StreamConsumer + ModeResolver stack. + */ +export class RunTurnUseCase { + private readonly pendingInterrupts = new Map(); + + constructor(private readonly deps: RunTurnUseCaseDeps) {} + + private resolveMode(modeName?: string | null): AgentMode { + return this.deps.modes.find((mode) => mode.name === modeName) ?? this.deps.modes[0]; + } + + private buildContext(input: BaseAgentTurnInput, turnId: string): AgentTurnContext { + return { + adminUser: input.adminUser, + sessionId: input.sessionId, + turnId, + abortSignal: input.abortSignal, + currentPage: input.currentPage, + chatSurface: input.chatSurface, + userTimeZone: input.userTimeZone ?? "UTC", + adminPublicOrigin: + input.adminPublicOrigin ?? this.deps.getAdminforth().config.baseUrlSlashed, + }; + } + + /** + * Resolve the pending HITL interrupts for a resume. With a persistent checkpointer the + * checkpoint is authoritative (correct across instances and restarts); with the in-memory + * MemorySaver the in-process cache is authoritative (single process, cannot be stale). + */ + private async resolvePendingInterrupts(sessionId: string, mode: AgentMode): Promise { + if (this.deps.hasPersistentCheckpointer) { + // The in-process cache can be stale after a cross-instance resume, so it is intentionally + // NOT consulted here. A failure to read the checkpoint is surfaced as a real error — never + // masked as "no pending approval". + try { + return await this.deps.llm.getPendingInterrupts({ + completionAdapter: mode.completionAdapter, + sessionId, + }); + } catch (error) { + logger.error( + `Failed to read pending approval state from checkpoint for session "${sessionId}": ${getErrorMessage(error)}`, + ); + throw error; + } + } + return this.pendingInterrupts.get(sessionId) ?? []; + } + + private async prepareTurn(input: RunAndPersistAgentResponseInput): Promise { + const sequenceDebugSink = this.deps.createDebugSink(); + const mode = this.resolveMode(input.modeName); + const approvalDecision = getApprovalDecision(input); + const shouldResume = Boolean(approvalDecision); + + let turnId: string; + let previousUserMessages: PreviousUserMessage[] = []; + let initialResponse: string | undefined; + let resumeInterrupts: PendingInterrupt[] | undefined; + + if (shouldResume) { + resumeInterrupts = await this.resolvePendingInterrupts(input.sessionId, mode); + if (resumeInterrupts.length === 0) { + throw new Error(`No pending approval interrupt found for session "${input.sessionId}".`); + } + const resumeState = await this.deps.sessions.getResumeState(input.sessionId); + turnId = resumeState.turnId; + initialResponse = resumeState.initialResponse; + } else { + previousUserMessages = await this.deps.sessions.getPreviousUserMessages(input.sessionId); + turnId = await this.deps.sessions.createNewTurn(input.sessionId, input.prompt); + await this.deps.sessions.touchSession(input.sessionId); + } + + return { + prompt: input.prompt, + sessionId: input.sessionId, + turnId, + previousUserMessages, + mode, + context: this.buildContext(input, turnId), + observability: { + emit: input.emit, + sequenceDebugSink, + }, + resume: shouldResume + ? { decision: approvalDecision!, interrupts: resumeInterrupts } + : undefined, + initialResponse, + }; + } + + private async buildMessages(prepared: PreparedTurn): Promise { + const userLanguage = await this.deps.llm + .detectLanguage({ + completionAdapter: prepared.mode.completionAdapter, + prompt: prepared.prompt, + previousUserMessages: prepared.previousUserMessages, + }) + .catch((error) => { + if (prepared.context.abortSignal?.aborted || isAbortError(error)) { + throw error; + } + logger.warn(`Failed to detect user language: ${getErrorMessage(error)}`); + return null; + }); + + const adminforth = this.deps.getAdminforth(); + const systemPrompt = buildAgentTurnSystemPrompt({ + agentSystemPrompt: await this.deps.getAgentSystemPrompt(), + adminUser: prepared.context.adminUser, + usernameField: adminforth.config.auth!.usernameField, + userLanguage, + chatSurface: prepared.context.chatSurface, + }); + + return [ + { role: "system", content: systemPrompt }, + { role: "user", content: prepared.prompt }, + ]; + } + + private async handleInterrupt( + prepared: PreparedTurn, + interrupt: unknown, + descriptors: PendingInterrupt[], + ) { + if (!this.deps.hasPersistentCheckpointer) { + const existing = this.pendingInterrupts.get(prepared.sessionId) ?? []; + const merged = new Map(existing.map((item) => [item.id, item.count])); + for (const item of descriptors) { + merged.set(item.id, item.count); + } + this.pendingInterrupts.set( + prepared.sessionId, + [...merged.entries()].map(([id, count]) => ({ id, count })), + ); + } + await prepared.observability.emit?.({ + type: "interrupt", + sessionId: prepared.sessionId, + interrupt, + }); + } + + private async runAgentTurn(prepared: PreparedTurn): Promise<{ text: string }> { + const streamInput = prepared.resume + ? { + resume: buildResumeValue({ + decision: prepared.resume.decision, + interrupts: prepared.resume.interrupts, + prompt: prepared.prompt, + }), + } + : { messages: await this.buildMessages(prepared) }; + + const stream = await this.deps.llm.streamTurn({ + completionAdapter: prepared.mode.completionAdapter, + input: streamInput, + context: prepared.context, + observability: prepared.observability, + }); + + const { emit } = prepared.observability; + const abortSignal = prepared.context.abortSignal; + const textBuffer = new VegaLiteStreamBuffer(); + let fullResponse = ""; + let interrupted = false; + + try { + for await (const chunk of stream) { + if (abortSignal?.aborted) { + throw new DOMException("This operation was aborted", "AbortError"); + } + + if (chunk.kind === "interrupt") { + interrupted = true; + await this.handleInterrupt(prepared, chunk.interrupt, chunk.descriptors); + continue; + } + + if (chunk.kind === "reasoning") { + if (chunk.delta) { + await emit?.({ type: "reasoning-delta", delta: chunk.delta }); + } + continue; + } + + if (chunk.delta) { + fullResponse += chunk.delta; + await textBuffer.push(chunk.delta, emit); + } + } + + await textBuffer.flush(emit); + return { text: fullResponse }; + } finally { + if (!this.deps.hasPersistentCheckpointer && !interrupted) { + this.pendingInterrupts.delete(prepared.sessionId); + } + } + } + + async runAndPersistAgentResponse( + input: RunAndPersistAgentResponseInput, + ): Promise { + const prepared = await this.prepareTurn(input); + + let fullResponse = prepared.initialResponse ?? ""; + let aborted = false; + let failed = false; + + try { + const agentResponse = await this.runAgentTurn(prepared); + fullResponse += agentResponse.text; + } catch (error) { + if (input.abortSignal?.aborted || isAbortError(error)) { + aborted = true; + logger.info(input.abortLogMessage); + } else { + failed = true; + fullResponse = getErrorMessage(error); + logger.error(`${input.failureLogMessage}:\n${fullResponse}`); + } + } + + prepared.observability.sequenceDebugSink.flush(); + await this.deps.sessions.saveTurnResponse({ + turnId: prepared.turnId, + responseText: fullResponse, + debugHistory: prepared.observability.sequenceDebugSink.getHistory(), + }); + + return { + text: fullResponse, + turnId: prepared.turnId, + aborted, + failed, + }; + } + + async handleTurn(input: HandleTurnInput) { + await input.emit({ + type: "turn-started", + messageId: randomUUID(), + }); + + const agentResponse = await this.runAndPersistAgentResponse({ + prompt: input.prompt, + sessionId: input.sessionId, + modeName: input.modeName, + userTimeZone: input.userTimeZone, + currentPage: input.currentPage, + chatSurface: input.chatSurface, + adminPublicOrigin: input.adminPublicOrigin, + approvalDecision: input.approvalDecision, + abortSignal: input.abortSignal, + adminUser: input.adminUser, + emit: input.emit, + failureLogMessage: input.failureLogMessage ?? "Agent response failed", + abortLogMessage: input.abortLogMessage ?? "Agent response aborted", + }); + + if (agentResponse.failed) { + await input.emit({ + type: "error", + error: agentResponse.text, + }); + } else if (!agentResponse.aborted) { + await input.emit({ + type: "response", + text: agentResponse.text, + sessionId: input.sessionId, + turnId: agentResponse.turnId, + }); + } + + await input.emit({ + type: "finish", + }); + + return agentResponse; + } +} diff --git a/agentEvents.ts b/domain/agentEvents.ts similarity index 95% rename from agentEvents.ts rename to domain/agentEvents.ts index 1625cda..70c8448 100644 --- a/agentEvents.ts +++ b/domain/agentEvents.ts @@ -1,4 +1,4 @@ -import type { ToolCallEvent } from "./agent/toolCallEvents.js"; +import type { ToolCallEvent } from "./toolCallEvents.js"; export type AgentEvent = | { diff --git a/agent/languageDetect.ts b/domain/languageDetect.ts similarity index 100% rename from agent/languageDetect.ts rename to domain/languageDetect.ts diff --git a/agent/systemPrompt.ts b/domain/systemPrompt.ts similarity index 99% rename from agent/systemPrompt.ts rename to domain/systemPrompt.ts index 010324d..ea6fab6 100644 --- a/agent/systemPrompt.ts +++ b/domain/systemPrompt.ts @@ -3,7 +3,7 @@ import type { DetectedLanguage } from "./languageDetect.js"; import { listProjectSkillManifests, type AgentSkillManifest, -} from "./skills/registry.js"; +} from "../tools/skills/registry.js"; export const DEFAULT_AGENT_SYSTEM_PROMPT = [ "You are helpful AI Assistant for Admin Panel.", diff --git a/agent/toolCallEvents.ts b/domain/toolCallEvents.ts similarity index 100% rename from agent/toolCallEvents.ts rename to domain/toolCallEvents.ts diff --git a/agent/turn/turnTypes.ts b/domain/turnTypes.ts similarity index 55% rename from agent/turn/turnTypes.ts rename to domain/turnTypes.ts index a260297..10956ce 100644 --- a/agent/turn/turnTypes.ts +++ b/domain/turnTypes.ts @@ -1,11 +1,24 @@ import type { AdminUser, AudioAdapter } from "adminforth"; -import type { Messages } from "@langchain/langgraph"; -import type { Command } from "@langchain/langgraph"; -import type { AgentChatModel, AgentMiddleware } from "../simpleAgent.js"; -import type { SequenceDebugCollector } from "../middleware/sequenceDebug.js"; -import type { PreviousUserMessage } from "../languageDetect.js"; +import type { PreviousUserMessage } from "./languageDetect.js"; import type { CurrentPageContext } from "../tools/getUserLocation.js"; -import type { AgentEventEmitter } from "../../agentEvents.js"; +import type { AgentEventEmitter } from "./agentEvents.js"; + +/** + * Minimal sink the application layer uses to persist per-turn debug traces. The + * concrete implementation (the sequence-debug collector) lives in the llm layer; + * the domain/application layers depend only on this narrow interface, so the + * dependency direction stays llm -> application/domain. + */ +export type DebugSink = { + flush(): void; + getHistory(): unknown[]; +}; + +/** A pending human-in-the-loop approval, normalized (provider-agnostic). */ +export type PendingInterrupt = { + id: string; + count: number; +}; export type BaseAgentTurnInput = { prompt: string; @@ -51,35 +64,7 @@ export type AgentTurnContext = { export type AgentTurnObservability = { emit?: AgentEventEmitter; - sequenceDebugSink: SequenceDebugCollector; -}; - -export type PreparedAgentTurn = { - prompt: string; - sessionId: string; - turnId: string; - previousUserMessages: PreviousUserMessage[]; - modeName?: string | null; - context: AgentTurnContext; - observability: AgentTurnObservability; - resume?: { - decision: "approve" | "reject"; - interrupts?: { id: string; count: number }[]; - }; - initialResponse?: string; -}; - -export type AgentTurnModels = { - model: AgentChatModel; - summaryModel: AgentChatModel; - modelMiddleware?: AgentMiddleware[]; -}; - -export type AgentRuntimeRunInput = { - models: AgentTurnModels; - input: { messages: Messages } | Command; - context: AgentTurnContext; - observability: AgentTurnObservability; + sequenceDebugSink: DebugSink; }; export type RunAndPersistAgentResponseInput = BaseAgentTurnInput & { @@ -98,3 +83,22 @@ export type RunAndPersistAgentResponseResult = { export type HandleTurnInput = TextAgentTurnInput; export type HandleSpeechTurnInput = SpeechAgentTurnInput; + +/** + * Provider-agnostic message passed from the application layer to the LLM port. + * The infrastructure adapter maps it onto concrete LangChain messages. + */ +export type AgentMessage = { + role: "system" | "user"; + content: string; +}; + +/** + * Normalized unit of a streamed agent turn. The LLM port converts the raw + * LangGraph stream into these typed chunks so the application layer never + * touches LangGraph's tuple/`any` stream shape. + */ +export type AgentStreamChunk = + | { kind: "text"; delta: string } + | { kind: "reasoning"; delta: string } + | { kind: "interrupt"; interrupt: unknown; descriptors: PendingInterrupt[] }; diff --git a/agent/turn/VegaLiteStreamBuffer.ts b/domain/vegaLiteStreamBuffer.ts similarity index 97% rename from agent/turn/VegaLiteStreamBuffer.ts rename to domain/vegaLiteStreamBuffer.ts index 69ef2b3..8edbf5f 100644 --- a/agent/turn/VegaLiteStreamBuffer.ts +++ b/domain/vegaLiteStreamBuffer.ts @@ -1,4 +1,4 @@ -import type { AgentEventEmitter } from "../../agentEvents.js"; +import type { AgentEventEmitter } from "./agentEvents.js"; const VEGA_LITE_FENCE_START = "```vega-lite"; const FENCE_END = "```"; diff --git a/index.ts b/index.ts index 400e35c..2408c52 100644 --- a/index.ts +++ b/index.ts @@ -4,31 +4,27 @@ import type { IHttpServer, } from "adminforth"; -import { AdminForthPlugin } from "adminforth"; +import { AdminForthPlugin, logger } from "adminforth"; import type { PluginOptions } from './types.js'; import { MemorySaver, type BaseCheckpointSaver } from "@langchain/langgraph"; -import { AdminForthCheckpointSaver } from "./agent/checkpointer.js"; -import { appendCustomSystemPrompt, buildAgentSystemPrompt, DEFAULT_AGENT_SYSTEM_PROMPT} from "./agent/systemPrompt.js"; -import { setupCoreEndpoints } from "./endpoints/core.js"; -import { setupSessionEndpoints } from "./endpoints/sessions.js"; -import { setupChatSurfaceEndpoints } from "./endpoints/chatSurfaces.js"; -import type { AgentEndpointsContext } from "./endpoints/context.js"; -import { AgentSessionStore } from "./sessionStore.js"; -import { ChatSurfaceService } from "./chatSurfaceService.js"; -import { AgentTurnService } from "./agentTurnService.js"; -import { AgentModelFactory } from "./agent/models/AgentModelFactory.js"; -import { AgentModeResolver } from "./agent/models/AgentModeResolver.js"; -import { AgentRuntime } from "./agent/runtime/AgentRuntime.js"; -import { SpeechTurnService } from "./agent/speech/SpeechTurnService.js"; -import { AgentToolProvider } from "./agent/tools/AgentToolProvider.js"; -import { TurnContextBuilder } from "./agent/turn/TurnContextBuilder.js"; -import { TurnLifecycleService } from "./agent/turn/TurnLifecycleService.js"; -import { TurnPersistenceService } from "./agent/turn/TurnPersistenceService.js"; -import { TurnPromptBuilder } from "./agent/turn/TurnPromptBuilder.js"; -import { TurnStreamConsumer } from "./agent/turn/TurnStreamConsumer.js"; +import { AdminForthCheckpointSaver } from "./persistence/checkpointStore.js"; +import { appendCustomSystemPrompt, buildAgentSystemPrompt, DEFAULT_AGENT_SYSTEM_PROMPT} from "./domain/systemPrompt.js"; +import { setupCoreEndpoints } from "./transport/http/coreEndpoints.js"; +import { setupSessionEndpoints } from "./transport/http/sessionEndpoints.js"; +import { setupChatSurfaceEndpoints } from "./transport/http/chatSurfaceEndpoints.js"; +import type { AgentEndpointsContext } from "./transport/http/context.js"; +import { AgentSessionStore } from "./persistence/sessionStore.js"; +import { ChatSurfaceService } from "./transport/surfaces/chatSurfaceService.js"; +import { RunTurnUseCase } from "./application/runTurnUseCase.js"; +import { createSequenceDebugCollector } from "./llm/middleware/sequenceDebug.js"; +import { LangGraphLlm } from "./llm/langGraphLlm.js"; +import { AgentModelFactory } from "./llm/modelFactory.js"; +import { AgentRuntime } from "./llm/agentRuntime.js"; +import { SpeechTurnService } from "./transport/surfaces/speechTurnService.js"; +import { AgentToolProvider } from "./tools/agentToolProvider.js"; -export type { AgentEvent, AgentEventEmitter } from "./agentEvents.js"; +export type { AgentEvent, AgentEventEmitter } from "./domain/agentEvents.js"; export default class AdminForthAgentPlugin extends AdminForthPlugin { options: PluginOptions; @@ -37,9 +33,10 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { private agentSystemPrompt: string | null = null; private checkpointer: BaseCheckpointSaver | null = null; private sessionStore: AgentSessionStore; - private agentTurnService: AgentTurnService; + private runTurnUseCase: RunTurnUseCase; private speechTurnService: SpeechTurnService; private chatSurfaceService: ChatSurfaceService; + private getCheckpointer() { if (this.checkpointer) return this.checkpointer; @@ -58,6 +55,22 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { ].filter((resourceId): resourceId is string => Boolean(resourceId)); } + private async getAgentSystemPrompt(): Promise { + if (!this.agentSystemPrompt) { + const systemPrompt = await buildAgentSystemPrompt( + this.adminforth, + this.getInternalAgentResourceIds(), + ).catch((err) => { + logger.error( + `Failed to build agent system prompt, falling back to default: ${err instanceof Error ? err.message : String(err)}`, + ); + return DEFAULT_AGENT_SYSTEM_PROMPT; + }); + this.agentSystemPrompt = appendCustomSystemPrompt(systemPrompt, this.options.systemPrompt); + } + return this.agentSystemPrompt; + } + constructor(options: PluginOptions) { super(options, import.meta.url); this.options = options; @@ -72,39 +85,28 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { getCheckpointer: this.getCheckpointer.bind(this), toolProvider, }); - const persistence = new TurnPersistenceService(() => this.adminforth, this.options); - this.agentTurnService = new AgentTurnService( - new TurnLifecycleService(this.sessionStore, persistence, this.options), - new TurnContextBuilder(() => this.adminforth), - new AgentModeResolver(this.options), - new AgentModelFactory(this.options.maxTokens ?? 1000), - new TurnPromptBuilder({ - getAdminforth: () => this.adminforth, - getAgentSystemPrompt: async () => { - if (!this.agentSystemPrompt) { - const systemPrompt = await buildAgentSystemPrompt( - this.adminforth, - this.getInternalAgentResourceIds(), - ).catch((err) => { - return DEFAULT_AGENT_SYSTEM_PROMPT; - }); - this.agentSystemPrompt = appendCustomSystemPrompt(systemPrompt, this.options.systemPrompt); - } - return this.agentSystemPrompt; - }, - }), + const llm = new LangGraphLlm( runtime, - new TurnStreamConsumer(), + new AgentModelFactory(this.options.maxTokens ?? 1000), ); + this.runTurnUseCase = new RunTurnUseCase({ + llm, + sessions: this.sessionStore, + modes: this.options.modes, + getAdminforth: () => this.adminforth, + getAgentSystemPrompt: this.getAgentSystemPrompt.bind(this), + hasPersistentCheckpointer: Boolean(this.options.checkpointResource), + createDebugSink: createSequenceDebugCollector, + }); this.speechTurnService = new SpeechTurnService( - this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), + this.runTurnUseCase.runAndPersistAgentResponse.bind(this.runTurnUseCase), ); this.chatSurfaceService = new ChatSurfaceService( () => this.adminforth, this.options, this.sessionStore, - this.agentTurnService.handleTurn.bind(this.agentTurnService), - this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), + this.runTurnUseCase.handleTurn.bind(this.runTurnUseCase), + this.runTurnUseCase.runAndPersistAgentResponse.bind(this.runTurnUseCase), ); this.agentSystemPromptPromise = Promise.resolve( appendCustomSystemPrompt(DEFAULT_AGENT_SYSTEM_PROMPT, this.options.systemPrompt), @@ -151,7 +153,7 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { throw new Error("sessionResource is required for AdminForthAgentPlugin"); } } - + validateConfigAfterDiscover(adminforth: IAdminForth, resourceConfig: AdminForthResource) { this.options.audioAdapter?.validate(); for (const chatSurfaceAdapter of this.options.chatSurfaceAdapters ?? []) { @@ -176,9 +178,9 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { const endpointContext = { adminforth: this.adminforth, options: this.options, - handleTurn: this.agentTurnService.handleTurn.bind(this.agentTurnService), + handleTurn: this.runTurnUseCase.handleTurn.bind(this.runTurnUseCase), handleSpeechTurn: this.speechTurnService.handle.bind(this.speechTurnService), - runAndPersistAgentResponse: this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), + runAndPersistAgentResponse: this.runTurnUseCase.runAndPersistAgentResponse.bind(this.runTurnUseCase), getSessionTurns: this.sessionStore.getSessionTurns.bind(this.sessionStore), createNewTurn: this.sessionStore.createNewTurn.bind(this.sessionStore), createSystemTurn: this.sessionStore.createSystemTurn.bind(this.sessionStore), diff --git a/agent/runtime/AgentContext.ts b/llm/agentContext.ts similarity index 80% rename from agent/runtime/AgentContext.ts rename to llm/agentContext.ts index 12d3d65..9747e62 100644 --- a/agent/runtime/AgentContext.ts +++ b/llm/agentContext.ts @@ -1,9 +1,9 @@ import type { AdminUser } from "adminforth"; import { z } from "zod"; -import type { AgentEventEmitter } from "../../agentEvents.js"; -import type { SequenceDebugCollector } from "../middleware/sequenceDebug.js"; +import type { AgentEventEmitter } from "../domain/agentEvents.js"; +import type { SequenceDebugCollector } from "./middleware/sequenceDebug.js"; import type { CurrentPageContext } from "../tools/getUserLocation.js"; -import type { AgentTurnContext } from "../turn/turnTypes.js"; +import type { AgentTurnContext } from "../domain/turnTypes.js"; export const contextSchema = z.object({ adminUser: z.custom(), diff --git a/llm/agentModels.ts b/llm/agentModels.ts new file mode 100644 index 0000000..bbb80a0 --- /dev/null +++ b/llm/agentModels.ts @@ -0,0 +1,103 @@ +import type { BaseChatModel } from "@langchain/core/language_models/chat_models"; +import { + type CompletionAdapter, +} from "adminforth"; +import { BaseCallbackHandler } from "@langchain/core/callbacks/base"; +import type { LLMResult } from "@langchain/core/outputs"; +import type { Messages, Command } from "@langchain/langgraph"; +import { + createSequenceDebugMiddleware, +} from "./middleware/sequenceDebug.js"; +import type { AgentModeCompletionAdapter, AgentModelPurpose } from "../application/ports.js"; +import type { AgentTurnContext, AgentTurnObservability } from "../domain/turnTypes.js"; + +export type { AgentModeCompletionAdapter, AgentModelPurpose } from "../application/ports.js"; + +export type AgentChatModel = BaseChatModel; +export type AgentMiddleware = ReturnType; + +export type AgentTurnModels = { + model: AgentChatModel; + summaryModel: AgentChatModel; + modelMiddleware?: AgentMiddleware[]; +}; + +export type AgentRuntimeRunInput = { + models: AgentTurnModels; + input: { messages: Messages } | Command; + context: AgentTurnContext; + observability: AgentTurnObservability; +}; + +type AgentChatModelSpec = { + model: AgentChatModel; + middleware: AgentMiddleware[]; +}; + +type PendingLlmRun = { + startedAt: number; +}; + +function isLangChainAgentCompletionAdapter( + adapter: CompletionAdapter, +): adapter is AgentModeCompletionAdapter { + return typeof (adapter as AgentModeCompletionAdapter) + .getLangChainAgentSpec === "function"; +} + +async function getAgentChatModelSpec(params: { + adapter: AgentModeCompletionAdapter; + maxTokens: number; + purpose: AgentModelPurpose; +}): Promise { + const spec = await params.adapter.getLangChainAgentSpec({ + maxTokens: params.maxTokens, + purpose: params.purpose, + }); + + return { + model: spec.model as AgentChatModel, + middleware: (spec.middleware ?? []) as AgentMiddleware[], + }; +} + +class AgentLlmMetricsLogger extends BaseCallbackHandler { + name = "AgentLlmMetricsLogger"; + lc_prefer_streaming = true; + + private readonly pendingRuns = new Map(); + + async handleLLMStart(_llm: unknown, _prompts: string[], runId: string) { + this.pendingRuns.set(runId, { startedAt: Date.now() }); + } + + async handleLLMEnd(_output: LLMResult, runId: string) { + this.pendingRuns.delete(runId); + } + + async handleLLMError(_error: unknown, runId: string) { + this.pendingRuns.delete(runId); + } +} + +export function createAgentLlmMetricsLogger() { + return new AgentLlmMetricsLogger(); +} + +export async function createAgentChatModel(params: { + adapter: CompletionAdapter; + maxTokens: number; + purpose: AgentModelPurpose; +}) { + if (!isLangChainAgentCompletionAdapter(params.adapter)) { + throw new Error( + "AdminForth Agent requires completionAdapter to implement getLangChainAgentSpec({ maxTokens, purpose }).", + ); + } + + return await getAgentChatModelSpec({ + adapter: params.adapter, + maxTokens: params.maxTokens, + purpose: params.purpose, + }); +} diff --git a/agent/runtime/AgentRuntime.ts b/llm/agentRuntime.ts similarity index 54% rename from agent/runtime/AgentRuntime.ts rename to llm/agentRuntime.ts index ef8a8f8..9fc7c3c 100644 --- a/agent/runtime/AgentRuntime.ts +++ b/llm/agentRuntime.ts @@ -1,13 +1,13 @@ import type { IAdminForth } from "adminforth"; import { createAgent, summarizationMiddleware, humanInTheLoopMiddleware } from "langchain"; import type { BaseCheckpointSaver } from "@langchain/langgraph"; -import { createApiBasedToolsMiddleware } from "../middleware/apiBasedTools.js"; -import { createSequenceDebugMiddleware } from "../middleware/sequenceDebug.js"; -import { createAgentLlmMetricsLogger } from "../simpleAgent.js"; -import type { AgentToolProvider } from "../tools/AgentToolProvider.js"; -import type { AgentRuntimeRunInput } from "../turn/turnTypes.js"; -import { contextSchema, toLangchainAgentContext } from "./AgentContext.js"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import { createApiBasedToolsMiddleware } from "./middleware/apiToolsMiddleware.js"; +import { createSequenceDebugMiddleware, type SequenceDebugCollector } from "./middleware/sequenceDebug.js"; +import { createAgentLlmMetricsLogger } from "./agentModels.js"; +import type { AgentToolProvider } from "../tools/agentToolProvider.js"; +import type { AgentRuntimeRunInput, AgentTurnModels } from "./agentModels.js"; +import { contextSchema, toLangchainAgentContext } from "./agentContext.js"; +import type { ApiBasedTool } from "../tools/apiBasedTools.js"; function createHumanInTheLoopInterrupts( apiBasedTools: Record, @@ -42,9 +42,10 @@ export class AgentRuntime { apiBasedTools, adminforth, ); - const sequenceDebugMiddleware = createSequenceDebugMiddleware( - input.observability.sequenceDebugSink, - ); + // The domain exposes only the narrow DebugSink; the concrete collector (with the + // model-call hooks the debug middleware needs) is an llm-layer detail. + const sequenceDebugSink = input.observability.sequenceDebugSink as SequenceDebugCollector; + const sequenceDebugMiddleware = createSequenceDebugMiddleware(sequenceDebugSink); const hitlMiddleware = humanInTheLoopMiddleware({ interruptOn: createHumanInTheLoopInterrupts(apiBasedTools), descriptionPrefix: "Tool execution pending approval", @@ -82,8 +83,40 @@ export class AgentRuntime { ...input.context, adminBaseUrl: adminforth.config.baseUrlSlashed, emit: input.observability.emit, - sequenceDebugSink: input.observability.sequenceDebugSink, + sequenceDebugSink, }), }); } + + /** + * Read the pending human-in-the-loop interrupts persisted for a thread from the + * checkpointer. Builds a minimal agent (model + checkpointer + HITL middleware) and + * queries its state — no model call is made. Returns raw LangGraph interrupt objects. + */ + async getPendingInterrupts(input: { + models: AgentTurnModels; + sessionId: string; + }): Promise { + const apiBasedTools = this.options.toolProvider.getApiBasedTools(); + const tools = await this.options.toolProvider.getTools(apiBasedTools); + const agent = createAgent({ + name: this.options.name, + model: input.models.model, + checkpointer: this.options.getCheckpointer(), + tools, + contextSchema, + middleware: [ + humanInTheLoopMiddleware({ + interruptOn: createHumanInTheLoopInterrupts(apiBasedTools), + descriptionPrefix: "Tool execution pending approval", + }), + ], + }); + + const state = await (agent as { getState: (config: unknown) => Promise }).getState({ + configurable: { thread_id: input.sessionId }, + }); + const tasks = Array.isArray(state?.tasks) ? state.tasks : []; + return tasks.flatMap((task: any) => (Array.isArray(task?.interrupts) ? task.interrupts : [])); + } } diff --git a/llm/langGraphLlm.ts b/llm/langGraphLlm.ts new file mode 100644 index 0000000..9d0f5c7 --- /dev/null +++ b/llm/langGraphLlm.ts @@ -0,0 +1,66 @@ +import { Command } from "@langchain/langgraph"; +import { HumanMessage, SystemMessage } from "langchain"; +import type { AgentRuntime } from "./agentRuntime.js"; +import type { AgentModelFactory } from "./modelFactory.js"; +import { detectUserLanguage, type PreviousUserMessage } from "../domain/languageDetect.js"; +import { adaptRawStream, normalizeInterrupts } from "./streamAdapter.js"; +import type { AgentModeCompletionAdapter, LlmPort, LlmStreamInput } from "../application/ports.js"; +import type { AgentMessage, PendingInterrupt } from "../domain/turnTypes.js"; + +function toLangchainMessages(messages: AgentMessage[]) { + return messages.map((message) => + message.role === "system" + ? new SystemMessage(message.content) + : new HumanMessage(message.content), + ); +} + +/** + * LangChain/LangGraph implementation of {@link LlmPort}. This is the single + * place that knows about the agent graph, model construction, and the raw + * stream shape; the application layer talks only to the port. + */ +export class LangGraphLlm implements LlmPort { + constructor( + private readonly runtime: AgentRuntime, + private readonly modelFactory: AgentModelFactory, + ) {} + + async streamTurn(input: LlmStreamInput) { + const models = await this.modelFactory.create(input.completionAdapter); + const runtimeInput = + "resume" in input.input + ? new Command({ resume: input.input.resume }) + : { messages: toLangchainMessages(input.input.messages) }; + + const rawStream = await this.runtime.stream({ + models, + input: runtimeInput as any, + context: input.context, + observability: input.observability, + }); + + return adaptRawStream(rawStream as AsyncIterable); + } + + detectLanguage(input: { + completionAdapter: AgentModeCompletionAdapter; + prompt: string; + previousUserMessages: PreviousUserMessage[]; + }) { + return detectUserLanguage( + input.completionAdapter, + input.prompt, + input.previousUserMessages, + ); + } + + async getPendingInterrupts(input: { + completionAdapter: AgentModeCompletionAdapter; + sessionId: string; + }): Promise { + const models = await this.modelFactory.create(input.completionAdapter); + const raw = await this.runtime.getPendingInterrupts({ models, sessionId: input.sessionId }); + return normalizeInterrupts(raw); + } +} diff --git a/agent/middleware/apiBasedTools.ts b/llm/middleware/apiToolsMiddleware.ts similarity index 86% rename from agent/middleware/apiBasedTools.ts rename to llm/middleware/apiToolsMiddleware.ts index 500282c..726bf0d 100644 --- a/agent/middleware/apiBasedTools.ts +++ b/llm/middleware/apiToolsMiddleware.ts @@ -5,16 +5,16 @@ import { logger, type AdminUser, type IAdminForth } from "adminforth"; import { formatApiBasedToolCall, type ApiBasedTool, -} from "../../apiBasedTools.js"; +} from "../../tools/apiBasedTools.js"; import { createToolCallTracker, type ToolCallEventSink, -} from "../toolCallEvents.js"; -import { ALWAYS_AVAILABLE_API_TOOL_NAMES } from "../tools/index.js"; -import { createApiTool } from "../tools/apiTool.js"; -import type { AgentEventEmitter } from "../../agentEvents.js"; +} from "../../domain/toolCallEvents.js"; +import { ALWAYS_AVAILABLE_API_TOOL_NAMES } from "../../tools/index.js"; +import { createApiTool } from "../../tools/apiTool.js"; +import type { AgentEventEmitter } from "../../domain/agentEvents.js"; import type { SequenceDebugCollector } from "./sequenceDebug.js"; -import { isAbortError } from "../../errors.js"; +import { isAbortError } from "../../shared/errors.js"; function getEnabledApiToolNames(messages: unknown[]) { const enabledToolNames = new Set(); @@ -72,10 +72,6 @@ export function createApiBasedToolsMiddleware( .map((toolName) => dynamicTools[toolName]); const availableTools = [...request.tools, ...tools]; - logger.info( - `AdminForth Agent callable tools: ${availableTools.map((tool) => tool.name).join(", ")}`, - ); - return handler({ ...request, tools: availableTools, @@ -83,7 +79,6 @@ export function createApiBasedToolsMiddleware( }, async wrapToolCall(request, handler) { const startedAt = Date.now(); - const toolInput = JSON.stringify(request.toolCall.args ?? {}); if (!request.toolCall.id) { throw new Error(`Tool call "${request.toolCall.name}" has no id.`); } @@ -128,9 +123,6 @@ export function createApiBasedToolsMiddleware( startedAt, }); toolCallTracker.start(); - logger.info( - `Invoking tool "${request.toolCall.name}" with input: ${toolInput}`, - ); try { @@ -164,10 +156,6 @@ export function createApiBasedToolsMiddleware( status: "error", content: `Error: ${message}`, }) - } finally { - logger.info( - `Tool "${request.toolCall.name}" finished in ${Date.now() - startedAt}ms`, - ); } }, }); diff --git a/agent/middleware/sequenceDebug.ts b/llm/middleware/sequenceDebug.ts similarity index 99% rename from agent/middleware/sequenceDebug.ts rename to llm/middleware/sequenceDebug.ts index f24939d..6294059 100644 --- a/agent/middleware/sequenceDebug.ts +++ b/llm/middleware/sequenceDebug.ts @@ -1,7 +1,7 @@ import { AIMessage } from "@langchain/core/messages"; import { createMiddleware } from "langchain"; import YAML from "yaml"; -import type { ToolCallEvent } from "../toolCallEvents.js"; +import type { ToolCallEvent } from "../../domain/toolCallEvents.js"; export type SequenceDebugResultType = "tool_calls" | "final_text"; diff --git a/agent/models/AgentModelFactory.ts b/llm/modelFactory.ts similarity index 86% rename from agent/models/AgentModelFactory.ts rename to llm/modelFactory.ts index 66beca8..187caaa 100644 --- a/agent/models/AgentModelFactory.ts +++ b/llm/modelFactory.ts @@ -1,6 +1,5 @@ import type { CompletionAdapter } from "adminforth"; -import { createAgentChatModel } from "../simpleAgent.js"; -import type { AgentTurnModels } from "../turn/turnTypes.js"; +import { createAgentChatModel, type AgentTurnModels } from "./agentModels.js"; export class AgentModelFactory { constructor(private readonly maxTokens: number) {} diff --git a/llm/streamAdapter.ts b/llm/streamAdapter.ts new file mode 100644 index 0000000..260526c --- /dev/null +++ b/llm/streamAdapter.ts @@ -0,0 +1,104 @@ +import type { AgentStreamChunk, PendingInterrupt } from "../domain/turnTypes.js"; + +// Only tokens emitted by the model node(s) are surfaced to the user; other +// graph nodes (tools, summarization, etc.) are internal. +const STREAMABLE_NODES = ["model", "model_request"]; + +function getInterruptItems(interrupt: unknown): unknown[] { + return Array.isArray(interrupt) ? interrupt : [interrupt]; +} + +/** + * Normalize raw LangGraph interrupt object(s) into provider-agnostic descriptors. + * Kept in the llm layer so the LangGraph interrupt shape never reaches the app. + */ +export function normalizeInterrupts(interrupt: unknown): PendingInterrupt[] { + return getInterruptItems(interrupt).flatMap((item) => { + const value = item && typeof item === "object" && "value" in item + ? (item as { value: unknown }).value + : item; + const actionRequests = value && typeof value === "object" + ? (value as { actionRequests?: unknown }).actionRequests + : undefined; + const interruptId = item && typeof item === "object" + ? (item as { id?: unknown }).id + : undefined; + + return typeof interruptId === "string" && Array.isArray(actionRequests) + ? [{ id: interruptId, count: actionRequests.length }] + : []; + }); +} + +type RawStreamEntry = + | ["messages", [any, any]] + | ["updates", Record]; + +function extractBlocks(token: any): any[] { + if (Array.isArray(token?.contentBlocks)) { + return token.contentBlocks; + } + if (Array.isArray(token?.content)) { + return token.content; + } + return []; +} + +/** + * Convert one raw LangGraph stream entry into zero or more normalized chunks. + * Pure and synchronous so the parsing rules can be unit-tested in isolation. + */ +export function parseRawStreamChunk(entry: RawStreamEntry): AgentStreamChunk[] { + const [mode, chunk] = entry; + + if (mode === "updates") { + if (chunk && typeof chunk === "object" && "__interrupt__" in chunk) { + const interrupt = (chunk as Record).__interrupt__; + return [{ kind: "interrupt", interrupt, descriptors: normalizeInterrupts(interrupt) }]; + } + return []; + } + + const [token, metadata] = chunk as [any, any]; + const nodeName = + typeof metadata?.langgraph_node === "string" ? metadata.langgraph_node : ""; + + if (nodeName && !STREAMABLE_NODES.includes(nodeName)) { + return []; + } + + const blocks = extractBlocks(token); + const chunks: AgentStreamChunk[] = []; + + const reasoning = blocks + .filter((block: any) => block?.type === "reasoning") + .map((block: any) => String(block.reasoning ?? "")) + .join(""); + const text = blocks + .filter((block: any) => block?.type === "text") + .map((block: any) => String(block.text ?? "")) + .join(""); + + if (reasoning) { + chunks.push({ kind: "reasoning", delta: reasoning }); + } + if (text) { + chunks.push({ kind: "text", delta: text }); + } + + return chunks; +} + +/** + * Adapt the raw LangGraph `["messages" | "updates", ...]` stream into a typed + * `AgentStreamChunk` stream for the application layer. + */ +export async function* adaptRawStream( + raw: AsyncIterable, +): AsyncGenerator { + for await (const entry of raw) { + for (const chunk of parseRawStreamChunk(entry)) { + yield chunk; + } + } +} diff --git a/agent/checkpointer.ts b/persistence/checkpointStore.ts similarity index 100% rename from agent/checkpointer.ts rename to persistence/checkpointStore.ts diff --git a/sessionStore.ts b/persistence/sessionStore.ts similarity index 73% rename from sessionStore.ts rename to persistence/sessionStore.ts index dc16784..54aa069 100644 --- a/sessionStore.ts +++ b/persistence/sessionStore.ts @@ -2,8 +2,8 @@ import type { AdminUser, IAdminForth } from "adminforth"; import { Filters, Sorts } from "adminforth"; import { randomUUID } from "crypto"; import type { ChatSurfaceIncomingMessage } from "adminforth"; -import type { PreviousUserMessage } from "./agent/languageDetect.js"; -import type { PluginOptions } from "./types.js"; +import type { PreviousUserMessage } from "../domain/languageDetect.js"; +import type { PluginOptions } from "../types.js"; export const AGENT_SYSTEM_TURN_PROMPT = "__adminforth_system_message__"; @@ -102,4 +102,43 @@ export class AgentSessionStore { return sessionId; } + + async touchSession(sessionId: string) { + await this.getAdminforth().resource(this.options.sessionResource.resourceId).update(sessionId, { + [this.options.sessionResource.createdAtField]: new Date().toISOString(), + }); + } + + async saveTurnResponse(input: { + turnId: string; + responseText: string; + debugHistory?: unknown; + }) { + const turnUpdates: Record = { + [this.options.turnResource.responseField]: input.responseText, + }; + + if (this.options.turnResource.debugField) { + turnUpdates[this.options.turnResource.debugField] = input.debugHistory; + } + + await this.getAdminforth() + .resource(this.options.turnResource.resourceId) + .update(input.turnId, turnUpdates); + } + + async getResumeState(sessionId: string): Promise<{ turnId: string; initialResponse: string }> { + const latestTurn = await this.getLatestTurn(sessionId); + + if (!latestTurn) { + throw new Error(`No agent turn found for session "${sessionId}".`); + } + + const response = latestTurn[this.options.turnResource.responseField]; + + return { + turnId: latestTurn[this.options.turnResource.idField], + initialResponse: response === "not_finished" ? "" : String(response), + }; + } } diff --git a/errors.ts b/shared/errors.ts similarity index 100% rename from errors.ts rename to shared/errors.ts diff --git a/sanitizeSpeechText.ts b/shared/sanitizeSpeechText.ts similarity index 100% rename from sanitizeSpeechText.ts rename to shared/sanitizeSpeechText.ts diff --git a/tests/session_store.test.ts b/tests/session_store.test.ts index a3a37ce..fe3ab81 100644 --- a/tests/session_store.test.ts +++ b/tests/session_store.test.ts @@ -1,7 +1,7 @@ import { AgentSessionStore, AGENT_SYSTEM_TURN_PROMPT, -} from '../sessionStore.js'; +} from '../persistence/sessionStore.js'; // Characterization tests for session/turn persistence. Uses a fake AdminForth resource // so we freeze the field mapping and the read/transform logic (previous-message windowing, diff --git a/tests/sse_emitter.test.ts b/tests/sse_emitter.test.ts index 4b47d05..d5db630 100644 --- a/tests/sse_emitter.test.ts +++ b/tests/sse_emitter.test.ts @@ -1,4 +1,4 @@ -import { createSseEventEmitter } from '../surfaces/web-sse/createSseEventEmitter.js'; +import { createSseEventEmitter } from '../transport/sse/sseWriter.js'; // Characterization tests for the outbound SSE wire contract. This is the plugin's // external streaming protocol (consumed by the Vercel-AI-UI frontend on /agent/response diff --git a/tests/stream_adapter.test.ts b/tests/stream_adapter.test.ts new file mode 100644 index 0000000..d258fd9 --- /dev/null +++ b/tests/stream_adapter.test.ts @@ -0,0 +1,73 @@ +import { parseRawStreamChunk } from '../llm/streamAdapter.js'; + +// Characterization tests for the raw LangGraph -> typed AgentStreamChunk conversion +// (the parsing that previously lived inside TurnStreamConsumer). + +describe('parseRawStreamChunk', () => { + it('extracts text deltas from model-node messages', () => { + const out = parseRawStreamChunk([ + 'messages', + [{ content: [{ type: 'text', text: 'Hi' }] }, { langgraph_node: 'model' }], + ]); + expect(out).toEqual([{ kind: 'text', delta: 'Hi' }]); + }); + + it('emits reasoning before text within a single chunk', () => { + const out = parseRawStreamChunk([ + 'messages', + [ + { content: [{ type: 'reasoning', reasoning: 'r' }, { type: 'text', text: 't' }] }, + { langgraph_node: 'model' }, + ], + ]); + expect(out).toEqual([ + { kind: 'reasoning', delta: 'r' }, + { kind: 'text', delta: 't' }, + ]); + }); + + it('reads token.contentBlocks as well as token.content', () => { + const out = parseRawStreamChunk([ + 'messages', + [{ contentBlocks: [{ type: 'text', text: 'B' }] }, { langgraph_node: 'model_request' }], + ]); + expect(out).toEqual([{ kind: 'text', delta: 'B' }]); + }); + + it('drops tokens emitted by non-model nodes', () => { + const out = parseRawStreamChunk([ + 'messages', + [{ content: [{ type: 'text', text: 'x' }] }, { langgraph_node: 'tools' }], + ]); + expect(out).toEqual([]); + }); + + it('surfaces interrupts from update entries with normalized descriptors', () => { + const out = parseRawStreamChunk([ + 'updates', + { __interrupt__: [{ id: 'i1', value: { actionRequests: [{}, {}] } }] }, + ]); + expect(out).toEqual([ + { + kind: 'interrupt', + interrupt: [{ id: 'i1', value: { actionRequests: [{}, {}] } }], + descriptors: [{ id: 'i1', count: 2 }], + }, + ]); + }); + + it('yields empty descriptors when the interrupt has no actionRequests', () => { + const out = parseRawStreamChunk(['updates', { __interrupt__: [{ id: 'i1' }] }]); + expect(out).toEqual([{ kind: 'interrupt', interrupt: [{ id: 'i1' }], descriptors: [] }]); + }); + + it('ignores non-interrupt update entries', () => { + expect(parseRawStreamChunk(['updates', { someNode: {} }])).toEqual([]); + }); + + it('returns nothing for empty content', () => { + expect( + parseRawStreamChunk(['messages', [{ content: [] }, { langgraph_node: 'model' }]]), + ).toEqual([]); + }); +}); diff --git a/tests/system_prompt.test.ts b/tests/system_prompt.test.ts index 8b3102a..2896013 100644 --- a/tests/system_prompt.test.ts +++ b/tests/system_prompt.test.ts @@ -3,7 +3,7 @@ import { appendCustomSystemPrompt, buildAgentTurnSystemPrompt, buildAgentSystemPrompt, -} from '../agent/systemPrompt.js'; +} from '../domain/systemPrompt.js'; // Characterization tests for system-prompt assembly (base prompt, per-turn additions, // and the resource/skill catalog prompt). Freezes the composed structure and the diff --git a/tests/turn_flow.test.ts b/tests/turn_flow.test.ts index 3242ae3..d8134d7 100644 --- a/tests/turn_flow.test.ts +++ b/tests/turn_flow.test.ts @@ -1,98 +1,107 @@ -import { AgentTurnService } from '../agentTurnService.js'; -import { TurnStreamConsumer } from '../agent/turn/TurnStreamConsumer.js'; +import { RunTurnUseCase, buildResumeValue } from '../application/runTurnUseCase.js'; +import type { AgentStreamChunk } from '../domain/turnTypes.js'; // Characterization tests for the turn-orchestration flow (the layer between the HTTP -// endpoint and the LLM). We drive the REAL AgentTurnService + REAL TurnStreamConsumer, -// injecting fakes only for the outer collaborators (lifecycle / context / mode / model / -// prompt / runtime). The fake runtime yields a scripted LangGraph-shaped stream, so we -// freeze the emitted AgentEvent sequence, persistence, error/abort handling, and the -// HITL approve→resume path WITHOUT spinning up a real LLM. +// endpoint and the LLM). We drive the REAL RunTurnUseCase, faking only the two true +// boundaries: the LlmPort (a scripted typed stream) and the session repository. This +// freezes the emitted AgentEvent sequence, persistence, error/abort handling, and the +// HITL approve -> resume path without a real LLM or database. -async function* streamOf(...chunks: any[]) { +async function* streamOf(...chunks: AgentStreamChunk[]) { for (const chunk of chunks) { yield chunk; } } -// Shapes the real TurnStreamConsumer expects on the ['messages', ...] / ['updates', ...] stream. -function textChunk(text: string) { - return ['messages', [{ content: [{ type: 'text', text }] }, { langgraph_node: 'model' }]]; -} -function reasoningChunk(text: string) { - return ['messages', [{ content: [{ type: 'reasoning', reasoning: text }] }, { langgraph_node: 'model' }]]; -} -function interruptChunk(interrupts: unknown) { - return ['updates', { __interrupt__: interrupts }]; -} +const text = (delta: string): AgentStreamChunk => ({ kind: 'text', delta }); +const reasoning = (delta: string): AgentStreamChunk => ({ kind: 'reasoning', delta }); +const interrupt = ( + value: unknown, + descriptors: Array<{ id: string; count: number }> = [{ id: 'int-1', count: 1 }], +): AgentStreamChunk => ({ kind: 'interrupt', interrupt: value, descriptors }); -type BuildOpts = { - streamFor?: (callIndex: number, input: any) => AsyncIterable; - resumeInitialResponse?: string; -}; - -function buildService(opts: BuildOpts = {}) { - const runtimeCalls: any[] = []; - const finishCalls: any[] = []; - const startCalls: any[] = []; - const resumeCalls: any[] = []; - - const lifecycle = { - async start(input: any) { - startCalls.push(input); - return { turnId: 'turn-1', previousUserMessages: [] }; +function fakeLlm( + opts: { + streamFor?: (call: number, input: any) => AsyncIterable; + pendingInterrupts?: Array<{ id: string; count: number }>; + pendingInterruptsError?: Error; + } = {}, +) { + const calls: any[] = []; + const getPendingInterruptsCalls: any[] = []; + return { + calls, + getPendingInterruptsCalls, + async streamTurn(input: any) { + calls.push(input); + const factory = opts.streamFor ?? (() => streamOf()); + return factory(calls.length, input); }, - async resume(input: any) { - resumeCalls.push(input); - return { - turnId: 'turn-1', - previousUserMessages: [], - initialResponse: opts.resumeInitialResponse ?? '', - }; + async detectLanguage() { + return null; }, - async finish(payload: any) { - finishCalls.push(payload); + async getPendingInterrupts(input: any) { + getPendingInterruptsCalls.push(input); + if (opts.pendingInterruptsError) { + throw opts.pendingInterruptsError; + } + return opts.pendingInterrupts ?? []; }, }; +} - const contextBuilder = { - async build({ base, turnId }: any) { - return { - adminUser: base.adminUser, - sessionId: base.sessionId, - turnId, - abortSignal: base.abortSignal, - userTimeZone: base.userTimeZone ?? 'UTC', - }; - }, +function fakeSessions(opts: { initialResponse?: string } = {}) { + const calls = { + createNewTurn: [] as any[], + touchSession: [] as string[], + saveTurnResponse: [] as any[], + getResumeState: 0, }; - - const modeResolver = { resolve: () => ({ name: 'default', completionAdapter: {} }) }; - const modelFactory = { - async create() { - return { model: {}, summaryModel: {}, modelMiddleware: [] }; + return { + calls, + async getPreviousUserMessages() { + return []; }, - }; - const promptBuilder = { async build() { return []; } }; - - const runtime = { - async stream(input: any) { - runtimeCalls.push(input); - const factory = opts.streamFor ?? (() => streamOf()); - return factory(runtimeCalls.length, input); + async createNewTurn(sessionId: string, prompt: string) { + calls.createNewTurn.push({ sessionId, prompt }); + return 'turn-1'; + }, + async touchSession(sessionId: string) { + calls.touchSession.push(sessionId); + }, + async getResumeState() { + calls.getResumeState += 1; + return { turnId: 'turn-1', initialResponse: opts.initialResponse ?? '' }; + }, + async saveTurnResponse(payload: any) { + calls.saveTurnResponse.push(payload); }, }; +} - const service = new AgentTurnService( - lifecycle as any, - contextBuilder as any, - modeResolver as any, - modelFactory as any, - promptBuilder as any, - runtime as any, - new TurnStreamConsumer() as any, - ); - - return { service, runtimeCalls, finishCalls, startCalls, resumeCalls }; +function buildUseCase(opts: { + streamFor?: (call: number, input: any) => AsyncIterable; + initialResponse?: string; + pendingInterrupts?: Array<{ id: string; count: number }>; + pendingInterruptsError?: Error; + hasPersistentCheckpointer?: boolean; +} = {}) { + const llm = fakeLlm({ + streamFor: opts.streamFor, + pendingInterrupts: opts.pendingInterrupts, + pendingInterruptsError: opts.pendingInterruptsError, + }); + const sessions = fakeSessions({ initialResponse: opts.initialResponse }); + const useCase = new RunTurnUseCase({ + llm: llm as any, + sessions: sessions as any, + modes: [{ name: 'default', completionAdapter: {} as any }], + getAdminforth: () => ({ config: { auth: { usernameField: 'email' }, baseUrlSlashed: '/admin/' } }) as any, + getAgentSystemPrompt: async () => 'SYS', + hasPersistentCheckpointer: opts.hasPersistentCheckpointer ?? false, + createDebugSink: () => ({ flush() {}, getHistory: () => [] }), + }); + return { useCase, llm, sessions }; } function makeInput(overrides: Record = {}) { @@ -113,14 +122,14 @@ function makeInput(overrides: Record = {}) { return { input, events }; } -describe('adminforth-agent turn flow (AgentTurnService.handleTurn)', () => { +describe('RunTurnUseCase.handleTurn', () => { it('streams a text turn, emits response/finish, and persists the assembled text', async () => { - const { service, finishCalls, runtimeCalls } = buildService({ - streamFor: () => streamOf(textChunk('Hello'), textChunk(' world')), + const { useCase, llm, sessions } = buildUseCase({ + streamFor: () => streamOf(text('Hello'), text(' world')), }); const { input, events } = makeInput(); - const result = await service.handleTurn(input as any); + const result = await useCase.handleTurn(input as any); expect(events.map((e) => e.type)).toEqual([ 'turn-started', @@ -136,97 +145,149 @@ describe('adminforth-agent turn flow (AgentTurnService.handleTurn)', () => { turnId: 'turn-1', }); expect(result).toMatchObject({ text: 'Hello world', turnId: 'turn-1', aborted: false, failed: false }); - expect(finishCalls[0]).toMatchObject({ turnId: 'turn-1', responseText: 'Hello world' }); - expect(runtimeCalls).toHaveLength(1); - expect('messages' in runtimeCalls[0].input).toBe(true); + expect(sessions.calls.saveTurnResponse[0]).toMatchObject({ turnId: 'turn-1', responseText: 'Hello world' }); + expect(sessions.calls.touchSession).toEqual(['s1']); + expect(llm.calls).toHaveLength(1); + expect('messages' in llm.calls[0].input).toBe(true); }); it('emits reasoning deltas but excludes reasoning from the persisted response', async () => { - const { service, finishCalls } = buildService({ - streamFor: () => streamOf(reasoningChunk('thinking...'), textChunk('Answer')), + const { useCase, sessions } = buildUseCase({ + streamFor: () => streamOf(reasoning('thinking...'), text('Answer')), }); const { input, events } = makeInput(); - await service.handleTurn(input as any); + await useCase.handleTurn(input as any); expect(events.some((e) => e.type === 'reasoning-delta' && e.delta === 'thinking...')).toBe(true); expect(events.find((e) => e.type === 'response').text).toBe('Answer'); - expect(finishCalls[0].responseText).toBe('Answer'); + expect(sessions.calls.saveTurnResponse[0].responseText).toBe('Answer'); }); it('turns an LLM failure into an error event and persists the message as the response (current behavior)', async () => { - const { service, finishCalls } = buildService({ + const { useCase, sessions } = buildUseCase({ streamFor: () => { throw new Error('llm exploded'); }, }); const { input, events } = makeInput(); - const result = await service.handleTurn(input as any); + const result = await useCase.handleTurn(input as any); expect(events.map((e) => e.type)).toEqual(['turn-started', 'error', 'finish']); expect(result.failed).toBe(true); expect(result.aborted).toBe(false); expect(result.text).toContain('llm exploded'); expect(events.find((e) => e.type === 'error').error).toContain('llm exploded'); - expect(finishCalls[0].responseText).toContain('llm exploded'); + expect(sessions.calls.saveTurnResponse[0].responseText).toContain('llm exploded'); }); it('handles a client abort without emitting response/error and still finalizes the turn', async () => { const controller = new AbortController(); controller.abort(); - const { service, finishCalls } = buildService({ - streamFor: () => streamOf(textChunk('partial')), + const { useCase, sessions } = buildUseCase({ + streamFor: () => streamOf(text('partial')), }); const { input, events } = makeInput({ abortSignal: controller.signal }); - const result = await service.handleTurn(input as any); + const result = await useCase.handleTurn(input as any); expect(result.aborted).toBe(true); expect(result.failed).toBe(false); expect(events.map((e) => e.type)).toEqual(['turn-started', 'finish']); - expect(finishCalls).toHaveLength(1); + expect(sessions.calls.saveTurnResponse).toHaveLength(1); }); - it('emits a HITL interrupt and resumes via a langgraph Command on approval', async () => { - const { service, runtimeCalls, resumeCalls } = buildService({ - resumeInitialResponse: 'prev ', - streamFor: (callIndex) => - callIndex === 1 + it('emits a HITL interrupt and resumes via a provider-agnostic resume payload on approval', async () => { + const { useCase, llm, sessions } = buildUseCase({ + initialResponse: 'prev ', + streamFor: (call) => + call === 1 ? streamOf( - interruptChunk([ + interrupt([ { id: 'int-1', value: { actionRequests: [{ name: 'delete_record', description: 'Delete row' }] } }, ]), ) - : streamOf(textChunk('Done')), + : streamOf(text('Done')), }); const first = makeInput(); - await service.handleTurn(first.input as any); + await useCase.handleTurn(first.input as any); expect(first.events.map((e) => e.type)).toEqual(['turn-started', 'interrupt', 'response', 'finish']); expect(first.events.find((e) => e.type === 'interrupt').sessionId).toBe('s1'); const second = makeInput({ prompt: '', approvalDecision: 'approve' }); - const result = await service.handleTurn(second.input as any); + const result = await useCase.handleTurn(second.input as any); - expect(resumeCalls).toHaveLength(1); - expect(runtimeCalls).toHaveLength(2); - const resumeInput = runtimeCalls[1].input; - expect('messages' in resumeInput).toBe(false); - expect(resumeInput.constructor.name).toBe('Command'); + expect(sessions.calls.getResumeState).toBe(1); + expect(llm.calls).toHaveLength(2); + expect('messages' in llm.calls[1].input).toBe(false); + expect('resume' in llm.calls[1].input).toBe(true); // initialResponse ('prev ') is prepended to the resumed streamed text. expect(result.text).toBe('prev Done'); expect(second.events.find((e) => e.type === 'response').text).toBe('prev Done'); }); + it('with a persistent checkpointer, resumes from checkpoint state (ignoring the local cache)', async () => { + const { useCase, llm } = buildUseCase({ + hasPersistentCheckpointer: true, + initialResponse: 'prev ', + pendingInterrupts: [{ id: 'int-1', count: 1 }], + streamFor: () => streamOf(text('Resumed')), + }); + // No interrupt cached in this process (restart / other instance); the checkpoint is authoritative. + const { input } = makeInput({ prompt: '', approvalDecision: 'approve' }); + const result = await useCase.handleTurn(input as any); + + expect(llm.getPendingInterruptsCalls).toHaveLength(1); + expect(llm.calls).toHaveLength(1); + expect('resume' in llm.calls[0].input).toBe(true); + expect(result.text).toBe('prev Resumed'); + }); + + it('surfaces a checkpoint read failure as a real error, not "no pending approval"', async () => { + const { useCase } = buildUseCase({ + hasPersistentCheckpointer: true, + pendingInterruptsError: new Error('checkpoint DB unavailable'), + }); + const { input } = makeInput({ prompt: '', approvalDecision: 'approve' }); + + // The infra error must propagate, NOT be masked as a missing interrupt. + await expect(useCase.handleTurn(input as any)).rejects.toThrow('checkpoint DB unavailable'); + await expect(useCase.handleTurn(input as any)).rejects.not.toThrow('No pending approval'); + }); + it('rejects (does not swallow) an approval with no pending interrupt, after emitting turn-started', async () => { - const { service } = buildService(); + const { useCase } = buildUseCase(); const { input, events } = makeInput({ prompt: '', approvalDecision: 'approve' }); // prepareTurn() runs BEFORE the try/catch in runAndPersistAgentResponse, so — unlike an // LLM failure during streaming — this error propagates out of handleTurn instead of // becoming a `failed` result. turn-started has already been emitted by then. - await expect(service.handleTurn(input as any)).rejects.toThrow('No pending approval interrupt'); + await expect(useCase.handleTurn(input as any)).rejects.toThrow('No pending approval interrupt'); expect(events.map((e) => e.type)).toEqual(['turn-started']); }); }); + +describe('buildResumeValue', () => { + it('fans a single approval across the recorded interrupt count', () => { + expect(buildResumeValue({ decision: 'approve', interrupts: [{ id: 'a', count: 2 }] })).toEqual({ + decisions: [{ type: 'approve' }, { type: 'approve' }], + }); + }); + + it('keys resume payloads by interrupt id when there are multiple', () => { + const value = buildResumeValue({ + decision: 'reject', + interrupts: [{ id: 'a', count: 1 }, { id: 'b', count: 1 }], + prompt: 'do this instead', + }) as Record; + expect(Object.keys(value)).toEqual(['a', 'b']); + expect(value.a.decisions[0]).toMatchObject({ type: 'reject' }); + expect(value.a.decisions[0].message).toContain('do this instead'); + }); + + it('throws when there is no interrupt to resume', () => { + expect(() => buildResumeValue({ decision: 'approve', interrupts: [] })).toThrow('No pending approval interrupt'); + }); +}); diff --git a/tests/units.test.ts b/tests/units.test.ts index dcfbed3..d4d0c15 100644 --- a/tests/units.test.ts +++ b/tests/units.test.ts @@ -1,9 +1,8 @@ -import { VegaLiteStreamBuffer } from '../agent/turn/VegaLiteStreamBuffer.js'; -import { isAbortError, getErrorMessage } from '../errors.js'; -import { sanitizeSpeechText } from '../sanitizeSpeechText.js'; -import { AgentModeResolver } from '../agent/models/AgentModeResolver.js'; -import { detectUserLanguage } from '../agent/languageDetect.js'; -import { createToolCallTracker } from '../agent/toolCallEvents.js'; +import { VegaLiteStreamBuffer } from '../domain/vegaLiteStreamBuffer.js'; +import { isAbortError, getErrorMessage } from '../shared/errors.js'; +import { sanitizeSpeechText } from '../shared/sanitizeSpeechText.js'; +import { detectUserLanguage } from '../domain/languageDetect.js'; +import { createToolCallTracker } from '../domain/toolCallEvents.js'; // Characterization tests for the small, deterministic building blocks used across the // main flows: the vega-lite streaming buffer, error helpers, speech-text sanitizer, @@ -91,26 +90,6 @@ describe('sanitizeSpeechText', () => { }); }); -describe('AgentModeResolver', () => { - const options = { - modes: [ - { name: 'A', completionAdapter: {} }, - { name: 'B', completionAdapter: {} }, - ], - } as any; - - it('resolves a mode by name', () => { - expect(new AgentModeResolver(options).resolve('B').name).toBe('B'); - }); - - it('falls back to the first mode for unknown / missing names', () => { - const resolver = new AgentModeResolver(options); - expect(resolver.resolve('nope').name).toBe('A'); - expect(resolver.resolve(undefined).name).toBe('A'); - expect(resolver.resolve(null).name).toBe('A'); - }); -}); - describe('detectUserLanguage', () => { it('parses the structured completion output', async () => { let captured: any; diff --git a/agent/tools/AgentToolProvider.ts b/tools/agentToolProvider.ts similarity index 86% rename from agent/tools/AgentToolProvider.ts rename to tools/agentToolProvider.ts index e15f3d8..ec62140 100644 --- a/agent/tools/AgentToolProvider.ts +++ b/tools/agentToolProvider.ts @@ -1,6 +1,6 @@ import type { IAdminForth } from "adminforth"; -import { prepareApiBasedTools } from "../../apiBasedTools.js"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import { prepareApiBasedTools } from "./apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; import { createAgentTools } from "./index.js"; export class AgentToolProvider { diff --git a/apiBasedTools.ts b/tools/apiBasedTools.ts similarity index 98% rename from apiBasedTools.ts rename to tools/apiBasedTools.ts index 7529056..7e34ffe 100644 --- a/apiBasedTools.ts +++ b/tools/apiBasedTools.ts @@ -574,7 +574,6 @@ async function callOpenApiSchema(params: { } const response = createDirectToolResponse(); - logger.info(`Calling OpenAPI tool "${toolName}" with direct handler`); const lang = acceptLanguage ?? "en"; const tr = ( msg: string, @@ -659,15 +658,6 @@ export function prepareApiBasedTools( } } - logger.info( - `AdminForth Agent OpenAPI APIs: ${formatLogNameList( - adminforth.openApi.registeredSchemas.map((schema) => openApiSchemaPathToToolName(schema.path, adminforth)), - )}`, - ); - logger.info( - `AdminForth Agent OpenAPI tools connected: ${formatLogNameList([...openApiSchemasByToolName.keys()])}`, - ); - for (const [toolName, schema] of openApiSchemasByToolName.entries()) { apiBasedTools[toolName] = { description: schema.description, diff --git a/agent/tools/apiTool.ts b/tools/apiTool.ts similarity index 96% rename from agent/tools/apiTool.ts rename to tools/apiTool.ts index 4a97af5..5e57e01 100644 --- a/agent/tools/apiTool.ts +++ b/tools/apiTool.ts @@ -1,5 +1,5 @@ import { tool } from "langchain"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; const emptyToolSchema = { type: "object", diff --git a/agent/tools/fetchSkill.ts b/tools/fetchSkill.ts similarity index 98% rename from agent/tools/fetchSkill.ts rename to tools/fetchSkill.ts index 43d6dbf..4f7fa28 100644 --- a/agent/tools/fetchSkill.ts +++ b/tools/fetchSkill.ts @@ -4,7 +4,7 @@ import { listSkillManifests, loadSkillMarkdown, type AgentSkillManifest, -} from "../skills/registry.js"; +} from "./skills/registry.js"; const fetchSkillSchema = z.object({ skillName: z diff --git a/agent/tools/fetchToolSchema.ts b/tools/fetchToolSchema.ts similarity index 94% rename from agent/tools/fetchToolSchema.ts rename to tools/fetchToolSchema.ts index 97cca79..72c031f 100644 --- a/agent/tools/fetchToolSchema.ts +++ b/tools/fetchToolSchema.ts @@ -1,6 +1,6 @@ import { tool } from "langchain"; import { z } from "zod"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; const fetchToolSchemaSchema = z.object({ toolName: z diff --git a/agent/tools/getUserLocation.ts b/tools/getUserLocation.ts similarity index 100% rename from agent/tools/getUserLocation.ts rename to tools/getUserLocation.ts diff --git a/agent/tools/index.ts b/tools/index.ts similarity index 94% rename from agent/tools/index.ts rename to tools/index.ts index efac313..0b2e894 100644 --- a/agent/tools/index.ts +++ b/tools/index.ts @@ -1,7 +1,7 @@ import type { ClientTool } from "@langchain/core/tools"; import { createFetchSkillTool } from "./fetchSkill.js"; import { createFetchToolSchemaTool } from "./fetchToolSchema.js"; -import type { ApiBasedTool } from "../../apiBasedTools.js"; +import type { ApiBasedTool } from "./apiBasedTools.js"; import { createApiTool } from "./apiTool.js"; import { createGetUserLocationTool } from "./getUserLocation.js"; import { createNavigateUserTool } from "./navigateUser.js"; diff --git a/agent/tools/navigateUser.ts b/tools/navigateUser.ts similarity index 98% rename from agent/tools/navigateUser.ts rename to tools/navigateUser.ts index da4da55..8960fd2 100644 --- a/agent/tools/navigateUser.ts +++ b/tools/navigateUser.ts @@ -1,7 +1,7 @@ import { tool } from "langchain"; import { z } from "zod"; import type { CurrentPageContext } from "./getUserLocation.js"; -import type { AgentEventEmitter } from "../../agentEvents.js"; +import type { AgentEventEmitter } from "../domain/agentEvents.js"; const filterSchema = z.object({ column: z.string().min(1).describe("Resource column name."), diff --git a/agent/skills/registry.ts b/tools/skills/registry.ts similarity index 100% rename from agent/skills/registry.ts rename to tools/skills/registry.ts diff --git a/endpoints/chatSurfaces.ts b/transport/http/chatSurfaceEndpoints.ts similarity index 100% rename from endpoints/chatSurfaces.ts rename to transport/http/chatSurfaceEndpoints.ts diff --git a/endpoints/context.ts b/transport/http/context.ts similarity index 94% rename from endpoints/context.ts rename to transport/http/context.ts index 95ba626..6d5d69c 100644 --- a/endpoints/context.ts +++ b/transport/http/context.ts @@ -9,8 +9,8 @@ import type { HandleTurnInput, RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, -} from "../agentTurnService.js"; -import type { PluginOptions } from "../types.js"; +} from "../../domain/turnTypes.js"; +import type { PluginOptions } from "../../types.js"; export type SessionTurn = { prompt: string; diff --git a/endpoints/core.ts b/transport/http/coreEndpoints.ts similarity index 94% rename from endpoints/core.ts rename to transport/http/coreEndpoints.ts index ec4f466..ae7724b 100644 --- a/endpoints/core.ts +++ b/transport/http/coreEndpoints.ts @@ -1,7 +1,7 @@ import type { IHttpServer } from "adminforth"; import { z } from "zod"; -import { createSseEventEmitter } from "../surfaces/web-sse/createSseEventEmitter.js"; -import type { CurrentPageContext } from "../agent/tools/getUserLocation.js"; +import { createSseEventEmitter } from "../sse/sseWriter.js"; +import type { CurrentPageContext } from "../../tools/getUserLocation.js"; import type { CoreEndpointsContext } from "./context.js"; type MulterFile = { @@ -86,7 +86,7 @@ export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServe method: 'POST', path: `/agent/response`, request_schema: agentResponseBodySchema, - handler: async ({ body, adminUser, response, _raw_express_res, abortSignal }) => { + handler: async ({ body, adminUser, _raw_express_res, abortSignal }) => { const data = body as z.infer; const emit = createSseEventEmitter(_raw_express_res, { vercelAiUiMessageStream: true, @@ -113,7 +113,7 @@ export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServe method: 'POST', path: `/agent/approval`, request_schema: agentApprovalBodySchema, - handler: async ({ body, adminUser, response, _raw_express_res, abortSignal }) => { + handler: async ({ body, adminUser, _raw_express_res, abortSignal }) => { const data = body as z.infer; const emit = createSseEventEmitter(_raw_express_res, { vercelAiUiMessageStream: true, diff --git a/endpoints/sessions.ts b/transport/http/sessionEndpoints.ts similarity index 94% rename from endpoints/sessions.ts rename to transport/http/sessionEndpoints.ts index cda48b9..4e0d433 100644 --- a/endpoints/sessions.ts +++ b/transport/http/sessionEndpoints.ts @@ -2,7 +2,7 @@ import type { IHttpServer } from "adminforth"; import { Filters, Sorts } from "adminforth"; import { randomUUID } from "crypto"; import { z } from "zod"; -import { AGENT_SYSTEM_TURN_PROMPT } from "../sessionStore.js"; +import { AGENT_SYSTEM_TURN_PROMPT } from "../../persistence/sessionStore.js"; import type { SessionEndpointsContext } from "./context.js"; const addSystemMessageBodySchema = z.object({ @@ -56,11 +56,13 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt [Filters.EQ(ctx.options.sessionResource.idField, sessionId)] ); if (!session) { + response.setStatus(404, 'Session not found'); return { error: 'Session not found' }; } if (session[ctx.options.sessionResource.askerIdField] !== userId) { + response.setStatus(403, 'Unauthorized'); return { error: 'Unauthorized' }; @@ -135,11 +137,13 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt [Filters.EQ(ctx.options.sessionResource.idField, sessionId)] ); if (!session) { + response.setStatus(404, 'Session not found'); return { error: 'Session not found' }; } if (session[ctx.options.sessionResource.askerIdField] !== userId) { + response.setStatus(403, 'Unauthorized'); return { error: 'Unauthorized' }; @@ -165,11 +169,13 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt [Filters.EQ(ctx.options.sessionResource.idField, data.sessionId)] ); if (!session) { + response.setStatus(404, 'Session not found'); return { error: 'Session not found' }; } if (session[ctx.options.sessionResource.askerIdField] !== adminUser!.pk) { + response.setStatus(403, 'Unauthorized'); return { error: 'Unauthorized' }; diff --git a/surfaces/web-sse/createSseEventEmitter.ts b/transport/sse/sseWriter.ts similarity index 86% rename from surfaces/web-sse/createSseEventEmitter.ts rename to transport/sse/sseWriter.ts index 9205c30..d879837 100644 --- a/surfaces/web-sse/createSseEventEmitter.ts +++ b/transport/sse/sseWriter.ts @@ -1,7 +1,7 @@ import { randomUUID } from "crypto"; -import type { AgentEvent, AgentEventEmitter } from "../../agentEvents.js"; -import type { ToolCallEvent } from "../../agent/toolCallEvents.js"; +import type { AgentEvent, AgentEventEmitter } from "../../domain/agentEvents.js"; +import type { ToolCallEvent } from "../../domain/toolCallEvents.js"; type AgentEventStreamResponse = { writeHead: (statusCode: number, headers: Record) => void; @@ -247,6 +247,21 @@ function createAgentEventStream( return stream; } +/** + * Maps the internal {@link AgentEvent} vocabulary onto the two Server-Sent-Events + * wire dialects the frontend consumes. Both dialects are HARD external contracts — + * do not change frame names/shapes without updating the corresponding consumer: + * + * - `vercelAiUiMessageStream: true` (used by `/agent/response` and `/agent/approval`): + * the Vercel AI SDK message-stream v1 format (`start`, `text-start`/`text-delta`/ + * `text-end`, `reasoning-*`, `data-*` parts, `error` with `errorText`, `finish`, + * `[DONE]`). Consumed by `DefaultChatTransport` and the manual approval parser in + * `custom/composables/agentStore/useAgentChat.ts`. + * - default (used by `/agent/speech-response`): bare event names + * (`transcript`, `speech-response`, `audio-start`/`audio-delta`/`audio-done`, + * `open-page`, `error`) plus the Vercel-shaped `data-tool-call`. Consumed by + * `custom/composables/useAgentAudio.ts`. + */ export function createSseEventEmitter( res: AgentEventStreamResponse, options: AgentEventStreamOptions = {}, diff --git a/chatSurfaceService.ts b/transport/surfaces/chatSurfaceService.ts similarity index 92% rename from chatSurfaceService.ts rename to transport/surfaces/chatSurfaceService.ts index 7b28e9a..8dc12fa 100644 --- a/chatSurfaceService.ts +++ b/transport/surfaces/chatSurfaceService.ts @@ -6,16 +6,16 @@ import type { IAdminForth, } from "adminforth"; import { Filters, logger } from "adminforth"; -import type { AgentEventEmitter } from "./agentEvents.js"; +import type { AgentEventEmitter } from "../../domain/agentEvents.js"; import type { HandleTurnInput, RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, -} from "./agentTurnService.js"; -import { getErrorMessage, isAbortError } from "./errors.js"; -import type { AgentSessionStore } from "./sessionStore.js"; -import { sanitizeSpeechText } from "./sanitizeSpeechText.js"; -import type { PluginOptions } from "./types.js"; +} from "../../domain/turnTypes.js"; +import { getErrorMessage, isAbortError } from "../../shared/errors.js"; +import type { AgentSessionStore } from "../../persistence/sessionStore.js"; +import { sanitizeSpeechText } from "../../shared/sanitizeSpeechText.js"; +import type { PluginOptions } from "../../types.js"; type ChatSurfaceIncomingMessageWithAudio = ChatSurfaceIncomingMessage & { audio?: { @@ -64,9 +64,11 @@ export class ChatSurfaceService { } if (event.type === "error") { + // Full error detail is logged server-side (see runAndPersistAgentResponse); + // external chat users receive a generic message to avoid leaking internals. await sink.emit({ type: "error", - message: event.error, + message: "Agent response failed. Check server logs for details.", }); } }; @@ -171,7 +173,7 @@ export class ChatSurfaceService { logger.error(`Agent ${incoming.surface} surface speech synthesis failed:\n${getErrorMessage(error)}`); await sink.emit({ type: "error", - message: getErrorMessage(error), + message: "Speech synthesis failed. Check server logs for details.", }); } } @@ -224,7 +226,7 @@ export class ChatSurfaceService { if (agentResponse.failed) { await sink.emit({ type: "error", - message: agentResponse.text, + message: "Agent response failed. Check server logs for details.", }); } diff --git a/agent/speech/SpeechTurnService.ts b/transport/surfaces/speechTurnService.ts similarity index 96% rename from agent/speech/SpeechTurnService.ts rename to transport/surfaces/speechTurnService.ts index 35a2618..66b096d 100644 --- a/agent/speech/SpeechTurnService.ts +++ b/transport/surfaces/speechTurnService.ts @@ -1,11 +1,11 @@ import { logger } from "adminforth"; -import { getErrorMessage, isAbortError } from "../../errors.js"; -import { sanitizeSpeechText } from "../../sanitizeSpeechText.js"; +import { getErrorMessage, isAbortError } from "../../shared/errors.js"; +import { sanitizeSpeechText } from "../../shared/sanitizeSpeechText.js"; import type { RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, SpeechAgentTurnInput, -} from "../turn/turnTypes.js"; +} from "../../domain/turnTypes.js"; export class SpeechTurnService { constructor( diff --git a/types.ts b/types.ts index 4559c66..a9336ea 100644 --- a/types.ts +++ b/types.ts @@ -5,13 +5,18 @@ import { type AudioAdapter, type ChatSurfaceAdapter, } from "adminforth"; -import type { AgentModeCompletionAdapter } from "./agent/simpleAgent.js"; +import type { AgentModeCompletionAdapter } from "./application/ports.js"; interface ISessionResource { resourceId: string; idField: string; titleField: string; - turnsField: string; + /** + * @deprecated Not used by the plugin — session turns are looked up via + * `turnResource.sessionIdField`. Kept optional for backward compatibility; + * will be removed in a future major version. + */ + turnsField?: string; askerIdField: string; createdAtField: string; } @@ -85,8 +90,9 @@ export interface PluginOptions extends PluginsCommonOptions { systemPrompt?: string; /** - * Response generation level. - * Default is low + * @deprecated Not applied — reasoning effort is configured on the completion + * adapter (e.g. `extraRequestBodyParameters.reasoning.effort`), not here. + * Kept for backward compatibility; will be removed in a future major version. */ reasoning?: "none" | "minimal" | "low" | "medium" | "high" | "xhigh";