Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
91d4e2e
feat: generate SSE parser
Vladislav0Art Mar 19, 2026
a7a0068
feat: create tests for SSE parser (spec's example 3 is incorrect)
Vladislav0Art Mar 19, 2026
ace712a
fix: remove example 3 from sse parser tests
Vladislav0Art Mar 19, 2026
17c8897
feat: add comments to implementation of `SseParser`
Vladislav0Art Mar 19, 2026
1970d53
refactor
Vladislav0Art Mar 19, 2026
89a7f73
feat: add TODO in `SseParserTest`
Vladislav0Art Mar 22, 2026
a1fa4d9
feat: introduce `LLMTracingAdapter.handleStreamingEvent` method
Vladislav0Art Mar 22, 2026
aca23ba
feat: remove Tracy request param from adapter's `getSpanName`
Vladislav0Art Mar 22, 2026
836378c
feat: use `getSpanName` in Ktor and OkHttp interceptors
Vladislav0Art Mar 22, 2026
aedbba2
feat: identify streaming response by response body content type
Vladislav0Art Mar 22, 2026
4552ceb
feat: write tests with OpenAI SSE events
Vladislav0Art Mar 23, 2026
6dbfa47
feat: add explanation comments in `OpenTelemetryOkHttpInterceptor`
Vladislav0Art Mar 23, 2026
fb191a2
fix: correctly trace completion event of OpenAI Responses API
Vladislav0Art Mar 23, 2026
cb063cb
feat: rename Tracy SSE events count attribute key
Vladislav0Art Mar 23, 2026
d14c800
refactor: remove commented code
Vladislav0Art Mar 23, 2026
dd3afe3
refactor: imports
Vladislav0Art Mar 23, 2026
3c9cd9c
refactor: return `Result<Unit>` from SSE event handling methods
Vladislav0Art Mar 23, 2026
7a56f47
fix: avoid allocations of `AttributeKey`
Vladislav0Art Mar 23, 2026
70f9ea4
feat: unconditionally set content type in `registerResponse`
Vladislav0Art Mar 23, 2026
6cf0068
refactor: make `SseCapturingSource` private in `OpenTelemetryOkHttpIn…
Vladislav0Art Mar 23, 2026
677fa3d
fix: close SSE parser in Ktor interceptor
Vladislav0Art Mar 23, 2026
e18977f
fix: return index 0 in Chat Completions SSE event handling
Vladislav0Art Mar 23, 2026
94ec8d2
feat(`LLMTracingAdapter`): warn on SSE event parsing failure
Vladislav0Art Mar 23, 2026
c659c39
feat: remove TODO
Vladislav0Art Mar 23, 2026
5b719b3
fix: correctly trace assistant content for chat completions SSE events
Vladislav0Art Mar 23, 2026
7e4389e
feat: rename `gen_ai.response.streaming` -> `gen_ai.response.sse.stre…
Vladislav0Art Mar 23, 2026
e266704
feat: update KDoc of `registerResponseStreamEvent`
Vladislav0Art Mar 25, 2026
e45d61e
feat: call logger's debug instead of warn
Vladislav0Art Mar 25, 2026
a485d3d
refactor: fix mime type name in comments
Vladislav0Art Mar 25, 2026
69fbbe1
refactor: comment (color -> colon) in SseParser.kt
Vladislav0Art Mar 25, 2026
edf5f27
refactor: mention variable type `Long`
Vladislav0Art Mar 25, 2026
d7d5a83
refactor(`OpenAIApiUtils`): convert to `jsonPrimitive` and extract va…
Vladislav0Art Mar 25, 2026
4457403
refactor: fix usage example in `LLMTracingAdapter`'s KDoc
Vladislav0Art Mar 25, 2026
a4fc174
fix: prefix attribute `gen_ai.response.sse.streaming` with `tracy` in…
Vladislav0Art Mar 25, 2026
7a175f5
refactor(`AGENTS.md`): update comment about `LLMTracingAdapter`
Vladislav0Art Mar 25, 2026
6c15c2f
feat: change plugin name in `createClientPlugin` call
Vladislav0Art Mar 30, 2026
9cd1142
refactor: move SSE tracing in Ktor plugin into separate function
Vladislav0Art Mar 30, 2026
1829acd
feat: update Anthropic span name in accordance with GenAI
Vladislav0Art Mar 30, 2026
692a997
feat: add `asString` to `TracyHttpUrl`
Vladislav0Art Mar 30, 2026
18b34a9
feat: create `sseHandlingUnsupported` function
Vladislav0Art Mar 30, 2026
3b99009
refactor: remove redundant `?.`
Vladislav0Art Mar 30, 2026
fb90bc3
feat: use `sseHandlingUnsupported` in Anthropic adapter and Gemini ha…
Vladislav0Art Mar 30, 2026
c31dd93
feat: print warning about unsupported SSE event handling only once pe…
Vladislav0Art Mar 30, 2026
9264178
feat: call `getResponseBodyAttributes` for any non-SSE response type
Vladislav0Art Mar 30, 2026
85e9321
feat: re-implement `TracyHttpUrl.asString`
Vladislav0Art Mar 30, 2026
ed1625a
refactor: imports
Vladislav0Art Mar 30, 2026
9c762af
fix: check for nullness of `mimeType` in `LLMTracingAdapter`
Vladislav0Art Mar 30, 2026
023618b
feat(`VideosOpenAIApiEndpointHandler`): mark stream tracing as unsupp…
Vladislav0Art Mar 30, 2026
aff96b1
refactor: comment wording
Vladislav0Art Mar 30, 2026
d8ca934
refactor: comment wording
Vladislav0Art Mar 30, 2026
e5f1724
fix: set `retryValue` to `null` when dispatching
Vladislav0Art Mar 30, 2026
fd2636b
fix: use streaming UTF-8 CharsetDecoder in Ktor SSE tracing to handle…
Copilot Mar 30, 2026
95c4664
feat(`SseParser`): dispatch final event if any on close
Vladislav0Art Mar 30, 2026
5c140ef
fix: set `endOfInput` as `originalBody.isClosedForRead`
Vladislav0Art Mar 30, 2026
630ab43
refactor: imports
Vladislav0Art Mar 30, 2026
ff430c3
refactor: move call of `traceServerSentEvents` into `transformRespons…
Vladislav0Art Mar 30, 2026
c4c3d9a
feat: don't swallow exceptions in sse parsing
Vladislav0Art Apr 3, 2026
52e0bc0
feat: create stateful `UTF8Decoder`
Vladislav0Art Apr 3, 2026
f0ad2e0
feat(Ktor): use `UTF8Decoder` to parse SSE events
Vladislav0Art Apr 3, 2026
db91929
feat(`OpenTelemetryOkHttpInterceptor`): use `UTF8Decoder` to parse SS…
Vladislav0Art Apr 3, 2026
655486a
feat: make `sseHandlingUnsupportedWarningPrinted` an `AtomicBoolean`
Vladislav0Art Apr 3, 2026
5687846
feat: do the final decoding and flush remaining bytes
Vladislav0Art Apr 3, 2026
41e1184
feat: create a test suite for `UTF8Decoder` (WIP)
Vladislav0Art Apr 3, 2026
fdfebda
feat: add comment about `sseHandlingUnsupportedWarningPrinted`
Vladislav0Art Apr 3, 2026
f0befc2
feat(Ktor): generate comment on why `peek` should come before `awaitC…
Vladislav0Art Apr 3, 2026
0c9fe5e
feat: generate tests for UTF-8 decoder
Vladislav0Art Apr 7, 2026
a010de8
feat: close span at the end of `finally` block
Vladislav0Art Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Use existing providers as reference.

**Steps:**
1. Create `tracing/{provider}/` module, register it in `settings.gradle.kts`
2. Extend `LLMTracingAdapter(genAISystem)` — override `getRequestBodyAttributes`, `getResponseBodyAttributes`, `getSpanName`, `isStreamingRequest`, `handleStreaming`
2. Extend `LLMTracingAdapter(genAISystem)` — override `getRequestBodyAttributes`, `getResponseBodyAttributes`, `getSpanName`, and abstract `registerResponseStreamEvent`
3. If multiple distinct API endpoints exist, implement `EndpointApiHandler` per endpoint and delegate from the adapter
4. Add a public `instrument(client)` function — use `patchOpenAICompatibleClient()` for OpenAI-compatible SDKs, or reflection + `patchInterceptors()` for others (see `GeminiClient.kt`)
5. Write tests extending `BaseAITracingTest`, tag with `@Tag("{provider}")`
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
package org.jetbrains.ai.tracy.examples.clients

import org.jetbrains.ai.tracy.anthropic.adapters.AnthropicLLMTracingAdapter
import org.jetbrains.ai.tracy.core.OpenTelemetryOkHttpInterceptor
import org.jetbrains.ai.tracy.core.interceptors.OpenTelemetryOkHttpInterceptor
import org.jetbrains.ai.tracy.core.TracingManager
import org.jetbrains.ai.tracy.core.configureOpenTelemetrySdk
import org.jetbrains.ai.tracy.core.exporters.ConsoleExporterConfig
import org.jetbrains.ai.tracy.core.instrument
import org.jetbrains.ai.tracy.core.interceptors.instrument
import org.jetbrains.ai.tracy.gemini.adapters.GeminiLLMTracingAdapter
import org.jetbrains.ai.tracy.openai.adapters.OpenAILLMTracingAdapter
import com.openai.models.ChatModel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@

package org.jetbrains.ai.tracy.anthropic.adapters

import io.opentelemetry.api.trace.Span
import io.opentelemetry.sdk.trace.ReadableSpan
import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.*
import kotlinx.serialization.json.*
import mu.KotlinLogging
import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter
import org.jetbrains.ai.tracy.core.adapters.LLMTracingAdapter.Companion.PayloadType
import org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingUnsupported
import org.jetbrains.ai.tracy.core.adapters.media.*
import org.jetbrains.ai.tracy.core.http.parsers.SseEvent
import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest
import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse
import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpUrl
Expand All @@ -16,10 +23,6 @@ import org.jetbrains.ai.tracy.core.policy.ContentKind
import org.jetbrains.ai.tracy.core.policy.contentTracingAllowed
import org.jetbrains.ai.tracy.core.policy.orRedactedInput
import org.jetbrains.ai.tracy.core.policy.orRedactedOutput
import io.opentelemetry.api.trace.Span
import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.*
import kotlinx.serialization.json.*
import mu.KotlinLogging

/**
* Tracing adapter for Anthropic Claude API.
Expand Down Expand Up @@ -52,8 +55,8 @@ class AnthropicLLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIn
val body = request.body.asJson()?.jsonObject ?: return

body["temperature"]?.jsonPrimitive?.doubleOrNull?.let { span.setAttribute(GEN_AI_REQUEST_TEMPERATURE, it) }
body["model"]?.jsonPrimitive?.let { span.setAttribute(GEN_AI_REQUEST_MODEL, it.content) }
body["max_tokens"]?.jsonPrimitive?.intOrNull?.let { span.setAttribute(GEN_AI_REQUEST_MAX_TOKENS, it.toLong()) }
body["model"]?.jsonPrimitive?.content?.let { span.setAttribute(GEN_AI_REQUEST_MODEL, it) }
body["max_tokens"]?.jsonPrimitive?.longOrNull?.let { span.setAttribute(GEN_AI_REQUEST_MAX_TOKENS, it) }

// metadata
body["metadata"]?.jsonObject?.let { metadata ->
Expand Down Expand Up @@ -127,10 +130,28 @@ class AnthropicLLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIn
val body = response.body.asJson()?.jsonObject ?: return

body["id"]?.let { span.setAttribute(GEN_AI_RESPONSE_ID, it.jsonPrimitive.content) }
body["type"]?.let { span.setAttribute(GEN_AI_OUTPUT_TYPE, it.jsonPrimitive.content) }
body["type"]?.jsonPrimitive?.content?.let {
span.setAttribute(GEN_AI_OUTPUT_TYPE, it)
span.setAttribute(GEN_AI_OPERATION_NAME, it)
}
body["role"]?.let { span.setAttribute("gen_ai.response.role", it.jsonPrimitive.content) }
body["model"]?.let { span.setAttribute(GEN_AI_RESPONSE_MODEL, it.jsonPrimitive.content) }

// update the span name to follow GenAI Anthropic Conventions
// convention: `{gen_ai.operation.name} {gen_ai.request.model}`
// see: https://opentelemetry.io/docs/specs/semconv/gen-ai/anthropic/#spans
val spanName = run {
val type = body["type"]?.jsonPrimitive?.contentOrNull
val model = (span as? ReadableSpan)
?.attributes
?.get(GEN_AI_REQUEST_MODEL)
?: body["model"]
if (type != null && model != null) "$type $model" else null
}
if (spanName != null) {
span.updateName(spanName)
}

// collecting response messages
body["content"]?.let {
for ((index, message) in it.jsonArray.withIndex()) {
Expand Down Expand Up @@ -204,11 +225,26 @@ class AnthropicLLMTracingAdapter : LLMTracingAdapter(genAISystem = GenAiSystemIn
span.populateUnmappedAttributes(body, mappedAttributes, PayloadType.RESPONSE)
}

override fun getSpanName(request: TracyHttpRequest) = "Anthropic-generation"

// streaming is not supported
override fun isStreamingRequest(request: TracyHttpRequest) = false
override fun handleStreaming(span: Span, url: TracyHttpUrl, events: String) = Unit
/**
* Sets a default span name to **"Anthropic-generation"**.
*
* This name will be overridden in [getResponseBodyAttributes] to follow GenAI Conventions for Anthropic:
* ```
* {gen_ai.operation.name} {gen_ai.request.model}
* ```
*
* See [GenAI Anthropic Spans](https://opentelemetry.io/docs/specs/semconv/gen-ai/anthropic/#spans)
*/
override fun getSpanName() = "Anthropic-generation"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you, please, align span naming with GenAI conventions
Span names now follow {gen_ai.operation.name} {gen_ai.request.model}:
https://opentelemetry.io/docs/specs/semconv/gen-ai/anthropic/

Copy link
Copy Markdown
Collaborator Author

@Vladislav0Art Vladislav0Art Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 9fe5749

I didn't modify this "Anthropic-generation" name here because I have no information about the model and operation (both of which are only known at the stage of parsing the response body).

Notes:

  1. Now, it looks like:

See the trace.

Screenshot 2026-03-30 at 12 14 57
  1. Pointed to naming problem in: TRACY-114 Audit attribute names used in Tracy to comply with GenAI Spec


override fun registerResponseStreamEvent(
span: Span,
url: TracyHttpUrl,
event: SseEvent,
index: Long
): Result<Unit> {
return sseHandlingUnsupported()
}

/**
* Parses content of the `messages` field when its type is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
package org.jetbrains.ai.tracy.anthropic.clients

import org.jetbrains.ai.tracy.anthropic.adapters.AnthropicLLMTracingAdapter
import org.jetbrains.ai.tracy.core.OpenTelemetryOkHttpInterceptor
import org.jetbrains.ai.tracy.core.interceptors.OpenTelemetryOkHttpInterceptor
import org.jetbrains.ai.tracy.core.TracingManager
import org.jetbrains.ai.tracy.core.patchOpenAICompatibleClient
import org.jetbrains.ai.tracy.core.interceptors.patchOpenAICompatibleClient
import com.anthropic.client.AnthropicClient

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package org.jetbrains.ai.tracy.anthropic

import org.jetbrains.ai.tracy.anthropic.clients.instrument
import org.jetbrains.ai.tracy.core.TracingManager
import org.jetbrains.ai.tracy.core.patchOpenAICompatibleClient
import org.jetbrains.ai.tracy.core.interceptors.patchOpenAICompatibleClient
import org.jetbrains.ai.tracy.core.policy.ContentCapturePolicy
import com.anthropic.core.JsonString
import com.anthropic.core.JsonValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@

package org.jetbrains.ai.tracy.core.adapters

import io.opentelemetry.api.common.AttributeKey
import org.jetbrains.ai.tracy.core.http.protocol.*
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.sdk.trace.ReadableSpan
import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes.GEN_AI_SYSTEM
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.boolean
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import mu.KotlinLogging
import org.jetbrains.ai.tracy.core.adapters.handlers.sse.SseEventHandlingUnsupported
import org.jetbrains.ai.tracy.core.http.parsers.SseEvent
import java.util.concurrent.atomic.AtomicBoolean


/**
Expand All @@ -28,15 +32,17 @@ import kotlinx.serialization.json.jsonPrimitive
* Extend this class to create a provider-specific adapter:
* ```kotlin
* class AnthropicLLMTracingAdapter : LLMTracingAdapter(GenAiSystemIncubatingValues.ANTHROPIC) {
* override fun getRequestBodyAttributes(span: Span, request: Request) {
* override fun getSpanName() = "Anthropic-generation"
*
* override fun getRequestBodyAttributes(span: Span, request: TracyHttpRequest) {
* // Parse Anthropic-specific request format
* }
* override fun getResponseBodyAttributes(span: Span, response: Response) {
* override fun getResponseBodyAttributes(span: Span, response: TracyHttpResponse) {
* // Parse Anthropic-specific response format
* }
* override fun getSpanName(request: Request) = "Anthropic-generation"
* override fun isStreamingRequest(request: Request) = false
* override fun handleStreaming(span: Span, url: Url, events: String) = Unit
* override fun registerResponseStreamEvent(span: Span, url: TracyHttpUrl, event: SseEvent, index: Long): Result<Unit> {
* // Parse `event.data` JSON event (if successful, return `Result.success()`)
* }
* }
* ```
*
Expand All @@ -48,8 +54,19 @@ import kotlinx.serialization.json.jsonPrimitive
* @param genAISystem The name of the GenAI system (e.g., "openai", "anthropic", "gemini")
*/
abstract class LLMTracingAdapter(private val genAISystem: String) {
/**
* Some adapters do NOT support SSE (_Server-Sent Events_) handling, even if
* SSE is supported by the LLM provider API. In this case, we print a single
* warning per trace, not to pollute the logs with repeated warnings.
*
* This flag is used to track whether a warning has already been printed for
* the current trace.
*/
private var sseHandlingUnsupportedWarningPrinted = AtomicBoolean(false)

fun registerRequest(span: Span, request: TracyHttpRequest): Unit = runCatching {
span.updateName(getSpanName(request))
// new request -> new trace, so the warning can be printed once
sseHandlingUnsupportedWarningPrinted.set(false)

// Pre-allocate in case the span reaches the limit
span.setAttribute(DROPPED_ATTRIBUTES_COUNT_ATTRIBUTE_KEY, 0L)
Expand All @@ -66,30 +83,32 @@ abstract class LLMTracingAdapter(private val genAISystem: String) {

fun registerResponse(span: Span, response: TracyHttpResponse): Unit =
runCatching {
val body = response.body.asJson()?.jsonObject ?: return
val isStreamingRequest = body["stream"]?.jsonPrimitive?.boolean == true
val mimeType = response.contentType?.mimeType

if (mimeType != null) {
when {
isStreamingRequest && mimeType == TracyContentType.Text.EventStream.mimeType -> {
span.setAttribute("gen_ai.response.streaming", true)
span.setAttribute("gen_ai.completion.content.type", response.contentType?.asString())
}
mimeType != TracyContentType.Text.EventStream.mimeType -> {
// mime type can be application/json, video/mp4 (for OpenAI Video API), etc.
getResponseBodyAttributes(span, response)
}
else -> {
span.setAttribute("gen_ai.completion.content.type", response.contentType?.asString())
}
val contentType = response.contentType
// install the content type of the response body
contentType?.asString()?.let {
span.setAttribute("gen_ai.completion.content.type", it)
}

// set response body attributes only for non-stream content types;
// stream events are handled by `registerResponseStreamEvent`
val mimeType = contentType?.mimeType
when {
mimeType != null && mimeType != TracyContentType.Text.EventStream.mimeType -> {
// register any non-SSE stream response types (application/json, video/mp4, etc.)
getResponseBodyAttributes(span, response)
}
mimeType == TracyContentType.Text.EventStream.mimeType -> {
span.setAttribute("tracy.response.sse.streaming", true)
}
else -> {
logger.debug { "Unsupported content type in LLMTracingAdapter: ${contentType?.asString()}" }
}
}

span.setAttribute("http.status_code", response.code.toLong())

if (response.isError()) {
getResponseErrorBodyAttributes(span, response.body)
getResponseErrorBodyAttributes(span, response)
span.setStatus(StatusCode.ERROR)
} else {
span.setStatus(StatusCode.OK)
Expand All @@ -108,8 +127,14 @@ abstract class LLMTracingAdapter(private val genAISystem: String) {
span.recordException(exception)
}

protected open fun getResponseErrorBodyAttributes(span: Span, body: TracyHttpResponseBody) {
body.asJson()?.jsonObject["error"]?.jsonObject?.let { error ->
protected open fun getResponseErrorBodyAttributes(span: Span, response: TracyHttpResponse) {
// parse only `application/json` responses
if (response.contentType?.mimeType != TracyContentType.Application.Json.mimeType) {
return
}
val body = response.body.asJson()?.jsonObject ?: return

body["error"]?.jsonObject?.let { error ->
error["message"]?.jsonPrimitive?.let { span.setAttribute("gen_ai.error.message", it.content) }
error["type"]?.jsonPrimitive?.let { span.setAttribute("gen_ai.error.type", it.content) }
error["param"]?.jsonPrimitive?.let { span.setAttribute("gen_ai.error.param", it.content) }
Expand All @@ -120,12 +145,59 @@ abstract class LLMTracingAdapter(private val genAISystem: String) {
protected abstract fun getRequestBodyAttributes(span: Span, request: TracyHttpRequest)
protected abstract fun getResponseBodyAttributes(span: Span, response: TracyHttpResponse)

abstract fun getSpanName(request: TracyHttpRequest): String
abstract fun isStreamingRequest(request: TracyHttpRequest): Boolean
abstract fun handleStreaming(span: Span, url: TracyHttpUrl, events: String)
abstract fun getSpanName(): String

/**
* Registers a server-sent events (SSE) response event in the given [span].
*
* @param span The [Span] instance in which the response event is registered.
* @param url The [TracyHttpUrl] object representing the URL associated with this SSE event.
* @param event The [SseEvent] to be registered. It represents a single event from the SSE stream.
*/
fun registerResponseStreamEvent(span: Span, url: TracyHttpUrl, event: SseEvent) {
// factory method workflow:
// 1. extract the index of the current event from span (0 when missing)
// 2. delegate assigning to the implementation:
// - when assigned successfully, increment the index and store in span
val nextEventIndex: Long = (span as? ReadableSpan)?.attributes?.get(STREAM_EVENTS_COUNT_KEY) ?: 0L

val result = registerResponseStreamEvent(span, url, event, index = nextEventIndex)
if (result.isSuccess) {
// event was successfully assigned into span
span.setAttribute(STREAM_EVENTS_COUNT_KEY, nextEventIndex + 1)
} else if (result.isFailure) {
val exception = result.exceptionOrNull()
when {
// print unsupported warning only once per trace
exception is SseEventHandlingUnsupported && !sseHandlingUnsupportedWarningPrinted.getAndSet(true) -> {
logger.warn { "SSE event handling unsupported for ${url.asString()}" }
}
else -> logger.warn { "Failed to assign SSE event to span: $exception" }
}
}
}

/**
* Attempts to register a single SSE response event on the given [span].
*
* Implementations must:
* - return [Result.success] when the event has been successfully assigned to the span
* (for example, attributes or other data derived from [event] have been recorded)
* so that it can be counted towards the stream event index;
* - return [Result.failure] when the event cannot or should not be assigned
* (for example, due to parsing/validation errors or unsupported event type),
* optionally carrying a descriptive exception.
*
* A failure result prevents the caller from incrementing the stored SSE event index,
* and the contained exception (if any) will be logged (see `registerResponseStreamEvent(Span, TracyHttpUrl, SseEvent)`).
*/
protected abstract fun registerResponseStreamEvent(span: Span, url: TracyHttpUrl, event: SseEvent, index: Long): Result<Unit>

companion object {
private const val DROPPED_ATTRIBUTES_COUNT_ATTRIBUTE_KEY = "otel.dropped_attributes_count"
private val DROPPED_ATTRIBUTES_COUNT_ATTRIBUTE_KEY = AttributeKey.longKey("otel.dropped_attributes_count")
private val STREAM_EVENTS_COUNT_KEY = AttributeKey.longKey("tracy.response.sse.events.count")

private val logger = KotlinLogging.logger {}

/**
* Adds unmapped payload attributes from a JSON body to the given [Span].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ package org.jetbrains.ai.tracy.core.adapters.handlers
import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpRequest
import org.jetbrains.ai.tracy.core.http.protocol.TracyHttpResponse
import io.opentelemetry.api.trace.Span
import org.jetbrains.ai.tracy.core.http.parsers.SseEvent

/**
* Interface for endpoint API handlers used within adapters
*/
interface EndpointApiHandler {
fun handleRequestAttributes(span: Span, request: TracyHttpRequest)
fun handleResponseAttributes(span: Span, response: TracyHttpResponse)
fun handleStreaming(span: Span, events: String)
/**
* Returns success if the event was handled successfully, or an error otherwise.
*
* @see org.jetbrains.ai.tracy.core.adapters.handlers.sse.sseHandlingFailure
* @see org.jetbrains.ai.tracy.core.adapters.handlers.sse.SseEventHandlingException
*/
fun handleStreamingEvent(span: Span, event: SseEvent, index: Long): Result<Unit>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright © 2026 JetBrains s.r.o. and contributors.
* Use of this source code is governed by the Apache 2.0 license.
*/

package org.jetbrains.ai.tracy.core.adapters.handlers.sse

/**
* Wraps [SseEventHandlingException] into a [Result] indicating failure.
* Should be used when handling of SSE events failed.
*
* @see org.jetbrains.ai.tracy.core.adapters.handlers.EndpointApiHandler.handleStreamingEvent
* @see sseHandlingUnsupported
*/
fun sseHandlingFailure(message: String): Result<Unit> {
return Result.failure(SseEventHandlingException(message))
}

/**
* Wraps [SseEventHandlingUnsupported] into a [Result] indicating
* that SSE handling is not supported in this call site.
*
* @see sseHandlingFailure
*/
fun sseHandlingUnsupported(): Result<Unit> {
return Result.failure(SseEventHandlingUnsupported())
}

class SseEventHandlingException(message: String, cause: Throwable? = null) : RuntimeException(message, cause)

class SseEventHandlingUnsupported : RuntimeException("SSE event handling is not supported yet")
Loading