Skip to content

Commit

Permalink
feat: implement support in Response.json/text/arrayBuffer methods for…
Browse files Browse the repository at this point in the history
… guest provided streams
  • Loading branch information
JakeChampion committed Nov 7, 2022
1 parent e5982d8 commit 50cdc44
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 51 deletions.
6 changes: 5 additions & 1 deletion .vscode/settings.json
Expand Up @@ -80,7 +80,11 @@
"map": "cpp",
"set": "cpp",
"unordered_set": "cpp",
"numeric": "cpp"
"numeric": "cpp",
"__tree": "cpp",
"charconv": "cpp",
"list": "cpp",
"regex": "cpp"
},
"git.ignoreLimitWarning": true
}
2 changes: 2 additions & 0 deletions c-dependencies/js-compute-runtime/error-numbers.msg
Expand Up @@ -76,4 +76,6 @@ MSG_DEF(JSMSG_BACKEND_TLS_MIN_INVALID, 0, JSEXN_RANGEERR
MSG_DEF(JSMSG_BACKEND_TLS_MAX_INVALID, 0, JSEXN_RANGEERR, "Backend constructor: tlsMaxVersion must be either 1, 1.1, 1.2, or 1.3")
MSG_DEF(JSMSG_BACKEND_TLS_MIN_GREATER_THAN_TLS_MAX, 0, JSEXN_RANGEERR, "Backend constructor: tlsMinVersion must be less than or equal to tlsMaxVersion")
MSG_DEF(JSMSG_BACKEND_PORT_INVALID, 0, JSEXN_RANGEERR, "Backend constructor: port must be more than 0 and less than 2^16 (65,536)")
MSG_DEF(JSMSG_RESPONSE_VALUE_NOT_UINT8ARRAY, 0, JSEXN_TYPEERR, "Can't convert value to Uint8Array while consuming Body")
MSG_DEF(JSMSG_RESPONSE_BODY_DISTURBED_OR_LOCKED, 0, JSEXN_TYPEERR, "Response body object should not be disturbed or locked")
//clang-format on
258 changes: 249 additions & 9 deletions c-dependencies/js-compute-runtime/js-compute-builtins.cpp
Expand Up @@ -694,11 +694,248 @@ bool parse_body(JSContext *cx, HandleObject self, UniqueChars buf, size_t len) {
return JS::ResolvePromise(cx, result_promise, result);
}

bool consume_content_stream_for_bodyAll(JSContext *cx, HandleObject self, HandleObject stream,
HandleValue body_parser) {
JS_ReportErrorLatin1(cx, "Consuming a content-provided ReadableStream as a body using "
".text(), .json(), or .arrayBuffer() not yet supported");
return false;
bool content_stream_read_then_handler(JSContext *cx, HandleObject self, HandleValue extra,
CallArgs args) {
RootedObject then_handler(cx, &args.callee());
// The reader is stored in the catch handler, which we need here as well.
// So we get that first, then the reader.
MOZ_ASSERT(extra.isObject());
RootedObject catch_handler(cx, &extra.toObject());
#ifdef DEBUG
bool foundContents;
if (!JS_HasElement(cx, catch_handler, 1, &foundContents)) {
return false;
}
MOZ_ASSERT(foundContents);
#endif
RootedValue contents_val(cx);
if (!JS_GetElement(cx, catch_handler, 1, &contents_val)) {
return false;
}
MOZ_ASSERT(contents_val.isObject());
RootedObject contents(cx, &contents_val.toObject());
if (!contents) {
return false;
}
#ifdef DEBUG
bool contentsIsArray;
if (!JS::IsArrayObject(cx, contents, &contentsIsArray)) {
return false;
}
MOZ_ASSERT(contentsIsArray);
#endif

auto reader_val = js::GetFunctionNativeReserved(catch_handler, 1);
MOZ_ASSERT(reader_val.isObject());
RootedObject reader(cx, &reader_val.toObject());

// We're guaranteed to work with a native ReadableStreamDefaultReader here as we used
// `JS::ReadableStreamDefaultReaderRead(cx, reader)`, which in turn is guaranteed to return {done:
// bool, value: any} objects to read promise then callbacks.
MOZ_ASSERT(args[0].isObject());
RootedObject chunk_obj(cx, &args[0].toObject());
RootedValue done_val(cx);
RootedValue value(cx);
#ifdef DEBUG
bool hasValue;
if (!JS_HasProperty(cx, chunk_obj, "value", &hasValue)) {
return false;
}
MOZ_ASSERT(hasValue);
#endif
if (!JS_GetProperty(cx, chunk_obj, "value", &value)) {
return false;
}
#ifdef DEBUG
bool hasDone;
if (!JS_HasProperty(cx, chunk_obj, "done", &hasDone)) {
return false;
}
MOZ_ASSERT(hasDone);
#endif
if (!JS_GetProperty(cx, chunk_obj, "done", &done_val)) {
return false;
}
MOZ_ASSERT(done_val.isBoolean());
if (done_val.toBoolean()) {
// We finished reading the stream
// Now we need to iterate/reduce `contents` JS Array into UniqueChars
uint32_t contentsLength;
if (!JS::GetArrayLength(cx, contents, &contentsLength)) {
return false;
}
std::vector<uint8_t> allBytes;
for (uint32_t index = 0; index < contentsLength; index++) {
RootedValue val(cx);
if (!JS_GetElement(cx, contents, index, &val)) {
return false;
}
{
JS::AutoCheckCannotGC nogc;
MOZ_ASSERT(val.isObject());
JSObject *array = &val.toObject();
MOZ_ASSERT(JS_IsUint8Array(array));
bool is_shared;
uint8_t *bytes = JS_GetUint8ArrayData(array, &is_shared, nogc);
size_t length = JS_GetTypedArrayByteLength(array);
allBytes.insert(allBytes.end(), bytes, bytes + length);
}
}
#ifdef DEBUG
bool foundBodyParser;
if (!JS_HasElement(cx, catch_handler, 2, &foundBodyParser)) {
return false;
}
MOZ_ASSERT(foundBodyParser);
#endif
// Now we can call parse_body on the result
RootedValue body_parser(cx);
if (!JS_GetElement(cx, catch_handler, 2, &body_parser)) {
return false;
}
auto parse_body = (ParseBodyCB *)body_parser.toPrivate();
size_t bytes_read = allBytes.size();
UniqueChars buf(reinterpret_cast<char *>(allBytes.data()));
if (!buf) {
RootedObject result_promise(cx, &JS::GetReservedSlot(self, Slots::BodyAllPromise).toObject());
JS::SetReservedSlot(self, Slots::BodyAllPromise, JS::UndefinedValue());
return RejectPromiseWithPendingError(cx, result_promise);
}
return parse_body(cx, self, std::move(buf), bytes_read);
}

RootedValue val(cx);
if (!JS_GetProperty(cx, chunk_obj, "value", &val)) {
return false;
}

// The read operation can return anything since this stream comes from the guest
// If it is not a UInt8Array -- reject with a TypeError
if (!val.isObject() || !JS_IsUint8Array(&val.toObject())) {
JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr, JSMSG_RESPONSE_VALUE_NOT_UINT8ARRAY);
RootedObject result_promise(cx);
result_promise = &JS::GetReservedSlot(self, Slots::BodyAllPromise).toObject();
JS::SetReservedSlot(self, Slots::BodyAllPromise, JS::UndefinedValue());
return RejectPromiseWithPendingError(cx, result_promise);
}

{
uint32_t contentsLength;
if (!JS::GetArrayLength(cx, contents, &contentsLength)) {
return false;
}
if (!JS_SetElement(cx, contents, contentsLength, val)) {
return false;
}
}

// Read the next chunk.
RootedObject promise(cx, JS::ReadableStreamDefaultReaderRead(cx, reader));
if (!promise)
return false;
return JS::AddPromiseReactions(cx, promise, then_handler, catch_handler);
}

bool content_stream_read_catch_handler(JSContext *cx, HandleObject self, HandleValue extra,
CallArgs args) {
// The stream errored when being consumed
// we need to propagate the stream error
MOZ_ASSERT(extra.isObject());
RootedObject reader(cx, &extra.toObject());
RootedValue stream_val(cx);
if (!JS_GetElement(cx, reader, 1, &stream_val)) {
return false;
}
MOZ_ASSERT(stream_val.isObject());
RootedObject stream(cx, &stream_val.toObject());
if (!stream) {
return false;
}
MOZ_ASSERT(JS::IsReadableStream(stream));
#ifdef DEBUG
bool isError;
if (!JS::ReadableStreamIsErrored(cx, stream, &isError)) {
return false;
}
MOZ_ASSERT(isError);
#endif
RootedValue error(cx, JS::ReadableStreamGetStoredError(cx, stream));
JS_ClearPendingException(cx);
JS_SetPendingException(cx, error, JS::ExceptionStackBehavior::DoNotCapture);
RootedObject result_promise(cx);
result_promise = &JS::GetReservedSlot(self, Slots::BodyAllPromise).toObject();
JS::SetReservedSlot(self, Slots::BodyAllPromise, JS::UndefinedValue());
return RejectPromiseWithPendingError(cx, result_promise);
}

bool consume_content_stream_for_bodyAll(JSContext *cx, HandleObject self, HandleValue stream_val,
CallArgs args) {
// The body_parser is stored in the stream object, which we need here as well.
RootedObject stream(cx, &stream_val.toObject());
RootedValue body_parser(cx);
if (!JS_GetElement(cx, stream, 1, &body_parser)) {
return false;
}
MOZ_ASSERT(JS::IsReadableStream(stream));
if (RequestOrResponse::body_unusable(cx, stream)) {
JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr,
JSMSG_RESPONSE_BODY_DISTURBED_OR_LOCKED);
RootedObject result_promise(cx);
result_promise = &JS::GetReservedSlot(self, Slots::BodyAllPromise).toObject();
JS::SetReservedSlot(self, Slots::BodyAllPromise, JS::UndefinedValue());
return RejectPromiseWithPendingError(cx, result_promise);
}
JS::Rooted<JSObject *> unwrappedReader(
cx, JS::ReadableStreamGetReader(cx, stream, JS::ReadableStreamReaderMode::Default));
if (!unwrappedReader) {
return false;
}

// contents is the JS Array we store the stream chunks within, to later convert to
// arrayBuffer/json/text
JS::Rooted<JSObject *> contents(cx, JS::NewArrayObject(cx, 0));
if (!contents) {
return false;
}

RootedValue extra(cx, ObjectValue(*unwrappedReader));
// TODO: confirm whether this is observable to the JS application
if (!JS_SetElement(cx, unwrappedReader, 1, stream)) {
return false;
}

// Create handlers for both `then` and `catch`.
// These are functions with two reserved slots, in which we store all
// information required to perform the reactions. We store the actually
// required information on the catch handler, and a reference to that on the
// then handler. This allows us to reuse these functions for the next read
// operation in the then handler. The catch handler won't ever have a need to
// perform another operation in this way.
RootedObject catch_handler(
cx, create_internal_method<content_stream_read_catch_handler>(cx, self, extra));
if (!catch_handler) {
return false;
}

extra.setObject(*catch_handler);
if (!JS_SetElement(cx, catch_handler, 1, contents)) {
return false;
}
if (!JS_SetElement(cx, catch_handler, 2, body_parser)) {
return false;
}
RootedObject then_handler(
cx, create_internal_method<content_stream_read_then_handler>(cx, self, extra));
if (!then_handler) {
return false;
}

// Read the next chunk.
RootedObject promise(cx, JS::ReadableStreamDefaultReaderRead(cx, unwrappedReader));
if (!promise) {
return false;
}
return JS::AddPromiseReactions(cx, promise, then_handler, catch_handler);
}

