Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 251 additions & 5 deletions docs-builder.sln

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Text.Json;
using Elastic.Documentation.Api.Core.AskAi;
using Microsoft.Extensions.Logging;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static LlmGatewayRequest CreateFromRequest(AskAiRequest request) =>
new ChatInput("user", AskAiRequest.SystemPrompt),
new ChatInput("user", request.Message)
],
ThreadId: request.ConversationId ?? "elastic-docs-" + Guid.NewGuid()
ThreadId: request.ConversationId ?? Guid.NewGuid().ToString()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Diagnostics;
using System.IO.Pipelines;
using System.Text.Json;
using Elastic.Documentation.Api.Core.AskAi;
using Microsoft.Extensions.Logging;
Expand All @@ -15,6 +17,36 @@ public class LlmGatewayStreamTransformer(ILogger<LlmGatewayStreamTransformer> lo
{
protected override string GetAgentId() => LlmGatewayAskAiGateway.ModelName;
protected override string GetAgentProvider() => LlmGatewayAskAiGateway.ProviderName;

/// <summary>
/// Override to emit ConversationStart event when conversationId is null (new conversation)
/// </summary>
protected override async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? conversationId, Activity? parentActivity, CancellationToken cancellationToken)
{
// If conversationId is null, generate a new one and emit ConversationStart event
// This matches the ThreadId format used in LlmGatewayAskAiGateway
var actualConversationId = conversationId;
if (conversationId == null)
{
actualConversationId = Guid.NewGuid().ToString();
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var conversationStartEvent = new AskAiEvent.ConversationStart(
Id: Guid.NewGuid().ToString(),
Timestamp: timestamp,
ConversationId: actualConversationId
);

// Set activity tags for the new conversation
_ = parentActivity?.SetTag("gen_ai.conversation.id", actualConversationId);
Logger.LogDebug("LLM Gateway conversation started: {ConversationId}", actualConversationId);

// Write the ConversationStart event to the stream
await WriteEventAsync(conversationStartEvent, writer, cancellationToken);
}

// Continue with normal stream processing using the actual conversation ID
await base.ProcessStreamAsync(reader, writer, actualConversationId, parentActivity, cancellationToken);
}
protected override AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json)
{
// LLM Gateway format: ["custom", {type: "...", ...}]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Buffers;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Text;

namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;

/// <summary>
/// Represents a parsed Server-Sent Event (SSE)
/// </summary>
/// <param name="EventType">The event type from the "event:" field, or null if not specified</param>
/// <param name="Data">The accumulated data from all "data:" fields</param>
public record SseEvent(string? EventType, string Data);

/// <summary>
/// Parser for Server-Sent Events (SSE) following the W3C SSE specification.
/// </summary>
public static class SseParser
{
/// <summary>
/// Parse Server-Sent Events (SSE) from a PipeReader following the W3C SSE specification.
/// This method handles the standard SSE format with event:, data:, and comment lines.
/// </summary>
public static async IAsyncEnumerable<SseEvent> ParseAsync(
PipeReader reader,
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
string? currentEvent = null;
var dataBuilder = new StringBuilder();

while (!cancellationToken.IsCancellationRequested)
{
var result = await reader.ReadAsync(cancellationToken);
var buffer = result.Buffer;

// Process all complete lines in the buffer
while (TryReadLine(ref buffer, out var line))
{
// SSE comment line - skip
if (line.Length > 0 && line[0] == ':')
continue;

// Event type line
if (line.StartsWith("event:", StringComparison.Ordinal))
currentEvent = line[6..].Trim();
// Data line
else if (line.StartsWith("data:", StringComparison.Ordinal))
_ = dataBuilder.Append(line[5..].Trim());
// Empty line - marks end of event
else if (string.IsNullOrEmpty(line))
{
if (dataBuilder.Length <= 0)
continue;
yield return new SseEvent(currentEvent, dataBuilder.ToString());
currentEvent = null;
_ = dataBuilder.Clear();
}
}

// Tell the PipeReader how much of the buffer we consumed
reader.AdvanceTo(buffer.Start, buffer.End);

// Stop reading if there's no more data coming
if (!result.IsCompleted)
continue;

// Yield any remaining event that hasn't been terminated with an empty line
if (dataBuilder.Length > 0)
yield return new SseEvent(currentEvent, dataBuilder.ToString());
break;
}
}

/// <summary>
/// Try to read a single line from the buffer
/// </summary>
private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string line)
{
// Look for a line ending
var position = buffer.PositionOf((byte)'\n');

if (position == null)
{
line = string.Empty;
return false;
}

// Extract the line (excluding the \n)
var lineSlice = buffer.Slice(0, position.Value);
line = Encoding.UTF8.GetString(lineSlice).TrimEnd('\r');

// Skip past the line + \n
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,15 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using Elastic.Documentation.Api.Core;
using Elastic.Documentation.Api.Core.AskAi;
using Microsoft.Extensions.Logging;

namespace Elastic.Documentation.Api.Core.AskAi;

/// <summary>
/// Represents a parsed Server-Sent Event (SSE)
/// </summary>
/// <param name="EventType">The event type from the "event:" field, or null if not specified</param>
/// <param name="Data">The accumulated data from all "data:" fields</param>
public record SseEvent(string? EventType, string Data);
namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;

/// <summary>
/// Base class for stream transformers that handles common streaming logic
Expand Down Expand Up @@ -129,15 +122,15 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, string
/// Default implementation parses SSE events and JSON, then calls TransformJsonEvent.
/// </summary>
/// <returns>Stream processing result with metrics and captured output</returns>
private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? conversationId, Activity? parentActivity, CancellationToken cancellationToken)
protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? conversationId, Activity? parentActivity, CancellationToken cancellationToken)
{
using var activity = StreamTransformerActivitySource.StartActivity(nameof(ProcessStreamAsync));

if (parentActivity?.Id != null)
_ = activity?.SetParentId(parentActivity.Id);

List<MessagePart> outputMessageParts = [];
await foreach (var sseEvent in ParseSseEventsAsync(reader, cancellationToken))
await foreach (var sseEvent in SseParser.ParseAsync(reader, cancellationToken))
{
AskAiEvent? transformedEvent;
try
Expand Down Expand Up @@ -255,7 +248,7 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, stri
/// <summary>
/// Write a transformed event to the output stream
/// </summary>
private async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter writer, CancellationToken cancellationToken)
protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter writer, CancellationToken cancellationToken)
{
if (transformedEvent == null)
return;
Expand All @@ -277,82 +270,4 @@ private async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter writ
throw; // Re-throw to be handled by caller
}
}

