Skip to content

Commit 80ea153

Browse files
sabrennerBridgeAR
andauthored
chore(llmobs): move span processing hook to on span finish instead of core processor (#6875)
* move llmobs span processing to span finish * fix "integration" test file --------- Co-authored-by: Ruben Bridgewater <ruben@bridgewater.de>
1 parent e760b3b commit 80ea153

File tree

10 files changed

+73
-65
lines changed

10 files changed

+73
-65
lines changed

packages/dd-trace/src/llmobs/index.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const telemetry = require('./telemetry')
1212
const LLMObsSpanProcessor = require('./span_processor')
1313

1414
const { channel } = require('dc-polyfill')
15-
const spanProcessCh = channel('dd-trace:span:process')
15+
const spanFinishCh = channel('dd-trace:span:finish')
1616
const evalMetricAppendCh = channel('llmobs:eval-metric:append')
1717
const flushCh = channel('llmobs:writers:flush')
1818
const injectCh = channel('dd-trace:span:inject')
@@ -62,7 +62,7 @@ function enable (config) {
6262
// span processing
6363
spanProcessor = new LLMObsSpanProcessor(config)
6464
spanProcessor.setWriter(spanWriter)
65-
spanProcessCh.subscribe(handleSpanProcess)
65+
spanFinishCh.subscribe(handleSpanProcess)
6666

6767
// distributed tracing for llmobs
6868
injectCh.subscribe(handleLLMObsParentIdInjection)
@@ -86,7 +86,7 @@ function enable (config) {
8686
function disable () {
8787
if (evalMetricAppendCh.hasSubscribers) evalMetricAppendCh.unsubscribe(handleEvalMetricAppend)
8888
if (flushCh.hasSubscribers) flushCh.unsubscribe(handleFlush)
89-
if (spanProcessCh.hasSubscribers) spanProcessCh.unsubscribe(handleSpanProcess)
89+
if (spanFinishCh.hasSubscribers) spanFinishCh.unsubscribe(handleSpanProcess)
9090
if (injectCh.hasSubscribers) injectCh.unsubscribe(handleLLMObsParentIdInjection)
9191
if (registerUserSpanProcessorCh.hasSubscribers) registerUserSpanProcessorCh.unsubscribe(handleRegisterProcessor)
9292

@@ -133,8 +133,8 @@ function handleRegisterProcessor (userSpanProcessor) {
133133
spanProcessor.setUserSpanProcessor(userSpanProcessor)
134134
}
135135

136-
function handleSpanProcess (data) {
137-
spanProcessor.process(data)
136+
function handleSpanProcess (span) {
137+
spanProcessor.process(span)
138138
}
139139

140140
function handleEvalMetricAppend (payload) {

packages/dd-trace/src/llmobs/span_processor.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class LLMObsSpanProcessor {
7272
}
7373

7474
// TODO: instead of relying on the tagger's weakmap registry, can we use some namespaced storage correlation?
75-
process ({ span }) {
75+
process (span) {
7676
if (!this.#config.llmobs.enabled) return
7777
// if the span is not in our private tagger map, it is not an llmobs span
7878
if (!LLMObsTagger.tagMap.has(span)) return

packages/dd-trace/src/span_processor.js

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ const { getEnvironmentVariable } = require('./config-helper')
99
const startedSpans = new WeakSet()
1010
const finishedSpans = new WeakSet()
1111

12-
const { channel } = require('dc-polyfill')
13-
const spanProcessCh = channel('dd-trace:span:process')
14-
1512
class SpanProcessor {
1613
constructor (exporter, prioritySampler, config) {
1714
this._exporter = exporter
@@ -57,8 +54,6 @@ class SpanProcessor {
5754
isChunkRoot = false
5855
this._stats?.onSpanFinished(formattedSpan)
5956
formatted.push(formattedSpan)
60-
61-
spanProcessCh.publish({ span })
6257
}
6358
}
6459

packages/dd-trace/test/llmobs/index.spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const proxyquire = require('proxyquire')
88

99
const AgentInfoExporter = require('../../src/exporters/common/agent-info-exporter')
1010

11-
const spanProcessCh = channel('dd-trace:span:process')
11+
const spanFinishCh = channel('dd-trace:span:finish')
1212
const evalMetricAppendCh = channel('llmobs:eval-metric:append')
1313
const flushCh = channel('llmobs:writers:flush')
1414
const injectCh = channel('dd-trace:span:inject')
@@ -260,7 +260,7 @@ describe('module', () => {
260260

261261
expect(injectCh.hasSubscribers).to.be.false
262262
expect(evalMetricAppendCh.hasSubscribers).to.be.false
263-
expect(spanProcessCh.hasSubscribers).to.be.false
263+
expect(spanFinishCh.hasSubscribers).to.be.false
264264
expect(flushCh.hasSubscribers).to.be.false
265265
})
266266
})

packages/dd-trace/test/llmobs/plugins/ai/index.spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ describe('Plugin', () => {
402402

403403
const toolCallId = result.steps[0].toolCalls[0].toolCallId
404404

405-
const { apmSpans, llmobsSpans } = await getEvents()
405+
const { apmSpans, llmobsSpans } = await getEvents(4)
406406

407407
let expectedFinalOutput
408408

@@ -565,7 +565,7 @@ describe('Plugin', () => {
565565
const steps = stepsPromise.status.value
566566
const toolCallId = steps[0].toolCalls[0].toolCallId
567567

568-
const { apmSpans, llmobsSpans } = await getEvents()
568+
const { apmSpans, llmobsSpans } = await getEvents(4)
569569

570570
let expectedFinalOutput
571571

packages/dd-trace/test/llmobs/plugins/langchain/index.spec.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ describe('integrations', () => {
409409

410410
await chain.invoke({ input: 'Can you tell me about LangSmith?' })
411411

412-
const { apmSpans, llmobsSpans } = await getEvents()
412+
const { apmSpans, llmobsSpans } = await getEvents(2)
413413

414414
const workflowSpan = apmSpans[0]
415415
const llmSpan = apmSpans[1]
@@ -497,7 +497,7 @@ describe('integrations', () => {
497497
})
498498
assert.ok(result)
499499

500-
const { apmSpans, llmobsSpans } = await getEvents()
500+
const { apmSpans, llmobsSpans } = await getEvents(5)
501501

502502
const topLevelWorkflow = apmSpans[0]
503503
const firstSubWorkflow = apmSpans[1]
@@ -604,7 +604,7 @@ describe('integrations', () => {
604604

605605
await chain.batch(['chickens', 'dogs'])
606606

607-
const { apmSpans, llmobsSpans } = await getEvents()
607+
const { apmSpans, llmobsSpans } = await getEvents(3)
608608

609609
const workflowSpan = apmSpans[0]
610610
const firstLLMSpan = apmSpans[1]
@@ -677,7 +677,7 @@ describe('integrations', () => {
677677
input: 'What is the powerhouse of the cell?'
678678
})
679679

680-
const { apmSpans, llmobsSpans } = await getEvents()
680+
const { apmSpans, llmobsSpans } = await getEvents(2)
681681

682682
const workflowSpan = apmSpans[0]
683683
const llmSpan = apmSpans[1]
@@ -760,7 +760,7 @@ describe('integrations', () => {
760760

761761
await chain.invoke({ foo: 'bar' })
762762

763-
const { apmSpans, llmobsSpans } = await getEvents()
763+
const { apmSpans, llmobsSpans } = await getEvents(3)
764764

765765
const workflowSpan = apmSpans[0]
766766
const taskSpan = apmSpans[1]
@@ -893,7 +893,7 @@ describe('integrations', () => {
893893
it('submits a retrieval span with a child embedding span for similaritySearch', async () => {
894894
await vectorstore.similaritySearch('Biology')
895895

896-
const { apmSpans, llmobsSpans } = await getEvents()
896+
const { apmSpans, llmobsSpans } = await getEvents(2)
897897

898898
// first call was for the embedding span in the beforeEach
899899
const retrievalSpanEvent = llmobsSpans[0]
@@ -918,7 +918,7 @@ describe('integrations', () => {
918918
it('submits a retrieval span with a child embedding span for similaritySearchWithScore', async () => {
919919
await vectorstore.similaritySearchWithScore('Biology')
920920

921-
const { apmSpans, llmobsSpans } = await getEvents()
921+
const { apmSpans, llmobsSpans } = await getEvents(2)
922922

923923
// first call was for the embedding span in the beforeEach
924924
const retrievalSpanEvent = llmobsSpans[0]

packages/dd-trace/test/llmobs/sdk/integration.spec.js

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const sinon = require('sinon')
66
const { useLlmObs, assertLlmObsSpanEvent } = require('../util')
77

88
const assert = require('node:assert')
9+
const agent = require('../../plugins/agent')
910

1011
function getTag (llmobsSpan, tagName) {
1112
const tag = llmobsSpan.tags.find(tag => tag.split(':')[0] === tagName)
@@ -37,7 +38,7 @@ describe('end to end sdk integration tests', () => {
3738

3839
assert.equal(result, 'boom')
3940

40-
const { apmSpans, llmobsSpans } = await getEvents()
41+
const { apmSpans, llmobsSpans } = await getEvents(2)
4142
assert.equal(apmSpans.length, 3)
4243
assert.equal(llmobsSpans.length, 2)
4344

@@ -86,7 +87,7 @@ describe('end to end sdk integration tests', () => {
8687

8788
agent('my custom input')
8889

89-
const { apmSpans, llmobsSpans } = await getEvents()
90+
const { apmSpans, llmobsSpans } = await getEvents(2)
9091
assert.equal(apmSpans.length, 3)
9192
assert.equal(llmobsSpans.length, 2)
9293

@@ -167,7 +168,7 @@ describe('end to end sdk integration tests', () => {
167168
llmobs.trace({ kind: 'workflow', name: 'child' }, () => {})
168169
})
169170

170-
const { llmobsSpans } = await getEvents()
171+
const { llmobsSpans } = await getEvents(2)
171172
assert.equal(llmobsSpans.length, 2)
172173

173174
assert.equal(getTag(llmobsSpans[0], 'ml_app'), 'test')
@@ -185,7 +186,7 @@ describe('end to end sdk integration tests', () => {
185186
llmobs.trace({ kind: 'workflow', name: 'child' }, () => {})
186187
})
187188

188-
const { llmobsSpans } = await getEvents()
189+
const { llmobsSpans } = await getEvents(2)
189190
assert.equal(llmobsSpans.length, 2)
190191

191192
assert.equal(getTag(llmobsSpans[0], 'ml_app'), 'span-level-ml-app')
@@ -213,7 +214,7 @@ describe('end to end sdk integration tests', () => {
213214
llmobs.trace({ kind: 'workflow', name: 'child-2' }, () => {})
214215
})
215216

216-
const { llmobsSpans } = await getEvents()
217+
const { llmobsSpans } = await getEvents(3)
217218
assert.equal(llmobsSpans.length, 3)
218219

219220
assert.equal(getTag(llmobsSpans[0], 'ml_app'), 'test')
@@ -291,6 +292,7 @@ describe('end to end sdk integration tests', () => {
291292

292293
beforeEach(() => {
293294
llmobs.registerProcessor(processor)
295+
agent.reset() // make sure llmobs requests are cleared
294296
})
295297

296298
it('does not submit the span', async () => {
@@ -333,7 +335,7 @@ describe('end to end sdk integration tests', () => {
333335
})
334336
})
335337

336-
const { llmobsSpans } = await getEvents()
338+
const { llmobsSpans } = await getEvents(2)
337339
assert.equal(llmobsSpans.length, 2)
338340

339341
assert.equal(llmobsSpans[0].meta.input.value, 'REDACTED')
@@ -357,7 +359,7 @@ describe('end to end sdk integration tests', () => {
357359
llmobs.trace({ kind: 'workflow', name: 'afterAnnotationContext' }, () => {})
358360
})
359361

360-
const { llmobsSpans } = await getEvents()
362+
const { llmobsSpans } = await getEvents(6)
361363
assert.equal(llmobsSpans.length, 6)
362364

363365
assert.equal(getTag(llmobsSpans[0], 'foo'), undefined)

packages/dd-trace/test/llmobs/span_processor.spec.js

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ describe('span processor', () => {
3838
it('should do nothing if llmobs is not enabled', () => {
3939
processor = new LLMObsSpanProcessor({ llmobs: { enabled: false } })
4040

41-
expect(() => processor.process({ span })).not.to.throw()
41+
expect(() => processor.process(span)).not.to.throw()
4242
})
4343

4444
it('should do nothing if the span is not an llm obs span', () => {
@@ -71,7 +71,7 @@ describe('span processor', () => {
7171
'_ml_obs.llmobs_parent_id': '1234'
7272
})
7373

74-
processor.process({ span })
74+
processor.process(span)
7575
const payload = writer.append.getCall(0).firstArg
7676

7777
expect(payload).to.deep.equal({
@@ -140,7 +140,7 @@ describe('span processor', () => {
140140
'_ml_obs.meta.metadata': metadata
141141
})
142142

143-
processor.process({ span })
143+
processor.process(span)
144144
const payload = writer.append.getCall(0).firstArg
145145

146146
expect(payload.meta.metadata).to.deep.equal({
@@ -167,7 +167,7 @@ describe('span processor', () => {
167167
'_ml_obs.meta.output.documents': [{ text: 'hello', name: 'myDoc', id: '1', score: 0.6 }]
168168
})
169169

170-
processor.process({ span })
170+
processor.process(span)
171171
const payload = writer.append.getCall(0).firstArg
172172

173173
expect(payload.meta.output.documents).to.deep.equal([{
@@ -194,7 +194,7 @@ describe('span processor', () => {
194194
'_ml_obs.meta.input.documents': [{ text: 'hello', name: 'myDoc', id: '1', score: 0.6 }]
195195
})
196196

197-
processor.process({ span })
197+
processor.process(span)
198198
const payload = writer.append.getCall(0).firstArg
199199

200200
expect(payload.meta.input.documents).to.deep.equal([{
@@ -221,7 +221,7 @@ describe('span processor', () => {
221221
'_ml_obs.meta.model_name': 'myModel'
222222
})
223223

224-
processor.process({ span })
224+
processor.process(span)
225225
const payload = writer.append.getCall(0).firstArg
226226

227227
expect(payload.meta.model_provider).to.equal('custom')
@@ -246,7 +246,7 @@ describe('span processor', () => {
246246
'_ml_obs.meta.span.kind': 'llm'
247247
})
248248

249-
processor.process({ span })
249+
processor.process(span)
250250
const payload = writer.append.getCall(0).firstArg
251251

252252
expect(payload.meta['error.message']).to.equal('error message')
@@ -274,7 +274,7 @@ describe('span processor', () => {
274274
'_ml_obs.meta.span.kind': 'llm'
275275
})
276276

277-
processor.process({ span })
277+
processor.process(span)
278278
const payload = writer.append.getCall(0).firstArg
279279

280280
expect(payload.meta['error.message']).to.equal('error message')
@@ -302,7 +302,7 @@ describe('span processor', () => {
302302
'_ml_obs.name': 'mySpan'
303303
})
304304

305-
processor.process({ span })
305+
processor.process(span)
306306
const payload = writer.append.getCall(0).firstArg
307307

308308
expect(payload.name).to.equal('mySpan')
@@ -324,7 +324,7 @@ describe('span processor', () => {
324324
'_ml_obs.session_id': '1234'
325325
})
326326

327-
processor.process({ span })
327+
processor.process(span)
328328
const payload = writer.append.getCall(0).firstArg
329329

330330
expect(payload.session_id).to.equal('1234')
@@ -347,7 +347,7 @@ describe('span processor', () => {
347347
'_ml_obs.tags': { hostname: 'localhost', foo: 'bar', source: 'mySource' }
348348
})
349349

350-
processor.process({ span })
350+
processor.process(span)
351351
const payload = writer.append.getCall(0).firstArg
352352

353353
expect(payload.tags).to.include('foo:bar')

0 commit comments

Comments
 (0)