bool consume_body_handle_for_bodyAll(JSContext *cx, HandleObject self, HandleValue body_parser,
Expand Down Expand Up @@ -757,7 +994,11 @@ bool bodyAll(JSContext *cx, CallArgs args, HandleObject self) {
// https://github.com/fastly/js-compute-runtime/issues/218
RootedObject stream(cx, body_stream(self));
if (stream && !builtins::NativeStreamSource::stream_is_body(cx, stream)) {
if (!consume_content_stream_for_bodyAll(cx, self, stream, body_parser)) {
if (!JS_SetElement(cx, stream, 1, body_parser)) {
return false;
}
RootedValue extra(cx, ObjectValue(*stream));
if (!enqueue_internal_method<consume_content_stream_for_bodyAll>(cx, self, extra)) {
return ReturnPromiseRejectedWithPendingError(cx, args);
}
} else {
Expand Down Expand Up @@ -947,9 +1188,8 @@ bool maybe_stream_body(JSContext *cx, HandleObject body_owner, bool *requires_st
}

if (RequestOrResponse::body_unusable(cx, stream)) {
// TODO: Improve this message; `disturbed` is probably too spec-internal a
// term.
JS_ReportErrorUTF8(cx, "Can't send a body stream that's locked or disturbed");
JS_ReportErrorNumberASCII(cx, GetErrorMessage, nullptr,
JSMSG_RESPONSE_BODY_DISTURBED_OR_LOCKED);
return false;
}

Expand Down
3 changes: 2 additions & 1 deletion c-dependencies/js-compute-runtime/js-compute-runtime.cpp
Expand Up @@ -219,8 +219,9 @@ static void abort(JSContext *cx, const char *description) {

// Respond with status `500` if no response was ever sent.
HandleObject fetch_event = FetchEvent::instance();
if (hasWizeningFinished() && !FetchEvent::response_started(fetch_event))
if (hasWizeningFinished() && !FetchEvent::response_started(fetch_event)) {
FetchEvent::respondWithError(cx, fetch_event);
}

fflush(stderr);
exit(1);
Expand Down
Expand Up @@ -6,7 +6,7 @@
"status": "PASS"
},
"ReadableStream start() Error propagates to Response.arrayBuffer() Promise": {
"status": "FAIL"
"status": "PASS"
},
"ReadableStream start() Error propagates to Response.blob() Promise": {
"status": "FAIL"
Expand All @@ -15,13 +15,13 @@
"status": "FAIL"
},
"ReadableStream start() Error propagates to Response.json() Promise": {
"status": "FAIL"
"status": "PASS"
},
"ReadableStream start() Error propagates to Response.text() Promise": {
"status": "FAIL"
"status": "PASS"
},
"ReadableStream pull() Error propagates to Response.arrayBuffer() Promise": {
"status": "FAIL"
"status": "PASS"
},
"ReadableStream pull() Error propagates to Response.blob() Promise": {
"status": "FAIL"
Expand All @@ -30,9 +30,9 @@
"status": "FAIL"
},
"ReadableStream pull() Error propagates to Response.json() Promise": {
"status": "FAIL"
"status": "PASS"
},
"ReadableStream pull() Error propagates to Response.text() Promise": {
"status": "FAIL"
"status": "PASS"
}
}
Expand Up @@ -15,13 +15,13 @@
"status": "FAIL"
},
"Getting text after getting the Response body - not disturbed, not locked (body source: stream)": {
"status": "FAIL"
"status": "PASS"
},
"Getting json after getting the Response body - not disturbed, not locked (body source: stream)": {
"status": "FAIL"
"status": "PASS"
},
"Getting arrayBuffer after getting the Response body - not disturbed, not locked (body source: stream)": {
"status": "FAIL"
"status": "PASS"
},
"Getting blob after getting the Response body - not disturbed, not locked (body source: string)": {
"status": "FAIL"
Expand Down
Expand Up @@ -15,13 +15,13 @@
"status": "FAIL"
},
"Getting text after getting a locked Response body (body source: stream)": {
"status": "FAIL"
"status": "PASS"
},
"Getting json after getting a locked Response body (body source: stream)": {
"status": "FAIL"
"status": "PASS"
},
"Getting arrayBuffer after getting a locked Response body (body source: stream)": {
"status": "FAIL"
"status": "PASS"
},
"Getting blob after getting a locked Response body (body source: string)": {
"status": "FAIL"
Expand Down

0 comments on commit 50cdc44

Please sign in to comment.