From ae0bcd415314aec3485a5b5740d42266a11f9d4a Mon Sep 17 00:00:00 2001 From: Bryan Atkinson Date: Wed, 10 Jul 2024 18:26:07 +0000 Subject: [PATCH 1/3] Fix issue where failed paths are written with parent path due to rethrows. --- js/core/src/tracing/instrumentation.ts | 61 +++++++++++-------- js/plugins/google-cloud/tests/metrics_test.ts | 43 ++++++++++++- 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index b1ff3b49b5..5b74684512 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -87,14 +87,12 @@ export async function runInNewSpan( async (otSpan) => { if (opts.labels) otSpan.setAttributes(opts.labels); try { - const parentPath = parentStep?.path || ''; - const stepType = - opts.labels && opts.labels['genkit:type'] - ? `,t:${opts.labels['genkit:type']}` - : ''; - opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`; - - const pathCount = getCurrentPathCount(); + opts.metadata.path = buildPath( + opts.metadata.name, + parentStep?.path || '', + opts.labels + ); + const output = await spanMetadataAls.run(opts.metadata, () => fn(opts.metadata, otSpan, isInRoot) ); @@ -103,27 +101,12 @@ export async function runInNewSpan( } opts.metadata.path = decoratePathWithSubtype(opts.metadata); - if (pathCount == getCurrentPathCount()) { - const now = performance.now(); - const start = traceMetadataAls.getStore()?.timestamp || now; - traceMetadataAls.getStore()?.paths?.add({ - path: opts.metadata.path, - status: 'success', - latency: now - start, - }); - } + recordPath(opts.metadata.path); return output; } catch (e) { opts.metadata.path = decoratePathWithSubtype(opts.metadata); - const now = performance.now(); - const start = traceMetadataAls.getStore()?.timestamp || now; - traceMetadataAls.getStore()?.paths?.add({ - path: opts.metadata.path, - status: 'failure', - error: (e as any).name, - latency: now - start, - }); + recordPath(opts.metadata.path, e); opts.metadata.state = 'error'; otSpan.setStatus({ code: SpanStatusCode.ERROR, @@ -216,6 +199,34 @@ function getCurrentPathCount(): number { return traceMetadataAls.getStore()?.paths?.size || 0; } +function buildPath( + name: string, + parentPath: string, + labels?: Record +) { + const stepType = + labels && labels['genkit:type'] ? `,t:${labels['genkit:type']}` : ''; + return parentPath + `/{${name}${stepType}}`; +} + +function recordPath(path: string, err?: any) { + // Only add the path if a child has not already been added. In the event that + // an error is rethrown, we don't want to add each step in the unwind. + const paths = Array.from( + traceMetadataAls.getStore()?.paths || new Set() + ); + if (!paths.some((p) => p.path.startsWith(path))) { + const now = performance.now(); + const start = traceMetadataAls.getStore()?.timestamp || now; + traceMetadataAls.getStore()?.paths?.add({ + path, + status: err ? 'failure' : 'success', + error: err?.name, + latency: now - start, + }); + } +} + function decoratePathWithSubtype(metadata: SpanMetadata): string { if (!metadata.path) { return ''; diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts index f278e269ab..f3253c7ee8 100644 --- a/js/plugins/google-cloud/tests/metrics_test.ts +++ b/js/plugins/google-cloud/tests/metrics_test.ts @@ -406,7 +406,7 @@ describe('GoogleCloudMetrics', () => { }); }); - it('writes flow path failure metrics', async () => { + it('writes flow path failure metrics in root', async () => { const flow = createFlow('testFlow', async () => { const subPath = await run('sub-action', async () => { return 'done'; @@ -440,6 +440,47 @@ describe('GoogleCloudMetrics', () => { ]); }); + it('writes flow path failure metrics in subaction', async () => { + const flow = createFlow('testFlow', async () => { + const subPath1 = await run('sub-action-1', async () => { + const subPath2 = await run('sub-action-2', async () => { + return Promise.reject(new Error('failed')); + }); + return 'done'; + }); + return 'done'; + }); + + assert.rejects(async () => { + await runFlow(flow); + }); + + const reqPoints = await getCounterDataPoints('genkit/flow/path/requests'); + const reqStatuses = reqPoints.map((p) => [ + p.attributes.path, + p.attributes.status, + ]); + assert.deepEqual(reqStatuses, [ + [ + '/{testFlow,t:flow}/{sub-action-1,t:flowStep}/{sub-action-2,t:flowStep}', + 'failure', + ], + ]); + const latencyPoints = await getHistogramDataPoints( + 'genkit/flow/path/latency' + ); + const latencyStatuses = latencyPoints.map((p) => [ + p.attributes.path, + p.attributes.status, + ]); + assert.deepEqual(latencyStatuses, [ + [ + '/{testFlow,t:flow}/{sub-action-1,t:flowStep}/{sub-action-2,t:flowStep}', + 'failure', + ], + ]); + }); + it('writes flow path failure in sub-action metrics', async () => { const flow = createFlow('testFlow', async () => { const subPath1 = await run('sub-action-1', async () => { From 82b60d3bf14fdcc1799d55dda1501c60e86c012b Mon Sep 17 00:00:00 2001 From: Bryan Atkinson Date: Wed, 10 Jul 2024 18:35:16 +0000 Subject: [PATCH 2/3] Remove unused function. --- js/core/src/tracing/instrumentation.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index 5b74684512..827c3bd127 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -195,10 +195,6 @@ function getCurrentSpan(): SpanMetadata { return step; } -function getCurrentPathCount(): number { - return traceMetadataAls.getStore()?.paths?.size || 0; -} - function buildPath( name: string, parentPath: string, From 987f20bfef1f2d152186631ac883a47437c4d07c Mon Sep 17 00:00:00 2001 From: Bryan Atkinson Date: Wed, 10 Jul 2024 19:06:42 +0000 Subject: [PATCH 3/3] Handle subtype being added to parent path. --- js/core/src/tracing/instrumentation.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index 827c3bd127..9f68565518 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -100,13 +100,10 @@ export async function runInNewSpan( opts.metadata.state = 'success'; } - opts.metadata.path = decoratePathWithSubtype(opts.metadata); - recordPath(opts.metadata.path); - + recordPath(opts.metadata); return output; } catch (e) { - opts.metadata.path = decoratePathWithSubtype(opts.metadata); - recordPath(opts.metadata.path, e); + recordPath(opts.metadata, e); opts.metadata.state = 'error'; otSpan.setStatus({ code: SpanStatusCode.ERROR, @@ -205,7 +202,9 @@ function buildPath( return parentPath + `/{${name}${stepType}}`; } -function recordPath(path: string, err?: any) { +function recordPath(spanMeta: SpanMetadata, err?: any) { + const path = spanMeta.path || ''; + const decoratedPath = decoratePathWithSubtype(spanMeta); // Only add the path if a child has not already been added. In the event that // an error is rethrown, we don't want to add each step in the unwind. const paths = Array.from( @@ -215,12 +214,13 @@ function recordPath(path: string, err?: any) { const now = performance.now(); const start = traceMetadataAls.getStore()?.timestamp || now; traceMetadataAls.getStore()?.paths?.add({ - path, + path: decoratedPath, status: err ? 'failure' : 'success', error: err?.name, latency: now - start, }); } + spanMeta.path = decoratedPath; } function decoratePathWithSubtype(metadata: SpanMetadata): string {