Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ dotnet_diagnostic.IDE0059.severity = suggestion

dotnet_diagnostic.CA1859.severity = none

# https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca1727
dotnet_diagnostic.CA1727.severity = none

dotnet_diagnostic.IDE0305.severity = none

# https://github.com/dotnet/roslyn/issues/60784
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async Task<Stream> AskAi(AskAiRequest askAiRequest, Cancel ctx)
};
var inputMessagesJson = JsonSerializer.Serialize(inputMessages, ApiJsonContext.Default.InputMessageArray);
_ = activity?.SetTag("gen_ai.input.messages", inputMessagesJson);
logger.LogInformation("AskAI input message: {InputMessage}", askAiRequest.Message);
logger.LogInformation("AskAI input message: {ask_ai.input.message}", askAiRequest.Message);
logger.LogInformation("Streaming AskAI response");
var rawStream = await askAiGateway.AskAi(askAiRequest, ctx);
// The stream transformer will handle disposing the activity when streaming completes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public record OutputMessage(string Role, MessagePart[] Parts, string FinishReaso
[JsonSerializable(typeof(SearchRequest))]
[JsonSerializable(typeof(SearchResponse))]
[JsonSerializable(typeof(InputMessage))]
[JsonSerializable(typeof(OutputMessage))]
[JsonSerializable(typeof(OutputMessage[]))]
[JsonSerializable(typeof(MessagePart))]
[JsonSerializable(typeof(InputMessage[]))]
[JsonSourceGenerationOptions(PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ public class AgentBuilderStreamTransformer(ILogger<AgentBuilderStreamTransformer
{
protected override string GetAgentId() => AgentBuilderAskAiGateway.ModelName;
protected override string GetAgentProvider() => AgentBuilderAskAiGateway.ProviderName;
protected override AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json)
protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json)
{
var type = eventType ?? "message";
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var id = Guid.NewGuid().ToString();

// Handle error events first - they have a different structure (no "data" wrapper)
if (type == "error")
{
return ParseErrorEvent(id, timestamp, json);
}

// Most Agent Builder events have data nested in a "data" property
if (!json.TryGetProperty("data", out var innerData))
{
Expand All @@ -30,8 +36,6 @@ public class AgentBuilderStreamTransformer(ILogger<AgentBuilderStreamTransformer

return type switch
{
"error" =>
ParseErrorEvent(id, timestamp, json),

"conversation_id_set" when innerData.TryGetProperty("conversation_id", out var convId) =>
new AskAiEvent.ConversationStart(id, timestamp, convId.GetString()!),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected override async Task ProcessStreamAsync(PipeReader reader, PipeWriter w
// Continue with normal stream processing using the actual conversation ID
await base.ProcessStreamAsync(reader, writer, actualConversationId, parentActivity, cancellationToken);
}
protected override AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json)
protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json)
{
// LLM Gateway format: ["custom", {type: "...", ...}]
if (json.ValueKind != JsonValueKind.Array || json.GetArrayLength() < 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json.Serialization;

namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;

Expand Down Expand Up @@ -99,3 +100,7 @@ private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string li
return true;
}
}


[JsonSerializable(typeof(SseEvent))]
internal sealed partial class SseSerializerContext : JsonSerializerContext;
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr
var root = doc.RootElement;

// Subclass transforms JsonElement to AskAiEvent
transformedEvent = TransformJsonEvent(conversationId, sseEvent.EventType, root);
transformedEvent = TransformJsonEvent(sseEvent.EventType, root);
}
catch (JsonException ex)
{
Expand All @@ -153,6 +153,7 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr
{
Logger.LogWarning("Transformed event is null for transformer {TransformerType}. Skipping event. EventType: {EventType}",
GetType().Name, sseEvent.EventType);
Logger.LogWarning("Original event: {event}", JsonSerializer.Serialize(sseEvent, SseSerializerContext.Default.SseEvent));
continue;
}

Expand Down Expand Up @@ -213,7 +214,7 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr
case AskAiEvent.MessageComplete messageComplete:
{
outputMessageParts.Add(new MessagePart("text", messageComplete.FullContent));
Logger.LogInformation("AskAI output message: {OutputMessage}", messageComplete.FullContent);
Logger.LogInformation("AskAI output message: {ask_ai.output.message}", messageComplete.FullContent);
break;
}
case AskAiEvent.ConversationEnd conversationEnd:
Expand All @@ -228,22 +229,21 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr
// Set output messages tag once after all events are processed
if (outputMessageParts.Count > 0)
{
var outputMessages = new OutputMessage("assistant", outputMessageParts.ToArray(), "stop");
var outputMessagesJson = JsonSerializer.Serialize(outputMessages, ApiJsonContext.Default.OutputMessage);
var outputMessage = new OutputMessage("assistant", outputMessageParts.ToArray(), "stop");
var outputMessages = new[] { outputMessage };
var outputMessagesJson = JsonSerializer.Serialize(outputMessages, ApiJsonContext.Default.OutputMessageArray);
_ = parentActivity?.SetTag("gen_ai.output.messages", outputMessagesJson);
_ = activity?.SetTag("gen_ai.output.messages", outputMessagesJson);
}
}

/// <summary>
/// Transform a parsed JSON event into an AskAiEvent.
/// Subclasses implement provider-specific transformation logic.
/// </summary>
/// <param name="conversationId">The conversation/thread ID, if available</param>
/// <param name="eventType">The SSE event type (from "event:" field), or null if not present</param>
/// <param name="json">The parsed JSON data from the "data:" field</param>
/// <returns>The transformed AskAiEvent, or null to skip this event</returns>
protected abstract AskAiEvent? TransformJsonEvent(string? conversationId, string? eventType, JsonElement json);
protected abstract AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json);

/// <summary>
/// Write a transformed event to the output stream
Expand Down
Loading