[Flight] Basic Streaming Suspense Support #17285

Merged 6 commits on Nov 6, 2019
Changes from 4 commits
@@ -26,7 +26,7 @@ function startReadingFromStream(response, stream: ReadableStream): void {
return;
}
let buffer: Uint8Array = (value: any);
- processBinaryChunk(response, buffer, 0);
+ processBinaryChunk(response, buffer);
return reader.read().then(progress, error);
}
function error(e) {
2 changes: 1 addition & 1 deletion packages/react-dom/src/server/ReactDOMFizzServerBrowser.js
@@ -23,7 +23,7 @@ function renderToReadableStream(children: ReactNodeList): ReadableStream {
startWork(request);
},
pull(controller) {
- startFlowing(request, controller.desiredSize);
+ startFlowing(request);
},
cancel(reason) {},
});
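Note: a rough usage sketch for the browser entry point above, based only on the signature shown (children in, a WHATWG ReadableStream of HTML out). The service-worker handler, the Response constructor, the import path, and App are illustrative assumptions, not part of this change.

// Hypothetical service-worker style handler; the import path is assumed.
import {renderToReadableStream} from 'react-dom/unstable-fizz.browser';

self.addEventListener('fetch', event => {
  const stream = renderToReadableStream(<App />);
  event.respondWith(
    new Response(stream, {headers: {'Content-Type': 'text/html'}}),
  );
});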
2 changes: 1 addition & 1 deletion packages/react-dom/src/server/ReactDOMFizzServerNode.js
@@ -13,7 +13,7 @@ import type {Writable} from 'stream';
import {createRequest, startWork, startFlowing} from 'react-server/inline.dom';

function createDrainHandler(destination, request) {
- return () => startFlowing(request, 0);
+ return () => startFlowing(request);
}
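Note: startFlowing no longer takes a byte count, so the drain handler simply resumes flushing whatever is ready. The body of pipeToNodeWritable is not shown in this hunk; presumably the handler is attached to Node's standard 'drain' event, roughly as in this sketch (the createRequest arguments are an assumption).

// Sketch only: resume flushing whenever the destination can accept more bytes.
function pipeToNodeWritableSketch(children, destination) {
  const request = createRequest(children, destination);
  destination.on('drain', createDrainHandler(destination, request));
  startWork(request);
}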

function pipeToNodeWritable(
@@ -23,7 +23,7 @@ function renderToReadableStream(model: ReactModel): ReadableStream {
startWork(request);
},
pull(controller) {
- startFlowing(request, controller.desiredSize);
+ startFlowing(request);
},
cancel(reason) {},
});
@@ -17,7 +17,7 @@ import {
} from 'react-server/flight.inline.dom';

function createDrainHandler(destination, request) {
- return () => startFlowing(request, 0);
+ return () => startFlowing(request);
}

function pipeToNodeWritable(model: ReactModel, destination: Writable): void {
277 changes: 223 additions & 54 deletions packages/react-flight/src/ReactFlightClient.js
@@ -20,100 +20,269 @@ export type ReactModelRoot<T> = {|
model: T,
|};

type OpaqueResponse = {
type JSONValue = number | null | boolean | string | {[key: string]: JSONValue};

const PENDING = 0;
const RESOLVED = 1;
const ERRORED = 2;

type PendingChunk = {|
status: 0,
value: Promise<void>,
resolve: () => void,
|};
type ResolvedChunk = {|
status: 1,
value: mixed,
resolve: null,
|};
type ErroredChunk = {|
status: 2,
value: Error,
resolve: null,
|};
type Chunk = PendingChunk | ResolvedChunk | ErroredChunk;

type OpaqueResponseWithoutDecoder = {
source: Source,
modelRoot: ReactModelRoot<any>,
partialRow: string,
modelRoot: ReactModelRoot<any>,
chunks: Map<number, Chunk>,
fromJSON: (key: string, value: JSONValue) => any,
};

type OpaqueResponse = OpaqueResponseWithoutDecoder & {
stringDecoder: StringDecoder,
rootPing: () => void,
};

export function createResponse(source: Source): OpaqueResponse {
let modelRoot = {};
Object.defineProperty(
modelRoot,
'model',
({
configurable: true,
enumerable: true,
get() {
throw rootPromise;
},
}: any),
);

let rootPing;
let rootPromise = new Promise(resolve => {
rootPing = resolve;
});
let modelRoot: ReactModelRoot<any> = ({}: any);
let rootChunk: Chunk = createPendingChunk();
definePendingProperty(modelRoot, 'model', rootChunk);
let chunks: Map<number, Chunk> = new Map();
chunks.set(0, rootChunk);

let response: OpaqueResponse = ({
let response: OpaqueResponse = (({
source,
modelRoot,
partialRow: '',
rootPing,
}: any);
modelRoot,
chunks: chunks,
fromJSON: function(key, value) {
return parseFromJSON(response, this, key, value);
},
}: OpaqueResponseWithoutDecoder): any);
if (supportsBinaryStreams) {
response.stringDecoder = createStringDecoder();
}
return response;
}
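Note: a hedged end-to-end sketch of driving this client by hand. The row strings and someSource are invented; processStringChunk, complete, and getModelRoot are the exports defined elsewhere in this file.

const response = createResponse(someSource); // someSource is a placeholder
// Rows are newline-delimited; "J<hex id>:<json>" fills a numbered chunk and an
// untagged row fills the root model (chunk 0).
processStringChunk(response, 'J1:"Hello"\n{"greeting":"$1"}\n', 0);
complete(response);

const root = getModelRoot(response);
// Chunk 0 is resolved, so this read returns {greeting: 'Hello'}. Reading it
// before the rows arrived would have thrown the pending chunk's promise,
// which is what lets a Suspense boundary wait for the data.
console.log(root.model.greeting); // 'Hello'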

function createPendingChunk(): PendingChunk {
let resolve: () => void = (null: any);
let promise = new Promise(r => (resolve = r));
return {
status: PENDING,
value: promise,
resolve: resolve,
};
}

function createErrorChunk(error: Error): ErroredChunk {
return {
status: ERRORED,
value: error,
resolve: null,
};
}

function triggerErrorOnChunk(chunk: Chunk, error: Error): void {
if (chunk.status !== PENDING) {
// We already resolved. We didn't expect to see this.
return;
}
let resolve = chunk.resolve;
let erroredChunk: ErroredChunk = (chunk: any);
erroredChunk.status = ERRORED;
erroredChunk.value = error;
erroredChunk.resolve = null;
resolve();
}

function createResolvedChunk(value: mixed): ResolvedChunk {
return {
status: RESOLVED,
value: value,
resolve: null,
};
}

function resolveChunk(chunk: Chunk, value: mixed): void {
if (chunk.status !== PENDING) {
// We already resolved. We didn't expect to see this.
return;
}
let resolve = chunk.resolve;
let resolvedChunk: ResolvedChunk = (chunk: any);
resolvedChunk.status = RESOLVED;
resolvedChunk.value = value;
resolvedChunk.resolve = null;
resolve();
}

// Report that any missing chunks in the model are now going to throw this
// error upon read. Also notify any pending promises.
export function reportGlobalError(
response: OpaqueResponse,
error: Error,
): void {
Object.defineProperty(
response.modelRoot,
'model',
({
configurable: true,
enumerable: true,
get() {
throw error;
},
}: any),
);
response.rootPing();
response.chunks.forEach(chunk => {
// If this chunk was already resolved or errored, it won't
// trigger an error, but if it wasn't, we need to, because
// we won't be getting any new data to resolve it.
triggerErrorOnChunk(chunk, error);
});
}

function definePendingProperty(
object: Object,
key: string,
chunk: Chunk,
): void {
Object.defineProperty(object, key, {
configurable: false,
enumerable: true,
get() {
if (chunk.status === RESOLVED) {
return chunk.value;
} else {
throw chunk.value;
}
},
});
}
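Note: this getter is the whole Suspense hook-up: a resolved chunk returns its value, a pending chunk throws its promise (chunk.value while status is PENDING), and an errored chunk throws the stored Error. A hedged illustration of that contract outside React, where readSomething and retry are placeholders:

try {
  readSomething(); // reads a property installed by definePendingProperty
} catch (thrown) {
  if (typeof thrown.then === 'function') {
    // Pending: the getter threw the chunk's promise. resolveChunk() or
    // triggerErrorOnChunk() settles it, after which retry can re-read.
    thrown.then(retry);
  } else {
    // Errored: the getter threw the Error stored on the chunk.
    throw thrown;
  }
}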

function parseFromJSON(
response: OpaqueResponse,
targetObj: Object,
key: string,
value: JSONValue,
): any {
if (typeof value === 'string' && value[0] === '$') {
if (value[1] === '$') {
// This was an escaped string value.
return value.substring(1);
} else {
let id = parseInt(value.substring(1), 16);
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunk = createPendingChunk();
chunks.set(id, chunk);
} else if (chunk.status === RESOLVED) {
return chunk.value;
}
definePendingProperty(targetObj, key, chunk);
return undefined;
}
}
return value;
}
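Note: string values beginning with '$' are special: '$$...' escapes a literal leading '$', and anything else is a hexadecimal reference to another chunk. A small hedged example of what decoding produces (keys and ids are made up):

let row = '{"price":"$$10","author":"$a"}';
// Parsed with JSON.parse(row, response.fromJSON), this yields roughly:
//   price:  the literal string '$10' (escaped value)
//   author: chunk 0xa's value if that row already arrived; otherwise a lazy
//           getter is installed that throws the pending chunk's promise
//           until row 0xa shows up.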

function resolveJSONRow(
response: OpaqueResponse,
id: number,
json: string,
): void {
let model = JSON.parse(json, response.fromJSON);
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createResolvedChunk(model));
} else {
resolveChunk(chunk, model);
}
}

function processFullRow(response: OpaqueResponse, row: string): void {
if (row === '') {
return;
}
let tag = row[0];
switch (tag) {
case 'J': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
resolveJSONRow(response, id, json);
return;
}
case 'E': {
let colon = row.indexOf(':', 1);
let id = parseInt(row.substring(1, colon), 16);
let json = row.substring(colon + 1);
let errorInfo = JSON.parse(json);
let error = new Error(errorInfo.message);
error.stack = errorInfo.stack;
let chunks = response.chunks;
let chunk = chunks.get(id);
if (!chunk) {
chunks.set(id, createErrorChunk(error));
} else {
triggerErrorOnChunk(chunk, error);
}
return;
}
default: {
// Assume this is the root model.
resolveJSONRow(response, 0, row);
return;
}
}
}
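Note: the wire format is newline-delimited rows: 'J' rows resolve a model chunk, 'E' rows put a chunk into an error state, and an untagged row is the root model. Hypothetical rows for illustration (ids are in hex, payloads are invented):

const exampleRows =
  'J1:{"name":"Luke","friends":"$2"}\n' +   // resolves chunk 1
  'J2:{"name":"Leia"}\n' +                  // resolves chunk 2
  'E3:{"message":"boom","stack":"..."}\n' + // reading chunk 3 now throws
  '{"hero":"$1"}\n';                        // untagged: resolves the root (chunk 0)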

export function processStringChunk(
response: OpaqueResponse,
chunk: string,
offset: number,
): void {
response.partialRow += chunk.substr(offset);
let linebreak = chunk.indexOf('\n', offset);
while (linebreak > -1) {
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
processFullRow(response, fullrow);
response.partialRow = '';
offset = linebreak + 1;
linebreak = chunk.indexOf('\n', offset);
}
response.partialRow += chunk.substring(offset);
}
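Note: rows can arrive split across network chunks; partialRow buffers the incomplete tail until a later chunk supplies the rest. A hedged sketch, reusing a response from createResponse (the split point is arbitrary):

processStringChunk(response, 'J1:{"na', 0);       // no newline yet: buffered in partialRow
processStringChunk(response, 'me":"Luke"}\n', 0); // completes the row; chunk 1 resolves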

export function processBinaryChunk(
response: OpaqueResponse,
chunk: Uint8Array,
offset: number,
): void {
if (!supportsBinaryStreams) {
throw new Error("This environment don't support binary chunks.");
}
response.partialRow += readPartialStringChunk(response.stringDecoder, chunk);
let stringDecoder = response.stringDecoder;
let linebreak = chunk.indexOf(10); // newline
while (linebreak > -1) {
let fullrow =
response.partialRow +
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
processFullRow(response, fullrow);
response.partialRow = '';
chunk = chunk.subarray(linebreak + 1);
linebreak = chunk.indexOf(10); // newline
}
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
}

let emptyBuffer = new Uint8Array(0);
export function complete(response: OpaqueResponse): void {
if (supportsBinaryStreams) {
// This should never be needed since we're expected to have complete
// code units at the end of JSON.
response.partialRow += readFinalStringChunk(
response.stringDecoder,
emptyBuffer,
);
}
let modelRoot = response.modelRoot;
let model = JSON.parse(response.partialRow);
Object.defineProperty(modelRoot, 'model', {
value: model,
});
response.rootPing();
// In case there are any remaining unresolved chunks, they won't
// be resolved now. So we need to issue an error to those.
// Ideally we should be able to early bail out if we kept a
// ref count of pending chunks.
reportGlobalError(response, new Error('Connection closed.'));
}

export function getModelRoot<T>(response: OpaqueResponse): ReactModelRoot<T> {
5 changes: 1 addition & 4 deletions packages/react-server/src/ReactFizzStreamer.js
@@ -76,10 +76,7 @@ export function startWork(request: OpaqueRequest): void {
scheduleWork(() => performWork(request));
}

- export function startFlowing(
-   request: OpaqueRequest,
-   desiredBytes: number,
- ): void {
+ export function startFlowing(request: OpaqueRequest): void {
request.flowing = false;
flushCompletedChunks(request);
}