From 55183ba4c7c759b6dad436238540728526654cda Mon Sep 17 00:00:00 2001 From: JPeer264 Date: Wed, 22 Apr 2026 14:41:11 +0200 Subject: [PATCH 1/2] fix(cloudflare): Use TransformStream to keep streams open --- packages/cloudflare/src/request.ts | 54 +++--- .../worker/instrumentFetch.test.ts | 8 +- packages/cloudflare/test/request.test.ts | 175 ++++++++++++++++-- 3 files changed, 194 insertions(+), 43 deletions(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index 05e870ff5d81..4fbdd9cf7fb4 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -129,40 +129,36 @@ export function wrapRequestHandler( const classification = classifyResponseStreaming(res); if (classification.isStreaming && res.body) { - // Streaming response detected - monitor consumption to keep span alive try { - const [clientStream, monitorStream] = res.body.tee(); - - // Monitor stream consumption and end span when complete - const streamMonitor = (async () => { - const reader = monitorStream.getReader(); - - try { - let done = false; - while (!done) { - const result = await reader.read(); - done = result.done; - } - } catch { - // Stream error or cancellation - will end span in finally - } finally { - reader.releaseLock(); - span.end(); - waitUntil?.(flushAndDispose(client)); - } - })(); - - // Keep worker alive until stream monitoring completes (otherwise span won't end) - waitUntil?.(streamMonitor); - - // Return response with client stream - return new Response(clientStream, { + let ended = false; + + const endSpanOnce = (): void => { + if (ended) return; + + ended = true; + span.end(); + waitUntil?.(flushAndDispose(client)); + }; + + const transform = new TransformStream({ + flush() { + // Source stream completed normally. + endSpanOnce(); + }, + cancel() { + // Client disconnected (or downstream cancelled). The `cancel` + // is being called while the response is still considered + // active, so this is a safe place to end the span. + endSpanOnce(); + }, + }); + + return new Response(res.body.pipeThrough(transform), { status: res.status, statusText: res.statusText, headers: res.headers, }); - } catch (_e) { - // tee() failed (e.g stream already locked) - fall back to non-streaming handling + } catch { span.end(); waitUntil?.(flushAndDispose(client)); return res; diff --git a/packages/cloudflare/test/instrumentations/worker/instrumentFetch.test.ts b/packages/cloudflare/test/instrumentations/worker/instrumentFetch.test.ts index 5486addb405c..2c8a734890fc 100644 --- a/packages/cloudflare/test/instrumentations/worker/instrumentFetch.test.ts +++ b/packages/cloudflare/test/instrumentations/worker/instrumentFetch.test.ts @@ -163,9 +163,11 @@ describe('instrumentFetch', () => { const wrappedHandler = withSentry(vi.fn(), handler); const waits: Promise[] = []; const waitUntil = vi.fn(promise => waits.push(promise)); - await wrappedHandler.fetch?.(new Request('https://example.com'), MOCK_ENV_WITHOUT_DSN, { - waitUntil, - } as unknown as ExecutionContext); + await wrappedHandler + .fetch?.(new Request('https://example.com'), MOCK_ENV_WITHOUT_DSN, { + waitUntil, + } as unknown as ExecutionContext) + .then(response => response.text()); expect(flush).not.toBeCalled(); expect(waitUntil).toBeCalled(); vi.advanceTimersToNextTimer().runAllTimers(); diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index 5160d8976e9b..ecd5765abf11 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -44,7 +44,7 @@ describe('withSentry', () => { await wrapRequestHandler( { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => new Response('test'), - ); + ).then(response => response.text()); expect(waitUntilSpy).toHaveBeenCalledTimes(1); expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise)); @@ -111,11 +111,8 @@ describe('withSentry', () => { await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => { addDelayedWaitUntil(context); - const response = new Response('test'); - // Add Content-Length to skip probing - response.headers.set('content-length', '4'); - return response; - }); + return new Response('test'); + }).then(response => response.text()); expect(waitUntil).toBeCalled(); vi.advanceTimersToNextTimer().runAllTimers(); await Promise.all(waits); @@ -336,7 +333,7 @@ describe('withSentry', () => { SentryCore.captureMessage('sentry-trace'); return new Response('test'); }, - ); + ).then(response => response.text()); // Wait for async span end and transaction capture await new Promise(resolve => setTimeout(resolve, 50)); @@ -389,10 +386,8 @@ describe('flushAndDispose', () => { const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => { - const response = new Response('test'); - response.headers.set('content-length', '4'); - return response; - }); + return new Response('test'); + }).then(response => response.text()); // Wait for all waitUntil promises to resolve await Promise.all(waits); @@ -518,6 +513,164 @@ describe('flushAndDispose', () => { disposeSpy.mockRestore(); }); + // Regression tests for https://github.com/getsentry/sentry-javascript/issues/20409 + // + // Pre-fix: streaming responses were observed via `body.tee()` + a long-running + // `waitUntil(streamMonitor)`. Cloudflare caps `waitUntil` at ~30s after the + // handler returns, so any stream taking longer than 30s to fully emit had the + // monitor cancelled before `span.end()` / `flushAndDispose()` ran — silently + // dropping the root `http.server` span. + // + // Post-fix: the body is piped through a passthrough `TransformStream`; the + // `flush` (normal completion) and `cancel` (client disconnect) callbacks fire + // while the response stream is still active (no waitUntil cap), so they can + // safely end the span and register `flushAndDispose` via a fresh `waitUntil` + // window. The contract guaranteed below: `waitUntil` is NOT called with any + // long-running stream-observation promise — only with `flushAndDispose`, and + // only after the response stream has finished (either by completion or cancel). + describe('regression #20409: streaming responses do not park stream observation in waitUntil', () => { + test('waitUntil is not called until streaming response is fully delivered', async () => { + const waits: Promise[] = []; + const waitUntil = vi.fn((promise: Promise) => waits.push(promise)); + const context = { waitUntil } as unknown as ExecutionContext; + + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); + const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); + + // Stream emits chunk1, then waits indefinitely until we open the gate + // before emitting chunk2 + closing. Models a long-running upstream + // (e.g. SSE / LLM streaming) whose body takes longer than the + // handler-return time to fully drain. + let releaseLastChunk!: () => void; + const lastChunkGate = new Promise(resolve => { + releaseLastChunk = resolve; + }); + + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + await lastChunkGate; + controller.enqueue(new TextEncoder().encode('chunk2')); + controller.close(); + }, + }); + + const result = await wrapRequestHandler( + { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, + () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), + ); + + // Handler has returned, but the source stream has NOT closed yet. + // The pre-fix code would have already enqueued a long-running + // `waitUntil(streamMonitor)` task at this point. The post-fix code + // must not call waitUntil at all here. + expect(waitUntil).not.toHaveBeenCalled(); + + // Drain the response — Cloudflare would do this when forwarding to the client. + const reader = result.body!.getReader(); + await reader.read(); // chunk1 + // Source still hasn't closed — still no waitUntil. + expect(waitUntil).not.toHaveBeenCalled(); + + releaseLastChunk(); + await reader.read(); // chunk2 + await reader.read(); // done + reader.releaseLock(); + + // Stream completed → TransformStream `flush` fired → span ended → + // `flushAndDispose(client)` queued via waitUntil exactly once. + await Promise.all(waits); + expect(waitUntil).toHaveBeenCalledTimes(1); + expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise)); + expect(flushSpy).toHaveBeenCalled(); + expect(disposeSpy).toHaveBeenCalled(); + + flushSpy.mockRestore(); + disposeSpy.mockRestore(); + }); + + test('waitUntil is called once and dispose runs when client cancels mid-stream', async () => { + const waits: Promise[] = []; + const waitUntil = vi.fn((promise: Promise) => waits.push(promise)); + const context = { waitUntil } as unknown as ExecutionContext; + + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); + const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); + + // Stream emits one chunk and then never closes — models an upstream + // that keeps emitting indefinitely. We then cancel the response from + // the consumer side to model a client disconnect. + let sourceCancelled = false; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + // intentionally don't close + }, + cancel() { + sourceCancelled = true; + }, + }); + + const result = await wrapRequestHandler( + { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, + () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), + ); + + // Handler returned, source still open — no waitUntil yet. + expect(waitUntil).not.toHaveBeenCalled(); + + const reader = result.body!.getReader(); + await reader.read(); // chunk1 + await reader.cancel('client disconnected'); // simulates client disconnect + reader.releaseLock(); + + // TransformStream `cancel` fired → span ended → flushAndDispose queued. + await Promise.all(waits); + expect(waitUntil).toHaveBeenCalledTimes(1); + expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise)); + expect(flushSpy).toHaveBeenCalled(); + expect(disposeSpy).toHaveBeenCalled(); + // pipeThrough should also propagate the cancel upstream to the source. + expect(sourceCancelled).toBe(true); + + flushSpy.mockRestore(); + disposeSpy.mockRestore(); + }); + + test('waitUntil is called exactly once even if the response is consumed multiple times', async () => { + // Sanity: no matter how the response is drained, the TransformStream's + // flush callback must only end the span (and queue flushAndDispose) once. + const waits: Promise[] = []; + const waitUntil = vi.fn((promise: Promise) => waits.push(promise)); + const context = { waitUntil } as unknown as ExecutionContext; + + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); + const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('a')); + controller.enqueue(new TextEncoder().encode('b')); + controller.close(); + }, + }); + + const result = await wrapRequestHandler( + { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, + () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), + ); + + const text = await result.text(); + expect(text).toBe('ab'); + + await Promise.all(waits); + expect(waitUntil).toHaveBeenCalledTimes(1); + + flushSpy.mockRestore(); + disposeSpy.mockRestore(); + }); + }); + test('dispose is NOT called for protocol upgrade responses (status 101)', async () => { const context = createMockExecutionContext(); const waits: Promise[] = []; From 339219a4b6256d4d530af28ef175e9cd985d90c5 Mon Sep 17 00:00:00 2001 From: JPeer264 Date: Wed, 22 Apr 2026 16:52:23 +0200 Subject: [PATCH 2/2] fixup! fix(cloudflare): Use TransformStream to keep streams open --- packages/cloudflare/test/request.test.ts | 99 +++++++++++++----------- 1 file changed, 54 insertions(+), 45 deletions(-) diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index ecd5765abf11..28733ccfe651 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -14,6 +14,8 @@ const MOCK_OPTIONS: CloudflareOptions = { dsn: 'https://public@dsn.ingest.sentry.io/1337', }; +const NODE_MAJOR_VERSION = parseInt(process.versions.node.split('.')[0]!); + function addDelayedWaitUntil(context: ExecutionContext) { context.waitUntil(new Promise(resolve => setTimeout(() => resolve()))); } @@ -589,53 +591,60 @@ describe('flushAndDispose', () => { disposeSpy.mockRestore(); }); - test('waitUntil is called once and dispose runs when client cancels mid-stream', async () => { - const waits: Promise[] = []; - const waitUntil = vi.fn((promise: Promise) => waits.push(promise)); - const context = { waitUntil } as unknown as ExecutionContext; - - const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); - const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); - - // Stream emits one chunk and then never closes — models an upstream - // that keeps emitting indefinitely. We then cancel the response from - // the consumer side to model a client disconnect. - let sourceCancelled = false; - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(new TextEncoder().encode('chunk1')); - // intentionally don't close - }, - cancel() { - sourceCancelled = true; - }, - }); - - const result = await wrapRequestHandler( - { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, - () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), - ); - - // Handler returned, source still open — no waitUntil yet. - expect(waitUntil).not.toHaveBeenCalled(); - - const reader = result.body!.getReader(); - await reader.read(); // chunk1 - await reader.cancel('client disconnected'); // simulates client disconnect - reader.releaseLock(); + // Node 18's TransformStream does not invoke the transformer's `cancel` hook + // when the downstream consumer cancels (WHATWG spec addition landed in Node 20). + // Cloudflare Workers run modern V8 where this works, so we only skip the + // test under Node 18. + test.skipIf(NODE_MAJOR_VERSION < 20)( + 'waitUntil is called once and dispose runs when client cancels mid-stream', + async () => { + const waits: Promise[] = []; + const waitUntil = vi.fn((promise: Promise) => waits.push(promise)); + const context = { waitUntil } as unknown as ExecutionContext; + + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); + const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); + + // Stream emits one chunk and then never closes — models an upstream + // that keeps emitting indefinitely. We then cancel the response from + // the consumer side to model a client disconnect. + let sourceCancelled = false; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + // intentionally don't close + }, + cancel() { + sourceCancelled = true; + }, + }); - // TransformStream `cancel` fired → span ended → flushAndDispose queued. - await Promise.all(waits); - expect(waitUntil).toHaveBeenCalledTimes(1); - expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise)); - expect(flushSpy).toHaveBeenCalled(); - expect(disposeSpy).toHaveBeenCalled(); - // pipeThrough should also propagate the cancel upstream to the source. - expect(sourceCancelled).toBe(true); + const result = await wrapRequestHandler( + { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, + () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), + ); - flushSpy.mockRestore(); - disposeSpy.mockRestore(); - }); + // Handler returned, source still open — no waitUntil yet. + expect(waitUntil).not.toHaveBeenCalled(); + + const reader = result.body!.getReader(); + await reader.read(); // chunk1 + await reader.cancel('client disconnected'); // simulates client disconnect + reader.releaseLock(); + + // TransformStream `cancel` fired → span ended → flushAndDispose queued. + await Promise.all(waits); + expect(waitUntil).toHaveBeenCalledTimes(1); + expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise)); + expect(flushSpy).toHaveBeenCalled(); + expect(disposeSpy).toHaveBeenCalled(); + // pipeThrough should also propagate the cancel upstream to the source. + expect(sourceCancelled).toBe(true); + + flushSpy.mockRestore(); + disposeSpy.mockRestore(); + }, + ); test('waitUntil is called exactly once even if the response is consumed multiple times', async () => { // Sanity: no matter how the response is drained, the TransformStream's