Skip to content

Commit

Permalink
[Flight] Encode ReadableStream and AsyncIterables (#28847)
Browse files Browse the repository at this point in the history
This adds support in Flight for serializing four kinds of streams:

- `ReadableStream` with objects as a model. This is a single shot
iterator so you can read it only once. It can contain any value
including Server Components. Chunks are encoded as is so if you send in
10 typed arrays, you get the same typed arrays out on the other side.
- Binary `ReadableStream` with `type: 'bytes'` option. This supports the
BYOB protocol. In this mode, the receiving side just gets `Uint8Array`s
and they can be split across any single byte boundary into arbitrary
chunks.
- `AsyncIterable` where the `AsyncIterator` function is different than
the `AsyncIterable` itself. In this case we assume that this might be a
multi-shot iterable and so we buffer its value and you can iterate it
multiple times on the other side. We support the `return` value as a
value in the single completion slot, but you can't pass values in
`next()`. If you want single-shot, return the AsyncIterator instead.
- `AsyncIterator`. These gets serialized as a single-shot as it's just
an iterator.

`AsyncIterable`/`AsyncIterator` yield Promises that are instrumented
with our `.status`/`.value` convention so that they can be synchronously
looped over if available. They are also lazily parsed upon read.

We can't do this with `ReadableStream` because we use the native
implementation of `ReadableStream` which owns the promises.

The format is a leading row that indicates which type of stream it is.
Then a new row with the same ID is emitted for every chunk. Followed by
either an error or close row.

`AsyncIterable`s can also be returned as children of Server Components
and then they're conceptually the same as fragment arrays/iterables.
They can't actually be used as children in Fizz/Fiber but there's a
separate plan for that. Only `AsyncIterable` not `AsyncIterator` will be
valid as children - just like sync `Iterable` is already supported but
single-shot `Iterator` is not. Notably, neither of these streams
represent updates over time to a value. They represent multiple values
in a list.

When the server stream is aborted we also close the underlying stream.
However, closing a stream on the client, doesn't close the underlying
stream.

A couple of possible follow ups I'm not planning on doing right now:

- [ ] Free memory by releasing the buffer if an Iterator has been
exhausted. Single shots could be optimized further to release individual
items as you go.
- [ ] We could clean up the underlying stream if the only pending data
that's still flowing is from streams and all the streams have cleaned
up. It's not very reliable though. It's better to do cancellation for
the whole stream - e.g. at the framework level.
- [ ] Implement smarter Binary Stream chunk handling. Currently we wait
until we've received a whole row for binary chunks and copy them into
consecutive memory. We need this to preserve semantics when passing
typed arrays. However, for binary streams we don't need that. We can
just send whatever pieces we have so far.

DiffTrain build for [7909d8e](7909d8e)
  • Loading branch information
sebmarkbage committed Apr 16, 2024
1 parent 34d4293 commit 9080248
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 167 deletions.
2 changes: 1 addition & 1 deletion compiled/facebook-www/REVISION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
9defcd56bc3cd53ac2901ed93f29218007010434
7909d8eabb7a702618f51e16a351df41aa8da88e
32 changes: 19 additions & 13 deletions compiled/facebook-www/ReactDOMServer-dev.classic.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ if (__DEV__) {
var React = require("react");
var ReactDOM = require("react-dom");

var ReactVersion = "19.0.0-www-classic-bf1258d7";
var ReactVersion = "19.0.0-www-classic-9835bfc2";

// This refers to a WWW module.
var warningWWW = require("warning");
Expand Down Expand Up @@ -9940,8 +9940,14 @@ if (__DEV__) {
}

default: {
if (typeof thenable.status === "string");
else {
if (typeof thenable.status === "string") {
// Only instrument the thenable if the status if not defined. If
// it's defined, but an unknown value, assume it's been instrumented by
// some custom userspace implementation. We treat it as "pending".
// Attach a dummy listener, to ensure that any lazy initialization can
// happen. Flight lazily parses JSON when the value is actually awaited.
thenable.then(noop$2, noop$2);
} else {
var pendingThenable = thenable;
pendingThenable.status = "pending";
pendingThenable.then(
Expand All @@ -9959,18 +9965,18 @@ if (__DEV__) {
rejectedThenable.reason = error;
}
}
); // Check one more time in case the thenable resolved synchronously
);
} // Check one more time in case the thenable resolved synchronously

switch (thenable.status) {
case "fulfilled": {
var fulfilledThenable = thenable;
return fulfilledThenable.value;
}
switch (thenable.status) {
case "fulfilled": {
var fulfilledThenable = thenable;
return fulfilledThenable.value;
}

case "rejected": {
var rejectedThenable = thenable;
throw rejectedThenable.reason;
}
case "rejected": {
var rejectedThenable = thenable;
throw rejectedThenable.reason;
}
} // Suspend.
//
Expand Down
32 changes: 19 additions & 13 deletions compiled/facebook-www/ReactDOMServer-dev.modern.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ if (__DEV__) {
var React = require("react");
var ReactDOM = require("react-dom");

var ReactVersion = "19.0.0-www-modern-d9b30156";
var ReactVersion = "19.0.0-www-modern-72ca4dea";

// This refers to a WWW module.
var warningWWW = require("warning");
Expand Down Expand Up @@ -9861,8 +9861,14 @@ if (__DEV__) {
}

default: {
if (typeof thenable.status === "string");
else {
if (typeof thenable.status === "string") {
// Only instrument the thenable if the status if not defined. If
// it's defined, but an unknown value, assume it's been instrumented by
// some custom userspace implementation. We treat it as "pending".
// Attach a dummy listener, to ensure that any lazy initialization can
// happen. Flight lazily parses JSON when the value is actually awaited.
thenable.then(noop$2, noop$2);
} else {
var pendingThenable = thenable;
pendingThenable.status = "pending";
pendingThenable.then(
Expand All @@ -9880,18 +9886,18 @@ if (__DEV__) {
rejectedThenable.reason = error;
}
}
); // Check one more time in case the thenable resolved synchronously
);
} // Check one more time in case the thenable resolved synchronously

switch (thenable.status) {
case "fulfilled": {
var fulfilledThenable = thenable;
return fulfilledThenable.value;
}
switch (thenable.status) {
case "fulfilled": {
var fulfilledThenable = thenable;
return fulfilledThenable.value;
}

case "rejected": {
var rejectedThenable = thenable;
throw rejectedThenable.reason;
}
case "rejected": {
var rejectedThenable = thenable;
throw rejectedThenable.reason;
}
} // Suspend.
//
Expand Down
23 changes: 11 additions & 12 deletions compiled/facebook-www/ReactDOMServer-prod.classic.js
Original file line number Diff line number Diff line change
Expand Up @@ -2952,9 +2952,9 @@ function trackUsedThenable(thenableState, thenable, index) {
case "rejected":
throw thenable.reason;
default:
if ("string" !== typeof thenable.status)
switch (
((thenableState = thenable),
"string" === typeof thenable.status
? thenable.then(noop$2, noop$2)
: ((thenableState = thenable),
(thenableState.status = "pending"),
thenableState.then(
function (fulfilledValue) {
Expand All @@ -2971,14 +2971,13 @@ function trackUsedThenable(thenableState, thenable, index) {
rejectedThenable.reason = error;
}
}
),
thenable.status)
) {
case "fulfilled":
return thenable.value;
case "rejected":
throw thenable.reason;
}
));
switch (thenable.status) {
case "fulfilled":
return thenable.value;
case "rejected":
throw thenable.reason;
}
suspendedThenable = thenable;
throw SuspenseException;
}
Expand Down Expand Up @@ -5681,4 +5680,4 @@ exports.renderToString = function (children, options) {
'The server used "renderToString" which does not support Suspense. If you intended for this Suspense boundary to render the fallback content on the server consider throwing an Error somewhere within the Suspense boundary. If you intended to have the server wait for the suspended component please switch to "renderToReadableStream" which supports Suspense on the server'
);
};
exports.version = "19.0.0-www-classic-c54d680a";
exports.version = "19.0.0-www-classic-fef33f20";
23 changes: 11 additions & 12 deletions compiled/facebook-www/ReactDOMServer-prod.modern.js
Original file line number Diff line number Diff line change
Expand Up @@ -2944,9 +2944,9 @@ function trackUsedThenable(thenableState, thenable, index) {
case "rejected":
throw thenable.reason;
default:
if ("string" !== typeof thenable.status)
switch (
((thenableState = thenable),
"string" === typeof thenable.status
? thenable.then(noop$2, noop$2)
: ((thenableState = thenable),
(thenableState.status = "pending"),
thenableState.then(
function (fulfilledValue) {
Expand All @@ -2963,14 +2963,13 @@ function trackUsedThenable(thenableState, thenable, index) {
rejectedThenable.reason = error;
}
}
),
thenable.status)
) {
case "fulfilled":
return thenable.value;
case "rejected":
throw thenable.reason;
}
));
switch (thenable.status) {
case "fulfilled":
return thenable.value;
case "rejected":
throw thenable.reason;
}
suspendedThenable = thenable;
throw SuspenseException;
}
Expand Down Expand Up @@ -5659,4 +5658,4 @@ exports.renderToString = function (children, options) {
'The server used "renderToString" which does not support Suspense. If you intended for this Suspense boundary to render the fallback content on the server consider throwing an Error somewhere within the Suspense boundary. If you intended to have the server wait for the suspended component please switch to "renderToReadableStream" which supports Suspense on the server'
);
};
exports.version = "19.0.0-www-modern-4944636f";
exports.version = "19.0.0-www-modern-39d2e934";
30 changes: 18 additions & 12 deletions compiled/facebook-www/ReactDOMServerStreaming-dev.modern.js
Original file line number Diff line number Diff line change
Expand Up @@ -9743,8 +9743,14 @@ if (__DEV__) {
}

default: {
if (typeof thenable.status === "string");
else {
if (typeof thenable.status === "string") {
// Only instrument the thenable if the status if not defined. If
// it's defined, but an unknown value, assume it's been instrumented by
// some custom userspace implementation. We treat it as "pending".
// Attach a dummy listener, to ensure that any lazy initialization can
// happen. Flight lazily parses JSON when the value is actually awaited.
thenable.then(noop$2, noop$2);
} else {
var pendingThenable = thenable;
pendingThenable.status = "pending";
pendingThenable.then(
Expand All @@ -9762,18 +9768,18 @@ if (__DEV__) {
rejectedThenable.reason = error;
}
}
); // Check one more time in case the thenable resolved synchronously
);
} // Check one more time in case the thenable resolved synchronously

switch (thenable.status) {
case "fulfilled": {
var fulfilledThenable = thenable;
return fulfilledThenable.value;
}
switch (thenable.status) {
case "fulfilled": {
var fulfilledThenable = thenable;
return fulfilledThenable.value;
}

case "rejected": {
var rejectedThenable = thenable;
throw rejectedThenable.reason;
}
case "rejected": {
var rejectedThenable = thenable;
throw rejectedThenable.reason;
}
} // Suspend.
//
Expand Down
21 changes: 10 additions & 11 deletions compiled/facebook-www/ReactDOMServerStreaming-prod.modern.js
Original file line number Diff line number Diff line change
Expand Up @@ -2818,9 +2818,9 @@ function trackUsedThenable(thenableState, thenable, index) {
case "rejected":
throw thenable.reason;
default:
if ("string" !== typeof thenable.status)
switch (
((thenableState = thenable),
"string" === typeof thenable.status
? thenable.then(noop$2, noop$2)
: ((thenableState = thenable),
(thenableState.status = "pending"),
thenableState.then(
function (fulfilledValue) {
Expand All @@ -2837,14 +2837,13 @@ function trackUsedThenable(thenableState, thenable, index) {
rejectedThenable.reason = error;
}
}
),
thenable.status)
) {
case "fulfilled":
return thenable.value;
case "rejected":
throw thenable.reason;
}
));
switch (thenable.status) {
case "fulfilled":
return thenable.value;
case "rejected":
throw thenable.reason;
}
suspendedThenable = thenable;
throw SuspenseException;
}
Expand Down
40 changes: 33 additions & 7 deletions compiled/facebook-www/ReactFlightDOMClient-dev.modern.js
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ if (__DEV__) {
break;

default:
reject(chunk.reason);
if (reject) {
reject(chunk.reason);
}

break;
}
};
Expand Down Expand Up @@ -472,7 +475,6 @@ if (__DEV__) {

function triggerErrorOnChunk(chunk, error) {
if (chunk.status !== PENDING && chunk.status !== BLOCKED) {
// We already resolved. We didn't expect to see this.
return;
}

Expand Down Expand Up @@ -503,7 +505,6 @@ if (__DEV__) {

function resolveModelChunk(chunk, value) {
if (chunk.status !== PENDING) {
// We already resolved. We didn't expect to see this.
return;
}

Expand Down Expand Up @@ -816,6 +817,7 @@ if (__DEV__) {
typeof chunkValue === "object" &&
chunkValue !== null &&
(Array.isArray(chunkValue) ||
typeof chunkValue[ASYNC_ITERATOR] === "function" ||
chunkValue.$$typeof === REACT_ELEMENT_TYPE) &&
!chunkValue._debugInfo
) {
Expand Down Expand Up @@ -1118,8 +1120,7 @@ if (__DEV__) {
}

function resolveText(response, id, text) {
var chunks = response._chunks; // We assume that we always reference large strings after they've been
// emitted.
var chunks = response._chunks;

chunks.set(id, createInitializedTextChunk(response, text));
}
Expand Down Expand Up @@ -1171,6 +1172,8 @@ if (__DEV__) {
}
}

var ASYNC_ITERATOR = Symbol.asyncIterator;

function resolveErrorDev(response, id, digest, message, stack) {
var error = new Error(
message ||
Expand Down Expand Up @@ -1275,6 +1278,26 @@ if (__DEV__) {
}
}

case 82:
/* "R" */
// Fallthrough

case 114:
/* "r" */
// Fallthrough

case 88:
/* "X" */
// Fallthrough

case 120:
/* "x" */
// Fallthrough

case 67:
/* "C" */
// Fallthrough

case 80:
/* "P" */
// Fallthrough
Expand Down Expand Up @@ -1330,9 +1353,12 @@ if (__DEV__) {
rowState = ROW_LENGTH;
i++;
} else if (
resolvedRowTag > 64 &&
resolvedRowTag < 91
(resolvedRowTag > 64 && resolvedRowTag < 91) ||
/* "A"-"Z" */
resolvedRowTag === 114 ||
/* "r" */
resolvedRowTag === 120
/* "x" */
) {
rowTag = resolvedRowTag;
rowState = ROW_CHUNK_BY_NEWLINE;
Expand Down
6 changes: 4 additions & 2 deletions compiled/facebook-www/ReactFlightDOMClient-prod.modern.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Chunk.prototype.then = function (resolve, reject) {
(null === this.reason && (this.reason = []), this.reason.push(reject));
break;
default:
reject(this.reason);
reject && reject(this.reason);
}
};
function readChunk(chunk) {
Expand Down Expand Up @@ -438,7 +438,9 @@ function startReadingFromStream(response, stream) {
rowState = value[i];
84 === rowState
? ((rowTag = rowState), (rowState = 2), i++)
: 64 < rowState && 91 > rowState
: (64 < rowState && 91 > rowState) ||
114 === rowState ||
120 === rowState
? ((rowTag = rowState), (rowState = 3), i++)
: ((rowTag = 0), (rowState = 3));
continue;
Expand Down
Loading

0 comments on commit 9080248

Please sign in to comment.