Implement buffering in pipeable streams
The previous implementation of pipeable streaming (Node) suffered performance issues caused by high chunk counts and inefficiencies in how Node streams handle that situation. In particular, the use of cork/uncork was meant to alleviate this, but those methods do nothing unless the receiving Writable stream implements _writev, which many don't.
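
For context, a minimal sketch (not part of this commit) of why cork()/uncork() only helps when the destination implements _writev: a corked stream buffers writes, but on uncork a destination that only implements _write still receives the buffered chunks one at a time, while a destination with _writev receives them as a single batch. The WriteOnly/WriteV classes below are hypothetical stand-ins for arbitrary destinations.

// Illustrative sketch: cork()/uncork() only batches buffered writes into a
// single _writev() call when the destination Writable implements _writev.
const {Writable} = require('stream');

class WriteOnly extends Writable {
  _write(chunk, encoding, callback) {
    // Called once per buffered chunk, even after cork()/uncork().
    console.log('_write:', chunk.length, 'bytes');
    callback();
  }
}

class WriteV extends Writable {
  _writev(chunks, callback) {
    // Receives all chunks buffered while corked as one batch.
    console.log('_writev:', chunks.length, 'chunks');
    callback();
  }
}

for (const dest of [new WriteOnly(), new WriteV()]) {
  dest.cork();
  dest.write('<div>');
  dest.write('hello');
  dest.write('</div>');
  dest.uncork(); // WriteOnly logs three _write calls; WriteV logs one batch of 3
}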

This change adopts the view-based buffering technique previously implemented for the browser execution context. The main difference is the use of the backpressure signal provided by the writable stream, which is not available in the other context. Another change to note is the use of standard constructs like TextEncoder and TypedArrays.
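
The sketch below (not the commit's code; names like bufferChunk and flush are illustrative) shows the shape of this approach: small chunks are copied into a fixed-size Uint8Array view, full views are written to the destination, and the boolean returned by destination.write() is folded into a single backpressure flag. It assumes a modern Node runtime where TextEncoder is available globally.

// Simplified, standalone sketch of view-based buffering with backpressure.
const VIEW_SIZE = 2048;
let view = new Uint8Array(VIEW_SIZE);
let written = 0;
let hasCapacity = true;

function flush(destination) {
  if (written > 0) {
    // write() returns false when the stream's internal buffer is full.
    hasCapacity = destination.write(view.subarray(0, written)) && hasCapacity;
    view = new Uint8Array(VIEW_SIZE);
    written = 0;
  }
}

function bufferChunk(destination, chunk /* Uint8Array */) {
  if (chunk.byteLength > VIEW_SIZE) {
    // Oversized chunks bypass the view and are written directly.
    flush(destination);
    hasCapacity = destination.write(chunk) && hasCapacity;
    return;
  }
  if (written + chunk.byteLength > VIEW_SIZE) {
    flush(destination); // the view would overflow, send it first
  }
  view.set(chunk, written);
  written += chunk.byteLength;
}

// Usage: many tiny chunks coalesce into a few ~2KB writes.
const encoder = new TextEncoder();
bufferChunk(process.stdout, encoder.encode('<div>'));
bufferChunk(process.stdout, encoder.encode('hello</div>'));
flush(process.stdout);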
gnoff committed Apr 8, 2022
1 parent 6ad3494 commit fc976d7
Showing 1 changed file with 72 additions and 16 deletions.
88 changes: 72 additions & 16 deletions packages/react-server/src/ReactServerStreamConfigNode.js
@@ -8,6 +8,7 @@
  */
 
 import type {Writable} from 'stream';
+import {TextEncoder} from 'util';
 
 type MightBeFlushable = {
   flush?: () => void,
@@ -17,7 +18,7 @@ type MightBeFlushable = {
 export type Destination = Writable & MightBeFlushable;
 
 export type PrecomputedChunk = Uint8Array;
-export type Chunk = string;
+export type Chunk = Uint8Array;
 
 export function scheduleWork(callback: () => void) {
   setImmediate(callback);
@@ -33,46 +34,101 @@ export function flushBuffered(destination: Destination) {
   }
 }
 
+const VIEW_SIZE = 2048;
+let currentView = null;
+let writtenBytes = 0;
+let destinationHasCapacity = true;
+
 export function beginWriting(destination: Destination) {
-  // Older Node streams like http.createServer don't have this.
-  if (typeof destination.cork === 'function') {
-    destination.cork();
-  }
+  currentView = new Uint8Array(VIEW_SIZE);
+  writtenBytes = 0;
+  destinationHasCapacity = true;
 }
 
 export function writeChunk(
   destination: Destination,
-  chunk: Chunk | PrecomputedChunk,
+  chunk: PrecomputedChunk | Chunk,
 ): void {
-  const nodeBuffer = ((chunk: any): Buffer | string); // close enough
-  destination.write(nodeBuffer);
+  if (chunk.byteLength === 0) {
+    return;
+  }
+
+  if (chunk.byteLength > VIEW_SIZE) {
+    // this chunk may overflow a single view which implies it was not
+    // one that is cached by the streaming renderer. We will enqueue
+    // it directly and expect it is not re-used
+    if (writtenBytes > 0) {
+      writeToDestination(
+        destination,
+        ((currentView: any): Uint8Array).subarray(0, writtenBytes),
+      );
+      currentView = new Uint8Array(VIEW_SIZE);
+      writtenBytes = 0;
+    }
+    writeToDestination(destination, chunk);
+    return;
+  }
+
+  let bytesToWrite = chunk;
+  const allowableBytes = ((currentView: any): Uint8Array).length - writtenBytes;
+  if (allowableBytes < bytesToWrite.byteLength) {
+    // this chunk would overflow the current view. We enqueue a full view
+    // and start a new view with the remaining chunk
+    if (allowableBytes === 0) {
+      // the current view is already full, send it
+      writeToDestination(destination, (currentView: any));
+    } else {
+      // fill up the current view and apply the remaining chunk bytes
+      // to a new view.
+      ((currentView: any): Uint8Array).set(
+        bytesToWrite.subarray(0, allowableBytes),
+        writtenBytes,
+      );
+      writtenBytes += allowableBytes;
+      writeToDestination(destination, (currentView: any));
+      bytesToWrite = bytesToWrite.subarray(allowableBytes);
+    }
+    currentView = new Uint8Array(VIEW_SIZE);
+    writtenBytes = 0;
+  }
+  ((currentView: any): Uint8Array).set(bytesToWrite, writtenBytes);
+  writtenBytes += bytesToWrite.byteLength;
 }
 
+function writeToDestination(destination: Destination, view: Uint8Array) {
+  const currentHasCapacity = destination.write(view);
+  destinationHasCapacity = destinationHasCapacity && currentHasCapacity;
+}
+
 export function writeChunkAndReturn(
   destination: Destination,
-  chunk: Chunk | PrecomputedChunk,
+  chunk: PrecomputedChunk | Chunk,
 ): boolean {
-  const nodeBuffer = ((chunk: any): Buffer | string); // close enough
-  return destination.write(nodeBuffer);
+  writeChunk(destination, chunk);
+  return destinationHasCapacity;
 }
 
 export function completeWriting(destination: Destination) {
-  // Older Node streams like http.createServer don't have this.
-  if (typeof destination.uncork === 'function') {
-    destination.uncork();
+  if (currentView && writtenBytes > 0) {
+    destination.write(currentView.subarray(0, writtenBytes));
   }
+  currentView = null;
+  writtenBytes = 0;
+  destinationHasCapacity = true;
 }
 
 export function close(destination: Destination) {
   destination.end();
 }

+const textEncoder = new TextEncoder();
+
 export function stringToChunk(content: string): Chunk {
-  return content;
+  return textEncoder.encode(content);
 }
 
 export function stringToPrecomputedChunk(content: string): PrecomputedChunk {
-  return Buffer.from(content, 'utf8');
+  return textEncoder.encode(content);
 }
 
 export function closeWithError(destination: Destination, error: mixed): void {
