Skip to content

Commit

Permalink
fix: refactor our async task implementation to be a generic AsyncTask…
Browse files Browse the repository at this point in the history
… class instead of separate implementations for each async operation

This implementation enables our event loop to no longer have to be aware of all the different async tasks and how to process them, instead it understands AsyncTask and to call the `process` function contained within an AsyncTask.

A side-effect of this refactoring is that KVStore.prototype.lookup and KVStore.prototype.delete now work when being called multiple times in parallel - previously this would not work as the KVStore class contained a slot for the Promise of the async work, which meant if you called either method more than once, the previous method's Promise would have been overwritten within the Slot. The new implementation creates a new AsyncTask instance for each lookup and each delete invocation.
  • Loading branch information
JakeChampion committed Apr 11, 2024
1 parent 29764e2 commit 68dfec7
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 203 deletions.
32 changes: 32 additions & 0 deletions integration-tests/js-compute/fixtures/app/src/kv-store.js
Expand Up @@ -833,6 +833,38 @@ import { routes } from "./routes.js";
if (error) { return error }
return pass()
});

routes.set("/kv-store/get/multiple-lookups-at-once", async () => {
let store = createValidStore()
let key1 = `key-exists-${Math.random()}`;
await store.put(key1, '1hello1')
let key2 = `key-exists-${Math.random()}`;
await store.put(key2, '2hello2')
let key3 = `key-exists-${Math.random()}`;
await store.put(key3, '3hello3')
let key4 = `key-exists-${Math.random()}`;
await store.put(key4, '4hello4')
let key5 = `key-exists-${Math.random()}`;
await store.put(key5, '5hello5')
let results = await Promise.all([
store.get(key1),
store.get(key2),
store.get(key3),
store.get(key4),
store.get(key5),
]);
let error = assert(await results[0].text(), '1hello1', `await results[0].text()`)
if (error) { return error }
error = assert(await results[1].text(), '2hello2', `await results[1].text()`)
if (error) { return error }
error = assert(await results[2].text(), '3hello3', `await results[2].text()`)
if (error) { return error }
error = assert(await results[3].text(), '4hello4', `await results[3].text()`)
if (error) { return error }
error = assert(await results[4].text(), '5hello5', `await results[4].text()`)
if (error) { return error }
return pass()
});
}
}

Expand Down
86 changes: 19 additions & 67 deletions runtime/js-compute-runtime/builtins/kv-store.cpp
Expand Up @@ -183,44 +183,17 @@ bool parse_and_validate_key(JSContext *cx, const char *key, size_t len) {

} // namespace

bool KVStore::has_pending_delete_handle(JSObject *self) {
MOZ_ASSERT(KVStore::is_instance(self));

JS::Value handle_val =
JS::GetReservedSlot(self, static_cast<uint32_t>(Slots::PendingDeleteHandle));
return handle_val.isInt32() &&
handle_val.toInt32() != host_api::ObjectStorePendingDelete::invalid;
}

host_api::ObjectStorePendingDelete KVStore::pending_delete_handle(JSObject *self) {
MOZ_ASSERT(KVStore::is_instance(self));
host_api::ObjectStorePendingDelete res;

JS::Value handle_val =
JS::GetReservedSlot(self, static_cast<uint32_t>(Slots::PendingDeleteHandle));
if (handle_val.isInt32()) {
res = host_api::ObjectStorePendingDelete(handle_val.toInt32());
}

return res;
}

