Skip to content

Commit 94da067

Browse files
authored
feat(llmobs): add user llm observability span processor to modify spans (#6329)
1 parent 38a72cc commit 94da067

File tree

9 files changed

+337
-51
lines changed

9 files changed

+337
-51
lines changed

docs/test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,27 @@ llmobs.enable({
603603
// manually disable
604604
llmobs.disable()
605605

606+
// register a processor
607+
llmobs.registerProcessor((llmobsSpan) => {
608+
const drop = llmobsSpan.getTag('drop')
609+
if (drop) {
610+
return null
611+
}
612+
613+
const redactInput = llmobsSpan.getTag('redactInput')
614+
if (redactInput) {
615+
llmobsSpan.input = llmobsSpan.input.map(input => {
616+
return {
617+
...input,
618+
}
619+
})
620+
}
621+
622+
return llmobsSpan
623+
})
624+
625+
llmobs.deregisterProcessor()
626+
606627
// trace block of code
607628
llmobs.trace({ name: 'name', kind: 'llm' }, () => {})
608629
llmobs.trace({ kind: 'llm', name: 'myLLM', modelName: 'myModel', modelProvider: 'myProvider' }, () => {})

index.d.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2566,6 +2566,25 @@ declare namespace tracer {
25662566
annotate (options: llmobs.AnnotationOptions): void
25672567
annotate (span: tracer.Span | undefined, options: llmobs.AnnotationOptions): void
25682568

2569+
/**
2570+
* Register a processor to be called on each LLMObs span.
2571+
*
2572+
* This can be used to modify the span before it is sent to LLMObs. For example, you can modify the input/output.
2573+
* You can also return `null` to omit the span entirely from being sent to LLM Observability.
2574+
*
2575+
* Otherwise, if the return value from the processor is not an instance of `LLMObservabilitySpan`, the span will be dropped.
2576+
*
2577+
* To deregister the processor, call `llmobs.deregisterProcessor()`
2578+
* @param processor A function that will be called for each span.
2579+
* @throws {Error} If a processor is already registered.
2580+
*/
2581+
registerProcessor (processor: ((span: LLMObservabilitySpan) => LLMObservabilitySpan | null)): void
2582+
2583+
/**
2584+
* Deregister a processor.
2585+
*/
2586+
deregisterProcessor (): void
2587+
25692588
/**
25702589
* Submits a custom evaluation metric for a given span ID and trace ID.
25712590
* @param spanContext The span context of the span to submit the evaluation metric for.
@@ -2579,6 +2598,25 @@ declare namespace tracer {
25792598
flush (): void
25802599
}
25812600

2601+
interface LLMObservabilitySpan {
2602+
/**
2603+
* The input content associated with the span.
2604+
*/
2605+
input: { content: string, role?: string }[]
2606+
2607+
/**
2608+
* The output content associated with the span.
2609+
*/
2610+
output: { content: string, role?: string }[]
2611+
2612+
/**
2613+
* Get a tag from the span.
2614+
* @param key The key of the tag to get.
2615+
* @returns The value of the tag, or `undefined` if the tag does not exist.
2616+
*/
2617+
getTag (key: string): string | undefined
2618+
}
2619+
25822620
interface EvaluationOptions {
25832621
/**
25842622
* The name of the evaluation metric

packages/dd-trace/src/llmobs/index.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const spanProcessCh = channel('dd-trace:span:process')
1616
const evalMetricAppendCh = channel('llmobs:eval-metric:append')
1717
const flushCh = channel('llmobs:writers:flush')
1818
const injectCh = channel('dd-trace:span:inject')
19+
const registerUserSpanProcessorCh = channel('llmobs:register-processor')
1920

2021
const LLMObsEvalMetricsWriter = require('./writers/evaluations')
2122
const LLMObsTagger = require('./tagger')
@@ -56,6 +57,7 @@ function enable (config) {
5657

5758
evalMetricAppendCh.subscribe(handleEvalMetricAppend)
5859
flushCh.subscribe(handleFlush)
60+
registerUserSpanProcessorCh.subscribe(handleRegisterProcessor)
5961

6062
// span processing
6163
spanProcessor = new LLMObsSpanProcessor(config)
@@ -86,6 +88,7 @@ function disable () {
8688
if (flushCh.hasSubscribers) flushCh.unsubscribe(handleFlush)
8789
if (spanProcessCh.hasSubscribers) spanProcessCh.unsubscribe(handleSpanProcess)
8890
if (injectCh.hasSubscribers) injectCh.unsubscribe(handleLLMObsParentIdInjection)
91+
if (registerUserSpanProcessorCh.hasSubscribers) registerUserSpanProcessorCh.unsubscribe(handleRegisterProcessor)
8992

9093
spanWriter?.destroy()
9194
evalWriter?.destroy()
@@ -126,6 +129,10 @@ function handleFlush () {
126129
telemetry.recordUserFlush(err)
127130
}
128131

132+
function handleRegisterProcessor (userSpanProcessor) {
133+
spanProcessor.setUserSpanProcessor(userSpanProcessor)
134+
}
135+
129136
function handleSpanProcess (data) {
130137
spanProcessor.process(data)
131138
}

packages/dd-trace/src/llmobs/sdk.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,16 @@ const LLMObsTagger = require('./tagger')
2323
const { channel } = require('dc-polyfill')
2424
const evalMetricAppendCh = channel('llmobs:eval-metric:append')
2525
const flushCh = channel('llmobs:writers:flush')
26+
const registerUserSpanProcessorCh = channel('llmobs:register-processor')
2627
const NoopLLMObs = require('./noop')
2728

2829
class LLMObs extends NoopLLMObs {
30+
/**
31+
* flag representing if a user span processor has been registered
32+
* @type {boolean}
33+
*/
34+
#hasUserSpanProcessor = false
35+
2936
constructor (tracer, llmobsModule, config) {
3037
super(tracer)
3138

@@ -309,6 +316,27 @@ class LLMObs extends NoopLLMObs {
309316
}
310317
}
311318

319+
registerProcessor (processor) {
320+
if (!this.enabled) return
321+
322+
if (this.#hasUserSpanProcessor) {
323+
throw new Error(
324+
'[LLMObs] Only one user span processor can be registered. ' +
325+
'To register a new processor, deregister the existing processor first using `llmobs.deregisterProcessor()`.'
326+
)
327+
}
328+
329+
this.#hasUserSpanProcessor = true
330+
registerUserSpanProcessorCh.publish(processor)
331+
}
332+
333+
deregisterProcessor () {
334+
if (!this.enabled) return
335+
336+
this.#hasUserSpanProcessor = false
337+
registerUserSpanProcessorCh.publish(null)
338+
}
339+
312340
submitEvaluation (llmobsSpanContext, options = {}) {
313341
if (!this.enabled) return
314342

0 commit comments

Comments
 (0)