From 51747ce155c6437042805bd55032510606f60dd9 Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Wed, 20 May 2026 15:32:43 +0300 Subject: [PATCH 1/5] refactor: implement session management endpoints and refactor session handling - Added new session management endpoints: get-sessions, get-session-info, create-session, delete-session, and add-system-message-to-turns. - Introduced AgentSessionStore class to encapsulate session-related logic, including creating new turns and managing chat surface sessions. - Refactored index.ts to streamline endpoint setup and improve code organization. - Created error handling utilities in errors.ts for better error management. --- agentResponseEvents.ts | 1 - agentTurnService.ts | 355 ++++++++++++ chatSurfaceService.ts | 189 +++++++ endpoints/chatSurfaces.ts | 93 ++++ endpoints/context.ts | 62 +++ endpoints/core.ts | 257 +++++++++ endpoints/sessions.ts | 167 ++++++ errors.ts | 14 + index.ts | 1098 ++----------------------------------- sessionStore.ts | 79 +++ 10 files changed, 1261 insertions(+), 1054 deletions(-) delete mode 100644 agentResponseEvents.ts create mode 100644 agentTurnService.ts create mode 100644 chatSurfaceService.ts create mode 100644 endpoints/chatSurfaces.ts create mode 100644 endpoints/context.ts create mode 100644 endpoints/core.ts create mode 100644 endpoints/sessions.ts create mode 100644 errors.ts create mode 100644 sessionStore.ts diff --git a/agentResponseEvents.ts b/agentResponseEvents.ts deleted file mode 100644 index 5b4d792..0000000 --- a/agentResponseEvents.ts +++ /dev/null @@ -1 +0,0 @@ -export { createSseEventEmitter } from "./surfaces/web-sse/createSseEventEmitter.js"; diff --git a/agentTurnService.ts b/agentTurnService.ts new file mode 100644 index 0000000..0b21060 --- /dev/null +++ b/agentTurnService.ts @@ -0,0 +1,355 @@ +import type { AdminUser, IAdminForth } from "adminforth"; +import { logger } from "adminforth"; +import { randomUUID } from "crypto"; +import { HumanMessage, SystemMessage } from "langchain"; +import type { BaseCheckpointSaver } from "@langchain/langgraph"; +import { createAgentChatModel, callAgent } from "./agent/simpleAgent.js"; +import { createSequenceDebugCollector } from "./agent/middleware/sequenceDebug.js"; +import { detectUserLanguage, type PreviousUserMessage } from "./agent/languageDetect.js"; +import { prepareApiBasedTools as buildApiBasedTools } from "./apiBasedTools.js"; +import type { AgentEventEmitter } from "./agentEvents.js"; +import { buildAgentTurnSystemPrompt } from "./agent/systemPrompt.js"; +import type { CurrentPageContext } from "./agent/tools/getUserLocation.js"; +import { isAbortError, getErrorMessage } from "./errors.js"; +import type { AgentSessionStore } from "./sessionStore.js"; +import type { PluginOptions } from "./types.js"; + +type AgentTurnRunInput = { + prompt: string; + sessionId: string; + turnId: string; + previousUserMessages: PreviousUserMessage[]; + modeName?: string | null; + userTimeZone: string; + currentPage?: CurrentPageContext; + abortSignal?: AbortSignal; + adminUser: AdminUser; + sequenceDebugCollector: ReturnType; + emit?: AgentEventEmitter; +}; + +export type RunAndPersistAgentResponseInput = { + prompt: string; + sessionId: string; + modeName?: string | null; + userTimeZone: string; + currentPage?: CurrentPageContext; + abortSignal?: AbortSignal; + adminUser: AdminUser; + emit?: AgentEventEmitter; + failureLogMessage: string; + abortLogMessage: string; +}; + +export type RunAndPersistAgentResponseResult = { + text: string; + turnId: string; + aborted: boolean; + failed: boolean; +}; + +export type HandleTurnInput = Omit & { + emit: AgentEventEmitter; + failureLogMessage?: string; + abortLogMessage?: string; +}; + +type AgentTurnServiceOptions = { + getAdminforth: () => IAdminForth; + getPluginInstanceId: () => string; + options: PluginOptions; + sessionStore: AgentSessionStore; + getCheckpointer: () => BaseCheckpointSaver; + getInternalAgentResourceIds: () => string[]; + getAgentSystemPrompt: () => Promise; +}; + +const VEGA_LITE_FENCE_START = "```vega-lite"; +const COMPLETE_VEGA_LITE_BLOCK_RE = /```vega-lite[\s\S]*?```/; + +export class AgentTurnService { + constructor(private serviceOptions: AgentTurnServiceOptions) {} + + private async runAgentTurn(input: AgentTurnRunInput) { + const adminforth = this.serviceOptions.getAdminforth(); + const options = this.serviceOptions.options; + let fullResponse = ""; + let bufferedTextDelta = ""; + let isRenderingVegaLite = false; + const maxTokens = options.maxTokens ?? 1000; + const selectedMode = options.modes.find((mode) => mode.name === input.modeName) ?? options.modes[0]; + const [primaryModelSpec, summaryModelSpec] = await Promise.all([ + createAgentChatModel({ + adapter: selectedMode.completionAdapter, + maxTokens, + purpose: "primary", + }), + createAgentChatModel({ + adapter: selectedMode.completionAdapter, + maxTokens, + purpose: "summary", + }), + ]); + const model = primaryModelSpec.model; + const summaryModel = summaryModelSpec.model; + const modelMiddleware = primaryModelSpec.middleware; + + const userLanguage = await detectUserLanguage(selectedMode.completionAdapter, input.prompt, input.previousUserMessages) + .catch((error) => { + if (input.abortSignal?.aborted || isAbortError(error)) { + throw error; + } + + logger.warn(`Failed to detect user language: ${getErrorMessage(error)}`); + return null; + }); + const systemPrompt = buildAgentTurnSystemPrompt({ + agentSystemPrompt: await this.serviceOptions.getAgentSystemPrompt(), + adminUser: input.adminUser, + usernameField: adminforth.config.auth!.usernameField, + userLanguage, + }); + const apiBasedTools = buildApiBasedTools( + adminforth, + this.serviceOptions.getInternalAgentResourceIds(), + ); + + const stream = await callAgent({ + name: `adminforth-agent-${this.serviceOptions.getPluginInstanceId()}`, + model, + summaryModel, + modelMiddleware, + checkpointer: this.serviceOptions.getCheckpointer(), + messages: [ + new SystemMessage(systemPrompt), + new HumanMessage(input.prompt), + ], + adminUser: input.adminUser, + adminforth, + apiBasedTools, + customComponentsDir: adminforth.config.customization.customComponentsDir ?? "custom", + sessionId: input.sessionId, + turnId: input.turnId, + currentPage: input.currentPage, + userTimeZone: input.userTimeZone, + abortSignal: input.abortSignal, + emitToolCallEvent: (event) => { + input.sequenceDebugCollector.handleToolCallEvent(event); + void input.emit?.({ + type: "tool-call", + data: event, + }); + }, + sequenceDebugSink: input.sequenceDebugCollector, + }); + + for await (const rawChunk of stream as AsyncIterable<[any, any]>) { + if (input.abortSignal?.aborted) { + throw new DOMException("This operation was aborted", "AbortError"); + } + + const [token, metadata] = rawChunk; + + const nodeName = + typeof metadata?.langgraph_node === "string" + ? metadata.langgraph_node + : ""; + + if (nodeName && !["model", "model_request"].includes(nodeName)) { + continue; + } + + const blocks = Array.isArray(token?.contentBlocks) + ? token.contentBlocks + : Array.isArray(token?.content) + ? token.content + : []; + const reasoningDelta = blocks + .filter((b: any) => b?.type === "reasoning") + .map((b: any) => String(b.reasoning ?? "")) + .join(""); + + const textDelta = blocks + .filter((b: any) => b?.type === "text") + .map((b: any) => String(b.text ?? "")) + .join(""); + + if (reasoningDelta) { + await input.emit?.({ + type: "reasoning-delta", + delta: reasoningDelta, + }); + } + + if (textDelta) { + fullResponse += textDelta; + bufferedTextDelta += textDelta; + + if ( + bufferedTextDelta.includes(VEGA_LITE_FENCE_START) && + !COMPLETE_VEGA_LITE_BLOCK_RE.test(bufferedTextDelta) + ) { + if (!isRenderingVegaLite) { + isRenderingVegaLite = true; + await input.emit?.({ + type: "rendering", + phase: "start", + label: "Rendering...", + }); + } + continue; + } + + if (isRenderingVegaLite) { + isRenderingVegaLite = false; + await input.emit?.({ + type: "rendering", + phase: "end", + label: "Rendering...", + }); + } + + const streamableLength = bufferedTextDelta.includes(VEGA_LITE_FENCE_START) + ? bufferedTextDelta.length + : bufferedTextDelta.length - getPartialVegaLiteFenceStartLength(bufferedTextDelta); + + if (!streamableLength) { + continue; + } + + await input.emit?.({ + type: "text-delta", + delta: bufferedTextDelta.slice(0, streamableLength), + }); + bufferedTextDelta = bufferedTextDelta.slice(streamableLength); + } + } + + if (isRenderingVegaLite) { + await input.emit?.({ + type: "rendering", + phase: "end", + label: "Rendering...", + }); + } + + if (bufferedTextDelta) { + await input.emit?.({ + type: "text-delta", + delta: bufferedTextDelta, + }); + } + + return { + text: fullResponse, + }; + } + + async runAndPersistAgentResponse(input: RunAndPersistAgentResponseInput) { + const adminforth = this.serviceOptions.getAdminforth(); + const options = this.serviceOptions.options; + const previousUserMessages = await this.serviceOptions.sessionStore.getPreviousUserMessages(input.sessionId); + const turnId = await this.serviceOptions.sessionStore.createNewTurn(input.sessionId, input.prompt); + await adminforth.resource(options.sessionResource.resourceId).update(input.sessionId, { + [options.sessionResource.createdAtField]: new Date().toISOString(), + }); + const sequenceDebugCollector = createSequenceDebugCollector(); + let fullResponse = ""; + let aborted = false; + let failed = false; + + try { + const agentResponse = await this.runAgentTurn({ + prompt: input.prompt, + sessionId: input.sessionId, + turnId, + previousUserMessages, + modeName: input.modeName, + userTimeZone: input.userTimeZone, + currentPage: input.currentPage, + abortSignal: input.abortSignal, + adminUser: input.adminUser, + sequenceDebugCollector, + emit: input.emit, + }); + fullResponse = agentResponse.text; + } catch (error) { + if (input.abortSignal?.aborted || isAbortError(error)) { + aborted = true; + logger.info(input.abortLogMessage); + } else { + failed = true; + fullResponse = getErrorMessage(error); + logger.error(`${input.failureLogMessage}:\n${fullResponse}`); + } + } + + sequenceDebugCollector.flush(); + const turnUpdates: Record = { + [options.turnResource.responseField]: fullResponse, + }; + + if (options.turnResource.debugField) { + turnUpdates[options.turnResource.debugField] = sequenceDebugCollector.getHistory(); + } + + await adminforth.resource(options.turnResource.resourceId).update(turnId, turnUpdates); + + return { + text: fullResponse, + turnId, + aborted, + failed, + }; + } + + async handleTurn(input: HandleTurnInput) { + await input.emit({ + type: "turn-started", + messageId: randomUUID(), + }); + + const agentResponse = await this.runAndPersistAgentResponse({ + prompt: input.prompt, + sessionId: input.sessionId, + modeName: input.modeName, + userTimeZone: input.userTimeZone, + currentPage: input.currentPage, + abortSignal: input.abortSignal, + adminUser: input.adminUser, + emit: input.emit, + failureLogMessage: input.failureLogMessage ?? "Agent response failed", + abortLogMessage: input.abortLogMessage ?? "Agent response aborted", + }); + + if (agentResponse.failed) { + await input.emit({ + type: "error", + error: agentResponse.text, + }); + } else if (!agentResponse.aborted) { + await input.emit({ + type: "response", + text: agentResponse.text, + sessionId: input.sessionId, + turnId: agentResponse.turnId, + }); + } + + await input.emit({ + type: "finish", + }); + + return agentResponse; + } +} + +function getPartialVegaLiteFenceStartLength(text: string): number { + for (let length = Math.min(text.length, VEGA_LITE_FENCE_START.length - 1); length > 0; length -= 1) { + if (VEGA_LITE_FENCE_START.startsWith(text.slice(-length))) { + return length; + } + } + + return 0; +} diff --git a/chatSurfaceService.ts b/chatSurfaceService.ts new file mode 100644 index 0000000..628be89 --- /dev/null +++ b/chatSurfaceService.ts @@ -0,0 +1,189 @@ +import type { + AdminUser, + ChatSurfaceAdapter, + ChatSurfaceEventSink, + ChatSurfaceIncomingMessage, + IAdminForth, +} from "adminforth"; +import { Filters } from "adminforth"; +import { randomUUID } from "crypto"; +import type { AgentEventEmitter } from "./agentEvents.js"; +import type { HandleTurnInput } from "./agentTurnService.js"; +import type { PluginOptions } from "./types.js"; +import type { AgentSessionStore } from "./sessionStore.js"; + +type ChatSurfaceConnectAction = { + type: "url"; + label: string; + url: string; +}; + +export type ChatSurfaceAdapterWithConnectAction = ChatSurfaceAdapter & { + createConnectAction?(input: { + token: string; + }): ChatSurfaceConnectAction | Promise; +}; + +type ChatSurfaceLinkTokenPayload = { + surface: string; + adminUserId: AdminUser["pk"]; + expiresAt: number; +}; + +const DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD = "externalUserId"; +const CHAT_SURFACE_LINK_TOKEN_TTL_MS = 60 * 1000; + +export class ChatSurfaceService { + private linkTokens = new Map(); + + constructor( + private getAdminforth: () => IAdminForth, + private options: PluginOptions, + private sessionStore: AgentSessionStore, + private handleTurn: (input: HandleTurnInput) => Promise, + ) {} + + getConnectActionAdapters() { + return (this.options.chatSurfaceAdapters ?? []) + .map((adapter) => adapter as ChatSurfaceAdapterWithConnectAction) + .filter((adapter) => adapter.createConnectAction); + } + + createLinkToken(surface: string, adminUser: AdminUser) { + for (const [token, payload] of this.linkTokens) { + if (payload.expiresAt <= Date.now()) { + this.linkTokens.delete(token); + } + } + + const token = randomUUID(); + this.linkTokens.set(token, { + surface, + adminUserId: adminUser.pk, + expiresAt: Date.now() + CHAT_SURFACE_LINK_TOKEN_TTL_MS, + }); + + return token; + } + + private consumeLinkToken(surface: string, token: string) { + const payload = this.linkTokens.get(token); + this.linkTokens.delete(token); + + if (!payload || payload.surface !== surface || payload.expiresAt <= Date.now()) { + return null; + } + + return payload; + } + + private createEventEmitter(sink: ChatSurfaceEventSink): AgentEventEmitter { + return async (event) => { + if (event.type === "text-delta") { + await sink.emit({ + type: "text_delta", + delta: event.delta, + }); + return; + } + + if (event.type === "response") { + await sink.emit({ + type: "done", + text: event.text, + }); + return; + } + + if (event.type === "error") { + await sink.emit({ + type: "error", + message: event.error, + }); + } + }; + } + + private async handleLink( + incoming: ChatSurfaceIncomingMessage, + sink: ChatSurfaceEventSink, + ) { + if (typeof incoming.metadata?.startPayload !== "string") { + return false; + } + + const payload = this.consumeLinkToken(incoming.surface, incoming.metadata.startPayload); + if (!payload) { + await sink.emit({ + type: "error", + message: "This chat surface link is expired or invalid. Please start linking again from AdminForth.", + }); + return true; + } + const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; + const adminforth = this.getAdminforth(); + const authResourceId = adminforth.config.auth!.usersResourceId!; + const authResource = adminforth.config.resources.find((resource) => resource.resourceId === authResourceId)!; + const primaryKeyField = authResource.columns.find((column) => column.primaryKey)!.name!; + const adminUserRecord = await adminforth.resource(authResourceId).get([ + Filters.EQ(primaryKeyField, payload.adminUserId), + ]); + + await adminforth.resource(authResourceId).update(payload.adminUserId, { + [externalUserIdField]: { + ...(adminUserRecord[externalUserIdField] ?? {}), + [incoming.surface]: incoming.externalUserId, + }, + }); + await sink.emit({ + type: "done", + text: `${incoming.surface} account connected to AdminForth.`, + }); + + return true; + } + + async handleMessage( + adapter: ChatSurfaceAdapter, + incoming: ChatSurfaceIncomingMessage, + sink: ChatSurfaceEventSink, + ) { + if (await this.handleLink(incoming, sink)) { + return; + } + + const adminforth = this.getAdminforth(); + const authResourceId = adminforth.config.auth!.usersResourceId!; + const authResource = adminforth.config.resources.find((resource) => resource.resourceId === authResourceId)!; + const primaryKeyField = authResource.columns.find((column) => column.primaryKey)!.name!; + const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; + const adminUserRecord = ( + await adminforth.resource(authResourceId).list(Filters.IS_NOT_EMPTY(externalUserIdField)) + ).find((user) => user[externalUserIdField]?.[adapter.name] === incoming.externalUserId); + + if (!adminUserRecord) { + await sink.emit({ + type: "error", + message: "This chat account is not authorized to use AdminForth Agent.", + }); + return; + } + + const adminUser = { + pk: adminUserRecord[primaryKeyField], + username: adminUserRecord[adminforth.config.auth!.usernameField], + dbUser: adminUserRecord, + }; + + await this.handleTurn({ + prompt: incoming.prompt, + sessionId: await this.sessionStore.getOrCreateChatSurfaceSession(incoming, adminUser), + modeName: incoming.modeName, + userTimeZone: incoming.userTimeZone ?? "UTC", + adminUser, + emit: this.createEventEmitter(sink), + failureLogMessage: `Agent ${incoming.surface} surface response failed`, + abortLogMessage: `Agent ${incoming.surface} surface response aborted`, + }); + } +} diff --git a/endpoints/chatSurfaces.ts b/endpoints/chatSurfaces.ts new file mode 100644 index 0000000..cf059bb --- /dev/null +++ b/endpoints/chatSurfaces.ts @@ -0,0 +1,93 @@ +import type { + IAdminForthEndpointHandlerInput, + IHttpServer, +} from "adminforth"; +import type { ChatSurfaceAdapterWithConnectAction, ChatSurfaceEndpointsContext } from "./context.js"; + +const DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD = "externalUserId"; + +export function setupChatSurfaceEndpoints(ctx: ChatSurfaceEndpointsContext, server: IHttpServer) { + if (ctx.getChatSurfaceConnectActionAdapters().length) { + server.endpoint({ + method: "POST", + path: "/agent/surfaces/connectable", + handler: async ({ adminUser }) => { + const externalUserIdField = ctx.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; + const externalIds = adminUser!.dbUser[externalUserIdField] ?? {}; + + return { + surfaces: ctx.getChatSurfaceConnectActionAdapters().map((adapter) => ({ + name: adapter.name, + externalUserId: externalIds[adapter.name] ?? null, + })), + }; + }, + }); + } + + for (const adapter of ctx.options.chatSurfaceAdapters ?? []) { + const connectActionAdapter = adapter as ChatSurfaceAdapterWithConnectAction; + if (connectActionAdapter.createConnectAction) { + server.endpoint({ + method: "POST", + path: `/agent/surface/${adapter.name}/connect-action`, + handler: async ({ adminUser }) => { + const token = ctx.createChatSurfaceLinkToken(adapter.name, adminUser!); + const action = await connectActionAdapter.createConnectAction!({ token }); + + return { + action, + }; + }, + }); + server.endpoint({ + method: "POST", + path: `/agent/surface/${adapter.name}/disconnect`, + handler: async ({ adminUser }) => { + const externalUserIdField = ctx.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; + const externalIds = { + ...(adminUser!.dbUser[externalUserIdField] ?? {}), + }; + + delete externalIds[adapter.name]; + + await ctx.adminforth.resource(ctx.adminforth.config.auth!.usersResourceId!).update(adminUser!.pk, { + [externalUserIdField]: externalIds, + }); + + return { + ok: true, + }; + }, + }); + } + + server.endpoint({ + method: "POST", + noAuth: true, + path: `/agent/surface/${adapter.name}/webhook`, + handler: async (endpointInput: IAdminForthEndpointHandlerInput) => { + const surfaceContext = { + body: endpointInput.body, + headers: endpointInput.headers, + abortSignal: endpointInput.abortSignal, + rawRequest: endpointInput._raw_express_req, + rawResponse: endpointInput._raw_express_res, + }; + const incoming = await adapter.parseIncomingMessage(surfaceContext); + + if (!incoming) return { ok: true }; + + const sink = await adapter.createEventSink(surfaceContext, incoming); + + try { + await ctx.handleChatSurfaceMessage(adapter, incoming, sink); + } finally { + await sink.close?.(); + } + + return { ok: true }; + }, + }); + } +} diff --git a/endpoints/context.ts b/endpoints/context.ts new file mode 100644 index 0000000..6f11e04 --- /dev/null +++ b/endpoints/context.ts @@ -0,0 +1,62 @@ +import type { + AdminUser, + ChatSurfaceAdapter, + ChatSurfaceEventSink, + ChatSurfaceIncomingMessage, + IAdminForth, +} from "adminforth"; +import type { ZodType } from "zod"; +import type { + HandleTurnInput, + RunAndPersistAgentResponseInput, + RunAndPersistAgentResponseResult, +} from "../agentTurnService.js"; +import type { ChatSurfaceAdapterWithConnectAction } from "../chatSurfaceService.js"; +import type { PluginOptions } from "../types.js"; + +export type { ChatSurfaceAdapterWithConnectAction } from "../chatSurfaceService.js"; + +export type EndpointResponse = { + setStatus: (code: number, message: string) => void; +}; + +export type SessionTurn = { + prompt: string; + response: string; +}; + +export type AgentEndpointsContext = { + adminforth: IAdminForth; + options: PluginOptions; + parseBody(schema: ZodType, body: unknown, response: EndpointResponse): T | null; + handleTurn(input: HandleTurnInput): Promise; + runAndPersistAgentResponse(input: RunAndPersistAgentResponseInput): Promise; + getSessionTurns(sessionId: string): Promise; + createNewTurn(sessionId: string, prompt: string, response?: string): Promise; + getChatSurfaceConnectActionAdapters(): ChatSurfaceAdapterWithConnectAction[]; + createChatSurfaceLinkToken(surface: string, adminUser: AdminUser): string; + handleChatSurfaceMessage( + adapter: ChatSurfaceAdapter, + incoming: ChatSurfaceIncomingMessage, + sink: ChatSurfaceEventSink, + ): Promise; +}; + +export type CoreEndpointsContext = Pick< + AgentEndpointsContext, + "options" | "parseBody" | "handleTurn" | "runAndPersistAgentResponse" +>; + +export type SessionEndpointsContext = Pick< + AgentEndpointsContext, + "adminforth" | "options" | "parseBody" | "getSessionTurns" | "createNewTurn" +>; + +export type ChatSurfaceEndpointsContext = Pick< + AgentEndpointsContext, + | "adminforth" + | "options" + | "getChatSurfaceConnectActionAdapters" + | "createChatSurfaceLinkToken" + | "handleChatSurfaceMessage" +>; diff --git a/endpoints/core.ts b/endpoints/core.ts new file mode 100644 index 0000000..a68f432 --- /dev/null +++ b/endpoints/core.ts @@ -0,0 +1,257 @@ +import type { IHttpServer } from "adminforth"; +import { logger } from "adminforth"; +import { z } from "zod"; +import { isAbortError, getErrorMessage } from "../errors.js"; +import { sanitizeSpeechText } from "../sanitizeSpeechText.js"; +import { createSseEventEmitter } from "../surfaces/web-sse/createSseEventEmitter.js"; +import type { CurrentPageContext } from "../agent/tools/getUserLocation.js"; +import type { CoreEndpointsContext } from "./context.js"; + +type MulterFile = { + buffer: Buffer; + originalname: string; + mimetype: string; +}; + +type ExpressMulterRequest = { file?: MulterFile }; + +const agentResponseBodySchema = z.object({ + message: z.string(), + sessionId: z.string(), + mode: z.string().nullish(), + timeZone: z.string().optional(), + currentPage: z.custom().optional(), +}).strict(); + +const agentSpeechResponseBodySchema = agentResponseBodySchema.omit({ message: true }); + +export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServer) { + server.endpoint({ + method: 'POST', + path: `/agent/get-placeholder-messages`, + handler: async ({ headers, adminUser }) => { + if (!ctx.options.placeholderMessages) { + return { + messages: [], + }; + } + + const messages = await ctx.options.placeholderMessages({ + adminUser: adminUser!, + headers, + }); + + return { + messages, + }; + } + }); + + server.endpoint({ + method: 'POST', + path: `/agent/response`, + handler: async ({ body, adminUser, response, _raw_express_res, abortSignal }) => { + const data = ctx.parseBody(agentResponseBodySchema, body, response); + if (!data) return; + const emit = createSseEventEmitter(_raw_express_res, { + vercelAiUiMessageStream: true, + closeActiveBlockOnToolStart: true, + }); + + await ctx.handleTurn({ + prompt: data.message, + sessionId: data.sessionId, + modeName: data.mode, + userTimeZone: data.timeZone ?? 'UTC', + currentPage: data.currentPage, + abortSignal, + adminUser: adminUser!, + emit, + failureLogMessage: "Agent response streaming failed", + abortLogMessage: "Agent response streaming aborted by the client", + }); + return null; + } + }); + + server.endpoint({ + method: 'POST', + path: `/agent/speech-response`, + target: 'upload', + handler: async ({ body, adminUser, response, _raw_express_req, _raw_express_res, abortSignal }) => { + const req = _raw_express_req as ExpressMulterRequest; + const audioAdapter = ctx.options.audioAdapter; + if (!audioAdapter) { + response.setStatus(400, "Audio adapter is not configured for AdminForth Agent"); + return { error: "Audio adapter is not configured for AdminForth Agent" }; + } + const data = ctx.parseBody(agentSpeechResponseBodySchema, body, response); + if (!data) return; + if (!req.file) { + response.setStatus(400, "Audio file is required"); + return { error: "Audio file is required" }; + } + const emit = createSseEventEmitter(_raw_express_res); + + let transcription; + + try { + transcription = await audioAdapter.transcribe({ + buffer: req.file.buffer, + filename: req.file.originalname, + mimeType: req.file.mimetype, + language: "auto", + abortSignal, + }); + } catch (error) { + if (abortSignal.aborted || isAbortError(error)) { + logger.info("Agent speech transcription aborted by the client"); + await emit({ type: "finish" }); + return null; + } + + logger.error(`Agent speech transcription failed:\n${getErrorMessage(error)}`); + await emit({ + type: "error", + error: "Speech transcription failed. Check server logs for details.", + }); + await emit({ type: "finish" }); + return null; + } + + if (abortSignal.aborted) { + await emit({ type: "finish" }); + return null; + } + + const prompt = transcription.text; + if (!prompt) { + await emit({ + type: "error", + error: "Speech transcription is empty", + }); + await emit({ type: "finish" }); + return null; + } + await emit({ + type: "transcript", + text: transcription.text, + language: transcription.language, + }); + + const sessionId = data.sessionId as string; + const currentPage = data.currentPage; + const agentResponse = await ctx.runAndPersistAgentResponse({ + prompt, + sessionId, + modeName: data.mode, + userTimeZone: data.timeZone ?? 'UTC', + currentPage, + abortSignal, + adminUser: adminUser!, + emit: async (event) => { + if (event.type === "tool-call") { + await emit(event); + } + }, + failureLogMessage: "Agent speech response failed", + abortLogMessage: "Agent speech response aborted by the client", + }); + + if (agentResponse.aborted) { + await emit({ type: "finish" }); + return null; + } + + if (agentResponse.failed) { + await emit({ + type: "error", + error: agentResponse.text, + }); + await emit({ type: "finish" }); + return null; + } + + try { + await emit({ + type: "speech-response", + transcript: { + text: transcription.text, + language: transcription.language, + }, + response: { + text: agentResponse.text, + }, + sessionId, + turnId: agentResponse.turnId, + }); + const speech = await audioAdapter.synthesize({ + text: sanitizeSpeechText(agentResponse.text), + stream: true, + streamFormat: "audio", + format: "pcm", + abortSignal, + }); + + await emit({ + type: "audio-start", + mimeType: speech.mimeType, + format: speech.format, + sampleRate: 24000, + channelCount: 1, + bitsPerSample: 16, + }); + + const reader = speech.audioStream.getReader(); + const cancelAudioStream = () => { + void reader.cancel().catch(() => undefined); + }; + + try { + abortSignal.addEventListener("abort", cancelAudioStream, { once: true }); + + while (true) { + if (abortSignal.aborted) { + await reader.cancel().catch(() => undefined); + break; + } + + const { value, done } = await reader.read(); + + if (done) { + break; + } + + if (abortSignal.aborted) { + break; + } + + await emit({ + type: "audio-delta", + value, + }); + } + } finally { + abortSignal.removeEventListener("abort", cancelAudioStream); + reader.releaseLock(); + } + + await emit({ type: "audio-done" }); + await emit({ type: "finish" }); + return null; + } catch (error) { + if (abortSignal.aborted || isAbortError(error)) { + logger.info("Agent speech audio streaming aborted by the client"); + } else { + logger.error(`Agent speech audio streaming failed:\n${error}`); + await emit({ + type: "error", + error: getErrorMessage(error), + }); + } + await emit({ type: "finish" }); + return null; + } + } + }); +} diff --git a/endpoints/sessions.ts b/endpoints/sessions.ts new file mode 100644 index 0000000..6f5d317 --- /dev/null +++ b/endpoints/sessions.ts @@ -0,0 +1,167 @@ +import type { IHttpServer } from "adminforth"; +import { Filters, Sorts } from "adminforth"; +import { randomUUID } from "crypto"; +import { z } from "zod"; +import type { SessionEndpointsContext } from "./context.js"; + +const addSystemMessageBodySchema = z.object({ + sessionId: z.string(), + systemMessage: z.string(), +}).strict(); + +const getSessionsBodySchema = z.object({ + limit: z.number().optional(), +}).strict(); + +const sessionIdBodySchema = z.object({ + sessionId: z.string(), +}).strict(); + +const createSessionBodySchema = z.object({ + triggerMessage: z.string().optional(), +}).strict(); + +export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHttpServer) { + server.endpoint({ + method: 'POST', + path: `/agent/get-sessions`, + handler: async ({body, adminUser, response }) => { + const data = ctx.parseBody(getSessionsBodySchema, body, response); + if (!data) return; + const userId = adminUser!.pk; + const limit = data.limit ?? 20; + const sessions = await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).list( + [Filters.EQ(ctx.options.sessionResource.askerIdField, userId)], limit, undefined, [Sorts.DESC(ctx.options.sessionResource.createdAtField)] + ); + return { + sessions: sessions.map((session) => ({ + sessionId: session[ctx.options.sessionResource.idField], + title: session[ctx.options.sessionResource.titleField], + timestamp: session[ctx.options.sessionResource.createdAtField], + })), + }; + } + }); + + server.endpoint({ + method: 'POST', + path: `/agent/get-session-info`, + handler: async ({body, adminUser, response }) => { + const parsedBody = sessionIdBodySchema.safeParse(body); + if (!parsedBody.success) { + response.setStatus(422, parsedBody.error.message); + return; + } + const userId = adminUser!.pk; + const sessionId = parsedBody.data.sessionId; + const session = await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).get( + [Filters.EQ(ctx.options.sessionResource.idField, sessionId)] + ); + if (!session) { + return { + error: 'Session not found' + }; + } + if (session[ctx.options.sessionResource.askerIdField] !== userId) { + return { + error: 'Unauthorized' + }; + } + const turns = await ctx.getSessionTurns(sessionId); + return { + session: { + sessionId, + title: session[ctx.options.sessionResource.titleField], + timestamp: session[ctx.options.sessionResource.createdAtField], + messages: turns.flatMap(turn => { + const messages: Array<{ text: string; role: 'user' | 'assistant' }> = []; + if (turn.prompt) { + messages.push({ + text: turn.prompt, + role: 'user', + }); + } + if (turn.response && turn.response !== "not_finished") { + messages.push({ + text: turn.response, + role: 'assistant', + }); + } + return messages; + }), + }, + }; + } + }); + + server.endpoint({ + method: 'POST', + path: `/agent/create-session`, + handler: async ({body, adminUser, response }) => { + const data = ctx.parseBody(createSessionBodySchema, body, response); + if (!data) return; + const triggerMessage = data.triggerMessage; + const userId = adminUser!.pk; + const title = triggerMessage?.slice(0, 40) || "New Session"; + const newSession = { + [ctx.options.sessionResource.idField]: randomUUID(), + [ctx.options.sessionResource.titleField]: title, + [ctx.options.sessionResource.askerIdField]: userId, + }; + await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).create(newSession); + return { + sessionId: newSession[ctx.options.sessionResource.idField], + title: newSession[ctx.options.sessionResource.titleField], + timestamp: newSession[ctx.options.sessionResource.createdAtField], + messages: [] + }; + } + }); + + server.endpoint({ + method: 'POST', + path: `/agent/delete-session`, + handler: async ({body, adminUser, response }) => { + const data = ctx.parseBody(sessionIdBodySchema, body, response); + if (!data) return; + const sessionId = data.sessionId; + const userId = adminUser!.pk; + const session = await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).get( + [Filters.EQ(ctx.options.sessionResource.idField, sessionId)] + ); + if (!session) { + return { + error: 'Session not found' + }; + } + if (session[ctx.options.sessionResource.askerIdField] !== userId) { + return { + error: 'Unauthorized' + }; + } + await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).delete(sessionId); + const turns = await ctx.adminforth.resource(ctx.options.turnResource.resourceId).list( + [Filters.EQ(ctx.options.turnResource.sessionIdField, sessionId)] + ); + for (const turn of turns) { + await ctx.adminforth.resource(ctx.options.turnResource.resourceId).delete(turn[ctx.options.turnResource.idField]); + } + return { + ok: true + }; + } + }); + + server.endpoint({ + method: 'POST', + path: `/agent/add-system-message-to-turns`, + handler: async ({body, response }) => { + const data = ctx.parseBody(addSystemMessageBodySchema, body, response); + if (!data) return; + await ctx.createNewTurn(data.sessionId, data.systemMessage); + return { + ok: true + } + } + }) +} diff --git a/errors.ts b/errors.ts new file mode 100644 index 0000000..1b766b6 --- /dev/null +++ b/errors.ts @@ -0,0 +1,14 @@ +export function isAbortError(error: unknown): boolean { + return ( + error instanceof DOMException && error.name === "AbortError" + ) || ( + typeof error === "object" && + error !== null && + "name" in error && + (error.name === "AbortError" || error.name === "APIUserAbortError") + ); +} + +export function getErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/index.ts b/index.ts index da434e5..570a29c 100644 --- a/index.ts +++ b/index.ts @@ -1,138 +1,33 @@ import type { - AdminUser, AdminForthResource, - ChatSurfaceAdapter, - ChatSurfaceEventSink, - ChatSurfaceIncomingMessage, - IAdminForthEndpointHandlerInput, IAdminForth, IHttpServer, } from "adminforth"; -import { AdminForthPlugin, logger, Filters, Sorts } from "adminforth"; +import { AdminForthPlugin } from "adminforth"; import type { PluginOptions } from './types.js'; -import { randomUUID } from 'crypto'; -import { HumanMessage, SystemMessage } from "langchain"; import { MemorySaver, type BaseCheckpointSaver } from "@langchain/langgraph"; import { z } from "zod"; -import { createAgentChatModel, callAgent } from "./agent/simpleAgent.js"; import { AdminForthCheckpointSaver } from "./agent/checkpointer.js"; -import { createSequenceDebugCollector } from "./agent/middleware/sequenceDebug.js"; -import { detectUserLanguage, type PreviousUserMessage } from "./agent/languageDetect.js"; -import { prepareApiBasedTools as buildApiBasedTools } from './apiBasedTools.js'; -import type { AgentEventEmitter } from "./agentEvents.js"; -import { createSseEventEmitter } from "./surfaces/web-sse/createSseEventEmitter.js"; -import { appendCustomSystemPrompt, buildAgentSystemPrompt, buildAgentTurnSystemPrompt, DEFAULT_AGENT_SYSTEM_PROMPT} from "./agent/systemPrompt.js"; -import type { CurrentPageContext } from "./agent/tools/getUserLocation.js"; -import { sanitizeSpeechText } from "./sanitizeSpeechText.js"; +import { appendCustomSystemPrompt, buildAgentSystemPrompt, DEFAULT_AGENT_SYSTEM_PROMPT} from "./agent/systemPrompt.js"; +import { setupCoreEndpoints } from "./endpoints/core.js"; +import { setupSessionEndpoints } from "./endpoints/sessions.js"; +import { setupChatSurfaceEndpoints } from "./endpoints/chatSurfaces.js"; +import type { AgentEndpointsContext } from "./endpoints/context.js"; +import { AgentSessionStore } from "./sessionStore.js"; +import { ChatSurfaceService } from "./chatSurfaceService.js"; +import { AgentTurnService } from "./agentTurnService.js"; export type { AgentEvent, AgentEventEmitter } from "./agentEvents.js"; -type MulterFile = { - buffer: Buffer; - originalname: string; - mimetype: string; -}; - -type ExpressMulterRequest = { file?: MulterFile }; - -type ChatSurfaceConnectAction = { - type: "url"; - label: string; - url: string; -}; - -type ChatSurfaceAdapterWithConnectAction = ChatSurfaceAdapter & { - createConnectAction?(input: { - token: string; - }): ChatSurfaceConnectAction | Promise; -}; - -type ChatSurfaceLinkTokenPayload = { - surface: string; - adminUserId: AdminUser["pk"]; - expiresAt: number; -}; - -type AgentTurnRunInput = { - prompt: string; - sessionId: string; - turnId: string; - previousUserMessages: PreviousUserMessage[]; - modeName?: string | null; - userTimeZone: string; - currentPage?: CurrentPageContext; - abortSignal?: AbortSignal; - adminUser: AdminUser; - sequenceDebugCollector: ReturnType; - emit?: AgentEventEmitter; -}; - -type RunAndPersistAgentResponseInput = - Omit & { - failureLogMessage: string; - abortLogMessage: string; - }; - -type HandleTurnInput = Omit & { - emit: AgentEventEmitter; - failureLogMessage?: string; - abortLogMessage?: string; -}; - -const agentResponseBodySchema = z.object({ - message: z.string(), - sessionId: z.string(), - mode: z.string().nullish(), - timeZone: z.string().optional(), - currentPage: z.custom().optional(), -}).strict(); - -const agentSpeechResponseBodySchema = agentResponseBodySchema.omit({message: true}) - -const addSystemMessageBodySchema = z.object({ - sessionId: z.string(), - systemMessage: z.string(), -}).strict(); - -const getSessionsBodySchema = z.object({ - limit: z.number().optional(), -}).strict(); - -const sessionIdBodySchema = z.object({ - sessionId: z.string(), -}).strict(); - -const createSessionBodySchema = z.object({ - triggerMessage: z.string().optional(), -}).strict(); - -const VEGA_LITE_FENCE_START = "```vega-lite"; -const COMPLETE_VEGA_LITE_BLOCK_RE = /```vega-lite[\s\S]*?```/; -const DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD = "externalUserId"; -const CHAT_SURFACE_LINK_TOKEN_TTL_MS = 60 * 1000; - -function isAbortError(error: unknown): boolean { - return ( - error instanceof DOMException && error.name === "AbortError" - ) || ( - typeof error === "object" && - error !== null && - "name" in error && - (error.name === "AbortError" || error.name === "APIUserAbortError") - ); -} - -function getErrorMessage(error: unknown): string { - return error instanceof Error ? error.message : String(error); -} - export default class AdminForthAgentPlugin extends AdminForthPlugin { options: PluginOptions; agentSystemPromptPromise: Promise; private checkpointer: BaseCheckpointSaver | null = null; - private chatSurfaceLinkTokens = new Map(); + private sessionStore: AgentSessionStore; + private agentTurnService: AgentTurnService; + private chatSurfaceService: ChatSurfaceService; private chatSurfaceSettingsPageRegistered = false; private parseBody( schema: z.ZodType, @@ -146,72 +41,6 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { } return parsed.data; } - private async createNewTurn(sessionId: string, prompt: string, response?: string) { - const turnId = randomUUID(); - const turnRecord = { - [this.options.turnResource.idField]: turnId, - [this.options.turnResource.sessionIdField]: sessionId, - [this.options.turnResource.promptField]: prompt, - [this.options.turnResource.responseField]: response || "not_finished", - }; - const newTurn = await this.adminforth.resource(this.options.turnResource.resourceId).create(turnRecord); - return newTurn.createdRecord[this.options.turnResource.idField]; - } - - private async getSessionTurns(sessionId: string) { - const turns = await this.adminforth.resource(this.options.turnResource.resourceId).list( - [Filters.EQ(this.options.turnResource.sessionIdField, sessionId)], - undefined, - undefined, - [Sorts.ASC(this.options.turnResource.createdAtField)] - ); - return turns.map(turn => ({ - prompt: turn[this.options.turnResource.promptField], - response: turn[this.options.turnResource.responseField], - })); - } - - private async getPreviousUserMessages(sessionId: string) { - const turns = await this.adminforth.resource(this.options.turnResource.resourceId).list( - [Filters.EQ(this.options.turnResource.sessionIdField, sessionId)], - 2, - undefined, - [Sorts.DESC(this.options.turnResource.createdAtField)] - ); - return turns - .reverse() - .map((turn): PreviousUserMessage => ({ - text: turn[this.options.turnResource.promptField], - })); - } - - private getChatSurfaceSessionId(incoming: ChatSurfaceIncomingMessage) { - return `${incoming.surface}:${incoming.externalConversationId}`; - } - - private async getOrCreateChatSurfaceSession( - incoming: ChatSurfaceIncomingMessage, - adminUser: AdminUser, - ) { - const sessionId = this.getChatSurfaceSessionId(incoming); - const sessionResource = this.adminforth.resource(this.options.sessionResource.resourceId); - const session = await sessionResource.get( - [Filters.EQ(this.options.sessionResource.idField, sessionId)] - ); - - if (session) { - return sessionId; - } - - await sessionResource.create({ - [this.options.sessionResource.idField]: sessionId, - [this.options.sessionResource.titleField]: incoming.prompt.slice(0, 40) || "New Session", - [this.options.sessionResource.askerIdField]: adminUser.pk, - }); - - return sessionId; - } - private getCheckpointer() { if (this.checkpointer) return this.checkpointer; @@ -230,15 +59,25 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { ].filter((resourceId): resourceId is string => Boolean(resourceId)); } - private getChatSurfaceConnectActionAdapters() { - return (this.options.chatSurfaceAdapters ?? []) - .map((adapter) => adapter as ChatSurfaceAdapterWithConnectAction) - .filter((adapter) => adapter.createConnectAction); - } - constructor(options: PluginOptions) { super(options, import.meta.url); this.options = options; + this.sessionStore = new AgentSessionStore(() => this.adminforth, this.options); + this.agentTurnService = new AgentTurnService({ + getAdminforth: () => this.adminforth, + getPluginInstanceId: () => this.pluginInstanceId, + options: this.options, + sessionStore: this.sessionStore, + getCheckpointer: this.getCheckpointer.bind(this), + getInternalAgentResourceIds: this.getInternalAgentResourceIds.bind(this), + getAgentSystemPrompt: () => this.agentSystemPromptPromise, + }); + this.chatSurfaceService = new ChatSurfaceService( + () => this.adminforth, + this.options, + this.sessionStore, + this.agentTurnService.handleTurn.bind(this.agentTurnService), + ); this.agentSystemPromptPromise = Promise.resolve( appendCustomSystemPrompt(DEFAULT_AGENT_SYSTEM_PROMPT, this.options.systemPrompt), ); @@ -263,7 +102,7 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { hasAudioAdapter: Boolean(this.options.audioAdapter), } }); - if (this.getChatSurfaceConnectActionAdapters().length && !this.chatSurfaceSettingsPageRegistered) { + if (this.chatSurfaceService.getConnectActionAdapters().length && !this.chatSurfaceSettingsPageRegistered) { if (!this.adminforth.config.auth!.userMenuSettingsPages) { this.adminforth.config.auth!.userMenuSettingsPages = []; } @@ -314,869 +153,22 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { return `single`; } - private async runAgentTurn(input: AgentTurnRunInput) { - let fullResponse = ""; - let bufferedTextDelta = ""; - let isRenderingVegaLite = false; - const maxTokens = this.options.maxTokens ?? 1000; - const selectedMode = this.options.modes.find((mode) => mode.name === input.modeName) ?? this.options.modes[0]; - const [primaryModelSpec, summaryModelSpec] = await Promise.all([ - createAgentChatModel({ - adapter: selectedMode.completionAdapter, - maxTokens, - purpose: "primary", - }), - createAgentChatModel({ - adapter: selectedMode.completionAdapter, - maxTokens, - purpose: "summary", - }), - ]); - const model = primaryModelSpec.model; - const summaryModel = summaryModelSpec.model; - const modelMiddleware = primaryModelSpec.middleware; - - const userLanguage = await detectUserLanguage(selectedMode.completionAdapter, input.prompt, input.previousUserMessages) - .catch((error) => { - if (input.abortSignal?.aborted || isAbortError(error)) { - throw error; - } - - logger.warn(`Failed to detect user language: ${getErrorMessage(error)}`); - return null; - }); - const systemPrompt = buildAgentTurnSystemPrompt({ - agentSystemPrompt: await this.agentSystemPromptPromise, - adminUser: input.adminUser, - usernameField: this.adminforth.config.auth!.usernameField, - userLanguage, - }); - const apiBasedTools = buildApiBasedTools( - this.adminforth, - this.getInternalAgentResourceIds(), - ); - - const stream = await callAgent({ - name: `adminforth-agent-${this.pluginInstanceId}`, - model, - summaryModel, - modelMiddleware, - checkpointer: this.getCheckpointer(), - messages: [ - new SystemMessage(systemPrompt), - new HumanMessage(input.prompt), - ], - adminUser: input.adminUser, - adminforth: this.adminforth, - apiBasedTools, - customComponentsDir: this.adminforth.config.customization.customComponentsDir ?? "custom", - sessionId: input.sessionId, - turnId: input.turnId, - currentPage: input.currentPage, - userTimeZone: input.userTimeZone, - abortSignal: input.abortSignal, - emitToolCallEvent: (event) => { - input.sequenceDebugCollector.handleToolCallEvent(event); - void input.emit?.({ - type: "tool-call", - data: event, - }); - }, - sequenceDebugSink: input.sequenceDebugCollector, - }); - - for await (const rawChunk of stream as AsyncIterable<[any, any]>) { - if (input.abortSignal?.aborted) { - throw new DOMException("This operation was aborted", "AbortError"); - } - - const [token, metadata] = rawChunk; - - const nodeName = - typeof metadata?.langgraph_node === "string" - ? metadata.langgraph_node - : ""; - - if (nodeName && !["model", "model_request"].includes(nodeName)) { - continue; - } - - const blocks = Array.isArray(token?.contentBlocks) - ? token.contentBlocks - : Array.isArray(token?.content) - ? token.content - : []; - const reasoningDelta = blocks - .filter((b: any) => b?.type === "reasoning") - .map((b: any) => String(b.reasoning ?? "")) - .join(""); - - const textDelta = blocks - .filter((b: any) => b?.type === "text") - .map((b: any) => String(b.text ?? "")) - .join(""); - - if (reasoningDelta) { - await input.emit?.({ - type: "reasoning-delta", - delta: reasoningDelta, - }); - } - - if (textDelta) { - fullResponse += textDelta; - bufferedTextDelta += textDelta; - - if ( - bufferedTextDelta.includes(VEGA_LITE_FENCE_START) && - !COMPLETE_VEGA_LITE_BLOCK_RE.test(bufferedTextDelta) - ) { - if (!isRenderingVegaLite) { - isRenderingVegaLite = true; - await input.emit?.({ - type: "rendering", - phase: "start", - label: "Rendering...", - }); - } - continue; - } - - if (isRenderingVegaLite) { - isRenderingVegaLite = false; - await input.emit?.({ - type: "rendering", - phase: "end", - label: "Rendering...", - }); - } - - const streamableLength = bufferedTextDelta.includes(VEGA_LITE_FENCE_START) - ? bufferedTextDelta.length - : bufferedTextDelta.length - getPartialVegaLiteFenceStartLength(bufferedTextDelta); - - if (!streamableLength) { - continue; - } - - await input.emit?.({ - type: "text-delta", - delta: bufferedTextDelta.slice(0, streamableLength), - }); - bufferedTextDelta = bufferedTextDelta.slice(streamableLength); - } - } - - if (isRenderingVegaLite) { - await input.emit?.({ - type: "rendering", - phase: "end", - label: "Rendering...", - }); - } - - if (bufferedTextDelta) { - await input.emit?.({ - type: "text-delta", - delta: bufferedTextDelta, - }); - } - - return { - text: fullResponse, - }; - } - - private async runAndPersistAgentResponse(input: RunAndPersistAgentResponseInput) { - const previousUserMessages = await this.getPreviousUserMessages(input.sessionId); - const turnId = await this.createNewTurn(input.sessionId, input.prompt); - await this.adminforth.resource(this.options.sessionResource.resourceId).update(input.sessionId, { - [this.options.sessionResource.createdAtField]: new Date().toISOString(), - }); - const sequenceDebugCollector = createSequenceDebugCollector(); - let fullResponse = ""; - let aborted = false; - let failed = false; - - try { - const agentResponse = await this.runAgentTurn({ - prompt: input.prompt, - sessionId: input.sessionId, - turnId, - previousUserMessages, - modeName: input.modeName, - userTimeZone: input.userTimeZone, - currentPage: input.currentPage, - abortSignal: input.abortSignal, - adminUser: input.adminUser, - sequenceDebugCollector, - emit: input.emit, - }); - fullResponse = agentResponse.text; - } catch (error) { - if (input.abortSignal?.aborted || isAbortError(error)) { - aborted = true; - logger.info(input.abortLogMessage); - } else { - failed = true; - fullResponse = getErrorMessage(error); - logger.error(`${input.failureLogMessage}:\n${fullResponse}`); - } - } - - sequenceDebugCollector.flush(); - const turnUpdates: Record = { - [this.options.turnResource.responseField]: fullResponse, - }; - - if (this.options.turnResource.debugField) { - turnUpdates[this.options.turnResource.debugField] = sequenceDebugCollector.getHistory(); - } - - await this.adminforth.resource(this.options.turnResource.resourceId).update(turnId, turnUpdates); - - return { - text: fullResponse, - turnId, - aborted, - failed, - }; - } - - async handleTurn(input: HandleTurnInput) { - await input.emit({ - type: "turn-started", - messageId: randomUUID(), - }); - - const agentResponse = await this.runAndPersistAgentResponse({ - prompt: input.prompt, - sessionId: input.sessionId, - modeName: input.modeName, - userTimeZone: input.userTimeZone, - currentPage: input.currentPage, - abortSignal: input.abortSignal, - adminUser: input.adminUser, - emit: input.emit, - failureLogMessage: input.failureLogMessage ?? "Agent response failed", - abortLogMessage: input.abortLogMessage ?? "Agent response aborted", - }); - - if (agentResponse.failed) { - await input.emit({ - type: "error", - error: agentResponse.text, - }); - } else if (!agentResponse.aborted) { - await input.emit({ - type: "response", - text: agentResponse.text, - sessionId: input.sessionId, - turnId: agentResponse.turnId, - }); - } - - await input.emit({ - type: "finish", - }); - - return agentResponse; - } - - private createChatSurfaceEventEmitter(sink: ChatSurfaceEventSink): AgentEventEmitter { - return async (event) => { - if (event.type === "text-delta") { - await sink.emit({ - type: "text_delta", - delta: event.delta, - }); - return; - } - - if (event.type === "response") { - await sink.emit({ - type: "done", - text: event.text, - }); - return; - } - - if (event.type === "error") { - await sink.emit({ - type: "error", - message: event.error, - }); - } - }; - } - - private createChatSurfaceLinkToken(surface: string, adminUser: AdminUser) { - for (const [token, payload] of this.chatSurfaceLinkTokens) { - if (payload.expiresAt <= Date.now()) { - this.chatSurfaceLinkTokens.delete(token); - } - } - - const token = randomUUID(); - this.chatSurfaceLinkTokens.set(token, { - surface, - adminUserId: adminUser.pk, - expiresAt: Date.now() + CHAT_SURFACE_LINK_TOKEN_TTL_MS, - }); - - return token; - } - - private consumeChatSurfaceLinkToken(surface: string, token: string) { - const payload = this.chatSurfaceLinkTokens.get(token); - this.chatSurfaceLinkTokens.delete(token); - - if (!payload || payload.surface !== surface || payload.expiresAt <= Date.now()) { - return null; - } - - return payload; - } - - private async handleChatSurfaceLink( - incoming: ChatSurfaceIncomingMessage, - sink: ChatSurfaceEventSink, - ) { - if (typeof incoming.metadata?.startPayload !== "string") { - return false; - } - - const payload = this.consumeChatSurfaceLinkToken(incoming.surface, incoming.metadata.startPayload); - if (!payload) { - await sink.emit({ - type: "error", - message: "This chat surface link is expired or invalid. Please start linking again from AdminForth.", - }); - return true; - } - const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; - const authResourceId = this.adminforth.config.auth!.usersResourceId!; - const authResource = this.adminforth.config.resources.find((resource) => resource.resourceId === authResourceId)!; - const primaryKeyField = authResource.columns.find((column) => column.primaryKey)!.name!; - const adminUserRecord = await this.adminforth.resource(authResourceId).get([ - Filters.EQ(primaryKeyField, payload.adminUserId), - ]); - - await this.adminforth.resource(authResourceId).update(payload.adminUserId, { - [externalUserIdField]: { - ...(adminUserRecord[externalUserIdField] ?? {}), - [incoming.surface]: incoming.externalUserId, - }, - }); - await sink.emit({ - type: "done", - text: `${incoming.surface} account connected to AdminForth.`, - }); - - return true; - } - - private async handleChatSurfaceMessage( - adapter: ChatSurfaceAdapter, - incoming: ChatSurfaceIncomingMessage, - sink: ChatSurfaceEventSink, - ) { - if (await this.handleChatSurfaceLink(incoming, sink)) { - return; - } - - const authResourceId = this.adminforth.config.auth!.usersResourceId!; - const authResource = this.adminforth.config.resources.find((resource) => resource.resourceId === authResourceId)!; - const primaryKeyField = authResource.columns.find((column) => column.primaryKey)!.name!; - const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; - const adminUserRecord = ( - await this.adminforth.resource(authResourceId).list(Filters.IS_NOT_EMPTY(externalUserIdField)) - ).find((user) => user[externalUserIdField]?.[adapter.name] === incoming.externalUserId); - - if (!adminUserRecord) { - await sink.emit({ - type: "error", - message: "This chat account is not authorized to use AdminForth Agent.", - }); - return; - } - - const adminUser = { - pk: adminUserRecord[primaryKeyField], - username: adminUserRecord[this.adminforth.config.auth!.usernameField], - dbUser: adminUserRecord, - }; - - await this.handleTurn({ - prompt: incoming.prompt, - sessionId: await this.getOrCreateChatSurfaceSession(incoming, adminUser), - modeName: incoming.modeName, - userTimeZone: incoming.userTimeZone ?? "UTC", - adminUser, - emit: this.createChatSurfaceEventEmitter(sink), - failureLogMessage: `Agent ${incoming.surface} surface response failed`, - abortLogMessage: `Agent ${incoming.surface} surface response aborted`, - }); - } - setupEndpoints(server: IHttpServer) { - if (this.getChatSurfaceConnectActionAdapters().length) { - server.endpoint({ - method: "POST", - path: "/agent/surfaces/connectable", - handler: async ({ adminUser }) => { - const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; - const externalIds = adminUser.dbUser[externalUserIdField] ?? {}; - - return { - surfaces: this.getChatSurfaceConnectActionAdapters().map((adapter) => ({ - name: adapter.name, - externalUserId: externalIds[adapter.name] ?? null, - })), - }; - }, - }); - } - - for (const adapter of this.options.chatSurfaceAdapters ?? []) { - const connectActionAdapter = adapter as ChatSurfaceAdapterWithConnectAction; - if (connectActionAdapter.createConnectAction) { - server.endpoint({ - method: "POST", - path: `/agent/surface/${adapter.name}/connect-action`, - handler: async ({ adminUser }) => { - const token = this.createChatSurfaceLinkToken(adapter.name, adminUser); - const action = await connectActionAdapter.createConnectAction!({ token }); - - return { - action, - }; - }, - }); - server.endpoint({ - method: "POST", - path: `/agent/surface/${adapter.name}/disconnect`, - handler: async ({ adminUser }) => { - const externalUserIdField = this.options.chatExternalIdsField ?? DEFAULT_ADMIN_USER_EXTERNAL_USER_ID_FIELD; - const externalIds = { - ...(adminUser.dbUser[externalUserIdField] ?? {}), - }; - - delete externalIds[adapter.name]; - - await this.adminforth.resource(this.adminforth.config.auth!.usersResourceId!).update(adminUser.pk, { - [externalUserIdField]: externalIds, - }); - - return { - ok: true, - }; - }, - }); - } - - server.endpoint({ - method: "POST", - noAuth: true, - path: `/agent/surface/${adapter.name}/webhook`, - handler: async (ctx: IAdminForthEndpointHandlerInput) => { - const surfaceContext = { - body: ctx.body, - headers: ctx.headers, - abortSignal: ctx.abortSignal, - rawRequest: ctx._raw_express_req, - rawResponse: ctx._raw_express_res, - }; - const incoming = await adapter.parseIncomingMessage(surfaceContext); - - if (!incoming) return { ok: true }; - - const sink = await adapter.createEventSink(surfaceContext, incoming); - - try { - await this.handleChatSurfaceMessage(adapter, incoming, sink); - } finally { - await sink.close?.(); - } - - return { ok: true }; - }, - }); - } - - server.endpoint({ - method: 'POST', - path: `/agent/get-placeholder-messages`, - handler: async ({ headers, adminUser }) => { - if (!this.options.placeholderMessages) { - return { - messages: [], - }; - } - - const messages = await this.options.placeholderMessages({ - adminUser: adminUser, - headers, - }); - - return { - messages, - }; - } - }); - server.endpoint({ - method: 'POST', - path: `/agent/response`, - handler: async ({ body, adminUser, response, _raw_express_res, abortSignal }) => { - const data = this.parseBody(agentResponseBodySchema, body, response); - if (!data) return; - const emit = createSseEventEmitter(_raw_express_res, { - vercelAiUiMessageStream: true, - closeActiveBlockOnToolStart: true, - }); - - await this.handleTurn({ - prompt: data.message, - sessionId: data.sessionId, - modeName: data.mode, - userTimeZone: data.timeZone ?? 'UTC', - currentPage: data.currentPage, - abortSignal, - adminUser: adminUser, - emit, - failureLogMessage: "Agent response streaming failed", - abortLogMessage: "Agent response streaming aborted by the client", - }); - return null; - } - }); - server.endpoint({ - method: 'POST', - path: `/agent/speech-response`, - target: 'upload', - handler: async ({ body, adminUser, response, _raw_express_req, _raw_express_res, abortSignal }) => { - const req = _raw_express_req as ExpressMulterRequest; - const audioAdapter = this.options.audioAdapter; - if (!audioAdapter) { - response.setStatus(400, "Audio adapter is not configured for AdminForth Agent"); - return { error: "Audio adapter is not configured for AdminForth Agent" }; - } - const data = this.parseBody(agentSpeechResponseBodySchema, body, response); - if (!data) return; - if (!req.file) { - response.setStatus(400, "Audio file is required"); - return { error: "Audio file is required" }; - } - const emit = createSseEventEmitter(_raw_express_res); - - let transcription; - - try { - transcription = await audioAdapter.transcribe({ - buffer: req.file.buffer, - filename: req.file.originalname, - mimeType: req.file.mimetype, - language: "auto", - abortSignal, - }); - } catch (error) { - if (abortSignal.aborted || isAbortError(error)) { - logger.info("Agent speech transcription aborted by the client"); - await emit({ type: "finish" }); - return null; - } - - logger.error(`Agent speech transcription failed:\n${getErrorMessage(error)}`); - await emit({ - type: "error", - error: "Speech transcription failed. Check server logs for details.", - }); - await emit({ type: "finish" }); - return null; - } - - if (abortSignal.aborted) { - await emit({ type: "finish" }); - return null; - } - - const prompt = transcription.text; - if (!prompt) { - await emit({ - type: "error", - error: "Speech transcription is empty", - }); - await emit({ type: "finish" }); - return null; - } - await emit({ - type: "transcript", - text: transcription.text, - language: transcription.language, - }); - - const sessionId = data.sessionId as string; - const currentPage = data.currentPage; - const agentResponse = await this.runAndPersistAgentResponse({ - prompt, - sessionId, - modeName: data.mode, - userTimeZone: data.timeZone ?? 'UTC', - currentPage, - abortSignal, - adminUser: adminUser, - emit: async (event) => { - if (event.type === "tool-call") { - await emit(event); - } - }, - failureLogMessage: "Agent speech response failed", - abortLogMessage: "Agent speech response aborted by the client", - }); - - if (agentResponse.aborted) { - await emit({ type: "finish" }); - return null; - } - - if (agentResponse.failed) { - await emit({ - type: "error", - error: agentResponse.text, - }); - await emit({ type: "finish" }); - return null; - } - - try { - await emit({ - type: "speech-response", - transcript: { - text: transcription.text, - language: transcription.language, - }, - response: { - text: agentResponse.text, - }, - sessionId, - turnId: agentResponse.turnId, - }); - const speech = await audioAdapter.synthesize({ - text: sanitizeSpeechText(agentResponse.text), - stream: true, - streamFormat: "audio", - format: "pcm", - abortSignal, - }); - - await emit({ - type: "audio-start", - mimeType: speech.mimeType, - format: speech.format, - sampleRate: 24000, - channelCount: 1, - bitsPerSample: 16, - }); - - const reader = speech.audioStream.getReader(); - const cancelAudioStream = () => { - void reader.cancel().catch(() => undefined); - }; - - try { - abortSignal.addEventListener("abort", cancelAudioStream, { once: true }); - - while (true) { - if (abortSignal.aborted) { - await reader.cancel().catch(() => undefined); - break; - } - - const { value, done } = await reader.read(); - - if (done) { - break; - } - - if (abortSignal.aborted) { - break; - } - - await emit({ - type: "audio-delta", - value, - }); - } - } finally { - abortSignal.removeEventListener("abort", cancelAudioStream); - reader.releaseLock(); - } - - await emit({ type: "audio-done" }); - await emit({ type: "finish" }); - return null; - } catch (error) { - if (abortSignal.aborted || isAbortError(error)) { - logger.info("Agent speech audio streaming aborted by the client"); - } else { - logger.error(`Agent speech audio streaming failed:\n${error}`); - await emit({ - type: "error", - error: getErrorMessage(error), - }); - } - await emit({ type: "finish" }); - return null; - } - } - }); - server.endpoint({ - method: 'POST', - path: `/agent/get-sessions`, - handler: async ({body, adminUser, response }) => { - const data = this.parseBody(getSessionsBodySchema, body, response); - if (!data) return; - const userId = adminUser.pk; - const limit = data.limit ?? 20; - const sessions = await this.adminforth.resource(this.options.sessionResource.resourceId).list( - [Filters.EQ(this.options.sessionResource.askerIdField, userId)], limit, undefined, [Sorts.DESC(this.options.sessionResource.createdAtField)] - ); - return { - sessions: sessions.map((session) => ({ - sessionId: session[this.options.sessionResource.idField], - title: session[this.options.sessionResource.titleField], - timestamp: session[this.options.sessionResource.createdAtField], - })), - }; - } - }); - server.endpoint({ - method: 'POST', - path: `/agent/get-session-info`, - handler: async ({body, adminUser, response }) => { - const parsedBody = sessionIdBodySchema.safeParse(body); - if (!parsedBody.success) { - response.setStatus(422, parsedBody.error.message); - return; - } - const userId = adminUser.pk; - const sessionId = parsedBody.data.sessionId; - const session = await this.adminforth.resource(this.options.sessionResource.resourceId).get( - [Filters.EQ(this.options.sessionResource.idField, sessionId)] - ); - if (!session) { - return { - error: 'Session not found' - }; - } - if (session[this.options.sessionResource.askerIdField] !== userId) { - return { - error: 'Unauthorized' - }; - } - const turns = await this.getSessionTurns(sessionId); - return { - session: { - sessionId, - title: session[this.options.sessionResource.titleField], - timestamp: session[this.options.sessionResource.createdAtField], - messages: turns.flatMap(turn => { - const messages: Array<{ text: string; role: 'user' | 'assistant' }> = []; - if (turn.prompt) { - messages.push({ - text: turn.prompt, - role: 'user', - }); - } - if (turn.response && turn.response !== "not_finished") { - messages.push({ - text: turn.response, - role: 'assistant', - }); - } - return messages; - }), - }, - }; - } - }); - server.endpoint({ - method: 'POST', - path: `/agent/create-session`, - handler: async ({body, adminUser, response }) => { - const data = this.parseBody(createSessionBodySchema, body, response); - if (!data) return; - const triggerMessage = data.triggerMessage; - const userId = adminUser.pk; - const title = triggerMessage?.slice(0, 40) || "New Session"; - const newSession = { - [this.options.sessionResource.idField]: randomUUID(), - [this.options.sessionResource.titleField]: title, - [this.options.sessionResource.askerIdField]: userId, - }; - await this.adminforth.resource(this.options.sessionResource.resourceId).create(newSession); - return { - sessionId: newSession[this.options.sessionResource.idField], - title: newSession[this.options.sessionResource.titleField], - timestamp: newSession[this.options.sessionResource.createdAtField], - messages: [] - }; - } - }); - server.endpoint({ - method: 'POST', - path: `/agent/delete-session`, - handler: async ({body, adminUser, response }) => { - const data = this.parseBody(sessionIdBodySchema, body, response); - if (!data) return; - const sessionId = data.sessionId; - const userId = adminUser.pk; - const session = await this.adminforth.resource(this.options.sessionResource.resourceId).get( - [Filters.EQ(this.options.sessionResource.idField, sessionId)] - ); - if (!session) { - return { - error: 'Session not found' - }; - } - if (session[this.options.sessionResource.askerIdField] !== userId) { - return { - error: 'Unauthorized' - }; - } - await this.adminforth.resource(this.options.sessionResource.resourceId).delete(sessionId); - const turns = await this.adminforth.resource(this.options.turnResource.resourceId).list( - [Filters.EQ(this.options.turnResource.sessionIdField, sessionId)] - ); - for (const turn of turns) { - await this.adminforth.resource(this.options.turnResource.resourceId).delete(turn[this.options.turnResource.idField]); - } - return { - ok: true - }; - } - }), - server.endpoint({ - method: 'POST', - path: `/agent/add-system-message-to-turns`, - handler: async ({body, response }) => { - const data = this.parseBody(addSystemMessageBodySchema, body, response); - if (!data) return; - await this.createNewTurn(data.sessionId, data.systemMessage); - return { - ok: true - } - } - }) - } -} - -function getPartialVegaLiteFenceStartLength(text: string): number { - for (let length = Math.min(text.length, VEGA_LITE_FENCE_START.length - 1); length > 0; length -= 1) { - if (VEGA_LITE_FENCE_START.startsWith(text.slice(-length))) { - return length; - } + const endpointContext = { + adminforth: this.adminforth, + options: this.options, + parseBody: this.parseBody.bind(this), + handleTurn: this.agentTurnService.handleTurn.bind(this.agentTurnService), + runAndPersistAgentResponse: this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), + getSessionTurns: this.sessionStore.getSessionTurns.bind(this.sessionStore), + createNewTurn: this.sessionStore.createNewTurn.bind(this.sessionStore), + getChatSurfaceConnectActionAdapters: this.chatSurfaceService.getConnectActionAdapters.bind(this.chatSurfaceService), + createChatSurfaceLinkToken: this.chatSurfaceService.createLinkToken.bind(this.chatSurfaceService), + handleChatSurfaceMessage: this.chatSurfaceService.handleMessage.bind(this.chatSurfaceService), + } satisfies AgentEndpointsContext; + + setupCoreEndpoints(endpointContext, server); + setupSessionEndpoints(endpointContext, server); + setupChatSurfaceEndpoints(endpointContext, server); } - - return 0; } diff --git a/sessionStore.ts b/sessionStore.ts new file mode 100644 index 0000000..c12cf4b --- /dev/null +++ b/sessionStore.ts @@ -0,0 +1,79 @@ +import type { AdminUser, IAdminForth } from "adminforth"; +import { Filters, Sorts } from "adminforth"; +import { randomUUID } from "crypto"; +import type { ChatSurfaceIncomingMessage } from "adminforth"; +import type { PreviousUserMessage } from "./agent/languageDetect.js"; +import type { PluginOptions } from "./types.js"; + +export class AgentSessionStore { + constructor( + private getAdminforth: () => IAdminForth, + private options: PluginOptions, + ) {} + + async createNewTurn(sessionId: string, prompt: string, response?: string) { + const turnId = randomUUID(); + const turnRecord = { + [this.options.turnResource.idField]: turnId, + [this.options.turnResource.sessionIdField]: sessionId, + [this.options.turnResource.promptField]: prompt, + [this.options.turnResource.responseField]: response || "not_finished", + }; + const newTurn = await this.getAdminforth().resource(this.options.turnResource.resourceId).create(turnRecord); + return newTurn.createdRecord[this.options.turnResource.idField]; + } + + async getSessionTurns(sessionId: string) { + const turns = await this.getAdminforth().resource(this.options.turnResource.resourceId).list( + [Filters.EQ(this.options.turnResource.sessionIdField, sessionId)], + undefined, + undefined, + [Sorts.ASC(this.options.turnResource.createdAtField)] + ); + return turns.map(turn => ({ + prompt: turn[this.options.turnResource.promptField], + response: turn[this.options.turnResource.responseField], + })); + } + + async getPreviousUserMessages(sessionId: string) { + const turns = await this.getAdminforth().resource(this.options.turnResource.resourceId).list( + [Filters.EQ(this.options.turnResource.sessionIdField, sessionId)], + 2, + undefined, + [Sorts.DESC(this.options.turnResource.createdAtField)] + ); + return turns + .reverse() + .map((turn): PreviousUserMessage => ({ + text: turn[this.options.turnResource.promptField], + })); + } + + getChatSurfaceSessionId(incoming: ChatSurfaceIncomingMessage) { + return `${incoming.surface}:${incoming.externalConversationId}`; + } + + async getOrCreateChatSurfaceSession( + incoming: ChatSurfaceIncomingMessage, + adminUser: AdminUser, + ) { + const sessionId = this.getChatSurfaceSessionId(incoming); + const sessionResource = this.getAdminforth().resource(this.options.sessionResource.resourceId); + const session = await sessionResource.get( + [Filters.EQ(this.options.sessionResource.idField, sessionId)] + ); + + if (session) { + return sessionId; + } + + await sessionResource.create({ + [this.options.sessionResource.idField]: sessionId, + [this.options.sessionResource.titleField]: incoming.prompt.slice(0, 40) || "New Session", + [this.options.sessionResource.askerIdField]: adminUser.pk, + }); + + return sessionId; + } +} From 8a579909e8220a7ac654240ae6c50a45dcc2a94d Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Wed, 20 May 2026 15:52:35 +0300 Subject: [PATCH 2/5] fix: improve session endpoint error handling and response structure --- endpoints/sessions.ts | 32 +++++++++++++++++++++----------- sessionStore.ts | 2 +- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/endpoints/sessions.ts b/endpoints/sessions.ts index 6f5d317..9a3aadf 100644 --- a/endpoints/sessions.ts +++ b/endpoints/sessions.ts @@ -47,13 +47,10 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt method: 'POST', path: `/agent/get-session-info`, handler: async ({body, adminUser, response }) => { - const parsedBody = sessionIdBodySchema.safeParse(body); - if (!parsedBody.success) { - response.setStatus(422, parsedBody.error.message); - return; - } + const data = ctx.parseBody(sessionIdBodySchema, body, response); + if (!data) return; const userId = adminUser!.pk; - const sessionId = parsedBody.data.sessionId; + const sessionId = data.sessionId; const session = await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).get( [Filters.EQ(ctx.options.sessionResource.idField, sessionId)] ); @@ -108,11 +105,11 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt [ctx.options.sessionResource.titleField]: title, [ctx.options.sessionResource.askerIdField]: userId, }; - await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).create(newSession); + const { createdRecord } = await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).create(newSession); return { - sessionId: newSession[ctx.options.sessionResource.idField], - title: newSession[ctx.options.sessionResource.titleField], - timestamp: newSession[ctx.options.sessionResource.createdAtField], + sessionId: createdRecord[ctx.options.sessionResource.idField], + title: createdRecord[ctx.options.sessionResource.titleField], + timestamp: createdRecord[ctx.options.sessionResource.createdAtField], messages: [] }; } @@ -155,9 +152,22 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt server.endpoint({ method: 'POST', path: `/agent/add-system-message-to-turns`, - handler: async ({body, response }) => { + handler: async ({body, adminUser, response }) => { const data = ctx.parseBody(addSystemMessageBodySchema, body, response); if (!data) return; + const session = await ctx.adminforth.resource(ctx.options.sessionResource.resourceId).get( + [Filters.EQ(ctx.options.sessionResource.idField, data.sessionId)] + ); + if (!session) { + return { + error: 'Session not found' + }; + } + if (session[ctx.options.sessionResource.askerIdField] !== adminUser!.pk) { + return { + error: 'Unauthorized' + }; + } await ctx.createNewTurn(data.sessionId, data.systemMessage); return { ok: true diff --git a/sessionStore.ts b/sessionStore.ts index c12cf4b..30a8942 100644 --- a/sessionStore.ts +++ b/sessionStore.ts @@ -17,7 +17,7 @@ export class AgentSessionStore { [this.options.turnResource.idField]: turnId, [this.options.turnResource.sessionIdField]: sessionId, [this.options.turnResource.promptField]: prompt, - [this.options.turnResource.responseField]: response || "not_finished", + [this.options.turnResource.responseField]: response ?? "not_finished", }; const newTurn = await this.getAdminforth().resource(this.options.turnResource.resourceId).create(turnRecord); return newTurn.createdRecord[this.options.turnResource.idField]; From 23815ea8cd32a29cb41cc7ef1ebe67c9be96f164 Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Thu, 21 May 2026 10:52:39 +0300 Subject: [PATCH 3/5] fix: enhance validation for session limit and optimize error handling logic --- endpoints/sessions.ts | 7 ++----- errors.ts | 8 ++------ 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/endpoints/sessions.ts b/endpoints/sessions.ts index 9a3aadf..32ef896 100644 --- a/endpoints/sessions.ts +++ b/endpoints/sessions.ts @@ -10,7 +10,7 @@ const addSystemMessageBodySchema = z.object({ }).strict(); const getSessionsBodySchema = z.object({ - limit: z.number().optional(), + limit: z.number().int().min(1).max(100).optional(), }).strict(); const sessionIdBodySchema = z.object({ @@ -140,9 +140,7 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt const turns = await ctx.adminforth.resource(ctx.options.turnResource.resourceId).list( [Filters.EQ(ctx.options.turnResource.sessionIdField, sessionId)] ); - for (const turn of turns) { - await ctx.adminforth.resource(ctx.options.turnResource.resourceId).delete(turn[ctx.options.turnResource.idField]); - } + await Promise.all(turns.map(turn => ctx.adminforth.resource(ctx.options.turnResource.resourceId).delete(turn[ctx.options.turnResource.idField]))); return { ok: true }; @@ -168,7 +166,6 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt error: 'Unauthorized' }; } - await ctx.createNewTurn(data.sessionId, data.systemMessage); return { ok: true } diff --git a/errors.ts b/errors.ts index 1b766b6..16f17e8 100644 --- a/errors.ts +++ b/errors.ts @@ -1,12 +1,8 @@ export function isAbortError(error: unknown): boolean { - return ( - error instanceof DOMException && error.name === "AbortError" - ) || ( - typeof error === "object" && + return typeof error === "object" && error !== null && "name" in error && - (error.name === "AbortError" || error.name === "APIUserAbortError") - ); + (error.name === "AbortError" || error.name === "APIUserAbortError"); } export function getErrorMessage(error: unknown): string { From 6a5f6a508c9986e922a3f34ecd31d9ae593bf7ce Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Thu, 21 May 2026 11:51:17 +0300 Subject: [PATCH 4/5] fix: add createSystemTurn method to handle system messages in session management --- endpoints/context.ts | 2 ++ endpoints/sessions.ts | 11 ++++++++++- index.ts | 1 + sessionStore.ts | 15 +++++++++++++++ 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/endpoints/context.ts b/endpoints/context.ts index 6f11e04..7250bef 100644 --- a/endpoints/context.ts +++ b/endpoints/context.ts @@ -33,6 +33,7 @@ export type AgentEndpointsContext = { runAndPersistAgentResponse(input: RunAndPersistAgentResponseInput): Promise; getSessionTurns(sessionId: string): Promise; createNewTurn(sessionId: string, prompt: string, response?: string): Promise; + createSystemTurn(sessionId: string, systemMessage: string): Promise; getChatSurfaceConnectActionAdapters(): ChatSurfaceAdapterWithConnectAction[]; createChatSurfaceLinkToken(surface: string, adminUser: AdminUser): string; handleChatSurfaceMessage( @@ -50,6 +51,7 @@ export type CoreEndpointsContext = Pick< export type SessionEndpointsContext = Pick< AgentEndpointsContext, "adminforth" | "options" | "parseBody" | "getSessionTurns" | "createNewTurn" + | "createSystemTurn" >; export type ChatSurfaceEndpointsContext = Pick< diff --git a/endpoints/sessions.ts b/endpoints/sessions.ts index 32ef896..35b8e91 100644 --- a/endpoints/sessions.ts +++ b/endpoints/sessions.ts @@ -2,6 +2,7 @@ import type { IHttpServer } from "adminforth"; import { Filters, Sorts } from "adminforth"; import { randomUUID } from "crypto"; import { z } from "zod"; +import { AGENT_SYSTEM_TURN_PROMPT } from "../sessionStore.js"; import type { SessionEndpointsContext } from "./context.js"; const addSystemMessageBodySchema = z.object({ @@ -71,7 +72,14 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt title: session[ctx.options.sessionResource.titleField], timestamp: session[ctx.options.sessionResource.createdAtField], messages: turns.flatMap(turn => { - const messages: Array<{ text: string; role: 'user' | 'assistant' }> = []; + const messages: Array<{ text: string; role: 'user' | 'assistant' | 'system' }> = []; + if (turn.prompt === AGENT_SYSTEM_TURN_PROMPT) { + messages.push({ + text: turn.response, + role: 'system', + }); + return messages; + } if (turn.prompt) { messages.push({ text: turn.prompt, @@ -166,6 +174,7 @@ export function setupSessionEndpoints(ctx: SessionEndpointsContext, server: IHtt error: 'Unauthorized' }; } + await ctx.createSystemTurn(data.sessionId, data.systemMessage); return { ok: true } diff --git a/index.ts b/index.ts index 570a29c..6d1ad9e 100644 --- a/index.ts +++ b/index.ts @@ -162,6 +162,7 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { runAndPersistAgentResponse: this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), getSessionTurns: this.sessionStore.getSessionTurns.bind(this.sessionStore), createNewTurn: this.sessionStore.createNewTurn.bind(this.sessionStore), + createSystemTurn: this.sessionStore.createSystemTurn.bind(this.sessionStore), getChatSurfaceConnectActionAdapters: this.chatSurfaceService.getConnectActionAdapters.bind(this.chatSurfaceService), createChatSurfaceLinkToken: this.chatSurfaceService.createLinkToken.bind(this.chatSurfaceService), handleChatSurfaceMessage: this.chatSurfaceService.handleMessage.bind(this.chatSurfaceService), diff --git a/sessionStore.ts b/sessionStore.ts index 30a8942..0a288ea 100644 --- a/sessionStore.ts +++ b/sessionStore.ts @@ -5,6 +5,8 @@ import type { ChatSurfaceIncomingMessage } from "adminforth"; import type { PreviousUserMessage } from "./agent/languageDetect.js"; import type { PluginOptions } from "./types.js"; +export const AGENT_SYSTEM_TURN_PROMPT = "__adminforth_system_message__"; + export class AgentSessionStore { constructor( private getAdminforth: () => IAdminForth, @@ -23,6 +25,18 @@ export class AgentSessionStore { return newTurn.createdRecord[this.options.turnResource.idField]; } + async createSystemTurn(sessionId: string, systemMessage: string) { + const turnId = randomUUID(); + const turnRecord = { + [this.options.turnResource.idField]: turnId, + [this.options.turnResource.sessionIdField]: sessionId, + [this.options.turnResource.promptField]: AGENT_SYSTEM_TURN_PROMPT, + [this.options.turnResource.responseField]: systemMessage, + }; + const newTurn = await this.getAdminforth().resource(this.options.turnResource.resourceId).create(turnRecord); + return newTurn.createdRecord[this.options.turnResource.idField]; + } + async getSessionTurns(sessionId: string) { const turns = await this.getAdminforth().resource(this.options.turnResource.resourceId).list( [Filters.EQ(this.options.turnResource.sessionIdField, sessionId)], @@ -45,6 +59,7 @@ export class AgentSessionStore { ); return turns .reverse() + .filter((turn) => turn[this.options.turnResource.promptField] !== AGENT_SYSTEM_TURN_PROMPT) .map((turn): PreviousUserMessage => ({ text: turn[this.options.turnResource.promptField], })); From f8f1132b9d8d3d42c11bc9f08b257693a49e9d33 Mon Sep 17 00:00:00 2001 From: Maksym Pipkun Date: Thu, 21 May 2026 11:59:30 +0300 Subject: [PATCH 5/5] refactor: add handleSpeechTurn method to process audio input and integrate with agent response --- agentTurnService.ts | 173 ++++++++++++++++++++++++++++++++++++++++++- endpoints/context.ts | 4 +- endpoints/core.ts | 160 ++------------------------------------- index.ts | 1 + 4 files changed, 184 insertions(+), 154 deletions(-) diff --git a/agentTurnService.ts b/agentTurnService.ts index 0b21060..bf92397 100644 --- a/agentTurnService.ts +++ b/agentTurnService.ts @@ -1,4 +1,4 @@ -import type { AdminUser, IAdminForth } from "adminforth"; +import type { AdminUser, AudioAdapter, IAdminForth } from "adminforth"; import { logger } from "adminforth"; import { randomUUID } from "crypto"; import { HumanMessage, SystemMessage } from "langchain"; @@ -11,6 +11,7 @@ import type { AgentEventEmitter } from "./agentEvents.js"; import { buildAgentTurnSystemPrompt } from "./agent/systemPrompt.js"; import type { CurrentPageContext } from "./agent/tools/getUserLocation.js"; import { isAbortError, getErrorMessage } from "./errors.js"; +import { sanitizeSpeechText } from "./sanitizeSpeechText.js"; import type { AgentSessionStore } from "./sessionStore.js"; import type { PluginOptions } from "./types.js"; @@ -54,6 +55,15 @@ export type HandleTurnInput = Omit & { + audioAdapter: AudioAdapter; + audio: { + buffer: Buffer; + filename: string; + mimeType: string; + }; +}; + type AgentTurnServiceOptions = { getAdminforth: () => IAdminForth; getPluginInstanceId: () => string; @@ -342,6 +352,167 @@ export class AgentTurnService { return agentResponse; } + + async handleSpeechTurn(input: HandleSpeechTurnInput) { + let transcription; + + try { + transcription = await input.audioAdapter.transcribe({ + buffer: input.audio.buffer, + filename: input.audio.filename, + mimeType: input.audio.mimeType, + language: "auto", + abortSignal: input.abortSignal, + }); + } catch (error) { + if (input.abortSignal?.aborted || isAbortError(error)) { + logger.info("Agent speech transcription aborted by the client"); + await input.emit({ type: "finish" }); + return null; + } + + logger.error(`Agent speech transcription failed:\n${getErrorMessage(error)}`); + await input.emit({ + type: "error", + error: "Speech transcription failed. Check server logs for details.", + }); + await input.emit({ type: "finish" }); + return null; + } + + if (input.abortSignal?.aborted) { + await input.emit({ type: "finish" }); + return null; + } + + const prompt = transcription.text; + if (!prompt) { + await input.emit({ + type: "error", + error: "Speech transcription is empty", + }); + await input.emit({ type: "finish" }); + return null; + } + + await input.emit({ + type: "transcript", + text: transcription.text, + language: transcription.language, + }); + + const agentResponse = await this.runAndPersistAgentResponse({ + prompt, + sessionId: input.sessionId, + modeName: input.modeName, + userTimeZone: input.userTimeZone, + currentPage: input.currentPage, + abortSignal: input.abortSignal, + adminUser: input.adminUser, + emit: async (event) => { + if (event.type === "tool-call") { + await input.emit(event); + } + }, + failureLogMessage: input.failureLogMessage ?? "Agent speech response failed", + abortLogMessage: input.abortLogMessage ?? "Agent speech response aborted by the client", + }); + + if (agentResponse.aborted) { + await input.emit({ type: "finish" }); + return agentResponse; + } + + if (agentResponse.failed) { + await input.emit({ + type: "error", + error: agentResponse.text, + }); + await input.emit({ type: "finish" }); + return agentResponse; + } + + try { + await input.emit({ + type: "speech-response", + transcript: { + text: transcription.text, + language: transcription.language, + }, + response: { + text: agentResponse.text, + }, + sessionId: input.sessionId, + turnId: agentResponse.turnId, + }); + const speech = await input.audioAdapter.synthesize({ + text: sanitizeSpeechText(agentResponse.text), + stream: true, + streamFormat: "audio", + format: "pcm", + abortSignal: input.abortSignal, + }); + + await input.emit({ + type: "audio-start", + mimeType: speech.mimeType, + format: speech.format, + sampleRate: 24000, + channelCount: 1, + bitsPerSample: 16, + }); + + const reader = speech.audioStream.getReader(); + const cancelAudioStream = () => { + void reader.cancel().catch(() => undefined); + }; + + try { + input.abortSignal?.addEventListener("abort", cancelAudioStream, { once: true }); + + while (true) { + if (input.abortSignal?.aborted) { + await reader.cancel().catch(() => undefined); + break; + } + + const { value, done } = await reader.read(); + + if (done) { + break; + } + + if (input.abortSignal?.aborted) { + break; + } + + await input.emit({ + type: "audio-delta", + value, + }); + } + } finally { + input.abortSignal?.removeEventListener("abort", cancelAudioStream); + reader.releaseLock(); + } + + await input.emit({ type: "audio-done" }); + await input.emit({ type: "finish" }); + return agentResponse; + } catch (error) { + if (input.abortSignal?.aborted || isAbortError(error)) { + logger.info("Agent speech audio streaming aborted by the client"); + } else { + logger.error(`Agent speech audio streaming failed:\n${getErrorMessage(error)}`); + await input.emit({ + type: "error", + error: getErrorMessage(error), + }); + } + await input.emit({ type: "finish" }); + return agentResponse; + } + } } function getPartialVegaLiteFenceStartLength(text: string): number { diff --git a/endpoints/context.ts b/endpoints/context.ts index 7250bef..83cbba3 100644 --- a/endpoints/context.ts +++ b/endpoints/context.ts @@ -7,6 +7,7 @@ import type { } from "adminforth"; import type { ZodType } from "zod"; import type { + HandleSpeechTurnInput, HandleTurnInput, RunAndPersistAgentResponseInput, RunAndPersistAgentResponseResult, @@ -30,6 +31,7 @@ export type AgentEndpointsContext = { options: PluginOptions; parseBody(schema: ZodType, body: unknown, response: EndpointResponse): T | null; handleTurn(input: HandleTurnInput): Promise; + handleSpeechTurn(input: HandleSpeechTurnInput): Promise; runAndPersistAgentResponse(input: RunAndPersistAgentResponseInput): Promise; getSessionTurns(sessionId: string): Promise; createNewTurn(sessionId: string, prompt: string, response?: string): Promise; @@ -45,7 +47,7 @@ export type AgentEndpointsContext = { export type CoreEndpointsContext = Pick< AgentEndpointsContext, - "options" | "parseBody" | "handleTurn" | "runAndPersistAgentResponse" + "options" | "parseBody" | "handleTurn" | "handleSpeechTurn" >; export type SessionEndpointsContext = Pick< diff --git a/endpoints/core.ts b/endpoints/core.ts index a68f432..882a1d5 100644 --- a/endpoints/core.ts +++ b/endpoints/core.ts @@ -1,8 +1,5 @@ import type { IHttpServer } from "adminforth"; -import { logger } from "adminforth"; import { z } from "zod"; -import { isAbortError, getErrorMessage } from "../errors.js"; -import { sanitizeSpeechText } from "../sanitizeSpeechText.js"; import { createSseEventEmitter } from "../surfaces/web-sse/createSseEventEmitter.js"; import type { CurrentPageContext } from "../agent/tools/getUserLocation.js"; import type { CoreEndpointsContext } from "./context.js"; @@ -93,165 +90,24 @@ export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServe } const emit = createSseEventEmitter(_raw_express_res); - let transcription; - - try { - transcription = await audioAdapter.transcribe({ + await ctx.handleSpeechTurn({ + audioAdapter, + audio: { buffer: req.file.buffer, filename: req.file.originalname, mimeType: req.file.mimetype, - language: "auto", - abortSignal, - }); - } catch (error) { - if (abortSignal.aborted || isAbortError(error)) { - logger.info("Agent speech transcription aborted by the client"); - await emit({ type: "finish" }); - return null; - } - - logger.error(`Agent speech transcription failed:\n${getErrorMessage(error)}`); - await emit({ - type: "error", - error: "Speech transcription failed. Check server logs for details.", - }); - await emit({ type: "finish" }); - return null; - } - - if (abortSignal.aborted) { - await emit({ type: "finish" }); - return null; - } - - const prompt = transcription.text; - if (!prompt) { - await emit({ - type: "error", - error: "Speech transcription is empty", - }); - await emit({ type: "finish" }); - return null; - } - await emit({ - type: "transcript", - text: transcription.text, - language: transcription.language, - }); - - const sessionId = data.sessionId as string; - const currentPage = data.currentPage; - const agentResponse = await ctx.runAndPersistAgentResponse({ - prompt, - sessionId, + }, + sessionId: data.sessionId, modeName: data.mode, userTimeZone: data.timeZone ?? 'UTC', - currentPage, + currentPage: data.currentPage, abortSignal, adminUser: adminUser!, - emit: async (event) => { - if (event.type === "tool-call") { - await emit(event); - } - }, + emit, failureLogMessage: "Agent speech response failed", abortLogMessage: "Agent speech response aborted by the client", }); - - if (agentResponse.aborted) { - await emit({ type: "finish" }); - return null; - } - - if (agentResponse.failed) { - await emit({ - type: "error", - error: agentResponse.text, - }); - await emit({ type: "finish" }); - return null; - } - - try { - await emit({ - type: "speech-response", - transcript: { - text: transcription.text, - language: transcription.language, - }, - response: { - text: agentResponse.text, - }, - sessionId, - turnId: agentResponse.turnId, - }); - const speech = await audioAdapter.synthesize({ - text: sanitizeSpeechText(agentResponse.text), - stream: true, - streamFormat: "audio", - format: "pcm", - abortSignal, - }); - - await emit({ - type: "audio-start", - mimeType: speech.mimeType, - format: speech.format, - sampleRate: 24000, - channelCount: 1, - bitsPerSample: 16, - }); - - const reader = speech.audioStream.getReader(); - const cancelAudioStream = () => { - void reader.cancel().catch(() => undefined); - }; - - try { - abortSignal.addEventListener("abort", cancelAudioStream, { once: true }); - - while (true) { - if (abortSignal.aborted) { - await reader.cancel().catch(() => undefined); - break; - } - - const { value, done } = await reader.read(); - - if (done) { - break; - } - - if (abortSignal.aborted) { - break; - } - - await emit({ - type: "audio-delta", - value, - }); - } - } finally { - abortSignal.removeEventListener("abort", cancelAudioStream); - reader.releaseLock(); - } - - await emit({ type: "audio-done" }); - await emit({ type: "finish" }); - return null; - } catch (error) { - if (abortSignal.aborted || isAbortError(error)) { - logger.info("Agent speech audio streaming aborted by the client"); - } else { - logger.error(`Agent speech audio streaming failed:\n${error}`); - await emit({ - type: "error", - error: getErrorMessage(error), - }); - } - await emit({ type: "finish" }); - return null; - } + return null; } }); } diff --git a/index.ts b/index.ts index 6d1ad9e..3284d82 100644 --- a/index.ts +++ b/index.ts @@ -159,6 +159,7 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { options: this.options, parseBody: this.parseBody.bind(this), handleTurn: this.agentTurnService.handleTurn.bind(this.agentTurnService), + handleSpeechTurn: this.agentTurnService.handleSpeechTurn.bind(this.agentTurnService), runAndPersistAgentResponse: this.agentTurnService.runAndPersistAgentResponse.bind(this.agentTurnService), getSessionTurns: this.sessionStore.getSessionTurns.bind(this.sessionStore), createNewTurn: this.sessionStore.createNewTurn.bind(this.sessionStore),