Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 104 additions & 1 deletion packages/components/src/handler.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { OTLPTraceExporter as ProtoOTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto'
import { getPhoenixTracer } from './handler'
import { getPhoenixTracer, AnalyticHandler } from './handler'
import { resetTracingEnvCache } from './tracingEnv'

jest.mock('@opentelemetry/exporter-trace-otlp-proto', () => {
return {
Expand All @@ -9,6 +10,42 @@ jest.mock('@opentelemetry/exporter-trace-otlp-proto', () => {
}
})

// Track every RunTree constructed so tests can assert per-instance lifecycle calls.
// Prefixed with `mock` so Jest allows referencing it from inside the mock factory.
const mockRunTreeInstances: Array<{
id: string
config: any
postRun: jest.Mock
patchRun: jest.Mock
end: jest.Mock
createChild: jest.Mock
}> = []

jest.mock('langsmith', () => {
let counter = 0
class MockRunTree {
id: string
config: any
postRun: jest.Mock
patchRun: jest.Mock
end: jest.Mock
createChild: jest.Mock
constructor(config: any) {
this.config = config
this.id = `run-${++counter}`
this.postRun = jest.fn().mockResolvedValue(undefined)
this.patchRun = jest.fn().mockResolvedValue(undefined)
this.end = jest.fn().mockResolvedValue(undefined)
this.createChild = jest.fn().mockImplementation(async (childCfg: any) => new MockRunTree(childCfg))
mockRunTreeInstances.push(this as any)
}
}
return {
Client: jest.fn().mockImplementation(() => ({})),
RunTree: MockRunTree
}
})

describe('URL Handling For Phoenix Tracer', () => {
const apiKey = 'test-api-key'
const projectName = 'test-project-name'
Expand Down Expand Up @@ -331,3 +368,69 @@ describe('onLLMEnd Usage Metadata Extraction Logic', () => {
})
})
})

/**
* Regression coverage for the chainRun map clobbering bug.
*
* AnalyticHandler is a per-chatId singleton, so recursive executeAgentFlow calls (e.g. an
* iterationAgentflow sub-flow) re-enter `onChainStart` on the same instance. The previous
* implementation replaced `this.handlers['langSmith'].chainRun` wholesale on every call, which
* dropped the outer call's parent RunTree. When control returned to the outer flow, its
* `onChainEnd` lookup missed and the outer run was never `patchRun`-ed — leaving an open run in
* LangSmith and breaking context propagation for any subsequent nodes.
*/
describe('AnalyticHandler chainRun map on recursive onChainStart', () => {
const LANGSMITH_KEYS = ['LANGSMITH_TRACING', 'LANGSMITH_API_KEY', 'LANGSMITH_ENDPOINT', 'LANGSMITH_PROJECT'] as const
let envSnapshot: Partial<Record<(typeof LANGSMITH_KEYS)[number], string | undefined>>

beforeEach(() => {
envSnapshot = {}
for (const k of LANGSMITH_KEYS) envSnapshot[k] = process.env[k]
for (const k of LANGSMITH_KEYS) delete process.env[k]
process.env.LANGSMITH_TRACING = 'true'
process.env.LANGSMITH_API_KEY = 'test-key'
resetTracingEnvCache()
mockRunTreeInstances.length = 0
})

afterEach(() => {
for (const k of LANGSMITH_KEYS) delete process.env[k]
for (const k of LANGSMITH_KEYS) {
const v = envSnapshot[k]
if (v !== undefined) process.env[k] = v
}
resetTracingEnvCache()
AnalyticHandler.resetInstance('recursive-chain-test')
})

it('preserves the outer parent RunTree when a nested onChainStart runs on the same instance', async () => {
const chatId = 'recursive-chain-test'
const handler = AnalyticHandler.getInstance({ inputs: {} } as any, { chatId })
await handler.init()

// Outer flow's onChainStart — simulates the first executeAgentFlow call.
const outerIds = await handler.onChainStart('OuterFlow', 'outer input')
expect(outerIds.langSmith.chainRun).toBeDefined()

// Recursive executeAgentFlow (iterationAgentflow) re-enters on the same singleton.
// No parentIds passed, matching the real call site in buildAgentflow.ts.
const innerIds = await handler.onChainStart('InnerFlow', 'inner input')
expect(innerIds.langSmith.chainRun).toBeDefined()
expect(innerIds.langSmith.chainRun).not.toBe(outerIds.langSmith.chainRun)

// Inner flow completes first.
await handler.onChainEnd(innerIds, 'inner output')

// Outer flow must still be able to end its own run.
await handler.onChainEnd(outerIds, 'outer output', true)

// Both the outer and inner RunTree instances should have been patched exactly once —
// before the fix, the outer's patchRun was skipped because the map entry was overwritten.
expect(mockRunTreeInstances).toHaveLength(2)
const [outerRun, innerRun] = mockRunTreeInstances
expect(outerRun.patchRun).toHaveBeenCalledTimes(1)
expect(innerRun.patchRun).toHaveBeenCalledTimes(1)
expect(outerRun.end).toHaveBeenCalledWith({ outputs: { output: 'outer output' } })
expect(innerRun.end).toHaveBeenCalledWith({ outputs: { output: 'inner output' } })
})
})
45 changes: 33 additions & 12 deletions packages/components/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { Resource } from '@opentelemetry/resources'
import { SimpleSpanProcessor, Tracer } from '@opentelemetry/sdk-trace-base'
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'
import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic-conventions'

import { BaseCallbackHandler, NewTokenIndices, HandleLLMNewTokenCallbackFields } from '@langchain/core/callbacks/base'
import * as CallbackManagerModule from '@langchain/core/callbacks/manager'
import { LangChainTracer, LangChainTracerFields } from '@langchain/core/tracers/tracer_langchain'
Expand All @@ -26,6 +25,7 @@ import { AgentAction } from '@langchain/core/agents'
import { LunaryHandler } from '@langchain/community/callbacks/handlers/lunary'

import { getCredentialData, getCredentialParam, getEnvironmentVariable } from './utils'
import { applyEnvTracingProviders, tracingEnvEnabled } from './tracingEnv'
import { EvaluationRunTracer } from '../evaluation/EvaluationRunTracer'
import { EvaluationRunTracerLlama } from '../evaluation/EvaluationRunTracerLlama'
import { ICommonObject, IDatabaseEntity, INodeData, IServerSideEventStreamer } from './Interface'
Expand Down Expand Up @@ -515,16 +515,17 @@ class ExtendedLunaryHandler extends LunaryHandler {

export const additionalCallbacks = async (nodeData: INodeData, options: ICommonObject) => {
try {
if (!options.analytic) return []
if (!options.analytic && !tracingEnvEnabled()) return []

const analytic = JSON.parse(options.analytic)
const initial = options.analytic ? JSON.parse(options.analytic) : {}
const { analytic, envCredentials } = applyEnvTracingProviders(initial)
const callbacks: any = []

for (const provider in analytic) {
const providerStatus = analytic[provider].status as boolean
if (providerStatus) {
const credentialId = analytic[provider].credentialId as string
const credentialData = await getCredentialData(credentialId ?? '', options)
const credentialData =
envCredentials[provider] ?? (await getCredentialData((analytic[provider].credentialId as string) ?? '', options))
if (provider === 'langSmith') {
const langSmithProject = analytic[provider].projectName as string

Expand Down Expand Up @@ -774,23 +775,31 @@ export class AnalyticHandler {
if (this.initialized) return

try {
if (!this.options.analytic) return
const hasAnalyticsConfig = Boolean(this.options.analytic)
if (!hasAnalyticsConfig && !tracingEnvEnabled()) return

const analytic = JSON.parse(this.options.analytic)
const initial = hasAnalyticsConfig ? JSON.parse(this.options.analytic) : {}
const { analytic, envCredentials } = applyEnvTracingProviders(initial)
for (const provider in analytic) {
const providerStatus = analytic[provider].status as boolean
if (providerStatus) {
const credentialId = analytic[provider].credentialId as string
const credentialData = await getCredentialData(credentialId ?? '', this.options)
const credentialData =
envCredentials[provider] ??
(await getCredentialData((analytic[provider].credentialId as string) ?? '', this.options))
await this.initializeProvider(provider, analytic[provider], credentialData)
}
}

this.initialized = true
} catch (e) {
throw new Error(e)
}
}

hasActiveProviders(): boolean {
return Object.keys(this.handlers).length > 0
}

// Add getter for handlers (useful for debugging)
getHandlers(): ICommonObject {
return this.handlers
Expand All @@ -807,7 +816,10 @@ export class AnalyticHandler {
apiKey: langSmithApiKey
})

this.handlers['langSmith'] = { client, langSmithProject }
this.handlers['langSmith'] = {
client,
langSmithProject
}
} else if (provider === 'langFuse') {
const release = providerConfig.release as string
const langFuseSecretKey = getCredentialParam('langFuseSecretKey', credentialData, this.nodeData)
Expand Down Expand Up @@ -927,7 +939,13 @@ export class AnalyticHandler {
}
const parentRun = new RunTree(parentRunConfig)
await parentRun.postRun()
this.handlers['langSmith'].chainRun = { [parentRun.id]: parentRun }
// Merge rather than replace: AnalyticHandler is a per-chatId singleton, so recursive
// executeAgentFlow calls (e.g. iterationAgentflow) re-enter onChainStart on the same
// instance. Replacing would drop the outer call's parent entry and orphan its trace.
this.handlers['langSmith'].chainRun = {
...this.handlers['langSmith'].chainRun,
[parentRun.id]: parentRun
}
returnIds['langSmith'].chainRun = parentRun.id
} else {
const parentRun: RunTree | undefined = this.handlers['langSmith'].chainRun[parentIds['langSmith'].chainRun]
Expand All @@ -940,7 +958,10 @@ export class AnalyticHandler {
}
})
await childChainRun.postRun()
this.handlers['langSmith'].chainRun = { [childChainRun.id]: childChainRun }
this.handlers['langSmith'].chainRun = {
...this.handlers['langSmith'].chainRun,
[childChainRun.id]: childChainRun
}
returnIds['langSmith'].chainRun = childChainRun.id
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/components/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from './textToSpeech'
export * from './storageUtils'
export * from './storage'
export * from './handler'
export { tracingEnvEnabled } from './tracingEnv'
export * from '../evaluation/EvaluationRunner'
export * from './followUpPrompts'
export * from './validator'
Expand Down
Loading
Loading