packages/react-client/src/ReactFlightClient.js (45 additions & 14 deletions)
@@ -4857,6 +4857,7 @@ export function processBinaryChunk(
         resolvedRowTag === 65 /* "A" */ ||
         resolvedRowTag === 79 /* "O" */ ||
         resolvedRowTag === 111 /* "o" */ ||
+        resolvedRowTag === 98 /* "b" */ ||
         resolvedRowTag === 85 /* "U" */ ||
         resolvedRowTag === 83 /* "S" */ ||
         resolvedRowTag === 115 /* "s" */ ||
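
These comparisons match the row tag byte directly against ASCII codes, so 98 is simply the character code of the new "b" tag that the later hunks route to the byte-stream path. A quick standalone check, outside the Flight code:

// Row tags are single bytes matched by ASCII code; "b" marks a byte-stream
// chunk, alongside the existing typed-array tags like "A", "O", "o", and "U".
console.log('b'.charCodeAt(0)); // 98
console.log(String.fromCharCode(65, 79, 111, 85)); // "AOoU"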
@@ -4916,14 +4917,31 @@ export function processBinaryChunk(
       // We found the last chunk of the row
       const length = lastIdx - i;
       const lastChunk = new Uint8Array(chunk.buffer, offset, length);
-      processFullBinaryRow(
-        response,
-        streamState,
-        rowID,
-        rowTag,
-        buffer,
-        lastChunk,
-      );
+
+      // Check if this is a Uint8Array for a byte stream. We enqueue it
+      // immediately but need to determine if we can use zero-copy or must copy.
+      if (rowTag === 98 /* "b" */) {
+        resolveBuffer(
+          response,
+          rowID,
+          // If we're at the end of the RSC chunk, no more parsing will access
+          // this buffer and we don't need to copy the chunk to allow detaching
+          // the buffer, otherwise we need to copy.
+          lastIdx === chunkLength ? lastChunk : lastChunk.slice(),
+          streamState,
+        );
+      } else {
+        // Process all other row types.
+        processFullBinaryRow(
+          response,
+          streamState,
+          rowID,
+          rowTag,
+          buffer,
+          lastChunk,
+        );
+      }
 
       // Reset state machine for a new row
       i = lastIdx;
       if (rowState === ROW_CHUNK_BY_NEWLINE) {
@@ -4936,14 +4954,27 @@ export function processBinaryChunk(
         rowLength = 0;
         buffer.length = 0;
       } else {
-        // The rest of this row is in a future chunk. We stash the rest of the
-        // current chunk until we can process the full row.
+        // The rest of this row is in a future chunk.
         const length = chunk.byteLength - i;
         const remainingSlice = new Uint8Array(chunk.buffer, offset, length);
-        buffer.push(remainingSlice);
-        // Update how many bytes we're still waiting for. If we're looking for
-        // a newline, this doesn't hurt since we'll just ignore it.
-        rowLength -= remainingSlice.byteLength;
+
+        // For byte streams, we can enqueue the partial row immediately without
+        // copying since we're at the end of the RSC chunk and no more parsing
+        // will access this buffer.
+        if (rowTag === 98 /* "b" */) {
+          // Update how many bytes we're still waiting for. We need to do this
+          // before enqueueing, as enqueue will detach the buffer and byteLength
+          // will become 0.
+          rowLength -= remainingSlice.byteLength;
+          resolveBuffer(response, rowID, remainingSlice, streamState);
+        } else {
+          // For other row types, stash the rest of the current chunk until we
+          // can process the full row.
+          buffer.push(remainingSlice);
+          // Update how many bytes we're still waiting for. If we're looking for
+          // a newline, this doesn't hurt since we'll just ignore it.
+          rowLength -= remainingSlice.byteLength;
+        }
         break;
       }
     }
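
The zero-copy reasoning in both branches above rests on a Web Streams detail: enqueuing a chunk into a readable byte stream transfers the chunk's underlying ArrayBuffer, detaching every view into it. That is why rowLength must be read before calling resolveBuffer (assuming resolveBuffer ultimately enqueues into such a stream, as the comments indicate), and why lastChunk.slice() is needed whenever the parser still has later rows to read from the same buffer. A minimal standalone sketch of the behavior:

const stream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    const view = new Uint8Array([1, 2, 3]);
    controller.enqueue(view);
    // enqueue() transferred view.buffer into the stream's queue, so the
    // original view is now detached and reads as empty.
    console.log(view.byteLength); // 0
  },
});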
@@ -5,18 +5,11 @@
  * LICENSE file in the root directory of this source tree.
  *
  * @emails react-core
+ * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment
  */
 
 'use strict';
 
-// Polyfills for test environment
-global.ReadableStream =
-  require('web-streams-polyfill/ponyfill/es6').ReadableStream;
-global.WritableStream =
-  require('web-streams-polyfill/ponyfill/es6').WritableStream;
-global.TextEncoder = require('util').TextEncoder;
-global.TextDecoder = require('util').TextDecoder;
-
 let clientExports;
 let turbopackMap;
 let turbopackModules;
@@ -5,17 +5,11 @@
  * LICENSE file in the root directory of this source tree.
  *
  * @emails react-core
+ * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment
  */
 
 'use strict';
 
-// Polyfills for test environment
-global.ReadableStream =
-  require('web-streams-polyfill/ponyfill/es6').ReadableStream;
-global.TextEncoder = require('util').TextEncoder;
-global.TextDecoder = require('util').TextDecoder;
-
-// let serverExports;
 let turbopackServerMap;
 let ReactServerDOMServer;
 let ReactServerDOMClient;
@@ -29,7 +23,6 @@ describe('ReactFlightDOMTurbopackReply', () => {
       require('react-server-dom-turbopack/server.edge'),
     );
     const TurbopackMock = require('./utils/TurbopackMock');
-    // serverExports = TurbopackMock.serverExports;
     turbopackServerMap = TurbopackMock.turbopackServerMap;
     ReactServerDOMServer = require('react-server-dom-turbopack/server.edge');
     jest.resetModules();
@@ -10,18 +10,6 @@

 'use strict';
 
-// Polyfills for test environment
-global.ReadableStream =
-  require('web-streams-polyfill/ponyfill/es6').ReadableStream;
-global.WritableStream =
-  require('web-streams-polyfill/ponyfill/es6').WritableStream;
-global.TextEncoder = require('util').TextEncoder;
-global.TextDecoder = require('util').TextDecoder;
-global.Blob = require('buffer').Blob;
-if (typeof File === 'undefined' || typeof FormData === 'undefined') {
-  global.File = require('buffer').File || require('undici').File;
-  global.FormData = require('undici').FormData;
-}
 // Patch for Edge environments for global scope
 global.AsyncLocalStorage = require('async_hooks').AsyncLocalStorage;
 
@@ -127,8 +115,16 @@ describe('ReactFlightDOMEdge', () => {
         chunk.set(prevChunk, 0);
         chunk.set(value, prevChunk.length);
         if (chunk.length > 50) {
+          // Copy the part we're keeping (prevChunk) to avoid buffer
+          // transfer. When we enqueue the partial chunk below, downstream
+          // consumers (like byte streams in the Flight Client) may detach
+          // the underlying buffer. Since prevChunk would share the same
+          // buffer, we copy it first so it has its own independent buffer.
+          // TODO: Should we just use {type: 'bytes'} for this stream to
+          // always transfer ownership, and not only "accidentally" when we
+          // enqueue in the Flight Client?
+          prevChunk = chunk.slice(chunk.length - 50);
           controller.enqueue(chunk.subarray(0, chunk.length - 50));
-          prevChunk = chunk.subarray(chunk.length - 50);
         } else {
           // Wait to see if we get some more bytes to join in.
           prevChunk = chunk;
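
The swapped lines hinge on the difference between subarray and slice on typed arrays: the former returns a view over the same ArrayBuffer, the latter a copy with its own buffer. A standalone sketch:

const bytes = new Uint8Array([1, 2, 3, 4, 5]);
const view = bytes.subarray(3); // shares bytes.buffer
const copy = bytes.slice(3);    // owns a fresh ArrayBuffer

console.log(view.buffer === bytes.buffer); // true
console.log(copy.buffer === bytes.buffer); // false

// If bytes.buffer is later transferred (e.g. by a byte stream enqueue),
// `view` is detached along with it, while `copy` is unaffected.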
@@ -1118,25 +1114,121 @@ describe('ReactFlightDOMEdge', () => {
     expect(streamedBuffers).toEqual(buffers);
   });
 
+  it('should support binary ReadableStreams', async () => {
+    const encoder = new TextEncoder();
+    const words = ['Hello', 'streaming', 'world'];
+
+    const stream = new ReadableStream({
+      type: 'bytes',
+      async start(controller) {
+        for (let i = 0; i < words.length; i++) {
+          const chunk = encoder.encode(words[i] + ' ');
+          controller.enqueue(chunk);
+        }
+        controller.close();
+      },
+    });
+
+    const rscStream = await serverAct(() =>
+      ReactServerDOMServer.renderToReadableStream(stream, {}),
+    );
+
+    const result = await ReactServerDOMClient.createFromReadableStream(
+      rscStream,
+      {
+        serverConsumerManifest: {
+          moduleMap: null,
+          moduleLoading: null,
+        },
+      },
+    );
+
+    const reader = result.getReader();
+    const decoder = new TextDecoder();
+
+    let text = '';
+    let entry;
+    while (!(entry = await reader.read()).done) {
+      text += decoder.decode(entry.value);
+    }
+
+    expect(text).toBe('Hello streaming world ');
+  });
+
+  it('should support large binary ReadableStreams', async () => {
+    const chunkCount = 100;
+    const chunkSize = 1024;
+    const expectedBytes = [];
+
+    const stream = new ReadableStream({
+      type: 'bytes',
+      start(controller) {
+        for (let i = 0; i < chunkCount; i++) {
+          const chunk = new Uint8Array(chunkSize);
+          for (let j = 0; j < chunkSize; j++) {
+            chunk[j] = (i + j) % 256;
+          }
+          expectedBytes.push(...Array.from(chunk));
+          controller.enqueue(chunk);
+        }
+        controller.close();
+      },
+    });
+
+    const rscStream = await serverAct(() =>
+      ReactServerDOMServer.renderToReadableStream(stream, {}),
+    );
+
+    const result = await ReactServerDOMClient.createFromReadableStream(
+      // Use passThrough to split and rejoin chunks at arbitrary boundaries.
+      passThrough(rscStream),
+      {
+        serverConsumerManifest: {
+          moduleMap: null,
+          moduleLoading: null,
+        },
+      },
+    );
+
+    const reader = result.getReader();
+    const receivedBytes = [];
+    let entry;
+    while (!(entry = await reader.read()).done) {
+      expect(entry.value instanceof Uint8Array).toBe(true);
+      receivedBytes.push(...Array.from(entry.value));
+    }
+
+    expect(receivedBytes).toEqual(expectedBytes);
+  });
+
   it('should support BYOB binary ReadableStreams', async () => {
-    const buffer = new Uint8Array([
+    const sourceBytes = [
       123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
-    ]).buffer;
+    ];
+
+    // Create separate buffers for each typed array to avoid ArrayBuffer
+    // transfer issues. Each view needs its own buffer because enqueue()
+    // transfers ownership.
     const buffers = [
-      new Int8Array(buffer, 1),
-      new Uint8Array(buffer, 2),
-      new Uint8ClampedArray(buffer, 2),
-      new Int16Array(buffer, 2),
-      new Uint16Array(buffer, 2),
-      new Int32Array(buffer, 4),
-      new Uint32Array(buffer, 4),
-      new Float32Array(buffer, 4),
-      new Float64Array(buffer, 0),
-      new BigInt64Array(buffer, 0),
-      new BigUint64Array(buffer, 0),
-      new DataView(buffer, 3),
+      new Int8Array(sourceBytes.slice(1)),
+      new Uint8Array(sourceBytes.slice(2)),
+      new Uint8ClampedArray(sourceBytes.slice(2)),
+      new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer),
+      new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer),
+      new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer),
+      new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer),
+      new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer),
+      new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer),
+      new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer),
+      new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer),
+      new DataView(new Uint8Array(sourceBytes.slice(3)).buffer),
     ];
 
+    // Save expected bytes before enqueueing (which will detach the buffers).
+    const expectedBytes = buffers.flatMap(c =>
+      Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)),
+    );
+
     // This is a binary stream where each chunk ends up as a Uint8Array.
     const s = new ReadableStream({
       type: 'bytes',
@@ -1176,11 +1268,7 @@ describe('ReactFlightDOMEdge', () => {

     // The streamed buffers might be in different chunks and in Uint8Array form but
     // the concatenated bytes should be the same.
-    expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(
-      buffers.flatMap(c =>
-        Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)),
-      ),
-    );
+    expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes);
   });
 
   // @gate !__DEV__ || enableComponentPerformanceTrack
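
For context on what the BYOB test exercises: a BYOB ("bring your own buffer") reader is the consuming counterpart of a byte stream, where the caller lends a buffer and each read() transfers it and hands back a filled view. A minimal consumption sketch (a hypothetical helper, not part of this test suite):

async function readAllByob(stream) {
  const reader = stream.getReader({mode: 'byob'});
  const chunks = [];
  let view = new Uint8Array(1024);
  while (true) {
    const {value, done} = await reader.read(view);
    if (done) break;
    chunks.push(value.slice()); // copy out before recycling the buffer
    view = new Uint8Array(value.buffer); // reuse the transferred buffer
  }
  return chunks;
}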
@@ -10,18 +10,6 @@

 'use strict';
 
-// Polyfills for test environment
-global.ReadableStream =
-  require('web-streams-polyfill/ponyfill/es6').ReadableStream;
-global.TextEncoder = require('util').TextEncoder;
-global.TextDecoder = require('util').TextDecoder;
-
-global.Blob = require('buffer').Blob;
-if (typeof File === 'undefined' || typeof FormData === 'undefined') {
-  global.File = require('buffer').File || require('undici').File;
-  global.FormData = require('undici').FormData;
-}
-
 let serverExports;
 let webpackServerMap;
 let ReactServerDOMServer;
@@ -194,24 +182,33 @@ describe('ReactFlightDOMReplyEdge', () => {
   });
 
   it('should support BYOB binary ReadableStreams', async () => {
-    const buffer = new Uint8Array([
+    const sourceBytes = [
       123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
-    ]).buffer;
+    ];
+
+    // Create separate buffers for each typed array to avoid ArrayBuffer
+    // transfer issues. Each view needs its own buffer because enqueue()
+    // transfers ownership.
     const buffers = [
-      new Int8Array(buffer, 1),
-      new Uint8Array(buffer, 2),
-      new Uint8ClampedArray(buffer, 2),
-      new Int16Array(buffer, 2),
-      new Uint16Array(buffer, 2),
-      new Int32Array(buffer, 4),
-      new Uint32Array(buffer, 4),
-      new Float32Array(buffer, 4),
-      new Float64Array(buffer, 0),
-      new BigInt64Array(buffer, 0),
-      new BigUint64Array(buffer, 0),
-      new DataView(buffer, 3),
+      new Int8Array(sourceBytes.slice(1)),
+      new Uint8Array(sourceBytes.slice(2)),
+      new Uint8ClampedArray(sourceBytes.slice(2)),
+      new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer),
+      new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer),
+      new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer),
+      new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer),
+      new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer),
+      new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer),
+      new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer),
+      new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer),
+      new DataView(new Uint8Array(sourceBytes.slice(3)).buffer),
     ];
 
+    // Save expected bytes before enqueueing (which will detach the buffers).
+    const expectedBytes = buffers.flatMap(c =>
+      Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)),
+    );
+
     // This is a binary stream where each chunk ends up as a Uint8Array.
     const s = new ReadableStream({
       type: 'bytes',
@@ -239,11 +236,7 @@ describe('ReactFlightDOMReplyEdge', () => {

     // The streamed buffers might be in different chunks and in Uint8Array form but
     // the concatenated bytes should be the same.
-    expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(
-      buffers.flatMap(c =>
-        Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)),
-      ),
-    );
+    expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes);
   });
 
   it('should abort when parsing an incomplete payload', async () => {
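
A note on the "save expected bytes before enqueueing" step in both BYOB tests: once enqueue() transfers a buffer, every view over it is detached and reads as empty, so the expectation data must be captured first. A standalone illustration, using structuredClone to force a transfer:

const view = new Uint8Array([1, 2, 3]);
// Transferring the underlying buffer detaches the view.
structuredClone(view.buffer, {transfer: [view.buffer]});
console.log(view.byteLength); // 0
console.log(view[0]);         // undefined in modern engines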