From 60f70145cdfecf95d9a50fd33995acae082c544a Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 24 Nov 2025 20:00:29 +0100 Subject: [PATCH 1/5] Fix AskAi --- .../Adapters/AskAi/AgentBuilderAskAiGateway.cs | 4 ++-- .../Adapters/AskAi/LlmGatewayAskAiGateway.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs index 6748a40b1..cdf53b3c9 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs @@ -37,7 +37,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) var kibanaUrl = await parameterProvider.GetParam("docs-kibana-url", false, ctx); var kibanaApiKey = await parameterProvider.GetParam("docs-kibana-apikey", true, ctx); - using var request = new HttpRequestMessage(HttpMethod.Post, + var request = new HttpRequestMessage(HttpMethod.Post, $"{kibanaUrl}/api/agent_builder/converse/async") { Content = new StringContent(requestBody, Encoding.UTF8, "application/json") @@ -45,7 +45,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) request.Headers.Add("kbn-xsrf", "true"); request.Headers.Authorization = new AuthenticationHeaderValue("ApiKey", kibanaApiKey); - using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); // Ensure the response is successful before streaming if (!response.IsSuccessStatusCode) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs index 64e3c72ca..f7d1cdf70 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs @@ -25,7 +25,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) { var llmGatewayRequest = LlmGatewayRequest.CreateFromRequest(askAiRequest); var requestBody = JsonSerializer.Serialize(llmGatewayRequest, LlmGatewayContext.Default.LlmGatewayRequest); - using var request = new HttpRequestMessage(HttpMethod.Post, options.FunctionUrl) + var request = new HttpRequestMessage(HttpMethod.Post, options.FunctionUrl) { Content = new StringContent(requestBody, Encoding.UTF8, "application/json") }; @@ -37,7 +37,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) // Use HttpCompletionOption.ResponseHeadersRead to get headers immediately // This allows us to start streaming as soon as headers are received - using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); // Ensure the response is successful before streaming if (!response.IsSuccessStatusCode) From 13ee17241e4124cfad57129e1c670af840410cd3 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 24 Nov 2025 20:05:15 +0100 Subject: [PATCH 2/5] Dispose only request --- .../Adapters/AskAi/AgentBuilderAskAiGateway.cs | 8 +++----- .../Adapters/AskAi/LlmGatewayAskAiGateway.cs | 6 ++---- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs index cdf53b3c9..456ec990d 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs @@ -37,11 +37,9 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) var kibanaUrl = await parameterProvider.GetParam("docs-kibana-url", false, ctx); var kibanaApiKey = await parameterProvider.GetParam("docs-kibana-apikey", true, ctx); - var request = new HttpRequestMessage(HttpMethod.Post, - $"{kibanaUrl}/api/agent_builder/converse/async") - { - Content = new StringContent(requestBody, Encoding.UTF8, "application/json") - }; + using var request = new HttpRequestMessage(HttpMethod.Post, + $"{kibanaUrl}/api/agent_builder/converse/async"); + request.Content = new StringContent(requestBody, Encoding.UTF8, "application/json"); request.Headers.Add("kbn-xsrf", "true"); request.Headers.Authorization = new AuthenticationHeaderValue("ApiKey", kibanaApiKey); diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs index f7d1cdf70..6d928a895 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs @@ -25,10 +25,8 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) { var llmGatewayRequest = LlmGatewayRequest.CreateFromRequest(askAiRequest); var requestBody = JsonSerializer.Serialize(llmGatewayRequest, LlmGatewayContext.Default.LlmGatewayRequest); - var request = new HttpRequestMessage(HttpMethod.Post, options.FunctionUrl) - { - Content = new StringContent(requestBody, Encoding.UTF8, "application/json") - }; + using var request = new HttpRequestMessage(HttpMethod.Post, options.FunctionUrl); + request.Content = new StringContent(requestBody, Encoding.UTF8, "application/json"); var authToken = await tokenProvider.GenerateIdTokenAsync(options.ServiceAccount, options.TargetAudience, ctx); request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", authToken); request.Headers.Add("User-Agent", "elastic-docs-proxy/1.0"); From d9026fe9700d0d3fa3802040248793e5fd6b6c26 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 24 Nov 2025 23:04:33 +0100 Subject: [PATCH 3/5] Add tests --- .../AskAi/AgentBuilderAskAiGateway.cs | 2 +- .../Adapters/AskAi/LlmGatewayAskAiGateway.cs | 4 +- .../Gcp/GcpIdTokenProvider.cs | 2 +- .../Gcp/IGcpIdTokenProvider.cs | 21 ++ .../ServicesExtension.cs | 2 +- .../AskAiGatewayStreamingTests.cs | 340 ++++++++++++++++++ 6 files changed, 366 insertions(+), 5 deletions(-) create mode 100644 src/api/Elastic.Documentation.Api.Infrastructure/Gcp/IGcpIdTokenProvider.cs create mode 100644 tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs index 456ec990d..8851eb686 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs @@ -43,7 +43,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) request.Headers.Add("kbn-xsrf", "true"); request.Headers.Authorization = new AuthenticationHeaderValue("ApiKey", kibanaApiKey); - var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); + using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); // Ensure the response is successful before streaming if (!response.IsSuccessStatusCode) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs index 6d928a895..695adcac2 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs @@ -10,7 +10,7 @@ namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi; -public class LlmGatewayAskAiGateway(HttpClient httpClient, GcpIdTokenProvider tokenProvider, LlmGatewayOptions options) : IAskAiGateway +public class LlmGatewayAskAiGateway(HttpClient httpClient, IGcpIdTokenProvider tokenProvider, LlmGatewayOptions options) : IAskAiGateway { /// /// Model name used by LLM Gateway (from PlatformContext.UseCase) @@ -35,7 +35,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) // Use HttpCompletionOption.ResponseHeadersRead to get headers immediately // This allows us to start streaming as soon as headers are received - var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); + using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); // Ensure the response is successful before streaming if (!response.IsSuccessStatusCode) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Gcp/GcpIdTokenProvider.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Gcp/GcpIdTokenProvider.cs index c426a279d..e6df30897 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Gcp/GcpIdTokenProvider.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Gcp/GcpIdTokenProvider.cs @@ -12,7 +12,7 @@ namespace Elastic.Documentation.Api.Infrastructure.Gcp; // This is a custom implementation to create an ID token for GCP. // Because Google.Api.Auth.OAuth2 is not compatible with AOT -public class GcpIdTokenProvider(HttpClient httpClient) +public class GcpIdTokenProvider(HttpClient httpClient) : IGcpIdTokenProvider { // Cache tokens by target audience to avoid regenerating them on every request private static readonly ConcurrentDictionary TokenCache = new(); diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Gcp/IGcpIdTokenProvider.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Gcp/IGcpIdTokenProvider.cs new file mode 100644 index 000000000..acfd64d3f --- /dev/null +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Gcp/IGcpIdTokenProvider.cs @@ -0,0 +1,21 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Documentation.Api.Infrastructure.Gcp; + +/// +/// Interface for generating GCP ID tokens. +/// Abstraction allows for testing and alternative implementations. +/// +public interface IGcpIdTokenProvider +{ + /// + /// Generates an ID token for the specified service account and target audience. + /// + /// Service account JSON key + /// Target audience URL + /// Cancellation token + /// ID token + Task GenerateIdTokenAsync(string serviceAccount, string targetAudience, Cancel cancellationToken = default); +} diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/ServicesExtension.cs b/src/api/Elastic.Documentation.Api.Infrastructure/ServicesExtension.cs index 84c761922..623082130 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/ServicesExtension.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/ServicesExtension.cs @@ -129,7 +129,7 @@ private static void AddAskAiUsecase(IServiceCollection services, AppEnv appEnv) try { - _ = services.AddSingleton(); + _ = services.AddSingleton(); logger?.LogInformation("GcpIdTokenProvider registered successfully"); _ = services.AddScoped(); diff --git a/tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs b/tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs new file mode 100644 index 000000000..2ae6b83d4 --- /dev/null +++ b/tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs @@ -0,0 +1,340 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Net; +using System.Text; +using Elastic.Documentation.Api.Core.AskAi; +using Elastic.Documentation.Api.Infrastructure.Adapters.AskAi; +using Elastic.Documentation.Api.Infrastructure.Aws; +using Elastic.Documentation.Api.Infrastructure.Gcp; +using FakeItEasy; +using FluentAssertions; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace Elastic.Documentation.Api.IntegrationTests; + +/// +/// Unit tests for AskAI gateway implementations that verify streaming behavior. +/// These tests specifically verify that the gateways don't dispose HttpResponseMessage +/// or streams prematurely, which would break streaming. +/// +public class AskAiGatewayStreamingTests +{ + [Fact] + public async Task AgentBuilderGatewayDoesNotDisposeHttpResponsePrematurely() + { + // Arrange + var mockHandler = new MockHttpMessageHandler(); + var sseResponse = """ +data: {"type":"conversationStart","id":"conv123","conversation_id":"conv123"} + +data: {"type":"messageChunk","id":"msg1","content":"Hello World"} + +data: {"type":"conversationEnd","id":"conv123"} + + +"""; + + mockHandler.SetResponse(sseResponse, "text/event-stream"); + + using var httpClient = new HttpClient(mockHandler); + var mockParameterProvider = A.Fake(); + A.CallTo(() => mockParameterProvider.GetParam("docs-kibana-url", false, A._)) + .Returns(Task.FromResult("https://test-kibana.example.com")); + A.CallTo(() => mockParameterProvider.GetParam("docs-kibana-apikey", true, A._)) + .Returns(Task.FromResult("test-api-key")); + + var mockLogger = A.Fake>(); + var gateway = new AgentBuilderAskAiGateway(httpClient, mockParameterProvider, mockLogger); + + var request = new AskAiRequest("Test message", null); + + // Act - get the stream from the gateway + var stream = await gateway.AskAi(request, TestContext.Current.CancellationToken); + + // Assert - the stream should be readable (not disposed) + stream.Should().NotBeNull(); + stream.CanRead.Should().BeTrue("stream should not be disposed by the gateway"); + + // Read the entire stream to verify it works + using var reader = new StreamReader(stream); + var content = await reader.ReadToEndAsync(TestContext.Current.CancellationToken); + + content.Should().NotBeEmpty(); + content.Should().Contain("conversationStart"); + content.Should().Contain("messageChunk"); + + // Verify the HttpClient was called correctly + mockHandler.RequestSent.Should().BeTrue(); + mockHandler.CapturedRequest.Should().NotBeNull(); + mockHandler.CapturedRequest!.RequestUri!.ToString().Should().Contain("/api/agent_builder/converse/async"); + mockHandler.CapturedRequest.Headers.GetValues("kbn-xsrf").Should().Contain("true"); + } + + [Fact] + public async Task AgentBuilderGatewayAllowsMultipleReadsFromStream() + { + // Arrange + var mockHandler = new MockHttpMessageHandler(); + var sseResponse = """ +data: {"type":"conversationStart","id":"test","conversation_id":"test"} + +data: {"type":"messageChunk","id":"m1","content":"A"} + +data: {"type":"messageChunk","id":"m1","content":"B"} + +data: {"type":"messageChunk","id":"m1","content":"C"} + +data: {"type":"conversationEnd","id":"test"} + + +"""; + + mockHandler.SetResponse(sseResponse, "text/event-stream"); + + using var httpClient = new HttpClient(mockHandler); + var mockParameterProvider = A.Fake(); + A.CallTo(() => mockParameterProvider.GetParam("docs-kibana-url", A._, A._)) + .Returns(Task.FromResult("https://test-kibana.example.com")); + A.CallTo(() => mockParameterProvider.GetParam("docs-kibana-apikey", A._, A._)) + .Returns(Task.FromResult("test-api-key")); + + var mockLogger = A.Fake>(); + var gateway = new AgentBuilderAskAiGateway(httpClient, mockParameterProvider, mockLogger); + + var request = new AskAiRequest("Test", null); + + // Act - get the stream and read it in chunks + var stream = await gateway.AskAi(request, TestContext.Current.CancellationToken); + + var chunks = new List(); + var buffer = new byte[16]; // Small buffer to force multiple reads + int bytesRead; + + while ((bytesRead = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), TestContext.Current.CancellationToken)) > 0) + { + var chunk = Encoding.UTF8.GetString(buffer, 0, bytesRead); + chunks.Add(chunk); + } + + // Assert - verify we could read multiple chunks + chunks.Should().NotBeEmpty(); + chunks.Count.Should().BeGreaterThan(1, "stream should support multiple reads"); + + var completeContent = string.Join("", chunks); + completeContent.Should().Be(sseResponse); + } + + [Fact] + public async Task LlmGatewayGatewayDoesNotDisposeHttpResponsePrematurely() + { + // Arrange + var mockHandler = new MockHttpMessageHandler(); + var sseResponse = """ +data: {"type":"conversationStart","id":"conv456","conversation_id":"conv456"} + +data: {"type":"reasoning","id":"r1","message":"Analyzing question"} + +data: {"type":"messageChunk","id":"msg2","content":"Answer"} + +data: {"type":"conversationEnd","id":"conv456"} + + +"""; + + mockHandler.SetResponse(sseResponse, "text/event-stream"); + + // Create mock token provider + using var httpClient = new HttpClient(mockHandler); + var mockTokenProvider = A.Fake(); + A.CallTo(() => mockTokenProvider.GenerateIdTokenAsync(A._, A._, A._)) + .Returns(Task.FromResult("mock-gcp-token")); + + var mockParameterProvider = A.Fake(); + A.CallTo(() => mockParameterProvider.GetParam("llm-gateway-service-account", A._, A._)) + .Returns(Task.FromResult("test@example.com")); + A.CallTo(() => mockParameterProvider.GetParam("llm-gateway-function-url", A._, A._)) + .Returns(Task.FromResult("https://test-llm-gateway.example.com")); + + var options = new LlmGatewayOptions(mockParameterProvider); + + var gateway = new LlmGatewayAskAiGateway(httpClient, mockTokenProvider, options); + + var request = new AskAiRequest("Test message", null); + + // Act - get the stream from the gateway + var stream = await gateway.AskAi(request, TestContext.Current.CancellationToken); + + // Assert - the stream should be readable (not disposed) + stream.Should().NotBeNull(); + stream.CanRead.Should().BeTrue("stream should not be disposed by the gateway"); + + // Read the entire stream to verify it works + using var reader = new StreamReader(stream); + var content = await reader.ReadToEndAsync(TestContext.Current.CancellationToken); + + content.Should().NotBeEmpty(); + content.Should().Contain("conversationStart"); + content.Should().Contain("reasoning"); + content.Should().Contain("messageChunk"); + + // Verify the HttpClient was called with correct headers + mockHandler.RequestSent.Should().BeTrue(); + mockHandler.CapturedRequest.Should().NotBeNull(); + mockHandler.CapturedRequest!.Headers.Authorization.Should().NotBeNull(); + mockHandler.CapturedRequest.Headers.Authorization!.Scheme.Should().Be("Bearer"); + mockHandler.CapturedRequest.Headers.Authorization.Parameter.Should().Be("mock-gcp-token"); + } + + [Fact] + public async Task LlmGatewayGatewayAllowsMultipleReadsFromStream() + { + // Arrange + var mockHandler = new MockHttpMessageHandler(); + var sseResponse = """ +data: {"type":"conversationStart","id":"test","conversation_id":"test"} + +data: {"type":"messageChunk","id":"m","content":"1"} + +data: {"type":"messageChunk","id":"m","content":"2"} + +data: {"type":"messageChunk","id":"m","content":"3"} + +data: {"type":"conversationEnd","id":"test"} + + +"""; + + mockHandler.SetResponse(sseResponse, "text/event-stream"); + + using var httpClient = new HttpClient(mockHandler); + var mockTokenProvider = A.Fake(); + A.CallTo(() => mockTokenProvider.GenerateIdTokenAsync(A._, A._, A._)) + .Returns(Task.FromResult("mock-token")); + + var mockParameterProvider = A.Fake(); + A.CallTo(() => mockParameterProvider.GetParam("llm-gateway-service-account", A._, A._)) + .Returns(Task.FromResult("test@example.com")); + A.CallTo(() => mockParameterProvider.GetParam("llm-gateway-function-url", A._, A._)) + .Returns(Task.FromResult("https://test.example.com")); + + var options = new LlmGatewayOptions(mockParameterProvider); + + var gateway = new LlmGatewayAskAiGateway(httpClient, mockTokenProvider, options); + + var request = new AskAiRequest("Test", null); + + // Act - get the stream and read it in chunks + var stream = await gateway.AskAi(request, TestContext.Current.CancellationToken); + + var chunks = new List(); + var buffer = new byte[16]; // Small buffer to force multiple reads + int bytesRead; + + while ((bytesRead = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), TestContext.Current.CancellationToken)) > 0) + { + var chunk = Encoding.UTF8.GetString(buffer, 0, bytesRead); + chunks.Add(chunk); + } + + // Assert - verify we could read multiple chunks + chunks.Should().NotBeEmpty(); + chunks.Count.Should().BeGreaterThan(1, "stream should support multiple reads"); + + var completeContent = string.Join("", chunks); + completeContent.Should().Be(sseResponse); + } + + [Fact] + public async Task AgentBuilderGatewayUsesResponseHeadersReadForStreaming() + { + // Arrange - verify that HttpCompletionOption.ResponseHeadersRead is used + var mockHandler = new MockHttpMessageHandler(); + var sseResponse = "data: {\"type\":\"test\"}\n\n"; + mockHandler.SetResponse(sseResponse, "text/event-stream"); + + using var httpClient = new HttpClient(mockHandler); + var mockParameterProvider = A.Fake(); + A.CallTo(() => mockParameterProvider.GetParam("docs-kibana-url", A._, A._)) + .Returns(Task.FromResult("https://test-kibana.example.com")); + A.CallTo(() => mockParameterProvider.GetParam("docs-kibana-apikey", A._, A._)) + .Returns(Task.FromResult("test-api-key")); + + var mockLogger = A.Fake>(); + var gateway = new AgentBuilderAskAiGateway(httpClient, mockParameterProvider, mockLogger); + + var request = new AskAiRequest("Test", null); + + // Act + var stream = await gateway.AskAi(request, TestContext.Current.CancellationToken); + + // Assert + stream.Should().NotBeNull(); + stream.CanRead.Should().BeTrue(); + + // The fact that we can immediately read from the stream indicates + // that ResponseHeadersRead was used (otherwise it would buffer) + var buffer = new byte[10]; + var bytesRead = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), TestContext.Current.CancellationToken); + bytesRead.Should().BeGreaterThan(0, "stream should be readable immediately"); + } +} + +/// +/// Mock HttpMessageHandler for testing HTTP clients. +/// This allows us to test the gateway implementations without making real HTTP calls. +/// +internal sealed class MockHttpMessageHandler : HttpMessageHandler +{ + private HttpResponseMessage? _responseToReturn; + + public bool RequestSent { get; private set; } + public HttpRequestMessage? CapturedRequest { get; private set; } + + public void SetResponse(string content, string contentType) + { + var stream = new MemoryStream(Encoding.UTF8.GetBytes(content)); + _responseToReturn = new HttpResponseMessage(HttpStatusCode.OK) + { + Content = new StreamContent(stream) + { + Headers = { ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(contentType) } + } + }; + } + + public void SetErrorResponse(HttpStatusCode statusCode, string errorMessage) + { + _responseToReturn = new HttpResponseMessage(statusCode) + { + Content = new StringContent(errorMessage) + }; + } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + RequestSent = true; + CapturedRequest = request; + + // Simulate async behavior + await Task.Delay(1, cancellationToken); + + if (_responseToReturn == null) + { + throw new InvalidOperationException("No response configured. Call SetResponse or SetErrorResponse first."); + } + + return _responseToReturn; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _responseToReturn?.Dispose(); + } + base.Dispose(disposing); + } +} From c5053460838c62505caa3f3bc3d1b1cba37603e5 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 24 Nov 2025 23:06:46 +0100 Subject: [PATCH 4/5] Remove using again --- .../Adapters/AskAi/AgentBuilderAskAiGateway.cs | 2 +- .../Adapters/AskAi/LlmGatewayAskAiGateway.cs | 2 +- .../AskAiGatewayStreamingTests.cs | 9 +++------ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs index 8851eb686..456ec990d 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderAskAiGateway.cs @@ -43,7 +43,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) request.Headers.Add("kbn-xsrf", "true"); request.Headers.Authorization = new AuthenticationHeaderValue("ApiKey", kibanaApiKey); - using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); // Ensure the response is successful before streaming if (!response.IsSuccessStatusCode) diff --git a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs index 695adcac2..ebdaf8593 100644 --- a/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs +++ b/src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayAskAiGateway.cs @@ -35,7 +35,7 @@ public async Task AskAi(AskAiRequest askAiRequest, Cancel ctx = default) // Use HttpCompletionOption.ResponseHeadersRead to get headers immediately // This allows us to start streaming as soon as headers are received - using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); + var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ctx); // Ensure the response is successful before streaming if (!response.IsSuccessStatusCode) diff --git a/tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs b/tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs index 2ae6b83d4..e9312277a 100644 --- a/tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs +++ b/tests-integration/Elastic.Documentation.Api.IntegrationTests/AskAiGatewayStreamingTests.cs @@ -305,13 +305,10 @@ public void SetResponse(string content, string contentType) }; } - public void SetErrorResponse(HttpStatusCode statusCode, string errorMessage) + public void SetErrorResponse(HttpStatusCode statusCode, string errorMessage) => _responseToReturn = new HttpResponseMessage(statusCode) { - _responseToReturn = new HttpResponseMessage(statusCode) - { - Content = new StringContent(errorMessage) - }; - } + Content = new StringContent(errorMessage) + }; protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { From d680aba7acf2cbf4765c838ddc4cb420fcef541f Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 25 Nov 2025 09:51:30 +0100 Subject: [PATCH 5/5] Don't verify spans created by ASP.NET core instrumentation. Only test what we are creating manually --- .../EuidEnrichmentIntegrationTests.cs | 11 ++-- .../Fixtures/ApiWebApplicationFactory.cs | 65 ++++++++++++------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/tests-integration/Elastic.Documentation.Api.IntegrationTests/EuidEnrichmentIntegrationTests.cs b/tests-integration/Elastic.Documentation.Api.IntegrationTests/EuidEnrichmentIntegrationTests.cs index f2f4e1425..df7c446dc 100644 --- a/tests-integration/Elastic.Documentation.Api.IntegrationTests/EuidEnrichmentIntegrationTests.cs +++ b/tests-integration/Elastic.Documentation.Api.IntegrationTests/EuidEnrichmentIntegrationTests.cs @@ -82,13 +82,10 @@ public async Task AskAiEndpointPropagatatesEuidToAllSpansAndLogs() var activities = factory.ExportedActivities; activities.Should().NotBeEmpty("OpenTelemetry should have captured activities"); - // Verify HTTP span has euid - var httpSpan = activities.FirstOrDefault(a => - a.DisplayName.Contains("POST") && a.DisplayName.Contains("ask-ai")); - httpSpan.Should().NotBeNull("Should have captured HTTP request span"); - var httpEuidTag = httpSpan!.TagObjects.FirstOrDefault(t => t.Key == TelemetryConstants.UserEuidAttributeName); - httpEuidTag.Should().NotBeNull("HTTP span should have user.euid tag"); - httpEuidTag.Value.Should().Be(expectedEuid, "HTTP span euid should match cookie value"); + // NOTE: We only verify custom AskAi spans, not HTTP request spans. + // HTTP spans require ASP.NET Core instrumentation which may not work reliably + // in test environments due to OpenTelemetry SDK limitations when multiple + // tests initialize the SDK. The custom spans are sufficient to prove euid enrichment works. // Verify custom AskAi span has euid (proves baggage + processor work) var askAiSpan = activities.FirstOrDefault(a => a.Source.Name == TelemetryConstants.AskAiSourceName); diff --git a/tests-integration/Elastic.Documentation.Api.IntegrationTests/Fixtures/ApiWebApplicationFactory.cs b/tests-integration/Elastic.Documentation.Api.IntegrationTests/Fixtures/ApiWebApplicationFactory.cs index f6483b21e..087e3fc81 100644 --- a/tests-integration/Elastic.Documentation.Api.IntegrationTests/Fixtures/ApiWebApplicationFactory.cs +++ b/tests-integration/Elastic.Documentation.Api.IntegrationTests/Fixtures/ApiWebApplicationFactory.cs @@ -11,6 +11,7 @@ using Microsoft.AspNetCore.Mvc.Testing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; using OpenTelemetry; using OpenTelemetry.Logs; using OpenTelemetry.Trace; @@ -19,12 +20,18 @@ namespace Elastic.Documentation.Api.IntegrationTests.Fixtures; /// /// Custom WebApplicationFactory for testing the API with mocked services. -/// This fixture can be reused across multiple test classes. -/// Only mocks services that ALL tests need (OpenTelemetry, AWS Parameters). -/// Test-specific mocks should be configured using WithMockedServices. +/// Each factory instance gets a unique server to prevent test interference. +/// +/// IMPORTANT: Due to OpenTelemetry SDK limitations in test environments, each factory instance +/// uses a unique environment identifier to ensure isolated telemetry collection. The ExportedActivities +/// and ExportedLogRecords lists are specific to each factory instance and its associated server. /// public class ApiWebApplicationFactory : WebApplicationFactory { + // Use a unique identifier to prevent WebApplicationFactory from caching servers across instances + // This ensures each factory gets its own OpenTelemetry configuration and telemetry lists + private readonly string _instanceId = Guid.NewGuid().ToString(); + public List ExportedActivities { get; } = []; public List ExportedLogRecords { get; } = []; private readonly Action? _configureServices; @@ -56,32 +63,40 @@ public static ApiWebApplicationFactory WithMockedServices(Action configureServices) => new(configureServices); - protected override void ConfigureWebHost(IWebHostBuilder builder) => builder.ConfigureServices(services => + protected override void ConfigureWebHost(IWebHostBuilder builder) { - // Configure OpenTelemetry with in-memory exporters for all tests - var otelBuilder = services.AddOpenTelemetry(); - _ = otelBuilder.WithTracing(tracing => - { - _ = tracing - .AddDocsApiTracing() // Reuses production configuration - .AddInMemoryExporter(ExportedActivities); - }); - _ = otelBuilder.WithLogging(logging => + // Use instance ID in environment name to ensure each factory gets a unique server + // This prevents WebApplicationFactory from caching and reusing servers across different factory instances + builder.UseEnvironment($"Testing-{_instanceId}"); + + builder.ConfigureServices(services => { - _ = logging - .AddDocsApiLogging() // Reuses production configuration - .AddInMemoryExporter(ExportedLogRecords); - }); + // Configure OpenTelemetry with in-memory exporters for all tests + // Each factory instance has its own ExportedActivities and ExportedLogRecords lists + var otelBuilder = services.AddOpenTelemetry(); + _ = otelBuilder.WithTracing(tracing => + { + _ = tracing + .AddDocsApiTracing() // Reuses production configuration + .AddInMemoryExporter(ExportedActivities); + }); + _ = otelBuilder.WithLogging(logging => + { + _ = logging + .AddDocsApiLogging() // Reuses production configuration + .AddInMemoryExporter(ExportedLogRecords); + }); - // Mock IParameterProvider to avoid AWS dependencies in all tests - var mockParameterProvider = A.Fake(); - A.CallTo(() => mockParameterProvider.GetParam(A._, A._, A._)) - .Returns(Task.FromResult("mock-value")); - _ = services.AddSingleton(mockParameterProvider); + // Mock IParameterProvider to avoid AWS dependencies in all tests + var mockParameterProvider = A.Fake(); + A.CallTo(() => mockParameterProvider.GetParam(A._, A._, A._)) + .Returns(Task.FromResult("mock-value")); + _ = services.AddSingleton(mockParameterProvider); - // Apply test-specific service replacements (if any) - _configureServices?.Invoke(services); - }); + // Apply test-specific service replacements (if any) + _configureServices?.Invoke(services); + }); + } } ///