From c1b5d1d0a92e13d26146eb12f52ae4393a817898 Mon Sep 17 00:00:00 2001 From: Hendrik Liebau Date: Thu, 13 Nov 2025 12:45:45 +0100 Subject: [PATCH 1/4] [Flight] Fix broken byte stream parsing caused by buffer detachment This PR fixes a critical bug where `ReadableStream({type: 'bytes'})` instances passed through React Server Components (RSC) would stall after reading only the first chunk or the first few chunks in the client. This issue was masked by using `web-streams-polyfill` in tests, but manifests with native Web Streams implementations. The root cause is that when a chunk is enqueued to a `ReadableByteStreamController`, the spec requires the underlying ArrayBuffer to be synchronously transferred/detached. In the React Flight Client's chunk parsing, embedded byte stream chunks are created as views into the incoming RSC stream chunk buffer using `new Uint8Array(chunk.buffer, offset, length)`. When the first embedded byte stream chunk is enqueued, it detaches the shared buffer, leaving the entire RSC stream parsing in a broken state. The fix is to buffer all embedded byte stream chunks during each `processBinaryChunk()` call and enqueue them at the end, after parsing is complete. This ensures the parser never accesses detached buffers, and it allows us to determine if we need to copy chunks. When there's only a single embedded stream with a single chunk in the RSC stream chunk, we use a zero-copy optimization and enqueue the buffer directly. When there are multiple embedded streams in the same RSC stream chunk, each chunk must be copied to avoid buffer detachment. When the same stream has multiple chunks in the RSC stream chunk, we concatenate them into a single contiguous buffer before enqueueing to reduce downstream overhead. A future follow-up will implement server-side optimizations to concatenate multiple embedded byte stream rows before flushing, increasing the likelihood of hitting the zero-copy path on the client and reducing per-row RSC protocol overhead. Tests now use the proper Jest environment with native Web Streams instead of polyfills, exposing and validating the fix for this issue. --- .../react-client/src/ReactFlightClient.js | 122 ++++++++++++-- .../ReactFlightTurbopackDOMEdge-test.js | 9 +- .../ReactFlightTurbopackDOMReplyEdge-test.js | 9 +- .../src/__tests__/ReactFlightDOMEdge-test.js | 152 ++++++++++++++---- .../__tests__/ReactFlightDOMReplyEdge-test.js | 55 +++---- 5 files changed, 256 insertions(+), 91 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index 7b6356361570d..0ac79b96905d4 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -2758,6 +2758,8 @@ export type StreamState = { _rowTag: number, // 0 indicates that we're currently parsing the row ID _rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline. 
_buffer: Array, // chunks received so far as part of this row + _pendingByteStreamIDs: Set, // IDs of pending streams with type: 'bytes' + _toBeClosedByteStreamIDs: Set, // IDs of byte streams that received close but may still need to enqueue chunks _debugInfo: ReactIOInfo, // DEV-only _debugTargetChunkSize: number, // DEV-only }; @@ -2772,6 +2774,8 @@ export function createStreamState( _rowTag: 0, _rowLength: 0, _buffer: [], + _pendingByteStreamIDs: new Set(), + _toBeClosedByteStreamIDs: new Set(), }: Omit): any); if (__DEV__ && enableAsyncDebugInfo) { const response = unwrapWeakResponse(weakResponse); @@ -4788,6 +4792,7 @@ function processFullStringRow( } // Fallthrough case 114 /* "r" */: { + streamState._pendingByteStreamIDs.add(id); startReadableStream(response, id, 'bytes', streamState); return; } @@ -4803,7 +4808,13 @@ function processFullStringRow( } // Fallthrough case 67 /* "C" */: { - stopStream(response, id, row); + // If this is for a pending byte stream, defer the close until all + // buffered chunks are enqueued at the end of processBinaryChunk. + if (streamState._pendingByteStreamIDs.has(id)) { + streamState._toBeClosedByteStreamIDs.add(id); + } else { + stopStream(response, id, row); + } return; } // Fallthrough @@ -4837,6 +4848,12 @@ export function processBinaryChunk( const buffer = streamState._buffer; const chunkLength = chunk.length; incrementChunkDebugInfo(streamState, chunkLength); + + // Buffer chunks for byte streams during parsing to avoid ArrayBuffer + // detachment. We'll enqueue them after the while loop to apply a zero-copy + // optimization when there's only a single chunk overall. + const pendingByteStreamChunks: Map> = new Map(); + while (i < chunkLength) { let lastIdx = -1; switch (rowState) { @@ -4916,14 +4933,44 @@ export function processBinaryChunk( // We found the last chunk of the row const length = lastIdx - i; const lastChunk = new Uint8Array(chunk.buffer, offset, length); - processFullBinaryRow( - response, - streamState, - rowID, - rowTag, - buffer, - lastChunk, - ); + + // Check if this is a Uint8Array for a pending byte stream that needs to + // be buffered until the end of this chunk, to avoid detaching the + // underlying shared ArrayBuffer. + if ( + rowTag === 111 /* "o" */ && + streamState._pendingByteStreamIDs.has(rowID) + ) { + let chunks = pendingByteStreamChunks.get(rowID); + if (chunks === undefined) { + chunks = []; + pendingByteStreamChunks.set(rowID, chunks); + } + chunks.push(lastChunk); + } else { + // Process all other row types immediately. + processFullBinaryRow( + response, + streamState, + rowID, + rowTag, + buffer, + lastChunk, + ); + + // If this was a close command for a byte stream that has no pending + // buffered chunks in this parse cycle, we can close it immediately + // instead of deferring until the end of the chunk. + if ( + streamState._toBeClosedByteStreamIDs.has(rowID) && + !pendingByteStreamChunks.has(rowID) + ) { + streamState._toBeClosedByteStreamIDs.delete(rowID); + streamState._pendingByteStreamIDs.delete(rowID); + stopStream(response, rowID, ''); + } + } + // Reset state machine for a new row i = lastIdx; if (rowState === ROW_CHUNK_BY_NEWLINE) { @@ -4936,17 +4983,68 @@ export function processBinaryChunk( rowLength = 0; buffer.length = 0; } else { - // The rest of this row is in a future chunk. We stash the rest of the - // current chunk until we can process the full row. + // The rest of this row is in a future chunk. 
const length = chunk.byteLength - i; const remainingSlice = new Uint8Array(chunk.buffer, offset, length); - buffer.push(remainingSlice); + + // For byte streams, we can enqueue chunks immediately rather than + // buffering them until the row completes. + if ( + rowTag === 111 /* "o" */ && + streamState._pendingByteStreamIDs.has(rowID) + ) { + let chunks = pendingByteStreamChunks.get(rowID); + if (chunks === undefined) { + chunks = []; + pendingByteStreamChunks.set(rowID, chunks); + } + chunks.push(remainingSlice); + } else { + // For other row types, stash the rest of the current chunk until we can + // process the full row. + buffer.push(remainingSlice); + } + // Update how many bytes we're still waiting for. If we're looking for // a newline, this doesn't hurt since we'll just ignore it. rowLength -= remainingSlice.byteLength; break; } } + + // Enqueue all buffered byte stream chunks. + const streamCount = pendingByteStreamChunks.size; + if (streamCount > 0) { + pendingByteStreamChunks.forEach((chunks, streamId) => { + if (streamCount === 1 && chunks.length === 1) { + // Single stream with single chunk - use zero-copy optimization. + resolveBuffer(response, streamId, chunks[0], streamState); + } else if (chunks.length === 1) { + // Single chunk but multiple streams - must copy to avoid buffer + // detachment. + resolveBuffer(response, streamId, chunks[0].slice(), streamState); + } else { + // Multiple chunks - concatenate them into a single buffer to give the + // consumer a contiguous chunk. + const totalLength = chunks.reduce((sum, c) => sum + c.length, 0); + const concatenated = new Uint8Array(totalLength); + let offset = 0; + for (let j = 0; j < chunks.length; j++) { + concatenated.set(chunks[j], offset); + offset += chunks[j].length; + } + resolveBuffer(response, streamId, concatenated, streamState); + } + }); + } + + // Process deferred closes for byte streams that were in this chunk. + streamState._toBeClosedByteStreamIDs.forEach(id => { + streamState._pendingByteStreamIDs.delete(id); + stopStream(response, id, ''); + }); + streamState._toBeClosedByteStreamIDs.clear(); + streamState._rowState = rowState; streamState._rowID = rowID; streamState._rowTag = rowTag; diff --git a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js index ec2d42201b97a..0bd60cb5e4c47 100644 --- a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js +++ b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js @@ -5,18 +5,11 @@ * LICENSE file in the root directory of this source tree. 
* * @emails react-core + * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment */ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.WritableStream = - require('web-streams-polyfill/ponyfill/es6').WritableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - let clientExports; let turbopackMap; let turbopackModules; diff --git a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js index cbc06ef80d503..09803ab017c18 100644 --- a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js +++ b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js @@ -5,17 +5,11 @@ * LICENSE file in the root directory of this source tree. * * @emails react-core + * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment */ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - -// let serverExports; let turbopackServerMap; let ReactServerDOMServer; let ReactServerDOMClient; @@ -29,7 +23,6 @@ describe('ReactFlightDOMTurbopackReply', () => { require('react-server-dom-turbopack/server.edge'), ); const TurbopackMock = require('./utils/TurbopackMock'); - // serverExports = TurbopackMock.serverExports; turbopackServerMap = TurbopackMock.turbopackServerMap; ReactServerDOMServer = require('react-server-dom-turbopack/server.edge'); jest.resetModules(); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index f5758ec22cfeb..5f7dfa516eb6a 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -10,18 +10,6 @@ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.WritableStream = - require('web-streams-polyfill/ponyfill/es6').WritableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; -global.Blob = require('buffer').Blob; -if (typeof File === 'undefined' || typeof FormData === 'undefined') { - global.File = require('buffer').File || require('undici').File; - global.FormData = require('undici').FormData; -} // Patch for Edge environments for global scope global.AsyncLocalStorage = require('async_hooks').AsyncLocalStorage; @@ -127,8 +115,16 @@ describe('ReactFlightDOMEdge', () => { chunk.set(prevChunk, 0); chunk.set(value, prevChunk.length); if (chunk.length > 50) { + // Copy the part we're keeping (prevChunk) to avoid buffer + // transfer. When we enqueue the partial chunk below, downstream + // consumers (like byte streams in the Flight Client) may detach + // the underlying buffer. Since prevChunk would share the same + // buffer, we copy it first so it has its own independent buffer. + // TODO: Should we just use {type: 'bytes'} for this stream to + // always transfer ownership, and not only "accidentally" when we + // enqueue in the Flight Client? 
+ prevChunk = chunk.slice(chunk.length - 50); controller.enqueue(chunk.subarray(0, chunk.length - 50)); - prevChunk = chunk.subarray(chunk.length - 50); } else { // Wait to see if we get some more bytes to join in. prevChunk = chunk; @@ -1118,25 +1114,121 @@ describe('ReactFlightDOMEdge', () => { expect(streamedBuffers).toEqual(buffers); }); + it('should support binary ReadableStreams', async () => { + const encoder = new TextEncoder(); + const words = ['Hello', 'streaming', 'world']; + + const stream = new ReadableStream({ + type: 'bytes', + async start(controller) { + for (let i = 0; i < words.length; i++) { + const chunk = encoder.encode(words[i] + ' '); + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const rscStream = await serverAct(() => + ReactServerDOMServer.renderToReadableStream(stream, {}), + ); + + const result = await ReactServerDOMClient.createFromReadableStream( + rscStream, + { + serverConsumerManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + const reader = result.getReader(); + const decoder = new TextDecoder(); + + let text = ''; + let entry; + while (!(entry = await reader.read()).done) { + text += decoder.decode(entry.value); + } + + expect(text).toBe('Hello streaming world '); + }); + + it('should support large binary ReadableStreams', async () => { + const chunkCount = 100; + const chunkSize = 1024; + const expectedBytes = []; + + const stream = new ReadableStream({ + type: 'bytes', + start(controller) { + for (let i = 0; i < chunkCount; i++) { + const chunk = new Uint8Array(chunkSize); + for (let j = 0; j < chunkSize; j++) { + chunk[j] = (i + j) % 256; + } + expectedBytes.push(...Array.from(chunk)); + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const rscStream = await serverAct(() => + ReactServerDOMServer.renderToReadableStream(stream, {}), + ); + + const result = await ReactServerDOMClient.createFromReadableStream( + // Use passThrough to split and rejoin chunks at arbitrary boundaries. + passThrough(rscStream), + { + serverConsumerManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + const reader = result.getReader(); + const receivedBytes = []; + let entry; + while (!(entry = await reader.read()).done) { + expect(entry.value instanceof Uint8Array).toBe(true); + receivedBytes.push(...Array.from(entry.value)); + } + + expect(receivedBytes).toEqual(expectedBytes); + }); + it('should support BYOB binary ReadableStreams', async () => { - const buffer = new Uint8Array([ + const sourceBytes = [ 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]).buffer; + ]; + + // Create separate buffers for each typed array to avoid ArrayBuffer + // transfer issues. Each view needs its own buffer because enqueue() + // transfers ownership. 
const buffers = [ - new Int8Array(buffer, 1), - new Uint8Array(buffer, 2), - new Uint8ClampedArray(buffer, 2), - new Int16Array(buffer, 2), - new Uint16Array(buffer, 2), - new Int32Array(buffer, 4), - new Uint32Array(buffer, 4), - new Float32Array(buffer, 4), - new Float64Array(buffer, 0), - new BigInt64Array(buffer, 0), - new BigUint64Array(buffer, 0), - new DataView(buffer, 3), + new Int8Array(sourceBytes.slice(1)), + new Uint8Array(sourceBytes.slice(2)), + new Uint8ClampedArray(sourceBytes.slice(2)), + new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new DataView(new Uint8Array(sourceBytes.slice(3)).buffer), ]; + // Save expected bytes before enqueueing (which will detach the buffers). + const expectedBytes = buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ); + // This a binary stream where each chunk ends up as Uint8Array. const s = new ReadableStream({ type: 'bytes', @@ -1176,11 +1268,7 @@ describe('ReactFlightDOMEdge', () => { // The streamed buffers might be in different chunks and in Uint8Array form but // the concatenated bytes should be the same. - expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( - buffers.flatMap(c => - Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), - ), - ); + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes); }); // @gate !__DEV__ || enableComponentPerformanceTrack diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js index 7315e78c619d2..b874383dcdd78 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js @@ -10,18 +10,6 @@ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - -global.Blob = require('buffer').Blob; -if (typeof File === 'undefined' || typeof FormData === 'undefined') { - global.File = require('buffer').File || require('undici').File; - global.FormData = require('undici').FormData; -} - let serverExports; let webpackServerMap; let ReactServerDOMServer; @@ -194,24 +182,33 @@ describe('ReactFlightDOMReplyEdge', () => { }); it('should support BYOB binary ReadableStreams', async () => { - const buffer = new Uint8Array([ + const sourceBytes = [ 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]).buffer; + ]; + + // Create separate buffers for each typed array to avoid ArrayBuffer + // transfer issues. Each view needs its own buffer because enqueue() + // transfers ownership. 
const buffers = [ - new Int8Array(buffer, 1), - new Uint8Array(buffer, 2), - new Uint8ClampedArray(buffer, 2), - new Int16Array(buffer, 2), - new Uint16Array(buffer, 2), - new Int32Array(buffer, 4), - new Uint32Array(buffer, 4), - new Float32Array(buffer, 4), - new Float64Array(buffer, 0), - new BigInt64Array(buffer, 0), - new BigUint64Array(buffer, 0), - new DataView(buffer, 3), + new Int8Array(sourceBytes.slice(1)), + new Uint8Array(sourceBytes.slice(2)), + new Uint8ClampedArray(sourceBytes.slice(2)), + new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new DataView(new Uint8Array(sourceBytes.slice(3)).buffer), ]; + // Save expected bytes before enqueueing (which will detach the buffers). + const expectedBytes = buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ); + // This a binary stream where each chunk ends up as Uint8Array. const s = new ReadableStream({ type: 'bytes', @@ -239,11 +236,7 @@ describe('ReactFlightDOMReplyEdge', () => { // The streamed buffers might be in different chunks and in Uint8Array form but // the concatenated bytes should be the same. - expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( - buffers.flatMap(c => - Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), - ), - ); + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes); }); it('should abort when parsing an incomplete payload', async () => { From b6fbec3733213e2d23e40acc6304e3156102e728 Mon Sep 17 00:00:00 2001 From: Hendrik Liebau Date: Thu, 13 Nov 2025 17:10:49 +0100 Subject: [PATCH 2/4] Do not defer enqueuing of byte stream chunks This simplifies the hot-path logic, but also means we're copying more often than in the previous approach, at least for small 'o' chunks. --- .../react-client/src/ReactFlightClient.js | 105 +++++------------- 1 file changed, 25 insertions(+), 80 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index 0ac79b96905d4..2b4939497a9dd 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -2759,7 +2759,6 @@ export type StreamState = { _rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline. 
_buffer: Array, // chunks received so far as part of this row _pendingByteStreamIDs: Set, // IDs of pending streams with type: 'bytes' - _toBeClosedByteStreamIDs: Set, // IDs of byte streams that received close but may still need to enqueue chunks _debugInfo: ReactIOInfo, // DEV-only _debugTargetChunkSize: number, // DEV-only }; @@ -2775,7 +2774,6 @@ export function createStreamState( _rowLength: 0, _buffer: [], _pendingByteStreamIDs: new Set(), - _toBeClosedByteStreamIDs: new Set(), }: Omit): any); if (__DEV__ && enableAsyncDebugInfo) { const response = unwrapWeakResponse(weakResponse); @@ -4808,13 +4806,10 @@ function processFullStringRow( } // Fallthrough case 67 /* "C" */: { - // If this is for a pending byte stream, defer the close until all - // buffered chunks are enqueued at the end of processBinaryChunk. if (streamState._pendingByteStreamIDs.has(id)) { - streamState._toBeClosedByteStreamIDs.add(id); - } else { - stopStream(response, id, row); + streamState._pendingByteStreamIDs.delete(id); } + stopStream(response, id, row); return; } // Fallthrough @@ -4849,11 +4844,6 @@ export function processBinaryChunk( const chunkLength = chunk.length; incrementChunkDebugInfo(streamState, chunkLength); - // Buffer chunks for byte streams during parsing to avoid ArrayBuffer - // detachment. We'll enqueue them after the while loop to apply a zero-copy - // optimization when there's only a single chunk overall. - const pendingByteStreamChunks: Map> = new Map(); - while (i < chunkLength) { let lastIdx = -1; switch (rowState) { @@ -4934,21 +4924,23 @@ export function processBinaryChunk( const length = lastIdx - i; const lastChunk = new Uint8Array(chunk.buffer, offset, length); - // Check if this is a Uint8Array for a pending byte stream that needs to - // be buffered until the end of this chunk, to avoid detaching the - // underlying shared ArrayBuffer. + // Check if this is a Uint8Array for a byte stream. We enqueue it + // immediately but need to determine if we can use zero-copy or must copy. if ( rowTag === 111 /* "o" */ && streamState._pendingByteStreamIDs.has(rowID) ) { - let chunks = pendingByteStreamChunks.get(rowID); - if (chunks === undefined) { - chunks = []; - pendingByteStreamChunks.set(rowID, chunks); - } - chunks.push(lastChunk); + resolveBuffer( + response, + rowID, + // If we're at the end of the RSC chunk, no more parsing will access + // this buffer and we don't need to copy the chunk to allow detaching + // the buffer, otherwise we need to copy. + lastIdx === chunkLength ? lastChunk : lastChunk.slice(), + streamState, + ); } else { - // Process all other row types immediately. + // Process all other row types. processFullBinaryRow( response, streamState, @@ -4957,18 +4949,6 @@ export function processBinaryChunk( buffer, lastChunk, ); - - // If this was a close command for a byte stream that has no pending - // buffered chunks in this parse cycle, we can close it immediately - // instead of deferring until the end of the chunk. 
- if ( - streamState._toBeClosedByteStreamIDs.has(rowID) && - !pendingByteStreamChunks.has(rowID) - ) { - streamState._toBeClosedByteStreamIDs.delete(rowID); - streamState._pendingByteStreamIDs.delete(rowID); - stopStream(response, rowID, ''); - } } // Reset state machine for a new row @@ -4987,64 +4967,29 @@ export function processBinaryChunk( const length = chunk.byteLength - i; const remainingSlice = new Uint8Array(chunk.buffer, offset, length); - // For byte streams, we can enqueue chunks immediately rather than - // buffering them until the row completes. + // For byte streams, we can enqueue the partial chunk immediately using + // zero-copy since we're at the end of the RSC chunk and no more parsing + // will access this buffer. if ( rowTag === 111 /* "o" */ && streamState._pendingByteStreamIDs.has(rowID) ) { - let chunks = pendingByteStreamChunks.get(rowID); - if (chunks === undefined) { - chunks = []; - pendingByteStreamChunks.set(rowID, chunks); - } - chunks.push(remainingSlice); + // Update how many bytes we're still waiting for. We need to do this + // before enqueueing as enqueue will detach the buffer and byteLength + // will become 0. + rowLength -= remainingSlice.byteLength; + resolveBuffer(response, rowID, remainingSlice, streamState); } else { // For other row types, stash the rest of the current chunk until we can // process the full row. buffer.push(remainingSlice); + // Update how many bytes we're still waiting for. If we're looking for + // a newline, this doesn't hurt since we'll just ignore it. + rowLength -= remainingSlice.byteLength; } - - // Update how many bytes we're still waiting for. If we're looking for - // a newline, this doesn't hurt since we'll just ignore it. - rowLength -= remainingSlice.byteLength; break; } } - - // Enqueue all buffered byte stream chunks. - const streamCount = pendingByteStreamChunks.size; - if (streamCount > 0) { - pendingByteStreamChunks.forEach((chunks, streamId) => { - if (streamCount === 1 && chunks.length === 1) { - // Single stream with single chunk - use zero-copy optimization. - resolveBuffer(response, streamId, chunks[0], streamState); - } else if (chunks.length === 1) { - // Single chunk but multiple streams - must copy to avoid buffer - // detachment. - resolveBuffer(response, streamId, chunks[0].slice(), streamState); - } else { - // Multiple chunks - concatenate them into a single buffer to give the - // consumer a contiguous chunk. - const totalLength = chunks.reduce((sum, c) => sum + c.length, 0); - const concatenated = new Uint8Array(totalLength); - let offset = 0; - for (let j = 0; j < chunks.length; j++) { - concatenated.set(chunks[j], offset); - offset += chunks[j].length; - } - resolveBuffer(response, streamId, concatenated, streamState); - } - }); - } - - // Process deferred closes for byte streams that were in this chunk. - streamState._toBeClosedByteStreamIDs.forEach(id => { - streamState._pendingByteStreamIDs.delete(id); - stopStream(response, id, ''); - }); - streamState._toBeClosedByteStreamIDs.clear(); - streamState._rowState = rowState; streamState._rowID = rowID; streamState._rowTag = rowTag; From 485953e4fed56eea2da0e5403be2bd90f831d2b0 Mon Sep 17 00:00:00 2001 From: Hendrik Liebau Date: Thu, 13 Nov 2025 18:02:01 +0100 Subject: [PATCH 3/4] Serialize byte stream chunks as `'b'` rows instead of `o` rows. This allows us to avoid using the `_pendingByteStreamIDs` Set in the client to track which streams are byte streams. 
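The client-side handling that remains for these dedicated `'b'` rows boils down to a single copy-vs-transfer decision per row. A minimal standalone sketch of that rule (the function name and parameters are illustrative, not the actual Flight Client internals):

    // Decide whether a byte-stream row can be handed to enqueue() as-is,
    // letting the controller detach the buffer, or must be copied first.
    function byteStreamChunkForEnqueue(rscChunk, rowStart, rowEnd) {
      const view = new Uint8Array(
        rscChunk.buffer,
        rscChunk.byteOffset + rowStart,
        rowEnd - rowStart,
      );
      if (rowEnd === rscChunk.byteLength) {
        // The row ends exactly at the end of the incoming RSC chunk, so no
        // further parsing will read from this buffer and it is safe to let
        // the byte stream controller detach it (zero-copy).
        return view;
      }
      // Parsing still needs the rest of this buffer, so hand the byte stream
      // its own copy and keep the original attached.
      return view.slice();
    }

The same reasoning applies to a partial row at the end of an RSC chunk: that slice always extends to the end of the chunk, so it can always be enqueued without copying.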
--- .../react-client/src/ReactFlightClient.js | 24 +++++-------------- .../react-server/src/ReactFlightServer.js | 21 ++++++++++------ 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index 2b4939497a9dd..cc0361e6b8dc2 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -2758,7 +2758,6 @@ export type StreamState = { _rowTag: number, // 0 indicates that we're currently parsing the row ID _rowLength: number, // remaining bytes in the row. 0 indicates that we're looking for a newline. _buffer: Array, // chunks received so far as part of this row - _pendingByteStreamIDs: Set, // IDs of pending streams with type: 'bytes' _debugInfo: ReactIOInfo, // DEV-only _debugTargetChunkSize: number, // DEV-only }; @@ -2773,7 +2772,6 @@ export function createStreamState( _rowTag: 0, _rowLength: 0, _buffer: [], - _pendingByteStreamIDs: new Set(), }: Omit): any); if (__DEV__ && enableAsyncDebugInfo) { const response = unwrapWeakResponse(weakResponse); @@ -4790,7 +4788,6 @@ function processFullStringRow( } // Fallthrough case 114 /* "r" */: { - streamState._pendingByteStreamIDs.add(id); startReadableStream(response, id, 'bytes', streamState); return; } @@ -4806,9 +4803,6 @@ function processFullStringRow( } // Fallthrough case 67 /* "C" */: { - if (streamState._pendingByteStreamIDs.has(id)) { - streamState._pendingByteStreamIDs.delete(id); - } stopStream(response, id, row); return; } @@ -4843,7 +4837,6 @@ export function processBinaryChunk( const buffer = streamState._buffer; const chunkLength = chunk.length; incrementChunkDebugInfo(streamState, chunkLength); - while (i < chunkLength) { let lastIdx = -1; switch (rowState) { @@ -4864,6 +4857,7 @@ export function processBinaryChunk( resolvedRowTag === 65 /* "A" */ || resolvedRowTag === 79 /* "O" */ || resolvedRowTag === 111 /* "o" */ || + resolvedRowTag === 98 /* "b" */ || resolvedRowTag === 85 /* "U" */ || resolvedRowTag === 83 /* "S" */ || resolvedRowTag === 115 /* "s" */ || @@ -4926,10 +4920,7 @@ export function processBinaryChunk( // Check if this is a Uint8Array for a byte stream. We enqueue it // immediately but need to determine if we can use zero-copy or must copy. - if ( - rowTag === 111 /* "o" */ && - streamState._pendingByteStreamIDs.has(rowID) - ) { + if (rowTag === 98 /* "b" */) { resolveBuffer( response, rowID, @@ -4967,15 +4958,12 @@ export function processBinaryChunk( const length = chunk.byteLength - i; const remainingSlice = new Uint8Array(chunk.buffer, offset, length); - // For byte streams, we can enqueue the partial chunk immediately using - // zero-copy since we're at the end of the RSC chunk and no more parsing + // For byte streams, we can enqueue the partial row immediately without + // copying since we're at the end of the RSC chunk and no more parsing // will access this buffer. - if ( - rowTag === 111 /* "o" */ && - streamState._pendingByteStreamIDs.has(rowID) - ) { + if (rowTag === 98 /* "b" */) { // Update how many bytes we're still waiting for. We need to do this - // before enqueueing as enqueue will detach the buffer and byteLength + // before enqueueing, as enqueue will detach the buffer and byteLength // will become 0. 
rowLength -= remainingSlice.byteLength; resolveBuffer(response, rowID, remainingSlice, streamState); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 1f5e24ff5962f..bbe18270e4482 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -1149,6 +1149,8 @@ function serializeReadableStream( supportsBYOB = false; } } + // At this point supportsBYOB is guaranteed to be a boolean. + const isByteStream: boolean = supportsBYOB; const reader = stream.getReader(); @@ -1172,7 +1174,7 @@ function serializeReadableStream( // The task represents the Stop row. This adds a Start row. request.pendingChunks++; const startStreamRow = - streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; + streamTask.id.toString(16) + ':' + (isByteStream ? 'r' : 'R') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); function progress(entry: {done: boolean, value: ReactClientValue, ...}) { @@ -1192,7 +1194,7 @@ function serializeReadableStream( try { streamTask.model = entry.value; request.pendingChunks++; - tryStreamTask(request, streamTask); + tryStreamTask(request, streamTask, isByteStream); enqueueFlush(request); reader.read().then(progress, error); } catch (x) { @@ -1319,7 +1321,7 @@ function serializeAsyncIterable( try { streamTask.model = entry.value; request.pendingChunks++; - tryStreamTask(request, streamTask); + tryStreamTask(request, streamTask, false); enqueueFlush(request); if (__DEV__) { callIteratorInDEV(iterator, progress, error); @@ -5534,6 +5536,7 @@ function emitChunk( request: Request, task: Task, value: ReactClientValue, + isByteStream: boolean, ): void { const id = task.id; // For certain types we have special types, we typically outlined them but @@ -5559,7 +5562,7 @@ function emitChunk( } if (value instanceof Uint8Array) { // unsigned char - emitTypedArrayChunk(request, id, 'o', value, false); + emitTypedArrayChunk(request, id, isByteStream ? 'b' : 'o', value, false); return; } if (value instanceof Uint8ClampedArray) { @@ -5717,7 +5720,7 @@ function retryTask(request: Request, task: Task): void { // Object might contain unresolved values like additional elements. // This is simulating what the JSON loop would do if this was part of it. - emitChunk(request, task, resolvedModel); + emitChunk(request, task, resolvedModel, false); } else { // If the value is a string, it means it's a terminal value and we already escaped it // We don't need to escape it again so it's not passed the toJSON replacer. @@ -5776,7 +5779,11 @@ function retryTask(request: Request, task: Task): void { } } -function tryStreamTask(request: Request, task: Task): void { +function tryStreamTask( + request: Request, + task: Task, + isByteStream: boolean, +): void { // This is used to try to emit something synchronously but if it suspends, // we emit a reference to a new outlined task immediately instead. 
const prevCanEmitDebugInfo = canEmitDebugInfo; @@ -5787,7 +5794,7 @@ function tryStreamTask(request: Request, task: Task): void { } const parentSerializedSize = serializedSize; try { - emitChunk(request, task, task.model); + emitChunk(request, task, task.model, isByteStream); } finally { serializedSize = parentSerializedSize; if (__DEV__) { From 77cf36637adc6971dbad7a25f1e28dbd3f81552b Mon Sep 17 00:00:00 2001 From: Hendrik Liebau Date: Thu, 13 Nov 2025 19:11:57 +0100 Subject: [PATCH 4/4] Emit byte stream chunk in `progress` inside `serializeReadableStream` --- .../react-server/src/ReactFlightServer.js | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index bbe18270e4482..f9729b535e073 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -1192,9 +1192,15 @@ function serializeReadableStream( callOnAllReadyIfReady(request); } else { try { - streamTask.model = entry.value; request.pendingChunks++; - tryStreamTask(request, streamTask, isByteStream); + streamTask.model = entry.value; + if (isByteStream) { + // Chunks of byte streams are always Uint8Array instances. + const chunk: Uint8Array = (streamTask.model: any); + emitTypedArrayChunk(request, streamTask.id, 'b', chunk, false); + } else { + tryStreamTask(request, streamTask); + } enqueueFlush(request); reader.read().then(progress, error); } catch (x) { @@ -1321,7 +1327,7 @@ function serializeAsyncIterable( try { streamTask.model = entry.value; request.pendingChunks++; - tryStreamTask(request, streamTask, false); + tryStreamTask(request, streamTask); enqueueFlush(request); if (__DEV__) { callIteratorInDEV(iterator, progress, error); @@ -5536,7 +5542,6 @@ function emitChunk( request: Request, task: Task, value: ReactClientValue, - isByteStream: boolean, ): void { const id = task.id; // For certain types we have special types, we typically outlined them but @@ -5562,7 +5567,7 @@ function emitChunk( } if (value instanceof Uint8Array) { // unsigned char - emitTypedArrayChunk(request, id, isByteStream ? 'b' : 'o', value, false); + emitTypedArrayChunk(request, id, 'o', value, false); return; } if (value instanceof Uint8ClampedArray) { @@ -5720,7 +5725,7 @@ function retryTask(request: Request, task: Task): void { // Object might contain unresolved values like additional elements. // This is simulating what the JSON loop would do if this was part of it. - emitChunk(request, task, resolvedModel, false); + emitChunk(request, task, resolvedModel); } else { // If the value is a string, it means it's a terminal value and we already escaped it // We don't need to escape it again so it's not passed the toJSON replacer. @@ -5779,11 +5784,7 @@ function retryTask(request: Request, task: Task): void { } } -function tryStreamTask( - request: Request, - task: Task, - isByteStream: boolean, -): void { +function tryStreamTask(request: Request, task: Task): void { // This is used to try to emit something synchronously but if it suspends, // we emit a reference to a new outlined task immediately instead. const prevCanEmitDebugInfo = canEmitDebugInfo; @@ -5794,7 +5795,7 @@ function tryStreamTask( } const parentSerializedSize = serializedSize; try { - emitChunk(request, task, task.model, isByteStream); + emitChunk(request, task, task.model); } finally { serializedSize = parentSerializedSize; if (__DEV__) {
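// For reference, the Streams-spec behavior that motivated this series can be
// reproduced outside React with any native Web Streams implementation (the
// first commit notes that the previously polyfilled test environment masked
// it). A minimal standalone sketch, not part of the patch:
const rscChunk = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
// Two views into the same buffer, analogous to how the Flight Client sliced
// embedded byte-stream rows out of an incoming RSC chunk before this fix.
const embeddedRow = new Uint8Array(rscChunk.buffer, 0, 4);
const remainingRows = new Uint8Array(rscChunk.buffer, 4, 4);
new ReadableStream({
  type: 'bytes',
  start(controller) {
    // Enqueuing a view into a byte stream controller transfers (detaches)
    // its underlying ArrayBuffer.
    controller.enqueue(embeddedRow);
    console.log(rscChunk.byteLength); // 0 - the shared buffer is detached
    console.log(remainingRows.byteLength); // 0 - further parsing would break
    controller.close();
  },
});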