From c74052a990e949647b2e30dc9016a79b4926efe6 Mon Sep 17 00:00:00 2001 From: s1gr1d Date: Wed, 10 Sep 2025 11:33:02 +0200 Subject: [PATCH 1/3] Removed per-step init; Sentry is initialized once per workflow run --- packages/cloudflare/src/workflows.ts | 100 +++++++++++++-------------- 1 file changed, 49 insertions(+), 51 deletions(-) diff --git a/packages/cloudflare/src/workflows.ts b/packages/cloudflare/src/workflows.ts index 7ca31d00bbd5..974e5af1a144 100644 --- a/packages/cloudflare/src/workflows.ts +++ b/packages/cloudflare/src/workflows.ts @@ -56,29 +56,6 @@ async function propagationContextFromInstanceId(instanceId: string): Promise( - instanceId: string, - options: CloudflareOptions, - callback: () => V, -): Promise { - setAsyncLocalStorageAsyncContextStrategy(); - - return withIsolationScope(async isolationScope => { - const client = init({ ...options, enableDedupe: false }); - isolationScope.setClient(client); - - addCloudResourceContext(isolationScope); - - return withScope(async scope => { - const propagationContext = await propagationContextFromInstanceId(instanceId); - scope.setPropagationContext(propagationContext); - - // eslint-disable-next-line no-return-await - return await callback(); - }); - }); -} - class WrappedWorkflowStep implements WorkflowStep { public constructor( private _instanceId: string, @@ -102,34 +79,32 @@ class WrappedWorkflowStep implements WorkflowStep { const config = typeof configOrCallback === 'function' ? undefined : configOrCallback; const instrumentedCallback: () => Promise = async () => { - return workflowStepWithSentry(this._instanceId, this._options, async () => { - return startSpan( - { - op: 'function.step.do', - name, - attributes: { - 'cloudflare.workflow.timeout': config?.timeout, - 'cloudflare.workflow.retries.backoff': config?.retries?.backoff, - 'cloudflare.workflow.retries.delay': config?.retries?.delay, - 'cloudflare.workflow.retries.limit': config?.retries?.limit, - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', - [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', - }, + return startSpan( + { + op: 'function.step.do', + name, + attributes: { + 'cloudflare.workflow.timeout': config?.timeout, + 'cloudflare.workflow.retries.backoff': config?.retries?.backoff, + 'cloudflare.workflow.retries.delay': config?.retries?.delay, + 'cloudflare.workflow.retries.limit': config?.retries?.limit, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', + [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', }, - async span => { - try { - const result = await userCallback(); - span.setStatus({ code: 1 }); - return result; - } catch (error) { - captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); - throw error; - } finally { - this._ctx.waitUntil(flush(2000)); - } - }, - ); - }); + }, + async span => { + try { + const result = await userCallback(); + span.setStatus({ code: 1 }); + return result; + } catch (error) { + captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); + throw error; + } finally { + this._ctx.waitUntil(flush(2000)); + } + }, + ); }; return config ? this._step.do(name, config, instrumentedCallback) : this._step.do(name, instrumentedCallback); @@ -183,7 +158,30 @@ export function instrumentWorkflowWithSentry< get(obj, prop, receiver) { if (prop === 'run') { return async function (event: WorkflowEvent

, step: WorkflowStep): Promise { - return obj.run.call(obj, event, new WrappedWorkflowStep(event.instanceId, ctx, options, step)); + // Ensure async context strategy is set once per workflow run + setAsyncLocalStorageAsyncContextStrategy(); + + return withIsolationScope(async isolationScope => { + const client = init({ ...options, enableDedupe: false }); + isolationScope.setClient(client); + + addCloudResourceContext(isolationScope); + + return withScope(async scope => { + const propagationContext = await propagationContextFromInstanceId(event.instanceId); + scope.setPropagationContext(propagationContext); + + try { + return await obj.run.call( + obj, + event, + new WrappedWorkflowStep(event.instanceId, ctx, options, step), + ); + } finally { + ctx.waitUntil(flush(2000)); + } + }); + }); }; } return Reflect.get(obj, prop, receiver); From e86bcd295d3325523156971d0abc3a92655bc7b5 Mon Sep 17 00:00:00 2001 From: s1gr1d Date: Wed, 10 Sep 2025 11:59:22 +0200 Subject: [PATCH 2/3] add scope for each step --- packages/cloudflare/src/workflows.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/cloudflare/src/workflows.ts b/packages/cloudflare/src/workflows.ts index 974e5af1a144..97726e9b9270 100644 --- a/packages/cloudflare/src/workflows.ts +++ b/packages/cloudflare/src/workflows.ts @@ -2,6 +2,7 @@ import type { PropagationContext } from '@sentry/core'; import { captureException, flush, + getCurrentScope, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, SEMANTIC_ATTRIBUTE_SENTRY_SOURCE, startSpan, @@ -75,6 +76,9 @@ class WrappedWorkflowStep implements WorkflowStep { configOrCallback: WorkflowStepConfig | (() => Promise), maybeCallback?: () => Promise, ): Promise { + // Capture the current scope, so parent span (e.g., a startSpan surrounding step.do) is preserved + const scopeForStep = getCurrentScope(); + const userCallback = (maybeCallback || configOrCallback) as () => Promise; const config = typeof configOrCallback === 'function' ? undefined : configOrCallback; @@ -83,6 +87,7 @@ class WrappedWorkflowStep implements WorkflowStep { { op: 'function.step.do', name, + scope: scopeForStep, attributes: { 'cloudflare.workflow.timeout': config?.timeout, 'cloudflare.workflow.retries.backoff': config?.retries?.backoff, @@ -158,7 +163,6 @@ export function instrumentWorkflowWithSentry< get(obj, prop, receiver) { if (prop === 'run') { return async function (event: WorkflowEvent

, step: WorkflowStep): Promise { - // Ensure async context strategy is set once per workflow run setAsyncLocalStorageAsyncContextStrategy(); return withIsolationScope(async isolationScope => { From 169297f06d14b1064e2f4816dd944126c9b6ccf1 Mon Sep 17 00:00:00 2001 From: s1gr1d Date: Wed, 10 Sep 2025 12:09:11 +0200 Subject: [PATCH 3/3] add test --- packages/cloudflare/test/workflow.test.ts | 54 ++++++++++++++++++++--- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/packages/cloudflare/test/workflow.test.ts b/packages/cloudflare/test/workflow.test.ts index c403023fb525..e1a6c87e5279 100644 --- a/packages/cloudflare/test/workflow.test.ts +++ b/packages/cloudflare/test/workflow.test.ts @@ -1,4 +1,5 @@ /* eslint-disable @typescript-eslint/unbound-method */ +import { startSpan } from '@sentry/core'; import type { WorkflowEvent, WorkflowStep, WorkflowStepConfig } from 'cloudflare:workers'; import { beforeEach, describe, expect, test, vi } from 'vitest'; import { deterministicTraceIdFromInstanceId, instrumentWorkflowWithSentry } from '../src/workflows'; @@ -96,7 +97,8 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => { expect(mockStep.do).toHaveBeenCalledTimes(1); expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function)); - expect(mockContext.waitUntil).toHaveBeenCalledTimes(1); + // We flush after the step.do and at the end of the run + expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); expect(mockTransport.send).toHaveBeenCalledTimes(1); expect(mockTransport.send).toHaveBeenCalledWith([ @@ -161,7 +163,8 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => { expect(mockStep.do).toHaveBeenCalledTimes(1); expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function)); - expect(mockContext.waitUntil).toHaveBeenCalledTimes(1); + // We flush after the step.do and at the end of the run + expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); expect(mockTransport.send).toHaveBeenCalledTimes(1); expect(mockTransport.send).toHaveBeenCalledWith([ @@ -232,8 +235,10 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => { expect(mockStep.do).toHaveBeenCalledTimes(1); expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function)); - expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); + // One flush for the error transaction, one for the retry success, one at end of run + expect(mockContext.waitUntil).toHaveBeenCalledTimes(3); expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); + // error event + failed transaction + successful retry transaction expect(mockTransport.send).toHaveBeenCalledTimes(3); // First we should get the error event @@ -376,11 +381,11 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => { expect(mockStep.do).toHaveBeenCalledTimes(1); expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function)); - expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); + // One flush for the error event and one at end of run + expect(mockContext.waitUntil).toHaveBeenCalledTimes(3); expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); - // We should get the error event and then nothing else. No transactions - // should be sent + // We should get the error event and then nothing else. No transactions should be sent expect(mockTransport.send).toHaveBeenCalledTimes(1); expect(mockTransport.send).toHaveBeenCalledWith([ @@ -421,4 +426,41 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => { ], ]); }); + + test('Step.do span becomes child of surrounding custom span', async () => { + class ParentChildWorkflow { + constructor(_ctx: ExecutionContext, _env: unknown) {} + + async run(_event: Readonly>, step: WorkflowStep): Promise { + await startSpan({ name: 'custom span' }, async () => { + await step.do('first step', async () => { + return { files: ['a'] }; + }); + }); + } + } + + const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, ParentChildWorkflow as any); + const workflow = new TestWorkflowInstrumented(mockContext, {}) as ParentChildWorkflow; + const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID }; + await workflow.run(event, mockStep); + + // Flush after step.do and at end of run + expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); + expect(mockTransport.send).toHaveBeenCalledTimes(1); + + const sendArg = mockTransport.send.mock.calls[0]![0] as any; + const items = sendArg[1] as any[]; + const rootSpanItem = items.find(i => i[0].type === 'transaction'); + expect(rootSpanItem).toBeDefined(); + const rootSpan = rootSpanItem[1]; + + expect(rootSpan.transaction).toBe('custom span'); + const rootSpanId = rootSpan.contexts.trace.span_id; + + // Child span for the step.do with the custom span as parent + const stepSpan = rootSpan.spans.find((s: any) => s.description === 'first step' && s.op === 'function.step.do'); + expect(stepSpan).toBeDefined(); + expect(stepSpan.parent_span_id).toBe(rootSpanId); + }); });