bool KVStore::process_pending_kv_store_delete(JSContext *cx, JS::HandleObject self) {
MOZ_ASSERT(KVStore::is_instance(self));

auto pending_promise_value =
JS::GetReservedSlot(self, static_cast<uint32_t>(Slots::PendingDeletePromise));
MOZ_ASSERT(pending_promise_value.isObject());
JS::RootedObject result_promise(cx, &pending_promise_value.toObject());

auto res = builtins::KVStore::pending_delete_handle(self).wait();
bool KVStore::process_pending_kv_store_delete(JSContext *cx, int32_t handle,
JS::HandleObject context, JS::HandleObject promise) {
host_api::ObjectStorePendingDelete pending_lookup(handle);

auto res = pending_lookup.wait();
if (auto *err = res.to_err()) {
HANDLE_ERROR(cx, *err);
return RejectPromiseWithPendingError(cx, result_promise);
return RejectPromiseWithPendingError(cx, promise);
}

JS::ResolvePromise(cx, result_promise, JS::UndefinedHandleValue);
JS::ResolvePromise(cx, promise, JS::UndefinedHandleValue);

return true;
}
Expand Down Expand Up @@ -253,45 +226,26 @@ bool KVStore::delete_(JSContext *cx, unsigned argc, JS::Value *vp) {
}
auto ret = res.unwrap();

JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::PendingDeleteHandle),
JS::Int32Value(ret.handle));
JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::PendingDeletePromise),
JS::ObjectValue(*result_promise));
auto task = core::AsyncTask::create(cx, ret.handle, self, result_promise,
KVStore::process_pending_kv_store_delete);

if (!core::EventLoop::queue_async_task(self)) {
if (!core::EventLoop::queue_async_task(task)) {
return ReturnPromiseRejectedWithPendingError(cx, args);
}

args.rval().setObject(*result_promise);
return true;
}

host_api::ObjectStorePendingLookup KVStore::pending_lookup_handle(JSObject *self) {
MOZ_ASSERT(KVStore::is_instance(self));
host_api::ObjectStorePendingLookup res;

JS::Value handle_val =
JS::GetReservedSlot(self, static_cast<uint32_t>(Slots::PendingLookupHandle));
if (handle_val.isInt32()) {
res = host_api::ObjectStorePendingLookup(handle_val.toInt32());
}

return res;
}

bool KVStore::process_pending_kv_store_lookup(JSContext *cx, JS::HandleObject self) {
MOZ_ASSERT(KVStore::is_instance(self));

auto pending_promise_value =
JS::GetReservedSlot(self, static_cast<uint32_t>(Slots::PendingLookupPromise));
MOZ_ASSERT(pending_promise_value.isObject());
JS::RootedObject result_promise(cx, &pending_promise_value.toObject());
bool KVStore::process_pending_kv_store_lookup(JSContext *cx, int32_t handle,
JS::HandleObject context, JS::HandleObject promise) {
host_api::ObjectStorePendingLookup pending_lookup(handle);

auto res = builtins::KVStore::pending_lookup_handle(self).wait();
auto res = pending_lookup.wait();

if (auto *err = res.to_err()) {
HANDLE_ERROR(cx, *err);
return RejectPromiseWithPendingError(cx, result_promise);
return RejectPromiseWithPendingError(cx, promise);
}

auto ret = res.unwrap();
Expand All @@ -300,15 +254,15 @@ bool KVStore::process_pending_kv_store_lookup(JSContext *cx, JS::HandleObject se
if (!ret.has_value()) {
JS::RootedValue result(cx);
result.setNull();
JS::ResolvePromise(cx, result_promise, result);
JS::ResolvePromise(cx, promise, result);
} else {
JS::RootedObject entry(cx, KVStoreEntry::create(cx, ret.value()));
if (!entry) {
return false;
}
JS::RootedValue result(cx);
result.setObject(*entry);
JS::ResolvePromise(cx, result_promise, result);
JS::ResolvePromise(cx, promise, result);
}

return true;
Expand Down Expand Up @@ -342,12 +296,10 @@ bool KVStore::get(JSContext *cx, unsigned argc, JS::Value *vp) {
}
auto ret = res.unwrap();

JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::PendingLookupHandle),
JS::Int32Value(ret.handle));
JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::PendingLookupPromise),
JS::ObjectValue(*result_promise));
auto task = core::AsyncTask::create(cx, ret.handle, self, result_promise,
KVStore::process_pending_kv_store_lookup);

