diff --git a/AGENTS.md b/AGENTS.md index 09991b10e..a19060aa7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -82,7 +82,7 @@ Use existing providers as reference. **Steps:** 1. Create `tracing/{provider}/` module, register it in `settings.gradle.kts` -2. Extend `LLMTracingAdapter(genAISystem)` — override `getRequestBodyAttributes`, `getResponseBodyAttributes`, `getSpanName`, `isStreamingRequest`, `handleStreaming` +2. Extend `LLMTracingAdapter(genAISystem)` — override `getRequestBodyAttributes`, `getResponseBodyAttributes`, `getSpanName`, and abstract `registerResponseStreamEvent` 3. If multiple distinct API endpoints exist, implement `EndpointApiHandler` per endpoint and delegate from the adapter 4. Add a public `instrument(client)` function — use `patchOpenAICompatibleClient()` for OpenAI-compatible SDKs, or reflection + `patchInterceptors()` for others (see `GeminiClient.kt`) 5. Write tests extending `BaseAITracingTest`, tag with `@Tag("{provider}")` diff --git a/examples/src/main/kotlin/org/jetbrains/ai/tracy/examples/clients/OkHttpClientAutotracingExample.kt b/examples/src/main/kotlin/org/jetbrains/ai/tracy/examples/clients/OkHttpClientAutotracingExample.kt index 4a01f430e..2672d6f9d 100644 --- a/examples/src/main/kotlin/org/jetbrains/ai/tracy/examples/clients/OkHttpClientAutotracingExample.kt +++ b/examples/src/main/kotlin/org/jetbrains/ai/tracy/examples/clients/OkHttpClientAutotracingExample.kt @@ -6,11 +6,11 @@ package org.jetbrains.ai.tracy.examples.clients import org.jetbrains.ai.tracy.anthropic.adapters.AnthropicLLMTracingAdapter -import org.jetbrains.ai.tracy.core.OpenTelemetryOkHttpInterceptor +import org.jetbrains.ai.tracy.core.interceptors.OpenTelemetryOkHttpInterceptor import org.jetbrains.ai.tracy.core.TracingManager import org.jetbrains.ai.tracy.core.configureOpenTelemetrySdk import org.jetbrains.ai.tracy.core.exporters.ConsoleExporterConfig -import org.jetbrains.ai.tracy.core.instrument +import org.jetbrains.ai.tracy.core.interceptors.instrument import 
org.jetbrains.ai.tracy.gemini.adapters.GeminiLLMTracingAdapter import org.jetbrains.ai.tracy.openai.adapters.OpenAILLMTracingAdapter import com.openai.models.ChatModel diff --git a/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/adapters/AnthropicLLMTracingAdapter.kt b/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/adapters/AnthropicLLMTracingAdapter.kt index 609c11325..03de29876 100644 --- a/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/adapters/AnthropicLLMTracingAdapter.kt +++ b/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/adapters/AnthropicLLMTracingAdapter.kt @@ -5,9 +5,16 @@ package org.jetbrains.ai.tracy.anthropic.adapters +import io.opentelemetry.api.trace.Span +import io.opentelemetry.sdk.trace.ReadableSpan +import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.* +import kotlinx.serialization.json.* +import mu.KotlinLogging import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter.Companion.PayloadType +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingUnsupported import org.jetbrains.ai.tracy.core.adapters.media.* +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpUrl @@ -16,10 +23,6 @@ import org.jetbrains.ai.tracy.core.policy.ContentKind import org.jetbrains.ai.tracy.core.policy.contentTracingAllowed import org.jetbrains.ai.tracy.core.policy.orRedactedInput import org.jetbrains.ai.tracy.core.policy.orRedactedOutput -import io.opentelemetry.api.trace.Span -import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.* -import kotlinx.serialization.json.* -import mu.KotlinLogging /** * Tracing adapter for Anthropic Claude API. 
@@ -52,8 +55,8 @@ class AnthropicLLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIn val body = request.body.asJson()?.jsonObject ?: return body["temperature"]?.jsonPrimitive?.doubleOrNull?.let { span.setAttribute(GEN_AI_REQUEST_TEMPERATURE, it) } - body["model"]?.jsonPrimitive?.let { span.setAttribute(GEN_AI_REQUEST_MODEL, it.content) } - body["max_tokens"]?.jsonPrimitive?.intOrNull?.let { span.setAttribute(GEN_AI_REQUEST_MAX_TOKENS, it.toLong()) } + body["model"]?.jsonPrimitive?.content?.let { span.setAttribute(GEN_AI_REQUEST_MODEL, it) } + body["max_tokens"]?.jsonPrimitive?.longOrNull?.let { span.setAttribute(GEN_AI_REQUEST_MAX_TOKENS, it) } // metadata body["metadata"]?.jsonObject?.let { metadata -> @@ -127,10 +130,28 @@ class AnthropicLLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIn val body = response.body.asJson()?.jsonObject ?: return body["id"]?.let { span.setAttribute(GEN_AI_RESPONSE_ID, it.jsonPrimitive.content) } - body["type"]?.let { span.setAttribute(GEN_AI_OUTPUT_TYPE, it.jsonPrimitive.content) } + body["type"]?.jsonPrimitive?.content?.let { + span.setAttribute(GEN_AI_OUTPUT_TYPE, it) + span.setAttribute(GEN_AI_OPERATION_NAME, it) + } body["role"]?.let { span.setAttribute("gen_ai.response.role", it.jsonPrimitive.content) } body["model"]?.let { span.setAttribute(GEN_AI_RESPONSE_MODEL, it.jsonPrimitive.content) } + // update the span name to follow GenAI Anthropic Conventions + // convention: `{gen_ai.operation.name} {gen_ai.request.model}` + // see: https://opentelemetry.io/docs/specs/semconv/gen-ai/anthropic/#spans + val spanName = run { + val type = body["type"]?.jsonPrimitive?.contentOrNull + val model = (span as? 
ReadableSpan) + ?.attributes + ?.get(GEN_AI_REQUEST_MODEL) + ?: body["model"] + if (type != null && model != null) "$type $model" else null + } + if (spanName != null) { + span.updateName(spanName) + } + // collecting response messages body["content"]?.let { for ((index, message) in it.jsonArray.withIndex()) { @@ -204,11 +225,26 @@ class AnthropicLLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIn span.populateUnmappedAttributes(body, mappedAttributes, PayloadType.RESPONSE) } - override fun getSpanName(request: TracyHttpRequest) = "Anthropic-generation" - - // streaming is not supported - override fun isStreamingRequest(request: TracyHttpRequest) = false - override fun handleStreaming(span: Span, url: TracyHttpUrl, events: String) = Unit + /** + * Sets a default span name to **"Anthropic-generation"**. + * + * This name will be overridden in [getResponseBodyAttributes] to follow GenAI Conventions for Anthropic: + * ``` + * {gen_ai.operation.name} {gen_ai.request.model} + * ``` + * + * See [GenAI Anthropic Spans](https://opentelemetry.io/docs/specs/semconv/gen-ai/anthropic/#spans) + */ + override fun getSpanName() = "Anthropic-generation" + + override fun registerResponseStreamEvent( + span: Span, + url: TracyHttpUrl, + event: SseEvent, + index: Long + ): Result { + return sseHandlingUnsupported() + } /** * Parses content of the `messages` field when its type is diff --git a/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/clients/AnthropicAIClient.kt b/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/clients/AnthropicAIClient.kt index 4d5f6938e..5ac0a8e16 100644 --- a/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/clients/AnthropicAIClient.kt +++ b/tracing/anthropic/src/jvmMain/kotlin/org/jetbrains/ai/tracy/anthropic/clients/AnthropicAIClient.kt @@ -6,9 +6,9 @@ package org.jetbrains.ai.tracy.anthropic.clients import org.jetbrains.ai.tracy.anthropic.adapters.AnthropicLLMTracingAdapter 
-import org.jetbrains.ai.tracy.core.OpenTelemetryOkHttpInterceptor +import org.jetbrains.ai.tracy.core.interceptors.OpenTelemetryOkHttpInterceptor import org.jetbrains.ai.tracy.core.TracingManager -import org.jetbrains.ai.tracy.core.patchOpenAICompatibleClient +import org.jetbrains.ai.tracy.core.interceptors.patchOpenAICompatibleClient import com.anthropic.client.AnthropicClient /** diff --git a/tracing/anthropic/src/jvmTest/kotlin/org/jetbrains/ai/tracy/anthropic/AnthropicTracingTest.kt b/tracing/anthropic/src/jvmTest/kotlin/org/jetbrains/ai/tracy/anthropic/AnthropicTracingTest.kt index 3d20c8ebe..9178ee7ed 100644 --- a/tracing/anthropic/src/jvmTest/kotlin/org/jetbrains/ai/tracy/anthropic/AnthropicTracingTest.kt +++ b/tracing/anthropic/src/jvmTest/kotlin/org/jetbrains/ai/tracy/anthropic/AnthropicTracingTest.kt @@ -7,7 +7,7 @@ package org.jetbrains.ai.tracy.anthropic import org.jetbrains.ai.tracy.anthropic.clients.instrument import org.jetbrains.ai.tracy.core.TracingManager -import org.jetbrains.ai.tracy.core.patchOpenAICompatibleClient +import org.jetbrains.ai.tracy.core.interceptors.patchOpenAICompatibleClient import org.jetbrains.ai.tracy.core.policy.ContentCapturePolicy import com.anthropic.core.JsonString import com.anthropic.core.JsonValue diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt index a4f381c6e..668a8ad19 100644 --- a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/LLMTracingAdapter.kt @@ -5,15 +5,19 @@ package org.jetbrains.ai.tracy.core.adapters +import io.opentelemetry.api.common.AttributeKey import org.jetbrains.ai.tracy.core.http.protocol.* import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.sdk.trace.ReadableSpan import 
io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GEN_AI_SYSTEM import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.boolean import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonPrimitive +import mu.KotlinLogging +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.SseEventHandlingUnsupported +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent +import java.util.concurrent.atomic.AtomicBoolean /** @@ -28,15 +32,17 @@ import kotlinx.serialization.json.jsonPrimitive * Extend this class to create a provider-specific adapter: * ```kotlin * class AnthropicLLMTracingAdapter : LLMTracingAdapter(GenAiSystemIncubatingValues.ANTHROPIC) { - * override fun getRequestBodyAttributes(span: Span, request: Request) { + * override fun getSpanName() = "Anthropic-generation" + * + * override fun getRequestBodyAttributes(span: Span, request: TracyHttpRequest) { * // Parse Anthropic-specific request format * } - * override fun getResponseBodyAttributes(span: Span, response: Response) { + * override fun getResponseBodyAttributes(span: Span, response: TracyHttpResponse) { * // Parse Anthropic-specific response format * } - * override fun getSpanName(request: Request) = "Anthropic-generation" - * override fun isStreamingRequest(request: Request) = false - * override fun handleStreaming(span: Span, url: Url, events: String) = Unit + * override fun registerResponseStreamEvent(span: Span, url: TracyHttpUrl, event: SseEvent, index: Long): Result { + * // Parse `event.data` JSON event (if successful, return `Result.success()`) + * } * } * ``` * @@ -48,8 +54,19 @@ import kotlinx.serialization.json.jsonPrimitive * @param genAISystem The name of the GenAI system (e.g., "openai", "anthropic", "gemini") */ abstract class LLMTracingAdapter(private val genAISystem: String) { + /** + * Some adapters do NOT support SSE (_Server-Sent Events_) handling, even if + * SSE is supported by the LLM provider API. 
In this case, we print a single + * warning per trace, not to pollute the logs with repeated warnings. + * + * This flag is used to track whether a warning has already been printed for + * the current trace. + */ + private var sseHandlingUnsupportedWarningPrinted = AtomicBoolean(false) + fun registerRequest(span: Span, request: TracyHttpRequest): Unit = runCatching { - span.updateName(getSpanName(request)) + // new request -> new trace, so the warning can be printed once + sseHandlingUnsupportedWarningPrinted.set(false) // Pre-allocate in case the span reaches the limit span.setAttribute(DROPPED_ATTRIBUTES_COUNT_ATTRIBUTE_KEY, 0L) @@ -66,30 +83,32 @@ abstract class LLMTracingAdapter(private val genAISystem: String) { fun registerResponse(span: Span, response: TracyHttpResponse): Unit = runCatching { - val body = response.body.asJson()?.jsonObject ?: return - val isStreamingRequest = body["stream"]?.jsonPrimitive?.boolean == true - val mimeType = response.contentType?.mimeType - - if (mimeType != null) { - when { - isStreamingRequest && mimeType == TracyContentType.Text.EventStream.mimeType -> { - span.setAttribute("gen_ai.response.streaming", true) - span.setAttribute("gen_ai.completion.content.type", response.contentType?.asString()) - } - mimeType != TracyContentType.Text.EventStream.mimeType -> { - // mime type can be application/json, video/mp4 (for OpenAI Video API), etc. 
- getResponseBodyAttributes(span, response) - } - else -> { - span.setAttribute("gen_ai.completion.content.type", response.contentType?.asString()) - } + val contentType = response.contentType + // install the content type of the response body + contentType?.asString()?.let { + span.setAttribute("gen_ai.completion.content.type", it) + } + + // set response body attributes only for non-stream content types; + // stream events are handled by `registerResponseStreamEvent` + val mimeType = contentType?.mimeType + when { + mimeType != null && mimeType != TracyContentType.Text.EventStream.mimeType -> { + // register any non-SSE stream response types (application/json, video/mp4, etc.) + getResponseBodyAttributes(span, response) + } + mimeType == TracyContentType.Text.EventStream.mimeType -> { + span.setAttribute("tracy.response.sse.streaming", true) + } + else -> { + logger.debug { "Unsupported content type in LLMTracingAdapter: ${contentType?.asString()}" } } } span.setAttribute("http.status_code", response.code.toLong()) if (response.isError()) { - getResponseErrorBodyAttributes(span, response.body) + getResponseErrorBodyAttributes(span, response) span.setStatus(StatusCode.ERROR) } else { span.setStatus(StatusCode.OK) @@ -108,8 +127,14 @@ abstract class LLMTracingAdapter(private val genAISystem: String) { span.recordException(exception) } - protected open fun getResponseErrorBodyAttributes(span: Span, body: TracyHttpResponseBody) { - body.asJson()?.jsonObject["error"]?.jsonObject?.let { error -> + protected open fun getResponseErrorBodyAttributes(span: Span, response: TracyHttpResponse) { + // parse only `application/json` responses + if (response.contentType?.mimeType != TracyContentType.Application.Json.mimeType) { + return + } + val body = response.body.asJson()?.jsonObject ?: return + + body["error"]?.jsonObject?.let { error -> error["message"]?.jsonPrimitive?.let { span.setAttribute("gen_ai.error.message", it.content) } error["type"]?.jsonPrimitive?.let { 
span.setAttribute("gen_ai.error.type", it.content) } error["param"]?.jsonPrimitive?.let { span.setAttribute("gen_ai.error.param", it.content) } @@ -120,12 +145,59 @@ abstract class LLMTracingAdapter(private val genAISystem: String) { protected abstract fun getRequestBodyAttributes(span: Span, request: TracyHttpRequest) protected abstract fun getResponseBodyAttributes(span: Span, response: TracyHttpResponse) - abstract fun getSpanName(request: TracyHttpRequest): String - abstract fun isStreamingRequest(request: TracyHttpRequest): Boolean - abstract fun handleStreaming(span: Span, url: TracyHttpUrl, events: String) + abstract fun getSpanName(): String + + /** + * Registers a server-sent events (SSE) response event in the given [span]. + * + * @param span The [Span] instance in which the response event is registered. + * @param url The [TracyHttpUrl] object representing the URL associated with this SSE event. + * @param event The [SseEvent] to be registered. It represents a single event from the SSE stream. + */ + fun registerResponseStreamEvent(span: Span, url: TracyHttpUrl, event: SseEvent) { + // template method workflow: + // 1. extract the index of the current event from span (0 when missing) + // 2. delegate assigning to the implementation: + // - when assigned successfully, increment the index and store in span + val nextEventIndex: Long = (span as? 
ReadableSpan)?.attributes?.get(STREAM_EVENTS_COUNT_KEY) ?: 0L + + val result = registerResponseStreamEvent(span, url, event, index = nextEventIndex) + if (result.isSuccess) { + // event was successfully assigned into span + span.setAttribute(STREAM_EVENTS_COUNT_KEY, nextEventIndex + 1) + } else if (result.isFailure) { + val exception = result.exceptionOrNull() + when { + // print unsupported warning only once per trace + exception is SseEventHandlingUnsupported && !sseHandlingUnsupportedWarningPrinted.getAndSet(true) -> { + logger.warn { "SSE event handling unsupported for ${url.asString()}" } + } + else -> logger.warn { "Failed to assign SSE event to span: $exception" } + } + } + } + + /** + * Attempts to register a single SSE response event on the given [span]. + * + * Implementations must: + * - return [Result.success] when the event has been successfully assigned to the span + * (for example, attributes or other data derived from [event] have been recorded) + * so that it can be counted towards the stream event index; + * - return [Result.failure] when the event cannot or should not be assigned + * (for example, due to parsing/validation errors or unsupported event type), + * optionally carrying a descriptive exception. + * + * A failure result prevents the caller from incrementing the stored SSE event index, + * and the contained exception (if any) will be logged (see `registerResponseStreamEvent(Span, TracyHttpUrl, SseEvent)`). 
+ */ + protected abstract fun registerResponseStreamEvent(span: Span, url: TracyHttpUrl, event: SseEvent, index: Long): Result companion object { - private const val DROPPED_ATTRIBUTES_COUNT_ATTRIBUTE_KEY = "otel.dropped_attributes_count" + private val DROPPED_ATTRIBUTES_COUNT_ATTRIBUTE_KEY = AttributeKey.longKey("otel.dropped_attributes_count") + private val STREAM_EVENTS_COUNT_KEY = AttributeKey.longKey("tracy.response.sse.events.count") + + private val logger = KotlinLogging.logger {} /** * Adds unmapped payload attributes from a JSON body to the given [Span]. diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/EndpointApiHandler.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/EndpointApiHandler.kt index 7f1822abf..b2bb86162 100644 --- a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/EndpointApiHandler.kt +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/EndpointApiHandler.kt @@ -8,6 +8,7 @@ package org.jetbrains.ai.tracy.core.adapters.handlers import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse import io.opentelemetry.api.trace.Span +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent /** * Interface for endpoint API handlers used within adapters @@ -15,5 +16,11 @@ import io.opentelemetry.api.trace.Span interface EndpointApiHandler { fun handleRequestAttributes(span: Span, request: TracyHttpRequest) fun handleResponseAttributes(span: Span, response: TracyHttpResponse) - fun handleStreaming(span: Span, events: String) + /** + * Returns success if the event was handled successfully, or an error otherwise. 
+ * + * @see org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingFailure + * @see org.jetbrains.ai.tracy.core.adapters.handlers.sse.SseEventHandlingException + */ + fun handleStreamingEvent(span: Span, event: SseEvent, index: Long): Result } diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/sse/SseHandling.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/sse/SseHandling.kt new file mode 100644 index 000000000..87b98cf1e --- /dev/null +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/adapters/handlers/sse/SseHandling.kt @@ -0,0 +1,31 @@ +/* + * Copyright © 2026 JetBrains s.r.o. and contributors. + * Use of this source code is governed by the Apache 2.0 license. + */ + +package org.jetbrains.ai.tracy.core.adapters.handlers.sse + +/** + * Wraps [SseEventHandlingException] into a [Result] indicating failure. + * Should be used when handling of SSE events failed. + * + * @see org.jetbrains.ai.tracy.core.adapters.handlers.EndpointApiHandler.handleStreamingEvent + * @see sseHandlingUnsupported + */ +fun sseHandlingFailure(message: String): Result { + return Result.failure(SseEventHandlingException(message)) +} + +/** + * Wraps [SseEventHandlingUnsupported] into a [Result] indicating + * that SSE handling is not supported in this call site. + * + * @see sseHandlingFailure + */ +fun sseHandlingUnsupported(): Result { + return Result.failure(SseEventHandlingUnsupported()) +} + +class SseEventHandlingException(message: String, cause: Throwable? 
= null) : RuntimeException(message, cause) + +class SseEventHandlingUnsupported : RuntimeException("SSE event handling is not supported yet") diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParser.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParser.kt new file mode 100644 index 000000000..f0bc2ed54 --- /dev/null +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParser.kt @@ -0,0 +1,176 @@ +/* + * Copyright © 2026 JetBrains s.r.o. and contributors. + * Use of this source code is governed by the Apache 2.0 license. + */ + +package org.jetbrains.ai.tracy.core.http.parsers + +import java.io.Closeable + +/** + * A **stateful, non-thread-safe** parser of Server-Sent Events (SSE) compliant with the SSE specification. + * + * Parses server-sent events present in the input text and yields them as a structured [SseEvent]. + * + * See: [SSE Specification | Event Stream Interpretation](https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation) + * + * @param onEvent A callback invoked for each parsed event. + * @see SseEvent + * @see UTF8Decoder + */ +class SseParser(private val onEvent: (SseEvent) -> Unit) : Closeable { + // state of an event being parsed + private val lineBuffer = StringBuilder() + private val dataBuffer = StringBuilder() + private var eventType = "" + private var lastEventId = "" + private var retryValue: Long? = null + + /** + * **The [utf8Input] is expected to be already decoded with UTF-8** + * (see the note in [spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation)). + * + * @param utf8Input The UTF-8-decoded input text; it may contain either a partial event or multiple events; + * the [onEvent] callback will be invoked for each fully parsed event. 
+ * + * @see UTF8Decoder + */ + fun feed(utf8Input: String) { + var i = 0 + while (i < utf8Input.length) { + // lines may be split by any of: + // 1. \r\n (CRLF): `U+000D` CARRIAGE RETURN, `U+000A` LINE FEED + // 2. \n (LF): `U+000A` LINE FEED + // 3. \r (CR): `U+000D` CARRIAGE RETURN + when (val ch = utf8Input[i]) { + // CR + '\r' -> { + processLine(lineBuffer.toString()) + lineBuffer.clear() + // CRLF: skip the next character + if (i + 1 < utf8Input.length && utf8Input[i + 1] == '\n') { + i++ + } + } + // LF + '\n' -> { + processLine(lineBuffer.toString()) + lineBuffer.clear() + } + else -> lineBuffer.append(ch) + } + i++ + } + } + + /** + * + * @param line An entry line of an event. The line **does NOT** end with CRLF/LF/CR; it only contains the event-related information. + */ + private fun processLine(line: String) { + // empty line -> dispatch + if (line.isEmpty()) { + dispatchEvent() + return + } + // starts with colon (a comment line) -> ignore + if (line.startsWith(':')) { + return + } + + val colonIdx = line.indexOf(':') + val field: String + val value: String + + if (colonIdx == -1) { + // no colon -> use the whole line as field name, and empty string as field value + field = line + value = "" + } else { + // content before colon -> field name + field = line.substring(0, colonIdx) + // content after colon -> field value + val start = if (colonIdx + 1 < line.length && line[colonIdx + 1] == ' ') { + // ignore the very first space after colon + colonIdx + 2 + } else { + colonIdx + 1 + } + value = line.substring(start) + } + + when (field) { + "data" -> { + // Steps: + // 1. Append field value to data buffer + // 2. Append a single U+000A LINE FEED (LF) character to data buffer + // Note: when dispatching, the trailing LF is removed; here, we simply don't add it right after + // the value but rather append it to the previous value, if any. 
+ if (dataBuffer.isNotEmpty()) { + dataBuffer.append('\n') + } + dataBuffer.append(value) + } + "event" -> { + // set the event type buffer to the field value + eventType = value + } + "id" -> { + // field value doesn't contain U+0000 NULL -> set last event ID buffer to the field value + if ('\u0000' !in value) { + lastEventId = value + } + } + "retry" -> { + // contains only ASCII digits -> interpret the field value as int in base ten + if (value.isNotEmpty() && value.all { it in '0'..'9' }) { + retryValue = value.toLongOrNull() + } + } + } + } + + private fun dispatchEvent() { + if (dataBuffer.isEmpty()) { + eventType = "" + retryValue = null + return + } + + onEvent(SseEvent( + data = dataBuffer.toString(), + event = eventType.ifEmpty { "message" }, + id = lastEventId, + retry = retryValue, + )) + + dataBuffer.clear() + eventType = "" + retryValue = null + // `lastEventId` persists across events per spec + } + + override fun close() { + // flush any pending line as if the stream ended with a newline + if (lineBuffer.isNotEmpty()) { + processLine(lineBuffer.toString()) + lineBuffer.clear() + } + // if there is any buffered event data, dispatch a final event + if (dataBuffer.isNotEmpty()) { + dispatchEvent() + } + } +} + +/** + * Represents a single event in a Server-Sent Events stream. + * + * @see SseParser + */ +data class SseEvent( + val event: String = "", + val id: String = "", + val data: String, + val retry: Long? = null, +) diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/UTF8Decoder.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/UTF8Decoder.kt new file mode 100644 index 000000000..865df1408 --- /dev/null +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/parsers/UTF8Decoder.kt @@ -0,0 +1,81 @@ +/* + * Copyright © 2026 JetBrains s.r.o. and contributors. + * Use of this source code is governed by the Apache 2.0 license. 
+ */ + +package org.jetbrains.ai.tracy.core.http.parsers + +import java.nio.ByteBuffer +import java.nio.CharBuffer +import java.nio.charset.CharsetDecoder + +/** + * Stateful UTF-8 decoder that accepts raw chunks of bytes and converts them to valid UTF-8 sequences. + * + * This decoder is expected to be used together with [SseParser], + * as the latter expects valid UTF-8 input. + * + * @see SseParser + */ +class UTF8Decoder { + // Use a stateful UTF-8 decoder so that multibyte sequences split across + // read boundaries are reassembled correctly instead of producing replacement chars. + private val utf8Decoder = Charsets.UTF_8.newDecoder() + // An extra 3 bytes of capacity hold at most one incomplete multi-byte sequence + // (a 4-byte UTF-8 code-point can leave up to 3 bytes undecoded when only + // the first 1–3 bytes arrive in a chunk). + private val byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE + 3) + // Each UTF-8 code unit is at least 1 byte, so the char count cannot exceed the + // byte count. In the worst case (all ASCII), byteBuffer holds DEFAULT_BUFFER_SIZE + 3 + // bytes, so charBuffer needs the same capacity. + private val charBuffer = CharBuffer.allocate(DEFAULT_BUFFER_SIZE + 3) + + /** + * + * @param buffer The raw bytes to decode. + * @param bytesRead The number of bytes read from the buffer. 
+ * @param endOfInput `true` if, and only if, the invoker can provide + * no additional input bytes beyond those in the given buffer + * (see `endOfInput` description in [CharsetDecoder.decode]) + */ + fun decode( + buffer: ByteArray, + bytesRead: Int, + endOfInput: Boolean, + ): String { + if (bytesRead < 0) { + return "" + } + + byteBuffer.put(buffer, 0, bytesRead) + byteBuffer.flip() + charBuffer.clear() + + utf8Decoder.decode(byteBuffer, charBuffer, endOfInput) + // move any undecoded partial-sequence bytes to the start of the buffer + byteBuffer.compact() + charBuffer.flip() + + return if (charBuffer.hasRemaining()) { + charBuffer.toString() + } else { + "" + } + } + + /** + * Flushes any remaining state in the decoder and returns the final decoded characters. + * Should be called after the last call to [decode] with `endOfInput = true`. + */ + fun flush(): String { + charBuffer.clear() + utf8Decoder.flush(charBuffer) + charBuffer.flip() + + return if (charBuffer.hasRemaining()) { + charBuffer.toString() + } else { + "" + } + } +} diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpResponse.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpResponse.kt index 3bf7030c3..9b830e7fd 100644 --- a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpResponse.kt +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpResponse.kt @@ -43,11 +43,14 @@ interface TracyHttpResponse { @InternalTracyApi sealed class TracyHttpResponseBody { data class Json(val json: JsonElement) : TracyHttpResponseBody() + object EventStream : TracyHttpResponseBody() + object Empty : TracyHttpResponseBody() } @InternalTracyApi fun TracyHttpResponseBody.asJson(): JsonElement? 
{ return when (this) { is TracyHttpResponseBody.Json -> this.json + else -> null } } diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpUrl.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpUrl.kt index ee4472c64..9f4d26101 100644 --- a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpUrl.kt +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/http/protocol/TracyHttpUrl.kt @@ -25,6 +25,8 @@ interface TracyHttpUrl { val host: String val pathSegments: List val parameters: TracyQueryParameters + + fun asString(): String } @InternalTracyApi @@ -54,7 +56,10 @@ data class TracyHttpUrlImpl( override val host: String, override val pathSegments: List, override val parameters: TracyQueryParameters, -) : TracyHttpUrl + private val url: String, +) : TracyHttpUrl { + override fun asString(): String = url +} /** * Converts an instance of [HttpUrl] into a [TracyHttpUrl] object by extracting its @@ -65,7 +70,6 @@ data class TracyHttpUrlImpl( @InternalTracyApi fun HttpUrl.toProtocolUrl(): TracyHttpUrl { val httpUrl = this - val params = object : TracyQueryParameters { override fun queryParameter(name: String) = httpUrl.queryParameter(name) override fun queryParameterValues(name: String) = httpUrl.queryParameterValues(name) @@ -76,5 +80,6 @@ fun HttpUrl.toProtocolUrl(): TracyHttpUrl { host = httpUrl.host, pathSegments = httpUrl.pathSegments, parameters = params, + url = httpUrl.toString() ) } diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/Interceptors.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/Interceptors.kt similarity index 95% rename from tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/Interceptors.kt rename to tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/Interceptors.kt index cc0d42f83..00aa067a1 100644 --- 
a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/Interceptors.kt +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/Interceptors.kt @@ -3,7 +3,7 @@ * Use of this source code is governed by the Apache 2.0 license. */ -package org.jetbrains.ai.tracy.core +package org.jetbrains.ai.tracy.core.interceptors import okhttp3.Interceptor diff --git a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/OpenTelemetryOkHttpInterceptor.kt b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt similarity index 67% rename from tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/OpenTelemetryOkHttpInterceptor.kt rename to tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt index 13367e20e..327a04904 100644 --- a/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/OpenTelemetryOkHttpInterceptor.kt +++ b/tracing/core/src/jvmMain/kotlin/org/jetbrains/ai/tracy/core/interceptors/OpenTelemetryOkHttpInterceptor.kt @@ -3,22 +3,21 @@ * Use of this source code is governed by the Apache 2.0 license. 
*/ -package org.jetbrains.ai.tracy.core +package org.jetbrains.ai.tracy.core.interceptors import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.StatusCode import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.JsonPrimitive import okhttp3.Interceptor import okhttp3.MediaType import okhttp3.OkHttpClient import okhttp3.RequestBody.Companion.toRequestBody -import okio.Buffer -import okio.BufferedSource -import okio.ForwardingSource -import okio.buffer +import okio.* +import org.jetbrains.ai.tracy.core.TracingManager import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter +import org.jetbrains.ai.tracy.core.http.parsers.SseParser +import org.jetbrains.ai.tracy.core.http.parsers.UTF8Decoder import org.jetbrains.ai.tracy.core.http.protocol.* import okhttp3.Request as OkHttpRequest import okhttp3.Response as OkHttpResponse @@ -116,15 +115,15 @@ import okhttp3.ResponseBody as OkHttpResponseBody * will not result in duplicate interceptors. * - Tracing can be controlled globally via `TracingManager.isTracingEnabled`. * - **The original client is not modified; a new client instance with instrumentation is returned**. - * - Content capture policies [TracingManager.contentCapturePolicy] can be configured to redact sensitive data. + * - Content capture policies [org.jetbrains.ai.tracy.core.TracingManager.contentCapturePolicy] can be configured to redact sensitive data. * - Error responses are automatically captured with error status and messages. 
* * @param client The OkHttp client to instrument * @param adapter The [LLMTracingAdapter] specifying which LLM provider adapter to use for tracing * @return **A new [OkHttpClient] instance** with OpenTelemetry tracing enabled (i.e., the initial [client] remains **unmodified**) * - * @see TracingManager - * @see TracingManager.traceSensitiveContent + * @see org.jetbrains.ai.tracy.core.TracingManager + * @see org.jetbrains.ai.tracy.core.TracingManager.traceSensitiveContent */ fun instrument(client: OkHttpClient, adapter: LLMTracingAdapter): OkHttpClient { val clientBuilder = client.newBuilder() @@ -212,8 +211,9 @@ class OpenTelemetryOkHttpInterceptor( val tracer = TracingManager.tracer - val span = tracer.spanBuilder("").startSpan() - var isStreamingRequest = false + val span = tracer.spanBuilder(adapter.getSpanName()).startSpan() + // whether the response content type is `text/event-stream` + var isStreamingResponse = false span.makeCurrent().use { _ -> try { @@ -237,100 +237,66 @@ class OpenTelemetryOkHttpInterceptor( ) } - isStreamingRequest = adapter.isStreamingRequest(tracyRequest) adapter.registerRequest(span, tracyRequest) // register response val response = chain.proceed(request) + adapter.registerResponse(span, response = response.asResponseView()) - return if (isStreamingRequest) { - val streamingMarker = JsonObject(mapOf("stream" to JsonPrimitive(true))) - val url = request.url.toProtocolUrl() - adapter.registerResponse(span, response = response.asResponseView(streamingMarker)) - - wrapStreamingResponse(response, url, span) - } else { - // if the content type is `application/json`, we decode a response body; - // otherwise (e.g., when the body is binary), we pass an empty JSON object as the response body. 
- val contentType = response.body?.contentType() - val mimeType = if (contentType != null) "${contentType.type}/${contentType.subtype}" else null - val responseBody = when (mimeType?.lowercase()) { - "application/json" -> try { - val peekedBody = response.peekBody(Long.MAX_VALUE).string() - Json.decodeFromString(peekedBody) - } catch (_: Exception) { - JsonObject(emptyMap()) - } - else -> { - JsonObject(emptyMap()) - } - } - - adapter.registerResponse(span, response = response.asResponseView(responseBody)) - response + // response is of streaming type when its body MIME type is `text/event-stream` + isStreamingResponse = response.body.contentType()?.let { + "${it.type}/${it.subtype}" == "text/event-stream" + } ?: false + + // trace SSE events into span when the content type of the response body is `text/event-stream` + return when { + isStreamingResponse -> response.withTracedSSE(span, requestUrl = request.url.toProtocolUrl()) + else -> response } } catch (e: Exception) { span.setStatus(StatusCode.ERROR) span.recordException(e) throw e } finally { - if (!isStreamingRequest) { + // when dealing with the streaming response, + // its span will be closed when all events are traced (see `withTracedSSE`) + if (!isStreamingResponse) { span.end() } } } } - private fun wrapStreamingResponse( - originalResponse: OkHttpResponse, - url: TracyHttpUrl, - span: Span, - ): OkHttpResponse { - val originalBody = originalResponse.body ?: return originalResponse - - val tracingBody = object : OkHttpResponseBody() { - private val capturedText = StringBuilder() - - override fun contentType() = originalBody.contentType() - override fun contentLength() = -1L - - override fun source(): BufferedSource { - val originalSource = originalBody.source() - - return object : ForwardingSource(originalSource) { - private val acc = Buffer() - override fun read(sink: Buffer, byteCount: Long): Long { - val bytesRead = try { - super.read(sink, byteCount) - } catch (e: Exception) { - 
span.setStatus(StatusCode.ERROR) - span.recordException(e) - span.end() - throw e - } - - if (bytesRead > 0) { - val start = sink.size - bytesRead - sink.copyTo(acc, start, bytesRead) - - capturedText.append(acc.readUtf8(bytesRead)) - } - - return bytesRead - } - }.buffer() - } - - override fun close() { - try { - adapter.handleStreaming(span, url, capturedText.toString()) - } finally { - span.end() - } + private fun OkHttpResponse.withTracedSSE(span: Span, requestUrl: TracyHttpUrl): OkHttpResponse { + val originalResponse = this + val originalBody = originalResponse.body + + // wrap the source of the original body with a capturing source + // that forwards UTF-8-decoded bytes into SSE parser + val originalBodyWithTracedSSE = object : OkHttpResponseBody() { + private val bufferedSource: BufferedSource by lazy { + // capture SSE events via SSE parser and forward them into the adapter + SseCapturingSource( + delegate = originalBody.source(), + utf8Decoder = UTF8Decoder(), + // dispatch SSE events to the adapter + parser = SseParser { event -> + adapter.registerResponseStreamEvent(span, requestUrl, event) + }, + // NOTE: end the span when the source is closed + onClose = { + span.end() + }, + ).buffer() } + override fun contentType() = originalBody.contentType() + override fun contentLength() = originalBody.contentLength() + override fun source() = bufferedSource } - return originalResponse.newBuilder().body(tracingBody).build() + return originalResponse.newBuilder() + .body(originalBodyWithTracedSSE) + .build() } private fun OkHttpRequest.withCopiedBodyContent(): Pair { @@ -357,14 +323,27 @@ class OpenTelemetryOkHttpInterceptor( return content to request } - private fun OkHttpResponse.asResponseView(body: JsonObject): TracyHttpResponse { + private fun OkHttpResponse.asResponseView(): TracyHttpResponse { val response = this - val mediaType = response.body?.contentType() + val mediaType = response.body.contentType() + val mimeType = mediaType?.let { 
"${it.type}/${it.subtype}" } + + // select a body type based on the response MIME type + val body = when(mimeType) { + "application/json" -> try { + val json = Json.decodeFromString(response.peekBody(Long.MAX_VALUE).string()) + TracyHttpResponseBody.Json(json) + } catch (_: Exception) { + TracyHttpResponseBody.Empty + } + "text/event-stream" -> TracyHttpResponseBody.EventStream + else -> TracyHttpResponseBody.Empty + } return object : TracyHttpResponse { override val contentType = mediaType?.toContentType() override val code = response.code - override val body = TracyHttpResponseBody.Json(body) + override val body = body override val url = response.request.url.toProtocolUrl() override val requestMethod = response.request.method.uppercase() @@ -377,3 +356,54 @@ class OpenTelemetryOkHttpInterceptor( charset = mediaType.charset() ?: Charsets.UTF_8, ) } + +/** + * Peeks bytes of the original source [delegate], decodes them as UTF-8, + * and forwards into SSE parser [parser]. + */ +private class SseCapturingSource( + delegate: Source, + private val utf8Decoder: UTF8Decoder, + private val parser: SseParser, + private val onClose: () -> Unit = {}, +) : ForwardingSource(delegate) { + override fun read(sink: Buffer, byteCount: Long): Long { + val bytesRead = super.read(sink, byteCount) + if (bytesRead == -1L) { + return -1L + } + + // peek at the bytes just written to sink + val buffer = sink.peek().apply { + skip(sink.size - bytesRead) + }.readByteArray(bytesRead) + + val utf8Input = utf8Decoder.decode(buffer, bytesRead.toInt(), endOfInput = false) + if (utf8Input.isNotEmpty()) { + parser.feed(utf8Input) + } + + return bytesRead + } + + override fun close() { + try { + super.close() + // perform final decode and flush to handle any remaining bytes + val remainingUtf8Inputs = listOf( + // decode the remaining bytes in the buffer + utf8Decoder.decode(byteArrayOf(), 0, endOfInput = true), + // flush the remaining bytes in the buffer + utf8Decoder.flush(), + ) + for (input in 
remainingUtf8Inputs) { + if (input.isNotEmpty()) { + parser.feed(input) + } + } + parser.close() + } finally { + onClose() + } + } +} diff --git a/tracing/core/src/jvmTest/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParserTest.kt b/tracing/core/src/jvmTest/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParserTest.kt new file mode 100644 index 000000000..03c394250 --- /dev/null +++ b/tracing/core/src/jvmTest/kotlin/org/jetbrains/ai/tracy/core/http/parsers/SseParserTest.kt @@ -0,0 +1,263 @@ +/* + * Copyright © 2026 JetBrains s.r.o. and contributors. + * Use of this source code is governed by the Apache 2.0 license. + */ + +package org.jetbrains.ai.tracy.core.http.parsers + +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals + +/** + * See examples from the [Event Stream Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation) + * + * Besides, see the stream events' schemas for different LLM providers: + * 1. OpenAI: [Responses: Streaming Events](https://developers.openai.com/api/reference/resources/responses/streaming-events) + */ +class SseParserTest { + private val collector = EventsCollector() + + @Test + fun `test example 1 from spec`() = runTest { + val stream = """ + data: YHOO + data: +2 + data: 10 + """.trimIndent().endWithBlankLine() + + val parser = SseParser(collector::collect) + parser.feed(stream) + val events = collector.events() + + val expectedEvent = SseEvent( + event = "message", + data = "YHOO\n+2\n10", + ) + + assertEquals(1, events.size) + assertEquals(expectedEvent, events.first()) + } + + @Test + fun `test example 2 from spec`() = runTest { + val stream = """ + : test stream + + data: first event + id: 1 + + data:second event + id + + data: third event + """.trimIndent().endWithBlankLine() + + // 4 blocks: + // 1. comment -> dropped + // 2. event 1: (`message`, `first event`, 1) + // 3. 
event 2: (`message`, `second event`, "") + // 4. event 3: (`message`, ` third event`, "") <- mind the leading whitespace! + + val parser = SseParser(collector::collect) + parser.feed(stream) + + val events = collector.events() + + val expectedEvents = listOf( + SseEvent( + event = "message", + data = "first event", + id = "1", + ), + SseEvent( + event = "message", + data = "second event", + ), + SseEvent( + event = "message", + data = " third event", + ), + ) + + assertEquals(expectedEvents, events) + } + + @Test + fun `test example 4 from spec`() = runTest { + val stream = """ + data:test + + data: test + """.trimIndent().endWithBlankLine() + + // 2 blocks: + // 1. event 1: (`message`, `test`, "") + // 2. event 2: (`message`, `test`, "") <- the first whitespace after colon is ignored + + val parser = SseParser(collector::collect) + parser.feed(stream) + val events = collector.events() + + val expectedEvents = listOf( + SseEvent( + event = "message", + data = "test", + ), + SseEvent( + event = "message", + data = "test", + ), + ) + + assertEquals(expectedEvents, events) + } + + // OpenAI stream events tests + @Test + fun `test OpenAI (chat-completions) stream events are parsed correctly`() = runTest { + // send 3 Chat Completions Events and end with the 4th `[DONE]` event + val openaiEvents = listOf( + """ + {"id":"chatcmpl-123","object":"chat.completion.chunk","created":0,"model":"gpt-4o-mini","system_fingerprint":"fp_44709d6fcb","choices":[{"index":0,"delta":{"role":"assistant","content":""}}]} + """.trimIndent(), + """ + {"id":"chatcmpl-123","object":"chat.completion.chunk","created":0,"model":"gpt-4o-mini","system_fingerprint":"fp_44709d6fcb","choices":[{"index":0,"delta":{"content":"Hello"}}]} + """.trimIndent(), + """ + {"id":"chatcmpl-123","object":"chat.completion.chunk","created":0,"model":"gpt-4o-mini","system_fingerprint":"fp_44709d6fcb","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + """.trimIndent(), + "[DONE]", + ) + + val eventsStream = 
openaiEvents + .joinToString("\n\n") { "data: $it" } + .endWithBlankLine() + + val parser = SseParser(collector::collect) + parser.feed(eventsStream) + val events = collector.events() + + val expectedEvents = openaiEvents + .map { SseEvent("message", data = it) } + .toList() + + assertEquals(expectedEvents, events) + } + + @Test + fun `test OpenAI (responses-api) stream events are parsed correctly`() = runTest { + val openaiEvents = listOf( + """ + {"type":"response.created","response":{"id":"resp_123","created_at":1774266416,"metadata":{},"model":"gpt-4o-mini-2024-07-18","object":"response","output":[],"parallel_tool_calls":true,"temperature":0.0,"tool_choice":"auto","tools":[],"top_p":1.0,"reasoning":{},"status":"in_progress","text":{"format":{"type":"text"},"verbosity":"medium"},"truncation":"disabled","store":true,"background":false,"frequency_penalty":0.0,"presence_penalty":0.0,"service_tier":"auto","top_logprobs":0},"sequence_number":0} + """.trimIndent(), + """ + {"type":"response.output_item.added","output_index":0,"item":{"id":"msg_123","type":"message","status":"in_progress","content":[],"role":"assistant"},"sequence_number":2} + """.trimIndent(), + """ + {"type":"response.output_text.done","item_id":"msg_123","output_index":0,"content_index":0,"text":"Hello!","logprobs":[],"sequence_number":43} + """.trimIndent(), + + """ + 
{"type":"response.completed","response":{"id":"resp_123","created_at":1774266416,"metadata":{},"model":"gpt-4o-mini-2024-07-18","object":"response","output":[{"id":"msg_123","content":[{"annotations":[],"text":"Hello!","type":"output_text","logprobs":[]}],"role":"assistant","status":"completed","type":"message"}],"parallel_tool_calls":true,"temperature":0.0,"tool_choice":"auto","tools":[],"top_p":1.0,"reasoning":{},"status":"completed","text":{"format":{"type":"text"},"verbosity":"medium"},"truncation":"disabled","usage":{"input_tokens":13,"input_tokens_details":{"cached_tokens":0},"output_tokens":40,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":53},"store":true,"background":false,"completed_at":1774266417,"frequency_penalty":0.0,"presence_penalty":0.0,"service_tier":"default","top_logprobs":0},"sequence_number":46} + """.trimIndent(), + + "[DONE]", + ) + + val eventsStream = openaiEvents + .joinToString("\n\n") { "data: $it" } + .endWithBlankLine() + + val parser = SseParser(collector::collect) + parser.feed(eventsStream) + val events = collector.events() + + val expectedEvents = openaiEvents + .map { SseEvent("message", data = it) } + .toList() + + assertEquals(expectedEvents, events) + } + + @Test + fun `test OpenAI (image-completions) stream events are parsed correctly`() = runTest { + val openaiEvents = listOf( + "image_generation.partial_image" to + """ + { "created_at": 1774267828, "type": "image_generation.partial_image", "b64_json": "...","background":"opaque","output_format":"png","partial_image_index":1,"quality":"medium","sequence_number":1,"size":"1024x1024"} + """.trimIndent(), + + "image_generation.completed" to + """ + {"created_at":1774267837,"type":"image_generation.completed","b64_json": "...", "background":"opaque","output_format":"png","quality":"medium","sequence_number":2,"size":"1024x1024","usage":{"input_tokens":16,"input_tokens_details":{"image_tokens":0,"text_tokens":16},"output_tokens":1256,"total_tokens":1272}} + 
""".trimIndent() + ) + + val eventsStream = openaiEvents + .joinToString("\n\n") { + val event = "event: ${it.first}" + val data = "data: ${it.second}" + "$event\n$data" + }.endWithBlankLine() + + val parser = SseParser(collector::collect) + parser.feed(eventsStream) + val events = collector.events() + + val expectedEvents = openaiEvents + .map { SseEvent(it.first, data = it.second) } + .toList() + + assertEquals(expectedEvents, events) + } + + @Test + fun `test OpenAI (image-edits) stream events are parsed correctly`() = runTest { + val openaiEvents = listOf( + "image_edit.partial_image" to + """ + {"created_at":1774268492,"type":"image_edit.partial_image","b64_json":"...","background":"opaque","output_format":"png","partial_image_index":0,"quality":"high","sequence_number":0,"size":"1024x1024"} + """.trimIndent(), + + "image_edit.completed" to + """ + {"created_at":1774268530,"type":"image_edit.completed","b64_json":"...","background":"opaque","output_format":"png","quality":"high","sequence_number":2,"size":"1024x1024","usage":{"input_tokens":430,"input_tokens_details":{"image_tokens":388,"text_tokens":42},"output_tokens":4360,"total_tokens":4790}} + """.trimIndent() + ) + + val eventsStream = openaiEvents + .joinToString("\n\n") { + val event = "event: ${it.first}" + val data = "data: ${it.second}" + "$event\n$data" + }.endWithBlankLine() + + val parser = SseParser(collector::collect) + parser.feed(eventsStream) + val events = collector.events() + + val expectedEvents = openaiEvents + .map { SseEvent(it.first, data = it.second) } + .toList() + + assertEquals(expectedEvents, events) + } + + // Anthropic stream events tests + + // Gemini stream events tests + + + private fun String.endWithBlankLine() = this.plus("\n\n") + + private class EventsCollector { + private val events = mutableListOf() + + fun collect(event: SseEvent) { + events.add(event) + } + + fun events(): List = events + } +} \ No newline at end of file diff --git 
a/tracing/core/src/jvmTest/kotlin/org/jetbrains/ai/tracy/core/http/parsers/UTF8DecoderTest.kt b/tracing/core/src/jvmTest/kotlin/org/jetbrains/ai/tracy/core/http/parsers/UTF8DecoderTest.kt new file mode 100644 index 000000000..bb292e212 --- /dev/null +++ b/tracing/core/src/jvmTest/kotlin/org/jetbrains/ai/tracy/core/http/parsers/UTF8DecoderTest.kt @@ -0,0 +1,464 @@ +/* + * Copyright © 2026 JetBrains s.r.o. and contributors. + * Use of this source code is governed by the Apache 2.0 license. + */ + +package org.jetbrains.ai.tracy.core.http.parsers + +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals + +class UTF8DecoderTest { + + // ============================================================================ + // Basic Scenarios + // ============================================================================ + + @Test + fun `decode simple ASCII text`() { + val decoder = UTF8Decoder() + val text = "Hello, world!" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode 2-byte UTF-8 characters`() { + val decoder = UTF8Decoder() + // é character: + // 1. Latin Small Letter E With Acute + // 2. 
it is 2-byte UTF-8: C3 A9 + val text = "café" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode 3-byte UTF-8 characters`() { + val decoder = UTF8Decoder() + // Chinese characters are 3-byte UTF-8 + // 你: U+4F60 + // 好: U+597D + val text = "你好" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode 4-byte UTF-8 characters (emoji)`() { + val decoder = UTF8Decoder() + // 😀 is 4-byte UTF-8: F0 9F 98 80 + val text = "Hello 😀 World" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode mixed ASCII and multi-byte characters`() { + val decoder = UTF8Decoder() + val text = "Hello café 你好 😀" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode empty input`() { + val decoder = UTF8Decoder() + val bytes = ByteArray(0) + + val decoded = decoder.decode(bytes, 0, endOfInput = true) + + assertEquals("", decoded) + } + + @Test + fun `decode with negative bytesRead returns empty string`() { + val decoder = UTF8Decoder() + val bytes = "test".encodeToByteArray() + + val decoded = decoder.decode(bytes, -1, endOfInput = true) + + assertEquals("", decoded) + } + + // ============================================================================ + // Split Multi-byte Sequences + // ============================================================================ + + @Test + fun `decode 2-byte character split across two chunks`() { + val decoder = UTF8Decoder() + val text = "café" + val bytes = text.encodeToByteArray() + + // Find where 'é' (2-byte) starts + // Split right before 'é' + val splitPoint = "caf".encodeToByteArray().size + + 
val part1 = decoder.decode(bytes, splitPoint, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(splitPoint, bytes.size), + bytes.size - splitPoint, endOfInput = true) + + assertEquals("caf", part1) + assertEquals("é", part2) + assertEquals(text, part1 + part2) + } + + @Test + fun `decode 2-byte character split in the middle`() { + val decoder = UTF8Decoder() + // 2-byte: C3 A9 + val text = "é" + val bytes = text.encodeToByteArray() + assert(bytes.size == 2) + + // Split after first byte + val part1 = decoder.decode(bytes, 1, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(1, 2), 1, endOfInput = true) + + // First byte alone is incomplete + assertEquals("", part1) + assertEquals("é", part2) + } + + @Test + fun `decode 3-byte character split 1+2`() { + val decoder = UTF8Decoder() + // 3-byte UTF-8 + val text = "你" + val bytes = text.encodeToByteArray() + + // Split after first byte + val part1 = decoder.decode(bytes, 1, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(1, 3), 2, endOfInput = true) + + // Incomplete + assertEquals("", part1) + assertEquals("你", part2) + } + + @Test + fun `decode 3-byte character split 2+1`() { + val decoder = UTF8Decoder() + // 3-byte UTF-8 + val text = "你" + val bytes = text.encodeToByteArray() + + // Split after first two bytes + val part1 = decoder.decode(bytes, 2, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(2, 3), 1, endOfInput = true) + + // Incomplete + assertEquals("", part1) + assertEquals("你", part2) + } + + @Test + fun `decode 4-byte character (emoji) split 1+3`() { + val decoder = UTF8Decoder() + // 4-byte: F0 9F 98 80 + val text = "😀" + val bytes = text.encodeToByteArray() + + val part1 = decoder.decode(bytes, 1, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(1, 4), 3, endOfInput = true) + + assertEquals("", part1) + assertEquals("😀", part2) + } + + @Test + fun `decode 4-byte character (emoji) split 2+2`() { + val decoder = 
UTF8Decoder() + // 4-byte + val text = "😀" + val bytes = text.encodeToByteArray() + + val part1 = decoder.decode(bytes, 2, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(2, 4), 2, endOfInput = true) + + assertEquals("", part1) + assertEquals("😀", part2) + } + + @Test + fun `decode 4-byte character (emoji) split 3+1`() { + val decoder = UTF8Decoder() + // 4-byte + val text = "😀" + val bytes = text.encodeToByteArray() + + val part1 = decoder.decode(bytes, 3, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(3, 4), 1, endOfInput = true) + + assertEquals("", part1) + assertEquals("😀", part2) + } + + @Test + fun `decode multiple consecutive split characters`() { + val decoder = UTF8Decoder() + val text = "café你好😀" + val bytes = text.encodeToByteArray() + + // Simulate byte-by-byte streaming (worst case) + val result = StringBuilder() + for (i in bytes.indices) { + val chunk = decoder.decode(bytes.copyOfRange(i, i + 1), 1, + endOfInput = i == bytes.size - 1) + result.append(chunk) + } + + assertEquals(text, result.toString()) + } + + // ============================================================================ + // Buffer Boundaries and Stateful Behavior + // ============================================================================ + + @Test + fun `decode large input in multiple chunks`() { + val decoder = UTF8Decoder() + // Large text with multi-byte chars + val text = "Hello 😀 ".repeat(1000) + val bytes = text.encodeToByteArray() + + val chunkSize = 100 + val result = StringBuilder() + var offset = 0 + + while (offset < bytes.size) { + val end = minOf(offset + chunkSize, bytes.size) + val chunk = bytes.copyOfRange(offset, end) + val decoded = decoder.decode(chunk, chunk.size, endOfInput = end == bytes.size) + result.append(decoded) + offset = end + } + + assertEquals(text, result.toString()) + } + + @Test + fun `decode with empty chunks between non-empty chunks`() { + val decoder = UTF8Decoder() + val text = "Hello" + val bytes 
= text.encodeToByteArray() + + val part1 = decoder.decode(bytes, 3, endOfInput = false) + val empty = decoder.decode(ByteArray(0), 0, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(3, 5), 2, endOfInput = true) + + assertEquals("Hel", part1) + assertEquals("", empty) + assertEquals("lo", part2) + assertEquals(text, part1 + empty + part2) + } + + @Test + fun `decode maintains state across multiple calls`() { + val decoder = UTF8Decoder() + val text = "Part1 你好 Part2 😀" + val bytes = text.encodeToByteArray() + + // Split at arbitrary points that might cut multi-byte sequences + val chunks = listOf( + bytes.copyOfRange(0, 7), + bytes.copyOfRange(7, 14), + bytes.copyOfRange(14, bytes.size) + ) + + val result = StringBuilder() + for ((index, chunk) in chunks.withIndex()) { + val decoded = decoder.decode(chunk, chunk.size, endOfInput = index == chunks.size - 1) + result.append(decoded) + } + + assertEquals(text, result.toString()) + } + + // ============================================================================ + // Special UTF-8 Characters + // ============================================================================ + + @Test + fun `decode multiple emojis`() { + val decoder = UTF8Decoder() + val text = "😀😃😄😁🎉🔥" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode emoji with skin tone modifier`() { + val decoder = UTF8Decoder() + // Wave + medium skin tone (multi-codepoint) + val text = "👋🏽" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode combining characters`() { + val decoder = UTF8Decoder() + // e + combining acute accent + val text = "e\u0301" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun 
`decode RTL (right-to-left) text`() { + val decoder = UTF8Decoder() + // Arabic "hello" + val text = "مرحبا" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + @Test + fun `decode zero-width characters`() { + val decoder = UTF8Decoder() + // Zero-width space + val text = "Hello\u200BWorld" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + + assertEquals(text, decoded) + } + + // ============================================================================ + // endOfInput and Flush Behavior + // ============================================================================ + + @Test + fun `decode with endOfInput false then true`() { + val decoder = UTF8Decoder() + val text = "Hello World" + val bytes = text.encodeToByteArray() + + val part1 = decoder.decode(bytes, 5, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(5, 11), 6, endOfInput = true) + + assertEquals("Hello", part1) + assertEquals(" World", part2) + } + + @Test + fun `flush after final decode`() { + val decoder = UTF8Decoder() + val text = "Hello" + val bytes = text.encodeToByteArray() + + val decoded = decoder.decode(bytes, bytes.size, endOfInput = true) + val flushed = decoder.flush() + + assertEquals("Hello", decoded) + // Nothing left to flush + assertEquals("", flushed) + } + + @Test + fun `flush after decode with partial sequence`() { + val decoder = UTF8Decoder() + val text = "café" + val bytes = text.encodeToByteArray() + + // Decode up to but not including the last byte of 'é' + val splitPoint = bytes.size - 1 + val decoded = decoder.decode(bytes, splitPoint, endOfInput = false) + + // Now feed the last byte and flush + val lastByte = decoder.decode(bytes.copyOfRange(splitPoint, bytes.size), 1, endOfInput = true) + val flushed = decoder.flush() + + assertEquals("caf", decoded) + assertEquals("é", lastByte) + 
assertEquals("", flushed) + } + + @Test + fun `decode complete sequence in multiple small chunks`() { + val decoder = UTF8Decoder() + // ASCII, emoji, ASCII + val text = "a😀b" + val bytes = text.encodeToByteArray() + + val result = StringBuilder() + var offset = 0 + + // Feed 2 bytes at a time + while (offset < bytes.size) { + val end = minOf(offset + 2, bytes.size) + val chunk = bytes.copyOfRange(offset, end) + val decoded = decoder.decode(chunk, chunk.size, endOfInput = end == bytes.size) + result.append(decoded) + offset = end + } + + val flushed = decoder.flush() + result.append(flushed) + + assertEquals(text, result.toString()) + } + + // ============================================================================ + // Decoder Reuse + // ============================================================================ + + @Test + fun `decoder can be reused for multiple independent sequences`() { + val decoder = UTF8Decoder() + + // First sequence + val text1 = "Hello 😀" + val bytes1 = text1.encodeToByteArray() + val decoded1 = decoder.decode(bytes1, bytes1.size, endOfInput = true) + decoder.flush() + + // Note: In practice, decoder state is NOT reset between calls + // This test verifies current behavior - decoder maintains state + assertEquals(text1, decoded1) + } + + @Test + fun `decode with trailing complete and incomplete sequences`() { + val decoder = UTF8Decoder() + val text = "abc😀" + val bytes = text.encodeToByteArray() + + // Decode all but last byte of emoji + val part1 = decoder.decode(bytes, bytes.size - 1, endOfInput = false) + val part2 = decoder.decode(bytes.copyOfRange(bytes.size - 1, bytes.size), 1, endOfInput = true) + + assertEquals("abc", part1) + assertEquals("😀", part2) + } +} \ No newline at end of file diff --git a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/GeminiLLMTracingAdapter.kt b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/GeminiLLMTracingAdapter.kt index 1992eb688..44fb8a326 
100644 --- a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/GeminiLLMTracingAdapter.kt +++ b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/GeminiLLMTracingAdapter.kt @@ -16,6 +16,7 @@ import org.jetbrains.ai.tracy.gemini.adapters.handlers.GeminiContentGenHandler import org.jetbrains.ai.tracy.gemini.adapters.handlers.GeminiImagenHandler import io.opentelemetry.api.trace.Span import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.* +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent /** * Tracing adapter for Google Gemini and Imagen APIs. @@ -57,13 +58,16 @@ class GeminiLLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIncub handler.handleResponseAttributes(span, response) } - override fun getSpanName(request: TracyHttpRequest) = "Gemini-generation" + override fun getSpanName() = "Gemini-generation" - // streaming is not supported - override fun isStreamingRequest(request: TracyHttpRequest) = false - override fun handleStreaming(span: Span, url: TracyHttpUrl, events: String) { + override fun registerResponseStreamEvent( + span: Span, + url: TracyHttpUrl, + event: SseEvent, + index: Long, + ): Result { val handler = selectHandler(url) - handler.handleStreaming(span, events) + return handler.handleStreamingEvent(span, event, index) } private fun selectHandler(url: TracyHttpUrl): EndpointApiHandler = when { diff --git a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiContentGenHandler.kt b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiContentGenHandler.kt index cce94c6b5..46b8ae533 100644 --- a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiContentGenHandler.kt +++ b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiContentGenHandler.kt @@ -5,13 +5,18 @@ package org.jetbrains.ai.tracy.gemini.adapters.handlers +import 
io.opentelemetry.api.trace.Span +import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.* +import kotlinx.serialization.json.* import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter.Companion.PayloadType import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter.Companion.populateUnmappedAttributes import org.jetbrains.ai.tracy.core.adapters.handlers.EndpointApiHandler +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingUnsupported import org.jetbrains.ai.tracy.core.adapters.media.MediaContent import org.jetbrains.ai.tracy.core.adapters.media.MediaContentExtractor import org.jetbrains.ai.tracy.core.adapters.media.MediaContentPart import org.jetbrains.ai.tracy.core.adapters.media.Resource +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse import org.jetbrains.ai.tracy.core.http.protocol.asJson @@ -19,9 +24,6 @@ import org.jetbrains.ai.tracy.core.policy.ContentKind import org.jetbrains.ai.tracy.core.policy.contentTracingAllowed import org.jetbrains.ai.tracy.core.policy.orRedactedInput import org.jetbrains.ai.tracy.core.policy.orRedactedOutput -import io.opentelemetry.api.trace.Span -import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.* -import kotlinx.serialization.json.* /** * Parses Generate Content API requests and responses @@ -217,7 +219,13 @@ class GeminiContentGenHandler( span.populateUnmappedAttributes(body, mappedAttributes, PayloadType.RESPONSE) } - override fun handleStreaming(span: Span, events: String) = Unit + override fun handleStreamingEvent( + span: Span, + event: SseEvent, + index: Long + ): Result { + return sseHandlingUnsupported() + } private fun parseRequestMediaContent(body: JsonObject): MediaContent? 
{ val contents = body["contents"] diff --git a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiImagenHandler.kt b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiImagenHandler.kt index 92270776c..a7d234709 100644 --- a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiImagenHandler.kt +++ b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/adapters/handlers/GeminiImagenHandler.kt @@ -5,16 +5,18 @@ package org.jetbrains.ai.tracy.gemini.adapters.handlers +import io.opentelemetry.api.trace.Span +import kotlinx.serialization.json.* import org.jetbrains.ai.tracy.core.adapters.handlers.EndpointApiHandler +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingUnsupported import org.jetbrains.ai.tracy.core.adapters.media.MediaContent import org.jetbrains.ai.tracy.core.adapters.media.MediaContentExtractor import org.jetbrains.ai.tracy.core.adapters.media.MediaContentPart import org.jetbrains.ai.tracy.core.adapters.media.Resource +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse import org.jetbrains.ai.tracy.core.http.protocol.asJson -import io.opentelemetry.api.trace.Span -import kotlinx.serialization.json.* /** * Parses Imagen API requests and responses @@ -87,7 +89,13 @@ class GeminiImagenHandler( extractor.setUploadableContentAttributes(span, field = "output", mediaContent) } - override fun handleStreaming(span: Span, events: String) = Unit + override fun handleStreamingEvent( + span: Span, + event: SseEvent, + index: Long + ): Result { + return sseHandlingUnsupported() + } /** * Expects an array of schemas: diff --git a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/clients/GeminiClient.kt 
b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/clients/GeminiClient.kt index 477ed8f8c..829892fac 100644 --- a/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/clients/GeminiClient.kt +++ b/tracing/gemini/src/jvmMain/kotlin/org/jetbrains/ai/tracy/gemini/clients/GeminiClient.kt @@ -5,9 +5,9 @@ package org.jetbrains.ai.tracy.gemini.clients -import org.jetbrains.ai.tracy.core.OpenTelemetryOkHttpInterceptor +import org.jetbrains.ai.tracy.core.interceptors.OpenTelemetryOkHttpInterceptor import org.jetbrains.ai.tracy.core.TracingManager -import org.jetbrains.ai.tracy.core.patchInterceptors +import org.jetbrains.ai.tracy.core.interceptors.patchInterceptors import org.jetbrains.ai.tracy.gemini.adapters.GeminiLLMTracingAdapter import okhttp3.Interceptor import okhttp3.OkHttpClient diff --git a/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt b/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt index 2b8906f8b..8e87a23c5 100644 --- a/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt +++ b/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorHttpClient.kt @@ -5,11 +5,6 @@ package org.jetbrains.ai.tracy.ktor -import org.jetbrains.ai.tracy.core.TracingManager -import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse -import org.jetbrains.ai.tracy.core.http.protocol.asRequestBody -import org.jetbrains.ai.tracy.core.http.protocol.asRequestView import io.ktor.client.* import io.ktor.client.plugins.api.* import io.ktor.client.request.* @@ -33,12 +28,14 @@ import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.InternalSerializationApi import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json -import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.JsonPrimitive import kotlinx.serialization.json.jsonObject 
import kotlinx.serialization.serializer import mu.KotlinLogging -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequestBody +import org.jetbrains.ai.tracy.core.TracingManager +import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter +import org.jetbrains.ai.tracy.core.http.parsers.SseParser +import org.jetbrains.ai.tracy.core.http.parsers.UTF8Decoder +import org.jetbrains.ai.tracy.core.http.protocol.* import kotlin.reflect.full.hasAnnotation import kotlin.reflect.full.starProjectedType @@ -158,14 +155,13 @@ fun instrument(client: HttpClient, adapter: LLMTracingAdapter): HttpClient { private class TracingPlugin(private val adapter: LLMTracingAdapter) { private val httpSpanKey = AttributeKey("HttpSpanKey") private val tracingEnabledKey = AttributeKey("TracingEnabledKey") - private val isStreamingRequestKey = AttributeKey("IsStreamingRequestKey") @OptIn(InternalAPI::class, InternalIoApi::class) fun setup(config: HttpClientConfig<*>) { val tracer = TracingManager.tracer // duplicate plugins are ignored by the API implementation - config.install(createClientPlugin("NetworkParamsPlugin") { + config.install(createClientPlugin("TracyInterceptingPlugin") { onRequest { request, _ -> val tracingEnabled = TracingManager.isTracingEnabled request.attributes.put(tracingEnabledKey, tracingEnabled) @@ -173,7 +169,7 @@ private class TracingPlugin(private val adapter: LLMTracingAdapter) { return@onRequest } - val span = tracer.spanBuilder("http-client-span").startSpan() + val span = tracer.spanBuilder(adapter.getSpanName()).startSpan() span.makeCurrent().use { request.attributes.put(httpSpanKey, span) @@ -204,107 +200,195 @@ private class TracingPlugin(private val adapter: LLMTracingAdapter) { url = request.url.toProtocolUrl(), method = request.method.value, ) - - request.attributes.put(isStreamingRequestKey, value = adapter.isStreamingRequest(tracyRequest)) adapter.registerRequest(span, tracyRequest) } } + // processes any response (i.e., application/json) except for 
SSE responses (i.e., text/event-stream) onResponse { response -> val enabled = response.call.request.attributes[tracingEnabledKey] - if (!enabled) return@onResponse - val isStreamingRequest = response.call.request.attributes.getOrNull(isStreamingRequestKey) - ?: return@onResponse + if (!enabled) { + return@onResponse + } + // don't register SSE responses here + val mimeType = response.contentType()?.toContentType()?.mimeType + if (mimeType == "text/event-stream") { + return@onResponse + } val span = response.call.request.attributes.getOrNull(httpSpanKey) ?: return@onResponse - if (isStreamingRequest) return@onResponse - - - // when the content type is `application/json`, we decode the response body; - // otherwise, (e.g., when the body is binary), we pass an empty JSON object as the response body. - val responseBody = when (response.contentType()?.withoutParameters()) { - ContentType.Application.Json -> try { - val body = run { - // peek the response body to avoid consuming the underlying channel - // NOTE: we must first peek and only then await. 
- // otherwise there are cases when an empty body gets peeked - val peeked = response.rawContent.readBuffer.peek() - response.rawContent.awaitContent(Int.MAX_VALUE) - peeked.request(Long.MAX_VALUE) - val buffer = Buffer() - buffer.write(peeked, peeked.buffer.size) - buffer.readString() - } - Json.parseToJsonElement(body).jsonObject - } catch (err: Exception) { - logger.trace("Error while parsing response body", err) - JsonObject(emptyMap()) - } - else -> { - JsonObject(emptyMap()) - } - } - adapter.registerResponse(span, response = response.asResponseView(responseBody)) + adapter.registerResponse(span, response = response.asResponseView()) span.end() } + // processes ONLY SSE responses (i.e., text/event-stream) transformResponseBody { response, content, typeInfo -> val enabled = response.call.request.attributes[tracingEnabledKey] - if (!enabled) return@transformResponseBody null - - val isStreamingRequest = response.call.request.attributes.getOrNull(isStreamingRequestKey) - ?: return@transformResponseBody null + if (!enabled) { + return@transformResponseBody null + } + // skip non-SSE responses + val mimeType = response.contentType()?.toContentType()?.mimeType + if (mimeType != "text/event-stream") { + return@transformResponseBody null + } val span = response.call.request.attributes.getOrNull(httpSpanKey) ?: return@transformResponseBody null - if (!isStreamingRequest) { - return@transformResponseBody null + adapter.registerResponse(span, response = response.asResponseView()) + + return@transformResponseBody when { + typeInfo.type == ByteReadChannel::class -> { + // trace SSE events and register them in the adapter one by one + val tracingChannel = traceServerSentEvents( + response = response, + originalBody = content, + span = span, + ) + tracingChannel + } + else -> null } + } + }) + } - val body = JsonObject(mapOf("stream" to JsonPrimitive(true))) - // registering response attributes into span - adapter.registerResponse(span, response = 
response.asResponseView(body)) - - val originalBody: ByteReadChannel = content - val tracingChannel = ByteChannel(autoFlush = true) - val capturedText = StringBuilder() - - CoroutineScope(response.coroutineContext).launch(start = CoroutineStart.UNDISPATCHED) { - try { - val buffer = ByteArray(DEFAULT_BUFFER_SIZE) - while (!originalBody.isClosedForRead) { - val bytesRead = originalBody.readAvailable(buffer, 0, buffer.size) - if (bytesRead == -1) break - if (bytesRead > 0) { - capturedText.append(buffer.decodeToString(0, bytesRead)) - tracingChannel.writeFully(buffer, 0, bytesRead) - tracingChannel.flush() - } - } - } catch (e: Exception) { - span.setStatus(StatusCode.ERROR) - span.recordException(e) - if (!tracingChannel.isClosedForWrite) tracingChannel.close(e) - } finally { - try { - adapter.handleStreaming( - span = span, - url = response.request.url.toProtocolUrl(), - events = capturedText.toString() - ) - } finally { - span.end() - if (!tracingChannel.isClosedForWrite) tracingChannel.close() + /** + * Reads available bytes from [originalBody] and feeds them into an instance of [SseParser] + * that registers SSE events into [adapter]. + * + * The caller MUST ensure that the underlying response is indeed of `text/event-stream` MIME type. 
+ */ + private fun traceServerSentEvents( + response: HttpResponse, + originalBody: ByteReadChannel, + span: Span, + ): ByteChannel { + // trace SSE events and register them in the adapter one by one + val tracingChannel = ByteChannel(autoFlush = true) + + val url = response.request.url.toProtocolUrl() + val sseParser = SseParser { event -> + // trace SSE event in adapter + adapter.registerResponseStreamEvent(span, url, event) + } + + CoroutineScope(response.coroutineContext).launch(start = CoroutineStart.UNDISPATCHED) { + try { + val utf8Decoder = UTF8Decoder() + val buffer = ByteArray(DEFAULT_BUFFER_SIZE) + + while (!originalBody.isClosedForRead) { + val bytesRead = originalBody.readAvailable(buffer, 0, buffer.size) + if (bytesRead == -1) { + break + } + if (bytesRead > 0) { + val utf8Input = utf8Decoder.decode(buffer, bytesRead, endOfInput = false) + if (utf8Input.isNotEmpty()) { + sseParser.feed(utf8Input) } + + // forward unmodified bytes + tracingChannel.writeFully(buffer, 0, bytesRead) + tracingChannel.flush() + } + } + + // perform final decode and flush to handle any remaining bytes + val remainingUtf8Inputs = listOf( + // decode the remaining bytes in the buffer + utf8Decoder.decode(byteArrayOf(), 0, endOfInput = true), + // flush the remaining bytes in the buffer + utf8Decoder.flush(), + ) + for (input in remainingUtf8Inputs) { + if (input.isNotEmpty()) { + sseParser.feed(input) } } - if (typeInfo.type != ByteReadChannel::class) null else tracingChannel + } catch (e: Exception) { + span.setStatus(StatusCode.ERROR) + span.recordException(e) + if (!tracingChannel.isClosedForWrite) { + tracingChannel.close(e) + } + } finally { + sseParser.close() + if (!tracingChannel.isClosedForWrite) { + tracingChannel.close() + } + // close the span after all operations + span.end() } - }) + } + + return tracingChannel } - private fun HttpResponse.asResponseView(body: JsonObject): TracyHttpResponse = TracyHttpResponseView(response = this, body) +
@OptIn(InternalIoApi::class, InternalAPI::class) + private suspend fun HttpResponse.asResponseView(): TracyHttpResponse { + val response = this + val responseBody = when (response.contentType()?.withoutParameters()) { + ContentType.Application.Json -> try { + // peek the response body to avoid consuming the underlying channel + val responseString = run { + /** + * CRITICAL ORDERING REQUIREMENT: peek() MUST be called BEFORE awaitContent() + * + * We must first peek and only then await; otherwise, + * there are cases when an empty body gets peeked. + * + * WHY THIS ORDER MATTERS (relies on Ktor ByteReadChannel internals): + * + * 1. peek() creates a kotlinx-io Source that captures a snapshot reference + * to the current state of ByteReadChannel's internal readBuffer + * + * 2. awaitContent() suspends until data is available, triggering async network I/O + * that fills the underlying buffer with response data + * + * 3. The peek source can only read data if it was created BEFORE the buffer + * was filled. 
The peek() call essentially registers: "I want to observe + * whatever gets written to this buffer from now on" + * + * WHAT BREAKS IF ORDER IS SWAPPED (await → peek → request): + * + * - awaitContent() fills the buffer with response data via async I/O + * - By the time peek() is called, the buffer state has already been modified + * - peek() captures a post-modification buffer state, creating a race condition: + * * Other operations may have already consumed the data + * * Buffer may be in an inconsistent state + * * ByteReadChannel is designed for single-consumer scenarios + * - Result: peeked.request() reads an empty/inconsistent buffer even though + * data was successfully received from the network + * + * This is non-obvious because: + * - The failure is a race condition that depends on async timing + * - Ktor's ByteReadChannel buffer state changes are internal implementation details + * - The kotlinx-io Source.peek() documentation doesn't explicitly document + * this ordering requirement with async I/O operations + */ + val peeked = response.rawContent.readBuffer.peek() + response.rawContent.awaitContent(Int.MAX_VALUE) + peeked.request(Long.MAX_VALUE) + + val buffer = Buffer() + buffer.write(peeked, peeked.buffer.size) + buffer.readString() + } + val json = Json.parseToJsonElement(responseString).jsonObject + TracyHttpResponseBody.Json(json) + } catch (exception: Exception) { + logger.trace("Error while parsing response body", exception) + TracyHttpResponseBody.Empty + } + ContentType.Text.EventStream -> TracyHttpResponseBody.EventStream + else -> TracyHttpResponseBody.Empty + } + + return TracyHttpResponseView(response, body = responseBody) + } /** * Helper function to serialize `@Serializable` objects with an unknown type diff --git a/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorProtocolAdapters.kt b/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorProtocolAdapters.kt index 9e7d2ae9f..cd0ae39d6 100644 --- 
a/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorProtocolAdapters.kt +++ b/tracing/ktor/src/jvmMain/kotlin/org/jetbrains/ai/tracy/ktor/KtorProtocolAdapters.kt @@ -5,22 +5,12 @@ package org.jetbrains.ai.tracy.ktor -import org.jetbrains.ai.tracy.core.http.protocol.TracyContentType -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponseBody -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpUrl -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpUrlImpl -import org.jetbrains.ai.tracy.core.http.protocol.TracyQueryParameters -import io.ktor.client.statement.HttpResponse -import io.ktor.client.statement.request -import io.ktor.http.URLBuilder +import io.ktor.client.statement.* +import io.ktor.http.* +import org.jetbrains.ai.tracy.core.http.protocol.* import io.ktor.http.Url as KtorUrl -import io.ktor.http.charset -import io.ktor.http.contentType -import io.ktor.http.isSuccess -import kotlinx.serialization.json.JsonObject -internal fun io.ktor.http.ContentType.toContentType(): TracyContentType { +internal fun ContentType.toContentType(): TracyContentType { val contentType = this return object : TracyContentType { override val type = contentType.contentType @@ -32,16 +22,17 @@ internal fun io.ktor.http.ContentType.toContentType(): TracyContentType { } internal class TracyHttpResponseView( - private val response: HttpResponse, - body: JsonObject, + response: HttpResponse, + override val body: TracyHttpResponseBody, ) : TracyHttpResponse { + private val isError = response.status.isSuccess().not() + override val contentType = response.contentType()?.toContentType() override val code = response.status.value - override val body = TracyHttpResponseBody.Json(body) override val url = response.request.url.toProtocolUrl() override val requestMethod = response.request.method.value.uppercase() - override fun isError() = response.status.isSuccess().not() + override fun 
isError() = isError } internal fun URLBuilder.toProtocolUrl(): TracyHttpUrl { @@ -58,6 +49,7 @@ internal fun URLBuilder.toProtocolUrl(): TracyHttpUrl { host = builder.host, pathSegments = builder.pathSegments, parameters = params, + url = builder.buildString(), ) } @@ -74,5 +66,6 @@ internal fun KtorUrl.toProtocolUrl(): TracyHttpUrl { host = url.host, pathSegments = url.segments, parameters = params, + url = url.toString(), ) } diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/OpenAILLMTracingAdapter.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/OpenAILLMTracingAdapter.kt index f6e9726f9..73361b603 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/OpenAILLMTracingAdapter.kt +++ b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/OpenAILLMTracingAdapter.kt @@ -5,22 +5,24 @@ package org.jetbrains.ai.tracy.openai.adapters +import io.opentelemetry.api.trace.Span +import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GenAiSystemIncubatingValues +import kotlinx.serialization.json.jsonObject +import mu.KotlinLogging import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter import org.jetbrains.ai.tracy.core.adapters.handlers.EndpointApiHandler import org.jetbrains.ai.tracy.core.adapters.media.MediaContentExtractorImpl -import org.jetbrains.ai.tracy.core.http.protocol.* +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent +import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest +import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse +import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpUrl +import org.jetbrains.ai.tracy.core.http.protocol.asJson import org.jetbrains.ai.tracy.openai.adapters.handlers.ChatCompletionsOpenAIApiEndpointHandler import org.jetbrains.ai.tracy.openai.adapters.handlers.OpenAIApiUtils import org.jetbrains.ai.tracy.openai.adapters.handlers.ResponsesOpenAIApiEndpointHandler 
import org.jetbrains.ai.tracy.openai.adapters.handlers.images.ImagesCreateEditOpenAIApiEndpointHandler import org.jetbrains.ai.tracy.openai.adapters.handlers.images.ImagesCreateOpenAIApiEndpointHandler import org.jetbrains.ai.tracy.openai.adapters.handlers.videos.VideosOpenAIApiEndpointHandler -import io.opentelemetry.api.trace.Span -import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GenAiSystemIncubatingValues -import kotlinx.serialization.json.boolean -import kotlinx.serialization.json.jsonObject -import kotlinx.serialization.json.jsonPrimitive -import mu.KotlinLogging import java.util.concurrent.ConcurrentHashMap @@ -95,32 +97,22 @@ class OpenAILLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIncub override fun getResponseBodyAttributes(span: Span, response: TracyHttpResponse) { val handler = handlerFor(response.url) - OpenAIApiUtils.setCommonResponseAttributes(span, response) + response.body.asJson()?.jsonObject?.let { + OpenAIApiUtils.setCommonResponseAttributes(span, response = it) + } handler.handleResponseAttributes(span, response) } - override fun getSpanName(request: TracyHttpRequest) = "OpenAI-generation" - - override fun isStreamingRequest(request: TracyHttpRequest): Boolean { - return when (request.body) { - is TracyHttpRequestBody.FormData -> { - val data = request.body.asFormData() ?: return false - data.parts.filter { it.name == "stream" }.any { - val value = it.content.toString(it.contentType?.charset() ?: Charsets.UTF_8) - value.toBooleanStrictOrNull() ?: false - } - } - is TracyHttpRequestBody.Json -> { - val body = request.body.asJson()?.jsonObject ?: return false - body["stream"]?.jsonPrimitive?.boolean ?: false - } - is TracyHttpRequestBody.Empty -> false - } - } + override fun getSpanName() = "OpenAI-generation" - override fun handleStreaming(span: Span, url: TracyHttpUrl, events: String) { + override fun registerResponseStreamEvent( + span: Span, + url: TracyHttpUrl, + event: SseEvent, + index: Long, + ): 
Result { val handler = handlerFor(url) - handler.handleStreaming(span, events) + return handler.handleStreamingEvent(span, event, index) } /** diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ChatCompletionsOpenAIApiEndpointHandler.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ChatCompletionsOpenAIApiEndpointHandler.kt index dbe646149..45f5b5807 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ChatCompletionsOpenAIApiEndpointHandler.kt +++ b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ChatCompletionsOpenAIApiEndpointHandler.kt @@ -5,36 +5,23 @@ package org.jetbrains.ai.tracy.openai.adapters.handlers +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.sdk.trace.ReadableSpan +import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GEN_AI_USAGE_INPUT_TOKENS +import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GEN_AI_USAGE_OUTPUT_TOKENS +import kotlinx.serialization.json.* import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter.Companion.PayloadType import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter.Companion.populateUnmappedAttributes import org.jetbrains.ai.tracy.core.adapters.handlers.EndpointApiHandler -import org.jetbrains.ai.tracy.core.adapters.media.MediaContent -import org.jetbrains.ai.tracy.core.adapters.media.MediaContentExtractor -import org.jetbrains.ai.tracy.core.adapters.media.MediaContentPart -import org.jetbrains.ai.tracy.core.adapters.media.Resource -import org.jetbrains.ai.tracy.core.adapters.media.isValidUrl +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingFailure +import org.jetbrains.ai.tracy.core.adapters.media.* +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent import 
org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse import org.jetbrains.ai.tracy.core.http.protocol.asJson -import org.jetbrains.ai.tracy.core.policy.ContentKind -import org.jetbrains.ai.tracy.core.policy.contentTracingAllowed -import org.jetbrains.ai.tracy.core.policy.orRedacted -import org.jetbrains.ai.tracy.core.policy.orRedactedInput -import org.jetbrains.ai.tracy.core.policy.orRedactedOutput -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.StatusCode -import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GEN_AI_USAGE_INPUT_TOKENS -import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GEN_AI_USAGE_OUTPUT_TOKENS -import kotlinx.serialization.json.Json -import kotlinx.serialization.json.JsonArray -import kotlinx.serialization.json.JsonElement -import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.JsonPrimitive -import kotlinx.serialization.json.boolean -import kotlinx.serialization.json.intOrNull -import kotlinx.serialization.json.jsonArray -import kotlinx.serialization.json.jsonObject -import kotlinx.serialization.json.jsonPrimitive +import org.jetbrains.ai.tracy.core.policy.* /** @@ -199,39 +186,59 @@ internal class ChatCompletionsOpenAIApiEndpointHandler( span.populateUnmappedAttributes(body, mappedAttributes, PayloadType.RESPONSE) } - override fun handleStreaming(span: Span, events: String): Unit = runCatching { - var role: String? = null - val out = buildString { - for (line in events.lineSequence()) { - if (!line.startsWith("data:")) { - continue - } - val data = line.removePrefix("data:").trim() + /** + * In chat completions, assistant message content arrives by deltas. + * Here, we accumulate deltas into the span by appending new deltas at + * the end of the previously assigned content attribute. 
+ */ + override fun handleStreamingEvent( + span: Span, + event: SseEvent, + index: Long, + ): Result = runCatching { + val data = runCatching { + Json.parseToJsonElement(event.data).jsonObject + }.getOrElse { err -> + return sseHandlingFailure("Cannot parse event data as JSON: ${err.message}") + } - val event = runCatching { - Json.parseToJsonElement(data).jsonObject - }.getOrNull() ?: continue + val choice = data["choices"]?.jsonArray?.firstOrNull()?.jsonObject + ?: return sseHandlingFailure("Event's JSON has no 'choices' field") - val choice = event["choices"]?.jsonArray?.firstOrNull()?.jsonObject ?: continue - val delta = choice["delta"]?.jsonObject ?: continue + val delta = choice["delta"]?.jsonObject + ?: return sseHandlingFailure("Event's 'choices' field has no 'delta' field") - if (role == null) { - role = delta["role"]?.jsonPrimitive?.content - } - delta["content"]?.jsonPrimitive?.content?.let { append(it) } - } + val role = delta["role"]?.jsonPrimitive?.content + val contentDelta = delta["content"]?.jsonPrimitive?.content + + if (!role.isNullOrEmpty()) { + span.setAttribute("gen_ai.completion.0.role", role) } - if (out.isNotEmpty()) { - val kind = kindByRole(role) - span.setAttribute("gen_ai.completion.0.content", out.orRedacted(kind)) + val alreadyInstalledRole = (span as? ReadableSpan)?.attributes + ?.get(AttributeKey.stringKey("gen_ai.completion.0.role")) ?: role + + // concatenate already traced deltas with the new one and install as content if content tracing allowed; + // otherwise, when tracing is disallowed, redact an empty string to derive '[REDACTED]' + val contentKind = kindByRole(alreadyInstalledRole) + val tracingAllowed = contentTracingAllowed(contentKind) + + if (!contentDelta.isNullOrEmpty() && tracingAllowed) { + val previousDeltas = (span as? 
ReadableSpan)?.attributes + ?.get(AttributeKey.stringKey("gen_ai.completion.0.content")) ?: "" + val content = previousDeltas + contentDelta + + span.setAttribute("gen_ai.completion.0.content", content.orRedacted(contentKind)) + } else if (!tracingAllowed) { + // assign empty redacted string + span.setAttribute("gen_ai.completion.0.content", "".orRedacted(contentKind)) } - role?.let { span.setAttribute("gen_ai.completion.0.role", it) } - return@runCatching - }.getOrElse { exception -> + return Result.success(Unit) + }.getOrElse { err -> span.setStatus(StatusCode.ERROR) - span.recordException(exception) + span.recordException(err) + return sseHandlingFailure("Failed to handle streaming event: ${err.message}") } /** diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/OpenAIApiUtils.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/OpenAIApiUtils.kt index 879c1976f..da22947e4 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/OpenAIApiUtils.kt +++ b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/OpenAIApiUtils.kt @@ -5,12 +5,11 @@ package org.jetbrains.ai.tracy.openai.adapters.handlers -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest -import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse -import org.jetbrains.ai.tracy.core.http.protocol.asJson import io.opentelemetry.api.trace.Span import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.* import kotlinx.serialization.json.* +import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest +import org.jetbrains.ai.tracy.core.http.protocol.asJson /** * Common utilities for OpenAI API handling @@ -23,19 +22,17 @@ internal object OpenAIApiUtils { fun setCommonRequestAttributes(span: Span, request: TracyHttpRequest) { val body = request.body.asJson()?.jsonObject ?: return - body["temperature"]?.let { 
span.setAttribute(GEN_AI_REQUEST_TEMPERATURE, it.jsonPrimitive.doubleOrNull) } - body["model"]?.let { span.setAttribute(GEN_AI_REQUEST_MODEL, it.jsonPrimitive.content) } + body["temperature"]?.jsonPrimitive?.doubleOrNull?.let { span.setAttribute(GEN_AI_REQUEST_TEMPERATURE, it) } + body["model"]?.jsonPrimitive?.content?.let { span.setAttribute(GEN_AI_REQUEST_MODEL, it) } } /** * Sets common response attributes (id, model, object type) */ - fun setCommonResponseAttributes(span: Span, response: TracyHttpResponse) { - val body = response.body.asJson()?.jsonObject ?: return - - body["id"]?.let { span.setAttribute(GEN_AI_RESPONSE_ID, it.jsonPrimitive.content) } - body["object"]?.let { span.setAttribute(GEN_AI_OPERATION_NAME, it.jsonPrimitive.content) } - body["model"]?.let { span.setAttribute(GEN_AI_RESPONSE_MODEL, it.jsonPrimitive.content) } + fun setCommonResponseAttributes(span: Span, response: JsonObject) { + response["id"]?.jsonPrimitive?.content?.let { span.setAttribute(GEN_AI_RESPONSE_ID, it) } + response["object"]?.jsonPrimitive?.content?.let { span.setAttribute(GEN_AI_OPERATION_NAME, it) } + response["model"]?.jsonPrimitive?.content?.let { span.setAttribute(GEN_AI_RESPONSE_MODEL, it) } } } @@ -44,4 +41,4 @@ internal val JsonElement.asString: String is JsonArray -> this.jsonArray.toString() is JsonObject -> this.jsonObject.toString() is JsonPrimitive -> this.jsonPrimitive.content - } \ No newline at end of file + } diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ResponsesOpenAIApiEndpointHandler.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ResponsesOpenAIApiEndpointHandler.kt index e114fb552..1e95e6135 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ResponsesOpenAIApiEndpointHandler.kt +++ b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/ResponsesOpenAIApiEndpointHandler.kt @@ -20,6 +20,8 @@ 
import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.* import kotlinx.serialization.json.* +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingFailure +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent /** * Handler for OpenAI Responses API @@ -138,11 +140,42 @@ internal class ResponsesOpenAIApiEndpointHandler( */ override fun handleResponseAttributes(span: Span, response: TracyHttpResponse) { val body = response.body.asJson()?.jsonObject ?: return - OpenAIApiUtils.setCommonResponseAttributes(span, response) + OpenAIApiUtils.setCommonResponseAttributes(span, response = body) + parseResponseAttributes(span, response = body) + } + + override fun handleStreamingEvent( + span: Span, + event: SseEvent, + index: Long, + ): Result = runCatching { + val data = runCatching { + Json.parseToJsonElement(event.data).jsonObject + }.getOrNull() ?: return@runCatching sseHandlingFailure("Failed to parse SSE event") + + val type = data["type"]?.jsonPrimitive?.content + if (type == "response.completed") { + val response = data["response"]?.jsonObject + ?: return@runCatching sseHandlingFailure("Failed to parse response object") + + OpenAIApiUtils.setCommonResponseAttributes(span, response) + parseResponseAttributes(span, response) + + span.setAttribute("gen_ai.completion.0.finish_reason", "stop") + } + + return@runCatching Result.success(Unit) + }.getOrElse { exception -> + span.setStatus(StatusCode.ERROR) + span.recordException(exception) + + return@getOrElse sseHandlingFailure("Failed to handle streaming event: ${exception.message}") + } + private fun parseResponseAttributes(span: Span, response: JsonObject) { // we manually map `output` and `usage` attributes; // the rest of attributes get mapped by `populateUnmappedAttributes` below. 
- body["output"]?.let { outputs -> + response["output"]?.let { outputs -> for ((index, output) in outputs.jsonArray.withIndex()) { when (val type = output.jsonObject["type"]?.jsonPrimitive?.content) { "message", null -> { @@ -213,33 +246,11 @@ internal class ResponsesOpenAIApiEndpointHandler( } } - body["usage"]?.let { usage -> - setUsageAttributes(span, usage.jsonObject) + response["usage"]?.jsonObject?.let { usage -> + setUsageAttributes(span, usage) } - span.populateUnmappedAttributes(body, mappedAttributes, PayloadType.RESPONSE) - } - - override fun handleStreaming(span: Span, events: String): Unit = runCatching { - for (line in events.lineSequence()) { - if (!line.startsWith("data:")) continue - val data = line.removePrefix("data:").trim() - - val event = runCatching { - Json.parseToJsonElement(data).jsonObject - }.getOrNull() ?: continue - - val type = event["type"]?.jsonPrimitive?.content - if (type == "response.output_text.done") { - event["text"]?.jsonPrimitive?.content?.let { - span.setAttribute("gen_ai.completion.0.content", it.orRedactedOutput()) - span.setAttribute("gen_ai.completion.0.finish_reason", "stop") - } - } - } - }.getOrElse { exception -> - span.setStatus(StatusCode.ERROR) - span.recordException(exception) + span.populateUnmappedAttributes(response, mappedAttributes, PayloadType.RESPONSE) } /** diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateEditOpenAIApiEndpointHandler.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateEditOpenAIApiEndpointHandler.kt index cee08e3ae..2b7544c63 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateEditOpenAIApiEndpointHandler.kt +++ b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateEditOpenAIApiEndpointHandler.kt @@ -21,6 +21,8 @@ import 
io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes import kotlinx.serialization.json.Json import kotlinx.serialization.json.jsonObject import mu.KotlinLogging +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingFailure +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent import java.util.* /** @@ -119,24 +121,22 @@ internal class ImagesCreateEditOpenAIApiEndpointHandler( handleImageGenerationResponseAttributes(span, response, extractor) } - override fun handleStreaming(span: Span, events: String) { - for (line in events.lineSequence()) { - if (!line.startsWith("data:")) { - continue - } - val data = try { - Json.parseToJsonElement(line.removePrefix("data:").trim()).jsonObject - } catch (err: Exception) { - logger.trace("Failed to parse streaming data: '$line'", err) - null - } ?: continue - - handleStreamedImage( - span, data, extractor, - completedType = "image_edit.completed", - partialImageType = "image_edit.partial_image", - ) - } + override fun handleStreamingEvent( + span: Span, + event: SseEvent, + index: Long, + ): Result { + val data = runCatching { + Json.parseToJsonElement(event.data).jsonObject + }.getOrNull() ?: return sseHandlingFailure("Cannot parse event data as JSON") + + handleStreamedImage( + span, data, extractor, + completedType = "image_edit.completed", + partialImageType = "image_edit.partial_image", + ) + + return Result.success(Unit) } companion object { diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateOpenAIApiEndpointHandler.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateOpenAIApiEndpointHandler.kt index 9945ba068..2a0187be4 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateOpenAIApiEndpointHandler.kt +++ 
b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/images/ImagesCreateOpenAIApiEndpointHandler.kt @@ -17,6 +17,8 @@ import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GEN_AI_REQU import kotlinx.serialization.json.Json import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonPrimitive +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingFailure +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent /** * Extracts request/response bodies of Image Generation API. @@ -45,18 +47,21 @@ internal class ImagesCreateOpenAIApiEndpointHandler( handleImageGenerationResponseAttributes(span, response, extractor) } - override fun handleStreaming(span: Span, events: String) { - for (line in events.lineSequence()) { - if (!line.startsWith("data:")) { - continue - } - val data = Json.parseToJsonElement(line.removePrefix("data:").trim()).jsonObject + override fun handleStreamingEvent( + span: Span, + event: SseEvent, + index: Long, + ): Result { + val data = runCatching { + Json.parseToJsonElement(event.data).jsonObject + }.getOrNull() ?: return sseHandlingFailure("Cannot parse event data as JSON") - handleStreamedImage( - span, data, extractor, - completedType = "image_generation.completed", - partialImageType = "image_generation.partial_image", - ) - } + handleStreamedImage( + span, data, extractor, + completedType = "image_generation.completed", + partialImageType = "image_generation.partial_image", + ) + + return Result.success(Unit) } } \ No newline at end of file diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/videos/VideosOpenAIApiEndpointHandler.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/videos/VideosOpenAIApiEndpointHandler.kt index 2cf777ab4..e6e1246f2 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/videos/VideosOpenAIApiEndpointHandler.kt +++ 
b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/adapters/handlers/videos/VideosOpenAIApiEndpointHandler.kt @@ -8,7 +8,9 @@ package org.jetbrains.ai.tracy.openai.adapters.handlers.videos import io.opentelemetry.api.trace.Span import mu.KotlinLogging import org.jetbrains.ai.tracy.core.adapters.handlers.EndpointApiHandler +import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingUnsupported import org.jetbrains.ai.tracy.core.adapters.media.MediaContentExtractor +import org.jetbrains.ai.tracy.core.http.parsers.SseEvent import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpUrl @@ -56,10 +58,12 @@ internal class VideosOpenAIApiEndpointHandler( routeHandlers[route]?.handleResponse(span, response) } - override fun handleStreaming(span: Span, events: String) { - // Videos API doesn't support SSE streaming for creation - // Content download is binary streaming handled separately - logger.warn { "Videos API does not use server-sent events streaming" } + override fun handleStreamingEvent( + span: Span, + event: SseEvent, + index: Long, + ): Result { + return sseHandlingUnsupported() } /** diff --git a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/clients/OpenAIClient.kt b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/clients/OpenAIClient.kt index 0ebc270a8..b413ab638 100644 --- a/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/clients/OpenAIClient.kt +++ b/tracing/openai/src/jvmMain/kotlin/org/jetbrains/ai/tracy/openai/clients/OpenAIClient.kt @@ -5,8 +5,8 @@ package org.jetbrains.ai.tracy.openai.clients -import org.jetbrains.ai.tracy.core.OpenTelemetryOkHttpInterceptor -import org.jetbrains.ai.tracy.core.patchOpenAICompatibleClient +import org.jetbrains.ai.tracy.core.interceptors.OpenTelemetryOkHttpInterceptor +import 
org.jetbrains.ai.tracy.core.interceptors.patchOpenAICompatibleClient import org.jetbrains.ai.tracy.openai.adapters.OpenAILLMTracingAdapter import com.openai.client.OpenAIClient import org.jetbrains.ai.tracy.core.TracingManager