/// <summary>
/// Parse Server-Sent Events (SSE) from a PipeReader following the W3C SSE specification.
/// This method handles the standard SSE format with event:, data:, and comment lines.
/// </summary>
private static async IAsyncEnumerable<SseEvent> ParseSseEventsAsync(
PipeReader reader,
[EnumeratorCancellation] Cancel cancellationToken
)
{
string? currentEvent = null;
var dataBuilder = new StringBuilder();

while (!cancellationToken.IsCancellationRequested)
{
var result = await reader.ReadAsync(cancellationToken);
var buffer = result.Buffer;

// Process all complete lines in the buffer
while (TryReadLine(ref buffer, out var line))
{
// SSE comment line - skip
if (line.Length > 0 && line[0] == ':')
continue;

// Event type line
if (line.StartsWith("event:", StringComparison.Ordinal))
currentEvent = line[6..].Trim();
// Data line
else if (line.StartsWith("data:", StringComparison.Ordinal))
_ = dataBuilder.Append(line[5..].Trim());
// Empty line - marks end of event
else if (string.IsNullOrEmpty(line))
{
if (dataBuilder.Length <= 0)
continue;
yield return new SseEvent(currentEvent, dataBuilder.ToString());
currentEvent = null;
_ = dataBuilder.Clear();
}
}

// Tell the PipeReader how much of the buffer we consumed
reader.AdvanceTo(buffer.Start, buffer.End);

// Stop reading if there's no more data coming
if (!result.IsCompleted)
continue;

// Yield any remaining event that hasn't been terminated with an empty line
if (dataBuilder.Length > 0)
yield return new SseEvent(currentEvent, dataBuilder.ToString());
break;
}
}

/// <summary>
/// Try to read a single line from the buffer
/// </summary>
private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string line)
{
// Look for a line ending
var position = buffer.PositionOf((byte)'\n');

if (position == null)
{
line = string.Empty;
return false;
}

// Extract the line (excluding the \n)
var lineSlice = buffer.Slice(0, position.Value);
line = Encoding.UTF8.GetString(lineSlice).TrimEnd('\r');

// Skip past the line + \n
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
}
Loading
Loading