if (!core::EventLoop::queue_async_task(self)) {
if (!core::EventLoop::queue_async_task(std::move(task))) {
return ReturnPromiseRejectedWithPendingError(cx, args);
}

Expand Down
13 changes: 4 additions & 9 deletions runtime/js-compute-runtime/builtins/kv-store.h
Expand Up @@ -39,10 +39,6 @@ class KVStore final : public BuiltinImpl<KVStore> {
static constexpr const char *class_name = "KVStore";
enum class Slots {
KVStore,
PendingLookupPromise,
PendingLookupHandle,
PendingDeletePromise,
PendingDeleteHandle,
Count,
};
static const JSFunctionSpec static_methods[];
Expand All @@ -54,11 +50,10 @@ class KVStore final : public BuiltinImpl<KVStore> {

static bool init_class(JSContext *cx, JS::HandleObject global);
static bool constructor(JSContext *cx, unsigned argc, JS::Value *vp);
static host_api::ObjectStorePendingLookup pending_lookup_handle(JSObject *self);
static bool process_pending_kv_store_lookup(JSContext *cx, JS::HandleObject self);
static host_api::ObjectStorePendingDelete pending_delete_handle(JSObject *self);
static bool process_pending_kv_store_delete(JSContext *cx, JS::HandleObject self);
static bool has_pending_delete_handle(JSObject *self);
static bool process_pending_kv_store_lookup(JSContext *cx, int32_t handle,
JS::HandleObject context, JS::HandleObject promise);
static bool process_pending_kv_store_delete(JSContext *cx, int32_t handle,
JS::HandleObject context, JS::HandleObject promise);
};

} // namespace builtins
Expand Down
111 changes: 108 additions & 3 deletions runtime/js-compute-runtime/builtins/request-response.cpp
@@ -1,11 +1,11 @@
#include "builtins/request-response.h"

#include "builtins/cache-override.h"
#include "builtins/client-info.h"
#include "builtins/fastly.h"
#include "builtins/fetch-event.h"
#include "builtins/kv-store.h"
#include "builtins/native-stream-source.h"
#include "builtins/shared/dom-exception.h"
#include "builtins/shared/url.h"
#include "builtins/transform-stream.h"
#include "core/encode.h"
Expand All @@ -29,8 +29,68 @@
namespace builtins {

namespace {
bool error_stream_controller_with_pending_exception(JSContext *cx, JS::HandleObject controller) {
JS::RootedValue exn(cx);
if (!JS_GetPendingException(cx, &exn))
return false;
JS_ClearPendingException(cx);

JS::RootedValueArray<1> args(cx);
args[0].set(exn);
JS::RootedValue r(cx);
return JS::Call(cx, controller, "error", args, &r);
}
constexpr size_t HANDLE_READ_CHUNK_SIZE = 8192;

bool process_body_read(JSContext *cx, int32_t handle, JS::HandleObject context,
JS::HandleObject promise) {
MOZ_ASSERT(context);
JS::RootedObject streamSource(cx, context);
MOZ_ASSERT(builtins::NativeStreamSource::is_instance(streamSource));
host_api::HttpBody body(handle);
JS::RootedObject owner(cx, builtins::NativeStreamSource::owner(streamSource));
JS::RootedObject controller(cx, builtins::NativeStreamSource::controller(streamSource));

auto read_res = body.read(HANDLE_READ_CHUNK_SIZE);
if (auto *err = read_res.to_err()) {
HANDLE_ERROR(cx, *err);
return error_stream_controller_with_pending_exception(cx, controller);
}

auto &chunk = read_res.unwrap();
if (chunk.len == 0) {
JS::RootedValue r(cx);
return JS::Call(cx, controller, "close", JS::HandleValueArray::empty(), &r);
}

// We don't release control of chunk's data until after we've checked that the array buffer
// allocation has been successful, as that ensures that the return path frees chunk automatically
// when necessary.
JS::RootedObject buffer(
cx, JS::NewArrayBufferWithContents(cx, chunk.len, chunk.ptr.get(),
JS::NewArrayBufferOutOfMemory::CallerMustFreeMemory));
if (!buffer) {
return error_stream_controller_with_pending_exception(cx, controller);
}

// At this point `buffer` has taken full ownership of the chunk's data.
std::ignore = chunk.ptr.release();

JS::RootedObject byte_array(cx, JS_NewUint8ArrayWithBuffer(cx, buffer, 0, chunk.len));
if (!byte_array) {
return false;
}

JS::RootedValueArray<1> enqueue_args(cx);
enqueue_args[0].setObject(*byte_array);
JS::RootedValue r(cx);
if (!JS::Call(cx, controller, "enqueue", enqueue_args, &r)) {
return error_stream_controller_with_pending_exception(cx, controller);
}

return true;
}

// https://fetch.spec.whatwg.org/#concept-method-normalize
// Returns `true` if the method name was normalized, `false` otherwise.
bool normalize_http_method(char *method) {
Expand Down Expand Up @@ -120,6 +180,39 @@ bool enqueue_internal_method(JSContext *cx, JS::HandleObject receiver,

} // namespace

bool RequestOrResponse::process_pending_request(JSContext *cx, int32_t handle,
JS::HandleObject context,
JS::HandleObject promise) {
MOZ_ASSERT(builtins::Request::is_instance(context));
host_api::HttpPendingReq pending(handle);
auto res = pending.wait();
if (auto *err = res.to_err()) {
std::string message = std::move(err->message()).value_or("when attempting to fetch resource.");
builtins::DOMException::raise(cx, message, "NetworkError");
return RejectPromiseWithPendingError(cx, promise);
}

auto [response_handle, body] = res.unwrap();
JS::RootedObject response_instance(cx, JS_NewObjectWithGivenProto(cx, &builtins::Response::class_,
builtins::Response::proto_obj));
if (!response_instance) {
return false;
}

bool is_upstream = true;
bool is_grip_upgrade = false;
JS::RootedObject response(cx,
builtins::Response::create(cx, response_instance, response_handle, body,
is_upstream, is_grip_upgrade, nullptr));
if (!response) {
return false;
}

builtins::RequestOrResponse::set_url(response, builtins::RequestOrResponse::url(context));
JS::RootedValue response_val(cx, JS::ObjectValue(*response));
return JS::ResolvePromise(cx, promise, response_val);
}

bool RequestOrResponse::is_instance(JSObject *obj) {
return Request::is_instance(obj) || Response::is_instance(obj) || KVStoreEntry::is_instance(obj);
}
Expand Down Expand Up @@ -814,8 +907,15 @@ bool RequestOrResponse::body_source_pull_algorithm(JSContext *cx, JS::CallArgs a
// (This deadlock happens in automated tests, but admittedly might not happen
// in real usage.)

if (!core::EventLoop::queue_async_task(source))
JS::RootedObject self(cx, &args.thisv().toObject());
JS::RootedObject owner(cx, builtins::NativeStreamSource::owner(self));
auto task = core::AsyncTask::create(
cx, builtins::RequestOrResponse::body_handle(owner).async_handle().handle, source, nullptr,
process_body_read);

if (!core::EventLoop::queue_async_task(task)) {
return false;
}

args.rval().setUndefined();
return true;
Expand Down Expand Up @@ -862,7 +962,12 @@ bool RequestOrResponse::body_reader_then_handler(JSContext *cx, JS::HandleObject
}

if (Request::is_instance(body_owner)) {
if (!core::EventLoop::queue_async_task(body_owner)) {
JS::RootedObject promise(cx, Request::response_promise(body_owner));
auto task = core::AsyncTask::create(
cx, builtins::Request::pending_handle(body_owner).async_handle().handle, body_owner,
promise, RequestOrResponse::process_pending_request);

if (!core::EventLoop::queue_async_task(task)) {
return false;
}
}
Expand Down
2 changes: 2 additions & 0 deletions runtime/js-compute-runtime/builtins/request-response.h
Expand Up @@ -36,6 +36,8 @@ class RequestOrResponse final {
static void set_manual_framing_headers(JSContext *cx, JSObject *obj, JS::HandleValue url);
static bool body_unusable(JSContext *cx, JS::HandleObject body);
static bool extract_body(JSContext *cx, JS::HandleObject self, JS::HandleValue body_val);
static bool process_pending_request(JSContext *cx, int32_t handle, JS::HandleObject context,
JS::HandleObject promise);

/**
* Returns the RequestOrResponse's Headers if it has been reified, nullptr if
Expand Down

0 comments on commit 68dfec7

Please sign in to comment.