diff --git a/Directory.Packages.props b/Directory.Packages.props index 3dce2b56a..3176770de 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -25,7 +25,7 @@ - + @@ -76,10 +76,10 @@ - - - - + + + + @@ -99,4 +99,4 @@ - \ No newline at end of file + diff --git a/src/Elastic.Documentation.Site/Assets/styles.css b/src/Elastic.Documentation.Site/Assets/styles.css index 298ca1a84..fb3bbf8a6 100644 --- a/src/Elastic.Documentation.Site/Assets/styles.css +++ b/src/Elastic.Documentation.Site/Assets/styles.css @@ -1,5 +1,5 @@ @import 'tailwindcss'; -@config "../tailwind.config.js"; +@config '../tailwind.config.js'; @import './fonts.css'; @import './theme.css'; diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx index 1a2e720d2..ece8fa231 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AiProviderSelector.tsx @@ -1,5 +1,5 @@ /** @jsxImportSource @emotion/react */ -import { useAiProviderStore } from './aiProviderStore' +import { useChatActions, useAiProvider, type AiProvider } from './chat.store' import { EuiRadioGroup } from '@elastic/eui' import type { EuiRadioGroupOption } from '@elastic/eui' import { css } from '@emotion/react' @@ -22,16 +22,15 @@ const options: EuiRadioGroupOption[] = [ ] export const AiProviderSelector = () => { - const { provider, setProvider } = useAiProviderStore() + const provider = useAiProvider() + const { setAiProvider } = useChatActions() return (
- setProvider(id as 'AgentBuilder' | 'LlmGateway') - } + onChange={(id) => setAiProvider(id as AiProvider)} name="aiProvider" legend={{ children: 'AI Provider', diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts index 019cc8ed9..dae62305a 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/AskAiEvent.ts @@ -4,8 +4,8 @@ import * as z from 'zod' // Event type constants for type-safe referencing export const EventTypes = { CONVERSATION_START: 'conversation_start', - CHUNK: 'chunk', - CHUNK_COMPLETE: 'chunk_complete', + MESSAGE_CHUNK: 'message_chunk', + MESSAGE_COMPLETE: 'message_complete', SEARCH_TOOL_CALL: 'search_tool_call', TOOL_CALL: 'tool_call', TOOL_RESULT: 'tool_result', @@ -23,14 +23,14 @@ export const ConversationStartEventSchema = z.object({ }) export const ChunkEventSchema = z.object({ - type: z.literal(EventTypes.CHUNK), + type: z.literal(EventTypes.MESSAGE_CHUNK), id: z.string(), timestamp: z.number(), content: z.string(), }) export const ChunkCompleteEventSchema = z.object({ - type: z.literal(EventTypes.CHUNK_COMPLETE), + type: z.literal(EventTypes.MESSAGE_COMPLETE), id: z.string(), timestamp: z.number(), fullContent: z.string(), diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx index 2383034a8..6171b8f26 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/Chat.test.tsx @@ -9,9 +9,11 @@ jest.mock('./chat.store', () => ({ getState: jest.fn(), }, useChatMessages: jest.fn(() => []), + useAiProvider: jest.fn(() => 'LlmGateway'), useChatActions: jest.fn(() => ({ submitQuestion: jest.fn(), clearChat: jest.fn(), + setAiProvider: jest.fn(), })), })) @@ -94,14 +96,14 @@ describe('Chat Component', () => { id: '1', type: 'user' as const, content: 'What is Elasticsearch?', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), }, { id: '2', type: 'ai' as const, content: 'Elasticsearch is a search engine...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'complete' as const, }, @@ -245,14 +247,14 @@ describe('Chat Component', () => { id: '1', type: 'user' as const, content: 'Question', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), }, { id: '2', type: 'ai' as const, content: 'Answer...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'streaming' as const, }, diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx index ba7b04b6b..5aa1ba5c3 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.test.tsx @@ -13,7 +13,7 @@ describe('ChatMessage Component', () => { id: '1', type: 'user', content: 'What is Elasticsearch?', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), } @@ -44,7 +44,7 @@ describe('ChatMessage Component', () => { id: 
'2', type: 'ai', content: 'Elasticsearch is a distributed search engine...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'complete', } @@ -102,7 +102,7 @@ describe('ChatMessage Component', () => { id: '3', type: 'ai', content: 'Elasticsearch is...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'streaming', } @@ -142,7 +142,7 @@ describe('ChatMessage Component', () => { id: '4', type: 'ai', content: 'Previous content...', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'error', } @@ -176,7 +176,7 @@ describe('ChatMessage Component', () => { id: '5', type: 'ai', content: '# Heading\n\n**Bold text** and *italic*', - threadId: 'thread-1', + conversationId: 'thread-1', timestamp: Date.now(), status: 'complete', } diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx index dc033c3fd..a026415c0 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/ChatMessage.tsx @@ -63,13 +63,6 @@ interface ChatMessageProps { onRetry?: () => void } -const getAccumulatedContent = (messages: AskAiEvent[]) => { - return messages - .filter((m) => m.type === 'chunk') - .map((m) => m.content) - .join('') -} - const splitContentAndReferences = ( content: string ): { mainContent: string; referencesJson: string | null } => { @@ -144,7 +137,7 @@ const computeAiStatus = ( m.type === EventTypes.SEARCH_TOOL_CALL || m.type === EventTypes.TOOL_CALL || m.type === EventTypes.TOOL_RESULT || - m.type === EventTypes.CHUNK + m.type === EventTypes.MESSAGE_CHUNK ) .sort((a, b) => a.timestamp - b.timestamp) @@ -166,9 +159,9 @@ const computeAiStatus = ( case EventTypes.TOOL_RESULT: return STATUS_MESSAGES.ANALYZING - case EventTypes.CHUNK: { + case EventTypes.MESSAGE_CHUNK: { const allContent = events - .filter((m) => m.type === EventTypes.CHUNK) + .filter((m) => m.type === EventTypes.MESSAGE_CHUNK) .map((m) => m.content) .join('') @@ -279,9 +272,9 @@ export const ChatMessage = ({ ) } - const content = - streamingContent || - (events.length > 0 ? 
getAccumulatedContent(events) : message.content) + // Use streamingContent during streaming, otherwise use message.content from store + // message.content is updated atomically with status when CONVERSATION_END arrives + const content = streamingContent || message.content const hasError = message.status === 'error' || !!error diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx index 1143465f6..e90ad5547 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/StreamingAiMessage.tsx @@ -3,10 +3,9 @@ import { ChatMessage } from './ChatMessage' import { ChatMessage as ChatMessageType, useChatActions, - useThreadId, + useConversationId, } from './chat.store' import { useAskAi } from './useAskAi' -import * as React from 'react' import { useEffect, useRef } from 'react' interface StreamingAiMessageProps { @@ -22,20 +21,20 @@ export const StreamingAiMessage = ({ updateAiMessage, hasMessageBeenSent, markMessageAsSent, - setThreadId, + setConversationId, } = useChatActions() - const threadId = useThreadId() + const conversationId = useConversationId() const contentRef = useRef('') const { events, sendQuestion } = useAskAi({ - threadId: threadId ?? undefined, + conversationId: conversationId ?? undefined, onEvent: (event) => { if (event.type === EventTypes.CONVERSATION_START) { // Capture conversationId from backend on first request - if (event.conversationId && !threadId) { - setThreadId(event.conversationId) + if (event.conversationId && !conversationId) { + setConversationId(event.conversationId) } - } else if (event.type === EventTypes.CHUNK) { + } else if (event.type === EventTypes.MESSAGE_CHUNK) { contentRef.current += event.content } else if (event.type === EventTypes.ERROR) { // Handle error events from the stream @@ -45,10 +44,21 @@ export const StreamingAiMessage = ({ 'error' ) } else if (event.type === EventTypes.CONVERSATION_END) { - updateAiMessage(message.id, contentRef.current, 'complete') + updateAiMessage( + message.id, + message.content || contentRef.current, + 'complete' + ) } }, - onError: () => { + onError: (error) => { + console.error('[AI Provider] Error in StreamingAiMessage:', { + messageId: message.id, + errorMessage: error.message, + errorStack: error.stack, + errorName: error.name, + fullError: error, + }) updateAiMessage( message.id, message.content || 'Error occurred', @@ -78,15 +88,16 @@ export const StreamingAiMessage = ({ markMessageAsSent, ]) + // Always use contentRef.current if it has content (regardless of status) + // This way we don't need to save to message.content and can just use streamingContent + const streamingContentToPass = + isLast && contentRef.current ? 
contentRef.current : undefined + return ( ) } diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderResolver.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderResolver.ts deleted file mode 100644 index 0b7d4f12c..000000000 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderResolver.ts +++ /dev/null @@ -1,6 +0,0 @@ -/** - * AI Provider selection - user-controlled via UI - * This file is kept for backwards compatibility but now just exports the type - */ - -export type AiProvider = 'AgentBuilder' | 'LlmGateway' diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderStore.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderStore.ts deleted file mode 100644 index ef1902b6f..000000000 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/aiProviderStore.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { create } from 'zustand' - -type AiProvider = 'AgentBuilder' | 'LlmGateway' - -interface AiProviderState { - provider: AiProvider - setProvider: (provider: AiProvider) => void -} - -export const useAiProviderStore = create((set) => ({ - provider: 'LlmGateway', // Default to LLM Gateway - setProvider: (provider: AiProvider) => { - console.log(`[AI Provider] Switched to ${provider}`) - set({ provider }) - }, -})) - -export const useAiProvider = () => useAiProviderStore((state) => state.provider) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts index feaded37c..a354c2434 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.test.ts @@ -94,7 +94,7 @@ describe('chat.store', () => { // Verify fresh state expect(chatStore.getState().chatMessages).toHaveLength(0) - expect(chatStore.getState().threadId).toBeNull() + expect(chatStore.getState().conversationId).toBeNull() // Start new conversation act(() => { diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts index 4c4008f06..dcf128117 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/chat.store.ts @@ -1,11 +1,13 @@ import { v4 as uuidv4 } from 'uuid' import { create } from 'zustand/react' +export type AiProvider = 'AgentBuilder' | 'LlmGateway' + export interface ChatMessage { id: string type: 'user' | 'ai' content: string - threadId: string + conversationId: string timestamp: number status?: 'streaming' | 'complete' | 'error' question?: string // For AI messages, store the question @@ -16,7 +18,8 @@ const sentAiMessageIds = new Set() interface ChatState { chatMessages: ChatMessage[] - threadId: string | null + conversationId: string | null + aiProvider: AiProvider actions: { submitQuestion: (question: string) => void updateAiMessage: ( @@ -24,7 +27,8 @@ interface ChatState { content: string, status: ChatMessage['status'] ) => void - setThreadId: (threadId: string) => void + setConversationId: (conversationId: string) => void + setAiProvider: (provider: AiProvider) => void clearChat: () => void 
hasMessageBeenSent: (id: string) => boolean markMessageAsSent: (id: string) => void @@ -33,7 +37,8 @@ interface ChatState { export const chatStore = create((set) => ({ chatMessages: [], - threadId: null, // Start with null - will be set by backend on first request + conversationId: null, // Start with null - will be set by backend on first request + aiProvider: 'LlmGateway', // Default to LLM Gateway actions: { submitQuestion: (question: string) => { set((state) => { @@ -41,7 +46,7 @@ export const chatStore = create((set) => ({ id: uuidv4(), type: 'user', content: question, - threadId: state.threadId ?? '', + conversationId: state.conversationId ?? '', timestamp: Date.now(), } @@ -50,7 +55,7 @@ export const chatStore = create((set) => ({ type: 'ai', content: '', question, - threadId: state.threadId ?? '', + conversationId: state.conversationId ?? '', timestamp: Date.now(), status: 'streaming', } @@ -77,13 +82,17 @@ export const chatStore = create((set) => ({ })) }, - setThreadId: (threadId: string) => { - set({ threadId }) + setConversationId: (conversationId: string) => { + set({ conversationId }) + }, + + setAiProvider: (provider: AiProvider) => { + set({ aiProvider: provider }) }, clearChat: () => { sentAiMessageIds.clear() - set({ chatMessages: [], threadId: null }) + set({ chatMessages: [], conversationId: null }) }, hasMessageBeenSent: (id: string) => sentAiMessageIds.has(id), @@ -95,5 +104,7 @@ export const chatStore = create((set) => ({ })) export const useChatMessages = () => chatStore((state) => state.chatMessages) -export const useThreadId = () => chatStore((state) => state.threadId) +export const useConversationId = () => + chatStore((state) => state.conversationId) +export const useAiProvider = () => chatStore((state) => state.aiProvider) export const useChatActions = () => chatStore((state) => state.actions) diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts index 340939103..9335d1222 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/AskAi/useAskAi.ts @@ -1,5 +1,5 @@ import { AskAiEvent, AskAiEventSchema } from './AskAiEvent' -import { useAiProvider } from './aiProviderStore' +import { useAiProvider } from './chat.store' import { useFetchEventSource } from './useFetchEventSource' import { useMessageThrottling } from './useMessageThrottling' import { EventSourceMessage } from '@microsoft/fetch-event-source' @@ -11,7 +11,7 @@ const MESSAGE_THROTTLE_MS = 25 // Throttle messages to prevent UI flooding export const AskAiRequestSchema = z.object({ message: z.string(), - threadId: z.string().optional(), + conversationId: z.string().optional(), }) export type AskAiRequest = z.infer @@ -26,7 +26,7 @@ export interface UseAskAiResponse { interface Props { onEvent?: (event: AskAiEvent) => void onError?: (error: Error) => void - threadId?: string + conversationId?: string } export const useAskAi = (props: Props): UseAskAiResponse => { @@ -61,18 +61,49 @@ export const useAskAi = (props: Props): UseAskAiResponse => { const onMessage = useCallback( (sseEvent: EventSourceMessage) => { try { - // Parse and validate the canonical AskAiEvent format + // Parse JSON first const rawData = JSON.parse(sseEvent.data) - const askAiEvent = AskAiEventSchema.parse(rawData) - processMessage(askAiEvent) + // Use safeParse with reportInput to include input 
data in validation errors + const result = AskAiEventSchema.safeParse(rawData, { + reportInput: true, + }) + + if (!result.success) { + // Log detailed validation errors with input data + console.error('[AI Provider] Failed to parse SSE event:', { + eventId: sseEvent.id || 'unknown', + eventType: sseEvent.event || 'unknown', + rawEventData: sseEvent.data, + validationErrors: result.error.issues, + }) + throw new Error( + `Event validation failed: ${result.error.issues.map((e) => `${e.path.join('.')}: ${e.message}`).join('; ')}` + ) + } + + processMessage(result.data) } catch (error) { + // Handle JSON parsing errors or other unexpected errors + if ( + error instanceof Error && + error.message.includes('Event validation failed') + ) { + // Already logged above, just re-throw + throw error + } + + // Log JSON parsing or other errors console.error('[AI Provider] Failed to parse SSE event:', { - eventData: sseEvent.data, + eventId: sseEvent.id || 'unknown', + eventType: sseEvent.event || 'unknown', + rawEventData: sseEvent.data, error: error instanceof Error ? error.message : String(error), + errorStack: + error instanceof Error ? error.stack : undefined, }) - // Re-throw to trigger onError handler + throw new Error( `Event parsing failed: ${error instanceof Error ? error.message : String(error)}` ) @@ -86,6 +117,12 @@ export const useAskAi = (props: Props): UseAskAiResponse => { headers, onMessage, onError: (error) => { + console.error('[AI Provider] Error in useFetchEventSource:', { + errorMessage: error.message, + errorStack: error.stack, + errorName: error.name, + fullError: error, + }) setError(error) props.onError?.(error) }, @@ -99,7 +136,10 @@ export const useAskAi = (props: Props): UseAskAiResponse => { setEvents([]) clearQueue() lastSentQuestionRef.current = question - const payload = createAskAiRequest(question, props.threadId) + const payload = createAskAiRequest( + question, + props.conversationId + ) try { await sendMessage(payload) @@ -111,7 +151,7 @@ export const useAskAi = (props: Props): UseAskAiResponse => { } } }, - [props.threadId, sendMessage, abort, clearQueue] + [props.conversationId, sendMessage, abort, clearQueue] ) useEffect(() => { diff --git a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx index 56d509d3d..734d31e8a 100644 --- a/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx +++ b/src/Elastic.Documentation.Site/Assets/web-components/SearchOrAskAi/Search/Search.test.tsx @@ -26,6 +26,7 @@ jest.mock('../AskAi/chat.store', () => ({ useChatActions: jest.fn(() => ({ submitQuestion: jest.fn(), clearChat: jest.fn(), + setAiProvider: jest.fn(), })), })) @@ -66,6 +67,7 @@ describe('Search Component', () => { mockUseChatActions.mockReturnValue({ submitQuestion: mockSubmitQuestion, clearChat: mockClearChat, + setAiProvider: jest.fn(), }) mockUseModalActions.mockReturnValue({ setModalMode: mockSetModalMode, diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs index d2eb11b69..2ad3e003c 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiEvent.cs @@ -11,8 +11,8 @@ namespace Elastic.Documentation.Api.Core.AskAi; /// [JsonPolymorphic(TypeDiscriminatorPropertyName = "type")] [JsonDerivedType(typeof(ConversationStart), typeDiscriminator: 
"conversation_start")] -[JsonDerivedType(typeof(Chunk), typeDiscriminator: "chunk")] -[JsonDerivedType(typeof(ChunkComplete), typeDiscriminator: "chunk_complete")] +[JsonDerivedType(typeof(MessageChunk), typeDiscriminator: "message_chunk")] +[JsonDerivedType(typeof(MessageComplete), typeDiscriminator: "message_complete")] [JsonDerivedType(typeof(SearchToolCall), typeDiscriminator: "search_tool_call")] [JsonDerivedType(typeof(ToolCall), typeDiscriminator: "tool_call")] [JsonDerivedType(typeof(ToolResult), typeDiscriminator: "tool_result")] @@ -33,7 +33,7 @@ string ConversationId /// /// Streaming text chunk from AI /// - public sealed record Chunk( + public sealed record MessageChunk( string Id, long Timestamp, string Content @@ -42,7 +42,7 @@ string Content /// /// Complete message when streaming is done /// - public sealed record ChunkComplete( + public sealed record MessageComplete( string Id, long Timestamp, string FullContent @@ -111,8 +111,8 @@ string Message /// [JsonSerializable(typeof(AskAiEvent))] [JsonSerializable(typeof(AskAiEvent.ConversationStart))] -[JsonSerializable(typeof(AskAiEvent.Chunk))] -[JsonSerializable(typeof(AskAiEvent.ChunkComplete))] +[JsonSerializable(typeof(AskAiEvent.MessageChunk))] +[JsonSerializable(typeof(AskAiEvent.MessageComplete))] [JsonSerializable(typeof(AskAiEvent.SearchToolCall))] [JsonSerializable(typeof(AskAiEvent.ToolCall))] [JsonSerializable(typeof(AskAiEvent.ToolResult))] diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs index 554bb3545..5e7a45697 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information using System.Diagnostics; +using System.Text.Json; using Microsoft.Extensions.Logging; namespace Elastic.Documentation.Api.Core.AskAi; @@ -16,46 +17,29 @@ public class AskAiUsecase( public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx) { - // Start activity for the chat request - DO NOT use 'using' because the stream is consumed later - // The activity will be passed to the transformer which will dispose it when the stream completes - var activity = AskAiActivitySource.StartActivity("chat", ActivityKind.Client); - - // Generate a correlation ID for tracking if this is a new conversation - // For first messages (no ThreadId), generate a temporary ID that will be updated when the provider responds - var correlationId = askAiRequest.ThreadId ?? $"temp-{Guid.NewGuid()}"; - - // Set GenAI semantic convention attributes - _ = (activity?.SetTag("gen_ai.operation.name", "chat")); - _ = (activity?.SetTag("gen_ai.conversation.id", correlationId)); // Will be updated when we receive ConversationStart with actual ID - _ = (activity?.SetTag("gen_ai.usage.input_tokens", askAiRequest.Message.Length)); // Approximate token count - - // Custom attributes for tracking our abstraction layer - // We use custom attributes because we don't know the actual GenAI provider (OpenAI, Anthropic, etc.) - // or model (gpt-4, claude, etc.) 
- those are abstracted by AgentBuilder/LlmGateway - _ = (activity?.SetTag("docs.ai.gateway", streamTransformer.AgentProvider)); // agent-builder or llm-gateway - _ = (activity?.SetTag("docs.ai.agent_name", streamTransformer.AgentId)); // docs-agent or docs_assistant - - // Add GenAI prompt event - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.content.prompt", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.prompt", askAiRequest.Message), - new KeyValuePair("gen_ai.system_instructions", AskAiRequest.SystemPrompt) - ]))); - - logger.LogDebug("Processing AskAiRequest: {Request}", askAiRequest); - + logger.LogInformation("Starting AskAI chat with {AgentProvider} and {AgentId}", streamTransformer.AgentProvider, streamTransformer.AgentId); + var activity = AskAiActivitySource.StartActivity($"chat", ActivityKind.Client); + _ = activity?.SetTag("gen_ai.operation.name", "chat"); + _ = activity?.SetTag("gen_ai.provider.name", streamTransformer.AgentProvider); // agent-builder or llm-gateway + _ = activity?.SetTag("gen_ai.agent.id", streamTransformer.AgentId); // docs-agent or docs_assistant + if (askAiRequest.ConversationId is not null) + _ = activity?.SetTag("gen_ai.conversation.id", askAiRequest.ConversationId); + var inputMessages = new[] + { + new InputMessage("user", [new MessagePart("text", askAiRequest.Message)]) + }; + var inputMessagesJson = JsonSerializer.Serialize(inputMessages, ApiJsonContext.Default.InputMessageArray); + _ = activity?.SetTag("gen_ai.input.messages", inputMessagesJson); + logger.LogInformation("AskAI input message: {InputMessage}", askAiRequest.Message); + logger.LogInformation("Streaming AskAI response"); var rawStream = await askAiGateway.AskAi(askAiRequest, ctx); - // The stream transformer will handle disposing the activity when streaming completes - var transformedStream = await streamTransformer.TransformAsync(rawStream, activity, ctx); - + var transformedStream = await streamTransformer.TransformAsync(rawStream, askAiRequest.ConversationId, activity, ctx); return transformedStream; } } -public record AskAiRequest(string Message, string? ThreadId) +public record AskAiRequest(string Message, string? ConversationId) { public static string SystemPrompt => """ diff --git a/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs index b5b2455bd..56435df6d 100644 --- a/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs @@ -23,8 +23,9 @@ public interface IStreamTransformer /// Transforms a raw SSE stream into a stream of AskAiEvent objects /// /// Raw SSE stream from gateway (Agent Builder, LLM Gateway, etc.) + /// Thread/conversation ID (if known) /// Parent activity to track the streaming operation (will be disposed when stream completes) /// Cancellation token /// Stream containing SSE-formatted AskAiEvent objects - Task TransformAsync(Stream rawStream, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default); + Task TransformAsync(Stream rawStream, string? conversationId, System.Diagnostics.Activity? 
parentActivity, CancellationToken cancellationToken = default); } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs b/src/api/Elastic.Documentation.Api.Core/AskAi/StreamTransformerBase.cs similarity index 50% rename from src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs rename to src/api/Elastic.Documentation.Api.Core/AskAi/StreamTransformerBase.cs index f374f8e54..1a2ca22b7 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs +++ b/src/api/Elastic.Documentation.Api.Core/AskAi/StreamTransformerBase.cs @@ -8,10 +8,9 @@ using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; -using Elastic.Documentation.Api.Core.AskAi; using Microsoft.Extensions.Logging; -namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi; +namespace Elastic.Documentation.Api.Core.AskAi; /// /// Represents a parsed Server-Sent Event (SSE) @@ -50,16 +49,8 @@ public abstract class StreamTransformerBase(ILogger logger) : IStreamTransformer /// public string AgentProvider => GetAgentProvider(); - public Task TransformAsync(Stream rawStream, Activity? parentActivity, CancellationToken cancellationToken = default) + public Task TransformAsync(Stream rawStream, string? conversationId, Activity? parentActivity, Cancel cancellationToken = default) { - // Create a child activity for the transformation - DO NOT use 'using' because streaming happens asynchronously - var activity = StreamTransformerActivitySource.StartActivity($"chat {GetAgentId()}", ActivityKind.Client); - _ = (activity?.SetTag("gen_ai.operation.name", "chat")); - - // Custom attributes for tracking our abstraction layer - _ = (activity?.SetTag("docs.ai.gateway", GetAgentProvider())); - _ = (activity?.SetTag("docs.ai.agent_name", GetAgentId())); - // Configure pipe for low-latency streaming var pipeOptions = new PipeOptions( minimumSegmentSize: 1024, // Smaller segments for faster processing @@ -76,8 +67,8 @@ public Task TransformAsync(Stream rawStream, Activity? parentActivity, C // Start processing task to transform and write events to pipe // Note: We intentionally don't await this task as we need to return the stream immediately // The pipe handles synchronization and backpressure between producer and consumer - // Pass both parent and child activities - they will be disposed when streaming completes - _ = ProcessPipeAsync(reader, pipe.Writer, parentActivity, activity, cancellationToken); + // Pass parent activity - it will be disposed when streaming completes + _ = ProcessPipeAsync(reader, pipe.Writer, conversationId, parentActivity, cancellationToken); // Return the read side of the pipe as a stream return Task.FromResult(pipe.Reader.AsStream()); @@ -87,63 +78,22 @@ public Task TransformAsync(Stream rawStream, Activity? parentActivity, C /// Process the pipe reader and write transformed events to the pipe writer. /// This runs concurrently with the consumer reading from the output stream. /// - private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, Activity? transformActivity, CancellationToken cancellationToken) + private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, string? conversationId, Activity? 
parentActivity, CancellationToken cancellationToken) { - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.pipe"); - _ = (activity?.SetTag("transformer.type", GetType().Name)); - try { try { - await ProcessStreamAsync(reader, writer, parentActivity, cancellationToken); + await ProcessStreamAsync(reader, writer, conversationId, parentActivity, cancellationToken); } catch (OperationCanceledException ex) { - // Cancellation is expected and not an error - log as debug Logger.LogDebug(ex, "Stream processing was cancelled for transformer {TransformerType}", GetType().Name); - _ = (activity?.SetTag("gen_ai.response.error", true)); - _ = (activity?.SetTag("gen_ai.response.error_type", "OperationCanceledException")); - - // Add error event to activity - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", "OperationCanceledException"), - new KeyValuePair("gen_ai.error.message", "Stream processing was cancelled"), - new KeyValuePair("gen_ai.transformer.type", GetType().Name) - ]))); - - try - { - await writer.CompleteAsync(ex); - await reader.CompleteAsync(ex); - } - catch (Exception completeEx) - { - Logger.LogError(completeEx, "Error completing pipe after cancellation for transformer {TransformerType}", GetType().Name); - } - return; } catch (Exception ex) { Logger.LogError(ex, "Error transforming stream for transformer {TransformerType}. Stream processing will be terminated.", GetType().Name); - _ = (activity?.SetTag("gen_ai.response.error", true)); - _ = (activity?.SetTag("gen_ai.response.error_type", ex.GetType().Name)); - _ = (activity?.SetTag("gen_ai.response.error_message", ex.Message)); - - // Add error event to activity - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", ex.GetType().Name), - new KeyValuePair("gen_ai.error.message", ex.Message), - new KeyValuePair("gen_ai.transformer.type", GetType().Name), - new KeyValuePair("gen_ai.error.stack_trace", ex.StackTrace ?? "") - ]))); - + _ = parentActivity?.SetTag("error.type", ex.GetType().Name); try { await writer.CompleteAsync(ex); @@ -169,32 +119,27 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi } finally { - // Always dispose activities, regardless of how we exit - transformActivity?.Dispose(); parentActivity?.Dispose(); } } + /// /// Process the raw stream and write transformed events to the pipe writer. /// Default implementation parses SSE events and JSON, then calls TransformJsonEvent. /// - protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken) + /// Stream processing result with metrics and captured output + private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? conversationId, Activity? 
parentActivity, CancellationToken cancellationToken) { - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.stream"); + using var activity = StreamTransformerActivitySource.StartActivity(nameof(ProcessStreamAsync)); - // Custom attributes for tracking our abstraction layer - _ = (activity?.SetTag("docs.ai.gateway", GetAgentProvider())); - _ = (activity?.SetTag("docs.ai.agent_name", GetAgentId())); - - var eventCount = 0; - var jsonParseErrors = 0; + if (parentActivity?.Id != null) + _ = activity?.SetParentId(parentActivity.Id); + List outputMessageParts = []; await foreach (var sseEvent in ParseSseEventsAsync(reader, cancellationToken)) { - eventCount++; - AskAiEvent? transformedEvent = null; - + AskAiEvent? transformedEvent; try { // Parse JSON once in base class @@ -202,86 +147,125 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr var root = doc.RootElement; // Subclass transforms JsonElement to AskAiEvent - transformedEvent = TransformJsonEvent(sseEvent.EventType, root); + transformedEvent = TransformJsonEvent(conversationId, sseEvent.EventType, root); } catch (JsonException ex) { - jsonParseErrors++; Logger.LogError(ex, "Failed to parse JSON from SSE event for transformer {TransformerType}. EventType: {EventType}, Data: {Data}", GetType().Name, sseEvent.EventType, sseEvent.Data); + throw; + } - // Add error event to activity for JSON parsing failures - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", "JsonException"), - new KeyValuePair("gen_ai.error.message", ex.Message), - new KeyValuePair("gen_ai.transformer.type", GetType().Name), - new KeyValuePair("gen_ai.sse.event_type", sseEvent.EventType ?? "unknown"), - new KeyValuePair("gen_ai.sse.data", sseEvent.Data) - ]))); + if (transformedEvent == null) + { + Logger.LogWarning("Transformed event is null for transformer {TransformerType}. Skipping event. EventType: {EventType}", + GetType().Name, sseEvent.EventType); + continue; } - if (transformedEvent != null) + using var parseActivity = StreamTransformerActivitySource.StartActivity("AskAI Event"); + + // Set event type tag on parse_event activity + _ = parseActivity?.SetTag("ask_ai.event.type", transformedEvent.GetType().Name); + _ = parseActivity?.SetTag("gen_ai.response.id", transformedEvent.Id); + + switch (transformedEvent) { - // Update parent activity with conversation ID when we receive ConversationStart events - if (transformedEvent is AskAiEvent.ConversationStart conversationStart) - { - _ = (parentActivity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId)); - _ = (activity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId)); - } + case AskAiEvent.ConversationStart conversationStart: + { + _ = parentActivity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId); + _ = activity?.SetTag("gen_ai.conversation.id", conversationStart.ConversationId); + Logger.LogDebug("AskAI conversation started: {ConversationId}", conversationStart.ConversationId); + break; + } + case AskAiEvent.Reasoning reasoning: + { + Logger.LogDebug("AskAI reasoning: {ReasoningMessage}", reasoning.Message); + outputMessageParts.Add(new MessagePart("reasoning", reasoning.Message ?? 
string.Empty)); + break; + } + case AskAiEvent.MessageChunk messageChunk: + { + Logger.LogDebug("AskAI message chunk: {ChunkContent}", messageChunk.Content); + // Event type already tagged above + break; + } - await WriteEventAsync(transformedEvent, writer, cancellationToken); + case AskAiEvent.ErrorEvent errorEvent: + { + _ = activity?.SetStatus(ActivityStatusCode.Error, "AI provider error event"); + _ = activity?.SetTag("error.type", "AIProviderError"); + _ = activity?.SetTag("error.message", errorEvent.Message); + _ = parseActivity?.SetStatus(ActivityStatusCode.Error, errorEvent.Message); + Logger.LogError("AskAI error event: {Message}", errorEvent.Message); + break; + } + case AskAiEvent.ToolCall toolCall: + { + // Event type already tagged above + Logger.LogDebug("AskAI tool call: {ToolCall}", toolCall.ToolName); + break; + } + case AskAiEvent.SearchToolCall searchToolCall: + { + _ = parseActivity?.SetTag("search.query", searchToolCall.SearchQuery); + Logger.LogDebug("AskAI search tool call: {SearchQuery}", searchToolCall.SearchQuery); + break; + } + case AskAiEvent.ToolResult toolResult: + { + Logger.LogDebug("AskAI tool result: {ToolResult}", toolResult.Result); + break; + } + case AskAiEvent.MessageComplete messageComplete: + { + outputMessageParts.Add(new MessagePart("text", messageComplete.FullContent)); + Logger.LogInformation("AskAI output message: {OutputMessage}", messageComplete.FullContent); + break; + } + case AskAiEvent.ConversationEnd conversationEnd: + { + Logger.LogDebug("AskAI conversation end: {ConversationId}", conversationEnd.Id); + break; + } } + await WriteEventAsync(transformedEvent, writer, cancellationToken); } - // Set metrics on the activity using GenAI conventions - _ = (activity?.SetTag("gen_ai.response.token_count", eventCount)); - _ = (activity?.SetTag("gen_ai.response.error_count", jsonParseErrors)); + // Set output messages tag once after all events are processed + if (outputMessageParts.Count > 0) + { + var outputMessages = new OutputMessage("assistant", outputMessageParts.ToArray(), "stop"); + var outputMessagesJson = JsonSerializer.Serialize(outputMessages, ApiJsonContext.Default.OutputMessage); + _ = parentActivity?.SetTag("gen_ai.output.messages", outputMessagesJson); + _ = activity?.SetTag("gen_ai.output.messages", outputMessagesJson); + } } /// /// Transform a parsed JSON event into an AskAiEvent. /// Subclasses implement provider-specific transformation logic. /// + /// The conversation/thread ID, if available /// The SSE event type (from "event:" field), or null if not present /// The parsed JSON data from the "data:" field /// The transformed AskAiEvent, or null to skip this event - protected abstract AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json); + protected abstract AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json); /// /// Write a transformed event to the output stream /// - protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter writer, CancellationToken cancellationToken) + private async Task WriteEventAsync(AskAiEvent? 
transformedEvent, PipeWriter writer, CancellationToken cancellationToken) { if (transformedEvent == null) return; - - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.token"); - - // Custom attributes for tracking our abstraction layer - _ = (activity?.SetTag("docs.ai.gateway", GetAgentProvider())); - _ = (activity?.SetTag("docs.ai.agent_name", GetAgentId())); - _ = (activity?.SetTag("gen_ai.response.token_type", transformedEvent.GetType().Name)); - try { - // Add GenAI completion event for each token/chunk - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.content.completion", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.completion", JsonSerializer.Serialize(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent)) - ]))); - // Serialize as base AskAiEvent type to include the type discriminator var json = JsonSerializer.Serialize(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent); var sseData = $"data: {json}\n\n"; var bytes = Encoding.UTF8.GetBytes(sseData); - _ = (activity?.SetTag("gen_ai.response.token_size", bytes.Length)); - // Write to pipe and flush immediately for real-time streaming _ = await writer.WriteAsync(bytes, cancellationToken); _ = await writer.FlushAsync(cancellationToken); @@ -290,18 +274,6 @@ protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter wr { Logger.LogError(ex, "Error writing event to stream for transformer {TransformerType}. EventType: {EventType}", GetType().Name, transformedEvent.GetType().Name); - - // Add error event to activity - _ = (activity?.AddEvent(new ActivityEvent("gen_ai.error", - timestamp: DateTimeOffset.UtcNow, - tags: - [ - new KeyValuePair("gen_ai.error.type", ex.GetType().Name), - new KeyValuePair("gen_ai.error.message", ex.Message), - new KeyValuePair("gen_ai.transformer.type", GetType().Name), - new KeyValuePair("gen_ai.event.type", transformedEvent.GetType().Name) - ]))); - throw; // Re-throw to be handled by caller } } @@ -310,26 +282,18 @@ protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter wr /// Parse Server-Sent Events (SSE) from a PipeReader following the W3C SSE specification. /// This method handles the standard SSE format with event:, data:, and comment lines. /// - protected async IAsyncEnumerable ParseSseEventsAsync( + private static async IAsyncEnumerable ParseSseEventsAsync( PipeReader reader, - [EnumeratorCancellation] CancellationToken cancellationToken) + [EnumeratorCancellation] Cancel cancellationToken + ) { - using var activity = StreamTransformerActivitySource.StartActivity("gen_ai.agent.parse"); - _ = (activity?.SetTag("gen_ai.agent.name", GetAgentId())); - _ = (activity?.SetTag("gen_ai.provider.name", GetAgentProvider())); - string? 
currentEvent = null; var dataBuilder = new StringBuilder(); - var eventsParsed = 0; - var readOperations = 0; - var totalBytesRead = 0L; while (!cancellationToken.IsCancellationRequested) { - readOperations++; var result = await reader.ReadAsync(cancellationToken); var buffer = result.Buffer; - totalBytesRead += buffer.Length; // Process all complete lines in the buffer while (TryReadLine(ref buffer, out var line)) @@ -340,24 +304,18 @@ protected async IAsyncEnumerable ParseSseEventsAsync( // Event type line if (line.StartsWith("event:", StringComparison.Ordinal)) - { - currentEvent = line.Substring(6).Trim(); - } + currentEvent = line[6..].Trim(); // Data line else if (line.StartsWith("data:", StringComparison.Ordinal)) - { - _ = dataBuilder.Append(line.Substring(5).Trim()); - } + _ = dataBuilder.Append(line[5..].Trim()); // Empty line - marks end of event else if (string.IsNullOrEmpty(line)) { - if (dataBuilder.Length > 0) - { - eventsParsed++; - yield return new SseEvent(currentEvent, dataBuilder.ToString()); - currentEvent = null; - _ = dataBuilder.Clear(); - } + if (dataBuilder.Length <= 0) + continue; + yield return new SseEvent(currentEvent, dataBuilder.ToString()); + currentEvent = null; + _ = dataBuilder.Clear(); } } @@ -365,22 +323,14 @@ protected async IAsyncEnumerable ParseSseEventsAsync( reader.AdvanceTo(buffer.Start, buffer.End); // Stop reading if there's no more data coming - if (result.IsCompleted) - { - // Yield any remaining event that hasn't been terminated with an empty line - if (dataBuilder.Length > 0) - { - eventsParsed++; - yield return new SseEvent(currentEvent, dataBuilder.ToString()); - } - break; - } - } + if (!result.IsCompleted) + continue; - // Set metrics on the activity using GenAI conventions - _ = (activity?.SetTag("gen_ai.response.token_count", eventsParsed)); - _ = (activity?.SetTag("gen_ai.request.input_size", totalBytesRead)); - _ = (activity?.SetTag("gen_ai.model.operation_count", readOperations)); + // Yield any remaining event that hasn't been terminated with an empty line + if (dataBuilder.Length > 0) + yield return new SseEvent(currentEvent, dataBuilder.ToString()); + break; + } } /// diff --git a/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs b/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs index 9cae0eb22..00e519bdb 100644 --- a/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs +++ b/src/api/Elastic.Documentation.Api.Core/SerializationContext.cs @@ -8,9 +8,21 @@ namespace Elastic.Documentation.Api.Core; +/// +/// Types for OpenTelemetry telemetry serialization (AOT-compatible) +/// +public record MessagePart(string Type, string Content); + +public record InputMessage(string Role, MessagePart[] Parts); + +public record OutputMessage(string Role, MessagePart[] Parts, string FinishReason); [JsonSerializable(typeof(AskAiRequest))] [JsonSerializable(typeof(SearchRequest))] [JsonSerializable(typeof(SearchResponse))] +[JsonSerializable(typeof(InputMessage))] +[JsonSerializable(typeof(OutputMessage))] +[JsonSerializable(typeof(MessagePart))] +[JsonSerializable(typeof(InputMessage[]))] [JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase)] public partial class ApiJsonContext : JsonSerializerContext; diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs index ab5ce75c5..cdf53b3c9 100644 --- 
a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs @@ -26,14 +26,13 @@ public class AgentBuilderAskAiGateway(HttpClient httpClient, IParameterProvider public const string ProviderName = "agent-builder"; public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) { - // Only include conversation_id if threadId is provided (subsequent requests) var agentBuilderPayload = new AgentBuilderPayload( askAiRequest.Message, "docs-agent", - askAiRequest.ThreadId); + askAiRequest.ConversationId); var requestBody = JsonSerializer.Serialize(agentBuilderPayload, AgentBuilderContext.Default.AgentBuilderPayload); - logger.LogInformation("Sending to Agent Builder with conversation_id: {ConversationId}", askAiRequest.ThreadId ?? "(null - first request)"); + logger.LogInformation("Sending to Agent Builder with conversation_id: {ConversationId}", askAiRequest.ConversationId ?? "(null - first request)"); var kibanaUrl = await parameterProvider.GetParam("docs-kibana-url", false, ctx); var kibanaApiKey = await parameterProvider.GetParam("docs-kibana-apikey", true, ctx); diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs index 901b059f8..1b5e3d1a3 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs @@ -18,18 +18,12 @@ public class AgentBuilderStreamTransformer(ILogger AgentBuilderAskAiGateway.ModelName; protected override string GetAgentProvider() => AgentBuilderAskAiGateway.ProviderName; - protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json) + protected override AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json) { var type = eventType ?? "message"; var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var id = Guid.NewGuid().ToString(); - // Special handling for error events - they may have a different structure - if (type == "error") - { - return ParseErrorEventFromRoot(id, timestamp, json); - } - // Most Agent Builder events have data nested in a "data" property if (!json.TryGetProperty("data", out var innerData)) { @@ -39,14 +33,17 @@ public class AgentBuilderStreamTransformer(ILogger + ParseErrorEvent(id, timestamp, json), + "conversation_id_set" when innerData.TryGetProperty("conversation_id", out var convId) => new AskAiEvent.ConversationStart(id, timestamp, convId.GetString()!), "message_chunk" when innerData.TryGetProperty("text_chunk", out var textChunk) => - new AskAiEvent.Chunk(id, timestamp, textChunk.GetString()!), + new AskAiEvent.MessageChunk(id, timestamp, textChunk.GetString()!), "message_complete" when innerData.TryGetProperty("message_content", out var fullContent) => - new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!), + new AskAiEvent.MessageComplete(id, timestamp, fullContent.GetString()!), "reasoning" => // Parse reasoning message if available @@ -76,7 +73,7 @@ public class AgentBuilderStreamTransformer(ILogger new ChatInput("user", AskAiRequest.SystemPrompt), new ChatInput("user", request.Message) ], - ThreadId: request.ThreadId ?? "elastic-docs-" + Guid.NewGuid() + ThreadId: request.ConversationId ?? 
"elastic-docs-" + Guid.NewGuid() ); } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs index 2a97de8e3..2e150bba5 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs @@ -2,9 +2,6 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -using System.Buffers; -using System.IO.Pipelines; -using System.Text; using System.Text.Json; using Elastic.Documentation.Api.Core.AskAi; using Microsoft.Extensions.Logging; @@ -18,7 +15,7 @@ public class LlmGatewayStreamTransformer(ILogger lo { protected override string GetAgentId() => LlmGatewayAskAiGateway.ModelName; protected override string GetAgentProvider() => LlmGatewayAskAiGateway.ProviderName; - protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json) + protected override AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json) { // LLM Gateway format: ["custom", {type: "...", ...}] if (json.ValueKind != JsonValueKind.Array || json.GetArrayLength() < 2) @@ -36,15 +33,11 @@ public class LlmGatewayStreamTransformer(ILogger lo return type switch { - "agent_start" => - // LLM Gateway doesn't provide conversation ID, so generate one - new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()), - "ai_message_chunk" when messageData.TryGetProperty("content", out var content) => - new AskAiEvent.Chunk(id, timestamp, content.GetString()!), + new AskAiEvent.MessageChunk(id, timestamp, content.GetString()!), "ai_message" when messageData.TryGetProperty("content", out var fullContent) => - new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!), + new AskAiEvent.MessageComplete(id, timestamp, fullContent.GetString()!), "tool_call" when messageData.TryGetProperty("toolCalls", out var toolCalls) => TransformToolCall(id, timestamp, toolCalls), @@ -56,6 +49,8 @@ public class LlmGatewayStreamTransformer(ILogger lo "agent_end" => new AskAiEvent.ConversationEnd(id, timestamp), + "error" => ParseErrorEvent(id, timestamp, messageData), + "chat_model_start" or "chat_model_end" => null, // Skip model lifecycle events @@ -110,4 +105,18 @@ public class LlmGatewayStreamTransformer(ILogger lo Logger.LogWarning("Unknown LLM Gateway event type: {Type}", type); return null; } + + private AskAiEvent.ErrorEvent ParseErrorEvent(string id, long timestamp, JsonElement messageData) + { + // LLM Gateway error format: {error: "...", message: "..."} + var errorMessage = messageData.TryGetProperty("message", out var msgProp) + ? msgProp.GetString() + : messageData.TryGetProperty("error", out var errProp) + ? errProp.GetString() + : null; + + Logger.LogError("Error event received from LLM Gateway: {ErrorMessage}", errorMessage ?? "Unknown error"); + + return new AskAiEvent.ErrorEvent(id, timestamp, errorMessage ?? 
"Unknown error occurred"); + } } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs index 049bdd27c..4d9937871 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerFactory.cs @@ -39,9 +39,9 @@ private IStreamTransformer GetTransformer() public string AgentId => GetTransformer().AgentId; public string AgentProvider => GetTransformer().AgentProvider; - public async Task TransformAsync(Stream rawStream, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default) + public async Task TransformAsync(Stream rawStream, string? conversationId, System.Diagnostics.Activity? parentActivity, Cancel cancellationToken = default) { var transformer = GetTransformer(); - return await transformer.TransformAsync(rawStream, parentActivity, cancellationToken); + return await transformer.TransformAsync(rawStream, conversationId, parentActivity, cancellationToken); } } diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj b/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj index e9b64a3b9..5d1c6c895 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Elastic.Documentation.Api.Infrastructure.csproj @@ -19,6 +19,9 @@ + + + diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs new file mode 100644 index 000000000..9f424bb44 --- /dev/null +++ b/src/api/Elastic.Documentation.Api.Infrastructure/OpenTelemetryExtensions.cs @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.OpenTelemetry; +using Microsoft.Extensions.Hosting; +using OpenTelemetry; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +namespace Elastic.Documentation.Api.Infrastructure; + +public static class OpenTelemetryExtensions +{ + /// + /// Configures Elastic OpenTelemetry (EDOT) for the Docs API. + /// Only enables if OTEL_EXPORTER_OTLP_ENDPOINT environment variable is set. 
+ /// + /// The web application builder + /// The builder for chaining + public static TBuilder AddDocsApiOpenTelemetry( + this TBuilder builder) + where TBuilder : IHostApplicationBuilder + { + var options = new ElasticOpenTelemetryOptions + { + SkipInstrumentationAssemblyScanning = true // Disable instrumentation assembly scanning for AOT + }; + + _ = builder.AddElasticOpenTelemetry(options, edotBuilder => + { + _ = edotBuilder + .WithLogging() + .WithTracing(tracing => + { + _ = tracing + .AddSource("Elastic.Documentation.Api.AskAi") + .AddSource("Elastic.Documentation.Api.StreamTransformer") + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation(); + }) + .WithMetrics(metrics => + { + _ = metrics + .AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation(); + }); + }); + return builder; + } +} diff --git a/src/api/Elastic.Documentation.Api.Lambda/Program.cs b/src/api/Elastic.Documentation.Api.Lambda/Program.cs index c37e90269..a9f51dbda 100644 --- a/src/api/Elastic.Documentation.Api.Lambda/Program.cs +++ b/src/api/Elastic.Documentation.Api.Lambda/Program.cs @@ -8,49 +8,12 @@ using Elastic.Documentation.Api.Core.AskAi; using Elastic.Documentation.Api.Core.Search; using Elastic.Documentation.Api.Infrastructure; -using Elastic.Documentation.ServiceDefaults; -using OpenTelemetry; -using OpenTelemetry.Metrics; -using OpenTelemetry.Trace; try { - var process = System.Diagnostics.Process.GetCurrentProcess(); - Console.WriteLine($"Starting Lambda application... Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var builder = WebApplication.CreateSlimBuilder(args); - process.Refresh(); - Console.WriteLine($"WebApplication builder created. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - // Add logging configuration for Lambda - _ = builder.Services.AddElasticDocumentationLogging(LogLevel.Information); - process.Refresh(); - Console.WriteLine($"Logging configured. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - - _ = builder.AddElasticOpenTelemetry(edotBuilder => - { - _ = edotBuilder - .WithElasticTracing(tracing => - { - _ = tracing - .AddAspNetCoreInstrumentation() - .AddHttpClientInstrumentation() - .AddSource("Elastic.Documentation.Api.AskAi") - .AddSource("Elastic.Documentation.Api.StreamTransformer"); - }) - .WithElasticLogging() - .WithElasticMetrics(metrics => - { - _ = metrics - .AddAspNetCoreInstrumentation() - .AddHttpClientInstrumentation() - .AddProcessInstrumentation() - .AddRuntimeInstrumentation(); - }); - }); - - process.Refresh(); - Console.WriteLine($"Elastic OTel configured. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); + _ = builder.AddDocsApiOpenTelemetry(); // If we are running in Lambda Web Adapter response_stream mode, configure Kestrel to listen on port 8080 // Otherwise, configure AWS Lambda hosting for API Gateway HTTP API @@ -69,21 +32,11 @@ _ = builder.Services.AddAWSLambdaHosting(LambdaEventSource.HttpApi, new SourceGeneratorLambdaJsonSerializer()); _ = builder.WebHost.UseKestrelHttpsConfiguration(); } - - process.Refresh(); - Console.WriteLine($"Kestrel configured to listen on port 8080. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var environment = Environment.GetEnvironmentVariable("ENVIRONMENT"); - Console.WriteLine($"Environment: {environment}"); + Console.WriteLine($"Docs Environment: {environment}"); builder.Services.AddElasticDocsApiUsecases(environment); - process.Refresh(); - Console.WriteLine($"Elastic docs API use cases added. 
Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var app = builder.Build(); - process.Refresh(); - Console.WriteLine($"Application built successfully. Memory: {process.WorkingSet64 / 1024 / 1024} MB"); - var v1 = app.MapGroup("/docs/_api/v1"); v1.MapElasticDocsApiEndpoints(); Console.WriteLine("API endpoints mapped"); diff --git a/src/tooling/docs-builder/Http/DocumentationWebHost.cs b/src/tooling/docs-builder/Http/DocumentationWebHost.cs index dfc0a9aef..82f36367b 100644 --- a/src/tooling/docs-builder/Http/DocumentationWebHost.cs +++ b/src/tooling/docs-builder/Http/DocumentationWebHost.cs @@ -44,6 +44,7 @@ IConfigurationContext configurationContext _writeFileSystem = writeFs; var builder = WebApplication.CreateSlimBuilder(); _ = builder.AddDocumentationServiceDefaults(); + #if DEBUG builder.Services.AddElasticDocsApiUsecases("dev"); #endif diff --git a/src/tooling/docs-builder/docs-builder.csproj b/src/tooling/docs-builder/docs-builder.csproj index 432973fc4..a153355b2 100644 --- a/src/tooling/docs-builder/docs-builder.csproj +++ b/src/tooling/docs-builder/docs-builder.csproj @@ -23,6 +23,8 @@ + +
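
The renamed stream events (`chunk` → `message_chunk`, `chunk_complete` → `message_complete`) and the `threadId` → `conversationId` rename are easiest to see end to end as a small client-side fold over the event stream. The sketch below is illustrative only: it mirrors how `StreamingAiMessage` accumulates chunks and captures the backend-issued conversation ID, but it is not the store implementation, and the event shapes are assumed to match `AskAiEvent.ts` above.

```typescript
// Minimal sketch of consuming the renamed AskAiEvent stream (not the production store logic).
type AskAiEvent =
    | { type: 'conversation_start'; id: string; timestamp: number; conversationId: string }
    | { type: 'message_chunk'; id: string; timestamp: number; content: string }
    | { type: 'message_complete'; id: string; timestamp: number; fullContent: string }
    | { type: 'conversation_end'; id: string; timestamp: number }
    | { type: 'error'; id: string; timestamp: number; message: string }

interface StreamState {
    conversationId: string | null
    content: string
    status: 'streaming' | 'complete' | 'error'
}

const reduceEvent = (state: StreamState, event: AskAiEvent): StreamState => {
    switch (event.type) {
        case 'conversation_start':
            // Capture the backend-issued conversationId on the first turn only
            return state.conversationId
                ? state
                : { ...state, conversationId: event.conversationId }
        case 'message_chunk':
            // Same accumulation StreamingAiMessage does in contentRef
            return { ...state, content: state.content + event.content }
        case 'message_complete':
            // Prefer the provider's full message over the accumulated chunks
            return { ...state, content: event.fullContent }
        case 'conversation_end':
            return { ...state, status: 'complete' }
        case 'error':
            return { ...state, status: 'error' }
    }
}
```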
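
On the server side, `StreamTransformerBase.ParseSseEventsAsync` now does the SSE framing privately: `event:` and `data:` lines are accumulated until a blank line terminates the event, and any trailing event is flushed when the stream completes. The TypeScript stand-in below shows roughly the same control flow in simplified form; the real implementation works over a `PipeReader`, and the comment-line skip is included here per the SSE spec rather than copied from the C# hunk.

```typescript
interface SseEvent {
    eventType: string | null
    data: string
}

// Same framing rules as ParseSseEventsAsync, applied to an already-split array of lines.
const parseSseLines = (lines: string[]): SseEvent[] => {
    const events: SseEvent[] = []
    let currentEvent: string | null = null
    let data = ''

    for (const line of lines) {
        if (line.startsWith(':')) continue // SSE comment line
        if (line.startsWith('event:')) {
            currentEvent = line.slice(6).trim()
        } else if (line.startsWith('data:')) {
            data += line.slice(5).trim()
        } else if (line === '' && data.length > 0) {
            // Blank line terminates the current event
            events.push({ eventType: currentEvent, data })
            currentEvent = null
            data = ''
        }
    }
    // Flush an event that was not terminated with a blank line
    if (data.length > 0) events.push({ eventType: currentEvent, data })
    return events
}
```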
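
The new `InputMessage` / `OutputMessage` / `MessagePart` records exist only so the GenAI span attributes can be serialized under AOT. With the camelCase policy in `ApiJsonContext`, the values written to `gen_ai.input.messages` and `gen_ai.output.messages` should look roughly like the shapes below; the field names are inferred from the records and the naming policy, and the message text is a made-up example.

```typescript
interface MessagePart {
    type: 'text' | 'reasoning'
    content: string
}

// Serialized once per request in AskAiUsecase (gen_ai.input.messages)
const inputMessages: Array<{ role: string; parts: MessagePart[] }> = [
    { role: 'user', parts: [{ type: 'text', content: 'How do I configure ILM?' }] },
]

// Serialized once after streaming completes in StreamTransformerBase (gen_ai.output.messages)
const outputMessage: { role: string; parts: MessagePart[]; finishReason: string } = {
    role: 'assistant',
    parts: [
        { type: 'reasoning', content: 'Searching the docs for ILM…' },
        { type: 'text', content: 'Index lifecycle management lets you…' },
    ],
    finishReason: 'stop',
}
```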